//! The BexRuntime — async callback-driven wrapper around bex_core::Engine.
//!
//! BexRuntime adds these capabilities on top of the sync bex_core::Engine:
//!
//! 1. **Callback-based async**: Plugin call results are delivered directly
//! to a C function pointer callback from a background Tokio thread.
//! No event queue, no polling.
//!
//! 2. **Lane-based scheduling**: Plugin calls are dispatched to tokio tasks
//! with concurrency limits per priority lane (Control, User, Background).
//!
//! 3. **Cancellation**: Each request gets a `CancellationToken`. The C++
//! backend can cancel via `bex_cancel_request()`.
//!
//! ## Architecture
//!
//! ```text
//! C++ Backend
//!     │
//!     ├── bex_submit_search(engine, plugin_id, query, callback, user_data)
//!     │       → returns request_id immediately
//!     │
//!     │   [Rust Tokio background thread]
//!     │       ├── Acquires scheduler permit
//!     │       ├── Executes plugin call via spawn_blocking
//!     │       └── Invokes callback(user_data, request_id, success, payload, len)
//!     │
//!     └── bex_cancel_request(engine, request_id)
//!             │
//! BexRuntime (this crate)
//! ┌───────────────────────────────────┐
//! │  Scheduler (lane semaphores)      │
//! │  Cancellation Tokens (DashMap)    │
//! │  bex_core::Engine (inner)         │
//! │  Tokio Runtime (owned)            │
//! └───────────────────────────────────┘
//! ```
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use bex_types::BexError;
use dashmap::DashMap;
use tokio_util::sync::CancellationToken;
use crate::scheduler::{Scheduler, SchedulerConfig};
// ── Runtime State Machine ────────────────────────────────────────────
/// State value 0. Not stored anywhere in this part of the file, which is why the
/// `dead_code` lint is silenced for it.
#[allow(dead_code)]
const STATE_NOT_READY: u8 = 0;
/// State value 1. The constructor initializes `RuntimeInner::state` to this value.
const STATE_READY: u8 = 1;
/// State value 2. Stored by `BexRuntime::shutdown` when it begins draining.
const STATE_DRAINING: u8 = 2;
/// State value 3. Stored by `BexRuntime::shutdown` at the end of its drain step.
const STATE_STOPPED: u8 = 3;
// ── BexRuntime ───────────────────────────────────────────────────────
/// The async runtime that wraps `bex_core::Engine` with callback-driven scheduling.
///
/// Created once at application startup. The C++ backend (via FFI)
/// calls the `bex_submit_*` functions to kick off plugin operations, and
/// receives results via the callback function pointer.
///
/// All state lives in a single reference-counted `RuntimeInner`; this type is a
/// thin handle around that allocation.
pub struct BexRuntime {
/// Shared runtime state (engine, Tokio runtime, scheduler, cancellation map, state flag).
inner: Arc<RuntimeInner>,
}
/// Private state shared behind the `Arc` held by `BexRuntime`.
struct RuntimeInner {
/// The underlying sync engine from bex-core.
engine: bex_core::Engine,
/// Owned tokio runtime — keeps the async threads alive.
runtime: Arc<tokio::runtime::Runtime>,
/// Lane-based scheduler for concurrency control.
// Not read anywhere in this part of the file, hence the lint allowance below.
#[allow(dead_code)]
scheduler: Scheduler,
/// Cancellation tokens per request, keyed by the `u64` request id.
cancellation: DashMap<u64, CancellationToken>,
/// Runtime state machine; holds one of the `STATE_*` constants.
state: AtomicU8,
}
impl BexRuntime {
/// Builds a runtime for `config` using the default scheduler limits.
///
/// Equivalent to calling [`BexRuntime::with_scheduler_config`] with
/// `SchedulerConfig::default()`.
///
/// # Errors
///
/// Returns whatever error `with_scheduler_config` reports.
pub fn new(config: bex_core::EngineConfig) -> Result<Self, BexError> {
    let default_limits = SchedulerConfig::default();
    Self::with_scheduler_config(config, default_limits)
}
/// Builds a runtime for `config` with caller-supplied scheduler limits.
///
/// Construction happens in this order: the `bex_core::Engine`, then a
/// multi-threaded Tokio runtime with 4 worker threads and all drivers
/// enabled, then the scheduler. The new runtime starts in the ready state
/// with an empty cancellation map.
///
/// # Errors
///
/// * Propagates the error from `bex_core::Engine::new` unchanged.
/// * Returns `BexError::Internal` if the Tokio runtime cannot be built.
pub fn with_scheduler_config(
    config: bex_core::EngineConfig,
    scheduler_config: SchedulerConfig,
) -> Result<Self, BexError> {
    // The engine is created first so a bad config fails before any threads are spawned.
    let engine = bex_core::Engine::new(config)?;

    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.worker_threads(4).enable_all();
    let tokio_rt = builder
        .build()
        .map_err(|e| BexError::Internal(format!("tokio runtime: {e}")))?;

    let shared = RuntimeInner {
        engine,
        runtime: Arc::new(tokio_rt),
        scheduler: Scheduler::with_config(scheduler_config),
        cancellation: DashMap::new(),
        state: AtomicU8::new(STATE_READY),
    };
    Ok(Self {
        inner: Arc::new(shared),
    })
}
// ── Accessors for FFI layer ──────────────────────────────────────
/// Returns a clone of the wrapped `bex_core::Engine`, intended for use in
/// spawned tasks.
pub fn clone_engine(&self) -> bex_core::Engine {
    let engine = &self.inner.engine;
    engine.clone()
}
/// Returns a cloned handle to the owned Tokio runtime, for spawning tasks.
pub fn tokio_handle(&self) -> tokio::runtime::Handle {
    let handle = self.inner.runtime.handle();
    handle.clone()
}
/// Registers `token` as the cancellation token for `request_id`.
///
/// The value returned by the map's `insert` (any previous entry for the same
/// id) is discarded.
pub fn insert_cancellation(&self, request_id: u64, token: CancellationToken) {
    let tokens = &self.inner.cancellation;
    tokens.insert(request_id, token);
}
/// Drops the cancellation token registered for `request_id`, if any.
///
/// Meant to be called after the request completes. The token is removed
/// without being cancelled.
pub fn remove_cancellation(&self, request_id: u64) {
    let tokens = &self.inner.cancellation;
    tokens.remove(&request_id);
}
// ── Cancellation ────────────────────────────────────────────────
/// Cancels the request identified by `request_id`.
///
/// The token is removed from the map and then cancelled. Returns `true`
/// when a token was registered for the id, and `false` when there was
/// nothing to cancel.
pub fn cancel_request(&self, request_id: u64) -> bool {
    match self.inner.cancellation.remove(&request_id) {
        Some((_id, token)) => {
            token.cancel();
            true
        }
        None => false,
    }
}
// ── Plugin Management (delegated to bex_core::Engine) ───────────
/// Installs the plugin found at `path`.
///
/// Forwards to the engine's `install_plugin` and returns its result unchanged.
pub fn install_plugin(
    &self,
    path: &std::path::Path,
) -> Result<bex_types::plugin_info::PluginInfo, BexError> {
    let engine = &self.inner.engine;
    engine.install_plugin(path)
}
/// Installs a plugin supplied as raw bytes.
///
/// Forwards to the engine's `install_bytes` and returns its result unchanged.
pub fn install_bytes(
    &self,
    data: &[u8],
) -> Result<bex_types::plugin_info::PluginInfo, BexError> {
    let engine = &self.inner.engine;
    engine.install_bytes(data)
}
/// Uninstalls the plugin identified by `id`.
///
/// Forwards to the engine's `uninstall_plugin` and returns its result unchanged.
pub fn uninstall_plugin(&self, id: &str) -> Result<(), BexError> {
    let engine = &self.inner.engine;
    engine.uninstall_plugin(id)
}
/// Returns the engine's list of installed plugins.
pub fn list_plugins(&self) -> Vec<bex_types::plugin_info::PluginInfo> {
    let engine = &self.inner.engine;
    engine.list_plugins()
}
/// Enables the plugin identified by `id`.
///
/// Forwards to the engine's `enable_plugin` and returns its result unchanged.
pub fn enable_plugin(&self, id: &str) -> Result<(), BexError> {
    let engine = &self.inner.engine;
    engine.enable_plugin(id)
}
/// Disables the plugin identified by `id`.
///
/// Forwards to the engine's `disable_plugin` and returns its result unchanged.
pub fn disable_plugin(&self, id: &str) -> Result<(), BexError> {
    let engine = &self.inner.engine;
    engine.disable_plugin(id)
}
/// Looks up the info record for the plugin identified by `id`.
///
/// Returns the `Option` produced by the engine's `get_plugin_info` as is.
pub fn get_plugin_info(&self, id: &str) -> Option<bex_types::plugin_info::PluginInfo> {
    let engine = &self.inner.engine;
    engine.get_plugin_info(id)
}
// ── Secret / API Key Management ────────────────────────────────
/// Stores `value` under `key` as a secret/API key for `plugin_id`.
///
/// Forwards to the engine's `secret_set` and returns its result unchanged.
pub fn secret_set(&self, plugin_id: &str, key: &str, value: &str) -> Result<(), BexError> {
    let engine = &self.inner.engine;
    engine.secret_set(plugin_id, key, value)
}
/// Reads the secret/API key stored under `key` for `plugin_id`.
///
/// Returns the value, or `None`, exactly as reported by the engine's
/// `secret_get`.
pub fn secret_get(&self, plugin_id: &str, key: &str) -> Result<Option<String>, BexError> {
    let engine = &self.inner.engine;
    engine.secret_get(plugin_id, key)
}
/// Deletes the secret/API key stored under `key` for `plugin_id`.
///
/// Returns `true` if the key existed, as reported by the engine's
/// `secret_remove`.
pub fn secret_remove(&self, plugin_id: &str, key: &str) -> Result<bool, BexError> {
    let engine = &self.inner.engine;
    engine.secret_remove(plugin_id, key)
}
/// Lists the secret key names stored for `plugin_id`.
///
/// Forwards to the engine's `secret_keys` and returns its result unchanged.
pub fn secret_keys(&self, plugin_id: &str) -> Result<Vec<String>, BexError> {
    let engine = &self.inner.engine;
    engine.secret_keys(plugin_id)
}
// ── JSON-based API calls (used by FFI) ─────────────────────────
/// Runs a media search for `query` on the given plugin and returns the
/// result as a JSON string.
///
/// This calls the engine directly on the calling thread; it does not
/// register a cancellation token.
pub fn call_search_json(&self, plugin_id: &str, query: &str) -> Result<String, BexError> {
    let engine = &self.inner.engine;
    engine.call_search_json(plugin_id, query)
}
/// Fetches the plugin's home page and returns it as a JSON string.
///
/// This calls the engine directly on the calling thread; it does not
/// register a cancellation token.
pub fn call_get_home_json(&self, plugin_id: &str) -> Result<String, BexError> {
    let engine = &self.inner.engine;
    engine.call_get_home_json(plugin_id)
}
/// Fetches media info and returns it as a JSON string.
///
/// `media_id` is opaque — the plugin knows how to interpret it. It is
/// passed to the engine untouched.
pub fn call_get_info_json(&self, plugin_id: &str, media_id: &str) -> Result<String, BexError> {
    let engine = &self.inner.engine;
    engine.call_get_info_json(plugin_id, media_id)
}
/// Fetches the servers for an episode and returns them as a JSON string.
///
/// `id` is self-describing — the plugin knows how to parse its own IDs.
/// It is passed to the engine untouched.
pub fn call_get_servers_json(&self, plugin_id: &str, id: &str) -> Result<String, BexError> {
    let engine = &self.inner.engine;
    engine.call_get_servers_json(plugin_id, id)
}
/// Resolves a stream URL from `server_json` and returns the result as a
/// JSON string.
///
/// Forwards to the engine's `call_resolve_stream_json` and returns its
/// result unchanged.
pub fn call_resolve_stream_json(
    &self,
    plugin_id: &str,
    server_json: &str,
) -> Result<String, BexError> {
    let engine = &self.inner.engine;
    engine.call_resolve_stream_json(plugin_id, server_json)
}
// ── Stats and Shutdown ──────────────────────────────────────────
/// Returns the statistics snapshot reported by the wrapped engine.
pub fn stats(&self) -> bex_types::engine_types::EngineStats {
    let engine = &self.inner.engine;
    engine.stats()
}
/// Shut down the runtime gracefully.
pub fn shutdown(&self) {
tracing::info!("BexRuntime shutting down...");
self.inner.state.store(STATE_DRAINING, Ordering::Release);
// Give active tasks a brief window to complete
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
while std::time::Instant::now() < deadline {
std::thread::sleep(std::time::Duration::from_millis(50));
}
self.inner.state.store(STATE_STOPPED, Ordering::Release);
tracing::info!("BexRuntime shut down complete");
}
}
|