LlamaIndex + DataSpoc: Consulta tu Data Lake sin Embeddings
RAG sobre datos estructurados es una trampa. Divides tus tablas en fragmentos de texto, los conviertes en embeddings, recuperas coincidencias apróximadas y rezas para que el LLM reconstruya la respuesta correcta. Para datos estructurados, hay un mejor camino: darle al LLM herramientas que ejecuten SQL real.
Este post construye un agente de LlamaIndex que consulta tu data lake a traves de DataSpoc Lens — sin embeddings, sin vector store, sin fragmentacion. Solo SQL preciso sobre archivos Parquet en la nube.
Por que no RAG para datos estructurados?
Considera la pregunta: “Cual fue el ingreso total del ultimo trimestre?” Con RAG, tendrias que:
- Fragmentar tu tabla de ordenes en trozos de texto
- Convertir esos fragmentos en embeddings en un vector store
- Recuperar los top-k fragmentos mas similares
- Esperar que el LLM pueda sumar números a partir de fragmentos de texto
El resultado suele ser incorrecto. La similitud vectorial no entiende agregaciones, joins ni rangos de tiempo.
Con agentes que usan tool-calling, el LLM escribe SQL, lo ejecuta a traves de DataSpoc Lens y devuelve la respuesta exacta.
Prerrequisitos
pip install llama-index llama-index-llms-openai dataspoc-lensAsegurate de tener un bucket de DataSpoc Lens configurado:
dataspoc-lens add-bucket s3://my-company-data --name productiondataspoc-lens tablesPaso 1: Definir herramientas desde LensClient
El FunctionTool de LlamaIndex envuelve cualquier callable de Python en una herramienta que el agente puede usar. Vamos a envolver los cuatro metodos clave de LensClient:
from llama_index.core.tools import FunctionToolfrom dataspoc_lens import LensClient
lens = LensClient()
def list_tables() -> str: """List all available tables in the data lake. Call this first to discover what data is available.""" tables = lens.tables() return "\n".join(tables)
def get_schema(table_name: str) -> str: """Get the column names and types for a specific table. Call this before writing SQL to know the exact column names.""" schema = lens.schema(table_name) lines = [f" {col['name']} ({col['type']})" for col in schema] return f"Table: {table_name}\n" + "\n".join(lines)
def run_query(sql: str) -> str: """Execute a SQL query against the data lake and return results. Use DuckDB SQL syntax. Always use LIMIT to avoid huge results.""" result = lens.query(sql) return result.to_string(max_rows=50)
def get_cache_status() -> str: """Check which tables are cached locally for fast queries.""" status = lens.cache_status() lines = [f" {t['table']}: {t['status']} ({t['size']})" for t in status] return "\n".join(lines)
# Wrap as LlamaIndex toolstools = [ FunctionTool.from_defaults(fn=list_tables), FunctionTool.from_defaults(fn=get_schema), FunctionTool.from_defaults(fn=run_query), FunctionTool.from_defaults(fn=get_cache_status),]Cada función tiene un docstring que sirve cómo descripcion de la herramienta. El agente lee estas descripciones para decidir cual herramienta llamar.
Paso 2: Crear el agente
from llama_index.core.agent import ReActAgentfrom llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4o", temperature=0)
agent = ReActAgent.from_tools( tools, llm=llm, verbose=True, system_prompt=( "You are a data analyst with access to a company data lake. " "Always start by listing tables, then check schemas before writing SQL. " "Use DuckDB SQL syntax. Include LIMIT in queries unless aggregating. " "Present results clearly with context." ),)Paso 3: Conversacion multi-turno
# First question: discoveryresponse = agent.chat("What data do we have available?")print(response)# Agent calls list_tables(), then summarizes the available datasets
# Second question: analysisresponse = agent.chat("What are the top 10 customers by revenue this quarter?")print(response)# Agent calls get_schema("orders"), get_schema("customers"),# then run_query() with a JOIN and aggregation
# Follow-up: drill downresponse = agent.chat("Break that down by month for the top customer")print(response)# Agent remembers context, writes a filtered query for the top customerAsi se ve el razonamiento del agente con verbose=True:
Thought: I need to find the top customers by revenue. Let me check the schema first.Action: get_schemaAction Input: {'table_name': 'raw.orders'}Observation: Table: raw.orders order_id (INTEGER) customer_id (INTEGER) amount (DECIMAL(10,2)) order_date (DATE) status (VARCHAR)
Thought: Now let me check the customers table for names.Action: get_schemaAction Input: {'table_name': 'raw.customers'}Observation: Table: raw.customers customer_id (INTEGER) name (VARCHAR) email (VARCHAR) created_at (TIMESTAMP)
Thought: I can now write the query joining orders and customers.Action: run_queryAction Input: {'sql': "SELECT c.name, SUM(o.amount) as total_revenue FROM raw.orders o JOIN raw.customers c ON o.customer_id = c.customer_id WHERE o.order_date >= '2026-01-01' AND o.order_date < '2026-04-01' GROUP BY c.name ORDER BY total_revenue DESC LIMIT 10"}LlamaIndex SQL Agent vs. DataSpoc Agent
LlamaIndex incluye un NLSQLTableQueryEngine integrado que se conecta directamente a una base de datos. Asi se compara:
| Caracteristica | LlamaIndex SQL Agent | DataSpoc + FunctionTool |
|---|---|---|
| Fuente de datos | Base de datos activa (Postgres, MySQL) | Parquet en la nube (S3/GCS/Azure) |
| Configuracion | Connection string + SQLAlchemy | dataspoc-lens add-bucket |
| Costo | La base de datos se mantiene activa | Sin computo cuando esta inactivo |
| Escala | Limitada por la base de datos | DuckDB escala a TBs de Parquet |
| Descubrimiento de esquema | Introspeccion SQLAlchemy | lens.tables() + lens.schema() |
| Cache | Ninguno | Cache local de Parquet en DataSpoc |
El enfoque de DataSpoc funcióna sobre data lakes. No necesitas una base de datos en ejecucion — tus datos estan cómo archivos Parquet en almacenamiento de objetos y DuckDB los consulta directamente.
Ejemplo completo funciónal
Aqui esta el script completo que puedes guardar y ejecutar:
"""llamaindex_data_agent.py - Query your data lake with LlamaIndex agents."""
from llama_index.core.agent import ReActAgentfrom llama_index.core.tools import FunctionToolfrom llama_index.llms.openai import OpenAIfrom dataspoc_lens import LensClient
# Initialize the Lens clientlens = LensClient()
def list_tables() -> str: """List all available tables in the data lake. Call this first to discover what data is available.""" tables = lens.tables() return "\n".join(tables)
def get_schema(table_name: str) -> str: """Get the column names and types for a specific table. Call this before writing SQL to know the exact column names.""" schema = lens.schema(table_name) lines = [f" {col['name']} ({col['type']})" for col in schema] return f"Table: {table_name}\n" + "\n".join(lines)
def run_query(sql: str) -> str: """Execute a SQL query against the data lake and return results. Use DuckDB SQL syntax. Always use LIMIT to avoid huge results.""" result = lens.query(sql) return result.to_string(max_rows=50)
def refresh_cache(table_name: str) -> str: """Refresh the local cache for a table to speed up queries.""" lens.cache_refresh(table_name) return f"Cache refreshed for {table_name}"
# Build the agenttools = [ FunctionTool.from_defaults(fn=list_tables), FunctionTool.from_defaults(fn=get_schema), FunctionTool.from_defaults(fn=run_query), FunctionTool.from_defaults(fn=refresh_cache),]
llm = OpenAI(model="gpt-4o", temperature=0)
agent = ReActAgent.from_tools( tools, llm=llm, verbose=True, system_prompt=( "You are a data analyst with access to a company data lake. " "Always start by listing tables, then check schemas before writing SQL. " "Use DuckDB SQL syntax. Include LIMIT in queries unless aggregating. " "Present results clearly with context." ),)
# Interactive loopprint("Data Lake Agent (type 'quit' to exit)")print("-" * 40)while True: question = input("\nYou: ") if question.lower() in ("quit", "exit", "q"): break response = agent.chat(question) print(f"\nAgent: {response}")Usar Claude en lugar de OpenAI
Cambia el LLM en dos lineas:
from llama_index.llms.anthropic import Anthropic
llm = Anthropic(model="claude-sonnet-4-20250514", temperature=0)Todo lo demas se mantiene igual. Las herramientas, el agente y el bucle de conversacion funciónan de manera identica.
Consejos para producción
- Cachea tablas frecuentes — llama a
lens.cache_refresh(table)para tablas que se consultan seguido. El agente puede hacer esto automáticamente a traves de la herramienta de refresh. - Agrega protecciones — envuelve
run_querypara rechazarDROP,DELETE, o consultas sinLIMIT. - Registra las llamadas a herramientas — el sistema de callbacks de LlamaIndex te permite registrar cada invocacion de herramienta para depuracion.
- Presupuesto de tokens — resultados de consultas grandes consumen tokens. Limita la salida de
run_querya 50 filas e indica al agente que acote las consultas.
Sin embeddings. Sin vector stores. Solo herramientas SQL y un agente de razonamiento consultando datos reales en tu bucket en la nube.