From b548c867c8ea18bbd3bbc9e177437d56c67bd66c Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 5 Oct 2019 22:01:17 -0400 Subject: [PATCH] Theoretically able to shut down a driver? In practice, I can't get any combination of C's aeronmd, C++'s DriverTool, Rust's `aeronmd`, and `do_terminate`. --- examples/aeronmd.rs | 1 + examples/do_terminate.rs | 51 ++++++++++++++++++++++++++ src/client/cnc_descriptor.rs | 3 +- src/client/concurrent/atomic_buffer.rs | 6 ++- src/client/concurrent/ring_buffer.rs | 2 - 5 files changed, 58 insertions(+), 5 deletions(-) create mode 100644 examples/do_terminate.rs diff --git a/examples/aeronmd.rs b/examples/aeronmd.rs index 9d91217..a7786b4 100644 --- a/examples/aeronmd.rs +++ b/examples/aeronmd.rs @@ -13,6 +13,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; static RUNNING: AtomicBool = AtomicBool::new(true); unsafe extern "C" fn termination_hook(_clientd: *mut c_void) { + println!("Terminated"); RUNNING.store(false, Ordering::SeqCst); } diff --git a/examples/do_terminate.rs b/examples/do_terminate.rs new file mode 100644 index 0000000..81e818c --- /dev/null +++ b/examples/do_terminate.rs @@ -0,0 +1,51 @@ +use aeron_rs::client::cnc_descriptor::MetaDataDefinition; +use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; +use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer; +use aeron_rs::client::context::ClientContext; +use aeron_rs::util::IndexT; +use memmap::MmapOptions; +use std::fs::OpenOptions; +use std::mem::size_of; + +fn main() { + let path = ClientContext::default_aeron_path(); + let cnc = path.join("cnc.dat"); + + println!("Opening CnC file: {}", cnc.display()); + let file = OpenOptions::new() + .read(true) + .write(true) + .open(&cnc) + .expect("Unable to open CnC file"); + let mut mmap = + unsafe { MmapOptions::default().map_mut(&file) }.expect("Unable to mmap CnC file"); + println!("MMap len: {}", mmap.len()); + + // When creating the buffer, we need to offset by the CnC metadata + let cnc_metadata_len = size_of::(); + println!("Buffer len: {}", mmap[cnc_metadata_len..].len()); + + // Read metadata to get buffer length + let buffer_len = { + let atomic_buffer = AtomicBuffer::wrap(&mut mmap); + let metadata = atomic_buffer.overlay::(0).unwrap(); + metadata.to_driver_buffer_length + }; + + let buffer_end = cnc_metadata_len + buffer_len as usize; + let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); + let mut ring_buffer = + ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer"); + + // 20 bytes: Client ID (8), correlation ID (8), token length (4) + let mut terminate_bytes = vec![0u8; 20]; + let terminate_len = terminate_bytes.len(); + let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes); + let client_id = ring_buffer.next_correlation_id(); + source_buffer.put_i64_ordered(0, client_id).unwrap(); + + let term_id: i32 = 0x0E; + ring_buffer + .write(term_id, &source_buffer, 0, terminate_len as IndexT) + .unwrap(); +} diff --git a/src/client/cnc_descriptor.rs b/src/client/cnc_descriptor.rs index 1c02318..6724a80 100644 --- a/src/client/cnc_descriptor.rs +++ b/src/client/cnc_descriptor.rs @@ -50,7 +50,8 @@ #[repr(C, align(4))] pub struct MetaDataDefinition { cnc_version: i32, - _to_driver_buffer_length: i32, + /// Size of the buffer containing data going to the media driver + pub to_driver_buffer_length: i32, _to_client_buffer_length: i32, _counter_metadata_buffer_length: i32, _counter_values_buffer_length: i32, diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs index c130a43..f96b929 100644 --- a/src/client/concurrent/atomic_buffer.rs +++ b/src/client/concurrent/atomic_buffer.rs @@ -34,8 +34,10 @@ impl<'a> AtomicBuffer<'a> { } } - #[allow(clippy::cast_ptr_alignment)] - fn overlay(&self, offset: IndexT) -> Result<&T> + /// Overlay a struct on a buffer. + /// + /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. + pub fn overlay(&self, offset: IndexT) -> Result<&T> where T: Sized, { diff --git a/src/client/concurrent/ring_buffer.rs b/src/client/concurrent/ring_buffer.rs index 0c85925..4cbbaa8 100644 --- a/src/client/concurrent/ring_buffer.rs +++ b/src/client/concurrent/ring_buffer.rs @@ -199,7 +199,6 @@ impl<'a> ManyToOneRingBuffer<'a> { .unwrap(); let available_capacity = self.capacity - (tail - head) as IndexT; - println!("Available: {}", available_capacity); if required > available_capacity { // UNWRAP: Known-valid offset calculated during initialization head = self @@ -225,7 +224,6 @@ impl<'a> ManyToOneRingBuffer<'a> { tail_index = (tail & i64::from(mask)) as IndexT; let to_buffer_end_length = self.capacity - tail_index; - println!("To buffer end: {}", to_buffer_end_length); if required > to_buffer_end_length { let mut head_index = (head & i64::from(mask)) as IndexT;