airflowetlyamldata-engineeringcomparison

10 Linhas de YAML vs 200 Linhas de DAG Airflow

Michael San Martim · 2026-04-28

Vamos construir o mesmo pipeline ETL duas vezes. Uma com Apache Airflow. Uma com DataSpoc Pipe. Mesma entrada, mesma saída, mesmo resultado. Você decide qual prefere manter às 2h da manhã quando o pager de plantão toca.

O Caso de Uso

Extrair dados de uma API REST (um serviço de pagamentos), transformar (renomear colunas, filtrar nulos, converter tipos) e carregar no S3 como Parquet.

Source: https://api.payments.example.com/v1/transactions Target: s3://company-data-lake/raw/payments/transactions/ Schedule: A cada hora Incremental: Apenas buscar transações desde a última execução

A Implementação Airflow

Arquivo 1: docker-compose.yaml (45 linhas)

docker-compose.yaml
version: "3.8"
services:
postgres:
image: postgres:15
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-data:/var/lib/postgresql/data
webserver:
image: apache/airflow:2.8.0
command: webserver
ports:
- "8080:8080"
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here"
volumes:
- ./dags:/opt/airflow/dags
- ./plugins:/opt/airflow/plugins
depends_on:
- postgres
scheduler:
image: apache/airflow:2.8.0
command: scheduler
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here"
volumes:
- ./dags:/opt/airflow/dags
- ./plugins:/opt/airflow/plugins
depends_on:
- postgres
- webserver
volumes:
postgres-data:

Arquivo 2: requirements.txt (6 linhas)

apache-airflow==2.8.0
apache-airflow-providers-amazon==8.0.0
requests==2.31.0
pyarrow==14.0.0
pandas==2.1.0
boto3==1.34.0

Arquivo 3: dags/payments_etl.py (120 linhas)

dags/payments_etl.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.models import Variable
from datetime import datetime, timedelta
import requests
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import json
import io
default_args = {
"owner": "data-engineering",
"depends_on_past": True,
"email_on_failure": True,
"email": ["oncall@company.com"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
def extract_payments(**context):
"""Extract transactions from payments API."""
api_key = Variable.get("payments_api_key")
base_url = "https://api.payments.example.com/v1/transactions"
# Get bookmark (last successful extraction timestamp)
try:
bookmark = Variable.get("payments_last_timestamp")
except KeyError:
bookmark = "2024-01-01T00:00:00Z"
headers = {"Authorization": f"Bearer {api_key}"}
params = {
"created_after": bookmark,
"limit": 1000,
}
all_records = []
page = 1
while True:
params["page"] = page
response = requests.get(base_url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
records = data.get("transactions", [])
if not records:
break
all_records.extend(records)
page += 1
if not data.get("has_more", False):
break
if not all_records:
return "No new data"
# Push to XCom for next task
context["ti"].xcom_push(key="raw_data", value=json.dumps(all_records))
context["ti"].xcom_push(key="record_count", value=len(all_records))
return f"Extracted {len(all_records)} records"
def transform_payments(**context):
"""Transform: rename columns, filter nulls, cast types."""
raw_json = context["ti"].xcom_pull(key="raw_data", task_ids="extract")
records = json.loads(raw_json)
df = pd.DataFrame(records)
# Rename columns
df = df.rename(columns={
"txn_id": "transaction_id",
"amt": "amount",
"ccy": "currency",
"ts": "created_at",
"cust_id": "customer_id",
})
# Filter nulls
df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
# Cast types
df["amount"] = df["amount"].astype(float)
df["created_at"] = pd.to_datetime(df["created_at"])
df["customer_id"] = df["customer_id"].astype(str)
context["ti"].xcom_push(key="transformed_data", value=df.to_json())
return f"Transformed {len(df)} records"
def load_to_s3(**context):
"""Load transformed data to S3 as Parquet."""
df_json = context["ti"].xcom_pull(key="transformed_data", task_ids="transform")
df = pd.read_json(df_json)
table = pa.Table.from_pandas(df)
buf = io.BytesIO()
pq.write_table(table, buf)
buf.seek(0)
execution_date = context["execution_date"].strftime("%Y%m%d_%H%M%S")
key = f"raw/payments/transactions/{execution_date}.parquet"
s3_hook = S3Hook(aws_conn_id="s3_data_lake")
s3_hook.load_bytes(
bytes_data=buf.getvalue(),
key=key,
bucket_name="company-data-lake",
replace=True,
)
# Update bookmark
max_timestamp = df["created_at"].max().isoformat()
Variable.set("payments_last_timestamp", max_timestamp)
return f"Loaded {len(df)} rows to s3://{key}"
with DAG(
"payments_etl",
default_args=default_args,
schedule_interval="@hourly",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["etl", "payments"],
) as dag:
extract = PythonOperator(
task_id="extract",
python_callable=extract_payments,
)
transform = PythonOperator(
task_id="transform",
python_callable=transform_payments,
)
load = PythonOperator(
task_id="load",
python_callable=load_to_s3,
)
extract >> transform >> load

Arquivo 4: Setup UI do Airflow (passos manuais)

1. Open http://localhost:8080
2. Admin → Connections → Add:
- Conn ID: s3_data_lake
- Conn Type: Amazon Web Services
- AWS Access Key ID: ****
- AWS Secret Access Key: ****
3. Admin → Variables → Add:
- Key: payments_api_key
- Value: sk_live_****
4. Admin → Variables → Add:
- Key: payments_last_timestamp
- Value: 2024-01-01T00:00:00Z

Total: ~200 linhas em 4 arquivos, mais configuração manual na UI.


A Implementação DataSpoc Pipe

Arquivo 1: pipelines/api-payments.yaml (10 linhas)

pipeline: api-payments
source:
type: rest_api
url: "https://api.payments.example.com/v1/transactions"
auth: bearer ${PAYMENTS_API_KEY}
pagination: cursor
incremental:
field: created_at
strategy: bookmark
target:
type: s3
bucket: s3://company-data-lake
prefix: raw/payments

Arquivo 2: transforms/api-payments.py (5 linhas)

def transform(df):
df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"})
df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
df["amount"] = df["amount"].astype(float)
return df

Execute

Terminal window
export PAYMENTS_API_KEY=sk_live_****
dataspoc-pipe run api-payments

Saída:

[api-payments] Extracting from REST API...
[api-payments] Page 1: 1000 records
[api-payments] Page 2: 847 records
[api-payments] Applying transform: transforms/api-payments.py
[api-payments] Filtered: 1847 → 1823 records (24 nulls removed)
[api-payments] Writing: s3://company-data-lake/raw/payments/transactions/20260415_100000.parquet
[api-payments] Bookmark saved: 2026-04-15T09:58:42Z
[api-payments] Done. 1,823 rows, 4.1s

Total: 15 linhas em 2 arquivos. Sem infraestrutura.


Comparação Lado a Lado

DimensãoAirflowDataSpoc Pipe
Arquivos4 (docker-compose, requirements, DAG, setup manual)2 (YAML + transform)
Linhas de código~200~15
InfraestruturaPostgres + Webserver + SchedulerNenhuma (apenas o CLI)
Custo mensal$200-500 (cluster mínimo viável)$0 (roda em qualquer lugar)
Tempo de setup2-4 horas5 minutos
Dependências6 pacotes Python + Docker1 (pip install dataspoc-pipe)
Gestão de bookmarkManual (Airflow Variables)Automático (arquivo de estado no bucket)
PaginaçãoManual (escrever o loop você mesmo)Declarativo (pagination: cursor)
Gestão de secretsAirflow UI + chave FernetVariáveis de ambiente
MonitoramentoAirflow UI (requer cluster rodando)Arquivos de log no bucket + CLI status
Adicionar nova APICopiar DAG de 120 linhas, modificarCopiar YAML de 10 linhas, modificar
AgendamentoDAG schedule_intervalCron ou campo schedule no YAML

A Comparação de Transform

Airflow faz você escrever o transform como uma função Python dentro da DAG, passar dados pelo XCom (serialização JSON) e gerenciar estado entre tarefas.

Transforms do DataSpoc Pipe são funções puras. Entrada: DataFrame. Saída: DataFrame. Sem necessidade de conhecimento do framework:

# This is the ENTIRE transform file
def transform(df):
df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"})
df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
df["amount"] = df["amount"].astype(float)
return df

Você pode testá-lo localmente com pandas:

import pandas as pd
from transforms.api_payments import transform
test_df = pd.DataFrame([
{"txn_id": "t1", "amt": "99.99", "ccy": "USD", "ts": "2026-04-15", "cust_id": "c1"},
{"txn_id": None, "amt": "50.00", "ccy": "EUR", "ts": "2026-04-15", "cust_id": "c2"},
])
result = transform(test_df)
assert len(result) == 1 # null txn_id filtered out
assert result.iloc[0]["amount"] == 99.99 # cast to float

Adicionando um Segundo Pipeline

Airflow: Copiar, Modificar, Depurar

Você copia a DAG de 120 linhas, renomeia variáveis, muda o endpoint da API, ajusta a lógica de transform, adiciona uma nova conexão na UI, adiciona novas variáveis. Repita para cada fonte.

DataSpoc Pipe: Copiar, Modificar, Pronto

pipelines/api-crm.yaml
pipeline: api-crm
source:
type: rest_api
url: "https://api.crm.example.com/v2/contacts"
auth: bearer ${CRM_API_KEY}
pagination: offset
incremental:
field: updated_at
strategy: bookmark
target:
type: s3
bucket: s3://company-data-lake
prefix: raw/crm
Terminal window
dataspoc-pipe run api-crm

Dois minutos, incluindo escrever o YAML.

Quando Airflow É a Escolha Certa

Sejamos justos. Airflow ganha quando você precisa de:

  • Dependências complexas de DAG — Tarefa A alimenta Tarefa B que alimenta Tarefas C e D em paralelo, com branching condicional e lógica de retry por tarefa
  • Orquestração cross-system — disparar um job Spark, esperar, então disparar um dbt run, então notificar Slack, então disparar retrain de modelo
  • Governança enterprise — logs de auditoria, RBAC, workflows de aprovação, monitoramento de SLA
  • Time de 10+ engenheiros de dados — plataforma de orquestração central com visibilidade compartilhada
  • Investimento existente em Airflow — você já tem 200 DAGs rodando, adicionar mais uma é custo marginal

DataSpoc Pipe é para:

  • Ingestão — levar dados da fonte para Parquet no bucket
  • Times pequenos a médios (1-5 engenheiros de dados)
  • Velocidade — protótipo a produção em minutos, não dias
  • Simplicidade — sem infraestrutura para manter
  • Custo — $0 em vez de $200-500/mês para um cluster Airflow

A Abordagem Híbrida

Você pode usar ambos. Pipe cuida da ingestão (a parte chata e repetitiva). Airflow cuida da orquestração (a parte complexa e customizada):

# dags/daily_pipeline.py — Airflow DAG that calls Pipe
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG("daily_ingestion", schedule_interval="@daily", start_date=datetime(2024, 1, 1)) as dag:
ingest = BashOperator(
task_id="run_all_pipes",
bash_command="dataspoc-pipe run --all",
)
dbt_transform = BashOperator(
task_id="dbt_run",
bash_command="dbt run --project-dir /opt/dbt",
)
notify = BashOperator(
task_id="slack_notify",
bash_command='curl -X POST $SLACK_WEBHOOK -d \'{"text": "Pipeline complete"}\'',
)
ingest >> dbt_transform >> notify

O melhor dos dois mundos: simplicidade do Pipe para ingestão, poder do Airflow para orquestração.

Conclusão

Para 90% das tarefas de ingestão de dados, você não precisa de uma DAG de 200 linhas e um cluster Kubernetes rodando Airflow. Você precisa de 10 linhas de YAML e um pip install.

Terminal window
pip install dataspoc-pipe
dataspoc-pipe init my-project --bucket s3://my-bucket
dataspoc-pipe add api-payments --source rest_api
dataspoc-pipe run api-payments

Quatro comandos. Zero infraestrutura. Mesmo resultado.

Recomendados