Python SDK
Tanto o Lens quanto o Pipe expõem clientes Python para acesso programático. Use-os em frameworks de agentes de IA como CrewAI, LangGraph e AutoGen, ou em qualquer script Python.
LensClient
Seção intitulada “LensClient”Instalar
Seção intitulada “Instalar”pip install dataspoc-lensUso Básico
Seção intitulada “Uso Básico”from dataspoc_lens import LensClient
with LensClient() as client: # Discover tables tables = client.tables() for table in tables: print(f"{table['name']} - {table['row_count']} rows")
# Get table schema schema = client.schema("raw.my_source.orders") for col in schema["columns"]: print(f" {col['name']}: {col['type']}")
# Run SQL query result = client.query(""" SELECT customer, SUM(revenue) as total FROM raw.my_source.orders GROUP BY customer ORDER BY total DESC LIMIT 10 """) print(result)
# Ask in natural language answer = client.ask("top customers by revenue") print(answer)Gerenciamento de Cache
Seção intitulada “Gerenciamento de Cache”from dataspoc_lens import LensClient
with LensClient() as client: # Check cache freshness status = client.cache_status() print(f"Tables cached: {status['cached_tables']}") print(f"Stale tables: {status['stale_tables']}")
# Refresh all cache client.cache_refresh()
# Refresh only stale tables client.cache_refresh_stale()
# Clear cache entirely client.cache_clear()Referência Completa da API
Seção intitulada “Referência Completa da API”| Método | Retorna | Descrição |
|---|---|---|
tables() | list[dict] | Lista todas as tabelas com metadados |
schema(table) | dict | Nomes de colunas, tipos e estatísticas |
query(sql) | dict | Executa SQL, retorna linhas e colunas |
ask(question) | dict | Pergunta em linguagem natural, retorna SQL + resultado |
cache_status() | dict | Frescor e tamanho do cache por tabela |
cache_refresh() | None | Atualiza todos os dados cacheados |
cache_refresh_stale() | None | Atualiza apenas tabelas com cache desatualizado |
cache_clear() | None | Remove todos os dados cacheados |
close() | None | Libera recursos (chamado automaticamente com with) |
PipeClient
Seção intitulada “PipeClient”Instalar
Seção intitulada “Instalar”pip install dataspoc-pipeUso Básico
Seção intitulada “Uso Básico”from dataspoc_pipe import PipeClient
client = PipeClient()
# List all pipelinespipelines = client.pipelines()for p in pipelines: print(f"{p['name']} - {p['status']}")
# View pipeline configurationconfig = client.config("my-source")print(f"Source type: {config['source_type']}")print(f"Tables: {config['tables']}")
# Run a pipelineresult = client.run("my-source")print(f"Status: {result['status']}")print(f"Rows synced: {result['rows_synced']}")
# Check statusstatus = client.status("my-source")print(f"Last run: {status['last_run']}")print(f"Next run: {status['next_run']}")
# View logslogs = client.logs("my-source")for entry in logs: print(f"[{entry['timestamp']}] {entry['message']}")Manifest e Validação
Seção intitulada “Manifest e Validação”from dataspoc_pipe import PipeClient
client = PipeClient()
# Read the bucket manifestmanifest = client.manifest()print(f"Tables: {len(manifest['tables'])}")for table in manifest["tables"]: print(f" {table['path']} - {table['row_count']} rows")
# Validate pipeline config before runningvalidation = client.validate("my-source")if validation["valid"]: client.run("my-source")else: for error in validation["errors"]: print(f"Error: {error}")Referência Completa da API
Seção intitulada “Referência Completa da API”| Método | Retorna | Descrição |
|---|---|---|
pipelines() | list[dict] | Lista todos os pipelines configurados |
config(name) | dict | Detalhes de configuração do pipeline |
run(name) | dict | Executa o pipeline, retorna status e contagem de linhas |
status(name) | dict | Último run, status, próxima execução agendada |
logs(name) | list[dict] | Entradas de log com timestamp |
manifest() | dict | Manifest completo do bucket com todas as tabelas |
validate(name) | dict | Valida configuração, retorna erros se houver |
Exemplos com Frameworks de Agentes
Seção intitulada “Exemplos com Frameworks de Agentes”from crewai import Agent, Task, Crewfrom dataspoc_lens import LensClient
client = LensClient()
analyst = Agent( role="Data Analyst", goal="Answer business questions using the data lake", tools=[client.ask, client.query, client.tables],)
task = Task( description="Find the top 5 products by revenue last quarter", agent=analyst,)
crew = Crew(agents=[analyst], tasks=[task])crew.kickoff()LangGraph
Seção intitulada “LangGraph”from langgraph.graph import StateGraphfrom dataspoc_lens import LensClient
client = LensClient()
def discover_tables(state): state["tables"] = client.tables() return state
def query_data(state): state["result"] = client.query(state["sql"]) return state
graph = StateGraph()graph.add_node("discover", discover_tables)graph.add_node("query", query_data)graph.add_edge("discover", "query")