mirror of
				https://github.com/speice-io/marketdata-shootout
				synced 2025-11-03 18:10:38 -05:00 
			
		
		
		
	Able to replay an entire DEEP file!
This commit is contained in:
		
							
								
								
									
										136
									
								
								src/iex.rs
									
									
									
									
									
								
							
							
						
						
									
										136
									
								
								src/iex.rs
									
									
									
									
									
								
							@ -2,8 +2,42 @@ use std::convert::TryInto;
 | 
			
		||||
 | 
			
		||||
use nom::{bytes::complete::take, IResult, number::complete::*, sequence::tuple};
 | 
			
		||||
 | 
			
		||||
use crate::parsers::{Block, extract_iex_data, read_block};
 | 
			
		||||
 | 
			
		||||
pub struct IexParser<'a> {
 | 
			
		||||
    pcap_buffer: &'a [u8]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<'a> IexParser<'a> {
 | 
			
		||||
    pub fn new(pcap_buffer: &[u8]) -> IexParser {
 | 
			
		||||
        IexParser { pcap_buffer }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<'a> Iterator for IexParser<'a> {
 | 
			
		||||
    type Item = IexPayload;
 | 
			
		||||
 | 
			
		||||
    fn next(&mut self) -> Option<Self::Item> {
 | 
			
		||||
        let mut buffer = self.pcap_buffer;
 | 
			
		||||
        while let Ok((rem, block)) = read_block(buffer) {
 | 
			
		||||
            self.pcap_buffer = rem;
 | 
			
		||||
            buffer = rem;
 | 
			
		||||
            match block {
 | 
			
		||||
                Block::EnhancedPacket(e) => {
 | 
			
		||||
                    let (_, iex_data) = extract_iex_data(e.packet_data).unwrap();
 | 
			
		||||
                    let (_, payload) = IexPayload::parse(iex_data).unwrap();
 | 
			
		||||
                    return Some(payload);
 | 
			
		||||
                }
 | 
			
		||||
                _ => ()
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        None
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct IexPayload {
 | 
			
		||||
pub struct IexPayload {
 | 
			
		||||
    version: u8,
 | 
			
		||||
    _reserved: u8,
 | 
			
		||||
    proto_id: u16,
 | 
			
		||||
@ -17,8 +51,40 @@ struct IexPayload {
 | 
			
		||||
    messages: smallvec::SmallVec<[IexMessage; 8]>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl IexPayload {
 | 
			
		||||
    pub fn parse(payload: &[u8]) -> IResult<&[u8], IexPayload> {
 | 
			
		||||
        let (mut rem, (version, _reserved, proto_id, channel_id, session_id, payload_len, msg_count, stream_offset, first_seq_no, send_time)) =
 | 
			
		||||
            tuple((le_u8, le_u8, le_u16, le_u32, le_u32, le_u16, le_u16, le_u64, le_u64, le_i64))(payload)?;
 | 
			
		||||
 | 
			
		||||
        let mut messages = smallvec::SmallVec::new();
 | 
			
		||||
        for _i in 0..msg_count {
 | 
			
		||||
            let (_rem, msg) = IexMessage::parse(rem)?;
 | 
			
		||||
            rem = _rem;
 | 
			
		||||
            messages.push(msg);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        debug_assert!(rem.len() == 0);
 | 
			
		||||
        Ok((
 | 
			
		||||
            rem,
 | 
			
		||||
            IexPayload {
 | 
			
		||||
                version,
 | 
			
		||||
                _reserved,
 | 
			
		||||
                proto_id,
 | 
			
		||||
                channel_id,
 | 
			
		||||
                session_id,
 | 
			
		||||
                payload_len,
 | 
			
		||||
                msg_count,
 | 
			
		||||
                stream_offset,
 | 
			
		||||
                first_seq_no,
 | 
			
		||||
                send_time,
 | 
			
		||||
                messages,
 | 
			
		||||
            }
 | 
			
		||||
        ))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
enum IexMessage {
 | 
			
		||||
pub enum IexMessage {
 | 
			
		||||
    SystemEvent(SystemEvent),
 | 
			
		||||
    SecurityDirectory(SecurityDirectory),
 | 
			
		||||
    TradingStatus(TradingStatus),
 | 
			
		||||
@ -33,35 +99,43 @@ enum IexMessage {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
macro_rules! parse_msg {
 | 
			
		||||
    ($input:ident, $msg_type:ident) => {{
 | 
			
		||||
        let (rem, msg) = $msg_type::parse($input)?;
 | 
			
		||||
        Ok((rem, IexMessage::$msg_type(msg)))
 | 
			
		||||
    ($input:ident, $len:ident, $msg_type:ident) => {{
 | 
			
		||||
        let (_, msg) = $msg_type::parse($input)?;
 | 
			
		||||
        IexMessage::$msg_type(msg)
 | 
			
		||||
    }};
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl IexMessage {
 | 
			
		||||
    // TODO: Benchmark a version where we cast a packed struct instead of parsing
 | 
			
		||||
    fn parse(input: &[u8]) -> IResult<&[u8], IexMessage> {
 | 
			
		||||
        let msg_type = input[0];
 | 
			
		||||
        match msg_type {
 | 
			
		||||
            0x53 => parse_msg!(input, SystemEvent),
 | 
			
		||||
            0x44 => parse_msg!(input, SecurityDirectory),
 | 
			
		||||
            0x48 => parse_msg!(input, TradingStatus),
 | 
			
		||||
            0x4f => parse_msg!(input, OperationalHaltStatus),
 | 
			
		||||
            0x50 => parse_msg!(input, ShortSalePriceTest),
 | 
			
		||||
            0x45 => parse_msg!(input, SecurityEvent),
 | 
			
		||||
    pub fn parse(input: &[u8]) -> IResult<&[u8], IexMessage> {
 | 
			
		||||
        let (payload, msg_len) = le_u16(input)?;
 | 
			
		||||
 | 
			
		||||
        // Minor technical note: IEX's docs state that the message format
 | 
			
		||||
        // can grow at any time, and we should always trust the msg_len field,
 | 
			
		||||
        // so we first slice off the total msg_len for future compatibility
 | 
			
		||||
        let (rem, payload) = take(msg_len)(payload)?;
 | 
			
		||||
        let msg = match payload[0] {
 | 
			
		||||
            0x53 => parse_msg!(payload, msg_len, SystemEvent),
 | 
			
		||||
            0x44 => parse_msg!(payload, msg_len, SecurityDirectory),
 | 
			
		||||
            0x48 => parse_msg!(payload, msg_len, TradingStatus),
 | 
			
		||||
            0x4f => parse_msg!(payload, msg_len, OperationalHaltStatus),
 | 
			
		||||
            0x50 => parse_msg!(payload, msg_len, ShortSalePriceTest),
 | 
			
		||||
            0x45 => parse_msg!(payload, msg_len, SecurityEvent),
 | 
			
		||||
            // Why the "match multiple" looks like bitwise-OR is beyond me.
 | 
			
		||||
            0x38 | 0x35 => parse_msg!(input, PriceLevelUpdate),
 | 
			
		||||
            0x54 => parse_msg!(input, TradeReport),
 | 
			
		||||
            0x58 => parse_msg!(input, OfficialPrice),
 | 
			
		||||
            0x42 => parse_msg!(input, TradeBreak),
 | 
			
		||||
            _ => panic!("Unrecognized message type"),
 | 
			
		||||
        }
 | 
			
		||||
            0x38 | 0x35 => parse_msg!(payload, msg_len, PriceLevelUpdate),
 | 
			
		||||
            0x54 => parse_msg!(payload, msg_len, TradeReport),
 | 
			
		||||
            0x58 => parse_msg!(payload, msg_len, OfficialPrice),
 | 
			
		||||
            0x42 => parse_msg!(payload, msg_len, TradeBreak),
 | 
			
		||||
            0x41 => parse_msg!(payload, msg_len, AuctionInformation),
 | 
			
		||||
            t => panic!("Unrecognized message type={}, payload={:?}", t, payload),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        Ok((rem, msg))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct SystemEvent {
 | 
			
		||||
pub struct SystemEvent {
 | 
			
		||||
    msg_type: u8,
 | 
			
		||||
    system_event: u8,
 | 
			
		||||
    timestamp: i64,
 | 
			
		||||
@ -83,7 +157,7 @@ impl SystemEvent {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct SecurityDirectory {
 | 
			
		||||
pub struct SecurityDirectory {
 | 
			
		||||
    msg_type: u8,
 | 
			
		||||
    flags: u8,
 | 
			
		||||
    timestamp: i64,
 | 
			
		||||
@ -114,7 +188,7 @@ impl SecurityDirectory {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct TradingStatus {
 | 
			
		||||
pub struct TradingStatus {
 | 
			
		||||
    msg_type: u8,
 | 
			
		||||
    trading_status: u8,
 | 
			
		||||
    timestamp: i64,
 | 
			
		||||
@ -141,7 +215,7 @@ impl TradingStatus {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct OperationalHaltStatus {
 | 
			
		||||
pub struct OperationalHaltStatus {
 | 
			
		||||
    msg_type: u8,
 | 
			
		||||
    halt_status: u8,
 | 
			
		||||
    timestamp: i64,
 | 
			
		||||
@ -166,7 +240,7 @@ impl OperationalHaltStatus {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct ShortSalePriceTest {
 | 
			
		||||
pub struct ShortSalePriceTest {
 | 
			
		||||
    msg_type: u8,
 | 
			
		||||
    sspt_status: u8,
 | 
			
		||||
    timestamp: i64,
 | 
			
		||||
@ -193,7 +267,7 @@ impl ShortSalePriceTest {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct SecurityEvent {
 | 
			
		||||
pub struct SecurityEvent {
 | 
			
		||||
    msg_type: u8,
 | 
			
		||||
    security_event: u8,
 | 
			
		||||
    timestamp: i64,
 | 
			
		||||
@ -218,7 +292,7 @@ impl SecurityEvent {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct PriceLevelUpdate {
 | 
			
		||||
pub struct PriceLevelUpdate {
 | 
			
		||||
    msg_type: u8,
 | 
			
		||||
    event_flags: u8,
 | 
			
		||||
    timestamp: i64,
 | 
			
		||||
@ -247,7 +321,7 @@ impl PriceLevelUpdate {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct TradeReport {
 | 
			
		||||
pub struct TradeReport {
 | 
			
		||||
    msg_type: u8,
 | 
			
		||||
    sale_condition: u8,
 | 
			
		||||
    timestamp: i64,
 | 
			
		||||
@ -278,7 +352,7 @@ impl TradeReport {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct OfficialPrice {
 | 
			
		||||
pub struct OfficialPrice {
 | 
			
		||||
    msg_type: u8,
 | 
			
		||||
    price_type: u8,
 | 
			
		||||
    timestamp: i64,
 | 
			
		||||
@ -305,7 +379,7 @@ impl OfficialPrice {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct TradeBreak {
 | 
			
		||||
pub struct TradeBreak {
 | 
			
		||||
    msg_type: u8,
 | 
			
		||||
    sale_condition: u8,
 | 
			
		||||
    timestamp: i64,
 | 
			
		||||
@ -336,7 +410,7 @@ impl TradeBreak {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct AuctionInformation {
 | 
			
		||||
pub struct AuctionInformation {
 | 
			
		||||
    msg_type: u8,
 | 
			
		||||
    auction_type: u8,
 | 
			
		||||
    timestamp: i64,
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										16
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								src/main.rs
									
									
									
									
									
								
							@ -3,9 +3,8 @@ use std::io::Read;
 | 
			
		||||
use std::path::Path;
 | 
			
		||||
 | 
			
		||||
use clap::{App, Arg};
 | 
			
		||||
use nom::sequence::tuple;
 | 
			
		||||
 | 
			
		||||
use parsers::Block;
 | 
			
		||||
use crate::iex::IexParser;
 | 
			
		||||
 | 
			
		||||
// Cap'n'Proto and Flatbuffers typically ask that you generate code on the fly to match
 | 
			
		||||
// the schemas. For purposes of auto-complete and easy browsing in the repository,
 | 
			
		||||
@ -35,16 +34,7 @@ fn main() {
 | 
			
		||||
    let mut buf = Vec::new();
 | 
			
		||||
    file.read_to_end(&mut buf).expect(&format!("Unable to read file={}", path.display()));
 | 
			
		||||
 | 
			
		||||
    let mut rem = &buf[..];
 | 
			
		||||
    while let Ok((unparsed, block)) = parsers::read_block(rem) {
 | 
			
		||||
        let offset = (unparsed.as_ptr() as usize) - (buf.as_ptr() as usize);
 | 
			
		||||
        rem = unparsed;
 | 
			
		||||
        match block {
 | 
			
		||||
            Block::SectionHeader(sh) => println!("{:?}, next offset={}", sh, offset),
 | 
			
		||||
            Block::InterfaceDescription(id) => println!("{:?}, next offset={}", id, offset),
 | 
			
		||||
            Block::EnhancedPacket(epb) => println!("EnhancedPacketBlock {{ block_len: {}, packet_len: {} }}, next offset={}", epb.block_len, epb.packet_data.len(), offset)
 | 
			
		||||
        }
 | 
			
		||||
    for _payload in IexParser::new(&buf[..]) {
 | 
			
		||||
        //dbg!(payload);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    println!("Remaining unparsed len={}", rem.len());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,3 @@
 | 
			
		||||
use std::convert::TryInto;
 | 
			
		||||
use std::mem::size_of;
 | 
			
		||||
 | 
			
		||||
use nom::{
 | 
			
		||||
    branch::alt, bytes::complete::tag, bytes::complete::take, IResult, number::complete::*,
 | 
			
		||||
    sequence::tuple,
 | 
			
		||||
@ -13,10 +10,14 @@ pub enum Block<'a> {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn read_block(input: &[u8]) -> IResult<&[u8], Block> {
 | 
			
		||||
    // TODO: Curious if this is faster than `match`
 | 
			
		||||
    // Theoretically it should be because we're almost always using
 | 
			
		||||
    // enhanced packet blocks, but don't know if the branch predictor
 | 
			
		||||
    // would catch on as well.
 | 
			
		||||
    alt((
 | 
			
		||||
        enhanced_packet_block,
 | 
			
		||||
        section_header_block,
 | 
			
		||||
        interface_description_block,
 | 
			
		||||
        enhanced_packet_block,
 | 
			
		||||
    ))(input)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user