From 505b9a4bd6aa6cae7b3fb58bcdf495c3145de1e9 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 3 Nov 2019 16:19:03 -0500 Subject: [PATCH 1/6] Refactor - don't use a client module --- aeron-rs/src/client/mod.rs | 6 - aeron-rs/src/{client => }/cnc_descriptor.rs | 2 +- .../src/{client => }/concurrent/broadcast.rs | 2 +- aeron-rs/src/{client => }/concurrent/mod.rs | 24 ++-- .../src/{client => }/concurrent/ringbuffer.rs | 6 +- aeron-rs/src/{client => }/context.rs | 0 aeron-rs/src/control_protocol.rs | 108 ------------------ aeron-rs/src/driver.rs | 2 +- aeron-rs/src/lib.rs | 5 +- aeron-rs/tests/broadcast_receiver.rs | 4 +- aeron-rs/tests/cnc_terminate.rs | 8 +- aeron-rs/tests/many_to_one_ring_buffer.rs | 4 +- 12 files changed, 29 insertions(+), 142 deletions(-) delete mode 100644 aeron-rs/src/client/mod.rs rename aeron-rs/src/{client => }/cnc_descriptor.rs (98%) rename aeron-rs/src/{client => }/concurrent/broadcast.rs (99%) rename aeron-rs/src/{client => }/concurrent/mod.rs (93%) rename aeron-rs/src/{client => }/concurrent/ringbuffer.rs (98%) rename aeron-rs/src/{client => }/context.rs (100%) delete mode 100644 aeron-rs/src/control_protocol.rs diff --git a/aeron-rs/src/client/mod.rs b/aeron-rs/src/client/mod.rs deleted file mode 100644 index 32093de..0000000 --- a/aeron-rs/src/client/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! Aeron client -//! -//! These are the modules necessary to construct a functioning Aeron client -pub mod cnc_descriptor; -pub mod concurrent; -pub mod context; diff --git a/aeron-rs/src/client/cnc_descriptor.rs b/aeron-rs/src/cnc_descriptor.rs similarity index 98% rename from aeron-rs/src/client/cnc_descriptor.rs rename to aeron-rs/src/cnc_descriptor.rs index 9d05066..400bdf0 100644 --- a/aeron-rs/src/client/cnc_descriptor.rs +++ b/aeron-rs/src/cnc_descriptor.rs @@ -77,7 +77,7 @@ pub const CNC_FILE: &str = "cnc.dat"; #[cfg(test)] mod tests { - use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION}; + use crate::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION}; use crate::driver::DriverContext; use memmap::MmapOptions; use std::fs::File; diff --git a/aeron-rs/src/client/concurrent/broadcast.rs b/aeron-rs/src/concurrent/broadcast.rs similarity index 99% rename from aeron-rs/src/client/concurrent/broadcast.rs rename to aeron-rs/src/concurrent/broadcast.rs index 42cac44..cb8b6a6 100644 --- a/aeron-rs/src/client/concurrent/broadcast.rs +++ b/aeron-rs/src/concurrent/broadcast.rs @@ -1,6 +1,6 @@ //! Read messages that are broadcast from the media driver; this is the primary means //! of receiving data. -use crate::client::concurrent::AtomicBuffer; +use crate::concurrent::AtomicBuffer; use crate::util::bit::align; use crate::util::{AeronError, IndexT, Result}; use std::sync::atomic::{AtomicI64, Ordering}; diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/concurrent/mod.rs similarity index 93% rename from aeron-rs/src/client/concurrent/mod.rs rename to aeron-rs/src/concurrent/mod.rs index 6b1a452..7228885 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/concurrent/mod.rs @@ -18,7 +18,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// beginning at some offset. /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// /// let buffer = &mut [0u8; 8][..]; /// assert!(buffer.bounds_check(0, 8).is_ok()); @@ -39,7 +39,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// NOTE: Has the potential to cause undefined behavior if alignment is incorrect. /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// # use std::sync::atomic::{AtomicI64, Ordering}; /// let buffer = &mut [0u8; 9][..]; /// @@ -80,7 +80,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Overlay a struct on a buffer, and perform a volatile read /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let buffer = &mut [5, 0, 0, 0][..]; /// /// let my_val: u32 = buffer.overlay_volatile::(0).unwrap(); @@ -100,7 +100,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform a volatile write of a value over a buffer /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buffer = &mut [0, 0, 0, 0][..]; /// /// let value: u32 = 24; @@ -121,7 +121,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform an atomic fetch and add of a 64-bit value /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buf = vec![0u8; 8]; /// assert_eq!(buf.get_and_add_i64(0, 1), Ok(0)); /// assert_eq!(buf.get_and_add_i64(0, 1), Ok(1)); @@ -135,7 +135,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// if the update was successful, and `Ok(false)` if the update failed. /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buf = &mut [0u8; 8][..]; /// // Set value to 1 /// buf.get_and_add_i64(0, 1).unwrap(); @@ -161,7 +161,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform a volatile read of an `i64` value /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let buffer = vec![12u8, 0, 0, 0, 0, 0, 0, 0]; /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); /// ``` @@ -178,7 +178,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform a volatile write of an `i64` value /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buffer = vec![0u8; 8]; /// buffer.put_i64_ordered(0, 12); /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); @@ -190,7 +190,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Write an `i64` value into the buffer without performing any synchronization /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buffer = vec![0u8; 8]; /// buffer.put_i64(0, 12); /// assert_eq!(buffer.get_i64(0), Ok(12)); @@ -233,7 +233,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform a volatile read of an `i32` from the buffer /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let buffer = vec![0, 12, 0, 0, 0]; /// assert_eq!(buffer.get_i32_volatile(1), Ok(12)); /// ``` @@ -249,7 +249,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Perform a volatile write of an `i32` into the buffer /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut bytes = vec![0u8; 4]; /// bytes.put_i32_ordered(0, 12); /// assert_eq!(bytes.get_i32_volatile(0), Ok(12)); @@ -261,7 +261,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// Write an `i32` value into the buffer without performing any synchronization /// /// ```rust - /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// # use aeron_rs::concurrent::AtomicBuffer; /// let mut buffer = vec![0u8; 5]; /// buffer.put_i32(0, 255 + 1); /// assert_eq!(buffer.get_i32(1), Ok(1)); diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/concurrent/ringbuffer.rs similarity index 98% rename from aeron-rs/src/client/concurrent/ringbuffer.rs rename to aeron-rs/src/concurrent/ringbuffer.rs index 6f1d502..bdc503e 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/concurrent/ringbuffer.rs @@ -1,5 +1,5 @@ //! Ring buffer wrapper for communicating with the Media Driver -use crate::client::concurrent::AtomicBuffer; +use crate::concurrent::AtomicBuffer; use crate::util::bit::align; use crate::util::{bit, AeronError, IndexT, Result}; use std::ops::{Deref, DerefMut}; @@ -401,8 +401,8 @@ where #[cfg(test)] mod tests { - use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer; - use crate::client::concurrent::AtomicBuffer; + use crate::concurrent::ringbuffer::ManyToOneRingBuffer; + use crate::concurrent::AtomicBuffer; const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize; diff --git a/aeron-rs/src/client/context.rs b/aeron-rs/src/context.rs similarity index 100% rename from aeron-rs/src/client/context.rs rename to aeron-rs/src/context.rs diff --git a/aeron-rs/src/control_protocol.rs b/aeron-rs/src/control_protocol.rs deleted file mode 100644 index cb2e6a7..0000000 --- a/aeron-rs/src/control_protocol.rs +++ /dev/null @@ -1,108 +0,0 @@ -//! Utilities for interacting with the control protocol of the Media Driver -use aeron_driver_sys::*; - -/// Construct a C-compatible enum out of a set of constants. -/// Commonly used for types in Aeron that have fixed values via `#define`, -/// but aren't actually enums (e.g. AERON_COMMAND_.*, AERON_ERROR_CODE_.*). -/// Behavior is ultimately very similar to `num::FromPrimitive`. -macro_rules! define_enum { - ( - $(#[$outer:meta])* - pub enum $name:ident {$( - $(#[$inner:meta]),* - $left:ident = $right:ident, - )+} - ) => { - #[repr(u32)] - #[derive(Debug, PartialEq)] - $(#[$outer])* - pub enum $name {$( - $(#[$inner])* - $left = $right, - )*} - - impl ::std::convert::TryFrom for $name { - type Error = (); - fn try_from(val: u32) -> Result<$name, ()> { - match val { - $(v if v == $name::$left as u32 => Ok($name::$left)),*, - _ => Err(()) - } - } - } - } -} - -define_enum!( - #[doc = "Commands sent from clients to the Media Driver"] - pub enum ClientCommand { - #[doc = "Add a Publication"] - AddPublication = AERON_COMMAND_ADD_PUBLICATION, - #[doc = "Remove a Publication"] - RemovePublication = AERON_COMMAND_REMOVE_PUBLICATION, - #[doc = "Add an Exclusive Publication"] - AddExclusivePublication = AERON_COMMAND_ADD_EXCLUSIVE_PUBLICATION, - #[doc = "Add a Subscriber"] - AddSubscription = AERON_COMMAND_ADD_SUBSCRIPTION, - #[doc = "Remove a Subscriber"] - RemoveSubscription = AERON_COMMAND_REMOVE_SUBSCRIPTION, - #[doc = "Keepalaive from Client"] - ClientKeepalive = AERON_COMMAND_CLIENT_KEEPALIVE, - #[doc = "Add Destination to an existing Publication"] - AddDestination = AERON_COMMAND_ADD_DESTINATION, - #[doc = "Remove Destination from an existing Publication"] - RemoveDestination = AERON_COMMAND_REMOVE_DESTINATION, - #[doc = "Add a Counter to the counters manager"] - AddCounter = AERON_COMMAND_ADD_COUNTER, - #[doc = "Remove a Counter from the counters manager"] - RemoveCounter = AERON_COMMAND_REMOVE_COUNTER, - #[doc = "Close indication from Client"] - ClientClose = AERON_COMMAND_CLIENT_CLOSE, - #[doc = "Add Destination for existing Subscription"] - AddRcvDestination = AERON_COMMAND_ADD_RCV_DESTINATION, - #[doc = "Remove Destination for existing Subscription"] - RemoveRcvDestination = AERON_COMMAND_REMOVE_RCV_DESTINATION, - #[doc = "Request the driver to terminate"] - TerminateDriver = AERON_COMMAND_TERMINATE_DRIVER, - } -); - -define_enum!( - #[doc = "Responses from the Media Driver to client commands"] - pub enum DriverResponse { - #[doc = "Error Response as a result of attempting to process a client command operation"] - OnError = AERON_RESPONSE_ON_ERROR, - #[doc = "Subscribed Image buffers are available notification"] - OnAvailableImage = AERON_RESPONSE_ON_AVAILABLE_IMAGE, - #[doc = "New Publication buffers are ready notification"] - OnPublicationReady = AERON_RESPONSE_ON_PUBLICATION_READY, - #[doc = "Operation has succeeded"] - OnOperationSuccess = AERON_RESPONSE_ON_OPERATION_SUCCESS, - #[doc = "Inform client of timeout and removal of an inactive Image"] - OnUnavailableImage = AERON_RESPONSE_ON_UNAVAILABLE_IMAGE, - #[doc = "New Exclusive Publication buffers are ready notification"] - OnExclusivePublicationReady = AERON_RESPONSE_ON_EXCLUSIVE_PUBLICATION_READY, - #[doc = "New Subscription is ready notification"] - OnSubscriptionReady = AERON_RESPONSE_ON_SUBSCRIPTION_READY, - #[doc = "New counter is ready notification"] - OnCounterReady = AERON_RESPONSE_ON_COUNTER_READY, - #[doc = "Inform clients of removal of counter"] - OnUnavailableCounter = AERON_RESPONSE_ON_UNAVAILABLE_COUNTER, - #[doc = "Inform clients of client timeout"] - OnClientTimeout = AERON_RESPONSE_ON_CLIENT_TIMEOUT, - } -); - -#[cfg(test)] -mod tests { - use crate::control_protocol::ClientCommand; - use std::convert::TryInto; - - #[test] - fn client_command_convert() { - assert_eq!( - Ok(ClientCommand::AddPublication), - ::aeron_driver_sys::AERON_COMMAND_ADD_PUBLICATION.try_into() - ) - } -} diff --git a/aeron-rs/src/driver.rs b/aeron-rs/src/driver.rs index 352a692..a616f0d 100644 --- a/aeron-rs/src/driver.rs +++ b/aeron-rs/src/driver.rs @@ -1,4 +1,4 @@ -//! Bindings for the C Media Driver +//! High-level bindings for the C Media Driver use std::ffi::{CStr, CString}; use std::path::Path; diff --git a/aeron-rs/src/lib.rs b/aeron-rs/src/lib.rs index 9421c8b..ae6f47c 100644 --- a/aeron-rs/src/lib.rs +++ b/aeron-rs/src/lib.rs @@ -4,8 +4,9 @@ #[cfg(target_endian = "big")] compile_error!("Aeron is only supported on little-endian architectures"); -pub mod client; -pub mod control_protocol; +pub mod cnc_descriptor; +pub mod concurrent; +pub mod context; pub mod driver; pub mod util; diff --git a/aeron-rs/tests/broadcast_receiver.rs b/aeron-rs/tests/broadcast_receiver.rs index a64c8ec..1c465f6 100644 --- a/aeron-rs/tests/broadcast_receiver.rs +++ b/aeron-rs/tests/broadcast_receiver.rs @@ -1,7 +1,7 @@ -use aeron_rs::client::concurrent::broadcast::{ +use aeron_rs::concurrent::broadcast::{ buffer_descriptor, record_descriptor, BroadcastReceiver, }; -use aeron_rs::client::concurrent::AtomicBuffer; +use aeron_rs::concurrent::AtomicBuffer; use aeron_rs::util::bit::align; use aeron_rs::util::IndexT; diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 2e67bd5..526129b 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -1,8 +1,8 @@ use aeron_driver_sys::*; -use aeron_rs::client::cnc_descriptor; -use aeron_rs::client::cnc_descriptor::MetaDataDefinition; -use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer; -use aeron_rs::client::concurrent::AtomicBuffer; +use aeron_rs::cnc_descriptor; +use aeron_rs::cnc_descriptor::MetaDataDefinition; +use aeron_rs::concurrent::ringbuffer::ManyToOneRingBuffer; +use aeron_rs::concurrent::AtomicBuffer; use aeron_rs::util::IndexT; use memmap::MmapOptions; use std::ffi::{c_void, CString}; diff --git a/aeron-rs/tests/many_to_one_ring_buffer.rs b/aeron-rs/tests/many_to_one_ring_buffer.rs index 0d3d272..ec3c68c 100644 --- a/aeron-rs/tests/many_to_one_ring_buffer.rs +++ b/aeron-rs/tests/many_to_one_ring_buffer.rs @@ -1,8 +1,8 @@ /// Tests based on the C++ tests included with Aeron -use aeron_rs::client::concurrent::ringbuffer::{ +use aeron_rs::concurrent::ringbuffer::{ buffer_descriptor, record_descriptor, ManyToOneRingBuffer, }; -use aeron_rs::client::concurrent::AtomicBuffer; +use aeron_rs::concurrent::AtomicBuffer; use aeron_rs::util::bit::align; use aeron_rs::util::IndexT; use std::ops::Deref; From 5725c32c72b34280df06fa78ea8a93fcccd93677 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 3 Nov 2019 21:19:32 -0500 Subject: [PATCH 2/6] Start work on flyweights and driver proxy Implemented flyweights four different ways, can't say I like any of them. This is at least a starting point. --- aeron-rs/src/command/correlated_message.rs | 30 ++++++ aeron-rs/src/command/flyweight.rs | 47 +++++++++ aeron-rs/src/command/mod.rs | 3 + aeron-rs/src/command/terminate_driver.rs | 68 +++++++++++++ aeron-rs/src/concurrent/mod.rs | 32 +++++- aeron-rs/src/concurrent/ringbuffer.rs | 9 ++ aeron-rs/src/control_protocol.rs | 107 +++++++++++++++++++++ aeron-rs/src/driver_proxy.rs | 73 ++++++++++++++ aeron-rs/src/lib.rs | 4 +- aeron-rs/tests/broadcast_receiver.rs | 4 +- aeron-rs/tests/many_to_one_ring_buffer.rs | 4 +- 11 files changed, 369 insertions(+), 12 deletions(-) create mode 100644 aeron-rs/src/command/correlated_message.rs create mode 100644 aeron-rs/src/command/flyweight.rs create mode 100644 aeron-rs/src/command/mod.rs create mode 100644 aeron-rs/src/command/terminate_driver.rs create mode 100644 aeron-rs/src/control_protocol.rs create mode 100644 aeron-rs/src/driver_proxy.rs diff --git a/aeron-rs/src/command/correlated_message.rs b/aeron-rs/src/command/correlated_message.rs new file mode 100644 index 0000000..2110bb0 --- /dev/null +++ b/aeron-rs/src/command/correlated_message.rs @@ -0,0 +1,30 @@ +use crate::command::flyweight::Flyweight; +use crate::concurrent::AtomicBuffer; + +pub struct CorrelatedMessageDefn { + pub(crate) client_id: i64, + pub(crate) correlation_id: i64, +} + +impl Flyweight +where + A: AtomicBuffer, +{ + pub fn client_id(&self) -> i64 { + self.get_struct().client_id + } + + pub fn put_client_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().client_id = value; + self + } + + pub fn correlation_id(&self) -> i64 { + self.get_struct().correlation_id + } + + pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlation_id = value; + self + } +} diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs new file mode 100644 index 0000000..1449cf1 --- /dev/null +++ b/aeron-rs/src/command/flyweight.rs @@ -0,0 +1,47 @@ +use crate::concurrent::AtomicBuffer; +use crate::util::{IndexT, Result}; +use std::marker::PhantomData; + +pub struct Flyweight +where + A: AtomicBuffer, +{ + pub(in crate::command) buffer: A, + base_offset: IndexT, + _phantom: PhantomData, +} + +impl Flyweight +where + A: AtomicBuffer, + S: Sized, +{ + pub fn new(buffer: A, offset: IndexT) -> Result> { + buffer.overlay::(offset)?; + Ok(Flyweight { + buffer, + base_offset: offset, + _phantom: PhantomData, + }) + } + + pub(crate) fn get_struct(&self) -> &S { + // UNWRAP: Bounds check performed during initialization + self.buffer.overlay::(self.base_offset).unwrap() + } + + pub(crate) fn get_struct_mut(&mut self) -> &mut S { + // UNWRAP: Bounds check performed during initialization + self.buffer.overlay_mut::(self.base_offset).unwrap() + } + + pub(crate) fn bytes_at(&self, offset: IndexT) -> &[u8] { + let offset = (self.base_offset + offset) as usize; + // FIXME: Unwrap is unjustified here. + // C++ uses pointer arithmetic with no bounds checking, so I'm more comfortable + // with the Rust version at least panicking. Is the idea that we're safe because + // this is a crate-local (protected in C++) method? + self.buffer.bounds_check(offset as IndexT, 0).unwrap(); + &self.buffer[offset..] + } +} diff --git a/aeron-rs/src/command/mod.rs b/aeron-rs/src/command/mod.rs new file mode 100644 index 0000000..83e46c0 --- /dev/null +++ b/aeron-rs/src/command/mod.rs @@ -0,0 +1,3 @@ +pub mod correlated_message; +pub mod flyweight; +pub mod terminate_driver; diff --git a/aeron-rs/src/command/terminate_driver.rs b/aeron-rs/src/command/terminate_driver.rs new file mode 100644 index 0000000..6aba3fa --- /dev/null +++ b/aeron-rs/src/command/terminate_driver.rs @@ -0,0 +1,68 @@ +use crate::command::correlated_message::CorrelatedMessageDefn; +use crate::command::flyweight::Flyweight; +use crate::concurrent::AtomicBuffer; +use crate::util::IndexT; +use std::mem::size_of; + +pub struct TerminateDriverDefn { + pub(crate) correlated_message: CorrelatedMessageDefn, + pub(crate) token_length: i32, +} + +impl Flyweight +where + A: AtomicBuffer, +{ + pub fn client_id(&self) -> i64 { + self.get_struct().correlated_message.client_id + } + + pub fn put_client_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlated_message.client_id = value; + self + } + + pub fn correlation_id(&self) -> i64 { + self.get_struct().correlated_message.correlation_id + } + + pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlated_message.correlation_id = value; + self + } + + pub fn token_length(&self) -> i32 { + self.get_struct().token_length + } + + pub fn put_token_length(&mut self, value: i32) -> &mut Self { + self.get_struct_mut().token_length = value; + self + } + + pub fn token_buffer(&self) -> &[u8] { + // QUESTION: Should I be slicing the buffer to `token_length`? + // C++ doesn't do anything, so I'm going to assume not. + &self.bytes_at(size_of::() as IndexT) + } + + pub fn put_token_buffer(&mut self, token_buffer: &[u8]) -> &mut Self { + let token_length = token_buffer.len() as i32; + self.get_struct_mut().token_length = token_length; + + if token_length > 0 { + // FIXME: Unwrap is unjustified here + // Currently just assume that people are going to be nice about the token buffer + // and not oversize it. + self.buffer + .put_slice( + size_of::() as IndexT, + &token_buffer, + 0, + token_length, + ) + .unwrap() + } + self + } +} diff --git a/aeron-rs/src/concurrent/mod.rs b/aeron-rs/src/concurrent/mod.rs index 7228885..a28b85a 100644 --- a/aeron-rs/src/concurrent/mod.rs +++ b/aeron-rs/src/concurrent/mod.rs @@ -12,6 +12,14 @@ use std::ptr::{read_volatile, write_volatile}; use memmap::MmapMut; use std::ops::{Deref, DerefMut}; +fn bounds_check_slice(slice: &[u8], offset: IndexT, size: IndexT) -> Result<()> { + if offset < 0 || size < 0 || slice.len() as IndexT - offset < size { + Err(AeronError::OutOfBounds) + } else { + Ok(()) + } +} + /// Atomic operations on slices of memory pub trait AtomicBuffer: Deref + DerefMut { /// Check that there are at least `size` bytes of memory available @@ -27,11 +35,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// assert!(buffer.bounds_check(-1, 8).is_err()); /// ``` fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { - if offset < 0 || size < 0 || self.deref().len() as IndexT - offset < size { - Err(AeronError::OutOfBounds) - } else { - Ok(()) - } + bounds_check_slice(self.deref(), offset, size) } /// Overlay a struct on a buffer. @@ -199,6 +203,24 @@ pub trait AtomicBuffer: Deref + DerefMut { self.overlay_mut::(offset).map(|i| *i = value) } + fn put_slice( + &mut self, + index: IndexT, + source: &[u8], + source_index: IndexT, + len: IndexT, + ) -> Result<()> { + self.bounds_check(index, len)?; + bounds_check_slice(source, source_index, len)?; + + let index = index as usize; + let source_index = source_index as usize; + let len = len as usize; + + self[index..index + len].copy_from_slice(&source[source_index..source_index + len]); + Ok(()) + } + /// Write the contents of one buffer to another. Does not perform any synchronization fn put_bytes( &mut self, diff --git a/aeron-rs/src/concurrent/ringbuffer.rs b/aeron-rs/src/concurrent/ringbuffer.rs index bdc503e..c8bc7eb 100644 --- a/aeron-rs/src/concurrent/ringbuffer.rs +++ b/aeron-rs/src/concurrent/ringbuffer.rs @@ -25,6 +25,9 @@ pub mod buffer_descriptor { /// the start of the ring buffer metadata trailer. pub const CORRELATION_COUNTER_OFFSET: IndexT = (CACHE_LINE_LENGTH * 8) as IndexT; + /// Offset within the ring buffer trailer to the consumer heartbeat timestamp + pub const CONSUMER_HEARTBEAT_OFFSET: IndexT = (CACHE_LINE_LENGTH * 10) as IndexT; + /// Total size of the ring buffer metadata trailer. pub const TRAILER_LENGTH: IndexT = (CACHE_LINE_LENGTH * 12) as IndexT; @@ -125,6 +128,7 @@ where head_cache_position_index: IndexT, head_position_index: IndexT, correlation_id_counter_index: IndexT, + consumer_heartbeat_index: IndexT, } impl ManyToOneRingBuffer @@ -143,6 +147,7 @@ where head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET, head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET, correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET, + consumer_heartbeat_index: capacity + buffer_descriptor::CONSUMER_HEARTBEAT_OFFSET, }) } @@ -377,6 +382,10 @@ where pub fn max_msg_length(&self) -> IndexT { self.max_msg_length } + + pub fn consumer_heartbeat_time(&self) -> Result { + self.buffer.get_i64_volatile(self.consumer_heartbeat_index) + } } impl Deref for ManyToOneRingBuffer diff --git a/aeron-rs/src/control_protocol.rs b/aeron-rs/src/control_protocol.rs new file mode 100644 index 0000000..43d23e4 --- /dev/null +++ b/aeron-rs/src/control_protocol.rs @@ -0,0 +1,107 @@ +use aeron_driver_sys::*; + +/// Construct a C-compatible enum out of a set of constants. +/// Commonly used for types in Aeron that have fixed values via `#define`, +/// but aren't actually enums (e.g. AERON_COMMAND_.*, AERON_ERROR_CODE_.*). +/// Behavior is ultimately very similar to `num::FromPrimitive`. +macro_rules! define_enum { + ( + $(#[$outer:meta])* + pub enum $name:ident {$( + $(#[$inner:meta]),* + $left:ident = $right:ident, + )+} + ) => { + #[repr(u32)] + #[derive(Debug, PartialEq)] + $(#[$outer])* + pub enum $name {$( + $(#[$inner])* + $left = $right, + )*} + + impl ::std::convert::TryFrom for $name { + type Error = (); + fn try_from(val: u32) -> Result<$name, ()> { + match val { + $(v if v == $name::$left as u32 => Ok($name::$left)),*, + _ => Err(()) + } + } + } + } +} + +define_enum!( + #[doc = "Commands sent from clients to the Media Driver"] + pub enum ClientCommand { + #[doc = "Add a Publication"] + AddPublication = AERON_COMMAND_ADD_PUBLICATION, + #[doc = "Remove a Publication"] + RemovePublication = AERON_COMMAND_REMOVE_PUBLICATION, + #[doc = "Add an Exclusive Publication"] + AddExclusivePublication = AERON_COMMAND_ADD_EXCLUSIVE_PUBLICATION, + #[doc = "Add a Subscriber"] + AddSubscription = AERON_COMMAND_ADD_SUBSCRIPTION, + #[doc = "Remove a Subscriber"] + RemoveSubscription = AERON_COMMAND_REMOVE_SUBSCRIPTION, + #[doc = "Keepalaive from Client"] + ClientKeepalive = AERON_COMMAND_CLIENT_KEEPALIVE, + #[doc = "Add Destination to an existing Publication"] + AddDestination = AERON_COMMAND_ADD_DESTINATION, + #[doc = "Remove Destination from an existing Publication"] + RemoveDestination = AERON_COMMAND_REMOVE_DESTINATION, + #[doc = "Add a Counter to the counters manager"] + AddCounter = AERON_COMMAND_ADD_COUNTER, + #[doc = "Remove a Counter from the counters manager"] + RemoveCounter = AERON_COMMAND_REMOVE_COUNTER, + #[doc = "Close indication from Client"] + ClientClose = AERON_COMMAND_CLIENT_CLOSE, + #[doc = "Add Destination for existing Subscription"] + AddRcvDestination = AERON_COMMAND_ADD_RCV_DESTINATION, + #[doc = "Remove Destination for existing Subscription"] + RemoveRcvDestination = AERON_COMMAND_REMOVE_RCV_DESTINATION, + #[doc = "Request the driver to terminate"] + TerminateDriver = AERON_COMMAND_TERMINATE_DRIVER, + } +); + +define_enum!( + #[doc = "Responses from the Media Driver to client commands"] + pub enum DriverResponse { + #[doc = "Error Response as a result of attempting to process a client command operation"] + OnError = AERON_RESPONSE_ON_ERROR, + #[doc = "Subscribed Image buffers are available notification"] + OnAvailableImage = AERON_RESPONSE_ON_AVAILABLE_IMAGE, + #[doc = "New Publication buffers are ready notification"] + OnPublicationReady = AERON_RESPONSE_ON_PUBLICATION_READY, + #[doc = "Operation has succeeded"] + OnOperationSuccess = AERON_RESPONSE_ON_OPERATION_SUCCESS, + #[doc = "Inform client of timeout and removal of an inactive Image"] + OnUnavailableImage = AERON_RESPONSE_ON_UNAVAILABLE_IMAGE, + #[doc = "New Exclusive Publication buffers are ready notification"] + OnExclusivePublicationReady = AERON_RESPONSE_ON_EXCLUSIVE_PUBLICATION_READY, + #[doc = "New Subscription is ready notification"] + OnSubscriptionReady = AERON_RESPONSE_ON_SUBSCRIPTION_READY, + #[doc = "New counter is ready notification"] + OnCounterReady = AERON_RESPONSE_ON_COUNTER_READY, + #[doc = "Inform clients of removal of counter"] + OnUnavailableCounter = AERON_RESPONSE_ON_UNAVAILABLE_COUNTER, + #[doc = "Inform clients of client timeout"] + OnClientTimeout = AERON_RESPONSE_ON_CLIENT_TIMEOUT, + } +); + +#[cfg(test)] +mod tests { + use crate::control_protocol::ClientCommand; + use std::convert::TryInto; + + #[test] + fn client_command_convert() { + assert_eq!( + Ok(ClientCommand::AddPublication), + ::aeron_driver_sys::AERON_COMMAND_ADD_PUBLICATION.try_into() + ) + } +} diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs new file mode 100644 index 0000000..7e48026 --- /dev/null +++ b/aeron-rs/src/driver_proxy.rs @@ -0,0 +1,73 @@ +use crate::command::flyweight::Flyweight; +use crate::command::terminate_driver::TerminateDriverDefn; +use crate::concurrent::ringbuffer::ManyToOneRingBuffer; +use crate::concurrent::AtomicBuffer; +use crate::control_protocol::ClientCommand; +use crate::util::{AeronError, IndexT, Result}; + +pub struct DriverProxy +where + A: AtomicBuffer, +{ + to_driver: ManyToOneRingBuffer, + client_id: i64, +} + +impl DriverProxy +where + A: AtomicBuffer, +{ + pub fn new(to_driver: ManyToOneRingBuffer) -> Self { + let client_id = to_driver.next_correlation_id(); + DriverProxy { + to_driver, + client_id, + } + } + + pub fn time_of_last_driver_keepalive(&self) -> Result { + self.to_driver.consumer_heartbeat_time() + } + + pub fn client_id(&self) -> i64 { + self.client_id + } + + pub fn terminate_driver(&mut self, _token_buffer: Option<&[u8]>) -> Result<()> { + let _client_id = self.client_id; + self.write_command_to_driver(|buffer: &mut [u8], _length: &mut IndexT| { + // FIXME: This method signature is ugly. + // UNWRAP: Buffer from `write_command` guaranteed to be long enough for `TerminateDriverDefn` + let _request: Flyweight<_, TerminateDriverDefn> = Flyweight::new(buffer, 0).unwrap(); + + // FIXME: Uncommenting this causes termination to not succeed + /* + request.put_client_id(client_id).put_correlation_id(-1); + token_buffer.map(|b| request.put_token_buffer(b)); + *length = request.token_length(); + */ + + ClientCommand::TerminateDriver + }) + } + + fn write_command_to_driver(&mut self, filler: F) -> Result<()> + where + F: FnOnce(&mut [u8], &mut IndexT) -> ClientCommand, + { + // QUESTION: Can Rust align structs on stack? + // C++ does some fancy shenanigans I assume help the CPU cache? + let mut buffer = &mut [0u8; 512][..]; + let mut length = buffer.len() as IndexT; + let msg_type_id = filler(&mut buffer, &mut length); + + if !self + .to_driver + .write(msg_type_id as i32, &buffer, 0, length)? + { + Err(AeronError::IllegalState) + } else { + Ok(()) + } + } +} diff --git a/aeron-rs/src/lib.rs b/aeron-rs/src/lib.rs index ae6f47c..9fc7905 100644 --- a/aeron-rs/src/lib.rs +++ b/aeron-rs/src/lib.rs @@ -1,13 +1,15 @@ //! [Aeron](https://github.com/real-logic/aeron) client for Rust -#![deny(missing_docs)] #[cfg(target_endian = "big")] compile_error!("Aeron is only supported on little-endian architectures"); pub mod cnc_descriptor; +pub mod command; pub mod concurrent; pub mod context; +pub mod control_protocol; pub mod driver; +pub mod driver_proxy; pub mod util; const fn sematic_version_compose(major: u8, minor: u8, patch: u8) -> i32 { diff --git a/aeron-rs/tests/broadcast_receiver.rs b/aeron-rs/tests/broadcast_receiver.rs index 1c465f6..ab5336c 100644 --- a/aeron-rs/tests/broadcast_receiver.rs +++ b/aeron-rs/tests/broadcast_receiver.rs @@ -1,6 +1,4 @@ -use aeron_rs::concurrent::broadcast::{ - buffer_descriptor, record_descriptor, BroadcastReceiver, -}; +use aeron_rs::concurrent::broadcast::{buffer_descriptor, record_descriptor, BroadcastReceiver}; use aeron_rs::concurrent::AtomicBuffer; use aeron_rs::util::bit::align; use aeron_rs::util::IndexT; diff --git a/aeron-rs/tests/many_to_one_ring_buffer.rs b/aeron-rs/tests/many_to_one_ring_buffer.rs index ec3c68c..bbbf99a 100644 --- a/aeron-rs/tests/many_to_one_ring_buffer.rs +++ b/aeron-rs/tests/many_to_one_ring_buffer.rs @@ -1,7 +1,5 @@ /// Tests based on the C++ tests included with Aeron -use aeron_rs::concurrent::ringbuffer::{ - buffer_descriptor, record_descriptor, ManyToOneRingBuffer, -}; +use aeron_rs::concurrent::ringbuffer::{buffer_descriptor, record_descriptor, ManyToOneRingBuffer}; use aeron_rs::concurrent::AtomicBuffer; use aeron_rs::util::bit::align; use aeron_rs::util::IndexT; From 88985418e0d19595908feb522c726676a837a4a5 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 7 Nov 2019 20:50:08 -0500 Subject: [PATCH 3/6] Nicer flyweight API --- aeron-rs/src/command/flyweight.rs | 28 +++++++++++++++++++--------- aeron-rs/src/driver_proxy.rs | 3 +-- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs index 1449cf1..8befdf4 100644 --- a/aeron-rs/src/command/flyweight.rs +++ b/aeron-rs/src/command/flyweight.rs @@ -11,20 +11,30 @@ where _phantom: PhantomData, } +pub struct Unchecked; + +impl Flyweight +where + A: AtomicBuffer, +{ + pub fn new(buffer: A, offset: IndexT) -> Result> + where + S: Sized + { + buffer.overlay::(offset)?; + Ok(Flyweight { + buffer, + base_offset: offset, + _phantom: PhantomData + }) + } +} + impl Flyweight where A: AtomicBuffer, S: Sized, { - pub fn new(buffer: A, offset: IndexT) -> Result> { - buffer.overlay::(offset)?; - Ok(Flyweight { - buffer, - base_offset: offset, - _phantom: PhantomData, - }) - } - pub(crate) fn get_struct(&self) -> &S { // UNWRAP: Bounds check performed during initialization self.buffer.overlay::(self.base_offset).unwrap() diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs index 7e48026..4521bb7 100644 --- a/aeron-rs/src/driver_proxy.rs +++ b/aeron-rs/src/driver_proxy.rs @@ -36,9 +36,8 @@ where pub fn terminate_driver(&mut self, _token_buffer: Option<&[u8]>) -> Result<()> { let _client_id = self.client_id; self.write_command_to_driver(|buffer: &mut [u8], _length: &mut IndexT| { - // FIXME: This method signature is ugly. // UNWRAP: Buffer from `write_command` guaranteed to be long enough for `TerminateDriverDefn` - let _request: Flyweight<_, TerminateDriverDefn> = Flyweight::new(buffer, 0).unwrap(); + let _request = Flyweight::new::(buffer, 0).unwrap(); // FIXME: Uncommenting this causes termination to not succeed /* From 4884108e047c25b0ecd09bd31d42e700f3b31e8b Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 7 Nov 2019 21:06:11 -0500 Subject: [PATCH 4/6] Fix driver termination Turns out when you attempt to write 0 bytes... you write 0 bytes. --- aeron-rs/src/command/flyweight.rs | 5 +++-- aeron-rs/src/command/terminate_driver.rs | 4 ++++ aeron-rs/src/driver_proxy.rs | 13 +++++-------- aeron-rs/tests/cnc_terminate.rs | 20 ++++---------------- 4 files changed, 16 insertions(+), 26 deletions(-) diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs index 8befdf4..4e3162c 100644 --- a/aeron-rs/src/command/flyweight.rs +++ b/aeron-rs/src/command/flyweight.rs @@ -17,15 +17,16 @@ impl Flyweight where A: AtomicBuffer, { + #[allow(clippy::new_ret_no_self)] pub fn new(buffer: A, offset: IndexT) -> Result> where - S: Sized + S: Sized, { buffer.overlay::(offset)?; Ok(Flyweight { buffer, base_offset: offset, - _phantom: PhantomData + _phantom: PhantomData, }) } } diff --git a/aeron-rs/src/command/terminate_driver.rs b/aeron-rs/src/command/terminate_driver.rs index 6aba3fa..24f1c68 100644 --- a/aeron-rs/src/command/terminate_driver.rs +++ b/aeron-rs/src/command/terminate_driver.rs @@ -65,4 +65,8 @@ where } self } + + pub fn length(&self) -> IndexT { + size_of::() as IndexT + self.token_length() + } } diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs index 4521bb7..d55d888 100644 --- a/aeron-rs/src/driver_proxy.rs +++ b/aeron-rs/src/driver_proxy.rs @@ -33,18 +33,15 @@ where self.client_id } - pub fn terminate_driver(&mut self, _token_buffer: Option<&[u8]>) -> Result<()> { - let _client_id = self.client_id; - self.write_command_to_driver(|buffer: &mut [u8], _length: &mut IndexT| { + pub fn terminate_driver(&mut self, token_buffer: Option<&[u8]>) -> Result<()> { + let client_id = self.client_id; + self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| { // UNWRAP: Buffer from `write_command` guaranteed to be long enough for `TerminateDriverDefn` - let _request = Flyweight::new::(buffer, 0).unwrap(); + let mut request = Flyweight::new::(buffer, 0).unwrap(); - // FIXME: Uncommenting this causes termination to not succeed - /* request.put_client_id(client_id).put_correlation_id(-1); token_buffer.map(|b| request.put_token_buffer(b)); - *length = request.token_length(); - */ + *length = request.length(); ClientCommand::TerminateDriver }) diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 526129b..3f01086 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -3,6 +3,7 @@ use aeron_rs::cnc_descriptor; use aeron_rs::cnc_descriptor::MetaDataDefinition; use aeron_rs::concurrent::ringbuffer::ManyToOneRingBuffer; use aeron_rs::concurrent::AtomicBuffer; +use aeron_rs::driver_proxy::DriverProxy; use aeron_rs::util::IndexT; use memmap::MmapOptions; use std::ffi::{c_void, CString}; @@ -115,24 +116,11 @@ fn cnc_terminate() { .to_driver_buffer_length; let buffer_end = cnc_metadata_len + buffer_len as usize; - let mut ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end]) + let ring_buffer = ManyToOneRingBuffer::new(&mut mmap[cnc_metadata_len..buffer_end]) .expect("Improperly sized buffer"); - // 20 bytes: Client ID (8), correlation ID (8), token length (4) - let mut terminate_bytes = vec![0u8; 20]; - let client_id = ring_buffer.next_correlation_id(); - terminate_bytes.put_i64_ordered(0, client_id).unwrap(); - terminate_bytes.put_i64_ordered(8, -1).unwrap(); - - let term_id: i32 = 0x0E; - ring_buffer - .write( - term_id, - &terminate_bytes, - 0, - terminate_bytes.len() as IndexT, - ) - .unwrap(); + let mut driver_proxy = DriverProxy::new(ring_buffer); + driver_proxy.terminate_driver(None).unwrap(); // Wait for the driver to finish // TODO: Timeout, and then set `RUNNING` manually From 32402823549bb256a6ae2b9c4192606f06885c9d Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 7 Nov 2019 22:52:26 -0500 Subject: [PATCH 5/6] Re-enable `deny(docs)` It's better than leaving it out, but I don't think I've yet done a good job at explaining what each element's purpose is. --- aeron-rs/src/command/correlated_message.rs | 27 ++++++++++++++-- aeron-rs/src/command/flyweight.rs | 8 +++++ aeron-rs/src/command/mod.rs | 1 + aeron-rs/src/command/terminate_driver.rs | 37 ++++++++++++++++++++-- aeron-rs/src/concurrent/mod.rs | 1 + aeron-rs/src/concurrent/ringbuffer.rs | 9 ++++-- aeron-rs/src/control_protocol.rs | 1 + aeron-rs/src/driver_proxy.rs | 10 +++++- aeron-rs/src/lib.rs | 1 + aeron-rs/tests/cnc_terminate.rs | 1 - 10 files changed, 88 insertions(+), 8 deletions(-) diff --git a/aeron-rs/src/command/correlated_message.rs b/aeron-rs/src/command/correlated_message.rs index 2110bb0..1c4638a 100644 --- a/aeron-rs/src/command/correlated_message.rs +++ b/aeron-rs/src/command/correlated_message.rs @@ -1,30 +1,53 @@ +//! Header struct for commands that use an identifier to associate the media driver response. use crate::command::flyweight::Flyweight; use crate::concurrent::AtomicBuffer; +/// Basic definition for messages that include a client and correlation identifier to associate +/// commands and responses +#[repr(C, packed(4))] pub struct CorrelatedMessageDefn { - pub(crate) client_id: i64, - pub(crate) correlation_id: i64, + pub(in crate::command) client_id: i64, + pub(in crate::command) correlation_id: i64, } impl Flyweight where A: AtomicBuffer, { + /// Retrieve the client identifier associated with this message pub fn client_id(&self) -> i64 { self.get_struct().client_id } + /// Set the client identifier for this message pub fn put_client_id(&mut self, value: i64) -> &mut Self { self.get_struct_mut().client_id = value; self } + /// Retrieve the correlation identifier associated with this message. + /// Will uniquely identify a command and response pair. pub fn correlation_id(&self) -> i64 { self.get_struct().correlation_id } + /// Set the correlation identifier for this message pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { self.get_struct_mut().correlation_id = value; self } } + +#[cfg(test)] +mod tests { + use crate::command::correlated_message::CorrelatedMessageDefn; + use std::mem::size_of; + + #[test] + fn correlated_message_size() { + assert_eq!( + size_of::(), + size_of::() + ) + } +} diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs index 4e3162c..07cf6d8 100644 --- a/aeron-rs/src/command/flyweight.rs +++ b/aeron-rs/src/command/flyweight.rs @@ -1,7 +1,10 @@ +//! Flyweight pattern implementation for messages to and from the media driver. use crate::concurrent::AtomicBuffer; use crate::util::{IndexT, Result}; use std::marker::PhantomData; +/// Flyweight holder object. Wrapper around an underlying `AtomicBuffer` and +/// offset within that buffer that all future operations are relative to. pub struct Flyweight where A: AtomicBuffer, @@ -11,12 +14,17 @@ where _phantom: PhantomData, } +/// Marker struct. +// We can't put this `new` method in the fully generic implementation because +// Rust gets confused as to what type `S` should be. pub struct Unchecked; impl Flyweight where A: AtomicBuffer, { + /// Create a new flyweight object. Performs a bounds check on initialization + /// to ensure there is space available for `S`. #[allow(clippy::new_ret_no_self)] pub fn new(buffer: A, offset: IndexT) -> Result> where diff --git a/aeron-rs/src/command/mod.rs b/aeron-rs/src/command/mod.rs index 83e46c0..8993c97 100644 --- a/aeron-rs/src/command/mod.rs +++ b/aeron-rs/src/command/mod.rs @@ -1,3 +1,4 @@ +//! Message definitions for interactions with the Media Driver pub mod correlated_message; pub mod flyweight; pub mod terminate_driver; diff --git a/aeron-rs/src/command/terminate_driver.rs b/aeron-rs/src/command/terminate_driver.rs index 24f1c68..21ce6dc 100644 --- a/aeron-rs/src/command/terminate_driver.rs +++ b/aeron-rs/src/command/terminate_driver.rs @@ -1,51 +1,69 @@ +//! Flyweight implementation for commands to terminate the driver use crate::command::correlated_message::CorrelatedMessageDefn; use crate::command::flyweight::Flyweight; use crate::concurrent::AtomicBuffer; use crate::util::IndexT; use std::mem::size_of; +/// Raw command to terminate a driver. The `token_length` describes the length +/// of a buffer immediately trailing this struct definition and part of the +/// same message. +#[repr(C, packed(4))] pub struct TerminateDriverDefn { - pub(crate) correlated_message: CorrelatedMessageDefn, - pub(crate) token_length: i32, + pub(in crate::command) correlated_message: CorrelatedMessageDefn, + pub(in crate::command) token_length: i32, } impl Flyweight where A: AtomicBuffer, { + /// Retrieve the client identifier of this request. pub fn client_id(&self) -> i64 { self.get_struct().correlated_message.client_id } + /// Set the client identifier of this request. pub fn put_client_id(&mut self, value: i64) -> &mut Self { self.get_struct_mut().correlated_message.client_id = value; self } + /// Retrieve the correlation identifier associated with this request. Used to + /// associate driver responses with a specific request. pub fn correlation_id(&self) -> i64 { self.get_struct().correlated_message.correlation_id } + /// Set the correlation identifier to be used with this request. pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { self.get_struct_mut().correlated_message.correlation_id = value; self } + /// Get the current length of the payload associated with this termination request. pub fn token_length(&self) -> i32 { self.get_struct().token_length } + /// Set the payload length of this termination request. + /// + /// NOTE: While there are no safety issues, improperly setting this value can cause panics. + /// The `token_length` value is automatically set during calls to `put_token_buffer()`, + /// so this method is not likely to be frequently used. pub fn put_token_length(&mut self, value: i32) -> &mut Self { self.get_struct_mut().token_length = value; self } + /// Return the current token payload associated with this termination request. pub fn token_buffer(&self) -> &[u8] { // QUESTION: Should I be slicing the buffer to `token_length`? // C++ doesn't do anything, so I'm going to assume not. &self.bytes_at(size_of::() as IndexT) } + /// Append a payload to the termination request. pub fn put_token_buffer(&mut self, token_buffer: &[u8]) -> &mut Self { let token_length = token_buffer.len() as i32; self.get_struct_mut().token_length = token_length; @@ -66,7 +84,22 @@ where self } + /// Get the total byte length of this termination command pub fn length(&self) -> IndexT { size_of::() as IndexT + self.token_length() } } + +#[cfg(test)] +mod tests { + use crate::command::terminate_driver::TerminateDriverDefn; + use std::mem::size_of; + + #[test] + fn terminate_command_size() { + assert_eq!( + size_of::(), + size_of::() + ) + } +} diff --git a/aeron-rs/src/concurrent/mod.rs b/aeron-rs/src/concurrent/mod.rs index a28b85a..70a1e46 100644 --- a/aeron-rs/src/concurrent/mod.rs +++ b/aeron-rs/src/concurrent/mod.rs @@ -203,6 +203,7 @@ pub trait AtomicBuffer: Deref + DerefMut { self.overlay_mut::(offset).map(|i| *i = value) } + /// Write the contents of a byte slice to this buffer. Does not perform any synchronization fn put_slice( &mut self, index: IndexT, diff --git a/aeron-rs/src/concurrent/ringbuffer.rs b/aeron-rs/src/concurrent/ringbuffer.rs index c8bc7eb..87f01ad 100644 --- a/aeron-rs/src/concurrent/ringbuffer.rs +++ b/aeron-rs/src/concurrent/ringbuffer.rs @@ -383,8 +383,13 @@ where self.max_msg_length } - pub fn consumer_heartbeat_time(&self) -> Result { - self.buffer.get_i64_volatile(self.consumer_heartbeat_index) + /// Return the last heartbeat timestamp associated with the consumer of this queue. + /// Timestamps are milliseconds since 1 Jan 1970, UTC. + pub fn consumer_heartbeat_time(&self) -> i64 { + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .get_i64_volatile(self.consumer_heartbeat_index) + .unwrap() } } diff --git a/aeron-rs/src/control_protocol.rs b/aeron-rs/src/control_protocol.rs index 43d23e4..dedb124 100644 --- a/aeron-rs/src/control_protocol.rs +++ b/aeron-rs/src/control_protocol.rs @@ -1,3 +1,4 @@ +//! Utilities for wrapping the command-and-control protocol with a nicer API use aeron_driver_sys::*; /// Construct a C-compatible enum out of a set of constants. diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs index d55d888..3d3fdbf 100644 --- a/aeron-rs/src/driver_proxy.rs +++ b/aeron-rs/src/driver_proxy.rs @@ -1,3 +1,4 @@ +//! High level API for issuing commands to the Media Driver use crate::command::flyweight::Flyweight; use crate::command::terminate_driver::TerminateDriverDefn; use crate::concurrent::ringbuffer::ManyToOneRingBuffer; @@ -5,6 +6,7 @@ use crate::concurrent::AtomicBuffer; use crate::control_protocol::ClientCommand; use crate::util::{AeronError, IndexT, Result}; +/// High-level interface for issuing commands to a media driver pub struct DriverProxy where A: AtomicBuffer, @@ -17,6 +19,7 @@ impl DriverProxy where A: AtomicBuffer, { + /// Initialize a new driver proxy from a command-and-control "to driver" buffer pub fn new(to_driver: ManyToOneRingBuffer) -> Self { let client_id = to_driver.next_correlation_id(); DriverProxy { @@ -25,14 +28,19 @@ where } } - pub fn time_of_last_driver_keepalive(&self) -> Result { + /// Retrieve the timestamp of the most recent driver heartbeat. Values are + /// milliseconds past 1 Jan 1970, UTC. + pub fn time_of_last_driver_keepalive(&self) -> i64 { self.to_driver.consumer_heartbeat_time() } + /// Get the unique identifier associated with this proxy. pub fn client_id(&self) -> i64 { self.client_id } + /// Request termination of the media driver. Optionally supply a payload on the request + /// that will be available to the driver. pub fn terminate_driver(&mut self, token_buffer: Option<&[u8]>) -> Result<()> { let client_id = self.client_id; self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| { diff --git a/aeron-rs/src/lib.rs b/aeron-rs/src/lib.rs index 9fc7905..b06c98e 100644 --- a/aeron-rs/src/lib.rs +++ b/aeron-rs/src/lib.rs @@ -1,4 +1,5 @@ //! [Aeron](https://github.com/real-logic/aeron) client for Rust +#![deny(missing_docs)] #[cfg(target_endian = "big")] compile_error!("Aeron is only supported on little-endian architectures"); diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 3f01086..6e4cf8d 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -4,7 +4,6 @@ use aeron_rs::cnc_descriptor::MetaDataDefinition; use aeron_rs::concurrent::ringbuffer::ManyToOneRingBuffer; use aeron_rs::concurrent::AtomicBuffer; use aeron_rs::driver_proxy::DriverProxy; -use aeron_rs::util::IndexT; use memmap::MmapOptions; use std::ffi::{c_void, CString}; use std::fs::OpenOptions; From a5c8bbb05be5466d761cbed11e83e400dda669cd Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 7 Nov 2019 22:53:42 -0500 Subject: [PATCH 6/6] Minor docs note about C++ behavior --- aeron-rs/src/command/terminate_driver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeron-rs/src/command/terminate_driver.rs b/aeron-rs/src/command/terminate_driver.rs index 21ce6dc..2782e93 100644 --- a/aeron-rs/src/command/terminate_driver.rs +++ b/aeron-rs/src/command/terminate_driver.rs @@ -71,7 +71,7 @@ where if token_length > 0 { // FIXME: Unwrap is unjustified here // Currently just assume that people are going to be nice about the token buffer - // and not oversize it. + // and not oversize it. C++ relies on throwing an exception if bounds are violated. self.buffer .put_slice( size_of::() as IndexT,