| from __future__ import annotations
|
|
|
| import json
|
| import os
|
| from dataclasses import dataclass
|
| from typing import Dict, List, Tuple, Optional
|
| import numpy as np
|
| from pathlib import Path
|
|
|
# NOTE(review): BASE_DIR is the resolved *file* path, not a directory;
# DATA_DIR therefore points at "<directory containing this file>/data".
# The name is kept as-is for backward compatibility — confirm no caller
# treats BASE_DIR as a directory before renaming.
BASE_DIR = Path(__file__).resolve()
DATA_DIR = BASE_DIR.parent / "data"
|
|
|
|
|
|
|
@dataclass
class PipeOwlConfig:
    """
    Global configuration for the PipeOwl engine.

    embeddings_path:
        base embedding matrix of the semantic field, shape (V, D)
        V = vocabulary size
        D = embedding dimension

    delta_scalar_path:
        one scalar field offset per token, shape (V,)
        used as a score offset (currently a static bias)

    vocab_path:
        vocab list; must be exactly index-aligned with the embeddings:
        index i <-> emb[i] <-> delta[i]

    alpha:
        weight of the base similarity term

    beta:
        weight of the delta term (currently a logit bias, not a
        dynamic loss)

    top_k:
        default number of retrieval results

    temperature:
        sampling temperature for the decode stage

    max_new_tokens:
        maximum generation length for decode
    """
    embeddings_path: str = str(DATA_DIR / "L1_base_embeddings.npy")
    delta_scalar_path: str = str(DATA_DIR / "delta_base_scalar.npy")
    vocab_path: str = str(DATA_DIR / "L1_base_vocab.json")

    alpha: float = 1.0
    beta: float = 1.0

    top_k: int = 16

    temperature: float = 0.8
    max_new_tokens: int = 64
|
|
|
|
|
class VocabTokenizer:
    """
    Greedy longest-match tokenizer over a fixed vocabulary.

    Goal:
        split input text into tokens that exist in the vocab.

    Method:
        - longest-match-first scanning from the left
        - characters that start no known token (OOV) are skipped

    Known limitations:
        - OOV characters are silently dropped (possible semantic loss)
        - no subword fallback
        - time complexity O(n * max_token_len)

    Intended use:
        vocab is character / word level and already index-aligned with
        the embedding matrix.
    """

    def __init__(self, vocab_list):
        self.vocab_set = set(vocab_list)
        # default=0 keeps an empty vocab from raising ValueError
        # ("max() arg is an empty sequence"); tokenize() then simply
        # returns [] for any input.
        self.max_len = max((len(t) for t in vocab_list), default=0)

    def tokenize(self, text: str) -> List[str]:
        """Return the greedy longest-match token list for *text*."""
        tokens: List[str] = []
        i = 0
        n = len(text)

        while i < n:
            matched = False
            # Try the widest window first; the bound min(max_len, n - i)
            # guarantees the slice never reads past the end of text.
            for width in range(min(self.max_len, n - i), 0, -1):
                piece = text[i:i + width]
                if piece in self.vocab_set:
                    tokens.append(piece)
                    i += width
                    matched = True
                    break
            if not matched:
                # OOV character: skip it.
                i += 1
        return tokens
|
|
|
class PipeOwlEngine:
    """
    PipeOwl geometric-semantics engine core.

    Design philosophy: the vocabulary index is a coordinate in a semantic
    field, with three parallel arrays aligned on that index:

        emb[i]   -> embedding vector of token i           (V, D)
        delta[i] -> static scalar field offset of token i (V,)
        vocab[i] -> the token string itself               (V,)

    Core pipeline:

        text -> tokenize -> mean embedding
             -> score = alpha * base + beta * delta
             -> top-k -> decode

    In short: a field-based retrieval language system.
    """

    def __init__(self, cfg: PipeOwlConfig):
        self.cfg = cfg

        # Populated by _load_assets(); declared here for discoverability.
        self.emb: Optional[np.ndarray] = None      # (V, D) float32
        self.delta: Optional[np.ndarray] = None    # (V,)  float32
        self.token_to_id: Dict[str, int] = {}
        self.id_to_token: List[str] = []

        # NOTE: MicroGPTDecoder is defined later in this module; that is
        # fine because the name resolves when __init__ runs, not when the
        # class body is compiled.
        self.decoder = MicroGPTDecoder()

        self._load_assets()

    def _load_assets(self) -> None:
        """
        Load the semantic-field assets from disk.

        Loads:
            1. embeddings   (V, D) -> float32
            2. delta scalar (V,)   -> float32
            3. vocab list   (V,)

        Key invariant:
            all three are index-aligned; each index i is one fixed point
            in semantic space.

        Raises:
            FileNotFoundError: if any configured asset path is missing.
            ValueError: if embeddings are not 2D, delta is not shape (V,),
                vocab is not a list, or vocab size differs from V.
        """
        # Check all paths up front so the error names the missing file.
        for path in (
            self.cfg.embeddings_path,
            self.cfg.delta_scalar_path,
            self.cfg.vocab_path,
        ):
            if not os.path.exists(path):
                raise FileNotFoundError(path)

        self.emb = np.load(self.cfg.embeddings_path)
        if self.emb.dtype != np.float32:
            self.emb = self.emb.astype(np.float32, copy=False)

        self.delta = np.load(self.cfg.delta_scalar_path)
        if self.delta.dtype != np.float32:
            self.delta = self.delta.astype(np.float32, copy=False)

        if self.emb.ndim != 2:
            raise ValueError(f"embeddings must be 2D (V, D), got shape={self.emb.shape}")
        V, D = self.emb.shape

        if self.delta.ndim != 1 or self.delta.shape[0] != V:
            raise ValueError(f"delta must be shape (V,), got {self.delta.shape}, expected ({V},)")

        with open(self.cfg.vocab_path, "r", encoding="utf-8") as f:
            vocab_list = json.load(f)

        if not isinstance(vocab_list, list):
            raise ValueError("vocab must be a list for geometric field mode")

        if len(vocab_list) != V:
            raise ValueError(f"vocab size {len(vocab_list)} != embeddings V {V}")

        self.vocab = vocab_list
        self.id_to_token = vocab_list
        self.token_to_id = {tok: i for i, tok in enumerate(vocab_list)}

        self.tokenizer = VocabTokenizer(self.vocab)

    def encode(self, text: str) -> np.ndarray:
        """
        Project text into the semantic field.

        Steps:
            1. tokenize -> token list
            2. gather each token's embedding row
            3. mean-pool
            4. L2-normalize (epsilon-guarded against zero norm)

        Mathematically:
            q = normalize( mean( emb[token_i] ) )

        Geometric meaning: the centroid of the matched tokens in the
        semantic field.

        Returns:
            q of shape (D,); the zero vector when no token matched
            (all-OOV input).

        Caveats:
            - mean pooling blurs directionality
            - few tokens / heavy OOV pushes the vector toward zero
        """
        tokens = self.tokenizer.tokenize(text)

        vecs = [self.emb[self.token_to_id[t]] for t in tokens]
        if not vecs:
            return np.zeros(self.emb.shape[1], dtype=np.float32)

        q = np.mean(vecs, axis=0)
        q /= (np.linalg.norm(q) + 1e-12)
        return q

    def score_vocab(self, q: np.ndarray, alpha: Optional[float] = None, beta: Optional[float] = None) -> np.ndarray:
        """
        Compute the field score of every vocab token.

        base:
            emb @ q -- cosine similarity when emb rows and q are both
            normalized.

        delta:
            static per-token field offset.

        Final formula:
            score = alpha * base + beta * delta

        Current semantics: delta is a logit bias (not a loss, not an
        energy gradient).

        beta == 0 -> pure embedding similarity search.
        beta > 0  -> adds the field "gravity well" effect.

        alpha / beta default to the configured values when None; the
        shared config is never modified.
        """
        a = self.cfg.alpha if alpha is None else float(alpha)
        b = self.cfg.beta if beta is None else float(beta)

        base = self.emb @ q
        score = a * base + b * self.delta
        return score.astype(np.float32, copy=False)

    def topk(self, score: np.ndarray, k: Optional[int] = None) -> List[Tuple[str, float]]:
        """
        Return the k highest-scoring tokens as [(token, score), ...],
        best first.

        Uses argpartition for O(V) selection, then sorts only the k
        selected entries. k defaults to cfg.top_k and is clamped to
        [1, V]. Note: scores may exceed 1 because of the delta bias.
        """
        k = self.cfg.top_k if k is None else int(k)
        k = max(1, min(k, score.shape[0]))

        idx = np.argpartition(-score, k - 1)[:k]
        idx = idx[np.argsort(-score[idx])]

        out: List[Tuple[str, float]] = []
        for i in idx:
            # Defensive: fall back to the raw index string if the vocab
            # list is shorter than the score vector.
            tok = self.id_to_token[i] if i < len(self.id_to_token) else str(i)
            out.append((tok, float(score[i])))
        return out

    def decode(
        self,
        prompt_tokens: List[str],
        temperature: Optional[float] = None,
        max_new_tokens: Optional[int] = None,
    ) -> str:
        """
        Decode stage: join the retrieved tokens into a prompt string and
        hand it to the decoder (currently a microgpt stub -- retrieval
        and generation are deliberately decoupled, so the stub can be
        replaced without touching the pipeline).

        temperature / max_new_tokens are per-call overrides; when None,
        the configured values are used. Existing callers that pass only
        prompt_tokens are unaffected.
        """
        prompt = " ".join([t for t in prompt_tokens if t])
        return self.decoder.generate(
            prompt=prompt,
            temperature=self.cfg.temperature if temperature is None else float(temperature),
            max_new_tokens=self.cfg.max_new_tokens if max_new_tokens is None else int(max_new_tokens),
        )

    def pipeowl(
        self,
        text: str,
        *,
        top_k: Optional[int] = None,
        alpha: Optional[float] = None,
        beta: Optional[float] = None,
        temperature: Optional[float] = None,
        max_new_tokens: Optional[int] = None,
    ) -> Dict[str, object]:
        """
        One full pipeline pass: encode -> score_vocab -> topk -> decode.

        Returns:
            {
                "query":     the original text,
                "retrieved": top-k (token, score) pairs,
                "prompt":    token string handed to decode,
                "decoded":   generated result,
            }

        All keyword overrides apply to this call only. (Bug fix: the
        previous implementation wrote temperature / max_new_tokens back
        into self.cfg, permanently mutating shared config as a side
        effect of a single call.)
        """
        q = self.encode(text)
        s = self.score_vocab(q, alpha=alpha, beta=beta)
        retrieved = self.topk(s, k=top_k)

        # At most 8 retrieved tokens seed the decode prompt.
        prompt_tokens = [t for (t, _) in retrieved[:8]]

        decoded = self.decode(
            prompt_tokens,
            temperature=temperature,
            max_new_tokens=max_new_tokens,
        )
        return {
            "query": text,
            "retrieved": retrieved,
            "prompt": " ".join(prompt_tokens),
            "decoded": decoded,
        }
|
|
|
|
|
|
|
|
|
|
|
class MicroGPTDecoder:
    """
    Inference-only placeholder decoder.

    Purpose:
        keeps the pipeline runnable end to end; later it can be swapped
        for:
          - a trained microGPT
          - an external LLM
          - a field-driven sampling model

    Why a placeholder?
        - the pasted microGPT file trains its own weights in-process
        - a real decode stage would instead:
          (A) load a trained state_dict from disk, or
          (B) keep a tiny trained model in memory, or
          (C) use microGPT purely as a sampler over a learned char vocab

    This class is the stable interface; plug a real implementation in
    later.
    """

    def __init__(self):
        # Nothing to set up for the stub.
        pass

    def generate(self, prompt: str, temperature: float = 0.8, max_new_tokens: int = 64) -> str:
        # Stub behavior: echo the prompt behind a recognizable tag.
        # `temperature` and `max_new_tokens` are part of the interface
        # contract but unused until a real model is attached.
        return "[microgpt_stub] {}".format(prompt)
|
|
|