1
0
mirror of https://github.com/bspeice/aeron-rs synced 2025-01-21 03:20:04 -05:00

Fix driver termination

Turns out when you attempt to write 0 bytes... you write 0 bytes.
This commit is contained in:
Bradlee Speice 2019-11-07 21:06:11 -05:00
parent 88985418e0
commit 4884108e04
4 changed files with 16 additions and 26 deletions

View File

@ -17,15 +17,16 @@ impl<A> Flyweight<A, Unchecked>
where
A: AtomicBuffer,
{
#[allow(clippy::new_ret_no_self)]
pub fn new<S>(buffer: A, offset: IndexT) -> Result<Flyweight<A, S>>
where
S: Sized
S: Sized,
{
buffer.overlay::<S>(offset)?;
Ok(Flyweight {
buffer,
base_offset: offset,
_phantom: PhantomData
_phantom: PhantomData,
})
}
}

View File

@ -65,4 +65,8 @@ where
}
self
}
pub fn length(&self) -> IndexT {
size_of::<Self>() as IndexT + self.token_length()
}
}

View File

@ -33,18 +33,15 @@ where
self.client_id
}
pub fn terminate_driver(&mut self, _token_buffer: Option<&[u8]>) -> Result<()> {
let _client_id = self.client_id;
self.write_command_to_driver(|buffer: &mut [u8], _length: &mut IndexT| {
pub fn terminate_driver(&mut self, token_buffer: Option<&[u8]>) -> Result<()> {
let client_id = self.client_id;
self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| {
// UNWRAP: Buffer from `write_command` guaranteed to be long enough for `TerminateDriverDefn`
let _request = Flyweight::new::<TerminateDriverDefn>(buffer, 0).unwrap();
let mut request = Flyweight::new::<TerminateDriverDefn>(buffer, 0).unwrap();
// FIXME: Uncommenting this causes termination to not succeed
/*
request.put_client_id(client_id).put_correlation_id(-1);
token_buffer.map(|b| request.put_token_buffer(b));
*length = request.token_length();
*/
*length = request.length();
ClientCommand::TerminateDriver
})

View File

@ -3,6 +3,7 @@ use aeron_rs::cnc_descriptor;
use aeron_rs::cnc_descriptor::MetaDataDefinition;
use aeron_rs::concurrent::ringbuffer::ManyToOneRingBuffer;
use aeron_rs::concurrent::AtomicBuffer;
use aeron_rs::driver_proxy::DriverProxy;
use aeron_rs::util::IndexT;
use memmap::MmapOptions;
use std::ffi::{c_void, CString};
@ -115,24 +116,11 @@ fn cnc_terminate() {
.to_driver_buffer_length;
let buffer_end = cnc_metadata_len + buffer_len as usize;
let mut ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end])
let ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end])
.expect("Improperly sized buffer");
// 20 bytes: Client ID (8), correlation ID (8), token length (4)
let mut terminate_bytes = vec![0u8; 20];
let client_id = ring_buffer.next_correlation_id();
terminate_bytes.put_i64_ordered(0, client_id).unwrap();
terminate_bytes.put_i64_ordered(8, -1).unwrap();
let term_id: i32 = 0x0E;
ring_buffer
.write(
term_id,
&terminate_bytes,
0,
terminate_bytes.len() as IndexT,
)
.unwrap();
let mut driver_proxy = DriverProxy::new(ring_buffer);
driver_proxy.terminate_driver(None).unwrap();
// Wait for the driver to finish
// TODO: Timeout, and then set `RUNNING` manually