Stop building pipelines. Start running them.
Airflow, Meltano, custom scripts — data ingestion shouldn't take weeks. Pipe connects any of 400+ sources to Parquet in your bucket with one command. No DAGs. No infrastructure. No PhD in YAML.
$ pip install dataspoc-pipe
$ dataspoc-pipe add my-postgres
$ dataspoc-pipe run my-postgres
Your DE Agent is ready.
Pipe ships with AGENT.md — a skill file that teaches any AI agent how to manage your pipelines. Connect it via MCP or the Python SDK and your agent can add sources, run extractions, monitor status, read logs, and fix failures. Autonomously.
No prompt engineering. No custom wrappers. The agent reads the skill file, discovers what Pipe can do, and starts working. It's like hiring a data engineer who's already read the docs.
Claude via MCP → Pipe
You: "The sales pipeline failed last night. What happened?" [MCP] pipeline_logs("sales") [MCP] validate_pipeline("sales") Agent: "The Postgres source returned a connection timeout. The DB host is unreachable. Once it's back, I can re-run with --full to backfill the missed data."
You've been here before
Your Airflow DAG has 47 tasks just to move data from Postgres to S3. Half of them are retries.
The new hire spent 2 weeks understanding the ingestion pipeline. They still can't add a new source.
Your AI agent needs fresh data but can't trigger a pipeline without a human clicking buttons.
Before and after
What changes when you switch to Pipe.
200 lines of Airflow DAG Python  →  5 lines of YAML
pip install apache-airflow + 47 dependencies  →  pip install dataspoc-pipe
3 weeks to first pipeline  →  15 minutes
How it works
From source to queryable Parquet in one command.
Singer Tap          Your source (Postgres, Stripe, REST API, etc.)
        |
        v
stdout (JSON)       Singer protocol — structured stream of records
        |
        v
Pipe (transform)    Optional Python transforms (transforms/<pipeline>.py)
        |
        v
Parquet             Columnar, compressed, partition-aware files
        |
        v
Bucket              S3, GCS, Azure, or local filesystem
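That middle hop is the open Singer protocol: the tap writes newline-delimited JSON messages (SCHEMA, RECORD, STATE) to stdout and Pipe consumes them. For an orders table the stream looks roughly like this (values are illustrative, not real Pipe output):

{"type": "SCHEMA", "stream": "orders", "key_properties": ["id"], "schema": {"properties": {"id": {"type": "integer"}, "total": {"type": "number"}}}}
{"type": "RECORD", "stream": "orders", "record": {"id": 1042, "total": 129.90}}
{"type": "STATE", "value": {"bookmarks": {"orders": {"updated_at": "2026-04-14T08:00:00Z"}}}}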
Built for the jobs you actually have
When I need to add a new source
The CLI wizard walks you through it. Pick from 400+ Singer taps, configure credentials, run. No boilerplate, no copy-paste from Stack Overflow.
$ dataspoc-pipe add tap-postgres
Source added. Configure in dataspoc-pipe.yaml
$ dataspoc-pipe run tap-postgres
When I need only new data
Incremental extraction with bookmarks. Pipe remembers where it left off and only pulls what changed. Saves time, saves egress costs.
$ dataspoc-pipe run tap-postgres
Resuming from bookmark: 2026-04-14T08:00:00Z
Extracted 1,247 new records (3 tables)
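And when you do need everything again, the same --full flag the agent offered earlier forces a complete re-extract instead of resuming from the bookmark (flag placement here is illustrative):

$ dataspoc-pipe run tap-postgres --full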
When I need to clean data during ingestion
Drop a Python file at transforms/<pipeline>.py and Pipe calls it on every record. Rename columns, mask PII, convert types — without a separate ETL step.
# transforms/my-postgres.py
def transform(record, stream):
    # Called once per record; `stream` is the source table name
    record['email'] = mask_pii(record['email'])  # mask_pii: your own helper
    return record
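A transform file can also branch on the stream name to treat each table differently. Here is a sketch of what transforms/my-postgres.py might grow into, using only the transform(record, stream) hook described above (stream and field names are made up for illustration):

# transforms/my-postgres.py (sketch; stream and field names are illustrative)
def transform(record, stream):
    if stream == "users":
        # Mask PII before it lands in the bucket: keep only the email domain
        _, _, domain = record["email"].partition("@")
        record["email"] = f"***@{domain}"
    elif stream == "orders":
        # Rename a column and normalize its type
        record["order_total"] = float(record.pop("total_amount"))
    return record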
When my AI agent needs to trigger ingestion
Pipe runs as an MCP server. Your AI agent can discover sources, run pipelines, and check status — all through tool calls. Also works as a Python SDK.
$ dataspoc-pipe mcp
MCP server running on stdio
# Agent: "ingest fresh data from postgres"
# Pipe: running tap-postgres... done. 12 tables.
What you get
Every Pipe run produces a clean, standardized bucket structure that Lens can query immediately.
raw/
  <source>/
    <table>/
      dt=2026-04-15/
        data_001.parquet
        data_002.parquet
.dataspoc/
  manifest.json            # Catalog — Lens reads this to discover your tables
  state/
    <pipeline>/
      state.json           # Incremental bookmarks — never re-extract old data
  logs/
    <pipeline>/
      2026-04-15T...json   # Execution logs — debug any run
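Because the layout is plain Parquet under Hive-style dt= partitions, anything that reads Parquet can query it directly; Lens simply reads the manifest first. A minimal sketch with DuckDB (the bucket path and table name are placeholders):

import duckdb

# Placeholder path: point at your bucket prefix (s3://..., gs://...) or a local copy
duckdb.sql(
    "SELECT count(*) AS new_rows "
    "FROM read_parquet('raw/my-postgres/orders/dt=2026-04-15/*.parquet')"
).show()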
Your first pipeline in 15 minutes.
$ pip install dataspoc-pipe
$ dataspoc-pipe add tap-postgres
$ dataspoc-pipe run tap-postgres