| import os |
| import json |
| import time |
| import logging |
| from typing import List, Dict, Any, Optional |
| from groq import Groq |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| |
| DEFAULT_MODEL = os.getenv("OPS_BRAIN_MODEL", "llama-3.1-70b-versatile") |
|
|
| GROQ_TIMEOUT_S = 15.0 |
| GROQ_MAX_RETRIES = 2 |
| GROQ_RETRY_BACKOFF_S = 1.5 |
|
|
|
|
| def _safe_json_loads(raw: str) -> Optional[Dict[str, Any]]: |
| """ |
| Parse JSON from a model response, tolerating stray prose / markdown fences. |
| Returns None if unparseable. |
| """ |
| if not raw: |
| return None |
| text = raw.strip() |
|
|
| |
| if text.startswith("```"): |
| first_nl = text.find("\n") |
| if first_nl != -1: |
| text = text[first_nl + 1 :] |
| if text.endswith("```"): |
| text = text[:-3] |
|
|
| |
| try: |
| return json.loads(text) |
| except json.JSONDecodeError: |
| pass |
|
|
| |
| start = text.find("{") |
| end = text.rfind("}") |
| if start != -1 and end != -1 and end > start: |
| try: |
| return json.loads(text[start : end + 1]) |
| except json.JSONDecodeError: |
| pass |
|
|
| logger.error("ops_brain: could not parse JSON from model output: %r", raw[:300]) |
| return None |
|
|
|
|
| def _groq_chat(client: Groq, prompt: str, model: str) -> Optional[Dict[str, Any]]: |
| """Call Groq with timeout, retry, and JSON repair.""" |
| last_err: Optional[Exception] = None |
| for attempt in range(1, GROQ_MAX_RETRIES + 1): |
| try: |
| completion = client.chat.completions.create( |
| model=model, |
| messages=[{"role": "user", "content": prompt}], |
| response_format={"type": "json_object"}, |
| timeout=GROQ_TIMEOUT_S, |
| ) |
| raw = completion.choices[0].message.content |
| parsed = _safe_json_loads(raw) |
| if parsed is not None: |
| return parsed |
| last_err = ValueError("json parse failed") |
| except Exception as e: |
| last_err = e |
| logger.warning("ops_brain: groq attempt %d/%d failed: %s", attempt, GROQ_MAX_RETRIES, e) |
|
|
| if attempt < GROQ_MAX_RETRIES: |
| time.sleep(GROQ_RETRY_BACKOFF_S * attempt) |
|
|
| logger.error("ops_brain: giving up after %d attempts. last_err=%s", GROQ_MAX_RETRIES, last_err) |
| return None |
|
|
|
|
| class OpsManagerAI: |
| def __init__(self, api_key: str, model: str = DEFAULT_MODEL): |
| if not api_key or not api_key.strip(): |
| raise ValueError("OpsManagerAI: api_key is empty") |
| self.client = Groq(api_key=api_key) |
| self.model = model |
|
|
| def process_telegram_message(self, text: str) -> Dict[str, Any]: |
| """ |
| Parses store reports into structured JSON using Groq. |
| Returns a safe-default dict (with store_id=None) if the model fails, |
| so the bot never crashes the handler thread. |
| """ |
| prompt = f""" |
| You are a Professional AI Operations Manager. Your task is to parse store reports into a strict JSON format. |
| |
| The user provides reports in a specific template like: |
| 'Daily Update [ Store Name ] Date: [Date] 💵 Sales:; (Value) 🛍️Transactions: [Value] 📊Average Transaction (AT): [Value] ⬆️⬇️AT Yesterday[ Value]' |
| |
| Input Text: {text} |
| |
| Required JSON Output: |
| {{ |
| "store_id": "Extract the name inside the brackets [ ]", |
| "metrics": {{ |
| "sales": float or null, |
| "inventory_status": "Good|Warning|Critical", |
| "staffing": "OK|Understaffed|Overstaffed" |
| }}, |
| "issues": ["List any mentioned anomalies or low stock, otherwise empty"], |
| "analysis": "A brief analytical summary of the store's health today", |
| "actions_needed": ["Concrete, actionable steps based on the sales/AT trends"] |
| }} |
| Return ONLY the JSON object. No preamble. |
| """ |
|
|
| parsed = _groq_chat(self.client, prompt, self.model) |
| if parsed is not None and parsed.get("store_id"): |
| return parsed |
|
|
| |
| return { |
| "store_id": None, |
| "metrics": {"sales": None, "inventory_status": None, "staffing": None}, |
| "issues": [], |
| "analysis": "AI could not extract a valid store report from the message.", |
| "actions_needed": [], |
| } |
|
|
| def generate_hot_list_analysis(self, all_stores_data: List[Dict]) -> Dict[str, Any]: |
| """ |
| Analyze the full fleet to find critical areas. |
| """ |
| prompt = f""" |
| Analyze the following store data and provide a strategic operation summary. |
| Data: {json.dumps(all_stores_data)} |
| |
| Return JSON: |
| {{ |
| "top_performers": ["Store name"], |
| "critical_stores": ["Store name"], |
| "global_dev_area": "General area needing improvement", |
| "strategic_priority": "Top immediate action for the owner" |
| }} |
| Return ONLY JSON. |
| """ |
|
|
| parsed = _groq_chat(self.client, prompt, self.model) |
| if parsed is not None: |
| return parsed |
| return { |
| "top_performers": [], |
| "critical_stores": [], |
| "global_dev_area": "AI analysis unavailable.", |
| "strategic_priority": "Manual review required.", |
| } |
|
|