跳转到内容

RingBuffer 环形缓冲区

RingBuffer 是 Monibuca V6 流媒体引擎的核心数据结构,负责音视频帧的高效存储和多消费者并发读取。

write_pos (atomic)
┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐
│Slot 0│Slot 1│Slot 2│Slot 3│Slot 4│Slot 5│ ... │Slot N│
│ Arc │ Arc │ Arc │ Arc │ Arc │ │ │ │
│Frame │Frame │Frame │Frame │Frame │ │ │ │
└──┬───┴──┬───┴──┬───┴──────┴──────┴──────┴──────┴──────┘
│ │ │
│ │ └──── Reader C (read_pos=2)
│ └─────────── Reader B (read_pos=1)
└────────────────── Reader A (read_pos=0)

RingBuffer 采用 SPMC(Single Producer, Multiple Consumer)模式:

  • 单生产者: 一个 Publisher 写入帧数据
  • 多消费者: 多个 Subscriber(通过 RingReader)并发读取
  • 固定容量: 写满后自动回绕覆盖旧数据

写入操作使用 fetch_add 原子指令递增 write_pos,不需要任何锁:

// 原子获取并推进写位置
let pos = self.write_pos.fetch_add(1, Ordering::AcqRel) % self.capacity;
let slot = &self.slots[pos];
// ArcSwap 原子交换帧数据
let arc_frame = Arc::new(frame);
slot.frame.store(Some(arc_frame));
// 更新版本号(用于检测覆写)
slot.version.fetch_add(1, Ordering::Release);

关键设计: 即使某个消费者读取缓慢,写入操作也不会被阻塞。写入方只管向前推进 write_pos,旧数据会被自然覆盖。

每个帧数据存储为 Arc<AVFrame>

Publisher 写入:
AVFrame ──▶ Arc::new(frame) ──▶ Slot.frame (ArcSwapOption)
Subscriber 读取:
Slot.frame.load_full() ──▶ Arc<AVFrame> (引用计数 +1)
├── Subscriber A 持有 Arc
├── Subscriber B 持有 Arc
└── Subscriber C 持有 Arc
  • 所有订阅者共享同一个 Arc<AVFrame>,无需数据拷贝
  • 当所有引用释放后,帧数据自动回收
  • ArcSwapOption 提供无锁原子交换能力

每个 Slot 包含一个原子版本号 version: AtomicU64

pub struct RingSlot {
frame: ArcSwapOption<AVFrame>, // 帧数据
version: AtomicU64, // 版本号
sequence: AtomicU64, // 帧序列号
written: AtomicBool, // 是否已写入
}
  • 每次写入时 version 自增
  • 读取者通过比较 version 检测数据是否被覆写
  • 若序列号不匹配,读取者自动 seek 到最近的 IDR 帧重新同步

视频流需要从关键帧(IDR)开始播放。RingBuffer 维护一个 IDR 帧位置列表:

IDR List (ArcSwap<Vec<IDRNode>>):
┌─────────────────────────────────────┐
│ [IDR@pos=0, IDR@pos=30, IDR@pos=60]│ ◀── 原子指针
└─────────────────────────────────────┘
写入 IDR 帧时 (COW 模式):
1. 加锁 (idr_write_lock)
2. 克隆旧列表
3. 添加新 IDR 节点
4. 原子交换新列表
5. 释放锁

IDR 列表使用 ArcSwap + COW(Copy-on-Write) 模式:

  • 读取(seek 到关键帧): 完全无锁,通过原子指针加载
  • 写入(新关键帧到来): 克隆-修改-交换,不阻塞读操作
  • 这是一种读多写少的优化模式 — 关键帧通常每 1~2 秒出现一次,而 seek 操作频率更高

每个 IDR 节点记录:

pub struct IDRNode {
pub index: usize, // 在 RingBuffer 中的位置
pub sequence: u64, // 帧序列号
pub timestamp: Duration, // 帧时间戳(毫秒)
}

最多追踪 16 个 IDR 帧(默认值,可通过 with_idr_capacity 配置)。

每个订阅者拥有一个独立的 RingReader 实例来追踪各自的读取位置:

pub struct RingReader {
buffer: Arc<RingBuffer>, // 共享缓冲区引用
read_pos: usize, // 当前读取位置
expected_seq: u64, // 期望的下一帧序列号
frames_read: u64, // 已读取帧数
state: ReaderState, // 读取器状态
}
Init
WaitingKeyframe ◀─────┐
│ │
▼ (找到 IDR) │
Normal ───────────┘
│ (被覆写/序列号不匹配)
CatchingUp
▼ (重新 seek IDR)
Normal
状态说明
Init初始状态,等待首次 seek
WaitingKeyframe等待关键帧
Normal正常读取
CatchingUp落后太多,需要跳到最新 IDR
// 1. 检查是否到达写入位置(无新数据)
if self.read_pos == write_pos {
return None;
}
// 2. 检查是否落后太多(距离 > capacity - 2)
if distance > capacity - 2 {
// 重新 seek 到最新 IDR
self.seek_to_latest_idr();
}
// 3. 检查序列号是否匹配
if frame.sequence != self.expected_seq {
// 数据已被覆写,seek 到最新 IDR
self.seek_to_latest_idr();
}
// 4. 读取帧(获取 Arc 引用)
let arc_frame = self.buffer.read(self.read_pos);
self.read_pos = (self.read_pos + 1) % capacity;
轨道默认容量说明
VideoTrack1024 slots约 34 秒缓冲(30fps)
AudioTrack64 slots约 1.3 秒缓冲(48kHz, 1024 samples/frame)

容量可通过 PublisherConfig 配置:

PublisherConfig {
video_buffer_capacity: 1024,
audio_buffer_capacity: 64,
..Default::default()
}
操作延迟锁竞争
写入帧~100ns无锁(atomic fetch_add)
读取帧~50ns无锁(ArcSwap load)
Seek IDR~100ns无锁(ArcSwap load)
添加 IDR~200ns短暂 Mutex(仅序列化 IDR 写入)
RingBuffer (共享)
┌────────────┼────────────┐
▼ ▼ ▼
RingReader RingReader RingReader
(Sub A) (Sub B) (Sub C)
read_pos=10 read_pos=8 read_pos=12
• 每个 Reader 独立追踪位置
• 读操作之间互不干扰
• 写操作不阻塞任何读操作

支持上千个并发读取者,彼此之间零竞争。读取者之间唯一的共享是底层 Arc<AVFrame> 的引用计数操作(硬件级原子指令)。

联系我们

微信公众号:不卡科技 微信公众号二维码
腾讯频道:流媒体技术 腾讯频道二维码
QQ 频道:p0qq0crz08 QQ 频道二维码
QQ 群:751639168 QQ 群二维码