# Building a Data Lake Agent with OpenAI Function Calling and DataSpoc
You have a data lake full of Parquet files. You want GPT-4 to answer business questions about your data. The typical approach is RAG: embed everything, load it into a vector store, and hope retrieval surfaces the right rows. There is a better way.
OpenAI's function calling lets GPT-4 invoke your code with structured arguments. DataSpoc Lens provides a SQL engine over Parquet in cloud storage. Combine the two and GPT-4 can write real SQL, run it against your real data, and return precise answers.
## The Architecture
```
User Question
  ↓
GPT-4 (decides which function to call)
  ↓
Function: list_tables / get_schema / run_query / ask_question
  ↓
DataSpoc Lens (DuckDB over Parquet in S3/GCS/Azure)
  ↓
Results back to GPT-4
  ↓
Natural language answer
```

No embeddings. No vector store. No chunking. Just SQL.
## Install Dependencies
```bash
pip install dataspoc-lens openai
```

## Define the Tools
Each tool wraps a `LensClient` method. The OpenAI API needs a JSON schema for each function:
```python
import json

from openai import OpenAI
from dataspoc_lens import LensClient

client = OpenAI()    # uses OPENAI_API_KEY env var
lens = LensClient()  # connects to your configured bucket

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "list_tables",
            "description": "List all available tables in the data lake.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_schema",
            "description": "Get the schema (column names, types) for a specific table.",
            "parameters": {
                "type": "object",
                "properties": {
                    "table": {
                        "type": "string",
                        "description": "The table name to get the schema for.",
                    }
                },
                "required": ["table"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_query",
            "description": "Execute a SQL query against the data lake and return results.",
            "parameters": {
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "The SQL query to execute. Use DuckDB SQL syntax.",
                    }
                },
                "required": ["sql"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "ask_question",
            "description": "Ask a natural language question about the data. Returns SQL + results.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "A natural language question about the data.",
                    }
                },
                "required": ["question"],
            },
        },
    },
]
```

## Implement the Function Dispatcher
```python
def dispatch_function(name: str, arguments: dict) -> str:
    """Route function calls to LensClient methods."""
    if name == "list_tables":
        tables = lens.tables()
        return json.dumps({"tables": tables})

    elif name == "get_schema":
        schema = lens.schema(arguments["table"])
        return json.dumps({"schema": schema})

    elif name == "run_query":
        result = lens.query(arguments["sql"])
        # Convert DataFrame to dict for JSON serialization; default=str
        # handles dates and other non-JSON-native values in the rows
        return json.dumps({
            "columns": list(result.columns),
            "rows": result.head(50).to_dict(orient="records"),
            "total_rows": len(result),
        }, default=str)

    elif name == "ask_question":
        answer = lens.ask(arguments["question"])
        return json.dumps({"answer": str(answer)})

    return json.dumps({"error": f"Unknown function: {name}"})
```

## The Conversation Loop
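One caveat before the loop: if a tool call raises (malformed SQL, an unknown table), the exception escapes the dispatcher and kills the whole conversation. Below is a minimal sketch of a wrapper that converts exceptions into a JSON error payload the model can see and recover from; `safe_call` is an illustrative name, not part of either API:

```python
import json

def safe_call(dispatch, name: str, arguments: dict) -> str:
    """Invoke a tool dispatcher; report failures as JSON instead of raising."""
    try:
        return dispatch(name, arguments)
    except Exception as exc:
        # The model sees the error text, so it can correct its SQL and retry
        return json.dumps({"error": f"{type(exc).__name__}: {exc}"})
```

In the loop, `result = safe_call(dispatch_function, tool_call.function.name, args)` would replace the direct dispatcher call.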
This is the agent's main loop. GPT-4 decides when to call functions and when to answer the user:
```python
def run_agent(user_question: str) -> str:
    """Run a full agent conversation until GPT-4 produces a final answer."""
    messages = [
        {
            "role": "system",
            "content": (
                "You are a data analyst assistant. You have access to a data lake "
                "with multiple tables. Use the available functions to explore the "
                "data and answer the user's question accurately. Always verify "
                "table names and schemas before writing SQL."
            ),
        },
        {"role": "user", "content": user_question},
    ]

    # Loop until GPT-4 gives a text response (not a function call)
    for _ in range(10):  # max 10 iterations to prevent infinite loops
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=TOOLS,
            tool_choice="auto",
        )

        message = response.choices[0].message
        messages.append(message)

        # If no tool calls, we have the final answer
        if not message.tool_calls:
            return message.content

        # Process each tool call
        for tool_call in message.tool_calls:
            args = json.loads(tool_call.function.arguments)
            result = dispatch_function(tool_call.function.name, args)

            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            })

    return "Agent reached maximum iterations without a final answer."
```

## Run It
```python
# Simple question — GPT-4 will call list_tables, then get_schema, then run_query
answer = run_agent("What were total sales by region last quarter?")
print(answer)

# Complex question — multiple function calls
answer = run_agent(
    "Compare the top 5 customers by revenue this year vs last year. "
    "Show the percentage change."
)
print(answer)
```

A typical run looks like this:
```
User: What were total sales by region last quarter?

GPT-4 → list_tables()
        Result: ["raw_orders", "raw_customers", "curated_sales", "gold_revenue"]

GPT-4 → get_schema("curated_sales")
        Result: {"region": "VARCHAR", "amount": "DOUBLE", "sale_date": "DATE", ...}

GPT-4 → run_query("SELECT region, SUM(amount) as total FROM curated_sales
                   WHERE sale_date >= '2026-01-01' GROUP BY region
                   ORDER BY total DESC")
        Result: [{"region": "North America", "total": 1240000}, ...]

GPT-4 → "Total sales by region last quarter: North America led with $1.24M..."
```

## Why This Beats RAG for Structured Data
| Aspect | RAG (Vector Store) | Function Calling + Lens |
|---|---|---|
| Accuracy | Approximate (retrieves similar chunks) | Exact (SQL over real data) |
| Aggregations | Cannot SUM/AVG/GROUP BY | Full SQL support |
| Setup | Embedding pipeline + vector DB | `pip install dataspoc-lens openai` |
| Cost | Embedding tokens + storage + retrieval | Query tokens only |
| Freshness | Re-embed on every data change | Always reads the latest Parquet |
| Auditability | Opaque retrieval | Every query is SQL you can review |
RAG works for unstructured documents. For structured data in a lake, function calling with SQL is strictly better.
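The aggregation row in the table is easy to demonstrate with any SQL engine. A quick sketch, using the stdlib `sqlite3` module as a stand-in for the DuckDB engine Lens runs: the GROUP BY answer is exact, not a similarity-ranked approximation.

```python
import sqlite3

# Exact GROUP BY aggregation over structured rows; chunk retrieval
# can only approximate this kind of answer
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE sales (region TEXT, amount REAL)")
con.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [("North America", 100.0), ("North America", 240.0), ("EMEA", 80.0)],
)
rows = con.execute(
    "SELECT region, SUM(amount) FROM sales GROUP BY region ORDER BY 2 DESC"
).fetchall()
print(rows)  # [('North America', 340.0), ('EMEA', 80.0)]
```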
## Adding Guardrails
In production, you want to prevent runaway queries and guarantee read-only access:
```python
import re

WRITE_KEYWORDS = re.compile(
    r"\b(INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b", re.IGNORECASE
)

def dispatch_function_safe(name: str, arguments: dict) -> str:
    """Production-safe dispatcher with guardrails."""
    if name == "run_query":
        sql = arguments["sql"].strip()
        # Block writes (word-boundary match, so column names like
        # "created_at" or "updated_at" are not false positives)
        if WRITE_KEYWORDS.search(sql):
            return json.dumps({"error": "Write operations are not allowed."})
        # Add LIMIT if missing
        if not re.search(r"\bLIMIT\b", sql, re.IGNORECASE):
            arguments["sql"] = sql.rstrip(";") + " LIMIT 1000"

    return dispatch_function(name, arguments)
```

## Full Working Script
Here is the complete script you can copy and run:
```python
#!/usr/bin/env python3
"""Data lake agent using OpenAI function calling + DataSpoc Lens."""

import json
import re
import sys

from openai import OpenAI
from dataspoc_lens import LensClient

client = OpenAI()
lens = LensClient()

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "list_tables",
            "description": "List all available tables in the data lake.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_schema",
            "description": "Get column names and types for a table.",
            "parameters": {
                "type": "object",
                "properties": {
                    "table": {"type": "string", "description": "Table name."}
                },
                "required": ["table"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_query",
            "description": "Execute a SQL query against the data lake.",
            "parameters": {
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "DuckDB SQL query."}
                },
                "required": ["sql"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "ask_question",
            "description": "Ask a natural language question about the data.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {"type": "string", "description": "Question about the data."}
                },
                "required": ["question"],
            },
        },
    },
]


def dispatch(name: str, args: dict) -> str:
    if name == "list_tables":
        return json.dumps({"tables": lens.tables()})
    elif name == "get_schema":
        return json.dumps({"schema": lens.schema(args["table"])})
    elif name == "run_query":
        sql = args["sql"].strip()
        # Word-boundary match avoids false positives on names like "created_at"
        if re.search(r"\b(INSERT|UPDATE|DELETE|DROP)\b", sql, re.IGNORECASE):
            return json.dumps({"error": "Write operations blocked."})
        df = lens.query(sql)
        # default=str handles dates and other non-JSON-native values
        return json.dumps({
            "columns": list(df.columns),
            "rows": df.head(50).to_dict(orient="records"),
            "total_rows": len(df),
        }, default=str)
    elif name == "ask_question":
        return json.dumps({"answer": str(lens.ask(args["question"]))})
    return json.dumps({"error": f"Unknown function: {name}"})


def agent(question: str) -> str:
    messages = [
        {"role": "system", "content": "You are a data analyst. Use the tools to explore and query the data lake."},
        {"role": "user", "content": question},
    ]
    for _ in range(10):
        resp = client.chat.completions.create(
            model="gpt-4o", messages=messages, tools=TOOLS, tool_choice="auto"
        )
        msg = resp.choices[0].message
        messages.append(msg)
        if not msg.tool_calls:
            return msg.content
        for tc in msg.tool_calls:
            result = dispatch(tc.function.name, json.loads(tc.function.arguments))
            messages.append({"role": "tool", "tool_call_id": tc.id, "content": result})
    return "Max iterations reached."


if __name__ == "__main__":
    question = " ".join(sys.argv[1:]) or "What tables are available and what data do they contain?"
    print(agent(question))
```

Run it:
```bash
export OPENAI_API_KEY="sk-..."
python data_lake_agent.py "What were the top 10 products by revenue last month?"
```

Function calling turns GPT-4 from a text generator into a data analyst that writes and runs real SQL against your real data. No hallucinated numbers. No stale embeddings. Just precise answers from your lake.