diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index d296594..118ee9b 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -257,6 +257,18 @@ pub trait AtomicBuffer: Deref + DerefMut { self.write_volatile::(offset, value) } + /// Write an `i32` value into the buffer without performing any synchronization + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buffer = vec![0u8; 5]; + /// buffer.put_i32(0, 255 + 1); + /// assert_eq!(buffer.get_i32(1), Ok(1)); + /// ``` + fn put_i32(&mut self, offset: IndexT, value: i32) -> Result<()> { + self.overlay_mut::(offset).map(|i| *i = value) + } + /// Return the total number of bytes in this buffer fn capacity(&self) -> IndexT { self.len() as IndexT diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index 35ba3ce..51ef93e 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -210,16 +210,18 @@ where } /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` - pub fn read(&mut self, mut handler: F, message_count_limit: usize) -> Result + /// + /// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler + /// function; by contrast, the Rust API makes no attempt to catch panics and currently + /// has no way of stopping reading once started. + // QUESTION: Is there a better way to handle dispatching the handler function? + // We can't give it a `&dyn AtomicBuffer` because of the monomorphized generic functions, + // don't know if having a separate handler trait would be helpful. + pub fn read_n(&mut self, mut handler: F, message_count_limit: usize) -> Result where F: FnMut(i32, &A, IndexT, IndexT) -> (), { - // QUESTION: Should I implement the `get_i64` method that C++ uses? - // UNWRAP: Bounds check performed during buffer creation - let head = self - .buffer - .get_i64_volatile(self.head_position_index) - .unwrap(); + let head = self.buffer.get_i64(self.head_position_index)?; let head_index = (head & i64::from(self.capacity - 1)) as i32; let contiguous_block_length = self.capacity - head_index; let mut messages_read = 0; @@ -262,6 +264,9 @@ where // in Rust (since the main operation also needs mutable access to self). let mut cleanup = || { if bytes_read != 0 { + // UNWRAP: Need to justify this one. + // Should be safe because we've already done length checks, but I want + // to spend some more time thinking about it. self.buffer .set_memory(head_index, bytes_read as usize, 0) .unwrap(); @@ -278,6 +283,18 @@ where Ok(messages_read) } + /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` + /// + /// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler + /// function; by contrast, the Rust API makes no attempt to catch panics and currently + /// has no way of stopping reading once started. + pub fn read(&mut self, handler: F) -> Result + where + F: FnMut(i32, &A, IndexT, IndexT) -> (), + { + self.read_n(handler, usize::max_value()) + } + /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index /// at which to start writing the next record. fn claim_capacity(&mut self, required: IndexT) -> Result { @@ -390,10 +407,8 @@ where #[cfg(test)] mod tests { - use crate::client::concurrent::ringbuffer::{record_descriptor, ManyToOneRingBuffer}; + use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer; use crate::client::concurrent::AtomicBuffer; - use crate::util::IndexT; - use std::mem::size_of; const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize; @@ -429,56 +444,4 @@ mod tests { let write_start = ring_buf.claim_capacity(16).unwrap(); assert_eq!(write_start, 16); } - - #[test] - fn read_basic() { - // Similar to write basic, put something into the buffer - let mut ring_buffer = - ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); - - let source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; - let source_len = source_buffer.len() as IndexT; - let type_id = 1; - ring_buffer - .write(type_id, &source_buffer, 0, source_len) - .unwrap(); - - // Now we can start the actual read process - let c = |_, buf: &Vec, offset, _| assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12); - ring_buffer.read(c, 1).unwrap(); - - // Make sure that the buffer was zeroed on finish - for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) { - assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0); - } - } - - #[test] - fn read_multiple() { - let mut ring_buffer = - ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); - - let source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; - let source_len = source_buffer.len() as IndexT; - let type_id = 1; - ring_buffer - .write(type_id, &source_buffer, 0, source_len) - .unwrap(); - ring_buffer - .write(type_id, &source_buffer, 0, source_len) - .unwrap(); - - let mut msg_count = 0; - let c = |_, buf: &Vec, offset, _| { - msg_count += 1; - assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12); - }; - ring_buffer.read(c, 2).unwrap(); - assert_eq!(msg_count, 2); - - // Make sure that the buffer was zeroed on finish - for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) { - assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0); - } - } } diff --git a/aeron-rs/tests/many_to_one_ring_buffer.rs b/aeron-rs/tests/many_to_one_ring_buffer.rs index 482ce9b..58d11ad 100644 --- a/aeron-rs/tests/many_to_one_ring_buffer.rs +++ b/aeron-rs/tests/many_to_one_ring_buffer.rs @@ -198,3 +198,184 @@ fn should_insert_padding_record_plus_message_on_buffer_wrap_with_head_equal_to_t Ok((tail + aligned_record_length + record_descriptor::ALIGNMENT) as i64) ); } + +#[test] +fn should_read_single_message() { + let length: IndexT = 8; + let head: IndexT = 0; + let record_length: IndexT = length + record_descriptor::HEADER_LENGTH; + let aligned_record_length: IndexT = align( + record_length as usize, + record_descriptor::ALIGNMENT as usize, + ) as IndexT; + let tail: IndexT = aligned_record_length; + + let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap(); + buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap(); + buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap(); + + buffer + .put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID) + .unwrap(); + buffer + .put_i32(record_descriptor::length_offset(0), record_length) + .unwrap(); + + let mut times_called = 0; + let closure = |_, _buf: &Vec, _, _| { + times_called += 1; + }; + let messages_read = buffer.read(closure); + + assert_eq!(messages_read, Ok(1)); + assert_eq!(times_called, 1); + assert_eq!( + buffer.get_i64(HEAD_COUNTER_INDEX), + Ok((head + aligned_record_length) as i64) + ); + + for i in (0..record_descriptor::ALIGNMENT).step_by(4) { + assert_eq!(buffer.get_i32(i), Ok(0)); + } +} + +#[test] +fn should_not_read_single_message_part_way_through_writing() { + let length: IndexT = 8; + let head: IndexT = 0; + let record_length: IndexT = length + record_descriptor::HEADER_LENGTH; + let aligned_record_length: IndexT = align( + record_length as usize, + record_descriptor::ALIGNMENT as usize, + ) as IndexT; + let end_tail: IndexT = aligned_record_length; + + let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap(); + buffer.put_i64(TAIL_COUNTER_INDEX, end_tail as i64).unwrap(); + buffer + .put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID) + .unwrap(); + buffer + .put_i32(record_descriptor::length_offset(0), -record_length) + .unwrap(); + + let mut times_called = 0; + let closure = |_, _buf: &Vec, _, _| { + times_called += 1; + }; + let messages_read = buffer.read(closure); + + assert_eq!(messages_read, Ok(0)); + assert_eq!(times_called, 0); + assert_eq!(buffer.get_i64(HEAD_COUNTER_INDEX), Ok(head as i64)); +} + +#[test] +fn should_read_two_messages() { + let length: IndexT = 8; + let head: IndexT = 0; + let record_length: IndexT = length + record_descriptor::HEADER_LENGTH; + let aligned_record_length: IndexT = align( + record_length as usize, + record_descriptor::ALIGNMENT as usize, + ) as IndexT; + let tail: IndexT = aligned_record_length * 2; + + let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap(); + buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap(); + buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap(); + + buffer + .put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID) + .unwrap(); + buffer + .put_i32(record_descriptor::length_offset(0), record_length) + .unwrap(); + + buffer + .put_i32( + record_descriptor::type_offset(0 + aligned_record_length), + MSG_TYPE_ID, + ) + .unwrap(); + buffer + .put_i32( + record_descriptor::length_offset(0 + aligned_record_length), + record_length, + ) + .unwrap(); + + let mut times_called = 0; + let closure = |_, _buf: &Vec, _, _| { + times_called += 1; + }; + let messages_read = buffer.read(closure); + + assert_eq!(messages_read, Ok(2)); + assert_eq!(times_called, 2); + assert_eq!( + buffer.get_i64(HEAD_COUNTER_INDEX), + Ok((head + aligned_record_length * 2) as i64) + ); + + for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) { + assert_eq!(buffer.get_i32(i), Ok(0)); + } +} + +#[test] +fn should_limit_read_of_messages() { + let length: IndexT = 8; + let head: IndexT = 0; + let record_length: IndexT = length + record_descriptor::HEADER_LENGTH; + let aligned_record_length: IndexT = align( + record_length as usize, + record_descriptor::ALIGNMENT as usize, + ) as IndexT; + let tail: IndexT = aligned_record_length * 2; + + let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap(); + buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap(); + buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap(); + + buffer + .put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID) + .unwrap(); + buffer + .put_i32(record_descriptor::length_offset(0), record_length) + .unwrap(); + + buffer + .put_i32( + record_descriptor::type_offset(0 + aligned_record_length), + MSG_TYPE_ID, + ) + .unwrap(); + buffer + .put_i32( + record_descriptor::length_offset(0 + aligned_record_length), + record_length, + ) + .unwrap(); + + let mut times_called = 0; + let closure = |_, _buf: &Vec, _, _| { + times_called += 1; + }; + let messages_read = buffer.read_n(closure, 1); + + assert_eq!(messages_read, Ok(1)); + assert_eq!(times_called, 1); + assert_eq!( + buffer.get_i64(HEAD_COUNTER_INDEX), + Ok((head + aligned_record_length) as i64) + ); + + for i in (0..record_descriptor::ALIGNMENT).step_by(4) { + assert_eq!(buffer.get_i32(i), Ok(0)); + } + assert_eq!( + buffer.get_i32(record_descriptor::length_offset(aligned_record_length)), + Ok(record_length) + ); +}