"""
Risk service – integrates ARF risk engine, policy engine, and decision engine.

Deterministic, no random fallbacks, explicit error handling.

Version: 2026-05-04 – added Prometheus metrics for observability.
"""

import json
import logging
import os
import time
from typing import Optional, List, Dict, Any

from prometheus_client import Counter, Histogram, REGISTRY

from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.core.governance.intents import InfrastructureIntent
from agentic_reliability_framework.core.models.event import ReliabilityEvent, HealingAction
from agentic_reliability_framework.core.governance.policy_engine import PolicyEngine
from agentic_reliability_framework.core.decision.decision_engine import DecisionEngine
from agentic_reliability_framework.runtime.memory.rag_graph import RAGGraphMemory
from agentic_reliability_framework.core.research.eclipse_probe import compute_epistemic_risk

# Defined first so module-level initialisation below can log.
logger = logging.getLogger(__name__)

# ── optional tracing ─────────────────────────────────────────
try:
    from opentelemetry import trace

    _tracer = trace.get_tracer(__name__)
    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    _tracer = None

# ── Prometheus metrics (always registered; no-op if not scraped) ─
_EVAL_COUNTER = Counter(
    "arf_evaluations_total",
    "Total evaluation calls (intent + healing), partitioned by engine and status.",
    ["engine", "status"],
)
_EVAL_DURATION = Histogram(
    "arf_evaluation_duration_seconds",
    "End‑to‑end latency of evaluation calls.",
    ["engine"],
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0),
)
_RUST_AGREEMENT = Counter(
    "arf_rust_agreement_total",
    "Agreement between Rust enforcer and Python policy evaluation.",
    ["result"],  # "agreed" or "diverged"
)

# ── optional Rust enforcer (shadow mode) ──────────────────────
_RUST_ENFORCER_AVAILABLE = False
_rust_evaluator = None  # singleton per process
_rust_policy_json: Optional[str] = None
# Set once construction of the evaluator has failed. The policy JSON is a
# module constant, so retrying on every request would fail identically.
_rust_init_failed = False

if os.getenv("ARF_USE_RUST_ENFORCER", "false").lower() == "true":
    try:
        import arf_enforcer

        _RUST_ENFORCER_AVAILABLE = True
    except ImportError:
        logger.warning(
            "ARF_USE_RUST_ENFORCER=true but arf_enforcer is not importable; "
            "Rust shadow evaluation disabled"
        )

# Canary promotion reads this sample from the default Prometheus registry.
# NOTE(review): the metric is not registered in this module; it is presumably
# exported elsewhere. Until it exists, promotion never happens.
_RUST_CANARY_LOWER_BOUND_METRIC = "arf_rust_agreement_lower_bound"
_RUST_CANARY_MIN_AGREEMENT = 0.9999

# Default OSS policy tree – mirrors the hard-coded rules in the Python PolicyEvaluator
# that check region, resource type, and max permission level.
_OSS_POLICY_TREE_JSON = json.dumps({
    "And": [
        {"Atomic": {"RegionAllowed": {"allowed_regions": ["eastus"]}}},
        {"Atomic": {"ResourceTypeRestricted": {
            "forbidden_types": ["DATABASE_DROP", "FULL_ROLLOUT", "SYSTEM_SHUTDOWN", "SECRET_ROTATION"]
        }}},
        {"Atomic": {"MaxPermissionLevel": {"max_level": "admin"}}},
    ]
})


def _ensure_rust_evaluator() -> bool:
    """Lazily initialise the Rust policy evaluator.

    Returns True when an evaluator is ready. Returns False when the enforcer
    module is unavailable, or when construction has already failed once; the
    failure is logged a single time and not retried.
    """
    global _rust_evaluator, _rust_policy_json, _rust_init_failed
    if _rust_evaluator is not None:
        return True
    if not _RUST_ENFORCER_AVAILABLE or _rust_init_failed:
        return False
    try:
        _rust_policy_json = _OSS_POLICY_TREE_JSON
        _rust_evaluator = arf_enforcer.PyPolicyEvaluator(_rust_policy_json)
    except Exception:
        _rust_evaluator = None
        _rust_init_failed = True
        logger.warning(
            "Failed to initialise Rust policy evaluator; shadow mode disabled",
            exc_info=True,
        )
        return False
    return True


def _shadow_rust_evaluation(
    intent: InfrastructureIntent,
    cost_estimate: Optional[float],
    policy_violations: List[str],
    span: Any,
) -> Optional[List[str]]:
    """Evaluate *intent* with the Rust enforcer and compare it with Python.

    Best-effort: this never raises. Agreement or divergence is counted in
    ``arf_rust_agreement_total``. A divergence is also logged as a warning
    and, when tracing is on, added as a span event.

    Returns the Rust violation list. Returns None when the enforcer is
    unavailable or the shadow evaluation failed.
    """
    if not (_RUST_ENFORCER_AVAILABLE and _ensure_rust_evaluator()):
        return None
    try:
        rust_intent = {
            "action": getattr(intent, "intent_type", "unknown"),
            "component": getattr(intent, "service_name", "unknown"),
            "region": getattr(intent, "region", None),
            "resource_type": getattr(intent, "resource_type", None),
            "permission_level": getattr(intent, "permission_level", None),
            "extra": {},
        }
        rust_raw = _rust_evaluator.evaluate(json.dumps(rust_intent), cost_estimate)
        rust_violations = json.loads(rust_raw)
        if not isinstance(rust_violations, list):
            raise TypeError(
                f"expected a JSON list of violations, got {type(rust_violations).__name__}"
            )

        agreed = set(rust_violations) == set(policy_violations)
        _RUST_AGREEMENT.labels(result="agreed" if agreed else "diverged").inc()
        if not agreed:
            logger.warning(
                "Rust enforcer divergence: Rust=%s Python=%s",
                sorted(rust_violations),
                sorted(policy_violations),
            )
            if span is not None:
                span.add_event("rust_enforcer_divergence", {
                    "rust_violations": rust_violations,
                    "python_violations": policy_violations,
                })
        return rust_violations
    except Exception as exc:
        logger.debug("Rust enforcer shadow evaluation failed: %s", exc)
        return None


def _rust_canary_promoted() -> bool:
    """Return True when the Rust enforcer's output should replace Python's.

    Requires all of the following:
    - the enforcer is available;
    - ``ARF_RUST_CANARY`` is ``"true"``;
    - the exported agreement lower bound exceeds ``_RUST_CANARY_MIN_AGREEMENT``.
    """
    if not _RUST_ENFORCER_AVAILABLE:
        return False
    if os.getenv("ARF_RUST_CANARY", "false").lower() != "true":
        return False
    try:
        lower = REGISTRY.get_sample_value(_RUST_CANARY_LOWER_BOUND_METRIC, {})
    except Exception as exc:
        logger.debug("Rust canary metric lookup failed: %s", exc)
        return False
    return lower is not None and lower > _RUST_CANARY_MIN_AGREEMENT


def evaluate_intent(
    engine: RiskEngine,
    intent: InfrastructureIntent,
    cost_estimate: Optional[float],
    policy_violations: List[str],
) -> dict:
    """
    Evaluate an infrastructure intent using the Bayesian risk engine.

    Optionally shadows the policy evaluation with the Rust enforcer when the
    environment variable ARF_USE_RUST_ENFORCER is set to "true". Any
    divergence is logged and counted as a Prometheus metric.

    When ARF_RUST_CANARY is also "true" and the exported agreement lower bound
    is high enough, the Rust violation list is fed to the risk engine instead
    of *policy_violations*. This happens only when this call produced a Rust
    result.

    Parameters
    ----------
    engine : RiskEngine
        Initialised ARF Bayesian risk engine.
    intent : InfrastructureIntent
        The infrastructure request to evaluate.
    cost_estimate : float or None
        Estimated monthly cost (used by cost-threshold policies).
    policy_violations : list[str]
        Pre-computed policy violation strings (from the Python evaluator).

    Returns
    -------
    dict
        Keys: risk_score, explanation, contributions.

    Raises
    ------
    Exception
        Whatever ``engine.calculate_risk`` raises is propagated. It is first
        counted with status="error" and recorded on the trace span.
    """
    t0 = time.monotonic()
    # The score always comes from the Python Bayesian engine; Rust can at most
    # supply the violation list, so the metric label stays "python".
    engine_label = "python"
    status = "error"
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("risk_service.evaluate_intent")
    try:
        if span is not None:
            span.set_attribute("intent_type", type(intent).__name__)
            if _RUST_ENFORCER_AVAILABLE:
                span.set_attribute("rust_enforcer_available", True)

        # ── Shadow Rust enforcer (best-effort, non-blocking) ──────
        rust_violations = _shadow_rust_evaluation(
            intent, cost_estimate, policy_violations, span
        )

        # ── Automated canary promotion ────────────────────────────
        # Guarded on rust_violations: promotion needs a result from this call.
        if rust_violations is not None and _rust_canary_promoted():
            policy_violations = rust_violations
            if span is not None:
                span.set_attribute("rust_enforcer_active", True)

        # ── Core risk evaluation ──────────────────────────────────
        score, explanation, contributions = engine.calculate_risk(
            intent=intent,
            cost_estimate=cost_estimate,
            policy_violations=policy_violations,
        )
        if span is not None:
            span.set_attribute("risk_score", score)
        status = "success"
    except Exception as exc:
        if span is not None:
            span.record_exception(exc)
        raise
    finally:
        # Runs on success and failure so metrics and span are never lost.
        _EVAL_COUNTER.labels(engine=engine_label, status=status).inc()
        _EVAL_DURATION.labels(engine=engine_label).observe(time.monotonic() - t0)
        if span is not None:
            span.end()

    return {
        "risk_score": score,
        "explanation": explanation,
        "contributions": contributions,
    }


def _zero_epistemic_signals() -> Dict[str, float]:
    """Return a fresh all-zero epistemic signal dict (used when none can be computed)."""
    return {
        "entropy": 0.0,
        "contradiction": 0.0,
        "evidence_lift": 0.0,
        "hallucination_risk": 0.0,
    }


def _collect_candidate_actions(policy_engine: PolicyEngine, event: ReliabilityEvent) -> Any:
    """Return the policy engine's raw actions with its own decision step disabled.

    NOTE(review): this temporarily flips ``use_decision_engine`` on the shared
    *policy_engine* object. Concurrent callers sharing that instance could
    observe the flipped flag.
    """
    original_flag = policy_engine.use_decision_engine
    try:
        policy_engine.use_decision_engine = False
        return policy_engine.evaluate_policies(event)
    finally:
        policy_engine.use_decision_engine = original_flag


def _build_reasoning_text(policy_engine: PolicyEngine, raw_actions: Any) -> str:
    """Describe every policy that contributed at least one of *raw_actions*."""
    parts = []
    for policy in policy_engine.policies:
        if not any(action in policy.actions for action in raw_actions):
            continue
        conditions_str = ", ".join(
            f"{c.metric} {c.operator} {c.threshold}" for c in policy.conditions
        )
        parts.append(
            f"Policy {policy.name} triggered by {conditions_str} → actions {[a.value for a in policy.actions]}"
        )
    return " ".join(parts)


def _build_evidence_text(event: ReliabilityEvent) -> str:
    """Render the event's observed metrics as plain text for the epistemic probe."""
    return (
        f"Component: {event.component}, "
        f"latency_p99: {event.latency_p99}, "
        f"error_rate: {event.error_rate}, "
        f"cpu_util: {event.cpu_util}, "
        f"memory_util: {event.memory_util}"
    )


def _compute_epistemic_signals(
    reasoning_text: str, evidence_text: str, model: Any, tokenizer: Any
) -> Any:
    """Run the eclipse probe, or return zero signals if it is unavailable or fails."""
    if model is None or tokenizer is None:
        logger.debug("Epistemic model/tokenizer not provided; using zero signals")
        return _zero_epistemic_signals()
    try:
        return compute_epistemic_risk(reasoning_text, evidence_text, model, tokenizer)
    except Exception as exc:
        logger.error("Failed to compute epistemic risk: %s", exc, exc_info=True)
        return _zero_epistemic_signals()


def _select_healing_action(
    event: ReliabilityEvent,
    policy_engine: PolicyEngine,
    decision_engine: Optional[DecisionEngine],
    rag_graph: Optional[RAGGraphMemory],
    model: Any,
    tokenizer: Any,
    span: Any,
) -> Dict[str, Any]:
    """Core of evaluate_healing_decision, without metrics or span lifecycle."""
    # If decision_engine not provided, try to get it from the policy engine;
    # if still None, create a minimal one (global stats only).
    if decision_engine is None:
        decision_engine = getattr(policy_engine, "decision_engine", None)
    if decision_engine is None:
        logger.debug("No DecisionEngine provided; creating default instance")
        decision_engine = DecisionEngine(rag_graph=rag_graph)

    raw_actions = _collect_candidate_actions(policy_engine, event)

    if not raw_actions or raw_actions == [HealingAction.NO_ACTION]:
        if span is not None:
            span.set_attribute("selected_action", HealingAction.NO_ACTION.value)
        return {
            "risk_score": 0.0,
            "selected_action": HealingAction.NO_ACTION.value,
            "expected_utility": 0.0,
            "alternatives": [],
            "explanation": "No candidate actions triggered.",
            # Present on every path so callers see a consistent schema.
            "raw_decision": None,
            "epistemic_signals": None,
        }

    epistemic_signals = _compute_epistemic_signals(
        _build_reasoning_text(policy_engine, raw_actions),
        _build_evidence_text(event),
        model,
        tokenizer,
    )

    decision = decision_engine.select_optimal_action(
        raw_actions,
        event,
        component=event.component,
        epistemic_signals=epistemic_signals,
    )

    # Risk of the selected action: take it from the first matching
    # alternative; fall back to a separate computation if absent or None.
    risk_score = next(
        (alt.risk for alt in decision.alternatives if alt.action == decision.best_action),
        None,
    )
    if risk_score is None:
        risk_score = decision_engine.compute_risk(
            decision.best_action, event, event.component
        )

    alt_list = [
        {
            "action": alt.action.value,
            "expected_utility": alt.utility,
            "risk": alt.risk,
        }
        for alt in decision.alternatives[:3]  # top 3 only
    ]

    if span is not None:
        span.set_attribute("risk_score", risk_score)
        span.set_attribute("selected_action", decision.best_action.value)
        span.set_attribute("expected_utility", decision.expected_utility)

    return {
        "risk_score": risk_score,
        "selected_action": decision.best_action.value,
        "expected_utility": decision.expected_utility,
        "alternatives": alt_list,
        "explanation": decision.explanation,
        "raw_decision": decision.raw_data,
        "epistemic_signals": epistemic_signals,
    }


def evaluate_healing_decision(
    event: ReliabilityEvent,
    policy_engine: PolicyEngine,
    decision_engine: Optional[DecisionEngine] = None,
    rag_graph: Optional[RAGGraphMemory] = None,
    model: Any = None,
    tokenizer: Any = None,
) -> Dict[str, Any]:
    """
    Evaluate healing actions for a given reliability event using
    decision-theoretic selection. Includes epistemic risk signals from the
    eclipse probe.

    Parameters
    ----------
    event : ReliabilityEvent
        The incident event containing latency, error rate, etc.
    policy_engine : PolicyEngine
        The ARF healing policy engine with configured policies.
    decision_engine : DecisionEngine, optional
        If omitted, ``policy_engine.decision_engine`` is used when present;
        otherwise a default instance is created.
    rag_graph : RAGGraphMemory, optional
        Semantic memory for similar incident retrieval (passed to a default
        DecisionEngine when one is created here).
    model, tokenizer : optional
        HuggingFace model and tokenizer for epistemic risk computation. If
        either is missing, or the probe fails, all-zero signals are used.

    Returns
    -------
    dict
        Keys: risk_score, selected_action, expected_utility, alternatives,
        explanation, raw_decision, epistemic_signals. When no candidate action
        triggers, selected_action is NO_ACTION, and raw_decision and
        epistemic_signals are None.

    Raises
    ------
    Exception
        Errors from policy evaluation or the decision engine are propagated,
        after being counted with status="error" and recorded on the span.
    """
    t0 = time.monotonic()
    status = "error"
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("risk_service.evaluate_healing")
    try:
        if span is not None:
            span.set_attribute("component", event.component)
        result = _select_healing_action(
            event, policy_engine, decision_engine, rag_graph, model, tokenizer, span
        )
        status = "success"
        return result
    except Exception as exc:
        if span is not None:
            span.record_exception(exc)
        raise
    finally:
        # ── Metrics & span finalisation (success and failure) ─────
        _EVAL_COUNTER.labels(engine="python", status=status).inc()
        _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
        if span is not None:
            span.end()


def get_system_risk() -> float:
    """
    Return an aggregated risk score across all monitored components.

    This is a placeholder – the endpoint is deprecated.

    Raises
    ------
    NotImplementedError
        Always, to avoid a random fallback.
    """
    raise NotImplementedError(
        "get_system_risk is deprecated. Use component‑level risk evaluation instead."
    )