Dispatcher Architecture
The Dispatcher is the core component in Monibuca V6 that distributes frame data from a Publisher to all Subscribers. Its design goal is to eliminate redundant reads: each frame is read from the RingBuffer once regardless of subscriber count (O(1) reads) and is then broadcast to the N subscribers (O(N)).
Design Motivation
Problems with the Traditional N-Reader Approach
In traditional streaming servers, each subscriber independently reads data from the buffer:
```
Traditional approach: N subscribers = N reads

RingBuffer
    │
    ├── Reader 1 reads frame ──▶ Subscriber 1
    ├── Reader 2 reads frame ──▶ Subscriber 2
    ├── Reader 3 reads frame ──▶ Subscriber 3
    └── Reader N reads frame ──▶ Subscriber N

Problems:
• 1000 subscribers = 1000 RingBuffer reads (same frame)
• Each Reader needs to track state, detect overwrites
• High concurrency causes massive atomic operation contention
```

The Dispatcher Approach
```
Dispatcher approach: N subscribers = 1 read

                 ┌─────────────────┐
                 │    Publisher    │
                 └────────┬────────┘
                          ↓
                 ┌─────────────────┐
                 │   RingBuffer    │
                 └────────┬────────┘
                          ↓
           ┌──────────────────────────────┐
           │  Dispatcher (single-thread   │
           │  frame read)                 │
           │  • Reads only once           │
           │  • Arc zero-copy dispatch    │
           │  • Non-blocking try_send     │
           └──────────────┬───────────────┘
                          ↓
     ┌────────────┬───────┴───────┬────────────┐
     ↓            ↓               ↓            ↓
┌─────────┐  ┌─────────┐     ┌─────────┐  ┌─────────┐
│ Queue 1 │  │ Queue 2 │     │ Queue 3 │  │ Queue N │
│(bounded)│  │(bounded)│     │(bounded)│  │(bounded)│
└────┬────┘  └────┬────┘     └────┬────┘  └────┬────┘
     ↓            ↓               ↓            ↓
 Writer 1     Writer 2        Writer 3     Writer N
(own task)   (own task)      (own task)   (own task)
```

Core advantages:
| Metric | Traditional Approach | Dispatcher Approach |
|---|---|---|
| RingBuffer reads per frame | N times | 1 time |
| Read lock contention | O(N) | O(1) |
| Frame data copies | 0 (Arc shared) | 0 (Arc shared) |
| Slow subscriber impact | May block | Drops frames, no blocking |
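
The "0 copies" row rests on how `Arc` behaves: cloning an `Arc` hands out another pointer to the same allocation and bumps a reference count, so the frame payload is never duplicated. The following is a minimal sketch of that property using only the standard library. `Frame` and `fan_out` are hypothetical names introduced for illustration, not Monibuca types.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `AVFrame`; only the payload matters here.
pub struct Frame {
    pub payload: Vec<u8>,
}

/// Produces one `Arc` handle per subscriber without copying the payload.
pub fn fan_out(frame: &Arc<Frame>, subscribers: usize) -> Vec<Arc<Frame>> {
    (0..subscribers).map(|_| Arc::clone(frame)).collect()
}
```

Calling `fan_out(&frame, 1000)` yields 1000 handles that all satisfy `Arc::ptr_eq(handle, &frame)`, which is what makes the O(N) broadcast cheap in memory terms.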
Core Components
Dispatcher Structure

```rust
use arc_swap::ArcSwap;
use std::sync::atomic::{AtomicBool, AtomicU64};

pub struct Dispatcher {
    stream_path: String,
    // Subscriber list - ArcSwap lock-free reads (COW mode)
    subscribers: ArcSwap<Vec<DispatchSubscriber>>,
    next_id: AtomicU64,
    running: AtomicBool,
    queue_capacity: usize, // Default 150
    total_dispatched: AtomicU64,
}
```

Each Stream corresponds to one Dispatcher instance.
Subscriber Queues
Each subscriber owns a bounded channel (default capacity 150):
```
Queue capacity = 150 frames
≈ 1.9 seconds @ 80fps (mixed video + audio frame rate; 150 / 80 = 1.875)
Can tolerate network jitter
```

When the queue is full, new frames are dropped (try_send returns Full) instead of blocking other subscribers. A dropped frame counter is automatically incremented for monitoring purposes.
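
The drop-on-full behavior can be sketched with the standard library's bounded channel. This is an illustration under assumptions: `SubscriberQueue` and `offer` are hypothetical names, frames are reduced to a `u64` id, and `std::sync::mpsc::sync_channel` stands in for whatever channel type the Dispatcher actually uses.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical per-subscriber queue; the real `DispatchSubscriber` may differ.
pub struct SubscriberQueue {
    tx: SyncSender<u64>,
    pub frames_dropped: AtomicU64,
}

impl SubscriberQueue {
    /// Creates a bounded queue and returns it together with the receiving end.
    pub fn new(capacity: usize) -> (Self, Receiver<u64>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx, frames_dropped: AtomicU64::new(0) }, rx)
    }

    /// Never blocks. Returns false when the frame was not queued.
    pub fn offer(&self, frame_id: u64) -> bool {
        match self.tx.try_send(frame_id) {
            Ok(()) => true,
            Err(TrySendError::Full(_)) => {
                // Queue is full: drop this frame and record it for monitoring.
                self.frames_dropped.fetch_add(1, Ordering::Relaxed);
                false
            }
            // Receiver is gone; nothing to count, the subscriber has left.
            Err(TrySendError::Disconnected(_)) => false,
        }
    }
}
```

With a capacity of 150 and no consumer, the first 150 calls to `offer` succeed and every later call increments `frames_dropped` until the receiver drains at least one frame.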
Frame Message Types
The Dispatcher sends the following frame types through the channel:

```rust
pub enum DispatchFrame {
    Video(Arc<AVFrame>),   // Video frame
    Audio(Arc<AVFrame>),   // Audio frame
    VideoSeqHeader(Bytes), // Video sequence header (AVC/HEVC decoder config)
    AudioSeqHeader(Bytes), // Audio sequence header (AAC decoder config)
    Eos,                   // End of stream signal
}
```

Bounded Channel Backpressure
```
Publisher 30fps
     │
     ▼
Dispatcher ──try_send──▶ [Queue: ████████░░] ──▶ Subscriber (normal)
                                  150/150

Dispatcher ──try_send──▶ [Queue: ██████████] ──▶ Subscriber (stalled)
                                   FULL!
                                 Drop frame
                              frames_dropped++
                                No blocking!
```

Design principle: Slow subscribers only affect themselves, never dragging down the entire system.
- Short-lived queue buildup from network fluctuations can be absorbed by the buffer
- Sustained slow consumption leads to frame drops; clients can typically recover on their own
- Per-subscriber dropped frame counts can be monitored via API
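
The isolation property can be checked with a small simulation: one subscriber drains its queue after every frame, the other never reads. This is a sketch with hypothetical names (`Sub`, `dispatch`, `simulate`) and `u32` frame ids, again using `std::sync::mpsc::sync_channel` as a stand-in for the real channel.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};

/// Hypothetical subscriber handle: a bounded sender plus a drop counter.
pub struct Sub {
    tx: SyncSender<u32>,
    pub dropped: u64,
}

/// Offers one frame to every subscriber. A full queue costs only that
/// subscriber a frame; the loop never waits.
pub fn dispatch(subs: &mut [Sub], frame: u32) {
    for sub in subs.iter_mut() {
        if sub.tx.try_send(frame).is_err() {
            sub.dropped += 1;
        }
    }
}

/// Dispatches `frames` frames to one healthy and one stalled subscriber.
/// Returns (received by healthy, dropped for healthy, dropped for stalled).
pub fn simulate(frames: u32, capacity: usize) -> (u32, u64, u64) {
    let (fast_tx, fast_rx) = sync_channel(capacity);
    // The stalled receiver stays alive but is never read.
    let (slow_tx, _slow_rx) = sync_channel(capacity);
    let mut subs = vec![
        Sub { tx: fast_tx, dropped: 0 },
        Sub { tx: slow_tx, dropped: 0 },
    ];
    let mut received = 0;
    for frame in 0..frames {
        dispatch(&mut subs, frame);
        // The healthy subscriber keeps up and empties its queue each cycle.
        while fast_rx.try_recv().is_ok() {
            received += 1;
        }
    }
    (received, subs[0].dropped, subs[1].dropped)
}
```

Under these assumptions `simulate(300, 150)` delivers all 300 frames to the healthy subscriber, while the stalled one accepts the first 150 and drops the remaining 150.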
Lock-Free Subscriber Management
The subscriber list uses ArcSwap<Vec<DispatchSubscriber>> for lock-free management:
Adding Subscribers (COW Write)
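```rust
// Clone-on-Write: does not block ongoing dispatch.
// `rcu` may re-run the closure if another writer publishes first,
// so the subscriber is cloned into each attempt rather than moved.
self.subscribers.rcu(|old| {
    let mut new = (**old).clone();
    new.push(subscriber.clone());
    new
});
```

Frame Dispatch (Lock-Free Read)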
```rust
// Atomic load - lock-free
let subs = self.subscribers.load();
for sub in subs.iter() {
    if sub.receive_video && !sub.is_closed() {
        // Arc clone: bumps the reference count, no payload copy
        sub.try_send(DispatchFrame::Video(frame.clone()));
    }
}
```

In a scenario with 1000 subscribers @ 60fps, dispatch operations are completely free of lock contention.
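
The copy-on-write semantics can be illustrated without the `arc_swap` crate. The sketch below is a standard-library stand-in and an explicit simplification: it uses a `Mutex` around the pointer, so unlike `ArcSwap` it is not lock-free. What it does show is the snapshot behavior: a reader that loaded the list before a write keeps iterating over the old `Vec`, untouched by the writer. `CowList` is a hypothetical name.

```rust
use std::sync::{Arc, Mutex};

/// Std-only stand-in for `ArcSwap<Vec<T>>`. The Mutex guards only the
/// pointer swap; this is NOT lock-free and only illustrates the semantics.
pub struct CowList<T> {
    current: Mutex<Arc<Vec<T>>>,
}

impl<T: Clone> CowList<T> {
    pub fn new() -> Self {
        Self { current: Mutex::new(Arc::new(Vec::new())) }
    }

    /// Reader side: take a cheap snapshot, then iterate without the lock.
    pub fn load(&self) -> Arc<Vec<T>> {
        let guard = self.current.lock().unwrap();
        Arc::clone(&*guard)
    }

    /// Writer side: copy the list, modify the copy, publish the copy.
    pub fn push(&self, item: T) {
        let mut guard = self.current.lock().unwrap();
        let mut next = (**guard).clone();
        next.push(item);
        *guard = Arc::new(next);
    }
}
```

The cost of this design is that every add or remove copies the whole list, which is acceptable when subscriptions change far less often than frames are dispatched.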
DispatcherPool
When the server needs to handle a large number of concurrent streams (>100), DispatcherPool mode can be enabled.
Architecture
```
                ┌─────────────────────────────┐
                │       DispatcherPool        │
                │     (Manages N Workers)     │
                └─────────────┬───────────────┘
                              │
      ┌───────────────────────┼───────────────────────┐
      ↓                       ↓                       ↓
 ┌──────────┐            ┌──────────┐            ┌──────────┐
 │ Worker 0 │            │ Worker 1 │            │Worker N-1│
 │ Stream A │            │ Stream C │            │ Stream E │
 │ Stream B │            │ Stream D │            │ Stream F │
 └──────────┘            └──────────┘            └──────────┘
```

Consistent Hashing
Streams are assigned to Workers by hashing the stream path and taking the result modulo the Worker count, so the mapping is stable for a fixed number of Workers:
```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn worker_index(&self, stream_path: &str) -> usize {
    let mut hasher = DefaultHasher::new();
    stream_path.hash(&mut hasher);
    (hasher.finish() as usize) % self.num_workers
}
```

The same stream path is always assigned to the same Worker, ensuring processing continuity for each stream.
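
A free-function version of the same mapping makes the determinism easy to try out. This is a sketch: `num_workers` is passed as a parameter instead of being read from `self`, and `distribution` is a hypothetical helper added to inspect how paths spread across Workers.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a stream path to a worker slot: hash the path, reduce modulo the
/// worker count. `num_workers` must be greater than zero.
pub fn worker_index(stream_path: &str, num_workers: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    stream_path.hash(&mut hasher);
    (hasher.finish() as usize) % num_workers
}

/// Counts how many of the given paths land on each worker.
pub fn distribution(paths: &[&str], num_workers: usize) -> Vec<usize> {
    let mut counts = vec![0; num_workers];
    for path in paths {
        counts[worker_index(path, num_workers)] += 1;
    }
    counts
}
```

Two caveats follow from the modulo step and from the standard library's documentation: changing `num_workers` can move a path to a different Worker, and the algorithm behind `DefaultHasher` is not guaranteed to stay the same across Rust releases, so the mapping should be treated as stable only within one running build.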
Two Operating Modes
Controlled by the dispatcher_workers configuration:
| Value | Mode | Description |
|---|---|---|
| 0 | Per-Stream | One independent Dispatcher task per stream (default) |
| N | Pool | N Workers, each handling multiple streams |
```yaml
# Configuration example (set exactly one value)
stream:
  dispatcher_workers: 0    # Per-Stream mode (default)
  # dispatcher_workers: 4  # Pool mode with 4 Workers
  # dispatcher_workers: 8  # Pool mode with 8 Workers
```

Selection guidelines:
- Few high-concurrency streams (< 100): Use 0 (Per-Stream), each stream gets its own task
- Many streams (> 100): Use Pool mode to reduce task overhead
- Recommended Worker count: 1–2x the number of CPU cores
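
The guidelines above can be turned into a small helper for picking a `dispatcher_workers` value. Everything here is hypothetical: the function names, the `factor` parameter, and the choice to treat exactly 100 streams as Per-Stream (the guidelines leave that boundary unspecified).

```rust
use std::thread;

/// Hypothetical helper returning a value for `dispatcher_workers`.
/// `stream_count` is the expected number of concurrent streams, `cores`
/// the CPU core count, `factor` the multiple of cores (clamped to 1..=2).
pub fn suggest_dispatcher_workers(stream_count: usize, cores: usize, factor: usize) -> usize {
    if stream_count <= 100 {
        0 // Per-Stream mode: one Dispatcher task per stream
    } else {
        cores * factor.clamp(1, 2) // Pool mode: 1-2x the core count
    }
}

/// Core count as reported by the OS, falling back to 1 if unavailable.
pub fn detected_cores() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}
```

For example, `suggest_dispatcher_workers(500, detected_cores(), 1)` selects Pool mode with one Worker per core, which is the lower end of the recommended range.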
Complete Data Flow Sequence
```
1. Publisher writes a frame
   Publisher.write_video(frame)
         │
         ▼
2. Frame is written to RingBuffer
   VideoTrack.buffer.write(frame)
         │
         ▼
3. Frame change notification
   frame_notify.send(count)  ──── watch::channel
         │
         ▼
4. Dispatcher receives notification
   frame_notify.changed().await  ──── Single-thread wakeup
         │
         ▼
5. Read frame from RingBuffer (only once)
   video_reader.read_next()  ──── Returns Arc<AVFrame>
         │
         ▼
6. Broadcast to all subscribers
   for sub in subscribers:
       sub.try_send(Video(frame.clone()))  ──── Arc::clone (ref count +1)
         │
         ▼
7. Subscriber receives from channel
   receiver.recv().await  ──── Async await
         │
         ▼
8. Protocol encoding output
   RTMP/FLV/HLS/WebRTC encoder
```

Periodic Cleanup
The Dispatcher performs cleanup every 100 dispatch cycles, removing closed subscribers:
```rust
cleanup_counter += 1;
if cleanup_counter % 100 == 0 {
    self.cleanup_closed(); // COW mode removes closed subscribers
}
```

When a stream ends, the Dispatcher sends an Eos (End of Stream) signal to all subscribers.
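
Putting the broadcast, the periodic cleanup, and the end-of-stream signal together gives a loop along the following lines. This is a simplified sketch, not the Dispatcher's actual code: `Msg`, `Sub`, `subscribe`, and `run` are hypothetical names, the subscriber list is a plain `Vec` pruned with `retain` rather than a COW swap, and `std::sync::mpsc::sync_channel` stands in for the real channel.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::Arc;

/// Simplified message type; the real `DispatchFrame` has more variants.
pub enum Msg {
    Video(Arc<Vec<u8>>),
    Eos,
}

/// Hypothetical subscriber handle for this sketch.
pub struct Sub {
    tx: SyncSender<Msg>,
    closed: bool,
}

/// Registers a subscriber with a bounded queue and returns its receiver.
pub fn subscribe(subs: &mut Vec<Sub>, capacity: usize) -> Receiver<Msg> {
    let (tx, rx) = sync_channel(capacity);
    subs.push(Sub { tx, closed: false });
    rx
}

/// Broadcasts every frame, prunes closed subscribers every 100 cycles,
/// then sends Eos. Returns how many subscribers remain registered.
pub fn run(subs: &mut Vec<Sub>, frames: Vec<Arc<Vec<u8>>>) -> usize {
    let mut cleanup_counter: u64 = 0;
    for frame in frames {
        for sub in subs.iter_mut().filter(|s| !s.closed) {
            let msg = Msg::Video(Arc::clone(&frame));
            if let Err(TrySendError::Disconnected(_)) = sub.tx.try_send(msg) {
                sub.closed = true; // receiver went away; prune it later
            }
        }
        cleanup_counter += 1;
        if cleanup_counter % 100 == 0 {
            subs.retain(|s| !s.closed);
        }
    }
    for sub in subs.iter() {
        // Non-blocking like every other send; a full queue would miss Eos.
        let _ = sub.tx.try_send(Msg::Eos);
    }
    subs.len()
}
```

Deferring removal to every 100th cycle keeps the hot path free of list modifications; a closed subscriber costs only a skipped iteration until the next cleanup.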