airflow · etl · yaml · data-engineering · comparison

10 Lines of YAML vs 200 Lines of Airflow DAG

Michael San Martim · 2026-04-28

Let's build the same ETL pipeline twice: once with Apache Airflow, once with DataSpoc Pipe. Same input, same output. You decide which one you would rather maintain at 2 a.m. when the on-call pager goes off.

The Use Case

Extract data from a REST API (a payments service), transform it (rename columns, filter nulls, cast types), and load it to S3 as Parquet.

Source: https://api.payments.example.com/v1/transactions
Target: s3://company-data-lake/raw/payments/transactions/
Schedule: Every hour
Incremental: Only fetch transactions since last run
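
For orientation, a single transaction record from the API looks roughly like this. The field names are taken from the transform code further down; the overall payload shape is an assumption, not the service's documented schema.

{
  "txn_id": "txn_8f3a2c",
  "amt": "99.99",
  "ccy": "USD",
  "ts": "2026-04-15T09:12:44Z",
  "cust_id": "cus_1a2b3c"
}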

The Airflow Implementation

File 1: docker-compose.yaml (45 lines)

docker-compose.yaml
version: "3.8"
services:
postgres:
image: postgres:15
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-data:/var/lib/postgresql/data
webserver:
image: apache/airflow:2.8.0
command: webserver
ports:
- "8080:8080"
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here"
volumes:
- ./dags:/opt/airflow/dags
- ./plugins:/opt/airflow/plugins
depends_on:
- postgres
scheduler:
image: apache/airflow:2.8.0
command: scheduler
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here"
volumes:
- ./dags:/opt/airflow/dags
- ./plugins:/opt/airflow/plugins
depends_on:
- postgres
- webserver
volumes:
postgres-data:

File 2: requirements.txt (6 lines)

apache-airflow==2.8.0
apache-airflow-providers-amazon==8.0.0
requests==2.31.0
pyarrow==14.0.0
pandas==2.1.0
boto3==1.34.0

File 3: dags/payments_etl.py (120 lines)

dags/payments_etl.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.models import Variable
from datetime import datetime, timedelta
import requests
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import json
import io
default_args = {
    "owner": "data-engineering",
    "depends_on_past": True,
    "email_on_failure": True,
    "email": ["oncall@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

def extract_payments(**context):
    """Extract transactions from payments API."""
    api_key = Variable.get("payments_api_key")
    base_url = "https://api.payments.example.com/v1/transactions"
    # Get bookmark (last successful extraction timestamp)
    try:
        bookmark = Variable.get("payments_last_timestamp")
    except KeyError:
        bookmark = "2024-01-01T00:00:00Z"
    headers = {"Authorization": f"Bearer {api_key}"}
    params = {
        "created_after": bookmark,
        "limit": 1000,
    }
    all_records = []
    page = 1
    while True:
        params["page"] = page
        response = requests.get(base_url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()
        records = data.get("transactions", [])
        if not records:
            break
        all_records.extend(records)
        page += 1
        if not data.get("has_more", False):
            break
    if not all_records:
        return "No new data"
    # Push to XCom for next task
    context["ti"].xcom_push(key="raw_data", value=json.dumps(all_records))
    context["ti"].xcom_push(key="record_count", value=len(all_records))
    return f"Extracted {len(all_records)} records"
def transform_payments(**context):
    """Transform: rename columns, filter nulls, cast types."""
    raw_json = context["ti"].xcom_pull(key="raw_data", task_ids="extract")
    records = json.loads(raw_json)
    df = pd.DataFrame(records)
    # Rename columns
    df = df.rename(columns={
        "txn_id": "transaction_id",
        "amt": "amount",
        "ccy": "currency",
        "ts": "created_at",
        "cust_id": "customer_id",
    })
    # Filter nulls
    df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
    # Cast types
    df["amount"] = df["amount"].astype(float)
    df["created_at"] = pd.to_datetime(df["created_at"])
    df["customer_id"] = df["customer_id"].astype(str)
    context["ti"].xcom_push(key="transformed_data", value=df.to_json())
    return f"Transformed {len(df)} records"
def load_to_s3(**context):
    """Load transformed data to S3 as Parquet."""
    df_json = context["ti"].xcom_pull(key="transformed_data", task_ids="transform")
    df = pd.read_json(df_json)
    table = pa.Table.from_pandas(df)
    buf = io.BytesIO()
    pq.write_table(table, buf)
    buf.seek(0)
    execution_date = context["execution_date"].strftime("%Y%m%d_%H%M%S")
    key = f"raw/payments/transactions/{execution_date}.parquet"
    s3_hook = S3Hook(aws_conn_id="s3_data_lake")
    s3_hook.load_bytes(
        bytes_data=buf.getvalue(),
        key=key,
        bucket_name="company-data-lake",
        replace=True,
    )
    # Update bookmark
    max_timestamp = df["created_at"].max().isoformat()
    Variable.set("payments_last_timestamp", max_timestamp)
    return f"Loaded {len(df)} rows to s3://company-data-lake/{key}"
with DAG(
    "payments_etl",
    default_args=default_args,
    schedule_interval="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["etl", "payments"],
) as dag:
    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_payments,
    )
    transform = PythonOperator(
        task_id="transform",
        python_callable=transform_payments,
    )
    load = PythonOperator(
        task_id="load",
        python_callable=load_to_s3,
    )
    extract >> transform >> load

File 4: Airflow UI Setup (manual steps)

1. Open http://localhost:8080
2. Admin → Connections → Add:
- Conn ID: s3_data_lake
- Conn Type: Amazon Web Services
- AWS Access Key ID: ****
- AWS Secret Access Key: ****
3. Admin → Variables → Add:
- Key: payments_api_key
- Value: sk_live_****
4. Admin → Variables → Add:
- Key: payments_last_timestamp
- Value: 2024-01-01T00:00:00Z
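
If you would rather script this than click through the UI, the Airflow CLI can set the same connection and variables (a sketch; flag names per Airflow 2.x, secrets shown as placeholders). Either way, this state lives inside Airflow's metadata database rather than in your repo.

Terminal window
docker compose exec webserver airflow connections add s3_data_lake \
  --conn-type aws \
  --conn-login "AKIA****" \
  --conn-password "****"
docker compose exec webserver airflow variables set payments_api_key "sk_live_****"
docker compose exec webserver airflow variables set payments_last_timestamp "2024-01-01T00:00:00Z"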

Total: ~200 lines across 4 files, plus manual UI configuration.


The DataSpoc Pipe Implementation

File 1: pipelines/api-payments.yaml (10 lines)

pipeline: api-payments
source:
  type: rest_api
  url: "https://api.payments.example.com/v1/transactions"
  auth: bearer ${PAYMENTS_API_KEY}
  pagination: cursor
  incremental:
    field: created_at
    strategy: bookmark
target:
  type: s3
  bucket: s3://company-data-lake
  prefix: raw/payments

File 2: transforms/api-payments.py (5 lines)

def transform(df):
    df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"})
    df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
    df["amount"] = df["amount"].astype(float)
    return df

Run it

Terminal window
export PAYMENTS_API_KEY=sk_live_****
dataspoc-pipe run api-payments

Output:

[api-payments] Extracting from REST API...
[api-payments] Page 1: 1000 records
[api-payments] Page 2: 847 records
[api-payments] Applying transform: transforms/api-payments.py
[api-payments] Filtered: 1847 → 1823 records (24 nulls removed)
[api-payments] Writing: s3://company-data-lake/raw/payments/transactions/20260415_100000.parquet
[api-payments] Bookmark saved: 2026-04-15T09:58:42Z
[api-payments] Done. 1,823 rows, 4.1s
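
The bookmark is persisted as a small state file next to the data in the bucket. Its exact layout is not shown in the output above, so the JSON below is purely illustrative:

{
  "pipeline": "api-payments",
  "field": "created_at",
  "bookmark": "2026-04-15T09:58:42Z"
}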

Total: 15 lines across 2 files. No infrastructure.
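
The use case calls for an hourly run. Since Pipe is just a CLI, a plain cron entry covers that (the comparison table below also mentions a YAML schedule field; cron is shown here because it makes no assumptions about Pipe's schema):

Terminal window
# crontab -e: run at the top of every hour
0 * * * * PAYMENTS_API_KEY=sk_live_**** dataspoc-pipe run api-payments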


Side-by-Side Comparison

| Dimension | Airflow | DataSpoc Pipe |
| --- | --- | --- |
| Files | 4 (docker-compose, requirements, DAG, manual setup) | 2 (YAML + transform) |
| Lines of code | ~200 | ~15 |
| Infrastructure | Postgres + Webserver + Scheduler | None (just the CLI) |
| Monthly cost | $200-500 (minimum viable cluster) | $0 (runs anywhere) |
| Setup time | 2-4 hours | 5 minutes |
| Dependencies | 6 Python packages + Docker | 1 (pip install dataspoc-pipe) |
| Bookmark management | Manual (Airflow Variables) | Automatic (state file in bucket) |
| Pagination | Manual (write the loop yourself) | Declarative (pagination: cursor) |
| Secret management | Airflow UI + Fernet key | Environment variables |
| Monitoring | Airflow UI (requires running cluster) | Log files in bucket + CLI status |
| Adding a new API | Copy 120-line DAG, modify | Copy 10-line YAML, modify |
| Scheduling | DAG schedule_interval | Cron or YAML schedule field |

The Transform Comparison

Airflow makes you write the transform as a Python function inside the DAG, pass data through XCom (JSON serialization), and manage state across tasks.

DataSpoc Pipe transforms are pure functions. Input: DataFrame. Output: DataFrame. No framework awareness needed:

# This is the ENTIRE transform file
def transform(df):
    df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"})
    df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
    df["amount"] = df["amount"].astype(float)
    return df

You can test it locally with pandas:

import pandas as pd
from transforms.api_payments import transform

test_df = pd.DataFrame([
    {"txn_id": "t1", "amt": "99.99", "ccy": "USD", "ts": "2026-04-15", "cust_id": "c1"},
    {"txn_id": None, "amt": "50.00", "ccy": "EUR", "ts": "2026-04-15", "cust_id": "c2"},
])
result = transform(test_df)
assert len(result) == 1  # null txn_id filtered out
assert result.iloc[0]["amount"] == 99.99  # cast to float

Adding a Second Pipeline

Airflow: Copy, Modify, Debug

You copy the 120-line DAG, rename variables, change the API endpoint, adjust the transform logic, add a new connection in the UI, add new variables. Repeat for every source.

DataSpoc Pipe: Copy, Modify, Done

pipelines/api-crm.yaml
pipeline: api-crm
source:
  type: rest_api
  url: "https://api.crm.example.com/v2/contacts"
  auth: bearer ${CRM_API_KEY}
  pagination: offset
  incremental:
    field: updated_at
    strategy: bookmark
target:
  type: s3
  bucket: s3://company-data-lake
  prefix: raw/crm

Terminal window
dataspoc-pipe run api-crm

Two minutes, including writing the YAML.

When Airflow IS the Right Choice

Let's be fair. Airflow wins when you need:

  • Complex DAG dependencies — Task A feeds Task B, which fans out into Tasks C and D in parallel, with conditional branching and retry logic per task (see the sketch after this list)
  • Cross-system orchestration — trigger a Spark job, wait for it, then trigger a dbt run, then notify Slack, then trigger a model retrain
  • Enterprise governance — audit logs, RBAC, approval workflows, SLA monitoring
  • Team of 10+ data engineers — central orchestration platform with shared visibility
  • Existing Airflow investment — you already have 200 DAGs running, adding one more is marginal cost
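
As a quick illustration of the first point, here is the kind of conditional branching and per-task retry policy that sits firmly in Airflow territory. The operators are standard Airflow 2.x; the task logic itself is a placeholder.

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime

def choose_branch(**context):
    # Illustrative routing: take the backfill path on weekends, the normal load otherwise.
    if context["logical_date"].weekday() >= 5:
        return "backfill_load"
    return "incremental_load"

with DAG("complex_deps", schedule_interval="@daily", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    extract_a = EmptyOperator(task_id="extract_a")
    transform_b = EmptyOperator(task_id="transform_b", retries=5)  # per-task retry policy
    branch = BranchPythonOperator(task_id="branch", python_callable=choose_branch)
    incremental_load = EmptyOperator(task_id="incremental_load")
    backfill_load = EmptyOperator(task_id="backfill_load")

    # A feeds B, which fans out into two downstream paths chosen at runtime.
    extract_a >> transform_b >> branch >> [incremental_load, backfill_load]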

DataSpoc Pipe is for:

  • Ingestion — get data from source to Parquet in bucket
  • Small to medium teams (1-5 data engineers)
  • Speed — prototype to production in minutes, not days
  • Simplicity — no infrastructure to maintain
  • Cost — $0 instead of $200-500/month for an Airflow cluster

The Hybrid Approach

You can use both. Pipe handles ingestion (the boring, repetitive part). Airflow handles orchestration (the complex, custom part):

# dags/daily_pipeline.py — Airflow DAG that calls Pipe
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG("daily_ingestion", schedule_interval="@daily", start_date=datetime(2024, 1, 1)) as dag:
    ingest = BashOperator(
        task_id="run_all_pipes",
        bash_command="dataspoc-pipe run --all",
    )
    dbt_transform = BashOperator(
        task_id="dbt_run",
        bash_command="dbt run --project-dir /opt/dbt",
    )
    notify = BashOperator(
        task_id="slack_notify",
        bash_command='curl -X POST $SLACK_WEBHOOK -d \'{"text": "Pipeline complete"}\'',
    )
    ingest >> dbt_transform >> notify

Best of both worlds: Pipe’s simplicity for ingestion, Airflow’s power for orchestration.

Conclusion

For 90% of data ingestion tasks, you do not need a 200-line DAG and a Kubernetes cluster running Airflow. You need 10 lines of YAML and a pip install.

Terminal window
pip install dataspoc-pipe
dataspoc-pipe init my-project --bucket s3://my-bucket
dataspoc-pipe add api-payments --source rest_api
dataspoc-pipe run api-payments

Four commands. Zero infrastructure. Same result.
