10 Linhas de YAML vs 200 Linhas de DAG Airflow
Vamos construir o mesmo pipeline ETL duas vezes. Uma com Apache Airflow. Uma com DataSpoc Pipe. Mesma entrada, mesma saída, mesmo resultado. Você decide qual prefere manter às 2h da manhã quando o pager de plantão toca.
O Caso de Uso
Extrair dados de uma API REST (um serviço de pagamentos), transformar (renomear colunas, filtrar nulos, converter tipos) e carregar no S3 como Parquet.
Source: https://api.payments.example.com/v1/transactions
Target: s3://company-data-lake/raw/payments/transactions/
Schedule: A cada hora
Incremental: Apenas buscar transações desde a última execução
A Implementação Airflow
Arquivo 1: docker-compose.yaml (45 linhas)
version: "3.8"services: postgres: image: postgres:15 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres-data:/var/lib/postgresql/data
webserver: image: apache/airflow:2.8.0 command: webserver ports: - "8080:8080" environment: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here" volumes: - ./dags:/opt/airflow/dags - ./plugins:/opt/airflow/plugins depends_on: - postgres
scheduler: image: apache/airflow:2.8.0 command: scheduler environment: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CORE__FERNET_KEY: "your-fernet-key-here" volumes: - ./dags:/opt/airflow/dags - ./plugins:/opt/airflow/plugins depends_on: - postgres - webserver
volumes: postgres-data:Arquivo 2: requirements.txt (6 linhas)
apache-airflow==2.8.0apache-airflow-providers-amazon==8.0.0requests==2.31.0pyarrow==14.0.0pandas==2.1.0boto3==1.34.0Arquivo 3: dags/payments_etl.py (120 linhas)
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.providers.amazon.aws.hooks.s3 import S3Hookfrom airflow.models import Variablefrom datetime import datetime, timedeltaimport requestsimport pandas as pdimport pyarrow as paimport pyarrow.parquet as pqimport jsonimport io
default_args = { "owner": "data-engineering", "depends_on_past": True, "email_on_failure": True, "email": ["oncall@company.com"], "retries": 3, "retry_delay": timedelta(minutes=5),}
def extract_payments(**context): """Extract transactions from payments API.""" api_key = Variable.get("payments_api_key") base_url = "https://api.payments.example.com/v1/transactions"
# Get bookmark (last successful extraction timestamp) try: bookmark = Variable.get("payments_last_timestamp") except KeyError: bookmark = "2024-01-01T00:00:00Z"
headers = {"Authorization": f"Bearer {api_key}"} params = { "created_after": bookmark, "limit": 1000, }
all_records = [] page = 1
while True: params["page"] = page response = requests.get(base_url, headers=headers, params=params) response.raise_for_status() data = response.json()
records = data.get("transactions", []) if not records: break
all_records.extend(records) page += 1
if not data.get("has_more", False): break
if not all_records: return "No new data"
# Push to XCom for next task context["ti"].xcom_push(key="raw_data", value=json.dumps(all_records)) context["ti"].xcom_push(key="record_count", value=len(all_records))
return f"Extracted {len(all_records)} records"
def transform_payments(**context): """Transform: rename columns, filter nulls, cast types.""" raw_json = context["ti"].xcom_pull(key="raw_data", task_ids="extract") records = json.loads(raw_json)
df = pd.DataFrame(records)
# Rename columns df = df.rename(columns={ "txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id", })
# Filter nulls df = df.dropna(subset=["transaction_id", "amount", "customer_id"])
# Cast types df["amount"] = df["amount"].astype(float) df["created_at"] = pd.to_datetime(df["created_at"]) df["customer_id"] = df["customer_id"].astype(str)
context["ti"].xcom_push(key="transformed_data", value=df.to_json()) return f"Transformed {len(df)} records"
def load_to_s3(**context): """Load transformed data to S3 as Parquet.""" df_json = context["ti"].xcom_pull(key="transformed_data", task_ids="transform") df = pd.read_json(df_json)
table = pa.Table.from_pandas(df) buf = io.BytesIO() pq.write_table(table, buf) buf.seek(0)
execution_date = context["execution_date"].strftime("%Y%m%d_%H%M%S") key = f"raw/payments/transactions/{execution_date}.parquet"
s3_hook = S3Hook(aws_conn_id="s3_data_lake") s3_hook.load_bytes( bytes_data=buf.getvalue(), key=key, bucket_name="company-data-lake", replace=True, )
# Update bookmark max_timestamp = df["created_at"].max().isoformat() Variable.set("payments_last_timestamp", max_timestamp)
return f"Loaded {len(df)} rows to s3://{key}"
with DAG( "payments_etl", default_args=default_args, schedule_interval="@hourly", start_date=datetime(2024, 1, 1), catchup=False, tags=["etl", "payments"],) as dag:
extract = PythonOperator( task_id="extract", python_callable=extract_payments, )
transform = PythonOperator( task_id="transform", python_callable=transform_payments, )
load = PythonOperator( task_id="load", python_callable=load_to_s3, )
extract >> transform >> loadArquivo 4: Setup UI do Airflow (passos manuais)
1. Open http://localhost:80802. Admin → Connections → Add: - Conn ID: s3_data_lake - Conn Type: Amazon Web Services - AWS Access Key ID: **** - AWS Secret Access Key: ****
3. Admin → Variables → Add: - Key: payments_api_key - Value: sk_live_****
4. Admin → Variables → Add: - Key: payments_last_timestamp - Value: 2024-01-01T00:00:00ZTotal: ~200 linhas em 4 arquivos, mais configuração manual na UI.
A Implementação DataSpoc Pipe
Arquivo 1: pipelines/api-payments.yaml (10 linhas)
pipeline: api-paymentssource: type: rest_api url: "https://api.payments.example.com/v1/transactions" auth: bearer ${PAYMENTS_API_KEY} pagination: cursor incremental: field: created_at strategy: bookmark
target: type: s3 bucket: s3://company-data-lake prefix: raw/paymentsArquivo 2: transforms/api-payments.py (5 linhas)
def transform(df): df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"}) df = df.dropna(subset=["transaction_id", "amount", "customer_id"]) df["amount"] = df["amount"].astype(float) return dfExecute
export PAYMENTS_API_KEY=sk_live_****dataspoc-pipe run api-paymentsSaída:
[api-payments] Extracting from REST API...[api-payments] Page 1: 1000 records[api-payments] Page 2: 847 records[api-payments] Applying transform: transforms/api-payments.py[api-payments] Filtered: 1847 → 1823 records (24 nulls removed)[api-payments] Writing: s3://company-data-lake/raw/payments/transactions/20260415_100000.parquet[api-payments] Bookmark saved: 2026-04-15T09:58:42Z[api-payments] Done. 1,823 rows, 4.1sTotal: 15 linhas em 2 arquivos. Sem infraestrutura.
Comparação Lado a Lado
| Dimensão | Airflow | DataSpoc Pipe |
|---|---|---|
| Arquivos | 4 (docker-compose, requirements, DAG, setup manual) | 2 (YAML + transform) |
| Linhas de código | ~200 | ~15 |
| Infraestrutura | Postgres + Webserver + Scheduler | Nenhuma (apenas o CLI) |
| Custo mensal | $200-500 (cluster mínimo viável) | $0 (roda em qualquer lugar) |
| Tempo de setup | 2-4 horas | 5 minutos |
| Dependências | 6 pacotes Python + Docker | 1 (pip install dataspoc-pipe) |
| Gestão de bookmark | Manual (Airflow Variables) | Automático (arquivo de estado no bucket) |
| Paginação | Manual (escrever o loop você mesmo) | Declarativo (pagination: cursor) |
| Gestão de secrets | Airflow UI + chave Fernet | Variáveis de ambiente |
| Monitoramento | Airflow UI (requer cluster rodando) | Arquivos de log no bucket + CLI status |
| Adicionar nova API | Copiar DAG de 120 linhas, modificar | Copiar YAML de 10 linhas, modificar |
| Agendamento | DAG schedule_interval | Cron ou campo schedule no YAML |
A Comparação de Transform
Airflow faz você escrever o transform como uma função Python dentro da DAG, passar dados pelo XCom (serialização JSON) e gerenciar estado entre tarefas.
Transforms do DataSpoc Pipe são funções puras. Entrada: DataFrame. Saída: DataFrame. Sem necessidade de conhecimento do framework:
# This is the ENTIRE transform filedef transform(df): df = df.rename(columns={"txn_id": "transaction_id", "amt": "amount", "ccy": "currency", "ts": "created_at", "cust_id": "customer_id"}) df = df.dropna(subset=["transaction_id", "amount", "customer_id"]) df["amount"] = df["amount"].astype(float) return dfVocê pode testá-lo localmente com pandas:
import pandas as pdfrom transforms.api_payments import transform
test_df = pd.DataFrame([ {"txn_id": "t1", "amt": "99.99", "ccy": "USD", "ts": "2026-04-15", "cust_id": "c1"}, {"txn_id": None, "amt": "50.00", "ccy": "EUR", "ts": "2026-04-15", "cust_id": "c2"},])
result = transform(test_df)assert len(result) == 1 # null txn_id filtered outassert result.iloc[0]["amount"] == 99.99 # cast to floatAdicionando um Segundo Pipeline
Airflow: Copiar, Modificar, Depurar
Você copia a DAG de 120 linhas, renomeia variáveis, muda o endpoint da API, ajusta a lógica de transform, adiciona uma nova conexão na UI, adiciona novas variáveis. Repita para cada fonte.
DataSpoc Pipe: Copiar, Modificar, Pronto
pipeline: api-crmsource: type: rest_api url: "https://api.crm.example.com/v2/contacts" auth: bearer ${CRM_API_KEY} pagination: offset incremental: field: updated_at strategy: bookmark
target: type: s3 bucket: s3://company-data-lake prefix: raw/crmdataspoc-pipe run api-crmDois minutos, incluindo escrever o YAML.
Quando Airflow É a Escolha Certa
Sejamos justos. Airflow ganha quando você precisa de:
- Dependências complexas de DAG — Tarefa A alimenta Tarefa B que alimenta Tarefas C e D em paralelo, com branching condicional e lógica de retry por tarefa
- Orquestração cross-system — disparar um job Spark, esperar, então disparar um dbt run, então notificar Slack, então disparar retrain de modelo
- Governança enterprise — logs de auditoria, RBAC, workflows de aprovação, monitoramento de SLA
- Time de 10+ engenheiros de dados — plataforma de orquestração central com visibilidade compartilhada
- Investimento existente em Airflow — você já tem 200 DAGs rodando, adicionar mais uma é custo marginal
DataSpoc Pipe é para:
- Ingestão — levar dados da fonte para Parquet no bucket
- Times pequenos a médios (1-5 engenheiros de dados)
- Velocidade — protótipo a produção em minutos, não dias
- Simplicidade — sem infraestrutura para manter
- Custo — $0 em vez de $200-500/mês para um cluster Airflow
A Abordagem Híbrida
Você pode usar ambos. Pipe cuida da ingestão (a parte chata e repetitiva). Airflow cuida da orquestração (a parte complexa e customizada):
# dags/daily_pipeline.py — Airflow DAG that calls Pipefrom airflow import DAGfrom airflow.operators.bash import BashOperatorfrom datetime import datetime
with DAG("daily_ingestion", schedule_interval="@daily", start_date=datetime(2024, 1, 1)) as dag:
ingest = BashOperator( task_id="run_all_pipes", bash_command="dataspoc-pipe run --all", )
dbt_transform = BashOperator( task_id="dbt_run", bash_command="dbt run --project-dir /opt/dbt", )
notify = BashOperator( task_id="slack_notify", bash_command='curl -X POST $SLACK_WEBHOOK -d \'{"text": "Pipeline complete"}\'', )
ingest >> dbt_transform >> notifyO melhor dos dois mundos: simplicidade do Pipe para ingestão, poder do Airflow para orquestração.
Conclusão
Para 90% das tarefas de ingestão de dados, você não precisa de uma DAG de 200 linhas e um cluster Kubernetes rodando Airflow. Você precisa de 10 linhas de YAML e um pip install.
pip install dataspoc-pipedataspoc-pipe init my-project --bucket s3://my-bucketdataspoc-pipe add api-payments --source rest_apidataspoc-pipe run api-paymentsQuatro comandos. Zero infraestrutura. Mesmo resultado.