Tags: openai · ai-agents · python · data-lake · function-calling

Building a Data Lake Agent with OpenAI Function Calling and DataSpoc

Michael San Martim · 2026-04-15

You have a data lake full of Parquet files. You want GPT-4 to answer business questions about your data. The typical approach is RAG: embed everything, load it into a vector store, and hope retrieval surfaces the right rows. There is a better way.

OpenAI function calling lets GPT-4 invoke your code with structured arguments. DataSpoc Lens gives you a SQL engine over Parquet in the cloud. Combine the two and GPT-4 can write real SQL, run it against your actual data, and return precise answers.

The architecture

User question
    ↓
GPT-4 (decides which function to call)
    ↓
Function: list_tables / get_schema / run_query / ask_question
    ↓
DataSpoc Lens (DuckDB over Parquet in S3/GCS/Azure)
    ↓
Results back to GPT-4
    ↓
Natural-language answer

No embeddings. No vector store. No chunking. Just SQL.

Install the dependencies

pip install dataspoc-lens openai

Define the tools

Each tool wraps a LensClient method. The OpenAI API needs a JSON Schema definition for each function:

import json

from openai import OpenAI
from dataspoc_lens import LensClient

client = OpenAI()  # uses OPENAI_API_KEY env var
lens = LensClient()  # connects to your configured bucket

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "list_tables",
            "description": "List all available tables in the data lake.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_schema",
            "description": "Get the schema (column names, types) for a specific table.",
            "parameters": {
                "type": "object",
                "properties": {
                    "table": {
                        "type": "string",
                        "description": "The table name to get the schema for.",
                    }
                },
                "required": ["table"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_query",
            "description": "Execute a SQL query against the data lake and return results.",
            "parameters": {
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "The SQL query to execute. Use DuckDB SQL syntax.",
                    }
                },
                "required": ["sql"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "ask_question",
            "description": "Ask a natural language question about the data. Returns SQL + results.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "A natural language question about the data.",
                    }
                },
                "required": ["question"],
            },
        },
    },
]

Implement the function dispatcher

def dispatch_function(name: str, arguments: dict) -> str:
    """Route function calls to LensClient methods."""
    if name == "list_tables":
        tables = lens.tables()
        return json.dumps({"tables": tables})
    elif name == "get_schema":
        schema = lens.schema(arguments["table"])
        return json.dumps({"schema": schema})
    elif name == "run_query":
        result = lens.query(arguments["sql"])
        # Convert DataFrame to dict for JSON serialization
        return json.dumps({
            "columns": list(result.columns),
            "rows": result.head(50).to_dict(orient="records"),
            "total_rows": len(result),
        })
    elif name == "ask_question":
        answer = lens.ask(arguments["question"])
        return json.dumps({"answer": str(answer)})
    return json.dumps({"error": f"Unknown function: {name}"})
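
Since the dispatcher reaches LensClient through only a handful of methods, you can sanity-check the routing without a bucket. A minimal sketch, assuming a hypothetical FakeLens test double and a copy of the dispatcher trimmed to the two stubbed paths:

```python
import json

class FakeLens:
    """Stand-in for LensClient (hypothetical test double, not part of the library)."""
    def tables(self):
        return ["raw_orders", "curated_sales"]

    def schema(self, table):
        return {"region": "VARCHAR", "amount": "DOUBLE"}

lens = FakeLens()  # takes the place of the real client for this test

def dispatch_function(name: str, arguments: dict) -> str:
    # Same routing logic as above, trimmed to the stubbed paths
    if name == "list_tables":
        return json.dumps({"tables": lens.tables()})
    elif name == "get_schema":
        return json.dumps({"schema": lens.schema(arguments["table"])})
    return json.dumps({"error": f"Unknown function: {name}"})

print(json.loads(dispatch_function("list_tables", {}))["tables"])
# → ['raw_orders', 'curated_sales']
print(json.loads(dispatch_function("no_such_tool", {}))["error"])
# → Unknown function: no_such_tool
```

Every dispatcher branch returns a JSON string, which is exactly the shape the tool-result messages in the loop below expect.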

The conversation loop

This is the agent's main loop. GPT-4 decides when to call functions and when to answer the user:

def run_agent(user_question: str) -> str:
    """Run a full agent conversation until GPT-4 produces a final answer."""
    messages = [
        {
            "role": "system",
            "content": (
                "You are a data analyst assistant. You have access to a data lake "
                "with multiple tables. Use the available functions to explore the "
                "data and answer the user's question accurately. Always verify "
                "table names and schemas before writing SQL."
            ),
        },
        {"role": "user", "content": user_question},
    ]
    # Loop until GPT-4 gives a text response (not a function call)
    for _ in range(10):  # max 10 iterations to prevent infinite loops
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=TOOLS,
            tool_choice="auto",
        )
        message = response.choices[0].message
        messages.append(message)
        # If no tool calls, we have the final answer
        if not message.tool_calls:
            return message.content
        # Process each tool call
        for tool_call in message.tool_calls:
            args = json.loads(tool_call.function.arguments)
            result = dispatch_function(tool_call.function.name, args)
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            })
    return "Agent reached maximum iterations without a final answer."

Running it

# Simple question — GPT-4 will call list_tables, then get_schema, then run_query
answer = run_agent("What were total sales by region last quarter?")
print(answer)

# Complex question — multiple function calls
answer = run_agent(
    "Compare the top 5 customers by revenue this year vs last year. "
    "Show the percentage change."
)
print(answer)

A typical run looks like this:

User: What were total sales by region last quarter?
GPT-4 → list_tables()
Result: ["raw_orders", "raw_customers", "curated_sales", "gold_revenue"]
GPT-4 → get_schema("curated_sales")
Result: {"region": "VARCHAR", "amount": "DOUBLE", "sale_date": "DATE", ...}
GPT-4 → run_query("SELECT region, SUM(amount) as total FROM curated_sales WHERE sale_date >= '2026-01-01' GROUP BY region ORDER BY total DESC")
Result: [{"region": "North America", "total": 1240000}, ...]
GPT-4 → "Total sales by region last quarter: North America led with $1.24M..."

Why this beats RAG for structured data

Aspect        | RAG (vector store)                      | Function calling + Lens
Precision     | Approximate (retrieves similar chunks)  | Exact (SQL over real data)
Aggregations  | Cannot SUM/AVG/GROUP BY                 | Full SQL support
Setup         | Embedding pipeline + vector DB          | pip install dataspoc-lens openai
Cost          | Embedding tokens + storage + retrieval  | Query tokens only
Freshness     | Re-embed on every data change           | Always reads the latest Parquet
Auditability  | Opaque retrieval                        | Every query is SQL you can review

RAG works for unstructured documents. For structured data in a lake, function calling with SQL is strictly superior.
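
The aggregations row in the table is worth making concrete. A toy sketch with synthetic rows (no Lens involved): top-k retrieval hands the model only a slice of the data, so any aggregate computed over that slice silently undercounts, while a full scan, which is what SQL does, is exact.

```python
# 1,000 synthetic sales rows; amounts cycle through 1..50.
rows = [{"region": "NA", "amount": i % 50 + 1} for i in range(1000)]

# SQL-style aggregation scans every row: the answer is exact.
exact_total = sum(r["amount"] for r in rows)
print(exact_total)  # → 25500

# RAG-style retrieval surfaces only the k "most similar" rows;
# an aggregate over that slice is silently wrong.
top_k = rows[:10]
rag_total = sum(r["amount"] for r in top_k)
print(rag_total)  # → 55, nowhere near the true total
```

The retrieval answer is not noisy, it is structurally wrong: no amount of prompt engineering recovers the rows the retriever never returned.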

Add guardrails

In production you will want to prevent runaway queries and enforce read-only access:

def dispatch_function_safe(name: str, arguments: dict) -> str:
    """Production-safe dispatcher with guardrails."""
    if name == "run_query":
        sql = arguments["sql"].strip().upper()
        # Block writes. Note: this is a naive substring check; it will also
        # reject queries that merely mention e.g. a column named CREATED_AT.
        if any(keyword in sql for keyword in ["INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER"]):
            return json.dumps({"error": "Write operations are not allowed."})
        # Add a LIMIT if the query has none, to cap result size
        if "LIMIT" not in sql:
            arguments["sql"] = arguments["sql"].rstrip(";") + " LIMIT 1000"
    return dispatch_function(name, arguments)
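
If you want to exercise the guardrail logic without a live LensClient, the checks can be factored into a pure helper. A sketch under that assumption (the guard_sql name is my own, not part of the post's code):

```python
WRITE_KEYWORDS = ["INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER"]

def guard_sql(sql: str) -> str:
    """Apply the same checks as dispatch_function_safe to a raw SQL string.

    Returns the (possibly rewritten) SQL, or raises ValueError for writes.
    Like the dispatcher above, this uses a naive substring check.
    """
    upper = sql.strip().upper()
    if any(kw in upper for kw in WRITE_KEYWORDS):
        raise ValueError("Write operations are not allowed.")
    if "LIMIT" not in upper:
        # Cap result size for queries that specify no limit
        return sql.rstrip(";") + " LIMIT 1000"
    return sql

print(guard_sql("SELECT region FROM curated_sales"))
# → SELECT region FROM curated_sales LIMIT 1000
try:
    guard_sql("DROP TABLE curated_sales")
except ValueError as e:
    print(e)  # → Write operations are not allowed.
```

A pure function like this is trivial to unit-test, and the dispatcher shrinks to calling it before handing the SQL to lens.query.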

Complete working script

Here is the full script you can copy and run:

#!/usr/bin/env python3
"""Data lake agent using OpenAI function calling + DataSpoc Lens."""
import json

from openai import OpenAI
from dataspoc_lens import LensClient

client = OpenAI()
lens = LensClient()

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "list_tables",
            "description": "List all available tables in the data lake.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_schema",
            "description": "Get column names and types for a table.",
            "parameters": {
                "type": "object",
                "properties": {
                    "table": {"type": "string", "description": "Table name."}
                },
                "required": ["table"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_query",
            "description": "Execute a SQL query against the data lake.",
            "parameters": {
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "DuckDB SQL query."}
                },
                "required": ["sql"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "ask_question",
            "description": "Ask a natural language question about the data.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {"type": "string", "description": "Question about the data."}
                },
                "required": ["question"],
            },
        },
    },
]

def dispatch(name: str, args: dict) -> str:
    if name == "list_tables":
        return json.dumps({"tables": lens.tables()})
    elif name == "get_schema":
        return json.dumps({"schema": lens.schema(args["table"])})
    elif name == "run_query":
        sql = args["sql"].strip()
        if any(k in sql.upper() for k in ["INSERT", "UPDATE", "DELETE", "DROP"]):
            return json.dumps({"error": "Write operations blocked."})
        df = lens.query(sql)
        return json.dumps({
            "columns": list(df.columns),
            "rows": df.head(50).to_dict(orient="records"),
            "total_rows": len(df),
        })
    elif name == "ask_question":
        return json.dumps({"answer": str(lens.ask(args["question"]))})
    return json.dumps({"error": f"Unknown function: {name}"})

def agent(question: str) -> str:
    messages = [
        {"role": "system", "content": "You are a data analyst. Use the tools to explore and query the data lake."},
        {"role": "user", "content": question},
    ]
    for _ in range(10):
        resp = client.chat.completions.create(
            model="gpt-4o", messages=messages, tools=TOOLS, tool_choice="auto"
        )
        msg = resp.choices[0].message
        messages.append(msg)
        if not msg.tool_calls:
            return msg.content
        for tc in msg.tool_calls:
            result = dispatch(tc.function.name, json.loads(tc.function.arguments))
            messages.append({"role": "tool", "tool_call_id": tc.id, "content": result})
    return "Max iterations reached."

if __name__ == "__main__":
    import sys
    question = " ".join(sys.argv[1:]) or "What tables are available and what data do they contain?"
    print(agent(question))

Run it:

export OPENAI_API_KEY="sk-..."
python data_lake_agent.py "What were the top 10 products by revenue last month?"

Function calling turns GPT-4 from a text generator into a data analyst that writes and runs real SQL against your real data. No hallucinated numbers. No stale embeddings. Just precise answers from your lake.
