#!/usr/bin/env python3 """ COORDINATORS - Layer 3 Ghost in the Machine Labs Coordinators receive precise tasks from Directors and execute them. They manage Specialists (Layer 4) and Micro-Specialists (Layer 5). """ import os import json import asyncio from datetime import datetime from typing import Optional, Dict, List, Any from dataclasses import dataclass, asdict from enum import Enum import httpx import redis.asyncio as redis OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434") REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") class Domain(Enum): CODE = "code" MATH = "math" WRITING = "writing" RESEARCH = "research" SYSTEM = "system" DOMAIN_MODELS = { Domain.CODE: "qwen3:8b", Domain.MATH: "qwen3:8b", Domain.WRITING: "qwen3:4b", Domain.RESEARCH: "qwen3:8b", Domain.SYSTEM: "qwen3:4b", } @dataclass class Task: task_id: str from_director: str domain: Domain specification: str constraints: List[str] quality_criteria: List[str] context: Dict priority: int = 1 created_at: datetime = None @dataclass class TaskResult: task_id: str coordinator: str status: str output: Any quality_score: float execution_time_ms: int notes: str = "" class Coordinator: def __init__(self, domain: Domain): self.domain = domain self.model = DOMAIN_MODELS[domain] self.coordinator_id = f"coordinator_{domain.value}" self.redis = None self.http = None self.running = False self.active_tasks = {} self.completed_tasks = [] async def start(self): self.redis = redis.from_url(REDIS_URL) self.http = httpx.AsyncClient(timeout=120.0) self.running = True print(f"[{self.coordinator_id}] Started, model: {self.model}") await self._listen_for_tasks() async def stop(self): self.running = False if self.http: await self.http.aclose() if self.redis: await self.redis.aclose() print(f"[{self.coordinator_id}] Stopped") async def _listen_for_tasks(self): pubsub = self.redis.pubsub() channel = f"msgbus:coordinator:{self.domain.value}" await pubsub.subscribe(channel) async for message in pubsub.listen(): if not self.running: break if message["type"] == "message": data = message["data"] if isinstance(data, bytes): data = data.decode() try: msg = json.loads(data) if msg.get("type") == "task": await self._handle_task(msg) except Exception as e: print(f"[{self.coordinator_id}] Error: {e}") async def _handle_task(self, msg: Dict): task = Task( task_id=msg.get("task_id", f"task-{datetime.now().strftime('%Y%m%d%H%M%S')}"), from_director=msg.get("from", "unknown"), domain=self.domain, specification=msg.get("specification", ""), constraints=msg.get("constraints", []), quality_criteria=msg.get("quality_criteria", []), context=msg.get("context", {}), priority=msg.get("priority", 1), created_at=datetime.now() ) print(f"[{self.coordinator_id}] Task: {task.task_id}") result = await self._execute_task(task) await self._report_result(task, result) self.completed_tasks.append(result) async def _execute_task(self, task: Task) -> TaskResult: start = datetime.now() prompt = f"You are a {self.domain.value} specialist.\n\nTASK: {task.specification}\n\nExecute precisely." try: resp = await self.http.post(f"{OLLAMA_URL}/api/generate", json={"model": self.model, "prompt": prompt, "stream": False}) elapsed = int((datetime.now() - start).total_seconds() * 1000) if resp.status_code == 200: output = resp.json().get("response", "") return TaskResult(task.task_id, self.coordinator_id, "complete", output, 0.8, elapsed) return TaskResult(task.task_id, self.coordinator_id, "failed", None, 0.0, elapsed, f"HTTP {resp.status_code}") except Exception as e: return TaskResult(task.task_id, self.coordinator_id, "failed", None, 0.0, 0, str(e)) async def _report_result(self, task: Task, result: TaskResult): channel = f"msgbus:director:{task.from_director}" await self.redis.publish(channel, json.dumps({"type": "task_result", "task_id": result.task_id, "from": self.coordinator_id, "status": result.status, "output": result.output[:500] if result.output else None, "quality": result.quality_score, "ms": result.execution_time_ms})) print(f"[{self.coordinator_id}] {result.task_id}: {result.status} ({result.execution_time_ms}ms)") class CoordinatorPool: def __init__(self, domains=None): self.coordinators = {d: Coordinator(d) for d in (domains or list(Domain))} self.running = False async def start(self): print("=" * 60) print("COORDINATOR POOL - Layer 3") print("Ghost in the Machine Labs") print("=" * 60) self.running = True await asyncio.gather(*[c.start() for c in self.coordinators.values()]) async def stop(self): self.running = False for c in self.coordinators.values(): await c.stop() print("[POOL] Stopped") async def main(): pool = CoordinatorPool() try: await pool.start() except KeyboardInterrupt: print("\nShutting down...") finally: await pool.stop() if __name__ == "__main__": asyncio.run(main())