Idempotent Pipelines: Run Twice, Get Same Result

What Idempotency Means

An idempotent pipeline produces the same result when run multiple times with the same input.

Run it once: 100 records loaded. Run it again: still 100 records, not 200.

Why It Matters

Pipelines fail. Network issues. Database timeouts. Out of memory. Bugs.

When a pipeline fails halfway through, you need to rerun it. If it’s not idempotent, you get:

  • Duplicate records
  • Wrong totals
  • Corrupted aggregations
  • Lost trust

Idempotent pipelines are safe to rerun. Always.

The Problem: Non-Idempotent Pipeline

def load_orders(df, engine):
    """Load orders to warehouse - WRONG."""
    df.to_sql('orders', engine, if_exists='append', index=False)

Run this twice with the same data:

  • First run: 100 records
  • Second run: 200 records (100 duplicates)

This breaks everything downstream.
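To see the doubling without a warehouse, here is a minimal sketch that uses Python's built-in sqlite3 with an in-memory database as a stand-in (an assumption; the `orders` schema and the `load_orders_append` helper are hypothetical). The table has no primary key, like one created by `to_sql(..., index=False)`, so nothing rejects the repeated rows.

```python
import sqlite3

def load_orders_append(conn, rows):
    # Mirrors if_exists='append': every call adds rows and never removes any
    with conn:  # sqlite3: commit on success, roll back on exception
        conn.executemany(
            "INSERT INTO orders (order_id, amount, order_date) VALUES (?, ?, ?)",
            rows,
        )

def count_orders(conn):
    return conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]

conn = sqlite3.connect(":memory:")
# No primary key, so nothing stops duplicate rows
conn.execute("CREATE TABLE orders (order_id INTEGER, amount REAL, order_date TEXT)")
rows = [(i, 10.0 * i, "2025-01-01") for i in range(1, 101)]

load_orders_append(conn, rows)
first_run = count_orders(conn)
load_orders_append(conn, rows)  # same input, second run
second_run = count_orders(conn)
print(first_run, second_run)  # 100 200
```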

Solution 1: Replace the Whole Table

def load_orders_replace(df, engine):
    """Load orders - replace entire table."""
    df.to_sql('orders', engine, if_exists='replace', index=False)

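Pros: Simple. Always idempotent. Cons: Rewrites every row on every run, so it gets slow for large tables. pandas drops and recreates the table, so indexes, constraints, and hand-tuned column types are lost. Readers can hit a missing or empty table while it is being replaced.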

When to use: Small tables, infrequent runs, simplicity matters.
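A minimal sketch of the replace pattern on SQLite (stdlib sqlite3, in-memory; the schema and `load_orders_replace` are illustrative assumptions). It writes its own DDL instead of letting pandas infer one, and wraps drop, create and insert in a single transaction. SQLite and PostgreSQL both allow DDL inside a transaction, so a failed run leaves the old table in place; not every database does, so check yours.

```python
import sqlite3

ORDERS_DDL = """
CREATE TABLE orders (
    order_id   INTEGER PRIMARY KEY,
    amount     REAL NOT NULL,
    order_date TEXT NOT NULL
)
"""

def load_orders_replace(conn, rows):
    """Drop, recreate and refill the whole table as one transaction."""
    conn.execute("BEGIN")
    try:
        conn.execute("DROP TABLE IF EXISTS orders")
        # Our own DDL, so the primary key and NOT NULL constraints survive each replace
        conn.execute(ORDERS_DDL)
        conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", rows)
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")  # the previous version of the table comes back
        raise

# isolation_level=None: sqlite3 stays in autocommit mode and we issue BEGIN/COMMIT ourselves
conn = sqlite3.connect(":memory:", isolation_level=None)
rows = [(i, 10.0 * i, "2025-01-01") for i in range(1, 101)]
for _ in range(3):
    load_orders_replace(conn, rows)
print(conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0])  # 100
```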

Solution 2: Delete Before Insert

from sqlalchemy import text

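def load_orders_delete_first(df, engine, process_date):
    """Load orders - delete date partition first.

    df must contain only rows for process_date; rows for other dates
    would be appended without a matching delete.
    """

    # One transaction: if the insert fails, the delete is rolled back too
    with engine.begin() as conn:
        # Delete existing data for this date
        conn.execute(
            text("DELETE FROM orders WHERE DATE(order_date) = :date"),
            {'date': process_date}
        )

        # Insert new data on the same connection (to_sql accepts a SQLAlchemy Connection)
        df.to_sql('orders', conn, if_exists='append', index=False)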

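Pros: Fast. Safe to rerun. Common pattern. Cons: Slight complexity: the delete and the insert must run in one transaction, and each run must load the complete partition for process_date.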

When to use: Partitioned data, daily/hourly loads.
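The same pattern as a self-contained sketch on stdlib sqlite3 (an in-memory stand-in for the warehouse; table layout and helper names are hypothetical). `order_date` is stored as a plain `YYYY-MM-DD` string, so a simple equality replaces `DATE(order_date)`. The last call fails half-way on purpose to show why the delete and the insert belong in one transaction: the deleted rows come back.

```python
import sqlite3

def load_orders_delete_first(conn, rows, process_date):
    """Rebuild one date partition; the delete and the insert commit together."""
    with conn:  # one transaction: commit on success, roll back on any exception
        conn.execute("DELETE FROM orders WHERE order_date = ?", (process_date,))
        conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, amount REAL, order_date TEXT)")
day1 = [(1, 100.0, "2025-01-01"), (2, 200.0, "2025-01-01"), (3, 150.0, "2025-01-01")]
day2 = [(4, 80.0, "2025-01-02")]

load_orders_delete_first(conn, day1, "2025-01-01")
load_orders_delete_first(conn, day2, "2025-01-02")
load_orders_delete_first(conn, day1, "2025-01-01")  # rerun day 1: day 2 is untouched
total = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]  # 4

# A failing rerun: the duplicate order_id makes the insert fail half-way
try:
    load_orders_delete_first(
        conn, [(1, 1.0, "2025-01-01"), (1, 2.0, "2025-01-01")], "2025-01-01"
    )
except sqlite3.IntegrityError:
    pass
day1_after_failure = conn.execute(
    "SELECT COUNT(*) FROM orders WHERE order_date = '2025-01-01'"
).fetchone()[0]  # 3: the delete was rolled back along with the failed insert
```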

Solution 3: Upsert (Insert or Update)

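from sqlalchemy import MetaData, Table, func
from sqlalchemy.dialects.postgresql import insert

def upsert_orders(df, engine):
    """Load orders - upsert based on primary key (PostgreSQL)."""

    # Reflect the existing table; order_id must have a primary key or unique constraint
    orders_table = Table('orders', MetaData(), autoload_with=engine)
    records = df.to_dict('records')

    with engine.begin() as conn:
        stmt = insert(orders_table).values(records)
        stmt = stmt.on_conflict_do_update(
            index_elements=['order_id'],
            set_={
                'customer_id': stmt.excluded.customer_id,
                'amount': stmt.excluded.amount,
                'order_date': stmt.excluded.order_date,
                # Changes on every rerun, even when the data does not
                'updated_at': func.now()
            }
        )
        conn.execute(stmt)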

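Pros: Handles updates correctly. Rerunning with the same input leaves the business columns unchanged (only updated_at moves). Cons: More complex. Needs a unique key on order_id and database-specific syntax. Usually slower than delete+insert for bulk loads.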

When to use: Data changes over time, need to track updates.
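For comparison, the same upsert in SQLite syntax, runnable with stdlib sqlite3 (requires SQLite 3.24 or newer; the table and the `upsert_orders` helper here are illustrative). It leaves out `updated_at`, so a rerun with identical input leaves every column unchanged, while a corrected amount for an existing `order_id` overwrites the old value instead of adding a row.

```python
import sqlite3

def upsert_orders(conn, rows):
    """Insert new orders; overwrite existing ones that match on order_id."""
    with conn:
        conn.executemany(
            """
            INSERT INTO orders (order_id, customer_id, amount, order_date)
            VALUES (?, ?, ?, ?)
            ON CONFLICT (order_id) DO UPDATE SET
                customer_id = excluded.customer_id,
                amount      = excluded.amount,
                order_date  = excluded.order_date
            """,
            rows,
        )

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (order_id INTEGER PRIMARY KEY, customer_id INTEGER,"
    " amount REAL, order_date TEXT)"
)
batch = [(1, 10, 100.0, "2025-01-01"), (2, 11, 200.0, "2025-01-01")]
upsert_orders(conn, batch)
upsert_orders(conn, batch)                           # rerun: no new rows
upsert_orders(conn, [(2, 11, 250.0, "2025-01-01")])  # late correction to order 2
rows_now = conn.execute("SELECT order_id, amount FROM orders ORDER BY order_id").fetchall()
print(rows_now)  # [(1, 100.0), (2, 250.0)]
```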

Solution 4: Staging Table Pattern

def load_orders_staging(df, engine, target_table, process_date):
    """Load orders via staging table."""

    # Table names are interpolated into SQL: pass only trusted, hard-coded names
    staging_table = f"{target_table}_staging"

    # Load to staging (the target table is not touched yet)
    df.to_sql(staging_table, engine, if_exists='replace', index=False)

    # Swap the partition: engine.begin() opens one transaction and commits
    # on exit (or rolls back on error), so no manual BEGIN/COMMIT is needed
    with engine.begin() as conn:
        # Delete from target
        conn.execute(
            text(f"DELETE FROM {target_table} WHERE DATE(order_date) = :date"),
            {'date': process_date}
        )

        # Insert from staging (SELECT * assumes staging columns match the target's order)
        conn.execute(
            text(f"""
            INSERT INTO {target_table}
            SELECT * FROM {staging_table}
            """)
        )

    # Clean up staging
    with engine.begin() as conn:
        conn.execute(text(f"DROP TABLE IF EXISTS {staging_table}"))

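Pros: Atomic: readers see the old partition or the new one, never a half-loaded one. Staged data can be checked before it touches the target. Production-grade. Cons: Most complex. Every row is written twice.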

When to use: Production pipelines, large datasets, need atomicity.
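A stdlib sqlite3 sketch of the staging flow (in-memory stand-in; `orders_staging`, `load_via_staging`, and the two validation rules are hypothetical). It adds the step that makes staging worth its cost: checking the staged rows before the target is touched. The insert uses an explicit column list rather than `SELECT *`, so a column-order mismatch cannot silently shift data.

```python
import sqlite3

def load_via_staging(conn, rows, process_date):
    """Stage, validate, then swap one date partition into orders."""
    # 1. Fill a scratch table; the target is not touched yet
    conn.execute("DROP TABLE IF EXISTS orders_staging")
    conn.execute("CREATE TABLE orders_staging (order_id INTEGER, amount REAL, order_date TEXT)")
    with conn:
        conn.executemany("INSERT INTO orders_staging VALUES (?, ?, ?)", rows)

    # 2. Validate (example rules only; adapt them to your data)
    bad = conn.execute(
        "SELECT COUNT(*) FROM orders_staging WHERE order_date <> ? OR amount < 0",
        (process_date,),
    ).fetchone()[0]
    if bad:
        raise ValueError(f"{bad} invalid staged rows; target left untouched")

    # 3. Swap the partition in one transaction, with an explicit column list
    with conn:
        conn.execute("DELETE FROM orders WHERE order_date = ?", (process_date,))
        conn.execute(
            "INSERT INTO orders (order_id, amount, order_date) "
            "SELECT order_id, amount, order_date FROM orders_staging"
        )
    conn.execute("DROP TABLE orders_staging")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, amount REAL, order_date TEXT)")
good = [(1, 100.0, "2025-01-01"), (2, 200.0, "2025-01-01"), (3, 150.0, "2025-01-01")]
load_via_staging(conn, good, "2025-01-01")
load_via_staging(conn, good, "2025-01-01")  # rerun
try:
    load_via_staging(conn, [(1, -5.0, "2025-01-01")], "2025-01-01")
except ValueError as exc:
    print(exc)  # 1 invalid staged rows; target left untouched
```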

Real Example: Daily Sales Summary

Non-idempotent (broken):

def daily_summary_broken(engine, date):
    """Calculate daily summary - WRONG."""

    query = f"""
    INSERT INTO daily_sales
    SELECT
        DATE(order_date) as date,
        COUNT(*) as orders,
        SUM(amount) as revenue
    FROM orders
    WHERE DATE(order_date) = '{date}'
    GROUP BY DATE(order_date)
    """

    with engine.begin() as conn:
        conn.execute(text(query))

Rerun this: duplicates in daily_sales.

Idempotent (correct):

def daily_summary_idempotent(engine, date):
    """Calculate daily summary - idempotent."""

    with engine.begin() as conn:
        # Delete existing summary for this date
        conn.execute(
            text("DELETE FROM daily_sales WHERE date = :date"),
            {'date': date}
        )

        # Insert new summary
        conn.execute(
            text("""
            INSERT INTO daily_sales
            SELECT
                DATE(order_date) as date,
                COUNT(*) as orders,
                SUM(amount) as revenue
            FROM orders
            WHERE DATE(order_date) = :date
            GROUP BY DATE(order_date)
            """),
            {'date': date}
        )

Rerun this 10 times: same result every time.

Idempotency in Airflow

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# engine, extract_orders, transform and load_idempotent stand for your own
# project code (not shown); load_idempotent uses the delete + insert pattern.

def process_date_partition(**context):
    """Process a specific date partition idempotently."""

    # Logical date of this run from Airflow (the same value on every retry)
    execution_date = context['ds']  # YYYY-MM-DD string

    # Extract
    df = extract_orders(engine, execution_date)

    # Transform
    transformed = transform(df)

    # Load idempotently (delete + insert)
    load_idempotent(transformed, engine, execution_date)


with DAG(
    'daily_orders',
    start_date=datetime(2025, 1, 1),
    schedule='@daily',
    catchup=True  # Can safely backfill - pipeline is idempotent
) as dag:

    # Airflow 2 passes the context to python_callable automatically;
    # the provide_context argument was removed in Airflow 2
    process = PythonOperator(
        task_id='process_orders',
        python_callable=process_date_partition,
    )

catchup=True is safe because each run rebuilds only the partition for its own ds. Backfilling, or clearing and rerunning a single day, leaves the table in the same state.
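To check the backfill claim without running Airflow, this sketch replays daily task runs against an in-memory SQLite table (assumptions: stdlib sqlite3 as the warehouse, a hypothetical `SOURCE` dict standing in for upstream data, and a hypothetical `simulate_task_run` helper playing the role of `process_date_partition`). A full backfill, a single-day rerun, and a reverse-order re-backfill all end in the same table.

```python
import sqlite3

# Hypothetical upstream data, keyed by the logical date Airflow passes as ds
SOURCE = {
    "2025-01-01": [(1, 100.0), (2, 200.0)],
    "2025-01-02": [(3, 50.0)],
    "2025-01-03": [(4, 75.0), (5, 25.0)],
}

def simulate_task_run(conn, ds):
    """One task instance: rebuild exactly the partition for ds and nothing else."""
    rows = [(order_id, amount, ds) for order_id, amount in SOURCE[ds]]
    with conn:
        conn.execute("DELETE FROM orders WHERE order_date = ?", (ds,))
        conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", rows)

def snapshot(conn):
    return conn.execute("SELECT * FROM orders ORDER BY order_id").fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, amount REAL, order_date TEXT)")

for ds in sorted(SOURCE):                # catchup: one run per day, oldest first
    simulate_task_run(conn, ds)
after_backfill = snapshot(conn)

simulate_task_run(conn, "2025-01-02")    # clear and rerun a single day
for ds in sorted(SOURCE, reverse=True):  # or re-backfill everything, newest first
    simulate_task_run(conn, ds)
after_reruns = snapshot(conn)
print(after_backfill == after_reruns)  # True
```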

Testing Idempotency

import pandas as pd

def test_pipeline_idempotency(test_engine):
    """Test that running pipeline twice produces same result."""
    # test_engine: a fixture for a scratch database where the orders table already exists

    test_data = pd.DataFrame({
        'order_id': [1, 2, 3],
        'amount': [100, 200, 150],
        'order_date': ['2025-01-01', '2025-01-01', '2025-01-01']
    })

    # Run pipeline first time
    load_orders_delete_first(test_data, test_engine, '2025-01-01')
    result1 = pd.read_sql('SELECT * FROM orders ORDER BY order_id', test_engine)

    # Run pipeline second time (same data)
    load_orders_delete_first(test_data, test_engine, '2025-01-01')
    result2 = pd.read_sql('SELECT * FROM orders ORDER BY order_id', test_engine)

    # Results should be identical
    assert len(result1) == len(result2) == 3
    assert result1.equals(result2)
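The pytest test above needs pandas and a database fixture. The check itself is generic; here is a minimal helper sketch (the names `assert_idempotent`, `run`, and `snapshot` are hypothetical) that runs any pipeline step several times and compares snapshots, exercised on an in-memory SQLite table with one idempotent loader and one append-only loader.

```python
import sqlite3
from typing import Callable, List, Tuple

def assert_idempotent(run: Callable[[], None],
                      snapshot: Callable[[], List[Tuple]],
                      times: int = 3) -> List[Tuple]:
    """Run a pipeline step several times; fail if the observable state changes."""
    run()
    expected = snapshot()
    for attempt in range(2, times + 1):
        run()
        actual = snapshot()
        if actual != expected:
            raise AssertionError(f"run {attempt} changed the table: {expected!r} -> {actual!r}")
    return expected

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, amount REAL, order_date TEXT)")
data = [(1, 100.0, "2025-01-01"), (2, 200.0, "2025-01-01"), (3, 150.0, "2025-01-01")]

def delete_then_insert():
    with conn:
        conn.execute("DELETE FROM orders WHERE order_date = '2025-01-01'")
        conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", data)

def append_only():
    with conn:
        conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", data)

def read_orders():
    return conn.execute("SELECT * FROM orders ORDER BY order_id").fetchall()

state = assert_idempotent(delete_then_insert, read_orders)  # passes: 3 rows every time
```

Swapping in `append_only` makes the helper raise on the second run.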

When Idempotency Is Hard

Incrementing counters:

-- NOT idempotent
UPDATE metrics SET view_count = view_count + 1 WHERE id = 123;

Solution: Store raw events with a unique ID and derive counts from them, instead of incrementing a stored counter.
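A minimal sketch of the event-based alternative with stdlib sqlite3 (table and helper names are hypothetical, and each event is assumed to carry a unique, stable `event_id` from upstream). `INSERT OR IGNORE` is SQLite syntax; PostgreSQL's equivalent is `ON CONFLICT DO NOTHING`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Raw events, one row per view; event_id is assumed unique and stable upstream
conn.execute("CREATE TABLE view_events (event_id TEXT PRIMARY KEY, page_id INTEGER)")

def load_events(conn, events):
    # Replaying a batch is a no-op: rows whose event_id already exists are skipped
    with conn:
        conn.executemany("INSERT OR IGNORE INTO view_events VALUES (?, ?)", events)

def view_count(conn, page_id):
    # Derived from the events every time, never incremented in place
    return conn.execute(
        "SELECT COUNT(*) FROM view_events WHERE page_id = ?", (page_id,)
    ).fetchone()[0]

batch = [("evt-1", 123), ("evt-2", 123), ("evt-3", 456)]
load_events(conn, batch)
load_events(conn, batch)      # retry after a failure: same batch again
print(view_count(conn, 123))  # 2
```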

Timestamps:

# NOT idempotent
df['loaded_at'] = datetime.now()

Solution: Use the run's logical date from the orchestrator (Airflow's ds), not the current time.
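A small sketch of the fix, assuming the orchestrator hands you a `YYYY-MM-DD` logical date string like Airflow's `ds` (the `stamp_rows` helper and the `batch_date` column are hypothetical). Two runs with the same `ds` produce identical rows, whenever they execute.

```python
from datetime import datetime

def stamp_rows(rows, ds):
    """Attach the run's logical date instead of datetime.now()."""
    # ds: the orchestrator's logical date string, e.g. Airflow's "YYYY-MM-DD"
    batch_date = datetime.strptime(ds, "%Y-%m-%d").date()
    return [{**row, "batch_date": batch_date} for row in rows]

rows = [{"order_id": 1, "amount": 100}, {"order_id": 2, "amount": 200}]
first = stamp_rows(rows, "2025-01-01")
rerun = stamp_rows(rows, "2025-01-01")  # same ds, whenever the rerun actually happens
print(first == rerun)  # True
```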

Random values:

# NOT idempotent
df['sample_group'] = df.apply(lambda x: random.choice(['A', 'B']), axis=1)

Solution: Use deterministic randomness (seed based on record ID).
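One way to get the same effect without a random generator is to hash the record ID (a swapped-in technique, not the only option; `sample_group` and the `salt` value are hypothetical). Seeding `random.Random(record_id)` per record also works, but a `hashlib` digest keeps the mapping independent of the `random` module's internals.

```python
import hashlib

def sample_group(record_id, groups=("A", "B"), salt="experiment-1"):
    """Assign a record to a group deterministically: same ID, same group, every run."""
    # hashlib digests are stable across processes and machines; Python's built-in
    # hash() of a str is randomized per process, so it would not be
    key = f"{salt}:{record_id}".encode("utf-8")
    bucket = int.from_bytes(hashlib.sha256(key).digest()[:8], "big")
    return groups[bucket % len(groups)]

assignments = [sample_group(order_id) for order_id in (1, 2, 3, 4, 5)]
```

With pandas this replaces the `apply` call: `df['sample_group'] = df['order_id'].map(sample_group)`. Changing the salt reshuffles every assignment, so treat it as part of the experiment's definition.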

Summary

Build idempotent pipelines:

  • Run multiple times safely
  • Enable reliable backfilling
  • Simplify error recovery

Patterns:

  • Replace entire table (simple, small data)
  • Delete then insert (common, fast)
  • Upsert (correct updates)
  • Staging table (production-grade)

Idempotency is not optional. It’s how production pipelines work.

Need help implementing this in your company?

For delivery-focused missions (Data Engineering, Architecture Data, Data Product Owner), visit ISData Consulting.