1
0
mirror of https://github.com/bspeice/aeron-rs synced 2024-12-22 05:48:10 -05:00

claim_capacity now working again

This commit is contained in:
Bradlee Speice 2019-11-02 15:18:52 -04:00
parent fd23f2891a
commit 02638d20c0
3 changed files with 95 additions and 20 deletions

View File

@ -8,11 +8,10 @@ use std::sync::atomic::{AtomicI64, Ordering};
use crate::util::{AeronError, IndexT, Result}; use crate::util::{AeronError, IndexT, Result};
use std::ptr::{read_volatile, write_volatile}; use std::ptr::{read_volatile, write_volatile};
use std::ops::{DerefMut, Deref}; use std::ops::{Deref, DerefMut};
/// Atomic operations on slices of memory /// Atomic operations on slices of memory
pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> { pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
/// Check that there are at least `size` bytes of memory available /// Check that there are at least `size` bytes of memory available
/// beginning at some offset. /// beginning at some offset.
/// ///
@ -52,8 +51,8 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
/// assert_eq!(another_val.load(Ordering::SeqCst), 0); /// assert_eq!(another_val.load(Ordering::SeqCst), 0);
/// ``` /// ```
fn overlay<T>(&self, offset: IndexT) -> Result<&T> fn overlay<T>(&self, offset: IndexT) -> Result<&T>
where where
T: Sized T: Sized,
{ {
self.bounds_check(offset, size_of::<T>() as IndexT) self.bounds_check(offset, size_of::<T>() as IndexT)
.map(|_| { .map(|_| {
@ -72,8 +71,8 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
/// assert_eq!(my_val, 5); /// assert_eq!(my_val, 5);
/// ``` /// ```
fn overlay_volatile<T>(&self, offset: IndexT) -> Result<T> fn overlay_volatile<T>(&self, offset: IndexT) -> Result<T>
where where
T: Copy, T: Copy,
{ {
self.bounds_check(offset, size_of::<T>() as IndexT) self.bounds_check(offset, size_of::<T>() as IndexT)
.map(|_| { .map(|_| {
@ -93,8 +92,8 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
/// assert_eq!(buffer, [24, 0, 0, 0]); /// assert_eq!(buffer, [24, 0, 0, 0]);
/// ``` /// ```
fn write_volatile<T>(&mut self, offset: IndexT, val: T) -> Result<()> fn write_volatile<T>(&mut self, offset: IndexT, val: T) -> Result<()>
where where
T: Copy, T: Copy,
{ {
self.bounds_check(offset, size_of::<T>() as IndexT) self.bounds_check(offset, size_of::<T>() as IndexT)
.map(|_| { .map(|_| {
@ -104,8 +103,67 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
} }
/// Perform an atomic fetch and add of a 64-bit value /// Perform an atomic fetch and add of a 64-bit value
///
/// ```rust
/// # use aeron_rs::client::concurrent::AtomicBuffer;
/// let mut buf = vec![0u8; 8];
/// assert_eq!(buf.get_and_add_i64(0, 1), Ok(0));
/// assert_eq!(buf.get_and_add_i64(0, 1), Ok(1));
/// ```
fn get_and_add_i64(&self, offset: IndexT, value: i64) -> Result<i64> { fn get_and_add_i64(&self, offset: IndexT, value: i64) -> Result<i64> {
self.overlay::<AtomicI64>(offset).map(|a| a.fetch_add(value, Ordering::SeqCst)) self.overlay::<AtomicI64>(offset)
.map(|a| a.fetch_add(value, Ordering::SeqCst))
}
/// Perform an atomic Compare-And-Swap of a 64-bit value. Returns `Ok(true)`
/// if the update was successful, and `Ok(false)` if the update failed.
///
/// ```rust
/// # use aeron_rs::client::concurrent::AtomicBuffer;
/// let mut buf = &mut [0u8; 8][..];
/// // Set value to 1
/// buf.get_and_add_i64(0, 1).unwrap();
///
/// // Set value to 1 if existing value is 0
/// assert_eq!(buf.compare_and_set_i64(0, 0, 1), Ok(false));
/// // Set value to 2 if existing value is 1
/// assert_eq!(buf.compare_and_set_i64(0, 1, 2), Ok(true));
/// assert_eq!(buf.get_i64_volatile(0), Ok(2));
/// ```
fn compare_and_set_i64(&self, offset: IndexT, expected: i64, update: i64) -> Result<bool> {
// QUESTION: Should I use a volatile read here as well?
// Aeron C++ uses a volatile read before the atomic operation, but I think that
// may be redundant. In addition, Rust's `read_volatile` operation returns a
// *copied* value; running `compare_exchange` on that copy introduces a race condition
// because we're no longer comparing a consistent address.
self.overlay::<AtomicI64>(offset).map(|a| {
a.compare_exchange(expected, update, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
})
}
/// Perform a volatile read of an `i64` value
///
/// ```rust
/// # use aeron_rs::client::concurrent::AtomicBuffer;
/// let buffer = vec![12u8, 0, 0, 0, 0, 0, 0, 0];
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
/// ```
fn get_i64_volatile(&self, offset: IndexT) -> Result<i64> {
// QUESTION: Would it be better to express this in terms of an atomic read?
self.overlay_volatile::<i64>(offset)
}
/// Perform a volatile write of an `i64` value
///
/// ```rust
/// # use aeron_rs::client::concurrent::AtomicBuffer;
/// let mut buffer = vec![0u8; 8];
/// buffer.put_i64_ordered(0, 12);
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
/// ```
fn put_i64_ordered(&mut self, offset: IndexT, value: i64) -> Result<()> {
self.write_volatile::<i64>(offset, value)
} }
} }

View File

@ -32,7 +32,7 @@ pub mod buffer_descriptor {
/// Returns the actual capacity excluding ring buffer metadata. /// Returns the actual capacity excluding ring buffer metadata.
pub fn check_capacity<A>(buffer: &A) -> Result<IndexT> pub fn check_capacity<A>(buffer: &A) -> Result<IndexT>
where where
A: AtomicBuffer A: AtomicBuffer,
{ {
let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT; let capacity = (buffer.len() - TRAILER_LENGTH as usize) as IndexT;
if is_power_of_two(capacity) { if is_power_of_two(capacity) {
@ -112,7 +112,7 @@ pub mod record_descriptor {
/// Multi-producer, single-consumer ring buffer implementation. /// Multi-producer, single-consumer ring buffer implementation.
pub struct ManyToOneRingBuffer<A> pub struct ManyToOneRingBuffer<A>
where where
A: AtomicBuffer A: AtomicBuffer,
{ {
buffer: A, buffer: A,
capacity: IndexT, capacity: IndexT,
@ -125,7 +125,7 @@ where
impl<A> ManyToOneRingBuffer<A> impl<A> ManyToOneRingBuffer<A>
where where
A: AtomicBuffer A: AtomicBuffer,
{ {
/// Create a many-to-one ring buffer from an underlying atomic buffer. /// Create a many-to-one ring buffer from an underlying atomic buffer.
pub fn new(buffer: A) -> Result<Self> { pub fn new(buffer: A) -> Result<Self> {
@ -256,6 +256,7 @@ where
Ok(messages_read) Ok(messages_read)
} }
*/
/// Claim capacity for a specific message size in the ring buffer. Returns the offset/index /// Claim capacity for a specific message size in the ring buffer. Returns the offset/index
/// at which to start writing the next record. /// at which to start writing the next record.
@ -363,10 +364,8 @@ where
Ok(()) Ok(())
} }
} }
*/
} }
/*
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::client::concurrent::AtomicBuffer; use crate::client::concurrent::AtomicBuffer;
@ -377,12 +376,10 @@ mod tests {
use std::mem::size_of; use std::mem::size_of;
#[test] #[test]
fn claim_capacity_basic() { fn claim_capacity_owned() {
let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64; let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64;
let mut buf = vec![0u8; buf_size]; let mut buf = vec![0u8; buf_size];
let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap();
let atomic_buf = AtomicBuffer::wrap(&mut buf);
let mut ring_buf = ManyToOneRingBuffer::wrap(atomic_buf).unwrap();
ring_buf.claim_capacity(16).unwrap(); ring_buf.claim_capacity(16).unwrap();
assert_eq!( assert_eq!(
@ -396,6 +393,26 @@ mod tests {
assert_eq!(write_start, 16); assert_eq!(write_start, 16);
} }
const TEST_BUFFER_SIZE: usize = super::buffer_descriptor::TRAILER_LENGTH as usize + 64;
#[test]
fn claim_capacity_shared() {
let mut buf = &mut [0u8; TEST_BUFFER_SIZE][..];
let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap();
ring_buf.claim_capacity(16).unwrap();
assert_eq!(
ring_buf
.buffer
.get_i64_volatile(ring_buf.tail_position_index),
Ok(16)
);
let write_start = ring_buf.claim_capacity(16).unwrap();
assert_eq!(write_start, 16);
}
/*
#[test] #[test]
fn write_basic() { fn write_basic() {
let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize]; let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize];
@ -482,5 +499,5 @@ mod tests {
assert_eq!(buffer.get_i32(i).unwrap(), 0); assert_eq!(buffer.get_i32(i).unwrap(), 0);
} }
} }
*/
} }
*/

View File

@ -1,7 +1,7 @@
use aeron_driver_sys::*; use aeron_driver_sys::*;
use aeron_rs::client::cnc_descriptor; use aeron_rs::client::cnc_descriptor;
use aeron_rs::client::concurrent::AtomicBuffer;
use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer; use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer;
use aeron_rs::client::concurrent::AtomicBuffer;
use aeron_rs::util::IndexT; use aeron_rs::util::IndexT;
use memmap::MmapOptions; use memmap::MmapOptions;
use std::ffi::{c_void, CString}; use std::ffi::{c_void, CString};