mongodbmigrationparquetdata-laketutorial

MongoDB a Parquet: Construye un Data Lake desde tu MongoDB en 15 minutos

Michael San Martim · 2026-04-26

Tu app corre sobre MongoDB. Tus analistas quieren SQL. Tu equipo de datos quiere un data lake. Tu no quieres construir un pipeline ETL desde cero.

Este post te lleva de una base de datos MongoDB a un data lake consultable basado en Parquet en 15 minutos usando DataSpoc Pipe. Vas a extraer colecciones, aplanar documentos anidados, configurar sincronizacion incremental y consultar el resultado con SQL.

El problema

MongoDB es excelente para aplicaciónes pero doloroso para analitica:

  • Sin SQL (si, existe MQL, pero tus analistas no lo conocen)
  • Sin joins (los aggregation pipelines son verbosos y lentos para analitica)
  • Carga de producción: las consultas analiticas compiten con tu app
  • Sin esquema: cada documento puede ser diferente

La solución: extraer a Parquet una vez, consultar para siempre con DuckDB.

Paso 1: Instalar el tap de MongoDB

Terminal window
pip install dataspoc-pipe
dataspoc-pipe add tap-mongodb

Esto instala el extractor de MongoDB compatible con Singer. DataSpoc Pipe lo gestiona cómo un plugin.

Paso 2: Configurar la conexión

Crea un YAML de pipeline que define la fuente y el destino:

pipelines/mongodb-production.yaml
source:
tap: tap-mongodb
config:
connection_string: "${MONGODB_URI}"
database: "production"
collections:
- users
- orders
- products
- events
target:
bucket: "s3://my-company-data"
prefix: "raw/mongodb"
format: parquet
settings:
batch_size: 10000
flatten_nested: true
flatten_max_depth: 3

Establece tu connection string de MongoDB cómo variable de entorno:

Terminal window
export MONGODB_URI="mongodb+srv://readonly:password@cluster0.abc123.mongodb.net"

Usa un usuario de solo lectura. Nunca le des acceso de escritura a producción a tu pipeline de ETL.

Paso 3: Vista previa de lo que se extraera

Antes de ejecutar una extracción completa, previsualizaliza el esquema que Pipe descubre:

Terminal window
dataspoc-pipe discover mongodb-production

Salida:

Discovered 4 collections:
users (12,450 documents)
_id ObjectId
email string
name string
plan string
address object → address_street, address_city, address_state, address_zip
created_at datetime
metadata object → metadata_source, metadata_campaign
orders (89,230 documents)
_id ObjectId
user_id ObjectId
items array → flattened to separate rows
total number
status string
created_at datetime
products (340 documents)
_id ObjectId
name string
sku string
price number
category string
tags array → tags_0, tags_1, tags_2
events (1,245,600 documents)
_id ObjectId
user_id ObjectId
event_type string
properties object → properties_page, properties_referrer, properties_duration
timestamp datetime

Observa cómo flatten_nested: true convierte objetos anidados en columnas planas. El objeto address se convierte en address_street, address_city, etc. Esto es critico porque Parquet necesita un esquema plano.

Paso 4: Ejecutar la extracción

Terminal window
dataspoc-pipe run mongodb-production
[14:23:01] Starting pipeline: mongodb-production
[14:23:02] Extracting: users (12,450 docs)
[14:23:05] → raw/mongodb/users/users_20260426_142305.parquet (1.2 MB)
[14:23:05] Extracting: orders (89,230 docs)
[14:23:18] → raw/mongodb/orders/orders_20260426_142305.parquet (8.4 MB)
[14:23:18] Extracting: products (340 docs)
[14:23:19] → raw/mongodb/products/products_20260426_142319.parquet (42 KB)
[14:23:19] Extracting: events (1,245,600 docs)
[14:24:02] → raw/mongodb/events/events_20260426_142319.parquet (95 MB)
[14:24:02] Pipeline complete: 4 tables, 1,347,620 rows, 105 MB Parquet
[14:24:02] State saved to .dataspoc/state/mongodb-production/state.json

Cuatro colecciones extraidas. 1.3 millones de documentos convertidos a Parquet. El archivo de estado registra el ultimo _id visto en cada coleccion para extracción incremental.

Paso 5: Consultar con DataSpoc Lens

Terminal window
dataspoc-lens add-bucket s3://my-company-data --name production
dataspoc-lens tables
raw.mongodb.users
raw.mongodb.orders
raw.mongodb.products
raw.mongodb.events

Ahora ejecuta SQL:

Terminal window
dataspoc-lens query "
SELECT
u.plan,
COUNT(DISTINCT u._id) as customers,
SUM(o.total) as revenue,
ROUND(SUM(o.total) / COUNT(DISTINCT u._id), 2) as arpu
FROM raw.mongodb.users u
JOIN raw.mongodb.orders o ON u._id = o.user_id
WHERE o.status = 'completed'
GROUP BY u.plan
ORDER BY revenue DESC
"
┌──────────┬───────────┬────────────┬────────┐
│ plan │ customers │ revenue │ arpu │
├──────────┼───────────┼────────────┼────────┤
│ business │ 2,340 │ 1,245,600 │ 532.31 │
│ pro │ 4,120 │ 824,000 │ 200.00 │
│ starter │ 5,990 │ 299,500 │ 50.00 │
└──────────┴───────────┴────────────┴────────┘

Tus analistas ahora tienen acceso SQL a los datos de MongoDB sin tocar la base de datos de producción.

Manejo de documentos anidados

El esquema flexible de MongoDB es el mayor desafio. Asi es cómo Pipe maneja cada caso:

Objetos anidados se aplanan con separadores de guion bajo:

// MongoDB document
{
"address": {
"street": "123 Main St",
"city": "San Francisco",
"state": "CA"
}
}
// Parquet columns
address_street VARCHAR "123 Main St"
address_city VARCHAR "San Francisco"
address_state VARCHAR "CA"

Arrays de primitivos se convierten en columnas numeradas:

{ "tags": ["electronics", "sale", "featured"] }
tags_0 VARCHAR "electronics"
tags_1 VARCHAR "sale"
tags_2 VARCHAR "featured"

Arrays de objetos (cómo items de orden) se pueden aplanar en filas separadas:

# In pipeline config
settings:
flatten_nested: true
flatten_arrays: explode # Creates one row per array element
{
"order_id": "abc",
"items": [
{"sku": "A1", "qty": 2, "price": 10.00},
{"sku": "B2", "qty": 1, "price": 25.00}
]
}

Se convierte en dos filas Parquet:

order_id items_sku items_qty items_price
abc A1 2 10.00
abc B2 1 25.00

Paso 6: Sincronizacion incremental

Despues de la primera extracción completa, las ejecuciones posteriores solo obtienen documentos nuevos:

Terminal window
dataspoc-pipe run mongodb-production
[15:00:01] Starting pipeline: mongodb-production (incremental)
[15:00:01] Resuming from state: users._id > ObjectId('6627a...')
[15:00:02] Extracting: users (45 new docs since last run)
[15:00:02] → raw/mongodb/users/users_20260426_150002.parquet (4 KB)
[15:00:02] Extracting: orders (312 new docs)
[15:00:03] → raw/mongodb/orders/orders_20260426_150002.parquet (28 KB)
...
[15:00:05] Pipeline complete: 4 tables, 1,892 new rows

Pipe usa el campo _id de MongoDB (que esta naturalmente ordenado por tiempo de insercion) para marcar el progreso. Solo se extraen documentos nuevos.

Para forzar una re-extracción completa:

Terminal window
dataspoc-pipe run mongodb-production --full

Paso 7: Programar con cron

Agrega un trabajo cron para extraer cada hora:

Terminal window
crontab -e
0 * * * * cd /opt/dataspoc && dataspoc-pipe run mongodb-production >> /var/log/dataspoc/mongodb.log 2>&1

Verifica el estado de la ultima ejecucion:

Terminal window
dataspoc-pipe status mongodb-production
Pipeline: mongodb-production
Last run: 2026-04-26 15:00:05 (success)
Next scheduled: 2026-04-26 16:00:00
Total rows extracted: 1,349,512
State: incremental (bookmark: users._id=6627a...)

Ver logs de ejecucion:

Terminal window
dataspoc-pipe logs mongodb-production --last 5

Comparación de costos

EnfoqueCosto mensualTiempo de configuración
MongoDB Atlas Analytics Node$500+Minutos
ETL personalizado (Airflow + Python)$200+ (infra)Semanas
Conector MongoDB de Fivetran$1/MARMinutos
DataSpoc Pipe + S3~$3 (almacenamiento S3)15 minutos

Para una startup con 1M de documentos, DataSpoc Pipe mas almacenamiento S3 cuesta menos de $5/mes. Sin tarifas de plataforma ETL gestionada, sin nodos de analitica, sin cluster de Airflow.

Referencia completa de configuración de pipeline

pipelines/mongodb-production.yaml
source:
tap: tap-mongodb
config:
connection_string: "${MONGODB_URI}"
database: "production"
collections:
- users
- orders
- products
- events
# Optional: filter collections by pattern
# collection_pattern: "^(users|orders).*"
target:
bucket: "s3://my-company-data"
prefix: "raw/mongodb"
format: parquet
# Parquet settings
compression: snappy
row_group_size: 100000
settings:
batch_size: 10000
flatten_nested: true
flatten_max_depth: 3
flatten_arrays: explode # or "stringify" to keep as JSON string
# Incremental settings
replication_method: incremental
bookmark_field: _id

De MongoDB a un data lake consultable con SQL en 15 minutos. Tus analistas obtienen SQL, tu base de datos de producción queda intacta y tu factura mensual se mantiene por debajo de $5.

Recomendados