ai-agents · langgraph · python · sql · data-analysis

Building a Data Analyst Agent with LangGraph and DataSpoc Lens

Michael San Martim · 2026-04-21

AI agents are most useful when they can access real data. In this tutorial, we build a LangGraph agent that acts as an autonomous data analyst: it discovers tables in your data lake, writes SQL queries, and answers business questions with real numbers.

What We're Building

A conversational agent that:

  1. Discovers available tables in a data lake
  2. Inspects schemas to understand the data
  3. Writes and executes SQL queries
  4. Synthesizes answers in natural language

All powered by the DataSpoc Lens SDK as the data layer.

Prerequisites

pip install dataspoc-lens langgraph langchain-openai pydantic

Make sure you have a data lake configured:

dataspoc-lens add-bucket s3://company-lake
dataspoc-lens discover

The Architecture

User Question
      │
      ▼
┌─────────────┐
│  LangGraph  │
│   Router    │
└──────┬──────┘
       ├──► [discover] → List tables via LensClient
       ├──► [analyze]  → Get schema + run SQL via LensClient
       └──► [report]   → Synthesize answer with LLM

Full Implementation

Step 1: Define the State

from typing import TypedDict, Sequence

class AgentState(TypedDict):
    question: str
    tables: list[str]
    schemas: dict[str, list[dict]]
    sql_query: str
    query_result: list[dict]
    answer: str
    messages: Sequence[dict]

Step 2: Initialize the Lens Client

from dataspoc_lens import LensClient
# Connects to your configured data lake
client = LensClient()
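If you want to develop and test the graph without a configured data lake, the small surface this tutorial uses (list_tables, get_schema, query) is easy to stub. The FakeLensClient below is a hypothetical in-memory stand-in, not part of the DataSpoc Lens SDK; the method names simply mirror the calls made in the nodes that follow:

```python
# Hypothetical in-memory stand-in for LensClient, useful for local testing.
# The method names mirror the calls used in this tutorial; they are
# assumptions about the real SDK surface, not its documented API.
class FakeLensClient:
    def __init__(self, tables: dict[str, list[dict]], rows: dict[str, list[dict]]):
        self._tables = tables  # table name -> schema (list of {name, type} dicts)
        self._rows = rows      # table name -> canned result rows

    def list_tables(self) -> list[str]:
        return list(self._tables)

    def get_schema(self, table: str) -> list[dict]:
        return self._tables[table]

    def query(self, sql: str) -> list[dict]:
        # Naive: return the canned rows of the first table mentioned in the SQL
        for table, rows in self._rows.items():
            if table in sql:
                return rows
        return []

client = FakeLensClient(
    tables={"raw.postgres.orders": [{"name": "amount", "type": "DOUBLE"}]},
    rows={"raw.postgres.orders": [{"amount": 42.0}]},
)
```

Because the stub binds to the same `client` name, the node functions below run against it unchanged.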

Step 3: Define the Graph Nodes

from langchain_openai import ChatOpenAI
import json

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def discover_node(state: AgentState) -> AgentState:
    """Discover all available tables in the data lake."""
    tables = client.list_tables()
    schemas = {}
    for table in tables:
        schemas[table] = client.get_schema(table)
    return {
        **state,
        "tables": tables,
        "schemas": schemas,
    }
def analyze_node(state: AgentState) -> AgentState:
    """Generate and execute a SQL query based on the question."""
    # Build context about available data
    schema_context = ""
    for table, columns in state["schemas"].items():
        cols = ", ".join(f"{c['name']} ({c['type']})" for c in columns)
        schema_context += f"\n- {table}: {cols}"
    prompt = f"""You are a SQL analyst. Given these tables:
{schema_context}

Write a DuckDB SQL query to answer: {state['question']}

Rules:
- Use only the tables and columns listed above
- Return only the SQL query, no explanation
- Use aggregations and GROUP BY when appropriate
- Limit results to 20 rows unless the question requires all
"""
    response = llm.invoke(prompt)
    # Note: str.strip("```sql") removes *characters*, not a prefix, and can
    # eat trailing letters from the SQL -- remove the fence explicitly instead
    sql_query = response.content.strip()
    if sql_query.startswith("```"):
        sql_query = sql_query.removeprefix("```sql").removeprefix("```")
        sql_query = sql_query.removesuffix("```").strip()
    # Execute the query
    result = client.query(sql_query)
    return {
        **state,
        "sql_query": sql_query,
        "query_result": result,
    }
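Fence formats vary between models: some wrap SQL in ```` ```sql ````, some in a bare ```` ``` ````, some return it unfenced. A small regex helper, shown here as an illustration rather than part of the tutorial's code, normalizes all three cases:

```python
import re

def strip_sql_fences(text: str) -> str:
    """Remove a surrounding ```sql ... ``` (or bare ```) fence, if present."""
    match = re.match(r"^\s*```(?:sql)?\s*\n?(.*?)\n?\s*```\s*$", text, re.DOTALL)
    return (match.group(1) if match else text).strip()
```

Dropping this in place of the inline fence handling keeps the node focused on query generation.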
def report_node(state: AgentState) -> AgentState:
    """Synthesize a natural language answer from query results."""
    prompt = f"""Based on this SQL query and its results, answer the user's question.

Question: {state['question']}

SQL executed:
{state['sql_query']}

Results: {json.dumps(state['query_result'][:20], indent=2, default=str)}

Provide a clear, concise answer with specific numbers. If relevant,
mention trends or notable outliers."""
    response = llm.invoke(prompt)
    return {
        **state,
        "answer": response.content,
    }
Step 4: Build the Graph
from langgraph.graph import StateGraph, END
# Build the graph
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)
# Define edges
workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_edge("analyze", "report")
workflow.add_edge("report", END)
# Compile
agent = workflow.compile()

Step 5: Run It

result = agent.invoke({
    "question": "Which products had the highest growth last quarter?",
    "tables": [],
    "schemas": {},
    "sql_query": "",
    "query_result": [],
    "answer": "",
    "messages": [],
})
print(result["answer"])
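Filling in every empty state key by hand on each call gets tedious. A small factory helper (an addition for convenience, not part of the tutorial's code above) builds a fresh initial state from just the question:

```python
def make_initial_state(question: str) -> dict:
    """Build an empty agent state for a new question (convenience helper)."""
    return {
        "question": question,
        "tables": [],
        "schemas": {},
        "sql_query": "",
        "query_result": [],
        "answer": "",
        "messages": [],
    }
```

Then the invocation shrinks to `agent.invoke(make_initial_state("Which products had the highest growth last quarter?"))`.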

Example Conversation

User: "Which products had the highest revenue growth last quarter compared to the previous quarter?"

The agent's internal steps:

  1. Discover: Found tables: raw.postgres.orders, raw.postgres.products, raw.postgres.customers

  2. Analyze: Generated SQL:

WITH quarterly_revenue AS (
    SELECT
        p.name AS product_name,
        DATE_TRUNC('quarter', o.created_at) AS quarter,
        SUM(o.amount) AS revenue
    FROM raw.postgres.orders o
    JOIN raw.postgres.products p ON o.product_id = p.id
    WHERE o.created_at >= DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
    GROUP BY p.name, DATE_TRUNC('quarter', o.created_at)
)
SELECT
    product_name,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months'
             THEN revenue END) AS last_quarter,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
             THEN revenue END) AS prev_quarter,
    ROUND(
        (MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months' THEN revenue END) -
         MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months' THEN revenue END)) /
        NULLIF(MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months' THEN revenue END), 0) * 100
    , 1) AS growth_pct
FROM quarterly_revenue
GROUP BY product_name
HAVING COUNT(DISTINCT quarter) = 2
ORDER BY growth_pct DESC
LIMIT 10
  3. Report:

The top 3 products by revenue growth last quarter were:

  1. Enterprise Plan — +47.2% ($234K → $345K)
  2. API Add-on — +31.8% ($89K → $117K)
  3. Team Plan — +22.1% ($156K → $190K)

The Enterprise Plan saw the strongest growth, likely driven by the Q4 sales push. API Add-on growth correlates with the new integrations launched in October.

Adding Routing for Complex Questions

For production use, add a router that decides whether to query more tables or ask follow-up questions:

def router_node(state: AgentState) -> str:
    """Decide the next step based on what we know."""
    if not state["tables"]:
        return "discover"
    if not state["sql_query"]:
        return "analyze"
    return "report"

def should_retry(state: AgentState) -> str:
    """Check if the query failed and needs retry."""
    if state.get("query_error"):
        return "analyze"  # Re-generate SQL
    return "report"

# Enhanced graph with retry logic
workflow = StateGraph(AgentState)
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)
workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_conditional_edges("analyze", should_retry)
workflow.add_edge("report", END)
agent = workflow.compile()
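One caveat: a query that fails every time would bounce between analyze and the retry edge forever. A bounded variant caps the attempts; this sketch assumes two extra state keys, query_error and retry_count, which analyze_node would have to set and increment (they are additions to the AgentState defined earlier, not part of it):

```python
MAX_RETRIES = 2

def should_retry(state: dict) -> str:
    """Route back to 'analyze' on failure, but give up after MAX_RETRIES.

    Assumes the state carries 'query_error' and a 'retry_count' that
    analyze_node increments on each failed attempt -- both are hypothetical
    additions to the AgentState sketched above.
    """
    if state.get("query_error") and state.get("retry_count", 0) < MAX_RETRIES:
        return "analyze"
    return "report"
```

With the cap exhausted, the report node can at least explain the failure to the user instead of looping.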

Adding Memory for Multi-Turn Conversations

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
agent = workflow.compile(checkpointer=memory)

# First question
config = {"configurable": {"thread_id": "analyst-session-1"}}
result = agent.invoke(
    {"question": "What's our total revenue this month?", ...},
    config=config,
)

# Follow-up question (agent remembers context)
result = agent.invoke(
    {"question": "Break that down by region", ...},
    config=config,
)

Using Lens Natural Language Instead of SQL Generation

If you don't want the LLM to generate SQL, use Lens's built-in AI:

def analyze_with_ai_node(state: AgentState) -> AgentState:
    """Use Lens AI Ask instead of generating SQL ourselves."""
    response = client.ask(state["question"])
    return {
        **state,
        "sql_query": response["sql"],
        "query_result": response["result"],
        "answer": response["answer"],
    }

This uses the AI capabilities of the Lens MCP server, which already knows the schema and can generate optimized DuckDB SQL.

Production Considerations

  1. Error handling: Wrap client.query() in try/except and retry with corrected SQL
  2. Rate limiting: Add delays between LLM calls if you run many agents concurrently
  3. Caching: Cache schema discovery results; they rarely change
  4. Guardrails: Validate generated SQL before execution (no DROP, DELETE, etc.)
  5. Observability: Log each node's input and output for debugging
import logging

logger = logging.getLogger("data_analyst_agent")

def analyze_node(state: AgentState) -> AgentState:
    # ... generate SQL ...
    logger.info(f"Generated SQL: {sql_query}")
    try:
        result = client.query(sql_query)
        logger.info(f"Query returned {len(result)} rows")
    except Exception as e:
        logger.error(f"Query failed: {e}")
        return {**state, "query_error": str(e)}
    return {**state, "sql_query": sql_query, "query_result": result}
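The guardrails point above can be sketched as a keyword filter, assuming the agent should only ever run read-only analytical queries. This is an illustrative minimum, not a complete SQL sanitizer (a real deployment would also restrict the database role's permissions):

```python
import re

# Reject statements that could mutate or destroy data. Illustrative only:
# defense in depth (read-only credentials) is still required in production.
FORBIDDEN = re.compile(
    r"\b(DROP|DELETE|UPDATE|INSERT|ALTER|TRUNCATE|CREATE|GRANT)\b",
    re.IGNORECASE,
)

def is_safe_sql(sql: str) -> bool:
    """Allow only a single read-only SELECT/WITH statement."""
    stripped = sql.strip().rstrip(";")
    if ";" in stripped:  # more than one statement
        return False
    if FORBIDDEN.search(stripped):
        return False
    return stripped.upper().startswith(("SELECT", "WITH"))
```

Call it right before `client.query(sql_query)` and return a query_error when it fails, so the retry edge can regenerate the SQL.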

Next Steps

  • Add more tools: chart generation, CSV export, report scheduling
  • Connect it to Slack for a /data command that invokes the agent
  • Add the DataSpoc Lens MCP server for Claude Desktop integration
  • Use CrewAI multi-agent patterns for complex analytical workflows
