Troubleshooting
This guide helps you diagnose and fix common issues with wpipe.
1. Common Errors
1.1 Step Must Return Dictionary
Error Message:
TypeError: Step must return a dictionary
Cause: A step function returned None or a non-dictionary value.
Solution:
# Wrong
def bad_step(data):
process(data)
# Missing return!
def bad_step2(data):
return None # Explicit None
# Correct
def good_step(data):
process(data)
return {"processed": True}
1.2 KeyError - Missing Key
Error Message:
KeyError: 'some_key'
Cause: Step tried to access a key that doesn’t exist in the data.
Solution:
# Wrong - assumes key exists
def bad_step(data):
return {"doubled": data["x"] * 2}
# Correct - use .get() with default
def good_step(data):
return {"doubled": data.get("x", 0) * 2}
# Or validate and provide context
def safe_step(data):
if "x" not in data:
return {"error": "Missing 'x' in data", "doubled": None}
return {"doubled": data["x"] * 2}
1.3 TypeError - Wrong Type
Error Message:
TypeError: unsupported operand type(s) for *: 'str' and 'int'
Cause: Operation on incompatible types.
Solution:
# Wrong
def bad_step(data):
return {"result": data["value"] * 2} # 'value' might be string
# Correct - cast types
def good_step(data):
value = int(data.get("value", 0))
return {"result": value * 2}
# Or validate types
def validated_step(data):
value = data.get("value")
if not isinstance(value, (int, float)):
raise ValueError(f"Expected number, got {type(value)}")
return {"result": value * 2}
1.4 TaskError - Step Failed
Error Message:
wpipe.exception.TaskError: Step 'Step Name' failed: Original error message
Cause: A step raised an exception.
Solution:
from wpipe.exception import TaskError
def my_step(data):
try:
return do_work(data)
except ValueError as e:
raise TaskError(
f"Validation failed: {e}",
step_name="My Step",
original_error=e
)
2. Installation Issues
2.1 Python Version Not Supported
Error:
ERROR: Python 3.8 is not supported. Please use Python 3.9 or higher.
Solution:
# Check Python version
python --version
# Install newer Python or use pyenv
pyenv install 3.10.12
pyenv local 3.10.12
2.2 pip Not Found
Error:
bash: pip: command not found
Solution:
# Use python -m pip
python -m pip install wpipe
# Or install pip first
python ensurepip
2.3 Permission Denied
Error:
ERROR: Could not install packages due to an OSError: [Errno 13] Permission denied
Solution:
# Use virtual environment (recommended)
python -m venv venv
source venv/bin/activate
pip install wpipe
# Or install for current user
pip install --user wpipe
3. Import Issues
3.1 Module Not Found
Error:
ModuleNotFoundError: No module named 'wpipe'
Solution:
# Install wpipe
pip install wpipe
# Or install from source
git clone https://github.com/wisrovi/wpipe
cd wpipe
pip install -e .
3.2 Circular Import
Error:
ImportError: cannot import name 'X' from partially initialized module
Solution:
Circular imports usually indicate a code structure issue. Restructure your imports:
# module_a.py
from module_b import function_b
# module_b.py
# Don't import from module_a here - causes circular import
4. API Integration Issues
4.1 Cannot Connect to API
Symptoms:
Pipeline hangs indefinitely
Timeout errors
Connection refused
Debugging:
import requests
# Test connectivity
api_config = {
"base_url": "http://localhost:8418",
"token": "my-token"
}
# Direct test
response = requests.get(
f"{api_config['base_url']}/health",
headers={"Authorization": f"Bearer {api_config['token']}"},
timeout=5
)
print(f"Status: {response.status_code}")
print(f"Response: {response.text}")
Checklist:
Is the API server running?
Is the base_url correct?
Is the token valid?
Is there a network/firewall issue?
4.2 Authentication Failed
Error:
ApiError: Authentication failed
Solutions:
# Check token is correct
api_config = {
"base_url": "http://localhost:8418",
"token": "correct-token-here"
}
# Token shouldn't have extra spaces
api_config["token"] = api_config["token"].strip()
4.3 Worker Registration Failed
Error:
ApiError: Failed to register worker
Solutions:
# Check API is running
pipeline = Pipeline(api_config=api_config, verbose=True)
# Register with explicit parameters
try:
worker = pipeline.worker_register(
name="my_worker",
version="1.0.0"
)
print(f"Worker ID: {worker.get('id')}")
except Exception as e:
print(f"Registration failed: {e}")
5. SQLite Issues
5.1 Database Locked
Error:
sqlite3.OperationalError: database is locked
Solutions:
# Use context manager for proper cleanup
with Wsqlite(db_name="results.db") as db:
db.input = data
result = pipeline.run(data)
db.output = result
# Database automatically released here
# Or close explicitly
db = Wsqlite(db_name="results.db")
try:
# ... use database
pass
finally:
db.close() # Release lock
5.2 Table Not Found
Error:
sqlite3.OperationalError: no such table: pipeline_executions
Solutions:
# Create table if not exists
db = Wsqlite(db_name="results.db", create=True)
# Table is created automatically
# Or create manually
db = Sqlite(db_name="results.db", create=True)
db.create_table()
5.3 Data Not Persisting
Symptoms: Data is not saved to database after pipeline runs.
Solutions:
# Wrong order - output set before run
with Wsqlite(db_name="results.db") as db:
db.input = {"x": 10}
db.output = {"result": 20} # WRONG - not from pipeline
result = pipeline.run({"x": 10}) # Result not saved!
# Correct order
with Wsqlite(db_name="results.db") as db:
db.input = {"x": 10}
result = pipeline.run({"x": 10})
db.output = result # CORRECT - save pipeline result
6. Performance Issues
6.1 Pipeline Very Slow
Causes:
Large data processing
Network latency
Inefficient algorithms
Optimization:
# Profile to find bottleneck
from wpipe.ram import memory
start_mem = memory()
result = pipeline.run(data)
end_mem = memory()
print(f"Memory used: {end_mem - start_mem:.2f} MB")
# Optimize slow steps
def slow_step(data):
# Inefficient - processes all items
results = []
for item in data["items"]:
results.append(process(item))
return {"results": results}
def fast_step(data):
# Efficient - batch processing
return {"results": batch_process(data["items"])}
6.2 Memory Usage Too High
Solutions:
# Monitor memory
from wpipe.ram import memory
def step_with_monitoring(data):
before = memory()
result = do_work(data)
after = memory()
print(f"Step memory: {after - before:.2f} MB")
return result
# Process in chunks
def chunked_processing(data):
for chunk in chunked(data["large_dataset"], size=1000):
yield from process_chunk(chunk)
7. Debugging Techniques
7.1 Enable Verbose Mode
pipeline = Pipeline(verbose=True)
result = pipeline.run(data)
7.2 Add Debug Steps
def debug_step(data):
print(f"DEBUG: Current data keys: {list(data.keys())}")
print(f"DEBUG: Sample values: { {k: data[k] for k in list(data.keys())[:3]} }")
return data # Pass through unchanged
pipeline.set_steps([
(step1, "Step 1", "v1.0"),
(debug_step, "DEBUG", "v1.0"), # Insert debug step
(step2, "Step 2", "v1.0"),
])
7.3 Test Steps Individually
# Test each step in isolation
test_data = {"x": 10}
print("Testing step1:")
result1 = step1(test_data)
print(f" Result: {result1}")
print("Testing step2:")
result2 = step2({**test_data, **result1})
print(f" Result: {result2}")
7.4 Use Try/Except
from wpipe.exception import TaskError
try:
result = pipeline.run(data)
except TaskError as e:
print(f"Failed at: {e.step_name}")
print(f"Error code: {e.code}")
print(f"Original error: {e.original_error}")
print(f"Partial results: {getattr(e, 'partial_results', 'N/A')}")
8. Getting Help
8.1 Collect Information
When reporting an issue, include:
import sys
import wpipe
print(f"Python: {sys.version}")
print(f"wpipe: {wpipe.__version__}")
# Your environment info
import platform
print(f"OS: {platform.platform()}")
print(f"Architecture: {platform.machine()}")
8.2 Check Existing Issues
Visit https://github.com/wisrovi/wpipe/issues to see if your issue is already reported.
8.3 Report New Issues
Include:
Python version
wpipe version
Minimal reproducible example
Full error traceback
Expected vs actual behavior