openai · ai-agents · python · data-lake · function-calling

Building a Data Lake Agent with OpenAI Function Calling and DataSpoc

Michael San Martim · 2026-04-15

You have a data lake full of Parquet files. You want GPT-4 to answer business questions about your data. The typical approach is RAG: embed everything, stuff it into a vector store, and hope the retrieval finds the right rows. There is a better way.

OpenAI’s function calling lets GPT-4 invoke your code with structured arguments. DataSpoc Lens gives you a SQL engine over cloud Parquet. Combine them and GPT-4 can write real SQL, execute it against your actual data, and return precise answers.

The Architecture

User question
  → GPT-4 decides which function to call
  → Function: list_tables / get_schema / run_query / ask_question
  → DataSpoc Lens (DuckDB over Parquet in S3/GCS/Azure)
  → Results back to GPT-4
  → Natural language answer

No embeddings. No vector store. No chunking. Just SQL.

Install Dependencies

pip install dataspoc-lens openai

Define the Tools

Each tool wraps a LensClient method. OpenAI’s API needs a JSON schema for each function:

import json

from openai import OpenAI
from dataspoc_lens import LensClient

client = OpenAI()  # uses OPENAI_API_KEY env var
lens = LensClient()  # connects to your configured bucket

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "list_tables",
            "description": "List all available tables in the data lake.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_schema",
            "description": "Get the schema (column names, types) for a specific table.",
            "parameters": {
                "type": "object",
                "properties": {
                    "table": {
                        "type": "string",
                        "description": "The table name to get the schema for.",
                    }
                },
                "required": ["table"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_query",
            "description": "Execute a SQL query against the data lake and return results.",
            "parameters": {
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "The SQL query to execute. Use DuckDB SQL syntax.",
                    }
                },
                "required": ["sql"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "ask_question",
            "description": "Ask a natural language question about the data. Returns SQL + results.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "A natural language question about the data.",
                    }
                },
                "required": ["question"],
            },
        },
    },
]
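
One detail that trips people up: when the model calls one of these tools, it sends `function.arguments` as a JSON string, not a parsed dict, so your dispatcher must `json.loads` it first. A minimal sketch with a hand-written tool call (the id and SQL are illustrative, not from a real API response):

```python
import json

# Hypothetical tool call shaped like the API's response: note that
# `arguments` is a JSON-encoded *string*, not a dict.
raw_tool_call = {
    "id": "call_abc123",  # illustrative id
    "type": "function",
    "function": {
        "name": "run_query",
        "arguments": '{"sql": "SELECT region, SUM(amount) FROM curated_sales GROUP BY region"}',
    },
}

# Parse the argument string before routing to your own code
args = json.loads(raw_tool_call["function"]["arguments"])
print(args["sql"])  # SELECT region, SUM(amount) FROM curated_sales GROUP BY region
```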

Implement the Function Dispatcher

def dispatch_function(name: str, arguments: dict) -> str:
    """Route function calls to LensClient methods."""
    if name == "list_tables":
        tables = lens.tables()
        return json.dumps({"tables": tables})
    elif name == "get_schema":
        schema = lens.schema(arguments["table"])
        return json.dumps({"schema": schema})
    elif name == "run_query":
        result = lens.query(arguments["sql"])
        # Convert DataFrame to dict for JSON serialization
        return json.dumps({
            "columns": list(result.columns),
            "rows": result.head(50).to_dict(orient="records"),
            "total_rows": len(result),
        })
    elif name == "ask_question":
        answer = lens.ask(arguments["question"])
        return json.dumps({"answer": str(answer)})
    return json.dumps({"error": f"Unknown function: {name}"})
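
The dispatcher always returns a JSON string, even on failure, because whatever it returns is fed back to the model verbatim. You can exercise the routing offline by swapping in a stub for LensClient; the stub below and its table names are made up for the demo:

```python
import json

class StubLens:
    """Stand-in for LensClient (hypothetical data) so routing runs offline."""
    def tables(self):
        return ["raw_orders", "curated_sales"]
    def schema(self, table):
        return {"region": "VARCHAR", "amount": "DOUBLE"}

lens = StubLens()

def dispatch_function(name: str, arguments: dict) -> str:
    # Same routing shape as above, trimmed to two tools for the demo
    if name == "list_tables":
        return json.dumps({"tables": lens.tables()})
    elif name == "get_schema":
        return json.dumps({"schema": lens.schema(arguments["table"])})
    return json.dumps({"error": f"Unknown function: {name}"})

print(dispatch_function("list_tables", {}))
# Unknown names come back as a structured error the model can read:
print(dispatch_function("summon_dragons", {}))
```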

The Conversation Loop

This is the core agent loop. GPT-4 decides when to call functions and when to respond to the user:

def run_agent(user_question: str) -> str:
    """Run a full agent conversation until GPT-4 produces a final answer."""
    messages = [
        {
            "role": "system",
            "content": (
                "You are a data analyst assistant. You have access to a data lake "
                "with multiple tables. Use the available functions to explore the "
                "data and answer the user's question accurately. Always verify "
                "table names and schemas before writing SQL."
            ),
        },
        {"role": "user", "content": user_question},
    ]
    # Loop until GPT-4 gives a text response (not a function call)
    for _ in range(10):  # max 10 iterations to prevent infinite loops
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=TOOLS,
            tool_choice="auto",
        )
        message = response.choices[0].message
        messages.append(message)
        # If no tool calls, we have the final answer
        if not message.tool_calls:
            return message.content
        # Process each tool call
        for tool_call in message.tool_calls:
            args = json.loads(tool_call.function.arguments)
            result = dispatch_function(tool_call.function.name, args)
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            })
    return "Agent reached maximum iterations without a final answer."
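
The loop relies on a strict message protocol: an assistant message that carries tool calls must be answered by exactly one role="tool" message per tool_call id before the next API request, or the API rejects the conversation. A sketch of the message list after one round (these dicts mirror the SDK objects; the id and contents are illustrative):

```python
# Message history after one tool round, in the order the loop builds it
messages = [
    {"role": "system", "content": "You are a data analyst assistant."},
    {"role": "user", "content": "What were total sales by region last quarter?"},
    {"role": "assistant", "content": None, "tool_calls": [
        {"id": "call_1", "type": "function",
         "function": {"name": "list_tables", "arguments": "{}"}},
    ]},
    {"role": "tool", "tool_call_id": "call_1",
     "content": '{"tables": ["raw_orders", "curated_sales"]}'},
]

# Invariant the API enforces: every tool_call id has a matching tool reply
call_ids = {tc["id"] for m in messages if m.get("tool_calls") for tc in m["tool_calls"]}
answered = {m["tool_call_id"] for m in messages if m["role"] == "tool"}
assert call_ids == answered
```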

Run It

# Simple question — GPT-4 will call list_tables, then get_schema, then run_query
answer = run_agent("What were total sales by region last quarter?")
print(answer)
# Complex question — multiple function calls
answer = run_agent(
    "Compare the top 5 customers by revenue this year vs last year. "
    "Show the percentage change."
)
print(answer)

A typical execution looks like this:

User: What were total sales by region last quarter?
GPT-4 → list_tables()
Result: ["raw_orders", "raw_customers", "curated_sales", "gold_revenue"]
GPT-4 → get_schema("curated_sales")
Result: {"region": "VARCHAR", "amount": "DOUBLE", "sale_date": "DATE", ...}
GPT-4 → run_query("SELECT region, SUM(amount) as total FROM curated_sales WHERE sale_date >= '2026-01-01' GROUP BY region ORDER BY total DESC")
Result: [{"region": "North America", "total": 1240000}, ...]
GPT-4 → "Total sales by region last quarter: North America led with $1.24M..."
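
Note that the trace's query only bounds the start date; a real "last quarter" filter needs both ends of the range. If you compute the bounds in your own code rather than trusting the model's date math, a helper like this (a sketch, not part of Lens) does it with an exclusive upper bound:

```python
from datetime import date

def previous_quarter_range(today: date) -> tuple[date, date]:
    """Return (start, end) of the calendar quarter before `today`, end-exclusive."""
    # First month of the current quarter: 1, 4, 7, or 10
    q_start_month = 3 * ((today.month - 1) // 3) + 1
    this_q_start = date(today.year, q_start_month, 1)
    if q_start_month == 1:
        prev_q_start = date(today.year - 1, 10, 1)  # Q4 of last year
    else:
        prev_q_start = date(today.year, q_start_month - 3, 1)
    return prev_q_start, this_q_start

print(previous_quarter_range(date(2026, 4, 15)))
# (datetime.date(2026, 1, 1), datetime.date(2026, 4, 1))
```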

Why This Beats RAG for Structured Data

Aspect       | RAG (Vector Store)                     | Function Calling + Lens
Accuracy     | Approximate (retrieves similar chunks) | Exact (SQL on real data)
Aggregations | Cannot SUM/AVG/GROUP BY                | Full SQL support
Setup        | Embedding pipeline + vector DB         | pip install dataspoc-lens openai
Cost         | Embedding tokens + storage + retrieval | Only query tokens
Freshness    | Re-embed on every data change          | Always reads latest Parquet
Auditability | Opaque retrieval                       | Every query is SQL you can review

RAG works for unstructured documents. For structured data in a lake, function calling with SQL is strictly better.

Adding Guardrails

In production, you want to prevent runaway queries and enforce read-only access:

import re

def dispatch_function_safe(name: str, arguments: dict) -> str:
    """Production-safe dispatcher with guardrails."""
    if name == "run_query":
        sql = arguments["sql"].strip().upper()
        # Block writes. Word boundaries avoid false positives on column
        # names like UPDATED_AT or CREATED_AT.
        if re.search(r"\b(INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b", sql):
            return json.dumps({"error": "Write operations are not allowed."})
        # Add LIMIT if missing
        if not re.search(r"\bLIMIT\b", sql):
            arguments["sql"] = arguments["sql"].rstrip(";") + " LIMIT 1000"
    return dispatch_function(name, arguments)

Full Working Script

Here is the complete script you can copy and run:

#!/usr/bin/env python3
"""Data lake agent using OpenAI function calling + DataSpoc Lens."""
import json

from openai import OpenAI
from dataspoc_lens import LensClient

client = OpenAI()
lens = LensClient()

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "list_tables",
            "description": "List all available tables in the data lake.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_schema",
            "description": "Get column names and types for a table.",
            "parameters": {
                "type": "object",
                "properties": {
                    "table": {"type": "string", "description": "Table name."}
                },
                "required": ["table"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_query",
            "description": "Execute a SQL query against the data lake.",
            "parameters": {
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "DuckDB SQL query."}
                },
                "required": ["sql"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "ask_question",
            "description": "Ask a natural language question about the data.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {"type": "string", "description": "Question about the data."}
                },
                "required": ["question"],
            },
        },
    },
]

def dispatch(name: str, args: dict) -> str:
    if name == "list_tables":
        return json.dumps({"tables": lens.tables()})
    elif name == "get_schema":
        return json.dumps({"schema": lens.schema(args["table"])})
    elif name == "run_query":
        sql = args["sql"].strip()
        if any(k in sql.upper() for k in ["INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER"]):
            return json.dumps({"error": "Write operations blocked."})
        df = lens.query(sql)
        return json.dumps({
            "columns": list(df.columns),
            "rows": df.head(50).to_dict(orient="records"),
            "total_rows": len(df),
        })
    elif name == "ask_question":
        return json.dumps({"answer": str(lens.ask(args["question"]))})
    return json.dumps({"error": f"Unknown function: {name}"})

def agent(question: str) -> str:
    messages = [
        {"role": "system", "content": "You are a data analyst. Use the tools to explore and query the data lake."},
        {"role": "user", "content": question},
    ]
    for _ in range(10):
        resp = client.chat.completions.create(
            model="gpt-4o", messages=messages, tools=TOOLS, tool_choice="auto"
        )
        msg = resp.choices[0].message
        messages.append(msg)
        if not msg.tool_calls:
            return msg.content
        for tc in msg.tool_calls:
            result = dispatch(tc.function.name, json.loads(tc.function.arguments))
            messages.append({"role": "tool", "tool_call_id": tc.id, "content": result})
    return "Max iterations reached."

if __name__ == "__main__":
    import sys
    question = " ".join(sys.argv[1:]) or "What tables are available and what data do they contain?"
    print(agent(question))

Run it:

export OPENAI_API_KEY="sk-..."
python data_lake_agent.py "What were the top 10 products by revenue last month?"

Function calling turns GPT-4 from a text generator into a data analyst that writes and executes real SQL against your actual data. No hallucinated numbers. No stale embeddings. Just precise answers from your lake.
