import operator
from typing import Annotated, Sequence, TypedDict, Literal, Set
import yfinance as yf
import pandas as pd
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, AIMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END
from langchain.agents import create_agent
from .sec_tools import get_company_concept_xbrl
from .rag_tools import search_10k_filings
from .sentiment_tools import get_recent_news
from .earnings_tools import (
search_earnings_call,
get_earnings_sentiment_divergence,
get_earnings_keyword_trends,
)
@tool
def get_stock_metrics(ticker: str) -> str:
    """
    Fetches historical market data and calculates basic metrics for a stock.
    CRITICAL: You must pass the official stock ticker symbol (e.g., 'AAPL' for Apple).
    """
    try:
        symbol = ticker.upper()
        print(f"\n[System: Fetching yfinance data for {symbol}...]")
        # One month of daily bars is enough for the summary metrics below.
        history = yf.Ticker(symbol).history(period="1mo")
        if history.empty:
            return f"Could not find price data for ticker: {symbol}. Tell the user the data fetch failed."
        last_close = history["Close"].iloc[-1]
        month_high = history["High"].max()
        month_low = history["Low"].min()
        mean_volume = history["Volume"].mean()
        return (
            f"Data for {symbol}:\n"
            f"- Current Price: ${last_close:.2f}\n"
            f"- 1-Month High: ${month_high:.2f}\n"
            f"- 1-Month Low: ${month_low:.2f}\n"
            f"- Average Daily Volume: {int(mean_volume):,}"
        )
    except Exception as e:
        # Report failures as text so the calling agent can relay them.
        return f"Error fetching data: {str(e)}"
def merge_sets(a: Set, b: Set) -> Set:
    """State reducer: return the union of two sets of completed task ids."""
    return set(a).union(b)
class AgentState(TypedDict):
    """Shared LangGraph state passed between Planner, Supervisor, and workers."""

    # Conversation history; operator.add appends each node's new messages.
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # Routing decision from the Supervisor: one node name, a list of node
    # names for parallel fan-out, or "FINISH".
    next: str | list[str]
    # Step counter summed via operator.add; used as a loop-safety cap.
    steps: Annotated[int, operator.add]
    # Finished task_ids; merge_sets unions contributions from parallel workers.
    completed_tasks: Annotated[Set[str], merge_sets]
    # Planner output: list of task dicts (agent/ticker/task_id/description).
    pending_tasks: list
# Worker node names; each one gets an edge back to the Supervisor when the
# graph is assembled in build_financial_graph.
members = ["Quant_Agent", "Fundamental_Agent", "Sentiment_Agent", "Earnings_Agent"]
def make_worker_node(agent, name: str):
    """Wrap a specialist *agent* as a graph node that claims and runs one task.

    The node picks the first pending task assigned to *name* that has not
    been completed yet, invokes the agent on it, and flags answers produced
    without any tool call as unauthorized.
    """

    def node(state: AgentState):
        done = state.get("completed_tasks", set())
        claimed = None
        for candidate in state.get("pending_tasks", []):
            if candidate["agent"] == name and candidate["task_id"] not in done:
                claimed = candidate
                break
        if not claimed:
            # Nothing left for this worker; contribute an empty set to the reducer.
            return {"completed_tasks": set()}
        prompt = HumanMessage(
            content=f"Ticker: {claimed['ticker']}. Task: {claimed['description']}"
        )
        outcome = agent.invoke({"messages": [prompt]})
        # Require at least one tool call; otherwise the agent answered from memory.
        used_tool = any(
            isinstance(msg, AIMessage) and msg.tool_calls for msg in outcome["messages"]
        )
        if used_tool:
            content = outcome["messages"][-1].content.strip() or f"[{name}: No data retrieved]"
        else:
            content = f"ERROR: The {name} attempted to answer without using a data tool. This analysis is unauthorized."
        return {
            "messages": [AIMessage(content=f"[{claimed['task_id']}] {content}", name=name)],
            "completed_tasks": {claimed["task_id"]},
        }

    return node
def create_planner_node(llm):
    """Build the Planner node: decompose the user's request into agent tasks.

    The returned node function asks *llm* to emit a JSON task list following
    the capability-mapping prompt below, parses it defensively, and writes
    the tasks into ``pending_tasks``. If planning fails (no user message,
    unparseable output, or non-list JSON) it emits a polite refusal instead
    of crashing.
    """
    planner_prompt = """You are a task planner for a financial AI system.
GOLDEN RULE: Never assume or guess a number.
CRITICAL AGENT CAPABILITY MAPPING:
1. Quant_Agent: ONLY use for current stock price, trading volume, and 52-week high/lows.
=> GROUPING RULE: If the user asks for multiple price/volume metrics for the SAME ticker, group them into EXACTLY ONE Quant_Agent task. Do NOT make separate tasks for price and volume.
2. Sentiment_Agent: ONLY use for recent news headlines and market sentiment scores.
3. Fundamental_Agent: Use for TWO things:
- SEC Financial Metrics (Revenue, Net Income, Margins, Cash Flow).
- SEC 10-K RAG Searches: Use this for ANY qualitative questions about business strategy, supply chain, manufacturing, competition, and corporate RISKS.
4. Earnings_Agent: Use for earnings-call analysis. This includes:
- Management commentary and guidance from earnings calls.
- Sentiment divergence between Prepared Remarks and Q&A sessions.
- Keyword and entity tracking across quarters (e.g., mentions of "AI", "headwinds", "growth").
=> Use this agent when the user asks about earnings calls, management tone, guidance language, or quarter-over-quarter keyword trends.
Read the user's request and output a JSON list of tasks needed to answer it.
Each task must have:
- "agent": "Quant_Agent", "Fundamental_Agent", "Sentiment_Agent", or "Earnings_Agent"
- "ticker": the stock ticker symbol (e.g. "AAPL")
- "task_id": a unique string
- "description": specific instructions on what to fetch or search
Output ONLY valid JSON. No explanation.
example output:
[
{"agent": "Quant_Agent", "ticker": "AAPL", "task_id": "Quant_AAPL", "description": "Get price and volume for AAPL"},
{"agent": "Sentiment_Agent", "ticker": "MSFT", "task_id": "Sentiment_MSFT", "description": "Get sentiment analysis for MSFT"},
{"agent": "Earnings_Agent", "ticker": "AAPL", "task_id": "Earnings_AAPL", "description": "Analyze sentiment divergence between prepared remarks and Q&A in the latest earnings call"}
]"""

    def planner_function(state: AgentState):
        import json

        # A plan already exists (e.g. re-entry mid-run); do not re-plan.
        if state.get("pending_tasks"):
            return {}
        # Locate the user's request. Using a default avoids the StopIteration
        # crash the bare next(...) caused when no HumanMessage was present.
        user_message = next(
            (m for m in state["messages"] if isinstance(m, HumanMessage)), None
        )
        tasks = []
        if user_message is not None:
            response = llm.invoke(
                [
                    SystemMessage(content=planner_prompt),
                    HumanMessage(content=user_message.content),
                ]
            )
            # Strip markdown code fences, then isolate the outermost JSON array.
            raw = response.content.strip().replace("```json", "").replace("```", "")
            start = raw.find("[")
            end = raw.rfind("]")
            try:
                tasks = json.loads(raw[start : end + 1]) if start != -1 and end != -1 else []
            except Exception:
                tasks = []
            # Valid JSON that is not a list (e.g. a bare object) would break
            # downstream t["task_id"] access — treat it as "no tasks".
            if not isinstance(tasks, list):
                tasks = []
        if not tasks:
            print("[Planner]: No valid financial tasks found.")
            return {
                "pending_tasks": [],
                "completed_tasks": set(),
                "messages": [
                    AIMessage(
                        content="I can only answer questions about stock prices, SEC filings, and market sentiment.",
                        name="Supervisor",
                    )
                ],
            }
        print(f"\n[Planner]: Created {len(tasks)} tasks: {[t['task_id'] for t in tasks]}")
        return {"pending_tasks": tasks, "completed_tasks": set()}

    return planner_function
def create_supervisor_node(llm):
    """Build the Supervisor node: route outstanding tasks to workers.

    Dispatches at most one task per distinct agent each pass (parallel
    fan-out); routes to the Summarizer when everything is done or after
    ten passes as a loop-safety cap.
    """

    def supervisor_function(state: AgentState):
        # Hard stop to prevent infinite Supervisor/worker loops.
        if state.get("steps", 0) >= 10:
            return {"next": "FINISH", "steps": 1}
        done = state.get("completed_tasks", set())
        outstanding = [
            t for t in state.get("pending_tasks", []) if t["task_id"] not in done
        ]
        if not outstanding:
            print("-> All tasks complete. Routing to Summarizer.")
            return {"next": "FINISH", "steps": 1}
        # First outstanding task per agent wins this round; the rest wait
        # for the next Supervisor pass. Dict preserves first-seen order.
        routed = {}
        for task in outstanding:
            routed.setdefault(task["agent"], task["task_id"])
        dispatched_tasks = list(routed.values())
        print(f"\n[Supervisor]: Dispatching tasks in parallel → {dispatched_tasks}")
        return {
            "next": list(routed.keys()),
            "steps": 1,
        }

    return supervisor_function
def create_summarizer_node(llm):
    """Build the Summarizer node: turn specialist outputs into an investment memo."""
    summarizer_system = """You are a senior investment analyst drafting an internal **Investment Memo** for colleagues.
You will receive the user's original question and verbatim outputs from specialist agents (Quant_Agent, Fundamental_Agent, Sentiment_Agent, Earnings_Agent), or a single clarification/refusal if no research ran.
Write the memo using this structure and markdown headings:
# Investment Memo
## Executive Summary
2-4 sentences answering the user in plain language.
## Key Facts & Data
Bullet points. Use ONLY numbers, metrics, and quotes that appear in the specialist outputs. If a section had no data, say "No quantitative/fundamental/sentiment data provided" as appropriate.
## Earnings Call Insights
If Earnings_Agent data is present, summarize management's key messages and guidance.
- If both Prepared Remarks and Q&A are present, analyze any sentiment divergence (e.g., was management more cautious in live Q&A?).
- If only Prepared Remarks are available (typical for SEC-8 / 8-K filings), focus the analysis on the tone and specificity of the management commentary.
- Note any notable keyword/entity trends across quarters (e.g., AI mentions).
If no earnings data was provided, omit this section entirely.
## Risks, Sentiment, and Context
Integrate fundamental and sentiment findings when present. If missing, state that briefly.
## Caveats
Note missing specialists, tool errors, or "unauthorized" / ERROR lines exactly as reported—do not soften them.
Rules:
- Do NOT invent tickers, prices, filings, or sentiment scores not present in the inputs.
- Do NOT cite tool names; write for a portfolio manager reader.
- Keep the tone professional and concise."""

    def summarizer_function(state: AgentState):
        # The first human message is treated as the original user request.
        user_query = ""
        for msg in state["messages"]:
            if isinstance(msg, HumanMessage):
                user_query = msg.content
                break
        # Collect every AI message verbatim, labelled by its producing agent.
        sections = [
            f"### {msg.name or 'Assistant'}\n{msg.content}"
            for msg in state["messages"]
            if isinstance(msg, AIMessage)
        ]
        specialist_blob = "\n\n".join(sections) if sections else "(No specialist outputs.)"
        reply = llm.invoke(
            [
                SystemMessage(content=summarizer_system),
                HumanMessage(
                    content=(
                        f"User request:\n{user_query}\n\n"
                        f"Specialist outputs (verbatim):\n{specialist_blob}"
                    )
                ),
            ]
        )
        memo = (reply.content or "").strip()
        return {"messages": [AIMessage(content=memo, name="Summarizer")]}

    return summarizer_function
def build_financial_graph(llm):
    """Assemble and compile the LangGraph workflow for the financial system.

    Topology: START -> Planner -> Supervisor -> parallel worker fan-out ->
    Supervisor (loop) -> Summarizer -> END. Every worker routes back to the
    Supervisor, which keeps dispatching until no pending tasks remain.
    """
    workflow = StateGraph(AgentState)
    # --- Specialist agents, one per domain. Each gets exactly the tools it
    # --- is allowed to use plus a constraining system prompt.
    quant_agent = create_agent(
        model=llm,
        tools=[get_stock_metrics],
        system_prompt=(
            "You are a Quantitative Analyst. "
            "You have exactly ONE tool: get_stock_metrics(ticker). "
            "For any price, volume, or trading-range question you MUST call get_stock_metrics—do not answer from memory. "
            "NEVER invent other tool names, NEVER output JSON blocks suggesting tools that do not exist. "
            "GOLDEN RULE: After the tool returns, you must format the output gracefully so it is easy to read. "
            "Bold the labels (like **Current Price:** or **Average Volume:**) before injecting the numbers. "
            "NEVER use introductory conversational filler like 'Here is the data'. Just print the labeled metrics directly."
        ),
        name="Quant_Agent",
    )
    fundamental_agent = create_agent(
        model=llm,
        tools=[search_10k_filings, get_company_concept_xbrl],
        system_prompt=(
            "You are a Fundamental Analyst. "
            "GOLDEN RULE: You must output the EXACT DATA or TEXT returned by your tools. "
            "Do NOT explain how the tools work or what they do. "
            "CRITICAL: ONCE YOU HAVE CALLED TO THE TOOL ONCE AND RECEIVED THE DATA, YOU MUST WRITE YOUR FINAL ANSWER IMMEDIATELY. DO NOT CALL THE TOOL A SECOND TIME. "
            "Just answer the user's question using the fetched data and stop."
        ),
        name="Fundamental_Agent",
    )
    sentiment_agent = create_agent(
        model=llm,
        tools=[get_recent_news],
        system_prompt=(
            "You are a Sentiment Analyst. Fetch recent news using your tool. "
            "CRITICAL RULES: Your final response MUST be exactly five lines. "
            "Line 1: The sentiment score (a single number between -1.0 and 1.0). "
            "Line 2-5: Justify the sentiment score based on the news articles."
            "Include important keywords from the news articles in your response."
            "Do not add conversational filler. Do not ask the user follow-up questions."
        ),
        name="Sentiment_Agent",
    )
    earnings_agent = create_agent(
        model=llm,
        tools=[
            search_earnings_call,
            get_earnings_sentiment_divergence,
            get_earnings_keyword_trends,
        ],
        system_prompt=(
            "You are an Earnings Call Analyst specializing in management commentary analysis. "
            "You have THREE tools for analyzing pre-ingested earnings-call transcripts:\n"
            "1. search_earnings_call: Search transcripts for specific topics (guidance, margins, strategy, etc.).\n"
            "2. get_earnings_sentiment_divergence: Compare management tone in scripted Prepared Remarks vs live Q&A.\n"
            "3. get_earnings_keyword_trends: Track keyword frequency changes across quarters.\n\n"
            "CRITICAL RULES:\n"
            "- You MUST call at least one tool. Do NOT answer from memory.\n"
            "- SEC filings (Form 8-K / SEC-8) are a valid source. They typically only contain Prepared Remarks and LACK a Q&A session. This is common and NOT a failure of the data.\n"
            "- If a tool returns an error about missing data (e.g., no filings found), report that the earnings data for that "
            "ticker/quarter has not been ingested and suggest running the ingest script.\n"
            "- If Q&A is missing, simply perform your analysis on the available management commentary.\n"
            "- After the tool returns, write a clear, evidence-backed analysis. Bold key findings.\n"
            "- Do NOT add conversational filler. Do NOT ask follow-up questions."
        ),
        name="Earnings_Agent",
    )
    # --- Graph wiring ---
    workflow.add_node("Planner", create_planner_node(llm))
    workflow.add_node("Supervisor", create_supervisor_node(llm))
    workflow.add_node("Quant_Agent", make_worker_node(quant_agent, "Quant_Agent"))
    workflow.add_node(
        "Fundamental_Agent", make_worker_node(fundamental_agent, "Fundamental_Agent")
    )
    workflow.add_node("Sentiment_Agent", make_worker_node(sentiment_agent, "Sentiment_Agent"))
    workflow.add_node("Earnings_Agent", make_worker_node(earnings_agent, "Earnings_Agent"))
    workflow.add_node("Summarizer", create_summarizer_node(llm))
    # Every worker reports back to the Supervisor for the next dispatch round.
    for member in members:
        workflow.add_edge(member, "Supervisor")
    workflow.add_edge(START, "Planner")
    workflow.add_edge("Planner", "Supervisor")
    # The Supervisor's "next" value drives routing; a list of node names
    # fans out to several workers in one step (parallel dispatch), while
    # "FINISH" hands off to the Summarizer.
    workflow.add_conditional_edges(
        "Supervisor",
        lambda state: state["next"],
        {
            "Quant_Agent": "Quant_Agent",
            "Fundamental_Agent": "Fundamental_Agent",
            "Sentiment_Agent": "Sentiment_Agent",
            "Earnings_Agent": "Earnings_Agent",
            "FINISH": "Summarizer",
        },
    )
    workflow.add_edge("Summarizer", END)
    return workflow.compile()
|