FerrellSyntheticIntelligence
Add understanding engine, conversation interface, meditation engine, unified launcher
7d9e142 | #!/usr/bin/env python3 | |
"""
VITALIS CODEGEN ENGINE
Real code generation. Not stubs. Not scaffolds.
"scaffold auth" → complete JWT auth system with routes, middleware, models.
"""
| import re, os, json, time | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
# ── FULL CODE TEMPLATES ───────────────────────────────────────────────
# These generate working files, not stubs.
# Registry of built-in generators, keyed by generator name.
#
# Each entry is a dict with:
#   "description" - one-line human-readable summary,
#   "tags"        - keywords describing the generator (shown by ``list``),
#   "files"       - mapping of output path (relative to the engine's output
#                   root) to the full text written to that path.
#
# The file bodies are emitted verbatim, so their internal indentation is
# significant: the Python templates must stay syntactically valid on their own.
GENERATORS: Dict[str, Dict] = {
    "auth": {
        "description": "Complete JWT authentication system",
        "tags": ["auth", "jwt", "login", "register", "security"],
        "files": {
            "auth/middleware.py": '''import jwt, os
from datetime import datetime, timedelta, timezone
from functools import wraps
from flask import request, jsonify, g

SECRET = os.getenv("JWT_SECRET", "CHANGE_THIS_SECRET")
ALGORITHM = "HS256"


def create_token(user_id: int, role: str = "user", hours: int = 24) -> str:
    now = datetime.now(timezone.utc)
    return jwt.encode(
        {"sub": str(user_id), "role": role, "iat": now,
         "exp": now + timedelta(hours=hours)},
        SECRET, algorithm=ALGORITHM
    )


def decode_token(token: str) -> dict:
    return jwt.decode(token, SECRET, algorithms=[ALGORITHM])


def require_auth(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        raw = request.headers.get("Authorization", "")
        token = raw.replace("Bearer ", "").strip()
        if not token:
            return jsonify({"error": "Unauthorized"}), 401
        try:
            g.user = decode_token(token)
        except jwt.ExpiredSignatureError:
            return jsonify({"error": "Token expired"}), 401
        except jwt.InvalidTokenError:
            return jsonify({"error": "Invalid token"}), 401
        return f(*args, **kwargs)
    return decorated


def require_role(role: str):
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            if not hasattr(g, "user") or g.user.get("role") != role:
                return jsonify({"error": "Forbidden"}), 403
            return f(*args, **kwargs)
        return decorated
    return decorator
''',
            "auth/routes.py": '''from flask import Blueprint, request, jsonify
from werkzeug.security import generate_password_hash, check_password_hash
from .middleware import create_token, require_auth
from .models import User
from extensions import db

auth_bp = Blueprint("auth", __name__, url_prefix="/auth")


@auth_bp.post("/register")
def register():
    data = request.get_json(force=True)
    email = data.get("email", "").strip().lower()
    password = data.get("password", "")
    if not email or not password:
        return jsonify({"error": "Email and password required"}), 400
    if len(password) < 8:
        return jsonify({"error": "Password must be at least 8 characters"}), 400
    if User.query.filter_by(email=email).first():
        return jsonify({"error": "Email already registered"}), 409
    user = User(
        email=email,
        password_hash=generate_password_hash(password),
        role="user",
    )
    db.session.add(user)
    db.session.commit()
    token = create_token(user.id, user.role)
    return jsonify({"token": token, "user": user.to_dict()}), 201


@auth_bp.post("/login")
def login():
    data = request.get_json(force=True)
    email = data.get("email", "").strip().lower()
    password = data.get("password", "")
    user = User.query.filter_by(email=email).first()
    if not user or not check_password_hash(user.password_hash, password):
        return jsonify({"error": "Invalid credentials"}), 401
    token = create_token(user.id, user.role)
    return jsonify({"token": token, "user": user.to_dict()})


@auth_bp.get("/me")
@require_auth
def me():
    from flask import g
    user = User.query.get(int(g.user["sub"]))
    if not user:
        return jsonify({"error": "User not found"}), 404
    return jsonify({"user": user.to_dict()})


@auth_bp.post("/refresh")
@require_auth
def refresh():
    from flask import g
    new_token = create_token(int(g.user["sub"]), g.user.get("role", "user"))
    return jsonify({"token": new_token})
''',
            "auth/models.py": '''from datetime import datetime, timezone
from extensions import db


class User(db.Model):
    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(255), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(512), nullable=False)
    role = db.Column(db.String(50), default="user", nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
    last_login = db.Column(db.DateTime, nullable=True)

    def to_dict(self):
        return {
            "id": self.id,
            "email": self.email,
            "role": self.role,
            "is_active": self.is_active,
            "created_at": self.created_at.isoformat(),
        }

    def __repr__(self):
        return f"<User {self.email}>"
''',
            "auth/__init__.py": "from .routes import auth_bp\n__all__ = ['auth_bp']\n",
        }
    },
    "api": {
        "description": "Full REST API scaffold with Flask blueprints",
        "tags": ["api", "rest", "flask", "blueprint", "backend"],
        "files": {
            "api/app.py": '''import os
from flask import Flask, jsonify
from flask_cors import CORS
from extensions import db
from auth import auth_bp


def create_app(config: dict = None) -> Flask:
    app = Flask(__name__)
    app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv(
        "DATABASE_URL", "sqlite:///vitalis.db"
    )
    app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
    app.config["SECRET_KEY"] = os.getenv("SECRET_KEY", "dev-secret-change-me")
    if config:
        app.config.update(config)

    CORS(app, resources={r"/api/*": {"origins": "*"}})
    db.init_app(app)

    # Register blueprints
    app.register_blueprint(auth_bp)

    @app.errorhandler(404)
    def not_found(e):
        return jsonify({"error": "Not found"}), 404

    @app.errorhandler(500)
    def server_error(e):
        return jsonify({"error": "Internal server error"}), 500

    @app.get("/health")
    def health():
        return jsonify({"status": "ok", "service": "vitalis-api"})

    with app.app_context():
        db.create_all()

    return app


if __name__ == "__main__":
    app = create_app()
    app.run(debug=os.getenv("FLASK_DEBUG", "1") == "1", port=5000)
''',
            "api/extensions.py": "from flask_sqlalchemy import SQLAlchemy\ndb = SQLAlchemy()\n",
            "api/requirements.txt": (
                "flask>=3.0\nflask-sqlalchemy>=3.0\nflask-cors>=4.0\n"
                "pyjwt>=2.8\nwerkzeug>=3.0\npython-dotenv>=1.0\n"
            ),
            "api/.env.example": (
                "DATABASE_URL=sqlite:///vitalis.db\n"
                "JWT_SECRET=change-this-to-something-long-and-random\n"
                "SECRET_KEY=another-secret-change-this\n"
                "FLASK_DEBUG=1\n"
            ),
        }
    },
    "react-app": {
        "description": "React app with auth, routing, and API client",
        "tags": ["react", "frontend", "spa", "auth", "routing"],
        "files": {
            "frontend/src/api/client.js": '''const BASE_URL = process.env.REACT_APP_API_URL || "http://localhost:5000";

class APIClient {
  constructor() {
    this.base = BASE_URL;
  }

  _headers(extra = {}) {
    const token = localStorage.getItem("token");
    return {
      "Content-Type": "application/json",
      ...(token ? { Authorization: `Bearer ${token}` } : {}),
      ...extra,
    };
  }

  async _fetch(path, opts = {}) {
    const res = await fetch(`${this.base}${path}`, {
      ...opts,
      headers: this._headers(opts.headers),
    });
    const data = await res.json().catch(() => ({}));
    if (!res.ok) throw new Error(data.error || `HTTP ${res.status}`);
    return data;
  }

  get(path) { return this._fetch(path); }
  post(path, body) { return this._fetch(path, { method: "POST", body: JSON.stringify(body) }); }
  put(path, body) { return this._fetch(path, { method: "PUT", body: JSON.stringify(body) }); }
  delete(path) { return this._fetch(path, { method: "DELETE" }); }
}

export const api = new APIClient();
export default api;
''',
            "frontend/src/hooks/useAuth.js": '''import { useState, useEffect, useCallback } from "react";
import api from "../api/client";

export const useAuth = () => {
  const [user, setUser] = useState(null);
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState(null);

  const fetchMe = useCallback(async () => {
    const token = localStorage.getItem("token");
    if (!token) { setLoading(false); return; }
    try {
      const { user } = await api.get("/auth/me");
      setUser(user);
    } catch {
      localStorage.removeItem("token");
    } finally {
      setLoading(false);
    }
  }, []);

  useEffect(() => { fetchMe(); }, [fetchMe]);

  const login = async (email, password) => {
    setError(null);
    try {
      const { token, user } = await api.post("/auth/login", { email, password });
      localStorage.setItem("token", token);
      setUser(user);
      return user;
    } catch (err) {
      setError(err.message);
      throw err;
    }
  };

  const register = async (email, password) => {
    setError(null);
    try {
      const { token, user } = await api.post("/auth/register", { email, password });
      localStorage.setItem("token", token);
      setUser(user);
      return user;
    } catch (err) {
      setError(err.message);
      throw err;
    }
  };

  const logout = () => {
    localStorage.removeItem("token");
    setUser(null);
  };

  return { user, loading, error, login, register, logout, isAuthenticated: !!user };
};

export default useAuth;
''',
            "frontend/src/components/PrivateRoute.jsx": '''import { Navigate } from "react-router-dom";
import useAuth from "../hooks/useAuth";

const PrivateRoute = ({ children, requiredRole }) => {
  const { user, loading, isAuthenticated } = useAuth();
  if (loading) return <div className="loading">Loading...</div>;
  if (!isAuthenticated) return <Navigate to="/login" replace />;
  if (requiredRole && user?.role !== requiredRole) return <Navigate to="/" replace />;
  return children;
};

export default PrivateRoute;
''',
        }
    },
    "agent": {
        "description": "Autonomous agent loop with memory and tool use",
        "tags": ["agent", "autonomous", "loop", "tools", "memory"],
        "files": {
            "agents/base_agent.py": '''#!/usr/bin/env python3
"""
Vitalis Base Agent
Self-directed execution loop with memory, tools, and self-correction.
"""
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from pathlib import Path
import json


class BaseTool:
    name: str = "base_tool"
    description: str = "Override this"

    def run(self, **kwargs) -> Any:
        raise NotImplementedError


class AgentMemory:
    def __init__(self, path: Optional[Path] = None):
        self.path = path or (Path.home() / ".vitalis_workspace" / "agent_memory.json")
        self._store: List[Dict] = self._load()

    def _load(self) -> List[Dict]:
        if self.path.exists():
            try:
                return json.loads(self.path.read_text())
            except Exception:
                return []
        return []

    def add(self, role: str, content: str):
        self._store.append({"role": role, "content": content, "ts": time.time()})
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.path.write_text(json.dumps(self._store[-200:], indent=2))

    def recent(self, n: int = 10) -> List[Dict]:
        return self._store[-n:]

    def clear(self):
        self._store = []
        if self.path.exists():
            self.path.unlink()


class BaseAgent(ABC):
    MAX_STEPS = 20
    STEP_DELAY = 0.5

    def __init__(self, name: str = "VitalisAgent"):
        self.name = name
        self.memory = AgentMemory()
        self.tools: Dict[str, BaseTool] = {}
        self._steps = 0
        self._running = False

    def register_tool(self, tool: BaseTool):
        self.tools[tool.name] = tool
        print(f"[AGENT:{self.name}] Tool registered: {tool.name}")

    def use_tool(self, tool_name: str, **kwargs) -> Any:
        tool = self.tools.get(tool_name)
        if not tool:
            return f"ERROR: Tool '{tool_name}' not found. Available: {list(self.tools.keys())}"
        try:
            result = tool.run(**kwargs)
            self.memory.add("tool", f"{tool_name}({kwargs}) → {str(result)[:200]}")
            return result
        except Exception as e:
            err = f"Tool '{tool_name}' failed: {e}"
            self.memory.add("error", err)
            return err

    @abstractmethod
    def think(self, state: Dict) -> Dict:
        """Override: decide next action given current state."""
        pass

    @abstractmethod
    def act(self, decision: Dict) -> Any:
        """Override: execute the decided action."""
        pass

    @abstractmethod
    def is_done(self, state: Dict) -> bool:
        """Override: return True when goal is reached."""
        pass

    def run(self, goal: str, initial_state: Optional[Dict] = None) -> Any:
        print(f"[AGENT:{self.name}] Starting. Goal: {goal}")
        self.memory.add("system", f"Goal: {goal}")
        state = initial_state or {"goal": goal, "step": 0, "results": []}
        self._running = True
        self._steps = 0
        last_result = None
        while self._running and self._steps < self.MAX_STEPS:
            if self.is_done(state):
                print(f"[AGENT:{self.name}] Goal reached in {self._steps} steps.")
                break
            try:
                decision = self.think(state)
                result = self.act(decision)
                last_result = result
                state["step"] = self._steps
                state["results"].append({"step": self._steps, "decision": decision, "result": str(result)[:300]})
                self.memory.add("assistant", f"Step {self._steps}: {decision} → {str(result)[:150]}")
            except Exception as e:
                print(f"[AGENT:{self.name}] Step {self._steps} error: {e}")
                self.memory.add("error", str(e))
                state["last_error"] = str(e)
            self._steps += 1
            time.sleep(self.STEP_DELAY)
        if self._steps >= self.MAX_STEPS:
            print(f"[AGENT:{self.name}] Max steps reached ({self.MAX_STEPS}).")
        self._running = False
        return last_result

    def stop(self):
        self._running = False
        print(f"[AGENT:{self.name}] Stopped.")
''',
        }
    },
}
# ── INTENT PARSER ─────────────────────────────────────────────────────
# Keywords that vote for each generator key when they appear in a prompt.
INTENT_MAP: Dict[str, List[str]] = {
    "auth": ["auth", "authentication", "jwt", "login", "register", "token"],
    "api": ["api", "rest", "backend", "server", "flask", "express", "routes"],
    "react-app": ["react", "frontend", "spa", "ui", "component", "hooks"],
    "agent": ["agent", "autonomous", "loop", "tool", "self-directed"],
}


def parse_intent(prompt: str) -> Optional[str]:
    """Map a natural-language prompt to a generator key.

    Each key in ``INTENT_MAP`` scores one point per keyword found in the
    lower-cased prompt; the highest-scoring key wins, and on a tie the key
    that appears first in ``INTENT_MAP`` is kept.

    A keyword only counts when it begins a word (it is not preceded by a
    letter or digit). Plain substring matching let fragments inside
    unrelated words vote -- e.g. "ui" inside "build" made "build agent"
    resolve to "react-app". Anchoring only the left edge still matches
    inflected forms such as "authenticate", "tokens" or "components".

    Args:
        prompt: Free-form request text.

    Returns:
        The best-matching generator key, or ``None`` when no keyword matches.
    """
    text = prompt.lower()
    best_key: Optional[str] = None
    best_score = 0
    for key, keywords in INTENT_MAP.items():
        score = sum(
            1
            for kw in keywords
            # Negative lookbehind: the keyword must start at a word boundary.
            if re.search(rf"(?<![a-z0-9]){re.escape(kw)}", text)
        )
        # Strict comparison keeps the earliest key on ties.
        if score > best_score:
            best_key, best_score = key, score
    return best_key
# ── CODEGEN ENGINE ────────────────────────────────────────────────────
class CodegenEngine:
    """
    Turns intent into working code files.
    No stubs. No TODOs. Real output.

    Files come from the module-level ``GENERATORS`` registry and are written
    beneath ``output_root``. Every ``generate`` call is appended to a JSON log
    at ``~/.vitalis_workspace/codegen_log.json`` (capped at the last 100
    entries).
    """

    def __init__(self, output_root: Optional[str] = None):
        """Create an engine writing under *output_root* (default: the cwd).

        Creates the ``~/.vitalis_workspace`` directory if needed and loads any
        existing log so that earlier runs are not erased by the next write.
        """
        self.output_root = Path(output_root or os.getcwd())
        self.log_path = Path.home() / ".vitalis_workspace" / "codegen_log.json"
        self.log_path.parent.mkdir(parents=True, exist_ok=True)
        self._log: List[Dict] = self._load_log()

    def _load_log(self) -> List[Dict]:
        """Return previously persisted log entries, or ``[]`` if unusable."""
        try:
            data = json.loads(self.log_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Missing, unreadable or corrupt log: start a fresh history.
            return []
        return data if isinstance(data, list) else []

    @staticmethod
    def _substitute(text: str, variables: Optional[Dict[str, str]]) -> str:
        """Replace every ``{name}`` placeholder in *text* from *variables*."""
        if variables:
            for name, value in variables.items():
                # str() so numeric or other non-string values do not raise.
                text = text.replace(f"{{{name}}}", str(value))
        return text

    def generate(
        self,
        intent: str,
        dry_run: bool = False,
        variables: Optional[Dict[str, str]] = None,
    ) -> Dict:
        """
        Generate code from intent string.

        Args:
            intent: A generator key or a free-form description of the request.
            dry_run: When true, print a preview (first 800 characters) of each
                file instead of writing it.
            variables: Optional ``{name}`` placeholder substitutions applied to
                every file body.

        Returns:
            On success, a dict with ``key``, ``description``, ``files``
            (written, or previewed when *dry_run*), ``skipped`` (already
            existing paths), ``output_root`` and ``dry_run``. When no
            generator matches, a dict with ``error``, ``available`` and
            ``hint``.
        """
        normalized = intent.strip().lower().replace(" ", "-")
        if normalized in GENERATORS:
            # An exact key always wins, so generators registered through
            # add_generator() are not hijacked by keyword matching.
            key = normalized
        else:
            key = parse_intent(intent) or normalized
        gen = GENERATORS.get(key)
        if gen is None:
            available = list(GENERATORS.keys())
            return {
                "error": f"No generator for '{intent}'",
                "available": available,
                "hint": f"Try one of: {', '.join(available)}",
            }

        written: List[str] = []
        skipped: List[str] = []
        rule = "─" * 60
        for rel_path, content in gen["files"].items():
            content = self._substitute(content, variables)
            full_path = self.output_root / rel_path
            if dry_run:
                print(f"\n{rule}")
                print(f"FILE: {rel_path}")
                print(rule)
                print(content[:800] + ("\n..." if len(content) > 800 else ""))
                written.append(rel_path)
                continue
            if full_path.exists():
                # Never overwrite files the user may have edited.
                skipped.append(rel_path)
                print(f"[CODEGEN] Skip (exists): {rel_path}")
                continue
            full_path.parent.mkdir(parents=True, exist_ok=True)
            full_path.write_text(content, encoding="utf-8")
            written.append(rel_path)
            print(f"[CODEGEN] ✓ Written: {rel_path}")

        result = {
            "key": key,
            "description": gen["description"],
            "files": written,
            "skipped": skipped,
            "output_root": str(self.output_root),
            "dry_run": dry_run,
        }
        self._log.append({"ts": time.time(), **result})
        self.log_path.write_text(
            json.dumps(self._log[-100:], indent=2), encoding="utf-8"
        )
        return result

    def generate_snippet(self, intent: str, variables: Optional[Dict[str, str]] = None) -> str:
        """
        Return a single code snippet for the intent (no file writing).
        Useful for inline IDE suggestions.

        Looks the intent up through ``KnowledgeSeeder.search`` first, then
        falls back to the first ``KNOWLEDGE_SEEDS`` entry with a tag contained
        in the lower-cased intent. Returns a short "no snippet" comment when
        neither yields a template.
        """
        # Imported lazily so the rest of the engine works without this module.
        from src.knowledge_seeder import KnowledgeSeeder, KNOWLEDGE_SEEDS

        results = KnowledgeSeeder().search(intent, top_k=1)
        if results:
            return self._substitute(results[0].get("template", ""), variables)

        lowered = intent.lower()
        for seed in KNOWLEDGE_SEEDS.values():
            if any(tag in lowered for tag in seed.get("tags", [])):
                return self._substitute(seed["template"], variables)
        return f"# No snippet found for: {intent}\n# Try: auth, api, react, async, websocket, agent\n"

    def list_generators(self) -> List[Dict]:
        """List all available generators."""
        return [
            {
                "key": key,
                "description": spec["description"],
                "tags": spec.get("tags", []),
                "files": list(spec["files"].keys()),
            }
            for key, spec in GENERATORS.items()
        ]

    def add_generator(
        self,
        key: str,
        description: str,
        files: Dict[str, str],
        tags: Optional[List[str]] = None,
    ):
        """Register a custom generator at runtime.

        The entry is stored in the module-level ``GENERATORS`` registry, so it
        is visible to every engine instance in the process. An existing entry
        with the same *key* is replaced.
        """
        GENERATORS[key] = {"description": description, "files": files, "tags": tags or []}
        print(f"[CODEGEN] Generator registered: {key}")
def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point for the codegen engine.

    Args:
        argv: Arguments after the program name; defaults to ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success (including the bare usage screen),
        1 for an unknown command, a missing intent or a generator error.
    """
    import sys

    args = sys.argv[1:] if argv is None else list(argv)
    engine = CodegenEngine()

    if not args:
        print("Usage: python3 -m src.codegen_engine <command> [args]")
        print("\nCommands:")
        print("  list              — list all generators")
        print("  generate <intent> — generate files for intent")
        print("  preview <intent>  — dry-run, print files without writing")
        print("  snippet <intent>  — print a single code snippet")
        return 0

    cmd = args[0]
    intent = " ".join(args[1:])

    if cmd == "list":
        for g in engine.list_generators():
            print(f"\n── {g['key']} ──")
            print(f"  {g['description']}")
            print(f"  Files: {', '.join(g['files'])}")
            print(f"  Tags: {', '.join(g['tags'])}")
        return 0

    if cmd not in ("generate", "preview", "snippet"):
        print(f"Unknown command: {cmd}")
        return 1

    if not intent:
        # A known command without its argument is not an "unknown command".
        print(f"Missing <intent> for command: {cmd}")
        return 1

    if cmd == "snippet":
        print(engine.generate_snippet(intent))
        return 0

    result = engine.generate(intent, dry_run=(cmd == "preview"))
    if "error" in result:
        print(f"[!] {result['error']}")
        print(f"    {result.get('hint', '')}")
        return 1
    if cmd == "generate":
        print(f"\n[✓] Generated {len(result['files'])} files for: {result['key']}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())