File size: 10,102 Bytes
3374e90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
//! The BexRuntime β€” async callback-driven wrapper around bex_core::Engine.
//!
//! BexRuntime adds these capabilities on top of the sync bex_core::Engine:
//!
//! 1. **Callback-based async**: Plugin call results are delivered directly
//!    to a C function pointer callback from a background Tokio thread.
//!    No event queue, no polling.
//!
//! 2. **Lane-based scheduling**: Plugin calls are dispatched to tokio tasks
//!    with concurrency limits per priority lane (Control, User, Background).
//!
//! 3. **Cancellation**: Each request gets a `CancellationToken`. The C++
//!    backend can cancel via `bex_cancel_request()`.
//!
//! ## Architecture
//!
//! ```text
//! C++ Backend
//!     β”‚
//!     β”œβ”€β”€ bex_submit_search(engine, plugin_id, query, callback, user_data)
//!     β”‚       β†’ returns request_id immediately
//!     β”‚
//!     β”‚   [Rust Tokio background thread]
//!     β”‚   β”œβ”€β”€ Acquires scheduler permit
//!     β”‚   β”œβ”€β”€ Executes plugin call via spawn_blocking
//!     β”‚   └── Invokes callback(user_data, request_id, success, payload, len)
//!     β”‚
//!     β”œβ”€β”€ bex_cancel_request(engine, request_id)
//!     β”‚
//!     BexRuntime (this crate)
//!     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
//!     β”‚  Scheduler (lane semaphores)      β”‚
//!     β”‚  Cancellation Tokens (DashMap)    β”‚
//!     β”‚  bex_core::Engine (inner)         β”‚
//!     β”‚  Tokio Runtime (owned)            β”‚
//!     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
//! ```

use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;

use bex_types::BexError;
use dashmap::DashMap;
use tokio_util::sync::CancellationToken;

use crate::scheduler::{Scheduler, SchedulerConfig};

// ── Runtime State Machine ────────────────────────────────────────────

#[allow(dead_code)]
const STATE_NOT_READY: u8 = 0;
const STATE_READY: u8 = 1;
const STATE_DRAINING: u8 = 2;
const STATE_STOPPED: u8 = 3;

// ── BexRuntime ───────────────────────────────────────────────────────

/// The async runtime that wraps `bex_core::Engine` with callback-driven scheduling.
///
/// Created once at application startup. The C++ backend (via FFI)
/// calls the `bex_submit_*` functions to kick off plugin operations, and
/// receives results via the callback function pointer.
pub struct BexRuntime {
    inner: Arc<RuntimeInner>,
}

struct RuntimeInner {
    /// The underlying sync engine from bex-core.
    engine: bex_core::Engine,

    /// Owned tokio runtime β€” keeps the async threads alive.
    runtime: Arc<tokio::runtime::Runtime>,

    /// Lane-based scheduler for concurrency control.
    #[allow(dead_code)]
    scheduler: Scheduler,

    /// Cancellation tokens per request.
    cancellation: DashMap<u64, CancellationToken>,

    /// Runtime state machine.
    state: AtomicU8,
}

impl BexRuntime {
    /// Create a new BexRuntime from the given engine config.
    pub fn new(config: bex_core::EngineConfig) -> Result<Self, BexError> {
        Self::with_scheduler_config(config, SchedulerConfig::default())
    }

    /// Create a new BexRuntime with custom scheduler limits.
    pub fn with_scheduler_config(
        config: bex_core::EngineConfig,
        scheduler_config: SchedulerConfig,
    ) -> Result<Self, BexError> {
        let engine = bex_core::Engine::new(config)?;

        let runtime = Arc::new(
            tokio::runtime::Builder::new_multi_thread()
                .worker_threads(4)
                .enable_all()
                .build()
                .map_err(|e| BexError::Internal(format!("tokio runtime: {e}")))?,
        );

        let scheduler = Scheduler::with_config(scheduler_config);

        let inner = Arc::new(RuntimeInner {
            engine,
            runtime,
            scheduler,
            cancellation: DashMap::new(),
            state: AtomicU8::new(STATE_READY),
        });

        Ok(Self { inner })
    }

    // ── Accessors for FFI layer ──────────────────────────────────────

    /// Get a clone of the underlying bex-core Engine for use in spawned tasks.
    pub fn clone_engine(&self) -> bex_core::Engine {
        self.inner.engine.clone()
    }

    /// Get a handle to the Tokio runtime for spawning tasks.
    pub fn tokio_handle(&self) -> tokio::runtime::Handle {
        self.inner.runtime.handle().clone()
    }

    /// Insert a cancellation token for a request.
    pub fn insert_cancellation(&self, request_id: u64, token: CancellationToken) {
        self.inner.cancellation.insert(request_id, token);
    }

    /// Remove a cancellation token (after request completes).
    pub fn remove_cancellation(&self, request_id: u64) {
        self.inner.cancellation.remove(&request_id);
    }

    // ── Cancellation ────────────────────────────────────────────────

    /// Cancel a pending request.
    pub fn cancel_request(&self, request_id: u64) -> bool {
        if let Some((_, token)) = self.inner.cancellation.remove(&request_id) {
            token.cancel();
            true
        } else {
            false
        }
    }

    // ── Plugin Management (delegated to bex_core::Engine) ───────────

    /// Install a plugin from a file path.
    pub fn install_plugin(&self, path: &std::path::Path) -> Result<bex_types::plugin_info::PluginInfo, BexError> {
        self.inner.engine.install_plugin(path)
    }

    /// Install a plugin from raw bytes.
    pub fn install_bytes(&self, data: &[u8]) -> Result<bex_types::plugin_info::PluginInfo, BexError> {
        self.inner.engine.install_bytes(data)
    }

    /// Uninstall a plugin.
    pub fn uninstall_plugin(&self, id: &str) -> Result<(), BexError> {
        self.inner.engine.uninstall_plugin(id)
    }

    /// List all installed plugins.
    pub fn list_plugins(&self) -> Vec<bex_types::plugin_info::PluginInfo> {
        self.inner.engine.list_plugins()
    }

    /// Enable a plugin.
    pub fn enable_plugin(&self, id: &str) -> Result<(), BexError> {
        self.inner.engine.enable_plugin(id)
    }

    /// Disable a plugin.
    pub fn disable_plugin(&self, id: &str) -> Result<(), BexError> {
        self.inner.engine.disable_plugin(id)
    }

    /// Get plugin info.
    pub fn get_plugin_info(&self, id: &str) -> Option<bex_types::plugin_info::PluginInfo> {
        self.inner.engine.get_plugin_info(id)
    }

    // ── Secret / API Key Management ────────────────────────────────

    /// Set a secret/API key for a plugin.
    pub fn secret_set(&self, plugin_id: &str, key: &str, value: &str) -> Result<(), BexError> {
        self.inner.engine.secret_set(plugin_id, key, value)
    }

    /// Get a secret/API key for a plugin. Returns the value or None.
    pub fn secret_get(&self, plugin_id: &str, key: &str) -> Result<Option<String>, BexError> {
        self.inner.engine.secret_get(plugin_id, key)
    }

    /// Delete a secret/API key for a plugin. Returns true if the key existed.
    pub fn secret_remove(&self, plugin_id: &str, key: &str) -> Result<bool, BexError> {
        self.inner.engine.secret_remove(plugin_id, key)
    }

    /// List all secret keys for a plugin.
    pub fn secret_keys(&self, plugin_id: &str) -> Result<Vec<String>, BexError> {
        self.inner.engine.secret_keys(plugin_id)
    }

    // ── JSON-based API calls (used by FFI) ─────────────────────────

    /// Search for media. Returns JSON string.
    pub fn call_search_json(&self, plugin_id: &str, query: &str) -> Result<String, BexError> {
        self.inner.engine.call_search_json(plugin_id, query)
    }

    /// Get home page. Returns JSON string.
    pub fn call_get_home_json(&self, plugin_id: &str) -> Result<String, BexError> {
        self.inner.engine.call_get_home_json(plugin_id)
    }

    /// Get media info. Returns JSON string.
    /// The media_id is opaque β€” the plugin knows how to interpret it.
    pub fn call_get_info_json(&self, plugin_id: &str, media_id: &str) -> Result<String, BexError> {
        self.inner.engine.call_get_info_json(plugin_id, media_id)
    }

    /// Get servers for an episode. Returns JSON string.
    /// The id is self-describing β€” the plugin knows how to parse its own IDs.
    pub fn call_get_servers_json(&self, plugin_id: &str, id: &str) -> Result<String, BexError> {
        self.inner.engine.call_get_servers_json(plugin_id, id)
    }

    /// Resolve a stream URL. Returns JSON string.
    pub fn call_resolve_stream_json(&self, plugin_id: &str, server_json: &str) -> Result<String, BexError> {
        self.inner.engine.call_resolve_stream_json(plugin_id, server_json)
    }

    // ── Stats and Shutdown ──────────────────────────────────────────

    /// Get engine stats.
    pub fn stats(&self) -> bex_types::engine_types::EngineStats {
        self.inner.engine.stats()
    }

    /// Shut down the runtime gracefully.
    pub fn shutdown(&self) {
        tracing::info!("BexRuntime shutting down...");
        self.inner.state.store(STATE_DRAINING, Ordering::Release);

        // Give active tasks a brief window to complete
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
        while std::time::Instant::now() < deadline {
            std::thread::sleep(std::time::Duration::from_millis(50));
        }

        self.inner.state.store(STATE_STOPPED, Ordering::Release);
        tracing::info!("BexRuntime shut down complete");
    }
}