data-engineering · etl · postgresql · s3 · parquet · airflow

Postgres to S3 in 5 Minutes with DataSpoc Pipe

Michael San Martim · 2026-04-20

Most teams spend weeks building data pipelines that move data from Postgres to a data lake. With Airflow, you need a DAG, operators, connections, scheduling, monitoring, and a whole infrastructure team. With DataSpoc Pipe, you need 5 commands.

The Airflow Way (50+ lines of boilerplate)

Here’s what a typical Airflow DAG looks like for incremental Postgres extraction:

from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import io

default_args = {
    "owner": "data-eng",
    "depends_on_past": True,
    "email_on_failure": True,
    "email": ["data-team@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "postgres_orders_to_s3",
    default_args=default_args,
    schedule_interval="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

def extract_and_load(**context):
    pg_hook = PostgresHook(postgres_conn_id="prod_postgres")
    s3_hook = S3Hook(aws_conn_id="data_lake_s3")

    # Get bookmark from last run
    last_run = context["prev_execution_date"] or datetime(2020, 1, 1)
    query = f"""
        SELECT * FROM orders
        WHERE updated_at > '{last_run.isoformat()}'
        ORDER BY updated_at
    """

    df = pg_hook.get_pandas_df(query)
    if df.empty:
        return

    table = pa.Table.from_pandas(df)
    buf = io.BytesIO()
    pq.write_table(table, buf)

    key = f"raw/postgres/orders/{context['ts_nodash']}.parquet"
    s3_hook.load_bytes(buf.getvalue(), key=key, bucket_name="company-lake")

extract_task = PythonOperator(
    task_id="extract_orders",
    python_callable=extract_and_load,
    dag=dag,
)

That’s 50+ lines just for one table. You still need:

  • Airflow infrastructure (scheduler, webserver, database, workers)
  • Connection management in the Airflow UI
  • Monitoring and alerting setup
  • One DAG per source table (or complex dynamic DAGs)

The DataSpoc Pipe Way (5 commands, 10 lines of config)

Step 1: Install

pip install dataspoc-pipe

Step 2: Initialize a project

dataspoc-pipe init my-pipeline
cd my-pipeline

This creates:

my-pipeline/
  pipeline.yaml
  .dataspoc/
    state/
    logs/

Step 3: Add your Postgres source

dataspoc-pipe add postgres \
  --host prod-db.company.com \
  --port 5432 \
  --database orders_db \
  --tables orders,customers,products \
  --incremental updated_at \
  --destination s3://company-lake

This generates a clean YAML config:

pipeline.yaml
name: postgres-orders
source:
  type: postgres
  host: ${POSTGRES_HOST}
  port: 5432
  database: orders_db
  tables:
    - name: orders
      incremental_key: updated_at
    - name: customers
      incremental_key: updated_at
    - name: products
      incremental_key: updated_at
destination:
  type: s3
  bucket: company-lake
  prefix: raw/postgres
  format: parquet

10 lines of actual configuration. Credentials come from environment variables — never stored in the file.
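
The ${POSTGRES_HOST} placeholder is ordinary environment-variable interpolation. As a rough illustration of that pattern (not DataSpoc Pipe's actual loader; the load_config helper and the pyyaml dependency are assumptions), a config loader can expand those references at read time so the file in version control never contains a secret:

# Illustrative sketch of ${VAR} expansion in a YAML config at load time.
# Not DataSpoc Pipe's implementation; requires pyyaml (pip install pyyaml).
import os
import yaml

def load_config(path: str) -> dict:
    with open(path) as f:
        raw = f.read()
    # Substitute ${POSTGRES_HOST}-style references from the environment
    # before parsing, so secrets stay out of the file itself.
    return yaml.safe_load(os.path.expandvars(raw))

config = load_config("pipeline.yaml")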

Step 4: Run the pipeline

export POSTGRES_HOST=prod-db.company.com
export POSTGRES_PASSWORD=<from-vault>
dataspoc-pipe run

Output:

[2024-03-15 10:23:01] Starting pipeline: postgres-orders
[2024-03-15 10:23:01] Source: postgres (orders_db)
[2024-03-15 10:23:01] Destination: s3://company-lake/raw/postgres/
[2024-03-15 10:23:02] Extracting: orders (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:04] Extracted: 1,247 rows
[2024-03-15 10:23:05] Extracting: customers (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:05] Extracted: 23 rows
[2024-03-15 10:23:06] Extracting: products (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:06] Extracted: 5 rows
[2024-03-15 10:23:07] Pipeline complete: 3 tables, 1,275 rows, 2.4 MB written

Step 5: Check status

dataspoc-pipe status
Pipeline: postgres-orders
Last run: 2024-03-15 10:23:07 (SUCCESS)
Tables:
  orders    | 1,247 rows | bookmark: 2024-03-15T10:23:04
  customers |    23 rows | bookmark: 2024-03-15T10:23:05
  products  |     5 rows | bookmark: 2024-03-15T10:23:06
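
If you want to sanity-check what actually landed, the output is plain Parquet on S3. For example, reading back one of the files from the run above with pandas (assuming pandas, pyarrow, and s3fs are installed and AWS credentials are in the environment):

# Path taken from the run output above; s3:// reads require the s3fs package.
import pandas as pd

df = pd.read_parquet("s3://company-lake/raw/postgres/orders/20240315_102304.parquet")
print(len(df))                 # 1247 rows, matching the status output
print(df["updated_at"].max())  # newest record in this batch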

How Incremental Extraction Works

DataSpoc Pipe stores bookmarks in the bucket itself:

s3://company-lake/
  .dataspoc/
    state/postgres-orders/state.json
  raw/postgres/
    orders/
      20240315_102304.parquet
    customers/
      20240315_102305.parquet
    products/
      20240315_102306.parquet

The state file tracks the last extracted value for each table:

{
  "pipeline": "postgres-orders",
  "tables": {
    "orders": {
      "incremental_key": "updated_at",
      "bookmark": "2024-03-15T10:23:04",
      "rows_extracted": 1247
    },
    "customers": {
      "incremental_key": "updated_at",
      "bookmark": "2024-03-15T10:23:05",
      "rows_extracted": 23
    }
  }
}

On the next run, Pipe automatically picks up where it left off: no Airflow state management, no XCom, no metadata database.
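
Conceptually it is the same loop you would otherwise hand-write in the DAG above: load the bookmark, pull only the rows newer than it, write Parquet, advance the bookmark. A minimal sketch of that pattern (illustrative only, not DataSpoc Pipe's internals; assumes psycopg2, pyarrow, and boto3, and that table/key names come from the pipeline config):

# Conceptual sketch of bookmark-based incremental extraction.
# Not DataSpoc Pipe's source code; helper names are made up for illustration.
import io, json
from datetime import datetime, timezone

import boto3
import psycopg2
import pyarrow as pa
import pyarrow.parquet as pq

BUCKET = "company-lake"
STATE_KEY = ".dataspoc/state/postgres-orders/state.json"
s3 = boto3.client("s3")

def load_state() -> dict:
    try:
        return json.loads(s3.get_object(Bucket=BUCKET, Key=STATE_KEY)["Body"].read())
    except s3.exceptions.NoSuchKey:
        return {"pipeline": "postgres-orders", "tables": {}}

def extract_incremental(conn, state: dict, table: str, key_col: str) -> None:
    bookmark = state["tables"].get(table, {}).get("bookmark", "1970-01-01T00:00:00")
    with conn.cursor() as cur:
        # Only pull rows newer than the stored bookmark (table/key_col come from config).
        cur.execute(
            f"SELECT * FROM {table} WHERE {key_col} > %s ORDER BY {key_col}",
            (bookmark,),
        )
        cols = [c[0] for c in cur.description]
        rows = cur.fetchall()
    if not rows:
        return

    # Write the batch as Parquet under raw/postgres/<table>/<timestamp>.parquet.
    batch = pa.table({c: [r[i] for r in rows] for i, c in enumerate(cols)})
    buf = io.BytesIO()
    pq.write_table(batch, buf)
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    s3.put_object(Bucket=BUCKET, Key=f"raw/postgres/{table}/{stamp}.parquet",
                  Body=buf.getvalue())

    # Advance the bookmark: rows are ordered, so the last one holds the newest key.
    state["tables"][table] = {
        "incremental_key": key_col,
        "bookmark": str(rows[-1][cols.index(key_col)]),
        "rows_extracted": len(rows),
    }

def save_state(state: dict) -> None:
    s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps(state).encode())

Keeping the state next to the data in the bucket is what lets the CLI stay stateless: any machine with the right credentials can resume the pipeline.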

Scheduling

For scheduling, use cron, systemd timers, or any CI/CD system:

# crontab — run every hour
0 * * * * cd /opt/pipelines/my-pipeline && dataspoc-pipe run >> /var/log/pipe.log 2>&1

Or in GitHub Actions:

name: Hourly Extraction

on:
  schedule:
    - cron: "0 * * * *"

jobs:
  extract:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install dataspoc-pipe
      - run: dataspoc-pipe run
        env:
          POSTGRES_HOST: ${{ secrets.POSTGRES_HOST }}
          POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

The Comparison

Aspect                   | Airflow                 | DataSpoc Pipe
Lines of code per table  | 50+                     | 10 (YAML)
Infrastructure needed    | Scheduler, DB, workers  | None (just the CLI)
Time to first extraction | Hours/days              | 5 minutes
State management         | Airflow metastore       | Bucket state file
Incremental logic        | You build it            | Built-in
Output format            | You choose & implement  | Parquet (optimized)
Monitoring               | Airflow UI + alerting   | dataspoc-pipe status + logs in bucket
Scheduling               | Built-in                | cron / CI / any scheduler

When You Still Need Airflow

DataSpoc Pipe is for extraction — moving data from sources to your lake. You still might want Airflow (or Dagster, Prefect) if you need:

  • Complex DAG dependencies between dozens of jobs
  • Built-in retry UI with manual triggering
  • Integration with Spark/dbt/other transformation tools
  • Team collaboration features on pipeline authoring

But for the common case of “get data from A to Parquet in B, incrementally” — Pipe does it in 5 minutes, not 5 days.

Next Steps

pip install dataspoc-pipe
dataspoc-pipe init my-first-pipeline
dataspoc-pipe add --help

Check the full source list for all supported connectors: MySQL, MongoDB, REST APIs, Salesforce, HubSpot, and more.
