10 Líneas de YAML vs 200 Líneas de DAG de Airflow
Construyamos el mismo pipeline ETL dos veces. Una con Apache Airflow. Una con DataSpoc Pipe. Misma entrada, misma salida, mismo resultado. Tú decides cuál preferirías mantener a las 2am cuando suena la alerta de guardia.
El Caso de Uso
Extraer datos de una API REST (un servicio de pagos), transformarlos (renombrar columnas, filtrar nulos, castear tipos), y cargarlos a S3 cómo Parquet.
Fuente: https://api.payments.example.com/v1/transactions
Destino: s3://company-data-lake/raw/payments/transactions/
Horario: Cada hora
Incremental: Solo obtener transacciones desde la última ejecución
La Implementación con Airflow
Archivo 1: docker-compose.yaml (45 líneas)
version: "3.8"services: postgres: image: postgres:15 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres-data:/var/lib/postgresql/data
webserver: image: apache/airflow:2.8.0 command: webserver ports: - "8080:8080" environment: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here" volumes: - ./dags:/opt/airflow/dags - ./plugins:/opt/airflow/plugins depends_on: - postgres
scheduler: image: apache/airflow:2.8.0 command: scheduler environment: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here" volumes: - ./dags:/opt/airflow/dags - ./plugins:/opt/airflow/plugins depends_on: - postgres - webserver
volumes: postgres-data:Archivo 2: requirements.txt (6 líneas)
apache-airflow==2.8.0apache-airflow-providers-amazon==8.0.0requests==2.31.0pyarrow==14.0.0pandas==2.1.0boto3==1.34.0Archivo 3: dags/payments_etl.py (120 líneas)
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom airflow.models import Variablefrom datetime import datetime, timedeltaimport requestsimport pandas as pdimport pyarrow as paimport pyarrow.parquet as pqimport jsonimport io
default_args = { "owner": "data-engineering", "depends_on_past": True, "email_on_failure": True, "email": ["oncall@company.com"], "retries": 3, "retry_delay": timedelta(minutes=5),}
def extract_payments(**context): """Extract transactions from payments API.""" api_key = Variable.get("payments_api_key") base_url = "https://api.payments.example.com/v1/transactions"
# Get bookmark (last successful extraction timestamp) try: bookmark = Variable.get("payments_last_timestamp") except KeyError: bookmark = "2024-01-01T00:00:00Z"
headers = {"Authorization": f"Bearer {api_key}"} params = { "created_after": bookmark, "limit": 1000, }
all_records = [] page = 1
while True: params["page"] = page response = requests.get(base_url, headers=headers, params=params) response.raise_for_status() data = response.json()
records = data.get("transactions", []) if not records: break
all_records.extend(records) page += 1
if not data.get("has_more", False): break
if not all_records: return "No new data"
# Push to XCom for next task context["ti"].xcom_push(key="raw_data", value=json.dumps(all_records)) context["ti"].xcom_push(key="record_count", value=len(all_records))
return f"Extracted {len(all_records)} records"
def transform_payments(**context): """Transform: rename columns, filter nulls, cast types.""" raw_json = context["ti"].xcom_pull(key="raw_data", task_ids="extract") records = json.loads(raw_json)
df = pd.DataFrame(records)
# Rename columns df = df.rename(columns={ "txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id", })
# Filter nulls df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
# Cast types df["amount"] = df["amount"].astype(float) df["created_at"] = pd.to_datetime(df["created_at"]) df["customer_id"] = df["customer_id"].astype(str)
context["ti"].xcom_push(key="transformed_data", value=df.to_json()) return f"Transformed {len(df)} records"
def load_to_s3(**context): """Load transformed data to S3 as Parquet.""" df_json = context["ti"].xcom_pull(key="transformed_data", task_ids="transform") df = pd.read_json(df_json)
table = pa.Table.from_pandas(df) buf = io.BytesIO() pq.write_table(table, buf) buf.seek(0)
execution_date = context["execution_date"].strftime("%Y%m%d_%H%M%S") key = f"raw/payments/transactions/{execution_date}.parquet"
s3_hook = S3Hook(aws_conn_id="s3_data_lake") s3_hook.load_bytes( bytes_data=buf.getvalue(), key=key, bucket_name="company-data-lake", replace=True, )
# Update bookmark max_timestamp = df["created_at"].max().isoformat() Variable.set("payments_last_timestamp", max_timestamp)
return f"Loaded {len(df)} rows to s3://{key}"
with DAG( "payments_etl", default_args=default_args, schedule_interval="@hourly", start_date=datetime(2024, 1, 1), catchup=False, tags=["etl", "payments"],) as dag:
extract = PythonOperator( task_id="extract", python_callable=extract_payments, )
transform = PythonOperator( task_id="transform", python_callable=transform_payments, )
load = PythonOperator( task_id="load", python_callable=load_to_s3, )
extract >> transform >> loadArchivo 4: Configuración de UI de Airflow (pasos manuales)
1. Open http://localhost:80802. Admin → Connections → Add: - Conn ID: s3_data_lake - Conn Type: Amazon Web Services - AWS Access Key ID: **** - AWS Secret Access Key: ****
3. Admin → Variables → Add: - Key: payments_api_key - Value: sk_live_****
4. Admin → Variables → Add: - Key: payments_last_timestamp - Value: 2024-01-01T00:00:00ZTotal: ~200 líneas en 4 archivos, más configuración manual de UI.
La Implementación con DataSpoc Pipe
Archivo 1: pipelines/api-payments.yaml (10 líneas)
pipeline: api-paymentssource: type: rest_api url: "https://api.payments.example.com/v1/transactions" auth: bearer ${PAYMENTS_API_KEY} pagination: cursor incremental: field: created_at strategy: bookmark
target: type: s3 bucket: s3://company-data-lake prefix: raw/paymentsArchivo 2: transforms/api-payments.py (5 líneas)
def transform(df): df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"}) df = df.dropna(subset=["transaction_id", "amount", "customer_id"]) df["amount"] = df["amount"].astype(float) return dfEjecútalo
export PAYMENTS_API_KEY=sk_live_****dataspoc-pipe run api-paymentsSalida:
[api-payments] Extracting from REST API...[api-payments] Page 1: 1000 records[api-payments] Page 2: 847 records[api-payments] Applying transform: transforms/api-payments.py[api-payments] Filtered: 1847 → 1823 records (24 nulls removed)[api-payments] Writing: s3://company-data-lake/raw/payments/transactions/20260415_100000.parquet[api-payments] Bookmark saved: 2026-04-15T09:58:42Z[api-payments] Done. 1,823 rows, 4.1sTotal: 15 líneas en 2 archivos. Sin infraestructura.
Comparación Lado a Lado
| Dimensión | Airflow | DataSpoc Pipe |
|---|---|---|
| Archivos | 4 (docker-compose, requirements, DAG, configuración manual) | 2 (YAML + transform) |
| Líneas de código | ~200 | ~15 |
| Infraestructura | Postgres + Webserver + Scheduler | Ninguna (solo el CLI) |
| Costo mensual | $200-500 (cluster mínimo viable) | $0 (se ejecuta en cualquier lugar) |
| Tiempo de configuración | 2-4 horas | 5 minutos |
| Dependencias | 6 paquetes Python + Docker | 1 (pip install dataspoc-pipe) |
| Gestión de bookmarks | Manual (Variables de Airflow) | Automática (archivo de estado en bucket) |
| Paginación | Manual (escribir el bucle tú mismo) | Declarativa (pagination: cursor) |
| Gestión de secretos | UI de Airflow + clave Fernet | Variables de entorno |
| Monitoreo | UI de Airflow (requiere cluster corriendo) | Archivos de log en bucket + CLI status |
| Agregar una nueva API | Copiar DAG de 120 líneas, modificar | Copiar YAML de 10 líneas, modificar |
| Programación | DAG schedule_interval | Cron o campo schedule en YAML |
La Comparación de Transformaciones
Airflow te obliga a escribir la transformación cómo una función Python dentro del DAG, pasar datos a través de XCom (serialización JSON) y gestionar estado entre tareas.
Las transformaciones de DataSpoc Pipe son funciónes puras. Entrada: DataFrame. Salida: DataFrame. Sin necesidad de conocer el framework:
# This is the ENTIRE transform filedef transform(df): df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"}) df = df.dropna(subset=["transaction_id", "amount", "customer_id"]) df["amount"] = df["amount"].astype(float) return dfPuedes probarlo localmente con pandas:
import pandas as pdfrom transforms.api_payments import transform
test_df = pd.DataFrame([ {"txn_id": "t1", "amt": "99.99", "ccy": "USD", "ts": "2026-04-15", "cust_id": "c1"}, {"txn_id": None, "amt": "50.00", "ccy": "EUR", "ts": "2026-04-15", "cust_id": "c2"},])
result = transform(test_df)assert len(result) == 1 # null txn_id filtered outassert result.iloc[0]["amount"] == 99.99 # cast to floatAgregar un Segundo Pipeline
Airflow: Copiar, Modificar, Depurar
Copias el DAG de 120 líneas, renombras variables, cambias el endpoint de la API, ajustas la lógica de transformación, agregas una nueva conexión en la UI, agregas nuevas variables. Repetir para cada fuente.
DataSpoc Pipe: Copiar, Modificar, Listo
pipeline: api-crmsource: type: rest_api url: "https://api.crm.example.com/v2/contacts" auth: bearer ${CRM_API_KEY} pagination: offset incremental: field: updated_at strategy: bookmark
target: type: s3 bucket: s3://company-data-lake prefix: raw/crmdataspoc-pipe run api-crmDos minutos, incluyendo escribir el YAML.
Cuándo Airflow ES la Elección Correcta
Seamos justos. Airflow gana cuando necesitas:
- Dependencias complejas de DAG — La tarea A alimenta la tarea B que alimenta las tareas C y D en paralelo, con ramificación condicional y lógica de reintento por tarea
- Orquestación entre sistemas — disparar un trabajo Spark, esperar a que termine, luego disparar una ejecución de dbt, luego notificar a Slack, luego disparar un reentrenamiento de modelo
- Gobernanza empresarial — logs de auditoría, RBAC, flujos de aprobación, monitoreo de SLA
- Equipo de 10+ ingenieros de datos — plataforma central de orquestación con visibilidad compartida
- Inversión existente en Airflow — ya tienes 200 DAGs corriendo, agregar uno más es costo marginal
DataSpoc Pipe es para:
- Ingesta — llevar datos de la fuente a Parquet en bucket
- Equipos pequeños a medianos (1-5 ingenieros de datos)
- Velocidad — de prototipo a producción en minutos, no días
- Simplicidad — sin infraestructura que mantener
- Costo — $0 en lugar de $200-500/mes por un cluster de Airflow
El Enfoque Híbrido
Puedes usar ambos. Pipe maneja la ingesta (la parte aburrida y repetitiva). Airflow maneja la orquestación (la parte compleja y personalizada):
# dags/daily_pipeline.py — Airflow DAG that calls Pipefrom airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime
with DAG("daily_ingestion", schedule_interval="@daily", start_date=datetime(2024, 1, 1)) as dag:
ingest = BashOperator( task_id="run_all_pipes", bash_command="dataspoc-pipe run --all", )
dbt_transform = BashOperator( task_id="dbt_run", bash_command="dbt run --project-dir /opt/dbt", )
notify = BashOperator( task_id="slack_notify", bash_command='curl -X POST $SLACK_WEBHOOK -d \'{"text": "Pipeline complete"}\'', )
ingest >> dbt_transform >> notifyLo mejor de ambos mundos: la simplicidad de Pipe para ingesta, el poder de Airflow para orquestación.
Conclusión
Para el 90% de las tareas de ingesta de datos, no necesitas un DAG de 200 líneas y un cluster de Kubernetes corriendo Airflow. Necesitas 10 líneas de YAML y un pip install.
pip install dataspoc-pipedataspoc-pipe init my-project --bucket s3://my-bucketdataspoc-pipe add api-payments --source rest_apidataspoc-pipe run api-paymentsCuatro comandos. Cero infraestructura. Mismo resultado.