Ir al contenido

Python SDK

La clase PipeClient proporciona una interfaz programatica a todas las operaciones de Pipe. Usala para integrar la gestion de pipelines en scripts, notebooks o aplicaciones.

from dataspoc_pipe.sdk import PipeClient
client = PipeClient()

No se necesita configuracion. PipeClient lee el mismo directorio de configuracion ~/.dataspoc-pipe/ que el CLI.

Retorna los nombres de todos los pipelines configurados.

Retorna: list[str]

names = client.pipelines()
print(names)
# ['orders', 'customers', 'events']

Retorna la configuracion completa de un pipeline como un diccionario.

Parametros:

  • name (str): Nombre del pipeline

Retorna: dict con claves name, source, destination, incremental, schedule

cfg = client.config("orders")
print(cfg["source"]["tap"]) # 'tap-csv'
print(cfg["destination"]["bucket"]) # 's3://my-lake'
print(cfg["incremental"]["enabled"]) # True

Ejecuta un pipeline. Maneja la carga de estado, actualizacion del manifiesto y registro de ejecucion — los mismos pasos post-ejecucion que el CLI.

Parametros:

  • name (str): Nombre del pipeline
  • full (bool, opcional): Forzar extraccion completa, ignorando el estado incremental. Por defecto: False

Retorna: dict con claves success, streams, error

result = client.run("orders")
if result["success"]:
print(f"Extracted {sum(result['streams'].values())} records")
for stream, count in result["streams"].items():
print(f" {stream}: {count}")
else:
print(f"Failed: {result['error']}")
Extracted 5200 records
orders: 5200

Forzar una extraccion completa:

result = client.run("orders", full=True)

Retorna el estado de todos los pipelines configurados.

Retorna: list[dict], cada uno con claves name, last_run, status, duration, records

for pipeline in client.status():
print(f"{pipeline['name']}: {pipeline['status']} ({pipeline['records']} records)")
orders: success (5200 records)
customers: success (12400 records)
events: no runs (None records)

Retorna el log de ejecucion mas reciente de un pipeline.

Parametros:

  • name (str): Nombre del pipeline

Retorna: dict | None

log = client.logs("orders")
if log:
print(f"Status: {log['status']}")
print(f"Duration: {log['duration_seconds']}s")
print(f"Records: {log['total_records']}")

Retorna None si el pipeline nunca ha sido ejecutado.

Retorna el manifiesto (catalogo) de un bucket.

Parametros:

  • bucket (str): URI del bucket (ej., s3://my-lake, file:///tmp/lake)

Retorna: dict con el catalogo de tablas

m = client.manifest("s3://my-lake")
for table_name, info in m.get("tables", {}).items():
print(f"{table_name}: {info.get('total_records', 0)} records")

Prueba la conectividad del bucket y la disponibilidad del tap para un pipeline.

Parametros:

  • name (str): Nombre del pipeline

Retorna: dict con claves bucket_ok, tap_ok, errors

result = client.validate("orders")
if result["bucket_ok"] and result["tap_ok"]:
print("Pipeline is ready")
else:
for error in result["errors"]:
print(f"Issue: {error}")

Ejemplo completo: ejecutar todos los pipelines y reportar

Sección titulada «Ejemplo completo: ejecutar todos los pipelines y reportar»
from dataspoc_pipe.sdk import PipeClient
client = PipeClient()
for name in client.pipelines():
# Validate before running
check = client.validate(name)
if not check["bucket_ok"] or not check["tap_ok"]:
print(f"SKIP {name}: {check['errors']}")
continue
# Run the pipeline
result = client.run(name)
if result["success"]:
total = sum(result["streams"].values())
print(f"OK {name}: {total} records")
else:
print(f"FAIL {name}: {result['error']}")

Ejemplo completo: integracion con Jupyter notebook

Sección titulada «Ejemplo completo: integracion con Jupyter notebook»
from dataspoc_pipe.sdk import PipeClient
import pandas as pd
client = PipeClient()
# Check what is available
print("Pipelines:", client.pipelines())
print()
# Show status as a DataFrame
status = client.status()
df = pd.DataFrame(status)
df
namelast_runstatusdurationrecords
orders2025-01-20T14:00:00success3.25200
customers2025-01-20T01:30:00success12.512400