Python SDK
Both Lens and Pipe expose Python clients for programmatic access. Use them in AI agent frameworks like CrewAI, LangGraph, and AutoGen, or in any Python script.
LensClient
Install

    pip install dataspoc-lens

Basic Usage

    from dataspoc_lens import LensClient

    with LensClient() as client:
        # Discover tables
        tables = client.tables()
        for table in tables:
            print(f"{table['name']} - {table['row_count']} rows")

        # Get table schema
        schema = client.schema("raw.my_source.orders")
        for col in schema["columns"]:
            print(f"  {col['name']}: {col['type']}")

        # Run SQL query
        result = client.query("""
            SELECT customer, SUM(revenue) as total
            FROM raw.my_source.orders
            GROUP BY customer
            ORDER BY total DESC
            LIMIT 10
        """)
        print(result)

        # Ask in natural language
        answer = client.ask("top customers by revenue")
        print(answer)

Cache Management

    from dataspoc_lens import LensClient

    with LensClient() as client:
        # Check cache freshness
        status = client.cache_status()
        print(f"Tables cached: {status['cached_tables']}")
        print(f"Stale tables: {status['stale_tables']}")

        # Refresh all cache
        client.cache_refresh()

        # Refresh only stale tables
        client.cache_refresh_stale()

        # Clear cache entirely
        client.cache_clear()

Full API Reference
| Method | Returns | Description |
|---|---|---|
| `tables()` | `list[dict]` | List all tables with metadata |
| `schema(table)` | `dict` | Column names, types, and stats |
| `query(sql)` | `dict` | Execute SQL, return rows and columns |
| `ask(question)` | `dict` | Natural language question, returns SQL + result |
| `cache_status()` | `dict` | Cache freshness and size per table |
| `cache_refresh()` | `None` | Refresh all cached data |
| `cache_refresh_stale()` | `None` | Refresh only tables with outdated cache |
| `cache_clear()` | `None` | Remove all cached data |
| `close()` | `None` | Release resources (called automatically when used in a `with` block) |
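
The cache methods in the table can be combined so that a query never reads stale data. The following is a minimal sketch, not part of the SDK: `query_fresh` is a hypothetical helper, and `FakeLensClient` is a stand-in that only mimics `cache_status()`, `cache_refresh_stale()`, and `query()` so the example runs without a data lake. It assumes `stale_tables` is falsy (an empty list or `0`) when nothing is stale.

```python
class FakeLensClient:
    """Hypothetical stand-in for LensClient so the sketch runs offline."""

    def __init__(self, stale_tables):
        self._stale = list(stale_tables)
        self.calls = []  # records which methods were called, in order

    def cache_status(self):
        self.calls.append("cache_status")
        return {"stale_tables": list(self._stale)}

    def cache_refresh_stale(self):
        self.calls.append("cache_refresh_stale")
        self._stale.clear()

    def query(self, sql):
        self.calls.append("query")
        return {"sql": sql}  # placeholder result, not the real return shape


def query_fresh(client, sql):
    """Refresh stale tables, if any, and then run the query."""
    status = client.cache_status()
    # Assumption: 'stale_tables' is falsy when the cache is fully fresh.
    if status["stale_tables"]:
        client.cache_refresh_stale()
    return client.query(sql)


stale = FakeLensClient(stale_tables=["raw.my_source.orders"])
query_fresh(stale, "SELECT 1")
print(stale.calls)

fresh = FakeLensClient(stale_tables=[])
query_fresh(fresh, "SELECT 1")
print(fresh.calls)
```

With a real client, the same helper would be called inside the `with LensClient() as client:` block, passing `client` in place of the stand-in.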
PipeClient
Install

    pip install dataspoc-pipe

Basic Usage

    from dataspoc_pipe import PipeClient

    client = PipeClient()

    # List all pipelines
    pipelines = client.pipelines()
    for p in pipelines:
        print(f"{p['name']} - {p['status']}")

    # View pipeline configuration
    config = client.config("my-source")
    print(f"Source type: {config['source_type']}")
    print(f"Tables: {config['tables']}")

    # Run a pipeline
    result = client.run("my-source")
    print(f"Status: {result['status']}")
    print(f"Rows synced: {result['rows_synced']}")

    # Check status
    status = client.status("my-source")
    print(f"Last run: {status['last_run']}")
    print(f"Next run: {status['next_run']}")

    # View logs
    logs = client.logs("my-source")
    for entry in logs:
        print(f"[{entry['timestamp']}] {entry['message']}")

Manifest and Validation

    from dataspoc_pipe import PipeClient

    client = PipeClient()

    # Read the bucket manifest
    manifest = client.manifest()
    print(f"Tables: {len(manifest['tables'])}")
    for table in manifest["tables"]:
        print(f"  {table['path']} - {table['row_count']} rows")

    # Validate pipeline config before running
    validation = client.validate("my-source")
    if validation["valid"]:
        client.run("my-source")
    else:
        for error in validation["errors"]:
            print(f"Error: {error}")

Full API Reference
| Method | Returns | Description |
|---|---|---|
| `pipelines()` | `list[dict]` | List all configured pipelines |
| `config(name)` | `dict` | Pipeline configuration details |
| `run(name)` | `dict` | Execute pipeline, return status and row count |
| `status(name)` | `dict` | Last run time, status, next scheduled run |
| `logs(name)` | `list[dict]` | Timestamped execution log entries |
| `manifest()` | `dict` | Full bucket manifest with all tables |
| `validate(name)` | `dict` | Validate config, return errors if any |
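
In scripts and agents it can be convenient to treat a failed validation as an exception instead of printing the errors. The sketch below shows one way to do that using only the `valid` and `errors` keys from `validate()` and the `run()` method. `run_if_valid`, `PipelineConfigError`, and `FakePipeClient` are hypothetical names introduced for illustration; the fake client exists only so the example runs without a configured pipeline.

```python
class PipelineConfigError(Exception):
    """Hypothetical exception raised when validate() reports errors."""


class FakePipeClient:
    """Hypothetical stand-in for PipeClient so the sketch runs offline."""

    def __init__(self, errors):
        self._errors = list(errors)
        self.ran = []  # names of pipelines that were actually run

    def validate(self, name):
        return {"valid": not self._errors, "errors": list(self._errors)}

    def run(self, name):
        self.ran.append(name)
        return {"status": "success", "rows_synced": 0}  # placeholder values


def run_if_valid(client, name):
    """Run the pipeline only when its configuration validates."""
    validation = client.validate(name)
    if not validation["valid"]:
        # Join all reported errors into one message for the caller.
        raise PipelineConfigError("; ".join(str(e) for e in validation["errors"]))
    return client.run(name)


good = FakePipeClient(errors=[])
result = run_if_valid(good, "my-source")

bad = FakePipeClient(errors=["missing source_type"])
try:
    run_if_valid(bad, "my-source")
    message = None
except PipelineConfigError as exc:
    message = str(exc)
```

Raising instead of printing lets the caller decide whether to retry, alert, or skip the pipeline.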
Agent Framework Examples
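CrewAI

    from crewai import Agent, Task, Crew
    from crewai.tools import tool
    from dataspoc_lens import LensClient

    client = LensClient()

    @tool("ask_data_lake")
    def ask_data_lake(question: str) -> str:
        """Answer a natural-language question about the data lake."""
        return str(client.ask(question))

    @tool("run_sql")
    def run_sql(sql: str) -> str:
        """Run a SQL query against the data lake."""
        return str(client.query(sql))

    @tool("list_tables")
    def list_tables() -> str:
        """List the tables available in the data lake."""
        return str(client.tables())

    analyst = Agent(
        role="Data Analyst",
        goal="Answer business questions using the data lake",
        backstory="An analyst who explores the data lake with SQL.",
        tools=[ask_data_lake, run_sql, list_tables],
    )

    task = Task(
        description="Find the top 5 products by revenue last quarter",
        expected_output="A ranked list of the top 5 products with their revenue",
        agent=analyst,
    )

    crew = Crew(agents=[analyst], tasks=[task])
    crew.kickoff()

LangGraph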
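
    from typing import TypedDict

    from langgraph.graph import StateGraph, START, END
    from dataspoc_lens import LensClient

    client = LensClient()

    # StateGraph needs a state schema; "sql" is supplied by the caller
    class State(TypedDict, total=False):
        sql: str
        tables: list
        result: dict

    def discover_tables(state: State) -> State:
        state["tables"] = client.tables()
        return state

    def query_data(state: State) -> State:
        state["result"] = client.query(state["sql"])
        return state

    graph = StateGraph(State)
    graph.add_node("discover", discover_tables)
    graph.add_node("query", query_data)
    graph.add_edge(START, "discover")
    graph.add_edge("discover", "query")
    graph.add_edge("query", END)
    app = graph.compile()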
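
Agent frameworks usually hand tool output back to the model as text, and an unhandled exception inside a tool can stop a run. The following is a rough sketch of a framework-independent adapter for that, assuming the client methods return plain dicts or lists as the API tables describe. `as_text_tool` and `fake_query` are hypothetical names, and the `columns`/`rows` keys in the fake result are an assumption about the return shape, not documented behavior.

```python
import functools
import json


def as_text_tool(fn):
    """Wrap a callable so it always returns a JSON string an agent can read."""

    @functools.wraps(fn)  # keep the original name and docstring for tool listings
    def wrapper(*args, **kwargs):
        try:
            result = fn(*args, **kwargs)
        except Exception as exc:
            # Report the failure to the agent instead of crashing the run.
            return json.dumps({"error": str(exc)})
        # default=str keeps non-JSON values such as dates printable.
        return json.dumps(result, default=str)

    return wrapper


def fake_query(sql):
    """Hypothetical stand-in for client.query."""
    if "DROP" in sql.upper():
        raise ValueError("read-only")
    return {"columns": ["n"], "rows": [[1]]}  # assumed result shape


query_tool = as_text_tool(fake_query)
ok = query_tool("SELECT 1 AS n")
failed = query_tool("DROP TABLE x")
```

With a real client, `as_text_tool(client.query)` would produce a callable that can be registered with whichever tool mechanism the chosen framework provides.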