| | """ |
| | BaseAgent - Recursive Cognitive Agent Framework for AGI-HEDGE-FUND |
| | |
| | This module implements the foundational cognitive architecture for all investment agents. |
| | Each agent inherits from this base class to enable: |
| | - Recursive reasoning loops with customizable depth |
| | - Transparent attribution tracing for decision provenance |
| | - Temporal memory shell with configurable decay |
| | - Value-weighted decision encoding |
| | - Symbolic state representation for interpretability |
| | |
| | Internal Note: Base class encodes the recursive interpretability interface (.p/ command patterns) |
| | in line with Anthropic-style Circuit Tracing research while maintaining familiar investment syntax. |
| | """ |
| |
|
| | import uuid |
| | import datetime |
| | from typing import Dict, List, Any, Optional, Tuple |
| | import numpy as np |
| | from pydantic import BaseModel, Field |
| |
|
| | from ..cognition.graph import ReasoningGraph |
| | from ..cognition.memory import MemoryShell |
| | from ..cognition.attribution import AttributionTracer |
| | from ..llm.router import ModelRouter |
| | from ..utils.diagnostics import TracingTools |
| |
|
| |
|
| | class AgentSignal(BaseModel): |
| | """Signal generated by an agent with full attribution and confidence metrics.""" |
| | |
| | ticker: str = Field(..., description="Stock ticker symbol") |
| | action: str = Field(..., description="buy, sell, or hold") |
| | confidence: float = Field(..., description="Confidence level (0.0-1.0)") |
| | quantity: Optional[int] = Field(None, description="Number of shares to trade") |
| | reasoning: str = Field(..., description="Explicit reasoning chain") |
| | intent: str = Field(..., description="High-level investment intent") |
| | value_basis: str = Field(..., description="Core value driving this decision") |
| | attribution_trace: Dict[str, float] = Field(default_factory=dict, description="Causal attribution weights") |
| | drift_signature: Dict[str, float] = Field(default_factory=dict, description="Belief drift metrics") |
| | signal_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique signal identifier") |
| | timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now) |
| |
|
| |
|
| | class AgentState(BaseModel): |
| | """Persistent agent state with temporal memory and belief dynamics.""" |
| | |
| | working_memory: Dict[str, Any] = Field(default_factory=dict, description="Short-term memory") |
| | belief_state: Dict[str, float] = Field(default_factory=dict, description="Current belief distribution") |
| | confidence_history: List[float] = Field(default_factory=list, description="Historical confidence") |
| | decision_history: List[Dict[str, Any]] = Field(default_factory=list, description="Past decisions") |
| | performance_trace: Dict[str, float] = Field(default_factory=dict, description="Performance metrics") |
| | reflective_state: Dict[str, Any] = Field(default_factory=dict, description="Self-awareness metrics") |
| | drift_vector: Dict[str, float] = Field(default_factory=dict, description="Direction of belief evolution") |
| | consistency_score: float = Field(default=1.0, description="Internal consistency metric") |
| | last_update: datetime.datetime = Field(default_factory=datetime.datetime.now) |
| |
|
| |
|
| | class BaseAgent: |
| | """ |
| | Base class for all investment agents in the AGI-HEDGE-FUND framework. |
| | |
| | Implements the core cognitive architecture including: |
| | - Recursive reasoning loops |
| | - Memory persistence |
| | - Attribution tracing |
| | - Value-weighted decision making |
| | - Symbolic state representation |
| | """ |
| | |
| | def __init__( |
| | self, |
| | name: str, |
| | philosophy: str, |
| | reasoning_depth: int = 3, |
| | memory_decay: float = 0.2, |
| | initial_capital: float = 100000.0, |
| | model_provider: str = "anthropic", |
| | model_name: str = "claude-3-sonnet-20240229", |
| | trace_enabled: bool = False, |
| | ): |
| | """ |
| | Initialize a cognitive investment agent. |
| | |
| | Args: |
| | name: Agent identifier name |
| | philosophy: Investment philosophy description |
| | reasoning_depth: Depth of recursive reasoning (higher = deeper thinking) |
| | memory_decay: Rate of memory deterioration (0.0-1.0) |
| | initial_capital: Starting capital amount |
| | model_provider: LLM provider ("anthropic", "openai", "groq", "ollama", "deepseek") |
| | model_name: Specific model identifier |
| | trace_enabled: Whether to generate full reasoning traces |
| | """ |
| | self.id = str(uuid.uuid4()) |
| | self.name = name |
| | self.philosophy = philosophy |
| | self.reasoning_depth = reasoning_depth |
| | self.memory_decay = memory_decay |
| | self.initial_capital = initial_capital |
| | self.current_capital = initial_capital |
| | self.trace_enabled = trace_enabled |
| | |
| | |
| | self.state = AgentState() |
| | self.memory_shell = MemoryShell(decay_rate=memory_decay) |
| | self.attribution_tracer = AttributionTracer() |
| | self.llm = ModelRouter(provider=model_provider, model=model_name) |
| | |
| | |
| | self.reasoning_graph = ReasoningGraph( |
| | agent_name=self.name, |
| | agent_philosophy=self.philosophy, |
| | model_router=self.llm, |
| | ) |
| | |
| | |
| | self.tracer = TracingTools(agent_id=self.id, agent_name=self.name) |
| | |
| | |
| | self._commands = { |
| | "reflect.trace": self._reflect_trace, |
| | "fork.signal": self._fork_signal, |
| | "collapse.detect": self._collapse_detect, |
| | "attribute.weight": self._attribute_weight, |
| | "drift.observe": self._drift_observe, |
| | } |
| |
|
| | def process_market_data(self, data: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Process market data through agent's cognitive lens. |
| | |
| | Args: |
| | data: Market data dictionary |
| | |
| | Returns: |
| | Processed market data with agent-specific insights |
| | """ |
| | |
| | raise NotImplementedError("Agent subclasses must implement process_market_data") |
| | |
| | def generate_signals(self, processed_data: Dict[str, Any]) -> List[AgentSignal]: |
| | """ |
| | Generate investment signals based on processed market data. |
| | |
| | Args: |
| | processed_data: Processed market data |
| | |
| | Returns: |
| | List of investment signals with attribution |
| | """ |
| | |
| | raise NotImplementedError("Agent subclasses must implement generate_signals") |
| | |
| | def update_state(self, market_feedback: Dict[str, Any]) -> None: |
| | """ |
| | Update agent's internal state based on market feedback. |
| | |
| | Args: |
| | market_feedback: Dictionary containing market performance data |
| | """ |
| | |
| | self.memory_shell.add_experience(market_feedback) |
| | |
| | |
| | self._update_beliefs(market_feedback) |
| | |
| | |
| | self._update_performance_metrics(market_feedback) |
| | |
| | |
| | self.memory_shell.apply_decay() |
| | |
| | |
| | self.state.last_update = datetime.datetime.now() |
| | |
| | def _update_beliefs(self, market_feedback: Dict[str, Any]) -> None: |
| | """ |
| | Update agent's belief state based on market feedback. |
| | |
| | Args: |
| | market_feedback: Dictionary containing market performance data |
| | """ |
| | |
| | if 'performance' in market_feedback: |
| | performance = market_feedback['performance'] |
| | |
| | |
| | if 'decisions' in market_feedback: |
| | for decision in market_feedback['decisions']: |
| | self.state.decision_history.append({ |
| | 'decision': decision, |
| | 'outcome': performance.get(decision.get('ticker'), 0), |
| | 'timestamp': datetime.datetime.now() |
| | }) |
| | |
| | |
| | for ticker, perf in performance.items(): |
| | current_belief = self.state.belief_state.get(ticker, 0.5) |
| | |
| | updated_belief = (current_belief * (1 - self.memory_decay) + |
| | np.tanh(perf) * self.memory_decay) |
| | self.state.belief_state[ticker] = updated_belief |
| | |
| | |
| | if ticker in self.state.belief_state: |
| | drift = updated_belief - current_belief |
| | self.state.drift_vector[ticker] = drift |
| | |
| | def _update_performance_metrics(self, market_feedback: Dict[str, Any]) -> None: |
| | """ |
| | Update agent's performance metrics based on market feedback. |
| | |
| | Args: |
| | market_feedback: Dictionary containing market performance data |
| | """ |
| | if 'portfolio_value' in market_feedback: |
| | |
| | portfolio_value = market_feedback['portfolio_value'] |
| | prior_value = self.state.performance_trace.get('portfolio_value', self.initial_capital) |
| | returns = (portfolio_value - prior_value) / prior_value if prior_value > 0 else 0 |
| | |
| | |
| | self.state.performance_trace.update({ |
| | 'portfolio_value': portfolio_value, |
| | 'returns': returns, |
| | 'cumulative_return': (portfolio_value / self.initial_capital) - 1, |
| | 'win_rate': market_feedback.get('win_rate', 0), |
| | 'sharpe_ratio': market_feedback.get('sharpe_ratio', 0), |
| | }) |
| | |
| | |
| | self.current_capital = portfolio_value |
| | |
| | |
| | avg_confidence = market_feedback.get('avg_confidence', 0.5) |
| | self.state.confidence_history.append(avg_confidence) |
| | |
| | |
| | self._update_consistency_score(market_feedback) |
| | |
| | def _update_consistency_score(self, market_feedback: Dict[str, Any]) -> None: |
| | """ |
| | Update agent's internal consistency score. |
| | |
| | Args: |
| | market_feedback: Dictionary containing market performance data |
| | """ |
| | |
| | if 'decisions' in market_feedback and 'performance' in market_feedback: |
| | performance = market_feedback['performance'] |
| | decision_consistency = [] |
| | |
| | for decision in market_feedback['decisions']: |
| | ticker = decision.get('ticker') |
| | expected_direction = 1 if decision.get('action') == 'buy' else -1 |
| | actual_performance = performance.get(ticker, 0) |
| | actual_direction = 1 if actual_performance > 0 else -1 |
| | |
| | |
| | direction_consistency = 1 if expected_direction == actual_direction else -1 |
| | decision_consistency.append(direction_consistency) |
| | |
| | |
| | if decision_consistency: |
| | avg_consistency = sum(decision_consistency) / len(decision_consistency) |
| | current_consistency = self.state.consistency_score |
| | self.state.consistency_score = (current_consistency * 0.8) + (avg_consistency * 0.2) |
| | |
| | def attribute_signals(self, signals: List[Dict[str, Any]]) -> List[AgentSignal]: |
| | """ |
| | Add attribution traces to raw signals. |
| | |
| | Args: |
| | signals: Raw investment signals |
| | |
| | Returns: |
| | Signals with attribution traces |
| | """ |
| | attributed_signals = [] |
| | for signal in signals: |
| | |
| | attribution = self.attribution_tracer.trace_attribution( |
| | signal=signal, |
| | agent_state=self.state, |
| | reasoning_depth=self.reasoning_depth |
| | ) |
| | |
| | |
| | agent_signal = AgentSignal( |
| | ticker=signal.get('ticker', ''), |
| | action=signal.get('action', 'hold'), |
| | confidence=signal.get('confidence', 0.5), |
| | quantity=signal.get('quantity'), |
| | reasoning=signal.get('reasoning', ''), |
| | intent=signal.get('intent', ''), |
| | value_basis=signal.get('value_basis', ''), |
| | attribution_trace=attribution.get('attribution_trace', {}), |
| | drift_signature=self.state.drift_vector, |
| | ) |
| | attributed_signals.append(agent_signal) |
| | |
| | |
| | if self.trace_enabled: |
| | self.tracer.record_signal(agent_signal) |
| | |
| | return attributed_signals |
| | |
| | def reset(self) -> None: |
| | """Reset agent to initial state while preserving memory decay pattern.""" |
| | |
| | self.current_capital = self.initial_capital |
| | |
| | |
| | belief_state = self.state.belief_state.copy() |
| | drift_vector = self.state.drift_vector.copy() |
| | |
| | |
| | self.state = AgentState( |
| | belief_state=belief_state, |
| | drift_vector=drift_vector, |
| | ) |
| | |
| | |
| | for key in self.state.belief_state: |
| | self.state.belief_state[key] *= (1 - self.memory_decay) |
| | |
| | def get_state_report(self) -> Dict[str, Any]: |
| | """ |
| | Generate a detailed report of agent's current state. |
| | |
| | Returns: |
| | Dictionary containing agent state information |
| | """ |
| | return { |
| | 'agent_id': self.id, |
| | 'agent_name': self.name, |
| | 'philosophy': self.philosophy, |
| | 'reasoning_depth': self.reasoning_depth, |
| | 'capital': self.current_capital, |
| | 'belief_state': self.state.belief_state, |
| | 'performance': self.state.performance_trace, |
| | 'consistency': self.state.consistency_score, |
| | 'decision_count': len(self.state.decision_history), |
| | 'avg_confidence': np.mean(self.state.confidence_history[-10:]) if self.state.confidence_history else 0.5, |
| | 'drift_vector': self.state.drift_vector, |
| | 'memory_decay': self.memory_decay, |
| | 'timestamp': datetime.datetime.now(), |
| | } |
| | |
| | def save_state(self, filepath: Optional[str] = None) -> Dict[str, Any]: |
| | """ |
| | Save agent state to file or return serializable state. |
| | |
| | Args: |
| | filepath: Optional path to save state |
| | |
| | Returns: |
| | Dictionary containing serializable agent state |
| | """ |
| | state_dict = { |
| | 'agent_id': self.id, |
| | 'agent_name': self.name, |
| | 'agent_state': self.state.dict(), |
| | 'memory_shell': self.memory_shell.export_state(), |
| | 'reasoning_depth': self.reasoning_depth, |
| | 'memory_decay': self.memory_decay, |
| | 'current_capital': self.current_capital, |
| | 'initial_capital': self.initial_capital, |
| | 'timestamp': datetime.datetime.now().isoformat(), |
| | } |
| | |
| | if filepath: |
| | import json |
| | with open(filepath, 'w') as f: |
| | json.dump(state_dict, f) |
| | |
| | return state_dict |
| | |
| | def load_state(self, state_dict: Dict[str, Any]) -> None: |
| | """ |
| | Load agent state from dictionary. |
| | |
| | Args: |
| | state_dict: Dictionary containing agent state |
| | """ |
| | if state_dict['agent_id'] != self.id: |
| | raise ValueError(f"State ID mismatch: {state_dict['agent_id']} vs {self.id}") |
| | |
| | |
| | self.reasoning_depth = state_dict.get('reasoning_depth', self.reasoning_depth) |
| | self.memory_decay = state_dict.get('memory_decay', self.memory_decay) |
| | self.current_capital = state_dict.get('current_capital', self.current_capital) |
| | self.initial_capital = state_dict.get('initial_capital', self.initial_capital) |
| | |
| | |
| | if 'agent_state' in state_dict: |
| | self.state = AgentState.parse_obj(state_dict['agent_state']) |
| | |
| | |
| | if 'memory_shell' in state_dict: |
| | self.memory_shell.import_state(state_dict['memory_shell']) |
| | |
| | |
| | def _reflect_trace(self, agent=None, depth=3) -> Dict[str, Any]: |
| | """ |
| | Trace agent's self-reflection on decision making process. |
| | |
| | Args: |
| | agent: Optional agent name to reflect on (self if None) |
| | depth: Recursion depth for reflection |
| | |
| | Returns: |
| | Reflection trace results |
| | """ |
| | |
| | if agent is None or agent == self.name: |
| | reflection = self.reasoning_graph.run_reflection( |
| | agent_state=self.state, |
| | depth=depth, |
| | trace_enabled=self.trace_enabled |
| | ) |
| | return { |
| | 'source': 'self', |
| | 'reflection': reflection, |
| | 'depth': depth, |
| | 'confidence': reflection.get('confidence', 0.5), |
| | 'timestamp': datetime.datetime.now(), |
| | } |
| | |
| | return { |
| | 'source': 'external', |
| | 'target': agent, |
| | 'depth': depth, |
| | 'status': 'delegated', |
| | 'timestamp': datetime.datetime.now(), |
| | } |
| | |
| | def _fork_signal(self, source) -> Dict[str, Any]: |
| | """ |
| | Fork a new signal branch from specified source. |
| | |
| | Args: |
| | source: Source to fork signal from |
| | |
| | Returns: |
| | Forked signal results |
| | """ |
| | if source == 'memory': |
| | |
| | experiences = self.memory_shell.get_relevant_experiences(limit=3) |
| | forked_signals = self.reasoning_graph.generate_from_experiences(experiences) |
| | return { |
| | 'source': 'memory', |
| | 'signals': forked_signals, |
| | 'count': len(forked_signals), |
| | 'timestamp': datetime.datetime.now(), |
| | } |
| | elif source == 'beliefs': |
| | |
| | top_beliefs = dict(sorted( |
| | self.state.belief_state.items(), |
| | key=lambda x: abs(x[1] - 0.5), |
| | reverse=True |
| | )[:5]) |
| | forked_signals = self.reasoning_graph.generate_from_beliefs(top_beliefs) |
| | return { |
| | 'source': 'beliefs', |
| | 'signals': forked_signals, |
| | 'count': len(forked_signals), |
| | 'timestamp': datetime.datetime.now(), |
| | } |
| | else: |
| | |
| | return { |
| | 'source': source, |
| | 'signals': [], |
| | 'count': 0, |
| | 'error': 'unknown_source', |
| | 'timestamp': datetime.datetime.now(), |
| | } |
| | |
| | def _collapse_detect(self, threshold=0.7, reason=None) -> Dict[str, Any]: |
| | """ |
| | Detect potential decision collapse or inconsistency. |
| | |
| | Args: |
| | threshold: Consistency threshold below which to trigger collapse detection |
| | reason: Optional specific reason to check for collapse |
| | |
| | Returns: |
| | Collapse detection results |
| | """ |
| | |
| | consistency_collapse = self.state.consistency_score < threshold |
| | |
| | |
| | collapses = { |
| | 'consistency': consistency_collapse, |
| | 'confidence': np.mean(self.state.confidence_history[-5:]) < threshold if len(self.state.confidence_history) >= 5 else False, |
| | 'belief_drift': any(abs(drift) > 0.3 for drift in self.state.drift_vector.values()), |
| | 'performance': self.state.performance_trace.get('returns', 0) < -0.1 if self.state.performance_trace else False, |
| | } |
| | |
| | |
| | if reason and reason in collapses: |
| | collapse_detected = collapses[reason] |
| | collapse_reasons = {reason: collapses[reason]} |
| | else: |
| | |
| | collapse_detected = any(collapses.values()) |
| | collapse_reasons = {k: v for k, v in collapses.items() if v} |
| | |
| | return { |
| | 'collapse_detected': collapse_detected, |
| | 'collapse_reasons': collapse_reasons, |
| | 'consistency_score': self.state.consistency_score, |
| | 'threshold': threshold, |
| | 'timestamp': datetime.datetime.now(), |
| | } |
| | |
| | def _attribute_weight(self, justification) -> Dict[str, Any]: |
| | """ |
| | Compute attribution weight for a specific justification. |
| | |
| | Args: |
| | justification: Justification to weight |
| | |
| | Returns: |
| | Attribution weight results |
| | """ |
| | |
| | themes = self.reasoning_graph.extract_themes(justification) |
| | |
| | |
| | philosophy_alignment = self.reasoning_graph.compute_alignment( |
| | themes=themes, |
| | philosophy=self.philosophy |
| | ) |
| | |
| | |
| | weight = philosophy_alignment * self.state.consistency_score |
| | |
| | return { |
| | 'justification': justification, |
| | 'themes': themes, |
| | 'philosophy_alignment': philosophy_alignment, |
| | 'consistency_factor': self.state.consistency_score, |
| | 'attribution_weight': weight, |
| | 'timestamp': datetime.datetime.now(), |
| | } |
| | |
| | def _drift_observe(self, vector, bias=0.0) -> Dict[str, Any]: |
| | """ |
| | Observe and record belief drift. |
| | |
| | Args: |
| | vector: Drift vector to observe |
| | bias: Optional bias adjustment |
| | |
| | Returns: |
| | Drift observation results |
| | """ |
| | |
| | for key, value in vector.items(): |
| | if key in self.state.drift_vector: |
| | |
| | self.state.drift_vector[key] = (self.state.drift_vector[key] * 0.7) + (value * 0.3) + bias |
| | else: |
| | |
| | self.state.drift_vector[key] = value + bias |
| | |
| | |
| | drift_magnitude = np.sqrt(sum(v**2 for v in self.state.drift_vector.values())) |
| | |
| | return { |
| | 'drift_vector': self.state.drift_vector, |
| | 'drift_magnitude': drift_magnitude, |
| | 'bias_applied': bias, |
| | 'observation_count': len(vector), |
| | 'timestamp': datetime.datetime.now(), |
| | } |
| | |
| | def execute_command(self, command: str, **kwargs) -> Dict[str, Any]: |
| | """ |
| | Execute an internal symbolic command. |
| | |
| | Args: |
| | command: Command to execute |
| | **kwargs: Command parameters |
| | |
| | Returns: |
| | Command execution results |
| | """ |
| | if command in self._commands: |
| | return self._commands[command](**kwargs) |
| | else: |
| | return { |
| | 'error': 'unknown_command', |
| | 'command': command, |
| | 'available_commands': list(self._commands.keys()), |
| | 'timestamp': datetime.datetime.now(), |
| | } |
| |
|
| | def __repr__(self) -> str: |
| | """Generate string representation of agent.""" |
| | return f"{self.name} Agent (ID: {self.id[:8]}, Philosophy: {self.philosophy})" |
| |
|
| | def __str__(self) -> str: |
| | """Generate human-readable string for agent.""" |
| | return f"{self.name} Agent: {self.philosophy} (Depth: {self.reasoning_depth})" |
| |
|