mirror of
https://github.com/bspeice/aeron-rs
synced 2024-12-24 23:08:09 -05:00
Refactor - don't use a client module
This commit is contained in:
parent
15969581b6
commit
505b9a4bd6
@ -1,6 +0,0 @@
|
|||||||
//! Aeron client
|
|
||||||
//!
|
|
||||||
//! These are the modules necessary to construct a functioning Aeron client
|
|
||||||
pub mod cnc_descriptor;
|
|
||||||
pub mod concurrent;
|
|
||||||
pub mod context;
|
|
@ -77,7 +77,7 @@ pub const CNC_FILE: &str = "cnc.dat";
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::client::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION};
|
use crate::cnc_descriptor::{MetaDataDefinition, CNC_FILE, CNC_VERSION};
|
||||||
use crate::driver::DriverContext;
|
use crate::driver::DriverContext;
|
||||||
use memmap::MmapOptions;
|
use memmap::MmapOptions;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
@ -1,6 +1,6 @@
|
|||||||
//! Read messages that are broadcast from the media driver; this is the primary means
|
//! Read messages that are broadcast from the media driver; this is the primary means
|
||||||
//! of receiving data.
|
//! of receiving data.
|
||||||
use crate::client::concurrent::AtomicBuffer;
|
use crate::concurrent::AtomicBuffer;
|
||||||
use crate::util::bit::align;
|
use crate::util::bit::align;
|
||||||
use crate::util::{AeronError, IndexT, Result};
|
use crate::util::{AeronError, IndexT, Result};
|
||||||
use std::sync::atomic::{AtomicI64, Ordering};
|
use std::sync::atomic::{AtomicI64, Ordering};
|
@ -18,7 +18,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// beginning at some offset.
|
/// beginning at some offset.
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
///
|
///
|
||||||
/// let buffer = &mut [0u8; 8][..];
|
/// let buffer = &mut [0u8; 8][..];
|
||||||
/// assert!(buffer.bounds_check(0, 8).is_ok());
|
/// assert!(buffer.bounds_check(0, 8).is_ok());
|
||||||
@ -39,7 +39,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// NOTE: Has the potential to cause undefined behavior if alignment is incorrect.
|
/// NOTE: Has the potential to cause undefined behavior if alignment is incorrect.
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
/// # use std::sync::atomic::{AtomicI64, Ordering};
|
/// # use std::sync::atomic::{AtomicI64, Ordering};
|
||||||
/// let buffer = &mut [0u8; 9][..];
|
/// let buffer = &mut [0u8; 9][..];
|
||||||
///
|
///
|
||||||
@ -80,7 +80,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// Overlay a struct on a buffer, and perform a volatile read
|
/// Overlay a struct on a buffer, and perform a volatile read
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
/// let buffer = &mut [5, 0, 0, 0][..];
|
/// let buffer = &mut [5, 0, 0, 0][..];
|
||||||
///
|
///
|
||||||
/// let my_val: u32 = buffer.overlay_volatile::<u32>(0).unwrap();
|
/// let my_val: u32 = buffer.overlay_volatile::<u32>(0).unwrap();
|
||||||
@ -100,7 +100,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// Perform a volatile write of a value over a buffer
|
/// Perform a volatile write of a value over a buffer
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
/// let mut buffer = &mut [0, 0, 0, 0][..];
|
/// let mut buffer = &mut [0, 0, 0, 0][..];
|
||||||
///
|
///
|
||||||
/// let value: u32 = 24;
|
/// let value: u32 = 24;
|
||||||
@ -121,7 +121,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// Perform an atomic fetch and add of a 64-bit value
|
/// Perform an atomic fetch and add of a 64-bit value
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
/// let mut buf = vec![0u8; 8];
|
/// let mut buf = vec![0u8; 8];
|
||||||
/// assert_eq!(buf.get_and_add_i64(0, 1), Ok(0));
|
/// assert_eq!(buf.get_and_add_i64(0, 1), Ok(0));
|
||||||
/// assert_eq!(buf.get_and_add_i64(0, 1), Ok(1));
|
/// assert_eq!(buf.get_and_add_i64(0, 1), Ok(1));
|
||||||
@ -135,7 +135,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// if the update was successful, and `Ok(false)` if the update failed.
|
/// if the update was successful, and `Ok(false)` if the update failed.
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
/// let mut buf = &mut [0u8; 8][..];
|
/// let mut buf = &mut [0u8; 8][..];
|
||||||
/// // Set value to 1
|
/// // Set value to 1
|
||||||
/// buf.get_and_add_i64(0, 1).unwrap();
|
/// buf.get_and_add_i64(0, 1).unwrap();
|
||||||
@ -161,7 +161,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// Perform a volatile read of an `i64` value
|
/// Perform a volatile read of an `i64` value
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
/// let buffer = vec![12u8, 0, 0, 0, 0, 0, 0, 0];
|
/// let buffer = vec![12u8, 0, 0, 0, 0, 0, 0, 0];
|
||||||
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
|
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
|
||||||
/// ```
|
/// ```
|
||||||
@ -178,7 +178,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// Perform a volatile write of an `i64` value
|
/// Perform a volatile write of an `i64` value
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
/// let mut buffer = vec![0u8; 8];
|
/// let mut buffer = vec![0u8; 8];
|
||||||
/// buffer.put_i64_ordered(0, 12);
|
/// buffer.put_i64_ordered(0, 12);
|
||||||
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
|
/// assert_eq!(buffer.get_i64_volatile(0), Ok(12));
|
||||||
@ -190,7 +190,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// Write an `i64` value into the buffer without performing any synchronization
|
/// Write an `i64` value into the buffer without performing any synchronization
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
/// let mut buffer = vec![0u8; 8];
|
/// let mut buffer = vec![0u8; 8];
|
||||||
/// buffer.put_i64(0, 12);
|
/// buffer.put_i64(0, 12);
|
||||||
/// assert_eq!(buffer.get_i64(0), Ok(12));
|
/// assert_eq!(buffer.get_i64(0), Ok(12));
|
||||||
@ -233,7 +233,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// Perform a volatile read of an `i32` from the buffer
|
/// Perform a volatile read of an `i32` from the buffer
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
/// let buffer = vec![0, 12, 0, 0, 0];
|
/// let buffer = vec![0, 12, 0, 0, 0];
|
||||||
/// assert_eq!(buffer.get_i32_volatile(1), Ok(12));
|
/// assert_eq!(buffer.get_i32_volatile(1), Ok(12));
|
||||||
/// ```
|
/// ```
|
||||||
@ -249,7 +249,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// Perform a volatile write of an `i32` into the buffer
|
/// Perform a volatile write of an `i32` into the buffer
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
/// let mut bytes = vec![0u8; 4];
|
/// let mut bytes = vec![0u8; 4];
|
||||||
/// bytes.put_i32_ordered(0, 12);
|
/// bytes.put_i32_ordered(0, 12);
|
||||||
/// assert_eq!(bytes.get_i32_volatile(0), Ok(12));
|
/// assert_eq!(bytes.get_i32_volatile(0), Ok(12));
|
||||||
@ -261,7 +261,7 @@ pub trait AtomicBuffer: Deref<Target = [u8]> + DerefMut<Target = [u8]> {
|
|||||||
/// Write an `i32` value into the buffer without performing any synchronization
|
/// Write an `i32` value into the buffer without performing any synchronization
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// # use aeron_rs::client::concurrent::AtomicBuffer;
|
/// # use aeron_rs::concurrent::AtomicBuffer;
|
||||||
/// let mut buffer = vec![0u8; 5];
|
/// let mut buffer = vec![0u8; 5];
|
||||||
/// buffer.put_i32(0, 255 + 1);
|
/// buffer.put_i32(0, 255 + 1);
|
||||||
/// assert_eq!(buffer.get_i32(1), Ok(1));
|
/// assert_eq!(buffer.get_i32(1), Ok(1));
|
@ -1,5 +1,5 @@
|
|||||||
//! Ring buffer wrapper for communicating with the Media Driver
|
//! Ring buffer wrapper for communicating with the Media Driver
|
||||||
use crate::client::concurrent::AtomicBuffer;
|
use crate::concurrent::AtomicBuffer;
|
||||||
use crate::util::bit::align;
|
use crate::util::bit::align;
|
||||||
use crate::util::{bit, AeronError, IndexT, Result};
|
use crate::util::{bit, AeronError, IndexT, Result};
|
||||||
use std::ops::{Deref, DerefMut};
|
use std::ops::{Deref, DerefMut};
|
||||||
@ -401,8 +401,8 @@ where
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::client::concurrent::ringbuffer::ManyToOneRingBuffer;
|
use crate::concurrent::ringbuffer::ManyToOneRingBuffer;
|
||||||
use crate::client::concurrent::AtomicBuffer;
|
use crate::concurrent::AtomicBuffer;
|
||||||
|
|
||||||
const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize;
|
const BUFFER_SIZE: usize = 512 + super::buffer_descriptor::TRAILER_LENGTH as usize;
|
||||||
|
|
@ -1,108 +0,0 @@
|
|||||||
//! Utilities for interacting with the control protocol of the Media Driver
|
|
||||||
use aeron_driver_sys::*;
|
|
||||||
|
|
||||||
/// Construct a C-compatible enum out of a set of constants.
|
|
||||||
/// Commonly used for types in Aeron that have fixed values via `#define`,
|
|
||||||
/// but aren't actually enums (e.g. AERON_COMMAND_.*, AERON_ERROR_CODE_.*).
|
|
||||||
/// Behavior is ultimately very similar to `num::FromPrimitive`.
|
|
||||||
macro_rules! define_enum {
|
|
||||||
(
|
|
||||||
$(#[$outer:meta])*
|
|
||||||
pub enum $name:ident {$(
|
|
||||||
$(#[$inner:meta]),*
|
|
||||||
$left:ident = $right:ident,
|
|
||||||
)+}
|
|
||||||
) => {
|
|
||||||
#[repr(u32)]
|
|
||||||
#[derive(Debug, PartialEq)]
|
|
||||||
$(#[$outer])*
|
|
||||||
pub enum $name {$(
|
|
||||||
$(#[$inner])*
|
|
||||||
$left = $right,
|
|
||||||
)*}
|
|
||||||
|
|
||||||
impl ::std::convert::TryFrom<u32> for $name {
|
|
||||||
type Error = ();
|
|
||||||
fn try_from(val: u32) -> Result<$name, ()> {
|
|
||||||
match val {
|
|
||||||
$(v if v == $name::$left as u32 => Ok($name::$left)),*,
|
|
||||||
_ => Err(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
define_enum!(
|
|
||||||
#[doc = "Commands sent from clients to the Media Driver"]
|
|
||||||
pub enum ClientCommand {
|
|
||||||
#[doc = "Add a Publication"]
|
|
||||||
AddPublication = AERON_COMMAND_ADD_PUBLICATION,
|
|
||||||
#[doc = "Remove a Publication"]
|
|
||||||
RemovePublication = AERON_COMMAND_REMOVE_PUBLICATION,
|
|
||||||
#[doc = "Add an Exclusive Publication"]
|
|
||||||
AddExclusivePublication = AERON_COMMAND_ADD_EXCLUSIVE_PUBLICATION,
|
|
||||||
#[doc = "Add a Subscriber"]
|
|
||||||
AddSubscription = AERON_COMMAND_ADD_SUBSCRIPTION,
|
|
||||||
#[doc = "Remove a Subscriber"]
|
|
||||||
RemoveSubscription = AERON_COMMAND_REMOVE_SUBSCRIPTION,
|
|
||||||
#[doc = "Keepalaive from Client"]
|
|
||||||
ClientKeepalive = AERON_COMMAND_CLIENT_KEEPALIVE,
|
|
||||||
#[doc = "Add Destination to an existing Publication"]
|
|
||||||
AddDestination = AERON_COMMAND_ADD_DESTINATION,
|
|
||||||
#[doc = "Remove Destination from an existing Publication"]
|
|
||||||
RemoveDestination = AERON_COMMAND_REMOVE_DESTINATION,
|
|
||||||
#[doc = "Add a Counter to the counters manager"]
|
|
||||||
AddCounter = AERON_COMMAND_ADD_COUNTER,
|
|
||||||
#[doc = "Remove a Counter from the counters manager"]
|
|
||||||
RemoveCounter = AERON_COMMAND_REMOVE_COUNTER,
|
|
||||||
#[doc = "Close indication from Client"]
|
|
||||||
ClientClose = AERON_COMMAND_CLIENT_CLOSE,
|
|
||||||
#[doc = "Add Destination for existing Subscription"]
|
|
||||||
AddRcvDestination = AERON_COMMAND_ADD_RCV_DESTINATION,
|
|
||||||
#[doc = "Remove Destination for existing Subscription"]
|
|
||||||
RemoveRcvDestination = AERON_COMMAND_REMOVE_RCV_DESTINATION,
|
|
||||||
#[doc = "Request the driver to terminate"]
|
|
||||||
TerminateDriver = AERON_COMMAND_TERMINATE_DRIVER,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
define_enum!(
|
|
||||||
#[doc = "Responses from the Media Driver to client commands"]
|
|
||||||
pub enum DriverResponse {
|
|
||||||
#[doc = "Error Response as a result of attempting to process a client command operation"]
|
|
||||||
OnError = AERON_RESPONSE_ON_ERROR,
|
|
||||||
#[doc = "Subscribed Image buffers are available notification"]
|
|
||||||
OnAvailableImage = AERON_RESPONSE_ON_AVAILABLE_IMAGE,
|
|
||||||
#[doc = "New Publication buffers are ready notification"]
|
|
||||||
OnPublicationReady = AERON_RESPONSE_ON_PUBLICATION_READY,
|
|
||||||
#[doc = "Operation has succeeded"]
|
|
||||||
OnOperationSuccess = AERON_RESPONSE_ON_OPERATION_SUCCESS,
|
|
||||||
#[doc = "Inform client of timeout and removal of an inactive Image"]
|
|
||||||
OnUnavailableImage = AERON_RESPONSE_ON_UNAVAILABLE_IMAGE,
|
|
||||||
#[doc = "New Exclusive Publication buffers are ready notification"]
|
|
||||||
OnExclusivePublicationReady = AERON_RESPONSE_ON_EXCLUSIVE_PUBLICATION_READY,
|
|
||||||
#[doc = "New Subscription is ready notification"]
|
|
||||||
OnSubscriptionReady = AERON_RESPONSE_ON_SUBSCRIPTION_READY,
|
|
||||||
#[doc = "New counter is ready notification"]
|
|
||||||
OnCounterReady = AERON_RESPONSE_ON_COUNTER_READY,
|
|
||||||
#[doc = "Inform clients of removal of counter"]
|
|
||||||
OnUnavailableCounter = AERON_RESPONSE_ON_UNAVAILABLE_COUNTER,
|
|
||||||
#[doc = "Inform clients of client timeout"]
|
|
||||||
OnClientTimeout = AERON_RESPONSE_ON_CLIENT_TIMEOUT,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use crate::control_protocol::ClientCommand;
|
|
||||||
use std::convert::TryInto;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn client_command_convert() {
|
|
||||||
assert_eq!(
|
|
||||||
Ok(ClientCommand::AddPublication),
|
|
||||||
::aeron_driver_sys::AERON_COMMAND_ADD_PUBLICATION.try_into()
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,4 +1,4 @@
|
|||||||
//! Bindings for the C Media Driver
|
//! High-level bindings for the C Media Driver
|
||||||
|
|
||||||
use std::ffi::{CStr, CString};
|
use std::ffi::{CStr, CString};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
@ -4,8 +4,9 @@
|
|||||||
#[cfg(target_endian = "big")]
|
#[cfg(target_endian = "big")]
|
||||||
compile_error!("Aeron is only supported on little-endian architectures");
|
compile_error!("Aeron is only supported on little-endian architectures");
|
||||||
|
|
||||||
pub mod client;
|
pub mod cnc_descriptor;
|
||||||
pub mod control_protocol;
|
pub mod concurrent;
|
||||||
|
pub mod context;
|
||||||
pub mod driver;
|
pub mod driver;
|
||||||
pub mod util;
|
pub mod util;
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use aeron_rs::client::concurrent::broadcast::{
|
use aeron_rs::concurrent::broadcast::{
|
||||||
buffer_descriptor, record_descriptor, BroadcastReceiver,
|
buffer_descriptor, record_descriptor, BroadcastReceiver,
|
||||||
};
|
};
|
||||||
use aeron_rs::client::concurrent::AtomicBuffer;
|
use aeron_rs::concurrent::AtomicBuffer;
|
||||||
use aeron_rs::util::bit::align;
|
use aeron_rs::util::bit::align;
|
||||||
use aeron_rs::util::IndexT;
|
use aeron_rs::util::IndexT;
|
||||||
|
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
use aeron_driver_sys::*;
|
use aeron_driver_sys::*;
|
||||||
use aeron_rs::client::cnc_descriptor;
|
use aeron_rs::cnc_descriptor;
|
||||||
use aeron_rs::client::cnc_descriptor::MetaDataDefinition;
|
use aeron_rs::cnc_descriptor::MetaDataDefinition;
|
||||||
use aeron_rs::client::concurrent::ringbuffer::ManyToOneRingBuffer;
|
use aeron_rs::concurrent::ringbuffer::ManyToOneRingBuffer;
|
||||||
use aeron_rs::client::concurrent::AtomicBuffer;
|
use aeron_rs::concurrent::AtomicBuffer;
|
||||||
use aeron_rs::util::IndexT;
|
use aeron_rs::util::IndexT;
|
||||||
use memmap::MmapOptions;
|
use memmap::MmapOptions;
|
||||||
use std::ffi::{c_void, CString};
|
use std::ffi::{c_void, CString};
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
/// Tests based on the C++ tests included with Aeron
|
/// Tests based on the C++ tests included with Aeron
|
||||||
use aeron_rs::client::concurrent::ringbuffer::{
|
use aeron_rs::concurrent::ringbuffer::{
|
||||||
buffer_descriptor, record_descriptor, ManyToOneRingBuffer,
|
buffer_descriptor, record_descriptor, ManyToOneRingBuffer,
|
||||||
};
|
};
|
||||||
use aeron_rs::client::concurrent::AtomicBuffer;
|
use aeron_rs::concurrent::AtomicBuffer;
|
||||||
use aeron_rs::util::bit::align;
|
use aeron_rs::util::bit::align;
|
||||||
use aeron_rs::util::IndexT;
|
use aeron_rs::util::IndexT;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
Loading…
Reference in New Issue
Block a user