#!/usr/bin/env python3
"""
LAYER 5: WORKERS (Micro-Specialists)
Ghost in the Machine Labs - Harmonic Stack

Atomic operations. Fast. Parallel. The foundation.

These are the gears that turn. Workers handle single-purpose tasks
with maximum efficiency. They have no memory, no personality - just
execution.
"""

import asyncio
import json
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional

import httpx
import redis.asyncio as redis

OLLAMA_URL = "http://localhost:11434"
REDIS_URL = "redis://localhost:6379"
WORKER_MODEL = "qwen3:4b"  # Fast, minimal overhead


class WorkerType(Enum):
    """Atomic operation types."""

    # Text operations
    SUMMARIZE = "summarize"
    EXTRACT = "extract"
    CLASSIFY = "classify"
    TRANSLATE = "translate"
    # Code operations
    LINT = "lint"
    FORMAT = "format"
    EXPLAIN = "explain"
    GENERATE = "generate"
    # Data operations
    PARSE = "parse"
    VALIDATE = "validate"
    TRANSFORM = "transform"
    # Meta operations
    ROUTE = "route"
    SPLIT = "split"
    MERGE = "merge"


# One instruction prefix per worker type; the raw task input is appended
# beneath it when the prompt is built in WorkerPool._process_task.
WORKER_PROMPTS = {
    WorkerType.SUMMARIZE: "Summarize the following in 2-3 sentences:",
    WorkerType.EXTRACT: "Extract the key information from:",
    WorkerType.CLASSIFY: "Classify the following into one category:",
    WorkerType.TRANSLATE: "Translate accurately:",
    WorkerType.LINT: "Check this code for errors and issues:",
    WorkerType.FORMAT: "Format this code properly:",
    WorkerType.EXPLAIN: "Explain this code simply:",
    WorkerType.GENERATE: "Generate code for:",
    WorkerType.PARSE: "Parse and structure this data:",
    WorkerType.VALIDATE: "Validate this data against the schema:",
    WorkerType.TRANSFORM: "Transform this data as specified:",
    WorkerType.ROUTE: "Determine the best route for this request:",
    WorkerType.SPLIT: "Split this task into subtasks:",
    WorkerType.MERGE: "Merge these results coherently:",
}


@dataclass
class MicroTask:
    """Atomic unit of work."""

    task_id: str                  # unique id; generated if the payload omits one
    worker_type: WorkerType       # which atomic operation to perform
    input_data: str               # raw text handed to the model
    parameters: Dict[str, Any]    # optional per-task knobs (currently unused)
    from_specialist: str          # reply channel suffix: worker:results:<name>
    created_at: datetime


@dataclass
class MicroResult:
    """Result of atomic operation."""

    task_id: str
    worker_type: str              # WorkerType.value, serialized form
    status: str                   # "complete" | "failed" | "error"
    output: Optional[str]         # model response (truncated), or None on failure
    execution_ms: int


class WorkerPool:
    """
    Pool of micro-workers for atomic operations.
    Maximum parallelization, minimum overhead.

    Subscribes to one Redis pub/sub channel per WorkerType
    (``worker:<type>``), runs each task against the local Ollama
    endpoint, and publishes a JSON result to
    ``worker:results:<from_specialist>``.
    """

    def __init__(self, redis_url: str = REDIS_URL, ollama_url: str = OLLAMA_URL):
        self.redis_url = redis_url
        self.ollama_url = ollama_url
        self.redis: Optional[redis.Redis] = None
        self.running = False
        self.active_workers = 0           # tasks currently inside _process_task
        self.max_workers = 8              # Concurrent worker limit
        self.semaphore = asyncio.Semaphore(self.max_workers)

    async def start(self):
        """Start the worker pool and block, dispatching incoming tasks.

        Runs until ``self.running`` is cleared (see :meth:`stop`) or the
        surrounding task is cancelled.
        """
        print("=" * 60)
        print("WORKER POOL - Layer 5")
        print("Ghost in the Machine Labs")
        print("=" * 60)
        print(f"Max concurrent workers: {self.max_workers}")
        print(f"Model: {WORKER_MODEL}")
        print()
        for wt in WorkerType:
            print(f"  [{wt.value}] ready")

        self.redis = redis.from_url(self.redis_url)
        self.running = True

        # Subscribe to one channel per worker type.
        pubsub = self.redis.pubsub()
        channels = [f"worker:{wt.value}" for wt in WorkerType]
        await pubsub.subscribe(*channels)
        print(f"\nListening on {len(channels)} channels...")

        async for message in pubsub.listen():
            if not self.running:
                break
            if message["type"] == "message":
                data = message["data"]
                if isinstance(data, bytes):
                    data = data.decode()
                try:
                    task_data = json.loads(data)
                    # Process in background without blocking the listener.
                    asyncio.create_task(self._process_task(task_data))
                except Exception as e:
                    # Malformed payload: log and keep listening.
                    print(f"[ERROR] {e}")

    async def _process_task(self, task_data: dict):
        """Process a micro-task: build the prompt, call Ollama, publish result.

        Expected payload keys: ``task_id``, ``type`` (a WorkerType value),
        ``input``, ``parameters``, ``from``. Missing keys get defaults.
        """
        async with self.semaphore:  # Limit concurrency
            self.active_workers += 1
            try:
                task_id = task_data.get(
                    "task_id", f"micro-{datetime.now().strftime('%H%M%S%f')}"
                )
                raw_type = task_data.get("type", "summarize")
                try:
                    worker_type = WorkerType(raw_type)
                except ValueError:
                    # Unknown worker type: log and drop rather than letting the
                    # background task die with an unhandled exception.
                    print(f"[ERROR] Unknown worker type: {raw_type}")
                    return
                input_data = task_data.get("input", "")
                params = task_data.get("parameters", {})
                from_specialist = task_data.get("from", "unknown")

                print(
                    f"[{worker_type.value}] Task: {task_id[:20]}... "
                    f"(active: {self.active_workers})"
                )

                # Execute atomic operation.
                start = datetime.now()
                prompt = f"{WORKER_PROMPTS[worker_type]}\n\n{input_data}"
                try:
                    async with httpx.AsyncClient(timeout=60.0) as client:
                        resp = await client.post(
                            f"{self.ollama_url}/api/generate",
                            json={
                                "model": WORKER_MODEL,
                                "prompt": prompt,
                                "stream": False,
                            },
                        )
                    elapsed = int((datetime.now() - start).total_seconds() * 1000)
                    if resp.status_code == 200:
                        output = resp.json().get("response", "")
                        status = "complete"
                    else:
                        output = None
                        status = "failed"
                except Exception as e:
                    output = None
                    status = "error"
                    # Still report real elapsed time instead of 0.
                    elapsed = int((datetime.now() - start).total_seconds() * 1000)
                    print(f"[{worker_type.value}] Error: {e}")

                # Publish result back to the originating specialist.
                result = {
                    "type": "worker_result",
                    "task_id": task_id,
                    "worker_type": worker_type.value,
                    "status": status,
                    "output": output[:500] if output else None,
                    "ms": elapsed,
                }
                await self.redis.publish(
                    f"worker:results:{from_specialist}", json.dumps(result)
                )
                print(f"[{worker_type.value}] {task_id[:20]}: {status} ({elapsed}ms)")
            finally:
                # Always release the slot count, even if publish/parse failed,
                # so active_workers cannot drift upward over time.
                self.active_workers -= 1

    async def stop(self):
        """Stop the worker pool and close the Redis connection."""
        self.running = False
        if self.redis:
            await self.redis.aclose()


async def main():
    """Run the worker pool service until cancelled, then shut down cleanly."""
    pool = WorkerPool()
    try:
        await pool.start()
    finally:
        # Ensure the Redis connection is closed on cancellation too
        # (asyncio.run cancels this coroutine on Ctrl-C; the old
        # `except KeyboardInterrupt` here could never fire).
        await pool.stop()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass