Mirror of https://github.com/bspeice/aeron-rs

AtomicBuffer trait part two

I have a much better grasp of what I need to do now.
Bradlee Speice 2019-11-02 14:59:52 -04:00
parent f7ec021bc8
commit fd23f2891a
5 changed files with 91 additions and 248 deletions
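The gist of the change: the concrete `AtomicBuffer<'a>` wrapper struct is replaced by an `AtomicBuffer` trait with `Deref<Target = [u8]>` and `DerefMut<Target = [u8]>` as supertraits, so each operation is written once as a default method against raw bytes, and any slice-like type opts in with an empty `impl`. Below is a minimal, self-contained sketch of that pattern — not the crate's actual API (the real methods take `IndexT` offsets and return the crate's `Result`; `Option` and `usize` stand in here for brevity):

```rust
use std::mem::size_of;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicI64, Ordering};

pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
    /// Atomically fetch the value at `offset` and add `value` to it.
    fn get_and_add_i64(&self, offset: usize, value: i64) -> Option<i64> {
        // Bounds check, then reinterpret the bytes as an atomic.
        if offset + size_of::<AtomicI64>() > self.len() {
            return None;
        }
        // As in the commit itself: alignment is the caller's problem
        // (the real code carries a NOTE about potential UB here).
        let atomic = unsafe { &*(self.as_ptr().add(offset) as *const AtomicI64) };
        Some(atomic.fetch_add(value, Ordering::SeqCst))
    }
}

// One-line opt-ins: anything that derefs to [u8] gets the operations for free.
impl AtomicBuffer for Vec<u8> {}
impl AtomicBuffer for &mut [u8] {}

fn main() {
    let buf = vec![0u8; 8];
    assert_eq!(buf.get_and_add_i64(0, 5), Some(0)); // returns the old value
    assert_eq!(buf.get_and_add_i64(0, 0), Some(5)); // the increment landed
}
```

The payoff shows up in the diff below: `AtomicBuffer::wrap` and the lifetime parameter disappear, and `Vec<u8>` becomes a perfectly good backing store.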

src/client/concurrent/mod.rs

@@ -3,33 +3,30 @@
 pub mod ringbuffer;
 
 use std::mem::size_of;
-use std::ops::Deref;
 use std::sync::atomic::{AtomicI64, Ordering};
 
 use crate::util::{AeronError, IndexT, Result};
 use std::ptr::{read_volatile, write_volatile};
+use std::ops::{DerefMut, Deref};
 
-/// Wrapper for atomic operations around an underlying byte buffer
-pub struct AtomicBuffer<'a> {
-    buffer: &'a mut [u8],
-}
-
-impl<'a> Deref for AtomicBuffer<'a> {
-    type Target = [u8];
-
-    fn deref(&self) -> &Self::Target {
-        self.buffer
-    }
-}
-
-impl<'a> AtomicBuffer<'a> {
-    /// Create an `AtomicBuffer` as a view on an underlying byte slice
-    pub fn wrap(buffer: &'a mut [u8]) -> Self {
-        AtomicBuffer { buffer }
-    }
-
+/// Atomic operations on slices of memory
+pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
     /// Check that there are at least `size` bytes of memory available
     /// beginning at some offset.
+    ///
+    /// ```rust
+    /// # use aeron_rs::client::concurrent::AtomicBuffer;
+    ///
+    /// let buffer = &mut [0u8; 8][..];
+    /// assert!(buffer.bounds_check(0, 8).is_ok());
+    /// assert!(buffer.bounds_check(1, 7).is_ok());
+    /// assert!(buffer.bounds_check(1, 8).is_err());
+    /// assert!(buffer.bounds_check(-1, 8).is_err());
+    /// ```
     fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> {
-        if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size {
+        if offset < 0 || size < 0 || self.deref().len() as IndexT - offset < size {
             Err(AeronError::OutOfBounds)
         } else {
             Ok(())
@@ -39,236 +36,79 @@ impl<'a> AtomicBuffer<'a> {
     /// Overlay a struct on a buffer.
     ///
     /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect.
-    pub fn overlay<T>(&self, offset: IndexT) -> Result<&T>
+    ///
+    /// ```rust
+    /// # use aeron_rs::client::concurrent::AtomicBuffer;
+    /// # use std::sync::atomic::{AtomicI64, Ordering};
+    /// let buffer = &mut [0u8; 9][..];
+    ///
+    /// let my_val: &AtomicI64 = buffer.overlay::<AtomicI64>(0).unwrap();
+    /// assert_eq!(my_val.load(Ordering::SeqCst), 0);
+    ///
+    /// my_val.store(1, Ordering::SeqCst);
+    /// assert_eq!(buffer, [1, 0, 0, 0, 0, 0, 0, 0, 0]);
+    ///
+    /// let another_val: &AtomicI64 = buffer.overlay::<AtomicI64>(1).unwrap();
+    /// assert_eq!(another_val.load(Ordering::SeqCst), 0);
+    /// ```
+    fn overlay<T>(&self, offset: IndexT) -> Result<&T>
     where
-        T: Sized,
+        T: Sized
     {
         self.bounds_check(offset, size_of::<T>() as IndexT)
             .map(|_| {
-                let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) };
+                let offset_ptr = unsafe { self.as_ptr().offset(offset as isize) };
                 unsafe { &*(offset_ptr as *const T) }
             })
     }
 
+    /// Overlay a struct on a buffer, and perform a volatile read
+    ///
+    /// ```rust
+    /// # use aeron_rs::client::concurrent::AtomicBuffer;
+    /// let buffer = &mut [5, 0, 0, 0][..];
+    ///
+    /// let my_val: u32 = buffer.overlay_volatile::<u32>(0).unwrap();
+    /// assert_eq!(my_val, 5);
+    /// ```
     fn overlay_volatile<T>(&self, offset: IndexT) -> Result<T>
     where
         T: Copy,
     {
         self.bounds_check(offset, size_of::<T>() as IndexT)
             .map(|_| {
-                let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) };
+                let offset_ptr = unsafe { self.as_ptr().offset(offset as isize) };
                 unsafe { read_volatile(offset_ptr as *const T) }
             })
     }
 
+    /// Perform a volatile write of a value over a buffer
+    ///
+    /// ```rust
+    /// # use aeron_rs::client::concurrent::AtomicBuffer;
+    /// let mut buffer = &mut [0, 0, 0, 0][..];
+    ///
+    /// let value: u32 = 24;
+    /// buffer.write_volatile(0, value).unwrap();
+    /// assert_eq!(buffer, [24, 0, 0, 0]);
+    /// ```
     fn write_volatile<T>(&mut self, offset: IndexT, val: T) -> Result<()>
     where
         T: Copy,
     {
         self.bounds_check(offset, size_of::<T>() as IndexT)
             .map(|_| {
-                let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) };
+                let offset_ptr = unsafe { self.as_mut_ptr().offset(offset as isize) };
                 unsafe { write_volatile(offset_ptr as *mut T, val) };
             })
     }
 
-    /// Atomically fetch the current value at an offset, and increment by delta
-    ///
-    /// ```rust
-    /// # use aeron_rs::client::concurrent::AtomicBuffer;
-    /// # use aeron_rs::util::AeronError;
-    /// let mut bytes = [0u8; 9];
-    /// let mut buffer = AtomicBuffer::wrap(&mut bytes);
-    ///
-    /// // Simple case modifies only the first byte
-    /// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0));
-    /// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1));
-    ///
-    /// // Using an offset modifies the second byte
-    /// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0));
-    /// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1));
-    ///
-    /// // An offset of 2 means buffer size must be 10 to contain an `i64`
-    /// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds));
-    /// ```
-    pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result<i64> {
-        self.overlay::<AtomicI64>(offset)
-            .map(|a| a.fetch_add(delta, Ordering::SeqCst))
-    }
-
-    /// Perform a volatile read
-    ///
-    /// ```rust
-    /// # use aeron_rs::client::concurrent::AtomicBuffer;
-    /// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0];
-    /// let buffer = AtomicBuffer::wrap(&mut bytes);
-    ///
-    /// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
-    /// ```
-    pub fn get_i64_volatile(&self, offset: IndexT) -> Result<i64> {
-        // QUESTION: Would it be better to express this in terms of an atomic read?
-        self.overlay_volatile::<i64>(offset)
-    }
-
-    /// Get the current value at an offset without using any synchronization operations
-    pub fn get_i64(&self, offset: IndexT) -> Result<i64> {
-        self.overlay::<i64>(offset).map(|i| *i)
-    }
-
-    /// Perform a volatile read
-    ///
-    /// ```rust
-    /// # use aeron_rs::client::concurrent::AtomicBuffer;
-    /// let mut bytes = [12, 0, 0, 0];
-    /// let buffer = AtomicBuffer::wrap(&mut bytes);
-    ///
-    /// assert_eq!(buffer.get_i32_volatile(0), Ok(12));
-    /// ```
-    pub fn get_i32_volatile(&self, offset: IndexT) -> Result<i32> {
-        self.overlay_volatile::<i32>(offset)
-    }
-
-    /// Get the current value at an offset without using any synchronization operations
-    pub fn get_i32(&self, offset: IndexT) -> Result<i32> {
-        self.overlay::<i32>(offset).map(|i| *i)
-    }
-
-    /// Perform a volatile write of an `i64` into the buffer
-    ///
-    /// ```rust
-    /// # use aeron_rs::client::concurrent::AtomicBuffer;
-    /// let mut bytes = [0u8; 8];
-    /// let mut buffer = AtomicBuffer::wrap(&mut bytes);
-    ///
-    /// buffer.put_i64_ordered(0, 12);
-    /// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
-    /// ```
-    pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> {
-        // QUESTION: Would it be better to have callers use `write_volatile` directly
-        self.write_volatile::<i64>(offset, val)
-    }
-
-    /// Perform a volatile write of an `i32` into the buffer
-    ///
-    /// ```rust
-    /// # use aeron_rs::client::concurrent::AtomicBuffer;
-    /// let mut bytes = [0u8; 4];
-    /// let mut buffer = AtomicBuffer::wrap(&mut bytes);
-    ///
-    /// buffer.put_i32_ordered(0, 12);
-    /// assert_eq!(buffer.get_i32_volatile(0), Ok(12));
-    /// ```
-    pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> {
-        // QUESTION: Would it be better to have callers use `write_volatile` directly
-        self.write_volatile::<i32>(offset, val)
-    }
-
-    /// Write the contents of one buffer to another. Does not perform any synchronization.
-    ///
-    /// ```rust
-    /// # use aeron_rs::client::concurrent::AtomicBuffer;
-    /// let mut source_bytes = [1u8, 2, 3, 4];
-    /// let source = AtomicBuffer::wrap(&mut source_bytes);
-    ///
-    /// let mut dest_bytes = [0, 0, 0, 0];
-    /// let mut dest = AtomicBuffer::wrap(&mut dest_bytes);
-    ///
-    /// dest.put_bytes(1, &source, 1, 3);
-    /// drop(dest);
-    /// assert_eq!(dest_bytes, [0u8, 2, 3, 4]);
-    /// ```
-    pub fn put_bytes(
-        &mut self,
-        index: IndexT,
-        source: &AtomicBuffer,
-        source_index: IndexT,
-        len: IndexT,
-    ) -> Result<()> {
-        self.bounds_check(index, len)?;
-        source.bounds_check(source_index, len)?;
-
-        let index = index as usize;
-        let source_index = source_index as usize;
-        let len = len as usize;
-        self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]);
-        Ok(())
-    }
-
-    /// Compare an expected value with what is in memory, and if it matches,
-    /// update to a new value. Returns `Ok(true)` if the update was successful,
-    /// and `Ok(false)` if the update failed.
-    ///
-    /// ```rust
-    /// # use aeron_rs::client::concurrent::AtomicBuffer;
-    /// let mut buf = [0u8; 8];
-    /// let atomic_buf = AtomicBuffer::wrap(&mut buf);
-    /// // Set value to 1
-    /// atomic_buf.get_and_add_i64(0, 1).unwrap();
-    ///
-    /// // Set value to 1 if existing value is 0
-    /// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false));
-    /// // Set value to 2 if existing value is 1
-    /// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true));
-    /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2));
-    /// ```
-    pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result<bool> {
-        // QUESTION: Should I use a volatile read here as well?
-        // Aeron C++ uses a volatile read before the atomic operation, but I think that
-        // may be redundant. In addition, Rust's `read_volatile` operation returns a
-        // *copied* value; running `compare_exchange` on that copy introduces a race condition
-        // because we're no longer comparing a consistent address.
-        self.overlay::<AtomicI64>(offset).map(|a| {
-            a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst)
-                .is_ok()
-        })
-    }
-
-    /// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`.
-    pub fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> {
-        self.bounds_check(offset, length as IndexT).map(|_| unsafe {
-            self.buffer
-                .as_mut_ptr()
-                .offset(offset as isize)
-                .write_bytes(value, length)
-        })
-    }
+    /// Perform an atomic fetch and add of a 64-bit value
+    fn get_and_add_i64(&self, offset: IndexT, value: i64) -> Result<i64> {
+        self.overlay::<AtomicI64>(offset).map(|a| a.fetch_add(value, Ordering::SeqCst))
+    }
 }
 
-#[cfg(test)]
-mod tests {
-    use memmap::MmapOptions;
-    use std::sync::atomic::{AtomicU64, Ordering};
-
-    use crate::client::concurrent::AtomicBuffer;
-    use crate::util::AeronError;
-
-    #[test]
-    fn mmap_to_atomic() {
-        let mut mmap = MmapOptions::new()
-            .len(24)
-            .map_anon()
-            .expect("Unable to map anonymous memory");
-        AtomicBuffer::wrap(&mut mmap);
-    }
-
-    #[test]
-    fn primitive_atomic_equivalent() {
-        let value: u64 = 24;
-
-        let val_ptr = &value as *const u64;
-        let a_ptr = val_ptr as *const AtomicU64;
-        let a: &AtomicU64 = unsafe { &*a_ptr };
-
-        assert_eq!(value, (*a).load(Ordering::SeqCst));
-    }
-
-    #[test]
-    fn negative_offset() {
-        let mut buf = [16, 0, 0, 0, 0, 0, 0, 0];
-        let atomic_buf = AtomicBuffer::wrap(&mut buf);
-        assert_eq!(
-            atomic_buf.get_and_add_i64(-1, 0),
-            Err(AeronError::OutOfBounds)
-        )
-    }
-}
+impl AtomicBuffer for Vec<u8> {}
+
+impl AtomicBuffer for &mut [u8] {}
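The new default methods compose: `write_volatile` and `overlay_volatile` are inverses over the same `bounds_check`. A small usage sketch against this revision's API (assuming the crate builds at this commit, and that integer literals coerce to `IndexT` as in the doc-tests above):

```rust
use aeron_rs::client::concurrent::AtomicBuffer;

fn main() {
    // `&mut [u8]` implements AtomicBuffer, so the trait methods are
    // available directly on a byte slice.
    let mut buffer = &mut [0u8; 8][..];

    // `write_volatile` stores through a raw pointer; `overlay_volatile`
    // reads a copy back out. Both share the same bounds check.
    buffer.write_volatile::<u32>(0, 24).unwrap();
    assert_eq!(buffer.overlay_volatile::<u32>(0).unwrap(), 24);

    // A u64 at offset 4 would need 12 bytes of capacity, so it's rejected.
    assert!(buffer.overlay_volatile::<u64>(4).is_err());
}
```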

src/client/concurrent/ringbuffer.rs

@@ -30,7 +30,10 @@ pub mod buffer_descriptor {
     /// Verify the capacity of a buffer is legal for use as a ring buffer.
     /// Returns the actual capacity excluding ring buffer metadata.
-    pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result<IndexT> {
+    pub fn check_capacity<A>(buffer: &A) -> Result<IndexT>
+    where
+        A: AtomicBuffer
+    {
         let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT;
         if is_power_of_two(capacity) {
             Ok(capacity)
@@ -107,8 +110,11 @@ pub mod record_descriptor {
 }
 
 /// Multi-producer, single-consumer ring buffer implementation.
-pub struct ManyToOneRingBuffer<'a> {
-    buffer: AtomicBuffer<'a>,
+pub struct ManyToOneRingBuffer<A>
+where
+    A: AtomicBuffer
+{
+    buffer: A,
     capacity: IndexT,
     max_msg_length: IndexT,
     tail_position_index: IndexT,
@@ -117,9 +123,12 @@ pub struct ManyToOneRingBuffer<'a> {
     correlation_id_counter_index: IndexT,
 }
 
-impl<'a> ManyToOneRingBuffer<'a> {
+impl<A> ManyToOneRingBuffer<A>
+where
+    A: AtomicBuffer
+{
     /// Create a many-to-one ring buffer from an underlying atomic buffer.
-    pub fn wrap(buffer: AtomicBuffer<'a>) -> Result<Self> {
+    pub fn new(buffer: A) -> Result<Self> {
         buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer {
             buffer,
             capacity,
@@ -140,14 +149,18 @@ impl<'a> ManyToOneRingBuffer<'a> {
             .unwrap()
     }
 
+    /*
     /// Write a message into the ring buffer
-    pub fn write(
+    pub fn write<B>(
         &mut self,
         msg_type_id: i32,
-        source: &AtomicBuffer,
+        source: &B,
         source_index: IndexT,
         length: IndexT,
-    ) -> Result<()> {
+    ) -> Result<()>
+    where
+        B: AtomicBuffer
+    {
         record_descriptor::check_msg_type_id(msg_type_id)?;
         self.check_msg_length(length)?;
@@ -182,7 +195,7 @@ impl<'a> ManyToOneRingBuffer<'a> {
     /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
     pub fn read<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
     where
-        F: FnMut(i32, &AtomicBuffer, IndexT, IndexT) -> (),
+        F: FnMut(i32, &A, IndexT, IndexT) -> (),
     {
         // UNWRAP: Bounds check performed during buffer creation
         let head = self.buffer.get_i64(self.head_position_index).unwrap();
@@ -350,8 +363,10 @@ impl<'a> ManyToOneRingBuffer<'a> {
             Ok(())
         }
     }
+    */
 }
 
+/*
 #[cfg(test)]
 mod tests {
     use crate::client::concurrent::AtomicBuffer;
@@ -415,16 +430,15 @@ mod tests {
         let buffer = AtomicBuffer::wrap(&mut bytes);
         let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
 
-        let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0];
+        let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
         let source_len = source_bytes.len() as IndexT;
-        let source_buffer = AtomicBuffer::wrap(&mut source_bytes);
 
         let type_id = 1;
         ring_buffer
             .write(type_id, &source_buffer, 0, source_len)
             .unwrap();
 
         // Now we can start the actual read process
-        let c = |_, buf: &AtomicBuffer, offset, _| {
+        let c = |_, buf: &dyn AtomicBuffer, offset, _| {
             assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12)
         };
         ring_buffer.read(c, 1).unwrap();
@@ -443,9 +457,8 @@ mod tests {
         let buffer = AtomicBuffer::wrap(&mut bytes);
         let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
 
-        let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0];
+        let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
         let source_len = source_bytes.len() as IndexT;
-        let source_buffer = AtomicBuffer::wrap(&mut source_bytes);
 
         let type_id = 1;
         ring_buffer
             .write(type_id, &source_buffer, 0, source_len)
@@ -455,7 +468,7 @@ mod tests {
             .unwrap();
 
         let mut msg_count = 0;
-        let c = |_, buf: &AtomicBuffer, offset, _| {
+        let c = |_, buf: &dyn AtomicBuffer, offset, _| {
             msg_count += 1;
             assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12);
         };
@@ -470,3 +483,4 @@ mod tests {
         }
     }
 }
+*/
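With the struct generic over `A: AtomicBuffer`, construction now takes ownership of any conforming buffer instead of wrapping a borrowed slice. A hypothetical call against this commit (assuming `TRAILER_LENGTH` is a public constant in `buffer_descriptor`, as its use in `check_capacity` suggests):

```rust
use aeron_rs::client::concurrent::ringbuffer::{buffer_descriptor, ManyToOneRingBuffer};

fn main() {
    // The data region must be a power of two; the trailer holds ring
    // buffer metadata (head/tail/correlation counters).
    let buffer = vec![0u8; 1024 + buffer_descriptor::TRAILER_LENGTH as usize];

    // `Vec<u8>: AtomicBuffer`, so the ring buffer can own its backing memory.
    let _ring_buffer = ManyToOneRingBuffer::new(buffer).expect("Invalid buffer size");
}
```

Owning the buffer also removes the `'a` lifetime parameter that forced `DriverProxy<'a>` to exist in the form deleted below.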

src/client/driver_proxy.rs (deleted)

@@ -1,12 +0,0 @@
-//! Proxy object for interacting with the Media Driver. Handles operations
-//! involving the command-and-control file protocol.
-
-use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer;
-
-/// Proxy object for operations involving the Media Driver
-pub struct DriverProxy<'a> {
-    _to_driver: ManyToOneRingBuffer<'a>,
-    _client_id: i64,
-}
-
-impl<'a> DriverProxy<'a> {}

src/client/mod.rs

@@ -4,4 +4,3 @@
 pub mod cnc_descriptor;
 pub mod concurrent;
 pub mod context;
-pub mod driver_proxy;

tests/cnc_terminate.rs

@@ -77,6 +77,7 @@ fn driver_thread(aeron_dir: PathBuf) {
     unsafe { aeron_driver_context_close(context) };
 }
 
+/*
 #[test]
 fn cnc_terminate() {
     let temp_dir = tempdir().unwrap();
@@ -141,3 +142,4 @@ fn cnc_terminate() {
         .expect("Driver thread panicked during execution");
     assert_eq!(RUNNING.load(Ordering::SeqCst), false);
 }
+*/