| """ |
| Risk service: integrates the ARF risk engine, policy engine, and decision engine. |
| Deterministic, no random fallbacks, explicit error handling. |
| |
| Version: 2026-05-04 (added Prometheus metrics for observability). |
| """ |
|
|
| import json |
| import logging |
| import os |
| import time |
| from typing import Optional, List, Dict, Any |
|
|
| from agentic_reliability_framework.core.governance.risk_engine import RiskEngine |
| from agentic_reliability_framework.core.governance.intents import InfrastructureIntent |
| from agentic_reliability_framework.core.models.event import ReliabilityEvent, HealingAction |
| from agentic_reliability_framework.core.governance.policy_engine import PolicyEngine |
| from agentic_reliability_framework.core.decision.decision_engine import DecisionEngine |
| from agentic_reliability_framework.runtime.memory.rag_graph import RAGGraphMemory |
| from agentic_reliability_framework.core.research.eclipse_probe import compute_epistemic_risk |
|
|
| |
| try: |
| from opentelemetry import trace |
| _tracer = trace.get_tracer(__name__) |
| OTEL_AVAILABLE = True |
| except ImportError: |
| OTEL_AVAILABLE = False |
| _tracer = None |
|
|
| |
| from prometheus_client import Counter, Histogram |
|
|
| _EVAL_COUNTER = Counter( |
| "arf_evaluations_total", |
| "Total evaluation calls (intent + healing), partitioned by engine and status.", |
| ["engine", "status"], |
| ) |
|
|
| _EVAL_DURATION = Histogram( |
| "arf_evaluation_duration_seconds", |
| "Endβtoβend latency of evaluation calls.", |
| ["engine"], |
| buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0), |
| ) |
|
|
| _RUST_AGREEMENT = Counter( |
| "arf_rust_agreement_total", |
| "Agreement between Rust enforcer and Python policy evaluation.", |
| ["result"], |
| ) |
|
|
| |
| _RUST_ENFORCER_AVAILABLE = False |
| _rust_evaluator = None |
| _rust_policy_json: Optional[str] = None |
|
|
| if os.getenv("ARF_USE_RUST_ENFORCER", "false").lower() == "true": |
| try: |
| import arf_enforcer |
| _RUST_ENFORCER_AVAILABLE = True |
| except ImportError: |
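| # ARF_USE_RUST_ENFORCER requested the Rust extension, but it is not |
| # installed; fall back silently to the pure-Python policy path. |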
| pass |
|
|
| |
| |
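| # Policy tree handed to the optional Rust enforcer: a conjunction ("And") of |
| # atomic conditions over region, resource type, and permission level. The |
| # JSON shape is assumed here to match what arf_enforcer.PyPolicyEvaluator |
| # expects; an intent in a region other than "eastus", for example, would be |
| # reported back as a violation of the RegionAllowed condition. |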
| _OSS_POLICY_TREE_JSON = json.dumps({ |
| "And": [ |
| {"Atomic": {"RegionAllowed": {"allowed_regions": ["eastus"]}}}, |
| {"Atomic": {"ResourceTypeRestricted": { |
| "forbidden_types": ["DATABASE_DROP", "FULL_ROLLOUT", "SYSTEM_SHUTDOWN", "SECRET_ROTATION"] |
| }}}, |
| {"Atomic": {"MaxPermissionLevel": {"max_level": "admin"}}} |
| ] |
| }) |
|
|
|
|
| def _ensure_rust_evaluator() -> bool: |
| """Lazy initialise the Rust policy evaluator. Returns True on success.""" |
| global _rust_evaluator, _rust_policy_json |
| if _rust_evaluator is not None: |
| return True |
| if not _RUST_ENFORCER_AVAILABLE: |
| return False |
| try: |
| _rust_policy_json = _OSS_POLICY_TREE_JSON |
| _rust_evaluator = arf_enforcer.PyPolicyEvaluator(_rust_policy_json) |
| return True |
| except Exception: |
| _rust_evaluator = None |
| return False |
|
|
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| def evaluate_intent( |
| engine: RiskEngine, |
| intent: InfrastructureIntent, |
| cost_estimate: Optional[float], |
| policy_violations: List[str] |
| ) -> dict: |
| """ |
| Evaluate an infrastructure intent using the Bayesian risk engine. |
| |
| Optionally shadows the policy evaluation with the Rust enforcer when |
| the environment variable ARF_USE_RUST_ENFORCER is set to "true". |
| Any divergence is logged and counted as a Prometheus metric. |
| |
| Parameters |
| ---------- |
| engine : RiskEngine |
| Initialised ARF Bayesian risk engine. |
| intent : InfrastructureIntent |
| The infrastructure request to evaluate. |
| cost_estimate : float or None |
| Estimated monthly cost (used by cost-threshold policies). |
| policy_violations : list[str] |
| Pre-computed policy violation strings (from the Python evaluator). |
| |
| Returns |
| ------- |
| dict |
| Keys: risk_score, explanation, contributions. |
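|
| Examples |
| -------- |
| A minimal sketch; constructing ``engine`` and ``intent`` is |
| deployment-specific, so the call below is illustrative only:: |
|
| result = evaluate_intent(engine, intent, cost_estimate=250.0, |
| policy_violations=[]) |
| # result["risk_score"], result["explanation"], result["contributions"] |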
| """ |
| t0 = time.monotonic() |
| span = None |
| if OTEL_AVAILABLE and _tracer: |
| span = _tracer.start_span("risk_service.evaluate_intent") |
| span.set_attribute("intent_type", type(intent).__name__) |
|
|
| |
| rust_violations = None  # shadow result; may be promoted by the canary block below |
| if _RUST_ENFORCER_AVAILABLE and _ensure_rust_evaluator(): |
| try: |
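| # The field names below form the JSON payload handed to the Rust evaluator; |
| # missing intent attributes degrade to "unknown"/None instead of raising. |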
| rust_intent = { |
| "action": getattr(intent, "intent_type", "unknown"), |
| "component": getattr(intent, "service_name", "unknown"), |
| "region": getattr(intent, "region", None), |
| "resource_type": getattr(intent, "resource_type", None), |
| "permission_level": getattr(intent, "permission_level", None), |
| "extra": {} |
| } |
| rust_raw = _rust_evaluator.evaluate( |
| json.dumps(rust_intent), cost_estimate |
| ) |
| rust_violations = json.loads(rust_raw) |
|
|
| agreed = set(rust_violations) == set(policy_violations) |
| _RUST_AGREEMENT.labels(result="agreed" if agreed else "diverged").inc() |
| if not agreed: |
| msg = ( |
| "Rust enforcer divergence: " |
| f"Rust={sorted(rust_violations)} Python={sorted(policy_violations)}" |
| ) |
| logger.warning(msg) |
| if span: |
| span.add_event("rust_enforcer_divergence", { |
| "rust_violations": rust_violations, |
| "python_violations": policy_violations |
| }) |
| except Exception as exc: |
| logger.debug("Rust enforcer shadow evaluation failed: %s", exc) |
|
|
| |
|
|
| |
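| # Canary promotion: only adopt the Rust verdict when an externally published |
| # gauge ("arf_rust_agreement_lower_bound", assumed to be derived from the |
| # arf_rust_agreement_total counter by a separate process; it is not defined |
| # in this module) indicates near-perfect agreement with the Python evaluator. |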
| if _RUST_ENFORCER_AVAILABLE and os.getenv("ARF_RUST_CANARY", "false").lower() == "true": |
| try: |
| from prometheus_client import REGISTRY |
| lower = REGISTRY.get_sample_value("arf_rust_agreement_lower_bound", {}) |
| if lower is not None and lower > 0.9999 and rust_violations is not None: |
| policy_violations = rust_violations |
| if span: |
| span.set_attribute("rust_enforcer_active", True) |
| except Exception: |
| pass |
| try: |
| score, explanation, contributions = engine.calculate_risk( |
| intent=intent, |
| cost_estimate=cost_estimate, |
| policy_violations=policy_violations |
| ) |
| engine_label = "python" |
| status = "success" |
| except Exception: |
| _EVAL_COUNTER.labels(engine="python", status="error").inc() |
| _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0) |
| raise |
|
|
| _EVAL_COUNTER.labels(engine=engine_label, status=status).inc() |
| _EVAL_DURATION.labels(engine=engine_label).observe(time.monotonic() - t0) |
|
|
| if span: |
| span.set_attribute("risk_score", score) |
| if _RUST_ENFORCER_AVAILABLE: |
| span.set_attribute("rust_enforcer_available", True) |
| span.end() |
|
|
| return { |
| "risk_score": score, |
| "explanation": explanation, |
| "contributions": contributions |
| } |
|
|
|
|
| def evaluate_healing_decision( |
| event: ReliabilityEvent, |
| policy_engine: PolicyEngine, |
| decision_engine: Optional[DecisionEngine] = None, |
| rag_graph: Optional[RAGGraphMemory] = None, |
| model=None, |
| tokenizer=None, |
| ) -> Dict[str, Any]: |
| """ |
| Evaluate healing actions for a given reliability event using decision-theoretic selection. |
| Includes epistemic risk signals from the eclipse probe. |
| |
| Parameters |
| ---------- |
| event : ReliabilityEvent |
| The incident event containing latency, error rate, etc. |
| policy_engine : PolicyEngine |
| The ARF healing policy engine with configured policies. |
| decision_engine : DecisionEngine, optional |
| If omitted, a default instance is created. |
| rag_graph : RAGGraphMemory, optional |
| Semantic memory for similar incident retrieval. |
| model, tokenizer : optional |
| HuggingFace model and tokenizer for epistemic risk computation. |
| |
| Returns |
| ------- |
| dict |
| Keys: risk_score, selected_action, expected_utility, alternatives, |
| explanation, epistemic_signals. |
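|
| Examples |
| -------- |
| Illustrative only; constructing ``event`` and ``policy_engine`` depends on |
| the surrounding application:: |
|
| outcome = evaluate_healing_decision(event, policy_engine) |
| # outcome["selected_action"], outcome["expected_utility"], outcome["alternatives"] |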
| """ |
| t0 = time.monotonic() |
| span = None |
| if OTEL_AVAILABLE and _tracer: |
| span = _tracer.start_span("risk_service.evaluate_healing") |
| span.set_attribute("component", event.component) |
|
|
| |
| if decision_engine is None and hasattr(policy_engine, 'decision_engine'): |
| decision_engine = policy_engine.decision_engine |
|
|
| |
| if decision_engine is None: |
| logger.debug("No DecisionEngine provided; creating default instance") |
| decision_engine = DecisionEngine(rag_graph=rag_graph) |
|
|
| |
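| # Temporarily disable the policy engine's own decision selection so that |
| # evaluate_policies returns the raw candidate actions; optimal selection is |
| # performed explicitly below via the DecisionEngine. |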
| orig_use = policy_engine.use_decision_engine |
| try: |
| policy_engine.use_decision_engine = False |
| raw_actions = policy_engine.evaluate_policies(event) |
| finally: |
| policy_engine.use_decision_engine = orig_use |
|
|
| |
| if not raw_actions or raw_actions == [HealingAction.NO_ACTION]: |
| if span: |
| span.set_attribute("selected_action", HealingAction.NO_ACTION.value) |
| span.end() |
| _EVAL_COUNTER.labels(engine="python", status="success").inc() |
| _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0) |
| return { |
| "risk_score": 0.0, |
| "selected_action": HealingAction.NO_ACTION.value, |
| "expected_utility": 0.0, |
| "alternatives": [], |
| "explanation": "No candidate actions triggered.", |
| "epistemic_signals": None, |
| } |
|
|
| |
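| # Build a human-readable reasoning trace from the policies whose actions |
| # fired; this text feeds the epistemic-risk probe below. |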
| reasoning_parts = [] |
| for policy in policy_engine.policies: |
| if any(a in policy.actions for a in raw_actions): |
| conditions_str = ", ".join( |
| f"{c.metric} {c.operator} {c.threshold}" for c in policy.conditions |
| ) |
| reasoning_parts.append( |
| f"Policy {policy.name} triggered by {conditions_str} β actions {[a.value for a in policy.actions]}" |
| ) |
| reasoning_text = " ".join(reasoning_parts) |
|
|
| |
| evidence_text = ( |
| f"Component: {event.component}, " |
| f"latency_p99: {event.latency_p99}, " |
| f"error_rate: {event.error_rate}, " |
| f"cpu_util: {event.cpu_util}, " |
| f"memory_util: {event.memory_util}" |
| ) |
|
|
| |
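| # Epistemic risk needs a model and tokenizer; otherwise fall back to zeroed |
| # signals so downstream consumers always receive the same keys. |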
| epistemic_signals = None |
| if model is not None and tokenizer is not None: |
| try: |
| epistemic_signals = compute_epistemic_risk( |
| reasoning_text, evidence_text, model, tokenizer |
| ) |
| except Exception as e: |
| logger.error(f"Failed to compute epistemic risk: {e}") |
| epistemic_signals = { |
| "entropy": 0.0, |
| "contradiction": 0.0, |
| "evidence_lift": 0.0, |
| "hallucination_risk": 0.0, |
| } |
| else: |
| logger.debug("Epistemic model/tokenizer not provided; using zero signals") |
| epistemic_signals = { |
| "entropy": 0.0, |
| "contradiction": 0.0, |
| "evidence_lift": 0.0, |
| "hallucination_risk": 0.0, |
| } |
|
|
| |
| decision = decision_engine.select_optimal_action( |
| raw_actions, event, component=event.component, |
| epistemic_signals=epistemic_signals |
| ) |
|
|
| |
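| # Recover the risk attached to the chosen action from the returned alternatives. |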
| risk_score = None |
| for alt in decision.alternatives: |
| if alt.action == decision.best_action: |
| risk_score = alt.risk |
| break |
| if risk_score is None: |
| # Fallback: the selected action was not among the returned alternatives. |
| risk_score = decision_engine.compute_risk( |
| decision.best_action, event, event.component) |
|
|
| |
| alt_list = [] |
| for alt in decision.alternatives[:3]: |
| alt_list.append({ |
| "action": alt.action.value, |
| "expected_utility": alt.utility, |
| "risk": alt.risk, |
| }) |
|
|
| |
| _EVAL_COUNTER.labels(engine="python", status="success").inc() |
| _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0) |
|
|
| if span: |
| span.set_attribute("risk_score", risk_score) |
| span.set_attribute("selected_action", decision.best_action.value) |
| span.set_attribute("expected_utility", decision.expected_utility) |
| span.end() |
|
|
| return { |
| "risk_score": risk_score, |
| "selected_action": decision.best_action.value, |
| "expected_utility": decision.expected_utility, |
| "alternatives": alt_list, |
| "explanation": decision.explanation, |
| "raw_decision": decision.raw_data, |
| "epistemic_signals": epistemic_signals, |
| } |
|
|
|
|
| def get_system_risk() -> float: |
| """ |
| Deprecated placeholder for an aggregated risk score across all monitored |
| components. Always raises NotImplementedError rather than returning a |
| random fallback value; use component-level evaluation instead. |
| """ |
| raise NotImplementedError( |
| "get_system_risk is deprecated. Use componentβlevel risk evaluation instead." |
| ) |
|
|