A common misconception about machine learning systems is that they’re mostly model code — the elegant lines where you fit a random forest or tune a gradient boosting classifier. In practice, the model is a small box in the centre of a much larger system. The rest is data pipelines: extracting raw data from sources, transforming it into features the model can consume, loading it into the right place at the right time, and doing all of this reliably, repeatedly, and at scale.
Google’s influential paper on technical debt in ML systems called this “glue code” — the infrastructure that connects data sources to models and models to decisions. In their experience, the model itself accounted for roughly 5% of the total code in a production ML system. The other 95% was data collection, feature extraction, validation, serving infrastructure, and monitoring.
If you’ve built microservices, this ratio will feel familiar. The business logic in a typical service is a thin layer; the rest is routing, serialisation, error handling, retries, logging, and health checks. Data pipelines are the same pattern applied to data: the statistical transformation is the thin layer, surrounded by engineering that makes it reliable.
This chapter covers the engineering patterns that make data pipelines robust. If Section 16.1 was about ensuring your results are reproducible, this chapter is about ensuring the data that feeds those results is correct, consistent, and available when you need it.
17.2 ETL: a pattern you already know
ETL stands for Extract, Transform, Load — a three-stage pattern for moving data from source systems into a form suitable for analysis or modelling.
Extract reads raw data from its source: a database, an API, a CSV dump, a message queue, or an event stream. The key engineering concern is that sources are unreliable. APIs have rate limits. Database queries can time out. CSV files arrive late, malformed, or not at all. Every extraction step needs error handling, retries, and validation — the same defensive programming you’d apply to any external dependency.
Transform converts raw data into a useful shape. This might mean joining tables, filtering rows, computing derived columns, handling missing values, encoding categorical variables, or normalising numerical features. Transforms are where most bugs live, because they encode domain assumptions. A transform that fills missing revenue with zero is making a very different assumption from one that fills it with the column median — and both might be wrong.
Load writes the transformed data to its destination: a data warehouse, a feature store, a parquet file, or a model’s input buffer. The key engineering concern is atomicity — if a load fails partway through, the destination should not be left in an inconsistent state. This is the same problem as database transactions, and the solutions are similar: write to a staging location first, validate, then swap.
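To make the pattern concrete, the rest of this section walks a small simulated dataset through all three stages, starting with an extract from a deliberately messy source:

```python
import pandas as pd
import numpy as np

rng = np.random.default_rng(42)

# ---- Extract: simulate reading from a messy source ----
raw = pd.DataFrame({
    "user_id": range(1, 1001),
    "signup_date": pd.date_range("2023-01-01", periods=1000, freq="8h"),
    "revenue": rng.exponential(50, 1000),
    "country": rng.choice(["GB", "US", "DE", "FR", None], 1000,
                          p=[0.3, 0.3, 0.15, 0.15, 0.1]),
    "sessions": rng.poisson(12, 1000),
})

# Introduce realistic messiness
raw.loc[rng.choice(1000, 30, replace=False), "revenue"] = np.nan
raw.loc[rng.choice(1000, 15, replace=False), "sessions"] = -1  # bad data

print(f"Raw: {raw.shape[0]} rows, {raw.isna().sum().sum()} nulls, "
      f"{(raw['sessions'] < 0).sum()} invalid sessions")
```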
The raw data have the problems you’d expect from a real source: missing values, invalid entries, and mixed types. The transform step is where we impose structure.
```python
# ---- Transform: clean, validate, and derive features ----
def transform_user_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean raw user data and compute features for modelling."""
    clean = df.copy()

    # Handle invalid sessions: replace negatives with NaN, then fill
    clean.loc[clean["sessions"] < 0, "sessions"] = np.nan

    # Fill missing revenue with median — assumes data are missing completely
    # at random (MCAR). If missingness is related to the value itself
    # (e.g. high earners less likely to report), this biases the result.
    clean["revenue"] = clean["revenue"].fillna(clean["revenue"].median())

    # Fill missing sessions with median
    clean["sessions"] = clean["sessions"].fillna(clean["sessions"].median())

    # Fill missing country with "Unknown" rather than dropping rows
    clean["country"] = clean["country"].fillna("Unknown")

    # Derive features
    clean["revenue_per_session"] = clean["revenue"] / clean["sessions"].clip(lower=1)
    clean["days_since_signup"] = (
        pd.Timestamp("2024-01-01") - clean["signup_date"]
    ).dt.days
    clean["is_high_value"] = (clean["revenue"] > clean["revenue"].quantile(0.75)).astype(int)

    return clean


features = transform_user_data(raw)
print(f"Transformed: {features.shape[0]} rows, {features.isna().sum().sum()} nulls")
print(f"New columns: {[c for c in features.columns if c not in raw.columns]}")
```
Transformed: 1000 rows, 0 nulls
New columns: ['revenue_per_session', 'days_since_signup', 'is_high_value']
```python
# ---- Load: write to a versioned output with validation ----
import hashlib
import io

def load_with_validation(df: pd.DataFrame, path: str) -> dict:
    """Write a DataFrame to parquet with a content hash for verification."""
    buf = io.BytesIO()
    df.to_parquet(buf, index=False)
    content_hash = hashlib.sha256(buf.getvalue()).hexdigest()[:16]

    # In production, write to path; here we just report
    record = {
        "rows": len(df),
        "columns": list(df.columns),
        "content_hash": content_hash,
        "null_count": int(df.isna().sum().sum()),
    }
    return record


manifest = load_with_validation(features, "data/features/v=2024-01-01/users.parquet")
print(f"Load manifest: {manifest}")
```
The content hash from the load step connects directly to the data reproducibility patterns in Section 16.4 — it’s a fingerprint that lets you verify that the feature data hasn’t changed between when it was written and when a model reads it.
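The load function above reports a manifest but stops short of the staging-then-swap write described under Load. Below is a minimal sketch of that pattern, assuming a local filesystem where os.replace is an atomic rename; the helper name and the round-trip check are illustrative, not a prescribed implementation.

```python
import os
import tempfile
import pandas as pd

def atomic_write_parquet(df: pd.DataFrame, final_path: str) -> None:
    """Write to a staging file, validate, then atomically swap into place."""
    dir_name = os.path.dirname(final_path) or "."

    # Stage: write to a temporary file in the same directory (same filesystem)
    fd, staging_path = tempfile.mkstemp(suffix=".parquet", dir=dir_name)
    os.close(fd)
    try:
        df.to_parquet(staging_path, index=False)

        # Validate: re-read the staged file and check it round-trips
        staged = pd.read_parquet(staging_path)
        if len(staged) != len(df):
            raise ValueError("Staged file row count does not match input")

        # Swap: os.replace is atomic on POSIX when both paths are on one filesystem
        os.replace(staging_path, final_path)
    except Exception:
        os.remove(staging_path)  # never leave a half-written file behind
        raise
```

Readers of final_path therefore see either the old file or the new one, never a partial write — the same guarantee a database transaction gives you.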
Engineering Bridge
If you’ve built microservices, ETL maps cleanly onto a familiar pattern. Extract is calling an upstream service or reading from a database — you’d wrap it in retries with exponential backoff, validate the response schema, and handle timeouts. Transform is your business logic layer — pure functions that convert one data shape into another, testable in isolation. Load is writing to your own data store — you’d use transactions or write-ahead patterns to ensure consistency. The main difference is scale: a microservice processes one request at a time; an ETL pipeline processes an entire dataset in a single batch. But the engineering principles — idempotency, validation, error handling, atomicity — are identical.
17.3 Feature engineering: the transform layer
In the ETL pattern, the transform step is where data science diverges most from traditional data engineering. A data engineer’s transform typically cleans and restructures data — joins, filters, type conversions. A data scientist’s transform also creates features: derived quantities that encode domain knowledge in a form that models can learn from.
Feature engineering is the process of turning raw data into model inputs. It sounds mechanical, but it’s one of the highest-leverage activities in applied data science. A good feature can improve a model more than switching algorithms or tuning hyperparameters.
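The example below simulates e-commerce transactions and aggregates them into per-user behavioural features:

```python
import pandas as pd
import numpy as np

rng = np.random.default_rng(42)

# Simulated e-commerce transaction data
n = 2000
transactions = pd.DataFrame({
    "user_id": rng.integers(1, 201, n),
    "timestamp": pd.date_range("2023-06-01", periods=n, freq="37min"),
    "amount": np.round(rng.lognormal(3, 1, n), 2),
    "category": rng.choice(["electronics", "clothing", "food", "books"], n),
})

# Feature engineering: aggregate per-user behaviour
user_features = transactions.groupby("user_id").agg(
    total_spend=("amount", "sum"),
    avg_transaction=("amount", "mean"),
    transaction_count=("amount", "count"),
    unique_categories=("category", "nunique"),
    max_single_purchase=("amount", "max"),
).reset_index()

# Derived ratios
user_features["avg_to_max_ratio"] = (
    user_features["avg_transaction"] / user_features["max_single_purchase"]
)

print(user_features.describe().round(2))
```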
Each of these features encodes a specific hypothesis about user behaviour: users who spend across many categories might behave differently from those who concentrate in one; users whose average transaction is close to their maximum are consistent spenders, while those with a low ratio make occasional large purchases. The model doesn’t know these hypotheses — it just sees numbers. The feature engineer’s job is to present the numbers in a way that makes the underlying patterns learnable.
Author’s Note
Feature engineering feels like plumbing — unglamorous work that happens before the real modelling begins. But in applied data science, the representation of the data matters more than the choice of algorithm. A well-chosen feature can improve model performance more than any amount of hyperparameter tuning. The model is the same. The algorithm is the same. What changes is the signal the model has access to. This is the inverse of the software engineering instinct, where the algorithm is the interesting part and the data format is just a detail. In data science, the data format is the algorithm’s input, and shaping that input well is where most of the practical value lies.
17.4 The train/serve skew problem
One of the most insidious bugs in production ML systems is train/serve skew — a mismatch between how features are computed during training and how they’re computed at serving time. The model learns from one representation of the data but makes predictions on a subtly different one.
This happens more easily than you’d think. During training, you compute features in a batch pipeline using pandas on a static dataset. At serving time, you compute features in real time using a different code path — perhaps a Java service that reimplements the same logic but handles edge cases differently. A rounding difference, a different null-handling strategy, or a time-zone conversion that behaves differently between Python and Java, and your model’s predictions degrade silently.
```python
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(42)

# Training: compute features from historical data
train_data = rng.normal(50, 15, (500, 3))
scaler = StandardScaler()
X_train = scaler.fit_transform(train_data)
y_train = (X_train[:, 0] + 0.5 * X_train[:, 1] > 0).astype(int)

model = LogisticRegression(random_state=42)
model.fit(X_train, y_train)

# Serving: a new observation arrives
new_obs = np.array([[65, 40, 55]])

# Correct: use the fitted scaler from training
X_correct = scaler.transform(new_obs)
pred_correct = model.predict_proba(X_correct)[0, 1]

# Skewed: accidentally re-fit the scaler on just the new observation
bad_scaler = StandardScaler()
X_skewed = bad_scaler.fit_transform(new_obs)  # this centres on the single obs!
pred_skewed = model.predict_proba(X_skewed)[0, 1]

print(f"Correct prediction: {pred_correct:.4f}")
print(f"Skewed prediction: {pred_skewed:.4f}")
print(f"Difference: {abs(pred_correct - pred_skewed):.4f}")
```
The correct serving path uses the training scaler — the one fitted on the training data’s mean and standard deviation. The skewed path re-fits a scaler on the single new observation: its mean is the observation itself and its standard deviation is zero, so scikit-learn falls back to a scale of one and every “standardised” feature comes out as zero. Fed an all-zero input, the model returns roughly its base rate regardless of what the observation actually contained. No exception is raised; the code runs to completion and returns an answer. It’s just the wrong answer.
To see this at scale, we can generate many test observations and compare the correct and skewed predictions for each:
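```python
import matplotlib.pyplot as plt
import warnings

test_data = rng.normal(50, 15, (100, 3))
correct_preds = model.predict_proba(scaler.transform(test_data))[:, 1]

skewed_preds = []
for obs in test_data:
    s = StandardScaler()
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")  # silence warnings from the degenerate one-row fit
        x = s.fit_transform(obs.reshape(1, -1))
    skewed_preds.append(model.predict_proba(x)[0, 1])
skewed_preds = np.array(skewed_preds)

fig, ax = plt.subplots(figsize=(6, 5))
ax.scatter(correct_preds, skewed_preds, alpha=0.5, s=30, edgecolors="none")
ax.plot([0, 1], [0, 1], "--", color="grey", linewidth=1, label="Perfect agreement")
ax.set_xlabel("Correct prediction (training scaler)")
ax.set_ylabel("Skewed prediction (re-fitted scaler)")
ax.set_title("Train/serve skew distorts predictions")
ax.legend(frameon=False)
ax.set_xlim(-0.05, 1.05)
ax.set_ylim(-0.05, 1.05)
plt.tight_layout()
plt.show()
```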
[Figure: train/serve skew in action — a scatter plot comparing correct vs skewed predicted probabilities for 100 test observations. Correct predictions spread from 0 to 1 along the x-axis; skewed predictions cluster near 0.5 on the y-axis, showing that re-fitting the scaler per observation destroys the model’s discrimination.]
Engineering Bridge
Train/serve skew is the data science equivalent of dev/prod parity — one of the twelve-factor app principles. In web development, you learn (usually the hard way) that if your dev environment uses SQLite but production uses PostgreSQL, you’ll discover bugs only in production. The same principle applies to feature computation: if the training pipeline computes features differently from the serving pipeline, your model will behave differently in production than in evaluation. The solution is also the same — minimise the gap. Use the same code, the same libraries, and ideally the same feature computation path for both training and serving.
17.5 Feature stores: the shared library of features
A feature store is infrastructure that solves two problems at once: it provides a single, shared repository of feature definitions (eliminating duplicate feature engineering across teams), and it serves the same features consistently for both training and inference (eliminating train/serve skew).
Instead of each model reimplementing “days since last purchase” or “average transaction amount over 30 days,” these features are defined once, computed by a shared pipeline, and stored in a system that serves them to any model that needs them. Training reads historical features as of the time each training example occurred (point-in-time correctness). Serving reads the latest feature values for real-time predictions.
Feature stores typically have two storage layers. The offline store — a data warehouse or file system — holds historical features used in training, stored as a large columnar dataset keyed by entity ID and timestamp. The online store — a low-latency key-value store like Redis or DynamoDB — holds the latest feature values used in real-time serving. A synchronisation process keeps the online store up to date as the offline store is refreshed.
```python
import pandas as pd
import numpy as np

rng = np.random.default_rng(42)

# Simulate a feature store's offline layer: historical features keyed by
# (user_id, event_timestamp)
n_users = 50
n_snapshots = 12  # monthly snapshots

records = []
for user_id in range(1, n_users + 1):
    base_spend = rng.exponential(200)
    for month in range(n_snapshots):
        records.append({
            "user_id": user_id,
            "snapshot_date": pd.Timestamp("2023-01-01") + pd.DateOffset(months=month),
            "total_spend_30d": round(base_spend * rng.uniform(0.5, 1.5), 2),
            "transaction_count_30d": int(rng.poisson(8)),
            "days_since_last_purchase": int(rng.exponential(5)),
        })

offline_store = pd.DataFrame(records)
print(f"Offline store: {offline_store.shape[0]} rows "
      f"({n_users} users × {n_snapshots} snapshots)")
print(f"Columns: {list(offline_store.columns)}")
print()

# Point-in-time lookup: get features as they were on a specific date
def get_features_at(store: pd.DataFrame, user_id: int, as_of: pd.Timestamp) -> pd.Series:
    """Retrieve features for a user as of a given date."""
    mask = (store["user_id"] == user_id) & (store["snapshot_date"] <= as_of)
    if mask.sum() == 0:
        return pd.Series(dtype=float)
    return store.loc[mask].sort_values("snapshot_date").iloc[-1]


# Training: get features as of training label date (avoids future leakage)
train_features = get_features_at(offline_store, user_id=1, as_of=pd.Timestamp("2023-06-15"))
print("Training features for user 1 as of 2023-06-15:")
print(train_features[["total_spend_30d", "transaction_count_30d",
                      "days_since_last_purchase"]].to_string())
```
Offline store: 600 rows (50 users × 12 snapshots)
Columns: ['user_id', 'snapshot_date', 'total_spend_30d', 'transaction_count_30d', 'days_since_last_purchase']
Training features for user 1 as of 2023-06-15:
total_spend_30d 545.62
transaction_count_30d 5
days_since_last_purchase 7
The get_features_at function illustrates the critical concept of point-in-time correctness. When building training data, you must retrieve features as they existed at the time the label was observed, not as they exist today. Using today’s features to predict yesterday’s outcome is data leakage — the model sees information from the future during training, learns to rely on it, and then fails in production where that information isn’t available.
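Looking features up one row at a time is fine for illustration but slow for a full training set. pandas provides merge_asof for exactly this kind of as-of join; the sketch below assumes the offline_store frame from above and a small, hypothetical table of labelled training examples.

```python
# Hypothetical training labels: one row per (user, label date) pair
labels = pd.DataFrame({
    "user_id": [1, 1, 2],
    "label_date": pd.to_datetime(["2023-03-10", "2023-06-15", "2023-06-15"]),
    "churned": [0, 0, 1],
})

# As-of join: for each label, take the latest snapshot at or before label_date.
# Both frames must be sorted on their join keys.
training_set = pd.merge_asof(
    labels.sort_values("label_date"),
    offline_store.sort_values("snapshot_date"),
    left_on="label_date",
    right_on="snapshot_date",
    by="user_id",
    direction="backward",
)
print(training_set[["user_id", "label_date", "snapshot_date", "total_spend_30d"]])
```

The `direction="backward"` argument is what enforces point-in-time correctness: each label only ever sees snapshots from its own past.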
Engineering Bridge
A feature store is to ML features what a package registry (npm, PyPI, Maven) is to code libraries. Without a registry, every team copies and modifies shared code independently, leading to inconsistent behaviour and duplicated maintenance. A package registry provides a single source of truth with versioning and dependency management. A feature store does the same for computed features: it provides a canonical definition, historical versioning, and consistent access for both training (batch reads of historical data) and serving (real-time reads of current values). The parallel even extends to the governance problem — just as a package registry lets you audit who depends on a library before changing its API, a feature store lets you audit which models depend on a feature before changing its computation logic.
17.6 Data validation: catching problems early
The most expensive bugs in data pipelines are the ones that don’t crash. A malformed API response that gets silently ingested, a join that drops rows due to a key mismatch, a feature column that drifts to all zeros after an upstream schema change — these produce valid-looking data that quietly degrade your model’s predictions.
Data validation is the practice of asserting expectations about your data at each stage of the pipeline, the same way you’d add assertions and contract tests to a service boundary.
```python
import pandas as pd
import numpy as np

def validate_features(df: pd.DataFrame) -> list[str]:
    """Validate a feature DataFrame against expected contracts."""
    errors = []

    # Schema checks
    required_cols = {"user_id", "total_spend", "transaction_count",
                     "unique_categories"}
    missing = required_cols - set(df.columns)
    if missing:
        errors.append(f"Missing columns: {missing}")

    # Null checks
    null_pct = df.isnull().mean()
    high_null_cols = null_pct[null_pct > 0.05].index.tolist()
    if high_null_cols:
        errors.append(f"Columns exceed 5% null threshold: {high_null_cols}")

    # Range checks
    if "total_spend" in df.columns and (df["total_spend"] < 0).any():
        errors.append("Negative values in total_spend")
    if "transaction_count" in df.columns and (df["transaction_count"] < 0).any():
        errors.append("Negative values in transaction_count")

    # Distribution checks (detect drift)
    if "total_spend" in df.columns:
        median_spend = df["total_spend"].median()
        if median_spend < 1 or median_spend > 10_000:
            errors.append(f"Suspicious median total_spend: {median_spend:.2f}")

    # Completeness check
    if len(df) < 100:
        errors.append(f"Fewer than 100 rows ({len(df)}) — possible extraction failure")

    return errors


# Test with clean data
rng = np.random.default_rng(42)
good_data = pd.DataFrame({
    "user_id": range(500),
    "total_spend": rng.exponential(200, 500),
    "transaction_count": rng.poisson(10, 500),
    "unique_categories": rng.integers(1, 6, 500),
})
errors = validate_features(good_data)
print(f"Clean data validation: {'PASSED' if not errors else errors}")

# Test with problematic data
bad_data = good_data.copy()
bad_data.loc[:40, "total_spend"] = np.nan        # >5% nulls
bad_data.loc[100:105, "transaction_count"] = -1  # negative counts
errors = validate_features(bad_data)
print(f"Bad data validation: {errors}")
```
Clean data validation: PASSED
Bad data validation: ["Columns exceed 5% null threshold: ['total_spend']", 'Negative values in transaction_count']
The validation function checks four categories of expectation: schema (are the right columns present?), nulls (is data completeness within tolerance?), ranges (are values physically plausible?), and distributions (has the data drifted from expected patterns?). In production, these checks run at every pipeline stage — after extraction, after each transform, and before load. If any check fails, the pipeline halts and alerts the team rather than silently writing bad data.
Author’s Note
Data validation feels like a “nice to have” — something you add after the pipeline is working. But pipelines fail silently far more often than they fail loudly. A column that changes from British date format (DD/MM/YYYY) to American format (MM/DD/YYYY) will parse without error — 1 March becomes 3 January — and the model will retrain on quietly wrong features. The accuracy drop will be subtle, delayed, and extremely hard to diagnose after the fact. A simple validation check at the pipeline boundary catches these problems in seconds. The instinct should be: write the validation before the transform, not after.
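As a two-line illustration of that failure mode (the date string itself is illustrative), pandas will happily parse the same text under either convention:

```python
import pandas as pd

# The same string parsed under two conventions — both succeed, only one is right
uk = pd.to_datetime("01/03/2024", dayfirst=True)   # 1 March 2024
us = pd.to_datetime("01/03/2024", dayfirst=False)  # 3 January 2024
print(uk, us)  # 2024-03-01 00:00:00 2024-01-03 00:00:00
```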
17.7 Orchestrating pipeline stages
A real-world feature pipeline is rarely a single script. It’s a directed acyclic graph (DAG) of dependent stages: extraction depends on the source being available, transforms depend on the extraction completing, validation depends on the transform, and loading depends on validation passing.
In Section 16.7 we saw how DVC encodes these dependencies for reproducibility. For scheduled production pipelines — the ones that run nightly or hourly to keep feature stores up to date — dedicated orchestration tools manage execution, retries, scheduling, and alerting. The most widely used are Airflow (the established standard), Prefect (a more Pythonic alternative), and Dagster (which emphasises typed data contracts between stages).
All three share the same core abstraction: a DAG of tasks, where each task is a Python function or shell command, and edges define data dependencies. If you’ve used CI/CD systems like GitHub Actions or GitLab CI, the structure will feel familiar — jobs, stages, dependencies, and retry policies.
```python
# Conceptual Airflow DAG — not executable in this environment
# but illustrates the pipeline structure
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    "user_features_pipeline",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

extract = PythonOperator(
    task_id="extract_transactions",
    python_callable=extract_transactions,    # reads from source DB
    dag=dag,
)
transform = PythonOperator(
    task_id="compute_user_features",
    python_callable=compute_user_features,   # aggregates per user
    dag=dag,
)
validate = PythonOperator(
    task_id="validate_features",
    python_callable=validate_feature_data,   # checks quality gates
    dag=dag,
)
load = PythonOperator(
    task_id="load_to_feature_store",
    python_callable=load_to_feature_store,   # writes to offline + online stores
    dag=dag,
)

extract >> transform >> validate >> load  # define execution order
```
The >> operator defines dependencies: transform won’t run until extract succeeds, validate won’t run until transform succeeds. If validate fails, load never executes — bad data never reaches the feature store. The orchestrator handles scheduling (run daily at 02:00), retries (retry extract up to 3 times with exponential backoff), and alerting (page the on-call engineer if the pipeline hasn’t succeeded by 06:00).
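The retry and alerting behaviour lives in configuration rather than task code. A sketch of how that might look for the DAG above, assuming Airflow 2.x-style default_args — the values and addresses are illustrative, and this is conceptual like the DAG itself:

```python
# Conceptual — builds on the imports in the DAG above
from datetime import timedelta

default_args = {
    "retries": 3,                         # retry each failed task up to 3 times
    "retry_delay": timedelta(minutes=5),  # initial wait between attempts
    "retry_exponential_backoff": True,    # lengthen the wait on each retry
    "email_on_failure": True,
    "email": ["oncall-data@example.com"],
}

dag = DAG(
    "user_features_pipeline",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args=default_args,            # applied to every task in the DAG
)
```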
17.8 A worked example: end-to-end feature pipeline
The following example brings together the concepts from this chapter: extraction, validation, feature engineering, and reproducible output. We’ll build a feature pipeline for the service metrics scenario from Section 16.9 and demonstrate how each stage validates its outputs.
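Stage 1 extracts — here, simulates — six-hourly metrics for five services, complete with the kind of mess a real monitoring source produces:

```python
import numpy as np
import pandas as pd
import hashlib
import io
from datetime import datetime, timezone

# ---- Stage 1: Extract ----
rng = np.random.default_rng(42)
n = 1500

raw_metrics = pd.DataFrame({
    "service_id": rng.choice(["api-gateway", "auth-service", "payment-svc",
                              "search-api", "user-svc"], n),
    "timestamp": pd.date_range("2023-01-01", periods=n, freq="6h"),
    "request_rate": rng.exponential(500, n),
    "error_count": rng.poisson(5, n),
    "p50_latency_ms": (p50 := rng.lognormal(3, 0.5, n)),
    "p99_latency_ms": p50 + rng.exponential(30, n),  # p99 >= p50 by definition
    "cpu_pct": np.clip(rng.normal(45, 20, n), 0, 100),
})

# Introduce realistic messiness
raw_metrics.loc[rng.choice(n, 20, replace=False), "error_count"] = np.nan
raw_metrics.loc[rng.choice(n, 10, replace=False), "cpu_pct"] = -1  # sensor glitch

print(f"Extracted: {raw_metrics.shape[0]} rows from 5 services")
print(f" Nulls: {raw_metrics.isna().sum().sum()}, "
      f"Invalid CPU: {(raw_metrics['cpu_pct'] < 0).sum()}")
```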
The raw extract has the usual problems: null error counts (perhaps the monitoring agent was down during those intervals) and negative CPU readings from a sensor glitch. Before transforming this data, we validate the extraction to catch catastrophic failures early — a truncated extract or a missing column would waste everything downstream.
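Stage 2 is a lightweight gate on the extract:

```python
# ---- Stage 2: Validate extraction ----
def validate_extraction(df: pd.DataFrame) -> list[str]:
    """Gate check after extraction."""
    errors = []
    required = {"service_id", "timestamp", "request_rate", "error_count",
                "p50_latency_ms", "p99_latency_ms", "cpu_pct"}
    missing = required - set(df.columns)
    if missing:
        errors.append(f"Missing columns: {missing}")
    if len(df) < 100:
        errors.append(f"Too few rows: {len(df)}")
    if df["timestamp"].nunique() < 10:
        errors.append("Suspiciously few unique timestamps")
    return errors


extract_errors = validate_extraction(raw_metrics)
print(f"Extraction validation: {'PASSED' if not extract_errors else extract_errors}")
```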
With the extraction validated, the transform stage cleans the data and engineers features. Notice that we fill missing values using per-service medians rather than a global median — the error rate for payment-svc is likely on a different scale from search-api, so a global fill would blur meaningful differences between services.
```python
# ---- Stage 3: Transform — clean and engineer features ----
def transform_service_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Clean raw metrics and compute service-level features."""
    clean = df.copy()

    # Fix invalid values
    clean.loc[clean["cpu_pct"] < 0, "cpu_pct"] = np.nan

    # Fill missing values with per-service medians
    for col in ["error_count", "cpu_pct"]:
        clean[col] = clean.groupby("service_id")[col].transform(
            lambda s: s.fillna(s.median())
        )

    # Derive features
    clean["error_rate"] = clean["error_count"] / clean["request_rate"].clip(lower=1)
    clean["latency_ratio"] = clean["p99_latency_ms"] / clean["p50_latency_ms"].clip(lower=1)
    clean["is_high_cpu"] = (clean["cpu_pct"] > 80).astype(int)

    # Time-based features
    clean["hour"] = clean["timestamp"].dt.hour
    clean["is_business_hours"] = clean["hour"].between(9, 17).astype(int)

    return clean


features = transform_service_metrics(raw_metrics)
print(f"Transformed: {features.shape[0]} rows, {features.shape[1]} columns")
print(f" New features: error_rate, latency_ratio, is_high_cpu, hour, is_business_hours")
print(f" Remaining nulls: {features.isna().sum().sum()}")
```
The transform has cleaned the data and produced five new features. Visualising their distributions gives us a quick sanity check — and establishes a baseline that future pipeline runs can be compared against to detect drift.
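A few lines of matplotlib produce those baseline histograms:

```python
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 3, figsize=(10, 3.5))
feat_cols = ["error_rate", "latency_ratio", "cpu_pct"]
titles = ["Error rate", "Latency ratio (p99/p50)", "CPU %"]

for ax, col, title in zip(axes, feat_cols, titles):
    ax.hist(features[col].dropna(), bins=40, edgecolor="white", linewidth=0.5)
    ax.set_xlabel(col)
    ax.set_title(title)
    ax.axvline(features[col].median(), color="red", linestyle="--", linewidth=1,
               label=f"Median: {features[col].median():.2f}")
    ax.legend(fontsize=8, frameon=False)

plt.tight_layout()
plt.show()
```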
[Figure: three histograms showing the distributions of error rate (right-skewed, mostly near zero), latency ratio (right-skewed, centred around 2), and CPU percentage (roughly normal, centred around 45) — the baselines that a distribution drift check would compare against on future pipeline runs.]
Now we validate the output — this is the gate between transform and load, and it’s where we check invariants that should hold after a correct transformation. If any check fails, the pipeline halts rather than writing bad features to the store.
```python
# ---- Stage 4: Validate transform output ----
def validate_features_pipeline(df: pd.DataFrame) -> list[str]:
    """Gate check after feature engineering."""
    errors = []

    # No nulls should remain after transform
    null_cols = df.columns[df.isna().any()].tolist()
    if null_cols:
        errors.append(f"Unexpected nulls in: {null_cols}")

    # Derived features should be non-negative
    for col in ["error_rate", "latency_ratio"]:
        if col in df.columns and (df[col] < 0).any():
            errors.append(f"Negative values in {col}")

    # Latency ratio should be >= 1 (p99 >= p50 by definition)
    if "latency_ratio" in df.columns:
        below_one = (df["latency_ratio"] < 1.0).mean()
        if below_one > 0.05:
            errors.append(f"Latency ratio < 1 for {below_one:.1%} of rows")

    return errors


transform_errors = validate_features_pipeline(features)
print(f"Transform validation: {'PASSED' if not transform_errors else transform_errors}")
```
Transform validation: PASSED
With both validation gates passed, we can safely load the features. The load step writes the data and produces a manifest — a summary record that captures the pipeline’s provenance: row counts, a content hash, and the results of every validation check.
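Stage 5 assembles that manifest alongside the content-hashed output:

```python
# ---- Stage 5: Load with provenance ----
buf = io.BytesIO()
features.to_parquet(buf, index=False)
content_hash = hashlib.sha256(buf.getvalue()).hexdigest()[:16]

manifest = {
    "pipeline": "service_features",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "input_rows": len(raw_metrics),
    "output_rows": len(features),
    "output_columns": len(features.columns),
    "content_hash": content_hash,
    "validation": {
        "extract": "PASSED" if not extract_errors else extract_errors,
        "transform": "PASSED" if not transform_errors else transform_errors,
    },
}

print("Pipeline manifest:")
for key, value in manifest.items():
    print(f"  {key}: {value}")
```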
Each stage validates its output before the next stage begins. The final manifest records the full provenance: how many rows entered, how many survived, what the output hash is, and whether every validation gate passed. This is the data pipeline equivalent of a CI build report — a complete record of what happened, in what order, with what results.
17.9 Summary
Data pipelines are the majority of ML code. The model is a small component; the infrastructure that extracts, transforms, validates, and serves data is where most engineering effort goes.
ETL is a pattern, not a technology. Extract handles unreliable sources with defensive programming. Transform encodes domain knowledge as features. Load writes results atomically with content hashes for verification.
Train/serve skew is a silent killer. Use the same code path for computing features in training and in serving. Feature stores formalise this by providing a single source of truth for feature definitions and values.
Validate data at every pipeline boundary. Check schemas, null rates, value ranges, and distributions after each stage. A pipeline that halts on bad data is better than one that silently produces bad predictions.
Orchestrate with DAGs. Express pipeline dependencies explicitly using tools like Airflow, Prefect, or DVC. This gives you automatic retries, dependency tracking, and the confidence that no stage runs before its inputs are ready and validated.
17.10 Exercises
Extend the validate_features function from Section 17.6 to include a distribution drift check: compare the mean and standard deviation of each numerical column against reference statistics (provided as a dictionary), and flag any column where the mean has shifted by more than 2 standard deviations. Test it by creating a “drifted” dataset where one feature’s mean has shifted.
Write a feature engineering function that takes the raw transaction data from Section 17.3 and computes time-windowed features: for each user, calculate total spend and transaction count over the last 7 days, 30 days, and 90 days relative to a given reference date. Discuss why the reference date matters for avoiding data leakage.
Demonstrate the train/serve skew problem with a StandardScaler: train a logistic regression model using fit_transform on the training set, then show that (a) using transform with the training scaler produces correct serving predictions, and (b) calling fit_transform on each new observation individually produces wrong predictions. Measure the average prediction error between the two approaches.
Conceptual: Your company has 15 ML models in production, and 8 of them use a feature called “customer lifetime value” (CLV). Each model team computes CLV slightly differently — different time windows, different revenue definitions, different null handling. A colleague proposes building a feature store to standardise this. What are the benefits? What are the risks? How would you handle the transition without breaking existing models?
Conceptual: A junior data scientist argues that data validation is unnecessary because “the model will just learn to handle bad data.” In what sense are they wrong? In what narrow sense might they have a point? What’s the strongest argument for validation that goes beyond model accuracy?
---title: "Data pipelines: ETL and feature stores"---## The glue code problem {#sec-pipelines-intro}A common misconception about machine learning systems is that they're mostly *model* code — the elegant lines where you fit a random forest or tune a gradient boosting classifier. In practice, the model is a small box in the centre of a much larger system. The rest is data pipelines: extracting raw data from sources, transforming it into features the model can consume, loading it into the right place at the right time, and doing all of this reliably, repeatedly, and at scale.Google's influential paper on technical debt in ML systems called this "glue code" — the infrastructure that connects data sources to models and models to decisions. In their experience, the model itself accounted for roughly 5% of the total code in a production ML system. The other 95% was data collection, feature extraction, validation, serving infrastructure, and monitoring.If you've built microservices, this ratio will feel familiar. The business logic in a typical service is a thin layer; the rest is routing, serialisation, error handling, retries, logging, and health checks. Data pipelines are the same pattern applied to data: the statistical transformation is the thin layer, surrounded by engineering that makes it reliable.This chapter covers the engineering patterns that make data pipelines robust. If @sec-repro-intro was about ensuring your results are reproducible, this chapter is about ensuring the data that feeds those results is correct, consistent, and available when you need it.## ETL: a pattern you already know {#sec-etl}**ETL** stands for **Extract, Transform, Load** — a three-stage pattern for moving data from source systems into a form suitable for analysis or modelling.**Extract** reads raw data from its source: a database, an API, a CSV dump, a message queue, or an event stream. The key engineering concern is that sources are unreliable. APIs have rate limits. Database queries can time out. CSV files arrive late, malformed, or not at all. Every extraction step needs error handling, retries, and validation — the same defensive programming you'd apply to any external dependency.**Transform** converts raw data into a useful shape. This might mean joining tables, filtering rows, computing derived columns, handling missing values, encoding categorical variables, or normalising numerical features. Transforms are where most bugs live, because they encode domain assumptions. A transform that fills missing revenue with zero is making a very different assumption from one that fills it with the column median — and both might be wrong.**Load** writes the transformed data to its destination: a data warehouse, a feature store, a parquet file, or a model's input buffer. The key engineering concern is atomicity — if a load fails partway through, the destination should not be left in an inconsistent state. 
This is the same problem as database transactions, and the solutions are similar: write to a staging location first, validate, then swap.```{python}#| label: etl-example#| echo: trueimport pandas as pdimport numpy as nprng = np.random.default_rng(42)# ---- Extract: simulate reading from a messy source ----raw = pd.DataFrame({"user_id": range(1, 1001),"signup_date": pd.date_range("2023-01-01", periods=1000, freq="8h"),"revenue": rng.exponential(50, 1000),"country": rng.choice(["GB", "US", "DE", "FR", None], 1000, p=[0.3, 0.3, 0.15, 0.15, 0.1]),"sessions": rng.poisson(12, 1000),})# Introduce realistic messinessraw.loc[rng.choice(1000, 30, replace=False), "revenue"] = np.nanraw.loc[rng.choice(1000, 15, replace=False), "sessions"] =-1# bad dataprint(f"Raw: {raw.shape[0]} rows, {raw.isna().sum().sum()} nulls, "f"{(raw['sessions'] <0).sum()} invalid sessions")```The raw data have the problems you'd expect from a real source: missing values, invalid entries, and mixed types. The transform step is where we impose structure.```{python}#| label: transform-example#| echo: true# ---- Transform: clean, validate, and derive features ----def transform_user_data(df: pd.DataFrame) -> pd.DataFrame:"""Clean raw user data and compute features for modelling.""" clean = df.copy()# Handle invalid sessions: replace negatives with NaN, then fill clean.loc[clean["sessions"] <0, "sessions"] = np.nan# Fill missing revenue with median — assumes data are missing completely# at random (MCAR). If missingness is related to the value itself# (e.g. high earners less likely to report), this biases the result. clean["revenue"] = clean["revenue"].fillna(clean["revenue"].median())# Fill missing sessions with median clean["sessions"] = clean["sessions"].fillna(clean["sessions"].median())# Fill missing country with "Unknown" rather than dropping rows clean["country"] = clean["country"].fillna("Unknown")# Derive features clean["revenue_per_session"] = clean["revenue"] / clean["sessions"].clip(lower=1) clean["days_since_signup"] = ( pd.Timestamp("2024-01-01") - clean["signup_date"] ).dt.days clean["is_high_value"] = (clean["revenue"] > clean["revenue"].quantile(0.75)).astype(int)return cleanfeatures = transform_user_data(raw)print(f"Transformed: {features.shape[0]} rows, {features.isna().sum().sum()} nulls")print(f"New columns: {[c for c in features.columns if c notin raw.columns]}")``````{python}#| label: load-example#| echo: true# ---- Load: write to a versioned output with validation ----import hashlibimport iodef load_with_validation(df: pd.DataFrame, path: str) ->dict:"""Write a DataFrame to parquet with a content hash for verification.""" buf = io.BytesIO() df.to_parquet(buf, index=False) content_hash = hashlib.sha256(buf.getvalue()).hexdigest()[:16]# In production, write to path; here we just report record = {"rows": len(df),"columns": list(df.columns),"content_hash": content_hash,"null_count": int(df.isna().sum().sum()), }return recordmanifest = load_with_validation(features, "data/features/v=2024-01-01/users.parquet")print(f"Load manifest: {manifest}")```The content hash from the load step connects directly to the data reproducibility patterns in @sec-data-repro — it's a fingerprint that lets you verify that the feature data hasn't changed between when it was written and when a model reads it.::: {.callout-note}## Engineering BridgeIf you've built microservices, ETL maps cleanly onto a familiar pattern. 
**Extract** is calling an upstream service or reading from a database — you'd wrap it in retries with exponential backoff, validate the response schema, and handle timeouts. **Transform** is your business logic layer — pure functions that convert one data shape into another, testable in isolation. **Load** is writing to your own data store — you'd use transactions or write-ahead patterns to ensure consistency. The main difference is scale: a microservice processes one request at a time; an ETL pipeline processes an entire dataset in a single batch. But the engineering principles — idempotency, validation, error handling, atomicity — are identical.:::## Feature engineering: the transform layer {#sec-feature-engineering}In the ETL pattern, the transform step is where data science diverges most from traditional data engineering. A data engineer's transform typically cleans and restructures data — joins, filters, type conversions. A data scientist's transform also creates **features**: derived quantities that encode domain knowledge in a form that models can learn from.Feature engineering is the process of turning raw data into model inputs. It sounds mechanical, but it's one of the highest-leverage activities in applied data science. A good feature can improve a model more than switching algorithms or tuning hyperparameters.```{python}#| label: feature-engineering#| echo: trueimport pandas as pdimport numpy as nprng = np.random.default_rng(42)# Simulated e-commerce transaction datan =2000transactions = pd.DataFrame({"user_id": rng.integers(1, 201, n),"timestamp": pd.date_range("2023-06-01", periods=n, freq="37min"),"amount": np.round(rng.lognormal(3, 1, n), 2),"category": rng.choice(["electronics", "clothing", "food", "books"], n),})# Feature engineering: aggregate per-user behaviouruser_features = transactions.groupby("user_id").agg( total_spend=("amount", "sum"), avg_transaction=("amount", "mean"), transaction_count=("amount", "count"), unique_categories=("category", "nunique"), max_single_purchase=("amount", "max"),).reset_index()# Derived ratiosuser_features["avg_to_max_ratio"] = ( user_features["avg_transaction"] / user_features["max_single_purchase"])print(user_features.describe().round(2))```Each of these features encodes a specific hypothesis about user behaviour: users who spend across many categories might behave differently from those who concentrate in one; users whose average transaction is close to their maximum are consistent spenders, while those with a low ratio make occasional large purchases. The model doesn't know these hypotheses — it just sees numbers. The feature engineer's job is to present the numbers in a way that makes the underlying patterns learnable.::: {.callout-tip}## Author's NoteFeature engineering feels like plumbing — unglamorous work that happens before the real modelling begins. But in applied data science, the representation of the data matters more than the choice of algorithm. A well-chosen feature can improve model performance more than any amount of hyperparameter tuning. The model is the same. The algorithm is the same. What changes is the *signal* the model has access to. This is the inverse of the software engineering instinct, where the algorithm is the interesting part and the data format is just a detail. 
In data science, the data format *is* the algorithm's input, and shaping that input well is where most of the practical value lies.:::## The train/serve skew problem {#sec-train-serve-skew}One of the most insidious bugs in production ML systems is **train/serve skew** — a mismatch between how features are computed during training and how they're computed at serving time. The model learns from one representation of the data but makes predictions on a subtly different one.This happens more easily than you'd think. During training, you compute features in a batch pipeline using pandas on a static dataset. At serving time, you compute features in real time using a different code path — perhaps a Java service that reimplements the same logic but handles edge cases differently. A rounding difference, a different null-handling strategy, or a time-zone conversion that behaves differently between Python and Java, and your model's predictions degrade silently.```{python}#| label: train-serve-skew#| echo: trueimport numpy as npfrom sklearn.preprocessing import StandardScalerfrom sklearn.linear_model import LogisticRegressionrng = np.random.default_rng(42)# Training: compute features from historical datatrain_data = rng.normal(50, 15, (500, 3))scaler = StandardScaler()X_train = scaler.fit_transform(train_data)y_train = (X_train[:, 0] +0.5* X_train[:, 1] >0).astype(int)model = LogisticRegression(random_state=42)model.fit(X_train, y_train)# Serving: a new observation arrivesnew_obs = np.array([[65, 40, 55]])# Correct: use the fitted scaler from trainingX_correct = scaler.transform(new_obs)pred_correct = model.predict_proba(X_correct)[0, 1]# Skewed: accidentally re-fit the scaler on just the new observationbad_scaler = StandardScaler()X_skewed = bad_scaler.fit_transform(new_obs) # this centres on the single obs!pred_skewed = model.predict_proba(X_skewed)[0, 1]print(f"Correct prediction: {pred_correct:.4f}")print(f"Skewed prediction: {pred_skewed:.4f}")print(f"Difference: {abs(pred_correct - pred_skewed):.4f}")```The correct serving path uses the *training* scaler — the one fitted on the training data's mean and standard deviation. The skewed path re-fits a scaler on the single new observation, which divides by that observation's standard deviation — zero, since a single point has no variance. Sklearn silently produces NaN features, which the model then consumes to produce meaningless predictions. No exception is raised; the code runs to completion and returns an answer. It's just the wrong answer.To see this at scale, we can generate many test observations and compare the correct and skewed predictions for each:```{python}#| label: fig-train-serve-skew#| echo: true#| fig-cap: "Train/serve skew in action. Each point is a test observation scored#| two ways: correctly (using the training scaler) and incorrectly (re-fitting#| a scaler per observation). The skewed predictions cluster around 0.5 — the#| model's default when features are NaN — while correct predictions span the#| full probability range."#| fig-alt: "Scatter plot comparing correct vs skewed predicted probabilities for#| 100 test observations. 
Correct predictions spread from 0 to 1 along the#| x-axis; skewed predictions cluster near 0.5 on the y-axis, showing that#| re-fitting the scaler per observation destroys the model's discrimination."import matplotlib.pyplot as pltimport warningstest_data = rng.normal(50, 15, (100, 3))correct_preds = model.predict_proba(scaler.transform(test_data))[:, 1]skewed_preds = []for obs in test_data: s = StandardScaler()with warnings.catch_warnings(): warnings.simplefilter("ignore") # suppress divide-by-zero warnings x = s.fit_transform(obs.reshape(1, -1)) skewed_preds.append(model.predict_proba(x)[0, 1])skewed_preds = np.array(skewed_preds)fig, ax = plt.subplots(figsize=(6, 5))ax.scatter(correct_preds, skewed_preds, alpha=0.5, s=30, edgecolors="none")ax.plot([0, 1], [0, 1], "--", color="grey", linewidth=1, label="Perfect agreement")ax.set_xlabel("Correct prediction (training scaler)")ax.set_ylabel("Skewed prediction (re-fitted scaler)")ax.set_title("Train/serve skew distorts predictions")ax.legend(frameon=False)ax.set_xlim(-0.05, 1.05)ax.set_ylim(-0.05, 1.05)plt.tight_layout()plt.show()```::: {.callout-note}## Engineering BridgeTrain/serve skew is the data science equivalent of **dev/prod parity** — one of the twelve-factor app principles. In web development, you learn (usually the hard way) that if your dev environment uses SQLite but production uses PostgreSQL, you'll discover bugs only in production. The same principle applies to feature computation: if the training pipeline computes features differently from the serving pipeline, your model will behave differently in production than in evaluation. The solution is also the same — minimise the gap. Use the same code, the same libraries, and ideally the same feature computation path for both training and serving.:::## Feature stores: the shared library of features {#sec-feature-stores}A **feature store** is infrastructure that solves two problems at once: it provides a single, shared repository of feature definitions (eliminating duplicate feature engineering across teams), and it serves the same features consistently for both training and inference (eliminating train/serve skew).Instead of each model reimplementing "days since last purchase" or "average transaction amount over 30 days," these features are defined once, computed by a shared pipeline, and stored in a system that serves them to any model that needs them. Training reads historical features as of the time each training example occurred (point-in-time correctness). Serving reads the latest feature values for real-time predictions.Feature stores typically have two storage layers. The **offline store** — a data warehouse or file system — holds historical features used in training, stored as a large columnar dataset keyed by entity ID and timestamp. The **online store** — a low-latency key-value store like Redis or DynamoDB — holds the latest feature values used in real-time serving. 
A synchronisation process keeps the online store up to date as the offline store is refreshed.```{python}#| label: feature-store-concept#| echo: trueimport pandas as pdimport numpy as nprng = np.random.default_rng(42)# Simulate a feature store's offline layer: historical features keyed by# (user_id, event_timestamp)n_users =50n_snapshots =12# monthly snapshotsrecords = []for user_id inrange(1, n_users +1): base_spend = rng.exponential(200)for month inrange(n_snapshots): records.append({"user_id": user_id,"snapshot_date": pd.Timestamp("2023-01-01") + pd.DateOffset(months=month),"total_spend_30d": round(base_spend * rng.uniform(0.5, 1.5), 2),"transaction_count_30d": int(rng.poisson(8)),"days_since_last_purchase": int(rng.exponential(5)), })offline_store = pd.DataFrame(records)print(f"Offline store: {offline_store.shape[0]} rows "f"({n_users} users × {n_snapshots} snapshots)")print(f"Columns: {list(offline_store.columns)}")print()# Point-in-time lookup: get features as they were on a specific datedef get_features_at(store: pd.DataFrame, user_id: int, as_of: pd.Timestamp) -> pd.Series:"""Retrieve features for a user as of a given date.""" mask = (store["user_id"] == user_id) & (store["snapshot_date"] <= as_of)if mask.sum() ==0:return pd.Series(dtype=float)return store.loc[mask].sort_values("snapshot_date").iloc[-1]# Training: get features as of training label date (avoids future leakage)train_features = get_features_at(offline_store, user_id=1, as_of=pd.Timestamp("2023-06-15"))print(f"Training features for user 1 as of 2023-06-15:")print(train_features[["total_spend_30d", "transaction_count_30d","days_since_last_purchase"]].to_string())```The `get_features_at` function illustrates the critical concept of **point-in-time correctness**. When building training data, you must retrieve features as they existed *at the time the label was observed*, not as they exist today. Using today's features to predict yesterday's outcome is data leakage — the model sees information from the future during training, learns to rely on it, and then fails in production where that information isn't available.::: {.callout-note}## Engineering BridgeA feature store is to ML features what a package registry (npm, PyPI, Maven) is to code libraries. Without a registry, every team copies and modifies shared code independently, leading to inconsistent behaviour and duplicated maintenance. A package registry provides a single source of truth with versioning and dependency management. A feature store does the same for computed features: it provides a canonical definition, historical versioning, and consistent access for both training (batch reads of historical data) and serving (real-time reads of current values). The parallel even extends to the governance problem — just as a package registry lets you audit who depends on a library before changing its API, a feature store lets you audit which models depend on a feature before changing its computation logic.:::## Data validation: catching problems early {#sec-data-validation}The most expensive bugs in data pipelines are the ones that don't crash. 
A malformed API response that gets silently ingested, a join that drops rows due to a key mismatch, a feature column that drifts to all zeros after an upstream schema change — these produce valid-looking data that quietly degrade your model's predictions.Data validation is the practice of asserting expectations about your data at each stage of the pipeline, the same way you'd add assertions and contract tests to a service boundary.```{python}#| label: data-validation#| echo: trueimport pandas as pdimport numpy as npdef validate_features(df: pd.DataFrame) ->list[str]:"""Validate a feature DataFrame against expected contracts.""" errors = []# Schema checks required_cols = {"user_id", "total_spend", "transaction_count","unique_categories"} missing = required_cols -set(df.columns)if missing: errors.append(f"Missing columns: {missing}")# Null checks null_pct = df.isnull().mean() high_null_cols = null_pct[null_pct >0.05].index.tolist()if high_null_cols: errors.append(f"Columns exceed 5% null threshold: {high_null_cols}")# Range checksif"total_spend"in df.columns and (df["total_spend"] <0).any(): errors.append("Negative values in total_spend")if"transaction_count"in df.columns and (df["transaction_count"] <0).any(): errors.append("Negative values in transaction_count")# Distribution checks (detect drift)if"total_spend"in df.columns: median_spend = df["total_spend"].median()if median_spend <1or median_spend >10_000: errors.append(f"Suspicious median total_spend: {median_spend:.2f}")# Completeness checkiflen(df) <100: errors.append(f"Fewer than 100 rows ({len(df)}) — possible extraction failure")return errors# Test with clean datarng = np.random.default_rng(42)good_data = pd.DataFrame({"user_id": range(500),"total_spend": rng.exponential(200, 500),"transaction_count": rng.poisson(10, 500),"unique_categories": rng.integers(1, 6, 500),})errors = validate_features(good_data)print(f"Clean data validation: {'PASSED'ifnot errors else errors}")# Test with problematic databad_data = good_data.copy()bad_data.loc[:40, "total_spend"] = np.nan # >5% nullsbad_data.loc[100:105, "transaction_count"] =-1# negative countserrors = validate_features(bad_data)print(f"Bad data validation: {errors}")```The validation function checks four categories of expectation: **schema** (are the right columns present?), **nulls** (is data completeness within tolerance?), **ranges** (are values physically plausible?), and **distributions** (has the data drifted from expected patterns?). In production, these checks run at every pipeline stage — after extraction, after each transform, and before load. If any check fails, the pipeline halts and alerts the team rather than silently writing bad data.::: {.callout-tip}## Author's NoteData validation feels like a "nice to have" — something you add after the pipeline is working. But pipelines fail silently far more often than they fail loudly. A column that changes from British date format (DD/MM/YYYY) to American format (MM/DD/YYYY) will parse without error — 1 March becomes 3 January — and the model will retrain on quietly wrong features. The accuracy drop will be subtle, delayed, and extremely hard to diagnose after the fact. A simple validation check at the pipeline boundary catches these problems in seconds. The instinct should be: write the validation *before* the transform, not after.:::## Orchestrating pipeline stages {#sec-orchestration}A real-world feature pipeline is rarely a single script. 
It's a directed acyclic graph (DAG) of dependent stages: extraction depends on the source being available, transforms depend on the extraction completing, validation depends on the transform, and loading depends on validation passing.In @sec-notebooks-to-pipelines we saw how DVC encodes these dependencies for reproducibility. For scheduled production pipelines — the ones that run nightly or hourly to keep feature stores up to date — dedicated orchestration tools manage execution, retries, scheduling, and alerting. The most widely used are **Airflow** (the established standard), **Prefect** (a more Pythonic alternative), and **Dagster** (which emphasises typed data contracts between stages).All three share the same core abstraction: a DAG of tasks, where each task is a Python function or shell command, and edges define data dependencies. If you've used CI/CD systems like GitHub Actions or GitLab CI, the structure will feel familiar — jobs, stages, dependencies, and retry policies.```python# Conceptual Airflow DAG — not executable in this environment# but illustrates the pipeline structurefrom airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetimedag = DAG("user_features_pipeline", schedule_interval="@daily", start_date=datetime(2024, 1, 1), catchup=False,)extract = PythonOperator( task_id="extract_transactions", python_callable=extract_transactions, # reads from source DB dag=dag,)transform = PythonOperator( task_id="compute_user_features", python_callable=compute_user_features, # aggregates per user dag=dag,)validate = PythonOperator( task_id="validate_features", python_callable=validate_feature_data, # checks quality gates dag=dag,)load = PythonOperator( task_id="load_to_feature_store", python_callable=load_to_feature_store, # writes to offline + online stores dag=dag,)extract >> transform >> validate >> load # define execution order```The `>>` operator defines dependencies: `transform` won't run until `extract` succeeds, `validate` won't run until `transform` succeeds. If `validate` fails, `load` never executes — bad data never reaches the feature store. The orchestrator handles scheduling (run daily at 02:00), retries (retry `extract` up to 3 times with exponential backoff), and alerting (page the on-call engineer if the pipeline hasn't succeeded by 06:00).## A worked example: end-to-end feature pipeline {#sec-pipeline-worked-example}The following example brings together the concepts from this chapter: extraction, validation, feature engineering, and reproducible output. 
We'll build a feature pipeline for the service metrics scenario from @sec-repro-worked-example and demonstrate how each stage validates its outputs.```{python}#| label: pipeline-worked-example#| echo: trueimport numpy as npimport pandas as pdimport hashlibimport iofrom datetime import datetime, timezone# ---- Stage 1: Extract ----rng = np.random.default_rng(42)n =1500raw_metrics = pd.DataFrame({"service_id": rng.choice(["api-gateway", "auth-service", "payment-svc","search-api", "user-svc"], n),"timestamp": pd.date_range("2023-01-01", periods=n, freq="6h"),"request_rate": rng.exponential(500, n),"error_count": rng.poisson(5, n),"p50_latency_ms": (p50 := rng.lognormal(3, 0.5, n)),"p99_latency_ms": p50 + rng.exponential(30, n), # p99 >= p50 by definition"cpu_pct": np.clip(rng.normal(45, 20, n), 0, 100),})# Introduce realistic messinessraw_metrics.loc[rng.choice(n, 20, replace=False), "error_count"] = np.nanraw_metrics.loc[rng.choice(n, 10, replace=False), "cpu_pct"] =-1# sensor glitchprint(f"Extracted: {raw_metrics.shape[0]} rows from 5 services")print(f" Nulls: {raw_metrics.isna().sum().sum()}, "f"Invalid CPU: {(raw_metrics['cpu_pct'] <0).sum()}")```The raw extract has the usual problems: null error counts (perhaps the monitoring agent was down during those intervals) and negative CPU readings from a sensor glitch. Before transforming this data, we validate the extraction to catch catastrophic failures early — a truncated extract or a missing column would waste everything downstream.```{python}#| label: pipeline-validate-extract#| echo: true# ---- Stage 2: Validate extraction ----def validate_extraction(df: pd.DataFrame) ->list[str]:"""Gate check after extraction.""" errors = [] required = {"service_id", "timestamp", "request_rate", "error_count","p50_latency_ms", "p99_latency_ms", "cpu_pct"} missing = required -set(df.columns)if missing: errors.append(f"Missing columns: {missing}")iflen(df) <100: errors.append(f"Too few rows: {len(df)}")if df["timestamp"].nunique() <10: errors.append("Suspiciously few unique timestamps")return errorsextract_errors = validate_extraction(raw_metrics)print(f"Extraction validation: {'PASSED'ifnot extract_errors else extract_errors}")```With the extraction validated, the transform stage cleans the data and engineers features. 
Notice that we fill missing values using per-service medians rather than a global median — the error rate for `payment-svc` is likely on a different scale from `search-api`, so a global fill would blur meaningful differences between services.```{python}#| label: pipeline-transform#| echo: true# ---- Stage 3: Transform — clean and engineer features ----def transform_service_metrics(df: pd.DataFrame) -> pd.DataFrame:"""Clean raw metrics and compute service-level features.""" clean = df.copy()# Fix invalid values clean.loc[clean["cpu_pct"] <0, "cpu_pct"] = np.nan# Fill missing values with per-service mediansfor col in ["error_count", "cpu_pct"]: clean[col] = clean.groupby("service_id")[col].transform(lambda s: s.fillna(s.median()) )# Derive features clean["error_rate"] = clean["error_count"] / clean["request_rate"].clip(lower=1) clean["latency_ratio"] = clean["p99_latency_ms"] / clean["p50_latency_ms"].clip(lower=1) clean["is_high_cpu"] = (clean["cpu_pct"] >80).astype(int)# Time-based features clean["hour"] = clean["timestamp"].dt.hour clean["is_business_hours"] = clean["hour"].between(9, 17).astype(int)return cleanfeatures = transform_service_metrics(raw_metrics)print(f"Transformed: {features.shape[0]} rows, {features.shape[1]} columns")print(f" New features: error_rate, latency_ratio, is_high_cpu, hour, is_business_hours")print(f" Remaining nulls: {features.isna().sum().sum()}")```The transform has cleaned the data and produced five new features. Visualising their distributions gives us a quick sanity check — and establishes a baseline that future pipeline runs can be compared against to detect drift.```{python}#| label: fig-feature-distributions#| echo: true#| fig-cap: "Distributions of the three continuous features produced by the#| transform stage. These baselines are what a distribution drift check would#| compare against on future pipeline runs."#| fig-alt: "Three histograms side by side showing the distributions of error#| rate (right-skewed, mostly near zero), latency ratio (right-skewed,#| centred around 2), and CPU percentage (roughly normal, centred around 45)."import matplotlib.pyplot as pltfig, axes = plt.subplots(1, 3, figsize=(10, 3.5))feat_cols = ["error_rate", "latency_ratio", "cpu_pct"]titles = ["Error rate", "Latency ratio (p99/p50)", "CPU %"]for ax, col, title inzip(axes, feat_cols, titles): ax.hist(features[col].dropna(), bins=40, edgecolor="white", linewidth=0.5) ax.set_xlabel(col) ax.set_title(title) ax.axvline(features[col].median(), color="red", linestyle="--", linewidth=1, label=f"Median: {features[col].median():.2f}") ax.legend(fontsize=8, frameon=False)plt.tight_layout()plt.show()```Now we validate the output — this is the gate between transform and load, and it's where we check invariants that should hold after a correct transformation. 
If any check fails, the pipeline halts rather than writing bad features to the store.

```{python}
#| label: pipeline-validate-transform
#| echo: true
# ---- Stage 4: Validate transform output ----
def validate_features_pipeline(df: pd.DataFrame) -> list[str]:
    """Gate check after feature engineering."""
    errors = []

    # No nulls should remain after transform
    null_cols = df.columns[df.isna().any()].tolist()
    if null_cols:
        errors.append(f"Unexpected nulls in: {null_cols}")

    # Derived features should be non-negative
    for col in ["error_rate", "latency_ratio"]:
        if col in df.columns and (df[col] < 0).any():
            errors.append(f"Negative values in {col}")

    # Latency ratio should be >= 1 (p99 >= p50 by definition)
    if "latency_ratio" in df.columns:
        below_one = (df["latency_ratio"] < 1.0).mean()
        if below_one > 0.05:
            errors.append(f"Latency ratio < 1 for {below_one:.1%} of rows")

    return errors

transform_errors = validate_features_pipeline(features)
print(f"Transform validation: {'PASSED' if not transform_errors else transform_errors}")
```

With both validation gates passed, we can safely load the features. The load step writes the data and produces a manifest — a summary record that captures the pipeline's provenance: row counts, a content hash, and the results of every validation check.

```{python}
#| label: pipeline-load
#| echo: true
# ---- Stage 5: Load with provenance ----
buf = io.BytesIO()
features.to_parquet(buf, index=False)
content_hash = hashlib.sha256(buf.getvalue()).hexdigest()[:16]

manifest = {
    "pipeline": "service_features",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "input_rows": len(raw_metrics),
    "output_rows": len(features),
    "output_columns": len(features.columns),
    "content_hash": content_hash,
    "validation": {
        "extract": "PASSED" if not extract_errors else extract_errors,
        "transform": "PASSED" if not transform_errors else transform_errors,
    },
}

print("Pipeline manifest:")
for key, value in manifest.items():
    print(f"  {key}: {value}")
```

Each stage validates its output before the next stage begins. The final manifest records the full provenance: how many rows entered, how many survived, what the output hash is, and whether every validation gate passed. This is the data pipeline equivalent of a CI build report — a complete record of what happened, in what order, with what results.

## Summary {#sec-pipelines-summary}

1. **Data pipelines are the majority of ML code.** The model is a small component; the infrastructure that extracts, transforms, validates, and serves data is where most engineering effort goes.
2. **ETL is a pattern, not a technology.** Extract handles unreliable sources with defensive programming. Transform encodes domain knowledge as features. Load writes results atomically with content hashes for verification (see the sketch after this list).
3. **Train/serve skew is a silent killer.** Use the same code path for computing features in training and in serving. Feature stores formalise this by providing a single source of truth for feature definitions and values.
4. **Validate data at every pipeline boundary.** Check schemas, null rates, value ranges, and distributions after each stage. A pipeline that halts on bad data is better than one that silently produces bad predictions.
5. **Orchestrate with DAGs.** Express pipeline dependencies explicitly using tools like Airflow, Prefect, or DVC. This gives you automatic retries, dependency tracking, and the confidence that no stage runs before its inputs are ready and validated.
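Point 2 mentions atomic loads; the worked example sidestepped the issue by writing to an in-memory buffer. When the destination is a real file, one common approach is to stage the write and then swap it into place in a single step. The sketch below assumes a local parquet destination; the function name and paths are illustrative rather than part of the pipeline above.

```{python}
#| label: pipeline-atomic-load-sketch
#| echo: true
import os
import tempfile

def atomic_write_parquet(df: pd.DataFrame, destination: str) -> None:
    """Illustrative atomic load: stage the file, then rename it into place."""
    target_dir = os.path.dirname(destination) or "."
    # Stage next to the destination so the rename stays on one filesystem
    fd, staging_path = tempfile.mkstemp(suffix=".parquet", dir=target_dir)
    os.close(fd)
    try:
        df.to_parquet(staging_path, index=False)
        # os.replace is atomic when source and destination share a filesystem,
        # so readers never observe a half-written file
        os.replace(staging_path, destination)
    except Exception:
        if os.path.exists(staging_path):
            os.remove(staging_path)
        raise

# Hypothetical usage (the path is not created in this example):
# atomic_write_parquet(features, "warehouse/service_features.parquet")
```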
## Exercises {#sec-pipelines-exercises}

1. Extend the `validate_features` function from @sec-data-validation to include a **distribution drift check**: compare the mean and standard deviation of each numerical column against reference statistics (provided as a dictionary), and flag any column where the mean has shifted by more than 2 standard deviations. Test it by creating a "drifted" dataset where one feature's mean has shifted.
2. Write a feature engineering function that takes the raw transaction data from @sec-feature-engineering and computes **time-windowed features**: for each user, calculate total spend and transaction count over the last 7 days, 30 days, and 90 days relative to a given reference date. Discuss why the reference date matters for avoiding data leakage.
3. Demonstrate the train/serve skew problem with a `StandardScaler`: train a logistic regression model using `fit_transform` on the training set, then show that (a) using `transform` with the training scaler produces correct serving predictions, and (b) calling `fit_transform` on each new observation individually produces wrong predictions. Measure the average prediction error between the two approaches.
4. **Conceptual:** Your company has 15 ML models in production, and 8 of them use a feature called "customer lifetime value" (CLV). Each model team computes CLV slightly differently — different time windows, different revenue definitions, different null handling. A colleague proposes building a feature store to standardise this. What are the benefits? What are the risks? How would you handle the transition without breaking existing models?
5. **Conceptual:** A junior data scientist argues that data validation is unnecessary because "the model will just learn to handle bad data." In what sense are they wrong? In what narrow sense might they have a point? What's the strongest argument for validation that goes beyond model accuracy?