Building a Multi-Agent Data Team with CrewAI and DataSpoc
What if your entire data team could run on autopilot? A Data Engineer agent that monitors pipelines and re-runs the stale ones. A Data Analyst agent that explores fresh data and builds reports. An ML Engineer agent that trains models when new data arrives.

With CrewAI and DataSpoc, that's not science fiction. It's about 100 lines of Python.
The Architecture
```
┌─────────────────────────────────────────────────┐
│                   CrewAI Crew                   │
├─────────────────────────────────────────────────┤
│                                                 │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐  │
│  │ DE Agent  │ → │ DA Agent  │ → │ ML Agent  │  │
│  │           │   │           │   │           │  │
│  │ PipeClient│   │ LensClient│   │ LensClient│  │
│  └───────────┘   └───────────┘   └───────────┘  │
│        │               │               │        │
└────────┼───────────────┼───────────────┼────────┘
         ▼               ▼               ▼
    ┌─────────────────────────────────────┐
    │         S3 Bucket (Parquet)         │
    └─────────────────────────────────────┘
```

Each agent uses the DataSpoc Python SDK to interact with the data lake. No shell-outs, no subprocess hacks.
Prerequisites
```bash
pip install dataspoc-pipe dataspoc-lens crewai
```

Make sure you have a DataSpoc project initialized:
```bash
dataspoc-pipe init my-lake --bucket s3://company-data-lake
dataspoc-lens add-bucket company s3://company-data-lake
```

Step 1: Define the Tools
CrewAI agents need tools. We wrap the DataSpoc SDK in CrewAI-compatible tools:
```python
from crewai.tools import tool
from dataspoc_pipe import PipeClient
from dataspoc_lens import LensClient

pipe = PipeClient(project_path="./my-lake")
lens = LensClient(bucket_alias="company")
```
```python
@tool("Check Pipeline Status")
def check_pipeline_status(pipeline_name: str) -> str:
    """Check the status of a DataSpoc pipeline. Returns last run time and row count."""
    status = pipe.status(pipeline_name)
    return (
        f"Pipeline: {status.name}\n"
        f"Last run: {status.last_run}\n"
        f"Rows extracted: {status.rows_extracted}\n"
        f"Status: {status.state}\n"
        f"Stale: {status.is_stale}"
    )
```
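Each of these wrappers is a thin formatting layer around an SDK call, which makes them easy to sanity-check without a live lake. A minimal sketch, using a stub object in place of the `pipe.status(...)` result (the `SimpleNamespace` stand-in and its field values are illustrative, not part of the SDK):

```python
from types import SimpleNamespace

def format_status(status) -> str:
    # Same formatting as check_pipeline_status, extracted for testing.
    return (
        f"Pipeline: {status.name}\n"
        f"Last run: {status.last_run}\n"
        f"Rows extracted: {status.rows_extracted}\n"
        f"Status: {status.state}\n"
        f"Stale: {status.is_stale}"
    )

# Hypothetical stub standing in for a pipe.status(...) result.
stub = SimpleNamespace(
    name="postgres-orders",
    last_run="2026-04-14 18:00",
    rows_extracted=1247,
    state="success",
    is_stale=True,
)

print(format_status(stub))  # → "Pipeline: postgres-orders\n..."
```

Keeping formatting logic separable like this also makes it easier to tweak what the LLM sees without touching the SDK calls.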
```python
@tool("Run Pipeline")
def run_pipeline(pipeline_name: str) -> str:
    """Run a DataSpoc Pipe pipeline to extract fresh data."""
    result = pipe.run(pipeline_name)
    return (
        f"Pipeline '{pipeline_name}' completed.\n"
        f"Rows extracted: {result.rows_extracted}\n"
        f"Files written: {result.files_written}\n"
        f"Duration: {result.duration_seconds}s"
    )

@tool("List Pipelines")
def list_pipelines() -> str:
    """List all available pipelines and their status."""
    pipelines = pipe.list_pipelines()
    lines = []
    for p in pipelines:
        lines.append(
            f"- {p.name} ({p.source_type}) | Last run: {p.last_run} | Stale: {p.is_stale}"
        )
    return "\n".join(lines)

@tool("Run SQL Query")
def run_sql_query(query: str) -> str:
    """Run a SQL query against the data lake using DuckDB via DataSpoc Lens."""
    result = lens.sql(query)
    return result.to_markdown(max_rows=20)

@tool("List Tables")
def list_tables() -> str:
    """List all tables available in the data lake."""
    tables = lens.tables()
    lines = []
    for t in tables:
        lines.append(f"- {t.layer}/{t.source}/{t.name} ({t.row_count} rows)")
    return "\n".join(lines)

@tool("AI Ask")
def ai_ask(question: str) -> str:
    """Ask a natural language question about the data lake. Returns SQL + results."""
    answer = lens.ask(question)
    return f"SQL:\n{answer.sql}\n\nResult:\n{answer.result.to_markdown()}"

@tool("Train Model")
def train_model(table: str, target_column: str, model_name: str) -> str:
    """Train an ML model on a table from the data lake."""
    result = lens.ml_train(
        table=table,
        target=target_column,
        model_name=model_name,
    )
    return (
        f"Model '{model_name}' trained.\n"
        f"Algorithm: {result.algorithm}\n"
        f"Metric (R2/Accuracy): {result.score:.4f}\n"
        f"Features used: {', '.join(result.features)}"
    )

@tool("Evaluate Model")
def evaluate_model(model_name: str) -> str:
    """Evaluate a trained ML model and return metrics."""
    metrics = lens.ml_metrics(model_name)
    lines = [f"Model: {model_name}"]
    for k, v in metrics.items():
        lines.append(f"  {k}: {v:.4f}" if isinstance(v, float) else f"  {k}: {v}")
    return "\n".join(lines)
```

Step 2: Define the Agents
Each agent has a clear role, a goal, and a set of tools:
```python
from crewai import Agent
from tools import (
    check_pipeline_status,
    run_pipeline,
    list_pipelines,
    run_sql_query,
    list_tables,
    ai_ask,
    train_model,
    evaluate_model,
)

de_agent = Agent(
    role="Data Engineer",
    goal="Ensure all data pipelines are fresh. Run any stale pipelines.",
    backstory=(
        "You are a senior Data Engineer responsible for data freshness. "
        "You check pipeline status and re-run any pipeline that is stale. "
        "You report what was refreshed and any errors."
    ),
    tools=[list_pipelines, check_pipeline_status, run_pipeline],
    verbose=True,
)

da_agent = Agent(
    role="Data Analyst",
    goal="Explore fresh data and produce a summary report with key metrics.",
    backstory=(
        "You are a Data Analyst. After data is refreshed, you explore the tables, "
        "run SQL queries to compute key business metrics, and produce a brief report. "
        "You focus on revenue, customer counts, and trends."
    ),
    tools=[list_tables, run_sql_query, ai_ask],
    verbose=True,
)

ml_agent = Agent(
    role="ML Engineer",
    goal="Train predictive models on fresh data and report model quality.",
    backstory=(
        "You are an ML Engineer. When data is fresh, you train models to predict "
        "business outcomes. You evaluate model quality and report the results. "
        "You only train when data is available and fresh."
    ),
    tools=[list_tables, run_sql_query, train_model, evaluate_model],
    verbose=True,
)
```

Step 3: Define Tasks and the Crew
```python
from crewai import Crew, Task, Process
from agents import de_agent, da_agent, ml_agent

# Task 1: Data Engineer refreshes pipelines
refresh_task = Task(
    description=(
        "Check all pipelines. For each stale pipeline, run it to get fresh data. "
        "Report which pipelines were refreshed and how many rows were extracted."
    ),
    expected_output="A list of pipelines that were refreshed with row counts.",
    agent=de_agent,
)

# Task 2: Data Analyst explores the data
analysis_task = Task(
    description=(
        "List available tables. Run SQL queries to compute: "
        "1) Total revenue this month, "
        "2) Number of active customers, "
        "3) Top 5 products by revenue. "
        "Format results as a markdown report."
    ),
    expected_output="A markdown report with revenue, customer count, and top products.",
    agent=da_agent,
)

# Task 3: ML Engineer trains a churn model
ml_task = Task(
    description=(
        "Check if the 'curated/sales/customers' table exists. If it does, "
        "train a classification model to predict customer churn (target column: 'churned'). "
        "Name the model 'churn-predictor'. Report the accuracy."
    ),
    expected_output="Model training report with accuracy score and features used.",
    agent=ml_agent,
)
```
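These three tasks run in declaration order (DE, then DA, then ML), with each task's output available as context to the next. Conceptually, a sequential process boils down to something like this (a simplified sketch, not CrewAI's actual internals):

```python
# Simplified sketch of sequential task execution: each task runs in
# order, and earlier outputs accumulate as context for later tasks.
def run_sequential(tasks, execute):
    context = []
    final = None
    for task in tasks:
        final = execute(task, list(context))
        context.append(final)
    return final  # the crew's result is the last task's output

# Toy executor: each "task" just reports how much context it saw.
result = run_sequential(
    ["refresh", "analysis", "ml"],
    lambda task, ctx: f"{task} (saw {len(ctx)} prior outputs)",
)
print(result)  # → ml (saw 2 prior outputs)
```

This is why ordering matters in the task list: the analysis task sees the refresh report, and the ML task sees both.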
```python
# Assemble the crew
data_team = Crew(
    agents=[de_agent, da_agent, ml_agent],
    tasks=[refresh_task, analysis_task, ml_task],
    process=Process.sequential,  # DE first, then DA, then ML
    verbose=True,
)
```

Step 4: Run the Crew
```python
from crew import data_team

def main():
    print("=== DataSpoc Multi-Agent Data Team ===\n")
    result = data_team.kickoff()
    print("\n=== Final Report ===")
    print(result)

if __name__ == "__main__":
    main()
```

Run it:
```bash
python main.py
```

Example Run Output
```
=== DataSpoc Multi-Agent Data Team ===

[DE Agent] Checking all pipelines...
[DE Agent] Using tool: List Pipelines
  - postgres-orders (postgres) | Last run: 2026-04-14 18:00 | Stale: True
  - sheets-marketing (google_sheets) | Last run: 2026-04-15 09:00 | Stale: False
  - api-payments (rest_api) | Last run: 2026-04-14 12:00 | Stale: True

[DE Agent] Running stale pipeline: postgres-orders
[DE Agent] Using tool: Run Pipeline
  Pipeline 'postgres-orders' completed. Rows extracted: 1,247. Duration: 8s

[DE Agent] Running stale pipeline: api-payments
[DE Agent] Using tool: Run Pipeline
  Pipeline 'api-payments' completed. Rows extracted: 356. Duration: 3s

[DA Agent] Listing available tables...
[DA Agent] Using tool: List Tables
  - raw/postgres/orders (45,892 rows)
  - raw/postgres/customers (3,201 rows)
  - curated/sales/customers (3,201 rows)
  - raw/api/payments (12,445 rows)

[DA Agent] Using tool: Run SQL Query
  SELECT SUM(amount) as revenue FROM raw.postgres.orders WHERE created_at >= '2026-04-01'
  → Revenue: $284,521.00

[DA Agent] Using tool: Run SQL Query
  SELECT COUNT(DISTINCT customer_id) as active_customers FROM raw.postgres.orders WHERE created_at >= '2026-03-15'
  → Active customers: 1,847

[ML Agent] Using tool: Train Model
  Training on curated/sales/customers, target=churned
  Model 'churn-predictor' trained. Algorithm: LightGBM. Accuracy: 0.8734

=== Final Report ===

## Pipeline Refresh
- postgres-orders: 1,247 new rows
- api-payments: 356 new rows

## Business Metrics (April 2026)
- Revenue: $284,521
- Active customers: 1,847
- Top product: Enterprise Plan ($98,200)

## ML Model
- churn-predictor: 87.3% accuracy
- Key features: days_since_last_order, total_spend, support_tickets
```

Running on a Schedule
Wrap it in a simple cron job or use any scheduler:
```bash
# Run every morning at 7am
0 7 * * * cd /opt/data-team && python main.py >> /var/log/data-team.log 2>&1
```

Or drive it from a long-running Python process (CrewAI's `kickoff` runs the crew once; there is no built-in cron, so loop it yourself):

```python
import time

from crew import data_team

# Re-kick the crew every 6 hours
while True:
    data_team.kickoff()
    time.sleep(6 * 60 * 60)
```

Adding Slack Notifications
Add a notification tool so the DA agent can post results to Slack:
```python
import os

import requests
from crewai.tools import tool

@tool("Post to Slack")
def post_to_slack(message: str) -> str:
    """Post a message to the #data-reports Slack channel."""
    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    response = requests.post(webhook_url, json={"text": message}, timeout=10)
    response.raise_for_status()  # surface webhook failures to the agent
    return "Posted to Slack successfully."
```

Then add it to the DA agent's tool list and update the task to include "Post the report to Slack."
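The webhook body is just JSON with a `text` field. If you want the agent to post a structured report rather than a raw blob, a small helper can assemble the message first (a hypothetical, pure-formatting sketch; the section names mirror the example report above, and no network call is made here):

```python
import json

def build_slack_payload(sections: dict) -> str:
    """Assemble a Slack webhook payload from report sections.
    Hypothetical helper; the section/bullet structure mirrors the
    markdown report shown earlier in this post."""
    lines = []
    for title, bullets in sections.items():
        lines.append(f"*{title}*")
        lines.extend(f"• {item}" for item in bullets)
        lines.append("")  # blank line between sections
    return json.dumps({"text": "\n".join(lines).rstrip()})

payload = build_slack_payload({
    "Pipeline Refresh": ["postgres-orders: 1,247 new rows"],
    "ML Model": ["churn-predictor: 87.3% accuracy"],
})
```

You could call such a helper inside `post_to_slack`, or give it to the agent as its own tool.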
Key Takeaways
- CrewAI handles the orchestration: you define agents, roles, and tasks
- DataSpoc handles the data: pipelines, queries, and ML via the Python SDK
- The sequential process guarantees the DE runs before the DA, and the DA before the ML
- Each agent stays focused: a few tools, a clear goal, a specific backstory
- The bucket is the contract: agents communicate through data, not direct calls
This is a starting point. You can add more agents (a QA agent that validates data quality, an Ops agent that monitors costs) or switch to Process.hierarchical for a manager agent that delegates dynamically.