Skip to content

# Python SDK

Both Lens and Pipe expose Python clients for programmatic access. Use them in AI agent frameworks like CrewAI, LangGraph, and AutoGen, or in any Python script.

Terminal window
pip install dataspoc-lens
from dataspoc_lens import LensClient

# The client is a context manager; `close()` runs automatically on exit.
with LensClient() as client:
    # Enumerate every table the lake exposes, with basic metadata.
    all_tables = client.tables()
    for table in all_tables:
        print(f"{table['name']} - {table['row_count']} rows")

    # Inspect the column layout of a single table.
    schema = client.schema("raw.my_source.orders")
    for col in schema["columns"]:
        print(f" {col['name']}: {col['type']}")

    # Execute raw SQL and print the rows/columns payload.
    result = client.query("""
SELECT customer, SUM(revenue) as total
FROM raw.my_source.orders
GROUP BY customer
ORDER BY total DESC
LIMIT 10
""")
    print(result)

    # Or ask the same question in natural language; the client
    # returns the generated SQL together with the result.
    answer = client.ask("top customers by revenue")
    print(answer)
from dataspoc_lens import LensClient

with LensClient() as client:
    # Report how fresh the local cache currently is.
    status = client.cache_status()
    print(f"Tables cached: {status['cached_tables']}")
    print(f"Stale tables: {status['stale_tables']}")

    # Rebuild the entire cache...
    client.cache_refresh()
    # ...or refresh only the tables whose cache is out of date...
    client.cache_refresh_stale()
    # ...or drop every cached entry.
    client.cache_clear()
| Method | Returns | Description |
| --- | --- | --- |
| `tables()` | `list[dict]` | List all tables with metadata |
| `schema(table)` | `dict` | Column names, types, and stats |
| `query(sql)` | `dict` | Execute SQL, return rows and columns |
| `ask(question)` | `dict` | Natural language question, returns SQL + result |
| `cache_status()` | `dict` | Cache freshness and size per table |
| `cache_refresh()` | `None` | Refresh all cached data |
| `cache_refresh_stale()` | `None` | Refresh only tables with outdated cache |
| `cache_clear()` | `None` | Remove all cached data |
| `close()` | `None` | Release resources (called automatically with `with`) |
Terminal window
pip install dataspoc-pipe
from dataspoc_pipe import PipeClient

client = PipeClient()

# Enumerate every configured pipeline and its current status.
pipelines = client.pipelines()
for p in pipelines:
    print(f"{p['name']} - {p['status']}")

# Show the configuration of one pipeline.
config = client.config("my-source")
print(f"Source type: {config['source_type']}")
print(f"Tables: {config['tables']}")

# Trigger a run and report the outcome.
result = client.run("my-source")
print(f"Status: {result['status']}")
print(f"Rows synced: {result['rows_synced']}")

# Scheduling status: last run and next scheduled run.
status = client.status("my-source")
print(f"Last run: {status['last_run']}")
print(f"Next run: {status['next_run']}")

# Dump the timestamped execution log.
logs = client.logs("my-source")
for entry in logs:
    print(f"[{entry['timestamp']}] {entry['message']}")
from dataspoc_pipe import PipeClient

client = PipeClient()

# Inspect the bucket manifest: every table landed in the bucket.
manifest = client.manifest()
print(f"Tables: {len(manifest['tables'])}")
for table in manifest["tables"]:
    print(f" {table['path']} - {table['row_count']} rows")

# Only run the pipeline when its configuration validates cleanly;
# otherwise surface each validation error.
validation = client.validate("my-source")
if validation["valid"]:
    client.run("my-source")
else:
    for error in validation["errors"]:
        print(f"Error: {error}")
| Method | Returns | Description |
| --- | --- | --- |
| `pipelines()` | `list[dict]` | List all configured pipelines |
| `config(name)` | `dict` | Pipeline configuration details |
| `run(name)` | `dict` | Execute pipeline, return status and row count |
| `status(name)` | `dict` | Last run time, status, next scheduled run |
| `logs(name)` | `list[dict]` | Timestamped execution log entries |
| `manifest()` | `dict` | Full bucket manifest with all tables |
| `validate(name)` | `dict` | Validate config, return errors if any |
from crewai import Agent, Task, Crew
from dataspoc_lens import LensClient

client = LensClient()

# Expose the Lens client's bound methods to the agent as callable tools.
analyst = Agent(
    role="Data Analyst",
    goal="Answer business questions using the data lake",
    tools=[client.ask, client.query, client.tables],
)

task = Task(
    description="Find the top 5 products by revenue last quarter",
    agent=analyst,
)

# Assemble the crew and start it working on the task.
crew = Crew(agents=[analyst], tasks=[task])
crew.kickoff()
from langgraph.graph import StateGraph
from dataspoc_lens import LensClient

client = LensClient()


def discover_tables(state):
    """Graph node: stash the table listing on the shared state."""
    state["tables"] = client.tables()
    return state


def query_data(state):
    """Graph node: run the SQL held in state['sql'] and stash the result."""
    state["result"] = client.query(state["sql"])
    return state


# Wire the two nodes so discovery always precedes querying.
graph = StateGraph()
graph.add_node("discover", discover_tables)
graph.add_node("query", query_data)
graph.add_edge("discover", "query")