19: Broadcast receiver r=bspeice a=bspeice

Doesn't include some tests from Aeron/Agrona because of the complexity of mocking `AtomicBuffer`. Will eventually need to do more work to test this, but it's sufficient for now.
bors r+

Co-authored-by: Bradlee Speice <bradlee@speice.io>
pull/20/head
bors[bot] 2019-11-03 04:52:33 +00:00 committed by GitHub
commit 15969581b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 480 additions and 9 deletions

View File

@ -0,0 +1,278 @@
//! Read messages that are broadcast from the media driver; this is the primary means
//! of receiving data.
use crate::client::concurrent::AtomicBuffer;
use crate::util::bit::align;
use crate::util::{AeronError, IndexT, Result};
use std::sync::atomic::{AtomicI64, Ordering};
/// Description of the broadcast buffer schema
pub mod buffer_descriptor {
use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH};
use crate::util::{AeronError, IndexT, Result};
use std::mem::size_of;
/// Offset within the trailer for the tail intended value
pub const TAIL_INTENT_COUNTER_OFFSET: IndexT = 0;
/// Offset within the trailer for the tail value
pub const TAIL_COUNTER_OFFSET: IndexT = TAIL_INTENT_COUNTER_OFFSET + size_of::<i64>() as IndexT;
/// Offset within the buffer trailer for the latest sequence value
pub const LATEST_COUNTER_OFFSET: IndexT = TAIL_COUNTER_OFFSET + size_of::<i64>() as IndexT;
/// Size of the broadcast buffer metadata trailer
pub const TRAILER_LENGTH: IndexT = CACHE_LINE_LENGTH as IndexT * 2;
pub(super) fn check_capacity(capacity: IndexT) -> Result<()> {
// QUESTION: Why does C++ throw IllegalState here?
// Would've expected it to throw IllegalArgument like ring buffer
if is_power_of_two(capacity) {
Ok(())
} else {
Err(AeronError::IllegalArgument)
}
}
}
/// Broadcast buffer message header
// QUESTION: Isn't this the same as the ring buffer descriptor?
// Why not consolidate them?
pub mod record_descriptor {
use crate::util::IndexT;
/// Message type to indicate a record used only
/// for padding the buffer
pub const PADDING_MSG_TYPE_ID: i32 = -1;
/// Offset from the beginning of a record to its length
pub const LENGTH_OFFSET: IndexT = 0;
/// Offset from the beginning of a record to its type
pub const TYPE_OFFSET: IndexT = 4;
/// Total header length for each record
pub const HEADER_LENGTH: IndexT = 8;
/// Alignment for all broadcast records
pub const RECORD_ALIGNMENT: IndexT = HEADER_LENGTH;
/// Retrieve the byte offset for a record's length field given the record start
pub fn length_offset(record_offset: IndexT) -> IndexT {
record_offset + LENGTH_OFFSET
}
/// Retrieve the byte offset for a record's type field given the record start
pub fn type_offset(record_offset: IndexT) -> IndexT {
record_offset + TYPE_OFFSET
}
/// Retrieve the byte offset for a record's message given the record start
pub fn msg_offset(record_offset: IndexT) -> IndexT {
record_offset + HEADER_LENGTH
}
}
/// Receive messages from a transmission stream. Works by polling `receive_next`
/// until `true` is returned, then inspecting messages using the provided methods.
pub struct BroadcastReceiver<A>
where
A: AtomicBuffer,
{
buffer: A,
capacity: IndexT,
mask: IndexT,
tail_intent_counter_index: IndexT,
tail_counter_index: IndexT,
latest_counter_index: IndexT,
record_offset: IndexT,
cursor: i64,
next_record: i64,
lapped_count: AtomicI64,
}
impl<A> BroadcastReceiver<A>
where
A: AtomicBuffer,
{
/// Create a new receiver backed by `buffer`
pub fn new(buffer: A) -> Result<Self> {
let capacity = buffer.capacity() - buffer_descriptor::TRAILER_LENGTH;
buffer_descriptor::check_capacity(capacity)?;
let mask = capacity - 1;
let latest_counter_index = capacity + buffer_descriptor::LATEST_COUNTER_OFFSET;
let cursor = buffer.get_i64(latest_counter_index)?;
Ok(BroadcastReceiver {
buffer,
capacity,
mask,
tail_intent_counter_index: capacity + buffer_descriptor::TAIL_INTENT_COUNTER_OFFSET,
tail_counter_index: capacity + buffer_descriptor::TAIL_COUNTER_OFFSET,
latest_counter_index,
record_offset: (cursor as i32) & mask,
cursor,
next_record: cursor,
lapped_count: AtomicI64::new(0),
})
}
/// Get the total capacity of this broadcast receiver
pub fn capacity(&self) -> IndexT {
self.capacity
}
/// Get the number of times the transmitter has lapped this receiver. Each lap
/// represents at least a buffer's worth of lost data.
pub fn lapped_count(&self) -> i64 {
// QUESTION: C++ just uses `std::atomic<long>`, what are the ordering semantics?
// For right now I'm just assuming it's sequentially consistent
self.lapped_count.load(Ordering::SeqCst)
}
/// Non-blocking receive of next message from the transmission stream.
/// If loss has occurred, `lapped_count` will be incremented. Returns `true`
/// if the next transmission is available, `false` otherwise.
pub fn receive_next(&mut self) -> Result<bool> {
let mut is_available = false;
let tail: i64 = self.buffer.get_i64_volatile(self.tail_counter_index)?;
let mut cursor: i64 = self.next_record;
if tail > cursor {
// NOTE: C++ and Java clients do these first lines slightly differently. As far
// as I can tell, this is structurally equivalent, and Clippy yells less at me.
if !self._validate(cursor) {
self.lapped_count.fetch_add(1, Ordering::SeqCst);
cursor = self.buffer.get_i64(self.latest_counter_index)?;
}
let mut record_offset = (cursor as i32) & self.mask;
self.cursor = cursor;
self.next_record = cursor
+ align(
self.buffer
.get_i32(record_descriptor::length_offset(record_offset))?
as usize,
record_descriptor::RECORD_ALIGNMENT as usize,
) as i64;
if record_descriptor::PADDING_MSG_TYPE_ID
== self
.buffer
.get_i32(record_descriptor::type_offset(record_offset))?
{
record_offset = 0;
self.cursor = self.next_record;
self.next_record += align(
self.buffer
.get_i32(record_descriptor::length_offset(record_offset))?
as usize,
record_descriptor::RECORD_ALIGNMENT as usize,
) as i64;
}
self.record_offset = record_offset;
is_available = true;
}
Ok(is_available)
}
/// Get the length of the message in the current record
pub fn length(&self) -> Result<i32> {
Ok(self
.buffer
.get_i32(record_descriptor::length_offset(self.record_offset))?
- record_descriptor::HEADER_LENGTH)
}
/// Get the offset to the message content in the current record
pub fn offset(&self) -> i32 {
record_descriptor::msg_offset(self.record_offset)
}
/// Ensure that the current received record is still valid and has not been
/// overwritten.
pub fn validate(&self) -> bool {
// QUESTION: C++ uses `atomic::acquire()` here, what does that do?
self._validate(self.cursor)
}
/// Get the message type identifier for the current record
pub fn msg_type_id(&self) -> Result<i32> {
Ok(self
.buffer
.get_i32(record_descriptor::type_offset(self.record_offset))?)
}
fn _validate(&self, cursor: i64) -> bool {
// UNWRAP: Length checks performed during initialization
(cursor + i64::from(self.capacity))
> self
.buffer
.get_i64_volatile(self.tail_intent_counter_index)
.unwrap()
}
}
/// Broadcast receiver that copies messages to an internal buffer.
///
/// The benefit of copying every message is that we keep a consistent view of the data
/// even if we're lapped while reading. However, this may be overkill if you can
/// guarantee the stream never outpaces you.
pub struct CopyBroadcastReceiver<A>
where
A: AtomicBuffer,
{
receiver: BroadcastReceiver<A>,
scratch: Vec<u8>,
}
impl<A> CopyBroadcastReceiver<A>
where
A: AtomicBuffer,
{
/// Create a new broadcast receiver
pub fn new(receiver: BroadcastReceiver<A>) -> Self {
CopyBroadcastReceiver {
receiver,
scratch: Vec::with_capacity(4096),
}
}
/// Attempt to receive a single message from the broadcast buffer,
/// and deliver it to the message handler if successful.
/// Returns the number of messages received.
pub fn receive<F>(&mut self, mut handler: F) -> Result<i32>
where
F: FnMut(i32, &[u8]) -> (),
{
let mut messages_received = 0;
let last_seen_lapped_count = self.receiver.lapped_count();
if self.receiver.receive_next()? {
if last_seen_lapped_count != self.receiver.lapped_count() {
// The C++ API uses IllegalArgument here, but returns IllegalState
// with the same message later.
return Err(AeronError::IllegalState);
}
let length = self.receiver.length()?;
if length > AtomicBuffer::capacity(&self.scratch) {
return Err(AeronError::IllegalState);
}
let msg_type_id = self.receiver.msg_type_id()?;
self.scratch
.put_bytes(0, &self.receiver.buffer, self.receiver.offset(), length)?;
if !self.receiver.validate() {
return Err(AeronError::IllegalState);
}
handler(msg_type_id, &self.scratch[0..length as usize]);
messages_received += 1;
}
Ok(messages_received)
}
}

View File

@ -1,6 +1,7 @@
//! Module for handling safe interactions among the multiple clients making use
//! of a single Media Driver
pub mod broadcast;
pub mod ringbuffer;
use std::mem::size_of;
use std::sync::atomic::{AtomicI64, Ordering};

View File

@ -4,9 +4,8 @@ use crate::util::bit::align;
use crate::util::{bit, AeronError, IndexT, Result};
use std::ops::{Deref, DerefMut};
/// Description of the Ring Buffer schema.
/// Description of the ring buffer schema
pub mod buffer_descriptor {
use crate::client::concurrent::AtomicBuffer;
use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH};
use crate::util::AeronError::IllegalArgument;
use crate::util::{IndexT, Result};
@ -31,13 +30,9 @@ pub mod buffer_descriptor {
/// Verify the capacity of a buffer is legal for use as a ring buffer.
/// Returns the actual capacity excluding ring buffer metadata.
pub fn check_capacity<A>(buffer: &A) -> Result<IndexT>
where
A: AtomicBuffer,
{
let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT;
pub fn check_capacity(capacity: IndexT) -> Result<()> {
if is_power_of_two(capacity) {
Ok(capacity)
Ok(())
} else {
Err(IllegalArgument)
}
@ -138,7 +133,9 @@ where
{
/// Create a many-to-one ring buffer from an underlying atomic buffer.
pub fn new(buffer: A) -> Result<Self> {
buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer {
let capacity = buffer.capacity() - buffer_descriptor::TRAILER_LENGTH;
buffer_descriptor::check_capacity(capacity)?;
Ok(ManyToOneRingBuffer {
buffer,
capacity,
max_msg_length: capacity / 8,

View File

@ -16,6 +16,8 @@ pub enum AeronError {
OutOfBounds,
/// Indication that a buffer operation could not complete because of space constraints
InsufficientCapacity,
/// Indication that we have reached an invalid state and can't continue processing
IllegalState,
}
/// Result type for operations in the Aeron client

View File

@ -0,0 +1,193 @@
use aeron_rs::client::concurrent::broadcast::{
buffer_descriptor, record_descriptor, BroadcastReceiver,
};
use aeron_rs::client::concurrent::AtomicBuffer;
use aeron_rs::util::bit::align;
use aeron_rs::util::IndexT;
const CAPACITY: usize = 1024;
const TOTAL_BUFFER_LENGTH: usize = CAPACITY + buffer_descriptor::TRAILER_LENGTH as usize;
const MSG_TYPE_ID: i32 = 7;
const TAIL_INTENT_COUNTER_INDEX: i32 =
CAPACITY as i32 + buffer_descriptor::TAIL_INTENT_COUNTER_OFFSET;
const TAIL_COUNTER_INDEX: i32 = CAPACITY as i32 + buffer_descriptor::TAIL_COUNTER_OFFSET;
const LATEST_COUNTER_INDEX: i32 = CAPACITY as i32 + buffer_descriptor::LATEST_COUNTER_OFFSET;
// NOTE: The C++ tests use a mock atomic buffer for testing to validate behavior.
// I haven't implemented this in Rust mostly because it's a great deal of work,
// but it means we're not verifying that BroadcastReceiver uses the properly
// synchronized method calls on the underlying buffer.
#[test]
fn should_calculate_capacity_for_buffer() {
let buffer = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap();
assert_eq!(buffer.capacity(), CAPACITY as IndexT);
}
#[test]
fn should_throw_exception_for_capacity_that_is_not_power_of_two() {
let bytes = vec![0u8; 777 + buffer_descriptor::TRAILER_LENGTH as usize];
assert!(BroadcastReceiver::new(bytes).is_err());
}
#[test]
fn should_not_be_lapped_before_reception() {
let receiver = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap();
assert_eq!(receiver.lapped_count(), 0);
}
#[test]
fn should_not_receive_from_empty_buffer() {
let mut receiver = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap();
assert_eq!(receiver.receive_next(), Ok(false));
}
#[test]
fn should_receive_first_message_from_buffer() {
let length: i32 = 8;
let record_length: i32 = length + record_descriptor::HEADER_LENGTH;
let aligned_record_length: i32 = align(
record_length as usize,
record_descriptor::RECORD_ALIGNMENT as usize,
) as i32;
let tail = aligned_record_length as i64;
let latest_record = tail - aligned_record_length as i64;
let record_offset = latest_record as i32;
let mut buffer = vec![0u8; TOTAL_BUFFER_LENGTH];
buffer.put_i64(TAIL_COUNTER_INDEX, tail).unwrap();
buffer.put_i64(TAIL_INTENT_COUNTER_INDEX, tail).unwrap();
buffer
.put_i32(
record_descriptor::length_offset(record_offset),
record_length,
)
.unwrap();
buffer
.put_i32(record_descriptor::type_offset(record_offset), MSG_TYPE_ID)
.unwrap();
let mut receiver = BroadcastReceiver::new(buffer).unwrap();
assert_eq!(receiver.receive_next(), Ok(true));
assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID));
assert_eq!(
receiver.offset(),
record_descriptor::msg_offset(record_offset)
);
assert_eq!(receiver.length(), Ok(length));
assert!(receiver.validate());
}
#[test]
fn should_receive_two_messages_from_buffer() {
let length: i32 = 8;
let record_length: i32 = length + record_descriptor::HEADER_LENGTH;
let aligned_record_length: i32 = align(
record_length as usize,
record_descriptor::RECORD_ALIGNMENT as usize,
) as i32;
let tail = (aligned_record_length * 2) as i64;
let latest_record = tail - aligned_record_length as i64;
let record_offset_one = 0;
let record_offset_two = latest_record as i32;
let mut buffer = vec![0u8; TOTAL_BUFFER_LENGTH];
buffer.put_i64(TAIL_COUNTER_INDEX, tail).unwrap();
buffer.put_i64(TAIL_INTENT_COUNTER_INDEX, tail).unwrap();
buffer
.put_i32(
record_descriptor::length_offset(record_offset_one),
record_length,
)
.unwrap();
buffer
.put_i32(
record_descriptor::type_offset(record_offset_one),
MSG_TYPE_ID,
)
.unwrap();
buffer
.put_i32(
record_descriptor::length_offset(record_offset_two),
record_length,
)
.unwrap();
buffer
.put_i32(
record_descriptor::type_offset(record_offset_two),
MSG_TYPE_ID,
)
.unwrap();
let mut receiver = BroadcastReceiver::new(buffer).unwrap();
assert_eq!(receiver.receive_next(), Ok(true));
assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID));
assert_eq!(
receiver.offset(),
record_descriptor::msg_offset(record_offset_one)
);
assert_eq!(receiver.length(), Ok(length));
assert!(receiver.validate());
assert_eq!(receiver.receive_next(), Ok(true));
assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID));
assert_eq!(
receiver.offset(),
record_descriptor::msg_offset(record_offset_two)
);
assert_eq!(receiver.length(), Ok(length));
assert!(receiver.validate());
}
#[test]
fn should_late_join_transmission() {
let length: i32 = 8;
let record_length: i32 = length + record_descriptor::HEADER_LENGTH;
let aligned_record_length: i32 = align(
record_length as usize,
record_descriptor::RECORD_ALIGNMENT as usize,
) as i32;
let tail = (CAPACITY * 3) as i64
+ record_descriptor::HEADER_LENGTH as i64
+ aligned_record_length as i64;
let latest_record = tail - aligned_record_length as i64;
let record_offset = latest_record as i32 & (CAPACITY - 1) as i32;
let mut buffer = vec![0u8; TOTAL_BUFFER_LENGTH];
// In order to properly do this test, we have to initialize the broadcast receiver
// while the buffer is empty, and then write into the buffer afterward. Rust is understandably
// not happy about that, but that's the price we pay for not dealing with mocking.
let receiver_buffer =
unsafe { ::std::slice::from_raw_parts_mut(buffer.as_mut_ptr(), buffer.len()) };
let mut receiver = BroadcastReceiver::new(receiver_buffer).unwrap();
buffer.put_i64(TAIL_COUNTER_INDEX, tail).unwrap();
buffer.put_i64(TAIL_INTENT_COUNTER_INDEX, tail).unwrap();
buffer.put_i64(LATEST_COUNTER_INDEX, latest_record).unwrap();
buffer
.put_i32(
record_descriptor::length_offset(record_offset),
record_length,
)
.unwrap();
buffer
.put_i32(record_descriptor::type_offset(record_offset), MSG_TYPE_ID)
.unwrap();
assert_eq!(receiver.receive_next(), Ok(true));
assert_eq!(receiver.msg_type_id(), Ok(MSG_TYPE_ID));
assert_eq!(
receiver.offset(),
record_descriptor::msg_offset(record_offset)
);
assert_eq!(receiver.length(), Ok(length));
assert!(receiver.validate());
assert!(receiver.lapped_count() > 0);
}
// TODO: Implement the rest of the tests
// Currently not done because of the need to mock the AtomicBuffer