mirror of
https://github.com/bspeice/aeron-rs
synced 2024-12-22 05:48:10 -05:00
Add a Rust-ier way of handling the MediaDriver
This commit is contained in:
parent
1889a198b9
commit
966db0767f
@ -17,3 +17,4 @@ aeron_driver-sys = { path = "./aeron_driver-sys" }
|
|||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
clap = "2.33"
|
clap = "2.33"
|
||||||
ctrlc = "3.1.3"
|
ctrlc = "3.1.3"
|
||||||
|
tempfile = "3.1"
|
||||||
|
132
src/driver.rs
Normal file
132
src/driver.rs
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
//! Bindings for the C Media Driver
|
||||||
|
|
||||||
|
use std::ffi::{CStr, CString};
|
||||||
|
use std::os::unix::ffi::OsStrExt;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::ptr;
|
||||||
|
|
||||||
|
use aeron_driver_sys::*;
|
||||||
|
|
||||||
|
/// Error code and message returned by the Media Driver
|
||||||
|
#[derive(Debug, PartialEq)]
|
||||||
|
pub struct DriverError {
|
||||||
|
code: i32,
|
||||||
|
msg: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Context used to set up the Media Driver
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct DriverContext {
|
||||||
|
aeron_dir: Option<CString>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DriverContext {
|
||||||
|
/// Set the Aeron directory path that will be used for storing the files
|
||||||
|
/// Aeron uses to communicate with clients.
|
||||||
|
pub fn set_aeron_dir(mut self, path: &Path) -> Self {
|
||||||
|
// UNWRAP: Fails only if there is a null byte in the provided path
|
||||||
|
let c_string = CString::new(path.as_os_str().as_bytes()).unwrap();
|
||||||
|
self.aeron_dir = Some(c_string);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! aeron_op {
|
||||||
|
($op:expr) => {
|
||||||
|
if $op < 0 {
|
||||||
|
let code = ::aeron_driver_sys::aeron_errcode();
|
||||||
|
let msg = CStr::from_ptr(::aeron_driver_sys::aeron_errmsg())
|
||||||
|
.to_str()
|
||||||
|
.unwrap()
|
||||||
|
.to_string();
|
||||||
|
Err(DriverError { code, msg })
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Holder object to interface with the Media Driver
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct MediaDriver {
|
||||||
|
c_context: *mut aeron_driver_context_t,
|
||||||
|
c_driver: *mut aeron_driver_t,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MediaDriver {
|
||||||
|
/// Set up a new Media Driver
|
||||||
|
pub fn with_context(mut context: DriverContext) -> Result<Self, DriverError> {
|
||||||
|
let mut driver = MediaDriver {
|
||||||
|
c_context: ptr::null_mut(),
|
||||||
|
c_driver: ptr::null_mut(),
|
||||||
|
};
|
||||||
|
|
||||||
|
unsafe { aeron_op!(aeron_driver_context_init(&mut driver.c_context)) }?;
|
||||||
|
|
||||||
|
context.aeron_dir.take().map(|dir| unsafe {
|
||||||
|
aeron_op!(aeron_driver_context_set_dir(
|
||||||
|
driver.c_context,
|
||||||
|
dir.into_raw()
|
||||||
|
))
|
||||||
|
});
|
||||||
|
|
||||||
|
unsafe { aeron_op!(aeron_driver_init(&mut driver.c_driver, driver.c_context)) }?;
|
||||||
|
|
||||||
|
Ok(driver)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set up a new Media Driver with default options
|
||||||
|
pub fn new() -> Result<Self, DriverError> {
|
||||||
|
Self::with_context(DriverContext::default())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for MediaDriver {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if self.c_driver != ptr::null_mut() {
|
||||||
|
unsafe { aeron_op!(aeron_driver_close(self.c_driver)) }.unwrap();
|
||||||
|
}
|
||||||
|
if self.c_context != ptr::null_mut() {
|
||||||
|
unsafe { aeron_op!(aeron_driver_context_close(self.c_context)) }.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use crate::driver::{DriverContext, DriverError, MediaDriver};
|
||||||
|
use std::ffi::CStr;
|
||||||
|
use tempfile::tempdir;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn multiple_startup_failure() {
|
||||||
|
// We immediately close `tempdir` because we just want the name; Aeron needs
|
||||||
|
// to set up the directory itself.
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let dir_path = dir.as_ref().to_path_buf();
|
||||||
|
dir.close().unwrap();
|
||||||
|
|
||||||
|
let context = DriverContext::default().set_aeron_dir(&dir_path);
|
||||||
|
let driver = MediaDriver::with_context(context).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
unsafe { CStr::from_ptr((*driver.c_context).aeron_dir) }.to_str(),
|
||||||
|
Ok(dir_path.to_str().unwrap())
|
||||||
|
);
|
||||||
|
drop(driver);
|
||||||
|
|
||||||
|
// Attempting to start a media driver twice in rapid succession is guaranteed
|
||||||
|
// cause an issue because the new media driver must wait for a heartbeat timeout.
|
||||||
|
let context = DriverContext::default().set_aeron_dir(&dir_path);
|
||||||
|
let driver_res = MediaDriver::with_context(context);
|
||||||
|
|
||||||
|
assert!(driver_res.is_err());
|
||||||
|
assert_eq!(
|
||||||
|
driver_res.unwrap_err(),
|
||||||
|
DriverError {
|
||||||
|
code: 0,
|
||||||
|
msg: format!("could not recreate aeron dir {}: ", dir_path.display())
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
@ -1,7 +1,7 @@
|
|||||||
//! [Aeron](https://github.com/real-logic/aeron) client for Rust
|
//! [Aeron](https://github.com/real-logic/aeron) client for Rust
|
||||||
#![deny(missing_docs)]
|
#![deny(missing_docs)]
|
||||||
|
|
||||||
mod context;
|
pub mod driver;
|
||||||
|
|
||||||
/// Retrieve the C library version in (major, minor, patch) format
|
/// Retrieve the C library version in (major, minor, patch) format
|
||||||
pub fn aeron_version() -> (u32, u32, u32) {
|
pub fn aeron_version() -> (u32, u32, u32) {
|
||||||
|
Loading…
Reference in New Issue
Block a user