""" server.py - custom Spectrum 2 frontend for Gradio Server mode. This is the Off-Brand entry point: instead of the default Gradio component render, a gradio.Server instance serves the hand-built React UI (ui_kits/chan-compass/) and exposes the unchanged Python backend as JSON/SSE endpoints. The HF Space uses sdk: gradio and runs app.py, which calls app.launch(). Architecture: gradio.Server app /api/... -> JSON + Server-Sent-Events endpoints calling the backend / -> the React frontend (static), Spectrum 2 design system Backend modules (signal_runner, rotation, news_watch, research_agent, automation, emailer, finetune_data, llm_local) are imported and called with no business-logic changes. """ from __future__ import annotations import json import os from fastapi import Request from fastapi.responses import StreamingResponse, JSONResponse, FileResponse from fastapi.staticfiles import StaticFiles from gradio import Server import paths # sets up /data, sys.path import automation import signal_runner import rotation import news_watch import research_agent import emailer import finetune_data import llm_local HERE = os.path.dirname(os.path.abspath(__file__)) UI_DIR = os.path.join(HERE, "ui_kits", "chan-compass") app = Server(title="Chan Compass · US") # ───────────────────────── helpers ───────────────────────── def _sse(gen): """Wrap a text generator as Server-Sent Events (one 'data:' per chunk).""" def stream(): for chunk in gen: yield f"data: {json.dumps({'text': chunk})}\n\n" yield "event: done\ndata: {}\n\n" return StreamingResponse(stream(), media_type="text/event-stream") def _df_records(df): if df is None or not hasattr(df, "to_dict"): return [] return df.to_dict(orient="records") # ───────────────────────── Signals ───────────────────────── @app.get("/api/last-results") async def last_results(): return JSONResponse(automation.load_results()) @app.post("/api/signals/run") async def signals_run(req: Request): body = await req.json() tickers = [t.strip().upper() for t in (body.get("pool") or "").replace("\n", ",").split(",") if t.strip()] force = bool(body.get("force")) df, details, summary, errors = signal_runner.run_signals(tickers or None, force=force) automation.STATE["signals_df"] = df automation.STATE["signals_details"] = details automation.STATE["signals_summary"] = summary return JSONResponse({"rows": _df_records(df), "summary": summary, "tickers": sorted(details.keys())}) @app.get("/api/signals/raw") async def signals_raw(ticker: str): return JSONResponse({"raw": signal_runner.stock_raw_read(ticker)}) @app.get("/api/signals/summary") async def signals_summary(ticker: str): raw = signal_runner.stock_raw_read(ticker or "") if not raw: return _sse(iter(["Run the analysis and select a ticker first."])) chain = automation.STATE.get("signals_details", {}).get(ticker or "", "") chain_core = chain.split("日线买卖点逐项诊断")[0].strip()[:2000] if chain else "" prompt = ("You are an equity analyst. Write a SHORT plain-English summary " "(≤100 words) for a long-term holder of a US stock: the situation " "today, whether to act or wait, and the key price levels.\n" "Use the FACT LINE for the numbers, and the RULING CHAIN (a Chinese " "multi-timeframe Chan-theory decision log) for the reasoning — " "translate and synthesize it; output ENGLISH ONLY, no Chinese " "characters, do not quote the log, no disclaimers.\n\n" f"FACT LINE:\n{raw}") if chain_core: prompt += f"\n\nRULING CHAIN (translate & synthesize, don't quote):\n{chain_core}" def gen(): final = "" for acc in llm_local.chat_stream(prompt, max_tokens=240, temperature=0.2, worker="interpreter"): final = acc yield acc try: finetune_data.record(raw, final) except Exception: pass return _sse(gen()) # ───────────────────────── Sector Rotation ───────────────────────── @app.get("/api/rotation") async def rotation_tables(): d1, d5, d20, asof = rotation.build_rotation(force=True) automation.STATE["rotation"] = (d1, d5, d20, asof) return JSONResponse({ "asof": asof, "d1": _df_records(rotation.fmt_table(d1)), "d5": _df_records(rotation.fmt_table(d5)), "d20": _df_records(rotation.fmt_table(d20)), }) @app.get("/api/rotation/narrative") async def rotation_narrative(): rot = automation.STATE.get("rotation") if not rot or rot[0] is None: return _sse(iter(["Refresh the rotation tables first."])) d1, d5, d20, _ = rot brief = rotation.rotation_brief(d1, d5, d20) prompt = ("You are a US equity market strategist. Based only on the sector flow " "data below (SPDR ETF proxy: change% × dollar volume, plus RS vs SPY), " "write a crisp brief (<150 words): 1) where capital is rotating " "INTO/OUT OF; 2) do 1-day moves agree with the 5/20-day trend; 3) one " "watch item. No disclaimers.\n\nDATA:\n" + brief[:2200]) return _sse(llm_local.chat_stream(prompt, max_tokens=340, worker="narrator")) # ───────────────────────── Watchlist News ───────────────────────── @app.get("/api/news/holdings") async def news_holdings(): return JSONResponse({"holdings": news_watch.load_holdings()}) @app.post("/api/news/save") async def news_save(req: Request): body = await req.json() tickers = [t.strip().upper() for t in (body.get("holdings") or "").replace("\n", ",").split(",") if t.strip()] news_watch.save_holdings(tickers) return JSONResponse({"saved": tickers}) @app.get("/api/news/check") async def news_check(): return _sse(news_watch.check_holdings_news_stream()) # ───────────────────────── Auto Research ───────────────────────── @app.get("/api/research/run") async def research_run(ticker: str): def gen(): last_report = "" for progress, report in research_agent.run_research_stream(ticker): last_report = report yield json.dumps({"progress": progress, "report": report}) yield json.dumps({"progress": "__done__", "report": last_report, "reports": research_agent.list_reports()}) def stream(): for chunk in gen(): yield f"data: {chunk}\n\n" yield "event: done\ndata: {}\n\n" return StreamingResponse(stream(), media_type="text/event-stream") @app.get("/api/research/reports") async def research_reports(): return JSONResponse({"reports": research_agent.list_reports()}) @app.get("/api/research/report") async def research_report(name: str): return JSONResponse({"markdown": research_agent.read_report(name)}) # ───────────────────────── Automation ───────────────────────── @app.post("/api/automation/run") async def automation_run(): import threading if automation.STATE.get("running"): return JSONResponse({"message": "Pipeline already running — watch the log."}) threading.Thread(target=lambda: automation.run_pipeline(force=True), daemon=True).start() return JSONResponse({"message": "Pipeline started — the log updates live below."}) @app.get("/api/automation/state") async def automation_state(): return JSONResponse({ "log": automation.STATE.get("log", [])[-40:], "schedule": automation.schedule_info(), "traces": research_agent.list_traces(), "running": bool(automation.STATE.get("running")), }) @app.post("/api/automation/publish-traces") async def automation_publish(req: Request): body = await req.json() import trace_publish return JSONResponse({"status": trace_publish.publish_traces(body.get("repo", ""))}) # ───────────────────────── Model ───────────────────────── @app.get("/api/market/status") async def market_status(): import datetime as dt try: from zoneinfo import ZoneInfo now = dt.datetime.now(ZoneInfo("America/New_York")) except Exception: now = dt.datetime.utcnow() wd = now.weekday() # 0=Mon … 6=Sun minutes = now.hour * 60 + now.minute is_weekday = wd < 5 # regular session 9:30–16:00 ET is_open = is_weekday and (9*60+30) <= minutes < (16*60) if is_open: label, variant = "Market open", "positive" elif is_weekday and minutes < (9*60+30): label, variant = "Pre-market", "notice" elif is_weekday and minutes >= (16*60): label, variant = "After hours", "notice" else: label, variant = "Market closed · weekend", "neutral" return JSONResponse({"open": is_open, "label": label, "variant": variant}) @app.get("/api/model/list") async def model_list(): return JSONResponse({ "models": list(llm_local.MODEL_ZOO.keys()), "analyst": llm_local.WORKERS["analyst"]["model"], "analyst_ready": llm_local.WORKERS["analyst"]["llm"] is not None, }) @app.post("/api/model/load") async def model_load(req: Request): body = await req.json() name = body.get("model", "") if name not in llm_local.MODEL_ZOO: return JSONResponse({"status": "⚠️ Unknown model."}) import threading threading.Thread(target=lambda: llm_local.load_model(name, worker="analyst"), daemon=True).start() return JSONResponse({"status": f"⏳ Loading {name} onto the Analyst sub-agent…"}) @app.get("/api/model/status") async def model_status(): return JSONResponse({ "status": llm_local.status(), "workers": {k: {"model": w["model"], "ready": w["llm"] is not None, "stage": w["stage"]} for k, w in llm_local.WORKERS.items()}, }) _SELFTEST = {"running": False, "result": ""} @app.post("/api/model/test") async def model_test(): import threading if _SELFTEST["running"]: return JSONResponse({"started": True, "running": True}) def _run(): _SELFTEST["running"] = True _SELFTEST["result"] = "" try: _SELFTEST["result"] = llm_local.quick_test() except Exception as e: _SELFTEST["result"] = f"❌ {e}" _SELFTEST["running"] = False threading.Thread(target=_run, daemon=True).start() return JSONResponse({"started": True, "running": True}) @app.get("/api/model/test-status") async def model_test_status(): return JSONResponse({"running": _SELFTEST["running"], "result": _SELFTEST["result"]}) @app.get("/api/model/finetune-status") async def finetune_status(): return JSONResponse({"status": finetune_data.status_line(), "count": finetune_data.count()}) @app.post("/api/model/export-dataset") async def export_dataset(): path = finetune_data.export() if not path: return JSONResponse({"path": "", "count": 0, "download": ""}) # Copy into a WRITABLE served dir. In Docker, /app is read-only for the # non-root user, so use /data (persistent) when available, else the temp dir. import shutil import tempfile base = paths.DATA_ROOT if paths.PERSISTENT else tempfile.gettempdir() served_dir = os.path.join(base, "exports") try: os.makedirs(served_dir, exist_ok=True) except OSError: served_dir = tempfile.gettempdir() fname = os.path.basename(path) try: shutil.copy(path, os.path.join(served_dir, fname)) except OSError: pass return JSONResponse({"path": path, "count": finetune_data.count(), "download": "/download/" + fname}) @app.get("/download/{fname}") async def download_file(fname: str): import tempfile base = paths.DATA_ROOT if paths.PERSISTENT else tempfile.gettempdir() fname = os.path.basename(fname) for d in (os.path.join(base, "exports"), os.path.join(tempfile.gettempdir(), "exports"), tempfile.gettempdir()): served = os.path.join(d, fname) if os.path.exists(served): return FileResponse(served, filename=fname, media_type="application/jsonl") return JSONResponse({"error": "not found"}, status_code=404) # ───────────────────────── Email (all tabs) ───────────────────────── @app.post("/api/email") async def send_email(req: Request): body = await req.json() status = emailer.send_result(body.get("content", ""), body.get("to", ""), body.get("tag", "Chan Compass")) return JSONResponse({"status": status}) # ───────────────────────── startup ───────────────────────── def _boot(): try: automation._SCHED = automation.start_scheduler() except Exception: pass if os.environ.get("AUTO_LOAD_MODEL", "1") == "1": import threading threading.Thread(target=llm_local.auto_load_all, daemon=True).start() # Static frontend mounted LAST so /api/* and /download/* win. app.mount("/", StaticFiles(directory=UI_DIR, html=True), name="ui") @app.middleware("http") async def _no_cache_assets(request, call_next): resp = await call_next(request) p = request.url.path if p.endswith((".jsx", ".js", ".css", ".html")) or p == "/": resp.headers["Cache-Control"] = "no-cache, no-store, must-revalidate" resp.headers["Pragma"] = "no-cache" resp.headers["Expires"] = "0" return resp # Boot background services at import time (uvicorn serves `app` directly). _boot() if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "7860")))