RingBuffer 环形缓冲区
RingBuffer 是 Monibuca V6 流媒体引擎的核心数据结构,负责音视频帧的高效存储和多消费者并发读取。
write_pos (atomic) │ ▼┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐│Slot 0│Slot 1│Slot 2│Slot 3│Slot 4│Slot 5│ ... │Slot N││ Arc │ Arc │ Arc │ Arc │ Arc │ │ │ ││Frame │Frame │Frame │Frame │Frame │ │ │ │└──┬───┴──┬───┴──┬───┴──────┴──────┴──────┴──────┴──────┘ │ │ │ │ │ └──── Reader C (read_pos=2) │ └─────────── Reader B (read_pos=1) └────────────────── Reader A (read_pos=0)RingBuffer 采用 SPMC(Single Producer, Multiple Consumer)模式:
- 单生产者: 一个 Publisher 写入帧数据
- 多消费者: 多个 Subscriber(通过 RingReader)并发读取
- 固定容量: 写满后自动回绕覆盖旧数据
写入永不阻塞
Section titled “写入永不阻塞”写入操作使用 fetch_add 原子指令递增 write_pos,不需要任何锁:
// 原子获取并推进写位置let pos = self.write_pos.fetch_add(1, Ordering::AcqRel) % self.capacity;let slot = &self.slots[pos];
// ArcSwap 原子交换帧数据let arc_frame = Arc::new(frame);slot.frame.store(Some(arc_frame));
// 更新版本号(用于检测覆写)slot.version.fetch_add(1, Ordering::Release);关键设计: 即使某个消费者读取缓慢,写入操作也不会被阻塞。写入方只管向前推进 write_pos,旧数据会被自然覆盖。
Arc 零拷贝共享
Section titled “Arc 零拷贝共享”每个帧数据存储为 Arc<AVFrame>:
Publisher 写入: AVFrame ──▶ Arc::new(frame) ──▶ Slot.frame (ArcSwapOption)
Subscriber 读取: Slot.frame.load_full() ──▶ Arc<AVFrame> (引用计数 +1) │ ├── Subscriber A 持有 Arc ├── Subscriber B 持有 Arc └── Subscriber C 持有 Arc- 所有订阅者共享同一个
Arc<AVFrame>,无需数据拷贝 - 当所有引用释放后,帧数据自动回收
ArcSwapOption提供无锁原子交换能力
每个 Slot 包含一个原子版本号 version: AtomicU64:
pub struct RingSlot { frame: ArcSwapOption<AVFrame>, // 帧数据 version: AtomicU64, // 版本号 sequence: AtomicU64, // 帧序列号 written: AtomicBool, // 是否已写入}- 每次写入时
version自增 - 读取者通过比较
version检测数据是否被覆写 - 若序列号不匹配,读取者自动 seek 到最近的 IDR 帧重新同步
IDR 帧追踪
Section titled “IDR 帧追踪”视频流需要从关键帧(IDR)开始播放。RingBuffer 维护一个 IDR 帧位置列表:
IDR List (ArcSwap<Vec<IDRNode>>):┌─────────────────────────────────────┐│ [IDR@pos=0, IDR@pos=30, IDR@pos=60]│ ◀── 原子指针└─────────────────────────────────────┘
写入 IDR 帧时 (COW 模式):1. 加锁 (idr_write_lock)2. 克隆旧列表3. 添加新 IDR 节点4. 原子交换新列表5. 释放锁IDR 列表使用 ArcSwap + COW(Copy-on-Write) 模式:
- 读取(seek 到关键帧): 完全无锁,通过原子指针加载
- 写入(新关键帧到来): 克隆-修改-交换,不阻塞读操作
- 这是一种读多写少的优化模式 — 关键帧通常每 1~2 秒出现一次,而 seek 操作频率更高
每个 IDR 节点记录:
pub struct IDRNode { pub index: usize, // 在 RingBuffer 中的位置 pub sequence: u64, // 帧序列号 pub timestamp: Duration, // 帧时间戳(毫秒)}最多追踪 16 个 IDR 帧(默认值,可通过 with_idr_capacity 配置)。
RingReader 读取器
Section titled “RingReader 读取器”每个订阅者拥有一个独立的 RingReader 实例来追踪各自的读取位置:
pub struct RingReader { buffer: Arc<RingBuffer>, // 共享缓冲区引用 read_pos: usize, // 当前读取位置 expected_seq: u64, // 期望的下一帧序列号 frames_read: u64, // 已读取帧数 state: ReaderState, // 读取器状态}读取器状态机
Section titled “读取器状态机” Init │ ▼ WaitingKeyframe ◀─────┐ │ │ ▼ (找到 IDR) │ Normal ───────────┘ │ (被覆写/序列号不匹配) ▼ CatchingUp │ ▼ (重新 seek IDR) Normal| 状态 | 说明 |
|---|---|
Init | 初始状态,等待首次 seek |
WaitingKeyframe | 等待关键帧 |
Normal | 正常读取 |
CatchingUp | 落后太多,需要跳到最新 IDR |
// 1. 检查是否到达写入位置(无新数据)if self.read_pos == write_pos { return None;}
// 2. 检查是否落后太多(距离 > capacity - 2)if distance > capacity - 2 { // 重新 seek 到最新 IDR self.seek_to_latest_idr();}
// 3. 检查序列号是否匹配if frame.sequence != self.expected_seq { // 数据已被覆写,seek 到最新 IDR self.seek_to_latest_idr();}
// 4. 读取帧(获取 Arc 引用)let arc_frame = self.buffer.read(self.read_pos);self.read_pos = (self.read_pos + 1) % capacity;| 轨道 | 默认容量 | 说明 |
|---|---|---|
| VideoTrack | 1024 slots | 约 34 秒缓冲(30fps) |
| AudioTrack | 64 slots | 约 1.3 秒缓冲(48kHz, 1024 samples/frame) |
容量可通过 PublisherConfig 配置:
PublisherConfig { video_buffer_capacity: 1024, audio_buffer_capacity: 64, ..Default::default()}| 操作 | 延迟 | 锁竞争 |
|---|---|---|
| 写入帧 | ~100ns | 无锁(atomic fetch_add) |
| 读取帧 | ~50ns | 无锁(ArcSwap load) |
| Seek IDR | ~100ns | 无锁(ArcSwap load) |
| 添加 IDR | ~200ns | 短暂 Mutex(仅序列化 IDR 写入) |
RingBuffer (共享) │ ┌────────────┼────────────┐ ▼ ▼ ▼ RingReader RingReader RingReader (Sub A) (Sub B) (Sub C) read_pos=10 read_pos=8 read_pos=12
• 每个 Reader 独立追踪位置 • 读操作之间互不干扰 • 写操作不阻塞任何读操作支持上千个并发读取者,彼此之间零竞争。读取者之间唯一的共享是底层 Arc<AVFrame> 的引用计数操作(硬件级原子指令)。
联系我们
微信公众号:不卡科技
腾讯频道:流媒体技术
QQ 频道:p0qq0crz08
QQ 群:751639168