diff --git a/marketdata.fbs b/marketdata.fbs index 9f74475..e8d7d2e 100644 --- a/marketdata.fbs +++ b/marketdata.fbs @@ -22,7 +22,7 @@ table LevelUpdate { } table Message { - ts_nanos:uint64; + ts_nanos:int64; symbol:string; body:MessageBody; } diff --git a/src/capnp_runner.rs b/src/capnp_runner.rs index e71a0bc..8c98528 100644 --- a/src/capnp_runner.rs +++ b/src/capnp_runner.rs @@ -17,6 +17,7 @@ fn __take_until<'a>(tag: &'static str, input: &'a [u8]) -> IResult<&'a [u8], &'a } fn parse_symbol(sym: &[u8; 8]) -> &str { + // TODO: Use the `jetscii` library for all that SIMD goodness // IEX guarantees ASCII, so we're fine using an unsafe conversion let (_, sym_bytes) = __take_until(" ", &sym[..]).unwrap(); unsafe { from_utf8_unchecked(sym_bytes) } diff --git a/src/flatbuffers_runner.rs b/src/flatbuffers_runner.rs new file mode 100644 index 0000000..bad5f97 --- /dev/null +++ b/src/flatbuffers_runner.rs @@ -0,0 +1,113 @@ +use std::io::Write; +use std::str::from_utf8_unchecked; + +use capnp::data::new_builder; +use flatbuffers::buffer_has_identifier; +use nom::{bytes::complete::take_until, IResult}; + +use crate::iex::{IexMessage, IexPayload}; +use crate::marketdata_generated::md_shootout; + +fn __take_until<'a>(tag: &'static str, input: &'a [u8]) -> IResult<&'a [u8], &'a [u8]> { + take_until(tag)(input) +} + +fn parse_symbol(sym: &[u8; 8]) -> &str { + // TODO: Use the `jetscii` library for all that SIMD goodness + // IEX guarantees ASCII, so we're fine using an unsafe conversion + let (_, sym_bytes) = __take_until(" ", &sym[..]).unwrap(); + unsafe { from_utf8_unchecked(sym_bytes) } +} + +pub struct FlatbuffersWriter<'a> { + builder: flatbuffers::FlatBufferBuilder<'a>, + message_buffer: Vec>>, +} + +impl<'a> FlatbuffersWriter<'a> { + pub fn new() -> FlatbuffersWriter<'static> { + FlatbuffersWriter { + builder: flatbuffers::FlatBufferBuilder::new(), + message_buffer: Vec::new(), + } + } + + pub fn serialize(&mut self, payload: &IexPayload, output: &mut Vec) { + + // Because FlatBuffers can't handle nested vectors (specifically, we can't track + // both the variable-length vector of messages, and the variable-length strings + // within those messages), we have to cache the messages as they get built + // so they can be added all at once later. + + for iex_msg in &payload.messages { + let msg_args = match iex_msg { + IexMessage::TradeReport(tr) => { + // The `Args` objects used are wrappers over an underlying `Builder`. + // We trust release builds to optimize out the wrapper, but would be + // interesting to know whether that's actually the case. + let trade = md_shootout::Trade::create( + &mut self.builder, + &md_shootout::TradeArgs { + price: tr.price, + size_: tr.size, + }, + ); + + /* + let mut trade_builder = md_shootout::TradeBuilder::new(self.builder); + trade_builder.add_price(tr.price); + trade_builder.add_size_(tr.size); + let trade = trade_builder.finish(); + */ + let sym_str = self.builder.create_string(parse_symbol(&tr.symbol)); + Some(md_shootout::MessageArgs { + ts_nanos: tr.timestamp, + symbol: Some(sym_str), + body_type: md_shootout::MessageBody::Trade, + // Why the hell do I need the `as_union_value` function to convert to UnionWIPOffset??? + body: Some(trade.as_union_value()), + }) + } + IexMessage::PriceLevelUpdate(plu) => { + let level_update = md_shootout::LevelUpdate::create( + &mut self.builder, + &md_shootout::LevelUpdateArgs { + price: plu.price, + size_: plu.size, + flags: plu.event_flags, + side: if plu.msg_type == 0x38 { md_shootout::Side::Buy } else { md_shootout::Side::Sell }, + }, + ); + + let sym_str = self.builder.create_string(parse_symbol(&plu.symbol)); + Some(md_shootout::MessageArgs { + ts_nanos: plu.timestamp, + symbol: Some(sym_str), + body_type: md_shootout::MessageBody::LevelUpdate, + body: Some(level_update.as_union_value()), + }) + } + _ => None + }; + + msg_args.map(|a| { + let msg = md_shootout::Message::create(&mut self.builder, &a); + self.message_buffer.push(msg); + }); + } + + let messages = self.builder.create_vector(&self.message_buffer[..]); + + // Now that we've finished building all the messages, time to set up the final buffer + let mut multimsg_builder = md_shootout::MultiMessageBuilder::new(&mut self.builder); + multimsg_builder.add_seq_no(payload.first_seq_no); + multimsg_builder.add_messages(messages); + let multimsg = multimsg_builder.finish(); + self.builder.finish(multimsg, None); + ; + output.write(self.builder.finished_data()); + + self.builder.reset(); + self.message_buffer.clear(); + } +} diff --git a/src/main.rs b/src/main.rs index 24cc24f..bc589c7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,6 +19,7 @@ pub mod marketdata_capnp; pub mod marketdata_generated; // Flatbuffers mod capnp_runner; +mod flatbuffers_runner; mod iex; mod parsers; @@ -47,9 +48,11 @@ fn main() { let mut summarizer = Summarizer::default(); let mut parser = IexParser::new(&buf[..]); + let mut output_buf: Vec = Vec::new(); + + /* let mut capnp_writer = capnp_runner::CapnpWriter::new(); let capnp_reader = capnp_runner::CapnpReader::new(); - let mut output_buf = Vec::new(); for iex_payload in parser { //let iex_payload = parser.next().unwrap(); @@ -65,6 +68,13 @@ fn main() { assert_eq!(read_buf.pos, read_buf.inner.len()); dbg!(parsed_msgs); dbg!(summarizer); + */ + + let mut fb_writer = flatbuffers_runner::FlatbuffersWriter::new(); + + for iex_payload in parser { + fb_writer.serialize(&iex_payload, &mut output_buf); + } } #[derive(Debug)] diff --git a/src/marketdata_generated.rs b/src/marketdata_generated.rs index e88a118..1742a77 100644 --- a/src/marketdata_generated.rs +++ b/src/marketdata_generated.rs @@ -2,22 +2,23 @@ -use std::mem; -use std::cmp::Ordering; - extern crate flatbuffers; + +use std::cmp::Ordering; +use std::mem; + use self::flatbuffers::EndianScalar; #[allow(unused_imports, dead_code)] pub mod md_shootout { + use std::cmp::Ordering; + use std::mem; - use std::mem; - use std::cmp::Ordering; + use self::flatbuffers::EndianScalar; - extern crate flatbuffers; - use self::flatbuffers::EndianScalar; + extern crate flatbuffers; -#[allow(non_camel_case_types)] + #[allow(non_camel_case_types)] #[repr(u8)] #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] pub enum MessageBody { @@ -383,8 +384,8 @@ impl<'a> Message<'a> { pub const VT_BODY: flatbuffers::VOffsetT = 10; #[inline] - pub fn ts_nanos(&self) -> u64 { - self._tab.get::(Message::VT_TS_NANOS, Some(0)).unwrap() + pub fn ts_nanos(&self) -> i64 { + self._tab.get::(Message::VT_TS_NANOS, Some(0)).unwrap() } #[inline] pub fn symbol(&self) -> Option<&'a str> { @@ -421,7 +422,7 @@ impl<'a> Message<'a> { } pub struct MessageArgs<'a> { - pub ts_nanos: u64, + pub ts_nanos: i64, pub symbol: Option>, pub body_type: MessageBody, pub body: Option>, @@ -443,8 +444,8 @@ pub struct MessageBuilder<'a: 'b, 'b> { } impl<'a: 'b, 'b> MessageBuilder<'a, 'b> { #[inline] - pub fn add_ts_nanos(&mut self, ts_nanos: u64) { - self.fbb_.push_slot::(Message::VT_TS_NANOS, ts_nanos, 0); + pub fn add_ts_nanos(&mut self, ts_nanos: i64) { + self.fbb_.push_slot::(Message::VT_TS_NANOS, ts_nanos, 0); } #[inline] pub fn add_symbol(&mut self, symbol: flatbuffers::WIPOffset<&'b str>) {