Python Project Structure for Data Pipelines

Why Structure Matters

Start with one Python script. Works fine.

Add more features. The script grows to 500 lines. Then 1000. Then you can’t find anything. Changing one thing breaks three others. New team members don’t know where to look.

Poor structure = unmaintainable code = wasted time.

Good structure = self-documenting = easy to maintain.

The Problem: Single Script Pipelines

pipeline.py  # 1200 lines of everything

Everything in one file:

  • Database connections
  • Extraction logic
  • Transformation functions
  • Loading functions
  • Configuration
  • Utility functions

This doesn’t scale.

Solution: Organized Project Structure

data-pipeline/
├── README.md
├── requirements.txt
├── setup.py
├── .env.example
├── .gitignore
├── config/
│   ├── __init__.py
│   └── settings.py
├── src/
│   ├── __init__.py
│   ├── extract.py
│   ├── transform.py
│   ├── load.py
│   └── utils.py
├── tests/
│   ├── __init__.py
│   ├── test_extract.py
│   ├── test_transform.py
│   └── test_load.py
└── scripts/
    ├── run_daily_pipeline.py
    └── backfill_date_range.py

Clear separation. Each file has one purpose.
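The tree lists a setup.py that the article never shows. A minimal sketch that makes src/ and config/ pip-installable might look like this (the package name and version are placeholders):

```python
# setup.py -- minimal sketch; name and version are placeholders
from setuptools import setup, find_packages

setup(
    name="data-pipeline",
    version="0.1.0",
    packages=find_packages(include=["src", "src.*", "config", "config.*"]),
    python_requires=">=3.9",
)
```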

config/settings.py: Configuration

"""
Pipeline configuration.
Load from environment variables.
"""

import os
from dataclasses import dataclass

@dataclass
class DatabaseConfig:
    host: str
    port: int
    database: str
    user: str
    password: str

    @property
    def connection_string(self) -> str:
        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"


@dataclass
class PipelineConfig:
    source_db: DatabaseConfig
    target_db: DatabaseConfig
    batch_size: int
    max_retries: int


def load_config() -> PipelineConfig:
    """Load configuration from environment variables."""

    source_db = DatabaseConfig(
        host=os.getenv('SOURCE_DB_HOST', 'localhost'),
        port=int(os.getenv('SOURCE_DB_PORT', '5432')),
        database=os.getenv('SOURCE_DB_NAME', 'production'),
        user=os.getenv('SOURCE_DB_USER', 'user'),
        password=os.getenv('SOURCE_DB_PASSWORD', '')
    )

    target_db = DatabaseConfig(
        host=os.getenv('TARGET_DB_HOST', 'localhost'),
        port=int(os.getenv('TARGET_DB_PORT', '5432')),
        database=os.getenv('TARGET_DB_NAME', 'warehouse'),
        user=os.getenv('TARGET_DB_USER', 'user'),
        password=os.getenv('TARGET_DB_PASSWORD', '')
    )

    return PipelineConfig(
        source_db=source_db,
        target_db=target_db,
        batch_size=int(os.getenv('BATCH_SIZE', '10000')),
        max_retries=int(os.getenv('MAX_RETRIES', '3'))
    )

Configuration separate from logic. Easy to change. No hardcoded values.
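A quick way to see the two pieces working together, using only the stdlib (the dataclass is repeated so the snippet runs standalone; db.internal is a placeholder host):

```python
import os
from dataclasses import dataclass

@dataclass
class DatabaseConfig:
    host: str
    port: int
    database: str
    user: str
    password: str

    @property
    def connection_string(self) -> str:
        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"

# Simulate what a .env file or `export SOURCE_DB_HOST=...` would provide:
os.environ["SOURCE_DB_HOST"] = "db.internal"

cfg = DatabaseConfig(
    host=os.getenv("SOURCE_DB_HOST", "localhost"),
    port=int(os.getenv("SOURCE_DB_PORT", "5432")),  # default kicks in
    database=os.getenv("SOURCE_DB_NAME", "production"),
    user=os.getenv("SOURCE_DB_USER", "user"),
    password=os.getenv("SOURCE_DB_PASSWORD", ""),
)
print(cfg.connection_string)  # postgresql://user:@db.internal:5432/production
```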

src/extract.py: Extraction Logic

"""
Data extraction from source systems.
"""

import pandas as pd
from sqlalchemy import create_engine, text
from datetime import date
import logging

logger = logging.getLogger(__name__)


def extract_orders(connection_string: str, process_date: date) -> pd.DataFrame:
    """
    Extract orders for a specific date.

    Args:
        connection_string: Database connection string
        process_date: Date to extract

    Returns:
        DataFrame with orders
    """
    logger.info(f"Extracting orders for {process_date}")

    engine = create_engine(connection_string)

    query = text("""
        SELECT
            order_id,
            customer_id,
            product_id,
            amount,
            order_date,
            status
        FROM orders
        WHERE DATE(order_date) = :process_date
    """)

    df = pd.read_sql(query, engine, params={'process_date': process_date})

    logger.info(f"Extracted {len(df)} orders")

    return df
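The parameterized-query pattern can be exercised without Postgres. Here it runs against an in-memory SQLite table (stdlib sqlite3 standing in for the SQLAlchemy engine above; the rows are made-up sample data):

```python
import sqlite3
from datetime import date

import pandas as pd

# Build a tiny stand-in orders table in memory
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, order_date TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2025-01-15"), (2, "2025-01-16")],
)

# Same named-parameter style as extract_orders
query = "SELECT order_id, order_date FROM orders WHERE DATE(order_date) = :process_date"
df = pd.read_sql(query, conn, params={"process_date": date(2025, 1, 15).isoformat()})
print(len(df))  # 1
```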

src/transform.py: Transformation Logic

"""
Data transformations.
"""

import pandas as pd
import logging

logger = logging.getLogger(__name__)


def clean_orders(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean orders data.

    - Remove duplicates
    - Drop nulls in critical fields
    - Standardize status values
    """
    logger.info(f"Cleaning {len(df)} orders")

    # Remove duplicates
    original_count = len(df)
    df = df.drop_duplicates(subset=['order_id'])
    duplicates_removed = original_count - len(df)
    if duplicates_removed > 0:
        logger.warning(f"Removed {duplicates_removed} duplicate orders")

    # Drop nulls
    df = df.dropna(subset=['order_id', 'customer_id', 'amount'])

    # Standardize status
    status_map = {
        'complete': 'completed',
        'done': 'completed',
        'cancelled': 'canceled'
    }
    df['status'] = df['status'].str.lower().replace(status_map)

    logger.info(f"Cleaned data: {len(df)} orders remaining")

    return df


def calculate_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate derived metrics.
    """
    df = df.copy()

    # Flag high-value orders
    df['is_high_value'] = df['amount'] > 1000

    # Extract date parts (month as a string, so it loads cleanly into SQL)
    order_ts = pd.to_datetime(df['order_date'])
    df['order_month'] = order_ts.dt.to_period('M').astype(str)
    df['order_year'] = order_ts.dt.year

    return df
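A self-contained sanity check of both transforms above (the cleaning and metric logic is inlined so the snippet runs on its own; the tiny frame is made-up sample data):

```python
import pandas as pd

raw = pd.DataFrame({
    "order_id": [1, 1, 2, 3],
    "customer_id": [10, 10, 11, None],
    "amount": [50.0, 50.0, 1800.0, 20.0],
    "status": ["Complete", "done", "CANCELLED", "completed"],
    "order_date": ["2025-01-15"] * 4,
})

# clean_orders logic, inlined
df = raw.drop_duplicates(subset=["order_id"])
df = df.dropna(subset=["order_id", "customer_id", "amount"]).copy()
status_map = {"complete": "completed", "done": "completed", "cancelled": "canceled"}
df["status"] = df["status"].str.lower().replace(status_map)

# calculate_metrics logic, inlined (month as string so it loads cleanly into SQL)
df["is_high_value"] = df["amount"] > 1000
order_ts = pd.to_datetime(df["order_date"])
df["order_month"] = order_ts.dt.to_period("M").astype(str)
df["order_year"] = order_ts.dt.year

print(sorted(df["status"].tolist()))  # ['canceled', 'completed']
print(df["is_high_value"].tolist())   # [False, True]
```

The duplicate order_id 1 and the row with a null customer_id are dropped, leaving two rows.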

src/load.py: Loading Logic

"""
Data loading to target systems.
"""

import pandas as pd
from sqlalchemy import create_engine, text
from datetime import date
import logging

logger = logging.getLogger(__name__)


def load_orders(df: pd.DataFrame, connection_string: str, process_date: date):
    """
    Load orders to warehouse (idempotent).

    Args:
        df: DataFrame to load
        connection_string: Target database connection
        process_date: Date partition to load
    """
    logger.info(f"Loading {len(df)} orders for {process_date}")

    engine = create_engine(connection_string)

    with engine.begin() as conn:
        # Delete existing rows for this date, then insert the new batch in the
        # same transaction: if the insert fails, the delete rolls back too,
        # so a failed run never leaves a half-loaded day
        conn.execute(
            text("DELETE FROM warehouse.orders WHERE DATE(order_date) = :date"),
            {'date': process_date}
        )
        logger.info(f"Deleted existing orders for {process_date}")

        df.to_sql(
            'orders',
            conn,
            schema='warehouse',
            if_exists='append',
            index=False
        )

    logger.info(f"Successfully loaded {len(df)} orders")

scripts/run_daily_pipeline.py: Orchestration

"""
Run daily orders pipeline.
"""

import sys
import logging
from datetime import datetime, timedelta, date
from pathlib import Path

# Add the project root to the path so src/ and config/ are importable
sys.path.insert(0, str(Path(__file__).parent.parent))

from src.extract import extract_orders
from src.transform import clean_orders, calculate_metrics
from src.load import load_orders
from config.settings import load_config

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def run_pipeline(process_date: date):
    """
    Run the full pipeline for a specific date.
    """
    logger.info(f"Starting pipeline for {process_date}")

    # Load config
    config = load_config()

    try:
        # Extract
        df = extract_orders(
            config.source_db.connection_string,
            process_date
        )

        # Transform
        df = clean_orders(df)
        df = calculate_metrics(df)

        # Load
        load_orders(
            df,
            config.target_db.connection_string,
            process_date
        )

        logger.info(f"Pipeline completed successfully for {process_date}")

    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        raise


if __name__ == '__main__':
    # Default: yesterday
    process_date = datetime.now().date() - timedelta(days=1)

    # Or from command line: python run_daily_pipeline.py 2025-01-15
    if len(sys.argv) > 1:
        process_date = datetime.strptime(sys.argv[1], '%Y-%m-%d').date()

    run_pipeline(process_date)
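PipelineConfig carries max_retries, but nothing consumes it yet. One way to wire it in, sketched with the stdlib only (retry_call and flaky are illustrative names, not part of the project above):

```python
import logging
import time

logger = logging.getLogger(__name__)

def retry_call(func, max_retries: int = 3, delay_seconds: float = 1.0):
    """Call func(), retrying up to max_retries attempts on any exception."""
    for attempt in range(1, max_retries + 1):
        try:
            return func()
        except Exception as e:
            logger.warning("Attempt %d/%d failed: %s", attempt, max_retries, e)
            if attempt == max_retries:
                raise
            time.sleep(delay_seconds)

# Demo: a function that fails twice, then succeeds on the third attempt
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"

print(retry_call(flaky, max_retries=3, delay_seconds=0))  # ok
```

In run_pipeline, the extract step could then become `retry_call(lambda: extract_orders(...), config.max_retries)`.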

scripts/backfill_date_range.py: Backfilling

"""
Backfill pipeline for a date range.
"""

import sys
from datetime import datetime, timedelta, date
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent))

from scripts.run_daily_pipeline import run_pipeline


def backfill(start_date: date, end_date: date):
    """Backfill pipeline for date range."""

    current_date = start_date

    while current_date <= end_date:
        print(f"Processing {current_date}")

        try:
            run_pipeline(current_date)
        except Exception as e:
            print(f"Failed for {current_date}: {e}")
            # Continue with next date

        current_date += timedelta(days=1)


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print("Usage: python backfill_date_range.py START_DATE END_DATE")
        print("Example: python backfill_date_range.py 2025-01-01 2025-01-31")
        sys.exit(1)

    start = datetime.strptime(sys.argv[1], '%Y-%m-%d').date()
    end = datetime.strptime(sys.argv[2], '%Y-%m-%d').date()

    backfill(start, end)

requirements.txt

pandas==2.1.0
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
python-dotenv==1.0.0
pytest==7.4.3

.env.example

# Source database
SOURCE_DB_HOST=localhost
SOURCE_DB_PORT=5432
SOURCE_DB_NAME=production
SOURCE_DB_USER=user
SOURCE_DB_PASSWORD=password

# Target database
TARGET_DB_HOST=localhost
TARGET_DB_PORT=5432
TARGET_DB_NAME=warehouse
TARGET_DB_USER=user
TARGET_DB_PASSWORD=password

# Pipeline settings
BATCH_SIZE=10000
MAX_RETRIES=3

README.md

# Orders Data Pipeline

Daily pipeline to extract, transform, and load orders data.

## Setup

```bash
# Install dependencies
pip install -r requirements.txt

# Configure environment
cp .env.example .env
# Edit .env with your database credentials

```

## Usage

```bash
# Run for yesterday
python scripts/run_daily_pipeline.py

# Run for specific date
python scripts/run_daily_pipeline.py 2025-01-15

# Backfill date range
python scripts/backfill_date_range.py 2025-01-01 2025-01-31
```

## Testing

```bash
pytest tests/
```

## Why This Structure Works

**Separation of concerns**: Extract, transform, load are independent.

**Testable**: Each module can be tested separately.

**Reusable**: transform.py can be used by multiple pipelines.

**Clear entry points**: scripts/ folder shows how to run things.

**Configuration separate**: Change databases without touching code.

**Scalable**: Add new pipelines without mess.
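To make the "testable" claim concrete, a tests/test_transform.py might start like this (sketch: the status-mapping step is inlined as standardize_status so the snippet runs standalone; the real test would `from src.transform import clean_orders`):

```python
import pandas as pd

# Inlined stand-in for the status-mapping step of src/transform.clean_orders
def standardize_status(statuses: pd.Series) -> pd.Series:
    status_map = {"complete": "completed", "done": "completed", "cancelled": "canceled"}
    return statuses.str.lower().replace(status_map)

def test_status_values_are_standardized():
    raw = pd.Series(["Complete", "done", "CANCELLED"])
    assert standardize_status(raw).tolist() == ["completed", "completed", "canceled"]

test_status_values_are_standardized()  # pytest would discover and run this automatically
```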

## What to Avoid

**Don't:** Mix configuration and logic.
**Don't:** Put everything in `utils.py` (vague name = dumping ground).
**Don't:** Create deep nested folders (flat is better).
**Don't:** Name files `helper.py` or `common.py` (be specific).

## Summary

Structure your Python pipelines:
- config/ for settings
- src/ for pipeline logic
- tests/ for tests
- scripts/ for entry points

Organized code = maintainable code = productive team.

Need help implementing this in your company?

For delivery-focused missions (Data Engineering, Architecture Data, Data Product Owner), visit ISData Consulting.