"""
Agent Adapter for Document Intelligence

Bridges the DocumentAgent with the new document_intelligence subsystem.
Provides enhanced tools and capabilities.
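
Example (a minimal sketch; the file path and the schema shape passed to
extract_fields are illustrative):

    adapter = DocumentIntelligenceAdapter()
    adapter.load_document("invoice.pdf")
    result = adapter.extract_fields({"properties": {"total": {"type": "number"}}})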
"""

import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

from .chunks.models import (
    BoundingBox,
    ClassificationResult,
    DocumentChunk,
    DocumentType,
    EvidenceRef,
    ExtractionResult,
    ParseResult,
)
from .extraction import (
    ExtractionConfig,
    ExtractionSchema,
    ExtractionValidator,
    FieldExtractor,
)
from .grounding import CropManager, EvidenceBuilder, crop_region
from .parsing import DocumentParser, ParserConfig
from .tools import ToolResult, get_tool, list_tools

logger = logging.getLogger(__name__)


@dataclass
class AgentConfig:
    """Configuration for the document agent adapter."""

    # Parsing
    render_dpi: int = 200
    max_pages: Optional[int] = None
    ocr_languages: Optional[List[str]] = None

    # Extraction
    min_confidence: float = 0.5
    abstain_on_low_confidence: bool = True

    # Grounding
    enable_crops: bool = True
    crop_output_dir: Optional[Path] = None

    # Agent loop
    max_iterations: int = 10
    verbose: bool = False

    def __post_init__(self):
        # Dataclass fields cannot take a mutable default, so the OCR
        # language list defaults to English here instead.
        if self.ocr_languages is None:
            self.ocr_languages = ["en"]


class DocumentIntelligenceAdapter:
    """
    Adapter connecting DocumentAgent with the document_intelligence subsystem.

    Provides:
    - Document loading and parsing
    - Schema-driven extraction
    - Evidence-grounded results
    - Tool execution
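
    Example (a sketch; the file path and question are illustrative):

        adapter = DocumentIntelligenceAdapter(config=AgentConfig(render_dpi=150))
        adapter.load_document("contract.pdf")
        answer, evidence, confidence = adapter.answer_question(
            "Who are the parties?"
        )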
    """

    def __init__(
        self,
        config: Optional[AgentConfig] = None,
        llm_client: Optional[Any] = None,
    ):
        self.config = config or AgentConfig()
        self.llm_client = llm_client

        self.parser = DocumentParser(
            config=ParserConfig(
                render_dpi=self.config.render_dpi,
                max_pages=self.config.max_pages,
                ocr_languages=self.config.ocr_languages,
            )
        )

        self.extractor = FieldExtractor(
            config=ExtractionConfig(
                min_field_confidence=self.config.min_confidence,
                abstain_on_low_confidence=self.config.abstain_on_low_confidence,
            )
        )

        self.validator = ExtractionValidator(
            min_confidence=self.config.min_confidence,
        )

        self.evidence_builder = EvidenceBuilder()

        # Crops require an output directory; without one they are disabled.
        if self.config.enable_crops and self.config.crop_output_dir:
            self.crop_manager = CropManager(self.config.crop_output_dir)
        else:
            self.crop_manager = None

        # Per-document state, populated by load_document().
        self._current_parse_result: Optional[ParseResult] = None
        self._page_images: Dict[int, Any] = {}

        logger.info("Initialized DocumentIntelligenceAdapter")

    def load_document(
        self,
        path: Union[str, Path],
        render_pages: bool = True,
    ) -> ParseResult:
        """
        Load and parse a document.

        Args:
            path: Path to the document file
            render_pages: Whether to keep rendered page images

        Returns:
            ParseResult with chunks and metadata
        """
        path = Path(path)
        logger.info(f"Loading document: {path}")

        # Reset per-document state so a previous document's pages never
        # leak into this one.
        self._page_images = {}
        self._current_parse_result = self.parser.parse(path)

        if render_pages:
            # Imported lazily to keep rendering dependencies optional.
            from .io import load_document, RenderOptions

            loader, renderer = load_document(path)
            try:
                for page_num in range(1, self._current_parse_result.num_pages + 1):
                    self._page_images[page_num] = renderer.render_page(
                        page_num,
                        RenderOptions(dpi=self.config.render_dpi),
                    )
            finally:
                # Close the loader even if rendering a page fails.
                loader.close()

        return self._current_parse_result

    def extract_fields(
        self,
        schema: Union[ExtractionSchema, Dict[str, Any]],
        validate: bool = True,
    ) -> ExtractionResult:
        """
        Extract fields from the loaded document.

        Args:
            schema: Extraction schema
            validate: Whether to validate results

        Returns:
            ExtractionResult with values and evidence
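
        Example (a sketch; dicts are converted via
        ExtractionSchema.from_json_schema, and the exact shape is assumed):

            result = adapter.extract_fields({
                "properties": {
                    "invoice_number": {"type": "string"},
                    "total_amount": {"type": "number"},
                },
            })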
        """
        if not self._current_parse_result:
            raise RuntimeError("No document loaded. Call load_document() first.")

        if isinstance(schema, dict):
            schema = ExtractionSchema.from_json_schema(schema)

        result = self.extractor.extract(self._current_parse_result, schema)

        if validate:
            validation = self.validator.validate(result, schema)
            if not validation.is_valid:
                logger.warning(
                    f"Extraction validation failed: {validation.error_count} errors"
                )

            result.metadata = result.metadata or {}
            result.metadata["validation_issues"] = [
                {"field": i.field_name, "type": i.issue_type, "message": i.message}
                for i in validation.issues
            ]

        return result

    def answer_question(
        self,
        question: str,
        use_llm: bool = True,
    ) -> Tuple[str, List[EvidenceRef], float]:
        """
        Answer a question about the document.

        Args:
            question: Question to answer
            use_llm: Whether to use LLM for generation

        Returns:
            Tuple of (answer, evidence, confidence)
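
        Example:

            answer, evidence, confidence = adapter.answer_question(
                "What is the total amount due?"
            )
            for ref in evidence:
                print(ref.page, ref.snippet)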
        """
        if not self._current_parse_result:
            raise RuntimeError("No document loaded")

        # Honor use_llm by withholding the client when it is disabled; the
        # client is Optional throughout this adapter.
        llm_client = self.llm_client if use_llm else None
        tool = get_tool("answer_question", llm_client=llm_client)
        result = tool.execute(
            parse_result=self._current_parse_result,
            question=question,
            use_rag=False,
        )

        if not result.success:
            return f"Error: {result.error}", [], 0.0

        data = result.data
        answer = data.get("answer", "")
        confidence = data.get("confidence", 0.5)

        # Convert the tool's evidence dicts into typed EvidenceRef objects.
        evidence = []
        for ev_dict in result.evidence:
            evidence.append(EvidenceRef(
                chunk_id=ev_dict["chunk_id"],
                doc_id=self._current_parse_result.doc_id,
                page=ev_dict["page"],
                bbox=BoundingBox(
                    x_min=ev_dict["bbox"][0],
                    y_min=ev_dict["bbox"][1],
                    x_max=ev_dict["bbox"][2],
                    y_max=ev_dict["bbox"][3],
                    normalized=True,
                ),
                source_type="text",
                snippet=ev_dict.get("snippet", ""),
                confidence=confidence,
            ))

        return answer, evidence, confidence

    def search_chunks(
        self,
        query: str,
        chunk_types: Optional[List[str]] = None,
        top_k: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        Search for chunks matching a query.

        Args:
            query: Search query
            chunk_types: Optional chunk type filter
            top_k: Maximum results

        Returns:
            List of matching chunks with scores
        """
        if not self._current_parse_result:
            raise RuntimeError("No document loaded")

        tool = get_tool("search_chunks")
        result = tool.execute(
            parse_result=self._current_parse_result,
            query=query,
            chunk_types=chunk_types,
            top_k=top_k,
        )

        if not result.success:
            return []

        return result.data.get("results", [])

    def get_chunk(self, chunk_id: str) -> Optional[DocumentChunk]:
        """Get a chunk by ID."""
        if not self._current_parse_result:
            return None

        for chunk in self._current_parse_result.chunks:
            if chunk.chunk_id == chunk_id:
                return chunk
        return None

    def get_page_image(self, page: int) -> Optional[Any]:
        """Get rendered page image."""
        return self._page_images.get(page)

    def crop_chunk(
        self,
        chunk: DocumentChunk,
        padding_percent: float = 0.02,
    ) -> Optional[Any]:
        """Crop the region of a chunk from its page."""
        page_image = self.get_page_image(chunk.page)
        if page_image is None:
            return None

        return crop_region(page_image, chunk.bbox, padding_percent)

    def get_tools_description(self) -> str:
        """Get description of available tools for agent prompts."""
        tools = list_tools()
        lines = []
        for tool in tools:
            lines.append(f"- {tool['name']}: {tool['description']}")
        return "\n".join(lines)

    def execute_tool(
        self,
        tool_name: str,
        **kwargs
    ) -> ToolResult:
        """
        Execute a document tool.

        Args:
            tool_name: Name of tool to execute
            **kwargs: Tool arguments

        Returns:
            ToolResult
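
        Example (a sketch; "search_chunks" is a tool name used elsewhere
        in this module, and the query is illustrative):

            result = adapter.execute_tool("search_chunks", query="payment terms")
            if result.success:
                print(result.data)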
        """
        # Inject the current parse result unless the caller supplied one.
        if "parse_result" not in kwargs and self._current_parse_result:
            kwargs["parse_result"] = self._current_parse_result

        tool = get_tool(tool_name, llm_client=self.llm_client)
        return tool.execute(**kwargs)

    @property
    def parse_result(self) -> Optional[ParseResult]:
        """Get current parse result."""
        return self._current_parse_result

    @property
    def document_id(self) -> Optional[str]:
        """Get current document ID."""
        if self._current_parse_result:
            return self._current_parse_result.doc_id
        return None


def create_enhanced_document_agent(
    llm_client: Any,
    config: Optional[AgentConfig] = None,
) -> "EnhancedDocumentAgent":
    """
    Create an enhanced DocumentAgent with document_intelligence integration.

    Args:
        llm_client: LLM client for reasoning
        config: Agent configuration

    Returns:
        EnhancedDocumentAgent instance
    """
    return EnhancedDocumentAgent(llm_client=llm_client, config=config)


class EnhancedDocumentAgent:
    """
    Enhanced DocumentAgent using the document_intelligence subsystem.

    Extends the ReAct-style agent with:
    - Better parsing and chunking
    - Schema-driven extraction
    - Visual grounding
    - Evidence tracking
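
    Example (a sketch; assumes an async context and a compatible llm_client):

        agent = EnhancedDocumentAgent(llm_client=client)
        await agent.load_document("report.pdf")
        answer, evidence = await agent.answer_question("What are the findings?")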
    """

    def __init__(
        self,
        llm_client: Any,
        config: Optional[AgentConfig] = None,
    ):
        self.adapter = DocumentIntelligenceAdapter(
            config=config,
            llm_client=llm_client,
        )
        self.llm_client = llm_client
        self.config = config or AgentConfig()

    async def load_document(self, path: Union[str, Path]) -> ParseResult:
        """Load a document for processing."""
        return self.adapter.load_document(path, render_pages=True)

    async def extract_fields(
        self,
        schema: Union[ExtractionSchema, Dict],
    ) -> ExtractionResult:
        """Extract fields using a schema."""
        return self.adapter.extract_fields(schema, validate=True)

    async def answer_question(
        self,
        question: str,
    ) -> Tuple[str, List[EvidenceRef]]:
        """Answer a question about the document."""
        answer, evidence, _ = self.adapter.answer_question(question)
        return answer, evidence

    async def classify(self) -> ClassificationResult:
        """Classify the document type using simple keyword heuristics."""
        if not self.adapter.parse_result:
            raise RuntimeError("No document loaded")

        # Use the first few chunks of page 1 as a cheap classification signal.
        first_page_chunks = [
            c for c in self.adapter.parse_result.chunks
            if c.page == 1
        ][:5]

        # Guard against chunks without text (e.g. figures).
        content = " ".join((c.text or "")[:200] for c in first_page_chunks)

        doc_type = DocumentType.OTHER
        confidence = 0.5

        type_keywords = {
            DocumentType.INVOICE: ["invoice", "bill", "payment due", "amount due"],
            DocumentType.CONTRACT: ["agreement", "contract", "party", "whereas"],
            DocumentType.RECEIPT: ["receipt", "paid", "transaction", "thank you"],
            DocumentType.FORM: ["form", "fill in", "checkbox", "signature line"],
            DocumentType.LETTER: ["dear", "sincerely", "regards"],
            DocumentType.REPORT: ["report", "findings", "conclusion", "summary"],
            DocumentType.PATENT: ["patent", "claims", "invention", "embodiment"],
        }

        # Score every type and keep the best match, so dictionary order
        # does not bias the result.
        content_lower = content.lower()
        best_matches = 0
        for dtype, keywords in type_keywords.items():
            matches = sum(1 for k in keywords if k in content_lower)
            if matches > best_matches:
                best_matches = matches
                doc_type = dtype
                confidence = min(0.9, 0.5 + matches * 0.15)

        return ClassificationResult(
            doc_id=self.adapter.document_id,
            document_type=doc_type,
            confidence=confidence,
            secondary_types=[],
        )

    def search(
        self,
        query: str,
        top_k: int = 10,
    ) -> List[Dict[str, Any]]:
        """Search document content."""
        return self.adapter.search_chunks(query, top_k=top_k)

    @property
    def current_document(self) -> Optional[ParseResult]:
        """Get current document."""
        return self.adapter.parse_result
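

if __name__ == "__main__":
    # Minimal smoke-test sketch: "sample.pdf" is a placeholder path, and
    # llm_client=None assumes the tool layer tolerates a missing client
    # (it is Optional throughout this adapter); substitute a real client
    # for LLM-backed answers.
    import asyncio

    async def _demo() -> None:
        agent = create_enhanced_document_agent(llm_client=None)
        parsed = await agent.load_document("sample.pdf")
        print(f"Parsed {parsed.num_pages} pages into {len(parsed.chunks)} chunks")
        classification = await agent.classify()
        print(
            f"Type: {classification.document_type} "
            f"(confidence {classification.confidence:.2f})"
        )

    asyncio.run(_demo())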