From b567336bb0a86aad6855ac845e26ba0a7bc95968 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 18:03:40 -0400 Subject: [PATCH] Start porting over existing tests from Aeron --- aeron-rs/src/client/concurrent/mod.rs | 41 ++++++ aeron-rs/src/client/concurrent/ringbuffer.rs | 100 +++++++------- aeron-rs/tests/many_to_one_ring_buffer.rs | 134 +++++++++++++++++++ 3 files changed, 229 insertions(+), 46 deletions(-) create mode 100644 aeron-rs/tests/many_to_one_ring_buffer.rs diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index bd8221d..49dda0a 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -62,6 +62,20 @@ pub trait AtomicBuffer: Deref + DerefMut { }) } + /// Overlay a mutable value on the buffer. + /// + /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect + fn overlay_mut(&mut self, offset: IndexT) -> Result<&mut T> + where + T: Sized + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.as_mut_ptr().offset(offset as isize) }; + unsafe { &mut *(offset_ptr as *mut T) } + }) + } + /// Overlay a struct on a buffer, and perform a volatile read /// /// ```rust @@ -155,6 +169,11 @@ pub trait AtomicBuffer: Deref + DerefMut { self.overlay_volatile::(offset) } + /// Read an `i64` value from the buffer without performing any synchronization + fn get_i64(&self, offset: IndexT) -> Result { + self.overlay::(offset).map(|i| *i) + } + /// Perform a volatile write of an `i64` value /// /// ```rust @@ -167,6 +186,18 @@ pub trait AtomicBuffer: Deref + DerefMut { self.write_volatile::(offset, value) } + /// Write an `i64` value into the buffer without performing any synchronization + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buffer = vec![0u8; 8]; + /// buffer.put_i64(0, 12); + /// assert_eq!(buffer.get_i64(0), Ok(12)); + /// ``` + fn put_i64(&mut self, offset: IndexT, value: i64) -> Result<()> { + self.overlay_mut::(offset).map(|i| *i = value) + } + /// Write the contents of one buffer to another. Does not perform any synchronization fn put_bytes( &mut self, @@ -209,6 +240,11 @@ pub trait AtomicBuffer: Deref + DerefMut { self.overlay_volatile::(offset) } + /// Read an `i32` value from the buffer without performing any synchronization + fn get_i32(&self, offset: IndexT) -> Result { + self.overlay::(offset).map(|i| *i) + } + /// Perform a volatile write of an `i32` into the buffer /// /// ```rust @@ -220,6 +256,11 @@ pub trait AtomicBuffer: Deref + DerefMut { fn put_i32_ordered(&mut self, offset: IndexT, value: i32) -> Result<()> { self.write_volatile::(offset, value) } + + /// Return the total number of bytes in this buffer + fn capacity(&self) -> IndexT { + self.len() as IndexT + } } impl AtomicBuffer for Vec {} diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index 4f23130..d471e9d 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -2,7 +2,7 @@ use crate::client::concurrent::AtomicBuffer; use crate::util::bit::align; use crate::util::{bit, AeronError, IndexT, Result}; -use std::ops::Deref; +use std::ops::{Deref, DerefMut}; /// Description of the Ring Buffer schema. pub mod buffer_descriptor { @@ -97,10 +97,16 @@ pub mod record_descriptor { record_offset + HEADER_LENGTH } - pub(super) fn length_offset(record_offset: IndexT) -> IndexT { + /// Return the position of the record length field given a record's starting position + pub fn length_offset(record_offset: IndexT) -> IndexT { record_offset } + /// Return the position of the record message type field given a record's starting position + pub fn type_offset(record_offset: IndexT) -> IndexT { + record_offset + size_of::() as IndexT + } + pub(super) fn record_length(header: i64) -> i32 { header as i32 } @@ -110,6 +116,8 @@ pub mod record_descriptor { } } +const INSUFFICIENT_CAPACITY: IndexT = -2; + /// Multi-producer, single-consumer ring buffer implementation. pub struct ManyToOneRingBuffer where @@ -150,6 +158,11 @@ where .unwrap() } + /// Return the total number of bytes in this buffer + pub fn capacity(&self) -> IndexT { + self.capacity + } + /// Write a message into the ring buffer pub fn write( &mut self, @@ -157,7 +170,7 @@ where source: &B, source_index: IndexT, length: IndexT, - ) -> Result<()> + ) -> Result where B: AtomicBuffer, { @@ -168,6 +181,10 @@ where let required = bit::align(record_len as usize, record_descriptor::ALIGNMENT as usize); let record_index = self.claim_capacity(required as IndexT)?; + if record_index == INSUFFICIENT_CAPACITY { + return Ok(false); + } + // UNWRAP: `claim_capacity` performed bounds checking self.buffer .put_i64_ordered( @@ -189,7 +206,7 @@ where .put_i32_ordered(record_descriptor::length_offset(record_index), record_len) .unwrap(); - Ok(()) + Ok(true) } /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` @@ -266,7 +283,7 @@ where fn claim_capacity(&mut self, required: IndexT) -> Result { // QUESTION: Is this mask how we handle the "ring" in ring buffer? // Would explain why we assert buffer capacity is a power of two during initialization - let mask = self.capacity - 1; + let mask: IndexT = self.capacity - 1; // UNWRAP: Known-valid offset calculated during initialization let mut head = self @@ -279,28 +296,18 @@ where let mut padding: IndexT; // Note the braces, making this a do-while loop while { - // UNWRAP: Known-valid offset calculated during initialization - tail = self - .buffer - .get_i64_volatile(self.tail_position_index) - .unwrap(); + tail = self.buffer.get_i64_volatile(self.tail_position_index)?; let available_capacity = self.capacity - (tail - head) as IndexT; if required > available_capacity { - // UNWRAP: Known-valid offset calculated during initialization - head = self - .buffer - .get_i64_volatile(self.head_position_index) - .unwrap(); + head = self.buffer.get_i64_volatile(self.head_position_index)?; if required > (self.capacity - (tail - head) as IndexT) { - return Err(AeronError::InsufficientCapacity); + return Ok(INSUFFICIENT_CAPACITY); } - // UNWRAP: Known-valid offset calculated during initialization self.buffer - .put_i64_ordered(self.head_cache_position_index, head) - .unwrap(); + .put_i64_ordered(self.head_cache_position_index, head)?; } padding = 0; @@ -315,45 +322,32 @@ where let mut head_index = (head & i64::from(mask)) as IndexT; if required > head_index { - // UNWRAP: Known-valid offset calculated during initialization - head = self - .buffer - .get_i64_volatile(self.head_position_index) - .unwrap(); + head = self.buffer.get_i64_volatile(self.head_position_index)?; head_index = (head & i64::from(mask)) as IndexT; if required > head_index { - return Err(AeronError::InsufficientCapacity); + return Ok(INSUFFICIENT_CAPACITY); } - // UNWRAP: Known-valid offset calculated during initialization self.buffer - .put_i64_ordered(self.head_cache_position_index, head) - .unwrap(); + .put_i64_ordered(self.head_cache_position_index, head)?; } padding = to_buffer_end_length; } - // UNWRAP: Known-valid offset calculated during initialization - !self - .buffer - .compare_and_set_i64( - self.tail_position_index, - tail, - tail + i64::from(required) + i64::from(padding), - ) - .unwrap() + !self.buffer.compare_and_set_i64( + self.tail_position_index, + tail, + tail + i64::from(required) + i64::from(padding), + )? } {} if padding != 0 { - // UNWRAP: Known-valid offset calculated during initialization - self.buffer - .put_i64_ordered( - tail_index, - record_descriptor::make_header(padding, record_descriptor::PADDING_MSG_TYPE_ID), - ) - .unwrap(); + self.buffer.put_i64_ordered( + tail_index, + record_descriptor::make_header(padding, record_descriptor::PADDING_MSG_TYPE_ID), + )?; tail_index = 0; } @@ -367,6 +361,11 @@ where Ok(()) } } + + /// Return the largest possible message size for this buffer + pub fn max_msg_length(&self) -> IndexT { + self.max_msg_length + } } impl Deref for ManyToOneRingBuffer @@ -380,6 +379,15 @@ where } } +impl DerefMut for ManyToOneRingBuffer +where + A: AtomicBuffer, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.buffer + } +} + #[cfg(test)] mod tests { use crate::client::concurrent::ringbuffer::{record_descriptor, ManyToOneRingBuffer}; @@ -453,7 +461,7 @@ mod tests { let mut ring_buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); - let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; + let source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; let source_len = source_buffer.len() as IndexT; let type_id = 1; ring_buffer @@ -475,7 +483,7 @@ mod tests { let mut ring_buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); - let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; + let source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; let source_len = source_buffer.len() as IndexT; let type_id = 1; ring_buffer diff --git a/aeron-rs/tests/many_to_one_ring_buffer.rs b/aeron-rs/tests/many_to_one_ring_buffer.rs new file mode 100644 index 0000000..e176180 --- /dev/null +++ b/aeron-rs/tests/many_to_one_ring_buffer.rs @@ -0,0 +1,134 @@ +/// Tests based on the C++ tests included with Aeron +use aeron_rs::client::concurrent::ringbuffer::{ + buffer_descriptor, record_descriptor, ManyToOneRingBuffer, +}; +use aeron_rs::client::concurrent::AtomicBuffer; +use aeron_rs::util::bit::align; +use aeron_rs::util::IndexT; +use std::ops::Deref; + +const CAPACITY: usize = 1024; +const BUFFER_SZ: usize = CAPACITY + buffer_descriptor::TRAILER_LENGTH as usize; +const ODD_BUFFER_SZ: usize = (CAPACITY - 1) + buffer_descriptor::TRAILER_LENGTH as usize; + +const MSG_TYPE_ID: i32 = 101; +const HEAD_COUNTER_INDEX: IndexT = 1024 as IndexT + buffer_descriptor::HEAD_POSITION_OFFSET; +const TAIL_COUNTER_INDEX: IndexT = 1024 as IndexT + buffer_descriptor::TAIL_POSITION_OFFSET; + +#[test] +fn should_calculate_capacity_for_buffer() { + let buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap(); + + assert_eq!(AtomicBuffer::capacity(buffer.deref()), BUFFER_SZ as IndexT); + assert_eq!(buffer.capacity(), CAPACITY as IndexT); +} + +#[test] +fn should_throw_for_capacity_not_power_of_two() { + let buffer = ManyToOneRingBuffer::new(vec![0u8; ODD_BUFFER_SZ]); + + assert!(buffer.is_err()); +} + +#[test] +fn should_throw_when_max_message_size_exceeded() { + let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap(); + + let bytes = vec![0u8; buffer.max_msg_length() as usize + 1]; + let write_res = buffer.write(MSG_TYPE_ID, &bytes, 0, bytes.len() as IndexT); + + assert!(write_res.is_err()); +} + +#[test] +fn should_write_to_empty_buffer() { + let tail: IndexT = 0; + let tail_index: IndexT = 0; + let length: IndexT = 8; + let record_length: IndexT = length + record_descriptor::HEADER_LENGTH; + let src_index: IndexT = 0; + let aligned_record_length: IndexT = align( + record_length as usize, + record_descriptor::ALIGNMENT as usize, + ) as IndexT; + + let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap(); + let src_bytes = vec![0u8; BUFFER_SZ]; + + assert!(buffer + .write(MSG_TYPE_ID, &src_bytes, src_index, length) + .unwrap()); + + assert_eq!( + buffer.get_i32(record_descriptor::length_offset(tail_index)), + Ok(record_length) + ); + assert_eq!( + buffer.get_i32(record_descriptor::type_offset(tail_index)), + Ok(MSG_TYPE_ID) + ); + assert_eq!( + buffer.get_i64(TAIL_COUNTER_INDEX), + Ok((tail + aligned_record_length) as i64) + ); +} + +#[test] +fn should_reject_write_when_insufficient_space() { + let length: IndexT = 100; + let head: IndexT = 0; + let tail: IndexT = head + (CAPACITY - align((length - record_descriptor::ALIGNMENT) as usize, record_descriptor::ALIGNMENT as usize)) as IndexT; + let src_index: IndexT = 0; + + let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap(); + buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap(); + buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap(); + + let src_bytes = vec![0u8; BUFFER_SZ]; + let write_res = buffer.write(MSG_TYPE_ID, &src_bytes, src_index, length); + + assert_eq!(write_res, Ok(false)); + assert_eq!(buffer.get_i64(TAIL_COUNTER_INDEX), Ok(tail as i64)); +} + +#[test] +fn should_reject_write_when_buffer_full() { + let length: IndexT = 8; + let head: IndexT = 0; + let tail: IndexT = head + CAPACITY as IndexT; + let src_index: IndexT = 0; + + let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap(); + buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap(); + buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap(); + + let src_bytes = vec![0u8; BUFFER_SZ]; + let write_res = buffer.write(MSG_TYPE_ID, &src_bytes, src_index, length); + assert_eq!(write_res, Ok(false)); + assert_eq!(buffer.get_i64(TAIL_COUNTER_INDEX), Ok(tail as i64)); +} + +#[test] +fn should_insert_padding_record_plus_message_on_buffer_wrap() { + let length: IndexT = 100; + let record_length: IndexT = length + record_descriptor::HEADER_LENGTH; + let aligned_record_length = align(record_length as usize, record_descriptor::ALIGNMENT as usize) as IndexT; + let tail: IndexT = CAPACITY as IndexT - record_descriptor::ALIGNMENT; + let head: IndexT = tail - (record_descriptor::ALIGNMENT * 4); + let src_index: IndexT = 0; + + let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap(); + buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap(); + buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap(); + + let src_bytes = vec![0u8; BUFFER_SZ]; + let write_res = buffer.write(MSG_TYPE_ID, &src_bytes, src_index, length); + assert_eq!(write_res, Ok(true)); + + assert_eq!(buffer.get_i32(record_descriptor::type_offset(tail)), Ok(record_descriptor::PADDING_MSG_TYPE_ID)); + assert_eq!(buffer.get_i32(record_descriptor::length_offset(tail)), Ok(record_descriptor::ALIGNMENT)); + + assert_eq!(buffer.get_i32(record_descriptor::length_offset(0)), Ok(record_length)); + assert_eq!(buffer.get_i32(record_descriptor::type_offset(0)), Ok(MSG_TYPE_ID)); + assert_eq!(buffer.get_i64(TAIL_COUNTER_INDEX), Ok((tail + aligned_record_length + record_descriptor::ALIGNMENT) as i64)); +}