diff --git a/aeron-rs/src/client/concurrent/broadcast.rs b/aeron-rs/src/client/concurrent/broadcast.rs index 2824a7b..42cac44 100644 --- a/aeron-rs/src/client/concurrent/broadcast.rs +++ b/aeron-rs/src/client/concurrent/broadcast.rs @@ -11,11 +11,14 @@ pub mod buffer_descriptor { use crate::util::{AeronError, IndexT, Result}; use std::mem::size_of; - pub(super) const TAIL_INTENT_COUNTER_OFFSET: IndexT = 0; - pub(super) const TAIL_COUNTER_OFFSET: IndexT = - TAIL_INTENT_COUNTER_OFFSET + size_of::() as IndexT; - pub(super) const LATEST_COUNTER_OFFSET: IndexT = - TAIL_COUNTER_OFFSET + size_of::() as IndexT; + /// Offset within the trailer for the tail intended value + pub const TAIL_INTENT_COUNTER_OFFSET: IndexT = 0; + + /// Offset within the trailer for the tail value + pub const TAIL_COUNTER_OFFSET: IndexT = TAIL_INTENT_COUNTER_OFFSET + size_of::() as IndexT; + + /// Offset within the buffer trailer for the latest sequence value + pub const LATEST_COUNTER_OFFSET: IndexT = TAIL_COUNTER_OFFSET + size_of::() as IndexT; /// Size of the broadcast buffer metadata trailer pub const TRAILER_LENGTH: IndexT = CACHE_LINE_LENGTH as IndexT * 2; @@ -95,10 +98,10 @@ where pub fn new(buffer: A) -> Result { let capacity = buffer.capacity() - buffer_descriptor::TRAILER_LENGTH; buffer_descriptor::check_capacity(capacity)?; + let mask = capacity - 1; let latest_counter_index = capacity + buffer_descriptor::LATEST_COUNTER_OFFSET; let cursor = buffer.get_i64(latest_counter_index)?; - let mask = capacity - 1; Ok(BroadcastReceiver { buffer, @@ -136,8 +139,8 @@ where let mut cursor: i64 = self.next_record; if tail > cursor { - // The way we set `record_offset` is slightly different from C++; - // Clippy was yelling at me, and I think this makes more sense. + // NOTE: C++ and Java clients do these first lines slightly differently. As far + // as I can tell, this is structurally equivalent, and Clippy yells less at me. if !self._validate(cursor) { self.lapped_count.fetch_add(1, Ordering::SeqCst); cursor = self.buffer.get_i64(self.latest_counter_index)?; diff --git a/aeron-rs/tests/broadcast_receiver.rs b/aeron-rs/tests/broadcast_receiver.rs index 131a7c7..a64c8ec 100644 --- a/aeron-rs/tests/broadcast_receiver.rs +++ b/aeron-rs/tests/broadcast_receiver.rs @@ -1,12 +1,22 @@ -use aeron_rs::client::concurrent::broadcast::{buffer_descriptor, BroadcastReceiver}; +use aeron_rs::client::concurrent::broadcast::{ + buffer_descriptor, record_descriptor, BroadcastReceiver, +}; +use aeron_rs::client::concurrent::AtomicBuffer; +use aeron_rs::util::bit::align; use aeron_rs::util::IndexT; const CAPACITY: usize = 1024; const TOTAL_BUFFER_LENGTH: usize = CAPACITY + buffer_descriptor::TRAILER_LENGTH as usize; +const MSG_TYPE_ID: i32 = 7; +const TAIL_INTENT_COUNTER_INDEX: i32 = + CAPACITY as i32 + buffer_descriptor::TAIL_INTENT_COUNTER_OFFSET; +const TAIL_COUNTER_INDEX: i32 = CAPACITY as i32 + buffer_descriptor::TAIL_COUNTER_OFFSET; +const LATEST_COUNTER_INDEX: i32 = CAPACITY as i32 + buffer_descriptor::LATEST_COUNTER_OFFSET; // NOTE: The C++ tests use a mock atomic buffer for testing to validate behavior. -// This is rather hard to do with Rust, so we skip behavior validation for now, -// and assume that other tests will end up verifying needed behavior. +// I haven't implemented this in Rust mostly because it's a great deal of work, +// but it means we're not verifying that BroadcastReceiver uses the properly +// synchronized method calls on the underlying buffer. #[test] fn should_calculate_capacity_for_buffer() { @@ -32,3 +42,152 @@ fn should_not_receive_from_empty_buffer() { let mut receiver = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap(); assert_eq!(receiver.receive_next(), Ok(false)); } + +#[test] +fn should_receive_first_message_from_buffer() { + let length: i32 = 8; + let record_length: i32 = length + record_descriptor::HEADER_LENGTH; + let aligned_record_length: i32 = align( + record_length as usize, + record_descriptor::RECORD_ALIGNMENT as usize, + ) as i32; + let tail = aligned_record_length as i64; + let latest_record = tail - aligned_record_length as i64; + let record_offset = latest_record as i32; + + let mut buffer = vec![0u8; TOTAL_BUFFER_LENGTH]; + buffer.put_i64(TAIL_COUNTER_INDEX, tail).unwrap(); + buffer.put_i64(TAIL_INTENT_COUNTER_INDEX, tail).unwrap(); + buffer + .put_i32( + record_descriptor::length_offset(record_offset), + record_length, + ) + .unwrap(); + buffer + .put_i32(record_descriptor::type_offset(record_offset), MSG_TYPE_ID) + .unwrap(); + + let mut receiver = BroadcastReceiver::new(buffer).unwrap(); + assert_eq!(receiver.receive_next(), Ok(true)); + assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID)); + assert_eq!( + receiver.offset(), + record_descriptor::msg_offset(record_offset) + ); + assert_eq!(receiver.length(), Ok(length)); + assert!(receiver.validate()); +} + +#[test] +fn should_receive_two_messages_from_buffer() { + let length: i32 = 8; + let record_length: i32 = length + record_descriptor::HEADER_LENGTH; + let aligned_record_length: i32 = align( + record_length as usize, + record_descriptor::RECORD_ALIGNMENT as usize, + ) as i32; + let tail = (aligned_record_length * 2) as i64; + let latest_record = tail - aligned_record_length as i64; + let record_offset_one = 0; + let record_offset_two = latest_record as i32; + + let mut buffer = vec![0u8; TOTAL_BUFFER_LENGTH]; + buffer.put_i64(TAIL_COUNTER_INDEX, tail).unwrap(); + buffer.put_i64(TAIL_INTENT_COUNTER_INDEX, tail).unwrap(); + + buffer + .put_i32( + record_descriptor::length_offset(record_offset_one), + record_length, + ) + .unwrap(); + buffer + .put_i32( + record_descriptor::type_offset(record_offset_one), + MSG_TYPE_ID, + ) + .unwrap(); + + buffer + .put_i32( + record_descriptor::length_offset(record_offset_two), + record_length, + ) + .unwrap(); + buffer + .put_i32( + record_descriptor::type_offset(record_offset_two), + MSG_TYPE_ID, + ) + .unwrap(); + + let mut receiver = BroadcastReceiver::new(buffer).unwrap(); + assert_eq!(receiver.receive_next(), Ok(true)); + assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID)); + assert_eq!( + receiver.offset(), + record_descriptor::msg_offset(record_offset_one) + ); + assert_eq!(receiver.length(), Ok(length)); + assert!(receiver.validate()); + + assert_eq!(receiver.receive_next(), Ok(true)); + assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID)); + assert_eq!( + receiver.offset(), + record_descriptor::msg_offset(record_offset_two) + ); + assert_eq!(receiver.length(), Ok(length)); + assert!(receiver.validate()); +} + +#[test] +fn should_late_join_transmission() { + let length: i32 = 8; + let record_length: i32 = length + record_descriptor::HEADER_LENGTH; + let aligned_record_length: i32 = align( + record_length as usize, + record_descriptor::RECORD_ALIGNMENT as usize, + ) as i32; + let tail = (CAPACITY * 3) as i64 + + record_descriptor::HEADER_LENGTH as i64 + + aligned_record_length as i64; + let latest_record = tail - aligned_record_length as i64; + let record_offset = latest_record as i32 & (CAPACITY - 1) as i32; + + let mut buffer = vec![0u8; TOTAL_BUFFER_LENGTH]; + // In order to properly do this test, we have to initialize the broadcast receiver + // while the buffer is empty, and then write into the buffer afterward. Rust is understandably + // not happy about that, but that's the price we pay for not dealing with mocking. + let receiver_buffer = + unsafe { ::std::slice::from_raw_parts_mut(buffer.as_mut_ptr(), buffer.len()) }; + let mut receiver = BroadcastReceiver::new(receiver_buffer).unwrap(); + + buffer.put_i64(TAIL_COUNTER_INDEX, tail).unwrap(); + buffer.put_i64(TAIL_INTENT_COUNTER_INDEX, tail).unwrap(); + buffer.put_i64(LATEST_COUNTER_INDEX, latest_record).unwrap(); + + buffer + .put_i32( + record_descriptor::length_offset(record_offset), + record_length, + ) + .unwrap(); + buffer + .put_i32(record_descriptor::type_offset(record_offset), MSG_TYPE_ID) + .unwrap(); + + assert_eq!(receiver.receive_next(), Ok(true)); + assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID)); + assert_eq!( + receiver.offset(), + record_descriptor::msg_offset(record_offset) + ); + assert_eq!(receiver.length(), Ok(length)); + assert!(receiver.validate()); + assert!(receiver.lapped_count() > 0); +} + +// TODO: Implement the rest of the tests +// Currently not done because of the need to mock the AtomicBuffer