Examples
All examples are available in the dataspoc-pipe/examples directory on GitHub.
End-to-End Demo
The `e2e-demo.sh` script demonstrates the full DataSpoc workflow: download a real dataset, ingest it with Pipe, and query it with Lens. See the dedicated End-to-End Demo page for a detailed walkthrough.

```bash
cd dataspoc-pipe
bash examples/e2e-demo.sh
```

The script performs these steps:
- Downloads the Iris dataset from the seaborn-data GitHub repository
- Creates a mock Singer tap config pointing to the downloaded CSV
- Initializes Pipe and writes a pipeline YAML for the iris dataset
- Runs the pipeline to convert CSV to Parquet in a local data lake
- Inspects the lake showing Parquet files and the manifest
- Sets up Lens and registers the lake as a bucket
- Runs SQL queries against the ingested data (counts, aggregations, top-N)
- Exports results to CSV and JSON
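The query step boils down to Lens SQL calls like the ones below. This is only a sketch, using the `dataspoc-lens query` command shown later on this page and the iris column names; the exact queries and export flags in `e2e-demo.sh` may differ.

```bash
# Sketch of the kinds of queries the demo runs (the actual script may differ)
dataspoc-lens query "SELECT species, COUNT(*) FROM iris GROUP BY species"
dataspoc-lens query "SELECT * FROM iris ORDER BY sepal_length DESC LIMIT 5"
```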
Mock Singer Tap (CSV)
`mock_tap_csv.py` is a lightweight Singer tap that reads any CSV file and emits Singer protocol messages (`SCHEMA`, `RECORD`, `STATE`). It is useful for testing Pipe without installing a real tap.
How it works
- Reads a CSV file using Python's `csv.DictReader`
- Infers JSON Schema types by sampling the first 200 rows (integer, number, boolean, or string)
- Emits a `SCHEMA` message with the inferred column types
- Emits one `RECORD` message per row, with values cast to their inferred types
- Emits a final `STATE` message indicating completion
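Put together, the tap prints one JSON message per line to stdout. Below is a sketch of the output for a small two-column CSV, using a config file like the one created in the next step; the values are illustrative, but the message shapes mirror the full source further down.

```bash
python examples/mock_tap_csv.py --config config.json
# {"type": "SCHEMA", "stream": "my_table", "schema": {"type": "object", "properties": {"id": {"type": ["null", "integer"]}, "name": {"type": ["null", "string"]}}}, "key_properties": []}
# {"type": "RECORD", "stream": "my_table", "record": {"id": 1, "name": "Widget A"}}
# {"type": "STATE", "value": {"completed": true, "rows": 1}}
```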
Create a config file:
{"csv_path": "/path/to/data.csv", "stream_name": "my_table"}The stream_name field is optional and defaults to the CSV filename without extension.
Run the tap:
```bash
python examples/mock_tap_csv.py --config config.json
```

Pipe the output to Pipe:

```bash
python examples/mock_tap_csv.py --config config.json | dataspoc-pipe run --stdin my-pipeline
```

Full source
```python
#!/usr/bin/env python3
"""Mock Singer tap that reads a CSV file and emits Singer protocol messages.

Usage: python mock_tap_csv.py --config config.json

Config JSON format: {"csv_path": "/path/to/file.csv", "stream_name": "my_table"}

The stream_name field is optional and defaults to the CSV filename (without extension).
"""

import argparse
import csv
import json
import signal
import sys
from pathlib import Path

# Handle SIGPIPE gracefully (e.g. when piped to head)
signal.signal(signal.SIGPIPE, signal.SIG_DFL)


def infer_type(value: str) -> str | None:
    """Infer JSON Schema type from a string value. Returns None for empty."""
    if value == "":
        return None
    # Try integer
    try:
        int(value)
        return "integer"
    except ValueError:
        pass
    # Try float
    try:
        float(value)
        return "number"
    except ValueError:
        pass
    # Boolean
    if value.lower() in ("true", "false"):
        return "boolean"
    return "string"


def cast_value(value: str, json_type: str):
    """Cast a string value to the appropriate Python type."""
    if value == "":
        return None
    if json_type == "integer":
        try:
            return int(value)
        except ValueError:
            return None
    if json_type == "number":
        try:
            return float(value)
        except ValueError:
            return None
    if json_type == "boolean":
        return value.lower() == "true"
    return value


def infer_schema(rows: list[dict], headers: list[str]) -> dict[str, str]:
    """Infer JSON Schema types by sampling rows."""
    type_counts: dict[str, dict[str, int]] = {h: {} for h in headers}

    sample = rows[:200]  # sample first 200 rows
    for row in sample:
        for h in headers:
            val = row.get(h, "")
            t = infer_type(val)
            if t is not None:
                type_counts[h][t] = type_counts[h].get(t, 0) + 1

    result = {}
    for h in headers:
        counts = type_counts[h]
        if not counts:
            result[h] = "string"
            continue
        # Pick the most specific numeric type if no string values found
        non_string = {k: v for k, v in counts.items() if k != "string"}
        if non_string and not counts.get("string", 0):
            result[h] = max(non_string, key=non_string.get)
        else:
            result[h] = "string"
    return result


def main():
    parser = argparse.ArgumentParser(description="Mock Singer tap for CSV files")
    parser.add_argument("--config", required=True, help="Path to config JSON file")
    parser.add_argument("--state", default=None, help="Path to state JSON file (ignored)")
    args = parser.parse_args()

    with open(args.config) as f:
        config = json.load(f)

    csv_path = config["csv_path"]
    stream_name = config.get("stream_name", Path(csv_path).stem)
    # Sanitize stream name for use as table name
    stream_name = stream_name.replace("-", "_").replace(" ", "_").lower()

    # Read CSV
    with open(csv_path, newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        headers = reader.fieldnames or []
        rows = list(reader)

    if not headers:
        print("No headers found in CSV", file=sys.stderr)
        sys.exit(1)

    # Infer types
    col_types = infer_schema(rows, headers)

    # Emit SCHEMA
    properties = {}
    for h in headers:
        t = col_types.get(h, "string")
        properties[h] = {"type": ["null", t]}

    schema_msg = {
        "type": "SCHEMA",
        "stream": stream_name,
        "schema": {
            "type": "object",
            "properties": properties,
        },
        "key_properties": [],
    }
    print(json.dumps(schema_msg))

    # Emit RECORD for each row
    for row in rows:
        record = {}
        for h in headers:
            record[h] = cast_value(row.get(h, ""), col_types.get(h, "string"))

        record_msg = {
            "type": "RECORD",
            "stream": stream_name,
            "record": record,
        }
        print(json.dumps(record_msg))

    # Emit STATE
    state_msg = {
        "type": "STATE",
        "value": {"completed": True, "rows": len(rows)},
    }
    print(json.dumps(state_msg))


if __name__ == "__main__":
    main()
```
Docker Demo
`Dockerfile.demo` builds a self-contained image with Pipe, Lens, Jupyter, and three pre-ingested sample datasets (Iris, Titanic, Tips). It is the fastest way to try DataSpoc without installing anything locally.
Included datasets
| Dataset | Rows | Description |
|---|---|---|
| iris | 150 | Classic flower measurements (sepal/petal length and width, species) |
| titanic | 891 | Titanic passenger survival data |
| tips | 244 | Restaurant tipping data |
What the image does at build time
- Installs Pipe and Lens (with Jupyter and AI extras) using `uv`
- Downloads all three CSV datasets from the seaborn-data repository
- Creates a pipeline for each dataset using the mock Singer tap
- Runs all three pipelines, producing Parquet files in `/lake`
- Initializes Lens and registers the lake as a bucket
Build and run
```bash
cd dataspoc-pipe

# Build the image
docker build -f examples/Dockerfile.demo -t dataspoc-demo .

# Run Jupyter (default CMD)
docker run -p 8888:8888 dataspoc-demo
```

Open http://localhost:8888 in your browser to access Jupyter with all three tables pre-mounted.
Run queries instead of Jupyter
```bash
# Interactive SQL shell
docker run -it dataspoc-demo dataspoc-lens shell

# One-off query
docker run dataspoc-demo dataspoc-lens query "SELECT species, COUNT(*) FROM iris GROUP BY species"

# View the catalog
docker run dataspoc-demo dataspoc-lens catalog
```

Dockerfile

```dockerfile
FROM python:3.12-slim

LABEL description="DataSpoc Demo — Pipe + Lens + Jupyter with sample data"

RUN apt-get update && apt-get install -y --no-install-recommends \
    curl util-linux \
    && rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir uv

WORKDIR /app

# Copy Pipe
COPY pyproject.toml README.md ./
COPY src/dataspoc_pipe/ src/dataspoc_pipe/

# Install Pipe
RUN uv pip install --system -e .

# Copy Lens
COPY lens/pyproject.toml lens/README.md lens/
COPY lens/src/dataspoc_lens/ lens/src/dataspoc_lens/

# Install Lens with jupyter + ai extras
RUN cd lens && uv pip install --system -e ".[all]"

# Copy examples
COPY examples/ examples/

# Download sample datasets
RUN curl -sL "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv" -o /tmp/iris.csv \
    && curl -sL "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/titanic.csv" -o /tmp/titanic.csv \
    && curl -sL "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/tips.csv" -o /tmp/tips.csv

# Ingest all 3 datasets with Pipe
RUN dataspoc-pipe init

# Iris pipeline
RUN echo '{"csv_path": "/tmp/iris.csv", "stream_name": "iris"}' > /tmp/iris-config.json \
    && mkdir -p ~/.dataspoc-pipe/pipelines \
    && printf 'source:\n tap: "python /app/examples/mock_tap_csv.py"\n config: /tmp/iris-config.json\ndestination:\n bucket: "file:///lake"\n path: raw\n compression: zstd\n' > ~/.dataspoc-pipe/pipelines/iris.yaml \
    && dataspoc-pipe run iris

# Titanic pipeline
RUN echo '{"csv_path": "/tmp/titanic.csv", "stream_name": "titanic"}' > /tmp/titanic-config.json \
    && printf 'source:\n tap: "python /app/examples/mock_tap_csv.py"\n config: /tmp/titanic-config.json\ndestination:\n bucket: "file:///lake"\n path: raw\n compression: zstd\n' > ~/.dataspoc-pipe/pipelines/titanic.yaml \
    && dataspoc-pipe run titanic

# Tips pipeline
RUN echo '{"csv_path": "/tmp/tips.csv", "stream_name": "tips"}' > /tmp/tips-config.json \
    && printf 'source:\n tap: "python /app/examples/mock_tap_csv.py"\n config: /tmp/tips-config.json\ndestination:\n bucket: "file:///lake"\n path: raw\n compression: zstd\n' > ~/.dataspoc-pipe/pipelines/tips.yaml \
    && dataspoc-pipe run tips

# Setup Lens
RUN dataspoc-lens init \
    && dataspoc-lens add-bucket "file:///lake"

# Expose Jupyter
EXPOSE 8888

# Default: launch Jupyter with tables pre-mounted
CMD ["dataspoc-lens", "notebook"]
```
Python SDK
`sdk_usage.py` shows how to use Pipe programmatically from Python instead of the CLI.
"""Example: Using the DataSpoc Pipe Python SDK."""
from dataspoc_pipe import PipeClient
client = PipeClient()
# 1. List pipelinespipelines = client.pipelines()print("Pipelines:", pipelines)
# 2. Check statusfor s in client.status(): print(f" {s['name']}: {s['status']} ({s['records']} records)")
# 3. Run a pipelineresult = client.run("sales-data")if result["success"]: print(f"\nSuccess! Streams: {result['streams']}")else: print(f"\nFailed: {result['error']}")
# 4. View logslog = client.logs("sales-data")if log: print(f"\nLast run: {log.get('started_at', 'N/A')}") print(f"Duration: {log.get('duration_seconds', 0):.1f}s") print(f"Records: {log.get('total_records', 0)}")
# 5. View manifestmanifest = client.manifest("s3://my-bucket")for table_key, table_info in manifest.get("tables", {}).items(): print(f" {table_key}: {table_info.get('stats', {}).get('total_rows', 0)} rows")Key SDK methods
Key SDK methods

| Method | Description |
|---|---|
| `PipeClient()` | Create a client (reads `~/.dataspoc-pipe/config.yaml`) |
| `client.pipelines()` | List all configured pipelines |
| `client.status()` | Get status of all pipelines (last run, record counts) |
| `client.run(name)` | Run a pipeline by name, returns result dict |
| `client.logs(name)` | Get the latest execution log for a pipeline |
| `client.manifest(bucket)` | Read the manifest from a bucket |
MCP Config (Claude Desktop)
`mcp_config.json` configures DataSpoc Pipe as an MCP server for Claude Desktop, allowing Claude to run pipelines, check status, and inspect manifests via natural language.
{ "mcpServers": { "dataspoc-pipe": { "command": "dataspoc-pipe", "args": ["mcp"] } }}- Install DataSpoc Pipe:
pip install dataspoc-pipe - Open Claude Desktop settings
- Add the configuration above to your MCP servers config
- Restart Claude Desktop
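On macOS, the MCP servers config typically lives at the path below; the location differs on Windows and may change between Claude Desktop versions, so treat this as a pointer rather than a guarantee.

```bash
# Typical macOS location of the Claude Desktop MCP config (adjust for your platform)
open ~/Library/Application\ Support/Claude/claude_desktop_config.json
```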
Once connected, you can ask Claude things like:
- “List my pipelines”
- “Run the sales-data pipeline”
- “Show me the manifest for s3://my-bucket”
- “What was the last run status?”
Pipeline examples
Beyond the example scripts, here are three complete pipeline configurations for common scenarios.
CSV to local filesystem
The simplest pipeline: it reads a CSV file and writes Parquet to the local filesystem. No cloud credentials are needed.
```bash
pip install dataspoc-pipe tap-csv

mkdir -p /tmp/sample-data
cat > /tmp/sample-data/products.csv << 'EOF'
id,name,category,price,in_stock
1,Widget A,electronics,29.99,true
2,Gadget B,electronics,49.99,true
3,Gizmo C,accessories,19.99,false
4,Widget D,electronics,39.99,true
5,Doohickey E,accessories,9.99,true
EOF

dataspoc-pipe init
dataspoc-pipe add csv-products
```

In the wizard, enter:
- Tap: `tap-csv`
- Bucket: `file:///tmp/lake`
- Base path: `raw`
- Compression: `zstd`
- Incremental: no
- Cron: (leave empty)
Edit ~/.dataspoc-pipe/sources/csv-products.json:
{ "csv_files_definition": [ { "entity": "products", "path": "/tmp/sample-data/products.csv", "keys": ["id"] } ]}Pipeline YAML (~/.dataspoc-pipe/pipelines/csv-products.yaml):
```yaml
source:
  tap: tap-csv
  config: /home/you/.dataspoc-pipe/sources/csv-products.json

destination:
  bucket: file:///tmp/lake
  path: raw
  compression: zstd
  partition_by: _extraction_date

incremental:
  enabled: false
```

```bash
dataspoc-pipe validate csv-products
dataspoc-pipe run csv-products
```
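After the run, you can sanity-check the output. The snippet below is a sketch, not part of the example files: it assumes Parquet files land under the bucket path and that you have pointed Lens at the lake with `dataspoc-lens init` and `dataspoc-lens add-bucket "file:///tmp/lake"`; the table name `products` comes from the entity in the tap config.

```bash
# Look for the Parquet files written by the run (sketch; the exact layout may differ)
find /tmp/lake -name '*.parquet'

# Query the ingested table with Lens (assumes Lens is initialized and the bucket registered)
dataspoc-lens query "SELECT category, COUNT(*) FROM products GROUP BY category"
```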
PostgreSQL to S3 (incremental, scheduled)
A production-grade pipeline that incrementally extracts data from PostgreSQL to S3, running every 2 hours.
```bash
pip install 'dataspoc-pipe[s3]' tap-postgres
```
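The pipeline references a source config at `~/.dataspoc-pipe/sources/pg-orders.json` that is not shown on this page. A hypothetical sketch is below; the exact field names depend on which tap-postgres variant you install, so check your tap's documentation rather than copying it verbatim.

```bash
# Hypothetical tap-postgres config: field names vary between tap-postgres variants
cat > ~/.dataspoc-pipe/sources/pg-orders.json << 'EOF'
{
  "host": "db.internal.example.com",
  "port": 5432,
  "user": "analytics",
  "password": "********",
  "dbname": "shop"
}
EOF
```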
Pipeline YAML:

```yaml
source:
  tap: tap-postgres
  config: /home/you/.dataspoc-pipe/sources/pg-orders.json
  streams:
    - public-orders
    - public-order_items

destination:
  bucket: s3://company-datalake
  path: raw
  compression: zstd
  partition_by: _extraction_date

incremental:
  enabled: true

schedule:
  cron: "0 */2 * * *"
```

```bash
dataspoc-pipe validate pg-orders
dataspoc-pipe run pg-orders
dataspoc-pipe schedule install
```
GitHub API to GCS
Extract repository data from the GitHub API into Google Cloud Storage.
```bash
pip install 'dataspoc-pipe[gcs]' tap-github
```
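As with the PostgreSQL example, the source config at `~/.dataspoc-pipe/sources/github-repos.json` is not shown. A hypothetical sketch follows; the token and repository fields are assumptions about a typical tap-github config, so verify the names against the variant you install.

```bash
# Hypothetical tap-github config: field names depend on the tap-github variant
cat > ~/.dataspoc-pipe/sources/github-repos.json << 'EOF'
{
  "auth_token": "ghp_your_token_here",
  "repositories": ["your-org/your-repo"],
  "start_date": "2024-01-01T00:00:00Z"
}
EOF
```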
Pipeline YAML:

```yaml
source:
  tap: tap-github
  config: /home/you/.dataspoc-pipe/sources/github-repos.json
  streams:
    - commits
    - pull_requests
    - issues
    - stargazers

destination:
  bucket: gs://analytics-lake
  path: raw
  compression: zstd
  partition_by: _extraction_date

incremental:
  enabled: true

schedule:
  cron: "0 1 * * *"
```

```bash
gcloud auth application-default login
dataspoc-pipe validate github-repos
dataspoc-pipe run github-repos
dataspoc-pipe schedule install
```