llamaindexai-agentspythondata-lakerag

LlamaIndex + DataSpoc: Consulta tu Data Lake sin Embeddings

Michael San Martim · 2026-04-25

RAG sobre datos estructurados es una trampa. Divides tus tablas en fragmentos de texto, los conviertes en embeddings, recuperas coincidencias apróximadas y rezas para que el LLM reconstruya la respuesta correcta. Para datos estructurados, hay un mejor camino: darle al LLM herramientas que ejecuten SQL real.

Este post construye un agente de LlamaIndex que consulta tu data lake a traves de DataSpoc Lens — sin embeddings, sin vector store, sin fragmentacion. Solo SQL preciso sobre archivos Parquet en la nube.

Por que no RAG para datos estructurados?

Considera la pregunta: “Cual fue el ingreso total del ultimo trimestre?” Con RAG, tendrias que:

  1. Fragmentar tu tabla de ordenes en trozos de texto
  2. Convertir esos fragmentos en embeddings en un vector store
  3. Recuperar los top-k fragmentos mas similares
  4. Esperar que el LLM pueda sumar números a partir de fragmentos de texto

El resultado suele ser incorrecto. La similitud vectorial no entiende agregaciones, joins ni rangos de tiempo.

Con agentes que usan tool-calling, el LLM escribe SQL, lo ejecuta a traves de DataSpoc Lens y devuelve la respuesta exacta.

Prerrequisitos

Terminal window
pip install llama-index llama-index-llms-openai dataspoc-lens

Asegurate de tener un bucket de DataSpoc Lens configurado:

Terminal window
dataspoc-lens add-bucket s3://my-company-data --name production
dataspoc-lens tables

Paso 1: Definir herramientas desde LensClient

El FunctionTool de LlamaIndex envuelve cualquier callable de Python en una herramienta que el agente puede usar. Vamos a envolver los cuatro metodos clave de LensClient:

from llama_index.core.tools import FunctionTool
from dataspoc_lens import LensClient
lens = LensClient()
def list_tables() -> str:
"""List all available tables in the data lake.
Call this first to discover what data is available."""
tables = lens.tables()
return "\n".join(tables)
def get_schema(table_name: str) -> str:
"""Get the column names and types for a specific table.
Call this before writing SQL to know the exact column names."""
schema = lens.schema(table_name)
lines = [f" {col['name']} ({col['type']})" for col in schema]
return f"Table: {table_name}\n" + "\n".join(lines)
def run_query(sql: str) -> str:
"""Execute a SQL query against the data lake and return results.
Use DuckDB SQL syntax. Always use LIMIT to avoid huge results."""
result = lens.query(sql)
return result.to_string(max_rows=50)
def get_cache_status() -> str:
"""Check which tables are cached locally for fast queries."""
status = lens.cache_status()
lines = [f" {t['table']}: {t['status']} ({t['size']})" for t in status]
return "\n".join(lines)
# Wrap as LlamaIndex tools
tools = [
FunctionTool.from_defaults(fn=list_tables),
FunctionTool.from_defaults(fn=get_schema),
FunctionTool.from_defaults(fn=run_query),
FunctionTool.from_defaults(fn=get_cache_status),
]

Cada función tiene un docstring que sirve cómo descripcion de la herramienta. El agente lee estas descripciones para decidir cual herramienta llamar.

Paso 2: Crear el agente

from llama_index.core.agent import ReActAgent
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4o", temperature=0)
agent = ReActAgent.from_tools(
tools,
llm=llm,
verbose=True,
system_prompt=(
"You are a data analyst with access to a company data lake. "
"Always start by listing tables, then check schemas before writing SQL. "
"Use DuckDB SQL syntax. Include LIMIT in queries unless aggregating. "
"Present results clearly with context."
),
)

Paso 3: Conversacion multi-turno

# First question: discovery
response = agent.chat("What data do we have available?")
print(response)
# Agent calls list_tables(), then summarizes the available datasets
# Second question: analysis
response = agent.chat("What are the top 10 customers by revenue this quarter?")
print(response)
# Agent calls get_schema("orders"), get_schema("customers"),
# then run_query() with a JOIN and aggregation
# Follow-up: drill down
response = agent.chat("Break that down by month for the top customer")
print(response)
# Agent remembers context, writes a filtered query for the top customer

Asi se ve el razonamiento del agente con verbose=True:

Thought: I need to find the top customers by revenue. Let me check the schema first.
Action: get_schema
Action Input: {'table_name': 'raw.orders'}
Observation: Table: raw.orders
order_id (INTEGER)
customer_id (INTEGER)
amount (DECIMAL(10,2))
order_date (DATE)
status (VARCHAR)
Thought: Now let me check the customers table for names.
Action: get_schema
Action Input: {'table_name': 'raw.customers'}
Observation: Table: raw.customers
customer_id (INTEGER)
name (VARCHAR)
email (VARCHAR)
created_at (TIMESTAMP)
Thought: I can now write the query joining orders and customers.
Action: run_query
Action Input: {'sql': "SELECT c.name, SUM(o.amount) as total_revenue FROM raw.orders o JOIN raw.customers c ON o.customer_id = c.customer_id WHERE o.order_date >= '2026-01-01' AND o.order_date < '2026-04-01' GROUP BY c.name ORDER BY total_revenue DESC LIMIT 10"}

LlamaIndex SQL Agent vs. DataSpoc Agent

LlamaIndex incluye un NLSQLTableQueryEngine integrado que se conecta directamente a una base de datos. Asi se compara:

CaracteristicaLlamaIndex SQL AgentDataSpoc + FunctionTool
Fuente de datosBase de datos activa (Postgres, MySQL)Parquet en la nube (S3/GCS/Azure)
ConfiguracionConnection string + SQLAlchemydataspoc-lens add-bucket
CostoLa base de datos se mantiene activaSin computo cuando esta inactivo
EscalaLimitada por la base de datosDuckDB escala a TBs de Parquet
Descubrimiento de esquemaIntrospeccion SQLAlchemylens.tables() + lens.schema()
CacheNingunoCache local de Parquet en DataSpoc

El enfoque de DataSpoc funcióna sobre data lakes. No necesitas una base de datos en ejecucion — tus datos estan cómo archivos Parquet en almacenamiento de objetos y DuckDB los consulta directamente.

Ejemplo completo funciónal

Aqui esta el script completo que puedes guardar y ejecutar:

"""llamaindex_data_agent.py - Query your data lake with LlamaIndex agents."""
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
from dataspoc_lens import LensClient
# Initialize the Lens client
lens = LensClient()
def list_tables() -> str:
"""List all available tables in the data lake.
Call this first to discover what data is available."""
tables = lens.tables()
return "\n".join(tables)
def get_schema(table_name: str) -> str:
"""Get the column names and types for a specific table.
Call this before writing SQL to know the exact column names."""
schema = lens.schema(table_name)
lines = [f" {col['name']} ({col['type']})" for col in schema]
return f"Table: {table_name}\n" + "\n".join(lines)
def run_query(sql: str) -> str:
"""Execute a SQL query against the data lake and return results.
Use DuckDB SQL syntax. Always use LIMIT to avoid huge results."""
result = lens.query(sql)
return result.to_string(max_rows=50)
def refresh_cache(table_name: str) -> str:
"""Refresh the local cache for a table to speed up queries."""
lens.cache_refresh(table_name)
return f"Cache refreshed for {table_name}"
# Build the agent
tools = [
FunctionTool.from_defaults(fn=list_tables),
FunctionTool.from_defaults(fn=get_schema),
FunctionTool.from_defaults(fn=run_query),
FunctionTool.from_defaults(fn=refresh_cache),
]
llm = OpenAI(model="gpt-4o", temperature=0)
agent = ReActAgent.from_tools(
tools,
llm=llm,
verbose=True,
system_prompt=(
"You are a data analyst with access to a company data lake. "
"Always start by listing tables, then check schemas before writing SQL. "
"Use DuckDB SQL syntax. Include LIMIT in queries unless aggregating. "
"Present results clearly with context."
),
)
# Interactive loop
print("Data Lake Agent (type 'quit' to exit)")
print("-" * 40)
while True:
question = input("\nYou: ")
if question.lower() in ("quit", "exit", "q"):
break
response = agent.chat(question)
print(f"\nAgent: {response}")

Usar Claude en lugar de OpenAI

Cambia el LLM en dos lineas:

from llama_index.llms.anthropic import Anthropic
llm = Anthropic(model="claude-sonnet-4-20250514", temperature=0)

Todo lo demas se mantiene igual. Las herramientas, el agente y el bucle de conversacion funciónan de manera identica.

Consejos para producción

  1. Cachea tablas frecuentes — llama a lens.cache_refresh(table) para tablas que se consultan seguido. El agente puede hacer esto automáticamente a traves de la herramienta de refresh.
  2. Agrega protecciones — envuelve run_query para rechazar DROP, DELETE, o consultas sin LIMIT.
  3. Registra las llamadas a herramientas — el sistema de callbacks de LlamaIndex te permite registrar cada invocacion de herramienta para depuracion.
  4. Presupuesto de tokens — resultados de consultas grandes consumen tokens. Limita la salida de run_query a 50 filas e indica al agente que acote las consultas.

Sin embeddings. Sin vector stores. Solo herramientas SQL y un agente de razonamiento consultando datos reales en tu bucket en la nube.

Recomendados