diff --git a/src/capnp_runner.rs b/src/capnp_runner.rs index b9ab505..e71a0bc 100644 --- a/src/capnp_runner.rs +++ b/src/capnp_runner.rs @@ -1,19 +1,16 @@ -use std::cmp::{max, min}; -use std::collections::hash_map::{DefaultHasher, HashMap}; -use std::hash::Hasher; -use std::io::{BufReader, Error, Read}; use std::str::from_utf8_unchecked; -use capnp::message::ReaderOptions; -use capnp::serialize::{read_message, write_message}; +use capnp::Error; +use capnp::message::{Builder, ReaderOptions, ScratchSpace, ScratchSpaceHeapAllocator}; +use capnp::serialize::write_message; use capnp::serialize_packed::{read_message as read_message_packed, write_message as write_message_packed}; use nom::bytes::complete::take_until; use nom::IResult; -use crate::iex::{IexMessage, IexParser}; +use crate::{StreamVec, Summarizer}; +use crate::iex::{IexMessage, IexPayload}; use crate::marketdata_capnp::{multi_message, Side}; use crate::marketdata_capnp::message; -use crate::SummaryStats; fn __take_until<'a>(tag: &'static str, input: &'a [u8]) -> IResult<&'a [u8], &'a [u8]> { take_until(tag)(input) @@ -25,22 +22,44 @@ fn parse_symbol(sym: &[u8; 8]) -> &str { unsafe { from_utf8_unchecked(sym_bytes) } } -pub fn serialize_capnp(parser: IexParser, size_hint: usize, packed: bool) -> Vec { - let write_fn = if packed { write_message_packed } else { write_message }; +pub struct CapnpWriter<'a> { + // We have to be very careful with how messages are built, as running + // `init_root` and rebuilding will still accumulate garbage if using + // the standard HeapAllocator. + // https://github.com/capnproto/capnproto-rust/issues/111 + words: Vec, + scratch: ScratchSpace<'a>, +} - // Because CapNProto builds messages in heap before serialization, - // we'll reserve memory up front and should avoid alloc calls later - let mut capnp_message = capnp::message::Builder::new_default(); - let multimsg = capnp_message.init_root::(); - multimsg.init_messages(256); +impl<'a> CapnpWriter<'a> { + pub fn new() -> CapnpWriter<'a> { + // Cap'n'Proto words are 8 bytes, MTU is 1500 bytes, theoretically need only 188 words. + // In practice, let's just make sure everything fits. + let mut words = capnp::Word::allocate_zeroed_vec(1024); - // Allocate our output buffer - let mut output: Vec = Vec::with_capacity(size_hint); + let mut scratch = ScratchSpace::new(unsafe { + std::mem::transmute(&mut words[..]) + }); - // Now to the actual work - for iex_msg in parser { - // Find the messages we actually care about in this context - let num_msgs = iex_msg.messages.iter().map(|m| { + CapnpWriter { + words, + scratch, + } + } + + fn builder(&mut self) -> capnp::message::Builder> { + // Builders are only safe to use for serializing a single message. We can re-use the + // backing memory (unsafe because now both `self` and the returned builder have a + // mutable reference to `self.scratch), but Bad Things happen if we don't drop + // in between serialization. + capnp::message::Builder::new(ScratchSpaceHeapAllocator::new(unsafe { + std::mem::transmute(&mut self.scratch) + })) + } + + pub fn serialize(&mut self, payload: &IexPayload, mut output: &mut Vec, packed: bool) { + // First, count the messages we actually care about. + let num_msgs = payload.messages.iter().map(|m| { match m { IexMessage::TradeReport(_) | IexMessage::PriceLevelUpdate(_) => 1, _ => 0 @@ -48,16 +67,24 @@ pub fn serialize_capnp(parser: IexParser, size_hint: usize, packed: bool) -> Vec }).fold(0, |sum, i| sum + i); if num_msgs == 0 { - continue; + return; } // And actually serialize the IEX payload to CapNProto format - let mut multimsg = capnp_message.init_root::(); - multimsg.set_seq_no(iex_msg.first_seq_no); + + // This is the unsafe (but faster) version + let mut builder = self.builder(); + let mut multimsg = builder.init_root::(); + + // And the safe version used for testing + //let mut builder = capnp::message::Builder::new_default(); + //let mut multimsg = builder.init_root::(); + + multimsg.set_seq_no(payload.first_seq_no); let mut messages = multimsg.init_messages(num_msgs as u32); let mut current_msg_no = 0; - for iex_msg in iex_msg.messages { + for iex_msg in payload.messages.iter() { match iex_msg { IexMessage::TradeReport(tr) => { let mut message = messages.reborrow().get(current_msg_no); @@ -91,76 +118,47 @@ pub fn serialize_capnp(parser: IexParser, size_hint: usize, packed: bool) -> Vec } } - write_fn(&mut output, &capnp_message).unwrap(); - } + let write_fn = if packed { write_message_packed } else { write_message }; - output -} - -struct AdvancingVec<'a> { - pos: usize, - inner: &'a Vec, -} - -impl<'a> Read for AdvancingVec<'a> { - fn read(&mut self, buf: &mut [u8]) -> Result { - // TODO: There's *got* to be a better way to handle this - let end = self.pos + buf.len(); - let end = if end > self.inner.len() { self.inner.len() } else { end }; - let read_size = end - self.pos; - buf[..read_size].copy_from_slice(&self.inner[self.pos..end]); - self.pos = end; - - Ok(read_size) + write_fn(&mut output, &builder).unwrap(); } } -pub fn read_capnp(buffer: &Vec, packed: bool) -> HashMap { - let read_fn = if packed { read_message_packed } else { read_message }; - let unbuffered = AdvancingVec { - pos: 0, - inner: buffer, - }; - let mut buffered = BufReader::new(unbuffered); - let read_opts = ReaderOptions::new(); +pub struct CapnpReader { + read_opts: ReaderOptions +} - let mut stats = HashMap::new(); - - while let Ok(msg) = read_fn(&mut buffered, read_opts) { - let multimsg = msg.get_root::().unwrap(); - - for msg in multimsg.get_messages().unwrap().iter() { - // Hash the symbol name since we can't return a HashMap containing - // string pointers as the keys - let sym = msg.get_symbol().unwrap(); - let mut h = DefaultHasher::new(); - h.write(sym.as_bytes()); - let key = h.finish(); - - let mut sym_stats = stats.entry(key) - .or_insert(SummaryStats::new(sym)); - - match msg.which() { - Ok(message::Trade(tr)) => { - let tr = tr.unwrap(); - sym_stats.trade_volume += tr.get_size() as u64; - } - Ok(message::Quote(q)) => { - let q = q.unwrap(); - if q.get_side().unwrap() == Side::Buy { - sym_stats.bid_high = max(sym_stats.bid_high, q.get_price()); - sym_stats.bid_low = min(sym_stats.bid_low, q.get_price()); - } else { - sym_stats.ask_high = max(sym_stats.ask_high, q.get_price()); - sym_stats.ask_low = min(sym_stats.ask_low, q.get_price()); - } - } - _ => { - panic!("Unrecognized message type") - } - } +impl CapnpReader { + pub fn new() -> CapnpReader { + CapnpReader { + read_opts: ReaderOptions::new() } } - stats + pub fn deserialize_packed<'a>(&self, buf: &'a mut StreamVec, stats: &mut Summarizer) -> Result<(), Error> { + // Because `capnp::serialize_packed::PackedRead` is hidden from us, packed reads + // *have* to both allocate new segments every read, and copy the buffer into + // those same segments. Un-packed reading can use `SliceSegments` for true zero-copy + let reader = read_message_packed(buf, self.read_opts)?; + + let multimsg = reader.get_root::().unwrap(); + for msg in multimsg.get_messages().unwrap().iter() { + match msg.which() { + Ok(message::Trade(tr)) => { + let tr = tr.unwrap(); + stats.append_trade_volume(msg.get_symbol().unwrap(), tr.get_size() as u64); + }, + Ok(message::Quote(q)) => { + let q = q.unwrap(); + let is_bid = match q.get_side().unwrap() { + Side::Buy => true, + _ => false + }; + stats.update_quote_prices(msg.get_symbol().unwrap(), q.get_price(), is_bid); + }, + _ => panic!("Unrecognized message type!") + } + }; + Ok(()) + } } diff --git a/src/main.rs b/src/main.rs index 28d21b3..24cc24f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,15 @@ +use std::cmp::{max, min}; +use std::collections::hash_map::{DefaultHasher, HashMap}; use std::fs::File; -use std::io::Read; +use std::hash::Hasher; +use std::io::{BufRead, Read}; +use std::io::Error; use std::path::Path; use std::time::SystemTime; use clap::{App, Arg}; -use crate::iex::{IexMessage, IexParser}; +use crate::iex::IexParser; // Cap'n'Proto and Flatbuffers typically ask that you generate code on the fly to match // the schemas. For purposes of auto-complete and easy browsing in the repository, @@ -39,19 +43,28 @@ fn main() { file.read_to_end(&mut buf) .expect(&format!("Unable to read file={}", path.display())); - let start = SystemTime::now(); + let _start = SystemTime::now(); + let mut summarizer = Summarizer::default(); + let mut parser = IexParser::new(&buf[..]); - // Try with Capnproto for now - let parser = IexParser::new(&buf[..]); - let capnp_buf = capnp_runner::serialize_capnp(parser, buf.len(), true); - let stats = capnp_runner::read_capnp(&capnp_buf, true); + let mut capnp_writer = capnp_runner::CapnpWriter::new(); + let capnp_reader = capnp_runner::CapnpReader::new(); + let mut output_buf = Vec::new(); - dbg!(stats); + for iex_payload in parser { + //let iex_payload = parser.next().unwrap(); + capnp_writer.serialize(&iex_payload, &mut output_buf, true); + } - println!( - "Parse time seconds={}", - SystemTime::now().duration_since(start).unwrap().as_secs() - ) + let mut read_buf = StreamVec::new(output_buf); + let mut parsed_msgs: u64 = 0; + while let Ok(_) = capnp_reader.deserialize_packed(&mut read_buf, &mut summarizer) { + parsed_msgs += 1; + } + + assert_eq!(read_buf.pos, read_buf.inner.len()); + dbg!(parsed_msgs); + dbg!(summarizer); } #[derive(Debug)] @@ -64,15 +77,75 @@ pub struct SummaryStats { ask_low: u64, } -impl SummaryStats { - fn new(sym: &str) -> SummaryStats { - SummaryStats { - symbol: sym.to_string(), - trade_volume: 0, - bid_high: 0, - bid_low: u64::max_value(), - ask_high: 0, - ask_low: u64::max_value(), +#[derive(Default, Debug)] +pub struct Summarizer { + data: HashMap +} + +impl Summarizer { + fn entry(&mut self, sym: &str) -> &mut SummaryStats { + let mut hasher = DefaultHasher::new(); + hasher.write(sym.as_bytes()); + self.data.entry(hasher.finish()) + .or_insert(SummaryStats { + symbol: sym.to_string(), + trade_volume: 0, + bid_high: 0, + bid_low: u64::max_value(), + ask_high: 0, + ask_low: u64::max_value(), + }) + } + + pub fn append_trade_volume(&mut self, sym: &str, volume: u64) { + self.entry(sym).trade_volume += volume; + } + + pub fn update_quote_prices(&mut self, sym: &str, price: u64, is_buy: bool) { + let entry = self.entry(sym); + if is_buy { + entry.bid_low = min(entry.bid_low, price); + entry.bid_high = max(entry.bid_high, price); + } else { + entry.ask_low = min(entry.ask_low, price); + entry.ask_high = max(entry.ask_high, price); } } } + +pub struct StreamVec { + pos: usize, + inner: Vec, +} + +impl StreamVec { + pub fn new(buf: Vec) -> StreamVec { + StreamVec { + pos: 0, + inner: buf, + } + } +} + +impl Read for StreamVec { + fn read(&mut self, buf: &mut [u8]) -> Result { + // TODO: There's *got* to be a better way to handle this + let end = self.pos + buf.len(); + let end = if end > self.inner.len() { self.inner.len() } else { end }; + let read_size = end - self.pos; + buf[..read_size].copy_from_slice(&self.inner[self.pos..end]); + self.pos = end; + + Ok(read_size) + } +} + +impl BufRead for StreamVec { + fn fill_buf(&mut self) -> Result<&[u8], Error> { + Ok(&self.inner[self.pos..]) + } + + fn consume(&mut self, amt: usize) { + self.pos += amt; + } +}