# Source exported from a Hugging Face upload page ("Upload 16 files", commit c5828bc);
# the page header text was replaced with this comment so the file parses as Python.
"""
BaseAgent - Recursive Cognitive Agent Framework for AGI-HEDGE-FUND
This module implements the foundational cognitive architecture for all investment agents.
Each agent inherits from this base class to enable:
- Recursive reasoning loops with customizable depth
- Transparent attribution tracing for decision provenance
- Temporal memory shell with configurable decay
- Value-weighted decision encoding
- Symbolic state representation for interpretability
Internal Note: Base class encodes the recursive interpretability interface (.p/ command patterns)
in line with Anthropic-style Circuit Tracing research while maintaining familiar investment syntax.
"""
import uuid
import datetime
from typing import Dict, List, Any, Optional, Tuple
import numpy as np
from pydantic import BaseModel, Field
from ..cognition.graph import ReasoningGraph
from ..cognition.memory import MemoryShell
from ..cognition.attribution import AttributionTracer
from ..llm.router import ModelRouter
from ..utils.diagnostics import TracingTools
class AgentSignal(BaseModel):
    """Signal generated by an agent with full attribution and confidence metrics.

    Produced by BaseAgent.attribute_signals from raw signal dicts; field
    semantics are declared inline via pydantic ``Field`` descriptions, and
    ``signal_id`` / ``timestamp`` are auto-populated per instance.
    """
    ticker: str = Field(..., description="Stock ticker symbol")
    action: str = Field(..., description="buy, sell, or hold")
    confidence: float = Field(..., description="Confidence level (0.0-1.0)")
    quantity: Optional[int] = Field(None, description="Number of shares to trade")
    reasoning: str = Field(..., description="Explicit reasoning chain")
    intent: str = Field(..., description="High-level investment intent")
    value_basis: str = Field(..., description="Core value driving this decision")
    # Causal weights produced by AttributionTracer.trace_attribution.
    attribution_trace: Dict[str, float] = Field(default_factory=dict, description="Causal attribution weights")
    # Snapshot of the agent's belief-drift vector at signal creation time.
    drift_signature: Dict[str, float] = Field(default_factory=dict, description="Belief drift metrics")
    signal_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique signal identifier")
    # NOTE(review): naive local time — confirm whether downstream expects UTC.
    timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now)
class AgentState(BaseModel):
    """Persistent agent state with temporal memory and belief dynamics.

    Mutated in place by BaseAgent.update_state and its _update_* helpers;
    a belief of 0.5 is the neutral prior used when a ticker is first seen.
    """
    working_memory: Dict[str, Any] = Field(default_factory=dict, description="Short-term memory")
    belief_state: Dict[str, float] = Field(default_factory=dict, description="Current belief distribution")
    confidence_history: List[float] = Field(default_factory=list, description="Historical confidence")
    decision_history: List[Dict[str, Any]] = Field(default_factory=list, description="Past decisions")
    performance_trace: Dict[str, float] = Field(default_factory=dict, description="Performance metrics")
    reflective_state: Dict[str, Any] = Field(default_factory=dict, description="Self-awareness metrics")
    # Per-ticker belief deltas written by _update_beliefs / _drift_observe.
    drift_vector: Dict[str, float] = Field(default_factory=dict, description="Direction of belief evolution")
    # 80/20 EMA of directional hit-rate; 1.0 means fully consistent.
    consistency_score: float = Field(default=1.0, description="Internal consistency metric")
    # NOTE(review): naive local time — confirm whether downstream expects UTC.
    last_update: datetime.datetime = Field(default_factory=datetime.datetime.now)
class BaseAgent:
"""
Base class for all investment agents in the AGI-HEDGE-FUND framework.
Implements the core cognitive architecture including:
- Recursive reasoning loops
- Memory persistence
- Attribution tracing
- Value-weighted decision making
- Symbolic state representation
"""
def __init__(
self,
name: str,
philosophy: str,
reasoning_depth: int = 3,
memory_decay: float = 0.2,
initial_capital: float = 100000.0,
model_provider: str = "anthropic",
model_name: str = "claude-3-sonnet-20240229",
trace_enabled: bool = False,
):
"""
Initialize a cognitive investment agent.
Args:
name: Agent identifier name
philosophy: Investment philosophy description
reasoning_depth: Depth of recursive reasoning (higher = deeper thinking)
memory_decay: Rate of memory deterioration (0.0-1.0)
initial_capital: Starting capital amount
model_provider: LLM provider ("anthropic", "openai", "groq", "ollama", "deepseek")
model_name: Specific model identifier
trace_enabled: Whether to generate full reasoning traces
"""
self.id = str(uuid.uuid4())
self.name = name
self.philosophy = philosophy
self.reasoning_depth = reasoning_depth
self.memory_decay = memory_decay
self.initial_capital = initial_capital
self.current_capital = initial_capital
self.trace_enabled = trace_enabled
# Initialize cognitive components
self.state = AgentState()
self.memory_shell = MemoryShell(decay_rate=memory_decay)
self.attribution_tracer = AttributionTracer()
self.llm = ModelRouter(provider=model_provider, model=model_name)
# Initialize reasoning graph
self.reasoning_graph = ReasoningGraph(
agent_name=self.name,
agent_philosophy=self.philosophy,
model_router=self.llm,
)
# Diagnostics and tracing
self.tracer = TracingTools(agent_id=self.id, agent_name=self.name)
# Internal symbolic processing commands
self._commands = {
"reflect.trace": self._reflect_trace,
"fork.signal": self._fork_signal,
"collapse.detect": self._collapse_detect,
"attribute.weight": self._attribute_weight,
"drift.observe": self._drift_observe,
}
    def process_market_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process market data through agent's cognitive lens.

        Args:
            data: Market data dictionary

        Returns:
            Processed market data with agent-specific insights

        Raises:
            NotImplementedError: always on the base class; each agent
                subclass supplies its own market interpretation.
        """
        # Each agent subclass implements its unique market interpretation
        raise NotImplementedError("Agent subclasses must implement process_market_data")
    def generate_signals(self, processed_data: Dict[str, Any]) -> List[AgentSignal]:
        """
        Generate investment signals based on processed market data.

        Args:
            processed_data: Processed market data

        Returns:
            List of investment signals with attribution

        Raises:
            NotImplementedError: always on the base class; each agent
                subclass supplies its own signal-generation logic.
        """
        # Each agent subclass implements its unique signal generation logic
        raise NotImplementedError("Agent subclasses must implement generate_signals")
    def update_state(self, market_feedback: Dict[str, Any]) -> None:
        """
        Update agent's internal state based on market feedback.

        The steps run in a fixed order: memorize the feedback, revise
        beliefs, refresh performance metrics, decay old memories, and
        finally stamp the update time.

        Args:
            market_feedback: Dictionary containing market performance data
        """
        # Update memory shell with new experiences
        self.memory_shell.add_experience(market_feedback)
        # Update belief state based on market feedback
        self._update_beliefs(market_feedback)
        # Update performance metrics
        self._update_performance_metrics(market_feedback)
        # Apply memory decay
        self.memory_shell.apply_decay()
        # Update timestamp (naive local time)
        self.state.last_update = datetime.datetime.now()
def _update_beliefs(self, market_feedback: Dict[str, Any]) -> None:
"""
Update agent's belief state based on market feedback.
Args:
market_feedback: Dictionary containing market performance data
"""
# Extract relevant signals from market feedback
if 'performance' in market_feedback:
performance = market_feedback['performance']
# Record decision outcomes
if 'decisions' in market_feedback:
for decision in market_feedback['decisions']:
self.state.decision_history.append({
'decision': decision,
'outcome': performance.get(decision.get('ticker'), 0),
'timestamp': datetime.datetime.now()
})
# Update belief state based on performance
for ticker, perf in performance.items():
current_belief = self.state.belief_state.get(ticker, 0.5)
# Belief update formula combines prior belief with new evidence
updated_belief = (current_belief * (1 - self.memory_decay) +
np.tanh(perf) * self.memory_decay)
self.state.belief_state[ticker] = updated_belief
# Track belief drift
if ticker in self.state.belief_state:
drift = updated_belief - current_belief
self.state.drift_vector[ticker] = drift
def _update_performance_metrics(self, market_feedback: Dict[str, Any]) -> None:
"""
Update agent's performance metrics based on market feedback.
Args:
market_feedback: Dictionary containing market performance data
"""
if 'portfolio_value' in market_feedback:
# Calculate return
portfolio_value = market_feedback['portfolio_value']
prior_value = self.state.performance_trace.get('portfolio_value', self.initial_capital)
returns = (portfolio_value - prior_value) / prior_value if prior_value > 0 else 0
# Update performance trace
self.state.performance_trace.update({
'portfolio_value': portfolio_value,
'returns': returns,
'cumulative_return': (portfolio_value / self.initial_capital) - 1,
'win_rate': market_feedback.get('win_rate', 0),
'sharpe_ratio': market_feedback.get('sharpe_ratio', 0),
})
# Update current capital
self.current_capital = portfolio_value
# Add to confidence history
avg_confidence = market_feedback.get('avg_confidence', 0.5)
self.state.confidence_history.append(avg_confidence)
# Update consistency score
self._update_consistency_score(market_feedback)
def _update_consistency_score(self, market_feedback: Dict[str, Any]) -> None:
"""
Update agent's internal consistency score.
Args:
market_feedback: Dictionary containing market performance data
"""
# Measure consistency between stated reasoning and actual performance
if 'decisions' in market_feedback and 'performance' in market_feedback:
performance = market_feedback['performance']
decision_consistency = []
for decision in market_feedback['decisions']:
ticker = decision.get('ticker')
expected_direction = 1 if decision.get('action') == 'buy' else -1
actual_performance = performance.get(ticker, 0)
actual_direction = 1 if actual_performance > 0 else -1
# Consistency is 1 when directions match, -1 when they don't
direction_consistency = 1 if expected_direction == actual_direction else -1
decision_consistency.append(direction_consistency)
# Update consistency score (moving average)
if decision_consistency:
avg_consistency = sum(decision_consistency) / len(decision_consistency)
current_consistency = self.state.consistency_score
self.state.consistency_score = (current_consistency * 0.8) + (avg_consistency * 0.2)
def attribute_signals(self, signals: List[Dict[str, Any]]) -> List[AgentSignal]:
"""
Add attribution traces to raw signals.
Args:
signals: Raw investment signals
Returns:
Signals with attribution traces
"""
attributed_signals = []
for signal in signals:
# Run attribution tracing
attribution = self.attribution_tracer.trace_attribution(
signal=signal,
agent_state=self.state,
reasoning_depth=self.reasoning_depth
)
# Create AgentSignal with attribution
agent_signal = AgentSignal(
ticker=signal.get('ticker', ''),
action=signal.get('action', 'hold'),
confidence=signal.get('confidence', 0.5),
quantity=signal.get('quantity'),
reasoning=signal.get('reasoning', ''),
intent=signal.get('intent', ''),
value_basis=signal.get('value_basis', ''),
attribution_trace=attribution.get('attribution_trace', {}),
drift_signature=self.state.drift_vector,
)
attributed_signals.append(agent_signal)
# Record in tracer if enabled
if self.trace_enabled:
self.tracer.record_signal(agent_signal)
return attributed_signals
def reset(self) -> None:
"""Reset agent to initial state while preserving memory decay pattern."""
# Reset capital
self.current_capital = self.initial_capital
# Reset state but preserve some learning
belief_state = self.state.belief_state.copy()
drift_vector = self.state.drift_vector.copy()
# Create new state
self.state = AgentState(
belief_state=belief_state,
drift_vector=drift_vector,
)
# Apply memory decay to preserved beliefs
for key in self.state.belief_state:
self.state.belief_state[key] *= (1 - self.memory_decay)
def get_state_report(self) -> Dict[str, Any]:
"""
Generate a detailed report of agent's current state.
Returns:
Dictionary containing agent state information
"""
return {
'agent_id': self.id,
'agent_name': self.name,
'philosophy': self.philosophy,
'reasoning_depth': self.reasoning_depth,
'capital': self.current_capital,
'belief_state': self.state.belief_state,
'performance': self.state.performance_trace,
'consistency': self.state.consistency_score,
'decision_count': len(self.state.decision_history),
'avg_confidence': np.mean(self.state.confidence_history[-10:]) if self.state.confidence_history else 0.5,
'drift_vector': self.state.drift_vector,
'memory_decay': self.memory_decay,
'timestamp': datetime.datetime.now(),
}
def save_state(self, filepath: Optional[str] = None) -> Dict[str, Any]:
"""
Save agent state to file or return serializable state.
Args:
filepath: Optional path to save state
Returns:
Dictionary containing serializable agent state
"""
state_dict = {
'agent_id': self.id,
'agent_name': self.name,
'agent_state': self.state.dict(),
'memory_shell': self.memory_shell.export_state(),
'reasoning_depth': self.reasoning_depth,
'memory_decay': self.memory_decay,
'current_capital': self.current_capital,
'initial_capital': self.initial_capital,
'timestamp': datetime.datetime.now().isoformat(),
}
if filepath:
import json
with open(filepath, 'w') as f:
json.dump(state_dict, f)
return state_dict
def load_state(self, state_dict: Dict[str, Any]) -> None:
"""
Load agent state from dictionary.
Args:
state_dict: Dictionary containing agent state
"""
if state_dict['agent_id'] != self.id:
raise ValueError(f"State ID mismatch: {state_dict['agent_id']} vs {self.id}")
# Update basic attributes
self.reasoning_depth = state_dict.get('reasoning_depth', self.reasoning_depth)
self.memory_decay = state_dict.get('memory_decay', self.memory_decay)
self.current_capital = state_dict.get('current_capital', self.current_capital)
self.initial_capital = state_dict.get('initial_capital', self.initial_capital)
# Load agent state
if 'agent_state' in state_dict:
self.state = AgentState.parse_obj(state_dict['agent_state'])
# Load memory shell
if 'memory_shell' in state_dict:
self.memory_shell.import_state(state_dict['memory_shell'])
# Internal symbolic command processors
def _reflect_trace(self, agent=None, depth=3) -> Dict[str, Any]:
"""
Trace agent's self-reflection on decision making process.
Args:
agent: Optional agent name to reflect on (self if None)
depth: Recursion depth for reflection
Returns:
Reflection trace results
"""
# If reflecting on self
if agent is None or agent == self.name:
reflection = self.reasoning_graph.run_reflection(
agent_state=self.state,
depth=depth,
trace_enabled=self.trace_enabled
)
return {
'source': 'self',
'reflection': reflection,
'depth': depth,
'confidence': reflection.get('confidence', 0.5),
'timestamp': datetime.datetime.now(),
}
# Otherwise, this is a request to reflect on another agent (handled by portfolio manager)
return {
'source': 'external',
'target': agent,
'depth': depth,
'status': 'delegated',
'timestamp': datetime.datetime.now(),
}
def _fork_signal(self, source) -> Dict[str, Any]:
"""
Fork a new signal branch from specified source.
Args:
source: Source to fork signal from
Returns:
Forked signal results
"""
if source == 'memory':
# Fork from memory
experiences = self.memory_shell.get_relevant_experiences(limit=3)
forked_signals = self.reasoning_graph.generate_from_experiences(experiences)
return {
'source': 'memory',
'signals': forked_signals,
'count': len(forked_signals),
'timestamp': datetime.datetime.now(),
}
elif source == 'beliefs':
# Fork from current beliefs
top_beliefs = dict(sorted(
self.state.belief_state.items(),
key=lambda x: abs(x[1] - 0.5),
reverse=True
)[:5])
forked_signals = self.reasoning_graph.generate_from_beliefs(top_beliefs)
return {
'source': 'beliefs',
'signals': forked_signals,
'count': len(forked_signals),
'timestamp': datetime.datetime.now(),
}
else:
# Unknown source
return {
'source': source,
'signals': [],
'count': 0,
'error': 'unknown_source',
'timestamp': datetime.datetime.now(),
}
def _collapse_detect(self, threshold=0.7, reason=None) -> Dict[str, Any]:
"""
Detect potential decision collapse or inconsistency.
Args:
threshold: Consistency threshold below which to trigger collapse detection
reason: Optional specific reason to check for collapse
Returns:
Collapse detection results
"""
# Check consistency score
consistency_collapse = self.state.consistency_score < threshold
# Check for specific collapses
collapses = {
'consistency': consistency_collapse,
'confidence': np.mean(self.state.confidence_history[-5:]) < threshold if len(self.state.confidence_history) >= 5 else False,
'belief_drift': any(abs(drift) > 0.3 for drift in self.state.drift_vector.values()),
'performance': self.state.performance_trace.get('returns', 0) < -0.1 if self.state.performance_trace else False,
}
# If specific reason provided, check only that
if reason and reason in collapses:
collapse_detected = collapses[reason]
collapse_reasons = {reason: collapses[reason]}
else:
# Check all collapses
collapse_detected = any(collapses.values())
collapse_reasons = {k: v for k, v in collapses.items() if v}
return {
'collapse_detected': collapse_detected,
'collapse_reasons': collapse_reasons,
'consistency_score': self.state.consistency_score,
'threshold': threshold,
'timestamp': datetime.datetime.now(),
}
def _attribute_weight(self, justification) -> Dict[str, Any]:
"""
Compute attribution weight for a specific justification.
Args:
justification: Justification to weight
Returns:
Attribution weight results
"""
# Extract key themes from justification
themes = self.reasoning_graph.extract_themes(justification)
# Compute alignment with agent's philosophy
philosophy_alignment = self.reasoning_graph.compute_alignment(
themes=themes,
philosophy=self.philosophy
)
# Compute weight based on alignment and consistency
weight = philosophy_alignment * self.state.consistency_score
return {
'justification': justification,
'themes': themes,
'philosophy_alignment': philosophy_alignment,
'consistency_factor': self.state.consistency_score,
'attribution_weight': weight,
'timestamp': datetime.datetime.now(),
}
def _drift_observe(self, vector, bias=0.0) -> Dict[str, Any]:
"""
Observe and record belief drift.
Args:
vector: Drift vector to observe
bias: Optional bias adjustment
Returns:
Drift observation results
"""
# Record drift observation
for key, value in vector.items():
if key in self.state.drift_vector:
# Existing key, update with bias
self.state.drift_vector[key] = (self.state.drift_vector[key] * 0.7) + (value * 0.3) + bias
else:
# New key
self.state.drift_vector[key] = value + bias
# Compute drift magnitude
drift_magnitude = np.sqrt(sum(v**2 for v in self.state.drift_vector.values()))
return {
'drift_vector': self.state.drift_vector,
'drift_magnitude': drift_magnitude,
'bias_applied': bias,
'observation_count': len(vector),
'timestamp': datetime.datetime.now(),
}
def execute_command(self, command: str, **kwargs) -> Dict[str, Any]:
"""
Execute an internal symbolic command.
Args:
command: Command to execute
**kwargs: Command parameters
Returns:
Command execution results
"""
if command in self._commands:
return self._commands[command](**kwargs)
else:
return {
'error': 'unknown_command',
'command': command,
'available_commands': list(self._commands.keys()),
'timestamp': datetime.datetime.now(),
}
def __repr__(self) -> str:
"""Generate string representation of agent."""
return f"{self.name} Agent (ID: {self.id[:8]}, Philosophy: {self.philosophy})"
def __str__(self) -> str:
"""Generate human-readable string for agent."""
return f"{self.name} Agent: {self.philosophy} (Depth: {self.reasoning_depth})"