Plugin Development Guide
This guide is a complete tutorial for developing custom plugins using the Monibuca V6 SDK. You will learn the full plugin lifecycle, configuration system, streaming operations, HTTP services, task management, and other core concepts, and master various plugin development patterns through practical examples.
Overview
Section titled “Overview”Monibuca adopts a plugin-based architecture where all protocol support and extension features are implemented as plugins. The SDK (monibuca-sdk) is the sole contract layer between plugins and the engine — plugins only need to depend on the SDK crate without needing to understand engine internals.
Your plugin crate │ └──▶ sdk (monibuca-sdk) ← sole dependency, open source │ └──▶ codec (monibuca-codec) ← codec layerAfter development, plugins can be deployed in three modes:
| Mode | Description | Use Case |
|---|---|---|
| Static compilation | Linked to the engine binary at compile time | Best performance, production deployment |
| Dynamic loading | Compiled as .so/.dylib, loaded at runtime | Hot updates, third-party plugins |
| WASM sandbox | Compiled as a WASM module, executed in isolation | Security isolation, untrusted plugins |
Quick Start: Creating Your First Plugin
Section titled “Quick Start: Creating Your First Plugin”1. Create the Project
Section titled “1. Create the Project”mkdir -p plugins/my-plugin/srccd plugins/my-plugin2. Write Cargo.toml
Section titled “2. Write Cargo.toml”[package]name = "plugin-my-plugin"version = "0.1.0"edition = "2024"
[dependencies]sdk = { git = "https://github.com/langhuihui/monibuca-sdk" }# If developing within the monibuca workspace:# sdk = { path = "../../crates/monibuca-sdk" }
async-trait = "0.1"serde = { version = "1.0", features = ["derive"] }serde_json = "1.0"tokio = { version = "1", features = ["full"] }tokio-util = "0.7"tracing = "0.1"3. Write lib.rs
Section titled “3. Write lib.rs”mod config;mod plugin;
pub use plugin::MyPlugin;
// Export C ABI symbols for dynamic loading modesdk::export_plugin!(MyPlugin);4. Write the Configuration config.rs
Section titled “4. Write the Configuration config.rs”use sdk::ConfigSchema;use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]#[schema( plugin = "my-plugin", title = "My Plugin", description = "A custom Monibuca plugin", sdk_path = "sdk" // Required for standalone plugin crates)]pub struct MyPluginConfig { /// Whether to enable the plugin #[serde(default = "default_enable")] #[schema(label = "Enable", desc = "Whether to enable this plugin")] pub enable: bool,
/// Custom parameter #[serde(default = "default_message")] #[schema(label = "Message", desc = "Custom welcome message")] pub message: String,}
fn default_enable() -> bool { true }fn default_message() -> String { "Hello from MyPlugin!".to_string() }
impl Default for MyPluginConfig { fn default() -> Self { Self { enable: default_enable(), message: default_message(), } }}5. Write the Plugin Main Body plugin.rs
Section titled “5. Write the Plugin Main Body plugin.rs”use std::any::Any;use std::sync::Arc;use async_trait::async_trait;use tokio_util::sync::CancellationToken;use sdk::prelude::*;
use crate::config::MyPluginConfig;
pub struct MyPlugin { state: PluginState, config: MyPluginConfig,}
impl Default for MyPlugin { fn default() -> Self { Self { state: PluginState::Created, config: MyPluginConfig::default(), } }}
#[async_trait]impl Plugin for MyPlugin { fn info(&self) -> PluginInfo { PluginInfo { name: "my-plugin", version: "0.1.0", description: "A custom Monibuca plugin", author: "Your Name", } }
fn state(&self) -> PluginState { self.state }
async fn init( &mut self, _manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>, ) -> Result<()> { // Load plugin configuration from config file self.config = config .get_plugin_config("my-plugin") .and_then(|v| serde_json::from_value(v).ok()) .unwrap_or_default();
if !self.config.enable { tracing::info!("MyPlugin is disabled"); self.state = PluginState::Disabled; return Ok(()); }
tracing::info!("MyPlugin initialized: {}", self.config.message); self.state = PluginState::Initialized; Ok(()) }
async fn start(&mut self, _cancel: CancellationToken) -> Result<()> { if self.state == PluginState::Disabled { return Ok(()); } self.state = PluginState::Running; tracing::info!("MyPlugin started"); Ok(()) }
async fn stop(&mut self) -> Result<()> { self.state = PluginState::Stopped; tracing::info!("MyPlugin stopped"); Ok(()) }
fn as_any(&self) -> &dyn Any { self } fn as_any_mut(&mut self) -> &mut dyn Any { self }}6. Corresponding Configuration File
Section titled “6. Corresponding Configuration File”Add the following to Monibuca’s config.yaml:
my-plugin: enable: true message: "Welcome to Monibuca!"Congratulations! You have created a minimal but complete Monibuca plugin. Next, let’s dive into each core concept.
Plugin Trait in Detail
Section titled “Plugin Trait in Detail”The Plugin trait is the core interface for all plugins. The engine manages the complete plugin lifecycle through this trait.
Full Interface
Section titled “Full Interface”#[async_trait]pub trait Plugin: Send + Sync { // ── Required ────────────────────────────────────────────── fn info(&self) -> PluginInfo; fn state(&self) -> PluginState; async fn init(&mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>) -> Result<()>; async fn start(&mut self, cancel: CancellationToken) -> Result<()>; async fn stop(&mut self) -> Result<()>; fn as_any(&self) -> &dyn Any; fn as_any_mut(&mut self) -> &mut dyn Any;
// ── Optional (have default implementations) ─────────────── fn name(&self) -> &'static str { self.info().name } fn is_enabled(&self) -> bool { true } fn on_config_update(&mut self, _config: &serde_json::Value) -> Result<()> { Ok(()) } fn set_task_work(&mut self, _work: Arc<dyn TaskWork>) {} fn set_engine_context(&mut self, _ctx: EngineContext) {} fn http_routes(&self) -> Vec<HttpRoute> { Vec::new() } fn register_http_handlers(&mut self, _server: SharedHttpServer) {}}Lifecycle
Section titled “Lifecycle”The engine calls plugin methods in the following order:
┌─────────────┐ │ Created │ ← Default::default() or new() └──────┬──────┘ │ ▼ set_engine_context(ctx) ← Inject IoC container (optional capabilities) set_task_work(work) ← Inject task manager │ ▼ init(manager, config) ← Load configuration, initialize dependencies ┌──────┴──────┐ │ Initialized │ └──────┬──────┘ │ ▼ start(cancel_token) ← Start services (TCP/HTTP/background tasks) ┌──────┴──────┐ │ Running │ ← Serving normally └──────┬──────┘ │ cancel_token cancelled, or engine calls stop() ▼ stop() ← Graceful shutdown, clean up resources ┌──────┴──────┐ │ Stopped │ └─────────────┘PluginState Enum
Section titled “PluginState Enum”pub enum PluginState { Created, // Just constructed, not yet initialized Initialized, // init() succeeded Running, // start() succeeded, actively serving Stopped, // stop() completed Disabled, // enable: false in configuration Error, // An unrecoverable error occurred}PluginInfo Metadata
Section titled “PluginInfo Metadata”pub struct PluginInfo { pub name: &'static str, // Unique plugin identifier, matches the key in config file pub version: &'static str, // Semantic version number pub description: &'static str, // Brief description pub author: &'static str, // Author information}Configuration System
Section titled “Configuration System”ConfigSchema Derive Macro
Section titled “ConfigSchema Derive Macro”The ConfigSchema macro automatically generates a JSON Schema for plugin configuration structs, used by the management UI and API.
Struct-Level Attributes
Section titled “Struct-Level Attributes”#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]#[schema( plugin = "rtmp", // Required: plugin name identifier title = "RTMP Plugin", // Required: display title description = "RTMP/RTMPS streaming", // Required: description sdk_path = "sdk", // Required: must be "sdk" for standalone crates override_groups = "Publish,Subscribe,Tcp" // Optional: global config groups to override)]| Attribute | Required | Description |
|---|---|---|
plugin | ✅ | Plugin name identifier, matches the key in the config file |
title | ✅ | Schema title, displayed in the management UI |
description | ✅ | Schema description |
sdk_path | ✅ | SDK crate path, must be "sdk" for standalone crates |
override_groups | ❌ | Global config groups overridden by this plugin, comma-separated |
Field-Level Attributes
Section titled “Field-Level Attributes”pub struct MyConfig { #[schema(label = "Port", desc = "Service listening port")] pub port: u16,
#[schema(label = "Buffer Size", desc = "Read buffer size", min = 128, max = 65536, suffix = " bytes")] pub buffer_size: u32,
#[schema(label = "Output Directory", desc = "File output path", pattern = "^[a-zA-Z0-9_/\\-\\.]+$", help = "Only letters, digits, underscores, slashes, hyphens, and dots are allowed")] pub output_dir: String,
#[schema(group = "advanced", group_label = "Advanced Settings", group_desc = "Production environment tuning parameters")] pub max_connections: u32,
#[schema(skip)] // Not exposed in the Schema pub internal_state: String,}| Attribute | Description |
|---|---|
label | Field display name |
desc | Field description |
min / max | Numeric range constraints |
suffix | Display unit suffix |
pattern | Regex validation pattern |
help | Additional help text |
group / group_label / group_desc | Configuration grouping |
skip | Exclude from Schema |
Using CommonConfig to Inherit Global Settings
Section titled “Using CommonConfig to Inherit Global Settings”Many plugins need global parameters such as TCP listen address and HTTP settings. Use #[serde(flatten)] to inherit CommonConfig:
use sdk::prelude::*;
#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]#[schema( plugin = "rtmp", title = "RTMP Plugin", description = "RTMP streaming configuration", override_groups = "Publish,Subscribe,Tcp", sdk_path = "sdk")]#[serde(default, rename_all = "lowercase")]pub struct RtmpPluginConfig { #[serde(default)] #[schema(label = "Enable", desc = "Whether to enable RTMP")] pub enable: bool,
/// Inherit global TCP/HTTP/Publish/Subscribe settings #[serde(flatten)] #[schema(skip)] pub common: CommonConfig,
/// Plugin-specific configuration #[serde(default = "default_chunk_size")] #[schema(label = "Chunk Size", desc = "RTMP chunk size", min = 128, max = 65536, suffix = " bytes")] pub chunk_size: u32,}In this case, the YAML configuration becomes:
rtmp: enable: true tcp: listenaddr: ":1935" # From CommonConfig.tcp chunksize: 4096 # Plugin-specificLoading and Using Configuration
Section titled “Loading and Using Configuration”Load configuration in the init() method:
async fn init( &mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>,) -> Result<()> { // Method 1: Parse into a struct (recommended) let my_config: MyPluginConfig = config .get_plugin_config("my-plugin") .and_then(|v| serde_json::from_value(v).ok()) .unwrap_or_default();
// Check the enable flag if !my_config.enable { self.state = PluginState::Disabled; return Ok(()); }
// Method 2: Read a single global config value if let Some(log_level) = config.get("loglevel") { tracing::info!("Global log level: {}", log_level); }
// Method 3: Load default YAML configuration config.load_default_config("my-plugin", include_str!("default.yaml"))?;
self.config = my_config; self.stream_manager = Some(manager); self.state = PluginState::Initialized; Ok(())}ConfigProvider Interface
Section titled “ConfigProvider Interface”pub trait ConfigProvider: Send + Sync + Any { /// Get a global configuration item fn get(&self, key: &str) -> Option<serde_json::Value>;
/// Get plugin configuration (returns the entire plugin config JSON object) fn get_plugin_config(&self, plugin_name: &str) -> Option<serde_json::Value>;
/// Load default YAML config for a plugin (if not specified by the user in the config file) fn load_default_config(&self, plugin_name: &str, default_yaml: &str) -> Result<()>;
/// Check if a plugin is enabled in configuration fn is_plugin_enabled(&self, plugin_name: &str) -> bool;
/// Get CORS configuration fn get_cors_config(&self) -> (bool, Vec<String>);}Hot Configuration Reload
Section titled “Hot Configuration Reload”Implement on_config_update() to support runtime configuration changes:
fn on_config_update(&mut self, config: &serde_json::Value) -> Result<()> { if let Ok(new_config) = serde_json::from_value::<MyPluginConfig>(config.clone()) { tracing::info!("Config updated, old port: {}, new port: {}", self.config.port, new_config.port); self.config = new_config; } Ok(())}Stream Operations: Publish and Subscribe
Section titled “Stream Operations: Publish and Subscribe”Stream operations are the core capability of streaming media plugins. StreamManagerApi provides complete interfaces for publishing (ingest) and subscribing (playback).
StreamManagerApi Core Methods
Section titled “StreamManagerApi Core Methods”pub trait StreamManagerApi: Send + Sync + 'static { // ── Publish ────────────────────────────────────────────── fn create_publisher(&self, path: &str) -> Result<Arc<RwLock<dyn Publisher>>>; fn create_publisher_for_plugin(&self, path: &str, plugin_name: &str) -> Result<Arc<RwLock<dyn Publisher>>>; fn get_publisher(&self, path: &str) -> Option<Arc<RwLock<dyn Publisher>>>;
// ── Subscribe ──────────────────────────────────────────── async fn subscribe(&self, path: &str, config: SubscriberConfig) -> Result<(Arc<RwLock<dyn Publisher>>, Box<dyn Subscriber>)>; fn unsubscribe(&self, path: &str, subscriber: &mut dyn Subscriber);
// ── Query ──────────────────────────────────────────────── fn has_stream(&self, path: &str) -> bool; fn stream_count(&self) -> usize; fn stream_paths(&self) -> Vec<String>;
// ── Events ─────────────────────────────────────────────── fn subscribe_events(&self) -> broadcast::Receiver<StreamEvent>;
// ── Lifecycle ──────────────────────────────────────────── fn dispose_stream(&self, path: &str) -> bool;}Publishing a Stream (Ingest Side)
Section titled “Publishing a Stream (Ingest Side)”Here is the complete publishing flow:
use std::time::Duration;use sdk::prelude::*;
async fn publish_stream( manager: &Arc<dyn StreamManagerApi>, stream_path: &str,) -> Result<()> { // 1. Create a Publisher (automatically creates the corresponding Stream) let publisher = manager.create_publisher_for_plugin(stream_path, "my-plugin")?;
// 2. Initialize audio/video Tracks (must be called before writing frames) { let mut pub_guard = publisher.write(); // parking_lot RwLock pub_guard.init_video_track(); pub_guard.init_audio_track(); }
// 3. Set codec information (call once when sequence header is received) { let mut pub_guard = publisher.write();
// Video codec (H.264 example) let video_codec = VideoCodec::new( VideoCodecType::H264, 1920, // width 1080, // height ); pub_guard.set_video_codec(video_codec); pub_guard.set_video_seq_header(sps_pps_data); // Sequence header in AVCC format
// Audio codec (AAC example) let audio_codec = AudioCodec::new( AudioCodecType::AAC, 44100, // sample_rate 2, // channels ); pub_guard.set_audio_codec(audio_codec); pub_guard.set_audio_seq_header(audio_specific_config); }
// 4. Continuously write media frames loop { // Receive data from your protocol/source... let (frame_data, timestamp, is_keyframe) = receive_frame().await?;
// Write video frame (read lock is sufficient, uses interior mutability) let frame_type = if is_keyframe { VideoFrameType::Keyframe } else { VideoFrameType::Interframe }; let dts = Duration::from_millis(timestamp); let pts = dts; // If there is a CTS offset, pts = dts + cts
publisher.read().write_video(frame_type, pts, dts, frame_data)?;
// Write audio frame let audio_ts = Duration::from_millis(audio_timestamp); publisher.read().write_audio(audio_ts, audio_data)?; }
// 5. End publishing manager.dispose_stream(stream_path); Ok(())}Publisher Trait Key Methods
Section titled “Publisher Trait Key Methods”pub trait Publisher: Send + Sync { // ── Track Initialization ───────────────────────────────── fn init_video_track(&mut self); // Must be called first fn init_audio_track(&mut self); // Must be called first
// ── Codec Setup (sequence header, set once) ────────────── fn set_video_codec(&mut self, codec: VideoCodec); fn set_audio_codec(&mut self, codec: AudioCodec); fn set_video_seq_header(&mut self, data: Bytes); fn set_audio_seq_header(&mut self, data: Bytes);
// ── Frame Writing (callable with read lock) ────────────── fn write_video(&self, frame_type: VideoFrameType, pts: Duration, dts: Duration, data: Bytes) -> Result<()>; fn write_video_raw(&self, frame_type: VideoFrameType, timestamp: Duration, dts: Duration, nalus: Vec<Bytes>) -> Result<()>; fn write_audio(&self, timestamp: Duration, data: Bytes) -> Result<()>;
// ── Query ──────────────────────────────────────────────── fn path(&self) -> &str; fn subscriber_count(&self) -> usize; fn video_seq_header(&self) -> Option<Bytes>; fn audio_seq_header(&self) -> Option<Bytes>;
// ── Lifecycle ──────────────────────────────────────────── fn dispose(&self);}Subscribing to a Stream (Playback Side)
Section titled “Subscribing to a Stream (Playback Side)”Here is the complete subscription flow:
use std::time::Duration;use tokio::time::timeout;use sdk::prelude::*;
async fn subscribe_stream( manager: &Arc<dyn StreamManagerApi>, stream_path: &str, cancel: CancellationToken,) -> Result<()> { // 1. Configure subscription parameters let config = SubscriberConfig { mode: SubscribeMode::RealTime, // Real-time mode, start from latest keyframe buffer_time: Duration::from_secs(2), // Buffer time for Buffer mode receive_video: true, receive_audio: true, keyframe_timeout: Some(Duration::from_secs(30)), // Keyframe wait timeout };
// 2. Subscribe to the stream (async wait until stream is available) let (publisher_ref, mut subscriber) = manager.subscribe(stream_path, config).await?;
// 3. Get sequence headers (for initializing decoders or sending to clients) { let pub_guard = publisher_ref.read(); if let Some(video_seq) = pub_guard.video_seq_header() { // Send video sequence header to client... send_video_header(video_seq).await?; } if let Some(audio_seq) = pub_guard.audio_seq_header() { // Send audio sequence header to client... send_audio_header(audio_seq).await?; } }
// 4. Frame reading loop loop { // Check for stop signal if subscriber.is_stopped() || cancel.is_cancelled() { break; }
// Read all available video frames while let Ok(Some(frame)) = subscriber.read_video() { let timestamp = frame.timestamp; let is_key = frame.is_keyframe();
// AVFrame supports zero-copy cached format conversion let flv_tag = frame.get_flv_video_tag(); // FLV format let annexb = frame.get_annexb(); // Annex-B format let avcc = frame.get_avcc(); // AVCC format
// Send to client (choose the format matching the protocol)... send_video_frame(flv_tag, timestamp).await?; }
// Read all available audio frames while let Some(frame) = subscriber.read_audio() { let timestamp = frame.timestamp; let flv_tag = frame.get_flv_audio_tag();
send_audio_frame(flv_tag, timestamp).await?; }
// 5. Wait for new frames (with timeout to maintain responsiveness) match timeout(Duration::from_millis(10), subscriber.wait_for_frames()).await { Ok(Ok(())) => continue, // New frames available, continue reading Ok(Err(_)) => break, // Stream ended Err(_) => continue, // 10ms timeout, continue loop (check cancel) } }
// 6. Cleanup: unsubscribe manager.unsubscribe(stream_path, subscriber.as_mut()); Ok(())}AVFrame Format Conversion
Section titled “AVFrame Format Conversion”AVFrame has built-in lazy-loaded cached format conversions — the conversion is performed and cached on first access, and all subsequent subscribers share the same converted result (zero-copy):
// Video frame formatsframe.get_flv_video_tag() // → Bytes FLV Video Tagframe.get_annexb() // → Bytes Annex-B (00 00 00 01 delimited)frame.get_avcc() // → Bytes AVCC (length-prefixed)frame.get_raw() // → Raw NALUs
// Video frame propertiesframe.is_keyframe() // → boolframe.video_frame_type // → VideoFrameTypeframe.video_codec // → Option<Arc<VideoCodec>>frame.timestamp // → Duration
// Audio frame formatsframe.get_flv_audio_tag() // → Bytes FLV Audio Tagframe.get_audio_raw() // → Raw audio data
// Audio frame propertiesframe.audio_codec // → Option<AudioCodec>frame.timestamp // → DurationSubscriberConfig Parameters
Section titled “SubscriberConfig Parameters”pub struct SubscriberConfig { pub mode: SubscribeMode, pub buffer_time: Duration, pub receive_video: bool, pub receive_audio: bool, pub keyframe_timeout: Option<Duration>,}| Mode | Description | Use Case |
|---|---|---|
RealTime | Start from the latest IDR keyframe | Live real-time viewing |
Buffer | Start from the IDR buffer_time ago | Slightly delayed but smoother |
WaitKeyframe | Wait for the next arriving keyframe | Precise synchronization point |
Listening to Stream Events
Section titled “Listening to Stream Events”Plugins can subscribe to global stream events to respond to stream creation and destruction:
async fn start(&mut self, cancel: CancellationToken) -> Result<()> { let manager = self.stream_manager.clone().unwrap(); let mut event_rx = manager.subscribe_events();
tokio::spawn(async move { loop { tokio::select! { biased; // Prioritize checking the cancel signal _ = cancel.cancelled() => break, event = event_rx.recv() => { match event { Ok(StreamEvent::Created(path)) => { tracing::info!("New stream created: {}", path); } Ok(StreamEvent::Disposed(path)) => { tracing::info!("Stream disposed: {}", path); } Ok(StreamEvent::SubscriberAdded { stream, .. }) => { tracing::info!("New subscriber joined: {}", stream); } Ok(StreamEvent::SubscriberRemoved { stream, .. }) => { tracing::info!("Subscriber left: {}", stream); } Err(_) => break, // Channel closed } } } } });
self.state = PluginState::Running; Ok(())}HTTP Endpoint Registration
Section titled “HTTP Endpoint Registration”Many plugins need to expose HTTP APIs, such as snapshot services, HLS playback, etc.
HttpHandler Trait
Section titled “HttpHandler Trait”#[async_trait]pub trait HttpHandler: Send + Sync { async fn handle(&self, req: HttpRequest) -> HttpResponse;}HttpRequest
Section titled “HttpRequest”pub struct HttpRequest { pub method: String, // "GET", "POST", etc. pub path: String, // "/snap/api/live/stream" pub query: Option<String>, // "format=jpg&quality=85" pub headers: HashMap<String, String>, pub body: Bytes, pub params: HashMap<String, String>, // Route parameters pub peer_addr: SocketAddr,}
// Convenience methodsreq.body_str() // → Option<&str>req.parse_json_body::<T>() // → Result<T>req.query_param("key") // → Option<String>req.path_param("id") // → Option<String>HttpResponse
Section titled “HttpResponse”// Builder pattern for constructing responsesHttpResponse::ok().json(&serde_json::json!({"code": 0, "data": "..."}))HttpResponse::ok().text("Hello World")HttpResponse::ok().content_type("video/mp2t").body(ts_bytes)HttpResponse::ok().header("Cache-Control", "no-cache").body(data)HttpResponse::ok().cors_origin("*")
HttpResponse::not_found().text("Not Found")HttpResponse::bad_request().text("Missing parameter")HttpResponse::internal_error().text("Internal error")HttpResponse::new(302).header("Location", "/redirect").body(Bytes::new())Implementation Example: HTTP Handler with State
Section titled “Implementation Example: HTTP Handler with State”use sdk::prelude::*;use std::sync::Arc;
// Handler holds shared statestruct StatusHandler { stream_manager: Arc<dyn StreamManagerApi>, config: MyPluginConfig,}
#[async_trait]impl HttpHandler for StatusHandler { async fn handle(&self, req: HttpRequest) -> HttpResponse { // Parse query parameters let query: HashMap<String, String> = req.query.as_ref() .map(|q| url::form_urlencoded::parse(q.as_bytes()) .map(|(k, v)| (k.into_owned(), v.into_owned())) .collect()) .unwrap_or_default();
// Route dispatch let parts: Vec<&str> = req.path .trim_start_matches('/') .split('/') .collect();
match (req.method.as_str(), parts.first().copied().unwrap_or("")) { ("GET", "status") => { let stats = self.stream_manager.stats(); HttpResponse::ok().json(&serde_json::json!({ "code": 0, "streams": stats.stream_count, "message": self.config.message, })) } ("GET", "streams") => { let paths = self.stream_manager.stream_paths(); HttpResponse::ok().json(&serde_json::json!({ "code": 0, "streams": paths, })) } ("POST", "action") => { match req.parse_json_body::<serde_json::Value>() { Ok(body) => { // Process request... HttpResponse::ok().json(&serde_json::json!({ "code": 0, "message": "done" })) } Err(_) => HttpResponse::bad_request().text("Invalid JSON body"), } } _ => HttpResponse::not_found().json(&serde_json::json!({ "code": 404, "message": "Not found" })), } }}Registering HTTP Routes
Section titled “Registering HTTP Routes”Register routes in the Plugin’s http_routes() method:
impl Plugin for MyPlugin { fn http_routes(&self) -> Vec<HttpRoute> { let handler = Arc::new(StatusHandler { stream_manager: self.stream_manager.clone().unwrap(), config: self.config.clone(), });
vec![ HttpRoute { method: HttpMethod::Get, path: "/my-plugin/**".to_string(), // Wildcard matching handler: Box::new(HandlerAdapter { handler: handler.clone() }), }, HttpRoute { method: HttpMethod::Post, path: "/my-plugin/**".to_string(), handler: Box::new(HandlerAdapter { handler }), }, ] }}
// If your Handler is in an Arc, you need an adapterstruct HandlerAdapter { handler: Arc<StatusHandler>,}
#[async_trait]impl HttpHandler for HandlerAdapter { async fn handle(&self, req: HttpRequest) -> HttpResponse { self.handler.handle(req).await }}TCP Server Mode
Section titled “TCP Server Mode”Protocol plugins (such as RTMP, RTSP) typically need to listen on a TCP port. The SDK provides the create_tcp_listener utility function.
Complete TCP Server Implementation
Section titled “Complete TCP Server Implementation”use std::net::SocketAddr;use std::sync::atomic::{AtomicU64, Ordering};use std::sync::Arc;use tokio::net::TcpStream;use sdk::prelude::*;
pub struct TcpServerPlugin { state: PluginState, config: TcpServerConfig, stream_manager: Option<Arc<dyn StreamManagerApi>>, task_work: Option<Arc<dyn TaskWork>>, server_handle: Option<tokio::task::JoinHandle<()>>,}
#[async_trait]impl Plugin for TcpServerPlugin { fn set_task_work(&mut self, work: Arc<dyn TaskWork>) { self.task_work = Some(work); }
async fn init( &mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>, ) -> Result<()> { self.stream_manager = Some(manager); // Parse listen address... self.state = PluginState::Initialized; Ok(()) }
async fn start(&mut self, cancel: CancellationToken) -> Result<()> { let addr: SocketAddr = self.config.listen_addr.parse() .unwrap_or("0.0.0.0:9090".parse().unwrap()); let manager = self.stream_manager.clone().unwrap(); let task_work = self.task_work.clone(); let session_counter = Arc::new(AtomicU64::new(0));
// Use SDK's create_tcp_listener (enables SO_REUSEPORT) let listener = sdk::create_tcp_listener(addr).map_err(|e| { MonibucaError::Io(std::io::Error::new( std::io::ErrorKind::AddrInUse, format!("Failed to bind {}: {}", addr, e), )) })?;
let listener_task = async move { tracing::info!("TCP server listening on {}", addr);
loop { tokio::select! { biased; _ = cancel.cancelled() => { tracing::info!("TCP server shutting down"); break; } result = listener.accept() => { match result { Ok((stream, peer_addr)) => { let session_id = session_counter .fetch_add(1, Ordering::Relaxed); let manager = manager.clone();
// Create a child task for each connection (monitorable in engine dashboard) let (session_cancel, child_task) = if let Some(ref tw) = task_work { let child = tw.create_child_task( &format!("Session:{}", peer_addr) ); child.start(); (child.cancellation_token(), Some(child)) } else { (cancel.child_token(), None) };
tokio::spawn(async move { if let Err(e) = handle_session( stream, peer_addr, session_id, manager, session_cancel, ).await { tracing::error!( "Session {} error: {}", peer_addr, e ); } // Session ended, stop child task if let Some(task) = child_task { task.stop(); } }); } Err(e) => { tracing::error!("Accept error: {}", e); } } } } } };
// Register the listener task with TaskWork if let Some(ref tw) = self.task_work { tw.register_child("TCPListener", Box::pin(listener_task)); } else { self.server_handle = Some(tokio::spawn(listener_task)); }
self.state = PluginState::Running; Ok(()) }
async fn stop(&mut self) -> Result<()> { if let Some(handle) = self.server_handle.take() { handle.abort(); } self.state = PluginState::Stopped; Ok(()) }
// ... other methods}
async fn handle_session( stream: TcpStream, peer_addr: SocketAddr, session_id: u64, manager: Arc<dyn StreamManagerApi>, cancel: CancellationToken,) -> Result<()> { tracing::info!("New session {} from {}", session_id, peer_addr);
tokio::select! { biased; _ = cancel.cancelled() => { tracing::info!("Session {} cancelled", session_id); } result = process_connection(stream, manager) => { if let Err(e) = result { tracing::warn!("Session {} error: {}", session_id, e); } } }
Ok(())}create_tcp_listener
Section titled “create_tcp_listener”/// Create a TCP listener with SO_REUSEPORT enabled/// Allows multiple threads/processes to accept on the same port simultaneously, improving concurrencypub fn create_tcp_listener(addr: SocketAddr) -> std::io::Result<TcpListener>;TcpSplitter Trait
Section titled “TcpSplitter Trait”For scenarios requiring custom protocol framing, implement TcpSplitter:
/// Split a TCP byte stream into complete protocol messagespub trait TcpSplitter: Send { /// Input new data, return all completed messages fn split(&mut self, data: &[u8]) -> Vec<Vec<u8>>;
/// Return incomplete data currently in the buffer fn pending(&self) -> &[u8];
/// Clear the buffer fn clear(&mut self);}TaskWork Task Management
Section titled “TaskWork Task Management”TaskWork is the engine’s task tree management system, allowing plugin child tasks (TCP connections, recording sessions, etc.) to be monitored and managed in the engine dashboard.
TaskWork Trait
Section titled “TaskWork Trait”pub trait TaskWork: Send + Sync + Any { /// Start the root task fn start(&self);
/// Register a named background task (fire-and-forget) fn register_child(&self, owner: &str, task: Pin<Box<dyn Future<Output=()> + Send>>);
/// Create a manageable child task handle fn create_child_task(&self, owner: &str) -> Box<dyn ChildTaskHandle>;
/// Remove a completed child task fn remove_child_task(&self, child_id: u64);}ChildTaskHandle Trait
Section titled “ChildTaskHandle Trait”pub trait ChildTaskHandle: Send + Sync + Any { fn start(&self); // Start the child task fn stop(&self); // Stop the child task fn id(&self) -> u64; // Get unique ID fn cancellation_token(&self) -> CancellationToken; // Get cancellation token}Two Child Task Patterns
Section titled “Two Child Task Patterns”Pattern A: register_child — Named Background Task
Suitable for long-running single tasks (e.g., TCP listen loop):
fn set_task_work(&mut self, work: Arc<dyn TaskWork>) { self.task_work = Some(work);}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> { let task = async move { // TCP accept loop, event monitoring, etc. loop { tokio::select! { biased; _ = cancel.cancelled() => break, // ... work ... } } };
if let Some(ref tw) = self.task_work { tw.register_child("TCPListener", Box::pin(task)); } Ok(())}Pattern B: create_child_task — Manageable Child Tasks
Suitable for dynamically created multiple child tasks (e.g., each TCP connection, each recording session):
// Create a child tasklet child = task_work.create_child_task(&format!("RTMP:{}", peer_addr));child.start();let cancel_token = child.cancellation_token();
// Use cancel_token in child task logic to detect shutdowntokio::select! { biased; _ = cancel_token.cancelled() => { /* Stopped by engine */ } result = do_work() => { /* Completed normally */ }}
// Clean up after completionchild.stop();task_work.remove_child_task(child.id());EngineContext Dependency Injection
Section titled “EngineContext Dependency Injection”EngineContext is the engine’s IoC (Inversion of Control) container, based on TypeId capability mapping. Plugins use it to access various optional services provided by the engine.
Storing EngineContext
Section titled “Storing EngineContext”pub struct MyPlugin { state: PluginState, engine_context: Option<EngineContext>, database: Option<Arc<dyn DatabaseApi>>, service_registry: Option<Arc<dyn ServiceRegistry>>,}
impl Plugin for MyPlugin { fn set_engine_context(&mut self, ctx: EngineContext) { // Called before init(), safe to store if let Some(db) = ctx.database_arc() { self.database = Some(db); } if let Some(reg) = ctx.service_registry() { self.service_registry = Some(reg); } self.engine_context = Some(ctx); }}Available Capabilities
Section titled “Available Capabilities”| Capability | Getter Method | Check Method | Description |
|---|---|---|---|
| StreamManagerApi | init() parameter | — | Always available, stream management (publish/subscribe/query) |
| DatabaseApi | ctx.database_arc() | ctx.has_database() | Database operations |
| TransformApi | ctx.transform() | ctx.has_transform() | Stream transformation (transcoding, etc.) |
| PlaybackApi | ctx.playback() | ctx.has_playback() | VOD playback |
| ServiceRegistry | ctx.service_registry() | ctx.has_service_registry() | Inter-plugin service discovery |
| RoomApi | ctx.room_api() | ctx.has_room_api() | Room service |
| ProxyManager | ctx.proxy_manager() | ctx.has_proxy_manager() | Proxy management |
| CollectionApi | ctx.collection() | ctx.has_collection() | Data collection operations |
| ReportingApi | ctx.reporting() | ctx.has_reporting() | Data reporting |
| UtilityProvider | ctx.utility_provider() | ctx.has_utility_provider() | Utility functions |
| HttpRegistry | ctx.http_registry() | ctx.has_http_registry() | HTTP route registration |
Generic Capability Access
Section titled “Generic Capability Access”In addition to predefined capabilities, you can access custom capabilities via the generic API:
// Get any type of capabilityif let Some(my_service) = ctx.get::<Arc<dyn MyCustomApi>>() { // Use custom capability...}
// Check if a capability existsif ctx.has::<Arc<dyn MyCustomApi>>() { // ...}ServiceRegistry Service Discovery
Section titled “ServiceRegistry Service Discovery”ServiceRegistry allows plugins to register and discover protocol factory services. It is the core mechanism for implementing cross-protocol interoperability for pulling and pushing streams.
PullerFactory — Pull Stream Factory
Section titled “PullerFactory — Pull Stream Factory”When the engine needs to pull a stream from a remote source, it looks up the corresponding protocol’s PullerFactory via ServiceRegistry:
use sdk::prelude::*;use sdk::services::{PullerFactory, StreamPuller};
pub struct MyPullerFactory;
#[async_trait]impl PullerFactory for MyPullerFactory { fn protocol(&self) -> &str { "myproto" // Protocol identifier }
fn supports_url(&self, url: &str) -> bool { url.starts_with("myproto://") }
async fn create_puller( &self, url: &str, stream_path: &str, config: serde_json::Value, manager: Arc<dyn StreamManagerApi>, cancel_token: CancellationToken, ) -> Result<Arc<tokio::sync::RwLock<Box<dyn StreamPuller>>>> { let puller = MyPuller::new(url, stream_path, manager, cancel_token); Ok(Arc::new(tokio::sync::RwLock::new(Box::new(puller)))) }}Registering Factories
Section titled “Registering Factories”Register in init() or start():
async fn init(&mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>) -> Result<()> { // Get ServiceRegistry via EngineContext if let Some(ref ctx) = self.engine_context { if let Some(registry) = ctx.service_registry() { // Register pull stream factory registry.register_puller_factory(Arc::new(MyPullerFactory)); // Register push stream factory registry.register_pusher_factory(Arc::new(MyPusherFactory)); } } Ok(())}Using Factories (Cross-Plugin Collaboration)
Section titled “Using Factories (Cross-Plugin Collaboration)”// Find a pull stream factory that supports the given URLif let Some(factory) = registry.get_puller_factory_for_url("rtmp://example.com/live/test") { let puller = factory.create_puller( "rtmp://example.com/live/test", "live/test", serde_json::json!({}), manager.clone(), cancel.clone(), ).await?;
// Start pulling puller.write().await.start().await?;}
// List all registered pull protocolslet protocols = registry.list_puller_protocols(); // ["rtmp", "rtsp", "srt", ...]Dynamic Loading Export
Section titled “Dynamic Loading Export”export_plugin! Macro
Section titled “export_plugin! Macro”Export C ABI symbols via the export_plugin! macro to support dynamic loading mode:
sdk::export_plugin!(MyPlugin);This macro generates three no_mangle extern "C" functions:
| Symbol | Description |
|---|---|
plugin_api_version() -> u32 | ABI version number for compatibility checking |
plugin_create() -> PluginHandle | Creates a plugin instance (calls Default::default()) |
plugin_metadata() -> PluginMetadata | Returns plugin name, version, description (for discovery) |
Compiling as a Dynamic Library
Section titled “Compiling as a Dynamic Library”# Add to Cargo.toml[lib]crate-type = ["cdylib"]
# Buildcargo build --release
# Output path# Linux: target/release/libplugin_my_plugin.so# macOS: target/release/libplugin_my_plugin.dylib# Windows: target/release/plugin_my_plugin.dll
# Deploy: copy to Monibuca plugins directorycp target/release/libplugin_my_plugin.so /path/to/monibuca/plugins/Compiling to WASM
Section titled “Compiling to WASM”cargo build --target wasm32-wasi --releaseError Handling
Section titled “Error Handling”The SDK provides a unified error type MonibucaError and Result<T>:
use sdk::prelude::*;
// Result<T> = std::result::Result<T, MonibucaError>
// Common error constructorsMonibucaError::Plugin("my error message".to_string()) // Plugin custom errorMonibucaError::Io(io_error) // IO errorMonibucaError::Config("invalid config".to_string()) // Configuration error
// Usage in pluginsasync fn init(&mut self, ...) -> Result<()> { let config = load_config().map_err(|e| MonibucaError::Plugin(format!("Failed to load config: {}", e)) )?; Ok(())}Practical Examples
Section titled “Practical Examples”Example 1: Snapshot Plugin (HTTP + Subscribe)
Section titled “Example 1: Snapshot Plugin (HTTP + Subscribe)”This example demonstrates a practical snapshot plugin — capturing a frame from a specified stream via an HTTP request:
use std::any::Any;use std::sync::Arc;use std::time::Duration;use sdk::prelude::*;
pub struct SnapPlugin { state: PluginState, stream_manager: Option<Arc<dyn StreamManagerApi>>,}
impl Default for SnapPlugin { fn default() -> Self { Self { state: PluginState::Created, stream_manager: None } }}
// HTTP Handler: receive snapshot requestsstruct SnapHandler { stream_manager: Arc<dyn StreamManagerApi>,}
#[async_trait]impl HttpHandler for SnapHandler { async fn handle(&self, req: HttpRequest) -> HttpResponse { // Extract stream name from path: /snap/live/camera → live/camera let stream_path = req.path.trim_start_matches('/');
if !self.stream_manager.has_stream(stream_path) { return HttpResponse::not_found().json(&serde_json::json!({ "error": "Stream not found" })); }
// Subscribe to stream, receive video only, wait for one keyframe let config = SubscriberConfig { mode: SubscribeMode::WaitKeyframe, receive_video: true, receive_audio: false, keyframe_timeout: Some(Duration::from_secs(5)), ..Default::default() };
match self.stream_manager.subscribe(stream_path, config).await { Ok((_pub_ref, mut subscriber)) => { // Wait for the first keyframe if subscriber.wait_for_frames().await.is_ok() { if let Ok(Some(frame)) = subscriber.read_video() { if frame.is_keyframe() { let raw_data = frame.get_raw(); // Can further decode to an image here... return HttpResponse::ok() .content_type("application/octet-stream") .body(raw_data); } } } self.stream_manager.unsubscribe(stream_path, subscriber.as_mut()); HttpResponse::internal_error().text("Failed to capture frame") } Err(e) => { HttpResponse::internal_error().text(&format!("Subscribe failed: {}", e)) } } }}
#[async_trait]impl Plugin for SnapPlugin { fn info(&self) -> PluginInfo { PluginInfo { name: "snap", version: "1.0.0", description: "Video snapshot plugin", author: "Monibuca Team", } } fn state(&self) -> PluginState { self.state }
async fn init(&mut self, manager: Arc<dyn StreamManagerApi>, _config: Arc<dyn ConfigProvider>) -> Result<()> { self.stream_manager = Some(manager); self.state = PluginState::Initialized; Ok(()) }
async fn start(&mut self, _cancel: CancellationToken) -> Result<()> { self.state = PluginState::Running; Ok(()) }
async fn stop(&mut self) -> Result<()> { self.state = PluginState::Stopped; Ok(()) }
fn http_routes(&self) -> Vec<HttpRoute> { if let Some(ref sm) = self.stream_manager { vec![HttpRoute { method: HttpMethod::Get, path: "/snap/**".to_string(), handler: Box::new(SnapHandler { stream_manager: sm.clone(), }), }] } else { Vec::new() } }
fn as_any(&self) -> &dyn Any { self } fn as_any_mut(&mut self) -> &mut dyn Any { self }}Example 2: Stream Relay Plugin (Publish + Subscribe Linked)
Section titled “Example 2: Stream Relay Plugin (Publish + Subscribe Linked)”Relay a stream to another path (e.g., adding a watermark or changing the path):
async fn relay_stream( manager: Arc<dyn StreamManagerApi>, source_path: &str, target_path: &str, cancel: CancellationToken,) -> Result<()> { // 1. Subscribe to the source stream let config = SubscriberConfig::default(); let (source_pub, mut subscriber) = manager.subscribe(source_path, config).await?;
// 2. Create the target Publisher let target_pub = manager.create_publisher_for_plugin(target_path, "relay")?;
// 3. Copy codec configuration { let source = source_pub.read(); let mut target = target_pub.write(); target.init_video_track(); target.init_audio_track();
if let Some(codec) = source.audio_codec() { target.set_audio_codec(codec); } if let Some(seq) = source.video_seq_header() { target.set_video_seq_header(seq); } if let Some(seq) = source.audio_seq_header() { target.set_audio_seq_header(seq); } }
// 4. Relay frame data loop { if subscriber.is_stopped() || cancel.is_cancelled() { break; }
while let Ok(Some(frame)) = subscriber.read_video() { let _ = target_pub.read().write_video( frame.video_frame_type, frame.timestamp, // pts frame.timestamp, // dts frame.get_avcc(), ); }
while let Some(frame) = subscriber.read_audio() { let _ = target_pub.read().write_audio( frame.timestamp, frame.get_audio_raw(), ); }
match tokio::time::timeout( Duration::from_millis(10), subscriber.wait_for_frames(), ).await { Ok(Ok(())) => continue, Ok(Err(_)) => break, Err(_) => continue, } }
// 5. Cleanup manager.unsubscribe(source_path, subscriber.as_mut()); manager.dispose_stream(target_path); Ok(())}Development Best Practices
Section titled “Development Best Practices”1. Keep init() Lightweight
Section titled “1. Keep init() Lightweight”Only perform configuration loading and dependency injection in init(). Time-consuming operations (network listening, file I/O) should go in start():
// ✅ Correctasync fn init(&mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>) -> Result<()> { self.config = load_config(config)?; // Lightweight self.stream_manager = Some(manager); // Store reference self.state = PluginState::Initialized; Ok(())}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> { let listener = sdk::create_tcp_listener(self.config.addr)?; // Time-consuming operation // Start accept loop... Ok(())}
// ❌ Wrong: network operations in initasync fn init(&mut self, ...) -> Result<()> { let listener = sdk::create_tcp_listener(addr)?; // Should not be here Ok(())}2. Proper Use of CancellationToken
Section titled “2. Proper Use of CancellationToken”Always check the cancel signal in async loops, using biased to ensure shutdown is prioritized:
loop { tokio::select! { biased; // ← Prioritize checking the first branch _ = cancel.cancelled() => { tracing::info!("Shutting down gracefully"); break; } result = do_work() => { // Handle result... } }}3. Graceful Shutdown
Section titled “3. Graceful Shutdown”Properly clean up all resources in stop():
async fn stop(&mut self) -> Result<()> { // 1. Stop background tasks if let Some(handle) = self.server_handle.take() { handle.abort(); }
// 2. Close network connections // 3. Wait for in-progress operations to complete // 4. Release resources
self.state = PluginState::Stopped; tracing::info!("Plugin stopped"); Ok(())}It is also recommended to implement Drop for sessions holding resources:
impl Drop for MySession { fn drop(&mut self) { self.cleanup(); // Ensure cleanup even on panic }}4. Structured Logging
Section titled “4. Structured Logging”Use tracing for contextual structured logging:
use tracing::{info, warn, error, debug, instrument};
#[instrument(skip(stream), fields(session_id = %session_id))]async fn handle_session(stream: TcpStream, session_id: u64) { info!("Session started"); // All subsequent logs automatically include the session_id field
if let Err(e) = process(stream).await { error!(error = %e, "Session error"); } info!("Session ended");}5. Propagate Errors Instead of Panicking
Section titled “5. Propagate Errors Instead of Panicking”Avoid unwrap() and panic! in plugin code; use Result to propagate errors:
// ✅ Correctlet addr: SocketAddr = config.addr.parse().map_err(|e| MonibucaError::Config(format!("Invalid address: {}", e)))?;
// ❌ Avoidlet addr: SocketAddr = config.addr.parse().unwrap();6. Use TaskWork to Manage Child Tasks
Section titled “6. Use TaskWork to Manage Child Tasks”Register child tasks through TaskWork so the engine can monitor and manage them uniformly:
// ✅ Use TaskWorkif let Some(ref tw) = self.task_work { tw.register_child("TCPListener", Box::pin(listener_task));}
// Weaker alternative (not managed by the engine)tokio::spawn(listener_task);7. Support Hot Configuration Reload
Section titled “7. Support Hot Configuration Reload”For configurations that need runtime adjustment, implement on_config_update():
fn on_config_update(&mut self, config: &serde_json::Value) -> Result<()> { if let Ok(new_config) = serde_json::from_value::<MyConfig>(config.clone()) { // Only update fields that can be safely hot-reloaded self.config.max_connections = new_config.max_connections; self.config.timeout = new_config.timeout; // Note: fields like port that require restart should not be hot-reloaded tracing::info!("Config updated"); } Ok(())}Project Structure Reference
Section titled “Project Structure Reference”The recommended directory structure for a fully-featured plugin:
plugins/my-plugin/├── Cargo.toml├── src/│ ├── lib.rs # Module exports + export_plugin!│ ├── plugin.rs # Plugin trait implementation (lifecycle)│ ├── config.rs # ConfigSchema configuration definition│ ├── handler.rs # HTTP Handler (if HTTP API is needed)│ ├── session.rs # Connection session logic (if TCP service)│ ├── factory.rs # PullerFactory/PusherFactory (if protocol plugin)│ └── error.rs # Plugin custom error types (optional)└── tests/ └── integration.rs # Integration tests (optional)Next Steps
Section titled “Next Steps”- Plugin System — Understand the plugin system architecture design
- Stream Management — Deep dive into Dispatcher and RingBuffer
- SDK API Reference — Complete SDK types and interface reference
- HTTP API — Monibuca built-in HTTP API documentation