diff --git a/aeron_driver-sys/examples/aeronmd.rs b/aeron_driver-sys/examples/aeronmd.rs new file mode 100644 index 0000000..9247fd5 --- /dev/null +++ b/aeron_driver-sys/examples/aeronmd.rs @@ -0,0 +1,98 @@ +//! Media driver startup example based on +//! [aeronmd.c](https://github.com/real-logic/aeron/blob/master/aeron-driver/src/main/c/aeronmd.c) +//! This example demonstrates direct usage of the -sys bindings for the Media Driver API. +//! The main crate has a more Rust-idiomatic example usage. + +use aeron_driver_sys::*; +use clap; +use ctrlc; +use std::ffi::CStr; +use std::os::raw::c_void; +use std::ptr; +use std::sync::atomic::{AtomicBool, Ordering}; + +static RUNNING: AtomicBool = AtomicBool::new(true); + +unsafe extern "C" fn termination_hook(_clientd: *mut c_void) { + println!("Terminated"); + RUNNING.store(false, Ordering::SeqCst); +} + +fn main() { + let version = unsafe { CStr::from_ptr(aeron_version_full()) }; + let _cmdline = clap::App::new("aeronmd") + .version(version.to_str().unwrap()) + .get_matches(); + + // TODO: Handle -D switches + + ctrlc::set_handler(move || { + // TODO: Actually understand atomic ordering + RUNNING.store(false, Ordering::SeqCst); + }) + .unwrap(); + + let mut init_success = true; + let mut context: *mut aeron_driver_context_t = ptr::null_mut(); + let mut driver: *mut aeron_driver_t = ptr::null_mut(); + + if init_success { + let context_init = unsafe { aeron_driver_context_init(&mut context) }; + if context_init < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!("ERROR: context init ({}) {}", err_code, err_str); + init_success = false; + } + } + + if init_success { + let term_hook = unsafe { + aeron_driver_context_set_driver_termination_hook( + context, + Some(termination_hook), + ptr::null_mut(), + ) + }; + if term_hook < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!( + "ERROR: context set termination hook ({}) {}", + err_code, err_str + ); + init_success = false; + } + } + + if init_success { + let driver_init = unsafe { aeron_driver_init(&mut driver, context) }; + if driver_init < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!("ERROR: driver init ({}) {}", err_code, err_str); + init_success = false; + } + } + + if init_success { + let driver_start = unsafe { aeron_driver_start(driver, true) }; + if driver_start < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!("ERROR: driver start ({}) {}", err_code, err_str); + init_success = false; + } + } + + if init_success { + println!("Press Ctrl-C to exit."); + + while RUNNING.load(Ordering::SeqCst) { + unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) }; + } + } + + unsafe { aeron_driver_close(driver) }; + unsafe { aeron_driver_context_close(context) }; +} diff --git a/examples/aeronmd.rs b/examples/aeronmd.rs index 1a972b5..0cb7672 100644 --- a/examples/aeronmd.rs +++ b/examples/aeronmd.rs @@ -1,105 +1,22 @@ -//! Media driver startup example based on -//! [aeronmd.c](https://github.com/real-logic/aeron/blob/master/aeron-driver/src/main/c/aeronmd.c) -#![deny(missing_docs)] - -use aeron_driver_sys::*; -use clap; -use ctrlc; -use std::ffi::CStr; -use std::os::raw::c_void; -use std::ptr; +//! A version of the `aeronmd` runner program demonstrating the Rust wrappers +//! around Media Driver functionality. +use aeron_rs::driver::DriverContext; use std::sync::atomic::{AtomicBool, Ordering}; -static RUNNING: AtomicBool = AtomicBool::new(true); - -unsafe extern "C" fn termination_hook(_clientd: *mut c_void) { - println!("Terminated"); - RUNNING.store(false, Ordering::SeqCst); -} - -unsafe extern "C" fn termination_validator( - _state: *mut c_void, - _buffer: *mut u8, - _length: i32, -) -> bool { - true -} +static RUNNING: AtomicBool = AtomicBool::new(false); fn main() { - let version = unsafe { CStr::from_ptr(aeron_version_full()) }; - let _cmdline = clap::App::new("aeronmd") - .version(version.to_str().unwrap()) - .get_matches(); + let driver = DriverContext::default() + .build() + .expect("Unable to create media driver"); - // TODO: Handle -D switches + let driver = driver.start().expect("Unable to start media driver"); + RUNNING.store(true, Ordering::SeqCst); - ctrlc::set_handler(move || { - // TODO: Actually understand atomic ordering - RUNNING.store(false, Ordering::SeqCst); - }) - .unwrap(); + println!("Press Ctrl-C to quit"); - let mut init_success = true; - let mut context: *mut aeron_driver_context_t = ptr::null_mut(); - let mut driver: *mut aeron_driver_t = ptr::null_mut(); - - if init_success { - let context_init = unsafe { aeron_driver_context_init(&mut context) }; - if context_init < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!("ERROR: context init ({}) {}", err_code, err_str); - init_success = false; - } + while RUNNING.load(Ordering::SeqCst) { + // TODO: Termination hook + driver.do_work(); } - - if init_success { - let term_hook = unsafe { - aeron_driver_context_set_driver_termination_hook( - context, - Some(termination_hook), - ptr::null_mut(), - ) - }; - if term_hook < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!( - "ERROR: context set termination hook ({}) {}", - err_code, err_str - ); - init_success = false; - } - } - - if init_success { - let driver_init = unsafe { aeron_driver_init(&mut driver, context) }; - if driver_init < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!("ERROR: driver init ({}) {}", err_code, err_str); - init_success = false; - } - } - - if init_success { - let driver_start = unsafe { aeron_driver_start(driver, true) }; - if driver_start < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!("ERROR: driver start ({}) {}", err_code, err_str); - init_success = false; - } - } - - if init_success { - println!("Press Ctrl-C to exit."); - - while RUNNING.load(Ordering::SeqCst) { - unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) }; - } - } - - unsafe { aeron_driver_close(driver) }; - unsafe { aeron_driver_context_close(context) }; } diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index 6724a80..f5b6c18 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -70,7 +70,8 @@ pub const CNC_FILE: &str = "cnc.dat"; #[cfg(test)] mod tests { use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION}; - use crate::driver::{DriverContext, MediaDriver}; + use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::driver::DriverContext; use memmap::MmapOptions; use std::fs::File; use tempfile::tempdir; @@ -81,8 +82,10 @@ mod tests { let dir_path = dir.as_ref().to_path_buf(); dir.close().unwrap(); - let context = DriverContext::default().set_aeron_dir(&dir_path); - let _driver = MediaDriver::with_context(context).unwrap(); + let _driver = DriverContext::default() + .set_aeron_dir(&dir_path) + .build() + .unwrap(); // Open the CnC location let cnc_path = dir_path.join(CNC_FILE); diff --git a/src/driver.rs b/src/driver.rs index 3876752..2918386 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -5,6 +5,8 @@ use std::path::Path; use std::ptr; use aeron_driver_sys::*; +use std::marker::PhantomData; +use std::mem::replace; /// Error code and message returned by the Media Driver #[derive(Debug, PartialEq)] @@ -13,24 +15,7 @@ pub struct DriverError { msg: String, } -/// Context used to set up the Media Driver -#[derive(Default)] -pub struct DriverContext { - aeron_dir: Option, -} - -impl DriverContext { - /// Set the Aeron directory path that will be used for storing the files - /// Aeron uses to communicate with clients. - pub fn set_aeron_dir(mut self, path: &Path) -> Self { - // UNWRAP: Fails only if the path is non-UTF8 - let path_bytes = path.to_str().unwrap().as_bytes(); - // UNWRAP: Fails only if there is a null byte in the provided path - let c_string = CString::new(path_bytes).unwrap(); - self.aeron_dir = Some(c_string); - self - } -} +type Result = std::result::Result; macro_rules! aeron_op { ($op:expr) => { @@ -47,40 +32,81 @@ macro_rules! aeron_op { }; } -/// Holder object to interface with the Media Driver -#[derive(Debug)] -pub struct MediaDriver { - c_context: *mut aeron_driver_context_t, - c_driver: *mut aeron_driver_t, +/// Context used to set up the Media Driver +#[derive(Default)] +pub struct DriverContext { + aeron_dir: Option, + dir_delete_on_start: Option, } -impl MediaDriver { - /// Set up a new Media Driver - pub fn with_context(mut context: DriverContext) -> Result { +impl DriverContext { + /// Set the Aeron directory path that will be used for storing the files + /// Aeron uses to communicate with clients. + pub fn set_aeron_dir(mut self, path: &Path) -> Self { + // UNWRAP: Fails only if the path is non-UTF8 + let path_bytes = path.to_str().unwrap().as_bytes(); + // UNWRAP: Fails only if there is a null byte in the provided path + let c_string = CString::new(path_bytes).unwrap(); + self.aeron_dir = Some(c_string); + self + } + + /// Set whether Aeron should attempt to delete the `aeron_dir` on startup + /// if it already exists. Aeron will attempt to remove the directory if true. + /// If `aeron_dir` is not set in the `DriverContext`, Aeron will still attempt + /// to remove the default Aeron directory. + pub fn set_dir_delete_on_start(mut self, delete: bool) -> Self { + self.dir_delete_on_start = Some(delete); + self + } + + /// Construct a Media Driver given the context options + pub fn build(mut self) -> Result> { let mut driver = MediaDriver { c_context: ptr::null_mut(), c_driver: ptr::null_mut(), + _state: PhantomData, }; unsafe { aeron_op!(aeron_driver_context_init(&mut driver.c_context)) }?; - context.aeron_dir.take().map(|dir| unsafe { + self.aeron_dir.take().map(|dir| unsafe { aeron_op!(aeron_driver_context_set_dir( driver.c_context, dir.into_raw() )) }); + self.dir_delete_on_start.take().map(|delete| unsafe { + aeron_op!(aeron_driver_context_set_dir_delete_on_start( + driver.c_context, + delete + )) + }); + unsafe { aeron_op!(aeron_driver_init(&mut driver.c_driver, driver.c_context)) }?; Ok(driver) } +} - /// Set up a new Media Driver with default options - pub fn new() -> Result { - Self::with_context(DriverContext::default()) - } +/// Holder object to interface with the Media Driver +#[derive(Debug)] +pub struct MediaDriver { + c_context: *mut aeron_driver_context_t, + c_driver: *mut aeron_driver_t, + _state: PhantomData, +} +/// Marker type for a MediaDriver that has yet to be started +#[derive(Debug)] +pub struct DriverInitialized; + +/// Marker type for a MediaDriver that has been started +#[derive(Debug)] +pub struct DriverStarted; + +impl MediaDriver { /// Retrieve the C library version in (major, minor, patch) format pub fn driver_version() -> (u32, u32, u32) { unsafe { @@ -93,7 +119,40 @@ impl MediaDriver { } } -impl Drop for MediaDriver { +impl MediaDriver { + /// Set up a new Media Driver with default options + pub fn new() -> Result { + DriverContext::default().build() + } + + /// Start the Media Driver threads; does not take control of the current thread + pub fn start(mut self) -> Result> { + unsafe { aeron_op!(aeron_driver_start(self.c_driver, true)) }?; + + // Move the driver and context references so the drop of `self` can't trigger issues + // when the new media driver is also eventually dropped + let c_driver = replace(&mut self.c_driver, ptr::null_mut()); + let c_context = replace(&mut self.c_context, ptr::null_mut()); + + Ok(MediaDriver { + c_driver, + c_context, + _state: PhantomData, + }) + } +} + +impl MediaDriver { + /// Perform a single idle cycle of the Media Driver; does not take control of + /// the current thread + pub fn do_work(&self) { + unsafe { + aeron_driver_main_idle_strategy(self.c_driver, aeron_driver_main_do_work(self.c_driver)) + }; + } +} + +impl Drop for MediaDriver { fn drop(&mut self) { if !self.c_driver.is_null() { unsafe { aeron_op!(aeron_driver_close(self.c_driver)) }.unwrap(); @@ -106,37 +165,34 @@ impl Drop for MediaDriver { #[cfg(test)] mod tests { - use crate::driver::{DriverContext, DriverError, MediaDriver}; + use crate::driver::{DriverContext, DriverError}; use std::ffi::CStr; use tempfile::tempdir; #[test] fn multiple_startup_failure() { - // We immediately close `tempdir` because we just want the name; Aeron needs - // to set up the directory itself. - let dir = tempdir().unwrap(); - let dir_path = dir.as_ref().to_path_buf(); - dir.close().unwrap(); - - let context = DriverContext::default().set_aeron_dir(&dir_path); - let driver = MediaDriver::with_context(context).unwrap(); + let dir = tempdir().unwrap().into_path(); + let driver = DriverContext::default() + .set_aeron_dir(&dir) + .set_dir_delete_on_start(true) + .build() + .unwrap(); assert_eq!( unsafe { CStr::from_ptr((*driver.c_context).aeron_dir) }.to_str(), - Ok(dir_path.to_str().unwrap()) + Ok(dir.to_str().unwrap()) ); drop(driver); // Attempting to start a media driver twice in rapid succession is guaranteed // cause an issue because the new media driver must wait for a heartbeat timeout. - let context = DriverContext::default().set_aeron_dir(&dir_path); - let driver_res = MediaDriver::with_context(context); + let driver_res = DriverContext::default().set_aeron_dir(&dir).build(); // TODO: Why is the error message behavior different on Windows? let expected_message = if cfg!(target_os = "windows") { String::new() } else { - format!("could not recreate aeron dir {}: ", dir_path.display()) + format!("could not recreate aeron dir {}: ", dir.display()) }; assert!(driver_res.is_err()); @@ -148,4 +204,19 @@ mod tests { } ); } + + #[test] + fn single_duty_cycle() { + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.into_path(); + + let driver = DriverContext::default() + .set_aeron_dir(&path) + .set_dir_delete_on_start(true) + .build() + .expect("Unable to create media driver") + .start() + .expect("Unable to start driver"); + driver.do_work(); + } }