| from fastapi import APIRouter, Depends, HTTPException, Request, BackgroundTasks, Header |
| from fastapi.encoders import jsonable_encoder |
| from sqlalchemy.orm import Session |
| from app.models.infrastructure_intents import InfrastructureIntentRequest |
| from app.services.intent_adapter import to_oss_intent |
| from app.services.risk_service import evaluate_intent, evaluate_healing_decision |
| from app.services.intent_store import save_evaluated_intent |
| from app.services.outcome_service import record_outcome |
| from app.api.deps import get_db |
| from pydantic import BaseModel |
| import uuid |
| import logging |
| import time |
| from typing import Optional |
|
|
| from agentic_reliability_framework.core.models.event import ReliabilityEvent |
|
|
| |
| import app.core.usage_tracker |
| from app.core.usage_tracker import UsageRecord |
|
|
| |
# Optional integration with the pricing calculator's event buffer.
# Start from the "not installed" defaults; a successful import overrides them.
PRICING_AVAILABLE = False
add_event = None
try:
    from arf_pricing_calculator.storage.buffer import add_event
except ImportError:
    # Package missing: keep the defaults so callers can skip pricing updates.
    pass
else:
    PRICING_AVAILABLE = True
|
|
| |
# Optional OpenTelemetry tracing.
# Defaults describe the "not installed" case; they are overwritten only after
# both imports and the tracer lookup have succeeded.
OTEL_AVAILABLE = False
_tracer = None
try:
    from opentelemetry import trace
    from opentelemetry.trace import Status, StatusCode
    _tracer = trace.get_tracer(__name__)
    OTEL_AVAILABLE = True
except ImportError:
    # Tracing is disabled; endpoints check OTEL_AVAILABLE/_tracer before use.
    pass
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router that the endpoint functions in this module register themselves on.
router = APIRouter()
|
|
|
|
class OutcomeRequest(BaseModel):
    # Request body accepted by the POST /intents/outcome endpoint.

    # Identifier of the intent the outcome refers to; forwarded to
    # record_outcome and used as the "run_id" of the pricing buffer event.
    deterministic_id: str
    # True is reported to the pricing buffer as "success", False as "failure".
    success: bool
    # Forwarded unchanged to record_outcome as `recorded_by`.
    recorded_by: str
    # Optional free-text notes; defaults to an empty string.
    notes: str = ""
|
|
|
|
class HealingDecisionRequest(BaseModel):
    # Request body accepted by the POST /healing/evaluate endpoint.

    # Event handed to evaluate_healing_decision; its `component` attribute is
    # also recorded on the tracing span.
    event: ReliabilityEvent
|
|
|
|
@router.post("/intents/evaluate")
async def evaluate_intent_endpoint(
    request: Request,
    intent_req: InfrastructureIntentRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """
    Evaluate an infrastructure intent, persist it, and return the risk result.

    Flow: register the request with the usage tracker (quota consumption and
    the idempotency lookup are delegated to ``consume_quota_and_log``),
    convert the request to an OSS intent, score it with the risk engine
    stored on ``request.app.state``, save the evaluated intent, and schedule
    an audit-log entry for the response as a background task.

    Args:
        request: Incoming request; supplies the API key (``Authorization``
            header, else the ``api_key`` query parameter) and
            ``app.state.risk_engine``.
        intent_req: Validated intent payload.
        background_tasks: Receives the response audit-log task.
        db: Database session used to save the evaluated intent.
        idempotency_key: Optional ``Idempotency-Key`` header, forwarded to
            the usage tracker.

    Returns:
        The evaluation result with an added ``"intent_id"`` entry, or the
        existing response handed back by the usage tracker.

    Raises:
        HTTPException: 503 when no usage tracker is configured, 429 when the
            tracker rejects the request without an existing response, 500
            when evaluation or persistence fails.
    """
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("governance.evaluate_intent")

    # The outer try/finally is the only place the span is ended, so it is
    # closed exactly once on every exit path, including failures that happen
    # before the evaluation itself starts.
    try:
        if span:
            span.set_attribute("intent_type", intent_req.intent_type)
            span.set_attribute("environment", str(intent_req.environment))

        start_time = time.time()
        # NOTE(review): replace() removes every "Bearer " occurrence, not only
        # a leading prefix; kept as-is to preserve existing key derivation.
        api_key = request.headers.get("Authorization", "").replace("Bearer ", "")
        if not api_key:
            api_key = request.query_params.get("api_key", "unknown")

        # Read from the module at request time rather than a name bound at
        # import, so the value current at this moment is used.
        current_tracker = app.core.usage_tracker.tracker
        if current_tracker is None:
            if span:
                span.set_status(Status(StatusCode.ERROR, "tracker unavailable"))
            raise HTTPException(
                status_code=503,
                detail="Usage tracking service unavailable",
            )

        record = UsageRecord(
            api_key=api_key,
            tier=None,
            timestamp=start_time,
            endpoint="/api/v1/intents/evaluate",
            request_body=intent_req.model_dump(),
            processing_ms=None,
        )
        success, existing_response = current_tracker.consume_quota_and_log(
            record=record,
            idempotency_key=idempotency_key,
        )
        if not success:
            if span:
                span.set_attribute("idempotent_hit", bool(existing_response))
            if existing_response:
                return existing_response
            raise HTTPException(
                status_code=429,
                detail="Monthly evaluation quota exceeded",
            )

        try:
            oss_intent = to_oss_intent(intent_req)
            risk_engine = request.app.state.risk_engine
            result = evaluate_intent(
                engine=risk_engine,
                intent=oss_intent,
                cost_estimate=intent_req.estimated_cost,
                policy_violations=intent_req.policy_violations,
            )

            # Generate the identifier once so the value recorded on the span
            # is the same one that is stored and returned to the caller.
            deterministic_id = str(uuid.uuid4())
            if span:
                span.set_attribute("risk_score", result["risk_score"])
                span.set_attribute("deterministic_id", deterministic_id)

            api_payload = jsonable_encoder(intent_req.model_dump())
            oss_payload = jsonable_encoder(oss_intent.model_dump())

            save_evaluated_intent(
                db=db,
                deterministic_id=deterministic_id,
                intent_type=intent_req.intent_type,
                api_payload=api_payload,
                oss_payload=oss_payload,
                environment=str(intent_req.environment),
                risk_score=result["risk_score"],
            )

            result["intent_id"] = deterministic_id
            response_data = result

            # The tracker is known to be non-None here (checked above).
            background_tasks.add_task(
                current_tracker._insert_audit_log,
                UsageRecord(
                    api_key=api_key,
                    tier=None,
                    timestamp=time.time(),
                    endpoint="/api/v1/intents/evaluate/response",
                    request_body=None,
                    response=response_data,
                    processing_ms=(time.time() - start_time) * 1000,
                ),
            )

            if span:
                span.set_attribute("intent_id", deterministic_id)
                span.set_status(Status(StatusCode.OK))

            return response_data

        except HTTPException:
            if span:
                span.set_status(Status(StatusCode.ERROR, "HTTP exception"))
            raise
        except Exception as e:
            error_msg = str(e)
            logger.exception("Error in evaluate_intent_endpoint")
            if span:
                span.set_status(Status(StatusCode.ERROR, error_msg))
                span.record_exception(e)
            raise HTTPException(status_code=500, detail=error_msg) from e
    finally:
        if span:
            span.end()
|
|
|
|
@router.post("/intents/outcome")
async def record_outcome_endpoint(
    request: Request,
    outcome: OutcomeRequest,
    db: Session = Depends(get_db),
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """
    Record an outcome for an intent and mirror it to the pricing buffer.

    The outcome is stored through ``record_outcome``, which also receives the
    risk engine from ``request.app.state`` and the idempotency key (any
    idempotency handling happens inside that service). When the pricing
    calculator is installed, a success/failure event is appended to its
    buffer on a best-effort basis: a failure there is logged as a warning and
    does not fail the request.

    Args:
        request: Incoming request; supplies ``app.state.risk_engine``.
        outcome: Validated outcome payload.
        db: Database session passed to ``record_outcome``.
        idempotency_key: Optional ``Idempotency-Key`` header, forwarded to
            ``record_outcome``.

    Returns:
        ``{"message": "Outcome recorded", "outcome_id": <record id>}``.

    Raises:
        HTTPException: Any ``HTTPException`` raised while recording is passed
            through unchanged; every other error is reported as a 500.
    """
    try:
        risk_engine = request.app.state.risk_engine
        outcome_record = record_outcome(
            db=db,
            deterministic_id=outcome.deterministic_id,
            success=outcome.success,
            recorded_by=outcome.recorded_by,
            notes=outcome.notes,
            risk_engine=risk_engine,
            idempotency_key=idempotency_key,
        )

        if PRICING_AVAILABLE and add_event is not None:
            try:
                event = {
                    "run_id": outcome.deterministic_id,
                    "outcome": "success" if outcome.success else "failure",
                    "recorded_at": time.time(),
                    "source": "arf_api_outcome",
                }
                add_event(event)
                logger.info(
                    "Added outcome to pricing buffer for intent %s",
                    outcome.deterministic_id,
                )
            except Exception as e:
                # Best effort: the outcome is already recorded, so a pricing
                # buffer failure must not fail the request.
                logger.warning(
                    "Failed to update pricing buffer for intent %s: %s",
                    outcome.deterministic_id,
                    e,
                )

        return {"message": "Outcome recorded", "outcome_id": outcome_record.id}
    except HTTPException:
        # HTTPException is a subclass of Exception; re-raise it so a
        # deliberate status code is not rewritten to 500 by the handler below.
        raise
    except Exception as e:
        logger.exception("Error in record_outcome_endpoint")
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
@router.post("/healing/evaluate")
async def evaluate_healing_decision_endpoint(
    request: Request,
    decision_req: HealingDecisionRequest,
    background_tasks: BackgroundTasks,
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """
    Evaluate a healing decision for a reliability event.

    Flow: register the request with the usage tracker (quota consumption and
    the idempotency lookup are delegated to ``consume_quota_and_log``), call
    ``evaluate_healing_decision`` with the engines and models found on
    ``request.app.state``, and schedule an audit-log entry for the response
    as a background task.

    Args:
        request: Incoming request; supplies the API key (``Authorization``
            header, else the ``api_key`` query parameter) and the
            ``app.state`` attributes ``policy_engine`` (required) plus
            ``rag_graph``, ``epistemic_model`` and ``epistemic_tokenizer``
            (each passed as ``None`` when absent).
        decision_req: Validated payload wrapping the event to evaluate.
        background_tasks: Receives the response audit-log task.
        idempotency_key: Optional ``Idempotency-Key`` header, forwarded to
            the usage tracker.

    Returns:
        The value returned by ``evaluate_healing_decision``, or the existing
        response handed back by the usage tracker.

    Raises:
        HTTPException: 503 when no usage tracker is configured, 429 when the
            tracker rejects the request without an existing response, 500
            when the evaluation fails.
    """
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("governance.evaluate_healing")

    # The outer try/finally is the only place the span is ended, so it is
    # closed exactly once on every exit path and never touched after ending.
    try:
        if span:
            span.set_attribute("component", decision_req.event.component)

        start_time = time.time()
        # NOTE(review): replace() removes every "Bearer " occurrence, not only
        # a leading prefix; kept as-is to preserve existing key derivation.
        api_key = request.headers.get("Authorization", "").replace("Bearer ", "")
        if not api_key:
            api_key = request.query_params.get("api_key", "unknown")

        # Read from the module at request time rather than a name bound at
        # import, so the value current at this moment is used.
        current_tracker = app.core.usage_tracker.tracker
        if current_tracker is None:
            if span:
                span.set_status(Status(StatusCode.ERROR, "tracker unavailable"))
            raise HTTPException(
                status_code=503,
                detail="Usage tracking service unavailable",
            )

        record = UsageRecord(
            api_key=api_key,
            tier=None,
            timestamp=start_time,
            endpoint="/api/v1/healing/evaluate",
            request_body=decision_req.model_dump(),
            processing_ms=None,
        )
        success, existing_response = current_tracker.consume_quota_and_log(
            record=record,
            idempotency_key=idempotency_key,
        )
        if not success:
            if span:
                span.set_attribute("idempotent_hit", bool(existing_response))
            if existing_response:
                return existing_response
            raise HTTPException(
                status_code=429,
                detail="Monthly evaluation quota exceeded",
            )

        try:
            policy_engine = request.app.state.policy_engine
            rag_graph = getattr(request.app.state, "rag_graph", None)
            model = getattr(request.app.state, "epistemic_model", None)
            tokenizer = getattr(request.app.state, "epistemic_tokenizer", None)

            response_data = evaluate_healing_decision(
                event=decision_req.event,
                policy_engine=policy_engine,
                decision_engine=None,
                rag_graph=rag_graph,
                model=model,
                tokenizer=tokenizer,
            )

            if span:
                span.set_attribute(
                    "risk_score", response_data.get("risk_score", 0.0)
                )
                span.set_attribute(
                    "selected_action",
                    response_data.get("selected_action", "unknown"),
                )

            # The tracker is known to be non-None here (checked above).
            background_tasks.add_task(
                current_tracker._insert_audit_log,
                UsageRecord(
                    api_key=api_key,
                    tier=None,
                    timestamp=time.time(),
                    endpoint="/api/v1/healing/evaluate/response",
                    request_body=None,
                    response=response_data,
                    processing_ms=(time.time() - start_time) * 1000,
                ),
            )

            # Mark success only after the last step that can fail, so an
            # error while scheduling the audit task is still reported as one.
            if span:
                span.set_status(Status(StatusCode.OK))

            return response_data

        except HTTPException:
            if span:
                span.set_status(Status(StatusCode.ERROR, "HTTP exception"))
            raise
        except Exception as e:
            error_msg = str(e)
            logger.exception("Error in evaluate_healing_decision_endpoint")
            if span:
                span.set_status(Status(StatusCode.ERROR, error_msg))
                span.record_exception(e)
            raise HTTPException(status_code=500, detail=error_msg) from e
    finally:
        if span:
            span.end()
|
|