mirror of
https://github.com/bspeice/aeron-rs
synced 2025-01-21 03:20:04 -05:00
Merge #16
16: Atomic buffer trait r=bspeice a=bspeice Implements #12. Turns out I had been making it too hard on myself previously. bors r+ Co-authored-by: Bradlee Speice <bradlee@speice.io>
This commit is contained in:
commit
b902f580c9
@ -1,272 +0,0 @@
|
||||
//! Buffer that is safe to use in a multi-process/multi-thread context. Typically used for
|
||||
//! handling atomic updates of memory-mapped buffers.
|
||||
use std::mem::size_of;
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{AtomicI64, Ordering};
|
||||
|
||||
use crate::util::{AeronError, IndexT, Result};
|
||||
use std::ptr::{read_volatile, write_volatile};
|
||||
|
||||
/// Wrapper for atomic operations around an underlying byte buffer
|
||||
pub struct AtomicBuffer<'a> {
|
||||
buffer: &'a mut [u8],
|
||||
}
|
||||
|
||||
impl<'a> Deref for AtomicBuffer<'a> {
|
||||
type Target = [u8];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.buffer
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> AtomicBuffer<'a> {
|
||||
/// Create an `AtomicBuffer` as a view on an underlying byte slice
|
||||
pub fn wrap(buffer: &'a mut [u8]) -> Self {
|
||||
AtomicBuffer { buffer }
|
||||
}
|
||||
|
||||
fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> {
|
||||
if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size {
|
||||
Err(AeronError::OutOfBounds)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Overlay a struct on a buffer.
|
||||
///
|
||||
/// NOTE: Has the potential to cause undefined behavior if alignment is incorrect.
|
||||
pub fn overlay<T>(&self, offset: IndexT) -> Result<&T>
|
||||
where
|
||||
T: Sized,
|
||||
{
|
||||
self.bounds_check(offset, size_of::<T>() as IndexT)
|
||||
.map(|_| {
|
||||
let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) };
|
||||
unsafe { &*(offset_ptr as *const T) }
|
||||
})
|
||||
}
|
||||
|
||||
fn overlay_volatile<T>(&self, offset: IndexT) -> Result<T>
|
||||
where
|
||||
T: Copy,
|
||||
{
|
||||
self.bounds_check(offset, size_of::<T>() as IndexT)
|
||||
.map(|_| {
|
||||
let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) };
|
||||
unsafe { read_volatile(offset_ptr as *const T) }
|
||||
})
|
||||
}
|
||||
|
||||
fn write_volatile<T>(&mut self, offset: IndexT, val: T) -> Result<()>
|
||||
where
|
||||
T: Copy,
|
||||
{
|
||||
self.bounds_check(offset, size_of::<T>() as IndexT)
|
||||
.map(|_| {
|
||||
let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) };
|
||||
unsafe { write_volatile(offset_ptr as *mut T, val) };
|
||||
})
|
||||
}
|
||||
|
||||
/// Atomically fetch the current value at an offset, and increment by delta
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
/// # use aeron_rs::util::AeronError;
|
||||
/// let mut bytes = [0u8; 9];
|
||||
/// let mut buffer = AtomicBuffer::wrap(&mut bytes);
|
||||
///
|
||||
/// // Simple case modifies only the first byte
|
||||
/// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0));
|
||||
/// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1));
|
||||
///
|
||||
/// // Using an offset modifies the second byte
|
||||
/// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0));
|
||||
/// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1));
|
||||
///
|
||||
/// // An offset of 2 means buffer size must be 10 to contain an `i64`
|
||||
/// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds));
|
||||
/// ```
|
||||
pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result<i64> {
|
||||
self.overlay::<AtomicI64>(offset)
|
||||
.map(|a| a.fetch_add(delta, Ordering::SeqCst))
|
||||
}
|
||||
|
||||
/// Perform a volatile read
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
/// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0];
|
||||
/// let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||
///
|
||||
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
|
||||
/// ```
|
||||
pub fn get_i64_volatile(&self, offset: IndexT) -> Result<i64> {
|
||||
// QUESTION: Would it be better to express this in terms of an atomic read?
|
||||
self.overlay_volatile::<i64>(offset)
|
||||
}
|
||||
|
||||
/// Get the current value at an offset without using any synchronization operations
|
||||
pub fn get_i64(&self, offset: IndexT) -> Result<i64> {
|
||||
self.overlay::<i64>(offset).map(|i| *i)
|
||||
}
|
||||
|
||||
/// Perform a volatile read
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
/// let mut bytes = [12, 0, 0, 0];
|
||||
/// let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||
///
|
||||
/// assert_eq!(buffer.get_i32_volatile(0), Ok(12));
|
||||
/// ```
|
||||
pub fn get_i32_volatile(&self, offset: IndexT) -> Result<i32> {
|
||||
self.overlay_volatile::<i32>(offset)
|
||||
}
|
||||
|
||||
/// Get the current value at an offset without using any synchronization operations
|
||||
pub fn get_i32(&self, offset: IndexT) -> Result<i32> {
|
||||
self.overlay::<i32>(offset).map(|i| *i)
|
||||
}
|
||||
|
||||
/// Perform a volatile write of an `i64` into the buffer
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
/// let mut bytes = [0u8; 8];
|
||||
/// let mut buffer = AtomicBuffer::wrap(&mut bytes);
|
||||
///
|
||||
/// buffer.put_i64_ordered(0, 12);
|
||||
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
|
||||
/// ```
|
||||
pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> {
|
||||
// QUESTION: Would it be better to have callers use `write_volatile` directly
|
||||
self.write_volatile::<i64>(offset, val)
|
||||
}
|
||||
|
||||
/// Perform a volatile write of an `i32` into the buffer
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
/// let mut bytes = [0u8; 4];
|
||||
/// let mut buffer = AtomicBuffer::wrap(&mut bytes);
|
||||
///
|
||||
/// buffer.put_i32_ordered(0, 12);
|
||||
/// assert_eq!(buffer.get_i32_volatile(0), Ok(12));
|
||||
/// ```
|
||||
pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> {
|
||||
// QUESTION: Would it be better to have callers use `write_volatile` directly
|
||||
self.write_volatile::<i32>(offset, val)
|
||||
}
|
||||
|
||||
/// Write the contents of one buffer to another. Does not perform any synchronization.
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
/// let mut source_bytes = [1u8, 2, 3, 4];
|
||||
/// let source = AtomicBuffer::wrap(&mut source_bytes);
|
||||
///
|
||||
/// let mut dest_bytes = [0, 0, 0, 0];
|
||||
/// let mut dest = AtomicBuffer::wrap(&mut dest_bytes);
|
||||
///
|
||||
/// dest.put_bytes(1, &source, 1, 3);
|
||||
/// drop(dest);
|
||||
/// assert_eq!(dest_bytes, [0u8, 2, 3, 4]);
|
||||
/// ```
|
||||
pub fn put_bytes(
|
||||
&mut self,
|
||||
index: IndexT,
|
||||
source: &AtomicBuffer,
|
||||
source_index: IndexT,
|
||||
len: IndexT,
|
||||
) -> Result<()> {
|
||||
self.bounds_check(index, len)?;
|
||||
source.bounds_check(source_index, len)?;
|
||||
|
||||
let index = index as usize;
|
||||
let source_index = source_index as usize;
|
||||
let len = len as usize;
|
||||
self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Compare an expected value with what is in memory, and if it matches,
|
||||
/// update to a new value. Returns `Ok(true)` if the update was successful,
|
||||
/// and `Ok(false)` if the update failed.
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
/// let mut buf = [0u8; 8];
|
||||
/// let atomic_buf = AtomicBuffer::wrap(&mut buf);
|
||||
/// // Set value to 1
|
||||
/// atomic_buf.get_and_add_i64(0, 1).unwrap();
|
||||
///
|
||||
/// // Set value to 1 if existing value is 0
|
||||
/// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false));
|
||||
/// // Set value to 2 if existing value is 1
|
||||
/// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true));
|
||||
/// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2));
|
||||
/// ```
|
||||
pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result<bool> {
|
||||
// QUESTION: Should I use a volatile read here as well?
|
||||
// Aeron C++ uses a volatile read before the atomic operation, but I think that
|
||||
// may be redundant. In addition, Rust's `read_volatile` operation returns a
|
||||
// *copied* value; running `compare_exchange` on that copy introduces a race condition
|
||||
// because we're no longer comparing a consistent address.
|
||||
self.overlay::<AtomicI64>(offset).map(|a| {
|
||||
a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst)
|
||||
.is_ok()
|
||||
})
|
||||
}
|
||||
|
||||
/// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`.
|
||||
pub fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> {
|
||||
self.bounds_check(offset, length as IndexT).map(|_| unsafe {
|
||||
self.buffer
|
||||
.as_mut_ptr()
|
||||
.offset(offset as isize)
|
||||
.write_bytes(value, length)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use memmap::MmapOptions;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
use crate::util::AeronError;
|
||||
|
||||
#[test]
|
||||
fn mmap_to_atomic() {
|
||||
let mut mmap = MmapOptions::new()
|
||||
.len(24)
|
||||
.map_anon()
|
||||
.expect("Unable to map anonymous memory");
|
||||
AtomicBuffer::wrap(&mut mmap);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn primitive_atomic_equivalent() {
|
||||
let value: u64 = 24;
|
||||
|
||||
let val_ptr = &value as *const u64;
|
||||
let a_ptr = val_ptr as *const AtomicU64;
|
||||
let a: &AtomicU64 = unsafe { &*a_ptr };
|
||||
|
||||
assert_eq!(value, (*a).load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn negative_offset() {
|
||||
let mut buf = [16, 0, 0, 0, 0, 0, 0, 0];
|
||||
let atomic_buf = AtomicBuffer::wrap(&mut buf);
|
||||
assert_eq!(
|
||||
atomic_buf.get_and_add_i64(-1, 0),
|
||||
Err(AeronError::OutOfBounds)
|
||||
)
|
||||
}
|
||||
}
|
@ -1,5 +1,229 @@
|
||||
//! Module for handling safe interactions among the multiple clients making use
|
||||
//! of a single Media Driver
|
||||
|
||||
pub mod atomic_buffer;
|
||||
pub mod ring_buffer;
|
||||
pub mod ringbuffer;
|
||||
use std::mem::size_of;
|
||||
use std::sync::atomic::{AtomicI64, Ordering};
|
||||
|
||||
use crate::util::{AeronError, IndexT, Result};
|
||||
use std::ptr::{read_volatile, write_volatile};
|
||||
|
||||
use memmap::MmapMut;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
/// Atomic operations on slices of memory
|
||||
pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
||||
/// Check that there are at least `size` bytes of memory available
|
||||
/// beginning at some offset.
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||
///
|
||||
/// let buffer = &mut [0u8; 8][..];
|
||||
/// assert!(buffer.bounds_check(0, 8).is_ok());
|
||||
/// assert!(buffer.bounds_check(1, 7).is_ok());
|
||||
/// assert!(buffer.bounds_check(1, 8).is_err());
|
||||
/// assert!(buffer.bounds_check(-1, 8).is_err());
|
||||
/// ```
|
||||
fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> {
|
||||
if offset < 0 || size < 0 || self.deref().len() as IndexT - offset < size {
|
||||
Err(AeronError::OutOfBounds)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Overlay a struct on a buffer.
|
||||
///
|
||||
/// NOTE: Has the potential to cause undefined behavior if alignment is incorrect.
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||
/// # use std::sync::atomic::{AtomicI64, Ordering};
|
||||
/// let buffer = &mut [0u8; 9][..];
|
||||
///
|
||||
/// let my_val: &AtomicI64 = buffer.overlay::<AtomicI64>(0).unwrap();
|
||||
/// assert_eq!(my_val.load(Ordering::SeqCst), 0);
|
||||
///
|
||||
/// my_val.store(1, Ordering::SeqCst);
|
||||
/// assert_eq!(buffer, [1, 0, 0, 0, 0, 0, 0, 0, 0]);
|
||||
///
|
||||
/// let another_val: &AtomicI64 = buffer.overlay::<AtomicI64>(1).unwrap();
|
||||
/// assert_eq!(another_val.load(Ordering::SeqCst), 0);
|
||||
/// ```
|
||||
fn overlay<T>(&self, offset: IndexT) -> Result<&T>
|
||||
where
|
||||
T: Sized,
|
||||
{
|
||||
self.bounds_check(offset, size_of::<T>() as IndexT)
|
||||
.map(|_| {
|
||||
let offset_ptr = unsafe { self.as_ptr().offset(offset as isize) };
|
||||
unsafe { &*(offset_ptr as *const T) }
|
||||
})
|
||||
}
|
||||
|
||||
/// Overlay a struct on a buffer, and perform a volatile read
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||
/// let buffer = &mut [5, 0, 0, 0][..];
|
||||
///
|
||||
/// let my_val: u32 = buffer.overlay_volatile::<u32>(0).unwrap();
|
||||
/// assert_eq!(my_val, 5);
|
||||
/// ```
|
||||
fn overlay_volatile<T>(&self, offset: IndexT) -> Result<T>
|
||||
where
|
||||
T: Copy,
|
||||
{
|
||||
self.bounds_check(offset, size_of::<T>() as IndexT)
|
||||
.map(|_| {
|
||||
let offset_ptr = unsafe { self.as_ptr().offset(offset as isize) };
|
||||
unsafe { read_volatile(offset_ptr as *const T) }
|
||||
})
|
||||
}
|
||||
|
||||
/// Perform a volatile write of a value over a buffer
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||
/// let mut buffer = &mut [0, 0, 0, 0][..];
|
||||
///
|
||||
/// let value: u32 = 24;
|
||||
/// buffer.write_volatile(0, value).unwrap();
|
||||
/// assert_eq!(buffer, [24, 0, 0, 0]);
|
||||
/// ```
|
||||
fn write_volatile<T>(&mut self, offset: IndexT, val: T) -> Result<()>
|
||||
where
|
||||
T: Copy,
|
||||
{
|
||||
self.bounds_check(offset, size_of::<T>() as IndexT)
|
||||
.map(|_| {
|
||||
let offset_ptr = unsafe { self.as_mut_ptr().offset(offset as isize) };
|
||||
unsafe { write_volatile(offset_ptr as *mut T, val) };
|
||||
})
|
||||
}
|
||||
|
||||
/// Perform an atomic fetch and add of a 64-bit value
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||
/// let mut buf = vec![0u8; 8];
|
||||
/// assert_eq!(buf.get_and_add_i64(0, 1), Ok(0));
|
||||
/// assert_eq!(buf.get_and_add_i64(0, 1), Ok(1));
|
||||
/// ```
|
||||
fn get_and_add_i64(&self, offset: IndexT, value: i64) -> Result<i64> {
|
||||
self.overlay::<AtomicI64>(offset)
|
||||
.map(|a| a.fetch_add(value, Ordering::SeqCst))
|
||||
}
|
||||
|
||||
/// Perform an atomic Compare-And-Swap of a 64-bit value. Returns `Ok(true)`
|
||||
/// if the update was successful, and `Ok(false)` if the update failed.
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||
/// let mut buf = &mut [0u8; 8][..];
|
||||
/// // Set value to 1
|
||||
/// buf.get_and_add_i64(0, 1).unwrap();
|
||||
///
|
||||
/// // Set value to 1 if existing value is 0
|
||||
/// assert_eq!(buf.compare_and_set_i64(0, 0, 1), Ok(false));
|
||||
/// // Set value to 2 if existing value is 1
|
||||
/// assert_eq!(buf.compare_and_set_i64(0, 1, 2), Ok(true));
|
||||
/// assert_eq!(buf.get_i64_volatile(0), Ok(2));
|
||||
/// ```
|
||||
fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result<bool> {
|
||||
// QUESTION: Should I use a volatile read here as well?
|
||||
// Aeron C++ uses a volatile read before the atomic operation, but I think that
|
||||
// may be redundant. In addition, Rust's `read_volatile` operation returns a
|
||||
// *copied* value; running `compare_exchange` on that copy introduces a race condition
|
||||
// because we're no longer comparing a consistent address.
|
||||
self.overlay::<AtomicI64>(offset).map(|a| {
|
||||
a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst)
|
||||
.is_ok()
|
||||
})
|
||||
}
|
||||
|
||||
/// Perform a volatile read of an `i64` value
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||
/// let buffer = vec![12u8, 0, 0, 0, 0, 0, 0, 0];
|
||||
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
|
||||
/// ```
|
||||
fn get_i64_volatile(&self, offset: IndexT) -> Result<i64> {
|
||||
// QUESTION: Would it be better to express this in terms of an atomic read?
|
||||
self.overlay_volatile::<i64>(offset)
|
||||
}
|
||||
|
||||
/// Perform a volatile write of an `i64` value
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||
/// let mut buffer = vec![0u8; 8];
|
||||
/// buffer.put_i64_ordered(0, 12);
|
||||
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
|
||||
/// ```
|
||||
fn put_i64_ordered(&mut self, offset: IndexT, value: i64) -> Result<()> {
|
||||
self.write_volatile::<i64>(offset, value)
|
||||
}
|
||||
|
||||
/// Write the contents of one buffer to another. Does not perform any synchronization
|
||||
fn put_bytes<B>(
|
||||
&mut self,
|
||||
index: IndexT,
|
||||
source: &B,
|
||||
source_index: IndexT,
|
||||
len: IndexT,
|
||||
) -> Result<()>
|
||||
where
|
||||
B: AtomicBuffer,
|
||||
{
|
||||
self.bounds_check(index, len)?;
|
||||
source.bounds_check(source_index, len)?;
|
||||
|
||||
let index = index as usize;
|
||||
let source_index = source_index as usize;
|
||||
let len = len as usize;
|
||||
|
||||
self[index..index + len].copy_from_slice(&source[source_index..source_index + len]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`.
|
||||
fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> {
|
||||
self.bounds_check(offset, length as IndexT).map(|_| unsafe {
|
||||
self.as_mut_ptr()
|
||||
.offset(offset as isize)
|
||||
.write_bytes(value, length)
|
||||
})
|
||||
}
|
||||
|
||||
/// Perform a volatile read of an `i32` from the buffer
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||
/// let buffer = vec![0, 12, 0, 0, 0];
|
||||
/// assert_eq!(buffer.get_i32_volatile(1), Ok(12));
|
||||
/// ```
|
||||
fn get_i32_volatile(&self, offset: IndexT) -> Result<i32> {
|
||||
self.overlay_volatile::<i32>(offset)
|
||||
}
|
||||
|
||||
/// Perform a volatile write of an `i32` into the buffer
|
||||
///
|
||||
/// ```rust
|
||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||
/// let mut bytes = vec![0u8; 4];
|
||||
/// bytes.put_i32_ordered(0, 12);
|
||||
/// assert_eq!(bytes.get_i32_volatile(0), Ok(12));
|
||||
/// ```
|
||||
fn put_i32_ordered(&mut self, offset: IndexT, value: i32) -> Result<()> {
|
||||
self.write_volatile::<i32>(offset, value)
|
||||
}
|
||||
}
|
||||
|
||||
impl AtomicBuffer for Vec<u8> {}
|
||||
|
||||
impl AtomicBuffer for &mut [u8] {}
|
||||
|
||||
impl AtomicBuffer for MmapMut {}
|
||||
|
@ -1,11 +1,12 @@
|
||||
//! Ring buffer wrapper for communicating with the Media Driver
|
||||
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
use crate::client::concurrent::AtomicBuffer;
|
||||
use crate::util::bit::align;
|
||||
use crate::util::{bit, AeronError, IndexT, Result};
|
||||
use std::ops::Deref;
|
||||
|
||||
/// Description of the Ring Buffer schema.
|
||||
pub mod buffer_descriptor {
|
||||
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
use crate::client::concurrent::AtomicBuffer;
|
||||
use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH};
|
||||
use crate::util::AeronError::IllegalArgument;
|
||||
use crate::util::{IndexT, Result};
|
||||
@ -30,7 +31,10 @@ pub mod buffer_descriptor {
|
||||
|
||||
/// Verify the capacity of a buffer is legal for use as a ring buffer.
|
||||
/// Returns the actual capacity excluding ring buffer metadata.
|
||||
pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result<IndexT> {
|
||||
pub fn check_capacity<A>(buffer: &A) -> Result<IndexT>
|
||||
where
|
||||
A: AtomicBuffer,
|
||||
{
|
||||
let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT;
|
||||
if is_power_of_two(capacity) {
|
||||
Ok(capacity)
|
||||
@ -107,8 +111,11 @@ pub mod record_descriptor {
|
||||
}
|
||||
|
||||
/// Multi-producer, single-consumer ring buffer implementation.
|
||||
pub struct ManyToOneRingBuffer<'a> {
|
||||
buffer: AtomicBuffer<'a>,
|
||||
pub struct ManyToOneRingBuffer<A>
|
||||
where
|
||||
A: AtomicBuffer,
|
||||
{
|
||||
buffer: A,
|
||||
capacity: IndexT,
|
||||
max_msg_length: IndexT,
|
||||
tail_position_index: IndexT,
|
||||
@ -117,9 +124,12 @@ pub struct ManyToOneRingBuffer<'a> {
|
||||
correlation_id_counter_index: IndexT,
|
||||
}
|
||||
|
||||
impl<'a> ManyToOneRingBuffer<'a> {
|
||||
impl<A> ManyToOneRingBuffer<A>
|
||||
where
|
||||
A: AtomicBuffer,
|
||||
{
|
||||
/// Create a many-to-one ring buffer from an underlying atomic buffer.
|
||||
pub fn wrap(buffer: AtomicBuffer<'a>) -> Result<Self> {
|
||||
pub fn new(buffer: A) -> Result<Self> {
|
||||
buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer {
|
||||
buffer,
|
||||
capacity,
|
||||
@ -141,13 +151,16 @@ impl<'a> ManyToOneRingBuffer<'a> {
|
||||
}
|
||||
|
||||
/// Write a message into the ring buffer
|
||||
pub fn write(
|
||||
pub fn write<B>(
|
||||
&mut self,
|
||||
msg_type_id: i32,
|
||||
source: &AtomicBuffer,
|
||||
source: &B,
|
||||
source_index: IndexT,
|
||||
length: IndexT,
|
||||
) -> Result<()> {
|
||||
) -> Result<()>
|
||||
where
|
||||
B: AtomicBuffer,
|
||||
{
|
||||
record_descriptor::check_msg_type_id(msg_type_id)?;
|
||||
self.check_msg_length(length)?;
|
||||
|
||||
@ -182,10 +195,14 @@ impl<'a> ManyToOneRingBuffer<'a> {
|
||||
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
|
||||
pub fn read<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
|
||||
where
|
||||
F: FnMut(i32, &AtomicBuffer, IndexT, IndexT) -> (),
|
||||
F: FnMut(i32, &A, IndexT, IndexT) -> (),
|
||||
{
|
||||
// QUESTION: Should I implement the `get_i64` method that C++ uses?
|
||||
// UNWRAP: Bounds check performed during buffer creation
|
||||
let head = self.buffer.get_i64(self.head_position_index).unwrap();
|
||||
let head = self
|
||||
.buffer
|
||||
.get_i64_volatile(self.head_position_index)
|
||||
.unwrap();
|
||||
let head_index = (head & i64::from(self.capacity - 1)) as i32;
|
||||
let contiguous_block_length = self.capacity - head_index;
|
||||
let mut messages_read = 0;
|
||||
@ -352,22 +369,46 @@ impl<'a> ManyToOneRingBuffer<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<A> Deref for ManyToOneRingBuffer<A>
|
||||
where
|
||||
A: AtomicBuffer,
|
||||
{
|
||||
type Target = A;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.buffer
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
use crate::client::concurrent::ring_buffer::{
|
||||
buffer_descriptor, record_descriptor, ManyToOneRingBuffer,
|
||||
};
|
||||
use crate::client::concurrent::ringbuffer::{record_descriptor, ManyToOneRingBuffer};
|
||||
use crate::client::concurrent::AtomicBuffer;
|
||||
use crate::util::IndexT;
|
||||
use std::mem::size_of;
|
||||
|
||||
#[test]
|
||||
fn claim_capacity_basic() {
|
||||
let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64;
|
||||
let mut buf = vec![0u8; buf_size];
|
||||
const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize;
|
||||
|
||||
let atomic_buf = AtomicBuffer::wrap(&mut buf);
|
||||
let mut ring_buf = ManyToOneRingBuffer::wrap(atomic_buf).unwrap();
|
||||
#[test]
|
||||
fn claim_capacity_owned() {
|
||||
let mut ring_buf = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).unwrap();
|
||||
|
||||
ring_buf.claim_capacity(16).unwrap();
|
||||
assert_eq!(
|
||||
ring_buf
|
||||
.buffer
|
||||
.get_i64_volatile(ring_buf.tail_position_index),
|
||||
Ok(16)
|
||||
);
|
||||
|
||||
let write_start = ring_buf.claim_capacity(16).unwrap();
|
||||
assert_eq!(write_start, 16);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn claim_capacity_shared() {
|
||||
let buf = &mut [0u8; BUFFER_SIZE][..];
|
||||
let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap();
|
||||
|
||||
ring_buf.claim_capacity(16).unwrap();
|
||||
assert_eq!(
|
||||
@ -383,27 +424,25 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn write_basic() {
|
||||
let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize];
|
||||
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||
let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
|
||||
let mut ring_buffer =
|
||||
ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
|
||||
|
||||
let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0];
|
||||
let source_bytes = &mut [12u8, 0, 0, 0][..];
|
||||
let source_len = source_bytes.len() as IndexT;
|
||||
let source_buffer = AtomicBuffer::wrap(&mut source_bytes);
|
||||
let type_id = 1;
|
||||
ring_buffer
|
||||
.write(type_id, &source_buffer, 0, source_len)
|
||||
.write(type_id, &source_bytes, 0, source_len)
|
||||
.unwrap();
|
||||
|
||||
drop(ring_buffer);
|
||||
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||
let record_len = source_len + record_descriptor::HEADER_LENGTH;
|
||||
assert_eq!(
|
||||
buffer.get_i64_volatile(0).unwrap(),
|
||||
ring_buffer.get_i64_volatile(0).unwrap(),
|
||||
record_descriptor::make_header(record_len, type_id)
|
||||
);
|
||||
assert_eq!(
|
||||
buffer.get_i64_volatile(size_of::<i64>() as IndexT).unwrap(),
|
||||
ring_buffer
|
||||
.get_i64_volatile(size_of::<i64>() as IndexT)
|
||||
.unwrap(),
|
||||
12
|
||||
);
|
||||
}
|
||||
@ -411,41 +450,33 @@ mod tests {
|
||||
#[test]
|
||||
fn read_basic() {
|
||||
// Similar to write basic, put something into the buffer
|
||||
let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize];
|
||||
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||
let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
|
||||
let mut ring_buffer =
|
||||
ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
|
||||
|
||||
let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0];
|
||||
let source_len = source_bytes.len() as IndexT;
|
||||
let source_buffer = AtomicBuffer::wrap(&mut source_bytes);
|
||||
let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
|
||||
let source_len = source_buffer.len() as IndexT;
|
||||
let type_id = 1;
|
||||
ring_buffer
|
||||
.write(type_id, &source_buffer, 0, source_len)
|
||||
.unwrap();
|
||||
|
||||
// Now we can start the actual read process
|
||||
let c = |_, buf: &AtomicBuffer, offset, _| {
|
||||
assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12)
|
||||
};
|
||||
let c = |_, buf: &Vec<u8>, offset, _| assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12);
|
||||
ring_buffer.read(c, 1).unwrap();
|
||||
|
||||
// Make sure that the buffer was zeroed on finish
|
||||
drop(ring_buffer);
|
||||
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||
for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) {
|
||||
assert_eq!(buffer.get_i32(i).unwrap(), 0);
|
||||
assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_multiple() {
|
||||
let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize];
|
||||
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||
let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
|
||||
let mut ring_buffer =
|
||||
ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
|
||||
|
||||
let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0];
|
||||
let source_len = source_bytes.len() as IndexT;
|
||||
let source_buffer = AtomicBuffer::wrap(&mut source_bytes);
|
||||
let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
|
||||
let source_len = source_buffer.len() as IndexT;
|
||||
let type_id = 1;
|
||||
ring_buffer
|
||||
.write(type_id, &source_buffer, 0, source_len)
|
||||
@ -455,7 +486,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let mut msg_count = 0;
|
||||
let c = |_, buf: &AtomicBuffer, offset, _| {
|
||||
let c = |_, buf: &Vec<u8>, offset, _| {
|
||||
msg_count += 1;
|
||||
assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12);
|
||||
};
|
||||
@ -463,10 +494,8 @@ mod tests {
|
||||
assert_eq!(msg_count, 2);
|
||||
|
||||
// Make sure that the buffer was zeroed on finish
|
||||
drop(ring_buffer);
|
||||
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||
for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) {
|
||||
assert_eq!(buffer.get_i32(i).unwrap(), 0);
|
||||
assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,12 +0,0 @@
|
||||
//! Proxy object for interacting with the Media Driver. Handles operations
|
||||
//! involving the command-and-control file protocol.
|
||||
|
||||
use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer;
|
||||
|
||||
/// Proxy object for operations involving the Media Driver
|
||||
pub struct DriverProxy<'a> {
|
||||
_to_driver: ManyToOneRingBuffer<'a>,
|
||||
_client_id: i64,
|
||||
}
|
||||
|
||||
impl<'a> DriverProxy<'a> {}
|
@ -4,4 +4,3 @@
|
||||
pub mod cnc_descriptor;
|
||||
pub mod concurrent;
|
||||
pub mod context;
|
||||
pub mod driver_proxy;
|
||||
|
@ -1,7 +1,8 @@
|
||||
use aeron_driver_sys::*;
|
||||
use aeron_rs::client::cnc_descriptor;
|
||||
use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||
use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer;
|
||||
use aeron_rs::client::cnc_descriptor::MetaDataDefinition;
|
||||
use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer;
|
||||
use aeron_rs::client::concurrent::AtomicBuffer;
|
||||
use aeron_rs::util::IndexT;
|
||||
use memmap::MmapOptions;
|
||||
use std::ffi::{c_void, CString};
|
||||
@ -108,30 +109,29 @@ fn cnc_terminate() {
|
||||
let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH;
|
||||
|
||||
// Read metadata to get buffer length
|
||||
let buffer_len = {
|
||||
let atomic_buffer = AtomicBuffer::wrap(&mut mmap);
|
||||
let metadata = atomic_buffer
|
||||
.overlay::<cnc_descriptor::MetaDataDefinition>(0)
|
||||
.unwrap();
|
||||
metadata.to_driver_buffer_length
|
||||
};
|
||||
let buffer_len = mmap
|
||||
.overlay::<MetaDataDefinition>(0)
|
||||
.unwrap()
|
||||
.to_driver_buffer_length;
|
||||
|
||||
let buffer_end = cnc_metadata_len + buffer_len as usize;
|
||||
let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]);
|
||||
let mut ring_buffer =
|
||||
ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer");
|
||||
let mut ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end])
|
||||
.expect("Improperly sized buffer");
|
||||
|
||||
// 20 bytes: Client ID (8), correlation ID (8), token length (4)
|
||||
let mut terminate_bytes = vec![0u8; 20];
|
||||
let terminate_len = terminate_bytes.len();
|
||||
let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes);
|
||||
let client_id = ring_buffer.next_correlation_id();
|
||||
source_buffer.put_i64_ordered(0, client_id).unwrap();
|
||||
source_buffer.put_i64_ordered(8, -1).unwrap();
|
||||
terminate_bytes.put_i64_ordered(0, client_id).unwrap();
|
||||
terminate_bytes.put_i64_ordered(8, -1).unwrap();
|
||||
|
||||
let term_id: i32 = 0x0E;
|
||||
ring_buffer
|
||||
.write(term_id, &source_buffer, 0, terminate_len as IndexT)
|
||||
.write(
|
||||
term_id,
|
||||
&terminate_bytes,
|
||||
0,
|
||||
terminate_bytes.len() as IndexT,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Wait for the driver to finish
|
||||
|
Loading…
Reference in New Issue
Block a user