#!/usr/bin/env python3
"""
HARMONIC STACK ORCHESTRATOR
Ghost in the Machine Labs

The complete 5-layer stack in one entry point.
Auto-configures from hardware profile.

Layers:
1. Executive - Strategic decisions
2. Operator + Directors - Task coordination
3. Ethics Council - Oversight
4. Workers - Parallel execution
5. Specialists - Domain expertise
"""

import asyncio
import httpx
from dataclasses import dataclass
from typing import Dict, Any, Optional, List
from datetime import datetime

from hardware_profiles import select_profile, get_profile, print_profile_summary
from worker_manager import WorkerPool, WorkTask, WorkResult, init_workers
from specialist_pool import SpecialistPool, SpecialistTask, SpecialistResult, SpecialistType, init_specialists

# Base URL of the local Ollama server every layer talks to.
OLLAMA_URL = "http://localhost:11434"


@dataclass
class StackConfig:
    """Full stack configuration from hardware profile."""
    profile_name: str
    vram_gb: float
    # Layer 1
    executive_model: str
    executive_ctx: int
    # Layer 2
    operator_model: str
    directors: List[str]
    # Layer 3
    council_enabled: bool
    council_model: str
    council_members: int
    # Layer 4
    worker_model: str
    worker_mode: str
    worker_count: int
    # Layer 5
    specialists_enabled: bool
    specialist_model: str
    max_specialists: int


class HarmonicStack:
    """
    The complete Harmonic Stack.

    Auto-configures all 5 layers from hardware profile.
    """

    def __init__(self, profile_name: Optional[str] = None):
        """Initialize all layers from hardware profile.

        Args:
            profile_name: Explicit hardware profile to use. When None,
                the profile (and available VRAM) is auto-detected.
        """
        # Detect or use specified profile
        if profile_name:
            self.profile_name = profile_name
            self.profile = get_profile(profile_name)
            # Manual override: VRAM is not probed for an explicit profile.
            self.vram_gb = 0
        else:
            self.profile_name, self.profile, self.vram_gb = select_profile()

        # Build config
        self.config = self._build_config()

        # Layer instances (initialized on start)
        self.worker_pool: Optional[WorkerPool] = None
        self.specialist_pool: Optional[SpecialistPool] = None
        self._ready = False

        print("\n" + "="*60)
        print("HARMONIC STACK")
        print("="*60)
        print_profile_summary(self.profile_name, self.profile, self.vram_gb)

    def _build_config(self) -> StackConfig:
        """Extract configuration from profile.

        Required keys (model/ctx/mode/enabled) raise KeyError when missing;
        optional keys fall back to empty/zero defaults.
        """
        p = self.profile
        return StackConfig(
            profile_name=self.profile_name,
            vram_gb=self.vram_gb,
            executive_model=p["layer_1_executive"]["model"],
            executive_ctx=p["layer_1_executive"]["ctx"],
            operator_model=p["layer_2_operator"]["model"],
            directors=p["layer_2_operator"].get("directors", []),
            council_enabled=p["layer_3_council"]["enabled"],
            council_model=p["layer_3_council"].get("model", ""),
            council_members=p["layer_3_council"].get("members", 0),
            worker_model=p["layer_4_worker"]["model"],
            worker_mode=p["layer_4_worker"]["mode"],
            worker_count=p["layer_4_worker"].get("count", 1),
            specialists_enabled=p["layer_5_specialists"]["enabled"],
            specialist_model=p["layer_5_specialists"].get("model", ""),
            max_specialists=p["layer_5_specialists"].get("max_loaded", 0)
        )

    async def start(self):
        """Initialize and warm up all layers."""
        print("\n[STACK] Starting Harmonic Stack...")

        # Layer 4: Workers
        print("[STACK] Initializing Layer 4 (Workers)...")
        self.worker_pool = await init_workers(self.profile_name)

        # Layer 5: Specialists
        print("[STACK] Initializing Layer 5 (Specialists)...")
        self.specialist_pool = await init_specialists(self.profile_name)

        # Warm up executive
        print("[STACK] Warming up Executive...")
        await self._warmup_model(self.config.executive_model)

        self._ready = True
        print("[STACK] Harmonic Stack ready\n")

    async def _warmup_model(self, model: str):
        """Pre-load a model by issuing a 1-token generation request.

        Raises:
            httpx.HTTPStatusError: If Ollama reports an error
                (e.g. the model is not pulled).
        """
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={
                    "model": model,
                    "prompt": "Ready.",
                    "stream": False,
                    "options": {"num_predict": 1}
                }
            )
            # FIX: the response was previously discarded, so a failed warmup
            # (missing model, server error) passed silently.
            response.raise_for_status()

    async def _query_model(self, model: str, prompt: str, max_tokens: int = 1000) -> str:
        """Query a model directly and return its text response.

        Raises:
            httpx.HTTPStatusError: If Ollama returns a non-2xx response.
        """
        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={
                    "model": model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"num_predict": max_tokens}
                }
            )
            # FIX: surface HTTP errors instead of silently returning "" when
            # the error body has no "response" key.
            response.raise_for_status()
            return response.json().get("response", "")

    async def executive_decide(self, query: str, context: str = "") -> str:
        """Layer 1: Executive strategic decision."""
        prompt = f"""You are the Executive of a Harmonic Stack AI system.
Make a strategic decision on how to handle this request.

Context: {context if context else 'None provided'}

Request: {query}

Decide:
1. What type of task is this? (code/writing/analysis/research/general)
2. Does it need specialist expertise?
3. Should it go to workers for parallel processing?
4. Does it require ethics review?

Respond with your decision and reasoning.
"""
        return await self._query_model(self.config.executive_model, prompt)

    async def work(self, prompt: str, context: str = "") -> WorkResult:
        """Layer 4: Execute work task.

        Raises:
            RuntimeError: If called before start().
        """
        if not self.worker_pool:
            raise RuntimeError("Stack not started")

        task = WorkTask(
            task_id=f"work_{datetime.now().timestamp()}",
            prompt=prompt,
            context=context
        )
        return await self.worker_pool.execute(task)

    async def work_parallel(self, prompts: List[str]) -> List[WorkResult]:
        """Layer 4: Execute multiple tasks in parallel.

        Raises:
            RuntimeError: If called before start().
        """
        if not self.worker_pool:
            raise RuntimeError("Stack not started")

        # Index prefix keeps task IDs unique even when timestamps collide.
        tasks = [
            WorkTask(
                task_id=f"work_{i}_{datetime.now().timestamp()}",
                prompt=p
            )
            for i, p in enumerate(prompts)
        ]
        return await self.worker_pool.execute_batch(tasks)

    async def specialist(self, specialist_type: SpecialistType, prompt: str,
                         context: str = "") -> SpecialistResult:
        """Layer 5: Execute specialist task.

        Raises:
            RuntimeError: If called before start().
        """
        if not self.specialist_pool:
            raise RuntimeError("Stack not started")

        task = SpecialistTask(
            task_id=f"spec_{datetime.now().timestamp()}",
            specialist_type=specialist_type,
            prompt=prompt,
            context=context
        )
        return await self.specialist_pool.execute(task)

    async def process(self, query: str, context: str = "") -> Dict[str, Any]:
        """
        Full stack processing:
        1. Executive decides approach
        2. Route to appropriate layer
        3. Return result
        """
        if not self._ready:
            raise RuntimeError("Stack not started")

        start = datetime.now()

        # Executive decision
        decision = await self.executive_decide(query, context)

        # Parse decision (simple keyword heuristics for now).
        # FIX: removed the unused `needs_parallel` flag — parallel routing is
        # not implemented for single queries; use work_parallel() directly.
        needs_specialist = "specialist" in decision.lower()

        result = {
            "query": query,
            "executive_decision": decision,
            "route": "unknown",
            "response": "",
            "elapsed_ms": 0
        }

        # Route based on decision
        if needs_specialist:
            # Determine specialist type from decision
            spec_type = SpecialistType.ANALYSIS  # Default
            if "code" in decision.lower():
                spec_type = SpecialistType.CODE
            elif "writ" in decision.lower():
                spec_type = SpecialistType.WRITING
            elif "research" in decision.lower():
                spec_type = SpecialistType.RESEARCH

            spec_result = await self.specialist(spec_type, query, context)
            result["route"] = f"specialist:{spec_type.value}"
            result["response"] = spec_result.response
        else:
            # Default to worker
            work_result = await self.work(query, context)
            result["route"] = "worker"
            result["response"] = work_result.response

        result["elapsed_ms"] = int((datetime.now() - start).total_seconds() * 1000)
        return result

    def status(self) -> Dict[str, Any]:
        """Return full stack status."""
        return {
            "profile": self.profile_name,
            "ready": self._ready,
            "config": {
                "executive": self.config.executive_model,
                "workers": f"{self.config.worker_model} x {self.config.worker_count}",
                "specialists": f"{self.config.specialist_model} (max {self.config.max_specialists})"
                               if self.config.specialists_enabled else "disabled"
            },
            "workers": self.worker_pool.status() if self.worker_pool else None,
            "specialists": self.specialist_pool.status() if self.specialist_pool else None
        }


# Convenience function
async def create_stack(profile_name: Optional[str] = None) -> HarmonicStack:
    """Create and start a Harmonic Stack."""
    stack = HarmonicStack(profile_name)
    await stack.start()
    return stack


# CLI
if __name__ == "__main__":
    async def main():
        # Create stack (auto-detects hardware)
        stack = await create_stack()

        # Test queries
        print("\n" + "="*60)
        print("STACK TEST")
        print("="*60)

        # Test 1: Simple question
        print("\n[TEST 1] Simple question:")
        result = await stack.process("What is the capital of France?")
        print(f"  Route: {result['route']}")
        print(f"  Response: {result['response'][:200]}...")
        print(f"  Elapsed: {result['elapsed_ms']}ms")

        # Test 2: Code request
        print("\n[TEST 2] Code request:")
        result = await stack.process("Write a Python function to reverse a string")
        print(f"  Route: {result['route']}")
        print(f"  Response: {result['response'][:300]}...")
        print(f"  Elapsed: {result['elapsed_ms']}ms")

        # Test 3: Parallel work
        print("\n[TEST 3] Parallel work:")
        results = await stack.work_parallel([
            "What is 2+2?",
            "What is 3+3?",
            "What is 4+4?"
        ])
        for r in results:
            print(f"  Task {r.task_id}: {r.response[:50]}... ({r.elapsed_ms}ms)")

        print("\n[STATUS]")
        import json
        print(json.dumps(stack.status(), indent=2))

    asyncio.run(main())