krystv's picture
Upload 107 files
3374e90 verified
//! JS Pool — the main interface for the host engine.
//!
//! JsPool manages a pool of worker threads, each running a QuickJS runtime.
//! It routes JS evaluation requests to workers with plugin affinity.
//! The pool grows on demand up to `max_workers` when all worker queues are full.
use crate::config::JsPoolConfig;
use crate::error::JsError;
use crate::worker::{JsResult, JsTask, JsTaskKind, JsWorker};
use crossbeam_channel::{self, Sender};
use parking_lot::Mutex;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
/// The JS pool — the main interface for the host engine.
pub struct JsPool {
workers: Mutex<Vec<std::thread::JoinHandle<()>>>,
task_senders: Mutex<Vec<Sender<JsTask>>>,
/// Lock-free worker count for fast affinity selection.
worker_count: AtomicUsize,
shutdown: Arc<AtomicBool>,
config: JsPoolConfig,
}
impl JsPool {
/// Create a new JS pool with the given configuration.
pub fn new(config: JsPoolConfig) -> Result<Self, JsError> {
let num_workers = config.initial_workers;
let shutdown = Arc::new(AtomicBool::new(false));
let mut workers = Vec::with_capacity(num_workers);
let mut task_senders = Vec::with_capacity(num_workers);
for id in 0..num_workers {
let (tx, rx) = crossbeam_channel::bounded::<JsTask>(256);
let handle = JsWorker::spawn(id, config.clone(), rx, shutdown.clone());
workers.push(handle);
task_senders.push(tx);
}
Ok(Self {
workers: Mutex::new(workers),
task_senders: Mutex::new(task_senders),
worker_count: AtomicUsize::new(num_workers),
shutdown,
config,
})
}
/// Evaluate a JavaScript string and return the result as JSON.
/// Uses simple one-shot evaluation with default timeout.
/// `input` is injected as the global variable `input` before eval.
pub fn eval_js(&self, plugin_id: &str, code: &str, input: &str) -> Result<String, JsError> {
self.eval_js_opts(
plugin_id,
code,
input,
None,
self.config.default_timeout_ms,
)
}
/// Evaluate JavaScript with options.
pub fn eval_js_opts(
&self,
plugin_id: &str,
code: &str,
input: &str,
filename: Option<String>,
timeout_ms: u32,
) -> Result<String, JsError> {
if self.shutdown.load(Ordering::Acquire) {
return Err(JsError::PoolShutdown);
}
let (reply_tx, reply_rx) = crossbeam_channel::bounded::<JsResult>(1);
let task = JsTask {
plugin_id: plugin_id.to_string(),
kind: JsTaskKind::Eval {
code: code.to_string(),
input: input.to_string(),
filename,
timeout_ms,
},
reply: reply_tx,
};
self.dispatch_task(plugin_id, task)?;
match reply_rx.recv_timeout(std::time::Duration::from_millis(
(timeout_ms as u64).max(self.config.default_timeout_ms as u64) + 2000,
)) {
Ok(result) => result.result,
Err(_) => Err(JsError::Timeout(timeout_ms)),
}
}
/// Call a named JavaScript function.
/// `fn_source` is evaluated on first call (or if the source hash changes).
/// `args_json` is passed as a single string argument (no eval of user data).
pub fn call_js_fn(
&self,
plugin_id: &str,
name: &str,
fn_source: &str,
args_json: &str,
) -> Result<String, JsError> {
if self.shutdown.load(Ordering::Acquire) {
return Err(JsError::PoolShutdown);
}
let (reply_tx, reply_rx) = crossbeam_channel::bounded::<JsResult>(1);
let task = JsTask {
plugin_id: plugin_id.to_string(),
kind: JsTaskKind::CallFn {
name: name.to_string(),
fn_source: fn_source.to_string(),
args_json: args_json.to_string(),
timeout_ms: self.config.default_timeout_ms,
},
reply: reply_tx,
};
self.dispatch_task(plugin_id, task)?;
match reply_rx.recv_timeout(std::time::Duration::from_millis(
self.config.default_timeout_ms as u64 + 2000,
)) {
Ok(result) => result.result,
Err(_) => Err(JsError::Timeout(self.config.default_timeout_ms)),
}
}
/// Clear a named JS function from a plugin's context.
pub fn clear_js_fn(&self, plugin_id: &str, name: &str) -> Result<u8, JsError> {
if self.shutdown.load(Ordering::Acquire) {
return Err(JsError::PoolShutdown);
}
let (reply_tx, reply_rx) = crossbeam_channel::bounded::<JsResult>(1);
let task = JsTask {
plugin_id: plugin_id.to_string(),
kind: JsTaskKind::ClearFn {
name: name.to_string(),
},
reply: reply_tx,
};
self.dispatch_task(plugin_id, task)?;
match reply_rx.recv_timeout(std::time::Duration::from_millis(
self.config.default_timeout_ms as u64 + 2000,
)) {
Ok(result) => result.result.map(|_| 0),
Err(_) => Err(JsError::Timeout(self.config.default_timeout_ms)),
}
}
/// Evict a plugin's JS context (called on plugin uninstall).
pub fn evict_plugin(&self, plugin_id: &str) {
if self.shutdown.load(Ordering::Acquire) {
return;
}
// Send evict to all workers since we don't know which one has the context
let senders = self.task_senders.lock();
for tx in senders.iter() {
let (reply_tx, reply_rx) = crossbeam_channel::bounded::<JsResult>(1);
let task = JsTask {
plugin_id: plugin_id.to_string(),
kind: JsTaskKind::Evict,
reply: reply_tx,
};
let _ = tx.send(task);
let _ = reply_rx.recv_timeout(std::time::Duration::from_secs(2));
}
}
/// Dispatch a task to a worker with plugin affinity.
///
/// Strategy:
/// 1. Try the affinity worker first (hash-based).
/// 2. If full, try any other worker with capacity (overflow routing).
/// 3. If all full and pool < max_workers, grow the pool and retry.
/// 4. If still can't dispatch, return PoolBusy.
fn dispatch_task(&self, plugin_id: &str, mut task: JsTask) -> Result<(), JsError> {
if self.shutdown.load(Ordering::Acquire) {
return Err(JsError::PoolShutdown);
}
// Step 1: Try affinity worker first
let affinity_idx = self.select_worker(plugin_id);
{
let senders = self.task_senders.lock();
if let Some(tx) = senders.get(affinity_idx) {
match tx.try_send(task) {
Ok(()) => return Ok(()),
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
return Err(JsError::PoolShutdown);
}
Err(crossbeam_channel::TrySendError::Full(t)) => {
// Recover the task for overflow routing
task = t;
}
}
}
}
// Step 2: Overflow routing — try any other worker with capacity
{
let senders = self.task_senders.lock();
let count = senders.len();
for i in 0..count {
if i == affinity_idx {
continue;
}
if let Some(tx) = senders.get(i) {
match tx.try_send(task) {
Ok(()) => return Ok(()),
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
return Err(JsError::PoolShutdown);
}
Err(crossbeam_channel::TrySendError::Full(t)) => {
task = t;
continue;
}
}
}
}
}
// Step 3: All full — try to grow the pool if under max_workers
if self.maybe_grow() {
// The newly added worker is at the end — try it first since it's
// guaranteed to have an empty queue
let senders = self.task_senders.lock();
if let Some(tx) = senders.last() {
match tx.try_send(task) {
Ok(()) => return Ok(()),
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
return Err(JsError::PoolShutdown);
}
Err(crossbeam_channel::TrySendError::Full(_)) => {
// Extremely unlikely but possible under heavy contention
}
}
}
}
// Step 4: Still can't dispatch
Err(JsError::PoolBusy)
}
/// Select a worker for a given plugin_id.
/// Uses hash-based affinity so the same plugin always goes to the same worker.
fn select_worker(&self, plugin_id: &str) -> usize {
let hash = simple_hash(plugin_id);
let count = self.worker_count.load(Ordering::Acquire);
if count == 0 {
0
} else {
(hash as usize) % count
}
}
/// Try to spawn additional workers if the pool is under max_workers.
/// Returns `true` if at least one new worker was added.
fn maybe_grow(&self) -> bool {
let current = self.worker_count.load(Ordering::Acquire);
if current >= self.config.max_workers {
return false;
}
// Double-check under lock to avoid over-spawning
let mut senders = self.task_senders.lock();
let mut workers = self.workers.lock();
let count = senders.len();
if count >= self.config.max_workers {
return false;
}
let new_id = count;
let (tx, rx) = crossbeam_channel::bounded::<JsTask>(256);
let handle = JsWorker::spawn(new_id, self.config.clone(), rx, self.shutdown.clone());
workers.push(handle);
senders.push(tx);
// Update lock-free counter after the worker is registered
self.worker_count.store(count + 1, Ordering::Release);
tracing::info!(
old_count = count,
new_count = count + 1,
max = self.config.max_workers,
"JS pool grew — added worker"
);
true
}
}
impl Drop for JsPool {
fn drop(&mut self) {
// SeqCst ensures all other threads see shutdown=true BEFORE we clear senders
self.shutdown.store(true, Ordering::SeqCst);
// Fence: nothing before this point reorders after
std::sync::atomic::fence(Ordering::SeqCst);
// Drop senders to signal workers to stop
self.task_senders.lock().clear();
// Join all worker threads
let mut workers = self.workers.lock();
for handle in workers.drain(..) {
let _ = handle.join();
}
}
}
/// Simple FNV-1a hash for plugin_id -> worker affinity.
pub fn simple_hash(s: &str) -> u64 {
let mut hash: u64 = 0xcbf29ce484222325;
for byte in s.bytes() {
hash ^= byte as u64;
hash = hash.wrapping_mul(0x100000001b3);
}
hash
}