# Transforms
Pipe supports optional Python transforms that run during ingestion, letting you clean or reshape data before it is written to Parquet.
## How it works

Transforms are convention-based; no configuration is needed. If a Python file exists at:
```
~/.dataspoc-pipe/transforms/<pipeline_name>.py
```

and it defines a function `def transform(df)`, Pipe automatically calls it for each batch of records during ingestion.
```
[Singer Tap] → stdout → [Pipe Engine] → transform(df) → Parquet → [Bucket]
```

## Creating a transform
1. Create a Python file named after your pipeline in the transforms directory:

   ```shell
   # For a pipeline called "orders"
   vim ~/.dataspoc-pipe/transforms/orders.py
   ```

2. Define a `transform` function that takes a Pandas DataFrame and returns a Pandas DataFrame:
   ```python
   import pandas as pd

   def transform(df: pd.DataFrame) -> pd.DataFrame:
       """Clean orders data during ingestion."""
       # Drop rows with null IDs
       df = df.dropna(subset=["id"])

       # Normalize email to lowercase
       if "email" in df.columns:
           df["email"] = df["email"].str.lower().str.strip()

       # Convert amount to float
       if "amount" in df.columns:
           df["amount"] = pd.to_numeric(df["amount"], errors="coerce")

       return df
   ```

That is it. The next time you run `dataspoc-pipe run orders`, the transform is applied automatically.
## Execution details

- The transform runs per batch (approximately 10,000 records per batch).
- The function receives a Pandas DataFrame and must return a Pandas DataFrame.
- The returned DataFrame can have fewer rows (filtering), modified values (cleaning), or additional columns (enrichment).
- The function should be stateless — it should not depend on data from previous batches.
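The per-batch, stateless contract above can be sketched as follows. This is a minimal illustration of the execution model, not Pipe's actual engine code; `apply_in_batches` and `BATCH_SIZE` are hypothetical names introduced for this sketch:

```python
import pandas as pd

BATCH_SIZE = 10_000  # approximate batch size used during ingestion

def apply_in_batches(records, transform, batch_size=BATCH_SIZE):
    """Group incoming records into batches, applying the transform to each
    batch independently and yielding the resulting DataFrames."""
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) >= batch_size:
            # Each call sees only its own batch — no shared state between calls
            yield transform(pd.DataFrame(batch))
            batch = []
    if batch:  # flush the final partial batch
        yield transform(pd.DataFrame(batch))
```

Because each call receives only its own batch, anything that needs cross-batch state (global deduplication, running aggregates) cannot be done reliably in a transform.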
## Fail-safe behavior

If the transform function raises an exception:
- Pipe logs a warning with the error details.
- The raw, untransformed data is written to Parquet.
- The pipeline continues processing the next batch.
This design ensures that a buggy transform never causes data loss. You can fix the transform and re-run with `--full` to reprocess all data.
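Conceptually, the fail-safe behaves like the wrapper below. This is a sketch of the idea, not Pipe's actual implementation; `safe_transform` and the logger name are assumptions:

```python
import logging
import pandas as pd

logger = logging.getLogger("pipe")

def safe_transform(df: pd.DataFrame, transform) -> pd.DataFrame:
    """Run a transform over one batch, falling back to the raw batch on error."""
    try:
        return transform(df)
    except Exception as exc:
        # Log a warning and write the untransformed batch instead
        logger.warning("transform failed, writing raw batch: %s", exc)
        return df
```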
## Monitoring transforms

When a transform modifies the record count (e.g., by filtering rows), the CLI output shows both counts:

```
Running: orders
orders: 5,000 records...
Done! 4,800 records in 1 stream(s)
orders: 5,000 raw → 4,800 after transform
```

## Example: filtering and enrichment
```python
import pandas as pd
from datetime import datetime, timezone

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Filter invalid records and add processing metadata."""
    # Remove test records (regex=False so "." is matched literally)
    df = df[~df["email"].str.contains("@test.com", na=False, regex=False)]

    # Remove duplicates within the batch
    df = df.drop_duplicates(subset=["id"], keep="last")

    # Add processing timestamp
    df["_processed_at"] = datetime.now(timezone.utc).isoformat()

    return df
```

## Example: type normalization
```python
import pandas as pd

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure consistent types for downstream queries."""
    # Ensure numeric columns
    for col in ["price", "quantity", "discount"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)

    # Ensure date columns
    for col in ["created_at", "updated_at"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    # Standardize country codes to uppercase
    if "country" in df.columns:
        df["country"] = df["country"].str.upper().str.strip()

    return df
```

- Keep transforms simple. Heavy transformations belong in the query layer (DataSpoc Lens), not during ingestion.
- Test your transform function independently before deploying:

  ```python
  import pandas as pd
  from transforms.orders import transform

  df = pd.read_csv("/tmp/sample.csv")
  result = transform(df)
  print(result.head())
  ```

- If you do not need a transform, simply do not create the file. There is no performance overhead when no transform exists.