Spaces:
Sleeping
Sleeping
File size: 2,236 Bytes
83fe4f9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | from __future__ import annotations
from backend.engine import primitives
class WorkflowExecutor:
def __init__(self) -> None:
self.registry = {
"email_received": primitives.email_received,
"schedule": primitives.schedule,
"file_uploaded": primitives.file_uploaded,
"manual_trigger": primitives.manual_trigger,
"ai_extract": primitives.ai_extract,
"ai_classify": primitives.ai_classify,
"ai_compose": primitives.ai_compose,
"sheets_read": primitives.sheets_read,
"sheets_write": primitives.sheets_write,
"sheets_update": primitives.sheets_update,
"send_email": primitives.send_email,
"notify_owner": primitives.notify_owner,
"condition": primitives.condition,
"loop": primitives.loop,
"wait_for_input": primitives.wait_for_input,
}
def execute(self, workflow: dict, trigger: dict, db, owner_id: str) -> dict:
runtime = {
"trigger": trigger,
"db": db,
"owner_id": owner_id,
"owner": db.get_owner(owner_id),
"workflow_id": workflow["id"],
"step_results": {},
}
steps_executed = []
next_override = None
try:
for step in workflow["steps"]:
if next_override and step["id"] != next_override and step["id"].startswith("step_notify"):
continue
result = self.registry[step["action"]](step["params"], runtime)
normalized = result["rows"] if isinstance(result, dict) and "rows" in result else result
if isinstance(normalized, dict):
runtime["step_results"][step["id"]] = normalized
steps_executed.append({"id": step["id"], "action": step["action"], "result": result})
if step["action"] == "condition":
next_override = result["next_step"]
return {"outcome": "completed", "steps": steps_executed}
except Exception as exc: # pragma: no cover - defensive path
return {"outcome": "error", "steps": steps_executed, "error": str(exc)}
|