mirror of
https://github.com/bspeice/aeron-rs
synced 2024-12-21 21:38:09 -05:00
Theoretically able to shut down a driver?
In practice, I can't get any combination of C's aeronmd, C++'s DriverTool, Rust's `aeronmd`, and `do_terminate`.
This commit is contained in:
parent
a92f7e6416
commit
b548c867c8
@ -13,6 +13,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
|||||||
static RUNNING: AtomicBool = AtomicBool::new(true);
|
static RUNNING: AtomicBool = AtomicBool::new(true);
|
||||||
|
|
||||||
unsafe extern "C" fn termination_hook(_clientd: *mut c_void) {
|
unsafe extern "C" fn termination_hook(_clientd: *mut c_void) {
|
||||||
|
println!("Terminated");
|
||||||
RUNNING.store(false, Ordering::SeqCst);
|
RUNNING.store(false, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
51
examples/do_terminate.rs
Normal file
51
examples/do_terminate.rs
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
use aeron_rs::client::cnc_descriptor::MetaDataDefinition;
|
||||||
|
use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
|
use aeron_rs::client::concurrent::ring_buffer::ManyToOneRingBuffer;
|
||||||
|
use aeron_rs::client::context::ClientContext;
|
||||||
|
use aeron_rs::util::IndexT;
|
||||||
|
use memmap::MmapOptions;
|
||||||
|
use std::fs::OpenOptions;
|
||||||
|
use std::mem::size_of;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let path = ClientContext::default_aeron_path();
|
||||||
|
let cnc = path.join("cnc.dat");
|
||||||
|
|
||||||
|
println!("Opening CnC file: {}", cnc.display());
|
||||||
|
let file = OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.write(true)
|
||||||
|
.open(&cnc)
|
||||||
|
.expect("Unable to open CnC file");
|
||||||
|
let mut mmap =
|
||||||
|
unsafe { MmapOptions::default().map_mut(&file) }.expect("Unable to mmap CnC file");
|
||||||
|
println!("MMap len: {}", mmap.len());
|
||||||
|
|
||||||
|
// When creating the buffer, we need to offset by the CnC metadata
|
||||||
|
let cnc_metadata_len = size_of::<MetaDataDefinition>();
|
||||||
|
println!("Buffer len: {}", mmap[cnc_metadata_len..].len());
|
||||||
|
|
||||||
|
// Read metadata to get buffer length
|
||||||
|
let buffer_len = {
|
||||||
|
let atomic_buffer = AtomicBuffer::wrap(&mut mmap);
|
||||||
|
let metadata = atomic_buffer.overlay::<MetaDataDefinition>(0).unwrap();
|
||||||
|
metadata.to_driver_buffer_length
|
||||||
|
};
|
||||||
|
|
||||||
|
let buffer_end = cnc_metadata_len + buffer_len as usize;
|
||||||
|
let atomic_buffer = AtomicBuffer::wrap(&mut mmap[cnc_metadata_len..buffer_end]);
|
||||||
|
let mut ring_buffer =
|
||||||
|
ManyToOneRingBuffer::wrap(atomic_buffer).expect("Improperly sized buffer");
|
||||||
|
|
||||||
|
// 20 bytes: Client ID (8), correlation ID (8), token length (4)
|
||||||
|
let mut terminate_bytes = vec![0u8; 20];
|
||||||
|
let terminate_len = terminate_bytes.len();
|
||||||
|
let mut source_buffer = AtomicBuffer::wrap(&mut terminate_bytes);
|
||||||
|
let client_id = ring_buffer.next_correlation_id();
|
||||||
|
source_buffer.put_i64_ordered(0, client_id).unwrap();
|
||||||
|
|
||||||
|
let term_id: i32 = 0x0E;
|
||||||
|
ring_buffer
|
||||||
|
.write(term_id, &source_buffer, 0, terminate_len as IndexT)
|
||||||
|
.unwrap();
|
||||||
|
}
|
@ -50,7 +50,8 @@
|
|||||||
#[repr(C, align(4))]
|
#[repr(C, align(4))]
|
||||||
pub struct MetaDataDefinition {
|
pub struct MetaDataDefinition {
|
||||||
cnc_version: i32,
|
cnc_version: i32,
|
||||||
_to_driver_buffer_length: i32,
|
/// Size of the buffer containing data going to the media driver
|
||||||
|
pub to_driver_buffer_length: i32,
|
||||||
_to_client_buffer_length: i32,
|
_to_client_buffer_length: i32,
|
||||||
_counter_metadata_buffer_length: i32,
|
_counter_metadata_buffer_length: i32,
|
||||||
_counter_values_buffer_length: i32,
|
_counter_values_buffer_length: i32,
|
||||||
|
@ -34,8 +34,10 @@ impl<'a> AtomicBuffer<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::cast_ptr_alignment)]
|
/// Overlay a struct on a buffer.
|
||||||
fn overlay<T>(&self, offset: IndexT) -> Result<&T>
|
///
|
||||||
|
/// NOTE: Has the potential to cause undefined behavior if alignment is incorrect.
|
||||||
|
pub fn overlay<T>(&self, offset: IndexT) -> Result<&T>
|
||||||
where
|
where
|
||||||
T: Sized,
|
T: Sized,
|
||||||
{
|
{
|
||||||
|
@ -199,7 +199,6 @@ impl<'a> ManyToOneRingBuffer<'a> {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
let available_capacity = self.capacity - (tail - head) as IndexT;
|
let available_capacity = self.capacity - (tail - head) as IndexT;
|
||||||
|
|
||||||
println!("Available: {}", available_capacity);
|
|
||||||
if required > available_capacity {
|
if required > available_capacity {
|
||||||
// UNWRAP: Known-valid offset calculated during initialization
|
// UNWRAP: Known-valid offset calculated during initialization
|
||||||
head = self
|
head = self
|
||||||
@ -225,7 +224,6 @@ impl<'a> ManyToOneRingBuffer<'a> {
|
|||||||
tail_index = (tail & i64::from(mask)) as IndexT;
|
tail_index = (tail & i64::from(mask)) as IndexT;
|
||||||
let to_buffer_end_length = self.capacity - tail_index;
|
let to_buffer_end_length = self.capacity - tail_index;
|
||||||
|
|
||||||
println!("To buffer end: {}", to_buffer_end_length);
|
|
||||||
if required > to_buffer_end_length {
|
if required > to_buffer_end_length {
|
||||||
let mut head_index = (head & i64::from(mask)) as IndexT;
|
let mut head_index = (head & i64::from(mask)) as IndexT;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user