Skip to content

Plugin Development Guide

This guide is a complete tutorial for developing custom plugins using the Monibuca V6 SDK. You will learn the full plugin lifecycle, configuration system, streaming operations, HTTP services, task management, and other core concepts, and master various plugin development patterns through practical examples.

Monibuca adopts a plugin-based architecture where all protocol support and extension features are implemented as plugins. The SDK (monibuca-sdk) is the sole contract layer between plugins and the engine — plugins only need to depend on the SDK crate without needing to understand engine internals.

Your plugin crate
└──▶ sdk (monibuca-sdk) ← sole dependency, open source
└──▶ codec (monibuca-codec) ← codec layer

After development, plugins can be deployed in three modes:

ModeDescriptionUse Case
Static compilationLinked to the engine binary at compile timeBest performance, production deployment
Dynamic loadingCompiled as .so/.dylib, loaded at runtimeHot updates, third-party plugins
WASM sandboxCompiled as a WASM module, executed in isolationSecurity isolation, untrusted plugins
Terminal window
mkdir -p plugins/my-plugin/src
cd plugins/my-plugin
[package]
name = "plugin-my-plugin"
version = "0.1.0"
edition = "2024"
[dependencies]
sdk = { git = "https://github.com/langhuihui/monibuca-sdk" }
# If developing within the monibuca workspace:
# sdk = { path = "../../crates/monibuca-sdk" }
async-trait = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
tracing = "0.1"
plugins/my-plugin/src/lib.rs
mod config;
mod plugin;
pub use plugin::MyPlugin;
// Export C ABI symbols for dynamic loading mode
sdk::export_plugin!(MyPlugin);
plugins/my-plugin/src/config.rs
use sdk::ConfigSchema;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]
#[schema(
plugin = "my-plugin",
title = "My Plugin",
description = "A custom Monibuca plugin",
sdk_path = "sdk" // Required for standalone plugin crates
)]
pub struct MyPluginConfig {
/// Whether to enable the plugin
#[serde(default = "default_enable")]
#[schema(label = "Enable", desc = "Whether to enable this plugin")]
pub enable: bool,
/// Custom parameter
#[serde(default = "default_message")]
#[schema(label = "Message", desc = "Custom welcome message")]
pub message: String,
}
fn default_enable() -> bool { true }
fn default_message() -> String { "Hello from MyPlugin!".to_string() }
impl Default for MyPluginConfig {
fn default() -> Self {
Self {
enable: default_enable(),
message: default_message(),
}
}
}
plugins/my-plugin/src/plugin.rs
use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use sdk::prelude::*;
use crate::config::MyPluginConfig;
pub struct MyPlugin {
state: PluginState,
config: MyPluginConfig,
}
impl Default for MyPlugin {
fn default() -> Self {
Self {
state: PluginState::Created,
config: MyPluginConfig::default(),
}
}
}
#[async_trait]
impl Plugin for MyPlugin {
fn info(&self) -> PluginInfo {
PluginInfo {
name: "my-plugin",
version: "0.1.0",
description: "A custom Monibuca plugin",
author: "Your Name",
}
}
fn state(&self) -> PluginState {
self.state
}
async fn init(
&mut self,
_manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>,
) -> Result<()> {
// Load plugin configuration from config file
self.config = config
.get_plugin_config("my-plugin")
.and_then(|v| serde_json::from_value(v).ok())
.unwrap_or_default();
if !self.config.enable {
tracing::info!("MyPlugin is disabled");
self.state = PluginState::Disabled;
return Ok(());
}
tracing::info!("MyPlugin initialized: {}", self.config.message);
self.state = PluginState::Initialized;
Ok(())
}
async fn start(&mut self, _cancel: CancellationToken) -> Result<()> {
if self.state == PluginState::Disabled {
return Ok(());
}
self.state = PluginState::Running;
tracing::info!("MyPlugin started");
Ok(())
}
async fn stop(&mut self) -> Result<()> {
self.state = PluginState::Stopped;
tracing::info!("MyPlugin stopped");
Ok(())
}
fn as_any(&self) -> &dyn Any { self }
fn as_any_mut(&mut self) -> &mut dyn Any { self }
}

Add the following to Monibuca’s config.yaml:

my-plugin:
enable: true
message: "Welcome to Monibuca!"

Congratulations! You have created a minimal but complete Monibuca plugin. Next, let’s dive into each core concept.


The Plugin trait is the core interface for all plugins. The engine manages the complete plugin lifecycle through this trait.

#[async_trait]
pub trait Plugin: Send + Sync {
// ── Required ──────────────────────────────────────────────
fn info(&self) -> PluginInfo;
fn state(&self) -> PluginState;
async fn init(&mut self, manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>) -> Result<()>;
async fn start(&mut self, cancel: CancellationToken) -> Result<()>;
async fn stop(&mut self) -> Result<()>;
fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
// ── Optional (have default implementations) ───────────────
fn name(&self) -> &'static str { self.info().name }
fn is_enabled(&self) -> bool { true }
fn on_config_update(&mut self, _config: &serde_json::Value) -> Result<()> { Ok(()) }
fn set_task_work(&mut self, _work: Arc<dyn TaskWork>) {}
fn set_engine_context(&mut self, _ctx: EngineContext) {}
fn http_routes(&self) -> Vec<HttpRoute> { Vec::new() }
fn register_http_handlers(&mut self, _server: SharedHttpServer) {}
}

The engine calls plugin methods in the following order:

┌─────────────┐
│ Created │ ← Default::default() or new()
└──────┬──────┘
set_engine_context(ctx) ← Inject IoC container (optional capabilities)
set_task_work(work) ← Inject task manager
init(manager, config) ← Load configuration, initialize dependencies
┌──────┴──────┐
│ Initialized │
└──────┬──────┘
start(cancel_token) ← Start services (TCP/HTTP/background tasks)
┌──────┴──────┐
│ Running │ ← Serving normally
└──────┬──────┘
│ cancel_token cancelled, or engine calls stop()
stop() ← Graceful shutdown, clean up resources
┌──────┴──────┐
│ Stopped │
└─────────────┘
pub enum PluginState {
Created, // Just constructed, not yet initialized
Initialized, // init() succeeded
Running, // start() succeeded, actively serving
Stopped, // stop() completed
Disabled, // enable: false in configuration
Error, // An unrecoverable error occurred
}
pub struct PluginInfo {
pub name: &'static str, // Unique plugin identifier, matches the key in config file
pub version: &'static str, // Semantic version number
pub description: &'static str, // Brief description
pub author: &'static str, // Author information
}

The ConfigSchema macro automatically generates a JSON Schema for plugin configuration structs, used by the management UI and API.

#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]
#[schema(
plugin = "rtmp", // Required: plugin name identifier
title = "RTMP Plugin", // Required: display title
description = "RTMP/RTMPS streaming", // Required: description
sdk_path = "sdk", // Required: must be "sdk" for standalone crates
override_groups = "Publish,Subscribe,Tcp" // Optional: global config groups to override
)]
AttributeRequiredDescription
pluginPlugin name identifier, matches the key in the config file
titleSchema title, displayed in the management UI
descriptionSchema description
sdk_pathSDK crate path, must be "sdk" for standalone crates
override_groupsGlobal config groups overridden by this plugin, comma-separated
pub struct MyConfig {
#[schema(label = "Port", desc = "Service listening port")]
pub port: u16,
#[schema(label = "Buffer Size", desc = "Read buffer size",
min = 128, max = 65536, suffix = " bytes")]
pub buffer_size: u32,
#[schema(label = "Output Directory", desc = "File output path",
pattern = "^[a-zA-Z0-9_/\\-\\.]+$",
help = "Only letters, digits, underscores, slashes, hyphens, and dots are allowed")]
pub output_dir: String,
#[schema(group = "advanced", group_label = "Advanced Settings",
group_desc = "Production environment tuning parameters")]
pub max_connections: u32,
#[schema(skip)] // Not exposed in the Schema
pub internal_state: String,
}
AttributeDescription
labelField display name
descField description
min / maxNumeric range constraints
suffixDisplay unit suffix
patternRegex validation pattern
helpAdditional help text
group / group_label / group_descConfiguration grouping
skipExclude from Schema

Using CommonConfig to Inherit Global Settings

Section titled “Using CommonConfig to Inherit Global Settings”

Many plugins need global parameters such as TCP listen address and HTTP settings. Use #[serde(flatten)] to inherit CommonConfig:

use sdk::prelude::*;
#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]
#[schema(
plugin = "rtmp",
title = "RTMP Plugin",
description = "RTMP streaming configuration",
override_groups = "Publish,Subscribe,Tcp",
sdk_path = "sdk"
)]
#[serde(default, rename_all = "lowercase")]
pub struct RtmpPluginConfig {
#[serde(default)]
#[schema(label = "Enable", desc = "Whether to enable RTMP")]
pub enable: bool,
/// Inherit global TCP/HTTP/Publish/Subscribe settings
#[serde(flatten)]
#[schema(skip)]
pub common: CommonConfig,
/// Plugin-specific configuration
#[serde(default = "default_chunk_size")]
#[schema(label = "Chunk Size", desc = "RTMP chunk size",
min = 128, max = 65536, suffix = " bytes")]
pub chunk_size: u32,
}

In this case, the YAML configuration becomes:

rtmp:
enable: true
tcp:
listenaddr: ":1935" # From CommonConfig.tcp
chunksize: 4096 # Plugin-specific

Load configuration in the init() method:

async fn init(
&mut self,
manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>,
) -> Result<()> {
// Method 1: Parse into a struct (recommended)
let my_config: MyPluginConfig = config
.get_plugin_config("my-plugin")
.and_then(|v| serde_json::from_value(v).ok())
.unwrap_or_default();
// Check the enable flag
if !my_config.enable {
self.state = PluginState::Disabled;
return Ok(());
}
// Method 2: Read a single global config value
if let Some(log_level) = config.get("loglevel") {
tracing::info!("Global log level: {}", log_level);
}
// Method 3: Load default YAML configuration
config.load_default_config("my-plugin", include_str!("default.yaml"))?;
self.config = my_config;
self.stream_manager = Some(manager);
self.state = PluginState::Initialized;
Ok(())
}
pub trait ConfigProvider: Send + Sync + Any {
/// Get a global configuration item
fn get(&self, key: &str) -> Option<serde_json::Value>;
/// Get plugin configuration (returns the entire plugin config JSON object)
fn get_plugin_config(&self, plugin_name: &str) -> Option<serde_json::Value>;
/// Load default YAML config for a plugin (if not specified by the user in the config file)
fn load_default_config(&self, plugin_name: &str, default_yaml: &str) -> Result<()>;
/// Check if a plugin is enabled in configuration
fn is_plugin_enabled(&self, plugin_name: &str) -> bool;
/// Get CORS configuration
fn get_cors_config(&self) -> (bool, Vec<String>);
}

Implement on_config_update() to support runtime configuration changes:

fn on_config_update(&mut self, config: &serde_json::Value) -> Result<()> {
if let Ok(new_config) = serde_json::from_value::<MyPluginConfig>(config.clone()) {
tracing::info!("Config updated, old port: {}, new port: {}",
self.config.port, new_config.port);
self.config = new_config;
}
Ok(())
}

Stream operations are the core capability of streaming media plugins. StreamManagerApi provides complete interfaces for publishing (ingest) and subscribing (playback).

pub trait StreamManagerApi: Send + Sync + 'static {
// ── Publish ──────────────────────────────────────────────
fn create_publisher(&self, path: &str) -> Result<Arc<RwLock<dyn Publisher>>>;
fn create_publisher_for_plugin(&self, path: &str, plugin_name: &str)
-> Result<Arc<RwLock<dyn Publisher>>>;
fn get_publisher(&self, path: &str) -> Option<Arc<RwLock<dyn Publisher>>>;
// ── Subscribe ────────────────────────────────────────────
async fn subscribe(&self, path: &str, config: SubscriberConfig)
-> Result<(Arc<RwLock<dyn Publisher>>, Box<dyn Subscriber>)>;
fn unsubscribe(&self, path: &str, subscriber: &mut dyn Subscriber);
// ── Query ────────────────────────────────────────────────
fn has_stream(&self, path: &str) -> bool;
fn stream_count(&self) -> usize;
fn stream_paths(&self) -> Vec<String>;
// ── Events ───────────────────────────────────────────────
fn subscribe_events(&self) -> broadcast::Receiver<StreamEvent>;
// ── Lifecycle ────────────────────────────────────────────
fn dispose_stream(&self, path: &str) -> bool;
}

Here is the complete publishing flow:

use std::time::Duration;
use sdk::prelude::*;
async fn publish_stream(
manager: &Arc<dyn StreamManagerApi>,
stream_path: &str,
) -> Result<()> {
// 1. Create a Publisher (automatically creates the corresponding Stream)
let publisher = manager.create_publisher_for_plugin(stream_path, "my-plugin")?;
// 2. Initialize audio/video Tracks (must be called before writing frames)
{
let mut pub_guard = publisher.write(); // parking_lot RwLock
pub_guard.init_video_track();
pub_guard.init_audio_track();
}
// 3. Set codec information (call once when sequence header is received)
{
let mut pub_guard = publisher.write();
// Video codec (H.264 example)
let video_codec = VideoCodec::new(
VideoCodecType::H264,
1920, // width
1080, // height
);
pub_guard.set_video_codec(video_codec);
pub_guard.set_video_seq_header(sps_pps_data); // Sequence header in AVCC format
// Audio codec (AAC example)
let audio_codec = AudioCodec::new(
AudioCodecType::AAC,
44100, // sample_rate
2, // channels
);
pub_guard.set_audio_codec(audio_codec);
pub_guard.set_audio_seq_header(audio_specific_config);
}
// 4. Continuously write media frames
loop {
// Receive data from your protocol/source...
let (frame_data, timestamp, is_keyframe) = receive_frame().await?;
// Write video frame (read lock is sufficient, uses interior mutability)
let frame_type = if is_keyframe {
VideoFrameType::Keyframe
} else {
VideoFrameType::Interframe
};
let dts = Duration::from_millis(timestamp);
let pts = dts; // If there is a CTS offset, pts = dts + cts
publisher.read().write_video(frame_type, pts, dts, frame_data)?;
// Write audio frame
let audio_ts = Duration::from_millis(audio_timestamp);
publisher.read().write_audio(audio_ts, audio_data)?;
}
// 5. End publishing
manager.dispose_stream(stream_path);
Ok(())
}
pub trait Publisher: Send + Sync {
// ── Track Initialization ─────────────────────────────────
fn init_video_track(&mut self); // Must be called first
fn init_audio_track(&mut self); // Must be called first
// ── Codec Setup (sequence header, set once) ──────────────
fn set_video_codec(&mut self, codec: VideoCodec);
fn set_audio_codec(&mut self, codec: AudioCodec);
fn set_video_seq_header(&mut self, data: Bytes);
fn set_audio_seq_header(&mut self, data: Bytes);
// ── Frame Writing (callable with read lock) ──────────────
fn write_video(&self, frame_type: VideoFrameType,
pts: Duration, dts: Duration, data: Bytes) -> Result<()>;
fn write_video_raw(&self, frame_type: VideoFrameType,
timestamp: Duration, dts: Duration,
nalus: Vec<Bytes>) -> Result<()>;
fn write_audio(&self, timestamp: Duration, data: Bytes) -> Result<()>;
// ── Query ────────────────────────────────────────────────
fn path(&self) -> &str;
fn subscriber_count(&self) -> usize;
fn video_seq_header(&self) -> Option<Bytes>;
fn audio_seq_header(&self) -> Option<Bytes>;
// ── Lifecycle ────────────────────────────────────────────
fn dispose(&self);
}

Here is the complete subscription flow:

use std::time::Duration;
use tokio::time::timeout;
use sdk::prelude::*;
async fn subscribe_stream(
manager: &Arc<dyn StreamManagerApi>,
stream_path: &str,
cancel: CancellationToken,
) -> Result<()> {
// 1. Configure subscription parameters
let config = SubscriberConfig {
mode: SubscribeMode::RealTime, // Real-time mode, start from latest keyframe
buffer_time: Duration::from_secs(2), // Buffer time for Buffer mode
receive_video: true,
receive_audio: true,
keyframe_timeout: Some(Duration::from_secs(30)), // Keyframe wait timeout
};
// 2. Subscribe to the stream (async wait until stream is available)
let (publisher_ref, mut subscriber) = manager.subscribe(stream_path, config).await?;
// 3. Get sequence headers (for initializing decoders or sending to clients)
{
let pub_guard = publisher_ref.read();
if let Some(video_seq) = pub_guard.video_seq_header() {
// Send video sequence header to client...
send_video_header(video_seq).await?;
}
if let Some(audio_seq) = pub_guard.audio_seq_header() {
// Send audio sequence header to client...
send_audio_header(audio_seq).await?;
}
}
// 4. Frame reading loop
loop {
// Check for stop signal
if subscriber.is_stopped() || cancel.is_cancelled() {
break;
}
// Read all available video frames
while let Ok(Some(frame)) = subscriber.read_video() {
let timestamp = frame.timestamp;
let is_key = frame.is_keyframe();
// AVFrame supports zero-copy cached format conversion
let flv_tag = frame.get_flv_video_tag(); // FLV format
let annexb = frame.get_annexb(); // Annex-B format
let avcc = frame.get_avcc(); // AVCC format
// Send to client (choose the format matching the protocol)...
send_video_frame(flv_tag, timestamp).await?;
}
// Read all available audio frames
while let Some(frame) = subscriber.read_audio() {
let timestamp = frame.timestamp;
let flv_tag = frame.get_flv_audio_tag();
send_audio_frame(flv_tag, timestamp).await?;
}
// 5. Wait for new frames (with timeout to maintain responsiveness)
match timeout(Duration::from_millis(10), subscriber.wait_for_frames()).await {
Ok(Ok(())) => continue, // New frames available, continue reading
Ok(Err(_)) => break, // Stream ended
Err(_) => continue, // 10ms timeout, continue loop (check cancel)
}
}
// 6. Cleanup: unsubscribe
manager.unsubscribe(stream_path, subscriber.as_mut());
Ok(())
}

AVFrame has built-in lazy-loaded cached format conversions — the conversion is performed and cached on first access, and all subsequent subscribers share the same converted result (zero-copy):

// Video frame formats
frame.get_flv_video_tag() // → Bytes FLV Video Tag
frame.get_annexb() // → Bytes Annex-B (00 00 00 01 delimited)
frame.get_avcc() // → Bytes AVCC (length-prefixed)
frame.get_raw() // → Raw NALUs
// Video frame properties
frame.is_keyframe() // → bool
frame.video_frame_type // → VideoFrameType
frame.video_codec // → Option<Arc<VideoCodec>>
frame.timestamp // → Duration
// Audio frame formats
frame.get_flv_audio_tag() // → Bytes FLV Audio Tag
frame.get_audio_raw() // → Raw audio data
// Audio frame properties
frame.audio_codec // → Option<AudioCodec>
frame.timestamp // → Duration
pub struct SubscriberConfig {
pub mode: SubscribeMode,
pub buffer_time: Duration,
pub receive_video: bool,
pub receive_audio: bool,
pub keyframe_timeout: Option<Duration>,
}
ModeDescriptionUse Case
RealTimeStart from the latest IDR keyframeLive real-time viewing
BufferStart from the IDR buffer_time agoSlightly delayed but smoother
WaitKeyframeWait for the next arriving keyframePrecise synchronization point

Plugins can subscribe to global stream events to respond to stream creation and destruction:

async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
let manager = self.stream_manager.clone().unwrap();
let mut event_rx = manager.subscribe_events();
tokio::spawn(async move {
loop {
tokio::select! {
biased; // Prioritize checking the cancel signal
_ = cancel.cancelled() => break,
event = event_rx.recv() => {
match event {
Ok(StreamEvent::Created(path)) => {
tracing::info!("New stream created: {}", path);
}
Ok(StreamEvent::Disposed(path)) => {
tracing::info!("Stream disposed: {}", path);
}
Ok(StreamEvent::SubscriberAdded { stream, .. }) => {
tracing::info!("New subscriber joined: {}", stream);
}
Ok(StreamEvent::SubscriberRemoved { stream, .. }) => {
tracing::info!("Subscriber left: {}", stream);
}
Err(_) => break, // Channel closed
}
}
}
}
});
self.state = PluginState::Running;
Ok(())
}

Many plugins need to expose HTTP APIs, such as snapshot services, HLS playback, etc.

#[async_trait]
pub trait HttpHandler: Send + Sync {
async fn handle(&self, req: HttpRequest) -> HttpResponse;
}
pub struct HttpRequest {
pub method: String, // "GET", "POST", etc.
pub path: String, // "/snap/api/live/stream"
pub query: Option<String>, // "format=jpg&quality=85"
pub headers: HashMap<String, String>,
pub body: Bytes,
pub params: HashMap<String, String>, // Route parameters
pub peer_addr: SocketAddr,
}
// Convenience methods
req.body_str() // → Option<&str>
req.parse_json_body::<T>() // → Result<T>
req.query_param("key") // → Option<String>
req.path_param("id") // → Option<String>
// Builder pattern for constructing responses
HttpResponse::ok().json(&serde_json::json!({"code": 0, "data": "..."}))
HttpResponse::ok().text("Hello World")
HttpResponse::ok().content_type("video/mp2t").body(ts_bytes)
HttpResponse::ok().header("Cache-Control", "no-cache").body(data)
HttpResponse::ok().cors_origin("*")
HttpResponse::not_found().text("Not Found")
HttpResponse::bad_request().text("Missing parameter")
HttpResponse::internal_error().text("Internal error")
HttpResponse::new(302).header("Location", "/redirect").body(Bytes::new())

Implementation Example: HTTP Handler with State

Section titled “Implementation Example: HTTP Handler with State”
use sdk::prelude::*;
use std::sync::Arc;
// Handler holds shared state
struct StatusHandler {
stream_manager: Arc<dyn StreamManagerApi>,
config: MyPluginConfig,
}
#[async_trait]
impl HttpHandler for StatusHandler {
async fn handle(&self, req: HttpRequest) -> HttpResponse {
// Parse query parameters
let query: HashMap<String, String> = req.query.as_ref()
.map(|q| url::form_urlencoded::parse(q.as_bytes())
.map(|(k, v)| (k.into_owned(), v.into_owned()))
.collect())
.unwrap_or_default();
// Route dispatch
let parts: Vec<&str> = req.path
.trim_start_matches('/')
.split('/')
.collect();
match (req.method.as_str(), parts.first().copied().unwrap_or("")) {
("GET", "status") => {
let stats = self.stream_manager.stats();
HttpResponse::ok().json(&serde_json::json!({
"code": 0,
"streams": stats.stream_count,
"message": self.config.message,
}))
}
("GET", "streams") => {
let paths = self.stream_manager.stream_paths();
HttpResponse::ok().json(&serde_json::json!({
"code": 0,
"streams": paths,
}))
}
("POST", "action") => {
match req.parse_json_body::<serde_json::Value>() {
Ok(body) => {
// Process request...
HttpResponse::ok().json(&serde_json::json!({
"code": 0,
"message": "done"
}))
}
Err(_) => HttpResponse::bad_request().text("Invalid JSON body"),
}
}
_ => HttpResponse::not_found().json(&serde_json::json!({
"code": 404,
"message": "Not found"
})),
}
}
}

Register routes in the Plugin’s http_routes() method:

impl Plugin for MyPlugin {
fn http_routes(&self) -> Vec<HttpRoute> {
let handler = Arc::new(StatusHandler {
stream_manager: self.stream_manager.clone().unwrap(),
config: self.config.clone(),
});
vec![
HttpRoute {
method: HttpMethod::Get,
path: "/my-plugin/**".to_string(), // Wildcard matching
handler: Box::new(HandlerAdapter { handler: handler.clone() }),
},
HttpRoute {
method: HttpMethod::Post,
path: "/my-plugin/**".to_string(),
handler: Box::new(HandlerAdapter { handler }),
},
]
}
}
// If your Handler is in an Arc, you need an adapter
struct HandlerAdapter {
handler: Arc<StatusHandler>,
}
#[async_trait]
impl HttpHandler for HandlerAdapter {
async fn handle(&self, req: HttpRequest) -> HttpResponse {
self.handler.handle(req).await
}
}

Protocol plugins (such as RTMP, RTSP) typically need to listen on a TCP port. The SDK provides the create_tcp_listener utility function.

use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::net::TcpStream;
use sdk::prelude::*;
pub struct TcpServerPlugin {
state: PluginState,
config: TcpServerConfig,
stream_manager: Option<Arc<dyn StreamManagerApi>>,
task_work: Option<Arc<dyn TaskWork>>,
server_handle: Option<tokio::task::JoinHandle<()>>,
}
#[async_trait]
impl Plugin for TcpServerPlugin {
fn set_task_work(&mut self, work: Arc<dyn TaskWork>) {
self.task_work = Some(work);
}
async fn init(
&mut self,
manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>,
) -> Result<()> {
self.stream_manager = Some(manager);
// Parse listen address...
self.state = PluginState::Initialized;
Ok(())
}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
let addr: SocketAddr = self.config.listen_addr.parse()
.unwrap_or("0.0.0.0:9090".parse().unwrap());
let manager = self.stream_manager.clone().unwrap();
let task_work = self.task_work.clone();
let session_counter = Arc::new(AtomicU64::new(0));
// Use SDK's create_tcp_listener (enables SO_REUSEPORT)
let listener = sdk::create_tcp_listener(addr).map_err(|e| {
MonibucaError::Io(std::io::Error::new(
std::io::ErrorKind::AddrInUse,
format!("Failed to bind {}: {}", addr, e),
))
})?;
let listener_task = async move {
tracing::info!("TCP server listening on {}", addr);
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::info!("TCP server shutting down");
break;
}
result = listener.accept() => {
match result {
Ok((stream, peer_addr)) => {
let session_id = session_counter
.fetch_add(1, Ordering::Relaxed);
let manager = manager.clone();
// Create a child task for each connection (monitorable in engine dashboard)
let (session_cancel, child_task) =
if let Some(ref tw) = task_work {
let child = tw.create_child_task(
&format!("Session:{}", peer_addr)
);
child.start();
(child.cancellation_token(), Some(child))
} else {
(cancel.child_token(), None)
};
tokio::spawn(async move {
if let Err(e) = handle_session(
stream, peer_addr, session_id,
manager, session_cancel,
).await {
tracing::error!(
"Session {} error: {}",
peer_addr, e
);
}
// Session ended, stop child task
if let Some(task) = child_task {
task.stop();
}
});
}
Err(e) => {
tracing::error!("Accept error: {}", e);
}
}
}
}
}
};
// Register the listener task with TaskWork
if let Some(ref tw) = self.task_work {
tw.register_child("TCPListener", Box::pin(listener_task));
} else {
self.server_handle = Some(tokio::spawn(listener_task));
}
self.state = PluginState::Running;
Ok(())
}
async fn stop(&mut self) -> Result<()> {
if let Some(handle) = self.server_handle.take() {
handle.abort();
}
self.state = PluginState::Stopped;
Ok(())
}
// ... other methods
}
async fn handle_session(
stream: TcpStream,
peer_addr: SocketAddr,
session_id: u64,
manager: Arc<dyn StreamManagerApi>,
cancel: CancellationToken,
) -> Result<()> {
tracing::info!("New session {} from {}", session_id, peer_addr);
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::info!("Session {} cancelled", session_id);
}
result = process_connection(stream, manager) => {
if let Err(e) = result {
tracing::warn!("Session {} error: {}", session_id, e);
}
}
}
Ok(())
}
/// Create a TCP listener with SO_REUSEPORT enabled
/// Allows multiple threads/processes to accept on the same port simultaneously, improving concurrency
pub fn create_tcp_listener(addr: SocketAddr) -> std::io::Result<TcpListener>;

For scenarios requiring custom protocol framing, implement TcpSplitter:

/// Split a TCP byte stream into complete protocol messages
pub trait TcpSplitter: Send {
/// Input new data, return all completed messages
fn split(&mut self, data: &[u8]) -> Vec<Vec<u8>>;
/// Return incomplete data currently in the buffer
fn pending(&self) -> &[u8];
/// Clear the buffer
fn clear(&mut self);
}

TaskWork is the engine’s task tree management system, allowing plugin child tasks (TCP connections, recording sessions, etc.) to be monitored and managed in the engine dashboard.

pub trait TaskWork: Send + Sync + Any {
/// Start the root task
fn start(&self);
/// Register a named background task (fire-and-forget)
fn register_child(&self, owner: &str, task: Pin<Box<dyn Future<Output=()> + Send>>);
/// Create a manageable child task handle
fn create_child_task(&self, owner: &str) -> Box<dyn ChildTaskHandle>;
/// Remove a completed child task
fn remove_child_task(&self, child_id: u64);
}
pub trait ChildTaskHandle: Send + Sync + Any {
fn start(&self); // Start the child task
fn stop(&self); // Stop the child task
fn id(&self) -> u64; // Get unique ID
fn cancellation_token(&self) -> CancellationToken; // Get cancellation token
}

Pattern A: register_child — Named Background Task

Suitable for long-running single tasks (e.g., TCP listen loop):

fn set_task_work(&mut self, work: Arc<dyn TaskWork>) {
self.task_work = Some(work);
}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
let task = async move {
// TCP accept loop, event monitoring, etc.
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => break,
// ... work ...
}
}
};
if let Some(ref tw) = self.task_work {
tw.register_child("TCPListener", Box::pin(task));
}
Ok(())
}

Pattern B: create_child_task — Manageable Child Tasks

Suitable for dynamically created multiple child tasks (e.g., each TCP connection, each recording session):

// Create a child task
let child = task_work.create_child_task(&format!("RTMP:{}", peer_addr));
child.start();
let cancel_token = child.cancellation_token();
// Use cancel_token in child task logic to detect shutdown
tokio::select! {
biased;
_ = cancel_token.cancelled() => { /* Stopped by engine */ }
result = do_work() => { /* Completed normally */ }
}
// Clean up after completion
child.stop();
task_work.remove_child_task(child.id());

EngineContext is the engine’s IoC (Inversion of Control) container, based on TypeId capability mapping. Plugins use it to access various optional services provided by the engine.

pub struct MyPlugin {
state: PluginState,
engine_context: Option<EngineContext>,
database: Option<Arc<dyn DatabaseApi>>,
service_registry: Option<Arc<dyn ServiceRegistry>>,
}
impl Plugin for MyPlugin {
fn set_engine_context(&mut self, ctx: EngineContext) {
// Called before init(), safe to store
if let Some(db) = ctx.database_arc() {
self.database = Some(db);
}
if let Some(reg) = ctx.service_registry() {
self.service_registry = Some(reg);
}
self.engine_context = Some(ctx);
}
}
CapabilityGetter MethodCheck MethodDescription
StreamManagerApiinit() parameterAlways available, stream management (publish/subscribe/query)
DatabaseApictx.database_arc()ctx.has_database()Database operations
TransformApictx.transform()ctx.has_transform()Stream transformation (transcoding, etc.)
PlaybackApictx.playback()ctx.has_playback()VOD playback
ServiceRegistryctx.service_registry()ctx.has_service_registry()Inter-plugin service discovery
RoomApictx.room_api()ctx.has_room_api()Room service
ProxyManagerctx.proxy_manager()ctx.has_proxy_manager()Proxy management
CollectionApictx.collection()ctx.has_collection()Data collection operations
ReportingApictx.reporting()ctx.has_reporting()Data reporting
UtilityProviderctx.utility_provider()ctx.has_utility_provider()Utility functions
HttpRegistryctx.http_registry()ctx.has_http_registry()HTTP route registration

In addition to predefined capabilities, you can access custom capabilities via the generic API:

// Get any type of capability
if let Some(my_service) = ctx.get::<Arc<dyn MyCustomApi>>() {
// Use custom capability...
}
// Check if a capability exists
if ctx.has::<Arc<dyn MyCustomApi>>() {
// ...
}

ServiceRegistry allows plugins to register and discover protocol factory services. It is the core mechanism for implementing cross-protocol interoperability for pulling and pushing streams.

When the engine needs to pull a stream from a remote source, it looks up the corresponding protocol’s PullerFactory via ServiceRegistry:

use sdk::prelude::*;
use sdk::services::{PullerFactory, StreamPuller};
pub struct MyPullerFactory;
#[async_trait]
impl PullerFactory for MyPullerFactory {
fn protocol(&self) -> &str {
"myproto" // Protocol identifier
}
fn supports_url(&self, url: &str) -> bool {
url.starts_with("myproto://")
}
async fn create_puller(
&self,
url: &str,
stream_path: &str,
config: serde_json::Value,
manager: Arc<dyn StreamManagerApi>,
cancel_token: CancellationToken,
) -> Result<Arc<tokio::sync::RwLock<Box<dyn StreamPuller>>>> {
let puller = MyPuller::new(url, stream_path, manager, cancel_token);
Ok(Arc::new(tokio::sync::RwLock::new(Box::new(puller))))
}
}

Register in init() or start():

async fn init(&mut self, manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>) -> Result<()> {
// Get ServiceRegistry via EngineContext
if let Some(ref ctx) = self.engine_context {
if let Some(registry) = ctx.service_registry() {
// Register pull stream factory
registry.register_puller_factory(Arc::new(MyPullerFactory));
// Register push stream factory
registry.register_pusher_factory(Arc::new(MyPusherFactory));
}
}
Ok(())
}

Using Factories (Cross-Plugin Collaboration)

Section titled “Using Factories (Cross-Plugin Collaboration)”
// Find a pull stream factory that supports the given URL
if let Some(factory) = registry.get_puller_factory_for_url("rtmp://example.com/live/test") {
let puller = factory.create_puller(
"rtmp://example.com/live/test",
"live/test",
serde_json::json!({}),
manager.clone(),
cancel.clone(),
).await?;
// Start pulling
puller.write().await.start().await?;
}
// List all registered pull protocols
let protocols = registry.list_puller_protocols(); // ["rtmp", "rtsp", "srt", ...]

Export C ABI symbols via the export_plugin! macro to support dynamic loading mode:

lib.rs
sdk::export_plugin!(MyPlugin);

This macro generates three no_mangle extern "C" functions:

SymbolDescription
plugin_api_version() -> u32ABI version number for compatibility checking
plugin_create() -> PluginHandleCreates a plugin instance (calls Default::default())
plugin_metadata() -> PluginMetadataReturns plugin name, version, description (for discovery)
Terminal window
# Add to Cargo.toml
[lib]
crate-type = ["cdylib"]
# Build
cargo build --release
# Output path
# Linux: target/release/libplugin_my_plugin.so
# macOS: target/release/libplugin_my_plugin.dylib
# Windows: target/release/plugin_my_plugin.dll
# Deploy: copy to Monibuca plugins directory
cp target/release/libplugin_my_plugin.so /path/to/monibuca/plugins/
target/wasm32-wasi/release/plugin_my_plugin.wasm
cargo build --target wasm32-wasi --release

The SDK provides a unified error type MonibucaError and Result<T>:

use sdk::prelude::*;
// Result<T> = std::result::Result<T, MonibucaError>
// Common error constructors
MonibucaError::Plugin("my error message".to_string()) // Plugin custom error
MonibucaError::Io(io_error) // IO error
MonibucaError::Config("invalid config".to_string()) // Configuration error
// Usage in plugins
async fn init(&mut self, ...) -> Result<()> {
let config = load_config().map_err(|e|
MonibucaError::Plugin(format!("Failed to load config: {}", e))
)?;
Ok(())
}

Example 1: Snapshot Plugin (HTTP + Subscribe)

Section titled “Example 1: Snapshot Plugin (HTTP + Subscribe)”

This example demonstrates a practical snapshot plugin — capturing a frame from a specified stream via an HTTP request:

use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use sdk::prelude::*;
pub struct SnapPlugin {
state: PluginState,
stream_manager: Option<Arc<dyn StreamManagerApi>>,
}
impl Default for SnapPlugin {
fn default() -> Self {
Self { state: PluginState::Created, stream_manager: None }
}
}
// HTTP Handler: receive snapshot requests
struct SnapHandler {
stream_manager: Arc<dyn StreamManagerApi>,
}
#[async_trait]
impl HttpHandler for SnapHandler {
async fn handle(&self, req: HttpRequest) -> HttpResponse {
// Extract stream name from path: /snap/live/camera → live/camera
let stream_path = req.path.trim_start_matches('/');
if !self.stream_manager.has_stream(stream_path) {
return HttpResponse::not_found().json(&serde_json::json!({
"error": "Stream not found"
}));
}
// Subscribe to stream, receive video only, wait for one keyframe
let config = SubscriberConfig {
mode: SubscribeMode::WaitKeyframe,
receive_video: true,
receive_audio: false,
keyframe_timeout: Some(Duration::from_secs(5)),
..Default::default()
};
match self.stream_manager.subscribe(stream_path, config).await {
Ok((_pub_ref, mut subscriber)) => {
// Wait for the first keyframe
if subscriber.wait_for_frames().await.is_ok() {
if let Ok(Some(frame)) = subscriber.read_video() {
if frame.is_keyframe() {
let raw_data = frame.get_raw();
// Can further decode to an image here...
return HttpResponse::ok()
.content_type("application/octet-stream")
.body(raw_data);
}
}
}
self.stream_manager.unsubscribe(stream_path, subscriber.as_mut());
HttpResponse::internal_error().text("Failed to capture frame")
}
Err(e) => {
HttpResponse::internal_error().text(&format!("Subscribe failed: {}", e))
}
}
}
}
#[async_trait]
impl Plugin for SnapPlugin {
fn info(&self) -> PluginInfo {
PluginInfo {
name: "snap", version: "1.0.0",
description: "Video snapshot plugin", author: "Monibuca Team",
}
}
fn state(&self) -> PluginState { self.state }
async fn init(&mut self, manager: Arc<dyn StreamManagerApi>,
_config: Arc<dyn ConfigProvider>) -> Result<()> {
self.stream_manager = Some(manager);
self.state = PluginState::Initialized;
Ok(())
}
async fn start(&mut self, _cancel: CancellationToken) -> Result<()> {
self.state = PluginState::Running;
Ok(())
}
async fn stop(&mut self) -> Result<()> {
self.state = PluginState::Stopped;
Ok(())
}
fn http_routes(&self) -> Vec<HttpRoute> {
if let Some(ref sm) = self.stream_manager {
vec![HttpRoute {
method: HttpMethod::Get,
path: "/snap/**".to_string(),
handler: Box::new(SnapHandler {
stream_manager: sm.clone(),
}),
}]
} else {
Vec::new()
}
}
fn as_any(&self) -> &dyn Any { self }
fn as_any_mut(&mut self) -> &mut dyn Any { self }
}

Example 2: Stream Relay Plugin (Publish + Subscribe Linked)

Section titled “Example 2: Stream Relay Plugin (Publish + Subscribe Linked)”

Relay a stream to another path (e.g., adding a watermark or changing the path):

async fn relay_stream(
manager: Arc<dyn StreamManagerApi>,
source_path: &str,
target_path: &str,
cancel: CancellationToken,
) -> Result<()> {
// 1. Subscribe to the source stream
let config = SubscriberConfig::default();
let (source_pub, mut subscriber) = manager.subscribe(source_path, config).await?;
// 2. Create the target Publisher
let target_pub = manager.create_publisher_for_plugin(target_path, "relay")?;
// 3. Copy codec configuration
{
let source = source_pub.read();
let mut target = target_pub.write();
target.init_video_track();
target.init_audio_track();
if let Some(codec) = source.audio_codec() {
target.set_audio_codec(codec);
}
if let Some(seq) = source.video_seq_header() {
target.set_video_seq_header(seq);
}
if let Some(seq) = source.audio_seq_header() {
target.set_audio_seq_header(seq);
}
}
// 4. Relay frame data
loop {
if subscriber.is_stopped() || cancel.is_cancelled() {
break;
}
while let Ok(Some(frame)) = subscriber.read_video() {
let _ = target_pub.read().write_video(
frame.video_frame_type,
frame.timestamp, // pts
frame.timestamp, // dts
frame.get_avcc(),
);
}
while let Some(frame) = subscriber.read_audio() {
let _ = target_pub.read().write_audio(
frame.timestamp,
frame.get_audio_raw(),
);
}
match tokio::time::timeout(
Duration::from_millis(10),
subscriber.wait_for_frames(),
).await {
Ok(Ok(())) => continue,
Ok(Err(_)) => break,
Err(_) => continue,
}
}
// 5. Cleanup
manager.unsubscribe(source_path, subscriber.as_mut());
manager.dispose_stream(target_path);
Ok(())
}

Only perform configuration loading and dependency injection in init(). Time-consuming operations (network listening, file I/O) should go in start():

// ✅ Correct
async fn init(&mut self, manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>) -> Result<()> {
self.config = load_config(config)?; // Lightweight
self.stream_manager = Some(manager); // Store reference
self.state = PluginState::Initialized;
Ok(())
}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
let listener = sdk::create_tcp_listener(self.config.addr)?; // Time-consuming operation
// Start accept loop...
Ok(())
}
// ❌ Wrong: network operations in init
async fn init(&mut self, ...) -> Result<()> {
let listener = sdk::create_tcp_listener(addr)?; // Should not be here
Ok(())
}

Always check the cancel signal in async loops, using biased to ensure shutdown is prioritized:

loop {
tokio::select! {
biased; // ← Prioritize checking the first branch
_ = cancel.cancelled() => {
tracing::info!("Shutting down gracefully");
break;
}
result = do_work() => {
// Handle result...
}
}
}

Properly clean up all resources in stop():

async fn stop(&mut self) -> Result<()> {
// 1. Stop background tasks
if let Some(handle) = self.server_handle.take() {
handle.abort();
}
// 2. Close network connections
// 3. Wait for in-progress operations to complete
// 4. Release resources
self.state = PluginState::Stopped;
tracing::info!("Plugin stopped");
Ok(())
}

It is also recommended to implement Drop for sessions holding resources:

impl Drop for MySession {
fn drop(&mut self) {
self.cleanup(); // Ensure cleanup even on panic
}
}

Use tracing for contextual structured logging:

use tracing::{info, warn, error, debug, instrument};
#[instrument(skip(stream), fields(session_id = %session_id))]
async fn handle_session(stream: TcpStream, session_id: u64) {
info!("Session started");
// All subsequent logs automatically include the session_id field
if let Err(e) = process(stream).await {
error!(error = %e, "Session error");
}
info!("Session ended");
}

Avoid unwrap() and panic! in plugin code; use Result to propagate errors:

// ✅ Correct
let addr: SocketAddr = config.addr.parse().map_err(|e|
MonibucaError::Config(format!("Invalid address: {}", e))
)?;
// ❌ Avoid
let addr: SocketAddr = config.addr.parse().unwrap();

Register child tasks through TaskWork so the engine can monitor and manage them uniformly:

// ✅ Use TaskWork
if let Some(ref tw) = self.task_work {
tw.register_child("TCPListener", Box::pin(listener_task));
}
// Weaker alternative (not managed by the engine)
tokio::spawn(listener_task);

For configurations that need runtime adjustment, implement on_config_update():

fn on_config_update(&mut self, config: &serde_json::Value) -> Result<()> {
if let Ok(new_config) = serde_json::from_value::<MyConfig>(config.clone()) {
// Only update fields that can be safely hot-reloaded
self.config.max_connections = new_config.max_connections;
self.config.timeout = new_config.timeout;
// Note: fields like port that require restart should not be hot-reloaded
tracing::info!("Config updated");
}
Ok(())
}

The recommended directory structure for a fully-featured plugin:

plugins/my-plugin/
├── Cargo.toml
├── src/
│ ├── lib.rs # Module exports + export_plugin!
│ ├── plugin.rs # Plugin trait implementation (lifecycle)
│ ├── config.rs # ConfigSchema configuration definition
│ ├── handler.rs # HTTP Handler (if HTTP API is needed)
│ ├── session.rs # Connection session logic (if TCP service)
│ ├── factory.rs # PullerFactory/PusherFactory (if protocol plugin)
│ └── error.rs # Plugin custom error types (optional)
└── tests/
└── integration.rs # Integration tests (optional)