Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from backend.engine import primitives | |
class WorkflowExecutor:
    """Run a workflow's steps in order, dispatching each one to a primitive.

    ``registry`` maps an action name (the ``"action"`` field of a step) to the
    callable from ``backend.engine.primitives`` that implements it. Every
    callable is invoked as ``handler(step["params"], runtime)``.
    """

    def __init__(self) -> None:
        """Build the action-name -> primitive dispatch table."""
        self.registry = {
            "email_received": primitives.email_received,
            "schedule": primitives.schedule,
            "file_uploaded": primitives.file_uploaded,
            "manual_trigger": primitives.manual_trigger,
            "ai_extract": primitives.ai_extract,
            "ai_classify": primitives.ai_classify,
            "ai_compose": primitives.ai_compose,
            "sheets_read": primitives.sheets_read,
            "sheets_write": primitives.sheets_write,
            "sheets_update": primitives.sheets_update,
            "send_email": primitives.send_email,
            "notify_owner": primitives.notify_owner,
            "condition": primitives.condition,
            "loop": primitives.loop,
            "wait_for_input": primitives.wait_for_input,
        }

    def execute(self, workflow: dict, trigger: dict, db, owner_id: str) -> dict:
        """Execute every step of ``workflow`` and report what ran.

        Args:
            workflow: Mapping with at least ``"id"`` and ``"steps"``; each
                step is a mapping with ``"id"``, ``"action"`` and ``"params"``.
            trigger: Trigger payload, exposed to primitives as
                ``runtime["trigger"]``.
            db: Object providing ``get_owner(owner_id)``; also exposed to
                primitives as ``runtime["db"]``.
            owner_id: Identifier passed to ``db.get_owner``.

        Returns:
            ``{"outcome": "completed", "steps": [...]}`` on success, or
            ``{"outcome": "error", "steps": [...], "error": "<message>"}`` if
            a step fails. ``steps`` lists the steps that finished, each as
            ``{"id", "action", "result"}``.

        Raises:
            Whatever ``db.get_owner`` or ``workflow["id"]`` raise: the runtime
            context is built before the ``try`` block, so those errors are
            not converted into an ``"error"`` outcome.
        """
        runtime = {
            "trigger": trigger,
            "db": db,
            "owner_id": owner_id,
            "owner": db.get_owner(owner_id),
            "workflow_id": workflow["id"],
            "step_results": {},
        }
        steps_executed = []
        # Step id chosen by the most recent "condition" step, if any.
        # NOTE(review): this is never reset once set, so it keeps filtering
        # "step_notify*" steps for the rest of the run - confirm intended.
        next_override = None
        try:
            for step in workflow["steps"]:
                # After a condition has picked a branch, skip every other
                # "step_notify*" step; steps with other ids always run.
                if (
                    next_override
                    and step["id"] != next_override
                    and step["id"].startswith("step_notify")
                ):
                    continue

                action = step["action"]
                handler = self.registry.get(action)
                if handler is None:
                    # A bare KeyError would stringify to just "'<action>'",
                    # which tells the caller nothing; name the problem.
                    raise ValueError(
                        f"Unknown action {action!r} in step {step.get('id')!r}"
                    )
                result = handler(step["params"], runtime)

                # Results shaped like {"rows": ...} are unwrapped; only a
                # dict payload is stored for later steps to read.
                if isinstance(result, dict) and "rows" in result:
                    normalized = result["rows"]
                else:
                    normalized = result
                if isinstance(normalized, dict):
                    runtime["step_results"][step["id"]] = normalized

                steps_executed.append(
                    {"id": step["id"], "action": step["action"], "result": result}
                )
                if step["action"] == "condition":
                    next_override = result["next_step"]
            return {"outcome": "completed", "steps": steps_executed}
        except Exception as exc:  # pragma: no cover - defensive path
            return {"outcome": "error", "steps": steps_executed, "error": str(exc)}