Agentic-Reliability-Framework-API / app /api /routes_incidents.py
petter2025's picture
Upload folder using huggingface_hub
afa4de7 verified
raw
history blame
10 kB
"""
Incident evaluation endpoints — backward‑compatible Bayesian reroute.
This module provides two incident‑related routes:
* ``POST /api/v1/report_incident``
Stores a ``ReliabilityEvent`` in an in‑memory history for auditing
and debugging.
* ``POST /api/v1/v1/incidents/evaluate`` **(deprecated)**
Former heuristic endpoint now **rerouted to the full Bayesian risk
engine**. All callers should migrate to
``POST /api/v1/intents/evaluate``, which returns richer metadata
including CUDL uncertainty decomposition and decision traces.
The local model duplicates (``ReliabilityEvent``, ``HealingAction``)
have been removed; all types are imported from the canonical ARF core
framework (``agentic_reliability_framework.core.models.event``).
"""
from __future__ import annotations
import logging
import time
from typing import Optional
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request
from agentic_reliability_framework.core.models.event import (
HealingAction,
ReliabilityEvent,
)
from app.causal_explainer import CausalExplainer
from app.core.usage_tracker import UsageRecord, enforce_quota, tracker
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router on which the incident endpoints in this module are registered
# through the ``@router.post(...)`` decorators.
router = APIRouter()
# ---------------------------------------------------------------------------
# In‑memory incident store (for auditing / debugging only)
# ---------------------------------------------------------------------------
# Plain module-level list; ``report_incident`` appends one JSON-safe dict per
# recorded event. It lives only in process memory, so its contents are lost
# when the process exits.
# NOTE(review): nothing in this module trims the list, so it grows for as
# long as the process runs — confirm that is acceptable for the deployment.
incident_history: list[dict] = []
# ---------------------------------------------------------------------------
# POST /api/v1/report_incident
# ---------------------------------------------------------------------------
@router.post("/report_incident")
async def report_incident(event: ReliabilityEvent) -> dict[str, str]:
    """
    Append a ``ReliabilityEvent`` to the in-memory incident history.

    The event is serialised with ``model_dump(mode="json")`` and the
    resulting JSON-safe dictionary is appended to the module-level
    ``incident_history`` list. The list lives only in process memory, so
    recorded incidents do **not** survive an API restart.

    Parameters
    ----------
    event : ReliabilityEvent
        The reliability event to record. Field validation is delegated
        to the ``ReliabilityEvent`` model itself.

    Returns
    -------
    dict
        The fixed acknowledgement ``{"status": "recorded"}``.
    """
    # Serialise first so the stored entry is a plain dict, not a model.
    serialised_event = event.model_dump(mode="json")
    incident_history.append(serialised_event)

    acknowledgement = {"status": "recorded"}
    return acknowledgement
# ---------------------------------------------------------------------------
# POST /api/v1/v1/incidents/evaluate (deprecated)
# ---------------------------------------------------------------------------
async def _record_usage(
    *,
    api_key: str,
    tier,
    event: ReliabilityEvent,
    background_tasks: BackgroundTasks,
    started_at: float,
    **outcome,
) -> None:
    """
    Submit one usage record for the deprecated evaluate endpoint.

    Does nothing when the module-level ``tracker`` is falsy.

    Parameters
    ----------
    api_key : str
        API key taken from the quota dependency.
    tier
        Tier value taken from the quota dependency.
    event : ReliabilityEvent
        The request payload; stored in the record as a JSON-safe dict.
    background_tasks : BackgroundTasks
        Forwarded unchanged to ``tracker.increment_usage_async``.
    started_at : float
        ``time.perf_counter()`` reading taken when the request started;
        used to compute ``processing_ms``.
    **outcome
        Extra ``UsageRecord`` fields describing the result: the handler
        passes ``response=...`` on success and ``error=...`` on failure.
    """
    if not tracker:
        return
    record = UsageRecord(
        api_key=api_key,
        tier=tier,
        # Wall-clock time is correct for the record's timestamp ...
        timestamp=time.time(),
        endpoint="/v1/incidents/evaluate",
        request_body=event.model_dump(mode="json"),
        # ... but the duration uses a monotonic clock so that system clock
        # adjustments cannot produce negative or inflated timings.
        processing_ms=(time.perf_counter() - started_at) * 1000,
        **outcome,
    )
    await tracker.increment_usage_async(record, background_tasks)


@router.post("/v1/incidents/evaluate")
async def evaluate_incident(
    request: Request,
    event: ReliabilityEvent,
    background_tasks: BackgroundTasks,
    quota: dict = Depends(enforce_quota),
) -> dict:
    """
    Evaluate an incident using the **Bayesian risk engine**.

    .. deprecated:: 0.6.0
        Use ``POST /api/v1/intents/evaluate`` instead. This endpoint
        will be removed in a future release. Responses include a
        ``deprecation_notice`` field to assist migration.

    The following steps are performed:

    1. Convert the ``ReliabilityEvent`` into a minimal deploy-config
       intent via ``intent_adapter.to_oss_intent``.
    2. Call ``risk_service.evaluate_intent()`` with the engine stored on
       ``request.app.state.risk_engine`` to obtain a risk score.
    3. Pick a heuristic healing action: ``RESTART_CONTAINER`` when the
       risk score is above 0.5, otherwise ``NO_ACTION``.
    4. Run the causal explainer for counter-factual text.
    5. Build a backward-compatible response envelope.

    A usage record is submitted through ``tracker`` (when it is truthy)
    for both successful and failed evaluations.

    Parameters
    ----------
    request : Request
        The Starlette request object (used to reach ``app.state``).
    event : ReliabilityEvent
        The incident event; this handler reads ``component``,
        ``latency_p99`` and ``error_rate``.
    background_tasks : BackgroundTasks
        FastAPI background-task runner handed to the usage tracker.
    quota : dict
        Injected by ``enforce_quota``; this handler reads its
        ``api_key`` and ``tier`` entries.

    Returns
    -------
    dict
        A dictionary with keys:

        * ``deprecation_notice`` (str) — migration guidance.
        * ``healing_intent`` (dict) — action, component, parameters,
          justification, confidence, risk score and advisory status.
        * ``causal_explanation`` (dict) — factual/counter-factual
          outcomes, effect, explanation text, model flag and warnings.
        * ``utility_decision`` (dict) — selected action, expected
          utility and a short explanation.

    Raises
    ------
    HTTPException
        Re-raised unchanged when raised inside the evaluation; any other
        exception is logged with its traceback, recorded as a failed
        usage entry and converted to a 500 whose ``detail`` is the
        exception text.
    """
    # Monotonic start mark: only ever used to compute an elapsed duration.
    started_at = time.perf_counter()
    api_key: str = quota["api_key"]
    tier = quota["tier"]

    try:
        # --------------------------------------------------------------
        # Step 1 – Convert the event into an infrastructure intent
        # --------------------------------------------------------------
        # Imported inside the handler, as in the original implementation.
        # NOTE(review): presumably to avoid an import cycle or import-time
        # cost — confirm before hoisting to module level.
        from app.services.intent_adapter import to_oss_intent
        from app.services.risk_service import evaluate_intent

        raw_intent = {
            "intent_type": "deploy_config",
            "environment": "prod",
            "service_name": event.component,
            "requester": "auto",
            "change_scope": "global",
            "deployment_target": "prod",
            "configuration": {},
            "provenance": {"source": "incident_evaluate"},
        }
        oss_intent = to_oss_intent(raw_intent)

        # --------------------------------------------------------------
        # Step 2 – Bayesian risk evaluation
        # --------------------------------------------------------------
        risk_engine = request.app.state.risk_engine
        result = evaluate_intent(
            engine=risk_engine,
            intent=oss_intent,
            cost_estimate=None,
            policy_violations=[],
        )
        risk_score = result["risk_score"]

        # --------------------------------------------------------------
        # Step 3 – Heuristic action selection based on risk threshold
        # --------------------------------------------------------------
        optimal_action = (
            HealingAction.RESTART_CONTAINER
            if risk_score > 0.5
            else HealingAction.NO_ACTION
        )

        # --------------------------------------------------------------
        # Step 4 – Causal explainer
        # --------------------------------------------------------------
        causal_explainer = CausalExplainer()
        current_state = {
            "latency": event.latency_p99,
            "error_rate": event.error_rate,
            "last_action": {"action_type": "no_action"},
        }
        proposed_action = {"action_type": optimal_action.value, "params": {}}
        causal_exp = causal_explainer.explain_healing_intent(
            proposed_action, current_state, "latency"
        )

        # --------------------------------------------------------------
        # Step 5 – Build response envelope
        # --------------------------------------------------------------
        healing_intent = {
            "action": optimal_action.value,
            "component": event.component,
            "parameters": {},
            "justification": (
                f"Bayesian risk score: {risk_score:.3f}. "
                f"Causal: {causal_exp.explanation_text}"
            ),
            # A missing "uncertainty" entry is treated as zero uncertainty.
            "confidence": 1.0 - result.get("uncertainty", 0.0),
            "risk_score": risk_score,
            "status": "oss_advisory_only",
        }
        response_data = {
            "deprecation_notice": (
                "This endpoint is deprecated. Use POST /api/v1/intents/evaluate "
                "for the full Bayesian evaluation with CUDL decomposition."
            ),
            "healing_intent": healing_intent,
            "causal_explanation": {
                "factual_outcome": causal_exp.factual_outcome,
                "counterfactual_outcome": causal_exp.counterfactual_outcome,
                "effect": causal_exp.effect,
                "explanation_text": causal_exp.explanation_text,
                "is_model_based": causal_exp.is_model_based,
                "warnings": causal_exp.warnings,
            },
            "utility_decision": {
                "best_action": optimal_action.value,
                "expected_utility": 0.5,
                "explanation": (
                    "Decision based on Bayesian risk threshold > 0.5"
                ),
            },
        }

        # --------------------------------------------------------------
        # Asynchronous usage logging
        # --------------------------------------------------------------
        await _record_usage(
            api_key=api_key,
            tier=tier,
            event=event,
            background_tasks=background_tasks,
            started_at=started_at,
            response=response_data,
        )
        # Only a short prefix of the key is logged.
        logger.warning(
            "Deprecated endpoint /v1/incidents/evaluate called by key %s",
            api_key[:8],
        )
        return response_data
    except HTTPException:
        # Deliberate HTTP errors pass through untouched and unrecorded.
        raise
    except Exception as exc:
        error_msg = str(exc)
        # Keep the full traceback server-side; the client only sees the text.
        logger.exception("Deprecated endpoint /v1/incidents/evaluate failed")
        await _record_usage(
            api_key=api_key,
            tier=tier,
            event=event,
            background_tasks=background_tasks,
            started_at=started_at,
            error=error_msg,
        )
        # NOTE(review): ``detail`` echoes the raw exception text to the
        # caller; kept as-is for backward compatibility.
        raise HTTPException(status_code=500, detail=error_msg) from exc