petter2025's picture
Upload folder using huggingface_hub
afa4de7 verified
raw
history blame
5.36 kB
"""
Intent Adapter – converts API request payloads to ARF InfrastructureIntent objects.
Strict validation, no dummy fallbacks. All conversions are deterministic.
"""
import logging
from typing import Any, Dict
from agentic_reliability_framework.core.governance.intents import (
ProvisionResourceIntent,
GrantAccessIntent,
DeployConfigurationIntent,
InfrastructureIntent,
)
logger = logging.getLogger(__name__)
class IntentAdapterError(Exception):
"""Raised when intent conversion fails due to invalid input."""
pass
# Allowed values (from the framework's Literal definitions)
VALID_ENVIRONMENTS = {"dev", "staging", "prod", "test"}
VALID_RESOURCE_TYPES = {
"vm",
"storage_account",
"database",
"kubernetes_cluster",
"function_app",
"virtual_network"}
def to_oss_intent(api_request: Any) -> InfrastructureIntent:
"""
Convert an API request object to the corresponding OSS InfrastructureIntent.
"""
# Extract data
if hasattr(api_request, "model_dump"):
data = api_request.model_dump()
elif hasattr(api_request, "dict"):
data = api_request.dict()
else:
data = dict(api_request)
intent_type = data.get("intent_type")
if not intent_type:
raise IntentAdapterError("Missing 'intent_type' in request")
environment = data.get("environment")
if not environment:
raise IntentAdapterError("Missing 'environment' field")
if environment not in VALID_ENVIRONMENTS:
raise IntentAdapterError(
f"Invalid environment: {environment}. Must be one of {VALID_ENVIRONMENTS}")
requester = data.get("requester")
if not requester:
raise IntentAdapterError("Missing 'requester' field")
if intent_type == "provision_resource":
return _to_provision_intent(data, environment, requester)
elif intent_type == "grant_access":
return _to_grant_intent(data, requester) # environment NOT passed
elif intent_type == "deploy_config":
return _to_deploy_intent(data, requester) # environment NOT passed
else:
raise IntentAdapterError(f"Unknown intent_type: {intent_type}")
def _to_provision_intent(data: Dict[str,
Any],
environment: str,
requester: str) -> ProvisionResourceIntent:
resource_type_str = data.get("resource_type")
if not resource_type_str:
raise IntentAdapterError(
"Missing 'resource_type' for provision_resource intent")
if resource_type_str not in VALID_RESOURCE_TYPES:
raise IntentAdapterError(f"Invalid resource_type: {resource_type_str}")
region = data.get("region")
if not region:
raise IntentAdapterError(
"Missing 'region' for provision_resource intent")
size = data.get("size")
if not size:
raise IntentAdapterError(
"Missing 'size' for provision_resource intent")
return ProvisionResourceIntent(
resource_type=resource_type_str,
region=region,
size=size,
environment=environment,
requester=requester,
configuration=data.get("configuration", {}),
provenance=data.get("provenance", {}),
)
def _to_grant_intent(data: Dict[str, Any],
requester: str) -> GrantAccessIntent:
principal = data.get("principal")
if not principal:
raise IntentAdapterError("Missing 'principal' for grant_access intent")
permission_level = data.get("permission_level")
if not permission_level:
raise IntentAdapterError(
"Missing 'permission_level' for grant_access intent")
resource_scope = data.get("resource_scope")
if not resource_scope:
raise IntentAdapterError(
"Missing 'resource_scope' for grant_access intent")
return GrantAccessIntent(
principal=principal,
permission_level=permission_level,
resource_scope=resource_scope,
requester=requester,
justification=data.get("justification", ""),
provenance=data.get("provenance", {}),
)
def _to_deploy_intent(data: Dict[str, Any],
requester: str) -> DeployConfigurationIntent:
service_name = data.get("service_name")
if not service_name:
raise IntentAdapterError(
"Missing 'service_name' for deploy_config intent")
change_scope = data.get("change_scope")
if not change_scope:
raise IntentAdapterError(
"Missing 'change_scope' for deploy_config intent")
deployment_target = data.get("deployment_target")
if not deployment_target:
raise IntentAdapterError(
"Missing 'deployment_target' for deploy_config intent")
# risk_level_hint expects a float; if not a number, set to None
risk_hint = data.get("risk_level_hint")
if risk_hint is not None:
try:
risk_hint = float(risk_hint)
except (TypeError, ValueError):
risk_hint = None
return DeployConfigurationIntent(
service_name=service_name,
change_scope=change_scope,
deployment_target=deployment_target,
requester=requester,
risk_level_hint=risk_hint,
configuration=data.get("configuration", {}),
provenance=data.get("provenance", {}),
)