diff --git a/Cargo.toml b/Cargo.toml index a8ab2f4..b999168 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ maintenance = { status = "actively-developed" } [dependencies] aeron_driver-sys = { path = "./aeron_driver-sys" } memmap = "0.7" +num = "0.2" [dev-dependencies] clap = "2.33" diff --git a/src/client/concurrent/atomic_buffer.rs b/src/client/concurrent/atomic_buffer.rs index 9c0abfc..c130a43 100644 --- a/src/client/concurrent/atomic_buffer.rs +++ b/src/client/concurrent/atomic_buffer.rs @@ -26,8 +26,8 @@ impl<'a> AtomicBuffer<'a> { AtomicBuffer { buffer } } - fn bounds_check(&self, offset: IndexT) -> Result<()> { - if offset < 0 || self.buffer.len() - (offset as usize) < size_of::() { + fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> { + if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size { Err(AeronError::OutOfBounds) } else { Ok(()) @@ -39,52 +39,164 @@ impl<'a> AtomicBuffer<'a> { where T: Sized, { - self.bounds_check::(offset).map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; - unsafe { &*(offset_ptr as *const T) } - }) + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + unsafe { &*(offset_ptr as *const T) } + }) } fn overlay_volatile(&self, offset: IndexT) -> Result where T: Copy, { - self.bounds_check::(offset).map(|_| { - let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; - unsafe { read_volatile(offset_ptr as *const T) } - }) + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) }; + unsafe { read_volatile(offset_ptr as *const T) } + }) } fn write_volatile(&mut self, offset: IndexT, val: T) -> Result<()> where T: Copy, { - self.bounds_check::(offset).map(|_| { - let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; - unsafe { write_volatile(offset_ptr as *mut T, val) }; - }) + self.bounds_check(offset, size_of::() as IndexT) + .map(|_| { + let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) }; + unsafe { write_volatile(offset_ptr as *mut T, val) }; + }) } /// Atomically fetch the current value at an offset, and increment by delta + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// # use aeron_rs::util::AeronError; + /// let mut bytes = [0u8; 9]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// // Simple case modifies only the first byte + /// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0)); + /// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1)); + /// + /// // Using an offset modifies the second byte + /// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0)); + /// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1)); + /// + /// // An offset of 2 means buffer size must be 10 to contain an `i64` + /// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds)); + /// ``` pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result { self.overlay::(offset) .map(|a| a.fetch_add(delta, Ordering::SeqCst)) } /// Perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + /// let buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` pub fn get_i64_volatile(&self, offset: IndexT) -> Result { // QUESTION: Would it be better to express this in terms of an atomic read? self.overlay_volatile::(offset) } - /// Perform a volatile write into the buffer + /// Perform a volatile read + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [12, 0, 0, 0]; + /// let buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); + /// ``` + pub fn get_i32_volatile(&self, offset: IndexT) -> Result { + self.overlay_volatile::(offset) + } + + /// Perform a volatile write of an `i64` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [0u8; 8]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// buffer.put_i64_ordered(0, 12); + /// assert_eq!(buffer.get_i64_volatile(0), Ok(12)); + /// ``` pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> { + // QUESTION: Would it be better to have callers use `write_volatile` directly self.write_volatile::(offset, val) } + /// Perform a volatile write of an `i32` into the buffer + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut bytes = [0u8; 4]; + /// let mut buffer = AtomicBuffer::wrap(&mut bytes); + /// + /// buffer.put_i32_ordered(0, 12); + /// assert_eq!(buffer.get_i32_volatile(0), Ok(12)); + /// ``` + pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> { + // QUESTION: Would it be better to have callers use `write_volatile` directly + self.write_volatile::(offset, val) + } + + /// Write the contents of one buffer to another. Does not perform any synchronization. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut source_bytes = [1u8, 2, 3, 4]; + /// let source = AtomicBuffer::wrap(&mut source_bytes); + /// + /// let mut dest_bytes = [0, 0, 0, 0]; + /// let mut dest = AtomicBuffer::wrap(&mut dest_bytes); + /// + /// dest.put_bytes(1, &source, 1, 3); + /// drop(dest); + /// assert_eq!(dest_bytes, [0u8, 2, 3, 4]); + /// ``` + pub fn put_bytes( + &mut self, + index: IndexT, + source: &AtomicBuffer, + source_index: IndexT, + len: IndexT, + ) -> Result<()> { + self.bounds_check(index, len)?; + source.bounds_check(source_index, len)?; + + let index = index as usize; + let source_index = source_index as usize; + let len = len as usize; + self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]); + Ok(()) + } + /// Compare an expected value with what is in memory, and if it matches, /// update to a new value. Returns `Ok(true)` if the update was successful, /// and `Ok(false)` if the update failed. + /// + /// ```rust + /// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer; + /// let mut buf = [0u8; 8]; + /// let atomic_buf = AtomicBuffer::wrap(&mut buf); + /// // Set value to 1 + /// atomic_buf.get_and_add_i64(0, 1).unwrap(); + /// + /// // Set value to 1 if existing value is 0 + /// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); + /// // Set value to 2 if existing value is 1 + /// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); + /// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); + /// ``` pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result { // QUESTION: Do I need a volatile and atomic read here? // Aeron C++ uses a volatile read before the atomic operation, but I think that @@ -126,46 +238,6 @@ mod tests { assert_eq!(value, (*a).load(Ordering::SeqCst)); } - #[test] - fn atomic_i64_increment() { - let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; - - let atomic_buf = AtomicBuffer::wrap(&mut buf[..]); - assert_eq!(atomic_buf.get_and_add_i64(0, 1), Ok(16)); - assert_eq!(atomic_buf.get_and_add_i64(0, 0), Ok(17)); - } - - #[test] - fn atomic_i64_increment_offset() { - let mut buf = [0, 16, 0, 0, 0, 0, 0, 0, 0]; - - let atomic_buf = AtomicBuffer::wrap(&mut buf[..]); - assert_eq!(atomic_buf.get_and_add_i64(1, 1), Ok(16)); - assert_eq!(atomic_buf.get_and_add_i64(1, 0), Ok(17)); - } - - #[test] - fn out_of_bounds() { - let mut buf = [16, 0, 0, 0, 0, 0, 0]; - - let atomic_buf = AtomicBuffer::wrap(&mut buf); - assert_eq!( - atomic_buf.get_and_add_i64(0, 0), - Err(AeronError::OutOfBounds) - ); - } - - #[test] - fn out_of_bounds_offset() { - let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; - - let atomic_buf = AtomicBuffer::wrap(&mut buf); - assert_eq!( - atomic_buf.get_and_add_i64(1, 0), - Err(AeronError::OutOfBounds) - ); - } - #[test] fn negative_offset() { let mut buf = [16, 0, 0, 0, 0, 0, 0, 0]; @@ -175,25 +247,4 @@ mod tests { Err(AeronError::OutOfBounds) ) } - - #[test] - fn put_i64() { - let mut buf = [0u8; 8]; - let mut atomic_buf = AtomicBuffer::wrap(&mut buf); - - atomic_buf.put_i64_ordered(0, 12).unwrap(); - assert_eq!(atomic_buf.get_i64_volatile(0), Ok(12)) - } - - #[test] - fn compare_set_i64() { - let mut buf = [0u8; 8]; - let atomic_buf = AtomicBuffer::wrap(&mut buf); - - atomic_buf.get_and_add_i64(0, 1).unwrap(); - - assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false)); - assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true)); - assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2)); - } } diff --git a/src/client/concurrent/ring_buffer.rs b/src/client/concurrent/ring_buffer.rs index 2a9bdd9..c6d88e8 100644 --- a/src/client/concurrent/ring_buffer.rs +++ b/src/client/concurrent/ring_buffer.rs @@ -1,12 +1,13 @@ //! Ring buffer wrapper for communicating with the Media Driver use crate::client::concurrent::atomic_buffer::AtomicBuffer; -use crate::util::{AeronError, IndexT, Result}; +use crate::util::{bit, AeronError, IndexT, Result}; /// Description of the Ring Buffer schema. pub mod buffer_descriptor { use crate::client::concurrent::atomic_buffer::AtomicBuffer; + use crate::util::bit::is_power_of_two; use crate::util::AeronError::IllegalArgument; - use crate::util::{is_power_of_two, IndexT, Result, CACHE_LINE_LENGTH}; + use crate::util::{IndexT, Result, CACHE_LINE_LENGTH}; // QUESTION: Why are these offsets so large when we only ever use i64 types? @@ -57,12 +58,17 @@ pub mod buffer_descriptor { /// +---------------------------------------------------------------+ /// ``` pub mod record_descriptor { - use crate::util::IndexT; use std::mem::size_of; + use crate::util::Result; + use crate::util::{AeronError, IndexT}; + /// Size of the ring buffer record header. pub const HEADER_LENGTH: IndexT = size_of::() as IndexT * 2; + /// Alignment size of records written to the buffer + pub const ALIGNMENT: IndexT = HEADER_LENGTH; + /// Message type indicating to the media driver that space has been reserved, /// and is not yet ready for processing. pub const PADDING_MSG_TYPE_ID: i32 = -1; @@ -73,16 +79,36 @@ pub mod record_descriptor { // Smells like Java. ((i64::from(msg_type_id) & 0xFFFF_FFFF) << 32) | (i64::from(length) & 0xFFFF_FFFF) } + + /// Verify a message type identifier is safe for use + pub fn check_msg_type_id(msg_type_id: i32) -> Result<()> { + if msg_type_id < 1 { + Err(AeronError::IllegalArgument) + } else { + Ok(()) + } + } + + /// Fetch the offset to begin writing a message payload + pub fn encoded_msg_offset(record_offset: IndexT) -> IndexT { + record_offset + HEADER_LENGTH + } + + /// Fetch the offset to begin writing the message length + pub fn length_offset(record_offset: IndexT) -> IndexT { + record_offset + } } /// Multi-producer, single-consumer ring buffer implementation. pub struct ManyToOneRingBuffer<'a> { buffer: AtomicBuffer<'a>, capacity: IndexT, + max_msg_length: IndexT, tail_position_index: IndexT, head_cache_position_index: IndexT, head_position_index: IndexT, - _correlation_id_counter_index: IndexT, + correlation_id_counter_index: IndexT, } impl<'a> ManyToOneRingBuffer<'a> { @@ -91,17 +117,65 @@ impl<'a> ManyToOneRingBuffer<'a> { buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer { buffer, capacity, + max_msg_length: capacity / 8, tail_position_index: capacity + buffer_descriptor::TAIL_POSITION_OFFSET, head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET, head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET, - _correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET, + correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET, }) } + /// Atomically retrieve the next correlation identifier. Used as a unique identifier for + /// interactions with the Media Driver + pub fn next_correlation_id(&self) -> i64 { + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .get_and_add_i64(self.correlation_id_counter_index, 1) + .unwrap() + } + + /// Write a message into the ring buffer + pub fn write( + &mut self, + msg_type_id: i32, + source: &AtomicBuffer, + source_index: IndexT, + length: IndexT, + ) -> Result<()> { + record_descriptor::check_msg_type_id(msg_type_id)?; + self.check_msg_length(length)?; + + let record_len = length + record_descriptor::HEADER_LENGTH; + let required = bit::align(record_len, record_descriptor::ALIGNMENT); + let record_index = self.claim_capacity(required)?; + + // UNWRAP: `claim_capacity` performed bounds checking + self.buffer + .put_i64_ordered( + record_index, + record_descriptor::make_header(-length, msg_type_id), + ) + .unwrap(); + // UNWRAP: `claim_capacity` performed bounds checking + self.buffer + .put_bytes( + record_descriptor::encoded_msg_offset(record_index), + source, + source_index, + length, + ) + .unwrap(); + // UNWRAP: `claim_capacity` performed bounds checking + self.buffer + .put_i32_ordered(record_descriptor::length_offset(record_index), record_len) + .unwrap(); + + Ok(()) + } + /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index /// at which to start writing the next record. - // TODO: Shouldn't be `pub`, just trying to avoid warnings - pub fn claim_capacity(&mut self, required: IndexT) -> Result { + fn claim_capacity(&mut self, required: IndexT) -> Result { // QUESTION: Is this mask how we handle the "ring" in ring buffer? // Would explain why we assert buffer capacity is a power of two during initialization let mask = self.capacity - 1; @@ -199,15 +273,27 @@ impl<'a> ManyToOneRingBuffer<'a> { Ok(tail_index) } + + fn check_msg_length(&self, length: IndexT) -> Result<()> { + if length > self.max_msg_length { + Err(AeronError::IllegalArgument) + } else { + Ok(()) + } + } } #[cfg(test)] mod tests { use crate::client::concurrent::atomic_buffer::AtomicBuffer; - use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer; + use crate::client::concurrent::ring_buffer::{ + buffer_descriptor, record_descriptor, ManyToOneRingBuffer, + }; + use crate::util::IndexT; + use std::mem::size_of; #[test] - fn basic_claim_space() { + fn claim_capacity_basic() { let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; let mut buf = vec![0u8; buf_size]; @@ -225,4 +311,31 @@ mod tests { let write_start = ring_buf.claim_capacity(16).unwrap(); assert_eq!(write_start, 16); } + + #[test] + fn write_basic() { + let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; + let buffer = AtomicBuffer::wrap(&mut bytes); + let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size"); + + let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0]; + let source_len = source_bytes.len() as IndexT; + let source_buffer = AtomicBuffer::wrap(&mut source_bytes); + let type_id = 1; + ring_buffer + .write(type_id, &source_buffer, 0, source_len) + .unwrap(); + + drop(ring_buffer); + let buffer = AtomicBuffer::wrap(&mut bytes); + let record_len = source_len + record_descriptor::HEADER_LENGTH; + assert_eq!( + buffer.get_i64_volatile(0).unwrap(), + record_descriptor::make_header(record_len, type_id) + ); + assert_eq!( + buffer.get_i64_volatile(size_of::() as IndexT).unwrap(), + 12 + ); + } } diff --git a/src/util.rs b/src/util.rs index 192d644..db0dfdc 100644 --- a/src/util.rs +++ b/src/util.rs @@ -6,11 +6,6 @@ // QUESTION: Can this just be updated to be `usize` in Rust? pub type IndexT = i32; -/// Helper method for quick verification that `IndexT` is a positive power of two -pub fn is_power_of_two(idx: IndexT) -> bool { - idx > 0 && (idx as u32).is_power_of_two() -} - /// Length of the data blocks used by the CPU cache sub-system in bytes pub const CACHE_LINE_LENGTH: usize = 64; @@ -28,3 +23,37 @@ pub enum AeronError { /// Result type for operations in the Aeron client pub type Result = ::std::result::Result; + +/// Bit-level utility functions +pub mod bit { + use crate::util::IndexT; + use num::PrimInt; + + /// Helper method for quick verification that `IndexT` is a positive power of two + /// + /// ```rust + /// # use aeron_rs::util::bit::is_power_of_two; + /// assert!(is_power_of_two(16)); + /// assert!(!is_power_of_two(17)); + /// ``` + pub fn is_power_of_two(idx: IndexT) -> bool { + idx > 0 && (idx as u32).is_power_of_two() + } + + /// Align a specific value to the next largest alignment size. + /// + /// ```rust + /// # use aeron_rs::util::bit::align; + /// assert_eq!(align(7, 8), 8); + /// + /// // Not intended for alignments that aren't powers of two + /// assert_eq!(align(52, 12), 52); + /// assert_eq!(align(52, 16), 64); + /// ``` + pub fn align(val: T, alignment: T) -> T + where + T: PrimInt, + { + (val + (alignment - T::one())) & !(alignment - T::one()) + } +}