diff --git a/Cargo.toml b/Cargo.toml index 7d82096..b999168 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,10 @@ maintenance = { status = "actively-developed" } [dependencies] aeron_driver-sys = { path = "./aeron_driver-sys" } +memmap = "0.7" +num = "0.2" [dev-dependencies] clap = "2.33" ctrlc = "3.1.3" +tempfile = "3.1" diff --git a/aeron_driver-sys/bindings.h b/aeron_driver-sys/bindings.h index 39072ee..686e2df 100644 --- a/aeron_driver-sys/bindings.h +++ b/aeron_driver-sys/bindings.h @@ -1,3 +1,4 @@ #include -#include #include +#include +#include diff --git a/aeron_driver-sys/build.rs b/aeron_driver-sys/build.rs index a8fa6d2..451b4b2 100644 --- a/aeron_driver-sys/build.rs +++ b/aeron_driver-sys/build.rs @@ -97,6 +97,8 @@ pub fn main() { .header("bindings.h") .whitelist_function("aeron_.*") .whitelist_type("aeron_.*") + .whitelist_var("AERON_.*") + .constified_enum_module("aeron_.*_enum") .generate() .expect("Unable to generate aeron_driver bindings"); diff --git a/examples/aeronmd.rs b/examples/aeronmd.rs index 9d91217..0cb7672 100644 --- a/examples/aeronmd.rs +++ b/examples/aeronmd.rs @@ -1,96 +1,22 @@ -//! Media driver startup example based on -//! [aeronmd.c](https://github.com/real-logic/aeron/blob/master/aeron-driver/src/main/c/aeronmd.c) -#![deny(missing_docs)] - -use aeron_driver_sys::*; -use clap; -use ctrlc; -use std::ffi::CStr; -use std::os::raw::c_void; -use std::ptr; +//! A version of the `aeronmd` runner program demonstrating the Rust wrappers +//! around Media Driver functionality. +use aeron_rs::driver::DriverContext; use std::sync::atomic::{AtomicBool, Ordering}; -static RUNNING: AtomicBool = AtomicBool::new(true); - -unsafe extern "C" fn termination_hook(_clientd: *mut c_void) { - RUNNING.store(false, Ordering::SeqCst); -} +static RUNNING: AtomicBool = AtomicBool::new(false); fn main() { - let version = unsafe { CStr::from_ptr(aeron_version_full()) }; - let _cmdline = clap::App::new("aeronmd") - .version(version.to_str().unwrap()) - .get_matches(); + let driver = DriverContext::default() + .build() + .expect("Unable to create media driver"); - // TODO: Handle -D switches + let driver = driver.start().expect("Unable to start media driver"); + RUNNING.store(true, Ordering::SeqCst); - ctrlc::set_handler(move || { - // TODO: Actually understand atomic ordering - RUNNING.store(false, Ordering::SeqCst); - }) - .unwrap(); + println!("Press Ctrl-C to quit"); - let mut init_success = true; - let mut context: *mut aeron_driver_context_t = ptr::null_mut(); - let mut driver: *mut aeron_driver_t = ptr::null_mut(); - - if init_success { - let context_init = unsafe { aeron_driver_context_init(&mut context) }; - if context_init < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!("ERROR: context init ({}) {}", err_code, err_str); - init_success = false; - } + while RUNNING.load(Ordering::SeqCst) { + // TODO: Termination hook + driver.do_work(); } - - if init_success { - let term_hook = unsafe { - aeron_driver_context_set_driver_termination_hook( - context, - Some(termination_hook), - ptr::null_mut(), - ) - }; - if term_hook < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!( - "ERROR: context set termination hook ({}) {}", - err_code, err_str - ); - init_success = false; - } - } - - if init_success { - let driver_init = unsafe { aeron_driver_init(&mut driver, context) }; - if driver_init < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!("ERROR: driver init ({}) {}", err_code, err_str); - init_success = false; - } - } - - if init_success { - let driver_start = unsafe { aeron_driver_start(driver, true) }; - if driver_start < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!("ERROR: driver start ({}) {}", err_code, err_str); - init_success = false; - } - } - - if init_success { - println!("Press Ctrl-C to exit."); - - while RUNNING.load(Ordering::SeqCst) { - unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) }; - } - } - - unsafe { aeron_driver_close(driver) }; - unsafe { aeron_driver_context_close(context) }; } diff --git a/examples/aeronmd_sys.rs b/examples/aeronmd_sys.rs new file mode 100644 index 0000000..a3dcb5f --- /dev/null +++ b/examples/aeronmd_sys.rs @@ -0,0 +1,97 @@ +//! Media driver startup example based on +//! [aeronmd.c](https://github.com/real-logic/aeron/blob/master/aeron-driver/src/main/c/aeronmd.c) +//! This example demonstrates direct usage of the -sys bindings for the Media Driver API. + +use aeron_driver_sys::*; +use clap; +use ctrlc; +use std::ffi::CStr; +use std::os::raw::c_void; +use std::ptr; +use std::sync::atomic::{AtomicBool, Ordering}; + +static RUNNING: AtomicBool = AtomicBool::new(true); + +unsafe extern "C" fn termination_hook(_clientd: *mut c_void) { + println!("Terminated"); + RUNNING.store(false, Ordering::SeqCst); +} + +fn main() { + let version = unsafe { CStr::from_ptr(aeron_version_full()) }; + let _cmdline = clap::App::new("aeronmd") + .version(version.to_str().unwrap()) + .get_matches(); + + // TODO: Handle -D switches + + ctrlc::set_handler(move || { + // TODO: Actually understand atomic ordering + RUNNING.store(false, Ordering::SeqCst); + }) + .unwrap(); + + let mut init_success = true; + let mut context: *mut aeron_driver_context_t = ptr::null_mut(); + let mut driver: *mut aeron_driver_t = ptr::null_mut(); + + if init_success { + let context_init = unsafe { aeron_driver_context_init(&mut context) }; + if context_init < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!("ERROR: context init ({}) {}", err_code, err_str); + init_success = false; + } + } + + if init_success { + let term_hook = unsafe { + aeron_driver_context_set_driver_termination_hook( + context, + Some(termination_hook), + ptr::null_mut(), + ) + }; + if term_hook < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!( + "ERROR: context set termination hook ({}) {}", + err_code, err_str + ); + init_success = false; + } + } + + if init_success { + let driver_init = unsafe { aeron_driver_init(&mut driver, context) }; + if driver_init < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!("ERROR: driver init ({}) {}", err_code, err_str); + init_success = false; + } + } + + if init_success { + let driver_start = unsafe { aeron_driver_start(driver, true) }; + if driver_start < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!("ERROR: driver start ({}) {}", err_code, err_str); + init_success = false; + } + } + + if init_success { + println!("Press Ctrl-C to exit."); + + while RUNNING.load(Ordering::SeqCst) { + unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) }; + } + } + + unsafe { aeron_driver_close(driver) }; + unsafe { aeron_driver_context_close(context) }; +} diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs new file mode 100644 index 0000000..4756f92 --- /dev/null +++ b/src/client/cnc_descriptor.rs @@ -0,0 +1,109 @@ +//! Description of the command and control file used to communicate between the Media Driver +//! and its clients. +//! +//! File layout: +//! +//! ```text +//! +-----------------------------+ +//! | Meta Data | +//! +-----------------------------+ +//! | to-driver Buffer | +//! +-----------------------------+ +//! | to-clients Buffer | +//! +-----------------------------+ +//! | Counters Metadata Buffer | +//! +-----------------------------+ +//! | Counters Values Buffer | +//! +-----------------------------+ +//! | Error Log | +//! +-----------------------------+ +//! ``` + +use crate::util::bit; +use std::mem::size_of; + +/// The CnC file metadata header. Layout: +/// +/// ```text +/// 0 1 2 3 +/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/// | Aeron CnC Version | +/// +---------------------------------------------------------------+ +/// | to-driver buffer length | +/// +---------------------------------------------------------------+ +/// | to-clients buffer length | +/// +---------------------------------------------------------------+ +/// | Counters Metadata buffer length | +/// +---------------------------------------------------------------+ +/// | Counters Values buffer length | +/// +---------------------------------------------------------------+ +/// | Error Log buffer length | +/// +---------------------------------------------------------------+ +/// | Client Liveness Timeout | +/// | | +/// +---------------------------------------------------------------+ +/// | Driver Start Timestamp | +/// | | +/// +---------------------------------------------------------------+ +/// | Driver PID | +/// | | +/// +---------------------------------------------------------------+ +/// ``` +#[repr(C, align(4))] +pub struct MetaDataDefinition { + cnc_version: i32, + /// Size of the buffer containing data going to the media driver + pub to_driver_buffer_length: i32, + _to_client_buffer_length: i32, + _counter_metadata_buffer_length: i32, + _counter_values_buffer_length: i32, + _error_log_buffer_length: i32, + _client_liveness_timeout: i64, + _start_timestamp: i64, + _pid: i64, +} + +/// Length of the metadata block in a CnC file. Note that it's not equivalent +/// to the actual struct length. +pub const META_DATA_LENGTH: usize = + bit::align_usize(size_of::(), bit::CACHE_LINE_LENGTH * 2); + +/// Version code for the Aeron CnC file format +pub const CNC_VERSION: i32 = crate::sematic_version_compose(0, 0, 16); + +/// Filename for the CnC file located in the Aeron directory +pub const CNC_FILE: &str = "cnc.dat"; + +#[cfg(test)] +mod tests { + use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION}; + use crate::driver::DriverContext; + use memmap::MmapOptions; + use std::fs::File; + use tempfile::tempdir; + + #[test] + fn read_cnc_version() { + let temp_dir = tempdir().unwrap(); + let dir = temp_dir.path().to_path_buf(); + temp_dir.close().unwrap(); + + let _driver = DriverContext::default() + .set_aeron_dir(&dir) + .build() + .unwrap(); + + // Open the CnC location + let cnc_path = dir.join(CNC_FILE); + let cnc_file = File::open(&cnc_path).expect("Unable to open CnC file"); + let mmap = unsafe { + MmapOptions::default() + .map(&cnc_file) + .expect("Unable to memory map CnC file") + }; + + let metadata: &MetaDataDefinition = unsafe { &*(mmap.as_ptr().cast()) }; + assert_eq!(metadata.cnc_version, CNC_VERSION); + } +} diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs new file mode 100644 index 0000000..f96b929 --- /dev/null +++ b/src/client/concurrent/atomic_buffer.rs @@ -0,0 +1,252 @@ +//! Buffer that is safe to use in a multi-process/multi-thread context. Typically used for +//! handling atomic updates of memory-mapped buffers. +use std::mem::size_of; +use std::ops::Deref; +use std::sync::atomic::{AtomicI64, Ordering}; + +use crate::util::{AeronError, IndexT, Result}; +use std::ptr::{read_volatile, write_volatile}; + +/// Wrapper for atomic operations around an underlying byte buffer +pub struct AtomicBuffer<'a> { + buffer: &'a mut [u8], +} + +impl<'a> Deref for AtomicBuffer<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.buffer + } +} + +impl<'a> AtomicBuffer<'a> { + /// Create an `AtomicBuffer` as a view on an underlying byte slice + pub fn wrap(buffer: &'a mut [u8]) -> Self { + AtomicBuffer { buffer } + } + + fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { + if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size { + Err(AeronError::OutOfBounds) + } else { + Ok(()) + } + } + + /// Overlay a struct on a buffer. + /// + /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. + pub fn overlay(&self, offset: IndexT) -> Result<&T> + where + T: Sized, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + unsafe { &*(offset_ptr as *const T) } + }) + } + + fn overlay_volatile(&self, offset: IndexT) -> Result + where + T: Copy, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + unsafe { read_volatile(offset_ptr as *const T) } + }) + } + + fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> + where + T: Copy, + { + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; + unsafe { write_volatile(offset_ptr as *mut T, val) }; + }) + } + + /// Atomically fetch the current value at an offset, and increment by delta + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// # use aeron_rs::util::AeronError; + /// let mut bytes = [0u8; 9]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// // Simple case modifies only the first byte + /// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0)); + /// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1)); + /// + /// // Using an offset modifies the second byte + /// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0)); + /// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1)); + /// + /// // An offset of 2 means buffer size must be 10 to contain an `i64` + /// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds)); + /// ``` + pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { + self.overlay::(offset) + .map(|a| a.fetch_add(delta, Ordering::SeqCst)) + } + + /// Perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + /// let buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` + pub fn get_i64_volatile(&self, offset: IndexT) -> Result { + // QUESTION: Would it be better to express this in terms of an atomic read? + self.overlay_volatile::(offset) + } + + /// Perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [12, 0, 0, 0]; + /// let buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); + /// ``` + pub fn get_i32_volatile(&self, offset: IndexT) -> Result { + self.overlay_volatile::(offset) + } + + /// Perform a volatile write of an `i64` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [0u8; 8]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// buffer.put_i64_ordered(0, 12); + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` + pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> { + // QUESTION: Would it be better to have callers use `write_volatile` directly + self.write_volatile::(offset, val) + } + + /// Perform a volatile write of an `i32` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [0u8; 4]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// buffer.put_i32_ordered(0, 12); + /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); + /// ``` + pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> { + // QUESTION: Would it be better to have callers use `write_volatile` directly + self.write_volatile::(offset, val) + } + + /// Write the contents of one buffer to another. Does not perform any synchronization. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut source_bytes = [1u8, 2, 3, 4]; + /// let source = AtomicBuffer::wrap(&mut source_bytes); + /// + /// let mut dest_bytes = [0, 0, 0, 0]; + /// let mut dest = AtomicBuffer::wrap(&mut dest_bytes); + /// + /// dest.put_bytes(1, &source, 1, 3); + /// drop(dest); + /// assert_eq!(dest_bytes, [0u8, 2, 3, 4]); + /// ``` + pub fn put_bytes( + &mut self, + index: IndexT, + source: &AtomicBuffer, + source_index: IndexT, + len: IndexT, + ) -> Result<()> { + self.bounds_check(index, len)?; + source.bounds_check(source_index, len)?; + + let index = index as usize; + let source_index = source_index as usize; + let len = len as usize; + self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]); + Ok(()) + } + + /// Compare an expected value with what is in memory, and if it matches, + /// update to a new value. Returns `Ok(true)` if the update was successful, + /// and `Ok(false)` if the update failed. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut buf = [0u8; 8]; + /// let atomic_buf = AtomicBuffer::wrap(&mut buf); + /// // Set value to 1 + /// atomic_buf.get_and_add_i64(0, 1).unwrap(); + /// + /// // Set value to 1 if existing value is 0 + /// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); + /// // Set value to 2 if existing value is 1 + /// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); + /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); + /// ``` + pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { + // QUESTION: Do I need a volatile and atomic read here? + // Aeron C++ uses a volatile read before the atomic operation, but I think that + // may be redundant. In addition, Rust's `read_volatile` operation returns a + // *copied* value; running `compare_exchange` on that copy introduces a race condition + // because we're no longer comparing a consistent address. + self.overlay::(offset).map(|a| { + a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + }) + } +} + +#[cfg(test)] +mod tests { + use memmap::MmapOptions; + use std::sync::atomic::{AtomicU64, Ordering}; + + use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::util::AeronError; + + #[test] + fn mmap_to_atomic() { + let mut mmap = MmapOptions::new() + .len(24) + .map_anon() + .expect("Unable to map anonymous memory"); + AtomicBuffer::wrap(&mut mmap); + } + + #[test] + fn primitive_atomic_equivalent() { + let value: u64 = 24; + + let val_ptr = &value as *const u64; + let a_ptr = val_ptr as *const AtomicU64; + let a: &AtomicU64 = unsafe { &*a_ptr }; + + assert_eq!(value, (*a).load(Ordering::SeqCst)); + } + + #[test] + fn negative_offset() { + let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; + let atomic_buf = AtomicBuffer::wrap(&mut buf); + assert_eq!( + atomic_buf.get_and_add_i64(-1, 0), + Err(AeronError::OutOfBounds) + ) + } +} diff --git a/src/client/concurrent/mod.rs b/src/client/concurrent/mod.rs new file mode 100644 index 0000000..06eec65 --- /dev/null +++ b/src/client/concurrent/mod.rs @@ -0,0 +1,5 @@ +//! Module for handling safe interactions among the multiple clients making use +//! of a single Media Driver + +pub mod atomic_buffer; +pub mod ring_buffer; diff --git a/src/client/concurrent/ring_buffer.rs b/src/client/concurrent/ring_buffer.rs new file mode 100644 index 0000000..fbc0982 --- /dev/null +++ b/src/client/concurrent/ring_buffer.rs @@ -0,0 +1,340 @@ +//! Ring buffer wrapper for communicating with the Media Driver +use crate::client::concurrent::atomic_buffer::AtomicBuffer; +use crate::util::{bit, AeronError, IndexT, Result}; + +/// Description of the Ring Buffer schema. +pub mod buffer_descriptor { + use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH}; + use crate::util::AeronError::IllegalArgument; + use crate::util::{IndexT, Result}; + + // QUESTION: Why are these offsets so large when we only ever use i64 types? + + /// Offset in the ring buffer metadata to the end of the most recent record. + pub const TAIL_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 2) as IndexT; + + /// QUESTION: Why the distinction between HEAD_CACHE and HEAD? + pub const HEAD_CACHE_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 4) as IndexT; + + /// Offset in the ring buffer metadata to index of the next record to read. + pub const HEAD_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 6) as IndexT; + + /// Offset of the correlation id counter, as measured in bytes past + /// the start of the ring buffer metadata trailer. + pub const CORRELATION_COUNTER_OFFSET: IndexT = (CACHE_LINE_LENGTH * 8) as IndexT; + + /// Total size of the ring buffer metadata trailer. + pub const TRAILER_LENGTH: IndexT = (CACHE_LINE_LENGTH * 12) as IndexT; + + /// Verify the capacity of a buffer is legal for use as a ring buffer. + /// Returns the actual capacity excluding ring buffer metadata. + pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result { + let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT; + if is_power_of_two(capacity) { + Ok(capacity) + } else { + Err(IllegalArgument) + } + } +} + +/// Ring buffer message header. Made up of fields for message length, message type, +/// and then the encoded message. +/// +/// Writing the record length signals the message recording is complete, and all +/// associated ring buffer metadata has been properly updated. +/// +/// ```text +/// 0 1 2 3 +/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/// |R| Record Length | +/// +-+-------------------------------------------------------------+ +/// | Type | +/// +---------------------------------------------------------------+ +/// | Encoded Message ... +///... | +/// +---------------------------------------------------------------+ +/// ``` +// QUESTION: What is the `R` bit in the diagram above? +pub mod record_descriptor { + use std::mem::size_of; + + use crate::util::Result; + use crate::util::{AeronError, IndexT}; + + /// Size of the ring buffer record header. + pub const HEADER_LENGTH: IndexT = size_of::() as IndexT * 2; + + /// Alignment size of records written to the buffer + pub const ALIGNMENT: IndexT = HEADER_LENGTH; + + /// Message type indicating to the media driver that space has been reserved, + /// and is not yet ready for processing. + pub const PADDING_MSG_TYPE_ID: i32 = -1; + + /// Retrieve the header bits for a ring buffer record. + pub fn make_header(length: i32, msg_type_id: i32) -> i64 { + // QUESTION: Instead of masking, can't we just cast and return u32/u64? + // Smells like Java. + ((i64::from(msg_type_id) & 0xFFFF_FFFF) << 32) | (i64::from(length) & 0xFFFF_FFFF) + } + + /// Verify a message type identifier is safe for use + pub fn check_msg_type_id(msg_type_id: i32) -> Result<()> { + if msg_type_id < 1 { + Err(AeronError::IllegalArgument) + } else { + Ok(()) + } + } + + /// Fetch the offset to begin writing a message payload + pub fn encoded_msg_offset(record_offset: IndexT) -> IndexT { + record_offset + HEADER_LENGTH + } + + /// Fetch the offset to begin writing the message length + pub fn length_offset(record_offset: IndexT) -> IndexT { + record_offset + } +} + +/// Multi-producer, single-consumer ring buffer implementation. +pub struct ManyToOneRingBuffer<'a> { + buffer: AtomicBuffer<'a>, + capacity: IndexT, + max_msg_length: IndexT, + tail_position_index: IndexT, + head_cache_position_index: IndexT, + head_position_index: IndexT, + correlation_id_counter_index: IndexT, +} + +impl<'a> ManyToOneRingBuffer<'a> { + /// Create a many-to-one ring buffer from an underlying atomic buffer. + pub fn wrap(buffer: AtomicBuffer<'a>) -> Result { + buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { + buffer, + capacity, + max_msg_length: capacity / 8, + tail_position_index: capacity + buffer_descriptor::TAIL_POSITION_OFFSET, + head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET, + head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET, + correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET, + }) + } + + /// Atomically retrieve the next correlation identifier. Used as a unique identifier for + /// interactions with the Media Driver + pub fn next_correlation_id(&self) -> i64 { + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .get_and_add_i64(self.correlation_id_counter_index, 1) + .unwrap() + } + + /// Write a message into the ring buffer + pub fn write( + &mut self, + msg_type_id: i32, + source: &AtomicBuffer, + source_index: IndexT, + length: IndexT, + ) -> Result<()> { + record_descriptor::check_msg_type_id(msg_type_id)?; + self.check_msg_length(length)?; + + let record_len = length + record_descriptor::HEADER_LENGTH; + let required = bit::align(record_len, record_descriptor::ALIGNMENT); + let record_index = self.claim_capacity(required)?; + + // UNWRAP: `claim_capacity` performed bounds checking + self.buffer + .put_i64_ordered( + record_index, + record_descriptor::make_header(-length, msg_type_id), + ) + .unwrap(); + // UNWRAP: `claim_capacity` performed bounds checking + self.buffer + .put_bytes( + record_descriptor::encoded_msg_offset(record_index), + source, + source_index, + length, + ) + .unwrap(); + // UNWRAP: `claim_capacity` performed bounds checking + self.buffer + .put_i32_ordered(record_descriptor::length_offset(record_index), record_len) + .unwrap(); + + Ok(()) + } + + /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index + /// at which to start writing the next record. + fn claim_capacity(&mut self, required: IndexT) -> Result { + // QUESTION: Is this mask how we handle the "ring" in ring buffer? + // Would explain why we assert buffer capacity is a power of two during initialization + let mask = self.capacity - 1; + + // UNWRAP: Known-valid offset calculated during initialization + let mut head = self + .buffer + .get_i64_volatile(self.head_cache_position_index) + .unwrap(); + + let mut tail: i64; + let mut tail_index: IndexT; + let mut padding: IndexT; + // Note the braces, making this a do-while loop + while { + // UNWRAP: Known-valid offset calculated during initialization + tail = self + .buffer + .get_i64_volatile(self.tail_position_index) + .unwrap(); + let available_capacity = self.capacity - (tail - head) as IndexT; + + if required > available_capacity { + // UNWRAP: Known-valid offset calculated during initialization + head = self + .buffer + .get_i64_volatile(self.head_position_index) + .unwrap(); + + if required > (self.capacity - (tail - head) as IndexT) { + return Err(AeronError::InsufficientCapacity); + } + + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .put_i64_ordered(self.head_cache_position_index, head) + .unwrap(); + } + + padding = 0; + + // Because we assume `tail` and `mask` are always positive integers, + // it's "safe" to widen the types and bitmask below. We're just trying + // to imitate C++ here. + tail_index = (tail & i64::from(mask)) as IndexT; + let to_buffer_end_length = self.capacity - tail_index; + + if required > to_buffer_end_length { + let mut head_index = (head & i64::from(mask)) as IndexT; + + if required > head_index { + // UNWRAP: Known-valid offset calculated during initialization + head = self + .buffer + .get_i64_volatile(self.head_position_index) + .unwrap(); + head_index = (head & i64::from(mask)) as IndexT; + + if required > head_index { + return Err(AeronError::InsufficientCapacity); + } + + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .put_i64_ordered(self.head_cache_position_index, head) + .unwrap(); + } + + padding = to_buffer_end_length; + } + + // UNWRAP: Known-valid offset calculated during initialization + !self + .buffer + .compare_and_set_i64( + self.tail_position_index, + tail, + tail + i64::from(required) + i64::from(padding), + ) + .unwrap() + } {} + + if padding != 0 { + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .put_i64_ordered( + tail_index, + record_descriptor::make_header(padding, record_descriptor::PADDING_MSG_TYPE_ID), + ) + .unwrap(); + tail_index = 0; + } + + Ok(tail_index) + } + + fn check_msg_length(&self, length: IndexT) -> Result<()> { + if length > self.max_msg_length { + Err(AeronError::IllegalArgument) + } else { + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::client::concurrent::ring_buffer::{ + buffer_descriptor, record_descriptor, ManyToOneRingBuffer, + }; + use crate::util::IndexT; + use std::mem::size_of; + + #[test] + fn claim_capacity_basic() { + let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; + let mut buf = vec![0u8; buf_size]; + + let atomic_buf = AtomicBuffer::wrap(&mut buf); + let mut ring_buf = ManyToOneRingBuffer::wrap(atomic_buf).unwrap(); + + ring_buf.claim_capacity(16).unwrap(); + assert_eq!( + ring_buf + .buffer + .get_i64_volatile(ring_buf.tail_position_index), + Ok(16) + ); + + let write_start = ring_buf.claim_capacity(16).unwrap(); + assert_eq!(write_start, 16); + } + + #[test] + fn write_basic() { + let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; + let buffer = AtomicBuffer::wrap(&mut bytes); + let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + + let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let source_len = source_bytes.len() as IndexT; + let source_buffer = AtomicBuffer::wrap(&mut source_bytes); + let type_id = 1; + ring_buffer + .write(type_id, &source_buffer, 0, source_len) + .unwrap(); + + drop(ring_buffer); + let buffer = AtomicBuffer::wrap(&mut bytes); + let record_len = source_len + record_descriptor::HEADER_LENGTH; + assert_eq!( + buffer.get_i64_volatile(0).unwrap(), + record_descriptor::make_header(record_len, type_id) + ); + assert_eq!( + buffer.get_i64_volatile(size_of::() as IndexT).unwrap(), + 12 + ); + } +} diff --git a/src/client/context.rs b/src/client/context.rs new file mode 100644 index 0000000..d011493 --- /dev/null +++ b/src/client/context.rs @@ -0,0 +1,37 @@ +//! Client library for Aeron. This encapsulates the logic needed to communicate +//! with the media driver, but does not manage the media driver itself. +use std::env; +use std::path::PathBuf; + +/// Context used to initialize the Aeron client +pub struct ClientContext { + _aeron_dir: PathBuf, +} + +impl ClientContext { + fn get_user_name() -> String { + env::var("USER") + .or_else(|_| env::var("USERNAME")) + .unwrap_or_else(|_| "default".to_string()) + } + + /// Get the default folder used by the Media Driver to interact with clients + pub fn default_aeron_path() -> PathBuf { + let base_path = if cfg!(target_os = "linux") { + PathBuf::from("/dev/shm") + } else { + // Uses TMPDIR on Unix-like and GetTempPath on Windows + env::temp_dir() + }; + + base_path.join(format!("aeron-{}", ClientContext::get_user_name())) + } +} + +impl Default for ClientContext { + fn default() -> Self { + ClientContext { + _aeron_dir: ClientContext::default_aeron_path(), + } + } +} diff --git a/src/client/driver_proxy.rs b/src/client/driver_proxy.rs new file mode 100644 index 0000000..b1a84e4 --- /dev/null +++ b/src/client/driver_proxy.rs @@ -0,0 +1,12 @@ +//! Proxy object for interacting with the Media Driver. Handles operations +//! involving the command-and-control file protocol. + +use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer; + +/// Proxy object for operations involving the Media Driver +pub struct DriverProxy<'a> { + _to_driver: ManyToOneRingBuffer<'a>, + _client_id: i64, +} + +impl<'a> DriverProxy<'a> {} diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..b00370e --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,7 @@ +//! Aeron client +//! +//! These are the modules necessary to construct a functioning Aeron client +pub mod cnc_descriptor; +pub mod concurrent; +pub mod context; +pub mod driver_proxy; diff --git a/src/context.rs b/src/context.rs deleted file mode 100644 index 0c1e5a9..0000000 --- a/src/context.rs +++ /dev/null @@ -1,44 +0,0 @@ -use std::env; -use std::path::PathBuf; - -const DEFAULT_MEDIA_DRIVER_TIMEOUT_MS: u16 = 10_000; -const DEFAULT_RESOURCE_LINGER_MS: u16 = 5_000; - -pub struct Context { - aeron_dir: PathBuf, - media_driver_timeout_ms: i32, - resource_linger_timeout_ms: i32, - use_conductor_agent_invoker: bool, - pre_touch_mapped_memory: bool, -} - -impl Context { - pub fn get_user_name() -> String { - env::var("USER") - .or_else(|_| env::var("USERNAME")) - .unwrap_or("default".to_string()) - } - - pub fn default_aeron_path() -> PathBuf { - let base_path = if cfg!(target_os = "linux") { - PathBuf::from("/dev/shm") - } else { - // Uses TMPDIR on Unix-like, and GetTempPath on Windows - env::temp_dir() - }; - - base_path.join(format!("aeron-{}", Context::get_user_name())) - } -} - -impl Default for Context { - fn default() -> Self { - Context { - aeron_dir: Context::default_aeron_path(), - media_driver_timeout_ms: DEFAULT_MEDIA_DRIVER_TIMEOUT_MS.into(), - resource_linger_timeout_ms: DEFAULT_RESOURCE_LINGER_MS.into(), - use_conductor_agent_invoker: false, - pre_touch_mapped_memory: false, - } - } -} diff --git a/src/control_protocol.rs b/src/control_protocol.rs new file mode 100644 index 0000000..cb2e6a7 --- /dev/null +++ b/src/control_protocol.rs @@ -0,0 +1,108 @@ +//! Utilities for interacting with the control protocol of the Media Driver +use aeron_driver_sys::*; + +/// Construct a C-compatible enum out of a set of constants. +/// Commonly used for types in Aeron that have fixed values via `#define`, +/// but aren't actually enums (e.g. AERON_COMMAND_.*, AERON_ERROR_CODE_.*). +/// Behavior is ultimately very similar to `num::FromPrimitive`. +macro_rules! define_enum { + ( + $(#[$outer:meta])* + pub enum $name:ident {$( + $(#[$inner:meta]),* + $left:ident = $right:ident, + )+} + ) => { + #[repr(u32)] + #[derive(Debug, PartialEq)] + $(#[$outer])* + pub enum $name {$( + $(#[$inner])* + $left = $right, + )*} + + impl ::std::convert::TryFrom for $name { + type Error = (); + fn try_from(val: u32) -> Result<$name, ()> { + match val { + $(v if v == $name::$left as u32 => Ok($name::$left)),*, + _ => Err(()) + } + } + } + } +} + +define_enum!( + #[doc = "Commands sent from clients to the Media Driver"] + pub enum ClientCommand { + #[doc = "Add a Publication"] + AddPublication = AERON_COMMAND_ADD_PUBLICATION, + #[doc = "Remove a Publication"] + RemovePublication = AERON_COMMAND_REMOVE_PUBLICATION, + #[doc = "Add an Exclusive Publication"] + AddExclusivePublication = AERON_COMMAND_ADD_EXCLUSIVE_PUBLICATION, + #[doc = "Add a Subscriber"] + AddSubscription = AERON_COMMAND_ADD_SUBSCRIPTION, + #[doc = "Remove a Subscriber"] + RemoveSubscription = AERON_COMMAND_REMOVE_SUBSCRIPTION, + #[doc = "Keepalaive from Client"] + ClientKeepalive = AERON_COMMAND_CLIENT_KEEPALIVE, + #[doc = "Add Destination to an existing Publication"] + AddDestination = AERON_COMMAND_ADD_DESTINATION, + #[doc = "Remove Destination from an existing Publication"] + RemoveDestination = AERON_COMMAND_REMOVE_DESTINATION, + #[doc = "Add a Counter to the counters manager"] + AddCounter = AERON_COMMAND_ADD_COUNTER, + #[doc = "Remove a Counter from the counters manager"] + RemoveCounter = AERON_COMMAND_REMOVE_COUNTER, + #[doc = "Close indication from Client"] + ClientClose = AERON_COMMAND_CLIENT_CLOSE, + #[doc = "Add Destination for existing Subscription"] + AddRcvDestination = AERON_COMMAND_ADD_RCV_DESTINATION, + #[doc = "Remove Destination for existing Subscription"] + RemoveRcvDestination = AERON_COMMAND_REMOVE_RCV_DESTINATION, + #[doc = "Request the driver to terminate"] + TerminateDriver = AERON_COMMAND_TERMINATE_DRIVER, + } +); + +define_enum!( + #[doc = "Responses from the Media Driver to client commands"] + pub enum DriverResponse { + #[doc = "Error Response as a result of attempting to process a client command operation"] + OnError = AERON_RESPONSE_ON_ERROR, + #[doc = "Subscribed Image buffers are available notification"] + OnAvailableImage = AERON_RESPONSE_ON_AVAILABLE_IMAGE, + #[doc = "New Publication buffers are ready notification"] + OnPublicationReady = AERON_RESPONSE_ON_PUBLICATION_READY, + #[doc = "Operation has succeeded"] + OnOperationSuccess = AERON_RESPONSE_ON_OPERATION_SUCCESS, + #[doc = "Inform client of timeout and removal of an inactive Image"] + OnUnavailableImage = AERON_RESPONSE_ON_UNAVAILABLE_IMAGE, + #[doc = "New Exclusive Publication buffers are ready notification"] + OnExclusivePublicationReady = AERON_RESPONSE_ON_EXCLUSIVE_PUBLICATION_READY, + #[doc = "New Subscription is ready notification"] + OnSubscriptionReady = AERON_RESPONSE_ON_SUBSCRIPTION_READY, + #[doc = "New counter is ready notification"] + OnCounterReady = AERON_RESPONSE_ON_COUNTER_READY, + #[doc = "Inform clients of removal of counter"] + OnUnavailableCounter = AERON_RESPONSE_ON_UNAVAILABLE_COUNTER, + #[doc = "Inform clients of client timeout"] + OnClientTimeout = AERON_RESPONSE_ON_CLIENT_TIMEOUT, + } +); + +#[cfg(test)] +mod tests { + use crate::control_protocol::ClientCommand; + use std::convert::TryInto; + + #[test] + fn client_command_convert() { + assert_eq!( + Ok(ClientCommand::AddPublication), + ::aeron_driver_sys::AERON_COMMAND_ADD_PUBLICATION.try_into() + ) + } +} diff --git a/src/driver.rs b/src/driver.rs new file mode 100644 index 0000000..352a692 --- /dev/null +++ b/src/driver.rs @@ -0,0 +1,224 @@ +//! Bindings for the C Media Driver + +use std::ffi::{CStr, CString}; +use std::path::Path; +use std::ptr; + +use aeron_driver_sys::*; +use std::marker::PhantomData; +use std::mem::replace; + +/// Error code and message returned by the Media Driver +#[derive(Debug, PartialEq)] +pub struct DriverError { + code: i32, + msg: String, +} + +type Result = std::result::Result; + +macro_rules! aeron_op { + ($op:expr) => { + if $op < 0 { + let code = ::aeron_driver_sys::aeron_errcode(); + let msg = CStr::from_ptr(::aeron_driver_sys::aeron_errmsg()) + .to_str() + .unwrap() + .to_string(); + Err(DriverError { code, msg }) + } else { + Ok(()) + } + }; +} + +/// Context used to set up the Media Driver +#[derive(Default)] +pub struct DriverContext { + aeron_dir: Option, + dir_delete_on_start: Option, +} + +impl DriverContext { + /// Set the Aeron directory path that will be used for storing the files + /// Aeron uses to communicate with clients. + pub fn set_aeron_dir(mut self, path: &Path) -> Self { + // UNWRAP: Fails only if the path is non-UTF8 + let path_bytes = path.to_str().unwrap().as_bytes(); + // UNWRAP: Fails only if there is a null byte in the provided path + let c_string = CString::new(path_bytes).unwrap(); + self.aeron_dir = Some(c_string); + self + } + + /// Set whether Aeron should attempt to delete the `aeron_dir` on startup + /// if it already exists. Aeron will attempt to remove the directory if true. + /// If `aeron_dir` is not set in the `DriverContext`, Aeron will still attempt + /// to remove the default Aeron directory. + pub fn set_dir_delete_on_start(mut self, delete: bool) -> Self { + self.dir_delete_on_start = Some(delete); + self + } + + /// Construct a Media Driver given the context options + pub fn build(mut self) -> Result> { + let mut driver = MediaDriver { + c_context: ptr::null_mut(), + c_driver: ptr::null_mut(), + _state: PhantomData, + }; + + unsafe { aeron_op!(aeron_driver_context_init(&mut driver.c_context)) }?; + + self.aeron_dir.take().map(|dir| unsafe { + aeron_op!(aeron_driver_context_set_dir( + driver.c_context, + dir.into_raw() + )) + }); + + self.dir_delete_on_start.take().map(|delete| unsafe { + aeron_op!(aeron_driver_context_set_dir_delete_on_start( + driver.c_context, + delete + )) + }); + + unsafe { aeron_op!(aeron_driver_init(&mut driver.c_driver, driver.c_context)) }?; + + Ok(driver) + } +} + +/// Holder object to interface with the Media Driver +#[derive(Debug)] +pub struct MediaDriver { + c_context: *mut aeron_driver_context_t, + c_driver: *mut aeron_driver_t, + _state: PhantomData, +} + +/// Marker type for a MediaDriver that has yet to be started +#[derive(Debug)] +pub struct DriverInitialized; + +/// Marker type for a MediaDriver that has been started +#[derive(Debug)] +pub struct DriverStarted; + +impl MediaDriver { + /// Retrieve the C library version in (major, minor, patch) format + pub fn driver_version() -> (u32, u32, u32) { + unsafe { + ( + aeron_version_major() as u32, + aeron_version_minor() as u32, + aeron_version_patch() as u32, + ) + } + } +} + +impl MediaDriver { + /// Set up a new Media Driver with default options + pub fn new() -> Result { + DriverContext::default().build() + } + + /// Start the Media Driver threads; does not take control of the current thread + pub fn start(mut self) -> Result> { + unsafe { aeron_op!(aeron_driver_start(self.c_driver, true)) }?; + + // Move the driver and context references so the drop of `self` can't trigger issues + // when the new media driver is also eventually dropped + let c_driver = replace(&mut self.c_driver, ptr::null_mut()); + let c_context = replace(&mut self.c_context, ptr::null_mut()); + + Ok(MediaDriver { + c_driver, + c_context, + _state: PhantomData, + }) + } +} + +impl MediaDriver { + /// Perform a single idle cycle of the Media Driver; does not take control of + /// the current thread + pub fn do_work(&self) { + unsafe { + aeron_driver_main_idle_strategy(self.c_driver, aeron_driver_main_do_work(self.c_driver)) + }; + } +} + +impl Drop for MediaDriver { + fn drop(&mut self) { + if !self.c_driver.is_null() { + unsafe { aeron_op!(aeron_driver_close(self.c_driver)) }.unwrap(); + } + if !self.c_context.is_null() { + unsafe { aeron_op!(aeron_driver_context_close(self.c_context)) }.unwrap(); + } + } +} + +#[cfg(test)] +mod tests { + use crate::driver::{DriverContext, DriverError}; + use std::ffi::CStr; + use tempfile::tempdir; + + #[test] + fn multiple_startup_failure() { + let temp_dir = tempdir().unwrap(); + let dir = temp_dir.path().to_path_buf(); + temp_dir.close().unwrap(); + + let driver = DriverContext::default() + .set_aeron_dir(&dir) + .build() + .unwrap(); + + assert_eq!( + unsafe { CStr::from_ptr((*driver.c_context).aeron_dir) }.to_str(), + Ok(dir.to_str().unwrap()) + ); + drop(driver); + + // Attempting to start a media driver twice in rapid succession is guaranteed + // cause an issue because the new media driver must wait for a heartbeat timeout. + let driver_res = DriverContext::default().set_aeron_dir(&dir).build(); + + // TODO: Why is the error message behavior different on Windows? + let expected_message = if cfg!(target_os = "windows") { + String::new() + } else { + format!("could not recreate aeron dir {}: ", dir.display()) + }; + + assert!(driver_res.is_err()); + assert_eq!( + driver_res.unwrap_err(), + DriverError { + code: 0, + msg: expected_message + } + ); + } + + #[test] + fn single_duty_cycle() { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().to_path_buf(); + temp_dir.close().unwrap(); + + let driver = DriverContext::default() + .set_aeron_dir(&path) + .build() + .expect("Unable to create media driver") + .start() + .expect("Unable to start driver"); + driver.do_work(); + } +} diff --git a/src/lib.rs b/src/lib.rs index 966a11a..9421c8b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,24 @@ //! [Aeron](https://github.com/real-logic/aeron) client for Rust #![deny(missing_docs)] -mod context; +#[cfg(target_endian = "big")] +compile_error!("Aeron is only supported on little-endian architectures"); -/// Retrieve the C library version in (major, minor, patch) format -pub fn aeron_version() -> (u32, u32, u32) { - unsafe { - ( - aeron_driver_sys::aeron_version_major() as u32, - aeron_driver_sys::aeron_version_minor() as u32, - aeron_driver_sys::aeron_version_patch() as u32, - ) +pub mod client; +pub mod control_protocol; +pub mod driver; +pub mod util; + +const fn sematic_version_compose(major: u8, minor: u8, patch: u8) -> i32 { + (major as i32) << 16 | (minor as i32) << 8 | (patch as i32) +} + +#[cfg(test)] +mod tests { + use crate::sematic_version_compose; + + #[test] + fn version_compose_cnc() { + assert_eq!(sematic_version_compose(0, 0, 16), 16); } } diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..534f5c5 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,64 @@ +//! Various utility and helper bits for the Aeron client. Predominantly helpful +//! in mapping between concepts in the C++ API and Rust + +/// Helper type to indicate indexing operations in Aeron, Synonymous with the +/// Aeron C++ `index_t` type. Used to imitate the Java API. +// QUESTION: Can this just be updated to be `usize` in Rust? +pub type IndexT = i32; + +/// Error types from operations in the Aeron client. Synonymous with the exceptions +/// generated by the C++ client. +#[derive(Debug, PartialEq)] +pub enum AeronError { + /// Indication that an argument provided is an illegal value + IllegalArgument, + /// Indication that a memory access would exceed the allowable bounds + OutOfBounds, + /// Indication that a buffer operation could not complete because of space constraints + InsufficientCapacity, +} + +/// Result type for operations in the Aeron client +pub type Result = ::std::result::Result; + +/// Bit-level utility functions +pub mod bit { + use crate::util::IndexT; + use num::PrimInt; + + /// Length of the data blocks used by the CPU cache sub-system in bytes + pub const CACHE_LINE_LENGTH: usize = 64; + + /// Helper method for quick verification that `IndexT` is a positive power of two + /// + /// ```rust + /// # use aeron_rs::util::bit::is_power_of_two; + /// assert!(is_power_of_two(16)); + /// assert!(!is_power_of_two(17)); + /// ``` + pub fn is_power_of_two(idx: IndexT) -> bool { + idx > 0 && (idx as u32).is_power_of_two() + } + + /// Align a specific value to the next largest alignment size. + /// + /// ```rust + /// # use aeron_rs::util::bit::align; + /// assert_eq!(align(7, 8), 8); + /// + /// // Not intended for alignments that aren't powers of two + /// assert_eq!(align(52, 12), 52); + /// assert_eq!(align(52, 16), 64); + /// ``` + pub fn align(val: T, alignment: T) -> T + where + T: PrimInt, + { + (val + (alignment - T::one())) & !(alignment - T::one()) + } + + /// Align a `usize` value. See `align` for similar functionality on general types. + pub const fn align_usize(val: usize, alignment: usize) -> usize { + (val + (alignment - 1)) & !(alignment - 1) + } +} diff --git a/tests/cnc_terminate.rs b/tests/cnc_terminate.rs new file mode 100644 index 0000000..73728b5 --- /dev/null +++ b/tests/cnc_terminate.rs @@ -0,0 +1,143 @@ +use aeron_driver_sys::*; +use aeron_rs::client::cnc_descriptor; +use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; +use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer; +use aeron_rs::util::IndexT; +use memmap::MmapOptions; +use std::ffi::{c_void, CString}; +use std::fs::OpenOptions; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; +use std::{ptr, thread}; +use tempfile::tempdir; + +static RUNNING: AtomicBool = AtomicBool::new(false); + +unsafe extern "C" fn termination_hook(_state: *mut c_void) { + RUNNING.store(false, Ordering::SeqCst); +} + +unsafe extern "C" fn termination_validation( + _state: *mut c_void, + _data: *mut u8, + _length: i32, +) -> bool { + true +} + +fn driver_thread(aeron_dir: PathBuf) { + // Code largely ripped from `aeronmd`. Extra bits for termination and + // termination validation added as necessary, and far coarser error handling. + let mut context: *mut aeron_driver_context_t = ptr::null_mut(); + let mut driver: *mut aeron_driver_t = ptr::null_mut(); + + let context_init = unsafe { aeron_driver_context_init(&mut context) }; + assert!(context_init >= 0); + + let path_bytes = aeron_dir.to_str().unwrap().as_bytes(); + let c_string = CString::new(path_bytes).unwrap(); + + let aeron_dir = unsafe { aeron_driver_context_set_dir(context, c_string.into_raw()) }; + assert!(aeron_dir >= 0); + + let term_hook = unsafe { + aeron_driver_context_set_driver_termination_hook( + context, + Some(termination_hook), + ptr::null_mut(), + ) + }; + assert!(term_hook >= 0); + + let term_validation_hook = unsafe { + aeron_driver_context_set_driver_termination_validator( + context, + Some(termination_validation), + ptr::null_mut(), + ) + }; + assert!(term_validation_hook >= 0); + + let delete_dir = unsafe { aeron_driver_context_set_dir_delete_on_start(context, true) }; + assert!(delete_dir >= 0); + + let driver_init = unsafe { aeron_driver_init(&mut driver, context) }; + assert!(driver_init >= 0); + + let driver_start = unsafe { aeron_driver_start(driver, true) }; + assert!(driver_start >= 0); + + RUNNING.store(true, Ordering::SeqCst); + while RUNNING.load(Ordering::SeqCst) { + unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) }; + } + + unsafe { aeron_driver_close(driver) }; + unsafe { aeron_driver_context_close(context) }; +} + +#[test] +fn cnc_terminate() { + let temp_dir = tempdir().unwrap(); + let dir = temp_dir.path().to_path_buf(); + temp_dir.close().unwrap(); + + // Start up the media driver + let driver_dir = dir.clone(); + let driver_thread = thread::Builder::new() + .name("cnc_terminate__driver_thread".to_string()) + .spawn(|| driver_thread(driver_dir)) + .unwrap(); + + // Sleep a moment to let the media driver get set up + thread::sleep(Duration::from_millis(500)); + assert_eq!(RUNNING.load(Ordering::SeqCst), true); + + // Write to the CnC file to attempt termination + let cnc = dir.join(cnc_descriptor::CNC_FILE); + let file = OpenOptions::new() + .read(true) + .write(true) + .open(&cnc) + .expect("Unable to open CnC file"); + let mut mmap = + unsafe { MmapOptions::default().map_mut(&file) }.expect("Unable to mmap CnC file"); + + // When creating the buffer, we need to offset by the CnC metadata + let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH; + + // Read metadata to get buffer length + let buffer_len = { + let atomic_buffer = AtomicBuffer::wrap(&mut mmap); + let metadata = atomic_buffer + .overlay::(0) + .unwrap(); + metadata.to_driver_buffer_length + }; + + let buffer_end = cnc_metadata_len + buffer_len as usize; + let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); + let mut ring_buffer = + ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer"); + + // 20 bytes: Client ID (8), correlation ID (8), token length (4) + let mut terminate_bytes = vec![0u8; 20]; + let terminate_len = terminate_bytes.len(); + let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes); + let client_id = ring_buffer.next_correlation_id(); + source_buffer.put_i64_ordered(0, client_id).unwrap(); + source_buffer.put_i64_ordered(8, -1).unwrap(); + + let term_id: i32 = 0x0E; + ring_buffer + .write(term_id, &source_buffer, 0, terminate_len as IndexT) + .unwrap(); + + // Wait for the driver to finish + // TODO: Timeout, and then set `RUNNING` manually + driver_thread + .join() + .expect("Driver thread panicked during execution"); + assert_eq!(RUNNING.load(Ordering::SeqCst), false); +}