diff --git a/aeron-rs/src/command/correlated_message.rs b/aeron-rs/src/command/correlated_message.rs
index 2110bb0..1c4638a 100644
--- a/aeron-rs/src/command/correlated_message.rs
+++ b/aeron-rs/src/command/correlated_message.rs
@@ -1,30 +1,53 @@
+//! Header struct for commands that use an identifier to associate the media driver response.
use crate::command::flyweight::Flyweight;
use crate::concurrent::AtomicBuffer;
+/// Basic definition for messages that include a client and correlation identifier to associate
+/// commands and responses
+#[repr(C, packed(4))]
pub struct CorrelatedMessageDefn {
- pub(crate) client_id: i64,
- pub(crate) correlation_id: i64,
+ pub(in crate::command) client_id: i64,
+ pub(in crate::command) correlation_id: i64,
}
impl Flyweight
where
A: AtomicBuffer,
{
+ /// Retrieve the client identifier associated with this message
pub fn client_id(&self) -> i64 {
self.get_struct().client_id
}
+ /// Set the client identifier for this message
pub fn put_client_id(&mut self, value: i64) -> &mut Self {
self.get_struct_mut().client_id = value;
self
}
+ /// Retrieve the correlation identifier associated with this message.
+ /// Will uniquely identify a command and response pair.
pub fn correlation_id(&self) -> i64 {
self.get_struct().correlation_id
}
+ /// Set the correlation identifier for this message
pub fn put_correlation_id(&mut self, value: i64) -> &mut Self {
self.get_struct_mut().correlation_id = value;
self
}
}
+
+#[cfg(test)]
+mod tests {
+ use crate::command::correlated_message::CorrelatedMessageDefn;
+ use std::mem::size_of;
+
+ #[test]
+ fn correlated_message_size() {
+ assert_eq!(
+ size_of::(),
+ size_of::()
+ )
+ }
+}
diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs
index 4e3162c..07cf6d8 100644
--- a/aeron-rs/src/command/flyweight.rs
+++ b/aeron-rs/src/command/flyweight.rs
@@ -1,7 +1,10 @@
+//! Flyweight pattern implementation for messages to and from the media driver.
use crate::concurrent::AtomicBuffer;
use crate::util::{IndexT, Result};
use std::marker::PhantomData;
+/// Flyweight holder object. Wrapper around an underlying `AtomicBuffer` and
+/// offset within that buffer that all future operations are relative to.
pub struct Flyweight
where
A: AtomicBuffer,
@@ -11,12 +14,17 @@ where
_phantom: PhantomData,
}
+/// Marker struct.
+// We can't put this `new` method in the fully generic implementation because
+// Rust gets confused as to what type `S` should be.
pub struct Unchecked;
impl Flyweight
where
A: AtomicBuffer,
{
+ /// Create a new flyweight object. Performs a bounds check on initialization
+ /// to ensure there is space available for `S`.
#[allow(clippy::new_ret_no_self)]
pub fn new(buffer: A, offset: IndexT) -> Result>
where
diff --git a/aeron-rs/src/command/mod.rs b/aeron-rs/src/command/mod.rs
index 83e46c0..8993c97 100644
--- a/aeron-rs/src/command/mod.rs
+++ b/aeron-rs/src/command/mod.rs
@@ -1,3 +1,4 @@
+//! Message definitions for interactions with the Media Driver
pub mod correlated_message;
pub mod flyweight;
pub mod terminate_driver;
diff --git a/aeron-rs/src/command/terminate_driver.rs b/aeron-rs/src/command/terminate_driver.rs
index 24f1c68..21ce6dc 100644
--- a/aeron-rs/src/command/terminate_driver.rs
+++ b/aeron-rs/src/command/terminate_driver.rs
@@ -1,51 +1,69 @@
+//! Flyweight implementation for commands to terminate the driver
use crate::command::correlated_message::CorrelatedMessageDefn;
use crate::command::flyweight::Flyweight;
use crate::concurrent::AtomicBuffer;
use crate::util::IndexT;
use std::mem::size_of;
+/// Raw command to terminate a driver. The `token_length` describes the length
+/// of a buffer immediately trailing this struct definition and part of the
+/// same message.
+#[repr(C, packed(4))]
pub struct TerminateDriverDefn {
- pub(crate) correlated_message: CorrelatedMessageDefn,
- pub(crate) token_length: i32,
+ pub(in crate::command) correlated_message: CorrelatedMessageDefn,
+ pub(in crate::command) token_length: i32,
}
impl Flyweight
where
A: AtomicBuffer,
{
+ /// Retrieve the client identifier of this request.
pub fn client_id(&self) -> i64 {
self.get_struct().correlated_message.client_id
}
+ /// Set the client identifier of this request.
pub fn put_client_id(&mut self, value: i64) -> &mut Self {
self.get_struct_mut().correlated_message.client_id = value;
self
}
+ /// Retrieve the correlation identifier associated with this request. Used to
+ /// associate driver responses with a specific request.
pub fn correlation_id(&self) -> i64 {
self.get_struct().correlated_message.correlation_id
}
+ /// Set the correlation identifier to be used with this request.
pub fn put_correlation_id(&mut self, value: i64) -> &mut Self {
self.get_struct_mut().correlated_message.correlation_id = value;
self
}
+ /// Get the current length of the payload associated with this termination request.
pub fn token_length(&self) -> i32 {
self.get_struct().token_length
}
+ /// Set the payload length of this termination request.
+ ///
+ /// NOTE: While there are no safety issues, improperly setting this value can cause panics.
+ /// The `token_length` value is automatically set during calls to `put_token_buffer()`,
+ /// so this method is not likely to be frequently used.
pub fn put_token_length(&mut self, value: i32) -> &mut Self {
self.get_struct_mut().token_length = value;
self
}
+ /// Return the current token payload associated with this termination request.
pub fn token_buffer(&self) -> &[u8] {
// QUESTION: Should I be slicing the buffer to `token_length`?
// C++ doesn't do anything, so I'm going to assume not.
&self.bytes_at(size_of::() as IndexT)
}
+ /// Append a payload to the termination request.
pub fn put_token_buffer(&mut self, token_buffer: &[u8]) -> &mut Self {
let token_length = token_buffer.len() as i32;
self.get_struct_mut().token_length = token_length;
@@ -66,7 +84,22 @@ where
self
}
+ /// Get the total byte length of this termination command
pub fn length(&self) -> IndexT {
size_of::() as IndexT + self.token_length()
}
}
+
+#[cfg(test)]
+mod tests {
+ use crate::command::terminate_driver::TerminateDriverDefn;
+ use std::mem::size_of;
+
+ #[test]
+ fn terminate_command_size() {
+ assert_eq!(
+ size_of::(),
+ size_of::()
+ )
+ }
+}
diff --git a/aeron-rs/src/concurrent/mod.rs b/aeron-rs/src/concurrent/mod.rs
index a28b85a..70a1e46 100644
--- a/aeron-rs/src/concurrent/mod.rs
+++ b/aeron-rs/src/concurrent/mod.rs
@@ -203,6 +203,7 @@ pub trait AtomicBuffer: Deref + DerefMut {
self.overlay_mut::(offset).map(|i| *i = value)
}
+ /// Write the contents of a byte slice to this buffer. Does not perform any synchronization
fn put_slice(
&mut self,
index: IndexT,
diff --git a/aeron-rs/src/concurrent/ringbuffer.rs b/aeron-rs/src/concurrent/ringbuffer.rs
index c8bc7eb..87f01ad 100644
--- a/aeron-rs/src/concurrent/ringbuffer.rs
+++ b/aeron-rs/src/concurrent/ringbuffer.rs
@@ -383,8 +383,13 @@ where
self.max_msg_length
}
- pub fn consumer_heartbeat_time(&self) -> Result {
- self.buffer.get_i64_volatile(self.consumer_heartbeat_index)
+ /// Return the last heartbeat timestamp associated with the consumer of this queue.
+ /// Timestamps are milliseconds since 1 Jan 1970, UTC.
+ pub fn consumer_heartbeat_time(&self) -> i64 {
+ // UNWRAP: Known-valid offset calculated during initialization
+ self.buffer
+ .get_i64_volatile(self.consumer_heartbeat_index)
+ .unwrap()
}
}
diff --git a/aeron-rs/src/control_protocol.rs b/aeron-rs/src/control_protocol.rs
index 43d23e4..dedb124 100644
--- a/aeron-rs/src/control_protocol.rs
+++ b/aeron-rs/src/control_protocol.rs
@@ -1,3 +1,4 @@
+//! Utilities for wrapping the command-and-control protocol with a nicer API
use aeron_driver_sys::*;
/// Construct a C-compatible enum out of a set of constants.
diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs
index d55d888..3d3fdbf 100644
--- a/aeron-rs/src/driver_proxy.rs
+++ b/aeron-rs/src/driver_proxy.rs
@@ -1,3 +1,4 @@
+//! High level API for issuing commands to the Media Driver
use crate::command::flyweight::Flyweight;
use crate::command::terminate_driver::TerminateDriverDefn;
use crate::concurrent::ringbuffer::ManyToOneRingBuffer;
@@ -5,6 +6,7 @@ use crate::concurrent::AtomicBuffer;
use crate::control_protocol::ClientCommand;
use crate::util::{AeronError, IndexT, Result};
+/// High-level interface for issuing commands to a media driver
pub struct DriverProxy
where
A: AtomicBuffer,
@@ -17,6 +19,7 @@ impl DriverProxy
where
A: AtomicBuffer,
{
+ /// Initialize a new driver proxy from a command-and-control "to driver" buffer
pub fn new(to_driver: ManyToOneRingBuffer) -> Self {
let client_id = to_driver.next_correlation_id();
DriverProxy {
@@ -25,14 +28,19 @@ where
}
}
- pub fn time_of_last_driver_keepalive(&self) -> Result {
+ /// Retrieve the timestamp of the most recent driver heartbeat. Values are
+ /// milliseconds past 1 Jan 1970, UTC.
+ pub fn time_of_last_driver_keepalive(&self) -> i64 {
self.to_driver.consumer_heartbeat_time()
}
+ /// Get the unique identifier associated with this proxy.
pub fn client_id(&self) -> i64 {
self.client_id
}
+ /// Request termination of the media driver. Optionally supply a payload on the request
+ /// that will be available to the driver.
pub fn terminate_driver(&mut self, token_buffer: Option<&[u8]>) -> Result<()> {
let client_id = self.client_id;
self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| {
diff --git a/aeron-rs/src/lib.rs b/aeron-rs/src/lib.rs
index 9fc7905..b06c98e 100644
--- a/aeron-rs/src/lib.rs
+++ b/aeron-rs/src/lib.rs
@@ -1,4 +1,5 @@
//! [Aeron](https://github.com/real-logic/aeron) client for Rust
+#![deny(missing_docs)]
#[cfg(target_endian = "big")]
compile_error!("Aeron is only supported on little-endian architectures");
diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs
index 3f01086..6e4cf8d 100644
--- a/aeron-rs/tests/cnc_terminate.rs
+++ b/aeron-rs/tests/cnc_terminate.rs
@@ -4,7 +4,6 @@ use aeron_rs::cnc_descriptor::MetaDataDefinition;
use aeron_rs::concurrent::ringbuffer::ManyToOneRingBuffer;
use aeron_rs::concurrent::AtomicBuffer;
use aeron_rs::driver_proxy::DriverProxy;
-use aeron_rs::util::IndexT;
use memmap::MmapOptions;
use std::ffi::{c_void, CString};
use std::fs::OpenOptions;