Best Practices
This guide outlines best practices for building robust, maintainable, and efficient pipelines with wpipe.
1. Pipeline Design
1.1 Single Responsibility Principle
Each step should do one thing well:
Good:
def fetch_user(data):
    """Look up the user for ``data["user_id"]`` and return it under ``"user"``."""
    return {"user": database.get_user(data["user_id"])}
def validate_user(data):
    """Report under ``"valid"`` whether ``data["user"]`` exists and is active."""
    user = data["user"]
    return {"valid": user is not None and user.is_active}
def send_notification(data):
    """Send the welcome email when ``data["valid"]`` is truthy.

    Returns ``{"notified": data["valid"]}`` in both cases, so the step
    always hands a dictionary back to the pipeline.
    """
    if data["valid"]:
        email.send(data["user"].email, "Welcome!")
    return {"notified": data["valid"]}
Bad:
def everything(data):
    """Counter-example: fetch, validation and notification crammed into one step."""
    user = database.get_user(data["user_id"])  # Fetch
    if user and user.is_active:  # Validation
        email.send(user.email, "Welcome!")  # Send
    return {"done": True}  # Done
1.2 Keep Steps Small and Testable
Smaller steps are easier to:
Test in isolation
Debug when errors occur
Reuse in different pipelines
Understand and maintain
Good - Testable step:
def calculate_discount(price: float, discount_percent: float) -> float:
    """Calculate discounted price."""
    return price * (1 - discount_percent / 100)


def apply_discount(data):
    """Pipeline step: read the price and discount from ``data`` and return the final price.

    The arithmetic lives in ``calculate_discount`` so it can be unit-tested
    without building a pipeline payload.
    """
    price = data["original_price"]
    discount = data["discount_percent"]
    return {"final_price": calculate_discount(price, discount)}
1.3 Order Steps Logically
Steps should follow a natural workflow:
Fetch Data → Validate Input → Process Data → Save Results
NOT:
Process Data → Fetch Data → Save Results → Validate Input
2. Error Handling
2.1 Always Return Dictionaries
Never return None from a step:
# Good
def good_step(data):
    """Return double ``data["x"]`` under ``"result"``."""
    return {"result": data["x"] * 2}
# Bad - returns None on error
def bad_step(data):
    """Counter-example: falls through to a bare ``return`` when ``"x"`` is missing."""
    if "x" not in data:
        return  # ERROR!
    return {"result": data["x"] * 2}
2.2 Use .get() with Defaults
Handle missing keys gracefully:
def robust_step(data):
    """Multiply ``value`` by ``multiplier``, falling back to defaults for missing keys."""
    value = data.get("value", 0)  # Default to 0
    multiplier = data.get("multiplier", 1)  # Default to 1
    optional = data.get("optional", None)  # Default to None
    return {"result": value * multiplier}
2.3 Raise TaskError for Failures
Use the appropriate exception type:
from wpipe.exception import TaskError, Codes
def validate_input(data):
    """Check that ``data`` carries a well-formed email address.

    Returns:
        ``{"validated": True}`` when both checks pass.

    Raises:
        TaskError: with ``Codes.VALIDATION_ERROR`` when ``"email"`` is
            missing or ``is_valid_email`` rejects it.
    """
    if "email" not in data:
        raise TaskError(
            "Email is required",
            step_name="Validate Input",
            code=Codes.VALIDATION_ERROR
        )
    if not is_valid_email(data["email"]):
        raise TaskError(
            "Invalid email format",
            step_name="Validate Email",
            code=Codes.VALIDATION_ERROR
        )
    return {"validated": True}
2.4 Distinguish Recoverable Errors
Mark errors that can be retried:
from wpipe.exception import TaskError, Codes
def call_api(data):
    """Call ``data["endpoint"]`` and return the response under ``"api_result"``.

    Raises:
        TaskError: with ``Codes.RETRYABLE_ERROR`` on ``NetworkTimeout``
            and ``Codes.API_ERROR`` on ``InvalidApiKey``. The original
            exception is chained as the cause in both cases.
    """
    try:
        result = api.get(data["endpoint"])
        return {"api_result": result}
    except NetworkTimeout as exc:
        raise TaskError(
            "API timeout - may succeed on retry",
            code=Codes.RETRYABLE_ERROR
        ) from exc
    except InvalidApiKey as exc:
        raise TaskError(
            "Invalid API key - won't succeed on retry",
            code=Codes.API_ERROR
        ) from exc
3. Data Management
3.1 Don’t Mutate Input Data
Create new dictionaries instead of modifying:
# Good - creates new dict
def transform_good(data):
    """Return a copy of ``data`` with ``value`` doubled and ``transformed`` set."""
    return {
        **data,
        "transformed": True,
        "value": data["value"] * 2,
    }
# Bad - mutates input
def transform_bad(data):
    """Counter-example: rewrites the caller's dictionary in place and returns it."""
    data["transformed"] = True
    data["value"] = data["value"] * 2
    return data  # Same object, mutated!
3.2 Use Clear Key Names
Avoid generic names:
# Good - descriptive keys
return {
"user_email": user.email,
"user_id": user.id,
"email_validated": True,
}
# Bad - generic keys
return {
"a": user.email,
"b": user.id,
"c": True,
}
3.3 Document Data Schema
Comment expected input/output:
def process_payment(data):
    """Process a payment transaction.

    Input:
        data (dict):
            - amount (float): Payment amount in USD
            - currency (str): Currency code (e.g., 'USD')
            - card_token (str): Tokenized card from payment provider

    Output:
        data (dict):
            - transaction_id (str): Unique transaction ID
            - status (str): 'success' or 'failed'
            - receipt_url (str): URL to receipt

    Raises:
        TaskError: If payment processing fails
    """
    ...
4. Performance
4.1 Lazy Loading
Only load data when needed:
def fetch_if_needed(data):
    """Return ``cached_data``, calling ``expensive_fetch`` only when it is absent."""
    if "cached_data" not in data:
        return {"cached_data": expensive_fetch()}
    return {"cached_data": data["cached_data"]}
4.2 Avoid Redundant Processing
Cache results when safe:
class CachedTransformer:
    """Callable step that memoizes ``expensive_transform`` results by ``data["id"]``."""

    def __init__(self):
        # Maps an input's "id" to the result previously computed for it.
        self.cache = {}

    def __call__(self, data):
        """Return the transform result for ``data``, reusing a cached one when possible.

        Inputs without an ``"id"`` are never cached: keying them all under
        ``None`` would hand the first id-less result to every later one.
        """
        cache_key = data.get("id")
        if cache_key is None:
            return expensive_transform(data)
        if cache_key not in self.cache:
            self.cache[cache_key] = expensive_transform(data)
        return self.cache[cache_key]
4.3 Batch Operations
Process multiple items together:
# Bad - individual calls
def process_slow(data):
    """Counter-example: save every item with its own ``api.save`` call."""
    for item in data["items"]:
        api.save(item)  # N API calls
    return {"saved": len(data["items"])}
# Good - batch call
def process_fast(data):
    """Save all items through a single ``api.batch_save`` call and report the count."""
    api.batch_save(data["items"])  # 1 API call
    return {"saved": len(data["items"])}
5. Testing
5.1 Test Steps in Isolation
Each step should be independently testable:
# my_module.py
def calculate_total(items):
    """Return the summed ``price`` of ``items`` under ``"total"``."""
    return {"total": sum(item["price"] for item in items)}


# test_my_module.py
def test_calculate_total():
    """Two items priced 10 and 20 total 30."""
    data = {"items": [
        {"name": "A", "price": 10},
        {"name": "B", "price": 20},
    ]}
    # Pass the list itself: iterating the wrapping dict would yield its
    # string keys and item["price"] would raise TypeError.
    result = calculate_total(data["items"])
    assert result["total"] == 30
5.2 Mock External Dependencies
Isolate from external services:
from unittest.mock import patch
def test_pipeline_with_api():
    """Run a one-step pipeline with ``my_module.api_call`` replaced by a mock."""
    with patch("my_module.api_call") as mock_api:
        mock_api.return_value = {"success": True}
        pipeline = Pipeline()
        pipeline.set_steps([
            (call_api, "Call API", "v1.0"),
        ])
        result = pipeline.run({"endpoint": "/test"})
        assert result["success"] is True
        mock_api.assert_called_once()
5.3 Test Error Handling
Verify error behavior:
import pytest

from wpipe.exception import TaskError, Codes


def test_validation_error():
    """A malformed email makes the pipeline raise a validation ``TaskError``."""
    pipeline = Pipeline()
    pipeline.set_steps([
        (validate_email, "Validate", "v1.0"),
    ])
    with pytest.raises(TaskError) as exc_info:
        pipeline.run({"email": "invalid"})
    # Inspect the captured exception only after the block exits; a statement
    # placed after the raising call inside the block would never run.
    assert exc_info.value.code == Codes.VALIDATION_ERROR
6. Logging and Debugging
6.1 Use Descriptive Step Names
Helpful for debugging:
# Good
(fetch_user_by_id, "Fetch User by ID from Database", "v1.0")
# Bad
(fetch_user, "Step 1", "v1.0")
6.2 Add Debug Output in Development
Use verbose mode for debugging:
# Development
pipeline = Pipeline(verbose=True)
# Production
pipeline = Pipeline(verbose=False)
6.3 Preserve Error Context
Include useful information in errors:
def risky_operation(data):
    """Run ``do_operation`` and re-raise any failure as a ``TaskError``.

    The original exception is passed as ``original_error`` and also chained
    with ``from`` so its traceback stays attached as the cause.
    """
    try:
        return do_operation(data)
    except Exception as e:
        raise TaskError(
            f"Operation failed: {e}",
            step_name="Risky Operation",
            original_error=e
        ) from e
7. Security
7.1 Don’t Log Sensitive Data
Protect sensitive information:
# Bad - logs password
def bad_step(data):
    """Counter-example: writes the plaintext password into the log line."""
    logger.info(f"Login attempt for user: {data['username']}, password: {data['password']}")
# Good - sanitizes
def good_step(data):
    """Log only the username, then return the ``authenticate`` result under ``"login_success"``."""
    logger.info(f"Login attempt for user: {data['username']}")
    return {"login_success": authenticate(data)}
7.2 Validate External Input
Never trust external data:
def process_user_input(data):
    """Accept ``data["user_id"]`` only when it is a positive integer.

    Returns:
        ``{"validated": True}`` when the ID is acceptable.

    Raises:
        TaskError: with ``Codes.VALIDATION_ERROR`` when the ID is missing,
            not an ``int``, a ``bool``, or not positive.
    """
    # Validate all external input
    user_id = data.get("user_id")
    # bool is a subclass of int, so reject it explicitly; otherwise True
    # would be accepted as the ID 1.
    if isinstance(user_id, bool) or not isinstance(user_id, int) or user_id <= 0:
        raise TaskError("Invalid user_id", code=Codes.VALIDATION_ERROR)
    return {"validated": True}
8. Configuration
8.1 Use Configuration Files
Externalize configuration:
# config.yaml
# Each top-level key groups the settings for one component.
pipeline:
  verbose: false
  timeout: 300
api:
  base_url: "https://api.example.com"
  timeout: 30
database:
  host: "localhost"
  port: 5432
from wpipe.util import leer_yaml
config = leer_yaml("config.yaml")
pipeline = Pipeline(
verbose=config["pipeline"]["verbose"],
timeout=config["pipeline"]["timeout"]
)
8.2 Use Environment Variables
For deployment:
# config.yaml with env vars
api:
  token: ${API_TOKEN}
  base_url: ${API_BASE_URL:-https://default.example.com}
8.3 Version Control Configuration
# .gitignore
# Don't commit secrets
config.yaml
# Don't commit databases
*.db
# Commit template instead
!config.yaml.template
9. Code Organization
9.2 Create Reusable Step Libraries
# my_steps/__init__.py
from .validation import validate_email, validate_phone
from .transform import normalize_text, format_date
from .storage import save_to_db, save_to_s3
__all__ = [
"validate_email",
"validate_phone",
"normalize_text",
"format_date",
"save_to_db",
"save_to_s3",
]
10. Deployment
10.1 Use Virtual Environments
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
10.2 Pin Dependencies
# requirements.txt
wpipe==1.0.0
requests==2.31.0
pyyaml==6.0.1
10.3 Monitor Pipeline Health
Track execution metrics:
from wpipe import Pipeline
from wpipe.sqlite import Wsqlite
pipeline = Pipeline(verbose=False)

# Record the run's input and outcome on the Wsqlite handle.
# NOTE(review): presumably Wsqlite persists db.input / db.output to
# metrics.db when the block exits - confirm against the wpipe.sqlite docs.
with Wsqlite("metrics.db") as db:
    db.input = {"run_id": generate_run_id()}
    result = pipeline.run({"data": process()})
    db.output = {
        "status": "success",
        "duration": measure_duration(),
        "result_size": len(str(result)),
    }