| | import logging |
| | import os |
| | import io |
| | import base64 |
| | import json |
| | import requests |
| | import threading |
| | import uuid |
| | import time |
| | import tempfile |
| | import subprocess |
| | import shutil |
| | import re |
| | from datetime import datetime |
| | from flask import Flask, render_template, request, jsonify, Response, stream_with_context, send_from_directory |
| | from google import genai |
| | from google.genai import types |
| | from PIL import Image |
| |
|
| | |
| | logging.basicConfig( |
| | level=logging.INFO, |
| | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
| | datefmt='%Y-%m-%d %H:%M:%S' |
| | ) |
| | logger = logging.getLogger(__name__) |
| |
|
| | |
| | app = Flask(__name__) |
| |
|
| | |
| | GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") |
| | TELEGRAM_BOT_TOKEN = "8004545342:AAGcZaoDjYg8dmbbXRsR1N3TfSSbEiAGz88" |
| | TELEGRAM_CHAT_ID = "-1002564204301" |
| | GENERATED_PDF_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'generated_pdfs') |
| | USER_IMAGES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'user_images') |
| |
|
| | |
| | client = None |
| | if GOOGLE_API_KEY: |
| | try: |
| | client = genai.Client(api_key=GOOGLE_API_KEY) |
| | logger.info("Client Google GenAI initialisé avec succès.") |
| | except Exception as e: |
| | logger.critical(f"Erreur critique lors de l'initialisation du client Gemini: {e}", exc_info=True) |
| | else: |
| | logger.critical("GOOGLE_API_KEY non trouvé dans les variables d'environnement. Le service ne fonctionnera pas.") |
| |
|
| | task_results = {} |
| |
|
| | |
| |
|
| | def load_prompt_from_file(filename): |
| | """Charge le contenu d'un fichier de prompt.""" |
| | try: |
| | prompts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'prompts') |
| | filepath = os.path.join(prompts_dir, filename) |
| | logger.info(f"Chargement du prompt depuis '{filepath}'") |
| | with open(filepath, 'r', encoding='utf-8') as f: |
| | return f.read() |
| | except Exception as e: |
| | logger.error(f"Erreur lors du chargement du prompt '{filename}': {e}", exc_info=True) |
| | return "" |
| |
|
| | def get_prompt_for_style(style): |
| | """Retourne le prompt approprié en fonction du style demandé.""" |
| | logger.info(f"Sélection du prompt pour le style: '{style}'") |
| | return load_prompt_from_file('prompt_light.txt') if style == 'light' else load_prompt_from_file('prompt_colorful.txt') |
| |
|
| | def check_latex_installation(): |
| | """Vérifie si pdflatex est installé et accessible dans le PATH.""" |
| | logger.info("Vérification de l'installation de LaTeX (pdflatex)...") |
| | try: |
| | subprocess.run(["pdflatex", "-version"], capture_output=True, check=True, timeout=10) |
| | logger.info("Vérification réussie: pdflatex est installé et fonctionnel.") |
| | return True |
| | except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as e: |
| | logger.warning(f"pdflatex n'est pas installé ou n'est pas dans le PATH. La génération de PDF sera désactivée. Erreur: {e}") |
| | return False |
| |
|
| | IS_LATEX_INSTALLED = check_latex_installation() |
| |
|
| | def save_user_image(image_data, filename, task_id): |
| | """Sauvegarde une image utilisateur dans le dossier user_images.""" |
| | try: |
| | |
| | file_extension = os.path.splitext(filename)[1] |
| | safe_filename = f"{task_id}_{filename}" |
| | image_path = os.path.join(USER_IMAGES_DIR, safe_filename) |
| | |
| | with open(image_path, 'wb') as f: |
| | f.write(image_data) |
| | |
| | logger.info(f"Image utilisateur sauvegardée: {safe_filename}") |
| | return safe_filename |
| | except Exception as e: |
| | logger.error(f"Erreur lors de la sauvegarde de l'image {filename}: {e}") |
| | return None |
| |
|
| | def upload_file_to_genai_api(file_data, filename, mime_type): |
| | """Upload un fichier vers l'API Files de Google GenAI.""" |
| | try: |
| | |
| | if mime_type.startswith('image/'): |
| | if mime_type == 'image/jpeg': |
| | suffix = '.jpg' |
| | elif mime_type == 'image/png': |
| | suffix = '.png' |
| | elif mime_type == 'image/gif': |
| | suffix = '.gif' |
| | elif mime_type == 'image/webp': |
| | suffix = '.webp' |
| | else: |
| | suffix = '.jpg' |
| | elif mime_type == 'application/pdf': |
| | suffix = '.pdf' |
| | else: |
| | suffix = '.tmp' |
| | |
| | |
| | with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: |
| | temp_file.write(file_data) |
| | temp_file_path = temp_file.name |
| | |
| | logger.info(f"Upload du fichier '{filename}' ({len(file_data)} bytes) vers l'API Files de Google GenAI...") |
| | |
| | |
| | file_ref = client.files.upload(file=temp_file_path) |
| | |
| | |
| | os.unlink(temp_file_path) |
| | |
| | logger.info(f"Fichier '{filename}' uploadé avec succès. Référence: {file_ref.name}") |
| | return file_ref |
| | |
| | except Exception as e: |
| | logger.error(f"Erreur lors de l'upload du fichier '{filename}' vers l'API Files: {e}") |
| | |
| | try: |
| | if 'temp_file_path' in locals(): |
| | os.unlink(temp_file_path) |
| | except: |
| | pass |
| | return None |
| |
|
| | def call_gemini_with_fallback(contents, task_id): |
| | """Appelle Gemini 2.5 Pro en premier, puis 2.5 Flash en cas d'échec.""" |
| | models_to_try = [ |
| | {"name": "gemini-3-flash-preview", "display_name": " 2.5 Pro"}, |
| | {"name": "gemini-3-flash-preview", "display_name": " 2.5 Flash"} |
| | ] |
| | |
| | last_error = None |
| | |
| | for i, model_info in enumerate(models_to_try): |
| | model_name = model_info["name"] |
| | model_display = model_info["display_name"] |
| | |
| | try: |
| | logger.info(f"[Task {task_id}] Tentative {i+1}/2: Appel de {model_display}...") |
| | |
| | |
| | if model_name == "gemini-3-flash-preview": |
| | task_results[task_id]['status'] = 'generating_latex_pro' |
| | task_results[task_id]['current_model'] = 'Gemini 2.5 Pro' |
| | else: |
| | task_results[task_id]['status'] = 'generating_latex_flash' |
| | task_results[task_id]['current_model'] = 'Gemini 2.5 Flash (fallback)' |
| | |
| | |
| | gemini_response = client.models.generate_content( |
| | model=model_name, |
| | contents=contents, |
| | config=types.GenerateContentConfig(tools=[types.Tool(code_execution=types.ToolCodeExecution)]) |
| | ) |
| | |
| | |
| | full_latex_response = "" |
| | if gemini_response.candidates and gemini_response.candidates[0].content and gemini_response.candidates[0].content.parts: |
| | for part in gemini_response.candidates[0].content.parts: |
| | if hasattr(part, 'text') and part.text: |
| | full_latex_response += part.text |
| | |
| | if not full_latex_response.strip(): |
| | raise ValueError(f"Réponse vide de {model_display}") |
| | |
| | logger.info(f"[Task {task_id}] ✓ Succès avec {model_display}") |
| | task_results[task_id]['used_model'] = model_display |
| | return full_latex_response |
| | |
| | except Exception as e: |
| | error_msg = str(e) |
| | logger.warning(f"[Task {task_id}] ✗ Échec avec {model_display}: {error_msg}") |
| | last_error = e |
| | |
| | |
| | if i == len(models_to_try) - 1: |
| | logger.error(f"[Task {task_id}] Tous les modèles ont échoué. Dernière erreur: {error_msg}") |
| | task_results[task_id]['model_failures'] = [ |
| | f"{models_to_try[0]['display_name']}: Échec", |
| | f"{models_to_try[1]['display_name']}: {error_msg}" |
| | ] |
| | raise last_error |
| | |
| | |
| | time.sleep(2) |
| | |
| | |
| | raise last_error if last_error else Exception("Erreur inconnue lors des appels aux modèles Gemini") |
| |
|
| | def get_all_tasks_info(): |
| | """Récupère toutes les informations des tâches pour le centre de gestion.""" |
| | tasks_info = [] |
| | |
| | for task_id, task_data in task_results.items(): |
| | task_info = { |
| | 'id': task_id, |
| | 'status': task_data['status'], |
| | 'style': task_data.get('style', 'unknown'), |
| | 'first_filename': task_data.get('first_filename', 'Unknown'), |
| | 'time_started': task_data.get('time_started', 0), |
| | 'time_started_formatted': datetime.fromtimestamp(task_data.get('time_started', 0)).strftime('%Y-%m-%d %H:%M:%S'), |
| | 'file_count': task_data.get('file_count', {'images': 0, 'pdfs': 0}), |
| | 'pdf_filename': task_data.get('pdf_filename'), |
| | 'error': task_data.get('error'), |
| | 'response': task_data.get('response', ''), |
| | 'used_model': task_data.get('used_model', 'N/A'), |
| | 'current_model': task_data.get('current_model', ''), |
| | 'model_failures': task_data.get('model_failures', []), |
| | 'user_images': [] |
| | } |
| | |
| | |
| | if os.path.exists(USER_IMAGES_DIR): |
| | for img_file in os.listdir(USER_IMAGES_DIR): |
| | if img_file.startswith(f"{task_id}_"): |
| | task_info['user_images'].append(img_file) |
| | |
| | tasks_info.append(task_info) |
| | |
| | |
| | tasks_info.sort(key=lambda x: x['time_started'], reverse=True) |
| | return tasks_info |
| |
|
| | def get_system_stats(): |
| | """Récupère les statistiques du système.""" |
| | total_tasks = len(task_results) |
| | completed_tasks = sum(1 for task in task_results.values() if task['status'] == 'completed') |
| | error_tasks = sum(1 for task in task_results.values() if task['status'] == 'error') |
| | pending_tasks = sum(1 for task in task_results.values() if task['status'] not in ['completed', 'error']) |
| | |
| | |
| | pro_usage = sum(1 for task in task_results.values() if task.get('used_model') == 'Gemini 2.5 Pro') |
| | flash_usage = sum(1 for task in task_results.values() if task.get('used_model') == 'Gemini 2.5 Flash (fallback)') |
| | |
| | |
| | pdf_files = 0 |
| | if os.path.exists(GENERATED_PDF_DIR): |
| | pdf_files = len([f for f in os.listdir(GENERATED_PDF_DIR) if f.endswith('.pdf')]) |
| | |
| | |
| | user_images = 0 |
| | if os.path.exists(USER_IMAGES_DIR): |
| | user_images = len([f for f in os.listdir(USER_IMAGES_DIR) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp'))]) |
| | |
| | return { |
| | 'total_tasks': total_tasks, |
| | 'completed_tasks': completed_tasks, |
| | 'error_tasks': error_tasks, |
| | 'pending_tasks': pending_tasks, |
| | 'pdf_files': pdf_files, |
| | 'user_images': user_images, |
| | 'latex_installed': IS_LATEX_INSTALLED, |
| | 'pro_usage': pro_usage, |
| | 'flash_usage': flash_usage |
| | } |
| |
|
| | def clean_latex_code(latex_code): |
| | """Extrait le code LaTeX brut des blocs de code formatés (```latex ... ```).""" |
| | logger.info("Nettoyage du code LaTeX reçu de Gemini...") |
| | match_latex = re.search(r"```(?:latex|tex)\s*(.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE) |
| | if match_latex: |
| | logger.info("Bloc de code 'latex' ou 'tex' trouvé et extrait.") |
| | return match_latex.group(1).strip() |
| | |
| | match_generic = re.search(r"```\s*(\\documentclass.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE) |
| | if match_generic: |
| | logger.info("Bloc de code générique avec '\\documentclass' trouvé et extrait.") |
| | return match_generic.group(1).strip() |
| | |
| | logger.warning("Aucun bloc de code LaTeX (```...```) n'a été trouvé. Utilisation de la réponse brute.") |
| | return latex_code.strip() |
| |
|
| | def latex_to_pdf(latex_code, output_filename_base, output_dir): |
| | """Compile une chaîne de code LaTeX en fichier PDF.""" |
| | if not IS_LATEX_INSTALLED: |
| | logger.error("Tentative de compilation LaTeX alors que pdflatex n'est pas disponible.") |
| | return None, "Erreur: pdflatex n'est pas installé sur le serveur." |
| |
|
| | tex_filename = f"{output_filename_base}.tex" |
| | tex_path = os.path.join(output_dir, tex_filename) |
| | pdf_path = os.path.join(output_dir, f"{output_filename_base}.pdf") |
| | |
| | logger.info(f"Début de la compilation LaTeX vers PDF pour '{output_filename_base}'") |
| | |
| | try: |
| | with open(tex_path, "w", encoding="utf-8") as tex_file: |
| | tex_file.write(latex_code) |
| | logger.info(f"Fichier .tex '{tex_path}' créé avec succès.") |
| | |
| | my_env = os.environ.copy() |
| | my_env["LC_ALL"] = "C.UTF-8" |
| | my_env["LANG"] = "C.UTF-8" |
| | |
| | last_result = None |
| | for i in range(2): |
| | logger.info(f"Exécution de pdflatex - Passe {i+1}/2...") |
| | process = subprocess.run( |
| | ["pdflatex", "-interaction=nonstopmode", "-output-directory", output_dir, tex_path], |
| | capture_output=True, text=True, check=False, encoding="utf-8", errors="replace", env=my_env, timeout=60 |
| | ) |
| | last_result = process |
| | if not os.path.exists(pdf_path) and process.returncode != 0: |
| | logger.warning(f"La passe {i+1} de pdflatex a échoué et aucun PDF n'a été créé. Arrêt de la compilation.") |
| | break |
| | |
| | if os.path.exists(pdf_path): |
| | logger.info(f"PDF généré avec succès : '{pdf_path}'") |
| | return pdf_path, f"PDF généré: {os.path.basename(pdf_path)}" |
| | else: |
| | error_log = last_result.stdout + "\n" + last_result.stderr if last_result else "Aucun résultat de compilation disponible." |
| | logger.error(f"Échec de la compilation PDF pour '{tex_filename}'. Log de pdflatex:\n{error_log}") |
| | return None, f"Erreur de compilation PDF. Log: ...{error_log[-1000:]}" |
| | |
| | except Exception as e: |
| | logger.error(f"Exception pendant la génération du PDF: {e}", exc_info=True) |
| | return None, f"Exception durant la génération du PDF: {str(e)}" |
| |
|
| | def send_to_telegram(file_data, filename, caption="Nouveau fichier uploadé"): |
| | """Envoie un fichier au canal Telegram configuré.""" |
| | logger.info(f"Préparation de l'envoi du fichier '{filename}' à Telegram.") |
| | try: |
| | if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): |
| | url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto" |
| | files = {'photo': (filename, file_data)} |
| | log_msg = f"Envoi de l'image '{filename}' à Telegram..." |
| | else: |
| | url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument" |
| | files = {'document': (filename, file_data)} |
| | log_msg = f"Envoi du document '{filename}' à Telegram..." |
| |
|
| | logger.info(log_msg) |
| | data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption} |
| | response = requests.post(url, files=files, data=data, timeout=30) |
| | response.raise_for_status() |
| | logger.info(f"Fichier '{filename}' envoyé avec succès à Telegram.") |
| | except Exception as e: |
| | logger.error(f"Erreur lors de l'envoi à Telegram: {e}", exc_info=True) |
| |
|
| | |
| |
|
| | def process_files_background(task_id, files_data, resolution_style): |
| | """Fonction exécutée en thread pour traiter les fichiers, appeler Gemini et générer le PDF.""" |
| | logger.info(f"[Task {task_id}] Démarrage du traitement en arrière-plan.") |
| | task_results[task_id]['status'] = 'processing' |
| | uploaded_file_refs = [] |
| |
|
| | try: |
| | if not client: |
| | raise ConnectionError("Le client Gemini n'est pas initialisé.") |
| |
|
| | contents = [] |
| | logger.info(f"[Task {task_id}] Préparation des fichiers pour l'API Gemini.") |
| | |
| | |
| | for file_info in files_data: |
| | logger.info(f"[Task {task_id}] Traitement du fichier '{file_info['filename']}' ({file_info['type']}).") |
| | |
| | |
| | if file_info['type'].startswith('image/'): |
| | saved_filename = save_user_image(file_info['data'], file_info['filename'], task_id) |
| | |
| | |
| | file_ref = upload_file_to_genai_api( |
| | file_info['data'], |
| | file_info['filename'], |
| | file_info['type'] |
| | ) |
| | |
| | if file_ref: |
| | uploaded_file_refs.append(file_ref) |
| | contents.append(file_ref) |
| | logger.info(f"[Task {task_id}] Fichier '{file_info['filename']}' uploadé avec succès. Référence: {file_ref.name}") |
| | else: |
| | logger.warning(f"[Task {task_id}] Échec de l'upload du fichier '{file_info['filename']}'. Fichier ignoré.") |
| |
|
| | if not contents: |
| | raise ValueError("Aucun fichier n'a pu être uploadé vers l'API Files de Google GenAI.") |
| | |
| | prompt_to_use = get_prompt_for_style(resolution_style) |
| | if not prompt_to_use: |
| | raise ValueError(f"Le fichier de prompt pour le style '{resolution_style}' est introuvable ou vide.") |
| | contents.append(prompt_to_use) |
| |
|
| | |
| | logger.info(f"[Task {task_id}] Début des appels aux modèles Gemini avec {len(uploaded_file_refs)} fichier(s).") |
| | full_latex_response = call_gemini_with_fallback(contents, task_id) |
| |
|
| | logger.info(f"[Task {task_id}] Réponse reçue de Gemini ({task_results[task_id].get('used_model', 'modèle inconnu')}).") |
| | logger.debug(f"[Task {task_id}] Réponse brute de Gemini:\n---\n{full_latex_response[:500]}...\n---") |
| |
|
| | task_results[task_id]['status'] = 'cleaning_latex' |
| | cleaned_latex = clean_latex_code(full_latex_response) |
| | logger.debug(f"[Task {task_id}] Code LaTeX nettoyé:\n---\n{cleaned_latex[:500]}...\n---") |
| |
|
| | task_results[task_id]['status'] = 'generating_pdf' |
| | pdf_filename_base = f"solution_{task_id}" |
| | pdf_file_path, pdf_message = latex_to_pdf(cleaned_latex, pdf_filename_base, GENERATED_PDF_DIR) |
| |
|
| | if pdf_file_path: |
| | task_results[task_id]['status'] = 'completed' |
| | task_results[task_id]['pdf_filename'] = os.path.basename(pdf_file_path) |
| | used_model = task_results[task_id].get('used_model', 'modèle inconnu') |
| | task_results[task_id]['response'] = f"PDF généré avec succès: {os.path.basename(pdf_file_path)} (généré par {used_model})" |
| | logger.info(f"[Task {task_id}] Tâche terminée avec succès. PDF: {os.path.basename(pdf_file_path)} (modèle: {used_model})") |
| | else: |
| | raise RuntimeError(f"Échec de la génération du PDF: {pdf_message}") |
| |
|
| | except Exception as e: |
| | logger.error(f"[Task {task_id}] Une erreur est survenue dans le thread de traitement.", exc_info=True) |
| | task_results[task_id]['status'] = 'error' |
| | task_results[task_id]['error'] = str(e) |
| | task_results[task_id]['response'] = f"Une erreur est survenue: {str(e)}" |
| | finally: |
| | |
| | if uploaded_file_refs: |
| | logger.info(f"[Task {task_id}] Nettoyage des {len(uploaded_file_refs)} fichiers temporaires de l'API Files.") |
| | for file_ref in uploaded_file_refs: |
| | try: |
| | client.files.delete(file_ref) |
| | logger.info(f"[Task {task_id}] Fichier temporaire '{file_ref.name}' supprimé de l'API Files.") |
| | except Exception as del_e: |
| | logger.warning(f"[Task {task_id}] Échec de la suppression du fichier temporaire '{file_ref.name}': {del_e}") |
| |
|
| | |
| |
|
| | @app.route('/oups') |
| | def index(): |
| | logger.info(f"Requête servie pour l'endpoint '/' depuis {request.remote_addr}") |
| | return render_template('index.html') |
| |
|
| | @app.route('/admin1') |
| | def admin_panel(): |
| | """Centre de gestion complet.""" |
| | logger.info(f"Accès au centre de gestion depuis {request.remote_addr}") |
| | tasks_info = get_all_tasks_info() |
| | system_stats = get_system_stats() |
| | |
| | return render_template('admin.html', tasks=tasks_info, stats=system_stats) |
| |
|
| | @app.route('/admin/api/tasks') |
| | def admin_api_tasks(): |
| | """API JSON pour récupérer les informations des tâches.""" |
| | tasks_info = get_all_tasks_info() |
| | return jsonify(tasks_info) |
| |
|
| | @app.route('/admin/api/stats') |
| | def admin_api_stats(): |
| | """API JSON pour récupérer les statistiques système.""" |
| | stats = get_system_stats() |
| | return jsonify(stats) |
| |
|
| | @app.route('/admin/delete_task/<task_id>', methods=['POST']) |
| | def admin_delete_task(task_id): |
| | """Supprime une tâche et ses fichiers associés.""" |
| | logger.info(f"Demande de suppression de la tâche {task_id}") |
| | |
| | if task_id not in task_results: |
| | return jsonify({'error': 'Tâche introuvable'}), 404 |
| | |
| | try: |
| | task_data = task_results[task_id] |
| | |
| | |
| | if 'pdf_filename' in task_data: |
| | pdf_path = os.path.join(GENERATED_PDF_DIR, task_data['pdf_filename']) |
| | if os.path.exists(pdf_path): |
| | os.remove(pdf_path) |
| | logger.info(f"PDF supprimé: {pdf_path}") |
| | |
| | |
| | if os.path.exists(USER_IMAGES_DIR): |
| | for img_file in os.listdir(USER_IMAGES_DIR): |
| | if img_file.startswith(f"{task_id}_"): |
| | img_path = os.path.join(USER_IMAGES_DIR, img_file) |
| | os.remove(img_path) |
| | logger.info(f"Image utilisateur supprimée: {img_path}") |
| | |
| | |
| | del task_results[task_id] |
| | |
| | logger.info(f"Tâche {task_id} supprimée avec succès") |
| | return jsonify({'success': True}) |
| | |
| | except Exception as e: |
| | logger.error(f"Erreur lors de la suppression de la tâche {task_id}: {e}") |
| | return jsonify({'error': str(e)}), 500 |
| |
|
| | @app.route('/user_images/<filename>') |
| | def serve_user_image(filename): |
| | """Sert les images utilisateur.""" |
| | try: |
| | return send_from_directory(USER_IMAGES_DIR, filename) |
| | except FileNotFoundError: |
| | return "Image non trouvée", 404 |
| |
|
| | @app.route('/solve', methods=['POST']) |
| | def solve(): |
| | logger.info(f"Nouvelle requête sur /solve depuis {request.remote_addr}") |
| | try: |
| | if 'user_files' not in request.files: |
| | logger.warning(f"/solve: Requête de {request.remote_addr} sans 'user_files'.") |
| | return jsonify({'error': 'Aucun champ de fichier dans la requête'}), 400 |
| | |
| | uploaded_files = request.files.getlist('user_files') |
| | if not uploaded_files or all(f.filename == '' for f in uploaded_files): |
| | logger.warning(f"/solve: Requête de {request.remote_addr} avec champ 'user_files' mais sans fichiers.") |
| | return jsonify({'error': 'Aucun fichier sélectionné'}), 400 |
| |
|
| | resolution_style = request.form.get('style', 'colorful') |
| | files_data = [] |
| | file_count = {'images': 0, 'pdfs': 0} |
| | |
| | for file in uploaded_files: |
| | if not file.filename: continue |
| | file_data = file.read() |
| | file_type = file.content_type or 'application/octet-stream' |
| | |
| | if file_type.startswith('image/'): |
| | file_count['images'] += 1 |
| | files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type}) |
| | send_to_telegram(file_data, file.filename, f"Image reçue: {file.filename} (Style: {resolution_style})") |
| | elif file_type == 'application/pdf': |
| | if file_count['pdfs'] >= 1: |
| | logger.warning(f"/solve: Requête de {request.remote_addr} avec plusieurs PDFs. Rejetée.") |
| | return jsonify({'error': 'Un seul fichier PDF est autorisé par requête'}), 400 |
| | file_count['pdfs'] += 1 |
| | files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type}) |
| | send_to_telegram(file_data, file.filename, f"PDF reçu: {file.filename} (Style: {resolution_style})") |
| | else: |
| | logger.warning(f"/solve: Fichier non supporté '{file.filename}' de type '{file_type}' uploadé par {request.remote_addr}.") |
| |
|
| | if not files_data: |
| | logger.warning(f"/solve: Aucun fichier valide (image/pdf) trouvé dans la requête de {request.remote_addr}.") |
| | return jsonify({'error': 'Aucun fichier valide (image ou PDF) n\'a été fourni'}), 400 |
| | |
| | task_id = str(uuid.uuid4()) |
| | task_results[task_id] = { |
| | 'status': 'pending', 'response': '', 'error': None, 'time_started': time.time(), |
| | 'style': resolution_style, 'file_count': file_count, 'first_filename': files_data[0]['filename'] |
| | } |
| | |
| | logger.info(f"Création de la tâche {task_id} pour {file_count['images']} image(s) et {file_count['pdfs']} PDF(s). Style: {resolution_style}.") |
| | threading.Thread(target=process_files_background, args=(task_id, files_data, resolution_style)).start() |
| | |
| | return jsonify({'task_id': task_id, 'status': 'pending', 'first_filename': files_data[0]['filename']}) |
| |
|
| | except Exception as e: |
| | logger.error(f"Erreur inattendue dans l'endpoint /solve: {e}", exc_info=True) |
| | return jsonify({'error': f'Erreur interne du serveur: {e}'}), 500 |
| |
|
| | @app.route('/task/<task_id>', methods=['GET']) |
| | def get_task_status(task_id): |
| | logger.debug(f"Requête de statut pour la tâche {task_id}") |
| | task = task_results.get(task_id) |
| | if not task: |
| | logger.warning(f"Tentative d'accès à une tâche inexistante: {task_id}") |
| | return jsonify({'error': 'Tâche introuvable'}), 404 |
| | |
| | response_data = { |
| | 'status': task['status'], |
| | 'response': task.get('response'), |
| | 'error': task.get('error'), |
| | 'current_model': task.get('current_model', ''), |
| | 'used_model': task.get('used_model', ''), |
| | 'model_failures': task.get('model_failures', []) |
| | } |
| | if task['status'] == 'completed': |
| | response_data['download_url'] = f"/download/{task_id}" |
| | |
| | return jsonify(response_data) |
| |
|
| | @app.route('/stream/<task_id>', methods=['GET']) |
| | def stream_task_progress(task_id): |
| | """Endpoint pour Server-Sent Events (SSE) pour streamer la progression.""" |
| | def generate(): |
| | logger.info(f"Nouvelle connexion de streaming (SSE) pour la tâche {task_id}") |
| | last_status_sent = None |
| | last_model_sent = None |
| | |
| | while True: |
| | task = task_results.get(task_id) |
| | if not task: |
| | logger.warning(f"La tâche {task_id} a disparu pendant le streaming.") |
| | yield f'data: {json.dumps({"error": "La tâche a été perdue", "status": "error"})}\n\n' |
| | break |
| | |
| | current_status = task['status'] |
| | current_model = task.get('current_model', '') |
| | |
| | |
| | if current_status != last_status_sent or current_model != last_model_sent: |
| | data_to_send = { |
| | "status": current_status, |
| | "current_model": current_model, |
| | "used_model": task.get('used_model', '') |
| | } |
| | |
| | if current_status == 'completed': |
| | data_to_send["response"] = task.get("response", "") |
| | data_to_send["download_url"] = f"/download/{task_id}" |
| | elif current_status == 'error': |
| | data_to_send["error"] = task.get("error", "Erreur inconnue") |
| | data_to_send["model_failures"] = task.get("model_failures", []) |
| |
|
| | logger.info(f"[Task {task_id}] Envoi de la mise à jour de statut via SSE: {current_status} ({current_model})") |
| | yield f'data: {json.dumps(data_to_send)}\n\n' |
| | last_status_sent = current_status |
| | last_model_sent = current_model |
| | |
| | if current_status in ['completed', 'error']: |
| | logger.info(f"Fermeture de la connexion SSE pour la tâche terminée/échouée {task_id}") |
| | break |
| | |
| | time.sleep(1) |
| | |
| | return Response(stream_with_context(generate()), mimetype='text/event-stream', headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}) |
| |
|
| | @app.route('/download/<task_id>') |
| | def download_pdf(task_id): |
| | logger.info(f"Requête de téléchargement pour la tâche {task_id}") |
| | task = task_results.get(task_id) |
| | if not task or task['status'] != 'completed' or 'pdf_filename' not in task: |
| | logger.warning(f"Échec du téléchargement pour la tâche {task_id}: Fichier non trouvé ou tâche non terminée.") |
| | return "Fichier non trouvé ou la tâche n'est pas encore terminée.", 404 |
| | |
| | try: |
| | logger.info(f"Envoi du fichier '{task['pdf_filename']}' pour la tâche {task_id}") |
| | return send_from_directory(GENERATED_PDF_DIR, task['pdf_filename'], as_attachment=True) |
| | except FileNotFoundError: |
| | logger.error(f"Le fichier PDF '{task['pdf_filename']}' pour la tâche {task_id} est introuvable sur le disque.") |
| | return "Erreur: Fichier introuvable sur le serveur.", 404 |
| |
|
| | if __name__ == '__main__': |
| | logger.info("Démarrage de l'application Flask.") |
| | |
| | |
| | os.makedirs(GENERATED_PDF_DIR, exist_ok=True) |
| | os.makedirs(USER_IMAGES_DIR, exist_ok=True) |
| | logger.info(f"Répertoires assurés d'exister: '{GENERATED_PDF_DIR}' et '{USER_IMAGES_DIR}'") |
| | |
| | app.run(debug=True, host='0.0.0.0', port=5000) |