
Ingest any REST API to Parquet in 10 minutes

Michael San Martim · 2026-04-20

Every company has internal APIs. Customer data in a CRM. Events in an analytics backend. Metrics in a monitoring tool. Getting that data into your data lake usually means writing custom Python scripts, managing scheduling, handling pagination, and coping with schema changes.

DataSpoc Pipe wraps the Singer ecosystem (400+ taps) and writes directly to Parquet in your cloud bucket. For REST APIs, tap-rest-api-msdk handles the HTTP calls, pagination, and schema detection. You just supply the endpoint configuration.

How it works

REST API → tap-rest-api-msdk → DataSpoc Pipe → Parquet in S3/GCS/Azure

Pipe manages the tap, converts its output to Parquet, and writes to your bucket. No intermediate database. No staging area.
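Under the hood, Singer taps emit newline-delimited JSON messages (SCHEMA, RECORD, STATE) on stdout, and Pipe consumes that stream. A minimal sketch of parsing such a stream; the message shapes follow the Singer spec, but the sample records here are made up:

```python
import json

# Three Singer message types, as a tap would print them (one JSON object per line).
singer_output = """\
{"type": "SCHEMA", "stream": "repositories", "schema": {"properties": {"id": {"type": "integer"}}}, "key_properties": ["id"]}
{"type": "RECORD", "stream": "repositories", "record": {"id": 1, "name": "pipe"}}
{"type": "RECORD", "stream": "repositories", "record": {"id": 2, "name": "lens"}}
{"type": "STATE", "value": {"bookmarks": {"repositories": {"replication_key_value": "2026-04-20"}}}}
"""

records = []
for line in singer_output.splitlines():
    msg = json.loads(line)
    if msg["type"] == "RECORD":
        records.append(msg["record"])  # these rows become Parquet rows

print(len(records))  # 2 records extracted
```

A target like Pipe does the same thing at scale: it buffers RECORD messages per stream, validates them against the SCHEMA message, and checkpoints STATE so the next run can resume.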

Install

pip install dataspoc-pipe

Pipe installs Singer taps automatically the first time you run a pipeline that uses them.

Example 1: GitHub API

Pull repository data from the GitHub API into your data lake.

Pipeline configuration

Create github-repos.yaml:

pipeline: github-repos
source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://api.github.com"
    auth_method: "header"
    auth_token: "${GITHUB_TOKEN}"
    streams:
      - name: repositories
        path: "/orgs/my-org/repos"
        primary_keys: ["id"]
        records_path: "$[*]"
        pagination:
          style: "link_header"
        schema:
          - name: id
            type: integer
          - name: name
            type: string
          - name: full_name
            type: string
          - name: stargazers_count
            type: integer
          - name: language
            type: string
          - name: created_at
            type: string
            format: date-time
          - name: updated_at
            type: string
            format: date-time
          - name: open_issues_count
            type: integer
destination:
  bucket: "s3://my-data-lake"
  path: "raw/github/repositories"
  format: parquet
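Before running, it can help to sanity-check the config's structure. Pipe configs are plain YAML, so once loaded (e.g. with yaml.safe_load) they are just nested dicts. A minimal sketch of such a check; the key names mirror the config above, but the checker itself is hypothetical, not part of Pipe:

```python
REQUIRED_TOP_LEVEL = ("pipeline", "source", "destination")

def check_pipeline_config(cfg: dict) -> list:
    """Return a list of structural problems found in a parsed pipeline config."""
    problems = [f"missing top-level key: {k}" for k in REQUIRED_TOP_LEVEL if k not in cfg]
    # Every stream needs at least a name and an endpoint path.
    for stream in cfg.get("source", {}).get("config", {}).get("streams", []):
        for key in ("name", "path"):
            if key not in stream:
                problems.append(f"stream {stream.get('name', '?')}: missing {key}")
    return problems

# Parsed equivalent of a trimmed-down github-repos.yaml.
sample = {
    "pipeline": "github-repos",
    "source": {
        "tap": "tap-rest-api-msdk",
        "config": {"streams": [{"name": "repositories", "path": "/orgs/my-org/repos"}]},
    },
    "destination": {"bucket": "s3://my-data-lake", "path": "raw/github/repositories", "format": "parquet"},
}
print(check_pipeline_config(sample))  # [] -> structurally valid
```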

Run it

export GITHUB_TOKEN="ghp_..."
dataspoc-pipe run github-repos.yaml

Output:

[github-repos] Starting pipeline...
[github-repos] Installing tap-rest-api-msdk...
[github-repos] Extracting: repositories
[github-repos] → Page 1: 30 records
[github-repos] → Page 2: 30 records
[github-repos] → Page 3: 12 records
[github-repos] Total: 72 records extracted
[github-repos] Writing Parquet to s3://my-data-lake/raw/github/repositories/
[github-repos] ✓ Complete: 72 rows, 1 file, 23KB

Query with Lens

from dataspoc_lens import LensClient
lens = LensClient()
# Which repos have the most open issues?
df = lens.query("""
SELECT name, language, stargazers_count, open_issues_count
FROM raw_github_repositories
WHERE open_issues_count > 0
ORDER BY open_issues_count DESC
LIMIT 10
""")
print(df)

Example 2: Weather API

Pull historical weather data from Open-Meteo (free, no authentication required).

Pipeline configuration

Create weather-daily.yaml:

pipeline: weather-daily
source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://archive-api.open-meteo.com"
    streams:
      - name: daily_weather
        path: "/v1/archive"
        params:
          latitude: "40.7128"
          longitude: "-74.0060"
          start_date: "2025-01-01"
          end_date: "2026-04-01"
          daily: "temperature_2m_max,temperature_2m_min,precipitation_sum,windspeed_10m_max"
        records_path: "$.daily"
        primary_keys: ["time"]
        schema:
          - name: time
            type: string
            format: date
          - name: temperature_2m_max
            type: number
          - name: temperature_2m_min
            type: number
          - name: precipitation_sum
            type: number
          - name: windspeed_10m_max
            type: number
destination:
  bucket: "s3://my-data-lake"
  path: "raw/weather/daily"
  format: parquet

Run and query

dataspoc-pipe run weather-daily.yaml

from dataspoc_lens import LensClient
lens = LensClient()
# Average temperature by month
df = lens.query("""
SELECT
DATE_TRUNC('month', CAST(time AS DATE)) AS month,
ROUND(AVG(temperature_2m_max), 1) AS avg_high,
ROUND(AVG(temperature_2m_min), 1) AS avg_low,
ROUND(SUM(precipitation_sum), 1) AS total_rain_mm
FROM raw_weather_daily
GROUP BY month
ORDER BY month
""")
print(df)

Example 3: Internal company API

Most companies have internal APIs for things like user activity, product events, or CRM data. Here is how to ingest from a typical internal API with token authentication and cursor-based pagination.

Pipeline configuration

Create internal-events.yaml:

pipeline: internal-events
source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://api.internal.company.com"
    auth_method: "header"
    auth_token: "${INTERNAL_API_TOKEN}"
    headers:
      X-API-Version: "2"
    streams:
      - name: user_events
        path: "/v2/events"
        primary_keys: ["event_id"]
        records_path: "$.data[*]"
        pagination:
          style: "jsonpath"
          next_page_token_path: "$.pagination.cursor"
          page_size: 100
        params:
          start_date: "2026-04-01"
          event_type: "purchase,signup,cancellation"
        schema:
          - name: event_id
            type: string
          - name: user_id
            type: string
          - name: event_type
            type: string
          - name: timestamp
            type: string
            format: date-time
          - name: properties
            type: object
      - name: users
        path: "/v2/users"
        primary_keys: ["user_id"]
        records_path: "$.data[*]"
        pagination:
          style: "offset"
          page_size: 200
        schema:
          - name: user_id
            type: string
          - name: email
            type: string
          - name: plan
            type: string
          - name: created_at
            type: string
            format: date-time
destination:
  bucket: "s3://my-data-lake"
  path: "raw/internal"
  format: parquet

Run it

export INTERNAL_API_TOKEN="tok_..."
dataspoc-pipe run internal-events.yaml
[internal-events] Starting pipeline...
[internal-events] Extracting: user_events
[internal-events] → Cursor page 1: 100 records
[internal-events] → Cursor page 2: 100 records
[internal-events] → Cursor page 3: 47 records
[internal-events] Extracting: users
[internal-events] → Offset 0: 200 records
[internal-events] → Offset 200: 200 records
[internal-events] → Offset 400: 83 records
[internal-events] Total: 247 events + 483 users
[internal-events] Writing Parquet to s3://my-data-lake/raw/internal/
[internal-events] ✓ Complete: 730 rows, 2 files, 156KB

Query: Join events with users

from dataspoc_lens import LensClient
lens = LensClient()
# Signups by plan this month
df = lens.query("""
SELECT
u.plan,
COUNT(*) AS signups,
MIN(e.timestamp) AS first_signup,
MAX(e.timestamp) AS last_signup
FROM raw_internal_user_events e
JOIN raw_internal_users u ON e.user_id = u.user_id
WHERE e.event_type = 'signup'
AND e.timestamp >= '2026-04-01'
GROUP BY u.plan
ORDER BY signups DESC
""")
print(df)
# Cancellation rate by plan
df = lens.query("""
SELECT
u.plan,
COUNT(*) FILTER (WHERE e.event_type = 'cancellation') AS cancellations,
COUNT(*) FILTER (WHERE e.event_type = 'signup') AS signups,
ROUND(
100.0 * COUNT(*) FILTER (WHERE e.event_type = 'cancellation')
/ NULLIF(COUNT(*) FILTER (WHERE e.event_type = 'signup'), 0),
1
) AS cancel_rate_pct
FROM raw_internal_user_events e
JOIN raw_internal_users u ON e.user_id = u.user_id
WHERE e.timestamp >= '2026-01-01'
GROUP BY u.plan
ORDER BY cancel_rate_pct DESC
""")
print(df)

Handling pagination styles

tap-rest-api-msdk supports multiple pagination strategies:

| Style       | Configuration                              | Use when                                                                     |
| ----------- | ------------------------------------------ | ---------------------------------------------------------------------------- |
| Link header | style: "link_header"                       | The API returns a Link: <url>; rel="next" header (GitHub, many REST APIs)    |
| Cursor      | style: "jsonpath" + next_page_token_path   | The API returns a cursor/token for the next page (Stripe, Slack)             |
| Offset      | style: "offset" + page_size                | The API uses ?offset=N&limit=N (simple REST APIs)                            |
| Page number | style: "page" + page_size                  | The API uses ?page=N (older APIs)                                            |
| Header      | style: "header_link"                       | Next-page URL in a response header                                           |
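The styles differ only in how the next request is derived from the last response. A generic sketch of the cursor and offset loops, with fetch_page standing in for the HTTP call (a hypothetical helper for illustration, not part of the tap):

```python
from typing import Callable, Iterator, Optional

def paginate_cursor(fetch_page: Callable[[Optional[str]], dict]) -> Iterator:
    """Cursor style: each response carries the token for the next page."""
    cursor = None
    while True:
        resp = fetch_page(cursor)
        yield from resp["data"]
        cursor = resp.get("pagination", {}).get("cursor")
        if not cursor:  # no cursor means this was the last page
            return

def paginate_offset(fetch_page: Callable[[int], list], page_size: int) -> Iterator:
    """Offset style: keep advancing ?offset=N until a short page comes back."""
    offset = 0
    while True:
        rows = fetch_page(offset)
        yield from rows
        if len(rows) < page_size:
            return
        offset += page_size

# Fake API: 5 rows served 2 at a time via offset pagination.
rows = [{"id": i} for i in range(5)]
fetched = list(paginate_offset(lambda off: rows[off:off + 2], page_size=2))
print(len(fetched))  # 5
```

With link_header and page styles the loop is the same shape; only the "derive the next request" line changes.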

Scheduling with cron

Once your pipeline works, schedule it:

# Add to crontab
crontab -e
# Pull GitHub data every 6 hours
0 */6 * * * cd /opt/pipelines && dataspoc-pipe run github-repos.yaml >> /var/log/pipe-github.log 2>&1
# Pull weather data daily at 6 AM
0 6 * * * cd /opt/pipelines && dataspoc-pipe run weather-daily.yaml >> /var/log/pipe-weather.log 2>&1
# Pull internal events every hour
0 * * * * cd /opt/pipelines && dataspoc-pipe run internal-events.yaml >> /var/log/pipe-internal.log 2>&1
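Cron alone gives no retries: if an API hiccups, the run simply fails until the next slot. A small wrapper sketch that retries a command with backoff before giving up; this is generic subprocess code, not a Pipe feature:

```python
import subprocess
import sys
import time

def run_with_retries(cmd, attempts=3, backoff_s=5.0):
    """Run cmd, retrying with linear backoff; return True on success."""
    for attempt in range(1, attempts + 1):
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            return True
        print(f"attempt {attempt} failed: {result.stderr.strip()[:200]}")
        if attempt < attempts:
            time.sleep(backoff_s * attempt)
    return False

# In the cron job, you would swap the bare command for something like:
#   run_with_retries(["dataspoc-pipe", "run", "github-repos.yaml"])
# Demo with a command that always succeeds:
ok = run_with_retries([sys.executable, "-c", "print('ok')"], attempts=1)
print(ok)  # True
```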

Using the Python SDK

For programmatic control, use PipeClient:

from dataspoc_pipe import PipeClient
pipe = PipeClient()
# Run a pipeline
result = pipe.run("github-repos.yaml")
print(f"Status: {result.status}")
print(f"Rows: {result.rows_extracted}")
print(f"Files: {result.files_written}")
# List available pipelines
pipelines = pipe.list_pipelines()
for p in pipelines:
    print(f"{p.name}: last run {p.last_run}, status {p.status}")

Any REST API that returns JSON can be ingested to Parquet and queried with SQL in under 10 minutes. No custom code. No database. Just a YAML file and two commands.
