|
|
|
|
|
""" |
|
|
Document Intelligence Demo |
|
|
|
|
|
Demonstrates the capabilities of the SPARKNET document_intelligence subsystem: |
|
|
- Document parsing with OCR and layout detection |
|
|
- Schema-driven field extraction |
|
|
- Visual grounding with evidence |
|
|
- Question answering |
|
|
- Document classification |
|
|
""" |
|
|
|
|
|
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
import sys |
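# Make the project root importable so `src.document_intelligence` resolves when this
# demo is run directly (assumes the script lives one directory below the project root).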
|
|
sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
|
|
|
|
|
|
def demo_parse_document(doc_path: str): |
|
|
"""Demo: Parse a document into semantic chunks.""" |
|
|
print("\n" + "=" * 60) |
|
|
print("1. DOCUMENT PARSING") |
|
|
print("=" * 60) |
|
|
|
|
|
from src.document_intelligence import ( |
|
|
DocumentParser, |
|
|
ParserConfig, |
|
|
) |
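    # Keep the demo fast: render pages at 200 DPI, parse at most 5 pages,
    # and request Markdown output (include_markdown=True).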
|
|
|
|
|
|
|
|
config = ParserConfig( |
|
|
render_dpi=200, |
|
|
max_pages=5, |
|
|
include_markdown=True, |
|
|
) |
|
|
|
|
|
parser = DocumentParser(config=config) |
|
|
|
|
|
print(f"\nParsing: {doc_path}") |
|
|
result = parser.parse(doc_path) |
|
|
|
|
|
print(f"\nDocument ID: {result.doc_id}") |
|
|
print(f"Filename: {result.filename}") |
|
|
print(f"Pages: {result.num_pages}") |
|
|
print(f"Chunks: {len(result.chunks)}") |
|
|
print(f"Processing time: {result.processing_time_ms:.0f}ms") |
|
|
|
|
|
|
|
|
print("\nChunk types:") |
|
|
by_type = {} |
|
|
for chunk in result.chunks: |
|
|
t = chunk.chunk_type.value |
|
|
by_type[t] = by_type.get(t, 0) + 1 |
|
|
|
|
|
for t, count in sorted(by_type.items()): |
|
|
print(f" - {t}: {count}") |
|
|
|
|
|
|
|
|
print("\nFirst 3 chunks:") |
|
|
for i, chunk in enumerate(result.chunks[:3]): |
|
|
print(f"\n [{i+1}] Type: {chunk.chunk_type.value}, Page: {chunk.page}") |
|
|
print(f" ID: {chunk.chunk_id}") |
|
|
print(f" Text: {chunk.text[:100]}...") |
|
|
print(f" BBox: {chunk.bbox.xyxy}") |
|
|
print(f" Confidence: {chunk.confidence:.2f}") |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
def demo_extract_fields(parse_result): |
|
|
"""Demo: Extract fields using a schema.""" |
|
|
print("\n" + "=" * 60) |
|
|
print("2. SCHEMA-DRIVEN EXTRACTION") |
|
|
print("=" * 60) |
|
|
|
|
|
from src.document_intelligence import ( |
|
|
FieldExtractor, |
|
|
ExtractionSchema, |
|
|
FieldType, |
|
|
ExtractionValidator, |
|
|
) |
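    # A small, document-agnostic schema for the demo; callers would normally
    # define fields tailored to their document type.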
|
|
|
|
|
|
|
|
schema = ExtractionSchema( |
|
|
name="DocumentInfo", |
|
|
description="Basic document information", |
|
|
) |
|
|
|
|
|
schema.add_string_field("title", "Document title or heading", required=True) |
|
|
schema.add_string_field("date", "Document date", required=False) |
|
|
schema.add_string_field("author", "Author or organization name", required=False) |
|
|
schema.add_string_field("reference_number", "Reference or ID number", required=False) |
|
|
|
|
|
print(f"\nExtraction schema: {schema.name}") |
|
|
print("Fields:") |
|
|
for field in schema.fields: |
|
|
req = "required" if field.required else "optional" |
|
|
print(f" - {field.name} ({field.field_type.value}, {req})") |
|
|
|
|
|
|
|
|
extractor = FieldExtractor() |
|
|
result = extractor.extract(parse_result, schema) |
|
|
|
|
|
print("\nExtracted data:") |
|
|
for key, value in result.data.items(): |
|
|
status = " [ABSTAINED]" if key in result.abstained_fields else "" |
|
|
print(f" {key}: {value}{status}") |
|
|
|
|
|
print(f"\nOverall confidence: {result.overall_confidence:.2f}") |
|
|
|
|
|
|
|
|
if result.evidence: |
|
|
print("\nEvidence:") |
|
|
for ev in result.evidence[:3]: |
|
|
print(f" - Page {ev.page}, Chunk {ev.chunk_id[:12]}...") |
|
|
print(f" Snippet: {ev.snippet[:80]}...") |
|
|
|
|
|
|
|
|
validator = ExtractionValidator() |
|
|
validation = validator.validate(result, schema) |
|
|
|
|
|
print(f"\nValidation: {'PASSED' if validation.is_valid else 'FAILED'}") |
|
|
if validation.issues: |
|
|
print("Issues:") |
|
|
for issue in validation.issues[:3]: |
|
|
print(f" - [{issue.severity}] {issue.field_name}: {issue.message}") |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
def demo_search_and_qa(parse_result): |
|
|
"""Demo: Search and question answering.""" |
|
|
print("\n" + "=" * 60) |
|
|
print("3. SEARCH AND Q&A") |
|
|
print("=" * 60) |
|
|
|
|
|
from src.document_intelligence.tools import get_tool |
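    # Tools are looked up by name; each tool's execute() returns a result with a
    # `success` flag and a `data` payload.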
|
|
|
|
|
|
|
|
print("\nSearching for 'document'...") |
|
|
search_tool = get_tool("search_chunks") |
|
|
search_result = search_tool.execute( |
|
|
parse_result=parse_result, |
|
|
query="document", |
|
|
top_k=5, |
|
|
) |
|
|
|
|
|
if search_result.success: |
|
|
matches = search_result.data.get("results", []) |
|
|
print(f"Found {len(matches)} matches:") |
|
|
for i, match in enumerate(matches[:3], 1): |
|
|
print(f" {i}. Page {match['page']}, Type: {match['type']}") |
|
|
print(f" Score: {match['score']:.2f}") |
|
|
print(f" Text: {match['text'][:80]}...") |
|
|
|
|
|
|
|
|
print("\nAsking: 'What is this document about?'") |
|
|
qa_tool = get_tool("answer_question") |
|
|
qa_result = qa_tool.execute( |
|
|
parse_result=parse_result, |
|
|
question="What is this document about?", |
|
|
) |
|
|
|
|
|
if qa_result.success: |
|
|
print(f"Answer: {qa_result.data.get('answer', 'No answer')}") |
|
|
print(f"Confidence: {qa_result.data.get('confidence', 0):.2f}") |
|
|
|
|
|
|
|
|
def demo_grounding(parse_result, doc_path: str): |
|
|
"""Demo: Visual grounding with crops.""" |
|
|
print("\n" + "=" * 60) |
|
|
print("4. VISUAL GROUNDING") |
|
|
print("=" * 60) |
|
|
|
|
|
from src.document_intelligence import ( |
|
|
load_document, |
|
|
RenderOptions, |
|
|
) |
|
|
from src.document_intelligence.grounding import ( |
|
|
EvidenceBuilder, |
|
|
crop_region, |
|
|
create_annotated_image, |
|
|
) |
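    # Re-render page 1 at the same 200 DPI used for parsing, so the chunk
    # bounding boxes should line up with the rendered image.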
|
|
|
|
|
|
|
|
loader, renderer = load_document(doc_path) |
|
|
page_image = renderer.render_page(1, RenderOptions(dpi=200)) |
|
|
loader.close() |
|
|
|
|
|
print(f"\nPage 1 image size: {page_image.shape}") |
|
|
|
|
|
|
|
|
page_chunks = [c for c in parse_result.chunks if c.page == 1] |
|
|
print(f"Page 1 chunks: {len(page_chunks)}") |
|
|
|
|
|
|
|
|
if page_chunks: |
|
|
chunk = page_chunks[0] |
|
|
evidence_builder = EvidenceBuilder() |
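        # Build an evidence record linking a value back to its source chunk.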
|
|
|
|
|
evidence = evidence_builder.create_evidence( |
|
|
chunk=chunk, |
|
|
value=chunk.text[:50], |
|
|
field_name="example_field", |
|
|
) |
|
|
|
|
|
print(f"\nEvidence created:") |
|
|
print(f" Chunk ID: {evidence.chunk_id}") |
|
|
print(f" Page: {evidence.page}") |
|
|
print(f" BBox: {evidence.bbox.xyxy}") |
|
|
print(f" Snippet: {evidence.snippet[:80]}...") |
|
|
|
|
|
|
|
|
crop = crop_region(page_image, chunk.bbox) |
|
|
print(f" Crop size: {crop.shape}") |
|
|
|
|
|
|
|
|
print("\nAnnotated image would include bounding boxes for all chunks.") |
|
|
print("Use the CLI 'sparknet docint visualize' command to generate.") |
|
|
|
|
|
|
|
|
def demo_classification(parse_result): |
|
|
"""Demo: Document classification.""" |
|
|
print("\n" + "=" * 60) |
|
|
print("5. DOCUMENT CLASSIFICATION") |
|
|
print("=" * 60) |
|
|
|
|
|
from src.document_intelligence.chunks import DocumentType |
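    # A lightweight keyword heuristic over the first-page text stands in for a
    # full classifier in this demo.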
|
|
|
|
|
|
|
|
first_page = [c for c in parse_result.chunks if c.page == 1][:5] |
|
|
content = " ".join(c.text for c in first_page).lower() |
|
|
|
|
|
type_keywords = { |
|
|
"invoice": ["invoice", "bill", "payment due", "amount due"], |
|
|
"contract": ["agreement", "contract", "party", "whereas"], |
|
|
"receipt": ["receipt", "paid", "transaction"], |
|
|
"patent": ["patent", "claims", "invention"], |
|
|
"report": ["report", "findings", "summary"], |
|
|
} |
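    # Pick the first type with at least two keyword hits; confidence grows with
    # the number of hits and is capped at 0.95.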
|
|
|
|
|
detected_type = "other" |
|
|
confidence = 0.3 |
|
|
|
|
|
for doc_type, keywords in type_keywords.items(): |
|
|
matches = sum(1 for k in keywords if k in content) |
|
|
if matches >= 2: |
|
|
detected_type = doc_type |
|
|
confidence = min(0.95, 0.5 + matches * 0.15) |
|
|
break |
|
|
|
|
|
print(f"\nDetected type: {detected_type}") |
|
|
print(f"Confidence: {confidence:.2f}") |
|
|
|
|
|
|
|
|
def main(): |
|
|
"""Run all demos.""" |
|
|
print("=" * 60) |
|
|
print("SPARKNET Document Intelligence Demo") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
sample_paths = [ |
|
|
Path("Dataset/Patent_1.pdf"), |
|
|
Path("data/sample.pdf"), |
|
|
Path("tests/fixtures/sample.pdf"), |
|
|
] |
|
|
|
|
|
doc_path = None |
|
|
for path in sample_paths: |
|
|
if path.exists(): |
|
|
doc_path = str(path) |
|
|
break |
|
|
|
|
|
    if not doc_path and len(sys.argv) > 1:
        doc_path = sys.argv[1]

    if not doc_path:
        print("\nNo sample document found.")
        print("Please provide a PDF file path as an argument.")
        print("\nUsage: python document_intelligence_demo.py [path/to/document.pdf]")
        return
|
|
|
|
|
print(f"\nUsing document: {doc_path}") |
|
|
|
|
|
try: |
|
|
|
|
|
parse_result = demo_parse_document(doc_path) |
|
|
demo_extract_fields(parse_result) |
|
|
demo_search_and_qa(parse_result) |
|
|
demo_grounding(parse_result, doc_path) |
|
|
demo_classification(parse_result) |
|
|
|
|
|
print("\n" + "=" * 60) |
|
|
print("Demo complete!") |
|
|
print("=" * 60) |
|
|
|
|
|
except ImportError as e: |
|
|
print(f"\nImport error: {e}") |
|
|
print("Make sure all dependencies are installed:") |
|
|
print(" pip install pymupdf pillow numpy pydantic") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"\nError: {e}") |
|
|
import traceback |
|
|
traceback.print_exc() |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|