krystv's picture
Upload 107 files
3374e90 verified
//! Lane-based scheduler for concurrent plugin calls.
//!
//! The scheduler uses tokio semaphores to limit concurrency across
//! different priority lanes:
//!
//! - **Control** (1 permit): Serialized operations like install/uninstall.
//! Only one control operation at a time to avoid race conditions.
//!
//! - **User** (4 permits): Interactive operations like search, get_info,
//! get_servers. These are user-facing and should be responsive.
//!
//! - **Background** (2 permits): Low-priority operations such as prefetching
//! and article fetching. Limited to avoid starving user-facing calls.
//!
//! Additional global limits:
//! - **WASM** (4 permits): Max concurrent WASM instantiations.
//! Each instantiation uses memory and CPU for compilation.
//! - **HTTP** (8 permits): Max concurrent HTTP requests across all plugins.
//! Prevents connection pool exhaustion.
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{Semaphore, SemaphorePermit};
/// Error type for scheduler operations.
#[derive(Debug, Error)]
pub enum SchedulerError {
#[error("scheduler is closed")]
Closed,
#[error("request was cancelled")]
Cancelled,
}
/// Scheduler lane — selects which concurrency limit a call is counted against.
///
/// Each lane is backed by its own semaphore, so the lanes limit concurrency
/// independently of one another. The permit counts quoted on the variants are
/// the defaults; they can be overridden through the scheduler configuration.
///
/// `Hash` is derived in addition to `Eq` so a lane can be used directly as a
/// `HashMap`/`HashSet` key (for example, for per-lane diagnostics).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Lane {
    /// Serialized control operations (install/uninstall) — 1 permit by default.
    Control,
    /// Interactive user operations (search, info, servers) — 4 permits by default.
    User,
    /// Low-priority background operations (articles, prefetch) — 2 permits by default.
    Background,
}
/// The scheduler owns the tokio semaphores used for lane-based concurrency
/// control.
///
/// There is one semaphore per [`Lane`] plus two that are independent of the
/// lane: one for WASM instantiations and one for HTTP requests. Permits are
/// handed out as guards that give the permit back when dropped.
///
/// `Debug` is derived so the scheduler can be embedded in other `Debug` types
/// and logged, matching the other public types in this module.
#[derive(Debug)]
pub struct Scheduler {
    /// Permits for [`Lane::Control`].
    control: Arc<Semaphore>,
    /// Permits for [`Lane::User`].
    user: Arc<Semaphore>,
    /// Permits for [`Lane::Background`].
    background: Arc<Semaphore>,
    /// Limit on concurrent WASM instantiations, shared by all lanes.
    wasm: Arc<Semaphore>,
    /// Limit on concurrent HTTP requests, shared by all lanes.
    http: Arc<Semaphore>,
}
/// Permit limits used to build a scheduler.
///
/// Every field is the initial permit count of one semaphore.
#[derive(Clone, Debug)]
pub struct SchedulerConfig {
    /// Maximum concurrent operations in the control lane.
    pub max_control: usize,
    /// Maximum concurrent operations in the user lane.
    pub max_user: usize,
    /// Maximum concurrent operations in the background lane.
    pub max_background: usize,
    /// Maximum concurrent WASM instantiations.
    pub max_wasm: usize,
    /// Maximum concurrent HTTP requests.
    pub max_http: usize,
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
max_control: 1,
max_user: 4,
max_background: 2,
max_wasm: 4,
max_http: 8,
}
}
}
impl Scheduler {
/// Create a new scheduler with default limits.
pub fn new() -> Self {
Self::with_config(SchedulerConfig::default())
}
/// Create a new scheduler with custom limits.
pub fn with_config(config: SchedulerConfig) -> Self {
Self {
control: Arc::new(Semaphore::new(config.max_control)),
user: Arc::new(Semaphore::new(config.max_user)),
background: Arc::new(Semaphore::new(config.max_background)),
wasm: Arc::new(Semaphore::new(config.max_wasm)),
http: Arc::new(Semaphore::new(config.max_http)),
}
}
/// Acquire a permit for the given lane.
///
/// The permit is released when dropped. Use this in an async context:
/// ```ignore
/// let _permit = scheduler.acquire(Lane::User).await?;
/// // do work...
/// // permit released on drop
/// ```
pub async fn acquire(&self, lane: Lane) -> Result<SemaphorePermit<'_>, SchedulerError> {
let sem = match lane {
Lane::Control => &self.control,
Lane::User => &self.user,
Lane::Background => &self.background,
};
sem.acquire()
.await
.map_err(|_| SchedulerError::Closed)
}
/// Try to acquire a permit without waiting. Returns None if the lane is full.
pub fn try_acquire(&self, lane: Lane) -> Option<SemaphorePermit<'_>> {
let sem = match lane {
Lane::Control => &self.control,
Lane::User => &self.user,
Lane::Background => &self.background,
};
sem.try_acquire().ok()
}
/// Acquire a WASM instantiation permit.
///
/// Each WASM instantiation is expensive (compilation + memory). This
/// limits how many can happen concurrently.
pub async fn acquire_wasm(&self) -> Result<SemaphorePermit<'_>, SchedulerError> {
self.wasm
.acquire()
.await
.map_err(|_| SchedulerError::Closed)
}
/// Acquire an HTTP request permit.
///
/// Limits concurrent HTTP requests to prevent connection pool exhaustion.
pub async fn acquire_http(&self) -> Result<SemaphorePermit<'_>, SchedulerError> {
self.http
.acquire()
.await
.map_err(|_| SchedulerError::Closed)
}
/// Get the number of available permits for a lane (for diagnostics).
pub fn available_permits(&self, lane: Lane) -> usize {
let sem = match lane {
Lane::Control => &self.control,
Lane::User => &self.user,
Lane::Background => &self.background,
};
sem.available_permits()
}
/// Get the number of available WASM permits.
pub fn available_wasm_permits(&self) -> usize {
self.wasm.available_permits()
}
/// Get the number of available HTTP permits.
pub fn available_http_permits(&self) -> usize {
self.http.available_permits()
}
}
impl Default for Scheduler {
fn default() -> Self {
Self::new()
}
}