File size: 4,072 Bytes
cef045d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""API key manager — multi-key support with CRUD."""
from __future__ import annotations

import json
import os
import secrets
import time

from ..core.logger import get_logger

log = get_logger("api-keys")


def _keys_path() -> str:
    return os.path.join(os.path.dirname(__file__), "..", "..", "config", "api_keys.json")


class ApiKey:
    def __init__(self, data: dict):
        self.key: str = data["key"]
        self.name: str = data.get("name", "")
        self.created_at: float = data.get("created_at", 0)
        self.last_used: float = data.get("last_used", 0)
        self.requests: int = data.get("requests", 0)
        self.enabled: bool = data.get("enabled", True)

    def to_dict(self) -> dict:
        return {
            "key": self.key,
            "name": self.name,
            "created_at": self.created_at,
            "last_used": self.last_used,
            "requests": self.requests,
            "enabled": self.enabled,
        }

    def to_public(self) -> dict:
        k = self.key
        masked = k[:7] + "..." + k[-4:] if len(k) > 12 else k
        return {
            "key": masked,
            "full_key": self.key,
            "name": self.name,
            "created_at": int(self.created_at * 1000),
            "last_used": int(self.last_used * 1000) if self.last_used else 0,
            "requests": self.requests,
            "enabled": self.enabled,
        }


class ApiKeyManager:
    def __init__(self):
        self._keys: list[ApiKey] = []
        self._path = _keys_path()
        self._dirty = False

    def load(self, default_key: str = ""):
        if os.path.exists(self._path):
            with open(self._path, "r", encoding="utf-8") as f:
                data = json.load(f)
            self._keys = [ApiKey(k) for k in data]
        if not self._keys and default_key:
            self._keys.append(ApiKey({
                "key": default_key,
                "name": "默认密钥",
                "created_at": time.time(),
            }))
            self._save()
        log.info("Loaded %d keys", len(self._keys))

    def _save(self):
        os.makedirs(os.path.dirname(self._path), exist_ok=True)
        with open(self._path, "w", encoding="utf-8") as f:
            json.dump([k.to_dict() for k in self._keys], f, indent=2)

    def validate(self, key: str) -> bool:
        for k in self._keys:
            if k.key == key and k.enabled:
                k.last_used = time.time()
                k.requests += 1
                self._dirty = True
                return True
        return False

    def flush(self):
        """Persist pending changes to disk."""
        if self._dirty:
            self._save()
            self._dirty = False

    def list_keys(self) -> list[dict]:
        return [k.to_public() for k in self._keys]

    def create_key(self, name: str = "") -> dict:
        key = "sk-" + secrets.token_hex(24)
        ak = ApiKey({
            "key": key,
            "name": name or f"Key-{len(self._keys) + 1}",
            "created_at": time.time(),
        })
        self._keys.append(ak)
        self._save()
        return ak.to_public()

    def create_key_with_value(self, key: str, name: str = "") -> dict:
        """Create a key with a specific value (for syncing from settings)."""
        for k in self._keys:
            if k.key == key:
                return k.to_public()
        ak = ApiKey({
            "key": key,
            "name": name or f"Key-{len(self._keys) + 1}",
            "created_at": time.time(),
        })
        self._keys.append(ak)
        self._save()
        return ak.to_public()

    def delete_key(self, key: str) -> bool:
        for i, k in enumerate(self._keys):
            if k.key == key:
                self._keys.pop(i)
                self._save()
                return True
        return False

    def toggle_key(self, key: str) -> bool:
        for k in self._keys:
            if k.key == key:
                k.enabled = not k.enabled
                self._save()
                return True
        return False