diff --git a/aeron-rs/src/client/concurrent/atomic_buffer.rs b/aeron-rs/src/client/concurrent/atomic_buffer.rs deleted file mode 100644 index 0fbed58..0000000 --- a/aeron-rs/src/client/concurrent/atomic_buffer.rs +++ /dev/null @@ -1,272 +0,0 @@ -//! Buffer that is safe to use in a multi-process/multi-thread context. Typically used for -//! handling atomic updates of memory-mapped buffers. -use std::mem::size_of; -use std::ops::Deref; -use std::sync::atomic::{AtomicI64, Ordering}; - -use crate::util::{AeronError, IndexT, Result}; -use std::ptr::{read_volatile, write_volatile}; - -/// Wrapper for atomic operations around an underlying byte buffer -pub struct AtomicBuffer<'a> { - buffer: &'a mut [u8], -} - -impl<'a> Deref for AtomicBuffer<'a> { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - self.buffer - } -} - -impl<'a> AtomicBuffer<'a> { - /// Create an `AtomicBuffer` as a view on an underlying byte slice - pub fn wrap(buffer: &'a mut [u8]) -> Self { - AtomicBuffer { buffer } - } - - fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { - if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size { - Err(AeronError::OutOfBounds) - } else { - Ok(()) - } - } - - /// Overlay a struct on a buffer. - /// - /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. - pub fn overlay(&self, offset: IndexT) -> Result<&T> - where - T: Sized, - { - self.bounds_check(offset, size_of::() as IndexT) - .map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; - unsafe { &*(offset_ptr as *const T) } - }) - } - - fn overlay_volatile(&self, offset: IndexT) -> Result - where - T: Copy, - { - self.bounds_check(offset, size_of::() as IndexT) - .map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; - unsafe { read_volatile(offset_ptr as *const T) } - }) - } - - fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> - where - T: Copy, - { - self.bounds_check(offset, size_of::() as IndexT) - .map(|_| { - let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; - unsafe { write_volatile(offset_ptr as *mut T, val) }; - }) - } - - /// Atomically fetch the current value at an offset, and increment by delta - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// # use aeron_rs::util::AeronError; - /// let mut bytes = [0u8; 9]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// // Simple case modifies only the first byte - /// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0)); - /// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1)); - /// - /// // Using an offset modifies the second byte - /// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0)); - /// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1)); - /// - /// // An offset of 2 means buffer size must be 10 to contain an `i64` - /// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds)); - /// ``` - pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { - self.overlay::(offset) - .map(|a| a.fetch_add(delta, Ordering::SeqCst)) - } - - /// Perform a volatile read - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0]; - /// let buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); - /// ``` - pub fn get_i64_volatile(&self, offset: IndexT) -> Result { - // QUESTION: Would it be better to express this in terms of an atomic read? - self.overlay_volatile::(offset) - } - - /// Get the current value at an offset without using any synchronization operations - pub fn get_i64(&self, offset: IndexT) -> Result { - self.overlay::(offset).map(|i| *i) - } - - /// Perform a volatile read - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [12, 0, 0, 0]; - /// let buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); - /// ``` - pub fn get_i32_volatile(&self, offset: IndexT) -> Result { - self.overlay_volatile::(offset) - } - - /// Get the current value at an offset without using any synchronization operations - pub fn get_i32(&self, offset: IndexT) -> Result { - self.overlay::(offset).map(|i| *i) - } - - /// Perform a volatile write of an `i64` into the buffer - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [0u8; 8]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// buffer.put_i64_ordered(0, 12); - /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); - /// ``` - pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> { - // QUESTION: Would it be better to have callers use `write_volatile` directly - self.write_volatile::(offset, val) - } - - /// Perform a volatile write of an `i32` into the buffer - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [0u8; 4]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// buffer.put_i32_ordered(0, 12); - /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); - /// ``` - pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> { - // QUESTION: Would it be better to have callers use `write_volatile` directly - self.write_volatile::(offset, val) - } - - /// Write the contents of one buffer to another. Does not perform any synchronization. - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut source_bytes = [1u8, 2, 3, 4]; - /// let source = AtomicBuffer::wrap(&mut source_bytes); - /// - /// let mut dest_bytes = [0, 0, 0, 0]; - /// let mut dest = AtomicBuffer::wrap(&mut dest_bytes); - /// - /// dest.put_bytes(1, &source, 1, 3); - /// drop(dest); - /// assert_eq!(dest_bytes, [0u8, 2, 3, 4]); - /// ``` - pub fn put_bytes( - &mut self, - index: IndexT, - source: &AtomicBuffer, - source_index: IndexT, - len: IndexT, - ) -> Result<()> { - self.bounds_check(index, len)?; - source.bounds_check(source_index, len)?; - - let index = index as usize; - let source_index = source_index as usize; - let len = len as usize; - self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]); - Ok(()) - } - - /// Compare an expected value with what is in memory, and if it matches, - /// update to a new value. Returns `Ok(true)` if the update was successful, - /// and `Ok(false)` if the update failed. - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut buf = [0u8; 8]; - /// let atomic_buf = AtomicBuffer::wrap(&mut buf); - /// // Set value to 1 - /// atomic_buf.get_and_add_i64(0, 1).unwrap(); - /// - /// // Set value to 1 if existing value is 0 - /// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); - /// // Set value to 2 if existing value is 1 - /// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); - /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); - /// ``` - pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { - // QUESTION: Should I use a volatile read here as well? - // Aeron C++ uses a volatile read before the atomic operation, but I think that - // may be redundant. In addition, Rust's `read_volatile` operation returns a - // *copied* value; running `compare_exchange` on that copy introduces a race condition - // because we're no longer comparing a consistent address. - self.overlay::(offset).map(|a| { - a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() - }) - } - - /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. - pub fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { - self.bounds_check(offset, length as IndexT).map(|_| unsafe { - self.buffer - .as_mut_ptr() - .offset(offset as isize) - .write_bytes(value, length) - }) - } -} - -#[cfg(test)] -mod tests { - use memmap::MmapOptions; - use std::sync::atomic::{AtomicU64, Ordering}; - - use crate::client::concurrent::atomic_buffer::AtomicBuffer; - use crate::util::AeronError; - - #[test] - fn mmap_to_atomic() { - let mut mmap = MmapOptions::new() - .len(24) - .map_anon() - .expect("Unable to map anonymous memory"); - AtomicBuffer::wrap(&mut mmap); - } - - #[test] - fn primitive_atomic_equivalent() { - let value: u64 = 24; - - let val_ptr = &value as *const u64; - let a_ptr = val_ptr as *const AtomicU64; - let a: &AtomicU64 = unsafe { &*a_ptr }; - - assert_eq!(value, (*a).load(Ordering::SeqCst)); - } - - #[test] - fn negative_offset() { - let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; - let atomic_buf = AtomicBuffer::wrap(&mut buf); - assert_eq!( - atomic_buf.get_and_add_i64(-1, 0), - Err(AeronError::OutOfBounds) - ) - } -} diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index 06eec65..bd8221d 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -1,5 +1,229 @@ //! Module for handling safe interactions among the multiple clients making use //! of a single Media Driver -pub mod atomic_buffer; -pub mod ring_buffer; +pub mod ringbuffer; +use std::mem::size_of; +use std::sync::atomic::{AtomicI64, Ordering}; + +use crate::util::{AeronError, IndexT, Result}; +use std::ptr::{read_volatile, write_volatile}; + +use memmap::MmapMut; +use std::ops::{Deref, DerefMut}; + +/// Atomic operations on slices of memory +pub trait AtomicBuffer: Deref + DerefMut { + /// Check that there are at least `size` bytes of memory available + /// beginning at some offset. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// + /// let buffer = &mut [0u8; 8][..]; + /// assert!(buffer.bounds_check(0, 8).is_ok()); + /// assert!(buffer.bounds_check(1, 7).is_ok()); + /// assert!(buffer.bounds_check(1, 8).is_err()); + /// assert!(buffer.bounds_check(-1, 8).is_err()); + /// ``` + fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { + if offset < 0 || size < 0 || self.deref().len() as IndexT - offset < size { + Err(AeronError::OutOfBounds) + } else { + Ok(()) + } + } + + /// Overlay a struct on a buffer. + /// + /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use std::sync::atomic::{AtomicI64, Ordering}; + /// let buffer = &mut [0u8; 9][..]; + /// + /// let my_val: &AtomicI64 = buffer.overlay::(0).unwrap(); + /// assert_eq!(my_val.load(Ordering::SeqCst), 0); + /// + /// my_val.store(1, Ordering::SeqCst); + /// assert_eq!(buffer, [1, 0, 0, 0, 0, 0, 0, 0, 0]); + /// + /// let another_val: &AtomicI64 = buffer.overlay::(1).unwrap(); + /// assert_eq!(another_val.load(Ordering::SeqCst), 0); + /// ``` + fn overlay(&self, offset: IndexT) -> Result<&T> + where + T: Sized, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.as_ptr().offset(offset as isize) }; + unsafe { &*(offset_ptr as *const T) } + }) + } + + /// Overlay a struct on a buffer, and perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let buffer = &mut [5, 0, 0, 0][..]; + /// + /// let my_val: u32 = buffer.overlay_volatile::(0).unwrap(); + /// assert_eq!(my_val, 5); + /// ``` + fn overlay_volatile(&self, offset: IndexT) -> Result + where + T: Copy, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.as_ptr().offset(offset as isize) }; + unsafe { read_volatile(offset_ptr as *const T) } + }) + } + + /// Perform a volatile write of a value over a buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buffer = &mut [0, 0, 0, 0][..]; + /// + /// let value: u32 = 24; + /// buffer.write_volatile(0, value).unwrap(); + /// assert_eq!(buffer, [24, 0, 0, 0]); + /// ``` + fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> + where + T: Copy, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.as_mut_ptr().offset(offset as isize) }; + unsafe { write_volatile(offset_ptr as *mut T, val) }; + }) + } + + /// Perform an atomic fetch and add of a 64-bit value + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buf = vec![0u8; 8]; + /// assert_eq!(buf.get_and_add_i64(0, 1), Ok(0)); + /// assert_eq!(buf.get_and_add_i64(0, 1), Ok(1)); + /// ``` + fn get_and_add_i64(&self, offset: IndexT, value: i64) -> Result { + self.overlay::(offset) + .map(|a| a.fetch_add(value, Ordering::SeqCst)) + } + + /// Perform an atomic Compare-And-Swap of a 64-bit value. Returns `Ok(true)` + /// if the update was successful, and `Ok(false)` if the update failed. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buf = &mut [0u8; 8][..]; + /// // Set value to 1 + /// buf.get_and_add_i64(0, 1).unwrap(); + /// + /// // Set value to 1 if existing value is 0 + /// assert_eq!(buf.compare_and_set_i64(0, 0, 1), Ok(false)); + /// // Set value to 2 if existing value is 1 + /// assert_eq!(buf.compare_and_set_i64(0, 1, 2), Ok(true)); + /// assert_eq!(buf.get_i64_volatile(0), Ok(2)); + /// ``` + fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { + // QUESTION: Should I use a volatile read here as well? + // Aeron C++ uses a volatile read before the atomic operation, but I think that + // may be redundant. In addition, Rust's `read_volatile` operation returns a + // *copied* value; running `compare_exchange` on that copy introduces a race condition + // because we're no longer comparing a consistent address. + self.overlay::(offset).map(|a| { + a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + }) + } + + /// Perform a volatile read of an `i64` value + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let buffer = vec![12u8, 0, 0, 0, 0, 0, 0, 0]; + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` + fn get_i64_volatile(&self, offset: IndexT) -> Result { + // QUESTION: Would it be better to express this in terms of an atomic read? + self.overlay_volatile::(offset) + } + + /// Perform a volatile write of an `i64` value + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buffer = vec![0u8; 8]; + /// buffer.put_i64_ordered(0, 12); + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` + fn put_i64_ordered(&mut self, offset: IndexT, value: i64) -> Result<()> { + self.write_volatile::(offset, value) + } + + /// Write the contents of one buffer to another. Does not perform any synchronization + fn put_bytes( + &mut self, + index: IndexT, + source: &B, + source_index: IndexT, + len: IndexT, + ) -> Result<()> + where + B: AtomicBuffer, + { + self.bounds_check(index, len)?; + source.bounds_check(source_index, len)?; + + let index = index as usize; + let source_index = source_index as usize; + let len = len as usize; + + self[index..index + len].copy_from_slice(&source[source_index..source_index + len]); + Ok(()) + } + + /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. + fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { + self.bounds_check(offset, length as IndexT).map(|_| unsafe { + self.as_mut_ptr() + .offset(offset as isize) + .write_bytes(value, length) + }) + } + + /// Perform a volatile read of an `i32` from the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let buffer = vec![0, 12, 0, 0, 0]; + /// assert_eq!(buffer.get_i32_volatile(1), Ok(12)); + /// ``` + fn get_i32_volatile(&self, offset: IndexT) -> Result { + self.overlay_volatile::(offset) + } + + /// Perform a volatile write of an `i32` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut bytes = vec![0u8; 4]; + /// bytes.put_i32_ordered(0, 12); + /// assert_eq!(bytes.get_i32_volatile(0), Ok(12)); + /// ``` + fn put_i32_ordered(&mut self, offset: IndexT, value: i32) -> Result<()> { + self.write_volatile::(offset, value) + } +} + +impl AtomicBuffer for Vec {} + +impl AtomicBuffer for &mut [u8] {} + +impl AtomicBuffer for MmapMut {} diff --git a/aeron-rs/src/client/concurrent/ring_buffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs similarity index 83% rename from aeron-rs/src/client/concurrent/ring_buffer.rs rename to aeron-rs/src/client/concurrent/ringbuffer.rs index 5530a52..4f23130 100644 --- a/aeron-rs/src/client/concurrent/ring_buffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -1,11 +1,12 @@ //! Ring buffer wrapper for communicating with the Media Driver -use crate::client::concurrent::atomic_buffer::AtomicBuffer; +use crate::client::concurrent::AtomicBuffer; use crate::util::bit::align; use crate::util::{bit, AeronError, IndexT, Result}; +use std::ops::Deref; /// Description of the Ring Buffer schema. pub mod buffer_descriptor { - use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::client::concurrent::AtomicBuffer; use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH}; use crate::util::AeronError::IllegalArgument; use crate::util::{IndexT, Result}; @@ -30,7 +31,10 @@ pub mod buffer_descriptor { /// Verify the capacity of a buffer is legal for use as a ring buffer. /// Returns the actual capacity excluding ring buffer metadata. - pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result { + pub fn check_capacity(buffer: &A) -> Result + where + A: AtomicBuffer, + { let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT; if is_power_of_two(capacity) { Ok(capacity) @@ -107,8 +111,11 @@ pub mod record_descriptor { } /// Multi-producer, single-consumer ring buffer implementation. -pub struct ManyToOneRingBuffer<'a> { - buffer: AtomicBuffer<'a>, +pub struct ManyToOneRingBuffer +where + A: AtomicBuffer, +{ + buffer: A, capacity: IndexT, max_msg_length: IndexT, tail_position_index: IndexT, @@ -117,9 +124,12 @@ pub struct ManyToOneRingBuffer<'a> { correlation_id_counter_index: IndexT, } -impl<'a> ManyToOneRingBuffer<'a> { +impl ManyToOneRingBuffer +where + A: AtomicBuffer, +{ /// Create a many-to-one ring buffer from an underlying atomic buffer. - pub fn wrap(buffer: AtomicBuffer<'a>) -> Result { + pub fn new(buffer: A) -> Result { buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { buffer, capacity, @@ -141,13 +151,16 @@ impl<'a> ManyToOneRingBuffer<'a> { } /// Write a message into the ring buffer - pub fn write( + pub fn write( &mut self, msg_type_id: i32, - source: &AtomicBuffer, + source: &B, source_index: IndexT, length: IndexT, - ) -> Result<()> { + ) -> Result<()> + where + B: AtomicBuffer, + { record_descriptor::check_msg_type_id(msg_type_id)?; self.check_msg_length(length)?; @@ -182,10 +195,14 @@ impl<'a> ManyToOneRingBuffer<'a> { /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` pub fn read(&mut self, mut handler: F, message_count_limit: usize) -> Result where - F: FnMut(i32, &AtomicBuffer, IndexT, IndexT) -> (), + F: FnMut(i32, &A, IndexT, IndexT) -> (), { + // QUESTION: Should I implement the `get_i64` method that C++ uses? // UNWRAP: Bounds check performed during buffer creation - let head = self.buffer.get_i64(self.head_position_index).unwrap(); + let head = self + .buffer + .get_i64_volatile(self.head_position_index) + .unwrap(); let head_index = (head & i64::from(self.capacity - 1)) as i32; let contiguous_block_length = self.capacity - head_index; let mut messages_read = 0; @@ -352,22 +369,46 @@ impl<'a> ManyToOneRingBuffer<'a> { } } +impl Deref for ManyToOneRingBuffer +where + A: AtomicBuffer, +{ + type Target = A; + + fn deref(&self) -> &Self::Target { + &self.buffer + } +} + #[cfg(test)] mod tests { - use crate::client::concurrent::atomic_buffer::AtomicBuffer; - use crate::client::concurrent::ring_buffer::{ - buffer_descriptor, record_descriptor, ManyToOneRingBuffer, - }; + use crate::client::concurrent::ringbuffer::{record_descriptor, ManyToOneRingBuffer}; + use crate::client::concurrent::AtomicBuffer; use crate::util::IndexT; use std::mem::size_of; - #[test] - fn claim_capacity_basic() { - let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; - let mut buf = vec![0u8; buf_size]; + const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize; - let atomic_buf = AtomicBuffer::wrap(&mut buf); - let mut ring_buf = ManyToOneRingBuffer::wrap(atomic_buf).unwrap(); + #[test] + fn claim_capacity_owned() { + let mut ring_buf = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).unwrap(); + + ring_buf.claim_capacity(16).unwrap(); + assert_eq!( + ring_buf + .buffer + .get_i64_volatile(ring_buf.tail_position_index), + Ok(16) + ); + + let write_start = ring_buf.claim_capacity(16).unwrap(); + assert_eq!(write_start, 16); + } + + #[test] + fn claim_capacity_shared() { + let buf = &mut [0u8; BUFFER_SIZE][..]; + let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap(); ring_buf.claim_capacity(16).unwrap(); assert_eq!( @@ -383,27 +424,25 @@ mod tests { #[test] fn write_basic() { - let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; - let buffer = AtomicBuffer::wrap(&mut bytes); - let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + let mut ring_buffer = + ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); - let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let source_bytes = &mut [12u8, 0, 0, 0][..]; let source_len = source_bytes.len() as IndexT; - let source_buffer = AtomicBuffer::wrap(&mut source_bytes); let type_id = 1; ring_buffer - .write(type_id, &source_buffer, 0, source_len) + .write(type_id, &source_bytes, 0, source_len) .unwrap(); - drop(ring_buffer); - let buffer = AtomicBuffer::wrap(&mut bytes); let record_len = source_len + record_descriptor::HEADER_LENGTH; assert_eq!( - buffer.get_i64_volatile(0).unwrap(), + ring_buffer.get_i64_volatile(0).unwrap(), record_descriptor::make_header(record_len, type_id) ); assert_eq!( - buffer.get_i64_volatile(size_of::() as IndexT).unwrap(), + ring_buffer + .get_i64_volatile(size_of::() as IndexT) + .unwrap(), 12 ); } @@ -411,41 +450,33 @@ mod tests { #[test] fn read_basic() { // Similar to write basic, put something into the buffer - let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; - let buffer = AtomicBuffer::wrap(&mut bytes); - let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + let mut ring_buffer = + ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); - let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; - let source_len = source_bytes.len() as IndexT; - let source_buffer = AtomicBuffer::wrap(&mut source_bytes); + let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; + let source_len = source_buffer.len() as IndexT; let type_id = 1; ring_buffer .write(type_id, &source_buffer, 0, source_len) .unwrap(); // Now we can start the actual read process - let c = |_, buf: &AtomicBuffer, offset, _| { - assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12) - }; + let c = |_, buf: &Vec, offset, _| assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12); ring_buffer.read(c, 1).unwrap(); // Make sure that the buffer was zeroed on finish - drop(ring_buffer); - let buffer = AtomicBuffer::wrap(&mut bytes); for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) { - assert_eq!(buffer.get_i32(i).unwrap(), 0); + assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0); } } #[test] fn read_multiple() { - let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; - let buffer = AtomicBuffer::wrap(&mut bytes); - let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + let mut ring_buffer = + ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); - let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; - let source_len = source_bytes.len() as IndexT; - let source_buffer = AtomicBuffer::wrap(&mut source_bytes); + let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; + let source_len = source_buffer.len() as IndexT; let type_id = 1; ring_buffer .write(type_id, &source_buffer, 0, source_len) @@ -455,7 +486,7 @@ mod tests { .unwrap(); let mut msg_count = 0; - let c = |_, buf: &AtomicBuffer, offset, _| { + let c = |_, buf: &Vec, offset, _| { msg_count += 1; assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12); }; @@ -463,10 +494,8 @@ mod tests { assert_eq!(msg_count, 2); // Make sure that the buffer was zeroed on finish - drop(ring_buffer); - let buffer = AtomicBuffer::wrap(&mut bytes); for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) { - assert_eq!(buffer.get_i32(i).unwrap(), 0); + assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0); } } } diff --git a/aeron-rs/src/client/driver_proxy.rs b/aeron-rs/src/client/driver_proxy.rs deleted file mode 100644 index b1a84e4..0000000 --- a/aeron-rs/src/client/driver_proxy.rs +++ /dev/null @@ -1,12 +0,0 @@ -//! Proxy object for interacting with the Media Driver. Handles operations -//! involving the command-and-control file protocol. - -use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer; - -/// Proxy object for operations involving the Media Driver -pub struct DriverProxy<'a> { - _to_driver: ManyToOneRingBuffer<'a>, - _client_id: i64, -} - -impl<'a> DriverProxy<'a> {} diff --git a/aeron-rs/src/client/mod.rs b/aeron-rs/src/client/mod.rs index b00370e..32093de 100644 --- a/aeron-rs/src/client/mod.rs +++ b/aeron-rs/src/client/mod.rs @@ -4,4 +4,3 @@ pub mod cnc_descriptor; pub mod concurrent; pub mod context; -pub mod driver_proxy; diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 73728b5..2e67bd5 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -1,7 +1,8 @@ use aeron_driver_sys::*; use aeron_rs::client::cnc_descriptor; -use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; -use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer; +use aeron_rs::client::cnc_descriptor::MetaDataDefinition; +use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer; +use aeron_rs::client::concurrent::AtomicBuffer; use aeron_rs::util::IndexT; use memmap::MmapOptions; use std::ffi::{c_void, CString}; @@ -108,30 +109,29 @@ fn cnc_terminate() { let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH; // Read metadata to get buffer length - let buffer_len = { - let atomic_buffer = AtomicBuffer::wrap(&mut mmap); - let metadata = atomic_buffer - .overlay::(0) - .unwrap(); - metadata.to_driver_buffer_length - }; + let buffer_len = mmap + .overlay::(0) + .unwrap() + .to_driver_buffer_length; let buffer_end = cnc_metadata_len + buffer_len as usize; - let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); - let mut ring_buffer = - ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer"); + let mut ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end]) + .expect("Improperly sized buffer"); // 20 bytes: Client ID (8), correlation ID (8), token length (4) let mut terminate_bytes = vec![0u8; 20]; - let terminate_len = terminate_bytes.len(); - let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes); let client_id = ring_buffer.next_correlation_id(); - source_buffer.put_i64_ordered(0, client_id).unwrap(); - source_buffer.put_i64_ordered(8, -1).unwrap(); + terminate_bytes.put_i64_ordered(0, client_id).unwrap(); + terminate_bytes.put_i64_ordered(8, -1).unwrap(); let term_id: i32 = 0x0E; ring_buffer - .write(term_id, &source_buffer, 0, terminate_len as IndexT) + .write( + term_id, + &terminate_bytes, + 0, + terminate_bytes.len() as IndexT, + ) .unwrap(); // Wait for the driver to finish