From fd23f2891ad3e69a6d7c63cd167e245c4d037e80 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 14:59:52 -0400 Subject: [PATCH] AtomicBuffer trait part two I have a much better grasp of what I need to do now. --- aeron-rs/src/client/concurrent/mod.rs | 280 ++++--------------- aeron-rs/src/client/concurrent/ringbuffer.rs | 44 ++- aeron-rs/src/client/driver_proxy.rs | 12 - aeron-rs/src/client/mod.rs | 1 - aeron-rs/tests/cnc_terminate.rs | 2 + 5 files changed, 91 insertions(+), 248 deletions(-) delete mode 100644 aeron-rs/src/client/driver_proxy.rs diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index 67a7e90..3928cd2 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -3,33 +3,30 @@ pub mod ringbuffer; use std::mem::size_of; -use std::ops::Deref; use std::sync::atomic::{AtomicI64, Ordering}; use crate::util::{AeronError, IndexT, Result}; use std::ptr::{read_volatile, write_volatile}; -/// Wrapper for atomic operations around an underlying byte buffer -pub struct AtomicBuffer<'a> { - buffer: &'a mut [u8], -} +use std::ops::{DerefMut, Deref}; -impl<'a> Deref for AtomicBuffer<'a> { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - self.buffer - } -} - -impl<'a> AtomicBuffer<'a> { - /// Create an `AtomicBuffer` as a view on an underlying byte slice - pub fn wrap(buffer: &'a mut [u8]) -> Self { - AtomicBuffer { buffer } - } +/// Atomic operations on slices of memory +pub trait AtomicBuffer: Deref + DerefMut { + /// Check that there are at least `size` bytes of memory available + /// beginning at some offset. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// + /// let buffer = &mut [0u8; 8][..]; + /// assert!(buffer.bounds_check(0, 8).is_ok()); + /// assert!(buffer.bounds_check(1, 7).is_ok()); + /// assert!(buffer.bounds_check(1, 8).is_err()); + /// assert!(buffer.bounds_check(-1, 8).is_err()); + /// ``` fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { - if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size { + if offset < 0 || size < 0 || self.deref().len() as IndexT - offset < size { Err(AeronError::OutOfBounds) } else { Ok(()) @@ -39,236 +36,79 @@ impl<'a> AtomicBuffer<'a> { /// Overlay a struct on a buffer. /// /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. - pub fn overlay(&self, offset: IndexT) -> Result<&T> + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use std::sync::atomic::{AtomicI64, Ordering}; + /// let buffer = &mut [0u8; 9][..]; + /// + /// let my_val: &AtomicI64 = buffer.overlay::(0).unwrap(); + /// assert_eq!(my_val.load(Ordering::SeqCst), 0); + /// + /// my_val.store(1, Ordering::SeqCst); + /// assert_eq!(buffer, [1, 0, 0, 0, 0, 0, 0, 0, 0]); + /// + /// let another_val: &AtomicI64 = buffer.overlay::(1).unwrap(); + /// assert_eq!(another_val.load(Ordering::SeqCst), 0); + /// ``` + fn overlay(&self, offset: IndexT) -> Result<&T> where - T: Sized, + T: Sized { self.bounds_check(offset, size_of::() as IndexT) .map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + let offset_ptr = unsafe { self.as_ptr().offset(offset as isize) }; unsafe { &*(offset_ptr as *const T) } }) } + /// Overlay a struct on a buffer, and perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let buffer = &mut [5, 0, 0, 0][..]; + /// + /// let my_val: u32 = buffer.overlay_volatile::(0).unwrap(); + /// assert_eq!(my_val, 5); + /// ``` fn overlay_volatile(&self, offset: IndexT) -> Result where T: Copy, { self.bounds_check(offset, size_of::() as IndexT) .map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + let offset_ptr = unsafe { self.as_ptr().offset(offset as isize) }; unsafe { read_volatile(offset_ptr as *const T) } }) } + /// Perform a volatile write of a value over a buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buffer = &mut [0, 0, 0, 0][..]; + /// + /// let value: u32 = 24; + /// buffer.write_volatile(0, value).unwrap(); + /// assert_eq!(buffer, [24, 0, 0, 0]); + /// ``` fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> where T: Copy, { self.bounds_check(offset, size_of::() as IndexT) .map(|_| { - let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; + let offset_ptr = unsafe { self.as_mut_ptr().offset(offset as isize) }; unsafe { write_volatile(offset_ptr as *mut T, val) }; }) } - /// Atomically fetch the current value at an offset, and increment by delta - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// # use aeron_rs::util::AeronError; - /// let mut bytes = [0u8; 9]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// // Simple case modifies only the first byte - /// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0)); - /// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1)); - /// - /// // Using an offset modifies the second byte - /// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0)); - /// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1)); - /// - /// // An offset of 2 means buffer size must be 10 to contain an `i64` - /// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds)); - /// ``` - pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { - self.overlay::(offset) - .map(|a| a.fetch_add(delta, Ordering::SeqCst)) - } - - /// Perform a volatile read - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0]; - /// let buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); - /// ``` - pub fn get_i64_volatile(&self, offset: IndexT) -> Result { - // QUESTION: Would it be better to express this in terms of an atomic read? - self.overlay_volatile::(offset) - } - - /// Get the current value at an offset without using any synchronization operations - pub fn get_i64(&self, offset: IndexT) -> Result { - self.overlay::(offset).map(|i| *i) - } - - /// Perform a volatile read - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut bytes = [12, 0, 0, 0]; - /// let buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); - /// ``` - pub fn get_i32_volatile(&self, offset: IndexT) -> Result { - self.overlay_volatile::(offset) - } - - /// Get the current value at an offset without using any synchronization operations - pub fn get_i32(&self, offset: IndexT) -> Result { - self.overlay::(offset).map(|i| *i) - } - - /// Perform a volatile write of an `i64` into the buffer - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut bytes = [0u8; 8]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// buffer.put_i64_ordered(0, 12); - /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); - /// ``` - pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> { - // QUESTION: Would it be better to have callers use `write_volatile` directly - self.write_volatile::(offset, val) - } - - /// Perform a volatile write of an `i32` into the buffer - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut bytes = [0u8; 4]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// buffer.put_i32_ordered(0, 12); - /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); - /// ``` - pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> { - // QUESTION: Would it be better to have callers use `write_volatile` directly - self.write_volatile::(offset, val) - } - - /// Write the contents of one buffer to another. Does not perform any synchronization. - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut source_bytes = [1u8, 2, 3, 4]; - /// let source = AtomicBuffer::wrap(&mut source_bytes); - /// - /// let mut dest_bytes = [0, 0, 0, 0]; - /// let mut dest = AtomicBuffer::wrap(&mut dest_bytes); - /// - /// dest.put_bytes(1, &source, 1, 3); - /// drop(dest); - /// assert_eq!(dest_bytes, [0u8, 2, 3, 4]); - /// ``` - pub fn put_bytes( - &mut self, - index: IndexT, - source: &AtomicBuffer, - source_index: IndexT, - len: IndexT, - ) -> Result<()> { - self.bounds_check(index, len)?; - source.bounds_check(source_index, len)?; - - let index = index as usize; - let source_index = source_index as usize; - let len = len as usize; - self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]); - Ok(()) - } - - /// Compare an expected value with what is in memory, and if it matches, - /// update to a new value. Returns `Ok(true)` if the update was successful, - /// and `Ok(false)` if the update failed. - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut buf = [0u8; 8]; - /// let atomic_buf = AtomicBuffer::wrap(&mut buf); - /// // Set value to 1 - /// atomic_buf.get_and_add_i64(0, 1).unwrap(); - /// - /// // Set value to 1 if existing value is 0 - /// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); - /// // Set value to 2 if existing value is 1 - /// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); - /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); - /// ``` - pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { - // QUESTION: Should I use a volatile read here as well? - // Aeron C++ uses a volatile read before the atomic operation, but I think that - // may be redundant. In addition, Rust's `read_volatile` operation returns a - // *copied* value; running `compare_exchange` on that copy introduces a race condition - // because we're no longer comparing a consistent address. - self.overlay::(offset).map(|a| { - a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() - }) - } - - /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. - pub fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { - self.bounds_check(offset, length as IndexT).map(|_| unsafe { - self.buffer - .as_mut_ptr() - .offset(offset as isize) - .write_bytes(value, length) - }) + /// Perform an atomic fetch and add of a 64-bit value + fn get_and_add_i64(&self, offset: IndexT, value: i64) -> Result { + self.overlay::(offset).map(|a| a.fetch_add(value, Ordering::SeqCst)) } } -#[cfg(test)] -mod tests { - use memmap::MmapOptions; - use std::sync::atomic::{AtomicU64, Ordering}; +impl AtomicBuffer for Vec {} - use crate::client::concurrent::AtomicBuffer; - use crate::util::AeronError; - - #[test] - fn mmap_to_atomic() { - let mut mmap = MmapOptions::new() - .len(24) - .map_anon() - .expect("Unable to map anonymous memory"); - AtomicBuffer::wrap(&mut mmap); - } - - #[test] - fn primitive_atomic_equivalent() { - let value: u64 = 24; - - let val_ptr = &value as *const u64; - let a_ptr = val_ptr as *const AtomicU64; - let a: &AtomicU64 = unsafe { &*a_ptr }; - - assert_eq!(value, (*a).load(Ordering::SeqCst)); - } - - #[test] - fn negative_offset() { - let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; - let atomic_buf = AtomicBuffer::wrap(&mut buf); - assert_eq!( - atomic_buf.get_and_add_i64(-1, 0), - Err(AeronError::OutOfBounds) - ) - } -} +impl AtomicBuffer for &mut [u8] {} diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index addf9a8..b837957 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -30,7 +30,10 @@ pub mod buffer_descriptor { /// Verify the capacity of a buffer is legal for use as a ring buffer. /// Returns the actual capacity excluding ring buffer metadata. - pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result { + pub fn check_capacity(buffer: &A) -> Result + where + A: AtomicBuffer + { let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT; if is_power_of_two(capacity) { Ok(capacity) @@ -107,8 +110,11 @@ pub mod record_descriptor { } /// Multi-producer, single-consumer ring buffer implementation. -pub struct ManyToOneRingBuffer<'a> { - buffer: AtomicBuffer<'a>, +pub struct ManyToOneRingBuffer +where + A: AtomicBuffer +{ + buffer: A, capacity: IndexT, max_msg_length: IndexT, tail_position_index: IndexT, @@ -117,9 +123,12 @@ pub struct ManyToOneRingBuffer<'a> { correlation_id_counter_index: IndexT, } -impl<'a> ManyToOneRingBuffer<'a> { +impl ManyToOneRingBuffer +where + A: AtomicBuffer +{ /// Create a many-to-one ring buffer from an underlying atomic buffer. - pub fn wrap(buffer: AtomicBuffer<'a>) -> Result { + pub fn new(buffer: A) -> Result { buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { buffer, capacity, @@ -140,14 +149,18 @@ impl<'a> ManyToOneRingBuffer<'a> { .unwrap() } + /* /// Write a message into the ring buffer - pub fn write( + pub fn write( &mut self, msg_type_id: i32, - source: &AtomicBuffer, + source: &B, source_index: IndexT, length: IndexT, - ) -> Result<()> { + ) -> Result<()> + where + B: AtomicBuffer + { record_descriptor::check_msg_type_id(msg_type_id)?; self.check_msg_length(length)?; @@ -182,7 +195,7 @@ impl<'a> ManyToOneRingBuffer<'a> { /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` pub fn read(&mut self, mut handler: F, message_count_limit: usize) -> Result where - F: FnMut(i32, &AtomicBuffer, IndexT, IndexT) -> (), + F: FnMut(i32, &A, IndexT, IndexT) -> (), { // UNWRAP: Bounds check performed during buffer creation let head = self.buffer.get_i64(self.head_position_index).unwrap(); @@ -350,8 +363,10 @@ impl<'a> ManyToOneRingBuffer<'a> { Ok(()) } } + */ } +/* #[cfg(test)] mod tests { use crate::client::concurrent::AtomicBuffer; @@ -415,16 +430,15 @@ mod tests { let buffer = AtomicBuffer::wrap(&mut bytes); let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); - let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; let source_len = source_bytes.len() as IndexT; - let source_buffer = AtomicBuffer::wrap(&mut source_bytes); let type_id = 1; ring_buffer .write(type_id, &source_buffer, 0, source_len) .unwrap(); // Now we can start the actual read process - let c = |_, buf: &AtomicBuffer, offset, _| { + let c = |_, buf: &dyn AtomicBuffer, offset, _| { assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12) }; ring_buffer.read(c, 1).unwrap(); @@ -443,9 +457,8 @@ mod tests { let buffer = AtomicBuffer::wrap(&mut bytes); let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); - let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; let source_len = source_bytes.len() as IndexT; - let source_buffer = AtomicBuffer::wrap(&mut source_bytes); let type_id = 1; ring_buffer .write(type_id, &source_buffer, 0, source_len) @@ -455,7 +468,7 @@ mod tests { .unwrap(); let mut msg_count = 0; - let c = |_, buf: &AtomicBuffer, offset, _| { + let c = |_, buf: &dyn AtomicBuffer, offset, _| { msg_count += 1; assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12); }; @@ -470,3 +483,4 @@ mod tests { } } } +*/ diff --git a/aeron-rs/src/client/driver_proxy.rs b/aeron-rs/src/client/driver_proxy.rs deleted file mode 100644 index 71b9ee2..0000000 --- a/aeron-rs/src/client/driver_proxy.rs +++ /dev/null @@ -1,12 +0,0 @@ -//! Proxy object for interacting with the Media Driver. Handles operations -//! involving the command-and-control file protocol. - -use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer; - -/// Proxy object for operations involving the Media Driver -pub struct DriverProxy<'a> { - _to_driver: ManyToOneRingBuffer<'a>, - _client_id: i64, -} - -impl<'a> DriverProxy<'a> {} diff --git a/aeron-rs/src/client/mod.rs b/aeron-rs/src/client/mod.rs index b00370e..32093de 100644 --- a/aeron-rs/src/client/mod.rs +++ b/aeron-rs/src/client/mod.rs @@ -4,4 +4,3 @@ pub mod cnc_descriptor; pub mod concurrent; pub mod context; -pub mod driver_proxy; diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 6ef9da9..7c3d32b 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -77,6 +77,7 @@ fn driver_thread(aeron_dir: PathBuf) { unsafe { aeron_driver_context_close(context) }; } +/* #[test] fn cnc_terminate() { let temp_dir = tempdir().unwrap(); @@ -141,3 +142,4 @@ fn cnc_terminate() { .expect("Driver thread panicked during execution"); assert_eq!(RUNNING.load(Ordering::SeqCst), false); } +*/