Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-colab-handoff /bundle /evaluation /pure_oracle_kernel.py
| """PureOracle kernel: deterministic tool/retrieval/logic/grounding stack. | |
| The kernel is deliberately evidence-first. It can make a tiny model behave more | |
| capably by routing questions through exact retrieval, symbolic logic, and | |
| hash-backed grounding before any free-form answer is allowed. | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| import re | |
| from typing import Callable | |
| from data.universal_context import STOPWORDS, UniversalContextLedger | |
| from model.logic_core import TinyLogicCore | |
# Tokenizer pattern: a token is a run of word characters or of any code point
# in U+0E00-U+0E7F (the Thai block). The explicit range is presumably there so
# Thai combining vowel/tone marks, which ``\w`` does not match, stay inside the
# surrounding token -- TODO confirm. ``re.UNICODE`` is already the default for
# str patterns and is kept only for explicitness.
TOKEN_RE = re.compile(r"[\w\u0E00-\u0E7F]+", re.UNICODE)
# A word-bounded run of 12 to 64 hex digits; used to spot full or abbreviated
# SHA digests quoted inside a query.
SHA_RE = re.compile(r"\b[a-fA-F0-9]{12,64}\b")
# Ledger stopwords extended with extra English and Thai function words that
# should not count as evidence of topical overlap.
ORACLE_STOPWORDS = STOPWORDS | {
    "a", "an", "as", "at", "be", "by", "can", "do", "does", "for", "from", "has", "have",
    "in", "into", "it", "of", "on", "only", "should", "through", "to", "uses", "using",
    "with", "without",
    "กับ", "การ", "ของ", "ความ", "จาก", "จะ", "ด้วย", "ต้อง", "ตอบ", "ทำ", "ที่", "นั้น",
    "นี้", "เป็น", "แบบ", "และ", "ว่า", "อย่าง", "ให้", "ได้", "ใน", "ไร้รูปแบบ",
}
def _sha256_text(text: str) -> str:
    """Return the lowercase hex SHA-256 digest of *text* encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    return hasher.hexdigest()
def _tokens(text: str) -> set[str]:
    """Return the set of lowercased content tokens found in *text*.

    A token is any ``TOKEN_RE`` match. Matches shorter than two characters
    and matches whose lowercase form is in ``ORACLE_STOPWORDS`` are dropped.
    """
    kept: set[str] = set()
    for raw in TOKEN_RE.findall(text):
        # The length filter applies to the token as matched, before lowering.
        if len(raw) < 2:
            continue
        lowered = raw.lower()
        if lowered in ORACLE_STOPWORDS:
            continue
        kept.add(lowered)
    return kept
def _shingles(text: str, n: int = 3) -> set[str]:
    """Return the set of character *n*-grams of *text*.

    The text is lowercased and reduced to its ``TOKEN_RE`` matches joined
    without separators, so whitespace and punctuation never appear in a
    shingle. Input shorter than *n* yields a single shingle holding the whole
    compacted string, or an empty set when nothing is left.
    """
    squeezed = "".join(TOKEN_RE.findall(text.lower()))
    size = len(squeezed)
    if size >= n:
        # Slide a window of width n over every start offset.
        return {squeezed[start : start + n] for start in range(size - n + 1)}
    if squeezed:
        return {squeezed}
    return set()
@dataclass
class OracleTool:
    """Descriptor for one callable tool registered with the kernel.

    The class has to be a dataclass: instances are created with keyword
    arguments (``OracleTool(name=..., purpose=..., ...)``), which relies on
    the generated ``__init__``. Without the decorator the annotations below
    are bare declarations and construction raises ``TypeError``.
    """

    name: str  # identifier reported by spec()
    purpose: str  # human-readable description reported by spec()
    input_schema: dict  # description of the payload the tool accepts
    output_schema: dict  # description of the dict the tool returns
    deterministic: bool  # flag reported by spec()
    fn: Callable[[dict], dict]  # implementation: payload dict in, result dict out

    def spec(self) -> dict:
        """Return the tool's descriptive fields as a plain dict.

        The callable ``fn`` is deliberately left out, so the result contains
        only the descriptive metadata.
        """
        return {
            "name": self.name,
            "purpose": self.purpose,
            "input_schema": self.input_schema,
            "output_schema": self.output_schema,
            "deterministic": self.deterministic,
        }
class PureRetrievalEngine:
    """Hybrid exact/lexical/shingle retrieval over the Evidence Ledger."""

    def __init__(self, ledger_dir: str | Path):
        """Open the ledger stored under *ledger_dir*."""
        self.ledger = UniversalContextLedger(ledger_dir)

    def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        """Score every ledger chunk against *query* and return the best hits.

        A chunk's score is the larger of a hash score (1.0 when a hex run
        from the query occurs inside the chunk's sha256) and a weighted blend
        of term overlap (0.58), character-shingle overlap (0.32) and a
        whole-query phrase match (0.10). The blend is multiplied by 0.82 for
        chunks whose path contains a ``.waylog`` directory. Chunks scoring
        zero are dropped.

        Returns at most ``max(1, int(top_k))`` hit dicts, best first.
        """
        corpus = self.ledger._chunks()
        wanted_terms = _tokens(query)
        wanted_shingles = _shingles(query)
        wanted_hashes = {found.lower() for found in SHA_RE.findall(query)}
        needle = query.strip().lower()

        # Query-side invariants, computed once instead of per chunk.
        term_denominator = max(len(wanted_terms), 1)
        shingle_denominator = max(len(wanted_shingles), 1)
        phrase_eligible = len(needle) >= 8

        ranked = []
        for chunk in corpus:
            body = chunk["text"]
            body_lower = body.lower()
            shared_terms = wanted_terms & _tokens(body)
            shared_shingles = wanted_shingles & _shingles(body)

            # Substring containment also covers the prefix case. The sha256
            # field is only read when the query actually carries a hex run.
            if any(digest in str(chunk["sha256"]).lower() for digest in wanted_hashes):
                hash_score = 1.0
            else:
                hash_score = 0.0

            term_score = len(shared_terms) / term_denominator
            shingle_score = len(shared_shingles) / shingle_denominator
            if phrase_eligible and needle in body_lower:
                phrase_score = 1.0
            else:
                phrase_score = 0.0

            rel_lower = str(chunk["rel_path"]).lower()
            if "\\.waylog\\" in rel_lower or "/.waylog/" in rel_lower:
                source_multiplier = 0.82
            else:
                source_multiplier = 1.0

            score = max(
                hash_score,
                source_multiplier * (0.58 * term_score + 0.32 * shingle_score + 0.10 * phrase_score),
            )
            if score <= 0:
                continue

            rounded = round(float(score), 6)
            # The evidence hash binds chunk, query and score together.
            fingerprint = _sha256_text(
                json.dumps(
                    {"chunk_sha256": chunk["sha256"], "query": query, "score": rounded},
                    ensure_ascii=False,
                    sort_keys=True,
                )
            )
            ranked.append(
                {
                    "score": rounded,
                    "path": chunk["rel_path"],
                    "chunk_sha256": chunk["sha256"],
                    "chunk_id": chunk["chunk_id"],
                    "start_char": chunk["start_char"],
                    "end_char": chunk["end_char"],
                    "matched_terms": sorted(shared_terms),
                    "matched_shingles": len(shared_shingles),
                    "preview": body[:360],
                    "evidence_hash": fingerprint,
                }
            )

        def sort_key(row: dict) -> tuple:
            return (row["score"], len(row["matched_terms"]), row["matched_shingles"])

        ordered = sorted(ranked, key=sort_key, reverse=True)
        limit = max(1, int(top_k))
        return ordered[:limit]
class PureOracleKernel:
    """Routes questions through self-built tools and blocks unsupported answers."""

    def __init__(self, ledger_dir: str | Path):
        """Create the retrieval engine, the logic core and the tool registry."""
        self.ledger_dir = Path(ledger_dir)
        self.retrieval = PureRetrievalEngine(self.ledger_dir)
        self.logic = TinyLogicCore()
        self.tools = self._build_tools()

    def _build_tools(self) -> dict[str, OracleTool]:
        """Return the tool registry, keyed by tool name, in routing order."""

        def run_logic(payload: dict) -> dict:
            return self.logic.solve(str(payload.get("question", "")))

        def run_retrieval(payload: dict) -> dict:
            found = self.retrieval.retrieve(
                str(payload.get("query", "")),
                top_k=int(payload.get("top_k", 5)),
            )
            return {"hits": found}

        def run_gate(payload: dict) -> dict:
            return self._grounding_gate(
                str(payload.get("question", "")),
                list(payload.get("evidence", [])),
            )

        catalogue = (
            OracleTool(
                name="logic_prover",
                purpose="Solve deterministic formal reasoning patterns and return proof steps.",
                input_schema={"question": "str"},
                output_schema={"ok": "bool", "answer": "str", "proof_steps": "list[str]"},
                deterministic=True,
                fn=run_logic,
            ),
            OracleTool(
                name="evidence_retriever",
                purpose="Retrieve exact ledger chunks by hash, lexical overlap, phrase, and character-shingle resonance.",
                input_schema={"query": "str", "top_k": "int"},
                output_schema={"hits": "list[evidence_chunk]"},
                deterministic=True,
                fn=run_retrieval,
            ),
            OracleTool(
                name="grounding_gate",
                purpose="Allow answers only when source chunks and query support are strong enough.",
                input_schema={"question": "str", "evidence": "list[evidence_chunk]"},
                output_schema={"passed": "bool", "reason": "str", "support": "dict"},
                deterministic=True,
                fn=run_gate,
            ),
        )
        registry = {}
        for tool in catalogue:
            registry[tool.name] = tool
        return registry

    def tool_specs(self) -> list[dict]:
        """Return the ``spec()`` of every registered tool, in registry order."""
        specs = []
        for tool in self.tools.values():
            specs.append(tool.spec())
        return specs

    def answer(self, question: str, top_k: int = 5) -> dict:
        """Answer *question* and return the packaged response dict.

        The logic prover is tried first; when it reports ``ok`` its answer is
        returned without consulting the ledger. Otherwise evidence is
        retrieved and passed through the grounding gate, and the response is
        marked ``grounded`` or ``insufficient_evidence`` accordingly.
        """
        logic_result = self.tools["logic_prover"].fn({"question": question})
        if logic_result.get("ok"):
            proved = str(logic_result["answer"])
            proof_grounding = {
                "passed": True,
                "reason": "deterministic_logic_proof",
                "support": {"proof_steps": len(logic_result.get("proof_steps", []))},
            }
            return self._package(
                question=question,
                route="logic_prover",
                status="grounded",
                answer=proved,
                evidence=[],
                logic=logic_result,
                grounding=proof_grounding,
            )

        evidence = self.tools["evidence_retriever"].fn({"query": question, "top_k": top_k})["hits"]
        grounding = self.tools["grounding_gate"].fn({"question": question, "evidence": evidence})

        if grounding["passed"]:
            status = "grounded"
            supported = grounding["support"]["supported_terms"]
            keyword_list = ", ".join(supported[:10])
            text = (
                "คำตอบต้องยึดหลักฐานที่แนบเท่านั้น: "
                + f"พบ source chunks {len(evidence)} จุด และคำสำคัญที่รองรับคือ {keyword_list}. "
                + "ใช้ hash ของแต่ละ chunk เพื่อตรวจย้อนกลับก่อนนำไปสรุปต่อ"
            )
        else:
            status = "insufficient_evidence"
            text = "ค้นแล้วแต่หลักฐานยังไม่พอสำหรับคำตอบที่ควรเชื่อถือได้"

        return self._package(
            question=question,
            route="retrieval_grounding",
            status=status,
            answer=text,
            evidence=evidence,
            logic=logic_result,
            grounding=grounding,
        )

    def _grounding_gate(self, question: str, evidence: list[dict]) -> dict:
        """Decide whether *evidence* supports *question* well enough.

        A hit counts as verified when it carries both ``chunk_sha256`` and
        ``evidence_hash``. The gate passes only when there is evidence, every
        hit is verified, and either at least 34% of the query terms are
        matched or the average retrieval score is at least 0.55.
        """
        q_terms = _tokens(question)
        backed_terms: set = set()
        verified = 0
        # Accumulated one hit at a time so the float total is formed in
        # evidence order.
        score_mass = 0.0
        for hit in evidence:
            if not (hit.get("chunk_sha256") and hit.get("evidence_hash")):
                continue
            verified += 1
            backed_terms.update(hit.get("matched_terms", []))
            score_mass += float(hit.get("score", 0.0))

        total = len(evidence)
        matched = q_terms & backed_terms
        support_ratio = len(matched) / max(len(q_terms), 1)
        # Unverified hits still count in the denominator, lowering the average.
        avg_score = score_mass / max(total, 1)

        strong_enough = support_ratio >= 0.34 or avg_score >= 0.55
        passed = bool(evidence) and verified == total and strong_enough
        if passed:
            reason = "source_hashes_and_query_terms_verified"
        else:
            reason = "insufficient_source_support"

        support = {
            "query_terms": sorted(q_terms),
            "supported_terms": sorted(matched),
            "support_ratio": round(support_ratio, 6),
            "avg_retrieval_score": round(avg_score, 6),
            "verified_evidence": verified,
            "evidence_count": total,
        }
        return {"passed": passed, "reason": reason, "support": support}

    def _package(
        self,
        question: str,
        route: str,
        status: str,
        answer: str,
        evidence: list[dict],
        logic: dict,
        grounding: dict,
    ) -> dict:
        """Assemble the response envelope and seal it with a SHA-256 digest.

        The digest covers every other field, including the ``created_at``
        timestamp, and is stored under ``response_sha256``.
        """
        stamp = datetime.now(timezone.utc).isoformat()
        claim = {
            "top_world_claim_allowed": False,
            "reason": "This is local deterministic infrastructure evidence, not external official ranking.",
        }
        envelope = {
            "schema_version": "tinymind-pure-oracle-kernel-v1",
            "created_at": stamp,
            "question": question,
            "route": route,
            "status": status,
            "answer": answer,
            "evidence": evidence,
            "logic": logic,
            "grounding_gate": grounding,
            "tool_specs": self.tool_specs(),
            "quality_claim": claim,
        }
        # Serialise before the digest key exists so the hash excludes itself.
        canonical = json.dumps(envelope, ensure_ascii=False, sort_keys=True)
        envelope["response_sha256"] = _sha256_text(canonical)
        return envelope
def write_pure_oracle_answer(
    ledger_dir: str | Path,
    question: str,
    out_path: str | Path,
    top_k: int = 5,
) -> dict:
    """Answer *question* against the ledger and save the response as JSON.

    Missing parent directories of *out_path* are created. The file is written
    as UTF-8, indented, with sorted keys. The returned dict is the response
    with an extra ``out_path`` entry that is added after the file is written,
    so it is not part of the saved JSON.
    """
    kernel = PureOracleKernel(ledger_dir)
    result = kernel.answer(question, top_k=top_k)

    destination = Path(out_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(result, ensure_ascii=False, indent=2, sort_keys=True)
    destination.write_text(serialized, encoding="utf-8")

    result["out_path"] = str(destination)
    return result
Xet Storage Details
- Size:
- 11.9 kB
- Xet hash:
- 2ab4b24c34e4b4cbcf9a4e395b573f5300b481c982f9f6861a3dcf1f4ba7d5a4
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.