mirror of
https://github.com/speice-io/marketdata-shootout
synced 2024-11-16 21:28:08 -05:00
Fix Builder
usage.
Long story short, builders are one per message.
This commit is contained in:
parent
bf71146152
commit
b333fd3810
@ -1,19 +1,16 @@
|
|||||||
use std::cmp::{max, min};
|
|
||||||
use std::collections::hash_map::{DefaultHasher, HashMap};
|
|
||||||
use std::hash::Hasher;
|
|
||||||
use std::io::{BufReader, Error, Read};
|
|
||||||
use std::str::from_utf8_unchecked;
|
use std::str::from_utf8_unchecked;
|
||||||
|
|
||||||
use capnp::message::ReaderOptions;
|
use capnp::Error;
|
||||||
use capnp::serialize::{read_message, write_message};
|
use capnp::message::{Builder, ReaderOptions, ScratchSpace, ScratchSpaceHeapAllocator};
|
||||||
|
use capnp::serialize::write_message;
|
||||||
use capnp::serialize_packed::{read_message as read_message_packed, write_message as write_message_packed};
|
use capnp::serialize_packed::{read_message as read_message_packed, write_message as write_message_packed};
|
||||||
use nom::bytes::complete::take_until;
|
use nom::bytes::complete::take_until;
|
||||||
use nom::IResult;
|
use nom::IResult;
|
||||||
|
|
||||||
use crate::iex::{IexMessage, IexParser};
|
use crate::{StreamVec, Summarizer};
|
||||||
|
use crate::iex::{IexMessage, IexPayload};
|
||||||
use crate::marketdata_capnp::{multi_message, Side};
|
use crate::marketdata_capnp::{multi_message, Side};
|
||||||
use crate::marketdata_capnp::message;
|
use crate::marketdata_capnp::message;
|
||||||
use crate::SummaryStats;
|
|
||||||
|
|
||||||
fn __take_until<'a>(tag: &'static str, input: &'a [u8]) -> IResult<&'a [u8], &'a [u8]> {
|
fn __take_until<'a>(tag: &'static str, input: &'a [u8]) -> IResult<&'a [u8], &'a [u8]> {
|
||||||
take_until(tag)(input)
|
take_until(tag)(input)
|
||||||
@ -25,22 +22,44 @@ fn parse_symbol(sym: &[u8; 8]) -> &str {
|
|||||||
unsafe { from_utf8_unchecked(sym_bytes) }
|
unsafe { from_utf8_unchecked(sym_bytes) }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn serialize_capnp(parser: IexParser, size_hint: usize, packed: bool) -> Vec<u8> {
|
pub struct CapnpWriter<'a> {
|
||||||
let write_fn = if packed { write_message_packed } else { write_message };
|
// We have to be very careful with how messages are built, as running
|
||||||
|
// `init_root` and rebuilding will still accumulate garbage if using
|
||||||
|
// the standard HeapAllocator.
|
||||||
|
// https://github.com/capnproto/capnproto-rust/issues/111
|
||||||
|
words: Vec<capnp::Word>,
|
||||||
|
scratch: ScratchSpace<'a>,
|
||||||
|
}
|
||||||
|
|
||||||
// Because CapNProto builds messages in heap before serialization,
|
impl<'a> CapnpWriter<'a> {
|
||||||
// we'll reserve memory up front and should avoid alloc calls later
|
pub fn new() -> CapnpWriter<'a> {
|
||||||
let mut capnp_message = capnp::message::Builder::new_default();
|
// Cap'n'Proto words are 8 bytes, MTU is 1500 bytes, theoretically need only 188 words.
|
||||||
let multimsg = capnp_message.init_root::<multi_message::Builder>();
|
// In practice, let's just make sure everything fits.
|
||||||
multimsg.init_messages(256);
|
let mut words = capnp::Word::allocate_zeroed_vec(1024);
|
||||||
|
|
||||||
// Allocate our output buffer
|
let mut scratch = ScratchSpace::new(unsafe {
|
||||||
let mut output: Vec<u8> = Vec::with_capacity(size_hint);
|
std::mem::transmute(&mut words[..])
|
||||||
|
});
|
||||||
|
|
||||||
// Now to the actual work
|
CapnpWriter {
|
||||||
for iex_msg in parser {
|
words,
|
||||||
// Find the messages we actually care about in this context
|
scratch,
|
||||||
let num_msgs = iex_msg.messages.iter().map(|m| {
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn builder(&mut self) -> capnp::message::Builder<ScratchSpaceHeapAllocator<'a, 'a>> {
|
||||||
|
// Builders are only safe to use for serializing a single message. We can re-use the
|
||||||
|
// backing memory (unsafe because now both `self` and the returned builder have a
|
||||||
|
// mutable reference to `self.scratch), but Bad Things happen if we don't drop
|
||||||
|
// in between serialization.
|
||||||
|
capnp::message::Builder::new(ScratchSpaceHeapAllocator::new(unsafe {
|
||||||
|
std::mem::transmute(&mut self.scratch)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn serialize(&mut self, payload: &IexPayload, mut output: &mut Vec<u8>, packed: bool) {
|
||||||
|
// First, count the messages we actually care about.
|
||||||
|
let num_msgs = payload.messages.iter().map(|m| {
|
||||||
match m {
|
match m {
|
||||||
IexMessage::TradeReport(_) | IexMessage::PriceLevelUpdate(_) => 1,
|
IexMessage::TradeReport(_) | IexMessage::PriceLevelUpdate(_) => 1,
|
||||||
_ => 0
|
_ => 0
|
||||||
@ -48,16 +67,24 @@ pub fn serialize_capnp(parser: IexParser, size_hint: usize, packed: bool) -> Vec
|
|||||||
}).fold(0, |sum, i| sum + i);
|
}).fold(0, |sum, i| sum + i);
|
||||||
|
|
||||||
if num_msgs == 0 {
|
if num_msgs == 0 {
|
||||||
continue;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// And actually serialize the IEX payload to CapNProto format
|
// And actually serialize the IEX payload to CapNProto format
|
||||||
let mut multimsg = capnp_message.init_root::<multi_message::Builder>();
|
|
||||||
multimsg.set_seq_no(iex_msg.first_seq_no);
|
// This is the unsafe (but faster) version
|
||||||
|
let mut builder = self.builder();
|
||||||
|
let mut multimsg = builder.init_root::<multi_message::Builder>();
|
||||||
|
|
||||||
|
// And the safe version used for testing
|
||||||
|
//let mut builder = capnp::message::Builder::new_default();
|
||||||
|
//let mut multimsg = builder.init_root::<multi_message::Builder>();
|
||||||
|
|
||||||
|
multimsg.set_seq_no(payload.first_seq_no);
|
||||||
|
|
||||||
let mut messages = multimsg.init_messages(num_msgs as u32);
|
let mut messages = multimsg.init_messages(num_msgs as u32);
|
||||||
let mut current_msg_no = 0;
|
let mut current_msg_no = 0;
|
||||||
for iex_msg in iex_msg.messages {
|
for iex_msg in payload.messages.iter() {
|
||||||
match iex_msg {
|
match iex_msg {
|
||||||
IexMessage::TradeReport(tr) => {
|
IexMessage::TradeReport(tr) => {
|
||||||
let mut message = messages.reborrow().get(current_msg_no);
|
let mut message = messages.reborrow().get(current_msg_no);
|
||||||
@ -91,76 +118,47 @@ pub fn serialize_capnp(parser: IexParser, size_hint: usize, packed: bool) -> Vec
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
write_fn(&mut output, &capnp_message).unwrap();
|
let write_fn = if packed { write_message_packed } else { write_message };
|
||||||
}
|
|
||||||
|
|
||||||
output
|
write_fn(&mut output, &builder).unwrap();
|
||||||
}
|
|
||||||
|
|
||||||
struct AdvancingVec<'a> {
|
|
||||||
pos: usize,
|
|
||||||
inner: &'a Vec<u8>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> Read for AdvancingVec<'a> {
|
|
||||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
|
|
||||||
// TODO: There's *got* to be a better way to handle this
|
|
||||||
let end = self.pos + buf.len();
|
|
||||||
let end = if end > self.inner.len() { self.inner.len() } else { end };
|
|
||||||
let read_size = end - self.pos;
|
|
||||||
buf[..read_size].copy_from_slice(&self.inner[self.pos..end]);
|
|
||||||
self.pos = end;
|
|
||||||
|
|
||||||
Ok(read_size)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_capnp(buffer: &Vec<u8>, packed: bool) -> HashMap<u64, SummaryStats> {
|
pub struct CapnpReader {
|
||||||
let read_fn = if packed { read_message_packed } else { read_message };
|
read_opts: ReaderOptions
|
||||||
let unbuffered = AdvancingVec {
|
}
|
||||||
pos: 0,
|
|
||||||
inner: buffer,
|
|
||||||
};
|
|
||||||
let mut buffered = BufReader::new(unbuffered);
|
|
||||||
let read_opts = ReaderOptions::new();
|
|
||||||
|
|
||||||
let mut stats = HashMap::new();
|
impl CapnpReader {
|
||||||
|
pub fn new() -> CapnpReader {
|
||||||
while let Ok(msg) = read_fn(&mut buffered, read_opts) {
|
CapnpReader {
|
||||||
let multimsg = msg.get_root::<multi_message::Reader>().unwrap();
|
read_opts: ReaderOptions::new()
|
||||||
|
|
||||||
for msg in multimsg.get_messages().unwrap().iter() {
|
|
||||||
// Hash the symbol name since we can't return a HashMap containing
|
|
||||||
// string pointers as the keys
|
|
||||||
let sym = msg.get_symbol().unwrap();
|
|
||||||
let mut h = DefaultHasher::new();
|
|
||||||
h.write(sym.as_bytes());
|
|
||||||
let key = h.finish();
|
|
||||||
|
|
||||||
let mut sym_stats = stats.entry(key)
|
|
||||||
.or_insert(SummaryStats::new(sym));
|
|
||||||
|
|
||||||
match msg.which() {
|
|
||||||
Ok(message::Trade(tr)) => {
|
|
||||||
let tr = tr.unwrap();
|
|
||||||
sym_stats.trade_volume += tr.get_size() as u64;
|
|
||||||
}
|
|
||||||
Ok(message::Quote(q)) => {
|
|
||||||
let q = q.unwrap();
|
|
||||||
if q.get_side().unwrap() == Side::Buy {
|
|
||||||
sym_stats.bid_high = max(sym_stats.bid_high, q.get_price());
|
|
||||||
sym_stats.bid_low = min(sym_stats.bid_low, q.get_price());
|
|
||||||
} else {
|
|
||||||
sym_stats.ask_high = max(sym_stats.ask_high, q.get_price());
|
|
||||||
sym_stats.ask_low = min(sym_stats.ask_low, q.get_price());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
panic!("Unrecognized message type")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stats
|
pub fn deserialize_packed<'a>(&self, buf: &'a mut StreamVec, stats: &mut Summarizer) -> Result<(), Error> {
|
||||||
|
// Because `capnp::serialize_packed::PackedRead` is hidden from us, packed reads
|
||||||
|
// *have* to both allocate new segments every read, and copy the buffer into
|
||||||
|
// those same segments. Un-packed reading can use `SliceSegments` for true zero-copy
|
||||||
|
let reader = read_message_packed(buf, self.read_opts)?;
|
||||||
|
|
||||||
|
let multimsg = reader.get_root::<multi_message::Reader>().unwrap();
|
||||||
|
for msg in multimsg.get_messages().unwrap().iter() {
|
||||||
|
match msg.which() {
|
||||||
|
Ok(message::Trade(tr)) => {
|
||||||
|
let tr = tr.unwrap();
|
||||||
|
stats.append_trade_volume(msg.get_symbol().unwrap(), tr.get_size() as u64);
|
||||||
|
},
|
||||||
|
Ok(message::Quote(q)) => {
|
||||||
|
let q = q.unwrap();
|
||||||
|
let is_bid = match q.get_side().unwrap() {
|
||||||
|
Side::Buy => true,
|
||||||
|
_ => false
|
||||||
|
};
|
||||||
|
stats.update_quote_prices(msg.get_symbol().unwrap(), q.get_price(), is_bid);
|
||||||
|
},
|
||||||
|
_ => panic!("Unrecognized message type!")
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
115
src/main.rs
115
src/main.rs
@ -1,11 +1,15 @@
|
|||||||
|
use std::cmp::{max, min};
|
||||||
|
use std::collections::hash_map::{DefaultHasher, HashMap};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::Read;
|
use std::hash::Hasher;
|
||||||
|
use std::io::{BufRead, Read};
|
||||||
|
use std::io::Error;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
|
|
||||||
use clap::{App, Arg};
|
use clap::{App, Arg};
|
||||||
|
|
||||||
use crate::iex::{IexMessage, IexParser};
|
use crate::iex::IexParser;
|
||||||
|
|
||||||
// Cap'n'Proto and Flatbuffers typically ask that you generate code on the fly to match
|
// Cap'n'Proto and Flatbuffers typically ask that you generate code on the fly to match
|
||||||
// the schemas. For purposes of auto-complete and easy browsing in the repository,
|
// the schemas. For purposes of auto-complete and easy browsing in the repository,
|
||||||
@ -39,19 +43,28 @@ fn main() {
|
|||||||
file.read_to_end(&mut buf)
|
file.read_to_end(&mut buf)
|
||||||
.expect(&format!("Unable to read file={}", path.display()));
|
.expect(&format!("Unable to read file={}", path.display()));
|
||||||
|
|
||||||
let start = SystemTime::now();
|
let _start = SystemTime::now();
|
||||||
|
let mut summarizer = Summarizer::default();
|
||||||
|
let mut parser = IexParser::new(&buf[..]);
|
||||||
|
|
||||||
// Try with Capnproto for now
|
let mut capnp_writer = capnp_runner::CapnpWriter::new();
|
||||||
let parser = IexParser::new(&buf[..]);
|
let capnp_reader = capnp_runner::CapnpReader::new();
|
||||||
let capnp_buf = capnp_runner::serialize_capnp(parser, buf.len(), true);
|
let mut output_buf = Vec::new();
|
||||||
let stats = capnp_runner::read_capnp(&capnp_buf, true);
|
|
||||||
|
|
||||||
dbg!(stats);
|
for iex_payload in parser {
|
||||||
|
//let iex_payload = parser.next().unwrap();
|
||||||
|
capnp_writer.serialize(&iex_payload, &mut output_buf, true);
|
||||||
|
}
|
||||||
|
|
||||||
println!(
|
let mut read_buf = StreamVec::new(output_buf);
|
||||||
"Parse time seconds={}",
|
let mut parsed_msgs: u64 = 0;
|
||||||
SystemTime::now().duration_since(start).unwrap().as_secs()
|
while let Ok(_) = capnp_reader.deserialize_packed(&mut read_buf, &mut summarizer) {
|
||||||
)
|
parsed_msgs += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(read_buf.pos, read_buf.inner.len());
|
||||||
|
dbg!(parsed_msgs);
|
||||||
|
dbg!(summarizer);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -64,15 +77,75 @@ pub struct SummaryStats {
|
|||||||
ask_low: u64,
|
ask_low: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SummaryStats {
|
#[derive(Default, Debug)]
|
||||||
fn new(sym: &str) -> SummaryStats {
|
pub struct Summarizer {
|
||||||
SummaryStats {
|
data: HashMap<u64, SummaryStats>
|
||||||
symbol: sym.to_string(),
|
}
|
||||||
trade_volume: 0,
|
|
||||||
bid_high: 0,
|
impl Summarizer {
|
||||||
bid_low: u64::max_value(),
|
fn entry(&mut self, sym: &str) -> &mut SummaryStats {
|
||||||
ask_high: 0,
|
let mut hasher = DefaultHasher::new();
|
||||||
ask_low: u64::max_value(),
|
hasher.write(sym.as_bytes());
|
||||||
|
self.data.entry(hasher.finish())
|
||||||
|
.or_insert(SummaryStats {
|
||||||
|
symbol: sym.to_string(),
|
||||||
|
trade_volume: 0,
|
||||||
|
bid_high: 0,
|
||||||
|
bid_low: u64::max_value(),
|
||||||
|
ask_high: 0,
|
||||||
|
ask_low: u64::max_value(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn append_trade_volume(&mut self, sym: &str, volume: u64) {
|
||||||
|
self.entry(sym).trade_volume += volume;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_quote_prices(&mut self, sym: &str, price: u64, is_buy: bool) {
|
||||||
|
let entry = self.entry(sym);
|
||||||
|
if is_buy {
|
||||||
|
entry.bid_low = min(entry.bid_low, price);
|
||||||
|
entry.bid_high = max(entry.bid_high, price);
|
||||||
|
} else {
|
||||||
|
entry.ask_low = min(entry.ask_low, price);
|
||||||
|
entry.ask_high = max(entry.ask_high, price);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct StreamVec {
|
||||||
|
pos: usize,
|
||||||
|
inner: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StreamVec {
|
||||||
|
pub fn new(buf: Vec<u8>) -> StreamVec {
|
||||||
|
StreamVec {
|
||||||
|
pos: 0,
|
||||||
|
inner: buf,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Read for StreamVec {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
|
||||||
|
// TODO: There's *got* to be a better way to handle this
|
||||||
|
let end = self.pos + buf.len();
|
||||||
|
let end = if end > self.inner.len() { self.inner.len() } else { end };
|
||||||
|
let read_size = end - self.pos;
|
||||||
|
buf[..read_size].copy_from_slice(&self.inner[self.pos..end]);
|
||||||
|
self.pos = end;
|
||||||
|
|
||||||
|
Ok(read_size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BufRead for StreamVec {
|
||||||
|
fn fill_buf(&mut self) -> Result<&[u8], Error> {
|
||||||
|
Ok(&self.inner[self.pos..])
|
||||||
|
}
|
||||||
|
|
||||||
|
fn consume(&mut self, amt: usize) {
|
||||||
|
self.pos += amt;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user