ob12api / src /services /api_key_manager.py
david-baxter's picture
Upload 24 files
cef045d verified
"""API key manager — multi-key support with CRUD."""
from __future__ import annotations
import json
import os
import secrets
import time
from ..core.logger import get_logger
log = get_logger("api-keys")
def _keys_path() -> str:
    """Return the location of the JSON file that stores the API keys.

    The path is built relative to the directory containing this module:
    two levels up, then ``config/api_keys.json``. It is not normalised,
    so the ``..`` components remain in the returned string.
    """
    module_dir = os.path.dirname(__file__)
    segments = ("..", "..", "config", "api_keys.json")
    return os.path.join(module_dir, *segments)
class ApiKey:
    """A single API key together with its bookkeeping fields.

    Instances are built from a plain dict (the shape produced by
    ``to_dict``). Only ``"key"`` is mandatory; every other field falls
    back to a default when absent.
    """

    # Attribute names written by ``to_dict``, in output order.
    _STORED_FIELDS = ("key", "name", "created_at", "last_used", "requests", "enabled")

    def __init__(self, data: dict):
        """Populate the record from ``data``.

        Raises:
            KeyError: if ``data`` has no ``"key"`` entry.
        """
        self.key: str = data["key"]
        self.name: str = data.get("name", "")
        self.created_at: float = data.get("created_at", 0)
        self.last_used: float = data.get("last_used", 0)
        self.requests: int = data.get("requests", 0)
        self.enabled: bool = data.get("enabled", True)

    def to_dict(self) -> dict:
        """Return the record as a dict with the raw, unconverted field values."""
        return {field: getattr(self, field) for field in self._STORED_FIELDS}

    def to_public(self) -> dict:
        """Return the record in the shape used for listings.

        ``"key"`` holds a shortened form (first 7 characters, ``...``,
        last 4 characters) when the key is longer than 12 characters and
        the key itself otherwise. ``"full_key"`` always carries the
        complete key. Both timestamps are multiplied by 1000 and truncated
        to ``int`` (milliseconds, assuming the stored values are seconds).
        """
        secret = self.key
        if len(secret) > 12:
            shown = secret[:7] + "..." + secret[-4:]
        else:
            shown = secret

        created_ms = int(self.created_at * 1000)
        # A falsy ``last_used`` (never used) is reported as a plain 0.
        last_used_ms = int(self.last_used * 1000) if self.last_used else 0

        public = {"key": shown, "full_key": self.key, "name": self.name}
        public["created_at"] = created_ms
        public["last_used"] = last_used_ms
        public["requests"] = self.requests
        public["enabled"] = self.enabled
        return public
class ApiKeyManager:
    """Keep a list of ``ApiKey`` records in memory, persisted as a JSON file.

    ``create_key``, ``create_key_with_value``, ``delete_key`` and
    ``toggle_key`` write the file immediately. ``validate`` only updates the
    in-memory usage counters and marks the manager dirty; call ``flush`` to
    write those counters to disk.
    """

    def __init__(self):
        self._keys: list[ApiKey] = []
        self._path = _keys_path()
        # True while changes made by validate() have not been written out.
        self._dirty = False

    def load(self, default_key: str = ""):
        """Load the stored keys, seeding ``default_key`` when there are none.

        If the key file exists, its JSON content replaces the in-memory
        list. If the list is then empty and ``default_key`` is non-empty, a
        record for it is created and saved.

        Errors while reading or parsing the file are not caught here, so a
        malformed file propagates its exception instead of being overwritten.
        """
        if os.path.exists(self._path):
            with open(self._path, "r", encoding="utf-8") as f:
                data = json.load(f)
            self._keys = [ApiKey(k) for k in data]
        if not self._keys and default_key:
            self._keys.append(ApiKey({
                "key": default_key,
                "name": "默认密钥",
                "created_at": time.time(),
            }))
            self._save()
        log.info("Loaded %d keys", len(self._keys))

    def _save(self):
        """Write all records to the key file and clear the dirty flag.

        The JSON is written to a sibling ``.tmp`` file first and then moved
        over the real file with ``os.replace``, so a failure while dumping
        leaves the previously saved file untouched.
        """
        os.makedirs(os.path.dirname(self._path), exist_ok=True)
        tmp_path = self._path + ".tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump([k.to_dict() for k in self._keys], f, indent=2)
            os.replace(tmp_path, self._path)
        except BaseException:
            # Do not leave a half-written temp file behind; the real key
            # file has not been touched at this point.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        # Everything in memory is on disk now, including usage counters.
        self._dirty = False

    @staticmethod
    def _same_key(stored: str, candidate: str) -> bool:
        """Compare two key strings with ``secrets.compare_digest``.

        Both sides are encoded to bytes first because ``compare_digest``
        rejects ``str`` values containing non-ASCII characters.
        """
        if not isinstance(stored, str) or not isinstance(candidate, str):
            return False
        return secrets.compare_digest(
            stored.encode("utf-8", "surrogatepass"),
            candidate.encode("utf-8", "surrogatepass"),
        )

    def _find(self, key: str) -> "ApiKey | None":
        """Return the first record whose key equals ``key``, or ``None``."""
        return next((k for k in self._keys if k.key == key), None)

    def _add(self, key: str, name: str) -> dict:
        """Append a record for ``key``, save, and return its public form.

        An empty ``name`` is replaced by ``Key-<n>``, where ``n`` is the
        number of records after the insertion.
        """
        ak = ApiKey({
            "key": key,
            "name": name or f"Key-{len(self._keys) + 1}",
            "created_at": time.time(),
        })
        self._keys.append(ak)
        self._save()
        return ak.to_public()

    def validate(self, key: str) -> bool:
        """Check ``key`` against the enabled records and count the request.

        On a match the record's ``last_used`` is set to the current time,
        its ``requests`` counter is incremented and the manager is marked
        dirty (nothing is written to disk here).

        Returns:
            True if an enabled record matches ``key``; False otherwise,
            including when ``key`` is not a string.
        """
        if not isinstance(key, str):
            return False
        for k in self._keys:
            # compare_digest keeps comparison time independent of how many
            # leading characters of the presented key are correct.
            if k.enabled and self._same_key(k.key, key):
                k.last_used = time.time()
                k.requests += 1
                self._dirty = True
                return True
        return False

    def flush(self):
        """Persist pending changes to disk."""
        if self._dirty:
            self._save()

    def list_keys(self) -> list[dict]:
        """Return the public form (``to_public``) of every record, in order."""
        return [k.to_public() for k in self._keys]

    def create_key(self, name: str = "") -> dict:
        """Create a record with a fresh random key and save it.

        The key is ``"sk-"`` followed by 48 hex characters from
        ``secrets.token_hex(24)``.

        Returns:
            The new record's public form.
        """
        return self._add("sk-" + secrets.token_hex(24), name)

    def create_key_with_value(self, key: str, name: str = "") -> dict:
        """Create a key with a specific value (for syncing from settings).

        If a record with this key already exists it is returned unchanged
        and nothing is written.

        Returns:
            The public form of the existing or newly created record.
        """
        existing = self._find(key)
        if existing is not None:
            return existing.to_public()
        return self._add(key, name)

    def delete_key(self, key: str) -> bool:
        """Remove the first record matching ``key`` and save.

        Returns:
            True if a record was removed, False if none matched.
        """
        for i, k in enumerate(self._keys):
            if k.key == key:
                del self._keys[i]
                self._save()
                return True
        return False

    def toggle_key(self, key: str) -> bool:
        """Flip the ``enabled`` flag of the first record matching ``key`` and save.

        Returns:
            True if a record was toggled, False if none matched.
        """
        record = self._find(key)
        if record is None:
            return False
        record.enabled = not record.enabled
        self._save()
        return True