#!/usr/bin/env python3
"""
LAYER 4: WORKER MANAGER
Ghost in the Machine Labs - Harmonic Stack

Manages worker pool based on hardware profile.
Modes: single, tandem, quad
Auto-sizes from hardware_profiles.py
"""

import asyncio
import httpx
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum
from datetime import datetime

# Import hardware profile
from hardware_profiles import select_profile, get_profile

OLLAMA_URL = "http://localhost:11434"


class WorkerMode(Enum):
    """Deployment mode for the worker pool (from the hardware profile)."""
    SHARED = "shared"    # Single model, context switching
    SINGLE = "single"    # Dedicated worker
    TANDEM = "tandem"    # Two workers in parallel
    QUAD = "quad"        # Four workers in parallel


@dataclass
class WorkerConfig:
    """Worker configuration from hardware profile."""
    model: str           # Ollama model name
    quant: str           # quantization tag, e.g. "Q4_K_M"
    mode: WorkerMode     # pool mode
    count: int           # number of concurrent workers


@dataclass
class WorkTask:
    """Unit of work for the worker pool."""
    task_id: str         # caller-supplied unique id; also seeds worker routing
    prompt: str
    context: str = ""    # optional context prepended to the prompt
    max_tokens: int = 1000
    temperature: float = 0.7


@dataclass
class WorkResult:
    """Result from worker."""
    task_id: str
    worker_id: int
    response: str        # model output ("" on failure)
    elapsed_ms: int      # wall-clock duration of the call
    success: bool
    error: Optional[str] = None  # error description when success is False


class WorkerPool:
    """
    Manages a pool of workers sized to hardware.

    Concurrency is bounded by an asyncio.Semaphore sized to the worker
    count taken from the hardware profile's "layer_4_worker" section.
    """

    def __init__(self, profile_name: Optional[str] = None):
        """Initialize worker pool from hardware profile.

        Args:
            profile_name: explicit profile to load via get_profile();
                when None, select_profile() auto-detects from hardware
                (and also yields the detected VRAM, stored on self.vram).
        """
        if profile_name:
            self.profile_name = profile_name
            self.profile = get_profile(profile_name)
        else:
            self.profile_name, self.profile, self.vram = select_profile()

        # Extract Layer 4 config
        l4 = self.profile["layer_4_worker"]
        self.model = l4["model"]
        self.quant = l4.get("quant", "Q4_K_M")
        self.mode = WorkerMode(l4["mode"])
        self.count = l4.get("count", 1)

        # Worker state
        self.workers_ready = False
        self.active_tasks: Dict[str, asyncio.Task] = {}
        self.results: Dict[str, WorkResult] = {}

        # Semaphore for concurrent workers.
        # NOTE(review): created outside a running event loop — fine on
        # Python 3.10+, but on older versions a Semaphore binds the loop
        # at construction time; confirm target runtime.
        self._semaphore = asyncio.Semaphore(self.count)

        print(f"[LAYER 4] Worker Pool initialized")
        print(f" Profile: {self.profile_name}")
        print(f" Model: {self.model} @ {self.quant}")
        print(f" Mode: {self.mode.value} x {self.count}")

    async def warmup(self):
        """Pre-load worker model into memory.

        Issues a 1-token generation so Ollama loads the model weights;
        raises httpx.HTTPStatusError if the server rejects the request.
        """
        print(f"[LAYER 4] Warming up {self.model}...")
        async with httpx.AsyncClient(timeout=120.0) as client:
            # Ping model to load it
            response = await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={
                    "model": self.model,
                    "prompt": "Ready.",
                    "stream": False,
                    "options": {"num_predict": 1}
                }
            )
            # FIX: fail loudly on HTTP errors instead of marking the
            # pool ready when the model never actually loaded.
            response.raise_for_status()
        self.workers_ready = True
        print(f"[LAYER 4] Workers ready")

    async def _execute_single(self, task: WorkTask, worker_id: int) -> WorkResult:
        """Execute a single task on a worker.

        Never raises: any transport/HTTP/Ollama error is captured and
        returned as a WorkResult with success=False.
        """
        start = datetime.now()

        prompt = task.prompt
        if task.context:
            prompt = f"Context:\n{task.context}\n\nTask:\n{task.prompt}"

        try:
            async with httpx.AsyncClient(timeout=300.0) as client:
                response = await client.post(
                    f"{OLLAMA_URL}/api/generate",
                    json={
                        "model": self.model,
                        "prompt": prompt,
                        "stream": False,
                        "options": {
                            "num_predict": task.max_tokens,
                            "temperature": task.temperature
                        }
                    }
                )
                # FIX: previously a non-2xx or Ollama-level error still
                # produced success=True with an empty response string.
                response.raise_for_status()
                result = response.json()

            # Ollama reports failures as {"error": "..."} with no
            # "response" key; surface that as a failed task.
            if "error" in result:
                raise RuntimeError(result["error"])

            elapsed = int((datetime.now() - start).total_seconds() * 1000)
            return WorkResult(
                task_id=task.task_id,
                worker_id=worker_id,
                response=result.get("response", ""),
                elapsed_ms=elapsed,
                success=True
            )
        except Exception as e:
            elapsed = int((datetime.now() - start).total_seconds() * 1000)
            return WorkResult(
                task_id=task.task_id,
                worker_id=worker_id,
                response="",
                elapsed_ms=elapsed,
                success=False,
                error=str(e)
            )

    async def execute(self, task: WorkTask) -> WorkResult:
        """Execute task, respecting concurrency limits.

        The worker id is derived from the task id hash; note that
        Python's str hash is randomized per process, so ids are stable
        within a run but not across runs.
        """
        async with self._semaphore:
            worker_id = hash(task.task_id) % self.count
            return await self._execute_single(task, worker_id)

    async def execute_batch(self, tasks: List[WorkTask]) -> List[WorkResult]:
        """Execute multiple tasks in parallel up to worker count.

        Results are returned in the same order as the input tasks.
        """
        coros = [self.execute(task) for task in tasks]
        return await asyncio.gather(*coros)

    async def execute_tandem(self, task: WorkTask) -> List[WorkResult]:
        """
        Execute same task on multiple workers, return all results.
        Useful for consensus or best-of-N selection.

        Falls back to a single execution when the pool mode is not
        tandem/quad. Each clone gets a distinct task_id suffix and a
        slightly higher temperature to diversify outputs.
        """
        if self.mode not in [WorkerMode.TANDEM, WorkerMode.QUAD]:
            # Fall back to single execution
            result = await self.execute(task)
            return [result]

        # Create copies with different worker IDs
        tasks = []
        for i in range(self.count):
            t = WorkTask(
                task_id=f"{task.task_id}_w{i}",
                prompt=task.prompt,
                context=task.context,
                max_tokens=task.max_tokens,
                temperature=task.temperature + (i * 0.05)  # Slight variation
            )
            tasks.append(t)

        return await self.execute_batch(tasks)

    def status(self) -> Dict[str, Any]:
        """Return worker pool status."""
        return {
            "profile": self.profile_name,
            "model": self.model,
            "mode": self.mode.value,
            "count": self.count,
            "ready": self.workers_ready,
            "active_tasks": len(self.active_tasks)
        }


# Singleton instance
_pool: Optional[WorkerPool] = None


def get_worker_pool(profile_name: Optional[str] = None) -> WorkerPool:
    """Get or create the worker pool singleton.

    Note: profile_name only takes effect on the first call; later calls
    return the existing pool regardless of the argument.
    """
    global _pool
    if _pool is None:
        _pool = WorkerPool(profile_name)
    return _pool


async def init_workers(profile_name: Optional[str] = None):
    """Initialize and warmup worker pool."""
    pool = get_worker_pool(profile_name)
    await pool.warmup()
    return pool


# CLI test
if __name__ == "__main__":
    async def test():
        pool = await init_workers()

        # Single task test
        task = WorkTask(
            task_id="test_001",
            prompt="What is 2 + 2? Answer with just the number.",
            max_tokens=50
        )

        print("\n[TEST] Single execution:")
        result = await pool.execute(task)
        print(f" Response: {result.response}")
        print(f" Elapsed: {result.elapsed_ms}ms")

        # Tandem test (if supported)
        if pool.mode in [WorkerMode.TANDEM, WorkerMode.QUAD]:
            print("\n[TEST] Tandem execution:")
            results = await pool.execute_tandem(task)
            for r in results:
                print(f" Worker {r.worker_id}: {r.response[:50]}...")

        print("\n[STATUS]", pool.status())

    asyncio.run(test())