"""API key manager — multi-key support with CRUD.""" from __future__ import annotations import json import os import secrets import time from ..core.logger import get_logger log = get_logger("api-keys") def _keys_path() -> str: return os.path.join(os.path.dirname(__file__), "..", "..", "config", "api_keys.json") class ApiKey: def __init__(self, data: dict): self.key: str = data["key"] self.name: str = data.get("name", "") self.created_at: float = data.get("created_at", 0) self.last_used: float = data.get("last_used", 0) self.requests: int = data.get("requests", 0) self.enabled: bool = data.get("enabled", True) def to_dict(self) -> dict: return { "key": self.key, "name": self.name, "created_at": self.created_at, "last_used": self.last_used, "requests": self.requests, "enabled": self.enabled, } def to_public(self) -> dict: k = self.key masked = k[:7] + "..." + k[-4:] if len(k) > 12 else k return { "key": masked, "full_key": self.key, "name": self.name, "created_at": int(self.created_at * 1000), "last_used": int(self.last_used * 1000) if self.last_used else 0, "requests": self.requests, "enabled": self.enabled, } class ApiKeyManager: def __init__(self): self._keys: list[ApiKey] = [] self._path = _keys_path() self._dirty = False def load(self, default_key: str = ""): if os.path.exists(self._path): with open(self._path, "r", encoding="utf-8") as f: data = json.load(f) self._keys = [ApiKey(k) for k in data] if not self._keys and default_key: self._keys.append(ApiKey({ "key": default_key, "name": "默认密钥", "created_at": time.time(), })) self._save() log.info("Loaded %d keys", len(self._keys)) def _save(self): os.makedirs(os.path.dirname(self._path), exist_ok=True) with open(self._path, "w", encoding="utf-8") as f: json.dump([k.to_dict() for k in self._keys], f, indent=2) def validate(self, key: str) -> bool: for k in self._keys: if k.key == key and k.enabled: k.last_used = time.time() k.requests += 1 self._dirty = True return True return False def flush(self): """Persist pending changes to disk.""" if self._dirty: self._save() self._dirty = False def list_keys(self) -> list[dict]: return [k.to_public() for k in self._keys] def create_key(self, name: str = "") -> dict: key = "sk-" + secrets.token_hex(24) ak = ApiKey({ "key": key, "name": name or f"Key-{len(self._keys) + 1}", "created_at": time.time(), }) self._keys.append(ak) self._save() return ak.to_public() def create_key_with_value(self, key: str, name: str = "") -> dict: """Create a key with a specific value (for syncing from settings).""" for k in self._keys: if k.key == key: return k.to_public() ak = ApiKey({ "key": key, "name": name or f"Key-{len(self._keys) + 1}", "created_at": time.time(), }) self._keys.append(ak) self._save() return ak.to_public() def delete_key(self, key: str) -> bool: for i, k in enumerate(self._keys): if k.key == key: self._keys.pop(i) self._save() return True return False def toggle_key(self, key: str) -> bool: for k in self._keys: if k.key == key: k.enabled = not k.enabled self._save() return True return False