Building a Multi-Agent Data Team with CrewAI and DataSpoc
What if your entire data team could run on autopilot? A Data Engineer agent that monitors pipelines and re-runs the stale ones. A Data Analyst agent that explores fresh data and builds reports. An ML Engineer agent that trains models when new data arrives.
With CrewAI and DataSpoc, this isn't science fiction. It's about 100 lines of Python.
The Architecture
```
┌─────────────────────────────────────────────────┐
│                   CrewAI Crew                   │
├─────────────────────────────────────────────────┤
│                                                 │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐  │
│  │ DE Agent  │ → │ DA Agent  │ → │ ML Agent  │  │
│  │           │   │           │   │           │  │
│  │ PipeClient│   │ LensClient│   │ LensClient│  │
│  └───────────┘   └───────────┘   └───────────┘  │
│                                                 │
└─────────────────────────────────────────────────┘
        │               │               │
        ▼               ▼               ▼
  ┌─────────────────────────────────────┐
  │        S3 Bucket (Parquet)          │
  └─────────────────────────────────────┘
```

Each agent uses the DataSpoc Python SDK to interact with the data lake. No shell-outs, no subprocess hacks.
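The handoff pattern above, where agents exchange state through files in the bucket rather than through direct calls, can be sketched without any framework. This is a dependency-free illustration only; `de_step`, `da_step`, and the local directory standing in for S3 are hypothetical:

```python
import json
import pathlib
import tempfile

# A throwaway local directory stands in for the S3 bucket.
lake = pathlib.Path(tempfile.mkdtemp())

def de_step() -> None:
    # The "Data Engineer" writes fresh rows into the lake.
    (lake / "orders.json").write_text(json.dumps([{"amount": 100}, {"amount": 250}]))

def da_step() -> int:
    # The "Data Analyst" only reads from the lake; it never calls de_step directly.
    rows = json.loads((lake / "orders.json").read_text())
    return sum(r["amount"] for r in rows)

de_step()
print(da_step())  # → 350
```

Because the bucket is the only interface, any agent can be rewritten, rescheduled, or replaced without touching the others.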
Prerequisites
```
pip install dataspoc-pipe dataspoc-lens crewai
```

Make sure you have a DataSpoc project initialized:
```
dataspoc-pipe init my-lake --bucket s3://company-data-lake
dataspoc-lens add-bucket company s3://company-data-lake
```

Step 1: Define the Tools
CrewAI agents need tools. We wrap the DataSpoc SDK in CrewAI-compatible tools:
```python
from crewai.tools import tool
from dataspoc_pipe import PipeClient
from dataspoc_lens import LensClient

pipe = PipeClient(project_path="./my-lake")
lens = LensClient(bucket_alias="company")

@tool("Check Pipeline Status")
def check_pipeline_status(pipeline_name: str) -> str:
    """Check the status of a DataSpoc pipeline. Returns last run time and row count."""
    status = pipe.status(pipeline_name)
    return (
        f"Pipeline: {status.name}\n"
        f"Last run: {status.last_run}\n"
        f"Rows extracted: {status.rows_extracted}\n"
        f"Status: {status.state}\n"
        f"Stale: {status.is_stale}"
    )

@tool("Run Pipeline")
def run_pipeline(pipeline_name: str) -> str:
    """Run a DataSpoc Pipe pipeline to extract fresh data."""
    result = pipe.run(pipeline_name)
    return (
        f"Pipeline '{pipeline_name}' completed.\n"
        f"Rows extracted: {result.rows_extracted}\n"
        f"Files written: {result.files_written}\n"
        f"Duration: {result.duration_seconds}s"
    )

@tool("List Pipelines")
def list_pipelines() -> str:
    """List all available pipelines and their status."""
    pipelines = pipe.list_pipelines()
    lines = []
    for p in pipelines:
        lines.append(
            f"- {p.name} ({p.source_type}) | Last run: {p.last_run} | Stale: {p.is_stale}"
        )
    return "\n".join(lines)

@tool("Run SQL Query")
def run_sql_query(query: str) -> str:
    """Run a SQL query against the data lake using DuckDB via DataSpoc Lens."""
    result = lens.sql(query)
    return result.to_markdown(max_rows=20)

@tool("List Tables")
def list_tables() -> str:
    """List all tables available in the data lake."""
    tables = lens.tables()
    lines = []
    for t in tables:
        lines.append(f"- {t.layer}/{t.source}/{t.name} ({t.row_count} rows)")
    return "\n".join(lines)

@tool("AI Ask")
def ai_ask(question: str) -> str:
    """Ask a natural language question about the data lake. Returns SQL + results."""
    answer = lens.ask(question)
    return f"SQL:\n{answer.sql}\n\nResult:\n{answer.result.to_markdown()}"

@tool("Train Model")
def train_model(table: str, target_column: str, model_name: str) -> str:
    """Train an ML model on a table from the data lake."""
    result = lens.ml_train(
        table=table,
        target=target_column,
        model_name=model_name,
    )
    return (
        f"Model '{model_name}' trained.\n"
        f"Algorithm: {result.algorithm}\n"
        f"Metric (R2/Accuracy): {result.score:.4f}\n"
        f"Features used: {', '.join(result.features)}"
    )

@tool("Evaluate Model")
def evaluate_model(model_name: str) -> str:
    """Evaluate a trained ML model and return metrics."""
    metrics = lens.ml_metrics(model_name)
    lines = [f"Model: {model_name}"]
    for k, v in metrics.items():
        lines.append(f"  {k}: {v:.4f}" if isinstance(v, float) else f"  {k}: {v}")
    return "\n".join(lines)
```

Step 2: Define the Agents
Each agent has a clear role, a goal, and a set of tools:
```python
from crewai import Agent
from tools import (
    check_pipeline_status,
    run_pipeline,
    list_pipelines,
    run_sql_query,
    list_tables,
    ai_ask,
    train_model,
    evaluate_model,
)

de_agent = Agent(
    role="Data Engineer",
    goal="Ensure all data pipelines are fresh. Run any stale pipelines.",
    backstory=(
        "You are a senior Data Engineer responsible for data freshness. "
        "You check pipeline status and re-run any pipeline that is stale. "
        "You report what was refreshed and any errors."
    ),
    tools=[list_pipelines, check_pipeline_status, run_pipeline],
    verbose=True,
)

da_agent = Agent(
    role="Data Analyst",
    goal="Explore fresh data and produce a summary report with key metrics.",
    backstory=(
        "You are a Data Analyst. After data is refreshed, you explore the tables, "
        "run SQL queries to compute key business metrics, and produce a brief report. "
        "You focus on revenue, customer counts, and trends."
    ),
    tools=[list_tables, run_sql_query, ai_ask],
    verbose=True,
)

ml_agent = Agent(
    role="ML Engineer",
    goal="Train predictive models on fresh data and report model quality.",
    backstory=(
        "You are an ML Engineer. When data is fresh, you train models to predict "
        "business outcomes. You evaluate model quality and report the results. "
        "You only train when data is available and fresh."
    ),
    tools=[list_tables, run_sql_query, train_model, evaluate_model],
    verbose=True,
)
```

Step 3: Define Tasks and the Crew
```python
from crewai import Crew, Task, Process
from agents import de_agent, da_agent, ml_agent

# Task 1: Data Engineer refreshes pipelines
refresh_task = Task(
    description=(
        "Check all pipelines. For each stale pipeline, run it to get fresh data. "
        "Report which pipelines were refreshed and how many rows were extracted."
    ),
    expected_output="A list of pipelines that were refreshed with row counts.",
    agent=de_agent,
)

# Task 2: Data Analyst explores the data
analysis_task = Task(
    description=(
        "List available tables. Run SQL queries to compute: "
        "1) Total revenue this month, "
        "2) Number of active customers, "
        "3) Top 5 products by revenue. "
        "Format results as a markdown report."
    ),
    expected_output="A markdown report with revenue, customer count, and top products.",
    agent=da_agent,
)

# Task 3: ML Engineer trains a churn model
ml_task = Task(
    description=(
        "Check if the 'curated/sales/customers' table exists. If it does, "
        "train a classification model to predict customer churn (target column: 'churned'). "
        "Name the model 'churn-predictor'. Report the accuracy."
    ),
    expected_output="Model training report with accuracy score and features used.",
    agent=ml_agent,
)

# Assemble the crew
data_team = Crew(
    agents=[de_agent, da_agent, ml_agent],
    tasks=[refresh_task, analysis_task, ml_task],
    process=Process.sequential,  # DE first, then DA, then ML
    verbose=True,
)
```

Step 4: Run the Crew
```python
from crew import data_team

def main():
    print("=== DataSpoc Multi-Agent Data Team ===\n")
    result = data_team.kickoff()
    print("\n=== Final Report ===")
    print(result)

if __name__ == "__main__":
    main()
```

Run it:

```
python main.py
```

Sample Run Output
```
=== DataSpoc Multi-Agent Data Team ===

[DE Agent] Checking all pipelines...
[DE Agent] Using tool: List Pipelines
  - postgres-orders (postgres) | Last run: 2026-04-14 18:00 | Stale: True
  - sheets-marketing (google_sheets) | Last run: 2026-04-15 09:00 | Stale: False
  - api-payments (rest_api) | Last run: 2026-04-14 12:00 | Stale: True

[DE Agent] Running stale pipeline: postgres-orders
[DE Agent] Using tool: Run Pipeline
  Pipeline 'postgres-orders' completed. Rows extracted: 1,247. Duration: 8s

[DE Agent] Running stale pipeline: api-payments
[DE Agent] Using tool: Run Pipeline
  Pipeline 'api-payments' completed. Rows extracted: 356. Duration: 3s

[DA Agent] Listing available tables...
[DA Agent] Using tool: List Tables
  - raw/postgres/orders (45,892 rows)
  - raw/postgres/customers (3,201 rows)
  - curated/sales/customers (3,201 rows)
  - raw/api/payments (12,445 rows)

[DA Agent] Using tool: Run SQL Query
  SELECT SUM(amount) as revenue FROM raw.postgres.orders WHERE created_at >= '2026-04-01'
  → Revenue: $284,521.00

[DA Agent] Using tool: Run SQL Query
  SELECT COUNT(DISTINCT customer_id) as active_customers FROM raw.postgres.orders WHERE created_at >= '2026-03-15'
  → Active customers: 1,847

[ML Agent] Using tool: Train Model
  Training on curated/sales/customers, target=churned
  Model 'churn-predictor' trained. Algorithm: LightGBM. Accuracy: 0.8734

=== Final Report ===

## Pipeline Refresh
- postgres-orders: 1,247 new rows
- api-payments: 356 new rows

## Business Metrics (April 2026)
- Revenue: $284,521
- Active customers: 1,847
- Top product: Enterprise Plan ($98,200)

## ML Model
- churn-predictor: 87.3% accuracy
- Key features: days_since_last_order, total_spend, support_tickets
```

Running on a Schedule
Wrap it in a simple cron job, or use any scheduler:
```
# Run every morning at 7am
0 7 * * * cd /opt/data-team && python main.py >> /var/log/data-team.log 2>&1
```

Or trigger it from a long-running Python process. CrewAI does not ship a built-in cron-style scheduler, so a plain loop works:

```python
import time

from crew import data_team

# Kick off the crew every 6 hours
while True:
    data_team.kickoff()
    time.sleep(6 * 60 * 60)
```
Add a notification tool so the DA agent can post results to Slack:
```python
import os

import requests
from crewai.tools import tool

@tool("Post to Slack")
def post_to_slack(message: str) -> str:
    """Post a message to the #data-reports Slack channel."""
    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    requests.post(webhook_url, json={"text": message}, timeout=10)
    return "Posted to Slack successfully."
```

Then add it to the DA agent's tools list and update the task description to include "Post the report to Slack."
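Slack's incoming webhooks expect a JSON body with a `text` field, and very long messages get truncated. A minimal, dependency-free sketch of building that payload; `build_slack_payload` and the 4,000-character cap are assumptions for illustration, not part of DataSpoc, CrewAI, or the Slack spec:

```python
import json

def build_slack_payload(report_md: str) -> str:
    """Build the JSON body for a Slack incoming webhook.

    Caps the text at 4,000 characters as a conservative defensive limit
    (assumption, not a documented Slack constant).
    """
    text = report_md if len(report_md) <= 4000 else report_md[:3997] + "..."
    return json.dumps({"text": text})

payload = build_slack_payload("## Business Metrics\n- Revenue: $284,521")
print(payload)
```

You could call this helper inside `post_to_slack` so the agent never sends an oversized message.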
Key Takeaways
- CrewAI handles the orchestration: you define agents, roles, and tasks
- DataSpoc handles the data: pipelines, queries, and ML via the Python SDK
- The sequential process guarantees DE runs before DA, and DA before ML
- Each agent is focused: a small tool set, a clear goal, a specific backstory
- The bucket is the contract: agents communicate through data, not direct calls
This is a starting point. You can add more agents (a QA agent that validates data quality, an Ops agent that monitors costs) or switch to Process.hierarchical for a manager agent that delegates dynamically.
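A sketch of the hierarchical variant might look like the following. It reuses the agents and tasks defined earlier (the import paths and the `manager_llm` model id are placeholder assumptions you would adapt to your own project):

```python
from crewai import Crew, Process

# Hypothetical import paths: adjust to wherever your agents/tasks live.
from agents import de_agent, da_agent, ml_agent
from crew import refresh_task, analysis_task, ml_task

managed_team = Crew(
    agents=[de_agent, da_agent, ml_agent],
    tasks=[refresh_task, analysis_task, ml_task],
    process=Process.hierarchical,
    manager_llm="gpt-4o",  # placeholder: the manager model that delegates tasks
    verbose=True,
)
```

Instead of a fixed run order, the manager model decides which agent handles which task and in what sequence.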