diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs
index 8befdf4..4e3162c 100644
--- a/aeron-rs/src/command/flyweight.rs
+++ b/aeron-rs/src/command/flyweight.rs
@@ -17,15 +17,16 @@ impl Flyweight
where
A: AtomicBuffer,
{
+ #[allow(clippy::new_ret_no_self)]
pub fn new(buffer: A, offset: IndexT) -> Result>
where
- S: Sized
+ S: Sized,
{
buffer.overlay::(offset)?;
Ok(Flyweight {
buffer,
base_offset: offset,
- _phantom: PhantomData
+ _phantom: PhantomData,
})
}
}
diff --git a/aeron-rs/src/command/terminate_driver.rs b/aeron-rs/src/command/terminate_driver.rs
index 6aba3fa..24f1c68 100644
--- a/aeron-rs/src/command/terminate_driver.rs
+++ b/aeron-rs/src/command/terminate_driver.rs
@@ -65,4 +65,8 @@ where
}
self
}
+
+ pub fn length(&self) -> IndexT {
+ size_of::() as IndexT + self.token_length()
+ }
}
diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs
index 4521bb7..d55d888 100644
--- a/aeron-rs/src/driver_proxy.rs
+++ b/aeron-rs/src/driver_proxy.rs
@@ -33,18 +33,15 @@ where
self.client_id
}
- pub fn terminate_driver(&mut self, _token_buffer: Option<&[u8]>) -> Result<()> {
- let _client_id = self.client_id;
- self.write_command_to_driver(|buffer: &mut [u8], _length: &mut IndexT| {
+ pub fn terminate_driver(&mut self, token_buffer: Option<&[u8]>) -> Result<()> {
+ let client_id = self.client_id;
+ self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| {
// UNWRAP: Buffer from `write_command` guaranteed to be long enough for `TerminateDriverDefn`
- let _request = Flyweight::new::(buffer, 0).unwrap();
+ let mut request = Flyweight::new::(buffer, 0).unwrap();
- // FIXME: Uncommenting this causes termination to not succeed
- /*
request.put_client_id(client_id).put_correlation_id(-1);
token_buffer.map(|b| request.put_token_buffer(b));
- *length = request.token_length();
- */
+ *length = request.length();
ClientCommand::TerminateDriver
})
diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs
index 526129b..3f01086 100644
--- a/aeron-rs/tests/cnc_terminate.rs
+++ b/aeron-rs/tests/cnc_terminate.rs
@@ -3,6 +3,7 @@ use aeron_rs::cnc_descriptor;
use aeron_rs::cnc_descriptor::MetaDataDefinition;
use aeron_rs::concurrent::ringbuffer::ManyToOneRingBuffer;
use aeron_rs::concurrent::AtomicBuffer;
+use aeron_rs::driver_proxy::DriverProxy;
use aeron_rs::util::IndexT;
use memmap::MmapOptions;
use std::ffi::{c_void, CString};
@@ -115,24 +116,11 @@ fn cnc_terminate() {
.to_driver_buffer_length;
let buffer_end = cnc_metadata_len + buffer_len as usize;
- let mut ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end])
+ let ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end])
.expect("Improperly sized buffer");
- // 20 bytes: Client ID (8), correlation ID (8), token length (4)
- let mut terminate_bytes = vec![0u8; 20];
- let client_id = ring_buffer.next_correlation_id();
- terminate_bytes.put_i64_ordered(0, client_id).unwrap();
- terminate_bytes.put_i64_ordered(8, -1).unwrap();
-
- let term_id: i32 = 0x0E;
- ring_buffer
- .write(
- term_id,
- &terminate_bytes,
- 0,
- terminate_bytes.len() as IndexT,
- )
- .unwrap();
+ let mut driver_proxy = DriverProxy::new(ring_buffer);
+ driver_proxy.terminate_driver(None).unwrap();
// Wait for the driver to finish
// TODO: Timeout, and then set `RUNNING` manually