Skip to content

RingBuffer

RingBuffer is the core data structure of the Monibuca V6 streaming engine, responsible for efficient frame storage and concurrent multi-consumer reads.

write_pos (atomic)
┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐
│Slot 0│Slot 1│Slot 2│Slot 3│Slot 4│Slot 5│ ... │Slot N│
│ Arc │ Arc │ Arc │ Arc │ Arc │ │ │ │
│Frame │Frame │Frame │Frame │Frame │ │ │ │
└──┬───┴──┬───┴──┬───┴──────┴──────┴──────┴──────┴──────┘
│ │ │
│ │ └──── Reader C (read_pos=2)
│ └─────────── Reader B (read_pos=1)
└────────────────── Reader A (read_pos=0)

RingBuffer uses an SPMC (Single Producer, Multiple Consumer) pattern:

  • Single producer: One Publisher writes frame data
  • Multiple consumers: Multiple Subscribers (via RingReader) read concurrently
  • Fixed capacity: Automatically wraps around and overwrites old data when full

Write operations use fetch_add atomic instructions to increment write_pos, requiring no locks:

// Atomically fetch and advance the write position
let pos = self.write_pos.fetch_add(1, Ordering::AcqRel) % self.capacity;
let slot = &self.slots[pos];
// ArcSwap atomically swaps the frame data
let arc_frame = Arc::new(frame);
slot.frame.store(Some(arc_frame));
// Update the version number (used to detect overwrites)
slot.version.fetch_add(1, Ordering::Release);

Key design: Even if a consumer reads slowly, write operations are never blocked. The writer simply advances write_pos forward, and old data is naturally overwritten.

Each frame is stored as an Arc<AVFrame>:

Publisher writes:
AVFrame ──▶ Arc::new(frame) ──▶ Slot.frame (ArcSwapOption)
Subscriber reads:
Slot.frame.load_full() ──▶ Arc<AVFrame> (reference count +1)
├── Subscriber A holds Arc
├── Subscriber B holds Arc
└── Subscriber C holds Arc
  • All subscribers share the same Arc<AVFrame>, no data copying needed
  • Frame data is automatically reclaimed when all references are released
  • ArcSwapOption provides lock-free atomic swap capability

Each Slot contains an atomic version number version: AtomicU64:

pub struct RingSlot {
frame: ArcSwapOption<AVFrame>, // Frame data
version: AtomicU64, // Version number
sequence: AtomicU64, // Frame sequence number
written: AtomicBool, // Whether data has been written
}
  • version is incremented on each write
  • Readers compare version to detect whether data has been overwritten
  • If the sequence number doesn’t match, the reader automatically seeks to the nearest IDR frame to resynchronize

Video streams need to start playback from a keyframe (IDR). RingBuffer maintains a list of IDR frame positions:

IDR List (ArcSwap<Vec<IDRNode>>):
┌─────────────────────────────────────┐
│ [IDR@pos=0, IDR@pos=30, IDR@pos=60]│ ◀── Atomic pointer
└─────────────────────────────────────┘
When writing an IDR frame (COW mode):
1. Acquire lock (idr_write_lock)
2. Clone old list
3. Add new IDR node
4. Atomically swap in the new list
5. Release lock

The IDR list uses an ArcSwap + COW (Copy-on-Write) pattern:

  • Read (seek to keyframe): Completely lock-free, loaded via atomic pointer
  • Write (new keyframe arrives): Clone-modify-swap, does not block read operations
  • This is a read-heavy, write-light optimization — keyframes typically appear every 1–2 seconds, while seek operations occur more frequently

Each IDR node records:

pub struct IDRNode {
pub index: usize, // Position in the RingBuffer
pub sequence: u64, // Frame sequence number
pub timestamp: Duration, // Frame timestamp (milliseconds)
}

A maximum of 16 IDR frames are tracked (default value, configurable via with_idr_capacity).

Each subscriber owns an independent RingReader instance to track its own read position:

pub struct RingReader {
buffer: Arc<RingBuffer>, // Shared buffer reference
read_pos: usize, // Current read position
expected_seq: u64, // Expected next frame sequence number
frames_read: u64, // Number of frames read
state: ReaderState, // Reader state
}
Init
WaitingKeyframe ◀─────┐
│ │
▼ (IDR found) │
Normal ───────────┘
│ (overwritten/sequence mismatch)
CatchingUp
▼ (re-seek to IDR)
Normal
StateDescription
InitInitial state, waiting for first seek
WaitingKeyframeWaiting for a keyframe
NormalNormal reading
CatchingUpFallen too far behind, needs to jump to the latest IDR
// 1. Check if we've reached the write position (no new data)
if self.read_pos == write_pos {
return None;
}
// 2. Check if we've fallen too far behind (distance > capacity - 2)
if distance > capacity - 2 {
// Re-seek to the latest IDR
self.seek_to_latest_idr();
}
// 3. Check if the sequence number matches
if frame.sequence != self.expected_seq {
// Data has been overwritten, seek to the latest IDR
self.seek_to_latest_idr();
}
// 4. Read the frame (acquire Arc reference)
let arc_frame = self.buffer.read(self.read_pos);
self.read_pos = (self.read_pos + 1) % capacity;
TrackDefault CapacityDescription
VideoTrack1024 slots~34 seconds of buffer (at 30fps)
AudioTrack64 slots~1.3 seconds of buffer (at 48kHz, 1024 samples/frame)

Capacities are configurable via PublisherConfig:

PublisherConfig {
video_buffer_capacity: 1024,
audio_buffer_capacity: 64,
..Default::default()
}
OperationLatencyLock Contention
Write frame~100nsLock-free (atomic fetch_add)
Read frame~50nsLock-free (ArcSwap load)
Seek IDR~100nsLock-free (ArcSwap load)
Add IDR~200nsBrief Mutex (serializes IDR writes only)
RingBuffer (shared)
┌────────────┼────────────┐
▼ ▼ ▼
RingReader RingReader RingReader
(Sub A) (Sub B) (Sub C)
read_pos=10 read_pos=8 read_pos=12
• Each Reader independently tracks its position
• Read operations do not interfere with each other
• Write operations never block any read operations

Supports thousands of concurrent readers with zero contention between them. The only shared operation between readers is the reference count on the underlying Arc<AVFrame> (hardware-level atomic instructions).