from __future__ import annotations

from backend.engine import primitives


class WorkflowExecutor:
    """Run a workflow's steps in order, dispatching each one to a primitive."""

    def __init__(self) -> None:
        # Action name -> handler callable. ``execute`` invokes each handler as
        # ``handler(step["params"], runtime)``.
        self.registry = {
            "email_received": primitives.email_received,
            "schedule": primitives.schedule,
            "file_uploaded": primitives.file_uploaded,
            "manual_trigger": primitives.manual_trigger,
            "ai_extract": primitives.ai_extract,
            "ai_classify": primitives.ai_classify,
            "ai_compose": primitives.ai_compose,
            "sheets_read": primitives.sheets_read,
            "sheets_write": primitives.sheets_write,
            "sheets_update": primitives.sheets_update,
            "send_email": primitives.send_email,
            "notify_owner": primitives.notify_owner,
            "condition": primitives.condition,
            "loop": primitives.loop,
            "wait_for_input": primitives.wait_for_input,
        }

    def execute(self, workflow: dict, trigger: dict, db, owner_id: str) -> dict:
        """Execute every step of *workflow* and report what ran.

        Args:
            workflow: Mapping with an ``"id"`` and a ``"steps"`` iterable. Each
                step is a mapping with ``"id"``, ``"action"`` and ``"params"``.
            trigger: Trigger payload, exposed to handlers as
                ``runtime["trigger"]``.
            db: Object providing ``get_owner(owner_id)``; also exposed to
                handlers as ``runtime["db"]``.
            owner_id: Identifier passed to ``db.get_owner``.

        Returns:
            ``{"outcome": "completed", "steps": [...]}`` on success, or
            ``{"outcome": "error", "steps": [...], "error": "<message>"}`` if
            anything raised while iterating the steps. ``"steps"`` lists the
            steps that finished before the failure.

        Raises:
            Whatever ``db.get_owner`` raises, and ``KeyError`` if *workflow*
            has no ``"id"``: the runtime is built before the guarded section,
            so those failures propagate to the caller.
        """
        runtime = {
            "trigger": trigger,
            "db": db,
            "owner_id": owner_id,
            "owner": db.get_owner(owner_id),
            "workflow_id": workflow["id"],
            "step_results": {},
        }
        steps_executed = []
        # Set to the "next_step" value of the most recent condition step.
        next_override = None
        try:
            for step in workflow["steps"]:
                # Once a condition has picked a branch, skip the notify steps
                # (id prefixed "step_notify") that are not the chosen one.
                # Steps with any other id prefix always run.
                if (
                    next_override
                    and step["id"] != next_override
                    and step["id"].startswith("step_notify")
                ):
                    continue

                action = step["action"]
                handler = self.registry.get(action)
                if handler is None:
                    # A bare KeyError would stringify to just the quoted key,
                    # leaving the returned "error" field unhelpful.
                    raise LookupError(
                        f"unknown action {action!r} in step {step.get('id')!r}"
                    )
                result = handler(step["params"], runtime)

                # Handlers that return {"rows": ...} are stored by their rows
                # payload; only dict payloads are kept in step_results.
                if isinstance(result, dict) and "rows" in result:
                    normalized = result["rows"]
                else:
                    normalized = result
                if isinstance(normalized, dict):
                    runtime["step_results"][step["id"]] = normalized

                steps_executed.append(
                    {"id": step["id"], "action": action, "result": result}
                )
                if action == "condition":
                    next_override = result["next_step"]
            return {"outcome": "completed", "steps": steps_executed}
        except Exception as exc:  # pragma: no cover - defensive path
            return {"outcome": "error", "steps": steps_executed, "error": str(exc)}