From fc0c3d9b1c2c9fd748fbc135c16f9aaa2b8a3446 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Fri, 22 Nov 2019 21:24:30 -0500 Subject: [PATCH] Add the publication message implementation --- aeron-rs/src/command/flyweight.rs | 2 + aeron-rs/src/command/mod.rs | 1 + aeron-rs/src/command/publication_message.rs | 91 ++++++++++++++++++++ aeron-rs/src/command/subscription_message.rs | 5 +- aeron-rs/src/driver_proxy.rs | 14 +-- 5 files changed, 104 insertions(+), 9 deletions(-) create mode 100644 aeron-rs/src/command/publication_message.rs diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs index fd3ceb2..0c5a2f4 100644 --- a/aeron-rs/src/command/flyweight.rs +++ b/aeron-rs/src/command/flyweight.rs @@ -30,6 +30,8 @@ where where S: Sized, { + // QUESTION: Should we zero the first `sizeof::()` bytes at `offset`? + // Would make sure that some of the length parameters can't trigger panics buffer.overlay::(offset)?; Ok(Flyweight { buffer, diff --git a/aeron-rs/src/command/mod.rs b/aeron-rs/src/command/mod.rs index fad151f..7b4aa83 100644 --- a/aeron-rs/src/command/mod.rs +++ b/aeron-rs/src/command/mod.rs @@ -1,5 +1,6 @@ //! Message definitions for interactions with the Media Driver pub mod correlated_message; pub mod flyweight; +pub mod publication_message; pub mod subscription_message; pub mod terminate_driver; diff --git a/aeron-rs/src/command/publication_message.rs b/aeron-rs/src/command/publication_message.rs new file mode 100644 index 0000000..33870ed --- /dev/null +++ b/aeron-rs/src/command/publication_message.rs @@ -0,0 +1,91 @@ +//! Flyweight implementation for command to add a publication +use crate::command::correlated_message::CorrelatedMessageDefn; +use crate::command::flyweight::Flyweight; +use crate::concurrent::AtomicBuffer; +use crate::util::{IndexT, Result}; +use std::mem::size_of; + +/// Control message for adding a publication +/// +/// ```text +/// 0 1 2 3 +/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/// | Client ID | +/// | | +/// +---------------------------------------------------------------+ +/// | Correlation ID | +/// | | +/// +---------------------------------------------------------------+ +/// | Stream ID | +/// +---------------------------------------------------------------+ +/// | Channel Length | +/// +---------------------------------------------------------------+ +/// | Channel ... +///... | +/// +---------------------------------------------------------------+ +/// ``` +#[repr(C, packed(4))] +pub struct PublicationMessageDefn { + correlated_message: CorrelatedMessageDefn, + stream_id: i32, + channel_length: i32, +} + +// Rust has no `offset_of` macro, so we'll just compute by hand +const CHANNEL_LENGTH_OFFSET: IndexT = + (size_of::() + size_of::()) as IndexT; + +impl Flyweight +where + A: AtomicBuffer, +{ + /// Retrieve the client identifier associated with this message + pub fn client_id(&self) -> i64 { + self.get_struct().correlated_message.client_id + } + + /// Set the client identifier for this message + pub fn put_client_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlated_message.client_id = value; + self + } + + /// Retrieve the correlation identifier associated with this message. + /// Will uniquely identify a command and response pair. + pub fn correlation_id(&self) -> i64 { + self.get_struct().correlated_message.correlation_id + } + + /// Set the correlation identifier for this message + pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { + self.get_struct_mut().correlated_message.correlation_id = value; + self + } + + /// Retrieve the stream identifier associated with this request + pub fn stream_id(&self) -> i32 { + self.get_struct().stream_id + } + + /// Set the stream identifier of this request + pub fn put_stream_id(&mut self, value: i32) -> &mut Self { + self.get_struct_mut().stream_id = value; + self + } + + /// Retrieve the channel name of this request + pub fn channel(&self) -> Result<&str> { + self.string_get(CHANNEL_LENGTH_OFFSET) + } + + /// Set the channel name of this request + pub fn put_channel(&mut self, value: &str) -> Result<&mut Self> { + self.string_put(CHANNEL_LENGTH_OFFSET, value).map(|_| self) + } + + /// Get the total byte length of this subscription command + pub fn length(&self) -> IndexT { + size_of::() as IndexT + self.get_struct().channel_length + } +} diff --git a/aeron-rs/src/command/subscription_message.rs b/aeron-rs/src/command/subscription_message.rs index e7157a4..50c18c5 100644 --- a/aeron-rs/src/command/subscription_message.rs +++ b/aeron-rs/src/command/subscription_message.rs @@ -1,4 +1,4 @@ -//! Flyweight implementation for commands to add a subscription +//! Flyweight implementation for command to add a subscription use crate::command::correlated_message::CorrelatedMessageDefn; use crate::command::flyweight::Flyweight; @@ -100,8 +100,7 @@ where /// Set the channel name of this request pub fn put_channel(&mut self, value: &str) -> Result<&mut Self> { - self.string_put(CHANNEL_LENGTH_OFFSET, value)?; - Ok(self) + self.string_put(CHANNEL_LENGTH_OFFSET, value).map(|_| self) } /// Get the total byte length of this subscription command diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs index cb17cfd..94d6382 100644 --- a/aeron-rs/src/driver_proxy.rs +++ b/aeron-rs/src/driver_proxy.rs @@ -1,13 +1,12 @@ //! High level API for issuing commands to the Media Driver use crate::command::flyweight::Flyweight; +use crate::command::subscription_message::SubscriptionMessageDefn; use crate::command::terminate_driver::TerminateDriverDefn; use crate::concurrent::ringbuffer::ManyToOneRingBuffer; use crate::concurrent::AtomicBuffer; use crate::control_protocol::ClientCommand; use crate::util::{AeronError, IndexT, Result}; use std::mem::size_of; -use crate::command::subscription_message::SubscriptionMessageDefn; -use std::ops::Sub; /// High-level interface for issuing commands to a media driver pub struct DriverProxy @@ -48,14 +47,17 @@ where pub fn add_subscription(&mut self, channel: &str, stream_id: i32) -> Result { let correlation_id = self.to_driver.next_correlation_id(); if channel.len() > (COMMAND_BUFFER_SIZE - size_of::()) { - return Err(AeronError::InsufficientCapacity) + return Err(AeronError::InsufficientCapacity); } + let client_id = self.client_id; self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| { // UNWRAP: `SubscriptionMessageDefn` guaranteed to be smaller than `COMMAND_BUFFER_SIZE` - let mut subscription_message = Flyweight::new::(buffer, 0).unwrap(); + let mut subscription_message = + Flyweight::new::(buffer, 0).unwrap(); - subscription_message.put_client_id(self.client_id) + subscription_message + .put_client_id(client_id) .put_registration_correlation_id(-1) .put_correlation_id(correlation_id) .put_stream_id(stream_id); @@ -64,7 +66,7 @@ where *length = subscription_message.length(); ClientCommand::AddSubscription - }); + })?; Ok(correlation_id) }