SQLite Storage
Persist pipeline execution results to SQLite databases.
Overview
wpipe provides SQLite integration for storing pipeline results locally.
Basic Usage
Write Results:
from wpipe.sqlite import Wsqlite
db = Wsqlite("pipeline_results.db")
pipeline = Pipeline(verbose=True)
pipeline.set_steps([
(step1, "Step 1", "v1.0"),
(step2, "Step 2", "v1.0"),
])
db.write(input_data={"x": 10}, output_data=pipeline.run({"x": 10}))
Read Results:
results = db.read()
for row in results:
print(row)
Query Results:
results = db.query("SELECT * FROM results WHERE input_x > ?", (5,))
Class-Based Access
For more control:
from wpipe.sqlite import SQLite
db = SQLite("custom_results.db")
db.create_table()
db.insert(input_data={"test": True}, output_data={"result": 42})
results = db.select_all()
db.close()
Methods
Method |
Description |
|---|---|
|
Insert pipeline results |
|
Read all results |
|
Execute SQL query |
|
Create results table |
|
Insert single result |
|
Select all rows |
|
Close database connection |
Complete Example
from wpipe import Pipeline
from wpipe.sqlite import Wsqlite
db = Wsqlite("analytics.db")
def collect_metrics(data):
return {"requests": 100, "errors": 2}
def calculate_rate(data):
rate = (data["errors"] / data["requests"]) * 100
return {"error_rate": rate}
pipeline = Pipeline()
pipeline.set_steps([
(collect_metrics, "Collect", "v1.0"),
(calculate_rate, "Calculate", "v1.0"),
])
result = pipeline.run({})
db.write(input_data={"service": "api"}, output_data=result)
print(f"Error rate: {result['error_rate']}%")
Best Practices
Use meaningful table names: Easier to manage multiple databases
Close connections: Prevent resource leaks
Handle large data: Consider compressing for storage
Backup regularly: Protect against data loss
Next Steps
Learn about YAML Configuration for configuration management
Explore Nested Pipelines for complex workflows