Spaces:
Sleeping
Sleeping
| # app.py | |
| import os | |
| import re | |
| import io | |
| import json | |
| import time | |
| import random | |
| import logging | |
| import subprocess | |
| import sys | |
| import gradio as gr | |
| from PIL import Image, ImageSequence | |
| # Automatically install the google-genai package if it is not available. | |
| try: | |
| from google import genai | |
| from google.genai import types | |
| except ImportError: | |
| subprocess.check_call([sys.executable, "-m", "pip", "install", "google-genai"]) | |
| from google import genai | |
| from google.genai import types | |
| # -------------------------------------------------------------------------------------- | |
| # CONFIGURATION | |
| # -------------------------------------------------------------------------------------- | |
# Gemini model used for every stamp-scan request.
MODEL_NAME = "gemini-2.0-flash-thinking-exp-01-21"

# Load API keys from the environment variable "MY_API_KEYS" (comma separated).
api_keys_str = os.environ.get("MY_API_KEYS")
if not api_keys_str:
    raise ValueError("API keys are not set. Please set the MY_API_KEYS environment variable in your Hugging Face Space secrets.")

# Strip whitespace around each key and drop empty entries so that values such
# as "key1, key2," do not yield " key2" or "" as a candidate key.
API_KEYS = [key.strip() for key in api_keys_str.split(",") if key.strip()]
if not API_KEYS:
    # The variable was set but contained only separators/whitespace.
    raise ValueError("API keys are not set. Please set the MY_API_KEYS environment variable in your Hugging Face Space secrets.")
# Keys that every parsed stamp result is guaranteed to contain, in display order.
EXPECTED_FIELDS = [
    # e.g., "Marca da Bollo"
    "StampType",
    # Unique serial or identification number.
    "StampNumber",
    # Date of issuance in YYYY-MM-DD format.
    "IssueDate",
    # Monetary value, e.g., "16.00"
    "Value",
    # Barcode data if present.
    "Barcode",
]

# Defaults for optimize_image_for_gemini: size ceiling of the encoded image
# (in KB), nominal quality, and the bounding box the image is scaled into.
MAX_SIZE_KB = 15000
QUALITY = 85
MAX_WIDTH = 2048
MAX_HEIGHT = 2048
| # -------------------------------------------------------------------------------------- | |
| # LOGGING CONFIGURATION | |
| # -------------------------------------------------------------------------------------- | |
# Configure the root logger for timestamped, level-tagged DEBUG output, then
# create the module-level logger used throughout this file.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
| # -------------------------------------------------------------------------------------- | |
| # FILE CONVERSION FUNCTION | |
| # -------------------------------------------------------------------------------------- | |
def convert_file_to_image(file_path):
    """
    Convert the uploaded file to a PIL Image.

    The file type is chosen from the (case-insensitive) file extension:
    - ".pdf": only the first page is rendered, via pdf2image.
    - ".doc" / ".docx": not supported.
    - anything else: opened directly with PIL.

    Args:
        file_path: Path of the uploaded file.

    Returns:
        A PIL Image for the first PDF page or for the image file.

    Raises:
        NotImplementedError: For DOC/DOCX files.
        ImportError: If a PDF is given but pdf2image is not installed.
        ValueError: If the PDF renders to no pages.
    """
    ext = os.path.splitext(file_path)[1].lower()

    if ext in (".doc", ".docx"):
        raise NotImplementedError("DOC/DOCX conversion is not implemented. Please convert to PDF or image first.")

    if ext == ".pdf":
        # Imported lazily so image-only deployments do not need pdf2image.
        try:
            from pdf2image import convert_from_path
        except ImportError as err:
            # Chain the original error so the real import failure stays visible.
            raise ImportError("pdf2image is not installed. Please install it with 'pip install pdf2image'") from err
        logger.debug("Converting PDF to image (first page).")
        images = convert_from_path(file_path, first_page=1, last_page=1)
        if not images:
            raise ValueError("No pages found in PDF.")
        return images[0]

    logger.debug("File is an image; opening.")
    return Image.open(file_path)
| # -------------------------------------------------------------------------------------- | |
| # IMAGE OPTIMIZATION FUNCTIONS | |
| # -------------------------------------------------------------------------------------- | |
def optimize_image_for_gemini(image_file, max_size_kb=MAX_SIZE_KB, quality=QUALITY, max_width=MAX_WIDTH, max_height=MAX_HEIGHT):
    """
    Shrink and re-encode an image so it fits within a size budget.

    Animated images are delegated to _optimize_animated. Still images are
    normalised to RGB/RGBA, scaled into the max_width x max_height box and
    then encoded with a fixed list of WEBP/JPEG settings; the first encoding
    that is small enough wins.

    Args:
        image_file: Path or file-like object accepted by PIL's Image.open.
        max_size_kb: Upper bound for the encoded size, in kilobytes.
        quality: Accepted for backward compatibility; not used, because the
            compression plan below fixes the qualities that are tried.
        max_width: Maximum width of the output image in pixels.
        max_height: Maximum height of the output image in pixels.

    Returns:
        The encoded image bytes, or None if the image could not be opened,
        decoded, encoded, or compressed below max_size_kb.
    """
    try:
        img = Image.open(image_file)
        logger.debug("Opened image successfully.")
    except Exception as e:
        logger.error(f"Failed to open image: {e}")
        return None

    if getattr(img, "is_animated", False):
        return _optimize_animated(img, max_size_kb, max_width, max_height)

    # Image.open is lazy: pixel data is only decoded by convert()/resize(), so
    # a truncated or corrupt file fails here rather than above. Keep the
    # "log and return None" contract for those failures as well.
    try:
        img = img.convert("RGBA" if 'A' in img.mode else "RGB")
        img = resize_image(img, max_width, max_height)
    except Exception as e:
        logger.error(f"Failed to decode image: {e}")
        return None

    # Tried in order; WEBP first, JPEG as the fallback.
    compression_plan = [
        ("WEBP", 75, {}),
        ("WEBP", 65, {}),
        ("WEBP", 55, {}),
        ("JPEG", 90, {"optimize": True}),
        ("JPEG", 85, {"optimize": True}),
        ("JPEG", 80, {"optimize": True}),
    ]
    try:
        optimized = _attempt_compressions(img, compression_plan, max_size_kb)
    except Exception as e:
        # An encoder error must not escape as an exception; callers only
        # check for None.
        logger.error(f"Image compression failed: {e}")
        return None

    if optimized:
        logger.debug("Image optimized successfully.")
        return optimized
    logger.error("Could not compress image below target size.")
    return None
def _optimize_animated(img, max_size_kb, max_width, max_height):
    """
    Re-encode an animated image as an animated WEBP within a size budget.

    Only every other frame (indices 0, 2, 4, ...) is kept. The display time of
    each dropped frame is added to the preceding kept frame so the animation
    keeps its original overall timing instead of playing about twice as fast.

    Args:
        img: An opened animated PIL image.
        max_size_kb: Upper bound for the encoded size, in kilobytes.
        max_width: Maximum frame width in pixels.
        max_height: Maximum frame height in pixels.

    Returns:
        The animated WEBP bytes, or None if there are no frames, saving fails,
        or the result exceeds max_size_kb.
    """
    frames = []
    durations = []
    for i, frame in enumerate(ImageSequence.Iterator(img)):
        # Frames without timing information default to 40 ms.
        frame_duration = frame.info.get('duration', 40)
        if i % 2:
            # Dropped frame: an odd index always follows a kept even index,
            # so durations is never empty here.
            durations[-1] += frame_duration
            continue
        kept = frame.convert("RGBA") if 'A' in frame.mode else frame.convert("RGB")
        frames.append(resize_image(kept, max_width, max_height))
        durations.append(frame_duration)

    if not frames:
        logger.error("No frames found in animated image.")
        return None

    output_buffer = io.BytesIO()
    try:
        frames[0].save(
            output_buffer,
            format="WEBP",
            save_all=True,
            append_images=frames[1:],
            duration=durations,
            loop=0,
            lossless=False,
            quality=75,
            method=6,
        )
    except Exception as e:
        logger.error(f"Could not save animated image: {e}")
        return None

    optimized_bytes = output_buffer.getvalue()
    size_kb = len(optimized_bytes) / 1024
    logger.debug(f"Animated image size: {size_kb:.2f} KB")
    if size_kb > max_size_kb:
        logger.error("Animated image exceeds target size.")
        return None
    return optimized_bytes
def compute_fit_size(width, height, max_width, max_height):
    """
    Compute the size of an image after fitting it inside a bounding box.

    The aspect ratio is preserved and images are never enlarged.

    Args:
        width: Current width in pixels.
        height: Current height in pixels.
        max_width: Width of the bounding box.
        max_height: Height of the bounding box.

    Returns:
        A (new_width, new_height) tuple; equal to (width, height) when the
        image already fits.
    """
    if width <= max_width and height <= max_height:
        return width, height
    # Cross-multiplied comparison of the two scale factors
    # (max_width / width vs. max_height / height): the smaller factor belongs
    # to the limiting side. Comparing width to height alone is only correct
    # for a square box.
    if width * max_height >= height * max_width:
        new_width = max_width
        new_height = int(height * (max_width / width))
    else:
        new_height = max_height
        new_width = int(width * (max_height / height))
    # Extreme aspect ratios can truncate the short side to 0, which PIL
    # rejects; keep at least one pixel.
    return max(1, new_width), max(1, new_height)


def resize_image(img, max_width, max_height):
    """
    Downscale an image so it fits within max_width x max_height.

    Args:
        img: Image object exposing .size and .resize (a PIL Image).
        max_width: Maximum allowed width in pixels.
        max_height: Maximum allowed height in pixels.

    Returns:
        The same image object if it already fits, otherwise a LANCZOS-resampled
        copy with the aspect ratio preserved.
    """
    width, height = img.size
    if width <= max_width and height <= max_height:
        return img
    new_width, new_height = compute_fit_size(width, height, max_width, max_height)
    img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
    logger.debug(f"Image resized to: {new_width}x{new_height}")
    return img
def _attempt_compressions(img, compression_plan, max_size_kb):
    """
    Try each encoding in compression_plan until one fits the size budget.

    Args:
        img: Image to encode.
        compression_plan: Iterable of (format, quality, options) tuples, tried
            in order.
        max_size_kb: Upper bound for the encoded size, in kilobytes.

    Returns:
        The bytes of the first encoding whose size is <= max_size_kb, or None
        if every candidate is too large or fails to encode.
    """
    for fmt, q, options in compression_plan:
        try:
            compressed = compress_image(img, fmt, q, options)
        except (OSError, KeyError, ValueError) as e:
            # One encoder failing (e.g. a format that cannot store this image
            # mode) should not abort the plan; move on to the next candidate.
            logger.warning(f"Compression with {fmt} quality {q} failed: {e}")
            continue
        size_kb = len(compressed) / 1024
        logger.debug(f"Compression with {fmt} quality {q}: {size_kb:.2f} KB")
        if size_kb <= max_size_kb:
            return compressed
    return None
def compress_image(img, format, quality, options=None):
    """
    Encode an image into an in-memory byte string.

    Args:
        img: Image object exposing .save (a PIL Image).
        format: Target format name passed to img.save, e.g. "WEBP" or "JPEG".
        quality: Encoder quality passed to img.save.
        options: Optional dict of extra keyword arguments for img.save.

    Returns:
        The encoded image as bytes.
    """
    # A None default avoids sharing one mutable dict between calls.
    save_kwargs = dict(options) if options else {}

    # JPEG has no alpha channel and PIL refuses to write such modes, so the
    # alpha channel is dropped for JPEG output only.
    if format.upper() == "JPEG" and "A" in getattr(img, "mode", ""):
        img = img.convert("RGB")

    output_buffer = io.BytesIO()
    img.save(output_buffer, format=format, quality=quality, **save_kwargs)
    return output_buffer.getvalue()
| # -------------------------------------------------------------------------------------- | |
| # GEMINI CALL & RESPONSE HANDLING FUNCTIONS | |
| # -------------------------------------------------------------------------------------- | |
def flexible_handle_stamp_response(response_text):
    """
    Parse the model's reply into a dict with a stable set of keys.

    A surrounding Markdown code fence (``` or ```json) is stripped before
    parsing. If the remaining text is not valid JSON as a whole, the span from
    the first "{" to the last "}" is tried, which tolerates commentary placed
    around the JSON object.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        A dict holding every key of EXPECTED_FIELDS (None when absent from the
        reply), followed by any extra keys the reply contained.

    Raises:
        ValueError: If the reply is empty, is not valid JSON, or is valid JSON
            but not an object.
    """
    if not response_text or not response_text.strip():
        logger.error("JSON parsing error: empty response.")
        raise ValueError("Failed to parse JSON response: empty response")

    cleaned_text = response_text.strip()
    cleaned_text = re.sub(r"^```(json)?", "", cleaned_text, flags=re.IGNORECASE)
    cleaned_text = re.sub(r"```$", "", cleaned_text).strip()
    logger.debug(f"Cleaned response: {cleaned_text}")

    try:
        data = json.loads(cleaned_text)
    except json.JSONDecodeError as e:
        # Fallback: pull the outermost {...} span out of surrounding prose.
        start = cleaned_text.find("{")
        end = cleaned_text.rfind("}")
        if start == -1 or end <= start:
            logger.error(f"JSON parsing error: {e}. Full response: {response_text}")
            raise ValueError(f"Failed to parse JSON response: {e}") from e
        try:
            data = json.loads(cleaned_text[start:end + 1])
        except json.JSONDecodeError as inner:
            logger.error(f"JSON parsing error: {inner}. Full response: {response_text}")
            raise ValueError(f"Failed to parse JSON response: {inner}") from inner
    logger.debug("JSON parsed successfully.")

    if not isinstance(data, dict):
        # A list or scalar has no .get/.items; report it as a parse failure.
        logger.error(f"JSON parsing error: expected an object. Full response: {response_text}")
        raise ValueError(
            f"Failed to parse JSON response: expected a JSON object, got {type(data).__name__}"
        )

    # Enforce consistent output: expected keys first (defaulting to None),
    # then any extra keys returned, in the order the reply listed them.
    flexible_data = {key: data.get(key, None) for key in EXPECTED_FIELDS}
    for key, value in data.items():
        if key not in flexible_data:
            flexible_data[key] = value
    logger.debug(f"Final parsed data: {flexible_data}")
    return flexible_data
def detect_image_mime_type(image_bytes, default="image/jpeg"):
    """
    Identify the MIME type of encoded image bytes from their magic number.

    Args:
        image_bytes: Encoded image data.
        default: Value returned when the signature is not recognised.

    Returns:
        "image/webp", "image/jpeg" or "image/png" for those signatures,
        otherwise default.
    """
    if image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
        return "image/webp"
    if image_bytes[:3] == b"\xff\xd8\xff":
        return "image/jpeg"
    if image_bytes[:8] == b"\x89PNG\r\n\x1a\n":
        return "image/png"
    return default


def call_gemini_stamp_scan(file_path, api_key):
    """
    Send a stamp image to Gemini and return the extracted fields.

    The file is converted to an image, optimized with
    optimize_image_for_gemini, and sent together with an extraction prompt.
    The reply is parsed by flexible_handle_stamp_response.

    Args:
        file_path: Path of the uploaded file (image or PDF).
        api_key: Gemini API key used for this request.

    Returns:
        The dict produced by flexible_handle_stamp_response.

    Raises:
        ValueError: If file conversion or image optimization fails, if the
            model returns no text, or if the reply cannot be parsed.
    """
    client = genai.Client(api_key=api_key)
    logger.debug("Initialized Gemini client.")

    # Convert file to image if needed.
    try:
        pil_image = convert_file_to_image(file_path)
    except Exception as e:
        logger.error(f"File conversion failed: {e}")
        raise ValueError(f"File conversion failed: {e}") from e

    # Save the image to a BytesIO buffer for optimization. PNG is used for
    # this intermediate copy because it is lossless and can store alpha and
    # palette images, which a JPEG save would reject. Modes outside the list
    # below are converted to RGB first.
    buffer = io.BytesIO()
    try:
        if pil_image.mode in ("1", "L", "LA", "P", "RGB", "RGBA"):
            exportable = pil_image
        else:
            exportable = pil_image.convert("RGB")
        # Low compression level: the buffer is re-encoded right away, so
        # speed matters more than size here.
        exportable.save(buffer, format="PNG", compress_level=1)
    finally:
        # Release the file handle held by a lazily opened image.
        pil_image.close()
    buffer.seek(0)

    image_bytes = optimize_image_for_gemini(buffer)
    if image_bytes is None:
        raise ValueError("Image optimization failed.")

    # Prepare the prompt for Gemini.
    user_prompt = (
        "You are a highly accurate document scanning system specializing in Italian revenue stamps (\"marche da bollo\"). "
        "Analyze the attached stamp image and extract each key element as a JSON object. "
        "The output MUST include the following fields, with values in a consistent format:\n"
        " - StampType: The type of stamp (should be 'Marca da Bollo').\n"
        " - StampNumber: The unique serial/identification number on the stamp.\n"
        " - IssueDate: The date of issuance in YYYY-MM-DD format.\n"
        " - Value: The monetary value of the stamp (e.g., '16.00').\n"
        " - Barcode: The barcode data if present; otherwise null.\n\n"
        "Do not include any extra commentary. "
        "If any element is missing or unclear, set its value to null.\n\n"
        "Example Output Format:\n"
        "{\n"
        ' "StampType": "Marca da Bollo",\n'
        ' "StampNumber": "AB123456789",\n'
        ' "IssueDate": "2023-05-10",\n'
        ' "Value": "16.00",\n'
        ' "Barcode": null\n'
        "}\n\n"
        "Process the attached stamp image and return the JSON with the appropriate labels."
    )

    # The optimizer may return WEBP or JPEG, so the MIME type is taken from
    # the bytes themselves rather than assumed.
    mime_type = detect_image_mime_type(image_bytes)
    contents = [
        types.Content(role="user", parts=[types.Part(text=user_prompt)]),
        types.Content(
            role="user",
            parts=[
                types.Part(text="STAMP IMAGE:"),
                types.Part.from_bytes(data=image_bytes, mime_type=mime_type),
            ],
        ),
    ]

    logger.debug("Sending request to Gemini.")
    response = client.models.generate_content(
        model=MODEL_NAME,
        contents=contents,
        config=types.GenerateContentConfig(temperature=0.2),
    )
    logger.debug("Received response from Gemini.")

    response_text = response.text
    if not response_text:
        # Fail with a clear message instead of an AttributeError in the parser.
        raise ValueError("Gemini returned an empty response.")
    return flexible_handle_stamp_response(response_text)
def stamp_scan_gradio(file_path):
    """
    Run a stamp scan with a randomly chosen API key.

    Any exception raised during the scan is logged and reported to the caller
    as a dict of the form {"Error": <message>} instead of being propagated.

    Args:
        file_path: Path of the uploaded file.

    Returns:
        The parsed stamp data on success, otherwise {"Error": <message>}.
    """
    try:
        return call_gemini_stamp_scan(file_path, random.choice(API_KEYS))
    except Exception as scan_error:
        logger.error(f"Stamp scan failed: {scan_error}")
        return {"Error": str(scan_error)}
| # -------------------------------------------------------------------------------------- | |
| # GRADIO APP WITH UPDATED LAYOUT, RESET BUTTON, AND FILE CONVERSION SUPPORT | |
| # -------------------------------------------------------------------------------------- | |
# Two-column layout: upload and action buttons on the left, one editable
# textbox (with a copy button) per expected field on the right.
with gr.Blocks(title="Marca da Bollo Scanner") as demo:
    gr.HTML("<h2>Marca da Bollo Scanner</h2>")
    with gr.Row():
        with gr.Column(scale=1):
            # File input (supports image, PDF, DOC)
            file_input = gr.File(label="Upload Stamp (Image, PDF, DOC)", file_count="single")
            scan_button = gr.Button("Scan Stamp")
            reset_button = gr.Button("Start with a new Stamp")
        with gr.Column(scale=1):
            outputs = {}  # Dictionary to hold output textboxes for each field
            for field in EXPECTED_FIELDS:
                with gr.Row():
                    tb = gr.Textbox(label=field, interactive=True, elem_id=f"text_{field}")
                    # Custom copy button (HTML) to silently copy the textbox's value.
                    copy_html = f"""
                    <button onclick="
                    var container = document.getElementById('text_{field}');
                    var textarea = container.querySelector('textarea');
                    if(textarea) {{
                    navigator.clipboard.writeText(textarea.value);
                    console.log('Copied: ' + textarea.value);
                    }}
                    ">Copy</button>
                    """
                    gr.HTML(copy_html)
                    outputs[field] = tb

    def process_and_format(file_obj):
        """
        Scan the uploaded file and return one textbox value per expected field.

        Returns empty strings when no file is uploaded. A failed scan is shown
        to the user as a Gradio error instead of silently blank fields.
        """
        if file_obj is None:
            return ["" for _ in EXPECTED_FIELDS]
        # NOTE(review): depending on the Gradio version the File component
        # appears to hand over either a path string or a tempfile-like object
        # exposing .name; both are accepted here.
        file_path = file_obj if isinstance(file_obj, str) else file_obj.name
        result = stamp_scan_gradio(file_path)
        if "Error" in result:
            raise gr.Error(str(result["Error"]))
        # Fields the model reported as null arrive as None; show them as empty.
        return [
            "" if result.get(field) is None else str(result[field])
            for field in EXPECTED_FIELDS
        ]

    def reset_ui():
        """Clear the file input and every output textbox."""
        # Return None for file input and empty strings for each output textbox.
        return [None] + ["" for _ in EXPECTED_FIELDS]

    scan_button.click(
        process_and_format,
        inputs=file_input,
        outputs=[outputs[field] for field in EXPECTED_FIELDS],
    )
    reset_button.click(
        reset_ui,
        inputs=None,
        outputs=[file_input] + [outputs[field] for field in EXPECTED_FIELDS],
    )

if __name__ == "__main__":
    demo.launch(share=True)