Read flatbuffers

master
Bradlee Speice 2019-08-31 20:42:58 -04:00
parent 373bc22684
commit ccdec8ddfa
2 changed files with 72 additions and 6 deletions

View File

@ -1,10 +1,13 @@
use std::io::Write;
use std::convert::TryInto;
use std::io::{BufRead, Error, Write};
use std::mem::size_of;
use std::str::from_utf8_unchecked;
use capnp::data::new_builder;
use flatbuffers::buffer_has_identifier;
use nom::{bytes::complete::take_until, IResult};
use crate::{StreamVec, Summarizer};
use crate::iex::{IexMessage, IexPayload};
use crate::marketdata_generated::md_shootout;
@ -103,11 +106,65 @@ impl<'a> FlatbuffersWriter<'a> {
multimsg_builder.add_seq_no(payload.first_seq_no);
multimsg_builder.add_messages(messages);
let multimsg = multimsg_builder.finish();
self.builder.finish(multimsg, None);
;
output.write(self.builder.finished_data());
// IMPORTANT NOTE: If you just `finish`, Flatbuffers has no idea where
// an object ends in memory. To support streaming reads, you *must*
// use `finish_size_prefixed`. This adds a LE u32 to the front of the payload.
self.builder.finish_size_prefixed(multimsg, None);
output.write(self.builder.finished_data()).unwrap();
self.builder.reset();
self.message_buffer.clear();
}
}
pub struct FlatbuffersReader;
impl FlatbuffersReader {
pub fn new() -> FlatbuffersReader {
FlatbuffersReader {}
}
pub fn deserialize<'a>(&self, buf: &'a mut StreamVec, stats: &mut Summarizer) -> Result<(), ()> {
// Flatbuffers has kinda ad-hoc support for streaming: https://github.com/google/flatbuffers/issues/3898
// Essentially, you can write an optional `u32` value to the front of each message
// (`finish_size_prefixed` above) to figure out how long that message actually is.
// Ultimately, end-users are responsible for all buffer management, "reading" is just
// a view over the underlying buffer.
let data = buf.fill_buf().unwrap();
if data.len() == 0 {
return Err(())
}
let msg_len_buf: [u8; 4] = data[..size_of::<u32>()].try_into().unwrap();
let msg_len = u32::from_le_bytes(msg_len_buf) as usize;
let multimsg = flatbuffers::get_size_prefixed_root::<md_shootout::MultiMessage>(data);
let msg_vec = match multimsg.messages() {
Some(m) => m,
None => panic!("Couldn't find messages")
};
for i in 0..msg_vec.len() {
let msg: md_shootout::Message = msg_vec.get(i);
match msg.body_type() {
md_shootout::MessageBody::Trade => {
let trade = msg.body_as_trade().unwrap();
stats.append_trade_volume(msg.symbol().unwrap(), trade.size_().into());
},
md_shootout::MessageBody::LevelUpdate => {
let lu = msg.body_as_level_update().unwrap();
let is_bid = match lu.side() {
md_shootout::Side::Buy => true,
_ => false
};
stats.update_quote_prices(msg.symbol().unwrap(), lu.price(), is_bid);
},
md_shootout::MessageBody::NONE => panic!("Unrecognized message type")
}
}
buf.consume(msg_len + size_of::<u32>());
Ok(())
}
}

View File

@ -66,8 +66,6 @@ fn main() {
}
assert_eq!(read_buf.pos, read_buf.inner.len());
dbg!(parsed_msgs);
dbg!(summarizer);
*/
let mut fb_writer = flatbuffers_runner::FlatbuffersWriter::new();
@ -75,6 +73,17 @@ fn main() {
for iex_payload in parser {
fb_writer.serialize(&iex_payload, &mut output_buf);
}
let mut read_buf = StreamVec::new(output_buf);
let fb_reader = flatbuffers_runner::FlatbuffersReader::new();
let mut parsed_msgs = 0;
while let Ok(_) = fb_reader.deserialize(&mut read_buf, &mut summarizer) {
parsed_msgs += 1;
}
dbg!(parsed_msgs);
dbg!(summarizer);
}
#[derive(Debug)]