File size: 5,322 Bytes
3374e90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
//! Lane-based scheduler for concurrent plugin calls.
//!
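//! The scheduler uses tokio semaphores to limit concurrency across
//! different priority lanes. The permit counts listed below are the
//! defaults from `SchedulerConfig::default()`; `Scheduler::with_config`
//! accepts other limits: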
//!
//! - **Control** (1 permit): Serialized operations like install/uninstall.
//!   Only one control operation at a time to avoid race conditions.
//!
//! - **User** (4 permits): Interactive operations like search, get_info,
//!   get_servers. These are user-facing and should be responsive.
//!
//! - **Background** (2 permits): Low-priority operations like prefetching,
//!   article fetching. Limited to avoid starving user-facing calls.
//!
//! Additional global limits:
//! - **WASM** (4 permits): Max concurrent WASM instantiations.
//!   Each instantiation uses memory and CPU for compilation.
//! - **HTTP** (8 permits): Max concurrent HTTP requests across all plugins.
//!   Prevents connection pool exhaustion.

use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{Semaphore, SemaphorePermit};

/// Error type for scheduler operations.
///
/// The `Display` text of each variant comes from its `#[error("...")]`
/// attribute (generated by the `thiserror` derive).
#[derive(Debug, Error)]
pub enum SchedulerError {
    /// A semaphore acquire failed. The async `acquire*` methods on
    /// `Scheduler` map every semaphore acquire error to this variant.
    #[error("scheduler is closed")]
    Closed,
    /// The request was cancelled.
    ///
    /// NOTE(review): nothing in this file constructs this variant; presumably
    /// it is produced by callers elsewhere in the crate — confirm it is used.
    #[error("request was cancelled")]
    Cancelled,
}

/// Scheduler lane — selects which semaphore (and therefore which concurrency
/// limit) a plugin call is charged against.
///
/// The permit counts quoted on the variants are the defaults from
/// `SchedulerConfig::default()`; a scheduler built with
/// `Scheduler::with_config` may use different limits.
///
/// `Hash` is derived so a lane can be used as a `HashMap`/`HashSet` key
/// (for example for per-lane counters).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Lane {
    /// Serialized control operations (install/uninstall) — 1 permit by default.
    Control,
    /// Interactive user operations (search, info, servers) — 4 permits by default.
    User,
    /// Low-priority background operations (articles, prefetch) — 2 permits by default.
    Background,
}

/// The scheduler owns tokio semaphores for lane-based concurrency control.
///
/// There is one semaphore per [`Lane`] plus two global ones (WASM
/// instantiations and HTTP requests). Permit counts are fixed at construction
/// time from a `SchedulerConfig`.
///
/// `Debug` is derived so the scheduler can be logged and embedded in other
/// `Debug` types.
#[derive(Debug)]
pub struct Scheduler {
    /// Permits for [`Lane::Control`] operations.
    control: Arc<Semaphore>,
    /// Permits for [`Lane::User`] operations.
    user: Arc<Semaphore>,
    /// Permits for [`Lane::Background`] operations.
    background: Arc<Semaphore>,
    /// Global limit on concurrent WASM instantiations.
    wasm: Arc<Semaphore>,
    /// Global limit on concurrent HTTP requests.
    http: Arc<Semaphore>,
}

/// Configuration for scheduler limits.
///
/// Each field is the number of permits the matching semaphore is created
/// with by `Scheduler::with_config`. A value of `0` is accepted by this type;
/// the corresponding semaphore then starts with no permits, so async
/// acquires on it wait and `try_acquire` returns `None`.
///
/// `PartialEq`/`Eq` are derived so configurations can be compared (useful in
/// tests and when detecting configuration changes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchedulerConfig {
    /// Max concurrent control operations (install/uninstall). Default: 1.
    pub max_control: usize,
    /// Max concurrent interactive user operations. Default: 4.
    pub max_user: usize,
    /// Max concurrent background operations. Default: 2.
    pub max_background: usize,
    /// Max concurrent WASM instantiations across all lanes. Default: 4.
    pub max_wasm: usize,
    /// Max concurrent HTTP requests across all plugins. Default: 8.
    pub max_http: usize,
}

impl Default for SchedulerConfig {
    /// Returns the default limits: control 1, user 4, background 2,
    /// WASM 4, HTTP 8.
    fn default() -> Self {
        Self {
            max_control: 1,
            max_user: 4,
            max_background: 2,
            max_wasm: 4,
            max_http: 8,
        }
    }
}

impl Scheduler {
    /// Create a new scheduler with default limits.
    pub fn new() -> Self {
        Self::with_config(SchedulerConfig::default())
    }

    /// Create a new scheduler with custom limits.
    pub fn with_config(config: SchedulerConfig) -> Self {
        Self {
            control: Arc::new(Semaphore::new(config.max_control)),
            user: Arc::new(Semaphore::new(config.max_user)),
            background: Arc::new(Semaphore::new(config.max_background)),
            wasm: Arc::new(Semaphore::new(config.max_wasm)),
            http: Arc::new(Semaphore::new(config.max_http)),
        }
    }

    /// Acquire a permit for the given lane.
    ///
    /// The permit is released when dropped. Use this in an async context:
    /// ```ignore
    /// let _permit = scheduler.acquire(Lane::User).await?;
    /// // do work...
    /// // permit released on drop
    /// ```
    pub async fn acquire(&self, lane: Lane) -> Result<SemaphorePermit<'_>, SchedulerError> {
        let sem = match lane {
            Lane::Control => &self.control,
            Lane::User => &self.user,
            Lane::Background => &self.background,
        };
        sem.acquire()
            .await
            .map_err(|_| SchedulerError::Closed)
    }

    /// Try to acquire a permit without waiting. Returns None if the lane is full.
    pub fn try_acquire(&self, lane: Lane) -> Option<SemaphorePermit<'_>> {
        let sem = match lane {
            Lane::Control => &self.control,
            Lane::User => &self.user,
            Lane::Background => &self.background,
        };
        sem.try_acquire().ok()
    }

    /// Acquire a WASM instantiation permit.
    ///
    /// Each WASM instantiation is expensive (compilation + memory). This
    /// limits how many can happen concurrently.
    pub async fn acquire_wasm(&self) -> Result<SemaphorePermit<'_>, SchedulerError> {
        self.wasm
            .acquire()
            .await
            .map_err(|_| SchedulerError::Closed)
    }

    /// Acquire an HTTP request permit.
    ///
    /// Limits concurrent HTTP requests to prevent connection pool exhaustion.
    pub async fn acquire_http(&self) -> Result<SemaphorePermit<'_>, SchedulerError> {
        self.http
            .acquire()
            .await
            .map_err(|_| SchedulerError::Closed)
    }

    /// Get the number of available permits for a lane (for diagnostics).
    pub fn available_permits(&self, lane: Lane) -> usize {
        let sem = match lane {
            Lane::Control => &self.control,
            Lane::User => &self.user,
            Lane::Background => &self.background,
        };
        sem.available_permits()
    }

    /// Get the number of available WASM permits.
    pub fn available_wasm_permits(&self) -> usize {
        self.wasm.available_permits()
    }

    /// Get the number of available HTTP permits.
    pub fn available_http_permits(&self) -> usize {
        self.http.available_permits()
    }
}

impl Default for Scheduler {
    fn default() -> Self {
        Self::new()
    }
}