import os import signal import logging import threading import asyncio import time from fastapi import FastAPI import uvicorn from telegram import Update from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters from telegram.request import HTTPXRequest from brain.ops_brain import OpsManagerAI from brain.db_handler import StoreDB # --- Configuration --- logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO ) logger = logging.getLogger(__name__) def get_required_env(var_name: str) -> str: """Fetch a required environment variable or raise a clear error.""" value = os.getenv(var_name) if not value or not value.strip(): raise RuntimeError( f"Missing required environment variable: '{var_name}'. " f"Please set it in your Secrets/Environment settings." ) return value # Load secrets TELEGRAM_TOKEN = get_required_env("TELEGRAM_TOKEN") GROQ_API_KEY = get_required_env("GROQ_API_KEY") SUPABASE_URL = get_required_env("SUPABASE_URL") SUPABASE_KEY = get_required_env("SUPABASE_KEY") # --- Initialize AI & DB --- def init_services(): try: ai = OpsManagerAI(api_key=GROQ_API_KEY) db = StoreDB(url=SUPABASE_URL, key=SUPABASE_KEY) logger.info("AI and Database services initialized successfully.") return ai, db except Exception as e: logger.critical(f"Service Initialization failed: {e}") return None, None ai_manager, db = init_services() # --- Telegram Bot Logic --- async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): text = update.message.text if not text or not text.strip(): return user = update.message.from_user logger.info(f"Processing message from {user.first_name} (id={user.id}): {text}") try: if ai_manager is None or db is None: raise RuntimeError("AI or Database services not initialized") structured_data = ai_manager.process_telegram_message(text) if not structured_data or not structured_data.get('store_id'): logger.warning(f"AI could not extract a valid store_id from message: '{text}'") await update.message.reply_text( "⚠️ I couldn't extract a valid Store ID from your message. " "Please make sure to mention the store location or ID in your report." ) return try: db.save_report(structured_data) except Exception as db_err: logger.error(f"Database save failed: {db_err}") await update.message.reply_text( "❌ Critical Error: I analyzed your report, but the database rejected it.\n" "The report has been queued locally, but it will NOT appear on the dashboard until the connection is fixed." ) return await update.message.reply_text( f"✅ Report received for {structured_data.get('store_id', 'Unknown')}.\n" f"Analysis: {structured_data.get('analysis', 'N/A')}\n" f"Actions logged to Dashboard." ) except Exception as e: logger.error(f"Error in handler: {e}") await update.message.reply_text( "❌ Error processing report. Please ensure the format is correct." ) # --- HF Health Check Server --- app = FastAPI() @app.get("/") async def root(): bot_status = "Active" if ai_manager and db else "Degraded" return { "status": "Sovereign Ops Manager is Healthy", "bot": bot_status } # --- Unified Engine --- async def main(): stop_event = asyncio.Event() loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, lambda: stop_event.set()) config = uvicorn.Config(app, host="0.0.0.0", port=7860, log_level="info") server = uvicorn.Server(config) server_task = asyncio.create_task(server.serve()) logger.info("HF Heartbeat Server initialized on port 7860.") await asyncio.sleep(1.0) request_client = HTTPXRequest( connect_timeout=20.0, read_timeout=20.0, write_timeout=20.0, pool_timeout=20.0 ) bot_app = ( ApplicationBuilder() .token(TELEGRAM_TOKEN) .request(request_client) .build() ) bot_app.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_message)) await bot_app.initialize() await bot_app.start() await bot_app.updater.start_polling() logger.info("Sovereign Bot is LIVE and polling.") await stop_event.wait() logger.info("Shutdown signal caught. Beginning cleanup...") server.should_exit = True await server_task await bot_app.updater.stop() await bot_app.stop() await bot_app.shutdown() logger.info("All services shut down gracefully.") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: pass