# Error Handling in Data Pipelines

## Why Pipelines Fail

Pipelines fail constantly:
- API rate limits
- Database timeouts
- Network issues
- Out of memory
- Malformed data
- Schema changes
Failures are normal. Bad error handling is not.
## The Problem: Silent Failures

```python
def extract_data(api_url):
    try:
        response = requests.get(api_url)
        return response.json()
    except:
        return []  # Silent failure
```
This returns empty data when the API fails. Nobody notices. Dashboards show zeros. Reports are wrong. Trust erodes.
## Principle 1: Fail Loudly

Instead of returning empty data, log the failure and raise a specific exception:

```python
import requests
import logging

logger = logging.getLogger(__name__)

class DataExtractionError(Exception):
    """Raised when data extraction fails."""
    pass

def extract_data(api_url: str) -> dict:
    """Extract data from the API."""
    try:
        response = requests.get(api_url, timeout=30)
        response.raise_for_status()  # Raise an error for 4xx/5xx responses
        return response.json()
    except requests.Timeout:
        logger.error(f"API timeout: {api_url}")
        raise DataExtractionError(f"API timeout after 30s: {api_url}")
    except requests.HTTPError as e:
        logger.error(f"API HTTP error: {e.response.status_code}")
        raise DataExtractionError(f"API returned {e.response.status_code}: {api_url}")
    except requests.RequestException as e:
        logger.error(f"API request failed: {e}")
        raise DataExtractionError(f"Failed to connect to API: {api_url}")
    except ValueError:  # requests raises a ValueError subclass on invalid JSON
        logger.error(f"Invalid JSON response from {api_url}")
        raise DataExtractionError(f"API returned invalid JSON: {api_url}")
```
When something fails, the pipeline stops. Loudly. You fix it before bad data propagates.
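A scheduler entry point does not need to catch anything: an uncaught exception exits non-zero, so the orchestrator marks the run as failed instead of shipping empty data. A minimal sketch (the URL is a placeholder):

```python
# Minimal entry point: let DataExtractionError escape.
# A traceback plus a non-zero exit code beats a dashboard full of zeros.
if __name__ == "__main__":
    data = extract_data("https://api.example.com/v1/orders")  # placeholder URL
    print("Extraction succeeded")
```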
## Principle 2: Retry Transient Failures

Some errors are temporary:

- Network blip
- Database connection timeout
- API rate limit (temporary)

Retry these. Don't retry permanent failures (bad credentials, 404).
```python
import time
from functools import wraps

def retry(max_attempts=3, delay=5, backoff=2, exceptions=(Exception,)):
    """
    Retry decorator with exponential backoff.

    Args:
        max_attempts: Maximum number of attempts (including the first call)
        delay: Initial delay in seconds
        backoff: Backoff multiplier
        exceptions: Exception types to catch and retry
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        logger.error(f"{func.__name__} failed after {max_attempts} attempts")
                        raise
                    logger.warning(
                        f"{func.__name__} failed (attempt {attempt}/{max_attempts}): {e}. "
                        f"Retrying in {current_delay}s..."
                    )
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator

# Usage
@retry(max_attempts=3, delay=5, exceptions=(requests.Timeout, requests.ConnectionError))
def extract_data(api_url: str) -> dict:
    response = requests.get(api_url, timeout=30)
    response.raise_for_status()
    return response.json()
```
Transient failures: retried automatically. Permanent failures: fail immediately.
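A quick sanity check on that claim (a hedged sketch reusing the decorated `extract_data` above; the URL is a placeholder): only the exception types passed to `exceptions` are retried, everything else propagates on the first call.

```python
try:
    data = extract_data("https://api.example.com/v1/orders")  # placeholder URL
except requests.HTTPError as e:
    # e.g. a 401 from bad credentials: not in `exceptions`, so no retries happened
    print(f"Permanent failure, no retries attempted: {e}")
except requests.Timeout as e:
    # timeouts were retried 3 times with backoff before this was re-raised
    print(f"Still failing after retries: {e}")
```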
## Principle 3: Categorize Failures

Not all errors are equal:

**Transient: retry**
- Network timeout
- Database connection reset
- API rate limit

**Permanent: fail immediately**
- Invalid credentials
- 404 Not Found
- Schema mismatch

**Partial: continue with partial data**
- Some records are malformed
- Optional enrichment fails
```python
class TransientError(Exception):
    """Temporary error, should retry."""
    pass

class PermanentError(Exception):
    """Permanent error, don't retry."""
    pass

class PartialFailure(Exception):
    """Some data failed, but processing can proceed."""
    def __init__(self, message, successful_count, failed_count):
        super().__init__(message)
        self.successful_count = successful_count
        self.failed_count = failed_count

def process_records(records):
    """Process records, handling partial failures."""
    successful = []
    failed = []
    for record in records:
        try:
            processed = transform_record(record)
            successful.append(processed)
        except ValueError as e:
            logger.warning(f"Failed to process record {record.get('id')}: {e}")
            failed.append(record)
    if failed:
        logger.warning(f"Processed {len(successful)}/{len(records)} records successfully")
        # If more than 10% fail, something is systematically wrong
        failure_rate = len(failed) / len(records)
        if failure_rate > 0.1:
            raise PartialFailure(
                f"High failure rate: {failure_rate:.1%}",
                len(successful),
                len(failed),
            )
    return successful
```
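A caller can then decide what a partial failure means for its use case (a hedged sketch; `load_records` is an illustrative name, not defined in this article):

```python
def run_transform_step(records):
    try:
        clean = process_records(records)
    except PartialFailure as e:
        # Too many bad records: stop and investigate rather than load a skewed dataset
        logger.error(f"Aborting load: {e.failed_count} failed, {e.successful_count} succeeded")
        raise
    load_records(clean)  # illustrative loader, not defined above
```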
## Principle 4: Graceful Degradation

When optional enrichments fail, continue with core data:

```python
def enrich_customer_data(customers_df):
    """Enrich customer data with external sources."""
    df = customers_df.copy()

    # Core data: must succeed
    df = calculate_ltv(df)

    # Optional enrichment: allowed to fail
    try:
        credit_scores = fetch_credit_scores(df['customer_id'])
        df = df.merge(credit_scores, on='customer_id', how='left')
        logger.info("Successfully enriched with credit scores")
    except Exception as e:
        logger.warning(f"Credit score enrichment failed: {e}. Continuing without it.")
        # Pipeline continues, just without credit scores

    return df
```
Core features fail loudly. Optional features degrade gracefully.
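If several enrichments follow this pattern, the try/except can be factored into a small helper (a sketch; `optional_step` is a name introduced here for illustration, not part of the code above):

```python
def optional_step(df, step_fn, name):
    """Run an optional enrichment; on failure, log and return df unchanged."""
    try:
        return step_fn(df)
    except Exception as e:
        logger.warning(f"Optional step '{name}' failed: {e}. Continuing without it.")
        return df
```

Core steps stay outside the helper, so they keep failing loudly.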
## Principle 5: Alert on Failures

```python
import smtplib
from email.message import EmailMessage

def send_alert(subject: str, body: str, to: list[str]):
    """Send an alert email."""
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = 'pipeline-alerts@company.com'
    msg['To'] = ', '.join(to)
    msg.set_content(body)
    with smtplib.SMTP('smtp.company.com') as smtp:
        smtp.send_message(msg)

def run_pipeline_with_alerts(process_date):
    """Run pipeline with alerting."""
    try:
        run_pipeline(process_date)
    except DataExtractionError as e:
        send_alert(
            subject="Pipeline FAILED: Data extraction error",
            body=f"Pipeline failed to extract data for {process_date}.\n\nError: {e}",
            to=['data-team@company.com'],
        )
        raise
    except Exception as e:
        send_alert(
            subject="Pipeline FAILED: Unexpected error",
            body=f"Pipeline failed for {process_date}.\n\nError: {e}",
            to=['data-team@company.com', 'oncall@company.com'],
        )
        raise
```
Failures trigger alerts. Teams respond immediately.
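Email is only one channel; many teams also push alerts to chat. A minimal sketch using a Slack incoming webhook (the webhook URL is a placeholder you would load from configuration):

```python
import requests

def send_slack_alert(text: str, webhook_url: str):
    """Post an alert to a Slack channel via an incoming webhook."""
    # Slack incoming webhooks accept a JSON payload with a "text" field
    response = requests.post(webhook_url, json={"text": text}, timeout=10)
    response.raise_for_status()

# Usage (placeholder URL):
# send_slack_alert("Pipeline FAILED for 2024-01-15", "https://hooks.slack.com/services/...")
```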
## Real Example: Robust API Extraction

```python
import time
import logging
from typing import Optional

import requests

logger = logging.getLogger(__name__)

class APIClient:
    """Robust API client with error handling."""

    def __init__(self, base_url: str, api_key: str, max_retries: int = 3):
        self.base_url = base_url
        self.api_key = api_key
        self.max_retries = max_retries

    def get(self, endpoint: str, params: Optional[dict] = None) -> dict:
        """GET request with retry logic and error handling."""
        url = f"{self.base_url}/{endpoint}"
        headers = {'Authorization': f'Bearer {self.api_key}'}

        for attempt in range(1, self.max_retries + 1):
            try:
                response = requests.get(url, headers=headers, params=params, timeout=30)

                # Handle rate limiting
                if response.status_code == 429:
                    retry_after = int(response.headers.get('Retry-After', 60))
                    logger.warning(f"Rate limited. Retrying after {retry_after}s")
                    time.sleep(retry_after)
                    continue

                # Check for errors
                response.raise_for_status()
                return response.json()

            except requests.Timeout:
                if attempt == self.max_retries:
                    logger.error(f"API timeout after {self.max_retries} attempts")
                    raise TransientError(f"API timeout: {url}")
                logger.warning(f"Timeout (attempt {attempt}/{self.max_retries}), retrying...")
                time.sleep(5 * attempt)

            except requests.HTTPError as e:
                # Don't retry 4xx errors (client errors)
                if 400 <= e.response.status_code < 500:
                    logger.error(f"Client error: {e.response.status_code}")
                    raise PermanentError(f"API error {e.response.status_code}: {url}")
                # Retry 5xx errors (server errors)
                if attempt == self.max_retries:
                    raise TransientError(f"API error after retries: {url}")
                logger.warning(f"Server error (attempt {attempt}/{self.max_retries}), retrying...")
                time.sleep(5 * attempt)

            except requests.RequestException as e:
                if attempt == self.max_retries:
                    raise TransientError(f"Network error: {url}")
                logger.warning(f"Network error (attempt {attempt}/{self.max_retries}): {e}, retrying...")
                time.sleep(5 * attempt)

        raise TransientError(f"Failed after {self.max_retries} attempts: {url}")
```
## Testing Error Handling

```python
import pytest
import requests
from unittest.mock import patch, Mock

# APIClient and PermanentError come from the pipeline module above;
# adjust the import path to your project layout, e.g.:
# from pipeline.api_client import APIClient, PermanentError

def test_retry_on_timeout():
    """Test that transient errors are retried."""
    # Patch time.sleep so the test doesn't wait out the real backoff delays
    with patch('requests.get') as mock_get, patch('time.sleep'):
        # Fail twice, succeed on the third attempt
        mock_get.side_effect = [
            requests.Timeout(),
            requests.Timeout(),
            Mock(status_code=200, json=lambda: {'data': 'success'}),
        ]
        client = APIClient('https://api.example.com', 'key')
        result = client.get('endpoint')
        assert result == {'data': 'success'}
        assert mock_get.call_count == 3

def test_no_retry_on_404():
    """Test that permanent errors are not retried."""
    with patch('requests.get') as mock_get:
        response = Mock()
        response.status_code = 404
        response.raise_for_status.side_effect = requests.HTTPError(response=response)
        mock_get.return_value = response
        client = APIClient('https://api.example.com', 'key')
        with pytest.raises(PermanentError):
            client.get('endpoint')
        assert mock_get.call_count == 1  # No retry
```
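The rate-limit branch is worth a test of its own (a sketch building on the mocks above; it exercises the 429 handling in `APIClient.get`):

```python
def test_rate_limit_respects_retry_after():
    """429 responses wait for Retry-After, then retry."""
    with patch('requests.get') as mock_get, patch('time.sleep') as mock_sleep:
        rate_limited = Mock(status_code=429, headers={'Retry-After': '2'})
        success = Mock(status_code=200, json=lambda: {'data': 'ok'})
        mock_get.side_effect = [rate_limited, success]
        client = APIClient('https://api.example.com', 'key')
        result = client.get('endpoint')
        assert result == {'data': 'ok'}
        mock_sleep.assert_called_once_with(2)  # waited exactly Retry-After seconds
```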
## Summary

Handle errors properly:

- **Fail loudly**: don't return empty data silently
- **Retry transient errors**: network issues, timeouts
- **Fail fast on permanent errors**: bad credentials, 404
- **Degrade gracefully**: core steps must succeed, optional steps may fail
- **Alert immediately**: teams respond fast

Resilient pipelines = production-ready pipelines.
Need help implementing this in your company?

For delivery-focused engagements (Data Engineering, Data Architecture, Data Product Owner), visit ISData Consulting.