Exemplos
Três exemplos completos mostrando configurações de pipelines do mundo real.
Exemplo 1: CSV para filesystem local
Seção intitulada “Exemplo 1: CSV para filesystem local”O pipeline mais simples. Lê um arquivo CSV e grava Parquet no filesystem local. Nenhuma credencial de nuvem é necessária.
Instalar
Seção intitulada “Instalar”pip install dataspoc-pipe tap-csvCriar dados de exemplo
Seção intitulada “Criar dados de exemplo”mkdir -p /tmp/sample-datacat > /tmp/sample-data/products.csv << 'EOF'id,name,category,price,in_stock1,Widget A,electronics,29.99,true2,Gadget B,electronics,49.99,true3,Gizmo C,accessories,19.99,false4,Widget D,electronics,39.99,true5,Doohickey E,accessories,9.99,trueEOFConfiguração
Seção intitulada “Configuração”dataspoc-pipe initdataspoc-pipe add csv-productsNo assistente, informe:
- Tap:
tap-csv - Bucket:
file:///tmp/lake - Caminho base:
raw - Compressão:
zstd - Incremental: não
- Cron: (deixe vazio)
Configuração da fonte
Seção intitulada “Configuração da fonte”Edite ~/.dataspoc-pipe/sources/csv-products.json:
{ "csv_files_definition": [ { "entity": "products", "path": "/tmp/sample-data/products.csv", "keys": ["id"] } ]}YAML do pipeline
Seção intitulada “YAML do pipeline”~/.dataspoc-pipe/pipelines/csv-products.yaml:
source: tap: tap-csv config: /home/you/.dataspoc-pipe/sources/csv-products.jsondestination: bucket: file:///tmp/lake path: raw compression: zstd partition_by: _extraction_dateincremental: enabled: falseExecutar
Seção intitulada “Executar”dataspoc-pipe validate csv-productsdataspoc-pipe run csv-productsValidating: csv-products Bucket OK: file:///tmp/lake Tap OK: tap-csv found in PATH
Running: csv-products products: 5 records...Done! 5 records in 1 stream(s) products: 5Estrutura do bucket
Seção intitulada “Estrutura do bucket”/tmp/lake/ .dataspoc/ manifest.json logs/csv-products/2025-01-20T103000Z.json raw/ csv/products/ dt=2025-01-20/ products_0000.parquetConsultar o resultado
Seção intitulada “Consultar o resultado”import duckdbduckdb.sql("SELECT * FROM '/tmp/lake/raw/csv/products/**/*.parquet'").show()Exemplo 2: PostgreSQL para S3 (incremental, agendado)
Seção intitulada “Exemplo 2: PostgreSQL para S3 (incremental, agendado)”Um pipeline de produção que extrai dados incrementalmente do PostgreSQL para o S3, executando a cada 2 horas.
Instalar
Seção intitulada “Instalar”pip install dataspoc-pipe[s3] tap-postgresConfiguração
Seção intitulada “Configuração”dataspoc-pipe initdataspoc-pipe add pg-ordersNo assistente, informe:
- Tap:
tap-postgres - Bucket:
s3://company-datalake - Caminho base:
raw - Compressão:
zstd - Incremental: sim
- Cron:
0 */2 * * *
Configuração da fonte
Seção intitulada “Configuração da fonte”Edite ~/.dataspoc-pipe/sources/pg-orders.json:
{ "host": "db.example.com", "port": 5432, "user": "readonly", "dbname": "production", "filter_schemas": "public", "default_replication_method": "INCREMENTAL", "default_replication_key": "updated_at"}YAML do pipeline
Seção intitulada “YAML do pipeline”~/.dataspoc-pipe/pipelines/pg-orders.yaml:
source: tap: tap-postgres config: /home/you/.dataspoc-pipe/sources/pg-orders.json streams: - public-orders - public-order_itemsdestination: bucket: s3://company-datalake path: raw compression: zstd partition_by: _extraction_dateincremental: enabled: trueschedule: cron: "0 */2 * * *"Transformação opcional
Seção intitulada “Transformação opcional”Crie ~/.dataspoc-pipe/transforms/pg-orders.py para limpar dados durante a ingestão:
import pandas as pd
def transform(df: pd.DataFrame) -> pd.DataFrame: """Normalize order data.""" # Ensure email is lowercase if "email" in df.columns: df["email"] = df["email"].str.lower().str.strip()
# Drop test orders if "is_test" in df.columns: df = df[df["is_test"] != True]
return dfExecutar
Seção intitulada “Executar”# Validate connectivitydataspoc-pipe validate pg-orders
# First run (full extraction)dataspoc-pipe run pg-orders
# Install cron scheduledataspoc-pipe schedule installValidating: pg-orders Bucket OK: s3://company-datalake Tap OK: tap-postgres found in PATH
Running: pg-orders public-orders: 250,000 records... public-order_items: 890,000 records...Done! 1,140,000 records in 2 stream(s) public-orders: 250,000 public-order_items: 890,000
Installed: pg-orders (0 */2 * * *)1 schedule(s) installed.Execuções subsequentes
Seção intitulada “Execuções subsequentes”A cada 2 horas, o cron dispara o pipeline. Como o incremental está habilitado, apenas registros novos são extraídos:
Running: pg-orders Incremental mode (using previous checkpoint) public-orders: 340 records... public-order_items: 1,200 records...Done! 1,540 records in 2 stream(s) public-orders: 340 public-order_items: 1,200Estrutura do bucket
Seção intitulada “Estrutura do bucket”s3://company-datalake/ .dataspoc/ manifest.json state/pg-orders/state.json logs/pg-orders/2025-01-20T100000Z.json logs/pg-orders/2025-01-20T120000Z.json raw/ postgres/public-orders/ dt=2025-01-20/ public-orders_0000.parquet postgres/public-order_items/ dt=2025-01-20/ public-order_items_0000.parquetExemplo 3: GitHub API para GCS
Seção intitulada “Exemplo 3: GitHub API para GCS”Extrair dados de repositórios da API do GitHub para o Google Cloud Storage.
Instalar
Seção intitulada “Instalar”pip install dataspoc-pipe[gcs] tap-githubConfiguração
Seção intitulada “Configuração”dataspoc-pipe initdataspoc-pipe add github-reposNo assistente, informe:
- Tap:
tap-github - Bucket:
gs://analytics-lake - Caminho base:
raw - Compressão:
zstd - Incremental: sim
- Cron:
0 1 * * *(diariamente às 01:00)
Configuração da fonte
Seção intitulada “Configuração da fonte”Edite ~/.dataspoc-pipe/sources/github-repos.json:
{ "access_token": "${GITHUB_TOKEN}", "repository": "dataspoclab/dataspoc-pipe", "start_date": "2025-01-01T00:00:00Z"}Defina seu token do GitHub como variável de ambiente:
export GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxYAML do pipeline
Seção intitulada “YAML do pipeline”~/.dataspoc-pipe/pipelines/github-repos.yaml:
source: tap: tap-github config: /home/you/.dataspoc-pipe/sources/github-repos.json streams: - commits - pull_requests - issues - stargazersdestination: bucket: gs://analytics-lake path: raw compression: zstd partition_by: _extraction_dateincremental: enabled: trueschedule: cron: "0 1 * * *"Executar
Seção intitulada “Executar”# Authenticate with GCPgcloud auth application-default login
# Validate and rundataspoc-pipe validate github-reposdataspoc-pipe run github-reposValidating: github-repos Bucket OK: gs://analytics-lake Tap OK: tap-github found in PATH
Running: github-repos commits: 1,250 records... pull_requests: 180 records... issues: 95 records... stargazers: 420 records...Done! 1,945 records in 4 stream(s) commits: 1,250 pull_requests: 180 issues: 95 stargazers: 420Instalar agendamento diário
Seção intitulada “Instalar agendamento diário”dataspoc-pipe schedule install Installed: github-repos (0 1 * * *)1 schedule(s) installed.Estrutura do bucket
Seção intitulada “Estrutura do bucket”gs://analytics-lake/ .dataspoc/ manifest.json state/github-repos/state.json logs/github-repos/2025-01-20T010000Z.json raw/ github/commits/ dt=2025-01-20/ commits_0000.parquet github/pull_requests/ dt=2025-01-20/ pull_requests_0000.parquet github/issues/ dt=2025-01-20/ issues_0000.parquet github/stargazers/ dt=2025-01-20/ stargazers_0000.parquet