Ingest Any REST API to Parquet in 10 Minutes
Every company has internal APIs. Customer data in a CRM. Events in an analytics backend. Metrics in a monitoring tool. Getting that data into your data lake usually means writing custom Python scripts, managing scheduling, handling pagination, and dealing with schema changes.
DataSpoc Pipe wraps the Singer ecosystem — 400+ taps — and writes directly to Parquet in your cloud bucket. For REST APIs, tap-rest-api-msdk handles the HTTP calls, pagination, and schema detection. You just provide the endpoint configuration.
How It Works
REST API → tap-rest-api-msdk → DataSpoc Pipe → Parquet in S3/GCS/Azure

Pipe manages the tap, converts the output to Parquet, and writes to your bucket. No intermediate database. No staging area.
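Every pipeline is one YAML file: a source block naming the tap and its config, and a destination block naming the bucket, path, and output format. A minimal skeleton, with placeholder values and field names taken from the examples below:

```yaml
# Minimal pipeline skeleton — placeholder endpoint and stream;
# the field names match the worked examples later in this post.
pipeline: my-pipeline

source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://api.example.com"
    streams:
      - name: my_stream
        path: "/v1/items"
        primary_keys: ["id"]
        records_path: "$.data[*]"

destination:
  bucket: "s3://my-data-lake"
  path: "raw/example/my_stream"
  format: parquet
```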
Install
```bash
pip install dataspoc-pipe
```

Pipe installs Singer taps automatically when you first run a pipeline that uses them.
Example 1: GitHub API
Pull repository data from the GitHub API into your data lake.
Pipeline Configuration
Create github-repos.yaml:
```yaml
pipeline: github-repos

source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://api.github.com"
    auth_method: "header"
    auth_token: "${GITHUB_TOKEN}"
    streams:
      - name: repositories
        path: "/orgs/my-org/repos"
        primary_keys: ["id"]
        records_path: "$[*]"
        pagination:
          style: "link_header"
        schema:
          - name: id
            type: integer
          - name: name
            type: string
          - name: full_name
            type: string
          - name: stargazers_count
            type: integer
          - name: language
            type: string
          - name: created_at
            type: string
            format: date-time
          - name: updated_at
            type: string
            format: date-time
          - name: open_issues_count
            type: integer

destination:
  bucket: "s3://my-data-lake"
  path: "raw/github/repositories"
  format: parquet
```

Run It
```bash
export GITHUB_TOKEN="ghp_..."
dataspoc-pipe run github-repos.yaml
```

Output:
```
[github-repos] Starting pipeline...
[github-repos] Installing tap-rest-api-msdk...
[github-repos] Extracting: repositories
[github-repos] → Page 1: 30 records
[github-repos] → Page 2: 30 records
[github-repos] → Page 3: 12 records
[github-repos] Total: 72 records extracted
[github-repos] Writing Parquet to s3://my-data-lake/raw/github/repositories/
[github-repos] ✓ Complete: 72 rows, 1 file, 23KB
```

Query with Lens
The files written to raw/github/repositories are queried here as the table raw_github_repositories:

```python
from dataspoc_lens import LensClient

lens = LensClient()

# Which repos have the most open issues?
df = lens.query("""
    SELECT
        name,
        language,
        stargazers_count,
        open_issues_count
    FROM raw_github_repositories
    WHERE open_issues_count > 0
    ORDER BY open_issues_count DESC
    LIMIT 10
""")
print(df)
```

Example 2: Weather API
Pull historical weather data from Open-Meteo (free, no auth required).
Pipeline Configuration
Create weather-daily.yaml:
```yaml
pipeline: weather-daily

source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://archive-api.open-meteo.com"
    streams:
      - name: daily_weather
        path: "/v1/archive"
        params:
          latitude: "40.7128"
          longitude: "-74.0060"
          start_date: "2025-01-01"
          end_date: "2026-04-01"
          daily: "temperature_2m_max,temperature_2m_min,precipitation_sum,windspeed_10m_max"
        records_path: "$.daily"
        primary_keys: ["time"]
        schema:
          - name: time
            type: string
            format: date
          - name: temperature_2m_max
            type: number
          - name: temperature_2m_min
            type: number
          - name: precipitation_sum
            type: number
          - name: windspeed_10m_max
            type: number

destination:
  bucket: "s3://my-data-lake"
  path: "raw/weather/daily"
  format: parquet
```

Run and Query
```bash
dataspoc-pipe run weather-daily.yaml
```

```python
from dataspoc_lens import LensClient

lens = LensClient()

# Average temperature by month
df = lens.query("""
    SELECT
        DATE_TRUNC('month', CAST(time AS DATE)) AS month,
        ROUND(AVG(temperature_2m_max), 1) AS avg_high,
        ROUND(AVG(temperature_2m_min), 1) AS avg_low,
        ROUND(SUM(precipitation_sum), 1) AS total_rain_mm
    FROM raw_weather_daily
    GROUP BY month
    ORDER BY month
""")
print(df)
```

Example 3: Internal Company API
Most companies have internal APIs for things like user activity, product events, or CRM data. Here is how to ingest from a typical internal API with header token auth and two streams: cursor-paginated events and offset-paginated users.
Pipeline Configuration
Create internal-events.yaml:
```yaml
pipeline: internal-events

source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://api.internal.company.com"
    auth_method: "header"
    auth_token: "${INTERNAL_API_TOKEN}"
    headers:
      X-API-Version: "2"
    streams:
      - name: user_events
        path: "/v2/events"
        primary_keys: ["event_id"]
        records_path: "$.data[*]"
        pagination:
          style: "jsonpath"
          next_page_token_path: "$.pagination.cursor"
          page_size: 100
        params:
          start_date: "2026-04-01"
          event_type: "purchase,signup,cancellation"
        schema:
          - name: event_id
            type: string
          - name: user_id
            type: string
          - name: event_type
            type: string
          - name: timestamp
            type: string
            format: date-time
          - name: properties
            type: object
      - name: users
        path: "/v2/users"
        primary_keys: ["user_id"]
        records_path: "$.data[*]"
        pagination:
          style: "offset"
          page_size: 200
        schema:
          - name: user_id
            type: string
          - name: email
            type: string
          - name: plan
            type: string
          - name: created_at
            type: string
            format: date-time

destination:
  bucket: "s3://my-data-lake"
  path: "raw/internal"
  format: parquet
```

Run It
```bash
export INTERNAL_API_TOKEN="tok_..."
dataspoc-pipe run internal-events.yaml
```

```
[internal-events] Starting pipeline...
[internal-events] Extracting: user_events
[internal-events] → Cursor page 1: 100 records
[internal-events] → Cursor page 2: 100 records
[internal-events] → Cursor page 3: 47 records
[internal-events] Extracting: users
[internal-events] → Offset 0: 200 records
[internal-events] → Offset 200: 200 records
[internal-events] → Offset 400: 83 records
[internal-events] Total: 247 events + 483 users
[internal-events] Writing Parquet to s3://my-data-lake/raw/internal/
[internal-events] ✓ Complete: 730 rows, 2 files, 156KB
```

Query: Join Events with Users
```python
from dataspoc_lens import LensClient

lens = LensClient()

# Signups by plan this month
df = lens.query("""
    SELECT
        u.plan,
        COUNT(*) AS signups,
        MIN(e.timestamp) AS first_signup,
        MAX(e.timestamp) AS last_signup
    FROM raw_internal_user_events e
    JOIN raw_internal_users u ON e.user_id = u.user_id
    WHERE e.event_type = 'signup'
      AND e.timestamp >= '2026-04-01'
    GROUP BY u.plan
    ORDER BY signups DESC
""")
print(df)

# Cancellation rate by plan
df = lens.query("""
    SELECT
        u.plan,
        COUNT(*) FILTER (WHERE e.event_type = 'cancellation') AS cancellations,
        COUNT(*) FILTER (WHERE e.event_type = 'signup') AS signups,
        ROUND(
            100.0 * COUNT(*) FILTER (WHERE e.event_type = 'cancellation')
            / NULLIF(COUNT(*) FILTER (WHERE e.event_type = 'signup'), 0),
            1
        ) AS cancel_rate_pct
    FROM raw_internal_user_events e
    JOIN raw_internal_users u ON e.user_id = u.user_id
    WHERE e.timestamp >= '2026-01-01'
    GROUP BY u.plan
    ORDER BY cancel_rate_pct DESC
""")
print(df)
```

Handling Pagination Styles
tap-rest-api-msdk supports multiple pagination strategies:
| Style | Config | Use When |
|---|---|---|
| Link header | style: "link_header" | API returns Link: <url>; rel="next" header (GitHub, many REST APIs) |
| Cursor | style: "jsonpath" + next_page_token_path | API returns a cursor/token for next page (Stripe, Slack) |
| Offset | style: "offset" + page_size | API uses ?offset=N&limit=N (simple REST APIs) |
| Page number | style: "page" + page_size | API uses ?page=N (older APIs) |
| Header | style: "header_link" | Next page URL in response header |
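The link-header, cursor, and offset styles appear in the worked examples above. A page-number stream follows the same config shape; a sketch, with a hypothetical endpoint and placeholder fields:

```yaml
# Hypothetical page-numbered endpoint — same stream config shape as the
# examples above; the path, stream name, and schema are placeholders.
streams:
  - name: invoices
    path: "/v1/invoices"
    primary_keys: ["invoice_id"]
    records_path: "$.results[*]"
    pagination:
      style: "page"
      page_size: 50
    schema:
      - name: invoice_id
        type: string
      - name: amount
        type: number
      - name: issued_at
        type: string
        format: date-time
```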
Scheduling with Cron
Once your pipeline works, schedule it:
```bash
# Add to crontab
crontab -e

# Pull GitHub data every 6 hours
0 */6 * * * cd /opt/pipelines && dataspoc-pipe run github-repos.yaml >> /var/log/pipe-github.log 2>&1

# Pull weather data daily at 6 AM
0 6 * * * cd /opt/pipelines && dataspoc-pipe run weather-daily.yaml >> /var/log/pipe-weather.log 2>&1

# Pull internal events every hour
0 * * * * cd /opt/pipelines && dataspoc-pipe run internal-events.yaml >> /var/log/pipe-internal.log 2>&1
```

Using the Python SDK
For programmatic control, use PipeClient:
```python
from dataspoc_pipe import PipeClient

pipe = PipeClient()

# Run a pipeline
result = pipe.run("github-repos.yaml")
print(f"Status: {result.status}")
print(f"Rows: {result.rows_extracted}")
print(f"Files: {result.files_written}")
```
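If one script drives several pipelines, you can loop over the YAML files and report each result; a minimal sketch, using only the run() call and result fields shown above (the filenames are the ones from this post):

```python
# Run each pipeline in sequence and report the outcome.
# Filenames come from the examples above; adjust paths for your setup.
for config in ["github-repos.yaml", "weather-daily.yaml", "internal-events.yaml"]:
    result = pipe.run(config)
    print(f"{config}: {result.status} "
          f"({result.rows_extracted} rows, {result.files_written} files)")
```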
```python
# List available pipelines
pipelines = pipe.list_pipelines()
for p in pipelines:
    print(f"{p.name}: last run {p.last_run}, status {p.status}")
```

Any REST API that returns JSON can be ingested into Parquet and queried with SQL in under 10 minutes. No custom code. No database. Just a YAML file and two commands.