Start work on the broadcast receiver

pull/19/head
Bradlee Speice 2019-11-02 22:33:33 -04:00
parent 98002c00be
commit 4ad3dbcecd
4 changed files with 226 additions and 9 deletions

View File

@ -0,0 +1,185 @@
//! Read messages that are broadcast from the media driver; this is the primary means
//! of receiving data.
use crate::client::concurrent::AtomicBuffer;
use crate::util::bit::align;
use crate::util::{IndexT, Result};
use std::sync::atomic::{AtomicI64, Ordering};
/// Description of the broadcast buffer schema
pub mod buffer_descriptor {
use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH};
use crate::util::{AeronError, IndexT, Result};
use std::mem::size_of;
pub(super) const TAIL_INTENT_COUNTER_OFFSET: IndexT = 0;
pub(super) const TAIL_COUNTER_OFFSET: IndexT =
TAIL_INTENT_COUNTER_OFFSET + size_of::<i64>() as IndexT;
pub(super) const LATEST_COUNTER_OFFSET: IndexT =
TAIL_COUNTER_OFFSET + size_of::<i64>() as IndexT;
/// Size of the broadcast buffer metadata trailer
pub const TRAILER_LENGTH: IndexT = CACHE_LINE_LENGTH as IndexT * 2;
pub(super) fn check_capacity(capacity: IndexT) -> Result<()> {
// QUESTION: Why does C++ throw IllegalState here?
// Would've expected it to throw IllegalArgument like ring buffer
if is_power_of_two(capacity) {
Ok(())
} else {
Err(AeronError::IllegalArgument)
}
}
}
/// Broadcast buffer message header
// QUESTION: Isn't this the same as the ring buffer descriptor?
// Why not consolidate them?
pub mod record_descriptor {
use crate::util::IndexT;
/// Message type to indicate a record used only
/// for padding the buffer
pub const PADDING_MSG_TYPE_ID: i32 = -1;
/// Offset from the beginning of a record to its length
pub const LENGTH_OFFSET: IndexT = 0;
/// Offset from the beginning of a record to its type
pub const TYPE_OFFSET: IndexT = 4;
/// Total header length for each record
pub const HEADER_LENGTH: IndexT = 8;
/// Alignment for all broadcast records
pub const RECORD_ALIGNMENT: IndexT = HEADER_LENGTH;
/// Retrieve the byte offset for a record's length field given the record start
pub fn length_offset(record_offset: IndexT) -> IndexT {
record_offset + LENGTH_OFFSET
}
/// Retrieve the byte offset for a record's type field given the record start
pub fn type_offset(record_offset: IndexT) -> IndexT {
record_offset + TYPE_OFFSET
}
/// Retrieve the byte offset for a record's message given the record start
pub fn msg_offset(record_offset: IndexT) -> IndexT {
record_offset + HEADER_LENGTH
}
}
/// Receive messages from a transmission stream
pub struct BroadcastReceiver<A>
where
A: AtomicBuffer,
{
buffer: A,
capacity: IndexT,
mask: IndexT,
tail_intent_counter_index: IndexT,
tail_counter_index: IndexT,
latest_counter_index: IndexT,
record_offset: IndexT,
cursor: i64,
next_record: i64,
lapped_count: AtomicI64,
}
impl<A> BroadcastReceiver<A>
where
A: AtomicBuffer,
{
/// Create a new receiver backed by `buffer`
pub fn new(buffer: A) -> Result<Self> {
let capacity = buffer.capacity() - buffer_descriptor::TRAILER_LENGTH;
buffer_descriptor::check_capacity(capacity)?;
let latest_counter_index = capacity + buffer_descriptor::LATEST_COUNTER_OFFSET;
let cursor = buffer.get_i64(latest_counter_index)?;
let mask = capacity - 1;
Ok(BroadcastReceiver {
buffer,
capacity,
mask,
tail_intent_counter_index: capacity + buffer_descriptor::TAIL_INTENT_COUNTER_OFFSET,
tail_counter_index: capacity + buffer_descriptor::TAIL_COUNTER_OFFSET,
latest_counter_index,
record_offset: (cursor as i32) & mask,
cursor,
next_record: cursor,
lapped_count: AtomicI64::new(0),
})
}
/// Get the total capacity of this broadcast receiver
pub fn capacity(&self) -> IndexT {
self.capacity
}
/// Get the number of times the transmitter has lapped this receiver. Each lap
/// represents at least a buffer's worth of lost data.
pub fn lapped_count(&self) -> i64 {
// QUESTION: C++ just uses `std::atomic<long>`, what are the ordering semantics?
// For right now I'm just assuming it's sequentially consistent
self.lapped_count.load(Ordering::SeqCst)
}
/// Non-blocking receive of next message from the transmission stream.
/// If loss has occurred, `lapped_count` will be incremented. Returns `true`
/// if the next transmission is available, `false` otherwise.
pub fn receive_next(&mut self) -> Result<bool> {
let mut is_available = false;
let tail: i64 = self.buffer.get_i64_volatile(self.tail_counter_index)?;
let mut cursor: i64 = self.next_record;
if tail > cursor {
// The way we set `record_offset` is slightly different from C++;
// Clippy was yelling at me, and I think this makes more sense.
if !self.validate(cursor) {
self.lapped_count.fetch_add(1, Ordering::SeqCst);
cursor = self.buffer.get_i64(self.latest_counter_index)?;
}
let mut record_offset = (cursor as i32) & self.mask;
self.cursor = cursor;
self.next_record = cursor
+ align(
self.buffer
.get_i32(record_descriptor::length_offset(record_offset))?
as usize,
record_descriptor::RECORD_ALIGNMENT as usize,
) as i64;
if record_descriptor::PADDING_MSG_TYPE_ID
== self
.buffer
.get_i32(record_descriptor::type_offset(record_offset))?
{
record_offset = 0;
self.cursor = self.next_record;
self.next_record += align(
self.buffer
.get_i32(record_descriptor::length_offset(record_offset))?
as usize,
record_descriptor::RECORD_ALIGNMENT as usize,
) as i64;
}
self.record_offset = record_offset;
is_available = true;
}
Ok(is_available)
}
fn validate(&self, cursor: i64) -> bool {
// UNWRAP: Length checks performed during initialization
(cursor + i64::from(self.capacity))
> self
.buffer
.get_i64_volatile(self.tail_intent_counter_index)
.unwrap()
}
}

View File

@ -1,6 +1,7 @@
//! Module for handling safe interactions among the multiple clients making use
//! of a single Media Driver
pub mod broadcast;
pub mod ringbuffer;
use std::mem::size_of;
use std::sync::atomic::{AtomicI64, Ordering};

View File

@ -4,9 +4,8 @@ use crate::util::bit::align;
use crate::util::{bit, AeronError, IndexT, Result};
use std::ops::{Deref, DerefMut};
/// Description of the Ring Buffer schema.
/// Description of the ring buffer schema
pub mod buffer_descriptor {
use crate::client::concurrent::AtomicBuffer;
use crate::util::bit::{is_power_of_two, CACHE_LINE_LENGTH};
use crate::util::AeronError::IllegalArgument;
use crate::util::{IndexT, Result};
@ -31,13 +30,9 @@ pub mod buffer_descriptor {
/// Verify the capacity of a buffer is legal for use as a ring buffer.
/// Returns the actual capacity excluding ring buffer metadata.
pub fn check_capacity<A>(buffer: &A) -> Result<IndexT>
where
A: AtomicBuffer,
{
let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT;
pub fn check_capacity(capacity: IndexT) -> Result<()> {
if is_power_of_two(capacity) {
Ok(capacity)
Ok(())
} else {
Err(IllegalArgument)
}
@ -138,7 +133,9 @@ where
{
/// Create a many-to-one ring buffer from an underlying atomic buffer.
pub fn new(buffer: A) -> Result<Self> {
buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer {
let capacity = buffer.capacity() - buffer_descriptor::TRAILER_LENGTH;
buffer_descriptor::check_capacity(capacity)?;
Ok(ManyToOneRingBuffer {
buffer,
capacity,
max_msg_length: capacity / 8,

View File

@ -0,0 +1,34 @@
use aeron_rs::client::concurrent::broadcast::{buffer_descriptor, BroadcastReceiver};
use aeron_rs::util::IndexT;
const CAPACITY: usize = 1024;
const TOTAL_BUFFER_LENGTH: usize = CAPACITY + buffer_descriptor::TRAILER_LENGTH as usize;
// NOTE: The C++ tests use a mock atomic buffer for testing to validate behavior.
// This is rather hard to do with Rust, so we skip behavior validation for now,
// and assume that other tests will end up verifying needed behavior.
#[test]
fn should_calculate_capacity_for_buffer() {
let buffer = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap();
assert_eq!(buffer.capacity(), CAPACITY as IndexT);
}
#[test]
fn should_throw_exception_for_capacity_that_is_not_power_of_two() {
let bytes = vec![0u8; 777 + buffer_descriptor::TRAILER_LENGTH as usize];
assert!(BroadcastReceiver::new(bytes).is_err());
}
#[test]
fn should_not_be_lapped_before_reception() {
let receiver = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap();
assert_eq!(receiver.lapped_count(), 0);
}
#[test]
fn should_not_receive_from_empty_buffer() {
let mut receiver = BroadcastReceiver::new(vec![0u8; TOTAL_BUFFER_LENGTH]).unwrap();
assert_eq!(receiver.receive_next(), Ok(false));
}