Data Pipelines
The unglamorous plumbing that makes or breaks your entire trading system
Learning Objectives
- Understand data pipeline architecture for trading systems
- Learn data quality, validation, and error handling
- Build a reliable data ingestion pipeline
Explain Like I'm 5
This is the least exciting topic in quantitative finance, and also the most important. Garbage in, garbage out — literally. If your data is wrong, your model is wrong, your trades are wrong, and your money is gone. Data pipelines are the plumbing: getting data from sources (broker API, data vendors), cleaning it (filling gaps, removing bad ticks), storing it (databases), and serving it to your model. Nobody posts about their data pipeline on social media, but every production system that actually works has a solid one.
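To make those four stages concrete, here is a minimal end-to-end sketch, assuming a synthetic one-symbol feed and an in-memory SQLite store; the function names (ingest, clean, store, serve) and the fill limit are illustrative, not a standard API.

import sqlite3
import pandas as pd

def ingest() -> pd.DataFrame:
    """Source stage: stands in for a broker/vendor API call (synthetic data)."""
    idx = pd.date_range("2024-01-02 09:30", periods=5, freq="1min")
    return pd.DataFrame({"close": [100.0, 100.1, None, 99.9, 100.2]}, index=idx)

def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Cleaning stage: bridge short gaps only, drop non-positive prices."""
    df = df.ffill(limit=3)
    return df[df["close"] > 0]

def store(df: pd.DataFrame, conn: sqlite3.Connection) -> None:
    """Storage stage: persist cleaned bars so nothing downstream reads raw feeds."""
    df.to_sql("bars", conn, if_exists="replace")

def serve(conn: sqlite3.Connection) -> pd.DataFrame:
    """Serving stage: hand clean bars to the model layer."""
    return pd.read_sql("SELECT * FROM bars", conn, index_col="index")

conn = sqlite3.connect(":memory:")
store(clean(ingest()), conn)
print(serve(conn))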
Think of It This Way
A data pipeline is the plumbing in a house. Nobody thinks about plumbing until they're standing in sewage water. But without clean water (data) flowing reliably to every faucet (model), the whole house (trading system) is useless. The best quants I know? They're secretly plumbing enthusiasts.
1. Pipeline Architecture
2. Data Quality Issues
3. The Paper Trading Reality Check
[Chart: Typical Performance Gap: Backtest vs Paper vs Live]
4. Latency and Where It Hides
[Chart: Latency Distribution by Component (ms)]
5. Handling Vendor Lock-in (see the adapter sketch after this list)
6. Common Pipeline Disasters
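One concrete pattern for item 5: isolate every vendor behind a thin adapter that normalizes to a single canonical schema, so swapping vendors means writing one new adapter instead of rewriting the system. A minimal sketch, assuming a hypothetical vendor whose API returns short column names; BarSource, VendorAAdapter, and the column mapping are all illustrative.

from typing import Protocol
import pandas as pd

CANON_COLS = ["open", "high", "low", "close", "volume"]

class BarSource(Protocol):
    """Vendor-agnostic interface: the rest of the system depends only on this."""
    def fetch_bars(self, symbol: str, start: str, end: str) -> pd.DataFrame: ...

class VendorAAdapter:
    """Adapter for a hypothetical vendor that returns columns 'o,h,l,c,v'."""
    def fetch_bars(self, symbol: str, start: str, end: str) -> pd.DataFrame:
        raw = self._call_vendor_api(symbol, start, end)
        # Normalize to the canonical schema before anything downstream sees it
        return raw.rename(columns={"o": "open", "h": "high", "l": "low",
                                   "c": "close", "v": "volume"})[CANON_COLS]

    def _call_vendor_api(self, symbol, start, end) -> pd.DataFrame:
        # Stub standing in for the vendor SDK call
        idx = pd.date_range(start, periods=3, freq="1min")
        return pd.DataFrame({"o": [1.0] * 3, "h": [1.1] * 3, "l": [0.9] * 3,
                             "c": [1.0] * 3, "v": [100] * 3}, index=idx)

def load_bars(source: BarSource, symbol: str) -> pd.DataFrame:
    # Models call this; they never import a vendor SDK directly
    return source.fetch_bars(symbol, "2024-01-02 09:30", "2024-01-02 09:33")

print(load_bars(VendorAAdapter(), "SPY").head())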
Hands-On Code
Data Pipeline with Validation
import pandas as pd

class DataPipeline:
    """Simple data pipeline with validation."""

    def __init__(self, max_gap_bars=5, spike_threshold=5.0):
        self.max_gap_bars = max_gap_bars
        self.spike_threshold = spike_threshold

    def validate(self, df: pd.DataFrame) -> dict:
        """Validate OHLCV data quality and report any issues found."""
        issues = []

        # Check for missing bars (requires an index with a known frequency);
        # escalate to an alert when the gap exceeds max_gap_bars
        if hasattr(df.index, 'freq') and df.index.freq:
            expected = pd.date_range(df.index[0], df.index[-1], freq=df.index.freq)
            missing = len(expected) - len(df)
            if missing > self.max_gap_bars:
                issues.append(f"[ALERT] {missing} missing bars (max allowed: {self.max_gap_bars})")
            elif missing > 0:
                issues.append(f"[WARN] {missing} missing bars detected")

        # Check for non-positive prices
        if (df[['open', 'high', 'low', 'close']] <= 0).any().any():
            issues.append("[ALERT] Negative or zero prices found")

        # Check OHLC consistency
        if (df['high'] < df['low']).any():
            issues.append("[ALERT] High < Low detected")

        # Check for spikes: returns beyond spike_threshold standard deviations
        returns = df['close'].pct_change()
        spike_mask = returns.abs() > self.spike_threshold * returns.std()
        if spike_mask.any():
            issues.append(f"[WARN] {spike_mask.sum()} price spikes detected")

        # Check volume: flag if more than 10% of bars traded zero volume
        zero_frac = (df['volume'] == 0).mean()
        if zero_frac > 0.1:
            issues.append(f"[WARN] {zero_frac:.0%} zero-volume bars")

        status = "[PASS] CLEAN" if not issues else "[WARN] ISSUES FOUND"
        print(f"Data validation: {status}")
        for issue in issues:
            print(f"  {issue}")
        return {'clean': len(issues) == 0, 'issues': issues}

Validates data BEFORE feeding it to models. Catching data errors early prevents garbage predictions and bad trades. This step is not optional: skip it and the consequences are predictable.
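To see the validator in action, a quick usage sketch on synthetic minute bars with a deliberately broken third bar; the numbers are made up for illustration.

import pandas as pd

idx = pd.date_range("2024-01-02 09:30", periods=4, freq="1min")
bars = pd.DataFrame({
    'open':   [100.0, 100.1, 100.2, 100.1],
    'high':   [100.2, 100.3,  99.9, 100.3],   # third bar: high below low
    'low':    [ 99.9, 100.0, 100.1, 100.0],
    'close':  [100.1, 100.2, 100.1, 100.2],
    'volume': [1000, 1200, 0, 900],
}, index=idx)

report = DataPipeline(max_gap_bars=5, spike_threshold=5.0).validate(bars)
# Prints:
# Data validation: [WARN] ISSUES FOUND
#   [ALERT] High < Low detected
#   [WARN] 25% zero-volume bars
print(report['clean'])  # False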
Knowledge Check
Q1. Your backtest shows amazing results but uses close prices to compute indicators and generate signals at the same bar's close. What's the problem?
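The issue the question points at is look-ahead bias: a signal computed from bar t's close cannot also be executed at bar t's close. A minimal sketch of the usual fix, lagging the signal by one bar; the price series and rolling window are made up for illustration.

import pandas as pd

prices = pd.Series([100.0, 101.0, 103.0, 102.0, 104.0])
returns = prices.pct_change()

# Signal needs bar t's close (3-bar moving-average crossover)
signal = (prices > prices.rolling(3).mean()).astype(int)

# Biased: pretends we earned bar t's return on a signal that required bar t's close
biased = (signal * returns).sum()

# Honest: a signal known at bar t's close can only capture bar t+1's return
honest = (signal.shift(1) * returns).sum()

print(f"biased: {biased:.4f}, honest: {honest:.4f}")  # biased looks better, and is fake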
Assignment
Build a data validation pipeline that checks for: missing bars, price spikes, OHLC consistency, and look-ahead bias. Run it on your historical data and fix any issues found.