""" BaseAgent - Recursive Cognitive Agent Framework for AGI-HEDGE-FUND This module implements the foundational cognitive architecture for all investment agents. Each agent inherits from this base class to enable: - Recursive reasoning loops with customizable depth - Transparent attribution tracing for decision provenance - Temporal memory shell with configurable decay - Value-weighted decision encoding - Symbolic state representation for interpretability Internal Note: Base class encodes the recursive interpretability interface (.p/ command patterns) in line with Anthropic-style Circuit Tracing research while maintaining familiar investment syntax. """ import uuid import datetime from typing import Dict, List, Any, Optional, Tuple import numpy as np from pydantic import BaseModel, Field from ..cognition.graph import ReasoningGraph from ..cognition.memory import MemoryShell from ..cognition.attribution import AttributionTracer from ..llm.router import ModelRouter from ..utils.diagnostics import TracingTools class AgentSignal(BaseModel): """Signal generated by an agent with full attribution and confidence metrics.""" ticker: str = Field(..., description="Stock ticker symbol") action: str = Field(..., description="buy, sell, or hold") confidence: float = Field(..., description="Confidence level (0.0-1.0)") quantity: Optional[int] = Field(None, description="Number of shares to trade") reasoning: str = Field(..., description="Explicit reasoning chain") intent: str = Field(..., description="High-level investment intent") value_basis: str = Field(..., description="Core value driving this decision") attribution_trace: Dict[str, float] = Field(default_factory=dict, description="Causal attribution weights") drift_signature: Dict[str, float] = Field(default_factory=dict, description="Belief drift metrics") signal_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique signal identifier") timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now) class AgentState(BaseModel): """Persistent agent state with temporal memory and belief dynamics.""" working_memory: Dict[str, Any] = Field(default_factory=dict, description="Short-term memory") belief_state: Dict[str, float] = Field(default_factory=dict, description="Current belief distribution") confidence_history: List[float] = Field(default_factory=list, description="Historical confidence") decision_history: List[Dict[str, Any]] = Field(default_factory=list, description="Past decisions") performance_trace: Dict[str, float] = Field(default_factory=dict, description="Performance metrics") reflective_state: Dict[str, Any] = Field(default_factory=dict, description="Self-awareness metrics") drift_vector: Dict[str, float] = Field(default_factory=dict, description="Direction of belief evolution") consistency_score: float = Field(default=1.0, description="Internal consistency metric") last_update: datetime.datetime = Field(default_factory=datetime.datetime.now) class BaseAgent: """ Base class for all investment agents in the AGI-HEDGE-FUND framework. Implements the core cognitive architecture including: - Recursive reasoning loops - Memory persistence - Attribution tracing - Value-weighted decision making - Symbolic state representation """ def __init__( self, name: str, philosophy: str, reasoning_depth: int = 3, memory_decay: float = 0.2, initial_capital: float = 100000.0, model_provider: str = "anthropic", model_name: str = "claude-3-sonnet-20240229", trace_enabled: bool = False, ): """ Initialize a cognitive investment agent. Args: name: Agent identifier name philosophy: Investment philosophy description reasoning_depth: Depth of recursive reasoning (higher = deeper thinking) memory_decay: Rate of memory deterioration (0.0-1.0) initial_capital: Starting capital amount model_provider: LLM provider ("anthropic", "openai", "groq", "ollama", "deepseek") model_name: Specific model identifier trace_enabled: Whether to generate full reasoning traces """ self.id = str(uuid.uuid4()) self.name = name self.philosophy = philosophy self.reasoning_depth = reasoning_depth self.memory_decay = memory_decay self.initial_capital = initial_capital self.current_capital = initial_capital self.trace_enabled = trace_enabled # Initialize cognitive components self.state = AgentState() self.memory_shell = MemoryShell(decay_rate=memory_decay) self.attribution_tracer = AttributionTracer() self.llm = ModelRouter(provider=model_provider, model=model_name) # Initialize reasoning graph self.reasoning_graph = ReasoningGraph( agent_name=self.name, agent_philosophy=self.philosophy, model_router=self.llm, ) # Diagnostics and tracing self.tracer = TracingTools(agent_id=self.id, agent_name=self.name) # Internal symbolic processing commands self._commands = { "reflect.trace": self._reflect_trace, "fork.signal": self._fork_signal, "collapse.detect": self._collapse_detect, "attribute.weight": self._attribute_weight, "drift.observe": self._drift_observe, } def process_market_data(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Process market data through agent's cognitive lens. Args: data: Market data dictionary Returns: Processed market data with agent-specific insights """ # Each agent subclass implements its unique market interpretation raise NotImplementedError("Agent subclasses must implement process_market_data") def generate_signals(self, processed_data: Dict[str, Any]) -> List[AgentSignal]: """ Generate investment signals based on processed market data. Args: processed_data: Processed market data Returns: List of investment signals with attribution """ # Each agent subclass implements its unique signal generation logic raise NotImplementedError("Agent subclasses must implement generate_signals") def update_state(self, market_feedback: Dict[str, Any]) -> None: """ Update agent's internal state based on market feedback. Args: market_feedback: Dictionary containing market performance data """ # Update memory shell with new experiences self.memory_shell.add_experience(market_feedback) # Update belief state based on market feedback self._update_beliefs(market_feedback) # Update performance metrics self._update_performance_metrics(market_feedback) # Apply memory decay self.memory_shell.apply_decay() # Update timestamp self.state.last_update = datetime.datetime.now() def _update_beliefs(self, market_feedback: Dict[str, Any]) -> None: """ Update agent's belief state based on market feedback. Args: market_feedback: Dictionary containing market performance data """ # Extract relevant signals from market feedback if 'performance' in market_feedback: performance = market_feedback['performance'] # Record decision outcomes if 'decisions' in market_feedback: for decision in market_feedback['decisions']: self.state.decision_history.append({ 'decision': decision, 'outcome': performance.get(decision.get('ticker'), 0), 'timestamp': datetime.datetime.now() }) # Update belief state based on performance for ticker, perf in performance.items(): current_belief = self.state.belief_state.get(ticker, 0.5) # Belief update formula combines prior belief with new evidence updated_belief = (current_belief * (1 - self.memory_decay) + np.tanh(perf) * self.memory_decay) self.state.belief_state[ticker] = updated_belief # Track belief drift if ticker in self.state.belief_state: drift = updated_belief - current_belief self.state.drift_vector[ticker] = drift def _update_performance_metrics(self, market_feedback: Dict[str, Any]) -> None: """ Update agent's performance metrics based on market feedback. Args: market_feedback: Dictionary containing market performance data """ if 'portfolio_value' in market_feedback: # Calculate return portfolio_value = market_feedback['portfolio_value'] prior_value = self.state.performance_trace.get('portfolio_value', self.initial_capital) returns = (portfolio_value - prior_value) / prior_value if prior_value > 0 else 0 # Update performance trace self.state.performance_trace.update({ 'portfolio_value': portfolio_value, 'returns': returns, 'cumulative_return': (portfolio_value / self.initial_capital) - 1, 'win_rate': market_feedback.get('win_rate', 0), 'sharpe_ratio': market_feedback.get('sharpe_ratio', 0), }) # Update current capital self.current_capital = portfolio_value # Add to confidence history avg_confidence = market_feedback.get('avg_confidence', 0.5) self.state.confidence_history.append(avg_confidence) # Update consistency score self._update_consistency_score(market_feedback) def _update_consistency_score(self, market_feedback: Dict[str, Any]) -> None: """ Update agent's internal consistency score. Args: market_feedback: Dictionary containing market performance data """ # Measure consistency between stated reasoning and actual performance if 'decisions' in market_feedback and 'performance' in market_feedback: performance = market_feedback['performance'] decision_consistency = [] for decision in market_feedback['decisions']: ticker = decision.get('ticker') expected_direction = 1 if decision.get('action') == 'buy' else -1 actual_performance = performance.get(ticker, 0) actual_direction = 1 if actual_performance > 0 else -1 # Consistency is 1 when directions match, -1 when they don't direction_consistency = 1 if expected_direction == actual_direction else -1 decision_consistency.append(direction_consistency) # Update consistency score (moving average) if decision_consistency: avg_consistency = sum(decision_consistency) / len(decision_consistency) current_consistency = self.state.consistency_score self.state.consistency_score = (current_consistency * 0.8) + (avg_consistency * 0.2) def attribute_signals(self, signals: List[Dict[str, Any]]) -> List[AgentSignal]: """ Add attribution traces to raw signals. Args: signals: Raw investment signals Returns: Signals with attribution traces """ attributed_signals = [] for signal in signals: # Run attribution tracing attribution = self.attribution_tracer.trace_attribution( signal=signal, agent_state=self.state, reasoning_depth=self.reasoning_depth ) # Create AgentSignal with attribution agent_signal = AgentSignal( ticker=signal.get('ticker', ''), action=signal.get('action', 'hold'), confidence=signal.get('confidence', 0.5), quantity=signal.get('quantity'), reasoning=signal.get('reasoning', ''), intent=signal.get('intent', ''), value_basis=signal.get('value_basis', ''), attribution_trace=attribution.get('attribution_trace', {}), drift_signature=self.state.drift_vector, ) attributed_signals.append(agent_signal) # Record in tracer if enabled if self.trace_enabled: self.tracer.record_signal(agent_signal) return attributed_signals def reset(self) -> None: """Reset agent to initial state while preserving memory decay pattern.""" # Reset capital self.current_capital = self.initial_capital # Reset state but preserve some learning belief_state = self.state.belief_state.copy() drift_vector = self.state.drift_vector.copy() # Create new state self.state = AgentState( belief_state=belief_state, drift_vector=drift_vector, ) # Apply memory decay to preserved beliefs for key in self.state.belief_state: self.state.belief_state[key] *= (1 - self.memory_decay) def get_state_report(self) -> Dict[str, Any]: """ Generate a detailed report of agent's current state. Returns: Dictionary containing agent state information """ return { 'agent_id': self.id, 'agent_name': self.name, 'philosophy': self.philosophy, 'reasoning_depth': self.reasoning_depth, 'capital': self.current_capital, 'belief_state': self.state.belief_state, 'performance': self.state.performance_trace, 'consistency': self.state.consistency_score, 'decision_count': len(self.state.decision_history), 'avg_confidence': np.mean(self.state.confidence_history[-10:]) if self.state.confidence_history else 0.5, 'drift_vector': self.state.drift_vector, 'memory_decay': self.memory_decay, 'timestamp': datetime.datetime.now(), } def save_state(self, filepath: Optional[str] = None) -> Dict[str, Any]: """ Save agent state to file or return serializable state. Args: filepath: Optional path to save state Returns: Dictionary containing serializable agent state """ state_dict = { 'agent_id': self.id, 'agent_name': self.name, 'agent_state': self.state.dict(), 'memory_shell': self.memory_shell.export_state(), 'reasoning_depth': self.reasoning_depth, 'memory_decay': self.memory_decay, 'current_capital': self.current_capital, 'initial_capital': self.initial_capital, 'timestamp': datetime.datetime.now().isoformat(), } if filepath: import json with open(filepath, 'w') as f: json.dump(state_dict, f) return state_dict def load_state(self, state_dict: Dict[str, Any]) -> None: """ Load agent state from dictionary. Args: state_dict: Dictionary containing agent state """ if state_dict['agent_id'] != self.id: raise ValueError(f"State ID mismatch: {state_dict['agent_id']} vs {self.id}") # Update basic attributes self.reasoning_depth = state_dict.get('reasoning_depth', self.reasoning_depth) self.memory_decay = state_dict.get('memory_decay', self.memory_decay) self.current_capital = state_dict.get('current_capital', self.current_capital) self.initial_capital = state_dict.get('initial_capital', self.initial_capital) # Load agent state if 'agent_state' in state_dict: self.state = AgentState.parse_obj(state_dict['agent_state']) # Load memory shell if 'memory_shell' in state_dict: self.memory_shell.import_state(state_dict['memory_shell']) # Internal symbolic command processors def _reflect_trace(self, agent=None, depth=3) -> Dict[str, Any]: """ Trace agent's self-reflection on decision making process. Args: agent: Optional agent name to reflect on (self if None) depth: Recursion depth for reflection Returns: Reflection trace results """ # If reflecting on self if agent is None or agent == self.name: reflection = self.reasoning_graph.run_reflection( agent_state=self.state, depth=depth, trace_enabled=self.trace_enabled ) return { 'source': 'self', 'reflection': reflection, 'depth': depth, 'confidence': reflection.get('confidence', 0.5), 'timestamp': datetime.datetime.now(), } # Otherwise, this is a request to reflect on another agent (handled by portfolio manager) return { 'source': 'external', 'target': agent, 'depth': depth, 'status': 'delegated', 'timestamp': datetime.datetime.now(), } def _fork_signal(self, source) -> Dict[str, Any]: """ Fork a new signal branch from specified source. Args: source: Source to fork signal from Returns: Forked signal results """ if source == 'memory': # Fork from memory experiences = self.memory_shell.get_relevant_experiences(limit=3) forked_signals = self.reasoning_graph.generate_from_experiences(experiences) return { 'source': 'memory', 'signals': forked_signals, 'count': len(forked_signals), 'timestamp': datetime.datetime.now(), } elif source == 'beliefs': # Fork from current beliefs top_beliefs = dict(sorted( self.state.belief_state.items(), key=lambda x: abs(x[1] - 0.5), reverse=True )[:5]) forked_signals = self.reasoning_graph.generate_from_beliefs(top_beliefs) return { 'source': 'beliefs', 'signals': forked_signals, 'count': len(forked_signals), 'timestamp': datetime.datetime.now(), } else: # Unknown source return { 'source': source, 'signals': [], 'count': 0, 'error': 'unknown_source', 'timestamp': datetime.datetime.now(), } def _collapse_detect(self, threshold=0.7, reason=None) -> Dict[str, Any]: """ Detect potential decision collapse or inconsistency. Args: threshold: Consistency threshold below which to trigger collapse detection reason: Optional specific reason to check for collapse Returns: Collapse detection results """ # Check consistency score consistency_collapse = self.state.consistency_score < threshold # Check for specific collapses collapses = { 'consistency': consistency_collapse, 'confidence': np.mean(self.state.confidence_history[-5:]) < threshold if len(self.state.confidence_history) >= 5 else False, 'belief_drift': any(abs(drift) > 0.3 for drift in self.state.drift_vector.values()), 'performance': self.state.performance_trace.get('returns', 0) < -0.1 if self.state.performance_trace else False, } # If specific reason provided, check only that if reason and reason in collapses: collapse_detected = collapses[reason] collapse_reasons = {reason: collapses[reason]} else: # Check all collapses collapse_detected = any(collapses.values()) collapse_reasons = {k: v for k, v in collapses.items() if v} return { 'collapse_detected': collapse_detected, 'collapse_reasons': collapse_reasons, 'consistency_score': self.state.consistency_score, 'threshold': threshold, 'timestamp': datetime.datetime.now(), } def _attribute_weight(self, justification) -> Dict[str, Any]: """ Compute attribution weight for a specific justification. Args: justification: Justification to weight Returns: Attribution weight results """ # Extract key themes from justification themes = self.reasoning_graph.extract_themes(justification) # Compute alignment with agent's philosophy philosophy_alignment = self.reasoning_graph.compute_alignment( themes=themes, philosophy=self.philosophy ) # Compute weight based on alignment and consistency weight = philosophy_alignment * self.state.consistency_score return { 'justification': justification, 'themes': themes, 'philosophy_alignment': philosophy_alignment, 'consistency_factor': self.state.consistency_score, 'attribution_weight': weight, 'timestamp': datetime.datetime.now(), } def _drift_observe(self, vector, bias=0.0) -> Dict[str, Any]: """ Observe and record belief drift. Args: vector: Drift vector to observe bias: Optional bias adjustment Returns: Drift observation results """ # Record drift observation for key, value in vector.items(): if key in self.state.drift_vector: # Existing key, update with bias self.state.drift_vector[key] = (self.state.drift_vector[key] * 0.7) + (value * 0.3) + bias else: # New key self.state.drift_vector[key] = value + bias # Compute drift magnitude drift_magnitude = np.sqrt(sum(v**2 for v in self.state.drift_vector.values())) return { 'drift_vector': self.state.drift_vector, 'drift_magnitude': drift_magnitude, 'bias_applied': bias, 'observation_count': len(vector), 'timestamp': datetime.datetime.now(), } def execute_command(self, command: str, **kwargs) -> Dict[str, Any]: """ Execute an internal symbolic command. Args: command: Command to execute **kwargs: Command parameters Returns: Command execution results """ if command in self._commands: return self._commands[command](**kwargs) else: return { 'error': 'unknown_command', 'command': command, 'available_commands': list(self._commands.keys()), 'timestamp': datetime.datetime.now(), } def __repr__(self) -> str: """Generate string representation of agent.""" return f"{self.name} Agent (ID: {self.id[:8]}, Philosophy: {self.philosophy})" def __str__(self) -> str: """Generate human-readable string for agent.""" return f"{self.name} Agent: {self.philosophy} (Depth: {self.reasoning_depth})"