"""
LAYER 5: WORKERS (Micro-Specialists)
Ghost in the Machine Labs - Harmonic Stack

Atomic operations. Fast. Parallel. The foundation.
These are the gears that turn.

Workers handle single-purpose tasks with maximum efficiency.
They have no memory, no personality - just execution.
"""
| |
|
import asyncio
import json
import os
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional

import httpx
import redis.asyncio as redis
| |
|
# Service endpoints and worker model. Overridable via environment variables
# for deployment flexibility; the defaults preserve the original hard-coded
# values, so behavior is unchanged when the variables are unset.
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
WORKER_MODEL = os.environ.get("WORKER_MODEL", "qwen3:4b")
| |
|
| |
|
class WorkerType(Enum):
    """Closed set of atomic operations a worker can execute.

    Each member's string value doubles as the Redis channel suffix
    ("worker:<value>") and as the wire value of the "type" field on
    incoming task messages.
    """

    # --- Text operations ---
    SUMMARIZE = "summarize"
    EXTRACT = "extract"
    CLASSIFY = "classify"
    TRANSLATE = "translate"

    # --- Code operations ---
    LINT = "lint"
    FORMAT = "format"
    EXPLAIN = "explain"
    GENERATE = "generate"

    # --- Data operations ---
    PARSE = "parse"
    VALIDATE = "validate"
    TRANSFORM = "transform"

    # --- Orchestration operations ---
    ROUTE = "route"
    SPLIT = "split"
    MERGE = "merge"
| |
|
| |
|
# Instruction prefix per worker type, keyed by the enum's wire value.
# _process_task prepends the prefix to the raw input to form the model prompt.
_PROMPT_TEXT = {
    "summarize": "Summarize the following in 2-3 sentences:",
    "extract": "Extract the key information from:",
    "classify": "Classify the following into one category:",
    "translate": "Translate accurately:",
    "lint": "Check this code for errors and issues:",
    "format": "Format this code properly:",
    "explain": "Explain this code simply:",
    "generate": "Generate code for:",
    "parse": "Parse and structure this data:",
    "validate": "Validate this data against the schema:",
    "transform": "Transform this data as specified:",
    "route": "Determine the best route for this request:",
    "split": "Split this task into subtasks:",
    "merge": "Merge these results coherently:",
}

# Resolve the wire values back to enum members so lookups by WorkerType work.
WORKER_PROMPTS = {WorkerType(name): text for name, text in _PROMPT_TEXT.items()}
| |
|
| |
|
@dataclass
class MicroTask:
    """Atomic unit of work handed to the pool by an upstream specialist."""

    task_id: str                 # unique id; the pool generates one if the sender omits it
    worker_type: WorkerType      # which atomic operation to run
    input_data: str              # raw text the worker operates on
    parameters: Dict[str, Any]   # extra per-task options (not read by _process_task today)
    from_specialist: str         # reply channel suffix: worker:results:<from_specialist>
    created_at: datetime         # creation timestamp
|
| |
|
@dataclass
class MicroResult:
    """Outcome of a single atomic operation."""

    task_id: str            # id of the MicroTask this result answers
    worker_type: str        # WorkerType.value of the operation that ran
    status: str             # "complete", "failed", or "error"
    output: Optional[str]   # model response text; None when the call failed
    execution_ms: int       # wall-clock duration of the model call in ms
|
| |
|
class WorkerPool:
    """
    Pool of micro-workers for atomic operations.
    Maximum parallelization, minimum overhead.

    Subscribes to one Redis channel per WorkerType ("worker:<value>") and
    publishes each result to "worker:results:<specialist>". Concurrency is
    capped by a semaphore of `max_workers`.
    """

    def __init__(self, redis_url: str = REDIS_URL, ollama_url: str = OLLAMA_URL):
        """Configure the pool; no connections are opened until start()."""
        self.redis_url = redis_url
        self.ollama_url = ollama_url
        self.redis = None  # async redis client, created in start()
        self.running = False
        self.active_workers = 0  # tasks currently holding the semaphore
        self.max_workers = 8
        self.semaphore = asyncio.Semaphore(self.max_workers)
        # Strong references to in-flight tasks. asyncio's event loop keeps
        # only a weak reference to tasks from create_task(), so without this
        # set a running task could be garbage-collected before it finishes.
        self._tasks = set()

    async def start(self):
        """Connect to Redis, subscribe to all worker channels, and consume
        tasks until stop() is called.

        Each incoming JSON message is dispatched to _process_task() as its
        own asyncio task so slow model calls do not block the listen loop.
        """
        print("=" * 60)
        print("WORKER POOL - Layer 5")
        print("Ghost in the Machine Labs")
        print("=" * 60)
        print(f"Max concurrent workers: {self.max_workers}")
        print(f"Model: {WORKER_MODEL}")
        print()

        for wt in WorkerType:
            print(f" [{wt.value}] ready")

        self.redis = redis.from_url(self.redis_url)
        self.running = True

        pubsub = self.redis.pubsub()
        channels = [f"worker:{wt.value}" for wt in WorkerType]
        await pubsub.subscribe(*channels)
        print(f"\nListening on {len(channels)} channels...")

        async for message in pubsub.listen():
            if not self.running:
                break
            if message["type"] == "message":
                data = message["data"]
                if isinstance(data, bytes):
                    data = data.decode()
                try:
                    task_data = json.loads(data)
                    # Keep a strong reference so the task is not GC'd
                    # mid-flight; the done-callback removes it afterwards.
                    task = asyncio.create_task(self._process_task(task_data))
                    self._tasks.add(task)
                    task.add_done_callback(self._tasks.discard)
                except Exception as e:
                    print(f"[ERROR] {e}")

    async def _process_task(self, task_data: dict):
        """Process a micro-task: build the prompt, call the model, publish
        the result to the sender's results channel.

        Expected task_data keys: "task_id", "type" (a WorkerType value),
        "input", "parameters", "from" — all optional with safe defaults.
        """
        async with self.semaphore:
            self.active_workers += 1
            try:
                task_id = task_data.get("task_id", f"micro-{datetime.now().strftime('%H%M%S%f')}")
                from_specialist = task_data.get("from", "unknown")

                try:
                    worker_type = WorkerType(task_data.get("type", "summarize"))
                except ValueError:
                    # Unknown worker type: report the failure instead of
                    # letting the ValueError kill the task silently.
                    await self.redis.publish(
                        f"worker:results:{from_specialist}",
                        json.dumps({
                            "type": "worker_result",
                            "task_id": task_id,
                            "worker_type": task_data.get("type"),
                            "status": "error",
                            "output": None,
                            "ms": 0,
                        }),
                    )
                    return

                input_data = task_data.get("input", "")
                params = task_data.get("parameters", {})  # reserved; not used yet

                print(f"[{worker_type.value}] Task: {task_id[:20]}... (active: {self.active_workers})")

                start = datetime.now()
                prompt = f"{WORKER_PROMPTS[worker_type]}\n\n{input_data}"

                try:
                    async with httpx.AsyncClient(timeout=60.0) as client:
                        resp = await client.post(
                            f"{self.ollama_url}/api/generate",
                            json={"model": WORKER_MODEL, "prompt": prompt, "stream": False}
                        )
                    elapsed = int((datetime.now() - start).total_seconds() * 1000)

                    if resp.status_code == 200:
                        output = resp.json().get("response", "")
                        status = "complete"
                    else:
                        output = None
                        status = "failed"
                except Exception as e:
                    output = None
                    status = "error"
                    elapsed = 0
                    print(f"[{worker_type.value}] Error: {e}")

                result = {
                    "type": "worker_result",
                    "task_id": task_id,
                    "worker_type": worker_type.value,
                    "status": status,
                    "output": output[:500] if output else None,  # cap payload size
                    "ms": elapsed
                }
                await self.redis.publish(f"worker:results:{from_specialist}", json.dumps(result))

                print(f"[{worker_type.value}] {task_id[:20]}: {status} ({elapsed}ms)")
            finally:
                # Always release the counter — even if parsing or the result
                # publish raises — so the "active" count cannot leak upward.
                self.active_workers -= 1

    async def stop(self):
        """Stop consuming messages and close the Redis connection."""
        self.running = False
        if self.redis:
            await self.redis.aclose()
| |
|
| |
|
async def main():
    """Run the worker pool service until interrupted.

    Under asyncio.run(), Ctrl-C is delivered to the running task as a
    CancelledError rather than KeyboardInterrupt, so a try/finally (not an
    except KeyboardInterrupt) is required to guarantee pool.stop() runs and
    the Redis connection is closed on every exit path.
    """
    pool = WorkerPool()
    try:
        await pool.start()
    finally:
        await pool.stop()


if __name__ == "__main__":
    asyncio.run(main())
| |
|