1
0
mirror of https://github.com/bspeice/aeron-rs synced 2024-12-21 21:38:09 -05:00

Implement claim_capacity

Pretty close to having write support ready
This commit is contained in:
Bradlee Speice 2019-10-04 23:53:58 -04:00
parent 818d2ad821
commit 3ae7e6176c
3 changed files with 292 additions and 59 deletions

View File

@ -5,6 +5,7 @@ use std::ops::Deref;
use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::atomic::{AtomicI64, Ordering};
use crate::util::{AeronError, IndexT, Result}; use crate::util::{AeronError, IndexT, Result};
use std::ptr::{read_volatile, write_volatile};
/// Wrapper for atomic operations around an underlying byte buffer /// Wrapper for atomic operations around an underlying byte buffer
pub struct AtomicBuffer<'a> { pub struct AtomicBuffer<'a> {
@ -25,18 +26,43 @@ impl<'a> AtomicBuffer<'a> {
AtomicBuffer { buffer } AtomicBuffer { buffer }
} }
/// Verify that an access of `size_of::<T>()` bytes starting at `offset`
/// stays inside the underlying buffer.
///
/// Returns `AeronError::OutOfBounds` for negative offsets or accesses that
/// would run past the end of the buffer.
fn bounds_check<T>(&self, offset: IndexT) -> Result<()> {
    // Compare via addition instead of `len - offset`: the original subtraction
    // underflows `usize` when `offset > len` (panic in debug builds, wraparound
    // — and a falsely-passing check — in release builds).
    if offset < 0 || (offset as usize) + size_of::<T>() > self.buffer.len() {
        Err(AeronError::OutOfBounds)
    } else {
        Ok(())
    }
}
#[allow(clippy::cast_ptr_alignment)] #[allow(clippy::cast_ptr_alignment)]
fn overlay<T>(&self, offset: IndexT) -> Result<&T> fn overlay<T>(&self, offset: IndexT) -> Result<&T>
where where
T: Sized, T: Sized,
{ {
if offset < 0 || self.buffer.len() - (offset as usize) < size_of::<T>() { self.bounds_check::<T>(offset).map(|_| {
Err(AeronError::OutOfBounds)
} else {
let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) };
let t: &T = unsafe { &*(offset_ptr as *const T) }; unsafe { &*(offset_ptr as *const T) }
Ok(t) })
} }
/// Perform a volatile read of a `T` out of the buffer at `offset`.
///
/// Returns `AeronError::OutOfBounds` when fewer than `size_of::<T>()` bytes
/// remain past `offset`.
fn overlay_volatile<T>(&self, offset: IndexT) -> Result<T>
where
    T: Copy,
{
    self.bounds_check::<T>(offset)?;
    // Bounds were validated above, so the offset pointer stays in-buffer.
    let src = unsafe { self.buffer.as_ptr().offset(offset as isize) } as *const T;
    Ok(unsafe { read_volatile(src) })
}
/// Perform a volatile write of `val` into the buffer at `offset`.
///
/// Returns `AeronError::OutOfBounds` when fewer than `size_of::<T>()` bytes
/// remain past `offset`; on success the value has been stored.
fn write_volatile<T>(&mut self, offset: IndexT, val: T) -> Result<()>
where
T: Copy,
{
self.bounds_check::<T>(offset).map(|_| {
// Bounds were checked above, so the offset pointer stays within the buffer.
let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) };
unsafe { write_volatile(offset_ptr as *mut T, val) };
})
} }
/// Atomically fetch the current value at an offset, and increment by delta /// Atomically fetch the current value at an offset, and increment by delta
@ -44,6 +70,32 @@ impl<'a> AtomicBuffer<'a> {
self.overlay::<AtomicI64>(offset) self.overlay::<AtomicI64>(offset)
.map(|a| a.fetch_add(delta, Ordering::SeqCst)) .map(|a| a.fetch_add(delta, Ordering::SeqCst))
} }
/// Perform a volatile read of an `i64` from the buffer.
///
/// Returns `AeronError::OutOfBounds` when fewer than 8 bytes remain past `offset`.
pub fn get_i64_volatile(&self, offset: IndexT) -> Result<i64> {
// QUESTION: Would it be better to express this in terms of an atomic read?
self.overlay_volatile::<i64>(offset)
}
/// Perform a volatile write of an `i64` into the buffer.
///
/// Returns `AeronError::OutOfBounds` when fewer than 8 bytes remain past `offset`.
// NOTE(review): this delegates to a plain volatile store, which is weaker than
// the release-ordered atomic store an "ordered put" implies in Aeron C++ —
// confirm the intended memory ordering.
pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> {
self.write_volatile::<i64>(offset, val)
}
/// Compare an expected value with what is in memory, and if it matches,
/// update to a new value. Returns `Ok(true)` if the update was successful,
/// and `Ok(false)` if the update failed.
///
/// Returns `AeronError::OutOfBounds` when fewer than 8 bytes remain past `offset`.
pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result<bool> {
    // QUESTION: Do I need a volatile and atomic read here?
    // Aeron C++ issues a volatile read before the atomic operation, but that
    // looks redundant: Rust's `read_volatile` hands back a *copy*, and running
    // `compare_exchange` against a copy rather than the shared address would
    // itself introduce a race.
    let atomic = self.overlay::<AtomicI64>(offset)?;
    let swapped = atomic
        .compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst)
        .is_ok();
    Ok(swapped)
}
} }
#[cfg(test)] #[cfg(test)]
@ -123,4 +175,37 @@ mod tests {
Err(AeronError::OutOfBounds) Err(AeronError::OutOfBounds)
) )
} }
#[test]
fn put_i64() {
    // Round-trip: write through the ordered put, read back with a volatile get.
    let mut buf = [0u8; 8];
    let mut atomic_buf = AtomicBuffer::wrap(&mut buf);

    atomic_buf.put_i64_ordered(0, 12).unwrap();
    assert_eq!(Ok(12), atomic_buf.get_i64_volatile(0));
}
#[test]
fn compare_set_i64() {
    let mut buf = [0u8; 8];
    let atomic_buf = AtomicBuffer::wrap(&mut buf);

    // Bump the stored value to 1 so a CAS expecting 0 must fail...
    atomic_buf.get_and_add_i64(0, 1).unwrap();
    assert_eq!(Ok(false), atomic_buf.compare_and_set_i64(0, 0, 1));

    // ...while a CAS expecting 1 succeeds and installs 2.
    assert_eq!(Ok(true), atomic_buf.compare_and_set_i64(0, 1, 2));
    assert_eq!(Ok(2), atomic_buf.get_i64_volatile(0));
}
} }

View File

@ -1,58 +1,35 @@
//! Ring buffer wrapper for communicating with the Media Driver //! Ring buffer wrapper for communicating with the Media Driver
use crate::client::concurrent::atomic_buffer::AtomicBuffer; use crate::client::concurrent::atomic_buffer::AtomicBuffer;
use crate::util::{IndexT, Result}; use crate::util::{AeronError, IndexT, Result};
/// Description of the Ring Buffer schema. Each Ring Buffer looks like: /// Description of the Ring Buffer schema.
/// pub mod buffer_descriptor {
/// ```text
/// 0 1 2 3
/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// | Buffer Data ...
/// ... |
/// +---------------------------------------------------------------+
/// | |
/// | Tail Position |
/// | |
/// | |
/// +---------------------------------------------------------------+
/// | |
/// | Head Cache Position |
/// | |
/// | |
/// +---------------------------------------------------------------+
/// | |
/// | Head Position |
/// | |
/// | |
/// +---------------------------------------------------------------+
/// | |
/// | Correlation ID Counter |
/// | |
/// | |
/// +---------------------------------------------------------------+
/// | |
/// | Consumer Heartbeat |
/// | |
/// | |
/// +---------------------------------------------------------------+
/// ```
pub mod descriptor {
use crate::client::concurrent::atomic_buffer::AtomicBuffer; use crate::client::concurrent::atomic_buffer::AtomicBuffer;
use crate::util::AeronError::IllegalArgument; use crate::util::AeronError::IllegalArgument;
use crate::util::{is_power_of_two, IndexT, Result, CACHE_LINE_LENGTH}; use crate::util::{is_power_of_two, IndexT, Result, CACHE_LINE_LENGTH};
/// Offset of the correlation id counter, as measured in bytes past // QUESTION: Why are these offsets so large when we only ever use i64 types?
/// the start of the ring buffer metadata trailer
pub const CORRELATION_COUNTER_OFFSET: usize = CACHE_LINE_LENGTH * 8;
/// Total size of the ring buffer metadata trailer /// Offset in the ring buffer metadata to the end of the most recent record.
pub const TRAILER_LENGTH: usize = CACHE_LINE_LENGTH * 12; pub const TAIL_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 2) as IndexT;
/// QUESTION: Why the distinction between HEAD_CACHE and HEAD?
pub const HEAD_CACHE_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 4) as IndexT;
/// Offset in the ring buffer metadata to index of the next record to read.
pub const HEAD_POSITION_OFFSET: IndexT = (CACHE_LINE_LENGTH * 6) as IndexT;
/// Offset of the correlation id counter, as measured in bytes past
/// the start of the ring buffer metadata trailer.
pub const CORRELATION_COUNTER_OFFSET: IndexT = (CACHE_LINE_LENGTH * 8) as IndexT;
/// Total size of the ring buffer metadata trailer.
pub const TRAILER_LENGTH: IndexT = (CACHE_LINE_LENGTH * 12) as IndexT;
/// Verify the capacity of a buffer is legal for use as a ring buffer. /// Verify the capacity of a buffer is legal for use as a ring buffer.
/// Returns the actual buffer capacity once ring buffer metadata has been removed. /// Returns the actual capacity excluding ring buffer metadata.
pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result<IndexT> { pub fn check_capacity(buffer: &AtomicBuffer<'_>) -> Result<IndexT> {
let capacity = (buffer.len() - TRAILER_LENGTH) as IndexT; let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT;
if is_power_of_two(capacity) { if is_power_of_two(capacity) {
Ok(capacity) Ok(capacity)
} else { } else {
@ -61,21 +38,189 @@ pub mod descriptor {
} }
} }
/// Ring buffer message header. Made up of fields for message length, message type,
/// and then the encoded message.
///
/// Writing the record length signals the message recording is complete, and all
/// associated ring buffer metadata has been properly updated.
///
/// ```text
/// 0 1 2 3
/// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// |R| Record Length |
/// +-+-------------------------------------------------------------+
/// | Type |
/// +---------------------------------------------------------------+
/// | Encoded Message ...
///... |
/// +---------------------------------------------------------------+
/// ```
pub mod record_descriptor {
use crate::util::IndexT;
use std::mem::size_of;
/// Size of the ring buffer record header.
pub const HEADER_LENGTH: IndexT = size_of::<i32>() as IndexT * 2;
/// Message type indicating to the media driver that space has been reserved,
/// and is not yet ready for processing.
pub const PADDING_MSG_TYPE_ID: i32 = -1;
/// Retrieve the header bits for a ring buffer record: the message type in the
/// high 32 bits, the record length in the low 32 bits.
pub fn make_header(length: i32, msg_type_id: i32) -> i64 {
    // Widen each half through `u32` so negative values don't sign-extend into
    // the other half, then pack type above length (answers the "can't we just
    // cast?" question: yes, the mask-and-shift dance isn't needed).
    let high = u64::from(msg_type_id as u32) << 32;
    let low = u64::from(length as u32);
    (high | low) as i64
}
}
/// Multi-producer, single-consumer ring buffer implementation. /// Multi-producer, single-consumer ring buffer implementation.
pub struct ManyToOneRingBuffer<'a> { pub struct ManyToOneRingBuffer<'a> {
_buffer: AtomicBuffer<'a>, buffer: AtomicBuffer<'a>,
_capacity: IndexT, capacity: IndexT,
_correlation_counter_offset: IndexT, tail_position_index: IndexT,
head_cache_position_index: IndexT,
head_position_index: IndexT,
_correlation_id_counter_index: IndexT,
} }
impl<'a> ManyToOneRingBuffer<'a> { impl<'a> ManyToOneRingBuffer<'a> {
/// Create a many-to-one ring buffer from an underlying atomic buffer /// Create a many-to-one ring buffer from an underlying atomic buffer.
pub fn wrap(buffer: AtomicBuffer<'a>) -> Result<Self> { pub fn wrap(buffer: AtomicBuffer<'a>) -> Result<Self> {
descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer {
_buffer: buffer, buffer,
_capacity: capacity, capacity,
_correlation_counter_offset: capacity tail_position_index: capacity + buffer_descriptor::TAIL_POSITION_OFFSET,
+ descriptor::CORRELATION_COUNTER_OFFSET as IndexT, head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET,
head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET,
_correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET,
}) })
} }
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
/// at which to start writing the next record.
///
/// Returns `AeronError::InsufficientCapacity` when `required` bytes can not be
/// reserved even after refreshing the cached consumer head position.
// TODO: Shouldn't be `pub`, just trying to avoid warnings
pub fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> {
    // Capacity is verified to be a power of two during initialization, so this
    // mask is how indices wrap around the "ring" of the ring buffer.
    let mask = self.capacity - 1;
    // UNWRAP: Known-valid offset calculated during initialization
    let mut head = self
        .buffer
        .get_i64_volatile(self.head_cache_position_index)
        .unwrap();
    let mut tail: i64;
    let mut tail_index: IndexT;
    let mut padding: IndexT;
    // Note the braces, making this a do-while loop: keep retrying until our
    // compare-and-set on the tail position succeeds.
    while {
        // UNWRAP: Known-valid offset calculated during initialization
        tail = self
            .buffer
            .get_i64_volatile(self.tail_position_index)
            .unwrap();
        let available_capacity = self.capacity - (tail - head) as IndexT;
        if required > available_capacity {
            // The cached head may be stale; re-read the real head position
            // before concluding there is no room.
            // UNWRAP: Known-valid offset calculated during initialization
            head = self
                .buffer
                .get_i64_volatile(self.head_position_index)
                .unwrap();
            if required > (self.capacity - (tail - head) as IndexT) {
                return Err(AeronError::InsufficientCapacity);
            }
            // UNWRAP: Known-valid offset calculated during initialization
            self.buffer
                .put_i64_ordered(self.head_cache_position_index, head)
                .unwrap();
        }
        padding = 0;
        // Because we assume `tail` and `mask` are always positive integers,
        // it's "safe" to widen the types and bitmask below. We're just trying
        // to imitate C++ here.
        tail_index = (tail & i64::from(mask)) as IndexT;
        let to_buffer_end_length = self.capacity - tail_index;
        if required > to_buffer_end_length {
            // The record would wrap past the physical end of the buffer, so it
            // must restart at offset 0 with the gap covered by a padding record.
            let mut head_index = (head & i64::from(mask)) as IndexT;
            if required > head_index {
                // UNWRAP: Known-valid offset calculated during initialization
                head = self
                    .buffer
                    .get_i64_volatile(self.head_position_index)
                    .unwrap();
                head_index = (head & i64::from(mask)) as IndexT;
                if required > head_index {
                    return Err(AeronError::InsufficientCapacity);
                }
                // UNWRAP: Known-valid offset calculated during initialization
                self.buffer
                    .put_i64_ordered(self.head_cache_position_index, head)
                    .unwrap();
            }
            padding = to_buffer_end_length;
        }
        // UNWRAP: Known-valid offset calculated during initialization
        !self
            .buffer
            .compare_and_set_i64(
                self.tail_position_index,
                tail,
                tail + i64::from(required) + i64::from(padding),
            )
            .unwrap()
    } {}
    if padding != 0 {
        // Mark the tail-end gap as a padding record so the consumer skips it,
        // then hand the caller the start of the buffer.
        // UNWRAP: Known-valid offset calculated during initialization
        self.buffer
            .put_i64_ordered(
                tail_index,
                record_descriptor::make_header(padding, record_descriptor::PADDING_MSG_TYPE_ID),
            )
            .unwrap();
        tail_index = 0;
    }
    Ok(tail_index)
}
}
#[cfg(test)]
mod tests {
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer;
#[test]
fn basic_claim_space() {
    // Room for the metadata trailer plus a 64-byte (power-of-two) data section.
    let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64;
    let mut buf = vec![0u8; buf_size];
    let atomic_buf = AtomicBuffer::wrap(&mut buf);
    let mut ring_buf = ManyToOneRingBuffer::wrap(atomic_buf).unwrap();

    // The first claim starts at offset 0 and advances the tail to 16...
    ring_buf.claim_capacity(16).unwrap();
    assert_eq!(
        Ok(16),
        ring_buf.buffer.get_i64_volatile(ring_buf.tail_position_index)
    );

    // ...so the second claim begins exactly where the first one ended.
    let write_start = ring_buf.claim_capacity(16).unwrap();
    assert_eq!(16, write_start);
}
} }

View File

@ -3,6 +3,7 @@
/// Helper type to indicate indexing operations in Aeron, Synonymous with the /// Helper type to indicate indexing operations in Aeron, Synonymous with the
/// Aeron C++ `index_t` type. Used to imitate the Java API. /// Aeron C++ `index_t` type. Used to imitate the Java API.
// QUESTION: Can this just be updated to be `usize` in Rust?
pub type IndexT = i32; pub type IndexT = i32;
/// Helper method for quick verification that `IndexT` is a positive power of two /// Helper method for quick verification that `IndexT` is a positive power of two
@ -21,6 +22,8 @@ pub enum AeronError {
IllegalArgument, IllegalArgument,
/// Indication that a memory access would exceed the allowable bounds /// Indication that a memory access would exceed the allowable bounds
OutOfBounds, OutOfBounds,
/// Indication that a buffer operation could not complete because of space constraints
InsufficientCapacity,
} }
/// Result type for operations in the Aeron client /// Result type for operations in the Aeron client