incrementalextractionsingerdata-engineeringpatterns

Patrones de Extracción Incremental: Nunca Más Re-Extraigas Datos Antiguos

Michael San Martim · 2026-04-27

Tu pipeline extrae 10 millones de filas de Postgres cada noche. Pero solo 5,000 filas cambiaron desde ayer. Estás releyendo el 99.95% de datos sin cambios, pagando por el cómputo, el ancho de banda y el tiempo.

La extracción incremental solucióna esto. DataSpoc Pipe usa bookmarks de Singer para rastrear dónde se quedó y solo obtener lo nuevo. Este artículo cubre los tres patrónes de extracción incremental, cómo configurar cada uno y cuánto dinero ahorras.

Cómo Funcionan los Bookmarks de Singer

Cada tap de Singer emite mensajes STATE que registran su posición en los datos de origen. DataSpoc Pipe los persiste cómo archivos state.json en el bucket:

bucket/
.dataspoc/
state/
postgres-production/
state.json

Un archivo de estado típico se ve así:

{
"bookmarks": {
"orders": {
"replication_key": "updated_at",
"replication_key_value": "2026-04-26T15:30:00Z"
},
"users": {
"replication_key": "id",
"replication_key_value": 45231
},
"audit_log": {
"log_position": "mysql-bin.000042:12345"
}
}
}

En la siguiente ejecución, Pipe pasa este estado al tap. El tap solo consulta filas después del bookmark. Sin escaneos completos de tabla. Sin lecturas desperdiciadas.

Patrón 1: Basado en Timestamp (modified_at)

El patrón más común. Tu tabla tiene una columna updated_at o modified_at que se actualiza cada vez que una fila cambia.

Mejor para: Tablas con un timestamp updated_at que se configura de manera confiable en cada INSERT y UPDATE.

pipelines/postgres-production.yaml
source:
tap: tap-postgres
config:
host: "${POSTGRES_HOST}"
port: 5432
database: production
user: "${POSTGRES_USER}"
password: "${POSTGRES_PASSWORD}"
tables:
- name: orders
replication_method: incremental
replication_key: updated_at
- name: customers
replication_method: incremental
replication_key: modified_at
- name: products
replication_method: incremental
replication_key: updated_at
target:
bucket: "s3://my-company-data"
prefix: "raw/postgres"
format: parquet

Cómo funcióna:

-- First run: full extraction
SELECT * FROM orders ORDER BY updated_at;
-- State saved: updated_at = '2026-04-26T15:30:00Z'
-- Second run: only new/changed rows
SELECT * FROM orders
WHERE updated_at > '2026-04-26T15:30:00Z'
ORDER BY updated_at;
-- State updated: updated_at = '2026-04-27T08:15:00Z'

Cuidado: Si tu aplicación no configura updated_at en cada update, perderás cambios. Siempre verifica con:

-- Check: are there recent rows without updated_at?
SELECT COUNT(*)
FROM orders
WHERE updated_at IS NULL
OR updated_at < created_at;

Patrón 2: Basado en ID (Auto-Increment)

Usa la clave primaria cómo bookmark. Solo funcióna para tablas de solo inserción donde las filas nunca se actualizan.

Mejor para: Logs de eventos, registros de auditoría, eventos de analytics — cualquier tabla donde las filas se insertan pero nunca se modifican.

tables:
- name: events
replication_method: incremental
replication_key: id
- name: audit_log
replication_method: incremental
replication_key: id
- name: email_sends
replication_method: incremental
replication_key: id

Cómo funcióna:

-- First run
SELECT * FROM events ORDER BY id;
-- State saved: id = 1245600
-- Second run
SELECT * FROM events WHERE id > 1245600 ORDER BY id;
-- State updated: id = 1248900

Cuidado: Esto no captura actualizaciones a filas existentes. Si haces UPDATE events SET status = 'processed' WHERE id = 1000, la segunda ejecución no re-extraerá la fila 1000. Usa el patrón basado en timestamp para tablas con actualizaciones.

Patrón 3: Basado en Log (CDC)

Change Data Capture lee el log de transacciones de la base de datos (WAL en Postgres, binlog en MySQL). Captura cada INSERT, UPDATE y DELETE — incluso filas sin columna updated_at.

Mejor para: Tablas donde necesitas capturar cada cambio incluyendo eliminaciones. Tablas sin una clave de replicación confiable.

tables:
- name: subscriptions
replication_method: log_based
- name: payments
replication_method: log_based
- name: inventory
replication_method: log_based

Cómo funcióna:

# First run: initial full snapshot + start reading WAL
Snapshot: subscriptions (full table)
WAL position: 0/1A3B4C0
# Second run: only WAL changes since last position
Read WAL from 0/1A3B4C0:
INSERT INTO subscriptions (id=5001, plan='pro', ...)
UPDATE subscriptions SET plan='business' WHERE id=4200
DELETE FROM subscriptions WHERE id=3100

Configuración para Postgres: Habilita replicación lógica:

-- On your Postgres server
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 4;
-- Restart Postgres
-- Create a replication slot for Pipe
SELECT pg_create_logical_replication_slot('dataspoc_pipe', 'pgoutput');
# Pipeline config for CDC
source:
tap: tap-postgres
config:
host: "${POSTGRES_HOST}"
database: production
user: "${POSTGRES_REPLICATION_USER}"
password: "${POSTGRES_REPLICATION_PASSWORD}"
replication_slot: dataspoc_pipe
default_replication_method: log_based

Cuidado: CDC requiere un usuario de replicación con privilegio REPLICATION. También agrega algo de carga a la base de datos de origen para mantener el slot de replicación.

Cuándo Usar Cada Patrón

Patrón¿Captura Updates?¿Captura Deletes?Requisitos del OrigenRendimiento
TimestampNoNecesita columna updated_atRápido
Basado en IDNoNoNecesita PK auto-incrementMás rápido
Basado en Log (CDC)Acceso a WAL/binlogMedio

Regla general:

  • Tablas de solo inserción (eventos, logs) → Basado en ID
  • Tablas mutables con updated_at (orders, users) → Basado en Timestamp
  • Tablas mutables sin updated_at → Basado en Log (CDC)
  • Tablas de lookup pequeñas (< 10K filas) → Extracción completa (es suficientemente rápido)

Mezclando Patrones en Un Pipeline

La mayoría de los pipelines reales usan múltiples patrónes:

source:
tap: tap-postgres
config:
host: "${POSTGRES_HOST}"
database: production
tables:
# Mutable tables: use timestamp
- name: orders
replication_method: incremental
replication_key: updated_at
- name: customers
replication_method: incremental
replication_key: modified_at
# Append-only tables: use ID
- name: events
replication_method: incremental
replication_key: id
- name: audit_log
replication_method: incremental
replication_key: id
# Small lookup tables: full extraction
- name: countries
replication_method: full
- name: product_categories
replication_method: full
target:
bucket: "s3://my-company-data"
prefix: "raw/postgres"
format: parquet

Monitoreando el Estado Incremental

Verifica el estado de cualquier pipeline:

Terminal window
dataspoc-pipe status postgres-production
Pipeline: postgres-production
Last run: 2026-04-27 06:00:12 (success, 45s)
Tables:
orders incremental updated_at=2026-04-27T05:59:48Z 1,230 new rows
customers incremental modified_at=2026-04-27T04:12:00Z 42 new rows
events incremental id=2,456,789 18,400 new rows
audit_log incremental id=892,345 3,200 new rows
countries full - 195 rows
categories full - 34 rows

Ver logs detallados:

Terminal window
dataspoc-pipe logs postgres-production --last 3
[2026-04-27 06:00:12] SUCCESS 45s 22,901 rows 4.2 MB
[2026-04-26 06:00:08] SUCCESS 38s 19,450 rows 3.8 MB
[2026-04-25 06:00:15] SUCCESS 52s 31,200 rows 5.1 MB

Reiniciando el Estado (Re-Extracción Completa)

A veces necesitas empezar de cero: el schema cambió, se hizo un backfill de datos, o sospechas que se perdieron filas.

Terminal window
# Re-extract everything
dataspoc-pipe run postgres-production --full
# Re-extract one table
dataspoc-pipe run postgres-production --full --table orders

El flag --full ignora el archivo de estado y hace una extracción completa. Un nuevo estado se guarda al final.

Cálculo de Ahorro en Costos

Considera un pipeline con estas características:

  • Origen: Postgres con 50M filas en 10 tablas
  • Cambios diarios: ~100K filas (0.2%)
  • Ejecución diaria
MétricaExtracción CompletaIncremental
Filas leídas por ejecución50,000,000100,000
Carga en BD origenAlta (escaneo completo)Mínima (búsqueda por índice)
Transferencia de red~5 GB~10 MB
Tiempo de ejecución de Pipe~30 minutos~15 segundos
Costos de PUT en S3~$0.25/ejecución~$0.005/ejecución
Costo mensual~$7.50~$0.15
Carga mensual en BD15 horas7.5 minutos

Eso es una reducción de 50x en costo y una reducción de 120x en carga de base de datos. Para datasets más grandes, el ahorro es aún más dramático.

Lista de Verificación Antes de Ir Incremental

  1. Verifica que tu clave de replicación esté indexada en la base de datos de origen
  2. Confirma que updated_at se configura en cada INSERT y UPDATE (para patrón timestamp)
  3. Prueba con una tabla pequeña primero: ejecuta completo, luego incremental, compara conteos de filas
  4. Configura monitoreo: dataspoc-pipe status en tu sistema de alertas
  5. Programa ejecuciones --full semanal o mensualmente cómo red de seguridad

La extracción incremental es la optimización individual más grande que puedes hacer en un pipeline de datos. Deja de releer datos que no han cambiado.

Recomendados