# LovingGraceTech's picture
# Add release: worker_manager.py
# 8ff1d57 verified
#!/usr/bin/env python3
"""
LAYER 4: WORKER MANAGER
Ghost in the Machine Labs - Harmonic Stack
Manages worker pool based on hardware profile.
Modes: single, tandem, quad
Auto-sizes from hardware_profiles.py
"""
import asyncio
import httpx
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum
from datetime import datetime
# Import hardware profile
from hardware_profiles import select_profile, get_profile
OLLAMA_URL = "http://localhost:11434"
class WorkerMode(Enum):
    """How the pool schedules its workers, as declared by the hardware profile."""

    SHARED = "shared"    # one model instance shared via context switching
    SINGLE = "single"    # one dedicated worker
    TANDEM = "tandem"    # two workers running in parallel
    QUAD = "quad"        # four workers running in parallel
@dataclass
class WorkerConfig:
    """Static worker settings pulled from a hardware profile."""

    model: str        # Ollama model name
    quant: str        # quantization tag, e.g. "Q4_K_M"
    mode: WorkerMode  # pool scheduling strategy
    count: int        # number of concurrent workers
@dataclass
class WorkTask:
    """One unit of work submitted to the pool."""

    task_id: str            # caller-chosen identifier, echoed back in WorkResult
    prompt: str             # text sent to the model
    context: str = ""       # optional context prepended to the prompt
    max_tokens: int = 1000  # generation cap (Ollama num_predict)
    temperature: float = 0.7
@dataclass
class WorkResult:
    """Outcome of one task execution."""

    task_id: str                 # id of the originating WorkTask
    worker_id: int               # label of the worker that ran it
    response: str                # model output ("" on failure)
    elapsed_ms: int              # wall-clock duration in milliseconds
    success: bool                # False when an error was captured
    error: Optional[str] = None  # stringified exception, if any
class WorkerPool:
    """
    Manages a pool of Ollama-backed workers sized to the hardware profile.

    Concurrency is capped at the configured worker count via a semaphore.
    Per-request failures — including non-2xx HTTP responses — are captured
    in WorkResult.error (success=False) instead of being raised.
    """

    def __init__(self, profile_name: Optional[str] = None):
        """Initialize worker pool from hardware profile.

        Args:
            profile_name: explicit profile to load; when None the profile
                is auto-selected from detected hardware.
        """
        if profile_name:
            self.profile_name = profile_name
            self.profile = get_profile(profile_name)
            # VRAM is only detected on the auto-select path; keep the
            # attribute present in both branches so callers can read it.
            self.vram = None
        else:
            self.profile_name, self.profile, self.vram = select_profile()

        # Extract Layer 4 config
        l4 = self.profile["layer_4_worker"]
        self.model = l4["model"]
        self.quant = l4.get("quant", "Q4_K_M")
        self.mode = WorkerMode(l4["mode"])
        self.count = l4.get("count", 1)

        # Worker state
        self.workers_ready = False
        self.active_tasks: Dict[str, asyncio.Task] = {}
        self.results: Dict[str, WorkResult] = {}

        # Caps in-flight requests at the configured worker count.
        # NOTE(review): on Python < 3.10 a Semaphore binds to the loop at
        # construction time — construct the pool inside the event loop there.
        self._semaphore = asyncio.Semaphore(self.count)

        print(f"[LAYER 4] Worker Pool initialized")
        print(f" Profile: {self.profile_name}")
        print(f" Model: {self.model} @ {self.quant}")
        print(f" Mode: {self.mode.value} x {self.count}")

    async def warmup(self):
        """Pre-load the worker model into memory.

        Raises:
            httpx.HTTPStatusError: if Ollama rejects the load request, so
                workers_ready is only set after a confirmed successful load.
        """
        print(f"[LAYER 4] Warming up {self.model}...")
        async with httpx.AsyncClient(timeout=120.0) as client:
            # 1-token generation forces Ollama to load the model.
            response = await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={
                    "model": self.model,
                    "prompt": "Ready.",
                    "stream": False,
                    "options": {"num_predict": 1}
                }
            )
            # BUGFIX: previously an HTTP error still marked the pool ready.
            response.raise_for_status()
        self.workers_ready = True
        print(f"[LAYER 4] Workers ready")

    async def _execute_single(self, task: WorkTask, worker_id: int) -> WorkResult:
        """Execute a single task on a worker.

        Transport and HTTP errors are converted to a failed WorkResult
        (success=False, error set) rather than raised.
        """
        start = datetime.now()

        # Prepend optional context so the model sees it before the task.
        prompt = task.prompt
        if task.context:
            prompt = f"Context:\n{task.context}\n\nTask:\n{task.prompt}"

        try:
            async with httpx.AsyncClient(timeout=300.0) as client:
                response = await client.post(
                    f"{OLLAMA_URL}/api/generate",
                    json={
                        "model": self.model,
                        "prompt": prompt,
                        "stream": False,
                        "options": {
                            "num_predict": task.max_tokens,
                            "temperature": task.temperature
                        }
                    }
                )
                # BUGFIX: a non-2xx response used to be reported as
                # success=True with an empty response body.
                response.raise_for_status()
                result = response.json()

            elapsed = int((datetime.now() - start).total_seconds() * 1000)
            return WorkResult(
                task_id=task.task_id,
                worker_id=worker_id,
                response=result.get("response", ""),
                elapsed_ms=elapsed,
                success=True
            )
        except Exception as e:
            elapsed = int((datetime.now() - start).total_seconds() * 1000)
            return WorkResult(
                task_id=task.task_id,
                worker_id=worker_id,
                response="",
                elapsed_ms=elapsed,
                success=False,
                error=str(e)
            )

    async def execute(self, task: WorkTask) -> WorkResult:
        """Execute one task, waiting while all workers are busy."""
        async with self._semaphore:
            # worker_id is a reporting label only; hash() is salted per
            # process, so ids are stable within a run but not across runs.
            worker_id = hash(task.task_id) % self.count
            return await self._execute_single(task, worker_id)

    async def execute_batch(self, tasks: List[WorkTask]) -> List[WorkResult]:
        """Execute multiple tasks in parallel up to worker count."""
        coros = [self.execute(task) for task in tasks]
        return await asyncio.gather(*coros)

    async def execute_tandem(self, task: WorkTask) -> List[WorkResult]:
        """
        Execute same task on multiple workers, return all results.
        Useful for consensus or best-of-N selection.
        """
        if self.mode not in (WorkerMode.TANDEM, WorkerMode.QUAD):
            # Fall back to single execution
            result = await self.execute(task)
            return [result]

        # Clone the task per worker with slightly varied temperature so
        # the N results are not identical.
        tasks = [
            WorkTask(
                task_id=f"{task.task_id}_w{i}",
                prompt=task.prompt,
                context=task.context,
                max_tokens=task.max_tokens,
                temperature=task.temperature + (i * 0.05)
            )
            for i in range(self.count)
        ]
        return await self.execute_batch(tasks)

    def status(self) -> Dict[str, Any]:
        """Return worker pool status."""
        # NOTE(review): active_tasks is never populated by this class, so
        # the reported count is currently always 0 — confirm intent.
        return {
            "profile": self.profile_name,
            "model": self.model,
            "mode": self.mode.value,
            "count": self.count,
            "ready": self.workers_ready,
            "active_tasks": len(self.active_tasks)
        }
# Module-level singleton, created lazily on first request.
_pool: Optional[WorkerPool] = None


def get_worker_pool(profile_name: str = None) -> WorkerPool:
    """Return the process-wide WorkerPool, creating it on first use."""
    global _pool
    if _pool is not None:
        return _pool
    _pool = WorkerPool(profile_name)
    return _pool
async def init_workers(profile_name: str = None):
    """Create (or fetch) the singleton pool and pre-load its model."""
    worker_pool = get_worker_pool(profile_name)
    await worker_pool.warmup()
    return worker_pool
# CLI smoke test: exercise single, tandem, and status paths.
if __name__ == "__main__":

    async def _smoke_test():
        pool = await init_workers()

        # Single task test
        task = WorkTask(
            task_id="test_001",
            prompt="What is 2 + 2? Answer with just the number.",
            max_tokens=50,
        )

        print("\n[TEST] Single execution:")
        result = await pool.execute(task)
        print(f" Response: {result.response}")
        print(f" Elapsed: {result.elapsed_ms}ms")

        # Tandem test (if supported)
        if pool.mode in (WorkerMode.TANDEM, WorkerMode.QUAD):
            print("\n[TEST] Tandem execution:")
            for r in await pool.execute_tandem(task):
                print(f" Worker {r.worker_id}: {r.response[:50]}...")

        print("\n[STATUS]", pool.status())

    asyncio.run(_smoke_test())