DoAn / core /preprocessing /docling_processor.py
hungnha's picture
Thay đổi promt
92c9b4d
import os
import re
import gc
import sys
import signal
import logging
from datetime import datetime
from pathlib import Path
from docling.document_converter import DocumentConverter, FormatOption
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions, TableStructureOptions, EasyOcrOptions, TableFormerMode
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
# Add project root to path for HashProcessor import
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from core.hash_file.hash_file import HashProcessor
class DoclingProcessor:
def __init__(self, output_dir: str, use_ocr: bool = True, timeout: int = 300, images_scale: float = 3.0):
self.output_dir = output_dir
self.timeout = timeout
self.logger = logging.getLogger(__name__)
self.hasher = HashProcessor(verbose=False)
os.makedirs(output_dir, exist_ok=True)
# Hash index file
self.hash_index_path = Path(output_dir) / "docling_hash_index.json"
self.hash_index = self.hasher.load_processed_index(str(self.hash_index_path))
# PDF pipeline configuration
opts = PdfPipelineOptions(do_ocr=use_ocr, do_table_structure=True)
opts.table_structure_options = TableStructureOptions(do_cell_matching=True, mode=TableFormerMode.ACCURATE)
opts.images_scale = images_scale
# Vietnamese OCR configuration
if use_ocr:
ocr = EasyOcrOptions()
ocr.lang = ["vi"]
ocr.force_full_page_ocr = False
opts.ocr_options = ocr
self.converter = DocumentConverter(format_options={
InputFormat.PDF: FormatOption(backend=PyPdfiumDocumentBackend, pipeline_cls=StandardPdfPipeline, pipeline_options=opts)
})
self.logger.info(f"Docling | OCR={use_ocr} | Table=accurate | Scale={images_scale} | timeout={timeout}s")
def clean_markdown(self, text: str) -> str:
text = re.sub(r'\n\s*Trang\s+\d+\s*\n', '\n', text)
return re.sub(r'\n{3,}', '\n\n', text).strip()
def _should_process(self, pdf_path: str, output_path: Path) -> bool:
# If output doesn't exist -> needs processing
if not output_path.exists():
return True
# Compute hash of current PDF file
current_hash = self.hasher.get_file_hash(pdf_path)
if not current_hash:
return True
# Compare with saved hash
saved_hash = self.hash_index.get(pdf_path, {}).get("hash")
return current_hash != saved_hash
def _save_hash(self, pdf_path: str, file_hash: str) -> None:
self.hash_index[pdf_path] = {
"hash": file_hash,
"processed_at": self.hasher.get_current_timestamp()
}
def parse_document(self, file_path: str) -> str | None:
if not os.path.exists(file_path):
return None
filename = os.path.basename(file_path)
try:
# Set timeout to prevent hanging
signal.signal(signal.SIGALRM, lambda s, f: (_ for _ in ()).throw(TimeoutError()))
signal.alarm(self.timeout)
result = self.converter.convert(file_path)
md = result.document.export_to_markdown(image_placeholder="")
signal.alarm(0)
md = self.clean_markdown(md)
# Add frontmatter metadata
return f"---\nfilename: {filename}\nfilepath: {file_path}\npage_count: {len(result.document.pages)}\nprocessed_at: {datetime.now().isoformat()}\n---\n\n{md}"
except TimeoutError:
self.logger.warning(f"Timeout: {filename}")
signal.alarm(0)
return None
except Exception as e:
self.logger.error(f"Error: {filename}: {e}")
signal.alarm(0)
return None
def parse_directory(self, source_dir: str) -> dict:
self.logger.info(f"Found {len(pdf_files)} PDF files in {source_dir}")
results = {"total": len(pdf_files), "parsed": 0, "skipped": 0, "errors": 0}
for i, fp in enumerate(pdf_files):
try:
rel = fp.relative_to(source_path)
except ValueError:
rel = Path(fp.name)
out = Path(self.output_dir) / rel.with_suffix(".md")
out.parent.mkdir(parents=True, exist_ok=True)
pdf_path = str(fp)
# Check hash to decide if processing is needed
if not self._should_process(pdf_path, out):
results["skipped"] += 1
continue
# Compute hash before processing
file_hash = self.hasher.get_file_hash(pdf_path)
md = self.parse_document(pdf_path)
if md:
out.write_text(md, encoding="utf-8")
results["parsed"] += 1
# Save hash after successful processing
if file_hash:
self._save_hash(pdf_path, file_hash)
else:
results["errors"] += 1
# Clean up memory every 10 files
if (i + 1) % 10 == 0:
gc.collect()
self.logger.info(f"{i+1}/{len(pdf_files)} (skipped: {results['skipped']})")
# Save hash index after processing
self.hasher.save_processed_index(str(self.hash_index_path), self.hash_index)
self.logger.info(f"Done: {results['parsed']} processed, {results['skipped']} skipped, {results['errors']} errors")
return results