flow-pilot / backend /engine /validator.py
DevelopedBy-Siva
setup the initial app and deploy
83fe4f9
from backend.models.workflow_schema import ALLOWED_ACTIONS, ALLOWED_TRIGGERS
def validate_workflow(workflow: dict) -> None:
    """Validate the trigger and steps of a workflow definition.

    The checks run in this order and stop at the first problem found:

    1. The trigger type must be a member of ``ALLOWED_TRIGGERS``.
    2. For each step, in order: its id must not repeat an earlier step's
       id, and its action must be a member of ``ALLOWED_ACTIONS``.

    Args:
        workflow: Mapping with a ``"trigger"`` mapping that holds a
            ``"type"`` entry, and a ``"steps"`` iterable of mappings that
            each hold ``"id"`` and ``"action"`` entries.

    Raises:
        ValueError: If a required entry is missing or has the wrong shape,
            the trigger type is unsupported, a step id is duplicated, or a
            step action is unsupported.
    """
    # Malformed input is a validation failure, so structural lookup errors
    # are reported as ValueError instead of leaking KeyError/TypeError.
    try:
        trigger_type = workflow["trigger"]["type"]
    except (KeyError, TypeError) as err:
        raise ValueError("Workflow is missing 'trigger.type'") from err
    if trigger_type not in ALLOWED_TRIGGERS:
        raise ValueError(f"Unsupported trigger: {trigger_type}")

    try:
        # iter() also rejects a non-iterable "steps" value (e.g. None) here.
        steps = iter(workflow["steps"])
    except (KeyError, TypeError) as err:
        raise ValueError("Workflow is missing an iterable 'steps'") from err

    seen_ids = set()
    for position, step in enumerate(steps):
        try:
            step_id = step["id"]
        except (KeyError, TypeError) as err:
            raise ValueError(f"Step at index {position} is missing 'id'") from err
        if step_id in seen_ids:
            raise ValueError(f"Duplicate step id: {step_id}")
        seen_ids.add(step_id)

        # The action is read only after the duplicate check so that a step
        # with both problems still reports the duplicate id first.
        try:
            action = step["action"]
        except (KeyError, TypeError) as err:
            raise ValueError(f"Step at index {position} is missing 'action'") from err
        if action not in ALLOWED_ACTIONS:
            raise ValueError(f"Unsupported action: {action}")