Python SDK
La clase PipeClient proporciona una interfaz programatica a todas las operaciones de Pipe. Usala para integrar la gestion de pipelines en scripts, notebooks o aplicaciones.
Importar
Sección titulada «Importar»from dataspoc_pipe.sdk import PipeClient
client = PipeClient()No se necesita configuracion. PipeClient lee el mismo directorio de configuracion ~/.dataspoc-pipe/ que el CLI.
Metodos
Sección titulada «Metodos»client.pipelines()
Sección titulada «client.pipelines()»Retorna los nombres de todos los pipelines configurados.
Retorna: list[str]
names = client.pipelines()print(names)# ['orders', 'customers', 'events']client.config(name)
Sección titulada «client.config(name)»Retorna la configuracion completa de un pipeline como un diccionario.
Parametros:
name(str): Nombre del pipeline
Retorna: dict con claves name, source, destination, incremental, schedule
cfg = client.config("orders")print(cfg["source"]["tap"]) # 'tap-csv'print(cfg["destination"]["bucket"]) # 's3://my-lake'print(cfg["incremental"]["enabled"]) # Trueclient.run(name, full=False)
Sección titulada «client.run(name, full=False)»Ejecuta un pipeline. Maneja la carga de estado, actualizacion del manifiesto y registro de ejecucion — los mismos pasos post-ejecucion que el CLI.
Parametros:
name(str): Nombre del pipelinefull(bool, opcional): Forzar extraccion completa, ignorando el estado incremental. Por defecto:False
Retorna: dict con claves success, streams, error
result = client.run("orders")if result["success"]: print(f"Extracted {sum(result['streams'].values())} records") for stream, count in result["streams"].items(): print(f" {stream}: {count}")else: print(f"Failed: {result['error']}")Extracted 5200 records orders: 5200Forzar una extraccion completa:
result = client.run("orders", full=True)client.status()
Sección titulada «client.status()»Retorna el estado de todos los pipelines configurados.
Retorna: list[dict], cada uno con claves name, last_run, status, duration, records
for pipeline in client.status(): print(f"{pipeline['name']}: {pipeline['status']} ({pipeline['records']} records)")orders: success (5200 records)customers: success (12400 records)events: no runs (None records)client.logs(name)
Sección titulada «client.logs(name)»Retorna el log de ejecucion mas reciente de un pipeline.
Parametros:
name(str): Nombre del pipeline
Retorna: dict | None
log = client.logs("orders")if log: print(f"Status: {log['status']}") print(f"Duration: {log['duration_seconds']}s") print(f"Records: {log['total_records']}")Retorna None si el pipeline nunca ha sido ejecutado.
client.manifest(bucket)
Sección titulada «client.manifest(bucket)»Retorna el manifiesto (catalogo) de un bucket.
Parametros:
bucket(str): URI del bucket (ej.,s3://my-lake,file:///tmp/lake)
Retorna: dict con el catalogo de tablas
m = client.manifest("s3://my-lake")for table_name, info in m.get("tables", {}).items(): print(f"{table_name}: {info.get('total_records', 0)} records")client.validate(name)
Sección titulada «client.validate(name)»Prueba la conectividad del bucket y la disponibilidad del tap para un pipeline.
Parametros:
name(str): Nombre del pipeline
Retorna: dict con claves bucket_ok, tap_ok, errors
result = client.validate("orders")if result["bucket_ok"] and result["tap_ok"]: print("Pipeline is ready")else: for error in result["errors"]: print(f"Issue: {error}")Ejemplo completo: ejecutar todos los pipelines y reportar
Sección titulada «Ejemplo completo: ejecutar todos los pipelines y reportar»from dataspoc_pipe.sdk import PipeClient
client = PipeClient()
for name in client.pipelines(): # Validate before running check = client.validate(name) if not check["bucket_ok"] or not check["tap_ok"]: print(f"SKIP {name}: {check['errors']}") continue
# Run the pipeline result = client.run(name) if result["success"]: total = sum(result["streams"].values()) print(f"OK {name}: {total} records") else: print(f"FAIL {name}: {result['error']}")Ejemplo completo: integracion con Jupyter notebook
Sección titulada «Ejemplo completo: integracion con Jupyter notebook»from dataspoc_pipe.sdk import PipeClientimport pandas as pd
client = PipeClient()
# Check what is availableprint("Pipelines:", client.pipelines())print()
# Show status as a DataFramestatus = client.status()df = pd.DataFrame(status)df| name | last_run | status | duration | records |
|---|---|---|---|---|
| orders | 2025-01-20T14:00:00 | success | 3.2 | 5200 |
| customers | 2025-01-20T01:30:00 | success | 12.5 | 12400 |