data-engineeringetlpostgresqls3parquetairflow

Postgres a S3 en 5 minutos con DataSpoc Pipe

Michael San Martim · 2026-04-20

La mayoria de los equipos pasan semanas construyendo pipelines de datos que mueven datos de Postgres a un data lake. Con Airflow, necesitas un DAG, operadores, conexiónes, programacion, monitoreo y todo un equipo de infraestructura. Con DataSpoc Pipe, necesitas 5 comandos.

El enfoque de Airflow (50+ lineas de boilerplate)

Asi se ve un DAG tipico de Airflow para extracción incremental de Postgres:

from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import io
default_args = {
"owner": "data-eng",
"depends_on_past": True,
"email_on_failure": True,
"email": ["data-team@company.com"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"postgres_orders_to_s3",
default_args=default_args,
schedule_interval="@hourly",
start_date=datetime(2024, 1, 1),
catchup=False,
)
def extract_and_load(**context):
pg_hook = PostgresHook(postgres_conn_id="prod_postgres")
s3_hook = S3Hook(aws_conn_id="data_lake_s3")
# Get bookmark from last run
last_run = context["prev_execution_date"] or datetime(2020, 1, 1)
query = f"""
SELECT * FROM orders
WHERE updated_at > '{last_run.isoformat()}'
ORDER BY updated_at
"""
df = pg_hook.get_pandas_df(query)
if df.empty:
return
table = pa.Table.from_pandas(df)
buf = io.BytesIO()
pq.write_table(table, buf)
key = f"raw/postgres/orders/{context['ts_nodash']}.parquet"
s3_hook.load_bytes(buf.getvalue(), key=key, bucket_name="company-lake")
extract_task = PythonOperator(
task_id="extract_orders",
python_callable=extract_and_load,
dag=dag,
)

Son 50+ lineas solo para una tabla. Todavia necesitas:

  • Infraestructura de Airflow (scheduler, webserver, base de datos, workers)
  • Gestion de conexiónes en la UI de Airflow
  • Configuracion de monitoreo y alertas
  • Un DAG por tabla fuente (o DAGs dinamicos complejos)

El enfoque de DataSpoc Pipe (5 comandos, 10 lineas de config)

Paso 1: Instalar

Terminal window
pip install dataspoc-pipe

Paso 2: Inicializar un proyecto

Terminal window
dataspoc-pipe init my-pipeline
cd my-pipeline

Esto crea:

my-pipeline/
pipeline.yaml
.dataspoc/
state/
logs/

Paso 3: Agregar tu fuente Postgres

Terminal window
dataspoc-pipe add postgres \
--host prod-db.company.com \
--port 5432 \
--database orders_db \
--tables orders,customers,products \
--incremental updated_at \
--destination s3://company-lake

Esto genera una configuración YAML limpia:

pipeline.yaml
name: postgres-orders
source:
type: postgres
host: ${POSTGRES_HOST}
port: 5432
database: orders_db
tables:
- name: orders
incremental_key: updated_at
- name: customers
incremental_key: updated_at
- name: products
incremental_key: updated_at
destination:
type: s3
bucket: company-lake
prefix: raw/postgres
format: parquet

10 lineas de configuración real. Las credenciales vienen de variables de entorno — nunca se almacenan en el archivo.

Paso 4: Ejecutar el pipeline

Terminal window
export POSTGRES_HOST=prod-db.company.com
export POSTGRES_PASSWORD=<from-vault>
dataspoc-pipe run

Salida:

[2024-03-15 10:23:01] Starting pipeline: postgres-orders
[2024-03-15 10:23:01] Source: postgres (orders_db)
[2024-03-15 10:23:01] Destination: s3://company-lake/raw/postgres/
[2024-03-15 10:23:02] Extracting: orders (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:04] Extracted: 1,247 rows
[2024-03-15 10:23:05] Extracting: customers (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:05] Extracted: 23 rows
[2024-03-15 10:23:06] Extracting: products (incremental from 2024-03-15T09:00:00)
[2024-03-15 10:23:06] Extracted: 5 rows
[2024-03-15 10:23:07] Pipeline complete: 3 tables, 1,275 rows, 2.4 MB written

Paso 5: Verificar estado

Terminal window
dataspoc-pipe status
Pipeline: postgres-orders
Last run: 2024-03-15 10:23:07 (SUCCESS)
Tables:
orders | 1,247 rows | bookmark: 2024-03-15T10:23:04
customers | 23 rows | bookmark: 2024-03-15T10:23:05
products | 5 rows | bookmark: 2024-03-15T10:23:06

Como funcióna la extracción incremental

DataSpoc Pipe almacena bookmarks en el propio bucket:

s3://company-lake/
.dataspoc/
state/postgres-orders/state.json
raw/postgres/
orders/
20240315_102304.parquet
customers/
20240315_102305.parquet
products/
20240315_102306.parquet

El archivo de estado rastrea el ultimo valor extraido para cada tabla:

{
"pipeline": "postgres-orders",
"tables": {
"orders": {
"incremental_key": "updated_at",
"bookmark": "2024-03-15T10:23:04",
"rows_extracted": 1247
},
"customers": {
"incremental_key": "updated_at",
"bookmark": "2024-03-15T10:23:05",
"rows_extracted": 23
}
}
}

En la siguiente ejecucion, Pipe automáticamente retoma donde lo dejo. Sin gestion de estado de Airflow, sin XCom, sin base de datos.

Programacion

Para la programacion, usa cron, timers de systemd o cualquier sistema CI/CD:

Terminal window
# crontab — run every hour
0 * * * * cd /opt/pipelines/my-pipeline && dataspoc-pipe run >> /var/log/pipe.log 2>&1

O en GitHub Actions:

name: Hourly Extraction
on:
schedule:
- cron: "0 * * * *"
jobs:
extract:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: pip install dataspoc-pipe
- run: dataspoc-pipe run
env:
POSTGRES_HOST: ${{ secrets.POSTGRES_HOST }}
POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

La comparación

AspectoAirflowDataSpoc Pipe
Lineas de código por tabla50+10 (YAML)
Infraestructura necesariaScheduler, DB, workersNinguna (solo el CLI)
Tiempo hasta primera extracciónHoras/dias5 minutos
Gestion de estadoMetastore de AirflowArchivo de estado en bucket
Logica incrementalTu la construyesIntegrada
Formato de salidaTu eliges e implementasParquet (optimizado)
MonitoreoUI de Airflow + alertasdataspoc-pipe status + logs en bucket
ProgramacionIntegradacron / CI / cualquier scheduler

Cuando todavia necesitas Airflow

DataSpoc Pipe es para extracción — mover datos de fuentes a tu lake. Aun podrias querer Airflow (o Dagster, Prefect) si necesitas:

  • Dependencias complejas de DAG entre docenas de trabajos
  • UI de reintentos con activacion manual
  • Integracion con Spark/dbt/otras herramientas de transformacion
  • Funciones de colaboracion en equipo para la creacion de pipelines

Pero para el caso comun de “obtener datos de A a Parquet en B, incrementalmente” — Pipe lo hace en 5 minutos, no en 5 dias.

Proximos pasos

Terminal window
pip install dataspoc-pipe
dataspoc-pipe init my-first-pipeline
dataspoc-pipe add --help

Consulta la lista completa de fuentes para todos los conectores soportados: MySQL, MongoDB, REST APIs, Salesforce, HubSpot y mas.

Recomendados