Spaces:
Build error
Build error
| """Outcome recording with idempotency, no dummy fallbacks, and timezone-aware timestamps.""" | |
| import datetime | |
| import logging | |
| from typing import Optional, Dict, Any | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy.exc import IntegrityError | |
| from agentic_reliability_framework.core.governance.risk_engine import RiskEngine | |
| from agentic_reliability_framework.core.governance.intents import ( | |
| InfrastructureIntent, | |
| ProvisionResourceIntent, | |
| GrantAccessIntent, | |
| DeployConfigurationIntent, | |
| ) | |
| from app.database.models_intents import IntentDB, OutcomeDB, BetaStateDB | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # NEW: small helper to persist the conjugate posterior state | |
| # --------------------------------------------------------------------------- | |
def _persist_beta_state(db: Session, risk_engine: RiskEngine) -> None:
    """
    Save the risk engine's current Beta posterior parameters to the beta_state table.

    Every (category, alpha, beta) entry reported by the engine's beta store is
    merged into the session and the whole batch is committed at once. Any failure
    is rolled back and logged instead of being propagated, so a persistence problem
    never interrupts the caller.
    """
    try:
        posterior = risk_engine.beta_store.get_state()
        for category, params in posterior.items():
            alpha, beta = params
            # merge() gives upsert behaviour: an already-stored category is updated
            row = BetaStateDB(category=category.value, alpha=alpha, beta=beta)
            db.merge(row)
        db.commit()
        logger.debug("Persisted Beta posterior parameters to database.")
    except Exception as exc:
        # Best effort only: undo the partial write and report it
        db.rollback()
        logger.error("Failed to persist beta state: %s", exc)
class OutcomeConflictError(Exception):
    """Signals that an intent already has a recorded outcome whose result differs from the new one."""
def reconstruct_oss_intent_from_json(
        oss_json: Dict[str, Any]) -> InfrastructureIntent:
    """Rebuild the typed OSS intent from its stored JSON payload.

    The payload's ``intent_type`` field selects the intent class; the complete
    payload is then passed to that class as keyword arguments.

    Raises:
        ValueError: If ``intent_type`` is absent or not one of the known types.
    """
    kind = oss_json.get("intent_type")

    # Guard clauses: each recognised type returns immediately
    if kind == "provision_resource":
        return ProvisionResourceIntent(**oss_json)
    if kind == "grant_access":
        return GrantAccessIntent(**oss_json)
    if kind == "deploy_config":
        return DeployConfigurationIntent(**oss_json)

    # Nothing matched (including a missing key, where kind is None)
    message = f"Cannot reconstruct intent from JSON: missing or unknown intent_type {kind}"
    raise ValueError(message)
def _outcome_conflict(
    deterministic_id: str, existing_success: bool, new_success: bool
) -> OutcomeConflictError:
    """Build the error raised when an intent already has an outcome with a different result."""
    return OutcomeConflictError(
        f"Outcome already recorded for intent {deterministic_id} with different result "
        f"(existing={existing_success}, new={new_success})"
    )


def record_outcome(
    db: Session,
    deterministic_id: str,
    success: bool,
    recorded_by: Optional[str],
    notes: Optional[str],
    risk_engine: RiskEngine,
    idempotency_key: Optional[str] = None,
) -> OutcomeDB:
    """
    Record an outcome for a previously evaluated intent.

    Idempotent: calling twice with the same (deterministic_id, success) returns the
    same record. If an outcome already exists with a different success value,
    OutcomeConflictError is raised. The same rules are applied when the insert loses
    a race against a concurrent writer and the commit fails with an IntegrityError.

    No dummy intents are created. If the OSS intent cannot be reconstructed, the
    risk engine is NOT updated - an error is logged and the outcome is still recorded.
    The risk engine is only updated for a newly inserted outcome, never on an
    idempotent replay.

    Args:
        db: SQLAlchemy session.
        deterministic_id: Unique identifier of the original intent.
        success: Whether the action succeeded (True) or failed (False).
        recorded_by: Optional user or system identifier.
        notes: Optional human-readable notes.
        risk_engine: ARF risk engine instance (may be updated).
        idempotency_key: Optional caller-provided idempotency token.

    Returns:
        The recorded OutcomeDB object (or the already-existing one on replay).

    Raises:
        ValueError: If no intent exists for ``deterministic_id``.
        OutcomeConflictError: If a conflicting outcome already exists.
        IntegrityError: If the commit fails for a reason that cannot be resolved
            to an existing outcome.
    """
    # Normalise once so the comparison, the stored value and the risk-engine
    # update all see the same boolean.
    success = bool(success)

    # 1. Fetch the original intent record
    intent = db.query(IntentDB).filter(
        IntentDB.deterministic_id == deterministic_id).one_or_none()
    if not intent:
        raise ValueError(f"Intent not found: {deterministic_id}")
    # Captured now so it stays usable after a rollback expires `intent`
    intent_id = intent.id

    # 2. Idempotency / conflict check
    existing_outcome = db.query(OutcomeDB).filter(
        OutcomeDB.intent_id == intent_id).one_or_none()
    if existing_outcome:
        if existing_outcome.success == success:
            return existing_outcome
        # Build the error before rolling back: rollback expires loaded
        # attributes and reading them afterwards would trigger a reload.
        conflict = _outcome_conflict(
            deterministic_id, existing_outcome.success, success)
        db.rollback()
        raise conflict

    # 3. Create outcome record
    outcome = OutcomeDB(
        intent_id=intent_id,
        success=success,
        recorded_by=recorded_by,
        notes=notes,
        recorded_at=datetime.datetime.now(datetime.timezone.utc),
        idempotency_key=idempotency_key,
    )
    db.add(outcome)

    # 4. Attempt to commit; resolve uniqueness violations to an existing outcome
    try:
        db.commit()
        db.refresh(outcome)
    except IntegrityError as e:
        db.rollback()

        # A concurrent request may have inserted the outcome for this intent
        # between the check in step 2 and our commit. Apply the same
        # idempotent/conflict rules as step 2 instead of leaking the DB error.
        raced = db.query(OutcomeDB).filter(
            OutcomeDB.intent_id == intent_id).one_or_none()
        if raced is not None:
            if raced.success == success:
                logger.info(
                    "Outcome for intent %s was recorded concurrently, returning existing outcome",
                    deterministic_id)
                return raced
            raise _outcome_conflict(deterministic_id, raced.success, success) from e

        # NOTE(review): matching on the error text is driver-dependent, and an
        # outcome found here necessarily belongs to a different intent (none
        # exists for this one) - confirm that returning it is intended.
        if "idempotency_key" in str(e) and idempotency_key:
            existing = db.query(OutcomeDB).filter(
                OutcomeDB.idempotency_key == idempotency_key).first()
            if existing:
                logger.info(
                    "Idempotent request for key %s, returning existing outcome",
                    idempotency_key)
                return existing
        raise

    # 5. Update RiskEngine ONLY if we can reconstruct a valid OSS intent
    oss_intent = None
    if intent.oss_payload:
        try:
            oss_intent = reconstruct_oss_intent_from_json(intent.oss_payload)
        except Exception as e:
            logger.error(
                "Failed to reconstruct OSS intent for %s: %s. RiskEngine will NOT be updated.",
                deterministic_id,
                e,
                exc_info=True)
    else:
        logger.warning(
            "No oss_payload stored for intent %s – cannot update RiskEngine.",
            deterministic_id
        )

    if oss_intent is not None:
        try:
            risk_engine.update_outcome(oss_intent, success)
            # Persist the updated posterior so online learning survives restarts.
            # NOTE(review): this commits on the same session; with SQLAlchemy's
            # default expire_on_commit=True the returned `outcome` is expired and
            # reloads lazily - confirm callers read it while the session is open.
            _persist_beta_state(db, risk_engine)
        except Exception as e:
            # The outcome is already committed; a risk-engine failure must not
            # turn a recorded outcome into an error for the caller.
            logger.exception(
                "Failed to update RiskEngine after recording outcome for intent %s: %s",
                deterministic_id,
                e)
    else:
        logger.info(
            "Skipped RiskEngine update for intent %s (no valid OSS intent)",
            deterministic_id
        )

    return outcome