From 966db0767fce79ff42b857fb1da868a21aa35eb4 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Mon, 23 Sep 2019 23:04:59 -0400 Subject: [PATCH 01/40] Add a Rust-ier way of handling the MediaDriver --- Cargo.toml | 1 + src/driver.rs | 132 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 +- 3 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 src/driver.rs diff --git a/Cargo.toml b/Cargo.toml index 7d82096..492aaf4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,4 @@ aeron_driver-sys = { path = "./aeron_driver-sys" } [dev-dependencies] clap = "2.33" ctrlc = "3.1.3" +tempfile = "3.1" diff --git a/src/driver.rs b/src/driver.rs new file mode 100644 index 0000000..ded7b5b --- /dev/null +++ b/src/driver.rs @@ -0,0 +1,132 @@ +//! Bindings for the C Media Driver + +use std::ffi::{CStr, CString}; +use std::os::unix::ffi::OsStrExt; +use std::path::Path; +use std::ptr; + +use aeron_driver_sys::*; + +/// Error code and message returned by the Media Driver +#[derive(Debug, PartialEq)] +pub struct DriverError { + code: i32, + msg: String, +} + +/// Context used to set up the Media Driver +#[derive(Default)] +pub struct DriverContext { + aeron_dir: Option, +} + +impl DriverContext { + /// Set the Aeron directory path that will be used for storing the files + /// Aeron uses to communicate with clients. + pub fn set_aeron_dir(mut self, path: &Path) -> Self { + // UNWRAP: Fails only if there is a null byte in the provided path + let c_string = CString::new(path.as_os_str().as_bytes()).unwrap(); + self.aeron_dir = Some(c_string); + self + } +} + +macro_rules! aeron_op { + ($op:expr) => { + if $op < 0 { + let code = ::aeron_driver_sys::aeron_errcode(); + let msg = CStr::from_ptr(::aeron_driver_sys::aeron_errmsg()) + .to_str() + .unwrap() + .to_string(); + Err(DriverError { code, msg }) + } else { + Ok(()) + } + }; +} + +/// Holder object to interface with the Media Driver +#[derive(Debug)] +pub struct MediaDriver { + c_context: *mut aeron_driver_context_t, + c_driver: *mut aeron_driver_t, +} + +impl MediaDriver { + /// Set up a new Media Driver + pub fn with_context(mut context: DriverContext) -> Result { + let mut driver = MediaDriver { + c_context: ptr::null_mut(), + c_driver: ptr::null_mut(), + }; + + unsafe { aeron_op!(aeron_driver_context_init(&mut driver.c_context)) }?; + + context.aeron_dir.take().map(|dir| unsafe { + aeron_op!(aeron_driver_context_set_dir( + driver.c_context, + dir.into_raw() + )) + }); + + unsafe { aeron_op!(aeron_driver_init(&mut driver.c_driver, driver.c_context)) }?; + + Ok(driver) + } + + /// Set up a new Media Driver with default options + pub fn new() -> Result { + Self::with_context(DriverContext::default()) + } +} + +impl Drop for MediaDriver { + fn drop(&mut self) { + if self.c_driver != ptr::null_mut() { + unsafe { aeron_op!(aeron_driver_close(self.c_driver)) }.unwrap(); + } + if self.c_context != ptr::null_mut() { + unsafe { aeron_op!(aeron_driver_context_close(self.c_context)) }.unwrap(); + } + } +} + +#[cfg(test)] +mod tests { + use crate::driver::{DriverContext, DriverError, MediaDriver}; + use std::ffi::CStr; + use tempfile::tempdir; + + #[test] + fn multiple_startup_failure() { + // We immediately close `tempdir` because we just want the name; Aeron needs + // to set up the directory itself. + let dir = tempdir().unwrap(); + let dir_path = dir.as_ref().to_path_buf(); + dir.close().unwrap(); + + let context = DriverContext::default().set_aeron_dir(&dir_path); + let driver = MediaDriver::with_context(context).unwrap(); + + assert_eq!( + unsafe { CStr::from_ptr((*driver.c_context).aeron_dir) }.to_str(), + Ok(dir_path.to_str().unwrap()) + ); + drop(driver); + + // Attempting to start a media driver twice in rapid succession is guaranteed + // cause an issue because the new media driver must wait for a heartbeat timeout. + let context = DriverContext::default().set_aeron_dir(&dir_path); + let driver_res = MediaDriver::with_context(context); + + assert!(driver_res.is_err()); + assert_eq!( + driver_res.unwrap_err(), + DriverError { + code: 0, + msg: format!("could not recreate aeron dir {}: ", dir_path.display()) + } + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index 966a11a..8a199bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ //! [Aeron](https://github.com/real-logic/aeron) client for Rust #![deny(missing_docs)] -mod context; +pub mod driver; /// Retrieve the C library version in (major, minor, patch) format pub fn aeron_version() -> (u32, u32, u32) { From 941c18bf37ba9a972addff324a69d881e6e67916 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Mon, 23 Sep 2019 23:27:03 -0400 Subject: [PATCH 02/40] Clean up the client context a bit We're not ready to make use of most of these options --- src/client.rs | 37 +++++++++++++++++++++++++++++++++++++ src/context.rs | 44 -------------------------------------------- src/driver.rs | 11 +++++++++++ src/lib.rs | 12 +----------- 4 files changed, 49 insertions(+), 55 deletions(-) create mode 100644 src/client.rs delete mode 100644 src/context.rs diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..ceeb0f2 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,37 @@ +//! Client library for Aeron. This encapsulates the logic needed to communicate +//! with the media driver, but does not manage the media driver itself. +use std::env; +use std::path::PathBuf; + +/// Context used to initialize the Aeron client +pub struct ClientContext { + aeron_dir: PathBuf, +} + +impl ClientContext { + fn get_user_name() -> String { + env::var("USER") + .or_else(|_| env::var("USERNAME")) + .unwrap_or("default".to_string()) + } + + /// Get the default folder used by the Media Driver to interact with clients + pub fn default_aeron_path() -> PathBuf { + let base_path = if cfg!(target_os = "linux") { + PathBuf::from("/dev/shm") + } else { + // Uses TMPDIR on Unix-like and GetTempPath on Windows + env::temp_dir() + }; + + base_path.join(format!("aeron-{}", ClientContext::get_user_name())) + } +} + +impl Default for ClientContext { + fn default() -> Self { + ClientContext { + aeron_dir: ClientContext::default_aeron_path(), + } + } +} diff --git a/src/context.rs b/src/context.rs deleted file mode 100644 index 0c1e5a9..0000000 --- a/src/context.rs +++ /dev/null @@ -1,44 +0,0 @@ -use std::env; -use std::path::PathBuf; - -const DEFAULT_MEDIA_DRIVER_TIMEOUT_MS: u16 = 10_000; -const DEFAULT_RESOURCE_LINGER_MS: u16 = 5_000; - -pub struct Context { - aeron_dir: PathBuf, - media_driver_timeout_ms: i32, - resource_linger_timeout_ms: i32, - use_conductor_agent_invoker: bool, - pre_touch_mapped_memory: bool, -} - -impl Context { - pub fn get_user_name() -> String { - env::var("USER") - .or_else(|_| env::var("USERNAME")) - .unwrap_or("default".to_string()) - } - - pub fn default_aeron_path() -> PathBuf { - let base_path = if cfg!(target_os = "linux") { - PathBuf::from("/dev/shm") - } else { - // Uses TMPDIR on Unix-like, and GetTempPath on Windows - env::temp_dir() - }; - - base_path.join(format!("aeron-{}", Context::get_user_name())) - } -} - -impl Default for Context { - fn default() -> Self { - Context { - aeron_dir: Context::default_aeron_path(), - media_driver_timeout_ms: DEFAULT_MEDIA_DRIVER_TIMEOUT_MS.into(), - resource_linger_timeout_ms: DEFAULT_RESOURCE_LINGER_MS.into(), - use_conductor_agent_invoker: false, - pre_touch_mapped_memory: false, - } - } -} diff --git a/src/driver.rs b/src/driver.rs index ded7b5b..a377a85 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -79,6 +79,17 @@ impl MediaDriver { pub fn new() -> Result { Self::with_context(DriverContext::default()) } + + /// Retrieve the C library version in (major, minor, patch) format + pub fn driver_version() -> (u32, u32, u32) { + unsafe { + ( + aeron_version_major() as u32, + aeron_version_minor() as u32, + aeron_version_patch() as u32, + ) + } + } } impl Drop for MediaDriver { diff --git a/src/lib.rs b/src/lib.rs index 8a199bb..1895dfa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,5 @@ //! [Aeron](https://github.com/real-logic/aeron) client for Rust #![deny(missing_docs)] +pub mod client; pub mod driver; - -/// Retrieve the C library version in (major, minor, patch) format -pub fn aeron_version() -> (u32, u32, u32) { - unsafe { - ( - aeron_driver_sys::aeron_version_major() as u32, - aeron_driver_sys::aeron_version_minor() as u32, - aeron_driver_sys::aeron_version_patch() as u32, - ) - } -} From 28900c330ebf481ad9c77f4c849ea075504bb481 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Tue, 24 Sep 2019 00:25:12 -0400 Subject: [PATCH 03/40] Windows compatibility fixes --- src/driver.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/driver.rs b/src/driver.rs index a377a85..0ca3a95 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -1,7 +1,6 @@ //! Bindings for the C Media Driver use std::ffi::{CStr, CString}; -use std::os::unix::ffi::OsStrExt; use std::path::Path; use std::ptr; @@ -24,8 +23,10 @@ impl DriverContext { /// Set the Aeron directory path that will be used for storing the files /// Aeron uses to communicate with clients. pub fn set_aeron_dir(mut self, path: &Path) -> Self { + // UNWRAP: Fails only if the path is non-UTF8 + let path_bytes = path.to_str().unwrap().as_bytes(); // UNWRAP: Fails only if there is a null byte in the provided path - let c_string = CString::new(path.as_os_str().as_bytes()).unwrap(); + let c_string = CString::new(path_bytes).unwrap(); self.aeron_dir = Some(c_string); self } @@ -131,12 +132,19 @@ mod tests { let context = DriverContext::default().set_aeron_dir(&dir_path); let driver_res = MediaDriver::with_context(context); + // TODO: Why is the error message behavior different on Windows? + let expected_message = if cfg!(target_os = "windows") { + String::new() + } else { + format!("could not recreate aeron dir {}: ", dir_path.display()) + }; + assert!(driver_res.is_err()); assert_eq!( driver_res.unwrap_err(), DriverError { code: 0, - msg: format!("could not recreate aeron dir {}: ", dir_path.display()) + msg: expected_message } ); } From 8371b2563853415c9caa0aaba94deadb26521939 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Tue, 24 Sep 2019 19:41:14 -0400 Subject: [PATCH 04/40] Tweak to enum generation No more gigantic weird names --- aeron_driver-sys/build.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/aeron_driver-sys/build.rs b/aeron_driver-sys/build.rs index a8fa6d2..33643f4 100644 --- a/aeron_driver-sys/build.rs +++ b/aeron_driver-sys/build.rs @@ -97,6 +97,7 @@ pub fn main() { .header("bindings.h") .whitelist_function("aeron_.*") .whitelist_type("aeron_.*") + .constified_enum_module("aeron_.*_enum") .generate() .expect("Unable to generate aeron_driver bindings"); From 3cd0aa4f55c7255f07088824817de535566c8771 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 25 Sep 2019 00:19:04 -0400 Subject: [PATCH 05/40] Add in Rust-y bindings for some of the `#define` constants --- aeron_driver-sys/bindings.h | 3 ++- aeron_driver-sys/build.rs | 1 + aeron_driver-sys/src/lib.rs | 50 +++++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/aeron_driver-sys/bindings.h b/aeron_driver-sys/bindings.h index 39072ee..686e2df 100644 --- a/aeron_driver-sys/bindings.h +++ b/aeron_driver-sys/bindings.h @@ -1,3 +1,4 @@ #include -#include #include +#include +#include diff --git a/aeron_driver-sys/build.rs b/aeron_driver-sys/build.rs index 33643f4..451b4b2 100644 --- a/aeron_driver-sys/build.rs +++ b/aeron_driver-sys/build.rs @@ -97,6 +97,7 @@ pub fn main() { .header("bindings.h") .whitelist_function("aeron_.*") .whitelist_type("aeron_.*") + .whitelist_var("AERON_.*") .constified_enum_module("aeron_.*_enum") .generate() .expect("Unable to generate aeron_driver bindings"); diff --git a/aeron_driver-sys/src/lib.rs b/aeron_driver-sys/src/lib.rs index 9f6426e..cefe986 100644 --- a/aeron_driver-sys/src/lib.rs +++ b/aeron_driver-sys/src/lib.rs @@ -4,8 +4,50 @@ #![allow(clippy::all)] include!(concat!(env!("OUT_DIR"), "/bindings.rs")); +/// Construct a C-compatible enum out of a set of constants. +/// Commonly used for types in Aeron that have fixed values via `#define`, +/// but aren't actually enums (e.g. AERON_COMMAND_.*, AERON_ERROR_CODE_.*). +/// Behavior is ultimately very similar to `num::FromPrimitive`. +macro_rules! define_enum { + ($(#[$outer:meta])*, $name:ident, [$(($left:ident, $right:expr)),*]) => { + #[repr(u32)] + #[derive(Debug, PartialEq)] + $(#[$outer])* + pub enum $name { + $($left = $right),* + } + + impl ::std::convert::TryFrom for $name { + type Error = (); + fn try_from(val: u32) -> Result<$name, ()> { + match val { + $(v if v == $name::$left as u32 => Ok($name::$left)),*, + _ => Err(()) + } + } + } + } +} + +define_enum!( + #[doc = "Command codes used when interacting with the Media Driver"], + AeronCommand, [ + (AddPublication, AERON_COMMAND_ADD_PUBLICATION), + (RemovePublication, AERON_COMMAND_REMOVE_PUBLICATION) + ] +); + +define_enum!( + #[doc = "Command codes used when interacting with the Media Driver"], + AeronErrorCode, [ + (GenericError, AERON_ERROR_CODE_GENERIC_ERROR) + ] +); + #[cfg(test)] mod tests { + use crate::*; + use std::convert::TryInto; #[test] fn version_check() { @@ -16,4 +58,12 @@ mod tests { assert_eq!(minor, 21); assert_eq!(patch, 2); } + + #[test] + fn define_enum_try() { + assert_eq!( + Ok(AeronCommand::AddPublication), + AERON_COMMAND_ADD_PUBLICATION.try_into() + ); + } } From 774a9a6b309d08d3d1f26e0b93daf589dc605b77 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 25 Sep 2019 00:24:42 -0400 Subject: [PATCH 06/40] Doc fixes --- aeron_driver-sys/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aeron_driver-sys/src/lib.rs b/aeron_driver-sys/src/lib.rs index cefe986..b4608e3 100644 --- a/aeron_driver-sys/src/lib.rs +++ b/aeron_driver-sys/src/lib.rs @@ -38,8 +38,8 @@ define_enum!( ); define_enum!( - #[doc = "Command codes used when interacting with the Media Driver"], - AeronErrorCode, [ + #[doc = "Error codes used by the Media Driver control protocol"], + AeronControlErrorCode, [ (GenericError, AERON_ERROR_CODE_GENERIC_ERROR) ] ); From 8b0ee22e1b84b3e942325a6f9874034893f7fb6a Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 25 Sep 2019 00:26:19 -0400 Subject: [PATCH 07/40] Clippy cleanup --- src/client.rs | 2 +- src/driver.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index ceeb0f2..0702c24 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,7 +12,7 @@ impl ClientContext { fn get_user_name() -> String { env::var("USER") .or_else(|_| env::var("USERNAME")) - .unwrap_or("default".to_string()) + .unwrap_or_else(|_| "default".to_string()) } /// Get the default folder used by the Media Driver to interact with clients diff --git a/src/driver.rs b/src/driver.rs index 0ca3a95..26bde37 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -95,10 +95,10 @@ impl MediaDriver { impl Drop for MediaDriver { fn drop(&mut self) { - if self.c_driver != ptr::null_mut() { + if self.c_driver.is_null() { unsafe { aeron_op!(aeron_driver_close(self.c_driver)) }.unwrap(); } - if self.c_context != ptr::null_mut() { + if self.c_context.is_null() { unsafe { aeron_op!(aeron_driver_context_close(self.c_context)) }.unwrap(); } } From 78730221d169f04e3cd37a0986eaf8eb8105f934 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 25 Sep 2019 22:09:18 -0400 Subject: [PATCH 08/40] Fix a dumb mistake in null checking... D'oh. --- src/driver.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/driver.rs b/src/driver.rs index 26bde37..3876752 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -95,10 +95,10 @@ impl MediaDriver { impl Drop for MediaDriver { fn drop(&mut self) { - if self.c_driver.is_null() { + if !self.c_driver.is_null() { unsafe { aeron_op!(aeron_driver_close(self.c_driver)) }.unwrap(); } - if self.c_context.is_null() { + if !self.c_context.is_null() { unsafe { aeron_op!(aeron_driver_context_close(self.c_context)) }.unwrap(); } } From cef3a17d55d0de60cbaf91145fee397fcba6a26c Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 25 Sep 2019 22:18:42 -0400 Subject: [PATCH 09/40] Move the control protocol enum to main client lib Side benefit: forcing documentation --- aeron_driver-sys/src/lib.rs | 48 ------------------------------ src/control_protocol.rs | 59 +++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 3 files changed, 60 insertions(+), 48 deletions(-) create mode 100644 src/control_protocol.rs diff --git a/aeron_driver-sys/src/lib.rs b/aeron_driver-sys/src/lib.rs index b4608e3..386ea65 100644 --- a/aeron_driver-sys/src/lib.rs +++ b/aeron_driver-sys/src/lib.rs @@ -4,46 +4,6 @@ #![allow(clippy::all)] include!(concat!(env!("OUT_DIR"), "/bindings.rs")); -/// Construct a C-compatible enum out of a set of constants. -/// Commonly used for types in Aeron that have fixed values via `#define`, -/// but aren't actually enums (e.g. AERON_COMMAND_.*, AERON_ERROR_CODE_.*). -/// Behavior is ultimately very similar to `num::FromPrimitive`. -macro_rules! define_enum { - ($(#[$outer:meta])*, $name:ident, [$(($left:ident, $right:expr)),*]) => { - #[repr(u32)] - #[derive(Debug, PartialEq)] - $(#[$outer])* - pub enum $name { - $($left = $right),* - } - - impl ::std::convert::TryFrom for $name { - type Error = (); - fn try_from(val: u32) -> Result<$name, ()> { - match val { - $(v if v == $name::$left as u32 => Ok($name::$left)),*, - _ => Err(()) - } - } - } - } -} - -define_enum!( - #[doc = "Command codes used when interacting with the Media Driver"], - AeronCommand, [ - (AddPublication, AERON_COMMAND_ADD_PUBLICATION), - (RemovePublication, AERON_COMMAND_REMOVE_PUBLICATION) - ] -); - -define_enum!( - #[doc = "Error codes used by the Media Driver control protocol"], - AeronControlErrorCode, [ - (GenericError, AERON_ERROR_CODE_GENERIC_ERROR) - ] -); - #[cfg(test)] mod tests { use crate::*; @@ -58,12 +18,4 @@ mod tests { assert_eq!(minor, 21); assert_eq!(patch, 2); } - - #[test] - fn define_enum_try() { - assert_eq!( - Ok(AeronCommand::AddPublication), - AERON_COMMAND_ADD_PUBLICATION.try_into() - ); - } } diff --git a/src/control_protocol.rs b/src/control_protocol.rs new file mode 100644 index 0000000..a8c8bea --- /dev/null +++ b/src/control_protocol.rs @@ -0,0 +1,59 @@ +//! Utilities for interacting with the control protocol of the Media Driver +use aeron_driver_sys::*; + +/// Construct a C-compatible enum out of a set of constants. +/// Commonly used for types in Aeron that have fixed values via `#define`, +/// but aren't actually enums (e.g. AERON_COMMAND_.*, AERON_ERROR_CODE_.*). +/// Behavior is ultimately very similar to `num::FromPrimitive`. +macro_rules! define_enum { + ( + $(#[$outer:meta])* + $name:ident, [$( + $(#[$inner:meta]),* + $left:ident = $right:ident + ),+] + ) => { + #[repr(u32)] + #[derive(Debug, PartialEq)] + $(#[$outer])* + pub enum $name {$( + $(#[$inner])* + $left = $right, + )*} + + impl ::std::convert::TryFrom for $name { + type Error = (); + fn try_from(val: u32) -> Result<$name, ()> { + match val { + $(v if v == $name::$left as u32 => Ok($name::$left)),*, + _ => Err(()) + } + } + } + } +} + +define_enum!( + #[doc = "Commands sent from clients to the Media Driver"] + ClientCommand, + [ + #[doc = "Client declaring a new publication"] + AddPublication = AERON_COMMAND_ADD_PUBLICATION, + #[doc = "Client removing a publication"] + RemovePublication = AERON_COMMAND_REMOVE_PUBLICATION + ] +); + +#[cfg(test)] +mod tests { + use crate::control_protocol::ClientCommand; + use std::convert::TryInto; + + #[test] + fn client_command_convert() { + assert_eq!( + Ok(ClientCommand::AddPublication), + ::aeron_driver_sys::AERON_COMMAND_ADD_PUBLICATION.try_into() + ) + } +} diff --git a/src/lib.rs b/src/lib.rs index 1895dfa..d988f30 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,4 +2,5 @@ #![deny(missing_docs)] pub mod client; +pub mod control_protocol; pub mod driver; From dcb0af4f5e873e26b5f95ff15467e2a66b65aae4 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 25 Sep 2019 22:31:10 -0400 Subject: [PATCH 10/40] Make the macro look like an enum declaration Interestingly enough, `cargo fmt` now recognizes and formats it --- src/control_protocol.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/control_protocol.rs b/src/control_protocol.rs index a8c8bea..343cfb2 100644 --- a/src/control_protocol.rs +++ b/src/control_protocol.rs @@ -8,10 +8,10 @@ use aeron_driver_sys::*; macro_rules! define_enum { ( $(#[$outer:meta])* - $name:ident, [$( + pub enum $name:ident {$( $(#[$inner:meta]),* - $left:ident = $right:ident - ),+] + $left:ident = $right:ident, + )+} ) => { #[repr(u32)] #[derive(Debug, PartialEq)] @@ -35,13 +35,12 @@ macro_rules! define_enum { define_enum!( #[doc = "Commands sent from clients to the Media Driver"] - ClientCommand, - [ + pub enum ClientCommand { #[doc = "Client declaring a new publication"] AddPublication = AERON_COMMAND_ADD_PUBLICATION, #[doc = "Client removing a publication"] - RemovePublication = AERON_COMMAND_REMOVE_PUBLICATION - ] + RemovePublication = AERON_COMMAND_REMOVE_PUBLICATION, + } ); #[cfg(test)] From 4085574b91ed580fe376a1de3d860dd48c645891 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 25 Sep 2019 23:02:28 -0400 Subject: [PATCH 11/40] Add all the control protocol enums --- src/control_protocol.rs | 54 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/src/control_protocol.rs b/src/control_protocol.rs index 343cfb2..cb2e6a7 100644 --- a/src/control_protocol.rs +++ b/src/control_protocol.rs @@ -36,10 +36,60 @@ macro_rules! define_enum { define_enum!( #[doc = "Commands sent from clients to the Media Driver"] pub enum ClientCommand { - #[doc = "Client declaring a new publication"] + #[doc = "Add a Publication"] AddPublication = AERON_COMMAND_ADD_PUBLICATION, - #[doc = "Client removing a publication"] + #[doc = "Remove a Publication"] RemovePublication = AERON_COMMAND_REMOVE_PUBLICATION, + #[doc = "Add an Exclusive Publication"] + AddExclusivePublication = AERON_COMMAND_ADD_EXCLUSIVE_PUBLICATION, + #[doc = "Add a Subscriber"] + AddSubscription = AERON_COMMAND_ADD_SUBSCRIPTION, + #[doc = "Remove a Subscriber"] + RemoveSubscription = AERON_COMMAND_REMOVE_SUBSCRIPTION, + #[doc = "Keepalaive from Client"] + ClientKeepalive = AERON_COMMAND_CLIENT_KEEPALIVE, + #[doc = "Add Destination to an existing Publication"] + AddDestination = AERON_COMMAND_ADD_DESTINATION, + #[doc = "Remove Destination from an existing Publication"] + RemoveDestination = AERON_COMMAND_REMOVE_DESTINATION, + #[doc = "Add a Counter to the counters manager"] + AddCounter = AERON_COMMAND_ADD_COUNTER, + #[doc = "Remove a Counter from the counters manager"] + RemoveCounter = AERON_COMMAND_REMOVE_COUNTER, + #[doc = "Close indication from Client"] + ClientClose = AERON_COMMAND_CLIENT_CLOSE, + #[doc = "Add Destination for existing Subscription"] + AddRcvDestination = AERON_COMMAND_ADD_RCV_DESTINATION, + #[doc = "Remove Destination for existing Subscription"] + RemoveRcvDestination = AERON_COMMAND_REMOVE_RCV_DESTINATION, + #[doc = "Request the driver to terminate"] + TerminateDriver = AERON_COMMAND_TERMINATE_DRIVER, + } +); + +define_enum!( + #[doc = "Responses from the Media Driver to client commands"] + pub enum DriverResponse { + #[doc = "Error Response as a result of attempting to process a client command operation"] + OnError = AERON_RESPONSE_ON_ERROR, + #[doc = "Subscribed Image buffers are available notification"] + OnAvailableImage = AERON_RESPONSE_ON_AVAILABLE_IMAGE, + #[doc = "New Publication buffers are ready notification"] + OnPublicationReady = AERON_RESPONSE_ON_PUBLICATION_READY, + #[doc = "Operation has succeeded"] + OnOperationSuccess = AERON_RESPONSE_ON_OPERATION_SUCCESS, + #[doc = "Inform client of timeout and removal of an inactive Image"] + OnUnavailableImage = AERON_RESPONSE_ON_UNAVAILABLE_IMAGE, + #[doc = "New Exclusive Publication buffers are ready notification"] + OnExclusivePublicationReady = AERON_RESPONSE_ON_EXCLUSIVE_PUBLICATION_READY, + #[doc = "New Subscription is ready notification"] + OnSubscriptionReady = AERON_RESPONSE_ON_SUBSCRIPTION_READY, + #[doc = "New counter is ready notification"] + OnCounterReady = AERON_RESPONSE_ON_COUNTER_READY, + #[doc = "Inform clients of removal of counter"] + OnUnavailableCounter = AERON_RESPONSE_ON_UNAVAILABLE_COUNTER, + #[doc = "Inform clients of client timeout"] + OnClientTimeout = AERON_RESPONSE_ON_CLIENT_TIMEOUT, } ); From 27a8ce0dd49a89e380cebbf44e6b4e765724a536 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 2 Oct 2019 22:54:44 -0400 Subject: [PATCH 12/40] Start reading the CnC file --- Cargo.toml | 1 + src/client/cnc_descriptor.rs | 92 ++++++++++++++++++++++++++++ src/{client.rs => client/context.rs} | 4 +- src/client/mod.rs | 5 ++ 4 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 src/client/cnc_descriptor.rs rename src/{client.rs => client/context.rs} (92%) create mode 100644 src/client/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 492aaf4..a8ab2f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ maintenance = { status = "actively-developed" } [dependencies] aeron_driver-sys = { path = "./aeron_driver-sys" } +memmap = "0.7" [dev-dependencies] clap = "2.33" diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs new file mode 100644 index 0000000..42ea7b0 --- /dev/null +++ b/src/client/cnc_descriptor.rs @@ -0,0 +1,92 @@ +//! Description of the command and control file used to communicate between the Media Driver +//! and its clients. +//! +//! File layout: +//! +//! ```text +//! +-----------------------------+ +//! | Meta Data | +//! +-----------------------------+ +//! | to-driver Buffer | +//! +-----------------------------+ +//! | to-clients Buffer | +//! +-----------------------------+ +//! | Counters Metadata Buffer | +//! +-----------------------------+ +//! | Counters Values Buffer | +//! +-----------------------------+ +//! | Error Log | +//! +-----------------------------+ +//! ``` + +/// The CnC file metadata header. Layout: +/// +/// ```text +/// 0 1 2 3 +/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/// | Aeron CnC Version | +/// +---------------------------------------------------------------+ +/// | to-driver buffer length | +/// +---------------------------------------------------------------+ +/// | to-clients buffer length | +/// +---------------------------------------------------------------+ +/// | Counters Metadata buffer length | +/// +---------------------------------------------------------------+ +/// | Counters Values buffer length | +/// +---------------------------------------------------------------+ +/// | Error Log buffer length | +/// +---------------------------------------------------------------+ +/// | Client Liveness Timeout | +/// | | +/// +---------------------------------------------------------------+ +/// | Driver Start Timestamp | +/// | | +/// +---------------------------------------------------------------+ +/// | Driver PID | +/// | | +/// +---------------------------------------------------------------+ +/// ``` +#[repr(C, align(4))] +pub struct MetaDataDefinition { + cnc_version: i32, + _to_driver_buffer_length: i32, + _to_client_buffer_length: i32, + _counter_metadata_buffer_length: i32, + _counter_values_buffer_length: i32, + _error_log_buffer_length: i32, + _client_liveness_timeout: i64, + _start_timestamp: i64, + _pid: i64, +} + +#[cfg(test)] +mod tests { + use crate::client::cnc_descriptor::MetaDataDefinition; + use crate::driver::{DriverContext, MediaDriver}; + use memmap::MmapOptions; + use std::fs::File; + use tempfile::tempdir; + + #[test] + fn read_cnc_version() { + let dir = tempdir().unwrap(); + let dir_path = dir.as_ref().to_path_buf(); + dir.close().unwrap(); + + let context = DriverContext::default().set_aeron_dir(&dir_path); + let _driver = MediaDriver::with_context(context).unwrap(); + + // Open the CnC location + let cnc_path = dir_path.join("cnc.dat"); + let cnc_file = File::open(&cnc_path).expect("Unable to open CnC file"); + let mmap = unsafe { + MmapOptions::default() + .map(&cnc_file) + .expect("Unable to memory map CnC file") + }; + + let metadata: &MetaDataDefinition = unsafe { &*(mmap.as_ptr().cast()) }; + assert_eq!(metadata.cnc_version, 16); + } +} diff --git a/src/client.rs b/src/client/context.rs similarity index 92% rename from src/client.rs rename to src/client/context.rs index 0702c24..d011493 100644 --- a/src/client.rs +++ b/src/client/context.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; /// Context used to initialize the Aeron client pub struct ClientContext { - aeron_dir: PathBuf, + _aeron_dir: PathBuf, } impl ClientContext { @@ -31,7 +31,7 @@ impl ClientContext { impl Default for ClientContext { fn default() -> Self { ClientContext { - aeron_dir: ClientContext::default_aeron_path(), + _aeron_dir: ClientContext::default_aeron_path(), } } } diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..969153c --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1,5 @@ +//! Aeron client +//! +//! These are the modules necessary to construct a functioning Aeron client +pub mod cnc_descriptor; +pub mod context; From 49bfb1edb977c36fceaf0a182408aa15e667fd16 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 2 Oct 2019 23:11:44 -0400 Subject: [PATCH 13/40] Minor cleanup of the CnC version check --- src/client/cnc_descriptor.rs | 7 +++++-- src/lib.rs | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index 42ea7b0..6385354 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -60,9 +60,12 @@ pub struct MetaDataDefinition { _pid: i64, } +/// Version code for the Aeron CnC file format +pub const CNC_VERSION: i32 = crate::sematic_version_compose(0, 0, 16); + #[cfg(test)] mod tests { - use crate::client::cnc_descriptor::MetaDataDefinition; + use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_VERSION}; use crate::driver::{DriverContext, MediaDriver}; use memmap::MmapOptions; use std::fs::File; @@ -87,6 +90,6 @@ mod tests { }; let metadata: &MetaDataDefinition = unsafe { &*(mmap.as_ptr().cast()) }; - assert_eq!(metadata.cnc_version, 16); + assert_eq!(metadata.cnc_version, CNC_VERSION); } } diff --git a/src/lib.rs b/src/lib.rs index d988f30..f593f40 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,3 +4,17 @@ pub mod client; pub mod control_protocol; pub mod driver; + +const fn sematic_version_compose(major: u8, minor: u8, patch: u8) -> i32 { + (major as i32) << 16 | (minor as i32) << 8 | (patch as i32) +} + +#[cfg(test)] +mod tests { + use crate::sematic_version_compose; + + #[test] + fn version_compose_cnc() { + assert_eq!(sematic_version_compose(0, 0, 16), 16); + } +} From c982a63cd97a99195d259785061d8ccad6161e36 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 2 Oct 2019 23:27:59 -0400 Subject: [PATCH 14/40] More minor cleanup --- {examples => aeron_driver-sys/examples}/aeronmd.rs | 0 src/client/cnc_descriptor.rs | 9 ++++++--- 2 files changed, 6 insertions(+), 3 deletions(-) rename {examples => aeron_driver-sys/examples}/aeronmd.rs (100%) diff --git a/examples/aeronmd.rs b/aeron_driver-sys/examples/aeronmd.rs similarity index 100% rename from examples/aeronmd.rs rename to aeron_driver-sys/examples/aeronmd.rs diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index 6385354..ae0d2ed 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -60,12 +60,15 @@ pub struct MetaDataDefinition { _pid: i64, } -/// Version code for the Aeron CnC file format +/// Version code for the Aeron CnC file format that this client is compatible with pub const CNC_VERSION: i32 = crate::sematic_version_compose(0, 0, 16); +/// Filename for the CnC file located in the Aeron directory +pub const CNC_FILE: &str = "cnc.dat"; + #[cfg(test)] mod tests { - use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_VERSION}; + use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION}; use crate::driver::{DriverContext, MediaDriver}; use memmap::MmapOptions; use std::fs::File; @@ -81,7 +84,7 @@ mod tests { let _driver = MediaDriver::with_context(context).unwrap(); // Open the CnC location - let cnc_path = dir_path.join("cnc.dat"); + let cnc_path = dir_path.join(CNC_FILE); let cnc_file = File::open(&cnc_path).expect("Unable to open CnC file"); let mmap = unsafe { MmapOptions::default() From 0e1462fe9e58b959956084434a43edd59d0b2ff7 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Wed, 2 Oct 2019 23:42:05 -0400 Subject: [PATCH 15/40] Fix some Cargo.toml issues --- Cargo.toml | 4 +--- aeron_driver-sys/Cargo.toml | 5 +++++ aeron_driver-sys/src/lib.rs | 2 -- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a8ab2f4..cf7e491 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,4 @@ aeron_driver-sys = { path = "./aeron_driver-sys" } memmap = "0.7" [dev-dependencies] -clap = "2.33" -ctrlc = "3.1.3" -tempfile = "3.1" +tempfile = "3.1" \ No newline at end of file diff --git a/aeron_driver-sys/Cargo.toml b/aeron_driver-sys/Cargo.toml index 8aac0d8..df22f6b 100644 --- a/aeron_driver-sys/Cargo.toml +++ b/aeron_driver-sys/Cargo.toml @@ -15,3 +15,8 @@ dunce = "1.0.0" [features] static = [] + +[dev-dependencies] +clap = "2.33" +ctrlc = "3.1.3" +tempfile = "3.1" diff --git a/aeron_driver-sys/src/lib.rs b/aeron_driver-sys/src/lib.rs index 386ea65..9f6426e 100644 --- a/aeron_driver-sys/src/lib.rs +++ b/aeron_driver-sys/src/lib.rs @@ -6,8 +6,6 @@ include!(concat!(env!("OUT_DIR"), "/bindings.rs")); #[cfg(test)] mod tests { - use crate::*; - use std::convert::TryInto; #[test] fn version_check() { From c5b227f328fcce4ec64a028c102fb13669c38035 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 3 Oct 2019 00:32:37 -0400 Subject: [PATCH 16/40] Install luuid for Linux --- .travis.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.travis.yml b/.travis.yml index 4384fb8..358d7a3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,6 +6,11 @@ os: - osx - windows +addons: + apt: + packages: + - uuid-dev + cache: - cargo From 5527495b095a7464949482af11f97c49400f5f7a Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 3 Oct 2019 00:47:06 -0400 Subject: [PATCH 17/40] Reversion and cleanup Moving the `aeronmd` example to the sys crate caused issues for static linking because of the `tempfile` crate. Revert "Install luuid for Linux" Revert "Fix some Cargo.toml issues" Revert "More minor cleanup" --- .travis.yml | 5 ----- Cargo.toml | 4 +++- aeron_driver-sys/Cargo.toml | 5 ----- aeron_driver-sys/src/lib.rs | 2 ++ {aeron_driver-sys/examples => examples}/aeronmd.rs | 0 src/client/cnc_descriptor.rs | 2 +- 6 files changed, 6 insertions(+), 12 deletions(-) rename {aeron_driver-sys/examples => examples}/aeronmd.rs (100%) diff --git a/.travis.yml b/.travis.yml index 358d7a3..4384fb8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,11 +6,6 @@ os: - osx - windows -addons: - apt: - packages: - - uuid-dev - cache: - cargo diff --git a/Cargo.toml b/Cargo.toml index cf7e491..a8ab2f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,6 @@ aeron_driver-sys = { path = "./aeron_driver-sys" } memmap = "0.7" [dev-dependencies] -tempfile = "3.1" \ No newline at end of file +clap = "2.33" +ctrlc = "3.1.3" +tempfile = "3.1" diff --git a/aeron_driver-sys/Cargo.toml b/aeron_driver-sys/Cargo.toml index df22f6b..8aac0d8 100644 --- a/aeron_driver-sys/Cargo.toml +++ b/aeron_driver-sys/Cargo.toml @@ -15,8 +15,3 @@ dunce = "1.0.0" [features] static = [] - -[dev-dependencies] -clap = "2.33" -ctrlc = "3.1.3" -tempfile = "3.1" diff --git a/aeron_driver-sys/src/lib.rs b/aeron_driver-sys/src/lib.rs index 9f6426e..386ea65 100644 --- a/aeron_driver-sys/src/lib.rs +++ b/aeron_driver-sys/src/lib.rs @@ -6,6 +6,8 @@ include!(concat!(env!("OUT_DIR"), "/bindings.rs")); #[cfg(test)] mod tests { + use crate::*; + use std::convert::TryInto; #[test] fn version_check() { diff --git a/aeron_driver-sys/examples/aeronmd.rs b/examples/aeronmd.rs similarity index 100% rename from aeron_driver-sys/examples/aeronmd.rs rename to examples/aeronmd.rs diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index ae0d2ed..1c02318 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -60,7 +60,7 @@ pub struct MetaDataDefinition { _pid: i64, } -/// Version code for the Aeron CnC file format that this client is compatible with +/// Version code for the Aeron CnC file format pub const CNC_VERSION: i32 = crate::sematic_version_compose(0, 0, 16); /// Filename for the CnC file located in the Aeron directory From c463c961706f71832cfb8eb5d056353616d19c75 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 3 Oct 2019 22:10:34 -0400 Subject: [PATCH 18/40] Start work on the atomic buffer C++ Aeron doesn't seem to care about alignment, so I guess I won't either? --- src/client/concurrent/atomic_buffer.rs | 83 ++++++++++++++++++++++++++ src/client/concurrent/mod.rs | 5 ++ src/client/concurrent/ring_buffer.rs | 81 +++++++++++++++++++++++++ src/client/driver_proxy.rs | 12 ++++ src/client/mod.rs | 2 + src/lib.rs | 4 ++ src/util.rs | 27 +++++++++ 7 files changed, 214 insertions(+) create mode 100644 src/client/concurrent/atomic_buffer.rs create mode 100644 src/client/concurrent/mod.rs create mode 100644 src/client/concurrent/ring_buffer.rs create mode 100644 src/client/driver_proxy.rs create mode 100644 src/util.rs diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs new file mode 100644 index 0000000..179f24e --- /dev/null +++ b/src/client/concurrent/atomic_buffer.rs @@ -0,0 +1,83 @@ +//! Buffer that is safe to use in a multi-process/multi-thread context. Typically used for +//! handling atomic updates of memory-mapped buffers. +use std::mem::size_of; +use std::ops::Deref; +use std::sync::atomic::{AtomicI64, Ordering}; + +use crate::util::{AeronError, IndexT, Result}; + +/// Wrapper for atomic operations around an underlying byte buffer +pub struct AtomicBuffer<'a> { + buffer: &'a mut [u8], +} + +impl<'a> Deref for AtomicBuffer<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.buffer + } +} + +impl<'a> AtomicBuffer<'a> { + /// Create an `AtomicBuffer` as a view on an underlying byte slice + pub fn wrap(buffer: &'a mut [u8]) -> Self { + AtomicBuffer { buffer } + } + + fn bounds_check(&self, offset: IndexT, size: usize) -> Result<()> { + if self.buffer.len() - offset as usize > size { + Err(AeronError::OutOfBounds) + } else { + Ok(()) + } + } + + /// Atomically fetch the current value at an offset, and increment by delta + #[allow(clippy::cast_ptr_alignment)] + pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { + self.bounds_check(offset, size_of::()).map(|_| { + let a: &AtomicI64 = + unsafe { &*(self.buffer.as_ptr().offset(offset as isize) as *const AtomicI64) }; + println!("AtomicI64: {:p}", a); + a.fetch_add(delta, Ordering::SeqCst) + }) + } +} + +#[cfg(test)] +mod tests { + use memmap::MmapOptions; + use std::sync::atomic::{AtomicU64, Ordering}; + + use crate::client::concurrent::atomic_buffer::AtomicBuffer; + + #[test] + fn mmap_to_atomic() { + let mut mmap = MmapOptions::new() + .len(24) + .map_anon() + .expect("Unable to map anonymous memory"); + AtomicBuffer::wrap(&mut mmap); + } + + #[test] + fn primitive_atomic_equivalent() { + let value: u64 = 24; + + let val_ptr = &value as *const u64; + let a_ptr = val_ptr as *const AtomicU64; + let a: &AtomicU64 = unsafe { &*a_ptr }; + + assert_eq!(value, (*a).load(Ordering::SeqCst)); + } + + #[test] + fn atomic_i64_increment() { + let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; + + let atomic_buf = AtomicBuffer::wrap(&mut buf[..]); + assert_eq!(atomic_buf.get_and_add_i64(0, 1), Ok(16)); + assert_eq!(atomic_buf.get_and_add_i64(0, 0), Ok(17)); + } +} diff --git a/src/client/concurrent/mod.rs b/src/client/concurrent/mod.rs new file mode 100644 index 0000000..06eec65 --- /dev/null +++ b/src/client/concurrent/mod.rs @@ -0,0 +1,5 @@ +//! Module for handling safe interactions among the multiple clients making use +//! of a single Media Driver + +pub mod atomic_buffer; +pub mod ring_buffer; diff --git a/src/client/concurrent/ring_buffer.rs b/src/client/concurrent/ring_buffer.rs new file mode 100644 index 0000000..f13ae53 --- /dev/null +++ b/src/client/concurrent/ring_buffer.rs @@ -0,0 +1,81 @@ +//! Ring buffer wrapper for communicating with the Media Driver +use crate::client::concurrent::atomic_buffer::AtomicBuffer; +use crate::util::{IndexT, Result}; + +/// Description of the Ring Buffer schema. Each Ring Buffer looks like: +/// +/// ```text +/// 0 1 2 3 +/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/// | Buffer Data ... +/// ... | +/// +---------------------------------------------------------------+ +/// | | +/// | Tail Position | +/// | | +/// | | +/// +---------------------------------------------------------------+ +/// | | +/// | Head Cache Position | +/// | | +/// | | +/// +---------------------------------------------------------------+ +/// | | +/// | Head Position | +/// | | +/// | | +/// +---------------------------------------------------------------+ +/// | | +/// | Correlation ID Counter | +/// | | +/// | | +/// +---------------------------------------------------------------+ +/// | | +/// | Consumer Heartbeat | +/// | | +/// | | +/// +---------------------------------------------------------------+ +/// ``` +pub mod descriptor { + use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::util::AeronError::IllegalArgument; + use crate::util::{is_power_of_two, IndexT, Result, CACHE_LINE_LENGTH}; + + /// Offset of the correlation id counter, as measured in bytes past + /// the start of the ring buffer metadata trailer + pub const CORRELATION_COUNTER_OFFSET: usize = CACHE_LINE_LENGTH * 8; + + /// Total size of the ring buffer metadata trailer + pub const TRAILER_LENGTH: usize = CACHE_LINE_LENGTH * 12; + + /// Verify the capacity of a buffer is legal for use as a ring buffer. + /// Returns the actual buffer capacity once ring buffer metadata has been removed. + pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result { + let capacity = (buffer.len() - TRAILER_LENGTH) as IndexT; + if is_power_of_two(capacity) { + Ok(capacity) + } else { + Err(IllegalArgument) + } + } +} + +/// Multi-producer, single-consumer ring buffer implementation. +pub struct ManyToOneRingBuffer<'a> { + _buffer: AtomicBuffer<'a>, + _capacity: IndexT, + _correlation_counter_offset: IndexT, +} + +impl<'a> ManyToOneRingBuffer<'a> { + /// Create a many-to-one ring buffer from an underlying atomic buffer + pub fn wrap(buffer: AtomicBuffer<'a>) -> Result { + descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { + _buffer: buffer, + _capacity: capacity, + _correlation_counter_offset: capacity + + descriptor::CORRELATION_COUNTER_OFFSET as IndexT, + }) + } +} diff --git a/src/client/driver_proxy.rs b/src/client/driver_proxy.rs new file mode 100644 index 0000000..b1a84e4 --- /dev/null +++ b/src/client/driver_proxy.rs @@ -0,0 +1,12 @@ +//! Proxy object for interacting with the Media Driver. Handles operations +//! involving the command-and-control file protocol. + +use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer; + +/// Proxy object for operations involving the Media Driver +pub struct DriverProxy<'a> { + _to_driver: ManyToOneRingBuffer<'a>, + _client_id: i64, +} + +impl<'a> DriverProxy<'a> {} diff --git a/src/client/mod.rs b/src/client/mod.rs index 969153c..b00370e 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -2,4 +2,6 @@ //! //! These are the modules necessary to construct a functioning Aeron client pub mod cnc_descriptor; +pub mod concurrent; pub mod context; +pub mod driver_proxy; diff --git a/src/lib.rs b/src/lib.rs index f593f40..9421c8b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,13 @@ //! [Aeron](https://github.com/real-logic/aeron) client for Rust #![deny(missing_docs)] +#[cfg(target_endian = "big")] +compile_error!("Aeron is only supported on little-endian architectures"); + pub mod client; pub mod control_protocol; pub mod driver; +pub mod util; const fn sematic_version_compose(major: u8, minor: u8, patch: u8) -> i32 { (major as i32) << 16 | (minor as i32) << 8 | (patch as i32) diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..2c91323 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,27 @@ +//! Various utility and helper bits for the Aeron client. Predominantly helpful +//! in mapping between concepts in the C++ API and Rust + +/// Helper type to indicate indexing operations in Aeron, Synonymous with the +/// Aeron C++ `index_t` type. Used to imitate the Java API. +pub type IndexT = i32; + +/// Helper method for quick verification that `IndexT` is a positive power of two +pub fn is_power_of_two(idx: IndexT) -> bool { + idx > 0 && (idx as u32).is_power_of_two() +} + +/// Length of the data blocks used by the CPU cache sub-system in bytes +pub const CACHE_LINE_LENGTH: usize = 64; + +/// Error types from operations in the Aeron client. Synonymous with the exceptions +/// generated by the C++ client. +#[derive(Debug, PartialEq)] +pub enum AeronError { + /// Indication that an argument provided is an illegal value + IllegalArgument, + /// Indication that a memory access would exceed the allowable bounds + OutOfBounds, +} + +/// Result type for operations in the Aeron client +pub type Result = ::std::result::Result; From 4611034b7bad69d09dc3164bc3d3a24efce2b29d Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 3 Oct 2019 22:17:08 -0400 Subject: [PATCH 19/40] Add a test for increment offset --- src/client/concurrent/atomic_buffer.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs index 179f24e..b6269e8 100644 --- a/src/client/concurrent/atomic_buffer.rs +++ b/src/client/concurrent/atomic_buffer.rs @@ -80,4 +80,13 @@ mod tests { assert_eq!(atomic_buf.get_and_add_i64(0, 1), Ok(16)); assert_eq!(atomic_buf.get_and_add_i64(0, 0), Ok(17)); } + + #[test] + fn atomic_i64_increment_offset() { + let mut buf = [0, 16, 0, 0, 0, 0, 0, 0, 0]; + + let atomic_buf = AtomicBuffer::wrap(&mut buf[..]); + assert_eq!(atomic_buf.get_and_add_i64(1, 1), Ok(16)); + assert_eq!(atomic_buf.get_and_add_i64(1, 0), Ok(17)); + } } From 38eea0224f80119edace69a38c17e055dd995db2 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 3 Oct 2019 22:40:10 -0400 Subject: [PATCH 20/40] Fix the bounds check --- src/client/concurrent/atomic_buffer.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs index b6269e8..5847b26 100644 --- a/src/client/concurrent/atomic_buffer.rs +++ b/src/client/concurrent/atomic_buffer.rs @@ -26,7 +26,7 @@ impl<'a> AtomicBuffer<'a> { } fn bounds_check(&self, offset: IndexT, size: usize) -> Result<()> { - if self.buffer.len() - offset as usize > size { + if self.buffer.len() - (offset as usize) < size { Err(AeronError::OutOfBounds) } else { Ok(()) @@ -51,6 +51,7 @@ mod tests { use std::sync::atomic::{AtomicU64, Ordering}; use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::util::AeronError; #[test] fn mmap_to_atomic() { @@ -89,4 +90,20 @@ mod tests { assert_eq!(atomic_buf.get_and_add_i64(1, 1), Ok(16)); assert_eq!(atomic_buf.get_and_add_i64(1, 0), Ok(17)); } + + #[test] + fn out_of_bounds() { + let mut buf = [16, 0, 0, 0, 0, 0, 0]; + + let atomic_buf = AtomicBuffer::wrap(&mut buf); + assert_eq!(atomic_buf.get_and_add_i64(0, 0), Err(AeronError::OutOfBounds)); + } + + #[test] + fn out_of_bounds_offset() { + let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; + + let atomic_buf = AtomicBuffer::wrap(&mut buf); + assert_eq!(atomic_buf.get_and_add_i64(1, 0), Err(AeronError::OutOfBounds)); + } } From a40a71d4a86ef96adaa4f1b4e10cb4955c2cb49f Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 3 Oct 2019 22:55:02 -0400 Subject: [PATCH 21/40] Fix formatting and warnings --- aeron_driver-sys/src/lib.rs | 2 -- src/client/concurrent/atomic_buffer.rs | 10 ++++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/aeron_driver-sys/src/lib.rs b/aeron_driver-sys/src/lib.rs index 386ea65..9f6426e 100644 --- a/aeron_driver-sys/src/lib.rs +++ b/aeron_driver-sys/src/lib.rs @@ -6,8 +6,6 @@ include!(concat!(env!("OUT_DIR"), "/bindings.rs")); #[cfg(test)] mod tests { - use crate::*; - use std::convert::TryInto; #[test] fn version_check() { diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs index 5847b26..69c39a8 100644 --- a/src/client/concurrent/atomic_buffer.rs +++ b/src/client/concurrent/atomic_buffer.rs @@ -96,7 +96,10 @@ mod tests { let mut buf = [16, 0, 0, 0, 0, 0, 0]; let atomic_buf = AtomicBuffer::wrap(&mut buf); - assert_eq!(atomic_buf.get_and_add_i64(0, 0), Err(AeronError::OutOfBounds)); + assert_eq!( + atomic_buf.get_and_add_i64(0, 0), + Err(AeronError::OutOfBounds) + ); } #[test] @@ -104,6 +107,9 @@ mod tests { let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; let atomic_buf = AtomicBuffer::wrap(&mut buf); - assert_eq!(atomic_buf.get_and_add_i64(1, 0), Err(AeronError::OutOfBounds)); + assert_eq!( + atomic_buf.get_and_add_i64(1, 0), + Err(AeronError::OutOfBounds) + ); } } From 818d2ad821242c3d285988a016aadec8ebede843 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Fri, 4 Oct 2019 20:26:15 -0400 Subject: [PATCH 22/40] Struct overlay, handle negative offsets --- src/client/concurrent/atomic_buffer.rs | 31 +++++++++++++++++--------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs index 69c39a8..5f57095 100644 --- a/src/client/concurrent/atomic_buffer.rs +++ b/src/client/concurrent/atomic_buffer.rs @@ -25,23 +25,24 @@ impl<'a> AtomicBuffer<'a> { AtomicBuffer { buffer } } - fn bounds_check(&self, offset: IndexT, size: usize) -> Result<()> { - if self.buffer.len() - (offset as usize) < size { + #[allow(clippy::cast_ptr_alignment)] + fn overlay(&self, offset: IndexT) -> Result<&T> + where + T: Sized, + { + if offset < 0 || self.buffer.len() - (offset as usize) < size_of::() { Err(AeronError::OutOfBounds) } else { - Ok(()) + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + let t: &T = unsafe { &*(offset_ptr as *const T) }; + Ok(t) } } /// Atomically fetch the current value at an offset, and increment by delta - #[allow(clippy::cast_ptr_alignment)] pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { - self.bounds_check(offset, size_of::()).map(|_| { - let a: &AtomicI64 = - unsafe { &*(self.buffer.as_ptr().offset(offset as isize) as *const AtomicI64) }; - println!("AtomicI64: {:p}", a); - a.fetch_add(delta, Ordering::SeqCst) - }) + self.overlay::(offset) + .map(|a| a.fetch_add(delta, Ordering::SeqCst)) } } @@ -112,4 +113,14 @@ mod tests { Err(AeronError::OutOfBounds) ); } + + #[test] + fn negative_offset() { + let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; + let atomic_buf = AtomicBuffer::wrap(&mut buf); + assert_eq!( + atomic_buf.get_and_add_i64(-1, 0), + Err(AeronError::OutOfBounds) + ) + } } From 3ae7e6176cd9d8442a4c603c16d41a8ff581894d Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Fri, 4 Oct 2019 23:53:58 -0400 Subject: [PATCH 23/40] Implement `claim_capacity` Pretty close to having write support ready --- src/client/concurrent/atomic_buffer.rs | 97 +++++++++- src/client/concurrent/ring_buffer.rs | 251 +++++++++++++++++++------ src/util.rs | 3 + 3 files changed, 292 insertions(+), 59 deletions(-) diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs index 5f57095..891361f 100644 --- a/src/client/concurrent/atomic_buffer.rs +++ b/src/client/concurrent/atomic_buffer.rs @@ -5,6 +5,7 @@ use std::ops::Deref; use std::sync::atomic::{AtomicI64, Ordering}; use crate::util::{AeronError, IndexT, Result}; +use std::ptr::{read_volatile, write_volatile}; /// Wrapper for atomic operations around an underlying byte buffer pub struct AtomicBuffer<'a> { @@ -25,18 +26,43 @@ impl<'a> AtomicBuffer<'a> { AtomicBuffer { buffer } } + fn bounds_check(&self, offset: IndexT) -> Result<()> { + if offset < 0 || self.buffer.len() - (offset as usize) < size_of::() { + Err(AeronError::OutOfBounds) + } else { + Ok(()) + } + } + #[allow(clippy::cast_ptr_alignment)] fn overlay(&self, offset: IndexT) -> Result<&T> where T: Sized, { - if offset < 0 || self.buffer.len() - (offset as usize) < size_of::() { - Err(AeronError::OutOfBounds) - } else { + self.bounds_check::(offset).map(|_| { let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; - let t: &T = unsafe { &*(offset_ptr as *const T) }; - Ok(t) - } + unsafe { &*(offset_ptr as *const T) } + }) + } + + fn overlay_volatile(&self, offset: IndexT) -> Result + where + T: Copy + { + self.bounds_check::(offset).map(|_| { + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + unsafe { read_volatile(offset_ptr as *const T) } + }) + } + + fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> + where + T: Copy, + { + self.bounds_check::(offset).map(|_| { + let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; + unsafe { write_volatile(offset_ptr as *mut T, val) }; + }) } /// Atomically fetch the current value at an offset, and increment by delta @@ -44,6 +70,32 @@ impl<'a> AtomicBuffer<'a> { self.overlay::(offset) .map(|a| a.fetch_add(delta, Ordering::SeqCst)) } + + /// Perform a volatile read + pub fn get_i64_volatile(&self, offset: IndexT) -> Result { + // QUESTION: Would it be better to express this in terms of an atomic read? + self.overlay_volatile::(offset) + } + + /// Perform a volatile write into the buffer + pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> { + self.write_volatile::(offset, val) + } + + /// Compare an expected value with what is in memory, and if it matches, + /// update to a new value. Returns `Ok(true)` if the update was successful, + /// and `Ok(false)` if the update failed. + pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { + // QUESTION: Do I need a volatile and atomic read here? + // Aeron C++ uses a volatile read before the atomic operation, but I think that + // may be redundant. In addition, Rust's `read_volatile` operation returns a + // *copied* value; running `compare_exchange` on that copy introduces a race condition + // because we're no longer comparing a consistent address. + self.overlay::(offset).map(|a| { + a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + }) + } } #[cfg(test)] @@ -123,4 +175,37 @@ mod tests { Err(AeronError::OutOfBounds) ) } + + #[test] + fn put_i64() { + let mut buf = [0u8; 8]; + let mut atomic_buf = AtomicBuffer::wrap(&mut buf); + + atomic_buf.put_i64_ordered(0, 12).unwrap(); + assert_eq!( + atomic_buf.get_i64_volatile(0), + Ok(12) + ) + } + + #[test] + fn compare_set_i64() { + let mut buf = [0u8; 8]; + let atomic_buf = AtomicBuffer::wrap(&mut buf); + + atomic_buf.get_and_add_i64(0, 1).unwrap(); + + assert_eq!( + atomic_buf.compare_and_set_i64(0, 0, 1), + Ok(false) + ); + assert_eq!( + atomic_buf.compare_and_set_i64(0, 1, 2), + Ok(true) + ); + assert_eq!( + atomic_buf.get_i64_volatile(0), + Ok(2) + ); + } } diff --git a/src/client/concurrent/ring_buffer.rs b/src/client/concurrent/ring_buffer.rs index f13ae53..27ff6d3 100644 --- a/src/client/concurrent/ring_buffer.rs +++ b/src/client/concurrent/ring_buffer.rs @@ -1,58 +1,35 @@ //! Ring buffer wrapper for communicating with the Media Driver use crate::client::concurrent::atomic_buffer::AtomicBuffer; -use crate::util::{IndexT, Result}; +use crate::util::{AeronError, IndexT, Result}; -/// Description of the Ring Buffer schema. Each Ring Buffer looks like: -/// -/// ```text -/// 0 1 2 3 -/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 -/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -/// | Buffer Data ... -/// ... | -/// +---------------------------------------------------------------+ -/// | | -/// | Tail Position | -/// | | -/// | | -/// +---------------------------------------------------------------+ -/// | | -/// | Head Cache Position | -/// | | -/// | | -/// +---------------------------------------------------------------+ -/// | | -/// | Head Position | -/// | | -/// | | -/// +---------------------------------------------------------------+ -/// | | -/// | Correlation ID Counter | -/// | | -/// | | -/// +---------------------------------------------------------------+ -/// | | -/// | Consumer Heartbeat | -/// | | -/// | | -/// +---------------------------------------------------------------+ -/// ``` -pub mod descriptor { +/// Description of the Ring Buffer schema. +pub mod buffer_descriptor { use crate::client::concurrent::atomic_buffer::AtomicBuffer; use crate::util::AeronError::IllegalArgument; use crate::util::{is_power_of_two, IndexT, Result, CACHE_LINE_LENGTH}; - /// Offset of the correlation id counter, as measured in bytes past - /// the start of the ring buffer metadata trailer - pub const CORRELATION_COUNTER_OFFSET: usize = CACHE_LINE_LENGTH * 8; + // QUESTION: Why are these offsets so large when we only ever use i64 types? - /// Total size of the ring buffer metadata trailer - pub const TRAILER_LENGTH: usize = CACHE_LINE_LENGTH * 12; + /// Offset in the ring buffer metadata to the end of the most recent record. + pub const TAIL_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 2) as IndexT; + + /// QUESTION: Why the distinction between HEAD_CACHE and HEAD? + pub const HEAD_CACHE_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 4) as IndexT; + + /// Offset in the ring buffer metadata to index of the next record to read. + pub const HEAD_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 6) as IndexT; + + /// Offset of the correlation id counter, as measured in bytes past + /// the start of the ring buffer metadata trailer. + pub const CORRELATION_COUNTER_OFFSET: IndexT = (CACHE_LINE_LENGTH * 8) as IndexT; + + /// Total size of the ring buffer metadata trailer. + pub const TRAILER_LENGTH: IndexT = (CACHE_LINE_LENGTH * 12) as IndexT; /// Verify the capacity of a buffer is legal for use as a ring buffer. - /// Returns the actual buffer capacity once ring buffer metadata has been removed. + /// Returns the actual capacity excluding ring buffer metadata. pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result { - let capacity = (buffer.len() - TRAILER_LENGTH) as IndexT; + let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT; if is_power_of_two(capacity) { Ok(capacity) } else { @@ -61,21 +38,189 @@ pub mod descriptor { } } +/// Ring buffer message header. Made up of fields for message length, message type, +/// and then the encoded message. +/// +/// Writing the record length signals the message recording is complete, and all +/// associated ring buffer metadata has been properly updated. +/// +/// ```text +/// 0 1 2 3 +/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/// |R| Record Length | +/// +-+-------------------------------------------------------------+ +/// | Type | +/// +---------------------------------------------------------------+ +/// | Encoded Message ... +///... | +/// +---------------------------------------------------------------+ +/// ``` +pub mod record_descriptor { + use crate::util::IndexT; + use std::mem::size_of; + + /// Size of the ring buffer record header. + pub const HEADER_LENGTH: IndexT = size_of::() as IndexT * 2; + + /// Message type indicating to the media driver that space has been reserved, + /// and is not yet ready for processing. + pub const PADDING_MSG_TYPE_ID: i32 = -1; + + /// Retrieve the header bits for a ring buffer record. + pub fn make_header(length: i32, msg_type_id: i32) -> i64 { + // QUESTION: Instead of masking, can't we just cast and return u32/u64? + // Smells like Java. + ((i64::from(msg_type_id) & 0xFFFF_FFFF) << 32) | (i64::from(length) & 0xFFFF_FFFF) + } +} + /// Multi-producer, single-consumer ring buffer implementation. pub struct ManyToOneRingBuffer<'a> { - _buffer: AtomicBuffer<'a>, - _capacity: IndexT, - _correlation_counter_offset: IndexT, + buffer: AtomicBuffer<'a>, + capacity: IndexT, + tail_position_index: IndexT, + head_cache_position_index: IndexT, + head_position_index: IndexT, + _correlation_id_counter_index: IndexT, } impl<'a> ManyToOneRingBuffer<'a> { - /// Create a many-to-one ring buffer from an underlying atomic buffer + /// Create a many-to-one ring buffer from an underlying atomic buffer. pub fn wrap(buffer: AtomicBuffer<'a>) -> Result { - descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { - _buffer: buffer, - _capacity: capacity, - _correlation_counter_offset: capacity - + descriptor::CORRELATION_COUNTER_OFFSET as IndexT, + buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { + buffer, + capacity, + tail_position_index: capacity + buffer_descriptor::TAIL_POSITION_OFFSET, + head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET, + head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET, + _correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET, }) } + + /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index + /// at which to start writing the next record. + // TODO: Shouldn't be `pub`, just trying to avoid warnings + pub fn claim_capacity(&mut self, required: IndexT) -> Result { + // QUESTION: Is this mask how we handle the "ring" in ring buffer? + // Would explain why we assert buffer capacity is a power of two during initialization + let mask = self.capacity - 1; + + // UNWRAP: Known-valid offset calculated during initialization + let mut head = self + .buffer + .get_i64_volatile(self.head_cache_position_index) + .unwrap(); + + let mut tail: i64; + let mut tail_index: IndexT; + let mut padding: IndexT; + // Note the braces, making this a do-while loop + while { + // UNWRAP: Known-valid offset calculated during initialization + tail = self + .buffer + .get_i64_volatile(self.tail_position_index) + .unwrap(); + let available_capacity = self.capacity - (tail - head) as IndexT; + + println!("Available: {}", available_capacity); + if required > available_capacity { + // UNWRAP: Known-valid offset calculated during initialization + head = self + .buffer + .get_i64_volatile(self.head_position_index) + .unwrap(); + + if required > (self.capacity - (tail - head) as IndexT) { + return Err(AeronError::InsufficientCapacity); + } + + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .put_i64_ordered(self.head_cache_position_index, head) + .unwrap(); + } + + padding = 0; + + // Because we assume `tail` and `mask` are always positive integers, + // it's "safe" to widen the types and bitmask below. We're just trying + // to imitate C++ here. + tail_index = (tail & i64::from(mask)) as IndexT; + let to_buffer_end_length = self.capacity - tail_index; + + println!("To buffer end: {}", to_buffer_end_length); + if required > to_buffer_end_length { + let mut head_index = (head & i64::from(mask)) as IndexT; + + if required > head_index { + // UNWRAP: Known-valid offset calculated during initialization + head = self + .buffer + .get_i64_volatile(self.head_position_index) + .unwrap(); + head_index = (head & i64::from(mask)) as IndexT; + + if required > head_index { + return Err(AeronError::InsufficientCapacity); + } + + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .put_i64_ordered(self.head_cache_position_index, head) + .unwrap(); + } + + padding = to_buffer_end_length; + } + + // UNWRAP: Known-valid offset calculated during initialization + !self + .buffer + .compare_and_set_i64( + self.tail_position_index, + tail, + tail + i64::from(required) + i64::from(padding), + ) + .unwrap() + } {} + + if padding != 0 { + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .put_i64_ordered( + tail_index, + record_descriptor::make_header(padding, record_descriptor::PADDING_MSG_TYPE_ID), + ) + .unwrap(); + tail_index = 0; + } + + Ok(tail_index) + } +} + +#[cfg(test)] +mod tests { + use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer; + + #[test] + fn basic_claim_space() { + let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; + let mut buf = vec![0u8; buf_size]; + + let atomic_buf = AtomicBuffer::wrap(&mut buf); + let mut ring_buf = ManyToOneRingBuffer::wrap(atomic_buf).unwrap(); + + ring_buf.claim_capacity(16).unwrap(); + assert_eq!( + ring_buf.buffer.get_i64_volatile(ring_buf.tail_position_index), + Ok(16) + ); + + let write_start = ring_buf.claim_capacity(16).unwrap(); + assert_eq!(write_start, 16); + } } diff --git a/src/util.rs b/src/util.rs index 2c91323..192d644 100644 --- a/src/util.rs +++ b/src/util.rs @@ -3,6 +3,7 @@ /// Helper type to indicate indexing operations in Aeron, Synonymous with the /// Aeron C++ `index_t` type. Used to imitate the Java API. +// QUESTION: Can this just be updated to be `usize` in Rust? pub type IndexT = i32; /// Helper method for quick verification that `IndexT` is a positive power of two @@ -21,6 +22,8 @@ pub enum AeronError { IllegalArgument, /// Indication that a memory access would exceed the allowable bounds OutOfBounds, + /// Indication that a buffer operation could not complete because of space constraints + InsufficientCapacity, } /// Result type for operations in the Aeron client From 8174f0cde13957f3f9fddc667c6bb0519a26d8d9 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 5 Oct 2019 00:24:47 -0400 Subject: [PATCH 24/40] Fix formatting --- src/client/concurrent/atomic_buffer.rs | 22 +++++----------------- src/client/concurrent/ring_buffer.rs | 4 +++- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs index 891361f..9c0abfc 100644 --- a/src/client/concurrent/atomic_buffer.rs +++ b/src/client/concurrent/atomic_buffer.rs @@ -47,7 +47,7 @@ impl<'a> AtomicBuffer<'a> { fn overlay_volatile(&self, offset: IndexT) -> Result where - T: Copy + T: Copy, { self.bounds_check::(offset).map(|_| { let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; @@ -182,10 +182,7 @@ mod tests { let mut atomic_buf = AtomicBuffer::wrap(&mut buf); atomic_buf.put_i64_ordered(0, 12).unwrap(); - assert_eq!( - atomic_buf.get_i64_volatile(0), - Ok(12) - ) + assert_eq!(atomic_buf.get_i64_volatile(0), Ok(12)) } #[test] @@ -195,17 +192,8 @@ mod tests { atomic_buf.get_and_add_i64(0, 1).unwrap(); - assert_eq!( - atomic_buf.compare_and_set_i64(0, 0, 1), - Ok(false) - ); - assert_eq!( - atomic_buf.compare_and_set_i64(0, 1, 2), - Ok(true) - ); - assert_eq!( - atomic_buf.get_i64_volatile(0), - Ok(2) - ); + assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); + assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); + assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); } } diff --git a/src/client/concurrent/ring_buffer.rs b/src/client/concurrent/ring_buffer.rs index 27ff6d3..2a9bdd9 100644 --- a/src/client/concurrent/ring_buffer.rs +++ b/src/client/concurrent/ring_buffer.rs @@ -216,7 +216,9 @@ mod tests { ring_buf.claim_capacity(16).unwrap(); assert_eq!( - ring_buf.buffer.get_i64_volatile(ring_buf.tail_position_index), + ring_buf + .buffer + .get_i64_volatile(ring_buf.tail_position_index), Ok(16) ); From 9373f04b48d30579d1a4e17d334081a6b2d835e1 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 5 Oct 2019 20:28:35 -0400 Subject: [PATCH 25/40] Add buffer write support --- Cargo.toml | 1 + src/client/concurrent/atomic_buffer.rs | 203 ++++++++++++++++--------- src/client/concurrent/ring_buffer.rs | 131 ++++++++++++++-- src/util.rs | 39 ++++- 4 files changed, 284 insertions(+), 90 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a8ab2f4..b999168 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ maintenance = { status = "actively-developed" } [dependencies] aeron_driver-sys = { path = "./aeron_driver-sys" } memmap = "0.7" +num = "0.2" [dev-dependencies] clap = "2.33" diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs index 9c0abfc..c130a43 100644 --- a/src/client/concurrent/atomic_buffer.rs +++ b/src/client/concurrent/atomic_buffer.rs @@ -26,8 +26,8 @@ impl<'a> AtomicBuffer<'a> { AtomicBuffer { buffer } } - fn bounds_check(&self, offset: IndexT) -> Result<()> { - if offset < 0 || self.buffer.len() - (offset as usize) < size_of::() { + fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { + if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size { Err(AeronError::OutOfBounds) } else { Ok(()) @@ -39,52 +39,164 @@ impl<'a> AtomicBuffer<'a> { where T: Sized, { - self.bounds_check::(offset).map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; - unsafe { &*(offset_ptr as *const T) } - }) + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + unsafe { &*(offset_ptr as *const T) } + }) } fn overlay_volatile(&self, offset: IndexT) -> Result where T: Copy, { - self.bounds_check::(offset).map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; - unsafe { read_volatile(offset_ptr as *const T) } - }) + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + unsafe { read_volatile(offset_ptr as *const T) } + }) } fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> where T: Copy, { - self.bounds_check::(offset).map(|_| { - let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; - unsafe { write_volatile(offset_ptr as *mut T, val) }; - }) + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; + unsafe { write_volatile(offset_ptr as *mut T, val) }; + }) } /// Atomically fetch the current value at an offset, and increment by delta + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// # use aeron_rs::util::AeronError; + /// let mut bytes = [0u8; 9]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// // Simple case modifies only the first byte + /// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0)); + /// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1)); + /// + /// // Using an offset modifies the second byte + /// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0)); + /// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1)); + /// + /// // An offset of 2 means buffer size must be 10 to contain an `i64` + /// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds)); + /// ``` pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { self.overlay::(offset) .map(|a| a.fetch_add(delta, Ordering::SeqCst)) } /// Perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + /// let buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` pub fn get_i64_volatile(&self, offset: IndexT) -> Result { // QUESTION: Would it be better to express this in terms of an atomic read? self.overlay_volatile::(offset) } - /// Perform a volatile write into the buffer + /// Perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [12, 0, 0, 0]; + /// let buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); + /// ``` + pub fn get_i32_volatile(&self, offset: IndexT) -> Result { + self.overlay_volatile::(offset) + } + + /// Perform a volatile write of an `i64` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [0u8; 8]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// buffer.put_i64_ordered(0, 12); + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> { + // QUESTION: Would it be better to have callers use `write_volatile` directly self.write_volatile::(offset, val) } + /// Perform a volatile write of an `i32` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [0u8; 4]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// buffer.put_i32_ordered(0, 12); + /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); + /// ``` + pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> { + // QUESTION: Would it be better to have callers use `write_volatile` directly + self.write_volatile::(offset, val) + } + + /// Write the contents of one buffer to another. Does not perform any synchronization. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut source_bytes = [1u8, 2, 3, 4]; + /// let source = AtomicBuffer::wrap(&mut source_bytes); + /// + /// let mut dest_bytes = [0, 0, 0, 0]; + /// let mut dest = AtomicBuffer::wrap(&mut dest_bytes); + /// + /// dest.put_bytes(1, &source, 1, 3); + /// drop(dest); + /// assert_eq!(dest_bytes, [0u8, 2, 3, 4]); + /// ``` + pub fn put_bytes( + &mut self, + index: IndexT, + source: &AtomicBuffer, + source_index: IndexT, + len: IndexT, + ) -> Result<()> { + self.bounds_check(index, len)?; + source.bounds_check(source_index, len)?; + + let index = index as usize; + let source_index = source_index as usize; + let len = len as usize; + self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]); + Ok(()) + } + /// Compare an expected value with what is in memory, and if it matches, /// update to a new value. Returns `Ok(true)` if the update was successful, /// and `Ok(false)` if the update failed. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut buf = [0u8; 8]; + /// let atomic_buf = AtomicBuffer::wrap(&mut buf); + /// // Set value to 1 + /// atomic_buf.get_and_add_i64(0, 1).unwrap(); + /// + /// // Set value to 1 if existing value is 0 + /// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); + /// // Set value to 2 if existing value is 1 + /// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); + /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); + /// ``` pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { // QUESTION: Do I need a volatile and atomic read here? // Aeron C++ uses a volatile read before the atomic operation, but I think that @@ -126,46 +238,6 @@ mod tests { assert_eq!(value, (*a).load(Ordering::SeqCst)); } - #[test] - fn atomic_i64_increment() { - let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; - - let atomic_buf = AtomicBuffer::wrap(&mut buf[..]); - assert_eq!(atomic_buf.get_and_add_i64(0, 1), Ok(16)); - assert_eq!(atomic_buf.get_and_add_i64(0, 0), Ok(17)); - } - - #[test] - fn atomic_i64_increment_offset() { - let mut buf = [0, 16, 0, 0, 0, 0, 0, 0, 0]; - - let atomic_buf = AtomicBuffer::wrap(&mut buf[..]); - assert_eq!(atomic_buf.get_and_add_i64(1, 1), Ok(16)); - assert_eq!(atomic_buf.get_and_add_i64(1, 0), Ok(17)); - } - - #[test] - fn out_of_bounds() { - let mut buf = [16, 0, 0, 0, 0, 0, 0]; - - let atomic_buf = AtomicBuffer::wrap(&mut buf); - assert_eq!( - atomic_buf.get_and_add_i64(0, 0), - Err(AeronError::OutOfBounds) - ); - } - - #[test] - fn out_of_bounds_offset() { - let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; - - let atomic_buf = AtomicBuffer::wrap(&mut buf); - assert_eq!( - atomic_buf.get_and_add_i64(1, 0), - Err(AeronError::OutOfBounds) - ); - } - #[test] fn negative_offset() { let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; @@ -175,25 +247,4 @@ mod tests { Err(AeronError::OutOfBounds) ) } - - #[test] - fn put_i64() { - let mut buf = [0u8; 8]; - let mut atomic_buf = AtomicBuffer::wrap(&mut buf); - - atomic_buf.put_i64_ordered(0, 12).unwrap(); - assert_eq!(atomic_buf.get_i64_volatile(0), Ok(12)) - } - - #[test] - fn compare_set_i64() { - let mut buf = [0u8; 8]; - let atomic_buf = AtomicBuffer::wrap(&mut buf); - - atomic_buf.get_and_add_i64(0, 1).unwrap(); - - assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); - assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); - assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); - } } diff --git a/src/client/concurrent/ring_buffer.rs b/src/client/concurrent/ring_buffer.rs index 2a9bdd9..c6d88e8 100644 --- a/src/client/concurrent/ring_buffer.rs +++ b/src/client/concurrent/ring_buffer.rs @@ -1,12 +1,13 @@ //! Ring buffer wrapper for communicating with the Media Driver use crate::client::concurrent::atomic_buffer::AtomicBuffer; -use crate::util::{AeronError, IndexT, Result}; +use crate::util::{bit, AeronError, IndexT, Result}; /// Description of the Ring Buffer schema. pub mod buffer_descriptor { use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::util::bit::is_power_of_two; use crate::util::AeronError::IllegalArgument; - use crate::util::{is_power_of_two, IndexT, Result, CACHE_LINE_LENGTH}; + use crate::util::{IndexT, Result, CACHE_LINE_LENGTH}; // QUESTION: Why are these offsets so large when we only ever use i64 types? @@ -57,12 +58,17 @@ pub mod buffer_descriptor { /// +---------------------------------------------------------------+ /// ``` pub mod record_descriptor { - use crate::util::IndexT; use std::mem::size_of; + use crate::util::Result; + use crate::util::{AeronError, IndexT}; + /// Size of the ring buffer record header. pub const HEADER_LENGTH: IndexT = size_of::() as IndexT * 2; + /// Alignment size of records written to the buffer + pub const ALIGNMENT: IndexT = HEADER_LENGTH; + /// Message type indicating to the media driver that space has been reserved, /// and is not yet ready for processing. pub const PADDING_MSG_TYPE_ID: i32 = -1; @@ -73,16 +79,36 @@ pub mod record_descriptor { // Smells like Java. ((i64::from(msg_type_id) & 0xFFFF_FFFF) << 32) | (i64::from(length) & 0xFFFF_FFFF) } + + /// Verify a message type identifier is safe for use + pub fn check_msg_type_id(msg_type_id: i32) -> Result<()> { + if msg_type_id < 1 { + Err(AeronError::IllegalArgument) + } else { + Ok(()) + } + } + + /// Fetch the offset to begin writing a message payload + pub fn encoded_msg_offset(record_offset: IndexT) -> IndexT { + record_offset + HEADER_LENGTH + } + + /// Fetch the offset to begin writing the message length + pub fn length_offset(record_offset: IndexT) -> IndexT { + record_offset + } } /// Multi-producer, single-consumer ring buffer implementation. pub struct ManyToOneRingBuffer<'a> { buffer: AtomicBuffer<'a>, capacity: IndexT, + max_msg_length: IndexT, tail_position_index: IndexT, head_cache_position_index: IndexT, head_position_index: IndexT, - _correlation_id_counter_index: IndexT, + correlation_id_counter_index: IndexT, } impl<'a> ManyToOneRingBuffer<'a> { @@ -91,17 +117,65 @@ impl<'a> ManyToOneRingBuffer<'a> { buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { buffer, capacity, + max_msg_length: capacity / 8, tail_position_index: capacity + buffer_descriptor::TAIL_POSITION_OFFSET, head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET, head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET, - _correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET, + correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET, }) } + /// Atomically retrieve the next correlation identifier. Used as a unique identifier for + /// interactions with the Media Driver + pub fn next_correlation_id(&self) -> i64 { + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .get_and_add_i64(self.correlation_id_counter_index, 1) + .unwrap() + } + + /// Write a message into the ring buffer + pub fn write( + &mut self, + msg_type_id: i32, + source: &AtomicBuffer, + source_index: IndexT, + length: IndexT, + ) -> Result<()> { + record_descriptor::check_msg_type_id(msg_type_id)?; + self.check_msg_length(length)?; + + let record_len = length + record_descriptor::HEADER_LENGTH; + let required = bit::align(record_len, record_descriptor::ALIGNMENT); + let record_index = self.claim_capacity(required)?; + + // UNWRAP: `claim_capacity` performed bounds checking + self.buffer + .put_i64_ordered( + record_index, + record_descriptor::make_header(-length, msg_type_id), + ) + .unwrap(); + // UNWRAP: `claim_capacity` performed bounds checking + self.buffer + .put_bytes( + record_descriptor::encoded_msg_offset(record_index), + source, + source_index, + length, + ) + .unwrap(); + // UNWRAP: `claim_capacity` performed bounds checking + self.buffer + .put_i32_ordered(record_descriptor::length_offset(record_index), record_len) + .unwrap(); + + Ok(()) + } + /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index /// at which to start writing the next record. - // TODO: Shouldn't be `pub`, just trying to avoid warnings - pub fn claim_capacity(&mut self, required: IndexT) -> Result { + fn claim_capacity(&mut self, required: IndexT) -> Result { // QUESTION: Is this mask how we handle the "ring" in ring buffer? // Would explain why we assert buffer capacity is a power of two during initialization let mask = self.capacity - 1; @@ -199,15 +273,27 @@ impl<'a> ManyToOneRingBuffer<'a> { Ok(tail_index) } + + fn check_msg_length(&self, length: IndexT) -> Result<()> { + if length > self.max_msg_length { + Err(AeronError::IllegalArgument) + } else { + Ok(()) + } + } } #[cfg(test)] mod tests { use crate::client::concurrent::atomic_buffer::AtomicBuffer; - use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer; + use crate::client::concurrent::ring_buffer::{ + buffer_descriptor, record_descriptor, ManyToOneRingBuffer, + }; + use crate::util::IndexT; + use std::mem::size_of; #[test] - fn basic_claim_space() { + fn claim_capacity_basic() { let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; let mut buf = vec![0u8; buf_size]; @@ -225,4 +311,31 @@ mod tests { let write_start = ring_buf.claim_capacity(16).unwrap(); assert_eq!(write_start, 16); } + + #[test] + fn write_basic() { + let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; + let buffer = AtomicBuffer::wrap(&mut bytes); + let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + + let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let source_len = source_bytes.len() as IndexT; + let source_buffer = AtomicBuffer::wrap(&mut source_bytes); + let type_id = 1; + ring_buffer + .write(type_id, &source_buffer, 0, source_len) + .unwrap(); + + drop(ring_buffer); + let buffer = AtomicBuffer::wrap(&mut bytes); + let record_len = source_len + record_descriptor::HEADER_LENGTH; + assert_eq!( + buffer.get_i64_volatile(0).unwrap(), + record_descriptor::make_header(record_len, type_id) + ); + assert_eq!( + buffer.get_i64_volatile(size_of::() as IndexT).unwrap(), + 12 + ); + } } diff --git a/src/util.rs b/src/util.rs index 192d644..db0dfdc 100644 --- a/src/util.rs +++ b/src/util.rs @@ -6,11 +6,6 @@ // QUESTION: Can this just be updated to be `usize` in Rust? pub type IndexT = i32; -/// Helper method for quick verification that `IndexT` is a positive power of two -pub fn is_power_of_two(idx: IndexT) -> bool { - idx > 0 && (idx as u32).is_power_of_two() -} - /// Length of the data blocks used by the CPU cache sub-system in bytes pub const CACHE_LINE_LENGTH: usize = 64; @@ -28,3 +23,37 @@ pub enum AeronError { /// Result type for operations in the Aeron client pub type Result = ::std::result::Result; + +/// Bit-level utility functions +pub mod bit { + use crate::util::IndexT; + use num::PrimInt; + + /// Helper method for quick verification that `IndexT` is a positive power of two + /// + /// ```rust + /// # use aeron_rs::util::bit::is_power_of_two; + /// assert!(is_power_of_two(16)); + /// assert!(!is_power_of_two(17)); + /// ``` + pub fn is_power_of_two(idx: IndexT) -> bool { + idx > 0 && (idx as u32).is_power_of_two() + } + + /// Align a specific value to the next largest alignment size. + /// + /// ```rust + /// # use aeron_rs::util::bit::align; + /// assert_eq!(align(7, 8), 8); + /// + /// // Not intended for alignments that aren't powers of two + /// assert_eq!(align(52, 12), 52); + /// assert_eq!(align(52, 16), 64); + /// ``` + pub fn align(val: T, alignment: T) -> T + where + T: PrimInt, + { + (val + (alignment - T::one())) & !(alignment - T::one()) + } +} From a92f7e64163ddbade6c505c821b9cfa1983f1a5d Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 5 Oct 2019 20:30:28 -0400 Subject: [PATCH 26/40] One more question I need to eventually track down --- src/client/concurrent/ring_buffer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/client/concurrent/ring_buffer.rs b/src/client/concurrent/ring_buffer.rs index c6d88e8..0c85925 100644 --- a/src/client/concurrent/ring_buffer.rs +++ b/src/client/concurrent/ring_buffer.rs @@ -57,6 +57,7 @@ pub mod buffer_descriptor { ///... | /// +---------------------------------------------------------------+ /// ``` +// QUESTION: What is the `R` bit in the diagram above? pub mod record_descriptor { use std::mem::size_of; From b548c867c8ea18bbd3bbc9e177437d56c67bd66c Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 5 Oct 2019 22:01:17 -0400 Subject: [PATCH 27/40] Theoretically able to shut down a driver? In practice, I can't get any combination of C's aeronmd, C++'s DriverTool, Rust's `aeronmd`, and `do_terminate`. --- examples/aeronmd.rs | 1 + examples/do_terminate.rs | 51 ++++++++++++++++++++++++++ src/client/cnc_descriptor.rs | 3 +- src/client/concurrent/atomic_buffer.rs | 6 ++- src/client/concurrent/ring_buffer.rs | 2 - 5 files changed, 58 insertions(+), 5 deletions(-) create mode 100644 examples/do_terminate.rs diff --git a/examples/aeronmd.rs b/examples/aeronmd.rs index 9d91217..a7786b4 100644 --- a/examples/aeronmd.rs +++ b/examples/aeronmd.rs @@ -13,6 +13,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; static RUNNING: AtomicBool = AtomicBool::new(true); unsafe extern "C" fn termination_hook(_clientd: *mut c_void) { + println!("Terminated"); RUNNING.store(false, Ordering::SeqCst); } diff --git a/examples/do_terminate.rs b/examples/do_terminate.rs new file mode 100644 index 0000000..81e818c --- /dev/null +++ b/examples/do_terminate.rs @@ -0,0 +1,51 @@ +use aeron_rs::client::cnc_descriptor::MetaDataDefinition; +use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; +use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer; +use aeron_rs::client::context::ClientContext; +use aeron_rs::util::IndexT; +use memmap::MmapOptions; +use std::fs::OpenOptions; +use std::mem::size_of; + +fn main() { + let path = ClientContext::default_aeron_path(); + let cnc = path.join("cnc.dat"); + + println!("Opening CnC file: {}", cnc.display()); + let file = OpenOptions::new() + .read(true) + .write(true) + .open(&cnc) + .expect("Unable to open CnC file"); + let mut mmap = + unsafe { MmapOptions::default().map_mut(&file) }.expect("Unable to mmap CnC file"); + println!("MMap len: {}", mmap.len()); + + // When creating the buffer, we need to offset by the CnC metadata + let cnc_metadata_len = size_of::(); + println!("Buffer len: {}", mmap[cnc_metadata_len..].len()); + + // Read metadata to get buffer length + let buffer_len = { + let atomic_buffer = AtomicBuffer::wrap(&mut mmap); + let metadata = atomic_buffer.overlay::(0).unwrap(); + metadata.to_driver_buffer_length + }; + + let buffer_end = cnc_metadata_len + buffer_len as usize; + let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); + let mut ring_buffer = + ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer"); + + // 20 bytes: Client ID (8), correlation ID (8), token length (4) + let mut terminate_bytes = vec![0u8; 20]; + let terminate_len = terminate_bytes.len(); + let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes); + let client_id = ring_buffer.next_correlation_id(); + source_buffer.put_i64_ordered(0, client_id).unwrap(); + + let term_id: i32 = 0x0E; + ring_buffer + .write(term_id, &source_buffer, 0, terminate_len as IndexT) + .unwrap(); +} diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index 1c02318..6724a80 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -50,7 +50,8 @@ #[repr(C, align(4))] pub struct MetaDataDefinition { cnc_version: i32, - _to_driver_buffer_length: i32, + /// Size of the buffer containing data going to the media driver + pub to_driver_buffer_length: i32, _to_client_buffer_length: i32, _counter_metadata_buffer_length: i32, _counter_values_buffer_length: i32, diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs index c130a43..f96b929 100644 --- a/src/client/concurrent/atomic_buffer.rs +++ b/src/client/concurrent/atomic_buffer.rs @@ -34,8 +34,10 @@ impl<'a> AtomicBuffer<'a> { } } - #[allow(clippy::cast_ptr_alignment)] - fn overlay(&self, offset: IndexT) -> Result<&T> + /// Overlay a struct on a buffer. + /// + /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. + pub fn overlay(&self, offset: IndexT) -> Result<&T> where T: Sized, { diff --git a/src/client/concurrent/ring_buffer.rs b/src/client/concurrent/ring_buffer.rs index 0c85925..4cbbaa8 100644 --- a/src/client/concurrent/ring_buffer.rs +++ b/src/client/concurrent/ring_buffer.rs @@ -199,7 +199,6 @@ impl<'a> ManyToOneRingBuffer<'a> { .unwrap(); let available_capacity = self.capacity - (tail - head) as IndexT; - println!("Available: {}", available_capacity); if required > available_capacity { // UNWRAP: Known-valid offset calculated during initialization head = self @@ -225,7 +224,6 @@ impl<'a> ManyToOneRingBuffer<'a> { tail_index = (tail & i64::from(mask)) as IndexT; let to_buffer_end_length = self.capacity - tail_index; - println!("To buffer end: {}", to_buffer_end_length); if required > to_buffer_end_length { let mut head_index = (head & i64::from(mask)) as IndexT; From 730252127b59064b62ff0f16a78410fd6b3bd335 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 6 Oct 2019 18:17:28 -0400 Subject: [PATCH 28/40] Add a termination validator hook Explains why `aeronmd` never actually shut down --- examples/aeronmd.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/examples/aeronmd.rs b/examples/aeronmd.rs index a7786b4..6909ff3 100644 --- a/examples/aeronmd.rs +++ b/examples/aeronmd.rs @@ -17,6 +17,10 @@ unsafe extern "C" fn termination_hook(_clientd: *mut c_void) { RUNNING.store(false, Ordering::SeqCst); } +unsafe extern "C" fn termination_validator(_state: *mut c_void, _buffer: *mut u8, _length: i32) -> bool { + true +} + fn main() { let version = unsafe { CStr::from_ptr(aeron_version_full()) }; let _cmdline = clap::App::new("aeronmd") @@ -64,6 +68,25 @@ fn main() { } } + if init_success { + let term_validator = unsafe { + aeron_driver_context_set_driver_termination_validator( + context, + Some(termination_validator), + ptr::null_mut() + ) + }; + if term_validator < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!( + "ERROR: context set termination validator ({}), {}", + err_code, err_str + ); + init_success = false + } + } + if init_success { let driver_init = unsafe { aeron_driver_init(&mut driver, context) }; if driver_init < 0 { From 6e1645a7c169700b530659580edbfe5ba1069f9f Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 6 Oct 2019 18:42:38 -0400 Subject: [PATCH 29/40] Formatting fix --- examples/aeronmd.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/aeronmd.rs b/examples/aeronmd.rs index 6909ff3..f3a258b 100644 --- a/examples/aeronmd.rs +++ b/examples/aeronmd.rs @@ -17,7 +17,11 @@ unsafe extern "C" fn termination_hook(_clientd: *mut c_void) { RUNNING.store(false, Ordering::SeqCst); } -unsafe extern "C" fn termination_validator(_state: *mut c_void, _buffer: *mut u8, _length: i32) -> bool { +unsafe extern "C" fn termination_validator( + _state: *mut c_void, + _buffer: *mut u8, + _length: i32, +) -> bool { true } @@ -73,7 +77,7 @@ fn main() { aeron_driver_context_set_driver_termination_validator( context, Some(termination_validator), - ptr::null_mut() + ptr::null_mut(), ) }; if term_validator < 0 { From 61a02711c05fd083faf0a765698768037e4703eb Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 6 Oct 2019 19:12:35 -0400 Subject: [PATCH 30/40] Remove the termination validator As was pointed out in the main Aeron project, should use an environment variable instead. --- examples/aeronmd.rs | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/examples/aeronmd.rs b/examples/aeronmd.rs index f3a258b..1a972b5 100644 --- a/examples/aeronmd.rs +++ b/examples/aeronmd.rs @@ -72,25 +72,6 @@ fn main() { } } - if init_success { - let term_validator = unsafe { - aeron_driver_context_set_driver_termination_validator( - context, - Some(termination_validator), - ptr::null_mut(), - ) - }; - if term_validator < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!( - "ERROR: context set termination validator ({}), {}", - err_code, err_str - ); - init_success = false - } - } - if init_success { let driver_init = unsafe { aeron_driver_init(&mut driver, context) }; if driver_init < 0 { From adfa40124560fa3e32a6fa90a5f7ab1bd40044ba Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 6 Oct 2019 20:45:57 -0400 Subject: [PATCH 31/40] Add duty cycle functionality to the media driver Eventually want to receive term events in tests --- aeron_driver-sys/examples/aeronmd.rs | 98 ++++++++++++++++ examples/aeronmd.rs | 109 +++--------------- src/client/cnc_descriptor.rs | 9 +- src/driver.rs | 161 +++++++++++++++++++-------- 4 files changed, 233 insertions(+), 144 deletions(-) create mode 100644 aeron_driver-sys/examples/aeronmd.rs diff --git a/aeron_driver-sys/examples/aeronmd.rs b/aeron_driver-sys/examples/aeronmd.rs new file mode 100644 index 0000000..9247fd5 --- /dev/null +++ b/aeron_driver-sys/examples/aeronmd.rs @@ -0,0 +1,98 @@ +//! Media driver startup example based on +//! [aeronmd.c](https://github.com/real-logic/aeron/blob/master/aeron-driver/src/main/c/aeronmd.c) +//! This example demonstrates direct usage of the -sys bindings for the Media Driver API. +//! The main crate has a more Rust-idiomatic example usage. + +use aeron_driver_sys::*; +use clap; +use ctrlc; +use std::ffi::CStr; +use std::os::raw::c_void; +use std::ptr; +use std::sync::atomic::{AtomicBool, Ordering}; + +static RUNNING: AtomicBool = AtomicBool::new(true); + +unsafe extern "C" fn termination_hook(_clientd: *mut c_void) { + println!("Terminated"); + RUNNING.store(false, Ordering::SeqCst); +} + +fn main() { + let version = unsafe { CStr::from_ptr(aeron_version_full()) }; + let _cmdline = clap::App::new("aeronmd") + .version(version.to_str().unwrap()) + .get_matches(); + + // TODO: Handle -D switches + + ctrlc::set_handler(move || { + // TODO: Actually understand atomic ordering + RUNNING.store(false, Ordering::SeqCst); + }) + .unwrap(); + + let mut init_success = true; + let mut context: *mut aeron_driver_context_t = ptr::null_mut(); + let mut driver: *mut aeron_driver_t = ptr::null_mut(); + + if init_success { + let context_init = unsafe { aeron_driver_context_init(&mut context) }; + if context_init < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!("ERROR: context init ({}) {}", err_code, err_str); + init_success = false; + } + } + + if init_success { + let term_hook = unsafe { + aeron_driver_context_set_driver_termination_hook( + context, + Some(termination_hook), + ptr::null_mut(), + ) + }; + if term_hook < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!( + "ERROR: context set termination hook ({}) {}", + err_code, err_str + ); + init_success = false; + } + } + + if init_success { + let driver_init = unsafe { aeron_driver_init(&mut driver, context) }; + if driver_init < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!("ERROR: driver init ({}) {}", err_code, err_str); + init_success = false; + } + } + + if init_success { + let driver_start = unsafe { aeron_driver_start(driver, true) }; + if driver_start < 0 { + let err_code = unsafe { aeron_errcode() }; + let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); + eprintln!("ERROR: driver start ({}) {}", err_code, err_str); + init_success = false; + } + } + + if init_success { + println!("Press Ctrl-C to exit."); + + while RUNNING.load(Ordering::SeqCst) { + unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) }; + } + } + + unsafe { aeron_driver_close(driver) }; + unsafe { aeron_driver_context_close(context) }; +} diff --git a/examples/aeronmd.rs b/examples/aeronmd.rs index 1a972b5..0cb7672 100644 --- a/examples/aeronmd.rs +++ b/examples/aeronmd.rs @@ -1,105 +1,22 @@ -//! Media driver startup example based on -//! [aeronmd.c](https://github.com/real-logic/aeron/blob/master/aeron-driver/src/main/c/aeronmd.c) -#![deny(missing_docs)] - -use aeron_driver_sys::*; -use clap; -use ctrlc; -use std::ffi::CStr; -use std::os::raw::c_void; -use std::ptr; +//! A version of the `aeronmd` runner program demonstrating the Rust wrappers +//! around Media Driver functionality. +use aeron_rs::driver::DriverContext; use std::sync::atomic::{AtomicBool, Ordering}; -static RUNNING: AtomicBool = AtomicBool::new(true); - -unsafe extern "C" fn termination_hook(_clientd: *mut c_void) { - println!("Terminated"); - RUNNING.store(false, Ordering::SeqCst); -} - -unsafe extern "C" fn termination_validator( - _state: *mut c_void, - _buffer: *mut u8, - _length: i32, -) -> bool { - true -} +static RUNNING: AtomicBool = AtomicBool::new(false); fn main() { - let version = unsafe { CStr::from_ptr(aeron_version_full()) }; - let _cmdline = clap::App::new("aeronmd") - .version(version.to_str().unwrap()) - .get_matches(); + let driver = DriverContext::default() + .build() + .expect("Unable to create media driver"); - // TODO: Handle -D switches + let driver = driver.start().expect("Unable to start media driver"); + RUNNING.store(true, Ordering::SeqCst); - ctrlc::set_handler(move || { - // TODO: Actually understand atomic ordering - RUNNING.store(false, Ordering::SeqCst); - }) - .unwrap(); + println!("Press Ctrl-C to quit"); - let mut init_success = true; - let mut context: *mut aeron_driver_context_t = ptr::null_mut(); - let mut driver: *mut aeron_driver_t = ptr::null_mut(); - - if init_success { - let context_init = unsafe { aeron_driver_context_init(&mut context) }; - if context_init < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!("ERROR: context init ({}) {}", err_code, err_str); - init_success = false; - } + while RUNNING.load(Ordering::SeqCst) { + // TODO: Termination hook + driver.do_work(); } - - if init_success { - let term_hook = unsafe { - aeron_driver_context_set_driver_termination_hook( - context, - Some(termination_hook), - ptr::null_mut(), - ) - }; - if term_hook < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!( - "ERROR: context set termination hook ({}) {}", - err_code, err_str - ); - init_success = false; - } - } - - if init_success { - let driver_init = unsafe { aeron_driver_init(&mut driver, context) }; - if driver_init < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!("ERROR: driver init ({}) {}", err_code, err_str); - init_success = false; - } - } - - if init_success { - let driver_start = unsafe { aeron_driver_start(driver, true) }; - if driver_start < 0 { - let err_code = unsafe { aeron_errcode() }; - let err_str = unsafe { CStr::from_ptr(aeron_errmsg()) }.to_str().unwrap(); - eprintln!("ERROR: driver start ({}) {}", err_code, err_str); - init_success = false; - } - } - - if init_success { - println!("Press Ctrl-C to exit."); - - while RUNNING.load(Ordering::SeqCst) { - unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) }; - } - } - - unsafe { aeron_driver_close(driver) }; - unsafe { aeron_driver_context_close(context) }; } diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index 6724a80..f5b6c18 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -70,7 +70,8 @@ pub const CNC_FILE: &str = "cnc.dat"; #[cfg(test)] mod tests { use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION}; - use crate::driver::{DriverContext, MediaDriver}; + use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::driver::DriverContext; use memmap::MmapOptions; use std::fs::File; use tempfile::tempdir; @@ -81,8 +82,10 @@ mod tests { let dir_path = dir.as_ref().to_path_buf(); dir.close().unwrap(); - let context = DriverContext::default().set_aeron_dir(&dir_path); - let _driver = MediaDriver::with_context(context).unwrap(); + let _driver = DriverContext::default() + .set_aeron_dir(&dir_path) + .build() + .unwrap(); // Open the CnC location let cnc_path = dir_path.join(CNC_FILE); diff --git a/src/driver.rs b/src/driver.rs index 3876752..2918386 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -5,6 +5,8 @@ use std::path::Path; use std::ptr; use aeron_driver_sys::*; +use std::marker::PhantomData; +use std::mem::replace; /// Error code and message returned by the Media Driver #[derive(Debug, PartialEq)] @@ -13,24 +15,7 @@ pub struct DriverError { msg: String, } -/// Context used to set up the Media Driver -#[derive(Default)] -pub struct DriverContext { - aeron_dir: Option, -} - -impl DriverContext { - /// Set the Aeron directory path that will be used for storing the files - /// Aeron uses to communicate with clients. - pub fn set_aeron_dir(mut self, path: &Path) -> Self { - // UNWRAP: Fails only if the path is non-UTF8 - let path_bytes = path.to_str().unwrap().as_bytes(); - // UNWRAP: Fails only if there is a null byte in the provided path - let c_string = CString::new(path_bytes).unwrap(); - self.aeron_dir = Some(c_string); - self - } -} +type Result = std::result::Result; macro_rules! aeron_op { ($op:expr) => { @@ -47,40 +32,81 @@ macro_rules! aeron_op { }; } -/// Holder object to interface with the Media Driver -#[derive(Debug)] -pub struct MediaDriver { - c_context: *mut aeron_driver_context_t, - c_driver: *mut aeron_driver_t, +/// Context used to set up the Media Driver +#[derive(Default)] +pub struct DriverContext { + aeron_dir: Option, + dir_delete_on_start: Option, } -impl MediaDriver { - /// Set up a new Media Driver - pub fn with_context(mut context: DriverContext) -> Result { +impl DriverContext { + /// Set the Aeron directory path that will be used for storing the files + /// Aeron uses to communicate with clients. + pub fn set_aeron_dir(mut self, path: &Path) -> Self { + // UNWRAP: Fails only if the path is non-UTF8 + let path_bytes = path.to_str().unwrap().as_bytes(); + // UNWRAP: Fails only if there is a null byte in the provided path + let c_string = CString::new(path_bytes).unwrap(); + self.aeron_dir = Some(c_string); + self + } + + /// Set whether Aeron should attempt to delete the `aeron_dir` on startup + /// if it already exists. Aeron will attempt to remove the directory if true. + /// If `aeron_dir` is not set in the `DriverContext`, Aeron will still attempt + /// to remove the default Aeron directory. + pub fn set_dir_delete_on_start(mut self, delete: bool) -> Self { + self.dir_delete_on_start = Some(delete); + self + } + + /// Construct a Media Driver given the context options + pub fn build(mut self) -> Result> { let mut driver = MediaDriver { c_context: ptr::null_mut(), c_driver: ptr::null_mut(), + _state: PhantomData, }; unsafe { aeron_op!(aeron_driver_context_init(&mut driver.c_context)) }?; - context.aeron_dir.take().map(|dir| unsafe { + self.aeron_dir.take().map(|dir| unsafe { aeron_op!(aeron_driver_context_set_dir( driver.c_context, dir.into_raw() )) }); + self.dir_delete_on_start.take().map(|delete| unsafe { + aeron_op!(aeron_driver_context_set_dir_delete_on_start( + driver.c_context, + delete + )) + }); + unsafe { aeron_op!(aeron_driver_init(&mut driver.c_driver, driver.c_context)) }?; Ok(driver) } +} - /// Set up a new Media Driver with default options - pub fn new() -> Result { - Self::with_context(DriverContext::default()) - } +/// Holder object to interface with the Media Driver +#[derive(Debug)] +pub struct MediaDriver { + c_context: *mut aeron_driver_context_t, + c_driver: *mut aeron_driver_t, + _state: PhantomData, +} +/// Marker type for a MediaDriver that has yet to be started +#[derive(Debug)] +pub struct DriverInitialized; + +/// Marker type for a MediaDriver that has been started +#[derive(Debug)] +pub struct DriverStarted; + +impl MediaDriver { /// Retrieve the C library version in (major, minor, patch) format pub fn driver_version() -> (u32, u32, u32) { unsafe { @@ -93,7 +119,40 @@ impl MediaDriver { } } -impl Drop for MediaDriver { +impl MediaDriver { + /// Set up a new Media Driver with default options + pub fn new() -> Result { + DriverContext::default().build() + } + + /// Start the Media Driver threads; does not take control of the current thread + pub fn start(mut self) -> Result> { + unsafe { aeron_op!(aeron_driver_start(self.c_driver, true)) }?; + + // Move the driver and context references so the drop of `self` can't trigger issues + // when the new media driver is also eventually dropped + let c_driver = replace(&mut self.c_driver, ptr::null_mut()); + let c_context = replace(&mut self.c_context, ptr::null_mut()); + + Ok(MediaDriver { + c_driver, + c_context, + _state: PhantomData, + }) + } +} + +impl MediaDriver { + /// Perform a single idle cycle of the Media Driver; does not take control of + /// the current thread + pub fn do_work(&self) { + unsafe { + aeron_driver_main_idle_strategy(self.c_driver, aeron_driver_main_do_work(self.c_driver)) + }; + } +} + +impl Drop for MediaDriver { fn drop(&mut self) { if !self.c_driver.is_null() { unsafe { aeron_op!(aeron_driver_close(self.c_driver)) }.unwrap(); @@ -106,37 +165,34 @@ impl Drop for MediaDriver { #[cfg(test)] mod tests { - use crate::driver::{DriverContext, DriverError, MediaDriver}; + use crate::driver::{DriverContext, DriverError}; use std::ffi::CStr; use tempfile::tempdir; #[test] fn multiple_startup_failure() { - // We immediately close `tempdir` because we just want the name; Aeron needs - // to set up the directory itself. - let dir = tempdir().unwrap(); - let dir_path = dir.as_ref().to_path_buf(); - dir.close().unwrap(); - - let context = DriverContext::default().set_aeron_dir(&dir_path); - let driver = MediaDriver::with_context(context).unwrap(); + let dir = tempdir().unwrap().into_path(); + let driver = DriverContext::default() + .set_aeron_dir(&dir) + .set_dir_delete_on_start(true) + .build() + .unwrap(); assert_eq!( unsafe { CStr::from_ptr((*driver.c_context).aeron_dir) }.to_str(), - Ok(dir_path.to_str().unwrap()) + Ok(dir.to_str().unwrap()) ); drop(driver); // Attempting to start a media driver twice in rapid succession is guaranteed // cause an issue because the new media driver must wait for a heartbeat timeout. - let context = DriverContext::default().set_aeron_dir(&dir_path); - let driver_res = MediaDriver::with_context(context); + let driver_res = DriverContext::default().set_aeron_dir(&dir).build(); // TODO: Why is the error message behavior different on Windows? let expected_message = if cfg!(target_os = "windows") { String::new() } else { - format!("could not recreate aeron dir {}: ", dir_path.display()) + format!("could not recreate aeron dir {}: ", dir.display()) }; assert!(driver_res.is_err()); @@ -148,4 +204,19 @@ mod tests { } ); } + + #[test] + fn single_duty_cycle() { + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.into_path(); + + let driver = DriverContext::default() + .set_aeron_dir(&path) + .set_dir_delete_on_start(true) + .build() + .expect("Unable to create media driver") + .start() + .expect("Unable to start driver"); + driver.do_work(); + } } From 143e21bf0aef1da49be9eaadf9ebcbea8c6deaac Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 6 Oct 2019 20:54:11 -0400 Subject: [PATCH 32/40] Minor cleanup of CnC descriptor test --- src/client/cnc_descriptor.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index f5b6c18..fbe07bc 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -70,7 +70,6 @@ pub const CNC_FILE: &str = "cnc.dat"; #[cfg(test)] mod tests { use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION}; - use crate::client::concurrent::atomic_buffer::AtomicBuffer; use crate::driver::DriverContext; use memmap::MmapOptions; use std::fs::File; @@ -78,17 +77,15 @@ mod tests { #[test] fn read_cnc_version() { - let dir = tempdir().unwrap(); - let dir_path = dir.as_ref().to_path_buf(); - dir.close().unwrap(); - + let dir = tempdir().unwrap().into_path(); let _driver = DriverContext::default() - .set_aeron_dir(&dir_path) + .set_aeron_dir(&dir) + .set_dir_delete_on_start(true) .build() .unwrap(); // Open the CnC location - let cnc_path = dir_path.join(CNC_FILE); + let cnc_path = dir.join(CNC_FILE); let cnc_file = File::open(&cnc_path).expect("Unable to open CnC file"); let mmap = unsafe { MmapOptions::default() From f4c2fcf0c53654d99ba0293d0b3fc65c1688b1a4 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 6 Oct 2019 21:37:38 -0400 Subject: [PATCH 33/40] Finally able to terminate a driver. Need to write a test for it, but it's working locally. --- examples/do_terminate.rs | 7 +++++-- src/client/cnc_descriptor.rs | 7 +++++++ src/client/concurrent/ring_buffer.rs | 4 ++-- src/util.rs | 11 ++++++++--- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/examples/do_terminate.rs b/examples/do_terminate.rs index 81e818c..00b30bb 100644 --- a/examples/do_terminate.rs +++ b/examples/do_terminate.rs @@ -6,6 +6,7 @@ use aeron_rs::util::IndexT; use memmap::MmapOptions; use std::fs::OpenOptions; use std::mem::size_of; +use aeron_rs::client::cnc_descriptor; fn main() { let path = ClientContext::default_aeron_path(); @@ -22,8 +23,8 @@ fn main() { println!("MMap len: {}", mmap.len()); // When creating the buffer, we need to offset by the CnC metadata - let cnc_metadata_len = size_of::(); - println!("Buffer len: {}", mmap[cnc_metadata_len..].len()); + let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH; + println!("Buffer start: {}", cnc_metadata_len); // Read metadata to get buffer length let buffer_len = { @@ -31,6 +32,7 @@ fn main() { let metadata = atomic_buffer.overlay::(0).unwrap(); metadata.to_driver_buffer_length }; + println!("Buffer len: {}", buffer_len); let buffer_end = cnc_metadata_len + buffer_len as usize; let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); @@ -43,6 +45,7 @@ fn main() { let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes); let client_id = ring_buffer.next_correlation_id(); source_buffer.put_i64_ordered(0, client_id).unwrap(); + source_buffer.put_i64_ordered(8, -1).unwrap(); let term_id: i32 = 0x0E; ring_buffer diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index fbe07bc..410ab20 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -19,6 +19,9 @@ //! +-----------------------------+ //! ``` +use crate::util::bit; +use std::mem::size_of; + /// The CnC file metadata header. Layout: /// /// ```text @@ -61,6 +64,10 @@ pub struct MetaDataDefinition { _pid: i64, } +/// Length of the metadata block in a CnC file. Note that it's not equivalent +/// to the actual struct length. +pub const META_DATA_LENGTH: usize = bit::align_usize(size_of::(), bit::CACHE_LINE_LENGTH * 2); + /// Version code for the Aeron CnC file format pub const CNC_VERSION: i32 = crate::sematic_version_compose(0, 0, 16); diff --git a/src/client/concurrent/ring_buffer.rs b/src/client/concurrent/ring_buffer.rs index 4cbbaa8..fbc0982 100644 --- a/src/client/concurrent/ring_buffer.rs +++ b/src/client/concurrent/ring_buffer.rs @@ -5,9 +5,9 @@ use crate::util::{bit, AeronError, IndexT, Result}; /// Description of the Ring Buffer schema. pub mod buffer_descriptor { use crate::client::concurrent::atomic_buffer::AtomicBuffer; - use crate::util::bit::is_power_of_two; + use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH}; use crate::util::AeronError::IllegalArgument; - use crate::util::{IndexT, Result, CACHE_LINE_LENGTH}; + use crate::util::{IndexT, Result}; // QUESTION: Why are these offsets so large when we only ever use i64 types? diff --git a/src/util.rs b/src/util.rs index db0dfdc..534f5c5 100644 --- a/src/util.rs +++ b/src/util.rs @@ -6,9 +6,6 @@ // QUESTION: Can this just be updated to be `usize` in Rust? pub type IndexT = i32; -/// Length of the data blocks used by the CPU cache sub-system in bytes -pub const CACHE_LINE_LENGTH: usize = 64; - /// Error types from operations in the Aeron client. Synonymous with the exceptions /// generated by the C++ client. #[derive(Debug, PartialEq)] @@ -29,6 +26,9 @@ pub mod bit { use crate::util::IndexT; use num::PrimInt; + /// Length of the data blocks used by the CPU cache sub-system in bytes + pub const CACHE_LINE_LENGTH: usize = 64; + /// Helper method for quick verification that `IndexT` is a positive power of two /// /// ```rust @@ -56,4 +56,9 @@ pub mod bit { { (val + (alignment - T::one())) & !(alignment - T::one()) } + + /// Align a `usize` value. See `align` for similar functionality on general types. + pub const fn align_usize(val: usize, alignment: usize) -> usize { + (val + (alignment - 1)) & !(alignment - 1) + } } From a755382f24c7a3a6c792682acc56a0312a86c8fb Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 6 Oct 2019 21:38:38 -0400 Subject: [PATCH 34/40] Add dev-dependencies for `sys` crate --- aeron_driver-sys/Cargo.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/aeron_driver-sys/Cargo.toml b/aeron_driver-sys/Cargo.toml index 8aac0d8..44bf8ba 100644 --- a/aeron_driver-sys/Cargo.toml +++ b/aeron_driver-sys/Cargo.toml @@ -13,5 +13,9 @@ bindgen = "0.51" cmake = "0.1" dunce = "1.0.0" +[dev-dependencies] +clap = "2.33" +ctrlc = "3.1" + [features] static = [] From dc9fd52e073fac37407388f7a2444fccb007dbea Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 6 Oct 2019 23:16:47 -0400 Subject: [PATCH 35/40] Able to successfully terminate a driver!!! Needs a *ton* of work to be more Rust-idiomatic, but the basics are there. --- examples/do_terminate.rs | 54 -------------- src/client/cnc_descriptor.rs | 3 +- tests/cnc_terminate.rs | 141 +++++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 55 deletions(-) delete mode 100644 examples/do_terminate.rs create mode 100644 tests/cnc_terminate.rs diff --git a/examples/do_terminate.rs b/examples/do_terminate.rs deleted file mode 100644 index 00b30bb..0000000 --- a/examples/do_terminate.rs +++ /dev/null @@ -1,54 +0,0 @@ -use aeron_rs::client::cnc_descriptor::MetaDataDefinition; -use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; -use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer; -use aeron_rs::client::context::ClientContext; -use aeron_rs::util::IndexT; -use memmap::MmapOptions; -use std::fs::OpenOptions; -use std::mem::size_of; -use aeron_rs::client::cnc_descriptor; - -fn main() { - let path = ClientContext::default_aeron_path(); - let cnc = path.join("cnc.dat"); - - println!("Opening CnC file: {}", cnc.display()); - let file = OpenOptions::new() - .read(true) - .write(true) - .open(&cnc) - .expect("Unable to open CnC file"); - let mut mmap = - unsafe { MmapOptions::default().map_mut(&file) }.expect("Unable to mmap CnC file"); - println!("MMap len: {}", mmap.len()); - - // When creating the buffer, we need to offset by the CnC metadata - let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH; - println!("Buffer start: {}", cnc_metadata_len); - - // Read metadata to get buffer length - let buffer_len = { - let atomic_buffer = AtomicBuffer::wrap(&mut mmap); - let metadata = atomic_buffer.overlay::(0).unwrap(); - metadata.to_driver_buffer_length - }; - println!("Buffer len: {}", buffer_len); - - let buffer_end = cnc_metadata_len + buffer_len as usize; - let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); - let mut ring_buffer = - ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer"); - - // 20 bytes: Client ID (8), correlation ID (8), token length (4) - let mut terminate_bytes = vec![0u8; 20]; - let terminate_len = terminate_bytes.len(); - let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes); - let client_id = ring_buffer.next_correlation_id(); - source_buffer.put_i64_ordered(0, client_id).unwrap(); - source_buffer.put_i64_ordered(8, -1).unwrap(); - - let term_id: i32 = 0x0E; - ring_buffer - .write(term_id, &source_buffer, 0, terminate_len as IndexT) - .unwrap(); -} diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index 410ab20..3ea3e88 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -66,7 +66,8 @@ pub struct MetaDataDefinition { /// Length of the metadata block in a CnC file. Note that it's not equivalent /// to the actual struct length. -pub const META_DATA_LENGTH: usize = bit::align_usize(size_of::(), bit::CACHE_LINE_LENGTH * 2); +pub const META_DATA_LENGTH: usize = + bit::align_usize(size_of::(), bit::CACHE_LINE_LENGTH * 2); /// Version code for the Aeron CnC file format pub const CNC_VERSION: i32 = crate::sematic_version_compose(0, 0, 16); diff --git a/tests/cnc_terminate.rs b/tests/cnc_terminate.rs new file mode 100644 index 0000000..f6aba3b --- /dev/null +++ b/tests/cnc_terminate.rs @@ -0,0 +1,141 @@ +use aeron_driver_sys::*; +use aeron_rs::client::cnc_descriptor; +use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; +use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer; +use aeron_rs::util::IndexT; +use memmap::MmapOptions; +use std::ffi::{c_void, CString}; +use std::fs::OpenOptions; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; +use std::{ptr, thread}; +use tempfile::tempdir; + +static RUNNING: AtomicBool = AtomicBool::new(false); + +unsafe extern "C" fn termination_hook(_state: *mut c_void) { + RUNNING.store(false, Ordering::SeqCst); +} + +unsafe extern "C" fn termination_validation( + _state: *mut c_void, + _data: *mut u8, + _length: i32, +) -> bool { + true +} + +fn driver_thread(aeron_dir: PathBuf) { + // Code largely ripped from `aeronmd`. Extra bits for termination and + // termination validation added as necessary, and far coarser error handling. + let mut context: *mut aeron_driver_context_t = ptr::null_mut(); + let mut driver: *mut aeron_driver_t = ptr::null_mut(); + + let context_init = unsafe { aeron_driver_context_init(&mut context) }; + assert!(context_init >= 0); + + let path_bytes = aeron_dir.to_str().unwrap().as_bytes(); + let c_string = CString::new(path_bytes).unwrap(); + + let aeron_dir = unsafe { aeron_driver_context_set_dir(context, c_string.into_raw()) }; + assert!(aeron_dir >= 0); + + let term_hook = unsafe { + aeron_driver_context_set_driver_termination_hook( + context, + Some(termination_hook), + ptr::null_mut(), + ) + }; + assert!(term_hook >= 0); + + let term_validation_hook = unsafe { + aeron_driver_context_set_driver_termination_validator( + context, + Some(termination_validation), + ptr::null_mut(), + ) + }; + assert!(term_validation_hook >= 0); + + let delete_dir = unsafe { aeron_driver_context_set_dir_delete_on_start(context, true) }; + assert!(delete_dir >= 0); + + let driver_init = unsafe { aeron_driver_init(&mut driver, context) }; + assert!(driver_init >= 0); + + let driver_start = unsafe { aeron_driver_start(driver, true) }; + assert!(driver_start >= 0); + + RUNNING.store(true, Ordering::SeqCst); + while RUNNING.load(Ordering::SeqCst) { + unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) }; + } + + unsafe { aeron_driver_close(driver) }; + unsafe { aeron_driver_context_close(context) }; +} + +#[test] +fn cnc_terminate() { + let dir = tempdir().unwrap().into_path(); + + // Start up the media driver + let driver_dir = dir.clone(); + let driver_thread = thread::Builder::new() + .name("cnc_terminate__driver_thread".to_string()) + .spawn(|| driver_thread(driver_dir)) + .unwrap(); + + // Sleep a moment to let the media driver get set up + thread::sleep(Duration::from_millis(500)); + assert_eq!(RUNNING.load(Ordering::SeqCst), true); + + // Write to the CnC file to attempt termination + let cnc = dir.join(cnc_descriptor::CNC_FILE); + let file = OpenOptions::new() + .read(true) + .write(true) + .open(&cnc) + .expect("Unable to open CnC file"); + let mut mmap = + unsafe { MmapOptions::default().map_mut(&file) }.expect("Unable to mmap CnC file"); + + // When creating the buffer, we need to offset by the CnC metadata + let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH; + + // Read metadata to get buffer length + let buffer_len = { + let atomic_buffer = AtomicBuffer::wrap(&mut mmap); + let metadata = atomic_buffer + .overlay::(0) + .unwrap(); + metadata.to_driver_buffer_length + }; + + let buffer_end = cnc_metadata_len + buffer_len as usize; + let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); + let mut ring_buffer = + ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer"); + + // 20 bytes: Client ID (8), correlation ID (8), token length (4) + let mut terminate_bytes = vec![0u8; 20]; + let terminate_len = terminate_bytes.len(); + let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes); + let client_id = ring_buffer.next_correlation_id(); + source_buffer.put_i64_ordered(0, client_id).unwrap(); + source_buffer.put_i64_ordered(8, -1).unwrap(); + + let term_id: i32 = 0x0E; + ring_buffer + .write(term_id, &source_buffer, 0, terminate_len as IndexT) + .unwrap(); + + // Wait for the driver to finish + // TODO: Timeout, and then set `RUNNING` manually + driver_thread + .join() + .expect("Driver thread panicked during execution"); + assert_eq!(RUNNING.load(Ordering::SeqCst), false); +} From 7e8f49c3470fb07dd89b819692353d114cd3c632 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 6 Oct 2019 23:18:57 -0400 Subject: [PATCH 36/40] Move `aeronmd` back out of -sys. Clap/Ctrlc are still causing issues during static link. --- aeron_driver-sys/Cargo.toml | 4 ---- .../examples/aeronmd.rs => examples/aeronmd_sys.rs | 1 - 2 files changed, 5 deletions(-) rename aeron_driver-sys/examples/aeronmd.rs => examples/aeronmd_sys.rs (98%) diff --git a/aeron_driver-sys/Cargo.toml b/aeron_driver-sys/Cargo.toml index 44bf8ba..8aac0d8 100644 --- a/aeron_driver-sys/Cargo.toml +++ b/aeron_driver-sys/Cargo.toml @@ -13,9 +13,5 @@ bindgen = "0.51" cmake = "0.1" dunce = "1.0.0" -[dev-dependencies] -clap = "2.33" -ctrlc = "3.1" - [features] static = [] diff --git a/aeron_driver-sys/examples/aeronmd.rs b/examples/aeronmd_sys.rs similarity index 98% rename from aeron_driver-sys/examples/aeronmd.rs rename to examples/aeronmd_sys.rs index 9247fd5..a3dcb5f 100644 --- a/aeron_driver-sys/examples/aeronmd.rs +++ b/examples/aeronmd_sys.rs @@ -1,7 +1,6 @@ //! Media driver startup example based on //! [aeronmd.c](https://github.com/real-logic/aeron/blob/master/aeron-driver/src/main/c/aeronmd.c) //! This example demonstrates direct usage of the -sys bindings for the Media Driver API. -//! The main crate has a more Rust-idiomatic example usage. use aeron_driver_sys::*; use clap; From 2cd2ae9995f576a3cca6a0642325b2772d8d43bd Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Mon, 7 Oct 2019 07:02:02 -0400 Subject: [PATCH 37/40] Fix issues with delete on start Seems like Windows doesn't handle it properly? --- src/client/cnc_descriptor.rs | 5 ++++- src/driver.rs | 12 +++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index 3ea3e88..ff001ec 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -85,10 +85,13 @@ mod tests { #[test] fn read_cnc_version() { + let temp_dir = tempdir().unwrap(); + let dir = temp_dir.path().to_path_buf(); + temp_dir.close(); + let dir = tempdir().unwrap().into_path(); let _driver = DriverContext::default() .set_aeron_dir(&dir) - .set_dir_delete_on_start(true) .build() .unwrap(); diff --git a/src/driver.rs b/src/driver.rs index 2918386..43cdc3f 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -171,10 +171,12 @@ mod tests { #[test] fn multiple_startup_failure() { - let dir = tempdir().unwrap().into_path(); + let temp_dir = tempdir().unwrap(); + let dir = temp_dir.path().to_path_buf(); + temp_dir.close(); + let driver = DriverContext::default() .set_aeron_dir(&dir) - .set_dir_delete_on_start(true) .build() .unwrap(); @@ -207,12 +209,12 @@ mod tests { #[test] fn single_duty_cycle() { - let tempdir = tempfile::tempdir().unwrap(); - let path = tempdir.into_path(); + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().to_path_buf(); + tempdir.close(); let driver = DriverContext::default() .set_aeron_dir(&path) - .set_dir_delete_on_start(true) .build() .expect("Unable to create media driver") .start() From 1a3d67f02cf44d2bdaad75caf23e7b02b78c538c Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Mon, 7 Oct 2019 07:07:16 -0400 Subject: [PATCH 38/40] Fix a variable name --- src/driver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/driver.rs b/src/driver.rs index 43cdc3f..0e63e5b 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -211,7 +211,7 @@ mod tests { fn single_duty_cycle() { let temp_dir = tempdir().unwrap(); let path = temp_dir.path().to_path_buf(); - tempdir.close(); + temp_dir.close(); let driver = DriverContext::default() .set_aeron_dir(&path) From 2b8e32abb77878f1040414ec24ec6c4e65c6e56d Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Mon, 7 Oct 2019 07:34:00 -0400 Subject: [PATCH 39/40] More temporary directory fixes --- src/client/cnc_descriptor.rs | 3 +-- tests/cnc_terminate.rs | 4 +++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index ff001ec..4756f92 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -87,9 +87,8 @@ mod tests { fn read_cnc_version() { let temp_dir = tempdir().unwrap(); let dir = temp_dir.path().to_path_buf(); - temp_dir.close(); + temp_dir.close().unwrap(); - let dir = tempdir().unwrap().into_path(); let _driver = DriverContext::default() .set_aeron_dir(&dir) .build() diff --git a/tests/cnc_terminate.rs b/tests/cnc_terminate.rs index f6aba3b..73728b5 100644 --- a/tests/cnc_terminate.rs +++ b/tests/cnc_terminate.rs @@ -79,7 +79,9 @@ fn driver_thread(aeron_dir: PathBuf) { #[test] fn cnc_terminate() { - let dir = tempdir().unwrap().into_path(); + let temp_dir = tempdir().unwrap(); + let dir = temp_dir.path().to_path_buf(); + temp_dir.close().unwrap(); // Start up the media driver let driver_dir = dir.clone(); From 5156f48dd4731c82a29d95f11ebb9a1fe2dd2ab2 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Mon, 7 Oct 2019 07:49:20 -0400 Subject: [PATCH 40/40] Fix two last unused Result types --- src/driver.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/driver.rs b/src/driver.rs index 0e63e5b..352a692 100644 --- a/src/driver.rs +++ b/src/driver.rs @@ -173,7 +173,7 @@ mod tests { fn multiple_startup_failure() { let temp_dir = tempdir().unwrap(); let dir = temp_dir.path().to_path_buf(); - temp_dir.close(); + temp_dir.close().unwrap(); let driver = DriverContext::default() .set_aeron_dir(&dir) @@ -211,7 +211,7 @@ mod tests { fn single_duty_cycle() { let temp_dir = tempdir().unwrap(); let path = temp_dir.path().to_path_buf(); - temp_dir.close(); + temp_dir.close().unwrap(); let driver = DriverContext::default() .set_aeron_dir(&path)