diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index 51ef93e..68768b4 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -209,17 +209,15 @@ where Ok(true) } - /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` + /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`. + /// The handler is given the message type identifier and message body as arguments. /// /// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler /// function; by contrast, the Rust API makes no attempt to catch panics and currently /// has no way of stopping reading once started. - // QUESTION: Is there a better way to handle dispatching the handler function? - // We can't give it a `&dyn AtomicBuffer` because of the monomorphized generic functions, - // don't know if having a separate handler trait would be helpful. pub fn read_n(&mut self, mut handler: F, message_count_limit: usize) -> Result where - F: FnMut(i32, &A, IndexT, IndexT) -> (), + F: FnMut(i32, &[u8]) -> (), { let head = self.buffer.get_i64(self.head_position_index)?; let head_index = (head & i64::from(self.capacity - 1)) as i32; @@ -249,12 +247,10 @@ where } messages_read += 1; - handler( - msg_type_id, - &self.buffer, - record_descriptor::encoded_msg_offset(record_index), - record_length - record_descriptor::HEADER_LENGTH, - ); + let msg_start = record_descriptor::encoded_msg_offset(record_index) as usize; + let msg_end = + msg_start + (record_length - record_descriptor::HEADER_LENGTH) as usize; + handler(msg_type_id, &self.buffer[msg_start..msg_end]); } Ok(()) })(); @@ -284,13 +280,14 @@ where } /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` + /// The handler is given the message type identifier and message body as arguments. /// /// NOTE: The C++ API will stop reading and clean up if an exception is thrown in the handler /// function; by contrast, the Rust API makes no attempt to catch panics and currently /// has no way of stopping reading once started. pub fn read(&mut self, handler: F) -> Result where - F: FnMut(i32, &A, IndexT, IndexT) -> (), + F: FnMut(i32, &[u8]) -> (), { self.read_n(handler, usize::max_value()) } diff --git a/aeron-rs/tests/many_to_one_ring_buffer.rs b/aeron-rs/tests/many_to_one_ring_buffer.rs index 58d11ad..0d3d272 100644 --- a/aeron-rs/tests/many_to_one_ring_buffer.rs +++ b/aeron-rs/tests/many_to_one_ring_buffer.rs @@ -222,7 +222,7 @@ fn should_read_single_message() { .unwrap(); let mut times_called = 0; - let closure = |_, _buf: &Vec, _, _| { + let closure = |_, _: &[u8]| { times_called += 1; }; let messages_read = buffer.read(closure); @@ -260,7 +260,7 @@ fn should_not_read_single_message_part_way_through_writing() { .unwrap(); let mut times_called = 0; - let closure = |_, _buf: &Vec, _, _| { + let closure = |_, _: &[u8]| { times_called += 1; }; let messages_read = buffer.read(closure); @@ -306,7 +306,7 @@ fn should_read_two_messages() { .unwrap(); let mut times_called = 0; - let closure = |_, _buf: &Vec, _, _| { + let closure = |_, _: &[u8]| { times_called += 1; }; let messages_read = buffer.read(closure); @@ -359,7 +359,7 @@ fn should_limit_read_of_messages() { .unwrap(); let mut times_called = 0; - let closure = |_, _buf: &Vec, _, _| { + let closure = |_, _: &[u8]| { times_called += 1; }; let messages_read = buffer.read_n(closure, 1);