diff --git a/src/iex.rs b/src/iex.rs index 2259ce0..92436cd 100644 --- a/src/iex.rs +++ b/src/iex.rs @@ -2,8 +2,42 @@ use std::convert::TryInto; use nom::{bytes::complete::take, IResult, number::complete::*, sequence::tuple}; +use crate::parsers::{Block, extract_iex_data, read_block}; + +pub struct IexParser<'a> { + pcap_buffer: &'a [u8] +} + +impl<'a> IexParser<'a> { + pub fn new(pcap_buffer: &[u8]) -> IexParser { + IexParser { pcap_buffer } + } +} + +impl<'a> Iterator for IexParser<'a> { + type Item = IexPayload; + + fn next(&mut self) -> Option { + let mut buffer = self.pcap_buffer; + while let Ok((rem, block)) = read_block(buffer) { + self.pcap_buffer = rem; + buffer = rem; + match block { + Block::EnhancedPacket(e) => { + let (_, iex_data) = extract_iex_data(e.packet_data).unwrap(); + let (_, payload) = IexPayload::parse(iex_data).unwrap(); + return Some(payload); + } + _ => () + } + } + + None + } +} + #[derive(Debug)] -struct IexPayload { +pub struct IexPayload { version: u8, _reserved: u8, proto_id: u16, @@ -17,8 +51,40 @@ struct IexPayload { messages: smallvec::SmallVec<[IexMessage; 8]>, } +impl IexPayload { + pub fn parse(payload: &[u8]) -> IResult<&[u8], IexPayload> { + let (mut rem, (version, _reserved, proto_id, channel_id, session_id, payload_len, msg_count, stream_offset, first_seq_no, send_time)) = + tuple((le_u8, le_u8, le_u16, le_u32, le_u32, le_u16, le_u16, le_u64, le_u64, le_i64))(payload)?; + + let mut messages = smallvec::SmallVec::new(); + for _i in 0..msg_count { + let (_rem, msg) = IexMessage::parse(rem)?; + rem = _rem; + messages.push(msg); + } + + debug_assert!(rem.len() == 0); + Ok(( + rem, + IexPayload { + version, + _reserved, + proto_id, + channel_id, + session_id, + payload_len, + msg_count, + stream_offset, + first_seq_no, + send_time, + messages, + } + )) + } +} + #[derive(Debug)] -enum IexMessage { +pub enum IexMessage { SystemEvent(SystemEvent), SecurityDirectory(SecurityDirectory), TradingStatus(TradingStatus), @@ -33,35 +99,43 @@ enum IexMessage { } macro_rules! parse_msg { - ($input:ident, $msg_type:ident) => {{ - let (rem, msg) = $msg_type::parse($input)?; - Ok((rem, IexMessage::$msg_type(msg))) + ($input:ident, $len:ident, $msg_type:ident) => {{ + let (_, msg) = $msg_type::parse($input)?; + IexMessage::$msg_type(msg) }}; } impl IexMessage { // TODO: Benchmark a version where we cast a packed struct instead of parsing - fn parse(input: &[u8]) -> IResult<&[u8], IexMessage> { - let msg_type = input[0]; - match msg_type { - 0x53 => parse_msg!(input, SystemEvent), - 0x44 => parse_msg!(input, SecurityDirectory), - 0x48 => parse_msg!(input, TradingStatus), - 0x4f => parse_msg!(input, OperationalHaltStatus), - 0x50 => parse_msg!(input, ShortSalePriceTest), - 0x45 => parse_msg!(input, SecurityEvent), + pub fn parse(input: &[u8]) -> IResult<&[u8], IexMessage> { + let (payload, msg_len) = le_u16(input)?; + + // Minor technical note: IEX's docs state that the message format + // can grow at any time, and we should always trust the msg_len field, + // so we first slice off the total msg_len for future compatibility + let (rem, payload) = take(msg_len)(payload)?; + let msg = match payload[0] { + 0x53 => parse_msg!(payload, msg_len, SystemEvent), + 0x44 => parse_msg!(payload, msg_len, SecurityDirectory), + 0x48 => parse_msg!(payload, msg_len, TradingStatus), + 0x4f => parse_msg!(payload, msg_len, OperationalHaltStatus), + 0x50 => parse_msg!(payload, msg_len, ShortSalePriceTest), + 0x45 => parse_msg!(payload, msg_len, SecurityEvent), // Why the "match multiple" looks like bitwise-OR is beyond me. - 0x38 | 0x35 => parse_msg!(input, PriceLevelUpdate), - 0x54 => parse_msg!(input, TradeReport), - 0x58 => parse_msg!(input, OfficialPrice), - 0x42 => parse_msg!(input, TradeBreak), - _ => panic!("Unrecognized message type"), - } + 0x38 | 0x35 => parse_msg!(payload, msg_len, PriceLevelUpdate), + 0x54 => parse_msg!(payload, msg_len, TradeReport), + 0x58 => parse_msg!(payload, msg_len, OfficialPrice), + 0x42 => parse_msg!(payload, msg_len, TradeBreak), + 0x41 => parse_msg!(payload, msg_len, AuctionInformation), + t => panic!("Unrecognized message type={}, payload={:?}", t, payload), + }; + + Ok((rem, msg)) } } #[derive(Debug)] -struct SystemEvent { +pub struct SystemEvent { msg_type: u8, system_event: u8, timestamp: i64, @@ -83,7 +157,7 @@ impl SystemEvent { } #[derive(Debug)] -struct SecurityDirectory { +pub struct SecurityDirectory { msg_type: u8, flags: u8, timestamp: i64, @@ -114,7 +188,7 @@ impl SecurityDirectory { } #[derive(Debug)] -struct TradingStatus { +pub struct TradingStatus { msg_type: u8, trading_status: u8, timestamp: i64, @@ -141,7 +215,7 @@ impl TradingStatus { } #[derive(Debug)] -struct OperationalHaltStatus { +pub struct OperationalHaltStatus { msg_type: u8, halt_status: u8, timestamp: i64, @@ -166,7 +240,7 @@ impl OperationalHaltStatus { } #[derive(Debug)] -struct ShortSalePriceTest { +pub struct ShortSalePriceTest { msg_type: u8, sspt_status: u8, timestamp: i64, @@ -193,7 +267,7 @@ impl ShortSalePriceTest { } #[derive(Debug)] -struct SecurityEvent { +pub struct SecurityEvent { msg_type: u8, security_event: u8, timestamp: i64, @@ -218,7 +292,7 @@ impl SecurityEvent { } #[derive(Debug)] -struct PriceLevelUpdate { +pub struct PriceLevelUpdate { msg_type: u8, event_flags: u8, timestamp: i64, @@ -247,7 +321,7 @@ impl PriceLevelUpdate { } #[derive(Debug)] -struct TradeReport { +pub struct TradeReport { msg_type: u8, sale_condition: u8, timestamp: i64, @@ -278,7 +352,7 @@ impl TradeReport { } #[derive(Debug)] -struct OfficialPrice { +pub struct OfficialPrice { msg_type: u8, price_type: u8, timestamp: i64, @@ -305,7 +379,7 @@ impl OfficialPrice { } #[derive(Debug)] -struct TradeBreak { +pub struct TradeBreak { msg_type: u8, sale_condition: u8, timestamp: i64, @@ -336,7 +410,7 @@ impl TradeBreak { } #[derive(Debug)] -struct AuctionInformation { +pub struct AuctionInformation { msg_type: u8, auction_type: u8, timestamp: i64, diff --git a/src/main.rs b/src/main.rs index ae20f9e..86d79e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,9 +3,8 @@ use std::io::Read; use std::path::Path; use clap::{App, Arg}; -use nom::sequence::tuple; -use parsers::Block; +use crate::iex::IexParser; // Cap'n'Proto and Flatbuffers typically ask that you generate code on the fly to match // the schemas. For purposes of auto-complete and easy browsing in the repository, @@ -35,16 +34,7 @@ fn main() { let mut buf = Vec::new(); file.read_to_end(&mut buf).expect(&format!("Unable to read file={}", path.display())); - let mut rem = &buf[..]; - while let Ok((unparsed, block)) = parsers::read_block(rem) { - let offset = (unparsed.as_ptr() as usize) - (buf.as_ptr() as usize); - rem = unparsed; - match block { - Block::SectionHeader(sh) => println!("{:?}, next offset={}", sh, offset), - Block::InterfaceDescription(id) => println!("{:?}, next offset={}", id, offset), - Block::EnhancedPacket(epb) => println!("EnhancedPacketBlock {{ block_len: {}, packet_len: {} }}, next offset={}", epb.block_len, epb.packet_data.len(), offset) - } + for _payload in IexParser::new(&buf[..]) { + //dbg!(payload); } - - println!("Remaining unparsed len={}", rem.len()); } diff --git a/src/parsers.rs b/src/parsers.rs index 7896631..cea18b7 100644 --- a/src/parsers.rs +++ b/src/parsers.rs @@ -1,6 +1,3 @@ -use std::convert::TryInto; -use std::mem::size_of; - use nom::{ branch::alt, bytes::complete::tag, bytes::complete::take, IResult, number::complete::*, sequence::tuple, @@ -13,10 +10,14 @@ pub enum Block<'a> { } pub fn read_block(input: &[u8]) -> IResult<&[u8], Block> { + // TODO: Curious if this is faster than `match` + // Theoretically it should be because we're almost always using + // enhanced packet blocks, but don't know if the branch predictor + // would catch on as well. alt(( + enhanced_packet_block, section_header_block, interface_description_block, - enhanced_packet_block, ))(input) }