crewai · multi-agent · ai-agents · data-engineering · python

Building a Multi-Agent Data Team with CrewAI and DataSpoc

Michael San Martim · 2026-04-25

What if your entire data team could run on autopilot? A Data Engineer agent that monitors pipelines and re-runs the stale ones. A Data Analyst agent that explores fresh data and builds reports. An ML Engineer agent that trains models when new data arrives.

With CrewAI and DataSpoc, this isn't science fiction. It's about 100 lines of Python.

The Architecture

┌─────────────────────────────────────────────────┐
│                   CrewAI Crew                   │
├─────────────────────────────────────────────────┤
│                                                 │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐  │
│  │ DE Agent  │ → │ DA Agent  │ → │ ML Agent  │  │
│  │           │   │           │   │           │  │
│  │ PipeClient│   │ LensClient│   │ LensClient│  │
│  └───────────┘   └───────────┘   └───────────┘  │
│                                                 │
└─────────────────────────────────────────────────┘
         │               │               │
         ▼               ▼               ▼
      ┌─────────────────────────────────────┐
      │         S3 Bucket (Parquet)         │
      └─────────────────────────────────────┘

Each agent uses DataSpoc's Python SDK to talk to the data lake. No shell-outs, no subprocess hacks.

Prerequisites

pip install dataspoc-pipe dataspoc-lens crewai

Make sure you have a DataSpoc project initialized:

dataspoc-pipe init my-lake --bucket s3://company-data-lake
dataspoc-lens add-bucket company s3://company-data-lake
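
With the project initialized, it's worth a quick REPL sanity check before involving any agents. A minimal sketch using the same PipeClient/LensClient calls that the tools below will wrap (the filename is illustrative; the pipeline and table names are the ones from the example run later on):

sanity_check.py
from dataspoc_pipe import PipeClient
from dataspoc_lens import LensClient

# Same clients that tools.py will construct
pipe = PipeClient(project_path="./my-lake")
lens = LensClient(bucket_alias="company")

# Is the orders pipeline stale?
print(pipe.status("postgres-orders").is_stale)

# Can we query the lake?
print(lens.sql("SELECT COUNT(*) FROM raw.postgres.orders").to_markdown())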

Step 1: Define the Tools

CrewAI agents need tools. We wrap the DataSpoc SDK in CrewAI-compatible tools:

tools.py
from crewai.tools import tool
from dataspoc_pipe import PipeClient
from dataspoc_lens import LensClient

pipe = PipeClient(project_path="./my-lake")
lens = LensClient(bucket_alias="company")


@tool("Check Pipeline Status")
def check_pipeline_status(pipeline_name: str) -> str:
    """Check the status of a DataSpoc pipeline. Returns last run time and row count."""
    status = pipe.status(pipeline_name)
    return (
        f"Pipeline: {status.name}\n"
        f"Last run: {status.last_run}\n"
        f"Rows extracted: {status.rows_extracted}\n"
        f"Status: {status.state}\n"
        f"Stale: {status.is_stale}"
    )


@tool("Run Pipeline")
def run_pipeline(pipeline_name: str) -> str:
    """Run a DataSpoc Pipe pipeline to extract fresh data."""
    result = pipe.run(pipeline_name)
    return (
        f"Pipeline '{pipeline_name}' completed.\n"
        f"Rows extracted: {result.rows_extracted}\n"
        f"Files written: {result.files_written}\n"
        f"Duration: {result.duration_seconds}s"
    )


@tool("List Pipelines")
def list_pipelines() -> str:
    """List all available pipelines and their status."""
    pipelines = pipe.list_pipelines()
    lines = []
    for p in pipelines:
        lines.append(f"- {p.name} ({p.source_type}) | Last run: {p.last_run} | Stale: {p.is_stale}")
    return "\n".join(lines)


@tool("Run SQL Query")
def run_sql_query(query: str) -> str:
    """Run a SQL query against the data lake using DuckDB via DataSpoc Lens."""
    result = lens.sql(query)
    return result.to_markdown(max_rows=20)


@tool("List Tables")
def list_tables() -> str:
    """List all tables available in the data lake."""
    tables = lens.tables()
    lines = []
    for t in tables:
        lines.append(f"- {t.layer}/{t.source}/{t.name} ({t.row_count} rows)")
    return "\n".join(lines)


@tool("AI Ask")
def ai_ask(question: str) -> str:
    """Ask a natural language question about the data lake. Returns SQL + results."""
    answer = lens.ask(question)
    return f"SQL:\n{answer.sql}\n\nResult:\n{answer.result.to_markdown()}"


@tool("Train Model")
def train_model(table: str, target_column: str, model_name: str) -> str:
    """Train an ML model on a table from the data lake."""
    result = lens.ml_train(
        table=table,
        target=target_column,
        model_name=model_name,
    )
    return (
        f"Model '{model_name}' trained.\n"
        f"Algorithm: {result.algorithm}\n"
        f"Metric (R2/Accuracy): {result.score:.4f}\n"
        f"Features used: {', '.join(result.features)}"
    )


@tool("Evaluate Model")
def evaluate_model(model_name: str) -> str:
    """Evaluate a trained ML model and return metrics."""
    metrics = lens.ml_metrics(model_name)
    lines = [f"Model: {model_name}"]
    for k, v in metrics.items():
        lines.append(f"  {k}: {v:.4f}" if isinstance(v, float) else f"  {k}: {v}")
    return "\n".join(lines)

Step 2: Define the Agents

Each agent has a clear role, a goal, and a set of tools:

agents.py
from crewai import Agent

from tools import (
    check_pipeline_status,
    run_pipeline,
    list_pipelines,
    run_sql_query,
    list_tables,
    ai_ask,
    train_model,
    evaluate_model,
)

de_agent = Agent(
    role="Data Engineer",
    goal="Ensure all data pipelines are fresh. Run any stale pipelines.",
    backstory=(
        "You are a senior Data Engineer responsible for data freshness. "
        "You check pipeline status and re-run any pipeline that is stale. "
        "You report what was refreshed and any errors."
    ),
    tools=[list_pipelines, check_pipeline_status, run_pipeline],
    verbose=True,
)

da_agent = Agent(
    role="Data Analyst",
    goal="Explore fresh data and produce a summary report with key metrics.",
    backstory=(
        "You are a Data Analyst. After data is refreshed, you explore the tables, "
        "run SQL queries to compute key business metrics, and produce a brief report. "
        "You focus on revenue, customer counts, and trends."
    ),
    tools=[list_tables, run_sql_query, ai_ask],
    verbose=True,
)

ml_agent = Agent(
    role="ML Engineer",
    goal="Train predictive models on fresh data and report model quality.",
    backstory=(
        "You are an ML Engineer. When data is fresh, you train models to predict "
        "business outcomes. You evaluate model quality and report the results. "
        "You only train when data is available and fresh."
    ),
    tools=[list_tables, run_sql_query, train_model, evaluate_model],
    verbose=True,
)
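
If you want to try one agent in isolation before assembling the full team, a single-agent crew with one task is enough. A minimal sketch using the same CrewAI primitives as Step 3:

from crewai import Crew, Process, Task

from agents import de_agent

# One agent, one task: just check the pipelines
solo_task = Task(
    description="List all pipelines and report which ones are stale.",
    expected_output="A list of pipelines with their staleness status.",
    agent=de_agent,
)

solo_crew = Crew(agents=[de_agent], tasks=[solo_task], process=Process.sequential)
print(solo_crew.kickoff())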

Step 3: Define the Tasks and the Crew

crew.py
from crewai import Crew, Task, Process

from agents import de_agent, da_agent, ml_agent

# Task 1: Data Engineer refreshes pipelines
refresh_task = Task(
    description=(
        "Check all pipelines. For each stale pipeline, run it to get fresh data. "
        "Report which pipelines were refreshed and how many rows were extracted."
    ),
    expected_output="A list of pipelines that were refreshed with row counts.",
    agent=de_agent,
)

# Task 2: Data Analyst explores the data
analysis_task = Task(
    description=(
        "List available tables. Run SQL queries to compute: "
        "1) Total revenue this month, "
        "2) Number of active customers, "
        "3) Top 5 products by revenue. "
        "Format results as a markdown report."
    ),
    expected_output="A markdown report with revenue, customer count, and top products.",
    agent=da_agent,
)

# Task 3: ML Engineer trains a churn model
ml_task = Task(
    description=(
        "Check if the 'curated/sales/customers' table exists. If it does, "
        "train a classification model to predict customer churn (target column: 'churned'). "
        "Name the model 'churn-predictor'. Report the accuracy."
    ),
    expected_output="Model training report with accuracy score and features used.",
    agent=ml_agent,
)

# Assemble the crew
data_team = Crew(
    agents=[de_agent, da_agent, ml_agent],
    tasks=[refresh_task, analysis_task, ml_task],
    process=Process.sequential,  # DE first, then DA, then ML
    verbose=True,
)

Step 4: Run the Crew

main.py
from crew import data_team


def main():
    print("=== DataSpoc Multi-Agent Data Team ===\n")
    result = data_team.kickoff()
    print("\n=== Final Report ===")
    print(result)


if __name__ == "__main__":
    main()

Run it:

python main.py

Example Run Output

=== DataSpoc Multi-Agent Data Team ===
[DE Agent] Checking all pipelines...
[DE Agent] Using tool: List Pipelines
- postgres-orders (postgres) | Last run: 2026-04-14 18:00 | Stale: True
- sheets-marketing (google_sheets) | Last run: 2026-04-15 09:00 | Stale: False
- api-payments (rest_api) | Last run: 2026-04-14 12:00 | Stale: True
[DE Agent] Running stale pipeline: postgres-orders
[DE Agent] Using tool: Run Pipeline
Pipeline 'postgres-orders' completed. Rows extracted: 1,247. Duration: 8s
[DE Agent] Running stale pipeline: api-payments
[DE Agent] Using tool: Run Pipeline
Pipeline 'api-payments' completed. Rows extracted: 356. Duration: 3s
[DA Agent] Listing available tables...
[DA Agent] Using tool: List Tables
- raw/postgres/orders (45,892 rows)
- raw/postgres/customers (3,201 rows)
- curated/sales/customers (3,201 rows)
- raw/api/payments (12,445 rows)
[DA Agent] Using tool: Run SQL Query
SELECT SUM(amount) as revenue FROM raw.postgres.orders
WHERE created_at >= '2026-04-01'
→ Revenue: $284,521.00
[DA Agent] Using tool: Run SQL Query
SELECT COUNT(DISTINCT customer_id) as active_customers
FROM raw.postgres.orders WHERE created_at >= '2026-03-15'
→ Active customers: 1,847
[ML Agent] Using tool: Train Model
Training on curated/sales/customers, target=churned
Model 'churn-predictor' trained. Algorithm: LightGBM. Accuracy: 0.8734
=== Final Report ===
## Pipeline Refresh
- postgres-orders: 1,247 new rows
- api-payments: 356 new rows
## Business Metrics (April 2026)
- Revenue: $284,521
- Active customers: 1,847
- Top product: Enterprise Plan ($98,200)
## ML Model
- churn-predictor: 87.3% accuracy
- Key features: days_since_last_order, total_spend, support_tickets

Running on a Schedule

Wrap it in a simple cron job or use any scheduler:

# Run every morning at 7am
0 7 * * * cd /opt/data-team && python main.py >> /var/log/data-team.log 2>&1

Or keep the scheduling in Python. CrewAI itself does not ship a cron scheduler, so a lightweight library such as schedule does the job:

scheduled.py
import time

import schedule  # pip install schedule

from crew import data_team

# Kick off the full crew every 6 hours
schedule.every(6).hours.do(data_team.kickoff)

while True:
    schedule.run_pending()
    time.sleep(60)

Adding Slack Notifications

Add a notification tool so the DA agent can post results to Slack:

import os

import requests
from crewai.tools import tool


@tool("Post to Slack")
def post_to_slack(message: str) -> str:
    """Post a message to the #data-reports Slack channel."""
    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    requests.post(webhook_url, json={"text": message})
    return "Posted to Slack successfully."

Then add it to the DA agent's tool list and update the task to include “Post the report to Slack.”
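
A minimal sketch of both changes, reusing the fields from agents.py and crew.py (the slack_tool module name and the shortened backstory are illustrative):

from crewai import Agent, Task

from tools import list_tables, run_sql_query, ai_ask
from slack_tool import post_to_slack  # wherever you saved the tool above

da_agent = Agent(
    role="Data Analyst",
    goal="Explore fresh data and produce a summary report with key metrics.",
    backstory="You are a Data Analyst. You post your final report to Slack.",
    tools=[list_tables, run_sql_query, ai_ask, post_to_slack],  # new tool added
    verbose=True,
)

analysis_task = Task(
    description=(
        "List available tables, compute the key business metrics, "
        "format them as a markdown report, and post the report to Slack."
    ),
    expected_output="A markdown report, also posted to #data-reports.",
    agent=da_agent,
)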

Key Takeaways

  1. CrewAI handles the orchestration: you define agents, roles, and tasks
  2. DataSpoc handles the data: pipelines, queries, and ML via the Python SDK
  3. The sequential process guarantees the DE runs before the DA, and the DA before the ML
  4. Each agent stays focused: few tools, a clear goal, a specific backstory
  5. The bucket is the contract: agents communicate through data, not direct calls

This is a starting point. You can add more agents (a QA agent that validates data quality, an Ops agent that monitors costs) or switch to Process.hierarchical for a manager agent that delegates dynamically, as sketched below.
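
For the hierarchical variant, a sketch of the manager setup (the manager_llm model name is illustrative; CrewAI requires a manager LLM or manager agent for this process):

from crewai import Crew, Process

from agents import de_agent, da_agent, ml_agent
from crew import refresh_task, analysis_task, ml_task

# A manager LLM plans the work and delegates tasks instead of a fixed order
managed_team = Crew(
    agents=[de_agent, da_agent, ml_agent],
    tasks=[refresh_task, analysis_task, ml_task],
    process=Process.hierarchical,
    manager_llm="gpt-4o",  # illustrative; any supported model works
    verbose=True,
)
print(managed_team.kickoff())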
