Dev Goyal
refactor: replace Alpha Vantage with Financial Modeling Prep (FMP) for earnings transcript ingestion
012bcc4 | import operator | |
| from typing import Annotated, Sequence, TypedDict, Literal, Set | |
| import yfinance as yf | |
| import pandas as pd | |
| from pydantic import BaseModel, Field | |
| from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, AIMessage | |
| from langchain_core.tools import tool | |
| from langgraph.graph import StateGraph, START, END | |
| from langchain.agents import create_agent | |
| from .sec_tools import get_company_concept_xbrl | |
| from .rag_tools import search_10k_filings | |
| from .sentiment_tools import get_recent_news | |
| from .earnings_tools import ( | |
| search_earnings_call, | |
| get_earnings_sentiment_divergence, | |
| get_earnings_keyword_trends, | |
| ) | |
def get_stock_metrics(ticker: str) -> str:
    """
    Fetches historical market data and calculates basic metrics for a stock.
    CRITICAL: You must pass the official stock ticker symbol (e.g., 'AAPL' for Apple).
    """
    # NOTE(review): this function is handed to the Quant agent as a tool, so the
    # docstring above is presumably what the model sees as the tool description;
    # its wording is deliberately left untouched.
    #
    # Every outcome (success or failure) is reported as a plain string so the
    # calling agent always receives something it can relay.
    try:
        # Normalise inside the try block: a non-string argument then surfaces as
        # a regular "Error fetching data" string instead of an exception.
        ticker = ticker.strip().upper()
        if not ticker:
            # Avoid a pointless network round-trip for a blank symbol.
            return "No ticker symbol was provided. Tell the user the data fetch failed."

        print(f"\n[System: Fetching yfinance data for {ticker}...]")
        stock = yf.Ticker(ticker)
        hist = stock.history(period="1mo")
        if hist.empty:
            return f"Could not find price data for ticker: {ticker}. Tell the user the data fetch failed."

        # Last close of the window is reported as the "current" price.
        current_price = hist["Close"].iloc[-1]
        monthly_high = hist["High"].max()
        monthly_low = hist["Low"].min()
        avg_volume = hist["Volume"].mean()

        summary = (
            f"Data for {ticker}:\n"
            f"- Current Price: ${current_price:.2f}\n"
            f"- 1-Month High: ${monthly_high:.2f}\n"
            f"- 1-Month Low: ${monthly_low:.2f}\n"
            f"- Average Daily Volume: {int(avg_volume):,}"
        )
        return summary
    except Exception as e:
        # Tool boundary: convert any failure into text the agent can report.
        return f"Error fetching data: {str(e)}"
def merge_sets(a: Set, b: Set) -> Set:
    """Return a new set holding every element of ``a`` and ``b``.

    Neither argument is modified.
    """
    combined = a | b
    return combined
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the financial graph."""

    # Message log; the annotation pairs it with operator.add as its combiner.
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # Routing decision: a single name such as "FINISH", or a list of agent names.
    next: str | list[str]
    # Step counter; the annotation pairs it with operator.add as its combiner.
    steps: Annotated[int, operator.add]
    # Ids of finished tasks; the annotation pairs it with merge_sets (set union).
    completed_tasks: Annotated[Set[str], merge_sets]
    # Task dictionaries produced by the planner (no combiner attached).
    pending_tasks: list
# Names of the specialist worker agents, in registration order.
members = [
    "Quant_Agent",
    "Fundamental_Agent",
    "Sentiment_Agent",
    "Earnings_Agent",
]
def _content_to_text(content) -> str:
    """Flatten a message ``content`` value (string or list of blocks) to text."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces = []
        for block in content:
            if isinstance(block, str):
                pieces.append(block)
            elif (
                isinstance(block, dict)
                and block.get("type") == "text"
                and isinstance(block.get("text"), str)
            ):
                pieces.append(block["text"])
        return "".join(pieces)
    return "" if content is None else str(content)


def make_worker_node(agent, name: str):
    """Wrap a specialist agent as a graph node that executes one pending task.

    Args:
        agent: Object exposing ``invoke({"messages": [...]})`` that returns a
            mapping containing a ``"messages"`` list.
        name: Agent name. Selects the tasks whose ``"agent"`` field equals it
            and is attached to the message the node emits.

    Returns:
        A ``node(state)`` callable. Each call handles at most one task: the
        first pending task for ``name`` whose id is not yet completed. It
        returns a state update with the agent's answer (prefixed with the task
        id) and the task id added to ``completed_tasks``; when no task is left
        it returns only an empty ``completed_tasks`` set.
    """

    def node(state: AgentState):
        pending = state.get("pending_tasks", [])
        completed = state.get("completed_tasks", set())
        my_task = next(
            (
                t
                for t in pending
                if t["agent"] == name and t["task_id"] not in completed
            ),
            None,
        )
        if not my_task:
            # Nothing to do for this agent; report no newly completed ids.
            return {"completed_tasks": set()}

        # The agent only sees its own task, not the rest of the conversation.
        task_message = HumanMessage(
            content=f"Ticker: {my_task['ticker']}. Task: {my_task['description']}"
        )
        result = agent.invoke({"messages": [task_message]})

        # An answer produced without any tool call is rejected outright.
        has_tool_call = any(
            isinstance(m, AIMessage) and m.tool_calls for m in result["messages"]
        )
        if not has_tool_call:
            content = f"ERROR: The {name} attempted to answer without using a data tool. This analysis is unauthorized."
        else:
            # Message content may be a list of content blocks rather than a
            # plain string; flatten it before stripping so this cannot raise.
            final_text = _content_to_text(result["messages"][-1].content).strip()
            content = final_text or f"[{name}: No data retrieved]"

        return {
            "messages": [AIMessage(content=f"[{my_task['task_id']}] {content}", name=name)],
            "completed_tasks": {my_task["task_id"]},
        }

    return node
def create_planner_node(llm):
    """Build the planner node that turns the user's request into a task list.

    Args:
        llm: Chat model exposing ``invoke(messages)``; the result's ``content``
            is expected to be a string containing a JSON list of tasks.

    Returns:
        ``planner_function(state)``. It returns ``{}`` when the state already
        holds pending tasks. Otherwise it asks the model for a plan, keeps only
        well-formed tasks, and returns them as ``pending_tasks`` with an empty
        ``completed_tasks`` set. When no usable task results, it returns an
        empty plan plus a refusal message attributed to "Supervisor".
    """
    planner_prompt = """You are a task planner for a financial AI system.
GOLDEN RULE: Never assume or guess a number.
CRITICAL AGENT CAPABILITY MAPPING:
1. Quant_Agent: ONLY use for current stock price, trading volume, and 52-week high/lows.
=> GROUPING RULE: If the user asks for multiple price/volume metrics for the SAME ticker, group them into EXACTLY ONE Quant_Agent task. Do NOT make separate tasks for price and volume.
2. Sentiment_Agent: ONLY use for recent news headlines and market sentiment scores.
3. Fundamental_Agent: Use for TWO things:
- SEC Financial Metrics (Revenue, Net Income, Margins, Cash Flow).
- SEC 10-K RAG Searches: Use this for ANY qualitative questions about business strategy, supply chain, manufacturing, competition, and corporate RISKS.
4. Earnings_Agent: Use for earnings-call analysis. This includes:
- Management commentary and guidance from earnings calls.
- Sentiment divergence between Prepared Remarks and Q&A sessions.
- Keyword and entity tracking across quarters (e.g., mentions of "AI", "headwinds", "growth").
=> Use this agent when the user asks about earnings calls, management tone, guidance language, or quarter-over-quarter keyword trends.
Read the user's request and output a JSON list of tasks needed to answer it.
Each task must have:
- "agent": "Quant_Agent", "Fundamental_Agent", "Sentiment_Agent", or "Earnings_Agent"
- "ticker": the stock ticker symbol (e.g. "AAPL")
- "task_id": a unique string
- "description": specific instructions on what to fetch or search
Output ONLY valid JSON. No explanation.
example output:
[
{"agent": "Quant_Agent", "ticker": "AAPL", "task_id": "Quant_AAPL", "description": "Get price and volume for AAPL"},
{"agent": "Sentiment_Agent", "ticker": "MSFT", "task_id": "Sentiment_MSFT", "description": "Get sentiment analysis for MSFT"},
{"agent": "Earnings_Agent", "ticker": "AAPL", "task_id": "Earnings_AAPL", "description": "Analyze sentiment divergence between prepared remarks and Q&A in the latest earnings call"}
]"""
    # Must mirror the agent names offered in the prompt above; a task naming any
    # other agent cannot be routed, so it is discarded at planning time.
    valid_agents = frozenset(
        {"Quant_Agent", "Fundamental_Agent", "Sentiment_Agent", "Earnings_Agent"}
    )
    # Fields the downstream nodes index directly on every task.
    required_keys = ("agent", "ticker", "task_id", "description")

    def _usable_tasks(parsed):
        """Return the well-formed, uniquely identified tasks from ``parsed``."""
        if not isinstance(parsed, list):
            return []
        usable = []
        seen_ids = set()
        for task in parsed:
            if not isinstance(task, dict):
                continue
            if any(key not in task for key in required_keys):
                continue
            agent_name = task["agent"]
            task_id = task["task_id"]
            if not isinstance(agent_name, str) or agent_name not in valid_agents:
                continue
            # Ids are stored in a set and compared for membership, so they must
            # be hashable scalars; a repeated id could never run twice anyway.
            if not isinstance(task_id, (str, int)) or task_id in seen_ids:
                continue
            seen_ids.add(task_id)
            usable.append(task)
        return usable

    def planner_function(state: AgentState):
        import json

        # A plan already exists: planning happens only once per run.
        if state.get("pending_tasks"):
            return {}

        # Plan from the first human message; without one there is nothing to plan.
        user_message = next(
            (m for m in state["messages"] if isinstance(m, HumanMessage)), None
        )

        tasks = []
        if user_message is not None:
            response = llm.invoke(
                [
                    SystemMessage(content=planner_prompt),
                    HumanMessage(content=user_message.content),
                ]
            )
            # Strip markdown fences, then parse the outermost [...] span.
            raw = response.content.strip().replace("```json", "").replace("```", "")
            start = raw.find("[")
            end = raw.rfind("]")
            if start != -1 and end != -1:
                try:
                    parsed = json.loads(raw[start : end + 1])
                except (ValueError, RecursionError):
                    parsed = []
                tasks = _usable_tasks(parsed)
                dropped = (len(parsed) if isinstance(parsed, list) else 0) - len(tasks)
                if dropped:
                    print(f"[Planner]: Ignored {dropped} malformed or unsupported task(s).")

        if not tasks:
            print("[Planner]: No valid financial tasks found.")
            return {
                "pending_tasks": [],
                "completed_tasks": set(),
                "messages": [
                    AIMessage(
                        content="I can only answer questions about stock prices, SEC filings, and market sentiment.",
                        name="Supervisor",
                    )
                ],
            }

        print(f"\n[Planner]: Created {len(tasks)} tasks: {[t['task_id'] for t in tasks]}")
        return {"pending_tasks": tasks, "completed_tasks": set()}

    return planner_function
def create_supervisor_node(llm):
    """Build the supervisor node that decides which agents run next.

    ``llm`` is accepted for signature parity with the other node factories but
    is not used by the routing logic.

    The returned ``supervisor_function(state)`` always adds 1 to ``steps`` and
    sets ``next`` to either ``"FINISH"`` (step budget of 10 reached, or no
    unfinished task remains) or a list of agent names, one per agent that still
    has an unfinished task.
    """

    def supervisor_function(state: AgentState):
        # Hard stop once the step budget is used up.
        if state.get("steps", 0) >= 10:
            return {"next": "FINISH", "steps": 1}

        all_tasks = state.get("pending_tasks", [])
        done_ids = state.get("completed_tasks", set())
        outstanding = [task for task in all_tasks if task["task_id"] not in done_ids]

        if len(outstanding) == 0:
            print("-> All tasks complete. Routing to Summarizer.")
            return {"next": "FINISH", "steps": 1}

        # First outstanding task per agent, in order of first appearance.
        first_task_for_agent = {}
        for task in outstanding:
            first_task_for_agent.setdefault(task["agent"], task["task_id"])

        dispatched_tasks = list(first_task_for_agent.values())
        print(f"\n[Supervisor]: Dispatching tasks in parallel → {dispatched_tasks}")
        return {
            "next": list(first_task_for_agent.keys()),
            "steps": 1,
        }

    return supervisor_function
def create_summarizer_node(llm):
    """Build the node that condenses all agent outputs into an investment memo.

    Args:
        llm: Chat model exposing ``invoke(messages)``.

    Returns:
        ``summarizer_function(state)``, which sends the first human message and
        every AI message in the state to the model and returns the stripped
        reply as a single message named "Summarizer".
    """
    summarizer_system = """You are a senior investment analyst drafting an internal **Investment Memo** for colleagues.
You will receive the user's original question and verbatim outputs from specialist agents (Quant_Agent, Fundamental_Agent, Sentiment_Agent, Earnings_Agent), or a single clarification/refusal if no research ran.
Write the memo using this structure and markdown headings:
# Investment Memo
## Executive Summary
2-4 sentences answering the user in plain language.
## Key Facts & Data
Bullet points. Use ONLY numbers, metrics, and quotes that appear in the specialist outputs. If a section had no data, say "No quantitative/fundamental/sentiment data provided" as appropriate.
## Earnings Call Insights
If Earnings_Agent data is present, summarize management's key messages and guidance.
- If both Prepared Remarks and Q&A are present, analyze any sentiment divergence (e.g., was management more cautious in live Q&A?).
- If only Prepared Remarks are available (typical for SEC-8 / 8-K filings), focus the analysis on the tone and specificity of the management commentary.
- Note any notable keyword/entity trends across quarters (e.g., AI mentions).
If no earnings data was provided, omit this section entirely.
## Risks, Sentiment, and Context
Integrate fundamental and sentiment findings when present. If missing, state that briefly.
## Caveats
Note missing specialists, tool errors, or "unauthorized" / ERROR lines exactly as reported—do not soften them.
Rules:
- Do NOT invent tickers, prices, filings, or sentiment scores not present in the inputs.
- Do NOT cite tool names; write for a portfolio manager reader.
- Keep the tone professional and concise."""

    def summarizer_function(state: AgentState):
        history = state["messages"]

        # The first human message is treated as the original question.
        user_query = next(
            (m.content for m in history if isinstance(m, HumanMessage)), ""
        )

        # One markdown section per AI message, labelled with its sender name.
        sections = [
            f"### {m.name or 'Assistant'}\n{m.content}"
            for m in history
            if isinstance(m, AIMessage)
        ]
        if sections:
            specialist_blob = "\n\n".join(sections)
        else:
            specialist_blob = "(No specialist outputs.)"

        request_text = (
            f"User request:\n{user_query}\n\n"
            f"Specialist outputs (verbatim):\n{specialist_blob}"
        )
        response = llm.invoke(
            [
                SystemMessage(content=summarizer_system),
                HumanMessage(content=request_text),
            ]
        )

        # Falsy content (e.g. None) is treated as an empty memo.
        memo = (response.content or "").strip()
        return {"messages": [AIMessage(content=memo, name="Summarizer")]}

    return summarizer_function
def build_financial_graph(llm):
    """Assemble and compile the planner / supervisor / workers / summarizer graph.

    Flow: START -> Planner -> Supervisor. The supervisor's ``next`` value routes
    to one or more worker agents (each of which returns to the supervisor) or,
    on ``"FINISH"``, to the Summarizer, which leads to END.

    Args:
        llm: Chat model shared by every agent and node in the graph.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.

    Raises:
        KeyError: If ``members`` lists an agent name for which no agent is
            constructed here.
    """
    workflow = StateGraph(AgentState)

    quant_agent = create_agent(
        model=llm,
        tools=[get_stock_metrics],
        system_prompt=(
            "You are a Quantitative Analyst. "
            "You have exactly ONE tool: get_stock_metrics(ticker). "
            "For any price, volume, or trading-range question you MUST call get_stock_metrics—do not answer from memory. "
            "NEVER invent other tool names, NEVER output JSON blocks suggesting tools that do not exist. "
            "GOLDEN RULE: After the tool returns, you must format the output gracefully so it is easy to read. "
            "Bold the labels (like **Current Price:** or **Average Volume:**) before injecting the numbers. "
            "NEVER use introductory conversational filler like 'Here is the data'. Just print the labeled metrics directly."
        ),
        name="Quant_Agent",
    )
    fundamental_agent = create_agent(
        model=llm,
        tools=[search_10k_filings, get_company_concept_xbrl],
        system_prompt=(
            "You are a Fundamental Analyst. "
            "GOLDEN RULE: You must output the EXACT DATA or TEXT returned by your tools. "
            "Do NOT explain how the tools work or what they do. "
            "CRITICAL: ONCE YOU HAVE CALLED TO THE TOOL ONCE AND RECEIVED THE DATA, YOU MUST WRITE YOUR FINAL ANSWER IMMEDIATELY. DO NOT CALL THE TOOL A SECOND TIME. "
            "Just answer the user's question using the fetched data and stop."
        ),
        name="Fundamental_Agent",
    )
    sentiment_agent = create_agent(
        model=llm,
        tools=[get_recent_news],
        system_prompt=(
            # Adjacent literals are concatenated verbatim, so every fragment
            # except the last must end with a space to keep sentences apart.
            "You are a Sentiment Analyst. Fetch recent news using your tool. "
            "CRITICAL RULES: Your final response MUST be exactly five lines. "
            "Line 1: The sentiment score (a single number between -1.0 and 1.0). "
            "Line 2-5: Justify the sentiment score based on the news articles. "
            "Include important keywords from the news articles in your response. "
            "Do not add conversational filler. Do not ask the user follow-up questions."
        ),
        name="Sentiment_Agent",
    )
    earnings_agent = create_agent(
        model=llm,
        tools=[
            search_earnings_call,
            get_earnings_sentiment_divergence,
            get_earnings_keyword_trends,
        ],
        system_prompt=(
            "You are an Earnings Call Analyst specializing in management commentary analysis. "
            "You have THREE tools for analyzing pre-ingested earnings-call transcripts:\n"
            "1. search_earnings_call: Search transcripts for specific topics (guidance, margins, strategy, etc.).\n"
            "2. get_earnings_sentiment_divergence: Compare management tone in scripted Prepared Remarks vs live Q&A.\n"
            "3. get_earnings_keyword_trends: Track keyword frequency changes across quarters.\n\n"
            "CRITICAL RULES:\n"
            "- You MUST call at least one tool. Do NOT answer from memory.\n"
            "- SEC filings (Form 8-K / SEC-8) are a valid source. They typically only contain Prepared Remarks and LACK a Q&A session. This is common and NOT a failure of the data.\n"
            "- If a tool returns an error about missing data (e.g., no filings found), report that the earnings data for that "
            "ticker/quarter has not been ingested and suggest running the ingest script.\n"
            "- If Q&A is missing, simply perform your analysis on the available management commentary.\n"
            "- After the tool returns, write a clear, evidence-backed analysis. Bold key findings.\n"
            "- Do NOT add conversational filler. Do NOT ask follow-up questions."
        ),
        name="Earnings_Agent",
    )

    # `members` is the single source of truth for which workers exist: nodes,
    # return edges, and supervisor routes are all derived from it so they cannot
    # drift apart. A name in `members` without an agent here fails fast.
    worker_agents = {
        "Quant_Agent": quant_agent,
        "Fundamental_Agent": fundamental_agent,
        "Sentiment_Agent": sentiment_agent,
        "Earnings_Agent": earnings_agent,
    }

    workflow.add_node("Planner", create_planner_node(llm))
    workflow.add_node("Supervisor", create_supervisor_node(llm))
    for member in members:
        workflow.add_node(member, make_worker_node(worker_agents[member], member))
    workflow.add_node("Summarizer", create_summarizer_node(llm))

    # Every worker hands control back to the supervisor when it finishes.
    for member in members:
        workflow.add_edge(member, "Supervisor")
    workflow.add_edge(START, "Planner")
    workflow.add_edge("Planner", "Supervisor")

    # The supervisor's `next` is either "FINISH" or a list of worker names.
    routes = {member: member for member in members}
    routes["FINISH"] = "Summarizer"
    workflow.add_conditional_edges(
        "Supervisor",
        lambda state: state["next"],
        routes,
    )
    workflow.add_edge("Summarizer", END)
    return workflow.compile()