From 6628b5b6cd8bd88f81763d09cd0593cf1bd6baf6 Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Fri, 6 Sep 2019 23:26:11 -0400 Subject: [PATCH] Output analysis results Flatbuffers and SBE were much closer than expected --- src/main.rs | 64 +++++++++++++++++++++++++++++++++---- src/marketdata_capnp.rs | 4 --- src/marketdata_generated.rs | 7 ++-- 3 files changed, 60 insertions(+), 15 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0ca2e00..9ce3232 100644 --- a/src/main.rs +++ b/src/main.rs @@ -50,29 +50,40 @@ fn main() { file.read_to_end(&mut buf) .expect(&format!("Unable to read file={}", path.display())); - let _capnp_unpacked = run_analysis( + let capnp_unpacked = run_analysis( &buf, &mut capnp_runner::CapnpWriter::new(false), &mut capnp_runner::CapnpReader::new(false), ); - let _capnp_packed = run_analysis( + println!("Cap'n Proto Unpacked:\n{}\n", capnp_unpacked.timing_stats()); + + let capnp_packed = run_analysis( &buf, &mut capnp_runner::CapnpWriter::new(true), &mut capnp_runner::CapnpReader::new(true), ); - let _flatbuffers = run_analysis( + assert_eq!(capnp_unpacked.summary_stats, capnp_packed.summary_stats); + println!("Cap'n Proto Packed:\n{}\n", capnp_packed.timing_stats()); + + let flatbuffers = run_analysis( &buf, &mut flatbuffers_runner::FlatbuffersWriter::new(), &mut flatbuffers_runner::FlatbuffersReader::new(), ); - let _sbe = run_analysis( + assert_eq!(capnp_packed.summary_stats, flatbuffers.summary_stats); + println!("Flatbuffers:\n{}\n", flatbuffers.timing_stats()); + + let sbe = run_analysis( &buf, &mut sbe_runner::SBEWriter::new(), &mut sbe_runner::SBEReader::new(), ); + + assert_eq!(flatbuffers.summary_stats, sbe.summary_stats); + println!("SBE:\n{}\n", sbe.timing_stats()); } #[derive(Debug, PartialEq)] @@ -186,6 +197,33 @@ struct RunAnalysis { buf_len: usize, } +impl RunAnalysis { + fn timing_stats(&self) -> String { + format!( + concat!( + " serialize_50={}ns\n", + " serialize_99={}ns\n", + " serialize_999={}ns\n", + " deserialize_50={}ns\n", + " deserialize_99={}ns\n", + " deserialize_999={}ns\n", + " serialize_total={}ns\n", + " deserialize_total={}ns\n", + " write_len={}b" + ), + self.serialize_hist.value_at_quantile(0.5), + self.serialize_hist.value_at_quantile(0.99), + self.serialize_hist.value_at_quantile(0.999), + self.deserialize_hist.value_at_quantile(0.5), + self.deserialize_hist.value_at_quantile(0.99), + self.deserialize_hist.value_at_quantile(0.999), + self.serialize_total_nanos, + self.deserialize_total_nanos, + self.buf_len + ) + } +} + fn run_analysis(iex_data: &Vec, serializer: &mut S, deserializer: &mut D) -> RunAnalysis where S: RunnerSerialize, @@ -201,21 +239,32 @@ where let mut serialize_msgs = 0; for iex_payload in iex_parser { + let output_len_start = output_buf.len(); let serialize_start = Instant::now(); serializer.serialize(&iex_payload, &mut output_buf); let serialize_end = Instant::now().duration_since(serialize_start).as_nanos(); + serialize_hist.record(serialize_end as u64).unwrap(); serialize_nanos_total += serialize_end; - serialize_msgs += 1; + + // If the IEX payload is made up of messages we don't care about + // (a multi-message containing nothing but SystemEvent for example), + // Cap'n Proto doesn't write anything into the output buffer. + // As such, only increment `serialize_msgs` when something was written + // so that the read/write counts line up. + let write_size = output_buf.len() - output_len_start; + if write_size != 0 { + serialize_msgs += 1; + } } let output_len = output_buf.len(); let mut read_buf = StreamVec::new(output_buf); let mut summarizer = Summarizer::default(); let mut deserialize_hist = Histogram::::new(2).unwrap(); - let mut parsed_msgs: u64 = 0; + let mut parsed_msgs = 0usize; let mut deserialize_nanos_total = 0u128; loop { @@ -234,7 +283,8 @@ where } } - dbg!(serialize_msgs, parsed_msgs); + assert_eq!(serialize_msgs, parsed_msgs); + //dbg!(serialize_all); RunAnalysis { serialize_hist, diff --git a/src/marketdata_capnp.rs b/src/marketdata_capnp.rs index f51c2ec..89de7af 100644 --- a/src/marketdata_capnp.rs +++ b/src/marketdata_capnp.rs @@ -213,7 +213,6 @@ pub mod multi_message { impl Pipeline {} mod _private { use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 1, pointers: 1, @@ -508,7 +507,6 @@ pub mod message { impl Pipeline {} mod _private { use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 2, @@ -703,7 +701,6 @@ pub mod trade { impl Pipeline {} mod _private { use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0, @@ -914,7 +911,6 @@ pub mod level_update { impl Pipeline {} mod _private { use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0, diff --git a/src/marketdata_generated.rs b/src/marketdata_generated.rs index ce97374..a2ce5db 100644 --- a/src/marketdata_generated.rs +++ b/src/marketdata_generated.rs @@ -1,20 +1,19 @@ // automatically generated by the FlatBuffers compiler, do not modify -extern crate flatbuffers; - use std::cmp::Ordering; use std::mem; +extern crate flatbuffers; use self::flatbuffers::EndianScalar; #[allow(unused_imports, dead_code)] pub mod md_shootout { + use std::cmp::Ordering; use std::mem; - use self::flatbuffers::EndianScalar; - extern crate flatbuffers; + use self::flatbuffers::EndianScalar; #[allow(non_camel_case_types)] #[repr(u8)]