| """ |
| Intent Adapter – converts API request payloads to ARF InfrastructureIntent objects. |
| Strict validation, no dummy fallbacks. All conversions are deterministic. |
| """ |
|
|
| import logging |
| from typing import Any, Dict |
|
|
| from agentic_reliability_framework.core.governance.intents import ( |
| ProvisionResourceIntent, |
| GrantAccessIntent, |
| DeployConfigurationIntent, |
| InfrastructureIntent, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class IntentAdapterError(Exception):
    """Signal that a request could not be converted because its input is invalid."""
|
|
|
|
| |
# Environment names accepted by to_oss_intent; any other value is rejected.
VALID_ENVIRONMENTS = {
    "dev",
    "staging",
    "prod",
    "test",
}

# Resource types accepted for provision_resource intents.
VALID_RESOURCE_TYPES = {
    "vm", "storage_account", "database",
    "kubernetes_cluster", "function_app", "virtual_network",
}
|
|
|
|
def to_oss_intent(api_request: Any) -> InfrastructureIntent:
    """
    Convert an API request object to the corresponding OSS InfrastructureIntent.

    The request is first normalised to a plain dict: an object exposing
    ``model_dump()`` or ``dict()`` is dumped through that method, anything
    else is handed to the ``dict()`` constructor. The common fields
    ``intent_type``, ``environment`` and ``requester`` are then validated
    before the payload is dispatched to the builder for its intent type.

    Args:
        api_request: A mapping, an iterable of key/value pairs, or an object
            with a ``model_dump()`` / ``dict()`` method returning a dict.

    Returns:
        The intent built by the helper matching ``intent_type``
        (``provision_resource``, ``grant_access`` or ``deploy_config``).

    Raises:
        IntentAdapterError: If the payload cannot be turned into a dict, a
            common field is missing or empty, ``environment`` is not in
            ``VALID_ENVIRONMENTS``, or ``intent_type`` is unknown.
    """
    if hasattr(api_request, "model_dump"):
        data = api_request.model_dump()
    elif hasattr(api_request, "dict"):
        data = api_request.dict()
    else:
        # dict() raises TypeError/ValueError for None, scalars and malformed
        # sequences; report those as invalid input like every other check.
        try:
            data = dict(api_request)
        except (TypeError, ValueError) as exc:
            raise IntentAdapterError(
                f"Request payload of type {type(api_request).__name__} "
                "cannot be converted to a mapping"
            ) from exc

    intent_type = data.get("intent_type")
    if not intent_type:
        raise IntentAdapterError("Missing 'intent_type' in request")

    # The environment is validated for every intent type, although only the
    # provision_resource builder receives it.
    environment = data.get("environment")
    if not environment:
        raise IntentAdapterError("Missing 'environment' field")
    if environment not in VALID_ENVIRONMENTS:
        # Sort the allowed values: set ordering varies between runs, which
        # would make the error message non-deterministic.
        raise IntentAdapterError(
            f"Invalid environment: {environment}. "
            f"Must be one of {sorted(VALID_ENVIRONMENTS)}")

    requester = data.get("requester")
    if not requester:
        raise IntentAdapterError("Missing 'requester' field")

    if intent_type == "provision_resource":
        return _to_provision_intent(data, environment, requester)
    if intent_type == "grant_access":
        return _to_grant_intent(data, requester)
    if intent_type == "deploy_config":
        return _to_deploy_intent(data, requester)
    raise IntentAdapterError(f"Unknown intent_type: {intent_type}")
|
|
|
|
def _to_provision_intent(
    data: Dict[str, Any],
    environment: str,
    requester: str,
) -> ProvisionResourceIntent:
    """
    Build a ProvisionResourceIntent from a request payload dict.

    ``resource_type`` must be present and listed in ``VALID_RESOURCE_TYPES``;
    ``region`` and ``size`` must be present and non-empty. ``configuration``
    and ``provenance`` are optional and default to an empty dict when the
    key is absent.

    Raises:
        IntentAdapterError: If a required field is missing/empty or the
            resource type is not recognised.
    """
    kind = data.get("resource_type")
    if not kind:
        raise IntentAdapterError(
            "Missing 'resource_type' for provision_resource intent")
    if kind not in VALID_RESOURCE_TYPES:
        raise IntentAdapterError(f"Invalid resource_type: {kind}")

    # Remaining required fields share one check; order matches the order in
    # which missing fields are reported.
    required: Dict[str, Any] = {}
    for field in ("region", "size"):
        value = data.get(field)
        if not value:
            raise IntentAdapterError(
                f"Missing '{field}' for provision_resource intent")
        required[field] = value

    return ProvisionResourceIntent(
        resource_type=kind,
        region=required["region"],
        size=required["size"],
        environment=environment,
        requester=requester,
        configuration=data.get("configuration", {}),
        provenance=data.get("provenance", {}),
    )
|
|
|
|
def _to_grant_intent(
    data: Dict[str, Any],
    requester: str,
) -> GrantAccessIntent:
    """
    Build a GrantAccessIntent from a request payload dict.

    ``principal``, ``permission_level`` and ``resource_scope`` must be
    present and non-empty. ``justification`` defaults to an empty string and
    ``provenance`` to an empty dict when the key is absent.

    Raises:
        IntentAdapterError: If any required field is missing or empty.
    """
    # Collected in the order the fields are checked, so the first missing
    # field is the one reported.
    mandatory: Dict[str, Any] = {}
    for field in ("principal", "permission_level", "resource_scope"):
        value = data.get(field)
        if not value:
            raise IntentAdapterError(
                f"Missing '{field}' for grant_access intent")
        mandatory[field] = value

    return GrantAccessIntent(
        **mandatory,
        requester=requester,
        justification=data.get("justification", ""),
        provenance=data.get("provenance", {}),
    )
|
|
|
|
def _to_deploy_intent(data: Dict[str, Any],
                      requester: str) -> DeployConfigurationIntent:
    """
    Build a DeployConfigurationIntent from a request payload dict.

    ``service_name``, ``change_scope`` and ``deployment_target`` must be
    present and non-empty. ``risk_level_hint`` is optional: when supplied it
    is coerced with ``float()``; a value that cannot be coerced is dropped
    (passed on as ``None``) and a warning is logged so the loss is visible.
    ``configuration`` and ``provenance`` default to an empty dict when the
    key is absent.

    Raises:
        IntentAdapterError: If any required field is missing or empty.
    """
    service_name = data.get("service_name")
    if not service_name:
        raise IntentAdapterError(
            "Missing 'service_name' for deploy_config intent")

    change_scope = data.get("change_scope")
    if not change_scope:
        raise IntentAdapterError(
            "Missing 'change_scope' for deploy_config intent")

    deployment_target = data.get("deployment_target")
    if not deployment_target:
        raise IntentAdapterError(
            "Missing 'deployment_target' for deploy_config intent")

    raw_hint = data.get("risk_level_hint")
    risk_hint = None
    if raw_hint is not None:
        try:
            risk_hint = float(raw_hint)
        except (TypeError, ValueError):
            # The hint is advisory, so the request is still accepted, but
            # the discarded value is recorded instead of vanishing silently.
            logger.warning(
                "Ignoring non-numeric risk_level_hint %r for service %r",
                raw_hint,
                service_name,
            )

    return DeployConfigurationIntent(
        service_name=service_name,
        change_scope=change_scope,
        deployment_target=deployment_target,
        requester=requester,
        risk_level_hint=risk_hint,
        configuration=data.get("configuration", {}),
        provenance=data.get("provenance", {}),
    )
|
|