Open-Source · Apache 2.0

Stop building pipelines. Start running them.

Airflow, Meltano, custom scripts — data ingestion shouldn't take weeks. Pipe connects any of 400+ sources to Parquet in your bucket with one command. No DAGs. No infrastructure. No PhD in YAML.

$ pip install dataspoc-pipe
$ dataspoc-pipe add my-postgres
$ dataspoc-pipe run my-postgres

Your DE Agent is ready.

Pipe ships with AGENT.md — a skill file that teaches any AI agent how to manage your pipelines. Connect it via MCP or the Python SDK and your agent can add sources, run extractions, monitor status, read logs, and fix failures. Autonomously.

No prompt engineering. No custom wrappers. The agent reads the skill file, discovers what Pipe can do, and starts working. It's like hiring a data engineer who has already read the docs.

Adds new sources · Runs pipelines · Monitors failures · Validates connections · Reads logs

Claude via MCP → Pipe

You: "The sales pipeline failed
      last night. What happened?"

[MCP] pipeline_logs("sales")
[MCP] validate_pipeline("sales")

Agent: "The Postgres source returned
a connection timeout. The DB host
is unreachable. Once it's back,
I can re-run with --full to
backfill the missed data."

You've been here before

47 tasks

Your Airflow DAG has 47 tasks just to move data from Postgres to S3. Half of them are retries.

2 weeks

The new hire spent 2 weeks understanding the ingestion pipeline. They still can't add a new source.

Human required

Your AI agent needs fresh data but can't trigger a pipeline without a human clicking buttons.

Before and after

What changes when you switch to Pipe.

Before: 200 lines of Airflow DAG Python
After: 5 lines of YAML

Before: pip install apache-airflow + 47 dependencies
After: pip install dataspoc-pipe

Before: 3 weeks to first pipeline
After: 15 minutes

How it works

From source to queryable Parquet in one command.

Singer Tap          Your source (Postgres, Stripe, REST API, etc.)
    |
    v
stdout (JSON)        Singer protocol — structured stream of records
    |
    v
Pipe (transform)     Optional Python transforms (transforms/<pipeline>.py)
    |
    v
Parquet              Columnar, compressed, partition-aware files
    |
    v
Bucket               S3, GCS, Azure, or local filesystem
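
The first half of this flow can be sketched in a few lines of Python. This is an illustration of how a consumer might read Singer messages from a tap's stdout, not Pipe's actual implementation. The SCHEMA, RECORD, and STATE message types come from the Singer specification; the function name `consume_singer_lines` and the way the optional transform is wired in are assumptions made for the example.

```python
import json
from collections import defaultdict


def consume_singer_lines(lines, transform=None):
    """Group Singer RECORD messages by stream and keep the latest STATE.

    Hypothetical helper for illustration; not part of Pipe's API.
    """
    records = defaultdict(list)  # stream name -> list of record dicts
    schemas = {}                 # stream name -> JSON schema
    state = None                 # value of the most recent STATE message

    for line in lines:
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        kind = message.get("type")
        if kind == "SCHEMA":
            schemas[message["stream"]] = message["schema"]
        elif kind == "RECORD":
            record = message["record"]
            if transform is not None:
                # Same (record, stream) signature as a transforms/<pipeline>.py file
                record = transform(record, message["stream"])
            records[message["stream"]].append(record)
        elif kind == "STATE":
            state = message["value"]

    return dict(records), schemas, state


# Sample tap output: one JSON message per line
sample = [
    '{"type": "SCHEMA", "stream": "users", "schema": {"properties": {"id": {"type": "integer"}}}, "key_properties": ["id"]}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 1}}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 2}}',
    '{"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}}}}',
]
records, schemas, state = consume_singer_lines(sample)
print(len(records["users"]), state)
```

In a real run the grouped records would then be written out as Parquet files; that step is left out here to keep the sketch dependency-free.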

Built for the jobs you actually have

When I need to add a new source

A CLI wizard walks you through it. Pick from 400+ Singer taps, configure credentials, run. No boilerplate, no copy-paste from Stack Overflow.

$ dataspoc-pipe add tap-postgres
  Source added. Configure in dataspoc-pipe.yaml
$ dataspoc-pipe run tap-postgres

When I need only new data

Incremental extraction with bookmarks. Pipe remembers where it left off and only pulls what changed. Saves time, saves egress costs.

$ dataspoc-pipe run tap-postgres
  Resuming from bookmark: 2026-04-14T08:00:00Z
  Extracted 1,247 new records (3 tables)
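
The idea behind bookmark-based extraction can be sketched as follows. This is a simplified illustration, not Pipe's implementation: the helper name `extract_incremental`, the `updated_at` column, and the single-key layout of the state file are all assumptions. It relies on ISO 8601 UTC timestamps in one consistent format, which sort correctly as plain strings.

```python
import json
import tempfile
from pathlib import Path


def extract_incremental(rows, state_path, key="updated_at"):
    """Return rows newer than the stored bookmark and persist the new bookmark.

    Hypothetical helper; the state file layout used here is an assumption.
    """
    state_path = Path(state_path)
    state = json.loads(state_path.read_text()) if state_path.exists() else {}
    bookmark = state.get("bookmark")  # None on the first run

    # Keep only rows strictly newer than the last bookmark
    new_rows = [r for r in rows if bookmark is None or r[key] > bookmark]
    if new_rows:
        state["bookmark"] = max(r[key] for r in new_rows)
        state_path.parent.mkdir(parents=True, exist_ok=True)
        state_path.write_text(json.dumps(state))
    return new_rows


rows = [
    {"id": 1, "updated_at": "2026-04-14T08:00:00Z"},
    {"id": 2, "updated_at": "2026-04-14T09:30:00Z"},
]
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "state.json"
    first = extract_incremental(rows, path)   # first run: everything is new
    second = extract_incremental(rows, path)  # second run: nothing changed
    saved = json.loads(path.read_text())
print(len(first), len(second), saved["bookmark"])
```

The second call returns nothing because no row is newer than the saved bookmark, which is the behavior the console output above describes.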

When I need to clean data during ingestion

Drop a Python file in transforms/<pipeline>.py and Pipe calls it on every record. Rename columns, mask PII, convert types — without a separate ETL step.

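# transforms/my-postgres.py
import hashlib

def mask_pii(value):  # example helper: one-way SHA-256 hash, not part of Pipe
    return hashlib.sha256(str(value).encode()).hexdigest()

def transform(record, stream):
    record['email'] = mask_pii(record['email'])
    return record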
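
Renaming columns and converting types might look like the sketch below. The `transform(record, stream)` signature follows the example above; the `users` stream and the `fullname`, `age`, and `signup_date` columns are hypothetical and only serve the illustration.

```python
from datetime import datetime


def transform(record, stream):
    # Only touch the stream we know about; pass everything else through
    if stream == "users":  # assumed stream name
        # Rename a column
        if "fullname" in record:
            record["full_name"] = record.pop("fullname")
        # Convert a numeric string to an integer
        if record.get("age") is not None:
            record["age"] = int(record["age"])
        # Normalize a date string such as 15/04/2026 to ISO 8601
        if record.get("signup_date"):
            parsed = datetime.strptime(record["signup_date"], "%d/%m/%Y")
            record["signup_date"] = parsed.date().isoformat()
    return record


row = transform(
    {"fullname": "Ada", "age": "36", "signup_date": "15/04/2026"},
    "users",
)
print(row)
```

Guarding each step with a presence check keeps the transform from failing on records where a column is missing or null.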

When my AI agent needs to trigger ingestion

Pipe runs as an MCP server. Your AI agent can discover sources, run pipelines, and check status, all through tool calls. The same operations are available through the Python SDK.

$ dataspoc-pipe mcp
  MCP server running on stdio

# Agent: "ingest fresh data from postgres"
# Pipe: running tap-postgres... done. 12 tables.
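
Under the hood, an MCP tool call is a JSON-RPC 2.0 request with the method `tools/call`, sent as one line of JSON over stdio. The sketch below builds such a message for the `pipeline_logs` tool shown in the agent example earlier. The argument name `pipeline` is an assumption, since the page only shows the call positionally. A real client would also perform the MCP initialization handshake first, so in practice an MCP client library is the better choice.

```python
import json
from itertools import count

_request_ids = count(1)  # JSON-RPC requests need unique ids


def tool_call_request(tool, arguments):
    """Build one newline-delimited JSON-RPC 2.0 'tools/call' message."""
    message = {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "tools/call",
        "params": {"name": tool, "arguments": arguments},
    }
    # The stdio transport expects exactly one JSON message per line
    return json.dumps(message) + "\n"


# "pipeline" is a hypothetical argument name for illustration
line = tool_call_request("pipeline_logs", {"pipeline": "sales"})
print(line, end="")
```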

What you get

Every Pipe run produces a clean, standardized bucket structure that Lens can query immediately.

raw/
  <source>/
    <table>/
      dt=2026-04-15/
        data_001.parquet
        data_002.parquet

.dataspoc/
  manifest.json          # Catalog — Lens reads this to discover your tables
  state/
    <pipeline>/
      state.json         # Incremental bookmarks — never re-extract old data
  logs/
    <pipeline>/
      2026-04-15T...json # Execution logs — debug any run
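
Because the layout is predictable, object keys can be built and parsed with plain string handling. The sketch below mirrors the tree above; the helper names and the `orders` table are hypothetical, and the three-digit file counter is inferred from the `data_001.parquet` example.

```python
from datetime import date
from pathlib import PurePosixPath


def parquet_key(source, table, run_date, part):
    """Build a key like raw/<source>/<table>/dt=YYYY-MM-DD/data_NNN.parquet."""
    return str(
        PurePosixPath("raw")
        / source
        / table
        / f"dt={run_date.isoformat()}"
        / f"data_{part:03d}.parquet"
    )


def partition_date(key):
    """Read the dt=YYYY-MM-DD partition back out of a key."""
    for segment in PurePosixPath(key).parts:
        if segment.startswith("dt="):
            return date.fromisoformat(segment[len("dt="):])
    raise ValueError(f"no dt= partition in {key!r}")


key = parquet_key("my-postgres", "orders", date(2026, 4, 15), 1)
print(key)
print(partition_date(key))
```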

Your first pipeline in 15 minutes.

$ pip install dataspoc-pipe
$ dataspoc-pipe add tap-postgres
$ dataspoc-pipe run tap-postgres