Pular para o conteúdo

Python SDK

Tanto o Lens quanto o Pipe expõem clientes Python para acesso programático. Use-os em frameworks de agentes de IA como CrewAI, LangGraph e AutoGen, ou em qualquer script Python.

Terminal window
pip install dataspoc-lens
from dataspoc_lens import LensClient

# Lens walkthrough: discover tables, inspect a schema, run SQL,
# and ask a natural-language question. The context manager closes
# the client automatically on exit.
with LensClient() as client:
    # Enumerate every table Lens knows about, with row counts.
    for table in client.tables():
        print(f"{table['name']} - {table['row_count']} rows")

    # Column names and types for one fully-qualified table.
    orders_schema = client.schema("raw.my_source.orders")
    for col in orders_schema["columns"]:
        print(f" {col['name']}: {col['type']}")

    # Execute raw SQL and print the result set.
    top_customers = client.query("""
SELECT customer, SUM(revenue) as total
FROM raw.my_source.orders
GROUP BY customer
ORDER BY total DESC
LIMIT 10
""")
    print(top_customers)

    # Natural-language question; Lens generates and runs the SQL.
    reply = client.ask("top customers by revenue")
    print(reply)
from dataspoc_lens import LensClient

# Cache-management walkthrough for the Lens client.
with LensClient() as client:
    # Report cache freshness: how many tables are cached vs stale.
    cache_info = client.cache_status()
    print(f"Tables cached: {cache_info['cached_tables']}")
    print(f"Stale tables: {cache_info['stale_tables']}")

    # Rebuild the cache for every table.
    client.cache_refresh()

    # Rebuild only the tables whose cache is out of date.
    client.cache_refresh_stale()

    # Drop all cached data entirely.
    client.cache_clear()
| Método | Retorna | Descrição |
| --- | --- | --- |
| `tables()` | `list[dict]` | Lista todas as tabelas com metadados |
| `schema(table)` | `dict` | Nomes de colunas, tipos e estatísticas |
| `query(sql)` | `dict` | Executa SQL, retorna linhas e colunas |
| `ask(question)` | `dict` | Pergunta em linguagem natural, retorna SQL + resultado |
| `cache_status()` | `dict` | Frescor e tamanho do cache por tabela |
| `cache_refresh()` | `None` | Atualiza todos os dados cacheados |
| `cache_refresh_stale()` | `None` | Atualiza apenas tabelas com cache desatualizado |
| `cache_clear()` | `None` | Remove todos os dados cacheados |
| `close()` | `None` | Libera recursos (chamado automaticamente com `with`) |
Terminal window
pip install dataspoc-pipe
from dataspoc_pipe import PipeClient

# Pipe walkthrough: list pipelines, inspect configuration,
# trigger a run, then check status and logs.
client = PipeClient()

# Every configured pipeline with its current status.
for p in client.pipelines():
    print(f"{p['name']} - {p['status']}")

# Configuration details for a single pipeline.
config = client.config("my-source")
print(f"Source type: {config['source_type']}")
print(f"Tables: {config['tables']}")

# Trigger a sync and report the outcome.
result = client.run("my-source")
print(f"Status: {result['status']}")
print(f"Rows synced: {result['rows_synced']}")

# Last run and next scheduled run.
status = client.status("my-source")
print(f"Last run: {status['last_run']}")
print(f"Next run: {status['next_run']}")

# Timestamped log entries for the pipeline.
for entry in client.logs("my-source"):
    print(f"[{entry['timestamp']}] {entry['message']}")
from dataspoc_pipe import PipeClient

# Manifest and pre-run validation walkthrough for the Pipe client.
client = PipeClient()

# The bucket manifest lists every table Pipe has written.
manifest = client.manifest()
print(f"Tables: {len(manifest['tables'])}")
for table in manifest["tables"]:
    print(f" {table['path']} - {table['row_count']} rows")

# Validate the pipeline configuration before running it;
# only trigger the run when validation passes.
validation = client.validate("my-source")
if validation["valid"]:
    client.run("my-source")
else:
    for error in validation["errors"]:
        print(f"Error: {error}")
| Método | Retorna | Descrição |
| --- | --- | --- |
| `pipelines()` | `list[dict]` | Lista todos os pipelines configurados |
| `config(name)` | `dict` | Detalhes de configuração do pipeline |
| `run(name)` | `dict` | Executa o pipeline, retorna status e contagem de linhas |
| `status(name)` | `dict` | Último run, status, próxima execução agendada |
| `logs(name)` | `list[dict]` | Entradas de log com timestamp |
| `manifest()` | `dict` | Manifest completo do bucket com todas as tabelas |
| `validate(name)` | `dict` | Valida configuração, retorna erros se houver |
from crewai import Agent, Task, Crew
from dataspoc_lens import LensClient

# Expose Lens client methods to a CrewAI agent as callable tools.
client = LensClient()

# The analyst agent can discover tables, run SQL, and ask NL questions.
analyst = Agent(
    role="Data Analyst",
    goal="Answer business questions using the data lake",
    tools=[client.ask, client.query, client.tables],
)

# One task assigned to the analyst agent.
task = Task(
    description="Find the top 5 products by revenue last quarter",
    agent=analyst,
)

# Assemble the crew and execute the task.
Crew(agents=[analyst], tasks=[task]).kickoff()
from langgraph.graph import StateGraph
from dataspoc_lens import LensClient

# Use Lens inside LangGraph nodes: one node discovers tables,
# the next executes the SQL stored in the shared state.
client = LensClient()

def discover_tables(state):
    """Attach the list of available tables to the graph state."""
    state["tables"] = client.tables()
    return state

def query_data(state):
    """Execute the SQL found under state['sql'] and store the result."""
    state["result"] = client.query(state["sql"])
    return state

# Build a two-step graph: discover first, then query.
graph = StateGraph()
graph.add_node("query", query_data)
graph.add_node("discover", discover_tables)
graph.add_edge("discover", "query")