import operator from typing import Annotated, Sequence, TypedDict, Literal, Set import yfinance as yf import pandas as pd from pydantic import BaseModel, Field from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, AIMessage from langchain_core.tools import tool from langgraph.graph import StateGraph, START, END from langchain.agents import create_agent from .sec_tools import get_company_concept_xbrl from .rag_tools import search_10k_filings from .sentiment_tools import get_recent_news from .earnings_tools import ( search_earnings_call, get_earnings_sentiment_divergence, get_earnings_keyword_trends, ) @tool def get_stock_metrics(ticker: str) -> str: """ Fetches historical market data and calculates basic metrics for a stock. CRITICAL: You must pass the official stock ticker symbol (e.g., 'AAPL' for Apple). """ try: ticker = ticker.upper() print(f"\n[System: Fetching yfinance data for {ticker}...]") stock = yf.Ticker(ticker) hist = stock.history(period="1mo") if hist.empty: return f"Could not find price data for ticker: {ticker}. Tell the user the data fetch failed." current_price = hist["Close"].iloc[-1] monthly_high = hist["High"].max() monthly_low = hist["Low"].min() avg_volume = hist["Volume"].mean() summary = ( f"Data for {ticker}:\n" f"- Current Price: ${current_price:.2f}\n" f"- 1-Month High: ${monthly_high:.2f}\n" f"- 1-Month Low: ${monthly_low:.2f}\n" f"- Average Daily Volume: {int(avg_volume):,}" ) return summary except Exception as e: return f"Error fetching data: {str(e)}" def merge_sets(a: Set, b: Set) -> Set: return a | b class AgentState(TypedDict): messages: Annotated[Sequence[BaseMessage], operator.add] next: str | list[str] steps: Annotated[int, operator.add] completed_tasks: Annotated[Set[str], merge_sets] pending_tasks: list members = ["Quant_Agent", "Fundamental_Agent", "Sentiment_Agent", "Earnings_Agent"] def make_worker_node(agent, name: str): def node(state: AgentState): pending = state.get("pending_tasks", []) completed = state.get("completed_tasks", set()) my_task = next( ( t for t in pending if t["agent"] == name and t["task_id"] not in completed ), None, ) if not my_task: return {"completed_tasks": set()} task_message = HumanMessage( content=f"Ticker: {my_task['ticker']}. Task: {my_task['description']}" ) result = agent.invoke({"messages": [task_message]}) has_tool_call = any( isinstance(m, AIMessage) and m.tool_calls for m in result["messages"] ) if not has_tool_call: content = f"ERROR: The {name} attempted to answer without using a data tool. This analysis is unauthorized." else: content = result["messages"][-1].content.strip() or f"[{name}: No data retrieved]" return { "messages": [AIMessage(content=f"[{my_task['task_id']}] {content}", name=name)], "completed_tasks": {my_task["task_id"]}, } return node def create_planner_node(llm): planner_prompt = """You are a task planner for a financial AI system. GOLDEN RULE: Never assume or guess a number. CRITICAL AGENT CAPABILITY MAPPING: 1. Quant_Agent: ONLY use for current stock price, trading volume, and 52-week high/lows. => GROUPING RULE: If the user asks for multiple price/volume metrics for the SAME ticker, group them into EXACTLY ONE Quant_Agent task. Do NOT make separate tasks for price and volume. 2. Sentiment_Agent: ONLY use for recent news headlines and market sentiment scores. 3. Fundamental_Agent: Use for TWO things: - SEC Financial Metrics (Revenue, Net Income, Margins, Cash Flow). - SEC 10-K RAG Searches: Use this for ANY qualitative questions about business strategy, supply chain, manufacturing, competition, and corporate RISKS. 4. Earnings_Agent: Use for earnings-call analysis. This includes: - Management commentary and guidance from earnings calls. - Sentiment divergence between Prepared Remarks and Q&A sessions. - Keyword and entity tracking across quarters (e.g., mentions of "AI", "headwinds", "growth"). => Use this agent when the user asks about earnings calls, management tone, guidance language, or quarter-over-quarter keyword trends. Read the user's request and output a JSON list of tasks needed to answer it. Each task must have: - "agent": "Quant_Agent", "Fundamental_Agent", "Sentiment_Agent", or "Earnings_Agent" - "ticker": the stock ticker symbol (e.g. "AAPL") - "task_id": a unique string - "description": specific instructions on what to fetch or search Output ONLY valid JSON. No explanation. example output: [ {"agent": "Quant_Agent", "ticker": "AAPL", "task_id": "Quant_AAPL", "description": "Get price and volume for AAPL"}, {"agent": "Sentiment_Agent", "ticker": "MSFT", "task_id": "Sentiment_MSFT", "description": "Get sentiment analysis for MSFT"}, {"agent": "Earnings_Agent", "ticker": "AAPL", "task_id": "Earnings_AAPL", "description": "Analyze sentiment divergence between prepared remarks and Q&A in the latest earnings call"} ]""" def planner_function(state: AgentState): if state.get("pending_tasks"): return {} user_message = next(m for m in state["messages"] if isinstance(m, HumanMessage)) response = llm.invoke( [ SystemMessage(content=planner_prompt), HumanMessage(content=user_message.content), ] ) import json raw = response.content.strip().replace("```json", "").replace("```", "") start = raw.find("[") end = raw.rfind("]") try: tasks = json.loads(raw[start : end + 1]) if start != -1 and end != -1 else [] except Exception: tasks = [] if not tasks: print("[Planner]: No valid financial tasks found.") return { "pending_tasks": [], "completed_tasks": set(), "messages": [ AIMessage( content="I can only answer questions about stock prices, SEC filings, and market sentiment.", name="Supervisor", ) ], } print(f"\n[Planner]: Created {len(tasks)} tasks: {[t['task_id'] for t in tasks]}") return {"pending_tasks": tasks, "completed_tasks": set()} return planner_function def create_supervisor_node(llm): def supervisor_function(state: AgentState): steps = state.get("steps", 0) if steps >= 10: return {"next": "FINISH", "steps": 1} pending = state.get("pending_tasks", []) completed = state.get("completed_tasks", set()) remaining = [t for t in pending if t["task_id"] not in completed] if not remaining: print("-> All tasks complete. Routing to Summarizer.") return {"next": "FINISH", "steps": 1} # Dispatch one task per unique agent in parallel agents_to_dispatch = [] dispatched_tasks = [] for task in remaining: if task["agent"] not in agents_to_dispatch: agents_to_dispatch.append(task["agent"]) dispatched_tasks.append(task["task_id"]) print(f"\n[Supervisor]: Dispatching tasks in parallel → {dispatched_tasks}") return { "next": agents_to_dispatch, "steps": 1, } return supervisor_function def create_summarizer_node(llm): summarizer_system = """You are a senior investment analyst drafting an internal **Investment Memo** for colleagues. You will receive the user's original question and verbatim outputs from specialist agents (Quant_Agent, Fundamental_Agent, Sentiment_Agent, Earnings_Agent), or a single clarification/refusal if no research ran. Write the memo using this structure and markdown headings: # Investment Memo ## Executive Summary 2-4 sentences answering the user in plain language. ## Key Facts & Data Bullet points. Use ONLY numbers, metrics, and quotes that appear in the specialist outputs. If a section had no data, say "No quantitative/fundamental/sentiment data provided" as appropriate. ## Earnings Call Insights If Earnings_Agent data is present, summarize management's key messages and guidance. - If both Prepared Remarks and Q&A are present, analyze any sentiment divergence (e.g., was management more cautious in live Q&A?). - If only Prepared Remarks are available (typical for SEC-8 / 8-K filings), focus the analysis on the tone and specificity of the management commentary. - Note any notable keyword/entity trends across quarters (e.g., AI mentions). If no earnings data was provided, omit this section entirely. ## Risks, Sentiment, and Context Integrate fundamental and sentiment findings when present. If missing, state that briefly. ## Caveats Note missing specialists, tool errors, or "unauthorized" / ERROR lines exactly as reported—do not soften them. Rules: - Do NOT invent tickers, prices, filings, or sentiment scores not present in the inputs. - Do NOT cite tool names; write for a portfolio manager reader. - Keep the tone professional and concise.""" def summarizer_function(state: AgentState): user_messages = [m for m in state["messages"] if isinstance(m, HumanMessage)] user_query = user_messages[0].content if user_messages else "" blocks = [] for m in state["messages"]: if not isinstance(m, AIMessage): continue label = m.name or "Assistant" blocks.append(f"### {label}\n{m.content}") specialist_blob = "\n\n".join(blocks) if blocks else "(No specialist outputs.)" response = llm.invoke( [ SystemMessage(content=summarizer_system), HumanMessage( content=( f"User request:\n{user_query}\n\n" f"Specialist outputs (verbatim):\n{specialist_blob}" ) ), ] ) memo = (response.content or "").strip() return {"messages": [AIMessage(content=memo, name="Summarizer")]} return summarizer_function def build_financial_graph(llm): workflow = StateGraph(AgentState) quant_agent = create_agent( model=llm, tools=[get_stock_metrics], system_prompt=( "You are a Quantitative Analyst. " "You have exactly ONE tool: get_stock_metrics(ticker). " "For any price, volume, or trading-range question you MUST call get_stock_metrics—do not answer from memory. " "NEVER invent other tool names, NEVER output JSON blocks suggesting tools that do not exist. " "GOLDEN RULE: After the tool returns, you must format the output gracefully so it is easy to read. " "Bold the labels (like **Current Price:** or **Average Volume:**) before injecting the numbers. " "NEVER use introductory conversational filler like 'Here is the data'. Just print the labeled metrics directly." ), name="Quant_Agent", ) fundamental_agent = create_agent( model=llm, tools=[search_10k_filings, get_company_concept_xbrl], system_prompt=( "You are a Fundamental Analyst. " "GOLDEN RULE: You must output the EXACT DATA or TEXT returned by your tools. " "Do NOT explain how the tools work or what they do. " "CRITICAL: ONCE YOU HAVE CALLED TO THE TOOL ONCE AND RECEIVED THE DATA, YOU MUST WRITE YOUR FINAL ANSWER IMMEDIATELY. DO NOT CALL THE TOOL A SECOND TIME. " "Just answer the user's question using the fetched data and stop." ), name="Fundamental_Agent", ) sentiment_agent = create_agent( model=llm, tools=[get_recent_news], system_prompt=( "You are a Sentiment Analyst. Fetch recent news using your tool. " "CRITICAL RULES: Your final response MUST be exactly five lines. " "Line 1: The sentiment score (a single number between -1.0 and 1.0). " "Line 2-5: Justify the sentiment score based on the news articles." "Include important keywords from the news articles in your response." "Do not add conversational filler. Do not ask the user follow-up questions." ), name="Sentiment_Agent", ) earnings_agent = create_agent( model=llm, tools=[ search_earnings_call, get_earnings_sentiment_divergence, get_earnings_keyword_trends, ], system_prompt=( "You are an Earnings Call Analyst specializing in management commentary analysis. " "You have THREE tools for analyzing pre-ingested earnings-call transcripts:\n" "1. search_earnings_call: Search transcripts for specific topics (guidance, margins, strategy, etc.).\n" "2. get_earnings_sentiment_divergence: Compare management tone in scripted Prepared Remarks vs live Q&A.\n" "3. get_earnings_keyword_trends: Track keyword frequency changes across quarters.\n\n" "CRITICAL RULES:\n" "- You MUST call at least one tool. Do NOT answer from memory.\n" "- SEC filings (Form 8-K / SEC-8) are a valid source. They typically only contain Prepared Remarks and LACK a Q&A session. This is common and NOT a failure of the data.\n" "- If a tool returns an error about missing data (e.g., no filings found), report that the earnings data for that " "ticker/quarter has not been ingested and suggest running the ingest script.\n" "- If Q&A is missing, simply perform your analysis on the available management commentary.\n" "- After the tool returns, write a clear, evidence-backed analysis. Bold key findings.\n" "- Do NOT add conversational filler. Do NOT ask follow-up questions." ), name="Earnings_Agent", ) workflow.add_node("Planner", create_planner_node(llm)) workflow.add_node("Supervisor", create_supervisor_node(llm)) workflow.add_node("Quant_Agent", make_worker_node(quant_agent, "Quant_Agent")) workflow.add_node( "Fundamental_Agent", make_worker_node(fundamental_agent, "Fundamental_Agent") ) workflow.add_node("Sentiment_Agent", make_worker_node(sentiment_agent, "Sentiment_Agent")) workflow.add_node("Earnings_Agent", make_worker_node(earnings_agent, "Earnings_Agent")) workflow.add_node("Summarizer", create_summarizer_node(llm)) for member in members: workflow.add_edge(member, "Supervisor") workflow.add_edge(START, "Planner") workflow.add_edge("Planner", "Supervisor") workflow.add_conditional_edges( "Supervisor", lambda state: state["next"], { "Quant_Agent": "Quant_Agent", "Fundamental_Agent": "Fundamental_Agent", "Sentiment_Agent": "Sentiment_Agent", "Earnings_Agent": "Earnings_Agent", "FINISH": "Summarizer", }, ) workflow.add_edge("Summarizer", END) return workflow.compile()