From 9139bf7234176064811603f6d94277ffca743a5b Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 16:00:57 -0400 Subject: [PATCH] CnC termination test working again --- aeron-rs/src/client/concurrent/mod.rs | 3 +++ aeron-rs/tests/cnc_terminate.rs | 32 +++++++++++++-------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index d337fb3..bd8221d 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -8,6 +8,7 @@ use std::sync::atomic::{AtomicI64, Ordering}; use crate::util::{AeronError, IndexT, Result}; use std::ptr::{read_volatile, write_volatile}; +use memmap::MmapMut; use std::ops::{Deref, DerefMut}; /// Atomic operations on slices of memory @@ -224,3 +225,5 @@ pub trait AtomicBuffer: Deref + DerefMut { impl AtomicBuffer for Vec {} impl AtomicBuffer for &mut [u8] {} + +impl AtomicBuffer for MmapMut {} diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index e4bc449..2e67bd5 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -1,5 +1,6 @@ use aeron_driver_sys::*; use aeron_rs::client::cnc_descriptor; +use aeron_rs::client::cnc_descriptor::MetaDataDefinition; use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer; use aeron_rs::client::concurrent::AtomicBuffer; use aeron_rs::util::IndexT; @@ -77,7 +78,6 @@ fn driver_thread(aeron_dir: PathBuf) { unsafe { aeron_driver_context_close(context) }; } -/* #[test] fn cnc_terminate() { let temp_dir = tempdir().unwrap(); @@ -109,30 +109,29 @@ fn cnc_terminate() { let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH; // Read metadata to get buffer length - let buffer_len = { - let atomic_buffer = AtomicBuffer::wrap(&mut mmap); - let metadata = atomic_buffer - .overlay::(0) - .unwrap(); - metadata.to_driver_buffer_length - }; + let buffer_len = mmap + .overlay::(0) + .unwrap() + .to_driver_buffer_length; let buffer_end = cnc_metadata_len + buffer_len as usize; - let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); - let mut ring_buffer = - ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer"); + let mut ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end]) + .expect("Improperly sized buffer"); // 20 bytes: Client ID (8), correlation ID (8), token length (4) let mut terminate_bytes = vec![0u8; 20]; - let terminate_len = terminate_bytes.len(); - let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes); let client_id = ring_buffer.next_correlation_id(); - source_buffer.put_i64_ordered(0, client_id).unwrap(); - source_buffer.put_i64_ordered(8, -1).unwrap(); + terminate_bytes.put_i64_ordered(0, client_id).unwrap(); + terminate_bytes.put_i64_ordered(8, -1).unwrap(); let term_id: i32 = 0x0E; ring_buffer - .write(term_id, &source_buffer, 0, terminate_len as IndexT) + .write( + term_id, + &terminate_bytes, + 0, + terminate_bytes.len() as IndexT, + ) .unwrap(); // Wait for the driver to finish @@ -142,4 +141,3 @@ fn cnc_terminate() { .expect("Driver thread panicked during execution"); assert_eq!(RUNNING.load(Ordering::SeqCst), false); } -*/