Pular para o conteúdo

Python SDK

A classe PipeClient oferece uma interface programática para todas as operações do Pipe. Use-a para integrar gerenciamento de pipelines em scripts, notebooks ou aplicações.

from dataspoc_pipe.sdk import PipeClient
client = PipeClient()

Nenhuma configuração é necessária. O PipeClient lê o mesmo diretório de configuração ~/.dataspoc-pipe/ que a CLI.

Retorna os nomes de todos os pipelines configurados.

Retorna: list[str]

names = client.pipelines()
print(names)
# ['orders', 'customers', 'events']

Retorna a configuração completa de um pipeline como um dict simples.

Parâmetros:

  • name (str): Nome do pipeline

Retorna: dict com as chaves name, source, destination, incremental, schedule

cfg = client.config("orders")
print(cfg["source"]["tap"]) # 'tap-csv'
print(cfg["destination"]["bucket"]) # 's3://my-lake'
print(cfg["incremental"]["enabled"]) # True

Executa um pipeline. Cuida do carregamento de estado, atualização do manifest e logging de execução — os mesmos passos pós-run que a CLI.

Parâmetros:

  • name (str): Nome do pipeline
  • full (bool, opcional): Forçar extração completa, ignorando o estado incremental. Padrão: False

Retorna: dict com as chaves success, streams, error

result = client.run("orders")
if result["success"]:
print(f"Extracted {sum(result['streams'].values())} records")
for stream, count in result["streams"].items():
print(f" {stream}: {count}")
else:
print(f"Failed: {result['error']}")
Extracted 5200 records
orders: 5200

Forçar extração completa:

result = client.run("orders", full=True)

Retorna o status de todos os pipelines configurados.

Retorna: list[dict], cada um com as chaves name, last_run, status, duration, records

for pipeline in client.status():
print(f"{pipeline['name']}: {pipeline['status']} ({pipeline['records']} records)")
orders: success (5200 records)
customers: success (12400 records)
events: no runs (None records)

Retorna o log de execução mais recente de um pipeline.

Parâmetros:

  • name (str): Nome do pipeline

Retorna: dict | None

log = client.logs("orders")
if log:
print(f"Status: {log['status']}")
print(f"Duration: {log['duration_seconds']}s")
print(f"Records: {log['total_records']}")

Retorna None se o pipeline nunca foi executado.

Retorna o manifest (catálogo) de um bucket.

Parâmetros:

  • bucket (str): URI do bucket (ex.: s3://my-lake, file:///tmp/lake)

Retorna: dict com o catálogo de tabelas

m = client.manifest("s3://my-lake")
for table_name, info in m.get("tables", {}).items():
print(f"{table_name}: {info.get('total_records', 0)} records")

Testa a conectividade do bucket e a disponibilidade do tap para um pipeline.

Parâmetros:

  • name (str): Nome do pipeline

Retorna: dict com as chaves bucket_ok, tap_ok, errors

result = client.validate("orders")
if result["bucket_ok"] and result["tap_ok"]:
print("Pipeline is ready")
else:
for error in result["errors"]:
print(f"Issue: {error}")

Exemplo completo: executar todos os pipelines e reportar

Seção intitulada “Exemplo completo: executar todos os pipelines e reportar”
from dataspoc_pipe.sdk import PipeClient
client = PipeClient()
for name in client.pipelines():
# Validate before running
check = client.validate(name)
if not check["bucket_ok"] or not check["tap_ok"]:
print(f"SKIP {name}: {check['errors']}")
continue
# Run the pipeline
result = client.run(name)
if result["success"]:
total = sum(result["streams"].values())
print(f"OK {name}: {total} records")
else:
print(f"FAIL {name}: {result['error']}")

Exemplo completo: integração com Jupyter notebook

Seção intitulada “Exemplo completo: integração com Jupyter notebook”
from dataspoc_pipe.sdk import PipeClient
import pandas as pd
client = PipeClient()
# Check what is available
print("Pipelines:", client.pipelines())
print()
# Show status as a DataFrame
status = client.status()
df = pd.DataFrame(status)
df
namelast_runstatusdurationrecords
orders2025-01-20T14:00:00success3.25200
customers2025-01-20T01:30:00success12.512400