From 4ad3dbcecd494792abccbfd5ace26df924e5c282 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 22:33:33 -0400 Subject: [PATCH 1/3] Start work on the broadcast receiver --- aeron-rs/src/client/concurrent/broadcast.rs | 185 +++++++++++++++++++ aeron-rs/src/client/concurrent/mod.rs | 1 + aeron-rs/src/client/concurrent/ringbuffer.rs | 15 +- aeron-rs/tests/broadcast_receiver.rs | 34 ++++ 4 files changed, 226 insertions(+), 9 deletions(-) create mode 100644 aeron-rs/src/client/concurrent/broadcast.rs create mode 100644 aeron-rs/tests/broadcast_receiver.rs diff --git a/aeron-rs/src/client/concurrent/broadcast.rs b/aeron-rs/src/client/concurrent/broadcast.rs new file mode 100644 index 0000000..5156e6a --- /dev/null +++ b/aeron-rs/src/client/concurrent/broadcast.rs @@ -0,0 +1,185 @@ +//! Read messages that are broadcast from the media driver; this is the primary means +//! of receiving data. +use crate::client::concurrent::AtomicBuffer; +use crate::util::bit::align; +use crate::util::{IndexT, Result}; +use std::sync::atomic::{AtomicI64, Ordering}; + +/// Description of the broadcast buffer schema +pub mod buffer_descriptor { + use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH}; + use crate::util::{AeronError, IndexT, Result}; + use std::mem::size_of; + + pub(super) const TAIL_INTENT_COUNTER_OFFSET: IndexT = 0; + pub(super) const TAIL_COUNTER_OFFSET: IndexT = + TAIL_INTENT_COUNTER_OFFSET + size_of::() as IndexT; + pub(super) const LATEST_COUNTER_OFFSET: IndexT = + TAIL_COUNTER_OFFSET + size_of::() as IndexT; + + /// Size of the broadcast buffer metadata trailer + pub const TRAILER_LENGTH: IndexT = CACHE_LINE_LENGTH as IndexT * 2; + + pub(super) fn check_capacity(capacity: IndexT) -> Result<()> { + // QUESTION: Why does C++ throw IllegalState here? + // Would've expected it to throw IllegalArgument like ring buffer + if is_power_of_two(capacity) { + Ok(()) + } else { + Err(AeronError::IllegalArgument) + } + } +} + +/// Broadcast buffer message header +// QUESTION: Isn't this the same as the ring buffer descriptor? +// Why not consolidate them? +pub mod record_descriptor { + use crate::util::IndexT; + + /// Message type to indicate a record used only + /// for padding the buffer + pub const PADDING_MSG_TYPE_ID: i32 = -1; + + /// Offset from the beginning of a record to its length + pub const LENGTH_OFFSET: IndexT = 0; + + /// Offset from the beginning of a record to its type + pub const TYPE_OFFSET: IndexT = 4; + + /// Total header length for each record + pub const HEADER_LENGTH: IndexT = 8; + + /// Alignment for all broadcast records + pub const RECORD_ALIGNMENT: IndexT = HEADER_LENGTH; + + /// Retrieve the byte offset for a record's length field given the record start + pub fn length_offset(record_offset: IndexT) -> IndexT { + record_offset + LENGTH_OFFSET + } + + /// Retrieve the byte offset for a record's type field given the record start + pub fn type_offset(record_offset: IndexT) -> IndexT { + record_offset + TYPE_OFFSET + } + + /// Retrieve the byte offset for a record's message given the record start + pub fn msg_offset(record_offset: IndexT) -> IndexT { + record_offset + HEADER_LENGTH + } +} + +/// Receive messages from a transmission stream +pub struct BroadcastReceiver +where + A: AtomicBuffer, +{ + buffer: A, + capacity: IndexT, + mask: IndexT, + tail_intent_counter_index: IndexT, + tail_counter_index: IndexT, + latest_counter_index: IndexT, + record_offset: IndexT, + cursor: i64, + next_record: i64, + lapped_count: AtomicI64, +} + +impl BroadcastReceiver +where + A: AtomicBuffer, +{ + /// Create a new receiver backed by `buffer` + pub fn new(buffer: A) -> Result { + let capacity = buffer.capacity() - buffer_descriptor::TRAILER_LENGTH; + buffer_descriptor::check_capacity(capacity)?; + + let latest_counter_index = capacity + buffer_descriptor::LATEST_COUNTER_OFFSET; + let cursor = buffer.get_i64(latest_counter_index)?; + let mask = capacity - 1; + + Ok(BroadcastReceiver { + buffer, + capacity, + mask, + tail_intent_counter_index: capacity + buffer_descriptor::TAIL_INTENT_COUNTER_OFFSET, + tail_counter_index: capacity + buffer_descriptor::TAIL_COUNTER_OFFSET, + latest_counter_index, + record_offset: (cursor as i32) & mask, + cursor, + next_record: cursor, + lapped_count: AtomicI64::new(0), + }) + } + + /// Get the total capacity of this broadcast receiver + pub fn capacity(&self) -> IndexT { + self.capacity + } + + /// Get the number of times the transmitter has lapped this receiver. Each lap + /// represents at least a buffer's worth of lost data. + pub fn lapped_count(&self) -> i64 { + // QUESTION: C++ just uses `std::atomic`, what are the ordering semantics? + // For right now I'm just assuming it's sequentially consistent + self.lapped_count.load(Ordering::SeqCst) + } + + /// Non-blocking receive of next message from the transmission stream. + /// If loss has occurred, `lapped_count` will be incremented. Returns `true` + /// if the next transmission is available, `false` otherwise. + pub fn receive_next(&mut self) -> Result { + let mut is_available = false; + let tail: i64 = self.buffer.get_i64_volatile(self.tail_counter_index)?; + let mut cursor: i64 = self.next_record; + + if tail > cursor { + // The way we set `record_offset` is slightly different from C++; + // Clippy was yelling at me, and I think this makes more sense. + if !self.validate(cursor) { + self.lapped_count.fetch_add(1, Ordering::SeqCst); + cursor = self.buffer.get_i64(self.latest_counter_index)?; + } + let mut record_offset = (cursor as i32) & self.mask; + + self.cursor = cursor; + self.next_record = cursor + + align( + self.buffer + .get_i32(record_descriptor::length_offset(record_offset))? + as usize, + record_descriptor::RECORD_ALIGNMENT as usize, + ) as i64; + + if record_descriptor::PADDING_MSG_TYPE_ID + == self + .buffer + .get_i32(record_descriptor::type_offset(record_offset))? + { + record_offset = 0; + self.cursor = self.next_record; + self.next_record += align( + self.buffer + .get_i32(record_descriptor::length_offset(record_offset))? + as usize, + record_descriptor::RECORD_ALIGNMENT as usize, + ) as i64; + } + + self.record_offset = record_offset; + is_available = true; + } + + Ok(is_available) + } + + fn validate(&self, cursor: i64) -> bool { + // UNWRAP: Length checks performed during initialization + (cursor + i64::from(self.capacity)) + > self + .buffer + .get_i64_volatile(self.tail_intent_counter_index) + .unwrap() + } +} diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index 118ee9b..6b1a452 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -1,6 +1,7 @@ //! Module for handling safe interactions among the multiple clients making use //! of a single Media Driver +pub mod broadcast; pub mod ringbuffer; use std::mem::size_of; use std::sync::atomic::{AtomicI64, Ordering}; diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index 68768b4..6f1d502 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -4,9 +4,8 @@ use crate::util::bit::align; use crate::util::{bit, AeronError, IndexT, Result}; use std::ops::{Deref, DerefMut}; -/// Description of the Ring Buffer schema. +/// Description of the ring buffer schema pub mod buffer_descriptor { - use crate::client::concurrent::AtomicBuffer; use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH}; use crate::util::AeronError::IllegalArgument; use crate::util::{IndexT, Result}; @@ -31,13 +30,9 @@ pub mod buffer_descriptor { /// Verify the capacity of a buffer is legal for use as a ring buffer. /// Returns the actual capacity excluding ring buffer metadata. - pub fn check_capacity(buffer: &A) -> Result - where - A: AtomicBuffer, - { - let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT; + pub fn check_capacity(capacity: IndexT) -> Result<()> { if is_power_of_two(capacity) { - Ok(capacity) + Ok(()) } else { Err(IllegalArgument) } @@ -138,7 +133,9 @@ where { /// Create a many-to-one ring buffer from an underlying atomic buffer. pub fn new(buffer: A) -> Result { - buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { + let capacity = buffer.capacity() - buffer_descriptor::TRAILER_LENGTH; + buffer_descriptor::check_capacity(capacity)?; + Ok(ManyToOneRingBuffer { buffer, capacity, max_msg_length: capacity / 8, diff --git a/aeron-rs/tests/broadcast_receiver.rs b/aeron-rs/tests/broadcast_receiver.rs new file mode 100644 index 0000000..131a7c7 --- /dev/null +++ b/aeron-rs/tests/broadcast_receiver.rs @@ -0,0 +1,34 @@ +use aeron_rs::client::concurrent::broadcast::{buffer_descriptor, BroadcastReceiver}; +use aeron_rs::util::IndexT; + +const CAPACITY: usize = 1024; +const TOTAL_BUFFER_LENGTH: usize = CAPACITY + buffer_descriptor::TRAILER_LENGTH as usize; + +// NOTE: The C++ tests use a mock atomic buffer for testing to validate behavior. +// This is rather hard to do with Rust, so we skip behavior validation for now, +// and assume that other tests will end up verifying needed behavior. + +#[test] +fn should_calculate_capacity_for_buffer() { + let buffer = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap(); + assert_eq!(buffer.capacity(), CAPACITY as IndexT); +} + +#[test] +fn should_throw_exception_for_capacity_that_is_not_power_of_two() { + let bytes = vec![0u8; 777 + buffer_descriptor::TRAILER_LENGTH as usize]; + + assert!(BroadcastReceiver::new(bytes).is_err()); +} + +#[test] +fn should_not_be_lapped_before_reception() { + let receiver = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap(); + assert_eq!(receiver.lapped_count(), 0); +} + +#[test] +fn should_not_receive_from_empty_buffer() { + let mut receiver = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap(); + assert_eq!(receiver.receive_next(), Ok(false)); +} From 6df284f73c023485841680fb4a2bd2f7789e555d Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 23:12:36 -0400 Subject: [PATCH 2/3] Add the copy broadcast receiver --- aeron-rs/src/client/concurrent/broadcast.rs | 98 ++++++++++++++++++++- aeron-rs/src/util.rs | 2 + 2 files changed, 96 insertions(+), 4 deletions(-) diff --git a/aeron-rs/src/client/concurrent/broadcast.rs b/aeron-rs/src/client/concurrent/broadcast.rs index 5156e6a..2824a7b 100644 --- a/aeron-rs/src/client/concurrent/broadcast.rs +++ b/aeron-rs/src/client/concurrent/broadcast.rs @@ -2,7 +2,7 @@ //! of receiving data. use crate::client::concurrent::AtomicBuffer; use crate::util::bit::align; -use crate::util::{IndexT, Result}; +use crate::util::{AeronError, IndexT, Result}; use std::sync::atomic::{AtomicI64, Ordering}; /// Description of the broadcast buffer schema @@ -69,7 +69,8 @@ pub mod record_descriptor { } } -/// Receive messages from a transmission stream +/// Receive messages from a transmission stream. Works by polling `receive_next` +/// until `true` is returned, then inspecting messages using the provided methods. pub struct BroadcastReceiver where A: AtomicBuffer, @@ -137,7 +138,7 @@ where if tail > cursor { // The way we set `record_offset` is slightly different from C++; // Clippy was yelling at me, and I think this makes more sense. - if !self.validate(cursor) { + if !self._validate(cursor) { self.lapped_count.fetch_add(1, Ordering::SeqCst); cursor = self.buffer.get_i64(self.latest_counter_index)?; } @@ -174,7 +175,34 @@ where Ok(is_available) } - fn validate(&self, cursor: i64) -> bool { + /// Get the length of the message in the current record + pub fn length(&self) -> Result { + Ok(self + .buffer + .get_i32(record_descriptor::length_offset(self.record_offset))? + - record_descriptor::HEADER_LENGTH) + } + + /// Get the offset to the message content in the current record + pub fn offset(&self) -> i32 { + record_descriptor::msg_offset(self.record_offset) + } + + /// Ensure that the current received record is still valid and has not been + /// overwritten. + pub fn validate(&self) -> bool { + // QUESTION: C++ uses `atomic::acquire()` here, what does that do? + self._validate(self.cursor) + } + + /// Get the message type identifier for the current record + pub fn msg_type_id(&self) -> Result { + Ok(self + .buffer + .get_i32(record_descriptor::type_offset(self.record_offset))?) + } + + fn _validate(&self, cursor: i64) -> bool { // UNWRAP: Length checks performed during initialization (cursor + i64::from(self.capacity)) > self @@ -183,3 +211,65 @@ where .unwrap() } } + +/// Broadcast receiver that copies messages to an internal buffer. +/// +/// The benefit of copying every message is that we keep a consistent view of the data +/// even if we're lapped while reading. However, this may be overkill if you can +/// guarantee the stream never outpaces you. +pub struct CopyBroadcastReceiver +where + A: AtomicBuffer, +{ + receiver: BroadcastReceiver, + scratch: Vec, +} + +impl CopyBroadcastReceiver +where + A: AtomicBuffer, +{ + /// Create a new broadcast receiver + pub fn new(receiver: BroadcastReceiver) -> Self { + CopyBroadcastReceiver { + receiver, + scratch: Vec::with_capacity(4096), + } + } + + /// Attempt to receive a single message from the broadcast buffer, + /// and deliver it to the message handler if successful. + /// Returns the number of messages received. + pub fn receive(&mut self, mut handler: F) -> Result + where + F: FnMut(i32, &[u8]) -> (), + { + let mut messages_received = 0; + let last_seen_lapped_count = self.receiver.lapped_count(); + + if self.receiver.receive_next()? { + if last_seen_lapped_count != self.receiver.lapped_count() { + // The C++ API uses IllegalArgument here, but returns IllegalState + // with the same message later. + return Err(AeronError::IllegalState); + } + + let length = self.receiver.length()?; + if length > AtomicBuffer::capacity(&self.scratch) { + return Err(AeronError::IllegalState); + } + + let msg_type_id = self.receiver.msg_type_id()?; + self.scratch + .put_bytes(0, &self.receiver.buffer, self.receiver.offset(), length)?; + + if !self.receiver.validate() { + return Err(AeronError::IllegalState); + } + handler(msg_type_id, &self.scratch[0..length as usize]); + messages_received += 1; + } + + Ok(messages_received) + } +} diff --git a/aeron-rs/src/util.rs b/aeron-rs/src/util.rs index 44c8cfb..12d5337 100644 --- a/aeron-rs/src/util.rs +++ b/aeron-rs/src/util.rs @@ -16,6 +16,8 @@ pub enum AeronError { OutOfBounds, /// Indication that a buffer operation could not complete because of space constraints InsufficientCapacity, + /// Indication that we have reached an invalid state and can't continue processing + IllegalState, } /// Result type for operations in the Aeron client From 1b49cd0326b4fa1ad40cc66b9d0efbe6dd778b88 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 3 Nov 2019 00:50:42 -0400 Subject: [PATCH 3/3] Add in the tests --- aeron-rs/src/client/concurrent/broadcast.rs | 19 ++- aeron-rs/tests/broadcast_receiver.rs | 165 +++++++++++++++++++- 2 files changed, 173 insertions(+), 11 deletions(-) diff --git a/aeron-rs/src/client/concurrent/broadcast.rs b/aeron-rs/src/client/concurrent/broadcast.rs index 2824a7b..42cac44 100644 --- a/aeron-rs/src/client/concurrent/broadcast.rs +++ b/aeron-rs/src/client/concurrent/broadcast.rs @@ -11,11 +11,14 @@ pub mod buffer_descriptor { use crate::util::{AeronError, IndexT, Result}; use std::mem::size_of; - pub(super) const TAIL_INTENT_COUNTER_OFFSET: IndexT = 0; - pub(super) const TAIL_COUNTER_OFFSET: IndexT = - TAIL_INTENT_COUNTER_OFFSET + size_of::() as IndexT; - pub(super) const LATEST_COUNTER_OFFSET: IndexT = - TAIL_COUNTER_OFFSET + size_of::() as IndexT; + /// Offset within the trailer for the tail intended value + pub const TAIL_INTENT_COUNTER_OFFSET: IndexT = 0; + + /// Offset within the trailer for the tail value + pub const TAIL_COUNTER_OFFSET: IndexT = TAIL_INTENT_COUNTER_OFFSET + size_of::() as IndexT; + + /// Offset within the buffer trailer for the latest sequence value + pub const LATEST_COUNTER_OFFSET: IndexT = TAIL_COUNTER_OFFSET + size_of::() as IndexT; /// Size of the broadcast buffer metadata trailer pub const TRAILER_LENGTH: IndexT = CACHE_LINE_LENGTH as IndexT * 2; @@ -95,10 +98,10 @@ where pub fn new(buffer: A) -> Result { let capacity = buffer.capacity() - buffer_descriptor::TRAILER_LENGTH; buffer_descriptor::check_capacity(capacity)?; + let mask = capacity - 1; let latest_counter_index = capacity + buffer_descriptor::LATEST_COUNTER_OFFSET; let cursor = buffer.get_i64(latest_counter_index)?; - let mask = capacity - 1; Ok(BroadcastReceiver { buffer, @@ -136,8 +139,8 @@ where let mut cursor: i64 = self.next_record; if tail > cursor { - // The way we set `record_offset` is slightly different from C++; - // Clippy was yelling at me, and I think this makes more sense. + // NOTE: C++ and Java clients do these first lines slightly differently. As far + // as I can tell, this is structurally equivalent, and Clippy yells less at me. if !self._validate(cursor) { self.lapped_count.fetch_add(1, Ordering::SeqCst); cursor = self.buffer.get_i64(self.latest_counter_index)?; diff --git a/aeron-rs/tests/broadcast_receiver.rs b/aeron-rs/tests/broadcast_receiver.rs index 131a7c7..a64c8ec 100644 --- a/aeron-rs/tests/broadcast_receiver.rs +++ b/aeron-rs/tests/broadcast_receiver.rs @@ -1,12 +1,22 @@ -use aeron_rs::client::concurrent::broadcast::{buffer_descriptor, BroadcastReceiver}; +use aeron_rs::client::concurrent::broadcast::{ + buffer_descriptor, record_descriptor, BroadcastReceiver, +}; +use aeron_rs::client::concurrent::AtomicBuffer; +use aeron_rs::util::bit::align; use aeron_rs::util::IndexT; const CAPACITY: usize = 1024; const TOTAL_BUFFER_LENGTH: usize = CAPACITY + buffer_descriptor::TRAILER_LENGTH as usize; +const MSG_TYPE_ID: i32 = 7; +const TAIL_INTENT_COUNTER_INDEX: i32 = + CAPACITY as i32 + buffer_descriptor::TAIL_INTENT_COUNTER_OFFSET; +const TAIL_COUNTER_INDEX: i32 = CAPACITY as i32 + buffer_descriptor::TAIL_COUNTER_OFFSET; +const LATEST_COUNTER_INDEX: i32 = CAPACITY as i32 + buffer_descriptor::LATEST_COUNTER_OFFSET; // NOTE: The C++ tests use a mock atomic buffer for testing to validate behavior. -// This is rather hard to do with Rust, so we skip behavior validation for now, -// and assume that other tests will end up verifying needed behavior. +// I haven't implemented this in Rust mostly because it's a great deal of work, +// but it means we're not verifying that BroadcastReceiver uses the properly +// synchronized method calls on the underlying buffer. #[test] fn should_calculate_capacity_for_buffer() { @@ -32,3 +42,152 @@ fn should_not_receive_from_empty_buffer() { let mut receiver = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap(); assert_eq!(receiver.receive_next(), Ok(false)); } + +#[test] +fn should_receive_first_message_from_buffer() { + let length: i32 = 8; + let record_length: i32 = length + record_descriptor::HEADER_LENGTH; + let aligned_record_length: i32 = align( + record_length as usize, + record_descriptor::RECORD_ALIGNMENT as usize, + ) as i32; + let tail = aligned_record_length as i64; + let latest_record = tail - aligned_record_length as i64; + let record_offset = latest_record as i32; + + let mut buffer = vec![0u8; TOTAL_BUFFER_LENGTH]; + buffer.put_i64(TAIL_COUNTER_INDEX, tail).unwrap(); + buffer.put_i64(TAIL_INTENT_COUNTER_INDEX, tail).unwrap(); + buffer + .put_i32( + record_descriptor::length_offset(record_offset), + record_length, + ) + .unwrap(); + buffer + .put_i32(record_descriptor::type_offset(record_offset), MSG_TYPE_ID) + .unwrap(); + + let mut receiver = BroadcastReceiver::new(buffer).unwrap(); + assert_eq!(receiver.receive_next(), Ok(true)); + assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID)); + assert_eq!( + receiver.offset(), + record_descriptor::msg_offset(record_offset) + ); + assert_eq!(receiver.length(), Ok(length)); + assert!(receiver.validate()); +} + +#[test] +fn should_receive_two_messages_from_buffer() { + let length: i32 = 8; + let record_length: i32 = length + record_descriptor::HEADER_LENGTH; + let aligned_record_length: i32 = align( + record_length as usize, + record_descriptor::RECORD_ALIGNMENT as usize, + ) as i32; + let tail = (aligned_record_length * 2) as i64; + let latest_record = tail - aligned_record_length as i64; + let record_offset_one = 0; + let record_offset_two = latest_record as i32; + + let mut buffer = vec![0u8; TOTAL_BUFFER_LENGTH]; + buffer.put_i64(TAIL_COUNTER_INDEX, tail).unwrap(); + buffer.put_i64(TAIL_INTENT_COUNTER_INDEX, tail).unwrap(); + + buffer + .put_i32( + record_descriptor::length_offset(record_offset_one), + record_length, + ) + .unwrap(); + buffer + .put_i32( + record_descriptor::type_offset(record_offset_one), + MSG_TYPE_ID, + ) + .unwrap(); + + buffer + .put_i32( + record_descriptor::length_offset(record_offset_two), + record_length, + ) + .unwrap(); + buffer + .put_i32( + record_descriptor::type_offset(record_offset_two), + MSG_TYPE_ID, + ) + .unwrap(); + + let mut receiver = BroadcastReceiver::new(buffer).unwrap(); + assert_eq!(receiver.receive_next(), Ok(true)); + assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID)); + assert_eq!( + receiver.offset(), + record_descriptor::msg_offset(record_offset_one) + ); + assert_eq!(receiver.length(), Ok(length)); + assert!(receiver.validate()); + + assert_eq!(receiver.receive_next(), Ok(true)); + assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID)); + assert_eq!( + receiver.offset(), + record_descriptor::msg_offset(record_offset_two) + ); + assert_eq!(receiver.length(), Ok(length)); + assert!(receiver.validate()); +} + +#[test] +fn should_late_join_transmission() { + let length: i32 = 8; + let record_length: i32 = length + record_descriptor::HEADER_LENGTH; + let aligned_record_length: i32 = align( + record_length as usize, + record_descriptor::RECORD_ALIGNMENT as usize, + ) as i32; + let tail = (CAPACITY * 3) as i64 + + record_descriptor::HEADER_LENGTH as i64 + + aligned_record_length as i64; + let latest_record = tail - aligned_record_length as i64; + let record_offset = latest_record as i32 & (CAPACITY - 1) as i32; + + let mut buffer = vec![0u8; TOTAL_BUFFER_LENGTH]; + // In order to properly do this test, we have to initialize the broadcast receiver + // while the buffer is empty, and then write into the buffer afterward. Rust is understandably + // not happy about that, but that's the price we pay for not dealing with mocking. + let receiver_buffer = + unsafe { ::std::slice::from_raw_parts_mut(buffer.as_mut_ptr(), buffer.len()) }; + let mut receiver = BroadcastReceiver::new(receiver_buffer).unwrap(); + + buffer.put_i64(TAIL_COUNTER_INDEX, tail).unwrap(); + buffer.put_i64(TAIL_INTENT_COUNTER_INDEX, tail).unwrap(); + buffer.put_i64(LATEST_COUNTER_INDEX, latest_record).unwrap(); + + buffer + .put_i32( + record_descriptor::length_offset(record_offset), + record_length, + ) + .unwrap(); + buffer + .put_i32(record_descriptor::type_offset(record_offset), MSG_TYPE_ID) + .unwrap(); + + assert_eq!(receiver.receive_next(), Ok(true)); + assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID)); + assert_eq!( + receiver.offset(), + record_descriptor::msg_offset(record_offset) + ); + assert_eq!(receiver.length(), Ok(length)); + assert!(receiver.validate()); + assert!(receiver.lapped_count() > 0); +} + +// TODO: Implement the rest of the tests +// Currently not done because of the need to mock the AtomicBuffer