Quickstart

This guide shows how to create and run a pipeline that ingests a CSV file into Parquet on the local filesystem.

Install DataSpoc Pipe and the tap-csv Singer tap:

pip install dataspoc-pipe
pip install tap-csv

Create a sample CSV to ingest:

mkdir -p /tmp/sample-data
cat > /tmp/sample-data/orders.csv << 'EOF'
id,customer,product,amount,created_at
1,Alice,Widget,29.99,2025-01-15
2,Bob,Gadget,49.99,2025-01-16
3,Charlie,Widget,29.99,2025-01-17
4,Alice,Gizmo,19.99,2025-01-18
5,Diana,Gadget,49.99,2025-01-19
EOF

Initialize the workspace:

dataspoc-pipe init

Expected output:

Structure created at /home/you/.dataspoc-pipe
Next step: dataspoc-pipe add <name>

This creates the following structure:

~/.dataspoc-pipe/
  config.yaml   # Global defaults
  sources/      # Tap configuration files
  pipelines/    # Pipeline YAML definitions
  transforms/   # Optional Python transforms

Now create a pipeline:

dataspoc-pipe add my-first-pipeline

The interactive wizard will ask:

Creating pipeline: my-first-pipeline
Taps with template: tap-csv, tap-postgres, tap-mysql, ...
Singer tap: tap-csv
Template found for tap-csv
Source config: /home/you/.dataspoc-pipe/sources/my-first-pipeline.json
Destination bucket (e.g.: s3://my-bucket, file:///tmp/lake): file:///tmp/lake
Base path in bucket [raw]: raw
Compression (zstd, snappy, gzip, none) [zstd]: zstd
Enable incremental extraction? [y/N]: N
Cron expression for scheduling (empty to skip):
Pipeline saved at /home/you/.dataspoc-pipe/pipelines/my-first-pipeline.yaml
Next steps:
1. Edit source config: /home/you/.dataspoc-pipe/sources/my-first-pipeline.json
2. Validate: dataspoc-pipe validate my-first-pipeline
3. Run: dataspoc-pipe run my-first-pipeline

Open ~/.dataspoc-pipe/sources/my-first-pipeline.json and point it at the CSV — entity names the stream, path is the file to read, and keys lists the primary-key columns:

{
  "csv_files_definition": [
    {
      "entity": "orders",
      "path": "/tmp/sample-data/orders.csv",
      "keys": ["id"]
    }
  ]
}
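
dataspoc-pipe validate (below) performs its own checks, but if you want a quick local sanity check first, here is a minimal Python sketch that confirms each configured keys column actually exists in the CSV header:

# A minimal sketch: verify that every "keys" column in the source config
# is present in the header of the corresponding CSV file.
import csv
import json
from pathlib import Path

config_path = Path.home() / ".dataspoc-pipe/sources/my-first-pipeline.json"
config = json.loads(config_path.read_text())

for entry in config["csv_files_definition"]:
    with open(entry["path"], newline="") as f:
        header = next(csv.reader(f))
    missing = set(entry["keys"]) - set(header)
    print(entry["entity"], "OK" if not missing else f"missing key columns: {missing}")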

The file generated at ~/.dataspoc-pipe/pipelines/my-first-pipeline.yaml looks like this:

source:
  tap: tap-csv
  config: /home/you/.dataspoc-pipe/sources/my-first-pipeline.json
destination:
  bucket: file:///tmp/lake
  path: raw
  compression: zstd
  partition_by: _extraction_date
incremental:
  enabled: false
schedule:
  cron: null

Validate the pipeline:

dataspoc-pipe validate my-first-pipeline

Expected output:

Validating: my-first-pipeline
Bucket OK: file:///tmp/lake
Tap OK: tap-csv found in PATH

Now run it:

dataspoc-pipe run my-first-pipeline

Expected output:

Running: my-first-pipeline
orders: 5 records...
Done! 5 records in 1 stream(s)
orders: 5

Check the pipeline's status:

dataspoc-pipe status
Pipelines
┌───────────────────┬─────────────────────┬─────────┬──────────┬─────────┐
│ Pipeline          │ Last Run            │ Status  │ Duration │ Records │
├───────────────────┼─────────────────────┼─────────┼──────────┼─────────┤
│ my-first-pipeline │ 2025-01-20T10:30:00 │ success │ 1.2s     │ 5       │
└───────────────────┴─────────────────────┴─────────┴──────────┴─────────┘

Inspect the structured log of the run:

dataspoc-pipe logs my-first-pipeline
{
  "pipeline": "my-first-pipeline",
  "status": "success",
  "started_at": "2025-01-20T10:30:00Z",
  "finished_at": "2025-01-20T10:30:01Z",
  "duration_seconds": 1.2,
  "total_records": 5,
  "streams": {
    "orders": 5
  }
}

Finally, inspect the manifest for the bucket:

dataspoc-pipe manifest file:///tmp/lake

The manifest lists every table Pipe has written to this bucket, including schemas, record counts, and timestamps. This is the catalog that downstream tools such as DataSpoc Lens use to discover the available data. On disk, the lake now looks like this:

/tmp/lake/
  .dataspoc/
    manifest.json
    state/my-first-pipeline/state.json
    logs/my-first-pipeline/2025-01-20T103000Z.json
  raw/
    csv/orders/
      dt=2025-01-20/
        orders_0000.parquet
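
Since the manifest is plain JSON, you can also inspect it directly. A minimal Python sketch (this guide does not document the manifest's internal schema, so the sketch only pretty-prints the file rather than assuming field names):

# A minimal sketch: pretty-print the manifest catalog.
# The internal schema of manifest.json is not documented in this guide,
# so we dump the whole document instead of relying on specific fields.
import json
from pathlib import Path

manifest = json.loads(Path("/tmp/lake/.dataspoc/manifest.json").read_text())
print(json.dumps(manifest, indent=2))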

The Parquet file is ready to be queried with DuckDB, Pandas, Polars, or DataSpoc Lens.
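
For example, a minimal sketch with DuckDB's Python client (assumes pip install duckdb; the dt=<date> partition name depends on the day you ran the pipeline, so the query globs across partitions):

# A minimal sketch: aggregate the ingested orders with DuckDB.
# Assumes `pip install duckdb`; the glob matches any dt= partition.
import duckdb

duckdb.sql("""
    SELECT product, COUNT(*) AS orders, SUM(amount) AS revenue
    FROM '/tmp/lake/raw/csv/orders/*/*.parquet'
    GROUP BY product
    ORDER BY revenue DESC
""").show()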