data-engineeringetlpostgresqls3parquetairflow

Postgres para S3 em 5 Minutos com DataSpoc Pipe

Michael San Martim · 2026-04-20

A maioria das equipes gasta semanas construindo pipelines de dados que movem dados do Postgres para um data lake. Com Airflow, você precisa de uma DAG, operadores, conexões, agendamento, monitoramento e uma equipe inteira de infraestrutura. Com o DataSpoc Pipe, você precisa de 5 comandos.

O Jeito Airflow (50+ linhas de boilerplate)

Veja como uma DAG típica do Airflow se parece para extração incremental do Postgres:

from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import io
default_args = {
"owner": "data-eng",
"depends_on_past": True,
"email_on_failure": True,
"email": ["data-team@company.com"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"postgres_orders_to_s3",
default_args=default_args,
schedule_interval="@hourly",
start_date=datetime(2024, 1, 1),
catchup=False,
)
def extract_and_load(**context):
pg_hook = PostgresHook(postgres_conn_id="prod_postgres")
s3_hook = S3Hook(aws_conn_id="data_lake_s3")
# Get bookmark from last run
last_run = context["prev_execution_date"] or datetime(2020, 1, 1)
query = f"""
SELECT * FROM orders
WHERE updated_at > '{last_run.isoformat()}'
ORDER BY updated_at
"""
df = pg_hook.get_pandas_df(query)
if df.empty:
return
table = pa.Table.from_pandas(df)
buf = io.BytesIO()
pq.write_table(table, buf)
key = f"raw/postgres/orders/{context['ts_nodash']}.parquet"
s3_hook.load_bytes(buf.getvalue(), key=key, bucket_name="company-lake")
extract_task = PythonOperator(
task_id="extract_orders",
python_callable=extract_and_load,
dag=dag,
)

São 50+ linhas apenas para uma tabela. Você ainda precisa de:

  • Infraestrutura do Airflow (scheduler, webserver, banco de dados, workers)
  • Gerenciamento de conexões na UI do Airflow
  • Configuração de monitoramento e alertas
  • Uma DAG por tabela fonte (ou DAGs dinâmicas complexas)

O Jeito DataSpoc Pipe (5 comandos, 10 linhas de config)

Passo 1: Instalar

Terminal window
pip install dataspoc-pipe

Passo 2: Inicializar um projeto

Terminal window
dataspoc-pipe init my-pipeline
cd my-pipeline

Isso cria:

my-pipeline/
pipeline.yaml
.dataspoc/
state/
logs/

Passo 3: Adicionar sua fonte Postgres

Terminal window
dataspoc-pipe add postgres \
--host prod-db.company.com \
--port 5432 \
--database orders_db \
--tables orders,customers,products \
--incremental updated_at \
--destination s3://company-lake

Isso gera uma configuração YAML limpa:

pipeline.yaml
name: postgres-orders
source:
type: postgres
host: ${POSTGRES_HOST}
port: 5432
database: orders_db
tables:
- name: orders
incremental_key: updated_at
- name: customers
incremental_key: updated_at
- name: products
incremental_key: updated_at
destination:
type: s3
bucket: company-lake
prefix: raw/postgres
format: parquet

10 linhas de configuração real. Credenciais vêm de variáveis de ambiente — nunca armazenadas no arquivo.

Passo 4: Executar o pipeline

Terminal window
export POSTGRES_HOST=prod-db.company.com
export POSTGRES_PASSWORD=<from-vault>
dataspoc-pipe run

Saída:

[2024-03-15 10:23:01] Starting pipeline: postgres-orders
[2024-03-15 10:23:01] Source: postgres (orders_db)
[2024-03-15 10:23:01] Destination: s3://company-lake/raw/postgres/
[2024-03-15 10:23:02] Extracting: orders (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:04] Extracted: 1,247 rows
[2024-03-15 10:23:05] Extracting: customers (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:05] Extracted: 23 rows
[2024-03-15 10:23:06] Extracting: products (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:06] Extracted: 5 rows
[2024-03-15 10:23:07] Pipeline complete: 3 tables, 1,275 rows, 2.4 MB written

Passo 5: Verificar status

Terminal window
dataspoc-pipe status
Pipeline: postgres-orders
Last run: 2024-03-15 10:23:07 (SUCCESS)
Tables:
orders | 1,247 rows | bookmark: 2024-03-15T10:23:04
customers | 23 rows | bookmark: 2024-03-15T10:23:05
products | 5 rows | bookmark: 2024-03-15T10:23:06

Como a Extração Incremental Funciona

O DataSpoc Pipe armazena bookmarks no próprio bucket:

s3://company-lake/
.dataspoc/
state/postgres-orders/state.json
raw/postgres/
orders/
20240315_102304.parquet
customers/
20240315_102305.parquet
products/
20240315_102306.parquet

O arquivo de estado rastreia o último valor extraído para cada tabela:

{
"pipeline": "postgres-orders",
"tables": {
"orders": {
"incremental_key": "updated_at",
"bookmark": "2024-03-15T10:23:04",
"rows_extracted": 1247
},
"customers": {
"incremental_key": "updated_at",
"bookmark": "2024-03-15T10:23:05",
"rows_extracted": 23
}
}
}

Na próxima execução, o Pipe automaticamente continua de onde parou. Sem gerenciamento de estado do Airflow, sem XCom, sem banco de dados.

Agendamento

Para agendamento, use cron, timers do systemd ou qualquer sistema CI/CD:

Terminal window
# crontab — run every hour
0 * * * * cd /opt/pipelines/my-pipeline && dataspoc-pipe run >> /var/log/pipe.log 2>&1

Ou no GitHub Actions:

name: Hourly Extraction
on:
schedule:
- cron: "0 * * * *"
jobs:
extract:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: pip install dataspoc-pipe
- run: dataspoc-pipe run
env:
POSTGRES_HOST: ${{ secrets.POSTGRES_HOST }}
POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

A Comparação

AspectoAirflowDataSpoc Pipe
Linhas de código por tabela50+10 (YAML)
Infraestrutura necessáriaScheduler, DB, workersNenhuma (apenas o CLI)
Tempo até primeira extraçãoHoras/dias5 minutos
Gerenciamento de estadoMetastore do AirflowArquivo de estado no bucket
Lógica incrementalVocê constróiIntegrada
Formato de saídaVocê escolhe e implementaParquet (otimizado)
MonitoramentoUI do Airflow + alertasdataspoc-pipe status + logs no bucket
AgendamentoIntegradocron / CI / qualquer agendador

Quando Você Ainda Precisa do Airflow

O DataSpoc Pipe é para extração — mover dados de fontes para seu lake. Você ainda pode querer Airflow (ou Dagster, Prefect) se precisar de:

  • Dependências complexas de DAG entre dezenas de jobs
  • UI integrada de retry com acionamento manual
  • Integração com Spark/dbt/outras ferramentas de transformação
  • Recursos de colaboração em equipe na autoria de pipelines

Mas para o caso comum de “pegar dados de A para Parquet em B, incrementalmente” — o Pipe faz em 5 minutos, não 5 dias.

Próximos Passos

Terminal window
pip install dataspoc-pipe
dataspoc-pipe init my-first-pipeline
dataspoc-pipe add --help

Confira a lista completa de fontes para todos os conectores suportados: MySQL, MongoDB, REST APIs, Salesforce, HubSpot e mais.

Recomendados