Postgres a S3 en 5 minutos con DataSpoc Pipe
La mayoria de los equipos pasan semanas construyendo pipelines de datos que mueven datos de Postgres a un data lake. Con Airflow, necesitas un DAG, operadores, conexiónes, programacion, monitoreo y todo un equipo de infraestructura. Con DataSpoc Pipe, necesitas 5 comandos.
El enfoque de Airflow (50+ lineas de boilerplate)
Asi se ve un DAG tipico de Airflow para extracción incremental de Postgres:
from airflow import DAGfrom airflow.providers.postgres.hooks.postgres import PostgresHookfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom airflow.operators.python import PythonOperatorfrom datetime import datetime, timedeltaimport pandas as pdimport pyarrow as paimport pyarrow.parquet as pqimport io
default_args = { "owner": "data-eng", "depends_on_past": True, "email_on_failure": True, "email": ["data-team@company.com"], "retries": 3, "retry_delay": timedelta(minutes=5),}
dag = DAG( "postgres_orders_to_s3", default_args=default_args, schedule_interval="@hourly", start_date=datetime(2024, 1, 1), catchup=False,)
def extract_and_load(**context): pg_hook = PostgresHook(postgres_conn_id="prod_postgres") s3_hook = S3Hook(aws_conn_id="data_lake_s3")
# Get bookmark from last run last_run = context["prev_execution_date"] or datetime(2020, 1, 1)
query = f""" SELECT * FROM orders WHERE updated_at > '{last_run.isoformat()}' ORDER BY updated_at """
df = pg_hook.get_pandas_df(query) if df.empty: return
table = pa.Table.from_pandas(df) buf = io.BytesIO() pq.write_table(table, buf)
key = f"raw/postgres/orders/{context['ts_nodash']}.parquet" s3_hook.load_bytes(buf.getvalue(), key=key, bucket_name="company-lake")
extract_task = PythonOperator( task_id="extract_orders", python_callable=extract_and_load, dag=dag,)Son 50+ lineas solo para una tabla. Todavia necesitas:
- Infraestructura de Airflow (scheduler, webserver, base de datos, workers)
- Gestion de conexiónes en la UI de Airflow
- Configuracion de monitoreo y alertas
- Un DAG por tabla fuente (o DAGs dinamicos complejos)
El enfoque de DataSpoc Pipe (5 comandos, 10 lineas de config)
Paso 1: Instalar
pip install dataspoc-pipePaso 2: Inicializar un proyecto
dataspoc-pipe init my-pipelinecd my-pipelineEsto crea:
my-pipeline/ pipeline.yaml .dataspoc/ state/ logs/Paso 3: Agregar tu fuente Postgres
dataspoc-pipe add postgres \ --host prod-db.company.com \ --port 5432 \ --database orders_db \ --tables orders,customers,products \ --incremental updated_at \ --destination s3://company-lakeEsto genera una configuración YAML limpia:
name: postgres-orderssource: type: postgres host: ${POSTGRES_HOST} port: 5432 database: orders_db tables: - name: orders incremental_key: updated_at - name: customers incremental_key: updated_at - name: products incremental_key: updated_at
destination: type: s3 bucket: company-lake prefix: raw/postgres format: parquet10 lineas de configuración real. Las credenciales vienen de variables de entorno — nunca se almacenan en el archivo.
Paso 4: Ejecutar el pipeline
export POSTGRES_HOST=prod-db.company.comexport POSTGRES_PASSWORD=<from-vault>
dataspoc-pipe runSalida:
[2024-03-15 10:23:01] Starting pipeline: postgres-orders[2024-03-15 10:23:01] Source: postgres (orders_db)[2024-03-15 10:23:01] Destination: s3://company-lake/raw/postgres/[2024-03-15 10:23:02] Extracting: orders (incremental from 2024-03-15T09:00:00)[2024-03-15 10:23:04] Extracted: 1,247 rows[2024-03-15 10:23:05] Extracting: customers (incremental from 2024-03-15T09:00:00)[2024-03-15 10:23:05] Extracted: 23 rows[2024-03-15 10:23:06] Extracting: products (incremental from 2024-03-15T09:00:00)[2024-03-15 10:23:06] Extracted: 5 rows[2024-03-15 10:23:07] Pipeline complete: 3 tables, 1,275 rows, 2.4 MB writtenPaso 5: Verificar estado
dataspoc-pipe statusPipeline: postgres-ordersLast run: 2024-03-15 10:23:07 (SUCCESS)Tables: orders | 1,247 rows | bookmark: 2024-03-15T10:23:04 customers | 23 rows | bookmark: 2024-03-15T10:23:05 products | 5 rows | bookmark: 2024-03-15T10:23:06Como funcióna la extracción incremental
DataSpoc Pipe almacena bookmarks en el propio bucket:
s3://company-lake/ .dataspoc/ state/postgres-orders/state.json raw/postgres/ orders/ 20240315_102304.parquet customers/ 20240315_102305.parquet products/ 20240315_102306.parquetEl archivo de estado rastrea el ultimo valor extraido para cada tabla:
{ "pipeline": "postgres-orders", "tables": { "orders": { "incremental_key": "updated_at", "bookmark": "2024-03-15T10:23:04", "rows_extracted": 1247 }, "customers": { "incremental_key": "updated_at", "bookmark": "2024-03-15T10:23:05", "rows_extracted": 23 } }}En la siguiente ejecucion, Pipe automáticamente retoma donde lo dejo. Sin gestion de estado de Airflow, sin XCom, sin base de datos.
Programacion
Para la programacion, usa cron, timers de systemd o cualquier sistema CI/CD:
# crontab — run every hour0 * * * * cd /opt/pipelines/my-pipeline && dataspoc-pipe run >> /var/log/pipe.log 2>&1O en GitHub Actions:
name: Hourly Extractionon: schedule: - cron: "0 * * * *"jobs: extract: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - run: pip install dataspoc-pipe - run: dataspoc-pipe run env: POSTGRES_HOST: ${{ secrets.POSTGRES_HOST }} POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }} AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}La comparación
| Aspecto | Airflow | DataSpoc Pipe |
|---|---|---|
| Lineas de código por tabla | 50+ | 10 (YAML) |
| Infraestructura necesaria | Scheduler, DB, workers | Ninguna (solo el CLI) |
| Tiempo hasta primera extracción | Horas/dias | 5 minutos |
| Gestion de estado | Metastore de Airflow | Archivo de estado en bucket |
| Logica incremental | Tu la construyes | Integrada |
| Formato de salida | Tu eliges e implementas | Parquet (optimizado) |
| Monitoreo | UI de Airflow + alertas | dataspoc-pipe status + logs en bucket |
| Programacion | Integrada | cron / CI / cualquier scheduler |
Cuando todavia necesitas Airflow
DataSpoc Pipe es para extracción — mover datos de fuentes a tu lake. Aun podrias querer Airflow (o Dagster, Prefect) si necesitas:
- Dependencias complejas de DAG entre docenas de trabajos
- UI de reintentos con activacion manual
- Integracion con Spark/dbt/otras herramientas de transformacion
- Funciones de colaboracion en equipo para la creacion de pipelines
Pero para el caso comun de “obtener datos de A a Parquet en B, incrementalmente” — Pipe lo hace en 5 minutos, no en 5 dias.
Proximos pasos
pip install dataspoc-pipedataspoc-pipe init my-first-pipelinedataspoc-pipe add --helpConsulta la lista completa de fuentes para todos los conectores soportados: MySQL, MongoDB, REST APIs, Salesforce, HubSpot y mas.