diff --git a/aeron-rs/src/client/mod.rs b/aeron-rs/src/client/mod.rs deleted file mode 100644 index 32093de..0000000 --- a/aeron-rs/src/client/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! Aeron client -//! -//! These are the modules necessary to construct a functioning Aeron client -pub mod cnc_descriptor; -pub mod concurrent; -pub mod context; diff --git a/aeron-rs/src/client/cnc_descriptor.rs b/aeron-rs/src/cnc_descriptor.rs similarity index 98% rename from aeron-rs/src/client/cnc_descriptor.rs rename to aeron-rs/src/cnc_descriptor.rs index 9d05066..400bdf0 100644 --- a/aeron-rs/src/client/cnc_descriptor.rs +++ b/aeron-rs/src/cnc_descriptor.rs @@ -77,7 +77,7 @@ pub const CNC_FILE: &str = "cnc.dat"; #[cfg(test)] mod tests { - use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION}; + use crate::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION}; use crate::driver::DriverContext; use memmap::MmapOptions; use std::fs::File; diff --git a/aeron-rs/src/command/correlated_message.rs b/aeron-rs/src/command/correlated_message.rs new file mode 100644 index 0000000..1c4638a --- /dev/null +++ b/aeron-rs/src/command/correlated_message.rs @@ -0,0 +1,53 @@ +//! Header struct for commands that use an identifier to associate the media driver response. +use crate::command::flyweight::Flyweight; +use crate::concurrent::AtomicBuffer; + +/// Basic definition for messages that include a client and correlation identifier to associate +/// commands and responses +#[repr(C, packed(4))] +pub struct CorrelatedMessageDefn { + pub(in crate::command) client_id: i64, + pub(in crate::command) correlation_id: i64, +} + +impl Flyweight +where + A: AtomicBuffer, +{ + /// Retrieve the client identifier associated with this message + pub fn client_id(&self) -> i64 { + self.get_struct().client_id + } + + /// Set the client identifier for this message + pub fn put_client_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().client_id = value; + self + } + + /// Retrieve the correlation identifier associated with this message. + /// Will uniquely identify a command and response pair. + pub fn correlation_id(&self) -> i64 { + self.get_struct().correlation_id + } + + /// Set the correlation identifier for this message + pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlation_id = value; + self + } +} + +#[cfg(test)] +mod tests { + use crate::command::correlated_message::CorrelatedMessageDefn; + use std::mem::size_of; + + #[test] + fn correlated_message_size() { + assert_eq!( + size_of::(), + size_of::() + ) + } +} diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs new file mode 100644 index 0000000..07cf6d8 --- /dev/null +++ b/aeron-rs/src/command/flyweight.rs @@ -0,0 +1,66 @@ +//! Flyweight pattern implementation for messages to and from the media driver. +use crate::concurrent::AtomicBuffer; +use crate::util::{IndexT, Result}; +use std::marker::PhantomData; + +/// Flyweight holder object. Wrapper around an underlying `AtomicBuffer` and +/// offset within that buffer that all future operations are relative to. +pub struct Flyweight +where + A: AtomicBuffer, +{ + pub(in crate::command) buffer: A, + base_offset: IndexT, + _phantom: PhantomData, +} + +/// Marker struct. +// We can't put this `new` method in the fully generic implementation because +// Rust gets confused as to what type `S` should be. +pub struct Unchecked; + +impl Flyweight +where + A: AtomicBuffer, +{ + /// Create a new flyweight object. Performs a bounds check on initialization + /// to ensure there is space available for `S`. + #[allow(clippy::new_ret_no_self)] + pub fn new(buffer: A, offset: IndexT) -> Result> + where + S: Sized, + { + buffer.overlay::(offset)?; + Ok(Flyweight { + buffer, + base_offset: offset, + _phantom: PhantomData, + }) + } +} + +impl Flyweight +where + A: AtomicBuffer, + S: Sized, +{ + pub(crate) fn get_struct(&self) -> &S { + // UNWRAP: Bounds check performed during initialization + self.buffer.overlay::(self.base_offset).unwrap() + } + + pub(crate) fn get_struct_mut(&mut self) -> &mut S { + // UNWRAP: Bounds check performed during initialization + self.buffer.overlay_mut::(self.base_offset).unwrap() + } + + pub(crate) fn bytes_at(&self, offset: IndexT) -> &[u8] { + let offset = (self.base_offset + offset) as usize; + // FIXME: Unwrap is unjustified here. + // C++ uses pointer arithmetic with no bounds checking, so I'm more comfortable + // with the Rust version at least panicking. Is the idea that we're safe because + // this is a crate-local (protected in C++) method? + self.buffer.bounds_check(offset as IndexT, 0).unwrap(); + &self.buffer[offset..] + } +} diff --git a/aeron-rs/src/command/mod.rs b/aeron-rs/src/command/mod.rs new file mode 100644 index 0000000..8993c97 --- /dev/null +++ b/aeron-rs/src/command/mod.rs @@ -0,0 +1,4 @@ +//! Message definitions for interactions with the Media Driver +pub mod correlated_message; +pub mod flyweight; +pub mod terminate_driver; diff --git a/aeron-rs/src/command/terminate_driver.rs b/aeron-rs/src/command/terminate_driver.rs new file mode 100644 index 0000000..2782e93 --- /dev/null +++ b/aeron-rs/src/command/terminate_driver.rs @@ -0,0 +1,105 @@ +//! Flyweight implementation for commands to terminate the driver +use crate::command::correlated_message::CorrelatedMessageDefn; +use crate::command::flyweight::Flyweight; +use crate::concurrent::AtomicBuffer; +use crate::util::IndexT; +use std::mem::size_of; + +/// Raw command to terminate a driver. The `token_length` describes the length +/// of a buffer immediately trailing this struct definition and part of the +/// same message. +#[repr(C, packed(4))] +pub struct TerminateDriverDefn { + pub(in crate::command) correlated_message: CorrelatedMessageDefn, + pub(in crate::command) token_length: i32, +} + +impl Flyweight +where + A: AtomicBuffer, +{ + /// Retrieve the client identifier of this request. + pub fn client_id(&self) -> i64 { + self.get_struct().correlated_message.client_id + } + + /// Set the client identifier of this request. + pub fn put_client_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlated_message.client_id = value; + self + } + + /// Retrieve the correlation identifier associated with this request. Used to + /// associate driver responses with a specific request. + pub fn correlation_id(&self) -> i64 { + self.get_struct().correlated_message.correlation_id + } + + /// Set the correlation identifier to be used with this request. + pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlated_message.correlation_id = value; + self + } + + /// Get the current length of the payload associated with this termination request. + pub fn token_length(&self) -> i32 { + self.get_struct().token_length + } + + /// Set the payload length of this termination request. + /// + /// NOTE: While there are no safety issues, improperly setting this value can cause panics. + /// The `token_length` value is automatically set during calls to `put_token_buffer()`, + /// so this method is not likely to be frequently used. + pub fn put_token_length(&mut self, value: i32) -> &mut Self { + self.get_struct_mut().token_length = value; + self + } + + /// Return the current token payload associated with this termination request. + pub fn token_buffer(&self) -> &[u8] { + // QUESTION: Should I be slicing the buffer to `token_length`? + // C++ doesn't do anything, so I'm going to assume not. + &self.bytes_at(size_of::() as IndexT) + } + + /// Append a payload to the termination request. + pub fn put_token_buffer(&mut self, token_buffer: &[u8]) -> &mut Self { + let token_length = token_buffer.len() as i32; + self.get_struct_mut().token_length = token_length; + + if token_length > 0 { + // FIXME: Unwrap is unjustified here + // Currently just assume that people are going to be nice about the token buffer + // and not oversize it. C++ relies on throwing an exception if bounds are violated. + self.buffer + .put_slice( + size_of::() as IndexT, + &token_buffer, + 0, + token_length, + ) + .unwrap() + } + self + } + + /// Get the total byte length of this termination command + pub fn length(&self) -> IndexT { + size_of::() as IndexT + self.token_length() + } +} + +#[cfg(test)] +mod tests { + use crate::command::terminate_driver::TerminateDriverDefn; + use std::mem::size_of; + + #[test] + fn terminate_command_size() { + assert_eq!( + size_of::(), + size_of::() + ) + } +} diff --git a/aeron-rs/src/client/concurrent/broadcast.rs b/aeron-rs/src/concurrent/broadcast.rs similarity index 99% rename from aeron-rs/src/client/concurrent/broadcast.rs rename to aeron-rs/src/concurrent/broadcast.rs index 42cac44..cb8b6a6 100644 --- a/aeron-rs/src/client/concurrent/broadcast.rs +++ b/aeron-rs/src/concurrent/broadcast.rs @@ -1,6 +1,6 @@ //! Read messages that are broadcast from the media driver; this is the primary means //! of receiving data. -use crate::client::concurrent::AtomicBuffer; +use crate::concurrent::AtomicBuffer; use crate::util::bit::align; use crate::util::{AeronError, IndexT, Result}; use std::sync::atomic::{AtomicI64, Ordering}; diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/concurrent/mod.rs similarity index 86% rename from aeron-rs/src/client/concurrent/mod.rs rename to aeron-rs/src/concurrent/mod.rs index 6b1a452..70a1e46 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/concurrent/mod.rs @@ -12,13 +12,21 @@ use std::ptr::{read_volatile, write_volatile}; use memmap::MmapMut; use std::ops::{Deref, DerefMut}; +fn bounds_check_slice(slice: &[u8], offset: IndexT, size: IndexT) -> Result<()> { + if offset < 0 || size < 0 || slice.len() as IndexT - offset < size { + Err(AeronError::OutOfBounds) + } else { + Ok(()) + } +} + /// Atomic operations on slices of memory pub trait AtomicBuffer: Deref + DerefMut { /// Check that there are at least `size` bytes of memory available /// beginning at some offset. /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// /// let buffer = &mut [0u8; 8][..]; /// assert!(buffer.bounds_check(0, 8).is_ok()); @@ -27,11 +35,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// assert!(buffer.bounds_check(-1, 8).is_err()); /// ``` fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { - if offset < 0 || size < 0 || self.deref().len() as IndexT - offset < size { - Err(AeronError::OutOfBounds) - } else { - Ok(()) - } + bounds_check_slice(self.deref(), offset, size) } /// Overlay a struct on a buffer. @@ -39,7 +43,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// # use std::sync::atomic::{AtomicI64, Ordering}; /// let buffer = &mut [0u8; 9][..]; /// @@ -80,7 +84,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Overlay a struct on a buffer, and perform a volatile read /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let buffer = &mut [5, 0, 0, 0][..]; /// /// let my_val: u32 = buffer.overlay_volatile::(0).unwrap(); @@ -100,7 +104,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform a volatile write of a value over a buffer /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buffer = &mut [0, 0, 0, 0][..]; /// /// let value: u32 = 24; @@ -121,7 +125,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform an atomic fetch and add of a 64-bit value /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buf = vec![0u8; 8]; /// assert_eq!(buf.get_and_add_i64(0, 1), Ok(0)); /// assert_eq!(buf.get_and_add_i64(0, 1), Ok(1)); @@ -135,7 +139,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// if the update was successful, and `Ok(false)` if the update failed. /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buf = &mut [0u8; 8][..]; /// // Set value to 1 /// buf.get_and_add_i64(0, 1).unwrap(); @@ -161,7 +165,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform a volatile read of an `i64` value /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let buffer = vec![12u8, 0, 0, 0, 0, 0, 0, 0]; /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); /// ``` @@ -178,7 +182,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform a volatile write of an `i64` value /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buffer = vec![0u8; 8]; /// buffer.put_i64_ordered(0, 12); /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); @@ -190,7 +194,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Write an `i64` value into the buffer without performing any synchronization /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buffer = vec![0u8; 8]; /// buffer.put_i64(0, 12); /// assert_eq!(buffer.get_i64(0), Ok(12)); @@ -199,6 +203,25 @@ pub trait AtomicBuffer: Deref + DerefMut { self.overlay_mut::(offset).map(|i| *i = value) } + /// Write the contents of a byte slice to this buffer. Does not perform any synchronization + fn put_slice( + &mut self, + index: IndexT, + source: &[u8], + source_index: IndexT, + len: IndexT, + ) -> Result<()> { + self.bounds_check(index, len)?; + bounds_check_slice(source, source_index, len)?; + + let index = index as usize; + let source_index = source_index as usize; + let len = len as usize; + + self[index..index + len].copy_from_slice(&source[source_index..source_index + len]); + Ok(()) + } + /// Write the contents of one buffer to another. Does not perform any synchronization fn put_bytes( &mut self, @@ -233,7 +256,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform a volatile read of an `i32` from the buffer /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let buffer = vec![0, 12, 0, 0, 0]; /// assert_eq!(buffer.get_i32_volatile(1), Ok(12)); /// ``` @@ -249,7 +272,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform a volatile write of an `i32` into the buffer /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut bytes = vec![0u8; 4]; /// bytes.put_i32_ordered(0, 12); /// assert_eq!(bytes.get_i32_volatile(0), Ok(12)); @@ -261,7 +284,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Write an `i32` value into the buffer without performing any synchronization /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buffer = vec![0u8; 5]; /// buffer.put_i32(0, 255 + 1); /// assert_eq!(buffer.get_i32(1), Ok(1)); diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/concurrent/ringbuffer.rs similarity index 95% rename from aeron-rs/src/client/concurrent/ringbuffer.rs rename to aeron-rs/src/concurrent/ringbuffer.rs index 6f1d502..87f01ad 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/concurrent/ringbuffer.rs @@ -1,5 +1,5 @@ //! Ring buffer wrapper for communicating with the Media Driver -use crate::client::concurrent::AtomicBuffer; +use crate::concurrent::AtomicBuffer; use crate::util::bit::align; use crate::util::{bit, AeronError, IndexT, Result}; use std::ops::{Deref, DerefMut}; @@ -25,6 +25,9 @@ pub mod buffer_descriptor { /// the start of the ring buffer metadata trailer. pub const CORRELATION_COUNTER_OFFSET: IndexT = (CACHE_LINE_LENGTH * 8) as IndexT; + /// Offset within the ring buffer trailer to the consumer heartbeat timestamp + pub const CONSUMER_HEARTBEAT_OFFSET: IndexT = (CACHE_LINE_LENGTH * 10) as IndexT; + /// Total size of the ring buffer metadata trailer. pub const TRAILER_LENGTH: IndexT = (CACHE_LINE_LENGTH * 12) as IndexT; @@ -125,6 +128,7 @@ where head_cache_position_index: IndexT, head_position_index: IndexT, correlation_id_counter_index: IndexT, + consumer_heartbeat_index: IndexT, } impl ManyToOneRingBuffer @@ -143,6 +147,7 @@ where head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET, head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET, correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET, + consumer_heartbeat_index: capacity + buffer_descriptor::CONSUMER_HEARTBEAT_OFFSET, }) } @@ -377,6 +382,15 @@ where pub fn max_msg_length(&self) -> IndexT { self.max_msg_length } + + /// Return the last heartbeat timestamp associated with the consumer of this queue. + /// Timestamps are milliseconds since 1 Jan 1970, UTC. + pub fn consumer_heartbeat_time(&self) -> i64 { + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .get_i64_volatile(self.consumer_heartbeat_index) + .unwrap() + } } impl Deref for ManyToOneRingBuffer @@ -401,8 +415,8 @@ where #[cfg(test)] mod tests { - use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer; - use crate::client::concurrent::AtomicBuffer; + use crate::concurrent::ringbuffer::ManyToOneRingBuffer; + use crate::concurrent::AtomicBuffer; const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize; diff --git a/aeron-rs/src/client/context.rs b/aeron-rs/src/context.rs similarity index 100% rename from aeron-rs/src/client/context.rs rename to aeron-rs/src/context.rs diff --git a/aeron-rs/src/control_protocol.rs b/aeron-rs/src/control_protocol.rs index cb2e6a7..dedb124 100644 --- a/aeron-rs/src/control_protocol.rs +++ b/aeron-rs/src/control_protocol.rs @@ -1,4 +1,4 @@ -//! Utilities for interacting with the control protocol of the Media Driver +//! Utilities for wrapping the command-and-control protocol with a nicer API use aeron_driver_sys::*; /// Construct a C-compatible enum out of a set of constants. diff --git a/aeron-rs/src/driver.rs b/aeron-rs/src/driver.rs index 352a692..a616f0d 100644 --- a/aeron-rs/src/driver.rs +++ b/aeron-rs/src/driver.rs @@ -1,4 +1,4 @@ -//! Bindings for the C Media Driver +//! High-level bindings for the C Media Driver use std::ffi::{CStr, CString}; use std::path::Path; diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs new file mode 100644 index 0000000..3d3fdbf --- /dev/null +++ b/aeron-rs/src/driver_proxy.rs @@ -0,0 +1,77 @@ +//! High level API for issuing commands to the Media Driver +use crate::command::flyweight::Flyweight; +use crate::command::terminate_driver::TerminateDriverDefn; +use crate::concurrent::ringbuffer::ManyToOneRingBuffer; +use crate::concurrent::AtomicBuffer; +use crate::control_protocol::ClientCommand; +use crate::util::{AeronError, IndexT, Result}; + +/// High-level interface for issuing commands to a media driver +pub struct DriverProxy +where + A: AtomicBuffer, +{ + to_driver: ManyToOneRingBuffer, + client_id: i64, +} + +impl DriverProxy +where + A: AtomicBuffer, +{ + /// Initialize a new driver proxy from a command-and-control "to driver" buffer + pub fn new(to_driver: ManyToOneRingBuffer) -> Self { + let client_id = to_driver.next_correlation_id(); + DriverProxy { + to_driver, + client_id, + } + } + + /// Retrieve the timestamp of the most recent driver heartbeat. Values are + /// milliseconds past 1 Jan 1970, UTC. + pub fn time_of_last_driver_keepalive(&self) -> i64 { + self.to_driver.consumer_heartbeat_time() + } + + /// Get the unique identifier associated with this proxy. + pub fn client_id(&self) -> i64 { + self.client_id + } + + /// Request termination of the media driver. Optionally supply a payload on the request + /// that will be available to the driver. + pub fn terminate_driver(&mut self, token_buffer: Option<&[u8]>) -> Result<()> { + let client_id = self.client_id; + self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| { + // UNWRAP: Buffer from `write_command` guaranteed to be long enough for `TerminateDriverDefn` + let mut request = Flyweight::new::(buffer, 0).unwrap(); + + request.put_client_id(client_id).put_correlation_id(-1); + token_buffer.map(|b| request.put_token_buffer(b)); + *length = request.length(); + + ClientCommand::TerminateDriver + }) + } + + fn write_command_to_driver(&mut self, filler: F) -> Result<()> + where + F: FnOnce(&mut [u8], &mut IndexT) -> ClientCommand, + { + // QUESTION: Can Rust align structs on stack? + // C++ does some fancy shenanigans I assume help the CPU cache? + let mut buffer = &mut [0u8; 512][..]; + let mut length = buffer.len() as IndexT; + let msg_type_id = filler(&mut buffer, &mut length); + + if !self + .to_driver + .write(msg_type_id as i32, &buffer, 0, length)? + { + Err(AeronError::IllegalState) + } else { + Ok(()) + } + } +} diff --git a/aeron-rs/src/lib.rs b/aeron-rs/src/lib.rs index 9421c8b..b06c98e 100644 --- a/aeron-rs/src/lib.rs +++ b/aeron-rs/src/lib.rs @@ -4,9 +4,13 @@ #[cfg(target_endian = "big")] compile_error!("Aeron is only supported on little-endian architectures"); -pub mod client; +pub mod cnc_descriptor; +pub mod command; +pub mod concurrent; +pub mod context; pub mod control_protocol; pub mod driver; +pub mod driver_proxy; pub mod util; const fn sematic_version_compose(major: u8, minor: u8, patch: u8) -> i32 { diff --git a/aeron-rs/tests/broadcast_receiver.rs b/aeron-rs/tests/broadcast_receiver.rs index a64c8ec..ab5336c 100644 --- a/aeron-rs/tests/broadcast_receiver.rs +++ b/aeron-rs/tests/broadcast_receiver.rs @@ -1,7 +1,5 @@ -use aeron_rs::client::concurrent::broadcast::{ - buffer_descriptor, record_descriptor, BroadcastReceiver, -}; -use aeron_rs::client::concurrent::AtomicBuffer; +use aeron_rs::concurrent::broadcast::{buffer_descriptor, record_descriptor, BroadcastReceiver}; +use aeron_rs::concurrent::AtomicBuffer; use aeron_rs::util::bit::align; use aeron_rs::util::IndexT; diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 2e67bd5..6e4cf8d 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -1,9 +1,9 @@ use aeron_driver_sys::*; -use aeron_rs::client::cnc_descriptor; -use aeron_rs::client::cnc_descriptor::MetaDataDefinition; -use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer; -use aeron_rs::client::concurrent::AtomicBuffer; -use aeron_rs::util::IndexT; +use aeron_rs::cnc_descriptor; +use aeron_rs::cnc_descriptor::MetaDataDefinition; +use aeron_rs::concurrent::ringbuffer::ManyToOneRingBuffer; +use aeron_rs::concurrent::AtomicBuffer; +use aeron_rs::driver_proxy::DriverProxy; use memmap::MmapOptions; use std::ffi::{c_void, CString}; use std::fs::OpenOptions; @@ -115,24 +115,11 @@ fn cnc_terminate() { .to_driver_buffer_length; let buffer_end = cnc_metadata_len + buffer_len as usize; - let mut ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end]) + let ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end]) .expect("Improperly sized buffer"); - // 20 bytes: Client ID (8), correlation ID (8), token length (4) - let mut terminate_bytes = vec![0u8; 20]; - let client_id = ring_buffer.next_correlation_id(); - terminate_bytes.put_i64_ordered(0, client_id).unwrap(); - terminate_bytes.put_i64_ordered(8, -1).unwrap(); - - let term_id: i32 = 0x0E; - ring_buffer - .write( - term_id, - &terminate_bytes, - 0, - terminate_bytes.len() as IndexT, - ) - .unwrap(); + let mut driver_proxy = DriverProxy::new(ring_buffer); + driver_proxy.terminate_driver(None).unwrap(); // Wait for the driver to finish // TODO: Timeout, and then set `RUNNING` manually diff --git a/aeron-rs/tests/many_to_one_ring_buffer.rs b/aeron-rs/tests/many_to_one_ring_buffer.rs index 0d3d272..bbbf99a 100644 --- a/aeron-rs/tests/many_to_one_ring_buffer.rs +++ b/aeron-rs/tests/many_to_one_ring_buffer.rs @@ -1,8 +1,6 @@ /// Tests based on the C++ tests included with Aeron -use aeron_rs::client::concurrent::ringbuffer::{ - buffer_descriptor, record_descriptor, ManyToOneRingBuffer, -}; -use aeron_rs::client::concurrent::AtomicBuffer; +use aeron_rs::concurrent::ringbuffer::{buffer_descriptor, record_descriptor, ManyToOneRingBuffer}; +use aeron_rs::concurrent::AtomicBuffer; use aeron_rs::util::bit::align; use aeron_rs::util::IndexT; use std::ops::Deref;