Postgres to S3 in 5 Minutes with DataSpoc Pipe
Most teams spend weeks building data pipelines that move data from Postgres to a data lake. With Airflow, you need a DAG, operators, connections, scheduling, monitoring, and a whole infrastructure team. With DataSpoc Pipe, you need 5 commands.
The Airflow Way (50+ lines of boilerplate)
Here’s what a typical Airflow DAG looks like for incremental Postgres extraction:
```python
from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import io

default_args = {
    "owner": "data-eng",
    "depends_on_past": True,
    "email_on_failure": True,
    "email": ["data-team@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "postgres_orders_to_s3",
    default_args=default_args,
    schedule_interval="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

def extract_and_load(**context):
    pg_hook = PostgresHook(postgres_conn_id="prod_postgres")
    s3_hook = S3Hook(aws_conn_id="data_lake_s3")

    # Get bookmark from last run
    last_run = context["prev_execution_date"] or datetime(2020, 1, 1)

    query = f"""
        SELECT * FROM orders
        WHERE updated_at > '{last_run.isoformat()}'
        ORDER BY updated_at
    """

    df = pg_hook.get_pandas_df(query)
    if df.empty:
        return

    table = pa.Table.from_pandas(df)
    buf = io.BytesIO()
    pq.write_table(table, buf)

    key = f"raw/postgres/orders/{context['ts_nodash']}.parquet"
    s3_hook.load_bytes(buf.getvalue(), key=key, bucket_name="company-lake")

extract_task = PythonOperator(
    task_id="extract_orders",
    python_callable=extract_and_load,
    dag=dag,
)
```

That’s 50+ lines just for one table. You still need:
- Airflow infrastructure (scheduler, webserver, database, workers)
- Connection management in the Airflow UI
- Monitoring and alerting setup
- One DAG per source table (or complex dynamic DAGs)
The DataSpoc Pipe Way (5 commands, 10 lines of config)
Step 1: Install
```bash
pip install dataspoc-pipe
```

Step 2: Initialize a project
```bash
dataspoc-pipe init my-pipeline
cd my-pipeline
```

This creates:
```
my-pipeline/
  pipeline.yaml
  .dataspoc/
    state/
    logs/
```

Step 3: Add your Postgres source
```bash
dataspoc-pipe add postgres \
  --host prod-db.company.com \
  --port 5432 \
  --database orders_db \
  --tables orders,customers,products \
  --incremental updated_at \
  --destination s3://company-lake
```

This generates a clean YAML config:
```yaml
name: postgres-orders

source:
  type: postgres
  host: ${POSTGRES_HOST}
  port: 5432
  database: orders_db
  tables:
    - name: orders
      incremental_key: updated_at
    - name: customers
      incremental_key: updated_at
    - name: products
      incremental_key: updated_at

destination:
  type: s3
  bucket: company-lake
  prefix: raw/postgres
  format: parquet
```

10 lines of actual configuration. Credentials come from environment variables — never stored in the file.
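The `${POSTGRES_HOST}` placeholder is filled in from the environment at run time. As a rough sketch of how that kind of substitution typically works (a generic illustration, not Pipe's actual config loader; the `load_config` helper and the PyYAML dependency are assumptions), it only takes a few lines:

```python
import os

import yaml  # PyYAML, assumed installed for this illustration


def load_config(path: str) -> dict:
    """Load a YAML config, expanding ${VAR}-style placeholders from the environment."""
    with open(path) as f:
        raw = f.read()
    # os.path.expandvars swaps ${POSTGRES_HOST} for the value of the POSTGRES_HOST
    # environment variable; placeholders for unset variables are left untouched.
    return yaml.safe_load(os.path.expandvars(raw))


config = load_config("pipeline.yaml")
print(config["source"]["host"])  # "prod-db.company.com" once POSTGRES_HOST is exported
```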
Step 4: Run the pipeline
```bash
export POSTGRES_HOST=prod-db.company.com
export POSTGRES_PASSWORD=<from-vault>
```
```bash
dataspoc-pipe run
```

Output:
```
[2024-03-15 10:23:01] Starting pipeline: postgres-orders
[2024-03-15 10:23:01] Source: postgres (orders_db)
[2024-03-15 10:23:01] Destination: s3://company-lake/raw/postgres/
[2024-03-15 10:23:02] Extracting: orders (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:04] Extracted: 1,247 rows
[2024-03-15 10:23:05] Extracting: customers (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:05] Extracted: 23 rows
[2024-03-15 10:23:06] Extracting: products (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:06] Extracted: 5 rows
[2024-03-15 10:23:07] Pipeline complete: 3 tables, 1,275 rows, 2.4 MB written
```

Step 5: Check status
```bash
dataspoc-pipe status
```

```
Pipeline: postgres-orders
Last run: 2024-03-15 10:23:07 (SUCCESS)
Tables:
  orders    | 1,247 rows | bookmark: 2024-03-15T10:23:04
  customers | 23 rows    | bookmark: 2024-03-15T10:23:05
  products  | 5 rows     | bookmark: 2024-03-15T10:23:06
```

How Incremental Extraction Works
DataSpoc Pipe stores bookmarks in the bucket itself:
```
s3://company-lake/
  .dataspoc/
    state/postgres-orders/state.json
  raw/postgres/
    orders/
      20240315_102304.parquet
    customers/
      20240315_102305.parquet
    products/
      20240315_102306.parquet
```

The state file tracks the last extracted value for each table:
{ "pipeline": "postgres-orders", "tables": { "orders": { "incremental_key": "updated_at", "bookmark": "2024-03-15T10:23:04", "rows_extracted": 1247 }, "customers": { "incremental_key": "updated_at", "bookmark": "2024-03-15T10:23:05", "rows_extracted": 23 } }}Next run, Pipe automatically picks up where it left off. No Airflow state management, no XCom, no database.
Scheduling
For scheduling, use cron, systemd timers, or any CI/CD system:
```bash
# crontab — run every hour
0 * * * * cd /opt/pipelines/my-pipeline && dataspoc-pipe run >> /var/log/pipe.log 2>&1
```

Or in GitHub Actions:
```yaml
name: Hourly Extraction
on:
  schedule:
    - cron: "0 * * * *"
jobs:
  extract:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install dataspoc-pipe
      - run: dataspoc-pipe run
        env:
          POSTGRES_HOST: ${{ secrets.POSTGRES_HOST }}
          POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
```

The Comparison
| Aspect | Airflow | DataSpoc Pipe |
|---|---|---|
| Lines of code per table | 50+ | 10 (YAML) |
| Infrastructure needed | Scheduler, DB, workers | None (just the CLI) |
| Time to first extraction | Hours/days | 5 minutes |
| State management | Airflow metastore | Bucket state file |
| Incremental logic | You build it | Built-in |
| Output format | You choose & implement | Parquet (optimized) |
| Monitoring | Airflow UI + alerting | dataspoc-pipe status + logs in bucket |
| Scheduling | Built-in | cron / CI / any scheduler |
When You Still Need Airflow
DataSpoc Pipe is for extraction — moving data from sources into your lake. You might still want Airflow (or Dagster, or Prefect) if you need:
- Complex DAG dependencies between dozens of jobs
- Built-in retry UI with manual triggering
- Integration with Spark/dbt/other transformation tools
- Team collaboration features for pipeline authoring
But for the common case of “get data from A to Parquet in B, incrementally” — Pipe does it in 5 minutes, not 5 days.
Next Steps
```bash
pip install dataspoc-pipe
dataspoc-pipe init my-first-pipeline
dataspoc-pipe add --help
```

Check the full source list for all supported connectors: MySQL, MongoDB, REST APIs, Salesforce, HubSpot, and more.