Transforms

Pipe supports optional Python transforms that run during ingestion, letting you clean or reshape data before it is written to Parquet.

Transforms are convention-based. No configuration is needed. If a Python file exists at:

~/.dataspoc-pipe/transforms/<pipeline_name>.py

and it defines a function def transform(df), Pipe automatically calls it for each batch of records during ingestion.

[Singer Tap] → stdout → [Pipe Engine] → transform(df) → Parquet → [Bucket]
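Conceptually, the engine resolves the transform module by pipeline name and calls it on every batch. A rough sketch of that discovery step (illustrative only, not Pipe's actual implementation):

```python
import importlib.util
from pathlib import Path

def load_transform(pipeline_name: str):
    """Return the pipeline's transform() callable, or None if no transform file exists."""
    path = Path.home() / ".dataspoc-pipe" / "transforms" / f"{pipeline_name}.py"
    if not path.exists():
        return None
    spec = importlib.util.spec_from_file_location(pipeline_name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, "transform", None)
```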
To add a transform:

  1. Create a Python file named after your pipeline in the transforms directory:

```sh
# For a pipeline called "orders"
vim ~/.dataspoc-pipe/transforms/orders.py
```
  2. Define a transform function that takes a Pandas DataFrame and returns a Pandas DataFrame:

```python
import pandas as pd

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean orders data during ingestion."""
    # Drop rows with null IDs
    df = df.dropna(subset=["id"])

    # Normalize email addresses to lowercase
    if "email" in df.columns:
        df["email"] = df["email"].str.lower().str.strip()

    # Convert amount to float
    if "amount" in df.columns:
        df["amount"] = pd.to_numeric(df["amount"], errors="coerce")

    return df
```

That is it. The next time you run dataspoc-pipe run orders, the transform is applied automatically.

  • The transform runs per batch (approximately 10,000 records per batch).
  • The function receives a Pandas DataFrame and must return a Pandas DataFrame.
  • The returned DataFrame can have fewer rows (filtering), modified values (cleaning), or additional columns (enrichment).
  • The function should be stateless: it must not depend on data from previous batches (see the sketch after this list).
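Because batches are processed independently, logic that needs cross-batch context (global deduplication, running totals) does not belong here. A hypothetical sketch of the distinction:

```python
import pandas as pd

# Stateless: decided entirely from the current batch (safe)
def transform(df: pd.DataFrame) -> pd.DataFrame:
    return df.drop_duplicates(subset=["id"])  # dedup within this batch only

# Stateful: module-level state leaks across batches and runs (avoid)
_seen_ids: set = set()

def transform_bad(df: pd.DataFrame) -> pd.DataFrame:
    df = df[~df["id"].isin(_seen_ids)]
    _seen_ids.update(df["id"])  # do not do this
    return df
```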

If the transform function raises an exception:

  1. Pipe logs a warning with the error details.
  2. The raw, untransformed data is written to Parquet.
  3. The pipeline continues processing the next batch.

This design ensures that a buggy transform never causes data loss. You can fix the transform and re-run with --full to reprocess all data.
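Even with that safety net, it is worth writing transforms defensively so a single missing field does not cause whole batches to land untransformed. One possible guard pattern (hypothetical column names):

```python
import pandas as pd

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Apply each cleaning step only when its column is present."""
    # Guarding on column presence keeps one absent field from raising
    # and causing the entire batch to be written raw
    if "id" in df.columns:
        df = df.dropna(subset=["id"])
    if "email" in df.columns:
        df["email"] = df["email"].str.lower().str.strip()
    return df
```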

When a transform modifies the record count (e.g., by filtering rows), the CLI output shows both counts:

```
Running: orders
orders: 5,000 records...
Done! 4,800 records in 1 stream(s)
orders: 5,000 raw → 4,800 after transform
```
For example, this transform filters out unwanted records and adds processing metadata:

```python
import pandas as pd
from datetime import datetime, timezone

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Filter invalid records and add processing metadata."""
    # Remove test records (plain substring match, not a regex)
    df = df[~df["email"].str.contains("@test.com", regex=False, na=False)]

    # Remove duplicates within the batch
    df = df.drop_duplicates(subset=["id"], keep="last")

    # Add a processing timestamp
    df["_processed_at"] = datetime.now(timezone.utc).isoformat()

    return df
```
And this one enforces consistent column types for downstream queries:

```python
import pandas as pd

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure consistent types for downstream queries."""
    # Coerce numeric columns, defaulting unparseable values to 0
    for col in ["price", "quantity", "discount"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)

    # Parse date columns
    for col in ["created_at", "updated_at"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    # Standardize country codes to uppercase
    if "country" in df.columns:
        df["country"] = df["country"].str.upper().str.strip()

    return df
```
  • Keep transforms simple. Heavy transformations belong in the query layer (DataSpoc Lens), not during ingestion.
  • Test your transform function independently before deploying (a fuller assertion-based check appears after this list):

```python
import os
import sys
import pandas as pd

# Make ~/.dataspoc-pipe/transforms importable, then load the transform
sys.path.insert(0, os.path.expanduser("~/.dataspoc-pipe/transforms"))
from orders import transform
df = pd.read_csv("/tmp/sample.csv")
result = transform(df)
print(result.head())
```
  • If you do not need a transform, simply do not create the file. There is no performance overhead when no transform exists.
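For the testing step above, a small assertion-based check works well. A sketch, assuming the orders transform from step 2 and that the transforms directory is on sys.path:

```python
import pandas as pd
from orders import transform

def test_transform_cleans_orders():
    df = pd.DataFrame({
        "id": [1, None, 2],
        "email": ["A@X.COM ", "b@y.com", None],
        "amount": ["10.5", "3", "oops"],
    })
    out = transform(df)
    assert out["id"].notna().all()           # null IDs dropped
    assert out["amount"].dtype == "float64"  # amounts coerced to numeric

test_transform_cleans_orders()
```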