Stream Management

StreamManager is the core of stream management in Monibuca V6. It manages the full stream lifecycle: creation, publishing, subscription, and destruction.

┌─────────┐     ┌───────────┐     ┌─────────────┐     ┌──────────┐
│ Create  │────▶│  Publish  │────▶│  Subscribe  │────▶│ Destroy  │
│         │     │           │     │             │     │          │
└─────────┘     └───────────┘     └─────────────┘     └──────────┘
     │                │                  │                 ▲
     │                │                  │                 │
     │                ▼                  ▼                 │
     │           Init Tracks        Read Frames         Dispose
     │           Set Codec          Wait For Frames     Clean Up
                 Write Frames       Unsubscribe         Remove from Registry

Publisher state machine:

Init
  │
  ▼ (init_video_track / init_audio_track)
TrackAdded
  │
  ▼ (add_subscriber)
Subscribed ◀─────────────────────────┐
  │                                  │
  ▼ (all subscribers leave)          │
WaitingSubscriber                    │
  │                                  │
  ├─── New subscriber joins ─────────┘
  │
  ▼ (publisher disconnects, reconnect-on-disconnect enabled)
WaitingReconnect
  │
  ├─── Republish ──▶ Subscribed
  │
  ▼ (timeout)
Disposed

Special states:
  Paused ──── Data timeout detection disabled (VOD/playback scenarios)
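
The state machine above can be written down as a small transition function. This is only a sketch that follows the arrows of the diagram literally: the state names come from the diagram, while the `PublisherEvent` names are hypothetical labels for the arrows, and the `Paused` special state is left out. The real implementation may allow further transitions, such as a disconnect from other states.

```rust
/// Publisher states named in the diagram above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublisherState {
    Init,
    TrackAdded,
    Subscribed,
    WaitingSubscriber,
    WaitingReconnect,
    Disposed,
}

/// Hypothetical event names; they only mirror the arrow labels in the diagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublisherEvent {
    TrackInitialized,
    SubscriberAdded,
    AllSubscribersLeft,
    PublisherDisconnected,
    Republished,
    Timeout,
}

/// Returns the next state, or `None` when the diagram has no such arrow.
pub fn next_state(state: PublisherState, event: PublisherEvent) -> Option<PublisherState> {
    use PublisherEvent::*;
    use PublisherState::*;
    match (state, event) {
        (Init, TrackInitialized) => Some(TrackAdded),
        (TrackAdded, SubscriberAdded) => Some(Subscribed),
        (Subscribed, AllSubscribersLeft) => Some(WaitingSubscriber),
        (WaitingSubscriber, SubscriberAdded) => Some(Subscribed),
        (WaitingSubscriber, PublisherDisconnected) => Some(WaitingReconnect),
        (WaitingReconnect, Republished) => Some(Subscribed),
        (WaitingReconnect, Timeout) => Some(Disposed),
        _ => None,
    }
}
```

Returning `None` for unlisted pairs makes invalid transitions explicit instead of silently ignoring them.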

StreamManager is the unified entry point for stream management, composed of three sub-components:

StreamManager
├── StreamRegistry    # Stream registry (DashMap high-concurrency storage)
├── StreamLifecycle   # Lifecycle management (create/destroy/reconnect-on-disconnect)
└── WaitingQueue      # Wait queue (subscribers waiting for unpublished streams)

impl StreamManager {
    // Create a stream (called by publisher)
    fn create_publisher(stream_path) -> Publisher

    // Get a stream (called by subscriber)
    fn get_publisher(stream_path) -> Option<Publisher>

    // Create with Dispatcher
    fn create_publisher_with_dispatcher(stream_path)
        -> (Publisher, Dispatcher)

    // Event subscription
    fn subscribe_events() -> Receiver<StreamEvent>

    // Stream info queries
    fn stream_exists(stream_path) -> bool
    fn stream_count() -> usize
    fn list_streams() -> Vec<StreamInfo>
}
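
To make the registry part of this API concrete, here is a minimal, self-contained sketch. It is not the Monibuca implementation: a `RwLock<HashMap>` from the standard library stands in for the DashMap mentioned above, `Publisher` is reduced to its path, and rejecting a duplicate path with an error is an assumption.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Stand-in for the real Publisher; only the path is modeled here.
#[derive(Debug)]
pub struct Publisher {
    pub stream_path: String,
}

/// Simplified registry: a `RwLock<HashMap>` replaces the DashMap named above.
#[derive(Default)]
pub struct StreamRegistry {
    streams: RwLock<HashMap<String, Arc<Publisher>>>,
}

impl StreamRegistry {
    /// Registers a new stream; fails if the path is already taken (an assumption).
    pub fn create_publisher(&self, stream_path: &str) -> Result<Arc<Publisher>, String> {
        let mut streams = self.streams.write().unwrap();
        if streams.contains_key(stream_path) {
            return Err(format!("stream already exists: {stream_path}"));
        }
        let publisher = Arc::new(Publisher { stream_path: stream_path.to_string() });
        streams.insert(stream_path.to_string(), Arc::clone(&publisher));
        Ok(publisher)
    }

    /// Looks up a stream by path.
    pub fn get_publisher(&self, stream_path: &str) -> Option<Arc<Publisher>> {
        self.streams.read().unwrap().get(stream_path).cloned()
    }

    /// Reports whether a stream with this path is registered.
    pub fn stream_exists(&self, stream_path: &str) -> bool {
        self.streams.read().unwrap().contains_key(stream_path)
    }

    /// Number of registered streams.
    pub fn stream_count(&self) -> usize {
        self.streams.read().unwrap().len()
    }
}
```

A sharded concurrent map avoids the single global lock used here, which matters when many streams are created and queried concurrently.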

The Publisher manages the ingest side of a stream:

pub struct Publisher {
    stream_path: String,                        // Stream path (e.g., "live/camera1")
    plugin_name: String,                        // Name of the plugin that created this stream
    stream_type: String,                        // Stream type (e.g., "rtmp", "rtsp")
    video_track: Option<Arc<VideoTrack>>,       // Video track
    audio_track: Option<Arc<AudioTrack>>,       // Audio track
    subscribers: DashMap<u64, SubscriberInfo>,  // Subscriber list
    frame_notify: watch::Sender<u64>,           // Frame notification channel
    config: PublisherConfig,                    // Configuration
    task: Option<Arc<Task>>,                    // Associated task (hierarchical cancellation)
}

PublisherConfig {
    video_buffer_capacity: 1024,   // Video buffer size
    audio_buffer_capacity: 64,     // Audio buffer size
    default_buffer_time: 2s,       // Default buffer time
    max_buffer_time: 10s,          // Maximum buffer time
    wait_timeout: 30s,             // No-subscriber wait timeout
    max_fps: 0,                    // Max frame rate (0 = unlimited)
    data_timeout: 60s,             // Data timeout (0 = no timeout)
    continue_push_timeout: 30s,    // Reconnect-on-disconnect timeout
}

// 1. Create Publisher
let publisher = stream_manager.create_publisher("live/stream1")?;

// 2. Initialize tracks
let mut pub_guard = publisher.write();
pub_guard.init_video_track();
pub_guard.init_audio_track();

// 3. Set codec information
pub_guard.set_video_codec(VideoCodec::new(H264, 1920, 1080));
pub_guard.set_audio_codec(AudioCodec::new(AAC, 44100, 2));

// 4. Write frame data (in the publishing loop)
pub_guard.write_video(IDR, timestamp, dts, data)?;
pub_guard.write_audio(timestamp, data)?;
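
The PublisherConfig defaults listed above can be expressed as a `Default` implementation. The field names and values are taken from that listing; the field types (`usize`, `u32`, `Duration`) are assumptions made so the sketch compiles on its own.

```rust
use std::time::Duration;

/// Field names follow the listing above; the types are assumed.
#[derive(Debug, Clone, PartialEq)]
pub struct PublisherConfig {
    pub video_buffer_capacity: usize,
    pub audio_buffer_capacity: usize,
    pub default_buffer_time: Duration,
    pub max_buffer_time: Duration,
    pub wait_timeout: Duration,
    pub max_fps: u32,
    pub data_timeout: Duration,
    pub continue_push_timeout: Duration,
}

impl Default for PublisherConfig {
    fn default() -> Self {
        Self {
            video_buffer_capacity: 1024,
            audio_buffer_capacity: 64,
            default_buffer_time: Duration::from_secs(2),
            max_buffer_time: Duration::from_secs(10),
            wait_timeout: Duration::from_secs(30),
            max_fps: 0, // 0 = unlimited
            data_timeout: Duration::from_secs(60), // 0 = no timeout
            continue_push_timeout: Duration::from_secs(30),
        }
    }
}
```

With this in place, a caller can override a single value using struct update syntax, for example `PublisherConfig { continue_push_timeout: Duration::ZERO, ..PublisherConfig::default() }`.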

Subscribers consume frame data from the Publisher’s RingBuffer:

Mode          Description
RealTime      Start playback from the latest IDR frame (default)
Buffer        Start from an IDR frame before the specified buffer time
WaitKeyframe  Wait for the next keyframe

SubscriberConfig {
    mode: SubscribeMode::RealTime,
    buffer_time: Duration::from_secs(2),
    receive_video: true,
    receive_audio: true,
    keyframe_timeout: Some(Duration::from_secs(30)),
}

// 1. Create Subscriber
let mut subscriber = Subscriber::new("live/stream1");

// 2. Subscribe to Publisher
let publisher = stream_manager.get_publisher("live/stream1")?;
subscriber.subscribe(&publisher)?;

// 3. Read frame data (in the playback loop)
loop {
    subscriber.wait_for_frames().await?;
    while let Ok(Some(frame)) = subscriber.read_video() {
        // Process video frame
    }
    while let Some(frame) = subscriber.read_audio() {
        // Process audio frame
    }
}

// 4. Unsubscribe
subscriber.unsubscribe(&publisher);
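
The difference between the three subscribe modes comes down to where reading starts in the buffered frames. The sketch below illustrates one plausible reading of the mode table: `FrameMeta` and `start_index` are hypothetical names, the buffer is modeled as a plain slice ordered oldest first, and Buffer mode is interpreted as "the latest IDR frame that is at least `buffer_time` older than the newest frame".

```rust
use std::time::Duration;

/// Hypothetical per-frame metadata used only for this sketch.
#[derive(Debug, Clone, Copy)]
pub struct FrameMeta {
    pub timestamp: Duration,
    pub is_idr: bool,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeMode {
    RealTime,
    Buffer,
    WaitKeyframe,
}

/// Picks the index of the first frame to deliver from `frames` (oldest first).
/// Returns `None` when the subscriber has to wait for a future keyframe.
pub fn start_index(
    frames: &[FrameMeta],
    mode: SubscribeMode,
    buffer_time: Duration,
) -> Option<usize> {
    match mode {
        // Latest IDR frame currently buffered.
        SubscribeMode::RealTime => frames.iter().rposition(|f| f.is_idr),
        // Latest IDR frame that is at least `buffer_time` older than the newest frame.
        SubscribeMode::Buffer => {
            let newest = frames.last()?.timestamp;
            let cutoff = newest.saturating_sub(buffer_time);
            frames.iter().rposition(|f| f.is_idr && f.timestamp <= cutoff)
        }
        // Buffered frames are skipped; delivery starts at the next keyframe.
        SubscribeMode::WaitKeyframe => None,
    }
}
```

Starting on an IDR frame in every mode means the decoder never receives a frame whose reference frames it has not seen.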

When the source stream audio is Opus-encoded (e.g., WebRTC ingest) and the subscriber needs AAC (e.g., RTMP/FLV playback), the Subscriber automatically performs transcoding:

WebRTC ingest (Opus) ──▶ Publisher ──▶ Subscriber ──▶ AAC auto-transcode ──▶ RTMP playback
                                           │
                                           └── skip_opus_transcode=true ──▶ WebRTC subscriber consumes Opus directly

VideoTrack
├── RingBuffer (1024 slots) # Frame storage
├── VideoCodec # Codec info (H.264/H.265/AV1)
├── SequenceGenerator # Frame sequence number generator
├── Stats # BPS/FPS statistics
└── IDR Tracking # Keyframe tracking

VideoTrack supports two frame write formats:

  • AVCC format: From RTMP/FLV (length-prefixed NAL units)
  • Raw NAL format: From RTSP/RTP (raw NAL unit list)

AudioTrack
├── RingBuffer (64 slots) # Frame storage
├── AudioCodec # Codec info (AAC/Opus/G.711/...)
├── SequenceGenerator # Frame sequence number generator
└── Stats # BPS statistics
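
To illustrate the AVCC write format mentioned for VideoTrack, the following sketch splits a length-prefixed payload into its NAL units. It assumes 4-byte big-endian length prefixes, which is the common case; the actual prefix size is declared in the decoder configuration record. `split_avcc` is a hypothetical helper, not a Monibuca function.

```rust
/// Splits an AVCC payload into NAL units, assuming 4-byte big-endian length prefixes.
pub fn split_avcc(data: &[u8]) -> Result<Vec<&[u8]>, String> {
    let mut nalus = Vec::new();
    let mut rest = data;
    while !rest.is_empty() {
        if rest.len() < 4 {
            return Err("truncated length prefix".to_string());
        }
        let (prefix, body) = rest.split_at(4);
        let len = u32::from_be_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
        if body.len() < len {
            return Err("NAL unit longer than remaining data".to_string());
        }
        // Borrow the NAL unit from the input instead of copying it.
        let (nalu, tail) = body.split_at(len);
        nalus.push(nalu);
        rest = tail;
    }
    Ok(nalus)
}
```

The result of this split is the same shape as the raw NAL unit list that the second write format accepts directly.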

The Publisher maintains video and audio sequence headers:

  • Video sequence header: AVC/HEVC Decoder Configuration Record
  • Audio sequence header: AAC AudioSpecificConfig

When a new subscriber connects, the Dispatcher sends the sequence headers first to ensure proper decoder initialization.
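
A minimal sketch of this ordering, assuming a hypothetical `Packet` type and `initial_packets` helper (neither is part of the documented API): whatever headers are available are queued ahead of any buffered frame.

```rust
/// Hypothetical packet type used only for this sketch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Packet {
    VideoSequenceHeader(Vec<u8>),
    AudioSequenceHeader(Vec<u8>),
    Frame(Vec<u8>),
}

/// Builds the first packets for a new subscriber: headers first, then frames.
pub fn initial_packets(
    video_header: Option<&[u8]>,
    audio_header: Option<&[u8]>,
    frames: &[Vec<u8>],
) -> Vec<Packet> {
    let mut out = Vec::with_capacity(frames.len() + 2);
    if let Some(header) = video_header {
        out.push(Packet::VideoSequenceHeader(header.to_vec()));
    }
    if let Some(header) = audio_header {
        out.push(Packet::AudioSequenceHeader(header.to_vec()));
    }
    // Buffered frames follow only after the decoder configuration.
    out.extend(frames.iter().cloned().map(Packet::Frame));
    out
}
```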

A Pull Proxy pulls a stream from a remote source and publishes it locally:

Remote RTMP server ──pull──▶ Monibuca Publisher ──▶ Local subscribers

Supported pull protocols: RTMP, RTSP, SRT, HLS, HTTP-FLV, MP4

A Push Proxy pushes a local stream to a remote server:

Local Publisher ──push──▶ Remote RTMP/RTSP/SRT server

Proxy configuration supports automatic retry, a configurable reconnection interval, and a maximum retry count.
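
The retry behavior can be sketched as a simple loop. `RetryConfig`, its field names, and `connect_with_retry` are hypothetical and only illustrate how a reconnection interval and a maximum retry count interact; the real proxy configuration keys may differ.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical retry settings; the field names are illustrative only.
pub struct RetryConfig {
    pub max_retries: u32,
    pub retry_interval: Duration,
}

/// Calls `connect` until it succeeds or `max_retries` retries have failed.
/// Returns the result together with the number of attempts made.
pub fn connect_with_retry<T, E>(
    config: &RetryConfig,
    mut connect: impl FnMut() -> Result<T, E>,
) -> (Result<T, E>, u32) {
    let mut attempts = 0;
    loop {
        attempts += 1;
        match connect() {
            Ok(conn) => return (Ok(conn), attempts),
            // The initial attempt plus `max_retries` retries have all failed.
            Err(err) if attempts > config.max_retries => return (Err(err), attempts),
            // Wait for the configured interval before trying again.
            Err(_) => thread::sleep(config.retry_interval),
        }
    }
}
```

Passing the connection attempt as a closure keeps the loop independent of the protocol being pulled or pushed.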

The recording module writes stream data to files:

Publisher ──▶ Subscriber (recording) ──▶ FileWriter
                                        ┌─────┼─────┐
                                        ▼     ▼     ▼
                                       FLV   MP4   HLS
                                         fMP4 / Raw

Supported recording formats:

Format  Description
FLV     FLV file recording
MP4     Standard MP4 file
fMP4    Fragmented MP4
HLS     HLS TS segments
Raw     Raw frame data

Recording supports three priority levels: Normal (can be auto-deleted), High (protected), and Event (event-triggered).

The Transformer subscribes to a source stream, processes it, and publishes it as a new stream:

Source stream "live/camera1"
        │
        ▼
Transformer (subscribe → process → publish)
        │
        ▼
Target stream "live/camera1_720p"

Use cases:

  • Video transcoding (resolution/codec conversion)
  • Watermark/overlay addition
  • SEI data injection
  • Format conversion

When a subscriber requests a stream that has not yet been published, instead of failing immediately, it enters the wait queue:

Subscriber requests "live/camera1"
  │
  ├── Stream exists? ──Yes──▶ Subscribe immediately
  │
  └── Stream doesn't exist? ──▶ Join WaitQueue
                                      │
                    Waiting for stream to be published...
                              ┌───────┴───────┐
                              │               │
                      Stream published  Wait timeout
                              │               │
                              ▼               ▼
                       Auto-subscribe   Return error

WaitQueue decouples stream publishing from subscription, so subscribers can start before the publisher. This is particularly useful in the following scenarios:

  • On-demand pulling: Subscription triggers a Pull Proxy to pull from a remote source
  • Device reconnection: Subscribers wait for a device to reconnect after disconnection
  • Live scheduling: Viewers enter a live room in advance and wait for the broadcast to start
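
The wait-then-subscribe flow can be sketched with standard-library primitives. This is an illustration only: it blocks the calling thread with a `Condvar`, whereas an async server would more likely use async notification, and the `WaitQueue` methods shown here (`publish`, `wait_for`) are hypothetical.

```rust
use std::collections::HashSet;
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Minimal wait queue: subscribers block until a path is published or a timeout expires.
#[derive(Default)]
pub struct WaitQueue {
    published: Mutex<HashSet<String>>,
    changed: Condvar,
}

impl WaitQueue {
    /// Marks a stream as published and wakes every waiting subscriber.
    pub fn publish(&self, stream_path: &str) {
        self.published.lock().unwrap().insert(stream_path.to_string());
        self.changed.notify_all();
    }

    /// Returns `true` once the stream is published, `false` on timeout.
    pub fn wait_for(&self, stream_path: &str, timeout: Duration) -> bool {
        let guard = self.published.lock().unwrap();
        // Sleeps until the path appears in the set or the timeout elapses.
        let (guard, _timed_out) = self
            .changed
            .wait_timeout_while(guard, timeout, |set| !set.contains(stream_path))
            .unwrap();
        guard.contains(stream_path)
    }
}
```

A `true` result corresponds to the "Auto-subscribe" branch, and `false` corresponds to "Return error".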

When a publisher disconnects, Monibuca does not immediately destroy the stream but enters the WaitingReconnect state:

Publisher disconnects
  │
  ▼
Publisher state → WaitingReconnect
  │
  ├── Reconnects within continue_push_timeout
  │     │
  │     ▼
  │   take_over (the new publisher takes over the stream)
  │     • Inherits timestamp base
  │     • Transfers sequence headers
  │     • Restores subscriber list
  │     • Seamless transition for subscribers
  │
  └── Timeout without reconnection
        │
        ▼
      Publisher → Disposed
      Notify all subscribers with EOS

Key design: Reconnect on disconnect ensures that viewers don’t need to reconnect during brief publisher interruptions, providing a seamless viewing experience.

The default reconnect-on-disconnect timeout is 30 seconds, configurable via continue_push_timeout. Set to 0 to disable this feature.
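
The decision made while a stream sits in WaitingReconnect can be summarized in one function. `ReconnectOutcome` and `check_reconnect` are hypothetical names for this sketch, and treating an elapsed time exactly equal to the timeout as expired is an assumption.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq, Eq)]
pub enum ReconnectOutcome {
    /// A new publisher takes over the existing stream.
    TakeOver,
    /// The stream is disposed and subscribers receive EOS.
    Disposed,
}

/// Decides what happens to a stream whose publisher has been gone for `elapsed`.
/// `republished` tells whether a new publisher for the same path has arrived.
/// Returns `None` while the stream should keep waiting.
pub fn check_reconnect(
    elapsed: Duration,
    continue_push_timeout: Duration,
    republished: bool,
) -> Option<ReconnectOutcome> {
    // A zero timeout disables the feature: the stream is disposed right away.
    if continue_push_timeout.is_zero() {
        return Some(ReconnectOutcome::Disposed);
    }
    if elapsed >= continue_push_timeout {
        Some(ReconnectOutcome::Disposed)
    } else if republished {
        Some(ReconnectOutcome::TakeOver)
    } else {
        None
    }
}
```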

The Publisher detects when the ingest side has stopped sending data for an extended period:

// Configure data timeout (default 60 seconds)
data_timeout: Duration::from_secs(60)

  • last_data_time is updated on each frame write
  • The Publisher is automatically disposed after timeout
  • Timeout detection is disabled in the Paused state (for VOD/playback scenarios)
  • Set to 0 to disable timeout detection
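
The four rules above fit into a small watchdog. This is a sketch under assumptions: `DataWatchdog` and its methods are hypothetical, and the current time is passed in explicitly so the logic can be exercised without sleeping.

```rust
use std::time::{Duration, Instant};

/// Hypothetical watchdog that tracks when the last frame was written.
pub struct DataWatchdog {
    last_data_time: Instant,
    data_timeout: Duration,
    paused: bool,
}

impl DataWatchdog {
    pub fn new(now: Instant, data_timeout: Duration) -> Self {
        Self { last_data_time: now, data_timeout, paused: false }
    }

    /// Called on each frame write to refresh `last_data_time`.
    pub fn on_frame_written(&mut self, now: Instant) {
        self.last_data_time = now;
    }

    /// Enters or leaves the Paused state.
    pub fn set_paused(&mut self, paused: bool) {
        self.paused = paused;
    }

    /// Returns `true` when the publisher should be disposed.
    pub fn is_timed_out(&self, now: Instant) -> bool {
        // Detection is off while paused or when the timeout is set to 0.
        if self.paused || self.data_timeout.is_zero() {
            return false;
        }
        now.saturating_duration_since(self.last_data_time) >= self.data_timeout
    }
}
```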

StreamManager publishes stream events through a broadcast channel:

let mut events = stream_manager.subscribe_events();
loop {
    match events.recv().await {
        Ok(StreamEvent::Published { stream_path, .. }) => {
            println!("Stream published: {}", stream_path);
        }
        Ok(StreamEvent::Unpublished { stream_path, .. }) => {
            println!("Stream stopped: {}", stream_path);
        }
        Ok(StreamEvent::Subscribed { stream_path, subscriber_id, .. }) => {
            println!("New subscription: {} #{}", stream_path, subscriber_id);
        }
        // Ignore other event types
        Ok(_) => {}
        // Stop listening on a receive error (for example, a closed channel)
        // instead of spinning in the loop
        Err(_) => break,
    }
}

Plugins can listen to stream events to trigger corresponding business logic (e.g., on-demand recording, automatic transcoding, etc.).