Agentic-Reliability-Framework-API / app /api /routes_governance.py
petter2025's picture
Upload folder using huggingface_hub
afa4de7 verified
raw
history blame
11.3 kB
from fastapi import APIRouter, Depends, HTTPException, Request, BackgroundTasks, Header
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app.models.infrastructure_intents import InfrastructureIntentRequest
from app.services.intent_adapter import to_oss_intent
from app.services.risk_service import evaluate_intent, evaluate_healing_decision
from app.services.intent_store import save_evaluated_intent
from app.services.outcome_service import record_outcome
from app.api.deps import get_db
from pydantic import BaseModel
import uuid
import logging
import time
from typing import Optional
from agentic_reliability_framework.core.models.event import ReliabilityEvent
# ===== USAGE TRACKER IMPORTS =====
import app.core.usage_tracker
from app.core.usage_tracker import UsageRecord
# ===== PRICING CALCULATOR INTEGRATION =====
# The pricing calculator is an optional dependency. When it cannot be
# imported, `add_event` is bound to None and the availability flag is False
# so callers can test either name before using the buffer.
try:
    from arf_pricing_calculator.storage.buffer import add_event
except ImportError:
    add_event = None
    PRICING_AVAILABLE = False
else:
    PRICING_AVAILABLE = True
# ===== OpenTelemetry (optional) =====
try:
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
_tracer = trace.get_tracer(__name__)
OTEL_AVAILABLE = True
except ImportError:
OTEL_AVAILABLE = False
_tracer = None
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which the endpoints in this module are registered via the
# `@router.post(...)` decorators.
router = APIRouter()
class OutcomeRequest(BaseModel):
    """Request body accepted by the ``POST /intents/outcome`` endpoint."""

    # Identifier of the intent the outcome belongs to; forwarded to
    # `record_outcome` and used as `run_id` in the pricing-buffer event.
    deterministic_id: str
    # True maps to "success", False to "failure" in the pricing-buffer event.
    success: bool
    # Forwarded unchanged to `record_outcome`.
    recorded_by: str
    # Optional free-text notes; defaults to an empty string.
    notes: str = ""
class HealingDecisionRequest(BaseModel):
    """Request body accepted by the ``POST /healing/evaluate`` endpoint."""

    # The event to evaluate; passed as `event=` to `evaluate_healing_decision`.
    event: ReliabilityEvent
@router.post("/intents/evaluate")
async def evaluate_intent_endpoint(
    request: Request,
    intent_req: InfrastructureIntentRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """Evaluate an infrastructure intent with idempotency and quota checks.

    Flow:
      1. Optionally open an OpenTelemetry span.
      2. Resolve the caller's API key from the ``Authorization`` header,
         falling back to the ``api_key`` query parameter, then ``"unknown"``.
      3. Call ``consume_quota_and_log`` on the usage tracker. When it reports
         failure together with a stored response, that response is returned
         as-is; when it reports failure without one, the request is rejected.
      4. Run the risk evaluation, persist the evaluated intent, schedule an
         audit-log entry as a background task and return the result.

    Args:
        request: Incoming request; provides headers, query parameters and
            ``app.state.risk_engine``.
        intent_req: Validated intent payload.
        background_tasks: Used to schedule the response audit-log write.
        db: Database session used to persist the evaluated intent.
        idempotency_key: Value of the ``Idempotency-Key`` header, if any.

    Returns:
        The evaluation result with an added ``"intent_id"`` key, or the
        previously stored response when the tracker returns one.

    Raises:
        HTTPException: 503 when no usage tracker is configured, 429 when the
            tracker rejects the request without a stored response, 500 for
            any unexpected error during evaluation.
    """
    # ── optional trace ──────────────────────────────────────
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("governance.evaluate_intent")
        span.set_attribute("intent_type", intent_req.intent_type)
        span.set_attribute("environment", str(intent_req.environment))

    start_time = time.time()
    api_key = request.headers.get("Authorization", "").replace("Bearer ", "")
    if not api_key:
        api_key = request.query_params.get("api_key", "unknown")

    current_tracker = app.core.usage_tracker.tracker
    if current_tracker is None:
        if span:
            span.set_status(Status(StatusCode.ERROR, "tracker unavailable"))
            span.end()
        raise HTTPException(status_code=503,
                            detail="Usage tracking service unavailable")

    try:
        record = UsageRecord(
            api_key=api_key,
            tier=None,
            timestamp=start_time,
            endpoint="/api/v1/intents/evaluate",
            request_body=intent_req.model_dump(),
            processing_ms=None,
        )
        success, existing_response = current_tracker.consume_quota_and_log(
            record=record,
            idempotency_key=idempotency_key,
        )
    except Exception as exc:
        # Close the span before propagating so a tracker failure does not
        # leave it open; the exception itself is re-raised unchanged.
        if span:
            span.set_status(Status(StatusCode.ERROR, str(exc)))
            span.record_exception(exc)
            span.end()
        raise

    if not success:
        if span:
            span.set_attribute("idempotent_hit", bool(existing_response))
            span.end()
        if existing_response:
            return existing_response
        raise HTTPException(status_code=429,
                            detail="Monthly evaluation quota exceeded")

    try:
        oss_intent = to_oss_intent(intent_req)
        risk_engine = request.app.state.risk_engine
        result = evaluate_intent(
            engine=risk_engine,
            intent=oss_intent,
            cost_estimate=intent_req.estimated_cost,
            policy_violations=intent_req.policy_violations,
        )

        # Despite the name this is a random UUID4. It is generated exactly
        # once so the trace attribute, the stored row and the response all
        # carry the same value.
        deterministic_id = str(uuid.uuid4())
        if span:
            span.set_attribute("risk_score", result["risk_score"])
            span.set_attribute("deterministic_id", deterministic_id)

        api_payload = jsonable_encoder(intent_req.model_dump())
        oss_payload = jsonable_encoder(oss_intent.model_dump())
        save_evaluated_intent(
            db=db,
            deterministic_id=deterministic_id,
            intent_type=intent_req.intent_type,
            api_payload=api_payload,
            oss_payload=oss_payload,
            environment=str(intent_req.environment),
            risk_score=result["risk_score"],
        )

        result["intent_id"] = deterministic_id
        response_data = result

        # The tracker is known to be non-None here (checked above).
        background_tasks.add_task(
            current_tracker._insert_audit_log,
            UsageRecord(
                api_key=api_key,
                tier=None,
                timestamp=time.time(),
                endpoint="/api/v1/intents/evaluate/response",
                request_body=None,
                response=response_data,
                processing_ms=(time.time() - start_time) * 1000,
            ),
        )

        if span:
            span.set_attribute("intent_id", deterministic_id)
            span.set_status(Status(StatusCode.OK))
            span.end()
        return response_data
    except HTTPException:
        if span:
            span.set_status(Status(StatusCode.ERROR, "HTTP exception"))
            span.end()
        raise
    except Exception as e:
        error_msg = str(e)
        logger.exception("Error in evaluate_intent_endpoint")
        if span:
            span.set_status(Status(StatusCode.ERROR, error_msg))
            span.record_exception(e)
            span.end()
        # NOTE(review): the raw exception text is returned to the client;
        # confirm this is acceptable for production deployments.
        raise HTTPException(status_code=500, detail=error_msg) from e
@router.post("/intents/outcome")
async def record_outcome_endpoint(
    request: Request,
    outcome: OutcomeRequest,
    db: Session = Depends(get_db),
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """Record an outcome for a previously evaluated intent.

    The outcome is handed to ``record_outcome`` together with the
    application's risk engine and the optional idempotency key (idempotency
    is handled inside the service, not here). When the pricing calculator is
    importable, a best-effort event is also appended to its buffer; a failure
    there is logged as a warning and never fails the request.

    Args:
        request: Incoming request; provides ``app.state.risk_engine``.
        outcome: Validated outcome payload.
        db: Database session passed to ``record_outcome``.
        idempotency_key: Value of the ``Idempotency-Key`` header, if any.

    Returns:
        A dict with a confirmation ``"message"`` and the ``"outcome_id"`` of
        the stored record.

    Raises:
        HTTPException: Re-raised unchanged when raised by the service layer;
            500 for any other unexpected error.
    """
    try:
        risk_engine = request.app.state.risk_engine
        outcome_record = record_outcome(
            db=db,
            deterministic_id=outcome.deterministic_id,
            success=outcome.success,
            recorded_by=outcome.recorded_by,
            notes=outcome.notes,
            risk_engine=risk_engine,
            idempotency_key=idempotency_key,
        )

        if PRICING_AVAILABLE and add_event is not None:
            # Best-effort: the outcome is already recorded, so a pricing
            # buffer failure must not turn into an error response.
            try:
                event = {
                    "run_id": outcome.deterministic_id,
                    "outcome": "success" if outcome.success else "failure",
                    "recorded_at": time.time(),
                    "source": "arf_api_outcome",
                }
                add_event(event)
                logger.info(
                    "Added outcome to pricing buffer for intent %s",
                    outcome.deterministic_id,
                )
            except Exception as e:
                logger.warning(
                    "Failed to update pricing buffer for intent %s: %s",
                    outcome.deterministic_id,
                    e,
                )

        return {"message": "Outcome recorded", "outcome_id": outcome_record.id}
    except HTTPException:
        # Preserve status codes chosen deliberately further down the stack
        # instead of flattening them into a 500.
        raise
    except Exception as e:
        logger.exception("Error in record_outcome_endpoint")
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/healing/evaluate")
async def evaluate_healing_decision_endpoint(
    request: Request,
    decision_req: HealingDecisionRequest,
    background_tasks: BackgroundTasks,
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """Evaluate a healing decision with idempotency and quota checks.

    Flow:
      1. Optionally open an OpenTelemetry span.
      2. Resolve the caller's API key from the ``Authorization`` header,
         falling back to the ``api_key`` query parameter, then ``"unknown"``.
      3. Call ``consume_quota_and_log`` on the usage tracker. When it reports
         failure together with a stored response, that response is returned
         as-is; when it reports failure without one, the request is rejected.
      4. Run ``evaluate_healing_decision`` with the engines and models found
         on ``app.state``, schedule an audit-log entry as a background task
         and return the result.

    Args:
        request: Incoming request; provides headers, query parameters and
            ``app.state`` (``policy_engine`` is required; ``rag_graph``,
            ``epistemic_model`` and ``epistemic_tokenizer`` default to None).
        decision_req: Validated payload wrapping the event to evaluate.
        background_tasks: Used to schedule the response audit-log write.
        idempotency_key: Value of the ``Idempotency-Key`` header, if any.

    Returns:
        The value returned by ``evaluate_healing_decision``, or the
        previously stored response when the tracker returns one.

    Raises:
        HTTPException: 503 when no usage tracker is configured, 429 when the
            tracker rejects the request without a stored response, 500 for
            any unexpected error during evaluation.
    """
    # ── optional trace ──────────────────────────────────────
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("governance.evaluate_healing")
        span.set_attribute("component", decision_req.event.component)

    start_time = time.time()
    api_key = request.headers.get("Authorization", "").replace("Bearer ", "")
    if not api_key:
        api_key = request.query_params.get("api_key", "unknown")

    current_tracker = app.core.usage_tracker.tracker
    if current_tracker is None:
        if span:
            span.set_status(Status(StatusCode.ERROR, "tracker unavailable"))
            span.end()
        raise HTTPException(status_code=503,
                            detail="Usage tracking service unavailable")

    try:
        record = UsageRecord(
            api_key=api_key,
            tier=None,
            timestamp=start_time,
            endpoint="/api/v1/healing/evaluate",
            request_body=decision_req.model_dump(),
            processing_ms=None,
        )
        success, existing_response = current_tracker.consume_quota_and_log(
            record=record,
            idempotency_key=idempotency_key,
        )
    except Exception as exc:
        # Close the span before propagating so a tracker failure does not
        # leave it open; the exception itself is re-raised unchanged.
        if span:
            span.set_status(Status(StatusCode.ERROR, str(exc)))
            span.record_exception(exc)
            span.end()
        raise

    if not success:
        if span:
            span.set_attribute("idempotent_hit", bool(existing_response))
            span.end()
        if existing_response:
            return existing_response
        raise HTTPException(status_code=429,
                            detail="Monthly evaluation quota exceeded")

    try:
        policy_engine = request.app.state.policy_engine
        rag_graph = getattr(request.app.state, "rag_graph", None)
        model = getattr(request.app.state, "epistemic_model", None)
        tokenizer = getattr(request.app.state, "epistemic_tokenizer", None)

        response_data = evaluate_healing_decision(
            event=decision_req.event,
            policy_engine=policy_engine,
            decision_engine=None,
            rag_graph=rag_graph,
            model=model,
            tokenizer=tokenizer,
        )

        # The tracker is known to be non-None here (checked above).
        background_tasks.add_task(
            current_tracker._insert_audit_log,
            UsageRecord(
                api_key=api_key,
                tier=None,
                timestamp=time.time(),
                endpoint="/api/v1/healing/evaluate/response",
                request_body=None,
                response=response_data,
                processing_ms=(time.time() - start_time) * 1000,
            ),
        )

        # Finish the span only after all work that can fail is done, so the
        # error handlers below never touch an already-ended span.
        if span:
            span.set_attribute("risk_score", response_data.get("risk_score", 0.0))
            span.set_attribute("selected_action", response_data.get("selected_action", "unknown"))
            span.set_status(Status(StatusCode.OK))
            span.end()
        return response_data
    except HTTPException:
        if span:
            span.set_status(Status(StatusCode.ERROR, "HTTP exception"))
            span.end()
        raise
    except Exception as e:
        error_msg = str(e)
        logger.exception("Error in evaluate_healing_decision_endpoint")
        if span:
            span.set_status(Status(StatusCode.ERROR, error_msg))
            span.record_exception(e)
            span.end()
        # NOTE(review): the raw exception text is returned to the client;
        # confirm this is acceptable for production deployments.
        raise HTTPException(status_code=500, detail=error_msg) from e