Testing Data Pipelines: What Actually Matters

Why Data Pipelines Need Tests

Untested pipelines break in production. You find out when the CEO asks why revenue dropped 50% overnight.

The data didn’t change. Your pipeline broke. Nobody noticed until it was too late.

Tests prevent this. Not every test is worth writing, and some waste time. But the right tests catch problems before production.

Three Levels of Testing

Level 1: Unit Tests. Test individual functions. Does this transformation work correctly?

Level 2: Integration Tests. Test the full pipeline. Does extract→transform→load work end-to-end?

Level 3: Data Tests. Test the data itself. Does the output make business sense?

Most data engineers focus on Level 1. That’s a mistake. Level 3 matters more: every unit test can pass while the data that lands in the warehouse is still wrong.

What to Test (and What Not To)

Test this:

  • Business logic transformations
  • Data quality rules
  • Known edge cases
  • Schema expectations

Don’t test this:

  • Library functions (pandas is already tested)
  • Database connections (integration tests cover this)
  • Obvious code (if it’s trivial, skip it)
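
Of the items worth testing, schema expectations are among the easiest to automate. Here is a minimal sketch, assuming an orders frame with customer_id, order_id, and amount columns. The EXPECTED_ORDER_COLUMNS mapping and the check_schema helper are hypothetical names for illustration, not part of pandas:

```python
import pandas as pd

# Hypothetical expectation: column name -> numpy dtype kind
# ('i' = signed integer, 'f' = float).
EXPECTED_ORDER_COLUMNS = {
    "customer_id": "i",
    "order_id": "i",
    "amount": "f",
}


def check_schema(df: pd.DataFrame, expected: dict[str, str]) -> list[str]:
    """Return a list of human-readable schema problems (empty if none)."""
    problems = []
    for column, kind in expected.items():
        if column not in df.columns:
            problems.append(f"missing column: {column}")
        elif df[column].dtype.kind != kind:
            problems.append(
                f"{column}: expected dtype kind {kind!r}, "
                f"got {df[column].dtype.kind!r}"
            )
    return problems


def test_orders_schema():
    orders = pd.DataFrame({
        "customer_id": [1, 2],
        "order_id": [101, 102],
        "amount": [100.0, 50.0],
    })
    assert check_schema(orders, EXPECTED_ORDER_COLUMNS) == []
```

Returning a list of problems rather than failing on the first one means a single test run reports every mismatch at once.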

Unit Tests: Testing Transformations

"""
Test your transformation logic.
"""

import pandas as pd
import pytest

def calculate_customer_ltv(orders: pd.DataFrame) -> pd.DataFrame:
    """Calculate customer lifetime value."""
    return (
        orders
        .groupby('customer_id')
        .agg({
            'amount': 'sum',
            'order_id': 'count'
        })
        .rename(columns={'amount': 'total_spent', 'order_id': 'order_count'})
        .reset_index()
    )


def test_ltv_calculation():
    """Test LTV calculation with known data."""
    orders = pd.DataFrame({
        'customer_id': [1, 1, 2, 2, 2],
        'order_id': [101, 102, 103, 104, 105],
        'amount': [100.0, 50.0, 200.0, 150.0, 50.0]
    })

    result = calculate_customer_ltv(orders)

    assert len(result) == 2
    assert result[result['customer_id'] == 1]['total_spent'].iloc[0] == 150.0
    assert result[result['customer_id'] == 1]['order_count'].iloc[0] == 2
    assert result[result['customer_id'] == 2]['total_spent'].iloc[0] == 400.0
    assert result[result['customer_id'] == 2]['order_count'].iloc[0] == 3


def test_ltv_handles_empty_data():
    """Test LTV with no orders."""
    orders = pd.DataFrame(columns=['customer_id', 'order_id', 'amount'])
    result = calculate_customer_ltv(orders)

    assert len(result) == 0
    assert list(result.columns) == ['customer_id', 'total_spent', 'order_count']


def test_ltv_handles_single_customer():
    """Test LTV with one customer."""
    orders = pd.DataFrame({
        'customer_id': [1],
        'order_id': [101],
        'amount': [100.0]
    })

    result = calculate_customer_ltv(orders)

    assert len(result) == 1
    assert result['total_spent'].iloc[0] == 100.0

These tests are fast. Run them every commit. They catch logic errors immediately.
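
The per-cell assertions in test_ltv_calculation can also be written as a single whole-frame comparison. Below is a sketch using pandas.testing.assert_frame_equal. The function is repeated so the block runs on its own, and the expected frame is worked out by hand from the same input:

```python
import pandas as pd
from pandas.testing import assert_frame_equal


def calculate_customer_ltv(orders: pd.DataFrame) -> pd.DataFrame:
    """Same function as above, repeated so this example is self-contained."""
    return (
        orders
        .groupby('customer_id')
        .agg({'amount': 'sum', 'order_id': 'count'})
        .rename(columns={'amount': 'total_spent', 'order_id': 'order_count'})
        .reset_index()
    )


def test_ltv_matches_expected_frame():
    orders = pd.DataFrame({
        'customer_id': [1, 1, 2, 2, 2],
        'order_id': [101, 102, 103, 104, 105],
        'amount': [100.0, 50.0, 200.0, 150.0, 50.0],
    })
    expected = pd.DataFrame({
        'customer_id': [1, 2],
        'total_spent': [150.0, 400.0],
        'order_count': [2, 3],
    })

    # check_dtype=False compares values only, so the test does not depend
    # on the exact integer width pandas picks for the counts.
    assert_frame_equal(calculate_customer_ltv(orders), expected, check_dtype=False)
```

One comparison covers row count, column names, column order, and every value, so a new column or a reordered one fails the test without any extra assertions.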

Integration Tests: Testing the Full Pipeline

"""
Test the complete pipeline with a test database.
"""

import pytest
from sqlalchemy import create_engine, text
import pandas as pd

from pipeline import extract, transform, load


@pytest.fixture
def test_db():
    """Create a test database."""
    engine = create_engine('postgresql://test:test@localhost/test_db')

    # Setup
    with engine.begin() as conn:
        conn.execute(text('''
            CREATE TABLE IF NOT EXISTS orders (
                id INTEGER PRIMARY KEY,
                customer_id INTEGER,
                amount DECIMAL,
                order_date DATE
            )
        '''))

        conn.execute(text('''
            INSERT INTO orders VALUES
            (1, 1, 100.0, '2025-01-01'),
            (2, 1, 50.0, '2025-01-02'),
            (3, 2, 200.0, '2025-01-03')
        '''))

    yield engine

    # Teardown
    with engine.begin() as conn:
        conn.execute(text('DROP TABLE IF EXISTS orders CASCADE'))
        conn.execute(text('DROP TABLE IF EXISTS customer_summary CASCADE'))


def test_full_pipeline(test_db):
    """Test extract→transform→load pipeline."""

    # Run pipeline
    df = extract(test_db)
    transformed = transform(df)
    load(transformed, test_db, 'customer_summary')

    # Verify results
    with test_db.connect() as conn:
        result = pd.read_sql('SELECT * FROM customer_summary ORDER BY customer_id', conn)

    assert len(result) == 2
    assert result.loc[0, 'customer_id'] == 1
    assert result.loc[0, 'total_spent'] == 150.0
    assert result.loc[1, 'customer_id'] == 2
    assert result.loc[1, 'total_spent'] == 200.0

Integration tests are slower, but they catch connection issues, SQL errors, and schema problems.
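
The test above imports extract, transform, and load from a pipeline module that is not shown. As a rough sketch of what such a module might contain, the block below makes up every function body, and an in-memory SQLite connection stands in for PostgreSQL so the example runs without a server:

```python
import sqlite3

import pandas as pd


def extract(conn) -> pd.DataFrame:
    """Read raw orders from the source table."""
    return pd.read_sql("SELECT id, customer_id, amount FROM orders", conn)


def transform(orders: pd.DataFrame) -> pd.DataFrame:
    """Aggregate total spend per customer."""
    return (
        orders
        .groupby("customer_id", as_index=False)
        .agg(total_spent=("amount", "sum"))
    )


def load(df: pd.DataFrame, conn, table: str) -> None:
    """Write the result, replacing the table left by any previous run."""
    df.to_sql(table, conn, index=False, if_exists="replace")


def run_demo() -> pd.DataFrame:
    """Run extract→transform→load against a throwaway SQLite database."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders ("
        "id INTEGER PRIMARY KEY, customer_id INTEGER, amount REAL)"
    )
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [(1, 1, 100.0), (2, 1, 50.0), (3, 2, 200.0)],
    )
    load(transform(extract(conn)), conn, "customer_summary")
    result = pd.read_sql(
        "SELECT * FROM customer_summary ORDER BY customer_id", conn
    )
    conn.close()
    return result
```

SQLite is convenient for a quick local check, but it is more forgiving about types than PostgreSQL, so it complements a real test database and does not replace it.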

Data Tests: Testing the Output

These tests check if the data makes business sense.

"""
Data quality tests - run after pipeline completes.
"""

import pandas as pd


def test_revenue_within_expected_range(warehouse_engine):
    """Revenue should be within historical norms."""
    # COALESCE so a day with no orders yields 0 instead of NULL
    query = """
    SELECT COALESCE(SUM(amount), 0) as total_revenue
    FROM orders
    WHERE order_date = CURRENT_DATE
    """

    result = pd.read_sql(query, warehouse_engine)
    daily_revenue = float(result['total_revenue'].iloc[0])

    # Historical average is $50k/day: flag anything below $10k or above 3x the average
    assert 10_000 < daily_revenue < 150_000, f"Revenue ${daily_revenue} is suspicious"


def test_no_future_dates(warehouse_engine):
    """Orders shouldn't have future dates."""
    query = """
    SELECT COUNT(*) as future_count
    FROM orders
    WHERE order_date > CURRENT_DATE
    """

    result = pd.read_sql(query, warehouse_engine)
    assert result['future_count'].iloc[0] == 0, "Found orders with future dates"


def test_no_negative_amounts(warehouse_engine):
    """Order amounts must not be negative."""
    query = """
    SELECT COUNT(*) as negative_count
    FROM orders
    WHERE amount < 0
    """

    result = pd.read_sql(query, warehouse_engine)
    assert result['negative_count'].iloc[0] == 0, "Found negative order amounts"


def test_referential_integrity(warehouse_engine):
    """All orders must reference valid customers."""
    query = """
    SELECT COUNT(*) as orphan_count
    FROM orders o
    LEFT JOIN customers c ON o.customer_id = c.id
    WHERE c.id IS NULL
    """

    result = pd.read_sql(query, warehouse_engine)
    assert result['orphan_count'].iloc[0] == 0, "Found orders with invalid customer_id"

These tests catch data problems that unit tests miss.
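
The revenue test hard-codes its bounds. One possible alternative, assuming a list of recent daily revenue totals is available, is to derive the bounds from that history. The revenue_bounds helper and the three-standard-deviation width are illustrative choices:

```python
from statistics import mean, stdev


def revenue_bounds(history: list[float], width: float = 3.0) -> tuple[float, float]:
    """Return (low, high) as mean +/- width * sample standard deviation."""
    if len(history) < 2:
        raise ValueError("need at least two days of history")
    center = mean(history)
    spread = stdev(history)
    # Clamp at zero on the assumption that daily revenue is never negative.
    return max(0.0, center - width * spread), center + width * spread


def is_suspicious(today: float, history: list[float]) -> bool:
    """True when today's revenue falls outside the historical band."""
    low, high = revenue_bounds(history)
    return not (low <= today <= high)
```

A data test could then assert `not is_suspicious(daily_revenue, history)`. The band adjusts as the business grows, at the cost of one extra query to fetch the history.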

How to Run Tests

# Unit tests (fast, run always)
pytest tests/unit/

# Integration tests (slower, run before deploy)
pytest tests/integration/

# Data tests (after pipeline runs in production; --db is a project-specific option, not built into pytest)
pytest tests/data/ --db=production
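
The last command only works if the project registers --db itself. Below is a minimal conftest.py sketch. The WAREHOUSE_URLS mapping, the environment variable names, and the "staging" default are hypothetical. It also supplies the warehouse_engine fixture that the data tests request:

```python
# tests/conftest.py
import os

import pytest

# Hypothetical mapping from --db value to the environment variable
# holding the connection URL. Real credentials stay out of the repo.
WAREHOUSE_URLS = {
    "production": "WAREHOUSE_URL_PRODUCTION",
    "staging": "WAREHOUSE_URL_STAGING",
}


def resolve_db_url(name: str, env=os.environ) -> str:
    """Look up the connection URL for the given --db value."""
    if name not in WAREHOUSE_URLS:
        raise ValueError(f"unknown --db value: {name!r}")
    return env[WAREHOUSE_URLS[name]]


def pytest_addoption(parser):
    """Register the custom --db command-line option."""
    parser.addoption(
        "--db",
        action="store",
        default="staging",
        help="which warehouse the data tests query",
    )


@pytest.fixture(scope="session")
def warehouse_engine(request):
    """SQLAlchemy engine for the warehouse selected with --db."""
    # Imported lazily so unit tests do not need SQLAlchemy installed.
    from sqlalchemy import create_engine

    engine = create_engine(resolve_db_url(request.config.getoption("--db")))
    yield engine
    engine.dispose()
```

Data tests only read, so it is worth pointing the production URL at a read-only role.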

What I Don’t Test

Library code: Pandas is already tested. Don’t test df.groupby().

Configuration: Testing DATABASE_URL = "postgres://..." wastes time.

Trivial code: If it’s 2 lines and obvious, skip it.

Everything: 100% coverage is pointless. Test what matters.

When Tests Are Worth It

Worth testing:

  • Revenue calculations
  • Customer matching logic
  • Data quality rules
  • Complex transformations

Not worth testing:

  • Simple field renaming
  • Direct pass-through functions
  • Database connection setup
  • Logging statements

Tests That Actually Run

Tests that never run are useless.

# .github/workflows/test.yml
name: Test Pipeline

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest

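    services:
      postgres:
        image: postgres:15
        env:
          # Match the credentials and database used by the test_db fixture
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
          POSTGRES_DB: test_db
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s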

    steps:
      - uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

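      # Data tests need the production warehouse, so CI runs only these two suites
      - name: Run tests
        run: pytest tests/unit/ tests/integration/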

Tests run automatically on every push and pull request. Problems get caught immediately.

Summary

Test data pipelines. But test smart:

  • Unit tests for transformation logic
  • Integration tests for the full pipeline
  • Data tests for business rules

Don’t test everything. Test what breaks in production.

Tests slow you down initially. They save you later.
