# SPARKNET — src/cli/document.py
# Initial commit: SPARKNET framework (MHamdan, commit d520909)
"""
Document Processing CLI Commands
Commands:
sparknet document parse <file> - Parse and extract text from document
sparknet document extract <file> - Extract structured fields
sparknet document classify <file> - Classify document type
sparknet document analyze <file> - Full document analysis
"""
import typer
from typing import Optional, List
from pathlib import Path
import json
import sys
# Create document sub-app.
# Registered by the parent CLI; every command defined below is invoked as
# `sparknet document <command>`.
document_app = typer.Typer(
    name="document",
    help="Document processing commands",
)
@document_app.command("parse")
def parse_document(
    file_path: Path = typer.Argument(..., help="Path to document file"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
    ocr_engine: str = typer.Option("paddleocr", "--ocr", help="OCR engine: paddleocr, tesseract"),
    dpi: int = typer.Option(300, "--dpi", help="Rendering DPI for PDFs"),
    max_pages: Optional[int] = typer.Option(None, "--max-pages", help="Maximum pages to process"),
    include_images: bool = typer.Option(False, "--images", help="Include cropped region images"),
):
    """
    Parse a document and extract text with layout information.

    Runs the document pipeline on ``file_path`` and emits a JSON summary:
    document metadata, each layout chunk (type, page, truncated text,
    confidence, bounding box) and a truncated full-text preview.  Results
    are written to ``--output`` as JSON, or printed to stdout.

    Exits with status 1 on a missing file, a missing optional dependency,
    or any processing failure.

    Example:
        sparknet document parse invoice.pdf -o result.json
    """
    if not file_path.exists():
        typer.echo(f"Error: File not found: {file_path}", err=True)
        raise typer.Exit(1)
    typer.echo(f"Parsing document: {file_path}")
    try:
        # Deferred imports: the document pipeline pulls in heavy optional
        # dependencies (OCR engines) that should not load for `--help`.
        from ..document.pipeline import (
            PipelineConfig,
            get_document_processor,
        )
        from ..document.ocr import OCRConfig

        # Build pipeline config from the CLI options.
        # NOTE(review): include_images is accepted but never forwarded to
        # PipelineConfig — confirm whether the pipeline supports cropped
        # region output, otherwise the flag is a no-op.
        ocr_config = OCRConfig(engine=ocr_engine)
        config = PipelineConfig(
            ocr=ocr_config,
            render_dpi=dpi,
            max_pages=max_pages,
        )

        # Process document
        processor = get_document_processor(config)
        result = processor.process(str(file_path))

        def _truncate(text: str, limit: int) -> str:
            # Keep CLI output manageable for large documents.
            return text[:limit] + "..." if len(text) > limit else text

        # Format output
        output_data = {
            "document_id": result.metadata.document_id,
            "filename": result.metadata.filename,
            "num_pages": result.metadata.num_pages,
            "total_chunks": result.metadata.total_chunks,
            "total_characters": result.metadata.total_characters,
            "ocr_confidence": result.metadata.ocr_confidence_avg,
            "chunks": [
                {
                    "chunk_id": c.chunk_id,
                    "type": c.chunk_type.value,
                    "page": c.page,
                    "text": _truncate(c.text, 500),
                    "confidence": c.confidence,
                    "bbox": {
                        "x_min": c.bbox.x_min,
                        "y_min": c.bbox.y_min,
                        "x_max": c.bbox.x_max,
                        "y_max": c.bbox.y_max,
                    },
                }
                for c in result.chunks
            ],
            "full_text": _truncate(result.full_text, 2000),
        }

        # Output
        if output:
            # utf-8 explicitly: OCR text routinely contains non-ASCII characters.
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(json.dumps(output_data, indent=2))
        typer.echo(f"\nProcessed {result.metadata.num_pages} pages, {len(result.chunks)} chunks")
    except ImportError as e:
        typer.echo(f"Error: Missing dependency - {e}", err=True)
        raise typer.Exit(1)
    except Exception as e:
        typer.echo(f"Error processing document: {e}", err=True)
        raise typer.Exit(1)
@document_app.command("extract")
def extract_fields(
    file_path: Path = typer.Argument(..., help="Path to document file"),
    schema: Optional[Path] = typer.Option(None, "--schema", "-s", help="Extraction schema YAML file"),
    fields: Optional[List[str]] = typer.Option(None, "--field", "-f", help="Fields to extract (can use multiple)"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
    validate: bool = typer.Option(True, "--validate/--no-validate", help="Validate extraction"),
):
    """
    Extract structured fields from a document.

    The extraction schema comes either from a YAML file (``--schema``) or
    is built on the fly from one or more ``--field`` names, each treated
    as a required string field.  With ``--validate`` (the default) the
    extraction critic reviews the result and a "validation" section is
    attached to the JSON output.

    Exits with status 1 on a missing file, missing schema/field options,
    a missing optional dependency, or any extraction failure.

    Example:
        sparknet document extract invoice.pdf -f "invoice_number" -f "total_amount"
        sparknet document extract contract.pdf --schema contract_schema.yaml
    """
    if not file_path.exists():
        typer.echo(f"Error: File not found: {file_path}", err=True)
        raise typer.Exit(1)
    if not schema and not fields:
        typer.echo("Error: Provide --schema or --field options", err=True)
        raise typer.Exit(1)
    typer.echo(f"Extracting fields from: {file_path}")
    try:
        from ..document.schemas.extraction import ExtractionSchema, FieldDefinition
        from ..agents.document_agent import DocumentAgent

        # Build extraction schema
        if schema:
            import yaml

            with open(schema, encoding="utf-8") as f:
                schema_data = yaml.safe_load(f)
            extraction_schema = ExtractionSchema(**schema_data)
        else:
            # Build a minimal schema from bare field names: every field is
            # a required string.
            field_defs = [
                FieldDefinition(
                    name=field_name,
                    field_type="string",
                    required=True,
                )
                for field_name in fields
            ]
            extraction_schema = ExtractionSchema(
                name="cli_extraction",
                fields=field_defs,
            )

        # Run extraction with agent
        import asyncio

        agent = DocumentAgent()

        async def _load_and_extract():
            # Load and extract inside ONE event loop: separate asyncio.run()
            # calls would tear down the loop between steps and discard any
            # loop-bound resources the agent acquired during load_document.
            await agent.load_document(str(file_path))
            return await agent.extract_fields(extraction_schema)

        result = asyncio.run(_load_and_extract())

        # Format output
        output_data = {
            "document": str(file_path),
            "fields": result.fields,
            "confidence": result.confidence,
            "evidence": [
                {
                    "chunk_id": e.chunk_id,
                    "page": e.page,
                    "snippet": e.snippet,
                }
                for e in result.evidence
            ] if result.evidence else [],
        }

        # Validate if requested
        if validate and result.fields:
            from ..document.validation import get_extraction_critic

            critic = get_extraction_critic()
            evidence_chunks = [
                {"text": e.snippet, "page": e.page, "chunk_id": e.chunk_id}
                for e in result.evidence
            ] if result.evidence else []
            validation = critic.validate_extraction(result.fields, evidence_chunks)
            output_data["validation"] = {
                "status": validation.overall_status.value,
                "confidence": validation.overall_confidence,
                "should_accept": validation.should_accept,
                "abstain_reason": validation.abstain_reason,
            }

        # Output
        if output:
            # utf-8 explicitly: extracted text may contain non-ASCII characters.
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(json.dumps(output_data, indent=2))
    except ImportError as e:
        typer.echo(f"Error: Missing dependency - {e}", err=True)
        raise typer.Exit(1)
    except Exception as e:
        typer.echo(f"Error extracting fields: {e}", err=True)
        raise typer.Exit(1)
@document_app.command("classify")
def classify_document(
    file_path: Path = typer.Argument(..., help="Path to document file"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
):
    """
    Classify document type.

    Loads the document with the DocumentAgent and reports the predicted
    type, confidence, reasoning and any extra metadata as JSON (stdout
    or ``--output``).

    Exits with status 1 on a missing file, a missing optional dependency,
    or any classification failure.

    Example:
        sparknet document classify document.pdf
    """
    if not file_path.exists():
        typer.echo(f"Error: File not found: {file_path}", err=True)
        raise typer.Exit(1)
    typer.echo(f"Classifying document: {file_path}")
    try:
        from ..agents.document_agent import DocumentAgent
        import asyncio

        agent = DocumentAgent()

        async def _load_and_classify():
            # Load and classify inside ONE event loop: separate asyncio.run()
            # calls would tear down the loop between steps and discard any
            # loop-bound resources the agent acquired during load_document.
            await agent.load_document(str(file_path))
            return await agent.classify()

        classification = asyncio.run(_load_and_classify())

        output_data = {
            "document": str(file_path),
            "document_type": classification.document_type.value,
            "confidence": classification.confidence,
            "reasoning": classification.reasoning,
            "metadata": classification.metadata,
        }
        if output:
            # utf-8 explicitly, matching the other document commands.
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(json.dumps(output_data, indent=2))
    except ImportError as e:
        # Report missing optional dependencies distinctly, matching the
        # parse and extract commands.
        typer.echo(f"Error: Missing dependency - {e}", err=True)
        raise typer.Exit(1)
    except Exception as e:
        typer.echo(f"Error classifying document: {e}", err=True)
        raise typer.Exit(1)
@document_app.command("ask")
def ask_document(
    file_path: Path = typer.Argument(..., help="Path to document file"),
    question: str = typer.Argument(..., help="Question to ask about the document"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
):
    """
    Ask a question about a document.

    Loads the document with the DocumentAgent, answers the question, and
    reports the answer plus supporting evidence snippets.  With
    ``--output`` the full result is written as JSON; otherwise a
    human-readable summary (answer + up to three evidence snippets) is
    printed to stdout.

    Exits with status 1 on a missing file, a missing optional dependency,
    or any processing failure.

    Example:
        sparknet document ask invoice.pdf "What is the total amount?"
    """
    if not file_path.exists():
        typer.echo(f"Error: File not found: {file_path}", err=True)
        raise typer.Exit(1)
    typer.echo(f"Processing question for: {file_path}")
    try:
        from ..agents.document_agent import DocumentAgent
        import asyncio

        agent = DocumentAgent()

        async def _load_and_answer():
            # Load and answer inside ONE event loop: separate asyncio.run()
            # calls would tear down the loop between steps and discard any
            # loop-bound resources the agent acquired during load_document.
            await agent.load_document(str(file_path))
            return await agent.answer_question(question)

        answer, evidence = asyncio.run(_load_and_answer())

        output_data = {
            "document": str(file_path),
            "question": question,
            "answer": answer,
            "evidence": [
                {
                    "chunk_id": e.chunk_id,
                    "page": e.page,
                    "snippet": e.snippet,
                    "confidence": e.confidence,
                }
                for e in evidence
            ] if evidence else [],
        }
        if output:
            # utf-8 explicitly, matching the other document commands.
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(f"\nQuestion: {question}")
            typer.echo(f"\nAnswer: {answer}")
            if evidence:
                typer.echo(f"\nEvidence ({len(evidence)} sources):")
                # Show at most three snippets to keep terminal output short.
                for e in evidence[:3]:
                    # e.page appears to be 0-based (displayed as page + 1)
                    # — TODO(review): confirm against the pipeline's page numbering.
                    typer.echo(f"  - Page {e.page + 1}: {e.snippet[:100]}...")
    except ImportError as e:
        # Report missing optional dependencies distinctly, matching the
        # parse and extract commands.
        typer.echo(f"Error: Missing dependency - {e}", err=True)
        raise typer.Exit(1)
    except Exception as e:
        typer.echo(f"Error processing question: {e}", err=True)
        raise typer.Exit(1)