#!/usr/bin/env python3
"""
HARMONIC STACK ORCHESTRATOR
Ghost in the Machine Labs

The complete 5-layer stack in one entry point.
Auto-configures from hardware profile.

Layers:
1. Executive - Strategic decisions
2. Operator + Directors - Task coordination
3. Ethics Council - Oversight
4. Workers - Parallel execution
5. Specialists - Domain expertise
"""

import asyncio
import httpx
from dataclasses import dataclass
from typing import Dict, Any, Optional, List
from datetime import datetime

from hardware_profiles import select_profile, get_profile, print_profile_summary
from worker_manager import WorkerPool, WorkTask, WorkResult, init_workers
from specialist_pool import SpecialistPool, SpecialistTask, SpecialistResult, SpecialistType, init_specialists

OLLAMA_URL = "http://localhost:11434"


@dataclass
class StackConfig:
    """Full stack configuration from hardware profile."""
    profile_name: str
    vram_gb: float
    # Layer 1
    executive_model: str
    executive_ctx: int
    # Layer 2
    operator_model: str
    directors: List[str]
    # Layer 3
    council_enabled: bool
    council_model: str
    council_members: int
    # Layer 4
    worker_model: str
    worker_mode: str
    worker_count: int
    # Layer 5
    specialists_enabled: bool
    specialist_model: str
    max_specialists: int


class HarmonicStack:
    """
    The complete Harmonic Stack.
    Auto-configures all 5 layers from hardware profile.
    """

    def __init__(self, profile_name: Optional[str] = None):
        """Initialize all layers from hardware profile."""
        # Detect hardware, or use the explicitly requested profile
        if profile_name:
            self.profile_name = profile_name
            self.profile = get_profile(profile_name)
            self.vram_gb = 0.0  # Manual override; VRAM is not probed
        else:
            self.profile_name, self.profile, self.vram_gb = select_profile()

        # Build config
        self.config = self._build_config()

        # Layer instances (initialized on start)
        self.worker_pool: Optional[WorkerPool] = None
        self.specialist_pool: Optional[SpecialistPool] = None
        self._ready = False

        print("\n" + "=" * 60)
        print("HARMONIC STACK")
        print("=" * 60)
        print_profile_summary(self.profile_name, self.profile, self.vram_gb)

    def _build_config(self) -> StackConfig:
        """Extract configuration from profile."""
        p = self.profile
        return StackConfig(
            profile_name=self.profile_name,
            vram_gb=self.vram_gb,
            executive_model=p["layer_1_executive"]["model"],
            executive_ctx=p["layer_1_executive"]["ctx"],
            operator_model=p["layer_2_operator"]["model"],
            directors=p["layer_2_operator"].get("directors", []),
            council_enabled=p["layer_3_council"]["enabled"],
            council_model=p["layer_3_council"].get("model", ""),
            council_members=p["layer_3_council"].get("members", 0),
            worker_model=p["layer_4_worker"]["model"],
            worker_mode=p["layer_4_worker"]["mode"],
            worker_count=p["layer_4_worker"].get("count", 1),
            specialists_enabled=p["layer_5_specialists"]["enabled"],
            specialist_model=p["layer_5_specialists"].get("model", ""),
            max_specialists=p["layer_5_specialists"].get("max_loaded", 0),
        )
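
    # A minimal sketch of the profile dict _build_config() expects; the model
    # names below are placeholders, not real entries from hardware_profiles.py:
    #
    #     {
    #         "layer_1_executive": {"model": "<executive-model>", "ctx": 8192},
    #         "layer_2_operator": {"model": "<operator-model>", "directors": []},
    #         "layer_3_council": {"enabled": False},
    #         "layer_4_worker": {"model": "<worker-model>", "mode": "solo", "count": 1},
    #         "layer_5_specialists": {"enabled": False},
    #     }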

    async def start(self):
        """Initialize and warm up all layers."""
        print("\n[STACK] Starting Harmonic Stack...")

        # Layer 4: Workers
        print("[STACK] Initializing Layer 4 (Workers)...")
        self.worker_pool = await init_workers(self.profile_name)

        # Layer 5: Specialists
        print("[STACK] Initializing Layer 5 (Specialists)...")
        self.specialist_pool = await init_specialists(self.profile_name)

        # Warm up executive
        print("[STACK] Warming up Executive...")
        await self._warmup_model(self.config.executive_model)

        self._ready = True
        print("[STACK] Harmonic Stack ready\n")

    async def _warmup_model(self, model: str):
        """Pre-load a model."""
        async with httpx.AsyncClient(timeout=120.0) as client:
            await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={
                    "model": model,
                    "prompt": "Ready.",
                    "stream": False,
                    "options": {"num_predict": 1},
                },
            )
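
    # Both _warmup_model() and _query_model() hit Ollama's non-streaming
    # /api/generate endpoint, which returns a JSON body whose "response"
    # field carries the completion. The warmup requests a single token
    # ("num_predict": 1) purely to force the model weights into memory.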

    async def _query_model(self, model: str, prompt: str, max_tokens: int = 1000) -> str:
        """Query a model directly."""
        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={
                    "model": model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"num_predict": max_tokens},
                },
            )
            response.raise_for_status()  # surface HTTP errors instead of silently returning ""
            return response.json().get("response", "")

    async def executive_decide(self, query: str, context: str = "") -> str:
        """Layer 1: Executive strategic decision."""
        prompt = f"""You are the Executive of a Harmonic Stack AI system.
Make a strategic decision on how to handle this request.

Context: {context if context else 'None provided'}

Request: {query}

Decide:
1. What type of task is this? (code/writing/analysis/research/general)
2. Does it need specialist expertise?
3. Should it go to workers for parallel processing?
4. Does it require ethics review?

Respond with your decision and reasoning.
"""
        return await self._query_model(self.config.executive_model, prompt)

    async def work(self, prompt: str, context: str = "") -> WorkResult:
        """Layer 4: Execute work task."""
        if not self.worker_pool:
            raise RuntimeError("Stack not started")
        task = WorkTask(
            task_id=f"work_{datetime.now().timestamp()}",
            prompt=prompt,
            context=context,
        )
        return await self.worker_pool.execute(task)

    async def work_parallel(self, prompts: List[str]) -> List[WorkResult]:
        """Layer 4: Execute multiple tasks in parallel."""
        if not self.worker_pool:
            raise RuntimeError("Stack not started")
        tasks = [
            WorkTask(
                task_id=f"work_{i}_{datetime.now().timestamp()}",
                prompt=p,
            )
            for i, p in enumerate(prompts)
        ]
        return await self.worker_pool.execute_batch(tasks)

    async def specialist(self, specialist_type: SpecialistType, prompt: str, context: str = "") -> SpecialistResult:
        """Layer 5: Execute specialist task."""
        if not self.specialist_pool:
            raise RuntimeError("Stack not started")
        task = SpecialistTask(
            task_id=f"spec_{datetime.now().timestamp()}",
            specialist_type=specialist_type,
            prompt=prompt,
            context=context,
        )
        return await self.specialist_pool.execute(task)
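
    # Direct layer-access sketch (the SpecialistType members CODE, WRITING,
    # RESEARCH, and ANALYSIS are the ones this file references; the full enum
    # lives in specialist_pool.py):
    #
    #     res = await stack.specialist(SpecialistType.CODE, "Refactor this loop")
    #     print(res.response)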

    async def process(self, query: str, context: str = "") -> Dict[str, Any]:
        """
        Full stack processing:
        1. Executive decides approach
        2. Route to appropriate layer
        3. Return result
        """
        if not self._ready:
            raise RuntimeError("Stack not started")
        start = datetime.now()

        # Executive decision
        decision = await self.executive_decide(query, context)

        # Parse decision (simple keyword heuristics for now)
        needs_specialist = "specialist" in decision.lower()
        # Detected but not yet used for routing; reserved for future splitting
        # of a query into parallel worker tasks
        needs_parallel = "parallel" in decision.lower() or "worker" in decision.lower()

        result = {
            "query": query,
            "executive_decision": decision,
            "route": "unknown",
            "response": "",
            "elapsed_ms": 0,
        }

        # Route based on decision
        if needs_specialist:
            # Determine specialist type from decision
            spec_type = SpecialistType.ANALYSIS  # Default
            if "code" in decision.lower():
                spec_type = SpecialistType.CODE
            elif "writ" in decision.lower():
                spec_type = SpecialistType.WRITING
            elif "research" in decision.lower():
                spec_type = SpecialistType.RESEARCH
            spec_result = await self.specialist(spec_type, query, context)
            result["route"] = f"specialist:{spec_type.value}"
            result["response"] = spec_result.response
        else:
            # Default to worker
            work_result = await self.work(query, context)
            result["route"] = "worker"
            result["response"] = work_result.response

        result["elapsed_ms"] = int((datetime.now() - start).total_seconds() * 1000)
        return result
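
    # Routing heuristic sketch: a decision like "This is a code task; send it
    # to a specialist" matches both "specialist" and "code", so the query goes
    # to SpecialistType.CODE. A decision that never mentions "specialist"
    # falls through to the worker pool, whatever else it says.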

    def status(self) -> Dict[str, Any]:
        """Return full stack status."""
        return {
            "profile": self.profile_name,
            "ready": self._ready,
            "config": {
                "executive": self.config.executive_model,
                "workers": f"{self.config.worker_model} x {self.config.worker_count}",
                "specialists": (
                    f"{self.config.specialist_model} (max {self.config.max_specialists})"
                    if self.config.specialists_enabled
                    else "disabled"
                ),
            },
            "workers": self.worker_pool.status() if self.worker_pool else None,
            "specialists": self.specialist_pool.status() if self.specialist_pool else None,
        }


# Convenience function
async def create_stack(profile_name: Optional[str] = None) -> HarmonicStack:
    """Create and start a Harmonic Stack."""
    stack = HarmonicStack(profile_name)
    await stack.start()
    return stack


# CLI
if __name__ == "__main__":
    import json

    async def main():
        # Create stack (auto-detects hardware)
        stack = await create_stack()

        # Test queries
        print("\n" + "=" * 60)
        print("STACK TEST")
        print("=" * 60)

        # Test 1: Simple question
        print("\n[TEST 1] Simple question:")
        result = await stack.process("What is the capital of France?")
        print(f" Route: {result['route']}")
        print(f" Response: {result['response'][:200]}...")
        print(f" Elapsed: {result['elapsed_ms']}ms")

        # Test 2: Code request
        print("\n[TEST 2] Code request:")
        result = await stack.process("Write a Python function to reverse a string")
        print(f" Route: {result['route']}")
        print(f" Response: {result['response'][:300]}...")
        print(f" Elapsed: {result['elapsed_ms']}ms")

        # Test 3: Parallel work
        print("\n[TEST 3] Parallel work:")
        results = await stack.work_parallel([
            "What is 2+2?",
            "What is 3+3?",
            "What is 4+4?",
        ])
        for r in results:
            print(f" Task {r.task_id}: {r.response[:50]}... ({r.elapsed_ms}ms)")

        print("\n[STATUS]")
        print(json.dumps(stack.status(), indent=2))

    asyncio.run(main())