| """ |
| knowledge_rag.py — security-knowledge RAG store (Masterplan §1.4). |
| |
| A small, dependency-light vector store of security writeups, CVE detail |
| pages, disclosed bug-bounty reports, and research papers. The store reuses |
| the embedder from ``embedding_memory.py`` if available, otherwise falls |
| back to a deterministic hash-bag baseline so unit tests pass with zero |
| extra dependencies. |
| |
| The store is a single SQLite file under ``/data/knowledge_rag.sqlite`` so |
| it survives Space restarts and can be snapshotted to GitHub like the rest |
| of the Hermes memory. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| import logging |
| import math |
| import os |
| import sqlite3 |
| import time |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any, Iterable |
|
|
# Module-wide logger; the name is a fixed string rather than ``__name__``.
LOG = logging.getLogger("knowledge_rag")

# SQLite file backing the store. The KNOWLEDGE_RAG_DB environment variable
# overrides the default location.
DB_PATH = Path(
    os.getenv("KNOWLEDGE_RAG_DB", "/data/knowledge_rag.sqlite")
)

# Number of buckets (vector length) of the hash-bag fallback embedding.
EMBED_DIM = 256
|
|
|
|
# Default list of public security-knowledge sources (bug-bounty reports,
# CVE details, research feeds). Exposed on KnowledgeRAG as ``SOURCES``.
SOURCES_DEFAULT: list[str] = [
    "https://hackerone.com/hacktivity",
    "https://www.cvedetails.com/",
    "https://github.com/ngalongc/bug-bounty-reference",
    "https://github.com/EdOverflow/bugbounty-cheatsheet",
    "https://github.com/nicowillis/awesome-bugbounty-writeups",
    "https://arxiv.org/list/cs.CR/recent",
    "https://googleprojectzero.blogspot.com/",
    "https://portswigger.net/research",
]
|
|
|
|
@dataclass
class Document:
    """One knowledge document, optionally carrying a relevance score."""

    # Identifier of the stored row (primary key of the ``docs`` table).
    doc_id: str
    # Label of where the document came from.
    source: str
    # Short human-readable title.
    title: str
    # Full document body.
    text: str
    # Free-form labels; each instance gets its own fresh list.
    tags: list[str] = field(default_factory=list)
    # Similarity to the query when returned from a search; 0.0 otherwise.
    score: float = 0.0
|
|
|
|
| |
def _hash_embed(text: str, dim: int = EMBED_DIM) -> list[float]:
    """Embed *text* as an L2-normalised bag of hashed tokens.

    Deterministic and free of external dependencies — good enough for
    cosine-similarity ranking inside a single corpus. Tokens are the
    whitespace-separated words of the lower-cased text; each one increments
    the bucket selected by its 4-byte BLAKE2s digest modulo *dim*.
    """
    buckets = [0.0] * dim
    for token in text.lower().split():
        digest = hashlib.blake2s(token.encode("utf-8"), digest_size=4).digest()
        buckets[int.from_bytes(digest, "big") % dim] += 1.0
    norm = math.sqrt(sum(b * b for b in buckets))
    if not norm:
        # Text without tokens: avoid dividing by zero, keep the zero vector.
        norm = 1.0
    return [b / norm for b in buckets]
|
|
|
|
def _embed(text: str) -> list[float]:
    """Return an embedding for *text*, preferring the project embedder.

    ``embedding_memory.embed`` is used when that module can be imported and
    the call returns a non-empty list. In every other case the deterministic
    hash-bag embedding from ``_hash_embed`` is returned, so this function
    never raises because of the optional embedder.
    """
    vector = None
    try:
        # Imported lazily so this module works without the optional embedder.
        from embedding_memory import embed as _real_embed
        vector = _real_embed(text)
    except ImportError:
        # Optional dependency not available: the fallback is the normal path.
        pass
    except Exception as exc:
        # Still best-effort, but leave a trace: a silently failing embedder
        # would mix hash-bag and real embeddings in the same store.
        LOG.warning(
            "embedding_memory.embed failed, using hash fallback: %s", exc
        )
    if isinstance(vector, list) and vector:
        return vector
    return _hash_embed(text)
|
|
|
|
| def _cosine(a: list[float], b: list[float]) -> float: |
| if not a or not b or len(a) != len(b): |
| return 0.0 |
| return sum(x * y for x, y in zip(a, b)) |
|
|
|
|
| |
def _connect() -> sqlite3.Connection:
    """Open the SQLite store at ``DB_PATH``, creating it on first use.

    The parent directory and the ``docs`` table are created when missing.
    The caller owns the returned connection and must close it: using a
    sqlite3 connection as a context manager only commits or rolls back.

    Raises:
        OSError: if the parent directory cannot be created.
        sqlite3.Error: if the database cannot be opened or initialised.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS docs (
                doc_id TEXT PRIMARY KEY,
                source TEXT NOT NULL,
                title TEXT NOT NULL,
                text TEXT NOT NULL,
                tags TEXT NOT NULL,
                embed TEXT NOT NULL,
                added_at REAL NOT NULL
            )
        """)
    except Exception:
        # Do not leak the handle when schema initialisation fails.
        conn.close()
        raise
    return conn
|
|
|
|
class KnowledgeRAG:
    """Vector store of security knowledge documents.

    Documents are embedded when added and persisted in the SQLite file at the
    module-level ``DB_PATH``. A query embeds the query text and ranks every
    stored document by similarity to it.
    """

    # Default ingest sources; this is the module-level list, not a copy.
    SOURCES = SOURCES_DEFAULT

    def __init__(self, db_path: Path | None = None):
        """Create a store handle, optionally re-pointing the database file.

        NOTE: the database location is module-global state. Passing
        *db_path* rebinds ``DB_PATH`` for every ``KnowledgeRAG`` instance in
        the process, not only for this one.
        """
        global DB_PATH
        if db_path is not None:
            DB_PATH = Path(db_path)

    def add(self, *, source: str, title: str, text: str,
            tags: Iterable[str] | None = None) -> str:
        """Embed and store one document and return its id.

        The id is a hash of the source, the title and the first 200
        characters of the text, so adding the same document again replaces
        the stored row instead of duplicating it.
        """
        doc_id = hashlib.blake2s(
            f"{source}::{title}::{text[:200]}".encode("utf-8"), digest_size=8
        ).hexdigest()
        if tags is None:
            tag_list: list[str] = []
        elif isinstance(tags, str):
            # A bare string is one tag, not an iterable of characters.
            tag_list = [tags]
        else:
            tag_list = list(tags)
        embed = _embed(f"{title}\n{text}")
        conn = _connect()
        try:
            with conn:  # commits on success, rolls back on error
                conn.execute(
                    "INSERT OR REPLACE INTO docs "
                    "(doc_id, source, title, text, tags, embed, added_at) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (doc_id, source, title, text,
                     json.dumps(tag_list),
                     json.dumps(embed),
                     time.time()),
                )
        finally:
            # The sqlite3 context manager does not close the connection.
            conn.close()
        return doc_id

    def add_many(self, items: list[dict[str, Any]]) -> int:
        """Add every item and return how many were stored.

        Each item needs ``source``, ``title`` and ``text`` keys and may carry
        ``tags``. An item that fails is logged and skipped so one bad record
        does not abort the batch.
        """
        added = 0
        for it in items:
            try:
                self.add(
                    source=str(it["source"]),
                    title=str(it["title"]),
                    text=str(it["text"]),
                    tags=it.get("tags") or [],
                )
                added += 1
            except Exception as exc:
                LOG.warning("ingest failed: %s", exc)
        return added

    def ingest_text_file(self, path: str | Path, source: str) -> int:
        """Ingest a markdown / text file, one document per ``#``/``##`` section.

        Text before the first heading is stored under the file stem, and
        sections with an empty body are skipped. Every document is tagged
        with the file stem. Returns the number of documents stored, or 0
        when *path* is not an existing regular file.
        """
        p = Path(path)
        if not p.is_file():
            return 0
        text = p.read_text(encoding="utf-8", errors="ignore")
        chunks: list[tuple[str, str]] = []
        cur_title = p.stem
        cur_buf: list[str] = []
        for line in text.splitlines():
            if line.startswith(("# ", "## ")):
                if cur_buf:
                    chunks.append((cur_title, "\n".join(cur_buf).strip()))
                # Drop only the heading marker. lstrip("# ") strips a set of
                # characters and would also eat a title's own leading '#'.
                cur_title = line.partition(" ")[2].strip() or p.stem
                cur_buf = []
            else:
                cur_buf.append(line)
        if cur_buf:
            chunks.append((cur_title, "\n".join(cur_buf).strip()))
        return self.add_many([
            {"source": source, "title": t, "text": b, "tags": [p.stem]}
            for t, b in chunks if b
        ])

    def query(self, query_text: str, *, top_k: int = 5,
              source_prefix: str | None = None) -> list[Document]:
        """Return the *top_k* stored documents most similar to *query_text*.

        Only documents whose source starts with *source_prefix* are
        considered when it is given. Rows whose stored tags or embedding
        cannot be decoded are skipped. A non-positive *top_k* yields an
        empty list.
        """
        if top_k <= 0:
            # A negative slice bound would return "all but the last k".
            return []
        qv = _embed(query_text)
        conn = _connect()
        try:
            rows = conn.execute(
                "SELECT doc_id, source, title, text, tags, embed FROM docs"
            ).fetchall()
        finally:
            conn.close()
        scored: list[Document] = []
        for doc_id, source, title, text, tags_json, embed_json in rows:
            if source_prefix and not source.startswith(source_prefix):
                continue
            try:
                ev = json.loads(embed_json)
                tags = json.loads(tags_json)
            except Exception:
                continue
            scored.append(Document(
                doc_id=doc_id, source=source, title=title,
                text=text, tags=tags, score=_cosine(qv, ev),
            ))
        scored.sort(key=lambda d: d.score, reverse=True)
        return scored[:top_k]

    def stats(self) -> dict[str, Any]:
        """Return the document count, the distinct sources and the DB path."""
        conn = _connect()
        try:
            n = conn.execute("SELECT COUNT(*) FROM docs").fetchone()[0]
            sources = [r[0] for r in conn.execute(
                "SELECT DISTINCT source FROM docs ORDER BY source"
            )]
        finally:
            conn.close()
        return {"total_docs": n, "sources": sources, "db": str(DB_PATH)}
|
|