ai-agents · langgraph · python · sql · data-analysis

# Building a Data Analyst Agent with LangGraph and DataSpoc Lens

Michael San Martim · 2026-04-21

AI agents are most useful when they can access real data. In this tutorial, we build a LangGraph agent that acts as an autonomous data analyst — it discovers tables in your data lake, writes SQL queries, and answers business questions with real numbers.

## What We’re Building

A conversational agent that:

  1. Discovers available tables in a data lake
  2. Inspects schemas to understand the data
  3. Writes and executes SQL queries
  4. Synthesizes answers in natural language

All powered by the DataSpoc Lens SDK as the data layer.

## Prerequisites

```bash
pip install dataspoc-lens langgraph langchain-openai pydantic
```

Make sure you have a data lake configured:

```bash
dataspoc-lens add-bucket s3://company-lake
dataspoc-lens discover
```
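
Before building the agent, it's worth confirming the SDK can see your lake. A quick sanity check using the same `LensClient` calls the agent itself makes below:

```python
from dataspoc_lens import LensClient

# Connects using the bucket configured via `dataspoc-lens add-bucket`
client = LensClient()

# If discovery worked, this prints the tables found in your lake
print(client.list_tables())
```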

## The Architecture

```
User Question
      │
┌─────▼───────┐
│  LangGraph  │
│   Router    │
└──────┬──────┘
       ├──► [discover] → List tables via LensClient
       ├──► [analyze]  → Get schema + run SQL via LensClient
       └──► [report]   → Synthesize answer with LLM
```

## Full Implementation

### Step 1: Define the State

```python
from typing import TypedDict, Sequence, NotRequired  # NotRequired: Python 3.11+


class AgentState(TypedDict):
    question: str
    tables: list[str]
    schemas: dict[str, list[dict]]
    sql_query: str
    query_result: list[dict]
    answer: str
    messages: Sequence[dict]
    query_error: NotRequired[str]  # set when a query fails (see the retry logic below)
```

### Step 2: Initialize the Lens Client

```python
from dataspoc_lens import LensClient

# Connects to your configured data lake
client = LensClient()
```

### Step 3: Define the Graph Nodes

```python
import json
import re

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", temperature=0)


def discover_node(state: AgentState) -> AgentState:
    """Discover all available tables in the data lake."""
    tables = client.list_tables()
    schemas = {}
    for table in tables:
        schemas[table] = client.get_schema(table)
    return {
        **state,
        "tables": tables,
        "schemas": schemas,
    }


def analyze_node(state: AgentState) -> AgentState:
    """Generate and execute a SQL query based on the question."""
    # Build context about available data
    schema_context = ""
    for table, columns in state["schemas"].items():
        cols = ", ".join(f"{c['name']} ({c['type']})" for c in columns)
        schema_context += f"\n- {table}: {cols}"
    prompt = f"""You are a SQL analyst. Given these tables:
{schema_context}

Write a DuckDB SQL query to answer: {state['question']}

Rules:
- Use only the tables and columns listed above
- Return only the SQL query, no explanation
- Use aggregations and GROUP BY when appropriate
- Limit results to 20 rows unless the question requires all
"""
    response = llm.invoke(prompt)
    # Strip markdown fences if the model wrapped the query in them.
    # (str.strip("```sql") strips *characters*, not a prefix, so use a regex.)
    sql_query = re.sub(r"^```(?:sql)?\s*|\s*```$", "", response.content.strip())
    # Execute the query
    result = client.query(sql_query)
    return {
        **state,
        "sql_query": sql_query,
        "query_result": result,
    }

def report_node(state: AgentState) -> AgentState:
    """Synthesize a natural language answer from query results."""
    prompt = f"""Based on this SQL query and its results, answer the user's question.

Question: {state['question']}

SQL executed:
{state['sql_query']}

Results:
{json.dumps(state['query_result'][:20], indent=2, default=str)}

Provide a clear, concise answer with specific numbers.
If relevant, mention trends or notable outliers."""
    response = llm.invoke(prompt)
    return {
        **state,
        "answer": response.content,
    }
```

### Step 4: Build the Graph

```python
from langgraph.graph import StateGraph, END

# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)

# Define edges
workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_edge("analyze", "report")
workflow.add_edge("report", END)

# Compile
agent = workflow.compile()
```

### Step 5: Run It

```python
result = agent.invoke({
    "question": "Which products had the highest growth last quarter?",
    "tables": [],
    "schemas": {},
    "sql_query": "",
    "query_result": [],
    "answer": "",
    "messages": [],
})
print(result["answer"])
```

## Example Conversation

User: “Which products had the highest revenue growth last quarter compared to the previous quarter?”

Agent internal steps:

1. Discover: Found tables: raw.postgres.orders, raw.postgres.products, raw.postgres.customers

2. Analyze: Generated SQL:

```sql
WITH quarterly_revenue AS (
    SELECT
        p.name AS product_name,
        DATE_TRUNC('quarter', o.created_at) AS quarter,
        SUM(o.amount) AS revenue
    FROM raw.postgres.orders o
    JOIN raw.postgres.products p ON o.product_id = p.id
    WHERE o.created_at >= DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
    GROUP BY p.name, DATE_TRUNC('quarter', o.created_at)
)
SELECT
    product_name,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months'
             THEN revenue END) AS last_quarter,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
             THEN revenue END) AS prev_quarter,
    ROUND(
        (MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months' THEN revenue END) -
         MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months' THEN revenue END)) /
        NULLIF(MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months' THEN revenue END), 0) * 100
    , 1) AS growth_pct
FROM quarterly_revenue
GROUP BY product_name
HAVING COUNT(DISTINCT quarter) = 2
ORDER BY growth_pct DESC
LIMIT 10
```

3. Report:

The top 3 products by revenue growth last quarter were:

  1. Enterprise Plan — +47.2% ($234K → $345K)
  2. API Add-on — +31.8% ($89K → $117K)
  3. Team Plan — +22.1% ($156K → $190K)

Enterprise Plan saw the strongest growth, likely driven by the Q4 sales push. The API Add-on’s growth correlates with the new integrations launched in October.

## Adding Routing for Complex Questions

For production use, add a router that decides whether to query multiple tables or ask follow-up questions:

```python
def router_node(state: AgentState) -> str:
    """Decide the next step based on what we know."""
    if not state["tables"]:
        return "discover"
    if not state["sql_query"]:
        return "analyze"
    return "report"


def should_retry(state: AgentState) -> str:
    """Check if the query failed and needs a retry."""
    if state.get("query_error"):
        return "analyze"  # Re-generate SQL; retries are capped by the graph's recursion limit
    return "report"


# Enhanced graph with routing and retry logic
workflow = StateGraph(AgentState)
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)
workflow.set_conditional_entry_point(router_node)  # router decides where to start
workflow.add_edge("discover", "analyze")
workflow.add_conditional_edges("analyze", should_retry)
workflow.add_edge("report", END)
agent = workflow.compile()
```

## Adding Memory for Multi-Turn Conversations

```python
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
agent = workflow.compile(checkpointer=memory)

# First question
config = {"configurable": {"thread_id": "analyst-session-1"}}
result = agent.invoke(
    {"question": "What's our total revenue this month?", ...},
    config=config,
)

# Follow-up question (agent remembers context)
result = agent.invoke(
    {"question": "Break that down by region", ...},
    config=config,
)
```
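
The checkpointer persists state between turns, but none of the nodes above actually read `messages`, so a follow-up like "Break that down by region" only resolves if conversation history reaches the SQL prompt. A minimal sketch of one way to close that gap (the `remember_turn` and `history_context` helpers are illustrative, not LangGraph APIs):

```python
def remember_turn(state: AgentState) -> AgentState:
    """Append the finished Q&A turn to `messages` so follow-ups have context."""
    history = list(state["messages"])
    history.append({"role": "user", "content": state["question"]})
    history.append({"role": "assistant", "content": state["answer"]})
    return {**state, "messages": history}


def history_context(state: AgentState, max_turns: int = 3) -> str:
    """Render the last few turns as text to prepend to the SQL prompt."""
    recent = state["messages"][-2 * max_turns:]
    return "\n".join(f"{m['role']}: {m['content']}" for m in recent)


# Wiring: send "report" to "remember" instead of END, and prepend
# history_context(state) to the prompt built in analyze_node.
workflow.add_node("remember", remember_turn)
workflow.add_edge("report", "remember")
workflow.add_edge("remember", END)
```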

## Using Lens Natural Language Instead of SQL Generation

If you don’t want the LLM to generate SQL, use Lens’s built-in AI:

```python
def analyze_with_ai_node(state: AgentState) -> AgentState:
    """Use Lens AI Ask instead of generating SQL ourselves."""
    response = client.ask(state["question"])
    return {
        **state,
        "sql_query": response["sql"],
        "query_result": response["result"],
        "answer": response["answer"],
    }
```

This uses the Lens MCP server’s AI capabilities, which already know the schema and can generate optimized DuckDB SQL.
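
One way to wire it in, reusing the `StateGraph` API from Step 4 (a sketch; since `analyze_with_ai_node` fills in `answer` itself, neither the discover nor the report node is needed):

```python
workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze_with_ai_node)
workflow.set_entry_point("analyze")  # Lens already knows the schema
workflow.add_edge("analyze", END)    # `answer` is already populated
agent = workflow.compile()
```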

## Production Considerations

1. Error handling: Wrap `client.query()` in try/except and retry with corrected SQL
2. Rate limiting: Add delays between LLM calls if using many concurrent agents
3. Caching: Cache schema discovery — it doesn’t change often
4. Guardrails: Validate generated SQL before execution (no DROP, DELETE, etc.; a validation sketch follows the logging example below)
5. Observability: Log each node’s input/output for debugging:
```python
import logging

logger = logging.getLogger("data_analyst_agent")


def analyze_node(state: AgentState) -> AgentState:
    # ... generate SQL as before ...
    logger.info(f"Generated SQL: {sql_query}")
    try:
        result = client.query(sql_query)
        logger.info(f"Query returned {len(result)} rows")
    except Exception as e:
        logger.error(f"Query failed: {e}")
        return {**state, "query_error": str(e)}
    return {**state, "sql_query": sql_query, "query_result": result}
```
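
For the guardrails item, a deny-list check before execution goes a long way. A minimal sketch (the `validate_sql` helper and its keyword list are illustrative, not part of the Lens SDK):

```python
import re

# Statements the agent should never run against the lake
FORBIDDEN = re.compile(
    r"\b(DROP|DELETE|INSERT|UPDATE|ALTER|TRUNCATE|CREATE|GRANT)\b",
    re.IGNORECASE,
)


def validate_sql(sql: str) -> str:
    """Raise unless the generated SQL looks like a single read-only statement."""
    if FORBIDDEN.search(sql):
        raise ValueError(f"Refusing to run non-read-only SQL: {sql!r}")
    if ";" in sql.rstrip().rstrip(";"):
        raise ValueError("Refusing to run multi-statement SQL")
    return sql


# In analyze_node, validate before executing:
# result = client.query(validate_sql(sql_query))
```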

## Next Steps

- Add more tools: chart generation, export to CSV, scheduled reports
- Connect to Slack for a `/data` command that invokes the agent
- Add the DataSpoc Lens MCP server for Claude Desktop integration
- Use CrewAI multi-agent patterns for complex analytical workflows
