# Incremental Extraction
Incremental extraction lets Pipe fetch only new or changed records since the last successful run, instead of re-extracting the entire dataset every time.
## How it works

Pipe uses the Singer protocol’s bookmark mechanism:
- **First run:** The tap extracts all records (a full extraction). As it processes data, the tap emits `STATE` messages containing bookmarks (e.g., the latest `updated_at` timestamp or the highest `id`).
- **Pipe saves state:** After a successful run, Pipe writes the final state to the bucket.
- **Next run:** Pipe loads the saved state and passes it to the tap with `--state`. The tap uses the bookmark to resume where it left off.
- **Only new records:** The tap returns only records newer than the bookmark.
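The bookmark loop above can be sketched in Python. This is a minimal illustration of the Singer message flow, not Pipe's actual implementation; the `run_tap` helper and the simulated tap output are hypothetical:

```python
import json

def run_tap(tap_lines):
    """Consume Singer messages; return (records, last_state).

    `tap_lines` stands in for the tap's stdout. Only the final
    STATE message matters: it carries the bookmark for the next run.
    """
    records, state = [], None
    for line in tap_lines:
        msg = json.loads(line)
        if msg["type"] == "RECORD":
            records.append(msg["record"])
        elif msg["type"] == "STATE":
            state = msg["value"]  # keep only the latest bookmark
    return records, state

# Simulated tap output for a first (full) run.
out = [
    '{"type": "RECORD", "stream": "customers", "record": {"id": 1, "updated_at": "2025-01-19"}}',
    '{"type": "STATE", "value": {"bookmarks": {"customers": {"replication_key_value": "2025-01-19"}}}}',
    '{"type": "RECORD", "stream": "customers", "record": {"id": 2, "updated_at": "2025-01-20"}}',
    '{"type": "STATE", "value": {"bookmarks": {"customers": {"replication_key_value": "2025-01-20"}}}}',
]
records, state = run_tap(out)
```

Whatever `state` holds at the end of a successful run is what gets persisted for the next one.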
```txt
Run 1: Full extraction → 100,000 records → state: {updated_at: "2025-01-20"}
Run 2: Incremental     →     500 records → state: {updated_at: "2025-01-21"}
Run 3: Incremental     →     120 records → state: {updated_at: "2025-01-21"}
```

## Enabling incremental
Set `incremental.enabled: true` in your pipeline YAML:
```yaml
source:
  tap: tap-postgres
  config: /home/you/.dataspoc-pipe/sources/customers.json

destination:
  bucket: s3://my-datalake
  path: raw
  compression: zstd

incremental:
  enabled: true
```

The tap itself must support incremental replication. Most database taps (`tap-postgres`, `tap-mysql`) and many API taps support it out of the box.
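Under the hood, passing state to the tap follows the standard Singer CLI convention (`--config` always, `--state` only when a checkpoint exists). A small sketch, assuming a hypothetical `build_tap_command` helper; the actual invocation logic inside Pipe may differ:

```python
from pathlib import Path

def build_tap_command(tap, config_path, state_path=None):
    """Assemble the tap's argv per the Singer convention:
    --config is always passed; --state is added only when a
    previous checkpoint file exists. Without --state the tap
    performs a full extraction."""
    cmd = [tap, "--config", str(config_path)]
    if state_path is not None and Path(state_path).exists():
        cmd += ["--state", str(state_path)]
    return cmd

# First run: no state file on disk yet -> full extraction.
first = build_tap_command("tap-postgres", "customers.json", "no-such-state.json")
# Later runs include ["--state", "<path>"] once the checkpoint exists.
```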
## Where state is stored

State files live inside the destination bucket:

```txt
<bucket>/.dataspoc/state/<pipeline>/state.json
```

For example:

```txt
s3://my-datalake/.dataspoc/state/customers/state.json
```

The state file is a JSON object matching the Singer `STATE` message format:

```json
{
  "bookmarks": {
    "public-customers": {
      "replication_key_value": "2025-01-20T15:30:00Z",
      "replication_key": "updated_at"
    }
  }
}
```

Because state is stored in the bucket (not locally), incremental extraction works correctly even when Pipe runs on different machines.
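Reading a bookmark back out of that layout is straightforward. A minimal sketch, with hypothetical helper names (`state_key`, `latest_bookmark`) that are not part of Pipe's API:

```python
import json

def state_key(pipeline):
    """Object key for a pipeline's state file, following the
    layout shown above: .dataspoc/state/<pipeline>/state.json"""
    return f".dataspoc/state/{pipeline}/state.json"

def latest_bookmark(state, stream):
    """Pull the replication-key value for one stream out of a
    Singer-format state object (None if the stream has no bookmark)."""
    bookmark = state.get("bookmarks", {}).get(stream, {})
    return bookmark.get("replication_key_value")

raw = ('{"bookmarks": {"public-customers": '
       '{"replication_key_value": "2025-01-20T15:30:00Z", '
       '"replication_key": "updated_at"}}}')
state = json.loads(raw)
```

Because each stream keeps its own bookmark, a tap can resume different tables at different points within the same pipeline.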
## Forcing a full extraction

To ignore the saved state and re-extract everything, use the `--full` flag:

```sh
dataspoc-pipe run customers --full
```

This does not delete the previous state. After a successful full run, the new state replaces the old one.
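The `--full` semantics reduce to a small rule: skip loading the checkpoint for this run, but never delete it. A sketch with a hypothetical `state_for_run` helper:

```python
def state_for_run(saved_state, full=False):
    """--full ignores the saved checkpoint for this run without
    deleting it; the tap then behaves as on a first run. Whatever
    state the full run emits replaces the old one only on success."""
    return None if full else saved_state

saved = {"bookmarks": {"public-customers": {"replication_key_value": "2025-01-20"}}}
```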
## Example: incremental PostgreSQL

### Pipeline config
```yaml
source:
  tap: tap-postgres
  config: /home/you/.dataspoc-pipe/sources/pg-prod.json
  streams:
    - public-orders

destination:
  bucket: s3://company-datalake
  path: raw
  compression: zstd

incremental:
  enabled: true

schedule:
  cron: "0 */2 * * *"
```

### Source config (sources/pg-prod.json)

```json
{
  "host": "db.example.com",
  "port": 5432,
  "user": "readonly",
  "dbname": "production",
  "filter_schemas": "public",
  "default_replication_method": "INCREMENTAL",
  "default_replication_key": "updated_at"
}
```

### First run
```sh
dataspoc-pipe run pg-orders
```

```txt
Running: pg-orders
  public-orders: 250,000 records...
Done! 250,000 records in 1 stream(s)
  public-orders: 250,000
```

### Second run (2 hours later)

```sh
dataspoc-pipe run pg-orders
```

```txt
Running: pg-orders
  Incremental mode (using previous checkpoint)
  public-orders: 340 records...
Done! 340 records in 1 stream(s)
  public-orders: 340
```

Only 340 new or updated records were fetched.
## State safety

- State is only saved after a successful run. If the pipeline fails mid-extraction, the previous state is preserved.
- Each pipeline has its own isolated state file. Running one pipeline never affects another.
- State files are plain JSON and can be inspected or edited manually if needed.
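The save-only-on-success guarantee is commonly implemented by writing the new state to a temporary file and renaming it over the old one. A minimal sketch of that pattern, with hypothetical `save_state` and `run_pipeline` helpers (not Pipe's actual code):

```python
import json
import os
import tempfile

def save_state(path, state):
    """Write the new checkpoint atomically: write to a temp file in
    the same directory, then rename over the old state file. A crash
    mid-write leaves the previous state.json untouched."""
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(state, f)
    os.replace(tmp, path)  # atomic rename on POSIX

def run_pipeline(path, extract):
    """Persist state only when extraction succeeds; on failure the
    previous checkpoint stays in place for the next attempt."""
    try:
        records, new_state = extract()
    except Exception:
        return False  # extraction failed: keep the old state
    save_state(path, new_state)
    return True
```

A failed run thus leaves the bookmark where it was, so the next run simply retries the same window instead of skipping records.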