FinSightAI / backend /utils /ocr_structure.py
Aniket2003333333's picture
start
7248d39
Raw
History Blame Contribute Delete
9.68 kB
"""Parse and merge structured OCR JSON from MiniCPM-V."""
from __future__ import annotations
import json
import re
from typing import Any, Dict, List, Optional, Tuple
# Reject placeholder keys the model sometimes copies from schema examples.
# The pattern is anchored at both ends, so only a key that consists entirely
# of one of these stems (most with an optional numeric suffix, e.g. "field1",
# "column2") is treated as a placeholder. Matching is case-insensitive.
_GENERIC_KEY_PATTERN = re.compile(
    r"^(label|value|field\d*|column\d*|cell\d*|key|example|sample|placeholder|"
    r"header\d*|row\d*|item\d*|data\d*|text\d*|name\d*)$",
    re.IGNORECASE,
)
# Section titles that are treated as placeholders and replaced by a fallback
# title. Entries are lower-case because _normalize_section_title lower-cases
# the candidate title before the membership test.
_GENERIC_SECTION_TITLES = {
    "details",
    "section name",
    "table section name",
    "account information",
    "balance summary",
    "line items",
    "transactions",
    "key value",
    "key_value",
}
def _strip_json_fence(text: str) -> str:
cleaned = text.strip()
cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r"\s*```$", "", cleaned)
return cleaned.strip()
def _is_generic_key(key: str) -> bool:
    """Tell whether ``key`` is blank or a schema placeholder.

    Returns ``True`` when the key is empty after trimming whitespace, or when
    the trimmed key matches ``_GENERIC_KEY_PATTERN``; ``False`` otherwise.
    """
    candidate = key.strip()
    return not candidate or _GENERIC_KEY_PATTERN.match(candidate) is not None
def _normalize_section_title(title: str, fallback: str = "Extracted fields") -> str:
    """Return a usable section title, or ``fallback`` for blank/placeholder ones.

    The title is trimmed; if nothing remains, or its lower-cased form is listed
    in ``_GENERIC_SECTION_TITLES``, ``fallback`` is returned instead.
    """
    trimmed = title.strip()
    is_placeholder = trimmed.lower() in _GENERIC_SECTION_TITLES
    if trimmed and not is_placeholder:
        return trimmed
    return fallback
def _first_text(item: Dict[str, Any], names: Tuple[str, ...]) -> str:
    """Return the first non-blank value among ``names`` in ``item`` as text.

    ``None`` and values that are blank once stringified and stripped are
    skipped. Falsy but meaningful values such as ``0`` or ``False`` are kept.
    Returns ``""`` when no candidate qualifies.
    """
    for name in names:
        candidate = item.get(name)
        if candidate is None:
            continue
        text = str(candidate).strip()
        if text:
            return text
    return ""


def _coerce_fields(section: Dict[str, Any]) -> Dict[str, str]:
    """Accept fields dict or list-of-pairs formats from the model.

    Two input shapes are read from ``section``:

    * ``"fields"``: a mapping of label -> value.
    * ``"pairs"``, ``"key_values"``, ``"key_value_pairs"``, ``"items"``: lists
      of objects whose label is under ``key``/``label``/``name``/``field`` and
      whose value is under ``value``/``text``/``content``.

    Returns a dict of stripped label -> stripped value strings. Entries with a
    blank or placeholder label, a ``None`` value or a blank value are skipped.
    List entries are applied after the ``"fields"`` mapping, so they overwrite
    it on duplicate labels.
    """
    fields: Dict[str, str] = {}

    raw_fields = section.get("fields")
    if isinstance(raw_fields, dict):
        for key, value in raw_fields.items():
            key_str = str(key).strip()
            if not key_str or value is None or _is_generic_key(key_str):
                continue
            value_str = str(value).strip()
            if value_str:
                fields[key_str] = value_str

    label_names = ("key", "label", "name", "field")
    value_names = ("value", "text", "content")
    for list_key in ("pairs", "key_values", "key_value_pairs", "items"):
        raw_list = section.get(list_key)
        if not isinstance(raw_list, list):
            continue
        for item in raw_list:
            if not isinstance(item, dict):
                continue
            # Select by "not None and not blank" rather than truthiness so a
            # legitimate value of 0 / 0.0 / False is not discarded, matching
            # how the "fields" mapping above treats such values.
            label_str = _first_text(item, label_names)
            value_str = _first_text(item, value_names)
            if label_str and value_str and not _is_generic_key(label_str):
                fields[label_str] = value_str
    return fields
def _coerce_table(section: Dict[str, Any]) -> Tuple[List[str], List[List[str]]]:
    """Extract ``(headers, rows)`` from a table section returned by the model.

    Two layouts are understood:

    * row-oriented: ``"headers"`` (list) plus ``"rows"`` (list of lists);
    * column-oriented: ``"columns"``, a list of objects with a
      ``header``/``name`` and a ``values``/``cells`` list. This layout is only
      used when the row-oriented one produced no rows.

    Every cell is returned as a stripped string, with JSON ``null`` rendered
    as ``""``. Blank and placeholder headers are dropped, and rows whose cells
    are all blank are omitted. Keys holding an unexpected (non-list) type are
    treated as absent.
    """

    def _cell_text(cell: Any) -> str:
        # A JSON null must become an empty cell, not the literal text "None".
        return "" if cell is None else str(cell).strip()

    raw_headers = section.get("headers")
    if not isinstance(raw_headers, (list, tuple)):
        raw_headers = []
    headers = [
        text
        for text in map(_cell_text, raw_headers)
        if text and not _is_generic_key(text)
    ]

    rows: List[List[str]] = []
    raw_rows = section.get("rows")
    if isinstance(raw_rows, (list, tuple)):
        for row in raw_rows:
            if not isinstance(row, list):
                continue
            cells = [_cell_text(cell) for cell in row]
            if any(cells):
                rows.append(cells)

    # Some models return columns as objects instead of headers+rows.
    columns = section.get("columns")
    if isinstance(columns, list) and columns and not rows:
        col_headers: List[str] = []
        col_values: List[List[str]] = []
        for col in columns:
            if not isinstance(col, dict):
                continue
            header = _cell_text(col.get("header") or col.get("name"))
            if not header or _is_generic_key(header):
                continue
            values = col.get("values") or col.get("cells") or []
            if not isinstance(values, (list, tuple)):
                values = []
            col_headers.append(header)
            # Keep a placeholder for null entries so later values stay on
            # their own row instead of shifting upwards.
            col_values.append([_cell_text(v) for v in values])
        if col_headers:
            headers = col_headers
            row_count = max(len(values) for values in col_values)
            rows = []
            for idx in range(row_count):
                # Pad short columns with empty cells.
                cells = [
                    values[idx] if idx < len(values) else ""
                    for values in col_values
                ]
                if any(cells):
                    rows.append(cells)
    return headers, rows
def _normalize_sections(sections: Any) -> List[Dict[str, Any]]:
    """Turn the model's raw ``sections`` value into a clean list of sections.

    Anything other than a list yields ``[]``, and non-dict entries are skipped.
    A section whose ``type`` (lower-cased, defaulting to ``key_value``) is
    ``"table"`` is emitted as a table when it has headers or rows. Every other
    section is emitted as ``key_value`` when it has at least one field.
    Blank or placeholder titles are replaced by ``"Extracted fields N"``.
    """
    if not isinstance(sections, list):
        return []

    result: List[Dict[str, Any]] = []
    fallback_number = 1
    for entry in sections:
        if not isinstance(entry, dict):
            continue

        kind = str(entry.get("type") or "key_value").lower()
        heading = _normalize_section_title(
            str(entry.get("title") or ""),
            fallback=f"Extracted fields {fallback_number}",
        )

        if kind == "table":
            table_headers, table_rows = _coerce_table(entry)
            if table_headers or table_rows:
                result.append(
                    {
                        "title": heading,
                        "type": "table",
                        "headers": table_headers,
                        "rows": table_rows,
                    }
                )
        else:
            extracted = _coerce_fields(entry)
            if not extracted:
                continue
            # The counter only advances when a key/value section is emitted
            # under a title that starts with the fallback prefix.
            if heading.startswith("Extracted fields"):
                fallback_number += 1
            result.append(
                {
                    "title": heading,
                    "type": "key_value",
                    "fields": extracted,
                }
            )
    return result
def parse_structured_page(raw: str, page_number: int = 1) -> Dict[str, Any]:
    """Parse model JSON for one page; return a safe default on failure.

    Args:
        raw: Text returned by the model, expected to hold one JSON object,
            optionally wrapped in a Markdown code fence or surrounded by
            extra prose. ``None`` is tolerated and treated as empty.
        page_number: Page number recorded in the result.

    Returns:
        On success, a dict with ``page_number``, ``document_type``,
        ``document_title`` and ``sections``. When the text is empty, cannot be
        parsed as JSON, or parses to something other than a JSON object, the
        fallback dict is returned: the same keys plus ``parse_error=True`` and
        the stripped input under ``raw_text``.
    """
    # Normalise before building the fallback so a None input cannot crash.
    raw_text = (raw or "").strip()
    fallback: Dict[str, Any] = {
        "page_number": page_number,
        "document_type": "other",
        "document_title": "",
        "sections": [],
        "parse_error": True,
        "raw_text": raw_text,
    }
    if not raw_text:
        return fallback

    try:
        data = json.loads(_strip_json_fence(raw))
    except json.JSONDecodeError:
        # The model may wrap the object in prose; retry on the outermost
        # brace-delimited span.
        match = re.search(r"\{[\s\S]*\}", raw)
        if not match:
            return fallback
        try:
            data = json.loads(match.group(0))
        except json.JSONDecodeError:
            return fallback

    # Valid JSON that is not an object (list, string, number...) has none of
    # the keys read below; treat it as a parse failure instead of crashing.
    if not isinstance(data, dict):
        return fallback

    sections = _normalize_sections(data.get("sections"))

    # Top-level keys that carry structure or metadata rather than flat fields.
    meta_keys = {
        "document_type",
        "document_title",
        "sections",
        "pages",
        "fields",
        "pairs",
        "key_values",
        "key_value_pairs",
        "items",
        "columns",
        "headers",
        "rows",
        "type",
        "title",
    }
    flat_fields: Dict[str, str] = {}
    for key, value in data.items():
        if key in meta_keys or value is None:
            continue
        if isinstance(value, (str, int, float)):
            key_str = str(key).strip()
            value_str = str(value).strip()
            if key_str and value_str and not _is_generic_key(key_str):
                flat_fields[key_str] = value_str
    # Fields given at the top level in "fields"/pairs form override scalars.
    flat_fields.update(_coerce_fields(data))

    # Flat fields are only used when the model returned no usable sections.
    if flat_fields and not sections:
        sections = [
            {
                "title": _normalize_section_title(
                    str(data.get("document_title") or "Document header"),
                    fallback="Document header",
                ),
                "type": "key_value",
                "fields": flat_fields,
            }
        ]

    return {
        "page_number": page_number,
        "document_type": str(data.get("document_type") or "other"),
        "document_title": str(data.get("document_title") or "").strip(),
        "sections": sections,
    }
def merge_structured_pages(
    pages: List[Dict[str, Any]],
    filename: Optional[str] = None,
) -> Dict[str, Any]:
    """Combine per-page parse results into one document-level dict.

    Args:
        pages: Per-page dicts, in page order. The list is stored in the result
            as-is, not copied.
        filename: Optional source file name recorded in the result.

    Returns:
        A dict with ``filename``, ``document_type``, ``document_title``,
        ``page_count`` and ``pages``. ``document_type`` is the first page type
        that is set and not ``"other"``; failing that, the first page's type,
        or ``"other"`` when that is missing or empty. ``document_title`` is
        the first non-empty page title, or ``""``.
    """
    doc_type = next(
        (
            p["document_type"]
            for p in pages
            if p.get("document_type") and p["document_type"] != "other"
        ),
        None,
    )
    if doc_type is None:
        # Resolved lazily and with .get(): the default passed to next() is
        # always evaluated, so indexing pages[0] there would raise KeyError
        # for a first page without "document_type".
        doc_type = (pages[0].get("document_type") if pages else None) or "other"

    document_title = next(
        (p["document_title"] for p in pages if p.get("document_title")),
        "",
    )
    return {
        "filename": filename,
        "document_type": doc_type,
        "document_title": document_title,
        "page_count": len(pages),
        "pages": pages,
    }
def structured_to_plain_text(structured: Dict[str, Any]) -> str:
    """Flatten structured OCR for copy/search fallback.

    Args:
        structured: Document-level dict with ``document_type``,
            ``document_title``, ``page_count`` and ``pages``. Each page may
            hold ``sections`` and, for failed pages, ``parse_error`` and
            ``raw_text``.

    Returns:
        Plain text made of the document title (if any), a ``Document type:``
        line and, per page, one ``## title`` block per section. Tables are
        written as pipe-separated rows and key/value sections as
        ``key: value`` lines. A ``--- Page N ---`` marker is added only when
        ``page_count`` is greater than 1. The raw model text is appended for
        pages flagged with ``parse_error``.
    """

    def _cell(value: Any) -> str:
        # str.join needs strings; numbers and nulls can reach this function
        # when the input did not go through the table normaliser.
        return "" if value is None else str(value)

    lines: List[str] = []
    doc_type = structured.get("document_type", "other")
    doc_title = structured.get("document_title", "")
    if doc_title:
        lines.append(doc_title)
    lines.append(f"Document type: {doc_type}")

    # A missing or null page_count is treated as a single-page document.
    multi_page = (structured.get("page_count") or 1) > 1

    for page in structured.get("pages") or []:
        page_num = page.get("page_number", 1)
        if multi_page:
            lines.append(f"\n--- Page {page_num} ---")

        # Repeat a page title only when it differs from the document title.
        page_title = page.get("document_title")
        if page_title and page_title != doc_title:
            lines.append(page_title)

        for section in page.get("sections") or []:
            title = section.get("title", "Details")
            lines.append(f"\n## {title}")
            if section.get("type") == "table":
                headers = section.get("headers") or []
                rows = section.get("rows") or []
                if headers:
                    lines.append(" | ".join(_cell(h) for h in headers))
                    lines.append(" | ".join(["---"] * len(headers)))
                for row in rows:
                    lines.append(" | ".join(_cell(cell) for cell in row))
            else:
                for key, value in (section.get("fields") or {}).items():
                    lines.append(f"{key}: {value}")

        if page.get("parse_error") and page.get("raw_text"):
            lines.append("\nRaw extraction:")
            lines.append(page["raw_text"])

    return "\n".join(lines).strip()