"""Governance API routes.

Exposes three POST endpoints on ``router``:

* ``/intents/evaluate`` - evaluate an infrastructure intent and persist it.
* ``/intents/outcome``  - record the outcome of a previously evaluated intent.
* ``/healing/evaluate`` - evaluate a healing decision for a reliability event.

The two ``evaluate`` endpoints consume quota through the usage tracker before
doing any work and are optionally traced with OpenTelemetry.
"""
from fastapi import APIRouter, Depends, HTTPException, Request, BackgroundTasks, Header
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from app.models.infrastructure_intents import InfrastructureIntentRequest
from app.services.intent_adapter import to_oss_intent
from app.services.risk_service import evaluate_intent, evaluate_healing_decision
from app.services.intent_store import save_evaluated_intent
from app.services.outcome_service import record_outcome
from app.api.deps import get_db
from pydantic import BaseModel
import uuid
import logging
import time
from typing import Optional
from agentic_reliability_framework.core.models.event import ReliabilityEvent

# ===== USAGE TRACKER IMPORTS =====
# The module itself is imported (not just ``tracker``) so the tracker is looked
# up at request time via ``app.core.usage_tracker.tracker``.
import app.core.usage_tracker
from app.core.usage_tracker import UsageRecord

# ===== PRICING CALCULATOR INTEGRATION =====
try:
    from arf_pricing_calculator.storage.buffer import add_event
    PRICING_AVAILABLE = True
except ImportError:
    PRICING_AVAILABLE = False
    add_event = None

# ===== OpenTelemetry (optional) =====
try:
    from opentelemetry import trace
    from opentelemetry.trace import Status, StatusCode
    _tracer = trace.get_tracer(__name__)
    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    _tracer = None

logger = logging.getLogger(__name__)
router = APIRouter()

_BEARER_PREFIX = "Bearer "


class OutcomeRequest(BaseModel):
    """Request body for ``POST /intents/outcome``."""

    deterministic_id: str
    success: bool
    recorded_by: str
    notes: str = ""


class HealingDecisionRequest(BaseModel):
    """Request body for ``POST /healing/evaluate``."""

    event: ReliabilityEvent


def _start_span(name: str):
    """Start a tracing span, or return ``None`` when OpenTelemetry is unavailable."""
    if OTEL_AVAILABLE and _tracer:
        return _tracer.start_span(name)
    return None


def _extract_api_key(request: Request) -> str:
    """Return the caller's API key.

    The ``Authorization`` header is preferred; only a leading ``"Bearer "``
    scheme is stripped, so a credential that happens to contain that text
    elsewhere is left intact. If the header yields nothing, the ``api_key``
    query parameter is used, defaulting to ``"unknown"`` when absent.
    """
    header_value = request.headers.get("Authorization", "")
    if header_value.startswith(_BEARER_PREFIX):
        header_value = header_value[len(_BEARER_PREFIX):]
    return header_value or request.query_params.get("api_key", "unknown")


@router.post("/intents/evaluate")
async def evaluate_intent_endpoint(
    request: Request,
    intent_req: InfrastructureIntentRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """
    Evaluate an infrastructure intent with idempotency and atomic quota consumption.

    Returns:
        The evaluation result with an added ``"intent_id"`` key, or the response
        handed back by the usage tracker when it declines the request but
        supplies an existing response.

    Raises:
        HTTPException: 503 when no usage tracker is configured, 429 when the
            tracker declines the request without an existing response, 500 when
            evaluation or persistence fails unexpectedly.
    """
    # ── optional trace ──────────────────────────────────────
    span = _start_span("governance.evaluate_intent")
    try:
        if span:
            span.set_attribute("intent_type", intent_req.intent_type)
            span.set_attribute("environment", str(intent_req.environment))

        start_time = time.time()
        api_key = _extract_api_key(request)

        current_tracker = app.core.usage_tracker.tracker
        if current_tracker is None:
            if span:
                span.set_status(Status(StatusCode.ERROR, "tracker unavailable"))
            raise HTTPException(status_code=503, detail="Usage tracking service unavailable")

        record = UsageRecord(
            api_key=api_key,
            tier=None,
            timestamp=start_time,
            endpoint="/api/v1/intents/evaluate",
            request_body=intent_req.model_dump(),
            processing_ms=None,
        )
        success, existing_response = current_tracker.consume_quota_and_log(
            record=record, idempotency_key=idempotency_key
        )
        if not success:
            if span:
                span.set_attribute("idempotent_hit", bool(existing_response))
            if existing_response:
                return existing_response
            raise HTTPException(status_code=429, detail="Monthly evaluation quota exceeded")

        try:
            oss_intent = to_oss_intent(intent_req)
            risk_engine = request.app.state.risk_engine
            result = evaluate_intent(
                engine=risk_engine,
                intent=oss_intent,
                cost_estimate=intent_req.estimated_cost,
                policy_violations=intent_req.policy_violations,
            )

            # Generate the id once so the trace attribute and the stored
            # record refer to the same intent.
            deterministic_id = str(uuid.uuid4())
            if span:
                span.set_attribute("risk_score", result["risk_score"])
                span.set_attribute("deterministic_id", deterministic_id)

            api_payload = jsonable_encoder(intent_req.model_dump())
            oss_payload = jsonable_encoder(oss_intent.model_dump())
            save_evaluated_intent(
                db=db,
                deterministic_id=deterministic_id,
                intent_type=intent_req.intent_type,
                api_payload=api_payload,
                oss_payload=oss_payload,
                environment=str(intent_req.environment),
                risk_score=result["risk_score"],
            )

            result["intent_id"] = deterministic_id
            response_data = result

            if current_tracker:
                # Audit logging of the response runs after the reply is sent.
                background_tasks.add_task(
                    current_tracker._insert_audit_log,
                    UsageRecord(
                        api_key=api_key,
                        tier=None,
                        timestamp=time.time(),
                        endpoint="/api/v1/intents/evaluate/response",
                        request_body=None,
                        response=response_data,
                        processing_ms=(time.time() - start_time) * 1000,
                    ),
                )

            if span:
                span.set_attribute("intent_id", deterministic_id)
                span.set_status(Status(StatusCode.OK))
            return response_data
        except HTTPException:
            if span:
                span.set_status(Status(StatusCode.ERROR, "HTTP exception"))
            raise
        except Exception as e:
            error_msg = str(e)
            logger.exception("Error in evaluate_intent_endpoint")
            if span:
                span.set_status(Status(StatusCode.ERROR, error_msg))
                span.record_exception(e)
            raise HTTPException(status_code=500, detail=error_msg) from e
    finally:
        # Single exit point for the span: it is ended exactly once on every
        # path, including failures while building the usage record or
        # consuming quota.
        if span:
            span.end()


@router.post("/intents/outcome")
async def record_outcome_endpoint(
    request: Request,
    outcome: OutcomeRequest,
    db: Session = Depends(get_db),
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """
    Record an outcome for a previously evaluated intent.
    Idempotent based on deterministic_id and success value (handled in service).
    Also updates the pricing calculator's calibration buffer if available.

    Returns:
        ``{"message": "Outcome recorded", "outcome_id": <id of the stored record>}``.

    Raises:
        HTTPException: re-raised unchanged when raised by the service layer;
            otherwise 500 for any unexpected failure.
    """
    try:
        risk_engine = request.app.state.risk_engine
        outcome_record = record_outcome(
            db=db,
            deterministic_id=outcome.deterministic_id,
            success=outcome.success,
            recorded_by=outcome.recorded_by,
            notes=outcome.notes,
            risk_engine=risk_engine,
            idempotency_key=idempotency_key,
        )

        if PRICING_AVAILABLE and add_event is not None:
            # Best effort: a pricing-buffer failure must not fail the request.
            try:
                event = {
                    "run_id": outcome.deterministic_id,
                    "outcome": "success" if outcome.success else "failure",
                    "recorded_at": time.time(),
                    "source": "arf_api_outcome",
                }
                add_event(event)
                logger.info(
                    "Added outcome to pricing buffer for intent %s",
                    outcome.deterministic_id,
                )
            except Exception as e:
                logger.warning(
                    "Failed to update pricing buffer for intent %s: %s",
                    outcome.deterministic_id,
                    e,
                )

        return {"message": "Outcome recorded", "outcome_id": outcome_record.id}
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of masking them as 500,
        # matching the other endpoints in this module.
        raise
    except Exception as e:
        logger.exception("Error in record_outcome_endpoint")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/healing/evaluate")
async def evaluate_healing_decision_endpoint(
    request: Request,
    decision_req: HealingDecisionRequest,
    background_tasks: BackgroundTasks,
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """
    Evaluate a healing decision with idempotency and atomic quota consumption.

    Returns:
        The value returned by ``evaluate_healing_decision``, or the response
        handed back by the usage tracker when it declines the request but
        supplies an existing response.

    Raises:
        HTTPException: 503 when no usage tracker is configured, 429 when the
            tracker declines the request without an existing response, 500 when
            evaluation fails unexpectedly.
    """
    # ── optional trace ──────────────────────────────────────
    span = _start_span("governance.evaluate_healing")
    try:
        if span:
            span.set_attribute("component", decision_req.event.component)

        start_time = time.time()
        api_key = _extract_api_key(request)

        current_tracker = app.core.usage_tracker.tracker
        if current_tracker is None:
            if span:
                span.set_status(Status(StatusCode.ERROR, "tracker unavailable"))
            raise HTTPException(status_code=503, detail="Usage tracking service unavailable")

        record = UsageRecord(
            api_key=api_key,
            tier=None,
            timestamp=start_time,
            endpoint="/api/v1/healing/evaluate",
            request_body=decision_req.model_dump(),
            processing_ms=None,
        )
        success, existing_response = current_tracker.consume_quota_and_log(
            record=record, idempotency_key=idempotency_key
        )
        if not success:
            if span:
                span.set_attribute("idempotent_hit", bool(existing_response))
            if existing_response:
                return existing_response
            raise HTTPException(status_code=429, detail="Monthly evaluation quota exceeded")

        try:
            policy_engine = request.app.state.policy_engine
            # These app-state entries are optional; missing ones are passed as None.
            rag_graph = getattr(request.app.state, "rag_graph", None)
            model = getattr(request.app.state, "epistemic_model", None)
            tokenizer = getattr(request.app.state, "epistemic_tokenizer", None)

            response_data = evaluate_healing_decision(
                event=decision_req.event,
                policy_engine=policy_engine,
                decision_engine=None,
                rag_graph=rag_graph,
                model=model,
                tokenizer=tokenizer,
            )

            if current_tracker:
                # Audit logging of the response runs after the reply is sent.
                background_tasks.add_task(
                    current_tracker._insert_audit_log,
                    UsageRecord(
                        api_key=api_key,
                        tier=None,
                        timestamp=time.time(),
                        endpoint="/api/v1/healing/evaluate/response",
                        request_body=None,
                        response=response_data,
                        processing_ms=(time.time() - start_time) * 1000,
                    ),
                )

            if span:
                span.set_attribute("risk_score", response_data.get("risk_score", 0.0))
                span.set_attribute(
                    "selected_action", response_data.get("selected_action", "unknown")
                )
                span.set_status(Status(StatusCode.OK))
            return response_data
        except HTTPException:
            if span:
                span.set_status(Status(StatusCode.ERROR, "HTTP exception"))
            raise
        except Exception as e:
            error_msg = str(e)
            logger.exception("Error in evaluate_healing_decision_endpoint")
            if span:
                span.set_status(Status(StatusCode.ERROR, error_msg))
                span.record_exception(e)
            raise HTTPException(status_code=500, detail=error_msg) from e
    finally:
        # Single exit point for the span: it is ended exactly once on every
        # path, so no later handler touches an already-ended span.
        if span:
            span.end()