Add the subscription message flyweight

subscription
Bradlee Speice 2019-11-17 00:38:16 -05:00
parent 5bf2b768ae
commit 69687023fb
6 changed files with 186 additions and 5 deletions

View File

@ -59,4 +59,14 @@ where
self.buffer.bounds_check(offset as IndexT, 0)?;
Ok(&self.buffer[offset..])
}
pub(in crate::command) fn string_get(&self, offset: IndexT) -> Result<&str> {
self.buffer
.get_string((self.base_offset + offset) as IndexT)
}
pub(in crate::command) fn string_put(&mut self, offset: IndexT, value: &str) -> Result<i32> {
self.buffer
.put_string((self.base_offset + offset) as IndexT, value)
}
}

View File

@ -1,4 +1,5 @@
//! Message definitions for interactions with the Media Driver
pub mod correlated_message;
pub mod flyweight;
pub mod subscription_message;
pub mod terminate_driver;

View File

@ -0,0 +1,111 @@
//! Flyweight implementation for commands to add a subscription
use crate::command::correlated_message::CorrelatedMessageDefn;
use crate::command::flyweight::Flyweight;
use crate::concurrent::AtomicBuffer;
use crate::util::{IndexT, Result};
use std::mem::size_of;
/// Control message for adding a subscription
///
/// ```text
/// 0 1 2 3
/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// | Client ID |
/// | |
/// +---------------------------------------------------------------+
/// | Correlation ID |
/// | |
/// +---------------------------------------------------------------+
/// | Registration Correlation ID |
/// | |
/// +---------------------------------------------------------------+
/// | Stream Id |
/// +---------------------------------------------------------------+
/// | Channel Length |
/// +---------------------------------------------------------------+
/// | Channel ...
/// ... |
/// +---------------------------------------------------------------+
/// ```
#[repr(C, packed(4))]
pub struct SubscriptionMessageDefn {
correlated_message: CorrelatedMessageDefn,
registration_correlation_id: i64,
stream_id: i32,
channel_length: i32,
}
// Rust has no `offset_of` macro, so we'll just compute by hand
const CHANNEL_LENGTH_OFFSET: IndexT =
(size_of::<CorrelatedMessageDefn>() + size_of::<i64>() + size_of::<i32>()) as IndexT;
impl<A> Flyweight<A, SubscriptionMessageDefn>
where
A: AtomicBuffer,
{
/// Retrieve the client identifier of this request.
pub fn client_id(&self) -> i64 {
self.get_struct().correlated_message.client_id
}
/// Set the client identifier of this request.
pub fn put_client_id(&mut self, value: i64) -> &mut Self {
self.get_struct_mut().correlated_message.client_id = value;
self
}
/// Retrieve the correlation identifier associated with this request. Used to
/// associate driver responses with a specific request.
pub fn correlation_id(&self) -> i64 {
self.get_struct().correlated_message.correlation_id
}
/// Set the correlation identifier to be used with this request.
pub fn put_correlation_id(&mut self, value: i64) -> &mut Self {
self.get_struct_mut().correlated_message.correlation_id = value;
self
}
/// Retrieve the registration correlation identifier
// QUESTION: What is this ID used for? In the DriverProxy, it's always set to -1
pub fn registration_correlation_id(&self) -> i64 {
self.get_struct().registration_correlation_id
}
/// Set the registration correlation identifier of this request
pub fn put_registration_correlation_id(&mut self, value: i64) -> &mut Self {
self.get_struct_mut().registration_correlation_id = value;
self
}
/// Get the stream identifier associated with this request.
// QUESTION: What is the difference between stream ID and channel?
// Both are set in the `BasicSubscriber` example, not sure what they do.
pub fn stream_id(&self) -> i32 {
self.get_struct().stream_id
}
/// Set the stream identifier of this request
pub fn put_stream_id(&mut self, value: i32) -> &mut Self {
self.get_struct_mut().stream_id = value;
self
}
/// Retrieve the channel name of this request
pub fn channel(&self) -> Result<&str> {
self.string_get(CHANNEL_LENGTH_OFFSET)
}
/// Set the channel name of this request
pub fn put_channel(&mut self, value: &str) -> Result<&mut Self> {
self.string_put(CHANNEL_LENGTH_OFFSET, value)?;
Ok(self)
}
/// Get the total byte length of this subscription command
pub fn length(&self) -> IndexT {
size_of::<SubscriptionMessageDefn>() as IndexT + self.get_struct().channel_length
}
}

View File

@ -5,13 +5,25 @@ use crate::concurrent::AtomicBuffer;
use crate::util::{IndexT, Result};
use std::mem::size_of;
/// Raw command to terminate a driver. The `token_length` describes the length
/// of a buffer immediately trailing this struct definition and part of the
/// same message.
/// Command message flyweight to ask the driver process to terminate
///
/// ```text
/// 0 1 2 3
/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// | Correlation ID |
/// | |
/// +---------------------------------------------------------------+
/// | Token Length |
/// +---------------------------------------------------------------+
/// | Token Buffer ...
///... |
/// +---------------------------------------------------------------+
/// ```
#[repr(C, packed(4))]
pub struct TerminateDriverDefn {
pub(in crate::command) correlated_message: CorrelatedMessageDefn,
pub(in crate::command) token_length: i32,
correlated_message: CorrelatedMessageDefn,
token_length: i32,
}
impl<A> Flyweight<A, TerminateDriverDefn>

View File

@ -11,6 +11,7 @@ use std::ptr::{read_volatile, write_volatile};
use memmap::MmapMut;
use std::ops::{Deref, DerefMut};
use std::str::from_utf8;
fn bounds_check_slice(slice: &[u8], offset: IndexT, size: IndexT) -> Result<()> {
if offset < 0 || size < 0 || slice.len() as IndexT - offset < size {
@ -297,6 +298,50 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
fn capacity(&self) -> IndexT {
self.len() as IndexT
}
/// Fetch an ASCII/UTF-8 encoded string from the buffer, prefixed by a 32-bit length value.
///
/// ```rust
/// # use aeron_rs::concurrent::AtomicBuffer;
/// # use std::mem::size_of;
/// # use aeron_rs::util::IndexT;
/// let mut buffer = vec![0u8; 10];
/// let message = [0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x21];
/// buffer.put_i32(0, message.len() as IndexT);
/// buffer.put_slice(size_of::<i32>() as IndexT, &message[..], 0, message.len() as IndexT);
///
/// assert_eq!(buffer.get_string(0), Ok("Hello!"));
/// ```
fn get_string(&self, offset: IndexT) -> Result<&str> {
let length = self.get_i32(offset)?;
let str_offset = offset + size_of::<i32>() as IndexT;
self.bounds_check(str_offset, length)?;
let start = str_offset as usize;
let end = start + length as usize;
let string = from_utf8(&self[start..end]).map_err(|_e| AeronError::UnknownEncoding)?;
Ok(string)
}
/// Write a string into the buffer, prefixed by a 32-bit length value. Returns the total
/// number of bytes written.
///
/// ```rust
/// # use aeron_rs::concurrent::AtomicBuffer;
/// let mut buffer = vec![0u8; 10];
/// buffer.put_string(0, "Hello!");
/// assert_eq!(buffer.get_string(0), Ok("Hello!"));
/// ```
fn put_string(&mut self, offset: IndexT, value: &str) -> Result<i32> {
let length = value.len() as IndexT;
self.put_i32(offset, length)?;
let str_offset = offset + size_of::<i32>() as IndexT;
self.put_slice(str_offset, value.as_bytes(), 0, length)?;
Ok(size_of::<i32>() as i32 + length)
}
}
impl AtomicBuffer for Vec<u8> {}

View File

@ -18,6 +18,8 @@ pub enum AeronError {
InsufficientCapacity,
/// Indication that we have reached an invalid state and can't continue processing
IllegalState,
/// Indication that a string is not encoded in UTF-8/ASCII format
UnknownEncoding,
}
/// Result type for operations in the Aeron client