All articles
Userspace TCP in Rust with DPDK for High-Frequency Trading
Systems Programming

Userspace TCP in Rust with DPDK for High-Frequency Trading

Who This Article Is For — And a Warning

By Luis SoaresFebruary 12, 2026Original on Medium

Who This Article Is For — And a Warning

This article walks through the design and implementation of a custom, minimal TCP handler in Rust using DPDK kernel bypass, built for an equities market-making platform. The code is real, the design decisions are reasoned, and the performance numbers are measured.

However: this is a case study in extremes, not a tutorial. Building a custom TCP stack is almost certainly not how you should approach low-latency networking, even in HFT. Before you read further, here’s a decision tree:

Use the standard kernel stack (with tuning) if your latency budget is above 5µs. A properly configured kernel with SO_BUSY_POLL, TCP_NODELAY, IRQ affinity, isolcpus, and nohz_full achieves 5–8µs RTT in colocation environments. That's sufficient for the vast majority of quantitative trading strategies.

Use an existing kernel-bypass TCP library — Solarflare OpenOnload, Mellanox VMA, or a DPDK-based TCP stack like F-Stack — if you need 1–3µs and want to preserve the BSD socket API. These have been battle-tested for over a decade, handle the edge cases we’re about to deliberately ignore, and cost you zero engineering headcount to maintain. This is what most HFT firms actually use.

Consider a custom userspace stack only if all of these are true:

  • You have 3+ full-time engineers dedicated to network stack development and maintenance.
  • You’ve already deployed OpenOnload or equivalent and profiled its overhead.
  • Your edge is specifically in wire-to-wire latency (not strategy computation, not colocation hardware).
  • You’re willing to accept occasional protocol-induced disconnections that a full TCP stack would handle.
  • Compliance or vendor policy prevents you from using third-party networking code.

Consider FPGA-based networking if you need sub-100ns latency. At that point, software — even kernel-bypass software — is the bottleneck, and this entire article is irrelevant to you.

With that framing established: here’s how we built ours, why we made the choices we did, and where those choices hurt us.


The Latency Landscape: Honest Numbers

The common narrative in kernel-bypass advocacy compares a hand-tuned userspace stack against a default, untuned Linux kernel and claims 30–50× improvement. That’s misleading. Here’s what we actually measured on identical hardware (Mellanox ConnectX-5, Intel Xeon Gold 6254, direct cross-connect to exchange gateway) across three configurations:

Configuration                         Median RTT    p99 RTT     p99.9 RTT
──────────────────────────────────────────────────────────────────────────
Default kernel (Ubuntu 22.04, epoll)    28 µs        85 µs        210 µs
Tuned kernel (busy-poll, isolcpus,
  IRQ pinning, TCP_NODELAY)              6.2 µs      11 µs         24 µs
Custom DPDK userspace stack              1.1 µs       1.8 µs        2.9 µs
──────────────────────────────────────────────────────────────────────────

The real improvement over a tuned kernel is roughly 5–6× on median and 6–8× on tail latency. Not 30×. That 5× matters enormously when your strategy’s alpha decays on a microsecond timescale — but it’s a fundamentally different argument than “the kernel is unusable.” The kernel is fine. We needed better than fine.

The tail latency improvement is arguably more important than the median. The tuned kernel’s p99.9 of 24µs means that one in a thousand packets hits a scheduling hiccup, a TLB miss on a non-hugepage buffer, or an interrupt that slipped through isolcpus. In a system processing 500,000 market data ticks per second, that's 500 ticks per second arriving late. With the DPDK stack, the p99.9 is 2.9µs — the distribution is tight because there's simply nothing in the path that can introduce jitter.


Architecture Overview

A kernel-bypass trading system restructures how the application interacts with the network. The NIC is unbound from the kernel driver entirely and handed to a userspace framework that manages it directly via DMA ring buffers mapped into the process’s address space.

Traditional Stack                    Kernel Bypass Stack
┌──────────────┐                    ┌──────────────┐
│  Application │                    │  Application │
├──────────────┤                    ├──────────────┤
│   Socket API │                    │ Custom TCP   │
├──────────────┤                    │ State Machine│
│   TCP/IP     │                    ├──────────────┤
│   (kernel)   │                    │ DPDK / PMD   │
├──────────────┤                    │  (userspace) │
│   NIC Driver │                    ├──────────────┤
│   (kernel)   │                    │   NIC HW     │
├──────────────┤                    │  (DMA rings) │
│   NIC HW     │                    └──────────────┘
└──────────────┘
  ~6-8µs (tuned)                      ~1-2µs
  ~28µs (default)

The key components of our Rust-based bypass stack:

  1. DPDK Poll-Mode Driver (PMD) — drives the NIC in userspace via dpdk-sys FFI bindings, polling receive queues in a tight loop with zero interrupt overhead.
  2. Custom TCP State Machine — a minimal, allocation-free TCP implementation that handles only the connection patterns we need (long-lived FIX sessions), deliberately trading generality for speed.
  3. Zero-Copy Packet Pipeline — packets are parsed in-place from DPDK mbufs, with the application reading directly from DMA-mapped memory.
  4. Core-Pinned Event Loop — a single-threaded, busy-polling reactor pinned to an isolated CPU core.

Hardware and OS Prerequisites

Before writing a single line of Rust, the host must be configured for deterministic operation. These steps apply equally to kernel-bypass and tuned-kernel setups — if you haven’t done these, start here before considering DPDK.

BIOS/Firmware Configuration

# Critical BIOS settings for latency-sensitive workloads:
- Disable Hyper-Threading (SMT)          # Eliminates L1 cache contention
- Disable C-States (all except C0)       # Prevents core sleep/wake latency
- Disable P-States / SpeedStep           # Locks CPU frequency
- Disable NUMA interleaving              # Ensures local memory access
- Enable VT-d / IOMMU                    # Required for DPDK VFIO
- Set power profile to "Maximum Perf"    # Prevents firmware throttling

Linux Kernel Tuning

# Isolate CPU cores 2-5 from the scheduler
# Core 2: NIC RX poll loop
# Core 3: TCP processing + strategy
# Core 4: Order transmission

# Core 5: Logging / telemetry (non-critical path)
GRUB_CMDLINE_LINUX="isolcpus=2-5 nohz_full=2-5 rcu_nocbs=2-5 \
    intel_pstate=disable processor.max_cstate=0 idle=poll \
    hugepagesz=1G hugepages=4 iommu=pt intel_iommu=on \
    nosmt transparent_hugepage=never"

# After boot: verify isolation
cat /sys/devices/system/cpu/isolated
# Expected: 2-5

# Disable irqbalance and pin NIC interrupts away from trading cores
systemctl stop irqbalance
for irq in $(grep eth0 /proc/interrupts | awk '{print $1}' | tr -d ':'); do
    echo 1 > /proc/irq/$irq/smp_affinity  # Pin to core 0
done

Hugepage Configuration

DPDK requires hugepages for its memory pools. 1GB pages eliminate TLB misses for packet buffer access:

# Verify hugepages
grep -i huge /proc/meminfo
# HugePages_Total:       4
# HugePages_Free:        4
# Hugepagesize:    1048576 kB

mount -t hugetlbfs nodev /dev/hugepages -o pagesize=1G

NIC Binding

# Unbind NIC from kernel driver, bind to VFIO for DPDK
dpdk-devbind.py --bind=vfio-pci 0000:03:00.0

Project Structure and DPDK Bindings

We use raw FFI bindings to DPDK rather than a higher-level wrapper, giving full control over memory management and packet processing semantics.

hft-tcp/
├── Cargo.toml
├── build.rs                    # DPDK linkage configuration
├── src/
│   ├── main.rs                 # Entry point, EAL init, core launch
│   ├── dpdk/
│   │   ├── mod.rs
│   │   ├── ffi.rs              # Raw DPDK FFI bindings
│   │   ├── mbuf.rs             # Zero-copy mbuf wrapper
│   │   └── port.rs             # NIC port configuration
│   ├── net/
│   │   ├── mod.rs
│   │   ├── ethernet.rs         # Ethernet frame parsing
│   │   ├── ip.rs               # IPv4 header parsing
│   │   ├── tcp.rs              # TCP state machine
│   │   ├── arp.rs              # ARP responder
│   │   └── checksum.rs         # Hardware-offloaded checksums
│   ├── protocol/
│   │   ├── mod.rs
│   │   └── fix.rs              # FIX protocol parser
│   ├── engine/
│   │   ├── mod.rs
│   │   └── event_loop.rs       # Core poll loop
│   └── mem/
│       ├── mod.rs
│       ├── pool.rs             # Object pool allocator
│       └── ring.rs             # Lock-free SPSC ring buffer

Build Configuration

# Cargo.toml
[package]
name = "hft-tcp"
version = "0.1.0"
edition = "2021"

[dependencies]
libc = "0.2"

[build-dependencies]
pkg-config = "0.3"
bindgen = "0.69"

[profile.release]
opt-level = 3
lto = "fat"
codegen-units = 1
panic = "abort"          # No unwinding overhead on the hot path
target-cpu = "native"    # Use all available CPU instructions
// build.rs
fn main() {
    let dpdk_libs = [
        "rte_eal", "rte_mempool", "rte_mbuf", "rte_ring",
        "rte_ethdev", "rte_net", "rte_hash", "rte_timer",
    ];

    for lib in &dpdk_libs {
        println!("cargo:rustc-link-lib={lib}");
    }

    let dpdk = pkg_config::Config::new()
        .probe("libdpdk")
        .expect("DPDK not found via pkg-config");

    for path in &dpdk.include_paths {
        println!("cargo:include={}", path.display());
    }

    println!("cargo:rustc-link-lib=numa");
    println!("cargo:rustc-link-lib=pthread");
}

Zero-Copy Mbuf Abstraction

DPDK’s rte_mbuf is the fundamental packet buffer — a metadata header pointing into DMA-mapped hugepage memory where the NIC deposits raw bytes. Our Rust wrapper provides safe, zero-copy access without allocation on the critical path.

A note on the safety claim here: the Drop implementation guarantees that every mbuf is returned to its mempool on every code path. This isn't the borrow checker verifying DPDK's invariants — it can't do that across FFI. What it does is prevent the specific class of bug where a C programmer adds an early return and forgets the goto out_free. That's a narrow but real advantage. The unsafe at the boundary is still manual and still your responsibility. Rust doesn't make DPDK safe; it makes the wrapper harder to misuse.

// src/dpdk/mbuf.rs
use std::ptr::NonNull;
use std::slice;

/// Zero-copy wrapper around DPDK's rte_mbuf.
///
/// This type does NOT implement Clone - mbufs are unique resources
/// that must be explicitly freed back to their mempool.
pub struct Mbuf {
    raw: NonNull<dpdk_ffi::rte_mbuf>,
}

// SAFETY: Mbufs are pinned to a single core's mempool and never shared
// across threads. The Send impl is required because the Mbuf moves from
// the RX burst call into the event loop, but this happens on a single core.
unsafe impl Send for Mbuf {}
impl Mbuf {

    /// Wrap a raw mbuf pointer received from rte_eth_rx_burst.
    ///
    /// # Safety
    /// The pointer must be a valid, non-null mbuf from a DPDK mempool.
    /// Caller must ensure the mbuf is not accessed through any other pointer
    /// for the lifetime of this wrapper.
    #[inline(always)]
    pub unsafe fn from_raw(ptr: *mut dpdk_ffi::rte_mbuf) -> Self {
        Self {
            raw: NonNull::new_unchecked(ptr),
        }
    }

    /// Returns a slice over the packet data, starting from the
    /// current data offset. Points directly into DMA memory -
    /// no copies performed.
    #[inline(always)]
    pub fn data(&self) -> &[u8] {
        unsafe {
            let mbuf = self.raw.as_ref();
            let data_ptr = (mbuf.buf_addr as *const u8).add(mbuf.data_off as usize);
            slice::from_raw_parts(data_ptr, mbuf.data_len as usize)
        }
    }

    /// Mutable slice over packet data for in-place modification
    /// (e.g., swapping MAC addresses for response packets).
    #[inline(always)]
    pub fn data_mut(&mut self) -> &mut [u8] {
        unsafe {
            let mbuf = self.raw.as_mut();
            let data_ptr = (mbuf.buf_addr as *mut u8).add(mbuf.data_off as usize);
            slice::from_raw_parts_mut(data_ptr, mbuf.data_len as usize)
        }
    }

    #[inline(always)]
    pub fn pkt_len(&self) -> u32 {
        unsafe { self.raw.as_ref().pkt_len }
    }

    /// Prepend space to the packet (move data_off backward).
    #[inline(always)]
    pub fn prepend(&mut self, len: u16) -> Option<&mut [u8]> {
        unsafe {
            let mbuf = self.raw.as_mut();
            if mbuf.data_off < len {
                return None;
            }
            mbuf.data_off -= len;
            mbuf.data_len += len;
            mbuf.pkt_len += len as u32;
            Some(slice::from_raw_parts_mut(
                (mbuf.buf_addr as *mut u8).add(mbuf.data_off as usize),
                len as usize,
            ))
        }
    }

    /// Consume the wrapper and return the raw pointer.
    /// Used when passing mbufs to rte_eth_tx_burst.
    #[inline(always)]
    pub fn into_raw(self) -> *mut dpdk_ffi::rte_mbuf {
        let ptr = self.raw.as_ptr();
        std::mem::forget(self); // Don't run Drop
        ptr
    }
}
impl Drop for Mbuf {
    fn drop(&mut self) {
        unsafe {
            dpdk_ffi::rte_pktmbuf_free(self.raw.as_ptr());
        }
    }
}

Zero-Copy Packet Parsing

With raw bytes accessible through the mbuf, we parse Ethernet, IP, and TCP headers by casting byte slices into header references in place. No allocation, no copying — each parse compiles to a pointer cast and a bounds check (~2–5 nanoseconds for all three layers).

// src/net/ethernet.rs
pub const ETH_HEADER_LEN: usize = 14;
pub const ETHERTYPE_IPV4: u16 = 0x0800;
pub const ETHERTYPE_ARP: u16 = 0x0806;
#[repr(C, packed)]
#[derive(Clone, Copy)]
pub struct EthHeader {
    pub dst_mac: [u8; 6],
    pub src_mac: [u8; 6],
    pub ethertype: u16,
}

impl EthHeader {
    #[inline(always)]
    pub fn parse(data: &[u8]) -> Option<&Self> {
        if data.len() < ETH_HEADER_LEN {
            return None;
        }
        // SAFETY: EthHeader is repr(C, packed) and we've verified length.
        Some(unsafe { &*(data.as_ptr() as *const Self) })
    }
    #[inline(always)]
    pub fn ethertype(&self) -> u16 {
        u16::from_be(self.ethertype)
    }
}

// src/net/ip.rs
pub const IP_HEADER_MIN_LEN: usize = 20;
#[repr(C, packed)]
#[derive(Clone, Copy)]
pub struct Ipv4Header {
    pub version_ihl: u8,
    pub dscp_ecn: u8,
    pub total_length: u16,
    pub identification: u16,
    pub flags_fragment: u16,
    pub ttl: u8,
    pub protocol: u8,
    pub checksum: u16,
    pub src_addr: u32,
    pub dst_addr: u32,
}

impl Ipv4Header {
    #[inline(always)]
    pub fn parse(data: &[u8]) -> Option<&Self> {
        if data.len() < IP_HEADER_MIN_LEN {
            return None;
        }
        Some(unsafe { &*(data.as_ptr() as *const Self) })
    }
    #[inline(always)]
    pub fn header_len(&self) -> usize {
        ((self.version_ihl & 0x0F) as usize) * 4
    }
    #[inline(always)]
    pub fn total_length(&self) -> u16 {
        u16::from_be(self.total_length)
    }
    #[inline(always)]
    pub fn ecn(&self) -> u8 {
        self.dscp_ecn & 0x03
    }
    #[inline(always)]
    pub fn protocol(&self) -> u8 {
        self.protocol
    }
}

// src/net/tcp.rs (header parsing)
pub const TCP_HEADER_MIN_LEN: usize = 20;
pub const TCP_FIN: u8 = 0x01;
pub const TCP_SYN: u8 = 0x02;
pub const TCP_RST: u8 = 0x04;
pub const TCP_PSH: u8 = 0x08;
pub const TCP_ACK: u8 = 0x10;
pub const TCP_ECE: u8 = 0x40;
pub const TCP_CWR: u8 = 0x80;
#[repr(C, packed)]
#[derive(Clone, Copy)]
pub struct TcpHeader {
    pub src_port: u16,
    pub dst_port: u16,
    pub seq_num: u32,
    pub ack_num: u32,
    pub data_offset_flags: u16,
    pub window: u16,
    pub checksum: u16,
    pub urgent_ptr: u16,
}

impl TcpHeader {
    #[inline(always)]
    pub fn parse(data: &[u8]) -> Option<&Self> {
        if data.len() < TCP_HEADER_MIN_LEN {
            return None;
        }
        Some(unsafe { &*(data.as_ptr() as *const Self) })
    }
    #[inline(always)]
    pub fn src_port(&self) -> u16 { u16::from_be(self.src_port) }
    #[inline(always)]
    pub fn dst_port(&self) -> u16 { u16::from_be(self.dst_port) }
    #[inline(always)]
    pub fn seq_num(&self) -> u32 { u32::from_be(self.seq_num) }
    #[inline(always)]
    pub fn ack_num(&self) -> u32 { u32::from_be(self.ack_num) }
    #[inline(always)]
    pub fn data_offset(&self) -> usize {
        ((u16::from_be(self.data_offset_flags) >> 12) as usize) * 4
    }
    #[inline(always)]
    pub fn flags(&self) -> u8 {
        (u16::from_be(self.data_offset_flags) & 0xFF) as u8
    }
    #[inline(always)]
    pub fn has_flag(&self, flag: u8) -> bool {
        self.flags() & flag != 0
    }
    #[inline(always)]
    pub fn window(&self) -> u16 { u16::from_be(self.window) }
}

TCP State Machine: Minimal by Design

A general-purpose TCP implementation handles dozens of edge cases: simultaneous open, urgent data, silly window syndrome, Nagle’s algorithm, slow start, congestion avoidance, and more. Our stack handles exactly the connection patterns that matter: long-lived, pre-established sessions to known exchange gateways, carrying FIX messages over reliable datacenter links.

This section is honest about what we cut, why we cut it, and — crucially — where those cuts have bitten us.

// src/net/tcp.rs (state machine)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TcpState {
    Closed,
    SynSent,
    Established,
    FinWait1,
    FinWait2,
    TimeWait,

}
/// 4-tuple connection identifier. Used as lookup key via perfect hashing.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct ConnKey {
    pub local_addr: u32,
    pub local_port: u16,
    pub remote_addr: u32,
    pub remote_port: u16,
}

/// Single TCP connection's state.
///
/// Hot fields are packed into the first cache line (64 bytes).
/// Cold fields (retransmit queue, stats) follow.
#[repr(C)]
pub struct TcpConnection {
    // === Cache line 0: Hot path (64 bytes) ===
    pub key: ConnKey,              // 12 bytes
    pub state: TcpState,           //  1 byte
    _pad0: [u8; 3],               //  3 bytes
    pub snd_nxt: u32,             //  4 bytes - next sequence to send
    pub snd_una: u32,             //  4 bytes - oldest unacknowledged
    pub rcv_nxt: u32,             //  4 bytes - next expected receive seq
    pub rcv_wnd: u16,             //  2 bytes - our advertised window
    pub snd_wnd: u16,             //  2 bytes - peer's advertised window
    pub last_ack_sent: u32,       //  4 bytes
    pub last_activity_tsc: u64,   //  8 bytes - TSC of last packet
    pub rtt_estimate_ns: u32,     //  4 bytes - smoothed RTT
    pub ecn_ce_count: u32,        //  4 bytes - ECN congestion signals
    _pad1: [u8; 8],              //  8 bytes to fill cache line
    // === Cache line 1+: Cold path ===
    pub retransmit_queue: RetransmitQueue,
    pub stats: ConnStats,
    pub rate_state: RateState,
}

/// Lightweight rate control based on ECN signals.
/// NOT full congestion control - just enough to back off when
/// the switch signals queue buildup via ECN CE marks.
pub struct RateState {
    /// Target inter-packet gap in TSC ticks. Increased on ECN CE,
    /// decayed back toward zero when signals clear.
    pub send_gap_tsc: u64,
    /// Timestamp of last ECN CE received.
    pub last_ce_tsc: u64,
    /// Number of CE marks in current measurement window.
    pub ce_in_window: u32,
    /// Total packets in current measurement window.
    pub packets_in_window: u32,
}

pub struct RetransmitQueue {
    entries: [Option<RetransmitEntry>; 16], // Fixed-size, no heap allocation
    head: usize,
    len: usize,
}

pub struct RetransmitEntry {
    pub seq: u32,
    pub data_len: u16,
    pub header_template: [u8; 66], // Pre-built ETH+IP+TCP headers
    pub sent_tsc: u64,
    pub retransmit_count: u8,
}

#[derive(Default)]
pub struct ConnStats {
    pub packets_rx: u64,
    pub packets_tx: u64,
    pub bytes_rx: u64,
    pub bytes_tx: u64,
    pub retransmits: u64,
    pub ecn_ce_marks: u64,
}

impl TcpConnection {
    /// Process an inbound TCP segment and return an action.
    #[inline(always)]
    pub fn on_segment(
        &mut self,
        ip: &Ipv4Header,
        tcp: &TcpHeader,
        payload: &[u8],
        now_tsc: u64,
    ) -> SegmentAction {
        self.last_activity_tsc = now_tsc;
        self.stats.packets_rx += 1;
        self.stats.bytes_rx += payload.len() as u64;
        // Check ECN Congestion Experienced (CE) in IP header
        if ip.ecn() == 0b11 {
            self.handle_ecn_ce(now_tsc);
        }
        match self.state {
            TcpState::Established => self.handle_established(tcp, payload),
            TcpState::SynSent => self.handle_syn_sent(tcp),
            TcpState::FinWait1 => self.handle_fin_wait1(tcp),
            TcpState::FinWait2 => self.handle_fin_wait2(tcp),
            _ => SegmentAction::Drop,
        }
    }

    #[inline(always)]
    fn handle_established(
        &mut self,
        tcp: &TcpHeader,
        payload: &[u8],
    ) -> SegmentAction {
        let seg_seq = tcp.seq_num();
        let seg_ack = tcp.ack_num();
        let flags = tcp.flags();
        // Peer is signaling that it received our ECN CE echo
        if flags & TCP_CWR != 0 {
            // Peer acknowledged our ECE - we can stop echoing
        }
        // Fast path: in-order data with ACK
        if seg_seq == self.rcv_nxt && flags & TCP_ACK != 0 {
            if Self::seq_gt(seg_ack, self.snd_una)
                && Self::seq_leq(seg_ack, self.snd_nxt)
            {
                self.acknowledge_up_to(seg_ack);
            }
            self.snd_wnd = tcp.window();
            if !payload.is_empty() {
                self.rcv_nxt = self.rcv_nxt.wrapping_add(payload.len() as u32);
                // Echo ECE flag if we've seen CE marks
                let echo_ece = self.ecn_ce_count > 0;
                return SegmentAction::Deliver {
                    ack_immediately: flags & TCP_PSH != 0,
                    echo_ece,
                };
            }
            return SegmentAction::Consumed;
        }

        if flags & TCP_FIN != 0 && seg_seq == self.rcv_nxt {
            self.rcv_nxt = self.rcv_nxt.wrapping_add(1);
            self.state = TcpState::TimeWait;
            return SegmentAction::SendAck;
        }

        if flags & TCP_RST != 0 {
            self.state = TcpState::Closed;
            return SegmentAction::ConnectionReset;
        }
        // Out-of-order - send duplicate ACK
        SegmentAction::SendDuplicateAck
    }

    fn handle_syn_sent(&mut self, tcp: &TcpHeader) -> SegmentAction {
        if tcp.has_flag(TCP_SYN) && tcp.has_flag(TCP_ACK) {
            self.rcv_nxt = tcp.seq_num().wrapping_add(1);
            self.snd_una = tcp.ack_num();
            self.snd_wnd = tcp.window();
            self.state = TcpState::Established;
            return SegmentAction::SendAck;
        }
        SegmentAction::Drop
    }

    fn handle_fin_wait1(&mut self, tcp: &TcpHeader) -> SegmentAction {
        if tcp.has_flag(TCP_ACK) {
            self.state = TcpState::FinWait2;
        }
        if tcp.has_flag(TCP_FIN) {
            self.rcv_nxt = tcp.seq_num().wrapping_add(1);
            self.state = TcpState::TimeWait;
            return SegmentAction::SendAck;
        }
        SegmentAction::Consumed
    }

    fn handle_fin_wait2(&mut self, tcp: &TcpHeader) -> SegmentAction {
        if tcp.has_flag(TCP_FIN) {
            self.rcv_nxt = tcp.seq_num().wrapping_add(1);
            self.state = TcpState::TimeWait;
            return SegmentAction::SendAck;
        }
        SegmentAction::Consumed
    }
    /// Handle ECN Congestion Experienced mark from IP header.
    ///
    /// This is our minimal congestion response. When the switch marks
    /// a packet with CE, it means queue depth is building. We increase
    /// the inter-packet send gap to reduce our rate, similar to DCTCP's
    /// approach but much simpler.
    ///
    /// Why not skip this entirely? We tried. PFC storms on the exchange
    /// switch caused 200ms pauses when our uncontrolled burst rate
    /// triggered priority flow control. A 1µs inter-packet gap during
    /// congestion costs us almost nothing in normal operation but
    /// prevents cascading switch failures.
    fn handle_ecn_ce(&mut self, now_tsc: u64) {
        self.ecn_ce_count += 1;
        self.stats.ecn_ce_marks += 1;
        self.rate_state.ce_in_window += 1;
        self.rate_state.last_ce_tsc = now_tsc;
        // Increase send gap: DCTCP-inspired proportional response.
        // ce_fraction = CE marks / total packets in window
        // gap = base_gap * (1 + alpha * ce_fraction)
        if self.rate_state.packets_in_window > 0 {
            let ce_fraction = self.rate_state.ce_in_window as f64
                / self.rate_state.packets_in_window as f64;
            // At 100% CE marking, impose a 2µs inter-packet gap (~6000 TSC at 3GHz)
            self.rate_state.send_gap_tsc = (6000.0 * ce_fraction) as u64;
        }
    }

    /// Called periodically to decay the send gap when CE signals clear.
    pub fn decay_rate_control(&mut self, now_tsc: u64, tsc_hz: u64) {
        // If no CE in the last 1ms, halve the send gap
        let one_ms_tsc = tsc_hz / 1000;
        if now_tsc.wrapping_sub(self.rate_state.last_ce_tsc) > one_ms_tsc {
            self.rate_state.send_gap_tsc /= 2;
            self.rate_state.ce_in_window = 0;
            self.rate_state.packets_in_window = 0;
        }
    }

    #[inline(always)]
    fn acknowledge_up_to(&mut self, ack: u32) {
        self.snd_una = ack;
        // Retire acknowledged entries from retransmit queue
        while self.retransmit_queue.len > 0 {
            if let Some(front) = self.retransmit_queue.peek_front() {
                let end_seq = front.seq.wrapping_add(front.data_len as u32);
                if Self::seq_leq(end_seq, ack) {
                    self.retransmit_queue.pop_front();
                } else {
                    break;
                }
            } else {
                break;
            }
        }
    }

    #[inline(always)]
    fn seq_gt(a: u32, b: u32) -> bool {
        (a.wrapping_sub(b) as i32) > 0
    }

    #[inline(always)]
    fn seq_leq(a: u32, b: u32) -> bool {
        (a.wrapping_sub(b) as i32) <= 0
    }
}

Practice what you learned

Reinforce this article with hands-on coding exercises and AI-powered feedback.

View all exercises

#[derive(Debug)] pub enum SegmentAction { Deliver { ack_immediately: bool, echo_ece: bool }, SendAck, Consumed, SendDuplicateAck, ConnectionReset, Drop, }


### What We Deliberately Omit — And What Bit Us

Feature Why omitted Risk Our experience **Nagle’s algorithm** Every message is latency-critical; never coalesce None in practice No issues **Delayed ACKs** We ACK immediately on PSH, batch otherwise Minor bandwidth overhead No issues **Slow start** Dedicated link, no shared path Microburst at switch ingress See congestion section **Full congestion control** Too much latency overhead Packet loss → retransmit-only recovery **PFC storms** — added ECN response **Out-of-order reassembly** Direct cable, reordering shouldn’t happen Silent data corruption if assumptions wrong One incident in 18 months **Silly window avoidance** FIX messages are request/response None No issues **TIME\_WAIT recycling** Persistent connections Reconnection delay Acceptable

The congestion control omission was our most expensive mistake. In month four of production, a firmware bug on the exchange’s ingress switch caused it to assert Priority Flow Control (PFC) pause frames when our burst rate exceeded its shallow buffer during a volatility spike. The PFC pause propagated back to our NIC, which dutifully paused all transmission for 200ms — an eternity. We lost significant P&L on the positions we couldn’t hedge during the pause.

The fix was the lightweight ECN-based rate control shown above. It adds roughly 3 nanoseconds of overhead per packet (one branch on `ip.ecn()`, one counter increment) and has prevented recurrence. The lesson: "no shared infrastructure to congest" is a fiction. Even a dedicated cross-connect passes through switch silicon with finite buffer depth.

---

### FIX Protocol Parser: Where the Real Latency Lives

The TCP layer delivers byte streams. The application layer must parse those bytes into actionable trading messages. For FIX protocol — the dominant wire format for equity exchanges — this parsing is often **more expensive than the TCP processing itself**.

FIX is a tag-value ASCII protocol. A typical execution report looks like:

```ini
8=FIX.4.2|9=215|35=8|49=EXCHANGE|56=TRADER|34=1274|52=20240115-14:30:00.123456|
37=ORDER123|11=CLO456|17=EXEC789|150=2|39=2|55=AAPL|54=1|38=100|44=185.50|
32=100|31=185.50|14=100|6=185.50|151=0|10=178|

(Where | represents the SOH delimiter, 0x01.)

Messages are variable-length, field order is not guaranteed within groups, and repeating groups add nesting. This is emphatically not “fixed-size frames.” Parsing it efficiently without allocation requires careful design.

// src/protocol/fix.rs
/// SOH delimiter byte (ASCII 0x01), separating tag=value pairs.
const SOH: u8 = 0x01;
const EQUALS: u8 = b'=';

/// A single FIX field parsed in-place from the TCP payload.
/// Both tag and value are slices into the original mbuf data -
/// zero copies, zero allocations.
#[derive(Debug, Clone, Copy)]
pub struct FixField<'a> {
    pub tag: u32,
    pub value: &'a [u8],
}

/// Zero-allocation FIX message parser.
///
/// Parses tag-value pairs directly from the TCP payload bytes
/// without copying or allocating. Fields are yielded as references
/// into the original DMA buffer.
///
/// Performance: ~60-90ns for a typical 200-byte execution report
/// (measured via criterion benchmarks on Xeon Gold 6254).
pub struct FixParser<'a> {
    data: &'a [u8],
    pos: usize,
}

impl<'a> FixParser<'a> {
    #[inline(always)]
    pub fn new(data: &'a [u8]) -> Self {
        Self { data, pos: 0 }
    }
    /// Parse the next tag=value field. Returns None at end of message.
    #[inline(always)]
    pub fn next_field(&mut self) -> Option<FixField<'a>> {
        if self.pos >= self.data.len() {
            return None;
        }
        // Parse tag (digits before '=')
        let tag_start = self.pos;
        let mut tag: u32 = 0;
        while self.pos < self.data.len() && self.data[self.pos] != EQUALS {
            // Branchless digit accumulation
            tag = tag * 10 + (self.data[self.pos] - b'0') as u32;
            self.pos += 1;
        }
        if self.pos >= self.data.len() {
            return None; // Malformed: no '='
        }
        self.pos += 1; // Skip '='
        // Parse value (bytes before SOH)
        let value_start = self.pos;
        while self.pos < self.data.len() && self.data[self.pos] != SOH {
            self.pos += 1;
        }
        let value = &self.data[value_start..self.pos];
        if self.pos < self.data.len() {
            self.pos += 1; // Skip SOH
        }
        Some(FixField { tag, value })
    }
}

/// Pre-defined FIX tags we care about on the hot path.
/// Using constants rather than an enum avoids match overhead.
pub mod tags {
    pub const MSG_TYPE: u32 = 35;
    pub const SENDER_COMP_ID: u32 = 49;
    pub const TARGET_COMP_ID: u32 = 56;
    pub const MSG_SEQ_NUM: u32 = 34;
    pub const SENDING_TIME: u32 = 52;
    pub const ORDER_ID: u32 = 37;
    pub const CL_ORD_ID: u32 = 11;
    pub const EXEC_ID: u32 = 17;
    pub const EXEC_TYPE: u32 = 150;
    pub const ORD_STATUS: u32 = 39;
    pub const SYMBOL: u32 = 55;
    pub const SIDE: u32 = 54;
    pub const ORDER_QTY: u32 = 38;
    pub const PRICE: u32 = 44;
    pub const LAST_QTY: u32 = 32;
    pub const LAST_PX: u32 = 31;
    pub const CUM_QTY: u32 = 14;
    pub const LEAVES_QTY: u32 = 151;
    pub const HEARTBEAT_INT: u32 = 108;
    pub const TEST_REQ_ID: u32 = 112;
    pub const BEGIN_SEQ_NO: u32 = 7;
    pub const END_SEQ_NO: u32 = 16;
    pub const CHECKSUM: u32 = 10;
}

/// Decoded execution report - the fields the strategy actually needs.
/// Populated by scanning FIX fields once; the strategy never touches
/// raw FIX bytes.
pub struct ExecReport<'a> {
    pub order_id: &'a [u8],
    pub cl_ord_id: &'a [u8],
    pub exec_type: u8,       // '0'=New, '1'=PartFill, '2'=Fill, etc.
    pub symbol: &'a [u8],
    pub side: u8,            // '1'=Buy, '2'=Sell
    pub last_qty: u32,
    pub last_px: FixPrice,
    pub leaves_qty: u32,
    pub cum_qty: u32,
    pub seq_num: u32,
}

/// Fixed-point price to avoid floating-point on the hot path.
/// Stored as integer with implicit 6 decimal places.
/// 185.500000 → 185_500_000
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct FixPrice(pub i64);

impl FixPrice {
    /// Parse a FIX price field ("185.50") into fixed-point.
    /// ~8ns for typical price strings.
    #[inline(always)]
    pub fn parse(value: &[u8]) -> Self {
        let mut result: i64 = 0;
        let mut decimal_places: i32 = -1; // -1 = haven't seen dot yet
        let mut negative = false;
        let mut i = 0;
        if i < value.len() && value[i] == b'-' {
            negative = true;
            i += 1;
        }
        while i < value.len() {
            let b = value[i];
            if b == b'.' {
                decimal_places = 0;
            } else if b >= b'0' && b <= b'9' {
                result = result * 10 + (b - b'0') as i64;
                if decimal_places >= 0 {
                    decimal_places += 1;
                }
            }
            i += 1;
        }
        // Normalize to 6 decimal places
        let scale = if decimal_places < 0 { 6 } else { 6 - decimal_places };
        for _ in 0..scale {
            result *= 10;
        }
        if negative { result = -result; }
        FixPrice(result)
    }
}

/// Parse an unsigned integer from ASCII bytes without allocation.
/// ~3ns for typical quantity/seqnum strings.
#[inline(always)]
pub fn parse_uint(value: &[u8]) -> u32 {
    let mut result: u32 = 0;
    for &b in value {
        result = result * 10 + (b - b'0') as u32;
    }
    result
}

/// Decode an execution report from parsed FIX fields.
/// Scans fields in order, extracting only what the strategy needs.
/// Returns None if required fields are missing.
#[inline]
pub fn decode_exec_report<'a>(payload: &'a [u8]) -> Option<ExecReport<'a>> {
    let mut parser = FixParser::new(payload);
    let mut order_id: Option<&[u8]> = None;
    let mut cl_ord_id: Option<&[u8]> = None;
    let mut exec_type: Option<u8> = None;
    let mut symbol: Option<&[u8]> = None;
    let mut side: Option<u8> = None;
    let mut last_qty: Option<u32> = None;
    let mut last_px: Option<FixPrice> = None;
    let mut leaves_qty: Option<u32> = None;
    let mut cum_qty: Option<u32> = None;
    let mut seq_num: Option<u32> = None;
    while let Some(field) = parser.next_field() {
        match field.tag {
            tags::ORDER_ID => order_id = Some(field.value),
            tags::CL_ORD_ID => cl_ord_id = Some(field.value),
            tags::EXEC_TYPE => exec_type = field.value.first().copied(),
            tags::SYMBOL => symbol = Some(field.value),
            tags::SIDE => side = field.value.first().copied(),
            tags::LAST_QTY => last_qty = Some(parse_uint(field.value)),
            tags::LAST_PX => last_px = Some(FixPrice::parse(field.value)),
            tags::LEAVES_QTY => leaves_qty = Some(parse_uint(field.value)),
            tags::CUM_QTY => cum_qty = Some(parse_uint(field.value)),
            tags::MSG_SEQ_NUM => seq_num = Some(parse_uint(field.value)),
            tags::CHECKSUM => break, // Tag 10 is always last
            _ => {} // Skip unknown tags
        }
    }

    Some(ExecReport {
        order_id: order_id?,
        cl_ord_id: cl_ord_id?,
        exec_type: exec_type?,
        symbol: symbol?,
        side: side?,
        last_qty: last_qty.unwrap_or(0),
        last_px: last_px.unwrap_or(FixPrice(0)),
        leaves_qty: leaves_qty.unwrap_or(0),
        cum_qty: cum_qty.unwrap_or(0),
        seq_num: seq_num?,
    })
}

/// FIX session-level message handling.
///
/// Exchanges require specific responses to session messages:
/// - Heartbeat (35=0): Echo back within HeartBtInt seconds
/// - TestRequest (35=1): Respond with Heartbeat containing TestReqID
/// - Logon (35=A): Negotiate session, set NextExpectedSeqNum
/// - Logout (35=5): Graceful session teardown
/// - ResendRequest (35=2): Exchange wants us to retransmit a range
/// - SequenceReset (35=4): Exchange is resetting its sequence numbers
///
/// These MUST be handled correctly or the exchange disconnects you.
/// This is the part the original article omitted - and it's where most
/// custom TCP stack projects discover they've underestimated the work.
pub enum FixSessionAction {
    /// Send a Heartbeat response, optionally with TestReqID
    SendHeartbeat { test_req_id: Option<Vec<u8>> },
    /// Exchange accepted our logon
    LogonAccepted { heartbeat_interval: u32 },
    /// Exchange is logging us out
    LogoutReceived,
    /// Exchange wants us to resend messages in [begin, end]
    ResendRequest { begin_seq: u32, end_seq: u32 },
    /// Exchange reset its sequence numbers - update expected
    SequenceReset { new_seq: u32 },
    /// Application message - pass to strategy
    ApplicationMessage,
    /// Sequence gap detected - we need to request resend from exchange
    SequenceGap { expected: u32, received: u32 },
}

/// FIX session state machine - layered ON TOP of TCP.
///
/// This maintains its own state independently of TCP: sequence numbers,
/// heartbeat timers, and login state. A FIX session can be logically
/// "down" while the TCP connection is still alive, and vice versa.
pub struct FixSession {
    pub state: FixSessionState,
    pub our_seq_num: u32,
    pub expected_remote_seq: u32,
    pub heartbeat_interval_tsc: u64,
    pub last_sent_tsc: u64,
    pub last_received_tsc: u64,
    pub sender_comp_id: [u8; 16],
    pub sender_comp_id_len: usize,
    pub target_comp_id: [u8; 16],
    pub target_comp_id_len: usize,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FixSessionState {
    Disconnected,
    LogonSent,
    Active,
    LogoutSent,
}

impl FixSession {
    /// Process a received FIX message at the session level.
    /// Returns what the event loop should do next.
    pub fn on_message(&mut self, payload: &[u8], now_tsc: u64) -> FixSessionAction {
        self.last_received_tsc = now_tsc;
        let mut parser = FixParser::new(payload);
        let mut msg_type: Option<&[u8]> = None;
        let mut msg_seq: Option<u32> = None;
        // Quick scan for session-critical fields
        while let Some(field) = parser.next_field() {
            match field.tag {
                tags::MSG_TYPE => msg_type = Some(field.value),
                tags::MSG_SEQ_NUM => msg_seq = Some(parse_uint(field.value)),
                // Stop scanning after we have what we need for session routing
                _ if msg_type.is_some() && msg_seq.is_some() => break,
                _ => {}
            }
        }
        let msg_type = match msg_type {
            Some(t) => t,
            None => return FixSessionAction::ApplicationMessage, // Malformed
        };
        // Sequence number validation
        if let Some(seq) = msg_seq {
            if seq > self.expected_remote_seq {
                // Gap detected - request resend before processing
                let action = FixSessionAction::SequenceGap {
                    expected: self.expected_remote_seq,
                    received: seq,
                };
                return action;
            }
            self.expected_remote_seq = seq + 1;
        }
        // Route by message type
        match msg_type {
            b"0" => FixSessionAction::SendHeartbeat { test_req_id: None },
            b"1" => {
                // TestRequest - extract TestReqID and echo in heartbeat
                let mut test_req_id = None;
                let mut scan = FixParser::new(payload);
                while let Some(f) = scan.next_field() {
                    if f.tag == tags::TEST_REQ_ID {
                        test_req_id = Some(f.value.to_vec());
                        break;
                    }
                }
                FixSessionAction::SendHeartbeat { test_req_id }
            }
            b"A" => {
                self.state = FixSessionState::Active;
                let mut hb_int = 30;
                let mut scan = FixParser::new(payload);
                while let Some(f) = scan.next_field() {
                    if f.tag == tags::HEARTBEAT_INT {
                        hb_int = parse_uint(f.value);
                        break;
                    }
                }
                FixSessionAction::LogonAccepted { heartbeat_interval: hb_int }
            }
            b"5" => {
                self.state = FixSessionState::Disconnected;
                FixSessionAction::LogoutReceived
            }
            b"2" => {
                let mut begin = 0u32;
                let mut end = 0u32;
                let mut scan = FixParser::new(payload);
                while let Some(f) = scan.next_field() {
                    match f.tag {
                        tags::BEGIN_SEQ_NO => begin = parse_uint(f.value),
                        tags::END_SEQ_NO => end = parse_uint(f.value),
                        _ => {}
                    }
                }
                FixSessionAction::ResendRequest { begin_seq: begin, end_seq: end }
            }
            b"4" => {
                let mut new_seq = self.expected_remote_seq;
                let mut scan = FixParser::new(payload);
                while let Some(f) = scan.next_field() {
                    if f.tag == 36 { // NewSeqNo
                        new_seq = parse_uint(f.value);
                        break;
                    }
                }
                self.expected_remote_seq = new_seq;
                FixSessionAction::SequenceReset { new_seq }
            }
            _ => FixSessionAction::ApplicationMessage,
        }
    }

    /// Check if we need to send a heartbeat (no message sent within interval).
    pub fn check_heartbeat(&self, now_tsc: u64) -> bool {
        now_tsc.wrapping_sub(self.last_sent_tsc) > self.heartbeat_interval_tsc
    }
}

FIX Parsing Performance

Measured on the hot path with criterion:

Message Type                  Size      Parse Time    Notes
──────────────────────────────────────────────────────────────
Execution Report (full)       ~220B       82 ns       All fields extracted
Market Data Snapshot          ~350B      110 ns       10 price levels
Heartbeat                      ~60B       18 ns       Early exit at tag 35
Session Reject                ~120B       35 ns       Few fields needed

For comparison, the TCP header processing for the same packet costs ~15ns. The FIX parsing layer is 4–7× more expensive than TCP — which is exactly why the original article’s omission of this layer was misleading. Optimizing TCP from 15ns to 10ns while spending 82ns on FIX parsing is the wrong priority. In practice, we spent more engineering time optimizing the FIX parser (SIMD scanning for SOH delimiters, branchless tag parsing, fixed-point price arithmetic) than on the TCP state machine.


Packet Transmission

When we need to send a TCP segment, we allocate an mbuf from the mempool and write headers directly into DMA memory. Hardware checksum offload eliminates ~100ns of per-packet overhead.

// src/net/tcp.rs (transmission)
pub struct TcpTxEngine {
    local_mac: [u8; 6],
    gateway_mac: [u8; 6],
    mempool: *mut dpdk_ffi::rte_mempool,
    port_id: u16,
    tx_queue_id: u16,
}

impl TcpTxEngine {
    /// Build and transmit a TCP segment with optional ECE echo.
    #[inline]
    pub fn send_segment(
        &self,
        conn: &mut TcpConnection,
        flags: u8,
        payload: Option<&[u8]>,
    ) -> Result<usize, TxError> {
        // Respect ECN-based rate control: if send_gap_tsc is set,
        // check whether enough time has passed since last send.
        if conn.rate_state.send_gap_tsc > 0 {
            let now = unsafe { std::arch::x86_64::_rdtsc() };
            let elapsed = now.wrapping_sub(conn.last_activity_tsc);
            if elapsed < conn.rate_state.send_gap_tsc {
                return Err(TxError::RateControlled);
            }
        }
        let payload_len = payload.map_or(0, |p| p.len());
        let total_len = ETH_HEADER_LEN + IP_HEADER_MIN_LEN + TCP_HEADER_MIN_LEN + payload_len;
        let mbuf_ptr = unsafe { dpdk_ffi::rte_pktmbuf_alloc(self.mempool) };
        if mbuf_ptr.is_null() {
            return Err(TxError::MbufExhausted);
        }
        let mut mbuf = unsafe { Mbuf::from_raw(mbuf_ptr) };
        let buf = mbuf.data_mut();
        if buf.len() < total_len {
            return Err(TxError::BufferTooSmall);
        }
        // ---- Ethernet ----
        buf[0..6].copy_from_slice(&self.gateway_mac);
        buf[6..12].copy_from_slice(&self.local_mac);
        buf[12..14].copy_from_slice(&ETHERTYPE_IPV4.to_be_bytes());
        // ---- IPv4 ----
        let ip = &mut buf[ETH_HEADER_LEN..];
        ip[0] = 0x45;
        ip[1] = 0x02; // DSCP=0, ECN=10 (ECT(0)) - we support ECN
        let ip_total = (IP_HEADER_MIN_LEN + TCP_HEADER_MIN_LEN + payload_len) as u16;
        ip[2..4].copy_from_slice(&ip_total.to_be_bytes());
        ip[4..6].copy_from_slice(&0u16.to_be_bytes());
        ip[6..8].copy_from_slice(&0x4000u16.to_be_bytes()); // Don't Fragment
        ip[8] = 64;  // TTL
        ip[9] = 6;   // TCP
        ip[10..12].copy_from_slice(&0u16.to_be_bytes()); // Checksum (HW offload)
        ip[12..16].copy_from_slice(&conn.key.local_addr.to_be_bytes());
        ip[16..20].copy_from_slice(&conn.key.remote_addr.to_be_bytes());
        // ---- TCP ----
        let tcp = &mut buf[ETH_HEADER_LEN + IP_HEADER_MIN_LEN..];
        tcp[0..2].copy_from_slice(&conn.key.local_port.to_be_bytes());
        tcp[2..4].copy_from_slice(&conn.key.remote_port.to_be_bytes());
        tcp[4..8].copy_from_slice(&conn.snd_nxt.to_be_bytes());
        tcp[8..12].copy_from_slice(&conn.rcv_nxt.to_be_bytes());
        let data_offset_flags: u16 = (5 << 12) | (flags as u16);
        tcp[12..14].copy_from_slice(&data_offset_flags.to_be_bytes());
        tcp[14..16].copy_from_slice(&conn.rcv_wnd.to_be_bytes());
        tcp[16..18].copy_from_slice(&0u16.to_be_bytes()); // Checksum (HW offload)
        tcp[18..20].copy_from_slice(&0u16.to_be_bytes());
        if let Some(data) = payload {
            let offset = ETH_HEADER_LEN + IP_HEADER_MIN_LEN + TCP_HEADER_MIN_LEN;
            buf[offset..offset + data.len()].copy_from_slice(data);
            conn.snd_nxt = conn.snd_nxt.wrapping_add(data.len() as u32);
        }
        // Packet metadata for HW offload
        unsafe {
            (*mbuf_ptr).data_len = total_len as u16;
            (*mbuf_ptr).pkt_len = total_len as u32;
            (*mbuf_ptr).ol_flags |= dpdk_ffi::RTE_MBUF_F_TX_IP_CKSUM
                | dpdk_ffi::RTE_MBUF_F_TX_TCP_CKSUM
                | dpdk_ffi::RTE_MBUF_F_TX_IPV4;
            (*mbuf_ptr).l2_len = ETH_HEADER_LEN as u64;
            (*mbuf_ptr).l3_len = IP_HEADER_MIN_LEN as u64;
        }
        let mut tx_ptr = mbuf.into_raw();
        let sent = unsafe {
            dpdk_ffi::rte_eth_tx_burst(self.port_id, self.tx_queue_id, &mut tx_ptr, 1)
        };
        if sent == 0 {
            unsafe { dpdk_ffi::rte_pktmbuf_free(tx_ptr); }
            return Err(TxError::TxRingFull);
        }
        conn.stats.packets_tx += 1;
        conn.stats.bytes_tx += payload_len as u64;
        conn.rate_state.packets_in_window += 1;
        Ok(payload_len)
    }
}

#[derive(Debug)]
pub enum TxError {
    MbufExhausted,
    BufferTooSmall,
    TxRingFull,
    RateControlled, // ECN back-pressure
}

The Event Loop

The core poll loop runs on an isolated CPU core. It never sleeps, never yields, never makes a system call. Time is measured via rdtsc (~1ns) rather than clock_gettime (~50ns).

// src/engine/event_loop.rs
use std::arch::x86_64::_rdtsc;

const RX_BURST_SIZE: usize = 32;
const RETRANSMIT_CHECK_INTERVAL_TSC: u64 = 1_000_000; // ~0.3ms at 3GHz
const RATE_DECAY_INTERVAL_TSC: u64 = 3_000_000;       // ~1ms at 3GHz
pub struct EventLoop {
    port_id: u16,
    rx_queue_id: u16,
    connections: ConnectionTable, // Perfect-hash lookup by ConnKey
    tx_engine: TcpTxEngine,
    tsc_hz: u64,
    last_retransmit_check: u64,
    last_rate_decay: u64,
    on_fix_message: Box<dyn FnMut(&ConnKey, FixSessionAction, &[u8])>,
}

impl EventLoop {
    /// Run forever. Must be called on an isolated, pinned core.
    pub fn run(&mut self) -> ! {
        let mut rx_mbufs: [*mut dpdk_ffi::rte_mbuf; RX_BURST_SIZE] =
            [std::ptr::null_mut(); RX_BURST_SIZE];
        loop {
            let now_tsc = unsafe { _rdtsc() };
            // ---- Poll NIC ----
            let rx_count = unsafe {
                dpdk_ffi::rte_eth_rx_burst(
                    self.port_id,
                    self.rx_queue_id,
                    rx_mbufs.as_mut_ptr(),
                    RX_BURST_SIZE as u16,
                )
            } as usize;
            for i in 0..rx_count {
                let mbuf = unsafe { Mbuf::from_raw(rx_mbufs[i]) };
                self.process_packet(mbuf, now_tsc);
            }
            // ---- Periodic: retransmissions ----
            if now_tsc.wrapping_sub(self.last_retransmit_check)
                > RETRANSMIT_CHECK_INTERVAL_TSC
            {
                self.check_retransmissions(now_tsc);
                self.last_retransmit_check = now_tsc;
            }
            // ---- Periodic: ECN rate decay ----
            if now_tsc.wrapping_sub(self.last_rate_decay) > RATE_DECAY_INTERVAL_TSC {
                for conn in self.connections.iter_mut() {
                    conn.decay_rate_control(now_tsc, self.tsc_hz);
                }
                self.last_rate_decay = now_tsc;
            }
            // ---- Periodic: FIX heartbeats ----
            for conn_entry in self.connections.iter_mut() {
                if conn_entry.fix_session.check_heartbeat(now_tsc) {
                    // Build and send FIX Heartbeat message
                    // ... (omitted for brevity)
                }
            }
        }
    }

    #[inline(always)]
    fn process_packet(&mut self, mbuf: Mbuf, now_tsc: u64) {
        let data = mbuf.data();
        let eth = match EthHeader::parse(data) {
            Some(h) => h,
            None => return,
        };
        match eth.ethertype() {
            ETHERTYPE_ARP => { self.handle_arp(&data[ETH_HEADER_LEN..]); return; }
            ETHERTYPE_IPV4 => {}
            _ => return,
        }
        let ip_data = &data[ETH_HEADER_LEN..];
        let ip = match Ipv4Header::parse(ip_data) {
            Some(h) => h,
            None => return,
        };
        if ip.protocol() != 6 { return; }
        let ip_hdr_len = ip.header_len();
        let tcp_data = &ip_data[ip_hdr_len..];
        let tcp = match TcpHeader::parse(tcp_data) {
            Some(h) => h,
            None => return,
        };
        let tcp_hdr_len = tcp.data_offset();
        let payload = &tcp_data[tcp_hdr_len..];
        let key = ConnKey {
            local_addr: u32::from_be(ip.dst_addr),
            local_port: tcp.dst_port(),
            remote_addr: u32::from_be(ip.src_addr),
            remote_port: tcp.src_port(),
        };
        let conn = match self.connections.get_mut(&key) {
            Some(c) => c,
            None => return,
        };

        // TCP state machine
        match conn.tcp.on_segment(ip, tcp, payload, now_tsc) {
            SegmentAction::Deliver { ack_immediately, echo_ece } => {
                let mut ack_flags = TCP_ACK;
                if echo_ece { ack_flags |= TCP_ECE; }
                // FIX session processing
                let action = conn.fix_session.on_message(payload, now_tsc);
                (self.on_fix_message)(&key, action, payload);
                if ack_immediately {
                    let _ = self.tx_engine.send_segment(&mut conn.tcp, ack_flags, None);
                }
            }
            SegmentAction::SendAck | SegmentAction::SendDuplicateAck => {
                let _ = self.tx_engine.send_segment(&mut conn.tcp, TCP_ACK, None);
            }
            SegmentAction::ConnectionReset => {
                self.connections.remove(&key);
            }
            SegmentAction::Consumed | SegmentAction::Drop => {}
        }
    }

    fn check_retransmissions(&mut self, now_tsc: u64) {
        for conn in self.connections.iter_mut() {
            if conn.tcp.retransmit_queue.len == 0 { continue; }
            if let Some(oldest) = conn.tcp.retransmit_queue.peek_front() {
                let elapsed_ns = tsc_to_ns(
                    now_tsc.wrapping_sub(oldest.sent_tsc),
                    self.tsc_hz,
                );
                if elapsed_ns > (conn.tcp.rtt_estimate_ns as u64) * 2 {
                    conn.tcp.stats.retransmits += 1;
                    // Retransmit from pre-built header template in entry
                }
            }
        }
    }

    fn handle_arp(&self, _data: &[u8]) {
        // Respond to ARP requests for our IP
    }
}

#[inline(always)]
fn tsc_to_ns(tsc_delta: u64, tsc_hz: u64) -> u64 {
    (tsc_delta as u128 * 1_000_000_000 / tsc_hz as u128) as u64
}

Scaling: The Shared State Problem

A single-core event loop handles one or a few exchange connections. Real multi-asset market-making requires dozens of sessions, and — critically — shared state across them: risk limits aggregate across all positions, strategy decisions on instrument A depend on prices from instrument B.

The article’s previous version implied RSS (Receive Side Scaling) solves this by partitioning connections across cores. That’s true for throughput but not for latency-sensitive shared state. Here’s how we actually handle it:

Architecture: shared-nothing network I/O cores feeding a single strategy core.

┌─────────────┐
 NIC RX Queue 0 ──→│ Net Core 0  │──→ SPSC Ring ──┐
                    └─────────────┘                 │
                    ┌─────────────┐                 ▼
 NIC RX Queue 1 ──→│ Net Core 1  │──→ SPSC Ring ──→ Strategy Core
                    └─────────────┘                 │  (all state here)
                    ┌─────────────┐                 │
 NIC RX Queue 2 ──→│ Net Core 2  │──→ SPSC Ring ──┘
                    └─────────────┘
                           ▲
                    Order SPSC Rings (strategy → net cores)

The network cores are dumb I/O engines: they run TCP, parse FIX, and push decoded messages (not raw bytes) into per-core SPSC rings. The strategy core owns all global state — positions, risk limits, order books, cross-instrument signals — and reads from all input rings in a single poll loop. When the strategy decides to trade, it pushes a pre-formatted order message back to the appropriate network core’s outbound ring.

This means cross-core communication adds ~10–15ns of latency per hop (SPSC ring push + pop). For a market data tick arriving on net core 0 that triggers an order on the connection owned by net core 1, the path is: net core 0 → ring → strategy core → ring → net core 1. That’s ~25ns of cross-core overhead on top of the wire-to-wire processing.

The SPSC ring implementation:

// src/mem/ring.rs
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Bounded, lock-free, wait-free SPSC ring buffer.
/// Cache-line padded to prevent false sharing.
#[repr(C)]
pub struct SpscRing<T, const N: usize> {
    head: AtomicUsize,
    _pad0: [u8; 56],
    tail: AtomicUsize,
    _pad1: [u8; 56],
    buffer: [UnsafeCell<Option<T>>; N],
}

unsafe impl<T: Send, const N: usize> Send for SpscRing<T, N> {}

unsafe impl<T: Send, const N: usize> Sync for SpscRing<T, N> {}

impl<T, const N: usize> SpscRing<T, N> {
    #[inline(always)]
    pub fn push(&self, item: T) -> Result<(), T> {
        let head = self.head.load(Ordering::Relaxed);
        let next_head = (head + 1) % N;
        if next_head == self.tail.load(Ordering::Acquire) {
            return Err(item);
        }
        unsafe { *self.buffer[head].get() = Some(item); }
        self.head.store(next_head, Ordering::Release);
        Ok(())
    }

    #[inline(always)]
    pub fn pop(&self) -> Option<T> {
        let tail = self.tail.load(Ordering::Relaxed);
        if tail == self.head.load(Ordering::Acquire) {
            return None;
        }
        let item = unsafe { (*self.buffer[tail].get()).take() };
        self.tail.store((tail + 1) % N, Ordering::Release);
        item
    }
}

On x86_64, the Release store compiles to a plain MOV — the strong memory model gives us release semantics for free.


Alternatives: What You Should Probably Use Instead

Before building any of the above, we evaluated these alternatives. Here’s an honest comparison:

Solution Median RTT Tail (p99.9) Socket API Maintenance Cost Solarflare OpenOnload 2.5 µs 5 µs Full BSD Vendor-supported NIC + license Mellanox VMA 2.8 µs 6 µs Full BSD Vendor-supported Free with ConnectX AF_XDP 3.5 µs 8 µs Custom (XDP) Community Free F-Stack (DPDK + FreeBSD TCP) 1.8 µs 3.5 µs Near-full BSD Community Free Custom DPDK (this article) 1.1 µs 2.9 µs None You Engineering salary

OpenOnload is where most firms should start. It intercepts standard socket calls and redirects them through Solarflare’s userspace TCP stack. Your existing code works unchanged with 2–3× latency improvement. The tradeoff is vendor lock-in (Solarflare/Xilinx NICs only) and opaque internals you can’t tune.

Mellanox VMA is the equivalent for Mellanox/NVIDIA ConnectX NICs. Similar approach, similar performance, free with the hardware. Less mature than OpenOnload but actively developed.

AF_XDP is interesting as a middle ground: it’s an in-kernel framework that gives you raw packet access without fully leaving the kernel. Latency is higher than DPDK but you retain tcpdump visibility and don't need to manage your own ARP/ICMP.

F-Stack grafts FreeBSD’s mature TCP stack onto DPDK. You get a real, full-featured TCP implementation with kernel-bypass performance. This is probably the right answer if you need DPDK performance but don’t want to write a TCP state machine.

We chose the custom approach because:

  1. F-Stack’s FreeBSD TCP stack carried features we didn’t need, and their overhead — even minimal — compounded across 500k packets/second.
  2. We needed precise control over ACK timing for exchange-specific requirements.
  3. Compliance required full source ownership of the network stack.

Point 3 was the decisive factor. If your compliance team doesn’t require it, use OpenOnload or F-Stack.


Latency Budget: Where the Time Actually Goes

After building and profiling the full pipeline, here’s the honest breakdown:

Pipeline Stage                  Median (p50)    p99       p99.9     % of total
────────────────────────────────────────────────────────────────────────────────
NIC DMA → mbuf available          35 ns          50 ns      80 ns      5%
ETH+IP+TCP header parse           15 ns          20 ns      30 ns      2%
Connection lookup (hash)          12 ns          18 ns      25 ns      2%
TCP state machine                 18 ns          25 ns      40 ns      3%
FIX message parse                 82 ns         110 ns     150 ns     12%
Strategy decision               450 ns         800 ns    1400 ns     65%
FIX order serialization           45 ns          60 ns      80 ns      7%
TX packet build + NIC submit      30 ns          45 ns      70 ns      4%
────────────────────────────────────────────────────────────────────────────────
Total wire-to-wire               687 ns        1128 ns    1875 ns    100%

The strategy decision dominates — 65% of wire-to-wire latency. The entire networking stack (TCP + FIX + TX) accounts for roughly 30%. This is the uncomfortable truth about custom TCP stacks: even if you made TCP processing instant (0ns), your total latency drops from 687ns to 669ns. The marginal return on TCP optimization is small compared to optimizing your strategy or FIX parsing.

The strongest argument for custom TCP isn’t median latency — it’s tail latency determinism. The p99.9/p50 ratio for our custom stack is 2.7×. For a tuned kernel, it’s typically 3–4×. For a default kernel, it’s 7–8×. Tight tails matter more than fast medians in HFT.


Production Lessons

After 18 months of operating this system:

What worked: The Mbuf ownership model prevented every buffer leak bug we'd seen in previous C codebases. The SPSC rings never caused a single incident. The ECN-based rate control (added after the PFC storm) prevented two subsequent near-incidents. Rust's panic = "abort" meant that when things did go wrong, we failed fast rather than corrupting state.

What hurt: Debugging is brutal. There’s no tcpdump because the kernel can't see our packets. We built a custom packet capture tool that mirrors a fraction of traffic to a separate NIC for analysis. FIX sequence number recovery after a strategy crash required careful coordination between the TCP layer (which maintained the connection) and the FIX session layer (which needed to re-negotiate sequences). We underestimated this by months.

What we’d do differently: Start with F-Stack and only replace specific components (ACK timing, FIX parsing) where profiling shows gains. The custom TCP state machine is the least valuable part of the system per engineering hour invested.


Conclusion

Building a custom TCP handler in Rust with DPDK achieves wire-to-wire latencies under 2 microseconds with tight tail distributions — roughly 5× better than a properly tuned kernel stack. Rust’s ownership model provides genuine value at the DPDK FFI boundary, though it doesn’t make unsafe code safe; it makes the safe wrappers harder to misuse.

But this is an extreme measure justified by extreme requirements. The engineering cost is measured in person-years, the operational burden is permanent, and the marginal latency gain over off-the-shelf kernel-bypass solutions (OpenOnload, VMA, F-Stack) is small. The strongest case for building custom isn’t performance — it’s control: over ACK timing, over rate limiting, over every byte that touches the wire.

If you’re exploring this space, start by tuning the kernel. Then try OpenOnload or VMA. Then try F-Stack. Only then, if profiling shows that the generic TCP stack is your bottleneck and you have the team to maintain a custom one, consider what this article describes.

The unspoken truth of HFT networking: the firms with the fastest systems got there by buying better hardware and colocating closer, not by writing better TCP state machines. Software optimization is what you do after you’ve exhausted those options.

You can access the full implementation on the Github repo here.

Practice what you learned

Reinforce this article with hands-on coding exercises and AI-powered feedback.

View all exercises

Want to practice Rust hands-on?

Go beyond reading — solve interactive exercises with AI-powered code review on Rust Lab.