"""Groq-backed helpers that turn free-text store reports into structured JSON."""

import json
import logging
import os
import time
from typing import Any, Dict, List, Optional

try:
    from groq import Groq
except ImportError:  # pragma: no cover - only hit when the SDK is not installed
    # Keep the pure helpers in this module importable without the SDK;
    # OpsManagerAI.__init__ raises ImportError when it actually needs the client.
    Groq = None  # type: ignore[assignment,misc]

logger = logging.getLogger(__name__)

# Default model. Per Groq's deprecation notices, `llama-3.1-70b-versatile` was
# retired in favour of `llama-3.3-70b-versatile` (verify against the current
# model list when upgrading). Override via env for fast A/B.
DEFAULT_MODEL = os.getenv("OPS_BRAIN_MODEL", "llama-3.3-70b-versatile")

GROQ_TIMEOUT_S = 15.0
# Total number of attempts made by _groq_chat (not "retries after the first").
GROQ_MAX_RETRIES = 2
# Linear back-off: the sleep before the next attempt is this value * attempt number.
GROQ_RETRY_BACKOFF_S = 1.5


def _safe_json_loads(raw: str) -> Optional[Dict[str, Any]]:
    """
    Parse a JSON object from a model response, tolerating stray prose /
    markdown fences.

    Two candidates are tried in order: the (fence-stripped) text as-is, then
    the slice between the first '{' and the last '}'. Only a JSON *object* is
    accepted; valid JSON of any other type (list, string, number) is treated
    as unparseable so callers can safely use dict methods on the result.

    Returns None (and logs an error) if no candidate yields a dict.
    """
    if not raw:
        return None

    text = raw.strip()

    # Strip ```json ... ``` fences if present.
    if text.startswith("```"):
        first_nl = text.find("\n")
        if first_nl != -1:
            text = text[first_nl + 1:]
        if text.endswith("```"):
            text = text[:-3]

    candidates = [text]
    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end > start:
        candidates.append(text[start:end + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed

    logger.error("ops_brain: could not parse JSON from model output: %r", raw[:300])
    return None


def _groq_chat(client: "Groq", prompt: str, model: str) -> Optional[Dict[str, Any]]:
    """
    Call Groq with timeout, retry, and JSON repair.

    Sends `prompt` as a single user message requesting a JSON object, and
    makes up to GROQ_MAX_RETRIES attempts. An attempt fails when the call
    raises or when the response cannot be parsed into a dict.

    Returns the parsed dict, or None once every attempt has failed.
    """
    last_err: Optional[Exception] = None

    for attempt in range(1, GROQ_MAX_RETRIES + 1):
        try:
            completion = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"},
                timeout=GROQ_TIMEOUT_S,
            )
            raw = completion.choices[0].message.content
            parsed = _safe_json_loads(raw)
            if parsed is not None:
                return parsed
            last_err = ValueError("json parse failed")
        except Exception as e:  # groq.GroqError, httpx.TimeoutException, etc.
            # Deliberately broad: this is the boundary that keeps SDK/network
            # failures from propagating into the caller.
            last_err = e
            logger.warning(
                "ops_brain: groq attempt %d/%d failed: %s",
                attempt,
                GROQ_MAX_RETRIES,
                e,
            )

        # Back off before the next attempt; no point sleeping after the last one.
        if attempt < GROQ_MAX_RETRIES:
            time.sleep(GROQ_RETRY_BACKOFF_S * attempt)

    logger.error(
        "ops_brain: giving up after %d attempts. last_err=%s",
        GROQ_MAX_RETRIES,
        last_err,
    )
    return None


class OpsManagerAI:
    """Thin wrapper around a Groq client for parsing and summarising store reports."""

    def __init__(self, api_key: str, model: str = DEFAULT_MODEL):
        """
        Create the Groq client.

        Raises:
            ValueError: if `api_key` is empty or whitespace-only.
            ImportError: if the `groq` package is not installed.
        """
        if not api_key or not api_key.strip():
            raise ValueError("OpsManagerAI: api_key is empty")
        if Groq is None:
            raise ImportError("OpsManagerAI: the 'groq' package is not installed")
        self.client = Groq(api_key=api_key)
        self.model = model

    def process_telegram_message(self, text: str) -> Dict[str, Any]:
        """
        Parses store reports into structured JSON using Groq.

        Returns the model's dict when it contains a truthy "store_id".
        Otherwise returns a safe-default dict (with store_id=None), so the bot
        never crashes the handler thread.
        """
        prompt = f"""
        You are a Professional AI Operations Manager.
        Your task is to parse store reports into a strict JSON format.

        The user provides reports in a specific template like:
        'Daily Update [ Store Name ]
        Date: [Date]
        💵 Sales:; (Value)
        🛍️Transactions: [Value]
        📊Average Transaction (AT): [Value]
        ⬆️⬇️AT Yesterday[ Value]'

        Input Text: {text}

        Required JSON Output:
        {{
            "store_id": "Extract the name inside the brackets [ ]",
            "metrics": {{
                "sales": float or null,
                "inventory_status": "Good|Warning|Critical",
                "staffing": "OK|Understaffed|Overstaffed"
            }},
            "issues": ["List any mentioned anomalies or low stock, otherwise empty"],
            "analysis": "A brief analytical summary of the store's health today",
            "actions_needed": ["Concrete, actionable steps based on the sales/AT trends"]
        }}
        Return ONLY the JSON object. No preamble.
        """
        parsed = _groq_chat(self.client, prompt, self.model)
        if parsed is not None and parsed.get("store_id"):
            return parsed

        # Safe fallback so caller can decide to reply with an error instead of crashing.
        return {
            "store_id": None,
            "metrics": {"sales": None, "inventory_status": None, "staffing": None},
            "issues": [],
            "analysis": "AI could not extract a valid store report from the message.",
            "actions_needed": [],
        }

    def generate_hot_list_analysis(self, all_stores_data: List[Dict]) -> Dict[str, Any]:
        """
        Analyze the full fleet to find critical areas.

        Returns the model's dict, or a placeholder dict with empty lists and
        "unavailable" messages when the model call fails.
        """
        # default=str: values json cannot encode natively (dates, Decimals, ...)
        # are stringified for the prompt instead of raising TypeError before
        # the guarded model call. ensure_ascii=False keeps non-ASCII store
        # names readable to the model rather than \u-escaped.
        fleet_json = json.dumps(all_stores_data, default=str, ensure_ascii=False)

        prompt = f"""
        Analyze the following store data and provide a strategic operation summary.
        Data: {fleet_json}

        Return JSON:
        {{
            "top_performers": ["Store name"],
            "critical_stores": ["Store name"],
            "global_dev_area": "General area needing improvement",
            "strategic_priority": "Top immediate action for the owner"
        }}
        Return ONLY JSON.
        """
        parsed = _groq_chat(self.client, prompt, self.model)
        if parsed is not None:
            return parsed

        return {
            "top_performers": [],
            "critical_stores": [],
            "global_dev_area": "AI analysis unavailable.",
            "strategic_priority": "Manual review required.",
        }