Monitoring Data Pipelines in Production
Why Monitoring Matters
Your pipeline runs daily. Sometimes it succeeds. Sometimes it fails silently. Sometimes it succeeds but loads bad data.
Without monitoring, you find out when users complain. Too late.
Monitoring answers:
- Did the pipeline run?
- Did it succeed?
- How long did it take?
- Is the data correct?
What to Monitor
Execution monitoring:
- Did the pipeline start?
- Did it complete?
- How long did it take?
Data monitoring:
- How many records were processed?
- Are values in expected ranges?
- Are there anomalies?
Quality monitoring:
- Are there nulls where there shouldn’t be?
- Are there duplicates?
- Does the data match the source?
Basic Logging
Start with structured logging:
import json
import logging
from datetime import datetime, timezone
from typing import Optional
class JSONFormatter(logging.Formatter):
    """Format log records as single-line JSON objects for easy parsing.

    Every record yields ``timestamp``, ``level``, ``message``, ``module`` and
    ``function``. The optional attributes listed in ``EXTRA_FIELDS`` are added
    when the caller attached them through ``extra={...}``, and ``exception``
    is added when the record carries exception information.
    """

    # Attributes callers may attach via ``extra=``; copied only when present.
    EXTRA_FIELDS = ('pipeline', 'execution_date', 'step', 'rows_processed')

    def format(self, record):
        """Return *record* serialized as a JSON string."""
        # Use the record's own creation time rather than "now", so the
        # timestamp marks when the event was logged, and make it explicitly
        # UTC (timezone-aware) so consumers do not have to guess the zone.
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_data = {
            'timestamp': created.isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
        }

        # Add extra fields
        for field in self.EXTRA_FIELDS:
            if hasattr(record, field):
                log_data[field] = getattr(record, field)

        # Overriding format() bypasses the base class's traceback handling,
        # so include the formatted exception explicitly.
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)

        return json.dumps(log_data)
# Setup: module-level logger that emits INFO and above as JSON on a stream handler
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
# Usage
def extract_orders(date):
    """Fetch the orders for *date*, logging before and after the fetch.

    Both log records carry the pipeline name, the execution date and the step
    as structured ``extra`` fields; the completion record additionally
    carries the number of rows fetched.
    """
    log_context = {
        'pipeline': 'daily_orders',
        'execution_date': str(date),
        'step': 'extract',
    }
    logger.info("Extracting orders", extra=log_context)

    orders = fetch_orders(date)

    logger.info(
        "Extraction complete",
        extra={**log_context, 'rows_processed': len(orders)},
    )
    return orders
Structured logs = easy to parse = easy to alert on.
Metrics Collection
Track key metrics:
from dataclasses import dataclass
from datetime import datetime, timedelta
import json
@dataclass
class PipelineMetrics:
    """Metrics for a single pipeline run.

    Instances are created when a run starts (``end_time`` is then ``None``)
    and filled in as the run progresses. ``to_json`` and ``save`` unwrap
    numpy-style scalars (objects exposing ``.item()``) to builtins, because
    ``json.dumps`` rejects e.g. ``numpy.int64`` and database drivers may not
    accept such values either.
    """

    pipeline_name: str
    execution_date: str
    start_time: datetime
    end_time: Optional[datetime]  # None while the run is still in progress
    status: str  # running, success, failed, partial

    # Data metrics
    rows_extracted: int
    rows_transformed: int
    rows_loaded: int
    rows_failed: int

    # Performance metrics
    extract_duration_sec: float
    transform_duration_sec: float
    load_duration_sec: float

    # Quality metrics
    null_count: int
    duplicate_count: int

    # Columns written by save(), in INSERT order. Deliberately unannotated so
    # the dataclass decorator does not treat it as a field.
    _DB_COLUMNS = (
        'pipeline_name', 'execution_date', 'start_time', 'end_time', 'status',
        'rows_extracted', 'rows_transformed', 'rows_loaded', 'rows_failed',
        'extract_duration_sec', 'transform_duration_sec', 'load_duration_sec',
        'null_count', 'duplicate_count',
    )

    @staticmethod
    def _plain(value):
        """Return ``value.item()`` for numpy-style scalars, else *value* unchanged."""
        unwrap = getattr(value, 'item', None)
        return unwrap() if callable(unwrap) else value

    @property
    def total_duration_sec(self) -> Optional[float]:
        """Seconds between start and end, or None while the run is unfinished."""
        if self.end_time is None:
            return None
        return (self.end_time - self.start_time).total_seconds()

    @property
    def success_rate(self) -> float:
        """Percentage of extracted rows that were loaded (0.0 if none extracted)."""
        total = self.rows_extracted
        if total == 0:
            return 0.0
        return (self.rows_loaded / total) * 100

    def to_json(self) -> str:
        """Serialize metrics to JSON.

        ``end_time`` and ``total_duration_sec`` are emitted as ``null`` while
        the run has no end time yet.
        """
        end_time = self.end_time.isoformat() if self.end_time is not None else None
        payload = {
            'pipeline_name': self.pipeline_name,
            'execution_date': self.execution_date,
            'start_time': self.start_time.isoformat(),
            'end_time': end_time,
            'status': self.status,
            'rows_extracted': self.rows_extracted,
            'rows_transformed': self.rows_transformed,
            'rows_loaded': self.rows_loaded,
            'rows_failed': self.rows_failed,
            'extract_duration_sec': self.extract_duration_sec,
            'transform_duration_sec': self.transform_duration_sec,
            'load_duration_sec': self.load_duration_sec,
            'total_duration_sec': self.total_duration_sec,
            'success_rate': self.success_rate,
            'null_count': self.null_count,
            'duplicate_count': self.duplicate_count,
        }
        return json.dumps({key: self._plain(value) for key, value in payload.items()})

    def save(self, engine):
        """Insert this run's metrics into the ``pipeline_metrics`` table.

        The insert runs inside ``engine.begin()``.
        NOTE(review): ``text`` is not imported in the visible source —
        presumably ``sqlalchemy.text``; confirm it is in scope.
        """
        query = """
            INSERT INTO pipeline_metrics (
                pipeline_name, execution_date, start_time, end_time, status,
                rows_extracted, rows_transformed, rows_loaded, rows_failed,
                extract_duration_sec, transform_duration_sec, load_duration_sec,
                null_count, duplicate_count
            ) VALUES (
                :pipeline_name, :execution_date, :start_time, :end_time, :status,
                :rows_extracted, :rows_transformed, :rows_loaded, :rows_failed,
                :extract_duration_sec, :transform_duration_sec, :load_duration_sec,
                :null_count, :duplicate_count
            )
        """
        # One bind parameter per inserted column, unwrapped to builtin types.
        params = {
            column: self._plain(getattr(self, column))
            for column in self._DB_COLUMNS
        }
        with engine.begin() as conn:
            conn.execute(text(query), params)
Pipeline with Monitoring
import time
from datetime import datetime
def run_pipeline_with_monitoring(process_date, engine):
    """Run the daily_orders pipeline with full monitoring.

    Runs extract, transform, quality checks and load in order, timing each
    stage and recording row counts on a ``PipelineMetrics`` object. Whether
    the run succeeds or fails, the metrics are logged as JSON and saved
    through *engine*.

    Args:
        process_date: Date to process; passed to the extract and load steps.
        engine: Database engine handed to ``load_orders`` and ``metrics.save``.

    Returns:
        The populated ``PipelineMetrics`` for a successful run.

    Raises:
        Exception: Any error from a pipeline stage is re-raised after the
            run has been marked ``failed`` and its metrics recorded.
    """
    metrics = PipelineMetrics(
        pipeline_name='daily_orders',
        execution_date=str(process_date),
        start_time=datetime.now(),
        end_time=None,
        status='running',
        rows_extracted=0,
        rows_transformed=0,
        rows_loaded=0,
        rows_failed=0,
        extract_duration_sec=0,
        transform_duration_sec=0,
        load_duration_sec=0,
        null_count=0,
        duplicate_count=0
    )
    try:
        # Extract (perf_counter is monotonic, so stage durations are not
        # distorted by system clock adjustments)
        extract_start = time.perf_counter()
        df = extract_orders(process_date)
        metrics.extract_duration_sec = time.perf_counter() - extract_start
        metrics.rows_extracted = len(df)

        # Transform
        transform_start = time.perf_counter()
        df_transformed = transform_orders(df)
        metrics.transform_duration_sec = time.perf_counter() - transform_start
        metrics.rows_transformed = len(df_transformed)

        # Quality checks. The frame is presumably a pandas DataFrame, whose
        # reductions return numpy integers; json.dumps cannot serialize
        # those, so store plain ints.
        metrics.null_count = int(df_transformed.isnull().sum().sum())
        metrics.duplicate_count = int(df_transformed.duplicated().sum())

        # Load
        load_start = time.perf_counter()
        load_orders(df_transformed, engine, process_date)
        metrics.load_duration_sec = time.perf_counter() - load_start
        metrics.rows_loaded = len(df_transformed)

        # Success
        metrics.status = 'success'
    except Exception as e:
        metrics.status = 'failed'
        # exc_info=True keeps the traceback attached to the log record.
        logger.error(f"Pipeline failed: {e}", exc_info=True)
        raise
    finally:
        metrics.end_time = datetime.now()
        # Log before saving so the metrics still reach the logs when the
        # database write itself fails.
        logger.info(metrics.to_json())
        # Always save metrics
        metrics.save(engine)
    return metrics
Anomaly Detection
Detect when metrics are unusual:
def _z_score(value, mean, stddev):
    """Return the signed z-score of *value*, or None when it cannot be computed."""
    if value is None or mean is None or stddev is None:
        return None
    # float() also normalizes Decimal values a database driver may return,
    # which cannot be mixed with float in arithmetic.
    spread = float(stddev)
    if spread <= 0:
        return None
    return (float(value) - float(mean)) / spread


def check_for_anomalies(metrics: PipelineMetrics, engine, z_threshold: float = 3.0):
    """Alert when the run's row count or duration deviates from recent history.

    Compares ``metrics.rows_loaded`` and ``metrics.total_duration_sec`` with
    the mean and standard deviation of successful runs of the same pipeline
    over the last 30 days. A value more than *z_threshold* standard
    deviations from the mean is logged as a warning and sent via
    ``send_alert``.

    Args:
        metrics: Metrics of the run to evaluate.
        engine: Database engine used to read ``pipeline_metrics``.
        z_threshold: Number of standard deviations that counts as anomalous.

    Returns:
        None. A check is skipped silently when history is insufficient.
    """
    # Get historical metrics (last 30 days)
    # NOTE(review): this reads a total_duration_sec column that
    # PipelineMetrics.save() does not insert — confirm the table provides it
    # (e.g. as a generated column).
    query = """
        SELECT
            AVG(rows_loaded) as avg_rows,
            STDDEV(rows_loaded) as stddev_rows,
            AVG(total_duration_sec) as avg_duration,
            STDDEV(total_duration_sec) as stddev_duration
        FROM pipeline_metrics
        WHERE pipeline_name = :pipeline_name
        AND status = 'success'
        AND start_time > NOW() - INTERVAL '30 days'
    """
    with engine.connect() as conn:
        result = conn.execute(
            text(query),
            {'pipeline_name': metrics.pipeline_name}
        ).fetchone()

    if result is None:
        return  # Not enough history

    # An aggregate query without GROUP BY yields a row even when nothing
    # matches; its aggregates are then NULL (None), which _z_score treats as
    # "not enough history" instead of failing on a None comparison.
    avg_rows, stddev_rows, avg_duration, stddev_duration = result

    # Check row count
    row_z = _z_score(metrics.rows_loaded, avg_rows, stddev_rows)
    if row_z is not None and abs(row_z) > z_threshold:
        logger.warning(
            f"Anomaly detected: row count {metrics.rows_loaded} "
            f"is unusual (avg: {avg_rows:.0f}, stddev: {stddev_rows:.0f})"
        )
        send_alert(
            f"Pipeline {metrics.pipeline_name}: Unusual row count",
            f"Loaded {metrics.rows_loaded} rows (expected ~{avg_rows:.0f})"
        )

    # Check duration
    duration = metrics.total_duration_sec
    duration_z = _z_score(duration, avg_duration, stddev_duration)
    if duration_z is not None and abs(duration_z) > z_threshold:
        # The check is two-sided, so name the direction rather than always
        # reporting the run as slow.
        direction = "slow" if duration_z > 0 else "fast"
        logger.warning(
            f"Anomaly detected: duration {duration:.1f}s "
            f"is unusual (avg: {avg_duration:.1f}s)"
        )
        send_alert(
            f"Pipeline {metrics.pipeline_name}: Unusually {direction}",
            f"Took {duration:.1f}s (expected ~{avg_duration:.1f}s)"
        )
Dashboard Queries
Track pipeline health:
-- Success rate per pipeline over the last 30 days
SELECT
    pipeline_name,
    COUNT(*) AS total_runs,
    COUNT(*) FILTER (WHERE status = 'success') AS successful_runs,
    (CAST(COUNT(*) FILTER (WHERE status = 'success') AS FLOAT) / COUNT(*)) * 100 AS success_rate
FROM pipeline_metrics
WHERE start_time > NOW() - INTERVAL '30 days'
GROUP BY pipeline_name;
-- Daily average duration of successful runs per pipeline (last 30 days)
SELECT
    start_time::date AS date,
    pipeline_name,
    AVG(extract_duration_sec + transform_duration_sec + load_duration_sec) AS avg_duration_sec
FROM pipeline_metrics
WHERE start_time > NOW() - INTERVAL '30 days'
  AND status = 'success'
GROUP BY 1, 2
ORDER BY date DESC;
-- Daily average rows loaded by successful runs per pipeline (last 30 days)
SELECT
    start_time::date AS date,
    pipeline_name,
    AVG(rows_loaded) AS avg_rows_loaded
FROM pipeline_metrics
WHERE start_time > NOW() - INTERVAL '30 days'
  AND status = 'success'
GROUP BY 1, 2
ORDER BY date DESC;
Airflow Integration
Airflow has built-in monitoring:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.email import send_email
from datetime import datetime, timedelta
def on_failure_callback(context):
    """Send an HTML failure alert email for the failed task.

    Args:
        context: Task context mapping supplied by Airflow. Reads
            ``task_instance`` and the run's logical date.
    """
    task_instance = context['task_instance']
    # 'execution_date' is the deprecated name of 'logical_date'; prefer the
    # new key and fall back for Airflow versions that only provide the old one.
    run_date = context.get('logical_date')
    if run_date is None:
        run_date = context.get('execution_date')

    send_email(
        to=['data-team@company.com'],
        subject=f"Pipeline FAILED: {task_instance.task_id}",
        html_content=f"""
        <h3>Pipeline Failed</h3>
        <p><strong>DAG:</strong> {task_instance.dag_id}</p>
        <p><strong>Task:</strong> {task_instance.task_id}</p>
        <p><strong>Execution Date:</strong> {run_date}</p>
        <p><strong>Log:</strong> <a href="{task_instance.log_url}">View Log</a></p>
        """
    )
# Arguments shared by every task in the DAG: up to 3 retries spaced
# 5 minutes apart, plus the custom failure callback defined in this module.
default_args = dict(
    owner='data-team',
    retries=3,
    retry_delay=timedelta(minutes=5),
    email_on_failure=True,
    email_on_retry=False,
    on_failure_callback=on_failure_callback,
)
with DAG(
    'monitored_pipeline',
    default_args=default_args,
    schedule='@daily',
    # A scheduled DAG needs a start_date on Airflow 2.x. This value is a
    # placeholder: set it to the pipeline's real first run date.
    start_date=datetime(2024, 1, 1),
    # Do not backfill every day between start_date and today.
    catchup=False,
) as dag:
    # NOTE(review): run_pipeline_with_monitoring takes (process_date, engine),
    # but no arguments are supplied here — pass them via op_args/op_kwargs
    # (or wrap the call) once the engine source is known.
    task = PythonOperator(
        task_id='run_pipeline',
        python_callable=run_pipeline_with_monitoring,
    )
Summary
Monitor pipelines:
- Execution: Did it run? Did it succeed?
- Performance: How long? Any slowdowns?
- Data: How many records? Any anomalies?
- Quality: Nulls? Duplicates?
Collect metrics. Store them. Alert on anomalies.
Monitored pipelines = reliable pipelines.
Need help implementing this in your company?
For delivery-focused missions (Data Engineering, Architecture Data, Data Product Owner), visit ISData Consulting.