跳转到内容

插件开发实战

本页面通过三个由简到繁的完整插件示例,带你实践 Monibuca SDK 的核心开发模式。每个示例都可以直接编译运行。

一个最实用的入门级插件——监控所有流的创建和销毁,并通过 HTTP API 提供实时统计数据。

plugins/monitor/
├── Cargo.toml
└── src/
├── lib.rs
├── config.rs
└── plugin.rs
[package]
name = "plugin-monitor"
version = "0.1.0"
edition = "2024"
[dependencies]
sdk = { path = "../../crates/monibuca-sdk" }
async-trait = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
tracing = "0.1"
[lib]
crate-type = ["cdylib", "rlib"]
mod config;
mod plugin;
pub use plugin::MonitorPlugin;
sdk::export_plugin!(MonitorPlugin);
use sdk::ConfigSchema;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]
#[schema(
plugin = "monitor",
title = "Stream Monitor",
description = "Real-time stream monitoring and statistics",
sdk_path = "sdk"
)]
pub struct MonitorConfig {
#[serde(default = "default_enable")]
#[schema(label = "启用", desc = "是否启用监控插件")]
pub enable: bool,
#[serde(default = "default_history_size")]
#[schema(label = "历史记录数", desc = "保留最近的事件记录条数",
min = 10, max = 10000)]
pub history_size: usize,
}
fn default_enable() -> bool { true }
fn default_history_size() -> usize { 100 }
impl Default for MonitorConfig {
fn default() -> Self {
Self {
enable: default_enable(),
history_size: default_history_size(),
}
}
}
use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use sdk::prelude::*;
use parking_lot::Mutex;
use crate::config::MonitorConfig;
/// 流事件记录
#[derive(Debug, Clone, serde::Serialize)]
struct EventRecord {
timestamp: String,
event_type: String,
stream_path: String,
}
/// 共享统计数据
struct MonitorStats {
total_created: AtomicU64,
total_disposed: AtomicU64,
total_subscribers: AtomicU64,
event_history: Mutex<VecDeque<EventRecord>>,
history_size: usize,
}
impl MonitorStats {
fn new(history_size: usize) -> Self {
Self {
total_created: AtomicU64::new(0),
total_disposed: AtomicU64::new(0),
total_subscribers: AtomicU64::new(0),
event_history: Mutex::new(VecDeque::with_capacity(history_size)),
history_size,
}
}
fn record_event(&self, event_type: &str, stream_path: &str) {
let record = EventRecord {
timestamp: chrono::Utc::now().to_rfc3339(),
event_type: event_type.to_string(),
stream_path: stream_path.to_string(),
};
let mut history = self.event_history.lock();
if history.len() >= self.history_size {
history.pop_front();
}
history.push_back(record);
}
}
pub struct MonitorPlugin {
state: PluginState,
config: MonitorConfig,
stream_manager: Option<Arc<dyn StreamManagerApi>>,
stats: Option<Arc<MonitorStats>>,
}
impl Default for MonitorPlugin {
fn default() -> Self {
Self {
state: PluginState::Created,
config: MonitorConfig::default(),
stream_manager: None,
stats: None,
}
}
}
#[async_trait]
impl Plugin for MonitorPlugin {
fn info(&self) -> PluginInfo {
PluginInfo {
name: "monitor",
version: "1.0.0",
description: "Real-time stream monitoring",
author: "Monibuca Team",
}
}
fn state(&self) -> PluginState { self.state }
async fn init(
&mut self,
manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>,
) -> Result<()> {
// 加载配置
self.config = config
.get_plugin_config("monitor")
.and_then(|v| serde_json::from_value(v).ok())
.unwrap_or_default();
if !self.config.enable {
self.state = PluginState::Disabled;
return Ok(());
}
self.stream_manager = Some(manager);
self.stats = Some(Arc::new(MonitorStats::new(self.config.history_size)));
self.state = PluginState::Initialized;
tracing::info!("Monitor plugin initialized, history_size={}",
self.config.history_size);
Ok(())
}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
if self.state == PluginState::Disabled {
return Ok(());
}
let manager = self.stream_manager.clone().unwrap();
let stats = self.stats.clone().unwrap();
// 启动事件监听协程
let mut event_rx = manager.subscribe_events();
tokio::spawn(async move {
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::info!("Monitor event listener stopped");
break;
}
event = event_rx.recv() => {
match event {
Ok(StreamEvent::Created(path)) => {
stats.total_created.fetch_add(1, Ordering::Relaxed);
stats.record_event("created", &path);
tracing::info!("[Monitor] Stream created: {}", path);
}
Ok(StreamEvent::Disposed(path)) => {
stats.total_disposed.fetch_add(1, Ordering::Relaxed);
stats.record_event("disposed", &path);
tracing::info!("[Monitor] Stream disposed: {}", path);
}
Ok(StreamEvent::SubscriberAdded { stream, .. }) => {
stats.total_subscribers.fetch_add(1, Ordering::Relaxed);
stats.record_event("subscriber_added", &stream);
}
Ok(StreamEvent::SubscriberRemoved { stream, .. }) => {
stats.record_event("subscriber_removed", &stream);
}
Err(_) => break,
}
}
}
}
});
self.state = PluginState::Running;
tracing::info!("Monitor plugin started");
Ok(())
}
async fn stop(&mut self) -> Result<()> {
self.state = PluginState::Stopped;
Ok(())
}
/// 注册 HTTP API
fn http_routes(&self) -> Vec<HttpRoute> {
if let (Some(sm), Some(stats)) = (&self.stream_manager, &self.stats) {
let handler = MonitorHandler {
stream_manager: sm.clone(),
stats: stats.clone(),
};
vec![
HttpRoute {
method: HttpMethod::Get,
path: "/monitor/**".to_string(),
handler: Box::new(handler),
},
]
} else {
Vec::new()
}
}
fn as_any(&self) -> &dyn Any { self }
fn as_any_mut(&mut self) -> &mut dyn Any { self }
}
/// HTTP API Handler
struct MonitorHandler {
stream_manager: Arc<dyn StreamManagerApi>,
stats: Arc<MonitorStats>,
}
#[async_trait]
impl HttpHandler for MonitorHandler {
async fn handle(&self, req: HttpRequest) -> HttpResponse {
let path = req.path.trim_start_matches('/');
let parts: Vec<&str> = path.split('/').collect();
let action = parts.first().copied().unwrap_or("summary");
match action {
// GET /monitor/summary — 获取统计概要
"summary" => {
let active_streams = self.stream_manager.stream_paths();
HttpResponse::ok().json(&serde_json::json!({
"code": 0,
"data": {
"active_streams": active_streams.len(),
"stream_list": active_streams,
"total_created": self.stats.total_created
.load(Ordering::Relaxed),
"total_disposed": self.stats.total_disposed
.load(Ordering::Relaxed),
"total_subscribers": self.stats.total_subscribers
.load(Ordering::Relaxed),
}
}))
}
// GET /monitor/events — 获取最近事件记录
"events" => {
let history = self.stats.event_history.lock();
let events: Vec<_> = history.iter().cloned().collect();
HttpResponse::ok().json(&serde_json::json!({
"code": 0,
"count": events.len(),
"events": events,
}))
}
_ => HttpResponse::not_found().json(&serde_json::json!({
"code": 404,
"message": "Unknown endpoint. Try /monitor/summary or /monitor/events"
})),
}
}
}
Terminal window
# 启动 Monibuca 后,访问 API
curl http://localhost:8180/monitor/summary
# {"code":0,"data":{"active_streams":2,"stream_list":["live/test","live/camera"],...}}
curl http://localhost:8180/monitor/events
# {"code":0,"count":5,"events":[{"timestamp":"...","event_type":"created","stream_path":"live/test"},...]}

示例二:简易 TCP 协议服务器插件

Section titled “示例二:简易 TCP 协议服务器插件”

一个完整的 TCP 服务器插件,实现简单的文本协议——客户端通过 TCP 发送命令来发布或订阅流。

简单文本协议,每行一个命令:

PUBLISH <stream_path> → 开始发布流
SUBSCRIBE <stream_path> → 开始订阅流
DATA <hex_bytes> → 发送数据帧(发布模式)
STOP → 停止当前操作
use std::any::Any;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio_util::sync::CancellationToken;
use sdk::prelude::*;
pub struct SimpleTcpPlugin {
state: PluginState,
listen_addr: SocketAddr,
stream_manager: Option<Arc<dyn StreamManagerApi>>,
task_work: Option<Arc<dyn TaskWork>>,
}
impl Default for SimpleTcpPlugin {
fn default() -> Self {
Self {
state: PluginState::Created,
listen_addr: "0.0.0.0:9090".parse().unwrap(),
stream_manager: None,
task_work: None,
}
}
}
#[async_trait]
impl Plugin for SimpleTcpPlugin {
fn info(&self) -> PluginInfo {
PluginInfo {
name: "simple-tcp",
version: "0.1.0",
description: "Simple TCP protocol plugin",
author: "Tutorial",
}
}
fn state(&self) -> PluginState { self.state }
// 存储 TaskWork 引用
fn set_task_work(&mut self, work: Arc<dyn TaskWork>) {
self.task_work = Some(work);
}
async fn init(
&mut self,
manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>,
) -> Result<()> {
self.stream_manager = Some(manager);
// 解析监听地址
if let Some(cfg) = config.get_plugin_config("simple-tcp") {
if let Some(addr) = cfg.get("listen_addr").and_then(|v| v.as_str()) {
let addr_str = if addr.starts_with(':') {
format!("0.0.0.0{}", addr)
} else {
addr.to_string()
};
self.listen_addr = addr_str.parse().unwrap_or(self.listen_addr);
}
}
self.state = PluginState::Initialized;
Ok(())
}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
let addr = self.listen_addr;
let manager = self.stream_manager.clone().unwrap();
let task_work = self.task_work.clone();
let session_counter = Arc::new(AtomicU64::new(0));
// 创建 TCP 监听器
let listener = sdk::create_tcp_listener(addr).map_err(|e| {
MonibucaError::Io(std::io::Error::new(
std::io::ErrorKind::AddrInUse,
format!("Failed to bind {}: {}", addr, e),
))
})?;
tracing::info!("Simple TCP server listening on {}", addr);
// TCP accept 主循环
let accept_loop = async move {
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::info!("TCP server shutting down");
break;
}
result = listener.accept() => {
match result {
Ok((stream, peer_addr)) => {
let session_id = session_counter
.fetch_add(1, Ordering::Relaxed);
let manager = manager.clone();
let tw = task_work.clone();
// 为每个连接创建子任务
let (session_cancel, child) =
if let Some(ref tw) = tw {
let child = tw.create_child_task(
&format!("TCP:{}", peer_addr)
);
child.start();
let token = child.cancellation_token();
(token, Some(child))
} else {
(cancel.child_token(), None)
};
tokio::spawn(async move {
tracing::info!(
"Session {} from {}",
session_id, peer_addr
);
if let Err(e) = handle_session(
stream, session_id,
manager, session_cancel,
).await {
tracing::warn!(
"Session {} error: {}",
session_id, e
);
}
// 清理子任务
if let (Some(child), Some(tw)) = (child, tw) {
child.stop();
tw.remove_child_task(child.id());
}
tracing::info!(
"Session {} ended", session_id
);
});
}
Err(e) => {
tracing::error!("Accept error: {}", e);
}
}
}
}
}
};
// 注册到 TaskWork
if let Some(ref tw) = self.task_work {
tw.register_child("TCPListener", Box::pin(accept_loop));
} else {
tokio::spawn(accept_loop);
}
self.state = PluginState::Running;
Ok(())
}
async fn stop(&mut self) -> Result<()> {
self.state = PluginState::Stopped;
tracing::info!("Simple TCP plugin stopped");
Ok(())
}
fn as_any(&self) -> &dyn Any { self }
fn as_any_mut(&mut self) -> &mut dyn Any { self }
}
/// 处理单个 TCP 会话
async fn handle_session(
stream: TcpStream,
session_id: u64,
manager: Arc<dyn StreamManagerApi>,
cancel: CancellationToken,
) -> Result<()> {
let (reader, mut writer) = stream.into_split();
let mut lines = BufReader::new(reader).lines();
writer.write_all(b"WELCOME simple-tcp/1.0\n").await
.map_err(|e| MonibucaError::Io(e))?;
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
writer.write_all(b"BYE server shutting down\n").await.ok();
break;
}
line = lines.next_line() => {
match line {
Ok(Some(line)) => {
let parts: Vec<&str> = line.trim().splitn(2, ' ').collect();
let cmd = parts[0].to_uppercase();
let arg = parts.get(1).copied().unwrap_or("");
match cmd.as_str() {
"PUBLISH" => {
if arg.is_empty() {
writer.write_all(
b"ERR missing stream path\n"
).await.ok();
continue;
}
match do_publish(
&manager, arg, &mut lines,
&mut writer, &cancel,
).await {
Ok(_) => {
writer.write_all(
b"OK publish ended\n"
).await.ok();
}
Err(e) => {
let msg = format!(
"ERR publish failed: {}\n", e
);
writer.write_all(
msg.as_bytes()
).await.ok();
}
}
}
"SUBSCRIBE" => {
if arg.is_empty() {
writer.write_all(
b"ERR missing stream path\n"
).await.ok();
continue;
}
match do_subscribe(
&manager, arg,
&mut writer, &cancel,
).await {
Ok(_) => {
writer.write_all(
b"OK subscribe ended\n"
).await.ok();
}
Err(e) => {
let msg = format!(
"ERR subscribe failed: {}\n", e
);
writer.write_all(
msg.as_bytes()
).await.ok();
}
}
}
"QUIT" => break,
_ => {
writer.write_all(
b"ERR unknown command\n"
).await.ok();
}
}
}
Ok(None) => break, // 连接关闭
Err(e) => {
tracing::warn!("Read error: {}", e);
break;
}
}
}
}
}
Ok(())
}
/// 发布流逻辑
async fn do_publish(
manager: &Arc<dyn StreamManagerApi>,
stream_path: &str,
lines: &mut tokio::io::Lines<BufReader<tokio::net::tcp::OwnedReadHalf>>,
writer: &mut tokio::net::tcp::OwnedWriteHalf,
cancel: &CancellationToken,
) -> Result<()> {
// 创建 Publisher
let publisher = manager.create_publisher_for_plugin(stream_path, "simple-tcp")?;
{
let mut pub_guard = publisher.write();
pub_guard.init_video_track();
pub_guard.init_audio_track();
}
writer.write_all(
format!("OK publishing to {}\n", stream_path).as_bytes()
).await.map_err(|e| MonibucaError::Io(e))?;
let mut frame_count: u64 = 0;
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => break,
line = lines.next_line() => {
match line {
Ok(Some(line)) => {
let line = line.trim();
if line == "STOP" {
break;
}
if let Some(hex_data) = line.strip_prefix("DATA ") {
// 简单示例:将 hex 数据作为视频帧写入
if let Ok(data) = hex::decode(hex_data) {
let ts = Duration::from_millis(frame_count * 33);
let frame_type = if frame_count % 30 == 0 {
VideoFrameType::Keyframe
} else {
VideoFrameType::Interframe
};
let _ = publisher.read().write_video(
frame_type, ts, ts,
Bytes::from(data),
);
frame_count += 1;
}
}
}
Ok(None) => break,
Err(_) => break,
}
}
}
}
// 清理
manager.dispose_stream(stream_path);
Ok(())
}
/// 订阅流逻辑
async fn do_subscribe(
manager: &Arc<dyn StreamManagerApi>,
stream_path: &str,
writer: &mut tokio::net::tcp::OwnedWriteHalf,
cancel: &CancellationToken,
) -> Result<()> {
let config = SubscriberConfig {
mode: SubscribeMode::RealTime,
receive_video: true,
receive_audio: true,
keyframe_timeout: Some(Duration::from_secs(10)),
..Default::default()
};
let (_pub_ref, mut subscriber) = manager.subscribe(stream_path, config).await?;
writer.write_all(
format!("OK subscribed to {}\n", stream_path).as_bytes()
).await.map_err(|e| MonibucaError::Io(e))?;
loop {
if subscriber.is_stopped() || cancel.is_cancelled() {
break;
}
// 读取并转发视频帧
while let Ok(Some(frame)) = subscriber.read_video() {
let info = format!(
"FRAME video ts={} keyframe={} size={}\n",
frame.timestamp.as_millis(),
frame.is_keyframe(),
frame.get_raw().len(),
);
if writer.write_all(info.as_bytes()).await.is_err() {
manager.unsubscribe(stream_path, subscriber.as_mut());
return Ok(());
}
}
// 读取并转发音频帧
while let Some(frame) = subscriber.read_audio() {
let info = format!(
"FRAME audio ts={} size={}\n",
frame.timestamp.as_millis(),
frame.get_audio_raw().len(),
);
if writer.write_all(info.as_bytes()).await.is_err() {
manager.unsubscribe(stream_path, subscriber.as_mut());
return Ok(());
}
}
// 等待新帧
match tokio::time::timeout(
Duration::from_millis(100),
subscriber.wait_for_frames(),
).await {
Ok(Ok(())) => continue,
Ok(Err(_)) => break,
Err(_) => continue,
}
}
manager.unsubscribe(stream_path, subscriber.as_mut());
Ok(())
}

实现 PullerFactory,让其他插件(如 cluster 集群插件)能通过你的协议从远程拉流。

use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use sdk::prelude::*;
use sdk::services::{PullerFactory, StreamPuller};
// ════════════════════════════════════════════════════════════════
// 插件主体
// ════════════════════════════════════════════════════════════════
pub struct MyProtocolPlugin {
state: PluginState,
stream_manager: Option<Arc<dyn StreamManagerApi>>,
engine_context: Option<EngineContext>,
}
impl Default for MyProtocolPlugin {
fn default() -> Self {
Self {
state: PluginState::Created,
stream_manager: None,
engine_context: None,
}
}
}
#[async_trait]
impl Plugin for MyProtocolPlugin {
fn info(&self) -> PluginInfo {
PluginInfo {
name: "myproto",
version: "0.1.0",
description: "My custom protocol plugin",
author: "Tutorial",
}
}
fn state(&self) -> PluginState { self.state }
// 接收 EngineContext,获取 ServiceRegistry
fn set_engine_context(&mut self, ctx: EngineContext) {
self.engine_context = Some(ctx);
}
async fn init(
&mut self,
manager: Arc<dyn StreamManagerApi>,
_config: Arc<dyn ConfigProvider>,
) -> Result<()> {
self.stream_manager = Some(manager.clone());
// 向 ServiceRegistry 注册拉流工厂
if let Some(ref ctx) = self.engine_context {
if let Some(registry) = ctx.service_registry() {
registry.register_puller_factory(
Arc::new(MyProtoPullerFactory {
default_manager: manager,
})
);
tracing::info!("Registered myproto:// puller factory");
}
}
self.state = PluginState::Initialized;
Ok(())
}
async fn start(&mut self, _cancel: CancellationToken) -> Result<()> {
self.state = PluginState::Running;
Ok(())
}
async fn stop(&mut self) -> Result<()> {
self.state = PluginState::Stopped;
Ok(())
}
fn as_any(&self) -> &dyn Any { self }
fn as_any_mut(&mut self) -> &mut dyn Any { self }
}
// ════════════════════════════════════════════════════════════════
// PullerFactory 实现
// ════════════════════════════════════════════════════════════════
struct MyProtoPullerFactory {
default_manager: Arc<dyn StreamManagerApi>,
}
#[async_trait]
impl PullerFactory for MyProtoPullerFactory {
fn protocol(&self) -> &str {
"myproto"
}
fn supports_url(&self, url: &str) -> bool {
url.starts_with("myproto://")
}
async fn create_puller(
&self,
url: &str,
stream_path: &str,
config: serde_json::Value,
manager: Arc<dyn StreamManagerApi>,
cancel_token: CancellationToken,
) -> Result<Arc<tokio::sync::RwLock<Box<dyn StreamPuller>>>> {
// 解析 URL
let remote_addr = url.strip_prefix("myproto://")
.ok_or_else(|| MonibucaError::Plugin("Invalid URL".to_string()))?;
// 从 JSON config 中读取可选参数
let timeout_ms = config.get("timeout_ms")
.and_then(|v| v.as_u64())
.unwrap_or(5000);
let puller = MyProtoPuller {
remote_addr: remote_addr.to_string(),
stream_path: stream_path.to_string(),
manager,
cancel_token,
timeout: Duration::from_millis(timeout_ms),
running: false,
};
Ok(Arc::new(tokio::sync::RwLock::new(
Box::new(puller) as Box<dyn StreamPuller>
)))
}
}
// ════════════════════════════════════════════════════════════════
// StreamPuller 实现
// ════════════════════════════════════════════════════════════════
struct MyProtoPuller {
remote_addr: String,
stream_path: String,
manager: Arc<dyn StreamManagerApi>,
cancel_token: CancellationToken,
timeout: Duration,
running: bool,
}
#[async_trait]
impl StreamPuller for MyProtoPuller {
async fn start(&mut self) -> Result<()> {
self.running = true;
tracing::info!(
"Pulling from myproto://{} to {}",
self.remote_addr, self.stream_path
);
// 1. 连接远程服务器
let stream = tokio::time::timeout(
self.timeout,
tokio::net::TcpStream::connect(&self.remote_addr),
).await
.map_err(|_| MonibucaError::Plugin("Connect timeout".to_string()))?
.map_err(|e| MonibucaError::Io(e))?;
// 2. 创建本地 Publisher
let publisher = self.manager.create_publisher_for_plugin(
&self.stream_path, "myproto"
)?;
{
let mut pub_guard = publisher.write();
pub_guard.init_video_track();
pub_guard.init_audio_track();
}
// 3. 从远程读取数据并写入 Publisher
let cancel = self.cancel_token.clone();
let (reader, _writer) = stream.into_split();
let mut buf_reader = tokio::io::BufReader::new(reader);
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => break,
// 这里应该是你的自定义协议解析逻辑
// 伪代码示意:
result = read_protocol_frame(&mut buf_reader) => {
match result {
Ok(frame) => {
publisher.read().write_video(
frame.frame_type,
frame.pts,
frame.dts,
frame.data,
)?;
}
Err(_) => break,
}
}
}
}
// 4. 清理
self.manager.dispose_stream(&self.stream_path);
self.running = false;
Ok(())
}
fn stop(&self) -> Result<()> {
self.cancel_token.cancel();
Ok(())
}
fn stream_path(&self) -> &str {
&self.stream_path
}
fn is_running(&self) -> bool {
self.running
}
}

其他插件如何使用你的拉流工厂

Section titled “其他插件如何使用你的拉流工厂”
// 在任何其他插件中(例如 cluster 插件):
if let Some(registry) = engine_context.service_registry() {
// 按 URL 自动匹配工厂
if let Some(factory) = registry.get_puller_factory_for_url("myproto://192.168.1.100:9090") {
let puller = factory.create_puller(
"myproto://192.168.1.100:9090/live/test",
"live/remote-test",
serde_json::json!({"timeout_ms": 3000}),
stream_manager.clone(),
cancel_token.clone(),
).await?;
// 启动拉流
puller.write().await.start().await?;
}
}

通过这三个示例,你已经掌握了 Monibuca 插件开发的核心模式:

模式示例关键技术
HTTP API 插件流监控http_routes(), HttpHandler, StreamEvent
TCP 服务器插件简易 TCPcreate_tcp_listener, TaskWork, 发布/订阅
协议工厂插件拉流工厂EngineContext, ServiceRegistry, PullerFactory

更多信息请参考:

联系我们

微信公众号:不卡科技 微信公众号二维码
腾讯频道:流媒体技术 腾讯频道二维码
QQ 频道:p0qq0crz08 QQ 频道二维码
QQ 群:751639168 QQ 群二维码