"""
LAYER 4: WORKER MANAGER
Ghost in the Machine Labs - Harmonic Stack

Manages the worker pool based on the detected hardware profile.
Modes: single, tandem, quad
Auto-sizes from hardware_profiles.py
"""
| |
|
| | import asyncio |
| | import httpx |
| | from dataclasses import dataclass |
| | from typing import List, Dict, Any, Optional |
| | from enum import Enum |
| | from datetime import datetime |
| |
|
| | |
| | from hardware_profiles import select_profile, get_profile |
| |
|
| | OLLAMA_URL = "http://localhost:11434" |
| |
|
| |
|
| | class WorkerMode(Enum): |
| | SHARED = "shared" |
| | SINGLE = "single" |
| | TANDEM = "tandem" |
| | QUAD = "quad" |
| |
|
| |
|
| | @dataclass |
| | class WorkerConfig: |
| | """Worker configuration from hardware profile.""" |
| | model: str |
| | quant: str |
| | mode: WorkerMode |
| | count: int |
| | |
| |
|
| | @dataclass |
| | class WorkTask: |
| | """Unit of work for the worker pool.""" |
| | task_id: str |
| | prompt: str |
| | context: str = "" |
| | max_tokens: int = 1000 |
| | temperature: float = 0.7 |
| |
|
| |
|
| | @dataclass |
| | class WorkResult: |
| | """Result from worker.""" |
| | task_id: str |
| | worker_id: int |
| | response: str |
| | elapsed_ms: int |
| | success: bool |
| | error: Optional[str] = None |
| |
|
| |
|
| | class WorkerPool: |
| | """ |
| | Manages a pool of workers sized to hardware. |
| | """ |
| | |
| | def __init__(self, profile_name: str = None): |
| | """Initialize worker pool from hardware profile.""" |
| | if profile_name: |
| | self.profile_name = profile_name |
| | self.profile = get_profile(profile_name) |
| | else: |
| | self.profile_name, self.profile, self.vram = select_profile() |
| | |
| | |
| | l4 = self.profile["layer_4_worker"] |
| | self.model = l4["model"] |
| | self.quant = l4.get("quant", "Q4_K_M") |
| | self.mode = WorkerMode(l4["mode"]) |
| | self.count = l4.get("count", 1) |
| | |
| | |
| | self.workers_ready = False |
| | self.active_tasks: Dict[str, asyncio.Task] = {} |
| | self.results: Dict[str, WorkResult] = {} |
| | |
| | |
| | self._semaphore = asyncio.Semaphore(self.count) |
| | |
| | print(f"[LAYER 4] Worker Pool initialized") |
| | print(f" Profile: {self.profile_name}") |
| | print(f" Model: {self.model} @ {self.quant}") |
| | print(f" Mode: {self.mode.value} x {self.count}") |
| | |
| | async def warmup(self): |
| | """Pre-load worker model into memory.""" |
| | print(f"[LAYER 4] Warming up {self.model}...") |
| | async with httpx.AsyncClient(timeout=120.0) as client: |
| | |
| | await client.post( |
| | f"{OLLAMA_URL}/api/generate", |
| | json={ |
| | "model": self.model, |
| | "prompt": "Ready.", |
| | "stream": False, |
| | "options": {"num_predict": 1} |
| | } |
| | ) |
| | self.workers_ready = True |
| | print(f"[LAYER 4] Workers ready") |
| | |
| | async def _execute_single(self, task: WorkTask, worker_id: int) -> WorkResult: |
| | """Execute a single task on a worker.""" |
| | start = datetime.now() |
| | |
| | prompt = task.prompt |
| | if task.context: |
| | prompt = f"Context:\n{task.context}\n\nTask:\n{task.prompt}" |
| | |
| | try: |
| | async with httpx.AsyncClient(timeout=300.0) as client: |
| | response = await client.post( |
| | f"{OLLAMA_URL}/api/generate", |
| | json={ |
| | "model": self.model, |
| | "prompt": prompt, |
| | "stream": False, |
| | "options": { |
| | "num_predict": task.max_tokens, |
| | "temperature": task.temperature |
| | } |
| | } |
| | ) |
| | result = response.json() |
| | elapsed = int((datetime.now() - start).total_seconds() * 1000) |
| | |
| | return WorkResult( |
| | task_id=task.task_id, |
| | worker_id=worker_id, |
| | response=result.get("response", ""), |
| | elapsed_ms=elapsed, |
| | success=True |
| | ) |
| | except Exception as e: |
| | elapsed = int((datetime.now() - start).total_seconds() * 1000) |
| | return WorkResult( |
| | task_id=task.task_id, |
| | worker_id=worker_id, |
| | response="", |
| | elapsed_ms=elapsed, |
| | success=False, |
| | error=str(e) |
| | ) |
| | |
| | async def execute(self, task: WorkTask) -> WorkResult: |
| | """Execute task, respecting concurrency limits.""" |
| | async with self._semaphore: |
| | worker_id = hash(task.task_id) % self.count |
| | return await self._execute_single(task, worker_id) |
| | |
| | async def execute_batch(self, tasks: List[WorkTask]) -> List[WorkResult]: |
| | """Execute multiple tasks in parallel up to worker count.""" |
| | coros = [self.execute(task) for task in tasks] |
| | return await asyncio.gather(*coros) |
| | |
| | async def execute_tandem(self, task: WorkTask) -> List[WorkResult]: |
| | """ |
| | Execute same task on multiple workers, return all results. |
| | Useful for consensus or best-of-N selection. |
| | """ |
| | if self.mode not in [WorkerMode.TANDEM, WorkerMode.QUAD]: |
| | |
| | result = await self.execute(task) |
| | return [result] |
| | |
| | |
| | tasks = [] |
| | for i in range(self.count): |
| | t = WorkTask( |
| | task_id=f"{task.task_id}_w{i}", |
| | prompt=task.prompt, |
| | context=task.context, |
| | max_tokens=task.max_tokens, |
| | temperature=task.temperature + (i * 0.05) |
| | ) |
| | tasks.append(t) |
| | |
| | return await self.execute_batch(tasks) |
| | |
| | def status(self) -> Dict[str, Any]: |
| | """Return worker pool status.""" |
| | return { |
| | "profile": self.profile_name, |
| | "model": self.model, |
| | "mode": self.mode.value, |
| | "count": self.count, |
| | "ready": self.workers_ready, |
| | "active_tasks": len(self.active_tasks) |
| | } |
| |
|
| |
|
| | |
| | _pool: Optional[WorkerPool] = None |
| |
|
| |
|
| | def get_worker_pool(profile_name: str = None) -> WorkerPool: |
| | """Get or create the worker pool singleton.""" |
| | global _pool |
| | if _pool is None: |
| | _pool = WorkerPool(profile_name) |
| | return _pool |
| |
|
| |
|
| | async def init_workers(profile_name: str = None): |
| | """Initialize and warmup worker pool.""" |
| | pool = get_worker_pool(profile_name) |
| | await pool.warmup() |
| | return pool |
| |
|
| |
|
| | |
| | if __name__ == "__main__": |
| | async def test(): |
| | pool = await init_workers() |
| | |
| | |
| | task = WorkTask( |
| | task_id="test_001", |
| | prompt="What is 2 + 2? Answer with just the number.", |
| | max_tokens=50 |
| | ) |
| | |
| | print("\n[TEST] Single execution:") |
| | result = await pool.execute(task) |
| | print(f" Response: {result.response}") |
| | print(f" Elapsed: {result.elapsed_ms}ms") |
| | |
| | |
| | if pool.mode in [WorkerMode.TANDEM, WorkerMode.QUAD]: |
| | print("\n[TEST] Tandem execution:") |
| | results = await pool.execute_tandem(task) |
| | for r in results: |
| | print(f" Worker {r.worker_id}: {r.response[:50]}...") |
| | |
| | print("\n[STATUS]", pool.status()) |
| | |
| | asyncio.run(test()) |
| |
|