| |
| |
| |
| |
| |
|
|
| use crate::config::JsPoolConfig; |
| use crate::error::JsError; |
| use crate::worker::{JsResult, JsTask, JsTaskKind, JsWorker}; |
| use crossbeam_channel::{self, Sender}; |
| use parking_lot::Mutex; |
| use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; |
| use std::sync::Arc; |
|
|
| |
| pub struct JsPool { |
| workers: Mutex<Vec<std::thread::JoinHandle<()>>>, |
| task_senders: Mutex<Vec<Sender<JsTask>>>, |
| |
| worker_count: AtomicUsize, |
| shutdown: Arc<AtomicBool>, |
| config: JsPoolConfig, |
| } |
|
|
| impl JsPool { |
| |
| pub fn new(config: JsPoolConfig) -> Result<Self, JsError> { |
| let num_workers = config.initial_workers; |
| let shutdown = Arc::new(AtomicBool::new(false)); |
| let mut workers = Vec::with_capacity(num_workers); |
| let mut task_senders = Vec::with_capacity(num_workers); |
|
|
| for id in 0..num_workers { |
| let (tx, rx) = crossbeam_channel::bounded::<JsTask>(256); |
| let handle = JsWorker::spawn(id, config.clone(), rx, shutdown.clone()); |
| workers.push(handle); |
| task_senders.push(tx); |
| } |
|
|
| Ok(Self { |
| workers: Mutex::new(workers), |
| task_senders: Mutex::new(task_senders), |
| worker_count: AtomicUsize::new(num_workers), |
| shutdown, |
| config, |
| }) |
| } |
|
|
| |
| |
| |
| pub fn eval_js(&self, plugin_id: &str, code: &str, input: &str) -> Result<String, JsError> { |
| self.eval_js_opts( |
| plugin_id, |
| code, |
| input, |
| None, |
| self.config.default_timeout_ms, |
| ) |
| } |
|
|
| |
| pub fn eval_js_opts( |
| &self, |
| plugin_id: &str, |
| code: &str, |
| input: &str, |
| filename: Option<String>, |
| timeout_ms: u32, |
| ) -> Result<String, JsError> { |
| if self.shutdown.load(Ordering::Acquire) { |
| return Err(JsError::PoolShutdown); |
| } |
|
|
| let (reply_tx, reply_rx) = crossbeam_channel::bounded::<JsResult>(1); |
|
|
| let task = JsTask { |
| plugin_id: plugin_id.to_string(), |
| kind: JsTaskKind::Eval { |
| code: code.to_string(), |
| input: input.to_string(), |
| filename, |
| timeout_ms, |
| }, |
| reply: reply_tx, |
| }; |
|
|
| self.dispatch_task(plugin_id, task)?; |
|
|
| match reply_rx.recv_timeout(std::time::Duration::from_millis( |
| (timeout_ms as u64).max(self.config.default_timeout_ms as u64) + 2000, |
| )) { |
| Ok(result) => result.result, |
| Err(_) => Err(JsError::Timeout(timeout_ms)), |
| } |
| } |
|
|
| |
| |
| |
| pub fn call_js_fn( |
| &self, |
| plugin_id: &str, |
| name: &str, |
| fn_source: &str, |
| args_json: &str, |
| ) -> Result<String, JsError> { |
| if self.shutdown.load(Ordering::Acquire) { |
| return Err(JsError::PoolShutdown); |
| } |
|
|
| let (reply_tx, reply_rx) = crossbeam_channel::bounded::<JsResult>(1); |
|
|
| let task = JsTask { |
| plugin_id: plugin_id.to_string(), |
| kind: JsTaskKind::CallFn { |
| name: name.to_string(), |
| fn_source: fn_source.to_string(), |
| args_json: args_json.to_string(), |
| timeout_ms: self.config.default_timeout_ms, |
| }, |
| reply: reply_tx, |
| }; |
|
|
| self.dispatch_task(plugin_id, task)?; |
|
|
| match reply_rx.recv_timeout(std::time::Duration::from_millis( |
| self.config.default_timeout_ms as u64 + 2000, |
| )) { |
| Ok(result) => result.result, |
| Err(_) => Err(JsError::Timeout(self.config.default_timeout_ms)), |
| } |
| } |
|
|
| |
| pub fn clear_js_fn(&self, plugin_id: &str, name: &str) -> Result<u8, JsError> { |
| if self.shutdown.load(Ordering::Acquire) { |
| return Err(JsError::PoolShutdown); |
| } |
|
|
| let (reply_tx, reply_rx) = crossbeam_channel::bounded::<JsResult>(1); |
|
|
| let task = JsTask { |
| plugin_id: plugin_id.to_string(), |
| kind: JsTaskKind::ClearFn { |
| name: name.to_string(), |
| }, |
| reply: reply_tx, |
| }; |
|
|
| self.dispatch_task(plugin_id, task)?; |
|
|
| match reply_rx.recv_timeout(std::time::Duration::from_millis( |
| self.config.default_timeout_ms as u64 + 2000, |
| )) { |
| Ok(result) => result.result.map(|_| 0), |
| Err(_) => Err(JsError::Timeout(self.config.default_timeout_ms)), |
| } |
| } |
|
|
| |
| pub fn evict_plugin(&self, plugin_id: &str) { |
| if self.shutdown.load(Ordering::Acquire) { |
| return; |
| } |
|
|
| |
| let senders = self.task_senders.lock(); |
| for tx in senders.iter() { |
| let (reply_tx, reply_rx) = crossbeam_channel::bounded::<JsResult>(1); |
| let task = JsTask { |
| plugin_id: plugin_id.to_string(), |
| kind: JsTaskKind::Evict, |
| reply: reply_tx, |
| }; |
| let _ = tx.send(task); |
| let _ = reply_rx.recv_timeout(std::time::Duration::from_secs(2)); |
| } |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| fn dispatch_task(&self, plugin_id: &str, mut task: JsTask) -> Result<(), JsError> { |
| if self.shutdown.load(Ordering::Acquire) { |
| return Err(JsError::PoolShutdown); |
| } |
|
|
| |
| let affinity_idx = self.select_worker(plugin_id); |
| { |
| let senders = self.task_senders.lock(); |
| if let Some(tx) = senders.get(affinity_idx) { |
| match tx.try_send(task) { |
| Ok(()) => return Ok(()), |
| Err(crossbeam_channel::TrySendError::Disconnected(_)) => { |
| return Err(JsError::PoolShutdown); |
| } |
| Err(crossbeam_channel::TrySendError::Full(t)) => { |
| |
| task = t; |
| } |
| } |
| } |
| } |
|
|
| |
| { |
| let senders = self.task_senders.lock(); |
| let count = senders.len(); |
| for i in 0..count { |
| if i == affinity_idx { |
| continue; |
| } |
| if let Some(tx) = senders.get(i) { |
| match tx.try_send(task) { |
| Ok(()) => return Ok(()), |
| Err(crossbeam_channel::TrySendError::Disconnected(_)) => { |
| return Err(JsError::PoolShutdown); |
| } |
| Err(crossbeam_channel::TrySendError::Full(t)) => { |
| task = t; |
| continue; |
| } |
| } |
| } |
| } |
| } |
|
|
| |
| if self.maybe_grow() { |
| |
| |
| let senders = self.task_senders.lock(); |
| if let Some(tx) = senders.last() { |
| match tx.try_send(task) { |
| Ok(()) => return Ok(()), |
| Err(crossbeam_channel::TrySendError::Disconnected(_)) => { |
| return Err(JsError::PoolShutdown); |
| } |
| Err(crossbeam_channel::TrySendError::Full(_)) => { |
| |
| } |
| } |
| } |
| } |
|
|
| |
| Err(JsError::PoolBusy) |
| } |
|
|
| |
| |
| fn select_worker(&self, plugin_id: &str) -> usize { |
| let hash = simple_hash(plugin_id); |
| let count = self.worker_count.load(Ordering::Acquire); |
| if count == 0 { |
| 0 |
| } else { |
| (hash as usize) % count |
| } |
| } |
|
|
| |
| |
| fn maybe_grow(&self) -> bool { |
| let current = self.worker_count.load(Ordering::Acquire); |
| if current >= self.config.max_workers { |
| return false; |
| } |
|
|
| |
| let mut senders = self.task_senders.lock(); |
| let mut workers = self.workers.lock(); |
| let count = senders.len(); |
|
|
| if count >= self.config.max_workers { |
| return false; |
| } |
|
|
| let new_id = count; |
| let (tx, rx) = crossbeam_channel::bounded::<JsTask>(256); |
| let handle = JsWorker::spawn(new_id, self.config.clone(), rx, self.shutdown.clone()); |
| workers.push(handle); |
| senders.push(tx); |
|
|
| |
| self.worker_count.store(count + 1, Ordering::Release); |
|
|
| tracing::info!( |
| old_count = count, |
| new_count = count + 1, |
| max = self.config.max_workers, |
| "JS pool grew — added worker" |
| ); |
|
|
| true |
| } |
| } |
|
|
| impl Drop for JsPool { |
| fn drop(&mut self) { |
| |
| self.shutdown.store(true, Ordering::SeqCst); |
| |
| std::sync::atomic::fence(Ordering::SeqCst); |
| |
| self.task_senders.lock().clear(); |
| |
| let mut workers = self.workers.lock(); |
| for handle in workers.drain(..) { |
| let _ = handle.join(); |
| } |
| } |
| } |
|
|
| |
/// Hashes the UTF-8 bytes of `s` with 64-bit FNV-1a: starting from the offset basis,
/// each byte is XOR-ed in and the state is multiplied (wrapping) by the FNV prime.
///
/// The empty string hashes to the offset basis itself.
pub fn simple_hash(s: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    s.bytes()
        .fold(OFFSET_BASIS, |state, b| (state ^ u64::from(b)).wrapping_mul(PRIME))
}
|
|