diff --git a/aeron-rs/src/client/concurrent/broadcast.rs b/aeron-rs/src/client/concurrent/broadcast.rs index 5156e6a..2824a7b 100644 --- a/aeron-rs/src/client/concurrent/broadcast.rs +++ b/aeron-rs/src/client/concurrent/broadcast.rs @@ -2,7 +2,7 @@ //! of receiving data. use crate::client::concurrent::AtomicBuffer; use crate::util::bit::align; -use crate::util::{IndexT, Result}; +use crate::util::{AeronError, IndexT, Result}; use std::sync::atomic::{AtomicI64, Ordering}; /// Description of the broadcast buffer schema @@ -69,7 +69,8 @@ pub mod record_descriptor { } } -/// Receive messages from a transmission stream +/// Receive messages from a transmission stream. Works by polling `receive_next` +/// until `true` is returned, then inspecting messages using the provided methods. pub struct BroadcastReceiver where A: AtomicBuffer, @@ -137,7 +138,7 @@ where if tail > cursor { // The way we set `record_offset` is slightly different from C++; // Clippy was yelling at me, and I think this makes more sense. - if !self.validate(cursor) { + if !self._validate(cursor) { self.lapped_count.fetch_add(1, Ordering::SeqCst); cursor = self.buffer.get_i64(self.latest_counter_index)?; } @@ -174,7 +175,34 @@ where Ok(is_available) } - fn validate(&self, cursor: i64) -> bool { + /// Get the length of the message in the current record + pub fn length(&self) -> Result { + Ok(self + .buffer + .get_i32(record_descriptor::length_offset(self.record_offset))? + - record_descriptor::HEADER_LENGTH) + } + + /// Get the offset to the message content in the current record + pub fn offset(&self) -> i32 { + record_descriptor::msg_offset(self.record_offset) + } + + /// Ensure that the current received record is still valid and has not been + /// overwritten. + pub fn validate(&self) -> bool { + // QUESTION: C++ uses `atomic::acquire()` here, what does that do? + self._validate(self.cursor) + } + + /// Get the message type identifier for the current record + pub fn msg_type_id(&self) -> Result { + Ok(self + .buffer + .get_i32(record_descriptor::type_offset(self.record_offset))?) + } + + fn _validate(&self, cursor: i64) -> bool { // UNWRAP: Length checks performed during initialization (cursor + i64::from(self.capacity)) > self @@ -183,3 +211,65 @@ where .unwrap() } } + +/// Broadcast receiver that copies messages to an internal buffer. +/// +/// The benefit of copying every message is that we keep a consistent view of the data +/// even if we're lapped while reading. However, this may be overkill if you can +/// guarantee the stream never outpaces you. +pub struct CopyBroadcastReceiver +where + A: AtomicBuffer, +{ + receiver: BroadcastReceiver, + scratch: Vec, +} + +impl CopyBroadcastReceiver +where + A: AtomicBuffer, +{ + /// Create a new broadcast receiver + pub fn new(receiver: BroadcastReceiver) -> Self { + CopyBroadcastReceiver { + receiver, + scratch: Vec::with_capacity(4096), + } + } + + /// Attempt to receive a single message from the broadcast buffer, + /// and deliver it to the message handler if successful. + /// Returns the number of messages received. + pub fn receive(&mut self, mut handler: F) -> Result + where + F: FnMut(i32, &[u8]) -> (), + { + let mut messages_received = 0; + let last_seen_lapped_count = self.receiver.lapped_count(); + + if self.receiver.receive_next()? { + if last_seen_lapped_count != self.receiver.lapped_count() { + // The C++ API uses IllegalArgument here, but returns IllegalState + // with the same message later. + return Err(AeronError::IllegalState); + } + + let length = self.receiver.length()?; + if length > AtomicBuffer::capacity(&self.scratch) { + return Err(AeronError::IllegalState); + } + + let msg_type_id = self.receiver.msg_type_id()?; + self.scratch + .put_bytes(0, &self.receiver.buffer, self.receiver.offset(), length)?; + + if !self.receiver.validate() { + return Err(AeronError::IllegalState); + } + handler(msg_type_id, &self.scratch[0..length as usize]); + messages_received += 1; + } + + Ok(messages_received) + } +} diff --git a/aeron-rs/src/util.rs b/aeron-rs/src/util.rs index 44c8cfb..12d5337 100644 --- a/aeron-rs/src/util.rs +++ b/aeron-rs/src/util.rs @@ -16,6 +16,8 @@ pub enum AeronError { OutOfBounds, /// Indication that a buffer operation could not complete because of space constraints InsufficientCapacity, + /// Indication that we have reached an invalid state and can't continue processing + IllegalState, } /// Result type for operations in the Aeron client