Dispatcher 分发架构
Dispatcher 是 Monibuca V6 中负责将帧数据从 Publisher 分发到所有 Subscriber 的核心组件。它的设计目标是消除重复读取,实现真正的 O(1) 读取 + O(N) 广播。
传统 N-Reader 方案的问题
Section titled “传统 N-Reader 方案的问题”在传统的流媒体服务器中,每个订阅者独立从缓冲区读取数据:
传统方案: N 个订阅者 = N 次读取
RingBuffer │ ├── Reader 1 读取帧 ──▶ Subscriber 1 ├── Reader 2 读取帧 ──▶ Subscriber 2 ├── Reader 3 读取帧 ──▶ Subscriber 3 └── Reader N 读取帧 ──▶ Subscriber N
问题:• 1000 个订阅者 = 1000 次 RingBuffer 读取(同一帧)• 每个 Reader 都需要追踪状态、检测覆写• 高并发时产生大量原子操作竞争Dispatcher 方案
Section titled “Dispatcher 方案”Dispatcher 方案: N 个订阅者 = 1 次读取
┌─────────────────┐ │ Publisher │ └────────┬────────┘ ↓ ┌─────────────────┐ │ RingBuffer │ └────────┬────────┘ ↓ ┌──────────────────────────────┐ │ Dispatcher (单线程读取帧) │ │ • 只读取 1 次 │ │ • Arc 零拷贝分发 │ │ • 非阻塞 try_send │ └──────────────┬───────────────┘ ↓ ┌────────────┬───────┴───────┬────────────┐ ↓ ↓ ↓ ↓┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐│ Queue 1 │ │ Queue 2 │ │ Queue 3 │ │ Queue N ││(bounded)│ │(bounded)│ │(bounded)│ │(bounded)│└────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ ↓ ↓ ↓ ↓ Writer 1 Writer 2 Writer 3 Writer N (独立 task) (独立 task) (独立 task) (独立 task)核心优势:
| 指标 | 传统方案 | Dispatcher 方案 |
|---|---|---|
| RingBuffer 读取次数 | N 次/帧 | 1 次/帧 |
| 读锁竞争 | O(N) | O(1) |
| 帧数据拷贝 | 0(Arc 共享) | 0(Arc 共享) |
| 慢订阅者影响 | 可能阻塞 | 丢帧,不阻塞 |
Dispatcher 结构
Section titled “Dispatcher 结构”pub struct Dispatcher { stream_path: String, // 订阅者列表 - ArcSwap 无锁读取 (COW 模式) subscribers: ArcSwap<Vec<DispatchSubscriber>>, next_id: AtomicU64, running: AtomicBool, queue_capacity: usize, // 默认 150 total_dispatched: AtomicU64,}每个 Stream 对应一个 Dispatcher 实例。
每个订阅者拥有一个 bounded channel(默认容量 150):
Queue 容量 = 150 帧≈ 1.8 秒 @ 80fps(视频 + 音频混合帧率)可容忍网络抖动当队列满时,新帧会被 丢弃(try_send 返回 Full),而不是阻塞其他订阅者。丢帧计数器会自动递增,供监控使用。
Dispatcher 通过 channel 发送以下帧类型:
pub enum DispatchFrame { Video(Arc<AVFrame>), // 视频帧 Audio(Arc<AVFrame>), // 音频帧 VideoSeqHeader(Bytes), // 视频序列头(AVC/HEVC 解码器配置) AudioSeqHeader(Bytes), // 音频序列头(AAC 解码器配置) Eos, // 流结束信号}Bounded Channel 背压机制
Section titled “Bounded Channel 背压机制”Publisher 30fps │ ▼Dispatcher ──try_send──▶ [Queue: ████████░░] ──▶ Subscriber(正常) 150/150
Dispatcher ──try_send──▶ [Queue: ██████████] ──▶ Subscriber(卡顿) FULL! 丢弃帧 frames_dropped++ 不阻塞!设计原则: 慢速订阅者只影响自己,不会拖慢整个系统。
- 网络波动导致的短暂队列积压可以被缓冲吸收
- 持续的慢速消费会导致帧丢弃,客户端通常可以自行恢复
- 每个订阅者的丢帧计数可通过 API 监控
无锁订阅者管理
Section titled “无锁订阅者管理”订阅者列表使用 ArcSwap<Vec<DispatchSubscriber>> 实现无锁管理:
添加订阅者(COW 写入)
Section titled “添加订阅者(COW 写入)”// Clone-on-Write: 不阻塞正在进行的分发self.subscribers.rcu(|old| { let mut new = (**old).clone(); new.push(subscriber); new});帧分发(无锁读取)
Section titled “帧分发(无锁读取)”// 原子加载 - 无锁let subs = self.subscribers.load();for sub in subs.iter() { if sub.receive_video && !sub.is_closed() { sub.try_send(DispatchFrame::Video(frame.clone())); }}在 1000 个订阅者 @ 60fps 的场景下,分发操作完全无锁竞争。
DispatcherPool
Section titled “DispatcherPool”当服务器需要处理大量并发流(>100)时,可以启用 DispatcherPool 模式。
┌─────────────────────────────┐ │ DispatcherPool │ │ (管理 N 个 Worker) │ └─────────────┬───────────────┘ │ ┌───────────────────────┼───────────────────────┐ ↓ ↓ ↓ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Worker 0 │ │ Worker 1 │ │Worker N-1│ │ Stream A │ │ Stream C │ │ Stream E │ │ Stream B │ │ Stream D │ │ Stream F │ └──────────┘ └──────────┘ └──────────┘流通过 一致性哈希 分配到 Worker:
fn worker_index(&self, stream_path: &str) -> usize { let mut hasher = DefaultHasher::new(); stream_path.hash(&mut hasher); (hasher.finish() as usize) % self.num_workers}同一个流路径始终被分配到相同的 Worker,保证了流的处理连续性。
两种工作模式
Section titled “两种工作模式”通过 dispatcher_workers 配置项控制:
| 值 | 模式 | 说明 |
|---|---|---|
0 | Per-Stream | 每个流一个独立的 Dispatcher task(默认) |
N | Pool | N 个 Worker,每个处理多个流 |
# 配置示例stream: dispatcher_workers: 0 # Per-Stream 模式(默认) dispatcher_workers: 4 # 4 个 Worker 的池模式 dispatcher_workers: 8 # 8 个 Worker 的池模式选择建议:
- 少量高并发流(< 100 路): 使用
0(Per-Stream),每路流独享一个 task - 大量流(> 100 路): 使用 Pool 模式,减少 task 开销
- Worker 数量建议设置为 CPU 核心数的 1~2 倍
完整数据流时序
Section titled “完整数据流时序”1. Publisher 写入帧 Publisher.write_video(frame) │ ▼2. 帧写入 RingBuffer VideoTrack.buffer.write(frame) │ ▼3. 通知帧变更 frame_notify.send(count) ──── watch::channel │ ▼4. Dispatcher 收到通知 frame_notify.changed().await ──── 单线程唤醒 │ ▼5. 从 RingBuffer 读取帧(仅 1 次) video_reader.read_next() ──── 返回 Arc<AVFrame> │ ▼6. 广播到所有订阅者 for sub in subscribers: sub.try_send(Video(frame.clone())) ──── Arc::clone (引用计数 +1) │ ▼7. 订阅者从 channel 接收 receiver.recv().await ──── 异步等待 │ ▼8. 协议编码输出 RTMP/FLV/HLS/WebRTC encoderDispatcher 每 100 个帧分发周期执行一次清理,移除已关闭的订阅者:
cleanup_counter += 1;if cleanup_counter % 100 == 0 { self.cleanup_closed(); // COW 模式移除已关闭的订阅者}当流结束时,Dispatcher 向所有订阅者发送 Eos(End of Stream)信号。
联系我们
微信公众号:不卡科技
腾讯频道:流媒体技术
QQ 频道:p0qq0crz08
QQ 群:751639168