rest-api · etl · pipeline · parquet · tutorial

Ingest Any REST API to Parquet in 10 Minutes

Michael San Martim · 2026-04-20

Every company has internal APIs. Customer data in a CRM. Events in an analytics backend. Metrics in a monitoring tool. Getting that data into your data lake usually means writing custom Python scripts, managing scheduling, handling pagination, and dealing with schema changes.

DataSpoc Pipe wraps the Singer ecosystem — 400+ taps — and writes directly to Parquet in your cloud bucket. For REST APIs, tap-rest-api-msdk handles the HTTP calls, pagination, and schema detection. You just provide the endpoint configuration.

How It Works

REST API → tap-rest-api-msdk → DataSpoc Pipe → Parquet in S3/GCS/Azure

Pipe manages the tap, converts the output to Parquet, and writes to your bucket. No intermediate database. No staging area.
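
Conceptually, the conversion step is just reading the tap's Singer messages and batching the RECORD payloads into a Parquet file. Here is a rough sketch of that idea (not Pipe's actual code), assuming pyarrow is installed:

import json
import sys

import pyarrow as pa
import pyarrow.parquet as pq

records = []
for line in sys.stdin:  # e.g. piped from: tap-rest-api-msdk --config config.json
    if not line.strip():
        continue
    msg = json.loads(line)
    if msg.get("type") == "RECORD":
        records.append(msg["record"])  # keep data rows, skip SCHEMA/STATE bookkeeping

if records:
    table = pa.Table.from_pylist(records)  # infer an Arrow schema from the dicts
    pq.write_table(table, "repositories.parquet")

Pipe does this batching, typing, and cloud upload for you; the sketch only shows why no staging database is needed in between.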

Install

Terminal window
pip install dataspoc-pipe

Pipe installs Singer taps automatically when you first run a pipeline that uses them.

Example 1: GitHub API

Pull repository data from the GitHub API into your data lake.

Pipeline Configuration

Create github-repos.yaml:

pipeline: github-repos
source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://api.github.com"
    auth_method: "header"
    auth_token: "${GITHUB_TOKEN}"
    streams:
      - name: repositories
        path: "/orgs/my-org/repos"
        primary_keys: ["id"]
        records_path: "$[*]"
        pagination:
          style: "link_header"
        schema:
          - name: id
            type: integer
          - name: name
            type: string
          - name: full_name
            type: string
          - name: stargazers_count
            type: integer
          - name: language
            type: string
          - name: created_at
            type: string
            format: date-time
          - name: updated_at
            type: string
            format: date-time
          - name: open_issues_count
            type: integer
destination:
  bucket: "s3://my-data-lake"
  path: "raw/github/repositories"
  format: parquet
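
If you want to sanity-check the endpoint and its pagination by hand first, this requests sketch is my own illustration of what the link_header style means, not something Pipe requires: it follows the rel="next" URL GitHub returns in the Link header (my-org is the placeholder org from the config).

import os

import requests

url = "https://api.github.com/orgs/my-org/repos"
headers = {"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"}

total = 0
while url:
    resp = requests.get(url, headers=headers, timeout=30)
    resp.raise_for_status()
    page = resp.json()  # records_path "$[*]": the response body is a JSON array
    total += len(page)
    url = resp.links.get("next", {}).get("url")  # link_header: follow rel="next" until absent

print(f"{total} repositories across all pages")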

Run It

Terminal window
export GITHUB_TOKEN="ghp_..."
dataspoc-pipe run github-repos.yaml

Output:

[github-repos] Starting pipeline...
[github-repos] Installing tap-rest-api-msdk...
[github-repos] Extracting: repositories
[github-repos] → Page 1: 30 records
[github-repos] → Page 2: 30 records
[github-repos] → Page 3: 12 records
[github-repos] Total: 72 records extracted
[github-repos] Writing Parquet to s3://my-data-lake/raw/github/repositories/
[github-repos] ✓ Complete: 72 rows, 1 file, 23KB

Query with Lens

from dataspoc_lens import LensClient
lens = LensClient()
# Which repos have the most open issues?
df = lens.query("""
SELECT name, language, stargazers_count, open_issues_count
FROM raw_github_repositories
WHERE open_issues_count > 0
ORDER BY open_issues_count DESC
LIMIT 10
""")
print(df)

Example 2: Weather API

Pull historical weather data from Open-Meteo (free, no auth required).

Pipeline Configuration

Create weather-daily.yaml:

pipeline: weather-daily
source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://archive-api.open-meteo.com"
    streams:
      - name: daily_weather
        path: "/v1/archive"
        params:
          latitude: "40.7128"
          longitude: "-74.0060"
          start_date: "2025-01-01"
          end_date: "2026-04-01"
          daily: "temperature_2m_max,temperature_2m_min,precipitation_sum,windspeed_10m_max"
        records_path: "$.daily"
        primary_keys: ["time"]
        schema:
          - name: time
            type: string
            format: date
          - name: temperature_2m_max
            type: number
          - name: temperature_2m_min
            type: number
          - name: precipitation_sum
            type: number
          - name: windspeed_10m_max
            type: number
destination:
  bucket: "s3://my-data-lake"
  path: "raw/weather/daily"
  format: parquet
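
It helps to see what this endpoint actually returns before trusting records_path: "$.daily". Open-Meteo responds with column-oriented arrays under daily (one list per variable), so a quick preview script, purely illustrative and separate from the pipeline, might look like this:

import requests

params = {
    "latitude": "40.7128",
    "longitude": "-74.0060",
    "start_date": "2025-01-01",
    "end_date": "2025-01-07",  # shorter range than the pipeline, just for a quick look
    "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,windspeed_10m_max",
}
resp = requests.get("https://archive-api.open-meteo.com/v1/archive", params=params, timeout=30)
resp.raise_for_status()
daily = resp.json()["daily"]  # {"time": [...], "temperature_2m_max": [...], ...}

# Pivot the parallel arrays into one row per day, matching the schema above.
rows = [dict(zip(daily.keys(), values)) for values in zip(*daily.values())]
print(rows[0])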

Run and Query

Terminal window
dataspoc-pipe run weather-daily.yaml

from dataspoc_lens import LensClient
lens = LensClient()
# Average temperature by month
df = lens.query("""
SELECT
    DATE_TRUNC('month', CAST(time AS DATE)) AS month,
    ROUND(AVG(temperature_2m_max), 1) AS avg_high,
    ROUND(AVG(temperature_2m_min), 1) AS avg_low,
    ROUND(SUM(precipitation_sum), 1) AS total_rain_mm
FROM raw_weather_daily
GROUP BY month
ORDER BY month
""")
print(df)

Example 3: Internal Company API

Most companies have internal APIs for things like user activity, product events, or CRM data. Here is how to ingest from a typical internal API that uses token auth, cursor-based pagination for events, and offset pagination for users.

Pipeline Configuration

Create internal-events.yaml:

pipeline: internal-events
source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://api.internal.company.com"
    auth_method: "header"
    auth_token: "${INTERNAL_API_TOKEN}"
    headers:
      X-API-Version: "2"
    streams:
      - name: user_events
        path: "/v2/events"
        primary_keys: ["event_id"]
        records_path: "$.data[*]"
        pagination:
          style: "jsonpath"
          next_page_token_path: "$.pagination.cursor"
          page_size: 100
        params:
          start_date: "2026-04-01"
          event_type: "purchase,signup,cancellation"
        schema:
          - name: event_id
            type: string
          - name: user_id
            type: string
          - name: event_type
            type: string
          - name: timestamp
            type: string
            format: date-time
          - name: properties
            type: object
      - name: users
        path: "/v2/users"
        primary_keys: ["user_id"]
        records_path: "$.data[*]"
        pagination:
          style: "offset"
          page_size: 200
        schema:
          - name: user_id
            type: string
          - name: email
            type: string
          - name: plan
            type: string
          - name: created_at
            type: string
            format: date-time
destination:
  bucket: "s3://my-data-lake"
  path: "raw/internal"
  format: parquet
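
The user_events stream's cursor pagination boils down to a simple loop: request a page, read the next token from $.pagination.cursor, repeat until none comes back. Here is a sketch of that loop against the hypothetical internal API, where the Bearer auth scheme and the cursor and limit query parameter names are my assumptions, not part of the config:

import os

import requests

base = "https://api.internal.company.com/v2/events"
headers = {
    "Authorization": f"Bearer {os.environ['INTERNAL_API_TOKEN']}",  # assumed Bearer scheme
    "X-API-Version": "2",
}

events, cursor = [], None
while True:
    params = {"start_date": "2026-04-01", "limit": 100}  # "limit"/"cursor" names are assumed
    if cursor:
        params["cursor"] = cursor
    body = requests.get(base, headers=headers, params=params, timeout=30).json()
    events.extend(body["data"])                        # records_path "$.data[*]"
    cursor = body.get("pagination", {}).get("cursor")  # next_page_token_path
    if not cursor:
        break

print(f"{len(events)} events fetched")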

Run It

Terminal window
export INTERNAL_API_TOKEN="tok_..."
dataspoc-pipe run internal-events.yaml

Output:

[internal-events] Starting pipeline...
[internal-events] Extracting: user_events
[internal-events] → Cursor page 1: 100 records
[internal-events] → Cursor page 2: 100 records
[internal-events] → Cursor page 3: 47 records
[internal-events] Extracting: users
[internal-events] → Offset 0: 200 records
[internal-events] → Offset 200: 200 records
[internal-events] → Offset 400: 83 records
[internal-events] Total: 247 events + 483 users
[internal-events] Writing Parquet to s3://my-data-lake/raw/internal/
[internal-events] ✓ Complete: 730 rows, 2 files, 156KB

Query: Join Events with Users

from dataspoc_lens import LensClient
lens = LensClient()
# Signups by plan this month
df = lens.query("""
SELECT
    u.plan,
    COUNT(*) AS signups,
    MIN(e.timestamp) AS first_signup,
    MAX(e.timestamp) AS last_signup
FROM raw_internal_user_events e
JOIN raw_internal_users u ON e.user_id = u.user_id
WHERE e.event_type = 'signup'
  AND e.timestamp >= '2026-04-01'
GROUP BY u.plan
ORDER BY signups DESC
""")
print(df)
# Cancellation rate by plan
df = lens.query("""
SELECT
    u.plan,
    COUNT(*) FILTER (WHERE e.event_type = 'cancellation') AS cancellations,
    COUNT(*) FILTER (WHERE e.event_type = 'signup') AS signups,
    ROUND(
        100.0 * COUNT(*) FILTER (WHERE e.event_type = 'cancellation')
        / NULLIF(COUNT(*) FILTER (WHERE e.event_type = 'signup'), 0),
        1
    ) AS cancel_rate_pct
FROM raw_internal_user_events e
JOIN raw_internal_users u ON e.user_id = u.user_id
WHERE e.timestamp >= '2026-01-01'
GROUP BY u.plan
ORDER BY cancel_rate_pct DESC
""")
print(df)

Handling Pagination Styles

tap-rest-api-msdk supports multiple pagination strategies:

Link header (style: "link_header"): the API returns a Link: <url>; rel="next" header (GitHub, many REST APIs)
Cursor (style: "jsonpath" + next_page_token_path): the API returns a cursor/token for the next page (Stripe, Slack)
Offset (style: "offset" + page_size): the API uses ?offset=N&limit=N (simple REST APIs)
Page number (style: "page" + page_size): the API uses ?page=N (older APIs)
Header (style: "header_link"): the next page URL comes back in a response header
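
For intuition, here is roughly what the offset style in this list means, the one paging pattern not sketched above. The tap runs this loop for you; the endpoint and the "data" envelope below are placeholders of my own, not anything from the tap.

import requests

def fetch_all(url, page_size=200):
    """Page through an offset/limit API until a short (or empty) page comes back."""
    rows, offset = [], 0
    while True:
        resp = requests.get(url, params={"offset": offset, "limit": page_size}, timeout=30)
        resp.raise_for_status()
        page = resp.json()["data"]  # placeholder envelope; match it to your records_path
        rows.extend(page)
        if len(page) < page_size:   # last page reached
            return rows
        offset += page_size

# e.g. rows = fetch_all("https://api.example.com/v1/users")  -- placeholder endpoint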

Scheduling with Cron

Once your pipeline works, schedule it:

Terminal window
# Add to crontab
crontab -e
# Pull GitHub data every 6 hours
0 */6 * * * cd /opt/pipelines && dataspoc-pipe run github-repos.yaml >> /var/log/pipe-github.log 2>&1
# Pull weather data daily at 6 AM
0 6 * * * cd /opt/pipelines && dataspoc-pipe run weather-daily.yaml >> /var/log/pipe-weather.log 2>&1
# Pull internal events every hour
0 * * * * cd /opt/pipelines && dataspoc-pipe run internal-events.yaml >> /var/log/pipe-internal.log 2>&1

Using the Python SDK

For programmatic control, use PipeClient:

from dataspoc_pipe import PipeClient
pipe = PipeClient()
# Run a pipeline
result = pipe.run("github-repos.yaml")
print(f"Status: {result.status}")
print(f"Rows: {result.rows_extracted}")
print(f"Files: {result.files_written}")
# List available pipelines
pipelines = pipe.list_pipelines()
for p in pipelines:
    print(f"{p.name}: last run {p.last_run}, status {p.status}")
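
If you schedule with cron but want a single entry point instead of three crontab lines, a small wrapper on top of the same client works; the exact "success" status string here is my assumption, so check it against your runs:

import sys

from dataspoc_pipe import PipeClient

pipe = PipeClient()
failed = False
for config in ["github-repos.yaml", "weather-daily.yaml", "internal-events.yaml"]:
    result = pipe.run(config)
    print(f"{config}: {result.status}, {result.rows_extracted} rows")
    if result.status != "success":  # assumption: successful runs report status "success"
        failed = True

sys.exit(1 if failed else 0)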

Any REST API that returns JSON can be ingested into Parquet and queried with SQL in under 10 minutes. No custom code. No database. Just a YAML file and two commands.
