mirror of
https://github.com/bspeice/aeron-rs
synced 2024-12-22 05:48:10 -05:00
Add buffer write support
This commit is contained in:
parent
8174f0cde1
commit
9373f04b48
@ -14,6 +14,7 @@ maintenance = { status = "actively-developed" }
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
aeron_driver-sys = { path = "./aeron_driver-sys" }
|
aeron_driver-sys = { path = "./aeron_driver-sys" }
|
||||||
memmap = "0.7"
|
memmap = "0.7"
|
||||||
|
num = "0.2"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
clap = "2.33"
|
clap = "2.33"
|
||||||
|
@ -26,8 +26,8 @@ impl<'a> AtomicBuffer<'a> {
|
|||||||
AtomicBuffer { buffer }
|
AtomicBuffer { buffer }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn bounds_check<T>(&self, offset: IndexT) -> Result<()> {
|
fn bounds_check(&self, offset: IndexT, size: IndexT) -> Result<()> {
|
||||||
if offset < 0 || self.buffer.len() - (offset as usize) < size_of::<T>() {
|
if offset < 0 || size < 0 || self.buffer.len() as IndexT - offset < size {
|
||||||
Err(AeronError::OutOfBounds)
|
Err(AeronError::OutOfBounds)
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -39,7 +39,8 @@ impl<'a> AtomicBuffer<'a> {
|
|||||||
where
|
where
|
||||||
T: Sized,
|
T: Sized,
|
||||||
{
|
{
|
||||||
self.bounds_check::<T>(offset).map(|_| {
|
self.bounds_check(offset, size_of::<T>() as IndexT)
|
||||||
|
.map(|_| {
|
||||||
let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) };
|
let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) };
|
||||||
unsafe { &*(offset_ptr as *const T) }
|
unsafe { &*(offset_ptr as *const T) }
|
||||||
})
|
})
|
||||||
@ -49,7 +50,8 @@ impl<'a> AtomicBuffer<'a> {
|
|||||||
where
|
where
|
||||||
T: Copy,
|
T: Copy,
|
||||||
{
|
{
|
||||||
self.bounds_check::<T>(offset).map(|_| {
|
self.bounds_check(offset, size_of::<T>() as IndexT)
|
||||||
|
.map(|_| {
|
||||||
let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) };
|
let offset_ptr = unsafe { self.buffer.as_ptr().offset(offset as isize) };
|
||||||
unsafe { read_volatile(offset_ptr as *const T) }
|
unsafe { read_volatile(offset_ptr as *const T) }
|
||||||
})
|
})
|
||||||
@ -59,32 +61,142 @@ impl<'a> AtomicBuffer<'a> {
|
|||||||
where
|
where
|
||||||
T: Copy,
|
T: Copy,
|
||||||
{
|
{
|
||||||
self.bounds_check::<T>(offset).map(|_| {
|
self.bounds_check(offset, size_of::<T>() as IndexT)
|
||||||
|
.map(|_| {
|
||||||
let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) };
|
let offset_ptr = unsafe { self.buffer.as_mut_ptr().offset(offset as isize) };
|
||||||
unsafe { write_volatile(offset_ptr as *mut T, val) };
|
unsafe { write_volatile(offset_ptr as *mut T, val) };
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Atomically fetch the current value at an offset, and increment by delta
|
/// Atomically fetch the current value at an offset, and increment by delta
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
|
/// # use aeron_rs::util::AeronError;
|
||||||
|
/// let mut bytes = [0u8; 9];
|
||||||
|
/// let mut buffer = AtomicBuffer::wrap(&mut bytes);
|
||||||
|
///
|
||||||
|
/// // Simple case modifies only the first byte
|
||||||
|
/// assert_eq!(buffer.get_and_add_i64(0, 1), Ok(0));
|
||||||
|
/// assert_eq!(buffer.get_and_add_i64(0, 0), Ok(1));
|
||||||
|
///
|
||||||
|
/// // Using an offset modifies the second byte
|
||||||
|
/// assert_eq!(buffer.get_and_add_i64(1, 1), Ok(0));
|
||||||
|
/// assert_eq!(buffer.get_and_add_i64(1, 0), Ok(1));
|
||||||
|
///
|
||||||
|
/// // An offset of 2 means buffer size must be 10 to contain an `i64`
|
||||||
|
/// assert_eq!(buffer.get_and_add_i64(2, 0), Err(AeronError::OutOfBounds));
|
||||||
|
/// ```
|
||||||
pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result<i64> {
|
pub fn get_and_add_i64(&self, offset: IndexT, delta: i64) -> Result<i64> {
|
||||||
self.overlay::<AtomicI64>(offset)
|
self.overlay::<AtomicI64>(offset)
|
||||||
.map(|a| a.fetch_add(delta, Ordering::SeqCst))
|
.map(|a| a.fetch_add(delta, Ordering::SeqCst))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Perform a volatile read
|
/// Perform a volatile read
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
|
/// let mut bytes = [12, 0, 0, 0, 0, 0, 0, 0];
|
||||||
|
/// let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||||
|
///
|
||||||
|
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
|
||||||
|
/// ```
|
||||||
pub fn get_i64_volatile(&self, offset: IndexT) -> Result<i64> {
|
pub fn get_i64_volatile(&self, offset: IndexT) -> Result<i64> {
|
||||||
// QUESTION: Would it be better to express this in terms of an atomic read?
|
// QUESTION: Would it be better to express this in terms of an atomic read?
|
||||||
self.overlay_volatile::<i64>(offset)
|
self.overlay_volatile::<i64>(offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Perform a volatile write into the buffer
|
/// Perform a volatile read
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
|
/// let mut bytes = [12, 0, 0, 0];
|
||||||
|
/// let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||||
|
///
|
||||||
|
/// assert_eq!(buffer.get_i32_volatile(0), Ok(12));
|
||||||
|
/// ```
|
||||||
|
pub fn get_i32_volatile(&self, offset: IndexT) -> Result<i32> {
|
||||||
|
self.overlay_volatile::<i32>(offset)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform a volatile write of an `i64` into the buffer
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
|
/// let mut bytes = [0u8; 8];
|
||||||
|
/// let mut buffer = AtomicBuffer::wrap(&mut bytes);
|
||||||
|
///
|
||||||
|
/// buffer.put_i64_ordered(0, 12);
|
||||||
|
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
|
||||||
|
/// ```
|
||||||
pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> {
|
pub fn put_i64_ordered(&mut self, offset: IndexT, val: i64) -> Result<()> {
|
||||||
|
// QUESTION: Would it be better to have callers use `write_volatile` directly
|
||||||
self.write_volatile::<i64>(offset, val)
|
self.write_volatile::<i64>(offset, val)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Perform a volatile write of an `i32` into the buffer
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
|
/// let mut bytes = [0u8; 4];
|
||||||
|
/// let mut buffer = AtomicBuffer::wrap(&mut bytes);
|
||||||
|
///
|
||||||
|
/// buffer.put_i32_ordered(0, 12);
|
||||||
|
/// assert_eq!(buffer.get_i32_volatile(0), Ok(12));
|
||||||
|
/// ```
|
||||||
|
pub fn put_i32_ordered(&mut self, offset: IndexT, val: i32) -> Result<()> {
|
||||||
|
// QUESTION: Would it be better to have callers use `write_volatile` directly
|
||||||
|
self.write_volatile::<i32>(offset, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write the contents of one buffer to another. Does not perform any synchronization.
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
|
/// let mut source_bytes = [1u8, 2, 3, 4];
|
||||||
|
/// let source = AtomicBuffer::wrap(&mut source_bytes);
|
||||||
|
///
|
||||||
|
/// let mut dest_bytes = [0, 0, 0, 0];
|
||||||
|
/// let mut dest = AtomicBuffer::wrap(&mut dest_bytes);
|
||||||
|
///
|
||||||
|
/// dest.put_bytes(1, &source, 1, 3);
|
||||||
|
/// drop(dest);
|
||||||
|
/// assert_eq!(dest_bytes, [0u8, 2, 3, 4]);
|
||||||
|
/// ```
|
||||||
|
pub fn put_bytes(
|
||||||
|
&mut self,
|
||||||
|
index: IndexT,
|
||||||
|
source: &AtomicBuffer,
|
||||||
|
source_index: IndexT,
|
||||||
|
len: IndexT,
|
||||||
|
) -> Result<()> {
|
||||||
|
self.bounds_check(index, len)?;
|
||||||
|
source.bounds_check(source_index, len)?;
|
||||||
|
|
||||||
|
let index = index as usize;
|
||||||
|
let source_index = source_index as usize;
|
||||||
|
let len = len as usize;
|
||||||
|
self.buffer[index..index + len].copy_from_slice(&source[source_index..source_index + len]);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Compare an expected value with what is in memory, and if it matches,
|
/// Compare an expected value with what is in memory, and if it matches,
|
||||||
/// update to a new value. Returns `Ok(true)` if the update was successful,
|
/// update to a new value. Returns `Ok(true)` if the update was successful,
|
||||||
/// and `Ok(false)` if the update failed.
|
/// and `Ok(false)` if the update failed.
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
|
/// let mut buf = [0u8; 8];
|
||||||
|
/// let atomic_buf = AtomicBuffer::wrap(&mut buf);
|
||||||
|
/// // Set value to 1
|
||||||
|
/// atomic_buf.get_and_add_i64(0, 1).unwrap();
|
||||||
|
///
|
||||||
|
/// // Set value to 1 if existing value is 0
|
||||||
|
/// assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false));
|
||||||
|
/// // Set value to 2 if existing value is 1
|
||||||
|
/// assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true));
|
||||||
|
/// assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2));
|
||||||
|
/// ```
|
||||||
pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result<bool> {
|
pub fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result<bool> {
|
||||||
// QUESTION: Do I need a volatile and atomic read here?
|
// QUESTION: Do I need a volatile and atomic read here?
|
||||||
// Aeron C++ uses a volatile read before the atomic operation, but I think that
|
// Aeron C++ uses a volatile read before the atomic operation, but I think that
|
||||||
@ -126,46 +238,6 @@ mod tests {
|
|||||||
assert_eq!(value, (*a).load(Ordering::SeqCst));
|
assert_eq!(value, (*a).load(Ordering::SeqCst));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn atomic_i64_increment() {
|
|
||||||
let mut buf = [16, 0, 0, 0, 0, 0, 0, 0];
|
|
||||||
|
|
||||||
let atomic_buf = AtomicBuffer::wrap(&mut buf[..]);
|
|
||||||
assert_eq!(atomic_buf.get_and_add_i64(0, 1), Ok(16));
|
|
||||||
assert_eq!(atomic_buf.get_and_add_i64(0, 0), Ok(17));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn atomic_i64_increment_offset() {
|
|
||||||
let mut buf = [0, 16, 0, 0, 0, 0, 0, 0, 0];
|
|
||||||
|
|
||||||
let atomic_buf = AtomicBuffer::wrap(&mut buf[..]);
|
|
||||||
assert_eq!(atomic_buf.get_and_add_i64(1, 1), Ok(16));
|
|
||||||
assert_eq!(atomic_buf.get_and_add_i64(1, 0), Ok(17));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn out_of_bounds() {
|
|
||||||
let mut buf = [16, 0, 0, 0, 0, 0, 0];
|
|
||||||
|
|
||||||
let atomic_buf = AtomicBuffer::wrap(&mut buf);
|
|
||||||
assert_eq!(
|
|
||||||
atomic_buf.get_and_add_i64(0, 0),
|
|
||||||
Err(AeronError::OutOfBounds)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn out_of_bounds_offset() {
|
|
||||||
let mut buf = [16, 0, 0, 0, 0, 0, 0, 0];
|
|
||||||
|
|
||||||
let atomic_buf = AtomicBuffer::wrap(&mut buf);
|
|
||||||
assert_eq!(
|
|
||||||
atomic_buf.get_and_add_i64(1, 0),
|
|
||||||
Err(AeronError::OutOfBounds)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn negative_offset() {
|
fn negative_offset() {
|
||||||
let mut buf = [16, 0, 0, 0, 0, 0, 0, 0];
|
let mut buf = [16, 0, 0, 0, 0, 0, 0, 0];
|
||||||
@ -175,25 +247,4 @@ mod tests {
|
|||||||
Err(AeronError::OutOfBounds)
|
Err(AeronError::OutOfBounds)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn put_i64() {
|
|
||||||
let mut buf = [0u8; 8];
|
|
||||||
let mut atomic_buf = AtomicBuffer::wrap(&mut buf);
|
|
||||||
|
|
||||||
atomic_buf.put_i64_ordered(0, 12).unwrap();
|
|
||||||
assert_eq!(atomic_buf.get_i64_volatile(0), Ok(12))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn compare_set_i64() {
|
|
||||||
let mut buf = [0u8; 8];
|
|
||||||
let atomic_buf = AtomicBuffer::wrap(&mut buf);
|
|
||||||
|
|
||||||
atomic_buf.get_and_add_i64(0, 1).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(atomic_buf.compare_and_set_i64(0, 0, 1), Ok(false));
|
|
||||||
assert_eq!(atomic_buf.compare_and_set_i64(0, 1, 2), Ok(true));
|
|
||||||
assert_eq!(atomic_buf.get_i64_volatile(0), Ok(2));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,13 @@
|
|||||||
//! Ring buffer wrapper for communicating with the Media Driver
|
//! Ring buffer wrapper for communicating with the Media Driver
|
||||||
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
use crate::util::{AeronError, IndexT, Result};
|
use crate::util::{bit, AeronError, IndexT, Result};
|
||||||
|
|
||||||
/// Description of the Ring Buffer schema.
|
/// Description of the Ring Buffer schema.
|
||||||
pub mod buffer_descriptor {
|
pub mod buffer_descriptor {
|
||||||
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
|
use crate::util::bit::is_power_of_two;
|
||||||
use crate::util::AeronError::IllegalArgument;
|
use crate::util::AeronError::IllegalArgument;
|
||||||
use crate::util::{is_power_of_two, IndexT, Result, CACHE_LINE_LENGTH};
|
use crate::util::{IndexT, Result, CACHE_LINE_LENGTH};
|
||||||
|
|
||||||
// QUESTION: Why are these offsets so large when we only ever use i64 types?
|
// QUESTION: Why are these offsets so large when we only ever use i64 types?
|
||||||
|
|
||||||
@ -57,12 +58,17 @@ pub mod buffer_descriptor {
|
|||||||
/// +---------------------------------------------------------------+
|
/// +---------------------------------------------------------------+
|
||||||
/// ```
|
/// ```
|
||||||
pub mod record_descriptor {
|
pub mod record_descriptor {
|
||||||
use crate::util::IndexT;
|
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
|
|
||||||
|
use crate::util::Result;
|
||||||
|
use crate::util::{AeronError, IndexT};
|
||||||
|
|
||||||
/// Size of the ring buffer record header.
|
/// Size of the ring buffer record header.
|
||||||
pub const HEADER_LENGTH: IndexT = size_of::<i32>() as IndexT * 2;
|
pub const HEADER_LENGTH: IndexT = size_of::<i32>() as IndexT * 2;
|
||||||
|
|
||||||
|
/// Alignment size of records written to the buffer
|
||||||
|
pub const ALIGNMENT: IndexT = HEADER_LENGTH;
|
||||||
|
|
||||||
/// Message type indicating to the media driver that space has been reserved,
|
/// Message type indicating to the media driver that space has been reserved,
|
||||||
/// and is not yet ready for processing.
|
/// and is not yet ready for processing.
|
||||||
pub const PADDING_MSG_TYPE_ID: i32 = -1;
|
pub const PADDING_MSG_TYPE_ID: i32 = -1;
|
||||||
@ -73,16 +79,36 @@ pub mod record_descriptor {
|
|||||||
// Smells like Java.
|
// Smells like Java.
|
||||||
((i64::from(msg_type_id) & 0xFFFF_FFFF) << 32) | (i64::from(length) & 0xFFFF_FFFF)
|
((i64::from(msg_type_id) & 0xFFFF_FFFF) << 32) | (i64::from(length) & 0xFFFF_FFFF)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Verify a message type identifier is safe for use
|
||||||
|
pub fn check_msg_type_id(msg_type_id: i32) -> Result<()> {
|
||||||
|
if msg_type_id < 1 {
|
||||||
|
Err(AeronError::IllegalArgument)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch the offset to begin writing a message payload
|
||||||
|
pub fn encoded_msg_offset(record_offset: IndexT) -> IndexT {
|
||||||
|
record_offset + HEADER_LENGTH
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch the offset to begin writing the message length
|
||||||
|
pub fn length_offset(record_offset: IndexT) -> IndexT {
|
||||||
|
record_offset
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Multi-producer, single-consumer ring buffer implementation.
|
/// Multi-producer, single-consumer ring buffer implementation.
|
||||||
pub struct ManyToOneRingBuffer<'a> {
|
pub struct ManyToOneRingBuffer<'a> {
|
||||||
buffer: AtomicBuffer<'a>,
|
buffer: AtomicBuffer<'a>,
|
||||||
capacity: IndexT,
|
capacity: IndexT,
|
||||||
|
max_msg_length: IndexT,
|
||||||
tail_position_index: IndexT,
|
tail_position_index: IndexT,
|
||||||
head_cache_position_index: IndexT,
|
head_cache_position_index: IndexT,
|
||||||
head_position_index: IndexT,
|
head_position_index: IndexT,
|
||||||
_correlation_id_counter_index: IndexT,
|
correlation_id_counter_index: IndexT,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> ManyToOneRingBuffer<'a> {
|
impl<'a> ManyToOneRingBuffer<'a> {
|
||||||
@ -91,17 +117,65 @@ impl<'a> ManyToOneRingBuffer<'a> {
|
|||||||
buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer {
|
buffer_descriptor::check_capacity(&buffer).map(|capacity| ManyToOneRingBuffer {
|
||||||
buffer,
|
buffer,
|
||||||
capacity,
|
capacity,
|
||||||
|
max_msg_length: capacity / 8,
|
||||||
tail_position_index: capacity + buffer_descriptor::TAIL_POSITION_OFFSET,
|
tail_position_index: capacity + buffer_descriptor::TAIL_POSITION_OFFSET,
|
||||||
head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET,
|
head_cache_position_index: capacity + buffer_descriptor::HEAD_CACHE_POSITION_OFFSET,
|
||||||
head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET,
|
head_position_index: capacity + buffer_descriptor::HEAD_POSITION_OFFSET,
|
||||||
_correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET,
|
correlation_id_counter_index: capacity + buffer_descriptor::CORRELATION_COUNTER_OFFSET,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Atomically retrieve the next correlation identifier. Used as a unique identifier for
|
||||||
|
/// interactions with the Media Driver
|
||||||
|
pub fn next_correlation_id(&self) -> i64 {
|
||||||
|
// UNWRAP: Known-valid offset calculated during initialization
|
||||||
|
self.buffer
|
||||||
|
.get_and_add_i64(self.correlation_id_counter_index, 1)
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write a message into the ring buffer
|
||||||
|
pub fn write(
|
||||||
|
&mut self,
|
||||||
|
msg_type_id: i32,
|
||||||
|
source: &AtomicBuffer,
|
||||||
|
source_index: IndexT,
|
||||||
|
length: IndexT,
|
||||||
|
) -> Result<()> {
|
||||||
|
record_descriptor::check_msg_type_id(msg_type_id)?;
|
||||||
|
self.check_msg_length(length)?;
|
||||||
|
|
||||||
|
let record_len = length + record_descriptor::HEADER_LENGTH;
|
||||||
|
let required = bit::align(record_len, record_descriptor::ALIGNMENT);
|
||||||
|
let record_index = self.claim_capacity(required)?;
|
||||||
|
|
||||||
|
// UNWRAP: `claim_capacity` performed bounds checking
|
||||||
|
self.buffer
|
||||||
|
.put_i64_ordered(
|
||||||
|
record_index,
|
||||||
|
record_descriptor::make_header(-length, msg_type_id),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
// UNWRAP: `claim_capacity` performed bounds checking
|
||||||
|
self.buffer
|
||||||
|
.put_bytes(
|
||||||
|
record_descriptor::encoded_msg_offset(record_index),
|
||||||
|
source,
|
||||||
|
source_index,
|
||||||
|
length,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
// UNWRAP: `claim_capacity` performed bounds checking
|
||||||
|
self.buffer
|
||||||
|
.put_i32_ordered(record_descriptor::length_offset(record_index), record_len)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
|
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
|
||||||
/// at which to start writing the next record.
|
/// at which to start writing the next record.
|
||||||
// TODO: Shouldn't be `pub`, just trying to avoid warnings
|
fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> {
|
||||||
pub fn claim_capacity(&mut self, required: IndexT) -> Result<IndexT> {
|
|
||||||
// QUESTION: Is this mask how we handle the "ring" in ring buffer?
|
// QUESTION: Is this mask how we handle the "ring" in ring buffer?
|
||||||
// Would explain why we assert buffer capacity is a power of two during initialization
|
// Would explain why we assert buffer capacity is a power of two during initialization
|
||||||
let mask = self.capacity - 1;
|
let mask = self.capacity - 1;
|
||||||
@ -199,15 +273,27 @@ impl<'a> ManyToOneRingBuffer<'a> {
|
|||||||
|
|
||||||
Ok(tail_index)
|
Ok(tail_index)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn check_msg_length(&self, length: IndexT) -> Result<()> {
|
||||||
|
if length > self.max_msg_length {
|
||||||
|
Err(AeronError::IllegalArgument)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
use crate::client::concurrent::atomic_buffer::AtomicBuffer;
|
||||||
use crate::client::concurrent::ring_buffer::ManyToOneRingBuffer;
|
use crate::client::concurrent::ring_buffer::{
|
||||||
|
buffer_descriptor, record_descriptor, ManyToOneRingBuffer,
|
||||||
|
};
|
||||||
|
use crate::util::IndexT;
|
||||||
|
use std::mem::size_of;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn basic_claim_space() {
|
fn claim_capacity_basic() {
|
||||||
let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64;
|
let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64;
|
||||||
let mut buf = vec![0u8; buf_size];
|
let mut buf = vec![0u8; buf_size];
|
||||||
|
|
||||||
@ -225,4 +311,31 @@ mod tests {
|
|||||||
let write_start = ring_buf.claim_capacity(16).unwrap();
|
let write_start = ring_buf.claim_capacity(16).unwrap();
|
||||||
assert_eq!(write_start, 16);
|
assert_eq!(write_start, 16);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn write_basic() {
|
||||||
|
let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize];
|
||||||
|
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||||
|
let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
|
||||||
|
|
||||||
|
let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0];
|
||||||
|
let source_len = source_bytes.len() as IndexT;
|
||||||
|
let source_buffer = AtomicBuffer::wrap(&mut source_bytes);
|
||||||
|
let type_id = 1;
|
||||||
|
ring_buffer
|
||||||
|
.write(type_id, &source_buffer, 0, source_len)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
drop(ring_buffer);
|
||||||
|
let buffer = AtomicBuffer::wrap(&mut bytes);
|
||||||
|
let record_len = source_len + record_descriptor::HEADER_LENGTH;
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i64_volatile(0).unwrap(),
|
||||||
|
record_descriptor::make_header(record_len, type_id)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
buffer.get_i64_volatile(size_of::<i64>() as IndexT).unwrap(),
|
||||||
|
12
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
39
src/util.rs
39
src/util.rs
@ -6,11 +6,6 @@
|
|||||||
// QUESTION: Can this just be updated to be `usize` in Rust?
|
// QUESTION: Can this just be updated to be `usize` in Rust?
|
||||||
pub type IndexT = i32;
|
pub type IndexT = i32;
|
||||||
|
|
||||||
/// Helper method for quick verification that `IndexT` is a positive power of two
|
|
||||||
pub fn is_power_of_two(idx: IndexT) -> bool {
|
|
||||||
idx > 0 && (idx as u32).is_power_of_two()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Length of the data blocks used by the CPU cache sub-system in bytes
|
/// Length of the data blocks used by the CPU cache sub-system in bytes
|
||||||
pub const CACHE_LINE_LENGTH: usize = 64;
|
pub const CACHE_LINE_LENGTH: usize = 64;
|
||||||
|
|
||||||
@ -28,3 +23,37 @@ pub enum AeronError {
|
|||||||
|
|
||||||
/// Result type for operations in the Aeron client
|
/// Result type for operations in the Aeron client
|
||||||
pub type Result<T> = ::std::result::Result<T, AeronError>;
|
pub type Result<T> = ::std::result::Result<T, AeronError>;
|
||||||
|
|
||||||
|
/// Bit-level utility functions
|
||||||
|
pub mod bit {
|
||||||
|
use crate::util::IndexT;
|
||||||
|
use num::PrimInt;
|
||||||
|
|
||||||
|
/// Helper method for quick verification that `IndexT` is a positive power of two
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::util::bit::is_power_of_two;
|
||||||
|
/// assert!(is_power_of_two(16));
|
||||||
|
/// assert!(!is_power_of_two(17));
|
||||||
|
/// ```
|
||||||
|
pub fn is_power_of_two(idx: IndexT) -> bool {
|
||||||
|
idx > 0 && (idx as u32).is_power_of_two()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Align a specific value to the next largest alignment size.
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::util::bit::align;
|
||||||
|
/// assert_eq!(align(7, 8), 8);
|
||||||
|
///
|
||||||
|
/// // Not intended for alignments that aren't powers of two
|
||||||
|
/// assert_eq!(align(52, 12), 52);
|
||||||
|
/// assert_eq!(align(52, 16), 64);
|
||||||
|
/// ```
|
||||||
|
pub fn align<T>(val: T, alignment: T) -> T
|
||||||
|
where
|
||||||
|
T: PrimInt,
|
||||||
|
{
|
||||||
|
(val + (alignment - T::one())) & !(alignment - T::one())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user