Add the copy broadcast receiver

pull/19/head
Bradlee Speice 2019-11-02 23:12:36 -04:00
parent 4ad3dbcecd
commit 6df284f73c
2 changed files with 96 additions and 4 deletions

View File

@ -2,7 +2,7 @@
//! of receiving data.
use crate::client::concurrent::AtomicBuffer;
use crate::util::bit::align;
use crate::util::{IndexT, Result};
use crate::util::{AeronError, IndexT, Result};
use std::sync::atomic::{AtomicI64, Ordering};
/// Description of the broadcast buffer schema
@ -69,7 +69,8 @@ pub mod record_descriptor {
}
}
/// Receive messages from a transmission stream
/// Receive messages from a transmission stream. Works by polling `receive_next`
/// until `true` is returned, then inspecting messages using the provided methods.
pub struct BroadcastReceiver<A>
where
A: AtomicBuffer,
@ -137,7 +138,7 @@ where
if tail > cursor {
// The way we set `record_offset` is slightly different from C++;
// Clippy was yelling at me, and I think this makes more sense.
if !self.validate(cursor) {
if !self._validate(cursor) {
self.lapped_count.fetch_add(1, Ordering::SeqCst);
cursor = self.buffer.get_i64(self.latest_counter_index)?;
}
@ -174,7 +175,34 @@ where
Ok(is_available)
}
fn validate(&self, cursor: i64) -> bool {
/// Get the length of the message in the current record
pub fn length(&self) -> Result<i32> {
Ok(self
.buffer
.get_i32(record_descriptor::length_offset(self.record_offset))?
- record_descriptor::HEADER_LENGTH)
}
/// Get the offset to the message content in the current record
pub fn offset(&self) -> i32 {
record_descriptor::msg_offset(self.record_offset)
}
/// Ensure that the current received record is still valid and has not been
/// overwritten.
pub fn validate(&self) -> bool {
// QUESTION: C++ uses `atomic::acquire()` here, what does that do?
self._validate(self.cursor)
}
/// Get the message type identifier for the current record
pub fn msg_type_id(&self) -> Result<i32> {
Ok(self
.buffer
.get_i32(record_descriptor::type_offset(self.record_offset))?)
}
fn _validate(&self, cursor: i64) -> bool {
// UNWRAP: Length checks performed during initialization
(cursor + i64::from(self.capacity))
> self
@ -183,3 +211,65 @@ where
.unwrap()
}
}
/// Broadcast receiver that copies messages to an internal buffer.
///
/// The benefit of copying every message is that we keep a consistent view of the data
/// even if we're lapped while reading. However, this may be overkill if you can
/// guarantee the stream never outpaces you.
pub struct CopyBroadcastReceiver<A>
where
A: AtomicBuffer,
{
receiver: BroadcastReceiver<A>,
scratch: Vec<u8>,
}
impl<A> CopyBroadcastReceiver<A>
where
A: AtomicBuffer,
{
/// Create a new broadcast receiver
pub fn new(receiver: BroadcastReceiver<A>) -> Self {
CopyBroadcastReceiver {
receiver,
scratch: Vec::with_capacity(4096),
}
}
/// Attempt to receive a single message from the broadcast buffer,
/// and deliver it to the message handler if successful.
/// Returns the number of messages received.
pub fn receive<F>(&mut self, mut handler: F) -> Result<i32>
where
F: FnMut(i32, &[u8]) -> (),
{
let mut messages_received = 0;
let last_seen_lapped_count = self.receiver.lapped_count();
if self.receiver.receive_next()? {
if last_seen_lapped_count != self.receiver.lapped_count() {
// The C++ API uses IllegalArgument here, but returns IllegalState
// with the same message later.
return Err(AeronError::IllegalState);
}
let length = self.receiver.length()?;
if length > AtomicBuffer::capacity(&self.scratch) {
return Err(AeronError::IllegalState);
}
let msg_type_id = self.receiver.msg_type_id()?;
self.scratch
.put_bytes(0, &self.receiver.buffer, self.receiver.offset(), length)?;
if !self.receiver.validate() {
return Err(AeronError::IllegalState);
}
handler(msg_type_id, &self.scratch[0..length as usize]);
messages_received += 1;
}
Ok(messages_received)
}
}

View File

@ -16,6 +16,8 @@ pub enum AeronError {
OutOfBounds,
/// Indication that a buffer operation could not complete because of space constraints
InsufficientCapacity,
/// Indication that we have reached an invalid state and can't continue processing
IllegalState,
}
/// Result type for operations in the Aeron client