
Examples


All examples are available in the dataspoc-pipe/examples directory on GitHub.


The e2e-demo.sh script demonstrates the full DataSpoc workflow: download a real dataset, ingest it with Pipe, and query it with Lens. See the dedicated End-to-End Demo page for a detailed walkthrough.

Terminal window
cd dataspoc-pipe
bash examples/e2e-demo.sh

The script performs these steps:

  1. Downloads the Iris dataset from the seaborn-data GitHub repository
  2. Creates a mock Singer tap config pointing to the downloaded CSV
  3. Initializes Pipe and writes a pipeline YAML for the iris dataset
  4. Runs the pipeline to convert CSV to Parquet in a local data lake
  5. Inspects the lake showing Parquet files and the manifest
  6. Sets up Lens and registers the lake as a bucket
  7. Runs SQL queries against the ingested data (counts, aggregations, top-N)
  8. Exports results to CSV and JSON

mock_tap_csv.py is a lightweight Singer tap that reads any CSV file and emits Singer protocol messages (SCHEMA, RECORD, STATE). It is useful for testing Pipe without installing a real tap. It works as follows (sample output appears after the list):

  1. Reads a CSV file using Python’s csv.DictReader
  2. Infers JSON Schema types by sampling the first 200 rows (integer, number, boolean, or string)
  3. Emits a SCHEMA message with the inferred column types
  4. Emits one RECORD message per row, with values cast to their inferred types
  5. Emits a final STATE message indicating completion
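
For an Iris-style CSV the emitted messages look roughly like this (abridged to two of the five columns; the actual output lists every column and one RECORD line per row):

{"type": "SCHEMA", "stream": "iris", "schema": {"type": "object", "properties": {"sepal_length": {"type": ["null", "number"]}, "species": {"type": ["null", "string"]}}}, "key_properties": []}
{"type": "RECORD", "stream": "iris", "record": {"sepal_length": 5.1, "species": "setosa"}}
{"type": "STATE", "value": {"completed": true, "rows": 150}}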

Create a config file:

{"csv_path": "/path/to/data.csv", "stream_name": "my_table"}

The stream_name field is optional and defaults to the CSV filename without extension.
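For example, with no stream_name set, a file at /data/Sales Report-2024.csv (a hypothetical path) produces the stream sales_report_2024, because the script lowercases the name and replaces hyphens and spaces with underscores.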

Run the tap:

Terminal window
python examples/mock_tap_csv.py --config config.json

Feed the tap's output to Pipe:

Terminal window
python examples/mock_tap_csv.py --config config.json | dataspoc-pipe run --stdin my-pipeline

Full source of mock_tap_csv.py:

#!/usr/bin/env python3
"""Mock Singer tap that reads a CSV file and emits Singer protocol messages.

Usage:
    python mock_tap_csv.py --config config.json

Config JSON format:
    {"csv_path": "/path/to/file.csv", "stream_name": "my_table"}

The stream_name field is optional and defaults to the CSV filename (without extension).
"""
import argparse
import csv
import json
import signal
import sys
from pathlib import Path

# Handle SIGPIPE gracefully (e.g. when piped to head)
signal.signal(signal.SIGPIPE, signal.SIG_DFL)


def infer_type(value: str) -> str | None:
    """Infer JSON Schema type from a string value. Returns None for empty."""
    if value == "":
        return None
    # Try integer
    try:
        int(value)
        return "integer"
    except ValueError:
        pass
    # Try float
    try:
        float(value)
        return "number"
    except ValueError:
        pass
    # Boolean
    if value.lower() in ("true", "false"):
        return "boolean"
    return "string"


def cast_value(value: str, json_type: str):
    """Cast a string value to the appropriate Python type."""
    if value == "":
        return None
    if json_type == "integer":
        try:
            return int(value)
        except ValueError:
            return None
    if json_type == "number":
        try:
            return float(value)
        except ValueError:
            return None
    if json_type == "boolean":
        return value.lower() == "true"
    return value


def infer_schema(rows: list[dict], headers: list[str]) -> dict[str, str]:
    """Infer JSON Schema types by sampling rows."""
    type_counts: dict[str, dict[str, int]] = {h: {} for h in headers}
    sample = rows[:200]  # sample first 200 rows
    for row in sample:
        for h in headers:
            val = row.get(h, "")
            t = infer_type(val)
            if t is not None:
                type_counts[h][t] = type_counts[h].get(t, 0) + 1
    result = {}
    for h in headers:
        counts = type_counts[h]
        if not counts:
            result[h] = "string"
            continue
        # Pick the most common non-string type if no string values were found
        non_string = {k: v for k, v in counts.items() if k != "string"}
        if non_string and not counts.get("string", 0):
            result[h] = max(non_string, key=non_string.get)
        else:
            result[h] = "string"
    return result


def main():
    parser = argparse.ArgumentParser(description="Mock Singer tap for CSV files")
    parser.add_argument("--config", required=True, help="Path to config JSON file")
    parser.add_argument("--state", default=None, help="Path to state JSON file (ignored)")
    args = parser.parse_args()

    with open(args.config) as f:
        config = json.load(f)

    csv_path = config["csv_path"]
    stream_name = config.get("stream_name", Path(csv_path).stem)
    # Sanitize stream name for use as table name
    stream_name = stream_name.replace("-", "_").replace(" ", "_").lower()

    # Read CSV
    with open(csv_path, newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        headers = reader.fieldnames or []
        rows = list(reader)

    if not headers:
        print("No headers found in CSV", file=sys.stderr)
        sys.exit(1)

    # Infer types
    col_types = infer_schema(rows, headers)

    # Emit SCHEMA
    properties = {}
    for h in headers:
        t = col_types.get(h, "string")
        properties[h] = {"type": ["null", t]}
    schema_msg = {
        "type": "SCHEMA",
        "stream": stream_name,
        "schema": {
            "type": "object",
            "properties": properties,
        },
        "key_properties": [],
    }
    print(json.dumps(schema_msg))

    # Emit RECORD for each row
    for row in rows:
        record = {}
        for h in headers:
            record[h] = cast_value(row.get(h, ""), col_types.get(h, "string"))
        record_msg = {
            "type": "RECORD",
            "stream": stream_name,
            "record": record,
        }
        print(json.dumps(record_msg))

    # Emit STATE
    state_msg = {
        "type": "STATE",
        "value": {"completed": True, "rows": len(rows)},
    }
    print(json.dumps(state_msg))


if __name__ == "__main__":
    main()

Dockerfile.demo builds a self-contained image with Pipe, Lens, Jupyter, and three pre-ingested sample datasets (Iris, Titanic, Tips). It is the fastest way to try DataSpoc without installing anything locally.

| Dataset | Rows | Description |
| --- | --- | --- |
| iris | 150 | Classic flower measurements (sepal/petal length and width, species) |
| titanic | 891 | Titanic passenger survival data |
| tips | 244 | Restaurant tipping data |

The image build performs these steps:
  1. Installs Pipe and Lens (with Jupyter and AI extras) using uv
  2. Downloads all three CSV datasets from the seaborn-data repository
  3. Creates a pipeline for each dataset using the mock Singer tap
  4. Runs all three pipelines, producing Parquet files in /lake
  5. Initializes Lens and registers the lake as a bucket
Terminal window
cd dataspoc-pipe
# Build the image
docker build -f examples/Dockerfile.demo -t dataspoc-demo .
# Run Jupyter (default CMD)
docker run -p 8888:8888 dataspoc-demo

Open http://localhost:8888 in your browser to access Jupyter with all three tables pre-mounted.

Terminal window
# Interactive SQL shell
docker run -it dataspoc-demo dataspoc-lens shell
# One-off query
docker run dataspoc-demo dataspoc-lens query "SELECT species, COUNT(*) FROM iris GROUP BY species"
# View the catalog
docker run dataspoc-demo dataspoc-lens catalog

Full Dockerfile.demo:

FROM python:3.12-slim

LABEL description="DataSpoc Demo — Pipe + Lens + Jupyter with sample data"

RUN apt-get update && apt-get install -y --no-install-recommends \
        curl util-linux \
    && rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir uv

WORKDIR /app

# Copy Pipe
COPY pyproject.toml README.md ./
COPY src/dataspoc_pipe/ src/dataspoc_pipe/

# Install Pipe
RUN uv pip install --system -e .

# Copy Lens
COPY lens/pyproject.toml lens/README.md lens/
COPY lens/src/dataspoc_lens/ lens/src/dataspoc_lens/

# Install Lens with jupyter + ai extras
RUN cd lens && uv pip install --system -e ".[all]"

# Copy examples
COPY examples/ examples/

# Download sample datasets
RUN curl -sL "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv" -o /tmp/iris.csv \
    && curl -sL "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/titanic.csv" -o /tmp/titanic.csv \
    && curl -sL "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/tips.csv" -o /tmp/tips.csv

# Ingest all 3 datasets with Pipe
RUN dataspoc-pipe init

# Iris pipeline
RUN echo '{"csv_path": "/tmp/iris.csv", "stream_name": "iris"}' > /tmp/iris-config.json \
    && mkdir -p ~/.dataspoc-pipe/pipelines \
    && printf 'source:\n tap: "python /app/examples/mock_tap_csv.py"\n config: /tmp/iris-config.json\ndestination:\n bucket: "file:///lake"\n path: raw\n compression: zstd\n' > ~/.dataspoc-pipe/pipelines/iris.yaml \
    && dataspoc-pipe run iris

# Titanic pipeline
RUN echo '{"csv_path": "/tmp/titanic.csv", "stream_name": "titanic"}' > /tmp/titanic-config.json \
    && printf 'source:\n tap: "python /app/examples/mock_tap_csv.py"\n config: /tmp/titanic-config.json\ndestination:\n bucket: "file:///lake"\n path: raw\n compression: zstd\n' > ~/.dataspoc-pipe/pipelines/titanic.yaml \
    && dataspoc-pipe run titanic

# Tips pipeline
RUN echo '{"csv_path": "/tmp/tips.csv", "stream_name": "tips"}' > /tmp/tips-config.json \
    && printf 'source:\n tap: "python /app/examples/mock_tap_csv.py"\n config: /tmp/tips-config.json\ndestination:\n bucket: "file:///lake"\n path: raw\n compression: zstd\n' > ~/.dataspoc-pipe/pipelines/tips.yaml \
    && dataspoc-pipe run tips

# Setup Lens
RUN dataspoc-lens init \
    && dataspoc-lens add-bucket "file:///lake"

# Expose Jupyter
EXPOSE 8888

# Default: launch Jupyter with tables pre-mounted
CMD ["dataspoc-lens", "notebook"]

sdk_usage.py shows how to use Pipe programmatically from Python instead of the CLI.

"""Example: Using the DataSpoc Pipe Python SDK."""
from dataspoc_pipe import PipeClient
client = PipeClient()
# 1. List pipelines
pipelines = client.pipelines()
print("Pipelines:", pipelines)
# 2. Check status
for s in client.status():
print(f" {s['name']}: {s['status']} ({s['records']} records)")
# 3. Run a pipeline
result = client.run("sales-data")
if result["success"]:
print(f"\nSuccess! Streams: {result['streams']}")
else:
print(f"\nFailed: {result['error']}")
# 4. View logs
log = client.logs("sales-data")
if log:
print(f"\nLast run: {log.get('started_at', 'N/A')}")
print(f"Duration: {log.get('duration_seconds', 0):.1f}s")
print(f"Records: {log.get('total_records', 0)}")
# 5. View manifest
manifest = client.manifest("s3://my-bucket")
for table_key, table_info in manifest.get("tables", {}).items():
print(f" {table_key}: {table_info.get('stats', {}).get('total_rows', 0)} rows")

| Method | Description |
| --- | --- |
| PipeClient() | Create a client (reads ~/.dataspoc-pipe/config.yaml) |
| client.pipelines() | List all configured pipelines |
| client.status() | Get the status of all pipelines (last run, record counts) |
| client.run(name) | Run a pipeline by name; returns a result dict |
| client.logs(name) | Get the latest execution log for a pipeline |
| client.manifest(bucket) | Read the manifest from a bucket |
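
As a quick sketch of how these methods compose, the loop below runs every configured pipeline and summarizes the outcome. It assumes the return shapes shown in sdk_usage.py above (status entries with a name field; run results with success, streams, and error keys); adjust it to your installed version.

# Illustrative sketch: run every configured pipeline and report failures.
from dataspoc_pipe import PipeClient

client = PipeClient()

failed = []
for entry in client.status():  # one entry per configured pipeline
    name = entry["name"]
    result = client.run(name)
    if result["success"]:
        print(f"{name}: ok (streams: {result['streams']})")
    else:
        failed.append(name)
        print(f"{name}: failed ({result['error']})")

if failed:
    raise SystemExit(f"{len(failed)} pipeline(s) failed: {', '.join(failed)}")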

mcp_config.json configures DataSpoc Pipe as an MCP server for Claude Desktop, allowing Claude to run pipelines, check status, and inspect manifests via natural language.

{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"]
    }
  }
}
To set it up:

  1. Install DataSpoc Pipe: pip install dataspoc-pipe
  2. Open Claude Desktop settings
  3. Add the configuration above to your MCP servers config
  4. Restart Claude Desktop
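
If Claude Desktop cannot find dataspoc-pipe on its PATH (for example when it was installed into a virtualenv), point command at the executable's absolute path instead; the path below is only a placeholder:

{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "/path/to/your/venv/bin/dataspoc-pipe",
      "args": ["mcp"]
    }
  }
}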

Once connected, you can ask Claude things like:

  • “List my pipelines”
  • “Run the sales-data pipeline”
  • “Show me the manifest for s3://my-bucket”
  • “What was the last run status?”

Beyond the example scripts, here are three complete pipeline configurations for common scenarios.

The simplest pipeline. Reads a CSV file and writes Parquet to the local filesystem. No cloud credentials needed.

Terminal window
pip install dataspoc-pipe tap-csv
Terminal window
mkdir -p /tmp/sample-data
cat > /tmp/sample-data/products.csv << 'EOF'
id,name,category,price,in_stock
1,Widget A,electronics,29.99,true
2,Gadget B,electronics,49.99,true
3,Gizmo C,accessories,19.99,false
4,Widget D,electronics,39.99,true
5,Doohickey E,accessories,9.99,true
EOF
Terminal window
dataspoc-pipe init
dataspoc-pipe add csv-products

In the wizard, enter:

  • Tap: tap-csv
  • Bucket: file:///tmp/lake
  • Base path: raw
  • Compression: zstd
  • Incremental: no
  • Cron: (leave empty)

Edit ~/.dataspoc-pipe/sources/csv-products.json:

{
  "csv_files_definition": [
    {
      "entity": "products",
      "path": "/tmp/sample-data/products.csv",
      "keys": ["id"]
    }
  ]
}

Pipeline YAML (~/.dataspoc-pipe/pipelines/csv-products.yaml):

source:
  tap: tap-csv
  config: /home/you/.dataspoc-pipe/sources/csv-products.json
destination:
  bucket: file:///tmp/lake
  path: raw
  compression: zstd
  partition_by: _extraction_date
incremental:
  enabled: false
Terminal window
dataspoc-pipe validate csv-products
dataspoc-pipe run csv-products
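
To double-check what landed in the lake, you can read the Parquet files with any Parquet-aware tool. The sketch below uses DuckDB (not part of DataSpoc; pip install duckdb) and a recursive glob, so it does not depend on the exact partition layout under /tmp/lake:

# Inspect the Parquet files Pipe just wrote, independent of DataSpoc tooling.
import duckdb

# The recursive glob picks up every Parquet file, whatever the directory layout.
rows = duckdb.sql(
    "SELECT * FROM read_parquet('/tmp/lake/**/*.parquet') LIMIT 10"
).fetchall()
for row in rows:
    print(row)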

A production-grade pipeline that incrementally extracts data from PostgreSQL to S3, running every 2 hours.

Terminal window
pip install dataspoc-pipe[s3] tap-postgres

Pipeline YAML:

source:
  tap: tap-postgres
  config: /home/you/.dataspoc-pipe/sources/pg-orders.json
  streams:
    - public-orders
    - public-order_items
destination:
  bucket: s3://company-datalake
  path: raw
  compression: zstd
  partition_by: _extraction_date
incremental:
  enabled: true
schedule:
  cron: "0 */2 * * *"
Terminal window
dataspoc-pipe validate pg-orders
dataspoc-pipe run pg-orders
dataspoc-pipe schedule install
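
The pipeline references a source config at ~/.dataspoc-pipe/sources/pg-orders.json that is not shown on this page. A minimal sketch of what such a file typically contains is below; the field names vary between tap-postgres variants, so treat them as illustrative and check the documentation of the tap you installed:

{
  "host": "db.internal.example.com",
  "port": 5432,
  "user": "replication_user",
  "password": "change-me",
  "dbname": "orders"
}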

Extract repository data from the GitHub API into Google Cloud Storage.

Terminal window
pip install dataspoc-pipe[gcs] tap-github

Pipeline YAML:

source:
  tap: tap-github
  config: /home/you/.dataspoc-pipe/sources/github-repos.json
  streams:
    - commits
    - pull_requests
    - issues
    - stargazers
destination:
  bucket: gs://analytics-lake
  path: raw
  compression: zstd
  partition_by: _extraction_date
incremental:
  enabled: true
schedule:
  cron: "0 1 * * *"
Terminal window
gcloud auth application-default login
dataspoc-pipe validate github-repos
dataspoc-pipe run github-repos
dataspoc-pipe schedule install
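
As with the PostgreSQL recipe, the github-repos.json source config is not shown here. Broadly, tap-github needs a GitHub token and the repositories to sync; the keys below are illustrative and differ between tap-github variants, so consult the documentation of the one you installed:

{
  "access_token": "ghp_your_token_here",
  "repository": "your-org/your-repo",
  "start_date": "2024-01-01T00:00:00Z"
}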