Postgres para S3 em 5 Minutos com DataSpoc Pipe
A maioria das equipes gasta semanas construindo pipelines de dados que movem dados do Postgres para um data lake. Com Airflow, você precisa de uma DAG, operadores, conexões, agendamento, monitoramento e uma equipe inteira de infraestrutura. Com o DataSpoc Pipe, você precisa de 5 comandos.
O Jeito Airflow (50+ linhas de boilerplate)
Veja como uma DAG típica do Airflow se parece para extração incremental do Postgres:
from airflow import DAGfrom airflow.providers.postgres.hooks.postgres import PostgresHookfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom airflow.operators.python import PythonOperatorfrom datetime import datetime, timedeltaimport pandas as pdimport pyarrow as paimport pyarrow.parquet as pqimport io
default_args = { "owner": "data-eng", "depends_on_past": True, "email_on_failure": True, "email": ["data-team@company.com"], "retries": 3, "retry_delay": timedelta(minutes=5),}
dag = DAG( "postgres_orders_to_s3", default_args=default_args, schedule_interval="@hourly", start_date=datetime(2024, 1, 1), catchup=False,)
def extract_and_load(**context): pg_hook = PostgresHook(postgres_conn_id="prod_postgres") s3_hook = S3Hook(aws_conn_id="data_lake_s3")
# Get bookmark from last run last_run = context["prev_execution_date"] or datetime(2020, 1, 1)
query = f""" SELECT * FROM orders WHERE updated_at > '{last_run.isoformat()}' ORDER BY updated_at """
df = pg_hook.get_pandas_df(query) if df.empty: return
table = pa.Table.from_pandas(df) buf = io.BytesIO() pq.write_table(table, buf)
key = f"raw/postgres/orders/{context['ts_nodash']}.parquet" s3_hook.load_bytes(buf.getvalue(), key=key, bucket_name="company-lake")
extract_task = PythonOperator( task_id="extract_orders", python_callable=extract_and_load, dag=dag,)São 50+ linhas apenas para uma tabela. Você ainda precisa de:
- Infraestrutura do Airflow (scheduler, webserver, banco de dados, workers)
- Gerenciamento de conexões na UI do Airflow
- Configuração de monitoramento e alertas
- Uma DAG por tabela fonte (ou DAGs dinâmicas complexas)
O Jeito DataSpoc Pipe (5 comandos, 10 linhas de config)
Passo 1: Instalar
pip install dataspoc-pipePasso 2: Inicializar um projeto
dataspoc-pipe init my-pipelinecd my-pipelineIsso cria:
my-pipeline/ pipeline.yaml .dataspoc/ state/ logs/Passo 3: Adicionar sua fonte Postgres
dataspoc-pipe add postgres \ --host prod-db.company.com \ --port 5432 \ --database orders_db \ --tables orders,customers,products \ --incremental updated_at \ --destination s3://company-lakeIsso gera uma configuração YAML limpa:
name: postgres-orderssource: type: postgres host: ${POSTGRES_HOST} port: 5432 database: orders_db tables: - name: orders incremental_key: updated_at - name: customers incremental_key: updated_at - name: products incremental_key: updated_at
destination: type: s3 bucket: company-lake prefix: raw/postgres format: parquet10 linhas de configuração real. Credenciais vêm de variáveis de ambiente — nunca armazenadas no arquivo.
Passo 4: Executar o pipeline
export POSTGRES_HOST=prod-db.company.comexport POSTGRES_PASSWORD=<from-vault>
dataspoc-pipe runSaída:
[2024-03-15 10:23:01] Starting pipeline: postgres-orders[2024-03-15 10:23:01] Source: postgres (orders_db)[2024-03-15 10:23:01] Destination: s3://company-lake/raw/postgres/[2024-03-15 10:23:02] Extracting: orders (incremental from 2024-03-15T09:00:00)[2024-03-15 10:23:04] Extracted: 1,247 rows[2024-03-15 10:23:05] Extracting: customers (incremental from 2024-03-15T09:00:00)[2024-03-15 10:23:05] Extracted: 23 rows[2024-03-15 10:23:06] Extracting: products (incremental from 2024-03-15T09:00:00)[2024-03-15 10:23:06] Extracted: 5 rows[2024-03-15 10:23:07] Pipeline complete: 3 tables, 1,275 rows, 2.4 MB writtenPasso 5: Verificar status
dataspoc-pipe statusPipeline: postgres-ordersLast run: 2024-03-15 10:23:07 (SUCCESS)Tables: orders | 1,247 rows | bookmark: 2024-03-15T10:23:04 customers | 23 rows | bookmark: 2024-03-15T10:23:05 products | 5 rows | bookmark: 2024-03-15T10:23:06Como a Extração Incremental Funciona
O DataSpoc Pipe armazena bookmarks no próprio bucket:
s3://company-lake/ .dataspoc/ state/postgres-orders/state.json raw/postgres/ orders/ 20240315_102304.parquet customers/ 20240315_102305.parquet products/ 20240315_102306.parquetO arquivo de estado rastreia o último valor extraído para cada tabela:
{ "pipeline": "postgres-orders", "tables": { "orders": { "incremental_key": "updated_at", "bookmark": "2024-03-15T10:23:04", "rows_extracted": 1247 }, "customers": { "incremental_key": "updated_at", "bookmark": "2024-03-15T10:23:05", "rows_extracted": 23 } }}Na próxima execução, o Pipe automaticamente continua de onde parou. Sem gerenciamento de estado do Airflow, sem XCom, sem banco de dados.
Agendamento
Para agendamento, use cron, timers do systemd ou qualquer sistema CI/CD:
# crontab — run every hour0 * * * * cd /opt/pipelines/my-pipeline && dataspoc-pipe run >> /var/log/pipe.log 2>&1Ou no GitHub Actions:
name: Hourly Extractionon: schedule: - cron: "0 * * * *"jobs: extract: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - run: pip install dataspoc-pipe - run: dataspoc-pipe run env: POSTGRES_HOST: ${{ secrets.POSTGRES_HOST }} POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }} AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}A Comparação
| Aspecto | Airflow | DataSpoc Pipe |
|---|---|---|
| Linhas de código por tabela | 50+ | 10 (YAML) |
| Infraestrutura necessária | Scheduler, DB, workers | Nenhuma (apenas o CLI) |
| Tempo até primeira extração | Horas/dias | 5 minutos |
| Gerenciamento de estado | Metastore do Airflow | Arquivo de estado no bucket |
| Lógica incremental | Você constrói | Integrada |
| Formato de saída | Você escolhe e implementa | Parquet (otimizado) |
| Monitoramento | UI do Airflow + alertas | dataspoc-pipe status + logs no bucket |
| Agendamento | Integrado | cron / CI / qualquer agendador |
Quando Você Ainda Precisa do Airflow
O DataSpoc Pipe é para extração — mover dados de fontes para seu lake. Você ainda pode querer Airflow (ou Dagster, Prefect) se precisar de:
- Dependências complexas de DAG entre dezenas de jobs
- UI integrada de retry com acionamento manual
- Integração com Spark/dbt/outras ferramentas de transformação
- Recursos de colaboração em equipe na autoria de pipelines
Mas para o caso comum de “pegar dados de A para Parquet em B, incrementalmente” — o Pipe faz em 5 minutos, não 5 dias.
Próximos Passos
pip install dataspoc-pipedataspoc-pipe init my-first-pipelinedataspoc-pipe add --helpConfira a lista completa de fontes para todos os conectores suportados: MySQL, MongoDB, REST APIs, Salesforce, HubSpot e mais.