
Incremental Extraction


Incremental extraction lets Pipe fetch only new or changed records since the last successful run, instead of re-extracting the entire dataset every time.

Pipe uses the Singer protocol’s bookmark mechanism:

  1. First run: The tap extracts all records (full extraction). As it processes data, the tap emits STATE messages containing bookmarks (e.g., the latest updated_at timestamp or the highest id).
  2. Pipe saves state: After a successful run, Pipe writes the final state to the bucket.
  3. Next run: Pipe loads the saved state and passes it to the tap with --state. The tap uses the bookmark to resume where it left off.
  4. Only new records: The tap only returns records newer than the bookmark.
Run 1: Full extraction → 100,000 records → state: {updated_at: "2025-01-20"}
Run 2: Incremental → 500 records → state: {updated_at: "2025-01-21"}
Run 3: Incremental → 120 records → state: {updated_at: "2025-01-21"}
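The bookmark loop above can be sketched in Python. This is a minimal in-memory simulation, not Pipe's actual implementation: the real tap and runner exchange Singer STATE messages over stdout/stdin, and `extract` here is a hypothetical stand-in.

```python
from typing import Optional

def extract(records: list[dict], state: Optional[dict]) -> tuple[list[dict], dict]:
    """Return records newer than the bookmark, plus the updated state."""
    bookmark = (state or {}).get("updated_at")
    # First run: no bookmark, so every record is emitted (full extraction).
    new = [r for r in records if bookmark is None or r["updated_at"] > bookmark]
    # The new bookmark is the highest updated_at seen across all records.
    latest = max((r["updated_at"] for r in records), default=bookmark)
    return new, {"updated_at": latest}

table = [
    {"id": 1, "updated_at": "2025-01-20"},
    {"id": 2, "updated_at": "2025-01-21"},
]

run1, state = extract(table, None)    # full extraction: 2 records
run2, state = extract(table, state)   # incremental: nothing changed, 0 records
table.append({"id": 3, "updated_at": "2025-01-22"})
run3, state = extract(table, state)   # incremental: only the new record
```

Each run only re-reads what the bookmark has not yet covered, which is why run 2 and run 3 in the progression above are so much smaller than run 1.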

Set incremental.enabled: true in your pipeline YAML:

source:
  tap: tap-postgres
  config: /home/you/.dataspoc-pipe/sources/customers.json
destination:
  bucket: s3://my-datalake
  path: raw
  compression: zstd
incremental:
  enabled: true

The tap itself must support incremental replication. Most database taps (tap-postgres, tap-mysql) and many API taps support it out of the box.

State files live inside the destination bucket:

<bucket>/.dataspoc/state/<pipeline>/state.json

For example:

s3://my-datalake/.dataspoc/state/customers/state.json

The state file is a JSON object matching the Singer STATE message format:

{
  "bookmarks": {
    "public-customers": {
      "replication_key_value": "2025-01-20T15:30:00Z",
      "replication_key": "updated_at"
    }
  }
}
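Because the state file is plain JSON in the Singer format, a stream's bookmark can be read back with nothing but the standard library. A sketch, using the `public-customers` example above inlined as a string:

```python
import json

state_json = """
{
  "bookmarks": {
    "public-customers": {
      "replication_key_value": "2025-01-20T15:30:00Z",
      "replication_key": "updated_at"
    }
  }
}
"""

state = json.loads(state_json)
bookmark = state["bookmarks"]["public-customers"]
# On the next run, the tap resumes from records where
# updated_at > replication_key_value.
print(bookmark["replication_key"], bookmark["replication_key_value"])
```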

Because state is stored in the bucket (not locally), incremental extraction works correctly even when Pipe runs on different machines.

To ignore the saved state and re-extract everything, use the --full flag:

dataspoc-pipe run customers --full

This does not delete the previous state. After a successful full run, the new state replaces the old one.
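Under those semantics, --full amounts to skipping the state load while still writing a fresh state on success. A rough sketch with hypothetical helpers (`load_state`, `save_state`, and `extract` are stand-ins, not Pipe's API):

```python
def run_pipeline(extract, load_state, save_state, full: bool = False):
    """Sketch: a full run ignores saved state but still replaces it on success."""
    state = None if full else load_state()  # --full: start from scratch
    records, new_state = extract(state)
    save_state(new_state)                   # old state is replaced, never deleted mid-run
    return records

saved = {"updated_at": "2025-01-20"}

def load_state():
    return saved

def save_state(s):
    global saved
    saved = s

def fake_extract(state):
    # Pretend extraction: no bookmark means everything, otherwise only the newest record.
    if state is None:
        return ["r1", "r2", "r3"], {"updated_at": "2025-01-22"}
    return ["r3"], {"updated_at": "2025-01-22"}

records = run_pipeline(fake_extract, load_state, save_state, full=True)
```

The full run returns all three records, and only after it completes does the new bookmark overwrite the old one.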

A complete pipeline that extracts the public-orders stream incrementally every two hours:

source:
  tap: tap-postgres
  config: /home/you/.dataspoc-pipe/sources/pg-prod.json
  streams:
    - public-orders
destination:
  bucket: s3://company-datalake
  path: raw
  compression: zstd
incremental:
  enabled: true
schedule:
  cron: "0 */2 * * *"
The tap config (pg-prod.json) sets INCREMENTAL as the default replication method, keyed on updated_at:

{
  "host": "db.example.com",
  "port": 5432,
  "user": "readonly",
  "dbname": "production",
  "filter_schemas": "public",
  "default_replication_method": "INCREMENTAL",
  "default_replication_key": "updated_at"
}
First run (full extraction):

dataspoc-pipe run pg-orders
Running: pg-orders
public-orders: 250,000 records...
Done! 250,000 records in 1 stream(s)
public-orders: 250,000
Subsequent run (incremental):

dataspoc-pipe run pg-orders
Running: pg-orders
Incremental mode (using previous checkpoint)
public-orders: 340 records...
Done! 340 records in 1 stream(s)
public-orders: 340

Only 340 new or updated records were fetched.

  • State is only saved after a successful run. If the pipeline fails mid-extraction, the previous state is preserved.
  • Each pipeline has its own isolated state file. Running one pipeline never affects another.
  • State files are plain JSON and can be inspected or edited manually if needed.
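The first bullet, state is persisted only on success, is worth making concrete. A hypothetical sketch (real Pipe writes state to the bucket, not to memory):

```python
class StateStore:
    """Stands in for the state.json object in the bucket."""
    def __init__(self):
        self.state = {"updated_at": "2025-01-20"}

def run_once(store, extract):
    try:
        _, new_state = extract(store.state)
    except Exception:
        return False           # failed mid-extraction: previous state untouched
    store.state = new_state    # only a successful run advances the bookmark
    return True

store = StateStore()

def failing_extract(state):
    raise RuntimeError("connection lost")

ok = run_once(store, failing_extract)
# The failed run leaves store.state at {"updated_at": "2025-01-20"},
# so the next run safely retries from the last good checkpoint.
```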