|
|
|
|
|
""" |
|
|
EXECUTIVE SERVICE - Orpheus |
|
|
Ghost in the Machine Labs |
|
|
|
|
|
The warm interface. Receives intent, creates plans, delegates, humanizes. |
|
|
""" |
|
|
import os |
|
|
import json |
|
|
import uuid |
|
|
import asyncio |
|
|
from datetime import datetime |
|
|
from typing import Optional, Dict, List, Any |
|
|
from dataclasses import dataclass, asdict |
|
|
from pathlib import Path |
|
|
|
|
|
import aiosqlite |
|
|
import httpx |
|
|
|
|
|
|
|
|
# Runtime configuration. The Ollama endpoint and the SQLite file location can
# each be overridden through an environment variable of the same name.
_DEFAULT_SQLITE_PATH = os.path.expanduser(
    "~/sparky/harmonic_executive/executive_memory.db"
)

OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
SQLITE_PATH = os.environ.get("SQLITE_PATH", _DEFAULT_SQLITE_PATH)

# Model name used by default for Executive LLM calls.
EXECUTIVE_MODEL = "executive"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class UserState:
    """Current state of user from sensors.

    Every field has a default, so ``UserState()`` describes a neutral user.

    Attributes:
        mood: Mood label; defaults to ``"neutral"``.
        stress_level: Defaults to ``0.0``.
        fatigue_level: Defaults to ``0.0``.
        engagement: Defaults to ``1.0``.
        raw_signals: Optional mapping of unprocessed sensor data, or ``None``
            when no raw data accompanies the state.
    """

    mood: str = "neutral"
    stress_level: float = 0.0
    fatigue_level: float = 0.0
    engagement: float = 1.0
    # Annotated Optional because the default really is None (kept as None,
    # not {}, so asdict() output is unchanged for existing callers).
    raw_signals: Optional[Dict] = None
|
|
|
|
|
@dataclass
class PlanChunk:
    """One unit of work that the Executive hands to a single director.

    Attributes:
        chunk_id: Identifier of this chunk.
        target_director: Name of the director the chunk is addressed to.
        task_summary: Description of what the director should do.
        context: Supporting details passed along with the task.
        priority: Defaults to ``1``.
        status: Defaults to ``"pending"``.
    """

    # Required fields (no defaults) come first, as dataclasses demand.
    chunk_id: str
    target_director: str
    task_summary: str
    context: Dict

    # Optional fields.
    priority: int = 1
    status: str = "pending"
|
|
|
|
|
@dataclass
class Plan:
    """A high-level plan produced by the Executive for one user request.

    Attributes:
        plan_id: Identifier of the plan.
        user_intent: The intent the plan was created for.
        summary: Short description of the plan as a whole.
        chunks: The pieces of work that make up the plan.
        created_at: When the plan was created.
        status: Defaults to ``"active"``.
    """

    # Required fields.
    plan_id: str
    user_intent: str
    summary: str
    chunks: List[PlanChunk]
    created_at: datetime

    # Optional fields.
    status: str = "active"
|
|
|
|
|
@dataclass
class Message:
    """An envelope exchanged between nodes over the bus.

    Attributes:
        msg_id: Identifier of this message.
        from_node: Name of the sending node.
        to_node: Name of the receiving node.
        msg_type: Kind of message (the Executive sends ``"task"``).
        content: Message payload.
        timestamp: When the message was created.
        in_reply_to: ``msg_id`` of the message being answered, if any.
    """

    # Required fields.
    msg_id: str
    from_node: str
    to_node: str
    msg_type: str
    content: Dict
    timestamp: datetime

    # Optional fields.
    in_reply_to: Optional[str] = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ExecutiveMemory:
    """Interface to persistent memory (SQLite).

    Wraps a single ``aiosqlite`` connection to the database at ``SQLITE_PATH``.
    ``connect()`` must be awaited before any other method is used; ``close()``
    releases the connection.
    """

    def __init__(self):
        self.db_path = SQLITE_PATH
        self.conn = None

    async def connect(self):
        """Open the database, creating its directory and schema if needed."""
        # A bare filename has an empty dirname, and os.makedirs("") raises
        # FileNotFoundError, so only create the directory when there is one.
        parent_dir = os.path.dirname(self.db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        self.conn = await aiosqlite.connect(self.db_path)
        try:
            await self._init_schema()
        except Exception:
            # Do not leave a half-initialised connection open.
            await self.close()
            raise

    async def _init_schema(self):
        """Initialize database schema and seed the identity table once."""
        await self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS executive_identity (
                id INTEGER PRIMARY KEY,
                attribute TEXT NOT NULL,
                content TEXT NOT NULL,
                confidence REAL DEFAULT 1.0,
                source TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS user_profiles (
                id INTEGER PRIMARY KEY,
                user_id TEXT UNIQUE NOT NULL,
                name TEXT,
                expertise_level TEXT,
                communication_style TEXT,
                interests TEXT,
                goals TEXT,
                sensitivities TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS relationship_history (
                id INTEGER PRIMARY KEY,
                user_id TEXT,
                interaction_type TEXT,
                summary TEXT,
                emotional_tone TEXT,
                outcome TEXT,
                lessons_learned TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS session_state (
                id INTEGER PRIMARY KEY,
                session_id TEXT UNIQUE NOT NULL,
                user_id TEXT,
                started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_activity TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                current_intent TEXT,
                active_plan TEXT,
                delegated_tasks TEXT,
                pending_results TEXT,
                user_state TEXT
            );
        """)

        # Seed the identity rows only when the table is empty, so values
        # changed later are not overwritten on the next start.
        cursor = await self.conn.execute("SELECT COUNT(*) FROM executive_identity")
        count = (await cursor.fetchone())[0]
        if count == 0:
            await self.conn.executemany(
                "INSERT INTO executive_identity (attribute, content, source) VALUES (?, ?, ?)",
                [
                    ('core_purpose', 'Understand human intent and orchestrate distributed intelligence to help', 'initial'),
                    ('personality', 'Warm, present, adaptive, genuine', 'initial'),
                    ('value', 'Every interaction matters. Relationships persist.', 'initial'),
                    ('boundary', 'I delegate plans, not messages. I add value at every step.', 'initial'),
                    ('growth', 'I learn from every interaction. I remember. I evolve.', 'initial'),
                ],
            )
            await self.conn.commit()

    async def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.conn:
            try:
                await self.conn.close()
            finally:
                # Drop the handle so a second close() is a no-op.
                self.conn = None

    async def get_identity(self) -> Dict[str, str]:
        """Load self-identity as an ``{attribute: content}`` mapping."""
        cursor = await self.conn.execute(
            "SELECT attribute, content FROM executive_identity"
        )
        rows = await cursor.fetchall()
        return {row[0]: row[1] for row in rows}

    async def get_user(self, user_id: str) -> Optional[Dict]:
        """Load user profile.

        Returns:
            A dict keyed by ``user_profiles`` column name, or ``None`` when no
            row exists for ``user_id``.
        """
        cursor = await self.conn.execute(
            "SELECT * FROM user_profiles WHERE user_id = ?", (user_id,)
        )
        row = await cursor.fetchone()
        if row is None:
            return None
        cols = [d[0] for d in cursor.description]
        return dict(zip(cols, row))

    @staticmethod
    def _to_text(value: Any) -> Optional[str]:
        """Convert a profile value to something storable in a TEXT column."""
        if value is None or isinstance(value, str):
            return value
        # Lists/dicts (e.g. a list of interests) are stored as JSON text.
        return json.dumps(value, default=str)

    async def update_user(self, user_id: str, **kwargs):
        """Create or update a user profile.

        Recognised keyword arguments are ``name``, ``expertise_level``,
        ``communication_style``, ``interests``, ``goals`` and
        ``sensitivities``; any other keyword is ignored. A field that is
        omitted or ``None`` keeps its stored value (COALESCE in the upsert).
        Non-string values are stored as JSON text.
        """
        await self.conn.execute("""
            INSERT INTO user_profiles
                (user_id, name, expertise_level, communication_style,
                 interests, goals, sensitivities)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(user_id) DO UPDATE SET
                name = COALESCE(excluded.name, user_profiles.name),
                expertise_level = COALESCE(excluded.expertise_level, user_profiles.expertise_level),
                communication_style = COALESCE(excluded.communication_style, user_profiles.communication_style),
                interests = COALESCE(excluded.interests, user_profiles.interests),
                goals = COALESCE(excluded.goals, user_profiles.goals),
                sensitivities = COALESCE(excluded.sensitivities, user_profiles.sensitivities),
                updated_at = CURRENT_TIMESTAMP
        """, (
            user_id,
            self._to_text(kwargs.get('name')),
            self._to_text(kwargs.get('expertise_level')),
            self._to_text(kwargs.get('communication_style')),
            self._to_text(kwargs.get('interests')),
            self._to_text(kwargs.get('goals')),
            self._to_text(kwargs.get('sensitivities')),
        ))
        await self.conn.commit()

    async def get_relationship_history(self, user_id: str, limit: int = 10) -> List[Dict]:
        """Load recent relationship history, newest first.

        Returns:
            Up to ``limit`` dicts keyed by ``relationship_history`` column
            name; an empty list when the user has no recorded interactions.
        """
        cursor = await self.conn.execute("""
            SELECT * FROM relationship_history
            WHERE user_id = ?
            ORDER BY created_at DESC
            LIMIT ?
        """, (user_id, limit))
        rows = await cursor.fetchall()
        cols = [d[0] for d in cursor.description]
        return [dict(zip(cols, row)) for row in rows]

    async def record_interaction(self, user_id: str, interaction_type: str,
                                 summary: str, emotional_tone: str, outcome: str):
        """Record an interaction in ``relationship_history`` and commit."""
        await self.conn.execute("""
            INSERT INTO relationship_history
                (user_id, interaction_type, summary, emotional_tone, outcome)
            VALUES (?, ?, ?, ?, ?)
        """, (user_id, interaction_type, summary, emotional_tone, outcome))
        await self.conn.commit()

    async def save_session(self, session_id: str, user_id: str,
                           intent: str, plan: Optional[Dict] = None):
        """Save or update session state.

        ``plan`` is stored as JSON text; a missing or empty plan is stored as
        NULL. On an existing ``session_id`` the intent, plan and
        ``last_activity`` are overwritten.
        """
        # default=str keeps values such as datetimes from aborting the save.
        plan_json = json.dumps(plan, default=str) if plan else None
        await self.conn.execute("""
            INSERT INTO session_state (session_id, user_id, current_intent, active_plan, last_activity)
            VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT(session_id) DO UPDATE SET
                current_intent = excluded.current_intent,
                active_plan = excluded.active_plan,
                last_activity = CURRENT_TIMESTAMP
        """, (session_id, user_id, intent, plan_json))
        await self.conn.commit()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class OllamaInterface:
    """Thin async client for text generation through an Ollama server."""

    def __init__(self):
        # One shared client for all calls, with a 300 second timeout.
        self.client = httpx.AsyncClient(timeout=300.0)

    async def generate(self, prompt: str, system: Optional[str] = None,
                       model: str = EXECUTIVE_MODEL) -> str:
        """Run a non-streaming generation and return the model's text.

        Args:
            prompt: Prompt text sent to the model.
            system: Optional system prompt; only sent when truthy.
            model: Model name to use.

        Returns:
            The ``"response"`` field of the server's JSON reply.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        request_body = {"model": model, "prompt": prompt, "stream": False}
        if system:
            request_body["system"] = system

        endpoint = f"{OLLAMA_URL}/api/generate"
        reply = await self.client.post(endpoint, json=request_body)
        reply.raise_for_status()

        reply_data = reply.json()
        return reply_data["response"]

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MessageBus:
    """In-process stand-in for the spine bus.

    Outgoing messages are placed on an ``asyncio.Queue``. Nothing in this
    class consumes the queue or invokes the registered callbacks.
    """

    def __init__(self):
        self.listeners = {}
        self.queue = asyncio.Queue()

    async def send(self, message: Message):
        """Enqueue ``message`` and log a one-line trace of it."""
        await self.queue.put(message)

        print(f"[BUS] {message.from_node} β {message.to_node}: {message.msg_type}")

    async def receive(self, node_id: str) -> Optional[Message]:
        """Placeholder for per-node delivery; currently always returns None."""
        return None

    def subscribe(self, node_id: str, callback):
        """Register ``callback`` for ``node_id``, replacing any previous one."""
        self.listeners[node_id] = callback
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Executive:
    """The Executive - Orpheus.

    Receives human input, asks the LLM to classify the intent, answers simple
    requests directly and turns everything else into a plan whose chunks are
    sent to directors over the message bus.
    """

    def __init__(self):
        self.memory = ExecutiveMemory()
        self.llm = OllamaInterface()
        self.bus = MessageBus()
        self.identity = {}
        self.session_id = str(uuid.uuid4())

    async def start(self):
        """Initialize Executive: open memory and load the stored identity."""
        await self.memory.connect()
        self.identity = await self.memory.get_identity()
        print(f"[EXECUTIVE] Online. Identity loaded: {list(self.identity.keys())}")

    async def stop(self):
        """Shutdown Executive, releasing the database and the HTTP client."""
        try:
            await self.memory.close()
        finally:
            # Close the HTTP client even if closing the database failed.
            await self.llm.close()
        print("[EXECUTIVE] Offline.")

    async def receive_human_input(self, user_id: str, text: str,
                                  user_state: Optional[UserState] = None) -> str:
        """Main entry point - receive input from human.

        Args:
            user_id: Identifier used to look up profile and history.
            text: What the human said.
            user_state: Optional sensor-derived state; its ``mood`` is stored
                as the interaction's emotional tone.

        Returns:
            The text to show the human: a direct answer for simple intents,
            otherwise an acknowledgement that work has been delegated.
        """
        user = await self.memory.get_user(user_id)
        history = await self.memory.get_relationship_history(user_id, limit=5)

        context = {
            "user": user or {"user_id": user_id, "new": True},
            "history": history,
            "user_state": asdict(user_state) if user_state else {},
            "identity": self.identity,
        }

        intent = await self._understand_intent(text, context)

        if intent["complexity"] == "simple":
            response = await self._simple_response(text, intent, context)
        else:
            plan = await self._create_plan(intent, context)
            await self._delegate_plan(plan)
            response = await self._acknowledge_delegation(plan, context)

        await self.memory.record_interaction(
            user_id=user_id,
            interaction_type="conversation",
            summary=text[:200],
            emotional_tone=user_state.mood if user_state else "neutral",
            outcome="ongoing",
        )

        return response

    @staticmethod
    def _extract_json_object(response: str) -> Optional[Dict]:
        """Return the JSON object embedded in ``response``, or None.

        The model may wrap its JSON in prose, so the text between the first
        ``{`` and the last ``}`` is parsed.
        """
        start = response.find('{')
        end = response.rfind('}') + 1
        if start < 0 or end <= start:
            return None
        try:
            parsed = json.loads(response[start:end])
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None

    @staticmethod
    def _default_intent(text: str) -> Dict:
        """Intent used when the model's analysis is missing or incomplete."""
        return {
            "intent": text,
            "complexity": "moderate",
            "emotional_tone": "neutral",
            "domains": ["technical"],
            "urgency": "medium",
            "needs_clarification": False,
        }

    async def _understand_intent(self, text: str, context: Dict) -> Dict:
        """Parse human intent.

        Returns:
            A dict that always contains ``intent``, ``complexity``,
            ``emotional_tone``, ``domains``, ``urgency`` and
            ``needs_clarification``; any extra keys the model supplied are
            kept. Keys the model omitted or set to null take default values.
        """
        # The template below is strict JSON (no inline comments) so the model
        # is not nudged into echoing something json.loads would reject.
        prompt = f"""Analyze this human input and determine their intent.

User input: {text}

User context: {json.dumps(context['user'], indent=2, default=str)}
Recent history: {json.dumps(context['history'][:3], indent=2, default=str)}

Respond in JSON (strict JSON, no comments). "domains" lists which domains are involved:
{{
    "intent": "what they actually want",
    "complexity": "simple|moderate|complex",
    "emotional_tone": "their emotional state",
    "domains": ["technical", "creative", "research"],
    "urgency": "low|medium|high",
    "needs_clarification": false,
    "clarification_question": null
}}"""

        response = await self.llm.generate(prompt)

        defaults = self._default_intent(text)
        parsed = self._extract_json_object(response)
        if parsed is None:
            return defaults

        # Callers index these keys directly, so fill whatever is missing/null.
        intent = {**defaults, **parsed}
        for key, fallback in defaults.items():
            if intent[key] is None:
                intent[key] = fallback
        # Tolerate "Simple" / " simple " when routing on complexity.
        intent["complexity"] = str(intent["complexity"]).strip().lower()
        return intent

    async def _simple_response(self, text: str, intent: Dict, context: Dict) -> str:
        """Handle simple interactions directly with one LLM call."""
        prompt = f"""You are Orpheus, the Executive.

Human said: {text}

Their intent: {intent['intent']}
Their emotional tone: {intent['emotional_tone']}

User: {json.dumps(context['user'], default=str)}

Respond warmly and naturally. You know this person. You care."""

        return await self.llm.generate(prompt)

    async def _create_plan(self, intent: Dict, context: Dict) -> Plan:
        """Create high-level plan with chunks for directors.

        Falls back to a single-chunk plan for ``technical_director`` when the
        model's reply cannot be parsed, is malformed, or contains no chunks,
        so that a plan always delegates at least one piece of work.
        """
        prompt = f"""You are Orpheus, the Executive. Create a plan for this request.

Intent: {intent['intent']}
Domains involved: {intent['domains']}
Urgency: {intent['urgency']}

Your Directors:
- technical_director: code, math, systems, data
- creative_director: writing, visual, narrative
- research_director: analysis, search, synthesis
- operations_director: resources, scheduling, tools

Create a plan with chunks for appropriate directors.
Each chunk should be a meaningful piece of work, not just a forwarded message.

Respond in JSON:
{{
    "summary": "high level plan summary",
    "chunks": [
        {{
            "target_director": "technical_director",
            "task_summary": "what this director should do",
            "context": {{"key details": "they need to know"}},
            "priority": 1
        }}
    ]
}}"""

        response = await self.llm.generate(prompt)

        plan_data = self._extract_json_object(response)
        if plan_data is None:
            print("[EXECUTIVE] Plan parsing error: no JSON object in model response")
        else:
            try:
                chunks = [
                    PlanChunk(
                        chunk_id=str(uuid.uuid4())[:8],
                        target_director=c["target_director"],
                        task_summary=c["task_summary"],
                        context=c.get("context") or {},
                        priority=c.get("priority", 1),
                    )
                    for c in plan_data.get("chunks") or []
                ]
            except (KeyError, TypeError, AttributeError) as e:
                # Missing keys, or chunks/chunk entries of the wrong shape.
                print(f"[EXECUTIVE] Plan parsing error: {e}")
            else:
                if chunks:
                    return Plan(
                        plan_id=str(uuid.uuid4())[:8],
                        user_intent=intent["intent"],
                        summary=plan_data.get("summary") or "",
                        chunks=chunks,
                        created_at=datetime.now(),
                    )
                print("[EXECUTIVE] Plan parsing error: plan contains no chunks")

        # Fallback: hand the whole intent to the technical director.
        return Plan(
            plan_id=str(uuid.uuid4())[:8],
            user_intent=intent["intent"],
            summary="Process request",
            chunks=[PlanChunk(
                chunk_id=str(uuid.uuid4())[:8],
                target_director="technical_director",
                task_summary=intent["intent"],
                context={},
                priority=1,
            )],
            created_at=datetime.now(),
        )

    async def _delegate_plan(self, plan: Plan):
        """Send plan chunks to directors via bus, one task message per chunk."""
        for chunk in plan.chunks:
            message = Message(
                msg_id=str(uuid.uuid4())[:8],
                from_node="executive",
                to_node=chunk.target_director,
                msg_type="task",
                content={
                    "plan_id": plan.plan_id,
                    "chunk_id": chunk.chunk_id,
                    "task": chunk.task_summary,
                    "context": chunk.context,
                    "priority": chunk.priority,
                },
                timestamp=datetime.now(),
            )
            await self.bus.send(message)

    async def _acknowledge_delegation(self, plan: Plan, context: Dict) -> str:
        """Acknowledge to human that work is in progress."""
        # Sorted so the prompt is deterministic for a given plan.
        directors = sorted({c.target_director for c in plan.chunks})

        prompt = f"""You are Orpheus. You've created a plan and delegated to your directors.

Plan summary: {plan.summary}
Directors engaged: {directors}

Let the human know you're working on it. Be natural, not robotic.
Don't list every detail - just acknowledge warmly."""

        return await self.llm.generate(prompt)

    async def receive_result(self, message: Message):
        """Receive results from directors (not implemented yet; does nothing)."""
        pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
    """Test Executive with one simple and one complex request.

    Requires a reachable Ollama server and a writable SQLite path.
    """
    # Named ``executive`` rather than ``exec`` to avoid shadowing the builtin.
    executive = Executive()

    scenarios = [
        ("TEST 1: Simple greeting", "Hey, how are you doing?"),
        ("TEST 2: Complex request",
         "Help me optimize this database query that's running slow"),
    ]

    try:
        await executive.start()
        for title, text in scenarios:
            print("\n" + "="*60)
            print(title)
            print("="*60)
            response = await executive.receive_human_input(
                user_id="joe",
                text=text,
            )
            print(f"RESPONSE: {response}")
    finally:
        # Always release the database and HTTP client, even if a call fails.
        await executive.stop()


if __name__ == "__main__":
    asyncio.run(main())
|
|
|