diff --git a/aeron-rs/src/client/concurrent/broadcast.rs b/aeron-rs/src/client/concurrent/broadcast.rs new file mode 100644 index 0000000..5156e6a --- /dev/null +++ b/aeron-rs/src/client/concurrent/broadcast.rs @@ -0,0 +1,185 @@ +//! Read messages that are broadcast from the media driver; this is the primary means +//! of receiving data. +use crate::client::concurrent::AtomicBuffer; +use crate::util::bit::align; +use crate::util::{IndexT, Result}; +use std::sync::atomic::{AtomicI64, Ordering}; + +/// Description of the broadcast buffer schema +pub mod buffer_descriptor { + use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH}; + use crate::util::{AeronError, IndexT, Result}; + use std::mem::size_of; + + pub(super) const TAIL_INTENT_COUNTER_OFFSET: IndexT = 0; + pub(super) const TAIL_COUNTER_OFFSET: IndexT = + TAIL_INTENT_COUNTER_OFFSET + size_of::() as IndexT; + pub(super) const LATEST_COUNTER_OFFSET: IndexT = + TAIL_COUNTER_OFFSET + size_of::() as IndexT; + + /// Size of the broadcast buffer metadata trailer + pub const TRAILER_LENGTH: IndexT = CACHE_LINE_LENGTH as IndexT * 2; + + pub(super) fn check_capacity(capacity: IndexT) -> Result<()> { + // QUESTION: Why does C++ throw IllegalState here? + // Would've expected it to throw IllegalArgument like ring buffer + if is_power_of_two(capacity) { + Ok(()) + } else { + Err(AeronError::IllegalArgument) + } + } +} + +/// Broadcast buffer message header +// QUESTION: Isn't this the same as the ring buffer descriptor? +// Why not consolidate them? +pub mod record_descriptor { + use crate::util::IndexT; + + /// Message type to indicate a record used only + /// for padding the buffer + pub const PADDING_MSG_TYPE_ID: i32 = -1; + + /// Offset from the beginning of a record to its length + pub const LENGTH_OFFSET: IndexT = 0; + + /// Offset from the beginning of a record to its type + pub const TYPE_OFFSET: IndexT = 4; + + /// Total header length for each record + pub const HEADER_LENGTH: IndexT = 8; + + /// Alignment for all broadcast records + pub const RECORD_ALIGNMENT: IndexT = HEADER_LENGTH; + + /// Retrieve the byte offset for a record's length field given the record start + pub fn length_offset(record_offset: IndexT) -> IndexT { + record_offset + LENGTH_OFFSET + } + + /// Retrieve the byte offset for a record's type field given the record start + pub fn type_offset(record_offset: IndexT) -> IndexT { + record_offset + TYPE_OFFSET + } + + /// Retrieve the byte offset for a record's message given the record start + pub fn msg_offset(record_offset: IndexT) -> IndexT { + record_offset + HEADER_LENGTH + } +} + +/// Receive messages from a transmission stream +pub struct BroadcastReceiver +where + A: AtomicBuffer, +{ + buffer: A, + capacity: IndexT, + mask: IndexT, + tail_intent_counter_index: IndexT, + tail_counter_index: IndexT, + latest_counter_index: IndexT, + record_offset: IndexT, + cursor: i64, + next_record: i64, + lapped_count: AtomicI64, +} + +impl BroadcastReceiver +where + A: AtomicBuffer, +{ + /// Create a new receiver backed by `buffer` + pub fn new(buffer: A) -> Result { + let capacity = buffer.capacity() - buffer_descriptor::TRAILER_LENGTH; + buffer_descriptor::check_capacity(capacity)?; + + let latest_counter_index = capacity + buffer_descriptor::LATEST_COUNTER_OFFSET; + let cursor = buffer.get_i64(latest_counter_index)?; + let mask = capacity - 1; + + Ok(BroadcastReceiver { + buffer, + capacity, + mask, + tail_intent_counter_index: capacity + buffer_descriptor::TAIL_INTENT_COUNTER_OFFSET, + tail_counter_index: capacity + buffer_descriptor::TAIL_COUNTER_OFFSET, + latest_counter_index, + record_offset: (cursor as i32) & mask, + cursor, + next_record: cursor, + lapped_count: AtomicI64::new(0), + }) + } + + /// Get the total capacity of this broadcast receiver + pub fn capacity(&self) -> IndexT { + self.capacity + } + + /// Get the number of times the transmitter has lapped this receiver. Each lap + /// represents at least a buffer's worth of lost data. + pub fn lapped_count(&self) -> i64 { + // QUESTION: C++ just uses `std::atomic`, what are the ordering semantics? + // For right now I'm just assuming it's sequentially consistent + self.lapped_count.load(Ordering::SeqCst) + } + + /// Non-blocking receive of next message from the transmission stream. + /// If loss has occurred, `lapped_count` will be incremented. Returns `true` + /// if the next transmission is available, `false` otherwise. + pub fn receive_next(&mut self) -> Result { + let mut is_available = false; + let tail: i64 = self.buffer.get_i64_volatile(self.tail_counter_index)?; + let mut cursor: i64 = self.next_record; + + if tail > cursor { + // The way we set `record_offset` is slightly different from C++; + // Clippy was yelling at me, and I think this makes more sense. + if !self.validate(cursor) { + self.lapped_count.fetch_add(1, Ordering::SeqCst); + cursor = self.buffer.get_i64(self.latest_counter_index)?; + } + let mut record_offset = (cursor as i32) & self.mask; + + self.cursor = cursor; + self.next_record = cursor + + align( + self.buffer + .get_i32(record_descriptor::length_offset(record_offset))? + as usize, + record_descriptor::RECORD_ALIGNMENT as usize, + ) as i64; + + if record_descriptor::PADDING_MSG_TYPE_ID + == self + .buffer + .get_i32(record_descriptor::type_offset(record_offset))? + { + record_offset = 0; + self.cursor = self.next_record; + self.next_record += align( + self.buffer + .get_i32(record_descriptor::length_offset(record_offset))? + as usize, + record_descriptor::RECORD_ALIGNMENT as usize, + ) as i64; + } + + self.record_offset = record_offset; + is_available = true; + } + + Ok(is_available) + } + + fn validate(&self, cursor: i64) -> bool { + // UNWRAP: Length checks performed during initialization + (cursor + i64::from(self.capacity)) + > self + .buffer + .get_i64_volatile(self.tail_intent_counter_index) + .unwrap() + } +} diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index 118ee9b..6b1a452 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -1,6 +1,7 @@ //! Module for handling safe interactions among the multiple clients making use //! of a single Media Driver +pub mod broadcast; pub mod ringbuffer; use std::mem::size_of; use std::sync::atomic::{AtomicI64, Ordering}; diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index 68768b4..6f1d502 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -4,9 +4,8 @@ use crate::util::bit::align; use crate::util::{bit, AeronError, IndexT, Result}; use std::ops::{Deref, DerefMut}; -/// Description of the Ring Buffer schema. +/// Description of the ring buffer schema pub mod buffer_descriptor { - use crate::client::concurrent::AtomicBuffer; use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH}; use crate::util::AeronError::IllegalArgument; use crate::util::{IndexT, Result}; @@ -31,13 +30,9 @@ pub mod buffer_descriptor { /// Verify the capacity of a buffer is legal for use as a ring buffer. /// Returns the actual capacity excluding ring buffer metadata. - pub fn check_capacity(buffer: &A) -> Result - where - A: AtomicBuffer, - { - let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT; + pub fn check_capacity(capacity: IndexT) -> Result<()> { if is_power_of_two(capacity) { - Ok(capacity) + Ok(()) } else { Err(IllegalArgument) } @@ -138,7 +133,9 @@ where { /// Create a many-to-one ring buffer from an underlying atomic buffer. pub fn new(buffer: A) -> Result { - buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { + let capacity = buffer.capacity() - buffer_descriptor::TRAILER_LENGTH; + buffer_descriptor::check_capacity(capacity)?; + Ok(ManyToOneRingBuffer { buffer, capacity, max_msg_length: capacity / 8, diff --git a/aeron-rs/tests/broadcast_receiver.rs b/aeron-rs/tests/broadcast_receiver.rs new file mode 100644 index 0000000..131a7c7 --- /dev/null +++ b/aeron-rs/tests/broadcast_receiver.rs @@ -0,0 +1,34 @@ +use aeron_rs::client::concurrent::broadcast::{buffer_descriptor, BroadcastReceiver}; +use aeron_rs::util::IndexT; + +const CAPACITY: usize = 1024; +const TOTAL_BUFFER_LENGTH: usize = CAPACITY + buffer_descriptor::TRAILER_LENGTH as usize; + +// NOTE: The C++ tests use a mock atomic buffer for testing to validate behavior. +// This is rather hard to do with Rust, so we skip behavior validation for now, +// and assume that other tests will end up verifying needed behavior. + +#[test] +fn should_calculate_capacity_for_buffer() { + let buffer = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap(); + assert_eq!(buffer.capacity(), CAPACITY as IndexT); +} + +#[test] +fn should_throw_exception_for_capacity_that_is_not_power_of_two() { + let bytes = vec![0u8; 777 + buffer_descriptor::TRAILER_LENGTH as usize]; + + assert!(BroadcastReceiver::new(bytes).is_err()); +} + +#[test] +fn should_not_be_lapped_before_reception() { + let receiver = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap(); + assert_eq!(receiver.lapped_count(), 0); +} + +#[test] +fn should_not_receive_from_empty_buffer() { + let mut receiver = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap(); + assert_eq!(receiver.receive_next(), Ok(false)); +}