Padrões de Extração Incremental: Nunca Mais Re-Extraia Dados Antigos
Seu pipeline extrai 10 milhões de linhas do Postgres toda noite. Mas apenas 5.000 linhas mudaram desde ontem. Você está relendo 99.95% de dados inalterados, pagando pelo compute, pela banda e pelo tempo.
Extração incremental resolve isso. O DataSpoc Pipe usa bookmarks Singer para rastrear onde parou e buscar apenas o que é novo. Este post cobre os três padrões para extração incremental, como configurar cada um, e quanto dinheiro você economiza.
Como os Bookmarks Singer Funcionam
Todo tap Singer emite mensagens STATE que registram sua posição nos dados de origem. O DataSpoc Pipe persiste essas como arquivos state.json no bucket:
bucket/ .dataspoc/ state/ postgres-production/ state.jsonUm arquivo de estado típico se parece com isso:
{ "bookmarks": { "orders": { "replication_key": "updated_at", "replication_key_value": "2026-04-26T15:30:00Z" }, "users": { "replication_key": "id", "replication_key_value": 45231 }, "audit_log": { "log_position": "mysql-bin.000042:12345" } }}Na próxima execução, o Pipe passa este estado para o tap. O tap apenas consulta linhas após o bookmark. Sem full table scans. Sem leituras desperdiçadas.
Padrão 1: Baseado em Timestamp (modified_at)
O padrão mais comum. Sua tabela tem uma coluna updated_at ou modified_at que é atualizada sempre que uma linha muda.
Melhor para: Tabelas com um timestamp updated_at que é definido de forma confiável em cada INSERT e UPDATE.
source: tap: tap-postgres config: host: "${POSTGRES_HOST}" port: 5432 database: production user: "${POSTGRES_USER}" password: "${POSTGRES_PASSWORD}"
tables: - name: orders replication_method: incremental replication_key: updated_at
- name: customers replication_method: incremental replication_key: modified_at
- name: products replication_method: incremental replication_key: updated_at
target: bucket: "s3://my-company-data" prefix: "raw/postgres" format: parquetComo funciona:
-- First run: full extractionSELECT * FROM orders ORDER BY updated_at;-- State saved: updated_at = '2026-04-26T15:30:00Z'
-- Second run: only new/changed rowsSELECT * FROM ordersWHERE updated_at > '2026-04-26T15:30:00Z'ORDER BY updated_at;-- State updated: updated_at = '2026-04-27T08:15:00Z'Pegadinha: Se sua aplicação não define updated_at em toda atualização, você perderá mudanças. Sempre verifique com:
-- Check: are there recent rows without updated_at?SELECT COUNT(*)FROM ordersWHERE updated_at IS NULL OR updated_at < created_at;Padrão 2: Baseado em ID (Auto-Increment)
Use a chave primária como bookmark. Só funciona para tabelas append-only onde linhas nunca são atualizadas.
Melhor para: Logs de eventos, trilhas de auditoria, eventos de analytics — qualquer tabela onde linhas são inseridas mas nunca modificadas.
tables: - name: events replication_method: incremental replication_key: id
- name: audit_log replication_method: incremental replication_key: id
- name: email_sends replication_method: incremental replication_key: idComo funciona:
-- First runSELECT * FROM events ORDER BY id;-- State saved: id = 1245600
-- Second runSELECT * FROM events WHERE id > 1245600 ORDER BY id;-- State updated: id = 1248900Pegadinha: Isso perde atualizações em linhas existentes. Se você faz UPDATE events SET status = 'processed' WHERE id = 1000, a segunda execução não re-extrairá a linha 1000. Use baseado em timestamp para tabelas com atualizações.
Padrão 3: Baseado em Log (CDC)
Change Data Capture lê o log de transações do banco de dados (WAL no Postgres, binlog no MySQL). Ele captura cada INSERT, UPDATE e DELETE — mesmo linhas sem uma coluna updated_at.
Melhor para: Tabelas onde você precisa capturar toda mudança incluindo deletes. Tabelas sem uma chave de replicação confiável.
tables: - name: subscriptions replication_method: log_based
- name: payments replication_method: log_based
- name: inventory replication_method: log_basedComo funciona:
# First run: initial full snapshot + start reading WALSnapshot: subscriptions (full table)WAL position: 0/1A3B4C0
# Second run: only WAL changes since last positionRead WAL from 0/1A3B4C0: INSERT INTO subscriptions (id=5001, plan='pro', ...) UPDATE subscriptions SET plan='business' WHERE id=4200 DELETE FROM subscriptions WHERE id=3100Setup para Postgres: Habilite replicação lógica:
-- On your Postgres serverALTER SYSTEM SET wal_level = 'logical';ALTER SYSTEM SET max_replication_slots = 4;-- Restart Postgres
-- Create a replication slot for PipeSELECT pg_create_logical_replication_slot('dataspoc_pipe', 'pgoutput');# Pipeline config for CDCsource: tap: tap-postgres config: host: "${POSTGRES_HOST}" database: production user: "${POSTGRES_REPLICATION_USER}" password: "${POSTGRES_REPLICATION_PASSWORD}" replication_slot: dataspoc_pipe default_replication_method: log_basedPegadinha: CDC requer um usuário de replicação com privilégio REPLICATION. Também adiciona alguma carga ao banco de dados fonte para manter o slot de replicação.
Quando Usar Cada Padrão
| Padrão | Captura Updates? | Captura Deletes? | Requisitos da Fonte | Performance |
|---|---|---|---|---|
| Timestamp | Sim | Não | Precisa de coluna updated_at | Rápido |
| Baseado em ID | Não | Não | Precisa de PK auto-increment | Mais rápido |
| Baseado em Log (CDC) | Sim | Sim | Acesso WAL/binlog | Médio |
Regra geral:
- Tabelas append-only (eventos, logs) → Baseado em ID
- Tabelas mutáveis com updated_at (pedidos, usuários) → Baseado em Timestamp
- Tabelas mutáveis sem updated_at → Baseado em Log (CDC)
- Tabelas de lookup pequenas (< 10K linhas) → Extração completa (é rápido o suficiente)
Misturando Padrões em Um Pipeline
A maioria dos pipelines reais usa múltiplos padrões:
source: tap: tap-postgres config: host: "${POSTGRES_HOST}" database: production
tables: # Mutable tables: use timestamp - name: orders replication_method: incremental replication_key: updated_at
- name: customers replication_method: incremental replication_key: modified_at
# Append-only tables: use ID - name: events replication_method: incremental replication_key: id
- name: audit_log replication_method: incremental replication_key: id
# Small lookup tables: full extraction - name: countries replication_method: full
- name: product_categories replication_method: full
target: bucket: "s3://my-company-data" prefix: "raw/postgres" format: parquetMonitorando o Estado Incremental
Verifique o status de qualquer pipeline:
dataspoc-pipe status postgres-productionPipeline: postgres-productionLast run: 2026-04-27 06:00:12 (success, 45s)Tables: orders incremental updated_at=2026-04-27T05:59:48Z 1,230 new rows customers incremental modified_at=2026-04-27T04:12:00Z 42 new rows events incremental id=2,456,789 18,400 new rows audit_log incremental id=892,345 3,200 new rows countries full - 195 rows categories full - 34 rowsVeja logs detalhados:
dataspoc-pipe logs postgres-production --last 3[2026-04-27 06:00:12] SUCCESS 45s 22,901 rows 4.2 MB[2026-04-26 06:00:08] SUCCESS 38s 19,450 rows 3.8 MB[2026-04-25 06:00:15] SUCCESS 52s 31,200 rows 5.1 MBResetando o Estado (Re-Extração Completa)
Às vezes você precisa recomeçar: schema mudou, dados foram backfilled, ou você suspeita de linhas perdidas.
# Re-extract everythingdataspoc-pipe run postgres-production --full
# Re-extract one tabledataspoc-pipe run postgres-production --full --table ordersA flag --full ignora o arquivo de estado e faz uma extração completa. Um novo estado é salvo ao final.
Cálculo de Economia de Custos
Considere um pipeline com estas características:
- Fonte: Postgres com 50M linhas em 10 tabelas
- Mudanças diárias: ~100K linhas (0.2%)
- Executando diariamente
| Métrica | Extração Completa | Incremental |
|---|---|---|
| Linhas lidas por execução | 50.000.000 | 100.000 |
| Carga no BD fonte | Alta (full scan) | Mínima (index lookup) |
| Transferência de rede | ~5 GB | ~10 MB |
| Tempo de execução | ~30 minutos | ~15 segundos |
| Custos PUT S3 | ~$0.25/execução | ~$0.005/execução |
| Custo mensal | ~$7.50 | ~$0.15 |
| Carga mensal no BD | 15 horas | 7.5 minutos |
Isso é uma redução de 50x em custo e 120x em carga no banco de dados. Para datasets maiores, a economia é ainda mais dramática.
Checklist Antes de Ir para Incremental
- Verifique que sua chave de replicação está indexada no banco de dados fonte
- Confirme que
updated_até definido em todo INSERT e UPDATE (para o padrão timestamp) - Teste com uma tabela pequena primeiro: execute full, depois incremental, compare contagens de linhas
- Configure monitoramento:
dataspoc-pipe statusno seu sistema de alertas - Agende execuções
--fullsemanais ou mensais como rede de segurança
Extração incremental é a maior otimização individual que você pode fazer em um pipeline de dados. Pare de reler dados que não mudaram.