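# NOTE: page chrome captured together with this file (uploader avatar caption
# "linvest21's picture", the "download" / "raw" links and the size "47.9 kB").
# It is not part of the module source.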
from __future__ import annotations
import hashlib
import json
from pathlib import Path
import shutil
import textwrap
from typing import Any
import zipfile
from n21.config import write_json
from n21.settings import IMPLEMENTATION_PRODUCTS_ROOT, REPO_ROOT, SHFT_WORKSPACE_ROOT
from observability.audit_log import utc_now
# Hugging Face id used as the default `base_model_id` of `export_release`.
DEFAULT_BASE_MODEL = "meta-llama/Meta-Llama-3-8B"
# Run id used as the default `source_run_id` of `export_release`.
DEFAULT_SOURCE_RUN_ID = "run_step19_repair_train_001"
# Role suffixes recognised by `infer_release_identity` when parsing a release id.
# `client_portfolio_manager` also ends with `_portfolio_manager`, so matching
# has to prefer the longer name.
ROLE_NAMES = [
    "chief_investment_officer",
    "client_portfolio_manager",
    "performance_manager",
    "portfolio_manager",
    "researcher",
    "risk_manager",
]
def product_dir_for(release_id: str) -> Path:
    """Return the bundle directory for ``release_id`` under the products root."""
    products_root = IMPLEMENTATION_PRODUCTS_ROOT
    return products_root / release_id
def product_zip_for(release_id: str) -> Path:
    """Return the path of the ``<release_id>.zip`` archive under the products root."""
    archive_name = f"{release_id}.zip"
    return IMPLEMENTATION_PRODUCTS_ROOT / archive_name
def find_release_dir(release_id: str) -> tuple[Path, str]:
    """Return the release directory together with its storage-location label.

    The label is always ``"implementation_products"``; the directory is not
    checked for existence here.
    """
    return product_dir_for(release_id), "implementation_products"
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at ``path``.

    The file is consumed in 1 MiB pieces so that large files are never held
    in memory in full.
    """
    piece_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            piece = stream.read(piece_size)
            if piece == b"":
                break
            hasher.update(piece)
    return hasher.hexdigest()
def copy_tree(src: Path, dst: Path) -> None:
    """Replace ``dst`` with a fresh recursive copy of ``src``.

    Any existing ``dst`` tree is removed first.

    Raises:
        FileNotFoundError: if ``src`` does not exist.
    """
    if src.exists():
        if dst.exists():
            shutil.rmtree(dst)
        shutil.copytree(src, dst)
        return
    raise FileNotFoundError(f"required release source does not exist: {src}")
def copy_if_exists(src: Path, dst: Path) -> bool:
    """Copy ``src`` to ``dst`` when ``src`` exists and report whether it did.

    Missing parent directories of ``dst`` are created. Returns ``True`` when
    the copy happened and ``False`` when ``src`` was absent.
    """
    found = src.exists()
    if found:
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dst)
    return found
def collect_hashes(root: Path) -> dict[str, str]:
    """Map every file below ``root`` to its SHA-256 hex digest.

    Keys are paths relative to ``root`` with backslashes replaced by forward
    slashes; entries are inserted in sorted path order.
    """
    regular_files = (entry for entry in sorted(root.rglob("*")) if entry.is_file())
    return {
        str(entry.relative_to(root)).replace("\\", "/"): sha256_file(entry)
        for entry in regular_files
    }
def write_text(path: Path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(textwrap.dedent(content).lstrip(), encoding="utf-8", newline="\n")
def _strip_version_suffix(stem: str) -> str:
    """Drop a trailing ``_v<digits>_<digits>`` version tag (any case) from ``stem``."""
    pieces = stem.rsplit("_", 2)
    if len(pieces) != 3:
        return stem
    head, major, build = pieces
    major_ok = major[:1].lower() == "v" and major[1:].isascii() and major[1:].isdigit()
    build_ok = build.isascii() and build.isdigit()
    return head if major_ok and build_ok else stem


def infer_release_identity(
    release_id: str,
    *,
    model_id: str | None = None,
    asset_class: str | None = None,
    role: str | None = None,
    role_names: list[str] | None = None,
) -> dict[str, str | None]:
    """Derive the runtime identity (model id, asset class, role) of a release.

    Explicit ``asset_class`` / ``role`` arguments always win. Whatever is
    missing is parsed from ids shaped like
    ``linvest21_fingpt_<asset_class>_<role>[_v<major>_<build>]``.

    Args:
        release_id: Release identifier to parse.
        model_id: Display model id; falls back to ``release_id``.
        asset_class: Explicit asset class, if already known.
        role: Explicit role, if already known.
        role_names: Role suffixes to recognise; defaults to ``ROLE_NAMES``.

    Returns:
        A dict with keys ``model_id``, ``asset_class`` and ``role``; the last
        two are ``None`` when they were neither supplied nor parseable.
    """
    parsed_asset = asset_class
    parsed_role = role
    prefix = "linvest21_fingpt_"
    if (not parsed_asset or not parsed_role) and release_id.lower().startswith(prefix):
        # Any ``_v<major>_<build>`` tag is removed, not just a fixed list of versions.
        stem = _strip_version_suffix(release_id[len(prefix):])
        candidates = ROLE_NAMES if role_names is None else role_names
        # Longest role first so ``client_portfolio_manager`` is never shadowed by
        # ``portfolio_manager``, whatever order the list is declared in.
        for candidate_role in sorted(candidates, key=len, reverse=True):
            marker = f"_{candidate_role}"
            # Case-insensitive, matching the prefix and version checks.
            if stem[-len(marker):].lower() == marker.lower():
                parsed_asset = parsed_asset or stem[: -len(marker)]
                parsed_role = parsed_role or candidate_role
                break
    return {
        "model_id": model_id or release_id,
        "asset_class": parsed_asset,
        "role": parsed_role,
    }
def role_title(value: str | None) -> str:
    """Return a title-cased label for a snake_case role name.

    ``None`` or an empty string yields ``"Financial Analysis Assistant"``.
    """
    return value.replace("_", " ").title() if value else "Financial Analysis Assistant"
def write_runtime_files(release_dir: Path, *, release_id: str, model_id: str, asset_class: str | None, role: str | None, base_model_id: str) -> None:
    """Generate the portable runtime and tooling files of a release bundle.

    Everything is written below ``release_dir``:

    - ``runtime/chat_config.json``: generation defaults, system prompt and API
      settings shared by the chat console and the API server.
    - ``runtime/requirements_*.txt`` and ``runtime/install_*.bat``: dependency
      manifests and their Windows installers.
    - ``runtime/chat_console.py`` and ``runtime/run_chat_*.bat``: interactive
      console that loads the base model plus the LoRA adapter.
    - ``runtime/serve_api.py``, ``runtime/run_api_*.bat``,
      ``runtime/self_test_api_contract.py`` and ``runtime/run_api_self_test.bat``:
      bearer-token JSON API and a static contract check.
    - ``tools/merge_hf_model.py``, ``tools/run_merge_hf_model.bat`` and
      ``tools/EXPORT_MODES.md``: adapter merge tooling and export-mode notes.

    Args:
        release_dir: Root directory of the release bundle.
        release_id: Release identifier stored in ``chat_config.json``.
        model_id: Runtime model id; also used as display name and in the prompt.
        asset_class: Asset class for the prompt; ``None`` becomes "financial".
        role: Role name; rendered through ``role_title`` for the prompt.
        base_model_id: Hugging Face id of the base model the adapter applies to.

    NOTE(review): in this view the embedded script literals carry no
    indentation. Confirm the real literals are indented, otherwise the
    generated ``.py`` files will not be valid Python.
    """
    runtime = release_dir / "runtime"
    role_label = role_title(role)
    domain = asset_class.replace("_", " ") if asset_class else "financial"
    # Shared runtime configuration. "adapter_dir" is relative to the config
    # file; the generated scripts resolve it against the config's directory.
    write_json(
        runtime / "chat_config.json",
        {
            "release_id": release_id,
            "model_id": model_id,
            "display_name": model_id,
            "asset_class": asset_class,
            "role": role,
            "base_model": base_model_id,
            "adapter_dir": "../model/adapter",
            "max_new_tokens": 120,
            "temperature": 0.2,
            "top_p": 0.9,
            "system_prompt": (
                f"You are {model_id}, the Linvest21_FinGPT {domain} {role_label} super-agent. "
                "Answer in that specialist role. Give concise, factual, numerate answers, separate facts from inference, "
                "state uncertainty when data is incomplete, and avoid personalized investment advice."
            ),
            "api": {
                "host": "127.0.0.1",
                "port": 8765,
                "token_env_var": "LINVEST21_API_TOKEN",
                "require_token": True,
            },
        },
    )
    # Unpinned CPU dependency list.
    write_text(
        runtime / "requirements_cpu.txt",
        """
torch
transformers
peft
accelerate
safetensors
huggingface_hub
""",
    )
    # Same packages, with torch taken from the PyTorch CUDA 12.1 wheel index
    # and PyPI added as an extra index for the rest.
    write_text(
        runtime / "requirements_gpu_cuda121.txt",
        """
--index-url https://download.pytorch.org/whl/cu121
torch
--extra-index-url https://pypi.org/simple
transformers
peft
accelerate
safetensors
huggingface_hub
""",
    )
    # Interactive console: CLI flags override config values, the script exits
    # when adapter_model.safetensors is missing, and /clear and /exit are
    # handled inside the input loop.
    write_text(
        runtime / "chat_console.py",
        r'''
from __future__ import annotations
import argparse
import json
from pathlib import Path
import sys
import time
def status(message: str) -> None:
print(f"[Linvest21_FinGPT {time.strftime('%H:%M:%S')}] {message}", flush=True)
def load_config(path: Path) -> dict:
with path.open("r", encoding="utf-8") as handle:
config = json.load(handle)
base = path.parent
config["adapter_dir"] = str((base / config["adapter_dir"]).resolve())
return config
def build_prompt(tokenizer, messages: list[dict[str, str]]) -> str:
if getattr(tokenizer, "chat_template", None):
return tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
lines = [f"{item['role'].capitalize()}: {item['content']}" for item in messages]
lines.append("Assistant:")
return "\n".join(lines)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Portable Linvest21_FinGPT chat console")
parser.add_argument("--config", default=str(Path(__file__).with_name("chat_config.json")))
parser.add_argument("--base-model")
parser.add_argument("--adapter-dir")
parser.add_argument("--max-new-tokens", type=int)
parser.add_argument("--temperature", type=float)
parser.add_argument("--top-p", type=float)
args = parser.parse_args(argv)
config = load_config(Path(args.config).resolve())
for key in ["base_model", "adapter_dir", "max_new_tokens", "temperature", "top_p"]:
value = getattr(args, key, None)
if value is not None:
config[key] = value
model_id = config.get("model_id", config.get("release_id", "linvest21_fingpt"))
try:
import torch
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
except ImportError as exc:
raise SystemExit(f"Missing dependency: {exc.name}. Run install_cpu.bat or install_gpu_cuda121.bat first.") from exc
adapter_dir = Path(config["adapter_dir"]).resolve()
if not (adapter_dir / "adapter_model.safetensors").exists():
raise SystemExit(f"Adapter weights not found: {adapter_dir}")
device = "cuda" if torch.cuda.is_available() else "cpu"
dtype = torch.float16 if device == "cuda" else torch.float32
status(f"model_id={model_id}")
status(f"base_model={config['base_model']}")
status(f"adapter_dir={adapter_dir}")
status(f"device={device}")
if device == "cpu":
status("CPU inference can be slow for Llama-3-8B.")
tokenizer = AutoTokenizer.from_pretrained(config["base_model"], use_fast=True)
if tokenizer.pad_token_id is None:
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(
config["base_model"],
torch_dtype=dtype,
low_cpu_mem_usage=True,
)
model = PeftModel.from_pretrained(model, str(adapter_dir))
model.eval()
model.to(device)
model.generation_config.max_length = None
messages = [{"role": "system", "content": config["system_prompt"]}]
status("ready. Type /clear to reset and /exit to quit.")
while True:
try:
user_text = input("\nlinvest21> ").strip()
except (KeyboardInterrupt, EOFError):
print()
return 0
if not user_text:
continue
if user_text.lower() in {"/exit", "exit", "quit", "/quit"}:
return 0
if user_text.lower() == "/clear":
messages = [{"role": "system", "content": config["system_prompt"]}]
status("context cleared")
continue
messages.append({"role": "user", "content": user_text})
inputs = tokenizer(build_prompt(tokenizer, messages), return_tensors="pt").to(device)
input_len = inputs["input_ids"].shape[-1]
status("generating")
with torch.no_grad():
output = model.generate(
**inputs,
max_new_tokens=int(config["max_new_tokens"]),
do_sample=float(config["temperature"]) > 0,
temperature=float(config["temperature"]),
top_p=float(config["top_p"]),
pad_token_id=tokenizer.eos_token_id,
eos_token_id=tokenizer.eos_token_id,
)
answer = tokenizer.decode(output[0][input_len:], skip_special_tokens=True).strip() or "[empty response]"
print(f"\n{model_id}: {answer}")
messages.append({"role": "assistant", "content": answer})
if __name__ == "__main__":
raise SystemExit(main())
''',
    )
    # Installers: use %USERPROFILE%\miniconda3\python.exe when it exists,
    # otherwise whatever "python" resolves to.
    write_text(
        runtime / "install_cpu.bat",
        r"""
@echo off
setlocal
set "PYTHON_EXE=python"
if exist "%USERPROFILE%\miniconda3\python.exe" set "PYTHON_EXE=%USERPROFILE%\miniconda3\python.exe"
"%PYTHON_EXE%" -m pip install -r "%~dp0requirements_cpu.txt"
endlocal
""",
    )
    write_text(
        runtime / "install_gpu_cuda121.bat",
        r"""
@echo off
setlocal
set "PYTHON_EXE=python"
if exist "%USERPROFILE%\miniconda3\python.exe" set "PYTHON_EXE=%USERPROFILE%\miniconda3\python.exe"
"%PYTHON_EXE%" -m pip install -r "%~dp0requirements_gpu_cuda121.txt"
endlocal
""",
    )
    # NOTE(review): in cmd.exe `set "CUDA_VISIBLE_DEVICES="` appears to remove
    # the variable rather than set it to an empty value, so the *_cpu launchers
    # may not actually hide GPUs from torch; `-1` might be needed. Confirm.
    write_text(
        runtime / "run_chat_cpu.bat",
        r"""
@echo off
setlocal
set "PYTHON_EXE=python"
if exist "%USERPROFILE%\miniconda3\python.exe" set "PYTHON_EXE=%USERPROFILE%\miniconda3\python.exe"
set "CUDA_VISIBLE_DEVICES="
cd /d "%~dp0"
"%PYTHON_EXE%" chat_console.py --config "%~dp0chat_config.json"
endlocal
""",
    )
    write_text(
        runtime / "run_chat_gpu.bat",
        r"""
@echo off
setlocal
set "PYTHON_EXE=python"
if exist "%USERPROFILE%\miniconda3\python.exe" set "PYTHON_EXE=%USERPROFILE%\miniconda3\python.exe"
cd /d "%~dp0"
"%PYTHON_EXE%" chat_console.py --config "%~dp0chat_config.json"
endlocal
""",
    )
    # API launchers exit with code 2 when LINVEST21_API_TOKEN is empty.
    write_text(
        runtime / "run_api_cpu.bat",
        r"""
@echo off
setlocal
set "PYTHON_EXE=python"
if exist "%USERPROFILE%\miniconda3\python.exe" set "PYTHON_EXE=%USERPROFILE%\miniconda3\python.exe"
set "CUDA_VISIBLE_DEVICES="
if "%LINVEST21_API_TOKEN%"=="" (
echo [SHFT API] LINVEST21_API_TOKEN is not set. Set it in this shell before starting the API.
exit /b 2
)
cd /d "%~dp0"
"%PYTHON_EXE%" serve_api.py --config "%~dp0chat_config.json"
endlocal
""",
    )
    write_text(
        runtime / "run_api_gpu.bat",
        r"""
@echo off
setlocal
set "PYTHON_EXE=python"
if exist "%USERPROFILE%\miniconda3\python.exe" set "PYTHON_EXE=%USERPROFILE%\miniconda3\python.exe"
if "%LINVEST21_API_TOKEN%"=="" (
echo [SHFT API] LINVEST21_API_TOKEN is not set. Set it in this shell before starting the API.
exit /b 2
)
cd /d "%~dp0"
"%PYTHON_EXE%" serve_api.py --config "%~dp0chat_config.json"
endlocal
""",
    )
    write_text(
        runtime / "run_api_self_test.bat",
        r"""
@echo off
setlocal
set "PYTHON_EXE=python"
if exist "%USERPROFILE%\miniconda3\python.exe" set "PYTHON_EXE=%USERPROFILE%\miniconda3\python.exe"
cd /d "%~dp0"
"%PYTHON_EXE%" self_test_api_contract.py --config "%~dp0chat_config.json"
endlocal
""",
    )
    # JSON API built on http.server: GET /health is unauthenticated, while
    # GET /v1/models, POST /v1/chat/completions and POST /v1/generate check a
    # bearer token read from the environment variable named in the config.
    # NOTE(review): the token is compared with `==`; hmac.compare_digest would
    # avoid a timing side channel.
    # NOTE(review): unlike chat_console.py, this script serves the bare base
    # model when adapter_model.safetensors is missing instead of failing.
    # NOTE(review): every exception in do_POST, including generation failures,
    # is reported as HTTP 400.
    write_text(
        runtime / "serve_api.py",
        r'''
from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
from http.server import BaseHTTPRequestHandler, HTTPServer
import time
import uuid
def load_config(path: Path) -> dict:
with path.open("r", encoding="utf-8") as handle:
config = json.load(handle)
base = path.parent
config["adapter_dir"] = str((base / config["adapter_dir"]).resolve())
return config
def build_prompt(tokenizer, messages: list[dict[str, str]]) -> str:
if getattr(tokenizer, "chat_template", None):
return tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
lines = [f"{item['role'].capitalize()}: {item['content']}" for item in messages]
lines.append("Assistant:")
return "\n".join(lines)
class Linvest21Model:
def __init__(self, config: dict):
self.config = config
self.base_model_id = config["base_model"]
self.model_id = config.get("model_id", config.get("release_id", "linvest21_fingpt"))
self.adapter_dir = Path(config["adapter_dir"]).resolve()
self.device = "cpu"
self.tokenizer = None
self.model = None
def load(self) -> None:
try:
import torch
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
except ImportError as exc:
raise SystemExit(f"Missing dependency: {exc.name}. Run install_cpu.bat or install_gpu_cuda121.bat first.") from exc
self.device = "cuda" if torch.cuda.is_available() else "cpu"
dtype = torch.float16 if self.device == "cuda" else torch.float32
print(f"[SHFT API] loading model_id={self.model_id}", flush=True)
print(f"[SHFT API] loading base_model={self.base_model_id}", flush=True)
tokenizer = AutoTokenizer.from_pretrained(self.base_model_id, use_fast=True)
if tokenizer.pad_token_id is None:
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(self.base_model_id, torch_dtype=dtype, low_cpu_mem_usage=True)
if (self.adapter_dir / "adapter_model.safetensors").exists():
print(f"[SHFT API] loading adapter={self.adapter_dir}", flush=True)
model = PeftModel.from_pretrained(model, str(self.adapter_dir))
model.eval()
model.to(self.device)
model.generation_config.max_length = None
self.tokenizer = tokenizer
self.model = model
print(f"[SHFT API] ready device={self.device}", flush=True)
def generate(self, messages: list[dict[str, str]], *, max_new_tokens: int | None = None, temperature: float | None = None, top_p: float | None = None) -> dict:
if self.model is None or self.tokenizer is None:
raise RuntimeError("model is not loaded")
import torch
if not messages or messages[0].get("role") != "system":
messages = [{"role": "system", "content": self.config["system_prompt"]}, *messages]
max_new_tokens = int(max_new_tokens or self.config.get("max_new_tokens", 120))
temperature = float(self.config.get("temperature", 0.2) if temperature is None else temperature)
top_p = float(self.config.get("top_p", 0.9) if top_p is None else top_p)
inputs = self.tokenizer(build_prompt(self.tokenizer, messages), return_tensors="pt").to(self.device)
input_len = inputs["input_ids"].shape[-1]
started = time.time()
with torch.no_grad():
output = self.model.generate(
**inputs,
max_new_tokens=max_new_tokens,
do_sample=temperature > 0,
temperature=temperature,
top_p=top_p,
pad_token_id=self.tokenizer.eos_token_id,
eos_token_id=self.tokenizer.eos_token_id,
)
answer = self.tokenizer.decode(output[0][input_len:], skip_special_tokens=True).strip() or "[empty response]"
output_len = output.shape[-1] - input_len
return {
"content": answer,
"usage": {
"prompt_tokens": int(input_len),
"completion_tokens": int(output_len),
"total_tokens": int(output.shape[-1]),
},
"latency_ms": round((time.time() - started) * 1000, 2),
}
def make_handler(engine: Linvest21Model, api_token: str | None, require_token: bool):
class Handler(BaseHTTPRequestHandler):
server_version = "Linvest21FinGPTAPI/1.0"
def _send_json(self, code: int, payload: dict) -> None:
body = json.dumps(payload).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _authorized(self) -> bool:
if not require_token:
return True
header = self.headers.get("Authorization", "")
return bool(api_token) and header == f"Bearer {api_token}"
def _read_json(self) -> dict:
length = int(self.headers.get("Content-Length", "0"))
if length <= 0:
return {}
return json.loads(self.rfile.read(length).decode("utf-8"))
def do_GET(self):
if self.path == "/health":
self._send_json(200, {"ok": True, "service": "linvest21_fingpt", "model": engine.model_id, "base_model": engine.base_model_id, "device": engine.device, "auth_required": require_token})
return
if self.path == "/v1/models":
if not self._authorized():
self._send_json(401, {"error": {"message": "missing or invalid bearer token"}})
return
self._send_json(200, {"object": "list", "data": [{"id": engine.model_id, "object": "model", "base_model": engine.base_model_id, "asset_class": engine.config.get("asset_class"), "role": engine.config.get("role")}]})
return
self._send_json(404, {"error": {"message": "not found"}})
def do_POST(self):
if self.path not in {"/v1/chat/completions", "/v1/generate"}:
self._send_json(404, {"error": {"message": "not found"}})
return
if not self._authorized():
self._send_json(401, {"error": {"message": "missing or invalid bearer token"}})
return
try:
payload = self._read_json()
if self.path == "/v1/generate":
prompt = str(payload.get("prompt", ""))
messages = [{"role": "user", "content": prompt}]
else:
messages = payload.get("messages", [])
if not isinstance(messages, list):
raise ValueError("messages must be a list")
result = engine.generate(
messages,
max_new_tokens=payload.get("max_new_tokens"),
temperature=payload.get("temperature"),
top_p=payload.get("top_p"),
)
except Exception as exc:
self._send_json(400, {"error": {"message": str(exc)}})
return
response = {
"id": f"chatcmpl-{uuid.uuid4().hex}",
"object": "chat.completion",
"created": int(time.time()),
"model": engine.model_id,
"choices": [{"index": 0, "message": {"role": "assistant", "content": result["content"]}, "finish_reason": "stop"}],
"usage": result["usage"],
"latency_ms": result["latency_ms"],
}
self._send_json(200, response)
def log_message(self, fmt: str, *args) -> None:
print(f"[SHFT API] {self.address_string()} {fmt % args}", flush=True)
return Handler
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Token-protected JSON API for portable Linvest21_FinGPT.")
parser.add_argument("--config", default=str(Path(__file__).with_name("chat_config.json")))
parser.add_argument("--host")
parser.add_argument("--port", type=int)
parser.add_argument("--allow-no-token", action="store_true", help="Local development only: disable bearer-token requirement.")
args = parser.parse_args()
config = load_config(Path(args.config).resolve())
api_config = config.get("api", {})
host = args.host or api_config.get("host", "127.0.0.1")
port = int(args.port or api_config.get("port", 8765))
token_env_var = api_config.get("token_env_var", "LINVEST21_API_TOKEN")
token = os.environ.get(token_env_var)
require_token = bool(api_config.get("require_token", True)) and not args.allow_no_token
if require_token and not token:
raise SystemExit(f"{token_env_var} is required. Set it in the shell; do not store it in files.")
engine = Linvest21Model(config)
engine.load()
print(f"[SHFT API] serving http://{host}:{port} auth_required={require_token}", flush=True)
HTTPServer((host, port), make_handler(engine, token, require_token)).serve_forever()
''',
    )
    # Static contract check: verifies that the API files exist, that the
    # config requires a token from LINVEST21_API_TOKEN, and that serve_api.py
    # contains the endpoint and auth marker strings. It exits with 3 on any
    # error and never loads model weights.
    write_text(
        runtime / "self_test_api_contract.py",
        r'''
from __future__ import annotations
import argparse
import json
from pathlib import Path
REQUIRED_ENDPOINTS = [
"GET /health",
"GET /v1/models",
"POST /v1/chat/completions",
"POST /v1/generate",
]
def main() -> int:
parser = argparse.ArgumentParser(description="Self-test the portable Linvest21_FinGPT JSON API contract without loading model weights.")
parser.add_argument("--config", default=str(Path(__file__).with_name("chat_config.json")))
args = parser.parse_args()
runtime_dir = Path(args.config).resolve().parent
config = json.loads(Path(args.config).read_text(encoding="utf-8"))
api = config.get("api", {})
files = {
"serve_api": runtime_dir / "serve_api.py",
"run_api_cpu": runtime_dir / "run_api_cpu.bat",
"run_api_gpu": runtime_dir / "run_api_gpu.bat",
"chat_config": Path(args.config).resolve(),
}
errors: list[str] = []
for name, path in files.items():
if not path.exists():
errors.append(f"missing {name}: {path}")
if api.get("require_token") is not True:
errors.append("chat_config api.require_token must be true")
if api.get("token_env_var") != "LINVEST21_API_TOKEN":
errors.append("chat_config api.token_env_var must be LINVEST21_API_TOKEN")
source = files["serve_api"].read_text(encoding="utf-8") if files["serve_api"].exists() else ""
for marker in ["/health", "/v1/models", "/v1/chat/completions", "/v1/generate", "Authorization", "Bearer"]:
if marker not in source:
errors.append(f"serve_api.py missing marker: {marker}")
report = {
"ok": not errors,
"errors": errors,
"api": {
"host": api.get("host"),
"port": api.get("port"),
"require_token": api.get("require_token"),
"token_env_var": api.get("token_env_var"),
"endpoints": REQUIRED_ENDPOINTS,
"json_support": True,
"token_storage_policy": "runtime_environment_only",
},
"files": {name: str(path) for name, path in files.items()},
}
print(json.dumps(report, indent=2))
return 0 if report["ok"] else 3
if __name__ == "__main__":
raise SystemExit(main())
''',
    )
    tools = release_dir / "tools"
    # Merge tool: loads the base model and adapter, merges them and saves a
    # full HF model directory plus tokenizer.
    # NOTE(review): both the script default and the .bat wrapper hard-code
    # "meta-llama/Meta-Llama-3-8B" instead of using `base_model_id`, so a
    # release exported against another base model gets a mismatched merge tool.
    write_text(
        tools / "merge_hf_model.py",
        r'''
from __future__ import annotations
import argparse
from pathlib import Path
def main() -> int:
parser = argparse.ArgumentParser(description="Merge a base HF model and Linvest21 LoRA adapter into a full HF model directory.")
parser.add_argument("--base-model", default="meta-llama/Meta-Llama-3-8B")
parser.add_argument("--adapter-dir", default=str(Path(__file__).resolve().parents[1] / "model" / "adapter"))
parser.add_argument("--output-dir", default=str(Path(__file__).resolve().parents[1] / "model" / "merged_hf"))
parser.add_argument("--dtype", choices=["float16", "bfloat16", "float32"], default="float16")
args = parser.parse_args()
import torch
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
dtype = {"float16": torch.float16, "bfloat16": torch.bfloat16, "float32": torch.float32}[args.dtype]
output_dir = Path(args.output_dir).resolve()
output_dir.mkdir(parents=True, exist_ok=True)
print(f"[SHFT merge] loading base={args.base_model}")
base = AutoModelForCausalLM.from_pretrained(args.base_model, torch_dtype=dtype, low_cpu_mem_usage=True)
print(f"[SHFT merge] loading adapter={args.adapter_dir}")
model = PeftModel.from_pretrained(base, args.adapter_dir)
print("[SHFT merge] merging adapter into base model")
merged = model.merge_and_unload()
print(f"[SHFT merge] saving merged model to {output_dir}")
merged.save_pretrained(output_dir, safe_serialization=True)
tokenizer = AutoTokenizer.from_pretrained(args.base_model, use_fast=True)
tokenizer.save_pretrained(output_dir)
print("[SHFT merge] completed")
return 0
if __name__ == "__main__":
raise SystemExit(main())
''',
    )
    write_text(
        tools / "run_merge_hf_model.bat",
        r"""
@echo off
setlocal
set "PYTHON_EXE=python"
if exist "%USERPROFILE%\miniconda3\python.exe" set "PYTHON_EXE=%USERPROFILE%\miniconda3\python.exe"
"%PYTHON_EXE%" "%~dp0merge_hf_model.py" --base-model meta-llama/Meta-Llama-3-8B --adapter-dir "%~dp0..\model\adapter" --output-dir "%~dp0..\model\merged_hf"
endlocal
""",
    )
    # Human-readable description of the three export modes.
    write_text(
        tools / "EXPORT_MODES.md",
        """
# Export Modes
## adapter_only
Includes the trained LoRA adapter and runtime. This is the default small portable bundle. It still needs the target machine to download or cache the gated base model.
## merged_hf
Includes a full Hugging Face model directory under `model/merged_hf` when provided during export or produced with `tools/run_merge_hf_model.bat`.
This mode is easier to run on another machine, but the merged model contains base-model weights and must only be distributed under the applicable Meta Llama and upstream adapter license terms.
## quantized_gguf
Includes a `model/gguf/*.gguf` file when provided during export. This is the most portable CPU/RAM path for llama.cpp-style inference.
Produce it from `model/merged_hf` with llama.cpp conversion and quantization tools on a machine with enough disk and memory.
""",
    )
def write_release_docs(
    release_dir: Path,
    release_id: str,
    export_mode: str,
    source_run_id: str,
    base_model_id: str,
    *,
    model_id: str,
    asset_class: str | None,
    role: str | None,
) -> None:
    """Write the human-readable documents of a release bundle.

    Produces ``README.md``, ``docs/SECURITY.md`` and ``docs/MODEL_CARD.md``
    below ``release_dir``.

    Args:
        release_dir: Root directory of the release bundle.
        release_id: Release identifier shown in headings and commands.
        export_mode: Export mode recorded in the README.
        source_run_id: Training run the adapter was copied from.
        base_model_id: Hugging Face id of the base model the adapter needs.
        model_id: Runtime model id.
        asset_class: Asset class; rendered as "unspecified" when missing.
        role: Role; rendered as "unspecified" when missing.
    """
    asset_label = asset_class or "unspecified"
    role_label = role or "unspecified"
    write_text(
        release_dir / "README.md",
        f"""
        # Linvest21 FinGPT Release Bundle: {release_id}
        This directory is a portable SHFT implementation release bundle.
        Runtime model ID:
        ```text
        {model_id}
        ```
        Asset class: `{asset_label}`
        Role: `{role_label}`
        It can be produced and certified from the source workspace with:
        ```powershell
        .\\impl_codex\\scripts\\run_shft_0_to_16_sample_to_implementation.bat
        ```
        ## What Is Included
        - `model/adapter`: the trained Linvest21 LoRA adapter copied from `{source_run_id}`.
        - `runtime`: portable Windows chat/API scripts and dependency manifests.
        - `runtime/serve_api.py`: token-protected JSON API server.
        - `runtime/self_test_api_contract.py`: API contract verifier that does not load model weights.
        - `evidence`: copied evaluation, training, and candidate evidence.
        - `release_manifest.json`: machine-readable release metadata.
        - `release_hashes.json`: SHA-256 hashes for release files.
        ## Export Mode
        ```text
        {export_mode}
        ```
        `adapter_only` is small and portable, but still requires access to the gated base model:
        ```text
        {base_model_id}
        ```
        ## Run
        First install dependencies:
        ```powershell
        .\\runtime\\install_cpu.bat
        ```
        Or, on a CUDA machine:
        ```powershell
        .\\runtime\\install_gpu_cuda121.bat
        ```
        Then start chat:
        ```powershell
        .\\runtime\\run_chat_cpu.bat
        ```
        Or:
        ```powershell
        .\\runtime\\run_chat_gpu.bat
        ```
        ## JSON API
        Set a local API token in the shell, then start the API:
        ```powershell
        $env:LINVEST21_API_TOKEN = "<your-local-api-token>"
        .\\runtime\\run_api_cpu.bat
        ```
        Endpoints:
        ```text
        GET /health
        GET /v1/models Authorization: Bearer <token>
        POST /v1/chat/completions Authorization: Bearer <token>
        POST /v1/generate Authorization: Bearer <token>
        ```
        Tokens are never stored in the release bundle or implementation config.
        Self-test the API contract without loading model weights:
        ```powershell
        .\\runtime\\run_api_self_test.bat
        ```
        From the SHFT source workspace, certify the generated portable bundle before copying it to another machine:
        ```powershell
        .\\impl_codex\\scripts\\self_certify_portable_api.bat {release_id}
        ```
        ## Security
        Do not place Hugging Face tokens in this directory. Set `HF_TOKEN` in the target machine environment or authenticate with:
        ```powershell
        hf auth login
        ```
        ## License Boundary
        This bundle includes the Linvest21 adapter. The base model remains governed by the Meta Llama license and Hugging Face gated-access controls unless a future `merged_hf` or `quantized_gguf` export is produced and approved for distribution.
        """,
    )
    # The base model named here follows `base_model_id` so the security notes
    # stay correct when a release is exported against a different base model.
    write_text(
        release_dir / "docs" / "SECURITY.md",
        f"""
        # Security Notes
        - Real provider tokens must not be committed or copied into this release.
        - Use `HF_TOKEN` or Hugging Face CLI login on the target host.
        - Confirm the target host is authorized for `{base_model_id}`.
        - Do not distribute merged or quantized base-model weights unless license and access policy permit it.
        - Treat `evidence/paired_predictions.jsonl` as potentially sensitive model-output evidence.
        """,
    )
    write_text(
        release_dir / "docs" / "MODEL_CARD.md",
        f"""
        # Linvest21_FinGPT Model Card
        Release: `{release_id}`
        Runtime model ID: `{model_id}`
        Asset class: `{asset_label}`
        Role: `{role_label}`
        Base model: `{base_model_id}`
        Adapter source run: `{source_run_id}`
        Intended use: internal Linvest21 financial-analysis experimentation and SHFT lifecycle validation for the named super-agent role.
        Current status: exported and locally self-certified. Production promotion is controlled separately by SHFT promotion evidence and approvals.
        Known limitation: local CPU inference is slow for Llama-3-8B. This adapter-only bundle still requires authorized access to the base model.
        """,
    )
def copy_evidence(release_dir: Path, source_run_id: str) -> dict[str, bool]:
    """Copy available evidence files into ``release_dir / "evidence"``.

    Candidate and training artefacts come from the ``source_run_id`` run; the
    smoke and full evaluation artefacts come from two fixed evaluation runs.
    Missing sources are skipped.

    Returns:
        A dict mapping each evidence label to whether its file was copied.
    """
    runs_root = SHFT_WORKSPACE_ROOT / "runs"
    evidence_root = release_dir / "evidence"
    run_dir = runs_root / source_run_id
    smoke_run = "run_step21_repair_smoke_eval_001"
    full_run = "run_step20_repair_eval_001"
    # (label, source file, destination file), in the order the result reports them.
    plan = [
        ("candidate_manifest", run_dir / "candidate_manifest.json", evidence_root / "candidate_manifest.json"),
        ("training_result", run_dir / "remote_artifacts" / "training_result.json", evidence_root / "training_result.json"),
        ("training_plan", run_dir / "remote_artifacts" / "training_plan.json", evidence_root / "training_plan.json"),
        (
            "smoke_eval_manifest",
            runs_root / smoke_run / "evaluation_manifest.json",
            evidence_root / smoke_run / "evaluation_manifest.json",
        ),
        (
            "smoke_eval_report",
            runs_root / smoke_run / "remote_artifacts" / "eval" / "paired_eval_report.json",
            evidence_root / smoke_run / "paired_eval_report.json",
        ),
        (
            "full_eval_manifest",
            runs_root / full_run / "evaluation_manifest.json",
            evidence_root / full_run / "evaluation_manifest.json",
        ),
        (
            "full_eval_report",
            runs_root / full_run / "remote_artifacts" / "eval" / "paired_eval_report.json",
            evidence_root / full_run / "paired_eval_report.json",
        ),
    ]
    return {label: copy_if_exists(origin, target) for label, origin, target in plan}
def _build_model_payload(
    *,
    export_mode: str,
    base_model_id: str,
    copied_merged_model: bool,
    copied_gguf_model: bool,
    gguf_name: str | None,
) -> dict[str, Any]:
    """Build the ``model`` section of the release manifest for ``export_mode``."""
    payload: dict[str, Any] = {
        "adapter_path": "model/adapter",
        "base_model_id": base_model_id,
        "export_mode": export_mode,
        "self_containment_level": "adapter_plus_runtime",
        "requires_base_model_download": True,
    }
    if export_mode == "merged_hf":
        payload.update(
            {
                "merged_model_path": "model/merged_hf" if copied_merged_model else None,
                "self_containment_level": "full_hf_model" if copied_merged_model else "adapter_plus_merge_tool",
                # Until the merged weights are in the bundle, the base model is still needed.
                "requires_base_model_download": not copied_merged_model,
                "status": "completed" if copied_merged_model else "ready_to_merge",
                "note": "Merged model copied into release." if copied_merged_model else "Run tools/run_merge_hf_model.bat on a licensed high-RAM host to create model/merged_hf.",
            }
        )
    elif export_mode == "quantized_gguf":
        payload.update(
            {
                "gguf_model_path": f"model/gguf/{gguf_name}" if gguf_name is not None else None,
                "self_containment_level": "full_quantized_gguf" if copied_gguf_model else "adapter_plus_conversion_tooling",
                # Same reasoning: no GGUF file in the bundle means the base model is still needed.
                "requires_base_model_download": not copied_gguf_model,
                "status": "completed" if copied_gguf_model else "ready_for_conversion",
                "note": "GGUF model copied into release." if copied_gguf_model else "Create model/merged_hf, then use llama.cpp conversion and quantization tools.",
            }
        )
    return payload


def export_release(
    *,
    release_id: str,
    source_run_id: str = DEFAULT_SOURCE_RUN_ID,
    export_mode: str = "adapter_only",
    base_model_id: str = DEFAULT_BASE_MODEL,
    model_id: str | None = None,
    asset_class: str | None = None,
    role: str | None = None,
    merged_model_dir: str | None = None,
    gguf_model_path: str | None = None,
    zip_release: bool = False,
) -> dict[str, Any]:
    """Build a portable release bundle for ``release_id`` and register it.

    The bundle directory is recreated from scratch and filled with the adapter
    from ``source_run_id``, optional merged / GGUF weights, runtime scripts,
    evidence, docs, ``release_manifest.json`` and ``release_hashes.json``. A
    result record is also written to the workspace release registry.

    All input paths are validated before the existing bundle is removed, so a
    bad argument cannot destroy a previously exported release.

    Args:
        release_id: Identifier and directory name of the release.
        source_run_id: Training run providing ``remote_artifacts/adapter``.
        export_mode: ``adapter_only``, ``merged_hf`` or ``quantized_gguf``.
        base_model_id: Hugging Face id of the base model.
        model_id: Runtime model id; defaults to ``release_id``.
        asset_class: Explicit asset class; otherwise parsed from ``release_id``.
        role: Explicit role; otherwise parsed from ``release_id``.
        merged_model_dir: Optional merged HF model directory to include.
        gguf_model_path: Optional GGUF file to include.
        zip_release: Also create ``<release_dir>.zip`` when true.

    Returns:
        The result record (status, paths, export mode, archive path).

    Raises:
        ValueError: if ``export_mode`` is not supported.
        FileNotFoundError: if the adapter, merged model or GGUF source is missing.
    """
    if export_mode not in {"adapter_only", "merged_hf", "quantized_gguf"}:
        raise ValueError(f"unsupported export mode: {export_mode}")

    # Validate every input before touching the existing release directory.
    source_run = SHFT_WORKSPACE_ROOT / "runs" / source_run_id
    adapter_src = source_run / "remote_artifacts" / "adapter"
    if not adapter_src.exists():
        raise FileNotFoundError(f"adapter source not found: {adapter_src}")
    merged_src = Path(merged_model_dir) if merged_model_dir else None
    if merged_src is not None and not merged_src.exists():
        raise FileNotFoundError(f"required release source does not exist: {merged_src}")
    gguf_src = Path(gguf_model_path) if gguf_model_path else None
    if gguf_src is not None and not gguf_src.exists():
        raise FileNotFoundError(f"GGUF model not found: {gguf_src}")

    release_dir = product_dir_for(release_id)
    if release_dir.exists():
        shutil.rmtree(release_dir)
    release_dir.mkdir(parents=True, exist_ok=True)

    identity = infer_release_identity(release_id, model_id=model_id, asset_class=asset_class, role=role)
    runtime_model_id = str(identity["model_id"] or release_id)
    runtime_asset_class = identity["asset_class"]
    runtime_role = identity["role"]

    copy_tree(adapter_src, release_dir / "model" / "adapter")
    copied_merged_model = False
    copied_gguf_model = False
    if merged_src is not None:
        copy_tree(merged_src, release_dir / "model" / "merged_hf")
        copied_merged_model = True
    if gguf_src is not None:
        gguf_dst = release_dir / "model" / "gguf" / gguf_src.name
        gguf_dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(gguf_src, gguf_dst)
        copied_gguf_model = True

    write_runtime_files(
        release_dir,
        release_id=release_id,
        model_id=runtime_model_id,
        asset_class=runtime_asset_class,
        role=runtime_role,
        base_model_id=base_model_id,
    )
    evidence = copy_evidence(release_dir, source_run_id)
    write_release_docs(
        release_dir,
        release_id,
        export_mode,
        source_run_id,
        base_model_id,
        model_id=runtime_model_id,
        asset_class=runtime_asset_class,
        role=runtime_role,
    )

    model_payload = _build_model_payload(
        export_mode=export_mode,
        base_model_id=base_model_id,
        copied_merged_model=copied_merged_model,
        copied_gguf_model=copied_gguf_model,
        gguf_name=gguf_src.name if gguf_src is not None else None,
    )
    manifest = {
        "release_id": release_id,
        "model_id": runtime_model_id,
        "asset_class": runtime_asset_class,
        "role": runtime_role,
        "created_at": utc_now(),
        "source_run_id": source_run_id,
        "source_run_dir": str(source_run.relative_to(REPO_ROOT)).replace("\\", "/"),
        "product_root": str(IMPLEMENTATION_PRODUCTS_ROOT.relative_to(REPO_ROOT)).replace("\\", "/"),
        "release_dir": str(release_dir.relative_to(REPO_ROOT)).replace("\\", "/"),
        "path_policy": {
            "write_target": "impl_codex/implementation_products/<model_id>",
            "legacy_write_enabled": False,
        },
        "model": model_payload,
        "runtime": {
            "chat_console": "runtime/chat_console.py",
            "serve_api": "runtime/serve_api.py",
            "api_self_test": "runtime/self_test_api_contract.py",
            "run_chat_cpu": "runtime/run_chat_cpu.bat",
            "run_chat_gpu": "runtime/run_chat_gpu.bat",
            "run_api_cpu": "runtime/run_api_cpu.bat",
            "run_api_gpu": "runtime/run_api_gpu.bat",
            "run_api_self_test": "runtime/run_api_self_test.bat",
            "config": "runtime/chat_config.json",
            "requirements_cpu": "runtime/requirements_cpu.txt",
            "requirements_gpu_cuda121": "runtime/requirements_gpu_cuda121.txt",
        },
        "evidence_copied": evidence,
        "promotion_status": "not_promoted",
        "distribution_policy": {
            "adapter_only_distribution": "allowed_with_internal_policy",
            "merged_or_quantized_distribution": "requires_license_and_access_review",
            "tokens_in_bundle": "forbidden",
        },
        "heavy_assets": {
            "merged_model_copied": copied_merged_model,
            "gguf_model_copied": copied_gguf_model,
        },
    }
    write_json(release_dir / "release_manifest.json", manifest)
    # Hashes are collected before release_hashes.json exists, so the hash file
    # never lists itself.
    write_json(release_dir / "release_hashes.json", collect_hashes(release_dir))

    archive_path = None
    if zip_release:
        archive_path = shutil.make_archive(str(release_dir), "zip", root_dir=release_dir)
    result = {
        "status": "completed",
        "release_id": release_id,
        "product_dir": str(release_dir),
        "release_dir": str(release_dir),
        "export_mode": export_mode,
        "archive": archive_path,
        "manifest": str(release_dir / "release_manifest.json"),
        "path_policy": "exports write to impl_codex/implementation_products only",
    }
    write_json(SHFT_WORKSPACE_ROOT / "registry" / "releases" / f"{release_id}.json", result)
    return result
def _read_json_object(path: Path) -> tuple[dict[str, Any], str | None]:
    """Load *path* as a JSON object.

    Returns ``(payload, problem)``. On success ``problem`` is ``None``; on any
    failure ``payload`` is an empty dict and ``problem`` describes what went
    wrong, so the caller can report it instead of crashing.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # OSError covers missing/unreadable files; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}, str(exc)
    if not isinstance(payload, dict):
        return {}, f"expected a JSON object, found {type(payload).__name__}"
    return payload, None


def validate_release(release_id: str) -> dict[str, Any]:
    """Self-test an exported release bundle and persist the resulting report.

    The check covers four things:

    * every file in the required-file list exists under the release directory;
    * ``release_manifest.json`` parses, names this release, and carries the
      adapter-only / token distribution policy fields the exporter writes;
    * every entry in ``release_hashes.json`` exists and matches its SHA-256,
      with files missing from the hash list surfaced as a warning;
    * the sibling ``<release_id>.zip`` archive, if present, passes
      ``ZipFile.testzip()``.

    Args:
        release_id: Identifier of the release; also the directory name.

    Returns:
        A report dict. ``ok`` is true only when no errors were collected. When
        the release directory does not exist a short report is returned and
        nothing is written to disk; otherwise the full report is written to
        ``self_test_report.json`` in the release directory and to the
        workspace release registry before being returned.
    """
    release_dir, storage_location = find_release_dir(release_id)
    errors: list[str] = []
    warnings: list[str] = []
    if not release_dir.exists():
        return {
            "ok": False,
            "release_id": release_id,
            "errors": [f"release directory not found: {release_dir}"],
            "warnings": [],
        }

    required_files = [
        "README.md",
        "release_manifest.json",
        "release_hashes.json",
        "model/adapter/adapter_config.json",
        "model/adapter/adapter_model.safetensors",
        "runtime/chat_config.json",
        "runtime/chat_console.py",
        "runtime/serve_api.py",
        "runtime/self_test_api_contract.py",
        "runtime/run_chat_cpu.bat",
        "runtime/run_chat_gpu.bat",
        "runtime/run_api_cpu.bat",
        "runtime/run_api_gpu.bat",
        "runtime/run_api_self_test.bat",
        "runtime/install_cpu.bat",
        "runtime/install_gpu_cuda121.bat",
        "tools/merge_hf_model.py",
        "tools/run_merge_hf_model.bat",
        "tools/EXPORT_MODES.md",
        "docs/MODEL_CARD.md",
        "docs/SECURITY.md",
    ]
    for rel in required_files:
        if not (release_dir / rel).exists():
            errors.append(f"missing required release file: {rel}")

    # --- manifest checks -------------------------------------------------
    manifest, manifest_problem = _read_json_object(release_dir / "release_manifest.json")
    if manifest_problem is not None:
        errors.append(f"release_manifest.json is invalid: {manifest_problem}")
    else:
        if manifest.get("release_id") != release_id:
            errors.append("release manifest id does not match release directory")
        # A null or non-object section is treated as empty so the policy
        # checks below fail cleanly instead of raising AttributeError.
        model = manifest.get("model")
        if not isinstance(model, dict):
            model = {}
        policy = manifest.get("distribution_policy")
        if not isinstance(policy, dict):
            policy = {}
        if model.get("export_mode") == "adapter_only" and not model.get("requires_base_model_download"):
            errors.append("adapter_only release must record that base model download/cache is required")
        if policy.get("tokens_in_bundle") != "forbidden":
            errors.append("release distribution policy must forbid tokens in bundle")

    # --- hash checks -----------------------------------------------------
    stored_hashes, hashes_problem = _read_json_object(release_dir / "release_hashes.json")
    if hashes_problem is not None:
        errors.append(f"release_hashes.json is invalid: {hashes_problem}")
    for rel, expected in stored_hashes.items():
        path = release_dir / rel
        # is_file() rather than exists(): a directory at this path cannot be
        # hashed and means the recorded file is gone.
        if not path.is_file():
            errors.append(f"hashed file missing: {rel}")
            continue
        actual = sha256_file(path)
        if actual.lower() != str(expected).lower():
            errors.append(f"hash mismatch for {rel}")

    # Files written after hashing (the hash list itself and this self-test
    # report) are expected to be absent from release_hashes.json.
    current_files = {
        str(path.relative_to(release_dir)).replace("\\", "/")
        for path in release_dir.rglob("*")
        if path.is_file() and path.name not in {"release_hashes.json", "self_test_report.json"}
    }
    missing_hashes = sorted(current_files - set(stored_hashes))
    if missing_hashes:
        warnings.append(f"files not listed in release_hashes.json: {missing_hashes}")

    # --- archive check ---------------------------------------------------
    # Use the shared helper instead of release_dir.with_suffix(".zip"):
    # with_suffix() would replace the text after the last dot in the release
    # id (e.g. "model_v1.2" -> "model_v1.zip") and miss the real archive.
    archive_path = product_zip_for(release_id)
    archive_ok = False
    if archive_path.exists():
        try:
            with zipfile.ZipFile(archive_path) as archive:
                bad = archive.testzip()
            if bad:
                errors.append(f"zip archive has corrupt member: {bad}")
            else:
                archive_ok = True
        except zipfile.BadZipFile as exc:
            errors.append(f"zip archive is invalid: {exc}")
    else:
        warnings.append(f"release zip archive not found: {archive_path}")

    report = {
        "ok": not errors,
        "release_id": release_id,
        "storage_location": storage_location,
        "product_dir": str(product_dir_for(release_id)),
        "release_dir": str(release_dir),
        "archive": str(archive_path),
        "archive_ok": archive_ok,
        "required_file_count": len(required_files),
        "hashed_file_count": len(stored_hashes),
        "errors": errors,
        "warnings": warnings,
        "validated_at": utc_now(),
    }
    write_json(release_dir / "self_test_report.json", report)
    write_json(SHFT_WORKSPACE_ROOT / "registry" / "releases" / f"{release_id}_self_test.json", report)
    return report

Xet Storage Details

Size:
47.9 kB
·
Xet hash:
14d804f9a51ff11d8cfb97e7bcdb37988aa7cf184f65173aa0ff260c0e4bd327

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.