跳转到内容

插件开发指南

本指南是使用 Monibuca V6 SDK 开发自定义插件的完整教程。你将学习插件的完整生命周期、配置系统、流媒体操作、HTTP 服务、任务管理等核心概念,并通过实战示例掌握插件开发的各种模式。

Monibuca 采用插件化架构,所有协议支持和扩展功能都以插件形式实现。SDK(monibuca-sdk)是插件与引擎交互的唯一契约层——插件只需依赖 SDK crate,无需了解引擎内部实现。

你的插件 crate
└──▶ sdk (monibuca-sdk) ← 唯一依赖,开源
└──▶ codec (monibuca-codec) ← 编解码底层

插件开发完成后,可以三种模式部署:

模式说明适用场景
静态编译编译时链接到引擎二进制最佳性能,生产部署
动态加载编译为 .so/.dylib,运行时加载热更新,第三方插件
WASM 沙箱编译为 WASM 模块,隔离执行安全隔离,不受信任的插件
Terminal window
mkdir -p plugins/my-plugin/src
cd plugins/my-plugin
[package]
name = "plugin-my-plugin"
version = "0.1.0"
edition = "2024"
[dependencies]
sdk = { git = "https://github.com/langhuihui/monibuca-sdk" }
# 如果在 monibuca workspace 内开发:
# sdk = { path = "../../crates/monibuca-sdk" }
async-trait = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
tracing = "0.1"
plugins/my-plugin/src/lib.rs
mod config;
mod plugin;
pub use plugin::MyPlugin;
// 动态加载模式需要导出 C ABI 符号
sdk::export_plugin!(MyPlugin);
plugins/my-plugin/src/config.rs
use sdk::ConfigSchema;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]
#[schema(
plugin = "my-plugin",
title = "My Plugin",
description = "A custom Monibuca plugin",
sdk_path = "sdk" // 独立插件 crate 必须指定
)]
pub struct MyPluginConfig {
/// 是否启用插件
#[serde(default = "default_enable")]
#[schema(label = "启用", desc = "是否启用此插件")]
pub enable: bool,
/// 自定义参数
#[serde(default = "default_message")]
#[schema(label = "消息", desc = "自定义欢迎消息")]
pub message: String,
}
fn default_enable() -> bool { true }
fn default_message() -> String { "Hello from MyPlugin!".to_string() }
impl Default for MyPluginConfig {
fn default() -> Self {
Self {
enable: default_enable(),
message: default_message(),
}
}
}
plugins/my-plugin/src/plugin.rs
use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use sdk::prelude::*;
use crate::config::MyPluginConfig;
pub struct MyPlugin {
state: PluginState,
config: MyPluginConfig,
}
impl Default for MyPlugin {
fn default() -> Self {
Self {
state: PluginState::Created,
config: MyPluginConfig::default(),
}
}
}
#[async_trait]
impl Plugin for MyPlugin {
fn info(&self) -> PluginInfo {
PluginInfo {
name: "my-plugin",
version: "0.1.0",
description: "A custom Monibuca plugin",
author: "Your Name",
}
}
fn state(&self) -> PluginState {
self.state
}
async fn init(
&mut self,
_manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>,
) -> Result<()> {
// 从配置文件加载插件配置
self.config = config
.get_plugin_config("my-plugin")
.and_then(|v| serde_json::from_value(v).ok())
.unwrap_or_default();
if !self.config.enable {
tracing::info!("MyPlugin is disabled");
self.state = PluginState::Disabled;
return Ok(());
}
tracing::info!("MyPlugin initialized: {}", self.config.message);
self.state = PluginState::Initialized;
Ok(())
}
async fn start(&mut self, _cancel: CancellationToken) -> Result<()> {
if self.state == PluginState::Disabled {
return Ok(());
}
self.state = PluginState::Running;
tracing::info!("MyPlugin started");
Ok(())
}
async fn stop(&mut self) -> Result<()> {
self.state = PluginState::Stopped;
tracing::info!("MyPlugin stopped");
Ok(())
}
fn as_any(&self) -> &dyn Any { self }
fn as_any_mut(&mut self) -> &mut dyn Any { self }
}

在 Monibuca 的 config.yaml 中添加:

my-plugin:
enable: true
message: "Welcome to Monibuca!"

恭喜!你已经创建了一个最小但完整的 Monibuca 插件。接下来,我们深入了解每个核心概念。


Plugin trait 是所有插件的核心接口。引擎通过这个 trait 管理插件的完整生命周期。

#[async_trait]
pub trait Plugin: Send + Sync {
// ── 必须实现 ──────────────────────────────────────────────
fn info(&self) -> PluginInfo;
fn state(&self) -> PluginState;
async fn init(&mut self, manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>) -> Result<()>;
async fn start(&mut self, cancel: CancellationToken) -> Result<()>;
async fn stop(&mut self) -> Result<()>;
fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
// ── 可选覆盖(有默认实现)──────────────────────────────────
fn name(&self) -> &'static str { self.info().name }
fn is_enabled(&self) -> bool { true }
fn on_config_update(&mut self, _config: &serde_json::Value) -> Result<()> { Ok(()) }
fn set_task_work(&mut self, _work: Arc<dyn TaskWork>) {}
fn set_engine_context(&mut self, _ctx: EngineContext) {}
fn http_routes(&self) -> Vec<HttpRoute> { Vec::new() }
fn register_http_handlers(&mut self, _server: SharedHttpServer) {}
}

引擎按以下顺序调用插件方法:

┌─────────────┐
│ Created │ ← Default::default() 或 new()
└──────┬──────┘
set_engine_context(ctx) ← 注入 IoC 容器(可选能力)
set_task_work(work) ← 注入任务管理器
init(manager, config) ← 加载配置、初始化依赖
┌──────┴──────┐
│ Initialized │
└──────┬──────┘
start(cancel_token) ← 启动服务(TCP/HTTP/后台任务)
┌──────┴──────┐
│ Running │ ← 正常服务中
└──────┬──────┘
│ cancel_token 被取消,或引擎调用 stop()
stop() ← 优雅停机,清理资源
┌──────┴──────┐
│ Stopped │
└─────────────┘
pub enum PluginState {
Created, // 刚构造,尚未初始化
Initialized, // init() 成功
Running, // start() 成功,正在服务
Stopped, // stop() 完成
Disabled, // 配置中 enable: false
Error, // 发生不可恢复的错误
}
pub struct PluginInfo {
pub name: &'static str, // 插件唯一标识,与配置文件中的 key 对应
pub version: &'static str, // 语义化版本号
pub description: &'static str, // 简短描述
pub author: &'static str, // 作者信息
}

ConfigSchema 宏自动为插件配置结构体生成 JSON Schema,供管理界面和 API 使用。

#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]
#[schema(
plugin = "rtmp", // 必填:插件名称标识
title = "RTMP Plugin", // 必填:显示标题
description = "RTMP/RTMPS streaming", // 必填:描述
sdk_path = "sdk", // 必填:独立 crate 必须设为 "sdk"
override_groups = "Publish,Subscribe,Tcp" // 可选:覆盖的全局配置组
)]
属性必填说明
plugin插件名称标识,对应配置文件中的 key
titleSchema 标题,用于管理界面显示
descriptionSchema 描述
sdk_pathSDK crate 路径,独立 crate 必须设为 "sdk"
override_groups该插件覆盖的全局配置组,逗号分隔
pub struct MyConfig {
#[schema(label = "端口号", desc = "服务监听端口")]
pub port: u16,
#[schema(label = "缓冲区大小", desc = "读缓冲区大小",
min = 128, max = 65536, suffix = " bytes")]
pub buffer_size: u32,
#[schema(label = "输出目录", desc = "文件输出路径",
pattern = "^[a-zA-Z0-9_/\\-\\.]+$",
help = "只允许字母、数字、下划线、斜杠、连字符和点")]
pub output_dir: String,
#[schema(group = "advanced", group_label = "高级设置",
group_desc = "生产环境调优参数")]
pub max_connections: u32,
#[schema(skip)] // 不暴露到 Schema
pub internal_state: String,
}
属性说明
label字段显示名称
desc字段描述
min / max数值范围约束
suffix显示单位后缀
pattern正则验证模式
help额外帮助信息
group / group_label / group_desc配置分组
skip从 Schema 中排除

很多插件需要 TCP 监听地址、HTTP 设置等全局参数。使用 #[serde(flatten)] 继承 CommonConfig

use sdk::prelude::*;
#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]
#[schema(
plugin = "rtmp",
title = "RTMP Plugin",
description = "RTMP streaming configuration",
override_groups = "Publish,Subscribe,Tcp",
sdk_path = "sdk"
)]
#[serde(default, rename_all = "lowercase")]
pub struct RtmpPluginConfig {
#[serde(default)]
#[schema(label = "启用", desc = "是否启用 RTMP")]
pub enable: bool,
/// 继承全局 TCP/HTTP/Publish/Subscribe 设置
#[serde(flatten)]
#[schema(skip)]
pub common: CommonConfig,
/// 插件特有配置
#[serde(default = "default_chunk_size")]
#[schema(label = "Chunk Size", desc = "RTMP chunk 大小",
min = 128, max = 65536, suffix = " bytes")]
pub chunk_size: u32,
}

此时 YAML 配置会变成:

rtmp:
enable: true
tcp:
listenaddr: ":1935" # 来自 CommonConfig.tcp
chunksize: 4096 # 插件特有

init() 方法中加载配置:

async fn init(
&mut self,
manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>,
) -> Result<()> {
// 方式一:解析为结构体(推荐)
let my_config: MyPluginConfig = config
.get_plugin_config("my-plugin")
.and_then(|v| serde_json::from_value(v).ok())
.unwrap_or_default();
// 检查 enable 标志
if !my_config.enable {
self.state = PluginState::Disabled;
return Ok(());
}
// 方式二:读取单个全局配置值
if let Some(log_level) = config.get("loglevel") {
tracing::info!("Global log level: {}", log_level);
}
// 方式三:加载默认 YAML 配置
config.load_default_config("my-plugin", include_str!("default.yaml"))?;
self.config = my_config;
self.stream_manager = Some(manager);
self.state = PluginState::Initialized;
Ok(())
}
pub trait ConfigProvider: Send + Sync + Any {
/// 获取全局配置项
fn get(&self, key: &str) -> Option<serde_json::Value>;
/// 获取插件配置(返回整个插件配置 JSON 对象)
fn get_plugin_config(&self, plugin_name: &str) -> Option<serde_json::Value>;
/// 加载插件的默认 YAML 配置(如果用户未在配置文件中指定)
fn load_default_config(&self, plugin_name: &str, default_yaml: &str) -> Result<()>;
/// 检查插件是否在配置中启用
fn is_plugin_enabled(&self, plugin_name: &str) -> bool;
/// 获取 CORS 配置
fn get_cors_config(&self) -> (bool, Vec<String>);
}

实现 on_config_update() 支持运行时配置变更:

fn on_config_update(&mut self, config: &serde_json::Value) -> Result<()> {
if let Ok(new_config) = serde_json::from_value::<MyPluginConfig>(config.clone()) {
tracing::info!("Config updated, old port: {}, new port: {}",
self.config.port, new_config.port);
self.config = new_config;
}
Ok(())
}

流操作是流媒体插件的核心能力。StreamManagerApi 提供了发布(推流)和订阅(拉流)的完整接口。

pub trait StreamManagerApi: Send + Sync + 'static {
// ── 发布 ─────────────────────────────────────────────
fn create_publisher(&self, path: &str) -> Result<Arc<RwLock<dyn Publisher>>>;
fn create_publisher_for_plugin(&self, path: &str, plugin_name: &str)
-> Result<Arc<RwLock<dyn Publisher>>>;
fn get_publisher(&self, path: &str) -> Option<Arc<RwLock<dyn Publisher>>>;
// ── 订阅 ─────────────────────────────────────────────
async fn subscribe(&self, path: &str, config: SubscriberConfig)
-> Result<(Arc<RwLock<dyn Publisher>>, Box<dyn Subscriber>)>;
fn unsubscribe(&self, path: &str, subscriber: &mut dyn Subscriber);
// ── 查询 ─────────────────────────────────────────────
fn has_stream(&self, path: &str) -> bool;
fn stream_count(&self) -> usize;
fn stream_paths(&self) -> Vec<String>;
// ── 事件 ─────────────────────────────────────────────
fn subscribe_events(&self) -> broadcast::Receiver<StreamEvent>;
// ── 生命周期 ─────────────────────────────────────────
fn dispose_stream(&self, path: &str) -> bool;
}

下面是完整的发布流程:

use std::time::Duration;
use sdk::prelude::*;
async fn publish_stream(
manager: &Arc<dyn StreamManagerApi>,
stream_path: &str,
) -> Result<()> {
// 1. 创建 Publisher(会自动创建对应的 Stream)
let publisher = manager.create_publisher_for_plugin(stream_path, "my-plugin")?;
// 2. 初始化音视频 Track(必须在写入帧之前调用)
{
let mut pub_guard = publisher.write(); // parking_lot RwLock
pub_guard.init_video_track();
pub_guard.init_audio_track();
}
// 3. 设置编解码信息(收到序列头时调用一次)
{
let mut pub_guard = publisher.write();
// 视频编解码器(以 H.264 为例)
let video_codec = VideoCodec::new(
VideoCodecType::H264,
1920, // width
1080, // height
);
pub_guard.set_video_codec(video_codec);
pub_guard.set_video_seq_header(sps_pps_data); // AVCC 格式的序列头
// 音频编解码器(以 AAC 为例)
let audio_codec = AudioCodec::new(
AudioCodecType::AAC,
44100, // sample_rate
2, // channels
);
pub_guard.set_audio_codec(audio_codec);
pub_guard.set_audio_seq_header(audio_specific_config);
}
// 4. 持续写入媒体帧
loop {
// 从你的协议/源中接收数据...
let (frame_data, timestamp, is_keyframe) = receive_frame().await?;
// 写入视频帧(读锁即可,内部使用 interior mutability)
let frame_type = if is_keyframe {
VideoFrameType::Keyframe
} else {
VideoFrameType::Interframe
};
let dts = Duration::from_millis(timestamp);
let pts = dts; // 如果有 CTS 偏移,pts = dts + cts
publisher.read().write_video(frame_type, pts, dts, frame_data)?;
// 写入音频帧
let audio_ts = Duration::from_millis(audio_timestamp);
publisher.read().write_audio(audio_ts, audio_data)?;
}
// 5. 结束发布
manager.dispose_stream(stream_path);
Ok(())
}
pub trait Publisher: Send + Sync {
// ── Track 初始化 ──────────────────────────────────────
fn init_video_track(&mut self); // 必须先调用
fn init_audio_track(&mut self); // 必须先调用
// ── 编解码器设置(序列头,只设置一次)────────────────
fn set_video_codec(&mut self, codec: VideoCodec);
fn set_audio_codec(&mut self, codec: AudioCodec);
fn set_video_seq_header(&mut self, data: Bytes);
fn set_audio_seq_header(&mut self, data: Bytes);
// ── 帧写入(读锁即可调用)────────────────────────────
fn write_video(&self, frame_type: VideoFrameType,
pts: Duration, dts: Duration, data: Bytes) -> Result<()>;
fn write_video_raw(&self, frame_type: VideoFrameType,
timestamp: Duration, dts: Duration,
nalus: Vec<Bytes>) -> Result<()>;
fn write_audio(&self, timestamp: Duration, data: Bytes) -> Result<()>;
// ── 查询 ─────────────────────────────────────────────
fn path(&self) -> &str;
fn subscriber_count(&self) -> usize;
fn video_seq_header(&self) -> Option<Bytes>;
fn audio_seq_header(&self) -> Option<Bytes>;
// ── 生命周期 ─────────────────────────────────────────
fn dispose(&self);
}

下面是完整的订阅流程:

use std::time::Duration;
use tokio::time::timeout;
use sdk::prelude::*;
async fn subscribe_stream(
manager: &Arc<dyn StreamManagerApi>,
stream_path: &str,
cancel: CancellationToken,
) -> Result<()> {
// 1. 配置订阅参数
let config = SubscriberConfig {
mode: SubscribeMode::RealTime, // 实时模式,从最新关键帧开始
buffer_time: Duration::from_secs(2), // Buffer 模式的缓冲时间
receive_video: true,
receive_audio: true,
keyframe_timeout: Some(Duration::from_secs(30)), // 等待关键帧超时
};
// 2. 订阅流(异步等待,直到流可用)
let (publisher_ref, mut subscriber) = manager.subscribe(stream_path, config).await?;
// 3. 获取序列头(用于初始化解码器或发送给客户端)
{
let pub_guard = publisher_ref.read();
if let Some(video_seq) = pub_guard.video_seq_header() {
// 发送视频序列头给客户端...
send_video_header(video_seq).await?;
}
if let Some(audio_seq) = pub_guard.audio_seq_header() {
// 发送音频序列头给客户端...
send_audio_header(audio_seq).await?;
}
}
// 4. 帧读取循环
loop {
// 检查停止信号
if subscriber.is_stopped() || cancel.is_cancelled() {
break;
}
// 读取所有可用的视频帧
while let Ok(Some(frame)) = subscriber.read_video() {
let timestamp = frame.timestamp;
let is_key = frame.is_keyframe();
// AVFrame 支持多种格式的零拷贝缓存转换
let flv_tag = frame.get_flv_video_tag(); // FLV 格式
let annexb = frame.get_annexb(); // Annex-B 格式
let avcc = frame.get_avcc(); // AVCC 格式
// 发送给客户端(选择对应协议的格式)...
send_video_frame(flv_tag, timestamp).await?;
}
// 读取所有可用的音频帧
while let Some(frame) = subscriber.read_audio() {
let timestamp = frame.timestamp;
let flv_tag = frame.get_flv_audio_tag();
send_audio_frame(flv_tag, timestamp).await?;
}
// 5. 等待新帧到达(带超时,保持响应性)
match timeout(Duration::from_millis(10), subscriber.wait_for_frames()).await {
Ok(Ok(())) => continue, // 有新帧,继续读取
Ok(Err(_)) => break, // 流结束
Err(_) => continue, // 10ms 超时,继续循环(检查 cancel)
}
}
// 6. 清理:取消订阅
manager.unsubscribe(stream_path, subscriber.as_mut());
Ok(())
}

AVFrame 内置多种格式的懒加载缓存转换——第一次访问时转换并缓存,后续所有订阅者共享同一份转换结果(零拷贝):

// 视频帧格式
frame.get_flv_video_tag() // → Bytes FLV Video Tag
frame.get_annexb() // → Bytes Annex-B (00 00 00 01 分隔)
frame.get_avcc() // → Bytes AVCC (长度前缀)
frame.get_raw() // → 原始 NALUs
// 视频帧属性
frame.is_keyframe() // → bool
frame.video_frame_type // → VideoFrameType
frame.video_codec // → Option<Arc<VideoCodec>>
frame.timestamp // → Duration
// 音频帧格式
frame.get_flv_audio_tag() // → Bytes FLV Audio Tag
frame.get_audio_raw() // → 原始音频数据
// 音频帧属性
frame.audio_codec // → Option<AudioCodec>
frame.timestamp // → Duration
pub struct SubscriberConfig {
pub mode: SubscribeMode,
pub buffer_time: Duration,
pub receive_video: bool,
pub receive_audio: bool,
pub keyframe_timeout: Option<Duration>,
}
模式说明适用场景
RealTime从最新 IDR 关键帧开始直播实时观看
Bufferbuffer_time 前的 IDR 开始略有延迟但更平滑
WaitKeyframe等待下一个到来的关键帧精确同步起点

插件可以订阅全局流事件,响应流的创建和销毁:

async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
let manager = self.stream_manager.clone().unwrap();
let mut event_rx = manager.subscribe_events();
tokio::spawn(async move {
loop {
tokio::select! {
biased; // 优先检查取消信号
_ = cancel.cancelled() => break,
event = event_rx.recv() => {
match event {
Ok(StreamEvent::Created(path)) => {
tracing::info!("新流创建: {}", path);
}
Ok(StreamEvent::Disposed(path)) => {
tracing::info!("流已销毁: {}", path);
}
Ok(StreamEvent::SubscriberAdded { stream, .. }) => {
tracing::info!("新订阅者加入: {}", stream);
}
Ok(StreamEvent::SubscriberRemoved { stream, .. }) => {
tracing::info!("订阅者离开: {}", stream);
}
Err(_) => break, // channel 关闭
}
}
}
}
});
self.state = PluginState::Running;
Ok(())
}

很多插件需要暴露 HTTP API,例如截图服务、HLS 播放等。

#[async_trait]
pub trait HttpHandler: Send + Sync {
async fn handle(&self, req: HttpRequest) -> HttpResponse;
}
pub struct HttpRequest {
pub method: String, // "GET", "POST" 等
pub path: String, // "/snap/api/live/stream"
pub query: Option<String>, // "format=jpg&quality=85"
pub headers: HashMap<String, String>,
pub body: Bytes,
pub params: HashMap<String, String>, // 路由参数
pub peer_addr: SocketAddr,
}
// 便捷方法
req.body_str() // → Option<&str>
req.parse_json_body::<T>() // → Result<T>
req.query_param("key") // → Option<String>
req.path_param("id") // → Option<String>
// Builder 模式构造响应
HttpResponse::ok().json(&serde_json::json!({"code": 0, "data": "..."}))
HttpResponse::ok().text("Hello World")
HttpResponse::ok().content_type("video/mp2t").body(ts_bytes)
HttpResponse::ok().header("Cache-Control", "no-cache").body(data)
HttpResponse::ok().cors_origin("*")
HttpResponse::not_found().text("Not Found")
HttpResponse::bad_request().text("Missing parameter")
HttpResponse::internal_error().text("Internal error")
HttpResponse::new(302).header("Location", "/redirect").body(Bytes::new())
use sdk::prelude::*;
use std::sync::Arc;
// Handler 持有共享状态
struct StatusHandler {
stream_manager: Arc<dyn StreamManagerApi>,
config: MyPluginConfig,
}
#[async_trait]
impl HttpHandler for StatusHandler {
async fn handle(&self, req: HttpRequest) -> HttpResponse {
// 解析查询参数
let query: HashMap<String, String> = req.query.as_ref()
.map(|q| url::form_urlencoded::parse(q.as_bytes())
.map(|(k, v)| (k.into_owned(), v.into_owned()))
.collect())
.unwrap_or_default();
// 路由分发
let parts: Vec<&str> = req.path
.trim_start_matches('/')
.split('/')
.collect();
match (req.method.as_str(), parts.first().copied().unwrap_or("")) {
("GET", "status") => {
let stats = self.stream_manager.stats();
HttpResponse::ok().json(&serde_json::json!({
"code": 0,
"streams": stats.stream_count,
"message": self.config.message,
}))
}
("GET", "streams") => {
let paths = self.stream_manager.stream_paths();
HttpResponse::ok().json(&serde_json::json!({
"code": 0,
"streams": paths,
}))
}
("POST", "action") => {
match req.parse_json_body::<serde_json::Value>() {
Ok(body) => {
// 处理请求...
HttpResponse::ok().json(&serde_json::json!({
"code": 0,
"message": "done"
}))
}
Err(_) => HttpResponse::bad_request().text("Invalid JSON body"),
}
}
_ => HttpResponse::not_found().json(&serde_json::json!({
"code": 404,
"message": "Not found"
})),
}
}
}

Pluginhttp_routes() 方法中注册路由:

impl Plugin for MyPlugin {
fn http_routes(&self) -> Vec<HttpRoute> {
let handler = Arc::new(StatusHandler {
stream_manager: self.stream_manager.clone().unwrap(),
config: self.config.clone(),
});
vec![
HttpRoute {
method: HttpMethod::Get,
path: "/my-plugin/**".to_string(), // 通配符匹配
handler: Box::new(HandlerAdapter { handler: handler.clone() }),
},
HttpRoute {
method: HttpMethod::Post,
path: "/my-plugin/**".to_string(),
handler: Box::new(HandlerAdapter { handler }),
},
]
}
}
// 如果你的 Handler 在 Arc 中,需要一个适配器
struct HandlerAdapter {
handler: Arc<StatusHandler>,
}
#[async_trait]
impl HttpHandler for HandlerAdapter {
async fn handle(&self, req: HttpRequest) -> HttpResponse {
self.handler.handle(req).await
}
}

协议插件(如 RTMP、RTSP)通常需要监听 TCP 端口。SDK 提供了 create_tcp_listener 工具函数。

use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::net::TcpStream;
use sdk::prelude::*;
pub struct TcpServerPlugin {
state: PluginState,
config: TcpServerConfig,
stream_manager: Option<Arc<dyn StreamManagerApi>>,
task_work: Option<Arc<dyn TaskWork>>,
server_handle: Option<tokio::task::JoinHandle<()>>,
}
#[async_trait]
impl Plugin for TcpServerPlugin {
fn set_task_work(&mut self, work: Arc<dyn TaskWork>) {
self.task_work = Some(work);
}
async fn init(
&mut self,
manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>,
) -> Result<()> {
self.stream_manager = Some(manager);
// 解析监听地址...
self.state = PluginState::Initialized;
Ok(())
}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
let addr: SocketAddr = self.config.listen_addr.parse()
.unwrap_or("0.0.0.0:9090".parse().unwrap());
let manager = self.stream_manager.clone().unwrap();
let task_work = self.task_work.clone();
let session_counter = Arc::new(AtomicU64::new(0));
// 使用 SDK 的 create_tcp_listener(启用 SO_REUSEPORT)
let listener = sdk::create_tcp_listener(addr).map_err(|e| {
MonibucaError::Io(std::io::Error::new(
std::io::ErrorKind::AddrInUse,
format!("Failed to bind {}: {}", addr, e),
))
})?;
let listener_task = async move {
tracing::info!("TCP server listening on {}", addr);
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::info!("TCP server shutting down");
break;
}
result = listener.accept() => {
match result {
Ok((stream, peer_addr)) => {
let session_id = session_counter
.fetch_add(1, Ordering::Relaxed);
let manager = manager.clone();
// 为每个连接创建子任务(可在引擎面板中监控)
let (session_cancel, child_task) =
if let Some(ref tw) = task_work {
let child = tw.create_child_task(
&format!("Session:{}", peer_addr)
);
child.start();
(child.cancellation_token(), Some(child))
} else {
(cancel.child_token(), None)
};
tokio::spawn(async move {
if let Err(e) = handle_session(
stream, peer_addr, session_id,
manager, session_cancel,
).await {
tracing::error!(
"Session {} error: {}",
peer_addr, e
);
}
// 会话结束,停止子任务
if let Some(task) = child_task {
task.stop();
}
});
}
Err(e) => {
tracing::error!("Accept error: {}", e);
}
}
}
}
}
};
// 用 TaskWork 注册监听器任务
if let Some(ref tw) = self.task_work {
tw.register_child("TCPListener", Box::pin(listener_task));
} else {
self.server_handle = Some(tokio::spawn(listener_task));
}
self.state = PluginState::Running;
Ok(())
}
async fn stop(&mut self) -> Result<()> {
if let Some(handle) = self.server_handle.take() {
handle.abort();
}
self.state = PluginState::Stopped;
Ok(())
}
// ... 其他方法
}
async fn handle_session(
stream: TcpStream,
peer_addr: SocketAddr,
session_id: u64,
manager: Arc<dyn StreamManagerApi>,
cancel: CancellationToken,
) -> Result<()> {
tracing::info!("New session {} from {}", session_id, peer_addr);
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::info!("Session {} cancelled", session_id);
}
result = process_connection(stream, manager) => {
if let Err(e) = result {
tracing::warn!("Session {} error: {}", session_id, e);
}
}
}
Ok(())
}
/// 创建启用 SO_REUSEPORT 的 TCP 监听器
/// 允许多个线程/进程同时 accept 同一端口,提升并发性能
pub fn create_tcp_listener(addr: SocketAddr) -> std::io::Result<TcpListener>;

对于需要自定义协议分帧的场景,实现 TcpSplitter

/// 将 TCP 字节流分割为完整的协议消息
pub trait TcpSplitter: Send {
/// 输入新数据,返回所有已完成的消息
fn split(&mut self, data: &[u8]) -> Vec<Vec<u8>>;
/// 返回当前缓冲中不完整的数据
fn pending(&self) -> &[u8];
/// 清空缓冲区
fn clear(&mut self);
}

TaskWork 是引擎的任务树管理系统,让插件的子任务(TCP 连接、录制会话等)可以在引擎面板中监控和管理。

pub trait TaskWork: Send + Sync + Any {
/// 启动根任务
fn start(&self);
/// 注册一个命名的后台任务(fire-and-forget)
fn register_child(&self, owner: &str, task: Pin<Box<dyn Future<Output=()> + Send>>);
/// 创建一个可管理的子任务句柄
fn create_child_task(&self, owner: &str) -> Box<dyn ChildTaskHandle>;
/// 移除已完成的子任务
fn remove_child_task(&self, child_id: u64);
}
pub trait ChildTaskHandle: Send + Sync + Any {
fn start(&self); // 启动子任务
fn stop(&self); // 停止子任务
fn id(&self) -> u64; // 获取唯一 ID
fn cancellation_token(&self) -> CancellationToken; // 获取取消令牌
}

模式 A:register_child — 命名后台任务

适用于长期运行的单一任务(如 TCP 监听循环):

fn set_task_work(&mut self, work: Arc<dyn TaskWork>) {
self.task_work = Some(work);
}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
let task = async move {
// TCP accept loop, event monitoring, etc.
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => break,
// ... work ...
}
}
};
if let Some(ref tw) = self.task_work {
tw.register_child("TCPListener", Box::pin(task));
}
Ok(())
}

模式 B:create_child_task — 可管理的子任务

适用于动态创建的多个子任务(如每个 TCP 连接、每个录制会话):

// 创建子任务
let child = task_work.create_child_task(&format!("RTMP:{}", peer_addr));
child.start();
let cancel_token = child.cancellation_token();
// 子任务逻辑中使用 cancel_token 感知停机
tokio::select! {
biased;
_ = cancel_token.cancelled() => { /* 被引擎停止 */ }
result = do_work() => { /* 正常完成 */ }
}
// 完成后清理
child.stop();
task_work.remove_child_task(child.id());

EngineContext 是引擎的 IoC(控制反转)容器,基于 TypeId 的能力映射。插件通过它获取引擎提供的各种可选服务。

pub struct MyPlugin {
state: PluginState,
engine_context: Option<EngineContext>,
database: Option<Arc<dyn DatabaseApi>>,
service_registry: Option<Arc<dyn ServiceRegistry>>,
}
impl Plugin for MyPlugin {
fn set_engine_context(&mut self, ctx: EngineContext) {
// 在 init() 之前被调用,安全存储
if let Some(db) = ctx.database_arc() {
self.database = Some(db);
}
if let Some(reg) = ctx.service_registry() {
self.service_registry = Some(reg);
}
self.engine_context = Some(ctx);
}
}
能力获取方法检查方法说明
StreamManagerApiinit() 参数必有,流管理(发布/订阅/查询)
DatabaseApictx.database_arc()ctx.has_database()数据库操作
TransformApictx.transform()ctx.has_transform()流变换(转码等)
PlaybackApictx.playback()ctx.has_playback()点播回放
ServiceRegistryctx.service_registry()ctx.has_service_registry()插件间服务发现
RoomApictx.room_api()ctx.has_room_api()房间服务
ProxyManagerctx.proxy_manager()ctx.has_proxy_manager()代理管理
CollectionApictx.collection()ctx.has_collection()数据集合操作
ReportingApictx.reporting()ctx.has_reporting()数据上报
UtilityProviderctx.utility_provider()ctx.has_utility_provider()工具函数
HttpRegistryctx.http_registry()ctx.has_http_registry()HTTP 路由注册

除了预定义的能力,还可以通过泛型 API 访问自定义能力:

// 获取任意类型的能力
if let Some(my_service) = ctx.get::<Arc<dyn MyCustomApi>>() {
// 使用自定义能力...
}
// 检查能力是否存在
if ctx.has::<Arc<dyn MyCustomApi>>() {
// ...
}

ServiceRegistry 允许插件注册和发现协议工厂服务,是实现拉流(Pull)和推流(Push)跨协议互操作的核心机制。

当引擎需要从远程源拉流时,会通过 ServiceRegistry 查找对应协议的 PullerFactory

use sdk::prelude::*;
use sdk::services::{PullerFactory, StreamPuller};
pub struct MyPullerFactory;
#[async_trait]
impl PullerFactory for MyPullerFactory {
fn protocol(&self) -> &str {
"myproto" // 协议标识
}
fn supports_url(&self, url: &str) -> bool {
url.starts_with("myproto://")
}
async fn create_puller(
&self,
url: &str,
stream_path: &str,
config: serde_json::Value,
manager: Arc<dyn StreamManagerApi>,
cancel_token: CancellationToken,
) -> Result<Arc<tokio::sync::RwLock<Box<dyn StreamPuller>>>> {
let puller = MyPuller::new(url, stream_path, manager, cancel_token);
Ok(Arc::new(tokio::sync::RwLock::new(Box::new(puller))))
}
}

init()start() 中注册:

async fn init(&mut self, manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>) -> Result<()> {
// 通过 EngineContext 获取 ServiceRegistry
if let Some(ref ctx) = self.engine_context {
if let Some(registry) = ctx.service_registry() {
// 注册拉流工厂
registry.register_puller_factory(Arc::new(MyPullerFactory));
// 注册推流工厂
registry.register_pusher_factory(Arc::new(MyPusherFactory));
}
}
Ok(())
}
// 查找支持指定 URL 的拉流工厂
if let Some(factory) = registry.get_puller_factory_for_url("rtmp://example.com/live/test") {
let puller = factory.create_puller(
"rtmp://example.com/live/test",
"live/test",
serde_json::json!({}),
manager.clone(),
cancel.clone(),
).await?;
// 启动拉流
puller.write().await.start().await?;
}
// 列出所有已注册的拉流协议
let protocols = registry.list_puller_protocols(); // ["rtmp", "rtsp", "srt", ...]

通过 export_plugin! 宏导出 C ABI 符号,支持动态加载模式:

lib.rs
sdk::export_plugin!(MyPlugin);

该宏会生成三个 no_mangle extern "C" 函数:

符号说明
plugin_api_version() -> u32ABI 版本号,用于兼容性检查
plugin_create() -> PluginHandle创建插件实例(调用 Default::default()
plugin_metadata() -> PluginMetadata返回插件名称、版本、描述(用于发现)
Terminal window
# 在 Cargo.toml 中添加
[lib]
crate-type = ["cdylib"]
# 编译
cargo build --release
# 产物路径
# Linux: target/release/libplugin_my_plugin.so
# macOS: target/release/libplugin_my_plugin.dylib
# Windows: target/release/plugin_my_plugin.dll
# 部署:复制到 Monibuca 插件目录
cp target/release/libplugin_my_plugin.so /path/to/monibuca/plugins/
target/wasm32-wasi/release/plugin_my_plugin.wasm
cargo build --target wasm32-wasi --release

SDK 提供统一的错误类型 MonibucaErrorResult<T>

use sdk::prelude::*;
// Result<T> = std::result::Result<T, MonibucaError>
// 常用错误构造
MonibucaError::Plugin("my error message".to_string()) // 插件自定义错误
MonibucaError::Io(io_error) // IO 错误
MonibucaError::Config("invalid config".to_string()) // 配置错误
// 在插件中使用
async fn init(&mut self, ...) -> Result<()> {
let config = load_config().map_err(|e|
MonibucaError::Plugin(format!("Failed to load config: {}", e))
)?;
Ok(())
}

示例一:截图插件(HTTP + 订阅)

Section titled “示例一:截图插件(HTTP + 订阅)”

这个示例展示了一个实用的截图插件——通过 HTTP 请求触发对指定流的截图:

use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use sdk::prelude::*;
pub struct SnapPlugin {
state: PluginState,
stream_manager: Option<Arc<dyn StreamManagerApi>>,
}
impl Default for SnapPlugin {
fn default() -> Self {
Self { state: PluginState::Created, stream_manager: None }
}
}
// HTTP Handler:接收截图请求
struct SnapHandler {
stream_manager: Arc<dyn StreamManagerApi>,
}
#[async_trait]
impl HttpHandler for SnapHandler {
async fn handle(&self, req: HttpRequest) -> HttpResponse {
// 从路径中提取流名称: /snap/live/camera → live/camera
let stream_path = req.path.trim_start_matches('/');
if !self.stream_manager.has_stream(stream_path) {
return HttpResponse::not_found().json(&serde_json::json!({
"error": "Stream not found"
}));
}
// 订阅流,只接收视频,等待一个关键帧
let config = SubscriberConfig {
mode: SubscribeMode::WaitKeyframe,
receive_video: true,
receive_audio: false,
keyframe_timeout: Some(Duration::from_secs(5)),
..Default::default()
};
match self.stream_manager.subscribe(stream_path, config).await {
Ok((_pub_ref, mut subscriber)) => {
// 等待第一个关键帧
if subscriber.wait_for_frames().await.is_ok() {
if let Ok(Some(frame)) = subscriber.read_video() {
if frame.is_keyframe() {
let raw_data = frame.get_raw();
// 这里可以进一步解码为图片...
return HttpResponse::ok()
.content_type("application/octet-stream")
.body(raw_data);
}
}
}
self.stream_manager.unsubscribe(stream_path, subscriber.as_mut());
HttpResponse::internal_error().text("Failed to capture frame")
}
Err(e) => {
HttpResponse::internal_error().text(&format!("Subscribe failed: {}", e))
}
}
}
}
#[async_trait]
impl Plugin for SnapPlugin {
fn info(&self) -> PluginInfo {
PluginInfo {
name: "snap", version: "1.0.0",
description: "Video snapshot plugin", author: "Monibuca Team",
}
}
fn state(&self) -> PluginState { self.state }
async fn init(&mut self, manager: Arc<dyn StreamManagerApi>,
_config: Arc<dyn ConfigProvider>) -> Result<()> {
self.stream_manager = Some(manager);
self.state = PluginState::Initialized;
Ok(())
}
async fn start(&mut self, _cancel: CancellationToken) -> Result<()> {
self.state = PluginState::Running;
Ok(())
}
async fn stop(&mut self) -> Result<()> {
self.state = PluginState::Stopped;
Ok(())
}
fn http_routes(&self) -> Vec<HttpRoute> {
if let Some(ref sm) = self.stream_manager {
vec![HttpRoute {
method: HttpMethod::Get,
path: "/snap/**".to_string(),
handler: Box::new(SnapHandler {
stream_manager: sm.clone(),
}),
}]
} else {
Vec::new()
}
}
fn as_any(&self) -> &dyn Any { self }
fn as_any_mut(&mut self) -> &mut dyn Any { self }
}

示例二:流转发插件(发布 + 订阅联动)

Section titled “示例二:流转发插件(发布 + 订阅联动)”

将一个流转发到另一个路径(例如添加水印或改变路径):

async fn relay_stream(
manager: Arc<dyn StreamManagerApi>,
source_path: &str,
target_path: &str,
cancel: CancellationToken,
) -> Result<()> {
// 1. 订阅源流
let config = SubscriberConfig::default();
let (source_pub, mut subscriber) = manager.subscribe(source_path, config).await?;
// 2. 创建目标 Publisher
let target_pub = manager.create_publisher_for_plugin(target_path, "relay")?;
// 3. 复制编解码配置
{
let source = source_pub.read();
let mut target = target_pub.write();
target.init_video_track();
target.init_audio_track();
if let Some(codec) = source.audio_codec() {
target.set_audio_codec(codec);
}
if let Some(seq) = source.video_seq_header() {
target.set_video_seq_header(seq);
}
if let Some(seq) = source.audio_seq_header() {
target.set_audio_seq_header(seq);
}
}
// 4. 转发帧数据
loop {
if subscriber.is_stopped() || cancel.is_cancelled() {
break;
}
while let Ok(Some(frame)) = subscriber.read_video() {
let _ = target_pub.read().write_video(
frame.video_frame_type,
frame.timestamp, // pts
frame.timestamp, // dts
frame.get_avcc(),
);
}
while let Some(frame) = subscriber.read_audio() {
let _ = target_pub.read().write_audio(
frame.timestamp,
frame.get_audio_raw(),
);
}
match tokio::time::timeout(
Duration::from_millis(10),
subscriber.wait_for_frames(),
).await {
Ok(Ok(())) => continue,
Ok(Err(_)) => break,
Err(_) => continue,
}
}
// 5. 清理
manager.unsubscribe(source_path, subscriber.as_mut());
manager.dispose_stream(target_path);
Ok(())
}

init() 中只做配置加载和依赖注入。耗时操作(网络监听、文件 I/O)放到 start() 中:

// ✅ 正确
async fn init(&mut self, manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>) -> Result<()> {
self.config = load_config(config)?; // 轻量
self.stream_manager = Some(manager); // 存储引用
self.state = PluginState::Initialized;
Ok(())
}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
let listener = sdk::create_tcp_listener(self.config.addr)?; // 耗时操作
// 启动 accept loop...
Ok(())
}
// ❌ 错误:在 init 中做网络操作
async fn init(&mut self, ...) -> Result<()> {
let listener = sdk::create_tcp_listener(addr)?; // 不应该在这里
Ok(())
}

始终在异步循环中检查 cancel 信号,使用 biased 确保优先响应停机:

loop {
tokio::select! {
biased; // ← 优先检查第一个分支
_ = cancel.cancelled() => {
tracing::info!("Shutting down gracefully");
break;
}
result = do_work() => {
// 处理结果...
}
}
}

stop() 中正确清理所有资源:

async fn stop(&mut self) -> Result<()> {
// 1. 停止后台任务
if let Some(handle) = self.server_handle.take() {
handle.abort();
}
// 2. 关闭网络连接
// 3. 等待正在进行的操作完成
// 4. 释放资源
self.state = PluginState::Stopped;
tracing::info!("Plugin stopped");
Ok(())
}

同时建议为持有资源的会话实现 Drop

impl Drop for MySession {
fn drop(&mut self) {
self.cleanup(); // 确保即使 panic 也能清理
}
}

使用 tracing 进行带上下文的结构化日志:

use tracing::{info, warn, error, debug, instrument};
#[instrument(skip(stream), fields(session_id = %session_id))]
async fn handle_session(stream: TcpStream, session_id: u64) {
info!("Session started");
// 后续所有日志自动包含 session_id 字段
if let Err(e) = process(stream).await {
error!(error = %e, "Session error");
}
info!("Session ended");
}

插件代码中应避免 unwrap()panic!,使用 Result 传播错误:

// ✅ 正确
let addr: SocketAddr = config.addr.parse().map_err(|e|
MonibucaError::Config(format!("Invalid address: {}", e))
)?;
// ❌ 避免
let addr: SocketAddr = config.addr.parse().unwrap();

通过 TaskWork 注册子任务,让引擎可以监控和统一管理:

// ✅ 使用 TaskWork
if let Some(ref tw) = self.task_work {
tw.register_child("TCPListener", Box::pin(listener_task));
}
// 较弱的替代方案(不受引擎管理)
tokio::spawn(listener_task);

对于需要运行时调整的配置,实现 on_config_update()

fn on_config_update(&mut self, config: &serde_json::Value) -> Result<()> {
if let Ok(new_config) = serde_json::from_value::<MyConfig>(config.clone()) {
// 只更新可以安全热更新的字段
self.config.max_connections = new_config.max_connections;
self.config.timeout = new_config.timeout;
// 注意:端口等需要重启的字段不应热更新
tracing::info!("Config updated");
}
Ok(())
}

一个功能完整的插件推荐以下目录结构:

plugins/my-plugin/
├── Cargo.toml
├── src/
│ ├── lib.rs # 模块导出 + export_plugin!
│ ├── plugin.rs # Plugin trait 实现(生命周期)
│ ├── config.rs # ConfigSchema 配置定义
│ ├── handler.rs # HTTP Handler(如果有 HTTP API)
│ ├── session.rs # 连接会话逻辑(如果是 TCP 服务)
│ ├── factory.rs # PullerFactory/PusherFactory(如果是协议插件)
│ └── error.rs # 插件自定义错误类型(可选)
└── tests/
└── integration.rs # 集成测试(可选)

联系我们

微信公众号:不卡科技 微信公众号二维码
腾讯频道:流媒体技术 腾讯频道二维码
QQ 频道:p0qq0crz08 QQ 频道二维码
QQ 群:751639168 QQ 群二维码