Add support for writing Flatbuffers

Need to figure out stream parsing yet
This commit is contained in:
Bradlee Speice 2019-08-31 17:58:44 -04:00
parent b333fd3810
commit 373bc22684
5 changed files with 140 additions and 15 deletions

View File

@ -22,7 +22,7 @@ table LevelUpdate {
} }
table Message { table Message {
ts_nanos:uint64; ts_nanos:int64;
symbol:string; symbol:string;
body:MessageBody; body:MessageBody;
} }

View File

@ -17,6 +17,7 @@ fn __take_until<'a>(tag: &'static str, input: &'a [u8]) -> IResult<&'a [u8], &'a
} }
fn parse_symbol(sym: &[u8; 8]) -> &str { fn parse_symbol(sym: &[u8; 8]) -> &str {
// TODO: Use the `jetscii` library for all that SIMD goodness
// IEX guarantees ASCII, so we're fine using an unsafe conversion // IEX guarantees ASCII, so we're fine using an unsafe conversion
let (_, sym_bytes) = __take_until(" ", &sym[..]).unwrap(); let (_, sym_bytes) = __take_until(" ", &sym[..]).unwrap();
unsafe { from_utf8_unchecked(sym_bytes) } unsafe { from_utf8_unchecked(sym_bytes) }

113
src/flatbuffers_runner.rs Normal file
View File

@ -0,0 +1,113 @@
use std::io::Write;
use std::str::from_utf8_unchecked;
use capnp::data::new_builder;
use flatbuffers::buffer_has_identifier;
use nom::{bytes::complete::take_until, IResult};
use crate::iex::{IexMessage, IexPayload};
use crate::marketdata_generated::md_shootout;
fn __take_until<'a>(tag: &'static str, input: &'a [u8]) -> IResult<&'a [u8], &'a [u8]> {
take_until(tag)(input)
}
fn parse_symbol(sym: &[u8; 8]) -> &str {
// TODO: Use the `jetscii` library for all that SIMD goodness
// IEX guarantees ASCII, so we're fine using an unsafe conversion
let (_, sym_bytes) = __take_until(" ", &sym[..]).unwrap();
unsafe { from_utf8_unchecked(sym_bytes) }
}
pub struct FlatbuffersWriter<'a> {
builder: flatbuffers::FlatBufferBuilder<'a>,
message_buffer: Vec<flatbuffers::WIPOffset<md_shootout::Message<'a>>>,
}
impl<'a> FlatbuffersWriter<'a> {
pub fn new() -> FlatbuffersWriter<'static> {
FlatbuffersWriter {
builder: flatbuffers::FlatBufferBuilder::new(),
message_buffer: Vec::new(),
}
}
pub fn serialize(&mut self, payload: &IexPayload, output: &mut Vec<u8>) {
// Because FlatBuffers can't handle nested vectors (specifically, we can't track
// both the variable-length vector of messages, and the variable-length strings
// within those messages), we have to cache the messages as they get built
// so they can be added all at once later.
for iex_msg in &payload.messages {
let msg_args = match iex_msg {
IexMessage::TradeReport(tr) => {
// The `Args` objects used are wrappers over an underlying `Builder`.
// We trust release builds to optimize out the wrapper, but would be
// interesting to know whether that's actually the case.
let trade = md_shootout::Trade::create(
&mut self.builder,
&md_shootout::TradeArgs {
price: tr.price,
size_: tr.size,
},
);
/*
let mut trade_builder = md_shootout::TradeBuilder::new(self.builder);
trade_builder.add_price(tr.price);
trade_builder.add_size_(tr.size);
let trade = trade_builder.finish();
*/
let sym_str = self.builder.create_string(parse_symbol(&tr.symbol));
Some(md_shootout::MessageArgs {
ts_nanos: tr.timestamp,
symbol: Some(sym_str),
body_type: md_shootout::MessageBody::Trade,
// Why the hell do I need the `as_union_value` function to convert to UnionWIPOffset???
body: Some(trade.as_union_value()),
})
}
IexMessage::PriceLevelUpdate(plu) => {
let level_update = md_shootout::LevelUpdate::create(
&mut self.builder,
&md_shootout::LevelUpdateArgs {
price: plu.price,
size_: plu.size,
flags: plu.event_flags,
side: if plu.msg_type == 0x38 { md_shootout::Side::Buy } else { md_shootout::Side::Sell },
},
);
let sym_str = self.builder.create_string(parse_symbol(&plu.symbol));
Some(md_shootout::MessageArgs {
ts_nanos: plu.timestamp,
symbol: Some(sym_str),
body_type: md_shootout::MessageBody::LevelUpdate,
body: Some(level_update.as_union_value()),
})
}
_ => None
};
msg_args.map(|a| {
let msg = md_shootout::Message::create(&mut self.builder, &a);
self.message_buffer.push(msg);
});
}
let messages = self.builder.create_vector(&self.message_buffer[..]);
// Now that we've finished building all the messages, time to set up the final buffer
let mut multimsg_builder = md_shootout::MultiMessageBuilder::new(&mut self.builder);
multimsg_builder.add_seq_no(payload.first_seq_no);
multimsg_builder.add_messages(messages);
let multimsg = multimsg_builder.finish();
self.builder.finish(multimsg, None);
;
output.write(self.builder.finished_data());
self.builder.reset();
self.message_buffer.clear();
}
}

View File

@ -19,6 +19,7 @@ pub mod marketdata_capnp;
pub mod marketdata_generated; // Flatbuffers pub mod marketdata_generated; // Flatbuffers
mod capnp_runner; mod capnp_runner;
mod flatbuffers_runner;
mod iex; mod iex;
mod parsers; mod parsers;
@ -47,9 +48,11 @@ fn main() {
let mut summarizer = Summarizer::default(); let mut summarizer = Summarizer::default();
let mut parser = IexParser::new(&buf[..]); let mut parser = IexParser::new(&buf[..]);
let mut output_buf: Vec<u8> = Vec::new();
/*
let mut capnp_writer = capnp_runner::CapnpWriter::new(); let mut capnp_writer = capnp_runner::CapnpWriter::new();
let capnp_reader = capnp_runner::CapnpReader::new(); let capnp_reader = capnp_runner::CapnpReader::new();
let mut output_buf = Vec::new();
for iex_payload in parser { for iex_payload in parser {
//let iex_payload = parser.next().unwrap(); //let iex_payload = parser.next().unwrap();
@ -65,6 +68,13 @@ fn main() {
assert_eq!(read_buf.pos, read_buf.inner.len()); assert_eq!(read_buf.pos, read_buf.inner.len());
dbg!(parsed_msgs); dbg!(parsed_msgs);
dbg!(summarizer); dbg!(summarizer);
*/
let mut fb_writer = flatbuffers_runner::FlatbuffersWriter::new();
for iex_payload in parser {
fb_writer.serialize(&iex_payload, &mut output_buf);
}
} }
#[derive(Debug)] #[derive(Debug)]

View File

@ -2,22 +2,23 @@
use std::mem;
use std::cmp::Ordering;
extern crate flatbuffers; extern crate flatbuffers;
use std::cmp::Ordering;
use std::mem;
use self::flatbuffers::EndianScalar; use self::flatbuffers::EndianScalar;
#[allow(unused_imports, dead_code)] #[allow(unused_imports, dead_code)]
pub mod md_shootout { pub mod md_shootout {
use std::cmp::Ordering;
use std::mem;
use std::mem; use self::flatbuffers::EndianScalar;
use std::cmp::Ordering;
extern crate flatbuffers; extern crate flatbuffers;
use self::flatbuffers::EndianScalar;
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
#[repr(u8)] #[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub enum MessageBody { pub enum MessageBody {
@ -383,8 +384,8 @@ impl<'a> Message<'a> {
pub const VT_BODY: flatbuffers::VOffsetT = 10; pub const VT_BODY: flatbuffers::VOffsetT = 10;
#[inline] #[inline]
pub fn ts_nanos(&self) -> u64 { pub fn ts_nanos(&self) -> i64 {
self._tab.get::<u64>(Message::VT_TS_NANOS, Some(0)).unwrap() self._tab.get::<i64>(Message::VT_TS_NANOS, Some(0)).unwrap()
} }
#[inline] #[inline]
pub fn symbol(&self) -> Option<&'a str> { pub fn symbol(&self) -> Option<&'a str> {
@ -421,7 +422,7 @@ impl<'a> Message<'a> {
} }
pub struct MessageArgs<'a> { pub struct MessageArgs<'a> {
pub ts_nanos: u64, pub ts_nanos: i64,
pub symbol: Option<flatbuffers::WIPOffset<&'a str>>, pub symbol: Option<flatbuffers::WIPOffset<&'a str>>,
pub body_type: MessageBody, pub body_type: MessageBody,
pub body: Option<flatbuffers::WIPOffset<flatbuffers::UnionWIPOffset>>, pub body: Option<flatbuffers::WIPOffset<flatbuffers::UnionWIPOffset>>,
@ -443,8 +444,8 @@ pub struct MessageBuilder<'a: 'b, 'b> {
} }
impl<'a: 'b, 'b> MessageBuilder<'a, 'b> { impl<'a: 'b, 'b> MessageBuilder<'a, 'b> {
#[inline] #[inline]
pub fn add_ts_nanos(&mut self, ts_nanos: u64) { pub fn add_ts_nanos(&mut self, ts_nanos: i64) {
self.fbb_.push_slot::<u64>(Message::VT_TS_NANOS, ts_nanos, 0); self.fbb_.push_slot::<i64>(Message::VT_TS_NANOS, ts_nanos, 0);
} }
#[inline] #[inline]
pub fn add_symbol(&mut self, symbol: flatbuffers::WIPOffset<&'b str>) { pub fn add_symbol(&mut self, symbol: flatbuffers::WIPOffset<&'b str>) {