petter2025's picture
Upload folder using huggingface_hub
afa4de7 verified
raw
history blame
6.61 kB
"""Outcome recording with idempotency, no dummy fallbacks, and timezone-aware timestamps."""
import datetime
import logging
from typing import Optional, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.core.governance.intents import (
InfrastructureIntent,
ProvisionResourceIntent,
GrantAccessIntent,
DeployConfigurationIntent,
)
from app.database.models_intents import IntentDB, OutcomeDB, BetaStateDB
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# NEW: small helper to persist the conjugate posterior state
# ---------------------------------------------------------------------------
def _persist_beta_state(db: Session, risk_engine: RiskEngine) -> None:
    """
    Save the risk engine's current Beta posterior parameters to the database.

    Reads the per-category ``(alpha, beta)`` pairs returned by
    ``risk_engine.beta_store.get_state()``, merges one ``BetaStateDB`` row per
    category into the session and commits once at the end.

    This is best-effort: any exception triggers a rollback and is logged,
    never propagated to the caller.
    """
    try:
        posterior = risk_engine.beta_store.get_state()
        for category, params in posterior.items():
            alpha, beta = params
            # merge() acts as an upsert keyed on the row's primary key.
            row = BetaStateDB(category=category.value, alpha=alpha, beta=beta)
            db.merge(row)
        db.commit()
        logger.debug("Persisted Beta posterior parameters to database.")
    except Exception as e:
        db.rollback()
        logger.error("Failed to persist beta state: %s", e)
class OutcomeConflictError(Exception):
    """Signals that an intent already has a recorded outcome whose result differs from the new one."""
def reconstruct_oss_intent_from_json(
        oss_json: Dict[str, Any]) -> InfrastructureIntent:
    """
    Rebuild a typed OSS intent object from its stored JSON payload.

    The ``intent_type`` entry of ``oss_json`` selects the intent class, which
    is then constructed with the whole payload as keyword arguments.

    Raises:
        ValueError: If ``intent_type`` is missing or is not one of the
            supported values. Errors raised by the intent constructors
            themselves propagate unchanged.
    """
    intent_type = oss_json.get("intent_type")
    # Ordered (type name, class) pairs; compared with == so that any value
    # stored under "intent_type" (even an unhashable one) is handled.
    known_intents = (
        ("provision_resource", ProvisionResourceIntent),
        ("grant_access", GrantAccessIntent),
        ("deploy_config", DeployConfigurationIntent),
    )
    for type_name, intent_cls in known_intents:
        if intent_type == type_name:
            return intent_cls(**oss_json)
    raise ValueError(
        f"Cannot reconstruct intent from JSON: missing or unknown intent_type {intent_type}")
def _resolve_existing_outcome(
    db: Session,
    existing: OutcomeDB,
    deterministic_id: str,
    success: bool,
) -> OutcomeDB:
    """Return ``existing`` when its result equals ``success``; otherwise roll back and raise OutcomeConflictError."""
    # Read the stored result before any rollback: rollback expires loaded
    # attributes and would force an extra reload just to build the message.
    existing_success = existing.success
    if existing_success == success:
        return existing
    db.rollback()
    raise OutcomeConflictError(
        f"Outcome already recorded for intent {deterministic_id} with different result "
        f"(existing={existing_success}, new={success})"
    )


def record_outcome(
    db: Session,
    deterministic_id: str,
    success: bool,
    recorded_by: Optional[str],
    notes: Optional[str],
    risk_engine: RiskEngine,
    idempotency_key: Optional[str] = None,
) -> OutcomeDB:
    """
    Record an outcome for a previously evaluated intent.

    Idempotent: calling twice with the same (deterministic_id, success) returns
    the already stored record. If an outcome exists for the intent with a
    different success value, OutcomeConflictError is raised. The same rule is
    applied when the insert fails with an IntegrityError because another
    request recorded an outcome for the intent in the meantime.

    No dummy intents are created. If the OSS intent cannot be reconstructed
    from the stored payload, the risk engine is NOT updated; the error is
    logged and the outcome is still recorded. Failures while updating the risk
    engine are likewise logged and do not affect the returned outcome.

    Args:
        db: SQLAlchemy session.
        deterministic_id: Unique identifier of the original intent.
        success: Whether the action succeeded (True) or failed (False).
        recorded_by: Optional user or system identifier.
        notes: Optional human-readable notes.
        risk_engine: ARF risk engine instance (may be updated).
        idempotency_key: Optional caller-provided idempotency token.

    Returns:
        The recorded OutcomeDB object (newly created or pre-existing).

    Raises:
        ValueError: If no intent with ``deterministic_id`` exists.
        OutcomeConflictError: If a conflicting outcome already exists.
        sqlalchemy.exc.IntegrityError: If the insert violates a constraint
            that cannot be resolved to an existing outcome.
    """
    # Normalise once so the comparison against stored rows uses the same
    # value that is written to the database.
    success = bool(success)

    # 1. Fetch the original intent record
    intent = db.query(IntentDB).filter(
        IntentDB.deterministic_id == deterministic_id).one_or_none()
    if not intent:
        raise ValueError(f"Intent not found: {deterministic_id}")
    # Capture the key now; it is needed again after a possible rollback,
    # at which point the ORM object's attributes are expired.
    intent_id = intent.id

    # 2. Idempotency / conflict check
    existing_outcome = db.query(OutcomeDB).filter(
        OutcomeDB.intent_id == intent_id).one_or_none()
    if existing_outcome:
        return _resolve_existing_outcome(
            db, existing_outcome, deterministic_id, success)

    # 3. Create outcome record
    outcome = OutcomeDB(
        intent_id=intent_id,
        success=success,
        recorded_by=recorded_by,
        notes=notes,
        recorded_at=datetime.datetime.now(datetime.timezone.utc),
        idempotency_key=idempotency_key,
    )
    db.add(outcome)

    # 4. Attempt to commit; handle duplicate key errors for idempotency
    try:
        db.commit()
        db.refresh(outcome)
    except IntegrityError as e:
        db.rollback()
        # Another request may have recorded an outcome for this intent between
        # the check in step 2 and our commit. Apply the same idempotency /
        # conflict rule instead of surfacing a raw IntegrityError.
        concurrent = db.query(OutcomeDB).filter(
            OutcomeDB.intent_id == intent_id).first()
        if concurrent is not None:
            resolved = _resolve_existing_outcome(
                db, concurrent, deterministic_id, success)
            logger.info(
                "Outcome for intent %s was recorded concurrently, returning existing outcome",
                deterministic_id)
            return resolved
        if "idempotency_key" in str(e) and idempotency_key:
            existing = db.query(OutcomeDB).filter(
                OutcomeDB.idempotency_key == idempotency_key).first()
            if existing:
                logger.info(
                    "Idempotent request for key %s, returning existing outcome",
                    idempotency_key)
                return existing
        raise

    # 5. Update RiskEngine ONLY if we can reconstruct a valid OSS intent
    oss_intent = None
    if intent.oss_payload:
        try:
            oss_intent = reconstruct_oss_intent_from_json(intent.oss_payload)
        except Exception as e:
            logger.error(
                "Failed to reconstruct OSS intent for %s: %s. RiskEngine will NOT be updated.",
                deterministic_id,
                e,
                exc_info=True)
    else:
        logger.warning(
            "No oss_payload stored for intent %s – cannot update RiskEngine.",
            deterministic_id
        )

    if oss_intent is not None:
        try:
            risk_engine.update_outcome(oss_intent, success)
            # Persist the updated posterior right after the in-memory update.
            _persist_beta_state(db, risk_engine)
        except Exception as e:
            # The outcome is already committed; a risk-engine failure must not
            # turn a recorded outcome into an error for the caller.
            logger.exception(
                "Failed to update RiskEngine after recording outcome for intent %s: %s",
                deterministic_id,
                e)
    else:
        logger.info(
            "Skipped RiskEngine update for intent %s (no valid OSS intent)",
            deterministic_id
        )
    return outcome