"""Outcome recording with idempotency, no dummy fallbacks, and timezone-aware timestamps."""

import datetime
import logging
from collections.abc import Mapping
from typing import Optional, Dict, Any

from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError

from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.core.governance.intents import (
    InfrastructureIntent,
    ProvisionResourceIntent,
    GrantAccessIntent,
    DeployConfigurationIntent,
)
from app.database.models_intents import IntentDB, OutcomeDB, BetaStateDB

logger = logging.getLogger(__name__)

# Maps the stored ``intent_type`` discriminator to the OSS intent class that
# is rebuilt from the stored payload.
_INTENT_CLASSES = {
    "provision_resource": ProvisionResourceIntent,
    "grant_access": GrantAccessIntent,
    "deploy_config": DeployConfigurationIntent,
}


# ---------------------------------------------------------------------------
# Helper to persist the conjugate posterior state
# ---------------------------------------------------------------------------
def _persist_beta_state(db: Session, risk_engine: RiskEngine) -> None:
    """Write the risk engine's current Beta parameters to the beta_state table.

    Called after every outcome update so that online learning survives
    restarts. This is best-effort: any failure is rolled back and logged, and
    is never propagated to the caller.

    Args:
        db: SQLAlchemy session used for the merge and the commit.
        risk_engine: Engine whose ``beta_store.get_state()`` supplies a mapping
            of category -> ``(alpha, beta)``.
    """
    try:
        state = risk_engine.beta_store.get_state()
        for cat, (alpha, beta) in state.items():
            # merge() acts as an upsert keyed on the model's primary key
            # (presumably ``category`` -- confirm against BetaStateDB).
            db.merge(BetaStateDB(category=cat.value, alpha=alpha, beta=beta))
        db.commit()
        logger.debug("Persisted Beta posterior parameters to database.")
    except Exception as e:
        db.rollback()
        # exc_info keeps the traceback; the message alone rarely says which
        # step (get_state, merge, or commit) failed.
        logger.error("Failed to persist beta state: %s", e, exc_info=True)


class OutcomeConflictError(Exception):
    """Raised when an outcome already exists for the same intent with a different result."""


def reconstruct_oss_intent_from_json(
        oss_json: Dict[str, Any]) -> InfrastructureIntent:
    """Reconstruct an OSS intent from its stored JSON payload.

    Args:
        oss_json: Stored payload. Its ``intent_type`` key selects the intent
            class, and the whole mapping is passed to that class as keyword
            arguments.

    Returns:
        The reconstructed intent instance.

    Raises:
        ValueError: If the payload is not a mapping, if ``intent_type`` is
            missing or unknown, or if the payload does not match the intent
            class's constructor signature.
    """
    if not isinstance(oss_json, Mapping):
        raise ValueError(
            "Cannot reconstruct intent from JSON: expected a mapping, got "
            f"{type(oss_json).__name__}")

    intent_type = oss_json.get("intent_type")
    # Only strings can name an intent class; this also keeps an unhashable
    # value from raising inside the dict lookup.
    intent_cls = _INTENT_CLASSES.get(intent_type) if isinstance(intent_type, str) else None
    if intent_cls is None:
        raise ValueError(
            f"Cannot reconstruct intent from JSON: missing or unknown intent_type {intent_type}")

    try:
        return intent_cls(**oss_json)
    except TypeError as exc:
        # Unexpected/missing keyword arguments (or non-string keys) surface as
        # TypeError; convert so the documented ValueError contract holds.
        raise ValueError(
            f"Cannot reconstruct {intent_type} intent from JSON: {exc}") from exc


def _resolve_insert_conflict(
    db: Session,
    intent_id: Any,
    deterministic_id: str,
    success: bool,
    idempotency_key: Optional[str],
    error: IntegrityError,
) -> Optional[OutcomeDB]:
    """Map a failed outcome insert onto an already-stored outcome, if one exists.

    Must be called after the failed transaction has been rolled back.

    Returns:
        The stored outcome that makes the request idempotent, or ``None`` when
        no stored outcome explains the integrity error.

    Raises:
        OutcomeConflictError: If an outcome for the same intent exists with a
            different ``success`` value.
    """
    if idempotency_key and "idempotency_key" in str(error):
        by_key = db.query(OutcomeDB).filter(
            OutcomeDB.idempotency_key == idempotency_key).first()
        if by_key:
            logger.info(
                "Idempotent request for key %s, returning existing outcome",
                idempotency_key)
            return by_key

    # A concurrent request may have stored an outcome for this intent between
    # the pre-insert check and the commit. Apply the same idempotency/conflict
    # rules as that check instead of leaking a raw IntegrityError.
    by_intent = db.query(OutcomeDB).filter(
        OutcomeDB.intent_id == intent_id).first()
    if by_intent is None:
        return None
    if by_intent.success == success:
        return by_intent
    raise OutcomeConflictError(
        f"Outcome already recorded for intent {deterministic_id} with different result "
        f"(existing={by_intent.success}, new={success})"
    ) from error


def _load_oss_intent(
        intent: IntentDB, deterministic_id: str) -> Optional[InfrastructureIntent]:
    """Return the OSS intent rebuilt from ``intent.oss_payload``, or ``None``.

    Never raises: a missing payload or a failed reconstruction is logged and
    reported as ``None`` so the caller can skip the risk-engine update.
    """
    if not intent.oss_payload:
        logger.warning(
            "No oss_payload stored for intent %s – cannot update RiskEngine.",
            deterministic_id
        )
        return None
    try:
        return reconstruct_oss_intent_from_json(intent.oss_payload)
    except Exception as e:
        logger.error(
            "Failed to reconstruct OSS intent for %s: %s. RiskEngine will NOT be updated.",
            deterministic_id, e, exc_info=True)
        return None


def record_outcome(
    db: Session,
    deterministic_id: str,
    success: bool,
    recorded_by: Optional[str],
    notes: Optional[str],
    risk_engine: RiskEngine,
    idempotency_key: Optional[str] = None,
) -> OutcomeDB:
    """
    Record an outcome for a previously evaluated intent.

    Idempotent: calling twice with the same (deterministic_id, success) returns
    the stored record. If an outcome already exists with a different success
    value, raises OutcomeConflictError. The same rules apply when the duplicate
    is only detected by the database at commit time.

    No dummy intents are created. If the OSS intent cannot be reconstructed,
    the risk engine is NOT updated – an error is logged and the outcome is
    still recorded. Failures while updating the risk engine are logged and do
    not affect the recorded outcome.

    Args:
        db: SQLAlchemy session.
        deterministic_id: Unique identifier of the original intent.
        success: Whether the action succeeded (True) or failed (False).
        recorded_by: Optional user or system identifier.
        notes: Optional human-readable notes.
        risk_engine: ARF risk engine instance (may be updated).
        idempotency_key: Optional caller-provided idempotency token.

    Returns:
        The recorded OutcomeDB object, or the already-stored one for an
        idempotent repeat.

    Raises:
        ValueError: If no intent with ``deterministic_id`` exists.
        OutcomeConflictError: If a conflicting outcome already exists.
        sqlalchemy.exc.IntegrityError: If the insert violates a constraint and
            no existing outcome explains the violation.
    """
    # 1. Fetch the original intent record
    intent = db.query(IntentDB).filter(
        IntentDB.deterministic_id == deterministic_id).one_or_none()
    if not intent:
        raise ValueError(f"Intent not found: {deterministic_id}")

    # Capture the key now: commit/rollback expire the instance, and the
    # error path should not depend on reloading it.
    intent_id = intent.id

    # 2. Idempotency / conflict check against any already-stored outcome
    existing_outcome = db.query(OutcomeDB).filter(
        OutcomeDB.intent_id == intent_id).one_or_none()
    if existing_outcome:
        if existing_outcome.success == success:
            return existing_outcome
        db.rollback()
        raise OutcomeConflictError(
            f"Outcome already recorded for intent {deterministic_id} with different result "
            f"(existing={existing_outcome.success}, new={success})"
        )

    # 3. Create outcome record
    outcome = OutcomeDB(
        intent_id=intent_id,
        success=bool(success),
        recorded_by=recorded_by,
        notes=notes,
        recorded_at=datetime.datetime.now(datetime.timezone.utc),
        idempotency_key=idempotency_key,
    )
    db.add(outcome)

    # 4. Attempt to commit; resolve duplicate-key errors idempotently
    try:
        db.commit()
        db.refresh(outcome)
    except IntegrityError as e:
        db.rollback()
        stored = _resolve_insert_conflict(
            db, intent_id, deterministic_id, success, idempotency_key, e)
        if stored is not None:
            return stored
        raise

    # 5. Update RiskEngine ONLY if we can reconstruct a valid OSS intent
    oss_intent = _load_oss_intent(intent, deterministic_id)
    if oss_intent is None:
        logger.info(
            "Skipped RiskEngine update for intent %s (no valid OSS intent)",
            deterministic_id
        )
        return outcome

    try:
        risk_engine.update_outcome(oss_intent, success)
        # After updating the conjugate posterior, write it to the database
        # so the learned state survives restarts.
        _persist_beta_state(db, risk_engine)
    except Exception as e:
        logger.exception(
            "Failed to update RiskEngine after recording outcome for intent %s: %s",
            deterministic_id, e)

    return outcome