Retry Logic Tutorial
This tutorial covers implementing automatic retry logic for failed pipeline steps.
1. Introduction
Retry logic is essential for handling transient failures like network timeouts, temporary service unavailability, or temporary resource constraints.
2. Built-in Retry Support
2.1 Configuring Retries
wpipe has built-in support for retries:
from wpipe import Pipeline
pipeline = Pipeline(
verbose=True,
max_retries=3, # Number of retry attempts
retry_delay=1.0, # Seconds between retries
retry_on_exceptions=(Exception,) # Which exceptions to retry on
)
2.2 How It Works
When a step fails:
1. Pipeline catches the exception
2. Waits for retry_delay seconds
3. Attempts the step again
4. Repeats up to max_retries times
5. If all attempts fail, raises the final exception
3. Custom Retry Decorators
3.1 Basic Retry Decorator
Create a reusable retry decorator:
import time
from functools import wraps
def retry(max_attempts=3, delay=1.0, backoff=1.0):
"""Retry decorator with exponential backoff.
Args:
max_attempts: Maximum number of attempts
delay: Initial delay between retries (seconds)
backoff: Multiplier for delay after each retry
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
current_delay = delay
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_attempts - 1:
time.sleep(current_delay)
current_delay *= backoff
else:
raise
raise last_exception
return wrapper
return decorator
3.2 Using the Decorator
Apply the decorator to your step functions:
@retry(max_attempts=3, delay=1.0, backoff=2.0)
def unreliable_api_call(data):
"""Call that might fail occasionally."""
import random
if random.random() < 0.7: # 70% chance of failure
raise ConnectionError("Connection failed")
return {"result": "success"}
4. Advanced Retry Patterns
4.1 Retry with Custom Logic
def conditional_retry(max_attempts=3, delay=1.0):
"""Retry only on specific exceptions."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retryable_exceptions = (
ConnectionError,
TimeoutError,
ConnectionResetError,
)
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except retryable_exceptions as e:
if attempt < max_attempts - 1:
time.sleep(delay * (attempt + 1))
else:
raise
except Exception as e:
# Don't retry on other exceptions
raise
return wrapper
return decorator
4.2 Retry with Logging
import logging
logger = logging.getLogger(__name__)
def retry_with_logging(max_attempts=3, delay=1.0):
"""Retry with detailed logging."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
logger.warning(
f"Attempt {attempt + 1}/{max_attempts} failed: {e}"
)
if attempt < max_attempts - 1:
logger.info(f"Retrying in {delay} seconds...")
time.sleep(delay)
else:
logger.error(
f"All {max_attempts} attempts failed"
)
raise
return wrapper
return decorator
5. Step-Level Retries
5.1 Independent Retry Config
Configure retries per-step:
class RetryableStep:
"""Step with its own retry logic."""
def __init__(self, max_retries: int = 3):
self.max_retries = max_retries
def __call__(self, data: dict) -> dict:
last_error = None
for attempt in range(self.max_retries):
try:
return self._execute(data)
except Exception as e:
last_error = e
if attempt < self.max_retries - 1:
time.sleep(1 * (attempt + 1))
raise last_error
def _execute(self, data: dict) -> dict:
# Actual implementation
return {"result": "success"}
6. Handling Different Failure Types
6.1 Network Failures
import requests
@retry(max_attempts=3, delay=2.0)
def fetch_with_retry(url: str) -> dict:
"""Fetch data with retry on network failure."""
response = requests.get(url, timeout=5)
response.raise_for_status()
return response.json()
6.2 Database Failures
@retry(max_attempts=3, delay=1.0)
def save_to_database(data: dict) -> dict:
"""Save with retry on database errors."""
# Connection errors, deadlocks, etc. can be retried
connection = get_db_connection()
try:
connection.save(data)
return {"saved": True}
finally:
connection.close()
7. Timeout Handling
7.1 Combined Retry and Timeout
import signal
from functools import wraps
class TimeoutError(Exception):
pass
def timeout_handler(signum, frame):
raise TimeoutError("Operation timed out")
def retry_with_timeout(max_attempts=3, delay=1.0, timeout_seconds=5):
"""Retry with timeout for each attempt."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
# Set timeout
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout_seconds)
try:
result = func(*args, **kwargs)
signal.alarm(0) # Cancel alarm
return result
except TimeoutError:
if attempt < max_attempts - 1:
signal.alarm(0)
time.sleep(delay)
else:
raise
finally:
signal.alarm(0)
return wrapper
return decorator
8. Complete Example
Here’s a complete example with comprehensive retry logic:
import time
import logging
from functools import wraps
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def smart_retry(
max_attempts=3,
delay=1.0,
backoff=2.0,
retry_on=(Exception,)
):
"""Smart retry decorator with exponential backoff."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
current_delay = delay
for attempt in range(max_attempts):
try:
result = func(*args, **kwargs)
if attempt > 0:
logger.info(
f"{func.__name__} succeeded after {attempt + 1} attempts"
)
return result
except retry_on as e:
last_error = e
if attempt < max_attempts - 1:
logger.warning(
f"{func.__name__} attempt {attempt + 1} failed: {e}. "
f"Retrying in {current_delay}s..."
)
time.sleep(current_delay)
current_delay *= backoff
else:
logger.error(
f"{func.__name__} failed after {max_attempts} attempts"
)
raise last_error
except Exception as e:
# Don't retry on non-retryable exceptions
logger.error(f"Non-retryable error in {func.__name__}: {e}")
raise
raise last_error
return wrapper
return decorator
# Apply to step functions
@smart_retry(max_attempts=3, delay=1.0, backoff=2.0)
def fetch_from_api(data):
"""Fetch data from external API."""
import random
# Simulate occasional failures
if random.random() < 0.5:
raise ConnectionError("API temporarily unavailable")
return {"data": ["item1", "item2", "item3"]}
@smart_retry(max_attempts=2, delay=0.5)
def process_data(data):
"""Process the fetched data."""
if not data.get("data"):
raise ValueError("No data to process")
return {"processed": True, "count": len(data["data"])}
@smart_retry(max_attempts=3, delay=1.0, retry_on=(ConnectionError, TimeoutError))
def save_results(data):
"""Save results to database."""
import random
if random.random() < 0.3:
raise ConnectionError("Database connection failed")
return {"saved": True}
# Create pipeline
from wpipe import Pipeline
pipeline = Pipeline(verbose=True)
pipeline.set_steps([
(fetch_from_api, "Fetch from API", "v1.0"),
(process_data, "Process Data", "v1.0"),
(save_results, "Save Results", "v1.0"),
])
# Run pipeline
result = pipeline.run({})
print(f"Pipeline completed: {result}")
9. Best Practices
Retry only transient failures - Network, timeouts, not logic errors
Use exponential backoff - Don’t hammer the failing service
Set reasonable limits - Don’t retry forever
Log retry attempts - Helps with debugging
Distinguish recoverable vs non-recoverable - Don’t retry validation errors
Consider circuit breaker - Stop retrying if service is down
10. Next Steps
Nested Pipelines Tutorial - Combine pipelines together
Production Deployment Tutorial - Deploy to production
best_practices - More best practices