# Building a Multi-Agent Data Team with CrewAI and DataSpoc
What if your entire data team could run on autopilot? A Data Engineer agent that monitors pipelines and re-runs stale ones. A Data Analyst agent that explores fresh data and builds reports. An ML Engineer agent that trains models when new data arrives.
With CrewAI and DataSpoc, this is not science fiction. It is about 100 lines of Python.
## The Architecture
```
┌─────────────────────────────────────────────────┐
│                   CrewAI Crew                   │
├─────────────────────────────────────────────────┤
│                                                 │
│  ┌───────────┐    ┌───────────┐   ┌───────────┐ │
│  │ DE Agent  │ →  │ DA Agent  │ → │ ML Agent  │ │
│  │           │    │           │   │           │ │
│  │ PipeClient│    │ LensClient│   │ LensClient│ │
│  └───────────┘    └───────────┘   └───────────┘ │
│                                                 │
└─────────────────────────────────────────────────┘
        │                │                │
        ▼                ▼                ▼
   ┌─────────────────────────────────────┐
   │         S3 Bucket (Parquet)         │
   └─────────────────────────────────────┘
```

Each agent uses DataSpoc's Python SDK to interact with the data lake. No shell-outs, no subprocess hacks.
## Prerequisites
```bash
pip install dataspoc-pipe dataspoc-lens crewai
```

Make sure you have a DataSpoc project initialized:
```bash
dataspoc-pipe init my-lake --bucket s3://company-data-lake
dataspoc-lens add-bucket company s3://company-data-lake
```

## Step 1: Define the Tools
CrewAI agents need tools. We wrap DataSpoc’s SDK into CrewAI-compatible tools:
```python
from crewai.tools import tool
from dataspoc_pipe import PipeClient
from dataspoc_lens import LensClient

pipe = PipeClient(project_path="./my-lake")
lens = LensClient(bucket_alias="company")

@tool("Check Pipeline Status")
def check_pipeline_status(pipeline_name: str) -> str:
    """Check the status of a DataSpoc pipeline. Returns last run time and row count."""
    status = pipe.status(pipeline_name)
    return (
        f"Pipeline: {status.name}\n"
        f"Last run: {status.last_run}\n"
        f"Rows extracted: {status.rows_extracted}\n"
        f"Status: {status.state}\n"
        f"Stale: {status.is_stale}"
    )

@tool("Run Pipeline")
def run_pipeline(pipeline_name: str) -> str:
    """Run a DataSpoc Pipe pipeline to extract fresh data."""
    result = pipe.run(pipeline_name)
    return (
        f"Pipeline '{pipeline_name}' completed.\n"
        f"Rows extracted: {result.rows_extracted}\n"
        f"Files written: {result.files_written}\n"
        f"Duration: {result.duration_seconds}s"
    )

@tool("List Pipelines")
def list_pipelines() -> str:
    """List all available pipelines and their status."""
    pipelines = pipe.list_pipelines()
    lines = []
    for p in pipelines:
        lines.append(
            f"- {p.name} ({p.source_type}) | Last run: {p.last_run} | Stale: {p.is_stale}"
        )
    return "\n".join(lines)

@tool("Run SQL Query")
def run_sql_query(query: str) -> str:
    """Run a SQL query against the data lake using DuckDB via DataSpoc Lens."""
    result = lens.sql(query)
    return result.to_markdown(max_rows=20)

@tool("List Tables")
def list_tables() -> str:
    """List all tables available in the data lake."""
    tables = lens.tables()
    lines = []
    for t in tables:
        lines.append(f"- {t.layer}/{t.source}/{t.name} ({t.row_count} rows)")
    return "\n".join(lines)

@tool("AI Ask")
def ai_ask(question: str) -> str:
    """Ask a natural language question about the data lake. Returns SQL + results."""
    answer = lens.ask(question)
    return f"SQL:\n{answer.sql}\n\nResult:\n{answer.result.to_markdown()}"

@tool("Train Model")
def train_model(table: str, target_column: str, model_name: str) -> str:
    """Train an ML model on a table from the data lake."""
    result = lens.ml_train(
        table=table,
        target=target_column,
        model_name=model_name,
    )
    return (
        f"Model '{model_name}' trained.\n"
        f"Algorithm: {result.algorithm}\n"
        f"Metric (R2/Accuracy): {result.score:.4f}\n"
        f"Features used: {', '.join(result.features)}"
    )

@tool("Evaluate Model")
def evaluate_model(model_name: str) -> str:
    """Evaluate a trained ML model and return metrics."""
    metrics = lens.ml_metrics(model_name)
    lines = [f"Model: {model_name}"]
    for k, v in metrics.items():
        lines.append(f"  {k}: {v:.4f}" if isinstance(v, float) else f"  {k}: {v}")
    return "\n".join(lines)
```

## Step 2: Define the Agents
Each agent has a clear role, goal, and set of tools:
```python
from crewai import Agent
from tools import (
    check_pipeline_status,
    run_pipeline,
    list_pipelines,
    run_sql_query,
    list_tables,
    ai_ask,
    train_model,
    evaluate_model,
)

de_agent = Agent(
    role="Data Engineer",
    goal="Ensure all data pipelines are fresh. Run any stale pipelines.",
    backstory=(
        "You are a senior Data Engineer responsible for data freshness. "
        "You check pipeline status and re-run any pipeline that is stale. "
        "You report what was refreshed and any errors."
    ),
    tools=[list_pipelines, check_pipeline_status, run_pipeline],
    verbose=True,
)

da_agent = Agent(
    role="Data Analyst",
    goal="Explore fresh data and produce a summary report with key metrics.",
    backstory=(
        "You are a Data Analyst. After data is refreshed, you explore the tables, "
        "run SQL queries to compute key business metrics, and produce a brief report. "
        "You focus on revenue, customer counts, and trends."
    ),
    tools=[list_tables, run_sql_query, ai_ask],
    verbose=True,
)

ml_agent = Agent(
    role="ML Engineer",
    goal="Train predictive models on fresh data and report model quality.",
    backstory=(
        "You are an ML Engineer. When data is fresh, you train models to predict "
        "business outcomes. You evaluate model quality and report the results. "
        "You only train when data is available and fresh."
    ),
    tools=[list_tables, run_sql_query, train_model, evaluate_model],
    verbose=True,
)
```

## Step 3: Define Tasks and the Crew
```python
from crewai import Crew, Task, Process
from agents import de_agent, da_agent, ml_agent

# Task 1: Data Engineer refreshes pipelines
refresh_task = Task(
    description=(
        "Check all pipelines. For each stale pipeline, run it to get fresh data. "
        "Report which pipelines were refreshed and how many rows were extracted."
    ),
    expected_output="A list of pipelines that were refreshed with row counts.",
    agent=de_agent,
)

# Task 2: Data Analyst explores the data
analysis_task = Task(
    description=(
        "List available tables. Run SQL queries to compute: "
        "1) Total revenue this month, "
        "2) Number of active customers, "
        "3) Top 5 products by revenue. "
        "Format results as a markdown report."
    ),
    expected_output="A markdown report with revenue, customer count, and top products.",
    agent=da_agent,
)

# Task 3: ML Engineer trains a churn model
ml_task = Task(
    description=(
        "Check if the 'curated/sales/customers' table exists. If it does, "
        "train a classification model to predict customer churn (target column: 'churned'). "
        "Name the model 'churn-predictor'. Report the accuracy."
    ),
    expected_output="Model training report with accuracy score and features used.",
    agent=ml_agent,
)

# Assemble the crew
data_team = Crew(
    agents=[de_agent, da_agent, ml_agent],
    tasks=[refresh_task, analysis_task, ml_task],
    process=Process.sequential,  # DE first, then DA, then ML
    verbose=True,
)
```

## Step 4: Run the Crew
```python
from crew import data_team

def main():
    print("=== DataSpoc Multi-Agent Data Team ===\n")
    result = data_team.kickoff()
    print("\n=== Final Report ===")
    print(result)

if __name__ == "__main__":
    main()
```

Run it:
```bash
python main.py
```

## Example Execution Output
```
=== DataSpoc Multi-Agent Data Team ===

[DE Agent] Checking all pipelines...
[DE Agent] Using tool: List Pipelines
  - postgres-orders (postgres) | Last run: 2026-04-14 18:00 | Stale: True
  - sheets-marketing (google_sheets) | Last run: 2026-04-15 09:00 | Stale: False
  - api-payments (rest_api) | Last run: 2026-04-14 12:00 | Stale: True

[DE Agent] Running stale pipeline: postgres-orders
[DE Agent] Using tool: Run Pipeline
  Pipeline 'postgres-orders' completed. Rows extracted: 1,247. Duration: 8s

[DE Agent] Running stale pipeline: api-payments
[DE Agent] Using tool: Run Pipeline
  Pipeline 'api-payments' completed. Rows extracted: 356. Duration: 3s

[DA Agent] Listing available tables...
[DA Agent] Using tool: List Tables
  - raw/postgres/orders (45,892 rows)
  - raw/postgres/customers (3,201 rows)
  - curated/sales/customers (3,201 rows)
  - raw/api/payments (12,445 rows)

[DA Agent] Using tool: Run SQL Query
  SELECT SUM(amount) as revenue FROM raw.postgres.orders WHERE created_at >= '2026-04-01'
  → Revenue: $284,521.00

[DA Agent] Using tool: Run SQL Query
  SELECT COUNT(DISTINCT customer_id) as active_customers FROM raw.postgres.orders WHERE created_at >= '2026-03-15'
  → Active customers: 1,847

[ML Agent] Using tool: Train Model
  Training on curated/sales/customers, target=churned
  Model 'churn-predictor' trained. Algorithm: LightGBM. Accuracy: 0.8734

=== Final Report ===

## Pipeline Refresh
- postgres-orders: 1,247 new rows
- api-payments: 356 new rows

## Business Metrics (April 2026)
- Revenue: $284,521
- Active customers: 1,847
- Top product: Enterprise Plan ($98,200)

## ML Model
- churn-predictor: 87.3% accuracy
- Key features: days_since_last_order, total_spend, support_tickets
```

## Running on a Schedule
Wrap it in a simple cron job, or use any scheduler:
```bash
# Run every morning at 7am
0 7 * * * cd /opt/data-team && python main.py >> /var/log/data-team.log 2>&1
```

Or drive it from Python. CrewAI itself has no built-in cron-style scheduler, so a plain timer loop (or a library like APScheduler) does the job:

```python
import time

from crew import data_team

# Run every 6 hours
while True:
    data_team.kickoff()
    time.sleep(6 * 60 * 60)
```

## Adding a Slack Notification
Add a notification tool so the DA agent posts results to Slack:
```python
import os

import requests
from crewai.tools import tool

@tool("Post to Slack")
def post_to_slack(message: str) -> str:
    """Post a message to the #data-reports Slack channel."""
    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    # Always set a timeout so a hung webhook can't stall the whole crew run.
    requests.post(webhook_url, json={"text": message}, timeout=10)
    return "Posted to Slack successfully."
```

Then add it to the DA agent's tools list and update the task to include "Post the report to Slack."
## Key Takeaways
- CrewAI handles orchestration — you define agents, roles, and tasks
- DataSpoc handles data — pipelines, queries, and ML via Python SDK
- Sequential process ensures DE runs before DA, DA before ML
- Each agent is focused — small tool set, clear goal, specific backstory
- The bucket is the contract — agents communicate through data, not direct calls
This is a starting point. You can add more agents (QA agent that validates data quality, Ops agent that monitors costs) or switch to Process.hierarchical for a manager agent that delegates dynamically.