LlamaIndex + DataSpoc: Query Your Data Lake Without Embeddings
RAG over structured data is a trap. You chunk your tables into text, embed them, retrieve approximate matches, and pray the LLM reconstructs the right answer. For structured data, there is a better path: give the LLM tools that run real SQL.
This post builds a LlamaIndex agent that queries your data lake through DataSpoc Lens — no embeddings, no vector store, no chunking. Just accurate SQL on cloud Parquet files.
Why Not RAG for Structured Data?
Consider the question: “What was total revenue last quarter?” With RAG, you would:
- Chunk your orders table into text fragments
- Embed those fragments into a vector store
- Retrieve the top-k most similar chunks
- Hope the LLM can sum numbers from text fragments
The result is often wrong. Vector similarity does not understand aggregation, joins, or time ranges.
With tool-calling agents, the LLM writes SQL, executes it through DataSpoc Lens, and returns the exact answer.
Prerequisites
pip install llama-index llama-index-llms-openai dataspoc-lensMake sure you have a DataSpoc Lens bucket configured:
dataspoc-lens add-bucket s3://my-company-data --name productiondataspoc-lens tablesStep 1: Define Tools from LensClient
LlamaIndex’s FunctionTool wraps any Python callable into a tool the agent can use. We will wrap the four key LensClient methods:
from llama_index.core.tools import FunctionToolfrom dataspoc_lens import LensClient
lens = LensClient()
def list_tables() -> str: """List all available tables in the data lake. Call this first to discover what data is available.""" tables = lens.tables() return "\n".join(tables)
def get_schema(table_name: str) -> str: """Get the column names and types for a specific table. Call this before writing SQL to know the exact column names.""" schema = lens.schema(table_name) lines = [f" {col['name']} ({col['type']})" for col in schema] return f"Table: {table_name}\n" + "\n".join(lines)
def run_query(sql: str) -> str: """Execute a SQL query against the data lake and return results. Use DuckDB SQL syntax. Always use LIMIT to avoid huge results.""" result = lens.query(sql) return result.to_string(max_rows=50)
def get_cache_status() -> str: """Check which tables are cached locally for fast queries.""" status = lens.cache_status() lines = [f" {t['table']}: {t['status']} ({t['size']})" for t in status] return "\n".join(lines)
# Wrap as LlamaIndex toolstools = [ FunctionTool.from_defaults(fn=list_tables), FunctionTool.from_defaults(fn=get_schema), FunctionTool.from_defaults(fn=run_query), FunctionTool.from_defaults(fn=get_cache_status),]Each function has a docstring that serves as the tool description. The agent reads these to decide which tool to call.
Step 2: Create the Agent
from llama_index.core.agent import ReActAgentfrom llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4o", temperature=0)
agent = ReActAgent.from_tools( tools, llm=llm, verbose=True, system_prompt=( "You are a data analyst with access to a company data lake. " "Always start by listing tables, then check schemas before writing SQL. " "Use DuckDB SQL syntax. Include LIMIT in queries unless aggregating. " "Present results clearly with context." ),)Step 3: Multi-Turn Conversation
# First question: discoveryresponse = agent.chat("What data do we have available?")print(response)# Agent calls list_tables(), then summarizes the available datasets
# Second question: analysisresponse = agent.chat("What are the top 10 customers by revenue this quarter?")print(response)# Agent calls get_schema("orders"), get_schema("customers"),# then run_query() with a JOIN and aggregation
# Follow-up: drill downresponse = agent.chat("Break that down by month for the top customer")print(response)# Agent remembers context, writes a filtered query for the top customerHere is what the agent’s reasoning looks like with verbose=True:
Thought: I need to find the top customers by revenue. Let me check the schema first.Action: get_schemaAction Input: {'table_name': 'raw.orders'}Observation: Table: raw.orders order_id (INTEGER) customer_id (INTEGER) amount (DECIMAL(10,2)) order_date (DATE) status (VARCHAR)
Thought: Now let me check the customers table for names.Action: get_schemaAction Input: {'table_name': 'raw.customers'}Observation: Table: raw.customers customer_id (INTEGER) name (VARCHAR) email (VARCHAR) created_at (TIMESTAMP)
Thought: I can now write the query joining orders and customers.Action: run_queryAction Input: {'sql': "SELECT c.name, SUM(o.amount) as total_revenue FROM raw.orders o JOIN raw.customers c ON o.customer_id = c.customer_id WHERE o.order_date >= '2026-01-01' AND o.order_date < '2026-04-01' GROUP BY c.name ORDER BY total_revenue DESC LIMIT 10"}LlamaIndex SQL Agent vs. DataSpoc Agent
LlamaIndex ships a built-in NLSQLTableQueryEngine that connects directly to a database. Here is how it compares:
| Feature | LlamaIndex SQL Agent | DataSpoc + FunctionTool |
|---|---|---|
| Data source | Live database (Postgres, MySQL) | Cloud Parquet (S3/GCS/Azure) |
| Setup | Connection string + SQLAlchemy | dataspoc-lens add-bucket |
| Cost | Database stays running | No compute when idle |
| Scale | Limited by database | DuckDB scales to TBs of Parquet |
| Schema discovery | SQLAlchemy introspection | lens.tables() + lens.schema() |
| Caching | None | DataSpoc local Parquet cache |
The DataSpoc approach works on data lakes. You do not need a running database — your data sits as Parquet files in object storage and DuckDB queries them directly.
Full Working Example
Here is the complete script you can save and run:
"""llamaindex_data_agent.py - Query your data lake with LlamaIndex agents."""
from llama_index.core.agent import ReActAgentfrom llama_index.core.tools import FunctionToolfrom llama_index.llms.openai import OpenAIfrom dataspoc_lens import LensClient
# Initialize the Lens clientlens = LensClient()
def list_tables() -> str: """List all available tables in the data lake. Call this first to discover what data is available.""" tables = lens.tables() return "\n".join(tables)
def get_schema(table_name: str) -> str: """Get the column names and types for a specific table. Call this before writing SQL to know the exact column names.""" schema = lens.schema(table_name) lines = [f" {col['name']} ({col['type']})" for col in schema] return f"Table: {table_name}\n" + "\n".join(lines)
def run_query(sql: str) -> str: """Execute a SQL query against the data lake and return results. Use DuckDB SQL syntax. Always use LIMIT to avoid huge results.""" result = lens.query(sql) return result.to_string(max_rows=50)
def refresh_cache(table_name: str) -> str: """Refresh the local cache for a table to speed up queries.""" lens.cache_refresh(table_name) return f"Cache refreshed for {table_name}"
# Build the agenttools = [ FunctionTool.from_defaults(fn=list_tables), FunctionTool.from_defaults(fn=get_schema), FunctionTool.from_defaults(fn=run_query), FunctionTool.from_defaults(fn=refresh_cache),]
llm = OpenAI(model="gpt-4o", temperature=0)
agent = ReActAgent.from_tools( tools, llm=llm, verbose=True, system_prompt=( "You are a data analyst with access to a company data lake. " "Always start by listing tables, then check schemas before writing SQL. " "Use DuckDB SQL syntax. Include LIMIT in queries unless aggregating. " "Present results clearly with context." ),)
# Interactive loopprint("Data Lake Agent (type 'quit' to exit)")print("-" * 40)while True: question = input("\nYou: ") if question.lower() in ("quit", "exit", "q"): break response = agent.chat(question) print(f"\nAgent: {response}")Using Claude Instead of OpenAI
Swap the LLM in two lines:
from llama_index.llms.anthropic import Anthropic
llm = Anthropic(model="claude-sonnet-4-20250514", temperature=0)Everything else stays the same. The tools, agent, and conversation loop work identically.
Tips for Production
- Cache hot tables — call
lens.cache_refresh(table)for tables queried often. The agent can do this automatically via the refresh tool. - Add guardrails — wrap
run_queryto rejectDROP,DELETE, or queries withoutLIMIT. - Log tool calls — LlamaIndex’s callback system lets you log every tool invocation for debugging.
- Token budget — large query results burn tokens. Cap
run_queryoutput at 50 rows and tell the agent to narrow queries.
No embeddings. No vector stores. Just SQL tools and a reasoning agent querying real data in your cloud bucket.