Skip to content

Plugin Development in Practice

This page walks you through three complete plugin examples of increasing complexity to help you practice the core development patterns of the Monibuca SDK. Each example can be compiled and run directly.

Example 1: Stream Monitor Plugin

A highly practical beginner-level plugin — it monitors the creation and destruction of all streams and provides real-time statistics via an HTTP API.

plugins/monitor/
├── Cargo.toml
└── src/
├── lib.rs
├── config.rs
└── plugin.rs
# plugins/monitor/Cargo.toml
[package]
name = "plugin-monitor"
version = "0.1.0"
edition = "2024"

[dependencies]
sdk = { path = "../../crates/monibuca-sdk" }
async-trait = "0.1"
# src/plugin.rs calls `chrono::Utc::now()` to timestamp event records.
chrono = "0.4"
# src/plugin.rs imports `parking_lot::Mutex` to guard the event history.
parking_lot = "0.12"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
tracing = "0.1"

[lib]
crate-type = ["cdylib", "rlib"]
// Crate root (src/lib.rs): declares the plugin's modules and exports the plugin type.
mod config;
mod plugin;
// Re-export so the plugin type is reachable from the crate root.
pub use plugin::MonitorPlugin;
// Hands `MonitorPlugin` to the SDK's `export_plugin!` macro.
// NOTE(review): presumably this generates the entry point the host loads from the
// cdylib — confirm against the SDK documentation.
sdk::export_plugin!(MonitorPlugin);
use sdk::ConfigSchema;
use serde::{Deserialize, Serialize};
/// Configuration of the monitor plugin, deserialized from the `monitor` plugin
/// config section. Every field has a serde default, so a partial section is valid.
#[derive(Debug, Clone, Serialize, Deserialize, ConfigSchema)]
#[schema(
plugin = "monitor",
title = "Stream Monitor",
description = "Real-time stream monitoring and statistics",
sdk_path = "sdk"
)]
pub struct MonitorConfig {
/// Whether the plugin is active; falls back to `default_enable()` when omitted.
#[serde(default = "default_enable")]
#[schema(label = "Enable", desc = "Whether to enable the monitor plugin")]
pub enable: bool,
/// Maximum number of recent event records to keep; falls back to
/// `default_history_size()` when omitted.
// NOTE(review): `min`/`max` are schema metadata; whether they are enforced at
// deserialization time is not visible here — confirm.
#[serde(default = "default_history_size")]
#[schema(label = "History Size", desc = "Number of recent event records to retain",
min = 10, max = 10000)]
pub history_size: usize,
}
/// Serde fallback for `MonitorConfig::enable`: the plugin is on unless configured otherwise.
fn default_enable() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
/// Serde fallback for `MonitorConfig::history_size`: keep the last 100 events.
fn default_history_size() -> usize {
    const DEFAULT_HISTORY_SIZE: usize = 100;
    DEFAULT_HISTORY_SIZE
}
impl Default for MonitorConfig {
    /// Builds the configuration from the same per-field defaults serde uses,
    /// so an absent config section and an empty one behave identically.
    fn default() -> Self {
        let enable = default_enable();
        let history_size = default_history_size();
        Self {
            enable,
            history_size,
        }
    }
}
use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use sdk::prelude::*;
use parking_lot::Mutex;
use crate::config::MonitorConfig;
/// Stream event record kept in the monitor's event history and serialized
/// as-is into the `events` HTTP response.
#[derive(Debug, Clone, serde::Serialize)]
struct EventRecord {
/// UTC time at which the record was created, formatted as RFC 3339.
timestamp: String,
/// Event label such as "created", "disposed", "subscriber_added" or "subscriber_removed".
event_type: String,
/// Path of the stream the event refers to.
stream_path: String,
}
/// Shared statistics data, held in an `Arc` by both the event listener task
/// and the HTTP handler.
struct MonitorStats {
/// Number of stream-created events seen since the plugin was initialized.
total_created: AtomicU64,
/// Number of stream-disposed events seen since the plugin was initialized.
total_disposed: AtomicU64,
/// Number of subscriber-added events seen; it is never decremented when a
/// subscriber is removed, so this is a running total, not a live count.
total_subscribers: AtomicU64,
/// Most recent events, oldest first.
event_history: Mutex<VecDeque<EventRecord>>,
/// Upper bound on the number of records kept in `event_history`.
history_size: usize,
}
impl MonitorStats {
    /// Creates zeroed counters and an empty event history that retains at most
    /// `history_size` records.
    fn new(history_size: usize) -> Self {
        Self {
            total_created: AtomicU64::new(0),
            total_disposed: AtomicU64::new(0),
            total_subscribers: AtomicU64::new(0),
            event_history: Mutex::new(VecDeque::with_capacity(history_size)),
            history_size,
        }
    }

    /// Appends an event stamped with the current UTC time, evicting the oldest
    /// record(s) so the history never exceeds `history_size` entries.
    fn record_event(&self, event_type: &str, stream_path: &str) {
        // A zero-sized history retains nothing. Without this guard the
        // evict-then-push sequence below would still keep one record.
        if self.history_size == 0 {
            return;
        }
        // Build the record before taking the lock to keep the critical section short.
        let record = EventRecord {
            timestamp: chrono::Utc::now().to_rfc3339(),
            event_type: event_type.to_string(),
            stream_path: stream_path.to_string(),
        };
        let mut history = self.event_history.lock();
        // Make room for the new record; oldest entries sit at the front.
        while history.len() >= self.history_size {
            history.pop_front();
        }
        history.push_back(record);
    }
}
/// Stream monitor plugin: counts stream events and serves them over HTTP.
pub struct MonitorPlugin {
/// Current lifecycle state, returned by `state()`.
state: PluginState,
/// Effective configuration; defaults until `init` loads the `monitor` section.
config: MonitorConfig,
/// Stream manager handed over in `init`; stays `None` when the plugin is disabled.
stream_manager: Option<Arc<dyn StreamManagerApi>>,
/// Shared statistics created in `init`; stays `None` when the plugin is disabled.
stats: Option<Arc<MonitorStats>>,
}
impl Default for MonitorPlugin {
    /// A freshly created plugin: default configuration, no stream manager and
    /// no statistics until `init` runs.
    fn default() -> Self {
        let config = MonitorConfig::default();
        Self {
            stats: None,
            stream_manager: None,
            config,
            state: PluginState::Created,
        }
    }
}
#[async_trait]
impl Plugin for MonitorPlugin {
fn info(&self) -> PluginInfo {
PluginInfo {
name: "monitor",
version: "1.0.0",
description: "Real-time stream monitoring",
author: "Monibuca Team",
}
}
fn state(&self) -> PluginState { self.state }
async fn init(
&mut self,
manager: Arc<dyn StreamManagerApi>,
config: Arc<dyn ConfigProvider>,
) -> Result<()> {
// Load configuration
self.config = config
.get_plugin_config("monitor")
.and_then(|v| serde_json::from_value(v).ok())
.unwrap_or_default();
if !self.config.enable {
self.state = PluginState::Disabled;
return Ok(());
}
self.stream_manager = Some(manager);
self.stats = Some(Arc::new(MonitorStats::new(self.config.history_size)));
self.state = PluginState::Initialized;
tracing::info!("Monitor plugin initialized, history_size={}",
self.config.history_size);
Ok(())
}
async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
if self.state == PluginState::Disabled {
return Ok(());
}
let manager = self.stream_manager.clone().unwrap();
let stats = self.stats.clone().unwrap();
// Start event listener coroutine
let mut event_rx = manager.subscribe_events();
tokio::spawn(async move {
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
tracing::info!("Monitor event listener stopped");
break;
}
event = event_rx.recv() => {
match event {
Ok(StreamEvent::Created(path)) => {
stats.total_created.fetch_add(1, Ordering::Relaxed);
stats.record_event("created", &path);
tracing::info!("[Monitor] Stream created: {}", path);
}
Ok(StreamEvent::Disposed(path)) => {
stats.total_disposed.fetch_add(1, Ordering::Relaxed);
stats.record_event("disposed", &path);
tracing::info!("[Monitor] Stream disposed: {}", path);
}
Ok(StreamEvent::SubscriberAdded { stream, .. }) => {
stats.total_subscribers.fetch_add(1, Ordering::Relaxed);
stats.record_event("subscriber_added", &stream);
}
Ok(StreamEvent::SubscriberRemoved { stream, .. }) => {
stats.record_event("subscriber_removed", &stream);
}
Err(_) => break,
}
}
}
}
});
self.state = PluginState::Running;
tracing::info!("Monitor plugin started");
Ok(())
}
async fn stop(&mut self) -> Result<()> {
self.state = PluginState::Stopped;
Ok(())
}
/// Register HTTP API
fn http_routes(&self) -> Vec<HttpRoute> {
if let (Some(sm), Some(stats)) = (&self.stream_manager, &self.stats) {
let handler = MonitorHandler {
stream_manager: sm.clone(),
stats: stats.clone(),
};
vec![
HttpRoute {
method: HttpMethod::Get,
path: "/monitor/**".to_string(),
handler: Box::new(handler),
},
]
} else {
Vec::new()
}
}
fn as_any(&self) -> &dyn Any { self }
fn as_any_mut(&mut self) -> &mut dyn Any { self }
}
/// HTTP API Handler for the `/monitor/**` route registered by `MonitorPlugin::http_routes`.
struct MonitorHandler {
/// Queried for the list of currently active stream paths.
stream_manager: Arc<dyn StreamManagerApi>,
/// Counters and event history shared with the event listener task.
stats: Arc<MonitorStats>,
}
#[async_trait]
impl HttpHandler for MonitorHandler {
async fn handle(&self, req: HttpRequest) -> HttpResponse {
let path = req.path.trim_start_matches('/');
let parts: Vec<&str> = path.split('/').collect();
let action = parts.first().copied().unwrap_or("summary");
match action {
// GET /monitor/summary — Get statistics summary
"summary" => {
let active_streams = self.stream_manager.stream_paths();
HttpResponse::ok().json(&serde_json::json!({
"code": 0,
"data": {
"active_streams": active_streams.len(),
"stream_list": active_streams,
"total_created": self.stats.total_created
.load(Ordering::Relaxed),
"total_disposed": self.stats.total_disposed
.load(Ordering::Relaxed),
"total_subscribers": self.stats.total_subscribers
.load(Ordering::Relaxed),
}
}))
}
// GET /monitor/events — Get recent event records
"events" => {
let history = self.stats.event_history.lock();
let events: Vec<_> = history.iter().cloned().collect();
HttpResponse::ok().json(&serde_json::json!({
"code": 0,
"count": events.len(),
"events": events,
}))
}
_ => HttpResponse::not_found().json(&serde_json::json!({
"code": 404,
"message": "Unknown endpoint. Try /monitor/summary or /monitor/events"
})),
}
}
}
Terminal window
# After starting Monibuca, access the API
curl http://localhost:8180/monitor/summary
# {"code":0,"data":{"active_streams":2,"stream_list":["live/test","live/camera"],...}}
curl http://localhost:8180/monitor/events
# {"code":0,"count":5,"events":[{"timestamp":"...","event_type":"created","stream_path":"live/test"},...]}

Example 2: Simple TCP Protocol Server Plugin

Section titled “Example 2: Simple TCP Protocol Server Plugin”

A complete TCP server plugin implementing a simple text protocol — clients send commands via TCP to publish or subscribe to streams.

Simple text protocol, one command per line:

PUBLISH <stream_path> → Start publishing a stream
SUBSCRIBE <stream_path> → Start subscribing to a stream
DATA <hex_bytes> → Send a data frame (publish mode)
STOP → Stop publishing (publish mode)
QUIT → Close the connection
use std::any::Any;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio_util::sync::CancellationToken;
use sdk::prelude::*;
/// Plugin that runs a TCP server speaking the simple line-based text protocol.
pub struct SimpleTcpPlugin {
/// Current lifecycle state, returned by `state()`.
state: PluginState,
/// Address the TCP listener binds to; defaults to 0.0.0.0:9090 and can be
/// overridden by the `listen_addr` config key in `init`.
listen_addr: SocketAddr,
/// Stream manager handed over in `init`.
stream_manager: Option<Arc<dyn StreamManagerApi>>,
/// Optional task handle provided through `set_task_work`; when present, the
/// accept loop and per-connection child tasks are registered with it.
task_work: Option<Arc<dyn TaskWork>>,
}
impl Default for SimpleTcpPlugin {
fn default() -> Self {
Self {
state: PluginState::Created,
listen_addr: "0.0.0.0:9090".parse().unwrap(),
stream_manager: None,
task_work: None,
}
}
}
// Plugin lifecycle for the simple TCP text-protocol server: `init` resolves the
// listen address, `start` binds the listener and runs the accept loop.
#[async_trait]
impl Plugin for SimpleTcpPlugin {
    /// Static metadata describing this plugin.
    fn info(&self) -> PluginInfo {
        PluginInfo {
            name: "simple-tcp",
            version: "0.1.0",
            description: "Simple TCP protocol plugin",
            author: "Tutorial",
        }
    }

    /// Current lifecycle state.
    fn state(&self) -> PluginState {
        self.state
    }

    /// Store TaskWork reference so `start` can register the accept loop and
    /// per-connection child tasks with it.
    fn set_task_work(&mut self, work: Arc<dyn TaskWork>) {
        self.task_work = Some(work);
    }

    /// Stores the stream manager and reads the optional `listen_addr` setting
    /// from the `simple-tcp` config section.
    async fn init(
        &mut self,
        manager: Arc<dyn StreamManagerApi>,
        config: Arc<dyn ConfigProvider>,
    ) -> Result<()> {
        self.stream_manager = Some(manager);
        // Parse listen address
        if let Some(cfg) = config.get_plugin_config("simple-tcp") {
            if let Some(addr) = cfg.get("listen_addr").and_then(|v| v.as_str()) {
                // ":9090" is shorthand for "all interfaces on that port".
                let addr_str = if addr.starts_with(':') {
                    format!("0.0.0.0{}", addr)
                } else {
                    addr.to_string()
                };
                // Keep the current address on a bad value, but say so instead of
                // silently ignoring the operator's setting.
                match addr_str.parse::<SocketAddr>() {
                    Ok(parsed) => self.listen_addr = parsed,
                    Err(err) => tracing::warn!(
                        "Invalid listen_addr {:?} ({}), keeping {}",
                        addr,
                        err,
                        self.listen_addr
                    ),
                }
            }
        }
        self.state = PluginState::Initialized;
        Ok(())
    }

    /// Binds the TCP listener and launches the accept loop, which spawns one
    /// session task per connection until `cancel` fires.
    ///
    /// # Errors
    /// Returns a plugin error if called before `init`, or an I/O error if the
    /// listener cannot be created.
    async fn start(&mut self, cancel: CancellationToken) -> Result<()> {
        let addr = self.listen_addr;
        // Populated by `init`; fail with an error rather than panicking if the
        // host calls `start` out of order.
        let manager = self.stream_manager.clone().ok_or_else(|| {
            MonibucaError::Plugin("simple-tcp plugin started before init".to_string())
        })?;
        let task_work = self.task_work.clone();
        let session_counter = Arc::new(AtomicU64::new(0));
        // Create TCP listener
        let listener = sdk::create_tcp_listener(addr).map_err(|e| {
            MonibucaError::Io(std::io::Error::new(
                std::io::ErrorKind::AddrInUse,
                format!("Failed to bind {}: {}", addr, e),
            ))
        })?;
        tracing::info!("Simple TCP server listening on {}", addr);
        // TCP accept main loop
        let accept_loop = async move {
            loop {
                tokio::select! {
                    // Poll cancellation first so shutdown wins over new connections.
                    biased;
                    _ = cancel.cancelled() => {
                        tracing::info!("TCP server shutting down");
                        break;
                    }
                    result = listener.accept() => {
                        match result {
                            Ok((stream, peer_addr)) => {
                                let session_id = session_counter
                                    .fetch_add(1, Ordering::Relaxed);
                                let manager = manager.clone();
                                let tw = task_work.clone();
                                // Create a child task for each connection when a
                                // TaskWork is available; otherwise derive the
                                // session token from the plugin's own token.
                                let (session_cancel, child) =
                                    if let Some(ref tw) = tw {
                                        let child = tw.create_child_task(
                                            &format!("TCP:{}", peer_addr)
                                        );
                                        child.start();
                                        let token = child.cancellation_token();
                                        (token, Some(child))
                                    } else {
                                        (cancel.child_token(), None)
                                    };
                                tokio::spawn(async move {
                                    tracing::info!(
                                        "Session {} from {}",
                                        session_id, peer_addr
                                    );
                                    if let Err(e) = handle_session(
                                        stream, session_id,
                                        manager, session_cancel,
                                    ).await {
                                        tracing::warn!(
                                            "Session {} error: {}",
                                            session_id, e
                                        );
                                    }
                                    // Clean up child task
                                    if let (Some(child), Some(tw)) = (child, tw) {
                                        child.stop();
                                        tw.remove_child_task(child.id());
                                    }
                                    tracing::info!(
                                        "Session {} ended", session_id
                                    );
                                });
                            }
                            Err(e) => {
                                tracing::error!("Accept error: {}", e);
                                // A persistent failure (for example running out of
                                // file descriptors) would otherwise make this loop
                                // spin at full speed; pause briefly before retrying.
                                tokio::time::sleep(Duration::from_millis(100)).await;
                            }
                        }
                    }
                }
            }
        };
        // Register with TaskWork
        if let Some(ref tw) = self.task_work {
            tw.register_child("TCPListener", Box::pin(accept_loop));
        } else {
            tokio::spawn(accept_loop);
        }
        self.state = PluginState::Running;
        Ok(())
    }

    /// Marks the plugin as stopped. The accept loop and sessions end through
    /// the cancellation token passed to `start`.
    async fn stop(&mut self) -> Result<()> {
        self.state = PluginState::Stopped;
        tracing::info!("Simple TCP plugin stopped");
        Ok(())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
/// Handle a single TCP session
async fn handle_session(
stream: TcpStream,
session_id: u64,
manager: Arc<dyn StreamManagerApi>,
cancel: CancellationToken,
) -> Result<()> {
let (reader, mut writer) = stream.into_split();
let mut lines = BufReader::new(reader).lines();
writer.write_all(b"WELCOME simple-tcp/1.0\n").await
.map_err(|e| MonibucaError::Io(e))?;
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
writer.write_all(b"BYE server shutting down\n").await.ok();
break;
}
line = lines.next_line() => {
match line {
Ok(Some(line)) => {
let parts: Vec<&str> = line.trim().splitn(2, ' ').collect();
let cmd = parts[0].to_uppercase();
let arg = parts.get(1).copied().unwrap_or("");
match cmd.as_str() {
"PUBLISH" => {
if arg.is_empty() {
writer.write_all(
b"ERR missing stream path\n"
).await.ok();
continue;
}
match do_publish(
&manager, arg, &mut lines,
&mut writer, &cancel,
).await {
Ok(_) => {
writer.write_all(
b"OK publish ended\n"
).await.ok();
}
Err(e) => {
let msg = format!(
"ERR publish failed: {}\n", e
);
writer.write_all(
msg.as_bytes()
).await.ok();
}
}
}
"SUBSCRIBE" => {
if arg.is_empty() {
writer.write_all(
b"ERR missing stream path\n"
).await.ok();
continue;
}
match do_subscribe(
&manager, arg,
&mut writer, &cancel,
).await {
Ok(_) => {
writer.write_all(
b"OK subscribe ended\n"
).await.ok();
}
Err(e) => {
let msg = format!(
"ERR subscribe failed: {}\n", e
);
writer.write_all(
msg.as_bytes()
).await.ok();
}
}
}
"QUIT" => break,
_ => {
writer.write_all(
b"ERR unknown command\n"
).await.ok();
}
}
}
Ok(None) => break, // Connection closed
Err(e) => {
tracing::warn!("Read error: {}", e);
break;
}
}
}
}
}
Ok(())
}
/// Publish stream logic.
///
/// Creates a publisher for `stream_path`, acknowledges it to the client, then
/// turns every `DATA <hex>` line into a video frame until the client sends
/// `STOP`, the connection ends, or `cancel` fires. The stream is disposed on
/// every exit path after the publisher was created.
///
/// # Errors
/// Fails if the publisher cannot be created or the acknowledgement cannot be
/// written to the client.
async fn do_publish(
    manager: &Arc<dyn StreamManagerApi>,
    stream_path: &str,
    lines: &mut tokio::io::Lines<BufReader<tokio::net::tcp::OwnedReadHalf>>,
    writer: &mut tokio::net::tcp::OwnedWriteHalf,
    cancel: &CancellationToken,
) -> Result<()> {
    // Create Publisher
    let publisher = manager.create_publisher_for_plugin(stream_path, "simple-tcp")?;
    {
        // Scope the write guard so it is released before the next await.
        let mut pub_guard = publisher.write();
        pub_guard.init_video_track();
        pub_guard.init_audio_track();
    }
    // The stream exists from here on, so a failed acknowledgement must dispose
    // it before the error is returned; otherwise the stream would leak.
    let ack = format!("OK publishing to {}\n", stream_path);
    let ack_result = writer.write_all(ack.as_bytes()).await;
    if ack_result.is_err() {
        manager.dispose_stream(stream_path);
    }
    ack_result.map_err(MonibucaError::Io)?;

    let mut frame_count: u64 = 0;
    loop {
        tokio::select! {
            biased;
            _ = cancel.cancelled() => break,
            line = lines.next_line() => {
                match line {
                    Ok(Some(line)) => {
                        let line = line.trim();
                        // Command names are case-insensitive, matching the
                        // top-level command parser.
                        if line.eq_ignore_ascii_case("STOP") {
                            break;
                        }
                        let hex_data = match line.split_once(' ') {
                            Some((verb, rest)) if verb.eq_ignore_ascii_case("DATA") => {
                                rest.trim()
                            }
                            // Anything else is ignored while publishing.
                            _ => continue,
                        };
                        // Simple example: write hex data as video frames
                        match hex::decode(hex_data) {
                            Ok(data) => {
                                // Synthetic timing: 33 ms per frame, a keyframe
                                // every 30 frames.
                                let ts = Duration::from_millis(frame_count * 33);
                                let frame_type = if frame_count % 30 == 0 {
                                    VideoFrameType::Keyframe
                                } else {
                                    VideoFrameType::Interframe
                                };
                                // Best effort: a rejected frame does not end the session.
                                let _ = publisher.read().write_video(
                                    frame_type, ts, ts,
                                    Bytes::from(data),
                                );
                                frame_count += 1;
                            }
                            Err(err) => {
                                tracing::warn!(
                                    "Ignoring DATA line with invalid hex on {}: {}",
                                    stream_path, err
                                );
                            }
                        }
                    }
                    Ok(None) => break,
                    Err(_) => break,
                }
            }
        }
    }
    // Cleanup
    manager.dispose_stream(stream_path);
    Ok(())
}
/// Subscribe stream logic.
///
/// Subscribes to `stream_path`, acknowledges it to the client, then writes one
/// `FRAME ...` line per received video/audio frame until the subscriber stops,
/// the client can no longer be written to, or `cancel` fires. The subscriber
/// is unsubscribed on every exit path after the subscription succeeded.
///
/// # Errors
/// Fails if the subscription cannot be established or the acknowledgement
/// cannot be written to the client.
async fn do_subscribe(
    manager: &Arc<dyn StreamManagerApi>,
    stream_path: &str,
    writer: &mut tokio::net::tcp::OwnedWriteHalf,
    cancel: &CancellationToken,
) -> Result<()> {
    let config = SubscriberConfig {
        mode: SubscribeMode::RealTime,
        receive_video: true,
        receive_audio: true,
        keyframe_timeout: Some(Duration::from_secs(10)),
        ..Default::default()
    };
    let (_pub_ref, mut subscriber) = manager.subscribe(stream_path, config).await?;
    // The subscription exists from here on, so a failed acknowledgement must
    // unsubscribe before the error is returned; otherwise it would leak.
    let ack = format!("OK subscribed to {}\n", stream_path);
    let ack_result = writer.write_all(ack.as_bytes()).await;
    if ack_result.is_err() {
        manager.unsubscribe(stream_path, subscriber.as_mut());
    }
    ack_result.map_err(MonibucaError::Io)?;

    'session: loop {
        if subscriber.is_stopped() || cancel.is_cancelled() {
            break;
        }
        // Read and forward video frames
        while let Ok(Some(frame)) = subscriber.read_video() {
            let info = format!(
                "FRAME video ts={} keyframe={} size={}\n",
                frame.timestamp.as_millis(),
                frame.is_keyframe(),
                frame.get_raw().len(),
            );
            // A failed write means the client is gone; end the session quietly.
            if writer.write_all(info.as_bytes()).await.is_err() {
                break 'session;
            }
        }
        // Read and forward audio frames
        while let Some(frame) = subscriber.read_audio() {
            let info = format!(
                "FRAME audio ts={} size={}\n",
                frame.timestamp.as_millis(),
                frame.get_audio_raw().len(),
            );
            if writer.write_all(info.as_bytes()).await.is_err() {
                break 'session;
            }
        }
        // Wait for new frames. The 100 ms timeout bounds how long a stop or a
        // cancellation can go unnoticed, since both are checked at the loop top.
        match tokio::time::timeout(
            Duration::from_millis(100),
            subscriber.wait_for_frames(),
        ).await {
            Ok(Ok(())) => continue,
            Ok(Err(_)) => break,
            Err(_) => continue,
        }
    }
    // Single cleanup point for every way out of the loop.
    manager.unsubscribe(stream_path, subscriber.as_mut());
    Ok(())
}

Example 3: Protocol Pull Stream Factory Plugin

Section titled “Example 3: Protocol Pull Stream Factory Plugin”

Implement a PullerFactory so that other plugins (such as the cluster plugin) can pull streams from remote sources via your protocol.

use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use sdk::prelude::*;
use sdk::services::{PullerFactory, StreamPuller};
// ════════════════════════════════════════════════════════════════
// Plugin Main Body
// ════════════════════════════════════════════════════════════════
/// Plugin that contributes a `myproto://` pull stream factory to the engine.
pub struct MyProtocolPlugin {
/// Current lifecycle state, returned by `state()`.
state: PluginState,
/// Stream manager handed over in `init`.
stream_manager: Option<Arc<dyn StreamManagerApi>>,
/// Engine context received through `set_engine_context`; `init` uses it to
/// reach the service registry.
engine_context: Option<EngineContext>,
}
impl Default for MyProtocolPlugin {
    /// A freshly created plugin with no stream manager and no engine context yet.
    fn default() -> Self {
        let state = PluginState::Created;
        Self {
            engine_context: None,
            stream_manager: None,
            state,
        }
    }
}
// Plugin lifecycle for the custom protocol plugin. Its only job is to register
// the `myproto://` puller factory during `init`.
#[async_trait]
impl Plugin for MyProtocolPlugin {
    /// Static metadata describing this plugin.
    fn info(&self) -> PluginInfo {
        PluginInfo {
            name: "myproto",
            version: "0.1.0",
            description: "My custom protocol plugin",
            author: "Tutorial",
        }
    }

    /// Current lifecycle state.
    fn state(&self) -> PluginState {
        self.state
    }

    /// Receive EngineContext, obtain ServiceRegistry. `init` needs the context,
    /// so the factory is only registered if this ran beforehand.
    fn set_engine_context(&mut self, ctx: EngineContext) {
        self.engine_context = Some(ctx);
    }

    /// Stores the stream manager and registers the pull stream factory.
    ///
    /// Initialization still succeeds when no engine context or service registry
    /// is available, but a warning is logged because `myproto://` URLs will not
    /// be pullable in that case.
    async fn init(
        &mut self,
        manager: Arc<dyn StreamManagerApi>,
        _config: Arc<dyn ConfigProvider>,
    ) -> Result<()> {
        self.stream_manager = Some(manager.clone());
        // Register pull stream factory with ServiceRegistry
        if let Some(ref ctx) = self.engine_context {
            if let Some(registry) = ctx.service_registry() {
                registry.register_puller_factory(Arc::new(MyProtoPullerFactory {
                    default_manager: manager,
                }));
                tracing::info!("Registered myproto:// puller factory");
            } else {
                tracing::warn!(
                    "Service registry unavailable; myproto:// puller factory not registered"
                );
            }
        } else {
            tracing::warn!(
                "Engine context not set before init; myproto:// puller factory not registered"
            );
        }
        self.state = PluginState::Initialized;
        Ok(())
    }

    /// Nothing to launch: pullers are created on demand through the factory.
    async fn start(&mut self, _cancel: CancellationToken) -> Result<()> {
        self.state = PluginState::Running;
        Ok(())
    }

    /// Marks the plugin as stopped.
    async fn stop(&mut self) -> Result<()> {
        self.state = PluginState::Stopped;
        Ok(())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
// ════════════════════════════════════════════════════════════════
// PullerFactory Implementation
// ════════════════════════════════════════════════════════════════
/// Factory that creates `myproto://` pullers; registered with the service
/// registry in `MyProtocolPlugin::init`.
struct MyProtoPullerFactory {
/// Stream manager captured at registration time.
// NOTE(review): not read anywhere in the code shown — `create_puller` uses the
// manager passed by its caller instead. Confirm whether this field is needed.
default_manager: Arc<dyn StreamManagerApi>,
}
#[async_trait]
impl PullerFactory for MyProtoPullerFactory {
    /// Protocol name handled by this factory.
    fn protocol(&self) -> &str {
        "myproto"
    }

    /// Returns true for URLs using the `myproto://` scheme.
    fn supports_url(&self, url: &str) -> bool {
        url.starts_with("myproto://")
    }

    /// Builds a puller for `url` that will publish into `stream_path`.
    ///
    /// Only the `host:port` part of the URL is used as the connect address, so
    /// `myproto://192.168.1.100:9090/live/test` dials `192.168.1.100:9090`.
    /// `config` may carry `timeout_ms` (connect timeout, default 5000).
    ///
    /// # Errors
    /// Returns a plugin error when the URL does not start with `myproto://` or
    /// has no address after the scheme.
    async fn create_puller(
        &self,
        url: &str,
        stream_path: &str,
        config: serde_json::Value,
        manager: Arc<dyn StreamManagerApi>,
        cancel_token: CancellationToken,
    ) -> Result<Arc<tokio::sync::RwLock<Box<dyn StreamPuller>>>> {
        // Parse URL: keep only the authority. Leaving a path, query or fragment
        // in place would hand "host:port/path" to the TCP connect call, which
        // cannot resolve it.
        let remote_addr = url
            .strip_prefix("myproto://")
            .map(|rest| rest.split(['/', '?', '#']).next().unwrap_or(rest))
            .filter(|addr| !addr.is_empty())
            .ok_or_else(|| MonibucaError::Plugin("Invalid URL".to_string()))?;
        // Read optional parameters from JSON config
        let timeout_ms = config
            .get("timeout_ms")
            .and_then(|v| v.as_u64())
            .unwrap_or(5000);
        let puller = MyProtoPuller {
            remote_addr: remote_addr.to_string(),
            stream_path: stream_path.to_string(),
            manager,
            cancel_token,
            timeout: Duration::from_millis(timeout_ms),
            running: false,
        };
        Ok(Arc::new(tokio::sync::RwLock::new(
            Box::new(puller) as Box<dyn StreamPuller>
        )))
    }
}
// ════════════════════════════════════════════════════════════════
// StreamPuller Implementation
// ════════════════════════════════════════════════════════════════
/// One pull session: connects to a remote `myproto` server and republishes
/// what it reads as a local stream.
struct MyProtoPuller {
/// Address passed to `TcpStream::connect`, taken from the URL after `myproto://`.
remote_addr: String,
/// Local stream path the pulled data is published under.
stream_path: String,
/// Stream manager used to create the publisher and dispose the stream.
manager: Arc<dyn StreamManagerApi>,
/// Cancelling this token ends the pull loop; `stop()` cancels it.
cancel_token: CancellationToken,
/// Maximum time to wait for the TCP connection to be established.
timeout: Duration,
/// Whether `start()` is currently in progress; reported by `is_running()`.
running: bool,
}
impl MyProtoPuller {
    /// Connects to the remote server, creates the local publisher and forwards
    /// frames until cancelled or the remote read fails. Once the publisher
    /// exists, the stream is disposed on every exit path.
    async fn pull(&mut self) -> Result<()> {
        // 1. Connect to remote server
        let stream = tokio::time::timeout(
            self.timeout,
            tokio::net::TcpStream::connect(&self.remote_addr),
        )
        .await
        .map_err(|_| MonibucaError::Plugin("Connect timeout".to_string()))?
        .map_err(MonibucaError::Io)?;
        // 2. Create local Publisher
        let publisher = self
            .manager
            .create_publisher_for_plugin(&self.stream_path, "myproto")?;
        {
            // Scope the write guard so it is released before the next await.
            let mut pub_guard = publisher.write();
            pub_guard.init_video_track();
            pub_guard.init_audio_track();
        }
        // 3. Read data from remote and write to Publisher
        let cancel = self.cancel_token.clone();
        // NOTE(review): the write half is kept bound (not dropped) for the
        // duration of the pull, presumably to keep the connection fully open.
        let (reader, _writer) = stream.into_split();
        let mut buf_reader = tokio::io::BufReader::new(reader);
        let outcome: Result<()> = loop {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => break Ok(()),
                // This should be your custom protocol parsing logic
                // Pseudocode for illustration:
                result = read_protocol_frame(&mut buf_reader) => {
                    match result {
                        Ok(frame) => {
                            let written = publisher.read().write_video(
                                frame.frame_type,
                                frame.pts,
                                frame.dts,
                                frame.data,
                            );
                            // Leave the loop instead of returning directly so the
                            // cleanup below still runs.
                            if let Err(e) = written {
                                break Err(e.into());
                            }
                        }
                        // A read failure ends the pull without an error.
                        Err(_) => break Ok(()),
                    }
                }
            }
        };
        // 4. Cleanup
        self.manager.dispose_stream(&self.stream_path);
        outcome
    }
}

#[async_trait]
impl StreamPuller for MyProtoPuller {
    /// Runs the pull until it is cancelled, the remote read fails, or an error
    /// occurs. `is_running()` reports true only while this call is in progress.
    ///
    /// # Errors
    /// Fails on connect timeout, connect I/O error, publisher creation failure,
    /// or a failed frame write.
    async fn start(&mut self) -> Result<()> {
        self.running = true;
        tracing::info!(
            "Pulling from myproto://{} to {}",
            self.remote_addr, self.stream_path
        );
        let outcome = self.pull().await;
        // Reset on every exit path, including early errors, so the flag never
        // stays stuck at true after a failed start.
        self.running = false;
        outcome
    }

    /// Requests the pull loop to end by cancelling the puller's token.
    fn stop(&self) -> Result<()> {
        self.cancel_token.cancel();
        Ok(())
    }

    /// Local stream path this puller publishes under.
    fn stream_path(&self) -> &str {
        &self.stream_path
    }

    /// Whether `start()` is currently in progress.
    fn is_running(&self) -> bool {
        self.running
    }
}

How Other Plugins Use Your Pull Stream Factory

Section titled “How Other Plugins Use Your Pull Stream Factory”
// In any other plugin (e.g., cluster plugin):
// `engine_context`, `stream_manager` and `cancel_token` are values the calling
// plugin is assumed to hold already; the `?` operators require an enclosing
// async function that returns `Result`.
if let Some(registry) = engine_context.service_registry() {
// Automatically match factory by URL
// NOTE(review): presumably resolved through each factory's `supports_url` — confirm.
if let Some(factory) = registry.get_puller_factory_for_url("myproto://192.168.1.100:9090") {
// Arguments, in order: source URL, local stream path to publish under, JSON
// options (`timeout_ms` is the connect timeout read by the factory), the
// stream manager, and the token that cancels the pull.
let puller = factory.create_puller(
"myproto://192.168.1.100:9090/live/test",
"live/remote-test",
serde_json::json!({"timeout_ms": 3000}),
stream_manager.clone(),
cancel_token.clone(),
).await?;
// Start pulling
// `start()` returns only once pulling has ended, and the write guard is held
// for that whole time; end the pull by cancelling `cancel_token` rather than
// by trying to lock the puller again.
puller.write().await.start().await?;
}
}

Through these three examples, you have mastered the core patterns of Monibuca plugin development:

| Pattern | Example | Key Technologies |
| --- | --- | --- |
| HTTP API Plugin | Stream Monitor | `http_routes()`, `HttpHandler`, `StreamEvent` |
| TCP Server Plugin | Simple TCP | `create_tcp_listener`, `TaskWork`, Publish/Subscribe |
| Protocol Factory Plugin | Pull Stream Factory | `EngineContext`, `ServiceRegistry`, `PullerFactory` |

For more information, see: