1
0
mirror of https://github.com/bspeice/aeron-rs synced 2024-12-22 05:48:10 -05:00

Add duty cycle functionality to the media driver

Eventually want to receive term events in tests
This commit is contained in:
Bradlee Speice 2019-10-06 20:45:57 -04:00
parent 61a02711c0
commit adfa401245
4 changed files with 233 additions and 144 deletions

View File

@ -0,0 +1,98 @@
//! Media driver startup example based on
//! [aeronmd.c](https://github.com/real-logic/aeron/blob/master/aeron-driver/src/main/c/aeronmd.c)
//! This example demonstrates direct usage of the -sys bindings for the Media Driver API.
//! The main crate has a more Rust-idiomatic example usage.
use aeron_driver_sys::*;
use clap;
use ctrlc;
use std::ffi::CStr;
use std::os::raw::c_void;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
static RUNNING: AtomicBool = AtomicBool::new(true);
unsafe extern "C" fn termination_hook(_clientd: *mut c_void) {
println!("Terminated");
RUNNING.store(false, Ordering::SeqCst);
}
fn main() {
let version = unsafe { CStr::from_ptr(aeron_version_full()) };
let _cmdline = clap::App::new("aeronmd")
.version(version.to_str().unwrap())
.get_matches();
// TODO: Handle -D switches
ctrlc::set_handler(move || {
// TODO: Actually understand atomic ordering
RUNNING.store(false, Ordering::SeqCst);
})
.unwrap();
let mut init_success = true;
let mut context: *mut aeron_driver_context_t = ptr::null_mut();
let mut driver: *mut aeron_driver_t = ptr::null_mut();
if init_success {
let context_init = unsafe { aeron_driver_context_init(&mut context) };
if context_init < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: context init ({}) {}", err_code, err_str);
init_success = false;
}
}
if init_success {
let term_hook = unsafe {
aeron_driver_context_set_driver_termination_hook(
context,
Some(termination_hook),
ptr::null_mut(),
)
};
if term_hook < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!(
"ERROR: context set termination hook ({}) {}",
err_code, err_str
);
init_success = false;
}
}
if init_success {
let driver_init = unsafe { aeron_driver_init(&mut driver, context) };
if driver_init < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: driver init ({}) {}", err_code, err_str);
init_success = false;
}
}
if init_success {
let driver_start = unsafe { aeron_driver_start(driver, true) };
if driver_start < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: driver start ({}) {}", err_code, err_str);
init_success = false;
}
}
if init_success {
println!("Press Ctrl-C to exit.");
while RUNNING.load(Ordering::SeqCst) {
unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) };
}
}
unsafe { aeron_driver_close(driver) };
unsafe { aeron_driver_context_close(context) };
}

View File

@ -1,105 +1,22 @@
//! Media driver startup example based on //! A version of the `aeronmd` runner program demonstrating the Rust wrappers
//! [aeronmd.c](https://github.com/real-logic/aeron/blob/master/aeron-driver/src/main/c/aeronmd.c) //! around Media Driver functionality.
#![deny(missing_docs)] use aeron_rs::driver::DriverContext;
use aeron_driver_sys::*;
use clap;
use ctrlc;
use std::ffi::CStr;
use std::os::raw::c_void;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
static RUNNING: AtomicBool = AtomicBool::new(true); static RUNNING: AtomicBool = AtomicBool::new(false);
unsafe extern "C" fn termination_hook(_clientd: *mut c_void) {
println!("Terminated");
RUNNING.store(false, Ordering::SeqCst);
}
unsafe extern "C" fn termination_validator(
_state: *mut c_void,
_buffer: *mut u8,
_length: i32,
) -> bool {
true
}
fn main() { fn main() {
let version = unsafe { CStr::from_ptr(aeron_version_full()) }; let driver = DriverContext::default()
let _cmdline = clap::App::new("aeronmd") .build()
.version(version.to_str().unwrap()) .expect("Unable to create media driver");
.get_matches();
// TODO: Handle -D switches let driver = driver.start().expect("Unable to start media driver");
RUNNING.store(true, Ordering::SeqCst);
ctrlc::set_handler(move || { println!("Press Ctrl-C to quit");
// TODO: Actually understand atomic ordering
RUNNING.store(false, Ordering::SeqCst);
})
.unwrap();
let mut init_success = true; while RUNNING.load(Ordering::SeqCst) {
let mut context: *mut aeron_driver_context_t = ptr::null_mut(); // TODO: Termination hook
let mut driver: *mut aeron_driver_t = ptr::null_mut(); driver.do_work();
if init_success {
let context_init = unsafe { aeron_driver_context_init(&mut context) };
if context_init < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: context init ({}) {}", err_code, err_str);
init_success = false;
}
} }
if init_success {
let term_hook = unsafe {
aeron_driver_context_set_driver_termination_hook(
context,
Some(termination_hook),
ptr::null_mut(),
)
};
if term_hook < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!(
"ERROR: context set termination hook ({}) {}",
err_code, err_str
);
init_success = false;
}
}
if init_success {
let driver_init = unsafe { aeron_driver_init(&mut driver, context) };
if driver_init < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: driver init ({}) {}", err_code, err_str);
init_success = false;
}
}
if init_success {
let driver_start = unsafe { aeron_driver_start(driver, true) };
if driver_start < 0 {
let err_code = unsafe { aeron_errcode() };
let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap();
eprintln!("ERROR: driver start ({}) {}", err_code, err_str);
init_success = false;
}
}
if init_success {
println!("Press Ctrl-C to exit.");
while RUNNING.load(Ordering::SeqCst) {
unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) };
}
}
unsafe { aeron_driver_close(driver) };
unsafe { aeron_driver_context_close(context) };
} }

View File

@ -70,7 +70,8 @@ pub const CNC_FILE: &str = "cnc.dat";
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION}; use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION};
use crate::driver::{DriverContext, MediaDriver}; use crate::client::concurrent::atomic_buffer::AtomicBuffer;
use crate::driver::DriverContext;
use memmap::MmapOptions; use memmap::MmapOptions;
use std::fs::File; use std::fs::File;
use tempfile::tempdir; use tempfile::tempdir;
@ -81,8 +82,10 @@ mod tests {
let dir_path = dir.as_ref().to_path_buf(); let dir_path = dir.as_ref().to_path_buf();
dir.close().unwrap(); dir.close().unwrap();
let context = DriverContext::default().set_aeron_dir(&dir_path); let _driver = DriverContext::default()
let _driver = MediaDriver::with_context(context).unwrap(); .set_aeron_dir(&dir_path)
.build()
.unwrap();
// Open the CnC location // Open the CnC location
let cnc_path = dir_path.join(CNC_FILE); let cnc_path = dir_path.join(CNC_FILE);

View File

@ -5,6 +5,8 @@ use std::path::Path;
use std::ptr; use std::ptr;
use aeron_driver_sys::*; use aeron_driver_sys::*;
use std::marker::PhantomData;
use std::mem::replace;
/// Error code and message returned by the Media Driver /// Error code and message returned by the Media Driver
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -13,24 +15,7 @@ pub struct DriverError {
msg: String, msg: String,
} }
/// Context used to set up the Media Driver type Result<S> = std::result::Result<S, DriverError>;
#[derive(Default)]
pub struct DriverContext {
aeron_dir: Option<CString>,
}
impl DriverContext {
/// Set the Aeron directory path that will be used for storing the files
/// Aeron uses to communicate with clients.
pub fn set_aeron_dir(mut self, path: &Path) -> Self {
// UNWRAP: Fails only if the path is non-UTF8
let path_bytes = path.to_str().unwrap().as_bytes();
// UNWRAP: Fails only if there is a null byte in the provided path
let c_string = CString::new(path_bytes).unwrap();
self.aeron_dir = Some(c_string);
self
}
}
macro_rules! aeron_op { macro_rules! aeron_op {
($op:expr) => { ($op:expr) => {
@ -47,40 +32,81 @@ macro_rules! aeron_op {
}; };
} }
/// Holder object to interface with the Media Driver /// Context used to set up the Media Driver
#[derive(Debug)] #[derive(Default)]
pub struct MediaDriver { pub struct DriverContext {
c_context: *mut aeron_driver_context_t, aeron_dir: Option<CString>,
c_driver: *mut aeron_driver_t, dir_delete_on_start: Option<bool>,
} }
impl MediaDriver { impl DriverContext {
/// Set up a new Media Driver /// Set the Aeron directory path that will be used for storing the files
pub fn with_context(mut context: DriverContext) -> Result<Self, DriverError> { /// Aeron uses to communicate with clients.
pub fn set_aeron_dir(mut self, path: &Path) -> Self {
// UNWRAP: Fails only if the path is non-UTF8
let path_bytes = path.to_str().unwrap().as_bytes();
// UNWRAP: Fails only if there is a null byte in the provided path
let c_string = CString::new(path_bytes).unwrap();
self.aeron_dir = Some(c_string);
self
}
/// Set whether Aeron should attempt to delete the `aeron_dir` on startup
/// if it already exists. Aeron will attempt to remove the directory if true.
/// If `aeron_dir` is not set in the `DriverContext`, Aeron will still attempt
/// to remove the default Aeron directory.
pub fn set_dir_delete_on_start(mut self, delete: bool) -> Self {
self.dir_delete_on_start = Some(delete);
self
}
/// Construct a Media Driver given the context options
pub fn build(mut self) -> Result<MediaDriver<DriverInitialized>> {
let mut driver = MediaDriver { let mut driver = MediaDriver {
c_context: ptr::null_mut(), c_context: ptr::null_mut(),
c_driver: ptr::null_mut(), c_driver: ptr::null_mut(),
_state: PhantomData,
}; };
unsafe { aeron_op!(aeron_driver_context_init(&mut driver.c_context)) }?; unsafe { aeron_op!(aeron_driver_context_init(&mut driver.c_context)) }?;
context.aeron_dir.take().map(|dir| unsafe { self.aeron_dir.take().map(|dir| unsafe {
aeron_op!(aeron_driver_context_set_dir( aeron_op!(aeron_driver_context_set_dir(
driver.c_context, driver.c_context,
dir.into_raw() dir.into_raw()
)) ))
}); });
self.dir_delete_on_start.take().map(|delete| unsafe {
aeron_op!(aeron_driver_context_set_dir_delete_on_start(
driver.c_context,
delete
))
});
unsafe { aeron_op!(aeron_driver_init(&mut driver.c_driver, driver.c_context)) }?; unsafe { aeron_op!(aeron_driver_init(&mut driver.c_driver, driver.c_context)) }?;
Ok(driver) Ok(driver)
} }
}
/// Set up a new Media Driver with default options /// Holder object to interface with the Media Driver
pub fn new() -> Result<Self, DriverError> { #[derive(Debug)]
Self::with_context(DriverContext::default()) pub struct MediaDriver<S> {
} c_context: *mut aeron_driver_context_t,
c_driver: *mut aeron_driver_t,
_state: PhantomData<S>,
}
/// Marker type for a MediaDriver that has yet to be started
#[derive(Debug)]
pub struct DriverInitialized;
/// Marker type for a MediaDriver that has been started
#[derive(Debug)]
pub struct DriverStarted;
impl<S> MediaDriver<S> {
/// Retrieve the C library version in (major, minor, patch) format /// Retrieve the C library version in (major, minor, patch) format
pub fn driver_version() -> (u32, u32, u32) { pub fn driver_version() -> (u32, u32, u32) {
unsafe { unsafe {
@ -93,7 +119,40 @@ impl MediaDriver {
} }
} }
impl Drop for MediaDriver { impl MediaDriver<DriverInitialized> {
/// Set up a new Media Driver with default options
pub fn new() -> Result<Self> {
DriverContext::default().build()
}
/// Start the Media Driver threads; does not take control of the current thread
pub fn start(mut self) -> Result<MediaDriver<DriverStarted>> {
unsafe { aeron_op!(aeron_driver_start(self.c_driver, true)) }?;
// Move the driver and context references so the drop of `self` can't trigger issues
// when the new media driver is also eventually dropped
let c_driver = replace(&mut self.c_driver, ptr::null_mut());
let c_context = replace(&mut self.c_context, ptr::null_mut());
Ok(MediaDriver {
c_driver,
c_context,
_state: PhantomData,
})
}
}
impl MediaDriver<DriverStarted> {
/// Perform a single idle cycle of the Media Driver; does not take control of
/// the current thread
pub fn do_work(&self) {
unsafe {
aeron_driver_main_idle_strategy(self.c_driver, aeron_driver_main_do_work(self.c_driver))
};
}
}
impl<S> Drop for MediaDriver<S> {
fn drop(&mut self) { fn drop(&mut self) {
if !self.c_driver.is_null() { if !self.c_driver.is_null() {
unsafe { aeron_op!(aeron_driver_close(self.c_driver)) }.unwrap(); unsafe { aeron_op!(aeron_driver_close(self.c_driver)) }.unwrap();
@ -106,37 +165,34 @@ impl Drop for MediaDriver {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::driver::{DriverContext, DriverError, MediaDriver}; use crate::driver::{DriverContext, DriverError};
use std::ffi::CStr; use std::ffi::CStr;
use tempfile::tempdir; use tempfile::tempdir;
#[test] #[test]
fn multiple_startup_failure() { fn multiple_startup_failure() {
// We immediately close `tempdir` because we just want the name; Aeron needs let dir = tempdir().unwrap().into_path();
// to set up the directory itself. let driver = DriverContext::default()
let dir = tempdir().unwrap(); .set_aeron_dir(&dir)
let dir_path = dir.as_ref().to_path_buf(); .set_dir_delete_on_start(true)
dir.close().unwrap(); .build()
.unwrap();
let context = DriverContext::default().set_aeron_dir(&dir_path);
let driver = MediaDriver::with_context(context).unwrap();
assert_eq!( assert_eq!(
unsafe { CStr::from_ptr((*driver.c_context).aeron_dir) }.to_str(), unsafe { CStr::from_ptr((*driver.c_context).aeron_dir) }.to_str(),
Ok(dir_path.to_str().unwrap()) Ok(dir.to_str().unwrap())
); );
drop(driver); drop(driver);
// Attempting to start a media driver twice in rapid succession is guaranteed // Attempting to start a media driver twice in rapid succession is guaranteed
// cause an issue because the new media driver must wait for a heartbeat timeout. // cause an issue because the new media driver must wait for a heartbeat timeout.
let context = DriverContext::default().set_aeron_dir(&dir_path); let driver_res = DriverContext::default().set_aeron_dir(&dir).build();
let driver_res = MediaDriver::with_context(context);
// TODO: Why is the error message behavior different on Windows? // TODO: Why is the error message behavior different on Windows?
let expected_message = if cfg!(target_os = "windows") { let expected_message = if cfg!(target_os = "windows") {
String::new() String::new()
} else { } else {
format!("could not recreate aeron dir {}: ", dir_path.display()) format!("could not recreate aeron dir {}: ", dir.display())
}; };
assert!(driver_res.is_err()); assert!(driver_res.is_err());
@ -148,4 +204,19 @@ mod tests {
} }
); );
} }
#[test]
fn single_duty_cycle() {
let tempdir = tempfile::tempdir().unwrap();
let path = tempdir.into_path();
let driver = DriverContext::default()
.set_aeron_dir(&path)
.set_dir_delete_on_start(true)
.build()
.expect("Unable to create media driver")
.start()
.expect("Unable to start driver");
driver.do_work();
}
} }