1
0
mirror of https://github.com/bspeice/aeron-rs synced 2024-12-22 05:48:10 -05:00

CnC termination test working again

This commit is contained in:
Bradlee Speice 2019-11-02 16:00:57 -04:00
parent b235655f71
commit 9139bf7234
2 changed files with 18 additions and 17 deletions

View File

@ -8,6 +8,7 @@ use std::sync::atomic::{AtomicI64, Ordering};
use crate::util::{AeronError, IndexT, Result}; use crate::util::{AeronError, IndexT, Result};
use std::ptr::{read_volatile, write_volatile}; use std::ptr::{read_volatile, write_volatile};
use memmap::MmapMut;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
/// Atomic operations on slices of memory /// Atomic operations on slices of memory
@ -224,3 +225,5 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
impl AtomicBuffer for Vec<u8> {} impl AtomicBuffer for Vec<u8> {}
impl AtomicBuffer for &mut [u8] {} impl AtomicBuffer for &mut [u8] {}
impl AtomicBuffer for MmapMut {}

View File

@ -1,5 +1,6 @@
use aeron_driver_sys::*; use aeron_driver_sys::*;
use aeron_rs::client::cnc_descriptor; use aeron_rs::client::cnc_descriptor;
use aeron_rs::client::cnc_descriptor::MetaDataDefinition;
use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer; use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer;
use aeron_rs::client::concurrent::AtomicBuffer; use aeron_rs::client::concurrent::AtomicBuffer;
use aeron_rs::util::IndexT; use aeron_rs::util::IndexT;
@ -77,7 +78,6 @@ fn driver_thread(aeron_dir: PathBuf) {
unsafe { aeron_driver_context_close(context) }; unsafe { aeron_driver_context_close(context) };
} }
/*
#[test] #[test]
fn cnc_terminate() { fn cnc_terminate() {
let temp_dir = tempdir().unwrap(); let temp_dir = tempdir().unwrap();
@ -109,30 +109,29 @@ fn cnc_terminate() {
let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH; let cnc_metadata_len = cnc_descriptor::META_DATA_LENGTH;
// Read metadata to get buffer length // Read metadata to get buffer length
let buffer_len = { let buffer_len = mmap
let atomic_buffer = AtomicBuffer::wrap(&mut mmap); .overlay::<MetaDataDefinition>(0)
let metadata = atomic_buffer .unwrap()
.overlay::<cnc_descriptor::MetaDataDefinition>(0) .to_driver_buffer_length;
.unwrap();
metadata.to_driver_buffer_length
};
let buffer_end = cnc_metadata_len + buffer_len as usize; let buffer_end = cnc_metadata_len + buffer_len as usize;
let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]); let mut ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end])
let mut ring_buffer = .expect("Improperly sized buffer");
ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer");
// 20 bytes: Client ID (8), correlation ID (8), token length (4) // 20 bytes: Client ID (8), correlation ID (8), token length (4)
let mut terminate_bytes = vec![0u8; 20]; let mut terminate_bytes = vec![0u8; 20];
let terminate_len = terminate_bytes.len();
let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes);
let client_id = ring_buffer.next_correlation_id(); let client_id = ring_buffer.next_correlation_id();
source_buffer.put_i64_ordered(0, client_id).unwrap(); terminate_bytes.put_i64_ordered(0, client_id).unwrap();
source_buffer.put_i64_ordered(8, -1).unwrap(); terminate_bytes.put_i64_ordered(8, -1).unwrap();
let term_id: i32 = 0x0E; let term_id: i32 = 0x0E;
ring_buffer ring_buffer
.write(term_id, &source_buffer, 0, terminate_len as IndexT) .write(
term_id,
&terminate_bytes,
0,
terminate_bytes.len() as IndexT,
)
.unwrap(); .unwrap();
// Wait for the driver to finish // Wait for the driver to finish
@ -142,4 +141,3 @@ fn cnc_terminate() {
.expect("Driver thread panicked during execution"); .expect("Driver thread panicked during execution");
assert_eq!(RUNNING.load(Ordering::SeqCst), false); assert_eq!(RUNNING.load(Ordering::SeqCst), false);
} }
*/