//! The BexRuntime — async callback-driven wrapper around bex_core::Engine. //! //! BexRuntime adds these capabilities on top of the sync bex_core::Engine: //! //! 1. **Callback-based async**: Plugin call results are delivered directly //! to a C function pointer callback from a background Tokio thread. //! No event queue, no polling. //! //! 2. **Lane-based scheduling**: Plugin calls are dispatched to tokio tasks //! with concurrency limits per priority lane (Control, User, Background). //! //! 3. **Cancellation**: Each request gets a `CancellationToken`. The C++ //! backend can cancel via `bex_cancel_request()`. //! //! ## Architecture //! //! ```text //! C++ Backend //! │ //! ├── bex_submit_search(engine, plugin_id, query, callback, user_data) //! │ → returns request_id immediately //! │ //! │ [Rust Tokio background thread] //! │ ├── Acquires scheduler permit //! │ ├── Executes plugin call via spawn_blocking //! │ └── Invokes callback(user_data, request_id, success, payload, len) //! │ //! ├── bex_cancel_request(engine, request_id) //! │ //! BexRuntime (this crate) //! ┌───────────────────────────────────┐ //! │ Scheduler (lane semaphores) │ //! │ Cancellation Tokens (DashMap) │ //! │ bex_core::Engine (inner) │ //! │ Tokio Runtime (owned) │ //! └───────────────────────────────────┘ //! ``` use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; use bex_types::BexError; use dashmap::DashMap; use tokio_util::sync::CancellationToken; use crate::scheduler::{Scheduler, SchedulerConfig}; // ── Runtime State Machine ──────────────────────────────────────────── #[allow(dead_code)] const STATE_NOT_READY: u8 = 0; const STATE_READY: u8 = 1; const STATE_DRAINING: u8 = 2; const STATE_STOPPED: u8 = 3; // ── BexRuntime ─────────────────────────────────────────────────────── /// The async runtime that wraps `bex_core::Engine` with callback-driven scheduling. /// /// Created once at application startup. The C++ backend (via FFI) /// calls the `bex_submit_*` functions to kick off plugin operations, and /// receives results via the callback function pointer. pub struct BexRuntime { inner: Arc, } struct RuntimeInner { /// The underlying sync engine from bex-core. engine: bex_core::Engine, /// Owned tokio runtime — keeps the async threads alive. runtime: Arc, /// Lane-based scheduler for concurrency control. #[allow(dead_code)] scheduler: Scheduler, /// Cancellation tokens per request. cancellation: DashMap, /// Runtime state machine. state: AtomicU8, } impl BexRuntime { /// Create a new BexRuntime from the given engine config. pub fn new(config: bex_core::EngineConfig) -> Result { Self::with_scheduler_config(config, SchedulerConfig::default()) } /// Create a new BexRuntime with custom scheduler limits. pub fn with_scheduler_config( config: bex_core::EngineConfig, scheduler_config: SchedulerConfig, ) -> Result { let engine = bex_core::Engine::new(config)?; let runtime = Arc::new( tokio::runtime::Builder::new_multi_thread() .worker_threads(4) .enable_all() .build() .map_err(|e| BexError::Internal(format!("tokio runtime: {e}")))?, ); let scheduler = Scheduler::with_config(scheduler_config); let inner = Arc::new(RuntimeInner { engine, runtime, scheduler, cancellation: DashMap::new(), state: AtomicU8::new(STATE_READY), }); Ok(Self { inner }) } // ── Accessors for FFI layer ────────────────────────────────────── /// Get a clone of the underlying bex-core Engine for use in spawned tasks. pub fn clone_engine(&self) -> bex_core::Engine { self.inner.engine.clone() } /// Get a handle to the Tokio runtime for spawning tasks. pub fn tokio_handle(&self) -> tokio::runtime::Handle { self.inner.runtime.handle().clone() } /// Insert a cancellation token for a request. pub fn insert_cancellation(&self, request_id: u64, token: CancellationToken) { self.inner.cancellation.insert(request_id, token); } /// Remove a cancellation token (after request completes). pub fn remove_cancellation(&self, request_id: u64) { self.inner.cancellation.remove(&request_id); } // ── Cancellation ──────────────────────────────────────────────── /// Cancel a pending request. pub fn cancel_request(&self, request_id: u64) -> bool { if let Some((_, token)) = self.inner.cancellation.remove(&request_id) { token.cancel(); true } else { false } } // ── Plugin Management (delegated to bex_core::Engine) ─────────── /// Install a plugin from a file path. pub fn install_plugin(&self, path: &std::path::Path) -> Result { self.inner.engine.install_plugin(path) } /// Install a plugin from raw bytes. pub fn install_bytes(&self, data: &[u8]) -> Result { self.inner.engine.install_bytes(data) } /// Uninstall a plugin. pub fn uninstall_plugin(&self, id: &str) -> Result<(), BexError> { self.inner.engine.uninstall_plugin(id) } /// List all installed plugins. pub fn list_plugins(&self) -> Vec { self.inner.engine.list_plugins() } /// Enable a plugin. pub fn enable_plugin(&self, id: &str) -> Result<(), BexError> { self.inner.engine.enable_plugin(id) } /// Disable a plugin. pub fn disable_plugin(&self, id: &str) -> Result<(), BexError> { self.inner.engine.disable_plugin(id) } /// Get plugin info. pub fn get_plugin_info(&self, id: &str) -> Option { self.inner.engine.get_plugin_info(id) } // ── Secret / API Key Management ──────────────────────────────── /// Set a secret/API key for a plugin. pub fn secret_set(&self, plugin_id: &str, key: &str, value: &str) -> Result<(), BexError> { self.inner.engine.secret_set(plugin_id, key, value) } /// Get a secret/API key for a plugin. Returns the value or None. pub fn secret_get(&self, plugin_id: &str, key: &str) -> Result, BexError> { self.inner.engine.secret_get(plugin_id, key) } /// Delete a secret/API key for a plugin. Returns true if the key existed. pub fn secret_remove(&self, plugin_id: &str, key: &str) -> Result { self.inner.engine.secret_remove(plugin_id, key) } /// List all secret keys for a plugin. pub fn secret_keys(&self, plugin_id: &str) -> Result, BexError> { self.inner.engine.secret_keys(plugin_id) } // ── JSON-based API calls (used by FFI) ───────────────────────── /// Search for media. Returns JSON string. pub fn call_search_json(&self, plugin_id: &str, query: &str) -> Result { self.inner.engine.call_search_json(plugin_id, query) } /// Get home page. Returns JSON string. pub fn call_get_home_json(&self, plugin_id: &str) -> Result { self.inner.engine.call_get_home_json(plugin_id) } /// Get media info. Returns JSON string. /// The media_id is opaque — the plugin knows how to interpret it. pub fn call_get_info_json(&self, plugin_id: &str, media_id: &str) -> Result { self.inner.engine.call_get_info_json(plugin_id, media_id) } /// Get servers for an episode. Returns JSON string. /// The id is self-describing — the plugin knows how to parse its own IDs. pub fn call_get_servers_json(&self, plugin_id: &str, id: &str) -> Result { self.inner.engine.call_get_servers_json(plugin_id, id) } /// Resolve a stream URL. Returns JSON string. pub fn call_resolve_stream_json(&self, plugin_id: &str, server_json: &str) -> Result { self.inner.engine.call_resolve_stream_json(plugin_id, server_json) } // ── Stats and Shutdown ────────────────────────────────────────── /// Get engine stats. pub fn stats(&self) -> bex_types::engine_types::EngineStats { self.inner.engine.stats() } /// Shut down the runtime gracefully. pub fn shutdown(&self) { tracing::info!("BexRuntime shutting down..."); self.inner.state.store(STATE_DRAINING, Ordering::Release); // Give active tasks a brief window to complete let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500); while std::time::Instant::now() < deadline { std::thread::sleep(std::time::Duration::from_millis(50)); } self.inner.state.store(STATE_STOPPED, Ordering::Release); tracing::info!("BexRuntime shut down complete"); } }