"""Bee Autopilot — Autonomous Self-Improvement Orchestrator. Runs continuously: 1. Transfers weights from pretrained models (bootstrap) 2. Activates LoRA domain adapters 3. Generates synthetic training data via self-play 4. Trains adapters on synthetic + real data 5. Evaluates and swaps in better adapters 6. Saves checkpoints 7. Repeats This is the "brain stem" of Bee — it never stops learning. """ import argparse import json import logging import os import sys import time from pathlib import Path import torch import torch.nn.functional as F from datasets import load_dataset from transformers import AutoModelForCausalLM, AutoTokenizer sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from bee.register import register from bee.config import BeeConfig from bee.modeling_bee import BeeForCausalLM from bee.lora_adapter import DomainLoRAManager, LoRAConfig from bee.self_play import SelfPlayEngine from bee.weight_transfer import transfer_weights # Quantum-enhanced training sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "bee")) try: from bee.quantum_trainer import QuantumEnhancedTrainer, QuantumHyperparams from bee.quantum_ibm import BeeIBMQuantumClient QUANTUM_AVAILABLE = True except Exception: QuantumEnhancedTrainer = None QUANTUM_AVAILABLE = False logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(name)s | %(message)s") logger = logging.getLogger("bee.autopilot") class Autopilot: """Autonomous training loop for Bee.""" def __init__( self, model: BeeForCausalLM, tokenizer: AutoTokenizer, device: str = "cpu", domains: list = None, lora_config: LoRAConfig = None, checkpoint_dir: str = "./autopilot_checkpoints", use_quantum: bool = False, # Default OFF — IBM free tier = ~10 min/month ): self.model = model self.tokenizer = tokenizer self.device = device self.domains = domains or ["general", "programming", "math", "science"] self.lora_config = lora_config or LoRAConfig(r=8, alpha=16, dropout=0.05) self.checkpoint_dir = checkpoint_dir os.makedirs(checkpoint_dir, exist_ok=True) # Quantum is DISABLED by default — user must explicitly pass use_quantum=True # IBM free tier = ~10 min/month. Auto-submission wastes this precious resource. self.use_quantum = use_quantum and QUANTUM_AVAILABLE self._quantum_explicitly_requested = use_quantum self.quantum_trainer: QuantumEnhancedTrainer | None = None if self.use_quantum: try: self.quantum_trainer = QuantumEnhancedTrainer( model=model, device=device, ) logger.info( "Quantum-enhanced training ENABLED — " "IBM Quantum Heron r2 (156 qubits, 15mK). " "NOTE: ~10 min free tier/month — each job uses 10-60s" ) except Exception as e: logger.warning("Quantum trainer failed to init: %s", e) self.use_quantum = False else: if self._quantum_explicitly_requested and not QUANTUM_AVAILABLE: logger.warning( "Quantum requested but unavailable (qiskit/ibm_runtime not installed)" ) logger.info("Quantum-enhanced training DISABLED (pass use_quantum=True to enable)") self.lora_manager = DomainLoRAManager(model, self.lora_config) for domain in self.domains: self.lora_manager.add_adapter(domain) self.self_play = SelfPlayEngine( model=model, tokenizer=tokenizer, device=device, max_new_tokens=128, temperature=0.8, ) self.step_count = 0 self.interaction_buffer: list = [] # Real user interactions self.loss_history: list = [] self.val_loss_history: list = [] def bootstrap_from_pretrained(self, source_id: str = "HuggingFaceTB/SmolLM2-135M"): """Transfer weights from a pretrained model.""" logger.info("Bootstrapping from %s", source_id) # Re-build model with compatible config cfg = BeeConfig( vocab_size=self.tokenizer.vocab_size, hidden_size=512, num_hidden_layers=8, num_attention_heads=8, intermediate_size=1024, max_position_embeddings=2048, ) self.model = transfer_weights(source_id, cfg, self.device) self.self_play.model = self.model # Quantum-enhanced: re-initialize with certified quantum randomness if self.use_quantum and self.quantum_trainer: logger.info("Applying quantum random weight initialization...") n_layers = self.quantum_trainer.quantum_initialize_model() logger.info("Quantum-initialized %d layers via IBM hardware", n_layers) logger.info("Bootstrap complete") def train_domain_adapter( self, domain: str, num_steps: int = 50, batch_size: int = 2, learning_rate: float = 5e-4, use_synthetic: bool = True, ) -> float: """Train a domain LoRA adapter with quantum enhancements.""" self.lora_manager.activate_domain(domain) # Quantum HPO: optimize hyperparameters once at startup hparams = None if self.use_quantum and self.quantum_trainer and self.step_count == 0: logger.info("Running quantum hyperparameter optimization (QAOA)...") try: hparams = self.quantum_trainer.optimize_hyperparameters() logger.info( "Quantum-optimized: rank=%d lr=%.0e batch=%d dropout=%.1f wd=%.2f", hparams.lora_rank, hparams.learning_rate, hparams.batch_size, hparams.dropout, hparams.weight_decay, ) learning_rate = hparams.learning_rate batch_size = hparams.batch_size except Exception as e: logger.warning("Quantum HPO failed (rate limit?), using defaults: %s", e) # Collect only adapter parameters for training params_to_train = [] for name, module in self.model.named_modules(): if domain in str(name) or any( hasattr(module, attr) for attr in ["lora_A", "lora_B"] ): for p in module.parameters(): if p.requires_grad: params_to_train.append(p) # Fallback: find all LoRA params if not params_to_train: params_to_train = [] for _, lora in self.lora_manager.adapters[domain].items(): params_to_train.extend([lora.lora_A, lora.lora_B]) optimizer = torch.optim.AdamW(params_to_train, lr=learning_rate) # Get training data texts = [] if use_synthetic: # Generate synthetic data via self-play contexts = self._get_contexts(domain, n=10) synthetic = self.self_play.generate_training_batch(contexts, batch_size=batch_size) for ex in synthetic: if ex["score"] > 0.5: texts.append(f"Q: {ex['question']}\nA: {ex['generated_answer']}") # Add real interactions texts.extend([f"Q: {q}\nA: {a}" for q, a in self.interaction_buffer[-50:]]) if not texts: logger.warning("No training data for domain %s, skipping", domain) return 0.0 # Training loop total_loss = 0.0 self.model.train() for step in range(num_steps): text = random.choice(texts) inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=256).to(self.device) if inputs["input_ids"].shape[1] < 4: continue optimizer.zero_grad() outputs = self.model(**inputs) logits = outputs.logits if hasattr(outputs, "logits") else outputs[0] shift_logits = logits[:, :-1, :].contiguous().view(-1, logits.size(-1)) shift_labels = inputs["input_ids"][:, 1:].contiguous().view(-1) loss = F.cross_entropy(shift_logits, shift_labels) loss.backward() # Quantum enhancement: add certified quantum noise to gradients # Applied once per training call (not per step) to respect IBM rate limits if self.use_quantum and self.quantum_trainer and step == 0: logger.info("Injecting quantum-certified gradient noise...") for param in params_to_train: if param.grad is not None and param.grad.numel() > 0: qnoise = self.quantum_trainer.qrng.randn_tensor( param.grad.shape, device=param.grad.device ) grad_std = param.grad.std().item() qnoise = qnoise * (grad_std * 0.01) param.grad.add_(qnoise) torch.nn.utils.clip_grad_norm_(params_to_train, 1.0) optimizer.step() total_loss += loss.item() avg_loss = total_loss / max(num_steps, 1) logger.info("Domain %s training: avg_loss=%.4f", domain, avg_loss) return avg_loss def _get_contexts(self, domain: str, n: int = 10) -> list: """Get document contexts for a domain.""" try: if domain == "programming": ds = load_dataset("codeparrot/github-code", "Python", split="train", streaming=True) elif domain == "math": ds = load_dataset("hendrycks/competition_math", split="train", streaming=True) else: ds = load_dataset("roneneldan/TinyStories", split="train", streaming=True) return [ex.get("text", ex.get("content", ""))[:500] for ex in ds.take(n)] except Exception as e: logger.warning("Failed to load domain data for %s: %s", domain, e) # Fallback: generate synthetic contexts return [f"This is a sample document about {domain}. " * 20 for _ in range(n)] def run_autonomous_loop( self, max_iterations: int = 1000, steps_per_iteration: int = 10, eval_every: int = 10, save_every: int = 20, ): """Main autonomous learning loop.""" logger.info("=" * 60) logger.info("BEE AUTOPILOT STARTING") logger.info("=" * 60) logger.info("Domains: %s", self.domains) logger.info("LoRA rank: %d", self.lora_config.r) logger.info("Max iterations: %d", max_iterations) for iteration in range(max_iterations): self.step_count = iteration logger.info("\n--- Iteration %d ---", iteration) # Train each domain adapter for domain in self.domains: loss = self.train_domain_adapter(domain, num_steps=steps_per_iteration) self.loss_history.append({ "iteration": iteration, "domain": domain, "loss": loss, }) # Evaluation if iteration % eval_every == 0: self._evaluate() # Save checkpoint if iteration % save_every == 0 and iteration > 0: self._save_checkpoint(iteration) # Brief pause to prevent overheating time.sleep(1) logger.info("Autopilot complete after %d iterations", max_iterations) self._save_checkpoint("final") def _evaluate(self): """Quick evaluation: generate text and track validation loss.""" self.model.eval() prompt = "The key to artificial intelligence is" inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device) with torch.no_grad(): out = self.model.generate( **inputs, max_new_tokens=30, do_sample=True, temperature=0.8, pad_token_id=self.tokenizer.pad_token_id, ) generated = self.tokenizer.decode(out[0], skip_special_tokens=True) logger.info("Sample generation: %s", generated[:100]) # Track validation-like loss for quantum HPO feedback with torch.no_grad(): outputs = self.model(**inputs) logits = outputs.logits if hasattr(outputs, "logits") else outputs[0] shift_logits = logits[:, :-1, :].contiguous().view(-1, logits.size(-1)) shift_labels = inputs["input_ids"][:, 1:].contiguous().view(-1) val_loss = F.cross_entropy(shift_logits, shift_labels).item() self.val_loss_history.append(val_loss) if self.quantum_trainer: self.quantum_trainer.validation_history = self.val_loss_history logger.info("Validation loss: %.4f", val_loss) self.model.train() def _save_checkpoint(self, iteration): """Save model and adapters.""" ckpt_dir = os.path.join(self.checkpoint_dir, f"iter_{iteration}") os.makedirs(ckpt_dir, exist_ok=True) # Save base model self.model.save_pretrained(ckpt_dir) self.tokenizer.save_pretrained(ckpt_dir) # Save adapters for domain in self.domains: adapter_dir = os.path.join(ckpt_dir, f"adapter_{domain}") self.lora_manager.save_adapter(domain, adapter_dir) # Save training history with open(os.path.join(ckpt_dir, "history.json"), "w") as f: json.dump(self.loss_history, f, indent=2) logger.info("Checkpoint saved to %s", ckpt_dir) def add_interaction(self, prompt: str, response: str, feedback: float = 0.0): """Add a real user interaction to the training buffer.""" self.interaction_buffer.append((prompt, response, feedback)) if len(self.interaction_buffer) > 1000: self.interaction_buffer = self.interaction_buffer[-500:] logger.info("Added interaction (buffer size: %d)", len(self.interaction_buffer)) def main(): parser = argparse.ArgumentParser() parser.add_argument("--bootstrap", type=str, default="HuggingFaceTB/SmolLM2-135M", help="Pretrained model to bootstrap from") parser.add_argument("--device", type=str, default="mps" if torch.backends.mps.is_available() else "cpu") parser.add_argument("--max_iterations", type=int, default=100) parser.add_argument("--checkpoint_dir", type=str, default="./autopilot_checkpoints") parser.add_argument("--lora_r", type=int, default=8) parser.add_argument("--domains", nargs="+", default=["general", "programming", "math"]) args = parser.parse_args() register() # Tokenizer tokenizer = AutoTokenizer.from_pretrained(args.bootstrap, trust_remote_code=True) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token # Load pretrained model directly (weight transfer to BeeForCausalLM is buggy) model = AutoModelForCausalLM.from_pretrained( args.bootstrap, trust_remote_code=True, torch_dtype=torch.float16 if args.device == "mps" else None, ).to(args.device) logger.info("Loaded pretrained model: %s", args.bootstrap) # Initialize autopilot autopilot = Autopilot( model=model, tokenizer=tokenizer, device=args.device, domains=args.domains, lora_config=LoRAConfig(r=args.lora_r, alpha=args.lora_r * 2), checkpoint_dir=args.checkpoint_dir, ) # Run autonomous loop try: autopilot.run_autonomous_loop(max_iterations=args.max_iterations) except KeyboardInterrupt: logger.info("Interrupted by user. Saving checkpoint...") autopilot._save_checkpoint("interrupted") if __name__ == "__main__": main()