RingBuffer
RingBuffer is the core data structure of the Monibuca V6 streaming engine, responsible for efficient frame storage and concurrent multi-consumer reads.
Design Overview
Section titled “Design Overview” write_pos (atomic) │ ▼┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐│Slot 0│Slot 1│Slot 2│Slot 3│Slot 4│Slot 5│ ... │Slot N││ Arc │ Arc │ Arc │ Arc │ Arc │ │ │ ││Frame │Frame │Frame │Frame │Frame │ │ │ │└──┬───┴──┬───┴──┬───┴──────┴──────┴──────┴──────┴──────┘ │ │ │ │ │ └──── Reader C (read_pos=2) │ └─────────── Reader B (read_pos=1) └────────────────── Reader A (read_pos=0)RingBuffer uses an SPMC (Single Producer, Multiple Consumer) pattern:
- Single producer: One Publisher writes frame data
- Multiple consumers: Multiple Subscribers (via RingReader) read concurrently
- Fixed capacity: Automatically wraps around and overwrites old data when full
Core Features
Section titled “Core Features”Writes Never Block
Section titled “Writes Never Block”Write operations use fetch_add atomic instructions to increment write_pos, requiring no locks:
// Atomically fetch and advance the write positionlet pos = self.write_pos.fetch_add(1, Ordering::AcqRel) % self.capacity;let slot = &self.slots[pos];
// ArcSwap atomically swaps the frame datalet arc_frame = Arc::new(frame);slot.frame.store(Some(arc_frame));
// Update the version number (used to detect overwrites)slot.version.fetch_add(1, Ordering::Release);Key design: Even if a consumer reads slowly, write operations are never blocked. The writer simply advances write_pos forward, and old data is naturally overwritten.
Arc Zero-Copy Sharing
Section titled “Arc Zero-Copy Sharing”Each frame is stored as an Arc<AVFrame>:
Publisher writes: AVFrame ──▶ Arc::new(frame) ──▶ Slot.frame (ArcSwapOption)
Subscriber reads: Slot.frame.load_full() ──▶ Arc<AVFrame> (reference count +1) │ ├── Subscriber A holds Arc ├── Subscriber B holds Arc └── Subscriber C holds Arc- All subscribers share the same
Arc<AVFrame>, no data copying needed - Frame data is automatically reclaimed when all references are released
ArcSwapOptionprovides lock-free atomic swap capability
Version Number Validation
Section titled “Version Number Validation”Each Slot contains an atomic version number version: AtomicU64:
pub struct RingSlot { frame: ArcSwapOption<AVFrame>, // Frame data version: AtomicU64, // Version number sequence: AtomicU64, // Frame sequence number written: AtomicBool, // Whether data has been written}versionis incremented on each write- Readers compare
versionto detect whether data has been overwritten - If the sequence number doesn’t match, the reader automatically seeks to the nearest IDR frame to resynchronize
IDR Frame Tracking
Section titled “IDR Frame Tracking”Video streams need to start playback from a keyframe (IDR). RingBuffer maintains a list of IDR frame positions:
IDR List (ArcSwap<Vec<IDRNode>>):┌─────────────────────────────────────┐│ [IDR@pos=0, IDR@pos=30, IDR@pos=60]│ ◀── Atomic pointer└─────────────────────────────────────┘
When writing an IDR frame (COW mode):1. Acquire lock (idr_write_lock)2. Clone old list3. Add new IDR node4. Atomically swap in the new list5. Release lockThe IDR list uses an ArcSwap + COW (Copy-on-Write) pattern:
- Read (seek to keyframe): Completely lock-free, loaded via atomic pointer
- Write (new keyframe arrives): Clone-modify-swap, does not block read operations
- This is a read-heavy, write-light optimization — keyframes typically appear every 1–2 seconds, while seek operations occur more frequently
Each IDR node records:
pub struct IDRNode { pub index: usize, // Position in the RingBuffer pub sequence: u64, // Frame sequence number pub timestamp: Duration, // Frame timestamp (milliseconds)}A maximum of 16 IDR frames are tracked (default value, configurable via with_idr_capacity).
RingReader
Section titled “RingReader”Each subscriber owns an independent RingReader instance to track its own read position:
pub struct RingReader { buffer: Arc<RingBuffer>, // Shared buffer reference read_pos: usize, // Current read position expected_seq: u64, // Expected next frame sequence number frames_read: u64, // Number of frames read state: ReaderState, // Reader state}Reader State Machine
Section titled “Reader State Machine” Init │ ▼ WaitingKeyframe ◀─────┐ │ │ ▼ (IDR found) │ Normal ───────────┘ │ (overwritten/sequence mismatch) ▼ CatchingUp │ ▼ (re-seek to IDR) Normal| State | Description |
|---|---|
Init | Initial state, waiting for first seek |
WaitingKeyframe | Waiting for a keyframe |
Normal | Normal reading |
CatchingUp | Fallen too far behind, needs to jump to the latest IDR |
Read Flow
Section titled “Read Flow”// 1. Check if we've reached the write position (no new data)if self.read_pos == write_pos { return None;}
// 2. Check if we've fallen too far behind (distance > capacity - 2)if distance > capacity - 2 { // Re-seek to the latest IDR self.seek_to_latest_idr();}
// 3. Check if the sequence number matchesif frame.sequence != self.expected_seq { // Data has been overwritten, seek to the latest IDR self.seek_to_latest_idr();}
// 4. Read the frame (acquire Arc reference)let arc_frame = self.buffer.read(self.read_pos);self.read_pos = (self.read_pos + 1) % capacity;Default Capacities
Section titled “Default Capacities”| Track | Default Capacity | Description |
|---|---|---|
| VideoTrack | 1024 slots | ~34 seconds of buffer (at 30fps) |
| AudioTrack | 64 slots | ~1.3 seconds of buffer (at 48kHz, 1024 samples/frame) |
Capacities are configurable via PublisherConfig:
PublisherConfig { video_buffer_capacity: 1024, audio_buffer_capacity: 64, ..Default::default()}Performance Characteristics
Section titled “Performance Characteristics”| Operation | Latency | Lock Contention |
|---|---|---|
| Write frame | ~100ns | Lock-free (atomic fetch_add) |
| Read frame | ~50ns | Lock-free (ArcSwap load) |
| Seek IDR | ~100ns | Lock-free (ArcSwap load) |
| Add IDR | ~200ns | Brief Mutex (serializes IDR writes only) |
Multi-Reader Concurrency
Section titled “Multi-Reader Concurrency” RingBuffer (shared) │ ┌────────────┼────────────┐ ▼ ▼ ▼ RingReader RingReader RingReader (Sub A) (Sub B) (Sub C) read_pos=10 read_pos=8 read_pos=12
• Each Reader independently tracks its position • Read operations do not interfere with each other • Write operations never block any read operationsSupports thousands of concurrent readers with zero contention between them. The only shared operation between readers is the reference count on the underlying Arc<AVFrame> (hardware-level atomic instructions).