Agent Integration

AI agents can manage your data pipelines autonomously through DataSpoc Pipe. Connect any MCP-compatible client to list, run, monitor, and troubleshoot pipelines without leaving the conversation. This guide covers every integration path: MCP, Python SDK, and framework-specific examples.

Install the MCP extra:

pip install dataspoc-pipe[mcp]

Your MCP client launches the server with:

dataspoc-pipe mcp

The server uses stdio transport and is designed to be launched by an MCP client, not run manually.

For Claude Desktop, add to claude_desktop_config.json:

  • macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
  • Linux: ~/.config/Claude/claude_desktop_config.json
{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"],
      "env": {
        "DATASPOC_BUCKET": "s3://my-data"
      }
    }
  }
}

If installed in a virtual environment, use the full path:

{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "/home/you/.venv/bin/dataspoc-pipe",
      "args": ["mcp"]
    }
  }
}

Restart Claude Desktop after saving.

Add to your Cursor MCP settings (Settings > MCP Servers):

{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"]
    }
  }
}

For Claude Code, add to ~/.claude.json:

{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"]
    }
  }
}

Or add it per-project in .claude/settings.json:

{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"]
    }
  }
}

The Pipe MCP server exposes 7 tools:

Tool                 Description                                   Parameters
list_pipelines       List all configured pipeline names            None
pipeline_config      Return full pipeline configuration as JSON    name (string)
run_pipeline         Execute an extraction pipeline                name (string), full (boolean, optional)
pipeline_status      Return status for all pipelines               None
pipeline_logs        Return latest execution log                   name (string)
show_manifest        Return bucket catalog                         bucket (string)
validate_pipeline    Check bucket and tap connectivity             name (string)

list_pipelines

List all configured pipeline names.

Parameters: None

Returns: One pipeline name per line, or “No pipelines configured.”

Example call:

list_pipelines()

Example response:

orders
customers
events

pipeline_config

Return the full configuration of a pipeline.

Parameters: name (string, required) — pipeline name

Returns: JSON with source, destination, incremental, and schedule configuration.

Example call:

pipeline_config(name="orders")

Example response:

{
  "name": "orders",
  "source": {
    "tap": "tap-csv",
    "config": {
      "csv_files_definition": {
        "orders": {"path": "/data/orders.csv", "delimiter": ","}
      }
    }
  },
  "destination": {
    "bucket": "s3://my-lake",
    "layer": "raw",
    "domain": "sales"
  },
  "incremental": {
    "enabled": true,
    "key": "updated_at"
  },
  "schedule": "0 2 * * *"
}

run_pipeline

Execute a pipeline extraction.

Parameters:

  • name (string, required) — pipeline name
  • full (boolean, optional) — force full extraction, ignoring incremental state. Default: false

Returns: JSON with success, streams (record counts per stream), and error.

Example call:

run_pipeline(name="orders")

Example response:

{
  "success": true,
  "streams": {
    "orders": 5200
  },
  "error": null
}

Full extraction example:

run_pipeline(name="orders", full=true)

Example response:

{
  "success": true,
  "streams": {
    "orders": 148320
  },
  "error": null
}

pipeline_status

Return status for all configured pipelines.

Parameters: None

Returns: JSON array with name, last_run, status, duration, records for each pipeline.

Example call:

pipeline_status()

Example response:

[
  {
    "name": "orders",
    "last_run": "2026-04-15T02:00:12",
    "status": "success",
    "duration": 3.2,
    "records": 5200
  },
  {
    "name": "customers",
    "last_run": "2026-04-15T01:30:45",
    "status": "success",
    "duration": 12.5,
    "records": 12400
  },
  {
    "name": "events",
    "last_run": null,
    "status": "no runs",
    "duration": null,
    "records": null
  }
]

pipeline_logs

Return the latest execution log for a pipeline.

Parameters: name (string, required) — pipeline name

Returns: JSON with full execution log, or “No logs found.”

Example call:

pipeline_logs(name="orders")

Example response:

{
  "pipeline": "orders",
  "started_at": "2026-04-15T02:00:12",
  "finished_at": "2026-04-15T02:00:15",
  "status": "success",
  "duration_seconds": 3.2,
  "total_records": 5200,
  "streams": {
    "orders": {
      "records": 5200,
      "bytes": 1048576
    }
  },
  "error": null
}

Error example:

{
  "pipeline": "customers",
  "started_at": "2026-04-15T01:30:00",
  "finished_at": "2026-04-15T01:30:02",
  "status": "failed",
  "duration_seconds": 2.1,
  "total_records": 0,
  "streams": {},
  "error": "ConnectionRefusedError: [Errno 111] Connection refused -- is PostgreSQL running on db.example.com:5432?"
}

show_manifest

Return the manifest (catalog) of a bucket.

Parameters: bucket (string, required) — bucket URI (e.g., s3://my-lake, file:///tmp/lake)

Returns: JSON with table catalog including schemas, timestamps, and row counts.

Example call:

show_manifest(bucket="s3://my-lake")

Example response:

{
  "bucket": "s3://my-lake",
  "tables": {
    "raw/sales/orders": {
      "columns": [
        {"name": "order_id", "type": "INTEGER"},
        {"name": "customer_id", "type": "INTEGER"},
        {"name": "order_date", "type": "DATE"},
        {"name": "total", "type": "DOUBLE"},
        {"name": "status", "type": "VARCHAR"}
      ],
      "total_records": 148320,
      "last_updated": "2026-04-15T02:00:15",
      "files": 12
    },
    "raw/sales/customers": {
      "columns": [
        {"name": "id", "type": "INTEGER"},
        {"name": "name", "type": "VARCHAR"},
        {"name": "email", "type": "VARCHAR"}
      ],
      "total_records": 12400,
      "last_updated": "2026-04-15T01:30:57",
      "files": 2
    }
  }
}

validate_pipeline

Validate bucket connectivity and tap availability for a pipeline.

Parameters: name (string, required) — pipeline name

Returns: JSON with pipeline, bucket_ok, tap_ok, and errors.

Example call:

validate_pipeline(name="orders")

Example response (healthy):

{
  "pipeline": "orders",
  "bucket_ok": true,
  "tap_ok": true,
  "errors": []
}

Example response (problems):

{
  "pipeline": "customers",
  "bucket_ok": true,
  "tap_ok": false,
  "errors": [
    "tap-postgres not found in PATH. Install with: pip install tap-postgres"
  ]
}

The server also exposes an MCP resource that returns a JSON array of all pipelines, each with its tap and bucket:

[
  {"name": "orders", "tap": "tap-csv", "bucket": "s3://my-lake"},
  {"name": "customers", "tap": "tap-postgres", "bucket": "s3://my-lake"},
  {"name": "events", "tap": "tap-kafka", "bucket": "s3://my-lake"}
]

Agents can read this resource to understand the full pipeline landscape before taking action.
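
If you are wiring up a custom MCP client rather than Claude Desktop or Cursor, the sketch below uses the official mcp Python package (pip install mcp) to launch the server over stdio, list its resources, and call a tool. This is a minimal sketch, not part of the Pipe SDK; the bucket value is the placeholder from the configuration above.

import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Launch the Pipe MCP server over stdio, the same way a desktop client would.
server = StdioServerParameters(
    command="dataspoc-pipe",
    args=["mcp"],
    env={"DATASPOC_BUCKET": "s3://my-data"},  # adjust to your bucket
)

async def main():
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Discover the pipelines resource exposed by the server.
            resources = await session.list_resources()
            print([r.uri for r in resources.resources])

            # Call a tool directly, e.g. check status before running anything.
            status = await session.call_tool("pipeline_status", arguments={})
            print(status.content)

asyncio.run(main())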

1. Check all pipelines and run any that haven’t run today

You: Check all pipelines and run any that haven’t run today.

Agent calls pipeline_status():

[
  {"name": "orders", "last_run": "2026-04-15T02:00:12", "status": "success", "duration": 3.2, "records": 5200},
  {"name": "customers", "last_run": "2026-04-14T01:30:45", "status": "success", "duration": 12.5, "records": 12400},
  {"name": "events", "last_run": null, "status": "no runs", "duration": null, "records": null}
]

I can see that orders ran today at 02:00, but customers last ran yesterday and events has never run. Let me run both.

Agent calls run_pipeline(name="customers"):

{"success": true, "streams": {"customers": 12480}, "error": null}

Agent calls run_pipeline(name="events"):

{"success": true, "streams": {"page_views": 84200, "clicks": 22100}, "error": null}

Done. Here is the summary:

  • orders: Already ran today (5,200 records)
  • customers: Just ran — 12,480 records extracted
  • events: Just ran — 106,300 records across 2 streams (page_views, clicks)

2. The sales pipeline failed — show me the error and help me fix it

You: The sales pipeline failed. Show me the error and help me fix it.

Agent calls pipeline_logs(name="sales"):

{
  "pipeline": "sales",
  "status": "failed",
  "error": "ConnectionRefusedError: [Errno 111] Connection refused -- is PostgreSQL running on db.internal.com:5432?",
  "started_at": "2026-04-15T02:00:00",
  "finished_at": "2026-04-15T02:00:01",
  "total_records": 0
}

The sales pipeline failed because it could not connect to PostgreSQL at db.internal.com:5432. Let me check the configuration.

Agent calls pipeline_config(name="sales"):

{
  "name": "sales",
  "source": {
    "tap": "tap-postgres",
    "config": {"host": "db.internal.com", "port": 5432, "dbname": "sales_prod"}
  },
  "destination": {"bucket": "s3://my-lake", "layer": "raw", "domain": "sales"}
}

The pipeline is configured to connect to db.internal.com:5432. The error means either:

  1. The PostgreSQL server is down
  2. A firewall is blocking port 5432
  3. The hostname is wrong

Check that the database is running (pg_isready -h db.internal.com -p 5432), then let me validate the full pipeline.

Agent calls validate_pipeline(name="sales"):

{"pipeline": "sales", "bucket_ok": true, "tap_ok": true, "errors": []}

The bucket and tap are fine — the issue is the database server itself. Once PostgreSQL is back online, say “run the sales pipeline” and I will re-trigger it.

3. Set up a new pipeline for our PostgreSQL database

You: Set up a new pipeline for our PostgreSQL database at db.example.com. We want to ingest the public.orders and public.customers tables.

Agent: To set up a new pipeline for your PostgreSQL database, create a config file at ~/.dataspoc-pipe/pipelines/postgres_main.yaml:

name: postgres_main
source:
  tap: tap-postgres
  config:
    host: db.example.com
    port: 5432
    user: ${POSTGRES_USER}
    password: ${POSTGRES_PASSWORD}
    dbname: mydb
    filter_schemas: ["public"]
    filter_tables: ["orders", "customers"]
destination:
  bucket: s3://my-lake
  layer: raw
  domain: postgres_main
incremental:
  enabled: true
  key: updated_at

Make sure the tap is installed:

pip install tap-postgres

Set the environment variables:

export POSTGRES_USER=your_user
export POSTGRES_PASSWORD=your_password

Let me validate the new pipeline.

Agent calls validate_pipeline(name="postgres_main"):

{"pipeline": "postgres_main", "bucket_ok": true, "tap_ok": true, "errors": []}

The pipeline is valid. Want me to run it now?

You: Yes, run it.

Agent calls run_pipeline(name="postgres_main"):

{"success": true, "streams": {"orders": 34500, "customers": 8200}, "error": null}

Pipeline postgres_main completed successfully:

  • orders: 34,500 records
  • customers: 8,200 records

The data is now in s3://my-lake/raw/postgres_main/. You can query it with DataSpoc Lens.

Use PipeClient to build custom agents that manage pipelines programmatically.

from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

# List pipeline names
names = client.pipelines()
print(names)
# ['orders', 'customers', 'events']

# Inspect a pipeline's configuration
cfg = client.config("orders")
print(cfg["source"]["tap"])          # 'tap-csv'
print(cfg["destination"]["bucket"])  # 's3://my-lake'

# Run a pipeline and check the result
result = client.run("orders")
if result["success"]:
    total = sum(result["streams"].values())
    print(f"Extracted {total} records")
else:
    print(f"Failed: {result['error']}")

# Status of all pipelines
for p in client.status():
    print(f"{p['name']}: {p['status']} -- {p['records']} records")

# Latest execution log
log = client.logs("orders")
if log:
    print(f"Status: {log['status']}, Duration: {log['duration_seconds']}s")

# Bucket manifest
manifest = client.manifest("s3://my-lake")
for table, info in manifest.get("tables", {}).items():
    print(f"{table}: {info.get('total_records', 0)} records")

# Validate bucket and tap connectivity
result = client.validate("orders")
if result["bucket_ok"] and result["tap_ok"]:
    print("Pipeline is ready")
else:
    print(f"Issues: {result['errors']}")

A complete automation example that validates and runs every pipeline that has not yet run today:

from datetime import datetime
from dataspoc_pipe.sdk import PipeClient

client = PipeClient()
today = datetime.now().date()

for p in client.status():
    # Skip pipelines that already ran today
    if p["last_run"]:
        last = datetime.fromisoformat(p["last_run"]).date()
        if last >= today:
            print(f"SKIP {p['name']}: already ran today")
            continue

    # Validate before running
    check = client.validate(p["name"])
    if not check["bucket_ok"] or not check["tap_ok"]:
        print(f"SKIP {p['name']}: {check['errors']}")
        continue

    # Run
    result = client.run(p["name"])
    if result["success"]:
        total = sum(result["streams"].values())
        print(f"OK {p['name']}: {total} records")
    else:
        print(f"FAIL {p['name']}: {result['error']}")

Build a CrewAI agent that monitors pipelines and reacts to failures.

from crewai import Agent, Task, Crew
from crewai.tools import tool
from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

@tool
def list_pipelines() -> str:
    """List all configured pipeline names."""
    return "\n".join(client.pipelines())

@tool
def check_pipeline_status() -> str:
    """Return status of all pipelines as formatted text."""
    lines = []
    for p in client.status():
        lines.append(
            f"{p['name']}: {p['status']} | "
            f"last_run={p['last_run']} | "
            f"records={p['records']}"
        )
    return "\n".join(lines)

@tool
def run_pipeline(name: str) -> str:
    """Run a pipeline by name and return the result."""
    result = client.run(name)
    if result["success"]:
        total = sum(result["streams"].values())
        return f"Success: {total} records extracted"
    return f"Failed: {result['error']}"

@tool
def get_pipeline_logs(name: str) -> str:
    """Get the latest execution log for a pipeline."""
    log = client.logs(name)
    if not log:
        return "No logs found"
    return (
        f"Status: {log['status']}\n"
        f"Duration: {log['duration_seconds']}s\n"
        f"Records: {log['total_records']}\n"
        f"Error: {log.get('error', 'None')}"
    )

pipeline_monitor = Agent(
    role="Pipeline Monitor",
    goal="Monitor data pipelines and ensure they are running correctly",
    backstory=(
        "You are a data operations agent responsible for monitoring "
        "DataSpoc Pipe pipelines. You check status, identify failures, "
        "and re-run pipelines when needed."
    ),
    tools=[list_pipelines, check_pipeline_status, run_pipeline, get_pipeline_logs],
    verbose=True,
)

daily_check = Task(
    description=(
        "Check the status of all pipelines. "
        "If any pipeline failed or hasn't run today, investigate the logs "
        "and re-run it. Report a summary of all pipeline statuses."
    ),
    expected_output="A summary of all pipeline statuses and actions taken.",
    agent=pipeline_monitor,
)

crew = Crew(agents=[pipeline_monitor], tasks=[daily_check], verbose=True)
result = crew.kickoff()
print(result)

Build a LangGraph workflow that checks and runs pipelines.

from typing import TypedDict
from langgraph.graph import StateGraph, END
from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

class PipelineState(TypedDict):
    pipelines: list[dict]
    stale: list[str]
    results: list[dict]
    summary: str

def check_status(state: PipelineState) -> PipelineState:
    """Check all pipeline statuses."""
    from datetime import datetime
    today = datetime.now().date()
    statuses = client.status()
    stale = []
    for p in statuses:
        if not p["last_run"]:
            stale.append(p["name"])
        else:
            last = datetime.fromisoformat(p["last_run"]).date()
            if last < today:
                stale.append(p["name"])
    return {**state, "pipelines": statuses, "stale": stale, "results": []}

def run_stale(state: PipelineState) -> PipelineState:
    """Run all stale pipelines."""
    results = []
    for name in state["stale"]:
        result = client.run(name)
        results.append({"name": name, **result})
    return {**state, "results": results}

def summarize(state: PipelineState) -> PipelineState:
    """Generate a summary."""
    lines = []
    for r in state["results"]:
        if r["success"]:
            total = sum(r["streams"].values())
            lines.append(f"{r['name']}: OK ({total} records)")
        else:
            lines.append(f"{r['name']}: FAILED ({r['error']})")
    summary = "\n".join(lines) if lines else "All pipelines are up to date."
    return {**state, "summary": summary}

def should_run(state: PipelineState) -> str:
    return "run_stale" if state["stale"] else "summarize"

graph = StateGraph(PipelineState)
graph.add_node("check_status", check_status)
graph.add_node("run_stale", run_stale)
graph.add_node("summarize", summarize)
graph.set_entry_point("check_status")
graph.add_conditional_edges("check_status", should_run)
graph.add_edge("run_stale", "summarize")
graph.add_edge("summarize", END)

app = graph.compile()
result = app.invoke({"pipelines": [], "stale": [], "results": [], "summary": ""})
print(result["summary"])

All Pipe CLI commands support --output json for machine-readable output. This is useful for shell-based agents or scripts that parse results with jq.

List pipelines:

dataspoc-pipe list --output json
["orders", "customers", "events"]

Check whether a run succeeded and how many records it extracted:

dataspoc-pipe run orders --output json | jq '.success'
true

dataspoc-pipe run orders --output json | jq '.streams'
{"orders": 5200}

Summarize the status of every pipeline:

dataspoc-pipe status --output json | jq '.[] | {name, status, records}'
{"name": "orders", "status": "success", "records": 5200}
{"name": "customers", "status": "success", "records": 12400}
{"name": "events", "status": "no runs", "records": null}

Select only failed pipelines:

dataspoc-pipe status --output json | jq '[.[] | select(.status == "failed")]'

Show the latest error for a pipeline:

dataspoc-pipe logs customers --output json | jq '.error'

A complete shell script that runs every pipeline that has not run today:

#!/bin/bash
# Run every pipeline whose last run is missing or older than today.
TODAY=$(date +%Y-%m-%d)

dataspoc-pipe status --output json \
  | jq -r '.[] | select(.last_run == null or (.last_run | split("T")[0]) < "'"$TODAY"'") | .name' \
  | while read -r pipeline; do
      echo "Running: $pipeline"
      result=$(dataspoc-pipe run "$pipeline" --output json)
      success=$(echo "$result" | jq '.success')
      if [ "$success" = "true" ]; then
        records=$(echo "$result" | jq '[.streams[]] | add')
        echo "  OK: $records records"
      else
        error=$(echo "$result" | jq -r '.error')
        echo "  FAILED: $error"
      fi
    done

Always check the success field before acting on pipeline results. Agents should report errors clearly and suggest remediation steps rather than silently retrying.

result = client.run("orders")
if not result["success"]:
# Log the error, notify, or investigate -- don't just retry blindly
log = client.logs("orders")
print(f"Pipeline failed: {result['error']}")
print(f"Full log: {log}")

Pipeline execution time depends on the data source. Set appropriate timeouts in your agent framework. For MCP clients, the default timeout is usually sufficient, but long-running extractions (full re-extractions of large databases) may need extended timeouts.
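
If your agent calls the SDK directly, one way to bound a long run is a thread-based timeout. This is a minimal sketch using only the standard library; the 30-minute limit is an arbitrary placeholder, not a Pipe default.

from concurrent.futures import ThreadPoolExecutor, TimeoutError
from dataspoc_pipe.sdk import PipeClient

client = PipeClient()
pool = ThreadPoolExecutor(max_workers=1)

# Bound a potentially long extraction; 1800 s is an arbitrary example value.
future = pool.submit(client.run, "orders")
try:
    result = future.result(timeout=1800)
except TimeoutError:
    # The extraction keeps running in the worker thread; this only stops
    # the agent from blocking on it indefinitely.
    result = {"success": False, "streams": {}, "error": "timed out after 1800s"}

pool.shutdown(wait=False)
print(result)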

Incremental pipelines are safe to run multiple times — they only extract new or updated records. Full extractions (full=True) re-extract everything, so use them intentionally. Agents should prefer incremental runs unless explicitly asked for a full extraction.
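
With the SDK, that preference looks like the sketch below, assuming PipeClient.run accepts the same optional full flag as the run_pipeline MCP tool:

from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

# Default: incremental run -- safe to repeat, extracts only new or updated records.
client.run("orders")

# Full re-extraction -- only when explicitly requested.
# Assumes PipeClient.run mirrors the MCP tool's optional `full` parameter.
client.run("orders", full=True)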

Always call validate_pipeline before run_pipeline when automating pipelines. This catches configuration issues (missing taps, unreachable buckets) before wasting time on a doomed extraction.
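
A minimal guard using the SDK calls shown earlier:

from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

# Fail fast on configuration problems instead of starting a doomed extraction.
check = client.validate("orders")
if check["bucket_ok"] and check["tap_ok"]:
    result = client.run("orders")
else:
    print(f"Not running orders: {check['errors']}")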

If your agent manages many pipelines, avoid running all of them simultaneously. Run them sequentially or in small batches to avoid overwhelming source databases or exceeding cloud API rate limits.
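
One simple pattern is to run pipelines one at a time with a short pause between them; the 30-second pause below is an arbitrary example, not a Pipe recommendation.

import time
from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

# Run sequentially so source databases and cloud APIs never see
# several extractions at once.
for name in client.pipelines():
    result = client.run(name)
    status = "OK" if result["success"] else f"FAILED: {result['error']}"
    print(f"{name}: {status}")
    time.sleep(30)  # arbitrary pause between runs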

Pipeline configurations may contain connection strings or references to environment variables. Instruct your agent not to log or display sensitive fields from pipeline_config results.
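
If your agent does display configurations, one option is to redact likely-sensitive keys first. This is a sketch only, and the key list is illustrative rather than exhaustive.

from dataspoc_pipe.sdk import PipeClient

SENSITIVE_KEYS = {"password", "secret", "token", "api_key"}  # illustrative, not exhaustive

def redact(value):
    """Recursively mask values whose keys look sensitive before display."""
    if isinstance(value, dict):
        return {
            k: "***" if k.lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(v) for v in value]
    return value

client = PipeClient()
print(redact(client.config("orders")))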