Agent Integration

AI agents connected to DataSpoc Lens can explore your data lake, run SQL queries, ask natural language questions, and manage cache — all autonomously. This guide covers every integration path: MCP, Python SDK, multi-framework examples, and combining Lens with Pipe for end-to-end data workflows.

MCP Server

Install Lens with the MCP extra:

Terminal window
pip install dataspoc-lens[mcp]

The server is launched with:

Terminal window
dataspoc-lens mcp

The server uses stdio transport and is designed to be launched by an MCP client, not run manually.
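
Desktop clients handle that launch for you (see the configs below). A custom agent can do the same with the official MCP Python SDK. This is a minimal sketch, assuming the mcp package is installed; the client API shown is the MCP SDK's, not part of dataspoc-lens:

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main():
    # Spawn `dataspoc-lens mcp` as a subprocess speaking stdio
    params = StdioServerParameters(command="dataspoc-lens", args=["mcp"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Discover the exposed tools, then call one
            tools = await session.list_tools()
            print([t.name for t in tools.tools])
            result = await session.call_tool("list_tables", {})
            print(result.content)

asyncio.run(main())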

Claude Desktop

Add to claude_desktop_config.json:

  • macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
  • Linux: ~/.config/Claude/claude_desktop_config.json
{
  "mcpServers": {
    "dataspoc-lens": {
      "command": "dataspoc-lens",
      "args": ["mcp"],
      "env": {
        "DATASPOC_BUCKET": "s3://my-data"
      }
    }
  }
}

If installed in a virtual environment, use the full path:

{
  "mcpServers": {
    "dataspoc-lens": {
      "command": "/home/you/.venv/bin/dataspoc-lens",
      "args": ["mcp"]
    }
  }
}

Restart Claude Desktop after saving.

Cursor

Add to your Cursor MCP settings (Settings > MCP Servers):

{
  "mcpServers": {
    "dataspoc-lens": {
      "command": "dataspoc-lens",
      "args": ["mcp"]
    }
  }
}

Claude Code

Add to ~/.claude.json:

{
  "mcpServers": {
    "dataspoc-lens": {
      "command": "dataspoc-lens",
      "args": ["mcp"]
    }
  }
}

Or add it per-project in .claude/settings.json:

{
  "mcpServers": {
    "dataspoc-lens": {
      "command": "dataspoc-lens",
      "args": ["mcp"]
    }
  }
}

Available Tools

The Lens MCP server exposes 7 tools:

Tool                 Description                                      Parameters
list_tables          List all tables in the data lake                 None
describe_table       Show schema (columns and types) for a table     table (string)
query                Execute a read-only SQL query                    sql (string)
ask                  Ask a natural language question about the data   question (string)
cache_status         Show status of all cached tables                 None
cache_refresh        Force-refresh cache for a specific table         table (string)
cache_refresh_stale  Refresh all stale cached tables                  None

list_tables

List all available tables in the data lake.

Parameters: None

Returns: One table name per line, or “No tables found.”

Example call:

list_tables()

Example response:

customers
orders
products

describe_table

Show the schema of a table.

Parameters: table (string, required) — table name

Returns: JSON array of {"column_name": ..., "data_type": ...} objects.

Example call:

describe_table(table="orders")

Example response:

[
  {"column_name": "order_id", "data_type": "INTEGER"},
  {"column_name": "customer_id", "data_type": "INTEGER"},
  {"column_name": "order_date", "data_type": "DATE"},
  {"column_name": "total", "data_type": "DOUBLE"},
  {"column_name": "status", "data_type": "VARCHAR"}
]

query

Execute a read-only SQL query. Write operations (INSERT, UPDATE, DELETE, DROP, ALTER, TRUNCATE, CREATE, REPLACE) are rejected.

Parameters: sql (string, required) — SQL query

Returns: JSON with columns, rows, row_count, and duration.

Example call:

query(sql="SELECT status, COUNT(*) as cnt FROM orders GROUP BY status ORDER BY cnt DESC")

Example response:

{
  "columns": ["status", "cnt"],
  "rows": [
    ["completed", 32100],
    ["pending", 8450],
    ["cancelled", 2130]
  ],
  "row_count": 3,
  "duration": 0.089
}

Rejected write example:

query(sql="DROP TABLE orders")

Response:

{
  "error": "Write operations are not allowed. Only SELECT, SHOW, DESCRIBE, and EXPLAIN queries are permitted."
}

ask

Translate a natural language question to SQL and execute it. Uses AI to generate the SQL based on the available table schemas.

Parameters: question (string, required) — natural language question

Returns: JSON with sql, columns, rows, duration, and error.

Example call:

ask(question="What are the top 5 customers by total spending?")

Example response:

{
  "sql": "SELECT c.name, SUM(o.total) as total_spent FROM customers c JOIN orders o ON c.id = o.customer_id GROUP BY c.name ORDER BY total_spent DESC LIMIT 5",
  "columns": ["name", "total_spent"],
  "rows": [
    ["Alice Smith", 15420.50],
    ["Bob Johnson", 12300.00],
    ["Carol Williams", 9870.25],
    ["David Brown", 8640.00],
    ["Eve Davis", 7210.75]
  ],
  "duration": 1.23,
  "error": null
}

cache_status

Show the status of all locally cached tables.

Parameters: None

Returns: JSON array of objects with table, status, cached_at, size_bytes.

Example call:

cache_status()

Example response:

[
  {
    "table": "orders",
    "status": "fresh",
    "cached_at": "2026-04-15T10:30:00",
    "size_bytes": 12902400
  },
  {
    "table": "customers",
    "status": "stale",
    "cached_at": "2026-04-13T08:00:00",
    "size_bytes": 2200000
  },
  {
    "table": "products",
    "status": "fresh",
    "cached_at": "2026-04-15T10:30:00",
    "size_bytes": 450000
  }
]

cache_refresh

Force-refresh the local cache for a specific table.

Parameters: table (string, required) — table name

Returns: JSON with cached_at, size_bytes, and file_count.

Example call:

cache_refresh(table="customers")

Example response:

{
  "cached_at": "2026-04-15T14:00:00",
  "size_bytes": 2350000,
  "file_count": 2
}

cache_refresh_stale

Refresh all tables where the cache status is stale.

Parameters: None

Returns: JSON array of refreshed tables.

Example call:

cache_refresh_stale()

Example response:

[
  {
    "table": "customers",
    "cached_at": "2026-04-15T14:00:00",
    "size_bytes": 2350000,
    "file_count": 2
  }
]

Table Catalog Resource

An MCP resource that returns the full table catalog with schemas:

[
  {
    "table": "customers",
    "columns": [
      {"column_name": "id", "data_type": "INTEGER"},
      {"column_name": "name", "data_type": "VARCHAR"},
      {"column_name": "email", "data_type": "VARCHAR"},
      {"column_name": "created_at", "data_type": "TIMESTAMP"}
    ]
  },
  {
    "table": "orders",
    "columns": [
      {"column_name": "order_id", "data_type": "INTEGER"},
      {"column_name": "customer_id", "data_type": "INTEGER"},
      {"column_name": "order_date", "data_type": "DATE"},
      {"column_name": "total", "data_type": "DOUBLE"},
      {"column_name": "status", "data_type": "VARCHAR"}
    ]
  }
]

Agents can read this resource upfront to understand the full schema before generating queries.
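
A programmatic client can do exactly that. Here is a sketch using the same MCP SDK session as in the stdio example above; the catalog's URI is an implementation detail of the server, so it is discovered via list_resources rather than hard-coded:

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def read_catalog():
    params = StdioServerParameters(command="dataspoc-lens", args=["mcp"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Discover the catalog resource instead of assuming its URI
            listing = await session.list_resources()
            catalog = await session.read_resource(listing.resources[0].uri)
            for item in catalog.contents:
                print(item.text)  # text contents hold the JSON catalog

asyncio.run(read_catalog())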

Example Agent Workflows

1. Explore what data we have and give me a summary

You: Explore what data we have and give me a summary.

Agent calls list_tables():

customers
orders
products

Agent calls describe_table(table="customers"):

[
  {"column_name": "id", "data_type": "INTEGER"},
  {"column_name": "name", "data_type": "VARCHAR"},
  {"column_name": "email", "data_type": "VARCHAR"},
  {"column_name": "created_at", "data_type": "TIMESTAMP"}
]

Agent calls describe_table(table="orders"):

[
  {"column_name": "order_id", "data_type": "INTEGER"},
  {"column_name": "customer_id", "data_type": "INTEGER"},
  {"column_name": "order_date", "data_type": "DATE"},
  {"column_name": "total", "data_type": "DOUBLE"},
  {"column_name": "status", "data_type": "VARCHAR"}
]

Agent calls query(sql="SELECT COUNT(*) as total FROM customers"), then runs the same count for orders and products:

Here is a summary of your data lake:

Table      Columns                                                Rows
customers  4 (id, name, email, created_at)                        12,400
orders     5 (order_id, customer_id, order_date, total, status)   148,320
products   6 (product_id, name, category, price, sku, active)     2,450

The data looks like a typical e-commerce dataset with customer, order, and product information. The orders table is the largest with 148K records. Want me to dig deeper into any of these?

2. How many orders did we have yesterday? Break down by status

You: How many orders did we have yesterday? Break down by status.

Agent calls query(sql="SELECT status, COUNT(*) as cnt, SUM(total) as revenue FROM orders WHERE order_date = CURRENT_DATE - INTERVAL '1 day' GROUP BY status ORDER BY cnt DESC"):

{
  "columns": ["status", "cnt", "revenue"],
  "rows": [
    ["completed", 312, 28450.00],
    ["pending", 45, 4120.50],
    ["cancelled", 8, 640.00]
  ],
  "row_count": 3,
  "duration": 0.045
}

Yesterday’s orders:

Status     Count  Revenue
Completed  312    $28,450.00
Pending    45     $4,120.50
Cancelled  8      $640.00
Total      365    $33,210.50

85% of orders were completed. The 45 pending orders may need follow-up.

3. Create a weekly revenue report and export as CSV

You: Create a weekly revenue report and export as CSV.

Agent calls query(sql="SELECT DATE_TRUNC('week', order_date) as week, COUNT(*) as orders, SUM(total) as revenue, AVG(total) as avg_order_value FROM orders WHERE order_date >= CURRENT_DATE - INTERVAL '8 weeks' GROUP BY week ORDER BY week DESC"):

{
  "columns": ["week", "orders", "revenue", "avg_order_value"],
  "rows": [
    ["2026-04-13", 365, 33210.50, 90.99],
    ["2026-04-06", 2480, 225120.00, 90.77],
    ["2026-03-30", 2510, 231450.00, 92.21],
    ["2026-03-23", 2390, 214200.00, 89.62],
    ["2026-03-16", 2450, 220500.00, 90.00],
    ["2026-03-09", 2380, 213900.00, 89.87],
    ["2026-03-02", 2520, 228600.00, 90.71],
    ["2026-02-23", 2410, 218400.00, 90.62]
  ],
  "row_count": 8,
  "duration": 0.12
}

Here is the weekly revenue report for the last 8 weeks. To export as CSV, you can use the CLI:

Terminal window
dataspoc-lens query "SELECT DATE_TRUNC('week', order_date) as week, COUNT(*) as orders, SUM(total) as revenue, AVG(total) as avg_order_value FROM orders WHERE order_date >= CURRENT_DATE - INTERVAL '8 weeks' GROUP BY week ORDER BY week DESC" --output csv > weekly_revenue.csv

Or from Python:

import csv
from dataspoc_lens import LensClient

with LensClient() as client:
    result = client.query("SELECT ...")  # the weekly revenue SQL above
    with open("weekly_revenue.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(result["columns"])
        writer.writerows(result["rows"])

4. Check if our cache is fresh, refresh stale tables, then show me the latest numbers

You: Check if our cache is fresh, refresh stale tables, then show me the latest numbers.

Agent calls cache_status():

[
  {"table": "orders", "status": "stale", "cached_at": "2026-04-14T02:00:00", "size_bytes": 12902400},
  {"table": "customers", "status": "fresh", "cached_at": "2026-04-15T10:30:00", "size_bytes": 2200000},
  {"table": "products", "status": "fresh", "cached_at": "2026-04-15T10:30:00", "size_bytes": 450000}
]

The orders table cache is stale (last cached yesterday). Let me refresh it.

Agent calls cache_refresh_stale():

[
  {"table": "orders", "cached_at": "2026-04-15T14:00:00", "size_bytes": 13100000, "file_count": 4}
]

Cache refreshed. Now let me pull the latest numbers.

Agent calls query(sql="SELECT COUNT(*) as total_orders, SUM(total) as total_revenue, AVG(total) as avg_order FROM orders WHERE order_date = CURRENT_DATE"):

{
  "columns": ["total_orders", "total_revenue", "avg_order"],
  "rows": [[187, 17023.50, 91.03]],
  "row_count": 1,
  "duration": 0.034
}

All caches are now fresh. Today’s numbers so far:

  • Orders: 187
  • Revenue: $17,023.50
  • Average order value: $91.03

Python SDK

Use LensClient to build agents that query and analyze your data lake programmatically.

from dataspoc_lens import LensClient

client = LensClient()

# Discover tables
tables = client.tables()
# ['customers', 'orders', 'products']

# Inspect a schema
columns = client.schema("orders")
# [{"column_name": "order_id", "data_type": "INTEGER"}, ...]

# Run SQL
result = client.query("SELECT COUNT(*) as total FROM orders")
print(result["rows"])      # [[148320]]
print(result["duration"])  # 0.034

# Ask a natural language question
answer = client.ask("Which product category has the highest revenue?")
print(answer["sql"])   # 'SELECT p.category, SUM(o.total) ...'
print(answer["rows"])  # [['Electronics', 452300.0], ...]

# Check cache status
for entry in client.cache_status():
    print(f"{entry['table']}: {entry['status']}")

# Refresh a specific table
client.cache_refresh("orders")

# Refresh all stale tables
client.cache_refresh_stale()
A full exploration workflow using the context manager:

from dataspoc_lens import LensClient

with LensClient() as client:
    # Refresh stale caches first
    client.cache_refresh_stale()

    # Explore all tables
    for table in client.tables():
        cols = client.schema(table)
        count = client.query(f"SELECT COUNT(*) FROM {table}")["rows"][0][0]
        print(f"{table}: {len(cols)} columns, {count:,} rows")

    # Run analysis
    result = client.query("""
        SELECT
            DATE_TRUNC('month', order_date) as month,
            COUNT(*) as orders,
            SUM(total) as revenue
        FROM orders
        WHERE order_date >= '2026-01-01'
        GROUP BY month
        ORDER BY month
    """)
    for row in result["rows"]:
        print(f"{row[0]}: {row[1]:,} orders, ${row[2]:,.2f}")

CrewAI

Build a CrewAI data analyst agent.

import json

from crewai import Agent, Task, Crew
from crewai.tools import tool
from dataspoc_lens import LensClient

client = LensClient()

@tool
def list_tables() -> str:
    """List all tables available in the data lake."""
    return "\n".join(client.tables())

@tool
def describe_table(table: str) -> str:
    """Get the schema of a table (columns and types)."""
    return json.dumps(client.schema(table), indent=2)

@tool
def run_query(sql: str) -> str:
    """Execute a read-only SQL query and return results."""
    result = client.query(sql)
    return json.dumps(result, indent=2, default=str)

@tool
def ask_question(question: str) -> str:
    """Ask a natural language question about the data."""
    result = client.ask(question)
    return json.dumps(result, indent=2, default=str)

data_analyst = Agent(
    role="Data Analyst",
    goal="Analyze data in the lake and provide actionable insights",
    backstory=(
        "You are a senior data analyst connected to a DataSpoc Lens data lake. "
        "You can discover tables, examine schemas, run SQL queries, and answer "
        "natural language questions about the data."
    ),
    tools=[list_tables, describe_table, run_query, ask_question],
    verbose=True,
)

analysis_task = Task(
    description=(
        "Explore the data lake, understand the available tables, and produce "
        "a summary of key business metrics for this week: total orders, "
        "revenue, top customers, and any notable trends."
    ),
    expected_output="A business intelligence summary with key metrics and trends.",
    agent=data_analyst,
)

crew = Crew(agents=[data_analyst], tasks=[analysis_task], verbose=True)
result = crew.kickoff()
print(result)

LangGraph

Build a LangGraph data analysis workflow.

from typing import TypedDict

from langgraph.graph import StateGraph, END
from dataspoc_lens import LensClient

client = LensClient()

class AnalysisState(TypedDict):
    tables: list[str]
    schemas: dict
    metrics: dict
    report: str

def discover_tables(state: AnalysisState) -> AnalysisState:
    """Discover all available tables and their schemas."""
    tables = client.tables()
    schemas = {}
    for t in tables:
        schemas[t] = client.schema(t)
    return {**state, "tables": tables, "schemas": schemas}

def compute_metrics(state: AnalysisState) -> AnalysisState:
    """Compute key business metrics."""
    metrics = {}
    # Total counts
    for table in state["tables"]:
        result = client.query(f"SELECT COUNT(*) FROM {table}")
        metrics[f"{table}_count"] = result["rows"][0][0]
    # Revenue metrics
    revenue = client.query("""
        SELECT
            SUM(total) as total_revenue,
            AVG(total) as avg_order,
            COUNT(*) as total_orders
        FROM orders
        WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'
    """)
    metrics["weekly_revenue"] = revenue["rows"][0][0]
    metrics["avg_order_value"] = revenue["rows"][0][1]
    metrics["weekly_orders"] = revenue["rows"][0][2]
    return {**state, "metrics": metrics}

def generate_report(state: AnalysisState) -> AnalysisState:
    """Generate a text report from the metrics."""
    m = state["metrics"]
    lines = [
        "Weekly Business Report",
        "=" * 40,
        f"Tables: {', '.join(state['tables'])}",
        f"Weekly orders: {m.get('weekly_orders', 0):,}",
        f"Weekly revenue: ${m.get('weekly_revenue', 0):,.2f}",
        f"Avg order value: ${m.get('avg_order_value', 0):,.2f}",
    ]
    for t in state["tables"]:
        lines.append(f"{t}: {m.get(f'{t}_count', 0):,} total rows")
    return {**state, "report": "\n".join(lines)}

graph = StateGraph(AnalysisState)
graph.add_node("discover", discover_tables)
graph.add_node("metrics", compute_metrics)
graph.add_node("report", generate_report)
graph.set_entry_point("discover")
graph.add_edge("discover", "metrics")
graph.add_edge("metrics", "report")
graph.add_edge("report", END)

app = graph.compile()
result = app.invoke({"tables": [], "schemas": {}, "metrics": {}, "report": ""})
print(result["report"])

AutoGen

Build an AutoGen multi-agent data team with a researcher and an analyst.

import json

import autogen
from dataspoc_lens import LensClient

client = LensClient()

# Define the functions agents can call
def list_tables() -> str:
    """List all tables in the data lake."""
    return ", ".join(client.tables())

def describe_table(table: str) -> str:
    """Get schema for a table."""
    return json.dumps(client.schema(table), indent=2)

def run_query(sql: str) -> str:
    """Execute a SQL query."""
    result = client.query(sql)
    return json.dumps(result, indent=2, default=str)

def ask_question(question: str) -> str:
    """Ask a natural language question about the data."""
    result = client.ask(question)
    return json.dumps(result, indent=2, default=str)

# Configuration
llm_config = {
    "model": "claude-sonnet-4-20250514",
    "api_type": "anthropic",
}

# Data Researcher: discovers and describes data
researcher = autogen.AssistantAgent(
    name="DataResearcher",
    system_message=(
        "You are a data researcher. Your job is to explore the data lake, "
        "discover tables, examine schemas, and understand the data structure. "
        "Report your findings to the analyst."
    ),
    llm_config=llm_config,
)

# Data Analyst: runs queries and generates insights
analyst = autogen.AssistantAgent(
    name="DataAnalyst",
    system_message=(
        "You are a senior data analyst. Based on the researcher's findings, "
        "write and execute SQL queries to answer business questions. "
        "Present results clearly with numbers and trends."
    ),
    llm_config=llm_config,
)

# User proxy to execute function calls
user_proxy = autogen.UserProxyAgent(
    name="User",
    human_input_mode="NEVER",
    code_execution_config=False,
)

# Register each function with both agents
for func in [list_tables, describe_table, run_query, ask_question]:
    autogen.register_function(
        func,
        caller=researcher,
        executor=user_proxy,
        description=func.__doc__,
    )
    autogen.register_function(
        func,
        caller=analyst,
        executor=user_proxy,
        description=func.__doc__,
    )

# Group chat
groupchat = autogen.GroupChat(
    agents=[user_proxy, researcher, analyst],
    messages=[],
    max_round=12,
)
manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)

user_proxy.initiate_chat(
    manager,
    message=(
        "Explore our data lake, understand the tables, and produce a weekly "
        "business report with revenue, order trends, and top customers."
    ),
)

CLI JSON Output

All Lens CLI commands support --output json for machine-readable output.

Terminal window
dataspoc-lens tables --output json
["customers", "orders", "products"]
Terminal window
dataspoc-lens schema orders --output json
[
  {"column_name": "order_id", "data_type": "INTEGER"},
  {"column_name": "customer_id", "data_type": "INTEGER"},
  {"column_name": "order_date", "data_type": "DATE"},
  {"column_name": "total", "data_type": "DOUBLE"},
  {"column_name": "status", "data_type": "VARCHAR"}
]
Terminal window
dataspoc-lens query "SELECT status, COUNT(*) as cnt FROM orders GROUP BY status" --output json | jq '.rows'
[
  ["completed", 32100],
  ["pending", 8450],
  ["cancelled", 2130]
]
Terminal window
dataspoc-lens query "SELECT COUNT(*) FROM orders" --output json | jq '.rows[0][0]'
148320
Terminal window
dataspoc-lens cache status --output json | jq '.[] | {table, status}'
{"table": "orders", "status": "fresh"}
{"table": "customers", "status": "stale"}
{"table": "products", "status": "fresh"}
From Python, shell out to the CLI and parse the JSON:

import subprocess
import json

def lens_query(sql: str) -> dict:
    result = subprocess.run(
        ["dataspoc-lens", "query", sql, "--output", "json"],
        capture_output=True,
        text=True,
        check=True,
    )
    return json.loads(result.stdout)

data = lens_query("SELECT COUNT(*) as total FROM orders")
print(data["rows"][0][0])  # 148320
Or wrap several calls in a shell script for a daily snapshot:

#!/bin/bash
echo "=== Daily Metrics Snapshot ==="
echo "Date: $(date +%Y-%m-%d)"
echo ""

# Refresh stale caches first
dataspoc-lens cache refresh-stale --output json | jq -r '.[] | "Refreshed: \(.table)"'

# Get key metrics
echo ""
echo "Orders today:"
dataspoc-lens query "SELECT COUNT(*) as orders, SUM(total) as revenue FROM orders WHERE order_date = CURRENT_DATE" --output json | jq -r '.rows[0] | "  Count: \(.[0]), Revenue: $\(.[1])"'

echo ""
echo "Top 5 customers this week:"
dataspoc-lens query "SELECT c.name, SUM(o.total) as spent FROM customers c JOIN orders o ON c.id = o.customer_id WHERE o.order_date >= CURRENT_DATE - INTERVAL '7 days' GROUP BY c.name ORDER BY spent DESC LIMIT 5" --output json | jq -r '.rows[] | "  \(.[0]): $\(.[1])"'

Combining Lens with Pipe

The most powerful setup uses both MCP servers together. An agent can ingest fresh data with Pipe, then immediately query it with Lens.

{
  "mcpServers": {
    "dataspoc-pipe": {
      "command": "dataspoc-pipe",
      "args": ["mcp"],
      "env": {
        "DATASPOC_BUCKET": "s3://my-data"
      }
    },
    "dataspoc-lens": {
      "command": "dataspoc-lens",
      "args": ["mcp"],
      "env": {
        "DATASPOC_BUCKET": "s3://my-data"
      }
    }
  }
}

You: Run the sales pipeline to get fresh data, then tell me today’s revenue.

Agent calls Pipe run_pipeline(name="sales"):

{"success": true, "streams": {"orders": 5200}, "error": null}

Agent calls Lens cache_refresh(table="orders"):

{"cached_at": "2026-04-15T14:30:00", "size_bytes": 13100000, "file_count": 4}

Agent calls Lens query(sql="SELECT SUM(total) as revenue FROM orders WHERE order_date = CURRENT_DATE"):

{"columns": ["revenue"], "rows": [[17023.50]], "row_count": 1, "duration": 0.034}

Done. The sales pipeline extracted 5,200 new records. After refreshing the cache, today’s revenue is $17,023.50.

The same ingest-then-query workflow from the Python SDKs:

from dataspoc_pipe.sdk import PipeClient
from dataspoc_lens import LensClient

pipe = PipeClient()
lens = LensClient()

# Step 1: Run all pipelines
for name in pipe.pipelines():
    result = pipe.run(name)
    if result["success"]:
        total = sum(result["streams"].values())
        print(f"Pipe [{name}]: {total} records")
    else:
        print(f"Pipe [{name}]: FAILED - {result['error']}")

# Step 2: Refresh Lens cache
lens.cache_refresh_stale()

# Step 3: Query the fresh data
result = lens.query("""
    SELECT
        DATE_TRUNC('day', order_date) as day,
        COUNT(*) as orders,
        SUM(total) as revenue
    FROM orders
    WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'
    GROUP BY day
    ORDER BY day
""")
for row in result["rows"]:
    print(f"{row[0]}: {row[1]} orders, ${row[2]:,.2f}")

lens.close()

Best Practices

If your data changes frequently, always refresh stale caches before running analytical queries. This ensures the agent works with the latest data.

client.cache_refresh_stale()
result = client.query("SELECT ...")

The query tool returns all matching rows. For large tables, always use LIMIT or aggregate queries to avoid overwhelming the agent’s context window.

# Good: bounded result
result = client.query("SELECT * FROM orders ORDER BY order_date DESC LIMIT 100")
# Good: aggregation
result = client.query("SELECT status, COUNT(*) FROM orders GROUP BY status")
# Avoid: unbounded scan of a large table
# result = client.query("SELECT * FROM orders")

The ask tool calls an LLM to generate SQL. This costs tokens and takes longer than direct SQL queries. Use query when you already know the SQL, and reserve ask for genuinely ad-hoc natural language questions.
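
One simple pattern is a dispatch table of known SQL, with ask as the fallback. This is an illustrative sketch; KNOWN_QUERIES and run_metric are hypothetical names, not part of the SDK:

from dataspoc_lens import LensClient

KNOWN_QUERIES = {
    "orders_by_status": "SELECT status, COUNT(*) AS cnt FROM orders GROUP BY status",
}

def run_metric(client: LensClient, name: str, fallback_question: str) -> dict:
    # Prefer a known query: fast, deterministic, no LLM tokens spent
    sql = KNOWN_QUERIES.get(name)
    if sql is not None:
        return client.query(sql)
    # Fall back to AI-generated SQL for genuinely ad-hoc questions
    return client.ask(fallback_question)

with LensClient() as client:
    result = run_metric(client, "orders_by_status", "How many orders per status?")
    print(result["rows"])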

Always check for errors in query results and ask results:

result = client.ask("What is the revenue trend?")
if result["error"]:
    print(f"AI query failed: {result['error']}")
else:
    for row in result["rows"]:
        print(row)

Lens enforces read-only queries at the server level. Write operations are rejected. However, agents still operate with the permissions of the user who started the MCP server. Ensure the bucket IAM policies are properly configured to restrict access.
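
For example, if the server only needs to read the lake, the credentials it runs under can be limited to a read-only S3 policy. A minimal sketch, using the s3://my-data bucket from the examples above (adapt the ARNs and actions to your environment):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "LensReadOnly",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::my-data",
        "arn:aws:s3:::my-data/*"
      ]
    }
  ]
}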

Complex queries over large datasets may take longer. If your agent framework has a timeout, set it appropriately (30 seconds or more for analytical queries). The duration field in query responses helps calibrate expectations.
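
If your framework has no built-in timeout, one option is to bound the wait client-side. A minimal sketch wrapping LensClient.query in a worker thread; query_with_timeout is a hypothetical helper, not an SDK method:

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

from dataspoc_lens import LensClient

def query_with_timeout(client: LensClient, sql: str, timeout: float = 30.0) -> dict:
    # Bounds how long the agent waits; the worker thread may keep running
    # after the timeout fires, so this caps agent latency, not server work.
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        return pool.submit(client.query, sql).result(timeout=timeout)
    finally:
        pool.shutdown(wait=False)

with LensClient() as client:
    try:
        result = query_with_timeout(client, "SELECT COUNT(*) FROM orders", timeout=30)
        print(result["duration"])  # use the reported duration to calibrate timeouts
    except FuturesTimeout:
        print("Query exceeded 30s; narrow the date range or add LIMIT")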