Python SDK
A classe PipeClient oferece uma interface programática para todas as operações do Pipe. Use-a para integrar gerenciamento de pipelines em scripts, notebooks ou aplicações.
from dataspoc_pipe.sdk import PipeClient
client = PipeClient()Nenhuma configuração é necessária. O PipeClient lê o mesmo diretório de configuração ~/.dataspoc-pipe/ que a CLI.
Métodos
Seção intitulada “Métodos”client.pipelines()
Seção intitulada “client.pipelines()”Retorna os nomes de todos os pipelines configurados.
Retorna: list[str]
names = client.pipelines()print(names)# ['orders', 'customers', 'events']client.config(name)
Seção intitulada “client.config(name)”Retorna a configuração completa de um pipeline como um dict simples.
Parâmetros:
name(str): Nome do pipeline
Retorna: dict com as chaves name, source, destination, incremental, schedule
cfg = client.config("orders")print(cfg["source"]["tap"]) # 'tap-csv'print(cfg["destination"]["bucket"]) # 's3://my-lake'print(cfg["incremental"]["enabled"]) # Trueclient.run(name, full=False)
Seção intitulada “client.run(name, full=False)”Executa um pipeline. Cuida do carregamento de estado, atualização do manifest e logging de execução — os mesmos passos pós-run que a CLI.
Parâmetros:
name(str): Nome do pipelinefull(bool, opcional): Forçar extração completa, ignorando o estado incremental. Padrão:False
Retorna: dict com as chaves success, streams, error
result = client.run("orders")if result["success"]: print(f"Extracted {sum(result['streams'].values())} records") for stream, count in result["streams"].items(): print(f" {stream}: {count}")else: print(f"Failed: {result['error']}")Extracted 5200 records orders: 5200Forçar extração completa:
result = client.run("orders", full=True)client.status()
Seção intitulada “client.status()”Retorna o status de todos os pipelines configurados.
Retorna: list[dict], cada um com as chaves name, last_run, status, duration, records
for pipeline in client.status(): print(f"{pipeline['name']}: {pipeline['status']} ({pipeline['records']} records)")orders: success (5200 records)customers: success (12400 records)events: no runs (None records)client.logs(name)
Seção intitulada “client.logs(name)”Retorna o log de execução mais recente de um pipeline.
Parâmetros:
name(str): Nome do pipeline
Retorna: dict | None
log = client.logs("orders")if log: print(f"Status: {log['status']}") print(f"Duration: {log['duration_seconds']}s") print(f"Records: {log['total_records']}")Retorna None se o pipeline nunca foi executado.
client.manifest(bucket)
Seção intitulada “client.manifest(bucket)”Retorna o manifest (catálogo) de um bucket.
Parâmetros:
bucket(str): URI do bucket (ex.:s3://my-lake,file:///tmp/lake)
Retorna: dict com o catálogo de tabelas
m = client.manifest("s3://my-lake")for table_name, info in m.get("tables", {}).items(): print(f"{table_name}: {info.get('total_records', 0)} records")client.validate(name)
Seção intitulada “client.validate(name)”Testa a conectividade do bucket e a disponibilidade do tap para um pipeline.
Parâmetros:
name(str): Nome do pipeline
Retorna: dict com as chaves bucket_ok, tap_ok, errors
result = client.validate("orders")if result["bucket_ok"] and result["tap_ok"]: print("Pipeline is ready")else: for error in result["errors"]: print(f"Issue: {error}")Exemplo completo: executar todos os pipelines e reportar
Seção intitulada “Exemplo completo: executar todos os pipelines e reportar”from dataspoc_pipe.sdk import PipeClient
client = PipeClient()
for name in client.pipelines(): # Validate before running check = client.validate(name) if not check["bucket_ok"] or not check["tap_ok"]: print(f"SKIP {name}: {check['errors']}") continue
# Run the pipeline result = client.run(name) if result["success"]: total = sum(result["streams"].values()) print(f"OK {name}: {total} records") else: print(f"FAIL {name}: {result['error']}")Exemplo completo: integração com Jupyter notebook
Seção intitulada “Exemplo completo: integração com Jupyter notebook”from dataspoc_pipe.sdk import PipeClientimport pandas as pd
client = PipeClient()
# Check what is availableprint("Pipelines:", client.pipelines())print()
# Show status as a DataFramestatus = client.status()df = pd.DataFrame(status)df| name | last_run | status | duration | records |
|---|---|---|---|---|
| orders | 2025-01-20T14:00:00 | success | 3.2 | 5200 |
| customers | 2025-01-20T01:30:00 | success | 12.5 | 12400 |