//! Lane-based scheduler for concurrent plugin calls.
//!
//! The scheduler uses tokio semaphores to limit concurrency across
//! different priority lanes:
//!
//! - **Control** (1 permit): Serialized operations like install/uninstall.
//! Only one control operation at a time to avoid race conditions.
//!
//! - **User** (4 permits): Interactive operations like search, get_info,
//! get_servers. These are user-facing and should be responsive.
//!
//! - **Background** (2 permits): Low-priority operations like prefetching,
//! article fetching. Limited to avoid starving user-facing calls.
//!
//! Additional global limits:
//! - **WASM** (4 permits): Max concurrent WASM instantiations.
//! Each instantiation uses memory and CPU for compilation.
//! - **HTTP** (8 permits): Max concurrent HTTP requests across all plugins.
//! Prevents connection pool exhaustion.
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{Semaphore, SemaphorePermit};
/// Error type for scheduler operations.
///
/// `Display` text for each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum SchedulerError {
/// A semaphore acquire call returned an error; the async `acquire*`
/// methods on `Scheduler` map every such failure to this variant.
#[error("scheduler is closed")]
Closed,
/// The request was cancelled.
///
/// NOTE(review): no code visible in this file constructs this variant;
/// presumably it is produced by callers elsewhere — confirm.
#[error("request was cancelled")]
Cancelled,
}
/// Priority lane a plugin call is scheduled on.
///
/// Each lane is backed by its own semaphore, so the lane chosen for a call
/// decides which concurrency limit applies to it. The permit counts quoted
/// below are the defaults from `SchedulerConfig::default()`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Lane {
    /// Control operations such as install/uninstall, serialized with a
    /// single permit by default.
    Control,
    /// Interactive, user-facing operations (search, info, servers);
    /// 4 permits by default.
    User,
    /// Low-priority background work (articles, prefetch);
    /// 2 permits by default.
    Background,
}
/// The scheduler owns tokio semaphores for lane-based concurrency control.
///
/// There is one semaphore per [`Lane`] plus two global ones (WASM and HTTP).
/// Their initial permit counts come from the [`SchedulerConfig`] passed to
/// `Scheduler::with_config`.
pub struct Scheduler {
/// Guards `Lane::Control`; sized by `SchedulerConfig::max_control`.
control: Arc<Semaphore>,
/// Guards `Lane::User`; sized by `SchedulerConfig::max_user`.
user: Arc<Semaphore>,
/// Guards `Lane::Background`; sized by `SchedulerConfig::max_background`.
background: Arc<Semaphore>,
/// Global limit handed out by `acquire_wasm`; sized by `SchedulerConfig::max_wasm`.
wasm: Arc<Semaphore>,
/// Global limit handed out by `acquire_http`; sized by `SchedulerConfig::max_http`.
http: Arc<Semaphore>,
}
/// Configuration for scheduler limits.
///
/// Every field is the initial number of permits of one semaphore owned by
/// the scheduler. `SchedulerConfig::default()` yields 1 / 4 / 2 permits for
/// the control / user / background lanes and 4 / 8 permits for the global
/// WASM / HTTP limits.
///
/// The type is comparable (`PartialEq`/`Eq`) so configurations can be
/// checked against each other in tests and diagnostics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchedulerConfig {
    /// Permits for the control lane (install/uninstall style operations).
    pub max_control: usize,
    /// Permits for the interactive user lane.
    pub max_user: usize,
    /// Permits for the low-priority background lane.
    pub max_background: usize,
    /// Global cap on concurrent WASM instantiations.
    pub max_wasm: usize,
    /// Global cap on concurrent HTTP requests across all plugins.
    pub max_http: usize,
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
max_control: 1,
max_user: 4,
max_background: 2,
max_wasm: 4,
max_http: 8,
}
}
}
impl Scheduler {
/// Create a new scheduler with default limits.
pub fn new() -> Self {
Self::with_config(SchedulerConfig::default())
}
/// Create a new scheduler with custom limits.
pub fn with_config(config: SchedulerConfig) -> Self {
Self {
control: Arc::new(Semaphore::new(config.max_control)),
user: Arc::new(Semaphore::new(config.max_user)),
background: Arc::new(Semaphore::new(config.max_background)),
wasm: Arc::new(Semaphore::new(config.max_wasm)),
http: Arc::new(Semaphore::new(config.max_http)),
}
}
/// Acquire a permit for the given lane.
///
/// The permit is released when dropped. Use this in an async context:
/// ```ignore
/// let _permit = scheduler.acquire(Lane::User).await?;
/// // do work...
/// // permit released on drop
/// ```
pub async fn acquire(&self, lane: Lane) -> Result<SemaphorePermit<'_>, SchedulerError> {
let sem = match lane {
Lane::Control => &self.control,
Lane::User => &self.user,
Lane::Background => &self.background,
};
sem.acquire()
.await
.map_err(|_| SchedulerError::Closed)
}
/// Try to acquire a permit without waiting. Returns None if the lane is full.
pub fn try_acquire(&self, lane: Lane) -> Option<SemaphorePermit<'_>> {
let sem = match lane {
Lane::Control => &self.control,
Lane::User => &self.user,
Lane::Background => &self.background,
};
sem.try_acquire().ok()
}
/// Acquire a WASM instantiation permit.
///
/// Each WASM instantiation is expensive (compilation + memory). This
/// limits how many can happen concurrently.
pub async fn acquire_wasm(&self) -> Result<SemaphorePermit<'_>, SchedulerError> {
self.wasm
.acquire()
.await
.map_err(|_| SchedulerError::Closed)
}
/// Acquire an HTTP request permit.
///
/// Limits concurrent HTTP requests to prevent connection pool exhaustion.
pub async fn acquire_http(&self) -> Result<SemaphorePermit<'_>, SchedulerError> {
self.http
.acquire()
.await
.map_err(|_| SchedulerError::Closed)
}
/// Get the number of available permits for a lane (for diagnostics).
pub fn available_permits(&self, lane: Lane) -> usize {
let sem = match lane {
Lane::Control => &self.control,
Lane::User => &self.user,
Lane::Background => &self.background,
};
sem.available_permits()
}
/// Get the number of available WASM permits.
pub fn available_wasm_permits(&self) -> usize {
self.wasm.available_permits()
}
/// Get the number of available HTTP permits.
pub fn available_http_permits(&self) -> usize {
self.http.available_permits()
}
}
impl Default for Scheduler {
fn default() -> Self {
Self::new()
}
}
|