API Integration
Connect your pipelines to external APIs for worker registration and process tracking.
Overview
wpipe can integrate with external APIs to:
Register workers and track their health
Log process executions
Monitor task progress
Store execution history
Basic Configuration
from wpipe import Pipeline
api_config = {
"base_url": "http://localhost:8418",
"token": "your_secret_token"
}
pipeline = Pipeline(
worker_name="my_worker",
api_config=api_config,
verbose=True
)
Configuration Options
Option |
Type |
Description |
|---|---|---|
|
str |
API server URL (required) |
|
str |
Authentication token (required) |
|
int |
Request timeout in seconds (default: 30) |
|
dict |
Custom HTTP headers |
|
int |
API call retry attempts |
Worker Registration
Register your pipeline as a worker:
worker_id = pipeline.worker_register(
name="data_processor",
version="v1.0.0"
)
if worker_id:
pipeline.set_worker_id(worker_id.get("id"))
Health Checks
Keep worker alive with health checks:
import threading
import time
def health_checker():
while not stop_event.is_set():
pipeline.healthcheck_worker(worker_id)
time.sleep(30)
stop_event = threading.Event()
checker_thread = threading.Thread(target=health_checker)
checker_thread.start()
# Later, stop the checker
stop_event.set()
checker_thread.join()
Complete Example
from wpipe import Pipeline
api_config = {
"base_url": "http://localhost:8418",
"token": "my_secret_token"
}
pipeline = Pipeline(
worker_name="etl_pipeline",
api_config=api_config,
verbose=True
)
pipeline.set_steps([
(extract_data, "Extract", "v1.0"),
(transform_data, "Transform", "v1.0"),
(load_data, "Load", "v1.0"),
])
worker_id = pipeline.worker_register("etl_worker", "v1.0")
pipeline.set_worker_id(worker_id.get("id"))
result = pipeline.run({"source": "database"})
Error Handling
Handle API errors gracefully:
pipeline.SHOW_API_ERRORS = False # Silent mode (default)
pipeline.SHOW_API_ERRORS = True # Raises exceptions
Local Mode
Run without API (local only):
pipeline = Pipeline(verbose=True)
# No api_config = local mode
Best Practices
Handle unavailable API: Design for graceful degradation
Configure timeouts: Prevent indefinite hanging
Store worker_id: Persist to avoid re-registration
Use health checks: Keep worker status updated
Next Steps
Learn about SQLite Storage for local data storage
Explore Error Handling for error recovery