Plugin Development in Practice
This page walks you through three complete plugin examples of increasing complexity to help you practice the core development patterns of the Monibuca SDK. Each example can be compiled and run directly.
Example 1: Stream Monitor Plugin
Section titled “Example 1: Stream Monitor Plugin”A highly practical beginner-level plugin — monitors the creation and destruction of all streams and provides real-time statistics via an HTTP API.
Project Structure
Section titled “Project Structure”plugins/monitor/├── Cargo.toml└── src/ ├── lib.rs ├── config.rs └── plugin.rsCargo.toml
Section titled “Cargo.toml”[package]name = "plugin-monitor"version = "0.1.0"edition = "2024"
[dependencies]sdk = { path = "../../crates/monibuca-sdk" }async-trait = "0.1"serde = { version = "1.0", features = ["derive"] }serde_json = "1.0"tokio = { version = "1", features = ["full"] }tokio-util = "0.7"tracing = "0.1"
[lib]crate-type = ["cdylib", "rlib"]src/lib.rs
Section titled “src/lib.rs”mod config;mod plugin;
pub use plugin::MonitorPlugin;
sdk::export_plugin!(MonitorPlugin);src/config.rs
Section titled “src/config.rs”use sdk::ConfigSchema;use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]#[schema( plugin = "monitor", title = "Stream Monitor", description = "Real-time stream monitoring and statistics", sdk_path = "sdk")]pub struct MonitorConfig { #[serde(default = "default_enable")] #[schema(label = "Enable", desc = "Whether to enable the monitor plugin")] pub enable: bool,
#[serde(default = "default_history_size")] #[schema(label = "History Size", desc = "Number of recent event records to retain", min = 10, max = 10000)] pub history_size: usize,}
fn default_enable() -> bool { true }fn default_history_size() -> usize { 100 }
impl Default for MonitorConfig { fn default() -> Self { Self { enable: default_enable(), history_size: default_history_size(), } }}src/plugin.rs
Section titled “src/plugin.rs”use std::any::Any;use std::collections::VecDeque;use std::sync::Arc;use std::sync::atomic::{AtomicU64, Ordering};use async_trait::async_trait;use tokio_util::sync::CancellationToken;use sdk::prelude::*;use parking_lot::Mutex;
use crate::config::MonitorConfig;
/// Stream event record#[derive(Debug, Clone, serde::Serialize)]struct EventRecord { timestamp: String, event_type: String, stream_path: String,}
/// Shared statistics datastruct MonitorStats { total_created: AtomicU64, total_disposed: AtomicU64, total_subscribers: AtomicU64, event_history: Mutex<VecDeque<EventRecord>>, history_size: usize,}
impl MonitorStats { fn new(history_size: usize) -> Self { Self { total_created: AtomicU64::new(0), total_disposed: AtomicU64::new(0), total_subscribers: AtomicU64::new(0), event_history: Mutex::new(VecDeque::with_capacity(history_size)), history_size, } }
fn record_event(&self, event_type: &str, stream_path: &str) { let record = EventRecord { timestamp: chrono::Utc::now().to_rfc3339(), event_type: event_type.to_string(), stream_path: stream_path.to_string(), }; let mut history = self.event_history.lock(); if history.len() >= self.history_size { history.pop_front(); } history.push_back(record); }}
pub struct MonitorPlugin { state: PluginState, config: MonitorConfig, stream_manager: Option<Arc<dyn StreamManagerApi>>, stats: Option<Arc<MonitorStats>>,}
impl Default for MonitorPlugin { fn default() -> Self { Self { state: PluginState::Created, config: MonitorConfig::default(), stream_manager: None, stats: None, } }}
#[async_trait]impl Plugin for MonitorPlugin { fn info(&self) -> PluginInfo { PluginInfo { name: "monitor", version: "1.0.0", description: "Real-time stream monitoring", author: "Monibuca Team", } }
fn state(&self) -> PluginState { self.state }
async fn init( &mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>, ) -> Result<()> { // Load configuration self.config = config .get_plugin_config("monitor") .and_then(|v| serde_json::from_value(v).ok()) .unwrap_or_default();
if !self.config.enable { self.state = PluginState::Disabled; return Ok(()); }
self.stream_manager = Some(manager); self.stats = Some(Arc::new(MonitorStats::new(self.config.history_size))); self.state = PluginState::Initialized;
tracing::info!("Monitor plugin initialized, history_size={}", self.config.history_size); Ok(()) }
async fn start(&mut self, cancel: CancellationToken) -> Result<()> { if self.state == PluginState::Disabled { return Ok(()); }
let manager = self.stream_manager.clone().unwrap(); let stats = self.stats.clone().unwrap();
// Start event listener coroutine let mut event_rx = manager.subscribe_events(); tokio::spawn(async move { loop { tokio::select! { biased; _ = cancel.cancelled() => { tracing::info!("Monitor event listener stopped"); break; } event = event_rx.recv() => { match event { Ok(StreamEvent::Created(path)) => { stats.total_created.fetch_add(1, Ordering::Relaxed); stats.record_event("created", &path); tracing::info!("[Monitor] Stream created: {}", path); } Ok(StreamEvent::Disposed(path)) => { stats.total_disposed.fetch_add(1, Ordering::Relaxed); stats.record_event("disposed", &path); tracing::info!("[Monitor] Stream disposed: {}", path); } Ok(StreamEvent::SubscriberAdded { stream, .. }) => { stats.total_subscribers.fetch_add(1, Ordering::Relaxed); stats.record_event("subscriber_added", &stream); } Ok(StreamEvent::SubscriberRemoved { stream, .. }) => { stats.record_event("subscriber_removed", &stream); } Err(_) => break, } } } } });
self.state = PluginState::Running; tracing::info!("Monitor plugin started"); Ok(()) }
async fn stop(&mut self) -> Result<()> { self.state = PluginState::Stopped; Ok(()) }
/// Register HTTP API fn http_routes(&self) -> Vec<HttpRoute> { if let (Some(sm), Some(stats)) = (&self.stream_manager, &self.stats) { let handler = MonitorHandler { stream_manager: sm.clone(), stats: stats.clone(), }; vec![ HttpRoute { method: HttpMethod::Get, path: "/monitor/**".to_string(), handler: Box::new(handler), }, ] } else { Vec::new() } }
fn as_any(&self) -> &dyn Any { self } fn as_any_mut(&mut self) -> &mut dyn Any { self }}
/// HTTP API Handlerstruct MonitorHandler { stream_manager: Arc<dyn StreamManagerApi>, stats: Arc<MonitorStats>,}
#[async_trait]impl HttpHandler for MonitorHandler { async fn handle(&self, req: HttpRequest) -> HttpResponse { let path = req.path.trim_start_matches('/'); let parts: Vec<&str> = path.split('/').collect(); let action = parts.first().copied().unwrap_or("summary");
match action { // GET /monitor/summary — Get statistics summary "summary" => { let active_streams = self.stream_manager.stream_paths(); HttpResponse::ok().json(&serde_json::json!({ "code": 0, "data": { "active_streams": active_streams.len(), "stream_list": active_streams, "total_created": self.stats.total_created .load(Ordering::Relaxed), "total_disposed": self.stats.total_disposed .load(Ordering::Relaxed), "total_subscribers": self.stats.total_subscribers .load(Ordering::Relaxed), } })) }
// GET /monitor/events — Get recent event records "events" => { let history = self.stats.event_history.lock(); let events: Vec<_> = history.iter().cloned().collect(); HttpResponse::ok().json(&serde_json::json!({ "code": 0, "count": events.len(), "events": events, })) }
_ => HttpResponse::not_found().json(&serde_json::json!({ "code": 404, "message": "Unknown endpoint. Try /monitor/summary or /monitor/events" })), } }}Testing
Section titled “Testing”# After starting Monibuca, access the APIcurl http://localhost:8180/monitor/summary# {"code":0,"data":{"active_streams":2,"stream_list":["live/test","live/camera"],...}}
curl http://localhost:8180/monitor/events# {"code":0,"count":5,"events":[{"timestamp":"...","event_type":"created","stream_path":"live/test"},...]}Example 2: Simple TCP Protocol Server Plugin
Section titled “Example 2: Simple TCP Protocol Server Plugin”A complete TCP server plugin implementing a simple text protocol — clients send commands via TCP to publish or subscribe to streams.
Protocol Design
Section titled “Protocol Design”Simple text protocol, one command per line:
PUBLISH <stream_path> → Start publishing a streamSUBSCRIBE <stream_path> → Start subscribing to a streamDATA <hex_bytes> → Send a data frame (publish mode)STOP → Stop current operationCore Implementation
Section titled “Core Implementation”use std::any::Any;use std::net::SocketAddr;use std::sync::Arc;use std::sync::atomic::{AtomicU64, Ordering};use std::time::Duration;use async_trait::async_trait;use bytes::Bytes;use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};use tokio::net::TcpStream;use tokio_util::sync::CancellationToken;use sdk::prelude::*;
pub struct SimpleTcpPlugin { state: PluginState, listen_addr: SocketAddr, stream_manager: Option<Arc<dyn StreamManagerApi>>, task_work: Option<Arc<dyn TaskWork>>,}
impl Default for SimpleTcpPlugin { fn default() -> Self { Self { state: PluginState::Created, listen_addr: "0.0.0.0:9090".parse().unwrap(), stream_manager: None, task_work: None, } }}
#[async_trait]impl Plugin for SimpleTcpPlugin { fn info(&self) -> PluginInfo { PluginInfo { name: "simple-tcp", version: "0.1.0", description: "Simple TCP protocol plugin", author: "Tutorial", } }
fn state(&self) -> PluginState { self.state }
// Store TaskWork reference fn set_task_work(&mut self, work: Arc<dyn TaskWork>) { self.task_work = Some(work); }
async fn init( &mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>, ) -> Result<()> { self.stream_manager = Some(manager);
// Parse listen address if let Some(cfg) = config.get_plugin_config("simple-tcp") { if let Some(addr) = cfg.get("listen_addr").and_then(|v| v.as_str()) { let addr_str = if addr.starts_with(':') { format!("0.0.0.0{}", addr) } else { addr.to_string() }; self.listen_addr = addr_str.parse().unwrap_or(self.listen_addr); } }
self.state = PluginState::Initialized; Ok(()) }
async fn start(&mut self, cancel: CancellationToken) -> Result<()> { let addr = self.listen_addr; let manager = self.stream_manager.clone().unwrap(); let task_work = self.task_work.clone(); let session_counter = Arc::new(AtomicU64::new(0));
// Create TCP listener let listener = sdk::create_tcp_listener(addr).map_err(|e| { MonibucaError::Io(std::io::Error::new( std::io::ErrorKind::AddrInUse, format!("Failed to bind {}: {}", addr, e), )) })?;
tracing::info!("Simple TCP server listening on {}", addr);
// TCP accept main loop let accept_loop = async move { loop { tokio::select! { biased; _ = cancel.cancelled() => { tracing::info!("TCP server shutting down"); break; } result = listener.accept() => { match result { Ok((stream, peer_addr)) => { let session_id = session_counter .fetch_add(1, Ordering::Relaxed); let manager = manager.clone(); let tw = task_work.clone();
// Create a child task for each connection let (session_cancel, child) = if let Some(ref tw) = tw { let child = tw.create_child_task( &format!("TCP:{}", peer_addr) ); child.start(); let token = child.cancellation_token(); (token, Some(child)) } else { (cancel.child_token(), None) };
tokio::spawn(async move { tracing::info!( "Session {} from {}", session_id, peer_addr );
if let Err(e) = handle_session( stream, session_id, manager, session_cancel, ).await { tracing::warn!( "Session {} error: {}", session_id, e ); }
// Clean up child task if let (Some(child), Some(tw)) = (child, tw) { child.stop(); tw.remove_child_task(child.id()); }
tracing::info!( "Session {} ended", session_id ); }); } Err(e) => { tracing::error!("Accept error: {}", e); } } } } } };
// Register with TaskWork if let Some(ref tw) = self.task_work { tw.register_child("TCPListener", Box::pin(accept_loop)); } else { tokio::spawn(accept_loop); }
self.state = PluginState::Running; Ok(()) }
async fn stop(&mut self) -> Result<()> { self.state = PluginState::Stopped; tracing::info!("Simple TCP plugin stopped"); Ok(()) }
fn as_any(&self) -> &dyn Any { self } fn as_any_mut(&mut self) -> &mut dyn Any { self }}
/// Handle a single TCP sessionasync fn handle_session( stream: TcpStream, session_id: u64, manager: Arc<dyn StreamManagerApi>, cancel: CancellationToken,) -> Result<()> { let (reader, mut writer) = stream.into_split(); let mut lines = BufReader::new(reader).lines();
writer.write_all(b"WELCOME simple-tcp/1.0\n").await .map_err(|e| MonibucaError::Io(e))?;
loop { tokio::select! { biased; _ = cancel.cancelled() => { writer.write_all(b"BYE server shutting down\n").await.ok(); break; } line = lines.next_line() => { match line { Ok(Some(line)) => { let parts: Vec<&str> = line.trim().splitn(2, ' ').collect(); let cmd = parts[0].to_uppercase(); let arg = parts.get(1).copied().unwrap_or("");
match cmd.as_str() { "PUBLISH" => { if arg.is_empty() { writer.write_all( b"ERR missing stream path\n" ).await.ok(); continue; } match do_publish( &manager, arg, &mut lines, &mut writer, &cancel, ).await { Ok(_) => { writer.write_all( b"OK publish ended\n" ).await.ok(); } Err(e) => { let msg = format!( "ERR publish failed: {}\n", e ); writer.write_all( msg.as_bytes() ).await.ok(); } } } "SUBSCRIBE" => { if arg.is_empty() { writer.write_all( b"ERR missing stream path\n" ).await.ok(); continue; } match do_subscribe( &manager, arg, &mut writer, &cancel, ).await { Ok(_) => { writer.write_all( b"OK subscribe ended\n" ).await.ok(); } Err(e) => { let msg = format!( "ERR subscribe failed: {}\n", e ); writer.write_all( msg.as_bytes() ).await.ok(); } } } "QUIT" => break, _ => { writer.write_all( b"ERR unknown command\n" ).await.ok(); } } } Ok(None) => break, // Connection closed Err(e) => { tracing::warn!("Read error: {}", e); break; } } } } }
Ok(())}
/// Publish stream logicasync fn do_publish( manager: &Arc<dyn StreamManagerApi>, stream_path: &str, lines: &mut tokio::io::Lines<BufReader<tokio::net::tcp::OwnedReadHalf>>, writer: &mut tokio::net::tcp::OwnedWriteHalf, cancel: &CancellationToken,) -> Result<()> { // Create Publisher let publisher = manager.create_publisher_for_plugin(stream_path, "simple-tcp")?; { let mut pub_guard = publisher.write(); pub_guard.init_video_track(); pub_guard.init_audio_track(); }
writer.write_all( format!("OK publishing to {}\n", stream_path).as_bytes() ).await.map_err(|e| MonibucaError::Io(e))?;
let mut frame_count: u64 = 0;
loop { tokio::select! { biased; _ = cancel.cancelled() => break, line = lines.next_line() => { match line { Ok(Some(line)) => { let line = line.trim(); if line == "STOP" { break; } if let Some(hex_data) = line.strip_prefix("DATA ") { // Simple example: write hex data as video frames if let Ok(data) = hex::decode(hex_data) { let ts = Duration::from_millis(frame_count * 33); let frame_type = if frame_count % 30 == 0 { VideoFrameType::Keyframe } else { VideoFrameType::Interframe }; let _ = publisher.read().write_video( frame_type, ts, ts, Bytes::from(data), ); frame_count += 1; } } } Ok(None) => break, Err(_) => break, } } } }
// Cleanup manager.dispose_stream(stream_path); Ok(())}
/// Subscribe stream logicasync fn do_subscribe( manager: &Arc<dyn StreamManagerApi>, stream_path: &str, writer: &mut tokio::net::tcp::OwnedWriteHalf, cancel: &CancellationToken,) -> Result<()> { let config = SubscriberConfig { mode: SubscribeMode::RealTime, receive_video: true, receive_audio: true, keyframe_timeout: Some(Duration::from_secs(10)), ..Default::default() };
let (_pub_ref, mut subscriber) = manager.subscribe(stream_path, config).await?;
writer.write_all( format!("OK subscribed to {}\n", stream_path).as_bytes() ).await.map_err(|e| MonibucaError::Io(e))?;
loop { if subscriber.is_stopped() || cancel.is_cancelled() { break; }
// Read and forward video frames while let Ok(Some(frame)) = subscriber.read_video() { let info = format!( "FRAME video ts={} keyframe={} size={}\n", frame.timestamp.as_millis(), frame.is_keyframe(), frame.get_raw().len(), ); if writer.write_all(info.as_bytes()).await.is_err() { manager.unsubscribe(stream_path, subscriber.as_mut()); return Ok(()); } }
// Read and forward audio frames while let Some(frame) = subscriber.read_audio() { let info = format!( "FRAME audio ts={} size={}\n", frame.timestamp.as_millis(), frame.get_audio_raw().len(), ); if writer.write_all(info.as_bytes()).await.is_err() { manager.unsubscribe(stream_path, subscriber.as_mut()); return Ok(()); } }
// Wait for new frames match tokio::time::timeout( Duration::from_millis(100), subscriber.wait_for_frames(), ).await { Ok(Ok(())) => continue, Ok(Err(_)) => break, Err(_) => continue, } }
manager.unsubscribe(stream_path, subscriber.as_mut()); Ok(())}Example 3: Protocol Pull Stream Factory Plugin
Section titled “Example 3: Protocol Pull Stream Factory Plugin”Implement a PullerFactory so that other plugins (such as the cluster plugin) can pull streams from remote sources via your protocol.
use std::any::Any;use std::sync::Arc;use std::time::Duration;use async_trait::async_trait;use tokio_util::sync::CancellationToken;use sdk::prelude::*;use sdk::services::{PullerFactory, StreamPuller};
// ════════════════════════════════════════════════════════════════// Plugin Main Body// ════════════════════════════════════════════════════════════════
pub struct MyProtocolPlugin { state: PluginState, stream_manager: Option<Arc<dyn StreamManagerApi>>, engine_context: Option<EngineContext>,}
impl Default for MyProtocolPlugin { fn default() -> Self { Self { state: PluginState::Created, stream_manager: None, engine_context: None, } }}
#[async_trait]impl Plugin for MyProtocolPlugin { fn info(&self) -> PluginInfo { PluginInfo { name: "myproto", version: "0.1.0", description: "My custom protocol plugin", author: "Tutorial", } }
fn state(&self) -> PluginState { self.state }
// Receive EngineContext, obtain ServiceRegistry fn set_engine_context(&mut self, ctx: EngineContext) { self.engine_context = Some(ctx); }
async fn init( &mut self, manager: Arc<dyn StreamManagerApi>, _config: Arc<dyn ConfigProvider>, ) -> Result<()> { self.stream_manager = Some(manager.clone());
// Register pull stream factory with ServiceRegistry if let Some(ref ctx) = self.engine_context { if let Some(registry) = ctx.service_registry() { registry.register_puller_factory( Arc::new(MyProtoPullerFactory { default_manager: manager, }) ); tracing::info!("Registered myproto:// puller factory"); } }
self.state = PluginState::Initialized; Ok(()) }
async fn start(&mut self, _cancel: CancellationToken) -> Result<()> { self.state = PluginState::Running; Ok(()) }
async fn stop(&mut self) -> Result<()> { self.state = PluginState::Stopped; Ok(()) }
fn as_any(&self) -> &dyn Any { self } fn as_any_mut(&mut self) -> &mut dyn Any { self }}
// ════════════════════════════════════════════════════════════════// PullerFactory Implementation// ════════════════════════════════════════════════════════════════
struct MyProtoPullerFactory { default_manager: Arc<dyn StreamManagerApi>,}
#[async_trait]impl PullerFactory for MyProtoPullerFactory { fn protocol(&self) -> &str { "myproto" }
fn supports_url(&self, url: &str) -> bool { url.starts_with("myproto://") }
async fn create_puller( &self, url: &str, stream_path: &str, config: serde_json::Value, manager: Arc<dyn StreamManagerApi>, cancel_token: CancellationToken, ) -> Result<Arc<tokio::sync::RwLock<Box<dyn StreamPuller>>>> { // Parse URL let remote_addr = url.strip_prefix("myproto://") .ok_or_else(|| MonibucaError::Plugin("Invalid URL".to_string()))?;
// Read optional parameters from JSON config let timeout_ms = config.get("timeout_ms") .and_then(|v| v.as_u64()) .unwrap_or(5000);
let puller = MyProtoPuller { remote_addr: remote_addr.to_string(), stream_path: stream_path.to_string(), manager, cancel_token, timeout: Duration::from_millis(timeout_ms), running: false, };
Ok(Arc::new(tokio::sync::RwLock::new( Box::new(puller) as Box<dyn StreamPuller> ))) }}
// ════════════════════════════════════════════════════════════════// StreamPuller Implementation// ════════════════════════════════════════════════════════════════
struct MyProtoPuller { remote_addr: String, stream_path: String, manager: Arc<dyn StreamManagerApi>, cancel_token: CancellationToken, timeout: Duration, running: bool,}
#[async_trait]impl StreamPuller for MyProtoPuller { async fn start(&mut self) -> Result<()> { self.running = true; tracing::info!( "Pulling from myproto://{} to {}", self.remote_addr, self.stream_path );
// 1. Connect to remote server let stream = tokio::time::timeout( self.timeout, tokio::net::TcpStream::connect(&self.remote_addr), ).await .map_err(|_| MonibucaError::Plugin("Connect timeout".to_string()))? .map_err(|e| MonibucaError::Io(e))?;
// 2. Create local Publisher let publisher = self.manager.create_publisher_for_plugin( &self.stream_path, "myproto" )?; { let mut pub_guard = publisher.write(); pub_guard.init_video_track(); pub_guard.init_audio_track(); }
// 3. Read data from remote and write to Publisher let cancel = self.cancel_token.clone(); let (reader, _writer) = stream.into_split(); let mut buf_reader = tokio::io::BufReader::new(reader);
loop { tokio::select! { biased; _ = cancel.cancelled() => break, // This should be your custom protocol parsing logic // Pseudocode for illustration: result = read_protocol_frame(&mut buf_reader) => { match result { Ok(frame) => { publisher.read().write_video( frame.frame_type, frame.pts, frame.dts, frame.data, )?; } Err(_) => break, } } } }
// 4. Cleanup self.manager.dispose_stream(&self.stream_path); self.running = false; Ok(()) }
fn stop(&self) -> Result<()> { self.cancel_token.cancel(); Ok(()) }
fn stream_path(&self) -> &str { &self.stream_path }
fn is_running(&self) -> bool { self.running }}How Other Plugins Use Your Pull Stream Factory
Section titled “How Other Plugins Use Your Pull Stream Factory”// In any other plugin (e.g., cluster plugin):if let Some(registry) = engine_context.service_registry() { // Automatically match factory by URL if let Some(factory) = registry.get_puller_factory_for_url("myproto://192.168.1.100:9090") { let puller = factory.create_puller( "myproto://192.168.1.100:9090/live/test", "live/remote-test", serde_json::json!({"timeout_ms": 3000}), stream_manager.clone(), cancel_token.clone(), ).await?;
// Start pulling puller.write().await.start().await?; }}Summary
Section titled “Summary”Through these three examples, you have mastered the core patterns of Monibuca plugin development:
| Pattern | Example | Key Technologies |
|---|---|---|
| HTTP API Plugin | Stream Monitor | http_routes(), HttpHandler, StreamEvent |
| TCP Server Plugin | Simple TCP | create_tcp_listener, TaskWork, Publish/Subscribe |
| Protocol Factory Plugin | Pull Stream Factory | EngineContext, ServiceRegistry, PullerFactory |
For more information, see:
- Plugin Development Guide — Complete concept explanations and API usage
- SDK API Reference — Full documentation for all types and interfaces
- Plugin System — Plugin system architecture design