"""Outcome recording with idempotency, no dummy fallbacks, and timezone-aware timestamps."""
import datetime
import logging
from typing import Optional, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.core.governance.intents import (
InfrastructureIntent,
ProvisionResourceIntent,
GrantAccessIntent,
DeployConfigurationIntent,
)
from app.database.models_intents import IntentDB, OutcomeDB, BetaStateDB
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# NEW: small helper to persist the conjugate posterior state
# ---------------------------------------------------------------------------
def _persist_beta_state(db: Session, risk_engine: RiskEngine) -> None:
    """
    Write the current Beta posterior parameters to the beta_state table.

    This is called after every outcome update so that online learning
    survives restarts. Persistence is best-effort: a failure while reading
    the state, merging rows, or committing is logged with its traceback and
    the session is rolled back; the error is not re-raised.

    Args:
        db: SQLAlchemy session used to merge the rows and commit.
        risk_engine: Engine whose ``beta_store.get_state()`` yields a mapping
            of category -> ``(alpha, beta)``.
    """
    try:
        state = risk_engine.beta_store.get_state()
        for cat, (alpha, beta) in state.items():
            # Upsert: if the category already exists, update it
            # (presumably BetaStateDB is keyed on `category` -- TODO confirm).
            db.merge(BetaStateDB(category=cat.value, alpha=alpha, beta=beta))
        db.commit()
        logger.debug("Persisted Beta posterior parameters to database.")
    except Exception as e:
        # Log (with traceback) BEFORE rolling back, so the root cause is
        # recorded even if the rollback itself fails on a broken connection.
        logger.exception("Failed to persist beta state: %s", e)
        db.rollback()
class OutcomeConflictError(Exception):
    """Signals that the intent already has a recorded outcome whose result differs from the new one."""
def reconstruct_oss_intent_from_json(
        oss_json: Dict[str, Any]) -> InfrastructureIntent:
    """
    Rebuild an OSS intent object from its stored JSON payload.

    The payload's ``"intent_type"`` entry selects the intent class, and the
    whole payload is passed to that class as keyword arguments.

    Args:
        oss_json: Stored intent payload.

    Returns:
        The reconstructed intent instance.

    Raises:
        ValueError: If ``"intent_type"`` is absent or not one of the
            recognised tags.
    """
    kind = oss_json.get("intent_type")
    known_intents = (
        ("provision_resource", ProvisionResourceIntent),
        ("grant_access", GrantAccessIntent),
        ("deploy_config", DeployConfigurationIntent),
    )
    # Equality comparison (rather than a dict lookup) keeps unhashable
    # `intent_type` values on the ValueError path below.
    for tag, intent_cls in known_intents:
        if kind == tag:
            return intent_cls(**oss_json)
    raise ValueError(
        f"Cannot reconstruct intent from JSON: missing or unknown intent_type {kind}")
def _resolve_duplicate_outcome(
    db: Session,
    intent_id: Any,
    deterministic_id: str,
    success: bool,
    idempotency_key: Optional[str],
) -> Optional[OutcomeDB]:
    """
    Look up the row that a failed INSERT collided with, if any.

    Must be called after the failed transaction has been rolled back.

    Returns:
        The already-stored outcome when the failure was a safe replay
        (same idempotency key, or same intent with the same result), or
        None when no matching row exists.

    Raises:
        OutcomeConflictError: If the intent already has an outcome with the
            opposite result (e.g. written by a concurrent request).
    """
    if idempotency_key:
        by_key = db.query(OutcomeDB).filter(
            OutcomeDB.idempotency_key == idempotency_key).first()
        if by_key is not None:
            logger.info(
                "Idempotent request for key %s, returning existing outcome",
                idempotency_key)
            return by_key

    # A concurrent writer may have inserted an outcome for this intent
    # between our pre-check and our commit.
    by_intent = db.query(OutcomeDB).filter(
        OutcomeDB.intent_id == intent_id).one_or_none()
    if by_intent is None:
        return None
    if by_intent.success == success:
        logger.info(
            "Outcome for intent %s was recorded concurrently, returning existing outcome",
            deterministic_id)
        return by_intent
    raise OutcomeConflictError(
        f"Outcome already recorded for intent {deterministic_id} with different result "
        f"(existing={by_intent.success}, new={success})"
    )


def _update_risk_engine(
    db: Session,
    oss_payload: Any,
    deterministic_id: str,
    success: bool,
    risk_engine: RiskEngine,
) -> None:
    """
    Feed the outcome into the risk engine, if an OSS intent can be rebuilt.

    Best-effort: reconstruction and update failures are logged and never
    propagate, so an already-committed outcome is not affected.
    """
    oss_intent = None
    if oss_payload:
        try:
            oss_intent = reconstruct_oss_intent_from_json(oss_payload)
        except Exception as e:
            logger.error(
                "Failed to reconstruct OSS intent for %s: %s. RiskEngine will NOT be updated.",
                deterministic_id,
                e,
                exc_info=True)
    else:
        logger.warning(
            "No oss_payload stored for intent %s – cannot update RiskEngine.",
            deterministic_id
        )

    if oss_intent is None:
        logger.info(
            "Skipped RiskEngine update for intent %s (no valid OSS intent)",
            deterministic_id
        )
        return

    try:
        risk_engine.update_outcome(oss_intent, success)
        # After updating the conjugate posterior, write it to the database
        # so the learned state survives restarts.
        _persist_beta_state(db, risk_engine)
    except Exception as e:
        logger.exception(
            "Failed to update RiskEngine after recording outcome for intent %s: %s",
            deterministic_id,
            e)


def record_outcome(
    db: Session,
    deterministic_id: str,
    success: bool,
    recorded_by: Optional[str],
    notes: Optional[str],
    risk_engine: RiskEngine,
    idempotency_key: Optional[str] = None,
) -> OutcomeDB:
    """
    Record an outcome for a previously evaluated intent.

    Idempotent: calling twice with the same (deterministic_id, success) returns
    the same record, including when the second call races with the first and
    only discovers the duplicate at commit time. If the outcome already exists
    with a different success value, raises OutcomeConflictError.

    No dummy intents are created. If the OSS intent cannot be reconstructed,
    the risk engine is NOT updated – we log an error and still record the
    outcome.

    Args:
        db: SQLAlchemy session.
        deterministic_id: Unique identifier of the original intent.
        success: Whether the action succeeded (True) or failed (False).
        recorded_by: Optional user or system identifier.
        notes: Optional human-readable notes.
        risk_engine: ARF risk engine instance (may be updated).
        idempotency_key: Optional caller-provided idempotency token.

    Returns:
        The recorded OutcomeDB object (newly created, or the existing one on
        an idempotent replay).

    Raises:
        ValueError: If no intent exists for ``deterministic_id``.
        OutcomeConflictError: If a conflicting outcome already exists.
        sqlalchemy.exc.IntegrityError: If the commit fails for a reason other
            than a duplicate outcome.
    """
    # 1. Fetch the original intent record
    intent = db.query(IntentDB).filter(
        IntentDB.deterministic_id == deterministic_id).one_or_none()
    if not intent:
        raise ValueError(f"Intent not found: {deterministic_id}")

    # Capture the key now: ORM attributes are expired by commit/rollback.
    intent_id = intent.id
    # Compare using the same normalised value that gets stored below.
    is_success = bool(success)

    # 2. Idempotency / conflict pre-check
    existing_outcome = db.query(OutcomeDB).filter(
        OutcomeDB.intent_id == intent_id).one_or_none()
    if existing_outcome:
        if existing_outcome.success == is_success:
            return existing_outcome
        db.rollback()
        raise OutcomeConflictError(
            f"Outcome already recorded for intent {deterministic_id} with different result "
            f"(existing={existing_outcome.success}, new={success})"
        )

    # 3. Create outcome record (timezone-aware UTC timestamp)
    outcome = OutcomeDB(
        intent_id=intent_id,
        success=is_success,
        recorded_by=recorded_by,
        notes=notes,
        recorded_at=datetime.datetime.now(datetime.timezone.utc),
        idempotency_key=idempotency_key,
    )
    db.add(outcome)

    # 4. Attempt to commit; treat duplicate-row failures as idempotent replays.
    try:
        db.commit()
        db.refresh(outcome)
    except IntegrityError:
        db.rollback()
        # The rendered error text normally embeds the INSERT statement, which
        # names every column, so a substring test on it cannot tell which
        # constraint fired. Re-query for the colliding row instead.
        duplicate = _resolve_duplicate_outcome(
            db, intent_id, deterministic_id, is_success, idempotency_key)
        if duplicate is None:
            raise
        return duplicate

    # 5. Update RiskEngine ONLY if we can reconstruct a valid OSS intent
    _update_risk_engine(
        db, intent.oss_payload, deterministic_id, success, risk_engine)
    return outcome
|