10 Lines of YAML vs 200 Lines of Airflow DAG
Let us build the same ETL pipeline twice: once with Apache Airflow, once with DataSpoc Pipe. Same input, same output. You decide which one you would rather maintain at 2 a.m. when the on-call pager goes off.
The Use Case
Extract data from a REST API (a payments service), transform it (rename columns, filter nulls, cast types), and load it to S3 as Parquet.
Source: https://api.payments.example.com/v1/transactions
Target: s3://company-data-lake/raw/payments/transactions/
Schedule: Every hour
Incremental: Only fetch transactions since last run
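Concretely, each hourly run should land one timestamped Parquet file under the target prefix, along these lines (the naming pattern is taken from the run output later in this post):

```
s3://company-data-lake/raw/payments/transactions/20260415_090000.parquet
s3://company-data-lake/raw/payments/transactions/20260415_100000.parquet
```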
The Airflow Implementation
File 1: docker-compose.yaml (45 lines)
```yaml
version: "3.8"

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-data:/var/lib/postgresql/data

  webserver:
    image: apache/airflow:2.8.0
    command: webserver
    ports:
      - "8080:8080"
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./plugins:/opt/airflow/plugins
    depends_on:
      - postgres

  scheduler:
    image: apache/airflow:2.8.0
    command: scheduler
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./plugins:/opt/airflow/plugins
    depends_on:
      - postgres
      - webserver

volumes:
  postgres-data:
```

File 2: requirements.txt (6 lines)
```text
apache-airflow==2.8.0
apache-airflow-providers-amazon==8.0.0
requests==2.31.0
pyarrow==14.0.0
pandas==2.1.0
boto3==1.34.0
```

File 3: dags/payments_etl.py (120 lines)
```python
from datetime import datetime, timedelta
import io
import json

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import requests

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

default_args = {
    "owner": "data-engineering",
    "depends_on_past": True,
    "email_on_failure": True,
    "email": ["oncall@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}


def extract_payments(**context):
    """Extract transactions from the payments API."""
    api_key = Variable.get("payments_api_key")
    base_url = "https://api.payments.example.com/v1/transactions"

    # Get bookmark (last successful extraction timestamp)
    try:
        bookmark = Variable.get("payments_last_timestamp")
    except KeyError:
        bookmark = "2024-01-01T00:00:00Z"

    headers = {"Authorization": f"Bearer {api_key}"}
    params = {
        "created_after": bookmark,
        "limit": 1000,
    }

    all_records = []
    page = 1

    while True:
        params["page"] = page
        response = requests.get(base_url, headers=headers, params=params)
        response.raise_for_status()
        data = response.json()

        records = data.get("transactions", [])
        if not records:
            break

        all_records.extend(records)
        page += 1

        if not data.get("has_more", False):
            break

    if not all_records:
        return "No new data"

    # Push to XCom for the next task
    context["ti"].xcom_push(key="raw_data", value=json.dumps(all_records))
    context["ti"].xcom_push(key="record_count", value=len(all_records))

    return f"Extracted {len(all_records)} records"


def transform_payments(**context):
    """Transform: rename columns, filter nulls, cast types."""
    raw_json = context["ti"].xcom_pull(key="raw_data", task_ids="extract")
    if raw_json is None:
        return "No data to transform"  # extract found no new records
    records = json.loads(raw_json)

    df = pd.DataFrame(records)

    # Rename columns
    df = df.rename(columns={
        "txn_id": "transaction_id",
        "amt": "amount",
        "ccy": "currency",
        "ts": "created_at",
        "cust_id": "customer_id",
    })

    # Filter nulls
    df = df.dropna(subset=["transaction_id", "amount", "customer_id"])

    # Cast types
    df["amount"] = df["amount"].astype(float)
    df["created_at"] = pd.to_datetime(df["created_at"])
    df["customer_id"] = df["customer_id"].astype(str)

    context["ti"].xcom_push(key="transformed_data", value=df.to_json())
    return f"Transformed {len(df)} records"


def load_to_s3(**context):
    """Load transformed data to S3 as Parquet."""
    df_json = context["ti"].xcom_pull(key="transformed_data", task_ids="transform")
    if df_json is None:
        return "Nothing to load"
    df = pd.read_json(df_json)

    table = pa.Table.from_pandas(df)
    buf = io.BytesIO()
    pq.write_table(table, buf)
    buf.seek(0)

    execution_date = context["execution_date"].strftime("%Y%m%d_%H%M%S")
    key = f"raw/payments/transactions/{execution_date}.parquet"

    s3_hook = S3Hook(aws_conn_id="s3_data_lake")
    s3_hook.load_bytes(
        bytes_data=buf.getvalue(),
        key=key,
        bucket_name="company-data-lake",
        replace=True,
    )

    # Update bookmark
    max_timestamp = df["created_at"].max().isoformat()
    Variable.set("payments_last_timestamp", max_timestamp)

    return f"Loaded {len(df)} rows to s3://company-data-lake/{key}"


with DAG(
    "payments_etl",
    default_args=default_args,
    schedule_interval="@hourly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["etl", "payments"],
) as dag:
    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_payments,
    )

    transform = PythonOperator(
        task_id="transform",
        python_callable=transform_payments,
    )

    load = PythonOperator(
        task_id="load",
        python_callable=load_to_s3,
    )

    extract >> transform >> load
```

File 4: Airflow UI Setup (manual steps)
1. Open http://localhost:8080
2. Admin → Connections → Add:
   - Conn ID: s3_data_lake
   - Conn Type: Amazon Web Services
   - AWS Access Key ID: ****
   - AWS Secret Access Key: ****
3. Admin → Variables → Add:
   - Key: payments_api_key
   - Value: sk_live_****
4. Admin → Variables → Add:
   - Key: payments_last_timestamp
   - Value: 2024-01-01T00:00:00Z

Total: ~200 lines across 4 files, plus manual UI configuration.
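To be fair, the UI clicking can be scripted. A minimal sketch using Airflow's standard environment-variable convention for connections and its CLI for variables (the AWS key values are placeholders):

```bash
# Connections can be supplied as AIRFLOW_CONN_<CONN_ID> URIs instead of UI clicks
export AIRFLOW_CONN_S3_DATA_LAKE='aws://AKIAXXXX:secretXXXX@'

# Variables can be set through the Airflow CLI
airflow variables set payments_api_key 'sk_live_****'
airflow variables set payments_last_timestamp '2024-01-01T00:00:00Z'
```

Scripted or not, every secret still has to reach the webserver and scheduler containers, so you end up touching docker-compose.yaml either way.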
The DataSpoc Pipe Implementation
File 1: pipelines/api-payments.yaml (10 lines)
```yaml
pipeline: api-payments

source:
  type: rest_api
  url: "https://api.payments.example.com/v1/transactions"
  auth: bearer ${PAYMENTS_API_KEY}
  pagination: cursor
  incremental:
    field: created_at
    strategy: bookmark

target:
  type: s3
  bucket: s3://company-data-lake
  prefix: raw/payments
```

File 2: transforms/api-payments.py (5 lines)
```python
def transform(df):
    df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"})
    df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
    df["amount"] = df["amount"].astype(float)
    return df
```

Run it
```bash
export PAYMENTS_API_KEY=sk_live_****
dataspoc-pipe run api-payments
```

Output:
```
[api-payments] Extracting from REST API...
[api-payments] Page 1: 1000 records
[api-payments] Page 2: 847 records
[api-payments] Applying transform: transforms/api-payments.py
[api-payments] Filtered: 1847 → 1823 records (24 nulls removed)
[api-payments] Writing: s3://company-data-lake/raw/payments/transactions/20260415_100000.parquet
[api-payments] Bookmark saved: 2026-04-15T09:58:42Z
[api-payments] Done. 1,823 rows, 4.1s
```

Total: 15 lines across 2 files. No infrastructure.
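Because the source is declared incremental with a bookmark strategy, re-running the same command fetches only transactions created after the saved bookmark. The status subcommand below is an assumption, inferred from the CLI status monitoring row in the comparison table that follows:

```bash
# Second run picks up from the saved bookmark automatically
dataspoc-pipe run api-payments

# Hypothetical status check (exact subcommand is an assumption)
dataspoc-pipe status api-payments
```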
Side-by-Side Comparison
| Dimension | Airflow | DataSpoc Pipe |
|---|---|---|
| Files | 4 (docker-compose, requirements, DAG, manual setup) | 2 (YAML + transform) |
| Lines of code | ~200 | ~15 |
| Infrastructure | Postgres + Webserver + Scheduler | None (just the CLI) |
| Monthly cost | $200-500 (minimum viable cluster) | $0 (runs anywhere) |
| Setup time | 2-4 hours | 5 minutes |
| Dependencies | 6 Python packages + Docker | 1 (pip install dataspoc-pipe) |
| Bookmark management | Manual (Airflow Variables) | Automatic (state file in bucket) |
| Pagination | Manual (write the loop yourself) | Declarative (pagination: cursor) |
| Secret management | Airflow UI + Fernet key | Environment variables |
| Monitoring | Airflow UI (requires running cluster) | Log files in bucket + CLI status |
| Adding a new API | Copy 120-line DAG, modify | Copy 10-line YAML, modify |
| Scheduling | DAG schedule_interval | Cron or YAML schedule field (example below) |
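Because the CLI carries its own state, scheduling does not need a resident scheduler at all: a plain crontab entry is enough. A sketch, assuming /etc/dataspoc/pipe.env is where you export the API key (both file paths are illustrative):

```bash
# Crontab entry mirroring the Airflow @hourly schedule
0 * * * * . /etc/dataspoc/pipe.env && dataspoc-pipe run api-payments >> /var/log/dataspoc-pipe.log 2>&1
```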
The Transform Comparison
Airflow makes you write the transform as a Python function inside the DAG, pass data through XCom (JSON serialization), and manage state across tasks.
DataSpoc Pipe transforms are pure functions. Input: DataFrame. Output: DataFrame. No framework awareness needed:
```python
# This is the ENTIRE transform file
def transform(df):
    df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"})
    df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
    df["amount"] = df["amount"].astype(float)
    return df
```

You can test it locally with pandas:
```python
import importlib.util

import pandas as pd

# The hyphen in api-payments.py means it cannot be imported with a plain
# "import" statement, so load the module by file path instead.
spec = importlib.util.spec_from_file_location("api_payments", "transforms/api-payments.py")
api_payments = importlib.util.module_from_spec(spec)
spec.loader.exec_module(api_payments)

test_df = pd.DataFrame([
    {"txn_id": "t1", "amt": "99.99", "ccy": "USD", "ts": "2026-04-15", "cust_id": "c1"},
    {"txn_id": None, "amt": "50.00", "ccy": "EUR", "ts": "2026-04-15", "cust_id": "c2"},
])

result = api_payments.transform(test_df)
assert len(result) == 1  # the row with a null txn_id is filtered out
assert result.iloc[0]["amount"] == 99.99  # amount cast to float
```

Adding a Second Pipeline
Airflow: Copy, Modify, Debug
You copy the 120-line DAG, rename variables, change the API endpoint, adjust the transform logic, add a new connection in the UI, add new variables. Repeat for every source.
DataSpoc Pipe: Copy, Modify, Done
```yaml
pipeline: api-crm

source:
  type: rest_api
  url: "https://api.crm.example.com/v2/contacts"
  auth: bearer ${CRM_API_KEY}
  pagination: offset
  incremental:
    field: updated_at
    strategy: bookmark

target:
  type: s3
  bucket: s3://company-data-lake
  prefix: raw/crm
```

```bash
dataspoc-pipe run api-crm
```

Two minutes, including writing the YAML.
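With both YAML files sitting in pipelines/, you can also run everything at once. The --all flag is the same one the hybrid Airflow DAG later in this post relies on:

```bash
export PAYMENTS_API_KEY=sk_live_****
export CRM_API_KEY=****

# Runs every pipeline defined in pipelines/ (here: api-payments and api-crm)
dataspoc-pipe run --all
```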
When Airflow IS the Right Choice
Let us be fair. Airflow wins when you need:
- Complex DAG dependencies — Task A feeds Task B which feeds Tasks C and D in parallel, with conditional branching and retry logic per task
- Cross-system orchestration — trigger a Spark job, wait for it, then trigger a dbt run, then notify Slack, then trigger a model retrain
- Enterprise governance — audit logs, RBAC, approval workflows, SLA monitoring
- Team of 10+ data engineers — central orchestration platform with shared visibility
- Existing Airflow investment — you already have 200 DAGs running, adding one more is marginal cost
DataSpoc Pipe is for:
- Ingestion — get data from source to Parquet in bucket
- Small to medium teams (1-5 data engineers)
- Speed — prototype to production in minutes, not days
- Simplicity — no infrastructure to maintain
- Cost — $0 instead of $200-500/month for an Airflow cluster
The Hybrid Approach
You can use both. Pipe handles ingestion (the boring, repetitive part). Airflow handles orchestration (the complex, custom part):
```python
# dags/daily_pipeline.py — Airflow DAG that calls Pipe
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "daily_ingestion",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,  # skip backfilling one run per day since start_date
) as dag:
    ingest = BashOperator(
        task_id="run_all_pipes",
        bash_command="dataspoc-pipe run --all",
    )

    dbt_transform = BashOperator(
        task_id="dbt_run",
        bash_command="dbt run --project-dir /opt/dbt",
    )

    notify = BashOperator(
        task_id="slack_notify",
        bash_command='curl -X POST $SLACK_WEBHOOK -d \'{"text": "Pipeline complete"}\'',
    )

    ingest >> dbt_transform >> notify
```

Best of both worlds: Pipe's simplicity for ingestion, Airflow's power for orchestration.
Conclusion
For 90% of data ingestion tasks, you do not need a 200-line DAG and a Kubernetes cluster running Airflow. You need 10 lines of YAML and a pip install.
```bash
pip install dataspoc-pipe
dataspoc-pipe init my-project --bucket s3://my-bucket
dataspoc-pipe add api-payments --source rest_api
dataspoc-pipe run api-payments
```

Four commands. Zero infrastructure. Same result.