rhodawk-ai-devops-engine / knowledge_rag.py
Rhodawk Agent
Round 1 sync — files missed in previous push
2798900
"""
knowledge_rag.py — security-knowledge RAG store (Masterplan §1.4).
A small, dependency-light vector store of security writeups, CVE detail
pages, disclosed bug-bounty reports, and research papers. The store reuses
the embedder from ``embedding_memory.py`` if available, otherwise falls
back to a deterministic hash-bag baseline so unit tests pass with zero
extra dependencies.
The store is a single SQLite file under ``/data/knowledge_rag.sqlite`` so
it survives Space restarts and can be snapshotted to GitHub like the rest
of the Hermes memory.
"""
from __future__ import annotations
import hashlib
import json
import logging
import math
import os
import sqlite3
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable
# Module-wide logger; the name is a fixed string rather than ``__name__``.
LOG = logging.getLogger("knowledge_rag")

# Location of the SQLite store. The KNOWLEDGE_RAG_DB environment variable
# overrides the default path.
DB_PATH = Path(os.environ.get("KNOWLEDGE_RAG_DB", "/data/knowledge_rag.sqlite"))

# Number of buckets used by the fallback hash-bag embedder.
EMBED_DIM = 256

# Default source URLs (exposed on the store class as ``KnowledgeRAG.SOURCES``).
SOURCES_DEFAULT: list[str] = [
    "https://hackerone.com/hacktivity",
    "https://www.cvedetails.com/",
    "https://github.com/ngalongc/bug-bounty-reference",
    "https://github.com/EdOverflow/bugbounty-cheatsheet",
    "https://github.com/nicowillis/awesome-bugbounty-writeups",
    "https://arxiv.org/list/cs.CR/recent",
    "https://googleprojectzero.blogspot.com/",
    "https://portswigger.net/research",
]
@dataclass
class Document:
    """One knowledge document, as stored in the ``docs`` table.

    ``score`` is not persisted; ``KnowledgeRAG.query`` fills it in with the
    similarity between the document and the query.
    """

    # Hex digest identifying the document (primary key of the ``docs`` table).
    doc_id: str
    # Origin of the document, as supplied by the caller at ingestion time.
    source: str
    title: str
    text: str
    # A fresh list per instance, so instances never share tag storage.
    tags: list[str] = field(default_factory=list)
    # Similarity to the query; stays 0.0 for documents that were not ranked.
    score: float = 0.0
# ── Embedding ──────────────────────────────────────────────────────────────
def _hash_embed(text: str, dim: int = EMBED_DIM) -> list[float]:
    """Embed *text* as an L2-normalised bag of hashed tokens.

    The text is lower-cased and split on whitespace; every token is hashed
    with BLAKE2s (4-byte digest) into one of ``dim`` buckets and counted.
    The result is deterministic and needs no external dependencies. Text
    without any token yields an all-zero vector.
    """
    buckets = [0.0] * dim
    for token in text.lower().split():
        digest = hashlib.blake2s(token.encode("utf-8"), digest_size=4).digest()
        # Same integer as parsing the hex digest in base 16.
        buckets[int.from_bytes(digest, "big") % dim] += 1.0
    norm = math.sqrt(sum(count * count for count in buckets))
    if not norm:
        # No tokens: divide by one instead of zero so the vector stays all-zero.
        norm = 1.0
    return [count / norm for count in buckets]
def _embed(text: str) -> list[float]:
    """Embed *text* with the project embedder, else the hash-bag fallback.

    ``embedding_memory.embed`` is used when it can be imported and returns a
    non-empty list. In every other case (module missing, embedder raising,
    empty or non-list result) the deterministic ``_hash_embed`` vector is
    returned instead, so this function never raises because of the optional
    embedder.
    """
    vec = None
    try:
        from embedding_memory import embed as _real_embed  # type: ignore
        vec = _real_embed(text)
    except ImportError:
        # The optional dependency is simply not installed; fall back quietly.
        pass
    except Exception as exc:  # noqa: BLE001
        # Still best-effort, but leave a trace: a broken embedder otherwise
        # degrades ranking quality without any visible signal.
        LOG.debug("embedding_memory.embed failed, using hash fallback: %s", exc)
    if isinstance(vec, list) and vec:
        return vec
    return _hash_embed(text)
def _cosine(a: list[float], b: list[float]) -> float:
    """Return the cosine similarity of vectors *a* and *b*.

    Returns 0.0 when either vector is empty, when the lengths differ, or
    when either vector has zero magnitude.

    The dot product is divided by both norms, so the result is a true cosine
    even when an embedder hands back vectors that are not unit-normalised.
    For unit vectors this equals the plain dot product.
    """
    if not a or not b or len(a) != len(b):
        return 0.0
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if not norm_a or not norm_b:
        # A zero vector has no direction; treat it as unrelated.
        return 0.0
    return dot / (norm_a * norm_b)
# ── Storage ────────────────────────────────────────────────────────────────
def _connect() -> sqlite3.Connection:
    """Open the store at ``DB_PATH`` and make sure the ``docs`` table exists.

    The parent directory of ``DB_PATH`` is created when missing. The caller
    owns the returned connection and is responsible for closing it.

    Raises:
        OSError: if the parent directory cannot be created.
        sqlite3.Error: if the database cannot be opened or the schema
            statement fails. In the latter case the connection is closed
            before the error propagates.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    # Same statement text as before, spelled as adjacent literals so it can
    # be indented with the function body.
    schema = (
        "\n"
        "CREATE TABLE IF NOT EXISTS docs (\n"
        "doc_id TEXT PRIMARY KEY,\n"
        "source TEXT NOT NULL,\n"
        "title TEXT NOT NULL,\n"
        "text TEXT NOT NULL,\n"
        "tags TEXT NOT NULL,\n"
        "embed TEXT NOT NULL,\n"
        "added_at REAL NOT NULL\n"
        ")\n"
    )
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(schema)
    except Exception:
        # Do not leak the handle when the file is locked, corrupt or
        # read-only; the caller never receives it and so cannot close it.
        conn.close()
        raise
    return conn
class KnowledgeRAG:
    """Vector store of security knowledge documents.

    Every operation opens a short-lived SQLite connection through
    ``_connect()`` and closes it again before returning.

    NOTE: the database location is the module-level ``DB_PATH``. Passing
    ``db_path`` to the constructor rebinds that global, so it affects every
    instance in the process, not only the one being constructed.
    """

    SOURCES = SOURCES_DEFAULT

    def __init__(self, db_path: Path | None = None):
        """Optionally point the module at a different database file.

        Args:
            db_path: New database location; ``None`` keeps the current
                ``DB_PATH``.
        """
        global DB_PATH
        if db_path is not None:
            DB_PATH = Path(db_path)

    # ── ingestion ─────────────────────────────────────────────────────────
    def add(self, *, source: str, title: str, text: str,
            tags: Iterable[str] | None = None) -> str:
        """Insert or replace one document and return its id.

        The id is a BLAKE2s digest of the source, the title and the first
        200 characters of the text, so adding the same document again
        overwrites the earlier row instead of duplicating it.

        Args:
            source: Origin of the document.
            title: Document title; embedded together with the text.
            text: Document body.
            tags: Optional tags, stored as a JSON list.

        Returns:
            The hexadecimal document id.
        """
        doc_id = hashlib.blake2s(
            f"{source}::{title}::{text[:200]}".encode("utf-8"), digest_size=8
        ).hexdigest()
        embed = _embed(f"{title}\n{text}")
        conn = _connect()
        try:
            # ``with conn`` commits on success and rolls back on error, but
            # it does not close the connection; the ``finally`` does that.
            with conn:
                conn.execute(
                    "INSERT OR REPLACE INTO docs VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (doc_id, source, title, text,
                     json.dumps(list(tags or [])),
                     json.dumps(embed),
                     time.time()),
                )
        finally:
            conn.close()
        return doc_id

    def add_many(self, items: list[dict[str, Any]]) -> int:
        """Add several documents, skipping the ones that fail.

        Each item needs the keys ``source``, ``title`` and ``text``; ``tags``
        is optional. A failing item is logged as a warning and does not stop
        the remaining items.

        Returns:
            The number of documents that were stored.
        """
        added = 0
        for it in items:
            try:
                self.add(
                    source=str(it["source"]),
                    title=str(it["title"]),
                    text=str(it["text"]),
                    tags=it.get("tags") or [],
                )
                added += 1
            except Exception as exc:  # noqa: BLE001
                LOG.warning("ingest failed: %s", exc)
        return added

    def ingest_text_file(self, path: str | Path, source: str) -> int:
        """Ingest a markdown / text file as one document per heading.

        The file is split at level-1 (``# ``) and level-2 (``## ``) headings.
        Text before the first heading is stored under the file stem. Sections
        without a body are dropped, and every document is tagged with the
        file stem. Lines inside fenced code blocks are never treated as
        headings, so ``# comment`` lines in code samples stay in the section.

        Args:
            path: File to read. Anything that is not a regular file is ignored.
            source: Source label stored with every document.

        Returns:
            The number of documents that were stored.
        """
        p = Path(path)
        # ``is_file`` also rejects directories, which ``read_text`` cannot read.
        if not p.is_file():
            return 0
        text = p.read_text(encoding="utf-8", errors="ignore")
        chunks: list[tuple[str, str]] = []
        cur_title = p.stem
        cur_buf: list[str] = []
        open_fence = ""  # "```" or "~~~" while inside a fenced code block
        for line in text.splitlines():
            stripped = line.lstrip()
            if open_fence:
                # Only the marker that opened the fence can close it.
                if stripped.startswith(open_fence):
                    open_fence = ""
                cur_buf.append(line)
            elif stripped.startswith(("```", "~~~")):
                open_fence = stripped[:3]
                cur_buf.append(line)
            elif line.startswith(("# ", "## ")):
                if cur_buf:
                    chunks.append((cur_title, "\n".join(cur_buf).strip()))
                # Drop only the heading marker. ``lstrip("# ")`` would strip a
                # character set and also eat a title's own leading '#'.
                cur_title = line.partition(" ")[2].strip() or p.stem
                cur_buf = []
            else:
                cur_buf.append(line)
        if cur_buf:
            chunks.append((cur_title, "\n".join(cur_buf).strip()))
        return self.add_many([
            {"source": source, "title": t, "text": b, "tags": [p.stem]}
            for t, b in chunks if b
        ])

    # ── query ─────────────────────────────────────────────────────────────
    def query(self, query_text: str, *, top_k: int = 5,
              source_prefix: str | None = None) -> list[Document]:
        """Return the ``top_k`` stored documents most similar to the query.

        All rows are loaded and scored in Python with ``_cosine``. Rows whose
        stored tags or embedding are not valid JSON are skipped.

        Args:
            query_text: Text to embed and compare against the store.
            top_k: Maximum number of results; values below 1 give ``[]``.
            source_prefix: When set, only documents whose ``source`` starts
                with this prefix are considered.

        Returns:
            Documents ordered by descending score.
        """
        if top_k <= 0:
            # A negative slice bound would drop results from the wrong end.
            return []
        qv = _embed(query_text)
        conn = _connect()
        try:
            rows = conn.execute(
                "SELECT doc_id, source, title, text, tags, embed FROM docs"
            ).fetchall()
        finally:
            conn.close()
        scored: list[Document] = []
        for doc_id, source, title, text, tags_json, embed_json in rows:
            if source_prefix and not source.startswith(source_prefix):
                continue
            try:
                ev = json.loads(embed_json)
                tags = json.loads(tags_json)
            except (TypeError, ValueError) as exc:
                # ``json.JSONDecodeError`` is a ``ValueError`` subclass.
                LOG.debug("skipping doc %s with undecodable row: %s", doc_id, exc)
                continue
            score = _cosine(qv, ev)
            scored.append(Document(
                doc_id=doc_id, source=source, title=title,
                text=text, tags=tags, score=score,
            ))
        scored.sort(key=lambda d: d.score, reverse=True)
        return scored[:top_k]

    def stats(self) -> dict[str, Any]:
        """Summarise the store.

        Returns:
            A dict with ``total_docs`` (row count), ``sources`` (sorted
            distinct source values) and ``db`` (database path as a string).
        """
        conn = _connect()
        try:
            n = conn.execute("SELECT COUNT(*) FROM docs").fetchone()[0]
            sources = [r[0] for r in conn.execute(
                "SELECT DISTINCT source FROM docs ORDER BY source"
            )]
        finally:
            conn.close()
        return {"total_docs": n, "sources": sources, "db": str(DB_PATH)}