diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index 679163e..8839f0e 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -188,6 +188,13 @@ pub trait AtomicBuffer: Deref + DerefMut { Ok(()) } + /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. + fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { + self.bounds_check(offset, length as IndexT).map(|_| unsafe { + self.as_mut_ptr().offset(offset as isize).write_bytes(value, length) + }) + } + /// Perform a volatile read of an `i32` from the buffer /// /// ```rust diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index 016dcdc..3fa62a1 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -192,14 +192,14 @@ where Ok(()) } - /* /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` pub fn read(&mut self, mut handler: F, message_count_limit: usize) -> Result where F: FnMut(i32, &A, IndexT, IndexT) -> (), { + // QUESTION: Should I implement the `get_i64` method that C++ uses? // UNWRAP: Bounds check performed during buffer creation - let head = self.buffer.get_i64(self.head_position_index).unwrap(); + let head = self.buffer.get_i64_volatile(self.head_position_index).unwrap(); let head_index = (head & i64::from(self.capacity - 1)) as i32; let contiguous_block_length = self.capacity - head_index; let mut messages_read = 0; @@ -257,7 +257,6 @@ where Ok(messages_read) } - */ /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index /// at which to start writing the next record. @@ -381,7 +380,7 @@ where #[cfg(test)] mod tests { use crate::client::concurrent::ringbuffer::{ - buffer_descriptor, record_descriptor, ManyToOneRingBuffer, + record_descriptor, ManyToOneRingBuffer, }; use crate::client::concurrent::AtomicBuffer; use crate::util::IndexT; @@ -447,43 +446,36 @@ mod tests { ); } - /* #[test] fn read_basic() { // Similar to write basic, put something into the buffer - let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; - let buffer = AtomicBuffer::wrap(&mut bytes); - let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + let mut ring_buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; - let source_len = source_bytes.len() as IndexT; + let source_len = source_buffer.len() as IndexT; let type_id = 1; ring_buffer .write(type_id, &source_buffer, 0, source_len) .unwrap(); // Now we can start the actual read process - let c = |_, buf: &dyn AtomicBuffer, offset, _| { + let c = |_, buf: &Vec, offset, _| { assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12) }; ring_buffer.read(c, 1).unwrap(); // Make sure that the buffer was zeroed on finish - drop(ring_buffer); - let buffer = AtomicBuffer::wrap(&mut bytes); for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) { - assert_eq!(buffer.get_i32(i).unwrap(), 0); + assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0); } } #[test] fn read_multiple() { - let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; - let buffer = AtomicBuffer::wrap(&mut bytes); - let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + let mut ring_buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; - let source_len = source_bytes.len() as IndexT; + let source_len = source_buffer.len() as IndexT; let type_id = 1; ring_buffer .write(type_id, &source_buffer, 0, source_len) @@ -493,7 +485,7 @@ mod tests { .unwrap(); let mut msg_count = 0; - let c = |_, buf: &dyn AtomicBuffer, offset, _| { + let c = |_, buf: &Vec, offset, _| { msg_count += 1; assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12); }; @@ -501,11 +493,8 @@ mod tests { assert_eq!(msg_count, 2); // Make sure that the buffer was zeroed on finish - drop(ring_buffer); - let buffer = AtomicBuffer::wrap(&mut bytes); for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) { - assert_eq!(buffer.get_i32(i).unwrap(), 0); + assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0); } } - */ }