1
0
mirror of https://github.com/bspeice/aeron-rs synced 2025-01-04 11:59:35 -05:00

Finish the buffer read messages

This commit is contained in:
Bradlee Speice 2019-11-02 20:21:15 -04:00
parent bf3a39a46f
commit 47a102b063
3 changed files with 218 additions and 62 deletions

View File

@ -257,6 +257,18 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
self.write_volatile::<i32>(offset, value) self.write_volatile::<i32>(offset, value)
} }
/// Write an `i32` value into the buffer without performing any synchronization
///
/// ```rust
/// # use aeron_rs::client::concurrent::AtomicBuffer;
/// let mut buffer = vec![0u8; 5];
/// buffer.put_i32(0, 255 + 1);
/// assert_eq!(buffer.get_i32(1), Ok(1));
/// ```
fn put_i32(&mut self, offset: IndexT, value: i32) -> Result<()> {
self.overlay_mut::<i32>(offset).map(|i| *i = value)
}
/// Return the total number of bytes in this buffer /// Return the total number of bytes in this buffer
fn capacity(&self) -> IndexT { fn capacity(&self) -> IndexT {
self.len() as IndexT self.len() as IndexT

View File

@ -210,16 +210,18 @@ where
} }
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
pub fn read<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize> ///
/// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler
/// function; by contrast, the Rust API makes no attempt to catch panics and currently
/// has no way of stopping reading once started.
// QUESTION: Is there a better way to handle dispatching the handler function?
// We can't give it a `&dyn AtomicBuffer` because of the monomorphized generic functions,
// don't know if having a separate handler trait would be helpful.
pub fn read_n<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
where where
F: FnMut(i32, &A, IndexT, IndexT) -> (), F: FnMut(i32, &A, IndexT, IndexT) -> (),
{ {
// QUESTION: Should I implement the `get_i64` method that C++ uses? let head = self.buffer.get_i64(self.head_position_index)?;
// UNWRAP: Bounds check performed during buffer creation
let head = self
.buffer
.get_i64_volatile(self.head_position_index)
.unwrap();
let head_index = (head & i64::from(self.capacity - 1)) as i32; let head_index = (head & i64::from(self.capacity - 1)) as i32;
let contiguous_block_length = self.capacity - head_index; let contiguous_block_length = self.capacity - head_index;
let mut messages_read = 0; let mut messages_read = 0;
@ -262,6 +264,9 @@ where
// in Rust (since the main operation also needs mutable access to self). // in Rust (since the main operation also needs mutable access to self).
let mut cleanup = || { let mut cleanup = || {
if bytes_read != 0 { if bytes_read != 0 {
// UNWRAP: Need to justify this one.
// Should be safe because we've already done length checks, but I want
// to spend some more time thinking about it.
self.buffer self.buffer
.set_memory(head_index, bytes_read as usize, 0) .set_memory(head_index, bytes_read as usize, 0)
.unwrap(); .unwrap();
@ -278,6 +283,18 @@ where
Ok(messages_read) Ok(messages_read)
} }
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
///
/// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler
/// function; by contrast, the Rust API makes no attempt to catch panics and currently
/// has no way of stopping reading once started.
pub fn read<F>(&mut self, handler: F) -> Result<usize>
where
F: FnMut(i32, &A, IndexT, IndexT) -> (),
{
self.read_n(handler, usize::max_value())
}
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
/// at which to start writing the next record. /// at which to start writing the next record.
fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> { fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> {
@ -390,10 +407,8 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::client::concurrent::ringbuffer::{record_descriptor, ManyToOneRingBuffer}; use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer;
use crate::client::concurrent::AtomicBuffer; use crate::client::concurrent::AtomicBuffer;
use crate::util::IndexT;
use std::mem::size_of;
const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize; const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize;
@ -429,56 +444,4 @@ mod tests {
let write_start = ring_buf.claim_capacity(16).unwrap(); let write_start = ring_buf.claim_capacity(16).unwrap();
assert_eq!(write_start, 16); assert_eq!(write_start, 16);
} }
#[test]
fn read_basic() {
// Similar to write basic, put something into the buffer
let mut ring_buffer =
ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
let source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
let source_len = source_buffer.len() as IndexT;
let type_id = 1;
ring_buffer
.write(type_id, &source_buffer, 0, source_len)
.unwrap();
// Now we can start the actual read process
let c = |_, buf: &Vec<u8>, offset, _| assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12);
ring_buffer.read(c, 1).unwrap();
// Make sure that the buffer was zeroed on finish
for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) {
assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0);
}
}
#[test]
fn read_multiple() {
let mut ring_buffer =
ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
let source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
let source_len = source_buffer.len() as IndexT;
let type_id = 1;
ring_buffer
.write(type_id, &source_buffer, 0, source_len)
.unwrap();
ring_buffer
.write(type_id, &source_buffer, 0, source_len)
.unwrap();
let mut msg_count = 0;
let c = |_, buf: &Vec<u8>, offset, _| {
msg_count += 1;
assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12);
};
ring_buffer.read(c, 2).unwrap();
assert_eq!(msg_count, 2);
// Make sure that the buffer was zeroed on finish
for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) {
assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0);
}
}
} }

View File

@ -198,3 +198,184 @@ fn should_insert_padding_record_plus_message_on_buffer_wrap_with_head_equal_to_t
Ok((tail + aligned_record_length + record_descriptor::ALIGNMENT) as i64) Ok((tail + aligned_record_length + record_descriptor::ALIGNMENT) as i64)
); );
} }
#[test]
fn should_read_single_message() {
let length: IndexT = 8;
let head: IndexT = 0;
let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
let aligned_record_length: IndexT = align(
record_length as usize,
record_descriptor::ALIGNMENT as usize,
) as IndexT;
let tail: IndexT = aligned_record_length;
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
buffer
.put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
.unwrap();
buffer
.put_i32(record_descriptor::length_offset(0), record_length)
.unwrap();
let mut times_called = 0;
let closure = |_, _buf: &Vec<u8>, _, _| {
times_called += 1;
};
let messages_read = buffer.read(closure);
assert_eq!(messages_read, Ok(1));
assert_eq!(times_called, 1);
assert_eq!(
buffer.get_i64(HEAD_COUNTER_INDEX),
Ok((head + aligned_record_length) as i64)
);
for i in (0..record_descriptor::ALIGNMENT).step_by(4) {
assert_eq!(buffer.get_i32(i), Ok(0));
}
}
#[test]
fn should_not_read_single_message_part_way_through_writing() {
let length: IndexT = 8;
let head: IndexT = 0;
let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
let aligned_record_length: IndexT = align(
record_length as usize,
record_descriptor::ALIGNMENT as usize,
) as IndexT;
let end_tail: IndexT = aligned_record_length;
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
buffer.put_i64(TAIL_COUNTER_INDEX, end_tail as i64).unwrap();
buffer
.put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
.unwrap();
buffer
.put_i32(record_descriptor::length_offset(0), -record_length)
.unwrap();
let mut times_called = 0;
let closure = |_, _buf: &Vec<u8>, _, _| {
times_called += 1;
};
let messages_read = buffer.read(closure);
assert_eq!(messages_read, Ok(0));
assert_eq!(times_called, 0);
assert_eq!(buffer.get_i64(HEAD_COUNTER_INDEX), Ok(head as i64));
}
#[test]
fn should_read_two_messages() {
let length: IndexT = 8;
let head: IndexT = 0;
let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
let aligned_record_length: IndexT = align(
record_length as usize,
record_descriptor::ALIGNMENT as usize,
) as IndexT;
let tail: IndexT = aligned_record_length * 2;
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
buffer
.put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
.unwrap();
buffer
.put_i32(record_descriptor::length_offset(0), record_length)
.unwrap();
buffer
.put_i32(
record_descriptor::type_offset(0 + aligned_record_length),
MSG_TYPE_ID,
)
.unwrap();
buffer
.put_i32(
record_descriptor::length_offset(0 + aligned_record_length),
record_length,
)
.unwrap();
let mut times_called = 0;
let closure = |_, _buf: &Vec<u8>, _, _| {
times_called += 1;
};
let messages_read = buffer.read(closure);
assert_eq!(messages_read, Ok(2));
assert_eq!(times_called, 2);
assert_eq!(
buffer.get_i64(HEAD_COUNTER_INDEX),
Ok((head + aligned_record_length * 2) as i64)
);
for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) {
assert_eq!(buffer.get_i32(i), Ok(0));
}
}
#[test]
fn should_limit_read_of_messages() {
let length: IndexT = 8;
let head: IndexT = 0;
let record_length: IndexT = length + record_descriptor::HEADER_LENGTH;
let aligned_record_length: IndexT = align(
record_length as usize,
record_descriptor::ALIGNMENT as usize,
) as IndexT;
let tail: IndexT = aligned_record_length * 2;
let mut buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SZ]).unwrap();
buffer.put_i64(HEAD_COUNTER_INDEX, head as i64).unwrap();
buffer.put_i64(TAIL_COUNTER_INDEX, tail as i64).unwrap();
buffer
.put_i32(record_descriptor::type_offset(0), MSG_TYPE_ID)
.unwrap();
buffer
.put_i32(record_descriptor::length_offset(0), record_length)
.unwrap();
buffer
.put_i32(
record_descriptor::type_offset(0 + aligned_record_length),
MSG_TYPE_ID,
)
.unwrap();
buffer
.put_i32(
record_descriptor::length_offset(0 + aligned_record_length),
record_length,
)
.unwrap();
let mut times_called = 0;
let closure = |_, _buf: &Vec<u8>, _, _| {
times_called += 1;
};
let messages_read = buffer.read_n(closure, 1);
assert_eq!(messages_read, Ok(1));
assert_eq!(times_called, 1);
assert_eq!(
buffer.get_i64(HEAD_COUNTER_INDEX),
Ok((head + aligned_record_length) as i64)
);
for i in (0..record_descriptor::ALIGNMENT).step_by(4) {
assert_eq!(buffer.get_i32(i), Ok(0));
}
assert_eq!(
buffer.get_i32(record_descriptor::length_offset(aligned_record_length)),
Ok(record_length)
);
}