MahjongGameDesigner / cache_manager.py
Estazz's picture
Upload 20 files
a705843 verified
"""
缓存管理模块 - 提供文件缓存和请求缓存功能
"""
import hashlib
import os
import time
from typing import Dict, Any, Optional
class FileCache:
"""文件缓存管理器"""
def __init__(self, cache_dir: str = ".cache", max_age: int = 3600):
"""
初始化文件缓存
Args:
cache_dir: 缓存目录
max_age: 缓存最大存活时间(秒)
"""
self.cache_dir = cache_dir
self.max_age = max_age
self._ensure_cache_dir()
def _ensure_cache_dir(self):
"""确保缓存目录存在"""
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir, exist_ok=True)
def _get_cache_key(self, file_path: str) -> str:
"""生成缓存键"""
# 使用文件路径和修改时间生成唯一键
stat = os.stat(file_path)
key_data = f"{file_path}:{stat.st_mtime}:{stat.st_size}"
return hashlib.md5(key_data.encode()).hexdigest()
def _get_cache_path(self, cache_key: str) -> str:
"""获取缓存文件路径"""
return os.path.join(self.cache_dir, f"{cache_key}.cache")
def get(self, file_path: str) -> Optional[str]:
"""
从缓存获取文件内容
Args:
file_path: 文件路径
Returns:
文件内容或None(如果缓存不存在或过期)
"""
if not os.path.exists(file_path):
return None
cache_key = self._get_cache_key(file_path)
cache_path = self._get_cache_path(cache_key)
if not os.path.exists(cache_path):
return None
# 检查缓存是否过期
cache_age = time.time() - os.path.getmtime(cache_path)
if cache_age > self.max_age:
os.remove(cache_path)
return None
try:
with open(cache_path, 'r', encoding='utf-8') as f:
return f.read()
except Exception:
# 缓存文件损坏,删除它
if os.path.exists(cache_path):
os.remove(cache_path)
return None
def set(self, file_path: str, content: str) -> bool:
"""
将文件内容存入缓存
Args:
file_path: 文件路径
content: 文件内容
Returns:
是否成功缓存
"""
try:
cache_key = self._get_cache_key(file_path)
cache_path = self._get_cache_path(cache_key)
with open(cache_path, 'w', encoding='utf-8') as f:
f.write(content)
return True
except Exception:
return False
def clear(self):
"""清空所有缓存"""
if os.path.exists(self.cache_dir):
for file in os.listdir(self.cache_dir):
if file.endswith('.cache'):
os.remove(os.path.join(self.cache_dir, file))
class RequestCache:
"""请求缓存管理器"""
def __init__(self, max_size: int = 100, max_age: int = 1800):
"""
初始化请求缓存
Args:
max_size: 最大缓存条目数
max_age: 缓存最大存活时间(秒)
"""
self.max_size = max_size
self.max_age = max_age
self._cache: Dict[str, Dict[str, Any]] = {}
def _get_cache_key(self, messages: list) -> str:
"""生成请求缓存键"""
# 使用消息内容生成唯一键
key_data = str(messages)
return hashlib.md5(key_data.encode()).hexdigest()
def _cleanup_expired(self):
"""清理过期的缓存条目"""
current_time = time.time()
expired_keys = []
for key, data in self._cache.items():
if current_time - data['timestamp'] > self.max_age:
expired_keys.append(key)
for key in expired_keys:
del self._cache[key]
def _cleanup_oldest(self):
"""清理最旧的缓存条目"""
if len(self._cache) >= self.max_size:
# 找到最旧的条目
oldest_key = min(self._cache.keys(),
key=lambda k: self._cache[k]['timestamp'])
del self._cache[oldest_key]
def get(self, messages: list) -> Optional[str]:
"""
从缓存获取响应
Args:
messages: 消息列表
Returns:
缓存的响应或None
"""
self._cleanup_expired()
cache_key = self._get_cache_key(messages)
if cache_key in self._cache:
return self._cache[cache_key]['response']
return None
def set(self, messages: list, response: str) -> bool:
"""
将响应存入缓存
Args:
messages: 消息列表
response: 响应内容
Returns:
是否成功缓存
"""
self._cleanup_expired()
self._cleanup_oldest()
cache_key = self._get_cache_key(messages)
self._cache[cache_key] = {
'response': response,
'timestamp': time.time()
}
return True
def clear(self):
"""清空所有缓存"""
self._cache.clear()
# 全局缓存实例
file_cache = FileCache()
request_cache = RequestCache()