Apache Airflow: Orchestrate Your Data Pipelines

What is Apache Airflow?

Airflow schedules and monitors your data pipelines.

You tell it: “Run this pipeline daily at 2 AM. If it fails, retry 3 times. Alert me if it still fails.”

Airflow handles everything else.

The Problem It Solves

Manual execution fails. You forget to run things. You run them twice. You can’t see what’s happening.

Airflow automates and visualizes your entire workflow.

Key Concepts

DAG (Directed Acyclic Graph): Your workflow definition. It’s code that describes what runs, in what order, and when. “Acyclic” means the dependencies never loop back on themselves.

Task: A single step in your DAG. It could run a SQL query, call an API, move files, or send an email.

Operator: A template for a kind of task. BashOperator runs bash commands. PythonOperator runs Python functions. PostgresOperator runs SQL against Postgres.

Schedule: When your DAG runs. Daily. Hourly. Every Monday at 9 AM. Whatever you need.
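To make the DAG and Task definitions concrete, here is a minimal plain-Python sketch. It is not Airflow code: it uses the standard library's graphlib module (Python 3.9+), and the task names are illustrative. It shows the two properties a scheduler relies on: dependencies give a valid run order, and a cycle makes ordering impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# Task names are illustrative, not taken from a real pipeline.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

def run_order(deps):
    """Return one valid execution order for the given dependency map."""
    return list(TopologicalSorter(deps).static_order())

def has_cycle(deps):
    """Return True if the dependencies contain a cycle (so it is not a DAG)."""
    try:
        run_order(deps)
    except CycleError:
        return True
    return False

order = run_order(dependencies)
print(order)

# Making extract depend on load closes a loop, so no valid order exists.
cyclic = {**dependencies, "extract": {"load"}}
print(has_cycle(cyclic))
```

Airflow does the equivalent check when it parses a DAG file, which is why a workflow with circular dependencies is rejected before anything runs.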

Simple DAG Example

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract():
    print("Extracting data...")

def transform():
    print("Transforming data...")

def load():
    print("Loading data...")

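with DAG(
    dag_id="simple_etl",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # needs Airflow 2.4+; older versions use schedule_interval
    catchup=False,  # don't backfill the runs between start_date and today
) as dag: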

    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load
    )

    extract_task >> transform_task >> load_task

This DAG runs once a day. The >> operator sets the order: extract first, then transform, then load.

The Airflow UI

Airflow provides a web interface. You can:

  • See all your DAGs
  • Check run history
  • View logs for each task
  • Manually trigger runs
  • See task dependencies visually

This visibility is valuable. You know exactly what ran, when it ran, and whether it succeeded.

When to Use Airflow

Good for:

  • Scheduled batch pipelines
  • Multi-step workflows
  • Complex dependencies
  • Pipelines that need monitoring and alerting

Not ideal for:

  • Real-time streaming (Airflow runs batches; use a streaming platform such as Kafka instead)
  • Simple one-off scripts
  • Very small projects

Installation

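# These commands target Airflow 2.x. The Airflow docs recommend installing
# with a version-pinned constraints file for reproducible dependencies.
pip install apache-airflow

# Initialize the metadata database
# (on Airflow 2.7+, use `airflow db migrate`; `db init` is deprecated)
airflow db init

# Create an admin user (admin/admin is for local testing only)
airflow users create \
    --username admin \
    --password admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

# Start the webserver (UI at http://localhost:8080)
airflow webserver --port 8080

# Start the scheduler (in another terminal)
airflow scheduler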

Common Patterns

Branching: Run different tasks based on conditions.
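In Airflow, branching is usually done with BranchPythonOperator, whose callable returns the task_id of the branch to follow. The sketch below shows only that callable, so it runs without Airflow installed. The task IDs (full_refresh, incremental_load) and the first-of-the-month rule are hypothetical.

```python
from datetime import date

def choose_branch(ds: str) -> str:
    """Return the task_id to run next, based on the run date (YYYY-MM-DD).

    Hypothetical rule: rebuild everything on the first day of the month,
    otherwise load only the new day's data.
    """
    run_date = date.fromisoformat(ds)
    if run_date.day == 1:
        return "full_refresh"
    return "incremental_load"

print(choose_branch("2024-01-01"))
print(choose_branch("2024-01-15"))
```

In a DAG, this function would be passed as python_callable to a BranchPythonOperator, with the date supplied through op_kwargs={"ds": "{{ ds }}"}. Tasks on the branch that was not chosen are skipped.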

Sensors: Wait for something to happen (file arrives, API responds).
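A sensor repeats a cheap check (a "poke") at a fixed interval until it succeeds or a timeout expires. The plain-Python sketch below imitates that loop for a file check. It is not Airflow's implementation; the parameter names poke_interval and timeout mirror the arguments Airflow sensors accept, and the file names are made up.

```python
import os
import tempfile
import time

def wait_for_file(path, poke_interval=1.0, timeout=10.0):
    """Poll until `path` exists; return True if found, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(path):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)

# Demo: one file that already exists, and one that never appears.
with tempfile.TemporaryDirectory() as tmp:
    present = os.path.join(tmp, "sales.csv")
    open(present, "w").close()
    found = wait_for_file(present, poke_interval=0.01, timeout=0.1)
    missing = wait_for_file(
        os.path.join(tmp, "absent.csv"), poke_interval=0.01, timeout=0.05
    )

print(found, missing)
```

One difference to keep in mind: an Airflow sensor that times out fails its task instead of returning False, so downstream tasks do not run on missing input.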

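XCom: Pass small pieces of data between tasks, such as a row count or a file path.

Pools: Limit how many tasks run at the same time against a shared resource, such as a database.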

Variables: Store configuration values.

Real-World DAG

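from airflow import DAG
from airflow.operators.python import PythonOperator
# Needs the apache-airflow-providers-postgres package. Newer provider releases
# deprecate PostgresOperator in favor of SQLExecuteQueryOperator.
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta

def calculate_daily_metrics(**context):
    # Placeholder so the DAG parses; replace with the real metric logic.
    # context["ds"] is the run's logical date as YYYY-MM-DD.
    print(f"Calculating metrics for {context['ds']}")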

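# Applied to every task in the DAG unless a task overrides them.
default_args = {
    "owner": "data-team",
    "retries": 3,  # up to 3 retries after the first failed attempt
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,  # needs SMTP configured in Airflow
    "email": ["alerts@company.com"],
}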

with DAG(
    dag_id="sales_report",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule="0 6 * * *",  # Every day at 6 AM
    catchup=False
) as dag:

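    extract_sales = PostgresOperator(
        task_id="extract_sales",
        postgres_conn_id="sales_db",  # credentials live in the Airflow connection
        # {{ ds }} is rendered as the run's logical date (YYYY-MM-DD)
        sql="SELECT * FROM sales WHERE date = '{{ ds }}'",
    )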

    calculate_metrics = PythonOperator(
        task_id="calculate_metrics",
        python_callable=calculate_daily_metrics
    )

    send_report = EmailOperator(
        task_id="send_report",
        to="manager@company.com",
        subject="Daily Sales Report {{ ds }}",
        # No file is attached here; pass files=[...] to attach the report.
        html_content="See attached report.",
    )

    extract_sales >> calculate_metrics >> send_report

This DAG extracts sales data, calculates metrics, and emails a report every day at 6 AM (UTC by default). A failed task is retried up to 3 times, 5 minutes apart. If it still fails, Airflow emails the alert address.
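The retry settings in default_args can be easy to misread: retries=3 means up to 4 attempts in total, and the failure alert fires only after the last one. The sketch below simulates that behavior in plain Python. It is an illustration, not Airflow's scheduler code, and run_with_retries and on_failure are hypothetical names.

```python
import time

def run_with_retries(task, retries=3, retry_delay=0.0, on_failure=None):
    """Call task(); on an exception, retry up to `retries` more times.

    Returns the task's result, or calls on_failure (the alert hook) once
    and re-raises the last error when every attempt has failed.
    """
    attempts = retries + 1  # the first try plus the retries
    for attempt in range(1, attempts + 1):
        try:
            return task()
        except Exception as exc:
            if attempt == attempts:
                if on_failure is not None:
                    on_failure(exc)
                raise
            time.sleep(retry_delay)

calls = []
alerts = []

def always_fails():
    calls.append(1)
    raise RuntimeError("database unreachable")

try:
    run_with_retries(always_fails, retries=3, on_failure=alerts.append)
except RuntimeError:
    pass

# Number of attempts made, and number of alerts sent.
print(len(calls), len(alerts))
```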

Best Practices

Keep DAGs simple: One DAG, one purpose. Don’t build mega-DAGs.

Idempotent tasks: Running a task twice for the same date should leave the same result, so retries and reruns are safe.
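One common way to get idempotency in a load step is delete-then-insert for the run's date inside a single transaction. The sketch below uses an in-memory SQLite database so it runs anywhere. The daily_sales table and the load_daily_sales function are hypothetical.

```python
import sqlite3

def load_daily_sales(conn, ds, rows):
    """Replace all rows for date `ds` so reruns do not create duplicates."""
    with conn:  # one transaction: the delete and insert commit together
        conn.execute("DELETE FROM daily_sales WHERE date = ?", (ds,))
        conn.executemany(
            "INSERT INTO daily_sales (date, amount) VALUES (?, ?)",
            [(ds, amount) for amount in rows],
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_sales (date TEXT, amount REAL)")

# Running the same load twice, as a retry would, leaves one copy of the data.
load_daily_sales(conn, "2024-01-01", [10.0, 20.0])
load_daily_sales(conn, "2024-01-01", [10.0, 20.0])

row_count = conn.execute("SELECT COUNT(*) FROM daily_sales").fetchone()[0]
print(row_count)
```

A plain INSERT without the DELETE would double the rows on every retry, which is exactly the failure mode this practice avoids.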

Use connections: Don’t hardcode credentials in DAGs. Store them in Airflow connections and reference them by connection ID.
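Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID>, written as a URI. The plain-Python sketch below parses such a variable to show that the secret lives in the environment, not in the DAG file. The connection_from_env helper, the host, and the credentials are made up for the demo; in a real DAG you only pass the connection ID and Airflow resolves it.

```python
import os
from urllib.parse import urlsplit

def connection_from_env(conn_id):
    """Read a connection URI from AIRFLOW_CONN_<CONN_ID> and split it up."""
    uri = os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"]
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": parts.username,
        "password": parts.password,
        "schema": parts.path.lstrip("/"),
    }

# Normally set by the deployment environment, never committed to the repo.
# The values here are invented for the demo.
os.environ["AIRFLOW_CONN_SALES_DB"] = (
    "postgres://report_user:s3cret@db.internal:5432/sales"
)

conn = connection_from_env("sales_db")
print(conn["host"], conn["port"], conn["schema"])
```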

Test locally: Check that each DAG file parses and imports cleanly before deploying.
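The cheapest local check is confirming that each DAG file is valid Python. The sketch below does that with the standard library's py_compile, so it runs without Airflow. The dag_file_compiles helper and the file names are hypothetical.

```python
import os
import py_compile
import tempfile

def dag_file_compiles(path):
    """Return (True, "") if the file is valid Python, else (False, error)."""
    try:
        py_compile.compile(path, doraise=True)
    except py_compile.PyCompileError as exc:
        return False, exc.msg
    return True, ""

# Demo with two throwaway files: one valid, one with a syntax error.
with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, "good_dag.py")
    bad = os.path.join(tmp, "bad_dag.py")
    with open(good, "w") as f:
        f.write("x = 1\n")
    with open(bad, "w") as f:
        f.write("with DAG(\n")  # unclosed parenthesis
    good_ok, _ = dag_file_compiles(good)
    bad_ok, bad_error = dag_file_compiles(bad)

print(good_ok, bad_ok)
```

This catches syntax errors only. Missing imports or dependency cycles show up when Airflow loads the file, so with Airflow installed a stronger test is to load the DAG folder with DagBag and assert that its import_errors is empty.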

Monitor: Check the Airflow UI regularly. Set up alerts.

Summary

Airflow orchestrates data pipelines. You define workflows as code. Airflow schedules, executes, and monitors them.

It’s complex to set up. But for production data pipelines, it’s essential.

Start simple. One DAG. Few tasks. Expand as needed.
