#!/usr/bin/env python3
"""
HARMONIC STACK ORCHESTRATOR
Ghost in the Machine Labs

The complete 5-layer stack in one entry point.
Auto-configures from hardware profile.

Layers:
  1. Executive - Strategic decisions
  2. Operator + Directors - Task coordination  
  3. Ethics Council - Oversight
  4. Workers - Parallel execution
  5. Specialists - Domain expertise
"""

import asyncio
import httpx
from dataclasses import dataclass
from typing import Dict, Any, Optional, List
from datetime import datetime

from hardware_profiles import select_profile, get_profile, print_profile_summary
from worker_manager import WorkerPool, WorkTask, WorkResult, init_workers
from specialist_pool import SpecialistPool, SpecialistTask, SpecialistResult, SpecialistType, init_specialists

OLLAMA_URL = "http://localhost:11434"  # Default local Ollama endpoint


@dataclass
class StackConfig:
    """Full stack configuration from hardware profile."""
    profile_name: str
    vram_gb: float
    
    # Layer 1
    executive_model: str
    executive_ctx: int
    
    # Layer 2
    operator_model: str
    directors: List[str]
    
    # Layer 3
    council_enabled: bool
    council_model: str
    council_members: int
    
    # Layer 4
    worker_model: str
    worker_mode: str
    worker_count: int
    
    # Layer 5
    specialists_enabled: bool
    specialist_model: str
    max_specialists: int


class HarmonicStack:
    """
    The complete Harmonic Stack.
    Auto-configures all 5 layers from hardware profile.
    """
    
    def __init__(self, profile_name: Optional[str] = None):
        """Initialize all layers from hardware profile."""
        # Detect or use specified profile
        if profile_name:
            self.profile_name = profile_name
            self.profile = get_profile(profile_name)
            self.vram_gb = 0.0  # VRAM unknown when a profile is chosen manually
        else:
            self.profile_name, self.profile, self.vram_gb = select_profile()
        
        # Build config
        self.config = self._build_config()
        
        # Layer instances (initialized on start)
        self.worker_pool: Optional[WorkerPool] = None
        self.specialist_pool: Optional[SpecialistPool] = None
        
        self._ready = False
        
        print("\n" + "="*60)
        print("HARMONIC STACK")
        print("="*60)
        print_profile_summary(self.profile_name, self.profile, self.vram_gb)
    
    def _build_config(self) -> StackConfig:
        """Extract configuration from profile."""
        p = self.profile
        return StackConfig(
            profile_name=self.profile_name,
            vram_gb=self.vram_gb,
            executive_model=p["layer_1_executive"]["model"],
            executive_ctx=p["layer_1_executive"]["ctx"],
            operator_model=p["layer_2_operator"]["model"],
            directors=p["layer_2_operator"].get("directors", []),
            council_enabled=p["layer_3_council"]["enabled"],
            council_model=p["layer_3_council"].get("model", ""),
            council_members=p["layer_3_council"].get("members", 0),
            worker_model=p["layer_4_worker"]["model"],
            worker_mode=p["layer_4_worker"]["mode"],
            worker_count=p["layer_4_worker"].get("count", 1),
            specialists_enabled=p["layer_5_specialists"]["enabled"],
            specialist_model=p["layer_5_specialists"].get("model", ""),
            max_specialists=p["layer_5_specialists"].get("max_loaded", 0)
        )
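
    # For reference, the profile dict consumed above is assumed to look
    # roughly like this sketch (keys inferred from the lookups; the
    # authoritative schema lives in hardware_profiles.py, and the values
    # shown are placeholders, not real defaults):
    #
    #   {
    #       "layer_1_executive":   {"model": "...", "ctx": 8192},
    #       "layer_2_operator":    {"model": "...", "directors": ["...", "..."]},
    #       "layer_3_council":     {"enabled": True, "model": "...", "members": 3},
    #       "layer_4_worker":      {"model": "...", "mode": "...", "count": 4},
    #       "layer_5_specialists": {"enabled": True, "model": "...", "max_loaded": 2},
    #   }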
    
    async def start(self):
        """Initialize and warm up all layers."""
        print("\n[STACK] Starting Harmonic Stack...")
        
        # Layer 4: Workers
        print("[STACK] Initializing Layer 4 (Workers)...")
        self.worker_pool = await init_workers(self.profile_name)
        
        # Layer 5: Specialists
        print("[STACK] Initializing Layer 5 (Specialists)...")
        self.specialist_pool = await init_specialists(self.profile_name)
        
        # Warm up executive
        print("[STACK] Warming up Executive...")
        await self._warmup_model(self.config.executive_model)
        
        self._ready = True
        print("[STACK] Harmonic Stack ready\n")
    
    async def _warmup_model(self, model: str):
        """Pre-load a model by forcing a minimal one-token generation."""
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={
                    "model": model,
                    "prompt": "Ready.",
                    "stream": False,
                    "options": {"num_predict": 1}
                }
            )
            response.raise_for_status()  # Surface load failures instead of ignoring them
    
    async def _query_model(self, model: str, prompt: str, max_tokens: int = 1000) -> str:
        """Query a model directly via the Ollama generate API."""
        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.post(
                f"{OLLAMA_URL}/api/generate",
                json={
                    "model": model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {"num_predict": max_tokens}
                }
            )
            response.raise_for_status()
            # Non-streaming responses carry the generated text under "response"
            return response.json().get("response", "")
    
    async def executive_decide(self, query: str, context: str = "") -> str:
        """Layer 1: Executive strategic decision."""
        prompt = f"""You are the Executive of a Harmonic Stack AI system.
Make a strategic decision on how to handle this request.

Context: {context if context else 'None provided'}

Request: {query}

Decide:
1. What type of task is this? (code/writing/analysis/research/general)
2. Does it need specialist expertise?
3. Should it go to workers for parallel processing?
4. Does it require ethics review?

Respond with your decision and reasoning.
"""
        return await self._query_model(self.config.executive_model, prompt)
    
    async def work(self, prompt: str, context: str = "") -> WorkResult:
        """Layer 4: Execute work task."""
        if not self.worker_pool:
            raise RuntimeError("Stack not started")
        
        task = WorkTask(
            task_id=f"work_{datetime.now().timestamp()}",
            prompt=prompt,
            context=context
        )
        return await self.worker_pool.execute(task)
    
    async def work_parallel(self, prompts: List[str]) -> List[WorkResult]:
        """Layer 4: Execute multiple tasks in parallel."""
        if not self.worker_pool:
            raise RuntimeError("Stack not started")
        
        tasks = [
            WorkTask(
                task_id=f"work_{i}_{datetime.now().timestamp()}",
                prompt=p
            )
            for i, p in enumerate(prompts)
        ]
        return await self.worker_pool.execute_batch(tasks)
    
    async def specialist(self, specialist_type: SpecialistType, prompt: str, context: str = "") -> SpecialistResult:
        """Layer 5: Execute specialist task."""
        if not self.specialist_pool:
            raise RuntimeError("Stack not started")
        
        task = SpecialistTask(
            task_id=f"spec_{datetime.now().timestamp()}",
            specialist_type=specialist_type,
            prompt=prompt,
            context=context
        )
        return await self.specialist_pool.execute(task)
    
    async def process(self, query: str, context: str = "") -> Dict[str, Any]:
        """
        Full stack processing:
        1. Executive decides the approach
        2. Route to the appropriate layer (specialist or worker)
        3. Return the result

        Note: Layers 2 (Operator/Directors) and 3 (Council) are configured
        but not yet routed through here.
        """
        if not self._ready:
            raise RuntimeError("Stack not started")
        
        start = datetime.now()
        
        # Executive decision
        decision = await self.executive_decide(query, context)
        
        # Parse decision (simple keyword heuristics for now)
        needs_specialist = "specialist" in decision.lower()
        # Detected but not yet acted on: non-specialist tasks currently fall
        # through to a single worker below rather than a parallel batch.
        needs_parallel = "parallel" in decision.lower() or "worker" in decision.lower()
        
        result = {
            "query": query,
            "executive_decision": decision,
            "route": "unknown",
            "response": "",
            "elapsed_ms": 0
        }
        
        # Route based on decision
        if needs_specialist:
            # Determine specialist type from decision
            spec_type = SpecialistType.ANALYSIS  # Default
            if "code" in decision.lower():
                spec_type = SpecialistType.CODE
            elif "writ" in decision.lower():
                spec_type = SpecialistType.WRITING
            elif "research" in decision.lower():
                spec_type = SpecialistType.RESEARCH
            
            spec_result = await self.specialist(spec_type, query, context)
            result["route"] = f"specialist:{spec_type.value}"
            result["response"] = spec_result.response
        else:
            # Default to worker
            work_result = await self.work(query, context)
            result["route"] = "worker"
            result["response"] = work_result.response
        
        result["elapsed_ms"] = int((datetime.now() - start).total_seconds() * 1000)
        return result
    
    def status(self) -> Dict[str, Any]:
        """Return full stack status."""
        return {
            "profile": self.profile_name,
            "ready": self._ready,
            "config": {
                "executive": self.config.executive_model,
                "workers": f"{self.config.worker_model} x {self.config.worker_count}",
                "specialists": f"{self.config.specialist_model} (max {self.config.max_specialists})" if self.config.specialists_enabled else "disabled"
            },
            "workers": self.worker_pool.status() if self.worker_pool else None,
            "specialists": self.specialist_pool.status() if self.specialist_pool else None
        }


# Convenience function
async def create_stack(profile_name: Optional[str] = None) -> HarmonicStack:
    """Create and start a Harmonic Stack."""
    stack = HarmonicStack(profile_name)
    await stack.start()
    return stack


# CLI
if __name__ == "__main__":
    async def main():
        # Create stack (auto-detects hardware)
        stack = await create_stack()
        
        # Test queries
        print("\n" + "="*60)
        print("STACK TEST")
        print("="*60)
        
        # Test 1: Simple question
        print("\n[TEST 1] Simple question:")
        result = await stack.process("What is the capital of France?")
        print(f"  Route: {result['route']}")
        print(f"  Response: {result['response'][:200]}...")
        print(f"  Elapsed: {result['elapsed_ms']}ms")
        
        # Test 2: Code request
        print("\n[TEST 2] Code request:")
        result = await stack.process("Write a Python function to reverse a string")
        print(f"  Route: {result['route']}")
        print(f"  Response: {result['response'][:300]}...")
        print(f"  Elapsed: {result['elapsed_ms']}ms")
        
        # Test 3: Parallel work
        print("\n[TEST 3] Parallel work:")
        results = await stack.work_parallel([
            "What is 2+2?",
            "What is 3+3?",
            "What is 4+4?"
        ])
        for r in results:
            print(f"  Task {r.task_id}: {r.response[:50]}... ({r.elapsed_ms}ms)")
        
        print("\n[STATUS]")
        import json
        print(json.dumps(stack.status(), indent=2))
    
    asyncio.run(main())