FerrellSyntheticIntelligence
Add understanding engine, conversation interface, meditation engine, unified launcher
7d9e142 | #!/usr/bin/env python3 | |
| """ | |
| VITALIS KNOWLEDGE SEEDER | |
| Feeds real developer knowledge into Vitalis on day one. | |
| She doesn't start empty. She starts knowing. | |
| """ | |
| import json, time, hashlib, os | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
| try: | |
| import numpy as np | |
| NUMPY_OK = True | |
| except ImportError: | |
| NUMPY_OK = False | |
| KNOWLEDGE_SEEDS: Dict[str, Dict] = { | |
| # ── REACT ─────────────────────────────────────────────────────── | |
| "react_functional_component": { | |
| "category": "react", | |
| "pattern": "functional_component", | |
| "template": '''import React, { useState, useEffect } from 'react'; | |
| const {ComponentName} = ({ {props} }) => { | |
| const [state, setState] = useState({initialState}); | |
| useEffect(() => { | |
| // side effect here | |
| return () => { /* cleanup */ }; | |
| }, [{deps}]); | |
| return ( | |
| <div className="{ComponentName.lower()}"> | |
| {/* render */} | |
| </div> | |
| ); | |
| }; | |
| export default {ComponentName};''', | |
| "tags": ["react", "component", "hooks", "frontend"], | |
| "description": "Standard functional component with useState and useEffect", | |
| }, | |
| "react_custom_hook": { | |
| "category": "react", | |
| "pattern": "custom_hook", | |
| "template": '''import { useState, useEffect, useCallback } from 'react'; | |
| const use{HookName} = ({params}) => { | |
| const [data, setData] = useState(null); | |
| const [loading, setLoading] = useState(false); | |
| const [error, setError] = useState(null); | |
| const fetch{HookName} = useCallback(async () => { | |
| setLoading(true); | |
| setError(null); | |
| try { | |
| const result = await {asyncOperation}; | |
| setData(result); | |
| } catch (err) { | |
| setError(err.message); | |
| } finally { | |
| setLoading(false); | |
| } | |
| }, [{deps}]); | |
| useEffect(() => { fetch{HookName}(); }, [fetch{HookName}]); | |
| return { data, loading, error, refetch: fetch{HookName} }; | |
| }; | |
| export default use{HookName};''', | |
| "tags": ["react", "hooks", "custom-hook", "data-fetching"], | |
| "description": "Custom hook with loading/error/data state pattern", | |
| }, | |
| # ── AUTH ───────────────────────────────────────────────────────── | |
| "jwt_auth_middleware": { | |
| "category": "auth", | |
| "pattern": "jwt_middleware", | |
| "template": '''const jwt = require('jsonwebtoken'); | |
| const SECRET = process.env.JWT_SECRET || 'change-this-in-production'; | |
| const authMiddleware = (req, res, next) => { | |
| const header = req.headers['authorization']; | |
| if (!header) return res.status(401).json({ error: 'No token provided' }); | |
| const token = header.startsWith('Bearer ') ? header.slice(7) : header; | |
| try { | |
| const decoded = jwt.verify(token, SECRET); | |
| req.user = decoded; | |
| next(); | |
| } catch (err) { | |
| const msg = err.name === 'TokenExpiredError' ? 'Token expired' : 'Invalid token'; | |
| return res.status(401).json({ error: msg }); | |
| } | |
| }; | |
| const signToken = (payload, expiresIn = '24h') => | |
| jwt.sign(payload, SECRET, { expiresIn, algorithm: 'HS256' }); | |
| const refreshToken = (oldToken) => { | |
| const decoded = jwt.verify(oldToken, SECRET, { ignoreExpiration: true }); | |
| const { iat, exp, ...payload } = decoded; | |
| return signToken(payload); | |
| }; | |
| module.exports = { authMiddleware, signToken, refreshToken };''', | |
| "tags": ["auth", "jwt", "middleware", "express", "security"], | |
| "description": "Complete JWT auth middleware with sign and refresh", | |
| }, | |
| "python_jwt_auth": { | |
| "category": "auth", | |
| "pattern": "python_jwt", | |
| "template": '''import jwt | |
| import os | |
| from datetime import datetime, timedelta, timezone | |
| from functools import wraps | |
| from flask import request, jsonify, g | |
| SECRET = os.getenv("JWT_SECRET", "change-this-in-production") | |
| ALGORITHM = "HS256" | |
| def create_token(user_id: int, role: str = "user", expires_hours: int = 24) -> str: | |
| payload = { | |
| "sub": str(user_id), | |
| "role": role, | |
| "iat": datetime.now(timezone.utc), | |
| "exp": datetime.now(timezone.utc) + timedelta(hours=expires_hours), | |
| } | |
| return jwt.encode(payload, SECRET, algorithm=ALGORITHM) | |
| def decode_token(token: str) -> dict: | |
| return jwt.decode(token, SECRET, algorithms=[ALGORITHM]) | |
| def require_auth(f): | |
| @wraps(f) | |
| def decorated(*args, **kwargs): | |
| token = request.headers.get("Authorization", "").replace("Bearer ", "") | |
| if not token: | |
| return jsonify({"error": "No token"}), 401 | |
| try: | |
| g.user = decode_token(token) | |
| except jwt.ExpiredSignatureError: | |
| return jsonify({"error": "Token expired"}), 401 | |
| except jwt.InvalidTokenError: | |
| return jsonify({"error": "Invalid token"}), 401 | |
| return f(*args, **kwargs) | |
| return decorated | |
| def require_role(role: str): | |
| def decorator(f): | |
| @wraps(f) | |
| def decorated(*args, **kwargs): | |
| if not hasattr(g, "user") or g.user.get("role") != role: | |
| return jsonify({"error": "Forbidden"}), 403 | |
| return f(*args, **kwargs) | |
| return decorated | |
| return decorator''', | |
| "tags": ["auth", "jwt", "python", "flask", "security"], | |
| "description": "Python JWT auth with Flask decorator and role system", | |
| }, | |
| # ── REST API ───────────────────────────────────────────────────── | |
| "express_rest_router": { | |
| "category": "api", | |
| "pattern": "rest_router", | |
| "template": '''const express = require('express'); | |
| const router = express.Router(); | |
| const { authMiddleware } = require('../middleware/auth'); | |
| // GET all | |
| router.get('/', authMiddleware, async (req, res) => { | |
| try { | |
| const items = await {Model}.findAll({ where: { userId: req.user.sub } }); | |
| res.json({ success: true, data: items }); | |
| } catch (err) { | |
| res.status(500).json({ error: err.message }); | |
| } | |
| }); | |
| // GET by id | |
| router.get('/:id', authMiddleware, async (req, res) => { | |
| try { | |
| const item = await {Model}.findByPk(req.params.id); | |
| if (!item) return res.status(404).json({ error: 'Not found' }); | |
| res.json({ success: true, data: item }); | |
| } catch (err) { | |
| res.status(500).json({ error: err.message }); | |
| } | |
| }); | |
| // POST create | |
| router.post('/', authMiddleware, async (req, res) => { | |
| try { | |
| const item = await {Model}.create({ ...req.body, userId: req.user.sub }); | |
| res.status(201).json({ success: true, data: item }); | |
| } catch (err) { | |
| res.status(400).json({ error: err.message }); | |
| } | |
| }); | |
| // PUT update | |
| router.put('/:id', authMiddleware, async (req, res) => { | |
| try { | |
| const [updated] = await {Model}.update(req.body, { where: { id: req.params.id } }); | |
| if (!updated) return res.status(404).json({ error: 'Not found' }); | |
| const item = await {Model}.findByPk(req.params.id); | |
| res.json({ success: true, data: item }); | |
| } catch (err) { | |
| res.status(400).json({ error: err.message }); | |
| } | |
| }); | |
| // DELETE | |
| router.delete('/:id', authMiddleware, async (req, res) => { | |
| try { | |
| const deleted = await {Model}.destroy({ where: { id: req.params.id } }); | |
| if (!deleted) return res.status(404).json({ error: 'Not found' }); | |
| res.json({ success: true, message: 'Deleted' }); | |
| } catch (err) { | |
| res.status(500).json({ error: err.message }); | |
| } | |
| }); | |
| module.exports = router;''', | |
| "tags": ["api", "rest", "express", "crud", "backend"], | |
| "description": "Full CRUD REST router with auth middleware", | |
| }, | |
| "flask_rest_blueprint": { | |
| "category": "api", | |
| "pattern": "flask_blueprint", | |
| "template": '''from flask import Blueprint, request, jsonify, g | |
| from .auth import require_auth | |
| from .models import {Model} | |
| from .extensions import db | |
| bp = Blueprint("{resource}", __name__, url_prefix="/{resource}s") | |
| @bp.get("/") | |
| @require_auth | |
| def list_items(): | |
| items = {Model}.query.filter_by(user_id=g.user["sub"]).all() | |
| return jsonify({"success": True, "data": [i.to_dict() for i in items]}) | |
| @bp.get("/<int:item_id>") | |
| @require_auth | |
| def get_item(item_id): | |
| item = {Model}.query.get_or_404(item_id) | |
| return jsonify({"success": True, "data": item.to_dict()}) | |
| @bp.post("/") | |
| @require_auth | |
| def create_item(): | |
| data = request.get_json(force=True) | |
| item = {Model}(**data, user_id=g.user["sub"]) | |
| db.session.add(item) | |
| db.session.commit() | |
| return jsonify({"success": True, "data": item.to_dict()}), 201 | |
| @bp.put("/<int:item_id>") | |
| @require_auth | |
| def update_item(item_id): | |
| item = {Model}.query.get_or_404(item_id) | |
| for k, v in request.get_json(force=True).items(): | |
| setattr(item, k, v) | |
| db.session.commit() | |
| return jsonify({"success": True, "data": item.to_dict()}) | |
| @bp.delete("/<int:item_id>") | |
| @require_auth | |
| def delete_item(item_id): | |
| item = {Model}.query.get_or_404(item_id) | |
| db.session.delete(item) | |
| db.session.commit() | |
| return jsonify({"success": True, "message": "Deleted"})''', | |
| "tags": ["api", "rest", "flask", "blueprint", "crud"], | |
| "description": "Flask Blueprint with full CRUD and auth", | |
| }, | |
| # ── DATABASE ────────────────────────────────────────────────────── | |
| "sqlalchemy_model": { | |
| "category": "database", | |
| "pattern": "orm_model", | |
| "template": '''from datetime import datetime, timezone | |
| from .extensions import db | |
| class {ModelName}(db.Model): | |
| __tablename__ = "{table_name}" | |
| id = db.Column(db.Integer, primary_key=True) | |
| user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False) | |
| created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc)) | |
| updated_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc), | |
| onupdate=lambda: datetime.now(timezone.utc)) | |
| # Add your fields below: | |
| # name = db.Column(db.String(255), nullable=False) | |
| def to_dict(self): | |
| return { | |
| "id": self.id, | |
| "user_id": self.user_id, | |
| "created_at": self.created_at.isoformat(), | |
| "updated_at": self.updated_at.isoformat(), | |
| } | |
| def __repr__(self): | |
| return f"<{ModelName} id={self.id}>"''', | |
| "tags": ["database", "sqlalchemy", "orm", "model", "python"], | |
| "description": "SQLAlchemy model with timestamps and user FK", | |
| }, | |
| # ── PYTHON ASYNC ────────────────────────────────────────────────── | |
| "async_api_client": { | |
| "category": "async", | |
| "pattern": "async_client", | |
| "template": '''import asyncio | |
| import aiohttp | |
| from typing import Any, Dict, Optional | |
| class AsyncAPIClient: | |
| def __init__(self, base_url: str, token: Optional[str] = None): | |
| self.base_url = base_url.rstrip("/") | |
| self.headers = {"Content-Type": "application/json"} | |
| if token: | |
| self.headers["Authorization"] = f"Bearer {token}" | |
| self._session: Optional[aiohttp.ClientSession] = None | |
| async def __aenter__(self): | |
| self._session = aiohttp.ClientSession(headers=self.headers) | |
| return self | |
| async def __aexit__(self, *args): | |
| if self._session: | |
| await self._session.close() | |
| async def get(self, path: str, params: Optional[Dict] = None) -> Any: | |
| async with self._session.get(f"{self.base_url}{path}", params=params) as r: | |
| r.raise_for_status() | |
| return await r.json() | |
| async def post(self, path: str, data: Dict) -> Any: | |
| async with self._session.post(f"{self.base_url}{path}", json=data) as r: | |
| r.raise_for_status() | |
| return await r.json() | |
| async def put(self, path: str, data: Dict) -> Any: | |
| async with self._session.put(f"{self.base_url}{path}", json=data) as r: | |
| r.raise_for_status() | |
| return await r.json() | |
| async def delete(self, path: str) -> Any: | |
| async with self._session.delete(f"{self.base_url}{path}") as r: | |
| r.raise_for_status() | |
| return await r.json() | |
| # Usage: | |
| # async with AsyncAPIClient("https://api.example.com", token=TOKEN) as client: | |
| # data = await client.get("/users")''', | |
| "tags": ["async", "aiohttp", "api-client", "python"], | |
| "description": "Async HTTP client using aiohttp with context manager", | |
| }, | |
| # ── WEBSOCKETS ──────────────────────────────────────────────────── | |
| "websocket_server": { | |
| "category": "realtime", | |
| "pattern": "websocket", | |
| "template": '''import asyncio | |
| import json | |
| import websockets | |
| from typing import Set | |
| CONNECTIONS: Set[websockets.WebSocketServerProtocol] = set() | |
| async def broadcast(message: dict): | |
| if CONNECTIONS: | |
| data = json.dumps(message) | |
| await asyncio.gather(*[ws.send(data) for ws in CONNECTIONS], return_exceptions=True) | |
| async def handler(ws: websockets.WebSocketServerProtocol): | |
| CONNECTIONS.add(ws) | |
| print(f"[WS] Connected: {ws.remote_address} Total: {len(CONNECTIONS)}") | |
| try: | |
| async for raw in ws: | |
| try: | |
| msg = json.loads(raw) | |
| event = msg.get("event") | |
| data = msg.get("data", {}) | |
| # Route events | |
| if event == "ping": | |
| await ws.send(json.dumps({"event": "pong"})) | |
| elif event == "broadcast": | |
| await broadcast({"event": "message", "data": data}) | |
| else: | |
| await ws.send(json.dumps({"event": "error", "msg": f"Unknown event: {event}"})) | |
| except json.JSONDecodeError: | |
| await ws.send(json.dumps({"event": "error", "msg": "Invalid JSON"})) | |
| except websockets.ConnectionClosed: | |
| pass | |
| finally: | |
| CONNECTIONS.discard(ws) | |
| print(f"[WS] Disconnected. Total: {len(CONNECTIONS)}") | |
| async def main(): | |
| async with websockets.serve(handler, "0.0.0.0", 8765): | |
| print("[WS] Server running on ws://0.0.0.0:8765") | |
| await asyncio.Future() | |
| if __name__ == "__main__": | |
| asyncio.run(main())''', | |
| "tags": ["websocket", "realtime", "async", "python", "broadcast"], | |
| "description": "WebSocket server with broadcast and event routing", | |
| }, | |
| # ── ANDROID / KOTLIN ────────────────────────────────────────────── | |
| "android_viewmodel": { | |
| "category": "android", | |
| "pattern": "viewmodel", | |
| "template": '''import androidx.lifecycle.ViewModel | |
| import androidx.lifecycle.viewModelScope | |
| import kotlinx.coroutines.flow.MutableStateFlow | |
| import kotlinx.coroutines.flow.StateFlow | |
| import kotlinx.coroutines.flow.asStateFlow | |
| import kotlinx.coroutines.launch | |
| data class {Screen}State( | |
| val isLoading: Boolean = false, | |
| val data: List<{Model}> = emptyList(), | |
| val error: String? = null, | |
| ) | |
| class {Screen}ViewModel( | |
| private val repository: {Screen}Repository | |
| ) : ViewModel() { | |
| private val _state = MutableStateFlow({Screen}State()) | |
| val state: StateFlow<{Screen}State> = _state.asStateFlow() | |
| init { load() } | |
| fun load() { | |
| viewModelScope.launch { | |
| _state.value = _state.value.copy(isLoading = true, error = null) | |
| try { | |
| val result = repository.getAll() | |
| _state.value = _state.value.copy(isLoading = false, data = result) | |
| } catch (e: Exception) { | |
| _state.value = _state.value.copy(isLoading = false, error = e.message) | |
| } | |
| } | |
| } | |
| fun refresh() = load() | |
| }''', | |
| "tags": ["android", "kotlin", "viewmodel", "stateflow", "mvvm"], | |
| "description": "Android ViewModel with StateFlow and coroutines", | |
| }, | |
| } | |
| class KnowledgeSeeder: | |
| """ | |
| Seeds Vitalis's knowledge base with real developer patterns. | |
| Feeds TinyTrainer and/or the pattern library. | |
| """ | |
| def __init__(self, workspace: Optional[Path] = None): | |
| self.workspace = workspace or (Path.home() / ".vitalis_workspace") | |
| self.workspace.mkdir(parents=True, exist_ok=True) | |
| self.seed_path = self.workspace / "knowledge_seeds.json" | |
| self.index_path = self.workspace / "seed_index.json" | |
| def seed_all(self, verbose: bool = True) -> int: | |
| """Seed all built-in patterns. Returns count seeded.""" | |
| count = 0 | |
| existing = self._load_index() | |
| for key, seed in KNOWLEDGE_SEEDS.items(): | |
| seed_id = self._make_id(key, seed["template"]) | |
| if seed_id in existing: | |
| continue # already seeded | |
| self._write_seed(key, seed, seed_id) | |
| existing[seed_id] = {"key": key, "ts": time.time()} | |
| count += 1 | |
| if verbose: | |
| print(f"[SEEDER] ✓ {key} ({seed['category']})") | |
| self._save_index(existing) | |
| if verbose: | |
| print(f"\n[SEEDER] Done. {count} new patterns seeded. " | |
| f"Total in index: {len(existing)}") | |
| return count | |
| def seed_from_file(self, path: str, category: str = "custom") -> int: | |
| """Seed patterns from an external JSON file.""" | |
| p = Path(path) | |
| if not p.exists(): | |
| print(f"[SEEDER] File not found: {path}") | |
| return 0 | |
| with open(p) as f: | |
| data = json.load(f) | |
| count = 0 | |
| existing = self._load_index() | |
| for key, seed in data.items(): | |
| seed.setdefault("category", category) | |
| seed_id = self._make_id(key, seed.get("template", key)) | |
| if seed_id in existing: | |
| continue | |
| self._write_seed(key, seed, seed_id) | |
| existing[seed_id] = {"key": key, "ts": time.time()} | |
| count += 1 | |
| self._save_index(existing) | |
| print(f"[SEEDER] Seeded {count} patterns from {path}") | |
| return count | |
| def seed_from_codebase(self, source_dir: str, category: str = "learned") -> int: | |
| """ | |
| Extract patterns directly from an existing codebase. | |
| Vitalis learns from YOUR code. | |
| """ | |
| source = Path(source_dir) | |
| if not source.exists(): | |
| print(f"[SEEDER] Directory not found: {source_dir}") | |
| return 0 | |
| count = 0 | |
| existing = self._load_index() | |
| patterns = [] | |
| for py_file in source.rglob("*.py"): | |
| if any(x in str(py_file) for x in ["__pycache__", ".git", "venv", "node_modules"]): | |
| continue | |
| try: | |
| code = py_file.read_text(encoding="utf-8", errors="ignore") | |
| if len(code) < 100: | |
| continue | |
| key = f"learned_{py_file.stem}_{hashlib.md5(code[:200].encode()).hexdigest()[:6]}" | |
| seed = { | |
| "category": category, | |
| "pattern": py_file.stem, | |
| "template": code[:3000], # cap at 3k chars | |
| "tags": ["learned", category, py_file.stem], | |
| "description": f"Learned from {py_file.name}", | |
| "source_file": str(py_file), | |
| } | |
| seed_id = self._make_id(key, seed["template"]) | |
| if seed_id not in existing: | |
| self._write_seed(key, seed, seed_id) | |
| existing[seed_id] = {"key": key, "ts": time.time()} | |
| count += 1 | |
| except Exception as e: | |
| print(f"[SEEDER] Skip {py_file.name}: {e}") | |
| self._save_index(existing) | |
| print(f"[SEEDER] Learned {count} patterns from {source_dir}") | |
| return count | |
| def get_pattern(self, key: str) -> Optional[Dict]: | |
| """Retrieve a specific pattern by key.""" | |
| if self.seed_path.exists(): | |
| with open(self.seed_path) as f: | |
| seeds = json.load(f) | |
| return seeds.get(key) | |
| return None | |
| def search(self, query: str, top_k: int = 5) -> List[Dict]: | |
| """Simple tag/keyword search over seeded patterns.""" | |
| if not self.seed_path.exists(): | |
| return [] | |
| with open(self.seed_path) as f: | |
| seeds = json.load(f) | |
| q = query.lower() | |
| results = [] | |
| for key, seed in seeds.items(): | |
| score = 0 | |
| if q in key.lower(): score += 3 | |
| if q in seed.get("category", ""): score += 2 | |
| if q in seed.get("description", "").lower(): score += 2 | |
| for tag in seed.get("tags", []): | |
| if q in tag: score += 1 | |
| if q in seed.get("template", "").lower(): score += 1 | |
| if score > 0: | |
| results.append((score, key, seed)) | |
| results.sort(reverse=True) | |
| return [{"key": k, **s} for _, k, s in results[:top_k]] | |
| def status(self) -> Dict: | |
| """Return seeding status.""" | |
| index = self._load_index() | |
| cats: Dict[str, int] = {} | |
| if self.seed_path.exists(): | |
| with open(self.seed_path) as f: | |
| seeds = json.load(f) | |
| for s in seeds.values(): | |
| c = s.get("category", "unknown") | |
| cats[c] = cats.get(c, 0) + 1 | |
| return { | |
| "total_seeded": len(index), | |
| "categories": cats, | |
| "seed_file": str(self.seed_path), | |
| } | |
| def _make_id(self, key: str, template: str) -> str: | |
| h = hashlib.md5(f"{key}{template[:100]}".encode()).hexdigest()[:12] | |
| return h | |
| def _write_seed(self, key: str, seed: Dict, seed_id: str): | |
| seeds = {} | |
| if self.seed_path.exists(): | |
| with open(self.seed_path) as f: | |
| seeds = json.load(f) | |
| seeds[key] = {**seed, "_id": seed_id, "_seeded_at": time.time()} | |
| with open(self.seed_path, "w") as f: | |
| json.dump(seeds, f, indent=2) | |
| def _load_index(self) -> Dict: | |
| if self.index_path.exists(): | |
| with open(self.index_path) as f: | |
| return json.load(f) | |
| return {} | |
| def _save_index(self, index: Dict): | |
| with open(self.index_path, "w") as f: | |
| json.dump(index, f, indent=2) | |
| def feed_to_hippocampus(self) -> int: | |
| """ | |
| Push seeded knowledge into Vitalis's Hippocampus memory. | |
| Call this after seed_all() to make patterns retrievable by similarity. | |
| """ | |
| if not NUMPY_OK: | |
| print("[SEEDER] numpy not available — skipping hippocampus feed") | |
| return 0 | |
| try: | |
| from src.hippocampus import Hippocampus | |
| hc = Hippocampus() | |
| count = 0 | |
| if not self.seed_path.exists(): | |
| return 0 | |
| with open(self.seed_path) as f: | |
| seeds = json.load(f) | |
| for key, seed in seeds.items(): | |
| template = seed.get("template", key) | |
| seed_val = sum(ord(c) for c in key + template[:50]) | |
| np.random.seed(seed_val % (2**31)) | |
| vector = np.random.choice([-1, 1], size=10000).astype(np.int8) | |
| slot = f"seed_{key}" | |
| hc.store(slot, vector) | |
| count += 1 | |
| print(f"[SEEDER] Fed {count} patterns to Hippocampus.") | |
| return count | |
| except Exception as e: | |
| print(f"[SEEDER] Hippocampus feed failed: {e}") | |
| return 0 | |
| if __name__ == "__main__": | |
| import sys | |
| seeder = KnowledgeSeeder() | |
| if len(sys.argv) > 1: | |
| cmd = sys.argv[1] | |
| if cmd == "seed": | |
| seeder.seed_all(verbose=True) | |
| elif cmd == "learn" and len(sys.argv) > 2: | |
| seeder.seed_from_codebase(sys.argv[2]) | |
| elif cmd == "search" and len(sys.argv) > 2: | |
| results = seeder.search(sys.argv[2]) | |
| for r in results: | |
| print(f"\n── {r['key']} ({r['category']}) ──") | |
| print(r['description']) | |
| elif cmd == "status": | |
| print(json.dumps(seeder.status(), indent=2)) | |
| elif cmd == "hippocampus": | |
| seeder.feed_to_hippocampus() | |
| else: | |
| print("Commands: seed | learn <dir> | search <query> | status | hippocampus") | |
| seeder.seed_all() | |