1
0
mirror of https://github.com/bspeice/aeron-rs synced 2024-12-24 23:08:09 -05:00

Don't give the full buffer to handler functions

Much simpler from an API perspective, unlikely to be a real performance issue.
This commit is contained in:
Bradlee Speice 2019-11-02 20:34:36 -04:00
parent f6dedbe268
commit 79e8c26a23
2 changed files with 13 additions and 16 deletions

View File

@ -209,17 +209,15 @@ where
Ok(true) Ok(true)
} }
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`.
/// The handler is given the message type identifier and message body as arguments.
/// ///
/// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler /// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler
/// function; by contrast, the Rust API makes no attempt to catch panics and currently /// function; by contrast, the Rust API makes no attempt to catch panics and currently
/// has no way of stopping reading once started. /// has no way of stopping reading once started.
// QUESTION: Is there a better way to handle dispatching the handler function?
// We can't give it a `&dyn AtomicBuffer` because of the monomorphized generic functions,
// don't know if having a separate handler trait would be helpful.
pub fn read_n<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize> pub fn read_n<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
where where
F: FnMut(i32, &A, IndexT, IndexT) -> (), F: FnMut(i32, &[u8]) -> (),
{ {
let head = self.buffer.get_i64(self.head_position_index)?; let head = self.buffer.get_i64(self.head_position_index)?;
let head_index = (head & i64::from(self.capacity - 1)) as i32; let head_index = (head & i64::from(self.capacity - 1)) as i32;
@ -249,12 +247,10 @@ where
} }
messages_read += 1; messages_read += 1;
handler( let msg_start = record_descriptor::encoded_msg_offset(record_index) as usize;
msg_type_id, let msg_end =
&self.buffer, msg_start + (record_length - record_descriptor::HEADER_LENGTH) as usize;
record_descriptor::encoded_msg_offset(record_index), handler(msg_type_id, &self.buffer[msg_start..msg_end]);
record_length - record_descriptor::HEADER_LENGTH,
);
} }
Ok(()) Ok(())
})(); })();
@ -284,13 +280,14 @@ where
} }
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
/// The handler is given the message type identifier and message body as arguments.
/// ///
/// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler /// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler
/// function; by contrast, the Rust API makes no attempt to catch panics and currently /// function; by contrast, the Rust API makes no attempt to catch panics and currently
/// has no way of stopping reading once started. /// has no way of stopping reading once started.
pub fn read<F>(&mut self, handler: F) -> Result<usize> pub fn read<F>(&mut self, handler: F) -> Result<usize>
where where
F: FnMut(i32, &A, IndexT, IndexT) -> (), F: FnMut(i32, &[u8]) -> (),
{ {
self.read_n(handler, usize::max_value()) self.read_n(handler, usize::max_value())
} }

View File

@ -222,7 +222,7 @@ fn should_read_single_message() {
.unwrap(); .unwrap();
let mut times_called = 0; let mut times_called = 0;
let closure = |_, _buf: &Vec<u8>, _, _| { let closure = |_, _: &[u8]| {
times_called += 1; times_called += 1;
}; };
let messages_read = buffer.read(closure); let messages_read = buffer.read(closure);
@ -260,7 +260,7 @@ fn should_not_read_single_message_part_way_through_writing() {
.unwrap(); .unwrap();
let mut times_called = 0; let mut times_called = 0;
let closure = |_, _buf: &Vec<u8>, _, _| { let closure = |_, _: &[u8]| {
times_called += 1; times_called += 1;
}; };
let messages_read = buffer.read(closure); let messages_read = buffer.read(closure);
@ -306,7 +306,7 @@ fn should_read_two_messages() {
.unwrap(); .unwrap();
let mut times_called = 0; let mut times_called = 0;
let closure = |_, _buf: &Vec<u8>, _, _| { let closure = |_, _: &[u8]| {
times_called += 1; times_called += 1;
}; };
let messages_read = buffer.read(closure); let messages_read = buffer.read(closure);
@ -359,7 +359,7 @@ fn should_limit_read_of_messages() {
.unwrap(); .unwrap();
let mut times_called = 0; let mut times_called = 0;
let closure = |_, _buf: &Vec<u8>, _, _| { let closure = |_, _: &[u8]| {
times_called += 1; times_called += 1;
}; };
let messages_read = buffer.read_n(closure, 1); let messages_read = buffer.read_n(closure, 1);