mirror of
https://github.com/bspeice/aeron-rs
synced 2024-12-22 05:48:10 -05:00
Merge #17
17: Aeron tests r=bspeice a=bspeice Port over tests for `ManyToOneRingBuffer` from Aeron C++ Co-authored-by: Bradlee Speice <bradlee@speice.io>
This commit is contained in:
commit
f6dedbe268
@ -62,6 +62,20 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Overlay a mutable value on the buffer.
|
||||||
|
///
|
||||||
|
/// NOTE: Has the potential to cause undefined behavior if alignment is incorrect
|
||||||
|
fn overlay_mut<T>(&mut self, offset: IndexT) -> Result<&mut T>
|
||||||
|
where
|
||||||
|
T: Sized,
|
||||||
|
{
|
||||||
|
self.bounds_check(offset, size_of::<T>() as IndexT)
|
||||||
|
.map(|_| {
|
||||||
|
let offset_ptr = unsafe { self.as_mut_ptr().offset(offset as isize) };
|
||||||
|
unsafe { &mut *(offset_ptr as *mut T) }
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Overlay a struct on a buffer, and perform a volatile read
|
/// Overlay a struct on a buffer, and perform a volatile read
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
@ -155,6 +169,11 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
self.overlay_volatile::<i64>(offset)
|
self.overlay_volatile::<i64>(offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read an `i64` value from the buffer without performing any synchronization
|
||||||
|
fn get_i64(&self, offset: IndexT) -> Result<i64> {
|
||||||
|
self.overlay::<i64>(offset).map(|i| *i)
|
||||||
|
}
|
||||||
|
|
||||||
/// Perform a volatile write of an `i64` value
|
/// Perform a volatile write of an `i64` value
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
@ -167,6 +186,18 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
self.write_volatile::<i64>(offset, value)
|
self.write_volatile::<i64>(offset, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Write an `i64` value into the buffer without performing any synchronization
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||||
|
/// let mut buffer = vec![0u8; 8];
|
||||||
|
/// buffer.put_i64(0, 12);
|
||||||
|
/// assert_eq!(buffer.get_i64(0), Ok(12));
|
||||||
|
/// ```
|
||||||
|
fn put_i64(&mut self, offset: IndexT, value: i64) -> Result<()> {
|
||||||
|
self.overlay_mut::<i64>(offset).map(|i| *i = value)
|
||||||
|
}
|
||||||
|
|
||||||
/// Write the contents of one buffer to another. Does not perform any synchronization
|
/// Write the contents of one buffer to another. Does not perform any synchronization
|
||||||
fn put_bytes<B>(
|
fn put_bytes<B>(
|
||||||
&mut self,
|
&mut self,
|
||||||
@ -209,6 +240,11 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
self.overlay_volatile::<i32>(offset)
|
self.overlay_volatile::<i32>(offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read an `i32` value from the buffer without performing any synchronization
|
||||||
|
fn get_i32(&self, offset: IndexT) -> Result<i32> {
|
||||||
|
self.overlay::<i32>(offset).map(|i| *i)
|
||||||
|
}
|
||||||
|
|
||||||
/// Perform a volatile write of an `i32` into the buffer
|
/// Perform a volatile write of an `i32` into the buffer
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
@ -220,6 +256,23 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
fn put_i32_ordered(&mut self, offset: IndexT, value: i32) -> Result<()> {
|
fn put_i32_ordered(&mut self, offset: IndexT, value: i32) -> Result<()> {
|
||||||
self.write_volatile::<i32>(offset, value)
|
self.write_volatile::<i32>(offset, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Write an `i32` value into the buffer without performing any synchronization
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||||
|
/// let mut buffer = vec![0u8; 5];
|
||||||
|
/// buffer.put_i32(0, 255 + 1);
|
||||||
|
/// assert_eq!(buffer.get_i32(1), Ok(1));
|
||||||
|
/// ```
|
||||||
|
fn put_i32(&mut self, offset: IndexT, value: i32) -> Result<()> {
|
||||||
|
self.overlay_mut::<i32>(offset).map(|i| *i = value)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the total number of bytes in this buffer
|
||||||
|
fn capacity(&self) -> IndexT {
|
||||||
|
self.len() as IndexT
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AtomicBuffer for Vec<u8> {}
|
impl AtomicBuffer for Vec<u8> {}
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
use crate::client::concurrent::AtomicBuffer;
|
use crate::client::concurrent::AtomicBuffer;
|
||||||
use crate::util::bit::align;
|
use crate::util::bit::align;
|
||||||
use crate::util::{bit, AeronError, IndexT, Result};
|
use crate::util::{bit, AeronError, IndexT, Result};
|
||||||
use std::ops::Deref;
|
use std::ops::{Deref, DerefMut};
|
||||||
|
|
||||||
/// Description of the Ring Buffer schema.
|
/// Description of the Ring Buffer schema.
|
||||||
pub mod buffer_descriptor {
|
pub mod buffer_descriptor {
|
||||||
@ -97,10 +97,16 @@ pub mod record_descriptor {
|
|||||||
record_offset + HEADER_LENGTH
|
record_offset + HEADER_LENGTH
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn length_offset(record_offset: IndexT) -> IndexT {
|
/// Return the position of the record length field given a record's starting position
|
||||||
|
pub fn length_offset(record_offset: IndexT) -> IndexT {
|
||||||
record_offset
|
record_offset
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the position of the record message type field given a record's starting position
|
||||||
|
pub fn type_offset(record_offset: IndexT) -> IndexT {
|
||||||
|
record_offset + size_of::<i32>() as IndexT
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) fn record_length(header: i64) -> i32 {
|
pub(super) fn record_length(header: i64) -> i32 {
|
||||||
header as i32
|
header as i32
|
||||||
}
|
}
|
||||||
@ -110,6 +116,8 @@ pub mod record_descriptor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const INSUFFICIENT_CAPACITY: IndexT = -2;
|
||||||
|
|
||||||
/// Multi-producer, single-consumer ring buffer implementation.
|
/// Multi-producer, single-consumer ring buffer implementation.
|
||||||
pub struct ManyToOneRingBuffer<A>
|
pub struct ManyToOneRingBuffer<A>
|
||||||
where
|
where
|
||||||
@ -150,6 +158,11 @@ where
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the total number of bytes in this buffer
|
||||||
|
pub fn capacity(&self) -> IndexT {
|
||||||
|
self.capacity
|
||||||
|
}
|
||||||
|
|
||||||
/// Write a message into the ring buffer
|
/// Write a message into the ring buffer
|
||||||
pub fn write<B>(
|
pub fn write<B>(
|
||||||
&mut self,
|
&mut self,
|
||||||
@ -157,7 +170,7 @@ where
|
|||||||
source: &B,
|
source: &B,
|
||||||
source_index: IndexT,
|
source_index: IndexT,
|
||||||
length: IndexT,
|
length: IndexT,
|
||||||
) -> Result<()>
|
) -> Result<bool>
|
||||||
where
|
where
|
||||||
B: AtomicBuffer,
|
B: AtomicBuffer,
|
||||||
{
|
{
|
||||||
@ -168,6 +181,10 @@ where
|
|||||||
let required = bit::align(record_len as usize, record_descriptor::ALIGNMENT as usize);
|
let required = bit::align(record_len as usize, record_descriptor::ALIGNMENT as usize);
|
||||||
let record_index = self.claim_capacity(required as IndexT)?;
|
let record_index = self.claim_capacity(required as IndexT)?;
|
||||||
|
|
||||||
|
if record_index == INSUFFICIENT_CAPACITY {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
|
||||||
// UNWRAP: `claim_capacity` performed bounds checking
|
// UNWRAP: `claim_capacity` performed bounds checking
|
||||||
self.buffer
|
self.buffer
|
||||||
.put_i64_ordered(
|
.put_i64_ordered(
|
||||||
@ -189,20 +206,22 @@ where
|
|||||||
.put_i32_ordered(record_descriptor::length_offset(record_index), record_len)
|
.put_i32_ordered(record_descriptor::length_offset(record_index), record_len)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
Ok(())
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
|
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
|
||||||
pub fn read<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
|
///
|
||||||
|
/// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler
|
||||||
|
/// function; by contrast, the Rust API makes no attempt to catch panics and currently
|
||||||
|
/// has no way of stopping reading once started.
|
||||||
|
// QUESTION: Is there a better way to handle dispatching the handler function?
|
||||||
|
// We can't give it a `&dyn AtomicBuffer` because of the monomorphized generic functions,
|
||||||
|
// don't know if having a separate handler trait would be helpful.
|
||||||
|
pub fn read_n<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
|
||||||
where
|
where
|
||||||
F: FnMut(i32, &A, IndexT, IndexT) -> (),
|
F: FnMut(i32, &A, IndexT, IndexT) -> (),
|
||||||
{
|
{
|
||||||
// QUESTION: Should I implement the `get_i64` method that C++ uses?
|
let head = self.buffer.get_i64(self.head_position_index)?;
|
||||||
// UNWRAP: Bounds check performed during buffer creation
|
|
||||||
let head = self
|
|
||||||
.buffer
|
|
||||||
.get_i64_volatile(self.head_position_index)
|
|
||||||
.unwrap();
|
|
||||||
let head_index = (head & i64::from(self.capacity - 1)) as i32;
|
let head_index = (head & i64::from(self.capacity - 1)) as i32;
|
||||||
let contiguous_block_length = self.capacity - head_index;
|
let contiguous_block_length = self.capacity - head_index;
|
||||||
let mut messages_read = 0;
|
let mut messages_read = 0;
|
||||||
@ -245,6 +264,9 @@ where
|
|||||||
// in Rust (since the main operation also needs mutable access to self).
|
// in Rust (since the main operation also needs mutable access to self).
|
||||||
let mut cleanup = || {
|
let mut cleanup = || {
|
||||||
if bytes_read != 0 {
|
if bytes_read != 0 {
|
||||||
|
// UNWRAP: Need to justify this one.
|
||||||
|
// Should be safe because we've already done length checks, but I want
|
||||||
|
// to spend some more time thinking about it.
|
||||||
self.buffer
|
self.buffer
|
||||||
.set_memory(head_index, bytes_read as usize, 0)
|
.set_memory(head_index, bytes_read as usize, 0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@ -261,12 +283,24 @@ where
|
|||||||
Ok(messages_read)
|
Ok(messages_read)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
|
||||||
|
///
|
||||||
|
/// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler
|
||||||
|
/// function; by contrast, the Rust API makes no attempt to catch panics and currently
|
||||||
|
/// has no way of stopping reading once started.
|
||||||
|
pub fn read<F>(&mut self, handler: F) -> Result<usize>
|
||||||
|
where
|
||||||
|
F: FnMut(i32, &A, IndexT, IndexT) -> (),
|
||||||
|
{
|
||||||
|
self.read_n(handler, usize::max_value())
|
||||||
|
}
|
||||||
|
|
||||||
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
|
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
|
||||||
/// at which to start writing the next record.
|
/// at which to start writing the next record.
|
||||||
fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> {
|
fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> {
|
||||||
// QUESTION: Is this mask how we handle the "ring" in ring buffer?
|
// QUESTION: Is this mask how we handle the "ring" in ring buffer?
|
||||||
// Would explain why we assert buffer capacity is a power of two during initialization
|
// Would explain why we assert buffer capacity is a power of two during initialization
|
||||||
let mask = self.capacity - 1;
|
let mask: IndexT = self.capacity - 1;
|
||||||
|
|
||||||
// UNWRAP: Known-valid offset calculated during initialization
|
// UNWRAP: Known-valid offset calculated during initialization
|
||||||
let mut head = self
|
let mut head = self
|
||||||
@ -279,28 +313,18 @@ where
|
|||||||
let mut padding: IndexT;
|
let mut padding: IndexT;
|
||||||
// Note the braces, making this a do-while loop
|
// Note the braces, making this a do-while loop
|
||||||
while {
|
while {
|
||||||
// UNWRAP: Known-valid offset calculated during initialization
|
tail = self.buffer.get_i64_volatile(self.tail_position_index)?;
|
||||||
tail = self
|
|
||||||
.buffer
|
|
||||||
.get_i64_volatile(self.tail_position_index)
|
|
||||||
.unwrap();
|
|
||||||
let available_capacity = self.capacity - (tail - head) as IndexT;
|
let available_capacity = self.capacity - (tail - head) as IndexT;
|
||||||
|
|
||||||
if required > available_capacity {
|
if required > available_capacity {
|
||||||
// UNWRAP: Known-valid offset calculated during initialization
|
head = self.buffer.get_i64_volatile(self.head_position_index)?;
|
||||||
head = self
|
|
||||||
.buffer
|
|
||||||
.get_i64_volatile(self.head_position_index)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
if required > (self.capacity - (tail - head) as IndexT) {
|
if required > (self.capacity - (tail - head) as IndexT) {
|
||||||
return Err(AeronError::InsufficientCapacity);
|
return Ok(INSUFFICIENT_CAPACITY);
|
||||||
}
|
}
|
||||||
|
|
||||||
// UNWRAP: Known-valid offset calculated during initialization
|
|
||||||
self.buffer
|
self.buffer
|
||||||
.put_i64_ordered(self.head_cache_position_index, head)
|
.put_i64_ordered(self.head_cache_position_index, head)?;
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
padding = 0;
|
padding = 0;
|
||||||
@ -315,45 +339,32 @@ where
|
|||||||
let mut head_index = (head & i64::from(mask)) as IndexT;
|
let mut head_index = (head & i64::from(mask)) as IndexT;
|
||||||
|
|
||||||
if required > head_index {
|
if required > head_index {
|
||||||
// UNWRAP: Known-valid offset calculated during initialization
|
head = self.buffer.get_i64_volatile(self.head_position_index)?;
|
||||||
head = self
|
|
||||||
.buffer
|
|
||||||
.get_i64_volatile(self.head_position_index)
|
|
||||||
.unwrap();
|
|
||||||
head_index = (head & i64::from(mask)) as IndexT;
|
head_index = (head & i64::from(mask)) as IndexT;
|
||||||
|
|
||||||
if required > head_index {
|
if required > head_index {
|
||||||
return Err(AeronError::InsufficientCapacity);
|
return Ok(INSUFFICIENT_CAPACITY);
|
||||||
}
|
}
|
||||||
|
|
||||||
// UNWRAP: Known-valid offset calculated during initialization
|
|
||||||
self.buffer
|
self.buffer
|
||||||
.put_i64_ordered(self.head_cache_position_index, head)
|
.put_i64_ordered(self.head_cache_position_index, head)?;
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
padding = to_buffer_end_length;
|
padding = to_buffer_end_length;
|
||||||
}
|
}
|
||||||
|
|
||||||
// UNWRAP: Known-valid offset calculated during initialization
|
!self.buffer.compare_and_set_i64(
|
||||||
!self
|
|
||||||
.buffer
|
|
||||||
.compare_and_set_i64(
|
|
||||||
self.tail_position_index,
|
self.tail_position_index,
|
||||||
tail,
|
tail,
|
||||||
tail + i64::from(required) + i64::from(padding),
|
tail + i64::from(required) + i64::from(padding),
|
||||||
)
|
)?
|
||||||
.unwrap()
|
|
||||||
} {}
|
} {}
|
||||||
|
|
||||||
if padding != 0 {
|
if padding != 0 {
|
||||||
// UNWRAP: Known-valid offset calculated during initialization
|
self.buffer.put_i64_ordered(
|
||||||
self.buffer
|
|
||||||
.put_i64_ordered(
|
|
||||||
tail_index,
|
tail_index,
|
||||||
record_descriptor::make_header(padding, record_descriptor::PADDING_MSG_TYPE_ID),
|
record_descriptor::make_header(padding, record_descriptor::PADDING_MSG_TYPE_ID),
|
||||||
)
|
)?;
|
||||||
.unwrap();
|
|
||||||
tail_index = 0;
|
tail_index = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -367,6 +378,11 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the largest possible message size for this buffer
|
||||||
|
pub fn max_msg_length(&self) -> IndexT {
|
||||||
|
self.max_msg_length
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<A> Deref for ManyToOneRingBuffer<A>
|
impl<A> Deref for ManyToOneRingBuffer<A>
|
||||||
@ -380,12 +396,19 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<A> DerefMut for ManyToOneRingBuffer<A>
|
||||||
|
where
|
||||||
|
A: AtomicBuffer,
|
||||||
|
{
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.buffer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::client::concurrent::ringbuffer::{record_descriptor, ManyToOneRingBuffer};
|
use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer;
|
||||||
use crate::client::concurrent::AtomicBuffer;
|
use crate::client::concurrent::AtomicBuffer;
|
||||||
use crate::util::IndexT;
|
|
||||||
use std::mem::size_of;
|
|
||||||
|
|
||||||
const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize;
|
const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize;
|
||||||
|
|
||||||
@ -421,81 +444,4 @@ mod tests {
|
|||||||
let write_start = ring_buf.claim_capacity(16).unwrap();
|
let write_start = ring_buf.claim_capacity(16).unwrap();
|
||||||
assert_eq!(write_start, 16);
|
assert_eq!(write_start, 16);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn write_basic() {
|
|
||||||
let mut ring_buffer =
|
|
||||||
ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
|
|
||||||
|
|
||||||
let source_bytes = &mut [12u8, 0, 0, 0][..];
|
|
||||||
let source_len = source_bytes.len() as IndexT;
|
|
||||||
let type_id = 1;
|
|
||||||
ring_buffer
|
|
||||||
.write(type_id, &source_bytes, 0, source_len)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let record_len = source_len + record_descriptor::HEADER_LENGTH;
|
|
||||||
assert_eq!(
|
|
||||||
ring_buffer.get_i64_volatile(0).unwrap(),
|
|
||||||
record_descriptor::make_header(record_len, type_id)
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
ring_buffer
|
|
||||||
.get_i64_volatile(size_of::<i64>() as IndexT)
|
|
||||||
.unwrap(),
|
|
||||||
12
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn read_basic() {
|
|
||||||
// Similar to write basic, put something into the buffer
|
|
||||||
let mut ring_buffer =
|
|
||||||
ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
|
|
||||||
|
|
||||||
let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
|
|
||||||
let source_len = source_buffer.len() as IndexT;
|
|
||||||
let type_id = 1;
|
|
||||||
ring_buffer
|
|
||||||
.write(type_id, &source_buffer, 0, source_len)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Now we can start the actual read process
|
|
||||||
let c = |_, buf: &Vec<u8>, offset, _| assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12);
|
|
||||||
ring_buffer.read(c, 1).unwrap();
|
|
||||||
|
|
||||||
// Make sure that the buffer was zeroed on finish
|
|
||||||
for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) {
|
|
||||||
assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn read_multiple() {
|
|
||||||
let mut ring_buffer =
|
|
||||||
ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
|
|
||||||
|
|
||||||
let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
|
|
||||||
let source_len = source_buffer.len() as IndexT;
|
|
||||||
let type_id = 1;
|
|
||||||
ring_buffer
|
|
||||||
.write(type_id, &source_buffer, 0, source_len)
|
|
||||||
.unwrap();
|
|
||||||
ring_buffer
|
|
||||||
.write(type_id, &source_buffer, 0, source_len)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let mut msg_count = 0;
|
|
||||||
let c = |_, buf: &Vec<u8>, offset, _| {
|
|
||||||
msg_count += 1;
|
|
||||||
assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12);
|
|
||||||
};
|
|
||||||
ring_buffer.read(c, 2).unwrap();
|
|
||||||
assert_eq!(msg_count, 2);
|
|
||||||
|
|
||||||
// Make sure that the buffer was zeroed on finish
|
|
||||||
for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) {
|
|
||||||
assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
381
aeron-rs/tests/many_to_one_ring_buffer.rs
Normal file
381
aeron-rs/tests/many_to_one_ring_buffer.rs
Normal file
@ -0,0 +1,381 @@
|
|||||||
|
/// Tests based on the C++ tests included with Aeron
|
||||||
|
use aeron_rs::client::concurrent::ringbuffer::{
|
||||||
|
buffer_descriptor, record_descriptor, ManyToOneRingBuffer,
|
||||||
|
};
|
||||||
|
use aeron_rs::client::concurrent::AtomicBuffer;
|
||||||
|
use aeron_rs::util::bit::align;
|
||||||
|
use aeron_rs::util::IndexT;
|
||||||
|
use std::ops::Deref;
|
||||||
|
|
||||||
|
const CAPACITY: usize = 1024;
|
||||||
|
const BUFFER_SZ: usize = CAPACITY + buffer_descriptor::TRAILER_LENGTH as usize;
|
||||||
|
const ODD_BUFFER_SZ: usize = (CAPACITY - 1) + buffer_descriptor::TRAILER_LENGTH as usize;
|
||||||
|
|
||||||
|
const MSG_TYPE_ID: i32 = 101;
|
||||||
|
const HEAD_COUNTER_INDEX: IndexT = 1024 as IndexT + buffer_descriptor::HEAD_POSITION_OFFSET;
|
||||||
|
const TAIL_COUNTER_INDEX: IndexT = 1024 as IndexT + buffer_descriptor::TAIL_POSITION_OFFSET;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_calculate_capacity_for_buffer() {
|
||||||
|
let buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(AtomicBuffer::capacity(buffer.deref()), BUFFER_SZ as IndexT);
|
||||||
|
assert_eq!(buffer.capacity(), CAPACITY as IndexT);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_throw_for_capacity_not_power_of_two() {
|
||||||
|
let buffer = ManyToOneRingBuffer::new(vec![0u8; ODD_BUFFER_SZ]);
|
||||||
|
|
||||||
|
assert!(buffer.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_throw_when_max_message_size_exceeded() {
|
||||||
|
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
|
||||||
|
|
||||||
|
let bytes = vec![0u8; buffer.max_msg_length() as usize + 1];
|
||||||
|
let write_res = buffer.write(MSG_TYPE_ID, &bytes, 0, bytes.len() as IndexT);
|
||||||
|
|
||||||
|
assert!(write_res.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_write_to_empty_buffer() {
|
||||||
|
let tail: IndexT = 0;
|
||||||
|
let tail_index: IndexT = 0;
|
||||||
|
let length: IndexT = 8;
|
||||||
|
let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
|
||||||
|
let src_index: IndexT = 0;
|
||||||
|
let aligned_record_length: IndexT = align(
|
||||||
|
record_length as usize,
|
||||||
|
record_descriptor::ALIGNMENT as usize,
|
||||||
|
) as IndexT;
|
||||||
|
|
||||||
|
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
|
||||||
|
let src_bytes = vec![0u8; BUFFER_SZ];
|
||||||
|
|
||||||
|
assert!(buffer
|
||||||
|
.write(MSG_TYPE_ID, &src_bytes, src_index, length)
|
||||||
|
.unwrap());
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i32(record_descriptor::length_offset(tail_index)),
|
||||||
|
Ok(record_length)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i32(record_descriptor::type_offset(tail_index)),
|
||||||
|
Ok(MSG_TYPE_ID)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i64(TAIL_COUNTER_INDEX),
|
||||||
|
Ok((tail + aligned_record_length) as i64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_reject_write_when_insufficient_space() {
|
||||||
|
let length: IndexT = 100;
|
||||||
|
let head: IndexT = 0;
|
||||||
|
let tail: IndexT = head
|
||||||
|
+ (CAPACITY
|
||||||
|
- align(
|
||||||
|
(length - record_descriptor::ALIGNMENT) as usize,
|
||||||
|
record_descriptor::ALIGNMENT as usize,
|
||||||
|
)) as IndexT;
|
||||||
|
let src_index: IndexT = 0;
|
||||||
|
|
||||||
|
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
|
||||||
|
buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
|
||||||
|
buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
|
||||||
|
|
||||||
|
let src_bytes = vec![0u8; BUFFER_SZ];
|
||||||
|
let write_res = buffer.write(MSG_TYPE_ID, &src_bytes, src_index, length);
|
||||||
|
|
||||||
|
assert_eq!(write_res, Ok(false));
|
||||||
|
assert_eq!(buffer.get_i64(TAIL_COUNTER_INDEX), Ok(tail as i64));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_reject_write_when_buffer_full() {
|
||||||
|
let length: IndexT = 8;
|
||||||
|
let head: IndexT = 0;
|
||||||
|
let tail: IndexT = head + CAPACITY as IndexT;
|
||||||
|
let src_index: IndexT = 0;
|
||||||
|
|
||||||
|
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
|
||||||
|
buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
|
||||||
|
buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
|
||||||
|
|
||||||
|
let src_bytes = vec![0u8; BUFFER_SZ];
|
||||||
|
let write_res = buffer.write(MSG_TYPE_ID, &src_bytes, src_index, length);
|
||||||
|
assert_eq!(write_res, Ok(false));
|
||||||
|
assert_eq!(buffer.get_i64(TAIL_COUNTER_INDEX), Ok(tail as i64));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_insert_padding_record_plus_message_on_buffer_wrap() {
|
||||||
|
let length: IndexT = 100;
|
||||||
|
let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
|
||||||
|
let aligned_record_length = align(
|
||||||
|
record_length as usize,
|
||||||
|
record_descriptor::ALIGNMENT as usize,
|
||||||
|
) as IndexT;
|
||||||
|
let tail: IndexT = CAPACITY as IndexT - record_descriptor::ALIGNMENT;
|
||||||
|
let head: IndexT = tail - (record_descriptor::ALIGNMENT * 4);
|
||||||
|
let src_index: IndexT = 0;
|
||||||
|
|
||||||
|
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
|
||||||
|
buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
|
||||||
|
buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
|
||||||
|
|
||||||
|
let src_bytes = vec![0u8; BUFFER_SZ];
|
||||||
|
let write_res = buffer.write(MSG_TYPE_ID, &src_bytes, src_index, length);
|
||||||
|
assert_eq!(write_res, Ok(true));
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i32(record_descriptor::type_offset(tail)),
|
||||||
|
Ok(record_descriptor::PADDING_MSG_TYPE_ID)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i32(record_descriptor::length_offset(tail)),
|
||||||
|
Ok(record_descriptor::ALIGNMENT)
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i32(record_descriptor::length_offset(0)),
|
||||||
|
Ok(record_length)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i32(record_descriptor::type_offset(0)),
|
||||||
|
Ok(MSG_TYPE_ID)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i64(TAIL_COUNTER_INDEX),
|
||||||
|
Ok((tail + aligned_record_length + record_descriptor::ALIGNMENT) as i64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_insert_padding_record_plus_message_on_buffer_wrap_with_head_equal_to_tail() {
|
||||||
|
let length: IndexT = 100;
|
||||||
|
let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
|
||||||
|
let aligned_record_length: IndexT = align(
|
||||||
|
record_length as usize,
|
||||||
|
record_descriptor::ALIGNMENT as usize,
|
||||||
|
) as IndexT;
|
||||||
|
let tail: IndexT = CAPACITY as IndexT - record_descriptor::ALIGNMENT;
|
||||||
|
let head: IndexT = tail;
|
||||||
|
let src_index: IndexT = 0;
|
||||||
|
|
||||||
|
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
|
||||||
|
buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
|
||||||
|
buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
|
||||||
|
|
||||||
|
let src_bytes = vec![0u8; BUFFER_SZ];
|
||||||
|
let write_res = buffer.write(MSG_TYPE_ID, &src_bytes, src_index, length);
|
||||||
|
assert_eq!(write_res, Ok(true));
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i32(record_descriptor::type_offset(tail)),
|
||||||
|
Ok(record_descriptor::PADDING_MSG_TYPE_ID)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i32(record_descriptor::length_offset(tail)),
|
||||||
|
Ok(record_descriptor::ALIGNMENT)
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i32(record_descriptor::length_offset(0)),
|
||||||
|
Ok(record_length)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i32(record_descriptor::type_offset(0)),
|
||||||
|
Ok(MSG_TYPE_ID)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i64(TAIL_COUNTER_INDEX),
|
||||||
|
Ok((tail + aligned_record_length + record_descriptor::ALIGNMENT) as i64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_read_single_message() {
|
||||||
|
let length: IndexT = 8;
|
||||||
|
let head: IndexT = 0;
|
||||||
|
let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
|
||||||
|
let aligned_record_length: IndexT = align(
|
||||||
|
record_length as usize,
|
||||||
|
record_descriptor::ALIGNMENT as usize,
|
||||||
|
) as IndexT;
|
||||||
|
let tail: IndexT = aligned_record_length;
|
||||||
|
|
||||||
|
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
|
||||||
|
buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
|
||||||
|
buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
|
||||||
|
|
||||||
|
buffer
|
||||||
|
.put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
|
||||||
|
.unwrap();
|
||||||
|
buffer
|
||||||
|
.put_i32(record_descriptor::length_offset(0), record_length)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut times_called = 0;
|
||||||
|
let closure = |_, _buf: &Vec<u8>, _, _| {
|
||||||
|
times_called += 1;
|
||||||
|
};
|
||||||
|
let messages_read = buffer.read(closure);
|
||||||
|
|
||||||
|
assert_eq!(messages_read, Ok(1));
|
||||||
|
assert_eq!(times_called, 1);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i64(HEAD_COUNTER_INDEX),
|
||||||
|
Ok((head + aligned_record_length) as i64)
|
||||||
|
);
|
||||||
|
|
||||||
|
for i in (0..record_descriptor::ALIGNMENT).step_by(4) {
|
||||||
|
assert_eq!(buffer.get_i32(i), Ok(0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_not_read_single_message_part_way_through_writing() {
|
||||||
|
let length: IndexT = 8;
|
||||||
|
let head: IndexT = 0;
|
||||||
|
let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
|
||||||
|
let aligned_record_length: IndexT = align(
|
||||||
|
record_length as usize,
|
||||||
|
record_descriptor::ALIGNMENT as usize,
|
||||||
|
) as IndexT;
|
||||||
|
let end_tail: IndexT = aligned_record_length;
|
||||||
|
|
||||||
|
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
|
||||||
|
buffer.put_i64(TAIL_COUNTER_INDEX, end_tail as i64).unwrap();
|
||||||
|
buffer
|
||||||
|
.put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
|
||||||
|
.unwrap();
|
||||||
|
buffer
|
||||||
|
.put_i32(record_descriptor::length_offset(0), -record_length)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut times_called = 0;
|
||||||
|
let closure = |_, _buf: &Vec<u8>, _, _| {
|
||||||
|
times_called += 1;
|
||||||
|
};
|
||||||
|
let messages_read = buffer.read(closure);
|
||||||
|
|
||||||
|
assert_eq!(messages_read, Ok(0));
|
||||||
|
assert_eq!(times_called, 0);
|
||||||
|
assert_eq!(buffer.get_i64(HEAD_COUNTER_INDEX), Ok(head as i64));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_read_two_messages() {
|
||||||
|
let length: IndexT = 8;
|
||||||
|
let head: IndexT = 0;
|
||||||
|
let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
|
||||||
|
let aligned_record_length: IndexT = align(
|
||||||
|
record_length as usize,
|
||||||
|
record_descriptor::ALIGNMENT as usize,
|
||||||
|
) as IndexT;
|
||||||
|
let tail: IndexT = aligned_record_length * 2;
|
||||||
|
|
||||||
|
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
|
||||||
|
buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
|
||||||
|
buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
|
||||||
|
|
||||||
|
buffer
|
||||||
|
.put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
|
||||||
|
.unwrap();
|
||||||
|
buffer
|
||||||
|
.put_i32(record_descriptor::length_offset(0), record_length)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
buffer
|
||||||
|
.put_i32(
|
||||||
|
record_descriptor::type_offset(0 + aligned_record_length),
|
||||||
|
MSG_TYPE_ID,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
buffer
|
||||||
|
.put_i32(
|
||||||
|
record_descriptor::length_offset(0 + aligned_record_length),
|
||||||
|
record_length,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut times_called = 0;
|
||||||
|
let closure = |_, _buf: &Vec<u8>, _, _| {
|
||||||
|
times_called += 1;
|
||||||
|
};
|
||||||
|
let messages_read = buffer.read(closure);
|
||||||
|
|
||||||
|
assert_eq!(messages_read, Ok(2));
|
||||||
|
assert_eq!(times_called, 2);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i64(HEAD_COUNTER_INDEX),
|
||||||
|
Ok((head + aligned_record_length * 2) as i64)
|
||||||
|
);
|
||||||
|
|
||||||
|
for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) {
|
||||||
|
assert_eq!(buffer.get_i32(i), Ok(0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_limit_read_of_messages() {
|
||||||
|
let length: IndexT = 8;
|
||||||
|
let head: IndexT = 0;
|
||||||
|
let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
|
||||||
|
let aligned_record_length: IndexT = align(
|
||||||
|
record_length as usize,
|
||||||
|
record_descriptor::ALIGNMENT as usize,
|
||||||
|
) as IndexT;
|
||||||
|
let tail: IndexT = aligned_record_length * 2;
|
||||||
|
|
||||||
|
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
|
||||||
|
buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
|
||||||
|
buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
|
||||||
|
|
||||||
|
buffer
|
||||||
|
.put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
|
||||||
|
.unwrap();
|
||||||
|
buffer
|
||||||
|
.put_i32(record_descriptor::length_offset(0), record_length)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
buffer
|
||||||
|
.put_i32(
|
||||||
|
record_descriptor::type_offset(0 + aligned_record_length),
|
||||||
|
MSG_TYPE_ID,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
buffer
|
||||||
|
.put_i32(
|
||||||
|
record_descriptor::length_offset(0 + aligned_record_length),
|
||||||
|
record_length,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut times_called = 0;
|
||||||
|
let closure = |_, _buf: &Vec<u8>, _, _| {
|
||||||
|
times_called += 1;
|
||||||
|
};
|
||||||
|
let messages_read = buffer.read_n(closure, 1);
|
||||||
|
|
||||||
|
assert_eq!(messages_read, Ok(1));
|
||||||
|
assert_eq!(times_called, 1);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i64(HEAD_COUNTER_INDEX),
|
||||||
|
Ok((head + aligned_record_length) as i64)
|
||||||
|
);
|
||||||
|
|
||||||
|
for i in (0..record_descriptor::ALIGNMENT).step_by(4) {
|
||||||
|
assert_eq!(buffer.get_i32(i), Ok(0));
|
||||||
|
}
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i32(record_descriptor::length_offset(aligned_record_length)),
|
||||||
|
Ok(record_length)
|
||||||
|
);
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user