插件开发指南
本指南是使用 Monibuca V6 SDK 开发自定义插件的完整教程。你将学习插件的完整生命周期、配置系统、流媒体操作、HTTP 服务、任务管理等核心概念,并通过实战示例掌握插件开发的各种模式。
Monibuca 采用插件化架构,所有协议支持和扩展功能都以插件形式实现。SDK(monibuca-sdk)是插件与引擎交互的唯一契约层——插件只需依赖 SDK crate,无需了解引擎内部实现。
你的插件 crate │ └──▶ sdk (monibuca-sdk) ← 唯一依赖,开源 │ └──▶ codec (monibuca-codec) ← 编解码底层插件开发完成后,可以三种模式部署:
| 模式 | 说明 | 适用场景 |
|---|---|---|
| 静态编译 | 编译时链接到引擎二进制 | 最佳性能,生产部署 |
| 动态加载 | 编译为 .so/.dylib,运行时加载 | 热更新,第三方插件 |
| WASM 沙箱 | 编译为 WASM 模块,隔离执行 | 安全隔离,不受信任的插件 |
快速开始:创建第一个插件
Section titled “快速开始:创建第一个插件”1. 创建项目
Section titled “1. 创建项目”mkdir -p plugins/my-plugin/srccd plugins/my-plugin2. 编写 Cargo.toml
Section titled “2. 编写 Cargo.toml”[package]name = "plugin-my-plugin"version = "0.1.0"edition = "2024"
[dependencies]sdk = { git = "https://github.com/langhuihui/monibuca-sdk" }# 如果在 monibuca workspace 内开发:# sdk = { path = "../../crates/monibuca-sdk" }
async-trait = "0.1"serde = { version = "1.0", features = ["derive"] }serde_json = "1.0"tokio = { version = "1", features = ["full"] }tokio-util = "0.7"tracing = "0.1"3. 编写 lib.rs
Section titled “3. 编写 lib.rs”mod config;mod plugin;
pub use plugin::MyPlugin;
// 动态加载模式需要导出 C ABI 符号sdk::export_plugin!(MyPlugin);4. 编写配置 config.rs
Section titled “4. 编写配置 config.rs”use sdk::ConfigSchema;use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]#[schema( plugin = "my-plugin", title = "My Plugin", description = "A custom Monibuca plugin", sdk_path = "sdk" // 独立插件 crate 必须指定)]pub struct MyPluginConfig { /// 是否启用插件 #[serde(default = "default_enable")] #[schema(label = "启用", desc = "是否启用此插件")] pub enable: bool,
/// 自定义参数 #[serde(default = "default_message")] #[schema(label = "消息", desc = "自定义欢迎消息")] pub message: String,}
fn default_enable() -> bool { true }fn default_message() -> String { "Hello from MyPlugin!".to_string() }
impl Default for MyPluginConfig { fn default() -> Self { Self { enable: default_enable(), message: default_message(), } }}5. 编写插件主体 plugin.rs
Section titled “5. 编写插件主体 plugin.rs”use std::any::Any;use std::sync::Arc;use async_trait::async_trait;use tokio_util::sync::CancellationToken;use sdk::prelude::*;
use crate::config::MyPluginConfig;
pub struct MyPlugin { state: PluginState, config: MyPluginConfig,}
impl Default for MyPlugin { fn default() -> Self { Self { state: PluginState::Created, config: MyPluginConfig::default(), } }}
#[async_trait]impl Plugin for MyPlugin { fn info(&self) -> PluginInfo { PluginInfo { name: "my-plugin", version: "0.1.0", description: "A custom Monibuca plugin", author: "Your Name", } }
fn state(&self) -> PluginState { self.state }
async fn init( &mut self, _manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>, ) -> Result<()> { // 从配置文件加载插件配置 self.config = config .get_plugin_config("my-plugin") .and_then(|v| serde_json::from_value(v).ok()) .unwrap_or_default();
if !self.config.enable { tracing::info!("MyPlugin is disabled"); self.state = PluginState::Disabled; return Ok(()); }
tracing::info!("MyPlugin initialized: {}", self.config.message); self.state = PluginState::Initialized; Ok(()) }
async fn start(&mut self, _cancel: CancellationToken) -> Result<()> { if self.state == PluginState::Disabled { return Ok(()); } self.state = PluginState::Running; tracing::info!("MyPlugin started"); Ok(()) }
async fn stop(&mut self) -> Result<()> { self.state = PluginState::Stopped; tracing::info!("MyPlugin stopped"); Ok(()) }
fn as_any(&self) -> &dyn Any { self } fn as_any_mut(&mut self) -> &mut dyn Any { self }}6. 对应的配置文件
Section titled “6. 对应的配置文件”在 Monibuca 的 config.yaml 中添加:
my-plugin: enable: true message: "Welcome to Monibuca!"恭喜!你已经创建了一个最小但完整的 Monibuca 插件。接下来,我们深入了解每个核心概念。
Plugin Trait 详解
Section titled “Plugin Trait 详解”Plugin trait 是所有插件的核心接口。引擎通过这个 trait 管理插件的完整生命周期。
#[async_trait]pub trait Plugin: Send + Sync { // ── 必须实现 ────────────────────────────────────────────── fn info(&self) -> PluginInfo; fn state(&self) -> PluginState; async fn init(&mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>) -> Result<()>; async fn start(&mut self, cancel: CancellationToken) -> Result<()>; async fn stop(&mut self) -> Result<()>; fn as_any(&self) -> &dyn Any; fn as_any_mut(&mut self) -> &mut dyn Any;
// ── 可选覆盖(有默认实现)────────────────────────────────── fn name(&self) -> &'static str { self.info().name } fn is_enabled(&self) -> bool { true } fn on_config_update(&mut self, _config: &serde_json::Value) -> Result<()> { Ok(()) } fn set_task_work(&mut self, _work: Arc<dyn TaskWork>) {} fn set_engine_context(&mut self, _ctx: EngineContext) {} fn http_routes(&self) -> Vec<HttpRoute> { Vec::new() } fn register_http_handlers(&mut self, _server: SharedHttpServer) {}}引擎按以下顺序调用插件方法:
┌─────────────┐ │ Created │ ← Default::default() 或 new() └──────┬──────┘ │ ▼ set_engine_context(ctx) ← 注入 IoC 容器(可选能力) set_task_work(work) ← 注入任务管理器 │ ▼ init(manager, config) ← 加载配置、初始化依赖 ┌──────┴──────┐ │ Initialized │ └──────┬──────┘ │ ▼ start(cancel_token) ← 启动服务(TCP/HTTP/后台任务) ┌──────┴──────┐ │ Running │ ← 正常服务中 └──────┬──────┘ │ cancel_token 被取消,或引擎调用 stop() ▼ stop() ← 优雅停机,清理资源 ┌──────┴──────┐ │ Stopped │ └─────────────┘PluginState 状态枚举
Section titled “PluginState 状态枚举”pub enum PluginState { Created, // 刚构造,尚未初始化 Initialized, // init() 成功 Running, // start() 成功,正在服务 Stopped, // stop() 完成 Disabled, // 配置中 enable: false Error, // 发生不可恢复的错误}PluginInfo 元信息
Section titled “PluginInfo 元信息”pub struct PluginInfo { pub name: &'static str, // 插件唯一标识,与配置文件中的 key 对应 pub version: &'static str, // 语义化版本号 pub description: &'static str, // 简短描述 pub author: &'static str, // 作者信息}ConfigSchema 派生宏
Section titled “ConfigSchema 派生宏”ConfigSchema 宏自动为插件配置结构体生成 JSON Schema,供管理界面和 API 使用。
结构体级别属性
Section titled “结构体级别属性”#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]#[schema( plugin = "rtmp", // 必填:插件名称标识 title = "RTMP Plugin", // 必填:显示标题 description = "RTMP/RTMPS streaming", // 必填:描述 sdk_path = "sdk", // 必填:独立 crate 必须设为 "sdk" override_groups = "Publish,Subscribe,Tcp" // 可选:覆盖的全局配置组)]| 属性 | 必填 | 说明 |
|---|---|---|
plugin | ✅ | 插件名称标识,对应配置文件中的 key |
title | ✅ | Schema 标题,用于管理界面显示 |
description | ✅ | Schema 描述 |
sdk_path | ✅ | SDK crate 路径,独立 crate 必须设为 "sdk" |
override_groups | ❌ | 该插件覆盖的全局配置组,逗号分隔 |
字段级别属性
Section titled “字段级别属性”pub struct MyConfig { #[schema(label = "端口号", desc = "服务监听端口")] pub port: u16,
#[schema(label = "缓冲区大小", desc = "读缓冲区大小", min = 128, max = 65536, suffix = " bytes")] pub buffer_size: u32,
#[schema(label = "输出目录", desc = "文件输出路径", pattern = "^[a-zA-Z0-9_/\\-\\.]+$", help = "只允许字母、数字、下划线、斜杠、连字符和点")] pub output_dir: String,
#[schema(group = "advanced", group_label = "高级设置", group_desc = "生产环境调优参数")] pub max_connections: u32,
#[schema(skip)] // 不暴露到 Schema pub internal_state: String,}| 属性 | 说明 |
|---|---|
label | 字段显示名称 |
desc | 字段描述 |
min / max | 数值范围约束 |
suffix | 显示单位后缀 |
pattern | 正则验证模式 |
help | 额外帮助信息 |
group / group_label / group_desc | 配置分组 |
skip | 从 Schema 中排除 |
使用 CommonConfig 继承全局设置
Section titled “使用 CommonConfig 继承全局设置”很多插件需要 TCP 监听地址、HTTP 设置等全局参数。使用 #[serde(flatten)] 继承 CommonConfig:
use sdk::prelude::*;
#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]#[schema( plugin = "rtmp", title = "RTMP Plugin", description = "RTMP streaming configuration", override_groups = "Publish,Subscribe,Tcp", sdk_path = "sdk")]#[serde(default, rename_all = "lowercase")]pub struct RtmpPluginConfig { #[serde(default)] #[schema(label = "启用", desc = "是否启用 RTMP")] pub enable: bool,
/// 继承全局 TCP/HTTP/Publish/Subscribe 设置 #[serde(flatten)] #[schema(skip)] pub common: CommonConfig,
/// 插件特有配置 #[serde(default = "default_chunk_size")] #[schema(label = "Chunk Size", desc = "RTMP chunk 大小", min = 128, max = 65536, suffix = " bytes")] pub chunk_size: u32,}此时 YAML 配置会变成:
rtmp: enable: true tcp: listenaddr: ":1935" # 来自 CommonConfig.tcp chunksize: 4096 # 插件特有加载和使用配置
Section titled “加载和使用配置”在 init() 方法中加载配置:
async fn init( &mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>,) -> Result<()> { // 方式一:解析为结构体(推荐) let my_config: MyPluginConfig = config .get_plugin_config("my-plugin") .and_then(|v| serde_json::from_value(v).ok()) .unwrap_or_default();
// 检查 enable 标志 if !my_config.enable { self.state = PluginState::Disabled; return Ok(()); }
// 方式二:读取单个全局配置值 if let Some(log_level) = config.get("loglevel") { tracing::info!("Global log level: {}", log_level); }
// 方式三:加载默认 YAML 配置 config.load_default_config("my-plugin", include_str!("default.yaml"))?;
self.config = my_config; self.stream_manager = Some(manager); self.state = PluginState::Initialized; Ok(())}ConfigProvider 接口
Section titled “ConfigProvider 接口”pub trait ConfigProvider: Send + Sync + Any { /// 获取全局配置项 fn get(&self, key: &str) -> Option<serde_json::Value>;
/// 获取插件配置(返回整个插件配置 JSON 对象) fn get_plugin_config(&self, plugin_name: &str) -> Option<serde_json::Value>;
/// 加载插件的默认 YAML 配置(如果用户未在配置文件中指定) fn load_default_config(&self, plugin_name: &str, default_yaml: &str) -> Result<()>;
/// 检查插件是否在配置中启用 fn is_plugin_enabled(&self, plugin_name: &str) -> bool;
/// 获取 CORS 配置 fn get_cors_config(&self) -> (bool, Vec<String>);}实现 on_config_update() 支持运行时配置变更:
fn on_config_update(&mut self, config: &serde_json::Value) -> Result<()> { if let Ok(new_config) = serde_json::from_value::<MyPluginConfig>(config.clone()) { tracing::info!("Config updated, old port: {}, new port: {}", self.config.port, new_config.port); self.config = new_config; } Ok(())}流操作:发布与订阅
Section titled “流操作:发布与订阅”流操作是流媒体插件的核心能力。StreamManagerApi 提供了发布(推流)和订阅(拉流)的完整接口。
StreamManagerApi 核心方法
Section titled “StreamManagerApi 核心方法”pub trait StreamManagerApi: Send + Sync + 'static { // ── 发布 ───────────────────────────────────────────── fn create_publisher(&self, path: &str) -> Result<Arc<RwLock<dyn Publisher>>>; fn create_publisher_for_plugin(&self, path: &str, plugin_name: &str) -> Result<Arc<RwLock<dyn Publisher>>>; fn get_publisher(&self, path: &str) -> Option<Arc<RwLock<dyn Publisher>>>;
// ── 订阅 ───────────────────────────────────────────── async fn subscribe(&self, path: &str, config: SubscriberConfig) -> Result<(Arc<RwLock<dyn Publisher>>, Box<dyn Subscriber>)>; fn unsubscribe(&self, path: &str, subscriber: &mut dyn Subscriber);
// ── 查询 ───────────────────────────────────────────── fn has_stream(&self, path: &str) -> bool; fn stream_count(&self) -> usize; fn stream_paths(&self) -> Vec<String>;
// ── 事件 ───────────────────────────────────────────── fn subscribe_events(&self) -> broadcast::Receiver<StreamEvent>;
// ── 生命周期 ───────────────────────────────────────── fn dispose_stream(&self, path: &str) -> bool;}发布流(推流端)
Section titled “发布流(推流端)”下面是完整的发布流程:
use std::time::Duration;use sdk::prelude::*;
async fn publish_stream( manager: &Arc<dyn StreamManagerApi>, stream_path: &str,) -> Result<()> { // 1. 创建 Publisher(会自动创建对应的 Stream) let publisher = manager.create_publisher_for_plugin(stream_path, "my-plugin")?;
// 2. 初始化音视频 Track(必须在写入帧之前调用) { let mut pub_guard = publisher.write(); // parking_lot RwLock pub_guard.init_video_track(); pub_guard.init_audio_track(); }
// 3. 设置编解码信息(收到序列头时调用一次) { let mut pub_guard = publisher.write();
// 视频编解码器(以 H.264 为例) let video_codec = VideoCodec::new( VideoCodecType::H264, 1920, // width 1080, // height ); pub_guard.set_video_codec(video_codec); pub_guard.set_video_seq_header(sps_pps_data); // AVCC 格式的序列头
// 音频编解码器(以 AAC 为例) let audio_codec = AudioCodec::new( AudioCodecType::AAC, 44100, // sample_rate 2, // channels ); pub_guard.set_audio_codec(audio_codec); pub_guard.set_audio_seq_header(audio_specific_config); }
// 4. 持续写入媒体帧 loop { // 从你的协议/源中接收数据... let (frame_data, timestamp, is_keyframe) = receive_frame().await?;
// 写入视频帧(读锁即可,内部使用 interior mutability) let frame_type = if is_keyframe { VideoFrameType::Keyframe } else { VideoFrameType::Interframe }; let dts = Duration::from_millis(timestamp); let pts = dts; // 如果有 CTS 偏移,pts = dts + cts
publisher.read().write_video(frame_type, pts, dts, frame_data)?;
// 写入音频帧 let audio_ts = Duration::from_millis(audio_timestamp); publisher.read().write_audio(audio_ts, audio_data)?; }
// 5. 结束发布 manager.dispose_stream(stream_path); Ok(())}Publisher Trait 关键方法
Section titled “Publisher Trait 关键方法”pub trait Publisher: Send + Sync { // ── Track 初始化 ────────────────────────────────────── fn init_video_track(&mut self); // 必须先调用 fn init_audio_track(&mut self); // 必须先调用
// ── 编解码器设置(序列头,只设置一次)──────────────── fn set_video_codec(&mut self, codec: VideoCodec); fn set_audio_codec(&mut self, codec: AudioCodec); fn set_video_seq_header(&mut self, data: Bytes); fn set_audio_seq_header(&mut self, data: Bytes);
// ── 帧写入(读锁即可调用)──────────────────────────── fn write_video(&self, frame_type: VideoFrameType, pts: Duration, dts: Duration, data: Bytes) -> Result<()>; fn write_video_raw(&self, frame_type: VideoFrameType, timestamp: Duration, dts: Duration, nalus: Vec<Bytes>) -> Result<()>; fn write_audio(&self, timestamp: Duration, data: Bytes) -> Result<()>;
// ── 查询 ───────────────────────────────────────────── fn path(&self) -> &str; fn subscriber_count(&self) -> usize; fn video_seq_header(&self) -> Option<Bytes>; fn audio_seq_header(&self) -> Option<Bytes>;
// ── 生命周期 ───────────────────────────────────────── fn dispose(&self);}订阅流(拉流端)
Section titled “订阅流(拉流端)”下面是完整的订阅流程:
use std::time::Duration;use tokio::time::timeout;use sdk::prelude::*;
async fn subscribe_stream( manager: &Arc<dyn StreamManagerApi>, stream_path: &str, cancel: CancellationToken,) -> Result<()> { // 1. 配置订阅参数 let config = SubscriberConfig { mode: SubscribeMode::RealTime, // 实时模式,从最新关键帧开始 buffer_time: Duration::from_secs(2), // Buffer 模式的缓冲时间 receive_video: true, receive_audio: true, keyframe_timeout: Some(Duration::from_secs(30)), // 等待关键帧超时 };
// 2. 订阅流(异步等待,直到流可用) let (publisher_ref, mut subscriber) = manager.subscribe(stream_path, config).await?;
// 3. 获取序列头(用于初始化解码器或发送给客户端) { let pub_guard = publisher_ref.read(); if let Some(video_seq) = pub_guard.video_seq_header() { // 发送视频序列头给客户端... send_video_header(video_seq).await?; } if let Some(audio_seq) = pub_guard.audio_seq_header() { // 发送音频序列头给客户端... send_audio_header(audio_seq).await?; } }
// 4. 帧读取循环 loop { // 检查停止信号 if subscriber.is_stopped() || cancel.is_cancelled() { break; }
// 读取所有可用的视频帧 while let Ok(Some(frame)) = subscriber.read_video() { let timestamp = frame.timestamp; let is_key = frame.is_keyframe();
// AVFrame 支持多种格式的零拷贝缓存转换 let flv_tag = frame.get_flv_video_tag(); // FLV 格式 let annexb = frame.get_annexb(); // Annex-B 格式 let avcc = frame.get_avcc(); // AVCC 格式
// 发送给客户端(选择对应协议的格式)... send_video_frame(flv_tag, timestamp).await?; }
// 读取所有可用的音频帧 while let Some(frame) = subscriber.read_audio() { let timestamp = frame.timestamp; let flv_tag = frame.get_flv_audio_tag();
send_audio_frame(flv_tag, timestamp).await?; }
// 5. 等待新帧到达(带超时,保持响应性) match timeout(Duration::from_millis(10), subscriber.wait_for_frames()).await { Ok(Ok(())) => continue, // 有新帧,继续读取 Ok(Err(_)) => break, // 流结束 Err(_) => continue, // 10ms 超时,继续循环(检查 cancel) } }
// 6. 清理:取消订阅 manager.unsubscribe(stream_path, subscriber.as_mut()); Ok(())}AVFrame 格式转换
Section titled “AVFrame 格式转换”AVFrame 内置多种格式的懒加载缓存转换——第一次访问时转换并缓存,后续所有订阅者共享同一份转换结果(零拷贝):
// 视频帧格式frame.get_flv_video_tag() // → Bytes FLV Video Tagframe.get_annexb() // → Bytes Annex-B (00 00 00 01 分隔)frame.get_avcc() // → Bytes AVCC (长度前缀)frame.get_raw() // → 原始 NALUs
// 视频帧属性frame.is_keyframe() // → boolframe.video_frame_type // → VideoFrameTypeframe.video_codec // → Option<Arc<VideoCodec>>frame.timestamp // → Duration
// 音频帧格式frame.get_flv_audio_tag() // → Bytes FLV Audio Tagframe.get_audio_raw() // → 原始音频数据
// 音频帧属性frame.audio_codec // → Option<AudioCodec>frame.timestamp // → DurationSubscriberConfig 参数
Section titled “SubscriberConfig 参数”pub struct SubscriberConfig { pub mode: SubscribeMode, pub buffer_time: Duration, pub receive_video: bool, pub receive_audio: bool, pub keyframe_timeout: Option<Duration>,}| 模式 | 说明 | 适用场景 |
|---|---|---|
RealTime | 从最新 IDR 关键帧开始 | 直播实时观看 |
Buffer | 从 buffer_time 前的 IDR 开始 | 略有延迟但更平滑 |
WaitKeyframe | 等待下一个到来的关键帧 | 精确同步起点 |
插件可以订阅全局流事件,响应流的创建和销毁:
async fn start(&mut self, cancel: CancellationToken) -> Result<()> { let manager = self.stream_manager.clone().unwrap(); let mut event_rx = manager.subscribe_events();
tokio::spawn(async move { loop { tokio::select! { biased; // 优先检查取消信号 _ = cancel.cancelled() => break, event = event_rx.recv() => { match event { Ok(StreamEvent::Created(path)) => { tracing::info!("新流创建: {}", path); } Ok(StreamEvent::Disposed(path)) => { tracing::info!("流已销毁: {}", path); } Ok(StreamEvent::SubscriberAdded { stream, .. }) => { tracing::info!("新订阅者加入: {}", stream); } Ok(StreamEvent::SubscriberRemoved { stream, .. }) => { tracing::info!("订阅者离开: {}", stream); } Err(_) => break, // channel 关闭 } } } } });
self.state = PluginState::Running; Ok(())}HTTP 端点注册
Section titled “HTTP 端点注册”很多插件需要暴露 HTTP API,例如截图服务、HLS 播放等。
HttpHandler Trait
Section titled “HttpHandler Trait”#[async_trait]pub trait HttpHandler: Send + Sync { async fn handle(&self, req: HttpRequest) -> HttpResponse;}HttpRequest
Section titled “HttpRequest”pub struct HttpRequest { pub method: String, // "GET", "POST" 等 pub path: String, // "/snap/api/live/stream" pub query: Option<String>, // "format=jpg&quality=85" pub headers: HashMap<String, String>, pub body: Bytes, pub params: HashMap<String, String>, // 路由参数 pub peer_addr: SocketAddr,}
// 便捷方法req.body_str() // → Option<&str>req.parse_json_body::<T>() // → Result<T>req.query_param("key") // → Option<String>req.path_param("id") // → Option<String>HttpResponse
Section titled “HttpResponse”// Builder 模式构造响应HttpResponse::ok().json(&serde_json::json!({"code": 0, "data": "..."}))HttpResponse::ok().text("Hello World")HttpResponse::ok().content_type("video/mp2t").body(ts_bytes)HttpResponse::ok().header("Cache-Control", "no-cache").body(data)HttpResponse::ok().cors_origin("*")
HttpResponse::not_found().text("Not Found")HttpResponse::bad_request().text("Missing parameter")HttpResponse::internal_error().text("Internal error")HttpResponse::new(302).header("Location", "/redirect").body(Bytes::new())实现示例:带状态的 HTTP Handler
Section titled “实现示例:带状态的 HTTP Handler”use sdk::prelude::*;use std::sync::Arc;
// Handler 持有共享状态struct StatusHandler { stream_manager: Arc<dyn StreamManagerApi>, config: MyPluginConfig,}
#[async_trait]impl HttpHandler for StatusHandler { async fn handle(&self, req: HttpRequest) -> HttpResponse { // 解析查询参数 let query: HashMap<String, String> = req.query.as_ref() .map(|q| url::form_urlencoded::parse(q.as_bytes()) .map(|(k, v)| (k.into_owned(), v.into_owned())) .collect()) .unwrap_or_default();
// 路由分发 let parts: Vec<&str> = req.path .trim_start_matches('/') .split('/') .collect();
match (req.method.as_str(), parts.first().copied().unwrap_or("")) { ("GET", "status") => { let stats = self.stream_manager.stats(); HttpResponse::ok().json(&serde_json::json!({ "code": 0, "streams": stats.stream_count, "message": self.config.message, })) } ("GET", "streams") => { let paths = self.stream_manager.stream_paths(); HttpResponse::ok().json(&serde_json::json!({ "code": 0, "streams": paths, })) } ("POST", "action") => { match req.parse_json_body::<serde_json::Value>() { Ok(body) => { // 处理请求... HttpResponse::ok().json(&serde_json::json!({ "code": 0, "message": "done" })) } Err(_) => HttpResponse::bad_request().text("Invalid JSON body"), } } _ => HttpResponse::not_found().json(&serde_json::json!({ "code": 404, "message": "Not found" })), } }}注册 HTTP 路由
Section titled “注册 HTTP 路由”在 Plugin 的 http_routes() 方法中注册路由:
impl Plugin for MyPlugin { fn http_routes(&self) -> Vec<HttpRoute> { let handler = Arc::new(StatusHandler { stream_manager: self.stream_manager.clone().unwrap(), config: self.config.clone(), });
vec![ HttpRoute { method: HttpMethod::Get, path: "/my-plugin/**".to_string(), // 通配符匹配 handler: Box::new(HandlerAdapter { handler: handler.clone() }), }, HttpRoute { method: HttpMethod::Post, path: "/my-plugin/**".to_string(), handler: Box::new(HandlerAdapter { handler }), }, ] }}
// 如果你的 Handler 在 Arc 中,需要一个适配器struct HandlerAdapter { handler: Arc<StatusHandler>,}
#[async_trait]impl HttpHandler for HandlerAdapter { async fn handle(&self, req: HttpRequest) -> HttpResponse { self.handler.handle(req).await }}TCP 服务器模式
Section titled “TCP 服务器模式”协议插件(如 RTMP、RTSP)通常需要监听 TCP 端口。SDK 提供了 create_tcp_listener 工具函数。
完整 TCP 服务器实现
Section titled “完整 TCP 服务器实现”use std::net::SocketAddr;use std::sync::atomic::{AtomicU64, Ordering};use std::sync::Arc;use tokio::net::TcpStream;use sdk::prelude::*;
pub struct TcpServerPlugin { state: PluginState, config: TcpServerConfig, stream_manager: Option<Arc<dyn StreamManagerApi>>, task_work: Option<Arc<dyn TaskWork>>, server_handle: Option<tokio::task::JoinHandle<()>>,}
#[async_trait]impl Plugin for TcpServerPlugin { fn set_task_work(&mut self, work: Arc<dyn TaskWork>) { self.task_work = Some(work); }
async fn init( &mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>, ) -> Result<()> { self.stream_manager = Some(manager); // 解析监听地址... self.state = PluginState::Initialized; Ok(()) }
async fn start(&mut self, cancel: CancellationToken) -> Result<()> { let addr: SocketAddr = self.config.listen_addr.parse() .unwrap_or("0.0.0.0:9090".parse().unwrap()); let manager = self.stream_manager.clone().unwrap(); let task_work = self.task_work.clone(); let session_counter = Arc::new(AtomicU64::new(0));
// 使用 SDK 的 create_tcp_listener(启用 SO_REUSEPORT) let listener = sdk::create_tcp_listener(addr).map_err(|e| { MonibucaError::Io(std::io::Error::new( std::io::ErrorKind::AddrInUse, format!("Failed to bind {}: {}", addr, e), )) })?;
let listener_task = async move { tracing::info!("TCP server listening on {}", addr);
loop { tokio::select! { biased; _ = cancel.cancelled() => { tracing::info!("TCP server shutting down"); break; } result = listener.accept() => { match result { Ok((stream, peer_addr)) => { let session_id = session_counter .fetch_add(1, Ordering::Relaxed); let manager = manager.clone();
// 为每个连接创建子任务(可在引擎面板中监控) let (session_cancel, child_task) = if let Some(ref tw) = task_work { let child = tw.create_child_task( &format!("Session:{}", peer_addr) ); child.start(); (child.cancellation_token(), Some(child)) } else { (cancel.child_token(), None) };
tokio::spawn(async move { if let Err(e) = handle_session( stream, peer_addr, session_id, manager, session_cancel, ).await { tracing::error!( "Session {} error: {}", peer_addr, e ); } // 会话结束,停止子任务 if let Some(task) = child_task { task.stop(); } }); } Err(e) => { tracing::error!("Accept error: {}", e); } } } } } };
// 用 TaskWork 注册监听器任务 if let Some(ref tw) = self.task_work { tw.register_child("TCPListener", Box::pin(listener_task)); } else { self.server_handle = Some(tokio::spawn(listener_task)); }
self.state = PluginState::Running; Ok(()) }
async fn stop(&mut self) -> Result<()> { if let Some(handle) = self.server_handle.take() { handle.abort(); } self.state = PluginState::Stopped; Ok(()) }
// ... 其他方法}
async fn handle_session( stream: TcpStream, peer_addr: SocketAddr, session_id: u64, manager: Arc<dyn StreamManagerApi>, cancel: CancellationToken,) -> Result<()> { tracing::info!("New session {} from {}", session_id, peer_addr);
tokio::select! { biased; _ = cancel.cancelled() => { tracing::info!("Session {} cancelled", session_id); } result = process_connection(stream, manager) => { if let Err(e) = result { tracing::warn!("Session {} error: {}", session_id, e); } } }
Ok(())}create_tcp_listener
Section titled “create_tcp_listener”/// 创建启用 SO_REUSEPORT 的 TCP 监听器/// 允许多个线程/进程同时 accept 同一端口,提升并发性能pub fn create_tcp_listener(addr: SocketAddr) -> std::io::Result<TcpListener>;TcpSplitter Trait
Section titled “TcpSplitter Trait”对于需要自定义协议分帧的场景,实现 TcpSplitter:
/// 将 TCP 字节流分割为完整的协议消息pub trait TcpSplitter: Send { /// 输入新数据,返回所有已完成的消息 fn split(&mut self, data: &[u8]) -> Vec<Vec<u8>>;
/// 返回当前缓冲中不完整的数据 fn pending(&self) -> &[u8];
/// 清空缓冲区 fn clear(&mut self);}TaskWork 任务管理
Section titled “TaskWork 任务管理”TaskWork 是引擎的任务树管理系统,让插件的子任务(TCP 连接、录制会话等)可以在引擎面板中监控和管理。
TaskWork Trait
Section titled “TaskWork Trait”pub trait TaskWork: Send + Sync + Any { /// 启动根任务 fn start(&self);
/// 注册一个命名的后台任务(fire-and-forget) fn register_child(&self, owner: &str, task: Pin<Box<dyn Future<Output=()> + Send>>);
/// 创建一个可管理的子任务句柄 fn create_child_task(&self, owner: &str) -> Box<dyn ChildTaskHandle>;
/// 移除已完成的子任务 fn remove_child_task(&self, child_id: u64);}ChildTaskHandle Trait
Section titled “ChildTaskHandle Trait”pub trait ChildTaskHandle: Send + Sync + Any { fn start(&self); // 启动子任务 fn stop(&self); // 停止子任务 fn id(&self) -> u64; // 获取唯一 ID fn cancellation_token(&self) -> CancellationToken; // 获取取消令牌}两种子任务模式
Section titled “两种子任务模式”模式 A:register_child — 命名后台任务
适用于长期运行的单一任务(如 TCP 监听循环):
fn set_task_work(&mut self, work: Arc<dyn TaskWork>) { self.task_work = Some(work);}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> { let task = async move { // TCP accept loop, event monitoring, etc. loop { tokio::select! { biased; _ = cancel.cancelled() => break, // ... work ... } } };
if let Some(ref tw) = self.task_work { tw.register_child("TCPListener", Box::pin(task)); } Ok(())}模式 B:create_child_task — 可管理的子任务
适用于动态创建的多个子任务(如每个 TCP 连接、每个录制会话):
// 创建子任务let child = task_work.create_child_task(&format!("RTMP:{}", peer_addr));child.start();let cancel_token = child.cancellation_token();
// 子任务逻辑中使用 cancel_token 感知停机tokio::select! { biased; _ = cancel_token.cancelled() => { /* 被引擎停止 */ } result = do_work() => { /* 正常完成 */ }}
// 完成后清理child.stop();task_work.remove_child_task(child.id());EngineContext 依赖注入
Section titled “EngineContext 依赖注入”EngineContext 是引擎的 IoC(控制反转)容器,基于 TypeId 的能力映射。插件通过它获取引擎提供的各种可选服务。
存储 EngineContext
Section titled “存储 EngineContext”pub struct MyPlugin { state: PluginState, engine_context: Option<EngineContext>, database: Option<Arc<dyn DatabaseApi>>, service_registry: Option<Arc<dyn ServiceRegistry>>,}
impl Plugin for MyPlugin { fn set_engine_context(&mut self, ctx: EngineContext) { // 在 init() 之前被调用,安全存储 if let Some(db) = ctx.database_arc() { self.database = Some(db); } if let Some(reg) = ctx.service_registry() { self.service_registry = Some(reg); } self.engine_context = Some(ctx); }}可用能力一览
Section titled “可用能力一览”| 能力 | 获取方法 | 检查方法 | 说明 |
|---|---|---|---|
| StreamManagerApi | init() 参数 | — | 必有,流管理(发布/订阅/查询) |
| DatabaseApi | ctx.database_arc() | ctx.has_database() | 数据库操作 |
| TransformApi | ctx.transform() | ctx.has_transform() | 流变换(转码等) |
| PlaybackApi | ctx.playback() | ctx.has_playback() | 点播回放 |
| ServiceRegistry | ctx.service_registry() | ctx.has_service_registry() | 插件间服务发现 |
| RoomApi | ctx.room_api() | ctx.has_room_api() | 房间服务 |
| ProxyManager | ctx.proxy_manager() | ctx.has_proxy_manager() | 代理管理 |
| CollectionApi | ctx.collection() | ctx.has_collection() | 数据集合操作 |
| ReportingApi | ctx.reporting() | ctx.has_reporting() | 数据上报 |
| UtilityProvider | ctx.utility_provider() | ctx.has_utility_provider() | 工具函数 |
| HttpRegistry | ctx.http_registry() | ctx.has_http_registry() | HTTP 路由注册 |
泛型能力访问
Section titled “泛型能力访问”除了预定义的能力,还可以通过泛型 API 访问自定义能力:
// 获取任意类型的能力if let Some(my_service) = ctx.get::<Arc<dyn MyCustomApi>>() { // 使用自定义能力...}
// 检查能力是否存在if ctx.has::<Arc<dyn MyCustomApi>>() { // ...}ServiceRegistry 服务发现
Section titled “ServiceRegistry 服务发现”ServiceRegistry 允许插件注册和发现协议工厂服务,是实现拉流(Pull)和推流(Push)跨协议互操作的核心机制。
PullerFactory —— 拉流工厂
Section titled “PullerFactory —— 拉流工厂”当引擎需要从远程源拉流时,会通过 ServiceRegistry 查找对应协议的 PullerFactory:
use sdk::prelude::*;use sdk::services::{PullerFactory, StreamPuller};
pub struct MyPullerFactory;
#[async_trait]impl PullerFactory for MyPullerFactory { fn protocol(&self) -> &str { "myproto" // 协议标识 }
fn supports_url(&self, url: &str) -> bool { url.starts_with("myproto://") }
async fn create_puller( &self, url: &str, stream_path: &str, config: serde_json::Value, manager: Arc<dyn StreamManagerApi>, cancel_token: CancellationToken, ) -> Result<Arc<tokio::sync::RwLock<Box<dyn StreamPuller>>>> { let puller = MyPuller::new(url, stream_path, manager, cancel_token); Ok(Arc::new(tokio::sync::RwLock::new(Box::new(puller)))) }}在 init() 或 start() 中注册:
async fn init(&mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>) -> Result<()> { // 通过 EngineContext 获取 ServiceRegistry if let Some(ref ctx) = self.engine_context { if let Some(registry) = ctx.service_registry() { // 注册拉流工厂 registry.register_puller_factory(Arc::new(MyPullerFactory)); // 注册推流工厂 registry.register_pusher_factory(Arc::new(MyPusherFactory)); } } Ok(())}使用工厂(跨插件协作)
Section titled “使用工厂(跨插件协作)”// 查找支持指定 URL 的拉流工厂if let Some(factory) = registry.get_puller_factory_for_url("rtmp://example.com/live/test") { let puller = factory.create_puller( "rtmp://example.com/live/test", "live/test", serde_json::json!({}), manager.clone(), cancel.clone(), ).await?;
// 启动拉流 puller.write().await.start().await?;}
// 列出所有已注册的拉流协议let protocols = registry.list_puller_protocols(); // ["rtmp", "rtsp", "srt", ...]动态加载导出
Section titled “动态加载导出”export_plugin! 宏
Section titled “export_plugin! 宏”通过 export_plugin! 宏导出 C ABI 符号,支持动态加载模式:
sdk::export_plugin!(MyPlugin);该宏会生成三个 no_mangle extern "C" 函数:
| 符号 | 说明 |
|---|---|
plugin_api_version() -> u32 | ABI 版本号,用于兼容性检查 |
plugin_create() -> PluginHandle | 创建插件实例(调用 Default::default()) |
plugin_metadata() -> PluginMetadata | 返回插件名称、版本、描述(用于发现) |
编译为动态库
Section titled “编译为动态库”# 在 Cargo.toml 中添加[lib]crate-type = ["cdylib"]
# 编译cargo build --release
# 产物路径# Linux: target/release/libplugin_my_plugin.so# macOS: target/release/libplugin_my_plugin.dylib# Windows: target/release/plugin_my_plugin.dll
# 部署:复制到 Monibuca 插件目录cp target/release/libplugin_my_plugin.so /path/to/monibuca/plugins/编译为 WASM
Section titled “编译为 WASM”cargo build --target wasm32-wasi --releaseSDK 提供统一的错误类型 MonibucaError 和 Result<T>:
use sdk::prelude::*;
// Result<T> = std::result::Result<T, MonibucaError>
// 常用错误构造MonibucaError::Plugin("my error message".to_string()) // 插件自定义错误MonibucaError::Io(io_error) // IO 错误MonibucaError::Config("invalid config".to_string()) // 配置错误
// 在插件中使用async fn init(&mut self, ...) -> Result<()> { let config = load_config().map_err(|e| MonibucaError::Plugin(format!("Failed to load config: {}", e)) )?; Ok(())}示例一:截图插件(HTTP + 订阅)
Section titled “示例一:截图插件(HTTP + 订阅)”这个示例展示了一个实用的截图插件——通过 HTTP 请求触发对指定流的截图:
use std::any::Any;use std::sync::Arc;use std::time::Duration;use sdk::prelude::*;
pub struct SnapPlugin { state: PluginState, stream_manager: Option<Arc<dyn StreamManagerApi>>,}
impl Default for SnapPlugin { fn default() -> Self { Self { state: PluginState::Created, stream_manager: None } }}
// HTTP Handler:接收截图请求struct SnapHandler { stream_manager: Arc<dyn StreamManagerApi>,}
#[async_trait]impl HttpHandler for SnapHandler { async fn handle(&self, req: HttpRequest) -> HttpResponse { // 从路径中提取流名称: /snap/live/camera → live/camera let stream_path = req.path.trim_start_matches('/');
if !self.stream_manager.has_stream(stream_path) { return HttpResponse::not_found().json(&serde_json::json!({ "error": "Stream not found" })); }
// 订阅流,只接收视频,等待一个关键帧 let config = SubscriberConfig { mode: SubscribeMode::WaitKeyframe, receive_video: true, receive_audio: false, keyframe_timeout: Some(Duration::from_secs(5)), ..Default::default() };
match self.stream_manager.subscribe(stream_path, config).await { Ok((_pub_ref, mut subscriber)) => { // 等待第一个关键帧 if subscriber.wait_for_frames().await.is_ok() { if let Ok(Some(frame)) = subscriber.read_video() { if frame.is_keyframe() { let raw_data = frame.get_raw(); // 这里可以进一步解码为图片... return HttpResponse::ok() .content_type("application/octet-stream") .body(raw_data); } } } self.stream_manager.unsubscribe(stream_path, subscriber.as_mut()); HttpResponse::internal_error().text("Failed to capture frame") } Err(e) => { HttpResponse::internal_error().text(&format!("Subscribe failed: {}", e)) } } }}
#[async_trait]impl Plugin for SnapPlugin { fn info(&self) -> PluginInfo { PluginInfo { name: "snap", version: "1.0.0", description: "Video snapshot plugin", author: "Monibuca Team", } } fn state(&self) -> PluginState { self.state }
async fn init(&mut self, manager: Arc<dyn StreamManagerApi>, _config: Arc<dyn ConfigProvider>) -> Result<()> { self.stream_manager = Some(manager); self.state = PluginState::Initialized; Ok(()) }
async fn start(&mut self, _cancel: CancellationToken) -> Result<()> { self.state = PluginState::Running; Ok(()) }
async fn stop(&mut self) -> Result<()> { self.state = PluginState::Stopped; Ok(()) }
fn http_routes(&self) -> Vec<HttpRoute> { if let Some(ref sm) = self.stream_manager { vec![HttpRoute { method: HttpMethod::Get, path: "/snap/**".to_string(), handler: Box::new(SnapHandler { stream_manager: sm.clone(), }), }] } else { Vec::new() } }
fn as_any(&self) -> &dyn Any { self } fn as_any_mut(&mut self) -> &mut dyn Any { self }}示例二:流转发插件(发布 + 订阅联动)
Section titled “示例二:流转发插件(发布 + 订阅联动)”将一个流转发到另一个路径(例如添加水印或改变路径):
async fn relay_stream( manager: Arc<dyn StreamManagerApi>, source_path: &str, target_path: &str, cancel: CancellationToken,) -> Result<()> { // 1. 订阅源流 let config = SubscriberConfig::default(); let (source_pub, mut subscriber) = manager.subscribe(source_path, config).await?;
// 2. 创建目标 Publisher let target_pub = manager.create_publisher_for_plugin(target_path, "relay")?;
// 3. 复制编解码配置 { let source = source_pub.read(); let mut target = target_pub.write(); target.init_video_track(); target.init_audio_track();
if let Some(codec) = source.audio_codec() { target.set_audio_codec(codec); } if let Some(seq) = source.video_seq_header() { target.set_video_seq_header(seq); } if let Some(seq) = source.audio_seq_header() { target.set_audio_seq_header(seq); } }
// 4. 转发帧数据 loop { if subscriber.is_stopped() || cancel.is_cancelled() { break; }
while let Ok(Some(frame)) = subscriber.read_video() { let _ = target_pub.read().write_video( frame.video_frame_type, frame.timestamp, // pts frame.timestamp, // dts frame.get_avcc(), ); }
while let Some(frame) = subscriber.read_audio() { let _ = target_pub.read().write_audio( frame.timestamp, frame.get_audio_raw(), ); }
match tokio::time::timeout( Duration::from_millis(10), subscriber.wait_for_frames(), ).await { Ok(Ok(())) => continue, Ok(Err(_)) => break, Err(_) => continue, } }
// 5. 清理 manager.unsubscribe(source_path, subscriber.as_mut()); manager.dispose_stream(target_path); Ok(())}开发最佳实践
Section titled “开发最佳实践”1. 保持 init() 轻量
Section titled “1. 保持 init() 轻量”init() 中只做配置加载和依赖注入。耗时操作(网络监听、文件 I/O)放到 start() 中:
// ✅ 正确async fn init(&mut self, manager: Arc<dyn StreamManagerApi>, config: Arc<dyn ConfigProvider>) -> Result<()> { self.config = load_config(config)?; // 轻量 self.stream_manager = Some(manager); // 存储引用 self.state = PluginState::Initialized; Ok(())}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> { let listener = sdk::create_tcp_listener(self.config.addr)?; // 耗时操作 // 启动 accept loop... Ok(())}
// ❌ 错误:在 init 中做网络操作async fn init(&mut self, ...) -> Result<()> { let listener = sdk::create_tcp_listener(addr)?; // 不应该在这里 Ok(())}2. 正确使用 CancellationToken
Section titled “2. 正确使用 CancellationToken”始终在异步循环中检查 cancel 信号,使用 biased 确保优先响应停机:
loop { tokio::select! { biased; // ← 优先检查第一个分支 _ = cancel.cancelled() => { tracing::info!("Shutting down gracefully"); break; } result = do_work() => { // 处理结果... } }}3. 优雅停机
Section titled “3. 优雅停机”在 stop() 中正确清理所有资源:
async fn stop(&mut self) -> Result<()> { // 1. 停止后台任务 if let Some(handle) = self.server_handle.take() { handle.abort(); }
// 2. 关闭网络连接 // 3. 等待正在进行的操作完成 // 4. 释放资源
self.state = PluginState::Stopped; tracing::info!("Plugin stopped"); Ok(())}同时建议为持有资源的会话实现 Drop:
impl Drop for MySession { fn drop(&mut self) { self.cleanup(); // 确保即使 panic 也能清理 }}4. 结构化日志
Section titled “4. 结构化日志”使用 tracing 进行带上下文的结构化日志:
use tracing::{info, warn, error, debug, instrument};
#[instrument(skip(stream), fields(session_id = %session_id))]async fn handle_session(stream: TcpStream, session_id: u64) { info!("Session started"); // 后续所有日志自动包含 session_id 字段
if let Err(e) = process(stream).await { error!(error = %e, "Session error"); } info!("Session ended");}5. 错误传播而非 panic
Section titled “5. 错误传播而非 panic”插件代码中应避免 unwrap() 和 panic!,使用 Result 传播错误:
// ✅ 正确let addr: SocketAddr = config.addr.parse().map_err(|e| MonibucaError::Config(format!("Invalid address: {}", e)))?;
// ❌ 避免let addr: SocketAddr = config.addr.parse().unwrap();6. 使用 TaskWork 管理子任务
Section titled “6. 使用 TaskWork 管理子任务”通过 TaskWork 注册子任务,让引擎可以监控和统一管理:
// ✅ 使用 TaskWorkif let Some(ref tw) = self.task_work { tw.register_child("TCPListener", Box::pin(listener_task));}
// 较弱的替代方案(不受引擎管理)tokio::spawn(listener_task);7. 配置热更新支持
Section titled “7. 配置热更新支持”对于需要运行时调整的配置,实现 on_config_update():
fn on_config_update(&mut self, config: &serde_json::Value) -> Result<()> { if let Ok(new_config) = serde_json::from_value::<MyConfig>(config.clone()) { // 只更新可以安全热更新的字段 self.config.max_connections = new_config.max_connections; self.config.timeout = new_config.timeout; // 注意:端口等需要重启的字段不应热更新 tracing::info!("Config updated"); } Ok(())}项目结构参考
Section titled “项目结构参考”一个功能完整的插件推荐以下目录结构:
plugins/my-plugin/├── Cargo.toml├── src/│ ├── lib.rs # 模块导出 + export_plugin!│ ├── plugin.rs # Plugin trait 实现(生命周期)│ ├── config.rs # ConfigSchema 配置定义│ ├── handler.rs # HTTP Handler(如果有 HTTP API)│ ├── session.rs # 连接会话逻辑(如果是 TCP 服务)│ ├── factory.rs # PullerFactory/PusherFactory(如果是协议插件)│ └── error.rs # 插件自定义错误类型(可选)└── tests/ └── integration.rs # 集成测试(可选)- 插件系统 — 理解插件系统的架构设计
- 流管理 — 深入了解 Dispatcher 和 RingBuffer
- SDK API 参考 — 完整的 SDK 类型和接口参考
- HTTP API — Monibuca 内置 HTTP API 文档
联系我们