""" Agent Adapter for Document Intelligence Bridges the DocumentAgent with the new document_intelligence subsystem. Provides enhanced tools and capabilities. """ import logging from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union from .chunks.models import ( DocumentChunk, EvidenceRef, ParseResult, ExtractionResult, ClassificationResult, DocumentType, ) from .parsing import DocumentParser, ParserConfig from .extraction import ( ExtractionSchema, FieldExtractor, ExtractionConfig, ExtractionValidator, ) from .grounding import EvidenceBuilder, EvidenceTracker, CropManager from .tools import get_tool, list_tools, ToolResult logger = logging.getLogger(__name__) @dataclass class AgentConfig: """Configuration for the document agent adapter.""" # Parser settings render_dpi: int = 200 max_pages: Optional[int] = None ocr_languages: List[str] = None # Extraction settings min_confidence: float = 0.5 abstain_on_low_confidence: bool = True # Grounding settings enable_crops: bool = True crop_output_dir: Optional[Path] = None # Agent settings max_iterations: int = 10 verbose: bool = False def __post_init__(self): if self.ocr_languages is None: self.ocr_languages = ["en"] class DocumentIntelligenceAdapter: """ Adapter connecting DocumentAgent with document_intelligence subsystem. Provides: - Document loading and parsing - Schema-driven extraction - Evidence-grounded results - Tool execution """ def __init__( self, config: Optional[AgentConfig] = None, llm_client: Optional[Any] = None, ): self.config = config or AgentConfig() self.llm_client = llm_client # Initialize components self.parser = DocumentParser( config=ParserConfig( render_dpi=self.config.render_dpi, max_pages=self.config.max_pages, ocr_languages=self.config.ocr_languages, ) ) self.extractor = FieldExtractor( config=ExtractionConfig( min_field_confidence=self.config.min_confidence, abstain_on_low_confidence=self.config.abstain_on_low_confidence, ) ) self.validator = ExtractionValidator( min_confidence=self.config.min_confidence, ) self.evidence_builder = EvidenceBuilder() if self.config.enable_crops and self.config.crop_output_dir: self.crop_manager = CropManager(self.config.crop_output_dir) else: self.crop_manager = None # State self._current_parse_result: Optional[ParseResult] = None self._page_images: Dict[int, Any] = {} logger.info("Initialized DocumentIntelligenceAdapter") def load_document( self, path: Union[str, Path], render_pages: bool = True, ) -> ParseResult: """ Load and parse a document. Args: path: Path to document file render_pages: Whether to keep rendered page images Returns: ParseResult with chunks and metadata """ path = Path(path) logger.info(f"Loading document: {path}") # Parse document self._current_parse_result = self.parser.parse(path) # Optionally store page images if render_pages: from .io import load_document, RenderOptions loader, renderer = load_document(path) for page_num in range(1, self._current_parse_result.num_pages + 1): self._page_images[page_num] = renderer.render_page( page_num, RenderOptions(dpi=self.config.render_dpi) ) loader.close() return self._current_parse_result def extract_fields( self, schema: Union[ExtractionSchema, Dict[str, Any]], validate: bool = True, ) -> ExtractionResult: """ Extract fields from the loaded document. Args: schema: Extraction schema validate: Whether to validate results Returns: ExtractionResult with values and evidence """ if not self._current_parse_result: raise RuntimeError("No document loaded. 
class DocumentIntelligenceAdapter:
    """
    Adapter connecting DocumentAgent with document_intelligence subsystem.

    Provides:
    - Document loading and parsing
    - Schema-driven extraction
    - Evidence-grounded results
    - Tool execution
    """

    def __init__(
        self,
        config: Optional[AgentConfig] = None,
        llm_client: Optional[Any] = None,
    ):
        self.config = config or AgentConfig()
        self.llm_client = llm_client

        # Initialize components
        self.parser = DocumentParser(
            config=ParserConfig(
                render_dpi=self.config.render_dpi,
                max_pages=self.config.max_pages,
                ocr_languages=self.config.ocr_languages,
            )
        )
        self.extractor = FieldExtractor(
            config=ExtractionConfig(
                min_field_confidence=self.config.min_confidence,
                abstain_on_low_confidence=self.config.abstain_on_low_confidence,
            )
        )
        self.validator = ExtractionValidator(
            min_confidence=self.config.min_confidence,
        )
        self.evidence_builder = EvidenceBuilder()

        if self.config.enable_crops and self.config.crop_output_dir:
            self.crop_manager = CropManager(self.config.crop_output_dir)
        else:
            self.crop_manager = None

        # State
        self._current_parse_result: Optional[ParseResult] = None
        self._page_images: Dict[int, Any] = {}

        logger.info("Initialized DocumentIntelligenceAdapter")

    def load_document(
        self,
        path: Union[str, Path],
        render_pages: bool = True,
    ) -> ParseResult:
        """
        Load and parse a document.

        Args:
            path: Path to document file
            render_pages: Whether to keep rendered page images

        Returns:
            ParseResult with chunks and metadata
        """
        path = Path(path)
        logger.info(f"Loading document: {path}")

        # Parse document
        self._current_parse_result = self.parser.parse(path)

        # Optionally store page images
        if render_pages:
            from .io import load_document, RenderOptions

            loader, renderer = load_document(path)
            for page_num in range(1, self._current_parse_result.num_pages + 1):
                self._page_images[page_num] = renderer.render_page(
                    page_num, RenderOptions(dpi=self.config.render_dpi)
                )
            loader.close()

        return self._current_parse_result

    def extract_fields(
        self,
        schema: Union[ExtractionSchema, Dict[str, Any]],
        validate: bool = True,
    ) -> ExtractionResult:
        """
        Extract fields from the loaded document.

        Args:
            schema: Extraction schema
            validate: Whether to validate results

        Returns:
            ExtractionResult with values and evidence
        """
        if not self._current_parse_result:
            raise RuntimeError("No document loaded. Call load_document() first.")

        # Convert dict schema if needed
        if isinstance(schema, dict):
            schema = ExtractionSchema.from_json_schema(schema)

        # Extract
        result = self.extractor.extract(self._current_parse_result, schema)

        # Validate if requested
        if validate:
            validation = self.validator.validate(result, schema)
            if not validation.is_valid:
                logger.warning(
                    f"Extraction validation failed: {validation.error_count} errors"
                )
                # Add validation issues to result
                result.metadata = result.metadata or {}
                result.metadata["validation_issues"] = [
                    {"field": i.field_name, "type": i.issue_type, "message": i.message}
                    for i in validation.issues
                ]

        return result

    def answer_question(
        self,
        question: str,
        use_llm: bool = True,
    ) -> Tuple[str, List[EvidenceRef], float]:
        """
        Answer a question about the document.

        Args:
            question: Question to answer
            use_llm: Whether to use LLM for generation

        Returns:
            Tuple of (answer, evidence, confidence)
        """
        if not self._current_parse_result:
            raise RuntimeError("No document loaded")

        tool = get_tool("answer_question", llm_client=self.llm_client)
        result = tool.execute(
            parse_result=self._current_parse_result,
            question=question,
            use_rag=False,
        )

        if not result.success:
            return f"Error: {result.error}", [], 0.0

        data = result.data
        answer = data.get("answer", "")
        confidence = data.get("confidence", 0.5)

        # Convert evidence
        evidence = []
        for ev_dict in result.evidence:
            from .chunks.models import BoundingBox

            evidence.append(EvidenceRef(
                chunk_id=ev_dict["chunk_id"],
                doc_id=self._current_parse_result.doc_id,
                page=ev_dict["page"],
                bbox=BoundingBox(
                    x_min=ev_dict["bbox"][0],
                    y_min=ev_dict["bbox"][1],
                    x_max=ev_dict["bbox"][2],
                    y_max=ev_dict["bbox"][3],
                    normalized=True,
                ),
                source_type="text",
                snippet=ev_dict.get("snippet", ""),
                confidence=confidence,
            ))

        return answer, evidence, confidence

    def search_chunks(
        self,
        query: str,
        chunk_types: Optional[List[str]] = None,
        top_k: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        Search for chunks matching a query.

        Args:
            query: Search query
            chunk_types: Optional chunk type filter
            top_k: Maximum results

        Returns:
            List of matching chunks with scores
        """
        if not self._current_parse_result:
            raise RuntimeError("No document loaded")

        tool = get_tool("search_chunks")
        result = tool.execute(
            parse_result=self._current_parse_result,
            query=query,
            chunk_types=chunk_types,
            top_k=top_k,
        )

        if not result.success:
            return []

        return result.data.get("results", [])

    def get_chunk(self, chunk_id: str) -> Optional[DocumentChunk]:
        """Get a chunk by ID."""
        if not self._current_parse_result:
            return None
        for chunk in self._current_parse_result.chunks:
            if chunk.chunk_id == chunk_id:
                return chunk
        return None

    def get_page_image(self, page: int) -> Optional[Any]:
        """Get rendered page image."""
        return self._page_images.get(page)

    def crop_chunk(
        self,
        chunk: DocumentChunk,
        padding_percent: float = 0.02,
    ) -> Optional[Any]:
        """Crop the region of a chunk from its page."""
        page_image = self.get_page_image(chunk.page)
        if page_image is None:
            return None

        from .grounding import crop_region
        return crop_region(page_image, chunk.bbox, padding_percent)

    def get_tools_description(self) -> str:
        """Get description of available tools for agent prompts."""
        tools = list_tools()
        lines = []
        for tool in tools:
            lines.append(f"- {tool['name']}: {tool['description']}")
        return "\n".join(lines)

    def execute_tool(
        self,
        tool_name: str,
        **kwargs
    ) -> ToolResult:
        """
        Execute a document tool.

        Args:
            tool_name: Name of tool to execute
            **kwargs: Tool arguments

        Returns:
            ToolResult
        """
        # Add current parse result if not provided
        if "parse_result" not in kwargs and self._current_parse_result:
            kwargs["parse_result"] = self._current_parse_result

        tool = get_tool(tool_name, llm_client=self.llm_client)
        return tool.execute(**kwargs)

    @property
    def parse_result(self) -> Optional[ParseResult]:
        """Get current parse result."""
        return self._current_parse_result

    @property
    def document_id(self) -> Optional[str]:
        """Get current document ID."""
        if self._current_parse_result:
            return self._current_parse_result.doc_id
        return None
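
# Example (sketch, commented out so importing this module has no side
# effects): a typical synchronous flow through the adapter. The schema dict
# is hypothetical -- the exact JSON-schema layout accepted by
# ExtractionSchema.from_json_schema depends on that class -- and
# "invoice.pdf" is a placeholder path.
#
#   adapter = DocumentIntelligenceAdapter()
#   adapter.load_document("invoice.pdf")
#   result = adapter.extract_fields({
#       "type": "object",
#       "properties": {
#           "invoice_number": {"type": "string"},
#           "total_amount": {"type": "number"},
#       },
#   })
#   answer, evidence, confidence = adapter.answer_question("Who issued this?")
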
def create_enhanced_document_agent(
    llm_client: Any,
    config: Optional[AgentConfig] = None,
) -> "EnhancedDocumentAgent":
    """
    Create an enhanced DocumentAgent with document_intelligence integration.

    Args:
        llm_client: LLM client for reasoning
        config: Agent configuration

    Returns:
        EnhancedDocumentAgent instance
    """
    return EnhancedDocumentAgent(llm_client=llm_client, config=config)


class EnhancedDocumentAgent:
    """
    Enhanced DocumentAgent using document_intelligence subsystem.

    Extends the ReAct-style agent with:
    - Better parsing and chunking
    - Schema-driven extraction
    - Visual grounding
    - Evidence tracking
    """

    def __init__(
        self,
        llm_client: Any,
        config: Optional[AgentConfig] = None,
    ):
        self.adapter = DocumentIntelligenceAdapter(
            config=config,
            llm_client=llm_client,
        )
        self.llm_client = llm_client
        self.config = config or AgentConfig()

    async def load_document(self, path: Union[str, Path]) -> ParseResult:
        """Load a document for processing."""
        return self.adapter.load_document(path, render_pages=True)

    async def extract_fields(
        self,
        schema: Union[ExtractionSchema, Dict],
    ) -> ExtractionResult:
        """Extract fields using schema."""
        return self.adapter.extract_fields(schema, validate=True)

    async def answer_question(
        self,
        question: str,
    ) -> Tuple[str, List[EvidenceRef]]:
        """Answer a question about the document."""
        answer, evidence, _confidence = self.adapter.answer_question(question)
        return answer, evidence

    async def classify(self) -> ClassificationResult:
        """Classify the document type."""
        if not self.adapter.parse_result:
            raise RuntimeError("No document loaded")

        # Get first page content
        first_page_chunks = [
            c for c in self.adapter.parse_result.chunks if c.page == 1
        ][:5]
        content = " ".join(c.text[:200] for c in first_page_chunks)

        # Simple keyword-based classification
        doc_type = DocumentType.OTHER
        confidence = 0.5

        type_keywords = {
            DocumentType.INVOICE: ["invoice", "bill", "payment due", "amount due"],
            DocumentType.CONTRACT: ["agreement", "contract", "party", "whereas"],
            DocumentType.RECEIPT: ["receipt", "paid", "transaction", "thank you"],
            DocumentType.FORM: ["form", "fill in", "checkbox", "signature line"],
            DocumentType.LETTER: ["dear", "sincerely", "regards"],
            DocumentType.REPORT: ["report", "findings", "conclusion", "summary"],
            DocumentType.PATENT: ["patent", "claims", "invention", "embodiment"],
        }

        content_lower = content.lower()
        for dtype, keywords in type_keywords.items():
            matches = sum(1 for k in keywords if k in content_lower)
            if matches > 0:
                doc_type = dtype
                # Each keyword hit adds 0.15, capped at 0.9
                # (e.g. two hits -> 0.8).
                confidence = min(0.9, 0.5 + matches * 0.15)
                break

        return ClassificationResult(
            doc_id=self.adapter.document_id,
            document_type=doc_type,
            confidence=confidence,
            secondary_types=[],
        )

    def search(
        self,
        query: str,
        top_k: int = 10,
    ) -> List[Dict[str, Any]]:
        """Search document content."""
        return self.adapter.search_chunks(query, top_k=top_k)

    @property
    def current_document(self) -> Optional[ParseResult]:
        """Get current document."""
        return self.adapter.parse_result
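

# Minimal smoke-test sketch. Assumptions: "sample.pdf" is a placeholder
# path, and llm_client=None only exercises the non-LLM paths (the
# keyword-based classify() and search()); answer_question() would need a
# real client.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        agent = EnhancedDocumentAgent(llm_client=None)
        await agent.load_document("sample.pdf")  # placeholder path
        classification = await agent.classify()
        print(f"type={classification.document_type} "
              f"confidence={classification.confidence:.2f}")
        for hit in agent.search("total", top_k=3):
            print(hit)

    asyncio.run(_demo())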