1
0
mirror of https://github.com/bspeice/aeron-rs synced 2025-01-21 03:20:04 -05:00

Driver proxy add_publication

This commit is contained in:
Bradlee Speice 2019-11-22 22:08:31 -05:00
parent fc0c3d9b1c
commit 7514f1249f

View File

@ -1,5 +1,6 @@
//! High level API for issuing commands to the Media Driver
use crate::command::flyweight::Flyweight;
use crate::command::publication_message::PublicationMessageDefn;
use crate::command::subscription_message::SubscriptionMessageDefn;
use crate::command::terminate_driver::TerminateDriverDefn;
use crate::concurrent::ringbuffer::ManyToOneRingBuffer;
@ -43,6 +44,35 @@ where
self.client_id
}
/// Request the media driver prepare to publish on a new channel and stream
pub fn add_publication(&mut self, channel: &str, stream_id: i32) -> Result<i64> {
let correlation_id = self.to_driver.next_correlation_id();
if channel.len() > (COMMAND_BUFFER_SIZE - size_of::<PublicationMessageDefn>()) {
return Err(AeronError::InsufficientCapacity);
}
let client_id = self.client_id;
self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| {
// UNWRAP: `PublicationMessageDefn` guaranteed to be smaller than `COMMAND_BUFFER_SIZE`
let mut publication_message =
Flyweight::new::<PublicationMessageDefn>(buffer, 0).unwrap();
publication_message
.put_client_id(client_id)
.put_correlation_id(correlation_id)
.put_stream_id(stream_id);
// UNWRAP: Bounds check performed prior to attempting the write
publication_message.put_channel(channel).unwrap();
*length = publication_message.length();
ClientCommand::AddPublication
})?;
Ok(correlation_id)
}
/// Request the media driver subscribe to a new channel and stream.
pub fn add_subscription(&mut self, channel: &str, stream_id: i32) -> Result<i64> {
let correlation_id = self.to_driver.next_correlation_id();