bee / scripts /autopilot.py
ceocxx's picture
chore: deploy Bee API backend (bee/, Dockerfile, requirements)
db82745 verified
"""Bee Autopilot — Autonomous Self-Improvement Orchestrator.

Runs continuously:

1. Transfer weights from pretrained models (bootstrap)
2. Activate LoRA domain adapters
3. Generate synthetic training data via self-play
4. Train adapters on synthetic + real data
5. Evaluate and swap in better adapters
6. Save checkpoints
7. Repeat

This is the "brain stem" of Bee — it never stops learning.

Run it as a script (see ``main``) or drive ``Autopilot`` programmatically.
"""
import argparse
import json
import logging
import os
import random
import sys
import time
from pathlib import Path

import torch
import torch.nn.functional as F
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from bee.register import register
from bee.config import BeeConfig
from bee.modeling_bee import BeeForCausalLM
from bee.lora_adapter import DomainLoRAManager, LoRAConfig
from bee.self_play import SelfPlayEngine
from bee.weight_transfer import transfer_weights
# Quantum-enhanced training
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "bee"))
# Optional quantum stack. Every name imported here gets a ``None`` fallback so
# later references fail with a clear "is None" check instead of a NameError.
try:
    from bee.quantum_trainer import QuantumEnhancedTrainer, QuantumHyperparams
    from bee.quantum_ibm import BeeIBMQuantumClient
except Exception as exc:
    # Deliberately broad: presumably the optional SDKs can fail at import time
    # with more than ImportError (best-effort feature, never fatal).
    QuantumEnhancedTrainer = None
    QuantumHyperparams = None
    BeeIBMQuantumClient = None
    QUANTUM_AVAILABLE = False
    # Keep only the text of the error; holding the exception object would keep
    # its traceback (and the frames it references) alive.
    _quantum_import_error = repr(exc)
else:
    QUANTUM_AVAILABLE = True
    _quantum_import_error = None

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(name)s | %(message)s")
logger = logging.getLogger("bee.autopilot")

# Logging is configured only now, so the import failure is reported here.
# DEBUG level keeps the default (INFO) output unchanged.
if _quantum_import_error is not None:
    logger.debug("Quantum stack unavailable: %s", _quantum_import_error)
class Autopilot:
    """Autonomous training loop for Bee.

    Holds a base model, one LoRA adapter per domain (managed through
    ``DomainLoRAManager``), a ``SelfPlayEngine`` used to produce synthetic
    training text, and an optional ``QuantumEnhancedTrainer``.
    ``run_autonomous_loop`` trains every domain adapter in turn and
    periodically evaluates and checkpoints.
    """

    # Record fields probed, in priority order, when extracting text from a
    # dataset row. "text" / "content" are the original lookups.
    # NOTE(review): "code" and "problem" are believed to be the text columns of
    # codeparrot/github-code and hendrycks/competition_math — confirm against
    # the dataset cards.
    _CONTEXT_FIELDS = ("text", "content", "code", "problem")
    # Maximum number of characters kept from each context document.
    _CONTEXT_CHARS = 500

    def __init__(
        self,
        model: "BeeForCausalLM",
        tokenizer: "AutoTokenizer",
        device: str = "cpu",
        domains: "list | None" = None,
        lora_config: "LoRAConfig | None" = None,
        checkpoint_dir: str = "./autopilot_checkpoints",
        use_quantum: bool = False,  # Default OFF — IBM free tier = ~10 min/month
    ):
        """Set up adapters, the self-play engine and (optionally) quantum training.

        Args:
            model: Base causal LM the adapters are attached to.
            tokenizer: Tokenizer matching ``model``.
            device: Device string that tokenized inputs are moved to.
            domains: Domain names; defaults to general/programming/math/science.
            lora_config: LoRA settings; defaults to ``r=8, alpha=16, dropout=0.05``.
            checkpoint_dir: Directory for checkpoints (created if missing).
            use_quantum: Opt in to quantum-enhanced training. Only honoured
                when the quantum modules imported successfully.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        self.domains = domains or ["general", "programming", "math", "science"]
        self.lora_config = lora_config or LoRAConfig(r=8, alpha=16, dropout=0.05)
        self.checkpoint_dir = checkpoint_dir
        os.makedirs(checkpoint_dir, exist_ok=True)

        # Quantum is DISABLED by default — user must explicitly pass use_quantum=True
        # IBM free tier = ~10 min/month. Auto-submission wastes this precious resource.
        self.use_quantum = use_quantum and QUANTUM_AVAILABLE
        self._quantum_explicitly_requested = use_quantum
        self.quantum_trainer: "QuantumEnhancedTrainer | None" = None
        # Result of the one-off quantum hyperparameter search, and whether the
        # search has been attempted (so a failure is not retried per domain).
        self._quantum_hparams = None
        self._quantum_hpo_attempted = False

        if self.use_quantum:
            try:
                self.quantum_trainer = QuantumEnhancedTrainer(
                    model=model,
                    device=device,
                )
                logger.info(
                    "Quantum-enhanced training ENABLED — "
                    "IBM Quantum Heron r2 (156 qubits, 15mK). "
                    "NOTE: ~10 min free tier/month — each job uses 10-60s"
                )
            except Exception as e:
                # Best effort: fall back to classical training.
                logger.warning("Quantum trainer failed to init: %s", e)
                self.use_quantum = False
        else:
            if self._quantum_explicitly_requested and not QUANTUM_AVAILABLE:
                logger.warning(
                    "Quantum requested but unavailable (qiskit/ibm_runtime not installed)"
                )
            logger.info("Quantum-enhanced training DISABLED (pass use_quantum=True to enable)")

        self._attach_adapters()

        self.self_play = SelfPlayEngine(
            model=model,
            tokenizer=tokenizer,
            device=device,
            max_new_tokens=128,
            temperature=0.8,
        )

        self.step_count = 0
        # Real user interactions, stored as (prompt, response, feedback).
        self.interaction_buffer: list = []
        self.loss_history: list = []
        self.val_loss_history: list = []

    def _attach_adapters(self) -> None:
        """(Re)create the LoRA manager on ``self.model`` and add every domain."""
        self.lora_manager = DomainLoRAManager(self.model, self.lora_config)
        for domain in self.domains:
            self.lora_manager.add_adapter(domain)

    def bootstrap_from_pretrained(self, source_id: str = "HuggingFaceTB/SmolLM2-135M"):
        """Replace ``self.model`` with weights transferred from ``source_id``.

        The self-play engine and the LoRA manager are re-pointed at the new
        model; previously the manager kept referencing the replaced model, so
        later adapter training never touched the bootstrapped one.

        Args:
            source_id: Identifier of the pretrained source model.
        """
        logger.info("Bootstrapping from %s", source_id)

        # Re-build model with compatible config
        cfg = BeeConfig(
            vocab_size=self.tokenizer.vocab_size,
            hidden_size=512,
            num_hidden_layers=8,
            num_attention_heads=8,
            intermediate_size=1024,
            max_position_embeddings=2048,
        )
        self.model = transfer_weights(source_id, cfg, self.device)
        self.self_play.model = self.model

        # Quantum-enhanced: re-initialize with certified quantum randomness
        # NOTE(review): the quantum trainer was constructed with the previous
        # model; whether quantum_initialize_model() reaches the new one depends
        # on QuantumEnhancedTrainer internals — confirm.
        if self.use_quantum and self.quantum_trainer:
            logger.info("Applying quantum random weight initialization...")
            n_layers = self.quantum_trainer.quantum_initialize_model()
            logger.info("Quantum-initialized %d layers via IBM hardware", n_layers)

        # Attach adapters last so any re-initialisation above cannot disturb them.
        self._attach_adapters()
        logger.info("Bootstrap complete")

    def _quantum_hyperparameters(self):
        """Return quantum-optimized hyperparameters, or ``None``.

        The optimization is attempted at most once, and only while
        ``step_count`` is 0; its result is cached and reused by later calls so
        the limited quantum budget is not spent once per domain.
        """
        if not (self.use_quantum and self.quantum_trainer):
            return None
        if not self._quantum_hpo_attempted and self.step_count == 0:
            self._quantum_hpo_attempted = True
            logger.info("Running quantum hyperparameter optimization (QAOA)...")
            try:
                found = self.quantum_trainer.optimize_hyperparameters()
                logger.info(
                    "Quantum-optimized: rank=%d lr=%.0e batch=%d dropout=%.1f wd=%.2f",
                    found.lora_rank, found.learning_rate,
                    found.batch_size, found.dropout, found.weight_decay,
                )
                self._quantum_hparams = found
            except Exception as e:
                logger.warning("Quantum HPO failed (rate limit?), using defaults: %s", e)
        return self._quantum_hparams

    def _collect_adapter_params(self, domain: str) -> list:
        """Collect the trainable parameters for ``domain`` without duplicates.

        A module qualifies when its name contains ``domain`` or it exposes
        ``lora_A`` / ``lora_B``. Because ``module.parameters()`` recurses, the
        same tensor can be reached through a parent and a child; each tensor
        is kept once so the optimizer and gradient clipping see it once.
        """
        collected = []
        seen_ids = set()
        for name, module in self.model.named_modules():
            is_lora = hasattr(module, "lora_A") or hasattr(module, "lora_B")
            if domain not in str(name) and not is_lora:
                continue
            for p in module.parameters():
                if p.requires_grad and id(p) not in seen_ids:
                    seen_ids.add(id(p))
                    collected.append(p)

        # Fallback: take the LoRA matrices straight from the manager.
        if not collected:
            for lora in self.lora_manager.adapters[domain].values():
                collected.extend([lora.lora_A, lora.lora_B])
        return collected

    @staticmethod
    def _format_interactions(interactions) -> list:
        """Render buffered interactions as ``"Q: ...\\nA: ..."`` training texts.

        Entries are indexed rather than unpacked, so both the
        ``(prompt, response, feedback)`` tuples written by ``add_interaction``
        and plain ``(prompt, response)`` pairs are accepted.
        """
        return [f"Q: {item[0]}\nA: {item[1]}" for item in interactions]

    def _gather_training_texts(self, domain: str, batch_size: int, use_synthetic: bool) -> list:
        """Build the list of training texts: synthetic self-play plus real data."""
        texts = []
        if use_synthetic:
            # Generate synthetic data via self-play; keep examples scoring > 0.5.
            contexts = self._get_contexts(domain, n=10)
            synthetic = self.self_play.generate_training_batch(contexts, batch_size=batch_size)
            texts.extend(
                f"Q: {ex['question']}\nA: {ex['generated_answer']}"
                for ex in synthetic
                if ex["score"] > 0.5
            )
        # Add the 50 most recent real interactions.
        texts.extend(self._format_interactions(self.interaction_buffer[-50:]))
        return texts

    @staticmethod
    def _next_token_loss(logits, input_ids):
        """Cross-entropy of position ``t`` logits against token ``t + 1``."""
        shift_logits = logits[:, :-1, :].contiguous().view(-1, logits.size(-1))
        shift_labels = input_ids[:, 1:].contiguous().view(-1)
        return F.cross_entropy(shift_logits, shift_labels)

    def _inject_quantum_noise(self, params) -> None:
        """Add quantum-sourced noise, scaled to 1% of each gradient's std."""
        logger.info("Injecting quantum-certified gradient noise...")
        for param in params:
            grad = param.grad
            # std() of a single element is NaN and would corrupt the gradient.
            if grad is None or grad.numel() < 2:
                continue
            qnoise = self.quantum_trainer.qrng.randn_tensor(
                grad.shape, device=grad.device
            )
            grad.add_(qnoise * (grad.std().item() * 0.01))

    def train_domain_adapter(
        self,
        domain: str,
        num_steps: int = 50,
        batch_size: int = 2,
        learning_rate: float = 5e-4,
        use_synthetic: bool = True,
    ) -> float:
        """Train a domain LoRA adapter with quantum enhancements.

        Args:
            domain: Adapter to activate and train.
            num_steps: Number of single-example optimization steps to attempt.
            batch_size: Passed to self-play data generation.
            learning_rate: AdamW learning rate.
            use_synthetic: Whether to generate self-play data for this call.

        When quantum hyperparameters are available they override
        ``learning_rate`` and ``batch_size``.

        Returns:
            Mean loss over the steps that actually ran (samples shorter than
            four tokens are skipped and not counted); ``0.0`` when there was
            no data or no step ran.
        """
        self.lora_manager.activate_domain(domain)

        hparams = self._quantum_hyperparameters()
        if hparams is not None:
            learning_rate = hparams.learning_rate
            batch_size = hparams.batch_size

        params_to_train = self._collect_adapter_params(domain)
        optimizer = torch.optim.AdamW(params_to_train, lr=learning_rate)

        texts = self._gather_training_texts(domain, batch_size, use_synthetic)
        if not texts:
            logger.warning("No training data for domain %s, skipping", domain)
            return 0.0

        total_loss = 0.0
        trained_steps = 0
        self.model.train()
        for _ in range(num_steps):
            text = random.choice(texts)
            inputs = self.tokenizer(
                text, return_tensors="pt", truncation=True, max_length=256
            ).to(self.device)
            if inputs["input_ids"].shape[1] < 4:
                continue

            optimizer.zero_grad()
            outputs = self.model(**inputs)
            logits = outputs.logits if hasattr(outputs, "logits") else outputs[0]
            loss = self._next_token_loss(logits, inputs["input_ids"])
            loss.backward()

            # Quantum enhancement: add certified quantum noise to gradients.
            # Applied once per training call (not per step) to respect IBM rate
            # limits — on the first step that really runs, so a skipped first
            # sample does not silently disable it.
            if trained_steps == 0 and self.use_quantum and self.quantum_trainer:
                self._inject_quantum_noise(params_to_train)

            torch.nn.utils.clip_grad_norm_(params_to_train, 1.0)
            optimizer.step()
            total_loss += loss.item()
            trained_steps += 1

        avg_loss = total_loss / trained_steps if trained_steps else 0.0
        logger.info("Domain %s training: avg_loss=%.4f", domain, avg_loss)
        return avg_loss

    @classmethod
    def _extract_context(cls, example) -> str:
        """Return the first non-empty text field of ``example``, truncated."""
        for field in cls._CONTEXT_FIELDS:
            value = example.get(field)
            if isinstance(value, str) and value:
                return value[: cls._CONTEXT_CHARS]
        return ""

    def _get_contexts(self, domain: str, n: int = 10) -> list:
        """Get document contexts for a domain.

        Streams ``n`` records from a domain-specific dataset. If loading fails
        or no record yields any text, ``n`` placeholder documents are returned
        instead so training can still proceed.
        """
        try:
            if domain == "programming":
                ds = load_dataset("codeparrot/github-code", "Python", split="train", streaming=True)
            elif domain == "math":
                ds = load_dataset("hendrycks/competition_math", split="train", streaming=True)
            else:
                ds = load_dataset("roneneldan/TinyStories", split="train", streaming=True)
            contexts = [self._extract_context(ex) for ex in ds.take(n)]
            contexts = [c for c in contexts if c]
            if contexts:
                return contexts
            logger.warning("No usable text in domain data for %s, using placeholder contexts", domain)
        except Exception as e:
            logger.warning("Failed to load domain data for %s: %s", domain, e)
        # Fallback: generate synthetic contexts
        return [f"This is a sample document about {domain}. " * 20 for _ in range(n)]

    def run_autonomous_loop(
        self,
        max_iterations: int = 1000,
        steps_per_iteration: int = 10,
        eval_every: int = 10,
        save_every: int = 20,
    ):
        """Main autonomous learning loop.

        Args:
            max_iterations: Number of passes over all domains.
            steps_per_iteration: Training steps per domain per pass.
            eval_every: Evaluate every this many iterations; a value <= 0
                disables periodic evaluation.
            save_every: Checkpoint every this many iterations (never at
                iteration 0); a value <= 0 disables periodic checkpoints.

        A checkpoint named ``final`` is always written at the end.
        """
        logger.info("=" * 60)
        logger.info("BEE AUTOPILOT STARTING")
        logger.info("=" * 60)
        logger.info("Domains: %s", self.domains)
        logger.info("LoRA rank: %d", self.lora_config.r)
        logger.info("Max iterations: %d", max_iterations)

        for iteration in range(max_iterations):
            self.step_count = iteration
            logger.info("\n--- Iteration %d ---", iteration)

            # Train each domain adapter
            for domain in self.domains:
                loss = self.train_domain_adapter(domain, num_steps=steps_per_iteration)
                self.loss_history.append({
                    "iteration": iteration,
                    "domain": domain,
                    "loss": loss,
                })

            # Evaluation (the > 0 guards avoid a modulo-by-zero)
            if eval_every > 0 and iteration % eval_every == 0:
                self._evaluate()

            # Save checkpoint
            if save_every > 0 and iteration > 0 and iteration % save_every == 0:
                self._save_checkpoint(iteration)

            # Brief pause to prevent overheating
            time.sleep(1)

        logger.info("Autopilot complete after %d iterations", max_iterations)
        self._save_checkpoint("final")

    def _evaluate(self):
        """Quick evaluation: generate text and track validation loss.

        Samples a continuation of a fixed prompt for the log, then records the
        next-token loss of the model on that same prompt in
        ``val_loss_history`` (shared with the quantum trainer when present).
        The model is put back into training mode afterwards.
        """
        self.model.eval()
        prompt = "The key to artificial intelligence is"
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)

        with torch.no_grad():
            out = self.model.generate(
                **inputs,
                max_new_tokens=30,
                do_sample=True,
                temperature=0.8,
                pad_token_id=self.tokenizer.pad_token_id,
            )
        generated = self.tokenizer.decode(out[0], skip_special_tokens=True)
        logger.info("Sample generation: %s", generated[:100])

        # Track validation-like loss for quantum HPO feedback
        with torch.no_grad():
            outputs = self.model(**inputs)
            logits = outputs.logits if hasattr(outputs, "logits") else outputs[0]
            val_loss = self._next_token_loss(logits, inputs["input_ids"]).item()

        self.val_loss_history.append(val_loss)
        if self.quantum_trainer:
            self.quantum_trainer.validation_history = self.val_loss_history
        logger.info("Validation loss: %.4f", val_loss)
        self.model.train()

    def _save_checkpoint(self, iteration):
        """Save model, tokenizer, adapters and loss history.

        Args:
            iteration: Label for the checkpoint directory (``iter_<label>``);
                an int or a string such as ``"final"``.
        """
        ckpt_dir = os.path.join(self.checkpoint_dir, f"iter_{iteration}")
        os.makedirs(ckpt_dir, exist_ok=True)

        # Save base model
        self.model.save_pretrained(ckpt_dir)
        self.tokenizer.save_pretrained(ckpt_dir)

        # Save adapters
        for domain in self.domains:
            adapter_dir = os.path.join(ckpt_dir, f"adapter_{domain}")
            self.lora_manager.save_adapter(domain, adapter_dir)

        # Save training history (explicit encoding: platform default may differ)
        with open(os.path.join(ckpt_dir, "history.json"), "w", encoding="utf-8") as f:
            json.dump(self.loss_history, f, indent=2)
        logger.info("Checkpoint saved to %s", ckpt_dir)

    def add_interaction(self, prompt: str, response: str, feedback: float = 0.0):
        """Add a real user interaction to the training buffer.

        Entries are stored as ``(prompt, response, feedback)``. Once the
        buffer exceeds 1000 entries it is trimmed to the newest 500.
        """
        self.interaction_buffer.append((prompt, response, feedback))
        if len(self.interaction_buffer) > 1000:
            self.interaction_buffer = self.interaction_buffer[-500:]
        logger.info("Added interaction (buffer size: %d)", len(self.interaction_buffer))
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the autopilot script."""
    parser = argparse.ArgumentParser(
        description="Run the Bee autopilot training loop on a pretrained model."
    )
    parser.add_argument("--bootstrap", type=str, default="HuggingFaceTB/SmolLM2-135M",
                        help="Pretrained model to bootstrap from")
    parser.add_argument("--device", type=str,
                        default="mps" if torch.backends.mps.is_available() else "cpu")
    parser.add_argument("--max_iterations", type=int, default=100)
    parser.add_argument("--checkpoint_dir", type=str, default="./autopilot_checkpoints")
    parser.add_argument("--lora_r", type=int, default=8)
    parser.add_argument("--domains", nargs="+", default=["general", "programming", "math"])
    # Opt-in only: Autopilot keeps quantum off by default to protect the
    # limited IBM quota, so the CLI mirrors that default.
    parser.add_argument("--use_quantum", action="store_true",
                        help="Enable quantum-enhanced training (off by default)")
    return parser


def main():
    """Parse CLI arguments, load the pretrained model and run the autopilot.

    On Ctrl-C a checkpoint labelled ``interrupted`` is saved before exiting.
    """
    args = build_arg_parser().parse_args()

    register()

    # Tokenizer
    tokenizer = AutoTokenizer.from_pretrained(args.bootstrap, trust_remote_code=True)
    if tokenizer.pad_token is None:
        # Reuse EOS as padding when the tokenizer defines no pad token.
        tokenizer.pad_token = tokenizer.eos_token

    # Load pretrained model directly (weight transfer to BeeForCausalLM is buggy)
    model = AutoModelForCausalLM.from_pretrained(
        args.bootstrap,
        trust_remote_code=True,
        torch_dtype=torch.float16 if args.device == "mps" else None,
    ).to(args.device)
    logger.info("Loaded pretrained model: %s", args.bootstrap)

    # Initialize autopilot
    autopilot = Autopilot(
        model=model,
        tokenizer=tokenizer,
        device=args.device,
        domains=args.domains,
        lora_config=LoRAConfig(r=args.lora_r, alpha=args.lora_r * 2),
        checkpoint_dir=args.checkpoint_dir,
        use_quantum=args.use_quantum,
    )

    # Run autonomous loop
    try:
        autopilot.run_autonomous_loop(max_iterations=args.max_iterations)
    except KeyboardInterrupt:
        logger.info("Interrupted by user. Saving checkpoint...")
        autopilot._save_checkpoint("interrupted")


if __name__ == "__main__":
    main()