# quiz-generator-v3/ui/content_processor.py
# AI Course Assessment Generator — content processing utilities.
# (Origin metadata: initial commit by ecuartasm, 217abc3)
import os
import nbformat
from typing import List, Dict, Any, Tuple
def _get_run_manager():
"""Get run manager if available, otherwise return None."""
try:
from .run_manager import get_run_manager
return get_run_manager()
except:
return None
class ContentProcessor:
    """Processes content from .vtt, .srt, .ipynb, and .md files.

    Each processed file is returned as a single string wrapped in
    ``<source file='...'>...</source>`` tags so downstream consumers
    can attribute every piece of context to its originating file.
    """

    def __init__(self):
        """Initialize the ContentProcessor."""
        # Tagged contents accumulated by the last process_files() call.
        self.file_contents: List[str] = []
        # Optional logger; None when the run manager is unavailable.
        self.run_manager = _get_run_manager()

    def process_file(self, file_path: str) -> List[str]:
        """
        Process a file based on its extension and return the content.

        Args:
            file_path: Path to the file to process

        Returns:
            List containing the file content with source tags

        Raises:
            ValueError: If the file extension is not supported.
        """
        _, ext = os.path.splitext(file_path)
        ext_lower = ext.lower()  # compare case-insensitively, once
        if ext_lower in ('.vtt', '.srt'):
            return self._process_subtitle_file(file_path)
        elif ext_lower == '.ipynb':
            return self._process_notebook_file(file_path)
        elif ext_lower == '.md':
            return self._process_markdown_file(file_path)
        else:
            raise ValueError(f"Unsupported file type: {ext}")

    def _process_subtitle_file(self, file_path: str) -> List[str]:
        """Process a subtitle file (.vtt or .srt).

        Strips cue numbers, timestamp lines, and the WEBVTT header,
        keeping only the spoken text. Returns [] on any error.
        """
        try:
            filename = os.path.basename(file_path)
            if self.run_manager:
                self.run_manager.log(f"Found source file: {filename}", level="DEBUG")
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            # Keep only text lines: skip blanks, bare cue numbers,
            # "start --> end" timestamp lines, and the WEBVTT header.
            text_content = []
            for line in content.split('\n'):
                stripped = line.strip()
                if (stripped and
                        not stripped.isdigit() and
                        '-->' not in line and
                        not stripped.startswith('WEBVTT')):
                    text_content.append(stripped)
            combined_text = "\n".join(text_content)
            # Wrap in XML source tags so the file of origin is preserved.
            tagged_content = f"<source file='{filename}'>\n{combined_text}\n</source>"
            return [tagged_content]
        except Exception as e:
            if self.run_manager:
                self.run_manager.log(f"Error processing subtitle file {file_path}: {e}", level="ERROR")
            return []

    def _process_markdown_file(self, file_path: str) -> List[str]:
        """Process a Markdown file (.md).

        The file content is embedded verbatim inside source tags.
        Returns [] on any error.
        """
        try:
            filename = os.path.basename(file_path)
            if self.run_manager:
                self.run_manager.log(f"Found source file: {filename}", level="DEBUG")
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            # Wrap in XML source tags so the file of origin is preserved.
            tagged_content = f"<source file='{filename}'>\n{content}\n</source>"
            return [tagged_content]
        except Exception as e:
            if self.run_manager:
                self.run_manager.log(f"Error processing markdown file {file_path}: {e}", level="ERROR")
            return []

    def _process_notebook_file(self, file_path: str) -> List[str]:
        """Process a Jupyter notebook file (.ipynb).

        Valid notebooks are flattened to their markdown and code cell
        sources; files that are not valid JSON (or that fail notebook
        parsing) are embedded verbatim as plain text instead.
        Returns [] only when the file cannot be read at all.
        """
        import json

        # Computed before the try so the last-resort fallback below can
        # always reference it (basename does not raise for str paths).
        filename = os.path.basename(file_path)
        try:
            if self.run_manager:
                self.run_manager.log(f"Found source file: {filename}", level="DEBUG")
            # Validate JSON first so malformed files get the plain-text
            # fallback instead of an nbformat parse error.
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    json.load(f)
            except json.JSONDecodeError as json_err:
                if self.run_manager:
                    self.run_manager.log(f"File {file_path} is not valid JSON: {json_err}", level="DEBUG")
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                tagged_content = f"<source file='{filename}'>\n```\n{content}\n```\n</source>"
                return [tagged_content]
            # Valid JSON: parse as a notebook and extract cell sources.
            with open(file_path, 'r', encoding='utf-8') as f:
                notebook = nbformat.read(f, as_version=4)
            content_parts = []
            for cell in notebook.cells:
                if cell.cell_type == 'markdown':
                    content_parts.append(f"[Markdown]\n{cell.source}")
                elif cell.cell_type == 'code':
                    content_parts.append(f"[Code]\n```python\n{cell.source}\n```")
            # NOTE: cell outputs are deliberately not included in the
            # extracted context (previously disabled code removed).
            combined_content = "\n\n".join(content_parts)
            tagged_content = f"<source file='{filename}'>\n{combined_content}\n</source>"
            return [tagged_content]
        except Exception as e:
            if self.run_manager:
                self.run_manager.log(f"Error processing notebook file {file_path}: {e}", level="ERROR")
            # Last resort: embed the raw file text if notebook parsing failed.
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                tagged_content = f"<source file='{filename}'>\n```\n{content}\n```\n</source>"
                return [tagged_content]
            except Exception as read_err:
                if self.run_manager:
                    self.run_manager.log(f"Could not read file as text either: {read_err}", level="ERROR")
                return []

    def process_files(self, file_paths: List[str]) -> List[str]:
        """
        Process multiple files and combine their content.

        Args:
            file_paths: List of paths to files to process

        Returns:
            List of file contents with source tags
        """
        all_file_contents = []
        for file_path in file_paths:
            all_file_contents.extend(self.process_file(file_path))
        # Store the processed file contents for later reuse.
        self.file_contents = all_file_contents
        # Each file's complete content is wrapped in XML source tags so
        # the LLM receives the full context with per-file attribution.
        return all_file_contents