| import os |
| import fitz |
| import pytesseract |
| import easyocr |
| import numpy as np |
| from pdf2image import convert_from_path |
| from PIL import Image |
| from abc import ABC, abstractmethod |
|
|
| from paddleocr import PaddleOCR |
| from utils import measure_time |
|
|
|
|
# Location of the Tesseract executable used by pytesseract. Defaults to the
# usual system install path; set the TESSERACT_CMD environment variable to
# point at a different binary without editing this module.
pytesseract.pytesseract.tesseract_cmd = os.environ.get(
    "TESSERACT_CMD", "/usr/bin/tesseract"
)
|
|
|
|
|
|
|
|
|
|
|
|
| |
class FileProcessor(ABC):
    """Abstract base class for processors that extract text from a file."""

    @abstractmethod
    def extract_text(self, file_path, reader=None):
        """Extract and return the text contained in ``file_path``.

        Args:
            file_path: Path of the file to process.
            reader: OCR engine used to recognise text in image content.
                Optional here for backward compatibility; the concrete
                processors in this module receive it as their second
                positional argument and call ``reader.ocr(...)`` on it.

        Returns:
            The extracted text. This base implementation does nothing and
            returns ``None``; subclasses must override it.
        """
|
|
|
|
| |
class PDFProcessor(FileProcessor):
    """Extract text from PDFs by combining the text layer with OCR output."""

    def extract_text(self, pdf_path, reader):
        """Return the text of every page in ``pdf_path``.

        For each page the embedded text layer is read through ``fitz``. When
        the page also embeds images, that page is rasterised with
        ``convert_from_path`` and handed to ``reader.ocr`` so that text inside
        the images is captured as well.

        Args:
            pdf_path: Path of the PDF file to read.
            reader: OCR engine exposing ``ocr(image, cls=True)``. Its result
                is read as one list per image whose entries carry the
                recognised string at ``entry[1][0]``.

        Returns:
            The per-page texts separated by blank lines, or the string
            ``"No text found in PDF."`` when nothing could be extracted.
        """
        page_texts = []
        # The context manager closes the document even if OCR raises.
        with fitz.open(pdf_path) as doc:
            for page_num, page in enumerate(doc, start=1):
                page_text = page.get_text("text").strip()
                ocr_text = ""
                # Only rasterise and OCR pages that actually embed images.
                if page.get_images(full=True):
                    rendered = convert_from_path(
                        pdf_path, first_page=page_num, last_page=page_num
                    )
                    ocr_text = "".join(
                        self._ocr_image(np.array(img), reader)
                        for img in rendered
                    )
                page_texts.append(f"{page_text}\n{ocr_text}".strip())

        # Strip before testing so a PDF whose pages are all empty yields the
        # fallback message instead of an empty string.
        text = "\n\n".join(page_texts).strip()
        return text or "No text found in PDF."

    @staticmethod
    def _ocr_image(image, reader):
        """Run OCR on one image; return its text plus a newline, or ``""``."""
        result = reader.ocr(image, cls=True)
        # NOTE(review): some PaddleOCR versions appear to return None or
        # [None] when no text is detected; skip those entries instead of
        # iterating over None.
        fragments = [
            line[1][0] for res in (result or []) if res for line in res
        ]
        return " ".join(fragments) + "\n" if fragments else ""
|
|
|
|
| |
class ImageProcessor(FileProcessor):
    """Extract text from a single image file via OCR."""

    def extract_text(self, image_path, reader):
        """Return the text recognised in the image at ``image_path``.

        Args:
            image_path: Path of the image; passed unchanged to ``reader.ocr``.
            reader: OCR engine exposing ``ocr(image, cls=True)``. Its result
                is read as one list per image whose entries carry the
                recognised string at ``entry[1][0]``.

        Returns:
            The recognised fragments joined by single spaces, or an empty
            string when the engine reports no text.
        """
        print("Single Image")
        text_ocr = reader.ocr(image_path, cls=True)
        # NOTE(review): some PaddleOCR versions appear to return None or
        # [None] when no text is detected; skip those entries instead of
        # raising TypeError while iterating over None.
        return " ".join(
            line[1][0] for res in (text_ocr or []) if res for line in res
        )
| |
| |
|
|
|
|
| |
class FileProcessorFactory:
    """Select the processor responsible for a file from its extension."""

    # Lower-case extension (including the leading dot) -> processor instance.
    _processors = {
        ".pdf": PDFProcessor(),
        **{suffix: ImageProcessor() for suffix in (".png", ".jpg", ".jpeg")},
    }

    @classmethod
    def get_processor(cls, file_path):
        """Return the processor registered for the extension of ``file_path``.

        The extension is lower-cased before the lookup, so matching is
        case-insensitive. ``None`` is returned for unregistered extensions.
        """
        _, extension = os.path.splitext(file_path)
        return cls._processors.get(extension.lower())
|
|
|
|
| |
@measure_time
def read_file(file_path, reader):
    """Extract text from ``file_path`` using the processor for its extension.

    Args:
        file_path: Path of the file to read; its extension selects the
            processor via ``FileProcessorFactory.get_processor``.
        reader: OCR engine forwarded to the processor's ``extract_text``.

    Returns:
        The text produced by the selected processor, or an
        ``"Unsupported file format: ..."`` message when no processor is
        registered for the extension.
    """
    handler = FileProcessorFactory.get_processor(file_path)
    if not handler:
        return f"Unsupported file format: {file_path}"
    return handler.extract_text(file_path, reader)
|
|
|
|