linvest21's picture
download
raw
17.6 kB
from __future__ import annotations
import hashlib
import json
from pathlib import Path
from typing import Any
from data_pipeline.learning_pdf_to_jsonl import SYSTEM_TEMPLATE, clean_text, write_jsonl
from data_pipeline.source_intake import load_policy
from data_pipeline.source_quality_certifier import certify_normalized_source_content
from n21.config import load_structured
from n21.settings import CONFIG_ROOT, REPO_ROOT
from observability.audit_log import utc_now
# Built-in decision lens per investment role. Every entry carries the same four
# fields, in this order:
#   scenario   - the kind of review the synthetic prompt places the model in
#   decision   - the decision sentence written into the assistant answer
#   pass_label - the pass/fail label stored on each generated example
#   focus      - the evidence areas the answer should anchor on
# _role_lens() falls back to the "researcher" entry for unknown roles and lets
# config/data/reasoning_frames.json override individual fields.
# NOTE(review): every built-in lens uses pass_label "fail", so generated data
# carries a single label unless the config overrides it - confirm intended.
ROLE_DECISION_LENS = dict(
    researcher=dict(
        scenario="equity research memo review",
        decision="reject the thesis",
        pass_label="fail",
        focus="valuation, moat, ROIC, earnings quality, and financial statement analysis",
    ),
    portfolio_manager=dict(
        scenario="portfolio construction and position-sizing review",
        decision="reject the position increase",
        pass_label="fail",
        focus="portfolio construction, position sizing, rebalancing, tradeoff, and risk budget",
    ),
    risk_manager=dict(
        scenario="risk control and red-flag review",
        decision="reject the exposure",
        pass_label="fail",
        focus="red flag, risk control, stress test, liquidity risk, internal control, and drawdown",
    ),
    performance_manager=dict(
        scenario="performance attribution review",
        decision="reject the performance explanation",
        pass_label="fail",
        focus="attribution, benchmark, tracking error, information ratio, and risk adjusted evidence",
    ),
    client_portfolio_manager=dict(
        scenario="client suitability and explanation review",
        decision="reject the recommendation",
        pass_label="fail",
        focus="client explanation, suitability, risk tolerance, and client scenario",
    ),
    chief_investment_officer=dict(
        scenario="investment committee governance review",
        decision="reject the policy change",
        pass_label="fail",
        focus="investment policy, capital market assumption, committee, and governance",
    ),
)
def _iter_jsonl_paths(source: Path) -> list[Path]:
if source.is_file():
return [source]
return sorted(
path
for path in source.rglob("*.jsonl")
if path.name.endswith(".hf_finetune.jsonl") or path.suffix == ".jsonl"
if not path.name.startswith("synthetic_")
)
def _read_jsonl_rows(path: Path) -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
for line in path.read_text(encoding="utf-8-sig").splitlines():
if not line.strip():
continue
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(row, dict):
row.setdefault("_source_path", str(path))
rows.append(row)
return rows
def _messages_text(row: dict[str, Any]) -> str:
    """Concatenate the ``content`` of every chat message in *row*.

    Messages that are not dicts are ignored, and a missing or falsy
    ``content`` contributes an empty string. The pieces are joined with a
    blank line between them, and the result is passed through ``clean_text``.
    """
    contents = [
        str(message.get("content") or "")
        for message in (row.get("messages") or [])
        if isinstance(message, dict)
    ]
    joined = "\n\n".join(contents)
    return clean_text(joined)
def _source_title(row: dict[str, Any], source_path: Path) -> str:
metadata = row.get("metadata") if isinstance(row.get("metadata"), dict) else {}
return str(
metadata.get("source_title")
or metadata.get("source_pdf")
or metadata.get("source_normalized")
or source_path.stem.replace("_", " ")
)
def _sha_text(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
def _role_lens(asset_class: str, role: str) -> dict[str, str]:
    """Resolve the decision lens (scenario/decision/pass_label/focus) for a role.

    Defaults come from ``ROLE_DECISION_LENS[role]``; unknown roles fall back to
    the "researcher" entry. If ``CONFIG_ROOT/data/reasoning_frames.json``
    exists and holds a dict at ``frames[asset_class][role]``, each truthy field
    of that frame overrides the matching default field (stringified).

    A malformed config shape (non-dict root or asset entry) falls back to the
    defaults instead of raising ``AttributeError``. A fresh dict is always
    returned, so callers cannot mutate ``ROLE_DECISION_LENS``.
    """
    default = ROLE_DECISION_LENS.get(role, ROLE_DECISION_LENS["researcher"])
    path = CONFIG_ROOT / "data" / "reasoning_frames.json"
    if not path.exists():
        return dict(default)
    frames = load_structured(path)
    # Guard each nesting level: e.g. `"credit": null` in the config must not crash.
    asset_frames = frames.get(asset_class) if isinstance(frames, dict) else None
    frame = asset_frames.get(role) if isinstance(asset_frames, dict) else None
    if not isinstance(frame, dict):
        return dict(default)
    return {
        key: str(frame.get(key) or default[key])
        for key in ("scenario", "decision", "pass_label", "focus")
    }
def _snippet(text: str, max_chars: int = 900) -> str:
    """Return ``clean_text(text)`` truncated to at most *max_chars* characters.

    Text longer than the limit is cut at *max_chars*. A trailing partial word
    (everything after the last space) is then dropped. If the character just
    past the cut is whitespace, the head already ends on a complete word and
    is kept intact. If the head contains no space, it is returned unshortened.
    """
    text = clean_text(text)
    if len(text) <= max_chars:
        return text
    head = text[:max_chars]
    # Slicing (not indexing) avoids IndexError and yields "" past the end.
    if not text[max_chars:max_chars + 1].isspace():
        head = head.rsplit(" ", 1)[0]
    return clean_text(head)
# Review lens per reasoning variant: (short label used in the prompt and the
# metadata, emphasis sentence appended to the assistant answer).
_REASONING_VARIANT_LENSES: dict[int, tuple[str, str]] = {
    1: (
        "evidence discipline",
        "separate observed facts from inference before approving or rejecting the decision",
    ),
    2: (
        "opposing-case review",
        "state the strongest counterargument and explain which missing evidence would change the decision",
    ),
    3: (
        "risk and implementation review",
        "connect the evidence to risk budget, liquidity, valuation support, governance, and implementation controls",
    ),
}
# Lens used for any variant number not listed above.
_FALLBACK_VARIANT_LENS: tuple[str, str] = (
    "audit-trail review",
    "document the pass/fail rationale so a second reviewer can reproduce the decision",
)


def _indefinite_article(phrase: str) -> str:
    """Return "an" if *phrase* starts with a vowel letter, otherwise "a".

    This is a spelling heuristic only. For example, it yields "an" for
    "utility", which starts with a consonant sound.
    """
    return "an" if phrase[:1].lower() in ("a", "e", "i", "o", "u") else "a"


def _build_reasoning_record(
    *,
    asset_class: str,
    role: str,
    source_path: Path,
    row: dict[str, Any],
    index: int,
    variant: int = 1,
) -> dict[str, Any]:
    """Build one synthetic system/user/assistant SFT record grounded in *row*.

    The user prompt embeds the source title and a truncated excerpt of the
    row's message text. The assistant answer is a fixed critical-review
    template filled with the role lens (see ``_role_lens``) and the variant
    emphasis.

    Args:
        asset_class: Asset class name inserted into prompts and metadata.
        role: Role key; underscores become spaces in the rendered text.
        source_path: File the row came from (recorded in metadata).
        row: Source chat row whose messages supply the evidence text.
        index: 1-based position recorded as ``record_index``.
        variant: Selects the review lens (1-3); any other value uses the
            audit-trail fallback lens.

    Returns:
        A dict with ``messages`` (three chat turns) and ``metadata``. The
        metadata includes the SHA-256 of the full evidence text, the pass/fail
        label, the variant, and a ``created_at`` timestamp from ``utc_now()``.
    """
    lens = _role_lens(asset_class, role)
    text = _messages_text(row)
    title = _source_title(row, source_path)
    excerpt = _snippet(text)
    evidence_hash = _sha_text(text)
    variant_label, variant_emphasis = _REASONING_VARIANT_LENSES.get(variant, _FALLBACK_VARIANT_LENS)
    role_label = role.replace("_", " ")
    decision = lens["decision"]
    # str.capitalize() would lowercase the rest of a config-provided decision
    # (e.g. "reject the ROIC case" -> "Reject the roic case"); only uppercase
    # the first character.
    decision_sentence = decision[:1].upper() + decision[1:]
    user_prompt = (
        f"Scenario: You are reviewing {_indefinite_article(asset_class)} {asset_class} {lens['scenario']}.\n"
        f"Review lens: {variant_label}.\n"
        f"Source title: {title}\n"
        f"Grounding excerpt:\n{excerpt}\n\n"
        "Decide whether the proposed action should pass or fail. Provide analysis, red flags, "
        "a pass/fail label, and a rationale tied to the evidence."
    )
    assistant_response = (
        f"Analysis for {asset_class} {role_label}:\n"
        f"- Reported facts: the source evidence is from {title}; use it only as an evidence anchor for {lens['focus']}.\n"
        "- Numeric anchor: identify the explicit percentage, dollar amount, ratio, margin, growth rate, "
        "or valuation multiple before drawing a conclusion; if the excerpt lacks a number, state that "
        "the numeric support is missing and must be obtained before approval.\n"
        f"- Inference: {title} may support an investment thesis only after the analyst links the facts "
        "to cash flow durability, risk, valuation, governance, or downside controls.\n"
        "- Risk/tradeoff: fail the action if the source evidence is used without a clear link to cash flow, "
        "risk, valuation, governance, downside controls, or the opposing case.\n"
        f"- Decision: {decision}.\n"
        f"- Pass/fail label: {lens['pass_label']}.\n"
        f"- Rationale: {decision_sentence} because the role must preserve evidence discipline, "
        "separate observed facts from inference, and avoid approving a decision when the risk budget, "
        "valuation support, or red-flag analysis is incomplete.\n"
        f"- Variant emphasis: {variant_emphasis}.\n"
        "- Required remediation: document the evidence, state the opposing case, identify the critical "
        "failure mode, and only approve after the pass/fail reasoning is explicit. "
        "This creates a review trail that a second analyst can audit before capital is committed."
    )
    return {
        "messages": [
            {"role": "system", "content": SYSTEM_TEMPLATE.format(asset_class=asset_class, role=role_label)},
            {"role": "user", "content": user_prompt},
            {"role": "assistant", "content": assistant_response},
        ],
        "metadata": {
            "asset_class": asset_class,
            "role": role,
            "task": "grounded_critical_reasoning_sft",
            "source_path": str(source_path),
            "source_title": title,
            "source_evidence_sha256": evidence_hash,
            "synthetic": True,
            "synthetic_method": "grounded_template_reasoning_v1",
            "rubric_target": "critical_pass_reasoning",
            "pass_fail_label": lens["pass_label"],
            "reasoning_variant": variant,
            "reasoning_variant_focus": variant_label,
            "created_at": utc_now(),
            "record_index": index,
        },
    }
def generate_grounded_reasoning_examples(
    *,
    asset_class: str,
    role: str,
    source: Path | None = None,
    output_path: Path | None = None,
    max_records: int = 120,
    policy_path: Path | None = None,
) -> dict[str, Any]:
    """Generate certified synthetic reasoning rows from existing JSONL sources.

    Each non-empty source row yields up to three variants (see
    ``_build_reasoning_record``). Every candidate is passed through
    ``certify_normalized_source_content``. Only candidates whose certification
    reports ``training_eligible`` are kept, and generation stops at
    *max_records* kept rows.

    Args:
        asset_class: Asset class of the sources and the generated rows.
        role: Role of the sources and the generated rows.
        source: A JSONL file or a directory to scan. Defaults to
            ``REPO_ROOT/data/learning/<asset_class>/<role>``.
        output_path: Target JSONL. Defaults to
            ``synthetic_<asset_class>_<role>_critical_reasoning.hf_finetune.jsonl``
            in that default directory.
        max_records: Upper bound on kept rows.
        policy_path: Optional policy file passed to ``load_policy``.

    Returns:
        The manifest dict, with ``manifest_path`` added. The same manifest
        (without ``manifest_path``) is written to
        ``<output>.manifest.json``. The output JSONL is only written when at
        least one row was kept.

    Raises:
        FileNotFoundError: If the source path does not exist.
    """
    source_root = source or (REPO_ROOT / "data" / "learning" / asset_class / role)
    if not source_root.exists():
        raise FileNotFoundError(f"source not found: {source_root}")
    output = output_path or (
        REPO_ROOT
        / "data"
        / "learning"
        / asset_class
        / role
        / f"synthetic_{asset_class}_{role}_critical_reasoning.hf_finetune.jsonl"
    )
    policy = load_policy(policy_path)
    rows: list[dict[str, Any]] = []
    rejected: list[dict[str, Any]] = []
    source_paths = _iter_jsonl_paths(source_root)
    variants_per_source_row = 3
    for source_path in source_paths:
        for source_row in _read_jsonl_rows(source_path):
            if len(rows) >= max_records:
                break
            if not _messages_text(source_row):
                continue
            for variant in range(1, variants_per_source_row + 1):
                if len(rows) >= max_records:
                    break
                candidate = _build_reasoning_record(
                    asset_class=asset_class,
                    role=role,
                    source_path=source_path,
                    row=source_row,
                    index=len(rows) + 1,
                    variant=variant,
                )
                assistant_text = candidate["messages"][-1]["content"]
                certification = certify_normalized_source_content(
                    asset_class=asset_class,
                    role=role,
                    title=str(candidate["metadata"]["source_title"]),
                    url=str(source_path),
                    source_type="jsonl",
                    text=assistant_text,
                    quality_errors=["critical_pass_absolute: synthetic reasoning generation requested"],
                    policy=policy,
                )
                candidate["metadata"]["content_ai_certification"] = certification
                if certification.get("training_eligible"):
                    rows.append(candidate)
                else:
                    rejected.append(
                        {
                            "source_path": str(source_path),
                            "source_title": candidate["metadata"]["source_title"],
                            "variant": variant,
                            "certification": certification,
                        }
                    )
        if len(rows) >= max_records:
            break
    # Create the output directory *before* writing the JSONL. It previously
    # happened afterwards, so a fresh output_path (or the default path when
    # `source` points elsewhere) failed on write. The sibling
    # intake-records function already does it in this order.
    output.parent.mkdir(parents=True, exist_ok=True)
    if rows:
        write_jsonl(output, rows)
    manifest = {
        "schema_version": "grounded_reasoning_generation_manifest_v1",
        "asset_class": asset_class,
        "role": role,
        "source": str(source_root),
        "output_path": str(output),
        "source_file_count": len(source_paths),
        "generated_count": len(rows),
        "rejected_count": len(rejected),
        "max_records": max_records,
        "variants_per_source_row": variants_per_source_row,
        "certifier": "source_content_ai_certification_v1",
        "status": "completed" if rows else "blocked_no_certified_reasoning_examples",
        "created_at": utc_now(),
        # Only the first 25 rejections are kept to bound the manifest size.
        "rejections": rejected[:25],
        "ok": bool(rows),
    }
    manifest_path = output.with_suffix(output.suffix + ".manifest.json")
    manifest_path.write_text(json.dumps(manifest, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    manifest["manifest_path"] = str(manifest_path)
    return manifest
def generate_grounded_reasoning_examples_from_intake_records(
    *,
    asset_class: str,
    role: str,
    records: list[dict[str, Any]],
    output_path: Path,
    max_records: int = 120,
    policy_path: Path | None = None,
) -> dict[str, Any]:
    """Generate trainable reasoning rows from normalized breakout intake.

    Stall breakout often finds public sources that are suitable for
    verification but not direct training. This function keeps the raw source
    policy gate intact: it only uses normalized records that were downloaded
    and accepted for either training or verification, then certifies the
    generated reasoning rows before writing a training JSONL.

    A record is skipped, rather than aborting the batch, when any of these
    holds:

    - it is not downloaded;
    - it is allowed for neither training nor verification;
    - it lacks a dict ``normalized`` entry with a ``path``;
    - that file is missing, unreadable, not UTF-8, not JSON, or not a JSON
      object;
    - its cleaned ``text`` is empty.

    Args:
        asset_class: Asset class of the generated rows.
        role: Role of the generated rows.
        records: Intake records (see the field names read below).
        output_path: Target JSONL, written only if some row is certified.
        max_records: Upper bound on kept rows.
        policy_path: Optional policy file passed to ``load_policy``.

    Returns:
        The manifest dict, with ``manifest_path`` added. The manifest is also
        written to ``<output_path>.manifest.json``.
    """
    policy = load_policy(policy_path)
    rows: list[dict[str, Any]] = []
    rejected: list[dict[str, Any]] = []
    source_paths: list[str] = []
    variants_per_source_row = 3
    for record in records:
        if len(rows) >= max_records:
            break
        if not record.get("downloaded"):
            continue
        if not (record.get("train_allowed") or record.get("verification_allowed")):
            continue
        normalized_info = record.get("normalized")
        # A non-dict "normalized" value (e.g. a bare path string) used to raise
        # AttributeError and abort the whole batch.
        if not isinstance(normalized_info, dict):
            continue
        normalized_path_text = normalized_info.get("path")
        if not normalized_path_text:
            continue
        normalized_path = Path(str(normalized_path_text))
        if not normalized_path.exists():
            continue
        try:
            normalized = json.loads(normalized_path.read_text(encoding="utf-8-sig"))
        except (OSError, UnicodeDecodeError, json.JSONDecodeError):
            continue
        # The normalized file must be a JSON object; `.get` below needs a dict.
        if not isinstance(normalized, dict):
            continue
        text = clean_text(str(normalized.get("text") or ""))
        if not text:
            continue
        source_paths.append(str(normalized_path))
        source_row = {
            "messages": [{"role": "assistant", "content": text}],
            "metadata": {
                "source_title": record.get("title") or normalized.get("source_title") or normalized_path.stem,
                "source_normalized": normalized_path.name,
                "source_url": record.get("url") or normalized.get("source_url"),
            },
        }
        for variant in range(1, variants_per_source_row + 1):
            if len(rows) >= max_records:
                break
            candidate = _build_reasoning_record(
                asset_class=asset_class,
                role=role,
                source_path=normalized_path,
                row=source_row,
                index=len(rows) + 1,
                variant=variant,
            )
            metadata = candidate.setdefault("metadata", {})
            metadata["synthetic_method"] = "breakout_grounded_template_reasoning_v1"
            metadata["source_url"] = record.get("url") or normalized.get("source_url")
            metadata["source_decision"] = record.get("decision")
            metadata["source_verification_allowed"] = bool(record.get("verification_allowed"))
            metadata["source_train_allowed"] = bool(record.get("train_allowed"))
            assistant_text = candidate["messages"][-1]["content"]
            certification = certify_normalized_source_content(
                asset_class=asset_class,
                role=role,
                title=str(metadata["source_title"]),
                url=str(record.get("url") or normalized_path),
                source_type="jsonl",
                text=assistant_text,
                quality_errors=["critical_pass_absolute: breakout reasoning generation requested"],
                policy=policy,
            )
            metadata["content_ai_certification"] = certification
            if certification.get("training_eligible"):
                rows.append(candidate)
            else:
                rejected.append(
                    {
                        "source_path": str(normalized_path),
                        "source_title": metadata["source_title"],
                        "variant": variant,
                        "certification": certification,
                    }
                )
    # The directory is needed for the manifest even when no rows were kept.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    if rows:
        write_jsonl(output_path, rows)
    manifest = {
        "schema_version": "breakout_grounded_reasoning_generation_manifest_v1",
        "asset_class": asset_class,
        "role": role,
        "source_record_count": len(records),
        "normalized_source_count": len(set(source_paths)),
        "output_path": str(output_path),
        "generated_count": len(rows),
        "rejected_count": len(rejected),
        "max_records": max_records,
        "variants_per_source_row": variants_per_source_row,
        "certifier": "source_content_ai_certification_v1",
        "status": "completed" if rows else "blocked_no_certified_reasoning_examples",
        "created_at": utc_now(),
        # Only the first 25 rejections are kept to bound the manifest size.
        "rejections": rejected[:25],
        "ok": bool(rows),
    }
    manifest_path = output_path.with_suffix(output_path.suffix + ".manifest.json")
    manifest_path.write_text(json.dumps(manifest, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    manifest["manifest_path"] = str(manifest_path)
    return manifest

Xet Storage Details

Size:
17.6 kB
·
Xet hash:
56a9ffa0d9b0c21418cfb3811194af05e85b5de55417f5cfd364695bb867ce46

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.