Spaces:
Build error
Build error
| """ | |
| Intent Adapter – converts API request payloads to ARF InfrastructureIntent objects. | |
| Strict validation, no dummy fallbacks. All conversions are deterministic. | |
| """ | |
| import logging | |
| from typing import Any, Dict | |
| from agentic_reliability_framework.core.governance.intents import ( | |
| ProvisionResourceIntent, | |
| GrantAccessIntent, | |
| DeployConfigurationIntent, | |
| InfrastructureIntent, | |
| ) | |
| logger = logging.getLogger(__name__) | |
class IntentAdapterError(Exception):
    """Signal that a request payload could not be converted into an intent.

    Raised by the adapter functions in this module when a required field is
    missing or empty, or when a field holds a value that is not accepted.
    """
# Allowed values (per the original author's note, taken from the framework's
# Literal definitions).

# Environment names accepted by the adapter.
VALID_ENVIRONMENTS = {
    "dev",
    "staging",
    "prod",
    "test",
}

# Resource types accepted for provision_resource intents.
VALID_RESOURCE_TYPES = {
    "vm",
    "storage_account",
    "database",
    "kubernetes_cluster",
    "function_app",
    "virtual_network",
}
def to_oss_intent(api_request: Any) -> InfrastructureIntent:
    """Convert an API request object to the corresponding OSS InfrastructureIntent.

    The payload is read from ``api_request.model_dump()`` when that attribute
    exists, otherwise from ``api_request.dict()``, otherwise by passing the
    object to ``dict()``. The common fields ``intent_type``, ``environment``
    and ``requester`` are validated here; intent-specific fields are validated
    by the helper selected from ``intent_type``.

    Args:
        api_request: A request model exposing ``model_dump()`` or ``dict()``,
            or any object accepted by the ``dict()`` constructor.

    Returns:
        The intent object built by the helper matching ``intent_type``
        (``provision_resource``, ``grant_access`` or ``deploy_config``).

    Raises:
        IntentAdapterError: If the payload cannot be turned into a dict, a
            common field is missing or empty, ``environment`` is not one of
            ``VALID_ENVIRONMENTS``, or ``intent_type`` is not recognised.
    """
    # Extract the payload as a plain dict.
    if hasattr(api_request, "model_dump"):
        data = api_request.model_dump()
    elif hasattr(api_request, "dict"):
        data = api_request.dict()
    else:
        try:
            data = dict(api_request)
        except (TypeError, ValueError) as exc:
            # dict() rejects non-mapping / non-pair inputs; report that as an
            # adapter error instead of leaking the raw exception.
            raise IntentAdapterError(
                "Unsupported request payload type: "
                f"{type(api_request).__name__}"
            ) from exc

    intent_type = data.get("intent_type")
    if not intent_type:
        raise IntentAdapterError("Missing 'intent_type' in request")

    environment = data.get("environment")
    if not environment:
        raise IntentAdapterError("Missing 'environment' field")
    # The isinstance guard keeps unhashable values (e.g. a list) from raising
    # TypeError in the set membership test. sorted() makes the message stable,
    # since set iteration order can differ between interpreter runs.
    if not isinstance(environment, str) or environment not in VALID_ENVIRONMENTS:
        raise IntentAdapterError(
            f"Invalid environment: {environment}. "
            f"Must be one of {sorted(VALID_ENVIRONMENTS)}"
        )

    requester = data.get("requester")
    if not requester:
        raise IntentAdapterError("Missing 'requester' field")

    if intent_type == "provision_resource":
        return _to_provision_intent(data, environment, requester)
    if intent_type == "grant_access":
        # environment is validated above but NOT passed to this helper.
        return _to_grant_intent(data, requester)
    if intent_type == "deploy_config":
        # environment is validated above but NOT passed to this helper.
        return _to_deploy_intent(data, requester)
    raise IntentAdapterError(f"Unknown intent_type: {intent_type}")
def _to_provision_intent(
    data: Dict[str, Any],
    environment: str,
    requester: str,
) -> ProvisionResourceIntent:
    """Build a ProvisionResourceIntent from a request payload.

    Args:
        data: Request payload; ``resource_type``, ``region`` and ``size`` are
            required, ``configuration`` and ``provenance`` default to ``{}``
            when the key is absent.
        environment: Environment name, passed through to the intent.
        requester: Requester identity, passed through to the intent.

    Returns:
        The constructed ``ProvisionResourceIntent``.

    Raises:
        IntentAdapterError: If a required field is missing or empty, or
            ``resource_type`` is not one of ``VALID_RESOURCE_TYPES``.
    """
    resource_type_str = data.get("resource_type")
    if not resource_type_str:
        raise IntentAdapterError(
            "Missing 'resource_type' for provision_resource intent")
    # The isinstance guard keeps unhashable values (e.g. a list) from raising
    # TypeError in the set membership test.
    if (not isinstance(resource_type_str, str)
            or resource_type_str not in VALID_RESOURCE_TYPES):
        raise IntentAdapterError(f"Invalid resource_type: {resource_type_str}")

    region = data.get("region")
    if not region:
        raise IntentAdapterError(
            "Missing 'region' for provision_resource intent")

    size = data.get("size")
    if not size:
        raise IntentAdapterError(
            "Missing 'size' for provision_resource intent")

    return ProvisionResourceIntent(
        resource_type=resource_type_str,
        region=region,
        size=size,
        environment=environment,
        requester=requester,
        configuration=data.get("configuration", {}),
        provenance=data.get("provenance", {}),
    )
def _to_grant_intent(data: Dict[str, Any],
                     requester: str) -> GrantAccessIntent:
    """Build a GrantAccessIntent from a request payload.

    ``principal``, ``permission_level`` and ``resource_scope`` are required and
    checked in that order; the first one that is missing or empty raises
    ``IntentAdapterError``. ``justification`` defaults to ``""`` and
    ``provenance`` to ``{}`` when the key is absent.
    """
    # (field name, error message) pairs, in the order they are checked.
    required_fields = (
        ("principal",
         "Missing 'principal' for grant_access intent"),
        ("permission_level",
         "Missing 'permission_level' for grant_access intent"),
        ("resource_scope",
         "Missing 'resource_scope' for grant_access intent"),
    )

    validated = {}
    for field_name, missing_message in required_fields:
        field_value = data.get(field_name)
        if not field_value:
            raise IntentAdapterError(missing_message)
        validated[field_name] = field_value

    return GrantAccessIntent(
        **validated,
        requester=requester,
        justification=data.get("justification", ""),
        provenance=data.get("provenance", {}),
    )
def _to_deploy_intent(data: Dict[str, Any],
                      requester: str) -> DeployConfigurationIntent:
    """Build a DeployConfigurationIntent from a request payload.

    Args:
        data: Request payload; ``service_name``, ``change_scope`` and
            ``deployment_target`` are required. ``risk_level_hint`` is
            optional and converted with ``float()``. ``configuration`` and
            ``provenance`` default to ``{}`` when the key is absent.
        requester: Requester identity, passed through to the intent.

    Returns:
        The constructed ``DeployConfigurationIntent``.

    Raises:
        IntentAdapterError: If a required field is missing or empty.
    """
    service_name = data.get("service_name")
    if not service_name:
        raise IntentAdapterError(
            "Missing 'service_name' for deploy_config intent")

    change_scope = data.get("change_scope")
    if not change_scope:
        raise IntentAdapterError(
            "Missing 'change_scope' for deploy_config intent")

    deployment_target = data.get("deployment_target")
    if not deployment_target:
        raise IntentAdapterError(
            "Missing 'deployment_target' for deploy_config intent")

    # risk_level_hint expects a float; a value float() cannot convert is
    # replaced by None (best-effort), but the discard is logged so it is
    # not silent.
    risk_hint = data.get("risk_level_hint")
    if risk_hint is not None:
        try:
            risk_hint = float(risk_hint)
        except (TypeError, ValueError):
            logger.warning(
                "Ignoring non-numeric risk_level_hint %r for service %r",
                risk_hint,
                service_name,
            )
            risk_hint = None

    return DeployConfigurationIntent(
        service_name=service_name,
        change_scope=change_scope,
        deployment_target=deployment_target,
        requester=requester,
        risk_level_hint=risk_hint,
        configuration=data.get("configuration", {}),
        provenance=data.get("provenance", {}),
    )