Spaces:
Running
Running
| from typing import Dict, Any, Optional | |
| from langgraph.graph import StateGraph, START, END | |
| from langgraph.checkpoint.memory import InMemorySaver | |
| from langgraph.types import RetryPolicy | |
| from .state import AgentState, create_initial_state | |
| from ..agents.data_collection_agent import data_collection_agent_node | |
| from ..agents.technical_analysis_agent import technical_analysis_agent_node | |
| from ..agents.news_intelligence_agent import news_intelligence_agent_node | |
| from ..agents.portfolio_manager_agent import portfolio_manager_agent_node | |
| def _log_partial(updates: dict, agent_name: str) -> None: | |
| """Log interesting fields from a partial-state update dict.""" | |
| print(f"\n{agent_name} Agent Complete:") | |
| if agent_name == "Data Collection": | |
| data_results = updates.get('data_collection_results') | |
| if data_results: | |
| market_data = data_results.get('market_data', {}) | |
| print(f"Current Price: ${market_data.get('current_price', 'N/A')}") | |
| elif agent_name == "Technical Analysis": | |
| tech_results = updates.get('technical_analysis_results') | |
| if tech_results: | |
| print(f"Technical Success: {tech_results.get('success', False)}") | |
| elif agent_name == "News Intelligence": | |
| news_results = updates.get('news_intelligence_results') | |
| if news_results: | |
| print(f"News Success: {news_results.get('success', False)}") | |
| elif agent_name == "Portfolio Manager": | |
| portfolio_results = updates.get('portfolio_manager_results', {}) | |
| for sym, sym_data in portfolio_results.items(): | |
| if sym_data and sym_data.get('success'): | |
| print(f"Signal: {sym_data.get('trading_signal', 'N/A')} | " | |
| f"Confidence: {sym_data.get('confidence_level', 'N/A')}") | |
| if updates.get('error'): | |
| print(f"Error: {updates['error']}") | |
| async def debug_data_collection_node(state: AgentState) -> dict: | |
| """Data collection node with debug output.""" | |
| updates = await data_collection_agent_node(state) | |
| _log_partial(updates, "Data Collection") | |
| return updates | |
| async def debug_technical_analysis_node(state: AgentState) -> dict: | |
| """Technical analysis node with debug output.""" | |
| updates = await technical_analysis_agent_node(state) | |
| _log_partial(updates, "Technical Analysis") | |
| return updates | |
| async def debug_news_intelligence_node(state: AgentState) -> dict: | |
| """News intelligence node with debug output.""" | |
| updates = await news_intelligence_agent_node(state) | |
| _log_partial(updates, "News Intelligence") | |
| return updates | |
| async def debug_portfolio_manager_node(state: AgentState) -> dict: | |
| """Portfolio manager node with debug output.""" | |
| updates = await portfolio_manager_agent_node(state) | |
| _log_partial(updates, "Portfolio Manager") | |
| return updates | |
| def create_workflow() -> StateGraph: | |
| """ | |
| Create LangGraph workflow connecting all agents. | |
| Returns: | |
| StateGraph: Configured workflow graph | |
| """ | |
| # Initialize workflow | |
| _api_retry = RetryPolicy(max_attempts=3, initial_interval=2.0) | |
| workflow = StateGraph(AgentState) | |
| workflow.add_node("data_collection", debug_data_collection_node, retry=_api_retry) | |
| workflow.add_node("technical_analysis", debug_technical_analysis_node, retry=_api_retry) | |
| workflow.add_node("news_intelligence", debug_news_intelligence_node, retry=_api_retry) | |
| workflow.add_node("portfolio_manager", debug_portfolio_manager_node, retry=_api_retry) | |
| # Define linear flow | |
| workflow.add_edge(START, "data_collection") | |
| workflow.add_edge("data_collection", "technical_analysis") | |
| workflow.add_edge("technical_analysis", "news_intelligence") | |
| workflow.add_edge("news_intelligence", "portfolio_manager") | |
| workflow.add_edge("portfolio_manager", END) | |
| return workflow | |
| async def run_analysis(symbols: list[str], session_id: str = "default", analysis_date: Optional[str] = None) -> Dict[str, Any]: | |
| """ | |
| Run complete analysis workflow for symbols. | |
| Args: | |
| symbols: List of stock symbols to analyze | |
| session_id: Session identifier | |
| analysis_date: Date for analysis in YYYY-MM-DD format (optional, defaults to today) | |
| Returns: | |
| Dict with analysis results | |
| """ | |
| try: | |
| # Create workflow | |
| workflow = create_workflow() | |
| app = workflow.compile(checkpointer=InMemorySaver()) | |
| # Initialize state with analysis date | |
| initial_state = create_initial_state(session_id, symbols, analysis_date) | |
| # Run workflow | |
| config = {"configurable": {"thread_id": session_id}, "recursion_limit": 30} | |
| result = await app.ainvoke(initial_state, config) | |
| # Extract results | |
| return { | |
| 'success': True, | |
| 'session_id': session_id, | |
| 'symbols': symbols, | |
| 'analysis_date': analysis_date, | |
| 'results': { | |
| 'data_collection': result.get('data_collection_results'), | |
| 'technical_analysis': result.get('technical_analysis_results'), | |
| 'news_intelligence': result.get('news_intelligence_results'), | |
| 'portfolio_manager': result.get('portfolio_manager_results') | |
| }, | |
| 'final_step': result.get('current_step'), | |
| 'error': result.get('error') | |
| } | |
| except Exception as e: | |
| print(f"Workflow error: {e}") | |
| return { | |
| 'success': False, | |
| 'error': str(e), | |
| 'symbols': symbols, | |
| 'session_id': session_id, | |
| 'analysis_date': analysis_date | |
| } | |
| def should_continue(state: AgentState) -> str: | |
| """ | |
| Simple conditional logic for workflow routing. | |
| Args: | |
| state: Current workflow state | |
| Returns: | |
| Next step or END | |
| """ | |
| if state.get('error'): | |
| return END | |
| current_step = state.get('current_step', '') | |
| if current_step == 'data_collection_complete': | |
| return 'technical_analysis' | |
| elif current_step == 'technical_analysis_complete': | |
| return 'news_intelligence' | |
| elif current_step == 'news_intelligence_complete': | |
| return 'portfolio_manager' | |
| elif current_step == 'portfolio_management_complete': | |
| return END | |
| else: | |
| return END |