LlamaIndex + DataSpoc: Consulte Seu Data Lake Sem Embeddings
RAG sobre dados estruturados é uma armadilha. Você divide suas tabelas em pedaços de texto, gera embeddings, recupera correspondências aproximadas e torce para que o LLM reconstrua a resposta correta. Para dados estruturados, existe um caminho melhor: dar ao LLM ferramentas que executam SQL real.
Este post constrói um agente LlamaIndex que consulta seu data lake através do DataSpoc Lens — sem embeddings, sem vector store, sem chunking. Apenas SQL preciso sobre arquivos Parquet na nuvem.
Por Que Não RAG para Dados Estruturados?
Considere a pergunta: “Qual foi a receita total no último trimestre?” Com RAG, você faria:
- Dividir sua tabela de pedidos em fragmentos de texto
- Gerar embeddings desses fragmentos em um vector store
- Recuperar os top-k fragmentos mais similares
- Torcer para que o LLM consiga somar números a partir de fragmentos de texto
O resultado frequentemente está errado. Similaridade vetorial não entende agregação, joins ou intervalos de tempo.
Com agentes de chamada de ferramentas, o LLM escreve SQL, executa através do DataSpoc Lens e retorna a resposta exata.
Pré-requisitos
pip install llama-index llama-index-llms-openai dataspoc-lensCertifique-se de ter um bucket do DataSpoc Lens configurado:
dataspoc-lens add-bucket s3://my-company-data --name productiondataspoc-lens tablesPasso 1: Definir Ferramentas a partir do LensClient
O FunctionTool do LlamaIndex encapsula qualquer callable Python em uma ferramenta que o agente pode usar. Vamos encapsular os quatro métodos principais do LensClient:
from llama_index.core.tools import FunctionToolfrom dataspoc_lens import LensClient
lens = LensClient()
def list_tables() -> str: """List all available tables in the data lake. Call this first to discover what data is available.""" tables = lens.tables() return "\n".join(tables)
def get_schema(table_name: str) -> str: """Get the column names and types for a specific table. Call this before writing SQL to know the exact column names.""" schema = lens.schema(table_name) lines = [f" {col['name']} ({col['type']})" for col in schema] return f"Table: {table_name}\n" + "\n".join(lines)
def run_query(sql: str) -> str: """Execute a SQL query against the data lake and return results. Use DuckDB SQL syntax. Always use LIMIT to avoid huge results.""" result = lens.query(sql) return result.to_string(max_rows=50)
def get_cache_status() -> str: """Check which tables are cached locally for fast queries.""" status = lens.cache_status() lines = [f" {t['table']}: {t['status']} ({t['size']})" for t in status] return "\n".join(lines)
# Wrap as LlamaIndex toolstools = [ FunctionTool.from_defaults(fn=list_tables), FunctionTool.from_defaults(fn=get_schema), FunctionTool.from_defaults(fn=run_query), FunctionTool.from_defaults(fn=get_cache_status),]Cada função tem uma docstring que serve como descrição da ferramenta. O agente lê essas descrições para decidir qual ferramenta chamar.
Passo 2: Criar o Agente
from llama_index.core.agent import ReActAgentfrom llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-4o", temperature=0)
agent = ReActAgent.from_tools( tools, llm=llm, verbose=True, system_prompt=( "You are a data analyst with access to a company data lake. " "Always start by listing tables, then check schemas before writing SQL. " "Use DuckDB SQL syntax. Include LIMIT in queries unless aggregating. " "Present results clearly with context." ),)Passo 3: Conversação Multi-turno
# First question: discoveryresponse = agent.chat("What data do we have available?")print(response)# Agent calls list_tables(), then summarizes the available datasets
# Second question: analysisresponse = agent.chat("What are the top 10 customers by revenue this quarter?")print(response)# Agent calls get_schema("orders"), get_schema("customers"),# then run_query() with a JOIN and aggregation
# Follow-up: drill downresponse = agent.chat("Break that down by month for the top customer")print(response)# Agent remembers context, writes a filtered query for the top customerVeja como o raciocínio do agente aparece com verbose=True:
Thought: I need to find the top customers by revenue. Let me check the schema first.Action: get_schemaAction Input: {'table_name': 'raw.orders'}Observation: Table: raw.orders order_id (INTEGER) customer_id (INTEGER) amount (DECIMAL(10,2)) order_date (DATE) status (VARCHAR)
Thought: Now let me check the customers table for names.Action: get_schemaAction Input: {'table_name': 'raw.customers'}Observation: Table: raw.customers customer_id (INTEGER) name (VARCHAR) email (VARCHAR) created_at (TIMESTAMP)
Thought: I can now write the query joining orders and customers.Action: run_queryAction Input: {'sql': "SELECT c.name, SUM(o.amount) as total_revenue FROM raw.orders o JOIN raw.customers c ON o.customer_id = c.customer_id WHERE o.order_date >= '2026-01-01' AND o.order_date < '2026-04-01' GROUP BY c.name ORDER BY total_revenue DESC LIMIT 10"}LlamaIndex SQL Agent vs. DataSpoc Agent
O LlamaIndex já vem com um NLSQLTableQueryEngine integrado que conecta diretamente a um banco de dados. Veja como se compara:
| Recurso | LlamaIndex SQL Agent | DataSpoc + FunctionTool |
|---|---|---|
| Fonte de dados | Banco de dados ativo (Postgres, MySQL) | Parquet na nuvem (S3/GCS/Azure) |
| Setup | Connection string + SQLAlchemy | dataspoc-lens add-bucket |
| Custo | Banco de dados sempre ligado | Sem compute quando ocioso |
| Escala | Limitado pelo banco de dados | DuckDB escala até TBs de Parquet |
| Descoberta de schema | Introspecção SQLAlchemy | lens.tables() + lens.schema() |
| Cache | Nenhum | Cache local Parquet do DataSpoc |
A abordagem DataSpoc funciona em data lakes. Você não precisa de um banco de dados rodando — seus dados ficam como arquivos Parquet em object storage e o DuckDB consulta diretamente.
Exemplo Completo Funcional
Aqui está o script completo que você pode salvar e executar:
"""llamaindex_data_agent.py - Query your data lake with LlamaIndex agents."""
from llama_index.core.agent import ReActAgentfrom llama_index.core.tools import FunctionToolfrom llama_index.llms.openai import OpenAIfrom dataspoc_lens import LensClient
# Initialize the Lens clientlens = LensClient()
def list_tables() -> str: """List all available tables in the data lake. Call this first to discover what data is available.""" tables = lens.tables() return "\n".join(tables)
def get_schema(table_name: str) -> str: """Get the column names and types for a specific table. Call this before writing SQL to know the exact column names.""" schema = lens.schema(table_name) lines = [f" {col['name']} ({col['type']})" for col in schema] return f"Table: {table_name}\n" + "\n".join(lines)
def run_query(sql: str) -> str: """Execute a SQL query against the data lake and return results. Use DuckDB SQL syntax. Always use LIMIT to avoid huge results.""" result = lens.query(sql) return result.to_string(max_rows=50)
def refresh_cache(table_name: str) -> str: """Refresh the local cache for a table to speed up queries.""" lens.cache_refresh(table_name) return f"Cache refreshed for {table_name}"
# Build the agenttools = [ FunctionTool.from_defaults(fn=list_tables), FunctionTool.from_defaults(fn=get_schema), FunctionTool.from_defaults(fn=run_query), FunctionTool.from_defaults(fn=refresh_cache),]
llm = OpenAI(model="gpt-4o", temperature=0)
agent = ReActAgent.from_tools( tools, llm=llm, verbose=True, system_prompt=( "You are a data analyst with access to a company data lake. " "Always start by listing tables, then check schemas before writing SQL. " "Use DuckDB SQL syntax. Include LIMIT in queries unless aggregating. " "Present results clearly with context." ),)
# Interactive loopprint("Data Lake Agent (type 'quit' to exit)")print("-" * 40)while True: question = input("\nYou: ") if question.lower() in ("quit", "exit", "q"): break response = agent.chat(question) print(f"\nAgent: {response}")Usando Claude em Vez do OpenAI
Troque o LLM em duas linhas:
from llama_index.llms.anthropic import Anthropic
llm = Anthropic(model="claude-sonnet-4-20250514", temperature=0)Todo o resto permanece igual. As ferramentas, o agente e o loop de conversação funcionam de forma idêntica.
Dicas para Produção
- Cache de tabelas frequentes — chame
lens.cache_refresh(table)para tabelas consultadas frequentemente. O agente pode fazer isso automaticamente via ferramenta de refresh. - Adicione guardrails — encapsule
run_querypara rejeitarDROP,DELETEou consultas semLIMIT. - Registre chamadas de ferramentas — o sistema de callbacks do LlamaIndex permite registrar cada invocação de ferramenta para depuração.
- Orçamento de tokens — resultados grandes de consultas consomem tokens. Limite a saída de
run_querya 50 linhas e instrua o agente a refinar consultas.
Sem embeddings. Sem vector stores. Apenas ferramentas SQL e um agente de raciocínio consultando dados reais no seu bucket na nuvem.