# Building a Data Analyst Agent with LangGraph and DataSpoc Lens
AI agents are most useful when they can access real data. In this tutorial, we build a LangGraph agent that acts as an autonomous data analyst — it discovers tables in your data lake, writes SQL queries, and answers business questions with real numbers.
## What We’re Building
A conversational agent that:
- Discovers available tables in a data lake
- Inspects schemas to understand the data
- Writes and executes SQL queries
- Synthesizes answers in natural language
All powered by the DataSpoc Lens SDK as the data layer.
## Prerequisites
```bash
pip install dataspoc-lens langgraph langchain-openai pydantic
```

Make sure you have a data lake configured:

```bash
dataspoc-lens add-bucket s3://company-lake
dataspoc-lens discover
```

## The Architecture
```
User Question
      │
      ▼
┌─────────────┐
│  LangGraph  │
│   Router    │
└──────┬──────┘
       │
       ├──► [discover] → List tables via LensClient
       │
       ├──► [analyze]  → Get schema + run SQL via LensClient
       │
       └──► [report]   → Synthesize answer with LLM
```

## Full Implementation
### Step 1: Define the State
```python
from typing import TypedDict, Sequence


class AgentState(TypedDict):
    question: str
    tables: list[str]
    schemas: dict[str, list[dict]]
    sql_query: str
    query_result: list[dict]
    answer: str
    messages: Sequence[dict]
    query_error: str  # set at runtime when SQL execution fails (see Production Considerations)
```

### Step 2: Initialize the Lens Client
```python
from dataspoc_lens import LensClient

# Connects to your configured data lake
client = LensClient()
```
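A quick sanity check before building the graph can save debugging time later. This sketch uses the same `list_tables()` and `get_schema()` calls the nodes rely on below:

```python
# Verify the client can reach the lake before wiring up the agent
tables = client.list_tables()
print(f"Found {len(tables)} tables")

# Peek at one schema to confirm discovery ran
if tables:
    print(tables[0], client.get_schema(tables[0]))
```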
### Step 3: Define the Graph Nodes

```python
from langchain_openai import ChatOpenAI
import json

llm = ChatOpenAI(model="gpt-4o", temperature=0)


def discover_node(state: AgentState) -> AgentState:
    """Discover all available tables in the data lake."""
    tables = client.list_tables()
    schemas = {}
    for table in tables:
        schema = client.get_schema(table)
        schemas[table] = schema

    return {
        **state,
        "tables": tables,
        "schemas": schemas,
    }
```
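The exact payload Lens returns isn’t shown here, but `analyze_node` below reads each column as a dict with `name` and `type` keys, so the schemas in state end up shaped roughly like this (a hypothetical example; the column names are borrowed from the sample query later in the post):

```python
# Hypothetical shape of state["schemas"], inferred from how analyze_node
# consumes it (c["name"] and c["type"]); not actual Lens output
example_schemas = {
    "raw.postgres.orders": [
        {"name": "id", "type": "BIGINT"},
        {"name": "product_id", "type": "BIGINT"},
        {"name": "amount", "type": "DOUBLE"},
        {"name": "created_at", "type": "TIMESTAMP"},
    ],
}
```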
```python
def analyze_node(state: AgentState) -> AgentState:
    """Generate and execute a SQL query based on the question."""
    # Build context about the available data
    schema_context = ""
    for table, columns in state["schemas"].items():
        cols = ", ".join([f"{c['name']} ({c['type']})" for c in columns])
        schema_context += f"\n- {table}: {cols}"

    prompt = f"""You are a SQL analyst. Given these tables:
{schema_context}

Write a DuckDB SQL query to answer: {state['question']}

Rules:
- Use only the tables and columns listed above
- Return only the SQL query, no explanation
- Use aggregations and GROUP BY when appropriate
- Limit results to 20 rows unless the question requires all"""

    response = llm.invoke(prompt)
    # Strip Markdown fences if the model wraps its answer in them.
    # Note: str.strip("```sql") would strip *characters*, not the prefix,
    # and could eat legitimate trailing letters from the SQL.
    sql_query = response.content.strip().removeprefix("```sql").removesuffix("```").strip()

    # Execute the query
    result = client.query(sql_query)

    return {
        **state,
        "sql_query": sql_query,
        "query_result": result,
    }
```
```python
def report_node(state: AgentState) -> AgentState:
    """Synthesize a natural language answer from the query results."""
    prompt = f"""Based on this SQL query and its results, answer the user's question.

Question: {state['question']}

SQL executed:
{state['sql_query']}

Results:
{json.dumps(state['query_result'][:20], indent=2, default=str)}

Provide a clear, concise answer with specific numbers.
If relevant, mention trends or notable outliers."""

    response = llm.invoke(prompt)

    return {
        **state,
        "answer": response.content,
    }
```

### Step 4: Build the Graph
```python
from langgraph.graph import StateGraph, END

# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)

# Define edges
workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_edge("analyze", "report")
workflow.add_edge("report", END)

# Compile
agent = workflow.compile()
```

### Step 5: Run It

```python
result = agent.invoke({
    "question": "Which products had the highest growth last quarter?",
    "tables": [],
    "schemas": {},
    "sql_query": "",
    "query_result": [],
    "answer": "",
    "messages": [],
})

print(result["answer"])
```

## Example Conversation
User: “Which products had the highest revenue growth last quarter compared to the previous quarter?”
Agent internal steps:
1. **Discover:** Found tables: `raw.postgres.orders`, `raw.postgres.products`, `raw.postgres.customers`

2. **Analyze:** Generated SQL:

```sql
WITH quarterly_revenue AS (
    SELECT
        p.name AS product_name,
        DATE_TRUNC('quarter', o.created_at) AS quarter,
        SUM(o.amount) AS revenue
    FROM raw.postgres.orders o
    JOIN raw.postgres.products p ON o.product_id = p.id
    WHERE o.created_at >= DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months'
    GROUP BY p.name, DATE_TRUNC('quarter', o.created_at)
)
SELECT
    product_name,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months' THEN revenue END) AS last_quarter,
    MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months' THEN revenue END) AS prev_quarter,
    ROUND(
        (MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '3 months' THEN revenue END)
         - MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months' THEN revenue END))
        / NULLIF(MAX(CASE WHEN quarter = DATE_TRUNC('quarter', CURRENT_DATE) - INTERVAL '6 months' THEN revenue END), 0) * 100,
        1
    ) AS growth_pct
FROM quarterly_revenue
GROUP BY product_name
HAVING COUNT(DISTINCT quarter) = 2
ORDER BY growth_pct DESC
LIMIT 10
```

3. **Report:**
The top 3 products by revenue growth last quarter were:
- Enterprise Plan — +47.2% ($234K → $345K)
- API Add-on — +31.8% ($89K → $117K)
- Team Plan — +22.1% ($156K → $190K)
Enterprise Plan saw the strongest growth, likely driven by the Q4 sales push. The API Add-on’s growth correlates with the new integrations launched in October.
## Adding Routing for Complex Questions
For production use, add a router that decides whether to query multiple tables or ask follow-up questions:
```python
def router_node(state: AgentState) -> str:
    """Decide the next step based on what we know."""
    if not state["tables"]:
        return "discover"
    if not state["sql_query"]:
        return "analyze"
    return "report"


def should_retry(state: AgentState) -> str:
    """Check if the query failed and needs a retry."""
    if state.get("query_error"):
        return "analyze"  # Regenerate the SQL
    return "report"
```
```python
# Enhanced graph with retry logic
workflow = StateGraph(AgentState)
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("report", report_node)

workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_conditional_edges("analyze", should_retry)
workflow.add_edge("report", END)

agent = workflow.compile()
```
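Note that `router_node` isn’t wired into the graph above; `should_retry` alone drives the retry loop. If you want the router to pick the first step instead of always starting at `discover`, one option is a sketch like this, assuming LangGraph’s conditional entry point API:

```python
# Hypothetical wiring (replaces set_entry_point("discover") above):
# router_node inspects the incoming state and returns the name of
# the node to start from
workflow.set_conditional_entry_point(router_node)
```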
## Adding Memory for Multi-Turn Conversations

```python
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
agent = workflow.compile(checkpointer=memory)

# First question
config = {"configurable": {"thread_id": "analyst-session-1"}}
result = agent.invoke(
    {"question": "What's our total revenue this month?", ...},
    config=config,
)

# Follow-up question (the agent remembers context)
result = agent.invoke(
    {"question": "Break that down by region", ...},
    config=config,
)
```

## Using Lens Natural Language Instead of SQL Generation
If you don’t want the LLM to generate SQL, use Lens’s built-in AI:
```python
def analyze_with_ai_node(state: AgentState) -> AgentState:
    """Use Lens AI Ask instead of generating SQL ourselves."""
    response = client.ask(state["question"])

    return {
        **state,
        "sql_query": response["sql"],
        "query_result": response["result"],
        "answer": response["answer"],
    }
```

This uses the Lens MCP server’s AI capabilities, which already know the schema and can generate optimized DuckDB SQL.
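Swapping this node in also simplifies the graph: `discover` still runs first, but the `report` step can be dropped because `ask()` already returns a finished answer. A minimal sketch using the pieces defined above:

```python
# Simplified graph: Lens AI handles both SQL generation and the final answer,
# so analyze_with_ai_node replaces analyze_node and report_node is unnecessary
workflow = StateGraph(AgentState)
workflow.add_node("discover", discover_node)
workflow.add_node("analyze", analyze_with_ai_node)

workflow.set_entry_point("discover")
workflow.add_edge("discover", "analyze")
workflow.add_edge("analyze", END)

agent = workflow.compile()
```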
## Production Considerations
- **Error handling:** Wrap `client.query()` in try/except and retry with corrected SQL
- **Rate limiting:** Add delays between LLM calls if you run many concurrent agents
- **Caching:** Cache schema discovery; it doesn’t change often
- **Guardrails:** Validate generated SQL before execution (no `DROP`, `DELETE`, etc.); see the sketch below
- **Observability:** Log each node’s input/output for debugging
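For the guardrails point, a minimal sketch: reject anything that isn’t a read-only query before it reaches `client.query()`. The keyword list here is illustrative, not exhaustive; a production system should parse the SQL properly.

```python
FORBIDDEN = {"drop", "delete", "update", "insert", "alter", "truncate", "grant"}


def validate_sql(sql: str) -> None:
    """Raise if generated SQL looks like anything other than a read-only query."""
    normalized = sql.strip().lower()
    if not normalized.startswith(("select", "with")):
        raise ValueError("Only SELECT queries are allowed")
    # Crude token check; a real guardrail would use a SQL parser
    tokens = set(normalized.replace("(", " ").replace(";", " ").split())
    if tokens & FORBIDDEN:
        raise ValueError(f"Forbidden keywords in query: {tokens & FORBIDDEN}")
```

Call `validate_sql(sql_query)` in `analyze_node` just before executing the query.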
The snippet below covers the error-handling and observability points together: it logs each step and records failures in `query_error`, which the `should_retry` edge defined earlier picks up:

```python
import logging

logger = logging.getLogger("data_analyst_agent")


def analyze_node(state: AgentState) -> AgentState:
    # ... generate SQL ...
    logger.info(f"Generated SQL: {sql_query}")

    try:
        result = client.query(sql_query)
        logger.info(f"Query returned {len(result)} rows")
    except Exception as e:
        logger.error(f"Query failed: {e}")
        return {**state, "query_error": str(e)}

    return {**state, "sql_query": sql_query, "query_result": result}
```

## Next Steps
- Add more tools: chart generation, export to CSV, scheduled reports
- Connect to Slack for a `/data` command that invokes the agent
- Add the DataSpoc Lens MCP server for Claude Desktop integration
- Use CrewAI multi-agent patterns for complex analytical workflows