| | """ |
| | File: ocr.py |
| | Description: Optical Character Recognition (OCR) using software 2.0 models |
| | Author: Didier Guillevic |
| | Date: 2025-04-06 |
| | """ |
| |
|
| | import os |
| | os.system("bash setup.sh") |
| | import magic |
| | import vlm |
| |
|
| | import uuid |
| | import shutil |
| | import threading |
| | import time |
| | import pathlib |
| |
|
| | import pdf2image |
| | from pdf2image.exceptions import PDFPageCountError, PDFSyntaxError |
| | import pypdf |
| | import base64 |
| | from contextlib import contextmanager |
| | from typing import List, Optional, Tuple, Union |
| |
|
| | import logging |
| |
|
class PDFScannerTempManager:
    """
    Owns the scratch directories used while scanning PDFs.

    Each scratch directory is a UUID-named child of ``base_temp_dir`` and is
    tracked in ``active_temp_dirs`` until it has been removed.
    """

    def __init__(self, base_temp_dir: str = 'tmp'):
        """
        Set up the manager and make sure the base directory exists.

        Args:
            base_temp_dir (str): Parent directory for all scratch directories
        """
        self.base_temp_dir = base_temp_dir
        self.active_temp_dirs: list[str] = []

        # The base directory may already exist from a previous run.
        os.makedirs(base_temp_dir, exist_ok=True)

        # Configure root logging at INFO and grab a module-named logger.
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    @contextmanager
    def temp_directory(self) -> str:
        """
        Context manager handing out a fresh scratch directory.

        The directory is created on entry, registered as active, and removed
        again on exit regardless of whether the body raised.

        Yields:
            str: Path to the scratch directory
        """
        scratch_dir = os.path.join(self.base_temp_dir, str(uuid.uuid4()))

        try:
            # exist_ok=False: a name collision is reported instead of reused.
            os.makedirs(scratch_dir, exist_ok=False)
            self.active_temp_dirs.append(scratch_dir)
            yield scratch_dir
        finally:
            self._cleanup_directory(scratch_dir)

    def _cleanup_directory(self, directory: str) -> None:
        """
        Remove one scratch directory and stop tracking it.

        Failures are logged rather than raised.

        Args:
            directory (str): Path of the directory to delete
        """
        tracked = self.active_temp_dirs
        try:
            if os.path.exists(directory):
                shutil.rmtree(directory)

            # Only reached when the removal above did not raise.
            if directory in tracked:
                tracked.remove(directory)
        except Exception as e:
            self.logger.error(f"Error cleaning up directory {directory}: {e}")

    def cleanup_all(self) -> None:
        """
        Remove every scratch directory that is still tracked as active.
        """
        # Iterate over a snapshot: _cleanup_directory mutates the live list.
        for leftover in self.active_temp_dirs[:]:
            self._cleanup_directory(leftover)
| |
|
| |
|
class PDFScanner:
    """
    A class to perform OCR on PDF files with robust temp management.

    A PDF is validated, optionally repaired, converted to one image per page
    with ``pdf2image`` and each page image is passed to ``process_image_file``.
    """

    def __init__(self,
                 dpi: int = 300,
                 temp_manager: Optional["PDFScannerTempManager"] = None
                 ):
        """
        Initialize the PDFScanner.

        Args:
            dpi (int): DPI for PDF conversion
            temp_manager (PDFScannerTempManager, optional): Temp directory
                manager; a default one is created when omitted
        """
        self.dpi = dpi
        self.temp_manager = temp_manager or PDFScannerTempManager()
        self.logger = logging.getLogger(__name__)

    def _validate_pdf(self, pdf_path: str) -> Tuple[bool, str, bool]:
        """
        Validate PDF file and check for encryption.

        Args:
            pdf_path (str): Path to the file to inspect

        Returns:
            Tuple[bool, str, bool]: (is_valid, message, is_encrypted).
            An encrypted PDF is reported as not valid with is_encrypted=True.
            Any error while opening or reading the file is reported as not
            valid instead of being raised.
        """
        try:
            with open(pdf_path, 'rb') as file:
                # Cheap signature check before handing the file to pypdf.
                if file.read(4) != b'%PDF':
                    return False, "Not a valid PDF file (missing PDF signature)", False

                # Rewind so pypdf parses the file from the start.
                file.seek(0)

                try:
                    pdf_reader = pypdf.PdfReader(file, strict=False)
                    is_encrypted = pdf_reader.is_encrypted

                    if is_encrypted:
                        return False, "PDF is encrypted and requires password", True

                    num_pages = len(pdf_reader.pages)
                    return True, f"Valid PDF with {num_pages} pages", False

                except pypdf.errors.PdfReadError as e:
                    return False, f"Invalid PDF structure: {str(e)}", False

        except Exception as e:
            return False, f"Error validating PDF: {str(e)}", False

    def _repair_pdf(self, pdf_path: str, temp_dir: str) -> str:
        """
        Attempt to repair a corrupted PDF file.

        First the pages are copied into a new file with pypdf; if that fails,
        the file is rewritten with the Ghostscript ``gs`` command.

        Args:
            pdf_path (str): Path to original PDF
            temp_dir (str): Temporary directory for repair

        Returns:
            str: Path to repaired PDF

        Raises:
            Exception: If the Ghostscript fallback fails as well (including
                the ``gs`` executable not being available).
        """
        # Imported here because the module's import block does not provide
        # `subprocess`; without it the Ghostscript fallback raised NameError.
        import subprocess

        repaired_pdf = os.path.join(temp_dir, 'repaired.pdf')

        # Attempt 1: re-write the document page by page with pypdf.
        try:
            with open(pdf_path, 'rb') as file:
                reader = pypdf.PdfReader(file, strict=False)
                writer = pypdf.PdfWriter()

                for page in reader.pages:
                    writer.add_page(page)

                with open(repaired_pdf, 'wb') as output_file:
                    writer.write(output_file)

                if os.path.exists(repaired_pdf):
                    return repaired_pdf

        except Exception as e:
            # Best effort: any failure here falls through to Ghostscript.
            self.logger.warning(f"pypdf repair failed: {str(e)}")

        # Attempt 2: let Ghostscript regenerate the PDF.
        try:
            gs_command = [
                'gs',
                '-o', repaired_pdf,
                '-sDEVICE=pdfwrite',
                '-dPDFSETTINGS=/prepress',
                pdf_path
            ]

            process = subprocess.run(
                gs_command,
                capture_output=True,
                text=True
            )

            if process.returncode == 0 and os.path.exists(repaired_pdf):
                return repaired_pdf
            raise Exception(f"Ghostscript repair failed: {process.stderr}")

        except Exception as e:
            self.logger.error(f"PDF repair failed: {str(e)}")
            raise

    def _process_images(
        self,
        images: list,
        temp_dir: str,
        language: str
    ) -> list[str]:
        """
        Save each page image to disk and run OCR on it.

        Args:
            images (list): Page images as returned by pdf2image
            temp_dir (str): Directory in which the page PNG files are written
            language (str): OCR language; accepted for interface
                compatibility but not used by this method

        Returns:
            list[str]: One entry per page, in page order. A page whose
            processing raised is represented by "[ERROR ON PAGE n]".
        """
        extracted_text = []

        for page_number, image in enumerate(images, start=1):
            image_path = os.path.join(temp_dir, f'page_{page_number}.png')
            try:
                image.save(image_path, 'PNG', quality=100)

                text = process_image_file(image_path)
                extracted_text.append(text)

            except Exception as e:
                # One bad page must not abort the whole document.
                self.logger.error(f"Error processing page {page_number}: {str(e)}")
                extracted_text.append(f"[ERROR ON PAGE {page_number}]")

        return extracted_text

    def pdf_to_text(
        self,
        pdf_path: str,
        language: str = 'eng',
        first_page: Optional[int] = None,
        last_page: Optional[int] = None,
        attempt_repair: bool = True
    ) -> list[str]:
        """
        Convert a PDF file to text using OCR with robust error handling.

        Args:
            pdf_path (str): Path to the PDF file
            language (str): Language for OCR (default: 'eng')
            first_page (int, optional): First page to process (1-based)
            last_page (int, optional): Last page to process
            attempt_repair (bool): Whether to attempt repairing corrupted PDFs

        Returns:
            list[str]: List of extracted text for each page. An empty list is
            returned when conversion succeeded but produced no page images.

        Raises:
            FileNotFoundError: If ``pdf_path`` does not exist.
            Exception: If the PDF is encrypted, or if conversion raised and
                no conversion method produced images.
        """
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        # Everything below works inside one scratch directory that is removed
        # when the block exits, including on error.
        with self.temp_manager.temp_directory() as temp_dir:
            is_valid, error_message, is_encrypted = self._validate_pdf(pdf_path)
            if not is_valid:
                self.logger.warning(f"PDF validation issue: {error_message}")

                if is_encrypted:
                    raise Exception("Cannot process encrypted PDF files")

                if attempt_repair:
                    try:
                        pdf_path = self._repair_pdf(pdf_path, temp_dir)
                        self.logger.info("Using repaired PDF file")
                    except Exception as e:
                        # Conversion is still attempted on the original file.
                        self.logger.error(f"Repair failed: {str(e)}")

            # Tried in order; the first method that yields images wins.
            conversion_methods = [
                {'use_pdftocairo': True, 'strict': False},
                {'use_pdftocairo': False, 'strict': False},
                {'use_pdftocairo': True, 'strict': False, 'dpi': self.dpi * 2},
                {'use_pdftocairo': False, 'strict': False, 'dpi': self.dpi * 3}
            ]

            last_error = None
            for method in conversion_methods:
                try:
                    self.logger.info(f"Trying conversion method: {method}")
                    images = pdf2image.convert_from_path(
                        pdf_path,
                        dpi=method.get('dpi', self.dpi),
                        first_page=first_page,
                        last_page=last_page,
                        thread_count=4,
                        grayscale=True,
                        # 'dpi' is passed explicitly above.
                        **{k: v for k, v in method.items() if k != 'dpi'}
                    )

                    if images:
                        return self._process_images(images, temp_dir, language)

                except Exception as e:
                    last_error = e
                    self.logger.warning(f"Method failed: {str(e)}")
                    continue

            if last_error:
                # Chain the cause so the original traceback is preserved.
                raise Exception(
                    f"All conversion methods failed. Last error: {str(last_error)}"
                ) from last_error

            # No error but no page images either: honour the declared return
            # type instead of implicitly returning None.
            return []
| |
|
| | |
| | |
| | |
# Shared module-level scanner used by process_pdf_file. It is built with the
# default DPI and a default PDFScannerTempManager, so importing this module
# creates the 'tmp' base directory and calls logging.basicConfig.
pdf_scanner = PDFScanner()
| |
|
| |
|
| | |
| | |
| | |
def process_file(input_file: str):
    """Run OCR on a file, dispatching on its detected type.

    Args:
        input_file: path of the file to process

    Returns:
        The OCR result for images and PDFs, or a fixed "unsupported" message
        for every other file type.
    """
    detected_type = get_file_type(input_file)

    if detected_type == "Image":
        return process_image_file(input_file)
    if detected_type == "PDF":
        return process_pdf_file(input_file)
    return "Unsupported file type. Please upload a PDF, or an image file."
| |
|
| |
|
def process_image_file(input_file: str):
    """Process image file with OCR

    The image is base64-encoded and sent to the vision-language model as a
    data URL together with an OCR instruction.

    Args:
        input_file: path of the image file to process

    Returns:
        Whatever ``vlm.get_response`` returns for the OCR request.

    Raises:
        ValueError: if the image file could not be read and encoded.
    """
    # Local import: only this function needs it.
    import mimetypes

    encoded_image = encode_image(input_file)
    if encoded_image is None:
        # encode_image reports failures by returning None; without this check
        # the literal text "None" would be sent as the image payload.
        raise ValueError(f"Could not read image file: {input_file}")

    # Declare the real media type (page images are saved as PNG elsewhere in
    # this module); fall back to the previous hard-coded value when unknown.
    mime_type, _ = mimetypes.guess_type(str(input_file))
    if mime_type is None or not mime_type.startswith("image/"):
        mime_type = "image/jpeg"

    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": (
                        "Could you perform optical character recognition (OCR) on the image? "
                        "Simply return the text without any additional comments. "
                        "The exception would be if the image represents an ID card. "
                        "In such a case, please return the information in a structured format. "
                    )
                },
                {
                    "type": "image_url",
                    "image_url": f"data:{mime_type};base64,{encoded_image}"
                }
            ]
        }
    ]
    return vlm.get_response(messages)
| |
|
| |
|
def process_pdf_file(input_file: str):
    """Process PDF file with OCR

    Args:
        input_file: the PDF file to process with OCR. Either a path (str or
            path-like) or an object exposing the path as ``.name``, such as
            an uploaded-file wrapper.

    Returns:
        the text OCR result, with pages separated by a blank line

    Note:
        Each page of the PDF is processed as an image.
    """
    # A plain str has no `.name`, so resolve paths first and only fall back to
    # the attribute for file-like objects. (pathlib.Path.name would be just
    # the basename, which is why path-likes go through os.fspath.)
    if isinstance(input_file, (str, os.PathLike)):
        pdf_path = os.fspath(input_file)
    else:
        pdf_path = input_file.name

    page_texts = pdf_scanner.pdf_to_text(pdf_path=pdf_path)
    return '\n\n'.join(page_texts)
| |
|
| |
|
| | |
| | |
| | |
def get_file_type(file_path):
    """Classify a file as 'PDF', 'Image', 'PowerPoint' or 'Other'.

    Both the file extension and the MIME type detected from the file's
    content are consulted; a match on either one is enough.

    Args:
        file_path: path of the file to classify

    Returns:
        One of the strings 'PDF', 'Image', 'PowerPoint', 'Other'.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png', '.gif')
    pptx_mime = 'application/vnd.openxmlformats-officedocument.presentationml.presentation'

    # Lower-cased extension, including the leading dot.
    _, suffix = os.path.splitext(file_path)
    suffix = suffix.lower()

    # MIME type as detected by libmagic from the file itself.
    detector = magic.Magic(mime=True)
    detected_mime = detector.from_file(file_path)

    if suffix == '.pdf' or detected_mime == 'application/pdf':
        return 'PDF'
    if suffix in image_suffixes or detected_mime.startswith('image/'):
        return 'Image'
    if suffix == '.pptx' or detected_mime == pptx_mime:
        return 'PowerPoint'
    return 'Other'
| |
|
| | |
| | |
| | |
def encode_image(image_path):
    """Return the file's content as a base64 text string.

    Errors are printed to standard output and reported by returning None
    instead of raising.
    """
    try:
        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
            encoded = base64.b64encode(raw_bytes)
            return encoded.decode('utf-8')
    except FileNotFoundError:
        print(f"Error: The file {image_path} was not found.")
    except Exception as e:
        print(f"Error: {e}")
    # Reached only after one of the handlers above has run.
    return None
| |
|