Spaces:
Sleeping
Sleeping
| """ | |
| doc_reader.py | |
| ------------- | |
| Extracts full text from .docx, .pdf, and .txt files. | |
| For scanned PDFs: converts each page to image and uses GPT-4o vision. | |
| Falls back to pdfplumber for text-based PDFs. | |
| For DOCX: recursive XML walk to catch nested tables. | |
| Outputs clear section markers so doc_sectioner can locate annexures. | |
| """ | |
| import os | |
| import base64 | |
| import io | |
| import re | |
| import pdfplumber | |
| from docx import Document | |
| from docx.oxml.ns import qn | |
| from pathlib import Path | |
| from openai import OpenAI | |
# ── PDF: detect if scanned ───────────────────────────────────────────────────
def _is_scanned_pdf(file_path: str, sample_pages: int = 3) -> bool:
    """Guess whether a PDF is image-only by sampling its leading pages.

    The stripped text length of the first ``sample_pages`` pages (fewer if
    the document is shorter) is averaged; below 100 characters per page the
    PDF is reported as scanned.  Any exception raised while opening or
    reading the file also yields ``True``.
    """
    try:
        with pdfplumber.open(file_path) as pdf:
            n_sampled = min(sample_pages, len(pdf.pages))
            char_count = 0
            for idx in range(n_sampled):
                extracted = pdf.pages[idx].extract_text() or ""
                char_count += len(extracted.strip())
            # max(..., 1) keeps the division defined for a zero-page sample.
            mean_chars = char_count / max(n_sampled, 1)
            print(f" Avg chars/page (first {n_sampled}): {mean_chars:.0f}")
            return mean_chars < 100
    except Exception:
        # Best-effort probe: an unreadable file is routed to the OCR path.
        return True
# ── PDF: vision OCR via GPT-4o ───────────────────────────────────────────────
def _pdf_page_to_base64(file_path: str, page_num: int) -> str:
    """Render one PDF page to PNG and return it base64-encoded.

    ``page_num`` is zero-based; it is shifted by one for ``pdf2image``,
    which is asked for exactly that page at 180 dpi.  Returns an empty
    string when the renderer produces no image.
    """
    from pdf2image import convert_from_path

    one_based = page_num + 1
    rendered = convert_from_path(
        file_path,
        first_page=one_based,
        last_page=one_based,
        dpi=180,
    )
    if rendered:
        png_stream = io.BytesIO()
        rendered[0].save(png_stream, format="PNG")
        encoded = base64.b64encode(png_stream.getvalue())
        return encoded.decode("utf-8")
    return ""
# General-purpose OCR prompt: one instruction per entry, joined by single spaces.
_VISION_PROMPT_BODY = " ".join((
    "This is a page from an Indian HFC/NBFC loan document (CAL/CAM/COE/Annexure).",
    "Extract ALL text exactly as it appears.",
    "For tables, output each row on one line with columns separated by ' | '.",
    "Preserve all numbers, dates, rupee amounts, percentages, PAN numbers, addresses.",
    "Do NOT summarize. Output raw extracted text only.",
))

# Table-focused OCR prompt, built the same way as the general one.
_VISION_PROMPT_TABLE = " ".join((
    "This page contains a table from an Indian loan document.",
    "Extract ALL rows of the table with columns separated by ' | '.",
    "Keep every row including headers and totals.",
    "Also include any heading text above or below the table.",
    "Do NOT summarize or skip any row.",
))
def _extract_text_from_scanned_pdf(file_path: str) -> str:
    """OCR every page of a scanned PDF with GPT-4o vision and return the text.

    Each page is rendered to PNG, sent to the chat-completions API together
    with ``_VISION_PROMPT_BODY``, and emitted under a ``=== PDF PAGE n ===``
    marker.  A page that cannot be rendered or transcribed still gets its
    marker, followed by an ``[extraction failed: ...]`` note, so that page
    numbering in the output stays continuous.

    Args:
        file_path: Path to the PDF file.

    Returns:
        The per-page blocks joined by newlines, with surrounding whitespace
        stripped.

    Raises:
        ValueError: If the ``OPENAI_API_KEY`` environment variable is unset
            or empty.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY not set — required for scanned PDF OCR.")
    client = OpenAI(api_key=api_key)

    # pdfplumber is only needed for the page count; rendering goes through
    # _pdf_page_to_base64, so the handle is released before the slow API loop.
    with pdfplumber.open(file_path) as pdf:
        num_pages = len(pdf.pages)
    print(f" Scanned PDF — {num_pages} pages, using GPT-4o vision...")

    max_tokens = 3000
    all_text = []
    for page_num in range(num_pages):
        page_label = page_num + 1
        print(f" Page {page_label}/{num_pages}...")
        try:
            b64 = _pdf_page_to_base64(file_path, page_num)
            if not b64:
                # Previously the page vanished without a trace; keep a
                # placeholder so downstream page references stay aligned.
                print(f" Warning: page {page_label} could not be rendered")
                all_text.append(
                    f"\n=== PDF PAGE {page_label} === "
                    "[extraction failed: page could not be rendered]"
                )
                continue
            # The table prompt is not used: there is no way to know in
            # advance which pages hold tables, and the body prompt already
            # requests explicit table-row formatting.
            response = client.chat.completions.create(
                model="gpt-4o",
                max_tokens=max_tokens,
                messages=[{
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/png;base64,{b64}",
                                "detail": "high",
                            },
                        },
                        {"type": "text", "text": _VISION_PROMPT_BODY},
                    ],
                }],
            )
            choice = response.choices[0]
            if choice.finish_reason == "length":
                # The completion was cut off by max_tokens; surface it
                # instead of silently returning a partial page.
                print(
                    f" Warning: page {page_label} hit the {max_tokens}-token "
                    "limit; extracted text may be truncated"
                )
            page_text = choice.message.content or ""
            all_text.append(f"\n=== PDF PAGE {page_label} ===\n{page_text}")
        except Exception as e:
            # Deliberate best-effort: one bad page must not abort the
            # whole document.
            print(f" Warning: page {page_label} failed: {e}")
            all_text.append(
                f"\n=== PDF PAGE {page_label} === [extraction failed: {e}]"
            )
    return "\n".join(all_text).strip()
# ── PDF: text-based extraction ───────────────────────────────────────────────
def extract_text_from_pdf(file_path: str) -> str:
    """Extract text from a PDF, using vision OCR when it looks scanned.

    Text-based PDFs are read with pdfplumber: each page with extractable
    text is emitted under a ``=== PDF PAGE n ===`` marker, followed by the
    rows of every table detected on that page, one row per line with cells
    joined by `` | ``.

    Args:
        file_path: Path to the PDF file.

    Returns:
        The extracted text with surrounding whitespace stripped.
    """
    if _is_scanned_pdf(file_path):
        return _extract_text_from_scanned_pdf(file_path)

    print(" Text-based PDF — using pdfplumber...")
    text_parts = []
    with pdfplumber.open(file_path) as pdf:
        for page_no, page in enumerate(pdf.pages, start=1):
            page_text = page.extract_text() or ""
            if page_text:
                text_parts.append(f"\n=== PDF PAGE {page_no} ===\n{page_text}")
            for table in page.extract_tables():
                for row in table:
                    if not row:
                        continue
                    # Collapse whitespace inside each cell: wrapped cell text
                    # contains newlines that would otherwise split one table
                    # row across several output lines.
                    row_text = " | ".join(
                        " ".join(cell.split()) if cell else "" for cell in row
                    )
                    # Skip rows made only of empty cells and separators.
                    if row_text.strip(" |"):
                        text_parts.append(row_text)
    return "\n".join(text_parts).strip()
# ── DOCX helpers ─────────────────────────────────────────────────────────────
def _extract_cell_text(tc_element, depth: int = 0) -> str:
    """Return the text of a table-cell XML element, including nested tables.

    Direct ``w:p`` children contribute their concatenated ``w:t`` text;
    direct ``w:tbl`` children contribute one line per row, with cells joined
    by `` | `` and each cell expanded recursively.

    Args:
        tc_element: The ``w:tc`` element to read.
        depth: Nesting level of this call.  It is incremented on recursion
            and otherwise unused.

    Returns:
        The collected pieces joined by newlines (empty string if none).
    """
    p_tag = qn("w:p")
    tbl_tag = qn("w:tbl")
    tr_tag = qn("w:tr")
    tc_tag = qn("w:tc")
    t_tag = qn("w:t")

    parts = []
    for child in tc_element:
        if child.tag == p_tag:
            text = "".join(t.text for t in child.iter(t_tag) if t.text).strip()
            if text:
                parts.append(text)
        elif child.tag == tbl_tag:
            # Only this table's own rows: a descendant search (".//w:tr")
            # would also return rows of tables nested deeper, which the
            # recursive call below already emits, duplicating them.
            for tr in child.findall(tr_tag):
                # Each w:tc appears once in the XML, so no de-duplication is
                # applied; collapsing equal neighbours would drop genuine
                # repeated or empty cells and shift the columns.
                row_cells = [
                    _extract_cell_text(tc, depth + 1) for tc in tr.findall(tc_tag)
                ]
                row_str = " | ".join(row_cells)
                if row_str.strip(" |"):
                    parts.append(row_str)
    return "\n".join(parts)
# Known heading patterns that mark important document sections.
# Order matters: the first matching entry wins, so a more specific phrase
# ("annexure ii a") must come before its prefix ("annexure ii").
# NOTE(review): the "β" inside some markers looks like a mis-decoded dash;
# it is left untouched here because other modules may match on the exact
# marker text — confirm before changing.
_SECTION_HEADINGS = [
    ("term sheet", "=== TERM SHEET ==="),
    ("terms of facility", "=== TERM SHEET ==="),
    ("annexure ii a", "=== ANNEXURE II A β SECURITY UNITS P1 ==="),
    ("annexure ii b", "=== ANNEXURE II B β SECURITY UNITS P2 ==="),
    ("annexure ii", "=== ANNEXURE II β SECURITY UNITS ==="),
    ("list of unsold units", "=== SECURITY UNITS TABLE ==="),
    ("list of unsold apartment", "=== SECURITY UNITS TABLE ==="),
    ("repayment schedule", "=== REPAYMENT SCHEDULE ==="),
    ("details of co-borrower", "=== CO-BORROWERS ==="),
    ("details of co borrower", "=== CO-BORROWERS ==="),
    ("pre-disbursement condition", "=== PRE-DISBURSEMENT CONDITIONS ==="),
    ("pre disbursement condition", "=== PRE-DISBURSEMENT CONDITIONS ==="),
    ("other monitoring condition", "=== MONITORING CONDITIONS ==="),
    ("special conditions", "=== SPECIAL CONDITIONS ==="),
    ("exit table", "=== EXIT TABLE ==="),
    ("collection slot", "=== SI / EXIT TABLE ==="),
    ("cash flow analysis", "=== CASH FLOW ANALYSIS ==="),
]

# Characters accepted between the words of a heading: whitespace, hyphen,
# en dash, em dash.
_HEADING_SEPARATOR = r"[\s\-\u2013\u2014]+"


def _compile_heading(phrase):
    """Compile a heading phrase into a whole-phrase, separator-tolerant regex.

    The phrase must not be glued to a letter or digit on either side, which
    stops "annexure ii" from matching inside "annexure iii"; an optional
    trailing "s" keeps simple plurals ("conditions", "co-borrowers") matching.
    """
    words = [re.escape(w) for w in re.split(r"[\s\-]+", phrase) if w]
    body = _HEADING_SEPARATOR.join(words)
    return re.compile(r"(?<![a-z0-9])" + body + r"s?(?![a-z0-9])")


# Compiled once at import time, in the same order as _SECTION_HEADINGS.
_SECTION_HEADING_REGEXES = [
    (_compile_heading(phrase), marker) for phrase, marker in _SECTION_HEADINGS
]


def _inject_section_markers(text: str) -> str:
    """Insert a section marker before each line that matches a known heading.

    A line is tested in lower case with surrounding whitespace removed, and
    only if it is shorter than 120 characters, so long body sentences that
    merely mention a heading are left alone.  At most one marker (the first
    matching entry of ``_SECTION_HEADINGS``) is added per line, preceded by
    a blank line.

    Args:
        text: Newline-separated document text.

    Returns:
        The text with marker lines inserted; all original lines are kept.
    """
    out = []
    for line in text.split("\n"):
        candidate = line.lower().strip()
        if candidate and len(candidate) < 120:
            for regex, marker in _SECTION_HEADING_REGEXES:
                if regex.search(candidate):
                    out.append(f"\n{marker}")
                    break
        out.append(line)
    return "\n".join(out)
def extract_text_from_docx(file_path: str) -> str:
    """Extract paragraph and table text from a .docx file in document order.

    Top-level paragraphs and tables are walked in the order they appear in
    the document body, so a heading paragraph is followed by the table it
    introduces.  Each table row becomes one entry with cells joined by
    `` | ``; cell content (including nested tables) comes from
    ``_extract_cell_text``.  Section markers are then injected with
    ``_inject_section_markers``.

    Args:
        file_path: Path to the .docx file.

    Returns:
        The extracted text with section markers inserted.
    """
    # Function-scope imports: proxy classes for wrapping raw body elements.
    from docx.table import Table
    from docx.text.paragraph import Paragraph

    doc = Document(file_path)
    p_tag = qn("w:p")
    tbl_tag = qn("w:tbl")

    chunks = []
    for child in doc.element.body.iterchildren():
        if child.tag == p_tag:
            para_text = Paragraph(child, doc).text.strip()
            if para_text:
                chunks.append(para_text)
        elif child.tag == tbl_tag:
            for row in Table(child, doc).rows:
                row_cells = []
                previous_tc = None
                for cell in row.cells:
                    tc = cell._tc
                    # python-docx repeats a horizontally merged cell once per
                    # grid column.  Skip repeats by element identity rather
                    # than by text, so distinct neighbouring cells holding
                    # the same value are both kept.
                    if tc is previous_tc:
                        continue
                    previous_tc = tc
                    row_cells.append(_extract_cell_text(tc))
                row_str = " | ".join(row_cells)
                # Skip rows made only of empty cells and separators.
                if row_str.strip(" |"):
                    chunks.append(row_str)

    raw = "\n".join(chunks).strip()
    return _inject_section_markers(raw)
# ── Public API ───────────────────────────────────────────────────────────────
def extract_text(file_path: str) -> str:
    """Extract the full text of a document, dispatching on file extension.

    Args:
        file_path: Path to a ``.pdf``, ``.docx`` or ``.txt`` file.  The
            extension is matched case-insensitively.

    Returns:
        The extracted text.  For ``.txt`` files this is the file content
        with surrounding whitespace stripped.

    Raises:
        ValueError: For ``.doc`` files and any other unsupported extension.
    """
    ext = Path(file_path).suffix.lower()
    if ext == ".pdf":
        print(" Format: PDF")
        return extract_text_from_pdf(file_path)
    if ext == ".docx":
        print(" Format: DOCX")
        return extract_text_from_docx(file_path)
    if ext == ".txt":
        print(" Format: TXT")
        # "utf-8-sig" drops a leading byte-order mark if present (str.strip()
        # does not treat U+FEFF as whitespace) and otherwise behaves like
        # plain UTF-8.  Undecodable bytes are still ignored.
        with open(file_path, "r", encoding="utf-8-sig", errors="ignore") as f:
            return f.read().strip()
    if ext == ".doc":
        raise ValueError(".doc is not supported. Save as .docx and re-upload.")
    raise ValueError(f"Unsupported format: {ext}. Supported: .pdf, .docx, .txt")
# Manual smoke test: extract one file and show the head and tail of the result.
if __name__ == "__main__":
    import sys

    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python doc_reader.py yourfile.pdf/docx/txt")
    else:
        target = cli_args[0]
        print(f"[TEST] Reading: {target}")
        extracted = extract_text(target)
        print(f"[TEST] Extracted {len(extracted):,} chars")
        for banner, excerpt in (
            ("\n--- First 2000 chars ---", extracted[:2000]),
            ("\n--- Last 2000 chars ---", extracted[-2000:]),
        ):
            print(banner)
            print(excerpt)