# Source: LovingGraceTech
# Add release: workers.py
# a847459 verified
#!/usr/bin/env python3
"""
LAYER 5: WORKERS (Micro-Specialists)
Ghost in the Machine Labs - Harmonic Stack
Atomic operations. Fast. Parallel. The foundation.
These are the gears that turn.
Workers handle single-purpose tasks with maximum efficiency.
They have no memory, no personality - just execution.
"""
import asyncio
import json
import httpx
import redis.asyncio as redis
from datetime import datetime
from dataclasses import dataclass
from typing import Dict, Any, Optional
from enum import Enum
# Service endpoints and model selection for this layer.
OLLAMA_URL = "http://localhost:11434"  # local Ollama inference API
REDIS_URL = "redis://localhost:6379"  # pub/sub message bus between layers
WORKER_MODEL = "qwen3:4b" # Fast, minimal overhead
class WorkerType(Enum):
    """Atomic operation types.

    Each member's value is the Redis channel suffix the pool listens on
    ("worker:<value>"), and each member keys its instruction prefix in
    WORKER_PROMPTS.
    """
    # Text operations
    SUMMARIZE = "summarize"
    EXTRACT = "extract"
    CLASSIFY = "classify"
    TRANSLATE = "translate"
    # Code operations
    LINT = "lint"
    FORMAT = "format"
    EXPLAIN = "explain"
    GENERATE = "generate"
    # Data operations
    PARSE = "parse"
    VALIDATE = "validate"
    TRANSFORM = "transform"
    # Meta operations
    ROUTE = "route"
    SPLIT = "split"
    MERGE = "merge"
# Instruction prefix for each worker type; prepended to the task's input
# before the combined prompt is sent to the Ollama model.
WORKER_PROMPTS = {
    WorkerType.SUMMARIZE: "Summarize the following in 2-3 sentences:",
    WorkerType.EXTRACT: "Extract the key information from:",
    WorkerType.CLASSIFY: "Classify the following into one category:",
    WorkerType.TRANSLATE: "Translate accurately:",
    WorkerType.LINT: "Check this code for errors and issues:",
    WorkerType.FORMAT: "Format this code properly:",
    WorkerType.EXPLAIN: "Explain this code simply:",
    WorkerType.GENERATE: "Generate code for:",
    WorkerType.PARSE: "Parse and structure this data:",
    WorkerType.VALIDATE: "Validate this data against the schema:",
    WorkerType.TRANSFORM: "Transform this data as specified:",
    WorkerType.ROUTE: "Determine the best route for this request:",
    WorkerType.SPLIT: "Split this task into subtasks:",
    WorkerType.MERGE: "Merge these results coherently:",
}
@dataclass
class MicroTask:
    """Atomic unit of work.

    NOTE(review): tasks currently travel over Redis as raw JSON dicts
    (see WorkerPool); this dataclass documents the expected shape.
    """
    task_id: str                  # unique id; the pool auto-generates one if absent
    worker_type: WorkerType       # which atomic operation to run
    input_data: str               # raw text/code/data the worker operates on
    parameters: Dict[str, Any]    # extra options; not yet consumed by the pool
    from_specialist: str          # result-channel suffix: "worker:results:<from_specialist>"
    created_at: datetime          # creation timestamp
@dataclass
class MicroResult:
    """Result of atomic operation.

    NOTE(review): results are currently published as raw JSON dicts with
    these fields (plus a "type": "worker_result" tag); this dataclass
    documents the expected shape.
    """
    task_id: str            # echoes the task's id
    worker_type: str        # WorkerType value string, e.g. "summarize"
    status: str             # "complete" | "failed" | "error"
    output: Optional[str]   # model response (truncated to 500 chars) or None
    execution_ms: int       # wall-clock execution time in milliseconds
class WorkerPool:
    """
    Pool of micro-workers for atomic operations.
    Maximum parallelization, minimum overhead.

    Subscribes to one Redis pub/sub channel per WorkerType
    ("worker:<type>"), runs each task against the Ollama API, and
    publishes the result on "worker:results:<from_specialist>".
    """

    def __init__(self, redis_url: str = REDIS_URL, ollama_url: str = OLLAMA_URL):
        self.redis_url = redis_url
        self.ollama_url = ollama_url
        self.redis = None          # async Redis client, created in start()
        self.pubsub = None         # pub/sub handle, kept so stop() can close it
        self.running = False
        self.active_workers = 0    # informational counter for log lines
        self.max_workers = 8       # Concurrent worker limit
        self.semaphore = asyncio.Semaphore(self.max_workers)

    async def start(self):
        """Start the worker pool and block, consuming tasks until stop()."""
        print("=" * 60)
        print("WORKER POOL - Layer 5")
        print("Ghost in the Machine Labs")
        print("=" * 60)
        print(f"Max concurrent workers: {self.max_workers}")
        print(f"Model: {WORKER_MODEL}")
        print()
        for wt in WorkerType:
            print(f"  [{wt.value}] ready")
        self.redis = redis.from_url(self.redis_url)
        self.running = True
        # Subscribe to worker channels (handle retained for stop()).
        self.pubsub = self.redis.pubsub()
        channels = [f"worker:{wt.value}" for wt in WorkerType]
        await self.pubsub.subscribe(*channels)
        print(f"\nListening on {len(channels)} channels...")
        async for message in self.pubsub.listen():
            if not self.running:
                break
            if message["type"] != "message":
                continue
            data = message["data"]
            if isinstance(data, bytes):
                data = data.decode()
            try:
                task_data = json.loads(data)
            except json.JSONDecodeError as e:
                # Malformed payload: log and keep listening.
                print(f"[ERROR] {e}")
                continue
            # Process in background without blocking the listen loop.
            asyncio.create_task(self._process_task(task_data))

    async def _process_task(self, task_data: dict):
        """Execute one micro-task and publish its result.

        Counter bookkeeping sits in try/finally so active_workers cannot
        drift if execution or the result publish raises.
        """
        async with self.semaphore:  # Limit concurrency
            self.active_workers += 1
            try:
                task_id = task_data.get(
                    "task_id", f"micro-{datetime.now().strftime('%H%M%S%f')}"
                )
                from_specialist = task_data.get("from", "unknown")
                try:
                    worker_type = WorkerType(task_data.get("type", "summarize"))
                except ValueError:
                    # Unknown worker type: report the failure instead of
                    # letting the background task die silently.
                    await self.redis.publish(
                        f"worker:results:{from_specialist}",
                        json.dumps({
                            "type": "worker_result",
                            "task_id": task_id,
                            "worker_type": task_data.get("type"),
                            "status": "error",
                            "output": None,
                            "ms": 0,
                        }),
                    )
                    return
                input_data = task_data.get("input", "")
                print(f"[{worker_type.value}] Task: {task_id[:20]}... (active: {self.active_workers})")
                # Execute atomic operation
                start = datetime.now()
                prompt = f"{WORKER_PROMPTS[worker_type]}\n\n{input_data}"
                try:
                    async with httpx.AsyncClient(timeout=60.0) as client:
                        resp = await client.post(
                            f"{self.ollama_url}/api/generate",
                            json={"model": WORKER_MODEL, "prompt": prompt, "stream": False}
                        )
                    elapsed = int((datetime.now() - start).total_seconds() * 1000)
                    if resp.status_code == 200:
                        output = resp.json().get("response", "")
                        status = "complete"
                    else:
                        output = None
                        status = "failed"
                except Exception as e:
                    # Fix: record real elapsed time (was hard-coded 0 on errors).
                    elapsed = int((datetime.now() - start).total_seconds() * 1000)
                    output = None
                    status = "error"
                    print(f"[{worker_type.value}] Error: {e}")
                # Publish result back
                result = {
                    "type": "worker_result",
                    "task_id": task_id,
                    "worker_type": worker_type.value,
                    "status": status,
                    "output": output[:500] if output else None,  # cap payload size
                    "ms": elapsed
                }
                await self.redis.publish(f"worker:results:{from_specialist}", json.dumps(result))
                print(f"[{worker_type.value}] {task_id[:20]}: {status} ({elapsed}ms)")
            finally:
                self.active_workers -= 1

    async def stop(self):
        """Stop the worker pool and release Redis resources."""
        self.running = False
        # Closing the pub/sub handle also unblocks the listen() loop.
        if self.pubsub:
            await self.pubsub.unsubscribe()
            await self.pubsub.aclose()
        if self.redis:
            await self.redis.aclose()
async def main():
    """Run the worker pool service until interrupted.

    Cleanup runs in ``finally`` because a Ctrl-C under ``asyncio.run``
    typically surfaces inside the coroutine as CancelledError, not
    KeyboardInterrupt — the original ``except KeyboardInterrupt`` here
    would rarely fire, leaving connections open.
    """
    pool = WorkerPool()
    try:
        await pool.start()
    finally:
        await pool.stop()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass  # clean Ctrl-C exit; pool.stop() already ran via finally