diff --git a/aeron-rs/src/command/correlated_message.rs b/aeron-rs/src/command/correlated_message.rs new file mode 100644 index 0000000..2110bb0 --- /dev/null +++ b/aeron-rs/src/command/correlated_message.rs @@ -0,0 +1,30 @@ +use crate::command::flyweight::Flyweight; +use crate::concurrent::AtomicBuffer; + +pub struct CorrelatedMessageDefn { + pub(crate) client_id: i64, + pub(crate) correlation_id: i64, +} + +impl Flyweight +where + A: AtomicBuffer, +{ + pub fn client_id(&self) -> i64 { + self.get_struct().client_id + } + + pub fn put_client_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().client_id = value; + self + } + + pub fn correlation_id(&self) -> i64 { + self.get_struct().correlation_id + } + + pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlation_id = value; + self + } +} diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs new file mode 100644 index 0000000..1449cf1 --- /dev/null +++ b/aeron-rs/src/command/flyweight.rs @@ -0,0 +1,47 @@ +use crate::concurrent::AtomicBuffer; +use crate::util::{IndexT, Result}; +use std::marker::PhantomData; + +pub struct Flyweight +where + A: AtomicBuffer, +{ + pub(in crate::command) buffer: A, + base_offset: IndexT, + _phantom: PhantomData, +} + +impl Flyweight +where + A: AtomicBuffer, + S: Sized, +{ + pub fn new(buffer: A, offset: IndexT) -> Result> { + buffer.overlay::(offset)?; + Ok(Flyweight { + buffer, + base_offset: offset, + _phantom: PhantomData, + }) + } + + pub(crate) fn get_struct(&self) -> &S { + // UNWRAP: Bounds check performed during initialization + self.buffer.overlay::(self.base_offset).unwrap() + } + + pub(crate) fn get_struct_mut(&mut self) -> &mut S { + // UNWRAP: Bounds check performed during initialization + self.buffer.overlay_mut::(self.base_offset).unwrap() + } + + pub(crate) fn bytes_at(&self, offset: IndexT) -> &[u8] { + let offset = (self.base_offset + offset) as usize; + // FIXME: Unwrap is unjustified here. + // C++ uses pointer arithmetic with no bounds checking, so I'm more comfortable + // with the Rust version at least panicking. Is the idea that we're safe because + // this is a crate-local (protected in C++) method? + self.buffer.bounds_check(offset as IndexT, 0).unwrap(); + &self.buffer[offset..] + } +} diff --git a/aeron-rs/src/command/mod.rs b/aeron-rs/src/command/mod.rs new file mode 100644 index 0000000..83e46c0 --- /dev/null +++ b/aeron-rs/src/command/mod.rs @@ -0,0 +1,3 @@ +pub mod correlated_message; +pub mod flyweight; +pub mod terminate_driver; diff --git a/aeron-rs/src/command/terminate_driver.rs b/aeron-rs/src/command/terminate_driver.rs new file mode 100644 index 0000000..6aba3fa --- /dev/null +++ b/aeron-rs/src/command/terminate_driver.rs @@ -0,0 +1,68 @@ +use crate::command::correlated_message::CorrelatedMessageDefn; +use crate::command::flyweight::Flyweight; +use crate::concurrent::AtomicBuffer; +use crate::util::IndexT; +use std::mem::size_of; + +pub struct TerminateDriverDefn { + pub(crate) correlated_message: CorrelatedMessageDefn, + pub(crate) token_length: i32, +} + +impl Flyweight +where + A: AtomicBuffer, +{ + pub fn client_id(&self) -> i64 { + self.get_struct().correlated_message.client_id + } + + pub fn put_client_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlated_message.client_id = value; + self + } + + pub fn correlation_id(&self) -> i64 { + self.get_struct().correlated_message.correlation_id + } + + pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlated_message.correlation_id = value; + self + } + + pub fn token_length(&self) -> i32 { + self.get_struct().token_length + } + + pub fn put_token_length(&mut self, value: i32) -> &mut Self { + self.get_struct_mut().token_length = value; + self + } + + pub fn token_buffer(&self) -> &[u8] { + // QUESTION: Should I be slicing the buffer to `token_length`? + // C++ doesn't do anything, so I'm going to assume not. + &self.bytes_at(size_of::() as IndexT) + } + + pub fn put_token_buffer(&mut self, token_buffer: &[u8]) -> &mut Self { + let token_length = token_buffer.len() as i32; + self.get_struct_mut().token_length = token_length; + + if token_length > 0 { + // FIXME: Unwrap is unjustified here + // Currently just assume that people are going to be nice about the token buffer + // and not oversize it. + self.buffer + .put_slice( + size_of::() as IndexT, + &token_buffer, + 0, + token_length, + ) + .unwrap() + } + self + } +} diff --git a/aeron-rs/src/concurrent/mod.rs b/aeron-rs/src/concurrent/mod.rs index 7228885..a28b85a 100644 --- a/aeron-rs/src/concurrent/mod.rs +++ b/aeron-rs/src/concurrent/mod.rs @@ -12,6 +12,14 @@ use std::ptr::{read_volatile, write_volatile}; use memmap::MmapMut; use std::ops::{Deref, DerefMut}; +fn bounds_check_slice(slice: &[u8], offset: IndexT, size: IndexT) -> Result<()> { + if offset < 0 || size < 0 || slice.len() as IndexT - offset < size { + Err(AeronError::OutOfBounds) + } else { + Ok(()) + } +} + /// Atomic operations on slices of memory pub trait AtomicBuffer: Deref + DerefMut { /// Check that there are at least `size` bytes of memory available @@ -27,11 +35,7 @@ pub trait AtomicBuffer: Deref + DerefMut { /// assert!(buffer.bounds_check(-1, 8).is_err()); /// ``` fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { - if offset < 0 || size < 0 || self.deref().len() as IndexT - offset < size { - Err(AeronError::OutOfBounds) - } else { - Ok(()) - } + bounds_check_slice(self.deref(), offset, size) } /// Overlay a struct on a buffer. @@ -199,6 +203,24 @@ pub trait AtomicBuffer: Deref + DerefMut { self.overlay_mut::(offset).map(|i| *i = value) } + fn put_slice( + &mut self, + index: IndexT, + source: &[u8], + source_index: IndexT, + len: IndexT, + ) -> Result<()> { + self.bounds_check(index, len)?; + bounds_check_slice(source, source_index, len)?; + + let index = index as usize; + let source_index = source_index as usize; + let len = len as usize; + + self[index..index + len].copy_from_slice(&source[source_index..source_index + len]); + Ok(()) + } + /// Write the contents of one buffer to another. Does not perform any synchronization fn put_bytes( &mut self, diff --git a/aeron-rs/src/concurrent/ringbuffer.rs b/aeron-rs/src/concurrent/ringbuffer.rs index bdc503e..c8bc7eb 100644 --- a/aeron-rs/src/concurrent/ringbuffer.rs +++ b/aeron-rs/src/concurrent/ringbuffer.rs @@ -25,6 +25,9 @@ pub mod buffer_descriptor { /// the start of the ring buffer metadata trailer. pub const CORRELATION_COUNTER_OFFSET: IndexT = (CACHE_LINE_LENGTH * 8) as IndexT; + /// Offset within the ring buffer trailer to the consumer heartbeat timestamp + pub const CONSUMER_HEARTBEAT_OFFSET: IndexT = (CACHE_LINE_LENGTH * 10) as IndexT; + /// Total size of the ring buffer metadata trailer. pub const TRAILER_LENGTH: IndexT = (CACHE_LINE_LENGTH * 12) as IndexT; @@ -125,6 +128,7 @@ where head_cache_position_index: IndexT, head_position_index: IndexT, correlation_id_counter_index: IndexT, + consumer_heartbeat_index: IndexT, } impl ManyToOneRingBuffer @@ -143,6 +147,7 @@ where head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET, head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET, correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET, + consumer_heartbeat_index: capacity + buffer_descriptor::CONSUMER_HEARTBEAT_OFFSET, }) } @@ -377,6 +382,10 @@ where pub fn max_msg_length(&self) -> IndexT { self.max_msg_length } + + pub fn consumer_heartbeat_time(&self) -> Result { + self.buffer.get_i64_volatile(self.consumer_heartbeat_index) + } } impl Deref for ManyToOneRingBuffer diff --git a/aeron-rs/src/control_protocol.rs b/aeron-rs/src/control_protocol.rs new file mode 100644 index 0000000..43d23e4 --- /dev/null +++ b/aeron-rs/src/control_protocol.rs @@ -0,0 +1,107 @@ +use aeron_driver_sys::*; + +/// Construct a C-compatible enum out of a set of constants. +/// Commonly used for types in Aeron that have fixed values via `#define`, +/// but aren't actually enums (e.g. AERON_COMMAND_.*, AERON_ERROR_CODE_.*). +/// Behavior is ultimately very similar to `num::FromPrimitive`. +macro_rules! define_enum { + ( + $(#[$outer:meta])* + pub enum $name:ident {$( + $(#[$inner:meta]),* + $left:ident = $right:ident, + )+} + ) => { + #[repr(u32)] + #[derive(Debug, PartialEq)] + $(#[$outer])* + pub enum $name {$( + $(#[$inner])* + $left = $right, + )*} + + impl ::std::convert::TryFrom for $name { + type Error = (); + fn try_from(val: u32) -> Result<$name, ()> { + match val { + $(v if v == $name::$left as u32 => Ok($name::$left)),*, + _ => Err(()) + } + } + } + } +} + +define_enum!( + #[doc = "Commands sent from clients to the Media Driver"] + pub enum ClientCommand { + #[doc = "Add a Publication"] + AddPublication = AERON_COMMAND_ADD_PUBLICATION, + #[doc = "Remove a Publication"] + RemovePublication = AERON_COMMAND_REMOVE_PUBLICATION, + #[doc = "Add an Exclusive Publication"] + AddExclusivePublication = AERON_COMMAND_ADD_EXCLUSIVE_PUBLICATION, + #[doc = "Add a Subscriber"] + AddSubscription = AERON_COMMAND_ADD_SUBSCRIPTION, + #[doc = "Remove a Subscriber"] + RemoveSubscription = AERON_COMMAND_REMOVE_SUBSCRIPTION, + #[doc = "Keepalaive from Client"] + ClientKeepalive = AERON_COMMAND_CLIENT_KEEPALIVE, + #[doc = "Add Destination to an existing Publication"] + AddDestination = AERON_COMMAND_ADD_DESTINATION, + #[doc = "Remove Destination from an existing Publication"] + RemoveDestination = AERON_COMMAND_REMOVE_DESTINATION, + #[doc = "Add a Counter to the counters manager"] + AddCounter = AERON_COMMAND_ADD_COUNTER, + #[doc = "Remove a Counter from the counters manager"] + RemoveCounter = AERON_COMMAND_REMOVE_COUNTER, + #[doc = "Close indication from Client"] + ClientClose = AERON_COMMAND_CLIENT_CLOSE, + #[doc = "Add Destination for existing Subscription"] + AddRcvDestination = AERON_COMMAND_ADD_RCV_DESTINATION, + #[doc = "Remove Destination for existing Subscription"] + RemoveRcvDestination = AERON_COMMAND_REMOVE_RCV_DESTINATION, + #[doc = "Request the driver to terminate"] + TerminateDriver = AERON_COMMAND_TERMINATE_DRIVER, + } +); + +define_enum!( + #[doc = "Responses from the Media Driver to client commands"] + pub enum DriverResponse { + #[doc = "Error Response as a result of attempting to process a client command operation"] + OnError = AERON_RESPONSE_ON_ERROR, + #[doc = "Subscribed Image buffers are available notification"] + OnAvailableImage = AERON_RESPONSE_ON_AVAILABLE_IMAGE, + #[doc = "New Publication buffers are ready notification"] + OnPublicationReady = AERON_RESPONSE_ON_PUBLICATION_READY, + #[doc = "Operation has succeeded"] + OnOperationSuccess = AERON_RESPONSE_ON_OPERATION_SUCCESS, + #[doc = "Inform client of timeout and removal of an inactive Image"] + OnUnavailableImage = AERON_RESPONSE_ON_UNAVAILABLE_IMAGE, + #[doc = "New Exclusive Publication buffers are ready notification"] + OnExclusivePublicationReady = AERON_RESPONSE_ON_EXCLUSIVE_PUBLICATION_READY, + #[doc = "New Subscription is ready notification"] + OnSubscriptionReady = AERON_RESPONSE_ON_SUBSCRIPTION_READY, + #[doc = "New counter is ready notification"] + OnCounterReady = AERON_RESPONSE_ON_COUNTER_READY, + #[doc = "Inform clients of removal of counter"] + OnUnavailableCounter = AERON_RESPONSE_ON_UNAVAILABLE_COUNTER, + #[doc = "Inform clients of client timeout"] + OnClientTimeout = AERON_RESPONSE_ON_CLIENT_TIMEOUT, + } +); + +#[cfg(test)] +mod tests { + use crate::control_protocol::ClientCommand; + use std::convert::TryInto; + + #[test] + fn client_command_convert() { + assert_eq!( + Ok(ClientCommand::AddPublication), + ::aeron_driver_sys::AERON_COMMAND_ADD_PUBLICATION.try_into() + ) + } +} diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs new file mode 100644 index 0000000..7e48026 --- /dev/null +++ b/aeron-rs/src/driver_proxy.rs @@ -0,0 +1,73 @@ +use crate::command::flyweight::Flyweight; +use crate::command::terminate_driver::TerminateDriverDefn; +use crate::concurrent::ringbuffer::ManyToOneRingBuffer; +use crate::concurrent::AtomicBuffer; +use crate::control_protocol::ClientCommand; +use crate::util::{AeronError, IndexT, Result}; + +pub struct DriverProxy +where + A: AtomicBuffer, +{ + to_driver: ManyToOneRingBuffer, + client_id: i64, +} + +impl DriverProxy +where + A: AtomicBuffer, +{ + pub fn new(to_driver: ManyToOneRingBuffer) -> Self { + let client_id = to_driver.next_correlation_id(); + DriverProxy { + to_driver, + client_id, + } + } + + pub fn time_of_last_driver_keepalive(&self) -> Result { + self.to_driver.consumer_heartbeat_time() + } + + pub fn client_id(&self) -> i64 { + self.client_id + } + + pub fn terminate_driver(&mut self, _token_buffer: Option<&[u8]>) -> Result<()> { + let _client_id = self.client_id; + self.write_command_to_driver(|buffer: &mut [u8], _length: &mut IndexT| { + // FIXME: This method signature is ugly. + // UNWRAP: Buffer from `write_command` guaranteed to be long enough for `TerminateDriverDefn` + let _request: Flyweight<_, TerminateDriverDefn> = Flyweight::new(buffer, 0).unwrap(); + + // FIXME: Uncommenting this causes termination to not succeed + /* + request.put_client_id(client_id).put_correlation_id(-1); + token_buffer.map(|b| request.put_token_buffer(b)); + *length = request.token_length(); + */ + + ClientCommand::TerminateDriver + }) + } + + fn write_command_to_driver(&mut self, filler: F) -> Result<()> + where + F: FnOnce(&mut [u8], &mut IndexT) -> ClientCommand, + { + // QUESTION: Can Rust align structs on stack? + // C++ does some fancy shenanigans I assume help the CPU cache? + let mut buffer = &mut [0u8; 512][..]; + let mut length = buffer.len() as IndexT; + let msg_type_id = filler(&mut buffer, &mut length); + + if !self + .to_driver + .write(msg_type_id as i32, &buffer, 0, length)? + { + Err(AeronError::IllegalState) + } else { + Ok(()) + } + } +} diff --git a/aeron-rs/src/lib.rs b/aeron-rs/src/lib.rs index ae6f47c..9fc7905 100644 --- a/aeron-rs/src/lib.rs +++ b/aeron-rs/src/lib.rs @@ -1,13 +1,15 @@ //! [Aeron](https://github.com/real-logic/aeron) client for Rust -#![deny(missing_docs)] #[cfg(target_endian = "big")] compile_error!("Aeron is only supported on little-endian architectures"); pub mod cnc_descriptor; +pub mod command; pub mod concurrent; pub mod context; +pub mod control_protocol; pub mod driver; +pub mod driver_proxy; pub mod util; const fn sematic_version_compose(major: u8, minor: u8, patch: u8) -> i32 { diff --git a/aeron-rs/tests/broadcast_receiver.rs b/aeron-rs/tests/broadcast_receiver.rs index 1c465f6..ab5336c 100644 --- a/aeron-rs/tests/broadcast_receiver.rs +++ b/aeron-rs/tests/broadcast_receiver.rs @@ -1,6 +1,4 @@ -use aeron_rs::concurrent::broadcast::{ - buffer_descriptor, record_descriptor, BroadcastReceiver, -}; +use aeron_rs::concurrent::broadcast::{buffer_descriptor, record_descriptor, BroadcastReceiver}; use aeron_rs::concurrent::AtomicBuffer; use aeron_rs::util::bit::align; use aeron_rs::util::IndexT; diff --git a/aeron-rs/tests/many_to_one_ring_buffer.rs b/aeron-rs/tests/many_to_one_ring_buffer.rs index ec3c68c..bbbf99a 100644 --- a/aeron-rs/tests/many_to_one_ring_buffer.rs +++ b/aeron-rs/tests/many_to_one_ring_buffer.rs @@ -1,7 +1,5 @@ /// Tests based on the C++ tests included with Aeron -use aeron_rs::concurrent::ringbuffer::{ - buffer_descriptor, record_descriptor, ManyToOneRingBuffer, -}; +use aeron_rs::concurrent::ringbuffer::{buffer_descriptor, record_descriptor, ManyToOneRingBuffer}; use aeron_rs::concurrent::AtomicBuffer; use aeron_rs::util::bit::align; use aeron_rs::util::IndexT;