Python SDK
The PipeClient class provides a programmatic interface to all Pipe operations. Use it to embed pipeline management in scripts, notebooks, or applications.
Import
```python
from dataspoc_pipe.sdk import PipeClient

client = PipeClient()
```

No configuration is needed. PipeClient reads the same ~/.dataspoc-pipe/ config directory as the CLI.
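Because the client depends on that directory, a script might want to check for it before creating a `PipeClient` and fail with a clear message. The sketch below relies only on the path stated above; `has_pipe_config` and `CONFIG_DIR` are hypothetical names, and it does not describe how `PipeClient` itself behaves when the directory is missing.

```python
from pathlib import Path

# Assumption: the config location is ~/.dataspoc-pipe/, as stated above.
CONFIG_DIR = Path.home() / ".dataspoc-pipe"


def has_pipe_config(config_dir: Path = CONFIG_DIR) -> bool:
    """Return True if the Pipe config directory exists and is a directory."""
    return config_dir.is_dir()
```

Usage: `if not has_pipe_config(): raise SystemExit("Run the CLI setup first")`.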
Methods
client.pipelines()
Return the names of all configured pipelines.
Returns: list[str]
```python
names = client.pipelines()
print(names)
# ['orders', 'customers', 'events']
```

client.config(name)
Return the full configuration of a pipeline as a plain dict.
Parameters:
name (str): Pipeline name
Returns: dict with keys name, source, destination, incremental, schedule
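Since the result is a plain dict, it is easy to condense into a one-line description for logs or reports. This is a minimal sketch that reads only the nested keys shown in the `client.config()` example (`source.tap`, `destination.bucket`, `incremental.enabled`); `describe_pipeline` is a hypothetical helper, and any other keys inside those sections are not assumed.

```python
def describe_pipeline(cfg: dict) -> str:
    """Build a one-line summary from a dict returned by client.config(name)."""
    # Only keys shown in the client.config() example are read; .get() keeps
    # the helper from raising if a section is absent.
    source = cfg.get("source") or {}
    destination = cfg.get("destination") or {}
    incremental = cfg.get("incremental") or {}
    flag = "on" if incremental.get("enabled") else "off"
    return (
        f"{cfg.get('name')}: {source.get('tap')} -> "
        f"{destination.get('bucket')} (incremental: {flag})"
    )
```

Usage: `print(describe_pipeline(client.config("orders")))`.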
```python
cfg = client.config("orders")
print(cfg["source"]["tap"])            # 'tap-csv'
print(cfg["destination"]["bucket"])    # 's3://my-lake'
print(cfg["incremental"]["enabled"])   # True
```

client.run(name, full=False)
Execute a pipeline. Like the CLI, it handles state loading, manifest updates, and execution logging around the extraction.
Parameters:
name (str): Pipeline name
full (bool, optional): Force full extraction, ignoring incremental state. Default: False
Returns: dict with keys success, streams, error
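Because `run()` reports failure through the `success` key rather than by raising, a caller can wrap it in a simple retry loop. This sketch assumes a failed run is safe to repeat for your source, which depends on your tap and state; `run_with_retry` and its parameters are hypothetical, and only the documented `run(name)` call and result keys are used.

```python
import time


def run_with_retry(client, name: str, attempts: int = 3, delay_seconds: float = 5.0) -> dict:
    """Call client.run(name) up to `attempts` times, stopping at the first success.

    Returns the last result dict (keys: success, streams, error).
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    result = {}
    for attempt in range(1, attempts + 1):
        result = client.run(name)
        if result["success"]:
            break
        # Wait before the next try, but not after the final attempt.
        if attempt < attempts:
            time.sleep(delay_seconds)
    return result
```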
```python
result = client.run("orders")
if result["success"]:
    print(f"Extracted {sum(result['streams'].values())} records")
    for stream, count in result["streams"].items():
        print(f"  {stream}: {count}")
else:
    print(f"Failed: {result['error']}")
```

Example output:

```
Extracted 5200 records
  orders: 5200
```

Force a full extraction:

```python
result = client.run("orders", full=True)
```

client.status()
Return status for all configured pipelines.
Returns: list[dict], each with keys name, last_run, status, duration, records
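A common use of this list is a quick health summary. In this sketch, a `records` value of `None` (as printed for a pipeline with no runs) counts as zero, and any status other than `"success"` is flagged; the full set of possible status strings is not documented here, so that rule is an assumption. `summarize_status` is a hypothetical helper.

```python
def summarize_status(statuses: list) -> tuple:
    """Return (total records, names of pipelines not in 'success' status)."""
    # 'records' is None for pipelines that have never run, so count it as 0.
    total = sum(p["records"] or 0 for p in statuses)
    # Assumption: anything other than "success" (e.g. "no runs") needs attention.
    attention = [p["name"] for p in statuses if p["status"] != "success"]
    return total, attention
```

Usage: `total, attention = summarize_status(client.status())`.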
```python
for pipeline in client.status():
    print(f"{pipeline['name']}: {pipeline['status']} ({pipeline['records']} records)")
```

Example output:

```
orders: success (5200 records)
customers: success (12400 records)
events: no runs (None records)
```

client.logs(name)
Return the latest execution log for a pipeline.
Parameters:
name (str): Pipeline name
Returns: dict | None
```python
log = client.logs("orders")
if log:
    print(f"Status: {log['status']}")
    print(f"Duration: {log['duration_seconds']}s")
    print(f"Records: {log['total_records']}")
```

Returns None if the pipeline has never been run.
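Since `logs()` can return `None`, a formatter that handles both cases keeps report code simple. This sketch reads only the fields used in the example above (`status`, `duration_seconds`, `total_records`); `log_summary` is a hypothetical helper.

```python
from typing import Optional


def log_summary(name: str, log: Optional[dict]) -> str:
    """Format the result of client.logs(name) as a single line."""
    if log is None:
        # client.logs() returns None when the pipeline has never been run.
        return f"{name}: never run"
    return (
        f"{name}: {log['status']} in {log['duration_seconds']}s, "
        f"{log['total_records']} records"
    )
```

Usage: `for name in client.pipelines(): print(log_summary(name, client.logs(name)))`.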
client.manifest(bucket)
Return the manifest (catalog) for a bucket.
Parameters:
bucket (str): Bucket URI (e.g., s3://my-lake, file:///tmp/lake)
Returns: dict containing the bucket's table catalog
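To see which tables dominate a lake, the catalog can be ranked by record count. This minimal sketch assumes only the `tables` mapping and the per-table `total_records` field used in the `client.manifest()` example, with missing counts treated as zero; `largest_tables` is a hypothetical helper.

```python
def largest_tables(manifest: dict, top: int = 5) -> list:
    """Return up to `top` (table_name, total_records) pairs, largest first."""
    tables = manifest.get("tables", {})
    counts = [(name, info.get("total_records", 0)) for name, info in tables.items()]
    # Sort by record count, descending; ties keep catalog order.
    counts.sort(key=lambda item: item[1], reverse=True)
    return counts[:top]
```

Usage: `for name, n in largest_tables(client.manifest("s3://my-lake")): print(name, n)`.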
```python
m = client.manifest("s3://my-lake")
for table_name, info in m.get("tables", {}).items():
    print(f"{table_name}: {info.get('total_records', 0)} records")
```

client.validate(name)
Test bucket connectivity and tap availability for a pipeline.
Parameters:
name (str): Pipeline name
Returns: dict with keys bucket_ok, tap_ok, errors
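In scripts that should stop on the first misconfigured pipeline, the validation result can be turned into an exception. This is a sketch under two assumptions: `errors` is a list (entries are passed through `str()`), and raising is preferable to printing in your context. `PipelineNotReady` and `ensure_ready` are hypothetical names, not part of the SDK.

```python
class PipelineNotReady(RuntimeError):
    """Hypothetical exception raised when client.validate(name) reports a problem."""


def ensure_ready(client, name: str) -> None:
    """Raise PipelineNotReady unless both the bucket and the tap check pass."""
    check = client.validate(name)
    if check["bucket_ok"] and check["tap_ok"]:
        return
    # errors is assumed to be a list; str() covers non-string entries.
    details = "; ".join(str(e) for e in check["errors"]) or "unknown reason"
    raise PipelineNotReady(f"{name}: {details}")
```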
```python
result = client.validate("orders")
if result["bucket_ok"] and result["tap_ok"]:
    print("Pipeline is ready")
else:
    for error in result["errors"]:
        print(f"Issue: {error}")
```

Full example: run all pipelines and report

```python
from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

for name in client.pipelines():
    # Validate before running
    check = client.validate(name)
    if not check["bucket_ok"] or not check["tap_ok"]:
        print(f"SKIP {name}: {check['errors']}")
        continue

    # Run the pipeline
    result = client.run(name)
    if result["success"]:
        total = sum(result["streams"].values())
        print(f"OK   {name}: {total} records")
    else:
        print(f"FAIL {name}: {result['error']}")
```

Full example: Jupyter notebook integration
```python
from dataspoc_pipe.sdk import PipeClient
import pandas as pd

client = PipeClient()

# Check what is available
print("Pipelines:", client.pipelines())
print()

# Show status as a DataFrame
status = client.status()
df = pd.DataFrame(status)
df
```

Output:

| name | last_run | status | duration | records |
|---|---|---|---|---|
| orders | 2025-01-20T14:00:00 | success | 3.2 | 5200 |
| customers | 2025-01-20T01:30:00 | success | 12.5 | 12400 |
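In a notebook, the `last_run` column can also be used to spot pipelines that have fallen behind schedule. This sketch assumes `last_run` is either `None` (never run), a `datetime`, or an ISO 8601 string like the values in the table, and that timestamps and `now` use the same timezone convention; `stale_pipelines` and the 24-hour default are hypothetical choices.

```python
from datetime import datetime, timedelta


def stale_pipelines(statuses, now, max_age=timedelta(hours=24)):
    """Return names of pipelines with no run, or whose last run is older than max_age."""
    stale = []
    for p in statuses:
        last_run = p.get("last_run")
        if last_run is None:
            # Never run: treat as stale.
            stale.append(p["name"])
            continue
        if isinstance(last_run, str):
            # Assumes an ISO 8601 string such as "2025-01-20T14:00:00".
            last_run = datetime.fromisoformat(last_run)
        if now - last_run > max_age:
            stale.append(p["name"])
    return stale
```

Usage in the notebook: `df[df["name"].isin(stale_pipelines(status, now=datetime.now()))]` shows only the rows that need attention.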