| """Outcome recording with idempotency, no dummy fallbacks, and timezone-aware timestamps.""" |
|
|
| import datetime |
| import logging |
| from typing import Optional, Dict, Any |
|
|
| from sqlalchemy.orm import Session |
| from sqlalchemy.exc import IntegrityError |
|
|
| from agentic_reliability_framework.core.governance.risk_engine import RiskEngine |
| from agentic_reliability_framework.core.governance.intents import ( |
| InfrastructureIntent, |
| ProvisionResourceIntent, |
| GrantAccessIntent, |
| DeployConfigurationIntent, |
| ) |
| from app.database.models_intents import IntentDB, OutcomeDB, BetaStateDB |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
def _persist_beta_state(db: Session, risk_engine: RiskEngine) -> None:
    """Write the risk engine's current Beta posterior parameters to the database.

    Called after an outcome update so that what the engine has learned online
    is stored in the beta_state table and can survive a restart.

    This is best-effort: any failure is rolled back and logged together with
    its traceback instead of being raised, so a persistence problem does not
    fail the caller.

    Args:
        db: SQLAlchemy session used for the merge and the commit.
        risk_engine: Engine whose ``beta_store.get_state()`` returns a mapping
            of category -> ``(alpha, beta)``.
    """
    try:
        state = risk_engine.beta_store.get_state()
        for category, (alpha, beta) in state.items():
            # merge() reconciles each row with an existing one that has the
            # same primary key (presumably the category - confirm in the
            # model), so repeated calls do not pile up duplicate rows.
            # Categories are expected to expose ``.value`` (enum-like).
            db.merge(BetaStateDB(category=category.value, alpha=alpha, beta=beta))
        # Commit once, after every category has been merged, so the stored
        # state is written as a single unit.
        db.commit()
        logger.debug("Persisted Beta posterior parameters to database.")
    except Exception as e:
        db.rollback()
        # logger.exception keeps the traceback, matching how record_outcome
        # reports risk-engine failures.
        logger.exception("Failed to persist beta state: %s", e)
|
|
|
|
class OutcomeConflictError(Exception):
    """Signal that an intent already has a recorded outcome with a different result.

    Raised instead of overwriting the stored outcome when a new request
    reports the opposite success value for the same intent.
    """
|
|
|
|
def reconstruct_oss_intent_from_json(
        oss_json: Dict[str, Any]) -> InfrastructureIntent:
    """Rebuild the typed OSS intent object from its stored JSON payload.

    The payload's ``intent_type`` entry selects the intent class; the whole
    payload is then passed to that class as keyword arguments.

    Args:
        oss_json: Stored intent payload as a dictionary.

    Returns:
        A ProvisionResourceIntent, GrantAccessIntent or
        DeployConfigurationIntent built from the payload.

    Raises:
        ValueError: If ``intent_type`` is missing or is not a known type.
    """
    kind = oss_json.get("intent_type")

    # One guard per supported type; anything else falls through to the error.
    if kind == "provision_resource":
        return ProvisionResourceIntent(**oss_json)
    if kind == "grant_access":
        return GrantAccessIntent(**oss_json)
    if kind == "deploy_config":
        return DeployConfigurationIntent(**oss_json)

    message = f"Cannot reconstruct intent from JSON: missing or unknown intent_type {kind}"
    raise ValueError(message)
|
|
|
|
def record_outcome(
    db: Session,
    deterministic_id: str,
    success: bool,
    recorded_by: Optional[str],
    notes: Optional[str],
    risk_engine: RiskEngine,
    idempotency_key: Optional[str] = None,
) -> OutcomeDB:
    """Record the outcome of a previously evaluated intent.

    Idempotent: calling twice with the same (deterministic_id, success)
    returns the already stored record, including when the duplicate is only
    detected at commit time because another request inserted it concurrently.
    If an outcome exists with a different success value,
    OutcomeConflictError is raised. The risk engine is only updated when a
    new outcome row is actually written, never on an idempotent replay.

    No dummy intents are created. If the OSS intent cannot be reconstructed
    (or no payload was stored), the outcome is still recorded, the problem is
    logged, and the risk engine is NOT updated. Failures while updating the
    risk engine are likewise logged and do not undo the recorded outcome.

    Args:
        db: SQLAlchemy session.
        deterministic_id: Unique identifier of the original intent.
        success: Whether the action succeeded (True) or failed (False).
            Coerced to ``bool`` before use.
        recorded_by: Optional user or system identifier.
        notes: Optional human-readable notes.
        risk_engine: ARF risk engine instance (may be updated).
        idempotency_key: Optional caller-provided idempotency token.

    Returns:
        The recorded OutcomeDB object (new, or the existing one on replay).

    Raises:
        ValueError: If no intent exists for ``deterministic_id``.
        OutcomeConflictError: If a conflicting outcome already exists.
        IntegrityError: If the commit fails and no existing outcome explains
            the failure.
    """
    # Normalise once so the duplicate check, the stored row and the risk
    # engine update all see the same boolean value.
    success = bool(success)

    intent = db.query(IntentDB).filter(
        IntentDB.deterministic_id == deterministic_id).one_or_none()
    if not intent:
        raise ValueError(f"Intent not found: {deterministic_id}")
    # Keep the key in a local so later queries do not depend on the ORM
    # object's state after a rollback.
    intent_id = intent.id

    # Fast path: an outcome is already stored for this intent.
    existing_outcome = db.query(OutcomeDB).filter(
        OutcomeDB.intent_id == intent_id).one_or_none()
    if existing_outcome:
        if existing_outcome.success == success:
            return existing_outcome
        db.rollback()
        raise OutcomeConflictError(
            f"Outcome already recorded for intent {deterministic_id} with different result "
            f"(existing={existing_outcome.success}, new={success})"
        )

    outcome = OutcomeDB(
        intent_id=intent_id,
        success=success,
        recorded_by=recorded_by,
        notes=notes,
        recorded_at=datetime.datetime.now(datetime.timezone.utc),
        idempotency_key=idempotency_key,
    )
    db.add(outcome)

    try:
        db.commit()
        db.refresh(outcome)
    except IntegrityError as e:
        db.rollback()

        # Duplicate idempotency key: hand back what the first request stored.
        if "idempotency_key" in str(e) and idempotency_key:
            existing = db.query(OutcomeDB).filter(
                OutcomeDB.idempotency_key == idempotency_key).first()
            if existing:
                logger.info(
                    "Idempotent request for key %s, returning existing outcome",
                    idempotency_key)
                return existing

        # Check-then-insert race: another request may have stored an outcome
        # for this intent between the lookup above and this commit. Apply the
        # same rules as the fast path instead of leaking the database error.
        raced = db.query(OutcomeDB).filter(
            OutcomeDB.intent_id == intent_id).first()
        if raced is not None:
            if raced.success == success:
                logger.info(
                    "Outcome for intent %s was recorded concurrently, returning existing outcome",
                    deterministic_id)
                return raced
            raise OutcomeConflictError(
                f"Outcome already recorded for intent {deterministic_id} with different result "
                f"(existing={raced.success}, new={success})"
            ) from e
        raise

    # From here on the outcome is durably stored; everything below is
    # best-effort learning and must not fail the request.
    oss_intent = None
    if intent.oss_payload:
        try:
            oss_intent = reconstruct_oss_intent_from_json(intent.oss_payload)
        except Exception as e:
            logger.error(
                "Failed to reconstruct OSS intent for %s: %s. RiskEngine will NOT be updated.",
                deterministic_id,
                e,
                exc_info=True)
    else:
        logger.warning(
            "No oss_payload stored for intent %s – cannot update RiskEngine.",
            deterministic_id
        )

    if oss_intent is not None:
        try:
            risk_engine.update_outcome(oss_intent, success)
            # Store the updated posterior right away so it survives a restart.
            _persist_beta_state(db, risk_engine)
        except Exception as e:
            logger.exception(
                "Failed to update RiskEngine after recording outcome for intent %s: %s",
                deterministic_id,
                e)
    else:
        logger.info(
            "Skipped RiskEngine update for intent %s (no valid OSS intent)",
            deterministic_id
        )

    return outcome
|
|