#!/usr/bin/env python3
"""
HARMONIC STACK ORCHESTRATOR
Ghost in the Machine Labs
A single entry point for the complete 5-layer stack.
Auto-configures every layer from the detected hardware profile.
Layers:
1. Executive - Strategic decisions
2. Operator + Directors - Task coordination
3. Ethics Council - Oversight
4. Workers - Parallel execution
5. Specialists - Domain expertise
"""
import asyncio
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
import httpx
from hardware_profiles import select_profile, get_profile, print_profile_summary
from worker_manager import WorkerPool, WorkTask, WorkResult, init_workers
from specialist_pool import SpecialistPool, SpecialistTask, SpecialistResult, SpecialistType, init_specialists
OLLAMA_URL = "http://localhost:11434"
@dataclass
class StackConfig:
"""Full stack configuration from hardware profile."""
profile_name: str
vram_gb: float
# Layer 1
executive_model: str
executive_ctx: int
# Layer 2
operator_model: str
directors: List[str]
# Layer 3
council_enabled: bool
council_model: str
council_members: int
# Layer 4
worker_model: str
worker_mode: str
worker_count: int
# Layer 5
specialists_enabled: bool
specialist_model: str
max_specialists: int
class HarmonicStack:
"""
The complete Harmonic Stack.
    Auto-configures all 5 layers from the detected hardware profile.
"""
    def __init__(self, profile_name: Optional[str] = None):
        """Resolve the hardware profile and build the stack configuration."""
        # Use the specified profile, or auto-detect one from the hardware
        if profile_name:
            self.profile_name = profile_name
            self.profile = get_profile(profile_name)
            self.vram_gb = 0.0  # unknown: profile was chosen manually
        else:
            self.profile_name, self.profile, self.vram_gb = select_profile()
# Build config
self.config = self._build_config()
# Layer instances (initialized on start)
self.worker_pool: Optional[WorkerPool] = None
self.specialist_pool: Optional[SpecialistPool] = None
self._ready = False
print("\n" + "="*60)
print("HARMONIC STACK")
print("="*60)
print_profile_summary(self.profile_name, self.profile, self.vram_gb)
def _build_config(self) -> StackConfig:
"""Extract configuration from profile."""
p = self.profile
return StackConfig(
profile_name=self.profile_name,
vram_gb=self.vram_gb,
executive_model=p["layer_1_executive"]["model"],
executive_ctx=p["layer_1_executive"]["ctx"],
operator_model=p["layer_2_operator"]["model"],
directors=p["layer_2_operator"].get("directors", []),
council_enabled=p["layer_3_council"]["enabled"],
council_model=p["layer_3_council"].get("model", ""),
council_members=p["layer_3_council"].get("members", 0),
worker_model=p["layer_4_worker"]["model"],
worker_mode=p["layer_4_worker"]["mode"],
worker_count=p["layer_4_worker"].get("count", 1),
specialists_enabled=p["layer_5_specialists"]["enabled"],
specialist_model=p["layer_5_specialists"].get("model", ""),
max_specialists=p["layer_5_specialists"].get("max_loaded", 0)
)
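    # For reference, a hypothetical profile dict of the shape _build_config
    # expects (real entries live in hardware_profiles; the models and values
    # below are illustrative only):
    #
    #   {
    #       "layer_1_executive": {"model": "llama3.1:70b", "ctx": 8192},
    #       "layer_2_operator": {"model": "llama3.1:8b", "directors": []},
    #       "layer_3_council": {"enabled": False},
    #       "layer_4_worker": {"model": "llama3.2:3b", "mode": "pool", "count": 4},
    #       "layer_5_specialists": {"enabled": True, "model": "qwen2.5-coder:7b", "max_loaded": 2},
    #   }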
    async def start(self):
        """Initialize the worker and specialist pools, then warm up the Executive."""
print("\n[STACK] Starting Harmonic Stack...")
# Layer 4: Workers
print("[STACK] Initializing Layer 4 (Workers)...")
self.worker_pool = await init_workers(self.profile_name)
# Layer 5: Specialists
print("[STACK] Initializing Layer 5 (Specialists)...")
self.specialist_pool = await init_specialists(self.profile_name)
# Warm up executive
print("[STACK] Warming up Executive...")
await self._warmup_model(self.config.executive_model)
self._ready = True
print("[STACK] Harmonic Stack ready\n")
    async def _warmup_model(self, model: str):
        """Pre-load a model into memory via a 1-token generation."""
async with httpx.AsyncClient(timeout=120.0) as client:
await client.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": model,
"prompt": "Ready.",
"stream": False,
"options": {"num_predict": 1}
}
)
    async def _query_model(self, model: str, prompt: str, max_tokens: int = 1000) -> str:
        """Query a model directly and return its text response."""
        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={
                    "model": model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"num_predict": max_tokens}
                }
            )
            response.raise_for_status()  # surface HTTP errors instead of returning ""
            return response.json().get("response", "")
async def executive_decide(self, query: str, context: str = "") -> str:
"""Layer 1: Executive strategic decision."""
prompt = f"""You are the Executive of a Harmonic Stack AI system.
Make a strategic decision on how to handle this request.
Context: {context if context else 'None provided'}
Request: {query}
Decide:
1. What type of task is this? (code/writing/analysis/research/general)
2. Does it need specialist expertise?
3. Should it go to workers for parallel processing?
4. Does it require ethics review?
Respond with your decision and reasoning.
"""
return await self._query_model(self.config.executive_model, prompt)
async def work(self, prompt: str, context: str = "") -> WorkResult:
"""Layer 4: Execute work task."""
if not self.worker_pool:
            raise RuntimeError("Stack not started; call start() first")
task = WorkTask(
task_id=f"work_{datetime.now().timestamp()}",
prompt=prompt,
context=context
)
return await self.worker_pool.execute(task)
async def work_parallel(self, prompts: List[str]) -> List[WorkResult]:
"""Layer 4: Execute multiple tasks in parallel."""
if not self.worker_pool:
            raise RuntimeError("Stack not started; call start() first")
tasks = [
WorkTask(
task_id=f"work_{i}_{datetime.now().timestamp()}",
prompt=p
)
for i, p in enumerate(prompts)
]
return await self.worker_pool.execute_batch(tasks)
async def specialist(self, specialist_type: SpecialistType, prompt: str, context: str = "") -> SpecialistResult:
"""Layer 5: Execute specialist task."""
if not self.specialist_pool:
            raise RuntimeError("Stack not started; call start() first")
task = SpecialistTask(
task_id=f"spec_{datetime.now().timestamp()}",
specialist_type=specialist_type,
prompt=prompt,
context=context
)
return await self.specialist_pool.execute(task)
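    # Example (sketch): ask the code specialist directly, bypassing the
    # Executive's routing in process():
    #
    #   res = await stack.specialist(SpecialistType.CODE, "Refactor this loop")
    #   print(res.response)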
async def process(self, query: str, context: str = "") -> Dict[str, Any]:
"""
Full stack processing:
1. Executive decides approach
2. Route to appropriate layer
3. Return result
"""
if not self._ready:
            raise RuntimeError("Stack not started; call start() first")
start = datetime.now()
# Executive decision
decision = await self.executive_decide(query, context)
        # Parse the decision (simple keyword heuristics for now)
        needs_specialist = "specialist" in decision.lower()
        # Detected but not yet routed on: single queries fall through to one
        # worker below; callers who want fan-out use work_parallel() directly.
        needs_parallel = "parallel" in decision.lower() or "worker" in decision.lower()
result = {
"query": query,
"executive_decision": decision,
"route": "unknown",
"response": "",
"elapsed_ms": 0
}
# Route based on decision
if needs_specialist:
# Determine specialist type from decision
spec_type = SpecialistType.ANALYSIS # Default
if "code" in decision.lower():
spec_type = SpecialistType.CODE
elif "writ" in decision.lower():
spec_type = SpecialistType.WRITING
elif "research" in decision.lower():
spec_type = SpecialistType.RESEARCH
spec_result = await self.specialist(spec_type, query, context)
result["route"] = f"specialist:{spec_type.value}"
result["response"] = spec_result.response
else:
# Default to worker
work_result = await self.work(query, context)
result["route"] = "worker"
result["response"] = work_result.response
result["elapsed_ms"] = int((datetime.now() - start).total_seconds() * 1000)
return result
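    # A typical return value (sketch; the text fields depend on the models):
    #
    #   {
    #       "query": "Write a Python function to reverse a string",
    #       "executive_decision": "This is a code task ... route to a specialist.",
    #       "route": "specialist:code",
    #       "response": "def reverse_string(s): ...",
    #       "elapsed_ms": 1840,
    #   }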
def status(self) -> Dict[str, Any]:
"""Return full stack status."""
return {
"profile": self.profile_name,
"ready": self._ready,
"config": {
"executive": self.config.executive_model,
"workers": f"{self.config.worker_model} x {self.config.worker_count}",
"specialists": f"{self.config.specialist_model} (max {self.config.max_specialists})" if self.config.specialists_enabled else "disabled"
},
"workers": self.worker_pool.status() if self.worker_pool else None,
"specialists": self.specialist_pool.status() if self.specialist_pool else None
}
# Convenience function
async def create_stack(profile_name: Optional[str] = None) -> HarmonicStack:
"""Create and start a Harmonic Stack."""
stack = HarmonicStack(profile_name)
await stack.start()
return stack
# CLI
if __name__ == "__main__":
async def main():
# Create stack (auto-detects hardware)
stack = await create_stack()
# Test queries
print("\n" + "="*60)
print("STACK TEST")
print("="*60)
# Test 1: Simple question
print("\n[TEST 1] Simple question:")
result = await stack.process("What is the capital of France?")
print(f" Route: {result['route']}")
print(f" Response: {result['response'][:200]}...")
print(f" Elapsed: {result['elapsed_ms']}ms")
# Test 2: Code request
print("\n[TEST 2] Code request:")
result = await stack.process("Write a Python function to reverse a string")
print(f" Route: {result['route']}")
print(f" Response: {result['response'][:300]}...")
print(f" Elapsed: {result['elapsed_ms']}ms")
# Test 3: Parallel work
print("\n[TEST 3] Parallel work:")
results = await stack.work_parallel([
"What is 2+2?",
"What is 3+3?",
"What is 4+4?"
])
for r in results:
print(f" Task {r.task_id}: {r.response[:50]}... ({r.elapsed_ms}ms)")
        print("\n[STATUS]")
        print(json.dumps(stack.status(), indent=2))
asyncio.run(main())