Spaces:
Build error
Build error
| """ | |
| Risk service — integrates ARF risk engine, policy engine, and decision engine. | |
| Deterministic, no random fallbacks, explicit error handling. | |
| Version: 2026-05-04 — added Prometheus metrics for observability. | |
| """ | |
| import json | |
| import logging | |
| import os | |
| import time | |
| from typing import Optional, List, Dict, Any | |
| from agentic_reliability_framework.core.governance.risk_engine import RiskEngine | |
| from agentic_reliability_framework.core.governance.intents import InfrastructureIntent | |
| from agentic_reliability_framework.core.models.event import ReliabilityEvent, HealingAction | |
| from agentic_reliability_framework.core.governance.policy_engine import PolicyEngine | |
| from agentic_reliability_framework.core.decision.decision_engine import DecisionEngine | |
| from agentic_reliability_framework.runtime.memory.rag_graph import RAGGraphMemory | |
| from agentic_reliability_framework.core.research.eclipse_probe import compute_epistemic_risk | |
# -- optional tracing ---------------------------------------------------
# OpenTelemetry is an optional dependency. When it cannot be imported,
# OTEL_AVAILABLE is False, _tracer is None, and the evaluation functions
# skip span creation entirely.
try:
    from opentelemetry import trace
    _tracer = trace.get_tracer(__name__)
except ImportError:
    OTEL_AVAILABLE = False
    _tracer = None
else:
    OTEL_AVAILABLE = True
# -- Prometheus metrics -------------------------------------------------
# Registered unconditionally at import time (no-op if never scraped).
# NOTE(review): prometheus_client registers these in its default registry,
# so re-executing this module in one process (e.g. importlib.reload) would
# presumably fail with a duplicate-timeseries error — confirm if reloads occur.
from prometheus_client import Counter, Histogram

# One increment per evaluation call (intent and healing), labelled by the
# engine that produced the result and by outcome ("success" / "error").
_EVAL_COUNTER = Counter(
    name="arf_evaluations_total",
    documentation="Total evaluation calls (intent + healing), partitioned by engine and status.",
    labelnames=["engine", "status"],
)

# Wall-clock latency of each evaluation call, labelled by engine.
_EVAL_DURATION = Histogram(
    name="arf_evaluation_duration_seconds",
    documentation="Endβtoβend latency of evaluation calls.",
    labelnames=["engine"],
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0),
)

# Shadow-mode comparison outcome between the Rust enforcer and the Python
# policy evaluation; the single label "result" is "agreed" or "diverged".
_RUST_AGREEMENT = Counter(
    name="arf_rust_agreement_total",
    documentation="Agreement between Rust enforcer and Python policy evaluation.",
    labelnames=["result"],
)
# -- optional Rust enforcer (shadow mode) -------------------------------
# The compiled ``arf_enforcer`` extension is imported only when explicitly
# requested with ARF_USE_RUST_ENFORCER=true; a missing extension is not an
# error, it simply leaves the service in pure-Python mode.
_rust_evaluator = None  # one evaluator per process, built lazily by _ensure_rust_evaluator()
_rust_policy_json: Optional[str] = None  # policy JSON the evaluator is built from
_RUST_ENFORCER_AVAILABLE = False
if os.getenv("ARF_USE_RUST_ENFORCER", "false").lower() == "true":
    try:
        import arf_enforcer
    except ImportError:
        pass  # extension not installed: keep _RUST_ENFORCER_AVAILABLE False
    else:
        _RUST_ENFORCER_AVAILABLE = True
# Default OSS policy tree handed to the Rust enforcer, serialized once at
# import time. Per the original author it mirrors the hard-coded Python
# PolicyEvaluator rules: allowed region, forbidden resource types, and
# maximum permission level.
# NOTE(review): the two rule sets must be kept in sync by hand — confirm
# against the Python PolicyEvaluator when either changes.
_OSS_POLICY_TREE_JSON = json.dumps(
    dict(
        And=[
            dict(Atomic=dict(RegionAllowed=dict(allowed_regions=["eastus"]))),
            dict(Atomic=dict(ResourceTypeRestricted=dict(
                forbidden_types=[
                    "DATABASE_DROP",
                    "FULL_ROLLOUT",
                    "SYSTEM_SHUTDOWN",
                    "SECRET_ROTATION",
                ],
            ))),
            dict(Atomic=dict(MaxPermissionLevel=dict(max_level="admin"))),
        ]
    )
)
logger = logging.getLogger(__name__)

# Set after the first failed attempt to build the Rust evaluator. Later calls
# then return False immediately instead of retrying and failing on every
# request. The policy JSON is a module constant, so a retry would presumably
# fail the same way.
_rust_evaluator_init_failed = False


def _ensure_rust_evaluator() -> bool:
    """Lazily build the process-wide Rust policy evaluator.

    The evaluator is constructed once from ``_OSS_POLICY_TREE_JSON`` and
    cached in the module global ``_rust_evaluator``.

    Returns
    -------
    bool
        True when ``_rust_evaluator`` is ready to use. False when the Rust
        enforcer is not available, or when constructing the evaluator failed.
        The failure is logged once, and later calls skip further attempts.
    """
    global _rust_evaluator, _rust_policy_json, _rust_evaluator_init_failed
    if _rust_evaluator is not None:
        return True
    if not _RUST_ENFORCER_AVAILABLE or _rust_evaluator_init_failed:
        return False
    try:
        _rust_policy_json = _OSS_POLICY_TREE_JSON
        _rust_evaluator = arf_enforcer.PyPolicyEvaluator(_rust_policy_json)
    except Exception:
        # Best-effort shadow feature: never propagate, but do not fail silently.
        _rust_evaluator = None
        _rust_evaluator_init_failed = True
        logger.warning(
            "Failed to initialise Rust policy evaluator; shadow evaluation disabled",
            exc_info=True,
        )
        return False
    return True
def _shadow_rust_policy_check(
    intent: InfrastructureIntent,
    cost_estimate: Optional[float],
    policy_violations: List[str],
    span: Any,
) -> Optional[List[str]]:
    """Evaluate *intent* with the Rust enforcer in shadow mode and compare to Python.

    Increments ``arf_rust_agreement_total`` with result "agreed" or "diverged".
    On divergence, logs a warning and, when *span* is set, adds a span event.

    Returns
    -------
    list or None
        The violations reported by the Rust enforcer. None when the enforcer
        is unavailable or the shadow evaluation raised. Never raises itself.
    """
    if not (_RUST_ENFORCER_AVAILABLE and _ensure_rust_evaluator()):
        return None
    try:
        rust_intent = {
            "action": getattr(intent, "intent_type", "unknown"),
            "component": getattr(intent, "service_name", "unknown"),
            "region": getattr(intent, "region", None),
            "resource_type": getattr(intent, "resource_type", None),
            "permission_level": getattr(intent, "permission_level", None),
            "extra": {}
        }
        rust_raw = _rust_evaluator.evaluate(
            json.dumps(rust_intent), cost_estimate
        )
        rust_violations = json.loads(rust_raw)
        # Order-insensitive comparison: only the set of violations matters.
        agreed = set(rust_violations) == set(policy_violations)
        _RUST_AGREEMENT.labels(result="agreed" if agreed else "diverged").inc()
        if not agreed:
            msg = (
                "Rust enforcer divergence: "
                f"Rust={sorted(rust_violations)} Python={sorted(policy_violations)}"
            )
            logger.warning(msg)
            if span:
                span.add_event("rust_enforcer_divergence", {
                    "rust_violations": rust_violations,
                    "python_violations": policy_violations
                })
    except Exception as exc:
        logger.debug("Rust enforcer shadow evaluation failed: %s", exc)
        return None
    return rust_violations


def _rust_canary_promoted() -> bool:
    """Return True when the Rust results should replace the Python violations.

    Promotion requires ARF_RUST_CANARY=true and a Prometheus sample
    ``arf_rust_agreement_lower_bound`` greater than 0.9999 in the default
    registry. Any error while reading the sample means "not promoted".
    """
    if os.getenv("ARF_RUST_CANARY", "false").lower() != "true":
        return False
    try:
        from prometheus_client import REGISTRY
        lower = REGISTRY.get_sample_value("arf_rust_agreement_lower_bound", {})
    except Exception as exc:
        logger.debug("Rust canary promotion check failed: %s", exc)
        return False
    return lower is not None and lower > 0.9999


def evaluate_intent(
    engine: RiskEngine,
    intent: InfrastructureIntent,
    cost_estimate: Optional[float],
    policy_violations: List[str]
) -> dict:
    """
    Evaluate an infrastructure intent using the Bayesian risk engine.

    When ARF_USE_RUST_ENFORCER is "true" and the Rust extension is available,
    the policy evaluation is shadowed by the Rust enforcer. Any divergence is
    logged and counted as a Prometheus metric. If ARF_RUST_CANARY is also
    "true" and the agreement lower bound exceeds 0.9999, the Rust violations
    are used for scoring instead of *policy_violations*.

    Parameters
    ----------
    engine : RiskEngine
        Initialised ARF Bayesian risk engine.
    intent : InfrastructureIntent
        The infrastructure request to evaluate.
    cost_estimate : float or None
        Estimated monthly cost (used by cost-threshold policies).
    policy_violations : list[str]
        Pre-computed policy violation strings (from the Python evaluator).

    Returns
    -------
    dict
        Keys: risk_score, explanation, contributions.

    Raises
    ------
    Exception
        Whatever ``engine.calculate_risk`` raises is re-raised, after an
        error metric has been recorded and the tracing span has been closed.
    """
    t0 = time.monotonic()
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("risk_service.evaluate_intent")
        span.set_attribute("intent_type", type(intent).__name__)
    try:
        # -- Shadow Rust enforcer (best-effort, non-blocking) --------------
        rust_violations = _shadow_rust_policy_check(
            intent, cost_estimate, policy_violations, span
        )

        # -- Automated canary promotion ------------------------------------
        # Only possible when the shadow run actually produced a result. The
        # previous version read an unbound ``rust_violations`` whenever the
        # shadow run was skipped or failed.
        if rust_violations is not None and _rust_canary_promoted():
            policy_violations = rust_violations
            if span:
                span.set_attribute("rust_enforcer_active", True)

        # -- Core risk evaluation ------------------------------------------
        try:
            score, explanation, contributions = engine.calculate_risk(
                intent=intent,
                cost_estimate=cost_estimate,
                policy_violations=policy_violations
            )
        except Exception:
            _EVAL_COUNTER.labels(engine="python", status="error").inc()
            _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
            raise

        _EVAL_COUNTER.labels(engine="python", status="success").inc()
        _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
        if span:
            span.set_attribute("risk_score", score)
            if _RUST_ENFORCER_AVAILABLE:
                span.set_attribute("rust_enforcer_available", True)
        return {
            "risk_score": score,
            "explanation": explanation,
            "contributions": contributions
        }
    finally:
        # Close the span on every exit path; it previously leaked on errors.
        if span:
            span.end()
def _zero_epistemic_signals() -> Dict[str, float]:
    """Return a fresh all-zero epistemic signal dict, used when the probe cannot run."""
    return {
        "entropy": 0.0,
        "contradiction": 0.0,
        "evidence_lift": 0.0,
        "hallucination_risk": 0.0,
    }


def _collect_candidate_actions(policy_engine: PolicyEngine, event: ReliabilityEvent):
    """Return the policy engine's raw candidate actions, bypassing its decision engine.

    ``policy_engine.use_decision_engine`` is temporarily set to False and is
    always restored afterwards.
    NOTE(review): this mutates shared state on *policy_engine*; concurrent
    callers sharing one engine could observe the temporary value.
    """
    orig_use = policy_engine.use_decision_engine
    try:
        policy_engine.use_decision_engine = False
        return policy_engine.evaluate_policies(event)
    finally:
        policy_engine.use_decision_engine = orig_use


def _build_reasoning_text(policy_engine: PolicyEngine, raw_actions) -> str:
    """Describe every policy that contributed at least one of *raw_actions*."""
    reasoning_parts = []
    for policy in policy_engine.policies:
        if any(a in policy.actions for a in raw_actions):
            conditions_str = ", ".join(
                f"{c.metric} {c.operator} {c.threshold}" for c in policy.conditions
            )
            reasoning_parts.append(
                f"Policy {policy.name} triggered by {conditions_str} β actions {[a.value for a in policy.actions]}"
            )
    return " ".join(reasoning_parts)


def _compute_epistemic_signals(reasoning_text: str, evidence_text: str, model, tokenizer):
    """Run the eclipse probe, or fall back to zero signals.

    Zero signals are used when no model/tokenizer is given or when the probe
    raises (the failure is logged with its traceback).
    """
    if model is None or tokenizer is None:
        logger.debug("Epistemic model/tokenizer not provided; using zero signals")
        return _zero_epistemic_signals()
    try:
        return compute_epistemic_risk(
            reasoning_text, evidence_text, model, tokenizer
        )
    except Exception as e:
        logger.exception("Failed to compute epistemic risk: %s", e)
        return _zero_epistemic_signals()


def evaluate_healing_decision(
    event: ReliabilityEvent,
    policy_engine: PolicyEngine,
    decision_engine: Optional[DecisionEngine] = None,
    rag_graph: Optional[RAGGraphMemory] = None,
    model=None,
    tokenizer=None,
) -> Dict[str, Any]:
    """
    Evaluate healing actions for a reliability event using decision-theoretic selection.

    Includes epistemic risk signals from the eclipse probe.

    Parameters
    ----------
    event : ReliabilityEvent
        The incident event containing latency, error rate, etc.
    policy_engine : PolicyEngine
        The ARF healing policy engine with configured policies.
    decision_engine : DecisionEngine, optional
        If omitted, ``policy_engine.decision_engine`` is used when present.
        Otherwise a default instance is created with *rag_graph*.
    rag_graph : RAGGraphMemory, optional
        Semantic memory for similar-incident retrieval.
    model, tokenizer : optional
        HuggingFace model and tokenizer for epistemic risk computation. When
        either is missing, all epistemic signals are 0.0.

    Returns
    -------
    dict
        Keys: risk_score, selected_action, expected_utility, alternatives
        (at most the first 3 returned by the decision engine), explanation,
        raw_decision (None when no action was triggered), epistemic_signals
        (None when no action was triggered).

    Raises
    ------
    Exception
        Errors from the policy or decision engine are re-raised, after an
        "error" metric has been recorded and the tracing span has been closed.
    """
    t0 = time.monotonic()
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("risk_service.evaluate_healing")
        span.set_attribute("component", event.component)

    status = "error"
    try:
        # Resolve the decision engine: explicit argument, then the policy
        # engine's own instance, then a minimal default (global stats only).
        if decision_engine is None and hasattr(policy_engine, 'decision_engine'):
            decision_engine = policy_engine.decision_engine
        if decision_engine is None:
            logger.debug("No DecisionEngine provided; creating default instance")
            decision_engine = DecisionEngine(rag_graph=rag_graph)

        raw_actions = _collect_candidate_actions(policy_engine, event)

        # Nothing triggered: short-circuit with a NO_ACTION result.
        if not raw_actions or raw_actions == [HealingAction.NO_ACTION]:
            if span:
                span.set_attribute("selected_action", HealingAction.NO_ACTION.value)
            status = "success"
            return {
                "risk_score": 0.0,
                "selected_action": HealingAction.NO_ACTION.value,
                "expected_utility": 0.0,
                "alternatives": [],
                "explanation": "No candidate actions triggered.",
                "raw_decision": None,
                "epistemic_signals": None,
            }

        reasoning_text = _build_reasoning_text(policy_engine, raw_actions)
        evidence_text = (
            f"Component: {event.component}, "
            f"latency_p99: {event.latency_p99}, "
            f"error_rate: {event.error_rate}, "
            f"cpu_util: {event.cpu_util}, "
            f"memory_util: {event.memory_util}"
        )
        epistemic_signals = _compute_epistemic_signals(
            reasoning_text, evidence_text, model, tokenizer
        )

        decision = decision_engine.select_optimal_action(
            raw_actions, event, component=event.component,
            epistemic_signals=epistemic_signals
        )

        # Risk of the selected action: take it from the first matching
        # alternative, else ask the decision engine to compute it.
        risk_score = next(
            (alt.risk for alt in decision.alternatives if alt.action == decision.best_action),
            None,
        )
        if risk_score is None:
            risk_score = decision_engine.compute_risk(
                decision.best_action, event, event.component)

        # NOTE(review): "top 3" assumes the decision engine returns
        # alternatives best-first — confirm against DecisionEngine.
        alt_list = [
            {
                "action": alt.action.value,
                "expected_utility": alt.utility,
                "risk": alt.risk,
            }
            for alt in decision.alternatives[:3]
        ]

        if span:
            span.set_attribute("risk_score", risk_score)
            span.set_attribute("selected_action", decision.best_action.value)
            span.set_attribute("expected_utility", decision.expected_utility)
        status = "success"
        return {
            "risk_score": risk_score,
            "selected_action": decision.best_action.value,
            "expected_utility": decision.expected_utility,
            "alternatives": alt_list,
            "explanation": decision.explanation,
            "raw_decision": decision.raw_data,
            "epistemic_signals": epistemic_signals,
        }
    finally:
        # Metrics and span finalisation run on every exit path. Previously a
        # failure leaked the span and recorded no metric at all.
        _EVAL_COUNTER.labels(engine="python", status=status).inc()
        _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
        if span:
            span.end()
def get_system_risk() -> float:
    """Deprecated aggregate-risk endpoint; always raises.

    An aggregated score across all monitored components is intentionally not
    provided. Raising is preferred over returning a fabricated (e.g. random)
    value.

    Raises
    ------
    NotImplementedError
        Unconditionally; use component-level risk evaluation instead.
    """
    reason = "get_system_risk is deprecated. Use componentβlevel risk evaluation instead."
    raise NotImplementedError(reason)