Building a Data Lake Agent with OpenAI Function Calling and DataSpoc
You have a data lake full of Parquet files. You want GPT-4 to answer business questions about your data. The typical approach is RAG: embed everything, load it into a vector store, and hope retrieval surfaces the right rows. There is a better way.
OpenAI function calling lets GPT-4 invoke your code with structured arguments. DataSpoc Lens gives you a SQL engine over Parquet in the cloud. Combine them and GPT-4 can write real SQL, run it against your actual data, and return precise answers.
The architecture
```
User Question
  ↓
GPT-4 (decides which function to call)
  ↓
Function: list_tables / get_schema / run_query / ask_question
  ↓
DataSpoc Lens (DuckDB over Parquet in S3/GCS/Azure)
  ↓
Results back to GPT-4
  ↓
Natural language answer
```

No embeddings. No vector store. No chunking. Just SQL.
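The middle of that diagram is plain JSON flowing through the Chat Completions API. A minimal sketch of the message shapes involved (the table name, call id, and argument values here are illustrative, not from Lens):

```python
import json

# Assistant turn: instead of answering, the model asks to call a tool.
assistant_msg = {
    "role": "assistant",
    "content": None,
    "tool_calls": [{
        "id": "call_1",
        "type": "function",
        "function": {
            "name": "run_query",
            # The arguments field is always a JSON-encoded string.
            "arguments": json.dumps({"sql": "SELECT COUNT(*) FROM curated_sales"}),
        },
    }],
}

# Your code decodes the arguments, executes the call, and replies with a
# "tool" message that echoes the id so the model can match result to request.
tool_call = assistant_msg["tool_calls"][0]
args = json.loads(tool_call["function"]["arguments"])
tool_msg = {
    "role": "tool",
    "tool_call_id": tool_call["id"],
    "content": json.dumps({"rows": [{"count": 12345}]}),
}

print(args["sql"])               # SELECT COUNT(*) FROM curated_sales
print(tool_msg["tool_call_id"])  # call_1
```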
Install dependencies
```bash
pip install dataspoc-lens openai
```

Define the tools
Each tool wraps a LensClient method. The OpenAI API needs a JSON schema for each function:
```python
import json

from openai import OpenAI
from dataspoc_lens import LensClient

client = OpenAI()    # uses OPENAI_API_KEY env var
lens = LensClient()  # connects to your configured bucket

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "list_tables",
            "description": "List all available tables in the data lake.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_schema",
            "description": "Get the schema (column names, types) for a specific table.",
            "parameters": {
                "type": "object",
                "properties": {
                    "table": {
                        "type": "string",
                        "description": "The table name to get the schema for.",
                    }
                },
                "required": ["table"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_query",
            "description": "Execute a SQL query against the data lake and return results.",
            "parameters": {
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "The SQL query to execute. Use DuckDB SQL syntax.",
                    }
                },
                "required": ["sql"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "ask_question",
            "description": "Ask a natural language question about the data. Returns SQL + results.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "A natural language question about the data.",
                    }
                },
                "required": ["question"],
            },
        },
    },
]
```

Implement the function dispatcher
```python
def dispatch_function(name: str, arguments: dict) -> str:
    """Route function calls to LensClient methods."""
    if name == "list_tables":
        tables = lens.tables()
        return json.dumps({"tables": tables})

    elif name == "get_schema":
        schema = lens.schema(arguments["table"])
        return json.dumps({"schema": schema})

    elif name == "run_query":
        result = lens.query(arguments["sql"])
        # Convert DataFrame to dict for JSON serialization
        return json.dumps({
            "columns": list(result.columns),
            "rows": result.head(50).to_dict(orient="records"),
            "total_rows": len(result),
        })

    elif name == "ask_question":
        answer = lens.ask(arguments["question"])
        return json.dumps({"answer": str(answer)})

    return json.dumps({"error": f"Unknown function: {name}"})
```

The conversation loop
This is the agent's main loop. GPT-4 decides when to call functions and when to answer the user:
```python
def run_agent(user_question: str) -> str:
    """Run a full agent conversation until GPT-4 produces a final answer."""
    messages = [
        {
            "role": "system",
            "content": (
                "You are a data analyst assistant. You have access to a data lake "
                "with multiple tables. Use the available functions to explore the "
                "data and answer the user's question accurately. Always verify "
                "table names and schemas before writing SQL."
            ),
        },
        {"role": "user", "content": user_question},
    ]

    # Loop until GPT-4 gives a text response (not a function call)
    for _ in range(10):  # max 10 iterations to prevent infinite loops
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=TOOLS,
            tool_choice="auto",
        )

        message = response.choices[0].message
        messages.append(message)

        # If no tool calls, we have the final answer
        if not message.tool_calls:
            return message.content

        # Process each tool call
        for tool_call in message.tool_calls:
            args = json.loads(tool_call.function.arguments)
            result = dispatch_function(tool_call.function.name, args)

            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            })

    return "Agent reached maximum iterations without a final answer."
```

Run it
```python
# Simple question — GPT-4 will call list_tables, then get_schema, then run_query
answer = run_agent("What were total sales by region last quarter?")
print(answer)

# Complex question — multiple function calls
answer = run_agent(
    "Compare the top 5 customers by revenue this year vs last year. "
    "Show the percentage change."
)
print(answer)
```

A typical run looks like this:
```
User: What were total sales by region last quarter?

GPT-4 → list_tables()
  Result: ["raw_orders", "raw_customers", "curated_sales", "gold_revenue"]

GPT-4 → get_schema("curated_sales")
  Result: {"region": "VARCHAR", "amount": "DOUBLE", "sale_date": "DATE", ...}

GPT-4 → run_query("SELECT region, SUM(amount) as total FROM curated_sales
          WHERE sale_date >= '2026-01-01' GROUP BY region ORDER BY total DESC")
  Result: [{"region": "North America", "total": 1240000}, ...]

GPT-4 → "Total sales by region last quarter: North America led with $1.24M..."
```

Why this beats RAG for structured data
| Aspect | RAG (Vector Store) | Function Calling + Lens |
|---|---|---|
| Accuracy | Approximate (retrieves similar chunks) | Exact (SQL over real data) |
| Aggregations | Cannot SUM/AVG/GROUP BY | Full SQL support |
| Setup | Embedding pipeline + vector DB | `pip install dataspoc-lens openai` |
| Cost | Embedding tokens + storage + retrieval | Query tokens only |
| Freshness | Re-embed on every data change | Always reads the latest Parquet |
| Auditability | Opaque retrieval | Every query is SQL you can review |
RAG works for unstructured documents. For structured data in a lake, function calling with SQL is strictly better.
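The "Aggregations" row is the decisive one: a retrieval step can only surface rows that happen to look similar to the question, while an aggregate needs every row. A stand-in sketch using stdlib sqlite3 (Lens runs DuckDB, but the point is identical; the table and values are made up for illustration):

```python
import sqlite3

# Tiny in-memory table standing in for curated_sales
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (region TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [("NA", 100.0), ("NA", 250.0), ("EU", 80.0), ("EU", 40.0)],
)

# SQL scans every row, so the aggregate is exact. A top-k retrieval of
# "similar" rows could never guarantee that.
rows = conn.execute(
    "SELECT region, SUM(amount) FROM sales GROUP BY region ORDER BY 2 DESC"
).fetchall()
print(rows)  # [('NA', 350.0), ('EU', 120.0)]
```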
Add guardrails
In production, you want to prevent runaway queries and enforce read-only access:
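One caveat when blocking write keywords: naive substring checks false-positive on identifiers such as a `created_at` column (it contains "CREATE"). A standalone predicate with word-boundary matching avoids that and is easy to unit-test. A minimal sketch; `is_read_only` is a helper name introduced here, not part of Lens:

```python
import re

# Match write keywords only as whole words, so identifiers
# like "created_at" or "updated_by" pass through.
WRITE_KEYWORDS = re.compile(
    r"\b(INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|TRUNCATE)\b",
    re.IGNORECASE,
)

def is_read_only(sql: str) -> bool:
    """Return True if the statement contains no write keywords."""
    return WRITE_KEYWORDS.search(sql) is None

print(is_read_only("SELECT created_at FROM curated_sales"))  # True
print(is_read_only("DROP TABLE curated_sales"))              # False
```

Keyword filtering is still a heuristic; for stronger guarantees, connect with read-only credentials so the engine itself rejects writes.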
```python
def dispatch_function_safe(name: str, arguments: dict) -> str:
    """Production-safe dispatcher with guardrails."""
    if name == "run_query":
        sql = arguments["sql"].strip().upper()
        # Block writes
        if any(keyword in sql for keyword in
               ["INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER"]):
            return json.dumps({"error": "Write operations are not allowed."})
        # Add LIMIT if missing
        if "LIMIT" not in sql:
            arguments["sql"] = arguments["sql"].rstrip(";") + " LIMIT 1000"

    return dispatch_function(name, arguments)
```

Full working script
Here is the complete script you can copy and run:
```python
#!/usr/bin/env python3
"""Data lake agent using OpenAI function calling + DataSpoc Lens."""

import json

from openai import OpenAI
from dataspoc_lens import LensClient

client = OpenAI()
lens = LensClient()

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "list_tables",
            "description": "List all available tables in the data lake.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_schema",
            "description": "Get column names and types for a table.",
            "parameters": {
                "type": "object",
                "properties": {
                    "table": {"type": "string", "description": "Table name."}
                },
                "required": ["table"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_query",
            "description": "Execute a SQL query against the data lake.",
            "parameters": {
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "DuckDB SQL query."}
                },
                "required": ["sql"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "ask_question",
            "description": "Ask a natural language question about the data.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {"type": "string", "description": "Question about the data."}
                },
                "required": ["question"],
            },
        },
    },
]


def dispatch(name: str, args: dict) -> str:
    if name == "list_tables":
        return json.dumps({"tables": lens.tables()})
    elif name == "get_schema":
        return json.dumps({"schema": lens.schema(args["table"])})
    elif name == "run_query":
        sql = args["sql"].strip()
        if any(k in sql.upper() for k in ["INSERT", "UPDATE", "DELETE", "DROP"]):
            return json.dumps({"error": "Write operations blocked."})
        df = lens.query(sql)
        return json.dumps({
            "columns": list(df.columns),
            "rows": df.head(50).to_dict(orient="records"),
            "total_rows": len(df),
        })
    elif name == "ask_question":
        return json.dumps({"answer": str(lens.ask(args["question"]))})
    return json.dumps({"error": f"Unknown function: {name}"})


def agent(question: str) -> str:
    messages = [
        {"role": "system", "content": "You are a data analyst. Use the tools to explore and query the data lake."},
        {"role": "user", "content": question},
    ]
    for _ in range(10):
        resp = client.chat.completions.create(
            model="gpt-4o", messages=messages, tools=TOOLS, tool_choice="auto"
        )
        msg = resp.choices[0].message
        messages.append(msg)
        if not msg.tool_calls:
            return msg.content
        for tc in msg.tool_calls:
            result = dispatch(tc.function.name, json.loads(tc.function.arguments))
            messages.append({"role": "tool", "tool_call_id": tc.id, "content": result})
    return "Max iterations reached."


if __name__ == "__main__":
    import sys

    question = " ".join(sys.argv[1:]) or "What tables are available and what data do they contain?"
    print(agent(question))
```

Run it:
```bash
export OPENAI_API_KEY="sk-..."
python data_lake_agent.py "What were the top 10 products by revenue last month?"
```

Function calling turns GPT-4 from a text generator into a data analyst that writes and runs real SQL against your actual data. No hallucinated numbers. No stale embeddings. Just precise answers straight from your lake.