bbkdevops's picture
download
raw
6.49 kB
"""Grounded answer guard for anti-amnesia / anti-hallucination behavior."""
from __future__ import annotations
import json
import re
from pathlib import Path
from data.external_research import ExternalResearcher
from data.universal_context import STOPWORDS, UniversalContextLedger
def _terms(text: str) -> set[str]:
    """Return the set of lower-cased content terms found in *text*.

    A token is a maximal run of word characters or code points in the
    U+0E00-U+0E7F range. Tokens shorter than two characters are ignored, as
    are tokens whose lower-cased form appears in ``STOPWORDS``.
    """
    collected: set[str] = set()
    for raw in re.findall(r"[\w\u0E00-\u0E7F]+", text):
        # The length filter applies to the token as matched, before lowering.
        if len(raw) < 2:
            continue
        lowered = raw.lower()
        if lowered in STOPWORDS:
            continue
        collected.add(lowered)
    return collected
def _insufficient(question: str, reason: str, evidence: list | None = None) -> dict:
    """Build the standard "insufficient evidence" response for *question*.

    Args:
        question: The question being answered; echoed back in the result.
        reason: Machine-readable code stored under ``hallucination_gate.reason``.
        evidence: Optional evidence entries to attach. A missing or empty
            value results in a fresh empty list.

    Returns:
        A result dict with ``status`` set to ``"insufficient_evidence"``, a
        fixed refusal message as ``answer``, and a hallucination gate that is
        always reported as passed.
    """
    payload: dict = {}
    payload["schema_version"] = "tinymind-grounded-answer-v1"
    payload["question"] = question
    payload["status"] = "insufficient_evidence"
    payload["answer"] = "ค้นหาและตรวจหลักฐานแล้ว แต่ยังไม่พบหลักฐานที่ยืนยันได้เพียงพอ จึงไม่ควรสรุปเป็นข้อเท็จจริง"
    # A falsy evidence argument (None or an empty list) becomes a new list.
    payload["evidence"] = evidence if evidence else []
    # NOTE(review): the gate is marked as passed even though nothing was
    # answered - presumably because refusing is the safe outcome; confirm.
    payload["hallucination_gate"] = {"passed": True, "reason": reason}
    return payload
def build_grounded_answer(
    question: str,
    ledger_dir: str | Path,
    top_k: int = 3,
    external_research: str = "when_missing",
    research_dir: str | Path | None = None,
    researcher: ExternalResearcher | None = None,
) -> dict:
    """Answer *question* only from retrievable evidence, refusing otherwise.

    The ledger at *ledger_dir* is queried first (``min_score=0.35``). The
    answer is "grounded" only when at least one term of the question also
    appears among the matched terms of the retrieved chunks.

    Args:
        question: The user question.
        ledger_dir: Directory passed to ``UniversalContextLedger``.
        top_k: Maximum number of ledger hits to request; also forwarded to
            the external research step.
        external_research: ``"when_missing"`` runs external research when the
            ledger yields no hits or no supported query term; ``"always"``
            additionally runs it up front even when the ledger has hits and
            prefers its result if grounded. Any other value disables
            external research.
        research_dir: Where external research output goes. Defaults to
            ``<ledger_dir>/external_research``.
        researcher: Optional researcher instance forwarded to the external
            research step.

    Returns:
        A result dict whose ``status`` is ``"grounded"`` or
        ``"insufficient_evidence"``, with the supporting ``evidence`` and a
        ``hallucination_gate`` summary.
    """
    research_enabled = external_research in {"when_missing", "always"}

    def _run_research() -> dict:
        # Resolved lazily so the default directory is only computed when
        # research actually runs.
        target = research_dir or Path(ledger_dir) / "external_research"
        return _answer_from_external_research(question, target, top_k, researcher)

    ledger = UniversalContextLedger(ledger_dir)
    hits = ledger.query(question, top_k=top_k, min_score=0.35)
    if not hits:
        if research_enabled:
            return _run_research()
        return _insufficient(question, "refused_without_evidence")

    # Outcome of an external research attempt already made during this call,
    # kept so the low-support fallback below never repeats the same research.
    researched = None
    if external_research == "always":
        researched = _run_research()
        if researched["status"] == "grounded":
            return researched

    matched_terms: set = set()
    evidence = []
    for hit in hits:
        hit_terms = hit.get("matched_terms", [])
        matched_terms.update(hit_terms)
        evidence.append(
            {
                "path": hit["path"],
                "chunk_sha256": hit["chunk_sha256"],
                "start_char": hit["start_char"],
                "end_char": hit["end_char"],
                "score": hit["score"],
                "matched_terms": hit_terms,
                "preview": hit["preview"],
            }
        )

    supported_terms = sorted(_terms(question) & matched_terms)
    if not supported_terms:
        if research_enabled:
            if researched is None:
                researched = _run_research()
            if researched["status"] == "grounded":
                return researched
        result = _insufficient(question, "refused_low_support", evidence)
        result["supported_query_terms"] = []
        return result

    # supported_terms is non-empty here, so the joined list is never blank.
    answer = (
        "จากหลักฐานที่ดึงได้ คำตอบควรยึดตาม source chunk ที่แนบไว้เท่านั้น "
        f"พบคำสำคัญที่เชื่อมกับหลักฐาน: {', '.join(supported_terms[:8])}"
    )
    return {
        "schema_version": "tinymind-grounded-answer-v1",
        "question": question,
        "status": "grounded",
        "answer": answer,
        "evidence": evidence,
        "supported_query_terms": supported_terms,
        "hallucination_gate": {
            "passed": bool(evidence),
            "reason": "answer_has_source_chunks_and_hashes",
            "evidence_count": len(evidence),
        },
    }
def _answer_from_external_research(
    question: str,
    research_dir: str | Path,
    top_k: int,
    researcher: ExternalResearcher | None,
) -> dict:
    """Build an answer for *question* from external research sources.

    Args:
        question: The user question.
        research_dir: Directory handed to ``researcher.research``.
        top_k: Maximum number of sources attached as evidence. The researcher
            itself is always asked for at least three results.
        researcher: Researcher to use; a default ``ExternalResearcher`` is
            created when this is falsy.

    Returns:
        A ``"grounded"`` result with one evidence entry per selected source,
        or an ``"insufficient_evidence"`` result when the report has no
        sources or when no source ends up selected as evidence.
    """
    researcher = researcher or ExternalResearcher()
    report = researcher.research(question, research_dir, max_results=max(3, top_k))
    sources = report.get("sources", [])
    if not sources:
        result = _insufficient(question, "external_research_found_no_verified_sources")
        result["external_research"] = {"report_path": report.get("report_path"), "source_count": 0}
        return result

    selected = sources[:top_k]
    if not selected:
        # A non-positive top_k (or a negative one that trims every source)
        # selects nothing; never report "grounded" without attached evidence.
        result = _insufficient(question, "external_research_selected_no_evidence")
        result["external_research"] = {
            "report_path": report.get("report_path"),
            "source_count": len(sources),
        }
        return result

    evidence = []
    supported: set = set()
    for source in selected:
        source_terms = source.get("matched_terms", [])
        supported.update(source_terms)
        evidence.append(
            {
                "path": source["url"],
                "source_url": source["url"],
                "chunk_sha256": source["sha256"],
                "score": source["score"],
                "matched_terms": source_terms,
                # Only the first 300 characters of the text are kept as preview.
                "preview": source["text"][:300],
            }
        )

    supported_sorted = sorted(supported)
    answer = (
        "ค้นหาภายนอกแล้วพบหลักฐานที่ตรวจ hash ได้ คำตอบควรยึดตามแหล่งอ้างอิงที่แนบไว้เท่านั้น "
        f"คำสำคัญที่รองรับ: {', '.join(supported_sorted[:8])}"
    )
    return {
        "schema_version": "tinymind-grounded-answer-v1",
        "question": question,
        "status": "grounded",
        "answer": answer,
        "evidence": evidence,
        "supported_query_terms": supported_sorted,
        "external_research": {
            "report_path": report.get("report_path"),
            "source_count": len(sources),
            "policy": report.get("policy"),
        },
        "hallucination_gate": {
            "passed": True,
            "reason": "external_research_sources_hashed_and_attached",
            "evidence_count": len(evidence),
        },
    }
def write_grounded_answer(
    question: str,
    ledger_dir: str | Path,
    out_path: str | Path,
    top_k: int = 3,
    external_research: str = "when_missing",
    research_dir: str | Path | None = None,
    researcher: ExternalResearcher | None = None,
) -> dict:
    """Build a grounded answer and persist it as a JSON file.

    Args:
        question: The user question.
        ledger_dir: Ledger directory forwarded to ``build_grounded_answer``.
        out_path: Destination file; missing parent directories are created.
        top_k: Forwarded to ``build_grounded_answer``.
        external_research: Forwarded to ``build_grounded_answer``.
        research_dir: Forwarded to ``build_grounded_answer``.
        researcher: Optional researcher forwarded to ``build_grounded_answer``,
            mirroring that function's signature. Defaults to ``None``.

    Returns:
        The result dict from ``build_grounded_answer`` with an extra
        ``out_path`` key. That key is added after the file is written, so it
        is not part of the JSON on disk.
    """
    result = build_grounded_answer(
        question,
        ledger_dir,
        top_k=top_k,
        external_research=external_research,
        research_dir=research_dir,
        researcher=researcher,
    )
    destination = Path(out_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    # ensure_ascii=False keeps non-ASCII answer text readable in the file.
    serialized = json.dumps(result, ensure_ascii=False, indent=2, sort_keys=True)
    destination.write_text(serialized, encoding="utf-8")
    result["out_path"] = str(destination)
    return result

Xet Storage Details

Size:
6.49 kB
·
Xet hash:
5069e13013832a87bb2eec927924a8b40b6aa8d1ccb6a7d86dd732936e780385

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.