Stream Management
StreamManager is the stream management core of Monibuca V6. It handles the full stream lifecycle: creation, publishing, subscription, and destruction.
Stream Lifecycle

    ┌─────────┐     ┌───────────┐     ┌─────────────┐     ┌──────────┐
    │ Create  │────▶│  Publish  │────▶│  Subscribe  │────▶│ Destroy  │
    │         │     │           │     │             │     │          │
    └─────────┘     └───────────┘     └─────────────┘     └──────────┘
         │                │                  │                 ▲
         │                │                  │                 │
         │                ▼                  ▼                 │
         │           Init Tracks        Read Frames         Dispose
         │           Set Codec          Wait For Frames     Clean Up
         │           Write Frames       Unsubscribe         Remove from Registry

Detailed State Transitions
Publisher state machine:

    Init
      │
      ▼ (init_video_track / init_audio_track)
    TrackAdded
      │
      ▼ (add_subscriber)
    Subscribed ◀─────────────────────────┐
      │                                  │
      ▼ (all subscribers leave)          │
    WaitingSubscriber                    │
      │                                  │
      ├─── New subscriber joins ─────────┘
      │
      ▼ (publisher disconnects, reconnect-on-disconnect enabled)
    WaitingReconnect
      │
      ├─── Republish ──▶ Subscribed
      │
      ▼ (timeout)
    Disposed

    Special states:
      Paused ──── Data timeout detection disabled (VOD/playback scenarios)

StreamManager Responsibilities
StreamManager is the unified entry point for stream management, composed of three sub-components:

    StreamManager
    ├── StreamRegistry    # Stream registry (DashMap high-concurrency storage)
    ├── StreamLifecycle   # Lifecycle management (create/destroy/reconnect-on-disconnect)
    └── WaitingQueue      # Wait queue (subscribers waiting for unpublished streams)

Core API

    impl StreamManager {
        // Create a stream (called by publisher)
        fn create_publisher(stream_path) -> Publisher

        // Get a stream (called by subscriber)
        fn get_publisher(stream_path) -> Option<Publisher>

        // Create with Dispatcher
        fn create_publisher_with_dispatcher(stream_path) -> (Publisher, Dispatcher)

        // Event subscription
        fn subscribe_events() -> Receiver<StreamEvent>

        // Stream info queries
        fn stream_exists(stream_path) -> bool
        fn stream_count() -> usize
        fn list_streams() -> Vec<StreamInfo>
    }

Publisher
The Publisher manages the ingest side of a stream:

    pub struct Publisher {
        stream_path: String,                      // Stream path (e.g., "live/camera1")
        plugin_name: String,                      // Name of the plugin that created this stream
        stream_type: String,                      // Stream type (e.g., "rtmp", "rtsp")
        video_track: Option<Arc<VideoTrack>>,     // Video track
        audio_track: Option<Arc<AudioTrack>>,     // Audio track
        subscribers: DashMap<u64, SubscriberInfo>, // Subscriber list
        frame_notify: watch::Sender<u64>,         // Frame notification channel
        config: PublisherConfig,                  // Configuration
        task: Option<Arc<Task>>,                  // Associated task (hierarchical cancellation)
    }

Publisher Configuration

    PublisherConfig {
        video_buffer_capacity: 1024,    // Video buffer size
        audio_buffer_capacity: 64,      // Audio buffer size
        default_buffer_time: 2s,        // Default buffer time
        max_buffer_time: 10s,           // Maximum buffer time
        wait_timeout: 30s,              // No-subscriber wait timeout
        max_fps: 0,                     // Max frame rate (0 = unlimited)
        data_timeout: 60s,              // Data timeout (0 = no timeout)
        continue_push_timeout: 30s,     // Reconnect-on-disconnect timeout
    }

Publishing Flow

    // 1. Create Publisher
    let publisher = stream_manager.create_publisher("live/stream1")?;

    // 2. Initialize tracks
    let mut pub_guard = publisher.write();
    pub_guard.init_video_track();
    pub_guard.init_audio_track();

    // 3. Set codec information
    pub_guard.set_video_codec(VideoCodec::new(H264, 1920, 1080));
    pub_guard.set_audio_codec(AudioCodec::new(AAC, 44100, 2));

    // 4. Write frame data (in the publishing loop)
    pub_guard.write_video(IDR, timestamp, dts, data)?;
    pub_guard.write_audio(timestamp, data)?;

Subscriber
Subscribers consume frame data from the Publisher’s RingBuffer.
Three Subscription Modes
| Mode | Description |
|---|---|
| RealTime | Start playback from the latest IDR frame (default) |
| Buffer | Start from an IDR frame before the specified buffer time |
| WaitKeyframe | Wait for the next keyframe |
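
As a rough illustration of how the three modes differ, the sketch below picks a start position in a list of already-buffered frames. `FrameMeta`, `start_index`, and `sample_frames` are hypothetical names invented for this example, and the fallback to the oldest keyframe in Buffer mode is an assumption; the real Subscriber may select frames differently.

```rust
use std::time::Duration;

/// Hypothetical, simplified view of a buffered video frame.
#[derive(Debug, Clone, Copy)]
struct FrameMeta {
    timestamp_ms: u64,
    is_keyframe: bool,
}

/// Mirrors the three modes in the table above.
#[derive(Debug, Clone, Copy)]
enum SubscribeMode {
    RealTime,
    Buffer,
    WaitKeyframe,
}

/// Returns the index of the buffered frame where playback would start,
/// or `None` when the subscriber has to wait for a future keyframe.
fn start_index(frames: &[FrameMeta], mode: SubscribeMode, buffer_time: Duration) -> Option<usize> {
    match mode {
        // Newest keyframe already in the buffer.
        SubscribeMode::RealTime => frames.iter().rposition(|f| f.is_keyframe),
        // Newest keyframe that is at least `buffer_time` older than the newest frame.
        // Falling back to the oldest buffered keyframe is an assumption of this sketch.
        SubscribeMode::Buffer => {
            let newest = frames.last()?.timestamp_ms;
            let target = newest.saturating_sub(buffer_time.as_millis() as u64);
            frames
                .iter()
                .rposition(|f| f.is_keyframe && f.timestamp_ms <= target)
                .or_else(|| frames.iter().position(|f| f.is_keyframe))
        }
        // Nothing already buffered is used.
        SubscribeMode::WaitKeyframe => None,
    }
}

/// Sample buffer: one frame every 500 ms, a keyframe every second.
fn sample_frames() -> Vec<FrameMeta> {
    (0..8u64)
        .map(|i| FrameMeta { timestamp_ms: i * 500, is_keyframe: i % 2 == 0 })
        .collect()
}

fn main() {
    let frames = sample_frames();
    let buffer_time = Duration::from_secs(2);
    for mode in [SubscribeMode::RealTime, SubscribeMode::Buffer, SubscribeMode::WaitKeyframe] {
        println!("{:?} -> {:?}", mode, start_index(&frames, mode, buffer_time));
    }
}
```

With the sample buffer, RealTime starts at the newest keyframe, while Buffer steps back to a keyframe at least two seconds behind the newest frame.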
Subscription Configuration

    SubscriberConfig {
        mode: SubscribeMode::RealTime,
        buffer_time: Duration::from_secs(2),
        receive_video: true,
        receive_audio: true,
        keyframe_timeout: Some(Duration::from_secs(30)),
    }

Subscription Flow

    // 1. Create Subscriber
    let mut subscriber = Subscriber::new("live/stream1");

    // 2. Subscribe to Publisher
    let publisher = stream_manager.get_publisher("live/stream1")?;
    subscriber.subscribe(&publisher)?;

    // 3. Read frame data (in the playback loop)
    loop {
        subscriber.wait_for_frames().await?;

        while let Ok(Some(frame)) = subscriber.read_video() {
            // Process video frame
        }

        while let Some(frame) = subscriber.read_audio() {
            // Process audio frame
        }
    }

    // 4. Unsubscribe
    subscriber.unsubscribe(&publisher);

Automatic Opus→AAC Transcoding
When the source stream audio is Opus-encoded (e.g., WebRTC ingest) and the subscriber needs AAC (e.g., RTMP/FLV playback), the Subscriber automatically performs transcoding:

    WebRTC ingest (Opus) ──▶ Publisher ──▶ Subscriber ──▶ AAC auto-transcode ──▶ RTMP playback
                                               │
                                   skip_opus_transcode=true
                                               │
                           WebRTC subscriber consumes Opus directly

Audio/Video Track Management
Section titled “Audio/Video Track Management”VideoTrack

    VideoTrack
    ├── RingBuffer (1024 slots)   # Frame storage
    ├── VideoCodec                # Codec info (H.264/H.265/AV1)
    ├── SequenceGenerator         # Frame sequence number generator
    ├── Stats                     # BPS/FPS statistics
    └── IDR Tracking              # Keyframe tracking

VideoTrack supports two frame write formats:
- AVCC format: From RTMP/FLV (length-prefixed NAL units)
- Raw NAL format: From RTSP/RTP (raw NAL unit list)
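
To make the difference between the two formats concrete, here is a minimal sketch that converts between an AVCC buffer and a list of raw NAL units. It assumes a 4-byte big-endian length prefix, which is the common case; the actual prefix size is declared in the decoder configuration record. `split_avcc` and `to_avcc` are illustrative helper names, not VideoTrack methods.

```rust
/// Splits an AVCC buffer (4-byte big-endian length prefix per NAL unit,
/// an assumption of this sketch) into raw NAL unit slices.
/// Returns `None` if the buffer is truncated.
fn split_avcc(mut data: &[u8]) -> Option<Vec<&[u8]>> {
    let mut nals = Vec::new();
    while !data.is_empty() {
        if data.len() < 4 {
            return None;
        }
        let len = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
        let rest = &data[4..];
        if rest.len() < len {
            return None;
        }
        let (nal, tail) = rest.split_at(len);
        nals.push(nal);
        data = tail;
    }
    Some(nals)
}

/// Packs raw NAL units back into an AVCC buffer with 4-byte length prefixes.
fn to_avcc(nals: &[&[u8]]) -> Vec<u8> {
    let mut out = Vec::new();
    for nal in nals {
        out.extend_from_slice(&(nal.len() as u32).to_be_bytes());
        out.extend_from_slice(nal);
    }
    out
}

fn main() {
    // Two tiny placeholder NAL units (payload bytes are arbitrary).
    let nals: [&[u8]; 2] = [&[0x65, 0x88], &[0x41]];
    let avcc = to_avcc(&nals);
    println!("AVCC bytes: {:02x?}", avcc);
    println!("NAL units:  {:02x?}", split_avcc(&avcc));
}
```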
AudioTrack

    AudioTrack
    ├── RingBuffer (64 slots)     # Frame storage
    ├── AudioCodec                # Codec info (AAC/Opus/G.711/...)
    ├── SequenceGenerator         # Frame sequence number generator
    └── Stats                     # BPS statistics

Sequence Header
The Publisher maintains video and audio sequence headers:
- Video sequence header: AVC/HEVC Decoder Configuration Record
- Audio sequence header: AAC AudioSpecificConfig
When a new subscriber connects, the Dispatcher sends the sequence headers first to ensure proper decoder initialization.
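
The ordering described above can be sketched as follows. `Packet`, `SequenceHeaders`, and `initial_packets` are hypothetical types made up for this example; they only illustrate that cached headers go out before any frame data, not how the Dispatcher is actually implemented.

```rust
/// Hypothetical packet type for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum Packet {
    VideoSequenceHeader(Vec<u8>),
    AudioSequenceHeader(Vec<u8>),
    Frame(Vec<u8>),
}

/// Cached headers, as a publisher might hold them.
struct SequenceHeaders {
    video: Option<Vec<u8>>,
    audio: Option<Vec<u8>>,
}

/// Builds the packet sequence for a newly attached subscriber:
/// cached sequence headers first, then the buffered frames.
fn initial_packets(headers: &SequenceHeaders, frames: &[Vec<u8>]) -> Vec<Packet> {
    let mut out = Vec::new();
    if let Some(video) = &headers.video {
        out.push(Packet::VideoSequenceHeader(video.clone()));
    }
    if let Some(audio) = &headers.audio {
        out.push(Packet::AudioSequenceHeader(audio.clone()));
    }
    out.extend(frames.iter().cloned().map(Packet::Frame));
    out
}

fn main() {
    let headers = SequenceHeaders {
        video: Some(vec![0x01]),       // placeholder, not a real configuration record
        audio: Some(vec![0x12, 0x10]), // AudioSpecificConfig: AAC-LC, 44.1 kHz, stereo
    };
    for packet in initial_packets(&headers, &[vec![0x65], vec![0x41]]) {
        println!("{:?}", packet);
    }
}
```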
Pull/Push Proxy
Pull Proxy
Pulls a stream from a remote source and publishes it locally:

    Remote RTMP server ──pull──▶ Monibuca Publisher ──▶ Local subscribers

Supported pull protocols: RTMP, RTSP, SRT, HLS, HTTP-FLV, MP4
Push Proxy
Pushes a local stream to a remote server:

    Local Publisher ──push──▶ Remote RTMP/RTSP/SRT server

Proxy configuration supports automatic retry, reconnection intervals, and maximum retry count.
Recording
The recording module writes stream data to files:

    Publisher ──▶ Subscriber (recording) ──▶ FileWriter
                                                 │
                                           ┌─────┼─────┐
                                           ▼     ▼     ▼
                                          FLV   MP4   HLS
                                                 │
                                            fMP4 / Raw

Supported recording formats:
| Format | Description |
|---|---|
| FLV | FLV file recording |
| MP4 | Standard MP4 file |
| fMP4 | Fragmented MP4 |
| HLS | HLS TS segments |
| Raw | Raw frame data |
Recording supports three priority levels: Normal (can be auto-deleted), High (protected), and Event (event-triggered).
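
One way to picture the priority levels is a cleanup pass that frees disk space. The sketch below assumes that only Normal recordings are eligible for automatic deletion and that the oldest files go first; the text above does not say how Event recordings are treated, so excluding them is a guess. `RecordFile` and `select_for_cleanup` are hypothetical names.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum RecordPriority {
    Normal,
    High,
    Event,
}

/// Hypothetical recording metadata for this sketch.
#[derive(Debug, Clone)]
struct RecordFile {
    name: String,
    size_bytes: u64,
    created_at: u64, // seconds since some epoch
    priority: RecordPriority,
}

/// Picks Normal-priority files to delete, oldest first,
/// until at least `bytes_needed` would be freed.
fn select_for_cleanup(files: &[RecordFile], bytes_needed: u64) -> Vec<String> {
    let mut candidates: Vec<&RecordFile> = files
        .iter()
        .filter(|f| f.priority == RecordPriority::Normal)
        .collect();
    candidates.sort_by_key(|f| f.created_at);

    let mut freed = 0u64;
    let mut selected = Vec::new();
    for file in candidates {
        if freed >= bytes_needed {
            break;
        }
        freed += file.size_bytes;
        selected.push(file.name.clone());
    }
    selected
}

fn sample_files() -> Vec<RecordFile> {
    let file = |name: &str, size_bytes, created_at, priority| RecordFile {
        name: name.to_string(),
        size_bytes,
        created_at,
        priority,
    };
    vec![
        file("a.flv", 100, 3, RecordPriority::Normal),
        file("b.mp4", 500, 1, RecordPriority::High),
        file("c.flv", 200, 2, RecordPriority::Normal),
        file("d.mp4", 300, 0, RecordPriority::Event),
    ]
}

fn main() {
    println!("{:?}", select_for_cleanup(&sample_files(), 250));
}
```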
Transformer
The Transformer subscribes to a source stream, processes it, and publishes it as a new stream:

    Source stream "live/camera1"
        │
        ▼
    Transformer (subscribe → process → publish)
        │
        ▼
    Target stream "live/camera1_720p"

Use cases:
- Video transcoding (resolution/codec conversion)
- Watermark/overlay addition
- SEI data injection
- Format conversion
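
The subscribe → process → publish pattern can be sketched with standard-library channels standing in for the source subscription and the target publisher. `Frame`, `target_path`, and `run_transformer` are hypothetical; a real Transformer would work on Monibuca's Subscriber and Publisher types rather than `mpsc` channels.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical frame descriptor for this sketch.
#[derive(Debug, Clone, PartialEq)]
struct Frame {
    stream_path: String,
    width: u32,
    height: u32,
}

/// Derives the target path, following the "live/camera1" -> "live/camera1_720p" example.
fn target_path(source: &str, suffix: &str) -> String {
    format!("{}_{}", source, suffix)
}

/// Reads frames from `source`, applies `process`, and forwards the results to `target`.
/// Returning `None` from `process` drops a frame. The loop ends when the source closes
/// or when nobody is listening on the target any more.
fn run_transformer<F>(source: Receiver<Frame>, target: Sender<Frame>, mut process: F)
where
    F: FnMut(Frame) -> Option<Frame>,
{
    for frame in source {
        if let Some(out) = process(frame) {
            if target.send(out).is_err() {
                break;
            }
        }
    }
}

fn main() {
    let (src_tx, src_rx) = mpsc::channel();
    let (dst_tx, dst_rx) = mpsc::channel();

    src_tx
        .send(Frame { stream_path: "live/camera1".to_string(), width: 1920, height: 1080 })
        .unwrap();
    drop(src_tx); // the source stream ends

    // Placeholder "transcode": relabel the frame as 720p under the derived path.
    run_transformer(src_rx, dst_tx, |f| {
        Some(Frame { stream_path: target_path(&f.stream_path, "720p"), width: 1280, height: 720 })
    });

    for frame in dst_rx {
        println!("{:?}", frame);
    }
}
```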
Wait Queue (WaitQueue)
When a subscriber requests a stream that has not yet been published, instead of failing immediately, it enters the wait queue:

    Subscriber requests "live/camera1"
        │
        ├── Stream exists? ──Yes──▶ Subscribe immediately
        │
        └── Stream doesn't exist? ──▶ Join WaitQueue
                                          │
                                          ▼
                              Waiting for stream to be published...
                                          │
                                  ┌───────┴───────┐
                                  │               │
                           Stream published   Wait timeout
                                  │               │
                                  ▼               ▼
                           Auto-subscribe    Return error

WaitQueue decouples stream publishing from subscription: subscribers can start before the publisher. This is particularly useful in the following scenarios:
- On-demand pulling: Subscription triggers a Pull Proxy to pull from a remote source
- Device reconnection: Subscribers wait for a device to reconnect after disconnection
- Live scheduling: Viewers enter a live room in advance and wait for the broadcast to start
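
The wait-then-subscribe behaviour can be approximated with standard-library primitives, as in the sketch below. `WaitQueueSketch`, `join`, `notify_published`, and `wait_for_stream` are hypothetical names, and blocking channels are used here only to keep the example dependency-free; the real component is presumably asynchronous.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Mutex;
use std::time::Duration;

/// Hypothetical wait queue: stream path -> channels of waiting subscribers.
#[derive(Default)]
struct WaitQueueSketch {
    waiters: Mutex<HashMap<String, Vec<Sender<()>>>>,
}

impl WaitQueueSketch {
    /// Registers interest in an unpublished stream and returns a handle to wait on.
    fn join(&self, stream_path: &str) -> Receiver<()> {
        let (tx, rx) = mpsc::channel();
        self.waiters
            .lock()
            .unwrap()
            .entry(stream_path.to_string())
            .or_default()
            .push(tx);
        rx
    }

    /// Called when a stream is published; wakes its waiters and returns how many were reached.
    fn notify_published(&self, stream_path: &str) -> usize {
        let waiters = self
            .waiters
            .lock()
            .unwrap()
            .remove(stream_path)
            .unwrap_or_default();
        waiters.iter().filter(|tx| tx.send(()).is_ok()).count()
    }
}

/// Subscriber side: proceed once notified, or fail after `wait_timeout`.
/// Note: this sketch does not remove timed-out waiters from the queue.
fn wait_for_stream(rx: &Receiver<()>, wait_timeout: Duration) -> Result<(), &'static str> {
    rx.recv_timeout(wait_timeout).map_err(|_| "wait timeout")
}

fn main() {
    let queue = WaitQueueSketch::default();
    let camera1 = queue.join("live/camera1");
    let camera2 = queue.join("live/camera2");

    queue.notify_published("live/camera1");

    println!("camera1: {:?}", wait_for_stream(&camera1, Duration::from_millis(100)));
    println!("camera2: {:?}", wait_for_stream(&camera2, Duration::from_millis(100)));
}
```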
Reconnect on Disconnect
When a publisher disconnects, Monibuca does not immediately destroy the stream but enters the WaitingReconnect state:

    Publisher disconnects
        │
        ▼
    Publisher state → WaitingReconnect
        │
        ├── Reconnects within continue_push_timeout
        │       │
        │       ▼
        │   take_over (takeover)
        │     • Inherits timestamp base
        │     • Transfers sequence headers
        │     • Restores subscriber list
        │     • Seamless transition for subscribers
        │
        └── Timeout without reconnection
                │
                ▼
            Publisher → Disposed
            Notify all subscribers with EOS

Key design: Reconnect on disconnect ensures that viewers don’t need to reconnect during brief publisher interruptions, providing a seamless viewing experience.
The default reconnect-on-disconnect timeout is 30 seconds, configurable via continue_push_timeout. Set to 0 to disable this feature.
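
The timeout rule can be expressed as a small state-transition sketch. The state names follow the state machine described earlier, but `PublisherState`, `on_disconnect`, `on_republish`, and `on_tick` are hypothetical and cover only the transitions relevant to reconnection; the takeover work itself (timestamps, sequence headers, subscriber list) is not modelled.

```rust
use std::time::{Duration, Instant};

/// Subset of the publisher states relevant to reconnection.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PublisherState {
    Subscribed,
    WaitingReconnect { deadline: Instant },
    Disposed,
}

/// Publisher disconnected: wait for a reconnect unless the feature is disabled (timeout of 0).
fn on_disconnect(now: Instant, continue_push_timeout: Duration) -> PublisherState {
    if continue_push_timeout.is_zero() {
        PublisherState::Disposed
    } else {
        PublisherState::WaitingReconnect { deadline: now + continue_push_timeout }
    }
}

/// A publisher for the same stream path arrived again before the deadline.
fn on_republish(state: PublisherState, now: Instant) -> PublisherState {
    match state {
        PublisherState::WaitingReconnect { deadline } if now < deadline => PublisherState::Subscribed,
        other => other,
    }
}

/// Periodic check: dispose the publisher once the deadline has passed.
fn on_tick(state: PublisherState, now: Instant) -> PublisherState {
    match state {
        PublisherState::WaitingReconnect { deadline } if now >= deadline => PublisherState::Disposed,
        other => other,
    }
}

fn main() {
    let t0 = Instant::now();
    let waiting = on_disconnect(t0, Duration::from_secs(30));
    println!("after disconnect: {:?}", waiting);
    println!("republish at +5s: {:?}", on_republish(waiting, t0 + Duration::from_secs(5)));
    println!("tick at +30s:     {:?}", on_tick(waiting, t0 + Duration::from_secs(30)));
}
```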
Data Timeout Detection
The Publisher detects when the ingest side has stopped sending data for an extended period:

    // Configure data timeout (default 60 seconds)
    data_timeout: Duration::from_secs(60)

- last_data_time is updated on each frame write
- The Publisher is automatically disposed after timeout
- Timeout detection is disabled in the Paused state (for VOD/playback scenarios)
- Set to 0 to disable timeout detection
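
The four rules above fit in a few lines. The sketch below uses a hypothetical `DataWatchdog` type whose field names echo the text (`last_data_time`, `data_timeout`); how the Publisher actually schedules the check is not covered here.

```rust
use std::time::{Duration, Instant};

/// Hypothetical watchdog mirroring the rules listed above.
struct DataWatchdog {
    last_data_time: Instant,
    data_timeout: Duration,
    paused: bool,
}

impl DataWatchdog {
    fn new(now: Instant, data_timeout: Duration) -> Self {
        DataWatchdog { last_data_time: now, data_timeout, paused: false }
    }

    /// Called on every frame write.
    fn on_frame_written(&mut self, now: Instant) {
        self.last_data_time = now;
    }

    /// True when the publisher should be disposed for lack of data.
    /// Never fires while paused or when the timeout is 0.
    fn is_timed_out(&self, now: Instant) -> bool {
        if self.paused || self.data_timeout.is_zero() {
            return false;
        }
        now.saturating_duration_since(self.last_data_time) >= self.data_timeout
    }
}

fn main() {
    let start = Instant::now();
    let mut watchdog = DataWatchdog::new(start, Duration::from_secs(60));
    println!("at +59s: {}", watchdog.is_timed_out(start + Duration::from_secs(59)));
    println!("at +60s: {}", watchdog.is_timed_out(start + Duration::from_secs(60)));
    watchdog.on_frame_written(start + Duration::from_secs(60));
    println!("at +100s after a frame at +60s: {}", watchdog.is_timed_out(start + Duration::from_secs(100)));
}
```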
Event System
StreamManager publishes stream events through a broadcast channel:

    let mut events = stream_manager.subscribe_events();

    loop {
        match events.recv().await {
            Ok(StreamEvent::Published { stream_path, .. }) => {
                println!("Stream published: {}", stream_path);
            }
            Ok(StreamEvent::Unpublished { stream_path, .. }) => {
                println!("Stream stopped: {}", stream_path);
            }
            Ok(StreamEvent::Subscribed { stream_path, subscriber_id, .. }) => {
                println!("New subscription: {} #{}", stream_path, subscriber_id);
            }
            _ => {}
        }
    }

Plugins can listen to stream events to trigger corresponding business logic (e.g., on-demand recording, automatic transcoding, etc.).