airflowetlyamldata-engineeringcomparison

10 Líneas de YAML vs 200 Líneas de DAG de Airflow

Michael San Martim · 2026-04-28

Construyamos el mismo pipeline ETL dos veces. Una con Apache Airflow. Una con DataSpoc Pipe. Misma entrada, misma salida, mismo resultado. Tú decides cuál preferirías mantener a las 2am cuando suena la alerta de guardia.

El Caso de Uso

Extraer datos de una API REST (un servicio de pagos), transformarlos (renombrar columnas, filtrar nulos, castear tipos), y cargarlos a S3 cómo Parquet.

Fuente: https://api.payments.example.com/v1/transactions Destino: s3://company-data-lake/raw/payments/transactions/ Horario: Cada hora Incremental: Solo obtener transacciones desde la última ejecución

La Implementación con Airflow

Archivo 1: docker-compose.yaml (45 líneas)

docker-compose.yaml
version: "3.8"
services:
postgres:
image: postgres:15
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-data:/var/lib/postgresql/data
webserver:
image: apache/airflow:2.8.0
command: webserver
ports:
- "8080:8080"
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here"
volumes:
- ./dags:/opt/airflow/dags
- ./plugins:/opt/airflow/plugins
depends_on:
- postgres
scheduler:
image: apache/airflow:2.8.0
command: scheduler
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here"
volumes:
- ./dags:/opt/airflow/dags
- ./plugins:/opt/airflow/plugins
depends_on:
- postgres
- webserver
volumes:
postgres-data:

Archivo 2: requirements.txt (6 líneas)

apache-airflow==2.8.0
apache-airflow-providers-amazon==8.0.0
requests==2.31.0
pyarrow==14.0.0
pandas==2.1.0
boto3==1.34.0

Archivo 3: dags/payments_etl.py (120 líneas)

dags/payments_etl.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.models import Variable
from datetime import datetime, timedelta
import requests
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import json
import io
default_args = {
"owner": "data-engineering",
"depends_on_past": True,
"email_on_failure": True,
"email": ["oncall@company.com"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
def extract_payments(**context):
"""Extract transactions from payments API."""
api_key = Variable.get("payments_api_key")
base_url = "https://api.payments.example.com/v1/transactions"
# Get bookmark (last successful extraction timestamp)
try:
bookmark = Variable.get("payments_last_timestamp")
except KeyError:
bookmark = "2024-01-01T00:00:00Z"
headers = {"Authorization": f"Bearer {api_key}"}
params = {
"created_after": bookmark,
"limit": 1000,
}
all_records = []
page = 1
while True:
params["page"] = page
response = requests.get(base_url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
records = data.get("transactions", [])
if not records:
break
all_records.extend(records)
page += 1
if not data.get("has_more", False):
break
if not all_records:
return "No new data"
# Push to XCom for next task
context["ti"].xcom_push(key="raw_data", value=json.dumps(all_records))
context["ti"].xcom_push(key="record_count", value=len(all_records))
return f"Extracted {len(all_records)} records"
def transform_payments(**context):
"""Transform: rename columns, filter nulls, cast types."""
raw_json = context["ti"].xcom_pull(key="raw_data", task_ids="extract")
records = json.loads(raw_json)
df = pd.DataFrame(records)
# Rename columns
df = df.rename(columns={
"txn_id": "transaction_id",
"amt": "amount",
"ccy": "currency",
"ts": "created_at",
"cust_id": "customer_id",
})
# Filter nulls
df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
# Cast types
df["amount"] = df["amount"].astype(float)
df["created_at"] = pd.to_datetime(df["created_at"])
df["customer_id"] = df["customer_id"].astype(str)
context["ti"].xcom_push(key="transformed_data", value=df.to_json())
return f"Transformed {len(df)} records"
def load_to_s3(**context):
"""Load transformed data to S3 as Parquet."""
df_json = context["ti"].xcom_pull(key="transformed_data", task_ids="transform")
df = pd.read_json(df_json)
table = pa.Table.from_pandas(df)
buf = io.BytesIO()
pq.write_table(table, buf)
buf.seek(0)
execution_date = context["execution_date"].strftime("%Y%m%d_%H%M%S")
key = f"raw/payments/transactions/{execution_date}.parquet"
s3_hook = S3Hook(aws_conn_id="s3_data_lake")
s3_hook.load_bytes(
bytes_data=buf.getvalue(),
key=key,
bucket_name="company-data-lake",
replace=True,
)
# Update bookmark
max_timestamp = df["created_at"].max().isoformat()
Variable.set("payments_last_timestamp", max_timestamp)
return f"Loaded {len(df)} rows to s3://{key}"
with DAG(
"payments_etl",
default_args=default_args,
schedule_interval="@hourly",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["etl", "payments"],
) as dag:
extract = PythonOperator(
task_id="extract",
python_callable=extract_payments,
)
transform = PythonOperator(
task_id="transform",
python_callable=transform_payments,
)
load = PythonOperator(
task_id="load",
python_callable=load_to_s3,
)
extract >> transform >> load

Archivo 4: Configuración de UI de Airflow (pasos manuales)

1. Open http://localhost:8080
2. Admin → Connections → Add:
- Conn ID: s3_data_lake
- Conn Type: Amazon Web Services
- AWS Access Key ID: ****
- AWS Secret Access Key: ****
3. Admin → Variables → Add:
- Key: payments_api_key
- Value: sk_live_****
4. Admin → Variables → Add:
- Key: payments_last_timestamp
- Value: 2024-01-01T00:00:00Z

Total: ~200 líneas en 4 archivos, más configuración manual de UI.


La Implementación con DataSpoc Pipe

Archivo 1: pipelines/api-payments.yaml (10 líneas)

pipeline: api-payments
source:
type: rest_api
url: "https://api.payments.example.com/v1/transactions"
auth: bearer ${PAYMENTS_API_KEY}
pagination: cursor
incremental:
field: created_at
strategy: bookmark
target:
type: s3
bucket: s3://company-data-lake
prefix: raw/payments

Archivo 2: transforms/api-payments.py (5 líneas)

def transform(df):
df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"})
df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
df["amount"] = df["amount"].astype(float)
return df

Ejecútalo

Terminal window
export PAYMENTS_API_KEY=sk_live_****
dataspoc-pipe run api-payments

Salida:

[api-payments] Extracting from REST API...
[api-payments] Page 1: 1000 records
[api-payments] Page 2: 847 records
[api-payments] Applying transform: transforms/api-payments.py
[api-payments] Filtered: 1847 → 1823 records (24 nulls removed)
[api-payments] Writing: s3://company-data-lake/raw/payments/transactions/20260415_100000.parquet
[api-payments] Bookmark saved: 2026-04-15T09:58:42Z
[api-payments] Done. 1,823 rows, 4.1s

Total: 15 líneas en 2 archivos. Sin infraestructura.


Comparación Lado a Lado

DimensiónAirflowDataSpoc Pipe
Archivos4 (docker-compose, requirements, DAG, configuración manual)2 (YAML + transform)
Líneas de código~200~15
InfraestructuraPostgres + Webserver + SchedulerNinguna (solo el CLI)
Costo mensual$200-500 (cluster mínimo viable)$0 (se ejecuta en cualquier lugar)
Tiempo de configuración2-4 horas5 minutos
Dependencias6 paquetes Python + Docker1 (pip install dataspoc-pipe)
Gestión de bookmarksManual (Variables de Airflow)Automática (archivo de estado en bucket)
PaginaciónManual (escribir el bucle tú mismo)Declarativa (pagination: cursor)
Gestión de secretosUI de Airflow + clave FernetVariables de entorno
MonitoreoUI de Airflow (requiere cluster corriendo)Archivos de log en bucket + CLI status
Agregar una nueva APICopiar DAG de 120 líneas, modificarCopiar YAML de 10 líneas, modificar
ProgramaciónDAG schedule_intervalCron o campo schedule en YAML

La Comparación de Transformaciones

Airflow te obliga a escribir la transformación cómo una función Python dentro del DAG, pasar datos a través de XCom (serialización JSON) y gestionar estado entre tareas.

Las transformaciones de DataSpoc Pipe son funciónes puras. Entrada: DataFrame. Salida: DataFrame. Sin necesidad de conocer el framework:

# This is the ENTIRE transform file
def transform(df):
df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"})
df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
df["amount"] = df["amount"].astype(float)
return df

Puedes probarlo localmente con pandas:

import pandas as pd
from transforms.api_payments import transform
test_df = pd.DataFrame([
{"txn_id": "t1", "amt": "99.99", "ccy": "USD", "ts": "2026-04-15", "cust_id": "c1"},
{"txn_id": None, "amt": "50.00", "ccy": "EUR", "ts": "2026-04-15", "cust_id": "c2"},
])
result = transform(test_df)
assert len(result) == 1 # null txn_id filtered out
assert result.iloc[0]["amount"] == 99.99 # cast to float

Agregar un Segundo Pipeline

Airflow: Copiar, Modificar, Depurar

Copias el DAG de 120 líneas, renombras variables, cambias el endpoint de la API, ajustas la lógica de transformación, agregas una nueva conexión en la UI, agregas nuevas variables. Repetir para cada fuente.

DataSpoc Pipe: Copiar, Modificar, Listo

pipelines/api-crm.yaml
pipeline: api-crm
source:
type: rest_api
url: "https://api.crm.example.com/v2/contacts"
auth: bearer ${CRM_API_KEY}
pagination: offset
incremental:
field: updated_at
strategy: bookmark
target:
type: s3
bucket: s3://company-data-lake
prefix: raw/crm
Terminal window
dataspoc-pipe run api-crm

Dos minutos, incluyendo escribir el YAML.

Cuándo Airflow ES la Elección Correcta

Seamos justos. Airflow gana cuando necesitas:

  • Dependencias complejas de DAG — La tarea A alimenta la tarea B que alimenta las tareas C y D en paralelo, con ramificación condicional y lógica de reintento por tarea
  • Orquestación entre sistemas — disparar un trabajo Spark, esperar a que termine, luego disparar una ejecución de dbt, luego notificar a Slack, luego disparar un reentrenamiento de modelo
  • Gobernanza empresarial — logs de auditoría, RBAC, flujos de aprobación, monitoreo de SLA
  • Equipo de 10+ ingenieros de datos — plataforma central de orquestación con visibilidad compartida
  • Inversión existente en Airflow — ya tienes 200 DAGs corriendo, agregar uno más es costo marginal

DataSpoc Pipe es para:

  • Ingesta — llevar datos de la fuente a Parquet en bucket
  • Equipos pequeños a medianos (1-5 ingenieros de datos)
  • Velocidad — de prototipo a producción en minutos, no días
  • Simplicidad — sin infraestructura que mantener
  • Costo — $0 en lugar de $200-500/mes por un cluster de Airflow

El Enfoque Híbrido

Puedes usar ambos. Pipe maneja la ingesta (la parte aburrida y repetitiva). Airflow maneja la orquestación (la parte compleja y personalizada):

# dags/daily_pipeline.py — Airflow DAG that calls Pipe
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG("daily_ingestion", schedule_interval="@daily", start_date=datetime(2024, 1, 1)) as dag:
ingest = BashOperator(
task_id="run_all_pipes",
bash_command="dataspoc-pipe run --all",
)
dbt_transform = BashOperator(
task_id="dbt_run",
bash_command="dbt run --project-dir /opt/dbt",
)
notify = BashOperator(
task_id="slack_notify",
bash_command='curl -X POST $SLACK_WEBHOOK -d \'{"text": "Pipeline complete"}\'',
)
ingest >> dbt_transform >> notify

Lo mejor de ambos mundos: la simplicidad de Pipe para ingesta, el poder de Airflow para orquestación.

Conclusión

Para el 90% de las tareas de ingesta de datos, no necesitas un DAG de 200 líneas y un cluster de Kubernetes corriendo Airflow. Necesitas 10 líneas de YAML y un pip install.

Terminal window
pip install dataspoc-pipe
dataspoc-pipe init my-project --bucket s3://my-bucket
dataspoc-pipe add api-payments --source rest_api
dataspoc-pipe run api-payments

Cuatro comandos. Cero infraestructura. Mismo resultado.

Recomendados