"""Content processing for subtitle (.vtt/.srt), notebook (.ipynb), and Markdown (.md) files."""

import json
import os
from typing import Any, Dict, List, Tuple


def _get_run_manager():
    """Return the shared run manager, or None when it is unavailable.

    The import is local so this module keeps working when the optional
    run_manager module (or anything it depends on) is missing.
    """
    try:
        from .run_manager import get_run_manager
        return get_run_manager()
    except Exception:
        # Best-effort: logging is optional, so any failure here means "no logger".
        return None


class ContentProcessor:
    """Processes content from .vtt, .srt, .ipynb, and .md files."""

    def __init__(self):
        """Initialize the ContentProcessor."""
        # Tagged contents of the most recent process_files() call, one string per file.
        self.file_contents: List[str] = []
        # Optional logger; None when no run manager is available.
        self.run_manager = _get_run_manager()

    def _log(self, message: str, level: str = "DEBUG") -> None:
        """Forward a message to the run manager, if one is attached."""
        if self.run_manager:
            self.run_manager.log(message, level=level)

    @staticmethod
    def _tag(filename: str, body: str) -> str:
        """Wrap *body* in XML source tags identifying the originating file."""
        return f'<source filename="{filename}">\n{body}\n</source>'

    def process_file(self, file_path: str) -> List[str]:
        """
        Process a file based on its extension and return the content.

        Args:
            file_path: Path to the file to process

        Returns:
            List containing the file content with source tags

        Raises:
            ValueError: If the file extension is not one of .vtt, .srt, .ipynb, .md.
        """
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()
        if ext in ('.vtt', '.srt'):
            return self._process_subtitle_file(file_path)
        if ext == '.ipynb':
            return self._process_notebook_file(file_path)
        if ext == '.md':
            return self._process_markdown_file(file_path)
        raise ValueError(f"Unsupported file type: {ext}")

    def _process_subtitle_file(self, file_path: str) -> List[str]:
        """Process a subtitle file (.vtt or .srt).

        Returns a one-element list with the caption text wrapped in source
        tags, or an empty list if the file could not be processed.
        """
        filename = os.path.basename(file_path)
        try:
            self._log(f"Found source file: {filename}")

            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Keep only the caption text: drop blank lines, cue numbers,
            # timestamp lines ("-->"), and the WEBVTT header.
            text_content = [
                line.strip()
                for line in content.split('\n')
                if line.strip()
                and not line.strip().isdigit()
                and '-->' not in line
                and not line.strip().startswith('WEBVTT')
            ]

            # Add XML source tags at the beginning and end of the content.
            return [self._tag(filename, "\n".join(text_content))]
        except Exception as e:
            self._log(f"Error processing subtitle file {file_path}: {e}", level="ERROR")
            return []

    def _process_markdown_file(self, file_path: str) -> List[str]:
        """Process a Markdown file (.md).

        Returns a one-element list with the file text wrapped in source
        tags, or an empty list if the file could not be read.
        """
        filename = os.path.basename(file_path)
        try:
            self._log(f"Found source file: {filename}")

            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Add XML source tags at the beginning and end of the content.
            return [self._tag(filename, content)]
        except Exception as e:
            self._log(f"Error processing markdown file {file_path}: {e}", level="ERROR")
            return []

    def _process_notebook_file(self, file_path: str) -> List[str]:
        """Process a Jupyter notebook file (.ipynb).

        Valid notebooks are flattened to their markdown and code cells;
        files that are not valid JSON (or that nbformat cannot parse) fall
        back to the raw file text. Returns an empty list only when the file
        cannot be read at all.
        """
        filename = os.path.basename(file_path)
        try:
            self._log(f"Found source file: {filename}")

            # Validate the JSON first: corrupt notebooks are delivered as
            # plain text instead of failing the whole run.
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    json.load(f)
            except json.JSONDecodeError as json_err:
                self._log(f"File {file_path} is not valid JSON: {json_err}")
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                return [self._tag(filename, f"```\n{content}\n```")]

            # Imported lazily so every non-notebook code path works without
            # the third-party nbformat package installed; if it is missing,
            # the outer except falls back to raw text below.
            import nbformat

            with open(file_path, 'r', encoding='utf-8') as f:
                notebook = nbformat.read(f, as_version=4)

            # Extract text from markdown and code cells (outputs are skipped).
            content_parts = []
            for cell in notebook.cells:
                if cell.cell_type == 'markdown':
                    content_parts.append(f"[Markdown]\n{cell.source}")
                elif cell.cell_type == 'code':
                    content_parts.append(f"[Code]\n```python\n{cell.source}\n```")

            # Add XML source tags at the beginning and end of the content.
            return [self._tag(filename, "\n\n".join(content_parts))]
        except Exception as e:
            self._log(f"Error processing notebook file {file_path}: {e}", level="ERROR")
            # Last resort: deliver the raw text so the caller still gets context.
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                return [self._tag(filename, f"```\n{content}\n```")]
            except Exception as read_err:
                self._log(f"Could not read file as text either: {read_err}", level="ERROR")
                return []

    def process_files(self, file_paths: List[str]) -> List[str]:
        """
        Process multiple files and combine their content.

        Args:
            file_paths: List of paths to files to process

        Returns:
            List of file contents with source tags
        """
        all_file_contents: List[str] = []
        for file_path in file_paths:
            all_file_contents.extend(self.process_file(file_path))

        # Store the processed file contents for later retrieval.
        self.file_contents = all_file_contents

        # The entire content of each file is used as context; each file's
        # content is wrapped in XML source tags so the LLM sees the complete
        # context with provenance.
        return all_file_contents