Ir al contenido

Python SDK

Tanto Lens como Pipe exponen clientes Python para acceso programático. Úsalos en frameworks de agentes de IA como CrewAI, LangGraph y AutoGen, o en cualquier script Python.

Ventana de terminal
pip install dataspoc-lens
from dataspoc_lens import LensClient

# LensClient is a context manager: `with` guarantees resources are released
# (equivalent to calling close()) even if a query raises.
with LensClient() as client:
    # Discover tables
    tables = client.tables()
    for table in tables:
        print(f"{table['name']} - {table['row_count']} rows")

    # Get table schema
    schema = client.schema("raw.my_source.orders")
    for col in schema["columns"]:
        print(f" {col['name']}: {col['type']}")

    # Run SQL query
    result = client.query("""
SELECT customer, SUM(revenue) as total
FROM raw.my_source.orders
GROUP BY customer
ORDER BY total DESC
LIMIT 10
""")
    print(result)

    # Ask in natural language
    answer = client.ask("top customers by revenue")
    print(answer)
from dataspoc_lens import LensClient

with LensClient() as client:
    # Check cache freshness
    status = client.cache_status()
    print(f"Tables cached: {status['cached_tables']}")
    print(f"Stale tables: {status['stale_tables']}")

    # Refresh all cache
    client.cache_refresh()

    # Refresh only stale tables
    client.cache_refresh_stale()

    # Clear cache entirely
    client.cache_clear()
| Método | Retorna | Descripción |
| --- | --- | --- |
| `tables()` | `list[dict]` | Lista todas las tablas con metadatos |
| `schema(table)` | `dict` | Nombres de columnas, tipos y estadísticas |
| `query(sql)` | `dict` | Ejecutar SQL, retorna filas y columnas |
| `ask(question)` | `dict` | Pregunta en lenguaje natural, retorna SQL + resultado |
| `cache_status()` | `dict` | Frescura y tamaño del caché por tabla |
| `cache_refresh()` | `None` | Actualizar todos los datos cacheados |
| `cache_refresh_stale()` | `None` | Actualizar solo tablas con caché desactualizado |
| `cache_clear()` | `None` | Eliminar todos los datos cacheados |
| `close()` | `None` | Liberar recursos (se llama automáticamente con `with`) |
Ventana de terminal
pip install dataspoc-pipe
from dataspoc_pipe import PipeClient

client = PipeClient()

# List all pipelines
pipelines = client.pipelines()
for p in pipelines:
    print(f"{p['name']} - {p['status']}")

# View pipeline configuration
config = client.config("my-source")
print(f"Source type: {config['source_type']}")
print(f"Tables: {config['tables']}")

# Run a pipeline
result = client.run("my-source")
print(f"Status: {result['status']}")
print(f"Rows synced: {result['rows_synced']}")

# Check status
status = client.status("my-source")
print(f"Last run: {status['last_run']}")
print(f"Next run: {status['next_run']}")

# View logs
logs = client.logs("my-source")
for entry in logs:
    print(f"[{entry['timestamp']}] {entry['message']}")
from dataspoc_pipe import PipeClient

client = PipeClient()

# Read the bucket manifest
manifest = client.manifest()
print(f"Tables: {len(manifest['tables'])}")
for table in manifest["tables"]:
    print(f" {table['path']} - {table['row_count']} rows")

# Validate pipeline config before running — run() is only attempted
# when the config passes validation; otherwise report each error.
validation = client.validate("my-source")
if validation["valid"]:
    client.run("my-source")
else:
    for error in validation["errors"]:
        print(f"Error: {error}")
| Método | Retorna | Descripción |
| --- | --- | --- |
| `pipelines()` | `list[dict]` | Lista todos los pipelines configurados |
| `config(name)` | `dict` | Detalles de configuración del pipeline |
| `run(name)` | `dict` | Ejecutar pipeline, retorna estado y conteo de filas |
| `status(name)` | `dict` | Hora del último run, estado, próximo run programado |
| `logs(name)` | `list[dict]` | Entradas de log de ejecución con marcas de tiempo |
| `manifest()` | `dict` | Manifiesto completo del bucket con todas las tablas |
| `validate(name)` | `dict` | Validar configuración, retorna errores si los hay |
from crewai import Agent, Task, Crew
from dataspoc_lens import LensClient

client = LensClient()

# Expose Lens client methods directly as agent tools.
analyst = Agent(
    role="Data Analyst",
    goal="Answer business questions using the data lake",
    tools=[client.ask, client.query, client.tables],
)

task = Task(
    description="Find the top 5 products by revenue last quarter",
    agent=analyst,
)

crew = Crew(agents=[analyst], tasks=[task])
crew.kickoff()
from langgraph.graph import StateGraph
from dataspoc_lens import LensClient

client = LensClient()


def discover_tables(state):
    # Node: populate the graph state with the available tables.
    state["tables"] = client.tables()
    return state


def query_data(state):
    # Node: execute the SQL carried in state["sql"] and store the result.
    state["result"] = client.query(state["sql"])
    return state


graph = StateGraph()
graph.add_node("discover", discover_tables)
graph.add_node("query", query_data)
graph.add_edge("discover", "query")