# Agent Integration
AI agents connected to DataSpoc Lens can explore your data lake, run SQL queries, ask natural language questions, and manage the local cache, all autonomously. This guide covers every integration path: MCP, the Python SDK, multi-framework examples, and combining Lens with Pipe for end-to-end data workflows.
## MCP Server Quick Setup

### Install

```bash
pip install dataspoc-lens[mcp]
```

### Start the server

```bash
dataspoc-lens mcp
```

The server uses stdio transport and is designed to be launched by an MCP client, not run manually.
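Because the client launches the server itself, a common failure mode is a `command` entry that is not on the client's PATH. A small sketch of a hypothetical helper (not part of the Lens SDK) that resolves the entry point to an absolute path before you write it into a client config:

```python
import shutil

def resolve_command(name: str) -> str:
    """Return the absolute path of a CLI entry point, or raise if it
    is not on PATH (e.g. installed only inside a virtualenv)."""
    path = shutil.which(name)
    if path is None:
        raise FileNotFoundError(
            f"{name} not found on PATH -- use the full path to the executable"
        )
    return path
```

If this raises, point the client config at the full path of the executable instead (see the virtual environment example below).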
### Claude Desktop

Add to `claude_desktop_config.json`:

- macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
- Linux: `~/.config/Claude/claude_desktop_config.json`

```json
{
  "mcpServers": {
    "dataspoc-lens": {
      "command": "dataspoc-lens",
      "args": ["mcp"],
      "env": { "DATASPOC_BUCKET": "s3://my-data" }
    }
  }
}
```

If installed in a virtual environment, use the full path:

```json
{
  "mcpServers": {
    "dataspoc-lens": {
      "command": "/home/you/.venv/bin/dataspoc-lens",
      "args": ["mcp"]
    }
  }
}
```

Restart Claude Desktop after saving.
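Editing the JSON by hand is error-prone when other servers are already registered. A sketch of a hypothetical helper (not part of the Lens SDK) that merges the entry while preserving any existing `mcpServers`:

```python
import json
import pathlib

def add_lens_server(config_path: str, command: str = "dataspoc-lens") -> dict:
    """Insert (or update) the dataspoc-lens entry in a Claude Desktop
    config file, keeping any other registered servers intact."""
    path = pathlib.Path(config_path)
    config = json.loads(path.read_text()) if path.exists() else {}
    servers = config.setdefault("mcpServers", {})
    servers["dataspoc-lens"] = {"command": command, "args": ["mcp"]}
    path.write_text(json.dumps(config, indent=2))
    return config
```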
### Cursor

Add to your Cursor MCP settings (Settings > MCP Servers):

```json
{
  "mcpServers": {
    "dataspoc-lens": {
      "command": "dataspoc-lens",
      "args": ["mcp"]
    }
  }
}
```

### Claude Code

Add to `~/.claude.json`:

```json
{
  "mcpServers": {
    "dataspoc-lens": {
      "command": "dataspoc-lens",
      "args": ["mcp"]
    }
  }
}
```

Or add it per-project in `.claude/settings.json`:

```json
{
  "mcpServers": {
    "dataspoc-lens": {
      "command": "dataspoc-lens",
      "args": ["mcp"]
    }
  }
}
```

## MCP Tools Reference
The Lens MCP server exposes seven tools:
| Tool | Description | Parameters |
|---|---|---|
| `list_tables` | List all tables in the data lake | None |
| `describe_table` | Show schema (columns and types) for a table | `table` (string) |
| `query` | Execute a read-only SQL query | `sql` (string) |
| `ask` | Ask a natural language question about the data | `question` (string) |
| `cache_status` | Show status of all cached tables | None |
| `cache_refresh` | Force-refresh cache for a specific table | `table` (string) |
| `cache_refresh_stale` | Refresh all stale cached tables | None |
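For frameworks not covered later in this guide, the same seven tools can be wired up from a plain dispatch map. A minimal sketch, assuming only the `LensClient` interface shown in the Python SDK section (a stub client stands in here):

```python
def build_tool_map(client) -> dict:
    """Map MCP tool names to callables on a LensClient-like object.
    The keys and parameters mirror the table above."""
    return {
        "list_tables": lambda: "\n".join(client.tables()),
        "describe_table": lambda table: client.schema(table),
        "query": lambda sql: client.query(sql),
        "ask": lambda question: client.ask(question),
        "cache_status": lambda: client.cache_status(),
        "cache_refresh": lambda table: client.cache_refresh(table),
        "cache_refresh_stale": lambda: client.cache_refresh_stale(),
    }
```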
### `list_tables`

List all available tables in the data lake.

Parameters: None

Returns: One table name per line, or "No tables found."

Example call:

```
list_tables()
```

Example response:

```
customers
orders
products
```

### `describe_table`
Show the schema of a table.

Parameters: `table` (string, required) — table name

Returns: JSON array of `{"column_name": ..., "data_type": ...}` objects.

Example call:

```
describe_table(table="orders")
```

Example response:

```json
[
  {"column_name": "order_id", "data_type": "INTEGER"},
  {"column_name": "customer_id", "data_type": "INTEGER"},
  {"column_name": "order_date", "data_type": "DATE"},
  {"column_name": "total", "data_type": "DOUBLE"},
  {"column_name": "status", "data_type": "VARCHAR"}
]
```

### `query`

Execute a read-only SQL query. Write operations (INSERT, UPDATE, DELETE, DROP, ALTER, TRUNCATE, CREATE, REPLACE) are rejected.
Parameters: `sql` (string, required) — SQL query

Returns: JSON with `columns`, `rows`, `row_count`, and `duration`.

Example call:

```
query(sql="SELECT status, COUNT(*) as cnt FROM orders GROUP BY status ORDER BY cnt DESC")
```

Example response:

```json
{
  "columns": ["status", "cnt"],
  "rows": [
    ["completed", 32100],
    ["pending", 8450],
    ["cancelled", 2130]
  ],
  "row_count": 3,
  "duration": 0.089
}
```

Rejected write example:

```
query(sql="DROP TABLE orders")
```

Response:

```json
{
  "error": "Write operations are not allowed. Only SELECT, SHOW, DESCRIBE, and EXPLAIN queries are permitted."
}
```

### `ask`

Translate a natural language question to SQL and execute it. Uses AI to generate the SQL based on the available table schemas.
Parameters: `question` (string, required) — natural language question

Returns: JSON with `sql`, `columns`, `rows`, `duration`, and `error`.

Example call:

```
ask(question="What are the top 5 customers by total spending?")
```

Example response:

```json
{
  "sql": "SELECT c.name, SUM(o.total) as total_spent FROM customers c JOIN orders o ON c.id = o.customer_id GROUP BY c.name ORDER BY total_spent DESC LIMIT 5",
  "columns": ["name", "total_spent"],
  "rows": [
    ["Alice Smith", 15420.50],
    ["Bob Johnson", 12300.00],
    ["Carol Williams", 9870.25],
    ["David Brown", 8640.00],
    ["Eve Davis", 7210.75]
  ],
  "duration": 1.23,
  "error": null
}
```

### `cache_status`
Show the status of all locally cached tables.

Parameters: None

Returns: JSON array of objects with `table`, `status`, `cached_at`, `size_bytes`.

Example call:

```
cache_status()
```

Example response:

```json
[
  {
    "table": "orders",
    "status": "fresh",
    "cached_at": "2026-04-15T10:30:00",
    "size_bytes": 12902400
  },
  {
    "table": "customers",
    "status": "stale",
    "cached_at": "2026-04-13T08:00:00",
    "size_bytes": 2200000
  },
  {
    "table": "products",
    "status": "fresh",
    "cached_at": "2026-04-15T10:30:00",
    "size_bytes": 450000
  }
]
```

### `cache_refresh`
Force-refresh the local cache for a specific table.

Parameters: `table` (string, required) — table name

Returns: JSON with `cached_at`, `size_bytes`, and `file_count`.

Example call:

```
cache_refresh(table="customers")
```

Example response:

```json
{
  "cached_at": "2026-04-15T14:00:00",
  "size_bytes": 2350000,
  "file_count": 2
}
```

### `cache_refresh_stale`
Refresh all tables where the cache status is stale.

Parameters: None

Returns: JSON array of refreshed tables.

Example call:

```
cache_refresh_stale()
```

Example response:

```json
[
  {
    "table": "customers",
    "cached_at": "2026-04-15T14:00:00",
    "size_bytes": 2350000,
    "file_count": 2
  }
]
```

## MCP Resource
### `lens://tables`

An MCP resource that returns the full table catalog with schemas:

```json
[
  {
    "table": "customers",
    "columns": [
      {"column_name": "id", "data_type": "INTEGER"},
      {"column_name": "name", "data_type": "VARCHAR"},
      {"column_name": "email", "data_type": "VARCHAR"},
      {"column_name": "created_at", "data_type": "TIMESTAMP"}
    ]
  },
  {
    "table": "orders",
    "columns": [
      {"column_name": "order_id", "data_type": "INTEGER"},
      {"column_name": "customer_id", "data_type": "INTEGER"},
      {"column_name": "order_date", "data_type": "DATE"},
      {"column_name": "total", "data_type": "DOUBLE"},
      {"column_name": "status", "data_type": "VARCHAR"}
    ]
  }
]
```

Agents can read this resource upfront to understand the full schema before generating queries.
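An agent harness can, for example, flatten the catalog into a compact schema summary for the agent's system prompt. A sketch, assuming the JSON shape shown above:

```python
import json

def catalog_to_prompt(catalog_json: str) -> str:
    """Render the lens://tables catalog as one `table(col TYPE, ...)`
    line per table -- compact enough to inline in a system prompt."""
    lines = []
    for entry in json.loads(catalog_json):
        cols = ", ".join(
            f"{c['column_name']} {c['data_type']}" for c in entry["columns"]
        )
        lines.append(f"{entry['table']}({cols})")
    return "\n".join(lines)
```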
## Example Agent Conversations

### 1. Explore what data we have and give me a summary

You: Explore what data we have and give me a summary.

Agent calls `list_tables()`:

```
customers
orders
products
```

Agent calls `describe_table(table="customers")`:

```json
[
  {"column_name": "id", "data_type": "INTEGER"},
  {"column_name": "name", "data_type": "VARCHAR"},
  {"column_name": "email", "data_type": "VARCHAR"},
  {"column_name": "created_at", "data_type": "TIMESTAMP"}
]
```

Agent calls `describe_table(table="orders")`:

```json
[
  {"column_name": "order_id", "data_type": "INTEGER"},
  {"column_name": "customer_id", "data_type": "INTEGER"},
  {"column_name": "order_date", "data_type": "DATE"},
  {"column_name": "total", "data_type": "DOUBLE"},
  {"column_name": "status", "data_type": "VARCHAR"}
]
```

Agent calls `query(sql="SELECT COUNT(*) as total FROM customers")`, `query(sql="SELECT COUNT(*) as total FROM orders")`, and `query(sql="SELECT COUNT(*) as total FROM products")`, then replies:

Here is a summary of your data lake:

| Table | Columns | Rows |
|---|---|---|
| customers | 4 (id, name, email, created_at) | 12,400 |
| orders | 5 (order_id, customer_id, order_date, total, status) | 148,320 |
| products | 6 (product_id, name, category, price, sku, active) | 2,450 |

The data looks like a typical e-commerce dataset with customer, order, and product information. The orders table is the largest with 148K records. Want me to dig deeper into any of these?
### 2. How many orders did we have yesterday? Break down by status

You: How many orders did we have yesterday? Break down by status.

Agent calls `query(sql="SELECT status, COUNT(*) as cnt, SUM(total) as revenue FROM orders WHERE order_date = CURRENT_DATE - INTERVAL '1 day' GROUP BY status ORDER BY cnt DESC")`:

```json
{
  "columns": ["status", "cnt", "revenue"],
  "rows": [
    ["completed", 312, 28450.00],
    ["pending", 45, 4120.50],
    ["cancelled", 8, 640.00]
  ],
  "row_count": 3,
  "duration": 0.045
}
```

Yesterday's orders:

| Status | Count | Revenue |
|---|---|---|
| Completed | 312 | $28,450.00 |
| Pending | 45 | $4,120.50 |
| Cancelled | 8 | $640.00 |
| Total | 365 | $33,210.50 |

85% of orders were completed. The 45 pending orders may need follow-up.
### 3. Create a weekly revenue report and export as CSV

You: Create a weekly revenue report and export as CSV.

Agent calls `query(sql="SELECT DATE_TRUNC('week', order_date) as week, COUNT(*) as orders, SUM(total) as revenue, AVG(total) as avg_order_value FROM orders WHERE order_date >= CURRENT_DATE - INTERVAL '8 weeks' GROUP BY week ORDER BY week DESC")`:

```json
{
  "columns": ["week", "orders", "revenue", "avg_order_value"],
  "rows": [
    ["2026-04-13", 365, 33210.50, 90.99],
    ["2026-04-06", 2480, 225120.00, 90.77],
    ["2026-04-01", 2510, 231450.00, 92.21],
    ["2026-03-23", 2390, 214200.00, 89.62],
    ["2026-03-16", 2450, 220500.00, 90.00],
    ["2026-03-09", 2380, 213900.00, 89.87],
    ["2026-03-02", 2520, 228600.00, 90.71],
    ["2026-02-23", 2410, 218400.00, 90.62]
  ],
  "row_count": 8,
  "duration": 0.12
}
```

Here is the weekly revenue report for the last 8 weeks. To export as CSV, you can use the CLI:

```bash
dataspoc-lens query "SELECT DATE_TRUNC('week', order_date) as week, COUNT(*) as orders, SUM(total) as revenue, AVG(total) as avg_order_value FROM orders WHERE order_date >= CURRENT_DATE - INTERVAL '8 weeks' GROUP BY week ORDER BY week DESC" --output csv > weekly_revenue.csv
```

Or from Python:

```python
import csv

from dataspoc_lens import LensClient

with LensClient() as client:
    result = client.query("SELECT ...")

with open("weekly_revenue.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(result["columns"])
    writer.writerows(result["rows"])
```
### 4. Check if our cache is fresh, refresh stale tables, then show me the latest numbers

You: Check if our cache is fresh, refresh stale tables, then show me the latest numbers.

Agent calls `cache_status()`:

```json
[
  {"table": "orders", "status": "stale", "cached_at": "2026-04-14T02:00:00", "size_bytes": 12902400},
  {"table": "customers", "status": "fresh", "cached_at": "2026-04-15T10:30:00", "size_bytes": 2200000},
  {"table": "products", "status": "fresh", "cached_at": "2026-04-15T10:30:00", "size_bytes": 450000}
]
```

The orders table cache is stale (last cached yesterday). Let me refresh it.

Agent calls `cache_refresh_stale()`:

```json
[
  {"table": "orders", "cached_at": "2026-04-15T14:00:00", "size_bytes": 13100000, "file_count": 4}
]
```

Cache refreshed. Now let me pull the latest numbers.

Agent calls `query(sql="SELECT COUNT(*) as total_orders, SUM(total) as total_revenue, AVG(total) as avg_order FROM orders WHERE order_date = CURRENT_DATE")`:

```json
{
  "columns": ["total_orders", "total_revenue", "avg_order"],
  "rows": [[187, 17023.50, 91.03]],
  "row_count": 1,
  "duration": 0.034
}
```

All caches are now fresh. Today's numbers so far:
- Orders: 187
- Revenue: $17,023.50
- Average order value: $91.03
## Python SDK for Agents

Use `LensClient` to build agents that query and analyze your data lake programmatically.

```python
from dataspoc_lens import LensClient

client = LensClient()
```

### List tables

```python
tables = client.tables()
# ['customers', 'orders', 'products']
```

### Get table schema

```python
columns = client.schema("orders")
# [{"column_name": "order_id", "data_type": "INTEGER"}, ...]
```

### Run a SQL query

```python
result = client.query("SELECT COUNT(*) as total FROM orders")
print(result["rows"])      # [[148320]]
print(result["duration"])  # 0.034
```

### Ask a natural language question

```python
answer = client.ask("Which product category has the highest revenue?")
print(answer["sql"])   # 'SELECT p.category, SUM(o.total) ...'
print(answer["rows"])  # [['Electronics', 452300.0], ...]
```

### Check and refresh cache

```python
# Check status
for entry in client.cache_status():
    print(f"{entry['table']}: {entry['status']}")

# Refresh a specific table
client.cache_refresh("orders")

# Refresh all stale tables
client.cache_refresh_stale()
```

### Full example: automated data exploration
```python
from dataspoc_lens import LensClient

with LensClient() as client:
    # Refresh stale caches first
    client.cache_refresh_stale()

    # Explore all tables
    for table in client.tables():
        cols = client.schema(table)
        count = client.query(f"SELECT COUNT(*) FROM {table}")["rows"][0][0]
        print(f"{table}: {len(cols)} columns, {count:,} rows")

    # Run analysis
    result = client.query("""
        SELECT
            DATE_TRUNC('month', order_date) as month,
            COUNT(*) as orders,
            SUM(total) as revenue
        FROM orders
        WHERE order_date >= '2026-01-01'
        GROUP BY month
        ORDER BY month
    """)

    for row in result["rows"]:
        print(f"{row[0]}: {row[1]:,} orders, ${row[2]:,.2f}")
```

## CrewAI Integration Example
Build a CrewAI data analyst agent.

```python
import json

from crewai import Agent, Task, Crew
from crewai.tools import tool
from dataspoc_lens import LensClient

client = LensClient()

@tool
def list_tables() -> str:
    """List all tables available in the data lake."""
    return "\n".join(client.tables())

@tool
def describe_table(table: str) -> str:
    """Get the schema of a table (columns and types)."""
    return json.dumps(client.schema(table), indent=2)

@tool
def run_query(sql: str) -> str:
    """Execute a read-only SQL query and return results."""
    result = client.query(sql)
    return json.dumps(result, indent=2, default=str)

@tool
def ask_question(question: str) -> str:
    """Ask a natural language question about the data."""
    result = client.ask(question)
    return json.dumps(result, indent=2, default=str)

data_analyst = Agent(
    role="Data Analyst",
    goal="Analyze data in the lake and provide actionable insights",
    backstory=(
        "You are a senior data analyst connected to a DataSpoc Lens data lake. "
        "You can discover tables, examine schemas, run SQL queries, and answer "
        "natural language questions about the data."
    ),
    tools=[list_tables, describe_table, run_query, ask_question],
    verbose=True,
)

analysis_task = Task(
    description=(
        "Explore the data lake, understand the available tables, and produce "
        "a summary of key business metrics for this week: total orders, "
        "revenue, top customers, and any notable trends."
    ),
    expected_output="A business intelligence summary with key metrics and trends.",
    agent=data_analyst,
)

crew = Crew(agents=[data_analyst], tasks=[analysis_task], verbose=True)
result = crew.kickoff()
print(result)
```

## LangGraph Integration Example
Build a LangGraph data analysis workflow.

```python
from typing import TypedDict

from langgraph.graph import StateGraph, END
from dataspoc_lens import LensClient

client = LensClient()

class AnalysisState(TypedDict):
    tables: list[str]
    schemas: dict
    metrics: dict
    report: str

def discover_tables(state: AnalysisState) -> AnalysisState:
    """Discover all available tables and their schemas."""
    tables = client.tables()
    schemas = {}
    for t in tables:
        schemas[t] = client.schema(t)
    return {**state, "tables": tables, "schemas": schemas}

def compute_metrics(state: AnalysisState) -> AnalysisState:
    """Compute key business metrics."""
    metrics = {}

    # Total counts
    for table in state["tables"]:
        result = client.query(f"SELECT COUNT(*) FROM {table}")
        metrics[f"{table}_count"] = result["rows"][0][0]

    # Revenue metrics
    revenue = client.query("""
        SELECT
            SUM(total) as total_revenue,
            AVG(total) as avg_order,
            COUNT(*) as total_orders
        FROM orders
        WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'
    """)
    metrics["weekly_revenue"] = revenue["rows"][0][0]
    metrics["avg_order_value"] = revenue["rows"][0][1]
    metrics["weekly_orders"] = revenue["rows"][0][2]

    return {**state, "metrics": metrics}

def generate_report(state: AnalysisState) -> AnalysisState:
    """Generate a text report from the metrics."""
    m = state["metrics"]
    lines = [
        "Weekly Business Report",
        "=" * 40,
        f"Tables: {', '.join(state['tables'])}",
        f"Weekly orders: {m.get('weekly_orders', 0):,}",
        f"Weekly revenue: ${m.get('weekly_revenue', 0):,.2f}",
        f"Avg order value: ${m.get('avg_order_value', 0):,.2f}",
    ]
    for t in state["tables"]:
        lines.append(f"{t}: {m.get(f'{t}_count', 0):,} total rows")
    return {**state, "report": "\n".join(lines)}

graph = StateGraph(AnalysisState)
graph.add_node("discover", discover_tables)
graph.add_node("metrics", compute_metrics)
graph.add_node("report", generate_report)

graph.set_entry_point("discover")
graph.add_edge("discover", "metrics")
graph.add_edge("metrics", "report")
graph.add_edge("report", END)

app = graph.compile()
result = app.invoke({"tables": [], "schemas": {}, "metrics": {}, "report": ""})
print(result["report"])
```

## AutoGen Integration Example
Build an AutoGen multi-agent data team with a researcher and an analyst.

```python
import json

import autogen
from dataspoc_lens import LensClient

client = LensClient()

# Define the functions agents can call
def list_tables() -> str:
    """List all tables in the data lake."""
    return ", ".join(client.tables())

def describe_table(table: str) -> str:
    """Get schema for a table."""
    return json.dumps(client.schema(table), indent=2)

def run_query(sql: str) -> str:
    """Execute a SQL query."""
    result = client.query(sql)
    return json.dumps(result, indent=2, default=str)

def ask_question(question: str) -> str:
    """Ask a natural language question about the data."""
    result = client.ask(question)
    return json.dumps(result, indent=2, default=str)

# Configuration
llm_config = {
    "model": "claude-sonnet-4-20250514",
    "api_type": "anthropic",
}

# Data Researcher: discovers and describes data
researcher = autogen.AssistantAgent(
    name="DataResearcher",
    system_message=(
        "You are a data researcher. Your job is to explore the data lake, "
        "discover tables, examine schemas, and understand the data structure. "
        "Report your findings to the analyst."
    ),
    llm_config=llm_config,
)

# Data Analyst: runs queries and generates insights
analyst = autogen.AssistantAgent(
    name="DataAnalyst",
    system_message=(
        "You are a senior data analyst. Based on the researcher's findings, "
        "write and execute SQL queries to answer business questions. "
        "Present results clearly with numbers and trends."
    ),
    llm_config=llm_config,
)

# User proxy to execute function calls
user_proxy = autogen.UserProxyAgent(
    name="User",
    human_input_mode="NEVER",
    code_execution_config=False,
)

# Register each function for both agents
for func in [list_tables, describe_table, run_query, ask_question]:
    autogen.register_function(
        func,
        caller=researcher,
        executor=user_proxy,
        description=func.__doc__,
    )
    autogen.register_function(
        func,
        caller=analyst,
        executor=user_proxy,
        description=func.__doc__,
    )

# Group chat
groupchat = autogen.GroupChat(
    agents=[user_proxy, researcher, analyst],
    messages=[],
    max_round=12,
)

manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)

user_proxy.initiate_chat(
    manager,
    message=(
        "Explore our data lake, understand the tables, and produce a weekly "
        "business report with revenue, order trends, and top customers."
    ),
)
```

## JSON Output for Shell Agents
All Lens CLI commands support `--output json` for machine-readable output.

### List tables

```bash
dataspoc-lens tables --output json
```

```json
["customers", "orders", "products"]
```

### Describe a table

```bash
dataspoc-lens schema orders --output json
```

```json
[
  {"column_name": "order_id", "data_type": "INTEGER"},
  {"column_name": "customer_id", "data_type": "INTEGER"},
  {"column_name": "order_date", "data_type": "DATE"},
  {"column_name": "total", "data_type": "DOUBLE"},
  {"column_name": "status", "data_type": "VARCHAR"}
]
```

### Run a query and extract data with jq

```bash
dataspoc-lens query "SELECT status, COUNT(*) as cnt FROM orders GROUP BY status" --output json | jq '.rows'
```

```json
[
  ["completed", 32100],
  ["pending", 8450],
  ["cancelled", 2130]
]
```

### Get row count only

```bash
dataspoc-lens query "SELECT COUNT(*) FROM orders" --output json | jq '.rows[0][0]'
```

```
148320
```

### Check cache freshness

```bash
dataspoc-lens cache status --output json | jq '.[] | {table, status}'
```

```json
{"table": "orders", "status": "fresh"}
{"table": "customers", "status": "stale"}
{"table": "products", "status": "fresh"}
```
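The same freshness check is easy to do from Python when a shell pipeline grows awkward. A sketch that parses the `cache status` JSON shown above:

```python
import json

def stale_tables(status_json: str) -> list:
    """Return the names of tables whose cache status is 'stale'."""
    return [e["table"] for e in json.loads(status_json) if e["status"] == "stale"]
```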
Section titled “Python subprocess example”import subprocessimport json
def lens_query(sql: str) -> dict: result = subprocess.run( ["dataspoc-lens", "query", sql, "--output", "json"], capture_output=True, text=True, check=True, ) return json.loads(result.stdout)
data = lens_query("SELECT COUNT(*) as total FROM orders")print(data["rows"][0][0]) # 148320Shell script: daily metrics snapshot
Section titled “Shell script: daily metrics snapshot”#!/bin/bashecho "=== Daily Metrics Snapshot ==="echo "Date: $(date +%Y-%m-%d)"echo ""
# Refresh stale caches firstdataspoc-lens cache refresh-stale --output json | jq -r '.[] | "Refreshed: \(.table)"'
# Get key metricsecho ""echo "Orders today:"dataspoc-lens query "SELECT COUNT(*) as orders, SUM(total) as revenue FROM orders WHERE order_date = CURRENT_DATE" --output json | jq -r '.rows[0] | " Count: \(.[0]), Revenue: $\(.[1])"'
echo ""echo "Top 5 customers this week:"dataspoc-lens query "SELECT c.name, SUM(o.total) as spent FROM customers c JOIN orders o ON c.id = o.customer_id WHERE o.order_date >= CURRENT_DATE - INTERVAL '7 days' GROUP BY c.name ORDER BY spent DESC LIMIT 5" --output json | jq -r '.rows[] | " \(.[0]): $\(.[1])"'Combining Pipe + Lens Agents
The most powerful setup uses both MCP servers together. An agent can ingest fresh data with Pipe, then immediately query it with Lens.

### Claude Desktop: both servers

```json
{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"],
      "env": { "DATASPOC_BUCKET": "s3://my-data" }
    },
    "dataspoc-lens": {
      "command": "dataspoc-lens",
      "args": ["mcp"],
      "env": { "DATASPOC_BUCKET": "s3://my-data" }
    }
  }
}
```

### Example conversation: ingest then query

You: Run the sales pipeline to get fresh data, then tell me today's revenue.

Agent calls Pipe `run_pipeline(name="sales")`:

```json
{"success": true, "streams": {"orders": 5200}, "error": null}
```

Agent calls Lens `cache_refresh(table="orders")`:

```json
{"cached_at": "2026-04-15T14:30:00", "size_bytes": 13100000, "file_count": 4}
```

Agent calls Lens `query(sql="SELECT SUM(total) as revenue FROM orders WHERE order_date = CURRENT_DATE")`:

```json
{"columns": ["revenue"], "rows": [[17023.50]], "row_count": 1, "duration": 0.034}
```

Done. The sales pipeline extracted 5,200 new records. After refreshing the cache, today's revenue is $17,023.50.
### Python SDK: end-to-end workflow

```python
from dataspoc_pipe.sdk import PipeClient
from dataspoc_lens import LensClient

pipe = PipeClient()
lens = LensClient()

# Step 1: Run all pipelines
for name in pipe.pipelines():
    result = pipe.run(name)
    if result["success"]:
        total = sum(result["streams"].values())
        print(f"Pipe [{name}]: {total} records")
    else:
        print(f"Pipe [{name}]: FAILED - {result['error']}")

# Step 2: Refresh Lens cache
lens.cache_refresh_stale()

# Step 3: Query the fresh data
result = lens.query("""
    SELECT
        DATE_TRUNC('day', order_date) as day,
        COUNT(*) as orders,
        SUM(total) as revenue
    FROM orders
    WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'
    GROUP BY day
    ORDER BY day
""")

for row in result["rows"]:
    print(f"{row[0]}: {row[1]} orders, ${row[2]:,.2f}")

lens.close()
```

## Best Practices
### Refresh cache before querying

If your data changes frequently, always refresh stale caches before running analytical queries. This ensures the agent works with the latest data.

```python
client.cache_refresh_stale()
result = client.query("SELECT ...")
```

### Handle large result sets
The `query` tool returns all matching rows. For large tables, always use `LIMIT` or aggregate queries to avoid overwhelming the agent's context window.

```python
# Good: bounded result
result = client.query("SELECT * FROM orders ORDER BY order_date DESC LIMIT 100")

# Good: aggregation
result = client.query("SELECT status, COUNT(*) FROM orders GROUP BY status")

# Avoid: unbounded scan of a large table
# result = client.query("SELECT * FROM orders")
```
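When an agent genuinely needs to walk a large table, paging keeps each response bounded. A sketch of a hypothetical helper (not part of the SDK), assuming the `LensClient.query` interface shown earlier:

```python
def query_paged(client, base_sql: str, page_size: int = 100):
    """Yield rows one page at a time by appending LIMIT/OFFSET to a
    SELECT. The base SQL must not already contain LIMIT or OFFSET."""
    offset = 0
    while True:
        page = client.query(f"{base_sql} LIMIT {page_size} OFFSET {offset}")
        rows = page["rows"]
        if not rows:
            return
        yield from rows
        if len(rows) < page_size:
            return
        offset += page_size
```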
### Rate limiting for AI `ask`

The `ask` tool calls an LLM to generate SQL. This costs tokens and takes longer than direct SQL queries. Use `query` when you already know the SQL, and reserve `ask` for genuinely ad-hoc natural language questions.
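One cheap mitigation is memoizing `ask` results so repeating the exact same question doesn't trigger a second LLM call. A sketch of a hypothetical wrapper around any `LensClient`-like object (a stub stands in for the client in testing):

```python
class CachedAsk:
    """Memoize ask() by exact question text. Results can go stale if
    the underlying data changes; call clear() after a cache refresh."""

    def __init__(self, client):
        self._client = client
        self._cache = {}

    def ask(self, question: str) -> dict:
        if question not in self._cache:
            self._cache[question] = self._client.ask(question)
        return self._cache[question]

    def clear(self) -> None:
        self._cache.clear()
```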
### Error handling

Always check for errors in both `query` and `ask` results:

```python
result = client.ask("What is the revenue trend?")
if result["error"]:
    print(f"AI query failed: {result['error']}")
else:
    for row in result["rows"]:
        print(row)
```
### Security considerations

Lens enforces read-only queries at the server level; write operations are rejected. However, agents still operate with the permissions of the user who started the MCP server, so ensure the bucket's IAM policies are configured to restrict access appropriately.
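Defense in depth is still worthwhile: an agent harness can screen SQL before it ever reaches the server. A rough client-side sketch that mirrors the server's rejection list (the server's actual check may be stricter; the server remains the authority):

```python
WRITE_KEYWORDS = {
    "INSERT", "UPDATE", "DELETE", "DROP",
    "ALTER", "TRUNCATE", "CREATE", "REPLACE",
}

def looks_read_only(sql: str) -> bool:
    """Reject statements whose leading keyword is a write operation.
    A heuristic pre-check only, not a substitute for the server check."""
    stripped = sql.strip()
    if not stripped:
        return False
    first = stripped.split(None, 1)[0].upper()
    return first not in WRITE_KEYWORDS
```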
### Timeouts

Complex queries over large datasets may take longer. If your agent framework has a timeout, set it appropriately (30 seconds or more for analytical queries). The `duration` field in query responses helps calibrate expectations.
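If your framework cannot set a per-tool timeout, you can bound the wait on the client side instead. A sketch using a worker thread (hypothetical helper; note the underlying query may keep running after the client gives up):

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

def query_with_timeout(client, sql: str, timeout: float = 30.0) -> dict:
    """Run client.query(sql) in a worker thread and raise TimeoutError
    if no result arrives within `timeout` seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        return pool.submit(client.query, sql).result(timeout=timeout)
    except FutureTimeout:
        raise TimeoutError(f"query exceeded {timeout}s") from None
    finally:
        # Don't block on a still-running query during cleanup.
        pool.shutdown(wait=False)
```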