445 lines
16 KiB
Rust
445 lines
16 KiB
Rust
//! Ring buffer wrapper for communicating with the Media Driver
|
|
use crate::client::concurrent::AtomicBuffer;
|
|
use crate::util::bit::align;
|
|
use crate::util::{bit, AeronError, IndexT, Result};
|
|
use std::ops::{Deref, DerefMut};
|
|
|
|
/// Description of the Ring Buffer schema.
|
|
pub mod buffer_descriptor {
|
|
use crate::client::concurrent::AtomicBuffer;
|
|
use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH};
|
|
use crate::util::AeronError::IllegalArgument;
|
|
use crate::util::{IndexT, Result};
|
|
|
|
// QUESTION: Why are these offsets so large when we only ever use i64 types?
|
|
|
|
/// Offset in the ring buffer metadata to the end of the most recent record.
|
|
pub const TAIL_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 2) as IndexT;
|
|
|
|
/// QUESTION: Why the distinction between HEAD_CACHE and HEAD?
|
|
pub const HEAD_CACHE_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 4) as IndexT;
|
|
|
|
/// Offset in the ring buffer metadata to index of the next record to read.
|
|
pub const HEAD_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 6) as IndexT;
|
|
|
|
/// Offset of the correlation id counter, as measured in bytes past
|
|
/// the start of the ring buffer metadata trailer.
|
|
pub const CORRELATION_COUNTER_OFFSET: IndexT = (CACHE_LINE_LENGTH * 8) as IndexT;
|
|
|
|
/// Total size of the ring buffer metadata trailer.
|
|
pub const TRAILER_LENGTH: IndexT = (CACHE_LINE_LENGTH * 12) as IndexT;
|
|
|
|
/// Verify the capacity of a buffer is legal for use as a ring buffer.
|
|
/// Returns the actual capacity excluding ring buffer metadata.
|
|
pub fn check_capacity<A>(buffer: &A) -> Result<IndexT>
|
|
where
|
|
A: AtomicBuffer,
|
|
{
|
|
let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT;
|
|
if is_power_of_two(capacity) {
|
|
Ok(capacity)
|
|
} else {
|
|
Err(IllegalArgument)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Ring buffer message header. Made up of fields for message length, message type,
|
|
/// and then the encoded message.
|
|
///
|
|
/// Writing the record length signals the message recording is complete, and all
|
|
/// associated ring buffer metadata has been properly updated.
|
|
///
|
|
/// ```text
|
|
/// 0 1 2 3
|
|
/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
|
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
|
/// |R| Record Length |
|
|
/// +-+-------------------------------------------------------------+
|
|
/// | Type |
|
|
/// +---------------------------------------------------------------+
|
|
/// | Encoded Message ...
|
|
///... |
|
|
/// +---------------------------------------------------------------+
|
|
/// ```
|
|
// QUESTION: What is the `R` bit in the diagram above?
|
|
pub mod record_descriptor {
|
|
use std::mem::size_of;
|
|
|
|
use crate::util::Result;
|
|
use crate::util::{AeronError, IndexT};
|
|
|
|
/// Size of the ring buffer record header.
|
|
pub const HEADER_LENGTH: IndexT = size_of::<i32>() as IndexT * 2;
|
|
|
|
/// Alignment size of records written to the buffer
|
|
pub const ALIGNMENT: IndexT = HEADER_LENGTH;
|
|
|
|
/// Message type indicating to the media driver that space has been reserved,
|
|
/// and is not yet ready for processing.
|
|
pub const PADDING_MSG_TYPE_ID: i32 = -1;
|
|
|
|
pub(super) fn make_header(length: i32, msg_type_id: i32) -> i64 {
|
|
// QUESTION: Instead of masking, can't we just cast and return u32/u64?
|
|
// Smells like Java.
|
|
((i64::from(msg_type_id) & 0xFFFF_FFFF) << 32) | (i64::from(length) & 0xFFFF_FFFF)
|
|
}
|
|
|
|
pub(super) fn check_msg_type_id(msg_type_id: i32) -> Result<()> {
|
|
if msg_type_id < 1 {
|
|
Err(AeronError::IllegalArgument)
|
|
} else {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
pub(super) fn encoded_msg_offset(record_offset: IndexT) -> IndexT {
|
|
record_offset + HEADER_LENGTH
|
|
}
|
|
|
|
/// Return the position of the record length field given a record's starting position
|
|
pub fn length_offset(record_offset: IndexT) -> IndexT {
|
|
record_offset
|
|
}
|
|
|
|
/// Return the position of the record message type field given a record's starting position
|
|
pub fn type_offset(record_offset: IndexT) -> IndexT {
|
|
record_offset + size_of::<i32>() as IndexT
|
|
}
|
|
|
|
pub(super) fn record_length(header: i64) -> i32 {
|
|
header as i32
|
|
}
|
|
|
|
pub(super) fn message_type_id(header: i64) -> i32 {
|
|
(header >> 32) as i32
|
|
}
|
|
}
|
|
|
|
const INSUFFICIENT_CAPACITY: IndexT = -2;
|
|
|
|
/// Multi-producer, single-consumer ring buffer implementation.
|
|
pub struct ManyToOneRingBuffer<A>
|
|
where
|
|
A: AtomicBuffer,
|
|
{
|
|
buffer: A,
|
|
capacity: IndexT,
|
|
max_msg_length: IndexT,
|
|
tail_position_index: IndexT,
|
|
head_cache_position_index: IndexT,
|
|
head_position_index: IndexT,
|
|
correlation_id_counter_index: IndexT,
|
|
}
|
|
|
|
impl<A> ManyToOneRingBuffer<A>
|
|
where
|
|
A: AtomicBuffer,
|
|
{
|
|
/// Create a many-to-one ring buffer from an underlying atomic buffer.
|
|
pub fn new(buffer: A) -> Result<Self> {
|
|
buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer {
|
|
buffer,
|
|
capacity,
|
|
max_msg_length: capacity / 8,
|
|
tail_position_index: capacity + buffer_descriptor::TAIL_POSITION_OFFSET,
|
|
head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET,
|
|
head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET,
|
|
correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET,
|
|
})
|
|
}
|
|
|
|
/// Atomically retrieve the next correlation identifier. Used as a unique identifier for
|
|
/// interactions with the Media Driver
|
|
pub fn next_correlation_id(&self) -> i64 {
|
|
// UNWRAP: Known-valid offset calculated during initialization
|
|
self.buffer
|
|
.get_and_add_i64(self.correlation_id_counter_index, 1)
|
|
.unwrap()
|
|
}
|
|
|
|
/// Return the total number of bytes in this buffer
|
|
pub fn capacity(&self) -> IndexT {
|
|
self.capacity
|
|
}
|
|
|
|
/// Write a message into the ring buffer
|
|
pub fn write<B>(
|
|
&mut self,
|
|
msg_type_id: i32,
|
|
source: &B,
|
|
source_index: IndexT,
|
|
length: IndexT,
|
|
) -> Result<bool>
|
|
where
|
|
B: AtomicBuffer,
|
|
{
|
|
record_descriptor::check_msg_type_id(msg_type_id)?;
|
|
self.check_msg_length(length)?;
|
|
|
|
let record_len = length + record_descriptor::HEADER_LENGTH;
|
|
let required = bit::align(record_len as usize, record_descriptor::ALIGNMENT as usize);
|
|
let record_index = self.claim_capacity(required as IndexT)?;
|
|
|
|
if record_index == INSUFFICIENT_CAPACITY {
|
|
return Ok(false);
|
|
}
|
|
|
|
// UNWRAP: `claim_capacity` performed bounds checking
|
|
self.buffer
|
|
.put_i64_ordered(
|
|
record_index,
|
|
record_descriptor::make_header(-length, msg_type_id),
|
|
)
|
|
.unwrap();
|
|
// UNWRAP: `claim_capacity` performed bounds checking
|
|
self.buffer
|
|
.put_bytes(
|
|
record_descriptor::encoded_msg_offset(record_index),
|
|
source,
|
|
source_index,
|
|
length,
|
|
)
|
|
.unwrap();
|
|
// UNWRAP: `claim_capacity` performed bounds checking
|
|
self.buffer
|
|
.put_i32_ordered(record_descriptor::length_offset(record_index), record_len)
|
|
.unwrap();
|
|
|
|
Ok(true)
|
|
}
|
|
|
|
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`.
|
|
/// The handler is given the message type identifier and message body as arguments.
|
|
///
|
|
/// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler
|
|
/// function; by contrast, the Rust API makes no attempt to catch panics and currently
|
|
/// has no way of stopping reading once started.
|
|
pub fn read_n<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
|
|
where
|
|
F: FnMut(i32, &[u8]) -> (),
|
|
{
|
|
let head = self.buffer.get_i64(self.head_position_index)?;
|
|
let head_index = (head & i64::from(self.capacity - 1)) as i32;
|
|
let contiguous_block_length = self.capacity - head_index;
|
|
let mut messages_read = 0;
|
|
let mut bytes_read: i32 = 0;
|
|
|
|
let result: Result<()> = (|| {
|
|
while bytes_read < contiguous_block_length && messages_read < message_count_limit {
|
|
let record_index = head_index + bytes_read;
|
|
let header = self.buffer.get_i64_volatile(record_index)?;
|
|
let record_length = record_descriptor::record_length(header);
|
|
|
|
if record_length <= 0 {
|
|
break;
|
|
}
|
|
|
|
bytes_read += align(
|
|
record_length as usize,
|
|
record_descriptor::ALIGNMENT as usize,
|
|
) as i32;
|
|
|
|
let msg_type_id = record_descriptor::message_type_id(header);
|
|
if msg_type_id == record_descriptor::PADDING_MSG_TYPE_ID {
|
|
// QUESTION: Is this a spinlock on a writer finishing?
|
|
continue;
|
|
}
|
|
|
|
messages_read += 1;
|
|
let msg_start = record_descriptor::encoded_msg_offset(record_index) as usize;
|
|
let msg_end =
|
|
msg_start + (record_length - record_descriptor::HEADER_LENGTH) as usize;
|
|
handler(msg_type_id, &self.buffer[msg_start..msg_end]);
|
|
}
|
|
Ok(())
|
|
})();
|
|
|
|
// C++ has much better semantics for handling cleanup like this; however, because
|
|
// it would require us to capture a mutable reference to self, it's not feasible
|
|
// in Rust (since the main operation also needs mutable access to self).
|
|
let mut cleanup = || {
|
|
if bytes_read != 0 {
|
|
// UNWRAP: Need to justify this one.
|
|
// Should be safe because we've already done length checks, but I want
|
|
// to spend some more time thinking about it.
|
|
self.buffer
|
|
.set_memory(head_index, bytes_read as usize, 0)
|
|
.unwrap();
|
|
self.buffer
|
|
.put_i64_ordered(self.head_position_index, head + i64::from(bytes_read))
|
|
.unwrap();
|
|
}
|
|
};
|
|
result.map(|_| cleanup()).map_err(|e| {
|
|
cleanup();
|
|
e
|
|
})?;
|
|
|
|
Ok(messages_read)
|
|
}
|
|
|
|
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
|
|
/// The handler is given the message type identifier and message body as arguments.
|
|
///
|
|
/// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler
|
|
/// function; by contrast, the Rust API makes no attempt to catch panics and currently
|
|
/// has no way of stopping reading once started.
|
|
pub fn read<F>(&mut self, handler: F) -> Result<usize>
|
|
where
|
|
F: FnMut(i32, &[u8]) -> (),
|
|
{
|
|
self.read_n(handler, usize::max_value())
|
|
}
|
|
|
|
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
|
|
/// at which to start writing the next record.
|
|
fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> {
|
|
// QUESTION: Is this mask how we handle the "ring" in ring buffer?
|
|
// Would explain why we assert buffer capacity is a power of two during initialization
|
|
let mask: IndexT = self.capacity - 1;
|
|
|
|
// UNWRAP: Known-valid offset calculated during initialization
|
|
let mut head = self
|
|
.buffer
|
|
.get_i64_volatile(self.head_cache_position_index)
|
|
.unwrap();
|
|
|
|
let mut tail: i64;
|
|
let mut tail_index: IndexT;
|
|
let mut padding: IndexT;
|
|
// Note the braces, making this a do-while loop
|
|
while {
|
|
tail = self.buffer.get_i64_volatile(self.tail_position_index)?;
|
|
let available_capacity = self.capacity - (tail - head) as IndexT;
|
|
|
|
if required > available_capacity {
|
|
head = self.buffer.get_i64_volatile(self.head_position_index)?;
|
|
|
|
if required > (self.capacity - (tail - head) as IndexT) {
|
|
return Ok(INSUFFICIENT_CAPACITY);
|
|
}
|
|
|
|
self.buffer
|
|
.put_i64_ordered(self.head_cache_position_index, head)?;
|
|
}
|
|
|
|
padding = 0;
|
|
|
|
// Because we assume `tail` and `mask` are always positive integers,
|
|
// it's "safe" to widen the types and bitmask below. We're just trying
|
|
// to imitate C++ here.
|
|
tail_index = (tail & i64::from(mask)) as IndexT;
|
|
let to_buffer_end_length = self.capacity - tail_index;
|
|
|
|
if required > to_buffer_end_length {
|
|
let mut head_index = (head & i64::from(mask)) as IndexT;
|
|
|
|
if required > head_index {
|
|
head = self.buffer.get_i64_volatile(self.head_position_index)?;
|
|
head_index = (head & i64::from(mask)) as IndexT;
|
|
|
|
if required > head_index {
|
|
return Ok(INSUFFICIENT_CAPACITY);
|
|
}
|
|
|
|
self.buffer
|
|
.put_i64_ordered(self.head_cache_position_index, head)?;
|
|
}
|
|
|
|
padding = to_buffer_end_length;
|
|
}
|
|
|
|
!self.buffer.compare_and_set_i64(
|
|
self.tail_position_index,
|
|
tail,
|
|
tail + i64::from(required) + i64::from(padding),
|
|
)?
|
|
} {}
|
|
|
|
if padding != 0 {
|
|
self.buffer.put_i64_ordered(
|
|
tail_index,
|
|
record_descriptor::make_header(padding, record_descriptor::PADDING_MSG_TYPE_ID),
|
|
)?;
|
|
tail_index = 0;
|
|
}
|
|
|
|
Ok(tail_index)
|
|
}
|
|
|
|
fn check_msg_length(&self, length: IndexT) -> Result<()> {
|
|
if length > self.max_msg_length {
|
|
Err(AeronError::IllegalArgument)
|
|
} else {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// Return the largest possible message size for this buffer
|
|
pub fn max_msg_length(&self) -> IndexT {
|
|
self.max_msg_length
|
|
}
|
|
}
|
|
|
|
impl<A> Deref for ManyToOneRingBuffer<A>
|
|
where
|
|
A: AtomicBuffer,
|
|
{
|
|
type Target = A;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
&self.buffer
|
|
}
|
|
}
|
|
|
|
impl<A> DerefMut for ManyToOneRingBuffer<A>
|
|
where
|
|
A: AtomicBuffer,
|
|
{
|
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
&mut self.buffer
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer;
|
|
use crate::client::concurrent::AtomicBuffer;
|
|
|
|
const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize;
|
|
|
|
#[test]
|
|
fn claim_capacity_owned() {
|
|
let mut ring_buf = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).unwrap();
|
|
|
|
ring_buf.claim_capacity(16).unwrap();
|
|
assert_eq!(
|
|
ring_buf
|
|
.buffer
|
|
.get_i64_volatile(ring_buf.tail_position_index),
|
|
Ok(16)
|
|
);
|
|
|
|
let write_start = ring_buf.claim_capacity(16).unwrap();
|
|
assert_eq!(write_start, 16);
|
|
}
|
|
|
|
#[test]
|
|
fn claim_capacity_shared() {
|
|
let buf = &mut [0u8; BUFFER_SIZE][..];
|
|
let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap();
|
|
|
|
ring_buf.claim_capacity(16).unwrap();
|
|
assert_eq!(
|
|
ring_buf
|
|
.buffer
|
|
.get_i64_volatile(ring_buf.tail_position_index),
|
|
Ok(16)
|
|
);
|
|
|
|
let write_start = ring_buf.claim_capacity(16).unwrap();
|
|
assert_eq!(write_start, 16);
|
|
}
|
|
}
|