MongoDB para Parquet: Construa um Data Lake a partir do Seu MongoDB em 15 Minutos
Seu aplicativo roda em MongoDB. Seus analistas querem SQL. Sua equipe de dados quer um data lake. Você não quer construir um pipeline ETL do zero.
Este post leva você de um banco MongoDB a um data lake consultável baseado em Parquet em 15 minutos usando o DataSpoc Pipe. Você vai extrair coleções, achatar documentos aninhados, configurar sincronização incremental e consultar o resultado com SQL.
O Problema
MongoDB é ótimo para aplicações mas doloroso para analytics:
- Sem SQL (sim, existe MQL, mas seus analistas não conhecem)
- Sem joins (aggregation pipelines são verbosos e lentos para analytics)
- Carga em produção: consultas analíticas competem com seu aplicativo
- Sem schema: cada documento pode ser diferente
A solução: extraia para Parquet uma vez, consulte para sempre com DuckDB.
Passo 1: Instalar o MongoDB Tap
pip install dataspoc-pipedataspoc-pipe add tap-mongodbIsso instala o extrator MongoDB compatível com Singer. O DataSpoc Pipe o gerencia como um plugin.
Passo 2: Configurar a Conexão
Crie um YAML de pipeline que define a fonte e o destino:
source: tap: tap-mongodb config: connection_string: "${MONGODB_URI}" database: "production" collections: - users - orders - products - events
target: bucket: "s3://my-company-data" prefix: "raw/mongodb" format: parquet
settings: batch_size: 10000 flatten_nested: true flatten_max_depth: 3Defina sua string de conexão MongoDB como variável de ambiente:
export MONGODB_URI="mongodb+srv://readonly:password@cluster0.abc123.mongodb.net"Use um usuário somente leitura. Nunca dê acesso de escrita ao seu pipeline ETL em produção.
Passo 3: Visualizar o Que Será Extraído
Antes de executar uma extração completa, visualize o schema que o Pipe descobre:
dataspoc-pipe discover mongodb-productionSaída:
Discovered 4 collections:
users (12,450 documents) _id ObjectId email string name string plan string address object → address_street, address_city, address_state, address_zip created_at datetime metadata object → metadata_source, metadata_campaign
orders (89,230 documents) _id ObjectId user_id ObjectId items array → flattened to separate rows total number status string created_at datetime
products (340 documents) _id ObjectId name string sku string price number category string tags array → tags_0, tags_1, tags_2
events (1,245,600 documents) _id ObjectId user_id ObjectId event_type string properties object → properties_page, properties_referrer, properties_duration timestamp datetimeNote como flatten_nested: true converte objetos aninhados em colunas planas. O objeto address vira address_street, address_city, etc. Isso é crítico porque Parquet precisa de um schema plano.
Passo 4: Executar a Extração
dataspoc-pipe run mongodb-production[14:23:01] Starting pipeline: mongodb-production[14:23:02] Extracting: users (12,450 docs)[14:23:05] → raw/mongodb/users/users_20260426_142305.parquet (1.2 MB)[14:23:05] Extracting: orders (89,230 docs)[14:23:18] → raw/mongodb/orders/orders_20260426_142305.parquet (8.4 MB)[14:23:18] Extracting: products (340 docs)[14:23:19] → raw/mongodb/products/products_20260426_142319.parquet (42 KB)[14:23:19] Extracting: events (1,245,600 docs)[14:24:02] → raw/mongodb/events/events_20260426_142319.parquet (95 MB)[14:24:02] Pipeline complete: 4 tables, 1,347,620 rows, 105 MB Parquet[14:24:02] State saved to .dataspoc/state/mongodb-production/state.jsonQuatro coleções extraídas. 1,3 milhão de documentos convertidos para Parquet. O arquivo de estado registra o último _id visto em cada coleção para extração incremental.
Passo 5: Consultar com DataSpoc Lens
dataspoc-lens add-bucket s3://my-company-data --name productiondataspoc-lens tablesraw.mongodb.usersraw.mongodb.ordersraw.mongodb.productsraw.mongodb.eventsAgora execute SQL:
dataspoc-lens query " SELECT u.plan, COUNT(DISTINCT u._id) as customers, SUM(o.total) as revenue, ROUND(SUM(o.total) / COUNT(DISTINCT u._id), 2) as arpu FROM raw.mongodb.users u JOIN raw.mongodb.orders o ON u._id = o.user_id WHERE o.status = 'completed' GROUP BY u.plan ORDER BY revenue DESC"┌──────────┬───────────┬────────────┬────────┐│ plan │ customers │ revenue │ arpu │├──────────┼───────────┼────────────┼────────┤│ business │ 2,340 │ 1,245,600 │ 532.31 ││ pro │ 4,120 │ 824,000 │ 200.00 ││ starter │ 5,990 │ 299,500 │ 50.00 │└──────────┴───────────┴────────────┴────────┘Seus analistas agora têm acesso SQL aos dados do MongoDB sem tocar no banco de produção.
Lidando com Documentos Aninhados
O schema flexível do MongoDB é o maior desafio. Veja como o Pipe lida com cada caso:
Objetos aninhados são achatados com separadores de underscore:
// MongoDB document{ "address": { "street": "123 Main St", "city": "San Francisco", "state": "CA" }}// Parquet columnsaddress_street VARCHAR "123 Main St"address_city VARCHAR "San Francisco"address_state VARCHAR "CA"Arrays de primitivos viram colunas numeradas:
{ "tags": ["electronics", "sale", "featured"] }tags_0 VARCHAR "electronics"tags_1 VARCHAR "sale"tags_2 VARCHAR "featured"Arrays de objetos (como itens de pedido) podem ser achatados em linhas separadas:
# In pipeline configsettings: flatten_nested: true flatten_arrays: explode # Creates one row per array element{ "order_id": "abc", "items": [ {"sku": "A1", "qty": 2, "price": 10.00}, {"sku": "B2", "qty": 1, "price": 25.00} ]}Vira duas linhas Parquet:
order_id items_sku items_qty items_priceabc A1 2 10.00abc B2 1 25.00Passo 6: Sincronização Incremental
Após a primeira extração completa, execuções subsequentes buscam apenas documentos novos:
dataspoc-pipe run mongodb-production[15:00:01] Starting pipeline: mongodb-production (incremental)[15:00:01] Resuming from state: users._id > ObjectId('6627a...')[15:00:02] Extracting: users (45 new docs since last run)[15:00:02] → raw/mongodb/users/users_20260426_150002.parquet (4 KB)[15:00:02] Extracting: orders (312 new docs)[15:00:03] → raw/mongodb/orders/orders_20260426_150002.parquet (28 KB)...[15:00:05] Pipeline complete: 4 tables, 1,892 new rowsO Pipe usa o campo _id do MongoDB (que é naturalmente ordenado por tempo de inserção) para marcar o progresso. Apenas documentos novos são extraídos.
Para forçar uma re-extração completa:
dataspoc-pipe run mongodb-production --fullPasso 7: Agendar com Cron
Adicione um cron job para extrair a cada hora:
crontab -e0 * * * * cd /opt/dataspoc && dataspoc-pipe run mongodb-production >> /var/log/dataspoc/mongodb.log 2>&1Verifique o status da última execução:
dataspoc-pipe status mongodb-productionPipeline: mongodb-productionLast run: 2026-04-26 15:00:05 (success)Next scheduled: 2026-04-26 16:00:00Total rows extracted: 1,349,512State: incremental (bookmark: users._id=6627a...)Visualize logs de execução:
dataspoc-pipe logs mongodb-production --last 5Comparação de Custos
| Abordagem | Custo Mensal | Tempo de Setup |
|---|---|---|
| MongoDB Atlas Analytics Node | $500+ | Minutos |
| ETL customizado (Airflow + Python) | $200+ (infra) | Semanas |
| Conector MongoDB do Fivetran | $1/MAR | Minutos |
| DataSpoc Pipe + S3 | ~$3 (armazenamento S3) | 15 minutos |
Para uma startup com 1M de documentos, DataSpoc Pipe mais armazenamento S3 custa menos de $5/mês. Sem taxas de plataforma ETL gerenciada, sem nodes de analytics, sem cluster Airflow.
Referência Completa de Configuração do Pipeline
source: tap: tap-mongodb config: connection_string: "${MONGODB_URI}" database: "production" collections: - users - orders - products - events # Optional: filter collections by pattern # collection_pattern: "^(users|orders).*"
target: bucket: "s3://my-company-data" prefix: "raw/mongodb" format: parquet # Parquet settings compression: snappy row_group_size: 100000
settings: batch_size: 10000 flatten_nested: true flatten_max_depth: 3 flatten_arrays: explode # or "stringify" to keep as JSON string
# Incremental settings replication_method: incremental bookmark_field: _idDe MongoDB a data lake consultável com SQL em 15 minutos. Seus analistas ganham SQL, seu banco de produção fica intocado e sua conta mensal fica abaixo de $5.