# Spaces: Build error  (page-status banner captured together with the source; not part of the module)
| from fastapi import APIRouter, Depends, HTTPException, Request, BackgroundTasks, Header | |
| from fastapi.encoders import jsonable_encoder | |
| from sqlalchemy.orm import Session | |
| from app.models.infrastructure_intents import InfrastructureIntentRequest | |
| from app.services.intent_adapter import to_oss_intent | |
| from app.services.risk_service import evaluate_intent, evaluate_healing_decision | |
| from app.services.intent_store import save_evaluated_intent | |
| from app.services.outcome_service import record_outcome | |
| from app.api.deps import get_db | |
| from pydantic import BaseModel | |
| import uuid | |
| import logging | |
| import time | |
| from typing import Optional | |
| from agentic_reliability_framework.core.models.event import ReliabilityEvent | |
| # ===== USAGE TRACKER IMPORTS ===== | |
| import app.core.usage_tracker | |
| from app.core.usage_tracker import UsageRecord | |
# ===== PRICING CALCULATOR INTEGRATION (optional) =====
# Start from the "package not installed" state; a successful import below
# rebinds ``add_event`` and flips the flag.
PRICING_AVAILABLE = False
add_event = None
try:
    from arf_pricing_calculator.storage.buffer import add_event
except ImportError:
    pass  # optional dependency: outcome recording simply skips the buffer
else:
    PRICING_AVAILABLE = True

# ===== OpenTelemetry (optional) =====
# Same pattern: tracing stays disabled unless every import succeeds.
# ``Status`` / ``StatusCode`` are only bound when the import succeeds, so
# callers must guard their use behind a non-None span.
OTEL_AVAILABLE = False
_tracer = None
try:
    from opentelemetry import trace
    from opentelemetry.trace import Status, StatusCode
    _tracer = trace.get_tracer(__name__)
    OTEL_AVAILABLE = True
except ImportError:
    pass

logger = logging.getLogger(__name__)
router = APIRouter()
class OutcomeRequest(BaseModel):
    """Request body accepted by ``record_outcome_endpoint``."""

    # Identifier of the previously evaluated intent; forwarded to
    # ``record_outcome`` and used as ``run_id`` for the pricing buffer event.
    deterministic_id: str
    # True maps to the "success" outcome, False to "failure".
    success: bool
    # Forwarded unchanged to ``record_outcome`` as ``recorded_by``.
    recorded_by: str
    # Free-text notes; an empty string when the client omits the field.
    notes: str = ""
class HealingDecisionRequest(BaseModel):
    """Request body accepted by ``evaluate_healing_decision_endpoint``."""

    # Event handed to ``evaluate_healing_decision``; its ``component``
    # attribute is also recorded on the trace span.
    event: ReliabilityEvent
# NOTE(review): no ``@router`` decorator is visible on this function in this
# view of the file -- confirm the route is registered elsewhere.
async def evaluate_intent_endpoint(
    request: Request,
    intent_req: InfrastructureIntentRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """
    Evaluate an infrastructure intent, persist it, and return the risk result.

    Steps:
      1. Resolve the API key from the ``Authorization`` header (``Bearer``
         prefix stripped), falling back to the ``api_key`` query parameter
         and finally to ``"unknown"``.
      2. Call ``tracker.consume_quota_and_log``. When it reports failure and
         hands back a stored response, that response is returned as-is;
         when it reports failure without one, the request is rejected.
      3. Convert the request with ``to_oss_intent``, score it with
         ``evaluate_intent`` and store it via ``save_evaluated_intent``
         under a freshly generated UUID.
      4. Queue an audit-log record as a background task and return the
         result dict with ``intent_id`` added.

    Raises:
        HTTPException: 503 when no usage tracker is configured, 429 when
            quota consumption fails with no stored response, 500 for any
            unexpected error raised during evaluation or persistence.
    """
    # -- optional trace ------------------------------------------------------
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("governance.evaluate_intent")

    # A single ``finally`` owns ``span.end()`` so the span is closed on every
    # exit path, including failures before the evaluation step begins.
    try:
        if span:
            span.set_attribute("intent_type", intent_req.intent_type)
            span.set_attribute("environment", str(intent_req.environment))

        start_time = time.time()

        # Strip only a leading "Bearer " scheme; a blanket replace() would
        # also mangle keys that happen to contain that substring.
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            api_key = auth_header[len("Bearer "):]
        else:
            api_key = auth_header
        if not api_key:
            api_key = request.query_params.get("api_key", "unknown")

        current_tracker = app.core.usage_tracker.tracker
        if current_tracker is None:
            if span:
                span.set_status(Status(StatusCode.ERROR, "tracker unavailable"))
            raise HTTPException(status_code=503,
                                detail="Usage tracking service unavailable")

        record = UsageRecord(
            api_key=api_key,
            tier=None,
            timestamp=start_time,
            endpoint="/api/v1/intents/evaluate",
            request_body=intent_req.model_dump(),
            processing_ms=None,
        )
        success, existing_response = current_tracker.consume_quota_and_log(
            record=record,
            idempotency_key=idempotency_key,
        )
        if not success:
            if span:
                span.set_attribute("idempotent_hit", bool(existing_response))
            if existing_response:
                # Stored response for this idempotency key: replay it.
                return existing_response
            raise HTTPException(status_code=429,
                                detail="Monthly evaluation quota exceeded")

        try:
            oss_intent = to_oss_intent(intent_req)
            risk_engine = request.app.state.risk_engine
            result = evaluate_intent(
                engine=risk_engine,
                intent=oss_intent,
                cost_estimate=intent_req.estimated_cost,
                policy_violations=intent_req.policy_violations,
            )

            # Generate the id once so the trace and the stored row agree.
            deterministic_id = str(uuid.uuid4())
            if span:
                span.set_attribute("risk_score", result["risk_score"])
                span.set_attribute("deterministic_id", deterministic_id)

            api_payload = jsonable_encoder(intent_req.model_dump())
            oss_payload = jsonable_encoder(oss_intent.model_dump())
            save_evaluated_intent(
                db=db,
                deterministic_id=deterministic_id,
                intent_type=intent_req.intent_type,
                api_payload=api_payload,
                oss_payload=oss_payload,
                environment=str(intent_req.environment),
                risk_score=result["risk_score"],
            )

            result["intent_id"] = deterministic_id
            response_data = result

            if current_tracker:
                # Audit logging runs after the response has been sent.
                background_tasks.add_task(
                    current_tracker._insert_audit_log,
                    UsageRecord(
                        api_key=api_key,
                        tier=None,
                        timestamp=time.time(),
                        endpoint="/api/v1/intents/evaluate/response",
                        request_body=None,
                        response=response_data,
                        processing_ms=(time.time() - start_time) * 1000,
                    ),
                )

            if span:
                span.set_attribute("intent_id", deterministic_id)
                span.set_status(Status(StatusCode.OK))
            return response_data

        except HTTPException:
            if span:
                span.set_status(Status(StatusCode.ERROR, "HTTP exception"))
            raise
        except Exception as e:
            error_msg = str(e)
            logger.exception("Error in evaluate_intent_endpoint")
            if span:
                span.set_status(Status(StatusCode.ERROR, error_msg))
                span.record_exception(e)
            # NOTE(review): ``detail`` echoes the raw exception text to the
            # client; kept unchanged for backward compatibility.
            raise HTTPException(status_code=500, detail=error_msg) from e
    finally:
        if span:
            span.end()
# NOTE(review): no ``@router`` decorator is visible on this function in this
# view of the file -- confirm the route is registered elsewhere.
async def record_outcome_endpoint(
    request: Request,
    outcome: OutcomeRequest,
    db: Session = Depends(get_db),
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """
    Record an outcome for a previously evaluated intent.

    The outcome is stored through ``record_outcome`` (which receives the
    idempotency key; any de-duplication happens inside that service). When
    the pricing calculator package is importable, an event describing the
    outcome is also pushed to its buffer via ``add_event``; a failure there
    is logged as a warning and does not fail the request.

    Returns:
        dict with ``message`` and the stored record's ``outcome_id``.

    Raises:
        HTTPException: re-raised unchanged when the service layer raises
            one; otherwise 500 carrying the exception text.
    """
    try:
        risk_engine = request.app.state.risk_engine
        outcome_record = record_outcome(
            db=db,
            deterministic_id=outcome.deterministic_id,
            success=outcome.success,
            recorded_by=outcome.recorded_by,
            notes=outcome.notes,
            risk_engine=risk_engine,
            idempotency_key=idempotency_key,
        )

        if PRICING_AVAILABLE and add_event is not None:
            # Best effort: the pricing buffer must never break the endpoint.
            try:
                event = {
                    "run_id": outcome.deterministic_id,
                    "outcome": "success" if outcome.success else "failure",
                    "recorded_at": time.time(),
                    "source": "arf_api_outcome",
                }
                add_event(event)
                # %-style arguments keep the message text identical while
                # avoiding an f-string whose replacement field spans lines
                # (a SyntaxError before Python 3.12).
                logger.info(
                    "Added outcome to pricing buffer for intent %s",
                    outcome.deterministic_id,
                )
            except Exception as e:
                logger.warning(
                    "Failed to update pricing buffer for intent %s: %s",
                    outcome.deterministic_id,
                    e,
                )

        return {"message": "Outcome recorded", "outcome_id": outcome_record.id}

    except HTTPException:
        # Preserve status codes chosen by the service layer instead of
        # collapsing them into a generic 500 (matches the sibling endpoints).
        raise
    except Exception as e:
        logger.exception("Error in record_outcome_endpoint")
        # NOTE(review): ``detail`` echoes the raw exception text to the
        # client; kept unchanged for backward compatibility.
        raise HTTPException(status_code=500, detail=str(e)) from e
# NOTE(review): no ``@router`` decorator is visible on this function in this
# view of the file -- confirm the route is registered elsewhere.
async def evaluate_healing_decision_endpoint(
    request: Request,
    decision_req: HealingDecisionRequest,
    background_tasks: BackgroundTasks,
    idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key"),
):
    """
    Evaluate a healing decision for a reliability event.

    Steps:
      1. Resolve the API key from the ``Authorization`` header (``Bearer``
         prefix stripped), falling back to the ``api_key`` query parameter
         and finally to ``"unknown"``.
      2. Call ``tracker.consume_quota_and_log``. When it reports failure and
         hands back a stored response, that response is returned as-is;
         when it reports failure without one, the request is rejected.
      3. Run ``evaluate_healing_decision`` with the policy engine from
         ``request.app.state`` plus the optional ``rag_graph``,
         ``epistemic_model`` and ``epistemic_tokenizer`` (``None`` when the
         attribute is absent).
      4. Queue an audit-log record as a background task and return the
         evaluation result.

    Raises:
        HTTPException: 503 when no usage tracker is configured, 429 when
            quota consumption fails with no stored response, 500 for any
            unexpected error raised during evaluation.
    """
    # -- optional trace ------------------------------------------------------
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("governance.evaluate_healing")

    # A single ``finally`` owns ``span.end()`` so the span is closed exactly
    # once on every exit path.
    try:
        if span:
            span.set_attribute("component", decision_req.event.component)

        start_time = time.time()

        # Strip only a leading "Bearer " scheme; a blanket replace() would
        # also mangle keys that happen to contain that substring.
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            api_key = auth_header[len("Bearer "):]
        else:
            api_key = auth_header
        if not api_key:
            api_key = request.query_params.get("api_key", "unknown")

        current_tracker = app.core.usage_tracker.tracker
        if current_tracker is None:
            if span:
                span.set_status(Status(StatusCode.ERROR, "tracker unavailable"))
            raise HTTPException(status_code=503,
                                detail="Usage tracking service unavailable")

        record = UsageRecord(
            api_key=api_key,
            tier=None,
            timestamp=start_time,
            endpoint="/api/v1/healing/evaluate",
            request_body=decision_req.model_dump(),
            processing_ms=None,
        )
        success, existing_response = current_tracker.consume_quota_and_log(
            record=record,
            idempotency_key=idempotency_key,
        )
        if not success:
            if span:
                span.set_attribute("idempotent_hit", bool(existing_response))
            if existing_response:
                # Stored response for this idempotency key: replay it.
                return existing_response
            raise HTTPException(status_code=429,
                                detail="Monthly evaluation quota exceeded")

        try:
            policy_engine = request.app.state.policy_engine
            rag_graph = getattr(request.app.state, "rag_graph", None)
            model = getattr(request.app.state, "epistemic_model", None)
            tokenizer = getattr(request.app.state, "epistemic_tokenizer", None)

            response_data = evaluate_healing_decision(
                event=decision_req.event,
                policy_engine=policy_engine,
                decision_engine=None,
                rag_graph=rag_graph,
                model=model,
                tokenizer=tokenizer,
            )

            if span:
                span.set_attribute("risk_score", response_data.get("risk_score", 0.0))
                span.set_attribute("selected_action", response_data.get("selected_action", "unknown"))

            if current_tracker:
                # Audit logging runs after the response has been sent.
                background_tasks.add_task(
                    current_tracker._insert_audit_log,
                    UsageRecord(
                        api_key=api_key,
                        tier=None,
                        timestamp=time.time(),
                        endpoint="/api/v1/healing/evaluate/response",
                        request_body=None,
                        response=response_data,
                        processing_ms=(time.time() - start_time) * 1000,
                    ),
                )

            # Mark success only once nothing else in this handler can fail.
            if span:
                span.set_status(Status(StatusCode.OK))
            return response_data

        except HTTPException:
            if span:
                span.set_status(Status(StatusCode.ERROR, "HTTP exception"))
            raise
        except Exception as e:
            error_msg = str(e)
            logger.exception("Error in evaluate_healing_decision_endpoint")
            if span:
                span.set_status(Status(StatusCode.ERROR, error_msg))
                span.record_exception(e)
            # NOTE(review): ``detail`` echoes the raw exception text to the
            # client; kept unchanged for backward compatibility.
            raise HTTPException(status_code=500, detail=error_msg) from e
    finally:
        if span:
            span.end()