"""
Agent Adapter for Document Intelligence
Bridges the DocumentAgent with the new document_intelligence subsystem.
Provides enhanced tools and capabilities.
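Example (illustrative sketch; the file path, question, and LLM client are placeholders):
    adapter = DocumentIntelligenceAdapter(llm_client=my_llm_client)
    adapter.load_document("invoice.pdf")
    answer, evidence, confidence = adapter.answer_question("What is the total?")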
"""
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from .chunks.models import (
    BoundingBox,
    DocumentChunk,
    EvidenceRef,
    ParseResult,
    ExtractionResult,
    ClassificationResult,
    DocumentType,
)
from .parsing import DocumentParser, ParserConfig
from .extraction import (
ExtractionSchema,
FieldExtractor,
ExtractionConfig,
ExtractionValidator,
)
from .grounding import EvidenceBuilder, EvidenceTracker, CropManager, crop_region
from .tools import get_tool, list_tools, ToolResult
logger = logging.getLogger(__name__)
@dataclass
class AgentConfig:
"""Configuration for the document agent adapter."""
# Parser settings
render_dpi: int = 200
max_pages: Optional[int] = None
    ocr_languages: Optional[List[str]] = None
# Extraction settings
min_confidence: float = 0.5
abstain_on_low_confidence: bool = True
# Grounding settings
enable_crops: bool = True
crop_output_dir: Optional[Path] = None
# Agent settings
max_iterations: int = 10
verbose: bool = False
def __post_init__(self):
if self.ocr_languages is None:
self.ocr_languages = ["en"]
class DocumentIntelligenceAdapter:
"""
    Adapter connecting the DocumentAgent with the document_intelligence subsystem.
Provides:
- Document loading and parsing
- Schema-driven extraction
- Evidence-grounded results
- Tool execution
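    Example (illustrative; the path is a placeholder):
        config = AgentConfig(render_dpi=150, min_confidence=0.7)
        adapter = DocumentIntelligenceAdapter(config=config)
        result = adapter.load_document("contract.pdf")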
"""
def __init__(
self,
config: Optional[AgentConfig] = None,
llm_client: Optional[Any] = None,
):
self.config = config or AgentConfig()
self.llm_client = llm_client
# Initialize components
self.parser = DocumentParser(
config=ParserConfig(
render_dpi=self.config.render_dpi,
max_pages=self.config.max_pages,
ocr_languages=self.config.ocr_languages,
)
)
self.extractor = FieldExtractor(
config=ExtractionConfig(
min_field_confidence=self.config.min_confidence,
abstain_on_low_confidence=self.config.abstain_on_low_confidence,
)
)
self.validator = ExtractionValidator(
min_confidence=self.config.min_confidence,
)
self.evidence_builder = EvidenceBuilder()
if self.config.enable_crops and self.config.crop_output_dir:
self.crop_manager = CropManager(self.config.crop_output_dir)
else:
self.crop_manager = None
# State
self._current_parse_result: Optional[ParseResult] = None
self._page_images: Dict[int, Any] = {}
logger.info("Initialized DocumentIntelligenceAdapter")
def load_document(
self,
path: Union[str, Path],
render_pages: bool = True,
) -> ParseResult:
"""
Load and parse a document.
Args:
path: Path to document file
render_pages: Whether to keep rendered page images
Returns:
ParseResult with chunks and metadata
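        Example (illustrative; the path is a placeholder):
            result = adapter.load_document("report.pdf", render_pages=False)
            print(result.num_pages)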
"""
path = Path(path)
logger.info(f"Loading document: {path}")
# Parse document
self._current_parse_result = self.parser.parse(path)
        # Reset page images kept from any previously loaded document
        self._page_images = {}
        # Optionally store page images
if render_pages:
from .io import load_document, RenderOptions
loader, renderer = load_document(path)
for page_num in range(1, self._current_parse_result.num_pages + 1):
self._page_images[page_num] = renderer.render_page(
page_num,
RenderOptions(dpi=self.config.render_dpi)
)
loader.close()
return self._current_parse_result
def extract_fields(
self,
schema: Union[ExtractionSchema, Dict[str, Any]],
validate: bool = True,
) -> ExtractionResult:
"""
Extract fields from the loaded document.
Args:
schema: Extraction schema
validate: Whether to validate results
Returns:
ExtractionResult with values and evidence
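        Example (illustrative sketch; assumes from_json_schema accepts a
        JSON-Schema-style dict, and the field names are placeholders):
            result = adapter.extract_fields({
                "type": "object",
                "properties": {
                    "invoice_number": {"type": "string"},
                    "total_amount": {"type": "number"},
                },
            })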
"""
if not self._current_parse_result:
raise RuntimeError("No document loaded. Call load_document() first.")
# Convert dict schema if needed
if isinstance(schema, dict):
schema = ExtractionSchema.from_json_schema(schema)
# Extract
result = self.extractor.extract(self._current_parse_result, schema)
# Validate if requested
if validate:
validation = self.validator.validate(result, schema)
if not validation.is_valid:
logger.warning(f"Extraction validation failed: {validation.error_count} errors")
# Add validation issues to result
result.metadata = result.metadata or {}
result.metadata["validation_issues"] = [
{"field": i.field_name, "type": i.issue_type, "message": i.message}
for i in validation.issues
]
return result
def answer_question(
self,
question: str,
use_llm: bool = True,
) -> Tuple[str, List[EvidenceRef], float]:
"""
Answer a question about the document.
Args:
question: Question to answer
use_llm: Whether to use LLM for generation
Returns:
Tuple of (answer, evidence, confidence)
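        Example (illustrative; the question is a placeholder):
            answer, evidence, confidence = adapter.answer_question(
                "Who signed the agreement?"
            )
            if confidence < 0.5:
                print(f"Low-confidence answer: {answer}")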
"""
if not self._current_parse_result:
raise RuntimeError("No document loaded")
tool = get_tool("answer_question", llm_client=self.llm_client)
result = tool.execute(
parse_result=self._current_parse_result,
question=question,
use_rag=False,
)
if not result.success:
return f"Error: {result.error}", [], 0.0
data = result.data
answer = data.get("answer", "")
confidence = data.get("confidence", 0.5)
# Convert evidence
evidence = []
for ev_dict in result.evidence:
evidence.append(EvidenceRef(
chunk_id=ev_dict["chunk_id"],
doc_id=self._current_parse_result.doc_id,
page=ev_dict["page"],
bbox=BoundingBox(
x_min=ev_dict["bbox"][0],
y_min=ev_dict["bbox"][1],
x_max=ev_dict["bbox"][2],
y_max=ev_dict["bbox"][3],
normalized=True,
),
source_type="text",
snippet=ev_dict.get("snippet", ""),
confidence=confidence,
))
return answer, evidence, confidence
def search_chunks(
self,
query: str,
chunk_types: Optional[List[str]] = None,
top_k: int = 10,
) -> List[Dict[str, Any]]:
"""
Search for chunks matching a query.
Args:
query: Search query
chunk_types: Optional chunk type filter
top_k: Maximum results
Returns:
List of matching chunks with scores
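        Example (illustrative; the "text" chunk type and the result-dict keys
        are assumptions about the search tool's output):
            hits = adapter.search_chunks("payment terms", chunk_types=["text"], top_k=5)
            for hit in hits:
                print(hit.get("chunk_id"), hit.get("score"))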
"""
if not self._current_parse_result:
raise RuntimeError("No document loaded")
tool = get_tool("search_chunks")
result = tool.execute(
parse_result=self._current_parse_result,
query=query,
chunk_types=chunk_types,
top_k=top_k,
)
if not result.success:
return []
return result.data.get("results", [])
def get_chunk(self, chunk_id: str) -> Optional[DocumentChunk]:
"""Get a chunk by ID."""
if not self._current_parse_result:
return None
for chunk in self._current_parse_result.chunks:
if chunk.chunk_id == chunk_id:
return chunk
return None
def get_page_image(self, page: int) -> Optional[Any]:
"""Get rendered page image."""
return self._page_images.get(page)
def crop_chunk(
self,
chunk: DocumentChunk,
padding_percent: float = 0.02,
) -> Optional[Any]:
"""Crop the region of a chunk from its page."""
page_image = self.get_page_image(chunk.page)
if page_image is None:
return None
        return crop_region(page_image, chunk.bbox, padding_percent)
def get_tools_description(self) -> str:
"""Get description of available tools for agent prompts."""
tools = list_tools()
lines = []
for tool in tools:
lines.append(f"- {tool['name']}: {tool['description']}")
return "\n".join(lines)
def execute_tool(
self,
tool_name: str,
**kwargs
) -> ToolResult:
"""
Execute a document tool.
Args:
tool_name: Name of tool to execute
**kwargs: Tool arguments
Returns:
ToolResult
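        Example (illustrative; mirrors the answer_question tool used above):
            result = adapter.execute_tool(
                "answer_question",
                question="What is the due date?",
                use_rag=False,
            )
            if result.success:
                print(result.data)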
"""
# Add current parse result if not provided
if "parse_result" not in kwargs and self._current_parse_result:
kwargs["parse_result"] = self._current_parse_result
tool = get_tool(tool_name, llm_client=self.llm_client)
return tool.execute(**kwargs)
@property
def parse_result(self) -> Optional[ParseResult]:
"""Get current parse result."""
return self._current_parse_result
@property
def document_id(self) -> Optional[str]:
"""Get current document ID."""
if self._current_parse_result:
return self._current_parse_result.doc_id
return None
def create_enhanced_document_agent(
llm_client: Any,
config: Optional[AgentConfig] = None,
) -> "EnhancedDocumentAgent":
"""
Create an enhanced DocumentAgent with document_intelligence integration.
Args:
llm_client: LLM client for reasoning
config: Agent configuration
Returns:
EnhancedDocumentAgent instance
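    Example (illustrative; assumes an async caller and a placeholder client):
        agent = create_enhanced_document_agent(my_llm_client)
        parse_result = await agent.load_document("filing.pdf")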
"""
return EnhancedDocumentAgent(llm_client=llm_client, config=config)
class EnhancedDocumentAgent:
"""
Enhanced DocumentAgent using document_intelligence subsystem.
Extends the ReAct-style agent with:
- Better parsing and chunking
- Schema-driven extraction
- Visual grounding
- Evidence tracking
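    Example (illustrative sketch; asyncio and my_llm_client are placeholders
    for the caller's environment):
        async def run():
            agent = EnhancedDocumentAgent(llm_client=my_llm_client)
            await agent.load_document("patent.pdf")
            result = await agent.classify()
            print(result.document_type)
        asyncio.run(run())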
"""
def __init__(
self,
llm_client: Any,
config: Optional[AgentConfig] = None,
):
self.adapter = DocumentIntelligenceAdapter(
config=config,
llm_client=llm_client,
)
self.llm_client = llm_client
self.config = config or AgentConfig()
async def load_document(self, path: Union[str, Path]) -> ParseResult:
"""Load a document for processing."""
return self.adapter.load_document(path, render_pages=True)
async def extract_fields(
self,
schema: Union[ExtractionSchema, Dict],
) -> ExtractionResult:
"""Extract fields using schema."""
return self.adapter.extract_fields(schema, validate=True)
async def answer_question(
self,
question: str,
) -> Tuple[str, List[EvidenceRef]]:
"""Answer a question about the document."""
        answer, evidence, _ = self.adapter.answer_question(question)
return answer, evidence
async def classify(self) -> ClassificationResult:
"""Classify the document type."""
if not self.adapter.parse_result:
raise RuntimeError("No document loaded")
# Get first page content
first_page_chunks = [
c for c in self.adapter.parse_result.chunks
if c.page == 1
][:5]
content = " ".join(c.text[:200] for c in first_page_chunks)
# Simple keyword-based classification
doc_type = DocumentType.OTHER
confidence = 0.5
type_keywords = {
DocumentType.INVOICE: ["invoice", "bill", "payment due", "amount due"],
DocumentType.CONTRACT: ["agreement", "contract", "party", "whereas"],
DocumentType.RECEIPT: ["receipt", "paid", "transaction", "thank you"],
DocumentType.FORM: ["form", "fill in", "checkbox", "signature line"],
DocumentType.LETTER: ["dear", "sincerely", "regards"],
DocumentType.REPORT: ["report", "findings", "conclusion", "summary"],
DocumentType.PATENT: ["patent", "claims", "invention", "embodiment"],
}
        content_lower = content.lower()
        # Pick the type with the most keyword matches instead of the first hit
        best_matches = 0
        for dtype, keywords in type_keywords.items():
            matches = sum(1 for k in keywords if k in content_lower)
            if matches > best_matches:
                best_matches = matches
                doc_type = dtype
                confidence = min(0.9, 0.5 + matches * 0.15)
return ClassificationResult(
doc_id=self.adapter.document_id,
document_type=doc_type,
confidence=confidence,
secondary_types=[],
)
def search(
self,
query: str,
top_k: int = 10,
) -> List[Dict[str, Any]]:
"""Search document content."""
return self.adapter.search_chunks(query, top_k=top_k)
@property
def current_document(self) -> Optional[ParseResult]:
"""Get current document."""
return self.adapter.parse_result