File size: 14,126 Bytes
afa4de7
 
 
 
 
 
 
 
 
 
 
2d521fd
afa4de7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0fa17fa
afa4de7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2d521fd
 
 
 
afa4de7
2d521fd
 
 
afa4de7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2d521fd
afa4de7
 
 
2d521fd
 
 
 
afa4de7
2d521fd
 
 
0fa17fa
 
2d521fd
afa4de7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2d521fd
 
 
 
afa4de7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2d521fd
 
 
 
afa4de7
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
"""
Risk service – integrates ARF risk engine, policy engine, and decision engine.
Deterministic, no random fallbacks, explicit error handling.

Version: 2026-05-04 – added Prometheus metrics for observability.
"""

import json
import logging
import os
import time
from typing import Optional, List, Dict, Any

from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.core.governance.intents import InfrastructureIntent
from agentic_reliability_framework.core.models.event import ReliabilityEvent, HealingAction
from agentic_reliability_framework.core.governance.policy_engine import PolicyEngine
from agentic_reliability_framework.core.decision.decision_engine import DecisionEngine
from agentic_reliability_framework.runtime.memory.rag_graph import RAGGraphMemory
from agentic_reliability_framework.core.research.eclipse_probe import compute_epistemic_risk

# ── optional tracing ─────────────────────────────────────────
try:
    from opentelemetry import trace
    _tracer = trace.get_tracer(__name__)
    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    _tracer = None

# ── Prometheus metrics (always registered; no‑op if not scraped) ─
# Registered unconditionally at import time; prometheus_client metrics are
# cheap to update even when no scraper is attached.
from prometheus_client import Counter, Histogram

# Counts every evaluation call, labelled by which engine ran it and whether
# it succeeded or raised.
_EVAL_COUNTER = Counter(
    "arf_evaluations_total",
    "Total evaluation calls (intent + healing), partitioned by engine and status.",
    ["engine", "status"],
)

# Wall-clock latency of a full evaluation call, per engine.  Buckets span
# 5 ms – 5 s, matching the expected range of in-process risk evaluation.
_EVAL_DURATION = Histogram(
    "arf_evaluation_duration_seconds",
    "End‑to‑end latency of evaluation calls.",
    ["engine"],
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0),
)

# Tracks shadow-mode agreement between the Rust enforcer and the Python
# policy evaluator; drives the canary promotion in evaluate_intent.
_RUST_AGREEMENT = Counter(
    "arf_rust_agreement_total",
    "Agreement between Rust enforcer and Python policy evaluation.",
    ["result"],  # "agreed" or "diverged"
)

# ── optional Rust enforcer (shadow mode) ──────────────────────
# The Rust extension is opt-in via ARF_USE_RUST_ENFORCER and imported
# lazily so the module works without the compiled wheel installed.
_RUST_ENFORCER_AVAILABLE = False
_rust_evaluator = None            # singleton per process
_rust_policy_json: Optional[str] = None  # policy tree handed to the Rust evaluator

if os.getenv("ARF_USE_RUST_ENFORCER", "false").lower() == "true":
    try:
        import arf_enforcer
        _RUST_ENFORCER_AVAILABLE = True
    except ImportError:
        # Flag stays False: shadow evaluation is silently disabled.
        pass

# Default OSS policy tree – mirrors the hard‑coded rules in the Python PolicyEvaluator
# that check region, resource type, and max permission level.
_OSS_POLICY_TREE_JSON = json.dumps({
    "And": [
        {"Atomic": {"RegionAllowed": {"allowed_regions": ["eastus"]}}},
        {"Atomic": {"ResourceTypeRestricted": {
            "forbidden_types": ["DATABASE_DROP", "FULL_ROLLOUT", "SYSTEM_SHUTDOWN", "SECRET_ROTATION"]
        }}},
        {"Atomic": {"MaxPermissionLevel": {"max_level": "admin"}}}
    ]
})


def _ensure_rust_evaluator() -> bool:
    """Lazily initialise the process-wide Rust policy evaluator.

    Returns
    -------
    bool
        True when a usable evaluator exists (already built, or built now);
        False when the Rust extension is unavailable or construction failed.
    """
    global _rust_evaluator, _rust_policy_json
    # Fast path: a previous call already built the singleton.
    if _rust_evaluator is not None:
        return True
    if not _RUST_ENFORCER_AVAILABLE:
        return False
    try:
        _rust_policy_json = _OSS_POLICY_TREE_JSON
        _rust_evaluator = arf_enforcer.PyPolicyEvaluator(_rust_policy_json)
        return True
    except Exception as exc:
        # Shadow-mode component: never propagate, but leave a breadcrumb for
        # operators instead of swallowing the failure silently (the original
        # bare `except Exception` gave no diagnostic at all).
        logger.debug("Rust policy evaluator initialisation failed: %s", exc)
        _rust_evaluator = None
        return False


# Module-level logger, named after the module per logging convention.
logger = logging.getLogger(__name__)


def _shadow_evaluate_rust(
    intent: InfrastructureIntent,
    cost_estimate: Optional[float],
    policy_violations: List[str],
    span,
) -> Optional[List[str]]:
    """Evaluate *intent* with the Rust enforcer in shadow mode.

    Best-effort helper for :func:`evaluate_intent`: any failure is logged at
    debug level and ``None`` is returned so the caller keeps the Python
    evaluation untouched.  On success, agreement with the Python result is
    counted in ``_RUST_AGREEMENT`` and any divergence is logged and attached
    to *span* (when tracing is active).

    Returns
    -------
    list[str] or None
        The Rust violation list, or None when the shadow evaluation failed.
    """
    try:
        # Field names mirror what PyPolicyEvaluator.evaluate expects;
        # absent intent attributes degrade to neutral values.
        rust_intent = {
            "action": getattr(intent, "intent_type", "unknown"),
            "component": getattr(intent, "service_name", "unknown"),
            "region": getattr(intent, "region", None),
            "resource_type": getattr(intent, "resource_type", None),
            "permission_level": getattr(intent, "permission_level", None),
            "extra": {}
        }
        rust_raw = _rust_evaluator.evaluate(
            json.dumps(rust_intent), cost_estimate
        )
        rust_violations = json.loads(rust_raw)
    except Exception as exc:
        logger.debug("Rust enforcer shadow evaluation failed: %s", exc)
        return None

    agreed = set(rust_violations) == set(policy_violations)
    _RUST_AGREEMENT.labels(result="agreed" if agreed else "diverged").inc()
    if not agreed:
        msg = (
            "Rust enforcer divergence: "
            f"Rust={sorted(rust_violations)} Python={sorted(policy_violations)}"
        )
        logger.warning(msg)
        if span:
            span.add_event("rust_enforcer_divergence", {
                "rust_violations": rust_violations,
                "python_violations": policy_violations
            })
    return rust_violations


def evaluate_intent(
    engine: RiskEngine,
    intent: InfrastructureIntent,
    cost_estimate: Optional[float],
    policy_violations: List[str]
) -> dict:
    """
    Evaluate an infrastructure intent using the Bayesian risk engine.

    Optionally shadows the policy evaluation with the Rust enforcer when
    the environment variable ARF_USE_RUST_ENFORCER is set to "true".
    Any divergence is logged and counted as a Prometheus metric.

    Parameters
    ----------
    engine : RiskEngine
        Initialised ARF Bayesian risk engine.
    intent : InfrastructureIntent
        The infrastructure request to evaluate.
    cost_estimate : float or None
        Estimated monthly cost (used by cost‑threshold policies).
    policy_violations : list[str]
        Pre‑computed policy violation strings (from the Python evaluator).

    Returns
    -------
    dict
        Keys: risk_score, explanation, contributions.

    Raises
    ------
    Exception
        Whatever ``engine.calculate_risk`` raises is re-raised after the
        error metric is recorded and the tracing span is closed.
    """
    t0 = time.monotonic()
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("risk_service.evaluate_intent")
        span.set_attribute("intent_type", type(intent).__name__)

    # ── Shadow Rust enforcer (best‑effort, non‑blocking) ──────
    rust_violations: Optional[List[str]] = None
    if _RUST_ENFORCER_AVAILABLE and _ensure_rust_evaluator():
        rust_violations = _shadow_evaluate_rust(
            intent, cost_estimate, policy_violations, span
        )

    # ── Automated canary promotion ──────────────────────────
    # Only promote when the shadow run actually produced a result.  The
    # previous version read `rust_violations` unconditionally here, hitting
    # an unbound name (masked by the broad except) whenever the shadow
    # evaluation failed or never ran.
    if rust_violations is not None and os.getenv("ARF_RUST_CANARY", "false").lower() == "true":
        try:
            from prometheus_client import REGISTRY
            lower = REGISTRY.get_sample_value("arf_rust_agreement_lower_bound", {})
            # Promote only when the agreement lower bound is effectively 1.0.
            if lower is not None and lower > 0.9999:
                policy_violations = rust_violations
                if span:
                    span.set_attribute("rust_enforcer_active", True)
        except Exception:
            # Canary promotion is opportunistic; never block the evaluation.
            pass

    # ── Core risk evaluation ──────────────────────────────────
    try:
        score, explanation, contributions = engine.calculate_risk(
            intent=intent,
            cost_estimate=cost_estimate,
            policy_violations=policy_violations
        )
    except Exception:
        _EVAL_COUNTER.labels(engine="python", status="error").inc()
        _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
        if span:
            # Fix: the span was previously leaked (never ended) on this path.
            span.end()
        raise

    _EVAL_COUNTER.labels(engine="python", status="success").inc()
    _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)

    if span:
        span.set_attribute("risk_score", score)
        if _RUST_ENFORCER_AVAILABLE:
            span.set_attribute("rust_enforcer_available", True)
        span.end()

    return {
        "risk_score": score,
        "explanation": explanation,
        "contributions": contributions
    }


def _zero_epistemic_signals() -> Dict[str, float]:
    """Neutral epistemic signal vector used when the eclipse probe cannot run."""
    return {
        "entropy": 0.0,
        "contradiction": 0.0,
        "evidence_lift": 0.0,
        "hallucination_risk": 0.0,
    }


def _build_reasoning_text(policy_engine: PolicyEngine, raw_actions: List[HealingAction]) -> str:
    """Summarise which policies triggered the candidate actions (probe input)."""
    parts = []
    for policy in policy_engine.policies:
        if any(a in policy.actions for a in raw_actions):
            conditions_str = ", ".join(
                f"{c.metric} {c.operator} {c.threshold}" for c in policy.conditions
            )
            parts.append(
                f"Policy {policy.name} triggered by {conditions_str} "
                f"→ actions {[a.value for a in policy.actions]}"
            )
    return " ".join(parts)


def _format_event_evidence(event: ReliabilityEvent) -> str:
    """Render the event's key metrics as evidence text for the eclipse probe."""
    return (
        f"Component: {event.component}, "
        f"latency_p99: {event.latency_p99}, "
        f"error_rate: {event.error_rate}, "
        f"cpu_util: {event.cpu_util}, "
        f"memory_util: {event.memory_util}"
    )


def evaluate_healing_decision(
    event: ReliabilityEvent,
    policy_engine: PolicyEngine,
    decision_engine: Optional[DecisionEngine] = None,
    rag_graph: Optional[RAGGraphMemory] = None,
    model=None,
    tokenizer=None,
) -> Dict[str, Any]:
    """
    Evaluate healing actions for a given reliability event using decision‑theoretic selection.
    Includes epistemic risk signals from the eclipse probe.

    Parameters
    ----------
    event : ReliabilityEvent
        The incident event containing latency, error rate, etc.
    policy_engine : PolicyEngine
        The ARF healing policy engine with configured policies.
    decision_engine : DecisionEngine, optional
        If omitted, a default instance is created.
    rag_graph : RAGGraphMemory, optional
        Semantic memory for similar incident retrieval.
    model, tokenizer : optional
        HuggingFace model and tokenizer for epistemic risk computation.

    Returns
    -------
    dict
        Keys: risk_score, selected_action, expected_utility, alternatives,
        explanation, raw_decision, epistemic_signals.  The same keys are
        present on the NO_ACTION path (raw_decision is None there).
    """
    t0 = time.monotonic()
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("risk_service.evaluate_healing")
        span.set_attribute("component", event.component)

    # Resolve a DecisionEngine: explicit argument > policy_engine's own
    # engine > a freshly-built default (global stats only).
    if decision_engine is None and hasattr(policy_engine, 'decision_engine'):
        decision_engine = policy_engine.decision_engine
    if decision_engine is None:
        logger.debug("No DecisionEngine provided; creating default instance")
        decision_engine = DecisionEngine(rag_graph=rag_graph)

    # Collect the raw candidate actions with the decision engine temporarily
    # disabled, so we see the unranked policy output; always restore the flag.
    orig_use = policy_engine.use_decision_engine
    try:
        policy_engine.use_decision_engine = False
        raw_actions = policy_engine.evaluate_policies(event)
    finally:
        policy_engine.use_decision_engine = orig_use

    # Nothing triggered: return a schema-complete NO_ACTION payload.
    if not raw_actions or raw_actions == [HealingAction.NO_ACTION]:
        if span:
            span.set_attribute("selected_action", HealingAction.NO_ACTION.value)
            span.end()
        _EVAL_COUNTER.labels(engine="python", status="success").inc()
        _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
        return {
            "risk_score": 0.0,
            "selected_action": HealingAction.NO_ACTION.value,
            "expected_utility": 0.0,
            "alternatives": [],
            "explanation": "No candidate actions triggered.",
            # Fix: this key was missing on the NO_ACTION path, so callers saw
            # an inconsistent result schema between the two return paths.
            "raw_decision": None,
            "epistemic_signals": None,
        }

    reasoning_text = _build_reasoning_text(policy_engine, raw_actions)
    evidence_text = _format_event_evidence(event)

    # Epistemic signals: require both model and tokenizer; degrade to a zero
    # vector on any failure rather than aborting the healing decision.
    if model is not None and tokenizer is not None:
        try:
            epistemic_signals = compute_epistemic_risk(
                reasoning_text, evidence_text, model, tokenizer
            )
        except Exception:
            logger.exception("Failed to compute epistemic risk")
            epistemic_signals = _zero_epistemic_signals()
    else:
        logger.debug("Epistemic model/tokenizer not provided; using zero signals")
        epistemic_signals = _zero_epistemic_signals()

    # Decision-theoretic selection of the best action among the candidates.
    decision = decision_engine.select_optimal_action(
        raw_actions, event, component=event.component,
        epistemic_signals=epistemic_signals
    )

    # Risk of the chosen action: prefer the value already computed among the
    # alternatives; otherwise compute it explicitly.
    risk_score = next(
        (alt.risk for alt in decision.alternatives if alt.action == decision.best_action),
        None,
    )
    if risk_score is None:
        risk_score = decision_engine.compute_risk(
            decision.best_action, event, event.component)

    # Expose only the top-3 alternatives to keep the payload compact.
    alt_list = [
        {
            "action": alt.action.value,
            "expected_utility": alt.utility,
            "risk": alt.risk,
        }
        for alt in decision.alternatives[:3]
    ]

    # ── Metrics & span finalisation ───────────────────────────
    _EVAL_COUNTER.labels(engine="python", status="success").inc()
    _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)

    if span:
        span.set_attribute("risk_score", risk_score)
        span.set_attribute("selected_action", decision.best_action.value)
        span.set_attribute("expected_utility", decision.expected_utility)
        span.end()

    return {
        "risk_score": risk_score,
        "selected_action": decision.best_action.value,
        "expected_utility": decision.expected_utility,
        "alternatives": alt_list,
        "explanation": decision.explanation,
        "raw_decision": decision.raw_data,
        "epistemic_signals": epistemic_signals,
    }


def get_system_risk() -> float:
    """Aggregated risk score across all monitored components (deprecated).

    This endpoint is a placeholder and deliberately raises instead of
    returning a fabricated value — the service avoids random fallbacks.
    Callers must migrate to component-level risk evaluation.

    Raises
    ------
    NotImplementedError
        Always.
    """
    raise NotImplementedError(
        "get_system_risk is deprecated. Use component‑level risk evaluation instead."
    )