incrementalextractionsingerdata-engineeringpatterns

Padrões de Extração Incremental: Nunca Mais Re-Extraia Dados Antigos

Michael San Martim · 2026-04-27

Seu pipeline extrai 10 milhões de linhas do Postgres toda noite. Mas apenas 5.000 linhas mudaram desde ontem. Você está relendo 99.95% de dados inalterados, pagando pelo compute, pela banda e pelo tempo.

Extração incremental resolve isso. O DataSpoc Pipe usa bookmarks Singer para rastrear onde parou e buscar apenas o que é novo. Este post cobre os três padrões para extração incremental, como configurar cada um, e quanto dinheiro você economiza.

Como os Bookmarks Singer Funcionam

Todo tap Singer emite mensagens STATE que registram sua posição nos dados de origem. O DataSpoc Pipe persiste essas como arquivos state.json no bucket:

bucket/
.dataspoc/
state/
postgres-production/
state.json

Um arquivo de estado típico se parece com isso:

{
"bookmarks": {
"orders": {
"replication_key": "updated_at",
"replication_key_value": "2026-04-26T15:30:00Z"
},
"users": {
"replication_key": "id",
"replication_key_value": 45231
},
"audit_log": {
"log_position": "mysql-bin.000042:12345"
}
}
}

Na próxima execução, o Pipe passa este estado para o tap. O tap apenas consulta linhas após o bookmark. Sem full table scans. Sem leituras desperdiçadas.

Padrão 1: Baseado em Timestamp (modified_at)

O padrão mais comum. Sua tabela tem uma coluna updated_at ou modified_at que é atualizada sempre que uma linha muda.

Melhor para: Tabelas com um timestamp updated_at que é definido de forma confiável em cada INSERT e UPDATE.

pipelines/postgres-production.yaml
source:
tap: tap-postgres
config:
host: "${POSTGRES_HOST}"
port: 5432
database: production
user: "${POSTGRES_USER}"
password: "${POSTGRES_PASSWORD}"
tables:
- name: orders
replication_method: incremental
replication_key: updated_at
- name: customers
replication_method: incremental
replication_key: modified_at
- name: products
replication_method: incremental
replication_key: updated_at
target:
bucket: "s3://my-company-data"
prefix: "raw/postgres"
format: parquet

Como funciona:

-- First run: full extraction
SELECT * FROM orders ORDER BY updated_at;
-- State saved: updated_at = '2026-04-26T15:30:00Z'
-- Second run: only new/changed rows
SELECT * FROM orders
WHERE updated_at > '2026-04-26T15:30:00Z'
ORDER BY updated_at;
-- State updated: updated_at = '2026-04-27T08:15:00Z'

Pegadinha: Se sua aplicação não define updated_at em toda atualização, você perderá mudanças. Sempre verifique com:

-- Check: are there recent rows without updated_at?
SELECT COUNT(*)
FROM orders
WHERE updated_at IS NULL
OR updated_at < created_at;

Padrão 2: Baseado em ID (Auto-Increment)

Use a chave primária como bookmark. Só funciona para tabelas append-only onde linhas nunca são atualizadas.

Melhor para: Logs de eventos, trilhas de auditoria, eventos de analytics — qualquer tabela onde linhas são inseridas mas nunca modificadas.

tables:
- name: events
replication_method: incremental
replication_key: id
- name: audit_log
replication_method: incremental
replication_key: id
- name: email_sends
replication_method: incremental
replication_key: id

Como funciona:

-- First run
SELECT * FROM events ORDER BY id;
-- State saved: id = 1245600
-- Second run
SELECT * FROM events WHERE id > 1245600 ORDER BY id;
-- State updated: id = 1248900

Pegadinha: Isso perde atualizações em linhas existentes. Se você faz UPDATE events SET status = 'processed' WHERE id = 1000, a segunda execução não re-extrairá a linha 1000. Use baseado em timestamp para tabelas com atualizações.

Padrão 3: Baseado em Log (CDC)

Change Data Capture lê o log de transações do banco de dados (WAL no Postgres, binlog no MySQL). Ele captura cada INSERT, UPDATE e DELETE — mesmo linhas sem uma coluna updated_at.

Melhor para: Tabelas onde você precisa capturar toda mudança incluindo deletes. Tabelas sem uma chave de replicação confiável.

tables:
- name: subscriptions
replication_method: log_based
- name: payments
replication_method: log_based
- name: inventory
replication_method: log_based

Como funciona:

# First run: initial full snapshot + start reading WAL
Snapshot: subscriptions (full table)
WAL position: 0/1A3B4C0
# Second run: only WAL changes since last position
Read WAL from 0/1A3B4C0:
INSERT INTO subscriptions (id=5001, plan='pro', ...)
UPDATE subscriptions SET plan='business' WHERE id=4200
DELETE FROM subscriptions WHERE id=3100

Setup para Postgres: Habilite replicação lógica:

-- On your Postgres server
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 4;
-- Restart Postgres
-- Create a replication slot for Pipe
SELECT pg_create_logical_replication_slot('dataspoc_pipe', 'pgoutput');
# Pipeline config for CDC
source:
tap: tap-postgres
config:
host: "${POSTGRES_HOST}"
database: production
user: "${POSTGRES_REPLICATION_USER}"
password: "${POSTGRES_REPLICATION_PASSWORD}"
replication_slot: dataspoc_pipe
default_replication_method: log_based

Pegadinha: CDC requer um usuário de replicação com privilégio REPLICATION. Também adiciona alguma carga ao banco de dados fonte para manter o slot de replicação.

Quando Usar Cada Padrão

PadrãoCaptura Updates?Captura Deletes?Requisitos da FontePerformance
TimestampSimNãoPrecisa de coluna updated_atRápido
Baseado em IDNãoNãoPrecisa de PK auto-incrementMais rápido
Baseado em Log (CDC)SimSimAcesso WAL/binlogMédio

Regra geral:

  • Tabelas append-only (eventos, logs) → Baseado em ID
  • Tabelas mutáveis com updated_at (pedidos, usuários) → Baseado em Timestamp
  • Tabelas mutáveis sem updated_at → Baseado em Log (CDC)
  • Tabelas de lookup pequenas (< 10K linhas) → Extração completa (é rápido o suficiente)

Misturando Padrões em Um Pipeline

A maioria dos pipelines reais usa múltiplos padrões:

source:
tap: tap-postgres
config:
host: "${POSTGRES_HOST}"
database: production
tables:
# Mutable tables: use timestamp
- name: orders
replication_method: incremental
replication_key: updated_at
- name: customers
replication_method: incremental
replication_key: modified_at
# Append-only tables: use ID
- name: events
replication_method: incremental
replication_key: id
- name: audit_log
replication_method: incremental
replication_key: id
# Small lookup tables: full extraction
- name: countries
replication_method: full
- name: product_categories
replication_method: full
target:
bucket: "s3://my-company-data"
prefix: "raw/postgres"
format: parquet

Monitorando o Estado Incremental

Verifique o status de qualquer pipeline:

Terminal window
dataspoc-pipe status postgres-production
Pipeline: postgres-production
Last run: 2026-04-27 06:00:12 (success, 45s)
Tables:
orders incremental updated_at=2026-04-27T05:59:48Z 1,230 new rows
customers incremental modified_at=2026-04-27T04:12:00Z 42 new rows
events incremental id=2,456,789 18,400 new rows
audit_log incremental id=892,345 3,200 new rows
countries full - 195 rows
categories full - 34 rows

Veja logs detalhados:

Terminal window
dataspoc-pipe logs postgres-production --last 3
[2026-04-27 06:00:12] SUCCESS 45s 22,901 rows 4.2 MB
[2026-04-26 06:00:08] SUCCESS 38s 19,450 rows 3.8 MB
[2026-04-25 06:00:15] SUCCESS 52s 31,200 rows 5.1 MB

Resetando o Estado (Re-Extração Completa)

Às vezes você precisa recomeçar: schema mudou, dados foram backfilled, ou você suspeita de linhas perdidas.

Terminal window
# Re-extract everything
dataspoc-pipe run postgres-production --full
# Re-extract one table
dataspoc-pipe run postgres-production --full --table orders

A flag --full ignora o arquivo de estado e faz uma extração completa. Um novo estado é salvo ao final.

Cálculo de Economia de Custos

Considere um pipeline com estas características:

  • Fonte: Postgres com 50M linhas em 10 tabelas
  • Mudanças diárias: ~100K linhas (0.2%)
  • Executando diariamente
MétricaExtração CompletaIncremental
Linhas lidas por execução50.000.000100.000
Carga no BD fonteAlta (full scan)Mínima (index lookup)
Transferência de rede~5 GB~10 MB
Tempo de execução~30 minutos~15 segundos
Custos PUT S3~$0.25/execução~$0.005/execução
Custo mensal~$7.50~$0.15
Carga mensal no BD15 horas7.5 minutos

Isso é uma redução de 50x em custo e 120x em carga no banco de dados. Para datasets maiores, a economia é ainda mais dramática.

Checklist Antes de Ir para Incremental

  1. Verifique que sua chave de replicação está indexada no banco de dados fonte
  2. Confirme que updated_at é definido em todo INSERT e UPDATE (para o padrão timestamp)
  3. Teste com uma tabela pequena primeiro: execute full, depois incremental, compare contagens de linhas
  4. Configure monitoramento: dataspoc-pipe status no seu sistema de alertas
  5. Agende execuções --full semanais ou mensais como rede de segurança

Extração incremental é a maior otimização individual que você pode fazer em um pipeline de dados. Pare de reler dados que não mudaram.

Recomendados