//! JS Pool — the main interface for the host engine. //! //! JsPool manages a pool of worker threads, each running a QuickJS runtime. //! It routes JS evaluation requests to workers with plugin affinity. //! The pool grows on demand up to `max_workers` when all worker queues are full. use crate::config::JsPoolConfig; use crate::error::JsError; use crate::worker::{JsResult, JsTask, JsTaskKind, JsWorker}; use crossbeam_channel::{self, Sender}; use parking_lot::Mutex; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; /// The JS pool — the main interface for the host engine. pub struct JsPool { workers: Mutex>>, task_senders: Mutex>>, /// Lock-free worker count for fast affinity selection. worker_count: AtomicUsize, shutdown: Arc, config: JsPoolConfig, } impl JsPool { /// Create a new JS pool with the given configuration. pub fn new(config: JsPoolConfig) -> Result { let num_workers = config.initial_workers; let shutdown = Arc::new(AtomicBool::new(false)); let mut workers = Vec::with_capacity(num_workers); let mut task_senders = Vec::with_capacity(num_workers); for id in 0..num_workers { let (tx, rx) = crossbeam_channel::bounded::(256); let handle = JsWorker::spawn(id, config.clone(), rx, shutdown.clone()); workers.push(handle); task_senders.push(tx); } Ok(Self { workers: Mutex::new(workers), task_senders: Mutex::new(task_senders), worker_count: AtomicUsize::new(num_workers), shutdown, config, }) } /// Evaluate a JavaScript string and return the result as JSON. /// Uses simple one-shot evaluation with default timeout. /// `input` is injected as the global variable `input` before eval. pub fn eval_js(&self, plugin_id: &str, code: &str, input: &str) -> Result { self.eval_js_opts( plugin_id, code, input, None, self.config.default_timeout_ms, ) } /// Evaluate JavaScript with options. pub fn eval_js_opts( &self, plugin_id: &str, code: &str, input: &str, filename: Option, timeout_ms: u32, ) -> Result { if self.shutdown.load(Ordering::Acquire) { return Err(JsError::PoolShutdown); } let (reply_tx, reply_rx) = crossbeam_channel::bounded::(1); let task = JsTask { plugin_id: plugin_id.to_string(), kind: JsTaskKind::Eval { code: code.to_string(), input: input.to_string(), filename, timeout_ms, }, reply: reply_tx, }; self.dispatch_task(plugin_id, task)?; match reply_rx.recv_timeout(std::time::Duration::from_millis( (timeout_ms as u64).max(self.config.default_timeout_ms as u64) + 2000, )) { Ok(result) => result.result, Err(_) => Err(JsError::Timeout(timeout_ms)), } } /// Call a named JavaScript function. /// `fn_source` is evaluated on first call (or if the source hash changes). /// `args_json` is passed as a single string argument (no eval of user data). pub fn call_js_fn( &self, plugin_id: &str, name: &str, fn_source: &str, args_json: &str, ) -> Result { if self.shutdown.load(Ordering::Acquire) { return Err(JsError::PoolShutdown); } let (reply_tx, reply_rx) = crossbeam_channel::bounded::(1); let task = JsTask { plugin_id: plugin_id.to_string(), kind: JsTaskKind::CallFn { name: name.to_string(), fn_source: fn_source.to_string(), args_json: args_json.to_string(), timeout_ms: self.config.default_timeout_ms, }, reply: reply_tx, }; self.dispatch_task(plugin_id, task)?; match reply_rx.recv_timeout(std::time::Duration::from_millis( self.config.default_timeout_ms as u64 + 2000, )) { Ok(result) => result.result, Err(_) => Err(JsError::Timeout(self.config.default_timeout_ms)), } } /// Clear a named JS function from a plugin's context. pub fn clear_js_fn(&self, plugin_id: &str, name: &str) -> Result { if self.shutdown.load(Ordering::Acquire) { return Err(JsError::PoolShutdown); } let (reply_tx, reply_rx) = crossbeam_channel::bounded::(1); let task = JsTask { plugin_id: plugin_id.to_string(), kind: JsTaskKind::ClearFn { name: name.to_string(), }, reply: reply_tx, }; self.dispatch_task(plugin_id, task)?; match reply_rx.recv_timeout(std::time::Duration::from_millis( self.config.default_timeout_ms as u64 + 2000, )) { Ok(result) => result.result.map(|_| 0), Err(_) => Err(JsError::Timeout(self.config.default_timeout_ms)), } } /// Evict a plugin's JS context (called on plugin uninstall). pub fn evict_plugin(&self, plugin_id: &str) { if self.shutdown.load(Ordering::Acquire) { return; } // Send evict to all workers since we don't know which one has the context let senders = self.task_senders.lock(); for tx in senders.iter() { let (reply_tx, reply_rx) = crossbeam_channel::bounded::(1); let task = JsTask { plugin_id: plugin_id.to_string(), kind: JsTaskKind::Evict, reply: reply_tx, }; let _ = tx.send(task); let _ = reply_rx.recv_timeout(std::time::Duration::from_secs(2)); } } /// Dispatch a task to a worker with plugin affinity. /// /// Strategy: /// 1. Try the affinity worker first (hash-based). /// 2. If full, try any other worker with capacity (overflow routing). /// 3. If all full and pool < max_workers, grow the pool and retry. /// 4. If still can't dispatch, return PoolBusy. fn dispatch_task(&self, plugin_id: &str, mut task: JsTask) -> Result<(), JsError> { if self.shutdown.load(Ordering::Acquire) { return Err(JsError::PoolShutdown); } // Step 1: Try affinity worker first let affinity_idx = self.select_worker(plugin_id); { let senders = self.task_senders.lock(); if let Some(tx) = senders.get(affinity_idx) { match tx.try_send(task) { Ok(()) => return Ok(()), Err(crossbeam_channel::TrySendError::Disconnected(_)) => { return Err(JsError::PoolShutdown); } Err(crossbeam_channel::TrySendError::Full(t)) => { // Recover the task for overflow routing task = t; } } } } // Step 2: Overflow routing — try any other worker with capacity { let senders = self.task_senders.lock(); let count = senders.len(); for i in 0..count { if i == affinity_idx { continue; } if let Some(tx) = senders.get(i) { match tx.try_send(task) { Ok(()) => return Ok(()), Err(crossbeam_channel::TrySendError::Disconnected(_)) => { return Err(JsError::PoolShutdown); } Err(crossbeam_channel::TrySendError::Full(t)) => { task = t; continue; } } } } } // Step 3: All full — try to grow the pool if under max_workers if self.maybe_grow() { // The newly added worker is at the end — try it first since it's // guaranteed to have an empty queue let senders = self.task_senders.lock(); if let Some(tx) = senders.last() { match tx.try_send(task) { Ok(()) => return Ok(()), Err(crossbeam_channel::TrySendError::Disconnected(_)) => { return Err(JsError::PoolShutdown); } Err(crossbeam_channel::TrySendError::Full(_)) => { // Extremely unlikely but possible under heavy contention } } } } // Step 4: Still can't dispatch Err(JsError::PoolBusy) } /// Select a worker for a given plugin_id. /// Uses hash-based affinity so the same plugin always goes to the same worker. fn select_worker(&self, plugin_id: &str) -> usize { let hash = simple_hash(plugin_id); let count = self.worker_count.load(Ordering::Acquire); if count == 0 { 0 } else { (hash as usize) % count } } /// Try to spawn additional workers if the pool is under max_workers. /// Returns `true` if at least one new worker was added. fn maybe_grow(&self) -> bool { let current = self.worker_count.load(Ordering::Acquire); if current >= self.config.max_workers { return false; } // Double-check under lock to avoid over-spawning let mut senders = self.task_senders.lock(); let mut workers = self.workers.lock(); let count = senders.len(); if count >= self.config.max_workers { return false; } let new_id = count; let (tx, rx) = crossbeam_channel::bounded::(256); let handle = JsWorker::spawn(new_id, self.config.clone(), rx, self.shutdown.clone()); workers.push(handle); senders.push(tx); // Update lock-free counter after the worker is registered self.worker_count.store(count + 1, Ordering::Release); tracing::info!( old_count = count, new_count = count + 1, max = self.config.max_workers, "JS pool grew — added worker" ); true } } impl Drop for JsPool { fn drop(&mut self) { // SeqCst ensures all other threads see shutdown=true BEFORE we clear senders self.shutdown.store(true, Ordering::SeqCst); // Fence: nothing before this point reorders after std::sync::atomic::fence(Ordering::SeqCst); // Drop senders to signal workers to stop self.task_senders.lock().clear(); // Join all worker threads let mut workers = self.workers.lock(); for handle in workers.drain(..) { let _ = handle.join(); } } } /// Simple FNV-1a hash for plugin_id -> worker affinity. pub fn simple_hash(s: &str) -> u64 { let mut hash: u64 = 0xcbf29ce484222325; for byte in s.bytes() { hash ^= byte as u64; hash = hash.wrapping_mul(0x100000001b3); } hash }