跳转到内容

Dispatcher 分发架构

Dispatcher 是 Monibuca V6 中负责将帧数据从 Publisher 分发到所有 Subscriber 的核心组件。它的设计目标是消除重复读取,实现真正的 O(1) 读取 + O(N) 广播。

在传统的流媒体服务器中,每个订阅者独立从缓冲区读取数据:

传统方案: N 个订阅者 = N 次读取
RingBuffer
├── Reader 1 读取帧 ──▶ Subscriber 1
├── Reader 2 读取帧 ──▶ Subscriber 2
├── Reader 3 读取帧 ──▶ Subscriber 3
└── Reader N 读取帧 ──▶ Subscriber N
问题:
• 1000 个订阅者 = 1000 次 RingBuffer 读取(同一帧)
• 每个 Reader 都需要追踪状态、检测覆写
• 高并发时产生大量原子操作竞争
Dispatcher 方案: N 个订阅者 = 1 次读取
┌─────────────────┐
│ Publisher │
└────────┬────────┘
┌─────────────────┐
│ RingBuffer │
└────────┬────────┘
┌──────────────────────────────┐
│ Dispatcher (单线程读取帧) │
│ • 只读取 1 次 │
│ • Arc 零拷贝分发 │
│ • 非阻塞 try_send │
└──────────────┬───────────────┘
┌────────────┬───────┴───────┬────────────┐
↓ ↓ ↓ ↓
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ Queue 1 │ │ Queue 2 │ │ Queue 3 │ │ Queue N │
│(bounded)│ │(bounded)│ │(bounded)│ │(bounded)│
└────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘
↓ ↓ ↓ ↓
Writer 1 Writer 2 Writer 3 Writer N
(独立 task) (独立 task) (独立 task) (独立 task)

核心优势:

指标传统方案Dispatcher 方案
RingBuffer 读取次数N 次/帧1 次/帧
读锁竞争O(N)O(1)
帧数据拷贝0(Arc 共享)0(Arc 共享)
慢订阅者影响可能阻塞丢帧,不阻塞
pub struct Dispatcher {
stream_path: String,
// 订阅者列表 - ArcSwap 无锁读取 (COW 模式)
subscribers: ArcSwap<Vec<DispatchSubscriber>>,
next_id: AtomicU64,
running: AtomicBool,
queue_capacity: usize, // 默认 150
total_dispatched: AtomicU64,
}

每个 Stream 对应一个 Dispatcher 实例。

每个订阅者拥有一个 bounded channel(默认容量 150):

Queue 容量 = 150 帧
≈ 1.8 秒 @ 80fps(视频 + 音频混合帧率)
可容忍网络抖动

当队列满时,新帧会被 丢弃try_send 返回 Full),而不是阻塞其他订阅者。丢帧计数器会自动递增,供监控使用。

Dispatcher 通过 channel 发送以下帧类型:

pub enum DispatchFrame {
Video(Arc<AVFrame>), // 视频帧
Audio(Arc<AVFrame>), // 音频帧
VideoSeqHeader(Bytes), // 视频序列头(AVC/HEVC 解码器配置)
AudioSeqHeader(Bytes), // 音频序列头(AAC 解码器配置)
Eos, // 流结束信号
}
Publisher 30fps
Dispatcher ──try_send──▶ [Queue: ████████░░] ──▶ Subscriber(正常)
150/150
Dispatcher ──try_send──▶ [Queue: ██████████] ──▶ Subscriber(卡顿)
FULL! 丢弃帧 frames_dropped++
不阻塞!

设计原则: 慢速订阅者只影响自己,不会拖慢整个系统。

  • 网络波动导致的短暂队列积压可以被缓冲吸收
  • 持续的慢速消费会导致帧丢弃,客户端通常可以自行恢复
  • 每个订阅者的丢帧计数可通过 API 监控

订阅者列表使用 ArcSwap<Vec<DispatchSubscriber>> 实现无锁管理:

// Clone-on-Write: 不阻塞正在进行的分发
self.subscribers.rcu(|old| {
let mut new = (**old).clone();
new.push(subscriber);
new
});
// 原子加载 - 无锁
let subs = self.subscribers.load();
for sub in subs.iter() {
if sub.receive_video && !sub.is_closed() {
sub.try_send(DispatchFrame::Video(frame.clone()));
}
}

在 1000 个订阅者 @ 60fps 的场景下,分发操作完全无锁竞争。

当服务器需要处理大量并发流(>100)时,可以启用 DispatcherPool 模式。

┌─────────────────────────────┐
│ DispatcherPool │
│ (管理 N 个 Worker) │
└─────────────┬───────────────┘
┌───────────────────────┼───────────────────────┐
↓ ↓ ↓
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Worker 0 │ │ Worker 1 │ │Worker N-1│
│ Stream A │ │ Stream C │ │ Stream E │
│ Stream B │ │ Stream D │ │ Stream F │
└──────────┘ └──────────┘ └──────────┘

流通过 一致性哈希 分配到 Worker:

fn worker_index(&self, stream_path: &str) -> usize {
let mut hasher = DefaultHasher::new();
stream_path.hash(&mut hasher);
(hasher.finish() as usize) % self.num_workers
}

同一个流路径始终被分配到相同的 Worker,保证了流的处理连续性。

通过 dispatcher_workers 配置项控制:

模式说明
0Per-Stream每个流一个独立的 Dispatcher task(默认)
NPoolN 个 Worker,每个处理多个流
# 配置示例
stream:
dispatcher_workers: 0 # Per-Stream 模式(默认)
dispatcher_workers: 4 # 4 个 Worker 的池模式
dispatcher_workers: 8 # 8 个 Worker 的池模式

选择建议:

  • 少量高并发流(< 100 路): 使用 0(Per-Stream),每路流独享一个 task
  • 大量流(> 100 路): 使用 Pool 模式,减少 task 开销
  • Worker 数量建议设置为 CPU 核心数的 1~2 倍
1. Publisher 写入帧
Publisher.write_video(frame)
2. 帧写入 RingBuffer
VideoTrack.buffer.write(frame)
3. 通知帧变更
frame_notify.send(count) ──── watch::channel
4. Dispatcher 收到通知
frame_notify.changed().await ──── 单线程唤醒
5. 从 RingBuffer 读取帧(仅 1 次)
video_reader.read_next() ──── 返回 Arc<AVFrame>
6. 广播到所有订阅者
for sub in subscribers:
sub.try_send(Video(frame.clone())) ──── Arc::clone (引用计数 +1)
7. 订阅者从 channel 接收
receiver.recv().await ──── 异步等待
8. 协议编码输出
RTMP/FLV/HLS/WebRTC encoder

Dispatcher 每 100 个帧分发周期执行一次清理,移除已关闭的订阅者:

cleanup_counter += 1;
if cleanup_counter % 100 == 0 {
self.cleanup_closed(); // COW 模式移除已关闭的订阅者
}

当流结束时,Dispatcher 向所有订阅者发送 Eos(End of Stream)信号。

联系我们

微信公众号:不卡科技 微信公众号二维码
腾讯频道:流媒体技术 腾讯频道二维码
QQ 频道:p0qq0crz08 QQ 频道二维码
QQ 群:751639168 QQ 群二维码