
Python SDK

The PipeClient class provides a programmatic interface to all Pipe operations. Use it to embed pipeline management in scripts, notebooks, or applications.

from dataspoc_pipe.sdk import PipeClient
client = PipeClient()

No configuration is needed. PipeClient reads the same ~/.dataspoc-pipe/ config directory as the CLI.

pipelines()

Return the names of all configured pipelines.

Returns: list[str]

names = client.pipelines()
print(names)
# ['orders', 'customers', 'events']

config(name)

Return the full configuration of a pipeline as a plain dict.

Parameters:

  • name (str): Pipeline name

Returns: dict with keys name, source, destination, incremental, schedule

cfg = client.config("orders")
print(cfg["source"]["tap"]) # 'tap-csv'
print(cfg["destination"]["bucket"]) # 's3://my-lake'
print(cfg["incremental"]["enabled"]) # True
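Because `config()` returns a plain dict, it is easy to wrap in small helpers. A minimal sketch, assuming only the key layout documented above (the helper itself is illustrative, not part of the SDK):

```python
def is_incremental(cfg: dict) -> bool:
    """Return True when a pipeline config enables incremental extraction.

    `cfg` is a plain dict shaped like the return value of config();
    missing keys are treated as "not incremental".
    """
    return bool(cfg.get("incremental", {}).get("enabled"))
```

For the sample config above, `is_incremental(client.config("orders"))` would return True.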

run(name, full=False)

Execute a pipeline. This handles state loading, manifest updates, and execution logging, the same post-run steps the CLI performs.

Parameters:

  • name (str): Pipeline name
  • full (bool, optional): Force full extraction, ignoring incremental state. Default: False

Returns: dict with keys success, streams, error

result = client.run("orders")
if result["success"]:
    print(f"Extracted {sum(result['streams'].values())} records")
    for stream, count in result["streams"].items():
        print(f"  {stream}: {count}")
else:
    print(f"Failed: {result['error']}")
# Extracted 5200 records
#   orders: 5200

Force a full extraction:

result = client.run("orders", full=True)
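Since `run()` returns a dict with the documented `success`, `streams`, and `error` keys, the reporting logic above can be factored into a reusable function. A hedged sketch (this helper is illustrative, not part of the SDK):

```python
def summarize_run(result: dict) -> str:
    """One-line report from a run() result dict
    (keys: success, streams, error)."""
    if result["success"]:
        total = sum(result["streams"].values())
        return f"OK: {total} records across {len(result['streams'])} stream(s)"
    return f"FAIL: {result['error']}"
```

For example, `summarize_run(client.run("orders"))` would produce the "OK: 5200 records ..." line for the sample run above.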

status()

Return status for all configured pipelines.

Returns: list[dict], each with keys name, last_run, status, duration, records

for pipeline in client.status():
    print(f"{pipeline['name']}: {pipeline['status']} ({pipeline['records']} records)")
# orders: success (5200 records)
# customers: success (12400 records)
# events: no runs (None records)
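The status list also makes it easy to spot pipelines that have never executed. A minimal sketch that keys on the `'no runs'` status string seen in the sample output above (the exact value is an assumption; the helper is illustrative, not part of the SDK):

```python
def never_run(status_rows: list) -> list:
    """Names of pipelines with no recorded runs, based on the
    'no runs' status value shown in the sample output."""
    return [row["name"] for row in status_rows if row["status"] == "no runs"]
```

With the sample data above, `never_run(client.status())` would return `['events']`.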

logs(name)

Return the latest execution log for a pipeline.

Parameters:

  • name (str): Pipeline name

Returns: dict | None

log = client.logs("orders")
if log:
    print(f"Status: {log['status']}")
    print(f"Duration: {log['duration_seconds']}s")
    print(f"Records: {log['total_records']}")

Returns None if the pipeline has never been run.
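The `duration_seconds` and `total_records` keys can be combined into a simple throughput figure. A hedged sketch over the documented keys, handling the `None` case for never-run pipelines (illustrative, not part of the SDK):

```python
def throughput(log) -> float:
    """Records per second from a logs() dict, or 0.0 when the
    pipeline has never run (logs() returns None) or the recorded
    duration is zero."""
    if not log or not log.get("duration_seconds"):
        return 0.0
    return log["total_records"] / log["duration_seconds"]
```

For example, `throughput(client.logs("orders"))` divides total records by duration, and `throughput(client.logs("events"))` would return 0.0 for a pipeline with no runs.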

manifest(bucket)

Return the manifest (catalog) for a bucket.

Parameters:

  • bucket (str): Bucket URI (e.g., s3://my-lake, file:///tmp/lake)

Returns: dict with table catalog

m = client.manifest("s3://my-lake")
for table_name, info in m.get("tables", {}).items():
    print(f"{table_name}: {info.get('total_records', 0)} records")
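The per-table loop above generalizes to a whole-lake total. A minimal sketch assuming only the manifest shape shown (a `tables` dict whose values may carry `total_records`); the helper is illustrative, not part of the SDK:

```python
def lake_total(manifest: dict) -> int:
    """Total records across every table in a manifest dict,
    tolerating missing 'tables' or 'total_records' keys."""
    return sum(
        info.get("total_records", 0)
        for info in manifest.get("tables", {}).values()
    )
```

For example, `lake_total(client.manifest("s3://my-lake"))` sums the counts printed in the loop above.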

validate(name)

Test bucket connectivity and tap availability for a pipeline.

Parameters:

  • name (str): Pipeline name

Returns: dict with keys bucket_ok, tap_ok, errors

result = client.validate("orders")
if result["bucket_ok"] and result["tap_ok"]:
    print("Pipeline is ready")
else:
    for error in result["errors"]:
        print(f"Issue: {error}")

Full example: run all pipelines and report

from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

for name in client.pipelines():
    # Validate before running
    check = client.validate(name)
    if not check["bucket_ok"] or not check["tap_ok"]:
        print(f"SKIP {name}: {check['errors']}")
        continue

    # Run the pipeline
    result = client.run(name)
    if result["success"]:
        total = sum(result["streams"].values())
        print(f"OK {name}: {total} records")
    else:
        print(f"FAIL {name}: {result['error']}")

Full example: Jupyter notebook integration

from dataspoc_pipe.sdk import PipeClient
import pandas as pd

client = PipeClient()

# Check what is available
print("Pipelines:", client.pipelines())
print()

# Show status as a DataFrame
status = client.status()
df = pd.DataFrame(status)
df
        name             last_run   status  duration  records
0     orders  2025-01-20T14:00:00  success       3.2     5200
1  customers  2025-01-20T01:30:00  success      12.5    12400