Spaces:
Running
Running
| """Interactive Chainlit agent pipeline. | |
| Supports: manual ticker lookup, auto-scan, social scout, chat mode. | |
| Uses the shared core modules and discovery pipeline. | |
| """ | |
| import io | |
| import gc | |
| import random | |
| import time | |
| import warnings | |
| import yfinance as yf | |
| import matplotlib.pyplot as plt | |
| from typing import Literal | |
| warnings.filterwarnings("ignore", category=UserWarning, module="pydantic") | |
| from langgraph.graph import StateGraph, START, END | |
| from langgraph.checkpoint.memory import InMemorySaver | |
| from langgraph.types import Command, RetryPolicy | |
| from src.llm import get_llm, get_structured_llm, invoke_with_fallback | |
| from src.finance_tools import ( | |
| check_financial_health, | |
| get_insider_sentiment, | |
| get_company_news, | |
| get_basic_financials, | |
| ) | |
| from src.portfolio_tracker import record_paper_trade | |
| from src.core.logger import get_logger | |
| from src.core.search import brave_search | |
| from src.core.ticker_utils import extract_tickers, resolve_ticker_suffix, normalize_price | |
| from src.core.memory import load_seen_tickers, mark_ticker_seen | |
| from src.core.state import AgentState | |
| from src.core.online_eval import log_online_feedback, tag_for_review, get_current_run_id | |
| from src.prompts.senior_broker import get_analyst_prompt | |
| from src.discovery.screener import screen_microcaps, get_trending_tickers_from_brave | |
| from src.discovery.scoring import rank_candidates | |
| from src.discovery.insider_feed import get_insider_buys | |
| logger = get_logger(__name__) | |
| # Re-export for backward compatibility (app.py imports this) | |
| brave_market_search = brave_search | |
| # --- CONFIGURATION --- | |
| MAX_MARKET_CAP = 500_000_000 | |
| MIN_MARKET_CAP = 5_000_000 | |
| MAX_PRICE_PER_SHARE = 30.00 | |
| MAX_RETRIES = 1 | |
| def generate_chart(ticker: str) -> bytes | None: | |
| """Generate a 6-month price chart in memory.""" | |
| try: | |
| stock = yf.Ticker(ticker) | |
| hist = stock.history(period="6mo") | |
| if hist.empty: | |
| return None | |
| plt.figure(figsize=(6, 4)) | |
| plt.plot(hist.index, hist["Close"], color="#00a1ff", linewidth=1.5) | |
| plt.title(f"{ticker} - 6 Month Price Action") | |
| plt.grid(True, linestyle="--", alpha=0.6) | |
| buf = io.BytesIO() | |
| plt.savefig(buf, format="png", bbox_inches="tight") | |
| buf.seek(0) | |
| plt.close("all") | |
| return buf.getvalue() | |
| except Exception as exc: | |
| logger.warning("Chart generation failed for %s: %s", ticker, exc) | |
| return None | |
| # --- NODES --- | |
| def chat_node(state): | |
| """Dedicated UI chat node for conversational queries.""" | |
| user_query = state.get("ticker", "") | |
| prompt = f""" | |
| You are the Senior Broker AI for PrimoGreedy. Your team member just asked you this question: | |
| "{user_query}" | |
| Answer them directly, professionally, and concisely. If they are asking about financial metrics or how you work, explain your quantitative Graham and Deep Value frameworks. | |
| """ | |
| try: | |
| response = invoke_with_fallback(prompt, run_name="chat_node") | |
| except Exception as exc: | |
| logger.error("Chat LLM error: %s", exc) | |
| response = "I am experiencing issues right now. Please try again." | |
| return {"final_report": response, "status": "CHAT"} | |
| def scout_node(state): | |
| """Scout node: manual ticker pass-through or screener+Brave discovery.""" | |
| region = state.get("region", "USA") | |
| retries = state.get("retry_count", 0) | |
| # Manual single-ticker lookup | |
| is_manual = bool(state.get("ticker") and state["ticker"] != "NONE" and retries == 0) | |
| if is_manual: | |
| return { | |
| "ticker": resolve_ticker_suffix(state["ticker"], region), | |
| "manual_search": True, | |
| } | |
| # Auto-scan: use screener + Brave trending | |
| if retries > 0: | |
| time.sleep(2) | |
| logger.info("Auto-scanning %s for micro-caps...", region) | |
| trending = get_trending_tickers_from_brave(region) | |
| screened = screen_microcaps(region=region, extra_tickers=trending, max_results=15) | |
| if not screened: | |
| return {"ticker": "NONE", "manual_search": False} | |
| seen = load_seen_tickers() | |
| fresh = [c for c in screened if c["ticker"] not in seen] | |
| if not fresh: | |
| return {"ticker": "NONE", "manual_search": False} | |
| ranked = rank_candidates(fresh, top_n=3) | |
| best = ranked[0] | |
| ticker = best["ticker"] | |
| mark_ticker_seen(ticker) | |
| logger.info("Auto-scan target: %s (score=%d)", ticker, best.get("score", 0)) | |
| return {"ticker": ticker, "manual_search": False} | |
| def _gatekeeper_route(state, update) -> str: | |
| """Decide where gatekeeper should route based on state + update.""" | |
| if state.get("manual_search"): | |
| return "analyst" | |
| if update.get("status") == "PASS": | |
| return "analyst" | |
| new_retries = update.get("retry_count", state.get("retry_count", 0)) | |
| if new_retries >= MAX_RETRIES: | |
| return "analyst" | |
| return "scout" | |
| def gatekeeper_node(state) -> Command[Literal["analyst", "scout"]]: | |
| """Validate candidate with financial health checks. Routes via Command.""" | |
| ticker = state.get("ticker", "NONE") | |
| retries = state.get("retry_count", 0) | |
| if ticker == "NONE": | |
| update = { | |
| "is_small_cap": False, | |
| "status": "FAIL", | |
| "retry_count": retries + 1, | |
| "financial_data": {"reason": "Scout found no readable ticker."}, | |
| } | |
| return Command(update=update, goto=_gatekeeper_route(state, update)) | |
| mark_ticker_seen(ticker) | |
| try: | |
| stock = yf.Ticker(ticker) | |
| raw_info = stock.info | |
| lean_info = { | |
| "currentPrice": raw_info.get("currentPrice", 0) or raw_info.get("regularMarketPrice", 0), | |
| "trailingEps": raw_info.get("trailingEps", 0), | |
| "bookValue": raw_info.get("bookValue", 0), | |
| "marketCap": raw_info.get("marketCap", 0), | |
| "ebitda": raw_info.get("ebitda", 0), | |
| "sector": raw_info.get("sector", "Unknown"), | |
| "freeCashflow": raw_info.get("freeCashflow", 0), | |
| "totalCash": raw_info.get("totalCash", 0), | |
| "currency": raw_info.get("currency", "USD"), | |
| } | |
| del raw_info | |
| gc.collect() | |
| price = normalize_price(lean_info["currentPrice"], ticker, lean_info["currency"]) | |
| lean_info["currentPrice"] = price | |
| mkt_cap = lean_info["marketCap"] | |
| chart_bytes = generate_chart(ticker) | |
| if price > MAX_PRICE_PER_SHARE: | |
| update = { | |
| "market_cap": mkt_cap, | |
| "is_small_cap": False, | |
| "status": "FAIL", | |
| "company_name": ticker, | |
| "financial_data": lean_info, | |
| "retry_count": retries + 1, | |
| "final_report": f"Price ${price:.2f} exceeds ${MAX_PRICE_PER_SHARE} limit.", | |
| "chart_data": chart_bytes, | |
| } | |
| return Command(update=update, goto=_gatekeeper_route(state, update)) | |
| if not (MIN_MARKET_CAP < mkt_cap < MAX_MARKET_CAP): | |
| update = { | |
| "market_cap": mkt_cap, | |
| "is_small_cap": False, | |
| "status": "FAIL", | |
| "company_name": ticker, | |
| "financial_data": lean_info, | |
| "retry_count": retries + 1, | |
| "final_report": f"Market Cap ${mkt_cap:,.0f} is outside the $10M-$300M range.", | |
| "chart_data": chart_bytes, | |
| } | |
| return Command(update=update, goto=_gatekeeper_route(state, update)) | |
| health = check_financial_health(ticker, lean_info) | |
| if health["status"] == "FAIL": | |
| update = { | |
| "market_cap": mkt_cap, | |
| "is_small_cap": False, | |
| "status": "FAIL", | |
| "company_name": ticker, | |
| "financial_data": lean_info, | |
| "retry_count": retries + 1, | |
| "final_report": f"**GATEKEEPER REJECT:** {health['reason']}", | |
| "chart_data": chart_bytes, | |
| } | |
| return Command(update=update, goto=_gatekeeper_route(state, update)) | |
| update = { | |
| "market_cap": mkt_cap, | |
| "is_small_cap": True, | |
| "status": "PASS", | |
| "company_name": stock.info.get("shortName", ticker), | |
| "financial_data": lean_info, | |
| "chart_data": chart_bytes, | |
| } | |
| return Command(update=update, goto="analyst") | |
| except Exception as exc: | |
| logger.error("Gatekeeper error for %s: %s", ticker, exc) | |
| update = { | |
| "is_small_cap": False, | |
| "status": "FAIL", | |
| "retry_count": retries + 1, | |
| "financial_data": {"reason": f"API Error: {exc}"}, | |
| } | |
| return Command(update=update, goto=_gatekeeper_route(state, update)) | |
| def analyst_node(state): | |
| """Senior Broker analysis with Graham Number + Finnhub + insider data.""" | |
| ticker = state["ticker"] | |
| info = state.get("financial_data", {}) | |
| chart_bytes = state.get("chart_data") | |
| region = state.get("region", "USA") | |
| if state.get("status") == "FAIL": | |
| reason = state.get("final_report", info.get("reason", "Failed basic criteria.")) | |
| verdict = ( | |
| f"### REJECTED BY GATEKEEPER\n" | |
| f"**Reason:** {reason}\n\n" | |
| f"*The data for {ticker} was retrieved, but it does not fit the PrimoGreedy small-cap profile.*" | |
| ) | |
| return {"final_verdict": verdict, "final_report": verdict, "chart_data": chart_bytes} | |
| price = info.get("currentPrice", 0) or 0 | |
| eps = info.get("trailingEps", 0) or 0 | |
| book_value = info.get("bookValue", 0) or 0 | |
| ebitda = info.get("ebitda", 0) or 0 | |
| sector = info.get("sector", "Unknown") | |
| currency = info.get("currency", "USD") | |
| # Normalize per-share metrics from pence → pounds for UK stocks | |
| # (price is already converted by gatekeeper's normalize_price call, | |
| # but eps/bookValue come raw from yFinance in GBp/GBX) | |
| if ticker.endswith(".L") or currency in ("GBp", "GBX"): | |
| eps = eps / 100 | |
| book_value = book_value / 100 | |
| if eps > 0 and book_value > 0: | |
| strategy = "GRAHAM VALUE" | |
| valuation = (22.5 * eps * book_value) ** 0.5 | |
| thesis = f"Profitable in {sector}. Graham Value ${valuation:.2f} vs Price ${price:.2f}. EBITDA: ${ebitda:,.0f}." | |
| else: | |
| strategy = "DEEP VALUE ASSET PLAY" | |
| ratio = price / book_value if book_value > 0 else 0 | |
| thesis = f"Unprofitable in {sector}. Trading at {ratio:.2f}x Book Value. EBITDA: ${ebitda:,.0f}." | |
| news = brave_search(f"{ticker} stock {sector} catalysts insider buying") | |
| # --- Build deep-fundamentals context --- | |
| deep_fundamentals = "" | |
| if region == "USA" and "." not in ticker: | |
| logger.info("Researching Finnhub for %s...", ticker) | |
| context = "" | |
| try: | |
| context += get_insider_sentiment.invoke({"ticker": ticker}) + "\n" | |
| context += get_company_news.invoke({"ticker": ticker}) + "\n" | |
| context += get_basic_financials.invoke({"ticker": ticker}) + "\n" | |
| except Exception as exc: | |
| logger.warning("Finnhub tool error for %s: %s", ticker, exc) | |
| insider = get_insider_buys(ticker) | |
| context += f"\nInsider Sentiment (6mo): {insider['sentiment']} | MSPR: {insider['mspr']} | Net Shares: {insider['change']}\n" | |
| deep_fundamentals = f"DEEP FUNDAMENTALS (FINNHUB + INSIDER FEED):\n{context}" | |
| else: | |
| deep_fundamentals = f"NEWS: {str(news)[:1500]}" | |
| # --- SEC EDGAR ground truth (US equities only) --- | |
| sec_context = "" | |
| if region == "USA" and "." not in ticker: | |
| from src.sec_edgar import get_sec_filings | |
| try: | |
| sec_context = get_sec_filings.invoke({"ticker": ticker}) | |
| except Exception as exc: | |
| logger.warning("SEC EDGAR failed for %s: %s", ticker, exc) | |
| # --- Debate or single-LLM path --- | |
| from src.agents.debate import is_debate_enabled, run_debate | |
| from src.models.kelly import get_kelly_stats, calculate_position_size | |
| debate_result = None | |
| if is_debate_enabled(): | |
| logger.info("Running multi-agent debate for %s...", ticker) | |
| try: | |
| debate_result = run_debate( | |
| ticker=ticker, | |
| company_name=state.get("company_name", ticker), | |
| financial_data_summary=str(info)[:2000], | |
| deep_fundamentals=deep_fundamentals, | |
| sec_context=sec_context, | |
| strategy=strategy, | |
| price=price, | |
| eps=eps, | |
| book_value=book_value, | |
| ebitda=ebitda, | |
| ) | |
| result = debate_result["_structured_result"] | |
| stats = get_kelly_stats() | |
| result.position_size = calculate_position_size(stats, result.verdict) | |
| result.kelly_win_rate = stats.win_rate | |
| result.kelly_total_trades = stats.total_trades | |
| verdict = result.to_report() | |
| record_paper_trade(ticker, price, verdict, source="Chainlit UI", | |
| structured_verdict=result.verdict, | |
| position_size=result.position_size) | |
| _run_id = get_current_run_id() | |
| log_online_feedback(verdict, ticker, run_id=_run_id) | |
| tag_for_review(verdict, ticker, run_id=_run_id) | |
| return { | |
| "final_verdict": verdict, "final_report": verdict, | |
| "chart_data": chart_bytes, "debate_used": True, | |
| "bull_case": debate_result.get("bull_case", ""), | |
| "bear_case": debate_result.get("bear_case", ""), | |
| } | |
| except Exception as exc: | |
| logger.warning("Debate failed for %s, falling back to single-LLM: %s", ticker, exc) | |
| # --- Single-LLM path (default or debate fallback) --- | |
| template = get_analyst_prompt() | |
| prompt = template.format( | |
| company_name=state.get("company_name", ticker), | |
| ticker=ticker, | |
| price=price, | |
| eps=eps, | |
| book_value=book_value, | |
| ebitda=ebitda, | |
| thesis=thesis, | |
| strategy=strategy, | |
| deep_fundamentals=deep_fundamentals, | |
| sec_context=sec_context, | |
| ) | |
| try: | |
| import warnings | |
| from src.models.verdict import InvestmentVerdict | |
| structured_llm = get_structured_llm().with_structured_output(InvestmentVerdict) | |
| result = structured_llm.invoke(prompt) | |
| stats = get_kelly_stats() | |
| result.position_size = calculate_position_size(stats, result.verdict) | |
| result.kelly_win_rate = stats.win_rate | |
| result.kelly_total_trades = stats.total_trades | |
| verdict = result.to_report() | |
| record_paper_trade(ticker, price, verdict, source="Chainlit UI", | |
| structured_verdict=result.verdict, | |
| position_size=result.position_size) | |
| _run_id = get_current_run_id() | |
| log_online_feedback(verdict, ticker, run_id=_run_id) | |
| tag_for_review(verdict, ticker, run_id=_run_id) | |
| except Exception as exc: | |
| logger.warning("Structured output failed for %s, falling back to plain LLM: %s", ticker, exc) | |
| try: | |
| verdict = invoke_with_fallback(prompt, run_name="analyst_node") | |
| stats = get_kelly_stats() | |
| v_upper = verdict.upper() | |
| verdict_type = "AVOID" | |
| if "STRONG BUY" in v_upper: | |
| verdict_type = "STRONG BUY" | |
| elif "BUY" in v_upper: | |
| verdict_type = "BUY" | |
| elif "WATCH" in v_upper: | |
| verdict_type = "WATCH" | |
| pos = calculate_position_size(stats, verdict_type) | |
| if pos > 0: | |
| verdict += ( | |
| f"\n\n### POSITION SIZING (Kelly Criterion)\n" | |
| f"**Recommended allocation: {pos:.1f}% of portfolio**" | |
| ) | |
| record_paper_trade(ticker, price, verdict, source="Chainlit UI", | |
| position_size=pos) | |
| _run_id = get_current_run_id() | |
| log_online_feedback(verdict, ticker, run_id=_run_id, is_fallback=True) | |
| tag_for_review(verdict, ticker, run_id=_run_id, is_fallback=True) | |
| except Exception as exc2: | |
| logger.error("LLM analysis failed for %s: %s", ticker, exc2) | |
| verdict = f"Strategy: {strategy}\nLLM analysis unavailable: {exc2}" | |
| return {"final_verdict": verdict, "final_report": verdict, "chart_data": chart_bytes} | |
| # --- GRAPH --- | |
| _api_retry = RetryPolicy(max_attempts=3, initial_interval=2.0) | |
| workflow = StateGraph(AgentState) | |
| workflow.add_node("chat", chat_node, retry=_api_retry) | |
| workflow.add_node("scout", scout_node, retry=_api_retry) | |
| workflow.add_node("gatekeeper", gatekeeper_node, retry=_api_retry) | |
| workflow.add_node("analyst", analyst_node, retry=_api_retry) | |
| def initial_routing(state): | |
| """Direct traffic: spaces -> chat, otherwise -> scout.""" | |
| query = str(state.get("ticker", "")) | |
| if " " in query: | |
| return "chat" | |
| return "scout" | |
| workflow.add_conditional_edges(START, initial_routing, ["chat", "scout"]) | |
| workflow.add_edge("chat", END) | |
| workflow.add_edge("scout", "gatekeeper") | |
| workflow.add_edge("analyst", END) | |
| app = workflow.compile(checkpointer=InMemorySaver()) | |