
Examples

Three complete examples showing real-world pipeline configurations.

Example 1: CSV to local Parquet

The simplest possible pipeline: read a CSV file and write Parquet to the local filesystem. No cloud credentials needed.

pip install dataspoc-pipe tap-csv
Create a sample CSV to ingest:
mkdir -p /tmp/sample-data
cat > /tmp/sample-data/products.csv << 'EOF'
id,name,category,price,in_stock
1,Widget A,electronics,29.99,true
2,Gadget B,electronics,49.99,true
3,Gizmo C,accessories,19.99,false
4,Widget D,electronics,39.99,true
5,Doohickey E,accessories,9.99,true
EOF
Initialize dataspoc-pipe and add the pipeline:
dataspoc-pipe init
dataspoc-pipe add csv-products

In the wizard, enter:

  • Tap: tap-csv
  • Bucket: file:///tmp/lake
  • Base path: raw
  • Compression: zstd
  • Incremental: no
  • Cron: (leave empty)

Edit ~/.dataspoc-pipe/sources/csv-products.json:

{
  "csv_files_definition": [
    {
      "entity": "products",
      "path": "/tmp/sample-data/products.csv",
      "keys": ["id"]
    }
  ]
}

~/.dataspoc-pipe/pipelines/csv-products.yaml:

source:
  tap: tap-csv
  config: /home/you/.dataspoc-pipe/sources/csv-products.json
destination:
  bucket: file:///tmp/lake
  path: raw
  compression: zstd
  partition_by: _extraction_date
incremental:
  enabled: false
Validate and run the pipeline:
dataspoc-pipe validate csv-products
dataspoc-pipe run csv-products
Validating: csv-products
Bucket OK: file:///tmp/lake
Tap OK: tap-csv found in PATH
Running: csv-products
products: 5 records...
Done! 5 records in 1 stream(s)
products: 5

The resulting lake layout (the dt= directory comes from partition_by: _extraction_date):

/tmp/lake/
  .dataspoc/
    manifest.json
    logs/csv-products/2025-01-20T103000Z.json
  raw/
    csv/products/
      dt=2025-01-20/
        products_0000.parquet
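
Query the output directly with DuckDB:
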
import duckdb
duckdb.sql("SELECT * FROM '/tmp/lake/raw/csv/products/**/*.parquet'").show()
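
Anything that reads Parquet can query the lake the same way. For instance, an aggregate over the sample data above (the CAST is defensive, in case the tap typed price as text; the query itself is only illustrative):

import duckdb

# Average price per category across every partition of the products stream.
# price is cast explicitly since some taps emit CSV values as strings.
duckdb.sql("""
    SELECT
        category,
        count(*) AS n,
        round(avg(CAST(price AS DOUBLE)), 2) AS avg_price
    FROM '/tmp/lake/raw/csv/products/**/*.parquet'
    GROUP BY category
    ORDER BY category
""").show()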

Example 2: PostgreSQL to S3 (incremental, scheduled)


A production-grade pipeline that extracts data incrementally from PostgreSQL to S3, running every 2 hours.

pip install dataspoc-pipe[s3] tap-postgres
dataspoc-pipe init
dataspoc-pipe add pg-orders

In the wizard, enter:

  • Tap: tap-postgres
  • Bucket: s3://company-datalake
  • Base path: raw
  • Compression: zstd
  • Incremental: yes
  • Cron: 0 */2 * * *

Edit ~/.dataspoc-pipe/sources/pg-orders.json:

{
  "host": "db.example.com",
  "port": 5432,
  "user": "readonly",
  "dbname": "production",
  "filter_schemas": "public",
  "default_replication_method": "INCREMENTAL",
  "default_replication_key": "updated_at"
}
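
The config above stores no password; supply it however your tap-postgres installation expects (for example a "password" key or an environment variable). Independently of dataspoc-pipe, you can sanity-check the read-only connection and the replication key before the first run; a minimal sketch using psycopg2 (an ad-hoc helper, not part of the tool):

import os

import psycopg2  # assumption: pip install psycopg2-binary

# Verify the connection and confirm the replication key is populated.
conn = psycopg2.connect(
    host="db.example.com",
    port=5432,
    user="readonly",
    dbname="production",
    password=os.environ["PGPASSWORD"],  # hypothetical: wherever you keep it
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT max(updated_at) FROM public.orders")
    print("latest updated_at:", cur.fetchone()[0])
conn.close()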

~/.dataspoc-pipe/pipelines/pg-orders.yaml:

source:
  tap: tap-postgres
  config: /home/you/.dataspoc-pipe/sources/pg-orders.json
  streams:
    - public-orders
    - public-order_items
destination:
  bucket: s3://company-datalake
  path: raw
  compression: zstd
  partition_by: _extraction_date
incremental:
  enabled: true
schedule:
  cron: "0 */2 * * *"

Create ~/.dataspoc-pipe/transforms/pg-orders.py to clean data during ingestion:

import pandas as pd


def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize order data."""
    # Lowercase and trim email addresses
    if "email" in df.columns:
        df["email"] = df["email"].str.lower().str.strip()
    # Drop test orders
    if "is_test" in df.columns:
        df = df[df["is_test"] != True]
    return df
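
You can exercise the transform locally before the first run; a quick check with made-up rows (the importlib dance is only needed because the filename contains a hyphen):

import importlib.util

import pandas as pd

# Load transform() from the hyphenated filename (path as configured above).
spec = importlib.util.spec_from_file_location(
    "pg_orders_transform", "/home/you/.dataspoc-pipe/transforms/pg-orders.py"
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)

# One real order and one test order: the test order should be dropped and
# the email lowercased and trimmed.
df = pd.DataFrame({
    "email": ["  Alice@Example.COM", "QA@example.com"],
    "is_test": [False, True],
})
print(mod.transform(df))
#                email  is_test
# 0  alice@example.com    False
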
# Validate connectivity
dataspoc-pipe validate pg-orders
# First run (full extraction)
dataspoc-pipe run pg-orders
# Install cron schedule
dataspoc-pipe schedule install
Validating: pg-orders
Bucket OK: s3://company-datalake
Tap OK: tap-postgres found in PATH
Running: pg-orders
public-orders: 250,000 records...
public-order_items: 890,000 records...
Done! 1,140,000 records in 2 stream(s)
public-orders: 250,000
public-order_items: 890,000
Installed: pg-orders (0 */2 * * *)
1 schedule(s) installed.

Every 2 hours, cron triggers the pipeline. Because incremental mode is enabled, only new records are extracted:

Running: pg-orders
Incremental mode (using previous checkpoint)
public-orders: 340 records...
public-order_items: 1,200 records...
Done! 1,540 records in 2 stream(s)
public-orders: 340
public-order_items: 1,200
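
The checkpoint that drives this lives in the lake itself, at .dataspoc/state/pg-orders/state.json (see the layout below). A sketch of inspecting it, assuming the file follows the usual Singer bookmark shape; the exact schema is an assumption, and the path supposes you have fetched the file from S3 first:

import json

# Hypothetical: state.json downloaded locally from
# s3://company-datalake/.dataspoc/state/pg-orders/state.json
with open("state.json") as f:
    state = json.load(f)

# Singer-style state keeps one bookmark per stream, typically the last
# replication_key value seen (updated_at in this pipeline).
for stream, bookmark in state.get("bookmarks", {}).items():
    print(stream, "->", bookmark)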

After these two runs the bucket contains:

s3://company-datalake/
  .dataspoc/
    manifest.json
    state/pg-orders/state.json
    logs/pg-orders/2025-01-20T100000Z.json
    logs/pg-orders/2025-01-20T120000Z.json
  raw/
    postgres/public-orders/
      dt=2025-01-20/
        public-orders_0000.parquet
    postgres/public-order_items/
      dt=2025-01-20/
        public-order_items_0000.parquet
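
As in the first example, the output is plain Parquet and can be queried in place; a sketch using DuckDB's httpfs extension, assuming a recent DuckDB and AWS credentials discoverable through the standard chain (environment variables, profile, or instance role):

import duckdb

con = duckdb.connect()
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")
# credential_chain picks up your regular AWS configuration.
con.execute("CREATE SECRET (TYPE s3, PROVIDER credential_chain)")
con.sql("""
    SELECT count(*) AS orders
    FROM 's3://company-datalake/raw/postgres/public-orders/**/*.parquet'
""").show()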

Example 3: GitHub API to Google Cloud Storage

Extract repository data from the GitHub API into Google Cloud Storage.

pip install dataspoc-pipe[gcs] tap-github
dataspoc-pipe init
dataspoc-pipe add github-repos

In the wizard, enter:

  • Tap: tap-github
  • Bucket: gs://analytics-lake
  • Base path: raw
  • Compression: zstd
  • Incremental: yes
  • Cron: 0 1 * * * (daily at 01:00)

Edit ~/.dataspoc-pipe/sources/github-repos.json:

{
  "access_token": "${GITHUB_TOKEN}",
  "repository": "dataspoclab/dataspoc-pipe",
  "start_date": "2025-01-01T00:00:00Z"
}

Set your GitHub token as an environment variable:

export GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
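
The ${GITHUB_TOKEN} placeholder in the config is expected to resolve from this environment variable. To confirm the token works before the first run, you can hit the repository endpoint directly; a quick check with requests (not part of dataspoc-pipe):

import os

import requests  # assumption: pip install requests

# Fetch the same repository the tap will read; 401/403 means a bad token.
resp = requests.get(
    "https://api.github.com/repos/dataspoclab/dataspoc-pipe",
    headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"},
    timeout=10,
)
resp.raise_for_status()
print(resp.json()["full_name"], "- token OK")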

~/.dataspoc-pipe/pipelines/github-repos.yaml:

source:
  tap: tap-github
  config: /home/you/.dataspoc-pipe/sources/github-repos.json
  streams:
    - commits
    - pull_requests
    - issues
    - stargazers
destination:
  bucket: gs://analytics-lake
  path: raw
  compression: zstd
  partition_by: _extraction_date
incremental:
  enabled: true
schedule:
  cron: "0 1 * * *"
# Authenticate with GCP
gcloud auth application-default login
# Validate and run
dataspoc-pipe validate github-repos
dataspoc-pipe run github-repos
Validating: github-repos
Bucket OK: gs://analytics-lake
Tap OK: tap-github found in PATH
Running: github-repos
commits: 1,250 records...
pull_requests: 180 records...
issues: 95 records...
stargazers: 420 records...
Done! 1,945 records in 4 stream(s)
commits: 1,250
pull_requests: 180
issues: 95
stargazers: 420
Install the cron schedule:
dataspoc-pipe schedule install
Installed: github-repos (0 1 * * *)
1 schedule(s) installed.

The bucket after the first run:

gs://analytics-lake/
  .dataspoc/
    manifest.json
    state/github-repos/state.json
    logs/github-repos/2025-01-20T010000Z.json
  raw/
    github/commits/
      dt=2025-01-20/
        commits_0000.parquet
    github/pull_requests/
      dt=2025-01-20/
        pull_requests_0000.parquet
    github/issues/
      dt=2025-01-20/
        issues_0000.parquet
    github/stargazers/
      dt=2025-01-20/
        stargazers_0000.parquet
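
As before, the lake is directly queryable; for instance with pandas, assuming gcsfs is installed and application-default credentials are set up via the gcloud command above:

import pandas as pd

# gs:// support comes from the gcsfs package (pip install gcsfs pyarrow);
# all dt= partitions under the commits stream are read in one call.
df = pd.read_parquet("gs://analytics-lake/raw/github/commits/")
print(len(df), "commits")
print(df.head())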