diff --git a/marketdata.xml b/marketdata.xml index 038e9d8..9e8f8f7 100644 --- a/marketdata.xml +++ b/marketdata.xml @@ -59,23 +59,26 @@ --> - - + + diff --git a/src/lib.rs b/src/lib.rs deleted file mode 100644 index 1c88d4b..0000000 --- a/src/lib.rs +++ /dev/null @@ -1,2 +0,0 @@ -// This file is needed for tests outside the main source tree to find the project files -pub mod marketdata_capnp; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 60d343f..c2e0d6a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,9 +17,11 @@ use crate::iex::IexParser; pub mod marketdata_capnp; #[allow(unused_imports)] pub mod marketdata_generated; // Flatbuffers +pub mod marketdata_sbe; mod capnp_runner; mod flatbuffers_runner; +mod sbe_runner; mod iex; mod parsers; @@ -81,6 +83,7 @@ fn main() { } */ + /* let mut capnp_writer = capnp_runner::CapnpWriter::new(); for iex_payload in parser { //let iex_payload = parser.next().unwrap(); @@ -93,6 +96,20 @@ fn main() { while let Ok(_) = capnp_reader.deserialize_unpacked(&mut read_buf, &mut summarizer) { parsed_msgs += 1; } + */ + + let mut sbe_writer = sbe_runner::SBEWriter::new(); + for iex_payload in parser { + //let iex_payload = parser.next().unwrap(); + sbe_writer.serialize(&iex_payload, &mut output_buf); + } + + let sbe_reader = sbe_runner::SBEReader::new(); + let mut read_buf = StreamVec::new(output_buf); + let mut parsed_msgs: u64 = 0; + while let Ok(_) = sbe_reader.deserialize(&mut read_buf, &mut summarizer) { + parsed_msgs += 1; + } dbg!(parsed_msgs); dbg!(summarizer); diff --git a/src/marketdata_capnp.rs b/src/marketdata_capnp.rs index 9c7968b..67750da 100644 --- a/src/marketdata_capnp.rs +++ b/src/marketdata_capnp.rs @@ -144,17 +144,17 @@ pub mod multi_message { impl Pipeline { } mod _private { - use capnp::private::layout; + use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 1, pointers: 1 }; + pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 1, pointers: 1 }; pub const TYPE_ID: u64 = 0xd13b_1bd4_36e1_ca9f; } } pub mod message { - pub use self::Which::{Quote, Trade}; + pub use self::Which::{Quote, Trade}; - #[derive(Copy, Clone)] + #[derive(Copy, Clone)] pub struct Owned; impl <'a> ::capnp::traits::Owned<'a> for Owned { type Reader = Reader<'a>; type Builder = Builder<'a>; } impl <'a> ::capnp::traits::OwnedStruct<'a> for Owned { type Reader = Reader<'a>; type Builder = Builder<'a>; } @@ -362,9 +362,9 @@ pub mod message { impl Pipeline { } mod _private { - use capnp::private::layout; + use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 2 }; + pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 2 }; pub const TYPE_ID: u64 = 0x91d7_2965_3a3d_4be4; } pub enum Which { @@ -506,9 +506,9 @@ pub mod trade { impl Pipeline { } mod _private { - use capnp::private::layout; + use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 }; + pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 }; pub const TYPE_ID: u64 = 0xd29e_10bd_4e5f_c241; } } @@ -668,9 +668,9 @@ pub mod level_update { impl Pipeline { } mod _private { - use capnp::private::layout; + use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 }; + pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 }; pub const TYPE_ID: u64 = 0xe664_c3b5_6628_c453; } } diff --git a/src/marketdata_sbe.rs b/src/marketdata_sbe.rs index 4d2ea13..4fac90e 100644 --- a/src/marketdata_sbe.rs +++ b/src/marketdata_sbe.rs @@ -9,13 +9,13 @@ extern crate core; /// Errors that may occur during the course of encoding or decoding. #[derive(Debug)] pub enum CodecErr { - /// Too few bytes in the byte-slice to read or write the data structure relevant - /// to the current state of the codec - NotEnoughBytes, + /// Too few bytes in the byte-slice to read or write the data structure relevant + /// to the current state of the codec + NotEnoughBytes, - /// Groups and vardata are constrained by the numeric type chosen to represent their - /// length as well as optional maxima imposed by the schema - SliceIsLongerThanAllowedBySchema, + /// Groups and vardata are constrained by the numeric type chosen to represent their + /// length as well as optional maxima imposed by the schema + SliceIsLongerThanAllowedBySchema, } pub type CodecResult = core::result::Result; @@ -23,208 +23,206 @@ pub type CodecResult = core::result::Result; /// Scratch Decoder Data Wrapper - codec internal use only #[derive(Debug)] pub struct ScratchDecoderData<'d> { - data: &'d [u8], - pos: usize, + data: &'d [u8], + pos: usize, } impl<'d> ScratchDecoderData<'d> { - /// Create a struct reference overlaid atop the data buffer - /// such that the struct's contents directly reflect the buffer. - /// Advances the `pos` index by the size of the struct in bytes. - #[inline] - fn read_type(&mut self, num_bytes: usize) -> CodecResult<&'d T> { - let end = self.pos + num_bytes; - if end <= self.data.len() { - let s = self.data[self.pos..end].as_ptr() as *mut T; - let v: &'d T = unsafe { &*s }; - self.pos = end; - Ok(v) - } else { - Err(CodecErr::NotEnoughBytes) - } + /// Create a struct reference overlaid atop the data buffer + /// such that the struct's contents directly reflect the buffer. + /// Advances the `pos` index by the size of the struct in bytes. + #[inline] + fn read_type(&mut self, num_bytes: usize) -> CodecResult<&'d T> { + let end = self.pos + num_bytes; + if end <= self.data.len() { + let s = self.data[self.pos..end].as_ptr() as *mut T; + let v: &'d T = unsafe { &*s }; + self.pos = end; + Ok(v) + } else { + Err(CodecErr::NotEnoughBytes) } + } - /// Advances the `pos` index by a set number of bytes. - #[inline] - fn skip_bytes(&mut self, num_bytes: usize) -> CodecResult<()> { - let end = self.pos + num_bytes; - if end <= self.data.len() { - self.pos = end; - Ok(()) - } else { - Err(CodecErr::NotEnoughBytes) - } + /// Advances the `pos` index by a set number of bytes. + #[inline] + fn skip_bytes(&mut self, num_bytes: usize) -> CodecResult<()> { + let end = self.pos + num_bytes; + if end <= self.data.len() { + self.pos = end; + Ok(()) + } else { + Err(CodecErr::NotEnoughBytes) } + } - /// Create a slice reference overlaid atop the data buffer - /// such that the slice's members' contents directly reflect the buffer. - /// Advances the `pos` index by the size of the slice contents in bytes. - #[inline] - fn read_slice(&mut self, count: usize, bytes_per_item: usize) -> CodecResult<&'d [T]> { - let num_bytes = bytes_per_item * count; - let end = self.pos + num_bytes; - if end <= self.data.len() { - let v: &'d [T] = unsafe { - core::slice::from_raw_parts(self.data[self.pos..end].as_ptr() as *const T, count) - }; - self.pos = end; - Ok(v) - } else { - Err(CodecErr::NotEnoughBytes) - } + /// Create a slice reference overlaid atop the data buffer + /// such that the slice's members' contents directly reflect the buffer. + /// Advances the `pos` index by the size of the slice contents in bytes. + #[inline] + fn read_slice(&mut self, count: usize, bytes_per_item: usize) -> CodecResult<&'d [T]> { + let num_bytes = bytes_per_item * count; + let end = self.pos + num_bytes; + if end <= self.data.len() { + let v: &'d [T] = unsafe { + core::slice::from_raw_parts(self.data[self.pos..end].as_ptr() as *const T, count) + }; + self.pos = end; + Ok(v) + } else { + Err(CodecErr::NotEnoughBytes) } + } } /// Scratch Encoder Data Wrapper - codec internal use only #[derive(Debug)] pub struct ScratchEncoderData<'d> { - data: &'d mut [u8], - pos: usize, + data: &'d mut [u8], + pos: usize, } impl<'d> ScratchEncoderData<'d> { - /// Copy the bytes of a value into the data buffer - /// Advances the `pos` index to after the newly-written bytes. - #[inline] - fn write_type(&mut self, t: &T, num_bytes: usize) -> CodecResult<()> { - let end = self.pos + num_bytes; - if end <= self.data.len() { - let source_bytes: &[u8] = unsafe { - core::slice::from_raw_parts(t as *const T as *const u8, num_bytes) - }; - (&mut self.data[self.pos..end]).copy_from_slice(source_bytes); - self.pos = end; - Ok(()) - } else { - Err(CodecErr::NotEnoughBytes) - } + /// Copy the bytes of a value into the data buffer + /// Advances the `pos` index to after the newly-written bytes. + #[inline] + fn write_type(&mut self, t: &T, num_bytes: usize) -> CodecResult<()> { + let end = self.pos + num_bytes; + if end <= self.data.len() { + let source_bytes: &[u8] = unsafe { + core::slice::from_raw_parts(t as *const T as *const u8, num_bytes) + }; + (&mut self.data[self.pos..end]).copy_from_slice(source_bytes); + self.pos = end; + Ok(()) + } else { + Err(CodecErr::NotEnoughBytes) } + } - /// Advances the `pos` index by a set number of bytes. - #[inline] - fn skip_bytes(&mut self, num_bytes: usize) -> CodecResult<()> { - let end = self.pos + num_bytes; - if end <= self.data.len() { - self.pos = end; - Ok(()) - } else { - Err(CodecErr::NotEnoughBytes) - } + /// Advances the `pos` index by a set number of bytes. + #[inline] + fn skip_bytes(&mut self, num_bytes: usize) -> CodecResult<()> { + let end = self.pos + num_bytes; + if end <= self.data.len() { + self.pos = end; + Ok(()) + } else { + Err(CodecErr::NotEnoughBytes) } + } - /// Create a struct reference overlaid atop the data buffer - /// such that changes to the struct directly edit the buffer. - /// Note that the initial content of the struct's fields may be garbage. - /// Advances the `pos` index to after the newly-written bytes. - #[inline] - fn writable_overlay(&mut self, num_bytes: usize) -> CodecResult<&'d mut T> { - let end = self.pos + num_bytes; - if end <= self.data.len() { - let v: &'d mut T = unsafe { - let s = self.data.as_ptr().offset(self.pos as isize) as *mut T; - &mut *s - }; - self.pos = end; - Ok(v) - } else { - Err(CodecErr::NotEnoughBytes) - } + /// Create a struct reference overlaid atop the data buffer + /// such that changes to the struct directly edit the buffer. + /// Note that the initial content of the struct's fields may be garbage. + /// Advances the `pos` index to after the newly-written bytes. + #[inline] + fn writable_overlay(&mut self, num_bytes: usize) -> CodecResult<&'d mut T> { + let end = self.pos + num_bytes; + if end <= self.data.len() { + let v: &'d mut T = unsafe { + let s = self.data.as_ptr().offset(self.pos as isize) as *mut T; + &mut *s + }; + self.pos = end; + Ok(v) + } else { + Err(CodecErr::NotEnoughBytes) } + } - /// Copy the bytes of a value into the data buffer at a specific position - /// Does **not** alter the `pos` index. - #[inline] - fn write_at_position(&mut self, position: usize, t: &T, num_bytes: usize) -> CodecResult<()> { - let end = position + num_bytes; - if end <= self.data.len() { - let source_bytes: &[u8] = unsafe { - core::slice::from_raw_parts(t as *const T as *const u8, num_bytes) - }; - (&mut self.data[position..end]).copy_from_slice(source_bytes); - Ok(()) - } else { - Err(CodecErr::NotEnoughBytes) - } + /// Copy the bytes of a value into the data buffer at a specific position + /// Does **not** alter the `pos` index. + #[inline] + fn write_at_position(&mut self, position: usize, t: &T, num_bytes: usize) -> CodecResult<()> { + let end = position + num_bytes; + if end <= self.data.len() { + let source_bytes: &[u8] = unsafe { + core::slice::from_raw_parts(t as *const T as *const u8, num_bytes) + }; + (&mut self.data[position..end]).copy_from_slice(source_bytes); + Ok(()) + } else { + Err(CodecErr::NotEnoughBytes) } - /// Create a mutable slice overlaid atop the data buffer directly - /// such that changes to the slice contents directly edit the buffer - /// Note that the initial content of the slice's members' fields may be garbage. - /// Advances the `pos` index to after the region representing the slice. - #[inline] - fn writable_slice(&mut self, count: usize, bytes_per_item: usize) -> CodecResult<&'d mut [T]> { - let end = self.pos + (count * bytes_per_item); - if end <= self.data.len() { - let v: &'d mut [T] = unsafe { - core::slice::from_raw_parts_mut(self.data[self.pos..end].as_mut_ptr() as *mut T, count) - }; - self.pos = end; - Ok(v) - } else { - Err(CodecErr::NotEnoughBytes) - } + } + /// Create a mutable slice overlaid atop the data buffer directly + /// such that changes to the slice contents directly edit the buffer + /// Note that the initial content of the slice's members' fields may be garbage. + /// Advances the `pos` index to after the region representing the slice. + #[inline] + fn writable_slice(&mut self, count: usize, bytes_per_item: usize) -> CodecResult<&'d mut [T]> { + let end = self.pos + (count * bytes_per_item); + if end <= self.data.len() { + let v: &'d mut [T] = unsafe { + core::slice::from_raw_parts_mut(self.data[self.pos..end].as_mut_ptr() as *mut T, count) + }; + self.pos = end; + Ok(v) + } else { + Err(CodecErr::NotEnoughBytes) } + } - /// Copy the raw bytes of a slice's contents into the data buffer - /// Does **not** encode the length of the slice explicitly into the buffer. - /// Advances the `pos` index to after the newly-written slice bytes. - #[inline] - fn write_slice_without_count(&mut self, t: &[T], bytes_per_item: usize) -> CodecResult<()> { - let content_bytes_size = bytes_per_item * t.len(); - let end = self.pos + content_bytes_size; - if end <= self.data.len() { - let source_bytes: &[u8] = unsafe { - core::slice::from_raw_parts(t.as_ptr() as *const u8, content_bytes_size) - }; - (&mut self.data[self.pos..end]).copy_from_slice(source_bytes); - self.pos = end; - Ok(()) - } else { - Err(CodecErr::NotEnoughBytes) - } + /// Copy the raw bytes of a slice's contents into the data buffer + /// Does **not** encode the length of the slice explicitly into the buffer. + /// Advances the `pos` index to after the newly-written slice bytes. + #[inline] + fn write_slice_without_count(&mut self, t: &[T], bytes_per_item: usize) -> CodecResult<()> { + let content_bytes_size = bytes_per_item * t.len(); + let end = self.pos + content_bytes_size; + if end <= self.data.len() { + let source_bytes: &[u8] = unsafe { + core::slice::from_raw_parts(t.as_ptr() as *const u8, content_bytes_size) + }; + (&mut self.data[self.pos..end]).copy_from_slice(source_bytes); + self.pos = end; + Ok(()) + } else { + Err(CodecErr::NotEnoughBytes) } + } } /// Convenience Either enum #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] pub enum Either { - Left(L), - Right(R), + Left(L), + Right(R) } /// Enum Side #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] #[repr(u8)] pub enum Side { - Buy = 0u8, - Sell = 1u8, - NullVal = 255u8, + Buy = 0u8, + Sell = 1u8, + NullVal = 255u8, } - impl Default for Side { - fn default() -> Self { Side::NullVal } + fn default() -> Self { Side::NullVal } } /// Enum MsgType #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] #[repr(u8)] pub enum MsgType { - Trade = 0u8, - Quote = 1u8, - NullVal = 255u8, + Trade = 0u8, + Quote = 1u8, + NullVal = 255u8, } - impl Default for MsgType { - fn default() -> Self { MsgType::NullVal } + fn default() -> Self { MsgType::NullVal } } /// Quote #[repr(C, packed)] #[derive(Default)] pub struct Quote { - pub price: u64, - pub size: u32, - pub flags: u8, - pub side: Side, + pub price: u64, + pub size: u32, + pub flags: u8, + pub side: Side, } impl Quote {} @@ -233,8 +231,8 @@ impl Quote {} #[repr(C, packed)] #[derive(Default)] pub struct Trade { - pub price: u64, - pub size: u32, + pub price: u64, + pub size: u32, } impl Trade {} @@ -243,10 +241,10 @@ impl Trade {} #[repr(C, packed)] #[derive(Default)] pub struct MessageHeader { - pub block_length: u16, - pub template_id: u16, - pub schema_id: u16, - pub version: u16, + pub block_length: u16, + pub template_id: u16, + pub schema_id: u16, + pub version: u16, } impl MessageHeader {} @@ -255,8 +253,8 @@ impl MessageHeader {} #[repr(C, packed)] #[derive(Default)] pub struct GroupSizeEncoding { - pub block_length: u16, - pub num_in_group: u16, + pub block_length: u16, + pub num_in_group: u16, } impl GroupSizeEncoding {} @@ -265,24 +263,24 @@ impl GroupSizeEncoding {} #[repr(C, packed)] #[derive(Default)] pub struct VarAsciiEncoding { - pub length: u32, - pub var_data: u8, + pub length: u32, + pub var_data: u8, } impl VarAsciiEncoding {} /// MessageHeader Decoder entry point pub fn start_decoding_message_header<'d>(data: &'d [u8]) -> CodecResult<(&'d MessageHeader, ScratchDecoderData<'d>)> { - let mut scratch = ScratchDecoderData { data: data, pos: 0 }; - let v = scratch.read_type::(8)?; - Ok((v, scratch)) + let mut scratch = ScratchDecoderData { data: data, pos: 0 }; + let v = scratch.read_type::(8)?; + Ok((v, scratch)) } /// MultiMessage Fixed-size Fields (8 bytes) #[repr(C, packed)] #[derive(Default)] pub struct MultiMessageFields { - pub sequence_number: u64, + pub sequence_number: u64, } @@ -291,293 +289,279 @@ impl MultiMessageFields {} /// MultiMessage specific Message Header #[repr(C, packed)] pub struct MultiMessageMessageHeader { - pub message_header: MessageHeader + pub message_header: MessageHeader } - impl MultiMessageMessageHeader { - pub const BLOCK_LENGTH: u16 = 8; - pub const TEMPLATE_ID: u16 = 1; - pub const SCHEMA_ID: u16 = 1; - pub const VERSION: u16 = 0; + pub const BLOCK_LENGTH: u16 = 8; + pub const TEMPLATE_ID: u16 = 1; + pub const SCHEMA_ID: u16 = 1; + pub const VERSION: u16 = 0; } - impl Default for MultiMessageMessageHeader { - fn default() -> MultiMessageMessageHeader { - MultiMessageMessageHeader { - message_header: MessageHeader { - block_length: 8u16, - template_id: 1u16, - schema_id: 1u16, - version: 0u16, - } - } + fn default() -> MultiMessageMessageHeader { + MultiMessageMessageHeader { + message_header: MessageHeader { + block_length: 8u16, + template_id: 1u16, + schema_id: 1u16, + version: 0u16, + } } + } } /// Group fixed-field member representations #[repr(C, packed)] #[derive(Default)] pub struct MultiMessageMessagesMember { - pub timestamp: i64, - pub msg_type: MsgType, - pub trade: Trade, - pub quote: Quote, + pub timestamp: i64, + pub msg_type: MsgType, + pub trade: Trade, + pub quote: Quote, } impl MultiMessageMessagesMember {} /// MultiMessageDecoderDone pub struct MultiMessageDecoderDone<'d> { - scratch: ScratchDecoderData<'d>, + scratch: ScratchDecoderData<'d>, } - impl<'d> MultiMessageDecoderDone<'d> { - /// Returns the number of bytes decoded - pub fn unwrap(self) -> usize { - self.scratch.pos - } + /// Returns the number of bytes decoded + pub fn unwrap(self) -> usize { + self.scratch.pos + } - pub fn wrap(scratch: ScratchDecoderData<'d>) -> MultiMessageDecoderDone<'d> { - MultiMessageDecoderDone { scratch: scratch } - } + pub fn wrap(scratch: ScratchDecoderData<'d>) -> MultiMessageDecoderDone<'d> { + MultiMessageDecoderDone { scratch: scratch } + } } /// symbol variable-length data pub struct MultiMessageMessagesSymbolDecoder<'d> { - parent: MultiMessageMessagesMemberDecoder<'d>, + parent: MultiMessageMessagesMemberDecoder<'d>, } - impl<'d> MultiMessageMessagesSymbolDecoder<'d> { - fn wrap(parent: MultiMessageMessagesMemberDecoder<'d>) -> Self { - MultiMessageMessagesSymbolDecoder { parent: parent } - } - pub fn symbol(mut self) -> CodecResult<(&'d [u8], Either, MultiMessageDecoderDone<'d>>)> { - let count = *self.parent.scratch.read_type::(4)?; - Ok((self.parent.scratch.read_slice::(count as usize, 1)?, self.parent.after_member())) - } + fn wrap(parent: MultiMessageMessagesMemberDecoder<'d>) -> Self { + MultiMessageMessagesSymbolDecoder { parent: parent } + } + pub fn symbol(mut self) -> CodecResult<(&'d [u8], Either, MultiMessageDecoderDone<'d>>)> { + let count = *self.parent.scratch.read_type::(4)?; + Ok((self.parent.scratch.read_slice::(count as usize, 1)?, self.parent.after_member())) + } } /// MultiMessageMessages Decoder for fields and header pub struct MultiMessageMessagesMemberDecoder<'d> { - scratch: ScratchDecoderData<'d>, - max_index: u16, - index: u16, + scratch: ScratchDecoderData<'d>, + max_index: u16, + index: u16, } impl<'d> MultiMessageMessagesMemberDecoder<'d> { - fn new(scratch: ScratchDecoderData<'d>, count: u16) -> Self { - assert!(count > 0u16); - MultiMessageMessagesMemberDecoder { - scratch: scratch, - max_index: count - 1, - index: 0, - } + fn new(scratch: ScratchDecoderData<'d>, count: u16) -> Self { + assert!(count > 0u16); + MultiMessageMessagesMemberDecoder { + scratch: scratch, + max_index: count - 1, + index: 0, } + } - pub fn next_messages_member(mut self) -> CodecResult<(&'d MultiMessageMessagesMember, MultiMessageMessagesSymbolDecoder<'d>)> { - let v = self.scratch.read_type::(35)?; - self.index += 1; - Ok((v, MultiMessageMessagesSymbolDecoder::wrap(self))) - } - #[inline] - fn after_member(self) -> Either, MultiMessageDecoderDone<'d>> { - if self.index <= self.max_index { - Either::Left(self) - } else { - Either::Right(MultiMessageDecoderDone::wrap(self.scratch)) - } + pub fn next_messages_member(mut self) -> CodecResult<(&'d MultiMessageMessagesMember, MultiMessageMessagesSymbolDecoder<'d>)> { + let v = self.scratch.read_type::(35)?; + self.index += 1; + Ok((v, MultiMessageMessagesSymbolDecoder::wrap(self))) + } + #[inline] + fn after_member(self) -> Either, MultiMessageDecoderDone<'d>> { + if self.index <= self.max_index { + Either::Left(self) + } else { + Either::Right(MultiMessageDecoderDone::wrap(self.scratch)) } + } } - pub struct MultiMessageMessagesHeaderDecoder<'d> { - scratch: ScratchDecoderData<'d>, + scratch: ScratchDecoderData<'d>, } - impl<'d> MultiMessageMessagesHeaderDecoder<'d> { - fn wrap(scratch: ScratchDecoderData<'d>) -> Self { - MultiMessageMessagesHeaderDecoder { scratch: scratch } - } - pub fn messages_individually(mut self) -> CodecResult, MultiMessageDecoderDone<'d>>> { - let dim = self.scratch.read_type::(4)?; - if dim.num_in_group > 0 { - Ok(Either::Left(MultiMessageMessagesMemberDecoder::new(self.scratch, dim.num_in_group))) - } else { - Ok(Either::Right(MultiMessageDecoderDone::wrap(self.scratch))) - } + fn wrap(scratch: ScratchDecoderData<'d>) -> Self { + MultiMessageMessagesHeaderDecoder { scratch: scratch } + } + pub fn messages_individually(mut self) -> CodecResult, MultiMessageDecoderDone<'d>>> { + let dim = self.scratch.read_type::(4)?; + if dim.num_in_group > 0 { + Ok(Either::Left(MultiMessageMessagesMemberDecoder::new(self.scratch, dim.num_in_group))) + } else { + Ok(Either::Right(MultiMessageDecoderDone::wrap(self.scratch))) } + } } /// MultiMessage Fixed fields Decoder pub struct MultiMessageFieldsDecoder<'d> { - scratch: ScratchDecoderData<'d>, + scratch: ScratchDecoderData<'d>, } - impl<'d> MultiMessageFieldsDecoder<'d> { - pub fn wrap(scratch: ScratchDecoderData<'d>) -> MultiMessageFieldsDecoder<'d> { - MultiMessageFieldsDecoder { scratch: scratch } - } - pub fn multi_message_fields(mut self) -> CodecResult<(&'d MultiMessageFields, MultiMessageMessagesHeaderDecoder<'d>)> { - let v = self.scratch.read_type::(8)?; - Ok((v, MultiMessageMessagesHeaderDecoder::wrap(self.scratch))) - } + pub fn wrap(scratch: ScratchDecoderData<'d>) -> MultiMessageFieldsDecoder<'d> { + MultiMessageFieldsDecoder { scratch: scratch } + } + pub fn multi_message_fields(mut self) -> CodecResult<(&'d MultiMessageFields, MultiMessageMessagesHeaderDecoder<'d>)> { + let v = self.scratch.read_type::(8)?; + Ok((v, MultiMessageMessagesHeaderDecoder::wrap(self.scratch))) + } } /// MultiMessageMessageHeaderDecoder pub struct MultiMessageMessageHeaderDecoder<'d> { - scratch: ScratchDecoderData<'d>, + scratch: ScratchDecoderData<'d>, } - impl<'d> MultiMessageMessageHeaderDecoder<'d> { - pub fn wrap(scratch: ScratchDecoderData<'d>) -> MultiMessageMessageHeaderDecoder<'d> { - MultiMessageMessageHeaderDecoder { scratch: scratch } - } - pub fn header(mut self) -> CodecResult<(&'d MessageHeader, MultiMessageFieldsDecoder<'d>)> { - let v = self.scratch.read_type::(8)?; - Ok((v, MultiMessageFieldsDecoder::wrap(self.scratch))) - } + pub fn wrap(scratch: ScratchDecoderData<'d>) -> MultiMessageMessageHeaderDecoder<'d> { + MultiMessageMessageHeaderDecoder { scratch: scratch } + } + pub fn header(mut self) -> CodecResult<(&'d MessageHeader, MultiMessageFieldsDecoder<'d>)> { + let v = self.scratch.read_type::(8)?; + Ok((v, MultiMessageFieldsDecoder::wrap(self.scratch))) + } } /// MultiMessage Decoder entry point pub fn start_decoding_multi_message<'d>(data: &'d [u8]) -> MultiMessageMessageHeaderDecoder<'d> { - MultiMessageMessageHeaderDecoder::wrap(ScratchDecoderData { data: data, pos: 0 }) + MultiMessageMessageHeaderDecoder::wrap(ScratchDecoderData { data: data, pos: 0 }) } /// MultiMessageEncoderDone pub struct MultiMessageEncoderDone<'d> { - scratch: ScratchEncoderData<'d>, + scratch: ScratchEncoderData<'d>, } - impl<'d> MultiMessageEncoderDone<'d> { - /// Returns the number of bytes encoded - pub fn unwrap(self) -> usize { - self.scratch.pos - } + /// Returns the number of bytes encoded + pub fn unwrap(self) -> usize { + self.scratch.pos + } - pub fn wrap(scratch: ScratchEncoderData<'d>) -> MultiMessageEncoderDone<'d> { - MultiMessageEncoderDone { scratch: scratch } - } + pub fn wrap(scratch: ScratchEncoderData<'d>) -> MultiMessageEncoderDone<'d> { + MultiMessageEncoderDone { scratch: scratch } + } } /// symbol variable-length data pub struct MultiMessageMessagesSymbolEncoder<'d> { - parent: MultiMessageMessagesMemberEncoder<'d>, + parent: MultiMessageMessagesMemberEncoder<'d>, } - impl<'d> MultiMessageMessagesSymbolEncoder<'d> { - fn wrap(parent: MultiMessageMessagesMemberEncoder<'d>) -> Self { - MultiMessageMessagesSymbolEncoder { parent: parent } - } - pub fn symbol(mut self, s: &'d [u8]) -> CodecResult { - let l = s.len(); - if l > 4294967294 { - return Err(CodecErr::SliceIsLongerThanAllowedBySchema); - } - // Write data length - self.parent.scratch.write_type::(&(l as u32), 4)?; // group length - self.parent.scratch.write_slice_without_count::(s, 1)?; - Ok(self.parent) + fn wrap(parent: MultiMessageMessagesMemberEncoder<'d>) -> Self { + MultiMessageMessagesSymbolEncoder { parent: parent } + } + pub fn symbol(mut self, s: &'d [u8]) -> CodecResult { + let l = s.len(); + if l > 4294967294 { + return Err(CodecErr::SliceIsLongerThanAllowedBySchema) } + // Write data length + self.parent.scratch.write_type::(&(l as u32), 4)?; // group length + self.parent.scratch.write_slice_without_count::(s, 1)?; + Ok(self.parent) + } } /// MultiMessageMessages Encoder for fields and header pub struct MultiMessageMessagesMemberEncoder<'d> { - scratch: ScratchEncoderData<'d>, - count_write_pos: usize, - count: u16, + scratch: ScratchEncoderData<'d>, + count_write_pos: usize, + count: u16, } impl<'d> MultiMessageMessagesMemberEncoder<'d> { - #[inline] - fn new(scratch: ScratchEncoderData<'d>, count_write_pos: usize) -> Self { - MultiMessageMessagesMemberEncoder { - scratch: scratch, - count_write_pos: count_write_pos, - count: 0, - } + #[inline] + fn new(scratch: ScratchEncoderData<'d>, count_write_pos: usize) -> Self { + MultiMessageMessagesMemberEncoder { + scratch: scratch, + count_write_pos: count_write_pos, + count: 0, } + } - #[inline] - pub fn next_messages_member(mut self, fields: &MultiMessageMessagesMember) -> CodecResult> { - self.scratch.write_type::(fields, 35)?; // block length - self.count += 1; - Ok(MultiMessageMessagesSymbolEncoder::wrap(self)) - } - #[inline] - pub fn done_with_messages(mut self) -> CodecResult> { - self.scratch.write_at_position::(self.count_write_pos, &self.count, 2)?; - Ok(MultiMessageEncoderDone::wrap(self.scratch)) - } + #[inline] + pub fn next_messages_member(mut self, fields: &MultiMessageMessagesMember) -> CodecResult> { + self.scratch.write_type::(fields, 35)?; // block length + self.count += 1; + Ok(MultiMessageMessagesSymbolEncoder::wrap(self)) + } + #[inline] + pub fn done_with_messages(mut self) -> CodecResult> { + self.scratch.write_at_position::(self.count_write_pos, &self.count, 2)?; + Ok(MultiMessageEncoderDone::wrap(self.scratch)) + } } - pub struct MultiMessageMessagesHeaderEncoder<'d> { - scratch: ScratchEncoderData<'d>, + scratch: ScratchEncoderData<'d>, } - impl<'d> MultiMessageMessagesHeaderEncoder<'d> { - #[inline] - fn wrap(scratch: ScratchEncoderData<'d>) -> Self { - MultiMessageMessagesHeaderEncoder { scratch: scratch } - } - #[inline] - pub fn messages_individually(mut self) -> CodecResult> { - self.scratch.write_type::(&35u16, 2)?; // block length - let count_pos = self.scratch.pos; - self.scratch.write_type::(&0, 2)?; // preliminary group member count - Ok(MultiMessageMessagesMemberEncoder::new(self.scratch, count_pos)) - } + #[inline] + fn wrap(scratch: ScratchEncoderData<'d>) -> Self { + MultiMessageMessagesHeaderEncoder { scratch: scratch } + } + #[inline] + pub fn messages_individually(mut self) -> CodecResult> { + self.scratch.write_type::(&35u16, 2)?; // block length + let count_pos = self.scratch.pos; + self.scratch.write_type::(&0, 2)?; // preliminary group member count + Ok(MultiMessageMessagesMemberEncoder::new(self.scratch, count_pos)) + } } /// MultiMessage Fixed fields Encoder pub struct MultiMessageFieldsEncoder<'d> { - scratch: ScratchEncoderData<'d>, + scratch: ScratchEncoderData<'d>, } - impl<'d> MultiMessageFieldsEncoder<'d> { - pub fn wrap(scratch: ScratchEncoderData<'d>) -> MultiMessageFieldsEncoder<'d> { - MultiMessageFieldsEncoder { scratch: scratch } - } + pub fn wrap(scratch: ScratchEncoderData<'d>) -> MultiMessageFieldsEncoder<'d> { + MultiMessageFieldsEncoder { scratch: scratch } + } - /// Create a mutable struct reference overlaid atop the data buffer - /// such that changes to the struct directly edit the buffer. - /// Note that the initial content of the struct's fields may be garbage. - pub fn multi_message_fields(mut self) -> CodecResult<(&'d mut MultiMessageFields, MultiMessageMessagesHeaderEncoder<'d>)> { - let v = self.scratch.writable_overlay::(8 + 0)?; - Ok((v, MultiMessageMessagesHeaderEncoder::wrap(self.scratch))) - } + /// Create a mutable struct reference overlaid atop the data buffer + /// such that changes to the struct directly edit the buffer. + /// Note that the initial content of the struct's fields may be garbage. + pub fn multi_message_fields(mut self) -> CodecResult<(&'d mut MultiMessageFields, MultiMessageMessagesHeaderEncoder<'d>)> { + let v = self.scratch.writable_overlay::(8 + 0)?; + Ok((v, MultiMessageMessagesHeaderEncoder::wrap(self.scratch))) + } - /// Copy the bytes of a value into the data buffer - pub fn multi_message_fields_copy(mut self, t: &MultiMessageFields) -> CodecResult> { - self.scratch.write_type::(t, 8)?; - Ok(MultiMessageMessagesHeaderEncoder::wrap(self.scratch)) - } + /// Copy the bytes of a value into the data buffer + pub fn multi_message_fields_copy(mut self, t: &MultiMessageFields) -> CodecResult> { + self.scratch.write_type::(t, 8)?; + Ok(MultiMessageMessagesHeaderEncoder::wrap(self.scratch)) + } } /// MultiMessageMessageHeaderEncoder pub struct MultiMessageMessageHeaderEncoder<'d> { - scratch: ScratchEncoderData<'d>, + scratch: ScratchEncoderData<'d>, } - impl<'d> MultiMessageMessageHeaderEncoder<'d> { - pub fn wrap(scratch: ScratchEncoderData<'d>) -> MultiMessageMessageHeaderEncoder<'d> { - MultiMessageMessageHeaderEncoder { scratch: scratch } - } + pub fn wrap(scratch: ScratchEncoderData<'d>) -> MultiMessageMessageHeaderEncoder<'d> { + MultiMessageMessageHeaderEncoder { scratch: scratch } + } - /// Create a mutable struct reference overlaid atop the data buffer - /// such that changes to the struct directly edit the buffer. - /// Note that the initial content of the struct's fields may be garbage. - pub fn header(mut self) -> CodecResult<(&'d mut MessageHeader, MultiMessageFieldsEncoder<'d>)> { - let v = self.scratch.writable_overlay::(8 + 0)?; - Ok((v, MultiMessageFieldsEncoder::wrap(self.scratch))) - } + /// Create a mutable struct reference overlaid atop the data buffer + /// such that changes to the struct directly edit the buffer. + /// Note that the initial content of the struct's fields may be garbage. + pub fn header(mut self) -> CodecResult<(&'d mut MessageHeader, MultiMessageFieldsEncoder<'d>)> { + let v = self.scratch.writable_overlay::(8 + 0)?; + Ok((v, MultiMessageFieldsEncoder::wrap(self.scratch))) + } - /// Copy the bytes of a value into the data buffer - pub fn header_copy(mut self, t: &MessageHeader) -> CodecResult> { - self.scratch.write_type::(t, 8)?; - Ok(MultiMessageFieldsEncoder::wrap(self.scratch)) - } + /// Copy the bytes of a value into the data buffer + pub fn header_copy(mut self, t: &MessageHeader) -> CodecResult> { + self.scratch.write_type::(t, 8)?; + Ok(MultiMessageFieldsEncoder::wrap(self.scratch)) + } } /// MultiMessage Encoder entry point pub fn start_encoding_multi_message<'d>(data: &'d mut [u8]) -> MultiMessageMessageHeaderEncoder<'d> { - MultiMessageMessageHeaderEncoder::wrap(ScratchEncoderData { data: data, pos: 0 }) + MultiMessageMessageHeaderEncoder::wrap(ScratchEncoderData { data: data, pos: 0 }) } diff --git a/src/sbe_runner.rs b/src/sbe_runner.rs new file mode 100644 index 0000000..9765a81 --- /dev/null +++ b/src/sbe_runner.rs @@ -0,0 +1,144 @@ +use std::io::{BufRead, Write}; +use std::str::from_utf8_unchecked; + +use nom::bytes::complete::take_until; +use nom::IResult; + +use crate::{marketdata_sbe, StreamVec, Summarizer}; +use crate::iex::{IexMessage, IexPayload}; +use crate::marketdata_sbe::{Either, MultiMessageFields, MultiMessageMessageHeader, MultiMessageMessagesMember, MultiMessageMessagesMemberEncoder, MultiMessageMessagesSymbolEncoder, Side, start_decoding_multi_message, start_encoding_multi_message}; + +fn __take_until<'a>(tag: &'static str, input: &'a [u8]) -> IResult<&'a [u8], &'a [u8]> { + take_until(tag)(input) +} + +fn parse_symbol(sym: &[u8; 8]) -> &str { + // TODO: Use the `jetscii` library for all that SIMD goodness + // IEX guarantees ASCII, so we're fine using an unsafe conversion + let (_, sym_bytes) = __take_until(" ", &sym[..]).unwrap(); + unsafe { from_utf8_unchecked(sym_bytes) } +} + +pub struct SBEWriter { + /// Buffer to construct messages before copying. While SBE benefits + /// from easily being able to create messages directly in output buffer, + /// we'll construct in a scratch buffer and then copy to more fairly + /// benchmark against Cap'n Proto and Flatbuffers. + scratch_buffer: Vec, + default_header: MultiMessageMessageHeader, +} + +impl SBEWriter { + pub fn new() -> SBEWriter { + SBEWriter { + // 8K scratch buffer is *way* more than necessary, + // but we don't want to run into issues with not enough + // data to encode messages + scratch_buffer: vec![0; 1024 * 8], + default_header: MultiMessageMessageHeader::default(), + } + } + + pub fn serialize(&mut self, payload: &IexPayload, output: &mut Vec) { + let (fields, encoder) = start_encoding_multi_message(&mut self.scratch_buffer[..]) + .header_copy(&self.default_header.message_header).unwrap() + .multi_message_fields().unwrap(); + fields.sequence_number = payload.first_seq_no; + + let mut encoder = encoder.messages_individually().unwrap(); + let mut encoder: MultiMessageMessagesMemberEncoder = payload.messages.iter().fold(encoder, |enc, m| { + match m { + IexMessage::TradeReport(tr) => { + let fields = MultiMessageMessagesMember { + msg_type: marketdata_sbe::MsgType::Trade, + timestamp: tr.timestamp, + trade: marketdata_sbe::Trade { + size: tr.size, + price: tr.price, + }, + ..Default::default() + }; + let sym_enc: MultiMessageMessagesSymbolEncoder = enc.next_messages_member(&fields).unwrap(); + sym_enc.symbol(parse_symbol(&tr.symbol).as_bytes()).unwrap() + } + IexMessage::PriceLevelUpdate(plu) => { + let fields = MultiMessageMessagesMember { + msg_type: marketdata_sbe::MsgType::Quote, + timestamp: plu.timestamp, + quote: marketdata_sbe::Quote { + price: plu.price, + size: plu.size, + flags: plu.event_flags, + side: if plu.msg_type == 0x38 { Side::Buy } else { Side::Sell }, + }, + ..Default::default() + }; + let sym_enc: MultiMessageMessagesSymbolEncoder = enc.next_messages_member(&fields).unwrap(); + sym_enc.symbol(parse_symbol(&plu.symbol).as_bytes()).unwrap() + } + _ => enc + } + }); + + let finished = encoder.done_with_messages().unwrap(); + let data_len = finished.unwrap(); + + output.write(&self.scratch_buffer[..data_len]).unwrap(); + } +} + +pub struct SBEReader; + +impl SBEReader { + pub fn new() -> SBEReader { + SBEReader {} + } + + pub fn deserialize<'a>(&self, buf: &'a mut StreamVec, stats: &mut Summarizer) -> Result<(), ()> { + let data = buf.fill_buf().unwrap(); + if data.len() == 0 { + return Err(()); + } + + let (header, decoder) = start_decoding_multi_message(data) + .header().unwrap(); + + let (fields, decoder) = decoder.multi_message_fields().unwrap(); + let mut msg_decoder = decoder.messages_individually().unwrap(); + while let Either::Left(msg) = msg_decoder { + let (member, sym_dec) = msg.next_messages_member().unwrap(); + let (sym, next_msg_dec) = sym_dec.symbol().unwrap(); + match member.msg_type { + marketdata_sbe::MsgType::Trade => stats.append_trade_volume( + unsafe { from_utf8_unchecked(sym) }, + member.trade.size as u64, + ), + marketdata_sbe::MsgType::Quote => stats.update_quote_prices( + unsafe { from_utf8_unchecked(sym) }, + member.quote.price, + match member.quote.side { + Side::Buy => true, + _ => false + }, + ), + _ => () + } + msg_decoder = next_msg_dec; + } + + // We now have a `Right`, which is a finished messages block + let msg_decoder = match msg_decoder { + Either::Right(r) => r, + _ => panic!("Didn't parse all messages") + }; + + // Interestingly enough, `buf.consume(msg_decoder.unwrap())` isn't OK, + // presumably something to do with when *precisely* the drop of `self` + // happens for `msg_decoder`. Leave it as two statments so that + // Rust is able to prove our immutable borrow of `data` ends in time + // to consume the buffer + let msg_len = msg_decoder.unwrap(); + buf.consume(msg_len); + Ok(()) + } +} diff --git a/tests/capnp.rs b/tests/capnp.rs deleted file mode 100644 index 67a5fd5..0000000 --- a/tests/capnp.rs +++ /dev/null @@ -1,50 +0,0 @@ -use alloc_counter::{AllocCounterSystem, count_alloc, deny_alloc}; - -use md_shootout::marketdata_capnp::multi_message; - -#[global_allocator] -static A: AllocCounterSystem = AllocCounterSystem; - -#[test] -fn reinit_memory_check() { - // Setting up the builder doesn't reserve any heap memory - let mut msg_block = deny_alloc(|| { - capnp::message::Builder::new_default() - }); - - // Setting up the root object, however, does reserve a first segment - let (stats, result) = count_alloc(|| { - let multimsg = msg_block.init_root::(); - multimsg.init_messages(32); - }); - - assert_eq!(stats.0, 4); - assert_eq!(stats.1, 0); - assert_eq!(stats.2, 0); - - // If we reinitialize an object on that original builder, we re-use memory - deny_alloc(|| { - let multimsg = msg_block.init_root::(); - multimsg.init_messages(32); - - // Even if we down-size and up-size the message list size, we don't need - // to re-allocate - let multimsg = msg_block.init_root::(); - multimsg.init_messages(16); - - let multimsg = msg_block.init_root::(); - multimsg.init_messages(32); - }); - - // It's only when we init a larger message count that a fresh allocation occurs - let (stats, _) = count_alloc(|| { - let multimsg = msg_block.init_root::(); - // Note: calling `init_messages(33)` doesn't force allocation because - // the Capnproto builder reserved extra memory the first time around - multimsg.init_messages(256); - }); - - assert_eq!(stats.0, 1); - assert_eq!(stats.1, 3); - assert_eq!(stats.2, 0); -} \ No newline at end of file