MinIO et Airflow : construire un data lake local
Pourquoi un data lake local
Travailler avec S3 en production, c’est standard. Mais développer directement sur AWS coûte cher et ralentit les itérations. MinIO résout ça : un stockage objet S3-compatible qui tourne en local.
Combiné à Airflow, on obtient un environnement de développement complet :
- Stockage objet (landing, staging, curated)
- Orchestration des pipelines
- Tests reproductibles sans accès cloud
L’architecture
[Sources]
↓
[Airflow DAGs]
↓
[MinIO buckets]
├── landing/ (données brutes)
├── staging/ (données nettoyées)
└── curated/ (données prêtes à consommer)
Airflow orchestre les DAGs qui lisent et écrivent dans MinIO via le protocole S3. Le code est identique à celui qu’on déploiera en production sur AWS.
Mise en place avec Docker Compose
Un fichier docker-compose.yml suffit pour faire tourner les deux. La commande standalone d’Airflow initialise la base, démarre scheduler et webserver en un seul processus :
services:
minio:
image: minio/minio:RELEASE.2024-11-07T00-52-20Z
command: server /data --console-address ":9001"
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin123
airflow:
image: apache/airflow:2.10.5
command: standalone
depends_on:
- minio
environment:
AIRFLOW__CORE__EXECUTOR: SequentialExecutor
AWS_ACCESS_KEY_ID: minioadmin
AWS_SECRET_ACCESS_KEY: minioadmin123
AWS_ENDPOINT_URL: http://minio:9000
ports:
- "8080:8080"
volumes:
- ./dags:/opt/airflow/dags
airflow standalone migre la base, crée un utilisateur admin (mot de passe affiché dans les logs au premier démarrage) et lance scheduler + webserver. Figez les tags sur des versions validées — évitez latest en CI/production. Pour la prod, préférez un Compose multi-services avec PostgreSQL et scheduler séparé.
Connexion Airflow → MinIO
Dans Airflow, créer une connexion S3 :
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
hook = S3Hook(aws_conn_id="minio_conn")
hook.load_string(
string_data="test",
key="landing/test.txt",
bucket_name="data-lake"
)
Ou via l’UI Airflow :
- Conn Type : Amazon Web Services
- Extra :
{"endpoint_url": "http://minio:9000"}
Un DAG concret
Pipeline classique : ingestion → nettoyage → agrégation.
import io
from datetime import datetime
import boto3
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
BUCKET = "data-lake"
def _get_s3():
"""Client initialisé à l'intérieur de la fonction, pas au niveau module."""
return boto3.client("s3") # lit AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_ENDPOINT_URL
def ingest(**context):
execution_date = context["ds"] # format YYYY-MM-DD
s3 = _get_s3()
raw_csv = "order_id,customer_id,amount\n1,101,150.00\n2,102,200.00"
s3.put_object(
Bucket=BUCKET,
Key=f"landing/orders/{execution_date}.csv",
Body=raw_csv.encode(),
)
def clean(**context):
execution_date = context["ds"]
s3 = _get_s3()
obj = s3.get_object(Bucket=BUCKET, Key=f"landing/orders/{execution_date}.csv")
df = pd.read_csv(io.BytesIO(obj["Body"].read()))
if df.empty:
raise ValueError(f"Fichier vide ou corrompu pour {execution_date}")
expected_cols = {"order_id", "customer_id", "amount"}
if not expected_cols.issubset(df.columns):
raise ValueError(f"Colonnes manquantes: {expected_cols - set(df.columns)}")
df = df.dropna().drop_duplicates(subset=["order_id"])
buffer = io.BytesIO()
df.to_parquet(buffer, index=False)
s3.put_object(
Bucket=BUCKET,
Key=f"staging/orders/{execution_date}.parquet",
Body=buffer.getvalue(),
)
with DAG(
"orders_pipeline",
start_date=datetime(2024, 1, 15),
schedule="@daily",
catchup=False,
):
ingest_task = PythonOperator(task_id="ingest", python_callable=ingest)
clean_task = PythonOperator(task_id="clean", python_callable=clean)
ingest_task >> clean_task
Organisation des buckets
La convention de nommage des clés structure le lake :
data-lake/
├── landing/
│ └── orders/
│ └── 2024-01-15.csv
├── staging/
│ └── orders/
│ └── 2024-01-15.parquet
└── curated/
└── orders_daily/
└── 2024-01-15.parquet
landing/ : données brutes, jamais modifiées. staging/ : données nettoyées et typées. curated/ : données agrégées, prêtes pour les consommateurs.
Passer en production
L’intérêt de cette architecture : le passage en production est trivial.
- Remplacer l’endpoint MinIO par S3
- Mettre les credentials dans un secret manager
- Déployer les mêmes DAGs
Le code ne change pas. Seule la configuration change.
Les limites
- MinIO en local : pas de haute disponibilité (c’est du dev)
- Airflow LocalExecutor : suffisant pour le dev, pas pour la prod
- Les credentials en dur dans le Compose ne passent jamais en prod
En résumé
MinIO + Airflow en local, c’est un data lake de développement complet. Même protocole que la prod, mêmes outils, mêmes patterns. Développez en local, déployez sur le cloud.