1
0
mirror of https://github.com/bspeice/aeron-rs synced 2024-12-22 05:48:10 -05:00

Start work on the atomic buffer

C++ Aeron doesn't seem to care about alignment, so I guess I won't either?
This commit is contained in:
Bradlee Speice 2019-10-03 22:10:34 -04:00
parent 5527495b09
commit c463c96170
7 changed files with 214 additions and 0 deletions

View File

@ -0,0 +1,83 @@
//! Buffer that is safe to use in a multi-process/multi-thread context. Typically used for
//! handling atomic updates of memory-mapped buffers.
use std::mem::size_of;
use std::ops::Deref;
use std::sync::atomic::{AtomicI64, Ordering};
use crate::util::{AeronError, IndexT, Result};
/// Wrapper for atomic operations around an underlying byte buffer
pub struct AtomicBuffer<'a> {
buffer: &'a mut [u8],
}
impl<'a> Deref for AtomicBuffer<'a> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.buffer
}
}
impl<'a> AtomicBuffer<'a> {
/// Create an `AtomicBuffer` as a view on an underlying byte slice
pub fn wrap(buffer: &'a mut [u8]) -> Self {
AtomicBuffer { buffer }
}
fn bounds_check(&self, offset: IndexT, size: usize) -> Result<()> {
if self.buffer.len() - offset as usize > size {
Err(AeronError::OutOfBounds)
} else {
Ok(())
}
}
/// Atomically fetch the current value at an offset, and increment by delta
#[allow(clippy::cast_ptr_alignment)]
pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result<i64> {
self.bounds_check(offset, size_of::<AtomicI64>()).map(|_| {
let a: &AtomicI64 =
unsafe { &*(self.buffer.as_ptr().offset(offset as isize) as *const AtomicI64) };
println!("AtomicI64: {:p}", a);
a.fetch_add(delta, Ordering::SeqCst)
})
}
}
#[cfg(test)]
mod tests {
use memmap::MmapOptions;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
#[test]
fn mmap_to_atomic() {
let mut mmap = MmapOptions::new()
.len(24)
.map_anon()
.expect("Unable to map anonymous memory");
AtomicBuffer::wrap(&mut mmap);
}
#[test]
fn primitive_atomic_equivalent() {
let value: u64 = 24;
let val_ptr = &value as *const u64;
let a_ptr = val_ptr as *const AtomicU64;
let a: &AtomicU64 = unsafe { &*a_ptr };
assert_eq!(value, (*a).load(Ordering::SeqCst));
}
#[test]
fn atomic_i64_increment() {
let mut buf = [16, 0, 0, 0, 0, 0, 0, 0];
let atomic_buf = AtomicBuffer::wrap(&mut buf[..]);
assert_eq!(atomic_buf.get_and_add_i64(0, 1), Ok(16));
assert_eq!(atomic_buf.get_and_add_i64(0, 0), Ok(17));
}
}

View File

@ -0,0 +1,5 @@
//! Module for handling safe interactions among the multiple clients making use
//! of a single Media Driver
pub mod atomic_buffer;
pub mod ring_buffer;

View File

@ -0,0 +1,81 @@
//! Ring buffer wrapper for communicating with the Media Driver
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
use crate::util::{IndexT, Result};
/// Description of the Ring Buffer schema. Each Ring Buffer looks like:
///
/// ```text
/// 0 1 2 3
/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// | Buffer Data ...
/// ... |
/// +---------------------------------------------------------------+
/// | |
/// | Tail Position |
/// | |
/// | |
/// +---------------------------------------------------------------+
/// | |
/// | Head Cache Position |
/// | |
/// | |
/// +---------------------------------------------------------------+
/// | |
/// | Head Position |
/// | |
/// | |
/// +---------------------------------------------------------------+
/// | |
/// | Correlation ID Counter |
/// | |
/// | |
/// +---------------------------------------------------------------+
/// | |
/// | Consumer Heartbeat |
/// | |
/// | |
/// +---------------------------------------------------------------+
/// ```
pub mod descriptor {
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
use crate::util::AeronError::IllegalArgument;
use crate::util::{is_power_of_two, IndexT, Result, CACHE_LINE_LENGTH};
/// Offset of the correlation id counter, as measured in bytes past
/// the start of the ring buffer metadata trailer
pub const CORRELATION_COUNTER_OFFSET: usize = CACHE_LINE_LENGTH * 8;
/// Total size of the ring buffer metadata trailer
pub const TRAILER_LENGTH: usize = CACHE_LINE_LENGTH * 12;
/// Verify the capacity of a buffer is legal for use as a ring buffer.
/// Returns the actual buffer capacity once ring buffer metadata has been removed.
pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result<IndexT> {
let capacity = (buffer.len() - TRAILER_LENGTH) as IndexT;
if is_power_of_two(capacity) {
Ok(capacity)
} else {
Err(IllegalArgument)
}
}
}
/// Multi-producer, single-consumer ring buffer implementation.
pub struct ManyToOneRingBuffer<'a> {
_buffer: AtomicBuffer<'a>,
_capacity: IndexT,
_correlation_counter_offset: IndexT,
}
impl<'a> ManyToOneRingBuffer<'a> {
/// Create a many-to-one ring buffer from an underlying atomic buffer
pub fn wrap(buffer: AtomicBuffer<'a>) -> Result<Self> {
descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer {
_buffer: buffer,
_capacity: capacity,
_correlation_counter_offset: capacity
+ descriptor::CORRELATION_COUNTER_OFFSET as IndexT,
})
}
}

View File

@ -0,0 +1,12 @@
//! Proxy object for interacting with the Media Driver. Handles operations
//! involving the command-and-control file protocol.
use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer;
/// Proxy object for operations involving the Media Driver
pub struct DriverProxy<'a> {
_to_driver: ManyToOneRingBuffer<'a>,
_client_id: i64,
}
impl<'a> DriverProxy<'a> {}

View File

@ -2,4 +2,6 @@
//! //!
//! These are the modules necessary to construct a functioning Aeron client //! These are the modules necessary to construct a functioning Aeron client
pub mod cnc_descriptor; pub mod cnc_descriptor;
pub mod concurrent;
pub mod context; pub mod context;
pub mod driver_proxy;

View File

@ -1,9 +1,13 @@
//! [Aeron](https://github.com/real-logic/aeron) client for Rust //! [Aeron](https://github.com/real-logic/aeron) client for Rust
#![deny(missing_docs)] #![deny(missing_docs)]
#[cfg(target_endian = "big")]
compile_error!("Aeron is only supported on little-endian architectures");
pub mod client; pub mod client;
pub mod control_protocol; pub mod control_protocol;
pub mod driver; pub mod driver;
pub mod util;
const fn sematic_version_compose(major: u8, minor: u8, patch: u8) -> i32 { const fn sematic_version_compose(major: u8, minor: u8, patch: u8) -> i32 {
(major as i32) << 16 | (minor as i32) << 8 | (patch as i32) (major as i32) << 16 | (minor as i32) << 8 | (patch as i32)

27
src/util.rs Normal file
View File

@ -0,0 +1,27 @@
//! Various utility and helper bits for the Aeron client. Predominantly helpful
//! in mapping between concepts in the C++ API and Rust
/// Helper type to indicate indexing operations in Aeron, Synonymous with the
/// Aeron C++ `index_t` type. Used to imitate the Java API.
pub type IndexT = i32;
/// Helper method for quick verification that `IndexT` is a positive power of two
pub fn is_power_of_two(idx: IndexT) -> bool {
idx > 0 && (idx as u32).is_power_of_two()
}
/// Length of the data blocks used by the CPU cache sub-system in bytes
pub const CACHE_LINE_LENGTH: usize = 64;
/// Error types from operations in the Aeron client. Synonymous with the exceptions
/// generated by the C++ client.
#[derive(Debug, PartialEq)]
pub enum AeronError {
/// Indication that an argument provided is an illegal value
IllegalArgument,
/// Indication that a memory access would exceed the allowable bounds
OutOfBounds,
}
/// Result type for operations in the Aeron client
pub type Result<T> = ::std::result::Result<T, AeronError>;