From ed766ce86bbfdf64ffedfd2e8e1216593e4c6dc3 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 2 Nov 2019 15:40:52 -0400 Subject: [PATCH] `write` now working again as well --- aeron-rs/src/client/concurrent/mod.rs | 45 ++++++++++++++++++ aeron-rs/src/client/concurrent/ringbuffer.rs | 48 ++++++++++++-------- 2 files changed, 73 insertions(+), 20 deletions(-) diff --git a/aeron-rs/src/client/concurrent/mod.rs b/aeron-rs/src/client/concurrent/mod.rs index e4339fc..679163e 100644 --- a/aeron-rs/src/client/concurrent/mod.rs +++ b/aeron-rs/src/client/concurrent/mod.rs @@ -165,6 +165,51 @@ pub trait AtomicBuffer: Deref + DerefMut { fn put_i64_ordered(&mut self, offset: IndexT, value: i64) -> Result<()> { self.write_volatile::(offset, value) } + + /// Write the contents of one buffer to another. Does not perform any synchronization + fn put_bytes( + &mut self, + index: IndexT, + source: &B, + source_index: IndexT, + len: IndexT, + ) -> Result<()> + where + B: AtomicBuffer, + { + self.bounds_check(index, len)?; + source.bounds_check(source_index, len)?; + + let index = index as usize; + let source_index = source_index as usize; + let len = len as usize; + + self[index..index + len].copy_from_slice(&source[source_index..source_index + len]); + Ok(()) + } + + /// Perform a volatile read of an `i32` from the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let buffer = vec![0, 12, 0, 0, 0]; + /// assert_eq!(buffer.get_i32_volatile(1), Ok(12)); + /// ``` + fn get_i32_volatile(&self, offset: IndexT) -> Result { + self.overlay_volatile::(offset) + } + + /// Perform a volatile write of an `i32` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::AtomicBuffer; + /// let mut bytes = vec![0u8; 4]; + /// bytes.put_i32_ordered(0, 12); + /// assert_eq!(bytes.get_i32_volatile(0), Ok(12)); + /// ``` + fn put_i32_ordered(&mut self, offset: IndexT, value: i32) -> Result<()> { + self.write_volatile::(offset, value) + } } impl AtomicBuffer for Vec {} diff --git a/aeron-rs/src/client/concurrent/ringbuffer.rs b/aeron-rs/src/client/concurrent/ringbuffer.rs index e74d577..016dcdc 100644 --- a/aeron-rs/src/client/concurrent/ringbuffer.rs +++ b/aeron-rs/src/client/concurrent/ringbuffer.rs @@ -2,6 +2,7 @@ use crate::client::concurrent::AtomicBuffer; use crate::util::bit::align; use crate::util::{bit, AeronError, IndexT, Result}; +use std::ops::Deref; /// Description of the Ring Buffer schema. pub mod buffer_descriptor { @@ -149,7 +150,6 @@ where .unwrap() } - /* /// Write a message into the ring buffer pub fn write( &mut self, @@ -159,7 +159,7 @@ where length: IndexT, ) -> Result<()> where - B: AtomicBuffer + B: AtomicBuffer, { record_descriptor::check_msg_type_id(msg_type_id)?; self.check_msg_length(length)?; @@ -192,6 +192,7 @@ where Ok(()) } + /* /// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit` pub fn read(&mut self, mut handler: F, message_count_limit: usize) -> Result where @@ -366,20 +367,31 @@ where } } +impl Deref for ManyToOneRingBuffer +where + A: AtomicBuffer, +{ + type Target = A; + + fn deref(&self) -> &Self::Target { + &self.buffer + } +} + #[cfg(test)] mod tests { - use crate::client::concurrent::AtomicBuffer; use crate::client::concurrent::ringbuffer::{ buffer_descriptor, record_descriptor, ManyToOneRingBuffer, }; + use crate::client::concurrent::AtomicBuffer; use crate::util::IndexT; use std::mem::size_of; + const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize; + #[test] fn claim_capacity_owned() { - let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; - let mut buf = vec![0u8; buf_size]; - let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap(); + let mut ring_buf = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).unwrap(); ring_buf.claim_capacity(16).unwrap(); assert_eq!( @@ -393,11 +405,9 @@ mod tests { assert_eq!(write_start, 16); } - const TEST_BUFFER_SIZE: usize = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; - #[test] fn claim_capacity_shared() { - let mut buf = &mut [0u8; TEST_BUFFER_SIZE][..]; + let buf = &mut [0u8; BUFFER_SIZE][..]; let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap(); ring_buf.claim_capacity(16).unwrap(); @@ -412,34 +422,32 @@ mod tests { assert_eq!(write_start, 16); } - /* #[test] fn write_basic() { - let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; - let buffer = AtomicBuffer::wrap(&mut bytes); - let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + let mut ring_buffer = + ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size"); - let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let source_bytes = &mut [12u8, 0, 0, 0][..]; let source_len = source_bytes.len() as IndexT; - let source_buffer = AtomicBuffer::wrap(&mut source_bytes); let type_id = 1; ring_buffer - .write(type_id, &source_buffer, 0, source_len) + .write(type_id, &source_bytes, 0, source_len) .unwrap(); - drop(ring_buffer); - let buffer = AtomicBuffer::wrap(&mut bytes); let record_len = source_len + record_descriptor::HEADER_LENGTH; assert_eq!( - buffer.get_i64_volatile(0).unwrap(), + ring_buffer.get_i64_volatile(0).unwrap(), record_descriptor::make_header(record_len, type_id) ); assert_eq!( - buffer.get_i64_volatile(size_of::() as IndexT).unwrap(), + ring_buffer + .get_i64_volatile(size_of::() as IndexT) + .unwrap(), 12 ); } + /* #[test] fn read_basic() { // Similar to write basic, put something into the buffer