From ccdec8ddfa9dae1550f0589ef6116d5a21a147ae Mon Sep 17 00:00:00 2001
From: Bradlee Speice
Date: Sat, 31 Aug 2019 20:42:58 -0400
Subject: [PATCH] Read flatbuffers

---
Reviewer notes (text between the `---` marker and the diff is discarded by `git am`):
- Line structure and indentation were rebuilt from a whitespace-collapsed copy of this
  patch; confirm context-line whitespace against the tree before applying.
- The type arguments on `size_of` and `get_size_prefixed_root` had been stripped;
  `u32` follows from the 4-byte little-endian prefix, while `md_shootout::MultiMessage`
  is an assumption to confirm against `marketdata_generated`.
- `serialize` now uses `write_all`; `deserialize` returns `Err(())` instead of panicking
  when fewer bytes are buffered than the length prefix or the prefixed payload needs.
- The hunk size, diffstat and doc comments reflect those changes; the post-image blob id
  on the first `index` line predates them.

 src/flatbuffers_runner.rs | 83 +++++++++++++++++++++++++++++++++++++--
 src/main.rs               | 13 +++++-
 2 files changed, 90 insertions(+), 6 deletions(-)

diff --git a/src/flatbuffers_runner.rs b/src/flatbuffers_runner.rs
index bad5f97..d7de476 100644
--- a/src/flatbuffers_runner.rs
+++ b/src/flatbuffers_runner.rs
@@ -1,10 +1,13 @@
-use std::io::Write;
+use std::convert::TryInto;
+use std::io::{BufRead, Error, Write};
+use std::mem::size_of;
 use std::str::from_utf8_unchecked;
 
 use capnp::data::new_builder;
 use flatbuffers::buffer_has_identifier;
 use nom::{bytes::complete::take_until, IResult};
 
+use crate::{StreamVec, Summarizer};
 use crate::iex::{IexMessage, IexPayload};
 use crate::marketdata_generated::md_shootout;
 
@@ -103,11 +106,83 @@ impl<'a> FlatbuffersWriter<'a> {
         multimsg_builder.add_seq_no(payload.first_seq_no);
         multimsg_builder.add_messages(messages);
         let multimsg = multimsg_builder.finish();
-        self.builder.finish(multimsg, None);
-        ;
-        output.write(self.builder.finished_data());
+
+        // IMPORTANT NOTE: If you just `finish`, Flatbuffers has no idea where
+        // an object ends in memory. To support streaming reads, you *must*
+        // use `finish_size_prefixed`. This adds a LE u32 to the front of the payload.
+        self.builder.finish_size_prefixed(multimsg, None);
+        // `write_all`, not `write`: a bare `write` is allowed to stop after a partial write.
+        output.write_all(self.builder.finished_data()).unwrap();
 
         self.builder.reset();
         self.message_buffer.clear();
     }
 }
+
+/// Reads size-prefixed Flatbuffers messages back out of a byte stream.
+pub struct FlatbuffersReader;
+
+impl FlatbuffersReader {
+    /// Creates a reader; it holds no state between calls.
+    pub fn new() -> FlatbuffersReader {
+        FlatbuffersReader {}
+    }
+
+    /// Decodes the next size-prefixed message in `buf` and reports its contents to `stats`.
+    ///
+    /// Returns `Ok(())` once a message has been processed and consumed from `buf`, or
+    /// `Err(())` when `buf` no longer holds a complete message.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `fill_buf` fails, if the message list is absent, or if a message lacks
+    /// a symbol or has an unrecognized body type.
+    pub fn deserialize<'a>(&self, buf: &'a mut StreamVec, stats: &mut Summarizer) -> Result<(), ()> {
+        // Flatbuffers has kinda ad-hoc support for streaming: https://github.com/google/flatbuffers/issues/3898
+        // Essentially, you can write an optional `u32` value to the front of each message
+        // (`finish_size_prefixed` above) to figure out how long that message actually is.
+        // Ultimately, end-users are responsible for all buffer management, "reading" is just
+        // a view over the underlying buffer.
+        let data = buf.fill_buf().unwrap();
+        let prefix_len = size_of::<u32>();
+        if data.len() < prefix_len {
+            // Too few bytes left for a length prefix: end of stream (or a truncated tail).
+            return Err(());
+        }
+
+        let msg_len_buf: [u8; 4] = data[..prefix_len].try_into().unwrap();
+        let msg_len = u32::from_le_bytes(msg_len_buf) as usize;
+        if data.len() - prefix_len < msg_len {
+            // The prefix promises more bytes than are buffered; don't read past the end.
+            return Err(());
+        }
+
+        let multimsg = flatbuffers::get_size_prefixed_root::<md_shootout::MultiMessage>(data);
+        let msg_vec = match multimsg.messages() {
+            Some(m) => m,
+            None => panic!("Couldn't find messages")
+        };
+
+        for i in 0..msg_vec.len() {
+            let msg: md_shootout::Message = msg_vec.get(i);
+            match msg.body_type() {
+                md_shootout::MessageBody::Trade => {
+                    let trade = msg.body_as_trade().unwrap();
+                    stats.append_trade_volume(msg.symbol().unwrap(), trade.size_().into());
+                },
+                md_shootout::MessageBody::LevelUpdate => {
+                    let lu = msg.body_as_level_update().unwrap();
+                    let is_bid = match lu.side() {
+                        md_shootout::Side::Buy => true,
+                        _ => false
+                    };
+                    stats.update_quote_prices(msg.symbol().unwrap(), lu.price(), is_bid);
+                },
+                md_shootout::MessageBody::NONE => panic!("Unrecognized message type")
+            }
+        }
+
+        buf.consume(msg_len + prefix_len);
+        Ok(())
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index bc589c7..f4c95d8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -66,8 +66,6 @@ fn main() {
     }
 
     assert_eq!(read_buf.pos, read_buf.inner.len());
-    dbg!(parsed_msgs);
-    dbg!(summarizer);
     */
 
     let mut fb_writer = flatbuffers_runner::FlatbuffersWriter::new();
@@ -75,6 +73,17 @@ fn main() {
     for iex_payload in parser {
         fb_writer.serialize(&iex_payload, &mut output_buf);
     }
+
+    let mut read_buf = StreamVec::new(output_buf);
+
+    let fb_reader = flatbuffers_runner::FlatbuffersReader::new();
+    let mut parsed_msgs = 0;
+    while let Ok(_) = fb_reader.deserialize(&mut read_buf, &mut summarizer) {
+        parsed_msgs += 1;
+    }
+
+    dbg!(parsed_msgs);
+    dbg!(summarizer);
 }
 
 #[derive(Debug)]