Error Handling Tutorial
This tutorial covers implementing robust error handling in your wpipe pipelines.
1. Introduction
Proper error handling is crucial for production pipelines. wpipe provides comprehensive error handling through custom exceptions and error codes.
2. Understanding Exceptions
2.1 TaskError
The primary exception in wpipe is TaskError:
from wpipe.exception import TaskError
# Run the pipeline; if a task fails, show its message and its error code.
try:
    result = pipeline.run(data)
except TaskError as err:
    # The exception's string form is the message; error_code holds the code.
    print(f"Error: {err}")
    print(f"Code: {err.error_code}")
2.2 Error Codes
wpipe defines standard error codes, including:
from wpipe.exception import Codes
# Print each standard code; the trailing comment is the value this tutorial
# lists for that constant.
print(Codes.TASK_FAILED) # 502
print(Codes.API_ERROR) # 501
print(Codes.UPDATE_PROCESS_ERROR) # 504
print(Codes.UPDATE_TASK) # 505
print(Codes.UPDATE_PROCESS_OK) # 503
3. Raising TaskError
3.1 Basic Usage
Raise TaskError from step functions:
from wpipe.exception import TaskError, Codes
def validate_input(data):
    """Validate that the input carries a plausible email address.

    Args:
        data: Mapping with the step input; must contain an "email" key.

    Returns:
        ``{"validated": True}`` when the email passes every check.

    Raises:
        TaskError: With ``Codes.VALIDATION_ERROR`` if "email" is missing,
            is not a string, or does not contain "@".
    """
    if "email" not in data:
        raise TaskError(
            "Email is required",
            Codes.VALIDATION_ERROR,
        )

    email = data["email"]
    # A non-string value (e.g. None) would make the "@" membership test raise
    # TypeError, which callers catching TaskError would not handle.
    if not isinstance(email, str) or "@" not in email:
        raise TaskError(
            "Invalid email format",
            Codes.VALIDATION_ERROR,
        )

    return {"validated": True}
3.2 With Step Information
Include the step name in error messages:
def process_data(data):
    """Run the processing step, tagging any failure with the step name.

    Args:
        data: Mapping with the step input.

    Returns:
        ``{"result": "success"}`` when processing completes.

    Raises:
        TaskError: With ``Codes.TASK_FAILED`` when the processing logic
            raises ValueError; the original error is chained as the cause.
    """
    # Prefixing the message with the step name tells the reader which step
    # failed without having to inspect the traceback.
    step_name = "process_data"
    try:
        # Processing logic
        return {"result": "success"}
    except ValueError as e:
        raise TaskError(
            f"[{step_name}] Processing failed: {e}",
            Codes.TASK_FAILED,
        ) from e
4. Catching Errors
4.1 Basic Catch
from wpipe import Pipeline
from wpipe.exception import TaskError
# Register the two steps defined earlier; each entry pairs a callable with
# two strings (they read as a display name and a version label).
pipeline = Pipeline(verbose=True)
pipeline.set_steps([
    (validate_input, "Validate", "v1.0"),
    (process_data, "Process", "v1.0"),
])

try:
    # "invalid" contains no "@", so validate_input rejects it.
    result = pipeline.run({"email": "invalid"})
except TaskError as e:
    # Plain literal: nothing is interpolated here, so no f-prefix is needed.
    print("Pipeline failed at step")
    print(f"Error: {e}")
    print(f"Code: {e.error_code}")
4.2 Specific Error Codes
Handle different error types differently:
from wpipe.exception import TaskError, Codes
try:
    result = pipeline.run(data)
except TaskError as err:
    # Look up the hint for the reported code; unknown codes fall through
    # to the generic message.
    hints = {
        Codes.VALIDATION_ERROR: "Validation failed - check input data",
        Codes.API_ERROR: "API error - check network/API",
        Codes.TASK_FAILED: "Task failed - check logs",
    }
    hint = hints.get(err.error_code)
    if hint is None:
        print(f"Unknown error: {err}")
    else:
        print(hint)
5. Error Recovery
5.1 Graceful Degradation
Handle errors and continue:
def safe_step(data):
    """Run the risky operation, returning a fallback record if it fails.

    Returns a dict with "status" set to "success" plus the "result", or
    "status" set to "fallback" plus the stringified "error".
    """
    try:
        outcome = risky_operation(data)
    except Exception as exc:
        # Swallow the failure on purpose so the pipeline can keep going.
        return {"status": "fallback", "error": str(exc)}
    return {"status": "success", "result": outcome}
5.2 Fallback Values
Provide default values on failure:
def fetch_with_fallback(data):
    """Return API data under the "data" key, using cached data on failure."""
    try:
        payload = fetch_from_api()
    except Exception:
        # Any failure of the live fetch switches to the cached copy.
        payload = get_cached_data()
    return {"data": payload}
6. Validation Patterns
6.1 Early Validation
Validate at the start of the pipeline:
def validate_pipeline_input(data):
    """Validate that all required fields exist and that "age" is sane.

    Args:
        data: Mapping with the pipeline input.

    Returns:
        The same ``data`` object, unchanged, when every check passes.

    Raises:
        TaskError: With ``Codes.VALIDATION_ERROR`` if a required field is
            missing, "age" is not an integer, or "age" is outside 0-150.
    """
    required = ["email", "name", "age"]
    missing = [f for f in required if f not in data]
    if missing:
        raise TaskError(
            f"Missing required fields: {missing}",
            Codes.VALIDATION_ERROR,
        )

    # Validate types
    age = data["age"]
    # bool is a subclass of int, so True/False would otherwise be accepted
    # as the ages 1 and 0.
    if isinstance(age, bool) or not isinstance(age, int):
        raise TaskError(
            "Age must be an integer",
            Codes.VALIDATION_ERROR,
        )
    if not 0 <= age <= 150:
        raise TaskError(
            "Age must be between 0 and 150",
            Codes.VALIDATION_ERROR,
        )
    return data
6.2 Chain Validation
Validate at each step:
class ValidatingStep:
    """Callable step that checks for required keys before processing."""

    def __init__(self, required_fields: list):
        # Keys that must be present in the data passed to __call__.
        self.required_fields = required_fields

    def __call__(self, data: dict) -> dict:
        """Raise TaskError if any required key is absent, else process."""
        absent = []
        for field_name in self.required_fields:
            if field_name not in data:
                absent.append(field_name)

        if absent:
            raise TaskError(f"Missing fields: {absent}", Codes.VALIDATION_ERROR)

        # Process
        return {"processed": True}
7. API Error Handling
7.1 Handle API Failures
def call_api_with_retry(data):
    """Post *data* to the API and turn any failure into a TaskError.

    Despite the name, this function makes a single attempt; it does not
    retry.

    Args:
        data: Payload passed to ``client.send_post``.

    Returns:
        ``{"api_result": result}`` with whatever ``send_post`` returned.

    Raises:
        TaskError: With ``Codes.API_ERROR`` if the call raises or returns
            ``None``.
    """
    from wpipe import APIClient

    client = APIClient(base_url="http://api.example.com", token="...")
    try:
        result = client.send_post("/process", data)
    except Exception as e:
        raise TaskError(f"API call failed: {e}", Codes.API_ERROR) from e

    # Checked outside the try block so this TaskError is not caught by the
    # handler above and re-wrapped as "API call failed: API returned no result".
    if result is None:
        raise TaskError("API returned no result", Codes.API_ERROR)
    return {"api_result": result}
7.2 Timeout Handling
import requests
from wpipe.exception import TaskError, Codes
def call_with_timeout(data, timeout=5):
    """POST *data* as JSON with a timeout and map failures to TaskError.

    Args:
        data: JSON-serializable payload.
        timeout: Value passed to ``requests.post`` as ``timeout``.

    Returns:
        ``{"response": <decoded JSON body>}``.

    Raises:
        TaskError: ``Codes.TIMEOUT_ERROR`` when the request times out;
            ``Codes.API_ERROR`` for any other request failure, an HTTP error
            status, or a body that is not valid JSON. The original
            exception is chained as the cause.
    """
    try:
        response = requests.post(
            "http://api.example.com/process",
            json=data,
            timeout=timeout,
        )
        # Surface HTTP 4xx/5xx as failures instead of decoding an error body.
        response.raise_for_status()
        return {"response": response.json()}
    except requests.Timeout as e:
        raise TaskError("API request timed out", Codes.TIMEOUT_ERROR) from e
    except requests.RequestException as e:
        raise TaskError(f"API request failed: {e}", Codes.API_ERROR) from e
    except ValueError as e:
        # Older requests versions raise a plain ValueError for invalid JSON.
        raise TaskError(f"API request failed: {e}", Codes.API_ERROR) from e
8. Custom Error Classes
8.1 Define Custom Errors
from wpipe.exception import TaskError
class ValidationError(TaskError):
    """TaskError subclass for validation failures."""
class ProcessingError(TaskError):
    """TaskError subclass for processing failures."""
8.2 Use Custom Errors
# TaskError is the base class below, so it must be imported with Codes.
from wpipe.exception import Codes, TaskError


class ValidationError(TaskError):
    """TaskError that always carries ``Codes.VALIDATION_ERROR``."""

    def __init__(self, message: str):
        # Fix the code here so call sites only supply the message.
        super().__init__(message, Codes.VALIDATION_ERROR)
def validate(data):
    """Return *data* unchanged, or raise ValidationError if "name" is falsy."""
    name = data.get("name")
    if name:
        return data
    raise ValidationError("Name is required")
9. Error Logging
9.1 Log Errors
import logging
logger = logging.getLogger(__name__)
def logged_step(data):
    """Run ``process`` on *data*, logging any failure before re-raising it.

    Returns:
        Whatever ``process(data)`` returns.

    Raises:
        Exception: The original exception, re-raised unchanged after it is
            logged.
    """
    try:
        return process(data)
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback;
        # %-style args defer formatting to the logging framework.
        logger.exception("Step failed: %s", e)
        raise
9.2 Error Context
Add context to errors:
import traceback
def step_with_context(data):
    """Run the risky operation and attach diagnostic context on failure.

    Returns:
        Whatever ``risky_operation(data)`` returns.

    Raises:
        TaskError: With ``Codes.TASK_FAILED``; the original exception is
            chained as the cause and a ``context`` dict (input keys, error
            type, formatted traceback) is set on the raised error.
    """
    try:
        return risky_operation(data)
    except Exception as e:
        # Add context to error
        error_context = {
            "input_keys": list(data.keys()),
            "error_type": type(e).__name__,
            "traceback": traceback.format_exc(),
        }
        error = TaskError(
            f"Step failed: {e}",
            Codes.TASK_FAILED,
        )
        # Previously this dict was built and then discarded; expose it on the
        # exception so handlers and loggers can actually read it.
        error.context = error_context
        raise error from e
10. Complete Example
Here’s a complete error-handling example:
from wpipe import Pipeline
from wpipe.exception import TaskError, Codes
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ErrorHandlingPipeline(Pipeline):
    """Pipeline whose ``run`` logs failures and normalizes them to TaskError.

    Construction is inherited unchanged from ``Pipeline``.
    """

    def run(self, *args, **kwargs):
        """Delegate to ``Pipeline.run`` with logging around failures.

        Returns:
            Whatever the parent ``run`` returns.

        Raises:
            TaskError: Re-raised as-is after logging; any other Exception is
                logged and converted to a TaskError with
                ``Codes.UNKNOWN_ERROR``, chained to the original.
        """
        try:
            return super().run(*args, **kwargs)
        except TaskError as e:
            logger.error("Task failed: %s", e)
            logger.error("Error code: %s", e.error_code)
            raise
        except Exception as e:
            # Unexpected failures get the traceback in the log, since the
            # converted TaskError message alone carries only str(e).
            logger.critical("Unexpected error: %s", e, exc_info=True)
            raise TaskError(str(e), Codes.UNKNOWN_ERROR) from e
# Define steps with error handling
def validate_input(data):
    """Check that "value" is present, numeric and non-negative.

    Returns ``{"validated": True}`` on success; raises TaskError with
    ``Codes.VALIDATION_ERROR`` on the first failed check.
    """
    # Pick the first failing check, in the same order they are documented.
    problem = None
    if "value" not in data:
        problem = "Missing 'value' in input"
    elif not isinstance(data["value"], (int, float)):
        problem = "'value' must be a number"
    elif data["value"] < 0:
        problem = "'value' must be non-negative"

    if problem is not None:
        raise TaskError(problem, Codes.VALIDATION_ERROR)
    return {"validated": True}
def process_value(data):
    """Double the validated value, rejecting anything above 1000.

    Returns ``{"processed_value": value * 2}``; raises TaskError with
    ``Codes.TASK_FAILED`` when the value exceeds 1000.
    """
    amount = data["value"]
    too_large = amount > 1000
    if too_large:
        raise TaskError("Value too large for processing", Codes.TASK_FAILED)
    doubled = amount * 2
    return {"processed_value": doubled}
def format_result(data):
    """Build the final output record from the processed value."""
    processed = data["processed_value"]
    summary = {"output": f"Processed: {processed}", "success": True}
    return summary
# Build the pipeline from the three steps defined above.
pipeline = ErrorHandlingPipeline(verbose=True)
pipeline.set_steps(
    [
        (validate_input, "Validate Input", "v1.0"),
        (process_value, "Process Value", "v1.0"),
        (format_result, "Format Result", "v1.0"),
    ]
)

# Valid input: 50 passes every validation check.
result = pipeline.run({"value": 50})
print(f"Success: {result}")

# Invalid input: -1 fails the non-negative check in validate_input.
try:
    result = pipeline.run({"value": -1})
except TaskError as err:
    print(f"Handled error: {err}")
11. Best Practices
Validate early - Check inputs at the start of the pipeline
Fail fast - Raise errors immediately when problems are detected
Provide context - Include useful information in error messages
Use specific codes - Choose the most appropriate error code
Log errors - Record errors for debugging
Handle gracefully - Plan for failures and provide fallbacks
12. Next Steps
Retry Logic Tutorial - Add automatic retries
Conditional Branches Tutorial - Handle different error scenarios
Production Deployment Tutorial - Deploy to production