# Source: main.py by Welly-code, uploaded with huggingface_hub
# (commit 27f0c52, verified; 5.02 kB).
import os
import signal
import logging
import threading
import asyncio
import time
from fastapi import FastAPI
import uvicorn
from telegram import Update
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters
from telegram.request import HTTPXRequest
from brain.ops_brain import OpsManagerAI
from brain.db_handler import StoreDB
# --- Configuration ---
# Configure logging once for the whole process: INFO and above, with a
# timestamp, logger name and level in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
def get_required_env(var_name: str) -> str:
    """Fetch a required environment variable or raise a clear error.

    Args:
        var_name: Name of the environment variable to read.

    Returns:
        The variable's value with leading/trailing whitespace removed.

    Raises:
        RuntimeError: If the variable is unset, empty, or whitespace-only.
    """
    # Strip once and return the stripped form. Previously the value was only
    # stripped for the emptiness check, so a secret pasted with a trailing
    # newline or space passed validation and was handed to clients as-is.
    value = (os.getenv(var_name) or "").strip()
    if not value:
        raise RuntimeError(
            f"Missing required environment variable: '{var_name}'. "
            f"Please set it in your Secrets/Environment settings."
        )
    return value
# Load secrets
# Resolve every secret at import time, in this order, so a misconfigured
# deployment fails immediately with the name of the first missing variable.
TELEGRAM_TOKEN, GROQ_API_KEY, SUPABASE_URL, SUPABASE_KEY = map(
    get_required_env,
    ("TELEGRAM_TOKEN", "GROQ_API_KEY", "SUPABASE_URL", "SUPABASE_KEY"),
)
# --- Initialize AI & DB ---
def init_services():
    """Construct the AI manager and the database client.

    Returns:
        An ``(ai, db)`` tuple on success, or ``(None, None)`` if either
        constructor raises. Callers must check for ``None`` before use.
    """
    try:
        ai = OpsManagerAI(api_key=GROQ_API_KEY)
        db = StoreDB(url=SUPABASE_URL, key=SUPABASE_KEY)
    except Exception as e:
        # Deliberately broad: any construction failure is reported instead of
        # aborting the import. exc_info keeps the traceback, which the
        # message alone does not carry.
        logger.critical("Service Initialization failed: %s", e, exc_info=True)
        return None, None
    logger.info("AI and Database services initialized successfully.")
    return ai, db


# Module-level singletons; both are None when initialization failed.
ai_manager, db = init_services()
# --- Telegram Bot Logic ---
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Turn a free-text Telegram report into a structured record and store it.

    The message text is passed to the AI manager; a result that carries a
    ``store_id`` is saved through the database handler. The sender is told
    whether the report was stored, could not be parsed, was rejected by the
    database, or failed for another reason.

    Args:
        update: Incoming Telegram update.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    # ``update.message`` is unset for edited messages and channel posts, which
    # a TEXT filter can still match. Skip those instead of raising
    # AttributeError before the error handling below is even entered.
    message = update.message
    if message is None:
        return

    text = message.text
    if not text or not text.strip():
        return

    user = message.from_user
    logger.info(
        "Processing message from %s (id=%s): %s",
        getattr(user, "first_name", None),
        getattr(user, "id", None),
        text,
    )

    try:
        if ai_manager is None or db is None:
            raise RuntimeError("AI or Database services not initialized")

        # Both service calls are synchronous (presumably network-bound), so
        # run them in the default executor rather than blocking the event
        # loop while they complete.
        loop = asyncio.get_running_loop()
        structured_data = await loop.run_in_executor(
            None, ai_manager.process_telegram_message, text
        )

        if not structured_data or not structured_data.get('store_id'):
            logger.warning(
                "AI could not extract a valid store_id from message: '%s'", text
            )
            await message.reply_text(
                "⚠️ I couldn't extract a valid Store ID from your message. "
                "Please make sure to mention the store location or ID in your report."
            )
            return

        try:
            await loop.run_in_executor(None, db.save_report, structured_data)
        except Exception as db_err:
            logger.exception("Database save failed: %s", db_err)
            # NOTE(review): nothing in this handler queues the report locally;
            # confirm that save_report does, otherwise this reply is misleading.
            await message.reply_text(
                "❌ Critical Error: I analyzed your report, but the database rejected it.\n"
                "The report has been queued locally, but it will NOT appear on the dashboard until the connection is fixed."
            )
            return

        await message.reply_text(
            f"✅ Report received for {structured_data.get('store_id', 'Unknown')}.\n"
            f"Analysis: {structured_data.get('analysis', 'N/A')}\n"
            f"Actions logged to Dashboard."
        )
    except Exception as e:
        # Top-level boundary for the handler: log with traceback and tell the
        # user something went wrong rather than failing silently.
        logger.exception("Error in handler: %s", e)
        await message.reply_text(
            "❌ Error processing report. Please ensure the format is correct."
        )
# --- HF Health Check Server ---
app = FastAPI()


@app.get("/")
async def root():
    """Health-check endpoint reporting whether both backing services loaded."""
    # "Active" only when the AI manager and the database client are both
    # available; otherwise the bot is reported as "Degraded".
    if ai_manager and db:
        bot_state = "Active"
    else:
        bot_state = "Degraded"
    return {"status": "Sovereign Ops Manager is Healthy", "bot": bot_state}
# --- Unified Engine ---
async def main():
    """Run the health-check web server and the Telegram bot on one event loop.

    Starts uvicorn as a background task, starts bot polling, then waits until
    either a shutdown signal arrives or the web server stops, and finally
    tears both services down.
    """
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, stop_event.set)
        except NotImplementedError:
            # add_signal_handler is Unix-only; elsewhere Ctrl+C surfaces as
            # KeyboardInterrupt out of asyncio.run() instead.
            logger.warning("Signal handlers are not supported on this platform.")
            break

    config = uvicorn.Config(app, host="0.0.0.0", port=7860, log_level="info")
    server = uvicorn.Server(config)
    server_task = asyncio.create_task(server.serve())
    logger.info("HF Heartbeat Server initialized on port 7860.")
    # Short pause kept from the original startup sequence; presumably gives
    # uvicorn time to bind before the bot starts polling.
    await asyncio.sleep(1.0)

    request_client = HTTPXRequest(
        connect_timeout=20.0, read_timeout=20.0, write_timeout=20.0, pool_timeout=20.0
    )
    bot_app = (
        ApplicationBuilder()
        .token(TELEGRAM_TOKEN)
        .request(request_client)
        .build()
    )
    bot_app.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_message))

    await bot_app.initialize()
    await bot_app.start()
    await bot_app.updater.start_polling()
    logger.info("Sovereign Bot is LIVE and polling.")

    # Wake up on whichever happens first: our stop event, or the web server
    # task finishing. uvicorn installs its own SIGINT/SIGTERM handling inside
    # serve(), which may replace the handlers registered above; waiting on
    # stop_event alone could then hang forever after the server has exited.
    stop_waiter = asyncio.create_task(stop_event.wait())
    await asyncio.wait(
        {stop_waiter, server_task}, return_when=asyncio.FIRST_COMPLETED
    )
    stop_waiter.cancel()

    logger.info("Shutdown signal caught. Beginning cleanup...")
    try:
        server.should_exit = True
        await server_task
    finally:
        # Always stop the bot, even if the server task ended with an error.
        await bot_app.updater.stop()
        await bot_app.stop()
        await bot_app.shutdown()
    logger.info("All services shut down gracefully.")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C that was not absorbed by the loop's signal handlers: exit
        # without a traceback, but leave a record in the log.
        logger.info("Interrupted by user; exiting.")