File size: 5,684 Bytes
import os
import json
import time
import logging
from typing import List, Dict, Any, Optional
from groq import Groq
logger = logging.getLogger(__name__)

# Default model. Groq deprecated `llama-3.1-70b-versatile`; its documented
# replacement is `llama-3.3-70b-versatile`. Override via the OPS_BRAIN_MODEL
# env var for fast A/B. An unset *or empty* variable falls back to the default
# so a blank value can never produce an empty model name.
DEFAULT_MODEL = os.getenv("OPS_BRAIN_MODEL") or "llama-3.3-70b-versatile"

# Per-request timeout, in seconds, handed to the Groq client.
GROQ_TIMEOUT_S = 15.0
# Total number of attempts per call (the first try is included in this count).
GROQ_MAX_RETRIES = 2
# Base backoff in seconds; the pause before the next attempt is this value
# multiplied by the number of the attempt that just failed.
GROQ_RETRY_BACKOFF_S = 1.5
def _safe_json_loads(raw: str) -> Optional[Dict[str, Any]]:
"""
Parse JSON from a model response, tolerating stray prose / markdown fences.
Returns None if unparseable.
"""
if not raw:
return None
text = raw.strip()
# Strip ```json ... ``` fences if present.
if text.startswith("```"):
first_nl = text.find("\n")
if first_nl != -1:
text = text[first_nl + 1 :]
if text.endswith("```"):
text = text[:-3]
# First attempt: strict.
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Second attempt: find the first '{' and last '}' and try that slice.
start = text.find("{")
end = text.rfind("}")
if start != -1 and end != -1 and end > start:
try:
return json.loads(text[start : end + 1])
except json.JSONDecodeError:
pass
logger.error("ops_brain: could not parse JSON from model output: %r", raw[:300])
return None
def _groq_chat(client: Groq, prompt: str, model: str) -> Optional[Dict[str, Any]]:
    """Call Groq with timeout, retry, and JSON repair.

    Sends `prompt` as a single user message requesting a JSON-object response
    and parses the reply with `_safe_json_loads`. Up to GROQ_MAX_RETRIES
    attempts are made in total; between attempts the function sleeps for
    GROQ_RETRY_BACKOFF_S multiplied by the attempt number.

    Args:
        client: Groq client used to issue the chat completion.
        prompt: Full prompt text sent as the user message.
        model: Model identifier passed through to the API.

    Returns:
        The parsed JSON object, or None once every attempt has failed. Errors
        are logged rather than raised.
    """
    # NOTE(review): the Groq SDK client may apply its own retry policy on top
    # of this loop, multiplying the worst-case latency — confirm against the
    # client's `max_retries` setting.
    last_err: Optional[Exception] = None
    for attempt in range(1, GROQ_MAX_RETRIES + 1):
        try:
            completion = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"},
                timeout=GROQ_TIMEOUT_S,
            )
            parsed = _safe_json_loads(completion.choices[0].message.content)
            if parsed is not None:
                return parsed
            last_err = ValueError("json parse failed")
        except Exception as e:  # groq.GroqError, httpx.TimeoutException, etc.
            last_err = e

        # Single failure path: transport errors and unparseable replies are
        # both logged per attempt and both back off before the next try.
        logger.warning(
            "ops_brain: groq attempt %d/%d failed: %s",
            attempt,
            GROQ_MAX_RETRIES,
            last_err,
        )
        if attempt < GROQ_MAX_RETRIES:
            time.sleep(GROQ_RETRY_BACKOFF_S * attempt)

    logger.error(
        "ops_brain: giving up after %d attempts. last_err=%s",
        GROQ_MAX_RETRIES,
        last_err,
    )
    return None
class OpsManagerAI:
    """Groq-backed helper that turns store reports into structured dicts.

    Both public methods return a dict with a fixed set of keys whether or not
    the model call succeeds, so callers can index the result without guarding
    against missing keys.
    """

    def __init__(self, api_key: str, model: str = DEFAULT_MODEL):
        """Create the Groq client.

        Args:
            api_key: Groq API key; must be non-empty after stripping whitespace.
            model: Model identifier used for every request.

        Raises:
            ValueError: If `api_key` is empty or whitespace-only.
        """
        if not api_key or not api_key.strip():
            raise ValueError("OpsManagerAI: api_key is empty")
        self.client = Groq(api_key=api_key)
        self.model = model

    @staticmethod
    def _failed_report() -> Dict[str, Any]:
        """Return the safe-default report used when no valid report was extracted."""
        return {
            "store_id": None,
            "metrics": {"sales": None, "inventory_status": None, "staffing": None},
            "issues": [],
            "analysis": "AI could not extract a valid store report from the message.",
            "actions_needed": [],
        }

    @staticmethod
    def _failed_hot_list() -> Dict[str, Any]:
        """Return the safe-default fleet summary used when analysis is unavailable."""
        return {
            "top_performers": [],
            "critical_stores": [],
            "global_dev_area": "AI analysis unavailable.",
            "strategic_priority": "Manual review required.",
        }

    def process_telegram_message(self, text: str) -> Dict[str, Any]:
        """
        Parses store reports into structured JSON using Groq.

        Returns a safe-default dict (with store_id=None) if the message is
        blank or the model fails, so the bot never crashes the handler thread.

        Args:
            text: Raw report text to parse.

        Returns:
            A dict with the keys "store_id", "metrics" (itself holding "sales",
            "inventory_status" and "staffing"), "issues", "analysis" and
            "actions_needed". Keys the model omitted are filled with empty
            values; extra keys the model returned are kept.
        """
        # Nothing to parse: skip the API call entirely.
        if not text or not text.strip():
            return self._failed_report()

        prompt = f"""
        You are a Professional AI Operations Manager. Your task is to parse store reports into a strict JSON format.
        The user provides reports in a specific template like:
        'Daily Update [ Store Name ] Date: [Date] 💵 Sales:; (Value) 🛍️Transactions: [Value] 📊Average Transaction (AT): [Value] ⬆️⬇️AT Yesterday[ Value]'
        Input Text: {text}
        Required JSON Output:
        {{
        "store_id": "Extract the name inside the brackets [ ]",
        "metrics": {{
        "sales": float or null,
        "inventory_status": "Good|Warning|Critical",
        "staffing": "OK|Understaffed|Overstaffed"
        }},
        "issues": ["List any mentioned anomalies or low stock, otherwise empty"],
        "analysis": "A brief analytical summary of the store's health today",
        "actions_needed": ["Concrete, actionable steps based on the sales/AT trends"]
        }}
        Return ONLY the JSON object. No preamble.
        """
        parsed = _groq_chat(self.client, prompt, self.model)

        # Safe fallback so caller can decide to reply with an error instead of crashing.
        if not isinstance(parsed, dict) or not parsed.get("store_id"):
            return self._failed_report()

        # The model is not guaranteed to honour the schema: back-fill any
        # missing key with an empty value, keeping everything it did return.
        report: Dict[str, Any] = {
            "metrics": {},
            "issues": [],
            "analysis": "",
            "actions_needed": [],
            **parsed,
        }
        metrics = report["metrics"] if isinstance(report["metrics"], dict) else {}
        report["metrics"] = {
            "sales": None,
            "inventory_status": None,
            "staffing": None,
            **metrics,
        }
        return report

    def generate_hot_list_analysis(self, all_stores_data: List[Dict]) -> Dict[str, Any]:
        """
        Analyze the full fleet to find critical areas.

        Args:
            all_stores_data: One dict per store; embedded in the prompt as JSON.

        Returns:
            A dict with the keys "top_performers", "critical_stores",
            "global_dev_area" and "strategic_priority". When there is no data
            or the model fails, the placeholder summary is returned instead.
        """
        # With no data the model could only invent store names.
        if not all_stores_data:
            return self._failed_hot_list()

        # default=str is deliberate: the payload is only prompt text, so values
        # json cannot encode natively are stringified rather than raising.
        fleet_json = json.dumps(all_stores_data, default=str)
        prompt = f"""
        Analyze the following store data and provide a strategic operation summary.
        Data: {fleet_json}
        Return JSON:
        {{
        "top_performers": ["Store name"],
        "critical_stores": ["Store name"],
        "global_dev_area": "General area needing improvement",
        "strategic_priority": "Top immediate action for the owner"
        }}
        Return ONLY JSON.
        """
        parsed = _groq_chat(self.client, prompt, self.model)
        if not isinstance(parsed, dict):
            return self._failed_hot_list()

        # Back-fill keys the model omitted so the result shape is stable.
        return {
            "top_performers": [],
            "critical_stores": [],
            "global_dev_area": "",
            "strategic_priority": "",
            **parsed,
        }
|