#!/usr/bin/env python3 """ VITALIS CODEGEN ENGINE Real code generation. Not stubs. Not scaffolds. "scaffold auth" → complete JWT auth system with routes, middleware, models. """ import re, os, json, time from pathlib import Path from typing import Dict, List, Optional, Tuple # ── FULL CODE TEMPLATES ─────────────────────────────────────────────── # These generate working files, not stubs. GENERATORS: Dict[str, Dict] = { "auth": { "description": "Complete JWT authentication system", "tags": ["auth", "jwt", "login", "register", "security"], "files": { "auth/middleware.py": '''import jwt, os from datetime import datetime, timedelta, timezone from functools import wraps from flask import request, jsonify, g SECRET = os.getenv("JWT_SECRET", "CHANGE_THIS_SECRET") ALGORITHM = "HS256" def create_token(user_id: int, role: str = "user", hours: int = 24) -> str: now = datetime.now(timezone.utc) return jwt.encode( {"sub": str(user_id), "role": role, "iat": now, "exp": now + timedelta(hours=hours)}, SECRET, algorithm=ALGORITHM ) def decode_token(token: str) -> dict: return jwt.decode(token, SECRET, algorithms=[ALGORITHM]) def require_auth(f): @wraps(f) def decorated(*args, **kwargs): raw = request.headers.get("Authorization", "") token = raw.replace("Bearer ", "").strip() if not token: return jsonify({"error": "Unauthorized"}), 401 try: g.user = decode_token(token) except jwt.ExpiredSignatureError: return jsonify({"error": "Token expired"}), 401 except jwt.InvalidTokenError: return jsonify({"error": "Invalid token"}), 401 return f(*args, **kwargs) return decorated def require_role(role: str): def decorator(f): @wraps(f) def decorated(*args, **kwargs): if not hasattr(g, "user") or g.user.get("role") != role: return jsonify({"error": "Forbidden"}), 403 return f(*args, **kwargs) return decorated return decorator ''', "auth/routes.py": '''from flask import Blueprint, request, jsonify from werkzeug.security import generate_password_hash, check_password_hash from .middleware import create_token, require_auth from .models import User from extensions import db auth_bp = Blueprint("auth", __name__, url_prefix="/auth") @auth_bp.post("/register") def register(): data = request.get_json(force=True) email = data.get("email", "").strip().lower() password = data.get("password", "") if not email or not password: return jsonify({"error": "Email and password required"}), 400 if len(password) < 8: return jsonify({"error": "Password must be at least 8 characters"}), 400 if User.query.filter_by(email=email).first(): return jsonify({"error": "Email already registered"}), 409 user = User( email=email, password_hash=generate_password_hash(password), role="user", ) db.session.add(user) db.session.commit() token = create_token(user.id, user.role) return jsonify({"token": token, "user": user.to_dict()}), 201 @auth_bp.post("/login") def login(): data = request.get_json(force=True) email = data.get("email", "").strip().lower() password = data.get("password", "") user = User.query.filter_by(email=email).first() if not user or not check_password_hash(user.password_hash, password): return jsonify({"error": "Invalid credentials"}), 401 token = create_token(user.id, user.role) return jsonify({"token": token, "user": user.to_dict()}) @auth_bp.get("/me") @require_auth def me(): from flask import g user = User.query.get(int(g.user["sub"])) if not user: return jsonify({"error": "User not found"}), 404 return jsonify({"user": user.to_dict()}) @auth_bp.post("/refresh") @require_auth def refresh(): from flask import g new_token = create_token(int(g.user["sub"]), g.user.get("role", "user")) return jsonify({"token": new_token}) ''', "auth/models.py": '''from datetime import datetime, timezone from extensions import db class User(db.Model): __tablename__ = "users" id = db.Column(db.Integer, primary_key=True) email = db.Column(db.String(255), unique=True, nullable=False, index=True) password_hash = db.Column(db.String(512), nullable=False) role = db.Column(db.String(50), default="user", nullable=False) is_active = db.Column(db.Boolean, default=True, nullable=False) created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc)) last_login = db.Column(db.DateTime, nullable=True) def to_dict(self): return { "id": self.id, "email": self.email, "role": self.role, "is_active": self.is_active, "created_at": self.created_at.isoformat(), } def __repr__(self): return f"" ''', "auth/__init__.py": "from .routes import auth_bp\n__all__ = ['auth_bp']\n", } }, "api": { "description": "Full REST API scaffold with Flask blueprints", "tags": ["api", "rest", "flask", "blueprint", "backend"], "files": { "api/app.py": '''import os from flask import Flask, jsonify from flask_cors import CORS from extensions import db from auth import auth_bp def create_app(config: dict = None) -> Flask: app = Flask(__name__) app.config["SQLALCHEMY_DATABASE_URI"] = os.getenv( "DATABASE_URL", "sqlite:///vitalis.db" ) app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False app.config["SECRET_KEY"] = os.getenv("SECRET_KEY", "dev-secret-change-me") if config: app.config.update(config) CORS(app, resources={r"/api/*": {"origins": "*"}}) db.init_app(app) # Register blueprints app.register_blueprint(auth_bp) @app.errorhandler(404) def not_found(e): return jsonify({"error": "Not found"}), 404 @app.errorhandler(500) def server_error(e): return jsonify({"error": "Internal server error"}), 500 @app.get("/health") def health(): return jsonify({"status": "ok", "service": "vitalis-api"}) with app.app_context(): db.create_all() return app if __name__ == "__main__": app = create_app() app.run(debug=os.getenv("FLASK_DEBUG", "1") == "1", port=5000) ''', "api/extensions.py": "from flask_sqlalchemy import SQLAlchemy\ndb = SQLAlchemy()\n", "api/requirements.txt": ( "flask>=3.0\nflask-sqlalchemy>=3.0\nflask-cors>=4.0\n" "pyjwt>=2.8\nwerkzeug>=3.0\npython-dotenv>=1.0\n" ), "api/.env.example": ( "DATABASE_URL=sqlite:///vitalis.db\n" "JWT_SECRET=change-this-to-something-long-and-random\n" "SECRET_KEY=another-secret-change-this\n" "FLASK_DEBUG=1\n" ), } }, "react-app": { "description": "React app with auth, routing, and API client", "tags": ["react", "frontend", "spa", "auth", "routing"], "files": { "frontend/src/api/client.js": '''const BASE_URL = process.env.REACT_APP_API_URL || "http://localhost:5000"; class APIClient { constructor() { this.base = BASE_URL; } _headers(extra = {}) { const token = localStorage.getItem("token"); return { "Content-Type": "application/json", ...(token ? { Authorization: `Bearer ${token}` } : {}), ...extra, }; } async _fetch(path, opts = {}) { const res = await fetch(`${this.base}${path}`, { ...opts, headers: this._headers(opts.headers), }); const data = await res.json().catch(() => ({})); if (!res.ok) throw new Error(data.error || `HTTP ${res.status}`); return data; } get(path) { return this._fetch(path); } post(path, body) { return this._fetch(path, { method: "POST", body: JSON.stringify(body) }); } put(path, body) { return this._fetch(path, { method: "PUT", body: JSON.stringify(body) }); } delete(path) { return this._fetch(path, { method: "DELETE" }); } } export const api = new APIClient(); export default api; ''', "frontend/src/hooks/useAuth.js": '''import { useState, useEffect, useCallback } from "react"; import api from "../api/client"; export const useAuth = () => { const [user, setUser] = useState(null); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); const fetchMe = useCallback(async () => { const token = localStorage.getItem("token"); if (!token) { setLoading(false); return; } try { const { user } = await api.get("/auth/me"); setUser(user); } catch { localStorage.removeItem("token"); } finally { setLoading(false); } }, []); useEffect(() => { fetchMe(); }, [fetchMe]); const login = async (email, password) => { setError(null); try { const { token, user } = await api.post("/auth/login", { email, password }); localStorage.setItem("token", token); setUser(user); return user; } catch (err) { setError(err.message); throw err; } }; const register = async (email, password) => { setError(null); try { const { token, user } = await api.post("/auth/register", { email, password }); localStorage.setItem("token", token); setUser(user); return user; } catch (err) { setError(err.message); throw err; } }; const logout = () => { localStorage.removeItem("token"); setUser(null); }; return { user, loading, error, login, register, logout, isAuthenticated: !!user }; }; export default useAuth; ''', "frontend/src/components/PrivateRoute.jsx": '''import { Navigate } from "react-router-dom"; import useAuth from "../hooks/useAuth"; const PrivateRoute = ({ children, requiredRole }) => { const { user, loading, isAuthenticated } = useAuth(); if (loading) return
Loading...
; if (!isAuthenticated) return ; if (requiredRole && user?.role !== requiredRole) return ; return children; }; export default PrivateRoute; ''', } }, "agent": { "description": "Autonomous agent loop with memory and tool use", "tags": ["agent", "autonomous", "loop", "tools", "memory"], "files": { "agents/base_agent.py": '''#!/usr/bin/env python3 """ Vitalis Base Agent Self-directed execution loop with memory, tools, and self-correction. """ import time from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional from pathlib import Path import json class BaseTool: name: str = "base_tool" description: str = "Override this" def run(self, **kwargs) -> Any: raise NotImplementedError class AgentMemory: def __init__(self, path: Optional[Path] = None): self.path = path or (Path.home() / ".vitalis_workspace" / "agent_memory.json") self._store: List[Dict] = self._load() def _load(self) -> List[Dict]: if self.path.exists(): try: return json.loads(self.path.read_text()) except Exception: return [] return [] def add(self, role: str, content: str): self._store.append({"role": role, "content": content, "ts": time.time()}) self.path.parent.mkdir(parents=True, exist_ok=True) self.path.write_text(json.dumps(self._store[-200:], indent=2)) def recent(self, n: int = 10) -> List[Dict]: return self._store[-n:] def clear(self): self._store = [] if self.path.exists(): self.path.unlink() class BaseAgent(ABC): MAX_STEPS = 20 STEP_DELAY = 0.5 def __init__(self, name: str = "VitalisAgent"): self.name = name self.memory = AgentMemory() self.tools: Dict[str, BaseTool] = {} self._steps = 0 self._running = False def register_tool(self, tool: BaseTool): self.tools[tool.name] = tool print(f"[AGENT:{self.name}] Tool registered: {tool.name}") def use_tool(self, tool_name: str, **kwargs) -> Any: tool = self.tools.get(tool_name) if not tool: return f"ERROR: Tool '{tool_name}' not found. Available: {list(self.tools.keys())}" try: result = tool.run(**kwargs) self.memory.add("tool", f"{tool_name}({kwargs}) → {str(result)[:200]}") return result except Exception as e: err = f"Tool '{tool_name}' failed: {e}" self.memory.add("error", err) return err @abstractmethod def think(self, state: Dict) -> Dict: """Override: decide next action given current state.""" pass @abstractmethod def act(self, decision: Dict) -> Any: """Override: execute the decided action.""" pass @abstractmethod def is_done(self, state: Dict) -> bool: """Override: return True when goal is reached.""" pass def run(self, goal: str, initial_state: Optional[Dict] = None) -> Any: print(f"[AGENT:{self.name}] Starting. Goal: {goal}") self.memory.add("system", f"Goal: {goal}") state = initial_state or {"goal": goal, "step": 0, "results": []} self._running = True self._steps = 0 last_result = None while self._running and self._steps < self.MAX_STEPS: if self.is_done(state): print(f"[AGENT:{self.name}] Goal reached in {self._steps} steps.") break try: decision = self.think(state) result = self.act(decision) last_result = result state["step"] = self._steps state["results"].append({"step": self._steps, "decision": decision, "result": str(result)[:300]}) self.memory.add("assistant", f"Step {self._steps}: {decision} → {str(result)[:150]}") except Exception as e: print(f"[AGENT:{self.name}] Step {self._steps} error: {e}") self.memory.add("error", str(e)) state["last_error"] = str(e) self._steps += 1 time.sleep(self.STEP_DELAY) if self._steps >= self.MAX_STEPS: print(f"[AGENT:{self.name}] Max steps reached ({self.MAX_STEPS}).") self._running = False return last_result def stop(self): self._running = False print(f"[AGENT:{self.name}] Stopped.") ''', } }, } # ── INTENT PARSER ───────────────────────────────────────────────────── INTENT_MAP: Dict[str, List[str]] = { "auth": ["auth", "authentication", "jwt", "login", "register", "token"], "api": ["api", "rest", "backend", "server", "flask", "express", "routes"], "react-app": ["react", "frontend", "spa", "ui", "component", "hooks"], "agent": ["agent", "autonomous", "loop", "tool", "self-directed"], } def parse_intent(prompt: str) -> Optional[str]: """Map a natural-language prompt to a generator key.""" p = prompt.lower() best_key, best_score = None, 0 for key, keywords in INTENT_MAP.items(): score = sum(1 for kw in keywords if kw in p) if score > best_score: best_score, best_key = score, key return best_key if best_score > 0 else None # ── CODEGEN ENGINE ───────────────────────────────────────────────────── class CodegenEngine: """ Turns intent into working code files. No stubs. No TODOs. Real output. """ def __init__(self, output_root: Optional[str] = None): self.output_root = Path(output_root or os.getcwd()) self.log_path = Path.home() / ".vitalis_workspace" / "codegen_log.json" self.log_path.parent.mkdir(parents=True, exist_ok=True) self._log: List[Dict] = [] def generate( self, intent: str, dry_run: bool = False, variables: Optional[Dict[str, str]] = None, ) -> Dict: """ Generate code from intent string. """ key = parse_intent(intent) if key is None: key = intent.strip().lower().replace(" ", "-") gen = GENERATORS.get(key) if gen is None: available = list(GENERATORS.keys()) return { "error": f"No generator for '{intent}'", "available": available, "hint": f"Try one of: {', '.join(available)}", } written: List[str] = [] skipped: List[str] = [] for rel_path, content in gen["files"].items(): if variables: for k, v in variables.items(): content = content.replace(f"{{{k}}}", v) full_path = self.output_root / rel_path if dry_run: print(f"\n{'─'*60}") print(f"FILE: {rel_path}") print('─'*60) print(content[:800] + ("\n..." if len(content) > 800 else "")) written.append(rel_path) continue if full_path.exists(): skipped.append(rel_path) print(f"[CODEGEN] Skip (exists): {rel_path}") continue full_path.parent.mkdir(parents=True, exist_ok=True) full_path.write_text(content, encoding="utf-8") written.append(rel_path) print(f"[CODEGEN] ✓ Written: {rel_path}") result = { "key": key, "description": gen["description"], "files": written, "skipped": skipped, "output_root": str(self.output_root), "dry_run": dry_run, } self._log.append({"ts": time.time(), **result}) self.log_path.write_text(json.dumps(self._log[-100:], indent=2)) return result def generate_snippet(self, intent: str, variables: Optional[Dict[str, str]] = None) -> str: """ Return a single code snippet for the intent (no file writing). Useful for inline IDE suggestions. """ from src.knowledge_seeder import KnowledgeSeeder, KNOWLEDGE_SEEDS seeder = KnowledgeSeeder() results = seeder.search(intent, top_k=1) if results: template = results[0].get("template", "") if variables: for k, v in variables.items(): template = template.replace(f"{{{k}}}", v) return template for key, seed in KNOWLEDGE_SEEDS.items(): if any(tag in intent.lower() for tag in seed.get("tags", [])): template = seed["template"] if variables: for k, v in variables.items(): template = template.replace(f"{{{k}}}", v) return template return f"# No snippet found for: {intent}\n# Try: auth, api, react, async, websocket, agent\n" def list_generators(self) -> List[Dict]: """List all available generators.""" return [ {"key": k, "description": v["description"], "tags": v.get("tags", []), "files": list(v["files"].keys())} for k, v in GENERATORS.items() ] def add_generator(self, key: str, description: str, files: Dict[str, str], tags: List[str] = None): """Register a custom generator at runtime.""" GENERATORS[key] = {"description": description, "files": files, "tags": tags or []} print(f"[CODEGEN] Generator registered: {key}") if __name__ == "__main__": import sys engine = CodegenEngine() if len(sys.argv) < 2: print("Usage: python3 -m src.codegen_engine [args]") print("\nCommands:") print(" list — list all generators") print(" generate — generate files for intent") print(" preview — dry-run, print files without writing") print(" snippet — print a single code snippet") sys.exit(0) cmd = sys.argv[1] if cmd == "list": for g in engine.list_generators(): print(f"\n── {g['key']} ──") print(f" {g['description']}") print(f" Files: {', '.join(g['files'])}") print(f" Tags: {', '.join(g['tags'])}") elif cmd == "generate" and len(sys.argv) > 2: intent = " ".join(sys.argv[2:]) result = engine.generate(intent) if "error" in result: print(f"[!] {result['error']}") print(f" {result.get('hint', '')}") else: print(f"\n[✓] Generated {len(result['files'])} files for: {result['key']}") elif cmd == "preview" and len(sys.argv) > 2: intent = " ".join(sys.argv[2:]) engine.generate(intent, dry_run=True) elif cmd == "snippet" and len(sys.argv) > 2: intent = " ".join(sys.argv[2:]) print(engine.generate_snippet(intent)) else: print(f"Unknown command: {cmd}")