PrimoGreedy-Agent / src /agent.py
CiscsoPonce's picture
feat: VPS dashboard + LangSmith online evaluators and prompt versioning
ffc1e30
"""Interactive Chainlit agent pipeline.
Supports: manual ticker lookup, auto-scan, social scout, chat mode.
Uses the shared core modules and discovery pipeline.
"""
import io
import gc
import random
import time
import warnings
import yfinance as yf
import matplotlib.pyplot as plt
from typing import Literal
warnings.filterwarnings("ignore", category=UserWarning, module="pydantic")
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, RetryPolicy
from src.llm import get_llm, get_structured_llm, invoke_with_fallback
from src.finance_tools import (
check_financial_health,
get_insider_sentiment,
get_company_news,
get_basic_financials,
)
from src.portfolio_tracker import record_paper_trade
from src.core.logger import get_logger
from src.core.search import brave_search
from src.core.ticker_utils import extract_tickers, resolve_ticker_suffix, normalize_price
from src.core.memory import load_seen_tickers, mark_ticker_seen
from src.core.state import AgentState
from src.core.online_eval import log_online_feedback, tag_for_review, get_current_run_id
from src.prompts.senior_broker import get_analyst_prompt
from src.discovery.screener import screen_microcaps, get_trending_tickers_from_brave
from src.discovery.scoring import rank_candidates
from src.discovery.insider_feed import get_insider_buys
logger = get_logger(__name__)
# Re-export for backward compatibility (app.py imports this)
brave_market_search = brave_search
# --- CONFIGURATION ---
MAX_MARKET_CAP = 500_000_000
MIN_MARKET_CAP = 5_000_000
MAX_PRICE_PER_SHARE = 30.00
MAX_RETRIES = 1
def generate_chart(ticker: str) -> bytes | None:
"""Generate a 6-month price chart in memory."""
try:
stock = yf.Ticker(ticker)
hist = stock.history(period="6mo")
if hist.empty:
return None
plt.figure(figsize=(6, 4))
plt.plot(hist.index, hist["Close"], color="#00a1ff", linewidth=1.5)
plt.title(f"{ticker} - 6 Month Price Action")
plt.grid(True, linestyle="--", alpha=0.6)
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight")
buf.seek(0)
plt.close("all")
return buf.getvalue()
except Exception as exc:
logger.warning("Chart generation failed for %s: %s", ticker, exc)
return None
# --- NODES ---
def chat_node(state):
"""Dedicated UI chat node for conversational queries."""
user_query = state.get("ticker", "")
prompt = f"""
You are the Senior Broker AI for PrimoGreedy. Your team member just asked you this question:
"{user_query}"
Answer them directly, professionally, and concisely. If they are asking about financial metrics or how you work, explain your quantitative Graham and Deep Value frameworks.
"""
try:
response = invoke_with_fallback(prompt, run_name="chat_node")
except Exception as exc:
logger.error("Chat LLM error: %s", exc)
response = "I am experiencing issues right now. Please try again."
return {"final_report": response, "status": "CHAT"}
def scout_node(state):
"""Scout node: manual ticker pass-through or screener+Brave discovery."""
region = state.get("region", "USA")
retries = state.get("retry_count", 0)
# Manual single-ticker lookup
is_manual = bool(state.get("ticker") and state["ticker"] != "NONE" and retries == 0)
if is_manual:
return {
"ticker": resolve_ticker_suffix(state["ticker"], region),
"manual_search": True,
}
# Auto-scan: use screener + Brave trending
if retries > 0:
time.sleep(2)
logger.info("Auto-scanning %s for micro-caps...", region)
trending = get_trending_tickers_from_brave(region)
screened = screen_microcaps(region=region, extra_tickers=trending, max_results=15)
if not screened:
return {"ticker": "NONE", "manual_search": False}
seen = load_seen_tickers()
fresh = [c for c in screened if c["ticker"] not in seen]
if not fresh:
return {"ticker": "NONE", "manual_search": False}
ranked = rank_candidates(fresh, top_n=3)
best = ranked[0]
ticker = best["ticker"]
mark_ticker_seen(ticker)
logger.info("Auto-scan target: %s (score=%d)", ticker, best.get("score", 0))
return {"ticker": ticker, "manual_search": False}
def _gatekeeper_route(state, update) -> str:
"""Decide where gatekeeper should route based on state + update."""
if state.get("manual_search"):
return "analyst"
if update.get("status") == "PASS":
return "analyst"
new_retries = update.get("retry_count", state.get("retry_count", 0))
if new_retries >= MAX_RETRIES:
return "analyst"
return "scout"
def gatekeeper_node(state) -> Command[Literal["analyst", "scout"]]:
"""Validate candidate with financial health checks. Routes via Command."""
ticker = state.get("ticker", "NONE")
retries = state.get("retry_count", 0)
if ticker == "NONE":
update = {
"is_small_cap": False,
"status": "FAIL",
"retry_count": retries + 1,
"financial_data": {"reason": "Scout found no readable ticker."},
}
return Command(update=update, goto=_gatekeeper_route(state, update))
mark_ticker_seen(ticker)
try:
stock = yf.Ticker(ticker)
raw_info = stock.info
lean_info = {
"currentPrice": raw_info.get("currentPrice", 0) or raw_info.get("regularMarketPrice", 0),
"trailingEps": raw_info.get("trailingEps", 0),
"bookValue": raw_info.get("bookValue", 0),
"marketCap": raw_info.get("marketCap", 0),
"ebitda": raw_info.get("ebitda", 0),
"sector": raw_info.get("sector", "Unknown"),
"freeCashflow": raw_info.get("freeCashflow", 0),
"totalCash": raw_info.get("totalCash", 0),
"currency": raw_info.get("currency", "USD"),
}
del raw_info
gc.collect()
price = normalize_price(lean_info["currentPrice"], ticker, lean_info["currency"])
lean_info["currentPrice"] = price
mkt_cap = lean_info["marketCap"]
chart_bytes = generate_chart(ticker)
if price > MAX_PRICE_PER_SHARE:
update = {
"market_cap": mkt_cap,
"is_small_cap": False,
"status": "FAIL",
"company_name": ticker,
"financial_data": lean_info,
"retry_count": retries + 1,
"final_report": f"Price ${price:.2f} exceeds ${MAX_PRICE_PER_SHARE} limit.",
"chart_data": chart_bytes,
}
return Command(update=update, goto=_gatekeeper_route(state, update))
if not (MIN_MARKET_CAP < mkt_cap < MAX_MARKET_CAP):
update = {
"market_cap": mkt_cap,
"is_small_cap": False,
"status": "FAIL",
"company_name": ticker,
"financial_data": lean_info,
"retry_count": retries + 1,
"final_report": f"Market Cap ${mkt_cap:,.0f} is outside the $10M-$300M range.",
"chart_data": chart_bytes,
}
return Command(update=update, goto=_gatekeeper_route(state, update))
health = check_financial_health(ticker, lean_info)
if health["status"] == "FAIL":
update = {
"market_cap": mkt_cap,
"is_small_cap": False,
"status": "FAIL",
"company_name": ticker,
"financial_data": lean_info,
"retry_count": retries + 1,
"final_report": f"**GATEKEEPER REJECT:** {health['reason']}",
"chart_data": chart_bytes,
}
return Command(update=update, goto=_gatekeeper_route(state, update))
update = {
"market_cap": mkt_cap,
"is_small_cap": True,
"status": "PASS",
"company_name": stock.info.get("shortName", ticker),
"financial_data": lean_info,
"chart_data": chart_bytes,
}
return Command(update=update, goto="analyst")
except Exception as exc:
logger.error("Gatekeeper error for %s: %s", ticker, exc)
update = {
"is_small_cap": False,
"status": "FAIL",
"retry_count": retries + 1,
"financial_data": {"reason": f"API Error: {exc}"},
}
return Command(update=update, goto=_gatekeeper_route(state, update))
def analyst_node(state):
"""Senior Broker analysis with Graham Number + Finnhub + insider data."""
ticker = state["ticker"]
info = state.get("financial_data", {})
chart_bytes = state.get("chart_data")
region = state.get("region", "USA")
if state.get("status") == "FAIL":
reason = state.get("final_report", info.get("reason", "Failed basic criteria."))
verdict = (
f"### REJECTED BY GATEKEEPER\n"
f"**Reason:** {reason}\n\n"
f"*The data for {ticker} was retrieved, but it does not fit the PrimoGreedy small-cap profile.*"
)
return {"final_verdict": verdict, "final_report": verdict, "chart_data": chart_bytes}
price = info.get("currentPrice", 0) or 0
eps = info.get("trailingEps", 0) or 0
book_value = info.get("bookValue", 0) or 0
ebitda = info.get("ebitda", 0) or 0
sector = info.get("sector", "Unknown")
currency = info.get("currency", "USD")
# Normalize per-share metrics from pence → pounds for UK stocks
# (price is already converted by gatekeeper's normalize_price call,
# but eps/bookValue come raw from yFinance in GBp/GBX)
if ticker.endswith(".L") or currency in ("GBp", "GBX"):
eps = eps / 100
book_value = book_value / 100
if eps > 0 and book_value > 0:
strategy = "GRAHAM VALUE"
valuation = (22.5 * eps * book_value) ** 0.5
thesis = f"Profitable in {sector}. Graham Value ${valuation:.2f} vs Price ${price:.2f}. EBITDA: ${ebitda:,.0f}."
else:
strategy = "DEEP VALUE ASSET PLAY"
ratio = price / book_value if book_value > 0 else 0
thesis = f"Unprofitable in {sector}. Trading at {ratio:.2f}x Book Value. EBITDA: ${ebitda:,.0f}."
news = brave_search(f"{ticker} stock {sector} catalysts insider buying")
# --- Build deep-fundamentals context ---
deep_fundamentals = ""
if region == "USA" and "." not in ticker:
logger.info("Researching Finnhub for %s...", ticker)
context = ""
try:
context += get_insider_sentiment.invoke({"ticker": ticker}) + "\n"
context += get_company_news.invoke({"ticker": ticker}) + "\n"
context += get_basic_financials.invoke({"ticker": ticker}) + "\n"
except Exception as exc:
logger.warning("Finnhub tool error for %s: %s", ticker, exc)
insider = get_insider_buys(ticker)
context += f"\nInsider Sentiment (6mo): {insider['sentiment']} | MSPR: {insider['mspr']} | Net Shares: {insider['change']}\n"
deep_fundamentals = f"DEEP FUNDAMENTALS (FINNHUB + INSIDER FEED):\n{context}"
else:
deep_fundamentals = f"NEWS: {str(news)[:1500]}"
# --- SEC EDGAR ground truth (US equities only) ---
sec_context = ""
if region == "USA" and "." not in ticker:
from src.sec_edgar import get_sec_filings
try:
sec_context = get_sec_filings.invoke({"ticker": ticker})
except Exception as exc:
logger.warning("SEC EDGAR failed for %s: %s", ticker, exc)
# --- Debate or single-LLM path ---
from src.agents.debate import is_debate_enabled, run_debate
from src.models.kelly import get_kelly_stats, calculate_position_size
debate_result = None
if is_debate_enabled():
logger.info("Running multi-agent debate for %s...", ticker)
try:
debate_result = run_debate(
ticker=ticker,
company_name=state.get("company_name", ticker),
financial_data_summary=str(info)[:2000],
deep_fundamentals=deep_fundamentals,
sec_context=sec_context,
strategy=strategy,
price=price,
eps=eps,
book_value=book_value,
ebitda=ebitda,
)
result = debate_result["_structured_result"]
stats = get_kelly_stats()
result.position_size = calculate_position_size(stats, result.verdict)
result.kelly_win_rate = stats.win_rate
result.kelly_total_trades = stats.total_trades
verdict = result.to_report()
record_paper_trade(ticker, price, verdict, source="Chainlit UI",
structured_verdict=result.verdict,
position_size=result.position_size)
_run_id = get_current_run_id()
log_online_feedback(verdict, ticker, run_id=_run_id)
tag_for_review(verdict, ticker, run_id=_run_id)
return {
"final_verdict": verdict, "final_report": verdict,
"chart_data": chart_bytes, "debate_used": True,
"bull_case": debate_result.get("bull_case", ""),
"bear_case": debate_result.get("bear_case", ""),
}
except Exception as exc:
logger.warning("Debate failed for %s, falling back to single-LLM: %s", ticker, exc)
# --- Single-LLM path (default or debate fallback) ---
template = get_analyst_prompt()
prompt = template.format(
company_name=state.get("company_name", ticker),
ticker=ticker,
price=price,
eps=eps,
book_value=book_value,
ebitda=ebitda,
thesis=thesis,
strategy=strategy,
deep_fundamentals=deep_fundamentals,
sec_context=sec_context,
)
try:
import warnings
from src.models.verdict import InvestmentVerdict
structured_llm = get_structured_llm().with_structured_output(InvestmentVerdict)
result = structured_llm.invoke(prompt)
stats = get_kelly_stats()
result.position_size = calculate_position_size(stats, result.verdict)
result.kelly_win_rate = stats.win_rate
result.kelly_total_trades = stats.total_trades
verdict = result.to_report()
record_paper_trade(ticker, price, verdict, source="Chainlit UI",
structured_verdict=result.verdict,
position_size=result.position_size)
_run_id = get_current_run_id()
log_online_feedback(verdict, ticker, run_id=_run_id)
tag_for_review(verdict, ticker, run_id=_run_id)
except Exception as exc:
logger.warning("Structured output failed for %s, falling back to plain LLM: %s", ticker, exc)
try:
verdict = invoke_with_fallback(prompt, run_name="analyst_node")
stats = get_kelly_stats()
v_upper = verdict.upper()
verdict_type = "AVOID"
if "STRONG BUY" in v_upper:
verdict_type = "STRONG BUY"
elif "BUY" in v_upper:
verdict_type = "BUY"
elif "WATCH" in v_upper:
verdict_type = "WATCH"
pos = calculate_position_size(stats, verdict_type)
if pos > 0:
verdict += (
f"\n\n### POSITION SIZING (Kelly Criterion)\n"
f"**Recommended allocation: {pos:.1f}% of portfolio**"
)
record_paper_trade(ticker, price, verdict, source="Chainlit UI",
position_size=pos)
_run_id = get_current_run_id()
log_online_feedback(verdict, ticker, run_id=_run_id, is_fallback=True)
tag_for_review(verdict, ticker, run_id=_run_id, is_fallback=True)
except Exception as exc2:
logger.error("LLM analysis failed for %s: %s", ticker, exc2)
verdict = f"Strategy: {strategy}\nLLM analysis unavailable: {exc2}"
return {"final_verdict": verdict, "final_report": verdict, "chart_data": chart_bytes}
# --- GRAPH ---
_api_retry = RetryPolicy(max_attempts=3, initial_interval=2.0)
workflow = StateGraph(AgentState)
workflow.add_node("chat", chat_node, retry=_api_retry)
workflow.add_node("scout", scout_node, retry=_api_retry)
workflow.add_node("gatekeeper", gatekeeper_node, retry=_api_retry)
workflow.add_node("analyst", analyst_node, retry=_api_retry)
def initial_routing(state):
"""Direct traffic: spaces -> chat, otherwise -> scout."""
query = str(state.get("ticker", ""))
if " " in query:
return "chat"
return "scout"
workflow.add_conditional_edges(START, initial_routing, ["chat", "scout"])
workflow.add_edge("chat", END)
workflow.add_edge("scout", "gatekeeper")
workflow.add_edge("analyst", END)
app = workflow.compile(checkpointer=InMemorySaver())