mirror of
https://github.com/bspeice/aeron-rs
synced 2024-12-21 21:38:09 -05:00
Merge #14
14: Implement reading from a ring buffer r=bspeice a=bspeice bors r+ Co-authored-by: Bradlee Speice <bradlee@speice.io>
This commit is contained in:
commit
ddf1774cae
@ -108,6 +108,11 @@ impl<'a> AtomicBuffer<'a> {
|
|||||||
self.overlay_volatile::<i64>(offset)
|
self.overlay_volatile::<i64>(offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the current value at an offset without using any synchronization operations
|
||||||
|
pub fn get_i64(&self, offset: IndexT) -> Result<i64> {
|
||||||
|
self.overlay::<i64>(offset).map(|i| *i)
|
||||||
|
}
|
||||||
|
|
||||||
/// Perform a volatile read
|
/// Perform a volatile read
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
@ -121,6 +126,11 @@ impl<'a> AtomicBuffer<'a> {
|
|||||||
self.overlay_volatile::<i32>(offset)
|
self.overlay_volatile::<i32>(offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the current value at an offset without using any synchronization operations
|
||||||
|
pub fn get_i32(&self, offset: IndexT) -> Result<i32> {
|
||||||
|
self.overlay::<i32>(offset).map(|i| *i)
|
||||||
|
}
|
||||||
|
|
||||||
/// Perform a volatile write of an `i64` into the buffer
|
/// Perform a volatile write of an `i64` into the buffer
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
@ -200,7 +210,7 @@ impl<'a> AtomicBuffer<'a> {
|
|||||||
/// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2));
|
/// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2));
|
||||||
/// ```
|
/// ```
|
||||||
pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result<bool> {
|
pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result<bool> {
|
||||||
// QUESTION: Do I need a volatile and atomic read here?
|
// QUESTION: Should I use a volatile read here as well?
|
||||||
// Aeron C++ uses a volatile read before the atomic operation, but I think that
|
// Aeron C++ uses a volatile read before the atomic operation, but I think that
|
||||||
// may be redundant. In addition, Rust's `read_volatile` operation returns a
|
// may be redundant. In addition, Rust's `read_volatile` operation returns a
|
||||||
// *copied* value; running `compare_exchange` on that copy introduces a race condition
|
// *copied* value; running `compare_exchange` on that copy introduces a race condition
|
||||||
@ -210,6 +220,16 @@ impl<'a> AtomicBuffer<'a> {
|
|||||||
.is_ok()
|
.is_ok()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`.
|
||||||
|
pub fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> {
|
||||||
|
self.bounds_check(offset, length as IndexT).map(|_| unsafe {
|
||||||
|
self.buffer
|
||||||
|
.as_mut_ptr()
|
||||||
|
.offset(offset as isize)
|
||||||
|
.write_bytes(value, length)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
//! Ring buffer wrapper for communicating with the Media Driver
|
//! Ring buffer wrapper for communicating with the Media Driver
|
||||||
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
|
use crate::util::bit::align;
|
||||||
use crate::util::{bit, AeronError, IndexT, Result};
|
use crate::util::{bit, AeronError, IndexT, Result};
|
||||||
|
|
||||||
/// Description of the Ring Buffer schema.
|
/// Description of the Ring Buffer schema.
|
||||||
@ -74,15 +75,13 @@ pub mod record_descriptor {
|
|||||||
/// and is not yet ready for processing.
|
/// and is not yet ready for processing.
|
||||||
pub const PADDING_MSG_TYPE_ID: i32 = -1;
|
pub const PADDING_MSG_TYPE_ID: i32 = -1;
|
||||||
|
|
||||||
/// Retrieve the header bits for a ring buffer record.
|
pub(super) fn make_header(length: i32, msg_type_id: i32) -> i64 {
|
||||||
pub fn make_header(length: i32, msg_type_id: i32) -> i64 {
|
|
||||||
// QUESTION: Instead of masking, can't we just cast and return u32/u64?
|
// QUESTION: Instead of masking, can't we just cast and return u32/u64?
|
||||||
// Smells like Java.
|
// Smells like Java.
|
||||||
((i64::from(msg_type_id) & 0xFFFF_FFFF) << 32) | (i64::from(length) & 0xFFFF_FFFF)
|
((i64::from(msg_type_id) & 0xFFFF_FFFF) << 32) | (i64::from(length) & 0xFFFF_FFFF)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Verify a message type identifier is safe for use
|
pub(super) fn check_msg_type_id(msg_type_id: i32) -> Result<()> {
|
||||||
pub fn check_msg_type_id(msg_type_id: i32) -> Result<()> {
|
|
||||||
if msg_type_id < 1 {
|
if msg_type_id < 1 {
|
||||||
Err(AeronError::IllegalArgument)
|
Err(AeronError::IllegalArgument)
|
||||||
} else {
|
} else {
|
||||||
@ -90,15 +89,21 @@ pub mod record_descriptor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch the offset to begin writing a message payload
|
pub(super) fn encoded_msg_offset(record_offset: IndexT) -> IndexT {
|
||||||
pub fn encoded_msg_offset(record_offset: IndexT) -> IndexT {
|
|
||||||
record_offset + HEADER_LENGTH
|
record_offset + HEADER_LENGTH
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch the offset to begin writing the message length
|
pub(super) fn length_offset(record_offset: IndexT) -> IndexT {
|
||||||
pub fn length_offset(record_offset: IndexT) -> IndexT {
|
|
||||||
record_offset
|
record_offset
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn record_length(header: i64) -> i32 {
|
||||||
|
header as i32
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn message_type_id(header: i64) -> i32 {
|
||||||
|
(header >> 32) as i32
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Multi-producer, single-consumer ring buffer implementation.
|
/// Multi-producer, single-consumer ring buffer implementation.
|
||||||
@ -174,6 +179,71 @@ impl<'a> ManyToOneRingBuffer<'a> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
|
||||||
|
pub fn read<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
|
||||||
|
where
|
||||||
|
F: FnMut(i32, &AtomicBuffer, IndexT, IndexT) -> (),
|
||||||
|
{
|
||||||
|
// UNWRAP: Bounds check performed during buffer creation
|
||||||
|
let head = self.buffer.get_i64(self.head_position_index).unwrap();
|
||||||
|
let head_index = (head & i64::from(self.capacity - 1)) as i32;
|
||||||
|
let contiguous_block_length = self.capacity - head_index;
|
||||||
|
let mut messages_read = 0;
|
||||||
|
let mut bytes_read: i32 = 0;
|
||||||
|
|
||||||
|
let result: Result<()> = (|| {
|
||||||
|
while bytes_read < contiguous_block_length && messages_read < message_count_limit {
|
||||||
|
let record_index = head_index + bytes_read;
|
||||||
|
let header = self.buffer.get_i64_volatile(record_index)?;
|
||||||
|
let record_length = record_descriptor::record_length(header);
|
||||||
|
|
||||||
|
if record_length <= 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
bytes_read += align(
|
||||||
|
record_length as usize,
|
||||||
|
record_descriptor::ALIGNMENT as usize,
|
||||||
|
) as i32;
|
||||||
|
|
||||||
|
let msg_type_id = record_descriptor::message_type_id(header);
|
||||||
|
if msg_type_id == record_descriptor::PADDING_MSG_TYPE_ID {
|
||||||
|
// QUESTION: Is this a spinlock on a writer finishing?
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
messages_read += 1;
|
||||||
|
handler(
|
||||||
|
msg_type_id,
|
||||||
|
&self.buffer,
|
||||||
|
record_descriptor::encoded_msg_offset(record_index),
|
||||||
|
record_length - record_descriptor::HEADER_LENGTH,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
})();
|
||||||
|
|
||||||
|
// C++ has much better semantics for handling cleanup like this; however, because
|
||||||
|
// it would require us to capture a mutable reference to self, it's not feasible
|
||||||
|
// in Rust (since the main operation also needs mutable access to self).
|
||||||
|
let mut cleanup = || {
|
||||||
|
if bytes_read != 0 {
|
||||||
|
self.buffer
|
||||||
|
.set_memory(head_index, bytes_read as usize, 0)
|
||||||
|
.unwrap();
|
||||||
|
self.buffer
|
||||||
|
.put_i64_ordered(self.head_position_index, head + i64::from(bytes_read))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
result.map(|_| cleanup()).map_err(|e| {
|
||||||
|
cleanup();
|
||||||
|
e
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(messages_read)
|
||||||
|
}
|
||||||
|
|
||||||
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
|
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
|
||||||
/// at which to start writing the next record.
|
/// at which to start writing the next record.
|
||||||
fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> {
|
fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> {
|
||||||
@ -337,4 +407,66 @@ mod tests {
|
|||||||
12
|
12
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn read_basic() {
|
||||||
|
// Similar to write basic, put something into the buffer
|
||||||
|
let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize];
|
||||||
|
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||||
|
let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
|
||||||
|
|
||||||
|
let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0];
|
||||||
|
let source_len = source_bytes.len() as IndexT;
|
||||||
|
let source_buffer = AtomicBuffer::wrap(&mut source_bytes);
|
||||||
|
let type_id = 1;
|
||||||
|
ring_buffer
|
||||||
|
.write(type_id, &source_buffer, 0, source_len)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Now we can start the actual read process
|
||||||
|
let c = |_, buf: &AtomicBuffer, offset, _| {
|
||||||
|
assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12)
|
||||||
|
};
|
||||||
|
ring_buffer.read(c, 1).unwrap();
|
||||||
|
|
||||||
|
// Make sure that the buffer was zeroed on finish
|
||||||
|
drop(ring_buffer);
|
||||||
|
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||||
|
for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) {
|
||||||
|
assert_eq!(buffer.get_i32(i).unwrap(), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn read_multiple() {
|
||||||
|
let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize];
|
||||||
|
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||||
|
let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
|
||||||
|
|
||||||
|
let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0];
|
||||||
|
let source_len = source_bytes.len() as IndexT;
|
||||||
|
let source_buffer = AtomicBuffer::wrap(&mut source_bytes);
|
||||||
|
let type_id = 1;
|
||||||
|
ring_buffer
|
||||||
|
.write(type_id, &source_buffer, 0, source_len)
|
||||||
|
.unwrap();
|
||||||
|
ring_buffer
|
||||||
|
.write(type_id, &source_buffer, 0, source_len)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut msg_count = 0;
|
||||||
|
let c = |_, buf: &AtomicBuffer, offset, _| {
|
||||||
|
msg_count += 1;
|
||||||
|
assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12);
|
||||||
|
};
|
||||||
|
ring_buffer.read(c, 2).unwrap();
|
||||||
|
assert_eq!(msg_count, 2);
|
||||||
|
|
||||||
|
// Make sure that the buffer was zeroed on finish
|
||||||
|
drop(ring_buffer);
|
||||||
|
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||||
|
for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) {
|
||||||
|
assert_eq!(buffer.get_i32(i).unwrap(), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user