crewai · multi-agent · ai-agents · data-engineering · python

Building a Multi-Agent Data Team with CrewAI and DataSpoc

Michael San Martim · 2026-04-25

What if your entire data team could run on autopilot? A Data Engineer agent that monitors pipelines and re-runs the stale ones. A Data Analyst agent that explores fresh data and builds reports. An ML Engineer agent that trains models when new data arrives.

With CrewAI and DataSpoc, this isn't science fiction. It's about 100 lines of Python.

The Architecture

┌─────────────────────────────────────────────────┐
│ CrewAI Crew │
├─────────────────────────────────────────────────┤
│ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ DE Agent │→ │ DA Agent │→ │ ML Agent │ │
│ │ │ │ │ │ │ │
│ │ PipeClient│ │ LensClient│ │ LensClient│ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ │
└─────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────┐
│ S3 Bucket (Parquet) │
└─────────────────────────────────────┘

Each agent uses the DataSpoc Python SDK to interact with the data lake. No shell-outs, no subprocess hacks.

Prerequisites

Terminal window
pip install dataspoc-pipe dataspoc-lens crewai

Make sure you have a DataSpoc project initialized:

Terminal window
dataspoc-pipe init my-lake --bucket s3://company-data-lake
dataspoc-lens add-bucket company s3://company-data-lake

Step 1: Define the Tools

CrewAI agents need tools. We wrap the DataSpoc SDK in CrewAI-compatible tools:

tools.py
from crewai.tools import tool
from dataspoc_pipe import PipeClient
from dataspoc_lens import LensClient

pipe = PipeClient(project_path="./my-lake")
lens = LensClient(bucket_alias="company")


@tool("Check Pipeline Status")
def check_pipeline_status(pipeline_name: str) -> str:
    """Check the status of a DataSpoc pipeline. Returns last run time and row count."""
    status = pipe.status(pipeline_name)
    return (
        f"Pipeline: {status.name}\n"
        f"Last run: {status.last_run}\n"
        f"Rows extracted: {status.rows_extracted}\n"
        f"Status: {status.state}\n"
        f"Stale: {status.is_stale}"
    )


@tool("Run Pipeline")
def run_pipeline(pipeline_name: str) -> str:
    """Run a DataSpoc Pipe pipeline to extract fresh data."""
    result = pipe.run(pipeline_name)
    return (
        f"Pipeline '{pipeline_name}' completed.\n"
        f"Rows extracted: {result.rows_extracted}\n"
        f"Files written: {result.files_written}\n"
        f"Duration: {result.duration_seconds}s"
    )


@tool("List Pipelines")
def list_pipelines() -> str:
    """List all available pipelines and their status."""
    pipelines = pipe.list_pipelines()
    lines = []
    for p in pipelines:
        lines.append(f"- {p.name} ({p.source_type}) | Last run: {p.last_run} | Stale: {p.is_stale}")
    return "\n".join(lines)


@tool("Run SQL Query")
def run_sql_query(query: str) -> str:
    """Run a SQL query against the data lake using DuckDB via DataSpoc Lens."""
    result = lens.sql(query)
    return result.to_markdown(max_rows=20)


@tool("List Tables")
def list_tables() -> str:
    """List all tables available in the data lake."""
    tables = lens.tables()
    lines = []
    for t in tables:
        lines.append(f"- {t.layer}/{t.source}/{t.name} ({t.row_count} rows)")
    return "\n".join(lines)


@tool("AI Ask")
def ai_ask(question: str) -> str:
    """Ask a natural language question about the data lake. Returns SQL + results."""
    answer = lens.ask(question)
    return f"SQL:\n{answer.sql}\n\nResult:\n{answer.result.to_markdown()}"


@tool("Train Model")
def train_model(table: str, target_column: str, model_name: str) -> str:
    """Train an ML model on a table from the data lake."""
    result = lens.ml_train(
        table=table,
        target=target_column,
        model_name=model_name,
    )
    return (
        f"Model '{model_name}' trained.\n"
        f"Algorithm: {result.algorithm}\n"
        f"Metric (R2/Accuracy): {result.score:.4f}\n"
        f"Features used: {', '.join(result.features)}"
    )


@tool("Evaluate Model")
def evaluate_model(model_name: str) -> str:
    """Evaluate a trained ML model and return metrics."""
    metrics = lens.ml_metrics(model_name)
    lines = [f"Model: {model_name}"]
    for k, v in metrics.items():
        lines.append(f"  {k}: {v:.4f}" if isinstance(v, float) else f"  {k}: {v}")
    return "\n".join(lines)
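One hardening step worth considering before handing these tools to agents: if a tool raises, the exception can surface as a raw traceback in the agent's context, which burns tokens and confuses the LLM. A small decorator (a generic sketch, not part of the DataSpoc SDK or CrewAI) turns failures into short, readable strings the agent can act on:

```python
import functools

def safe_tool(fn):
    """Wrap a tool function so failures come back as short,
    readable strings instead of raising into the agent loop."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            return f"Tool '{fn.__name__}' failed: {type(exc).__name__}: {exc}"
    return wrapper

# Demo with a deliberately failing tool body
@safe_tool
def flaky_status(pipeline_name: str) -> str:
    raise TimeoutError(f"could not reach pipeline '{pipeline_name}'")

print(flaky_status("postgres-orders"))
# → Tool 'flaky_status' failed: TimeoutError: could not reach pipeline 'postgres-orders'
```

In tools.py you would stack it beneath the @tool decorator (closest to the function) so the guarded version is what CrewAI registers.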

Step 2: Define the Agents

Each agent has a clear role, a goal, and a set of tools:

agents.py
from crewai import Agent
from tools import (
    check_pipeline_status,
    run_pipeline,
    list_pipelines,
    run_sql_query,
    list_tables,
    ai_ask,
    train_model,
    evaluate_model,
)

de_agent = Agent(
    role="Data Engineer",
    goal="Ensure all data pipelines are fresh. Run any stale pipelines.",
    backstory=(
        "You are a senior Data Engineer responsible for data freshness. "
        "You check pipeline status and re-run any pipeline that is stale. "
        "You report what was refreshed and any errors."
    ),
    tools=[list_pipelines, check_pipeline_status, run_pipeline],
    verbose=True,
)

da_agent = Agent(
    role="Data Analyst",
    goal="Explore fresh data and produce a summary report with key metrics.",
    backstory=(
        "You are a Data Analyst. After data is refreshed, you explore the tables, "
        "run SQL queries to compute key business metrics, and produce a brief report. "
        "You focus on revenue, customer counts, and trends."
    ),
    tools=[list_tables, run_sql_query, ai_ask],
    verbose=True,
)

ml_agent = Agent(
    role="ML Engineer",
    goal="Train predictive models on fresh data and report model quality.",
    backstory=(
        "You are an ML Engineer. When data is fresh, you train models to predict "
        "business outcomes. You evaluate model quality and report the results. "
        "You only train when data is available and fresh."
    ),
    tools=[list_tables, run_sql_query, train_model, evaluate_model],
    verbose=True,
)

Step 3: Define the Tasks and the Crew

crew.py
from crewai import Crew, Task, Process
from agents import de_agent, da_agent, ml_agent

# Task 1: Data Engineer refreshes pipelines
refresh_task = Task(
    description=(
        "Check all pipelines. For each stale pipeline, run it to get fresh data. "
        "Report which pipelines were refreshed and how many rows were extracted."
    ),
    expected_output="A list of pipelines that were refreshed with row counts.",
    agent=de_agent,
)

# Task 2: Data Analyst explores the data
analysis_task = Task(
    description=(
        "List available tables. Run SQL queries to compute: "
        "1) Total revenue this month, "
        "2) Number of active customers, "
        "3) Top 5 products by revenue. "
        "Format results as a markdown report."
    ),
    expected_output="A markdown report with revenue, customer count, and top products.",
    agent=da_agent,
)

# Task 3: ML Engineer trains a churn model
ml_task = Task(
    description=(
        "Check if the 'curated/sales/customers' table exists. If it does, "
        "train a classification model to predict customer churn (target column: 'churned'). "
        "Name the model 'churn-predictor'. Report the accuracy."
    ),
    expected_output="Model training report with accuracy score and features used.",
    agent=ml_agent,
)

# Assemble the crew
data_team = Crew(
    agents=[de_agent, da_agent, ml_agent],
    tasks=[refresh_task, analysis_task, ml_task],
    process=Process.sequential,  # DE first, then DA, then ML
    verbose=True,
)

Step 4: Run the Crew

main.py
from crew import data_team


def main():
    print("=== DataSpoc Multi-Agent Data Team ===\n")
    result = data_team.kickoff()
    print("\n=== Final Report ===")
    print(result)


if __name__ == "__main__":
    main()

Run it:

Terminal window
python main.py

Example Run Output

=== DataSpoc Multi-Agent Data Team ===
[DE Agent] Checking all pipelines...
[DE Agent] Using tool: List Pipelines
- postgres-orders (postgres) | Last run: 2026-04-14 18:00 | Stale: True
- sheets-marketing (google_sheets) | Last run: 2026-04-15 09:00 | Stale: False
- api-payments (rest_api) | Last run: 2026-04-14 12:00 | Stale: True
[DE Agent] Running stale pipeline: postgres-orders
[DE Agent] Using tool: Run Pipeline
Pipeline 'postgres-orders' completed. Rows extracted: 1,247. Duration: 8s
[DE Agent] Running stale pipeline: api-payments
[DE Agent] Using tool: Run Pipeline
Pipeline 'api-payments' completed. Rows extracted: 356. Duration: 3s
[DA Agent] Listing available tables...
[DA Agent] Using tool: List Tables
- raw/postgres/orders (45,892 rows)
- raw/postgres/customers (3,201 rows)
- curated/sales/customers (3,201 rows)
- raw/api/payments (12,445 rows)
[DA Agent] Using tool: Run SQL Query
SELECT SUM(amount) as revenue FROM raw.postgres.orders
WHERE created_at >= '2026-04-01'
→ Revenue: $284,521.00
[DA Agent] Using tool: Run SQL Query
SELECT COUNT(DISTINCT customer_id) as active_customers
FROM raw.postgres.orders WHERE created_at >= '2026-03-15'
→ Active customers: 1,847
[ML Agent] Using tool: Train Model
Training on curated/sales/customers, target=churned
Model 'churn-predictor' trained. Algorithm: LightGBM. Accuracy: 0.8734
=== Final Report ===
## Pipeline Refresh
- postgres-orders: 1,247 new rows
- api-payments: 356 new rows
## Business Metrics (April 2026)
- Revenue: $284,521
- Active customers: 1,847
- Top product: Enterprise Plan ($98,200)
## ML Model
- churn-predictor: 87.3% accuracy
- Key features: days_since_last_order, total_spend, support_tickets

Running on a Schedule

Wrap it in a simple cron job or use any scheduler:

Terminal window
# Run every morning at 7am
0 7 * * * cd /opt/data-team && python main.py >> /var/log/data-team.log 2>&1

Or schedule it from Python. CrewAI itself does not ship a cron-style scheduler (kickoff_async just runs the crew asynchronously), so pair it with a lightweight scheduler such as the `schedule` package:

scheduled.py
import time

import schedule

from crew import data_team

# Run every 6 hours (requires `pip install schedule`)
schedule.every(6).hours.do(data_team.kickoff)

while True:
    schedule.run_pending()
    time.sleep(60)

Adding a Slack Notification

Add a notification tool so the DA agent can post results to Slack:

import os

import requests
from crewai.tools import tool


@tool("Post to Slack")
def post_to_slack(message: str) -> str:
    """Post a message to the #data-reports Slack channel."""
    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    resp = requests.post(webhook_url, json={"text": message})
    resp.raise_for_status()
    return "Posted to Slack successfully."

Then add it to the DA agent's tool list and update the task to include "Post the report to Slack."
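One practical wrinkle: Slack truncates very long messages (the exact cap depends on the API; the 3,500-character limit below is a conservative assumption). A small helper, hypothetical and not part of any SDK here, splits a long report on line boundaries before posting:

```python
def chunk_message(text: str, limit: int = 3500) -> list[str]:
    """Split a long report into pieces no longer than `limit`
    characters, breaking on line boundaries so markdown survives.
    A single line longer than `limit` passes through intact."""
    chunks, current = [], ""
    for line in text.splitlines(keepends=True):
        if current and len(current) + len(line) > limit:
            chunks.append(current)
            current = ""
        current += line
    if current:
        chunks.append(current)
    return chunks

# Inside post_to_slack you would then loop:
#   for chunk in chunk_message(message):
#       requests.post(webhook_url, json={"text": chunk})
```

Each chunk arrives as its own Slack message, in order, so the report stays readable in the channel.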

Key Takeaways

  1. CrewAI handles orchestration: you define agents, roles, and tasks
  2. DataSpoc handles the data: pipelines, queries, and ML via the Python SDK
  3. The sequential process guarantees DE runs before DA, and DA before ML
  4. Each agent is focused: a small tool set, a clear goal, a specific backstory
  5. The bucket is the contract: agents communicate through data, not direct calls

This is a starting point. You can add more agents (a QA agent that validates data quality, an Ops agent that monitors costs) or switch to Process.hierarchical for a manager agent that delegates dynamically.
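As a taste of what that QA agent could look like, here is a minimal data-quality tool: a null-rate check over plain row dicts. It is a sketch under assumptions (in practice you would feed it rows from `lens.sql(...)`, whose exact return shape is not shown in this post):

```python
def null_report(rows: list[dict], threshold: float = 0.05) -> str:
    """Flag columns whose null rate exceeds `threshold`.
    Returns 'OK' when every column passes."""
    if not rows:
        return "FAIL: table is empty"
    problems = []
    for col in rows[0]:
        null_rate = sum(1 for r in rows if r.get(col) is None) / len(rows)
        if null_rate > threshold:
            problems.append(f"{col}: {null_rate:.1%} null")
    return "OK" if not problems else "FAIL: " + "; ".join(problems)

rows = [
    {"customer_id": 1, "churned": 0},
    {"customer_id": 2, "churned": None},
    {"customer_id": 3, "churned": 1},
]
print(null_report(rows))
# → FAIL: churned: 33.3% null
```

Wrapped with @tool, a check like this would slot straight into a fourth agent's tool list, gating the ML task on data quality instead of mere freshness.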
