ops-manager-final / brain /ops_brain.py
Welly-code's picture
Upload folder using huggingface_hub
b09389b verified
Raw
History Blame Contribute Delete
5.68 kB
import os
import json
import time
import logging
from typing import List, Dict, Any, Optional
from groq import Groq
logger = logging.getLogger(__name__)

# Default model. Groq retired `llama-3.1-70b-versatile` in favour of
# `llama-3.3-70b-versatile`; requests naming the retired SKU are rejected, so
# the default must be the 3.3 model. Override via the OPS_BRAIN_MODEL
# environment variable for fast A/B.
DEFAULT_MODEL = os.getenv("OPS_BRAIN_MODEL", "llama-3.3-70b-versatile")

# Per-request timeout, in seconds, passed to the Groq chat-completion call.
GROQ_TIMEOUT_S = 15.0
# Total number of attempts per call (the first try included), despite the name.
GROQ_MAX_RETRIES = 2
# Backoff base in seconds; the wait before the next attempt is this value
# multiplied by the number of the attempt that just failed.
GROQ_RETRY_BACKOFF_S = 1.5
def _safe_json_loads(raw: str) -> Optional[Dict[str, Any]]:
"""
Parse JSON from a model response, tolerating stray prose / markdown fences.
Returns None if unparseable.
"""
if not raw:
return None
text = raw.strip()
# Strip ```json ... ``` fences if present.
if text.startswith("```"):
first_nl = text.find("\n")
if first_nl != -1:
text = text[first_nl + 1 :]
if text.endswith("```"):
text = text[:-3]
# First attempt: strict.
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Second attempt: find the first '{' and last '}' and try that slice.
start = text.find("{")
end = text.rfind("}")
if start != -1 and end != -1 and end > start:
try:
return json.loads(text[start : end + 1])
except json.JSONDecodeError:
pass
logger.error("ops_brain: could not parse JSON from model output: %r", raw[:300])
return None
def _groq_chat(client: Groq, prompt: str, model: str) -> Optional[Dict[str, Any]]:
    """
    Send a single-message chat completion to Groq and return the parsed JSON.

    Makes up to GROQ_MAX_RETRIES attempts. An attempt fails either when the
    request raises or when the response body cannot be parsed by
    _safe_json_loads. Between attempts the function sleeps for
    GROQ_RETRY_BACKOFF_S multiplied by the attempt number.

    Returns:
        The parsed payload, or None once every attempt has failed.
    """
    failure: Optional[Exception] = None

    for attempt in range(1, GROQ_MAX_RETRIES + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"},
                timeout=GROQ_TIMEOUT_S,
            )
            payload = _safe_json_loads(response.choices[0].message.content)
        except Exception as exc:  # groq.GroqError, httpx.TimeoutException, etc.
            failure = exc
            logger.warning("ops_brain: groq attempt %d/%d failed: %s", attempt, GROQ_MAX_RETRIES, exc)
        else:
            if payload is not None:
                return payload
            # The request succeeded but the body was unusable; treat as a failure.
            failure = ValueError("json parse failed")

        # No point sleeping after the final attempt.
        if attempt < GROQ_MAX_RETRIES:
            time.sleep(GROQ_RETRY_BACKOFF_S * attempt)

    logger.error("ops_brain: giving up after %d attempts. last_err=%s", GROQ_MAX_RETRIES, failure)
    return None
class OpsManagerAI:
    """
    Thin wrapper around a Groq client that turns store reports into
    structured JSON and summarises fleet-wide data.

    Both public methods return a fixed-shape fallback dict instead of raising
    when the model call fails or returns something that is not a JSON object.
    """

    def __init__(self, api_key: str, model: str = DEFAULT_MODEL):
        """
        Args:
            api_key: Groq API key; must contain non-whitespace characters.
            model: Model name sent with every request.

        Raises:
            ValueError: If api_key is empty or whitespace-only.
        """
        if not api_key or not api_key.strip():
            raise ValueError("OpsManagerAI: api_key is empty")
        self.client = Groq(api_key=api_key)
        self.model = model

    @staticmethod
    def _empty_report() -> Dict[str, Any]:
        """Return the safe-default store report (store_id=None)."""
        return {
            "store_id": None,
            "metrics": {"sales": None, "inventory_status": None, "staffing": None},
            "issues": [],
            "analysis": "AI could not extract a valid store report from the message.",
            "actions_needed": [],
        }

    @staticmethod
    def _empty_hot_list() -> Dict[str, Any]:
        """Return the safe-default fleet analysis."""
        return {
            "top_performers": [],
            "critical_stores": [],
            "global_dev_area": "AI analysis unavailable.",
            "strategic_priority": "Manual review required.",
        }

    def process_telegram_message(self, text: str) -> Dict[str, Any]:
        """
        Parses store reports into structured JSON using Groq.

        Returns a safe-default dict (with store_id=None) if the text is blank,
        the model fails, or the reply is not a JSON object with a truthy
        "store_id", so the bot never crashes the handler thread.
        """
        # A blank (or missing) message cannot contain a report; skip the API
        # round-trip rather than asking the model to parse nothing.
        if not text or not text.strip():
            return self._empty_report()

        prompt = f"""
You are a Professional AI Operations Manager. Your task is to parse store reports into a strict JSON format.
The user provides reports in a specific template like:
'Daily Update [ Store Name ] Date: [Date] 💵 Sales:; (Value) 🛍️Transactions: [Value] 📊Average Transaction (AT): [Value] ⬆️⬇️AT Yesterday[ Value]'
Input Text: {text}
Required JSON Output:
{{
"store_id": "Extract the name inside the brackets [ ]",
"metrics": {{
"sales": float or null,
"inventory_status": "Good|Warning|Critical",
"staffing": "OK|Understaffed|Overstaffed"
}},
"issues": ["List any mentioned anomalies or low stock, otherwise empty"],
"analysis": "A brief analytical summary of the store's health today",
"actions_needed": ["Concrete, actionable steps based on the sales/AT trends"]
}}
Return ONLY the JSON object. No preamble.
"""
        parsed = _groq_chat(self.client, prompt, self.model)
        # isinstance guard: a non-dict payload (e.g. a JSON array) has no
        # .get() and would otherwise raise AttributeError here.
        if isinstance(parsed, dict) and parsed.get("store_id"):
            return parsed

        # Safe fallback so caller can decide to reply with an error instead of crashing.
        return self._empty_report()

    def generate_hot_list_analysis(self, all_stores_data: List[Dict]) -> Dict[str, Any]:
        """
        Analyze the full fleet to find critical areas.

        Returns the model's JSON object, or a fixed fallback dict when the
        model call fails or does not yield a JSON object.
        """
        # default=str keeps values that json cannot encode natively (datetime,
        # Decimal, ...) from raising TypeError; the text is only read by the
        # model, so a string rendering is sufficient.
        serialized = json.dumps(all_stores_data, default=str)
        prompt = f"""
Analyze the following store data and provide a strategic operation summary.
Data: {serialized}
Return JSON:
{{
"top_performers": ["Store name"],
"critical_stores": ["Store name"],
"global_dev_area": "General area needing improvement",
"strategic_priority": "Top immediate action for the owner"
}}
Return ONLY JSON.
"""
        parsed = _groq_chat(self.client, prompt, self.model)
        if isinstance(parsed, dict):
            return parsed
        return self._empty_hot_list()