| """ |
| server.py - custom Spectrum 2 frontend for Gradio Server mode. |
| |
| This is the Off-Brand entry point: instead of the default Gradio component |
| render, a gradio.Server instance serves the hand-built React UI |
| (ui_kits/chan-compass/) and exposes the unchanged Python backend as JSON/SSE |
| endpoints. The HF Space uses sdk: gradio and runs app.py, which calls |
| app.launch(). |
| |
| Architecture: |
| gradio.Server app |
| /api/... -> JSON + Server-Sent-Events endpoints calling the backend |
| / -> the React frontend (static), Spectrum 2 design system |
| |
| Backend modules (signal_runner, rotation, news_watch, research_agent, |
| automation, emailer, finetune_data, llm_local) are imported and called with no |
| business-logic changes. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import os |
|
|
| from fastapi import Request |
| from fastapi.responses import StreamingResponse, JSONResponse, FileResponse |
| from fastapi.staticfiles import StaticFiles |
| from gradio import Server |
|
|
| import paths |
| import automation |
| import signal_runner |
| import rotation |
| import news_watch |
| import research_agent |
| import emailer |
| import finetune_data |
| import llm_local |
|
|
# Absolute directory that contains this module.
HERE = os.path.dirname(os.path.abspath(__file__))
# Location of the hand-built frontend bundle, resolved relative to HERE.
UI_DIR = os.path.join(HERE, *("ui_kits", "chan-compass"))

# Application object on which every route in this module is registered.
app = Server(title="Chan Compass · US")
|
|
|
|
| |
def _sse(gen):
    """Return a ``text/event-stream`` response that relays *gen* item by item.

    Every item produced by *gen* is JSON-encoded as ``{"text": <item>}`` and
    sent as one ``data:`` frame. Once *gen* is exhausted, a closing
    ``event: done`` frame carrying an empty JSON object is emitted.
    """
    def frames():
        yield from (f"data: {json.dumps({'text': piece})}\n\n" for piece in gen)
        yield "event: done\ndata: {}\n\n"

    return StreamingResponse(frames(), media_type="text/event-stream")
|
|
|
|
def _df_records(df):
    """Convert a DataFrame-like object into a JSON-safe list of row dicts.

    Args:
        df: Any object exposing ``to_dict(orient="records")`` (duck-typed),
            or ``None``.

    Returns:
        ``[]`` when *df* is ``None`` or has no ``to_dict`` attribute;
        otherwise the records list, with every non-finite float (NaN, +inf,
        -inf) replaced by ``None``.
    """
    if df is None or not hasattr(df, "to_dict"):
        return []

    import math

    def _json_safe(value):
        # The records are handed to JSONResponse, which refuses NaN/inf
        # (it serialises with allow_nan=False), so map them to null instead
        # of letting the whole response fail.
        if isinstance(value, float) and not math.isfinite(value):
            return None
        return value

    return [
        {key: _json_safe(value) for key, value in row.items()}
        for row in df.to_dict(orient="records")
    ]
|
|
|
|
| |
@app.get("/api/last-results")
async def last_results():
    """Serve whatever ``automation.load_results()`` returns as a JSON body."""
    payload = automation.load_results()
    return JSONResponse(payload)
|
|
|
|
@app.post("/api/signals/run")
async def signals_run(req: Request):
    """Run the signal analysis for a ticker pool and return the result table.

    Request JSON object:
        pool:  Tickers, either as one comma/newline-separated string or as a
               JSON list of strings. Missing or empty means no explicit pool
               (``None`` is passed to ``signal_runner.run_signals``).
        force: Truthy to pass ``force=True`` to the backend.

    Returns:
        JSON with ``rows`` (table records), ``summary`` and ``tickers`` (the
        sorted keys of the per-ticker details mapping). A body that is not a
        JSON object yields a 400 response.

    Side effects:
        Stores the table, details and summary in ``automation.STATE`` under
        ``signals_df``, ``signals_details`` and ``signals_summary``.
    """
    body = await req.json()
    if not isinstance(body, dict):
        return JSONResponse({"error": "JSON object expected"}, status_code=400)

    pool = body.get("pool") or ""
    if isinstance(pool, (list, tuple)):
        # Accept a JSON array of tickers as well as the delimited string form.
        pool = ",".join(str(item) for item in pool)
    cleaned = (part.strip().upper()
               for part in str(pool).replace("\n", ",").split(","))
    tickers = [ticker for ticker in cleaned if ticker]
    force = bool(body.get("force"))

    # NOTE(review): this call is synchronous inside an async handler, so it
    # presumably blocks the event loop for its whole duration -- confirm
    # whether it should be moved to a worker thread.
    # NOTE(review): the fourth return value (errors) is not surfaced to the
    # client; its type is not visible here, so it is left untouched.
    df, details, summary, _errors = signal_runner.run_signals(tickers or None, force=force)

    # Normalise a missing details mapping so both the sorted() below and the
    # later lookup through STATE["signals_details"] cannot fail on None.
    details = details or {}
    automation.STATE["signals_df"] = df
    automation.STATE["signals_details"] = details
    automation.STATE["signals_summary"] = summary
    return JSONResponse({"rows": _df_records(df), "summary": summary,
                         "tickers": sorted(details.keys())})
|
|
|
|
@app.get("/api/signals/raw")
async def signals_raw(ticker: str):
    """Return ``signal_runner.stock_raw_read(ticker)`` under the ``raw`` key."""
    raw_read = signal_runner.stock_raw_read(ticker)
    return JSONResponse({"raw": raw_read})
|
|
|
|
@app.get("/api/signals/summary")
async def signals_summary(ticker: str):
    """Stream an LLM-written plain-English summary for *ticker* as SSE.

    The prompt is built from the raw read returned by
    ``signal_runner.stock_raw_read`` plus, when available, the first 2000
    characters of the stored details text that precede the
    "日线买卖点逐项诊断" marker. If no raw read exists, a single hint message
    is streamed instead.

    After the model stream is exhausted, the raw read and the last yielded
    text are passed to ``finetune_data.record``; a failure there is logged
    and never breaks the stream.
    """
    import logging

    raw = signal_runner.stock_raw_read(ticker or "")
    if not raw:
        return _sse(iter(["Run the analysis and select a ticker first."]))

    # "or {}" guards against a None having been stored under this key.
    details = automation.STATE.get("signals_details") or {}
    chain = details.get(ticker or "", "")
    chain_core = chain.split("日线买卖点逐项诊断")[0].strip()[:2000] if chain else ""
    prompt = ("You are an equity analyst. Write a SHORT plain-English summary "
              "(≤100 words) for a long-term holder of a US stock: the situation "
              "today, whether to act or wait, and the key price levels.\n"
              "Use the FACT LINE for the numbers, and the RULING CHAIN (a Chinese "
              "multi-timeframe Chan-theory decision log) for the reasoning — "
              "translate and synthesize it; output ENGLISH ONLY, no Chinese "
              "characters, do not quote the log, no disclaimers.\n\n"
              f"FACT LINE:\n{raw}")
    if chain_core:
        prompt += f"\n\nRULING CHAIN (translate & synthesize, don't quote):\n{chain_core}"

    def gen():
        # The last item yielded is kept as the final text (presumably
        # chat_stream yields the accumulated text so far -- confirm).
        final = ""
        for acc in llm_local.chat_stream(prompt, max_tokens=240, temperature=0.2,
                                         worker="interpreter"):
            final = acc
            yield acc
        try:
            finetune_data.record(raw, final)
        except Exception:
            # Recording is best-effort, but leave a trace instead of hiding it.
            logging.getLogger(__name__).warning(
                "finetune_data.record failed for %r", ticker, exc_info=True)

    return _sse(gen())
|
|
|
|
| |
@app.get("/api/rotation")
async def rotation_tables():
    """Rebuild the rotation tables (forced) and return them as JSON records.

    The four values returned by ``rotation.build_rotation(force=True)`` are
    cached as a tuple in ``automation.STATE["rotation"]``. Each of the three
    tables is passed through ``rotation.fmt_table`` before serialisation.
    """
    d1, d5, d20, asof = rotation.build_rotation(force=True)
    automation.STATE["rotation"] = (d1, d5, d20, asof)

    payload = {"asof": asof}
    for key, table in (("d1", d1), ("d5", d5), ("d20", d20)):
        payload[key] = _df_records(rotation.fmt_table(table))
    return JSONResponse(payload)
|
|
|
|
@app.get("/api/rotation/narrative")
async def rotation_narrative():
    """Stream an LLM brief about the cached rotation tables as SSE.

    Reads the tuple cached in ``automation.STATE["rotation"]``. When it is
    absent or its first element is ``None``, a single hint message is
    streamed instead. Only the first 2200 characters of
    ``rotation.rotation_brief`` are placed in the prompt.
    """
    cached = automation.STATE.get("rotation")
    nothing_cached = (not cached) or cached[0] is None
    if nothing_cached:
        return _sse(iter(["Refresh the rotation tables first."]))

    d1, d5, d20, _ = cached
    brief = rotation.rotation_brief(d1, d5, d20)
    instructions = (
        "You are a US equity market strategist. Based only on the sector flow "
        "data below (SPDR ETF proxy: change% × dollar volume, plus RS vs SPY), "
        "write a crisp brief (<150 words): 1) where capital is rotating "
        "INTO/OUT OF; 2) do 1-day moves agree with the 5/20-day trend; 3) one "
        "watch item. No disclaimers.\n\nDATA:\n"
    )
    prompt = instructions + brief[:2200]
    text_stream = llm_local.chat_stream(prompt, max_tokens=340, worker="narrator")
    return _sse(text_stream)
|
|
|
|
| |
@app.get("/api/news/holdings")
async def news_holdings():
    """Return ``news_watch.load_holdings()`` under the ``holdings`` key."""
    current = news_watch.load_holdings()
    return JSONResponse({"holdings": current})
|
|
|
|
@app.post("/api/news/save")
async def news_save(req: Request):
    """Persist the holdings list sent by the client.

    Request JSON object:
        holdings: Tickers, either as one comma/newline-separated string or
                  as a JSON list of strings. Missing or empty saves an empty
                  list.

    Returns:
        JSON ``{"saved": [...]}`` with the normalised (stripped, upper-cased)
        tickers that were passed to ``news_watch.save_holdings``. A body that
        is not a JSON object yields a 400 response and saves nothing.
    """
    body = await req.json()
    if not isinstance(body, dict):
        # Refuse rather than guess: saving [] here would wipe the holdings.
        return JSONResponse({"error": "JSON object expected"}, status_code=400)

    holdings = body.get("holdings") or ""
    if isinstance(holdings, (list, tuple)):
        # Accept a JSON array too (presumably the shape the holdings GET
        # endpoint returns -- verify against the frontend).
        holdings = ",".join(str(item) for item in holdings)
    cleaned = (part.strip().upper()
               for part in str(holdings).replace("\n", ",").split(","))
    tickers = [ticker for ticker in cleaned if ticker]

    news_watch.save_holdings(tickers)
    return JSONResponse({"saved": tickers})
|
|
|
|
@app.get("/api/news/check")
async def news_check():
    """Relay ``news_watch.check_holdings_news_stream()`` to the client as SSE."""
    updates = news_watch.check_holdings_news_stream()
    return _sse(updates)
|
|
|
|
| |
@app.get("/api/research/run")
async def research_run(ticker: str):
    """Stream research progress for *ticker* as Server-Sent Events.

    Each ``(progress, report)`` pair yielded by
    ``research_agent.run_research_stream`` becomes one ``data:`` frame holding
    ``{"progress": ..., "report": ...}``. A final data frame with progress
    ``"__done__"`` repeats the last report and adds the current report list;
    it is followed by the closing ``event: done`` frame.
    """
    def _frame(payload):
        return f"data: {json.dumps(payload)}\n\n"

    def frames():
        latest = ""
        for progress, report in research_agent.run_research_stream(ticker):
            latest = report
            yield _frame({"progress": progress, "report": report})
        yield _frame({"progress": "__done__", "report": latest,
                      "reports": research_agent.list_reports()})
        yield "event: done\ndata: {}\n\n"

    return StreamingResponse(frames(), media_type="text/event-stream")
|
|
|
|
@app.get("/api/research/reports")
async def research_reports():
    """Return ``research_agent.list_reports()`` under the ``reports`` key."""
    available = research_agent.list_reports()
    return JSONResponse({"reports": available})
|
|
|
|
@app.get("/api/research/report")
async def research_report(name: str):
    """Return the report called *name* as ``{"markdown": ...}``."""
    # NOTE(review): *name* comes straight from the query string and is passed
    # to read_report unchanged -- confirm that read_report rejects path
    # components such as "../".
    markdown = research_agent.read_report(name)
    return JSONResponse({"markdown": markdown})
|
|
|
|
| |
@app.post("/api/automation/run")
async def automation_run():
    """Kick off ``automation.run_pipeline(force=True)`` on a daemon thread.

    If ``automation.STATE`` already reports a run in progress, no thread is
    started and an "already running" message is returned instead.
    """
    import threading

    already_running = automation.STATE.get("running")
    if already_running:
        return JSONResponse({"message": "Pipeline already running — watch the log."})

    def _pipeline():
        automation.run_pipeline(force=True)

    worker = threading.Thread(target=_pipeline, daemon=True)
    worker.start()
    return JSONResponse({"message": "Pipeline started — the log updates live below."})
|
|
|
|
@app.get("/api/automation/state")
async def automation_state():
    """Report the automation status: log tail, schedule, traces, running flag.

    Only the last 40 entries of ``automation.STATE["log"]`` are returned.
    """
    state = automation.STATE
    log_tail = state.get("log", [])[-40:]
    snapshot = {
        "log": log_tail,
        "schedule": automation.schedule_info(),
        "traces": research_agent.list_traces(),
        "running": bool(state.get("running")),
    }
    return JSONResponse(snapshot)
|
|
|
|
@app.post("/api/automation/publish-traces")
async def automation_publish(req: Request):
    """Publish traces to the repo named in the request body.

    The ``repo`` field of the JSON body (default ``""``) is forwarded to
    ``trace_publish.publish_traces``; its return value is sent back as
    ``status``.
    """
    body = await req.json()
    # Imported here, as in the original, so the module is loaded on first use.
    import trace_publish

    repo = body.get("repo", "")
    outcome = trace_publish.publish_traces(repo)
    return JSONResponse({"status": outcome})
|
|
|
|
| |
def _eastern_now():
    """Return the current US Eastern wall-clock time.

    Uses the ``America/New_York`` zone when ``zoneinfo`` and its time-zone
    data are available. Otherwise it approximates Eastern time from UTC with
    the US daylight-saving rule (second Sunday of March to first Sunday of
    November), instead of treating raw UTC as if it were New York time.
    """
    import datetime as dt

    try:
        from zoneinfo import ZoneInfo
        return dt.datetime.now(ZoneInfo("America/New_York"))
    except Exception:
        # zoneinfo missing or no tz database installed: fall through.
        pass

    utc_now = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
    year = utc_now.year

    def first_sunday(month):
        # date.weekday(): Monday == 0 ... Sunday == 6.
        return 1 + (6 - dt.date(year, month, 1).weekday()) % 7

    dst_start = dt.datetime(year, 3, first_sunday(3) + 7, 7)  # 02:00 EST == 07:00 UTC
    dst_end = dt.datetime(year, 11, first_sunday(11), 6)      # 02:00 EDT == 06:00 UTC
    offset_hours = 4 if dst_start <= utc_now < dst_end else 5
    return utc_now - dt.timedelta(hours=offset_hours)


@app.get("/api/market/status")
async def market_status():
    """Classify the current New York time into a market-session label.

    Returns:
        JSON with ``open`` (True on weekdays from 09:30 up to, but excluding,
        16:00 Eastern), a display ``label`` and a UI ``variant``. Weekday
        times before 09:30 are labelled "Pre-market", weekday times from
        16:00 on "After hours", and Saturdays/Sundays "Market closed ·
        weekend". Exchange holidays are not taken into account.
    """
    now = _eastern_now()
    wd = now.weekday()
    minutes = now.hour * 60 + now.minute
    is_weekday = wd < 5
    open_at, close_at = 9 * 60 + 30, 16 * 60

    is_open = is_weekday and open_at <= minutes < close_at
    if is_open:
        label, variant = "Market open", "positive"
    elif is_weekday and minutes < open_at:
        label, variant = "Pre-market", "notice"
    elif is_weekday and minutes >= close_at:
        label, variant = "After hours", "notice"
    else:
        label, variant = "Market closed · weekend", "neutral"
    return JSONResponse({"open": is_open, "label": label, "variant": variant})
|
|
|
|
@app.get("/api/model/list")
async def model_list():
    """List the known model names and describe the ``analyst`` worker.

    ``analyst_ready`` is true when the analyst worker's ``llm`` entry is not
    ``None``.
    """
    model_names = [*llm_local.MODEL_ZOO.keys()]
    analyst = llm_local.WORKERS["analyst"]
    info = {
        "models": model_names,
        "analyst": analyst["model"],
        "analyst_ready": analyst["llm"] is not None,
    }
    return JSONResponse(info)
|
|
|
|
@app.post("/api/model/load")
async def model_load(req: Request):
    """Start loading the requested model onto the ``analyst`` worker.

    The ``model`` field of the JSON body must be a key of
    ``llm_local.MODEL_ZOO``; otherwise a warning status is returned and
    nothing is loaded. The load runs on a daemon thread, so the response is
    sent before it completes.
    """
    body = await req.json()
    name = body.get("model", "")
    known = name in llm_local.MODEL_ZOO
    if not known:
        return JSONResponse({"status": "⚠️ Unknown model."})

    import threading

    def _load():
        llm_local.load_model(name, worker="analyst")

    loader = threading.Thread(target=_load, daemon=True)
    loader.start()
    return JSONResponse({"status": f"⏳ Loading {name} onto the Analyst sub-agent…"})
|
|
|
|
@app.get("/api/model/status")
async def model_status():
    """Return ``llm_local.status()`` plus a per-worker summary.

    For every entry of ``llm_local.WORKERS`` the summary holds its ``model``,
    its ``stage``, and ``ready`` (whether its ``llm`` entry is not ``None``).
    """
    overall = llm_local.status()
    workers = {}
    for key, worker in llm_local.WORKERS.items():
        workers[key] = {
            "model": worker["model"],
            "ready": worker["llm"] is not None,
            "stage": worker["stage"],
        }
    return JSONResponse({"status": overall, "workers": workers})
|
|
|
|
# Shared state of the model self-test: whether one is in flight, and the text
# of the most recent result.
_SELFTEST = dict(running=False, result="")
|
|
|
|
@app.post("/api/model/test")
async def model_test():
    """Start ``llm_local.quick_test()`` on a daemon thread, at most one at a time.

    Returns ``{"started": True, "running": True}`` both when a new test is
    started and when one is already in flight. The outcome (or an error
    message prefixed with "❌") is written to ``_SELFTEST["result"]``.
    """
    import threading

    if _SELFTEST["running"]:
        return JSONResponse({"started": True, "running": True})

    # Claim the slot here, before the worker thread is scheduled; otherwise a
    # second request arriving before the thread runs would start a second test.
    _SELFTEST["running"] = True
    _SELFTEST["result"] = ""

    def _run():
        try:
            _SELFTEST["result"] = llm_local.quick_test()
        except Exception as e:
            _SELFTEST["result"] = f"❌ {e}"
        finally:
            # Always release the flag so the self-test can be run again.
            _SELFTEST["running"] = False

    try:
        threading.Thread(target=_run, daemon=True).start()
    except RuntimeError:
        # The thread could not be started; do not leave the flag stuck.
        _SELFTEST["running"] = False
        raise
    return JSONResponse({"started": True, "running": True})
|
|
|
|
@app.get("/api/model/test-status")
async def model_test_status():
    """Expose the ``running`` flag and last ``result`` of the model self-test."""
    snapshot = {field: _SELFTEST[field] for field in ("running", "result")}
    return JSONResponse(snapshot)
|
|
|
|
@app.get("/api/model/finetune-status")
async def finetune_status():
    """Return ``finetune_data.status_line()`` and ``finetune_data.count()``."""
    status_text = finetune_data.status_line()
    sample_count = finetune_data.count()
    return JSONResponse({"status": status_text, "count": sample_count})
|
|
|
|
@app.post("/api/model/export-dataset")
async def export_dataset():
    """Export the fine-tune dataset and stage a copy for download.

    Returns:
        JSON with ``path`` (the path returned by ``finetune_data.export()``),
        ``count`` and ``download``. ``download`` is a ``/download/<name>``
        link when the file is present in one of the directories the download
        route searches, and ``""`` otherwise, so the client is never handed a
        link that can only 404. When the export returns no path, all three
        fields are empty/zero.
    """
    path = finetune_data.export()
    if not path:
        return JSONResponse({"path": "", "count": 0, "download": ""})

    import logging
    import shutil
    import tempfile

    tmp_dir = tempfile.gettempdir()
    base = paths.DATA_ROOT if paths.PERSISTENT else tmp_dir
    served_dir = os.path.join(base, "exports")
    try:
        os.makedirs(served_dir, exist_ok=True)
    except OSError:
        # Cannot create the exports folder: fall back to the temp directory.
        served_dir = tmp_dir

    fname = os.path.basename(path)
    try:
        shutil.copy(path, os.path.join(served_dir, fname))
    except shutil.SameFileError:
        # The export already lives in the served directory; nothing to copy.
        pass
    except OSError:
        logging.getLogger(__name__).warning(
            "could not stage export %r for download", path, exc_info=True)

    # Same lookup order as the /download/{fname} route.
    search_dirs = (os.path.join(base, "exports"),
                   os.path.join(tmp_dir, "exports"),
                   tmp_dir)
    available = any(os.path.isfile(os.path.join(d, fname)) for d in search_dirs)
    return JSONResponse({"path": path, "count": finetune_data.count(),
                         "download": "/download/" + fname if available else ""})
|
|
|
|
@app.get("/download/{fname}")
async def download_file(fname: str):
    """Serve a previously exported file by name.

    Only the final path component of *fname* is used. The file is looked up,
    in order, in ``<base>/exports``, ``<tmp>/exports`` and ``<tmp>``, where
    ``base`` is ``paths.DATA_ROOT`` when ``paths.PERSISTENT`` is truthy and
    the temp directory otherwise.

    Returns:
        The first regular file found, or a 404 JSON error. Directories and
        empty names never match.
    """
    import tempfile

    tmp_dir = tempfile.gettempdir()
    base = paths.DATA_ROOT if paths.PERSISTENT else tmp_dir
    fname = os.path.basename(fname)
    if not fname:
        return JSONResponse({"error": "not found"}, status_code=404)

    # NOTE(review): the last entry makes every regular file directly inside
    # the temp directory downloadable by name -- confirm this is intended.
    search_dirs = (os.path.join(base, "exports"),
                   os.path.join(tmp_dir, "exports"),
                   tmp_dir)
    for directory in search_dirs:
        served = os.path.join(directory, fname)
        # isfile, not exists: a directory such as "exports" or ".." must not
        # be handed to FileResponse, which fails on non-files.
        if os.path.isfile(served):
            return FileResponse(served, filename=fname, media_type="application/jsonl")
    return JSONResponse({"error": "not found"}, status_code=404)
|
|
|
|
| |
@app.post("/api/email")
async def send_email(req: Request):
    """Forward an e-mail request to ``emailer.send_result``.

    Reads ``content`` and ``to`` (both default ``""``) and ``tag`` (default
    ``"Chan Compass"``) from the JSON body; the emailer's return value is
    sent back as ``status``.
    """
    body = await req.json()
    content = body.get("content", "")
    recipient = body.get("to", "")
    tag = body.get("tag", "Chan Compass")
    outcome = emailer.send_result(content, recipient, tag)
    return JSONResponse({"status": outcome})
|
|
|
|
| |
def _boot():
    """Start the background services needed by the server.

    Starts the automation scheduler and stores it on ``automation._SCHED``.
    A failure there is logged and otherwise ignored, so the server still
    comes up. Unless the ``AUTO_LOAD_MODEL`` environment variable is set to
    something other than ``"1"`` (it defaults to ``"1"``),
    ``llm_local.auto_load_all`` is launched on a daemon thread.
    """
    import logging

    try:
        automation._SCHED = automation.start_scheduler()
    except Exception:
        # Best-effort: keep serving without the scheduler, but leave a trace
        # so a missing schedule can be diagnosed.
        logging.getLogger(__name__).warning(
            "automation scheduler failed to start", exc_info=True)

    if os.environ.get("AUTO_LOAD_MODEL", "1") == "1":
        import threading
        threading.Thread(target=llm_local.auto_load_all, daemon=True).start()
|
|
|
|
| |
# Serve the frontend bundle from UI_DIR at the site root; html=True lets the
# static handler answer directory requests with their index.html.
app.mount(
    "/",
    StaticFiles(directory=UI_DIR, html=True),
    name="ui",
)
|
|
|
|
@app.middleware("http")
async def _no_cache_assets(request, call_next):
    """Mark frontend assets and the root page as non-cacheable.

    Responses for ``/`` and for paths ending in ``.jsx``, ``.js``, ``.css``
    or ``.html`` get ``Cache-Control``, ``Pragma`` and ``Expires`` headers
    that disable caching; all other responses pass through untouched.
    """
    response = await call_next(request)
    path = request.url.path
    is_asset = path.endswith((".jsx", ".js", ".css", ".html")) or path == "/"
    if is_asset:
        no_cache_headers = (
            ("Cache-Control", "no-cache, no-store, must-revalidate"),
            ("Pragma", "no-cache"),
            ("Expires", "0"),
        )
        for header, value in no_cache_headers:
            response.headers[header] = value
    return response
|
|
| |
# Runs at import time: starts the scheduler and, optionally, model loading.
_boot()


if __name__ == "__main__":
    import uvicorn

    # Listen on all interfaces; the PORT environment variable overrides 7860.
    uvicorn.run(
        app,
        port=int(os.environ.get("PORT", "7860")),
        host="0.0.0.0",
    )
|
|