Chan-Compass / server.py
ranranrunforit's picture
Upload 33 files
5c8eff4 verified
Raw
History Blame Contribute Delete
14.7 kB
"""
server.py - custom Spectrum 2 frontend for Gradio Server mode.
This is the Off-Brand entry point: instead of the default Gradio component
render, a gradio.Server instance serves the hand-built React UI
(ui_kits/chan-compass/) and exposes the unchanged Python backend as JSON/SSE
endpoints. The HF Space uses sdk: gradio and runs app.py, which calls
app.launch().
Architecture:
gradio.Server app
/api/... -> JSON + Server-Sent-Events endpoints calling the backend
/ -> the React frontend (static), Spectrum 2 design system
Backend modules (signal_runner, rotation, news_watch, research_agent,
automation, emailer, finetune_data, llm_local) are imported and called with no
business-logic changes.
"""
from __future__ import annotations
import json
import os
from fastapi import Request
from fastapi.responses import StreamingResponse, JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from gradio import Server
import paths # sets up /data, sys.path
import automation
import signal_runner
import rotation
import news_watch
import research_agent
import emailer
import finetune_data
import llm_local
HERE = os.path.dirname(os.path.abspath(__file__))
UI_DIR = os.path.join(HERE, "ui_kits", "chan-compass")
app = Server(title="Chan Compass · US")
# ───────────────────────── helpers ─────────────────────────
def _sse(gen):
"""Wrap a text generator as Server-Sent Events (one 'data:' per chunk)."""
def stream():
for chunk in gen:
yield f"data: {json.dumps({'text': chunk})}\n\n"
yield "event: done\ndata: {}\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
def _df_records(df):
if df is None or not hasattr(df, "to_dict"):
return []
return df.to_dict(orient="records")
# ───────────────────────── Signals ─────────────────────────
@app.get("/api/last-results")
async def last_results():
return JSONResponse(automation.load_results())
@app.post("/api/signals/run")
async def signals_run(req: Request):
body = await req.json()
tickers = [t.strip().upper() for t in
(body.get("pool") or "").replace("\n", ",").split(",") if t.strip()]
force = bool(body.get("force"))
df, details, summary, errors = signal_runner.run_signals(tickers or None, force=force)
automation.STATE["signals_df"] = df
automation.STATE["signals_details"] = details
automation.STATE["signals_summary"] = summary
return JSONResponse({"rows": _df_records(df), "summary": summary,
"tickers": sorted(details.keys())})
@app.get("/api/signals/raw")
async def signals_raw(ticker: str):
return JSONResponse({"raw": signal_runner.stock_raw_read(ticker)})
@app.get("/api/signals/summary")
async def signals_summary(ticker: str):
raw = signal_runner.stock_raw_read(ticker or "")
if not raw:
return _sse(iter(["Run the analysis and select a ticker first."]))
chain = automation.STATE.get("signals_details", {}).get(ticker or "", "")
chain_core = chain.split("日线买卖点逐项诊断")[0].strip()[:2000] if chain else ""
prompt = ("You are an equity analyst. Write a SHORT plain-English summary "
"(≤100 words) for a long-term holder of a US stock: the situation "
"today, whether to act or wait, and the key price levels.\n"
"Use the FACT LINE for the numbers, and the RULING CHAIN (a Chinese "
"multi-timeframe Chan-theory decision log) for the reasoning — "
"translate and synthesize it; output ENGLISH ONLY, no Chinese "
"characters, do not quote the log, no disclaimers.\n\n"
f"FACT LINE:\n{raw}")
if chain_core:
prompt += f"\n\nRULING CHAIN (translate & synthesize, don't quote):\n{chain_core}"
def gen():
final = ""
for acc in llm_local.chat_stream(prompt, max_tokens=240, temperature=0.2,
worker="interpreter"):
final = acc
yield acc
try:
finetune_data.record(raw, final)
except Exception:
pass
return _sse(gen())
# ───────────────────────── Sector Rotation ─────────────────────────
@app.get("/api/rotation")
async def rotation_tables():
d1, d5, d20, asof = rotation.build_rotation(force=True)
automation.STATE["rotation"] = (d1, d5, d20, asof)
return JSONResponse({
"asof": asof,
"d1": _df_records(rotation.fmt_table(d1)),
"d5": _df_records(rotation.fmt_table(d5)),
"d20": _df_records(rotation.fmt_table(d20)),
})
@app.get("/api/rotation/narrative")
async def rotation_narrative():
rot = automation.STATE.get("rotation")
if not rot or rot[0] is None:
return _sse(iter(["Refresh the rotation tables first."]))
d1, d5, d20, _ = rot
brief = rotation.rotation_brief(d1, d5, d20)
prompt = ("You are a US equity market strategist. Based only on the sector flow "
"data below (SPDR ETF proxy: change% × dollar volume, plus RS vs SPY), "
"write a crisp brief (<150 words): 1) where capital is rotating "
"INTO/OUT OF; 2) do 1-day moves agree with the 5/20-day trend; 3) one "
"watch item. No disclaimers.\n\nDATA:\n" + brief[:2200])
return _sse(llm_local.chat_stream(prompt, max_tokens=340, worker="narrator"))
# ───────────────────────── Watchlist News ─────────────────────────
@app.get("/api/news/holdings")
async def news_holdings():
return JSONResponse({"holdings": news_watch.load_holdings()})
@app.post("/api/news/save")
async def news_save(req: Request):
body = await req.json()
tickers = [t.strip().upper() for t in
(body.get("holdings") or "").replace("\n", ",").split(",") if t.strip()]
news_watch.save_holdings(tickers)
return JSONResponse({"saved": tickers})
@app.get("/api/news/check")
async def news_check():
return _sse(news_watch.check_holdings_news_stream())
# ───────────────────────── Auto Research ─────────────────────────
@app.get("/api/research/run")
async def research_run(ticker: str):
def gen():
last_report = ""
for progress, report in research_agent.run_research_stream(ticker):
last_report = report
yield json.dumps({"progress": progress, "report": report})
yield json.dumps({"progress": "__done__", "report": last_report,
"reports": research_agent.list_reports()})
def stream():
for chunk in gen():
yield f"data: {chunk}\n\n"
yield "event: done\ndata: {}\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
@app.get("/api/research/reports")
async def research_reports():
return JSONResponse({"reports": research_agent.list_reports()})
@app.get("/api/research/report")
async def research_report(name: str):
return JSONResponse({"markdown": research_agent.read_report(name)})
# ───────────────────────── Automation ─────────────────────────
@app.post("/api/automation/run")
async def automation_run():
import threading
if automation.STATE.get("running"):
return JSONResponse({"message": "Pipeline already running — watch the log."})
threading.Thread(target=lambda: automation.run_pipeline(force=True), daemon=True).start()
return JSONResponse({"message": "Pipeline started — the log updates live below."})
@app.get("/api/automation/state")
async def automation_state():
return JSONResponse({
"log": automation.STATE.get("log", [])[-40:],
"schedule": automation.schedule_info(),
"traces": research_agent.list_traces(),
"running": bool(automation.STATE.get("running")),
})
@app.post("/api/automation/publish-traces")
async def automation_publish(req: Request):
body = await req.json()
import trace_publish
return JSONResponse({"status": trace_publish.publish_traces(body.get("repo", ""))})
# ───────────────────────── Model ─────────────────────────
@app.get("/api/market/status")
async def market_status():
import datetime as dt
try:
from zoneinfo import ZoneInfo
now = dt.datetime.now(ZoneInfo("America/New_York"))
except Exception:
now = dt.datetime.utcnow()
wd = now.weekday() # 0=Mon … 6=Sun
minutes = now.hour * 60 + now.minute
is_weekday = wd < 5
# regular session 9:30–16:00 ET
is_open = is_weekday and (9*60+30) <= minutes < (16*60)
if is_open:
label, variant = "Market open", "positive"
elif is_weekday and minutes < (9*60+30):
label, variant = "Pre-market", "notice"
elif is_weekday and minutes >= (16*60):
label, variant = "After hours", "notice"
else:
label, variant = "Market closed · weekend", "neutral"
return JSONResponse({"open": is_open, "label": label, "variant": variant})
@app.get("/api/model/list")
async def model_list():
return JSONResponse({
"models": list(llm_local.MODEL_ZOO.keys()),
"analyst": llm_local.WORKERS["analyst"]["model"],
"analyst_ready": llm_local.WORKERS["analyst"]["llm"] is not None,
})
@app.post("/api/model/load")
async def model_load(req: Request):
body = await req.json()
name = body.get("model", "")
if name not in llm_local.MODEL_ZOO:
return JSONResponse({"status": "⚠️ Unknown model."})
import threading
threading.Thread(target=lambda: llm_local.load_model(name, worker="analyst"),
daemon=True).start()
return JSONResponse({"status": f"⏳ Loading {name} onto the Analyst sub-agent…"})
@app.get("/api/model/status")
async def model_status():
return JSONResponse({
"status": llm_local.status(),
"workers": {k: {"model": w["model"], "ready": w["llm"] is not None,
"stage": w["stage"]}
for k, w in llm_local.WORKERS.items()},
})
_SELFTEST = {"running": False, "result": ""}
@app.post("/api/model/test")
async def model_test():
import threading
if _SELFTEST["running"]:
return JSONResponse({"started": True, "running": True})
def _run():
_SELFTEST["running"] = True
_SELFTEST["result"] = ""
try:
_SELFTEST["result"] = llm_local.quick_test()
except Exception as e:
_SELFTEST["result"] = f"❌ {e}"
_SELFTEST["running"] = False
threading.Thread(target=_run, daemon=True).start()
return JSONResponse({"started": True, "running": True})
@app.get("/api/model/test-status")
async def model_test_status():
return JSONResponse({"running": _SELFTEST["running"], "result": _SELFTEST["result"]})
@app.get("/api/model/finetune-status")
async def finetune_status():
return JSONResponse({"status": finetune_data.status_line(),
"count": finetune_data.count()})
@app.post("/api/model/export-dataset")
async def export_dataset():
path = finetune_data.export()
if not path:
return JSONResponse({"path": "", "count": 0, "download": ""})
# Copy into a WRITABLE served dir. In Docker, /app is read-only for the
# non-root user, so use /data (persistent) when available, else the temp dir.
import shutil
import tempfile
base = paths.DATA_ROOT if paths.PERSISTENT else tempfile.gettempdir()
served_dir = os.path.join(base, "exports")
try:
os.makedirs(served_dir, exist_ok=True)
except OSError:
served_dir = tempfile.gettempdir()
fname = os.path.basename(path)
try:
shutil.copy(path, os.path.join(served_dir, fname))
except OSError:
pass
return JSONResponse({"path": path, "count": finetune_data.count(),
"download": "/download/" + fname})
@app.get("/download/{fname}")
async def download_file(fname: str):
import tempfile
base = paths.DATA_ROOT if paths.PERSISTENT else tempfile.gettempdir()
fname = os.path.basename(fname)
for d in (os.path.join(base, "exports"), os.path.join(tempfile.gettempdir(), "exports"),
tempfile.gettempdir()):
served = os.path.join(d, fname)
if os.path.exists(served):
return FileResponse(served, filename=fname, media_type="application/jsonl")
return JSONResponse({"error": "not found"}, status_code=404)
# ───────────────────────── Email (all tabs) ─────────────────────────
@app.post("/api/email")
async def send_email(req: Request):
body = await req.json()
status = emailer.send_result(body.get("content", ""), body.get("to", ""),
body.get("tag", "Chan Compass"))
return JSONResponse({"status": status})
# ───────────────────────── startup ─────────────────────────
def _boot():
try:
automation._SCHED = automation.start_scheduler()
except Exception:
pass
if os.environ.get("AUTO_LOAD_MODEL", "1") == "1":
import threading
threading.Thread(target=llm_local.auto_load_all, daemon=True).start()
# Static frontend mounted LAST so /api/* and /download/* win.
app.mount("/", StaticFiles(directory=UI_DIR, html=True), name="ui")
@app.middleware("http")
async def _no_cache_assets(request, call_next):
resp = await call_next(request)
p = request.url.path
if p.endswith((".jsx", ".js", ".css", ".html")) or p == "/":
resp.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
resp.headers["Pragma"] = "no-cache"
resp.headers["Expires"] = "0"
return resp
# Boot background services at import time (uvicorn serves `app` directly).
_boot()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0",
port=int(os.environ.get("PORT", "7860")))