""" VineyardChatbot: Gemini-powered conversational advisor for the SolarWine agrivoltaic system. Provides a natural-language interface for farmers to ask about shading decisions, photosynthesis, weather conditions, vine biology, and energy generation. Uses a DataHub of loosely-coupled service providers for all data access — the chatbot never imports data clients directly. Anti-hallucination guardrails (v2): - Structured responses with confidence, sources, and caveats - Mandatory tool grounding for data questions - Post-response rule validation - Source-tagged tool results - Confidence estimation based on data freshness """ from __future__ import annotations import json import re import traceback from dataclasses import dataclass, field from typing import Optional from src.data_providers import DataHub from src.genai_utils import extract_json_object, get_genai_client, get_google_api_key from src.chatbot.guardrails import ( check_cross_source_consistency, classify_query, estimate_confidence, get_source_label, tag_tool_result, validate_response, ) def _extract_json(text: str) -> dict: """Thin wrapper around the shared genai_utils implementation.""" return extract_json_object(text) # --------------------------------------------------------------------------- # Data structures # --------------------------------------------------------------------------- @dataclass class ChatResponse: """Structured response from the chatbot with grounding metadata.""" message: str tool_calls: list[dict] = field(default_factory=list) data: dict = field(default_factory=dict) # --- Grounding metadata (v2) --- confidence: str = "low" # high / medium / low / insufficient_data sources: list[str] = field(default_factory=list) caveats: list[str] = field(default_factory=list) rule_violations: list[dict] = field(default_factory=list) # --- Dual-channel advisory (v3) --- response_mode: str = "info" # "info" (factual) or "advisory" (recommendation) # --------------------------------------------------------------------------- # Biology rules lookup (shared knowledge base) # --------------------------------------------------------------------------- BIOLOGY_RULES = { "site_location": ( "The vineyard site is in Yeruham, Israel (Seymour experimental plot). " "Weather data is from IMS station 43 (Sde Boker, Negev). Timezone is always " "Asia/Jerusalem (Israel Standard Time / Israel Daylight Time). All timestamps " "from tools (get_current_weather, get_vine_state, etc.) are in Israel local time. " "When the user asks about 'right now' or 'current' conditions, interpret the " "time in the tool result as Israel local time (e.g. 15:16 = afternoon in Yeruham)." ), "temperature_transition": ( "Below 30\u00b0C, Semillon photosynthesis is RuBP-limited (light is the " "bottleneck \u2014 shading HURTS). Above 30\u00b0C, it becomes Rubisco-limited " "(heat is the bottleneck \u2014 shading MAY help). The transition is gradual " "(28\u201332\u00b0C)." ), "no_shade_before_10": ( "Morning light is critical for carbon fixation. Never shade before " "10:00 regardless of temperature." ), "no_shade_in_may": ( "May is the flowering/fruit-set period. Yield protection has priority: " "avoid shading in May under normal conditions because even small losses " "can reduce cluster number and berry set. Only introduce shade in May " "as a last resort in extreme heat to prevent serious damage (e.g. " "severe sunburn or lethal stress)." ), "cwsi_threshold": ( "Crop Water Stress Index > 0.4 indicates real water stress. Below 0.4, " "the vine is coping adequately." ), "berry_sunburn": ( "Direct exposure at air temperature > 35\u00b0C risks berry sunburn, " "especially on the southwest-facing side of clusters in the afternoon." ), "energy_budget": ( "Primary objective is to maximise annual PV energy. The vines have a " "limited \"protection budget\": up to 5% annual energy sacrifice for " "shading that clearly protects vine health or yield. Suggested monthly " "caps: May=0%, Jun=15%, Jul=30%, Aug=30%, Sep=20%, Oct=5%. Stay below " "these caps unless there is an exceptional agronomic reason." ), "model_routing": ( "Use FvCB (Farquhar model) for standard conditions (T < 30\u00b0C, " "VPD < 2.5 kPa, adequate water). Use ML ensemble for stress conditions " "(T > 30\u00b0C, high VPD, water stress, or any non-linear regime)." ), "phenological_multiplier": ( "Stress during veraison (berry ripening) is 1.5x more damaging than " "during vegetative growth. Protect veraison at higher cost." ), "irrigation_management": ( "Aim to keep soil moisture in a comfortable band for Semillon: avoid " "both chronic dryness and chronic saturation. During vegetative growth " "allow gentle dry-down between irrigations; during flowering and " "veraison, avoid strong swings. Use CWSI and VPD together: if CWSI " "stays > 0.4 and VPD is high for several hours, consider an irrigation " "event unless the soil is already wet." ), "fertiliser_management": ( "Prioritise balanced nutrition over aggressive fertiliser use. Apply " "most nitrogen early in the season (budburst to pre-flowering), reduce " "near veraison to avoid excessive vigour and delayed ripening. Use " "leaf tissue tests and visual cues; avoid fertilising stressed vines " "during acute heat or drought events." ), "photosynthesis_3d": ( "The 3D viewer shows the vine canopy, solar tracker panel and sun position, " "with each zone coloured by photosynthesis rate (green = rate). Connect a " "Google API key to use the Vineyard Advisor and generate the interactive " "3D scene from the chat (e.g. \"Show me the 3D vine and photosynthesis\")." ), "no_leaves_no_shade_problem": ( "When there are no leaves (dormant season, before budburst, or canopy not " "yet developed), there is no problem with shading \u2014 the vine is not " "photosynthesising, so shading does not harm it. Do not frame the answer as " "\"you should not shade\" as if shading would be bad; instead say that " "shading is irrelevant right now (no leaves to protect), and panel position " "can favour energy. In the Negev, dormancy is roughly October\u2013March; budburst " "is typically March\u2013April." ), "no_shading_must_explain": ( "When recommending that the farmer should NOT shade (or that shading is not " "needed), always give a specific reason tied to photosynthesis or need. " "Examples: (1) No leaves / dormant \u2014 no photosynthesis to protect, so shading " "is irrelevant. (2) Full sun is beneficial \u2014 vine is light-limited (T < 30\u00b0C), " "so shading would reduce photosynthesis; keep panels tracking. (3) No " "radiation (night or GHI = 0) \u2014 nothing to manage; no shading decision needed. " "Never say only \"you should not shade\" without explaining the underlying " "reason (no need for PS protection, or need for full light for PS, etc.)." ), } # --------------------------------------------------------------------------- # System prompt # --------------------------------------------------------------------------- _SYSTEM_PROMPT_TEMPLATE = """\ You are a friendly vineyard advisor for the SolarWine agrivoltaic system. \ Site: Yeruham, Israel (Seymour plot, Negev). Weather: IMS station 43 (Sde Boker). \ Timezone: Asia/Jerusalem — all tool timestamps are Israel local time; interpret \ "now" and "current" using that timezone (e.g. 15:16 = afternoon in Yeruham). \ You help the farmer decide when and how much to shade their Semillon grapevines \ (VSP trellis, 1.2 m canopy) under single-axis solar trackers (1.13 m panel at \ 2.05 m height, 3.0 m row spacing). LANGUAGE: - ALWAYS reply in the same language the user writes in. If they write in \ Hebrew, reply in Hebrew. If English, reply in English. Match their language \ exactly — do not switch languages mid-conversation. CONTROL OBJECTIVE: - Primary goal: maximise annual PV energy production. - Secondary goal: protect vines from heat, water stress, and sunburn using a \ limited shading budget (see energy_budget rule). - When in doubt and there is no clear sign of dangerous stress, prefer \ keeping panels in their energy-maximising position. CALENDAR & STAGE HANDLING: - Do NOT guess the current calendar month. If the user does not supply a \ date and you do not have a phenology tool result, talk in terms of stages \ (budburst, flowering, veraison, etc.) rather than asserting a specific month. - IMPORTANT: For "should I shade?" questions, ALWAYS consider phenological \ stage FIRST. If the vine is dormant (no leaves), shading is irrelevant — \ say so briefly and recommend full tracking for energy. Do not waste the \ user's time with weather analysis when the vine has no leaves. COMMUNICATION STYLE: - Be CONCISE: 2-4 sentences for simple questions, not 15 lines - Lead with the answer, then give a brief reason - Always explain WHY a recommendation makes sense biologically - When uncertain, say so and suggest what data would help - Do NOT repeat that data is stale multiple times — mention it once BIOLOGICAL GUIDELINES (strong constraints; balance them with the energy objective): {biology_rules} TOOLS AVAILABLE: You can call tools by including a JSON block in your response with this format: {{"tool_call": {{"name": "", "args": {{}}}}}} Available tools: WEATHER & ENVIRONMENT: - get_current_weather: No args. Returns latest IMS weather readings plus \ current_time_israel, current_date_israel, current_datetime_israel (the real \ "now" in Yeruham). Use these for "right now" answers; timestamp_local is \ when the weather was recorded (may be stale — check age_minutes). - get_weather_history: Args: start_date (str YYYY-MM-DD), end_date (str \ YYYY-MM-DD). Returns hourly IMS weather summary for a date range. VINE SENSORS (ThingsBoard): - get_vine_state: No args. Returns the latest on-site sensor readings from \ ThingsBoard (soil moisture, leaf temperature, fruiting-zone PAR, irrigation \ status, panel surface temps) comparing TREATMENT area (rows 501-502, under \ panels) vs REFERENCE area (rows 503-504, open sky). Use when the user asks \ about current vine conditions, stress levels, soil moisture, or irrigation. - get_sensor_history: Args: device_type (str: air/crop/soil), area (str: \ treatment/reference/ambient), hours_back (int, default 24). Returns hourly \ averages from ThingsBoard time-series data. PHOTOSYNTHESIS: - calc_photosynthesis: Args: PAR (float), Tleaf (float), CO2 (float), \ VPD (float), Tair (float). Returns net assimilation A and limiting factor \ using the mechanistic Farquhar (FvCB) model. - predict_photosynthesis_ml: Args: features (dict, optional). Returns ML \ ensemble prediction of A. If features not provided, auto-fills from latest \ IMS cache. Use when conditions are stressful (T>30C, high VPD). - get_ps_forecast: Args: date (str YYYY-MM-DD, optional). Returns 24-hour \ predicted A profile (hourly) using time-series forecasting. SHADING & TRACKING: - simulate_shading: Args: angle_offset (float, degrees), hour (int 0-23), \ date (str YYYY-MM-DD, optional). Returns A comparison shaded vs unshaded. - compare_tilt_angles: Args: angles (list of ints, optional). Returns A \ and energy at different tilt offsets. - get_daily_schedule: Args: stress_threshold (float, optional), \ shade_angle (int, optional). Returns hourly shading schedule. ENERGY: - get_energy_generation: No args. Returns latest energy generation data \ from ThingsBoard (today kWh, current power W). - get_energy_history: Args: hours_back (int, default 24). Returns energy \ generation time-series. - predict_energy: Args: date (str YYYY-MM-DD, optional). Returns predicted \ daily energy generation (kWh) based on IMS GHI forecast and panel geometry. ADVISORY: - run_day_ahead_advisory: Args: date (str YYYY-MM-DD, optional). Returns \ full stress advisory from the DayAheadAdvisor. VISUALIZATION: - get_photosynthesis_3d: Args: hour (int 0-23, optional), date (str YYYY-MM-DD, \ optional). Returns a 3D interactive scene showing the vine, solar tracker, sun, \ and which parts of the canopy are doing how much photosynthesis (green = rate). \ Use when the user asks to see a 3D view, visualize photosynthesis, or show vine \ and tracker together. BIOLOGY: - explain_biology_rule: Args: rule_name (str). Returns detailed explanation. \ Valid names: {rule_names}. RESPONSE RULES: - CRITICAL: When the user asks about current conditions, specific numbers, \ predictions, sensor readings, or any site-specific data, you MUST call a \ tool. NEVER answer data questions from your training knowledge — always \ use a tool to get real data. - When quoting numbers from tool results, cite the data source and timestamp. \ Example: "According to IMS Station 43 (recorded 14:30), the temperature is 28°C." - If tool data is older than 60 minutes, warn: "Note: this data is X minutes old." - After receiving tool results, explain them in plain language. - When the answer is "no shading" or "shading not needed", always state the \ specific reason (no leaves / dormant; light-limited so full sun helps PS; or \ no radiation). See no_shading_must_explain and no_leaves_no_shade_problem. - If the user suggests something that violates a biology rule, refuse clearly \ and explain which rule and why. - If a tool returns an error or some data is missing, say clearly what data \ is unavailable. Do NOT invent or estimate values — say "I don't have current \ data for X" and explain what you can still answer from biology rules. - If no API key is available, you can still answer biology questions from \ your built-in knowledge. - NEVER invent sensor readings, temperatures, or measurements. If you don't \ have data, say so. """ # --------------------------------------------------------------------------- # Build system prompt from BIOLOGY_RULES to avoid drift # --------------------------------------------------------------------------- def _build_system_prompt() -> str: """Build the system prompt, embedding biology rules from the shared dict.""" rules_text = "\n\n".join( f"{i}. {name.upper().replace('_', ' ')}: {text}" for i, (name, text) in enumerate(BIOLOGY_RULES.items(), 1) ) rule_names = ", ".join(BIOLOGY_RULES.keys()) return _SYSTEM_PROMPT_TEMPLATE.format( biology_rules=rules_text, rule_names=rule_names, ) CHATBOT_SYSTEM_PROMPT = _build_system_prompt() # RAG-style rule retrieval: keyword index for selecting relevant rules per query _RULE_KEYWORDS = { "site_location": ["yeruham", "location", "timezone", "israel", "sde boker", "negev", "where", "site", "local time"], "temperature_transition": ["temperature", "30", "rubp", "rubisco", "transition", "heat", "hot", "cold", "cool", "warm"], "no_shade_before_10": ["morning", "before 10", "early", "sunrise", "dawn"], "no_shade_in_may": ["may", "flowering", "fruit set", "spring"], "cwsi_threshold": ["cwsi", "water stress", "crop water", "drought"], "berry_sunburn": ["sunburn", "berry", "35", "cluster", "grape"], "energy_budget": ["budget", "energy", "sacrifice", "ceiling", "5%", "kwh", "solar", "power", "generation"], "model_routing": ["model", "fvcb", "farquhar", "ml", "routing", "predict"], "phenological_multiplier": ["veraison", "ripening", "phenol", "stage"], "irrigation_management": ["irrigation", "water", "soil", "moisture", "irrigate"], "fertiliser_management": ["fertiliser", "fertilizer", "nitrogen", "nutrient"], "photosynthesis_3d": ["3d", "visual", "scene", "show"], "no_leaves_no_shade_problem": ["no leaves", "dormant", "budburst", "winter"], "no_shading_must_explain": ["should not shade", "no shading", "don't shade", "why not shade"], } # Rules that are always included (core constraints) _PINNED_RULES = {"no_shade_before_10", "energy_budget", "temperature_transition"} def retrieve_relevant_rules(query: str, max_rules: int = 6) -> list[str]: """Retrieve the most relevant biology rules for a query. Returns up to ``max_rules`` rule names, always including pinned rules. Uses weighted keyword matching with partial-match support: - Exact keyword match: +2 points - Partial word overlap: +1 point (e.g. "irrigat" matches "irrigation") """ query_lower = query.lower() query_words = set(re.findall(r'\w+', query_lower)) scores: dict[str, float] = {} for rule_name, keywords in _RULE_KEYWORDS.items(): score = 0.0 for kw in keywords: if kw in query_lower: # Exact substring match — strong signal score += 2.0 else: # Partial word overlap — weaker signal kw_words = set(re.findall(r'\w+', kw)) overlap = kw_words & query_words if overlap: score += len(overlap) * 0.5 if score > 0: scores[rule_name] = score # Always include pinned rules selected = set(_PINNED_RULES) # Add scored rules sorted by relevance for name, _ in sorted(scores.items(), key=lambda x: -x[1]): if len(selected) >= max_rules: break selected.add(name) return [r for r in BIOLOGY_RULES if r in selected] _ADVISORY_PATTERNS = [re.compile(p, re.IGNORECASE) for p in [ r"\bshould i\b", r"\bwhat should\b", r"\brecommend\b", r"\badvice\b", r"\bwhat do i\b", r"\baction\b", r"\bwhat to do\b", r"\bshade now\b", r"\birrigate\b", r"\bprepare\b", r"\bneed to\b", r"\bhow much\b", r"\bwhen should\b", r"\bcan i\b", ]] def classify_response_mode(query: str) -> str: """Classify whether a query needs factual info or actionable advisory. Returns 'info' or 'advisory'. """ for pat in _ADVISORY_PATTERNS: if pat.search(query): return "advisory" return "info" def build_contextual_prompt(query: str) -> str: """Build a system prompt with only relevant biology rules for this query.""" relevant = retrieve_relevant_rules(query) rules_text = "\n\n".join( f"{i}. {name.upper().replace('_', ' ')}: {BIOLOGY_RULES[name]}" for i, name in enumerate(relevant, 1) ) rule_names = ", ".join(BIOLOGY_RULES.keys()) return _SYSTEM_PROMPT_TEMPLATE.format( biology_rules=rules_text, rule_names=rule_names, ) # --------------------------------------------------------------------------- # Main class # --------------------------------------------------------------------------- class VineyardChatbot: """ Gemini-powered conversational vineyard advisor. All data access is delegated to a DataHub of loosely-coupled services. The chatbot itself only handles: - Gemini communication (two-pass tool-calling flow) - Tool dispatch (thin delegation to hub services) - Guardrails (query classification, response validation, confidence) - Offline fallback (keyword-match to biology rules) Usage ----- bot = VineyardChatbot() # default hub bot = VineyardChatbot(hub=custom_hub) # injected hub response = bot.chat("Should I shade right now?", history=[]) """ # Maximum retries when LLM fails to call a required tool _MAX_TOOL_RETRIES = 1 def __init__( self, hub: Optional[DataHub] = None, model_name: str = "gemini-2.5-flash", api_key: Optional[str] = None, verbose: bool = False, ): self.hub = hub or DataHub.default(verbose=verbose) self.model_name = model_name self._api_key = api_key self._client = None self.verbose = verbose # ------------------------------------------------------------------ # Gemini client (lazy) # ------------------------------------------------------------------ @property def api_key(self) -> str: return get_google_api_key(self._api_key) @property def client(self): if self._client is None: self._client = get_genai_client(self._api_key) return self._client @property def has_api_key(self) -> bool: try: get_google_api_key(self._api_key) return True except (ValueError, Exception): return False def _log(self, msg: str) -> None: if self.verbose: print(f"[VineyardChatbot] {msg}") # ------------------------------------------------------------------ # Tool dispatch — thin delegation to hub services # ------------------------------------------------------------------ def _dispatch_tool(self, tool_name: str, args: dict) -> dict: """Route a tool call to the correct hub service method.""" self._log(f"Dispatching tool: {tool_name}({args})") # --- Weather --- if tool_name == "get_current_weather": return self.hub.weather.get_current() elif tool_name == "get_weather_history": return self.hub.weather.get_history( start_date=str(args.get("start_date", "")), end_date=str(args.get("end_date", "")), ) # --- Vine sensors --- elif tool_name == "get_vine_state": return self.hub.vine_sensors.get_snapshot() elif tool_name == "get_sensor_history": return self.hub.vine_sensors.get_history( device_type=str(args.get("device_type", "crop")), area=str(args.get("area", "treatment")), hours_back=int(args.get("hours_back", 24)), ) # --- Photosynthesis --- elif tool_name == "calc_photosynthesis": return self.hub.photosynthesis.predict_fvcb( PAR=float(args.get("PAR", 1500)), Tleaf=float(args.get("Tleaf", 30)), CO2=float(args.get("CO2", 400)), VPD=float(args.get("VPD", 2.0)), Tair=float(args.get("Tair", 30)), ) elif tool_name == "predict_photosynthesis_ml": return self.hub.photosynthesis.predict_ml( features=args.get("features"), ) elif tool_name == "get_ps_forecast": return self.hub.photosynthesis.forecast_day_ahead( target_date=args.get("date"), ) # --- Shading / tracking --- elif tool_name == "simulate_shading": return self.hub.photosynthesis.simulate_shading( angle_offset=float(args.get("angle_offset", 20)), hour=int(args.get("hour", 13)), date_str=args.get("date"), ) elif tool_name == "compare_tilt_angles": angles = args.get("angles") if angles and isinstance(angles, list): angles = [int(a) for a in angles] return self.hub.photosynthesis.compare_angles(angles=angles) elif tool_name == "get_daily_schedule": return self.hub.photosynthesis.daily_schedule( stress_threshold=float(args.get("stress_threshold", 2.0)), shade_angle=int(args.get("shade_angle", 20)), ) # --- Energy --- elif tool_name == "get_energy_generation": return self.hub.energy.get_current() elif tool_name == "get_energy_history": return self.hub.energy.get_history( hours_back=int(args.get("hours_back", 24)), ) elif tool_name == "predict_energy": return self.hub.energy.predict( target_date=args.get("date"), ) # --- Advisory --- elif tool_name == "run_day_ahead_advisory": return self.hub.advisory.run_advisory( target_date=args.get("date"), ) # --- Biology --- elif tool_name == "explain_biology_rule": return self.hub.biology.explain_rule( rule_name=str(args.get("rule_name", "")), ) elif tool_name == "get_photosynthesis_3d": hour = args.get("hour") if hour is not None: hour = int(hour) return self.hub.photosynthesis.get_photosynthesis_3d_scene( hour=hour, date_str=args.get("date"), ) else: return {"error": f"Unknown tool: {tool_name}"} # ------------------------------------------------------------------ # Gemini communication # ------------------------------------------------------------------ # Number of recent message pairs to keep verbatim _RECENT_MESSAGES = 6 # Max older messages to summarize _MAX_SUMMARY_MESSAGES = 20 def _build_messages(self, user_message: str, history: list[dict]) -> list[dict]: """Build Gemini multi-turn message list with sliding context window. Strategy: - Inject live status briefing as pinned context (from cached data) - Keep the most recent 6 messages verbatim (for conversational flow) - Summarize older messages into a single context message """ messages = [] # Inject live status briefing so the LLM has immediate context briefing = self._build_status_briefing() if briefing: messages.append({ "role": "user", "parts": [{"text": f"[System status — do not repeat verbatim, use as context]\n{briefing}"}], }) messages.append({ "role": "model", "parts": [{"text": "Got it, I have the current status."}], }) n = len(history) if n > self._RECENT_MESSAGES: # Summarize older messages older = history[:n - self._RECENT_MESSAGES] # Take at most _MAX_SUMMARY_MESSAGES from the older portion older = older[-self._MAX_SUMMARY_MESSAGES:] summary = self._summarize_history(older) if summary: messages.append({ "role": "user", "parts": [{"text": f"[Conversation context: {summary}]"}], }) messages.append({ "role": "model", "parts": [{"text": "Understood, I'll keep that context in mind."}], }) # Recent messages verbatim recent = history[-self._RECENT_MESSAGES:] if n > self._RECENT_MESSAGES else history for entry in recent: role = entry.get("role", "user") content = entry.get("content", "") if role == "user": messages.append({"role": "user", "parts": [{"text": content}]}) elif role == "assistant": messages.append({"role": "model", "parts": [{"text": content}]}) messages.append({"role": "user", "parts": [{"text": user_message}]}) return messages @staticmethod def _summarize_history(messages: list[dict]) -> str: """Create a brief summary of older conversation messages.""" topics = [] for entry in messages: content = entry.get("content", "") role = entry.get("role", "user") if role == "user" and content: # Extract the core question/topic (first sentence or 100 chars) first_line = content.split("\n")[0][:100] topics.append(first_line) if not topics: return "" # Deduplicate and keep last 5 topics seen = set() unique = [] for t in reversed(topics): t_lower = t.lower().strip() if t_lower not in seen: seen.add(t_lower) unique.append(t) unique.reverse() return "Earlier in this conversation, the user asked about: " + "; ".join(unique[-5:]) def _call_gemini(self, messages: list[dict], system_prompt: str | None = None) -> str: """Send messages to Gemini and return raw text response.""" prompt = system_prompt or CHATBOT_SYSTEM_PROMPT response = self.client.models.generate_content( model=self.model_name, contents=messages, config={"system_instruction": prompt}, ) return response.text def _extract_tool_call(self, text: str) -> Optional[dict]: """Try to extract a tool_call JSON from the model response.""" try: match = re.search(r'\{\s*"tool_call"\s*:', text) if not match: return None start = match.start() brace_count = 0 for i in range(start, len(text)): if text[i] == "{": brace_count += 1 elif text[i] == "}": brace_count -= 1 if brace_count == 0: snippet = text[start:i + 1] parsed = json.loads(snippet) return parsed.get("tool_call") return None except (json.JSONDecodeError, ValueError): return None # ------------------------------------------------------------------ # Context gathering (for rule validation) # ------------------------------------------------------------------ def _get_validation_context(self) -> dict: """Gather current context for post-response rule validation.""" ctx = {} try: from src.phenology import estimate_stage_for_date from datetime import date, datetime import zoneinfo tz = zoneinfo.ZoneInfo("Asia/Jerusalem") now = datetime.now(tz=tz) ctx["hour"] = now.hour ctx["month"] = now.month stage = estimate_stage_for_date(date.today()) ctx["stage_id"] = stage.id # Try to get current temperature from cached weather try: wx = self.hub.weather.get_current() if "error" not in wx: t = wx.get("air_temperature_c") if t is not None: ctx["temp_c"] = float(t) except Exception: pass except Exception: pass return ctx # ------------------------------------------------------------------ # Live status briefing — injected at conversation start # ------------------------------------------------------------------ def _build_status_briefing(self) -> str: """Assemble a short system status from cached DataHub data. Uses only already-cached values (no new API calls), so it adds zero latency. Returns an empty string if nothing is available. """ from datetime import datetime import zoneinfo lines: list[str] = [] tz = zoneinfo.ZoneInfo("Asia/Jerusalem") now = datetime.now(tz=tz) lines.append(f"CURRENT STATUS ({now.strftime('%Y-%m-%d %H:%M')} IST):") # Phenology FIRST — most important context for shading decisions try: from src.models.phenology import estimate_stage_for_date from datetime import date stage = estimate_stage_for_date(date.today()) dormant = stage.id in ("winter_dormancy", "dormant", "pre_budburst") lines.append(f" Phenology: {stage.name} ({stage.id})" + (" — DORMANT, no leaves, shading irrelevant" if dormant else "")) except Exception: pass # Weather try: wx = self.hub.weather.get_current() if wx and "error" not in wx: t = wx.get("air_temperature_c") ghi = wx.get("ghi_w_m2") rh = wx.get("rh_percent") wind = wx.get("wind_speed_ms") parts = [] if t is not None: parts.append(f"T={float(t):.1f}°C") if ghi is not None: parts.append(f"GHI={float(ghi):.0f} W/m²") if rh is not None: parts.append(f"RH={float(rh):.0f}%") if wind is not None: parts.append(f"wind={float(wind):.1f} m/s") if parts: lines.append(f" Weather: {', '.join(parts)}") age = wx.get("age_minutes") if age is not None and float(age) > 30: lines.append(f" (weather data is {int(float(age))} min old)") except Exception: pass # Sensors try: snap = self.hub.vine_sensors.get_snapshot(light=True) if snap and "error" not in snap: parts = [] for key, label in [ ("treatment_air_temp_c", "air"), ("treatment_crop_par_umol", "PAR"), ("treatment_soil_moisture_pct", "soil"), ]: v = snap.get(key) if v is not None: if "temp" in key: parts.append(f"{label}={float(v):.1f}°C") elif "par" in key: parts.append(f"{label}={float(v):.0f} µmol") else: parts.append(f"{label}={float(v):.0f}%") if parts: lines.append(f" Sensors (treatment): {', '.join(parts)}") stale = snap.get("staleness_minutes") if stale is not None and float(stale) > 15: lines.append(f" (sensors {int(float(stale))} min old)") except Exception: pass # Energy try: en = self.hub.energy.get_current() if en and "error" not in en: pw = en.get("power_kw") if pw is not None: lines.append(f" Energy: {float(pw):.1f} kW now") except Exception: pass # Control status (from Redis via hub — no direct Redis import) try: ctrl = self.hub.advisory.get_status() if ctrl and "error" not in ctrl: mode = ctrl.get("mode") or ctrl.get("action") if mode: lines.append(f" Control: {mode}") except Exception: pass if len(lines) <= 1: return "" return "\n".join(lines) # ------------------------------------------------------------------ # Main chat method # ------------------------------------------------------------------ def chat(self, user_message: str, history: list[dict] | None = None) -> ChatResponse: """ Process a user message and return a structured response. Flow: 1. Classify query (data vs knowledge vs greeting) 2. Send to Gemini (Pass 1) 3. If data query and no tool call → re-prompt to force tool use 4. If tool call → dispatch → tag result → send back (Pass 2) 5. Validate response against biology rules 6. Estimate confidence 7. Return structured ChatResponse """ history = history or [] if not self.has_api_key: _, response = self._fallback_response(user_message) return response try: # Step 1: Classify query query_class = classify_query(user_message) self._log(f"Query classified: {query_class.category} " f"(requires_data={query_class.requires_data})") # Build contextual system prompt with only relevant biology rules contextual_prompt = build_contextual_prompt(user_message) messages = self._build_messages(user_message, history) self._log("Pass 1: calling Gemini...") response_text = self._call_gemini(messages, system_prompt=contextual_prompt) self._log(f"Pass 1 response: {response_text[:200]}...") tool_call = self._extract_tool_call(response_text) # Step 2: Force tool use if query requires data but LLM didn't call one if query_class.requires_data and not tool_call: self._log("Data query but no tool call — re-prompting...") retry_prompt = ( "The user is asking about site-specific data or current conditions. " "You MUST call a tool to answer this — do not use your training " "knowledge for real-time data. Please call the appropriate tool now." ) messages.append({"role": "model", "parts": [{"text": response_text}]}) messages.append({"role": "user", "parts": [{"text": retry_prompt}]}) response_text = self._call_gemini(messages, system_prompt=contextual_prompt) tool_call = self._extract_tool_call(response_text) # Step 3: Process tool call if present tool_name = None tool_result = None tool_succeeded = False data_age = None if tool_call: tool_name = tool_call.get("name", "") tool_args = tool_call.get("args", {}) self._log(f"Tool call detected: {tool_name}") try: tool_result = self._dispatch_tool(tool_name, tool_args) tool_succeeded = "error" not in tool_result except Exception as exc: tool_result = {"error": f"Tool execution failed: {exc}"} tool_succeeded = False # Tag result with source metadata tagged_result = tag_tool_result(tool_name, tool_result) data_age = tagged_result.get("_data_age_minutes") # Auto-supplement: when IMS is stale, also fetch TB sensors supplement_text = "" if tool_name == "get_current_weather" and data_age is not None and data_age > 120: try: snap = self.hub.vine_sensors.get_snapshot(light=True) if snap and "error" not in snap: snap_tagged = tag_tool_result("get_vine_state", snap) supplement_text = ( f"\n\nADDITIONAL: IMS weather is stale ({data_age:.0f} min old). " f"Here are FRESH on-site sensor readings from ThingsBoard:\n" f"```json\n{json.dumps(snap_tagged, indent=2, default=str)}\n```\n" f"Use these fresh readings instead of the stale IMS data for " f"current conditions." ) except Exception: pass # Build Pass 2 prompt with source citation instructions source_label = get_source_label(tool_name) freshness_note = "" if data_age is not None and data_age > 60: freshness_note = ( f"\n\nNote: IMS data is {data_age:.0f} minutes old — " "mention this once, briefly." ) tool_result_text = ( f"Tool result for {tool_name} " f"(source: {source_label}):\n" f"```json\n{json.dumps(tagged_result, indent=2, default=str)}\n```\n\n" f"Answer the farmer's question concisely (2-4 sentences). " f"Lead with the answer, then explain briefly." f"{freshness_note}{supplement_text}" ) messages.append({"role": "model", "parts": [{"text": response_text}]}) messages.append({"role": "user", "parts": [{"text": tool_result_text}]}) self._log("Pass 2: calling Gemini with tool result...") final_response = self._call_gemini(messages) self._log(f"Pass 2 response: {final_response[:200]}...") else: final_response = response_text # Step 5: Post-response rule validation validation_ctx = self._get_validation_context() violations = validate_response( response_text=final_response, context=validation_ctx, ) # Detect rule-based overrides (dormancy, blocked rules) for confidence has_rule_override = any( v.rule_name in ("no_leaves_no_shade_problem", "no_shade_before_10", "no_shade_in_may") and v.severity == "block" for v in violations ) # Step 4: Estimate confidence confidence = estimate_confidence( tool_called=tool_call is not None, tool_succeeded=tool_succeeded, data_age_minutes=data_age, tool_name=tool_name, rule_override=has_rule_override, ) caveats: list[str] = [] violation_dicts: list[dict] = [] for v in violations: violation_dicts.append({ "rule": v.rule_name, "severity": v.severity, "message": v.message, }) if v.severity == "block": # Override the response with the correction final_response = ( f"{v.correction}\n\n" f"*(Original response was overridden because it violated " f"the **{v.rule_name.replace('_', ' ')}** rule.)*" ) confidence = "high" # rule-based override is deterministic self._log(f"BLOCKED: {v.rule_name} — {v.message}") elif v.severity == "warn": caveats.append(v.correction) self._log(f"WARNING: {v.rule_name} — {v.message}") # Build data freshness caveat if data_age is not None and data_age > 60: caveats.append( f"Data is {data_age:.0f} minutes old — conditions may have changed." ) # Range validation warnings if tool_result: range_warnings = tool_result.get("_range_warnings") or ( tagged_result.get("_range_warnings") if tool_call else None ) if range_warnings: for rw in range_warnings: caveats.append(rw) # Cross-source consistency check (when we have both weather + sensors) try: wx_data = self.hub.weather.get_current() sensor_data = self.hub.vine_sensors.get_snapshot(light=True) consistency_caveats = check_cross_source_consistency(wx_data, sensor_data) caveats.extend(consistency_caveats) except Exception: pass # Build sources list sources: list[str] = [] if tool_name: sources.append(get_source_label(tool_name)) if not tool_call and query_class.category == "knowledge": sources.append("Built-in biology rules") response_mode = classify_response_mode(user_message) return ChatResponse( message=final_response, tool_calls=[{"name": tool_name, "args": tool_call.get("args", {}), "result": tool_result}] if tool_call else [], data=tool_result if tool_result else {}, confidence=confidence, sources=sources, caveats=caveats, rule_violations=violation_dicts, response_mode=response_mode, ) except Exception as exc: self._log(f"Chat error: {exc}\n{traceback.format_exc()}") matched, fallback = self._fallback_response(user_message) if matched: return fallback return ChatResponse( message=( "I'm having trouble connecting to the AI service right now. " "You can still ask me about vine biology rules \u2014 I have those " "built in. For data queries, please check that your Google API " "key is configured." ), confidence="insufficient_data", sources=[], caveats=["AI service connection failed"], ) # ------------------------------------------------------------------ # Fallback (no API key / offline) # ------------------------------------------------------------------ def _fallback_response(self, user_message: str) -> tuple[bool, ChatResponse]: """Keyword-match fallback when Gemini is unavailable.""" msg_lower = user_message.lower() rule_matches = { "site_location": ["yeruham", "location", "timezone", "right now", "current time", "what time", "israel time", "local time"], "temperature_transition": ["temperature", "30 degree", "30\u00b0", "rubp", "rubisco", "transition", "heat", "hot"], "no_shade_before_10": ["morning", "before 10", "early", "sunrise"], "no_shade_in_may": ["may", "flowering", "fruit set", "fruit-set"], "cwsi_threshold": ["cwsi", "water stress", "crop water"], "berry_sunburn": ["sunburn", "berry", "35\u00b0", "35 degree"], "energy_budget": ["budget", "energy", "sacrifice", "ceiling", "5%", "monthly", "generation", "kwh", "power", "solar"], "model_routing": ["model", "fvcb", "farquhar", "ml", "routing", "predict", "forecast"], "phenological_multiplier": ["veraison", "ripening", "phenolog"], "irrigation_management": ["irrigation", "water", "soil moisture"], "fertiliser_management": ["fertiliser", "fertilizer", "nitrogen", "nutrient"], "photosynthesis_3d": ["3d", "3D", "visual", "visualize", "visualise", "model show", "vine and tracker", "sun and vine"], "no_leaves_no_shade_problem": ["no leaves", "dormant", "budburst", "no canopy"], "no_shading_must_explain": ["should not shade", "don't shade", "no shading"], } matched_rules = [] for rule_name, keywords in rule_matches.items(): if any(kw in msg_lower for kw in keywords): matched_rules.append(rule_name) if matched_rules: parts = ["Here's what I know about that (from built-in biology rules):\n"] for rule in matched_rules: parts.append(f"**{rule.replace('_', ' ').title()}:** {BIOLOGY_RULES[rule]}\n") parts.append( "\n*Note: I'm running without an AI connection, so I can only " "answer from built-in biology rules. Connect a Google API key " "for full advisory capabilities.*" ) return True, ChatResponse( message="\n".join(parts), confidence="medium", sources=["Built-in biology rules"], ) return False, ChatResponse( message=( "I'm currently running without an AI connection (no Google API key). " "I can answer questions about vine biology rules \u2014 try asking about:\n\n" "- Temperature and shading thresholds\n" "- Morning light rules\n" "- May shading restrictions\n" "- Water stress (CWSI)\n" "- Berry sunburn risk\n" "- Energy budget limits\n" "- Model routing (FvCB vs ML)\n" "- Veraison protection\n" "- Irrigation management\n" "- Energy generation and prediction\n\n" "*Connect a Google API key for full advisory capabilities " "(weather, photosynthesis calculations, shading simulations, " "energy analysis).*" ), confidence="insufficient_data", sources=[], )