From 5ce7ce740b76560409b0153c8c64cab70763302b Mon Sep 17 00:00:00 2001 From: Bradlee Speice Date: Sat, 24 Aug 2019 22:49:07 -0400 Subject: [PATCH] Add all the IEX parsers --- Cargo.lock | 1 + Cargo.toml | 1 + build.rs | 8 +- src/iex.rs | 418 +++++++++++++++++++++++++++++++++ src/main.rs | 8 +- src/{pcap_ng.rs => parsers.rs} | 79 ++++--- 6 files changed, 476 insertions(+), 39 deletions(-) create mode 100644 src/iex.rs rename src/{pcap_ng.rs => parsers.rs} (51%) diff --git a/Cargo.lock b/Cargo.lock index 7396fe3..55771fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -105,6 +105,7 @@ dependencies = [ "flatbuffers 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "flatc-rust 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "nom 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index e1fd6fc..f2eef64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ capnp = "0.10.1" clap = "2.33.0" flatbuffers = "0.6.0" nom = "5.0.0" +smallvec = "0.6.10" [build-dependencies] capnpc = "0.10" diff --git a/build.rs b/build.rs index d231930..42af504 100644 --- a/build.rs +++ b/build.rs @@ -7,11 +7,13 @@ fn main() { .src_prefix("") .file("marketdata.capnp") .output_path("src/") - .run().expect("Unable to compile capnpc"); + .run() + .expect("Unable to compile capnpc"); flatc_rust::run(flatc_rust::Args { inputs: &[Path::new("marketdata.fbs")], out_dir: Path::new("src/"), ..Default::default() - }).expect("Unable to compile flatc"); -} \ No newline at end of file + }) + .expect("Unable to compile flatc"); +} diff --git a/src/iex.rs b/src/iex.rs new file mode 100644 index 0000000..2259ce0 --- /dev/null +++ b/src/iex.rs @@ -0,0 +1,418 @@ +use std::convert::TryInto; + +use nom::{bytes::complete::take, IResult, number::complete::*, sequence::tuple}; + +#[derive(Debug)] +struct IexPayload { + version: u8, + _reserved: u8, + proto_id: u16, + channel_id: u32, + session_id: u32, + payload_len: u16, + msg_count: u16, + stream_offset: u64, + first_seq_no: u64, + send_time: i64, + messages: smallvec::SmallVec<[IexMessage; 8]>, +} + +#[derive(Debug)] +enum IexMessage { + SystemEvent(SystemEvent), + SecurityDirectory(SecurityDirectory), + TradingStatus(TradingStatus), + OperationalHaltStatus(OperationalHaltStatus), + ShortSalePriceTest(ShortSalePriceTest), + SecurityEvent(SecurityEvent), + PriceLevelUpdate(PriceLevelUpdate), + TradeReport(TradeReport), + OfficialPrice(OfficialPrice), + TradeBreak(TradeBreak), + AuctionInformation(AuctionInformation), +} + +macro_rules! parse_msg { + ($input:ident, $msg_type:ident) => {{ + let (rem, msg) = $msg_type::parse($input)?; + Ok((rem, IexMessage::$msg_type(msg))) + }}; +} + +impl IexMessage { + // TODO: Benchmark a version where we cast a packed struct instead of parsing + fn parse(input: &[u8]) -> IResult<&[u8], IexMessage> { + let msg_type = input[0]; + match msg_type { + 0x53 => parse_msg!(input, SystemEvent), + 0x44 => parse_msg!(input, SecurityDirectory), + 0x48 => parse_msg!(input, TradingStatus), + 0x4f => parse_msg!(input, OperationalHaltStatus), + 0x50 => parse_msg!(input, ShortSalePriceTest), + 0x45 => parse_msg!(input, SecurityEvent), + // Why the "match multiple" looks like bitwise-OR is beyond me. + 0x38 | 0x35 => parse_msg!(input, PriceLevelUpdate), + 0x54 => parse_msg!(input, TradeReport), + 0x58 => parse_msg!(input, OfficialPrice), + 0x42 => parse_msg!(input, TradeBreak), + _ => panic!("Unrecognized message type"), + } + } +} + +#[derive(Debug)] +struct SystemEvent { + msg_type: u8, + system_event: u8, + timestamp: i64, +} + +impl SystemEvent { + fn parse(input: &[u8]) -> IResult<&[u8], SystemEvent> { + let (rem, (msg_type, system_event, timestamp)) = tuple((le_u8, le_u8, le_i64))(input)?; + + Ok(( + rem, + SystemEvent { + msg_type, + system_event, + timestamp, + }, + )) + } +} + +#[derive(Debug)] +struct SecurityDirectory { + msg_type: u8, + flags: u8, + timestamp: i64, + symbol: [u8; 8], + lot_size: u32, + previous_closing: u64, + luld_tier: u8, +} + +impl SecurityDirectory { + fn parse(input: &[u8]) -> IResult<&[u8], SecurityDirectory> { + let (rem, (msg_type, flags, timestamp, symbol, lot_size, previous_closing, luld_tier)) = + tuple((le_u8, le_u8, le_i64, take(8usize), le_u32, le_u64, le_u8))(input)?; + + Ok(( + rem, + SecurityDirectory { + msg_type, + flags, + timestamp, + symbol: symbol.try_into().unwrap(), + lot_size, + previous_closing, + luld_tier, + }, + )) + } +} + +#[derive(Debug)] +struct TradingStatus { + msg_type: u8, + trading_status: u8, + timestamp: i64, + symbol: [u8; 8], + reason: [u8; 4], +} + +impl TradingStatus { + fn parse(input: &[u8]) -> IResult<&[u8], TradingStatus> { + let (rem, (msg_type, trading_status, timestamp, symbol, reason)) = + tuple((le_u8, le_u8, le_i64, take(8usize), take(4usize)))(input)?; + + Ok(( + rem, + TradingStatus { + msg_type, + trading_status, + timestamp, + symbol: symbol.try_into().unwrap(), + reason: reason.try_into().unwrap(), + }, + )) + } +} + +#[derive(Debug)] +struct OperationalHaltStatus { + msg_type: u8, + halt_status: u8, + timestamp: i64, + symbol: [u8; 8], +} + +impl OperationalHaltStatus { + fn parse(input: &[u8]) -> IResult<&[u8], OperationalHaltStatus> { + let (rem, (msg_type, halt_status, timestamp, symbol)) = + tuple((le_u8, le_u8, le_i64, take(8usize)))(input)?; + + Ok(( + rem, + OperationalHaltStatus { + msg_type, + halt_status, + timestamp, + symbol: symbol.try_into().unwrap(), + }, + )) + } +} + +#[derive(Debug)] +struct ShortSalePriceTest { + msg_type: u8, + sspt_status: u8, + timestamp: i64, + symbol: [u8; 8], + detail: u8, +} + +impl ShortSalePriceTest { + fn parse(input: &[u8]) -> IResult<&[u8], ShortSalePriceTest> { + let (rem, (msg_type, sspt_status, timestamp, symbol, detail)) = + tuple((le_u8, le_u8, le_i64, take(8usize), le_u8))(input)?; + + Ok(( + rem, + ShortSalePriceTest { + msg_type, + sspt_status, + timestamp, + symbol: symbol.try_into().unwrap(), + detail, + }, + )) + } +} + +#[derive(Debug)] +struct SecurityEvent { + msg_type: u8, + security_event: u8, + timestamp: i64, + symbol: [u8; 8], +} + +impl SecurityEvent { + fn parse(input: &[u8]) -> IResult<&[u8], SecurityEvent> { + let (rem, (msg_type, security_event, timestamp, symbol)) = + tuple((le_u8, le_u8, le_i64, take(8usize)))(input)?; + + Ok(( + rem, + SecurityEvent { + msg_type, + security_event, + timestamp, + symbol: symbol.try_into().unwrap(), + }, + )) + } +} + +#[derive(Debug)] +struct PriceLevelUpdate { + msg_type: u8, + event_flags: u8, + timestamp: i64, + symbol: [u8; 8], + size: u32, + price: u64, +} + +impl PriceLevelUpdate { + fn parse(input: &[u8]) -> IResult<&[u8], PriceLevelUpdate> { + let (rem, (msg_type, event_flags, timestamp, symbol, size, price)) = + tuple((le_u8, le_u8, le_i64, take(8usize), le_u32, le_u64))(input)?; + + Ok(( + rem, + PriceLevelUpdate { + msg_type, + event_flags, + timestamp, + symbol: symbol.try_into().unwrap(), + size, + price, + }, + )) + } +} + +#[derive(Debug)] +struct TradeReport { + msg_type: u8, + sale_condition: u8, + timestamp: i64, + symbol: [u8; 8], + size: u32, + price: u64, + trade_id: u64, +} + +impl TradeReport { + fn parse(input: &[u8]) -> IResult<&[u8], TradeReport> { + let (rem, (msg_type, sale_condition, timestamp, symbol, size, price, trade_id)) = + tuple((le_u8, le_u8, le_i64, take(8usize), le_u32, le_u64, le_u64))(input)?; + + Ok(( + rem, + TradeReport { + msg_type, + sale_condition, + timestamp, + symbol: symbol.try_into().unwrap(), + size, + price, + trade_id, + }, + )) + } +} + +#[derive(Debug)] +struct OfficialPrice { + msg_type: u8, + price_type: u8, + timestamp: i64, + symbol: [u8; 8], + official_price: u64, +} + +impl OfficialPrice { + fn parse(input: &[u8]) -> IResult<&[u8], OfficialPrice> { + let (rem, (msg_type, price_type, timestamp, symbol, official_price)) = + tuple((le_u8, le_u8, le_i64, take(8usize), le_u64))(input)?; + + Ok(( + rem, + OfficialPrice { + msg_type, + price_type, + timestamp, + symbol: symbol.try_into().unwrap(), + official_price, + }, + )) + } +} + +#[derive(Debug)] +struct TradeBreak { + msg_type: u8, + sale_condition: u8, + timestamp: i64, + symbol: [u8; 8], + size: u32, + price: u64, + trade_id: u64, +} + +impl TradeBreak { + fn parse(input: &[u8]) -> IResult<&[u8], TradeBreak> { + let (rem, (msg_type, sale_condition, timestamp, symbol, size, price, trade_id)) = + tuple((le_u8, le_u8, le_i64, take(8usize), le_u32, le_u64, le_u64))(input)?; + + Ok(( + rem, + TradeBreak { + msg_type, + sale_condition, + timestamp, + symbol: symbol.try_into().unwrap(), + size, + price, + trade_id, + }, + )) + } +} + +#[derive(Debug)] +struct AuctionInformation { + msg_type: u8, + auction_type: u8, + timestamp: i64, + symbol: [u8; 8], + paired_shares: u32, + reference_price: u64, + indicative_clearing_price: u64, + imbalance_shares: u32, + imbalance_side: u8, + extension_number: u8, + scheduled_auction: u32, + auction_book_clearing_price: u64, + collar_reference_price: u64, + lower_auction_collar: u64, + upper_auction_collar: u64, +} + +impl AuctionInformation { + fn parse(input: &[u8]) -> IResult<&[u8], AuctionInformation> { + // Dear Lord, why? + let ( + rem, + ( + msg_type, + auction_type, + timestamp, + symbol, + paired_shares, + reference_price, + indicative_clearing_price, + imbalance_shares, + imbalance_side, + extension_number, + scheduled_auction, + auction_book_clearing_price, + collar_reference_price, + lower_auction_collar, + upper_auction_collar, + ), + ) = tuple(( + le_u8, + le_u8, + le_i64, + take(8usize), + le_u32, + le_u64, + le_u64, + le_u32, + le_u8, + le_u8, + le_u32, + le_u64, + le_u64, + le_u64, + le_u64, + ))(input)?; + + Ok(( + rem, + AuctionInformation { + msg_type, + auction_type, + timestamp, + symbol: symbol.try_into().unwrap(), + paired_shares, + reference_price, + indicative_clearing_price, + imbalance_shares, + imbalance_side, + extension_number, + scheduled_auction, + auction_book_clearing_price, + collar_reference_price, + lower_auction_collar, + upper_auction_collar, + }, + )) + } +} diff --git a/src/main.rs b/src/main.rs index d9c4aed..ae20f9e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,8 +3,9 @@ use std::io::Read; use std::path::Path; use clap::{App, Arg}; +use nom::sequence::tuple; -use pcap_ng::Block; +use parsers::Block; // Cap'n'Proto and Flatbuffers typically ask that you generate code on the fly to match // the schemas. For purposes of auto-complete and easy browsing in the repository, @@ -13,7 +14,8 @@ pub mod marketdata_capnp; #[allow(unused_imports)] pub mod marketdata_generated; // Flatbuffers -mod pcap_ng; +mod iex; +mod parsers; fn main() { let matches = App::new("Marketdata Shootout") @@ -34,7 +36,7 @@ fn main() { file.read_to_end(&mut buf).expect(&format!("Unable to read file={}", path.display())); let mut rem = &buf[..]; - while let Ok((unparsed, block)) = pcap_ng::read_block(rem) { + while let Ok((unparsed, block)) = parsers::read_block(rem) { let offset = (unparsed.as_ptr() as usize) - (buf.as_ptr() as usize); rem = unparsed; match block { diff --git a/src/pcap_ng.rs b/src/parsers.rs similarity index 51% rename from src/pcap_ng.rs rename to src/parsers.rs index 5d75520..7896631 100644 --- a/src/pcap_ng.rs +++ b/src/parsers.rs @@ -1,16 +1,11 @@ +use std::convert::TryInto; use std::mem::size_of; use nom::{ - branch::alt, - bytes::complete::tag, - bytes::complete::take, - IResult, - number::complete::*, + branch::alt, bytes::complete::tag, bytes::complete::take, IResult, number::complete::*, sequence::tuple, }; -use crate::pcap_ng::Block::EnhancedPacket; - pub enum Block<'a> { SectionHeader(SectionHeaderBlock), InterfaceDescription(InterfaceDescriptionBlock), @@ -21,49 +16,43 @@ pub fn read_block(input: &[u8]) -> IResult<&[u8], Block> { alt(( section_header_block, interface_description_block, - enhanced_packet_block + enhanced_packet_block, ))(input) } #[derive(Debug)] pub struct SectionHeaderBlock { - block_len: u32 + block_len: u32, } const SECTION_HEADER: [u8; 4] = [0x0a, 0x0d, 0x0d, 0x0a]; pub fn section_header_block(input: &[u8]) -> IResult<&[u8], Block> { let header_len = 12; - let (rem, (_, block_len, _)) = tuple(( - tag(SECTION_HEADER), - le_u32, - tag([0x4d, 0x3c, 0x2b, 0x1a]) - ))(input)?; + let (rem, (_, block_len, _)) = + tuple((tag(SECTION_HEADER), le_u32, tag([0x4d, 0x3c, 0x2b, 0x1a])))(input)?; take(block_len - header_len)(rem) - .map(|i| (i.0, Block::SectionHeader(SectionHeaderBlock { - block_len - }))) + .map(|i| (i.0, Block::SectionHeader(SectionHeaderBlock { block_len }))) } #[derive(Debug)] pub struct InterfaceDescriptionBlock { - block_len: u32 + block_len: u32, } const INTERFACE_DESCRIPTION: [u8; 4] = [0x01, 0x00, 0x00, 0x00]; pub fn interface_description_block(input: &[u8]) -> IResult<&[u8], Block> { let header_len = 8; - let (rem, (_, block_len)) = tuple(( - tag(INTERFACE_DESCRIPTION), - le_u32 - ))(input)?; + let (rem, (_, block_len)) = tuple((tag(INTERFACE_DESCRIPTION), le_u32))(input)?; - take(block_len - header_len)(rem) - .map(|i| (i.0, Block::InterfaceDescription(InterfaceDescriptionBlock { - block_len - }))) + take(block_len - header_len)(rem).map(|i| { + ( + i.0, + Block::InterfaceDescription(InterfaceDescriptionBlock { block_len }), + ) + }) } pub struct EnhancedPacketBlock<'a> { @@ -82,7 +71,7 @@ pub fn enhanced_packet_block(input: &[u8]) -> IResult<&[u8], Block> { le_u32, le_u32, le_u32, - le_u32 + le_u32, ))(input)?; let (rem, packet_data) = take(captured_len)(rem)?; @@ -91,9 +80,33 @@ pub fn enhanced_packet_block(input: &[u8]) -> IResult<&[u8], Block> { // seem to respect this //let packet_total_len = (captured_len + 3) / 4 * 4; - take(block_len - header_len - captured_len)(rem) - .map(|i| (i.0, Block::EnhancedPacket(EnhancedPacketBlock { - block_len, - packet_data, - }))) -} \ No newline at end of file + take(block_len - header_len - captured_len)(rem).map(|i| { + ( + i.0, + Block::EnhancedPacket(EnhancedPacketBlock { + block_len, + packet_data, + }), + ) + }) +} + +pub fn extract_iex_data(input: &[u8]) -> IResult<&[u8], &[u8]> { + // Get the IEX payload out of each packet + // First step is the ethernet frame + let (rem, (_, _, _)) = tuple((take(6usize), take(6usize), tag([0x08, 0x00])))(input)?; + + // Then the IP header + let header_words = rem[0] & 0x0F; + let header_len = header_words as u32 * 4; + let (udp_data, _header_data) = take(header_len)(rem)?; + + // And finally, extract out of UDP + let (rem, (_, _, udp_len, _)) = tuple((be_u16, be_u16, be_u16, be_u16))(udp_data)?; + + let udp_header_len = 8; + let (rem, iex_data) = take(udp_len - udp_header_len)(rem)?; + debug_assert!(rem.len() == 0); + + Ok((rem, iex_data)) +}