Patrones de Extracción Incremental: Nunca Más Re-Extraigas Datos Antiguos
Tu pipeline extrae 10 millones de filas de Postgres cada noche. Pero solo 5,000 filas cambiaron desde ayer. Estás releyendo el 99.95% de datos sin cambios, pagando por el cómputo, el ancho de banda y el tiempo.
La extracción incremental solucióna esto. DataSpoc Pipe usa bookmarks de Singer para rastrear dónde se quedó y solo obtener lo nuevo. Este artículo cubre los tres patrónes de extracción incremental, cómo configurar cada uno y cuánto dinero ahorras.
Cómo Funcionan los Bookmarks de Singer
Cada tap de Singer emite mensajes STATE que registran su posición en los datos de origen. DataSpoc Pipe los persiste cómo archivos state.json en el bucket:
bucket/ .dataspoc/ state/ postgres-production/ state.jsonUn archivo de estado típico se ve así:
{ "bookmarks": { "orders": { "replication_key": "updated_at", "replication_key_value": "2026-04-26T15:30:00Z" }, "users": { "replication_key": "id", "replication_key_value": 45231 }, "audit_log": { "log_position": "mysql-bin.000042:12345" } }}En la siguiente ejecución, Pipe pasa este estado al tap. El tap solo consulta filas después del bookmark. Sin escaneos completos de tabla. Sin lecturas desperdiciadas.
Patrón 1: Basado en Timestamp (modified_at)
El patrón más común. Tu tabla tiene una columna updated_at o modified_at que se actualiza cada vez que una fila cambia.
Mejor para: Tablas con un timestamp updated_at que se configura de manera confiable en cada INSERT y UPDATE.
source: tap: tap-postgres config: host: "${POSTGRES_HOST}" port: 5432 database: production user: "${POSTGRES_USER}" password: "${POSTGRES_PASSWORD}"
tables: - name: orders replication_method: incremental replication_key: updated_at
- name: customers replication_method: incremental replication_key: modified_at
- name: products replication_method: incremental replication_key: updated_at
target: bucket: "s3://my-company-data" prefix: "raw/postgres" format: parquetCómo funcióna:
-- First run: full extractionSELECT * FROM orders ORDER BY updated_at;-- State saved: updated_at = '2026-04-26T15:30:00Z'
-- Second run: only new/changed rowsSELECT * FROM ordersWHERE updated_at > '2026-04-26T15:30:00Z'ORDER BY updated_at;-- State updated: updated_at = '2026-04-27T08:15:00Z'Cuidado: Si tu aplicación no configura updated_at en cada update, perderás cambios. Siempre verifica con:
-- Check: are there recent rows without updated_at?SELECT COUNT(*)FROM ordersWHERE updated_at IS NULL OR updated_at < created_at;Patrón 2: Basado en ID (Auto-Increment)
Usa la clave primaria cómo bookmark. Solo funcióna para tablas de solo inserción donde las filas nunca se actualizan.
Mejor para: Logs de eventos, registros de auditoría, eventos de analytics — cualquier tabla donde las filas se insertan pero nunca se modifican.
tables: - name: events replication_method: incremental replication_key: id
- name: audit_log replication_method: incremental replication_key: id
- name: email_sends replication_method: incremental replication_key: idCómo funcióna:
-- First runSELECT * FROM events ORDER BY id;-- State saved: id = 1245600
-- Second runSELECT * FROM events WHERE id > 1245600 ORDER BY id;-- State updated: id = 1248900Cuidado: Esto no captura actualizaciones a filas existentes. Si haces UPDATE events SET status = 'processed' WHERE id = 1000, la segunda ejecución no re-extraerá la fila 1000. Usa el patrón basado en timestamp para tablas con actualizaciones.
Patrón 3: Basado en Log (CDC)
Change Data Capture lee el log de transacciones de la base de datos (WAL en Postgres, binlog en MySQL). Captura cada INSERT, UPDATE y DELETE — incluso filas sin columna updated_at.
Mejor para: Tablas donde necesitas capturar cada cambio incluyendo eliminaciones. Tablas sin una clave de replicación confiable.
tables: - name: subscriptions replication_method: log_based
- name: payments replication_method: log_based
- name: inventory replication_method: log_basedCómo funcióna:
# First run: initial full snapshot + start reading WALSnapshot: subscriptions (full table)WAL position: 0/1A3B4C0
# Second run: only WAL changes since last positionRead WAL from 0/1A3B4C0: INSERT INTO subscriptions (id=5001, plan='pro', ...) UPDATE subscriptions SET plan='business' WHERE id=4200 DELETE FROM subscriptions WHERE id=3100Configuración para Postgres: Habilita replicación lógica:
-- On your Postgres serverALTER SYSTEM SET wal_level = 'logical';ALTER SYSTEM SET max_replication_slots = 4;-- Restart Postgres
-- Create a replication slot for PipeSELECT pg_create_logical_replication_slot('dataspoc_pipe', 'pgoutput');# Pipeline config for CDCsource: tap: tap-postgres config: host: "${POSTGRES_HOST}" database: production user: "${POSTGRES_REPLICATION_USER}" password: "${POSTGRES_REPLICATION_PASSWORD}" replication_slot: dataspoc_pipe default_replication_method: log_basedCuidado: CDC requiere un usuario de replicación con privilegio REPLICATION. También agrega algo de carga a la base de datos de origen para mantener el slot de replicación.
Cuándo Usar Cada Patrón
| Patrón | ¿Captura Updates? | ¿Captura Deletes? | Requisitos del Origen | Rendimiento |
|---|---|---|---|---|
| Timestamp | Sí | No | Necesita columna updated_at | Rápido |
| Basado en ID | No | No | Necesita PK auto-increment | Más rápido |
| Basado en Log (CDC) | Sí | Sí | Acceso a WAL/binlog | Medio |
Regla general:
- Tablas de solo inserción (eventos, logs) → Basado en ID
- Tablas mutables con updated_at (orders, users) → Basado en Timestamp
- Tablas mutables sin updated_at → Basado en Log (CDC)
- Tablas de lookup pequeñas (< 10K filas) → Extracción completa (es suficientemente rápido)
Mezclando Patrones en Un Pipeline
La mayoría de los pipelines reales usan múltiples patrónes:
source: tap: tap-postgres config: host: "${POSTGRES_HOST}" database: production
tables: # Mutable tables: use timestamp - name: orders replication_method: incremental replication_key: updated_at
- name: customers replication_method: incremental replication_key: modified_at
# Append-only tables: use ID - name: events replication_method: incremental replication_key: id
- name: audit_log replication_method: incremental replication_key: id
# Small lookup tables: full extraction - name: countries replication_method: full
- name: product_categories replication_method: full
target: bucket: "s3://my-company-data" prefix: "raw/postgres" format: parquetMonitoreando el Estado Incremental
Verifica el estado de cualquier pipeline:
dataspoc-pipe status postgres-productionPipeline: postgres-productionLast run: 2026-04-27 06:00:12 (success, 45s)Tables: orders incremental updated_at=2026-04-27T05:59:48Z 1,230 new rows customers incremental modified_at=2026-04-27T04:12:00Z 42 new rows events incremental id=2,456,789 18,400 new rows audit_log incremental id=892,345 3,200 new rows countries full - 195 rows categories full - 34 rowsVer logs detallados:
dataspoc-pipe logs postgres-production --last 3[2026-04-27 06:00:12] SUCCESS 45s 22,901 rows 4.2 MB[2026-04-26 06:00:08] SUCCESS 38s 19,450 rows 3.8 MB[2026-04-25 06:00:15] SUCCESS 52s 31,200 rows 5.1 MBReiniciando el Estado (Re-Extracción Completa)
A veces necesitas empezar de cero: el schema cambió, se hizo un backfill de datos, o sospechas que se perdieron filas.
# Re-extract everythingdataspoc-pipe run postgres-production --full
# Re-extract one tabledataspoc-pipe run postgres-production --full --table ordersEl flag --full ignora el archivo de estado y hace una extracción completa. Un nuevo estado se guarda al final.
Cálculo de Ahorro en Costos
Considera un pipeline con estas características:
- Origen: Postgres con 50M filas en 10 tablas
- Cambios diarios: ~100K filas (0.2%)
- Ejecución diaria
| Métrica | Extracción Completa | Incremental |
|---|---|---|
| Filas leídas por ejecución | 50,000,000 | 100,000 |
| Carga en BD origen | Alta (escaneo completo) | Mínima (búsqueda por índice) |
| Transferencia de red | ~5 GB | ~10 MB |
| Tiempo de ejecución de Pipe | ~30 minutos | ~15 segundos |
| Costos de PUT en S3 | ~$0.25/ejecución | ~$0.005/ejecución |
| Costo mensual | ~$7.50 | ~$0.15 |
| Carga mensual en BD | 15 horas | 7.5 minutos |
Eso es una reducción de 50x en costo y una reducción de 120x en carga de base de datos. Para datasets más grandes, el ahorro es aún más dramático.
Lista de Verificación Antes de Ir Incremental
- Verifica que tu clave de replicación esté indexada en la base de datos de origen
- Confirma que
updated_atse configura en cada INSERT y UPDATE (para patrón timestamp) - Prueba con una tabla pequeña primero: ejecuta completo, luego incremental, compara conteos de filas
- Configura monitoreo:
dataspoc-pipe statusen tu sistema de alertas - Programa ejecuciones
--fullsemanal o mensualmente cómo red de seguridad
La extracción incremental es la optimización individual más grande que puedes hacer en un pipeline de datos. Deja de releer datos que no han cambiado.