# Agent Integration
AI agents can manage your data pipelines autonomously through DataSpoc Pipe. Connect any MCP-compatible client to list, run, monitor, and troubleshoot pipelines without leaving the conversation. This guide covers every integration path: MCP, Python SDK, and framework-specific examples.
## MCP Server Quick Setup

### Install

```bash
pip install dataspoc-pipe[mcp]
```

### Start the server

```bash
dataspoc-pipe mcp
```

The server uses stdio transport and is designed to be launched by an MCP client, not run manually.
### Claude Desktop

Add to `claude_desktop_config.json`:

- macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
- Linux: `~/.config/Claude/claude_desktop_config.json`

```json
{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"],
      "env": { "DATASPOC_BUCKET": "s3://my-data" }
    }
  }
}
```

If installed in a virtual environment, use the full path:

```json
{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "/home/you/.venv/bin/dataspoc-pipe",
      "args": ["mcp"]
    }
  }
}
```

Restart Claude Desktop after saving.
### Cursor

Add to your Cursor MCP settings (Settings > MCP Servers):

```json
{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"]
    }
  }
}
```

### Claude Code

Add to `~/.claude.json`:

```json
{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"]
    }
  }
}
```

Or add it per-project in `.claude/settings.json`:

```json
{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"]
    }
  }
}
```

## MCP Tools Reference

The Pipe MCP server exposes 7 tools:
| Tool | Description | Parameters |
|---|---|---|
| `list_pipelines` | List all configured pipeline names | None |
| `pipeline_config` | Return full pipeline configuration as JSON | `name` (string) |
| `run_pipeline` | Execute an extraction pipeline | `name` (string), `full` (boolean, optional) |
| `pipeline_status` | Return status for all pipelines | None |
| `pipeline_logs` | Return latest execution log | `name` (string) |
| `show_manifest` | Return bucket catalog | `bucket` (string) |
| `validate_pipeline` | Check bucket and tap connectivity | `name` (string) |
### list_pipelines

List all configured pipeline names.

Parameters: None

Returns: One pipeline name per line, or “No pipelines configured.”

Example call:

```
list_pipelines()
```

Example response:

```
orders
customers
events
```

### pipeline_config
Return the full configuration of a pipeline.

Parameters: `name` (string, required) — pipeline name

Returns: JSON with source, destination, incremental, and schedule configuration.

Example call:

```
pipeline_config(name="orders")
```

Example response:

```json
{
  "name": "orders",
  "source": {
    "tap": "tap-csv",
    "config": {
      "csv_files_definition": {
        "orders": {"path": "/data/orders.csv", "delimiter": ","}
      }
    }
  },
  "destination": {
    "bucket": "s3://my-lake",
    "layer": "raw",
    "domain": "sales"
  },
  "incremental": {
    "enabled": true,
    "key": "updated_at"
  },
  "schedule": "0 2 * * *"
}
```

### run_pipeline
Execute a pipeline extraction.

Parameters:

- `name` (string, required) — pipeline name
- `full` (boolean, optional) — force full extraction, ignoring incremental state. Default: `false`

Returns: JSON with `success`, `streams` (record counts per stream), and `error`.

Example call:

```
run_pipeline(name="orders")
```

Example response:

```json
{
  "success": true,
  "streams": { "orders": 5200 },
  "error": null
}
```

Full extraction example:

```
run_pipeline(name="orders", full=true)
```

Example response:

```json
{
  "success": true,
  "streams": { "orders": 148320 },
  "error": null
}
```

### pipeline_status
Return status for all configured pipelines.

Parameters: None

Returns: JSON array with `name`, `last_run`, `status`, `duration`, and `records` for each pipeline.

Example call:

```
pipeline_status()
```

Example response:

```json
[
  {
    "name": "orders",
    "last_run": "2026-04-15T02:00:12",
    "status": "success",
    "duration": 3.2,
    "records": 5200
  },
  {
    "name": "customers",
    "last_run": "2026-04-15T01:30:45",
    "status": "success",
    "duration": 12.5,
    "records": 12400
  },
  {
    "name": "events",
    "last_run": null,
    "status": "no runs",
    "duration": null,
    "records": null
  }
]
```

### pipeline_logs
Return the latest execution log for a pipeline.

Parameters: `name` (string, required) — pipeline name

Returns: JSON with the full execution log, or “No logs found.”

Example call:

```
pipeline_logs(name="orders")
```

Example response:

```json
{
  "pipeline": "orders",
  "started_at": "2026-04-15T02:00:12",
  "finished_at": "2026-04-15T02:00:15",
  "status": "success",
  "duration_seconds": 3.2,
  "total_records": 5200,
  "streams": {
    "orders": { "records": 5200, "bytes": 1048576 }
  },
  "error": null
}
```

Error example:

```json
{
  "pipeline": "customers",
  "started_at": "2026-04-15T01:30:00",
  "finished_at": "2026-04-15T01:30:02",
  "status": "failed",
  "duration_seconds": 2.1,
  "total_records": 0,
  "streams": {},
  "error": "ConnectionRefusedError: [Errno 111] Connection refused -- is PostgreSQL running on db.example.com:5432?"
}
```

### show_manifest
Return the manifest (catalog) of a bucket.

Parameters: `bucket` (string, required) — bucket URI (e.g., `s3://my-lake`, `file:///tmp/lake`)

Returns: JSON with the table catalog, including schemas, timestamps, and row counts.

Example call:

```
show_manifest(bucket="s3://my-lake")
```

Example response:

```json
{
  "bucket": "s3://my-lake",
  "tables": {
    "raw/sales/orders": {
      "columns": [
        {"name": "order_id", "type": "INTEGER"},
        {"name": "customer_id", "type": "INTEGER"},
        {"name": "order_date", "type": "DATE"},
        {"name": "total", "type": "DOUBLE"},
        {"name": "status", "type": "VARCHAR"}
      ],
      "total_records": 148320,
      "last_updated": "2026-04-15T02:00:15",
      "files": 12
    },
    "raw/sales/customers": {
      "columns": [
        {"name": "id", "type": "INTEGER"},
        {"name": "name", "type": "VARCHAR"},
        {"name": "email", "type": "VARCHAR"}
      ],
      "total_records": 12400,
      "last_updated": "2026-04-15T01:30:57",
      "files": 2
    }
  }
}
```

### validate_pipeline
Validate bucket connectivity and tap availability for a pipeline.

Parameters: `name` (string, required) — pipeline name

Returns: JSON with `pipeline`, `bucket_ok`, `tap_ok`, and `errors`.

Example call:

```
validate_pipeline(name="orders")
```

Example response (healthy):

```json
{
  "pipeline": "orders",
  "bucket_ok": true,
  "tap_ok": true,
  "errors": []
}
```

Example response (problems):

```json
{
  "pipeline": "customers",
  "bucket_ok": true,
  "tap_ok": false,
  "errors": [
    "tap-postgres not found in PATH. Install with: pip install tap-postgres"
  ]
}
```

## MCP Resource
Section titled “MCP Resource”pipe://pipelines
Section titled “pipe://pipelines”An MCP resource that returns a JSON array of all pipelines with their tap and bucket:
[ {"name": "orders", "tap": "tap-csv", "bucket": "s3://my-lake"}, {"name": "customers", "tap": "tap-postgres", "bucket": "s3://my-lake"}, {"name": "events", "tap": "tap-kafka", "bucket": "s3://my-lake"}]Agents can read this resource to understand the full pipeline landscape before taking action.
## Example Agent Conversations

### 1. Check all pipelines and run any that haven’t run today

You: Check all pipelines and run any that haven’t run today.

Agent calls `pipeline_status()`:

```json
[
  {"name": "orders", "last_run": "2026-04-15T02:00:12", "status": "success", "duration": 3.2, "records": 5200},
  {"name": "customers", "last_run": "2026-04-14T01:30:45", "status": "success", "duration": 12.5, "records": 12400},
  {"name": "events", "last_run": null, "status": "no runs", "duration": null, "records": null}
]
```

I can see that orders ran today at 02:00, but customers last ran yesterday and events has never run. Let me run both.

Agent calls `run_pipeline(name="customers")`:

```json
{"success": true, "streams": {"customers": 12480}, "error": null}
```

Agent calls `run_pipeline(name="events")`:

```json
{"success": true, "streams": {"page_views": 84200, "clicks": 22100}, "error": null}
```

Done. Here is the summary:

- orders: Already ran today (5,200 records)
- customers: Just ran — 12,480 records extracted
- events: Just ran — 106,300 records across 2 streams (page_views, clicks)
### 2. The sales pipeline failed — show me the error and help me fix it

You: The sales pipeline failed. Show me the error and help me fix it.

Agent calls `pipeline_logs(name="sales")`:

```json
{
  "pipeline": "sales",
  "status": "failed",
  "error": "ConnectionRefusedError: [Errno 111] Connection refused -- is PostgreSQL running on db.internal.com:5432?",
  "started_at": "2026-04-15T02:00:00",
  "finished_at": "2026-04-15T02:00:01",
  "total_records": 0
}
```

The sales pipeline failed because it could not connect to PostgreSQL at db.internal.com:5432. Let me check the configuration.

Agent calls `pipeline_config(name="sales")`:

```json
{
  "name": "sales",
  "source": {
    "tap": "tap-postgres",
    "config": {"host": "db.internal.com", "port": 5432, "dbname": "sales_prod"}
  },
  "destination": {"bucket": "s3://my-lake", "layer": "raw", "domain": "sales"}
}
```

The pipeline is configured to connect to db.internal.com:5432. The error means either:

- The PostgreSQL server is down
- A firewall is blocking port 5432
- The hostname is wrong

Check that the database is running (`pg_isready -h db.internal.com -p 5432`), then let me validate the full pipeline.

Agent calls `validate_pipeline(name="sales")`:

```json
{"pipeline": "sales", "bucket_ok": true, "tap_ok": true, "errors": []}
```

The bucket and tap are fine — the issue is the database server itself. Once PostgreSQL is back online, say “run the sales pipeline” and I will re-trigger it.
### 3. Set up a new pipeline for our PostgreSQL database

You: Set up a new pipeline for our PostgreSQL database at db.example.com. We want to ingest the `public.orders` and `public.customers` tables.

Agent: To set up a new pipeline for your PostgreSQL database, create a config file at `~/.dataspoc-pipe/pipelines/postgres_main.yaml`:

```yaml
name: postgres_main
source:
  tap: tap-postgres
  config:
    host: db.example.com
    port: 5432
    user: ${POSTGRES_USER}
    password: ${POSTGRES_PASSWORD}
    dbname: mydb
    filter_schemas: ["public"]
    filter_tables: ["orders", "customers"]
destination:
  bucket: s3://my-lake
  layer: raw
  domain: postgres_main
incremental:
  enabled: true
  key: updated_at
```

Make sure the tap is installed:

```bash
pip install tap-postgres
```

Set the environment variables:

```bash
export POSTGRES_USER=your_user
export POSTGRES_PASSWORD=your_password
```

Let me validate the new pipeline.

Agent calls `validate_pipeline(name="postgres_main")`:

```json
{"pipeline": "postgres_main", "bucket_ok": true, "tap_ok": true, "errors": []}
```

The pipeline is valid. Want me to run it now?

You: Yes, run it.

Agent calls `run_pipeline(name="postgres_main")`:

```json
{"success": true, "streams": {"orders": 34500, "customers": 8200}, "error": null}
```

Pipeline postgres_main completed successfully:

- orders: 34,500 records
- customers: 8,200 records

The data is now in `s3://my-lake/raw/postgres_main/`. You can query it with DataSpoc Lens.
## Python SDK for Agents

Use `PipeClient` to build custom agents that manage pipelines programmatically.

```python
from dataspoc_pipe.sdk import PipeClient

client = PipeClient()
```

### List pipelines
```python
names = client.pipelines()
print(names)
# ['orders', 'customers', 'events']
```

### Get pipeline configuration

```python
cfg = client.config("orders")
print(cfg["source"]["tap"])          # 'tap-csv'
print(cfg["destination"]["bucket"])  # 's3://my-lake'
```

### Run a pipeline

```python
result = client.run("orders")
if result["success"]:
    total = sum(result["streams"].values())
    print(f"Extracted {total} records")
else:
    print(f"Failed: {result['error']}")
```

### Check pipeline status

```python
for p in client.status():
    print(f"{p['name']}: {p['status']} -- {p['records']} records")
```

### Get execution logs

```python
log = client.logs("orders")
if log:
    print(f"Status: {log['status']}, Duration: {log['duration_seconds']}s")
```

### Read bucket manifest

```python
manifest = client.manifest("s3://my-lake")
for table, info in manifest.get("tables", {}).items():
    print(f"{table}: {info.get('total_records', 0)} records")
```

### Validate a pipeline

```python
result = client.validate("orders")
if result["bucket_ok"] and result["tap_ok"]:
    print("Pipeline is ready")
else:
    print(f"Issues: {result['errors']}")
```

### Full example: run all stale pipelines
Section titled “Full example: run all stale pipelines”from dataspoc_pipe.sdk import PipeClientfrom datetime import datetime, timedelta
client = PipeClient()today = datetime.now().date()
for p in client.status(): # Skip pipelines that already ran today if p["last_run"]: last = datetime.fromisoformat(p["last_run"]).date() if last >= today: print(f"SKIP {p['name']}: already ran today") continue
# Validate before running check = client.validate(p["name"]) if not check["bucket_ok"] or not check["tap_ok"]: print(f"SKIP {p['name']}: {check['errors']}") continue
# Run result = client.run(p["name"]) if result["success"]: total = sum(result["streams"].values()) print(f"OK {p['name']}: {total} records") else: print(f"FAIL {p['name']}: {result['error']}")CrewAI Integration Example
Build a CrewAI agent that monitors pipelines and reacts to failures.

```python
from crewai import Agent, Task, Crew
from crewai.tools import tool
from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

@tool
def list_pipelines() -> str:
    """List all configured pipeline names."""
    return "\n".join(client.pipelines())

@tool
def check_pipeline_status() -> str:
    """Return status of all pipelines as formatted text."""
    lines = []
    for p in client.status():
        lines.append(
            f"{p['name']}: {p['status']} | "
            f"last_run={p['last_run']} | "
            f"records={p['records']}"
        )
    return "\n".join(lines)

@tool
def run_pipeline(name: str) -> str:
    """Run a pipeline by name and return the result."""
    result = client.run(name)
    if result["success"]:
        total = sum(result["streams"].values())
        return f"Success: {total} records extracted"
    return f"Failed: {result['error']}"

@tool
def get_pipeline_logs(name: str) -> str:
    """Get the latest execution log for a pipeline."""
    log = client.logs(name)
    if not log:
        return "No logs found"
    return (
        f"Status: {log['status']}\n"
        f"Duration: {log['duration_seconds']}s\n"
        f"Records: {log['total_records']}\n"
        f"Error: {log.get('error', 'None')}"
    )

pipeline_monitor = Agent(
    role="Pipeline Monitor",
    goal="Monitor data pipelines and ensure they are running correctly",
    backstory=(
        "You are a data operations agent responsible for monitoring "
        "DataSpoc Pipe pipelines. You check status, identify failures, "
        "and re-run pipelines when needed."
    ),
    tools=[list_pipelines, check_pipeline_status, run_pipeline, get_pipeline_logs],
    verbose=True,
)

daily_check = Task(
    description=(
        "Check the status of all pipelines. "
        "If any pipeline failed or hasn't run today, investigate the logs "
        "and re-run it. Report a summary of all pipeline statuses."
    ),
    expected_output="A summary of all pipeline statuses and actions taken.",
    agent=pipeline_monitor,
)

crew = Crew(agents=[pipeline_monitor], tasks=[daily_check], verbose=True)
result = crew.kickoff()
print(result)
```

## LangGraph Integration Example
Build a LangGraph workflow that checks and runs pipelines.

```python
from datetime import datetime
from typing import TypedDict

from langgraph.graph import StateGraph, END
from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

class PipelineState(TypedDict):
    pipelines: list[dict]
    stale: list[str]
    results: list[dict]
    summary: str

def check_status(state: PipelineState) -> PipelineState:
    """Check all pipeline statuses."""
    today = datetime.now().date()

    statuses = client.status()
    stale = []
    for p in statuses:
        if not p["last_run"]:
            stale.append(p["name"])
        else:
            last = datetime.fromisoformat(p["last_run"]).date()
            if last < today:
                stale.append(p["name"])

    return {**state, "pipelines": statuses, "stale": stale, "results": []}

def run_stale(state: PipelineState) -> PipelineState:
    """Run all stale pipelines."""
    results = []
    for name in state["stale"]:
        result = client.run(name)
        results.append({"name": name, **result})
    return {**state, "results": results}

def summarize(state: PipelineState) -> PipelineState:
    """Generate a summary."""
    lines = []
    for r in state["results"]:
        if r["success"]:
            total = sum(r["streams"].values())
            lines.append(f"{r['name']}: OK ({total} records)")
        else:
            lines.append(f"{r['name']}: FAILED ({r['error']})")
    summary = "\n".join(lines) if lines else "All pipelines are up to date."
    return {**state, "summary": summary}

def should_run(state: PipelineState) -> str:
    return "run_stale" if state["stale"] else "summarize"

graph = StateGraph(PipelineState)
graph.add_node("check_status", check_status)
graph.add_node("run_stale", run_stale)
graph.add_node("summarize", summarize)

graph.set_entry_point("check_status")
graph.add_conditional_edges("check_status", should_run)
graph.add_edge("run_stale", "summarize")
graph.add_edge("summarize", END)

app = graph.compile()
result = app.invoke({"pipelines": [], "stale": [], "results": [], "summary": ""})
print(result["summary"])
```

## JSON Output for Shell Agents
All Pipe CLI commands support `--output json` for machine-readable output. This is useful for shell-based agents or scripts that parse results with `jq`.

### List pipelines

```bash
dataspoc-pipe list --output json
```

```
["orders", "customers", "events"]
```

### Run a pipeline and check the result

```bash
dataspoc-pipe run orders --output json | jq '.success'
```

```
true
```

### Get record counts per stream

```bash
dataspoc-pipe run orders --output json | jq '.streams'
```

```
{"orders": 5200}
```

### Check status of all pipelines

```bash
dataspoc-pipe status --output json | jq '.[] | {name, status, records}'
```

```
{"name": "orders", "status": "success", "records": 5200}
{"name": "customers", "status": "success", "records": 12400}
{"name": "events", "status": "no runs", "records": null}
```

### Find failed pipelines

```bash
dataspoc-pipe status --output json | jq '[.[] | select(.status == "failed")]'
```

### Get the error from a failed pipeline

```bash
dataspoc-pipe logs customers --output json | jq '.error'
```

### Shell script: run all stale pipelines

```bash
#!/bin/bash
TODAY=$(date +%Y-%m-%d)

dataspoc-pipe status --output json |
  jq -r '.[] | select(.last_run == null or (.last_run | split("T")[0]) < "'"$TODAY"'") | .name' |
  while read -r pipeline; do
    echo "Running: $pipeline"
    result=$(dataspoc-pipe run "$pipeline" --output json)
    success=$(echo "$result" | jq '.success')
    if [ "$success" = "true" ]; then
      records=$(echo "$result" | jq '[.streams[]] | add')
      echo "  OK: $records records"
    else
      error=$(echo "$result" | jq -r '.error')
      echo "  FAILED: $error"
    fi
  done
```

## Best Practices
### Error handling

Always check the `success` field before acting on pipeline results. Agents should report errors clearly and suggest remediation steps rather than silently retrying.

```python
result = client.run("orders")
if not result["success"]:
    # Log the error, notify, or investigate -- don't just retry blindly
    log = client.logs("orders")
    print(f"Pipeline failed: {result['error']}")
    print(f"Full log: {log}")
```

### Timeouts
Pipeline execution time depends on the data source. Set appropriate timeouts in your agent framework. For MCP clients, the default timeout is usually sufficient, but long-running extractions (full re-extractions of large databases) may need extended timeouts.
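As one sketch of a timeout guard around the SDK, you can run the extraction in a worker thread and bound how long the agent waits. The one-hour budget below is an arbitrary assumption; note that the extraction itself keeps running if the wait expires, so this only caps the agent's blocking time.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

# Assumed budget -- tune per pipeline; full re-extractions may need far more
TIMEOUT_SECONDS = 3600

pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(client.run, "orders")
try:
    result = future.result(timeout=TIMEOUT_SECONDS)
    print("success" if result["success"] else f"failed: {result['error']}")
except TimeoutError:
    # Stop waiting; the extraction keeps running in its worker thread
    print(f"orders did not finish within {TIMEOUT_SECONDS}s")
finally:
    pool.shutdown(wait=False)
```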
### Idempotency

Incremental pipelines are safe to run multiple times — they only extract new or updated records. Full extractions (`full=True`) re-extract everything, so use them intentionally. Agents should prefer incremental runs unless explicitly asked for a full extraction.
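Assuming `PipeClient.run` accepts the same `full` flag the MCP tool exposes, the distinction looks like this:

```python
# Incremental run (default): picks up only records changed since the last state
client.run("orders")

# Full run: ignores incremental state and re-extracts everything -- be deliberate
# (assumes run() mirrors the MCP tool's optional `full` parameter)
client.run("orders", full=True)
```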
### Validate before running

Always call `validate_pipeline` before `run_pipeline` when automating pipelines. This catches configuration issues (missing taps, unreachable buckets) before wasting time on a doomed extraction.
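A minimal validate-then-run guard with the SDK might look like this, reusing the documented `validate` and `run` calls; the `safe_run` helper is illustrative, not part of the SDK:

```python
from dataspoc_pipe.sdk import PipeClient

client = PipeClient()

def safe_run(name: str) -> dict | None:
    """Run a pipeline only if its bucket and tap check out."""
    check = client.validate(name)
    if not (check["bucket_ok"] and check["tap_ok"]):
        print(f"Refusing to run {name}: {check['errors']}")
        return None
    return client.run(name)

result = safe_run("orders")
```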
### Rate limiting

If your agent manages many pipelines, avoid running all of them simultaneously. Run them sequentially or in small batches to avoid overwhelming source databases or exceeding cloud API rate limits.
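For example, a sequential runner that pauses between small batches; the batch size and pause below are assumptions to tune against your sources and rate limits:

```python
import time

from dataspoc_pipe.sdk import PipeClient

client = PipeClient()
BATCH_SIZE = 3       # assumed: at most 3 pipelines back to back
PAUSE_SECONDS = 60   # assumed: breathing room between batches

names = client.pipelines()
for i in range(0, len(names), BATCH_SIZE):
    for name in names[i:i + BATCH_SIZE]:
        result = client.run(name)
        print(f"{name}: {'ok' if result['success'] else result['error']}")
    if i + BATCH_SIZE < len(names):
        time.sleep(PAUSE_SECONDS)
```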
### Sensitive information

Pipeline configurations may contain connection strings or references to environment variables. Instruct your agent not to log or display sensitive fields from `pipeline_config` results.
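When your own code handles configs, a small redaction pass before logging is a cheap safeguard. This sketch masks values by key name; the key list is an assumption to extend for whatever your tap configs actually contain:

```python
import json

from dataspoc_pipe.sdk import PipeClient

# Assumed key list -- extend to match your tap configs
SENSITIVE_KEYS = {"password", "user", "token", "secret", "api_key"}

def redact(value):
    """Recursively mask values whose keys look sensitive."""
    if isinstance(value, dict):
        return {
            k: "***" if k.lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(v) for v in value]
    return value

client = PipeClient()
print(json.dumps(redact(client.config("orders")), indent=2))
```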