Python SDK
Tanto Lens como Pipe exponen clientes Python para acceso programatico. Usalos en frameworks de agentes de IA como CrewAI, LangGraph y AutoGen, o en cualquier script Python.
LensClient
Sección titulada «LensClient»Instalar
Sección titulada «Instalar»pip install dataspoc-lensUso Basico
Sección titulada «Uso Basico»from dataspoc_lens import LensClient
with LensClient() as client: # Discover tables tables = client.tables() for table in tables: print(f"{table['name']} - {table['row_count']} rows")
# Get table schema schema = client.schema("raw.my_source.orders") for col in schema["columns"]: print(f" {col['name']}: {col['type']}")
# Run SQL query result = client.query(""" SELECT customer, SUM(revenue) as total FROM raw.my_source.orders GROUP BY customer ORDER BY total DESC LIMIT 10 """) print(result)
# Ask in natural language answer = client.ask("top customers by revenue") print(answer)Gestion de Cache
Sección titulada «Gestion de Cache»from dataspoc_lens import LensClient
with LensClient() as client: # Check cache freshness status = client.cache_status() print(f"Tables cached: {status['cached_tables']}") print(f"Stale tables: {status['stale_tables']}")
# Refresh all cache client.cache_refresh()
# Refresh only stale tables client.cache_refresh_stale()
# Clear cache entirely client.cache_clear()Referencia Completa del API
Sección titulada «Referencia Completa del API»| Metodo | Retorna | Descripcion |
|---|---|---|
tables() | list[dict] | Lista todas las tablas con metadatos |
schema(table) | dict | Nombres de columnas, tipos y estadisticas |
query(sql) | dict | Ejecutar SQL, retorna filas y columnas |
ask(question) | dict | Pregunta en lenguaje natural, retorna SQL + resultado |
cache_status() | dict | Frescura y tamano del cache por tabla |
cache_refresh() | None | Actualizar todos los datos cacheados |
cache_refresh_stale() | None | Actualizar solo tablas con cache desactualizado |
cache_clear() | None | Eliminar todos los datos cacheados |
close() | None | Liberar recursos (se llama automaticamente con with) |
PipeClient
Sección titulada «PipeClient»Instalar
Sección titulada «Instalar»pip install dataspoc-pipeUso Basico
Sección titulada «Uso Basico»from dataspoc_pipe import PipeClient
client = PipeClient()
# List all pipelinespipelines = client.pipelines()for p in pipelines: print(f"{p['name']} - {p['status']}")
# View pipeline configurationconfig = client.config("my-source")print(f"Source type: {config['source_type']}")print(f"Tables: {config['tables']}")
# Run a pipelineresult = client.run("my-source")print(f"Status: {result['status']}")print(f"Rows synced: {result['rows_synced']}")
# Check statusstatus = client.status("my-source")print(f"Last run: {status['last_run']}")print(f"Next run: {status['next_run']}")
# View logslogs = client.logs("my-source")for entry in logs: print(f"[{entry['timestamp']}] {entry['message']}")Manifiesto y Validacion
Sección titulada «Manifiesto y Validacion»from dataspoc_pipe import PipeClient
client = PipeClient()
# Read the bucket manifestmanifest = client.manifest()print(f"Tables: {len(manifest['tables'])}")for table in manifest["tables"]: print(f" {table['path']} - {table['row_count']} rows")
# Validate pipeline config before runningvalidation = client.validate("my-source")if validation["valid"]: client.run("my-source")else: for error in validation["errors"]: print(f"Error: {error}")Referencia Completa del API
Sección titulada «Referencia Completa del API»| Metodo | Retorna | Descripcion |
|---|---|---|
pipelines() | list[dict] | Lista todos los pipelines configurados |
config(name) | dict | Detalles de configuracion del pipeline |
run(name) | dict | Ejecutar pipeline, retorna estado y conteo de filas |
status(name) | dict | Hora del ultimo run, estado, proximo run programado |
logs(name) | list[dict] | Entradas de log de ejecucion con marcas de tiempo |
manifest() | dict | Manifiesto completo del bucket con todas las tablas |
validate(name) | dict | Validar configuracion, retorna errores si los hay |
Ejemplos con Frameworks de Agentes
Sección titulada «Ejemplos con Frameworks de Agentes»from crewai import Agent, Task, Crewfrom dataspoc_lens import LensClient
client = LensClient()
analyst = Agent( role="Data Analyst", goal="Answer business questions using the data lake", tools=[client.ask, client.query, client.tables],)
task = Task( description="Find the top 5 products by revenue last quarter", agent=analyst,)
crew = Crew(agents=[analyst], tasks=[task])crew.kickoff()LangGraph
Sección titulada «LangGraph»from langgraph.graph import StateGraphfrom dataspoc_lens import LensClient
client = LensClient()
def discover_tables(state): state["tables"] = client.tables() return state
def query_data(state): state["result"] = client.query(state["sql"]) return state
graph = StateGraph()graph.add_node("discover", discover_tables)graph.add_node("query", query_data)graph.add_edge("discover", "query")