diff --git a/aeron-rs/src/driver_proxy.rs b/aeron-rs/src/driver_proxy.rs index 94d6382..9a47411 100644 --- a/aeron-rs/src/driver_proxy.rs +++ b/aeron-rs/src/driver_proxy.rs @@ -1,5 +1,6 @@ //! High level API for issuing commands to the Media Driver use crate::command::flyweight::Flyweight; +use crate::command::publication_message::PublicationMessageDefn; use crate::command::subscription_message::SubscriptionMessageDefn; use crate::command::terminate_driver::TerminateDriverDefn; use crate::concurrent::ringbuffer::ManyToOneRingBuffer; @@ -43,6 +44,35 @@ where self.client_id } + /// Request the media driver prepare to publish on a new channel and stream + pub fn add_publication(&mut self, channel: &str, stream_id: i32) -> Result { + let correlation_id = self.to_driver.next_correlation_id(); + if channel.len() > (COMMAND_BUFFER_SIZE - size_of::()) { + return Err(AeronError::InsufficientCapacity); + } + + let client_id = self.client_id; + self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| { + // UNWRAP: `PublicationMessageDefn` guaranteed to be smaller than `COMMAND_BUFFER_SIZE` + let mut publication_message = + Flyweight::new::(buffer, 0).unwrap(); + + publication_message + .put_client_id(client_id) + .put_correlation_id(correlation_id) + .put_stream_id(stream_id); + + // UNWRAP: Bounds check performed prior to attempting the write + publication_message.put_channel(channel).unwrap(); + + *length = publication_message.length(); + + ClientCommand::AddPublication + })?; + + Ok(correlation_id) + } + /// Request the media driver subscribe to a new channel and stream. pub fn add_subscription(&mut self, channel: &str, stream_id: i32) -> Result { let correlation_id = self.to_driver.next_correlation_id();