
Transforms


Pipe supports optional Python transforms that run during ingestion, letting you clean or reshape data before it is written to Parquet.

Transforms are convention-based. No configuration is needed. If a Python file exists at:

~/.dataspoc-pipe/transforms/<pipeline_name>.py

and that file defines a transform(df) function, Pipe calls it automatically for each batch of records during ingestion.
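At its simplest, the contract looks like this (a minimal identity transform, shown here only to illustrate the expected signature):

import pandas as pd

def transform(df: pd.DataFrame) -> pd.DataFrame:
    # Receives one batch of records as a DataFrame and returns the
    # batch to write to Parquet. Returning df unchanged is a valid no-op.
    return df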

[Singer Tap] → stdout → [Pipe Engine] → transform(df) → Parquet → [Bucket]
  1. Create a Python file named after your pipeline in the transforms directory:
# For a pipeline called "orders"
vim ~/.dataspoc-pipe/transforms/orders.py
  2. Define a transform function that takes a Pandas DataFrame and returns a Pandas DataFrame:
import pandas as pd

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean orders data during ingestion."""
    # Drop rows with null IDs
    df = df.dropna(subset=["id"])

    # Normalize email to lowercase
    if "email" in df.columns:
        df["email"] = df["email"].str.lower().str.strip()

    # Convert amount to float
    if "amount" in df.columns:
        df["amount"] = pd.to_numeric(df["amount"], errors="coerce")

    return df

That is it. The next time you run dataspoc-pipe run orders, the transform will apply automatically.

  • The transform runs per batch (approximately 10,000 records per batch).
  • The function receives a Pandas DataFrame and must return a Pandas DataFrame.
  • The returned DataFrame can have fewer rows (filtering), modified values (cleaning), or additional columns (enrichment).
  • The function should be stateless: it must not depend on data from previous batches (see the sketch after this list).
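To make the statelessness rule concrete, here is a sketch (illustrative only, not part of Pipe's API): operating on the batch you were given is fine, while accumulating state across batches is not.

import pandas as pd

# Fine: uses only the batch passed in.
def transform(df: pd.DataFrame) -> pd.DataFrame:
    return df.drop_duplicates(subset=["id"], keep="last")

# Anti-pattern: a module-level set of "seen" IDs would couple batches
# together. Cross-batch deduplication belongs in the query layer,
# not in an ingestion transform.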

If the transform function raises an exception:

  1. Pipe logs a warning with the error details.
  2. The raw, untransformed data is written to Parquet.
  3. The pipeline continues processing the next batch.

This design ensures that a buggy transform never causes data loss. You can fix the transform and re-run with --full to reprocess all data.
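For example, with the orders pipeline from above:

# Reprocess all historical data with the fixed transform
dataspoc-pipe run orders --full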

When a transform modifies the record count (e.g., by filtering rows), the CLI output shows both counts:

Running: orders
orders: 5,000 records...
Done! 4,800 records in 1 stream(s)
orders: 5,000 raw → 4,800 after transform
A more complete example that filters out unwanted records and enriches the rest:

import pandas as pd
from datetime import datetime, timezone

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Filter invalid records and add processing metadata."""
    # Remove test records (plain substring match, not regex)
    df = df[~df["email"].str.contains("@test.com", na=False, regex=False)]

    # Remove duplicates within the batch
    df = df.drop_duplicates(subset=["id"], keep="last")

    # Add processing timestamp
    df["_processed_at"] = datetime.now(timezone.utc).isoformat()

    return df
Another common use is coercing types so they stay consistent for downstream queries:

import pandas as pd

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure consistent types for downstream queries."""
    # Ensure numeric columns
    for col in ["price", "quantity", "discount"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)

    # Ensure date columns
    for col in ["created_at", "updated_at"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    # Standardize country codes to uppercase
    if "country" in df.columns:
        df["country"] = df["country"].str.upper().str.strip()

    return df
A few best practices:

  • Keep transforms simple. Heavy transformations belong in the query layer (DataSpoc Lens), not during ingestion.
  • Test your transform function independently before deploying it:

import pandas as pd
from transforms.orders import transform  # run from ~/.dataspoc-pipe so this resolves

df = pd.read_csv("/tmp/sample.csv")
result = transform(df)
print(result.head())
  • If you do not need a transform, simply do not create the file. There is no performance overhead when no transform exists.