diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs
index bd8221d..118ee9b 100644
--- a/aeron-rs/src/client/concurrent/mod.rs
+++ b/aeron-rs/src/client/concurrent/mod.rs
@@ -62,6 +62,20 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut {
         })
     }
 
+    /// Overlay a mutable value on the buffer.
+    ///
+    /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect
+    fn overlay_mut<T>(&mut self, offset: IndexT) -> Result<&mut T>
+    where
+        T: Sized,
+    {
+        self.bounds_check(offset, size_of::<T>() as IndexT)
+            .map(|_| {
+                let offset_ptr = unsafe { self.as_mut_ptr().offset(offset as isize) };
+                unsafe { &mut *(offset_ptr as *mut T) }
+            })
+    }
+
     /// Overlay a struct on a buffer, and perform a volatile read
     ///
     /// ```rust
@@ -155,6 +169,11 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut {
         self.overlay_volatile::<i64>(offset)
     }
 
+    /// Read an `i64` value from the buffer without performing any synchronization
+    fn get_i64(&self, offset: IndexT) -> Result<i64> {
+        self.overlay::<i64>(offset).map(|i| *i)
+    }
+
     /// Perform a volatile write of an `i64` value
     ///
     /// ```rust
@@ -167,6 +186,18 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut {
         self.write_volatile::<i64>(offset, value)
     }
 
+    /// Write an `i64` value into the buffer without performing any synchronization
+    ///
+    /// ```rust
+    /// # use aeron_rs::client::concurrent::AtomicBuffer;
+    /// let mut buffer = vec![0u8; 8];
+    /// buffer.put_i64(0, 12);
+    /// assert_eq!(buffer.get_i64(0), Ok(12));
+    /// ```
+    fn put_i64(&mut self, offset: IndexT, value: i64) -> Result<()> {
+        self.overlay_mut::<i64>(offset).map(|i| *i = value)
+    }
+
     /// Write the contents of one buffer to another. Does not perform any synchronization
     fn put_bytes<B>(
         &mut self,
@@ -209,6 +240,11 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut {
         self.overlay_volatile::<i32>(offset)
     }
 
+    /// Read an `i32` value from the buffer without performing any synchronization
+    fn get_i32(&self, offset: IndexT) -> Result<i32> {
+        self.overlay::<i32>(offset).map(|i| *i)
+    }
+
     /// Perform a volatile write of an `i32` into the buffer
     ///
     /// ```rust
@@ -220,6 +256,23 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut {
     fn put_i32_ordered(&mut self, offset: IndexT, value: i32) -> Result<()> {
         self.write_volatile::<i32>(offset, value)
     }
+
+    /// Write an `i32` value into the buffer without performing any synchronization
+    ///
+    /// ```rust
+    /// # use aeron_rs::client::concurrent::AtomicBuffer;
+    /// let mut buffer = vec![0u8; 5];
+    /// buffer.put_i32(0, 255 + 1);
+    /// assert_eq!(buffer.get_i32(1), Ok(1));
+    /// ```
+    fn put_i32(&mut self, offset: IndexT, value: i32) -> Result<()> {
+        self.overlay_mut::<i32>(offset).map(|i| *i = value)
+    }
+
+    /// Return the total number of bytes in this buffer
+    fn capacity(&self) -> IndexT {
+        self.len() as IndexT
+    }
 }
 
 impl AtomicBuffer for Vec<u8> {}
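The unsynchronized `get_*`/`put_*` accessors above all route through `overlay`/`overlay_mut`, so they inherit its bounds checking and its byte-order behavior: values are laid directly over the underlying bytes. A minimal usage sketch (hypothetical, not part of the patch; the byte-level assertions assume a little-endian host, which the `put_i32`/`get_i32` doctest above also relies on):

```rust
use aeron_rs::client::concurrent::AtomicBuffer;

fn main() {
    // `AtomicBuffer` is implemented for `Vec<u8>`, so a plain vector works
    let mut buffer = vec![0u8; 16];

    // Unsynchronized write, then read back through the same overlay
    buffer.put_i64(0, 257).unwrap();
    assert_eq!(buffer.get_i64(0), Ok(257));

    // 257 = 0x0101: on a little-endian host the two low bytes land first,
    // the same effect the `put_i32(0, 255 + 1)` doctest demonstrates
    assert_eq!(buffer[0], 1);
    assert_eq!(buffer[1], 1);

    // Offsets that would run past the end fail the bounds check
    // instead of triggering the undefined behavior the NOTE warns about
    assert!(buffer.get_i64(9).is_err());
}
```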
diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs
index 4f23130..51ef93e 100644
--- a/aeron-rs/src/client/concurrent/ringbuffer.rs
+++ b/aeron-rs/src/client/concurrent/ringbuffer.rs
@@ -2,7 +2,7 @@
 use crate::client::concurrent::AtomicBuffer;
 use crate::util::bit::align;
 use crate::util::{bit, AeronError, IndexT, Result};
-use std::ops::Deref;
+use std::ops::{Deref, DerefMut};
 
 /// Description of the Ring Buffer schema.
 pub mod buffer_descriptor {
@@ -97,10 +97,16 @@ pub mod record_descriptor {
         record_offset + HEADER_LENGTH
     }
 
-    pub(super) fn length_offset(record_offset: IndexT) -> IndexT {
+    /// Return the position of the record length field given a record's starting position
+    pub fn length_offset(record_offset: IndexT) -> IndexT {
         record_offset
     }
 
+    /// Return the position of the record message type field given a record's starting position
+    pub fn type_offset(record_offset: IndexT) -> IndexT {
+        record_offset + size_of::<i32>() as IndexT
+    }
+
     pub(super) fn record_length(header: i64) -> i32 {
         header as i32
     }
@@ -110,6 +116,8 @@ pub mod record_descriptor {
     }
 }
 
+const INSUFFICIENT_CAPACITY: IndexT = -2;
+
 /// Multi-producer, single-consumer ring buffer implementation.
 pub struct ManyToOneRingBuffer<A>
 where
     A: AtomicBuffer,
@@ -150,6 +158,11 @@
             .unwrap()
     }
 
+    /// Return the total number of bytes in this buffer
+    pub fn capacity(&self) -> IndexT {
+        self.capacity
+    }
+
     /// Write a message into the ring buffer
     pub fn write<B>(
         &mut self,
         msg_type_id: i32,
         source: &B,
         source_index: IndexT,
         length: IndexT,
-    ) -> Result<()>
+    ) -> Result<bool>
     where
         B: AtomicBuffer,
     {
@@ -168,6 +181,10 @@
         let required = bit::align(record_len as usize, record_descriptor::ALIGNMENT as usize);
         let record_index = self.claim_capacity(required as IndexT)?;
 
+        if record_index == INSUFFICIENT_CAPACITY {
+            return Ok(false);
+        }
+
         // UNWRAP: `claim_capacity` performed bounds checking
         self.buffer
             .put_i64_ordered(
@@ -189,20 +206,22 @@
             .put_i32_ordered(record_descriptor::length_offset(record_index), record_len)
             .unwrap();
 
-        Ok(())
+        Ok(true)
     }
 
     /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
-    pub fn read<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
+    ///
+    /// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler
+    /// function; by contrast, the Rust API makes no attempt to catch panics and currently
+    /// has no way of stopping reading once started.
+    // QUESTION: Is there a better way to handle dispatching the handler function?
+    //     We can't give it a `&dyn AtomicBuffer` because of the monomorphized generic functions,
+    //     don't know if having a separate handler trait would be helpful.
+    pub fn read_n<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
     where
         F: FnMut(i32, &A, IndexT, IndexT) -> (),
     {
-        // QUESTION: Should I implement the `get_i64` method that C++ uses?
-        // UNWRAP: Bounds check performed during buffer creation
-        let head = self
-            .buffer
-            .get_i64_volatile(self.head_position_index)
-            .unwrap();
+        let head = self.buffer.get_i64(self.head_position_index)?;
+
         let head_index = (head & i64::from(self.capacity - 1)) as i32;
         let contiguous_block_length = self.capacity - head_index;
         let mut messages_read = 0;
@@ -245,6 +264,9 @@
         // in Rust (since the main operation also needs mutable access to self).
         let mut cleanup = || {
            if bytes_read != 0 {
+                // UNWRAP: Need to justify this one.
+                //     Should be safe because we've already done length checks, but I want
+                //     to spend some more time thinking about it.
                 self.buffer
                     .set_memory(head_index, bytes_read as usize, 0)
                     .unwrap();
@@ -261,12 +283,24 @@
         Ok(messages_read)
     }
 
+    /// Read all messages from the ring buffer and dispatch to `handler`
+    ///
+    /// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler
+    /// function; by contrast, the Rust API makes no attempt to catch panics and currently
+    /// has no way of stopping reading once started.
+    pub fn read<F>(&mut self, handler: F) -> Result<usize>
+    where
+        F: FnMut(i32, &A, IndexT, IndexT) -> (),
+    {
+        self.read_n(handler, usize::max_value())
+    }
+
     /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
     /// at which to start writing the next record.
     fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> {
         // QUESTION: Is this mask how we handle the "ring" in ring buffer?
         //     Would explain why we assert buffer capacity is a power of two during initialization
-        let mask = self.capacity - 1;
+        let mask: IndexT = self.capacity - 1;
 
         // UNWRAP: Known-valid offset calculated during initialization
         let mut head = self
             .buffer
             .get_i64_volatile(self.head_cache_position_index)
             .unwrap();
 
         let mut tail: i64;
         let mut tail_index: IndexT;
         let mut padding: IndexT;
         // Note the braces, making this a do-while loop
         while {
-            // UNWRAP: Known-valid offset calculated during initialization
-            tail = self
-                .buffer
-                .get_i64_volatile(self.tail_position_index)
-                .unwrap();
+            tail = self.buffer.get_i64_volatile(self.tail_position_index)?;
             let available_capacity = self.capacity - (tail - head) as IndexT;
 
             if required > available_capacity {
-                // UNWRAP: Known-valid offset calculated during initialization
-                head = self
-                    .buffer
-                    .get_i64_volatile(self.head_position_index)
-                    .unwrap();
+                head = self.buffer.get_i64_volatile(self.head_position_index)?;
 
                 if required > (self.capacity - (tail - head) as IndexT) {
-                    return Err(AeronError::InsufficientCapacity);
+                    return Ok(INSUFFICIENT_CAPACITY);
                 }
 
-                // UNWRAP: Known-valid offset calculated during initialization
                 self.buffer
-                    .put_i64_ordered(self.head_cache_position_index, head)
-                    .unwrap();
+                    .put_i64_ordered(self.head_cache_position_index, head)?;
             }
 
             padding = 0;
@@ -315,45 +339,32 @@
             tail_index = (tail & i64::from(mask)) as IndexT;
             let to_buffer_end_length = self.capacity - tail_index;
 
             if required > to_buffer_end_length {
                 let mut head_index = (head & i64::from(mask)) as IndexT;
 
                 if required > head_index {
-                    // UNWRAP: Known-valid offset calculated during initialization
-                    head = self
-                        .buffer
-                        .get_i64_volatile(self.head_position_index)
-                        .unwrap();
+                    head = self.buffer.get_i64_volatile(self.head_position_index)?;
                     head_index = (head & i64::from(mask)) as IndexT;
 
                     if required > head_index {
-                        return Err(AeronError::InsufficientCapacity);
+                        return Ok(INSUFFICIENT_CAPACITY);
                     }
 
-                    // UNWRAP: Known-valid offset calculated during initialization
                     self.buffer
-                        .put_i64_ordered(self.head_cache_position_index, head)
-                        .unwrap();
+                        .put_i64_ordered(self.head_cache_position_index, head)?;
                 }
 
                 padding = to_buffer_end_length;
             }
 
-            // UNWRAP: Known-valid offset calculated during initialization
-            !self
-                .buffer
-                .compare_and_set_i64(
-                    self.tail_position_index,
-                    tail,
-                    tail + i64::from(required) + i64::from(padding),
-                )
-                .unwrap()
+            !self.buffer.compare_and_set_i64(
+                self.tail_position_index,
+                tail,
+                tail + i64::from(required) + i64::from(padding),
+            )?
         } {}
 
         if padding != 0 {
-            // UNWRAP: Known-valid offset calculated during initialization
-            self.buffer
-                .put_i64_ordered(
-                    tail_index,
-                    record_descriptor::make_header(padding, record_descriptor::PADDING_MSG_TYPE_ID),
-                )
-                .unwrap();
+            self.buffer.put_i64_ordered(
+                tail_index,
+                record_descriptor::make_header(padding, record_descriptor::PADDING_MSG_TYPE_ID),
+            )?;
             tail_index = 0;
         }
 
@@ -367,6 +378,11 @@
             Ok(())
         }
     }
+
+    /// Return the largest possible message size for this buffer
+    pub fn max_msg_length(&self) -> IndexT {
+        self.max_msg_length
+    }
 }
 
 impl<A> Deref for ManyToOneRingBuffer<A>
 where
     A: AtomicBuffer,
@@ -380,12 +396,19 @@
     }
 }
 
+impl<A> DerefMut for ManyToOneRingBuffer<A>
+where
+    A: AtomicBuffer,
+{
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.buffer
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use crate::client::concurrent::ringbuffer::{record_descriptor, ManyToOneRingBuffer};
+    use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer;
     use crate::client::concurrent::AtomicBuffer;
-    use crate::util::IndexT;
-    use std::mem::size_of;
 
     const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize;
@@ -421,81 +444,4 @@ mod tests {
         let write_start = ring_buf.claim_capacity(16).unwrap();
         assert_eq!(write_start, 16);
     }
-
-    #[test]
-    fn write_basic() {
-        let mut ring_buffer =
-            ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
-
-        let source_bytes = &mut [12u8, 0, 0, 0][..];
-        let source_len = source_bytes.len() as IndexT;
-        let type_id = 1;
-        ring_buffer
-            .write(type_id, &source_bytes, 0, source_len)
-            .unwrap();
-
-        let record_len = source_len + record_descriptor::HEADER_LENGTH;
-        assert_eq!(
-            ring_buffer.get_i64_volatile(0).unwrap(),
-            record_descriptor::make_header(record_len, type_id)
-        );
-        assert_eq!(
-            ring_buffer
-                .get_i64_volatile(size_of::<i64>() as IndexT)
-                .unwrap(),
-            12
-        );
-    }
-
-    #[test]
-    fn read_basic() {
-        // Similar to write basic, put something into the buffer
-        let mut ring_buffer =
-            ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
-
-        let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
-        let source_len = source_buffer.len() as IndexT;
-        let type_id = 1;
-        ring_buffer
-            .write(type_id, &source_buffer, 0, source_len)
-            .unwrap();
-
-        // Now we can start the actual read process
-        let c = |_, buf: &Vec<u8>, offset, _| assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12);
-        ring_buffer.read(c, 1).unwrap();
-
-        // Make sure that the buffer was zeroed on finish
-        for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) {
-            assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0);
-        }
-    }
-
-    #[test]
-    fn read_multiple() {
-        let mut ring_buffer =
-            ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
-
-        let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
-        let source_len = source_buffer.len() as IndexT;
-        let type_id = 1;
-        ring_buffer
-            .write(type_id, &source_buffer, 0, source_len)
-            .unwrap();
-        ring_buffer
-            .write(type_id, &source_buffer, 0, source_len)
-            .unwrap();
-
-        let mut msg_count = 0;
-        let c = |_, buf: &Vec<u8>, offset, _| {
-            msg_count += 1;
-            assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12);
-        };
-        ring_buffer.read(c, 2).unwrap();
-        assert_eq!(msg_count, 2);
-
-        // Make sure that the buffer was zeroed on finish
-        for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) {
-            assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0);
-        }
-    }
 }
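With `write` now returning `Result<bool>` and reading split into `read`/`read_n`, a full round trip through the public API looks roughly like this (a hypothetical sketch mirroring the integration tests below, not part of the patch; the variable names are illustrative):

```rust
use aeron_rs::client::concurrent::ringbuffer::{buffer_descriptor, ManyToOneRingBuffer};
use aeron_rs::util::IndexT;

fn main() {
    // Capacity must be a power of two, plus room for the trailer
    let buffer = vec![0u8; 1024 + buffer_descriptor::TRAILER_LENGTH as usize];
    let mut ring_buffer =
        ManyToOneRingBuffer::new(buffer).expect("capacity not a power of two");

    let source = vec![12u8, 0, 0, 0, 0, 0, 0, 0];

    // `Ok(true)` means the record was claimed and written; a full buffer is
    // now reported as `Ok(false)` rather than `Err(InsufficientCapacity)`
    let written = ring_buffer
        .write(1, &source, 0, source.len() as IndexT)
        .unwrap();
    assert!(written);

    // `read` drains everything committed; `read_n` would cap the message count
    let mut messages = 0;
    let handler = |_, _buf: &Vec<u8>, _, _| messages += 1;
    let messages_read = ring_buffer.read(handler).unwrap();

    assert_eq!(messages_read, 1);
    assert_eq!(messages, 1);
}
```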
diff --git a/aeron-rs/tests/many_to_one_ring_buffer.rs b/aeron-rs/tests/many_to_one_ring_buffer.rs
new file mode 100644
index 0000000..58d11ad
--- /dev/null
+++ b/aeron-rs/tests/many_to_one_ring_buffer.rs
@@ -0,0 +1,381 @@
+/// Tests based on the C++ tests included with Aeron
+use aeron_rs::client::concurrent::ringbuffer::{
+    buffer_descriptor, record_descriptor, ManyToOneRingBuffer,
+};
+use aeron_rs::client::concurrent::AtomicBuffer;
+use aeron_rs::util::bit::align;
+use aeron_rs::util::IndexT;
+use std::ops::Deref;
+
+const CAPACITY: usize = 1024;
+const BUFFER_SZ: usize = CAPACITY + buffer_descriptor::TRAILER_LENGTH as usize;
+const ODD_BUFFER_SZ: usize = (CAPACITY - 1) + buffer_descriptor::TRAILER_LENGTH as usize;
+
+const MSG_TYPE_ID: i32 = 101;
+const HEAD_COUNTER_INDEX: IndexT = 1024 as IndexT + buffer_descriptor::HEAD_POSITION_OFFSET;
+const TAIL_COUNTER_INDEX: IndexT = 1024 as IndexT + buffer_descriptor::TAIL_POSITION_OFFSET;
+
+#[test]
+fn should_calculate_capacity_for_buffer() {
+    let buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
+
+    assert_eq!(AtomicBuffer::capacity(buffer.deref()), BUFFER_SZ as IndexT);
+    assert_eq!(buffer.capacity(), CAPACITY as IndexT);
+}
+
+#[test]
+fn should_throw_for_capacity_not_power_of_two() {
+    let buffer = ManyToOneRingBuffer::new(vec![0u8; ODD_BUFFER_SZ]);
+
+    assert!(buffer.is_err());
+}
+
+#[test]
+fn should_throw_when_max_message_size_exceeded() {
+    let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
+
+    let bytes = vec![0u8; buffer.max_msg_length() as usize + 1];
+    let write_res = buffer.write(MSG_TYPE_ID, &bytes, 0, bytes.len() as IndexT);
+
+    assert!(write_res.is_err());
+}
+
+#[test]
+fn should_write_to_empty_buffer() {
+    let tail: IndexT = 0;
+    let tail_index: IndexT = 0;
+    let length: IndexT = 8;
+    let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
+    let src_index: IndexT = 0;
+    let aligned_record_length: IndexT = align(
+        record_length as usize,
+        record_descriptor::ALIGNMENT as usize,
+    ) as IndexT;
+
+    let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
+    let src_bytes = vec![0u8; BUFFER_SZ];
+
+    assert!(buffer
+        .write(MSG_TYPE_ID, &src_bytes, src_index, length)
+        .unwrap());
+
+    assert_eq!(
+        buffer.get_i32(record_descriptor::length_offset(tail_index)),
+        Ok(record_length)
+    );
+    assert_eq!(
+        buffer.get_i32(record_descriptor::type_offset(tail_index)),
+        Ok(MSG_TYPE_ID)
+    );
+    assert_eq!(
+        buffer.get_i64(TAIL_COUNTER_INDEX),
+        Ok((tail + aligned_record_length) as i64)
+    );
+}
+
+#[test]
+fn should_reject_write_when_insufficient_space() {
+    let length: IndexT = 100;
+    let head: IndexT = 0;
+    let tail: IndexT = head
+        + (CAPACITY
+            - align(
+                (length - record_descriptor::ALIGNMENT) as usize,
+                record_descriptor::ALIGNMENT as usize,
+            )) as IndexT;
+    let src_index: IndexT = 0;
+
+    let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
+    buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
+    buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
+
+    let src_bytes = vec![0u8; BUFFER_SZ];
+    let write_res = buffer.write(MSG_TYPE_ID, &src_bytes, src_index, length);
+
+    assert_eq!(write_res, Ok(false));
+    assert_eq!(buffer.get_i64(TAIL_COUNTER_INDEX), Ok(tail as i64));
+}
+
+#[test]
+fn should_reject_write_when_buffer_full() {
+    let length: IndexT = 8;
+    let head: IndexT = 0;
+    let tail: IndexT = head + CAPACITY as IndexT;
+    let src_index: IndexT = 0;
+
+    let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
+    buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
+    buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
+
+    let src_bytes = vec![0u8; BUFFER_SZ];
+    let write_res = buffer.write(MSG_TYPE_ID, &src_bytes, src_index, length);
+    assert_eq!(write_res, Ok(false));
+    assert_eq!(buffer.get_i64(TAIL_COUNTER_INDEX), Ok(tail as i64));
+}
+
+#[test]
+fn should_insert_padding_record_plus_message_on_buffer_wrap() {
+    let length: IndexT = 100;
+    let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
+    let aligned_record_length = align(
+        record_length as usize,
+        record_descriptor::ALIGNMENT as usize,
+    ) as IndexT;
+    let tail: IndexT = CAPACITY as IndexT - record_descriptor::ALIGNMENT;
+    let head: IndexT = tail - (record_descriptor::ALIGNMENT * 4);
+    let src_index: IndexT = 0;
+
+    let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
+    buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
+    buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
+
+    let src_bytes = vec![0u8; BUFFER_SZ];
+    let write_res = buffer.write(MSG_TYPE_ID, &src_bytes, src_index, length);
+    assert_eq!(write_res, Ok(true));
+
+    assert_eq!(
+        buffer.get_i32(record_descriptor::type_offset(tail)),
+        Ok(record_descriptor::PADDING_MSG_TYPE_ID)
+    );
+    assert_eq!(
+        buffer.get_i32(record_descriptor::length_offset(tail)),
+        Ok(record_descriptor::ALIGNMENT)
+    );
+
+    assert_eq!(
+        buffer.get_i32(record_descriptor::length_offset(0)),
+        Ok(record_length)
+    );
+    assert_eq!(
+        buffer.get_i32(record_descriptor::type_offset(0)),
+        Ok(MSG_TYPE_ID)
+    );
+    assert_eq!(
+        buffer.get_i64(TAIL_COUNTER_INDEX),
+        Ok((tail + aligned_record_length + record_descriptor::ALIGNMENT) as i64)
+    );
+}
+
+#[test]
+fn should_insert_padding_record_plus_message_on_buffer_wrap_with_head_equal_to_tail() {
+    let length: IndexT = 100;
+    let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
+    let aligned_record_length: IndexT = align(
+        record_length as usize,
+        record_descriptor::ALIGNMENT as usize,
+    ) as IndexT;
+    let tail: IndexT = CAPACITY as IndexT - record_descriptor::ALIGNMENT;
+    let head: IndexT = tail;
+    let src_index: IndexT = 0;
+
+    let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
+    buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
+    buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
+
+    let src_bytes = vec![0u8; BUFFER_SZ];
+    let write_res = buffer.write(MSG_TYPE_ID, &src_bytes, src_index, length);
+    assert_eq!(write_res, Ok(true));
+
+    assert_eq!(
+        buffer.get_i32(record_descriptor::type_offset(tail)),
+        Ok(record_descriptor::PADDING_MSG_TYPE_ID)
+    );
+    assert_eq!(
+        buffer.get_i32(record_descriptor::length_offset(tail)),
+        Ok(record_descriptor::ALIGNMENT)
+    );
+
+    assert_eq!(
+        buffer.get_i32(record_descriptor::length_offset(0)),
+        Ok(record_length)
+    );
+    assert_eq!(
+        buffer.get_i32(record_descriptor::type_offset(0)),
+        Ok(MSG_TYPE_ID)
+    );
+    assert_eq!(
+        buffer.get_i64(TAIL_COUNTER_INDEX),
+        Ok((tail + aligned_record_length + record_descriptor::ALIGNMENT) as i64)
+    );
+}
+
+#[test]
+fn should_read_single_message() {
+    let length: IndexT = 8;
+    let head: IndexT = 0;
+    let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
+    let aligned_record_length: IndexT = align(
+        record_length as usize,
+        record_descriptor::ALIGNMENT as usize,
+    ) as IndexT;
+    let tail: IndexT = aligned_record_length;
+
+    let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
+    buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
+    buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
+
+    buffer
+        .put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
+        .unwrap();
+    buffer
+        .put_i32(record_descriptor::length_offset(0), record_length)
+        .unwrap();
+
+    let mut times_called = 0;
+    let closure = |_, _buf: &Vec<u8>, _, _| {
+        times_called += 1;
+    };
+    let messages_read = buffer.read(closure);
+
+    assert_eq!(messages_read, Ok(1));
+    assert_eq!(times_called, 1);
+    assert_eq!(
+        buffer.get_i64(HEAD_COUNTER_INDEX),
+        Ok((head + aligned_record_length) as i64)
+    );
+
+    for i in (0..record_descriptor::ALIGNMENT).step_by(4) {
+        assert_eq!(buffer.get_i32(i), Ok(0));
+    }
+}
+
+#[test]
+fn should_not_read_single_message_part_way_through_writing() {
+    let length: IndexT = 8;
+    let head: IndexT = 0;
+    let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
+    let aligned_record_length: IndexT = align(
+        record_length as usize,
+        record_descriptor::ALIGNMENT as usize,
+    ) as IndexT;
+    let end_tail: IndexT = aligned_record_length;
+
+    let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
+    buffer.put_i64(TAIL_COUNTER_INDEX, end_tail as i64).unwrap();
+    buffer
+        .put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
+        .unwrap();
+    buffer
+        .put_i32(record_descriptor::length_offset(0), -record_length)
+        .unwrap();
+
+    let mut times_called = 0;
+    let closure = |_, _buf: &Vec<u8>, _, _| {
+        times_called += 1;
+    };
+    let messages_read = buffer.read(closure);
+
+    assert_eq!(messages_read, Ok(0));
+    assert_eq!(times_called, 0);
+    assert_eq!(buffer.get_i64(HEAD_COUNTER_INDEX), Ok(head as i64));
+}
+
+#[test]
+fn should_read_two_messages() {
+    let length: IndexT = 8;
+    let head: IndexT = 0;
+    let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
+    let aligned_record_length: IndexT = align(
+        record_length as usize,
+        record_descriptor::ALIGNMENT as usize,
+    ) as IndexT;
+    let tail: IndexT = aligned_record_length * 2;
+
+    let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
+    buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
+    buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
+
+    buffer
+        .put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
+        .unwrap();
+    buffer
+        .put_i32(record_descriptor::length_offset(0), record_length)
+        .unwrap();
+
+    buffer
+        .put_i32(
+            record_descriptor::type_offset(0 + aligned_record_length),
+            MSG_TYPE_ID,
+        )
+        .unwrap();
+    buffer
+        .put_i32(
+            record_descriptor::length_offset(0 + aligned_record_length),
+            record_length,
+        )
+        .unwrap();
+
+    let mut times_called = 0;
+    let closure = |_, _buf: &Vec<u8>, _, _| {
+        times_called += 1;
+    };
+    let messages_read = buffer.read(closure);
+
+    assert_eq!(messages_read, Ok(2));
+    assert_eq!(times_called, 2);
+    assert_eq!(
+        buffer.get_i64(HEAD_COUNTER_INDEX),
+        Ok((head + aligned_record_length * 2) as i64)
+    );
+
+    for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) {
+        assert_eq!(buffer.get_i32(i), Ok(0));
+    }
+}
+
+#[test]
+fn should_limit_read_of_messages() {
+    let length: IndexT = 8;
+    let head: IndexT = 0;
+    let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
+    let aligned_record_length: IndexT = align(
+        record_length as usize,
+        record_descriptor::ALIGNMENT as usize,
+    ) as IndexT;
+    let tail: IndexT = aligned_record_length * 2;
+
+    let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
+    buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
+    buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
+
+    buffer
+        .put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
+        .unwrap();
+    buffer
+        .put_i32(record_descriptor::length_offset(0), record_length)
+        .unwrap();
+
+    buffer
+        .put_i32(
+            record_descriptor::type_offset(0 + aligned_record_length),
+            MSG_TYPE_ID,
+        )
+        .unwrap();
+    buffer
+        .put_i32(
+            record_descriptor::length_offset(0 + aligned_record_length),
+            record_length,
+        )
+        .unwrap();
+
+    let mut times_called = 0;
+    let closure = |_, _buf: &Vec<u8>, _, _| {
+        times_called += 1;
+    };
+    let messages_read = buffer.read_n(closure, 1);
+
+    assert_eq!(messages_read, Ok(1));
+    assert_eq!(times_called, 1);
+    assert_eq!(
+        buffer.get_i64(HEAD_COUNTER_INDEX),
+        Ok((head + aligned_record_length) as i64)
+    );
+
+    for i in (0..record_descriptor::ALIGNMENT).step_by(4) {
+        assert_eq!(buffer.get_i32(i), Ok(0));
+    }
+    assert_eq!(
+        buffer.get_i32(record_descriptor::length_offset(aligned_record_length)),
+        Ok(record_length)
+    );
+}
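One construct worth calling out from `claim_capacity`: Rust has no `do { ... } while (cond)`, so the retry loop is written with the whole body inside the *condition* block and an empty `{}` as the loop body. A standalone sketch of the idiom (illustration only, independent of the patch):

```rust
fn main() {
    let mut attempts = 0;

    // The braced block is the while *condition*, so it runs at least once;
    // the trailing `{}` is the (empty) loop body. This mirrors the
    // `while { ... !compare_and_set_i64(...)? } {}` loop in `claim_capacity`.
    while {
        attempts += 1;
        attempts < 3 // keep looping while this expression is true
    } {}

    assert_eq!(attempts, 3);
}
```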