From 32402823549bb256a6ae2b9c4192606f06885c9d Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Thu, 7 Nov 2019 22:52:26 -0500 Subject: [PATCH] Re-enable `deny(docs)` It's better than leaving it out, but I don't think I've yet done a good job at explaining what each element's purpose is. --- aeron-rs/src/command/correlated_message.rs | 27 ++++++++++++++-- aeron-rs/src/command/flyweight.rs | 8 +++++ aeron-rs/src/command/mod.rs | 1 + aeron-rs/src/command/terminate_driver.rs | 37 ++++++++++++++++++++-- aeron-rs/src/concurrent/mod.rs | 1 + aeron-rs/src/concurrent/ringbuffer.rs | 9 ++++-- aeron-rs/src/control_protocol.rs | 1 + aeron-rs/src/driver_proxy.rs | 10 +++++- aeron-rs/src/lib.rs | 1 + aeron-rs/tests/cnc_terminate.rs | 1 - 10 files changed, 88 insertions(+), 8 deletions(-) diff --git a/aeron-rs/src/command/correlated_message.rs b/aeron-rs/src/command/correlated_message.rs index 2110bb0..1c4638a 100644 --- a/aeron-rs/src/command/correlated_message.rs +++ b/aeron-rs/src/command/correlated_message.rs @@ -1,30 +1,53 @@ +//! Header struct for commands that use an identifier to associate the media driver response. use crate::command::flyweight::Flyweight; use crate::concurrent::AtomicBuffer; +/// Basic definition for messages that include a client and correlation identifier to associate +/// commands and responses +#[repr(C, packed(4))] pub struct CorrelatedMessageDefn { - pub(crate) client_id: i64, - pub(crate) correlation_id: i64, + pub(in crate::command) client_id: i64, + pub(in crate::command) correlation_id: i64, } impl Flyweight where A: AtomicBuffer, { + /// Retrieve the client identifier associated with this message pub fn client_id(&self) -> i64 { self.get_struct().client_id } + /// Set the client identifier for this message pub fn put_client_id(&mut self, value: i64) -> &mut Self { self.get_struct_mut().client_id = value; self } + /// Retrieve the correlation identifier associated with this message. + /// Will uniquely identify a command and response pair. pub fn correlation_id(&self) -> i64 { self.get_struct().correlation_id } + /// Set the correlation identifier for this message pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { self.get_struct_mut().correlation_id = value; self } } + +#[cfg(test)] +mod tests { + use crate::command::correlated_message::CorrelatedMessageDefn; + use std::mem::size_of; + + #[test] + fn correlated_message_size() { + assert_eq!( + size_of::(), + size_of::() + ) + } +} diff --git a/aeron-rs/src/command/flyweight.rs b/aeron-rs/src/command/flyweight.rs index 4e3162c..07cf6d8 100644 --- a/aeron-rs/src/command/flyweight.rs +++ b/aeron-rs/src/command/flyweight.rs @@ -1,7 +1,10 @@ +//! Flyweight pattern implementation for messages to and from the media driver. use crate::concurrent::AtomicBuffer; use crate::util::{IndexT, Result}; use std::marker::PhantomData; +/// Flyweight holder object. Wrapper around an underlying `AtomicBuffer` and +/// offset within that buffer that all future operations are relative to. pub struct Flyweight where A: AtomicBuffer, @@ -11,12 +14,17 @@ where _phantom: PhantomData, } +/// Marker struct. +// We can't put this `new` method in the fully generic implementation because +// Rust gets confused as to what type `S` should be. pub struct Unchecked; impl Flyweight where A: AtomicBuffer, { + /// Create a new flyweight object. Performs a bounds check on initialization + /// to ensure there is space available for `S`. #[allow(clippy::new_ret_no_self)] pub fn new(buffer: A, offset: IndexT) -> Result> where diff --git a/aeron-rs/src/command/mod.rs b/aeron-rs/src/command/mod.rs index 83e46c0..8993c97 100644 --- a/aeron-rs/src/command/mod.rs +++ b/aeron-rs/src/command/mod.rs @@ -1,3 +1,4 @@ +//! Message definitions for interactions with the Media Driver pub mod correlated_message; pub mod flyweight; pub mod terminate_driver; diff --git a/aeron-rs/src/command/terminate_driver.rs b/aeron-rs/src/command/terminate_driver.rs index 24f1c68..21ce6dc 100644 --- a/aeron-rs/src/command/terminate_driver.rs +++ b/aeron-rs/src/command/terminate_driver.rs @@ -1,51 +1,69 @@ +//! Flyweight implementation for commands to terminate the driver use crate::command::correlated_message::CorrelatedMessageDefn; use crate::command::flyweight::Flyweight; use crate::concurrent::AtomicBuffer; use crate::util::IndexT; use std::mem::size_of; +/// Raw command to terminate a driver. The `token_length` describes the length +/// of a buffer immediately trailing this struct definition and part of the +/// same message. +#[repr(C, packed(4))] pub struct TerminateDriverDefn { - pub(crate) correlated_message: CorrelatedMessageDefn, - pub(crate) token_length: i32, + pub(in crate::command) correlated_message: CorrelatedMessageDefn, + pub(in crate::command) token_length: i32, } impl Flyweight where A: AtomicBuffer, { + /// Retrieve the client identifier of this request. pub fn client_id(&self) -> i64 { self.get_struct().correlated_message.client_id } + /// Set the client identifier of this request. pub fn put_client_id(&mut self, value: i64) -> &mut Self { self.get_struct_mut().correlated_message.client_id = value; self } + /// Retrieve the correlation identifier associated with this request. Used to + /// associate driver responses with a specific request. pub fn correlation_id(&self) -> i64 { self.get_struct().correlated_message.correlation_id } + /// Set the correlation identifier to be used with this request. pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { self.get_struct_mut().correlated_message.correlation_id = value; self } + /// Get the current length of the payload associated with this termination request. pub fn token_length(&self) -> i32 { self.get_struct().token_length } + /// Set the payload length of this termination request. + /// + /// NOTE: While there are no safety issues, improperly setting this value can cause panics. + /// The `token_length` value is automatically set during calls to `put_token_buffer()`, + /// so this method is not likely to be frequently used. pub fn put_token_length(&mut self, value: i32) -> &mut Self { self.get_struct_mut().token_length = value; self } + /// Return the current token payload associated with this termination request. pub fn token_buffer(&self) -> &[u8] { // QUESTION: Should I be slicing the buffer to `token_length`? // C++ doesn't do anything, so I'm going to assume not. &self.bytes_at(size_of::() as IndexT) } + /// Append a payload to the termination request. pub fn put_token_buffer(&mut self, token_buffer: &[u8]) -> &mut Self { let token_length = token_buffer.len() as i32; self.get_struct_mut().token_length = token_length; @@ -66,7 +84,22 @@ where self } + /// Get the total byte length of this termination command pub fn length(&self) -> IndexT { size_of::() as IndexT + self.token_length() } } + +#[cfg(test)] +mod tests { + use crate::command::terminate_driver::TerminateDriverDefn; + use std::mem::size_of; + + #[test] + fn terminate_command_size() { + assert_eq!( + size_of::(), + size_of::() + ) + } +} diff --git a/aeron-rs/src/concurrent/mod.rs b/aeron-rs/src/concurrent/mod.rs index a28b85a..70a1e46 100644 --- a/aeron-rs/src/concurrent/mod.rs +++ b/aeron-rs/src/concurrent/mod.rs @@ -203,6 +203,7 @@ pub trait AtomicBuffer: Deref + DerefMut { self.overlay_mut::(offset).map(|i| *i = value) } + /// Write the contents of a byte slice to this buffer. Does not perform any synchronization fn put_slice( &mut self, index: IndexT, diff --git a/aeron-rs/src/concurrent/ringbuffer.rs b/aeron-rs/src/concurrent/ringbuffer.rs index c8bc7eb..87f01ad 100644 --- a/aeron-rs/src/concurrent/ringbuffer.rs +++ b/aeron-rs/src/concurrent/ringbuffer.rs @@ -383,8 +383,13 @@ where self.max_msg_length } - pub fn consumer_heartbeat_time(&self) -> Result { - self.buffer.get_i64_volatile(self.consumer_heartbeat_index) + /// Return the last heartbeat timestamp associated with the consumer of this queue. + /// Timestamps are milliseconds since 1 Jan 1970, UTC. + pub fn consumer_heartbeat_time(&self) -> i64 { + // UNWRAP: Known-valid offset calculated during initialization + self.buffer + .get_i64_volatile(self.consumer_heartbeat_index) + .unwrap() } } diff --git a/aeron-rs/src/control_protocol.rs b/aeron-rs/src/control_protocol.rs index 43d23e4..dedb124 100644 --- a/aeron-rs/src/control_protocol.rs +++ b/aeron-rs/src/control_protocol.rs @@ -1,3 +1,4 @@ +//! Utilities for wrapping the command-and-control protocol with a nicer API use aeron_driver_sys::*; /// Construct a C-compatible enum out of a set of constants. diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs index d55d888..3d3fdbf 100644 --- a/aeron-rs/src/driver_proxy.rs +++ b/aeron-rs/src/driver_proxy.rs @@ -1,3 +1,4 @@ +//! High level API for issuing commands to the Media Driver use crate::command::flyweight::Flyweight; use crate::command::terminate_driver::TerminateDriverDefn; use crate::concurrent::ringbuffer::ManyToOneRingBuffer; @@ -5,6 +6,7 @@ use crate::concurrent::AtomicBuffer; use crate::control_protocol::ClientCommand; use crate::util::{AeronError, IndexT, Result}; +/// High-level interface for issuing commands to a media driver pub struct DriverProxy where A: AtomicBuffer, @@ -17,6 +19,7 @@ impl DriverProxy where A: AtomicBuffer, { + /// Initialize a new driver proxy from a command-and-control "to driver" buffer pub fn new(to_driver: ManyToOneRingBuffer) -> Self { let client_id = to_driver.next_correlation_id(); DriverProxy { @@ -25,14 +28,19 @@ where } } - pub fn time_of_last_driver_keepalive(&self) -> Result { + /// Retrieve the timestamp of the most recent driver heartbeat. Values are + /// milliseconds past 1 Jan 1970, UTC. + pub fn time_of_last_driver_keepalive(&self) -> i64 { self.to_driver.consumer_heartbeat_time() } + /// Get the unique identifier associated with this proxy. pub fn client_id(&self) -> i64 { self.client_id } + /// Request termination of the media driver. Optionally supply a payload on the request + /// that will be available to the driver. pub fn terminate_driver(&mut self, token_buffer: Option<&[u8]>) -> Result<()> { let client_id = self.client_id; self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| { diff --git a/aeron-rs/src/lib.rs b/aeron-rs/src/lib.rs index 9fc7905..b06c98e 100644 --- a/aeron-rs/src/lib.rs +++ b/aeron-rs/src/lib.rs @@ -1,4 +1,5 @@ //! [Aeron](https://github.com/real-logic/aeron) client for Rust +#![deny(missing_docs)] #[cfg(target_endian = "big")] compile_error!("Aeron is only supported on little-endian architectures"); diff --git a/aeron-rs/tests/cnc_terminate.rs b/aeron-rs/tests/cnc_terminate.rs index 3f01086..6e4cf8d 100644 --- a/aeron-rs/tests/cnc_terminate.rs +++ b/aeron-rs/tests/cnc_terminate.rs @@ -4,7 +4,6 @@ use aeron_rs::cnc_descriptor::MetaDataDefinition; use aeron_rs::concurrent::ringbuffer::ManyToOneRingBuffer; use aeron_rs::concurrent::AtomicBuffer; use aeron_rs::driver_proxy::DriverProxy; -use aeron_rs::util::IndexT; use memmap::MmapOptions; use std::ffi::{c_void, CString}; use std::fs::OpenOptions;