LovingGraceTech's picture
Add release: coordinators.py
c4f2e87 verified
#!/usr/bin/env python3
"""
COORDINATORS - Layer 3
Ghost in the Machine Labs
Coordinators receive precise tasks from Directors and execute them.
They manage Specialists (Layer 4) and Micro-Specialists (Layer 5).
"""
import os
import json
import asyncio
from datetime import datetime
from typing import Optional, Dict, List, Any
from dataclasses import dataclass, asdict
from enum import Enum
import httpx
import redis.asyncio as redis
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
class Domain(Enum):
CODE = "code"
MATH = "math"
WRITING = "writing"
RESEARCH = "research"
SYSTEM = "system"
DOMAIN_MODELS = {
Domain.CODE: "qwen3:8b",
Domain.MATH: "qwen3:8b",
Domain.WRITING: "qwen3:4b",
Domain.RESEARCH: "qwen3:8b",
Domain.SYSTEM: "qwen3:4b",
}
@dataclass
class Task:
task_id: str
from_director: str
domain: Domain
specification: str
constraints: List[str]
quality_criteria: List[str]
context: Dict
priority: int = 1
created_at: datetime = None
@dataclass
class TaskResult:
task_id: str
coordinator: str
status: str
output: Any
quality_score: float
execution_time_ms: int
notes: str = ""
class Coordinator:
def __init__(self, domain: Domain):
self.domain = domain
self.model = DOMAIN_MODELS[domain]
self.coordinator_id = f"coordinator_{domain.value}"
self.redis = None
self.http = None
self.running = False
self.active_tasks = {}
self.completed_tasks = []
async def start(self):
self.redis = redis.from_url(REDIS_URL)
self.http = httpx.AsyncClient(timeout=120.0)
self.running = True
print(f"[{self.coordinator_id}] Started, model: {self.model}")
await self._listen_for_tasks()
async def stop(self):
self.running = False
if self.http: await self.http.aclose()
if self.redis: await self.redis.aclose()
print(f"[{self.coordinator_id}] Stopped")
async def _listen_for_tasks(self):
pubsub = self.redis.pubsub()
channel = f"msgbus:coordinator:{self.domain.value}"
await pubsub.subscribe(channel)
async for message in pubsub.listen():
if not self.running: break
if message["type"] == "message":
data = message["data"]
if isinstance(data, bytes): data = data.decode()
try:
msg = json.loads(data)
if msg.get("type") == "task":
await self._handle_task(msg)
except Exception as e:
print(f"[{self.coordinator_id}] Error: {e}")
async def _handle_task(self, msg: Dict):
task = Task(
task_id=msg.get("task_id", f"task-{datetime.now().strftime('%Y%m%d%H%M%S')}"),
from_director=msg.get("from", "unknown"),
domain=self.domain,
specification=msg.get("specification", ""),
constraints=msg.get("constraints", []),
quality_criteria=msg.get("quality_criteria", []),
context=msg.get("context", {}),
priority=msg.get("priority", 1),
created_at=datetime.now()
)
print(f"[{self.coordinator_id}] Task: {task.task_id}")
result = await self._execute_task(task)
await self._report_result(task, result)
self.completed_tasks.append(result)
async def _execute_task(self, task: Task) -> TaskResult:
start = datetime.now()
prompt = f"You are a {self.domain.value} specialist.\n\nTASK: {task.specification}\n\nExecute precisely."
try:
resp = await self.http.post(f"{OLLAMA_URL}/api/generate", json={"model": self.model, "prompt": prompt, "stream": False})
elapsed = int((datetime.now() - start).total_seconds() * 1000)
if resp.status_code == 200:
output = resp.json().get("response", "")
return TaskResult(task.task_id, self.coordinator_id, "complete", output, 0.8, elapsed)
return TaskResult(task.task_id, self.coordinator_id, "failed", None, 0.0, elapsed, f"HTTP {resp.status_code}")
except Exception as e:
return TaskResult(task.task_id, self.coordinator_id, "failed", None, 0.0, 0, str(e))
async def _report_result(self, task: Task, result: TaskResult):
channel = f"msgbus:director:{task.from_director}"
await self.redis.publish(channel, json.dumps({"type": "task_result", "task_id": result.task_id, "from": self.coordinator_id, "status": result.status, "output": result.output[:500] if result.output else None, "quality": result.quality_score, "ms": result.execution_time_ms}))
print(f"[{self.coordinator_id}] {result.task_id}: {result.status} ({result.execution_time_ms}ms)")
class CoordinatorPool:
def __init__(self, domains=None):
self.coordinators = {d: Coordinator(d) for d in (domains or list(Domain))}
self.running = False
async def start(self):
print("=" * 60)
print("COORDINATOR POOL - Layer 3")
print("Ghost in the Machine Labs")
print("=" * 60)
self.running = True
await asyncio.gather(*[c.start() for c in self.coordinators.values()])
async def stop(self):
self.running = False
for c in self.coordinators.values(): await c.stop()
print("[POOL] Stopped")
async def main():
pool = CoordinatorPool()
try: await pool.start()
except KeyboardInterrupt: print("\nShutting down...")
finally: await pool.stop()
if __name__ == "__main__": asyncio.run(main())