diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs new file mode 100644 index 0000000..179f24e --- /dev/null +++ b/src/client/concurrent/atomic_buffer.rs @@ -0,0 +1,83 @@ +//! Buffer that is safe to use in a multi-process/multi-thread context. Typically used for +//! handling atomic updates of memory-mapped buffers. +use std::mem::size_of; +use std::ops::Deref; +use std::sync::atomic::{AtomicI64, Ordering}; + +use crate::util::{AeronError, IndexT, Result}; + +/// Wrapper for atomic operations around an underlying byte buffer +pub struct AtomicBuffer<'a> { + buffer: &'a mut [u8], +} + +impl<'a> Deref for AtomicBuffer<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.buffer + } +} + +impl<'a> AtomicBuffer<'a> { + /// Create an `AtomicBuffer` as a view on an underlying byte slice + pub fn wrap(buffer: &'a mut [u8]) -> Self { + AtomicBuffer { buffer } + } + + fn bounds_check(&self, offset: IndexT, size: usize) -> Result<()> { + if self.buffer.len() - offset as usize > size { + Err(AeronError::OutOfBounds) + } else { + Ok(()) + } + } + + /// Atomically fetch the current value at an offset, and increment by delta + #[allow(clippy::cast_ptr_alignment)] + pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { + self.bounds_check(offset, size_of::()).map(|_| { + let a: &AtomicI64 = + unsafe { &*(self.buffer.as_ptr().offset(offset as isize) as *const AtomicI64) }; + println!("AtomicI64: {:p}", a); + a.fetch_add(delta, Ordering::SeqCst) + }) + } +} + +#[cfg(test)] +mod tests { + use memmap::MmapOptions; + use std::sync::atomic::{AtomicU64, Ordering}; + + use crate::client::concurrent::atomic_buffer::AtomicBuffer; + + #[test] + fn mmap_to_atomic() { + let mut mmap = MmapOptions::new() + .len(24) + .map_anon() + .expect("Unable to map anonymous memory"); + AtomicBuffer::wrap(&mut mmap); + } + + #[test] + fn primitive_atomic_equivalent() { + let value: u64 = 24; + + let val_ptr = &value as *const u64; + let a_ptr = val_ptr as *const AtomicU64; + let a: &AtomicU64 = unsafe { &*a_ptr }; + + assert_eq!(value, (*a).load(Ordering::SeqCst)); + } + + #[test] + fn atomic_i64_increment() { + let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; + + let atomic_buf = AtomicBuffer::wrap(&mut buf[..]); + assert_eq!(atomic_buf.get_and_add_i64(0, 1), Ok(16)); + assert_eq!(atomic_buf.get_and_add_i64(0, 0), Ok(17)); + } +} diff --git a/src/client/concurrent/mod.rs b/src/client/concurrent/mod.rs new file mode 100644 index 0000000..06eec65 --- /dev/null +++ b/src/client/concurrent/mod.rs @@ -0,0 +1,5 @@ +//! Module for handling safe interactions among the multiple clients making use +//! of a single Media Driver + +pub mod atomic_buffer; +pub mod ring_buffer; diff --git a/src/client/concurrent/ring_buffer.rs b/src/client/concurrent/ring_buffer.rs new file mode 100644 index 0000000..f13ae53 --- /dev/null +++ b/src/client/concurrent/ring_buffer.rs @@ -0,0 +1,81 @@ +//! Ring buffer wrapper for communicating with the Media Driver +use crate::client::concurrent::atomic_buffer::AtomicBuffer; +use crate::util::{IndexT, Result}; + +/// Description of the Ring Buffer schema. Each Ring Buffer looks like: +/// +/// ```text +/// 0 1 2 3 +/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/// | Buffer Data ... +/// ... | +/// +---------------------------------------------------------------+ +/// | | +/// | Tail Position | +/// | | +/// | | +/// +---------------------------------------------------------------+ +/// | | +/// | Head Cache Position | +/// | | +/// | | +/// +---------------------------------------------------------------+ +/// | | +/// | Head Position | +/// | | +/// | | +/// +---------------------------------------------------------------+ +/// | | +/// | Correlation ID Counter | +/// | | +/// | | +/// +---------------------------------------------------------------+ +/// | | +/// | Consumer Heartbeat | +/// | | +/// | | +/// +---------------------------------------------------------------+ +/// ``` +pub mod descriptor { + use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::util::AeronError::IllegalArgument; + use crate::util::{is_power_of_two, IndexT, Result, CACHE_LINE_LENGTH}; + + /// Offset of the correlation id counter, as measured in bytes past + /// the start of the ring buffer metadata trailer + pub const CORRELATION_COUNTER_OFFSET: usize = CACHE_LINE_LENGTH * 8; + + /// Total size of the ring buffer metadata trailer + pub const TRAILER_LENGTH: usize = CACHE_LINE_LENGTH * 12; + + /// Verify the capacity of a buffer is legal for use as a ring buffer. + /// Returns the actual buffer capacity once ring buffer metadata has been removed. + pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result { + let capacity = (buffer.len() - TRAILER_LENGTH) as IndexT; + if is_power_of_two(capacity) { + Ok(capacity) + } else { + Err(IllegalArgument) + } + } +} + +/// Multi-producer, single-consumer ring buffer implementation. +pub struct ManyToOneRingBuffer<'a> { + _buffer: AtomicBuffer<'a>, + _capacity: IndexT, + _correlation_counter_offset: IndexT, +} + +impl<'a> ManyToOneRingBuffer<'a> { + /// Create a many-to-one ring buffer from an underlying atomic buffer + pub fn wrap(buffer: AtomicBuffer<'a>) -> Result { + descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { + _buffer: buffer, + _capacity: capacity, + _correlation_counter_offset: capacity + + descriptor::CORRELATION_COUNTER_OFFSET as IndexT, + }) + } +} diff --git a/src/client/driver_proxy.rs b/src/client/driver_proxy.rs new file mode 100644 index 0000000..b1a84e4 --- /dev/null +++ b/src/client/driver_proxy.rs @@ -0,0 +1,12 @@ +//! Proxy object for interacting with the Media Driver. Handles operations +//! involving the command-and-control file protocol. + +use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer; + +/// Proxy object for operations involving the Media Driver +pub struct DriverProxy<'a> { + _to_driver: ManyToOneRingBuffer<'a>, + _client_id: i64, +} + +impl<'a> DriverProxy<'a> {} diff --git a/src/client/mod.rs b/src/client/mod.rs index 969153c..b00370e 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -2,4 +2,6 @@ //! //! These are the modules necessary to construct a functioning Aeron client pub mod cnc_descriptor; +pub mod concurrent; pub mod context; +pub mod driver_proxy; diff --git a/src/lib.rs b/src/lib.rs index f593f40..9421c8b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,13 @@ //! [Aeron](https://github.com/real-logic/aeron) client for Rust #![deny(missing_docs)] +#[cfg(target_endian = "big")] +compile_error!("Aeron is only supported on little-endian architectures"); + pub mod client; pub mod control_protocol; pub mod driver; +pub mod util; const fn sematic_version_compose(major: u8, minor: u8, patch: u8) -> i32 { (major as i32) << 16 | (minor as i32) << 8 | (patch as i32) diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..2c91323 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,27 @@ +//! Various utility and helper bits for the Aeron client. Predominantly helpful +//! in mapping between concepts in the C++ API and Rust + +/// Helper type to indicate indexing operations in Aeron, Synonymous with the +/// Aeron C++ `index_t` type. Used to imitate the Java API. +pub type IndexT = i32; + +/// Helper method for quick verification that `IndexT` is a positive power of two +pub fn is_power_of_two(idx: IndexT) -> bool { + idx > 0 && (idx as u32).is_power_of_two() +} + +/// Length of the data blocks used by the CPU cache sub-system in bytes +pub const CACHE_LINE_LENGTH: usize = 64; + +/// Error types from operations in the Aeron client. Synonymous with the exceptions +/// generated by the C++ client. +#[derive(Debug, PartialEq)] +pub enum AeronError { + /// Indication that an argument provided is an illegal value + IllegalArgument, + /// Indication that a memory access would exceed the allowable bounds + OutOfBounds, +} + +/// Result type for operations in the Aeron client +pub type Result = ::std::result::Result;