Class-Based Steps Tutorial
This tutorial covers using Python classes as pipeline steps, enabling more sophisticated processing patterns.
1. Introduction
While function-based steps are simple and effective, class-based steps offer several advantages:
State Management: Maintain state across multiple calls
Configuration: Configure behavior at initialization
Encapsulation: Bundle related functionality together
Reusability: Create reusable step classes
2. The __call__ Protocol
For a class to work as a pipeline step, it must implement the __call__ method:
class MyStep:
def __call__(self, data: dict) -> dict:
# Your processing logic here
return {"result": "value"}
When the pipeline calls the step, it actually calls the __call__ method.
3. Creating Step Classes
3.1 Basic Step Class
Here’s a basic step class that multiplies a value:
class Multiply:
"""Multiply a value by a factor.
Args:
factor: The multiplier to apply
"""
def __init__(self, factor: float):
self.factor = factor
def __call__(self, data: dict) -> dict:
value = data.get("value", 0)
return {"result": value * self.factor}
Usage:
from wpipe import Pipeline
pipeline = Pipeline()
pipeline.set_steps([
(Multiply(2), "Multiply by 2", "v1.0"),
])
result = pipeline.run({"value": 10})
print(result["result"]) # 20
3.2 Configurable Step Class
Create more flexible steps with configuration options:
class DataTransformer:
"""Transform data with configurable operations.
Args:
multiplier: Value to multiply by
offset: Value to add after multiplication
round_digits: Number of decimal places to round to
"""
def __init__(
self,
multiplier: float = 1.0,
offset: float = 0.0,
round_digits: int = None
):
self.multiplier = multiplier
self.offset = offset
self.round_digits = round_digits
def __call__(self, data: dict) -> dict:
value = data.get("value", 0)
result = value * self.multiplier + self.offset
if self.round_digits is not None:
result = round(result, self.round_digits)
return {"transformed": result}
Usage:
# Create pipeline with different configurations
pipeline = Pipeline()
pipeline.set_steps([
(DataTransformer(multiplier=2, offset=10, round_digits=2),
"Transform Data", "v1.0"),
])
result = pipeline.run({"value": 5})
# 5 * 2 + 10 = 20, rounded to 2 decimals
print(result["transformed"]) # 20.0
4. Stateful Steps
Classes can maintain state across multiple pipeline executions:
4.1 Running Total
class RunningTotal:
"""Maintain a running total across multiple calls."""
def __init__(self):
self.total = 0
self.call_count = 0
def __call__(self, data: dict) -> dict:
value = data.get("value", 0)
self.total += value
self.call_count += 1
return {
"running_total": self.total,
"calls": self.call_count,
"last_value": value
}
Usage with separate calls:
step = RunningTotal()
result1 = step({"value": 10})
print(result1) # {'running_total': 10, 'calls': 1, 'last_value': 10}
result2 = step({"value": 20})
print(result2) # {'running_total': 30, 'calls': 2, 'last_value': 20}
4.2 Accumulator Pattern
Build up data across multiple steps:
class Accumulator:
"""Accumulate values into a list."""
def __init__(self, key: str = "items"):
self.key = key
self.items = []
def __call__(self, data: dict) -> dict:
# Add new item to accumulator
if "item" in data:
self.items.append(data["item"])
return {self.key: self.items.copy()}
5. Composable Steps
Create reusable building blocks:
5.1 Pipeline Building Blocks
class Add:
def __init__(self, amount: float):
self.amount = amount
def __call__(self, data: dict) -> dict:
value = data.get("value", 0)
return {"value": value + self.amount}
class Multiply:
def __init__(self, factor: float):
self.factor = factor
def __call__(self, data: dict) -> dict:
value = data.get("value", 1)
return {"value": value * self.factor}
class Square:
def __call__(self, data: dict) -> dict:
value = data.get("value", 0)
return {"value": value ** 2}
Combine them:
from wpipe import Pipeline
pipeline = Pipeline()
pipeline.set_steps([
(lambda d: {"value": 2}, "Initialize", "v1.0"),
(Add(5), "Add 5", "v1.0"), # 2 + 5 = 7
(Multiply(3), "Multiply by 3", "v1.0"), # 7 * 3 = 21
(Square(), "Square", "v1.0"), # 21 ^ 2 = 441
])
result = pipeline.run({})
print(result["value"]) # 441
6. Class Methods as Steps
Use class methods for more complex processing:
class DataProcessor:
def __init__(self, config: dict):
self.config = config
def extract(self, data: dict) -> dict:
"""Extract relevant fields."""
fields = self.config.get("fields", [])
extracted = {k: data.get(k) for k in fields if k in data}
return {"extracted": extracted}
def transform(self, data: dict) -> dict:
"""Transform extracted data."""
extracted = data.get("extracted", {})
transformed = {k: v.upper() if isinstance(v, str) else v
for k, v in extracted.items()}
return {"transformed": transformed}
def load(self, data: dict) -> dict:
"""Prepare for output."""
return {"result": data.get("transformed", {})}
Usage:
processor = DataProcessor(fields=["name", "email"])
pipeline = Pipeline()
pipeline.set_steps([
(processor.extract, "Extract", "v1.0"),
(processor.transform, "Transform", "v1.0"),
(processor.load, "Load", "v1.0"),
])
result = pipeline.run({"name": "john", "email": "john@example.com"})
print(result["result"]) # {'name': 'JOHN', 'email': 'JOHN@EXAMPLE.COM'}
7. Best Practices
7.1 Keep Steps Focused
Each step should do one thing well:
# Good: Single responsibility
class ValidateEmail:
def __call__(self, data: dict) -> dict:
email = data.get("email", "")
if "@" not in email:
raise ValueError("Invalid email format")
return {"valid": True}
7.2 Use Type Hints
Add type hints for better IDE support:
class TypedStep:
def __init__(self, multiplier: float) -> None:
self.multiplier = multiplier
def __call__(self, data: dict) -> dict:
value: float = data.get("value", 0.0)
result: float = value * self.multiplier
return {"result": result}
7.3 Document Your Classes
Add docstrings for clarity:
class DocumentedStep:
"""Description of what this step does.
Args:
param1: Description of param1
param2: Description of param2
Returns:
Dictionary with key descriptions
"""
def __init__(self, param1: str, param2: int = 10):
self.param1 = param1
self.param2 = param2
def __call__(self, data: dict) -> dict:
# Implementation
return {}
8. Advanced Patterns
8.1 Step Factory
Create steps dynamically:
def create_multiplier(factor: float):
"""Factory function for creating multiplier steps."""
class Multiplier:
def __call__(self, data: dict) -> dict:
value = data.get("value", 0)
return {"result": value * factor}
return Multiplier()
pipeline = Pipeline()
pipeline.set_steps([
(create_multiplier(2), "Double", "v1.0"),
(create_multiplier(3), "Triple", "v1.0"),
])
8.2 Decorator Pattern
Wrap steps with additional functionality:
class TimedStep:
"""Wrap a step to measure execution time."""
def __init__(self, step):
self.step = step
def __call__(self, data: dict) -> dict:
import time
start = time.time()
result = self.step(data)
elapsed = time.time() - start
result["elapsed_time"] = elapsed
return result
9. Complete Example
Here’s a complete example combining many concepts:
from wpipe import Pipeline
import time
class RetryableStep:
"""Step that can retry on failure."""
def __init__(self, max_retries: int = 3):
self.max_retries = max_retries
def __call__(self, data: dict) -> dict:
for attempt in range(self.max_retries):
try:
# Simulate work
result = data["value"] * 2
return {"result": result, "attempts": attempt + 1}
except Exception as e:
if attempt == self.max_retries - 1:
raise
time.sleep(0.1) # Wait before retry
return {}
class LoggingStep:
"""Step that logs its execution."""
def __init__(self, name: str):
self.name = name
def __call__(self, data: dict) -> dict:
print(f"[{self.name}] Input: {data}")
result = {"logged": True, "input_data": data}
print(f"[{self.name}] Output: {result}")
return result
# Create pipeline with class-based steps
pipeline = Pipeline(verbose=True)
pipeline.set_steps([
(lambda d: {"value": 10}, "Initialize", "v1.0"),
(LoggingStep("Processing"), "Log Start", "v1.0"),
(RetryableStep(max_retries=3), "Process", "v1.0"),
(LoggingStep("Complete"), "Log End", "v1.0"),
])
result = pipeline.run({})
print(f"Final result: {result}")
10. Next Steps
Now you understand class-based steps, continue to:
API Integration Tutorial - Add API tracking
Error Handling Tutorial - Handle errors gracefully
Nested Pipelines Tutorial - Combine pipelines together