Merge pull request #6 from bspeice/driver_terminate

Driver Terminate
pull/9/head
bspeice 2019-10-07 19:12:49 -04:00 committed by GitHub
commit a348e90d36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1436 additions and 141 deletions

View File

@ -13,7 +13,10 @@ maintenance = { status = "actively-developed" }
[dependencies]
aeron_driver-sys = { path = "./aeron_driver-sys" }
memmap = "0.7"
num = "0.2"
[dev-dependencies]
clap = "2.33"
ctrlc = "3.1.3"
tempfile = "3.1"

View File

@ -1,3 +1,4 @@
#include <stddef.h>
#include <aeron_driver_context.h>
#include <aeronmd.h>
#include <aeron_driver_context.h>
#include <command/aeron_control_protocol.h>

View File

@ -97,6 +97,8 @@ pub fn main() {
.header("bindings.h")
.whitelist_function("aeron_.*")
.whitelist_type("aeron_.*")
.whitelist_var("AERON_.*")
.constified_enum_module("aeron_.*_enum")
.generate()
.expect("Unable to generate aeron_driver bindings");

View File

@ -1,96 +1,22 @@
//! Media driver startup example based on
//! [aeronmd.c](https://github.com/real-logic/aeron/blob/master/aeron-driver/src/main/c/aeronmd.c)
#![deny(missing_docs)]
use aeron_driver_sys::*;
use clap;
use ctrlc;
use std::ffi::CStr;
use std::os::raw::c_void;
use std::ptr;
//! A version of the `aeronmd` runner program demonstrating the Rust wrappers
//! around Media Driver functionality.
use aeron_rs::driver::DriverContext;
use std::sync::atomic::{AtomicBool, Ordering};
static RUNNING: AtomicBool = AtomicBool::new(true);
unsafe extern "C" fn termination_hook(_clientd: *mut c_void) {
RUNNING.store(false, Ordering::SeqCst);
}
static RUNNING: AtomicBool = AtomicBool::new(false);
fn main() {
let version = unsafe { CStr::from_ptr(aeron_version_full()) };
let _cmdline = clap::App::new("aeronmd")
.version(version.to_str().unwrap())
.get_matches();
let driver = DriverContext::default()
.build()
.expect("Unable to create media driver");
// TODO: Handle -D switches
let driver = driver.start().expect("Unable to start media driver");
RUNNING.store(true, Ordering::SeqCst);
ctrlc::set_handler(move || {
// TODO: Actually understand atomic ordering
RUNNING.store(false, Ordering::SeqCst);
})
.unwrap();
println!("Press Ctrl-C to quit");
let mut init_success = true;
let mut context: *mut aeron_driver_context_t = ptr::null_mut();
let mut driver: *mut aeron_driver_t = ptr::null_mut();
if init_success {
let context_init = unsafe { aeron_driver_context_init(&mut context) };
if context_init < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: context init ({}) {}", err_code, err_str);
init_success = false;
}
while RUNNING.load(Ordering::SeqCst) {
// TODO: Termination hook
driver.do_work();
}
if init_success {
let term_hook = unsafe {
aeron_driver_context_set_driver_termination_hook(
context,
Some(termination_hook),
ptr::null_mut(),
)
};
if term_hook < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!(
"ERROR: context set termination hook ({}) {}",
err_code, err_str
);
init_success = false;
}
}
if init_success {
let driver_init = unsafe { aeron_driver_init(&mut driver, context) };
if driver_init < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: driver init ({}) {}", err_code, err_str);
init_success = false;
}
}
if init_success {
let driver_start = unsafe { aeron_driver_start(driver, true) };
if driver_start < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: driver start ({}) {}", err_code, err_str);
init_success = false;
}
}
if init_success {
println!("Press Ctrl-C to exit.");
while RUNNING.load(Ordering::SeqCst) {
unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) };
}
}
unsafe { aeron_driver_close(driver) };
unsafe { aeron_driver_context_close(context) };
}

97
examples/aeronmd_sys.rs Normal file
View File

@ -0,0 +1,97 @@
//! Media driver startup example based on
//! [aeronmd.c](https://github.com/real-logic/aeron/blob/master/aeron-driver/src/main/c/aeronmd.c)
//! This example demonstrates direct usage of the -sys bindings for the Media Driver API.
use aeron_driver_sys::*;
use clap;
use ctrlc;
use std::ffi::CStr;
use std::os::raw::c_void;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
static RUNNING: AtomicBool = AtomicBool::new(true);
unsafe extern "C" fn termination_hook(_clientd: *mut c_void) {
println!("Terminated");
RUNNING.store(false, Ordering::SeqCst);
}
fn main() {
let version = unsafe { CStr::from_ptr(aeron_version_full()) };
let _cmdline = clap::App::new("aeronmd")
.version(version.to_str().unwrap())
.get_matches();
// TODO: Handle -D switches
ctrlc::set_handler(move || {
// TODO: Actually understand atomic ordering
RUNNING.store(false, Ordering::SeqCst);
})
.unwrap();
let mut init_success = true;
let mut context: *mut aeron_driver_context_t = ptr::null_mut();
let mut driver: *mut aeron_driver_t = ptr::null_mut();
if init_success {
let context_init = unsafe { aeron_driver_context_init(&mut context) };
if context_init < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: context init ({}) {}", err_code, err_str);
init_success = false;
}
}
if init_success {
let term_hook = unsafe {
aeron_driver_context_set_driver_termination_hook(
context,
Some(termination_hook),
ptr::null_mut(),
)
};
if term_hook < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!(
"ERROR: context set termination hook ({}) {}",
err_code, err_str
);
init_success = false;
}
}
if init_success {
let driver_init = unsafe { aeron_driver_init(&mut driver, context) };
if driver_init < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: driver init ({}) {}", err_code, err_str);
init_success = false;
}
}
if init_success {
let driver_start = unsafe { aeron_driver_start(driver, true) };
if driver_start < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: driver start ({}) {}", err_code, err_str);
init_success = false;
}
}
if init_success {
println!("Press Ctrl-C to exit.");
while RUNNING.load(Ordering::SeqCst) {
unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) };
}
}
unsafe { aeron_driver_close(driver) };
unsafe { aeron_driver_context_close(context) };
}

View File

@ -0,0 +1,109 @@
//! Description of the command and control file used to communicate between the Media Driver
//! and its clients.
//!
//! File layout:
//!
//! ```text
//! +-----------------------------+
//! | Meta Data |
//! +-----------------------------+
//! | to-driver Buffer |
//! +-----------------------------+
//! | to-clients Buffer |
//! +-----------------------------+
//! | Counters Metadata Buffer |
//! +-----------------------------+
//! | Counters Values Buffer |
//! +-----------------------------+
//! | Error Log |
//! +-----------------------------+
//! ```
use crate::util::bit;
use std::mem::size_of;
/// The CnC file metadata header. Layout:
///
/// ```text
/// 0 1 2 3
/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// | Aeron CnC Version |
/// +---------------------------------------------------------------+
/// | to-driver buffer length |
/// +---------------------------------------------------------------+
/// | to-clients buffer length |
/// +---------------------------------------------------------------+
/// | Counters Metadata buffer length |
/// +---------------------------------------------------------------+
/// | Counters Values buffer length |
/// +---------------------------------------------------------------+
/// | Error Log buffer length |
/// +---------------------------------------------------------------+
/// | Client Liveness Timeout |
/// | |
/// +---------------------------------------------------------------+
/// | Driver Start Timestamp |
/// | |
/// +---------------------------------------------------------------+
/// | Driver PID |
/// | |
/// +---------------------------------------------------------------+
/// ```
#[repr(C, align(4))]
pub struct MetaDataDefinition {
cnc_version: i32,
/// Size of the buffer containing data going to the media driver
pub to_driver_buffer_length: i32,
_to_client_buffer_length: i32,
_counter_metadata_buffer_length: i32,
_counter_values_buffer_length: i32,
_error_log_buffer_length: i32,
_client_liveness_timeout: i64,
_start_timestamp: i64,
_pid: i64,
}
/// Length of the metadata block in a CnC file. Note that it's not equivalent
/// to the actual struct length.
pub const META_DATA_LENGTH: usize =
bit::align_usize(size_of::<MetaDataDefinition>(), bit::CACHE_LINE_LENGTH * 2);
/// Version code for the Aeron CnC file format
pub const CNC_VERSION: i32 = crate::sematic_version_compose(0, 0, 16);
/// Filename for the CnC file located in the Aeron directory
pub const CNC_FILE: &str = "cnc.dat";
#[cfg(test)]
mod tests {
use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION};
use crate::driver::DriverContext;
use memmap::MmapOptions;
use std::fs::File;
use tempfile::tempdir;
#[test]
fn read_cnc_version() {
let temp_dir = tempdir().unwrap();
let dir = temp_dir.path().to_path_buf();
temp_dir.close().unwrap();
let _driver = DriverContext::default()
.set_aeron_dir(&dir)
.build()
.unwrap();
// Open the CnC location
let cnc_path = dir.join(CNC_FILE);
let cnc_file = File::open(&cnc_path).expect("Unable to open CnC file");
let mmap = unsafe {
MmapOptions::default()
.map(&cnc_file)
.expect("Unable to memory map CnC file")
};
let metadata: &MetaDataDefinition = unsafe { &*(mmap.as_ptr().cast()) };
assert_eq!(metadata.cnc_version, CNC_VERSION);
}
}

View File

@ -0,0 +1,252 @@
//! Buffer that is safe to use in a multi-process/multi-thread context. Typically used for
//! handling atomic updates of memory-mapped buffers.
use std::mem::size_of;
use std::ops::Deref;
use std::sync::atomic::{AtomicI64, Ordering};
use crate::util::{AeronError, IndexT, Result};
use std::ptr::{read_volatile, write_volatile};
/// Wrapper for atomic operations around an underlying byte buffer
pub struct AtomicBuffer<'a> {
buffer: &'a mut [u8],
}
impl<'a> Deref for AtomicBuffer<'a> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.buffer
}
}
impl<'a> AtomicBuffer<'a> {
/// Create an `AtomicBuffer` as a view on an underlying byte slice
pub fn wrap(buffer: &'a mut [u8]) -> Self {
AtomicBuffer { buffer }
}
fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> {
if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size {
Err(AeronError::OutOfBounds)
} else {
Ok(())
}
}
/// Overlay a struct on a buffer.
///
/// NOTE: Has the potential to cause undefined behavior if alignment is incorrect.
pub fn overlay<T>(&self, offset: IndexT) -> Result<&T>
where
T: Sized,
{
self.bounds_check(offset, size_of::<T>() as IndexT)
.map(|_| {
let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) };
unsafe { &*(offset_ptr as *const T) }
})
}
fn overlay_volatile<T>(&self, offset: IndexT) -> Result<T>
where
T: Copy,
{
self.bounds_check(offset, size_of::<T>() as IndexT)
.map(|_| {
let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) };
unsafe { read_volatile(offset_ptr as *const T) }
})
}
fn write_volatile<T>(&mut self, offset: IndexT, val: T) -> Result<()>
where
T: Copy,
{
self.bounds_check(offset, size_of::<T>() as IndexT)
.map(|_| {
let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) };
unsafe { write_volatile(offset_ptr as *mut T, val) };
})
}
/// Atomically fetch the current value at an offset, and increment by delta
///
/// ```rust
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
/// # use aeron_rs::util::AeronError;
/// let mut bytes = [0u8; 9];
/// let mut buffer = AtomicBuffer::wrap(&mut bytes);
///
/// // Simple case modifies only the first byte
/// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0));
/// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1));
///
/// // Using an offset modifies the second byte
/// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0));
/// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1));
///
/// // An offset of 2 means buffer size must be 10 to contain an `i64`
/// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds));
/// ```
pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result<i64> {
self.overlay::<AtomicI64>(offset)
.map(|a| a.fetch_add(delta, Ordering::SeqCst))
}
/// Perform a volatile read
///
/// ```rust
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
/// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0];
/// let buffer = AtomicBuffer::wrap(&mut bytes);
///
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
/// ```
pub fn get_i64_volatile(&self, offset: IndexT) -> Result<i64> {
// QUESTION: Would it be better to express this in terms of an atomic read?
self.overlay_volatile::<i64>(offset)
}
/// Perform a volatile read
///
/// ```rust
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
/// let mut bytes = [12, 0, 0, 0];
/// let buffer = AtomicBuffer::wrap(&mut bytes);
///
/// assert_eq!(buffer.get_i32_volatile(0), Ok(12));
/// ```
pub fn get_i32_volatile(&self, offset: IndexT) -> Result<i32> {
self.overlay_volatile::<i32>(offset)
}
/// Perform a volatile write of an `i64` into the buffer
///
/// ```rust
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
/// let mut bytes = [0u8; 8];
/// let mut buffer = AtomicBuffer::wrap(&mut bytes);
///
/// buffer.put_i64_ordered(0, 12);
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
/// ```
pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> {
// QUESTION: Would it be better to have callers use `write_volatile` directly
self.write_volatile::<i64>(offset, val)
}
/// Perform a volatile write of an `i32` into the buffer
///
/// ```rust
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
/// let mut bytes = [0u8; 4];
/// let mut buffer = AtomicBuffer::wrap(&mut bytes);
///
/// buffer.put_i32_ordered(0, 12);
/// assert_eq!(buffer.get_i32_volatile(0), Ok(12));
/// ```
pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> {
// QUESTION: Would it be better to have callers use `write_volatile` directly
self.write_volatile::<i32>(offset, val)
}
/// Write the contents of one buffer to another. Does not perform any synchronization.
///
/// ```rust
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
/// let mut source_bytes = [1u8, 2, 3, 4];
/// let source = AtomicBuffer::wrap(&mut source_bytes);
///
/// let mut dest_bytes = [0, 0, 0, 0];
/// let mut dest = AtomicBuffer::wrap(&mut dest_bytes);
///
/// dest.put_bytes(1, &source, 1, 3);
/// drop(dest);
/// assert_eq!(dest_bytes, [0u8, 2, 3, 4]);
/// ```
pub fn put_bytes(
&mut self,
index: IndexT,
source: &AtomicBuffer,
source_index: IndexT,
len: IndexT,
) -> Result<()> {
self.bounds_check(index, len)?;
source.bounds_check(source_index, len)?;
let index = index as usize;
let source_index = source_index as usize;
let len = len as usize;
self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]);
Ok(())
}
/// Compare an expected value with what is in memory, and if it matches,
/// update to a new value. Returns `Ok(true)` if the update was successful,
/// and `Ok(false)` if the update failed.
///
/// ```rust
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
/// let mut buf = [0u8; 8];
/// let atomic_buf = AtomicBuffer::wrap(&mut buf);
/// // Set value to 1
/// atomic_buf.get_and_add_i64(0, 1).unwrap();
///
/// // Set value to 1 if existing value is 0
/// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false));
/// // Set value to 2 if existing value is 1
/// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true));
/// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2));
/// ```
pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result<bool> {
// QUESTION: Do I need a volatile and atomic read here?
// Aeron C++ uses a volatile read before the atomic operation, but I think that
// may be redundant. In addition, Rust's `read_volatile` operation returns a
// *copied* value; running `compare_exchange` on that copy introduces a race condition
// because we're no longer comparing a consistent address.
self.overlay::<AtomicI64>(offset).map(|a| {
a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
})
}
}
#[cfg(test)]
mod tests {
use memmap::MmapOptions;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
use crate::util::AeronError;
#[test]
fn mmap_to_atomic() {
let mut mmap = MmapOptions::new()
.len(24)
.map_anon()
.expect("Unable to map anonymous memory");
AtomicBuffer::wrap(&mut mmap);
}
#[test]
fn primitive_atomic_equivalent() {
let value: u64 = 24;
let val_ptr = &value as *const u64;
let a_ptr = val_ptr as *const AtomicU64;
let a: &AtomicU64 = unsafe { &*a_ptr };
assert_eq!(value, (*a).load(Ordering::SeqCst));
}
#[test]
fn negative_offset() {
let mut buf = [16, 0, 0, 0, 0, 0, 0, 0];
let atomic_buf = AtomicBuffer::wrap(&mut buf);
assert_eq!(
atomic_buf.get_and_add_i64(-1, 0),
Err(AeronError::OutOfBounds)
)
}
}

View File

@ -0,0 +1,5 @@
//! Module for handling safe interactions among the multiple clients making use
//! of a single Media Driver
pub mod atomic_buffer;
pub mod ring_buffer;

View File

@ -0,0 +1,340 @@
//! Ring buffer wrapper for communicating with the Media Driver
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
use crate::util::{bit, AeronError, IndexT, Result};
/// Description of the Ring Buffer schema.
pub mod buffer_descriptor {
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH};
use crate::util::AeronError::IllegalArgument;
use crate::util::{IndexT, Result};
// QUESTION: Why are these offsets so large when we only ever use i64 types?
/// Offset in the ring buffer metadata to the end of the most recent record.
pub const TAIL_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 2) as IndexT;
/// QUESTION: Why the distinction between HEAD_CACHE and HEAD?
pub const HEAD_CACHE_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 4) as IndexT;
/// Offset in the ring buffer metadata to index of the next record to read.
pub const HEAD_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 6) as IndexT;
/// Offset of the correlation id counter, as measured in bytes past
/// the start of the ring buffer metadata trailer.
pub const CORRELATION_COUNTER_OFFSET: IndexT = (CACHE_LINE_LENGTH * 8) as IndexT;
/// Total size of the ring buffer metadata trailer.
pub const TRAILER_LENGTH: IndexT = (CACHE_LINE_LENGTH * 12) as IndexT;
/// Verify the capacity of a buffer is legal for use as a ring buffer.
/// Returns the actual capacity excluding ring buffer metadata.
pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result<IndexT> {
let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT;
if is_power_of_two(capacity) {
Ok(capacity)
} else {
Err(IllegalArgument)
}
}
}
/// Ring buffer message header. Made up of fields for message length, message type,
/// and then the encoded message.
///
/// Writing the record length signals the message recording is complete, and all
/// associated ring buffer metadata has been properly updated.
///
/// ```text
/// 0 1 2 3
/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// |R| Record Length |
/// +-+-------------------------------------------------------------+
/// | Type |
/// +---------------------------------------------------------------+
/// | Encoded Message ...
///... |
/// +---------------------------------------------------------------+
/// ```
// QUESTION: What is the `R` bit in the diagram above?
pub mod record_descriptor {
use std::mem::size_of;
use crate::util::Result;
use crate::util::{AeronError, IndexT};
/// Size of the ring buffer record header.
pub const HEADER_LENGTH: IndexT = size_of::<i32>() as IndexT * 2;
/// Alignment size of records written to the buffer
pub const ALIGNMENT: IndexT = HEADER_LENGTH;
/// Message type indicating to the media driver that space has been reserved,
/// and is not yet ready for processing.
pub const PADDING_MSG_TYPE_ID: i32 = -1;
/// Retrieve the header bits for a ring buffer record.
pub fn make_header(length: i32, msg_type_id: i32) -> i64 {
// QUESTION: Instead of masking, can't we just cast and return u32/u64?
// Smells like Java.
((i64::from(msg_type_id) & 0xFFFF_FFFF) << 32) | (i64::from(length) & 0xFFFF_FFFF)
}
/// Verify a message type identifier is safe for use
pub fn check_msg_type_id(msg_type_id: i32) -> Result<()> {
if msg_type_id < 1 {
Err(AeronError::IllegalArgument)
} else {
Ok(())
}
}
/// Fetch the offset to begin writing a message payload
pub fn encoded_msg_offset(record_offset: IndexT) -> IndexT {
record_offset + HEADER_LENGTH
}
/// Fetch the offset to begin writing the message length
pub fn length_offset(record_offset: IndexT) -> IndexT {
record_offset
}
}
/// Multi-producer, single-consumer ring buffer implementation.
pub struct ManyToOneRingBuffer<'a> {
buffer: AtomicBuffer<'a>,
capacity: IndexT,
max_msg_length: IndexT,
tail_position_index: IndexT,
head_cache_position_index: IndexT,
head_position_index: IndexT,
correlation_id_counter_index: IndexT,
}
impl<'a> ManyToOneRingBuffer<'a> {
/// Create a many-to-one ring buffer from an underlying atomic buffer.
pub fn wrap(buffer: AtomicBuffer<'a>) -> Result<Self> {
buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer {
buffer,
capacity,
max_msg_length: capacity / 8,
tail_position_index: capacity + buffer_descriptor::TAIL_POSITION_OFFSET,
head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET,
head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET,
correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET,
})
}
/// Atomically retrieve the next correlation identifier. Used as a unique identifier for
/// interactions with the Media Driver
pub fn next_correlation_id(&self) -> i64 {
// UNWRAP: Known-valid offset calculated during initialization
self.buffer
.get_and_add_i64(self.correlation_id_counter_index, 1)
.unwrap()
}
/// Write a message into the ring buffer
pub fn write(
&mut self,
msg_type_id: i32,
source: &AtomicBuffer,
source_index: IndexT,
length: IndexT,
) -> Result<()> {
record_descriptor::check_msg_type_id(msg_type_id)?;
self.check_msg_length(length)?;
let record_len = length + record_descriptor::HEADER_LENGTH;
let required = bit::align(record_len, record_descriptor::ALIGNMENT);
let record_index = self.claim_capacity(required)?;
// UNWRAP: `claim_capacity` performed bounds checking
self.buffer
.put_i64_ordered(
record_index,
record_descriptor::make_header(-length, msg_type_id),
)
.unwrap();
// UNWRAP: `claim_capacity` performed bounds checking
self.buffer
.put_bytes(
record_descriptor::encoded_msg_offset(record_index),
source,
source_index,
length,
)
.unwrap();
// UNWRAP: `claim_capacity` performed bounds checking
self.buffer
.put_i32_ordered(record_descriptor::length_offset(record_index), record_len)
.unwrap();
Ok(())
}
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
/// at which to start writing the next record.
fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> {
// QUESTION: Is this mask how we handle the "ring" in ring buffer?
// Would explain why we assert buffer capacity is a power of two during initialization
let mask = self.capacity - 1;
// UNWRAP: Known-valid offset calculated during initialization
let mut head = self
.buffer
.get_i64_volatile(self.head_cache_position_index)
.unwrap();
let mut tail: i64;
let mut tail_index: IndexT;
let mut padding: IndexT;
// Note the braces, making this a do-while loop
while {
// UNWRAP: Known-valid offset calculated during initialization
tail = self
.buffer
.get_i64_volatile(self.tail_position_index)
.unwrap();
let available_capacity = self.capacity - (tail - head) as IndexT;
if required > available_capacity {
// UNWRAP: Known-valid offset calculated during initialization
head = self
.buffer
.get_i64_volatile(self.head_position_index)
.unwrap();
if required > (self.capacity - (tail - head) as IndexT) {
return Err(AeronError::InsufficientCapacity);
}
// UNWRAP: Known-valid offset calculated during initialization
self.buffer
.put_i64_ordered(self.head_cache_position_index, head)
.unwrap();
}
padding = 0;
// Because we assume `tail` and `mask` are always positive integers,
// it's "safe" to widen the types and bitmask below. We're just trying
// to imitate C++ here.
tail_index = (tail & i64::from(mask)) as IndexT;
let to_buffer_end_length = self.capacity - tail_index;
if required > to_buffer_end_length {
let mut head_index = (head & i64::from(mask)) as IndexT;
if required > head_index {
// UNWRAP: Known-valid offset calculated during initialization
head = self
.buffer
.get_i64_volatile(self.head_position_index)
.unwrap();
head_index = (head & i64::from(mask)) as IndexT;
if required > head_index {
return Err(AeronError::InsufficientCapacity);
}
// UNWRAP: Known-valid offset calculated during initialization
self.buffer
.put_i64_ordered(self.head_cache_position_index, head)
.unwrap();
}
padding = to_buffer_end_length;
}
// UNWRAP: Known-valid offset calculated during initialization
!self
.buffer
.compare_and_set_i64(
self.tail_position_index,
tail,
tail + i64::from(required) + i64::from(padding),
)
.unwrap()
} {}
if padding != 0 {
// UNWRAP: Known-valid offset calculated during initialization
self.buffer
.put_i64_ordered(
tail_index,
record_descriptor::make_header(padding, record_descriptor::PADDING_MSG_TYPE_ID),
)
.unwrap();
tail_index = 0;
}
Ok(tail_index)
}
fn check_msg_length(&self, length: IndexT) -> Result<()> {
if length > self.max_msg_length {
Err(AeronError::IllegalArgument)
} else {
Ok(())
}
}
}
#[cfg(test)]
mod tests {
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
use crate::client::concurrent::ring_buffer::{
buffer_descriptor, record_descriptor, ManyToOneRingBuffer,
};
use crate::util::IndexT;
use std::mem::size_of;
#[test]
fn claim_capacity_basic() {
let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64;
let mut buf = vec![0u8; buf_size];
let atomic_buf = AtomicBuffer::wrap(&mut buf);
let mut ring_buf = ManyToOneRingBuffer::wrap(atomic_buf).unwrap();
ring_buf.claim_capacity(16).unwrap();
assert_eq!(
ring_buf
.buffer
.get_i64_volatile(ring_buf.tail_position_index),
Ok(16)
);
let write_start = ring_buf.claim_capacity(16).unwrap();
assert_eq!(write_start, 16);
}
#[test]
fn write_basic() {
let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize];
let buffer = AtomicBuffer::wrap(&mut bytes);
let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0];
let source_len = source_bytes.len() as IndexT;
let source_buffer = AtomicBuffer::wrap(&mut source_bytes);
let type_id = 1;
ring_buffer
.write(type_id, &source_buffer, 0, source_len)
.unwrap();
drop(ring_buffer);
let buffer = AtomicBuffer::wrap(&mut bytes);
let record_len = source_len + record_descriptor::HEADER_LENGTH;
assert_eq!(
buffer.get_i64_volatile(0).unwrap(),
record_descriptor::make_header(record_len, type_id)
);
assert_eq!(
buffer.get_i64_volatile(size_of::<i64>() as IndexT).unwrap(),
12
);
}
}

37
src/client/context.rs Normal file
View File

@ -0,0 +1,37 @@
//! Client library for Aeron. This encapsulates the logic needed to communicate
//! with the media driver, but does not manage the media driver itself.
use std::env;
use std::path::PathBuf;
/// Context used to initialize the Aeron client
pub struct ClientContext {
_aeron_dir: PathBuf,
}
impl ClientContext {
fn get_user_name() -> String {
env::var("USER")
.or_else(|_| env::var("USERNAME"))
.unwrap_or_else(|_| "default".to_string())
}
/// Get the default folder used by the Media Driver to interact with clients
pub fn default_aeron_path() -> PathBuf {
let base_path = if cfg!(target_os = "linux") {
PathBuf::from("/dev/shm")
} else {
// Uses TMPDIR on Unix-like and GetTempPath on Windows
env::temp_dir()
};
base_path.join(format!("aeron-{}", ClientContext::get_user_name()))
}
}
impl Default for ClientContext {
fn default() -> Self {
ClientContext {
_aeron_dir: ClientContext::default_aeron_path(),
}
}
}

View File

@ -0,0 +1,12 @@
//! Proxy object for interacting with the Media Driver. Handles operations
//! involving the command-and-control file protocol.
use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer;
/// Proxy object for operations involving the Media Driver
pub struct DriverProxy<'a> {
_to_driver: ManyToOneRingBuffer<'a>,
_client_id: i64,
}
impl<'a> DriverProxy<'a> {}

7
src/client/mod.rs Normal file
View File

@ -0,0 +1,7 @@
//! Aeron client
//!
//! These are the modules necessary to construct a functioning Aeron client
pub mod cnc_descriptor;
pub mod concurrent;
pub mod context;
pub mod driver_proxy;

View File

@ -1,44 +0,0 @@
use std::env;
use std::path::PathBuf;
const DEFAULT_MEDIA_DRIVER_TIMEOUT_MS: u16 = 10_000;
const DEFAULT_RESOURCE_LINGER_MS: u16 = 5_000;
pub struct Context {
aeron_dir: PathBuf,
media_driver_timeout_ms: i32,
resource_linger_timeout_ms: i32,
use_conductor_agent_invoker: bool,
pre_touch_mapped_memory: bool,
}
impl Context {
pub fn get_user_name() -> String {
env::var("USER")
.or_else(|_| env::var("USERNAME"))
.unwrap_or("default".to_string())
}
pub fn default_aeron_path() -> PathBuf {
let base_path = if cfg!(target_os = "linux") {
PathBuf::from("/dev/shm")
} else {
// Uses TMPDIR on Unix-like, and GetTempPath on Windows
env::temp_dir()
};
base_path.join(format!("aeron-{}", Context::get_user_name()))
}
}
impl Default for Context {
fn default() -> Self {
Context {
aeron_dir: Context::default_aeron_path(),
media_driver_timeout_ms: DEFAULT_MEDIA_DRIVER_TIMEOUT_MS.into(),
resource_linger_timeout_ms: DEFAULT_RESOURCE_LINGER_MS.into(),
use_conductor_agent_invoker: false,
pre_touch_mapped_memory: false,
}
}
}

108
src/control_protocol.rs Normal file
View File

@ -0,0 +1,108 @@
//! Utilities for interacting with the control protocol of the Media Driver
use aeron_driver_sys::*;
/// Construct a C-compatible enum out of a set of constants.
/// Commonly used for types in Aeron that have fixed values via `#define`,
/// but aren't actually enums (e.g. AERON_COMMAND_.*, AERON_ERROR_CODE_.*).
/// Behavior is ultimately very similar to `num::FromPrimitive`.
macro_rules! define_enum {
(
$(#[$outer:meta])*
pub enum $name:ident {$(
$(#[$inner:meta]),*
$left:ident = $right:ident,
)+}
) => {
#[repr(u32)]
#[derive(Debug, PartialEq)]
$(#[$outer])*
pub enum $name {$(
$(#[$inner])*
$left = $right,
)*}
impl ::std::convert::TryFrom<u32> for $name {
type Error = ();
fn try_from(val: u32) -> Result<$name, ()> {
match val {
$(v if v == $name::$left as u32 => Ok($name::$left)),*,
_ => Err(())
}
}
}
}
}
define_enum!(
#[doc = "Commands sent from clients to the Media Driver"]
pub enum ClientCommand {
#[doc = "Add a Publication"]
AddPublication = AERON_COMMAND_ADD_PUBLICATION,
#[doc = "Remove a Publication"]
RemovePublication = AERON_COMMAND_REMOVE_PUBLICATION,
#[doc = "Add an Exclusive Publication"]
AddExclusivePublication = AERON_COMMAND_ADD_EXCLUSIVE_PUBLICATION,
#[doc = "Add a Subscriber"]
AddSubscription = AERON_COMMAND_ADD_SUBSCRIPTION,
#[doc = "Remove a Subscriber"]
RemoveSubscription = AERON_COMMAND_REMOVE_SUBSCRIPTION,
#[doc = "Keepalaive from Client"]
ClientKeepalive = AERON_COMMAND_CLIENT_KEEPALIVE,
#[doc = "Add Destination to an existing Publication"]
AddDestination = AERON_COMMAND_ADD_DESTINATION,
#[doc = "Remove Destination from an existing Publication"]
RemoveDestination = AERON_COMMAND_REMOVE_DESTINATION,
#[doc = "Add a Counter to the counters manager"]
AddCounter = AERON_COMMAND_ADD_COUNTER,
#[doc = "Remove a Counter from the counters manager"]
RemoveCounter = AERON_COMMAND_REMOVE_COUNTER,
#[doc = "Close indication from Client"]
ClientClose = AERON_COMMAND_CLIENT_CLOSE,
#[doc = "Add Destination for existing Subscription"]
AddRcvDestination = AERON_COMMAND_ADD_RCV_DESTINATION,
#[doc = "Remove Destination for existing Subscription"]
RemoveRcvDestination = AERON_COMMAND_REMOVE_RCV_DESTINATION,
#[doc = "Request the driver to terminate"]
TerminateDriver = AERON_COMMAND_TERMINATE_DRIVER,
}
);
define_enum!(
#[doc = "Responses from the Media Driver to client commands"]
pub enum DriverResponse {
#[doc = "Error Response as a result of attempting to process a client command operation"]
OnError = AERON_RESPONSE_ON_ERROR,
#[doc = "Subscribed Image buffers are available notification"]
OnAvailableImage = AERON_RESPONSE_ON_AVAILABLE_IMAGE,
#[doc = "New Publication buffers are ready notification"]
OnPublicationReady = AERON_RESPONSE_ON_PUBLICATION_READY,
#[doc = "Operation has succeeded"]
OnOperationSuccess = AERON_RESPONSE_ON_OPERATION_SUCCESS,
#[doc = "Inform client of timeout and removal of an inactive Image"]
OnUnavailableImage = AERON_RESPONSE_ON_UNAVAILABLE_IMAGE,
#[doc = "New Exclusive Publication buffers are ready notification"]
OnExclusivePublicationReady = AERON_RESPONSE_ON_EXCLUSIVE_PUBLICATION_READY,
#[doc = "New Subscription is ready notification"]
OnSubscriptionReady = AERON_RESPONSE_ON_SUBSCRIPTION_READY,
#[doc = "New counter is ready notification"]
OnCounterReady = AERON_RESPONSE_ON_COUNTER_READY,
#[doc = "Inform clients of removal of counter"]
OnUnavailableCounter = AERON_RESPONSE_ON_UNAVAILABLE_COUNTER,
#[doc = "Inform clients of client timeout"]
OnClientTimeout = AERON_RESPONSE_ON_CLIENT_TIMEOUT,
}
);
#[cfg(test)]
mod tests {
use crate::control_protocol::ClientCommand;
use std::convert::TryInto;
#[test]
fn client_command_convert() {
assert_eq!(
Ok(ClientCommand::AddPublication),
::aeron_driver_sys::AERON_COMMAND_ADD_PUBLICATION.try_into()
)
}
}

224
src/driver.rs Normal file
View File

@ -0,0 +1,224 @@
//! Bindings for the C Media Driver
use std::ffi::{CStr, CString};
use std::path::Path;
use std::ptr;
use aeron_driver_sys::*;
use std::marker::PhantomData;
use std::mem::replace;
/// Error code and message returned by the Media Driver
#[derive(Debug, PartialEq)]
pub struct DriverError {
code: i32,
msg: String,
}
type Result<S> = std::result::Result<S, DriverError>;
macro_rules! aeron_op {
($op:expr) => {
if $op < 0 {
let code = ::aeron_driver_sys::aeron_errcode();
let msg = CStr::from_ptr(::aeron_driver_sys::aeron_errmsg())
.to_str()
.unwrap()
.to_string();
Err(DriverError { code, msg })
} else {
Ok(())
}
};
}
/// Context used to set up the Media Driver
#[derive(Default)]
pub struct DriverContext {
aeron_dir: Option<CString>,
dir_delete_on_start: Option<bool>,
}
impl DriverContext {
/// Set the Aeron directory path that will be used for storing the files
/// Aeron uses to communicate with clients.
pub fn set_aeron_dir(mut self, path: &Path) -> Self {
// UNWRAP: Fails only if the path is non-UTF8
let path_bytes = path.to_str().unwrap().as_bytes();
// UNWRAP: Fails only if there is a null byte in the provided path
let c_string = CString::new(path_bytes).unwrap();
self.aeron_dir = Some(c_string);
self
}
/// Set whether Aeron should attempt to delete the `aeron_dir` on startup
/// if it already exists. Aeron will attempt to remove the directory if true.
/// If `aeron_dir` is not set in the `DriverContext`, Aeron will still attempt
/// to remove the default Aeron directory.
pub fn set_dir_delete_on_start(mut self, delete: bool) -> Self {
self.dir_delete_on_start = Some(delete);
self
}
/// Construct a Media Driver given the context options
pub fn build(mut self) -> Result<MediaDriver<DriverInitialized>> {
let mut driver = MediaDriver {
c_context: ptr::null_mut(),
c_driver: ptr::null_mut(),
_state: PhantomData,
};
unsafe { aeron_op!(aeron_driver_context_init(&mut driver.c_context)) }?;
self.aeron_dir.take().map(|dir| unsafe {
aeron_op!(aeron_driver_context_set_dir(
driver.c_context,
dir.into_raw()
))
});
self.dir_delete_on_start.take().map(|delete| unsafe {
aeron_op!(aeron_driver_context_set_dir_delete_on_start(
driver.c_context,
delete
))
});
unsafe { aeron_op!(aeron_driver_init(&mut driver.c_driver, driver.c_context)) }?;
Ok(driver)
}
}
/// Holder object to interface with the Media Driver
#[derive(Debug)]
pub struct MediaDriver<S> {
c_context: *mut aeron_driver_context_t,
c_driver: *mut aeron_driver_t,
_state: PhantomData<S>,
}
/// Marker type for a MediaDriver that has yet to be started
#[derive(Debug)]
pub struct DriverInitialized;
/// Marker type for a MediaDriver that has been started
#[derive(Debug)]
pub struct DriverStarted;
impl<S> MediaDriver<S> {
/// Retrieve the C library version in (major, minor, patch) format
pub fn driver_version() -> (u32, u32, u32) {
unsafe {
(
aeron_version_major() as u32,
aeron_version_minor() as u32,
aeron_version_patch() as u32,
)
}
}
}
impl MediaDriver<DriverInitialized> {
/// Set up a new Media Driver with default options
pub fn new() -> Result<Self> {
DriverContext::default().build()
}
/// Start the Media Driver threads; does not take control of the current thread
pub fn start(mut self) -> Result<MediaDriver<DriverStarted>> {
unsafe { aeron_op!(aeron_driver_start(self.c_driver, true)) }?;
// Move the driver and context references so the drop of `self` can't trigger issues
// when the new media driver is also eventually dropped
let c_driver = replace(&mut self.c_driver, ptr::null_mut());
let c_context = replace(&mut self.c_context, ptr::null_mut());
Ok(MediaDriver {
c_driver,
c_context,
_state: PhantomData,
})
}
}
impl MediaDriver<DriverStarted> {
/// Perform a single idle cycle of the Media Driver; does not take control of
/// the current thread
pub fn do_work(&self) {
unsafe {
aeron_driver_main_idle_strategy(self.c_driver, aeron_driver_main_do_work(self.c_driver))
};
}
}
impl<S> Drop for MediaDriver<S> {
fn drop(&mut self) {
if !self.c_driver.is_null() {
unsafe { aeron_op!(aeron_driver_close(self.c_driver)) }.unwrap();
}
if !self.c_context.is_null() {
unsafe { aeron_op!(aeron_driver_context_close(self.c_context)) }.unwrap();
}
}
}
#[cfg(test)]
mod tests {
use crate::driver::{DriverContext, DriverError};
use std::ffi::CStr;
use tempfile::tempdir;
#[test]
fn multiple_startup_failure() {
let temp_dir = tempdir().unwrap();
let dir = temp_dir.path().to_path_buf();
temp_dir.close().unwrap();
let driver = DriverContext::default()
.set_aeron_dir(&dir)
.build()
.unwrap();
assert_eq!(
unsafe { CStr::from_ptr((*driver.c_context).aeron_dir) }.to_str(),
Ok(dir.to_str().unwrap())
);
drop(driver);
// Attempting to start a media driver twice in rapid succession is guaranteed
// cause an issue because the new media driver must wait for a heartbeat timeout.
let driver_res = DriverContext::default().set_aeron_dir(&dir).build();
// TODO: Why is the error message behavior different on Windows?
let expected_message = if cfg!(target_os = "windows") {
String::new()
} else {
format!("could not recreate aeron dir {}: ", dir.display())
};
assert!(driver_res.is_err());
assert_eq!(
driver_res.unwrap_err(),
DriverError {
code: 0,
msg: expected_message
}
);
}
#[test]
fn single_duty_cycle() {
let temp_dir = tempdir().unwrap();
let path = temp_dir.path().to_path_buf();
temp_dir.close().unwrap();
let driver = DriverContext::default()
.set_aeron_dir(&path)
.build()
.expect("Unable to create media driver")
.start()
.expect("Unable to start driver");
driver.do_work();
}
}

View File

@ -1,15 +1,24 @@
//! [Aeron](https://github.com/real-logic/aeron) client for Rust
#![deny(missing_docs)]
mod context;
#[cfg(target_endian = "big")]
compile_error!("Aeron is only supported on little-endian architectures");
/// Retrieve the C library version in (major, minor, patch) format
pub fn aeron_version() -> (u32, u32, u32) {
unsafe {
(
aeron_driver_sys::aeron_version_major() as u32,
aeron_driver_sys::aeron_version_minor() as u32,
aeron_driver_sys::aeron_version_patch() as u32,
)
pub mod client;
pub mod control_protocol;
pub mod driver;
pub mod util;
const fn sematic_version_compose(major: u8, minor: u8, patch: u8) -> i32 {
(major as i32) << 16 | (minor as i32) << 8 | (patch as i32)
}
#[cfg(test)]
mod tests {
use crate::sematic_version_compose;
#[test]
fn version_compose_cnc() {
assert_eq!(sematic_version_compose(0, 0, 16), 16);
}
}

64
src/util.rs Normal file
View File

@ -0,0 +1,64 @@
//! Various utility and helper bits for the Aeron client. Predominantly helpful
//! in mapping between concepts in the C++ API and Rust
/// Helper type to indicate indexing operations in Aeron, Synonymous with the
/// Aeron C++ `index_t` type. Used to imitate the Java API.
// QUESTION: Can this just be updated to be `usize` in Rust?
pub type IndexT = i32;