MinIO et Airflow : construire un data lake local

Pourquoi un data lake local

Travailler avec S3 en production, c’est standard. Mais développer directement sur AWS coûte cher et ralentit les itérations. MinIO résout ça : un stockage objet S3-compatible qui tourne en local.

Combiné à Airflow, on obtient un environnement de développement complet :

  • Stockage objet (landing, staging, curated)
  • Orchestration des pipelines
  • Tests reproductibles sans accès cloud

L’architecture

[Sources]
    ↓
[Airflow DAGs]
    ↓
[MinIO buckets]
    ├── landing/      (données brutes)
    ├── staging/      (données nettoyées)
    └── curated/      (données prêtes à consommer)

Airflow orchestre les DAGs qui lisent et écrivent dans MinIO via le protocole S3. Le code est identique à celui qu’on déploiera en production sur AWS.

Mise en place avec Docker Compose

Un fichier docker-compose.yml suffit pour faire tourner les deux. La commande standalone d’Airflow initialise la base, démarre scheduler et webserver en un seul processus :

services:
  minio:
    image: minio/minio:RELEASE.2024-11-07T00-52-20Z
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123

  airflow:
    image: apache/airflow:2.10.5
    command: standalone
    depends_on:
      - minio
    environment:
      AIRFLOW__CORE__EXECUTOR: SequentialExecutor
      AWS_ACCESS_KEY_ID: minioadmin
      AWS_SECRET_ACCESS_KEY: minioadmin123
      AWS_ENDPOINT_URL: http://minio:9000
    ports:
      - "8080:8080"
    volumes:
      - ./dags:/opt/airflow/dags

airflow standalone migre la base, crée un utilisateur admin (mot de passe affiché dans les logs au premier démarrage) et lance scheduler + webserver. Figez les tags sur des versions validées — évitez latest en CI/production. Pour la prod, préférez un Compose multi-services avec PostgreSQL et scheduler séparé.

Connexion Airflow → MinIO

Dans Airflow, créer une connexion S3 :

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

hook = S3Hook(aws_conn_id="minio_conn")
hook.load_string(
    string_data="test",
    key="landing/test.txt",
    bucket_name="data-lake"
)

Ou via l’UI Airflow :

  • Conn Type : Amazon Web Services
  • Extra : {"endpoint_url": "http://minio:9000"}

Un DAG concret

Pipeline classique : ingestion → nettoyage → agrégation.

import io
from datetime import datetime

import boto3
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

BUCKET = "data-lake"

def _get_s3():
    """Client initialisé à l'intérieur de la fonction, pas au niveau module."""
    return boto3.client("s3")  # lit AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_ENDPOINT_URL


def ingest(**context):
    execution_date = context["ds"]  # format YYYY-MM-DD
    s3 = _get_s3()
    raw_csv = "order_id,customer_id,amount\n1,101,150.00\n2,102,200.00"
    s3.put_object(
        Bucket=BUCKET,
        Key=f"landing/orders/{execution_date}.csv",
        Body=raw_csv.encode(),
    )


def clean(**context):
    execution_date = context["ds"]
    s3 = _get_s3()
    obj = s3.get_object(Bucket=BUCKET, Key=f"landing/orders/{execution_date}.csv")
    df = pd.read_csv(io.BytesIO(obj["Body"].read()))
    if df.empty:
        raise ValueError(f"Fichier vide ou corrompu pour {execution_date}")
    expected_cols = {"order_id", "customer_id", "amount"}
    if not expected_cols.issubset(df.columns):
        raise ValueError(f"Colonnes manquantes: {expected_cols - set(df.columns)}")
    df = df.dropna().drop_duplicates(subset=["order_id"])
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False)
    s3.put_object(
        Bucket=BUCKET,
        Key=f"staging/orders/{execution_date}.parquet",
        Body=buffer.getvalue(),
    )


with DAG(
    "orders_pipeline",
    start_date=datetime(2024, 1, 15),
    schedule="@daily",
    catchup=False,
):
    ingest_task = PythonOperator(task_id="ingest", python_callable=ingest)
    clean_task = PythonOperator(task_id="clean", python_callable=clean)
    ingest_task >> clean_task

Organisation des buckets

La convention de nommage des clés structure le lake :

data-lake/
├── landing/
│   └── orders/
│       └── 2024-01-15.csv
├── staging/
│   └── orders/
│       └── 2024-01-15.parquet
└── curated/
    └── orders_daily/
        └── 2024-01-15.parquet

landing/ : données brutes, jamais modifiées. staging/ : données nettoyées et typées. curated/ : données agrégées, prêtes pour les consommateurs.

Passer en production

L’intérêt de cette architecture : le passage en production est trivial.

  1. Remplacer l’endpoint MinIO par S3
  2. Mettre les credentials dans un secret manager
  3. Déployer les mêmes DAGs

Le code ne change pas. Seule la configuration change.

Les limites

  • MinIO en local : pas de haute disponibilité (c’est du dev)
  • Airflow LocalExecutor : suffisant pour le dev, pas pour la prod
  • Les credentials en dur dans le Compose ne passent jamais en prod

En résumé

MinIO + Airflow en local, c’est un data lake de développement complet. Même protocole que la prod, mêmes outils, mêmes patterns. Développez en local, déployez sur le cloud.