1
0
mirror of https://github.com/bspeice/aeron-rs synced 2024-12-21 21:38:09 -05:00

read now working again

This commit is contained in:
Bradlee Speice 2019-11-02 15:50:23 -04:00
parent ed766ce86b
commit 8fac817ba3
2 changed files with 18 additions and 22 deletions

View File

@ -188,6 +188,13 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
Ok(()) Ok(())
} }
/// Repeatedly write a value into an atomic buffer. Guaranteed to use `memset`.
fn set_memory(&mut self, offset: IndexT, length: usize, value: u8) -> Result<()> {
self.bounds_check(offset, length as IndexT).map(|_| unsafe {
self.as_mut_ptr().offset(offset as isize).write_bytes(value, length)
})
}
/// Perform a volatile read of an `i32` from the buffer /// Perform a volatile read of an `i32` from the buffer
/// ///
/// ```rust /// ```rust

View File

@ -192,14 +192,14 @@ where
Ok(()) Ok(())
} }
/*
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
pub fn read<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize> pub fn read<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
where where
F: FnMut(i32, &A, IndexT, IndexT) -> (), F: FnMut(i32, &A, IndexT, IndexT) -> (),
{ {
// QUESTION: Should I implement the `get_i64` method that C++ uses?
// UNWRAP: Bounds check performed during buffer creation // UNWRAP: Bounds check performed during buffer creation
let head = self.buffer.get_i64(self.head_position_index).unwrap(); let head = self.buffer.get_i64_volatile(self.head_position_index).unwrap();
let head_index = (head & i64::from(self.capacity - 1)) as i32; let head_index = (head & i64::from(self.capacity - 1)) as i32;
let contiguous_block_length = self.capacity - head_index; let contiguous_block_length = self.capacity - head_index;
let mut messages_read = 0; let mut messages_read = 0;
@ -257,7 +257,6 @@ where
Ok(messages_read) Ok(messages_read)
} }
*/
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
/// at which to start writing the next record. /// at which to start writing the next record.
@ -381,7 +380,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::client::concurrent::ringbuffer::{ use crate::client::concurrent::ringbuffer::{
buffer_descriptor, record_descriptor, ManyToOneRingBuffer, record_descriptor, ManyToOneRingBuffer,
}; };
use crate::client::concurrent::AtomicBuffer; use crate::client::concurrent::AtomicBuffer;
use crate::util::IndexT; use crate::util::IndexT;
@ -447,43 +446,36 @@ mod tests {
); );
} }
/*
#[test] #[test]
fn read_basic() { fn read_basic() {
// Similar to write basic, put something into the buffer // Similar to write basic, put something into the buffer
let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; let mut ring_buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
let buffer = AtomicBuffer::wrap(&mut bytes);
let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
let source_len = source_bytes.len() as IndexT; let source_len = source_buffer.len() as IndexT;
let type_id = 1; let type_id = 1;
ring_buffer ring_buffer
.write(type_id, &source_buffer, 0, source_len) .write(type_id, &source_buffer, 0, source_len)
.unwrap(); .unwrap();
// Now we can start the actual read process // Now we can start the actual read process
let c = |_, buf: &dyn AtomicBuffer, offset, _| { let c = |_, buf: &Vec<u8>, offset, _| {
assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12) assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12)
}; };
ring_buffer.read(c, 1).unwrap(); ring_buffer.read(c, 1).unwrap();
// Make sure that the buffer was zeroed on finish // Make sure that the buffer was zeroed on finish
drop(ring_buffer);
let buffer = AtomicBuffer::wrap(&mut bytes);
for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) { for i in (0..record_descriptor::ALIGNMENT * 1).step_by(4) {
assert_eq!(buffer.get_i32(i).unwrap(), 0); assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0);
} }
} }
#[test] #[test]
fn read_multiple() { fn read_multiple() {
let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; let mut ring_buffer = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
let buffer = AtomicBuffer::wrap(&mut bytes);
let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..]; let mut source_buffer = &mut [12u8, 0, 0, 0, 0, 0, 0, 0][..];
let source_len = source_bytes.len() as IndexT; let source_len = source_buffer.len() as IndexT;
let type_id = 1; let type_id = 1;
ring_buffer ring_buffer
.write(type_id, &source_buffer, 0, source_len) .write(type_id, &source_buffer, 0, source_len)
@ -493,7 +485,7 @@ mod tests {
.unwrap(); .unwrap();
let mut msg_count = 0; let mut msg_count = 0;
let c = |_, buf: &dyn AtomicBuffer, offset, _| { let c = |_, buf: &Vec<u8>, offset, _| {
msg_count += 1; msg_count += 1;
assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12); assert_eq!(buf.get_i64_volatile(offset).unwrap(), 12);
}; };
@ -501,11 +493,8 @@ mod tests {
assert_eq!(msg_count, 2); assert_eq!(msg_count, 2);
// Make sure that the buffer was zeroed on finish // Make sure that the buffer was zeroed on finish
drop(ring_buffer);
let buffer = AtomicBuffer::wrap(&mut bytes);
for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) { for i in (0..record_descriptor::ALIGNMENT * 2).step_by(4) {
assert_eq!(buffer.get_i32(i).unwrap(), 0); assert_eq!(ring_buffer.get_i32_volatile(i).unwrap(), 0);
} }
} }
*/
} }