"""
chip.compressor
================
CHIP 主压缩器。设计原则:
1. 协议是文本,不是模型 — 不依赖 LLM 调用,纯规则可跑
2. 双轨 — Qwen 轨用中文方括号,cl100k 轨用 XML/Markdown
3. 可逆 — 保留命名实体、数字、代码、URL 不动
4. 可审计 — 每条改动可追溯到 rules.yaml 的某条规则
当前实现层级:
L1 (lex) — 词法替换:啰嗦套话 → 紧凑动宾,纯正则,~1.3-1.5x 压缩
L2 (syn) — 句法重排:虚词替换、列表化,需 jieba 分词,~2-3x
L3 (idiom) — 成语压缩(基于实测白名单),需 target 是国产 tokenizer
L4 (proto) — 协议层归一化,统一为 ### 标签
NP-aware 角色提取(可选):
L2-022 默认用正则,在含空格的复合 NP 上偶有截断。
设环境变量 CHIP_USE_JIEBA=1 启用 jieba 增强版。
"""
from __future__ import annotations
import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable
import yaml
# ============ Data classes ============
@dataclass
class Rule:
"""一条 CHIP 转换规则。"""
id: str
layer: str # "L1" | "L2" | "L3" | "L4"
pattern: str # 正则
replacement: str
description: str = ""
saves: int = 0 # 在参考 tokenizer 上预估省多少 token
risk: str = "low" # low | mid | high
flags: int = 0
_compiled: re.Pattern = field(default=None, repr=False)
def compile(self):
if self._compiled is None:
self._compiled = re.compile(self.pattern, self.flags)
return self._compiled
@dataclass
class CompressionResult:
"""压缩结果,带 audit trail。"""
original: str
compressed: str
    applied_rules: list[str]  # ids of the rules that fired
    target: str  # tokenizer name
    layers: tuple[str, ...]
@property
def char_ratio(self) -> float:
return len(self.compressed) / max(len(self.original), 1)
    def diff(self) -> str:
        """Simple side-by-side view."""
        return f"原: {self.original}\n压: {self.compressed}\n规则: {', '.join(self.applied_rules) or '(none)'}"
# ============ Rule loading ============
DEFAULT_RULES_PATH = Path(__file__).parent / "rules" / "rules.yaml"
def load_rules(path: Path | str = DEFAULT_RULES_PATH) -> list[Rule]:
"""从 yaml 加载规则。"""
path = Path(path)
with open(path, encoding="utf-8") as f:
data = yaml.safe_load(f)
rules = []
for item in data.get("rules", []):
flags = 0
for flag_name in item.get("flags", []):
flags |= getattr(re, flag_name.upper(), 0)
rules.append(Rule(
id=item["id"],
layer=item["layer"],
pattern=item["pattern"],
replacement=item.get("replacement", ""),
description=item.get("description", ""),
saves=item.get("saves", 0),
risk=item.get("risk", "low"),
flags=flags,
))
return rules
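# A minimal illustrative rules.yaml entry (the id and pattern below are
# hypothetical; see rules/rules.yaml for the real inventory). Only "id",
# "layer", and "pattern" are required; every other field has a default:
#
#   rules:
#     - id: L1-001
#       layer: L1
#       pattern: "请(?:帮我|你)?"
#       replacement: ""
#       description: "prune polite-request boilerplate"
#       saves: 2
#       risk: low
#       flags: [MULTILINE]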
# ============ Protective masking ============
# Substrings matched by these patterns are swapped for placeholders before
# the rules run, then restored afterwards. This keeps the rules from
# mangling proper nouns, URLs, code, and numbers.
PROTECT_PATTERNS = [
("URL", re.compile(r"https?://\S+")),
("CODE", re.compile(r"```[\s\S]*?```|`[^`\n]+`")),
("NUM", re.compile(r"\d+(?:\.\d+)?(?:%|km|kg|m|s|°C)?")),
("EMAIL", re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")),
    # quoted text in double quotes (the user's literal words)
("QUOTE", re.compile(r"[\"\u201c][^\"\u201d]+[\"\u201d]")),
]
# The placeholder delimiters are characters that never occur in natural
# Chinese and are not matched by any PROTECT_PATTERNS entry
_PH_OPEN = "\u2983" # ⦃
_PH_CLOSE = "\u2984" # ⦄
_PH_RE = re.compile(rf"{_PH_OPEN}\d+{_PH_CLOSE}")
def _mask(text: str) -> tuple[str, list[tuple[str, str]]]:
    """Replace non-compressible spans with ⦃i⦄ placeholders; return (masked, mappings).
    Key point: each substitution skips spans that are already masked, so a
    placeholder is never masked a second time.
    """
mappings = []
masked = text
    def make_sub():
        def _sub(m):
            content = m.group(0)
            # Skip matches that are themselves a placeholder.
            if _PH_RE.fullmatch(content):
                return content
            # Skip bare digits inside an existing placeholder: NUM would
            # otherwise re-mask the "0" in ⦃0⦄ and nest placeholders.
            start, end = m.span()
            s = m.string
            if (start > 0 and s[start - 1] == _PH_OPEN
                    and end < len(s) and s[end] == _PH_CLOSE):
                return content
            i = len(mappings)
            placeholder = f"{_PH_OPEN}{i}{_PH_CLOSE}"
            mappings.append((placeholder, content))
            return placeholder
        return _sub
for tag, pat in PROTECT_PATTERNS:
masked = pat.sub(make_sub(), masked)
return masked, mappings
def _unmask(text: str, mappings: list[tuple[str, str]]) -> str:
    # Restore in reverse insertion order: the closing delimiter already keeps
    # ⦃1⦄ from matching inside ⦃10⦄, and reverse order also unwinds any
    # nested placeholders from the inside out.
for placeholder, original in reversed(mappings):
text = text.replace(placeholder, original)
return text
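# Illustrative round trip (placeholder indices follow PROTECT_PATTERNS
# order; the URL and figures are made up):
#
#   masked, maps = _mask("参考 https://example.com 降低 15% 成本")
#   # masked == "参考 ⦃0⦄ 降低 ⦃1⦄ 成本"
#   # maps   == [("⦃0⦄", "https://example.com"), ("⦃1⦄", "15%")]
#   assert _unmask(masked, maps) == "参考 https://example.com 降低 15% 成本"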
# ============ Main class ============
class Compressor:
"""可重用的压缩器实例。"""
def __init__(self,
rules_path: Path | str = DEFAULT_RULES_PATH,
target: str = "qwen2.5",
layers: Iterable[str] = ("L1", "L2", "L4")):
"""
Args:
target: 目标 tokenizer,影响成语压缩等 target-aware 决策
layers: 启用的压缩层
- L1: 词法层(套话剪枝),保险,默认开
- L2: 句法层(模式重排),保险,默认开
- L3: 成语层(语义压缩),需 target 是国产 tokenizer 才有意义,默认关
- L4: 协议层归一化(### 标题统一),无害,默认开
"""
self.rules = load_rules(rules_path)
self.target = target
self.layers = tuple(layers)
        # precompile
for r in self.rules:
r.compile()
def compress(self, text: str) -> CompressionResult:
original = text
        # Optional jieba-enhanced role extraction (pre-pass; takes precedence over L2-022's pure regex)
applied_pre = []
if os.getenv("CHIP_USE_JIEBA") == "1" and "L2" in self.layers:
text, jieba_applied = _jieba_role_extract(text)
if jieba_applied:
applied_pre.append("L2-022J(jieba)")
masked, mappings = _mask(text)
applied = list(applied_pre)
for rule in self.rules:
if rule.layer not in self.layers:
continue
            new_text, n = rule.compile().subn(rule.replacement, masked)
if n > 0:
applied.append(f"{rule.id}×{n}")
masked = new_text
        # Wrap-up: excess whitespace and consecutive punctuation
masked = re.sub(r"[ \t]+", " ", masked)
masked = re.sub(r"\s*\n\s*\n\s*\n+", "\n\n", masked)
        # Strip orphaned punctuation left by the protocol layer (rules like L2-022 leave "\n,xxx")
masked = re.sub(r"\n[,,;;。.\s]+", "\n", masked)
masked = re.sub(r"^[,,;;]+\s*", "", masked, flags=re.MULTILINE)
masked = masked.strip()
compressed = _unmask(masked, mappings)
return CompressionResult(
original=original,
compressed=compressed,
applied_rules=applied,
target=self.target,
layers=self.layers,
)
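# Illustrative reuse of a single Compressor across prompts (outputs are
# hypothetical; actual hits depend on rules.yaml):
#
#   c = Compressor(target="qwen2.5", layers=("L1", "L2", "L3", "L4"))
#   r = c.compress("请帮我总结一下这段文字")
#   print(r.compressed)      # e.g. "总结一下这段文字"
#   print(r.applied_rules)   # e.g. ["L1-001×1"]
#   print(r.char_ratio)      # compressed chars / original chars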
# ============ Convenience function ============
_default_compressor = None
def compress(text: str,
target: str = "qwen2.5",
layers: Iterable[str] = ("L1", "L2", "L4"),
return_result: bool = False) -> str | CompressionResult:
"""简便入口。
>>> compress("请帮我总结一下这段文字")
'总结一下这段文字'
>>> compress("...", layers=["L1","L2","L3","L4"]) # 启用所有层(包括成语)
>>> r = compress("...", return_result=True)
>>> print(r.diff())
"""
global _default_compressor
key = (target, tuple(layers))
if _default_compressor is None or _default_compressor[0] != key:
_default_compressor = (key, Compressor(target=target, layers=layers))
result = _default_compressor[1].compress(text)
return result if return_result else result.compressed
# ============ jieba NP extraction (optional enhancement) ============
_jieba_loaded = False
def _ensure_jieba():
"""懒加载 jieba。"""
global _jieba_loaded
if _jieba_loaded:
return True
try:
import jieba.posseg as pseg # noqa: F401
_jieba_loaded = True
return True
except ImportError:
return False
# The role-play trigger phrase; jieba anchors on it
_ROLE_PREFIX_RE = re.compile(
r"请\s*(?:你)?\s*扮演\s*(?:一(?:个|位))?\s*"
)
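# The trigger matches, among others (whitespace optional throughout):
#   "请扮演"      "请你扮演"
#   "请扮演一个"  "请 你 扮演 一位"
# It deliberately stops before the role description itself.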
def _jieba_role_extract(text: str) -> tuple[str, bool]:
"""用 jieba 词性标注提取最长名词短语作为角色描述。
替换 L2-022 的纯正则 lookahead 实现 — 后者在以下场景失败:
- 角色描述非常长且无标点结尾
- 角色描述被句中的连词意外截断("...然后..." 这种)
策略:
1. 找到 "请你扮演[一位]" 触发短语
2. 从触发短语后开始,jieba.posseg 切分
3. 贪婪收集 NP token,直到遇到 hard-stop:
- 连词 c (然后/接着/以及)
- 介词 p (对/把/为)
- 动词 v (但 vn 动名词允许)
- 句末标点 w (。;,等)
4. 助词 'uj/u/ul'(的/地/得)、空格、英文都允许进入 NP
"""
if not _ensure_jieba():
return text, False
import jieba.posseg as pseg
m = _ROLE_PREFIX_RE.search(text)
if not m:
return text, False
head = text[:m.start()]
body = text[m.end():]
if not body:
return text, False
words = list(pseg.cut(body))
    # NP definition: the longest prefix up to a hard stop.
    # HARD_STOP: verbs (except vn), conjunctions, prepositions, punctuation
    # ALLOW_IN_NP: nouns, adjectives, English, digits, classifiers, particles (的/地/得), spaces
np_chars = []
cumlen = 0
rest_start = 0
    found_np_core = False  # whether an NP core (noun or adjective) has been seen
    for w, flag in words:
        # hard-stop conditions
        is_hard_stop = (
            flag == "w"  # punctuation
            or w in {",", ",", "。", ".", ";", ";", ":", ":", "、", "\n"}
            or flag == "c"  # conjunction
            or flag == "p"  # preposition
            or (flag.startswith("v") and flag != "vn")  # true verb (not a verbal noun)
        )
if is_hard_stop and found_np_core:
rest_start = cumlen
break
        # still inside the NP
np_chars.append(w)
cumlen += len(w)
if flag.startswith("n") or flag.startswith("a") or flag == "eng":
found_np_core = True
else:
        # The loop ran to completion: the entire body is one NP
rest_start = cumlen
np_str = "".join(np_chars).strip()
if not np_str or len(np_str) < 2 or not found_np_core:
return text, False
rest = body[rest_start:]
new_text = f"{head}\n### 角色\n{np_str}\n{rest}"
    # Strip orphaned punctuation immediately after the role block
new_text = re.sub(r"\n[,,;;。.]+", "\n", new_text)
new_text = new_text.strip()
return new_text, True
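# Illustrative transformation (the exact NP boundary depends on jieba's
# dictionary, so treat this as a sketch rather than guaranteed output):
#
#   _jieba_role_extract("请你扮演一位资深的数据分析师,帮我清洗这份表格")
#   # -> ("### 角色\n资深的数据分析师\n帮我清洗这份表格", True)
#   # The comma after the NP is a hard stop, and the orphaned "," is
#   # removed by the trailing cleanup regex.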