From 64babc5e4360a744446f89e60db5e66d2dfa9479 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sun, 17 Nov 2019 01:06:25 -0500 Subject: [PATCH] `DriverProxy.add_subscription` --- aeron-rs/src/driver_proxy.rs | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs index b3d5e98..cb17cfd 100644 --- a/aeron-rs/src/driver_proxy.rs +++ b/aeron-rs/src/driver_proxy.rs @@ -6,6 +6,8 @@ use crate::concurrent::AtomicBuffer; use crate::control_protocol::ClientCommand; use crate::util::{AeronError, IndexT, Result}; use std::mem::size_of; +use crate::command::subscription_message::SubscriptionMessageDefn; +use std::ops::Sub; /// High-level interface for issuing commands to a media driver pub struct DriverProxy @@ -42,8 +44,33 @@ where self.client_id } + /// Request the media driver subscribe to a new channel and stream. + pub fn add_subscription(&mut self, channel: &str, stream_id: i32) -> Result { + let correlation_id = self.to_driver.next_correlation_id(); + if channel.len() > (COMMAND_BUFFER_SIZE - size_of::()) { + return Err(AeronError::InsufficientCapacity) + } + + self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| { + // UNWRAP: `SubscriptionMessageDefn` guaranteed to be smaller than `COMMAND_BUFFER_SIZE` + let mut subscription_message = Flyweight::new::(buffer, 0).unwrap(); + + subscription_message.put_client_id(self.client_id) + .put_registration_correlation_id(-1) + .put_correlation_id(correlation_id) + .put_stream_id(stream_id); + // UNWRAP: Bounds check performed prior to attempting the write + subscription_message.put_channel(channel).unwrap(); + + *length = subscription_message.length(); + ClientCommand::AddSubscription + }); + + Ok(correlation_id) + } + /// Request termination of the media driver. Optionally supply a payload on the request - /// that will be available to the driver. + /// that the driver can use to decide whether or not to honor the request. pub fn terminate_driver(&mut self, token_buffer: Option<&[u8]>) -> Result<()> { let client_id = self.client_id; if token_buffer.is_some()