# Building a Data Analyst Agent with LangGraph and DataSpoc Lens
AI agents are most useful when they can work with real data. In this tutorial, we build a LangGraph agent that acts as an autonomous data analyst: it discovers tables in your data lake, writes SQL queries, and answers business questions with real numbers.
## What We're Building
A conversational agent that:

- Discovers the tables available in a data lake
- Inspects schemas to understand the data
- Writes and executes SQL queries
- Synthesizes answers in natural language
All powered by the DataSpoc Lens SDK as the data layer.
## Prerequisites
```bash
pip install dataspoc-lens langgraph langchain-openai pydantic
```

Make sure you have a data lake configured:
```bash
dataspoc-lens add-bucket s3://company-lake
dataspoc-lens discover
```

## The Architecture
```
 User Question
       │
       ▼
┌─────────────┐
│  LangGraph  │
│   Router    │
└──────┬──────┘
       │
       ├──► [discover] → List tables via LensClient
       │
       ├──► [analyze]  → Get schema + run SQL via LensClient
       │
       └──► [report]   → Synthesize answer with LLM
```

## Full Implementation
### Step 1: Define the State
```python
from typing import TypedDict, Sequence

class AgentState(TypedDict):
    question: str
    tables: list[str]
    schemas: dict[str, list[dict]]
    sql_query: str
    query_result: list[dict]
    answer: str
    messages: Sequence[dict]
```

### Step 2: Initialize the Lens Client
```python
from dataspoc_lens import LensClient

# Connects to your configured data lake
client = LensClient()
```

### Step 3: Define the Graph Nodes
```python
from langchain_openai import ChatOpenAI
import json

llm = ChatOpenAI(model="gpt-4o", temperature=0)
```
```python
def discover_node(state: AgentState) -> AgentState:
    """Discover all available tables in the data lake."""
    tables = client.list_tables()
    schemas = {}
    for table in tables:
        schema = client.get_schema(table)
        schemas[table] = schema

    return {
        **state,
        "tables": tables,
        "schemas": schemas,
    }
```
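Schema discovery runs on every invocation of the graph, even though lake schemas rarely change. A simple TTL cache can skip the repeated round trips; here is a minimal sketch (the `get_schemas_cached` helper and its TTL value are illustrative, not part of the Lens SDK):

```python
import time

_SCHEMA_CACHE = {"data": None, "fetched_at": 0.0}
SCHEMA_TTL_SECONDS = 300  # refetch after 5 minutes

def get_schemas_cached(fetch_fn):
    """Return cached schemas, calling fetch_fn only when the TTL has expired."""
    now = time.time()
    stale = now - _SCHEMA_CACHE["fetched_at"] > SCHEMA_TTL_SECONDS
    if _SCHEMA_CACHE["data"] is None or stale:
        _SCHEMA_CACHE["data"] = fetch_fn()
        _SCHEMA_CACHE["fetched_at"] = now
    return _SCHEMA_CACHE["data"]
```

In `discover_node` you would then wrap the `list_tables`/`get_schema` loop in a function and pass it as `fetch_fn`.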
```python
def analyze_node(state: AgentState) -> AgentState:
    """Generate and execute a SQL query based on the question."""
    # Build context about available data
    schema_context = ""
    for table, columns in state["schemas"].items():
        cols = ", ".join(f"{c['name']} ({c['type']})" for c in columns)
        schema_context += f"\n- {table}: {cols}"

    prompt = f"""You are a SQL analyst. Given these tables:
{schema_context}

Write a DuckDB SQL query to answer: {state['question']}

Rules:
- Use only the tables and columns listed above
- Return only the SQL query, no explanation
- Use aggregations and GROUP BY when appropriate
- Limit results to 20 rows unless the question requires all"""

    response = llm.invoke(prompt)
    # Note: str.strip("```sql") removes *characters*, not a prefix,
    # so use removeprefix/removesuffix to peel off markdown fences.
    sql_query = (
        response.content.strip()
        .removeprefix("```sql")
        .removesuffix("```")
        .strip()
    )

    # Execute the query
    result = client.query(sql_query)

    return {
        **state,
        "sql_query": sql_query,
        "query_result": result,
    }
```
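LLMs don't always wrap SQL the same way: sometimes a fence without a language tag, sometimes none at all. A more forgiving extraction helper, sketched here as an optional replacement for the prefix/suffix stripping (the `extract_sql` name is ours):

```python
import re

def extract_sql(text: str) -> str:
    """Pull SQL out of an LLM reply, tolerating optional markdown fences."""
    fenced = re.search(r"```(?:sql)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE)
    if fenced:
        return fenced.group(1).strip()
    return text.strip()  # no fence: assume the reply is bare SQL
```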
````python
def report_node(state: AgentState) -> AgentState:
    """Synthesize a natural language answer from query results."""
    prompt = f"""Based on this SQL query and its results, answer the user's question.

Question: {state['question']}

SQL executed:
```sql
{state['sql_query']}
```

Results:
{json.dumps(state['query_result'][:20], indent=2, default=str)}

Provide a clear, concise answer with specific numbers.
If relevant, mention trends or notable outliers."""

    response = llm.invoke(prompt)

    return {
        **state,
        "answer": response.content,
    }
````

### Step 4: Build the Graph
```python
from langgraph.graph import StateGraph, END

# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)

# Define edges
workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_edge("analyze", "report")
workflow.add_edge("report", END)

# Compile
agent = workflow.compile()
```

### Step 5: Run It
```python
result = agent.invoke({
    "question": "Which products had the highest growth last quarter?",
    "tables": [],
    "schemas": {},
    "sql_query": "",
    "query_result": [],
    "answer": "",
    "messages": [],
})

print(result["answer"])
```

## Example Conversation
User: "Which products had the highest revenue growth last quarter compared to the previous quarter?"

The agent's internal steps:
1. **Discover:** Tables found: `raw.postgres.orders`, `raw.postgres.products`, `raw.postgres.customers`

2. **Analyze:** Generated SQL:

```sql
WITH quarterly_revenue AS (
    SELECT
        p.name AS product_name,
        DATE_TRUNC('quarter', o.created_at) AS quarter,
        SUM(o.amount) AS revenue
    FROM raw.postgres.orders o
    JOIN raw.postgres.products p ON o.product_id = p.id
    WHERE o.created_at >= DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
    GROUP BY p.name, DATE_TRUNC('quarter', o.created_at)
)
SELECT
    product_name,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months'
             THEN revenue END) AS last_quarter,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
             THEN revenue END) AS prev_quarter,
    ROUND(
        (MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months'
                  THEN revenue END)
         - MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
                    THEN revenue END))
        / NULLIF(MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
                          THEN revenue END), 0) * 100,
        1
    ) AS growth_pct
FROM quarterly_revenue
GROUP BY product_name
HAVING COUNT(DISTINCT quarter) = 2
ORDER BY growth_pct DESC
LIMIT 10
```

3. **Report:**
The top 3 products by revenue growth last quarter were:

- Enterprise Plan: +47.2% ($234K → $345K)
- API Add-on: +31.8% ($89K → $117K)
- Team Plan: +22.1% ($156K → $190K)

Enterprise Plan had the strongest growth, likely driven by the Q4 sales push. API Add-on growth correlates with the new integrations launched in October.
## Adding Routing for Complex Questions

For production use, add a router that decides whether to query multiple tables or ask follow-up questions:
```python
def router_node(state: AgentState) -> str:
    """Decide the next step based on what we know."""
    if not state["tables"]:
        return "discover"
    if not state["sql_query"]:
        return "analyze"
    return "report"

def should_retry(state: AgentState) -> str:
    """Check if the query failed and needs a retry."""
    # Assumes analyze_node sets a "query_error" key in the state on failure
    if state.get("query_error"):
        return "analyze"  # Re-generate SQL
    return "report"
```
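As written, `should_retry` loops back to `analyze` forever if the query keeps failing. A bounded variant is safer; this sketch assumes a hypothetical `retry_count` key in the state, incremented by `analyze_node` each time it regenerates SQL:

```python
MAX_RETRIES = 2

def should_retry_bounded(state: dict) -> str:
    """Route back to 'analyze' on failure, but give up after MAX_RETRIES attempts."""
    # "retry_count" is an assumed state key, not part of the AgentState above.
    if state.get("query_error") and state.get("retry_count", 0) < MAX_RETRIES:
        return "analyze"
    return "report"
```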
```python
# Enhanced graph with retry logic
workflow = StateGraph(AgentState)
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)

workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_conditional_edges("analyze", should_retry)
workflow.add_edge("report", END)

agent = workflow.compile()
```

## Adding Memory for Multi-Turn Conversations
```python
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
agent = workflow.compile(checkpointer=memory)

# First question
config = {"configurable": {"thread_id": "analyst-session-1"}}
result = agent.invoke(
    {"question": "What's our total revenue this month?", ...},
    config=config,
)

# Follow-up question (the agent remembers context)
result = agent.invoke(
    {"question": "Break that down by region", ...},
    config=config,
)
```

## Using Lens Natural Language Instead of SQL Generation
If you don't want the LLM to generate SQL, use Lens's built-in AI:
```python
def analyze_with_ai_node(state: AgentState) -> AgentState:
    """Use Lens AI Ask instead of generating SQL ourselves."""
    response = client.ask(state["question"])

    return {
        **state,
        "sql_query": response["sql"],
        "query_result": response["result"],
        "answer": response["answer"],
    }
```

This uses the AI capabilities of the Lens MCP server, which already knows the schema and can generate DuckDB-optimized SQL.
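If you want both options, you can try `client.ask()` first and fall back to the local SQL-generation node when it fails. A sketch (the `analyze_auto` wrapper and `fallback_fn` parameter are our own names, not Lens APIs):

```python
def analyze_auto(state: dict, client, fallback_fn):
    """Try Lens AI Ask first; fall back to local SQL generation if it fails."""
    try:
        response = client.ask(state["question"])
        return {
            **state,
            "sql_query": response["sql"],
            "query_result": response["result"],
            "answer": response["answer"],
        }
    except Exception:
        # Any failure (network, unsupported question) falls back to our own node.
        return fallback_fn(state)
```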
## Production Considerations
- **Error handling:** wrap `client.query()` in try/except and retry with corrected SQL
- **Rate limiting:** add delays between LLM calls if you run many concurrent agents
- **Caching:** cache schema discovery; it rarely changes
- **Guardrails:** validate generated SQL before execution (no DROP, DELETE, etc.)
- **Observability:** log each node's input and output for debugging
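For the guardrails point, one approach is to reject anything that is not a single read-only statement before calling `client.query()`. A minimal deny-list sketch (illustrative only, not a complete SQL sanitizer; the keyword list would need tuning for your dialect):

```python
import re

# Statements the agent should never run against the lake.
FORBIDDEN = ("drop", "delete", "insert", "update", "alter", "truncate", "create")

def is_safe_sql(sql: str) -> bool:
    """Reject multi-statement input and any data-modifying keyword."""
    statements = [s for s in sql.split(";") if s.strip()]
    if len(statements) != 1:
        return False  # no stacked statements like "SELECT 1; DROP ..."
    lowered = statements[0].lower()
    # \b keeps identifiers like created_at from matching "create"
    return not any(re.search(rf"\b{kw}\b", lowered) for kw in FORBIDDEN)
```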
```python
import logging

logger = logging.getLogger("data_analyst_agent")

def analyze_node(state: AgentState) -> AgentState:
    # ... generate SQL ...
    logger.info(f"Generated SQL: {sql_query}")

    try:
        result = client.query(sql_query)
        logger.info(f"Query returned {len(result)} rows")
    except Exception as e:
        logger.error(f"Query failed: {e}")
        return {**state, "query_error": str(e)}

    return {**state, "sql_query": sql_query, "query_result": result}
```

## Next Steps
- Add more tools: chart generation, CSV export, scheduled reports
- Connect to Slack with a `/data` command that invokes the agent
- Add the DataSpoc Lens MCP server for Claude Desktop integration
- Use CrewAI multi-agent patterns for complex analytical workflows