diff --git a/src/capnp_runner.rs b/src/capnp_runner.rs index 8c98528..167671f 100644 --- a/src/capnp_runner.rs +++ b/src/capnp_runner.rs @@ -1,3 +1,6 @@ +use std::convert::TryInto; +use std::io::{BufRead, Read}; +use std::mem::size_of; use std::str::from_utf8_unchecked; use capnp::Error; @@ -136,10 +139,10 @@ impl CapnpReader { } } - pub fn deserialize_packed<'a>(&self, buf: &'a mut StreamVec, stats: &mut Summarizer) -> Result<(), Error> { + pub fn deserialize_packed(&self, buf: &mut StreamVec, stats: &mut Summarizer) -> Result<(), Error> { // Because `capnp::serialize_packed::PackedRead` is hidden from us, packed reads // *have* to both allocate new segments every read, and copy the buffer into - // those same segments. Un-packed reading can use `SliceSegments` for true zero-copy + // those same segments, no ability to re-use allocated memory. let reader = read_message_packed(buf, self.read_opts)?; let multimsg = reader.get_root::().unwrap(); @@ -162,4 +165,146 @@ impl CapnpReader { }; Ok(()) } + + pub fn deserialize_unpacked(&self, buf: &mut StreamVec, stats: &mut Summarizer) -> Result<(), Error> { + let mut data = buf.fill_buf()?; + if data.len() == 0 { + return Err(capnp::Error::failed(String::new())); + } + + let orig_data = data; + let reader_opts = ReaderOptions::default(); + + /* + Read into `OwnedSegments`, which means we copy the entire message into a new Vec. Note that + the `data` pointer is modified underneath us, can figure out the message length by + checking the difference between where we started and what `data` is afterward. + This is a trick you learn only by checking the fuzzing test cases. + + let reader = capnp::serialize::read_message(&mut data, reader_opts)?; + let bytes_consumed = orig_data.len() - data.len(); + */ + + /* + Read into `SliceSegments`, which allows us to re-use the underlying message storage, + but still forces a Vec allocation for `offsets`. Also requires us to copy code from + Cap'n'Proto because `SliceSegments` has private fields, and `read_segment_table` + is private. And all this because `read_segment_from_words` has a length check + that triggers an error if our buffer is too large. + There is no other documentation on how to calculate `bytes_consumed` in this case + that I could find, you just have to guess and check until you figure this one out. + */ + let (num_words, offsets) = read_segment_table(&mut data, reader_opts)?; + let words = unsafe { capnp::Word::bytes_to_words(data) }; + let reader = capnp::message::Reader::new( + SliceSegments { + words: words, + segment_slices: offsets, + }, + reader_opts, + ); + let segment_table_bytes = orig_data.len() - data.len(); + let msg_bytes = num_words * size_of::(); + let bytes_consumed = segment_table_bytes + msg_bytes; + + let multimsg = reader.get_root::()?; + for msg in multimsg.get_messages()?.iter() { + let sym = msg.get_symbol()?; + + match msg.which()? { + message::Trade(trade) => { + let trade = trade.unwrap(); + stats.append_trade_volume(sym, trade.get_size().into()); + }, + message::Quote(quote) => { + let quote = quote.unwrap(); + let is_buy = match quote.get_side()? { + Side::Buy => true, + _ => false + }; + stats.update_quote_prices(sym, quote.get_price(), is_buy); + }, + } + } + + buf.consume(bytes_consumed); + Ok(()) + } +} + + +pub struct SliceSegments<'a> { + words: &'a [capnp::Word], + segment_slices: Vec<(usize, usize)>, +} + +impl<'a> capnp::message::ReaderSegments for SliceSegments<'a> { + fn get_segment<'b>(&'b self, id: u32) -> Option<&'b [capnp::Word]> { + if id < self.segment_slices.len() as u32 { + let (a, b) = self.segment_slices[id as usize]; + Some(&self.words[a..b]) + } else { + None + } + } + + fn len(&self) -> usize { + self.segment_slices.len() + } +} + +fn read_segment_table(read: &mut R, + options: capnp::message::ReaderOptions) + -> capnp::Result<(usize, Vec<(usize, usize)>)> + where R: Read +{ + let mut buf: [u8; 8] = [0; 8]; + + // read the first Word, which contains segment_count and the 1st segment length + read.read_exact(&mut buf)?; + let segment_count = u32::from_le_bytes(buf[0..4].try_into().unwrap()).wrapping_add(1) as usize; + + if segment_count >= 512 { + return Err(Error::failed(format!("Too many segments: {}", segment_count))) + } else if segment_count == 0 { + return Err(Error::failed(format!("Too few segments: {}", segment_count))) + } + + let mut segment_slices = Vec::with_capacity(segment_count); + let mut total_words = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize; + segment_slices.push((0, total_words)); + + if segment_count > 1 { + if segment_count < 4 { + read.read_exact(&mut buf)?; + for idx in 0..(segment_count - 1) { + let segment_len = + u32::from_le_bytes(buf[(idx * 4)..(idx + 1) * 4].try_into().unwrap()) as usize; + + segment_slices.push((total_words, total_words + segment_len)); + total_words += segment_len; + } + } else { + let mut segment_sizes = vec![0u8; (segment_count & !1) * 4]; + read.read_exact(&mut segment_sizes[..])?; + for idx in 0..(segment_count - 1) { + let segment_len = + u32::from_le_bytes(segment_sizes[(idx * 4)..(idx + 1) * 4].try_into().unwrap()) as usize; + + segment_slices.push((total_words, total_words + segment_len)); + total_words += segment_len; + } + } + } + + // Don't accept a message which the receiver couldn't possibly traverse without hitting the + // traversal limit. Without this check, a malicious client could transmit a very large segment + // size to make the receiver allocate excessive space and possibly crash. + if total_words as u64 > options.traversal_limit_in_words { + return Err(Error::failed( + format!("Message has {} words, which is too large. To increase the limit on the \ + receiving end, see capnp::message::ReaderOptions.", total_words))) + } + + Ok((total_words, segment_slices)) } diff --git a/src/main.rs b/src/main.rs index f4c95d8..60d343f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -48,28 +48,26 @@ fn main() { let mut summarizer = Summarizer::default(); let mut parser = IexParser::new(&buf[..]); - let mut output_buf: Vec = Vec::new(); + // Pre-allocate the same size as the backing file. Will be way more than + // necessary, but makes sure there's no re-allocation not related to + // actual parsing/serialization code + let mut output_buf: Vec = Vec::with_capacity(buf.capacity()); /* let mut capnp_writer = capnp_runner::CapnpWriter::new(); - let capnp_reader = capnp_runner::CapnpReader::new(); - for iex_payload in parser { //let iex_payload = parser.next().unwrap(); capnp_writer.serialize(&iex_payload, &mut output_buf, true); } + let capnp_reader = capnp_runner::CapnpReader::new(); let mut read_buf = StreamVec::new(output_buf); let mut parsed_msgs: u64 = 0; while let Ok(_) = capnp_reader.deserialize_packed(&mut read_buf, &mut summarizer) { parsed_msgs += 1; } - assert_eq!(read_buf.pos, read_buf.inner.len()); - */ - let mut fb_writer = flatbuffers_runner::FlatbuffersWriter::new(); - for iex_payload in parser { fb_writer.serialize(&iex_payload, &mut output_buf); } @@ -81,6 +79,20 @@ fn main() { while let Ok(_) = fb_reader.deserialize(&mut read_buf, &mut summarizer) { parsed_msgs += 1; } + */ + + let mut capnp_writer = capnp_runner::CapnpWriter::new(); + for iex_payload in parser { + //let iex_payload = parser.next().unwrap(); + capnp_writer.serialize(&iex_payload, &mut output_buf, false); + } + + let capnp_reader = capnp_runner::CapnpReader::new(); + let mut read_buf = StreamVec::new(output_buf); + let mut parsed_msgs: u64 = 0; + while let Ok(_) = capnp_reader.deserialize_unpacked(&mut read_buf, &mut summarizer) { + parsed_msgs += 1; + } dbg!(parsed_msgs); dbg!(summarizer); diff --git a/src/marketdata_capnp.rs b/src/marketdata_capnp.rs index 67750da..9c7968b 100644 --- a/src/marketdata_capnp.rs +++ b/src/marketdata_capnp.rs @@ -144,17 +144,17 @@ pub mod multi_message { impl Pipeline { } mod _private { - use capnp::private::layout; + use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 1, pointers: 1 }; + pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 1, pointers: 1 }; pub const TYPE_ID: u64 = 0xd13b_1bd4_36e1_ca9f; } } pub mod message { - pub use self::Which::{Quote, Trade}; + pub use self::Which::{Quote, Trade}; - #[derive(Copy, Clone)] + #[derive(Copy, Clone)] pub struct Owned; impl <'a> ::capnp::traits::Owned<'a> for Owned { type Reader = Reader<'a>; type Builder = Builder<'a>; } impl <'a> ::capnp::traits::OwnedStruct<'a> for Owned { type Reader = Reader<'a>; type Builder = Builder<'a>; } @@ -362,9 +362,9 @@ pub mod message { impl Pipeline { } mod _private { - use capnp::private::layout; + use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 2 }; + pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 2 }; pub const TYPE_ID: u64 = 0x91d7_2965_3a3d_4be4; } pub enum Which { @@ -506,9 +506,9 @@ pub mod trade { impl Pipeline { } mod _private { - use capnp::private::layout; + use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 }; + pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 }; pub const TYPE_ID: u64 = 0xd29e_10bd_4e5f_c241; } } @@ -668,9 +668,9 @@ pub mod level_update { impl Pipeline { } mod _private { - use capnp::private::layout; + use capnp::private::layout; - pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 }; + pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 }; pub const TYPE_ID: u64 = 0xe664_c3b5_6628_c453; } }