From 4884108e047c25b0ecd09bd31d42e700f3b31e8b Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 7 Nov 2019 21:06:11 -0500 Subject: [PATCH] Fix driver termination Turns out when you attempt to write 0 bytes... you write 0 bytes. --- aeron-rs/src/command/flyweight.rs | 5 +++-- aeron-rs/src/command/terminate_driver.rs | 4 ++++ aeron-rs/src/driver_proxy.rs | 13 +++++-------- aeron-rs/tests/cnc_terminate.rs | 20 ++++---------------- 4 files changed, 16 insertions(+), 26 deletions(-) diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs index 8befdf4..4e3162c 100644 --- a/aeron-rs/src/command/flyweight.rs +++ b/aeron-rs/src/command/flyweight.rs @@ -17,15 +17,16 @@ impl Flyweight where A: AtomicBuffer, { + #[allow(clippy::new_ret_no_self)] pub fn new(buffer: A, offset: IndexT) -> Result> where - S: Sized + S: Sized, { buffer.overlay::(offset)?; Ok(Flyweight { buffer, base_offset: offset, - _phantom: PhantomData + _phantom: PhantomData, }) } } diff --git a/aeron-rs/src/command/terminate_driver.rs b/aeron-rs/src/command/terminate_driver.rs index 6aba3fa..24f1c68 100644 --- a/aeron-rs/src/command/terminate_driver.rs +++ b/aeron-rs/src/command/terminate_driver.rs @@ -65,4 +65,8 @@ where } self } + + pub fn length(&self) -> IndexT { + size_of::() as IndexT + self.token_length() + } } diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs index 4521bb7..d55d888 100644 --- a/aeron-rs/src/driver_proxy.rs +++ b/aeron-rs/src/driver_proxy.rs @@ -33,18 +33,15 @@ where self.client_id } - pub fn terminate_driver(&mut self, _token_buffer: Option<&[u8]>) -> Result<()> { - let _client_id = self.client_id; - self.write_command_to_driver(|buffer: &mut [u8], _length: &mut IndexT| { + pub fn terminate_driver(&mut self, token_buffer: Option<&[u8]>) -> Result<()> { + let client_id = self.client_id; + self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| { // UNWRAP: Buffer from `write_command` guaranteed to be long enough for `TerminateDriverDefn` - let _request = Flyweight::new::(buffer, 0).unwrap(); + let mut request = Flyweight::new::(buffer, 0).unwrap(); - // FIXME: Uncommenting this causes termination to not succeed - /* request.put_client_id(client_id).put_correlation_id(-1); token_buffer.map(|b| request.put_token_buffer(b)); - *length = request.token_length(); - */ + *length = request.length(); ClientCommand::TerminateDriver }) diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 526129b..3f01086 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -3,6 +3,7 @@ use aeron_rs::cnc_descriptor; use aeron_rs::cnc_descriptor::MetaDataDefinition; use aeron_rs::concurrent::ringbuffer::ManyToOneRingBuffer; use aeron_rs::concurrent::AtomicBuffer; +use aeron_rs::driver_proxy::DriverProxy; use aeron_rs::util::IndexT; use memmap::MmapOptions; use std::ffi::{c_void, CString}; @@ -115,24 +116,11 @@ fn cnc_terminate() { .to_driver_buffer_length; let buffer_end = cnc_metadata_len + buffer_len as usize; - let mut ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end]) + let ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end]) .expect("Improperly sized buffer"); - // 20 bytes: Client ID (8), correlation ID (8), token length (4) - let mut terminate_bytes = vec![0u8; 20]; - let client_id = ring_buffer.next_correlation_id(); - terminate_bytes.put_i64_ordered(0, client_id).unwrap(); - terminate_bytes.put_i64_ordered(8, -1).unwrap(); - - let term_id: i32 = 0x0E; - ring_buffer - .write( - term_id, - &terminate_bytes, - 0, - terminate_bytes.len() as IndexT, - ) - .unwrap(); + let mut driver_proxy = DriverProxy::new(ring_buffer); + driver_proxy.terminate_driver(None).unwrap(); // Wait for the driver to finish // TODO: Timeout, and then set `RUNNING` manually