From f7ec021bc8e0fb808b24c48c4227cbea0e8af13a Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 13:27:27 -0400 Subject: [PATCH 1/7] Refactor to match Aeron's layout --- .../src/client/concurrent/atomic_buffer.rs | 272 ----------------- aeron-rs/src/client/concurrent/mod.rs | 273 +++++++++++++++++- .../{ring_buffer.rs => ringbuffer.rs} | 8 +- aeron-rs/src/client/driver_proxy.rs | 2 +- aeron-rs/tests/cnc_terminate.rs | 4 +- 5 files changed, 278 insertions(+), 281 deletions(-) delete mode 100644 aeron-rs/src/client/concurrent/atomic_buffer.rs rename aeron-rs/src/client/concurrent/{ring_buffer.rs => ringbuffer.rs} (98%) diff --git a/aeron-rs/src/client/concurrent/atomic_buffer.rs b/aeron-rs/src/client/concurrent/atomic_buffer.rs deleted file mode 100644 index 0fbed58..0000000 --- a/aeron-rs/src/client/concurrent/atomic_buffer.rs +++ /dev/null @@ -1,272 +0,0 @@ -//! Buffer that is safe to use in a multi-process/multi-thread context. Typically used for -//! handling atomic updates of memory-mapped buffers. -use std::mem::size_of; -use std::ops::Deref; -use std::sync::atomic::{AtomicI64, Ordering}; - -use crate::util::{AeronError, IndexT, Result}; -use std::ptr::{read_volatile, write_volatile}; - -/// Wrapper for atomic operations around an underlying byte buffer -pub struct AtomicBuffer<'a> { - buffer: &'a mut [u8], -} - -impl<'a> Deref for AtomicBuffer<'a> { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - self.buffer - } -} - -impl<'a> AtomicBuffer<'a> { - /// Create an `AtomicBuffer` as a view on an underlying byte slice - pub fn wrap(buffer: &'a mut [u8]) -> Self { - AtomicBuffer { buffer } - } - - fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { - if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size { - Err(AeronError::OutOfBounds) - } else { - Ok(()) - } - } - - /// Overlay a struct on a buffer. - /// - /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. - pub fn overlay(&self, offset: IndexT) -> Result<&T> - where - T: Sized, - { - self.bounds_check(offset, size_of::() as IndexT) - .map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; - unsafe { &*(offset_ptr as *const T) } - }) - } - - fn overlay_volatile(&self, offset: IndexT) -> Result - where - T: Copy, - { - self.bounds_check(offset, size_of::() as IndexT) - .map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; - unsafe { read_volatile(offset_ptr as *const T) } - }) - } - - fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> - where - T: Copy, - { - self.bounds_check(offset, size_of::() as IndexT) - .map(|_| { - let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; - unsafe { write_volatile(offset_ptr as *mut T, val) }; - }) - } - - /// Atomically fetch the current value at an offset, and increment by delta - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// # use aeron_rs::util::AeronError; - /// let mut bytes = [0u8; 9]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// // Simple case modifies only the first byte - /// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0)); - /// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1)); - /// - /// // Using an offset modifies the second byte - /// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0)); - /// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1)); - /// - /// // An offset of 2 means buffer size must be 10 to contain an `i64` - /// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds)); - /// ``` - pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { - self.overlay::(offset) - .map(|a| a.fetch_add(delta, Ordering::SeqCst)) - } - - /// Perform a volatile read - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0]; - /// let buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); - /// ``` - pub fn get_i64_volatile(&self, offset: IndexT) -> Result { - // QUESTION: Would it be better to express this in terms of an atomic read? - self.overlay_volatile::(offset) - } - - /// Get the current value at an offset without using any synchronization operations - pub fn get_i64(&self, offset: IndexT) -> Result { - self.overlay::(offset).map(|i| *i) - } - - /// Perform a volatile read - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [12, 0, 0, 0]; - /// let buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); - /// ``` - pub fn get_i32_volatile(&self, offset: IndexT) -> Result { - self.overlay_volatile::(offset) - } - - /// Get the current value at an offset without using any synchronization operations - pub fn get_i32(&self, offset: IndexT) -> Result { - self.overlay::(offset).map(|i| *i) - } - - /// Perform a volatile write of an `i64` into the buffer - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [0u8; 8]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// buffer.put_i64_ordered(0, 12); - /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); - /// ``` - pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> { - // QUESTION: Would it be better to have callers use `write_volatile` directly - self.write_volatile::(offset, val) - } - - /// Perform a volatile write of an `i32` into the buffer - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut bytes = [0u8; 4]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// buffer.put_i32_ordered(0, 12); - /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); - /// ``` - pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> { - // QUESTION: Would it be better to have callers use `write_volatile` directly - self.write_volatile::(offset, val) - } - - /// Write the contents of one buffer to another. Does not perform any synchronization. - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut source_bytes = [1u8, 2, 3, 4]; - /// let source = AtomicBuffer::wrap(&mut source_bytes); - /// - /// let mut dest_bytes = [0, 0, 0, 0]; - /// let mut dest = AtomicBuffer::wrap(&mut dest_bytes); - /// - /// dest.put_bytes(1, &source, 1, 3); - /// drop(dest); - /// assert_eq!(dest_bytes, [0u8, 2, 3, 4]); - /// ``` - pub fn put_bytes( - &mut self, - index: IndexT, - source: &AtomicBuffer, - source_index: IndexT, - len: IndexT, - ) -> Result<()> { - self.bounds_check(index, len)?; - source.bounds_check(source_index, len)?; - - let index = index as usize; - let source_index = source_index as usize; - let len = len as usize; - self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]); - Ok(()) - } - - /// Compare an expected value with what is in memory, and if it matches, - /// update to a new value. Returns `Ok(true)` if the update was successful, - /// and `Ok(false)` if the update failed. - /// - /// ```rust - /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; - /// let mut buf = [0u8; 8]; - /// let atomic_buf = AtomicBuffer::wrap(&mut buf); - /// // Set value to 1 - /// atomic_buf.get_and_add_i64(0, 1).unwrap(); - /// - /// // Set value to 1 if existing value is 0 - /// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); - /// // Set value to 2 if existing value is 1 - /// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); - /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); - /// ``` - pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { - // QUESTION: Should I use a volatile read here as well? - // Aeron C++ uses a volatile read before the atomic operation, but I think that - // may be redundant. In addition, Rust's `read_volatile` operation returns a - // *copied* value; running `compare_exchange` on that copy introduces a race condition - // because we're no longer comparing a consistent address. - self.overlay::(offset).map(|a| { - a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() - }) - } - - /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. - pub fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { - self.bounds_check(offset, length as IndexT).map(|_| unsafe { - self.buffer - .as_mut_ptr() - .offset(offset as isize) - .write_bytes(value, length) - }) - } -} - -#[cfg(test)] -mod tests { - use memmap::MmapOptions; - use std::sync::atomic::{AtomicU64, Ordering}; - - use crate::client::concurrent::atomic_buffer::AtomicBuffer; - use crate::util::AeronError; - - #[test] - fn mmap_to_atomic() { - let mut mmap = MmapOptions::new() - .len(24) - .map_anon() - .expect("Unable to map anonymous memory"); - AtomicBuffer::wrap(&mut mmap); - } - - #[test] - fn primitive_atomic_equivalent() { - let value: u64 = 24; - - let val_ptr = &value as *const u64; - let a_ptr = val_ptr as *const AtomicU64; - let a: &AtomicU64 = unsafe { &*a_ptr }; - - assert_eq!(value, (*a).load(Ordering::SeqCst)); - } - - #[test] - fn negative_offset() { - let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; - let atomic_buf = AtomicBuffer::wrap(&mut buf); - assert_eq!( - atomic_buf.get_and_add_i64(-1, 0), - Err(AeronError::OutOfBounds) - ) - } -} diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index 06eec65..67a7e90 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -1,5 +1,274 @@ //! Module for handling safe interactions among the multiple clients making use //! of a single Media Driver -pub mod atomic_buffer; -pub mod ring_buffer; +pub mod ringbuffer; +use std::mem::size_of; +use std::ops::Deref; +use std::sync::atomic::{AtomicI64, Ordering}; + +use crate::util::{AeronError, IndexT, Result}; +use std::ptr::{read_volatile, write_volatile}; + +/// Wrapper for atomic operations around an underlying byte buffer +pub struct AtomicBuffer<'a> { + buffer: &'a mut [u8], +} + +impl<'a> Deref for AtomicBuffer<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.buffer + } +} + +impl<'a> AtomicBuffer<'a> { + /// Create an `AtomicBuffer` as a view on an underlying byte slice + pub fn wrap(buffer: &'a mut [u8]) -> Self { + AtomicBuffer { buffer } + } + + fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { + if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size { + Err(AeronError::OutOfBounds) + } else { + Ok(()) + } + } + + /// Overlay a struct on a buffer. + /// + /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. + pub fn overlay(&self, offset: IndexT) -> Result<&T> + where + T: Sized, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + unsafe { &*(offset_ptr as *const T) } + }) + } + + fn overlay_volatile(&self, offset: IndexT) -> Result + where + T: Copy, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + unsafe { read_volatile(offset_ptr as *const T) } + }) + } + + fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> + where + T: Copy, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; + unsafe { write_volatile(offset_ptr as *mut T, val) }; + }) + } + + /// Atomically fetch the current value at an offset, and increment by delta + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::util::AeronError; + /// let mut bytes = [0u8; 9]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// // Simple case modifies only the first byte + /// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0)); + /// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1)); + /// + /// // Using an offset modifies the second byte + /// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0)); + /// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1)); + /// + /// // An offset of 2 means buffer size must be 10 to contain an `i64` + /// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds)); + /// ``` + pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { + self.overlay::(offset) + .map(|a| a.fetch_add(delta, Ordering::SeqCst)) + } + + /// Perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + /// let buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` + pub fn get_i64_volatile(&self, offset: IndexT) -> Result { + // QUESTION: Would it be better to express this in terms of an atomic read? + self.overlay_volatile::(offset) + } + + /// Get the current value at an offset without using any synchronization operations + pub fn get_i64(&self, offset: IndexT) -> Result { + self.overlay::(offset).map(|i| *i) + } + + /// Perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut bytes = [12, 0, 0, 0]; + /// let buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); + /// ``` + pub fn get_i32_volatile(&self, offset: IndexT) -> Result { + self.overlay_volatile::(offset) + } + + /// Get the current value at an offset without using any synchronization operations + pub fn get_i32(&self, offset: IndexT) -> Result { + self.overlay::(offset).map(|i| *i) + } + + /// Perform a volatile write of an `i64` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut bytes = [0u8; 8]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// buffer.put_i64_ordered(0, 12); + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` + pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> { + // QUESTION: Would it be better to have callers use `write_volatile` directly + self.write_volatile::(offset, val) + } + + /// Perform a volatile write of an `i32` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut bytes = [0u8; 4]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// buffer.put_i32_ordered(0, 12); + /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); + /// ``` + pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> { + // QUESTION: Would it be better to have callers use `write_volatile` directly + self.write_volatile::(offset, val) + } + + /// Write the contents of one buffer to another. Does not perform any synchronization. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut source_bytes = [1u8, 2, 3, 4]; + /// let source = AtomicBuffer::wrap(&mut source_bytes); + /// + /// let mut dest_bytes = [0, 0, 0, 0]; + /// let mut dest = AtomicBuffer::wrap(&mut dest_bytes); + /// + /// dest.put_bytes(1, &source, 1, 3); + /// drop(dest); + /// assert_eq!(dest_bytes, [0u8, 2, 3, 4]); + /// ``` + pub fn put_bytes( + &mut self, + index: IndexT, + source: &AtomicBuffer, + source_index: IndexT, + len: IndexT, + ) -> Result<()> { + self.bounds_check(index, len)?; + source.bounds_check(source_index, len)?; + + let index = index as usize; + let source_index = source_index as usize; + let len = len as usize; + self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]); + Ok(()) + } + + /// Compare an expected value with what is in memory, and if it matches, + /// update to a new value. Returns `Ok(true)` if the update was successful, + /// and `Ok(false)` if the update failed. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buf = [0u8; 8]; + /// let atomic_buf = AtomicBuffer::wrap(&mut buf); + /// // Set value to 1 + /// atomic_buf.get_and_add_i64(0, 1).unwrap(); + /// + /// // Set value to 1 if existing value is 0 + /// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); + /// // Set value to 2 if existing value is 1 + /// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); + /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); + /// ``` + pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { + // QUESTION: Should I use a volatile read here as well? + // Aeron C++ uses a volatile read before the atomic operation, but I think that + // may be redundant. In addition, Rust's `read_volatile` operation returns a + // *copied* value; running `compare_exchange` on that copy introduces a race condition + // because we're no longer comparing a consistent address. + self.overlay::(offset).map(|a| { + a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + }) + } + + /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. + pub fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { + self.bounds_check(offset, length as IndexT).map(|_| unsafe { + self.buffer + .as_mut_ptr() + .offset(offset as isize) + .write_bytes(value, length) + }) + } +} + +#[cfg(test)] +mod tests { + use memmap::MmapOptions; + use std::sync::atomic::{AtomicU64, Ordering}; + + use crate::client::concurrent::AtomicBuffer; + use crate::util::AeronError; + + #[test] + fn mmap_to_atomic() { + let mut mmap = MmapOptions::new() + .len(24) + .map_anon() + .expect("Unable to map anonymous memory"); + AtomicBuffer::wrap(&mut mmap); + } + + #[test] + fn primitive_atomic_equivalent() { + let value: u64 = 24; + + let val_ptr = &value as *const u64; + let a_ptr = val_ptr as *const AtomicU64; + let a: &AtomicU64 = unsafe { &*a_ptr }; + + assert_eq!(value, (*a).load(Ordering::SeqCst)); + } + + #[test] + fn negative_offset() { + let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; + let atomic_buf = AtomicBuffer::wrap(&mut buf); + assert_eq!( + atomic_buf.get_and_add_i64(-1, 0), + Err(AeronError::OutOfBounds) + ) + } +} diff --git a/aeron-rs/src/client/concurrent/ring_buffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs similarity index 98% rename from aeron-rs/src/client/concurrent/ring_buffer.rs rename to aeron-rs/src/client/concurrent/ringbuffer.rs index 5530a52..addf9a8 100644 --- a/aeron-rs/src/client/concurrent/ring_buffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -1,11 +1,11 @@ //! Ring buffer wrapper for communicating with the Media Driver -use crate::client::concurrent::atomic_buffer::AtomicBuffer; +use crate::client::concurrent::AtomicBuffer; use crate::util::bit::align; use crate::util::{bit, AeronError, IndexT, Result}; /// Description of the Ring Buffer schema. pub mod buffer_descriptor { - use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::client::concurrent::AtomicBuffer; use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH}; use crate::util::AeronError::IllegalArgument; use crate::util::{IndexT, Result}; @@ -354,8 +354,8 @@ impl<'a> ManyToOneRingBuffer<'a> { #[cfg(test)] mod tests { - use crate::client::concurrent::atomic_buffer::AtomicBuffer; - use crate::client::concurrent::ring_buffer::{ + use crate::client::concurrent::AtomicBuffer; + use crate::client::concurrent::ringbuffer::{ buffer_descriptor, record_descriptor, ManyToOneRingBuffer, }; use crate::util::IndexT; diff --git a/aeron-rs/src/client/driver_proxy.rs b/aeron-rs/src/client/driver_proxy.rs index b1a84e4..71b9ee2 100644 --- a/aeron-rs/src/client/driver_proxy.rs +++ b/aeron-rs/src/client/driver_proxy.rs @@ -1,7 +1,7 @@ //! Proxy object for interacting with the Media Driver. Handles operations //! involving the command-and-control file protocol. -use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer; +use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer; /// Proxy object for operations involving the Media Driver pub struct DriverProxy<'a> { diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 73728b5..6ef9da9 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -1,7 +1,7 @@ use aeron_driver_sys::*; use aeron_rs::client::cnc_descriptor; -use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; -use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer; +use aeron_rs::client::concurrent::AtomicBuffer; +use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer; use aeron_rs::util::IndexT; use memmap::MmapOptions; use std::ffi::{c_void, CString}; From fd23f2891ad3e69a6d7c63cd167e245c4d037e80 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 14:59:52 -0400 Subject: [PATCH 2/7] AtomicBuffer trait part two I have a much better grasp of what I need to do now. --- aeron-rs/src/client/concurrent/mod.rs | 280 ++++--------------- aeron-rs/src/client/concurrent/ringbuffer.rs | 44 ++- aeron-rs/src/client/driver_proxy.rs | 12 - aeron-rs/src/client/mod.rs | 1 - aeron-rs/tests/cnc_terminate.rs | 2 + 5 files changed, 91 insertions(+), 248 deletions(-) delete mode 100644 aeron-rs/src/client/driver_proxy.rs diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index 67a7e90..3928cd2 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -3,33 +3,30 @@ pub mod ringbuffer; use std::mem::size_of; -use std::ops::Deref; use std::sync::atomic::{AtomicI64, Ordering}; use crate::util::{AeronError, IndexT, Result}; use std::ptr::{read_volatile, write_volatile}; -/// Wrapper for atomic operations around an underlying byte buffer -pub struct AtomicBuffer<'a> { - buffer: &'a mut [u8], -} +use std::ops::{DerefMut, Deref}; -impl<'a> Deref for AtomicBuffer<'a> { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - self.buffer - } -} - -impl<'a> AtomicBuffer<'a> { - /// Create an `AtomicBuffer` as a view on an underlying byte slice - pub fn wrap(buffer: &'a mut [u8]) -> Self { - AtomicBuffer { buffer } - } +/// Atomic operations on slices of memory +pub trait AtomicBuffer: Deref + DerefMut { + /// Check that there are at least `size` bytes of memory available + /// beginning at some offset. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// + /// let buffer = &mut [0u8; 8][..]; + /// assert!(buffer.bounds_check(0, 8).is_ok()); + /// assert!(buffer.bounds_check(1, 7).is_ok()); + /// assert!(buffer.bounds_check(1, 8).is_err()); + /// assert!(buffer.bounds_check(-1, 8).is_err()); + /// ``` fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { - if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size { + if offset < 0 || size < 0 || self.deref().len() as IndexT - offset < size { Err(AeronError::OutOfBounds) } else { Ok(()) @@ -39,236 +36,79 @@ impl<'a> AtomicBuffer<'a> { /// Overlay a struct on a buffer. /// /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. - pub fn overlay(&self, offset: IndexT) -> Result<&T> + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use std::sync::atomic::{AtomicI64, Ordering}; + /// let buffer = &mut [0u8; 9][..]; + /// + /// let my_val: &AtomicI64 = buffer.overlay::(0).unwrap(); + /// assert_eq!(my_val.load(Ordering::SeqCst), 0); + /// + /// my_val.store(1, Ordering::SeqCst); + /// assert_eq!(buffer, [1, 0, 0, 0, 0, 0, 0, 0, 0]); + /// + /// let another_val: &AtomicI64 = buffer.overlay::(1).unwrap(); + /// assert_eq!(another_val.load(Ordering::SeqCst), 0); + /// ``` + fn overlay(&self, offset: IndexT) -> Result<&T> where - T: Sized, + T: Sized { self.bounds_check(offset, size_of::() as IndexT) .map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + let offset_ptr = unsafe { self.as_ptr().offset(offset as isize) }; unsafe { &*(offset_ptr as *const T) } }) } + /// Overlay a struct on a buffer, and perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let buffer = &mut [5, 0, 0, 0][..]; + /// + /// let my_val: u32 = buffer.overlay_volatile::(0).unwrap(); + /// assert_eq!(my_val, 5); + /// ``` fn overlay_volatile(&self, offset: IndexT) -> Result where T: Copy, { self.bounds_check(offset, size_of::() as IndexT) .map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + let offset_ptr = unsafe { self.as_ptr().offset(offset as isize) }; unsafe { read_volatile(offset_ptr as *const T) } }) } + /// Perform a volatile write of a value over a buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buffer = &mut [0, 0, 0, 0][..]; + /// + /// let value: u32 = 24; + /// buffer.write_volatile(0, value).unwrap(); + /// assert_eq!(buffer, [24, 0, 0, 0]); + /// ``` fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> where T: Copy, { self.bounds_check(offset, size_of::() as IndexT) .map(|_| { - let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; + let offset_ptr = unsafe { self.as_mut_ptr().offset(offset as isize) }; unsafe { write_volatile(offset_ptr as *mut T, val) }; }) } - /// Atomically fetch the current value at an offset, and increment by delta - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// # use aeron_rs::util::AeronError; - /// let mut bytes = [0u8; 9]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// // Simple case modifies only the first byte - /// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0)); - /// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1)); - /// - /// // Using an offset modifies the second byte - /// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0)); - /// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1)); - /// - /// // An offset of 2 means buffer size must be 10 to contain an `i64` - /// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds)); - /// ``` - pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { - self.overlay::(offset) - .map(|a| a.fetch_add(delta, Ordering::SeqCst)) - } - - /// Perform a volatile read - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0]; - /// let buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); - /// ``` - pub fn get_i64_volatile(&self, offset: IndexT) -> Result { - // QUESTION: Would it be better to express this in terms of an atomic read? - self.overlay_volatile::(offset) - } - - /// Get the current value at an offset without using any synchronization operations - pub fn get_i64(&self, offset: IndexT) -> Result { - self.overlay::(offset).map(|i| *i) - } - - /// Perform a volatile read - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut bytes = [12, 0, 0, 0]; - /// let buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); - /// ``` - pub fn get_i32_volatile(&self, offset: IndexT) -> Result { - self.overlay_volatile::(offset) - } - - /// Get the current value at an offset without using any synchronization operations - pub fn get_i32(&self, offset: IndexT) -> Result { - self.overlay::(offset).map(|i| *i) - } - - /// Perform a volatile write of an `i64` into the buffer - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut bytes = [0u8; 8]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// buffer.put_i64_ordered(0, 12); - /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); - /// ``` - pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> { - // QUESTION: Would it be better to have callers use `write_volatile` directly - self.write_volatile::(offset, val) - } - - /// Perform a volatile write of an `i32` into the buffer - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut bytes = [0u8; 4]; - /// let mut buffer = AtomicBuffer::wrap(&mut bytes); - /// - /// buffer.put_i32_ordered(0, 12); - /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); - /// ``` - pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> { - // QUESTION: Would it be better to have callers use `write_volatile` directly - self.write_volatile::(offset, val) - } - - /// Write the contents of one buffer to another. Does not perform any synchronization. - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut source_bytes = [1u8, 2, 3, 4]; - /// let source = AtomicBuffer::wrap(&mut source_bytes); - /// - /// let mut dest_bytes = [0, 0, 0, 0]; - /// let mut dest = AtomicBuffer::wrap(&mut dest_bytes); - /// - /// dest.put_bytes(1, &source, 1, 3); - /// drop(dest); - /// assert_eq!(dest_bytes, [0u8, 2, 3, 4]); - /// ``` - pub fn put_bytes( - &mut self, - index: IndexT, - source: &AtomicBuffer, - source_index: IndexT, - len: IndexT, - ) -> Result<()> { - self.bounds_check(index, len)?; - source.bounds_check(source_index, len)?; - - let index = index as usize; - let source_index = source_index as usize; - let len = len as usize; - self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]); - Ok(()) - } - - /// Compare an expected value with what is in memory, and if it matches, - /// update to a new value. Returns `Ok(true)` if the update was successful, - /// and `Ok(false)` if the update failed. - /// - /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; - /// let mut buf = [0u8; 8]; - /// let atomic_buf = AtomicBuffer::wrap(&mut buf); - /// // Set value to 1 - /// atomic_buf.get_and_add_i64(0, 1).unwrap(); - /// - /// // Set value to 1 if existing value is 0 - /// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); - /// // Set value to 2 if existing value is 1 - /// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); - /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); - /// ``` - pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { - // QUESTION: Should I use a volatile read here as well? - // Aeron C++ uses a volatile read before the atomic operation, but I think that - // may be redundant. In addition, Rust's `read_volatile` operation returns a - // *copied* value; running `compare_exchange` on that copy introduces a race condition - // because we're no longer comparing a consistent address. - self.overlay::(offset).map(|a| { - a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst) - .is_ok() - }) - } - - /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. - pub fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { - self.bounds_check(offset, length as IndexT).map(|_| unsafe { - self.buffer - .as_mut_ptr() - .offset(offset as isize) - .write_bytes(value, length) - }) + /// Perform an atomic fetch and add of a 64-bit value + fn get_and_add_i64(&self, offset: IndexT, value: i64) -> Result { + self.overlay::(offset).map(|a| a.fetch_add(value, Ordering::SeqCst)) } } -#[cfg(test)] -mod tests { - use memmap::MmapOptions; - use std::sync::atomic::{AtomicU64, Ordering}; +impl AtomicBuffer for Vec {} - use crate::client::concurrent::AtomicBuffer; - use crate::util::AeronError; - - #[test] - fn mmap_to_atomic() { - let mut mmap = MmapOptions::new() - .len(24) - .map_anon() - .expect("Unable to map anonymous memory"); - AtomicBuffer::wrap(&mut mmap); - } - - #[test] - fn primitive_atomic_equivalent() { - let value: u64 = 24; - - let val_ptr = &value as *const u64; - let a_ptr = val_ptr as *const AtomicU64; - let a: &AtomicU64 = unsafe { &*a_ptr }; - - assert_eq!(value, (*a).load(Ordering::SeqCst)); - } - - #[test] - fn negative_offset() { - let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; - let atomic_buf = AtomicBuffer::wrap(&mut buf); - assert_eq!( - atomic_buf.get_and_add_i64(-1, 0), - Err(AeronError::OutOfBounds) - ) - } -} +impl AtomicBuffer for &mut [u8] {} diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index addf9a8..b837957 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -30,7 +30,10 @@ pub mod buffer_descriptor { /// Verify the capacity of a buffer is legal for use as a ring buffer. /// Returns the actual capacity excluding ring buffer metadata. - pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result { + pub fn check_capacity(buffer: &A) -> Result + where + A: AtomicBuffer + { let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT; if is_power_of_two(capacity) { Ok(capacity) @@ -107,8 +110,11 @@ pub mod record_descriptor { } /// Multi-producer, single-consumer ring buffer implementation. -pub struct ManyToOneRingBuffer<'a> { - buffer: AtomicBuffer<'a>, +pub struct ManyToOneRingBuffer +where + A: AtomicBuffer +{ + buffer: A, capacity: IndexT, max_msg_length: IndexT, tail_position_index: IndexT, @@ -117,9 +123,12 @@ pub struct ManyToOneRingBuffer<'a> { correlation_id_counter_index: IndexT, } -impl<'a> ManyToOneRingBuffer<'a> { +impl ManyToOneRingBuffer +where + A: AtomicBuffer +{ /// Create a many-to-one ring buffer from an underlying atomic buffer. - pub fn wrap(buffer: AtomicBuffer<'a>) -> Result { + pub fn new(buffer: A) -> Result { buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { buffer, capacity, @@ -140,14 +149,18 @@ impl<'a> ManyToOneRingBuffer<'a> { .unwrap() } + /* /// Write a message into the ring buffer - pub fn write( + pub fn write( &mut self, msg_type_id: i32, - source: &AtomicBuffer, + source: &B, source_index: IndexT, length: IndexT, - ) -> Result<()> { + ) -> Result<()> + where + B: AtomicBuffer + { record_descriptor::check_msg_type_id(msg_type_id)?; self.check_msg_length(length)?; @@ -182,7 +195,7 @@ impl<'a> ManyToOneRingBuffer<'a> { /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` pub fn read(&mut self, mut handler: F, message_count_limit: usize) -> Result where - F: FnMut(i32, &AtomicBuffer, IndexT, IndexT) -> (), + F: FnMut(i32, &A, IndexT, IndexT) -> (), { // UNWRAP: Bounds check performed during buffer creation let head = self.buffer.get_i64(self.head_position_index).unwrap(); @@ -350,8 +363,10 @@ impl<'a> ManyToOneRingBuffer<'a> { Ok(()) } } + */ } +/* #[cfg(test)] mod tests { use crate::client::concurrent::AtomicBuffer; @@ -415,16 +430,15 @@ mod tests { let buffer = AtomicBuffer::wrap(&mut bytes); let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); - let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; let source_len = source_bytes.len() as IndexT; - let source_buffer = AtomicBuffer::wrap(&mut source_bytes); let type_id = 1; ring_buffer .write(type_id, &source_buffer, 0, source_len) .unwrap(); // Now we can start the actual read process - let c = |_, buf: &AtomicBuffer, offset, _| { + let c = |_, buf: &dyn AtomicBuffer, offset, _| { assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12) }; ring_buffer.read(c, 1).unwrap(); @@ -443,9 +457,8 @@ mod tests { let buffer = AtomicBuffer::wrap(&mut bytes); let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); - let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; let source_len = source_bytes.len() as IndexT; - let source_buffer = AtomicBuffer::wrap(&mut source_bytes); let type_id = 1; ring_buffer .write(type_id, &source_buffer, 0, source_len) @@ -455,7 +468,7 @@ mod tests { .unwrap(); let mut msg_count = 0; - let c = |_, buf: &AtomicBuffer, offset, _| { + let c = |_, buf: &dyn AtomicBuffer, offset, _| { msg_count += 1; assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12); }; @@ -470,3 +483,4 @@ mod tests { } } } +*/ diff --git a/aeron-rs/src/client/driver_proxy.rs b/aeron-rs/src/client/driver_proxy.rs deleted file mode 100644 index 71b9ee2..0000000 --- a/aeron-rs/src/client/driver_proxy.rs +++ /dev/null @@ -1,12 +0,0 @@ -//! Proxy object for interacting with the Media Driver. Handles operations -//! involving the command-and-control file protocol. - -use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer; - -/// Proxy object for operations involving the Media Driver -pub struct DriverProxy<'a> { - _to_driver: ManyToOneRingBuffer<'a>, - _client_id: i64, -} - -impl<'a> DriverProxy<'a> {} diff --git a/aeron-rs/src/client/mod.rs b/aeron-rs/src/client/mod.rs index b00370e..32093de 100644 --- a/aeron-rs/src/client/mod.rs +++ b/aeron-rs/src/client/mod.rs @@ -4,4 +4,3 @@ pub mod cnc_descriptor; pub mod concurrent; pub mod context; -pub mod driver_proxy; diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 6ef9da9..7c3d32b 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -77,6 +77,7 @@ fn driver_thread(aeron_dir: PathBuf) { unsafe { aeron_driver_context_close(context) }; } +/* #[test] fn cnc_terminate() { let temp_dir = tempdir().unwrap(); @@ -141,3 +142,4 @@ fn cnc_terminate() { .expect("Driver thread panicked during execution"); assert_eq!(RUNNING.load(Ordering::SeqCst), false); } +*/ From 02638d20c0ec4df68c3382b05834f4489608cc16 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 15:18:52 -0400 Subject: [PATCH 3/7] `claim_capacity` now working again --- aeron-rs/src/client/concurrent/mod.rs | 76 +++++++++++++++++--- aeron-rs/src/client/concurrent/ringbuffer.rs | 37 +++++++--- aeron-rs/tests/cnc_terminate.rs | 2 +- 3 files changed, 95 insertions(+), 20 deletions(-) diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index 3928cd2..e4339fc 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -8,11 +8,10 @@ use std::sync::atomic::{AtomicI64, Ordering}; use crate::util::{AeronError, IndexT, Result}; use std::ptr::{read_volatile, write_volatile}; -use std::ops::{DerefMut, Deref}; +use std::ops::{Deref, DerefMut}; /// Atomic operations on slices of memory pub trait AtomicBuffer: Deref + DerefMut { - /// Check that there are at least `size` bytes of memory available /// beginning at some offset. /// @@ -52,8 +51,8 @@ pub trait AtomicBuffer: Deref + DerefMut { /// assert_eq!(another_val.load(Ordering::SeqCst), 0); /// ``` fn overlay(&self, offset: IndexT) -> Result<&T> - where - T: Sized + where + T: Sized, { self.bounds_check(offset, size_of::() as IndexT) .map(|_| { @@ -72,8 +71,8 @@ pub trait AtomicBuffer: Deref + DerefMut { /// assert_eq!(my_val, 5); /// ``` fn overlay_volatile(&self, offset: IndexT) -> Result - where - T: Copy, + where + T: Copy, { self.bounds_check(offset, size_of::() as IndexT) .map(|_| { @@ -93,8 +92,8 @@ pub trait AtomicBuffer: Deref + DerefMut { /// assert_eq!(buffer, [24, 0, 0, 0]); /// ``` fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> - where - T: Copy, + where + T: Copy, { self.bounds_check(offset, size_of::() as IndexT) .map(|_| { @@ -104,8 +103,67 @@ pub trait AtomicBuffer: Deref + DerefMut { } /// Perform an atomic fetch and add of a 64-bit value + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buf = vec![0u8; 8]; + /// assert_eq!(buf.get_and_add_i64(0, 1), Ok(0)); + /// assert_eq!(buf.get_and_add_i64(0, 1), Ok(1)); + /// ``` fn get_and_add_i64(&self, offset: IndexT, value: i64) -> Result { - self.overlay::(offset).map(|a| a.fetch_add(value, Ordering::SeqCst)) + self.overlay::(offset) + .map(|a| a.fetch_add(value, Ordering::SeqCst)) + } + + /// Perform an atomic Compare-And-Swap of a 64-bit value. Returns `Ok(true)` + /// if the update was successful, and `Ok(false)` if the update failed. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buf = &mut [0u8; 8][..]; + /// // Set value to 1 + /// buf.get_and_add_i64(0, 1).unwrap(); + /// + /// // Set value to 1 if existing value is 0 + /// assert_eq!(buf.compare_and_set_i64(0, 0, 1), Ok(false)); + /// // Set value to 2 if existing value is 1 + /// assert_eq!(buf.compare_and_set_i64(0, 1, 2), Ok(true)); + /// assert_eq!(buf.get_i64_volatile(0), Ok(2)); + /// ``` + fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { + // QUESTION: Should I use a volatile read here as well? + // Aeron C++ uses a volatile read before the atomic operation, but I think that + // may be redundant. In addition, Rust's `read_volatile` operation returns a + // *copied* value; running `compare_exchange` on that copy introduces a race condition + // because we're no longer comparing a consistent address. + self.overlay::(offset).map(|a| { + a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + }) + } + + /// Perform a volatile read of an `i64` value + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let buffer = vec![12u8, 0, 0, 0, 0, 0, 0, 0]; + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` + fn get_i64_volatile(&self, offset: IndexT) -> Result { + // QUESTION: Would it be better to express this in terms of an atomic read? + self.overlay_volatile::(offset) + } + + /// Perform a volatile write of an `i64` value + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut buffer = vec![0u8; 8]; + /// buffer.put_i64_ordered(0, 12); + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` + fn put_i64_ordered(&mut self, offset: IndexT, value: i64) -> Result<()> { + self.write_volatile::(offset, value) } } diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index b837957..e74d577 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -32,7 +32,7 @@ pub mod buffer_descriptor { /// Returns the actual capacity excluding ring buffer metadata. pub fn check_capacity(buffer: &A) -> Result where - A: AtomicBuffer + A: AtomicBuffer, { let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT; if is_power_of_two(capacity) { @@ -112,7 +112,7 @@ pub mod record_descriptor { /// Multi-producer, single-consumer ring buffer implementation. pub struct ManyToOneRingBuffer where - A: AtomicBuffer + A: AtomicBuffer, { buffer: A, capacity: IndexT, @@ -125,7 +125,7 @@ where impl ManyToOneRingBuffer where - A: AtomicBuffer + A: AtomicBuffer, { /// Create a many-to-one ring buffer from an underlying atomic buffer. pub fn new(buffer: A) -> Result { @@ -256,6 +256,7 @@ where Ok(messages_read) } + */ /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index /// at which to start writing the next record. @@ -363,10 +364,8 @@ where Ok(()) } } - */ } -/* #[cfg(test)] mod tests { use crate::client::concurrent::AtomicBuffer; @@ -377,12 +376,10 @@ mod tests { use std::mem::size_of; #[test] - fn claim_capacity_basic() { + fn claim_capacity_owned() { let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; let mut buf = vec![0u8; buf_size]; - - let atomic_buf = AtomicBuffer::wrap(&mut buf); - let mut ring_buf = ManyToOneRingBuffer::wrap(atomic_buf).unwrap(); + let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap(); ring_buf.claim_capacity(16).unwrap(); assert_eq!( @@ -396,6 +393,26 @@ mod tests { assert_eq!(write_start, 16); } + const TEST_BUFFER_SIZE: usize = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; + + #[test] + fn claim_capacity_shared() { + let mut buf = &mut [0u8; TEST_BUFFER_SIZE][..]; + let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap(); + + ring_buf.claim_capacity(16).unwrap(); + assert_eq!( + ring_buf + .buffer + .get_i64_volatile(ring_buf.tail_position_index), + Ok(16) + ); + + let write_start = ring_buf.claim_capacity(16).unwrap(); + assert_eq!(write_start, 16); + } + + /* #[test] fn write_basic() { let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; @@ -482,5 +499,5 @@ mod tests { assert_eq!(buffer.get_i32(i).unwrap(), 0); } } + */ } -*/ diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 7c3d32b..e4bc449 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -1,7 +1,7 @@ use aeron_driver_sys::*; use aeron_rs::client::cnc_descriptor; -use aeron_rs::client::concurrent::AtomicBuffer; use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer; +use aeron_rs::client::concurrent::AtomicBuffer; use aeron_rs::util::IndexT; use memmap::MmapOptions; use std::ffi::{c_void, CString}; From ed766ce86bbfdf64ffedfd2e8e1216593e4c6dc3 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 15:40:52 -0400 Subject: [PATCH 4/7] `write` now working again as well --- aeron-rs/src/client/concurrent/mod.rs | 45 ++++++++++++++++++ aeron-rs/src/client/concurrent/ringbuffer.rs | 48 ++++++++++++-------- 2 files changed, 73 insertions(+), 20 deletions(-) diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index e4339fc..679163e 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -165,6 +165,51 @@ pub trait AtomicBuffer: Deref + DerefMut { fn put_i64_ordered(&mut self, offset: IndexT, value: i64) -> Result<()> { self.write_volatile::(offset, value) } + + /// Write the contents of one buffer to another. Does not perform any synchronization + fn put_bytes( + &mut self, + index: IndexT, + source: &B, + source_index: IndexT, + len: IndexT, + ) -> Result<()> + where + B: AtomicBuffer, + { + self.bounds_check(index, len)?; + source.bounds_check(source_index, len)?; + + let index = index as usize; + let source_index = source_index as usize; + let len = len as usize; + + self[index..index + len].copy_from_slice(&source[source_index..source_index + len]); + Ok(()) + } + + /// Perform a volatile read of an `i32` from the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let buffer = vec![0, 12, 0, 0, 0]; + /// assert_eq!(buffer.get_i32_volatile(1), Ok(12)); + /// ``` + fn get_i32_volatile(&self, offset: IndexT) -> Result { + self.overlay_volatile::(offset) + } + + /// Perform a volatile write of an `i32` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut bytes = vec![0u8; 4]; + /// bytes.put_i32_ordered(0, 12); + /// assert_eq!(bytes.get_i32_volatile(0), Ok(12)); + /// ``` + fn put_i32_ordered(&mut self, offset: IndexT, value: i32) -> Result<()> { + self.write_volatile::(offset, value) + } } impl AtomicBuffer for Vec {} diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index e74d577..016dcdc 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -2,6 +2,7 @@ use crate::client::concurrent::AtomicBuffer; use crate::util::bit::align; use crate::util::{bit, AeronError, IndexT, Result}; +use std::ops::Deref; /// Description of the Ring Buffer schema. pub mod buffer_descriptor { @@ -149,7 +150,6 @@ where .unwrap() } - /* /// Write a message into the ring buffer pub fn write( &mut self, @@ -159,7 +159,7 @@ where length: IndexT, ) -> Result<()> where - B: AtomicBuffer + B: AtomicBuffer, { record_descriptor::check_msg_type_id(msg_type_id)?; self.check_msg_length(length)?; @@ -192,6 +192,7 @@ where Ok(()) } + /* /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` pub fn read(&mut self, mut handler: F, message_count_limit: usize) -> Result where @@ -366,20 +367,31 @@ where } } +impl Deref for ManyToOneRingBuffer +where + A: AtomicBuffer, +{ + type Target = A; + + fn deref(&self) -> &Self::Target { + &self.buffer + } +} + #[cfg(test)] mod tests { - use crate::client::concurrent::AtomicBuffer; use crate::client::concurrent::ringbuffer::{ buffer_descriptor, record_descriptor, ManyToOneRingBuffer, }; + use crate::client::concurrent::AtomicBuffer; use crate::util::IndexT; use std::mem::size_of; + const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize; + #[test] fn claim_capacity_owned() { - let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; - let mut buf = vec![0u8; buf_size]; - let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap(); + let mut ring_buf = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).unwrap(); ring_buf.claim_capacity(16).unwrap(); assert_eq!( @@ -393,11 +405,9 @@ mod tests { assert_eq!(write_start, 16); } - const TEST_BUFFER_SIZE: usize = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; - #[test] fn claim_capacity_shared() { - let mut buf = &mut [0u8; TEST_BUFFER_SIZE][..]; + let buf = &mut [0u8; BUFFER_SIZE][..]; let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap(); ring_buf.claim_capacity(16).unwrap(); @@ -412,34 +422,32 @@ mod tests { assert_eq!(write_start, 16); } - /* #[test] fn write_basic() { - let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; - let buffer = AtomicBuffer::wrap(&mut bytes); - let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + let mut ring_buffer = + ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); - let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let source_bytes = &mut [12u8, 0, 0, 0][..]; let source_len = source_bytes.len() as IndexT; - let source_buffer = AtomicBuffer::wrap(&mut source_bytes); let type_id = 1; ring_buffer - .write(type_id, &source_buffer, 0, source_len) + .write(type_id, &source_bytes, 0, source_len) .unwrap(); - drop(ring_buffer); - let buffer = AtomicBuffer::wrap(&mut bytes); let record_len = source_len + record_descriptor::HEADER_LENGTH; assert_eq!( - buffer.get_i64_volatile(0).unwrap(), + ring_buffer.get_i64_volatile(0).unwrap(), record_descriptor::make_header(record_len, type_id) ); assert_eq!( - buffer.get_i64_volatile(size_of::() as IndexT).unwrap(), + ring_buffer + .get_i64_volatile(size_of::() as IndexT) + .unwrap(), 12 ); } + /* #[test] fn read_basic() { // Similar to write basic, put something into the buffer From 8fac817ba3b14c2d6976f90cc158e9ef40332b17 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 15:50:23 -0400 Subject: [PATCH 5/7] `read` now working again --- aeron-rs/src/client/concurrent/mod.rs | 7 +++++ aeron-rs/src/client/concurrent/ringbuffer.rs | 33 +++++++------------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index 679163e..8839f0e 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -188,6 +188,13 @@ pub trait AtomicBuffer: Deref + DerefMut { Ok(()) } + /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. + fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { + self.bounds_check(offset, length as IndexT).map(|_| unsafe { + self.as_mut_ptr().offset(offset as isize).write_bytes(value, length) + }) + } + /// Perform a volatile read of an `i32` from the buffer /// /// ```rust diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index 016dcdc..3fa62a1 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -192,14 +192,14 @@ where Ok(()) } - /* /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` pub fn read(&mut self, mut handler: F, message_count_limit: usize) -> Result where F: FnMut(i32, &A, IndexT, IndexT) -> (), { + // QUESTION: Should I implement the `get_i64` method that C++ uses? // UNWRAP: Bounds check performed during buffer creation - let head = self.buffer.get_i64(self.head_position_index).unwrap(); + let head = self.buffer.get_i64_volatile(self.head_position_index).unwrap(); let head_index = (head & i64::from(self.capacity - 1)) as i32; let contiguous_block_length = self.capacity - head_index; let mut messages_read = 0; @@ -257,7 +257,6 @@ where Ok(messages_read) } - */ /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index /// at which to start writing the next record. @@ -381,7 +380,7 @@ where #[cfg(test)] mod tests { use crate::client::concurrent::ringbuffer::{ - buffer_descriptor, record_descriptor, ManyToOneRingBuffer, + record_descriptor, ManyToOneRingBuffer, }; use crate::client::concurrent::AtomicBuffer; use crate::util::IndexT; @@ -447,43 +446,36 @@ mod tests { ); } - /* #[test] fn read_basic() { // Similar to write basic, put something into the buffer - let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; - let buffer = AtomicBuffer::wrap(&mut bytes); - let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + let mut ring_buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; - let source_len = source_bytes.len() as IndexT; + let source_len = source_buffer.len() as IndexT; let type_id = 1; ring_buffer .write(type_id, &source_buffer, 0, source_len) .unwrap(); // Now we can start the actual read process - let c = |_, buf: &dyn AtomicBuffer, offset, _| { + let c = |_, buf: &Vec, offset, _| { assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12) }; ring_buffer.read(c, 1).unwrap(); // Make sure that the buffer was zeroed on finish - drop(ring_buffer); - let buffer = AtomicBuffer::wrap(&mut bytes); for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) { - assert_eq!(buffer.get_i32(i).unwrap(), 0); + assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0); } } #[test] fn read_multiple() { - let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; - let buffer = AtomicBuffer::wrap(&mut bytes); - let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + let mut ring_buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; - let source_len = source_bytes.len() as IndexT; + let source_len = source_buffer.len() as IndexT; let type_id = 1; ring_buffer .write(type_id, &source_buffer, 0, source_len) @@ -493,7 +485,7 @@ mod tests { .unwrap(); let mut msg_count = 0; - let c = |_, buf: &dyn AtomicBuffer, offset, _| { + let c = |_, buf: &Vec, offset, _| { msg_count += 1; assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12); }; @@ -501,11 +493,8 @@ mod tests { assert_eq!(msg_count, 2); // Make sure that the buffer was zeroed on finish - drop(ring_buffer); - let buffer = AtomicBuffer::wrap(&mut bytes); for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) { - assert_eq!(buffer.get_i32(i).unwrap(), 0); + assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0); } } - */ } From b235655f71155f32d8a57009c5a7e71bf90c8c59 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 15:50:33 -0400 Subject: [PATCH 6/7] Formatting --- aeron-rs/src/client/concurrent/mod.rs | 4 +++- aeron-rs/src/client/concurrent/ringbuffer.rs | 19 ++++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index 8839f0e..d337fb3 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -191,7 +191,9 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`. fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> { self.bounds_check(offset, length as IndexT).map(|_| unsafe { - self.as_mut_ptr().offset(offset as isize).write_bytes(value, length) + self.as_mut_ptr() + .offset(offset as isize) + .write_bytes(value, length) }) } diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index 3fa62a1..4f23130 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -199,7 +199,10 @@ where { // QUESTION: Should I implement the `get_i64` method that C++ uses? // UNWRAP: Bounds check performed during buffer creation - let head = self.buffer.get_i64_volatile(self.head_position_index).unwrap(); + let head = self + .buffer + .get_i64_volatile(self.head_position_index) + .unwrap(); let head_index = (head & i64::from(self.capacity - 1)) as i32; let contiguous_block_length = self.capacity - head_index; let mut messages_read = 0; @@ -379,9 +382,7 @@ where #[cfg(test)] mod tests { - use crate::client::concurrent::ringbuffer::{ - record_descriptor, ManyToOneRingBuffer, - }; + use crate::client::concurrent::ringbuffer::{record_descriptor, ManyToOneRingBuffer}; use crate::client::concurrent::AtomicBuffer; use crate::util::IndexT; use std::mem::size_of; @@ -449,7 +450,8 @@ mod tests { #[test] fn read_basic() { // Similar to write basic, put something into the buffer - let mut ring_buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); + let mut ring_buffer = + ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; let source_len = source_buffer.len() as IndexT; @@ -459,9 +461,7 @@ mod tests { .unwrap(); // Now we can start the actual read process - let c = |_, buf: &Vec, offset, _| { - assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12) - }; + let c = |_, buf: &Vec, offset, _| assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12); ring_buffer.read(c, 1).unwrap(); // Make sure that the buffer was zeroed on finish @@ -472,7 +472,8 @@ mod tests { #[test] fn read_multiple() { - let mut ring_buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); + let mut ring_buffer = + ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; let source_len = source_buffer.len() as IndexT; From 9139bf7234176064811603f6d94277ffca743a5b Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 16:00:57 -0400 Subject: [PATCH 7/7] CnC termination test working again --- aeron-rs/src/client/concurrent/mod.rs | 3 +++ aeron-rs/tests/cnc_terminate.rs | 32 +++++++++++++-------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index d337fb3..bd8221d 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -8,6 +8,7 @@ use std::sync::atomic::{AtomicI64, Ordering}; use crate::util::{AeronError, IndexT, Result}; use std::ptr::{read_volatile, write_volatile}; +use memmap::MmapMut; use std::ops::{Deref, DerefMut}; /// Atomic operations on slices of memory @@ -224,3 +225,5 @@ pub trait AtomicBuffer: Deref + DerefMut { impl AtomicBuffer for Vec {} impl AtomicBuffer for &mut [u8] {} + +impl AtomicBuffer for MmapMut {} diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index e4bc449..2e67bd5 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -1,5 +1,6 @@ use aeron_driver_sys::*; use aeron_rs::client::cnc_descriptor; +use aeron_rs::client::cnc_descriptor::MetaDataDefinition; use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer; use aeron_rs::client::concurrent::AtomicBuffer; use aeron_rs::util::IndexT; @@ -77,7 +78,6 @@ fn driver_thread(aeron_dir: PathBuf) { unsafe { aeron_driver_context_close(context) }; } -/* #[test] fn cnc_terminate() { let temp_dir = tempdir().unwrap(); @@ -109,30 +109,29 @@ fn cnc_terminate() { let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH; // Read metadata to get buffer length - let buffer_len = { - let atomic_buffer = AtomicBuffer::wrap(&mut mmap); - let metadata = atomic_buffer - .overlay::(0) - .unwrap(); - metadata.to_driver_buffer_length - }; + let buffer_len = mmap + .overlay::(0) + .unwrap() + .to_driver_buffer_length; let buffer_end = cnc_metadata_len + buffer_len as usize; - let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); - let mut ring_buffer = - ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer"); + let mut ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end]) + .expect("Improperly sized buffer"); // 20 bytes: Client ID (8), correlation ID (8), token length (4) let mut terminate_bytes = vec![0u8; 20]; - let terminate_len = terminate_bytes.len(); - let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes); let client_id = ring_buffer.next_correlation_id(); - source_buffer.put_i64_ordered(0, client_id).unwrap(); - source_buffer.put_i64_ordered(8, -1).unwrap(); + terminate_bytes.put_i64_ordered(0, client_id).unwrap(); + terminate_bytes.put_i64_ordered(8, -1).unwrap(); let term_id: i32 = 0x0E; ring_buffer - .write(term_id, &source_buffer, 0, terminate_len as IndexT) + .write( + term_id, + &terminate_bytes, + 0, + terminate_bytes.len() as IndexT, + ) .unwrap(); // Wait for the driver to finish @@ -142,4 +141,3 @@ fn cnc_terminate() { .expect("Driver thread panicked during execution"); assert_eq!(RUNNING.load(Ordering::SeqCst), false); } -*/