|
|
| import os |
| import signal |
| import logging |
| import threading |
| import asyncio |
| import time |
| from fastapi import FastAPI |
| import uvicorn |
| from telegram import Update |
| from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters |
| from telegram.request import HTTPXRequest |
| from brain.ops_brain import OpsManagerAI |
| from brain.db_handler import StoreDB |
|
|
| |
| logging.basicConfig( |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
| level=logging.INFO |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| def get_required_env(var_name: str) -> str: |
| """Fetch a required environment variable or raise a clear error.""" |
| value = os.getenv(var_name) |
| if not value or not value.strip(): |
| raise RuntimeError( |
| f"Missing required environment variable: '{var_name}'. " |
| f"Please set it in your Secrets/Environment settings." |
| ) |
| return value |
|
|
| |
| TELEGRAM_TOKEN = get_required_env("TELEGRAM_TOKEN") |
| GROQ_API_KEY = get_required_env("GROQ_API_KEY") |
| SUPABASE_URL = get_required_env("SUPABASE_URL") |
| SUPABASE_KEY = get_required_env("SUPABASE_KEY") |
|
|
| |
| def init_services(): |
| try: |
| ai = OpsManagerAI(api_key=GROQ_API_KEY) |
| db = StoreDB(url=SUPABASE_URL, key=SUPABASE_KEY) |
| logger.info("AI and Database services initialized successfully.") |
| return ai, db |
| except Exception as e: |
| logger.critical(f"Service Initialization failed: {e}") |
| return None, None |
|
|
| ai_manager, db = init_services() |
|
|
| |
| async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| text = update.message.text |
| if not text or not text.strip(): |
| return |
|
|
| user = update.message.from_user |
| logger.info(f"Processing message from {user.first_name} (id={user.id}): {text}") |
|
|
| try: |
| if ai_manager is None or db is None: |
| raise RuntimeError("AI or Database services not initialized") |
|
|
| structured_data = ai_manager.process_telegram_message(text) |
|
|
| if not structured_data or not structured_data.get('store_id'): |
| logger.warning(f"AI could not extract a valid store_id from message: '{text}'") |
| await update.message.reply_text( |
| "⚠️ I couldn't extract a valid Store ID from your message. " |
| "Please make sure to mention the store location or ID in your report." |
| ) |
| return |
|
|
| try: |
| db.save_report(structured_data) |
| except Exception as db_err: |
| logger.error(f"Database save failed: {db_err}") |
| await update.message.reply_text( |
| "❌ Critical Error: I analyzed your report, but the database rejected it.\n" |
| "The report has been queued locally, but it will NOT appear on the dashboard until the connection is fixed." |
| ) |
| return |
|
|
| await update.message.reply_text( |
| f"✅ Report received for {structured_data.get('store_id', 'Unknown')}.\n" |
| f"Analysis: {structured_data.get('analysis', 'N/A')}\n" |
| f"Actions logged to Dashboard." |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Error in handler: {e}") |
| await update.message.reply_text( |
| "❌ Error processing report. Please ensure the format is correct." |
| ) |
|
|
| |
| app = FastAPI() |
|
|
| @app.get("/") |
| async def root(): |
| bot_status = "Active" if ai_manager and db else "Degraded" |
| return { |
| "status": "Sovereign Ops Manager is Healthy", |
| "bot": bot_status |
| } |
|
|
| |
| async def main(): |
| stop_event = asyncio.Event() |
| |
| loop = asyncio.get_running_loop() |
| for sig in (signal.SIGINT, signal.SIGTERM): |
| loop.add_signal_handler(sig, lambda: stop_event.set()) |
|
|
| config = uvicorn.Config(app, host="0.0.0.0", port=7860, log_level="info") |
| server = uvicorn.Server(config) |
|
|
| server_task = asyncio.create_task(server.serve()) |
| logger.info("HF Heartbeat Server initialized on port 7860.") |
| |
| await asyncio.sleep(1.0) |
|
|
| request_client = HTTPXRequest( |
| connect_timeout=20.0, read_timeout=20.0, write_timeout=20.0, pool_timeout=20.0 |
| ) |
| bot_app = ( |
| ApplicationBuilder() |
| .token(TELEGRAM_TOKEN) |
| .request(request_client) |
| .build() |
| ) |
| bot_app.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_message)) |
|
|
| await bot_app.initialize() |
| await bot_app.start() |
| await bot_app.updater.start_polling() |
| logger.info("Sovereign Bot is LIVE and polling.") |
|
|
| await stop_event.wait() |
| |
| logger.info("Shutdown signal caught. Beginning cleanup...") |
| server.should_exit = True |
| await server_task |
| await bot_app.updater.stop() |
| await bot_app.stop() |
| await bot_app.shutdown() |
| logger.info("All services shut down gracefully.") |
|
|
| if __name__ == "__main__": |
| try: |
| asyncio.run(main()) |
| except KeyboardInterrupt: |
| pass |
|
|