| | |
| | """ |
| | COORDINATORS - Layer 3 |
| | Ghost in the Machine Labs |
| | |
| | Coordinators receive precise tasks from Directors and execute them. |
| | They manage Specialists (Layer 4) and Micro-Specialists (Layer 5). |
| | """ |
| | import os |
| | import json |
| | import asyncio |
| | from datetime import datetime |
| | from typing import Optional, Dict, List, Any |
| | from dataclasses import dataclass, asdict |
| | from enum import Enum |
| |
|
| | import httpx |
| | import redis.asyncio as redis |
| |
|
| | OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434") |
| | REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") |
| |
|
| | class Domain(Enum): |
| | CODE = "code" |
| | MATH = "math" |
| | WRITING = "writing" |
| | RESEARCH = "research" |
| | SYSTEM = "system" |
| |
|
| | DOMAIN_MODELS = { |
| | Domain.CODE: "qwen3:8b", |
| | Domain.MATH: "qwen3:8b", |
| | Domain.WRITING: "qwen3:4b", |
| | Domain.RESEARCH: "qwen3:8b", |
| | Domain.SYSTEM: "qwen3:4b", |
| | } |
| |
|
| | @dataclass |
| | class Task: |
| | task_id: str |
| | from_director: str |
| | domain: Domain |
| | specification: str |
| | constraints: List[str] |
| | quality_criteria: List[str] |
| | context: Dict |
| | priority: int = 1 |
| | created_at: datetime = None |
| |
|
| | @dataclass |
| | class TaskResult: |
| | task_id: str |
| | coordinator: str |
| | status: str |
| | output: Any |
| | quality_score: float |
| | execution_time_ms: int |
| | notes: str = "" |
| |
|
| | class Coordinator: |
| | def __init__(self, domain: Domain): |
| | self.domain = domain |
| | self.model = DOMAIN_MODELS[domain] |
| | self.coordinator_id = f"coordinator_{domain.value}" |
| | self.redis = None |
| | self.http = None |
| | self.running = False |
| | self.active_tasks = {} |
| | self.completed_tasks = [] |
| |
|
| | async def start(self): |
| | self.redis = redis.from_url(REDIS_URL) |
| | self.http = httpx.AsyncClient(timeout=120.0) |
| | self.running = True |
| | print(f"[{self.coordinator_id}] Started, model: {self.model}") |
| | await self._listen_for_tasks() |
| |
|
| | async def stop(self): |
| | self.running = False |
| | if self.http: await self.http.aclose() |
| | if self.redis: await self.redis.aclose() |
| | print(f"[{self.coordinator_id}] Stopped") |
| |
|
| | async def _listen_for_tasks(self): |
| | pubsub = self.redis.pubsub() |
| | channel = f"msgbus:coordinator:{self.domain.value}" |
| | await pubsub.subscribe(channel) |
| | async for message in pubsub.listen(): |
| | if not self.running: break |
| | if message["type"] == "message": |
| | data = message["data"] |
| | if isinstance(data, bytes): data = data.decode() |
| | try: |
| | msg = json.loads(data) |
| | if msg.get("type") == "task": |
| | await self._handle_task(msg) |
| | except Exception as e: |
| | print(f"[{self.coordinator_id}] Error: {e}") |
| |
|
| | async def _handle_task(self, msg: Dict): |
| | task = Task( |
| | task_id=msg.get("task_id", f"task-{datetime.now().strftime('%Y%m%d%H%M%S')}"), |
| | from_director=msg.get("from", "unknown"), |
| | domain=self.domain, |
| | specification=msg.get("specification", ""), |
| | constraints=msg.get("constraints", []), |
| | quality_criteria=msg.get("quality_criteria", []), |
| | context=msg.get("context", {}), |
| | priority=msg.get("priority", 1), |
| | created_at=datetime.now() |
| | ) |
| | print(f"[{self.coordinator_id}] Task: {task.task_id}") |
| | result = await self._execute_task(task) |
| | await self._report_result(task, result) |
| | self.completed_tasks.append(result) |
| |
|
| | async def _execute_task(self, task: Task) -> TaskResult: |
| | start = datetime.now() |
| | prompt = f"You are a {self.domain.value} specialist.\n\nTASK: {task.specification}\n\nExecute precisely." |
| | try: |
| | resp = await self.http.post(f"{OLLAMA_URL}/api/generate", json={"model": self.model, "prompt": prompt, "stream": False}) |
| | elapsed = int((datetime.now() - start).total_seconds() * 1000) |
| | if resp.status_code == 200: |
| | output = resp.json().get("response", "") |
| | return TaskResult(task.task_id, self.coordinator_id, "complete", output, 0.8, elapsed) |
| | return TaskResult(task.task_id, self.coordinator_id, "failed", None, 0.0, elapsed, f"HTTP {resp.status_code}") |
| | except Exception as e: |
| | return TaskResult(task.task_id, self.coordinator_id, "failed", None, 0.0, 0, str(e)) |
| |
|
| | async def _report_result(self, task: Task, result: TaskResult): |
| | channel = f"msgbus:director:{task.from_director}" |
| | await self.redis.publish(channel, json.dumps({"type": "task_result", "task_id": result.task_id, "from": self.coordinator_id, "status": result.status, "output": result.output[:500] if result.output else None, "quality": result.quality_score, "ms": result.execution_time_ms})) |
| | print(f"[{self.coordinator_id}] {result.task_id}: {result.status} ({result.execution_time_ms}ms)") |
| |
|
| | class CoordinatorPool: |
| | def __init__(self, domains=None): |
| | self.coordinators = {d: Coordinator(d) for d in (domains or list(Domain))} |
| | self.running = False |
| |
|
| | async def start(self): |
| | print("=" * 60) |
| | print("COORDINATOR POOL - Layer 3") |
| | print("Ghost in the Machine Labs") |
| | print("=" * 60) |
| | self.running = True |
| | await asyncio.gather(*[c.start() for c in self.coordinators.values()]) |
| |
|
| | async def stop(self): |
| | self.running = False |
| | for c in self.coordinators.values(): await c.stop() |
| | print("[POOL] Stopped") |
| |
|
| | async def main(): |
| | pool = CoordinatorPool() |
| | try: await pool.start() |
| | except KeyboardInterrupt: print("\nShutting down...") |
| | finally: await pool.stop() |
| |
|
| | if __name__ == "__main__": asyncio.run(main()) |
| |
|