
Ingest Any REST API into Parquet in 10 Minutes

Michael San Martim · 2026-04-20

Every company has internal APIs. Customer data in a CRM. Events in an analytics backend. Metrics in a monitoring tool. Getting that data into your data lake usually means writing custom Python scripts, managing scheduling, handling pagination, and coping with schema changes.

DataSpoc Pipe wraps the Singer ecosystem (400+ taps) and writes straight to Parquet in your cloud bucket. For REST APIs, tap-rest-api-msdk handles the HTTP calls, pagination, and schema detection. You only supply the endpoint configuration.

How It Works

REST API → tap-rest-api-msdk → DataSpoc Pipe → Parquet in S3/GCS/Azure

The Pipe runs the tap, converts its output to Parquet, and writes to your bucket. No intermediate database. No staging area.
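Conceptually, the flow is small. The sketch below shows roughly what the Pipe automates for a single stream, assuming a Singer tap is installed and on PATH; it is an illustration of the idea, not the Pipe's actual implementation.

# Rough sketch of the automated flow, not the Pipe's real code: run a Singer
# tap, keep its RECORD messages, and write them out as one Parquet file.
import json
import subprocess

import pyarrow as pa
import pyarrow.parquet as pq

# Singer taps print one JSON message per line on stdout (SCHEMA, RECORD, STATE).
proc = subprocess.run(
    ["tap-rest-api-msdk", "--config", "config.json"],
    capture_output=True, text=True, check=True,
)

# Keep only the RECORD messages; SCHEMA and STATE are ignored in this sketch.
records = []
for line in proc.stdout.splitlines():
    message = json.loads(line) if line else {}
    if message.get("type") == "RECORD":
        records.append(message["record"])

# Convert to Parquet; the Pipe writes to your cloud bucket instead of a local file.
pq.write_table(pa.Table.from_pylist(records), "repositories.parquet")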

Installation

pip install dataspoc-pipe

The Pipe installs Singer taps automatically the first time you run a pipeline that uses them.

Example 1: GitHub API

Pull repository data from the GitHub API into your data lake.

Pipeline Configuration

Create github-repos.yaml:

pipeline: github-repos
source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://api.github.com"
    auth_method: "header"
    auth_token: "${GITHUB_TOKEN}"
    streams:
      - name: repositories
        path: "/orgs/my-org/repos"
        primary_keys: ["id"]
        records_path: "$[*]"
        pagination:
          style: "link_header"
        schema:
          - name: id
            type: integer
          - name: name
            type: string
          - name: full_name
            type: string
          - name: stargazers_count
            type: integer
          - name: language
            type: string
          - name: created_at
            type: string
            format: date-time
          - name: updated_at
            type: string
            format: date-time
          - name: open_issues_count
            type: integer
destination:
  bucket: "s3://my-data-lake"
  path: "raw/github/repositories"
  format: parquet
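Before the first run, it can be worth sanity-checking the token against the endpoint. A minimal sketch with requests; my-org is the placeholder organization from the config above, and GitHub accepts the token as a Bearer header.

# Quick pre-flight check: can this token list the org's repos?
# "my-org" is the placeholder organization used in the pipeline config.
import os
import requests

resp = requests.get(
    "https://api.github.com/orgs/my-org/repos",
    headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"},
    params={"per_page": 5},
)
resp.raise_for_status()
print(f"Token OK: got {len(resp.json())} sample repos")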

Run It

export GITHUB_TOKEN="ghp_..."
dataspoc-pipe run github-repos.yaml

Output:

[github-repos] Starting pipeline...
[github-repos] Installing tap-rest-api-msdk...
[github-repos] Extracting: repositories
[github-repos] → Page 1: 30 records
[github-repos] → Page 2: 30 records
[github-repos] → Page 3: 12 records
[github-repos] Total: 72 records extracted
[github-repos] Writing Parquet to s3://my-data-lake/raw/github/repositories/
[github-repos] ✓ Complete: 72 rows, 1 file, 23KB

Query with Lens

from dataspoc_lens import LensClient
lens = LensClient()
# Which repos have the most open issues?
df = lens.query("""
SELECT name, language, stargazers_count, open_issues_count
FROM raw_github_repositories
WHERE open_issues_count > 0
ORDER BY open_issues_count DESC
LIMIT 10
""")
print(df)

Example 2: Weather API

Pull historical weather data from Open-Meteo (free, no authentication required).

Pipeline Configuration

Create weather-daily.yaml:

pipeline: weather-daily
source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://archive-api.open-meteo.com"
    streams:
      - name: daily_weather
        path: "/v1/archive"
        params:
          latitude: "40.7128"
          longitude: "-74.0060"
          start_date: "2025-01-01"
          end_date: "2026-04-01"
          daily: "temperature_2m_max,temperature_2m_min,precipitation_sum,windspeed_10m_max"
        records_path: "$.daily"
        primary_keys: ["time"]
        schema:
          - name: time
            type: string
            format: date
          - name: temperature_2m_max
            type: number
          - name: temperature_2m_min
            type: number
          - name: precipitation_sum
            type: number
          - name: windspeed_10m_max
            type: number
destination:
  bucket: "s3://my-data-lake"
  path: "raw/weather/daily"
  format: parquet

Run and Query

dataspoc-pipe run weather-daily.yaml
from dataspoc_lens import LensClient
lens = LensClient()
# Average temperature by month
df = lens.query("""
SELECT
DATE_TRUNC('month', CAST(time AS DATE)) AS month,
ROUND(AVG(temperature_2m_max), 1) AS avg_high,
ROUND(AVG(temperature_2m_min), 1) AS avg_low,
ROUND(SUM(precipitation_sum), 1) AS total_rain_mm
FROM raw_weather_daily
GROUP BY month
ORDER BY month
""")
print(df)

Example 3: Internal Company API

Most companies have internal APIs for things like user activity, product events, or CRM data. Here is how to ingest from a typical internal API that uses token authentication and cursor-based pagination.

Pipeline Configuration

Create internal-events.yaml:

pipeline: internal-events
source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://api.internal.company.com"
    auth_method: "header"
    auth_token: "${INTERNAL_API_TOKEN}"
    headers:
      X-API-Version: "2"
    streams:
      - name: user_events
        path: "/v2/events"
        primary_keys: ["event_id"]
        records_path: "$.data[*]"
        pagination:
          style: "jsonpath"
          next_page_token_path: "$.pagination.cursor"
          page_size: 100
        params:
          start_date: "2026-04-01"
          event_type: "purchase,signup,cancellation"
        schema:
          - name: event_id
            type: string
          - name: user_id
            type: string
          - name: event_type
            type: string
          - name: timestamp
            type: string
            format: date-time
          - name: properties
            type: object
      - name: users
        path: "/v2/users"
        primary_keys: ["user_id"]
        records_path: "$.data[*]"
        pagination:
          style: "offset"
          page_size: 200
        schema:
          - name: user_id
            type: string
          - name: email
            type: string
          - name: plan
            type: string
          - name: created_at
            type: string
            format: date-time
destination:
  bucket: "s3://my-data-lake"
  path: "raw/internal"
  format: parquet

Run It

export INTERNAL_API_TOKEN="tok_..."
dataspoc-pipe run internal-events.yaml
[internal-events] Starting pipeline...
[internal-events] Extracting: user_events
[internal-events] → Cursor page 1: 100 records
[internal-events] → Cursor page 2: 100 records
[internal-events] → Cursor page 3: 47 records
[internal-events] Extracting: users
[internal-events] → Offset 0: 200 records
[internal-events] → Offset 200: 200 records
[internal-events] → Offset 400: 83 records
[internal-events] Total: 247 events + 483 users
[internal-events] Writing Parquet to s3://my-data-lake/raw/internal/
[internal-events] ✓ Complete: 730 rows, 2 files, 156KB

Query: Join Events with Users

from dataspoc_lens import LensClient
lens = LensClient()
# Signups by plan this month
df = lens.query("""
SELECT
u.plan,
COUNT(*) AS signups,
MIN(e.timestamp) AS first_signup,
MAX(e.timestamp) AS last_signup
FROM raw_internal_user_events e
JOIN raw_internal_users u ON e.user_id = u.user_id
WHERE e.event_type = 'signup'
AND e.timestamp >= '2026-04-01'
GROUP BY u.plan
ORDER BY signups DESC
""")
print(df)
# Cancellation rate by plan
df = lens.query("""
SELECT
u.plan,
COUNT(*) FILTER (WHERE e.event_type = 'cancellation') AS cancellations,
COUNT(*) FILTER (WHERE e.event_type = 'signup') AS signups,
ROUND(
100.0 * COUNT(*) FILTER (WHERE e.event_type = 'cancellation')
/ NULLIF(COUNT(*) FILTER (WHERE e.event_type = 'signup'), 0),
1
) AS cancel_rate_pct
FROM raw_internal_user_events e
JOIN raw_internal_users u ON e.user_id = u.user_id
WHERE e.timestamp >= '2026-01-01'
GROUP BY u.plan
ORDER BY cancel_rate_pct DESC
""")
print(df)

Handling Pagination Styles

tap-rest-api-msdk supports multiple pagination strategies:

| Style | Configuration | Use when |
| --- | --- | --- |
| Link header | style: "link_header" | API returns a Link: <url>; rel="next" header (GitHub, many REST APIs) |
| Cursor | style: "jsonpath" + next_page_token_path | API returns a cursor/token for the next page (Stripe, Slack) |
| Offset | style: "offset" + page_size | API uses ?offset=N&limit=N (simple REST APIs) |
| Page number | style: "page" + page_size | API uses ?page=N (older APIs) |
| Header | style: "header_link" | Next-page URL in a response header |
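For a sense of what the cursor style saves you, here is hand-rolled cursor pagination against the internal events endpoint from Example 3. This is an illustrative sketch only: the response shape and cursor field mirror the records_path and next_page_token_path settings above, and the Bearer header is an assumption about the API's auth scheme.

# Hand-rolled cursor pagination: the loop tap-rest-api-msdk runs for you.
# Endpoint, response shape, and auth scheme are illustrative assumptions.
import os
import requests

url = "https://api.internal.company.com/v2/events"
headers = {"Authorization": f"Bearer {os.environ['INTERNAL_API_TOKEN']}"}

records, cursor = [], None
while True:
    params = {"page_size": 100}
    if cursor:
        params["cursor"] = cursor
    payload = requests.get(url, headers=headers, params=params).json()
    records.extend(payload["data"])                        # records_path: $.data[*]
    cursor = payload.get("pagination", {}).get("cursor")   # next_page_token_path
    if not cursor:
        break

print(f"Fetched {len(records)} records")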

Scheduling with Cron

Once your pipeline works, put it on a schedule:

# Add to crontab
crontab -e
# Pull GitHub data every 6 hours
0 */6 * * * cd /opt/pipelines && dataspoc-pipe run github-repos.yaml >> /var/log/pipe-github.log 2>&1
# Pull weather data daily at 6 AM
0 6 * * * cd /opt/pipelines && dataspoc-pipe run weather-daily.yaml >> /var/log/pipe-weather.log 2>&1
# Pull internal events every hour
0 * * * * cd /opt/pipelines && dataspoc-pipe run internal-events.yaml >> /var/log/pipe-internal.log 2>&1

Using the Python SDK

For programmatic control, use the PipeClient:

from dataspoc_pipe import PipeClient
pipe = PipeClient()
# Run a pipeline
result = pipe.run("github-repos.yaml")
print(f"Status: {result.status}")
print(f"Rows: {result.rows_extracted}")
print(f"Files: {result.files_written}")
# List available pipelines
pipelines = pipe.list_pipelines()
for p in pipelines:
    print(f"{p.name}: last run {p.last_run}, status {p.status}")

Any REST API that returns JSON can be ingested into Parquet and queried with SQL in under 10 minutes. No custom code. No database. Just a YAML file and two commands.
