Handle the "zero copy" unpacked read for Cap'n'Proto

master
Bradlee Speice 2019-09-01 00:46:33 -04:00
parent ccdec8ddfa
commit 0f241926ff
3 changed files with 176 additions and 19 deletions

View File

@ -1,3 +1,6 @@
use std::convert::TryInto;
use std::io::{BufRead, Read};
use std::mem::size_of;
use std::str::from_utf8_unchecked;
use capnp::Error;
@ -136,10 +139,10 @@ impl CapnpReader {
}
}
pub fn deserialize_packed<'a>(&self, buf: &'a mut StreamVec, stats: &mut Summarizer) -> Result<(), Error> {
pub fn deserialize_packed(&self, buf: &mut StreamVec, stats: &mut Summarizer) -> Result<(), Error> {
// Because `capnp::serialize_packed::PackedRead` is hidden from us, packed reads
// *have* to both allocate new segments every read, and copy the buffer into
// those same segments. Un-packed reading can use `SliceSegments` for true zero-copy
// those same segments, no ability to re-use allocated memory.
let reader = read_message_packed(buf, self.read_opts)?;
let multimsg = reader.get_root::<multi_message::Reader>().unwrap();
@ -162,4 +165,146 @@ impl CapnpReader {
};
Ok(())
}
pub fn deserialize_unpacked(&self, buf: &mut StreamVec, stats: &mut Summarizer) -> Result<(), Error> {
let mut data = buf.fill_buf()?;
if data.len() == 0 {
return Err(capnp::Error::failed(String::new()));
}
let orig_data = data;
let reader_opts = ReaderOptions::default();
/*
Read into `OwnedSegments`, which means we copy the entire message into a new Vec. Note that
the `data` pointer is modified underneath us, can figure out the message length by
checking the difference between where we started and what `data` is afterward.
This is a trick you learn only by checking the fuzzing test cases.
let reader = capnp::serialize::read_message(&mut data, reader_opts)?;
let bytes_consumed = orig_data.len() - data.len();
*/
/*
Read into `SliceSegments`, which allows us to re-use the underlying message storage,
but still forces a Vec allocation for `offsets`. Also requires us to copy code from
Cap'n'Proto because `SliceSegments` has private fields, and `read_segment_table`
is private. And all this because `read_segment_from_words` has a length check
that triggers an error if our buffer is too large.
There is no other documentation on how to calculate `bytes_consumed` in this case
that I could find, you just have to guess and check until you figure this one out.
*/
let (num_words, offsets) = read_segment_table(&mut data, reader_opts)?;
let words = unsafe { capnp::Word::bytes_to_words(data) };
let reader = capnp::message::Reader::new(
SliceSegments {
words: words,
segment_slices: offsets,
},
reader_opts,
);
let segment_table_bytes = orig_data.len() - data.len();
let msg_bytes = num_words * size_of::<capnp::Word>();
let bytes_consumed = segment_table_bytes + msg_bytes;
let multimsg = reader.get_root::<multi_message::Reader>()?;
for msg in multimsg.get_messages()?.iter() {
let sym = msg.get_symbol()?;
match msg.which()? {
message::Trade(trade) => {
let trade = trade.unwrap();
stats.append_trade_volume(sym, trade.get_size().into());
},
message::Quote(quote) => {
let quote = quote.unwrap();
let is_buy = match quote.get_side()? {
Side::Buy => true,
_ => false
};
stats.update_quote_prices(sym, quote.get_price(), is_buy);
},
}
}
buf.consume(bytes_consumed);
Ok(())
}
}
pub struct SliceSegments<'a> {
words: &'a [capnp::Word],
segment_slices: Vec<(usize, usize)>,
}
impl<'a> capnp::message::ReaderSegments for SliceSegments<'a> {
fn get_segment<'b>(&'b self, id: u32) -> Option<&'b [capnp::Word]> {
if id < self.segment_slices.len() as u32 {
let (a, b) = self.segment_slices[id as usize];
Some(&self.words[a..b])
} else {
None
}
}
fn len(&self) -> usize {
self.segment_slices.len()
}
}
fn read_segment_table<R>(read: &mut R,
options: capnp::message::ReaderOptions)
-> capnp::Result<(usize, Vec<(usize, usize)>)>
where R: Read
{
let mut buf: [u8; 8] = [0; 8];
// read the first Word, which contains segment_count and the 1st segment length
read.read_exact(&mut buf)?;
let segment_count = u32::from_le_bytes(buf[0..4].try_into().unwrap()).wrapping_add(1) as usize;
if segment_count >= 512 {
return Err(Error::failed(format!("Too many segments: {}", segment_count)))
} else if segment_count == 0 {
return Err(Error::failed(format!("Too few segments: {}", segment_count)))
}
let mut segment_slices = Vec::with_capacity(segment_count);
let mut total_words = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize;
segment_slices.push((0, total_words));
if segment_count > 1 {
if segment_count < 4 {
read.read_exact(&mut buf)?;
for idx in 0..(segment_count - 1) {
let segment_len =
u32::from_le_bytes(buf[(idx * 4)..(idx + 1) * 4].try_into().unwrap()) as usize;
segment_slices.push((total_words, total_words + segment_len));
total_words += segment_len;
}
} else {
let mut segment_sizes = vec![0u8; (segment_count & !1) * 4];
read.read_exact(&mut segment_sizes[..])?;
for idx in 0..(segment_count - 1) {
let segment_len =
u32::from_le_bytes(segment_sizes[(idx * 4)..(idx + 1) * 4].try_into().unwrap()) as usize;
segment_slices.push((total_words, total_words + segment_len));
total_words += segment_len;
}
}
}
// Don't accept a message which the receiver couldn't possibly traverse without hitting the
// traversal limit. Without this check, a malicious client could transmit a very large segment
// size to make the receiver allocate excessive space and possibly crash.
if total_words as u64 > options.traversal_limit_in_words {
return Err(Error::failed(
format!("Message has {} words, which is too large. To increase the limit on the \
receiving end, see capnp::message::ReaderOptions.", total_words)))
}
Ok((total_words, segment_slices))
}

View File

@ -48,28 +48,26 @@ fn main() {
let mut summarizer = Summarizer::default();
let mut parser = IexParser::new(&buf[..]);
let mut output_buf: Vec<u8> = Vec::new();
// Pre-allocate the same size as the backing file. Will be way more than
// necessary, but makes sure there's no re-allocation not related to
// actual parsing/serialization code
let mut output_buf: Vec<u8> = Vec::with_capacity(buf.capacity());
/*
let mut capnp_writer = capnp_runner::CapnpWriter::new();
let capnp_reader = capnp_runner::CapnpReader::new();
for iex_payload in parser {
//let iex_payload = parser.next().unwrap();
capnp_writer.serialize(&iex_payload, &mut output_buf, true);
}
let capnp_reader = capnp_runner::CapnpReader::new();
let mut read_buf = StreamVec::new(output_buf);
let mut parsed_msgs: u64 = 0;
while let Ok(_) = capnp_reader.deserialize_packed(&mut read_buf, &mut summarizer) {
parsed_msgs += 1;
}
assert_eq!(read_buf.pos, read_buf.inner.len());
*/
let mut fb_writer = flatbuffers_runner::FlatbuffersWriter::new();
for iex_payload in parser {
fb_writer.serialize(&iex_payload, &mut output_buf);
}
@ -81,6 +79,20 @@ fn main() {
while let Ok(_) = fb_reader.deserialize(&mut read_buf, &mut summarizer) {
parsed_msgs += 1;
}
*/
let mut capnp_writer = capnp_runner::CapnpWriter::new();
for iex_payload in parser {
//let iex_payload = parser.next().unwrap();
capnp_writer.serialize(&iex_payload, &mut output_buf, false);
}
let capnp_reader = capnp_runner::CapnpReader::new();
let mut read_buf = StreamVec::new(output_buf);
let mut parsed_msgs: u64 = 0;
while let Ok(_) = capnp_reader.deserialize_unpacked(&mut read_buf, &mut summarizer) {
parsed_msgs += 1;
}
dbg!(parsed_msgs);
dbg!(summarizer);

View File

@ -144,17 +144,17 @@ pub mod multi_message {
impl Pipeline {
}
mod _private {
use capnp::private::layout;
use capnp::private::layout;
pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 1, pointers: 1 };
pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 1, pointers: 1 };
pub const TYPE_ID: u64 = 0xd13b_1bd4_36e1_ca9f;
}
}
pub mod message {
pub use self::Which::{Quote, Trade};
pub use self::Which::{Quote, Trade};
#[derive(Copy, Clone)]
#[derive(Copy, Clone)]
pub struct Owned;
impl <'a> ::capnp::traits::Owned<'a> for Owned { type Reader = Reader<'a>; type Builder = Builder<'a>; }
impl <'a> ::capnp::traits::OwnedStruct<'a> for Owned { type Reader = Reader<'a>; type Builder = Builder<'a>; }
@ -362,9 +362,9 @@ pub mod message {
impl Pipeline {
}
mod _private {
use capnp::private::layout;
use capnp::private::layout;
pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 2 };
pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 2 };
pub const TYPE_ID: u64 = 0x91d7_2965_3a3d_4be4;
}
pub enum Which<A0,A1> {
@ -506,9 +506,9 @@ pub mod trade {
impl Pipeline {
}
mod _private {
use capnp::private::layout;
use capnp::private::layout;
pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 };
pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 };
pub const TYPE_ID: u64 = 0xd29e_10bd_4e5f_c241;
}
}
@ -668,9 +668,9 @@ pub mod level_update {
impl Pipeline {
}
mod _private {
use capnp::private::layout;
use capnp::private::layout;
pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 };
pub const STRUCT_SIZE: layout::StructSize = layout::StructSize { data: 2, pointers: 0 };
pub const TYPE_ID: u64 = 0xe664_c3b5_6628_c453;
}
}