
Examples

Three complete examples showing real-world pipeline configurations.

Example 1: CSV to local filesystem

The simplest pipeline: it reads a CSV file and writes Parquet to the local filesystem. No cloud credentials are required.

pip install dataspoc-pipe tap-csv
mkdir -p /tmp/sample-data
cat > /tmp/sample-data/products.csv << 'EOF'
id,name,category,price,in_stock
1,Widget A,electronics,29.99,true
2,Gadget B,electronics,49.99,true
3,Gizmo C,accessories,19.99,false
4,Widget D,electronics,39.99,true
5,Doohickey E,accessories,9.99,true
EOF
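Before wiring the file into a pipeline, it can help to sanity-check it with Python's stdlib csv module. A convenience sketch (not part of dataspoc-pipe), using the same sample content inline:

```python
import csv
import io

# Same content as /tmp/sample-data/products.csv
sample = """\
id,name,category,price,in_stock
1,Widget A,electronics,29.99,true
2,Gadget B,electronics,49.99,true
3,Gizmo C,accessories,19.99,false
4,Widget D,electronics,39.99,true
5,Doohickey E,accessories,9.99,true
"""

rows = list(csv.DictReader(io.StringIO(sample)))
print(len(rows))               # number of records
print(sorted(rows[0].keys()))  # column names
```

Every value comes back as a string here; type inference (e.g. price as a float) happens later, at the Parquet-writing stage.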
dataspoc-pipe init
dataspoc-pipe add csv-products

In the wizard, enter:

  • Tap: tap-csv
  • Bucket: file:///tmp/lake
  • Base path: raw
  • Compression: zstd
  • Incremental: no
  • Cron: (leave empty)

Edit ~/.dataspoc-pipe/sources/csv-products.json:

{
  "csv_files_definition": [
    {
      "entity": "products",
      "path": "/tmp/sample-data/products.csv",
      "keys": ["id"]
    }
  ]
}
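If you manage many sources, the same config can be generated from Python with the stdlib json module. A sketch (the file layout is exactly the one shown above; nothing here is dataspoc-pipe-specific):

```python
import json

config = {
    "csv_files_definition": [
        {
            "entity": "products",
            "path": "/tmp/sample-data/products.csv",
            "keys": ["id"],
        }
    ]
}

# Serialize exactly as it would be written to
# ~/.dataspoc-pipe/sources/csv-products.json
text = json.dumps(config, indent=2)
parsed = json.loads(text)  # round-trip check
print(text)
```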

~/.dataspoc-pipe/pipelines/csv-products.yaml:

source:
  tap: tap-csv
  config: /home/you/.dataspoc-pipe/sources/csv-products.json
destination:
  bucket: file:///tmp/lake
  path: raw
  compression: zstd
  partition_by: _extraction_date
incremental:
  enabled: false
dataspoc-pipe validate csv-products
dataspoc-pipe run csv-products
Validating: csv-products
Bucket OK: file:///tmp/lake
Tap OK: tap-csv found in PATH
Running: csv-products
products: 5 records...
Done! 5 records in 1 stream(s)
products: 5
/tmp/lake/
  .dataspoc/
    manifest.json
    logs/csv-products/2025-01-20T103000Z.json
  raw/
    csv/products/
      dt=2025-01-20/
        products_0000.parquet
Query the result with DuckDB:

import duckdb
duckdb.sql("SELECT * FROM '/tmp/lake/raw/csv/products/**/*.parquet'").show()

Example 2: PostgreSQL to S3 (incremental, scheduled)


A production pipeline that extracts data incrementally from PostgreSQL to S3, running every 2 hours.

pip install dataspoc-pipe[s3] tap-postgres
dataspoc-pipe init
dataspoc-pipe add pg-orders

In the wizard, enter:

  • Tap: tap-postgres
  • Bucket: s3://company-datalake
  • Base path: raw
  • Compression: zstd
  • Incremental: yes
  • Cron: 0 */2 * * *

Edit ~/.dataspoc-pipe/sources/pg-orders.json:

{
  "host": "db.example.com",
  "port": 5432,
  "user": "readonly",
  "dbname": "production",
  "filter_schemas": "public",
  "default_replication_method": "INCREMENTAL",
  "default_replication_key": "updated_at"
}
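Conceptually, INCREMENTAL replication with updated_at as the replication key means each run only selects rows past the last stored checkpoint. A rough sketch of the query shape this implies (illustrative only; tap-postgres builds and parameterizes its own SQL, and real code must never interpolate values into SQL strings like this):

```python
def incremental_query(table, replication_key, checkpoint=None):
    """Sketch the SELECT an incremental extraction implies.

    checkpoint is the highest replication-key value seen so far,
    or None on the first (full) run.
    """
    base = f"SELECT * FROM {table}"
    if checkpoint is None:
        # First run: full extraction, ordered so the last row
        # yields the next checkpoint
        return f"{base} ORDER BY {replication_key}"
    # Subsequent runs: only rows updated after the checkpoint
    return (
        f"{base} WHERE {replication_key} > '{checkpoint}' "
        f"ORDER BY {replication_key}"
    )

print(incremental_query("public.orders", "updated_at"))
print(incremental_query("public.orders", "updated_at", "2025-01-20T10:00:00Z"))
```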

~/.dataspoc-pipe/pipelines/pg-orders.yaml:

source:
  tap: tap-postgres
  config: /home/you/.dataspoc-pipe/sources/pg-orders.json
  streams:
    - public-orders
    - public-order_items
destination:
  bucket: s3://company-datalake
  path: raw
  compression: zstd
  partition_by: _extraction_date
incremental:
  enabled: true
schedule:
  cron: "0 */2 * * *"

Create ~/.dataspoc-pipe/transforms/pg-orders.py to clean data during ingestion:

import pandas as pd

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize order data."""
    # Ensure email is lowercase
    if "email" in df.columns:
        df["email"] = df["email"].str.lower().str.strip()
    # Drop test orders
    if "is_test" in df.columns:
        df = df[df["is_test"] != True]
    return df
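The transform can be exercised locally before the first run. A quick check with a toy DataFrame (pandas required; the function body is the same as above):

```python
import pandas as pd

# Same function as in ~/.dataspoc-pipe/transforms/pg-orders.py
def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize order data."""
    if "email" in df.columns:
        df["email"] = df["email"].str.lower().str.strip()
    if "is_test" in df.columns:
        df = df[df["is_test"] != True]
    return df

orders = pd.DataFrame({
    "email": ["  Alice@Example.COM ", "bob@example.com"],
    "is_test": [False, True],
})

cleaned = transform(orders)
print(cleaned)  # one row left: the test order is dropped, email normalized
```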
# Validate connectivity
dataspoc-pipe validate pg-orders
# First run (full extraction)
dataspoc-pipe run pg-orders
# Install cron schedule
dataspoc-pipe schedule install
Validating: pg-orders
Bucket OK: s3://company-datalake
Tap OK: tap-postgres found in PATH
Running: pg-orders
public-orders: 250,000 records...
public-order_items: 890,000 records...
Done! 1,140,000 records in 2 stream(s)
public-orders: 250,000
public-order_items: 890,000
Installed: pg-orders (0 */2 * * *)
1 schedule(s) installed.

Every 2 hours, cron triggers the pipeline. Since incremental mode is enabled, only new records are extracted:

Running: pg-orders
Incremental mode (using previous checkpoint)
public-orders: 340 records...
public-order_items: 1,200 records...
Done! 1,540 records in 2 stream(s)
public-orders: 340
public-order_items: 1,200
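The checkpoint behind this lives in state/pg-orders/state.json. A sketch of how such a bookmark file can be read and advanced between runs (the exact layout of dataspoc-pipe's state file is an assumption here, modeled on Singer-style bookmarks):

```python
import json
import tempfile
from pathlib import Path

def advance_checkpoint(state_path: Path, stream: str, new_value: str) -> dict:
    """Load the state file, bump one stream's bookmark, write it back."""
    if state_path.exists():
        state = json.loads(state_path.read_text())
    else:
        state = {"bookmarks": {}}  # first run: no checkpoint yet
    state["bookmarks"][stream] = {"replication_key_value": new_value}
    state_path.write_text(json.dumps(state, indent=2))
    return state

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "state.json"
    advance_checkpoint(path, "public-orders", "2025-01-20T10:00:00Z")  # first run
    state = advance_checkpoint(path, "public-orders", "2025-01-20T12:00:00Z")  # 2h later
    print(state["bookmarks"]["public-orders"]["replication_key_value"])
```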
s3://company-datalake/
  .dataspoc/
    manifest.json
    state/pg-orders/state.json
    logs/pg-orders/2025-01-20T100000Z.json
    logs/pg-orders/2025-01-20T120000Z.json
  raw/
    postgres/public-orders/
      dt=2025-01-20/
        public-orders_0000.parquet
    postgres/public-order_items/
      dt=2025-01-20/
        public-order_items_0000.parquet
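The dt= directories come from partition_by: _extraction_date. A sketch of how such a Hive-style partition path is typically derived (a hypothetical helper for illustration, not dataspoc-pipe's actual code):

```python
from datetime import date

def partition_path(base: str, tap: str, stream: str,
                   extraction_date: date, part: int = 0) -> str:
    """<base>/<tap>/<stream>/dt=YYYY-MM-DD/<stream>_NNNN.parquet"""
    return (
        f"{base}/{tap}/{stream}/dt={extraction_date.isoformat()}/"
        f"{stream}_{part:04d}.parquet"
    )

p = partition_path("raw", "postgres", "public-orders", date(2025, 1, 20))
print(p)  # raw/postgres/public-orders/dt=2025-01-20/public-orders_0000.parquet
```

Partitioning by extraction date lets query engines like DuckDB prune directories when you filter on dt.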

Example 3: GitHub API to Google Cloud Storage

Extract repository data from the GitHub API into Google Cloud Storage.

pip install dataspoc-pipe[gcs] tap-github
dataspoc-pipe init
dataspoc-pipe add github-repos

In the wizard, enter:

  • Tap: tap-github
  • Bucket: gs://analytics-lake
  • Base path: raw
  • Compression: zstd
  • Incremental: yes
  • Cron: 0 1 * * * (daily at 01:00)

Edit ~/.dataspoc-pipe/sources/github-repos.json:

{
  "access_token": "${GITHUB_TOKEN}",
  "repository": "dataspoclab/dataspoc-pipe",
  "start_date": "2025-01-01T00:00:00Z"
}
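The ${GITHUB_TOKEN} placeholder keeps the secret out of the config file. A sketch of the expansion step, assuming dataspoc-pipe substitutes environment variables when loading the JSON (stdlib only; string.Template uses exactly this ${NAME} syntax):

```python
import json
import os
from string import Template

os.environ["GITHUB_TOKEN"] = "ghp_example_token"  # normally set in your shell

raw = '{"access_token": "${GITHUB_TOKEN}", "repository": "dataspoclab/dataspoc-pipe"}'
expanded = Template(raw).substitute(os.environ)  # replace ${...} placeholders
config = json.loads(expanded)
print(config["access_token"])  # ghp_example_token
```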

Set your GitHub token as an environment variable:

export GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

~/.dataspoc-pipe/pipelines/github-repos.yaml:

source:
  tap: tap-github
  config: /home/you/.dataspoc-pipe/sources/github-repos.json
  streams:
    - commits
    - pull_requests
    - issues
    - stargazers
destination:
  bucket: gs://analytics-lake
  path: raw
  compression: zstd
  partition_by: _extraction_date
incremental:
  enabled: true
schedule:
  cron: "0 1 * * *"
# Authenticate with GCP
gcloud auth application-default login
# Validate and run
dataspoc-pipe validate github-repos
dataspoc-pipe run github-repos
Validating: github-repos
Bucket OK: gs://analytics-lake
Tap OK: tap-github found in PATH
Running: github-repos
commits: 1,250 records...
pull_requests: 180 records...
issues: 95 records...
stargazers: 420 records...
Done! 1,945 records in 4 stream(s)
commits: 1,250
pull_requests: 180
issues: 95
stargazers: 420
dataspoc-pipe schedule install
Installed: github-repos (0 1 * * *)
1 schedule(s) installed.
gs://analytics-lake/
  .dataspoc/
    manifest.json
    state/github-repos/state.json
    logs/github-repos/2025-01-20T010000Z.json
  raw/
    github/commits/
      dt=2025-01-20/
        commits_0000.parquet
    github/pull_requests/
      dt=2025-01-20/
        pull_requests_0000.parquet
    github/issues/
      dt=2025-01-20/
        issues_0000.parquet
    github/stargazers/
      dt=2025-01-20/
        stargazers_0000.parquet