""" Evaluate one or more LoRA adapter repos against the AutoDataLab++ environment and pick the best. Designed for a Colab/T4/L4 GPU. Usage (in Colab/Notebook or shell): python3 training/eval_lora_adapters.py \ --base Qwen/Qwen2.5-14B-Instruct \ --adapters \ kartik1230/run-A \ kartik1230/run-B \ kartik1230/run-C \ --tasks easy_brief medium_brief hard_brief expert_brief \ --episodes 3 \ --hf-token hf_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx \ --out training/eval_lora_results.json Notes: - Each adapter is loaded ONE AT A TIME on top of the same base model in 4-bit. - For each (adapter, task, episode) we let the LLM emit ONLY the FIRST action, then a deterministic continuation completes the episode (consult missing required experts -> summarize -> submit). Same scheme as the GRPO notebook. - The score is the env's terminal_grader_score (verifiable reward). - Results are saved as JSON; a summary table is printed. Pick whichever adapter has the highest mean score across tasks. """ from __future__ import annotations import argparse import gc import json import re import sys from pathlib import Path from typing import Any REPO = Path(__file__).resolve().parents[2] if str(REPO) not in sys.path: sys.path.insert(0, str(REPO)) from ceo_brief_env.environment import CEOBriefEnvironment, required_experts_for_task # noqa: E402 from ceo_brief_env.models import CoSAction # noqa: E402 SYSTEM_PROMPT = ( "You are the Chief of Staff in AutoDataLab++. You orchestrate four specialists: " "analyst, finance, strategy, hr. Reply with STRICT JSON only.\n" "Schema: {\"action_type\": one of [consult, ask, summarize, submit, noop], " "\"expert_id\": one of [analyst, finance, hr, strategy] or null}.\n" "Rules: consult each required expert at most once -> summarize -> submit." ) VALID_ACTIONS = {"consult", "ask", "summarize", "submit", "noop"} VALID_EXPERTS = {"analyst", "finance", "hr", "strategy"} _JSON_RE = re.compile(r"\{[^{}]*\}", re.S) def render_obs(obs) -> str: return ( f"task={obs.task_name} step={obs.step_count}/{obs.max_steps} " f"rag={obs.rag_enabled} " f"consulted={obs.consulted_experts} " f"brief_done={obs.current_brief is not None} " f"available={obs.available_experts}" ) def parse_action(text: str) -> CoSAction: m = _JSON_RE.search(text or "") if not m: return CoSAction(action_type="noop") try: a = json.loads(m.group(0)) except Exception: return CoSAction(action_type="noop") at = a.get("action_type") if at not in VALID_ACTIONS: return CoSAction(action_type="noop") eid = a.get("expert_id") if eid is not None and eid not in VALID_EXPERTS: eid = None return CoSAction(action_type=at, expert_id=eid) def deterministic_continuation(env: CEOBriefEnvironment, obs, task: str) -> float: while not obs.done and obs.step_count < obs.max_steps: missing = [e for e in required_experts_for_task(task) if e not in obs.consulted_experts] if missing: act = CoSAction(action_type="consult", expert_id=missing[0]) elif obs.current_brief is None: act = CoSAction(action_type="summarize") else: act = CoSAction(action_type="submit") obs = env.step(act) return float(obs.terminal_grader_score or 0.0) def evaluate_adapter(model, tok, adapter_repo: str, tasks: list[str], episodes: int, hf_token: str | None, subfolder: str | None = None) -> dict[str, Any]: import torch from peft import PeftModel print(f"[eval] loading adapter: {adapter_repo} subfolder={subfolder!r}") kwargs: dict[str, Any] = {"token": hf_token} if subfolder: kwargs["subfolder"] = subfolder peft_model = PeftModel.from_pretrained(model, adapter_repo, **kwargs) peft_model.eval() out: 
dict[str, Any] = {"adapter": adapter_repo, "per_task": {}, "raw": []} for task in tasks: scores: list[float] = [] for ep in range(episodes): env = CEOBriefEnvironment() obs = env.reset(task=task, use_rag=False) messages = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": render_obs(obs)}, ] text = tok.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) ids = tok(text, return_tensors="pt").to(peft_model.device) with torch.no_grad(): gen = peft_model.generate( **ids, max_new_tokens=48, do_sample=False, pad_token_id=tok.pad_token_id, ) comp = tok.decode(gen[0, ids.input_ids.shape[1]:], skip_special_tokens=True) action = parse_action(comp) obs = env.step(action) term = deterministic_continuation(env, obs, task) scores.append(term) out["raw"].append({"task": task, "episode": ep, "completion": comp.strip()[:200], "score": term}) mean = round(sum(scores) / len(scores), 4) out["per_task"][task] = {"scores": scores, "mean": mean} print(f"[eval] {adapter_repo} | {task} | mean={mean} | scores={scores}") out["mean_overall"] = round( sum(v["mean"] for v in out["per_task"].values()) / max(1, len(out["per_task"])), 4 ) del peft_model gc.collect() try: import torch torch.cuda.empty_cache() except Exception: pass return out def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--base", required=True, help="HF id of the base model used during AutoTrain (e.g. Qwen/Qwen2.5-14B-Instruct)") ap.add_argument("--adapters", nargs="+", required=True, help="HF repos of LoRA adapters to evaluate") ap.add_argument("--tasks", nargs="+", default=["easy_brief", "medium_brief", "hard_brief", "expert_brief"]) ap.add_argument("--episodes", type=int, default=3) ap.add_argument("--hf-token", default=None, help="HF token (or set HF_TOKEN env var)") ap.add_argument("--out", default="training/eval_lora_results.json") ap.add_argument( "--adapter-subfolder", default=None, help="PEFT subfolder inside the adapter repo, e.g. 'final' when weights live in repo/final/", ) ap.add_argument("--no-4bit", action="store_true", help="Disable 4-bit (use only on big GPUs)") args = ap.parse_args() import os hf_token = args.hf_token or os.environ.get("HF_TOKEN") import torch from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig print(f"[eval] loading base model: {args.base}") tok = AutoTokenizer.from_pretrained(args.base, token=hf_token) if tok.pad_token is None: tok.pad_token = tok.eos_token bnb = None if not args.no_4bit: bnb = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, ) base_model = AutoModelForCausalLM.from_pretrained( args.base, token=hf_token, device_map="auto", quantization_config=bnb, torch_dtype=torch.bfloat16, ) base_model.eval() results: list[dict[str, Any]] = [] for adapter in args.adapters: try: res = evaluate_adapter( base_model, tok, adapter, args.tasks, args.episodes, hf_token, subfolder=args.adapter_subfolder ) except Exception as e: print(f"[eval] FAILED {adapter}: {e}") res = {"adapter": adapter, "error": str(e), "mean_overall": -1.0, "per_task": {}} results.append(res) results.sort(key=lambda r: r.get("mean_overall", -1.0), reverse=True) print("\n=== RANKING (higher is better) ===") for i, r in enumerate(results, 1): print(f"{i}. 
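
# Shape of the dict evaluate_adapter returns, with illustrative values (task
# names and scores here are made up for the example):
#
#   {
#     "adapter": "kartik1230/run-A",
#     "per_task": {"easy_brief": {"scores": [0.8, 1.0, 0.9], "mean": 0.9}, ...},
#     "raw": [{"task": "easy_brief", "episode": 0,
#              "completion": '{"action_type": "consult", ...}', "score": 0.8}, ...],
#     "mean_overall": 0.9,
#   }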

def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--base",
        required=True,
        help="HF id of the base model used during AutoTrain (e.g. Qwen/Qwen2.5-14B-Instruct)",
    )
    ap.add_argument("--adapters", nargs="+", required=True, help="HF repos of LoRA adapters to evaluate")
    ap.add_argument("--tasks", nargs="+", default=["easy_brief", "medium_brief", "hard_brief", "expert_brief"])
    ap.add_argument("--episodes", type=int, default=3)
    ap.add_argument("--hf-token", default=None, help="HF token (or set HF_TOKEN env var)")
    ap.add_argument("--out", default="training/eval_lora_results.json")
    ap.add_argument(
        "--adapter-subfolder",
        default=None,
        help="PEFT subfolder inside the adapter repo, e.g. 'final' when weights live in repo/final/",
    )
    ap.add_argument("--no-4bit", action="store_true", help="Disable 4-bit (use only on big GPUs)")
    args = ap.parse_args()

    import os

    hf_token = args.hf_token or os.environ.get("HF_TOKEN")

    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

    print(f"[eval] loading base model: {args.base}")
    tok = AutoTokenizer.from_pretrained(args.base, token=hf_token)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token

    # T4 GPUs (compute capability 7.5) do not support bfloat16; fall back to
    # float16 there so the script runs on the hardware the docstring targets.
    compute_dtype = (
        torch.bfloat16
        if torch.cuda.is_available() and torch.cuda.is_bf16_supported()
        else torch.float16
    )

    bnb = None
    if not args.no_4bit:
        bnb = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=compute_dtype,
            bnb_4bit_use_double_quant=True,
        )
    base_model = AutoModelForCausalLM.from_pretrained(
        args.base,
        token=hf_token,
        device_map="auto",
        quantization_config=bnb,
        torch_dtype=compute_dtype,
    )
    base_model.eval()

    results: list[dict[str, Any]] = []
    for adapter in args.adapters:
        try:
            res = evaluate_adapter(
                base_model, tok, adapter, args.tasks, args.episodes, hf_token,
                subfolder=args.adapter_subfolder,
            )
        except Exception as e:
            print(f"[eval] FAILED {adapter}: {e}")
            res = {"adapter": adapter, "error": str(e), "mean_overall": -1.0, "per_task": {}}
        results.append(res)

    results.sort(key=lambda r: r.get("mean_overall", -1.0), reverse=True)
    print("\n=== RANKING (higher is better) ===")
    for i, r in enumerate(results, 1):
        per_task_means = {k: v.get("mean") for k, v in r.get("per_task", {}).items()}
        print(f"{i}. {r['adapter']} mean_overall={r.get('mean_overall')} per_task={per_task_means}")

    out_path = REPO / args.out
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(
        json.dumps(
            {
                "base": args.base,
                "adapter_subfolder": args.adapter_subfolder,
                "results": results,
            },
            indent=2,
        )
    )
    print(f"\n[eval] saved -> {out_path}")
    if results:
        print(f"[eval] WINNER: {results[0]['adapter']}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
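
# To use the winning adapter afterwards, re-attach it the same way this script
# loads adapters (a minimal sketch; the repo id below is a placeholder taken
# from the usage example in the docstring):
#
#   from peft import PeftModel
#   winner = PeftModel.from_pretrained(base_model, "kartik1230/run-A", token=hf_token)
#   winner.eval()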