# app.py
import os
import re
import io
import json
import time
import random
import logging
import subprocess
import sys

import gradio as gr
from PIL import Image, ImageSequence

# Automatically install the google-genai package if it is not available.
# NOTE(review): installing packages at import time is a side effect; pinning the
# dependency in the deployment requirements would be safer.
try:
    from google import genai
    from google.genai import types
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "google-genai"])
    from google import genai
    from google.genai import types

# --------------------------------------------------------------------------------------
# CONFIGURATION
# --------------------------------------------------------------------------------------
MODEL_NAME = "gemini-2.0-flash-thinking-exp-01-21"

# Load API keys from the environment variable "MY_API_KEYS"
api_keys_str = os.environ.get("MY_API_KEYS")
if not api_keys_str:
    raise ValueError("API keys are not set. Please set the MY_API_KEYS environment variable in your Hugging Face Space secrets.")
# Keys are comma separated; drop surrounding whitespace and empty entries so a
# value such as "key1, key2," never yields a blank or padded key.
API_KEYS = [key.strip() for key in api_keys_str.split(",") if key.strip()]
if not API_KEYS:
    raise ValueError("API keys are not set. Please set the MY_API_KEYS environment variable in your Hugging Face Space secrets.")

EXPECTED_FIELDS = [
    "StampType",    # e.g., "Marca da Bollo"
    "StampNumber",  # Unique serial or identification number.
    "IssueDate",    # Date of issuance in YYYY-MM-DD format.
    "Value",        # Monetary value, e.g., "16.00"
    "Barcode",      # Barcode data if present.
]

MAX_SIZE_KB = 15000
QUALITY = 85
MAX_WIDTH = 2048
MAX_HEIGHT = 2048

# --------------------------------------------------------------------------------------
# LOGGING CONFIGURATION
# --------------------------------------------------------------------------------------
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)


# --------------------------------------------------------------------------------------
# FILE CONVERSION FUNCTION
# --------------------------------------------------------------------------------------
def convert_file_to_image(file_path):
    """
    Converts the uploaded file to a PIL Image.

    If the file is a PDF, converts its first page. For DOC/DOCX files, raises
    NotImplementedError. Otherwise, treats the file as an image.

    Args:
        file_path: Path of the uploaded file; the extension selects the branch.

    Returns:
        A PIL Image (first page for PDFs, the opened image otherwise).

    Raises:
        ImportError: The file is a PDF and pdf2image is not installed.
        ValueError: The PDF produced no pages.
        NotImplementedError: The file is a DOC/DOCX document.
    """
    ext = os.path.splitext(file_path)[1].lower()
    if ext == ".pdf":
        try:
            from pdf2image import convert_from_path
        except ImportError as err:
            raise ImportError("pdf2image is not installed. Please install it with 'pip install pdf2image'") from err
        logger.debug("Converting PDF to image (first page).")
        images = convert_from_path(file_path, first_page=1, last_page=1)
        if images:
            return images[0]
        raise ValueError("No pages found in PDF.")
    if ext in (".doc", ".docx"):
        raise NotImplementedError("DOC/DOCX conversion is not implemented. Please convert to PDF or image first.")
    logger.debug("File is an image; opening.")
    return Image.open(file_path)


# --------------------------------------------------------------------------------------
# IMAGE OPTIMIZATION FUNCTIONS
# --------------------------------------------------------------------------------------
def optimize_image_for_gemini(image_file, max_size_kb=MAX_SIZE_KB, quality=QUALITY, max_width=MAX_WIDTH, max_height=MAX_HEIGHT):
    """
    Shrinks and re-encodes an image so it fits within a size budget.

    Args:
        image_file: Anything accepted by ``Image.open`` (path or file object).
        max_size_kb: Upper bound for the encoded result, in kilobytes.
        quality: Accepted for backward compatibility; the fixed compression
            plan below is used and this value is not read.
        max_width: Maximum width of the resized image, in pixels.
        max_height: Maximum height of the resized image, in pixels.

    Returns:
        The encoded image bytes (WEBP or JPEG), or None when the image cannot
        be opened or cannot be compressed below ``max_size_kb``.
    """
    try:
        img = Image.open(image_file)
        logger.debug("Opened image successfully.")
    except Exception as e:
        logger.error(f"Failed to open image: {e}")
        return None

    if getattr(img, "is_animated", False):
        return _optimize_animated(img, max_size_kb, max_width, max_height)

    # Normalise the mode so every encoder sees either RGB or RGBA.
    img = img.convert("RGBA") if 'A' in img.mode else img.convert("RGB")
    img = resize_image(img, max_width, max_height)

    # Tried in order; the first result within the size budget wins.
    compression_plan = [
        ("WEBP", 75, {}),
        ("WEBP", 65, {}),
        ("WEBP", 55, {}),
        ("JPEG", 90, {"optimize": True}),
        ("JPEG", 85, {"optimize": True}),
        ("JPEG", 80, {"optimize": True}),
    ]
    optimized = _attempt_compressions(img, compression_plan, max_size_kb)
    if optimized:
        logger.debug("Image optimized successfully.")
        return optimized
    logger.error("Could not compress image below target size.")
    return None


def _optimize_animated(img, max_size_kb, max_width, max_height):
    """
    Re-encodes an animated image as an animated WEBP within a size budget.

    Only every second frame (even indices) is kept. Each kept frame is
    converted to RGB/RGBA and resized to fit ``max_width`` x ``max_height``.

    Returns:
        The WEBP bytes, or None when there are no frames, saving fails, or the
        result exceeds ``max_size_kb``.
    """
    frames = []
    durations = []
    for i, frame in enumerate(ImageSequence.Iterator(img)):
        if i % 2 != 0:
            continue
        # Read the duration before converting so it reflects the source frame.
        duration = frame.info.get('duration', 40)
        frame = frame.convert("RGBA") if 'A' in frame.mode else frame.convert("RGB")
        frame = resize_image(frame, max_width, max_height)
        frames.append(frame)
        durations.append(duration)

    if not frames:
        logger.error("No frames found in animated image.")
        return None

    output_buffer = io.BytesIO()
    try:
        frames[0].save(
            output_buffer,
            format="WEBP",
            save_all=True,
            append_images=frames[1:],
            duration=durations,
            loop=0,
            lossless=False,
            quality=75,
            method=6,
        )
    except Exception as e:
        logger.error(f"Could not save animated image: {e}")
        return None

    optimized_bytes = output_buffer.getvalue()
    size_kb = len(optimized_bytes) / 1024
    logger.debug(f"Animated image size: {size_kb:.2f} KB")
    if size_kb <= max_size_kb:
        return optimized_bytes
    logger.error("Animated image exceeds target size.")
    return None


def resize_image(img, max_width, max_height):
    """
    Downscales ``img`` to fit inside ``max_width`` x ``max_height``.

    The aspect ratio is preserved and images already within bounds are
    returned unchanged. The binding constraint (the smaller of the two scale
    factors) decides the scale, so the result respects both limits even when
    they differ.
    """
    width, height = img.size
    if width <= max_width and height <= max_height:
        return img

    width_scale = max_width / width
    height_scale = max_height / height
    if width_scale <= height_scale:
        # Width is the binding constraint.
        new_width = max_width
        new_height = max(1, int(height * width_scale))
    else:
        # Height is the binding constraint.
        new_height = max_height
        new_width = max(1, int(width * height_scale))

    img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
    logger.debug(f"Image resized to: {new_width}x{new_height}")
    return img


def _attempt_compressions(img, compression_plan, max_size_kb):
    """
    Tries each ``(format, quality, options)`` step of ``compression_plan``.

    Returns:
        The first encoded result whose size is at most ``max_size_kb``, or
        None when no step meets the budget.
    """
    for fmt, q, options in compression_plan:
        compressed = compress_image(img, fmt, q, options)
        size_kb = len(compressed) / 1024
        logger.debug(f"Compression with {fmt} quality {q}: {size_kb:.2f} KB")
        if size_kb <= max_size_kb:
            return compressed
    return None


def _flatten_alpha(img):
    """Composites an image with an alpha channel onto a white RGB background."""
    rgba = img.convert("RGBA")
    background = Image.new("RGB", rgba.size, (255, 255, 255))
    background.paste(rgba, mask=rgba.getchannel("A"))
    return background


def compress_image(img, format, quality, options=None):
    """
    Encodes ``img`` in memory and returns the resulting bytes.

    Args:
        img: PIL Image to encode.
        format: Pillow format name, e.g. "WEBP" or "JPEG".
        quality: Encoder quality setting.
        options: Optional dict of extra keyword arguments for ``Image.save``.

    Returns:
        The encoded image as bytes.
    """
    save_options = {} if options is None else options
    # JPEG has no alpha channel; flatten first instead of letting save() raise.
    if format.upper() == "JPEG" and 'A' in img.mode:
        img = _flatten_alpha(img)
    output_buffer = io.BytesIO()
    img.save(output_buffer, format=format, quality=quality, **save_options)
    return output_buffer.getvalue()


# --------------------------------------------------------------------------------------
# GEMINI CALL & RESPONSE HANDLING FUNCTIONS
-------------------------------------------------------------------------------------- def flexible_handle_stamp_response(response_text): cleaned_text = response_text.strip() cleaned_text = re.sub(r"^```(json)?", "", cleaned_text, flags=re.IGNORECASE) cleaned_text = re.sub(r"```$", "", cleaned_text).strip() logger.debug(f"Cleaned response: {cleaned_text}") try: data = json.loads(cleaned_text) logger.debug("JSON parsed successfully.") except json.JSONDecodeError as e: logger.error(f"JSON parsing error: {e}. Full response: {response_text}") raise ValueError(f"Failed to parse JSON response: {e}") # Enforce consistent output: flexible_data = {} for key in EXPECTED_FIELDS: flexible_data[key] = data.get(key, None) # Preserve any extra keys returned. for key, value in data.items(): if key not in flexible_data: flexible_data[key] = value logger.debug(f"Final parsed data: {flexible_data}") return flexible_data def call_gemini_stamp_scan(file_path, api_key): client = genai.Client(api_key=api_key) logger.debug("Initialized Gemini client.") # Convert file to image if needed. try: pil_image = convert_file_to_image(file_path) except Exception as e: logger.error(f"File conversion failed: {e}") raise ValueError(f"File conversion failed: {e}") # Save the image to a BytesIO buffer for optimization. buffer = io.BytesIO() pil_image.save(buffer, format="JPEG") buffer.seek(0) image_bytes = optimize_image_for_gemini(buffer) if image_bytes is None: raise ValueError("Image optimization failed.") # Prepare the prompt for Gemini. user_prompt = ( "You are a highly accurate document scanning system specializing in Italian revenue stamps (\"marche da bollo\"). " "Analyze the attached stamp image and extract each key element as a JSON object. 
" "The output MUST include the following fields, with values in a consistent format:\n" " - StampType: The type of stamp (should be 'Marca da Bollo').\n" " - StampNumber: The unique serial/identification number on the stamp.\n" " - IssueDate: The date of issuance in YYYY-MM-DD format.\n" " - Value: The monetary value of the stamp (e.g., '16.00').\n" " - Barcode: The barcode data if present; otherwise null.\n\n" "Do not include any extra commentary. " "If any element is missing or unclear, set its value to null.\n\n" "Example Output Format:\n" "{\n" ' "StampType": "Marca da Bollo",\n' ' "StampNumber": "AB123456789",\n' ' "IssueDate": "2023-05-10",\n' ' "Value": "16.00",\n' ' "Barcode": null\n' "}\n\n" "Process the attached stamp image and return the JSON with the appropriate labels." ) contents = [ types.Content(role="user", parts=[types.Part(text=user_prompt)]), types.Content( role="user", parts=[ types.Part(text="STAMP IMAGE:"), types.Part.from_bytes(data=image_bytes, mime_type="image/jpeg") ] ) ] logger.debug("Sending request to Gemini.") response = client.models.generate_content( model=MODEL_NAME, contents=contents, config=types.GenerateContentConfig(temperature=0.2) ) logger.debug("Received response from Gemini.") return flexible_handle_stamp_response(response.text) def stamp_scan_gradio(file_path): try: key = random.choice(API_KEYS) result = call_gemini_stamp_scan(file_path, key) return result except Exception as e: logger.error(f"Stamp scan failed: {e}") return {"Error": str(e)} # -------------------------------------------------------------------------------------- # GRADIO APP WITH UPDATED LAYOUT, RESET BUTTON, AND FILE CONVERSION SUPPORT # -------------------------------------------------------------------------------------- with gr.Blocks(title="Marca da Bollo Scanner") as demo: gr.HTML("