use serde::{Serialize, Deserialize}; use std::collections::VecDeque; use tokio::sync::RwLock; use tauri::Emitter; use std::sync::atomic::{AtomicBool, Ordering}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProxyRequestLog { pub id: String, pub timestamp: i64, pub method: String, pub url: String, pub status: u16, pub duration: u64, // ms pub model: Option, // 客户端请求的模型名 pub mapped_model: Option, // 实际路由后使用的模型名 pub account_email: Option, pub client_ip: Option, // 客户端 IP 地址 pub error: Option, pub request_body: Option, pub response_body: Option, pub input_tokens: Option, pub output_tokens: Option, pub protocol: Option, // 协议类型: "openai", "anthropic", "gemini" pub username: Option, // User token username } #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct ProxyStats { pub total_requests: u64, pub success_count: u64, pub error_count: u64, } pub struct ProxyMonitor { pub logs: RwLock>, pub stats: RwLock, pub max_logs: usize, pub enabled: AtomicBool, app_handle: Option, } impl ProxyMonitor { pub fn new(max_logs: usize, app_handle: Option) -> Self { // Initialize DB if let Err(e) = crate::modules::proxy_db::init_db() { tracing::error!("Failed to initialize proxy DB: {}", e); } // Auto cleanup old logs (keep last 30 days) tokio::spawn(async { match crate::modules::proxy_db::cleanup_old_logs(30) { Ok(deleted) => { if deleted > 0 { tracing::info!("Auto cleanup: removed {} old logs (>30 days)", deleted); } } Err(e) => { tracing::error!("Failed to cleanup old logs: {}", e); } } }); Self { logs: RwLock::new(VecDeque::with_capacity(max_logs)), stats: RwLock::new(ProxyStats::default()), max_logs, enabled: AtomicBool::new(false), // Default to disabled app_handle, } } pub fn set_enabled(&self, enabled: bool) { self.enabled.store(enabled, Ordering::Relaxed); } pub fn is_enabled(&self) -> bool { self.enabled.load(Ordering::Relaxed) } pub async fn log_request(&self, log: ProxyRequestLog) { if let (Some(account), Some(input), Some(output)) = ( &log.account_email, log.input_tokens, log.output_tokens, ) { let model = log.model.clone().unwrap_or_else(|| "unknown".to_string()); let account = account.clone(); tokio::spawn(async move { if let Err(e) = crate::modules::token_stats::record_usage(&account, &model, input, output) { tracing::debug!("Failed to record token stats: {}", e); } }); } if !self.is_enabled() { return; } tracing::info!("[Monitor] Logging request: {} {}", log.method, log.url); // Update stats { let mut stats = self.stats.write().await; stats.total_requests += 1; if log.status >= 200 && log.status < 400 { stats.success_count += 1; } else { stats.error_count += 1; } } // Add log to memory { let mut logs = self.logs.write().await; if logs.len() >= self.max_logs { logs.pop_back(); } logs.push_front(log.clone()); } // Save to DB let log_to_save = log.clone(); tokio::spawn(async move { if let Err(e) = crate::modules::proxy_db::save_log(&log_to_save) { tracing::error!("Failed to save proxy log to DB: {}", e); } // Sync to Security DB (IpAccessLogs) so it appears in Security Monitor if let Some(ip) = &log_to_save.client_ip { let security_log = crate::modules::security_db::IpAccessLog { id: uuid::Uuid::new_v4().to_string(), client_ip: ip.clone(), timestamp: log_to_save.timestamp / 1000, // ms to s method: Some(log_to_save.method.clone()), path: Some(log_to_save.url.clone()), user_agent: None, // We don't have UA in ProxyRequestLog easily accessible here without plumbing status: Some(log_to_save.status as i32), duration: Some(log_to_save.duration as i64), api_key_hash: None, blocked: false, // This comes from monitor, so it wasn't blocked by IP filter block_reason: None, username: log_to_save.username.clone(), }; if let Err(e) = crate::modules::security_db::save_ip_access_log(&security_log) { tracing::error!("Failed to save security log: {}", e); } } // Record token stats if available if let (Some(account), Some(input), Some(output)) = ( &log_to_save.account_email, log_to_save.input_tokens, log_to_save.output_tokens, ) { let model = log_to_save.model.clone().unwrap_or_else(|| "unknown".to_string()); if let Err(e) = crate::modules::token_stats::record_usage(account, &model, input, output) { tracing::debug!("Failed to record token stats: {}", e); } } }); // Emit event (send summary only, without body to reduce memory) if let Some(app) = &self.app_handle { let log_summary = ProxyRequestLog { id: log.id.clone(), timestamp: log.timestamp, method: log.method.clone(), url: log.url.clone(), status: log.status, duration: log.duration, model: log.model.clone(), mapped_model: log.mapped_model.clone(), account_email: log.account_email.clone(), client_ip: log.client_ip.clone(), error: log.error.clone(), request_body: None, // Don't send body in event response_body: None, // Don't send body in event input_tokens: log.input_tokens, output_tokens: log.output_tokens, protocol: log.protocol.clone(), username: log.username.clone(), }; let _ = app.emit("proxy://request", &log_summary); } } pub async fn get_logs(&self, limit: usize) -> Vec { // Try to get from DB first for true history let db_result = tokio::task::spawn_blocking(move || { crate::modules::proxy_db::get_logs(limit) }).await; match db_result { Ok(Ok(logs)) => logs, Ok(Err(e)) => { tracing::error!("Failed to get logs from DB: {}", e); // Fallback to memory let logs = self.logs.read().await; logs.iter().take(limit).cloned().collect() } Err(e) => { tracing::error!("Spawn blocking failed for get_logs: {}", e); let logs = self.logs.read().await; logs.iter().take(limit).cloned().collect() } } } pub async fn get_stats(&self) -> ProxyStats { let db_result = tokio::task::spawn_blocking(|| { crate::modules::proxy_db::get_stats() }).await; match db_result { Ok(Ok(stats)) => stats, Ok(Err(e)) => { tracing::error!("Failed to get stats from DB: {}", e); self.stats.read().await.clone() } Err(e) => { tracing::error!("Spawn blocking failed for get_stats: {}", e); self.stats.read().await.clone() } } } pub async fn get_logs_filtered( &self, page: usize, page_size: usize, search_text: Option, level: Option, ) -> Result, String> { let offset = (page.max(1) - 1) * page_size; let errors_only = level.as_deref() == Some("error"); let search = search_text.unwrap_or_default(); let res = tokio::task::spawn_blocking(move || { crate::modules::proxy_db::get_logs_filtered(&search, errors_only, page_size, offset) }).await; match res { Ok(r) => r, Err(e) => Err(format!("Spawn blocking failed: {}", e)), } } pub async fn clear(&self) { let mut logs = self.logs.write().await; logs.clear(); let mut stats = self.stats.write().await; *stats = ProxyStats::default(); let _ = tokio::task::spawn_blocking(|| { if let Err(e) = crate::modules::proxy_db::clear_logs() { tracing::error!("Failed to clear logs in DB: {}", e); } }).await; } }