API Integration Tutorial
This tutorial covers integrating your pipeline with external APIs for tracking, monitoring, and orchestration.
1. Introduction
wpipe’s API integration allows you to:
Register workers with an API server
Track pipeline execution progress
Report task status in real-time
Monitor health and performance
2. Understanding the API Client
The APIClient class provides methods for interacting with external APIs:
from wpipe import Pipeline
api_config = {
"base_url": "http://localhost:8418",
"token": "your-auth-token"
}
pipeline = Pipeline(api_config=api_config)
3. Worker Registration
3.1 Basic Registration
Register your pipeline as a worker:
from wpipe import Pipeline
api_config = {
"base_url": "http://localhost:8418",
"token": "your-auth-token"
}
pipeline = Pipeline(api_config=api_config, verbose=True)
# Register the worker
worker_info = pipeline.worker_register(
name="data_processor",
version="1.0.0"
)
print(f"Worker registered: {worker_info}")
# Expected: {'id': 'worker_abc123', 'name': 'data_processor', ...}
3.2 Setting Worker ID
After registration, set the worker ID:
if worker_info and "id" in worker_info:
worker_id = worker_info["id"]
pipeline.set_worker_id(worker_id)
print(f"Worker ID set: {worker_id}")
4. Process Tracking
4.1 Starting a Process
When you run a pipeline with API configured, it automatically registers the process:
# Define some steps
def fetch_data(data):
return {"users": [{"name": "Alice"}, {"name": "Bob"}]}
def process_data(data):
return {"count": len(data["users"])}
# Configure pipeline
pipeline = Pipeline(api_config=api_config, verbose=True)
pipeline.set_steps([
(fetch_data, "Fetch Data", "v1.0"),
(process_data, "Process Data", "v1.0"),
])
# Run the pipeline - this will register the process with the API
result = pipeline.run({"request_id": "req_123"})
4.2 Process Flow
Here’s what happens when you run a pipeline with API:
Pipeline starts → API receives process start notification
Each step begins → API receives task start notification
Each step completes → API receives task completion notification
Pipeline finishes → API receives process completion notification
5. API Methods Reference
5.1 Worker Methods
# Register worker
worker_info = pipeline.register_worker({
"name": "processor",
"version": "1.0.0",
"tasks": [{"name": "process", "version": "1.0.0"}]
})
# Health check
health = pipeline.healthcheck_worker({
"worker_id": "worker_123",
"status": "healthy"
})
5.2 Process Methods
# Start process
process_info = pipeline.register_process({
"id": "worker_123",
"steps": ["step1", "step2", "step3"]
})
# End process
pipeline.end_process({
"id": "process_456",
"status": "completed"
})
5.3 Task Methods
# Update task status
pipeline.update_task({
"task_id": "task_789",
"status": "completed",
"result": {"output": "success"}
})
6. Custom API Implementation
6.1 Extending APIClient
Create a custom API client for your specific needs:
from wpipe import APIClient
class CustomAPIClient(APIClient):
"""Custom API client with additional methods."""
def __init__(self, base_url: str, token: str, team_id: str):
super().__init__(base_url, token)
self.team_id = team_id
def report_metrics(self, metrics: dict) -> dict:
"""Report custom metrics to the API."""
data = {
"team_id": self.team_id,
"metrics": metrics,
"timestamp": self._get_timestamp()
}
return self.send_post("/metrics", data)
def _get_timestamp(self) -> str:
from datetime import datetime
return datetime.utcnow().isoformat()
7. Error Handling with API
7.1 Graceful Degradation
Handle API failures gracefully:
from wpipe import Pipeline
class FallbackPipeline(Pipeline):
"""Pipeline that works offline if API is unavailable."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.api_available = True
def _api_task_update(self, msg: dict):
try:
super()._api_task_update(msg)
except Exception as e:
print(f"API update failed: {e}")
# Continue without API - graceful degradation
8. Complete Example
Here’s a complete example with full API integration:
from wpipe import Pipeline
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TrackedPipeline(Pipeline):
"""Pipeline with comprehensive API tracking."""
def __init__(self, api_config: dict, worker_name: str):
super().__init__(api_config=api_config, verbose=True)
self.worker_name = worker_name
def run(self, *args, **kwargs):
logger.info(f"Starting pipeline: {self.worker_name}")
try:
result = super().run(*args, **kwargs)
logger.info(f"Pipeline completed successfully")
return result
except Exception as e:
logger.error(f"Pipeline failed: {e}")
raise
# Define steps
def fetch_data(data):
logger.info("Fetching data...")
return {"records": [{"id": 1}, {"id": 2}, {"id": 3}]}
def transform_data(data):
logger.info("Transforming data...")
records = data["records"]
transformed = [{"id": r["id"] * 2, "processed": True} for r in records]
return {"transformed_records": transformed}
def save_data(data):
logger.info("Saving data...")
return {"saved": True, "count": len(data["transformed_records"])}
# Configure and run
api_config = {
"base_url": "http://localhost:8418",
"token": "your-auth-token"
}
pipeline = TrackedPipeline(api_config, "data_pipeline")
pipeline.set_steps([
(fetch_data, "Fetch Data", "v1.0"),
(transform_data, "Transform Data", "v1.0"),
(save_data, "Save Data", "v1.0"),
])
# Register worker
worker_info = pipeline.worker_register(
name="data_pipeline",
version="1.0.0"
)
if worker_info:
pipeline.set_worker_id(worker_info.get("id"))
# Run pipeline
result = pipeline.run({})
print(f"Result: {result}")
9. Testing API Integration
9.1 Mock Server
Use a mock server for testing:
import responses
from wpipe import Pipeline
@responses.activate
def test_pipeline_with_mock_api():
"""Test pipeline with mocked API responses."""
# Mock the worker registration endpoint
responses.add(
responses.POST,
"http://localhost:8418/matricula",
json={"id": "worker_test_123", "name": "test_worker"},
status=200
)
# Mock the process registration endpoint
responses.add(
responses.POST,
"http://localhost:8418/newprocess",
json={"father": "process_123", "sons": []},
status=200
)
# Run pipeline
pipeline = Pipeline(
api_config={"base_url": "http://localhost:8418", "token": "test"}
)
pipeline.set_steps([
(lambda d: {"result": "ok"}, "Test Step", "v1.0"),
])
result = pipeline.run({})
assert result["result"] == "ok"
10. Security Best Practices
10.1 Token Management
Never hardcode tokens in source code
Use environment variables or secure vaults
Rotate tokens regularly
import os
api_config = {
"base_url": os.environ["API_BASE_URL"],
"token": os.environ["API_TOKEN"] # Set in environment
}
10.2 HTTPS Only
Always use HTTPS in production:
# Development (HTTP OK)
api_config = {"base_url": "http://localhost:8418", "token": "..."}
# Production (HTTPS required)
api_config = {"base_url": "https://api.production.com", "token": "..."}
11. Troubleshooting
11.1 Common Issues
Connection refused: Check API server is running
401 Unauthorized: Verify token is correct
Timeout: Check network connectivity
11.2 Debug Mode
Enable debug output:
import logging
logging.getLogger("wpipe").setLevel(logging.DEBUG)
12. Next Steps
Now you understand API integration:
Error Handling Tutorial - Handle errors gracefully
Retry Logic Tutorial - Add automatic retries
Production Deployment Tutorial - Deploy to production