mirror of
https://github.com/bspeice/aeron-rs
synced 2024-12-21 21:38:09 -05:00
write
now working again as well
This commit is contained in:
parent
02638d20c0
commit
ed766ce86b
@ -165,6 +165,51 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
fn put_i64_ordered(&mut self, offset: IndexT, value: i64) -> Result<()> {
|
fn put_i64_ordered(&mut self, offset: IndexT, value: i64) -> Result<()> {
|
||||||
self.write_volatile::<i64>(offset, value)
|
self.write_volatile::<i64>(offset, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Write the contents of one buffer to another. Does not perform any synchronization
|
||||||
|
fn put_bytes<B>(
|
||||||
|
&mut self,
|
||||||
|
index: IndexT,
|
||||||
|
source: &B,
|
||||||
|
source_index: IndexT,
|
||||||
|
len: IndexT,
|
||||||
|
) -> Result<()>
|
||||||
|
where
|
||||||
|
B: AtomicBuffer,
|
||||||
|
{
|
||||||
|
self.bounds_check(index, len)?;
|
||||||
|
source.bounds_check(source_index, len)?;
|
||||||
|
|
||||||
|
let index = index as usize;
|
||||||
|
let source_index = source_index as usize;
|
||||||
|
let len = len as usize;
|
||||||
|
|
||||||
|
self[index..index + len].copy_from_slice(&source[source_index..source_index + len]);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform a volatile read of an `i32` from the buffer
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||||
|
/// let buffer = vec![0, 12, 0, 0, 0];
|
||||||
|
/// assert_eq!(buffer.get_i32_volatile(1), Ok(12));
|
||||||
|
/// ```
|
||||||
|
fn get_i32_volatile(&self, offset: IndexT) -> Result<i32> {
|
||||||
|
self.overlay_volatile::<i32>(offset)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform a volatile write of an `i32` into the buffer
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
||||||
|
/// let mut bytes = vec![0u8; 4];
|
||||||
|
/// bytes.put_i32_ordered(0, 12);
|
||||||
|
/// assert_eq!(bytes.get_i32_volatile(0), Ok(12));
|
||||||
|
/// ```
|
||||||
|
fn put_i32_ordered(&mut self, offset: IndexT, value: i32) -> Result<()> {
|
||||||
|
self.write_volatile::<i32>(offset, value)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AtomicBuffer for Vec<u8> {}
|
impl AtomicBuffer for Vec<u8> {}
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
use crate::client::concurrent::AtomicBuffer;
|
use crate::client::concurrent::AtomicBuffer;
|
||||||
use crate::util::bit::align;
|
use crate::util::bit::align;
|
||||||
use crate::util::{bit, AeronError, IndexT, Result};
|
use crate::util::{bit, AeronError, IndexT, Result};
|
||||||
|
use std::ops::Deref;
|
||||||
|
|
||||||
/// Description of the Ring Buffer schema.
|
/// Description of the Ring Buffer schema.
|
||||||
pub mod buffer_descriptor {
|
pub mod buffer_descriptor {
|
||||||
@ -149,7 +150,6 @@ where
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
/// Write a message into the ring buffer
|
/// Write a message into the ring buffer
|
||||||
pub fn write<B>(
|
pub fn write<B>(
|
||||||
&mut self,
|
&mut self,
|
||||||
@ -159,7 +159,7 @@ where
|
|||||||
length: IndexT,
|
length: IndexT,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where
|
||||||
B: AtomicBuffer
|
B: AtomicBuffer,
|
||||||
{
|
{
|
||||||
record_descriptor::check_msg_type_id(msg_type_id)?;
|
record_descriptor::check_msg_type_id(msg_type_id)?;
|
||||||
self.check_msg_length(length)?;
|
self.check_msg_length(length)?;
|
||||||
@ -192,6 +192,7 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
|
/// Read messages from the ring buffer and dispatch to `handler`, up to `message_count_limit`
|
||||||
pub fn read<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
|
pub fn read<F>(&mut self, mut handler: F, message_count_limit: usize) -> Result<usize>
|
||||||
where
|
where
|
||||||
@ -366,20 +367,31 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<A> Deref for ManyToOneRingBuffer<A>
|
||||||
|
where
|
||||||
|
A: AtomicBuffer,
|
||||||
|
{
|
||||||
|
type Target = A;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.buffer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::client::concurrent::AtomicBuffer;
|
|
||||||
use crate::client::concurrent::ringbuffer::{
|
use crate::client::concurrent::ringbuffer::{
|
||||||
buffer_descriptor, record_descriptor, ManyToOneRingBuffer,
|
buffer_descriptor, record_descriptor, ManyToOneRingBuffer,
|
||||||
};
|
};
|
||||||
|
use crate::client::concurrent::AtomicBuffer;
|
||||||
use crate::util::IndexT;
|
use crate::util::IndexT;
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
|
|
||||||
|
const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn claim_capacity_owned() {
|
fn claim_capacity_owned() {
|
||||||
let buf_size = super::buffer_descriptor::TRAILER_LENGTH as usize + 64;
|
let mut ring_buf = ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).unwrap();
|
||||||
let mut buf = vec![0u8; buf_size];
|
|
||||||
let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap();
|
|
||||||
|
|
||||||
ring_buf.claim_capacity(16).unwrap();
|
ring_buf.claim_capacity(16).unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
@ -393,11 +405,9 @@ mod tests {
|
|||||||
assert_eq!(write_start, 16);
|
assert_eq!(write_start, 16);
|
||||||
}
|
}
|
||||||
|
|
||||||
const TEST_BUFFER_SIZE: usize = super::buffer_descriptor::TRAILER_LENGTH as usize + 64;
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn claim_capacity_shared() {
|
fn claim_capacity_shared() {
|
||||||
let mut buf = &mut [0u8; TEST_BUFFER_SIZE][..];
|
let buf = &mut [0u8; BUFFER_SIZE][..];
|
||||||
let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap();
|
let mut ring_buf = ManyToOneRingBuffer::new(buf).unwrap();
|
||||||
|
|
||||||
ring_buf.claim_capacity(16).unwrap();
|
ring_buf.claim_capacity(16).unwrap();
|
||||||
@ -412,34 +422,32 @@ mod tests {
|
|||||||
assert_eq!(write_start, 16);
|
assert_eq!(write_start, 16);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
#[test]
|
#[test]
|
||||||
fn write_basic() {
|
fn write_basic() {
|
||||||
let mut bytes = vec![0u8; 512 + buffer_descriptor::TRAILER_LENGTH as usize];
|
let mut ring_buffer =
|
||||||
let buffer = AtomicBuffer::wrap(&mut bytes);
|
ManyToOneRingBuffer::new(vec![0u8; BUFFER_SIZE]).expect("Invalid buffer size");
|
||||||
let mut ring_buffer = ManyToOneRingBuffer::wrap(buffer).expect("Invalid buffer size");
|
|
||||||
|
|
||||||
let mut source_bytes = [12, 0, 0, 0, 0, 0, 0, 0];
|
let source_bytes = &mut [12u8, 0, 0, 0][..];
|
||||||
let source_len = source_bytes.len() as IndexT;
|
let source_len = source_bytes.len() as IndexT;
|
||||||
let source_buffer = AtomicBuffer::wrap(&mut source_bytes);
|
|
||||||
let type_id = 1;
|
let type_id = 1;
|
||||||
ring_buffer
|
ring_buffer
|
||||||
.write(type_id, &source_buffer, 0, source_len)
|
.write(type_id, &source_bytes, 0, source_len)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
drop(ring_buffer);
|
|
||||||
let buffer = AtomicBuffer::wrap(&mut bytes);
|
|
||||||
let record_len = source_len + record_descriptor::HEADER_LENGTH;
|
let record_len = source_len + record_descriptor::HEADER_LENGTH;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
buffer.get_i64_volatile(0).unwrap(),
|
ring_buffer.get_i64_volatile(0).unwrap(),
|
||||||
record_descriptor::make_header(record_len, type_id)
|
record_descriptor::make_header(record_len, type_id)
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
buffer.get_i64_volatile(size_of::<i64>() as IndexT).unwrap(),
|
ring_buffer
|
||||||
|
.get_i64_volatile(size_of::<i64>() as IndexT)
|
||||||
|
.unwrap(),
|
||||||
12
|
12
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
#[test]
|
#[test]
|
||||||
fn read_basic() {
|
fn read_basic() {
|
||||||
// Similar to write basic, put something into the buffer
|
// Similar to write basic, put something into the buffer
|
||||||
|
Loading…
Reference in New Issue
Block a user