diff --git a/aeron-rs/src/client/concurrent/atomic_buffer.rs b/aeron-rs/src/client/concurrent/atomic_buffer.rs index f96b929..0fbed58 100644 --- a/aeron-rs/src/client/concurrent/atomic_buffer.rs +++ b/aeron-rs/src/client/concurrent/atomic_buffer.rs @@ -108,6 +108,11 @@ impl<'a> AtomicBuffer<'a> { self.overlay_volatile::(offset) } + /// Get the current value at an offset without using any synchronization operations + pub fn get_i64(&self, offset: IndexT) -> Result { + self.overlay::(offset).map(|i| *i) + } + /// Perform a volatile read /// /// ```rust @@ -121,6 +126,11 @@ impl<'a> AtomicBuffer<'a> { self.overlay_volatile::(offset) } + /// Get the current value at an offset without using any synchronization operations + pub fn get_i32(&self, offset: IndexT) -> Result { + self.overlay::(offset).map(|i| *i) + } + /// Perform a volatile write of an `i64` into the buffer /// /// ```rust @@ -200,7 +210,7 @@ impl<'a> AtomicBuffer<'a> { /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); /// ``` pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { - // QUESTION: Do I need a volatile and atomic read here? + // QUESTION: Should I use a volatile read here as well? // Aeron C++ uses a volatile read before the atomic operation, but I think that // may be redundant. In addition, Rust's `read_volatile` operation returns a // *copied* value; running `compare_exchange` on that copy introduces a race condition @@ -210,6 +220,16 @@ impl<'a> AtomicBuffer<'a> { .is_ok() }) } + + /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. + pub fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { + self.bounds_check(offset, length as IndexT).map(|_| unsafe { + self.buffer + .as_mut_ptr() + .offset(offset as isize) + .write_bytes(value, length) + }) + } } #[cfg(test)] diff --git a/aeron-rs/src/client/concurrent/ring_buffer.rs b/aeron-rs/src/client/concurrent/ring_buffer.rs index 6e42ca5..5530a52 100644 --- a/aeron-rs/src/client/concurrent/ring_buffer.rs +++ b/aeron-rs/src/client/concurrent/ring_buffer.rs @@ -1,5 +1,6 @@ //! Ring buffer wrapper for communicating with the Media Driver use crate::client::concurrent::atomic_buffer::AtomicBuffer; +use crate::util::bit::align; use crate::util::{bit, AeronError, IndexT, Result}; /// Description of the Ring Buffer schema. @@ -74,15 +75,13 @@ pub mod record_descriptor { /// and is not yet ready for processing. pub const PADDING_MSG_TYPE_ID: i32 = -1; - /// Retrieve the header bits for a ring buffer record. - pub fn make_header(length: i32, msg_type_id: i32) -> i64 { + pub(super) fn make_header(length: i32, msg_type_id: i32) -> i64 { // QUESTION: Instead of masking, can't we just cast and return u32/u64? // Smells like Java. ((i64::from(msg_type_id) & 0xFFFF_FFFF) << 32) | (i64::from(length) & 0xFFFF_FFFF) } - /// Verify a message type identifier is safe for use - pub fn check_msg_type_id(msg_type_id: i32) -> Result<()> { + pub(super) fn check_msg_type_id(msg_type_id: i32) -> Result<()> { if msg_type_id < 1 { Err(AeronError::IllegalArgument) } else { @@ -90,15 +89,21 @@ pub mod record_descriptor { } } - /// Fetch the offset to begin writing a message payload - pub fn encoded_msg_offset(record_offset: IndexT) -> IndexT { + pub(super) fn encoded_msg_offset(record_offset: IndexT) -> IndexT { record_offset + HEADER_LENGTH } - /// Fetch the offset to begin writing the message length - pub fn length_offset(record_offset: IndexT) -> IndexT { + pub(super) fn length_offset(record_offset: IndexT) -> IndexT { record_offset } + + pub(super) fn record_length(header: i64) -> i32 { + header as i32 + } + + pub(super) fn message_type_id(header: i64) -> i32 { + (header >> 32) as i32 + } } /// Multi-producer, single-consumer ring buffer implementation. @@ -174,6 +179,71 @@ impl<'a> ManyToOneRingBuffer<'a> { Ok(()) } + /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` + pub fn read(&mut self, mut handler: F, message_count_limit: usize) -> Result + where + F: FnMut(i32, &AtomicBuffer, IndexT, IndexT) -> (), + { + // UNWRAP: Bounds check performed during buffer creation + let head = self.buffer.get_i64(self.head_position_index).unwrap(); + let head_index = (head & i64::from(self.capacity - 1)) as i32; + let contiguous_block_length = self.capacity - head_index; + let mut messages_read = 0; + let mut bytes_read: i32 = 0; + + let result: Result<()> = (|| { + while bytes_read < contiguous_block_length && messages_read < message_count_limit { + let record_index = head_index + bytes_read; + let header = self.buffer.get_i64_volatile(record_index)?; + let record_length = record_descriptor::record_length(header); + + if record_length <= 0 { + break; + } + + bytes_read += align( + record_length as usize, + record_descriptor::ALIGNMENT as usize, + ) as i32; + + let msg_type_id = record_descriptor::message_type_id(header); + if msg_type_id == record_descriptor::PADDING_MSG_TYPE_ID { + // QUESTION: Is this a spinlock on a writer finishing? + continue; + } + + messages_read += 1; + handler( + msg_type_id, + &self.buffer, + record_descriptor::encoded_msg_offset(record_index), + record_length - record_descriptor::HEADER_LENGTH, + ); + } + Ok(()) + })(); + + // C++ has much better semantics for handling cleanup like this; however, because + // it would require us to capture a mutable reference to self, it's not feasible + // in Rust (since the main operation also needs mutable access to self). + let mut cleanup = || { + if bytes_read != 0 { + self.buffer + .set_memory(head_index, bytes_read as usize, 0) + .unwrap(); + self.buffer + .put_i64_ordered(self.head_position_index, head + i64::from(bytes_read)) + .unwrap(); + } + }; + result.map(|_| cleanup()).map_err(|e| { + cleanup(); + e + })?; + + Ok(messages_read) + } + /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index /// at which to start writing the next record. fn claim_capacity(&mut self, required: IndexT) -> Result { @@ -337,4 +407,66 @@ mod tests { 12 ); } + + #[test] + fn read_basic() { + // Similar to write basic, put something into the buffer + let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; + let buffer = AtomicBuffer::wrap(&mut bytes); + let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + + let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let source_len = source_bytes.len() as IndexT; + let source_buffer = AtomicBuffer::wrap(&mut source_bytes); + let type_id = 1; + ring_buffer + .write(type_id, &source_buffer, 0, source_len) + .unwrap(); + + // Now we can start the actual read process + let c = |_, buf: &AtomicBuffer, offset, _| { + assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12) + }; + ring_buffer.read(c, 1).unwrap(); + + // Make sure that the buffer was zeroed on finish + drop(ring_buffer); + let buffer = AtomicBuffer::wrap(&mut bytes); + for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) { + assert_eq!(buffer.get_i32(i).unwrap(), 0); + } + } + + #[test] + fn read_multiple() { + let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; + let buffer = AtomicBuffer::wrap(&mut bytes); + let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + + let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let source_len = source_bytes.len() as IndexT; + let source_buffer = AtomicBuffer::wrap(&mut source_bytes); + let type_id = 1; + ring_buffer + .write(type_id, &source_buffer, 0, source_len) + .unwrap(); + ring_buffer + .write(type_id, &source_buffer, 0, source_len) + .unwrap(); + + let mut msg_count = 0; + let c = |_, buf: &AtomicBuffer, offset, _| { + msg_count += 1; + assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12); + }; + ring_buffer.read(c, 2).unwrap(); + assert_eq!(msg_count, 2); + + // Make sure that the buffer was zeroed on finish + drop(ring_buffer); + let buffer = AtomicBuffer::wrap(&mut bytes); + for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) { + assert_eq!(buffer.get_i32(i).unwrap(), 0); + } + } }