Ejemplos
Tres ejemplos completos mostrando configuraciones de pipelines del mundo real.
Ejemplo 1: CSV a sistema de archivos local
Sección titulada «Ejemplo 1: CSV a sistema de archivos local»El pipeline mas simple. Lee un archivo CSV y escribe Parquet en el sistema de archivos local. No se necesitan credenciales de la nube.
Instalar
Sección titulada «Instalar»pip install dataspoc-pipe tap-csvCrear datos de ejemplo
Sección titulada «Crear datos de ejemplo»mkdir -p /tmp/sample-datacat > /tmp/sample-data/products.csv << 'EOF'id,name,category,price,in_stock1,Widget A,electronics,29.99,true2,Gadget B,electronics,49.99,true3,Gizmo C,accessories,19.99,false4,Widget D,electronics,39.99,true5,Doohickey E,accessories,9.99,trueEOFConfiguracion
Sección titulada «Configuracion»dataspoc-pipe initdataspoc-pipe add csv-productsEn el asistente, ingresa:
- Tap:
tap-csv - Bucket:
file:///tmp/lake - Ruta base:
raw - Compresion:
zstd - Incremental: no
- Cron: (dejar vacio)
Configuracion de la fuente
Sección titulada «Configuracion de la fuente»Edita ~/.dataspoc-pipe/sources/csv-products.json:
{ "csv_files_definition": [ { "entity": "products", "path": "/tmp/sample-data/products.csv", "keys": ["id"] } ]}YAML del Pipeline
Sección titulada «YAML del Pipeline»~/.dataspoc-pipe/pipelines/csv-products.yaml:
source: tap: tap-csv config: /home/you/.dataspoc-pipe/sources/csv-products.jsondestination: bucket: file:///tmp/lake path: raw compression: zstd partition_by: _extraction_dateincremental: enabled: falseEjecutar
Sección titulada «Ejecutar»dataspoc-pipe validate csv-productsdataspoc-pipe run csv-productsValidating: csv-products Bucket OK: file:///tmp/lake Tap OK: tap-csv found in PATH
Running: csv-products products: 5 records...Done! 5 records in 1 stream(s) products: 5Estructura del bucket
Sección titulada «Estructura del bucket»/tmp/lake/ .dataspoc/ manifest.json logs/csv-products/2025-01-20T103000Z.json raw/ csv/products/ dt=2025-01-20/ products_0000.parquetConsultar el resultado
Sección titulada «Consultar el resultado»import duckdbduckdb.sql("SELECT * FROM '/tmp/lake/raw/csv/products/**/*.parquet'").show()Ejemplo 2: PostgreSQL a S3 (incremental, programado)
Sección titulada «Ejemplo 2: PostgreSQL a S3 (incremental, programado)»Un pipeline de grado produccion que extrae datos incrementalmente desde PostgreSQL a S3, ejecutandose cada 2 horas.
Instalar
Sección titulada «Instalar»pip install dataspoc-pipe[s3] tap-postgresConfiguracion
Sección titulada «Configuracion»dataspoc-pipe initdataspoc-pipe add pg-ordersEn el asistente, ingresa:
- Tap:
tap-postgres - Bucket:
s3://company-datalake - Ruta base:
raw - Compresion:
zstd - Incremental: si
- Cron:
0 */2 * * *
Configuracion de la fuente
Sección titulada «Configuracion de la fuente»Edita ~/.dataspoc-pipe/sources/pg-orders.json:
{ "host": "db.example.com", "port": 5432, "user": "readonly", "dbname": "production", "filter_schemas": "public", "default_replication_method": "INCREMENTAL", "default_replication_key": "updated_at"}YAML del Pipeline
Sección titulada «YAML del Pipeline»~/.dataspoc-pipe/pipelines/pg-orders.yaml:
source: tap: tap-postgres config: /home/you/.dataspoc-pipe/sources/pg-orders.json streams: - public-orders - public-order_itemsdestination: bucket: s3://company-datalake path: raw compression: zstd partition_by: _extraction_dateincremental: enabled: trueschedule: cron: "0 */2 * * *"Transformacion opcional
Sección titulada «Transformacion opcional»Crea ~/.dataspoc-pipe/transforms/pg-orders.py para limpiar datos durante la ingesta:
import pandas as pd
def transform(df: pd.DataFrame) -> pd.DataFrame: """Normalize order data.""" # Ensure email is lowercase if "email" in df.columns: df["email"] = df["email"].str.lower().str.strip()
# Drop test orders if "is_test" in df.columns: df = df[df["is_test"] != True]
return dfEjecutar
Sección titulada «Ejecutar»# Validate connectivitydataspoc-pipe validate pg-orders
# First run (full extraction)dataspoc-pipe run pg-orders
# Install cron scheduledataspoc-pipe schedule installValidating: pg-orders Bucket OK: s3://company-datalake Tap OK: tap-postgres found in PATH
Running: pg-orders public-orders: 250,000 records... public-order_items: 890,000 records...Done! 1,140,000 records in 2 stream(s) public-orders: 250,000 public-order_items: 890,000
Installed: pg-orders (0 */2 * * *)1 schedule(s) installed.Ejecuciones subsecuentes
Sección titulada «Ejecuciones subsecuentes»Cada 2 horas, cron dispara el pipeline. Como el modo incremental esta habilitado, solo se extraen registros nuevos:
Running: pg-orders Incremental mode (using previous checkpoint) public-orders: 340 records... public-order_items: 1,200 records...Done! 1,540 records in 2 stream(s) public-orders: 340 public-order_items: 1,200Estructura del bucket
Sección titulada «Estructura del bucket»s3://company-datalake/ .dataspoc/ manifest.json state/pg-orders/state.json logs/pg-orders/2025-01-20T100000Z.json logs/pg-orders/2025-01-20T120000Z.json raw/ postgres/public-orders/ dt=2025-01-20/ public-orders_0000.parquet postgres/public-order_items/ dt=2025-01-20/ public-order_items_0000.parquetEjemplo 3: GitHub API a GCS
Sección titulada «Ejemplo 3: GitHub API a GCS»Extraer datos de repositorios desde la GitHub API hacia Google Cloud Storage.
Instalar
Sección titulada «Instalar»pip install dataspoc-pipe[gcs] tap-githubConfiguracion
Sección titulada «Configuracion»dataspoc-pipe initdataspoc-pipe add github-reposEn el asistente, ingresa:
- Tap:
tap-github - Bucket:
gs://analytics-lake - Ruta base:
raw - Compresion:
zstd - Incremental: si
- Cron:
0 1 * * *(diario a las 01:00)
Configuracion de la fuente
Sección titulada «Configuracion de la fuente»Edita ~/.dataspoc-pipe/sources/github-repos.json:
{ "access_token": "${GITHUB_TOKEN}", "repository": "dataspoclab/dataspoc-pipe", "start_date": "2025-01-01T00:00:00Z"}Configura tu token de GitHub como variable de entorno:
export GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxYAML del Pipeline
Sección titulada «YAML del Pipeline»~/.dataspoc-pipe/pipelines/github-repos.yaml:
source: tap: tap-github config: /home/you/.dataspoc-pipe/sources/github-repos.json streams: - commits - pull_requests - issues - stargazersdestination: bucket: gs://analytics-lake path: raw compression: zstd partition_by: _extraction_dateincremental: enabled: trueschedule: cron: "0 1 * * *"Ejecutar
Sección titulada «Ejecutar»# Authenticate with GCPgcloud auth application-default login
# Validate and rundataspoc-pipe validate github-reposdataspoc-pipe run github-reposValidating: github-repos Bucket OK: gs://analytics-lake Tap OK: tap-github found in PATH
Running: github-repos commits: 1,250 records... pull_requests: 180 records... issues: 95 records... stargazers: 420 records...Done! 1,945 records in 4 stream(s) commits: 1,250 pull_requests: 180 issues: 95 stargazers: 420Instalar schedule diario
Sección titulada «Instalar schedule diario»dataspoc-pipe schedule install Installed: github-repos (0 1 * * *)1 schedule(s) installed.Estructura del bucket
Sección titulada «Estructura del bucket»gs://analytics-lake/ .dataspoc/ manifest.json state/github-repos/state.json logs/github-repos/2025-01-20T010000Z.json raw/ github/commits/ dt=2025-01-20/ commits_0000.parquet github/pull_requests/ dt=2025-01-20/ pull_requests_0000.parquet github/issues/ dt=2025-01-20/ issues_0000.parquet github/stargazers/ dt=2025-01-20/ stargazers_0000.parquet