From dc9fd52e073fac37407388f7a2444fccb007dbea Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 6 Oct 2019 23:16:47 -0400 Subject: [PATCH] Able to successfully terminate a driver!!! Needs a *ton* of work to be more Rust-idiomatic, but the basics are there. --- examples/do_terminate.rs | 54 -------------- src/client/cnc_descriptor.rs | 3 +- tests/cnc_terminate.rs | 141 +++++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 55 deletions(-) delete mode 100644 examples/do_terminate.rs create mode 100644 tests/cnc_terminate.rs diff --git a/examples/do_terminate.rs b/examples/do_terminate.rs deleted file mode 100644 index 00b30bb..0000000 --- a/examples/do_terminate.rs +++ /dev/null @@ -1,54 +0,0 @@ -use aeron_rs::client::cnc_descriptor::MetaDataDefinition; -use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; -use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer; -use aeron_rs::client::context::ClientContext; -use aeron_rs::util::IndexT; -use memmap::MmapOptions; -use std::fs::OpenOptions; -use std::mem::size_of; -use aeron_rs::client::cnc_descriptor; - -fn main() { - let path = ClientContext::default_aeron_path(); - let cnc = path.join("cnc.dat"); - - println!("Opening CnC file: {}", cnc.display()); - let file = OpenOptions::new() - .read(true) - .write(true) - .open(&cnc) - .expect("Unable to open CnC file"); - let mut mmap = - unsafe { MmapOptions::default().map_mut(&file) }.expect("Unable to mmap CnC file"); - println!("MMap len: {}", mmap.len()); - - // When creating the buffer, we need to offset by the CnC metadata - let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH; - println!("Buffer start: {}", cnc_metadata_len); - - // Read metadata to get buffer length - let buffer_len = { - let atomic_buffer = AtomicBuffer::wrap(&mut mmap); - let metadata = atomic_buffer.overlay::(0).unwrap(); - metadata.to_driver_buffer_length - }; - println!("Buffer len: {}", buffer_len); - - let buffer_end = cnc_metadata_len + buffer_len as usize; - let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); - let mut ring_buffer = - ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer"); - - // 20 bytes: Client ID (8), correlation ID (8), token length (4) - let mut terminate_bytes = vec![0u8; 20]; - let terminate_len = terminate_bytes.len(); - let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes); - let client_id = ring_buffer.next_correlation_id(); - source_buffer.put_i64_ordered(0, client_id).unwrap(); - source_buffer.put_i64_ordered(8, -1).unwrap(); - - let term_id: i32 = 0x0E; - ring_buffer - .write(term_id, &source_buffer, 0, terminate_len as IndexT) - .unwrap(); -} diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index 410ab20..3ea3e88 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -66,7 +66,8 @@ pub struct MetaDataDefinition { /// Length of the metadata block in a CnC file. Note that it's not equivalent /// to the actual struct length. -pub const META_DATA_LENGTH: usize = bit::align_usize(size_of::(), bit::CACHE_LINE_LENGTH * 2); +pub const META_DATA_LENGTH: usize = + bit::align_usize(size_of::(), bit::CACHE_LINE_LENGTH * 2); /// Version code for the Aeron CnC file format pub const CNC_VERSION: i32 = crate::sematic_version_compose(0, 0, 16); diff --git a/tests/cnc_terminate.rs b/tests/cnc_terminate.rs new file mode 100644 index 0000000..f6aba3b --- /dev/null +++ b/tests/cnc_terminate.rs @@ -0,0 +1,141 @@ +use aeron_driver_sys::*; +use aeron_rs::client::cnc_descriptor; +use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; +use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer; +use aeron_rs::util::IndexT; +use memmap::MmapOptions; +use std::ffi::{c_void, CString}; +use std::fs::OpenOptions; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; +use std::{ptr, thread}; +use tempfile::tempdir; + +static RUNNING: AtomicBool = AtomicBool::new(false); + +unsafe extern "C" fn termination_hook(_state: *mut c_void) { + RUNNING.store(false, Ordering::SeqCst); +} + +unsafe extern "C" fn termination_validation( + _state: *mut c_void, + _data: *mut u8, + _length: i32, +) -> bool { + true +} + +fn driver_thread(aeron_dir: PathBuf) { + // Code largely ripped from `aeronmd`. Extra bits for termination and + // termination validation added as necessary, and far coarser error handling. + let mut context: *mut aeron_driver_context_t = ptr::null_mut(); + let mut driver: *mut aeron_driver_t = ptr::null_mut(); + + let context_init = unsafe { aeron_driver_context_init(&mut context) }; + assert!(context_init >= 0); + + let path_bytes = aeron_dir.to_str().unwrap().as_bytes(); + let c_string = CString::new(path_bytes).unwrap(); + + let aeron_dir = unsafe { aeron_driver_context_set_dir(context, c_string.into_raw()) }; + assert!(aeron_dir >= 0); + + let term_hook = unsafe { + aeron_driver_context_set_driver_termination_hook( + context, + Some(termination_hook), + ptr::null_mut(), + ) + }; + assert!(term_hook >= 0); + + let term_validation_hook = unsafe { + aeron_driver_context_set_driver_termination_validator( + context, + Some(termination_validation), + ptr::null_mut(), + ) + }; + assert!(term_validation_hook >= 0); + + let delete_dir = unsafe { aeron_driver_context_set_dir_delete_on_start(context, true) }; + assert!(delete_dir >= 0); + + let driver_init = unsafe { aeron_driver_init(&mut driver, context) }; + assert!(driver_init >= 0); + + let driver_start = unsafe { aeron_driver_start(driver, true) }; + assert!(driver_start >= 0); + + RUNNING.store(true, Ordering::SeqCst); + while RUNNING.load(Ordering::SeqCst) { + unsafe { aeron_driver_main_idle_strategy(driver, aeron_driver_main_do_work(driver)) }; + } + + unsafe { aeron_driver_close(driver) }; + unsafe { aeron_driver_context_close(context) }; +} + +#[test] +fn cnc_terminate() { + let dir = tempdir().unwrap().into_path(); + + // Start up the media driver + let driver_dir = dir.clone(); + let driver_thread = thread::Builder::new() + .name("cnc_terminate__driver_thread".to_string()) + .spawn(|| driver_thread(driver_dir)) + .unwrap(); + + // Sleep a moment to let the media driver get set up + thread::sleep(Duration::from_millis(500)); + assert_eq!(RUNNING.load(Ordering::SeqCst), true); + + // Write to the CnC file to attempt termination + let cnc = dir.join(cnc_descriptor::CNC_FILE); + let file = OpenOptions::new() + .read(true) + .write(true) + .open(&cnc) + .expect("Unable to open CnC file"); + let mut mmap = + unsafe { MmapOptions::default().map_mut(&file) }.expect("Unable to mmap CnC file"); + + // When creating the buffer, we need to offset by the CnC metadata + let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH; + + // Read metadata to get buffer length + let buffer_len = { + let atomic_buffer = AtomicBuffer::wrap(&mut mmap); + let metadata = atomic_buffer + .overlay::(0) + .unwrap(); + metadata.to_driver_buffer_length + }; + + let buffer_end = cnc_metadata_len + buffer_len as usize; + let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); + let mut ring_buffer = + ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer"); + + // 20 bytes: Client ID (8), correlation ID (8), token length (4) + let mut terminate_bytes = vec![0u8; 20]; + let terminate_len = terminate_bytes.len(); + let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes); + let client_id = ring_buffer.next_correlation_id(); + source_buffer.put_i64_ordered(0, client_id).unwrap(); + source_buffer.put_i64_ordered(8, -1).unwrap(); + + let term_id: i32 = 0x0E; + ring_buffer + .write(term_id, &source_buffer, 0, terminate_len as IndexT) + .unwrap(); + + // Wait for the driver to finish + // TODO: Timeout, and then set `RUNNING` manually + driver_thread + .join() + .expect("Driver thread panicked during execution"); + assert_eq!(RUNNING.load(Ordering::SeqCst), false); +}