mirror of
				https://github.com/bspeice/aeron-rs
				synced 2025-10-31 09:30:24 -04:00 
			
		
		
		
	Add the publication message implementation
This commit is contained in:
		| @ -30,6 +30,8 @@ where | ||||
|     where | ||||
|         S: Sized, | ||||
|     { | ||||
|         // QUESTION: Should we zero the first `sizeof::<S>()` bytes at `offset`? | ||||
|         // Would make sure that some of the length parameters can't trigger panics | ||||
|         buffer.overlay::<S>(offset)?; | ||||
|         Ok(Flyweight { | ||||
|             buffer, | ||||
|  | ||||
| @ -1,5 +1,6 @@ | ||||
| //! Message definitions for interactions with the Media Driver | ||||
| pub mod correlated_message; | ||||
| pub mod flyweight; | ||||
| pub mod publication_message; | ||||
| pub mod subscription_message; | ||||
| pub mod terminate_driver; | ||||
|  | ||||
							
								
								
									
										91
									
								
								aeron-rs/src/command/publication_message.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										91
									
								
								aeron-rs/src/command/publication_message.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,91 @@ | ||||
| //! Flyweight implementation for command to add a publication | ||||
| use crate::command::correlated_message::CorrelatedMessageDefn; | ||||
| use crate::command::flyweight::Flyweight; | ||||
| use crate::concurrent::AtomicBuffer; | ||||
| use crate::util::{IndexT, Result}; | ||||
| use std::mem::size_of; | ||||
|  | ||||
| /// Control message for adding a publication | ||||
| /// | ||||
| /// ```text | ||||
| ///  0                   1                   2                   3 | ||||
| ///  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 | ||||
| /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | ||||
| /// |                         Client ID                             | | ||||
| /// |                                                               | | ||||
| /// +---------------------------------------------------------------+ | ||||
| /// |                       Correlation ID                          | | ||||
| /// |                                                               | | ||||
| /// +---------------------------------------------------------------+ | ||||
| /// |                         Stream ID                             | | ||||
| /// +---------------------------------------------------------------+ | ||||
| /// |                       Channel Length                          | | ||||
| /// +---------------------------------------------------------------+ | ||||
| /// |                          Channel                             ... | ||||
| ///...                                                              | | ||||
| /// +---------------------------------------------------------------+ | ||||
| /// ``` | ||||
| #[repr(C, packed(4))] | ||||
| pub struct PublicationMessageDefn { | ||||
|     correlated_message: CorrelatedMessageDefn, | ||||
|     stream_id: i32, | ||||
|     channel_length: i32, | ||||
| } | ||||
|  | ||||
| // Rust has no `offset_of` macro, so we'll just compute by hand | ||||
| const CHANNEL_LENGTH_OFFSET: IndexT = | ||||
|     (size_of::<CorrelatedMessageDefn>() + size_of::<i32>()) as IndexT; | ||||
|  | ||||
| impl<A> Flyweight<A, PublicationMessageDefn> | ||||
| where | ||||
|     A: AtomicBuffer, | ||||
| { | ||||
|     /// Retrieve the client identifier associated with this message | ||||
|     pub fn client_id(&self) -> i64 { | ||||
|         self.get_struct().correlated_message.client_id | ||||
|     } | ||||
|  | ||||
|     /// Set the client identifier for this message | ||||
|     pub fn put_client_id(&mut self, value: i64) -> &mut Self { | ||||
|         self.get_struct_mut().correlated_message.client_id = value; | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     /// Retrieve the correlation identifier associated with this message. | ||||
|     /// Will uniquely identify a command and response pair. | ||||
|     pub fn correlation_id(&self) -> i64 { | ||||
|         self.get_struct().correlated_message.correlation_id | ||||
|     } | ||||
|  | ||||
|     /// Set the correlation identifier for this message | ||||
|     pub fn put_correlation_id(&mut self, value: i64) -> &mut Self { | ||||
|         self.get_struct_mut().correlated_message.correlation_id = value; | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     /// Retrieve the stream identifier associated with this request | ||||
|     pub fn stream_id(&self) -> i32 { | ||||
|         self.get_struct().stream_id | ||||
|     } | ||||
|  | ||||
|     /// Set the stream identifier of this request | ||||
|     pub fn put_stream_id(&mut self, value: i32) -> &mut Self { | ||||
|         self.get_struct_mut().stream_id = value; | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     /// Retrieve the channel name of this request | ||||
|     pub fn channel(&self) -> Result<&str> { | ||||
|         self.string_get(CHANNEL_LENGTH_OFFSET) | ||||
|     } | ||||
|  | ||||
|     /// Set the channel name of this request | ||||
|     pub fn put_channel(&mut self, value: &str) -> Result<&mut Self> { | ||||
|         self.string_put(CHANNEL_LENGTH_OFFSET, value).map(|_| self) | ||||
|     } | ||||
|  | ||||
|     /// Get the total byte length of this subscription command | ||||
|     pub fn length(&self) -> IndexT { | ||||
|         size_of::<PublicationMessageDefn>() as IndexT + self.get_struct().channel_length | ||||
|     } | ||||
| } | ||||
| @ -1,4 +1,4 @@ | ||||
| //! Flyweight implementation for commands to add a subscription | ||||
| //! Flyweight implementation for command to add a subscription | ||||
|  | ||||
| use crate::command::correlated_message::CorrelatedMessageDefn; | ||||
| use crate::command::flyweight::Flyweight; | ||||
| @ -100,8 +100,7 @@ where | ||||
|  | ||||
|     /// Set the channel name of this request | ||||
|     pub fn put_channel(&mut self, value: &str) -> Result<&mut Self> { | ||||
|         self.string_put(CHANNEL_LENGTH_OFFSET, value)?; | ||||
|         Ok(self) | ||||
|         self.string_put(CHANNEL_LENGTH_OFFSET, value).map(|_| self) | ||||
|     } | ||||
|  | ||||
|     /// Get the total byte length of this subscription command | ||||
|  | ||||
| @ -1,13 +1,12 @@ | ||||
| //! High level API for issuing commands to the Media Driver | ||||
| use crate::command::flyweight::Flyweight; | ||||
| use crate::command::subscription_message::SubscriptionMessageDefn; | ||||
| use crate::command::terminate_driver::TerminateDriverDefn; | ||||
| use crate::concurrent::ringbuffer::ManyToOneRingBuffer; | ||||
| use crate::concurrent::AtomicBuffer; | ||||
| use crate::control_protocol::ClientCommand; | ||||
| use crate::util::{AeronError, IndexT, Result}; | ||||
| use std::mem::size_of; | ||||
| use crate::command::subscription_message::SubscriptionMessageDefn; | ||||
| use std::ops::Sub; | ||||
|  | ||||
| /// High-level interface for issuing commands to a media driver | ||||
| pub struct DriverProxy<A> | ||||
| @ -48,14 +47,17 @@ where | ||||
|     pub fn add_subscription(&mut self, channel: &str, stream_id: i32) -> Result<i64> { | ||||
|         let correlation_id = self.to_driver.next_correlation_id(); | ||||
|         if channel.len() > (COMMAND_BUFFER_SIZE - size_of::<SubscriptionMessageDefn>()) { | ||||
|             return Err(AeronError::InsufficientCapacity) | ||||
|             return Err(AeronError::InsufficientCapacity); | ||||
|         } | ||||
|  | ||||
|         let client_id = self.client_id; | ||||
|         self.write_command_to_driver(|buffer: &mut [u8], length: &mut IndexT| { | ||||
|             // UNWRAP: `SubscriptionMessageDefn` guaranteed to be smaller than `COMMAND_BUFFER_SIZE` | ||||
|             let mut subscription_message = Flyweight::new::<SubscriptionMessageDefn>(buffer, 0).unwrap(); | ||||
|             let mut subscription_message = | ||||
|                 Flyweight::new::<SubscriptionMessageDefn>(buffer, 0).unwrap(); | ||||
|  | ||||
|             subscription_message.put_client_id(self.client_id) | ||||
|             subscription_message | ||||
|                 .put_client_id(client_id) | ||||
|                 .put_registration_correlation_id(-1) | ||||
|                 .put_correlation_id(correlation_id) | ||||
|                 .put_stream_id(stream_id); | ||||
| @ -64,7 +66,7 @@ where | ||||
|  | ||||
|             *length = subscription_message.length(); | ||||
|             ClientCommand::AddSubscription | ||||
|         }); | ||||
|         })?; | ||||
|  | ||||
|         Ok(correlation_id) | ||||
|     } | ||||
|  | ||||
		Reference in New Issue
	
	Block a user