Tags: ai-agents · langgraph · python · sql · data-analysis

# Building a Data Analyst Agent with LangGraph and DataSpoc Lens

Michael San Martim · 2026-04-21

AI agents are most useful when they can access real data. In this tutorial, we build a LangGraph agent that acts as an autonomous data analyst: it discovers tables in your data lake, writes SQL queries, and answers business questions with real numbers.

## What We're Building

A conversational agent that:

  1. Discovers the tables available in a data lake
  2. Inspects schemas to understand the data
  3. Writes and executes SQL queries
  4. Synthesizes answers in natural language

All powered by the DataSpoc Lens SDK as the data layer.

## Prerequisites

```bash
pip install dataspoc-lens langgraph langchain-openai pydantic
```

Make sure you have a data lake configured:

```bash
dataspoc-lens add-bucket s3://company-lake
dataspoc-lens discover
```
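Before building anything, a quick way to confirm the SDK can see your lake is to list tables directly. A minimal sanity check, using only the same `LensClient` calls the agent relies on later in this tutorial:

```python
from dataspoc_lens import LensClient

# Should print the tables discovered from s3://company-lake
client = LensClient()
print(client.list_tables())
```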

## The Architecture

```
User Question
       │
┌──────▼──────┐
│  LangGraph  │
│   Router    │
└──────┬──────┘
       ├──► [discover] → List tables via LensClient
       ├──► [analyze]  → Get schema + run SQL via LensClient
       └──► [report]   → Synthesize answer with LLM
```

## Full Implementation

### Step 1: Define the State

```python
from typing import TypedDict, Sequence

class AgentState(TypedDict):
    question: str
    tables: list[str]
    schemas: dict[str, list[dict]]
    sql_query: str
    query_result: list[dict]
    answer: str
    query_error: str  # set when a query fails; used by the retry logic later
    messages: Sequence[dict]
```
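LangGraph merges each node's partial updates into this state. Since the examples below invoke the agent with every key present, a small convenience helper keeps call sites tidy; this is just a sketch, not part of the SDK:

```python
def initial_state(question: str) -> AgentState:
    """Build a fresh AgentState with empty defaults for a new question."""
    return AgentState(
        question=question,
        tables=[],
        schemas={},
        sql_query="",
        query_result=[],
        answer="",
        query_error="",
        messages=[],
    )
```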

### Step 2: Initialize the Lens Client

```python
from dataspoc_lens import LensClient

# Connects to your configured data lake
client = LensClient()
```

### Step 3: Define the Graph Nodes

````python
from langchain_openai import ChatOpenAI
import json

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def discover_node(state: AgentState) -> AgentState:
    """Discover all available tables in the data lake."""
    tables = client.list_tables()
    schemas = {}
    for table in tables:
        schema = client.get_schema(table)
        schemas[table] = schema
    return {
        **state,
        "tables": tables,
        "schemas": schemas,
    }

def analyze_node(state: AgentState) -> AgentState:
    """Generate and execute a SQL query based on the question."""
    # Build context about available data
    schema_context = ""
    for table, columns in state["schemas"].items():
        cols = ", ".join([f"{c['name']} ({c['type']})" for c in columns])
        schema_context += f"\n- {table}: {cols}"
    prompt = f"""You are a SQL analyst. Given these tables:
{schema_context}

Write a DuckDB SQL query to answer: {state['question']}

Rules:
- Use only the tables and columns listed above
- Return only the SQL query, no explanation
- Use aggregations and GROUP BY when appropriate
- Limit results to 20 rows unless the question requires all
"""
    response = llm.invoke(prompt)
    # Remove any markdown fences the LLM wraps around the SQL
    sql_query = response.content.strip().strip("```sql").strip("```").strip()
    # Execute the query
    result = client.query(sql_query)
    return {
        **state,
        "sql_query": sql_query,
        "query_result": result,
    }

def report_node(state: AgentState) -> AgentState:
    """Synthesize a natural language answer from query results."""
    prompt = f"""Based on this SQL query and its results, answer the user's question.

Question: {state['question']}

SQL executed:
```sql
{state['sql_query']}
```

Results: {json.dumps(state['query_result'][:20], indent=2, default=str)}

Provide a clear, concise answer with specific numbers.
If relevant, mention trends or notable outliers."""
    response = llm.invoke(prompt)
    return {
        **state,
        "answer": response.content,
    }
````
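Before wiring the graph, it can be worth smoke-testing a node on its own. A minimal sketch, assuming the `LensClient` above is pointed at a configured lake and using the `initial_state` helper sketched in Step 1:

```python
# Run discover_node standalone and inspect what it found
state = initial_state("Which region had the most orders last month?")
state = discover_node(state)
print(state["tables"])
print(list(state["schemas"].items())[:1])  # peek at one schema
```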
### Step 4: Build the Graph
```python
from langgraph.graph import StateGraph, END

# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)

# Define edges
workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_edge("analyze", "report")
workflow.add_edge("report", END)

# Compile
agent = workflow.compile()
```
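To double-check that the wiring matches the architecture diagram, you can render the compiled graph with LangGraph's built-in drawing utilities; a quick sketch (the ASCII variant needs the optional `grandalf` package, so the Mermaid output is used here):

```python
# Print a Mermaid definition of the compiled topology
print(agent.get_graph().draw_mermaid())
```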

### Step 5: Run It

```python
result = agent.invoke({
    "question": "Which products had the highest growth last quarter?",
    "tables": [],
    "schemas": {},
    "sql_query": "",
    "query_result": [],
    "answer": "",
    "messages": [],
})
print(result["answer"])
```
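If you'd rather watch progress node by node than wait for the final state, `agent.stream()` yields each node's update as it finishes. A sketch using the `initial_state` helper from Step 1:

```python
# Each item maps the finished node's name to the state keys it updated
question = "Which products had the highest growth last quarter?"
for step in agent.stream(initial_state(question), stream_mode="updates"):
    for node_name, update in step.items():
        print(f"finished: {node_name} -> {list(update)}")
```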

## Example Conversation

**User:** "Which products had the highest revenue growth last quarter compared to the previous quarter?"

The agent's internal steps:

  1. **Discover:** Found tables: `raw.postgres.orders`, `raw.postgres.products`, `raw.postgres.customers`

  2. **Analyze:** Generated SQL:

```sql
WITH quarterly_revenue AS (
    SELECT
        p.name AS product_name,
        DATE_TRUNC('quarter', o.created_at) AS quarter,
        SUM(o.amount) AS revenue
    FROM raw.postgres.orders o
    JOIN raw.postgres.products p ON o.product_id = p.id
    WHERE o.created_at >= DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
    GROUP BY p.name, DATE_TRUNC('quarter', o.created_at)
)
SELECT
    product_name,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months'
             THEN revenue END) AS last_quarter,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
             THEN revenue END) AS prev_quarter,
    ROUND(
        (MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months' THEN revenue END) -
         MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months' THEN revenue END)) /
        NULLIF(MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months' THEN revenue END), 0) * 100
    , 1) AS growth_pct
FROM quarterly_revenue
GROUP BY product_name
HAVING COUNT(DISTINCT quarter) = 2
ORDER BY growth_pct DESC
LIMIT 10
```
  3. **Report:**

The top 3 products by revenue growth last quarter were:

  1. Enterprise Plan: +47.2% ($234K → $345K)
  2. API Add-on: +31.8% ($89K → $117K)
  3. Team Plan: +22.1% ($156K → $190K)

Enterprise Plan had the strongest growth, likely driven by the Q4 sales push. API Add-on growth correlates with the new integrations launched in October.

## Adding Routing for Complex Questions

For production use, add a router that decides whether to query more tables or ask follow-up questions:

```python
def router_node(state: AgentState) -> str:
    """Decide the next step based on what we know."""
    if not state["tables"]:
        return "discover"
    if not state["sql_query"]:
        return "analyze"
    return "report"

def should_retry(state: AgentState) -> str:
    """Check if the query failed and needs retry."""
    if state.get("query_error"):
        return "analyze"  # Re-generate SQL
    return "report"

# Enhanced graph with retry logic
workflow = StateGraph(AgentState)
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)
workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_conditional_edges("analyze", should_retry)
workflow.add_edge("report", END)
agent = workflow.compile()
```
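Note that `router_node` above is defined but never attached to the graph. One way to use it is as a conditional entry point, so a session whose state already contains tables can skip rediscovery. A sketch using LangGraph's `set_conditional_entry_point`:

```python
# Same nodes, but the router picks the first step based on current state
workflow = StateGraph(AgentState)
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)
workflow.set_conditional_entry_point(router_node)
workflow.add_edge("discover", "analyze")
workflow.add_conditional_edges("analyze", should_retry)
workflow.add_edge("report", END)
agent = workflow.compile()
```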

## Adding Memory for Multi-Turn Conversations

```python
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
agent = workflow.compile(checkpointer=memory)

# First question
config = {"configurable": {"thread_id": "analyst-session-1"}}
result = agent.invoke(
    initial_state("What's our total revenue this month?"),  # helper from Step 1
    config=config,
)

# Follow-up question (agent remembers context)
result = agent.invoke(
    initial_state("Break that down by region"),
    config=config,
)
```
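With a checkpointer attached, you can also inspect what was persisted for a thread; `get_state()` is part of the compiled graph's API:

```python
# Peek at the stored state for this conversation thread
snapshot = agent.get_state(config)
print(snapshot.values["question"], "->", snapshot.values["answer"])
```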

## Using Lens Natural Language Instead of Generating SQL

If you don't want the LLM generating SQL, use Lens's built-in AI:

```python
def analyze_with_ai_node(state: AgentState) -> AgentState:
    """Use Lens AI Ask instead of generating SQL ourselves."""
    response = client.ask(state["question"])
    return {
        **state,
        "sql_query": response["sql"],
        "query_result": response["result"],
        "answer": response["answer"],
    }
```

This uses the AI capabilities of the Lens MCP server, which already knows the schema and can generate DuckDB-optimized SQL.
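Since `ask()` already returns the final answer, the graph collapses to a single node in this mode. A sketch of the alternative wiring:

```python
# Lens already knows the schema, so one node replaces analyze + report
workflow = StateGraph(AgentState)
workflow.add_node("ask", analyze_with_ai_node)
workflow.set_entry_point("ask")
workflow.add_edge("ask", END)
agent = workflow.compile()
```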

## Production Considerations

  1. **Error handling:** Wrap `client.query()` in try/except and retry with corrected SQL
  2. **Rate limiting:** Add delays between LLM calls if you run many concurrent agents
  3. **Caching:** Cache schema discovery; it rarely changes
  4. **Guardrails:** Validate generated SQL before execution (no DROP, DELETE, etc.; see the sketch after the logging example below)
  5. **Observability:** Log each node's input and output for debugging:
```python
import logging

logger = logging.getLogger("data_analyst_agent")

def analyze_node(state: AgentState) -> AgentState:
    # ... generate SQL ...
    logger.info(f"Generated SQL: {sql_query}")
    try:
        result = client.query(sql_query)
        logger.info(f"Query returned {len(result)} rows")
    except Exception as e:
        logger.error(f"Query failed: {e}")
        return {**state, "query_error": str(e)}
    return {**state, "sql_query": sql_query, "query_result": result}
```
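For the guardrails item, a minimal denylist check before execution might look like this. It's a sketch, not a substitute for real SQL parsing or read-only database credentials:

```python
import re

# Statements the agent should never run against the lake
FORBIDDEN = re.compile(r"\b(DROP|DELETE|UPDATE|INSERT|ALTER|TRUNCATE)\b", re.IGNORECASE)

def validate_sql(sql: str) -> None:
    """Raise before execution if the generated SQL looks destructive."""
    if FORBIDDEN.search(sql):
        raise ValueError(f"Refusing to run potentially destructive SQL: {sql[:80]}")

# Usage: call validate_sql(sql_query) right before client.query(sql_query)
```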

## Next Steps

  • Add more tools: chart generation, CSV export, scheduled reports
  • Connect to Slack for a /data command that invokes the agent
  • Add the DataSpoc Lens MCP server for Claude Desktop integration
  • Use CrewAI multi-agent patterns for complex analytical workflows
