| | import os |
| | import re |
| | import gc |
| | import sys |
| | import signal |
| | import logging |
| | from datetime import datetime |
| | from pathlib import Path |
| |
|
| | from docling.document_converter import DocumentConverter, FormatOption |
| | from docling.datamodel.base_models import InputFormat |
| | from docling.datamodel.pipeline_options import PdfPipelineOptions, TableStructureOptions, EasyOcrOptions, TableFormerMode |
| | from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend |
| | from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline |
| |
|
| | |
# Prepend the directory three levels above this file (two above its folder)
# to sys.path, once, so the project-local `core.hash_file` import that follows
# can be resolved regardless of the current working directory.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if not any(entry == str(PROJECT_ROOT) for entry in sys.path):
    sys.path.insert(0, str(PROJECT_ROOT))
| |
|
| | from core.hash_file.hash_file import HashProcessor |
| |
|
| |
|
| | class DoclingProcessor: |
| |
|
| | |
| | def __init__(self, output_dir: str, use_ocr: bool = True, timeout: int = 300, images_scale: float = 3.0): |
| |
|
| | self.output_dir = output_dir |
| | self.timeout = timeout |
| | self.logger = logging.getLogger(__name__) |
| | self.hasher = HashProcessor(verbose=False) |
| | os.makedirs(output_dir, exist_ok=True) |
| | |
| | |
| | self.hash_index_path = Path(output_dir) / "docling_hash_index.json" |
| | self.hash_index = self.hasher.load_processed_index(str(self.hash_index_path)) |
| | |
| | |
| | opts = PdfPipelineOptions(do_ocr=use_ocr, do_table_structure=True) |
| | opts.table_structure_options = TableStructureOptions(do_cell_matching=True, mode=TableFormerMode.ACCURATE) |
| | opts.images_scale = images_scale |
| | |
| | |
| | if use_ocr: |
| | ocr = EasyOcrOptions() |
| | ocr.lang = ["vi"] |
| | ocr.force_full_page_ocr = False |
| | opts.ocr_options = ocr |
| |
|
| | self.converter = DocumentConverter(format_options={ |
| | InputFormat.PDF: FormatOption(backend=PyPdfiumDocumentBackend, pipeline_cls=StandardPdfPipeline, pipeline_options=opts) |
| | }) |
| | self.logger.info(f"Docling | OCR={use_ocr} | Table=accurate | Scale={images_scale} | timeout={timeout}s") |
| | |
| | def clean_markdown(self, text: str) -> str: |
| |
|
| | text = re.sub(r'\n\s*Trang\s+\d+\s*\n', '\n', text) |
| | return re.sub(r'\n{3,}', '\n\n', text).strip() |
| | |
| | def _should_process(self, pdf_path: str, output_path: Path) -> bool: |
| |
|
| | |
| | if not output_path.exists(): |
| | return True |
| | |
| | |
| | current_hash = self.hasher.get_file_hash(pdf_path) |
| | if not current_hash: |
| | return True |
| | |
| | |
| | saved_hash = self.hash_index.get(pdf_path, {}).get("hash") |
| | return current_hash != saved_hash |
| | |
| | def _save_hash(self, pdf_path: str, file_hash: str) -> None: |
| |
|
| | self.hash_index[pdf_path] = { |
| | "hash": file_hash, |
| | "processed_at": self.hasher.get_current_timestamp() |
| | } |
| | |
| | def parse_document(self, file_path: str) -> str | None: |
| |
|
| | if not os.path.exists(file_path): |
| | return None |
| | filename = os.path.basename(file_path) |
| | try: |
| | |
| | signal.signal(signal.SIGALRM, lambda s, f: (_ for _ in ()).throw(TimeoutError())) |
| | signal.alarm(self.timeout) |
| | |
| | result = self.converter.convert(file_path) |
| | md = result.document.export_to_markdown(image_placeholder="") |
| | signal.alarm(0) |
| | |
| | md = self.clean_markdown(md) |
| | |
| | return f"---\nfilename: {filename}\nfilepath: {file_path}\npage_count: {len(result.document.pages)}\nprocessed_at: {datetime.now().isoformat()}\n---\n\n{md}" |
| | except TimeoutError: |
| | self.logger.warning(f"Timeout: {filename}") |
| | signal.alarm(0) |
| | return None |
| | except Exception as e: |
| | self.logger.error(f"Error: {filename}: {e}") |
| | signal.alarm(0) |
| | return None |
| | |
| | def parse_directory(self, source_dir: str) -> dict: |
| | self.logger.info(f"Found {len(pdf_files)} PDF files in {source_dir}") |
| | |
| | results = {"total": len(pdf_files), "parsed": 0, "skipped": 0, "errors": 0} |
| | |
| | for i, fp in enumerate(pdf_files): |
| | try: |
| | rel = fp.relative_to(source_path) |
| | except ValueError: |
| | rel = Path(fp.name) |
| | out = Path(self.output_dir) / rel.with_suffix(".md") |
| | out.parent.mkdir(parents=True, exist_ok=True) |
| | |
| | pdf_path = str(fp) |
| | |
| | |
| | if not self._should_process(pdf_path, out): |
| | results["skipped"] += 1 |
| | continue |
| | |
| | |
| | file_hash = self.hasher.get_file_hash(pdf_path) |
| | |
| | md = self.parse_document(pdf_path) |
| | if md: |
| | out.write_text(md, encoding="utf-8") |
| | results["parsed"] += 1 |
| | |
| | if file_hash: |
| | self._save_hash(pdf_path, file_hash) |
| | else: |
| | results["errors"] += 1 |
| | |
| | |
| | if (i + 1) % 10 == 0: |
| | gc.collect() |
| | self.logger.info(f"{i+1}/{len(pdf_files)} (skipped: {results['skipped']})") |
| | |
| | |
| | self.hasher.save_processed_index(str(self.hash_index_path), self.hash_index) |
| | |
| | self.logger.info(f"Done: {results['parsed']} processed, {results['skipped']} skipped, {results['errors']} errors") |
| | return results |
| |
|