# Building a Data Analyst Agent with LangGraph and DataSpoc Lens
AI agents are most useful when they can access real data. In this tutorial, we build a LangGraph agent that acts as an autonomous data analyst: it discovers tables in your data lake, writes SQL queries, and answers business questions with real numbers.
## What We're Building
A conversational agent that:
- Discovers the tables available in a data lake
- Inspects schemas to understand the data
- Writes and executes SQL queries
- Synthesizes answers in natural language
All powered by the DataSpoc Lens SDK as the data layer.
## Prerequisites
```bash
pip install dataspoc-lens langgraph langchain-openai pydantic
```

Make sure you have a data lake configured:
```bash
dataspoc-lens add-bucket s3://company-lake
dataspoc-lens discover
```

## The Architecture
```
 User Question
       │
       ▼
┌─────────────┐
│  LangGraph  │
│   Router    │
└──────┬──────┘
       │
       ├──► [discover] → List tables via LensClient
       │
       ├──► [analyze]  → Get schema + run SQL via LensClient
       │
       └──► [report]   → Synthesize answer with LLM
```

## Full Implementation
### Step 1: Define the State
```python
from typing import TypedDict, Sequence

class AgentState(TypedDict):
    question: str
    tables: list[str]
    schemas: dict[str, list[dict]]
    sql_query: str
    query_result: list[dict]
    answer: str
    messages: Sequence[dict]
```

### Step 2: Initialize the Lens Client
```python
from dataspoc_lens import LensClient

# Connects to your configured data lake
client = LensClient()
```

### Step 3: Define the Graph Nodes
```python
from langchain_openai import ChatOpenAI
import json

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def discover_node(state: AgentState) -> AgentState:
    """Discover all available tables in the data lake."""
    tables = client.list_tables()
    schemas = {}
    for table in tables:
        schema = client.get_schema(table)
        schemas[table] = schema

    return {
        **state,
        "tables": tables,
        "schemas": schemas,
    }
```
```python
def analyze_node(state: AgentState) -> AgentState:
    """Generate and execute a SQL query based on the question."""
    # Build context about available data
    schema_context = ""
    for table, columns in state["schemas"].items():
        cols = ", ".join([f"{c['name']} ({c['type']})" for c in columns])
        schema_context += f"\n- {table}: {cols}"

    prompt = f"""You are a SQL analyst. Given these tables:
{schema_context}

Write a DuckDB SQL query to answer: {state['question']}

Rules:
- Use only the tables and columns listed above
- Return only the SQL query, no explanation
- Use aggregations and GROUP BY when appropriate
- Limit results to 20 rows unless the question requires all"""

    response = llm.invoke(prompt)
    # Remove a Markdown code fence if the model wrapped the query in one
    sql_query = (
        response.content.strip()
        .removeprefix("```sql")
        .removeprefix("```")
        .removesuffix("```")
        .strip()
    )

    # Execute the query
    result = client.query(sql_query)

    return {
        **state,
        "sql_query": sql_query,
        "query_result": result,
    }
```
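The cleanup of the LLM response in `analyze_node` handles the common case. A more defensive sketch (using a hypothetical `extract_sql` helper, not part of the Lens SDK) also copes with prose surrounding the fenced block and with fences that omit the language tag:

```python
import re

# Match the first fenced code block, with or without a "sql" tag
FENCE_RE = re.compile(r"`{3}(?:sql)?\s*(.*?)`{3}", re.DOTALL | re.IGNORECASE)

def extract_sql(llm_output: str) -> str:
    """Extract a SQL statement from an LLM response.

    Prefers the first fenced code block; falls back to the raw
    text when no fence is present.
    """
    match = FENCE_RE.search(llm_output)
    if match:
        return match.group(1).strip()
    return llm_output.strip()
```

In `analyze_node` this would replace the manual stripping with `sql_query = extract_sql(response.content)`.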
````python
def report_node(state: AgentState) -> AgentState:
    """Synthesize a natural language answer from query results."""
    prompt = f"""Based on this SQL query and its results, answer the user's question.

Question: {state['question']}

SQL executed:
```sql
{state['sql_query']}
```

Results: {json.dumps(state['query_result'][:20], indent=2, default=str)}

Provide a clear, concise answer with specific numbers.
If relevant, mention trends or notable outliers."""

    response = llm.invoke(prompt)

    return {
        **state,
        "answer": response.content,
    }
````

### Step 4: Build the Graph
```python
from langgraph.graph import StateGraph, END

# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)

# Define edges
workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_edge("analyze", "report")
workflow.add_edge("report", END)

# Compile
agent = workflow.compile()
```

### Step 5: Run
```python
result = agent.invoke({
    "question": "Which products had the highest growth last quarter?",
    "tables": [],
    "schemas": {},
    "sql_query": "",
    "query_result": [],
    "answer": "",
    "messages": [],
})

print(result["answer"])
```

## Example Conversation
User: "Which products had the highest revenue growth last quarter compared to the previous quarter?"
The agent's internal steps:
- Discover: found tables `raw.postgres.orders`, `raw.postgres.products`, `raw.postgres.customers`
- Analyze: generated SQL:

```sql
WITH quarterly_revenue AS (
    SELECT
        p.name AS product_name,
        DATE_TRUNC('quarter', o.created_at) AS quarter,
        SUM(o.amount) AS revenue
    FROM raw.postgres.orders o
    JOIN raw.postgres.products p ON o.product_id = p.id
    WHERE o.created_at >= DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
    GROUP BY p.name, DATE_TRUNC('quarter', o.created_at)
)
SELECT
    product_name,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months'
             THEN revenue END) AS last_quarter,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
             THEN revenue END) AS prev_quarter,
    ROUND(
        (MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months'
                  THEN revenue END)
         - MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
                    THEN revenue END))
        / NULLIF(MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
                          THEN revenue END), 0) * 100,
    1) AS growth_pct
FROM quarterly_revenue
GROUP BY product_name
HAVING COUNT(DISTINCT quarter) = 2
ORDER BY growth_pct DESC
LIMIT 10
```

- Report:
The top 3 products by revenue growth last quarter were:

- Enterprise Plan: +47.2% ($234K → $345K)
- API Add-on: +31.8% ($89K → $117K)
- Team Plan: +22.1% ($156K → $190K)

The Enterprise Plan saw the strongest growth, likely driven by the Q4 sales push. API Add-on growth correlates with the new integrations launched in October.
## Adding Routing for Complex Questions
For production use, add a router that decides whether to query multiple tables or ask follow-up questions:
```python
def router_node(state: AgentState) -> str:
    """Decide the next step based on what we know."""
    if not state["tables"]:
        return "discover"
    if not state["sql_query"]:
        return "analyze"
    return "report"
```
```python
def should_retry(state: AgentState) -> str:
    """Check if the query failed and needs retry."""
    if state.get("query_error"):
        return "analyze"  # Re-generate SQL
    return "report"
```
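As written, a query that keeps failing would bounce back to `analyze` indefinitely. A bounded variant is sketched below; the `retries` counter is a hypothetical extra state key that `analyze_node` would need to increment each time a query fails:

```python
MAX_RETRIES = 2

def should_retry_bounded(state: dict) -> str:
    """Retry failed queries, but give up after MAX_RETRIES attempts."""
    if state.get("query_error") and state.get("retries", 0) < MAX_RETRIES:
        return "analyze"  # Re-generate SQL
    return "report"       # Succeeded, or out of retries
```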
```python
# Enhanced graph with retry logic
workflow = StateGraph(AgentState)
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)

workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_conditional_edges("analyze", should_retry)
workflow.add_edge("report", END)

agent = workflow.compile()
```

## Adding Memory for Multi-Turn Conversations
```python
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
agent = workflow.compile(checkpointer=memory)
```
```python
# First question
config = {"configurable": {"thread_id": "analyst-session-1"}}
result = agent.invoke(
    {"question": "What's our total revenue this month?", ...},
    config=config,
)

# Follow-up question (agent remembers context)
result = agent.invoke(
    {"question": "Break that down by region", ...},
    config=config,
)
```

## Using Lens Natural Language Instead of Generating SQL
If you don't want the LLM to generate SQL, use Lens's built-in AI:
```python
def analyze_with_ai_node(state: AgentState) -> AgentState:
    """Use Lens AI Ask instead of generating SQL ourselves."""
    response = client.ask(state["question"])

    return {
        **state,
        "sql_query": response["sql"],
        "query_result": response["result"],
        "answer": response["answer"],
    }
```

This uses the AI capabilities of the Lens MCP server, which already knows the schema and can generate optimized DuckDB SQL.
## Production Considerations
- Error handling: wrap `client.query()` in try/except and retry with corrected SQL
- Rate limiting: add delays between LLM calls when running many concurrent agents
- Caching: cache schema discovery; it rarely changes
- Guardrails: validate generated SQL before execution (no DROP, DELETE, etc.)
- Observability: log each node's input and output for debugging
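The guardrails point can be sketched as a simple allow-list check before execution. This is only a sketch; a production setup might use a real SQL parser such as sqlglot instead of regexes:

```python
import re

# Statement keywords we never want the agent to execute
FORBIDDEN = re.compile(
    r"\b(drop|delete|insert|update|alter|truncate|create|grant|attach|copy)\b",
    re.IGNORECASE,
)

def is_safe_sql(sql: str) -> bool:
    """Allow only a single read-only statement (SELECT or WITH ... SELECT)."""
    stripped = sql.strip().rstrip(";")
    if ";" in stripped:          # reject multi-statement payloads
        return False
    if FORBIDDEN.search(stripped):
        return False
    return stripped.lower().startswith(("select", "with"))
```

In `analyze_node`, call `is_safe_sql(sql_query)` and set a `query_error` instead of executing when it returns `False`.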
```python
import logging

logger = logging.getLogger("data_analyst_agent")

def analyze_node(state: AgentState) -> AgentState:
    # ... generate SQL ...
    logger.info(f"Generated SQL: {sql_query}")

    try:
        result = client.query(sql_query)
        logger.info(f"Query returned {len(result)} rows")
    except Exception as e:
        logger.error(f"Query failed: {e}")
        return {**state, "query_error": str(e)}

    return {**state, "sql_query": sql_query, "query_result": result}
```

## Next Steps
- Add more tools: chart generation, CSV export, report scheduling
- Connect to Slack with a `/data` command that invokes the agent
- Add the DataSpoc Lens MCP server for Claude Desktop integration
- Use CrewAI multi-agent patterns for complex analytical workflows