| """Parse and merge structured OCR JSON from MiniCPM-V.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import re |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| |
| _GENERIC_KEY_PATTERN = re.compile( |
| r"^(label|value|field\d*|column\d*|cell\d*|key|example|sample|placeholder|" |
| r"header\d*|row\d*|item\d*|data\d*|text\d*|name\d*)$", |
| re.IGNORECASE, |
| ) |
|
|
| _GENERIC_SECTION_TITLES = { |
| "details", |
| "section name", |
| "table section name", |
| "account information", |
| "balance summary", |
| "line items", |
| "transactions", |
| "key value", |
| "key_value", |
| } |
|
|
|
|
| def _strip_json_fence(text: str) -> str: |
| cleaned = text.strip() |
| cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned, flags=re.IGNORECASE) |
| cleaned = re.sub(r"\s*```$", "", cleaned) |
| return cleaned.strip() |
|
|
|
|
| def _is_generic_key(key: str) -> bool: |
| stripped = key.strip() |
| if not stripped: |
| return True |
| return bool(_GENERIC_KEY_PATTERN.match(stripped)) |
|
|
|
|
| def _normalize_section_title(title: str, fallback: str = "Extracted fields") -> str: |
| cleaned = title.strip() |
| if not cleaned or cleaned.lower() in _GENERIC_SECTION_TITLES: |
| return fallback |
| return cleaned |
|
|
|
|
| def _coerce_fields(section: Dict[str, Any]) -> Dict[str, str]: |
| """Accept fields dict or list-of-pairs formats from the model.""" |
| fields: Dict[str, str] = {} |
|
|
| raw_fields = section.get("fields") |
| if isinstance(raw_fields, dict): |
| for key, value in raw_fields.items(): |
| key_str = str(key).strip() |
| if not key_str or value is None or _is_generic_key(key_str): |
| continue |
| value_str = str(value).strip() |
| if value_str: |
| fields[key_str] = value_str |
|
|
| for list_key in ("pairs", "key_values", "key_value_pairs", "items"): |
| raw_list = section.get(list_key) |
| if not isinstance(raw_list, list): |
| continue |
| for item in raw_list: |
| if not isinstance(item, dict): |
| continue |
| label = ( |
| item.get("key") |
| or item.get("label") |
| or item.get("name") |
| or item.get("field") |
| ) |
| value = item.get("value") or item.get("text") or item.get("content") |
| if label is None or value is None: |
| continue |
| label_str = str(label).strip() |
| value_str = str(value).strip() |
| if label_str and value_str and not _is_generic_key(label_str): |
| fields[label_str] = value_str |
|
|
| return fields |
|
|
|
|
| def _coerce_table(section: Dict[str, Any]) -> Tuple[List[str], List[List[str]]]: |
| headers = [str(h).strip() for h in (section.get("headers") or []) if str(h).strip()] |
| headers = [h for h in headers if not _is_generic_key(h)] |
|
|
| rows: List[List[str]] = [] |
| for row in section.get("rows") or []: |
| if not isinstance(row, list): |
| continue |
| cells = [str(cell).strip() for cell in row] |
| if any(cells): |
| rows.append(cells) |
|
|
| |
| columns = section.get("columns") |
| if isinstance(columns, list) and columns and not rows: |
| col_headers = [] |
| col_values: List[List[str]] = [] |
| for col in columns: |
| if not isinstance(col, dict): |
| continue |
| header = str(col.get("header") or col.get("name") or "").strip() |
| values = col.get("values") or col.get("cells") or [] |
| if header and not _is_generic_key(header): |
| col_headers.append(header) |
| col_values.append([str(v).strip() for v in values if v is not None]) |
| if col_headers and col_values: |
| max_len = max(len(values) for values in col_values) |
| headers = col_headers |
| rows = [] |
| for idx in range(max_len): |
| rows.append([values[idx] if idx < len(values) else "" for values in col_values]) |
|
|
| return headers, rows |
|
|
|
|
| def _normalize_sections(sections: Any) -> List[Dict[str, Any]]: |
| if not isinstance(sections, list): |
| return [] |
|
|
| normalized: List[Dict[str, Any]] = [] |
| kv_fallback_idx = 1 |
|
|
| for section in sections: |
| if not isinstance(section, dict): |
| continue |
|
|
| section_type = str(section.get("type") or "key_value").lower() |
| title = _normalize_section_title( |
| str(section.get("title") or ""), |
| fallback=f"Extracted fields {kv_fallback_idx}", |
| ) |
|
|
| if section_type == "table": |
| headers, rows = _coerce_table(section) |
| if headers or rows: |
| normalized.append( |
| { |
| "title": title, |
| "type": "table", |
| "headers": headers, |
| "rows": rows, |
| } |
| ) |
| continue |
|
|
| fields = _coerce_fields(section) |
| if fields: |
| if title.startswith("Extracted fields"): |
| kv_fallback_idx += 1 |
| normalized.append( |
| { |
| "title": title, |
| "type": "key_value", |
| "fields": fields, |
| } |
| ) |
|
|
| return normalized |
|
|
|
|
def parse_structured_page(raw: str, page_number: int = 1) -> Dict[str, Any]:
    """Parse model JSON for one page; return a safe default on failure.

    The text is parsed as JSON after removing an optional Markdown code
    fence. If that fails, the span from the first ``{`` to the last ``}`` is
    tried instead. Sections come from ``_normalize_sections``. When there are
    none, top-level scalar entries and any top-level field containers are
    gathered into a single key/value section titled after the document title
    (or ``"Document header"``).

    Args:
        raw: Raw model output. ``None`` and blank text are tolerated and
            produce the fallback result.
        page_number: Page number recorded in the result.

    Returns:
        A dict with ``page_number``, ``document_type``, ``document_title`` and
        ``sections``. When nothing usable can be parsed (blank input, invalid
        JSON, or JSON whose top level is not an object) ``sections`` is empty
        and the dict also carries ``parse_error=True`` and the stripped
        ``raw_text``.
    """
    # Strip defensively before building the fallback, so a None input reaches
    # the guard below instead of raising on ``None.strip()``.
    raw_text = (raw or "").strip()
    fallback = {
        "page_number": page_number,
        "document_type": "other",
        "document_title": "",
        "sections": [],
        "parse_error": True,
        "raw_text": raw_text,
    }
    if not raw_text:
        return fallback

    try:
        data = json.loads(_strip_json_fence(raw))
    except json.JSONDecodeError:
        # The model may wrap the JSON in prose; retry on the outermost braces.
        match = re.search(r"\{[\s\S]*\}", raw)
        if not match:
            return fallback
        try:
            data = json.loads(match.group(0))
        except json.JSONDecodeError:
            return fallback

    # Valid JSON is not necessarily an object (it can be a list, string or
    # number); everything below needs a dict.
    if not isinstance(data, dict):
        return fallback

    sections = _normalize_sections(data.get("sections"))

    # Structural keys that must not be mistaken for extracted fields.
    meta_keys = {
        "document_type",
        "document_title",
        "sections",
        "pages",
        "fields",
        "pairs",
        "key_values",
        "key_value_pairs",
        "items",
        "columns",
        "headers",
        "rows",
        "type",
        "title",
    }
    flat_fields: Dict[str, str] = {}
    for key, value in data.items():
        if key in meta_keys or value is None:
            continue
        if isinstance(value, (str, int, float)):
            key_str = str(key).strip()
            value_str = str(value).strip()
            if key_str and value_str and not _is_generic_key(key_str):
                flat_fields[key_str] = value_str

    # Field containers placed at the top level override same-named scalars.
    flat_fields.update(_coerce_fields(data))

    if flat_fields and not sections:
        sections = [
            {
                "title": _normalize_section_title(
                    str(data.get("document_title") or "Document header"),
                    fallback="Document header",
                ),
                "type": "key_value",
                "fields": flat_fields,
            }
        ]

    return {
        "page_number": page_number,
        # A whitespace-only type is treated like a missing one.
        "document_type": str(data.get("document_type") or "").strip() or "other",
        "document_title": str(data.get("document_title") or "").strip(),
        "sections": sections,
    }
|
|
|
|
def merge_structured_pages(
    pages: List[Dict[str, Any]],
    filename: Optional[str] = None,
) -> Dict[str, Any]:
    """Combine per-page results into one document-level dict.

    Args:
        pages: Page dicts, in page order. They are stored as given, not
            copied.
        filename: Optional source file name recorded in the result.

    Returns:
        A dict with ``filename``, ``document_type`` (the first page type that
        is set and not ``"other"``, else ``"other"``), ``document_title`` (the
        first non-empty page title, else ``""``), ``page_count`` and
        ``pages``.
    """
    specific_types = (
        page["document_type"]
        for page in pages
        if page.get("document_type") and page["document_type"] != "other"
    )
    # When no page has a specific type, every page type is missing, empty or
    # "other", so the answer is "other". Indexing pages[0] for the default
    # would be evaluated eagerly and raise KeyError on a page without the key.
    doc_type = next(specific_types, "other")
    document_title = next(
        (page["document_title"] for page in pages if page.get("document_title")),
        "",
    )
    return {
        "filename": filename,
        "document_type": doc_type,
        "document_title": document_title,
        "page_count": len(pages),
        "pages": pages,
    }
|
|
|
|
def structured_to_plain_text(structured: Dict[str, Any]) -> str:
    """Flatten structured OCR for copy/search fallback.

    Emits the document title (when set) and a ``Document type:`` line, then
    for every page: a page marker when ``page_count`` exceeds 1, the page
    title when it differs from the document title, each section under a
    ``##`` heading (tables as ``|``-separated lines, other sections as
    ``key: value`` lines), and the raw text of pages flagged with
    ``parse_error``.

    Args:
        structured: Merged document dict holding ``pages`` and document-level
            metadata.

    Returns:
        The newline-joined text, stripped of surrounding whitespace.
    """
    out: List[str] = []
    overall_title = structured.get("document_title", "")
    doc_type = structured.get("document_type", "other")
    if overall_title:
        out.append(overall_title)
    out.append(f"Document type: {doc_type}")

    for page in structured.get("pages") or []:
        page_num = page.get("page_number", 1)
        if structured.get("page_count", 1) > 1:
            out.append(f"\n--- Page {page_num} ---")

        heading = page.get("document_title")
        if heading and heading != overall_title:
            out.append(heading)

        for section in page.get("sections") or []:
            title = section.get("title", "Details")
            out.append(f"\n## {title}")

            if section.get("type") != "table":
                # Key/value section: one "label: value" line per field.
                out.extend(
                    f"{key}: {value}"
                    for key, value in (section.get("fields") or {}).items()
                )
                continue

            headers = section.get("headers") or []
            rows = section.get("rows") or []
            if headers:
                out.append(" | ".join(headers))
                out.append(" | ".join(["---"] * len(headers)))
            out.extend(" | ".join(row) for row in rows)

        raw_text = page.get("raw_text")
        if page.get("parse_error") and raw_text:
            out.extend(["\nRaw extraction:", raw_text])

    return "\n".join(out).strip()
|
|