Ingest Any REST API to Parquet in 10 Minutes
Every company has internal APIs. Customer data in a CRM. Events in an analytics backend. Metrics in a monitoring tool. Getting that data into your data lake usually means writing custom Python scripts, managing scheduling, handling pagination, and coping with schema changes.
DataSpoc Pipe wraps the Singer ecosystem (400+ taps) and writes directly to Parquet in your cloud bucket. For REST APIs, tap-rest-api-msdk handles the HTTP calls, pagination, and schema detection. You only supply the endpoint configuration.
How It Works
```
REST API → tap-rest-api-msdk → DataSpoc Pipe → Parquet in S3/GCS/Azure
```

The Pipe manages the tap, converts its output to Parquet, and writes to your bucket. No intermediate database. No staging area.
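To make that flow concrete, here is a rough sketch of the plumbing the Pipe automates: running a Singer tap as a subprocess, collecting its RECORD messages, and batching them into a Parquet file with pyarrow. This is an illustration of the Singer protocol, not the Pipe's actual implementation, and the config.json path is a placeholder.

```python
import json
import subprocess

import pyarrow as pa
import pyarrow.parquet as pq

# Run a Singer tap and read its stdout; Singer taps emit one JSON message
# per line (SCHEMA, RECORD, and STATE messages).
proc = subprocess.Popen(
    ["tap-rest-api-msdk", "--config", "config.json"],
    stdout=subprocess.PIPE,
    text=True,
)

# Keep only RECORD messages; their "record" field holds the row data.
records = []
for line in proc.stdout:
    msg = json.loads(line)
    if msg["type"] == "RECORD":
        records.append(msg["record"])

# Batch the rows into an Arrow table (schema inferred) and write Parquet.
table = pa.Table.from_pylist(records)
pq.write_table(table, "repositories.parquet")
```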
Installation
```bash
pip install dataspoc-pipe
```

The Pipe installs Singer taps automatically the first time you run a pipeline that uses them.
Example 1: GitHub API
Pull repository data from the GitHub API into your data lake.
Pipeline Configuration
Create github-repos.yaml:
```yaml
pipeline: github-repos

source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://api.github.com"
    auth_method: "header"
    auth_token: "${GITHUB_TOKEN}"
    streams:
      - name: repositories
        path: "/orgs/my-org/repos"
        primary_keys: ["id"]
        records_path: "$[*]"
        pagination:
          style: "link_header"
        schema:
          - name: id
            type: integer
          - name: name
            type: string
          - name: full_name
            type: string
          - name: stargazers_count
            type: integer
          - name: language
            type: string
          - name: created_at
            type: string
            format: date-time
          - name: updated_at
            type: string
            format: date-time
          - name: open_issues_count
            type: integer

destination:
  bucket: "s3://my-data-lake"
  path: "raw/github/repositories"
  format: parquet
```
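The schema block declares column names and JSON Schema types. As a mental model, they become typed Parquet columns, roughly like this pyarrow schema (illustrative only; the Pipe's exact type mapping may differ):

```python
import pyarrow as pa

# Roughly how the declared stream schema could land as Parquet column
# types. Illustrative only: the Pipe's actual mapping may differ.
schema = pa.schema([
    ("id", pa.int64()),                  # type: integer
    ("name", pa.string()),               # type: string
    ("full_name", pa.string()),
    ("stargazers_count", pa.int64()),
    ("language", pa.string()),
    ("created_at", pa.timestamp("us")),  # string + format: date-time
    ("updated_at", pa.timestamp("us")),
    ("open_issues_count", pa.int64()),
])
print(schema)
```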
Run

```bash
export GITHUB_TOKEN="ghp_..."
dataspoc-pipe run github-repos.yaml
```

Output:
```
[github-repos] Starting pipeline...
[github-repos] Installing tap-rest-api-msdk...
[github-repos] Extracting: repositories
[github-repos] → Page 1: 30 records
[github-repos] → Page 2: 30 records
[github-repos] → Page 3: 12 records
[github-repos] Total: 72 records extracted
[github-repos] Writing Parquet to s3://my-data-lake/raw/github/repositories/
[github-repos] ✓ Complete: 72 rows, 1 file, 23KB
```

Query with Lens
```python
from dataspoc_lens import LensClient

lens = LensClient()

# Which repos have the most open issues?
df = lens.query("""
    SELECT name, language, stargazers_count, open_issues_count
    FROM raw_github_repositories
    WHERE open_issues_count > 0
    ORDER BY open_issues_count DESC
    LIMIT 10
""")
print(df)
```

Example 2: Weather API
Pull historical weather data from Open-Meteo (free, no authentication required).
Pipeline Configuration
Create weather-daily.yaml:
```yaml
pipeline: weather-daily

source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://archive-api.open-meteo.com"
    streams:
      - name: daily_weather
        path: "/v1/archive"
        params:
          latitude: "40.7128"
          longitude: "-74.0060"
          start_date: "2025-01-01"
          end_date: "2026-04-01"
          daily: "temperature_2m_max,temperature_2m_min,precipitation_sum,windspeed_10m_max"
        records_path: "$.daily"
        primary_keys: ["time"]
        schema:
          - name: time
            type: string
            format: date
          - name: temperature_2m_max
            type: number
          - name: temperature_2m_min
            type: number
          - name: precipitation_sum
            type: number
          - name: windspeed_10m_max
            type: number

destination:
  bucket: "s3://my-data-lake"
  path: "raw/weather/daily"
  format: parquet
```

Run and Query
```bash
dataspoc-pipe run weather-daily.yaml
```

```python
from dataspoc_lens import LensClient

lens = LensClient()

# Average temperature by month
df = lens.query("""
    SELECT
        DATE_TRUNC('month', CAST(time AS DATE)) AS month,
        ROUND(AVG(temperature_2m_max), 1) AS avg_high,
        ROUND(AVG(temperature_2m_min), 1) AS avg_low,
        ROUND(SUM(precipitation_sum), 1) AS total_rain_mm
    FROM raw_weather_daily
    GROUP BY month
    ORDER BY month
""")
print(df)
```
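To sanity-check the raw files outside Lens, you can also read the destination path directly. A quick check, assuming pandas and s3fs are installed and AWS credentials are configured:

```python
import pandas as pd

# Read the Parquet files the Pipe wrote (path from weather-daily.yaml).
# Requires the s3fs package for s3:// paths.
df = pd.read_parquet("s3://my-data-lake/raw/weather/daily/")
print(df.dtypes)
print(df.head())
```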
Example 3: Internal Company API

Most companies have internal APIs for things like user activity, product events, or CRM data. Here is how to ingest from a typical internal API with token authentication and cursor-based pagination.
Pipeline Configuration
Create internal-events.yaml:
```yaml
pipeline: internal-events

source:
  tap: tap-rest-api-msdk
  config:
    api_url: "https://api.internal.company.com"
    auth_method: "header"
    auth_token: "${INTERNAL_API_TOKEN}"
    headers:
      X-API-Version: "2"
    streams:
      - name: user_events
        path: "/v2/events"
        primary_keys: ["event_id"]
        records_path: "$.data[*]"
        pagination:
          style: "jsonpath"
          next_page_token_path: "$.pagination.cursor"
          page_size: 100
        params:
          start_date: "2026-04-01"
          event_type: "purchase,signup,cancellation"
        schema:
          - name: event_id
            type: string
          - name: user_id
            type: string
          - name: event_type
            type: string
          - name: timestamp
            type: string
            format: date-time
          - name: properties
            type: object
      - name: users
        path: "/v2/users"
        primary_keys: ["user_id"]
        records_path: "$.data[*]"
        pagination:
          style: "offset"
          page_size: 200
        schema:
          - name: user_id
            type: string
          - name: email
            type: string
          - name: plan
            type: string
          - name: created_at
            type: string
            format: date-time

destination:
  bucket: "s3://my-data-lake"
  path: "raw/internal"
  format: parquet
```

Run
```bash
export INTERNAL_API_TOKEN="tok_..."
dataspoc-pipe run internal-events.yaml
```

```
[internal-events] Starting pipeline...
[internal-events] Extracting: user_events
[internal-events] → Cursor page 1: 100 records
[internal-events] → Cursor page 2: 100 records
[internal-events] → Cursor page 3: 47 records
[internal-events] Extracting: users
[internal-events] → Offset 0: 200 records
[internal-events] → Offset 200: 200 records
[internal-events] → Offset 400: 83 records
[internal-events] Total: 247 events + 483 users
[internal-events] Writing Parquet to s3://my-data-lake/raw/internal/
[internal-events] ✓ Complete: 730 rows, 2 files, 156KB
```

Query: Joining Events with Users
```python
from dataspoc_lens import LensClient

lens = LensClient()

# Signups by plan this month
df = lens.query("""
    SELECT
        u.plan,
        COUNT(*) AS signups,
        MIN(e.timestamp) AS first_signup,
        MAX(e.timestamp) AS last_signup
    FROM raw_internal_user_events e
    JOIN raw_internal_users u ON e.user_id = u.user_id
    WHERE e.event_type = 'signup'
      AND e.timestamp >= '2026-04-01'
    GROUP BY u.plan
    ORDER BY signups DESC
""")
print(df)

# Cancellation rate by plan
df = lens.query("""
    SELECT
        u.plan,
        COUNT(*) FILTER (WHERE e.event_type = 'cancellation') AS cancellations,
        COUNT(*) FILTER (WHERE e.event_type = 'signup') AS signups,
        ROUND(
            100.0 * COUNT(*) FILTER (WHERE e.event_type = 'cancellation')
            / NULLIF(COUNT(*) FILTER (WHERE e.event_type = 'signup'), 0),
            1
        ) AS cancel_rate_pct
    FROM raw_internal_user_events e
    JOIN raw_internal_users u ON e.user_id = u.user_id
    WHERE e.timestamp >= '2026-01-01'
    GROUP BY u.plan
    ORDER BY cancel_rate_pct DESC
""")
print(df)
```

Handling Pagination Styles
tap-rest-api-msdk supports multiple pagination strategies (for comparison, a hand-rolled cursor loop appears after the table):
| Style | Configuration | Use When |
|---|---|---|
| Link header | style: "link_header" | API returns a Link: <url>; rel="next" header (GitHub, many REST APIs) |
| Cursor | style: "jsonpath" + next_page_token_path | API returns a cursor/token for the next page (Stripe, Slack) |
| Offset | style: "offset" + page_size | API uses ?offset=N&limit=N (simple REST APIs) |
| Page number | style: "page" + page_size | API uses ?page=N (older APIs) |
| Header | style: "header_link" | Next-page URL in a response header |
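If cursor pagination is unfamiliar, here is roughly what the jsonpath style automates. The endpoint and response fields are hypothetical, mirroring the internal-events example above; the tap handles this loop (plus retries and error handling) for you:

```python
import requests

# Hypothetical cursor-paginated endpoint: each response carries a page of
# records plus a cursor for the next page; the last page omits the cursor.
url = "https://api.internal.company.com/v2/events"
headers = {"Authorization": "Bearer tok_..."}

records = []
cursor = None
while True:
    params = {"limit": 100}
    if cursor:
        params["cursor"] = cursor
    page = requests.get(url, headers=headers, params=params).json()
    records.extend(page["data"])                      # records under $.data[*]
    cursor = page.get("pagination", {}).get("cursor")
    if not cursor:                                    # no cursor: no more pages
        break

print(f"Fetched {len(records)} records")
```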
Scheduling with Cron
Once your pipeline works, schedule it:
```bash
# Add to crontab
crontab -e

# Pull GitHub data every 6 hours
0 */6 * * * cd /opt/pipelines && dataspoc-pipe run github-repos.yaml >> /var/log/pipe-github.log 2>&1

# Pull weather data daily at 6 AM
0 6 * * * cd /opt/pipelines && dataspoc-pipe run weather-daily.yaml >> /var/log/pipe-weather.log 2>&1

# Pull internal events every hour
0 * * * * cd /opt/pipelines && dataspoc-pipe run internal-events.yaml >> /var/log/pipe-internal.log 2>&1
```
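Where cron isn't available (a container entrypoint, for example), a plain Python loop built on the PipeClient from the next section can stand in. A minimal sketch, with the interval and pipeline file as placeholders:

```python
import time

from dataspoc_pipe import PipeClient

# Minimal cron alternative: run one pipeline on a fixed interval.
# The hourly interval and pipeline file are placeholders.
pipe = PipeClient()
while True:
    result = pipe.run("internal-events.yaml")
    print(f"{result.status}: {result.rows_extracted} rows")
    time.sleep(3600)  # sleep one hour between runs
```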
Using the Python SDK

For programmatic control, use the PipeClient:
```python
from dataspoc_pipe import PipeClient

pipe = PipeClient()

# Run a pipeline
result = pipe.run("github-repos.yaml")
print(f"Status: {result.status}")
print(f"Rows: {result.rows_extracted}")
print(f"Files: {result.files_written}")

# List available pipelines
pipelines = pipe.list_pipelines()
for p in pipelines:
    print(f"{p.name}: last run {p.last_run}, status {p.status}")
```

Any REST API that returns JSON can be ingested to Parquet and queried with SQL in under 10 minutes. No custom code. No database. Just one YAML file and two commands.