Bollo_Reader / app.py
fsdfdsdf's picture
Update app.py
44d8834 verified
# app.py
import os
import re
import io
import json
import time
import random
import logging
import subprocess
import sys
import gradio as gr
from PIL import Image, ImageSequence
# Automatically install the google-genai package if it is not available.
try:
from google import genai
from google.genai import types
except ImportError:
subprocess.check_call([sys.executable, "-m", "pip", "install", "google-genai"])
from google import genai
from google.genai import types
# --------------------------------------------------------------------------------------
# CONFIGURATION
# --------------------------------------------------------------------------------------
# Gemini model used for every scan request.
MODEL_NAME = "gemini-2.0-flash-thinking-exp-01-21"

# Load API keys from the environment variable "MY_API_KEYS" (comma separated).
# Surrounding whitespace is stripped and empty entries are dropped, so values
# such as "key1, key2," do not produce a key with a stray space or an empty key.
api_keys_str = os.environ.get("MY_API_KEYS")
API_KEYS = [key.strip() for key in (api_keys_str or "").split(",") if key.strip()]
if not API_KEYS:
    raise ValueError("API keys are not set. Please set the MY_API_KEYS environment variable in your Hugging Face Space secrets.")

# Fields every parsed scan result is normalised to, in display order.
EXPECTED_FIELDS = [
    "StampType",    # e.g., "Marca da Bollo"
    "StampNumber",  # Unique serial or identification number.
    "IssueDate",    # Date of issuance in YYYY-MM-DD format.
    "Value",        # Monetary value, e.g., "16.00"
    "Barcode",      # Barcode data if present.
]

# Default arguments of optimize_image_for_gemini().
MAX_SIZE_KB = 15000  # Upper bound for the encoded image, in kilobytes.
QUALITY = 85         # Default of the `quality` parameter.
MAX_WIDTH = 2048     # Images wider than this are downscaled.
MAX_HEIGHT = 2048    # Images taller than this are downscaled.
# --------------------------------------------------------------------------------------
# LOGGING CONFIGURATION
# --------------------------------------------------------------------------------------
# Root logging: DEBUG and above, each record prefixed with timestamp and level.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG)

# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
# --------------------------------------------------------------------------------------
# FILE CONVERSION FUNCTION
# --------------------------------------------------------------------------------------
def convert_file_to_image(file_path):
    """
    Convert the uploaded file to a PIL Image.

    The file extension (compared case-insensitively) selects the strategy:
    a PDF is rendered with pdf2image and only its first page is returned,
    DOC/DOCX files are rejected, and anything else is opened as an image.

    Args:
        file_path: Path of the uploaded file.

    Returns:
        The PIL Image for the file (the first page for a PDF).

    Raises:
        ImportError: The file is a PDF and pdf2image cannot be imported.
        ValueError: pdf2image returned no pages for the PDF.
        NotImplementedError: The file is a DOC/DOCX document.
    """
    ext = os.path.splitext(file_path)[1].lower()

    if ext in (".doc", ".docx"):
        raise NotImplementedError("DOC/DOCX conversion is not implemented. Please convert to PDF or image first.")

    if ext != ".pdf":
        logger.debug("File is an image; opening.")
        return Image.open(file_path)

    # pdf2image is an optional dependency, imported only when a PDF arrives.
    try:
        from pdf2image import convert_from_path
    except ImportError as err:
        # Chain the original error so the real import failure stays visible.
        raise ImportError("pdf2image is not installed. Please install it with 'pip install pdf2image'") from err

    logger.debug("Converting PDF to image (first page).")
    images = convert_from_path(file_path, first_page=1, last_page=1)
    if not images:
        raise ValueError("No pages found in PDF.")
    return images[0]
# --------------------------------------------------------------------------------------
# IMAGE OPTIMIZATION FUNCTIONS
# --------------------------------------------------------------------------------------
def optimize_image_for_gemini(image_file, max_size_kb=MAX_SIZE_KB, quality=QUALITY, max_width=MAX_WIDTH, max_height=MAX_HEIGHT):
    """
    Re-encode an image so that it fits under a size budget.

    Animated images are delegated to ``_optimize_animated``. Still images are
    normalised to RGBA (when the mode has an alpha band) or RGB, downscaled
    with ``resize_image`` and then encoded with progressively stronger
    settings (WEBP 75/65/55, then JPEG 90/85/80) until one fits.

    Args:
        image_file: Anything ``Image.open`` accepts (path or file object).
        max_size_kb: Largest acceptable encoded size, in kilobytes.
        quality: Accepted for interface compatibility; this function does not
            read it (the qualities come from the fixed plan above).
        max_width: Maximum width in pixels after resizing.
        max_height: Maximum height in pixels after resizing.

    Returns:
        The encoded image bytes, or None when the image cannot be opened or
        no attempt fits within ``max_size_kb``.
    """
    try:
        source = Image.open(image_file)
    except Exception as e:
        logger.error(f"Failed to open image: {e}")
        return None
    else:
        logger.debug("Opened image successfully.")

    if getattr(source, "is_animated", False):
        return _optimize_animated(source, max_size_kb, max_width, max_height)

    target_mode = "RGBA" if 'A' in source.mode else "RGB"
    still = resize_image(source.convert(target_mode), max_width, max_height)

    # Ordered from the preferred encoding to the last resort.
    plan = [("WEBP", level, {}) for level in (75, 65, 55)]
    plan += [("JPEG", level, {"optimize": True}) for level in (90, 85, 80)]

    payload = _attempt_compressions(still, plan, max_size_kb)
    if not payload:
        logger.error("Could not compress image below target size.")
        return None
    logger.debug("Image optimized successfully.")
    return payload
def _optimize_animated(img, max_size_kb, max_width, max_height):
    """
    Re-encode an animated image as an animated WebP under a size budget.

    To reduce size only every second frame is kept. The display time of each
    dropped frame is added to the kept frame before it, so the animation's
    total playback time is preserved instead of being roughly halved.

    Args:
        img: An opened animated PIL image.
        max_size_kb: Largest acceptable encoded size, in kilobytes.
        max_width: Maximum frame width in pixels.
        max_height: Maximum frame height in pixels.

    Returns:
        The animated WebP bytes, or None when there are no frames, saving
        fails, or the result is larger than ``max_size_kb``.
    """
    frames = []
    durations = []
    for index, frame in enumerate(ImageSequence.Iterator(img)):
        # 40 ms is the fallback when the frame carries no duration.
        duration = frame.info.get('duration', 40)
        if index % 2:
            # Odd frames are dropped; index 0 is always kept, so a previous
            # kept frame exists to absorb this frame's display time.
            durations[-1] += duration
            continue
        mode = "RGBA" if 'A' in frame.mode else "RGB"
        frames.append(resize_image(frame.convert(mode), max_width, max_height))
        durations.append(duration)

    if not frames:
        logger.error("No frames found in animated image.")
        return None

    output_buffer = io.BytesIO()
    try:
        frames[0].save(
            output_buffer,
            format="WEBP",
            save_all=True,
            append_images=frames[1:],
            duration=durations,
            loop=0,
            lossless=False,
            quality=75,
            method=6,
        )
    except Exception as e:
        logger.error(f"Could not save animated image: {e}")
        return None

    optimized_bytes = output_buffer.getvalue()
    size_kb = len(optimized_bytes) / 1024
    logger.debug(f"Animated image size: {size_kb:.2f} KB")
    if size_kb > max_size_kb:
        logger.error("Animated image exceeds target size.")
        return None
    return optimized_bytes
def resize_image(img, max_width, max_height):
    """
    Downscale *img* so it fits inside ``max_width`` x ``max_height``.

    The aspect ratio is kept. The side that needs the stronger reduction is
    pinned to its limit and the other side is scaled by the same ratio, so the
    result fits both bounds even when they are not square. Neither side is
    ever rounded down to zero pixels.

    Args:
        img: Image exposing ``size`` and ``resize`` (a PIL Image).
        max_width: Maximum allowed width in pixels.
        max_height: Maximum allowed height in pixels.

    Returns:
        The original image when it already fits, otherwise a resized copy.
    """
    width, height = img.size
    if width <= max_width and height <= max_height:
        return img

    width_ratio = max_width / width
    height_ratio = max_height / height
    if width_ratio <= height_ratio:
        # Width is the limiting side.
        new_width = max_width
        new_height = max(1, int(height * width_ratio))
    else:
        # Height is the limiting side.
        new_height = max_height
        new_width = max(1, int(width * height_ratio))

    img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
    logger.debug(f"Image resized to: {new_width}x{new_height}")
    return img
def _attempt_compressions(img, compression_plan, max_size_kb):
    """
    Try each step of the compression plan and return the first that fits.

    Args:
        img: Image to encode.
        compression_plan: Iterable of ``(format, quality, options)`` tuples,
            tried in order.
        max_size_kb: Largest acceptable encoded size, in kilobytes.

    Returns:
        The bytes of the first encoding whose size is at most
        ``max_size_kb``, or None when every step is too large.
    """
    for image_format, level, save_options in compression_plan:
        encoded = compress_image(img, image_format, level, save_options)
        encoded_kb = len(encoded) / 1024
        logger.debug(f"Compression with {image_format} quality {level}: {encoded_kb:.2f} KB")
        if encoded_kb > max_size_kb:
            continue
        return encoded
    return None
def compress_image(img, format, quality, options=None):
    """
    Encode *img* in memory and return the resulting bytes.

    Args:
        img: Image exposing ``mode``, ``convert`` and ``save`` (a PIL Image).
        format: Target format name passed to ``img.save`` (e.g. "WEBP", "JPEG").
        quality: Encoder quality passed to ``img.save``.
        options: Optional mapping of extra keyword arguments for ``img.save``.
            It is copied, never modified.

    Returns:
        The encoded image as bytes.
    """
    # A None default avoids sharing one mutable dict between calls.
    save_options = dict(options) if options else {}

    # JPEG cannot store alpha or palette data, so saving e.g. an RGBA image
    # raises OSError. Convert such images to RGB first.
    jpeg_writable_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    if str(format).upper() == "JPEG" and img.mode not in jpeg_writable_modes:
        img = img.convert("RGB")

    output_buffer = io.BytesIO()
    img.save(output_buffer, format=format, quality=quality, **save_options)
    return output_buffer.getvalue()
# --------------------------------------------------------------------------------------
# GEMINI CALL & RESPONSE HANDLING FUNCTIONS
# --------------------------------------------------------------------------------------
def _salvage_json_object(text):
    """Return the JSON object spanning the first '{' to the last '}' of *text*, or None."""
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end < start:
        return None
    try:
        return json.loads(text[start:end + 1])
    except json.JSONDecodeError:
        return None


def flexible_handle_stamp_response(response_text):
    """
    Parse the model's reply into a dict with a consistent set of keys.

    A leading ```` ``` ````/```` ```json ```` fence and a trailing ```` ``` ````
    fence are removed before parsing. If the cleaned text is not valid JSON
    as a whole (for example because the object is surrounded by commentary),
    the span from the first ``{`` to the last ``}`` is tried as a fallback.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        A dict holding every name in ``EXPECTED_FIELDS`` first (None when the
        model omitted it), followed by any extra keys the model returned.

    Raises:
        ValueError: The text is empty, is not parseable JSON, or parses to
            something other than a JSON object.
    """
    if not response_text or not response_text.strip():
        raise ValueError("Failed to parse JSON response: empty response")

    cleaned_text = response_text.strip()
    cleaned_text = re.sub(r"^```(json)?", "", cleaned_text, flags=re.IGNORECASE)
    cleaned_text = re.sub(r"```$", "", cleaned_text).strip()
    logger.debug(f"Cleaned response: {cleaned_text}")

    try:
        data = json.loads(cleaned_text)
    except json.JSONDecodeError as e:
        data = _salvage_json_object(cleaned_text)
        if data is None:
            logger.error(f"JSON parsing error: {e}. Full response: {response_text}")
            raise ValueError(f"Failed to parse JSON response: {e}") from e
    logger.debug("JSON parsed successfully.")

    # Valid JSON such as a list, string or null has no fields to read.
    if not isinstance(data, dict):
        logger.error(f"JSON response is not an object. Full response: {response_text}")
        raise ValueError(f"Failed to parse JSON response: expected a JSON object, got {type(data).__name__}")

    # Enforce consistent output: expected fields first, missing ones as None.
    flexible_data = {key: data.get(key, None) for key in EXPECTED_FIELDS}
    # Preserve any extra keys returned.
    for key, value in data.items():
        if key not in flexible_data:
            flexible_data[key] = value
    logger.debug(f"Final parsed data: {flexible_data}")
    return flexible_data
# Image modes that can be written as JPEG without conversion.
_JPEG_WRITABLE_MODES = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")


def _sniff_image_mime_type(image_bytes):
    """Return the MIME type matching the magic bytes of *image_bytes* (JPEG when unknown)."""
    if image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
        return "image/webp"
    if image_bytes[:8] == b"\x89PNG\r\n\x1a\n":
        return "image/png"
    return "image/jpeg"


def call_gemini_stamp_scan(file_path, api_key):
    """
    Send a stamp file to Gemini and return the extracted fields.

    The file is converted to an image, re-encoded by
    ``optimize_image_for_gemini`` and sent with the extraction prompt. The
    reply is parsed by ``flexible_handle_stamp_response``.

    Args:
        file_path: Path of the uploaded image or PDF.
        api_key: Gemini API key used for this request.

    Returns:
        The dict produced by ``flexible_handle_stamp_response``.

    Raises:
        ValueError: File conversion failed, image optimization failed, the
            model returned no text, or the reply could not be parsed.
    """
    client = genai.Client(api_key=api_key)
    logger.debug("Initialized Gemini client.")

    # Convert file to image if needed.
    try:
        pil_image = convert_file_to_image(file_path)
    except Exception as e:
        logger.error(f"File conversion failed: {e}")
        raise ValueError(f"File conversion failed: {e}") from e

    # Save the image to a BytesIO buffer for optimization.
    buffer = io.BytesIO()
    try:
        # JPEG cannot store alpha or palette data; without this conversion a
        # transparent PNG would fail with "cannot write mode RGBA as JPEG".
        if pil_image.mode in _JPEG_WRITABLE_MODES:
            jpeg_ready = pil_image
        else:
            jpeg_ready = pil_image.convert("RGB")
        jpeg_ready.save(buffer, format="JPEG")
    finally:
        # Release the file handle held by the source image.
        pil_image.close()
    buffer.seek(0)

    image_bytes = optimize_image_for_gemini(buffer)
    if image_bytes is None:
        raise ValueError("Image optimization failed.")
    # optimize_image_for_gemini tries WEBP before JPEG, so the payload is
    # usually WebP; declare the MIME type that matches the actual bytes.
    mime_type = _sniff_image_mime_type(image_bytes)

    # Prepare the prompt for Gemini.
    user_prompt = (
        "You are a highly accurate document scanning system specializing in Italian revenue stamps (\"marche da bollo\"). "
        "Analyze the attached stamp image and extract each key element as a JSON object. "
        "The output MUST include the following fields, with values in a consistent format:\n"
        " - StampType: The type of stamp (should be 'Marca da Bollo').\n"
        " - StampNumber: The unique serial/identification number on the stamp.\n"
        " - IssueDate: The date of issuance in YYYY-MM-DD format.\n"
        " - Value: The monetary value of the stamp (e.g., '16.00').\n"
        " - Barcode: The barcode data if present; otherwise null.\n\n"
        "Do not include any extra commentary. "
        "If any element is missing or unclear, set its value to null.\n\n"
        "Example Output Format:\n"
        "{\n"
        ' "StampType": "Marca da Bollo",\n'
        ' "StampNumber": "AB123456789",\n'
        ' "IssueDate": "2023-05-10",\n'
        ' "Value": "16.00",\n'
        ' "Barcode": null\n'
        "}\n\n"
        "Process the attached stamp image and return the JSON with the appropriate labels."
    )
    contents = [
        types.Content(role="user", parts=[types.Part(text=user_prompt)]),
        types.Content(
            role="user",
            parts=[
                types.Part(text="STAMP IMAGE:"),
                types.Part.from_bytes(data=image_bytes, mime_type=mime_type),
            ],
        ),
    ]

    logger.debug("Sending request to Gemini.")
    response = client.models.generate_content(
        model=MODEL_NAME,
        contents=contents,
        config=types.GenerateContentConfig(temperature=0.2),
    )
    logger.debug("Received response from Gemini.")

    response_text = response.text
    if not response_text:
        # Fail with a clear message instead of an AttributeError in the parser.
        raise ValueError("Gemini returned an empty response.")
    return flexible_handle_stamp_response(response_text)
def stamp_scan_gradio(file_path):
    """
    Scan a stamp file using an API key picked at random from ``API_KEYS``.

    Args:
        file_path: Path of the uploaded file.

    Returns:
        The field dict from ``call_gemini_stamp_scan`` on success. If anything
        raises, the failure is logged and ``{"Error": <message>}`` is
        returned instead.
    """
    try:
        return call_gemini_stamp_scan(file_path, random.choice(API_KEYS))
    except Exception as e:
        logger.error(f"Stamp scan failed: {e}")
        return {"Error": str(e)}
# --------------------------------------------------------------------------------------
# GRADIO APP WITH UPDATED LAYOUT, RESET BUTTON, AND FILE CONVERSION SUPPORT
# --------------------------------------------------------------------------------------
with gr.Blocks(title="Marca da Bollo Scanner") as demo:
    gr.HTML("<h2>Marca da Bollo Scanner</h2>")
    with gr.Row():
        with gr.Column(scale=1):
            # File input. Images and PDFs are converted; DOC/DOCX uploads are
            # rejected by convert_file_to_image with an explanatory error.
            file_input = gr.File(label="Upload Stamp (Image, PDF, DOC)", file_count="single")
            scan_button = gr.Button("Scan Stamp")
            reset_button = gr.Button("Start with a new Stamp")
        with gr.Column(scale=1):
            outputs = {}  # Maps each expected field name to its output textbox.
            for field in EXPECTED_FIELDS:
                with gr.Row():
                    tb = gr.Textbox(label=field, interactive=True, elem_id=f"text_{field}")
                    # Custom copy button (HTML) to silently copy the textbox's value.
                    copy_html = (
                        "\n"
                        '<button onclick="\n'
                        f"var container = document.getElementById('text_{field}');\n"
                        "var textarea = container.querySelector('textarea');\n"
                        "if(textarea) {\n"
                        "navigator.clipboard.writeText(textarea.value);\n"
                        "console.log('Copied: ' + textarea.value);\n"
                        "}\n"
                        '">Copy</button>\n'
                    )
                    copy_btn = gr.HTML(copy_html)
                    outputs[field] = tb

    def process_and_format(file_obj):
        """
        Scan the uploaded file and return one value per expected field.

        Returns empty strings when nothing is uploaded. When the scan fails,
        the failure is raised as ``gr.Error`` so the user sees the message
        instead of a row of silently blank textboxes.
        """
        if file_obj is None:
            return ["" for _ in EXPECTED_FIELDS]
        # NOTE(review): depending on the Gradio version, gr.File may pass a
        # plain path string or an object exposing .name; accept both.
        file_path = getattr(file_obj, "name", file_obj)
        result = stamp_scan_gradio(file_path)
        # A failed scan is a dict holding only "Error"; parsed results always
        # contain every expected field.
        if result.keys() == {"Error"}:
            raise gr.Error(result["Error"] or "Stamp scan failed.")
        return [result.get(field, "") for field in EXPECTED_FIELDS]

    def reset_ui():
        """Clear the file input (None) and every output textbox (empty string)."""
        return [None] + ["" for _ in EXPECTED_FIELDS]

    scan_button.click(
        process_and_format,
        inputs=file_input,
        outputs=[outputs[field] for field in EXPECTED_FIELDS],
    )
    reset_button.click(
        reset_ui,
        inputs=None,
        outputs=[file_input] + [outputs[field] for field in EXPECTED_FIELDS],
    )
# Start the Gradio app only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    # NOTE(review): share=True presumably requests a public Gradio share link;
    # confirm that this is wanted for the deployment target.
    demo.launch(share=True)