Buckets:
linvest21/shft-artifacts/code/self_healing_finetuning/data_pipeline/reasoning_data_generation.py
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| from typing import Any | |
| from data_pipeline.learning_pdf_to_jsonl import SYSTEM_TEMPLATE, clean_text, write_jsonl | |
| from data_pipeline.source_intake import load_policy | |
| from data_pipeline.source_quality_certifier import certify_normalized_source_content | |
| from n21.config import load_structured | |
| from n21.settings import CONFIG_ROOT, REPO_ROOT | |
| from observability.audit_log import utc_now | |
# Built-in decision lens per investment role. Every entry carries the same four
# fields, in this order:
#   scenario   - the review situation named in the user prompt
#   decision   - the (rejecting) decision the templated assistant turn states
#   pass_label - the pass/fail label stamped into the record and its metadata
#   focus      - the topics the assistant turn says the evidence must anchor
# Unknown roles fall back to the "researcher" entry (see _role_lens).
ROLE_DECISION_LENS = {
    "researcher": dict(
        scenario="equity research memo review",
        decision="reject the thesis",
        pass_label="fail",
        focus="valuation, moat, ROIC, earnings quality, and financial statement analysis",
    ),
    "portfolio_manager": dict(
        scenario="portfolio construction and position-sizing review",
        decision="reject the position increase",
        pass_label="fail",
        focus="portfolio construction, position sizing, rebalancing, tradeoff, and risk budget",
    ),
    "risk_manager": dict(
        scenario="risk control and red-flag review",
        decision="reject the exposure",
        pass_label="fail",
        focus="red flag, risk control, stress test, liquidity risk, internal control, and drawdown",
    ),
    "performance_manager": dict(
        scenario="performance attribution review",
        decision="reject the performance explanation",
        pass_label="fail",
        focus="attribution, benchmark, tracking error, information ratio, and risk adjusted evidence",
    ),
    "client_portfolio_manager": dict(
        scenario="client suitability and explanation review",
        decision="reject the recommendation",
        pass_label="fail",
        focus="client explanation, suitability, risk tolerance, and client scenario",
    ),
    "chief_investment_officer": dict(
        scenario="investment committee governance review",
        decision="reject the policy change",
        pass_label="fail",
        focus="investment policy, capital market assumption, committee, and governance",
    ),
}
def _iter_jsonl_paths(source: Path) -> list[Path]:
    """Return the JSONL files to read as reasoning source material.

    If ``source`` is a file it is returned as the only entry, unfiltered.
    For a directory, every regular ``*.jsonl`` file beneath it (recursively,
    including ``*.hf_finetune.jsonl``) is returned in sorted order, except
    files whose name starts with ``synthetic_`` — the prefix used for this
    module's generated outputs — so earlier outputs are not re-read as source
    evidence.
    """
    if source.is_file():
        return [source]
    # rglob("*.jsonl") already guarantees the ".jsonl" suffix, so no separate
    # suffix test is needed. rglob also yields directories whose names match,
    # and those cannot be read as JSONL, so keep regular files only.
    return sorted(
        path
        for path in source.rglob("*.jsonl")
        if path.is_file() and not path.name.startswith("synthetic_")
    )
def _read_jsonl_rows(path: Path) -> list[dict[str, Any]]:
    """Read the JSON-object rows of a JSONL file, best effort.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. Each kept row gets a ``_source_path`` entry set to
    ``str(path)`` unless the row already has one.
    """
    rows: list[dict[str, Any]] = []
    # Iterate the file (universal newlines: \n, \r\n, \r) instead of calling
    # str.splitlines(). splitlines() also breaks on U+2028, U+2029 and U+0085,
    # which JSON allows unescaped inside strings. That would split a valid row
    # in two and silently drop it as malformed. "utf-8-sig" strips a leading
    # BOM if present.
    with path.open(encoding="utf-8-sig") as handle:
        for line in handle:
            if not line.strip():
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(row, dict):
                row.setdefault("_source_path", str(path))
                rows.append(row)
    return rows
def _messages_text(row: dict[str, Any]) -> str:
    """Concatenate the ``content`` of every dict entry in ``row["messages"]``.

    Non-dict entries are ignored, and a missing or empty ``content``
    contributes an empty string. The pieces are joined with blank lines and
    passed through ``clean_text``.
    """
    contents = [
        str(entry.get("content") or "")
        for entry in (row.get("messages") or [])
        if isinstance(entry, dict)
    ]
    return clean_text("\n\n".join(contents))
def _source_title(row: dict[str, Any], source_path: Path) -> str:
    """Choose a human-readable title for the row's source document.

    The first truthy value among ``metadata["source_title"]``,
    ``metadata["source_pdf"]`` and ``metadata["source_normalized"]`` wins,
    converted to ``str``. If none is set, the title is the source file's stem
    with underscores turned into spaces. A non-dict ``metadata`` counts as
    empty.
    """
    raw_metadata = row.get("metadata")
    meta = raw_metadata if isinstance(raw_metadata, dict) else {}
    for key in ("source_title", "source_pdf", "source_normalized"):
        candidate = meta.get(key)
        if candidate:
            return str(candidate)
    return str(source_path.stem.replace("_", " "))
def _sha_text(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``text`` encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
def _role_lens(asset_class: str, role: str) -> dict[str, str]:
    """Resolve the decision lens (scenario/decision/pass_label/focus) for a role.

    The defaults come from ``ROLE_DECISION_LENS[role]``, or the "researcher"
    lens for unknown roles. When ``CONFIG_ROOT/data/reasoning_frames.json``
    exists and holds a dict at ``frames[asset_class][role]``, each field takes
    the configured value if it is truthy (converted to ``str``), otherwise the
    default.

    Returns a new dict on every call, so callers may modify it without
    changing the module-level defaults.
    """
    default = ROLE_DECISION_LENS.get(role, ROLE_DECISION_LENS["researcher"])
    path = CONFIG_ROOT / "data" / "reasoning_frames.json"
    if not path.exists():
        return dict(default)
    frames = load_structured(path)
    # Treat a malformed config (non-dict top level or asset-class entry) as
    # "no override" instead of failing with AttributeError on .get().
    asset_frames = frames.get(asset_class) if isinstance(frames, dict) else None
    frame = asset_frames.get(role) if isinstance(asset_frames, dict) else None
    if not isinstance(frame, dict):
        return dict(default)
    return {
        key: str(frame.get(key) or default[key])
        for key in ("scenario", "decision", "pass_label", "focus")
    }
def _snippet(text: str, max_chars: int = 900) -> str:
    """Return ``clean_text(text)`` truncated to at most ``max_chars`` characters.

    When truncation is needed and the cut would fall inside a word, the
    excerpt backs off to the last space, so it does not end on a partial
    token. If the cut already lands on a word boundary (the next character is
    whitespace), the complete final word is kept. The result is passed through
    ``clean_text`` again.
    """
    text = clean_text(text)
    if len(text) <= max_chars:
        return text
    head = text[:max_chars]
    # text[max_chars] exists because len(text) > max_chars.
    if not text[max_chars].isspace():
        # Mid-word cut: drop the partial trailing word. If the slice contains
        # no space at all, it is kept unchanged.
        head = head.rsplit(" ", 1)[0]
    return clean_text(head)
# Review-lens variants cycled per source row. Maps the variant number to a
# (short lens name, instruction woven into the assistant turn) pair. Variant
# numbers not listed here use _DEFAULT_VARIANT_LENS.
_VARIANT_LENSES: dict[int, tuple[str, str]] = {
    1: (
        "evidence discipline",
        "separate observed facts from inference before approving or rejecting the decision",
    ),
    2: (
        "opposing-case review",
        "state the strongest counterargument and explain which missing evidence would change the decision",
    ),
    3: (
        "risk and implementation review",
        "connect the evidence to risk budget, liquidity, valuation support, governance, and implementation controls",
    ),
}
_DEFAULT_VARIANT_LENS: tuple[str, str] = (
    "audit-trail review",
    "document the pass/fail rationale so a second reviewer can reproduce the decision",
)


def _indefinite_article(word: str) -> str:
    """Return "an" if ``word`` starts with a vowel letter, otherwise "a".

    This is a spelling-based heuristic ("an equity", "a fixed_income"). It
    does not handle pronunciation exceptions.
    """
    return "an" if word[:1].lower() in {"a", "e", "i", "o", "u"} else "a"


def _build_reasoning_record(
    *,
    asset_class: str,
    role: str,
    source_path: Path,
    row: dict[str, Any],
    index: int,
    variant: int = 1,
) -> dict[str, Any]:
    """Build one synthetic chat-format SFT record grounded in a source row.

    The record has system, user and assistant messages.
    - The user turn frames a pass/fail review scenario for ``asset_class`` /
      ``role`` around an excerpt of the row's message text.
    - The assistant turn is a templated critical analysis. Its decision and
      pass/fail label come from the role's decision lens.

    Args:
        asset_class: Asset class rendered into the prompts and metadata.
        role: Role key used to resolve the decision lens. Underscores become
            spaces in prose.
        source_path: File the row came from; recorded in metadata.
        row: Source row whose ``messages`` content is the grounding evidence.
        index: Position of the record in the output (callers pass a 1-based
            count); stored as ``record_index``.
        variant: Review-lens variant (1-3). Any other value selects the
            audit-trail lens.

    Returns:
        A dict with ``messages`` (system/user/assistant) and ``metadata``.
    """
    lens = _role_lens(asset_class, role)
    text = _messages_text(row)
    title = _source_title(row, source_path)
    excerpt = _snippet(text)
    # The hash covers the full message text, not just the truncated excerpt.
    evidence_hash = _sha_text(text)
    role_label = role.replace("_", " ")
    variant_name, variant_instruction = _VARIANT_LENSES.get(variant, _DEFAULT_VARIANT_LENS)
    # Pick "a"/"an" from the asset class rather than hard-coding "an", which
    # produced ungrammatical training text such as "an fixed_income ...".
    article = _indefinite_article(asset_class)
    user_prompt = (
        f"Scenario: You are reviewing {article} {asset_class} {lens['scenario']}.\n"
        f"Review lens: {variant_name}.\n"
        f"Source title: {title}\n"
        f"Grounding excerpt:\n{excerpt}\n\n"
        "Decide whether the proposed action should pass or fail. Provide analysis, red flags, "
        "a pass/fail label, and a rationale tied to the evidence."
    )
    assistant_response = (
        f"Analysis for {asset_class} {role_label}:\n"
        f"- Reported facts: the source evidence is from {title}; use it only as an evidence anchor for {lens['focus']}.\n"
        "- Numeric anchor: identify the explicit percentage, dollar amount, ratio, margin, growth rate, "
        "or valuation multiple before drawing a conclusion; if the excerpt lacks a number, state that "
        "the numeric support is missing and must be obtained before approval.\n"
        f"- Inference: {title} may support an investment thesis only after the analyst links the facts "
        "to cash flow durability, risk, valuation, governance, or downside controls.\n"
        "- Risk/tradeoff: fail the action if the source evidence is used without a clear link to cash flow, "
        "risk, valuation, governance, downside controls, or the opposing case.\n"
        f"- Decision: {lens['decision']}.\n"
        f"- Pass/fail label: {lens['pass_label']}.\n"
        f"- Rationale: {lens['decision'].capitalize()} because the role must preserve evidence discipline, "
        "separate observed facts from inference, and avoid approving a decision when the risk budget, "
        "valuation support, or red-flag analysis is incomplete.\n"
        f"- Variant emphasis: {variant_instruction}.\n"
        "- Required remediation: document the evidence, state the opposing case, identify the critical "
        "failure mode, and only approve after the pass/fail reasoning is explicit. "
        "This creates a review trail that a second analyst can audit before capital is committed."
    )
    return {
        "messages": [
            {"role": "system", "content": SYSTEM_TEMPLATE.format(asset_class=asset_class, role=role_label)},
            {"role": "user", "content": user_prompt},
            {"role": "assistant", "content": assistant_response},
        ],
        "metadata": {
            "asset_class": asset_class,
            "role": role,
            "task": "grounded_critical_reasoning_sft",
            "source_path": str(source_path),
            "source_title": title,
            "source_evidence_sha256": evidence_hash,
            "synthetic": True,
            "synthetic_method": "grounded_template_reasoning_v1",
            "rubric_target": "critical_pass_reasoning",
            "pass_fail_label": lens["pass_label"],
            "reasoning_variant": variant,
            "reasoning_variant_focus": variant_name,
            "created_at": utc_now(),
            "record_index": index,
        },
    }
def generate_grounded_reasoning_examples(
    *,
    asset_class: str,
    role: str,
    source: Path | None = None,
    output_path: Path | None = None,
    max_records: int = 120,
    policy_path: Path | None = None,
) -> dict[str, Any]:
    """Generate certified synthetic reasoning rows from learning JSONL files.

    Steps:
    1. Read every source JSONL under ``source`` (default
       ``REPO_ROOT/data/learning/<asset_class>/<role>``).
    2. For each row with non-empty message text, build up to three
       review-lens variants.
    3. Keep only candidates that ``certify_normalized_source_content`` marks
       ``training_eligible``; at most ``max_records`` rows are kept.
    4. Write accepted rows to ``output_path`` (default
       ``.../synthetic_<asset_class>_<role>_critical_reasoning.hf_finetune.jsonl``),
       and always write a ``.manifest.json`` next to it.

    Returns:
        The manifest dict, with ``manifest_path`` added.

    Raises:
        FileNotFoundError: If ``source`` (or the default source) does not exist.
    """
    source_root = source or (REPO_ROOT / "data" / "learning" / asset_class / role)
    if not source_root.exists():
        raise FileNotFoundError(f"source not found: {source_root}")
    output = output_path or (
        REPO_ROOT
        / "data"
        / "learning"
        / asset_class
        / role
        / f"synthetic_{asset_class}_{role}_critical_reasoning.hf_finetune.jsonl"
    )
    policy = load_policy(policy_path)
    rows: list[dict[str, Any]] = []
    rejected: list[dict[str, Any]] = []
    source_paths = _iter_jsonl_paths(source_root)
    variants_per_source_row = 3
    for source_path in source_paths:
        for source_row in _read_jsonl_rows(source_path):
            if len(rows) >= max_records:
                break
            if not _messages_text(source_row):
                continue
            for variant in range(1, variants_per_source_row + 1):
                if len(rows) >= max_records:
                    break
                candidate = _build_reasoning_record(
                    asset_class=asset_class,
                    role=role,
                    source_path=source_path,
                    row=source_row,
                    index=len(rows) + 1,
                    variant=variant,
                )
                assistant_text = candidate["messages"][-1]["content"]
                # Only the generated assistant turn is certified, not the raw
                # source row.
                certification = certify_normalized_source_content(
                    asset_class=asset_class,
                    role=role,
                    title=str(candidate["metadata"]["source_title"]),
                    url=str(source_path),
                    source_type="jsonl",
                    text=assistant_text,
                    quality_errors=["critical_pass_absolute: synthetic reasoning generation requested"],
                    policy=policy,
                )
                candidate["metadata"]["content_ai_certification"] = certification
                if certification.get("training_eligible"):
                    rows.append(candidate)
                else:
                    rejected.append(
                        {
                            "source_path": str(source_path),
                            "source_title": candidate["metadata"]["source_title"],
                            "variant": variant,
                            "certification": certification,
                        }
                    )
        if len(rows) >= max_records:
            break
    # Create the output directory before anything is written into it: an
    # explicit output_path may point at a directory that does not exist yet.
    # This matches the intake-records variant, which also creates the
    # directory before writing.
    output.parent.mkdir(parents=True, exist_ok=True)
    if rows:
        write_jsonl(output, rows)
    manifest = {
        "schema_version": "grounded_reasoning_generation_manifest_v1",
        "asset_class": asset_class,
        "role": role,
        "source": str(source_root),
        "output_path": str(output),
        "source_file_count": len(source_paths),
        "generated_count": len(rows),
        "rejected_count": len(rejected),
        "max_records": max_records,
        "variants_per_source_row": variants_per_source_row,
        "certifier": "source_content_ai_certification_v1",
        "status": "completed" if rows else "blocked_no_certified_reasoning_examples",
        "created_at": utc_now(),
        "rejections": rejected[:25],
        "ok": bool(rows),
    }
    manifest_path = output.with_suffix(output.suffix + ".manifest.json")
    manifest_path.write_text(json.dumps(manifest, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    manifest["manifest_path"] = str(manifest_path)
    return manifest
def generate_grounded_reasoning_examples_from_intake_records(
    *,
    asset_class: str,
    role: str,
    records: list[dict[str, Any]],
    output_path: Path,
    max_records: int = 120,
    policy_path: Path | None = None,
) -> dict[str, Any]:
    """Generate trainable reasoning rows from normalized breakout intake.

    Stall breakout often finds public sources that are suitable for
    verification but not direct training. This function keeps the raw source
    policy gate intact: it only uses normalized records that were downloaded
    and accepted for either training or verification, then certifies the
    generated reasoning rows before writing a training JSONL.

    A record is skipped (best effort) when any of these holds:
    - it was not downloaded;
    - it is allowed for neither training nor verification;
    - it has no usable ``normalized["path"]``;
    - the normalized file is missing, unreadable, not valid UTF-8 JSON, or
      not a JSON object;
    - the normalized file has empty text.

    Returns:
        The manifest dict, with ``manifest_path`` added. The manifest is
        always written next to ``output_path``. The JSONL itself is written
        only when at least one row was certified.
    """
    policy = load_policy(policy_path)
    rows: list[dict[str, Any]] = []
    rejected: list[dict[str, Any]] = []
    source_paths: list[str] = []
    variants_per_source_row = 3
    for record in records:
        if len(rows) >= max_records:
            break
        if not record.get("downloaded"):
            continue
        if not (record.get("train_allowed") or record.get("verification_allowed")):
            continue
        # Guard the shape: a non-dict "normalized" entry would otherwise raise
        # AttributeError and abort the whole run instead of skipping one record.
        normalized_ref = record.get("normalized")
        normalized_path_text = normalized_ref.get("path") if isinstance(normalized_ref, dict) else None
        if not normalized_path_text:
            continue
        normalized_path = Path(str(normalized_path_text))
        if not normalized_path.exists():
            continue
        try:
            normalized = json.loads(normalized_path.read_text(encoding="utf-8-sig"))
        except (OSError, UnicodeDecodeError, json.JSONDecodeError):
            # UnicodeDecodeError is neither an OSError nor a JSONDecodeError,
            # so it has to be listed explicitly for the skip to be best effort.
            continue
        if not isinstance(normalized, dict):
            continue
        text = clean_text(str(normalized.get("text") or ""))
        if not text:
            continue
        source_paths.append(str(normalized_path))
        source_url = record.get("url") or normalized.get("source_url")
        # Wrap the normalized text as a single-message row so the shared
        # record builder can consume it like a JSONL learning row.
        source_row = {
            "messages": [{"role": "assistant", "content": text}],
            "metadata": {
                "source_title": record.get("title") or normalized.get("source_title") or normalized_path.stem,
                "source_normalized": normalized_path.name,
                "source_url": source_url,
            },
        }
        for variant in range(1, variants_per_source_row + 1):
            if len(rows) >= max_records:
                break
            candidate = _build_reasoning_record(
                asset_class=asset_class,
                role=role,
                source_path=normalized_path,
                row=source_row,
                index=len(rows) + 1,
                variant=variant,
            )
            metadata = candidate.setdefault("metadata", {})
            metadata["synthetic_method"] = "breakout_grounded_template_reasoning_v1"
            metadata["source_url"] = source_url
            metadata["source_decision"] = record.get("decision")
            metadata["source_verification_allowed"] = bool(record.get("verification_allowed"))
            metadata["source_train_allowed"] = bool(record.get("train_allowed"))
            assistant_text = candidate["messages"][-1]["content"]
            certification = certify_normalized_source_content(
                asset_class=asset_class,
                role=role,
                title=str(metadata["source_title"]),
                url=str(record.get("url") or normalized_path),
                source_type="jsonl",
                text=assistant_text,
                quality_errors=["critical_pass_absolute: breakout reasoning generation requested"],
                policy=policy,
            )
            metadata["content_ai_certification"] = certification
            if certification.get("training_eligible"):
                rows.append(candidate)
            else:
                rejected.append(
                    {
                        "source_path": str(normalized_path),
                        "source_title": metadata["source_title"],
                        "variant": variant,
                        "certification": certification,
                    }
                )
    # Create the directory once; both the JSONL and the manifest go here.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    if rows:
        write_jsonl(output_path, rows)
    manifest = {
        "schema_version": "breakout_grounded_reasoning_generation_manifest_v1",
        "asset_class": asset_class,
        "role": role,
        "source_record_count": len(records),
        "normalized_source_count": len(set(source_paths)),
        "output_path": str(output_path),
        "generated_count": len(rows),
        "rejected_count": len(rejected),
        "max_records": max_records,
        "variants_per_source_row": variants_per_source_row,
        "certifier": "source_content_ai_certification_v1",
        "status": "completed" if rows else "blocked_no_certified_reasoning_examples",
        "created_at": utc_now(),
        "rejections": rejected[:25],
        "ok": bool(rows),
    }
    manifest_path = output_path.with_suffix(output_path.suffix + ".manifest.json")
    manifest_path.write_text(json.dumps(manifest, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    manifest["manifest_path"] = str(manifest_path)
    return manifest
Xet Storage Details
- Size: 17.6 kB
- Xet hash: 56a9ffa0d9b0c21418cfb3811194af05e85b5de55417f5cfd364695bb867ce46
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.