"""Parse and merge structured OCR JSON from MiniCPM-V.""" from __future__ import annotations import json import re from typing import Any, Dict, List, Optional, Tuple # Reject placeholder keys the model sometimes copies from schema examples. _GENERIC_KEY_PATTERN = re.compile( r"^(label|value|field\d*|column\d*|cell\d*|key|example|sample|placeholder|" r"header\d*|row\d*|item\d*|data\d*|text\d*|name\d*)$", re.IGNORECASE, ) _GENERIC_SECTION_TITLES = { "details", "section name", "table section name", "account information", "balance summary", "line items", "transactions", "key value", "key_value", } def _strip_json_fence(text: str) -> str: cleaned = text.strip() cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned, flags=re.IGNORECASE) cleaned = re.sub(r"\s*```$", "", cleaned) return cleaned.strip() def _is_generic_key(key: str) -> bool: stripped = key.strip() if not stripped: return True return bool(_GENERIC_KEY_PATTERN.match(stripped)) def _normalize_section_title(title: str, fallback: str = "Extracted fields") -> str: cleaned = title.strip() if not cleaned or cleaned.lower() in _GENERIC_SECTION_TITLES: return fallback return cleaned def _coerce_fields(section: Dict[str, Any]) -> Dict[str, str]: """Accept fields dict or list-of-pairs formats from the model.""" fields: Dict[str, str] = {} raw_fields = section.get("fields") if isinstance(raw_fields, dict): for key, value in raw_fields.items(): key_str = str(key).strip() if not key_str or value is None or _is_generic_key(key_str): continue value_str = str(value).strip() if value_str: fields[key_str] = value_str for list_key in ("pairs", "key_values", "key_value_pairs", "items"): raw_list = section.get(list_key) if not isinstance(raw_list, list): continue for item in raw_list: if not isinstance(item, dict): continue label = ( item.get("key") or item.get("label") or item.get("name") or item.get("field") ) value = item.get("value") or item.get("text") or item.get("content") if label is None or value is None: continue label_str = str(label).strip() value_str = str(value).strip() if label_str and value_str and not _is_generic_key(label_str): fields[label_str] = value_str return fields def _coerce_table(section: Dict[str, Any]) -> Tuple[List[str], List[List[str]]]: headers = [str(h).strip() for h in (section.get("headers") or []) if str(h).strip()] headers = [h for h in headers if not _is_generic_key(h)] rows: List[List[str]] = [] for row in section.get("rows") or []: if not isinstance(row, list): continue cells = [str(cell).strip() for cell in row] if any(cells): rows.append(cells) # Some models return columns as objects instead of headers+rows. columns = section.get("columns") if isinstance(columns, list) and columns and not rows: col_headers = [] col_values: List[List[str]] = [] for col in columns: if not isinstance(col, dict): continue header = str(col.get("header") or col.get("name") or "").strip() values = col.get("values") or col.get("cells") or [] if header and not _is_generic_key(header): col_headers.append(header) col_values.append([str(v).strip() for v in values if v is not None]) if col_headers and col_values: max_len = max(len(values) for values in col_values) headers = col_headers rows = [] for idx in range(max_len): rows.append([values[idx] if idx < len(values) else "" for values in col_values]) return headers, rows def _normalize_sections(sections: Any) -> List[Dict[str, Any]]: if not isinstance(sections, list): return [] normalized: List[Dict[str, Any]] = [] kv_fallback_idx = 1 for section in sections: if not isinstance(section, dict): continue section_type = str(section.get("type") or "key_value").lower() title = _normalize_section_title( str(section.get("title") or ""), fallback=f"Extracted fields {kv_fallback_idx}", ) if section_type == "table": headers, rows = _coerce_table(section) if headers or rows: normalized.append( { "title": title, "type": "table", "headers": headers, "rows": rows, } ) continue fields = _coerce_fields(section) if fields: if title.startswith("Extracted fields"): kv_fallback_idx += 1 normalized.append( { "title": title, "type": "key_value", "fields": fields, } ) return normalized def parse_structured_page(raw: str, page_number: int = 1) -> Dict[str, Any]: """Parse model JSON for one page; return a safe default on failure.""" fallback = { "page_number": page_number, "document_type": "other", "document_title": "", "sections": [], "parse_error": True, "raw_text": raw.strip(), } if not raw or not raw.strip(): return fallback try: data = json.loads(_strip_json_fence(raw)) except json.JSONDecodeError: match = re.search(r"\{[\s\S]*\}", raw) if not match: return fallback try: data = json.loads(match.group(0)) except json.JSONDecodeError: return fallback sections = _normalize_sections(data.get("sections")) meta_keys = { "document_type", "document_title", "sections", "pages", "fields", "pairs", "key_values", "key_value_pairs", "items", "columns", "headers", "rows", "type", "title", } flat_fields: Dict[str, str] = {} for key, value in data.items(): if key in meta_keys or value is None: continue if isinstance(value, (str, int, float)): key_str = str(key).strip() value_str = str(value).strip() if key_str and value_str and not _is_generic_key(key_str): flat_fields[key_str] = value_str top_fields = _coerce_fields(data) flat_fields.update(top_fields) if flat_fields and not sections: sections = [ { "title": _normalize_section_title( str(data.get("document_title") or "Document header"), fallback="Document header", ), "type": "key_value", "fields": flat_fields, } ] return { "page_number": page_number, "document_type": str(data.get("document_type") or "other"), "document_title": str(data.get("document_title") or "").strip(), "sections": sections, } def merge_structured_pages( pages: List[Dict[str, Any]], filename: Optional[str] = None, ) -> Dict[str, Any]: doc_type = next( (p["document_type"] for p in pages if p.get("document_type") and p["document_type"] != "other"), pages[0]["document_type"] if pages else "other", ) document_title = next( (p["document_title"] for p in pages if p.get("document_title")), "", ) return { "filename": filename, "document_type": doc_type, "document_title": document_title, "page_count": len(pages), "pages": pages, } def structured_to_plain_text(structured: Dict[str, Any]) -> str: """Flatten structured OCR for copy/search fallback.""" lines: List[str] = [] doc_type = structured.get("document_type", "other") doc_title = structured.get("document_title", "") if doc_title: lines.append(doc_title) lines.append(f"Document type: {doc_type}") for page in structured.get("pages") or []: page_num = page.get("page_number", 1) if structured.get("page_count", 1) > 1: lines.append(f"\n--- Page {page_num} ---") page_title = page.get("document_title") if page_title and page_title != doc_title: lines.append(page_title) for section in page.get("sections") or []: title = section.get("title", "Details") lines.append(f"\n## {title}") if section.get("type") == "table": headers = section.get("headers") or [] rows = section.get("rows") or [] if headers: lines.append(" | ".join(headers)) lines.append(" | ".join(["---"] * len(headers))) for row in rows: lines.append(" | ".join(row)) else: for key, value in (section.get("fields") or {}).items(): lines.append(f"{key}: {value}") if page.get("parse_error") and page.get("raw_text"): lines.append("\nRaw extraction:") lines.append(page["raw_text"]) return "\n".join(lines).strip()