diff --git a/aeron-rs/src/client/concurrent/atomic_buffer.rs b/aeron-rs/src/client/concurrent/atomic_buffer.rs deleted file mode 100644 index 0fbed58..0000000 --- a/aeron-rs/src/client/concurrent/atomic_buffer.rs +++ /dev/null @@ -1,272 +0,0 @@ -//! Buffer that is safe to use in a multi-process/multi-thread context. Typically used for -//! handling atomic updates of memory-mapped buffers. -use std::mem::size_of; -use std::ops::Deref; -use std::sync::atomic::{AtomicI64, Ordering}; - -use crate::util::{AeronError, IndexT, Result}; -use std::ptr::{read_volatile, write_volatile}; - -/// Wrapper for atomic operations around an underlying byte buffer -pub struct AtomicBuffer<'a> { - buffer: &'a mut [u8], -} - -impl<'a> Deref for AtomicBuffer<'a> { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - self.buffer - } -} - -impl<'a> AtomicBuffer<'a> { - /// Create an `AtomicBuffer` as a view on an underlying byte slice - pub fn wrap(buffer: &'a mut [u8]) -> Self { - AtomicBuffer { buffer } - } - - fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { - if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size { - Err(AeronError::OutOfBounds) - } else { - Ok(()) - } - } - - /// Overlay a struct on a buffer. - /// - /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. - pub fn overlay(&self, offset: IndexT) -> Result<&T> - where - T: Sized, - { - self.bounds_check(offset, size_of::() as IndexT) - .map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; - unsafe { &*(offset_ptr as *const T) } - }) - } - - fn overlay_volatile(&self, offset: IndexT) -> Result - where - T: Copy, - { - self.bounds_check(offset, size_of::() as IndexT) - .map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; - unsafe { read_volatile(offset_ptr as *const T) } - }) - } - - fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> - where - T: Copy, - { - self.bounds_check(offset, size_of::() as IndexT) - .map(|_| { - let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; - unsafe { write_volatile(offset_ptr as *mut T, val) }; - }) - } - - /// Atomically fetch the current value at an offset, and increment by delta - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// # use aeron_rs::util::AeronError; - /// let mut bytes = [0u8; 9]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// // Simple case modifies only the first byte - /// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0)); - /// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1)); - /// - /// // Using an offset modifies the second byte - /// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0)); - /// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1)); - /// - /// // An offset of 2 means buffer size must be 10 to contain an `i64` - /// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds)); - /// ``` - pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { - self.overlay::(offset) - .map(|a| a.fetch_add(delta, Ordering::SeqCst)) - } - - /// Perform a volatile read - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0]; - /// let buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); - /// ``` - pub fn get_i64_volatile(&self, offset: IndexT) -> Result { - // QUESTION: Would it be better to express this in terms of an atomic read? - self.overlay_volatile::(offset) - } - - /// Get the current value at an offset without using any synchronization operations - pub fn get_i64(&self, offset: IndexT) -> Result { - self.overlay::(offset).map(|i| *i) - } - - /// Perform a volatile read - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [12, 0, 0, 0]; - /// let buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); - /// ``` - pub fn get_i32_volatile(&self, offset: IndexT) -> Result { - self.overlay_volatile::(offset) - } - - /// Get the current value at an offset without using any synchronization operations - pub fn get_i32(&self, offset: IndexT) -> Result { - self.overlay::(offset).map(|i| *i) - } - - /// Perform a volatile write of an `i64` into the buffer - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [0u8; 8]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// buffer.put_i64_ordered(0, 12); - /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); - /// ``` - pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> { - // QUESTION: Would it be better to have callers use `write_volatile` directly - self.write_volatile::(offset, val) - } - - /// Perform a volatile write of an `i32` into the buffer - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [0u8; 4]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// buffer.put_i32_ordered(0, 12); - /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); - /// ``` - pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> { - // QUESTION: Would it be better to have callers use `write_volatile` directly - self.write_volatile::(offset, val) - } - - /// Write the contents of one buffer to another. Does not perform any synchronization. - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut source_bytes = [1u8, 2, 3, 4]; - /// let source = AtomicBuffer::wrap(&mut source_bytes); - /// - /// let mut dest_bytes = [0, 0, 0, 0]; - /// let mut dest = AtomicBuffer::wrap(&mut dest_bytes); - /// - /// dest.put_bytes(1, &source, 1, 3); - /// drop(dest); - /// assert_eq!(dest_bytes, [0u8, 2, 3, 4]); - /// ``` - pub fn put_bytes( - &mut self, - index: IndexT, - source: &AtomicBuffer, - source_index: IndexT, - len: IndexT, - ) -> Result<()> { - self.bounds_check(index, len)?; - source.bounds_check(source_index, len)?; - - let index = index as usize; - let source_index = source_index as usize; - let len = len as usize; - self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]); - Ok(()) - } - - /// Compare an expected value with what is in memory, and if it matches, - /// update to a new value. Returns `Ok(true)` if the update was successful, - /// and `Ok(false)` if the update failed. - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut buf = [0u8; 8]; - /// let atomic_buf = AtomicBuffer::wrap(&mut buf); - /// // Set value to 1 - /// atomic_buf.get_and_add_i64(0, 1).unwrap(); - /// - /// // Set value to 1 if existing value is 0 - /// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); - /// // Set value to 2 if existing value is 1 - /// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); - /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); - /// ``` - pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { - // QUESTION: Should I use a volatile read here as well? - // Aeron C++ uses a volatile read before the atomic operation, but I think that - // may be redundant. In addition, Rust's `read_volatile` operation returns a - // *copied* value; running `compare_exchange` on that copy introduces a race condition - // because we're no longer comparing a consistent address. - self.overlay::(offset).map(|a| { - a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() - }) - } - - /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. - pub fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { - self.bounds_check(offset, length as IndexT).map(|_| unsafe { - self.buffer - .as_mut_ptr() - .offset(offset as isize) - .write_bytes(value, length) - }) - } -} - -#[cfg(test)] -mod tests { - use memmap::MmapOptions; - use std::sync::atomic::{AtomicU64, Ordering}; - - use crate::client::concurrent::atomic_buffer::AtomicBuffer; - use crate::util::AeronError; - - #[test] - fn mmap_to_atomic() { - let mut mmap = MmapOptions::new() - .len(24) - .map_anon() - .expect("Unable to map anonymous memory"); - AtomicBuffer::wrap(&mut mmap); - } - - #[test] - fn primitive_atomic_equivalent() { - let value: u64 = 24; - - let val_ptr = &value as *const u64; - let a_ptr = val_ptr as *const AtomicU64; - let a: &AtomicU64 = unsafe { &*a_ptr }; - - assert_eq!(value, (*a).load(Ordering::SeqCst)); - } - - #[test] - fn negative_offset() { - let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; - let atomic_buf = AtomicBuffer::wrap(&mut buf); - assert_eq!( - atomic_buf.get_and_add_i64(-1, 0), - Err(AeronError::OutOfBounds) - ) - } -} diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index 06eec65..67a7e90 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -1,5 +1,274 @@ //! Module for handling safe interactions among the multiple clients making use //! of a single Media Driver -pub mod atomic_buffer; -pub mod ring_buffer; +pub mod ringbuffer; +use std::mem::size_of; +use std::ops::Deref; +use std::sync::atomic::{AtomicI64, Ordering}; + +use crate::util::{AeronError, IndexT, Result}; +use std::ptr::{read_volatile, write_volatile}; + +/// Wrapper for atomic operations around an underlying byte buffer +pub struct AtomicBuffer<'a> { + buffer: &'a mut [u8], +} + +impl<'a> Deref for AtomicBuffer<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.buffer + } +} + +impl<'a> AtomicBuffer<'a> { + /// Create an `AtomicBuffer` as a view on an underlying byte slice + pub fn wrap(buffer: &'a mut [u8]) -> Self { + AtomicBuffer { buffer } + } + + fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { + if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size { + Err(AeronError::OutOfBounds) + } else { + Ok(()) + } + } + + /// Overlay a struct on a buffer. + /// + /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. + pub fn overlay(&self, offset: IndexT) -> Result<&T> + where + T: Sized, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + unsafe { &*(offset_ptr as *const T) } + }) + } + + fn overlay_volatile(&self, offset: IndexT) -> Result + where + T: Copy, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + unsafe { read_volatile(offset_ptr as *const T) } + }) + } + + fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> + where + T: Copy, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; + unsafe { write_volatile(offset_ptr as *mut T, val) }; + }) + } + + /// Atomically fetch the current value at an offset, and increment by delta + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::util::AeronError; + /// let mut bytes = [0u8; 9]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// // Simple case modifies only the first byte + /// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0)); + /// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1)); + /// + /// // Using an offset modifies the second byte + /// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0)); + /// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1)); + /// + /// // An offset of 2 means buffer size must be 10 to contain an `i64` + /// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds)); + /// ``` + pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { + self.overlay::(offset) + .map(|a| a.fetch_add(delta, Ordering::SeqCst)) + } + + /// Perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + /// let buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` + pub fn get_i64_volatile(&self, offset: IndexT) -> Result { + // QUESTION: Would it be better to express this in terms of an atomic read? + self.overlay_volatile::(offset) + } + + /// Get the current value at an offset without using any synchronization operations + pub fn get_i64(&self, offset: IndexT) -> Result { + self.overlay::(offset).map(|i| *i) + } + + /// Perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut bytes = [12, 0, 0, 0]; + /// let buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); + /// ``` + pub fn get_i32_volatile(&self, offset: IndexT) -> Result { + self.overlay_volatile::(offset) + } + + /// Get the current value at an offset without using any synchronization operations + pub fn get_i32(&self, offset: IndexT) -> Result { + self.overlay::(offset).map(|i| *i) + } + + /// Perform a volatile write of an `i64` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut bytes = [0u8; 8]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// buffer.put_i64_ordered(0, 12); + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` + pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> { + // QUESTION: Would it be better to have callers use `write_volatile` directly + self.write_volatile::(offset, val) + } + + /// Perform a volatile write of an `i32` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut bytes = [0u8; 4]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// buffer.put_i32_ordered(0, 12); + /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); + /// ``` + pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> { + // QUESTION: Would it be better to have callers use `write_volatile` directly + self.write_volatile::(offset, val) + } + + /// Write the contents of one buffer to another. Does not perform any synchronization. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut source_bytes = [1u8, 2, 3, 4]; + /// let source = AtomicBuffer::wrap(&mut source_bytes); + /// + /// let mut dest_bytes = [0, 0, 0, 0]; + /// let mut dest = AtomicBuffer::wrap(&mut dest_bytes); + /// + /// dest.put_bytes(1, &source, 1, 3); + /// drop(dest); + /// assert_eq!(dest_bytes, [0u8, 2, 3, 4]); + /// ``` + pub fn put_bytes( + &mut self, + index: IndexT, + source: &AtomicBuffer, + source_index: IndexT, + len: IndexT, + ) -> Result<()> { + self.bounds_check(index, len)?; + source.bounds_check(source_index, len)?; + + let index = index as usize; + let source_index = source_index as usize; + let len = len as usize; + self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]); + Ok(()) + } + + /// Compare an expected value with what is in memory, and if it matches, + /// update to a new value. Returns `Ok(true)` if the update was successful, + /// and `Ok(false)` if the update failed. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buf = [0u8; 8]; + /// let atomic_buf = AtomicBuffer::wrap(&mut buf); + /// // Set value to 1 + /// atomic_buf.get_and_add_i64(0, 1).unwrap(); + /// + /// // Set value to 1 if existing value is 0 + /// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); + /// // Set value to 2 if existing value is 1 + /// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); + /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); + /// ``` + pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { + // QUESTION: Should I use a volatile read here as well? + // Aeron C++ uses a volatile read before the atomic operation, but I think that + // may be redundant. In addition, Rust's `read_volatile` operation returns a + // *copied* value; running `compare_exchange` on that copy introduces a race condition + // because we're no longer comparing a consistent address. + self.overlay::(offset).map(|a| { + a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + }) + } + + /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. + pub fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { + self.bounds_check(offset, length as IndexT).map(|_| unsafe { + self.buffer + .as_mut_ptr() + .offset(offset as isize) + .write_bytes(value, length) + }) + } +} + +#[cfg(test)] +mod tests { + use memmap::MmapOptions; + use std::sync::atomic::{AtomicU64, Ordering}; + + use crate::client::concurrent::AtomicBuffer; + use crate::util::AeronError; + + #[test] + fn mmap_to_atomic() { + let mut mmap = MmapOptions::new() + .len(24) + .map_anon() + .expect("Unable to map anonymous memory"); + AtomicBuffer::wrap(&mut mmap); + } + + #[test] + fn primitive_atomic_equivalent() { + let value: u64 = 24; + + let val_ptr = &value as *const u64; + let a_ptr = val_ptr as *const AtomicU64; + let a: &AtomicU64 = unsafe { &*a_ptr }; + + assert_eq!(value, (*a).load(Ordering::SeqCst)); + } + + #[test] + fn negative_offset() { + let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; + let atomic_buf = AtomicBuffer::wrap(&mut buf); + assert_eq!( + atomic_buf.get_and_add_i64(-1, 0), + Err(AeronError::OutOfBounds) + ) + } +} diff --git a/aeron-rs/src/client/concurrent/ring_buffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs similarity index 98% rename from aeron-rs/src/client/concurrent/ring_buffer.rs rename to aeron-rs/src/client/concurrent/ringbuffer.rs index 5530a52..addf9a8 100644 --- a/aeron-rs/src/client/concurrent/ring_buffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -1,11 +1,11 @@ //! Ring buffer wrapper for communicating with the Media Driver -use crate::client::concurrent::atomic_buffer::AtomicBuffer; +use crate::client::concurrent::AtomicBuffer; use crate::util::bit::align; use crate::util::{bit, AeronError, IndexT, Result}; /// Description of the Ring Buffer schema. pub mod buffer_descriptor { - use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::client::concurrent::AtomicBuffer; use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH}; use crate::util::AeronError::IllegalArgument; use crate::util::{IndexT, Result}; @@ -354,8 +354,8 @@ impl<'a> ManyToOneRingBuffer<'a> { #[cfg(test)] mod tests { - use crate::client::concurrent::atomic_buffer::AtomicBuffer; - use crate::client::concurrent::ring_buffer::{ + use crate::client::concurrent::AtomicBuffer; + use crate::client::concurrent::ringbuffer::{ buffer_descriptor, record_descriptor, ManyToOneRingBuffer, }; use crate::util::IndexT; diff --git a/aeron-rs/src/client/driver_proxy.rs b/aeron-rs/src/client/driver_proxy.rs index b1a84e4..71b9ee2 100644 --- a/aeron-rs/src/client/driver_proxy.rs +++ b/aeron-rs/src/client/driver_proxy.rs @@ -1,7 +1,7 @@ //! Proxy object for interacting with the Media Driver. Handles operations //! involving the command-and-control file protocol. -use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer; +use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer; /// Proxy object for operations involving the Media Driver pub struct DriverProxy<'a> { diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 73728b5..6ef9da9 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -1,7 +1,7 @@ use aeron_driver_sys::*; use aeron_rs::client::cnc_descriptor; -use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; -use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer; +use aeron_rs::client::concurrent::AtomicBuffer; +use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer; use aeron_rs::util::IndexT; use memmap::MmapOptions; use std::ffi::{c_void, CString};