Building a Data Lake Agent with OpenAI Function Calling and DataSpoc
You have a data lake full of Parquet files. You want GPT-4 to answer business questions about your data. The typical approach is RAG: embed everything, stuff it into a vector store, and hope the retrieval finds the right rows. There is a better way.
OpenAI’s function calling lets GPT-4 invoke your code with structured arguments. DataSpoc Lens gives you a SQL engine over cloud Parquet. Combine them and GPT-4 can write real SQL, execute it against your actual data, and return precise answers.
The Architecture
```
User Question
      ↓
GPT-4 (decides which function to call)
      ↓
Function: list_tables / get_schema / run_query / ask_question
      ↓
DataSpoc Lens (DuckDB over Parquet in S3/GCS/Azure)
      ↓
Results back to GPT-4
      ↓
Natural language answer
```

No embeddings. No vector store. No chunking. Just SQL.
Install Dependencies
```bash
pip install dataspoc-lens openai
```

Define the Tools
Each tool wraps a LensClient method. OpenAI’s API needs a JSON schema for each function:
```python
import json

from openai import OpenAI
from dataspoc_lens import LensClient

client = OpenAI()    # uses OPENAI_API_KEY env var
lens = LensClient()  # connects to your configured bucket

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "list_tables",
            "description": "List all available tables in the data lake.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_schema",
            "description": "Get the schema (column names, types) for a specific table.",
            "parameters": {
                "type": "object",
                "properties": {
                    "table": {
                        "type": "string",
                        "description": "The table name to get the schema for.",
                    }
                },
                "required": ["table"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_query",
            "description": "Execute a SQL query against the data lake and return results.",
            "parameters": {
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "The SQL query to execute. Use DuckDB SQL syntax.",
                    }
                },
                "required": ["sql"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "ask_question",
            "description": "Ask a natural language question about the data. Returns SQL + results.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "A natural language question about the data.",
                    }
                },
                "required": ["question"],
            },
        },
    },
]
```

Implement the Function Dispatcher
```python
def dispatch_function(name: str, arguments: dict) -> str:
    """Route function calls to LensClient methods."""
    if name == "list_tables":
        tables = lens.tables()
        return json.dumps({"tables": tables})

    elif name == "get_schema":
        schema = lens.schema(arguments["table"])
        return json.dumps({"schema": schema})

    elif name == "run_query":
        result = lens.query(arguments["sql"])
        # Convert DataFrame to dict for JSON serialization
        return json.dumps({
            "columns": list(result.columns),
            "rows": result.head(50).to_dict(orient="records"),
            "total_rows": len(result),
        })

    elif name == "ask_question":
        answer = lens.ask(arguments["question"])
        return json.dumps({"answer": str(answer)})

    return json.dumps({"error": f"Unknown function: {name}"})
```
return json.dumps({"error": f"Unknown function: {name}"})The Conversation Loop
This is the core agent loop. GPT-4 decides when to call functions and when to respond to the user:
```python
def run_agent(user_question: str) -> str:
    """Run a full agent conversation until GPT-4 produces a final answer."""
    messages = [
        {
            "role": "system",
            "content": (
                "You are a data analyst assistant. You have access to a data lake "
                "with multiple tables. Use the available functions to explore the "
                "data and answer the user's question accurately. Always verify "
                "table names and schemas before writing SQL."
            ),
        },
        {"role": "user", "content": user_question},
    ]

    # Loop until GPT-4 gives a text response (not a function call)
    for _ in range(10):  # max 10 iterations to prevent infinite loops
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=TOOLS,
            tool_choice="auto",
        )

        message = response.choices[0].message
        messages.append(message)

        # If no tool calls, we have the final answer
        if not message.tool_calls:
            return message.content

        # Process each tool call
        for tool_call in message.tool_calls:
            args = json.loads(tool_call.function.arguments)
            result = dispatch_function(tool_call.function.name, args)

            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            })

    return "Agent reached maximum iterations without a final answer."
```
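One failure mode the loop above does not handle: the model occasionally emits tool arguments that are not valid JSON, and `json.loads` will raise. A sketch of the inner loop that feeds the parse error back to the model instead of crashing (an illustrative variant, not part of the original loop):

```python
for tool_call in message.tool_calls:
    try:
        args = json.loads(tool_call.function.arguments)
    except json.JSONDecodeError as exc:
        # Return the parse error as the tool result so the model can retry
        result = json.dumps({"error": f"Malformed arguments: {exc}"})
    else:
        result = dispatch_function(tool_call.function.name, args)
    messages.append({
        "role": "tool",
        "tool_call_id": tool_call.id,
        "content": result,
    })
```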
return "Agent reached maximum iterations without a final answer."Run It
```python
# Simple question — GPT-4 will call list_tables, then get_schema, then run_query
answer = run_agent("What were total sales by region last quarter?")
print(answer)

# Complex question — multiple function calls
answer = run_agent(
    "Compare the top 5 customers by revenue this year vs last year. "
    "Show the percentage change."
)
print(answer)
```

A typical execution looks like this:
```
User: What were total sales by region last quarter?

GPT-4 → list_tables()
  Result: ["raw_orders", "raw_customers", "curated_sales", "gold_revenue"]

GPT-4 → get_schema("curated_sales")
  Result: {"region": "VARCHAR", "amount": "DOUBLE", "sale_date": "DATE", ...}

GPT-4 → run_query("SELECT region, SUM(amount) as total FROM curated_sales
                    WHERE sale_date >= '2026-01-01'
                    GROUP BY region ORDER BY total DESC")
  Result: [{"region": "North America", "total": 1240000}, ...]

GPT-4 → "Total sales by region last quarter: North America led with $1.24M..."
```

Why This Beats RAG for Structured Data
| Aspect | RAG (Vector Store) | Function Calling + Lens |
|---|---|---|
| Accuracy | Approximate (retrieves similar chunks) | Exact (SQL on real data) |
| Aggregations | Cannot SUM/AVG/GROUP BY | Full SQL support |
| Setup | Embedding pipeline + vector DB | `pip install dataspoc-lens openai` |
| Cost | Embedding tokens + storage + retrieval | Only query tokens |
| Freshness | Re-embed on every data change | Always reads latest Parquet |
| Auditability | Opaque retrieval | Every query is SQL you can review |
RAG works for unstructured documents. For structured data in a lake, function calling with SQL is strictly better.
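The auditability row is easy to make concrete: because every data access is a SQL statement, you can log it before it runs. A minimal sketch; the `dispatch_with_audit` wrapper and log format are illustrative, not part of Lens:

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
audit_log = logging.getLogger("sql_audit")

def dispatch_with_audit(name: str, arguments: dict) -> str:
    """Log every SQL statement the agent issues before executing it."""
    if name == "run_query":
        audit_log.info("agent SQL: %s", arguments["sql"])
    return dispatch_function(name, arguments)
```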
Adding Guardrails
In production, you want to prevent runaway queries and enforce read-only access:
```python
import re

def dispatch_function_safe(name: str, arguments: dict) -> str:
    """Production-safe dispatcher with guardrails."""
    if name == "run_query":
        sql = arguments["sql"].strip()
        # Block writes. Match whole keywords so column names like
        # "created_at" don't trigger a false positive on CREATE.
        if re.search(r"\b(INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b", sql, re.IGNORECASE):
            return json.dumps({"error": "Write operations are not allowed."})
        # Add LIMIT if missing, to cap result size
        if not re.search(r"\bLIMIT\b", sql, re.IGNORECASE):
            arguments["sql"] = sql.rstrip(";") + " LIMIT 1000"

    return dispatch_function(name, arguments)
```
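The LIMIT guardrail caps result size, but it won't stop a single expensive scan from running for minutes. A minimal sketch of a wall-clock timeout, assuming `lens.query` is safe to call from a worker thread; the function name and 30-second cutoff are illustrative, not part of Lens:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as QueryTimeout

QUERY_TIMEOUT_S = 30  # illustrative cutoff; tune for your workloads
_executor = ThreadPoolExecutor(max_workers=4)

def run_query_with_timeout(sql: str):
    """Submit lens.query to a worker thread and give up after a deadline."""
    future = _executor.submit(lens.query, sql)
    try:
        return future.result(timeout=QUERY_TIMEOUT_S)
    except QueryTimeout:
        # Caveat: this abandons the future; the underlying query may
        # keep running in the worker thread until it finishes.
        raise TimeoutError(f"Query exceeded {QUERY_TIMEOUT_S}s")
```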
Full Working Script
Here is the complete script you can copy and run:
```python
#!/usr/bin/env python3
"""Data lake agent using OpenAI function calling + DataSpoc Lens."""

import json
import re
import sys

from openai import OpenAI
from dataspoc_lens import LensClient

client = OpenAI()
lens = LensClient()

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "list_tables",
            "description": "List all available tables in the data lake.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_schema",
            "description": "Get column names and types for a table.",
            "parameters": {
                "type": "object",
                "properties": {
                    "table": {"type": "string", "description": "Table name."}
                },
                "required": ["table"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_query",
            "description": "Execute a SQL query against the data lake.",
            "parameters": {
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "DuckDB SQL query."}
                },
                "required": ["sql"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "ask_question",
            "description": "Ask a natural language question about the data.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {"type": "string", "description": "Question about the data."}
                },
                "required": ["question"],
            },
        },
    },
]


def dispatch(name: str, args: dict) -> str:
    if name == "list_tables":
        return json.dumps({"tables": lens.tables()})
    elif name == "get_schema":
        return json.dumps({"schema": lens.schema(args["table"])})
    elif name == "run_query":
        sql = args["sql"].strip()
        # Whole-word match so column names like "created_at" don't trip the filter
        if re.search(r"\b(INSERT|UPDATE|DELETE|DROP)\b", sql, re.IGNORECASE):
            return json.dumps({"error": "Write operations blocked."})
        df = lens.query(sql)
        return json.dumps({
            "columns": list(df.columns),
            "rows": df.head(50).to_dict(orient="records"),
            "total_rows": len(df),
        })
    elif name == "ask_question":
        return json.dumps({"answer": str(lens.ask(args["question"]))})
    return json.dumps({"error": f"Unknown function: {name}"})


def agent(question: str) -> str:
    messages = [
        {"role": "system", "content": "You are a data analyst. Use the tools to explore and query the data lake."},
        {"role": "user", "content": question},
    ]
    for _ in range(10):
        resp = client.chat.completions.create(
            model="gpt-4o", messages=messages, tools=TOOLS, tool_choice="auto"
        )
        msg = resp.choices[0].message
        messages.append(msg)
        if not msg.tool_calls:
            return msg.content
        for tc in msg.tool_calls:
            result = dispatch(tc.function.name, json.loads(tc.function.arguments))
            messages.append({"role": "tool", "tool_call_id": tc.id, "content": result})
    return "Max iterations reached."


if __name__ == "__main__":
    question = " ".join(sys.argv[1:]) or "What tables are available and what data do they contain?"
    print(agent(question))
```

Run it:
```bash
export OPENAI_API_KEY="sk-..."
python data_lake_agent.py "What were the top 10 products by revenue last month?"
```

Function calling turns GPT-4 from a text generator into a data analyst that writes and executes real SQL against your actual data. No hallucinated numbers. No stale embeddings. Just precise answers from your lake.