Add all the IEX parsers

This commit is contained in:
Bradlee Speice 2019-08-24 22:49:07 -04:00
parent d13eb1c725
commit 5ce7ce740b
6 changed files with 476 additions and 39 deletions

1
Cargo.lock generated
View File

@ -105,6 +105,7 @@ dependencies = [
"flatbuffers 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "flatbuffers 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"flatc-rust 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "flatc-rust 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"nom 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "nom 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]

View File

@ -9,6 +9,7 @@ capnp = "0.10.1"
clap = "2.33.0" clap = "2.33.0"
flatbuffers = "0.6.0" flatbuffers = "0.6.0"
nom = "5.0.0" nom = "5.0.0"
smallvec = "0.6.10"
[build-dependencies] [build-dependencies]
capnpc = "0.10" capnpc = "0.10"

View File

@ -7,11 +7,13 @@ fn main() {
.src_prefix("") .src_prefix("")
.file("marketdata.capnp") .file("marketdata.capnp")
.output_path("src/") .output_path("src/")
.run().expect("Unable to compile capnpc"); .run()
.expect("Unable to compile capnpc");
flatc_rust::run(flatc_rust::Args { flatc_rust::run(flatc_rust::Args {
inputs: &[Path::new("marketdata.fbs")], inputs: &[Path::new("marketdata.fbs")],
out_dir: Path::new("src/"), out_dir: Path::new("src/"),
..Default::default() ..Default::default()
}).expect("Unable to compile flatc"); })
} .expect("Unable to compile flatc");
}

418
src/iex.rs Normal file
View File

@ -0,0 +1,418 @@
use std::convert::TryInto;
use nom::{bytes::complete::take, IResult, number::complete::*, sequence::tuple};
/// Header fields of an IEX payload together with the messages it carries.
///
/// NOTE(review): the field names and widths look like the IEX Transport Protocol
/// segment header — confirm against the specification.
#[derive(Debug)]
struct IexPayload {
version: u8,
_reserved: u8,
proto_id: u16,
channel_id: u32,
session_id: u32,
payload_len: u16,
msg_count: u16,
stream_offset: u64,
first_seq_no: u64,
send_time: i64,
// Up to 8 messages are stored inline; `SmallVec` moves to the heap beyond that.
messages: smallvec::SmallVec<[IexMessage; 8]>,
}
/// One decoded IEX message.
///
/// Every variant wraps the message struct of the same name.
#[derive(Debug)]
enum IexMessage {
SystemEvent(SystemEvent),
SecurityDirectory(SecurityDirectory),
TradingStatus(TradingStatus),
OperationalHaltStatus(OperationalHaltStatus),
ShortSalePriceTest(ShortSalePriceTest),
SecurityEvent(SecurityEvent),
PriceLevelUpdate(PriceLevelUpdate),
TradeReport(TradeReport),
OfficialPrice(OfficialPrice),
TradeBreak(TradeBreak),
AuctionInformation(AuctionInformation),
}
/// Runs `$msg_type::parse` on `$input` and wraps the parsed struct in the
/// `IexMessage` variant of the same name.
///
/// A parse failure is propagated with `?`, so the macro must be used inside a
/// function whose return type can carry the parser's error. On success the macro
/// evaluates to `Ok((remaining_input, IexMessage::$msg_type(..)))`.
macro_rules! parse_msg {
    ($input:ident, $msg_type:ident) => {{
        let (remaining, parsed) = $msg_type::parse($input)?;
        let wrapped = IexMessage::$msg_type(parsed);
        Ok((remaining, wrapped))
    }};
}
impl IexMessage {
// TODO: Benchmark a version where we cast a packed struct instead of parsing
fn parse(input: &[u8]) -> IResult<&[u8], IexMessage> {
let msg_type = input[0];
match msg_type {
0x53 => parse_msg!(input, SystemEvent),
0x44 => parse_msg!(input, SecurityDirectory),
0x48 => parse_msg!(input, TradingStatus),
0x4f => parse_msg!(input, OperationalHaltStatus),
0x50 => parse_msg!(input, ShortSalePriceTest),
0x45 => parse_msg!(input, SecurityEvent),
// Why the "match multiple" looks like bitwise-OR is beyond me.
0x38 | 0x35 => parse_msg!(input, PriceLevelUpdate),
0x54 => parse_msg!(input, TradeReport),
0x58 => parse_msg!(input, OfficialPrice),
0x42 => parse_msg!(input, TradeBreak),
_ => panic!("Unrecognized message type"),
}
}
}
#[derive(Debug)]
struct SystemEvent {
msg_type: u8,
system_event: u8,
timestamp: i64,
}
impl SystemEvent {
fn parse(input: &[u8]) -> IResult<&[u8], SystemEvent> {
let (rem, (msg_type, system_event, timestamp)) = tuple((le_u8, le_u8, le_i64))(input)?;
Ok((
rem,
SystemEvent {
msg_type,
system_event,
timestamp,
},
))
}
}
#[derive(Debug)]
struct SecurityDirectory {
msg_type: u8,
flags: u8,
timestamp: i64,
symbol: [u8; 8],
lot_size: u32,
previous_closing: u64,
luld_tier: u8,
}
impl SecurityDirectory {
fn parse(input: &[u8]) -> IResult<&[u8], SecurityDirectory> {
let (rem, (msg_type, flags, timestamp, symbol, lot_size, previous_closing, luld_tier)) =
tuple((le_u8, le_u8, le_i64, take(8usize), le_u32, le_u64, le_u8))(input)?;
Ok((
rem,
SecurityDirectory {
msg_type,
flags,
timestamp,
symbol: symbol.try_into().unwrap(),
lot_size,
previous_closing,
luld_tier,
},
))
}
}
#[derive(Debug)]
struct TradingStatus {
msg_type: u8,
trading_status: u8,
timestamp: i64,
symbol: [u8; 8],
reason: [u8; 4],
}
impl TradingStatus {
fn parse(input: &[u8]) -> IResult<&[u8], TradingStatus> {
let (rem, (msg_type, trading_status, timestamp, symbol, reason)) =
tuple((le_u8, le_u8, le_i64, take(8usize), take(4usize)))(input)?;
Ok((
rem,
TradingStatus {
msg_type,
trading_status,
timestamp,
symbol: symbol.try_into().unwrap(),
reason: reason.try_into().unwrap(),
},
))
}
}
#[derive(Debug)]
struct OperationalHaltStatus {
msg_type: u8,
halt_status: u8,
timestamp: i64,
symbol: [u8; 8],
}
impl OperationalHaltStatus {
fn parse(input: &[u8]) -> IResult<&[u8], OperationalHaltStatus> {
let (rem, (msg_type, halt_status, timestamp, symbol)) =
tuple((le_u8, le_u8, le_i64, take(8usize)))(input)?;
Ok((
rem,
OperationalHaltStatus {
msg_type,
halt_status,
timestamp,
symbol: symbol.try_into().unwrap(),
},
))
}
}
#[derive(Debug)]
struct ShortSalePriceTest {
msg_type: u8,
sspt_status: u8,
timestamp: i64,
symbol: [u8; 8],
detail: u8,
}
impl ShortSalePriceTest {
fn parse(input: &[u8]) -> IResult<&[u8], ShortSalePriceTest> {
let (rem, (msg_type, sspt_status, timestamp, symbol, detail)) =
tuple((le_u8, le_u8, le_i64, take(8usize), le_u8))(input)?;
Ok((
rem,
ShortSalePriceTest {
msg_type,
sspt_status,
timestamp,
symbol: symbol.try_into().unwrap(),
detail,
},
))
}
}
#[derive(Debug)]
struct SecurityEvent {
msg_type: u8,
security_event: u8,
timestamp: i64,
symbol: [u8; 8],
}
impl SecurityEvent {
fn parse(input: &[u8]) -> IResult<&[u8], SecurityEvent> {
let (rem, (msg_type, security_event, timestamp, symbol)) =
tuple((le_u8, le_u8, le_i64, take(8usize)))(input)?;
Ok((
rem,
SecurityEvent {
msg_type,
security_event,
timestamp,
symbol: symbol.try_into().unwrap(),
},
))
}
}
#[derive(Debug)]
struct PriceLevelUpdate {
msg_type: u8,
event_flags: u8,
timestamp: i64,
symbol: [u8; 8],
size: u32,
price: u64,
}
impl PriceLevelUpdate {
fn parse(input: &[u8]) -> IResult<&[u8], PriceLevelUpdate> {
let (rem, (msg_type, event_flags, timestamp, symbol, size, price)) =
tuple((le_u8, le_u8, le_i64, take(8usize), le_u32, le_u64))(input)?;
Ok((
rem,
PriceLevelUpdate {
msg_type,
event_flags,
timestamp,
symbol: symbol.try_into().unwrap(),
size,
price,
},
))
}
}
#[derive(Debug)]
struct TradeReport {
msg_type: u8,
sale_condition: u8,
timestamp: i64,
symbol: [u8; 8],
size: u32,
price: u64,
trade_id: u64,
}
impl TradeReport {
fn parse(input: &[u8]) -> IResult<&[u8], TradeReport> {
let (rem, (msg_type, sale_condition, timestamp, symbol, size, price, trade_id)) =
tuple((le_u8, le_u8, le_i64, take(8usize), le_u32, le_u64, le_u64))(input)?;
Ok((
rem,
TradeReport {
msg_type,
sale_condition,
timestamp,
symbol: symbol.try_into().unwrap(),
size,
price,
trade_id,
},
))
}
}
#[derive(Debug)]
struct OfficialPrice {
msg_type: u8,
price_type: u8,
timestamp: i64,
symbol: [u8; 8],
official_price: u64,
}
impl OfficialPrice {
fn parse(input: &[u8]) -> IResult<&[u8], OfficialPrice> {
let (rem, (msg_type, price_type, timestamp, symbol, official_price)) =
tuple((le_u8, le_u8, le_i64, take(8usize), le_u64))(input)?;
Ok((
rem,
OfficialPrice {
msg_type,
price_type,
timestamp,
symbol: symbol.try_into().unwrap(),
official_price,
},
))
}
}
#[derive(Debug)]
struct TradeBreak {
msg_type: u8,
sale_condition: u8,
timestamp: i64,
symbol: [u8; 8],
size: u32,
price: u64,
trade_id: u64,
}
impl TradeBreak {
fn parse(input: &[u8]) -> IResult<&[u8], TradeBreak> {
let (rem, (msg_type, sale_condition, timestamp, symbol, size, price, trade_id)) =
tuple((le_u8, le_u8, le_i64, take(8usize), le_u32, le_u64, le_u64))(input)?;
Ok((
rem,
TradeBreak {
msg_type,
sale_condition,
timestamp,
symbol: symbol.try_into().unwrap(),
size,
price,
trade_id,
},
))
}
}
#[derive(Debug)]
struct AuctionInformation {
msg_type: u8,
auction_type: u8,
timestamp: i64,
symbol: [u8; 8],
paired_shares: u32,
reference_price: u64,
indicative_clearing_price: u64,
imbalance_shares: u32,
imbalance_side: u8,
extension_number: u8,
scheduled_auction: u32,
auction_book_clearing_price: u64,
collar_reference_price: u64,
lower_auction_collar: u64,
upper_auction_collar: u64,
}
impl AuctionInformation {
fn parse(input: &[u8]) -> IResult<&[u8], AuctionInformation> {
// Dear Lord, why?
let (
rem,
(
msg_type,
auction_type,
timestamp,
symbol,
paired_shares,
reference_price,
indicative_clearing_price,
imbalance_shares,
imbalance_side,
extension_number,
scheduled_auction,
auction_book_clearing_price,
collar_reference_price,
lower_auction_collar,
upper_auction_collar,
),
) = tuple((
le_u8,
le_u8,
le_i64,
take(8usize),
le_u32,
le_u64,
le_u64,
le_u32,
le_u8,
le_u8,
le_u32,
le_u64,
le_u64,
le_u64,
le_u64,
))(input)?;
Ok((
rem,
AuctionInformation {
msg_type,
auction_type,
timestamp,
symbol: symbol.try_into().unwrap(),
paired_shares,
reference_price,
indicative_clearing_price,
imbalance_shares,
imbalance_side,
extension_number,
scheduled_auction,
auction_book_clearing_price,
collar_reference_price,
lower_auction_collar,
upper_auction_collar,
},
))
}
}

View File

@ -3,8 +3,9 @@ use std::io::Read;
use std::path::Path; use std::path::Path;
use clap::{App, Arg}; use clap::{App, Arg};
use nom::sequence::tuple;
use pcap_ng::Block; use parsers::Block;
// Cap'n'Proto and Flatbuffers typically ask that you generate code on the fly to match // Cap'n'Proto and Flatbuffers typically ask that you generate code on the fly to match
// the schemas. For purposes of auto-complete and easy browsing in the repository, // the schemas. For purposes of auto-complete and easy browsing in the repository,
@ -13,7 +14,8 @@ pub mod marketdata_capnp;
#[allow(unused_imports)] #[allow(unused_imports)]
pub mod marketdata_generated; // Flatbuffers pub mod marketdata_generated; // Flatbuffers
mod pcap_ng; mod iex;
mod parsers;
fn main() { fn main() {
let matches = App::new("Marketdata Shootout") let matches = App::new("Marketdata Shootout")
@ -34,7 +36,7 @@ fn main() {
file.read_to_end(&mut buf).expect(&format!("Unable to read file={}", path.display())); file.read_to_end(&mut buf).expect(&format!("Unable to read file={}", path.display()));
let mut rem = &buf[..]; let mut rem = &buf[..];
while let Ok((unparsed, block)) = pcap_ng::read_block(rem) { while let Ok((unparsed, block)) = parsers::read_block(rem) {
let offset = (unparsed.as_ptr() as usize) - (buf.as_ptr() as usize); let offset = (unparsed.as_ptr() as usize) - (buf.as_ptr() as usize);
rem = unparsed; rem = unparsed;
match block { match block {

View File

@ -1,16 +1,11 @@
use std::convert::TryInto;
use std::mem::size_of; use std::mem::size_of;
use nom::{ use nom::{
branch::alt, branch::alt, bytes::complete::tag, bytes::complete::take, IResult, number::complete::*,
bytes::complete::tag,
bytes::complete::take,
IResult,
number::complete::*,
sequence::tuple, sequence::tuple,
}; };
use crate::pcap_ng::Block::EnhancedPacket;
pub enum Block<'a> { pub enum Block<'a> {
SectionHeader(SectionHeaderBlock), SectionHeader(SectionHeaderBlock),
InterfaceDescription(InterfaceDescriptionBlock), InterfaceDescription(InterfaceDescriptionBlock),
@ -21,49 +16,43 @@ pub fn read_block(input: &[u8]) -> IResult<&[u8], Block> {
alt(( alt((
section_header_block, section_header_block,
interface_description_block, interface_description_block,
enhanced_packet_block enhanced_packet_block,
))(input) ))(input)
} }
#[derive(Debug)] #[derive(Debug)]
pub struct SectionHeaderBlock { pub struct SectionHeaderBlock {
block_len: u32 block_len: u32,
} }
const SECTION_HEADER: [u8; 4] = [0x0a, 0x0d, 0x0d, 0x0a]; const SECTION_HEADER: [u8; 4] = [0x0a, 0x0d, 0x0d, 0x0a];
pub fn section_header_block(input: &[u8]) -> IResult<&[u8], Block> { pub fn section_header_block(input: &[u8]) -> IResult<&[u8], Block> {
let header_len = 12; let header_len = 12;
let (rem, (_, block_len, _)) = tuple(( let (rem, (_, block_len, _)) =
tag(SECTION_HEADER), tuple((tag(SECTION_HEADER), le_u32, tag([0x4d, 0x3c, 0x2b, 0x1a])))(input)?;
le_u32,
tag([0x4d, 0x3c, 0x2b, 0x1a])
))(input)?;
take(block_len - header_len)(rem) take(block_len - header_len)(rem)
.map(|i| (i.0, Block::SectionHeader(SectionHeaderBlock { .map(|i| (i.0, Block::SectionHeader(SectionHeaderBlock { block_len })))
block_len
})))
} }
#[derive(Debug)] #[derive(Debug)]
pub struct InterfaceDescriptionBlock { pub struct InterfaceDescriptionBlock {
block_len: u32 block_len: u32,
} }
const INTERFACE_DESCRIPTION: [u8; 4] = [0x01, 0x00, 0x00, 0x00]; const INTERFACE_DESCRIPTION: [u8; 4] = [0x01, 0x00, 0x00, 0x00];
pub fn interface_description_block(input: &[u8]) -> IResult<&[u8], Block> { pub fn interface_description_block(input: &[u8]) -> IResult<&[u8], Block> {
let header_len = 8; let header_len = 8;
let (rem, (_, block_len)) = tuple(( let (rem, (_, block_len)) = tuple((tag(INTERFACE_DESCRIPTION), le_u32))(input)?;
tag(INTERFACE_DESCRIPTION),
le_u32
))(input)?;
take(block_len - header_len)(rem) take(block_len - header_len)(rem).map(|i| {
.map(|i| (i.0, Block::InterfaceDescription(InterfaceDescriptionBlock { (
block_len i.0,
}))) Block::InterfaceDescription(InterfaceDescriptionBlock { block_len }),
)
})
} }
pub struct EnhancedPacketBlock<'a> { pub struct EnhancedPacketBlock<'a> {
@ -82,7 +71,7 @@ pub fn enhanced_packet_block(input: &[u8]) -> IResult<&[u8], Block> {
le_u32, le_u32,
le_u32, le_u32,
le_u32, le_u32,
le_u32 le_u32,
))(input)?; ))(input)?;
let (rem, packet_data) = take(captured_len)(rem)?; let (rem, packet_data) = take(captured_len)(rem)?;
@ -91,9 +80,33 @@ pub fn enhanced_packet_block(input: &[u8]) -> IResult<&[u8], Block> {
// seem to respect this // seem to respect this
//let packet_total_len = (captured_len + 3) / 4 * 4; //let packet_total_len = (captured_len + 3) / 4 * 4;
take(block_len - header_len - captured_len)(rem) take(block_len - header_len - captured_len)(rem).map(|i| {
.map(|i| (i.0, Block::EnhancedPacket(EnhancedPacketBlock { (
block_len, i.0,
packet_data, Block::EnhancedPacket(EnhancedPacketBlock {
}))) block_len,
} packet_data,
}),
)
})
}
pub fn extract_iex_data(input: &[u8]) -> IResult<&[u8], &[u8]> {
// Get the IEX payload out of each packet
// First step is the ethernet frame
let (rem, (_, _, _)) = tuple((take(6usize), take(6usize), tag([0x08, 0x00])))(input)?;
// Then the IP header
let header_words = rem[0] & 0x0F;
let header_len = header_words as u32 * 4;
let (udp_data, _header_data) = take(header_len)(rem)?;
// And finally, extract out of UDP
let (rem, (_, _, udp_len, _)) = tuple((be_u16, be_u16, be_u16, be_u16))(udp_data)?;
let udp_header_len = 8;
let (rem, iex_data) = take(udp_len - udp_header_len)(rem)?;
debug_assert!(rem.len() == 0);
Ok((rem, iex_data))
}