
Dispatcher Architecture

Dispatcher is the core component in Monibuca V6 responsible for distributing frame data from a Publisher to all Subscribers. Its design goal is to eliminate redundant reads, achieving true O(1) read + O(N) broadcast.

Problems with the Traditional N-Reader Approach


In traditional streaming servers, each subscriber independently reads data from the buffer:

Traditional approach: N subscribers = N reads
RingBuffer
├── Reader 1 reads frame ──▶ Subscriber 1
├── Reader 2 reads frame ──▶ Subscriber 2
├── Reader 3 reads frame ──▶ Subscriber 3
└── Reader N reads frame ──▶ Subscriber N
Problems:
• 1000 subscribers = 1000 RingBuffer reads (same frame)
• Each Reader needs to track state, detect overwrites
• High concurrency causes massive atomic operation contention
Dispatcher approach: N subscribers = 1 read
┌─────────────────┐
│    Publisher    │
└────────┬────────┘
         ↓
┌─────────────────┐
│   RingBuffer    │
└────────┬────────┘
         ↓
┌──────────────────────────────┐
│  Dispatcher (single-thread   │
│  frame read)                 │
│  • Reads only once           │
│  • Arc zero-copy dispatch    │
│  • Non-blocking try_send     │
└──────────────┬───────────────┘
┌────────────┬───────┴───────┬────────────┐
↓ ↓ ↓ ↓
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ Queue 1 │ │ Queue 2 │ │ Queue 3 │ │ Queue N │
│(bounded)│ │(bounded)│ │(bounded)│ │(bounded)│
└────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘
↓ ↓ ↓ ↓
Writer 1 Writer 2 Writer 3 Writer N
(own task) (own task) (own task) (own task)

Core advantages:

Metric                      Traditional Approach    Dispatcher Approach
RingBuffer reads per frame  N times                 1 time
Read lock contention        O(N)                    O(1)
Frame data copies           0 (Arc shared)          0 (Arc shared)
Slow subscriber impact      May block               Drops frames, no blocking
pub struct Dispatcher {
    stream_path: String,
    // Subscriber list - ArcSwap lock-free reads (COW mode)
    subscribers: ArcSwap<Vec<DispatchSubscriber>>,
    next_id: AtomicU64,
    running: AtomicBool,
    queue_capacity: usize, // Default 150
    total_dispatched: AtomicU64,
}

Each Stream corresponds to one Dispatcher instance.

Each subscriber owns a bounded channel (default capacity 150):

Queue capacity = 150 frames
≈ 1.9 seconds @ 80fps (mixed video + audio frame rate)
Can tolerate network jitter

When the queue is full, new frames are dropped (try_send returns Full) instead of blocking other subscribers. A dropped frame counter is automatically incremented for monitoring purposes.

The Dispatcher sends the following frame types through the channel:

pub enum DispatchFrame {
    Video(Arc<AVFrame>),    // Video frame
    Audio(Arc<AVFrame>),    // Audio frame
    VideoSeqHeader(Bytes),  // Video sequence header (AVC/HEVC decoder config)
    AudioSeqHeader(Bytes),  // Audio sequence header (AAC decoder config)
    Eos,                    // End of stream signal
}
Publisher @ 30fps

Dispatcher ──try_send──▶ [Queue: ████████░░]         ──▶ Subscriber (normal)

Dispatcher ──try_send──▶ [Queue: ██████████] 150/150 ──▶ Subscriber (stalled)
                          FULL! Frame dropped, frames_dropped++, no blocking

Design principle: slow subscribers affect only themselves and never drag down the entire system.

  • Short-lived queue buildup from network fluctuations can be absorbed by the buffer
  • Sustained slow consumption leads to frame drops; clients can typically recover on their own
  • Per-subscriber dropped frame counts can be monitored via API

The subscriber list uses ArcSwap<Vec<DispatchSubscriber>> for lock-free management:

// Clone-on-Write: does not block ongoing dispatch
self.subscribers.rcu(|old| {
    let mut new = (**old).clone();
    new.push(subscriber);
    new
});

// Atomic load - lock-free
let subs = self.subscribers.load();
for sub in subs.iter() {
    if sub.receive_video && !sub.is_closed() {
        sub.try_send(DispatchFrame::Video(frame.clone()));
    }
}

In a scenario with 1000 subscribers @ 60fps, dispatch operations are completely free of lock contention.

When the server needs to handle a large number of concurrent streams (>100), DispatcherPool mode can be enabled.

┌─────────────────────────────┐
│ DispatcherPool │
│ (Manages N Workers) │
└─────────────┬───────────────┘
┌───────────────────────┼───────────────────────┐
↓ ↓ ↓
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Worker 0 │ │ Worker 1 │ │Worker N-1│
│ Stream A │ │ Stream C │ │ Stream E │
│ Stream B │ │ Stream D │ │ Stream F │
└──────────┘ └──────────┘ └──────────┘

Streams are assigned to Workers via consistent hashing:

fn worker_index(&self, stream_path: &str) -> usize {
    let mut hasher = DefaultHasher::new();
    stream_path.hash(&mut hasher);
    (hasher.finish() as usize) % self.num_workers
}

The same stream path is always assigned to the same Worker, ensuring processing continuity for each stream.

Controlled by the dispatcher_workers configuration:

Value  Mode        Description
0      Per-Stream  One independent Dispatcher task per stream (default)
N      Pool        N Workers, each handling multiple streams
# Configuration example (pick one value)
stream:
  dispatcher_workers: 0   # Per-Stream mode (default)
  # dispatcher_workers: 4 # Pool mode with 4 Workers
  # dispatcher_workers: 8 # Pool mode with 8 Workers

Selection guidelines:

  • Few high-concurrency streams (< 100): Use 0 (Per-Stream), each stream gets its own task
  • Many streams (> 100): Use Pool mode to reduce task overhead
  • Recommended Worker count: 1–2x the number of CPU cores
End-to-end flow of a single frame:

1. Publisher writes a frame
   Publisher.write_video(frame)
2. Frame is written to RingBuffer
   VideoTrack.buffer.write(frame)
3. Frame change notification
   frame_notify.send(count)                 ── watch::channel
4. Dispatcher receives notification
   frame_notify.changed().await             ── single-thread wakeup
5. Read frame from RingBuffer (only once)
   video_reader.read_next()                 ── returns Arc<AVFrame>
6. Broadcast to all subscribers
   for sub in subscribers:
       sub.try_send(Video(frame.clone()))   ── Arc::clone (ref count +1)
7. Subscriber receives from channel
   receiver.recv().await                    ── async await
8. Protocol encoding output
   RTMP/FLV/HLS/WebRTC encoder

The Dispatcher performs cleanup every 100 dispatch cycles, removing closed subscribers:

cleanup_counter += 1;
if cleanup_counter % 100 == 0 {
    self.cleanup_closed(); // COW mode removes closed subscribers
}

When a stream ends, the Dispatcher sends an Eos (End of Stream) signal to all subscribers.