diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs index e012a3e..fd3ceb2 100644 --- a/aeron-rs/src/command/flyweight.rs +++ b/aeron-rs/src/command/flyweight.rs @@ -59,4 +59,14 @@ where self.buffer.bounds_check(offset as IndexT, 0)?; Ok(&self.buffer[offset..]) } + + pub(in crate::command) fn string_get(&self, offset: IndexT) -> Result<&str> { + self.buffer + .get_string((self.base_offset + offset) as IndexT) + } + + pub(in crate::command) fn string_put(&mut self, offset: IndexT, value: &str) -> Result { + self.buffer + .put_string((self.base_offset + offset) as IndexT, value) + } } diff --git a/aeron-rs/src/command/mod.rs b/aeron-rs/src/command/mod.rs index 8993c97..fad151f 100644 --- a/aeron-rs/src/command/mod.rs +++ b/aeron-rs/src/command/mod.rs @@ -1,4 +1,5 @@ //! Message definitions for interactions with the Media Driver pub mod correlated_message; pub mod flyweight; +pub mod subscription_message; pub mod terminate_driver; diff --git a/aeron-rs/src/command/subscription_message.rs b/aeron-rs/src/command/subscription_message.rs new file mode 100644 index 0000000..e7157a4 --- /dev/null +++ b/aeron-rs/src/command/subscription_message.rs @@ -0,0 +1,111 @@ +//! Flyweight implementation for commands to add a subscription + +use crate::command::correlated_message::CorrelatedMessageDefn; +use crate::command::flyweight::Flyweight; +use crate::concurrent::AtomicBuffer; +use crate::util::{IndexT, Result}; +use std::mem::size_of; + +/// Control message for adding a subscription +/// +/// ```text +/// 0 1 2 3 +/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/// | Client ID | +/// | | +/// +---------------------------------------------------------------+ +/// | Correlation ID | +/// | | +/// +---------------------------------------------------------------+ +/// | Registration Correlation ID | +/// | | +/// +---------------------------------------------------------------+ +/// | Stream Id | +/// +---------------------------------------------------------------+ +/// | Channel Length | +/// +---------------------------------------------------------------+ +/// | Channel ... +/// ... | +/// +---------------------------------------------------------------+ +/// ``` +#[repr(C, packed(4))] +pub struct SubscriptionMessageDefn { + correlated_message: CorrelatedMessageDefn, + registration_correlation_id: i64, + stream_id: i32, + channel_length: i32, +} + +// Rust has no `offset_of` macro, so we'll just compute by hand +const CHANNEL_LENGTH_OFFSET: IndexT = + (size_of::() + size_of::() + size_of::()) as IndexT; + +impl Flyweight +where + A: AtomicBuffer, +{ + /// Retrieve the client identifier of this request. + pub fn client_id(&self) -> i64 { + self.get_struct().correlated_message.client_id + } + + /// Set the client identifier of this request. + pub fn put_client_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlated_message.client_id = value; + self + } + + /// Retrieve the correlation identifier associated with this request. Used to + /// associate driver responses with a specific request. + pub fn correlation_id(&self) -> i64 { + self.get_struct().correlated_message.correlation_id + } + + /// Set the correlation identifier to be used with this request. + pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlated_message.correlation_id = value; + self + } + + /// Retrieve the registration correlation identifier + // QUESTION: What is this ID used for? In the DriverProxy, it's always set to -1 + pub fn registration_correlation_id(&self) -> i64 { + self.get_struct().registration_correlation_id + } + + /// Set the registration correlation identifier of this request + pub fn put_registration_correlation_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().registration_correlation_id = value; + self + } + + /// Get the stream identifier associated with this request. + // QUESTION: What is the difference between stream ID and channel? + // Both are set in the `BasicSubscriber` example, not sure what they do. + pub fn stream_id(&self) -> i32 { + self.get_struct().stream_id + } + + /// Set the stream identifier of this request + pub fn put_stream_id(&mut self, value: i32) -> &mut Self { + self.get_struct_mut().stream_id = value; + self + } + + /// Retrieve the channel name of this request + pub fn channel(&self) -> Result<&str> { + self.string_get(CHANNEL_LENGTH_OFFSET) + } + + /// Set the channel name of this request + pub fn put_channel(&mut self, value: &str) -> Result<&mut Self> { + self.string_put(CHANNEL_LENGTH_OFFSET, value)?; + Ok(self) + } + + /// Get the total byte length of this subscription command + pub fn length(&self) -> IndexT { + size_of::() as IndexT + self.get_struct().channel_length + } +} diff --git a/aeron-rs/src/command/terminate_driver.rs b/aeron-rs/src/command/terminate_driver.rs index b1a37fd..f29acc2 100644 --- a/aeron-rs/src/command/terminate_driver.rs +++ b/aeron-rs/src/command/terminate_driver.rs @@ -5,13 +5,25 @@ use crate::concurrent::AtomicBuffer; use crate::util::{IndexT, Result}; use std::mem::size_of; -/// Raw command to terminate a driver. The `token_length` describes the length -/// of a buffer immediately trailing this struct definition and part of the -/// same message. +/// Command message flyweight to ask the driver process to terminate +/// +/// ```text +/// 0 1 2 3 +/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/// | Correlation ID | +/// | | +/// +---------------------------------------------------------------+ +/// | Token Length | +/// +---------------------------------------------------------------+ +/// | Token Buffer ... +///... | +/// +---------------------------------------------------------------+ +/// ``` #[repr(C, packed(4))] pub struct TerminateDriverDefn { - pub(in crate::command) correlated_message: CorrelatedMessageDefn, - pub(in crate::command) token_length: i32, + correlated_message: CorrelatedMessageDefn, + token_length: i32, } impl Flyweight diff --git a/aeron-rs/src/concurrent/mod.rs b/aeron-rs/src/concurrent/mod.rs index 70a1e46..b1dffde 100644 --- a/aeron-rs/src/concurrent/mod.rs +++ b/aeron-rs/src/concurrent/mod.rs @@ -11,6 +11,7 @@ use std::ptr::{read_volatile, write_volatile}; use memmap::MmapMut; use std::ops::{Deref, DerefMut}; +use std::str::from_utf8; fn bounds_check_slice(slice: &[u8], offset: IndexT, size: IndexT) -> Result<()> { if offset < 0 || size < 0 || slice.len() as IndexT - offset < size { @@ -297,6 +298,50 @@ pub trait AtomicBuffer: Deref + DerefMut { fn capacity(&self) -> IndexT { self.len() as IndexT } + + /// Fetch an ASCII/UTF-8 encoded string from the buffer, prefixed by a 32-bit length value. + /// + /// ```rust + /// # use aeron_rs::concurrent::AtomicBuffer; + /// # use std::mem::size_of; + /// # use aeron_rs::util::IndexT; + /// let mut buffer = vec![0u8; 10]; + /// let message = [0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x21]; + /// buffer.put_i32(0, message.len() as IndexT); + /// buffer.put_slice(size_of::() as IndexT, &message[..], 0, message.len() as IndexT); + /// + /// assert_eq!(buffer.get_string(0), Ok("Hello!")); + /// ``` + fn get_string(&self, offset: IndexT) -> Result<&str> { + let length = self.get_i32(offset)?; + + let str_offset = offset + size_of::() as IndexT; + self.bounds_check(str_offset, length)?; + + let start = str_offset as usize; + let end = start + length as usize; + let string = from_utf8(&self[start..end]).map_err(|_e| AeronError::UnknownEncoding)?; + Ok(string) + } + + /// Write a string into the buffer, prefixed by a 32-bit length value. Returns the total + /// number of bytes written. + /// + /// ```rust + /// # use aeron_rs::concurrent::AtomicBuffer; + /// let mut buffer = vec![0u8; 10]; + /// buffer.put_string(0, "Hello!"); + /// assert_eq!(buffer.get_string(0), Ok("Hello!")); + /// ``` + fn put_string(&mut self, offset: IndexT, value: &str) -> Result { + let length = value.len() as IndexT; + self.put_i32(offset, length)?; + + let str_offset = offset + size_of::() as IndexT; + self.put_slice(str_offset, value.as_bytes(), 0, length)?; + + Ok(size_of::() as i32 + length) + } } impl AtomicBuffer for Vec {} diff --git a/aeron-rs/src/util.rs b/aeron-rs/src/util.rs index 12d5337..c7df8b3 100644 --- a/aeron-rs/src/util.rs +++ b/aeron-rs/src/util.rs @@ -18,6 +18,8 @@ pub enum AeronError { InsufficientCapacity, /// Indication that we have reached an invalid state and can't continue processing IllegalState, + /// Indication that a string is not encoded in UTF-8/ASCII format + UnknownEncoding, } /// Result type for operations in the Aeron client