Chapter 19: Testing for Machine Learning
19.1. The Pyramid of ML Tests: Data, Component, Model Quality, Integration
“Testing leads to failure, and failure leads to understanding.” — Burt Rutan
In traditional software engineering, the testing pyramid is a well-established pattern: a wide base of unit tests, a narrower layer of integration tests, and a small apex of end-to-end tests. The rationale is economic: unit tests are fast, cheap, and pinpoint bugs, while end-to-end tests are slow, expensive, and fragile.
Machine Learning systems break this model in fundamental ways. The code is simple (often just a call to model.fit()), but the behavior is entirely determined by data. A bug in an ML system is not a syntax error or a null pointer exception; it is a silent degradation in prediction quality that manifests weeks after deployment when user engagement mysteriously drops by 3%.
The traditional pyramid must be inverted, expanded, and specialized. This chapter presents The Pyramid of ML Tests—a layered testing strategy that addresses the unique challenges of non-deterministic, data-dependent systems in production.
19.1.1. The Anatomy of ML System Failures
Before we can test effectively, we must understand the failure modes unique to ML.
Traditional Software vs. ML Software
| Dimension | Traditional Software | ML Software |
|---|---|---|
| Bug Source | Code logic errors | Data distribution shifts |
| Failure Mode | Crashes, exceptions | Silent accuracy degradation |
| Reproducibility | Deterministic | Stochastic (model init, data sampling) |
| Root Cause | Stack trace points to line | Model internals are opaque |
| Validation | Assert output == expected | Assert accuracy > threshold |
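The last row is the crux of the shift. As a minimal, self-contained illustration (the synthetic dataset, model choice, and 0.85 threshold are illustrative, not prescriptive):
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

def compute_tax_cents(amount_cents, rate_percent):
    return amount_cents * rate_percent // 100

def test_tax_calculation():
    # Traditional software: deterministic function, assert the exact expected output.
    assert compute_tax_cents(amount_cents=10000, rate_percent=20) == 2000

def test_model_accuracy():
    # ML software: assert that an aggregate metric clears a threshold, not exact equality.
    rng = np.random.default_rng(seed=42)
    X = rng.normal(size=(500, 4))
    y = (X[:, 0] + 0.1 * rng.normal(size=500) > 0).astype(int)
    model = LogisticRegression().fit(X[:400], y[:400])
    accuracy = accuracy_score(y[400:], model.predict(X[400:]))
    assert accuracy > 0.85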
The “Nine Circles” of ML Failures
1. Data Validation Failures:
- Schema drift: A new feature appears in production that wasn’t in training data.
- Missing values: 15% of rows have NULL in a critical feature.
- Distribution shift: The mean of a feature changes from 50 to 500.
2. Feature Engineering Bugs:
- Train-serve skew: Feature normalization uses training stats at inference.
- Leakage: Target variable accidentally encoded in features.
- Temporal leakage: Using “future” data to predict the past.
3. Model Training Bugs:
- Overfitting: Perfect training accuracy, 50% validation accuracy.
- Underfitting: Model hasn’t converged; stopped training too early.
- Class imbalance ignored: 99.9% of data is negative, so the model predicts all negative (see the sketch after this list).
4. Model Evaluation Errors:
- Wrong metric: Optimizing accuracy when you need recall.
- Data leakage in validation split.
- Test set contamination: Accidentally including training samples in test set.
5. Serialization/Deserialization Bugs:
- Model saved in PyTorch 1.9, loaded in PyTorch 2.0 (incompatibility).
- Pickle security vulnerabilities.
- Quantization applied incorrectly, destroying accuracy.
6. Integration Failures:
- Preprocessing pipeline mismatch between training and serving.
- API contract violation: Serving expects JSON, client sends CSV.
- Timeout: Model takes 5 seconds to infer, but SLA is 100ms.
7. Infrastructure Failures:
- GPU out of memory during batch inference.
- Disk full when saving checkpoints.
- Network partition splits distributed training cluster.
8. Monitoring Blind Spots:
- No drift detection; model degrades for 3 months unnoticed.
- Latency regression: p99 latency creeps from 50ms to 500ms.
- Cost explosion: Inference costs 10x more than expected.
9. Adversarial and Safety Failures:
- Model outputs toxic content.
- Prompt injection bypasses safety filters.
- Adversarial examples fool the model (e.g., sticker on stop sign).
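To make one of these failure modes concrete, the class-imbalance trap from item 3 fits in a few lines: an all-negative predictor scores 99.9% accuracy yet catches none of the positives.
import numpy as np
from sklearn.metrics import accuracy_score, recall_score

y_true = np.array([0] * 999 + [1])      # 99.9% negative class
y_pred = np.zeros_like(y_true)          # degenerate model: predict all negative

print(accuracy_score(y_true, y_pred))                 # 0.999, looks excellent
print(recall_score(y_true, y_pred, zero_division=0))  # 0.0, misses every positive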
19.1.2. The ML Testing Pyramid (The Four Layers)
Unlike the traditional three-layer pyramid, ML systems require four distinct testing layers:
            ▲
           / \
          / 4 \           Integration Tests
         /─────\          (End-to-End Scenarios)
        /   3   \         Model Quality Tests
       /─────────\        (Behavioral, Invariance, Minimum Functionality)
      /     2     \       Component Tests
     /─────────────\      (Feature Engineering, Preprocessing, Postprocessing)
    /       1       \     Data Validation Tests
   /─────────────────\    (Schema, Distribution, Integrity)
  /___________________\
Layer 1: Data Validation Tests (Foundation)
- Volume: Thousands of tests, run on every data batch.
- Speed: Milliseconds per test.
- Purpose: Catch data quality issues before they poison the model.
Layer 2: Component Tests
- Volume: Hundreds of tests.
- Speed: Seconds per test suite.
- Purpose: Ensure feature engineering, preprocessing, and postprocessing logic is correct.
Layer 3: Model Quality Tests
- Volume: Tens of tests.
- Speed: Minutes to hours (requires model training/inference).
- Purpose: Validate model behavior, performance, and robustness.
Layer 4: Integration Tests
- Volume: A few critical paths.
- Speed: Hours (full pipeline execution).
- Purpose: Verify the entire ML pipeline works end-to-end.
19.1.3. Layer 1: Data Validation Tests
Data is the fuel of ML. Poisoned fuel destroys the engine. Data validation must be continuous, automated, and granular.
Schema Validation
The first line of defense: Does the data match the expected structure?
What to Test:
- Column names and order
- Data types (int, float, string, categorical)
- Nullability constraints
- Categorical value domains (e.g., status must be in ['active', 'inactive', 'pending'])
Implementation with Great Expectations:
import great_expectations as gx
# NOTE: the Great Expectations API has changed substantially across releases;
# the calls below follow an older (pre-Fluent) style and may need adjusting
# for your installed version.
# Define expectations
context = gx.get_context()
# Create expectation suite
suite = context.create_expectation_suite("user_features_v1")
# Add expectations
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_table_columns_to_match_ordered_list",
kwargs={
"column_list": ["user_id", "age", "spend_30d", "country", "label"]
}
)
)
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_be_of_type",
kwargs={"column": "age", "type_": "int"}
)
)
suite.add_expectation(
gx.core.ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "user_id"}
)
)
# Validate data
batch = context.get_batch(
datasource_name="my_datasource",
data_asset_name="user_features",
batch_spec_passthrough={"path": "s3://bucket/data.parquet"}
)
results = context.run_validation_operator(
"action_list_operator",
assets_to_validate=[batch],
expectation_suite_name="user_features_v1"
)
if not results["success"]:
raise ValueError("Data validation failed!")
Distribution Validation (Drift Detection)
Schema can be correct, but the statistical properties might have shifted.
What to Test:
- Mean, median, std deviation of numeric features
- Min/max bounds
- Cardinality of categorical features
- Percentage of missing values
- Correlations between features
Statistical Tests:
- Kolmogorov-Smirnov (KS) Test: Detects whether two distributions differ.
  - Null hypothesis: The training and production distributions are the same.
  - If p-value < 0.05, reject the null → distribution drift detected.
- Population Stability Index (PSI):
$$\text{PSI} = \sum_{i=1}^{n} (\text{Production}_i - \text{Train}_i) \times \ln\left(\frac{\text{Production}_i}{\text{Train}_i}\right)$$
  - PSI < 0.1: No significant change
  - PSI 0.1-0.2: Moderate drift
  - PSI > 0.2: Significant drift (retrain the model)
Implementation:
import numpy as np
from scipy.stats import ks_2samp
def detect_numerical_drift(train_data, prod_data, feature_name, threshold=0.05):
"""
Use KS test to detect drift in a numerical feature.
"""
train_values = train_data[feature_name].dropna()
prod_values = prod_data[feature_name].dropna()
statistic, p_value = ks_2samp(train_values, prod_values)
if p_value < threshold:
print(f"DRIFT DETECTED in {feature_name}: p-value={p_value:.4f}")
return True
return False
def calculate_psi(train_data, prod_data, feature_name, bins=10):
"""
Calculate Population Stability Index for a feature.
"""
# Bin the data
train_values = train_data[feature_name].dropna()
prod_values = prod_data[feature_name].dropna()
    # Create bins from the training data (equal-width here; quantile bins are a common alternative)
bin_edges = np.histogram_bin_edges(train_values, bins=bins)
# Calculate distributions
train_hist, _ = np.histogram(train_values, bins=bin_edges)
prod_hist, _ = np.histogram(prod_values, bins=bin_edges)
# Normalize to percentages
train_pct = train_hist / train_hist.sum()
prod_pct = prod_hist / prod_hist.sum()
# Avoid log(0)
train_pct = np.where(train_pct == 0, 0.0001, train_pct)
prod_pct = np.where(prod_pct == 0, 0.0001, prod_pct)
# Calculate PSI
psi = np.sum((prod_pct - train_pct) * np.log(prod_pct / train_pct))
print(f"PSI for {feature_name}: {psi:.4f}")
if psi > 0.2:
print(f" WARNING: Significant drift detected!")
return psi
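The KS test and PSI above apply to numeric features. For categorical features, a chi-square test on the category counts is a common counterpart; detect_categorical_drift below is a hypothetical helper sketched in the same style as detect_numerical_drift, and it assumes reasonably large counts per category:
from scipy.stats import chi2_contingency
import pandas as pd

def detect_categorical_drift(train_data, prod_data, feature_name, threshold=0.05):
    """
    Chi-square test on category counts (hypothetical helper mirroring detect_numerical_drift).
    """
    train_counts = train_data[feature_name].value_counts()
    prod_counts = prod_data[feature_name].value_counts()
    # Align both count vectors on the union of observed categories
    all_categories = train_counts.index.union(prod_counts.index)
    table = pd.DataFrame({
        "train": train_counts.reindex(all_categories, fill_value=0),
        "prod": prod_counts.reindex(all_categories, fill_value=0),
    })
    _, p_value, _, _ = chi2_contingency(table.T.values)
    if p_value < threshold:
        print(f"CATEGORICAL DRIFT DETECTED in {feature_name}: p-value={p_value:.4f}")
        return True
    return False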
Data Integrity Tests
Beyond schema and distribution, test the semantic correctness of data.
Examples:
- age must be between 0 and 120
- email must match a regex pattern
- timestamp must not be in the future
- total_price must equal quantity * unit_price
- Referential integrity: user_id in transactions must exist in the users table
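Before (or alongside) a managed service, rules like these can be expressed as plain assertions. A minimal pandas sketch, assuming a users frame carrying age and email and a transactions frame carrying user_id, timestamp, quantity, unit_price, and total_price:
import pandas as pd

def validate_integrity(users: pd.DataFrame, transactions: pd.DataFrame) -> None:
    """Raise AssertionError on the first violated rule (illustrative helper)."""
    assert users["age"].between(0, 120).all(), "age outside [0, 120]"
    assert users["email"].str.match(
        r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    ).all(), "malformed email"

    # Assumes timezone-naive timestamps
    assert (transactions["timestamp"] <= pd.Timestamp.now()).all(), \
        "timestamp in the future"

    # total_price must equal quantity * unit_price (allow float rounding)
    expected_total = transactions["quantity"] * transactions["unit_price"]
    assert (transactions["total_price"] - expected_total).abs().le(1e-6).all(), \
        "total_price != quantity * unit_price"

    # Referential integrity
    assert transactions["user_id"].isin(users["user_id"]).all(), \
        "transaction references an unknown user_id"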
AWS Implementation: AWS Glue DataBrew:
import boto3
databrew = boto3.client('databrew')
# Create a data quality ruleset. NOTE: illustrative schema; the actual CreateRuleset
# API expresses rules as CheckExpression/SubstitutionMap pairs and requires a
# TargetArn (see the Glue DataBrew documentation for the exact fields).
ruleset = databrew.create_ruleset(
Name='user_features_validation',
Rules=[
{
'Name': 'age_range_check',
'ColumnSelectors': [{'Name': 'age'}],
'RuleConditions': [
{
'Condition': 'GREATER_THAN_OR_EQUAL',
'Value': '0'
},
{
'Condition': 'LESS_THAN_OR_EQUAL',
'Value': '120'
}
]
},
{
'Name': 'email_format_check',
'ColumnSelectors': [{'Name': 'email'}],
'RuleConditions': [
{
'Condition': 'MATCHES_PATTERN',
'Value': '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'
}
]
}
]
)
GCP Implementation: Vertex AI Data Quality:
from google.cloud import aiplatform
aiplatform.init(project='my-project', location='us-central1')
# Define a data quality spec (illustrative schema, not a specific Vertex AI API;
# enforce it with a validation job of your choice, e.g. Dataplex data quality or TFDV)
data_quality_spec = {
"dataset": "bq://my-project.my_dataset.user_features",
"validations": [
{
"column": "age",
"checks": [
{"min": 0, "max": 120}
]
},
{
"column": "email",
"checks": [
{"regex": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"}
]
},
{
"column": "user_id",
"checks": [
{"not_null": True}
]
}
]
}
19.1.4. Layer 2: Component Tests (Feature Engineering & Preprocessing)
The ML pipeline has many components beyond the model: feature transformers, encoders, scalers. Each must be tested in isolation.
Testing Feature Transformations
Example: Log Transform
import numpy as np
import pytest
def log_transform(x):
    """Apply log1p transformation; reject negative inputs."""
    x = np.asarray(x, dtype=float)
    if np.any(x < 0):
        raise ValueError("log_transform requires non-negative inputs")
    return np.log1p(x)
def test_log_transform_positive_values():
"""Test that log transform works correctly on positive values."""
input_val = np.array([0, 1, 10, 100])
expected = np.array([0, 0.693147, 2.397895, 4.615120])
result = log_transform(input_val)
np.testing.assert_array_almost_equal(result, expected, decimal=5)
def test_log_transform_handles_zero():
"""Test that log1p(0) = 0."""
assert log_transform(0) == 0
def test_log_transform_rejects_negative():
"""Log of negative number should raise error or return NaN."""
with pytest.raises(ValueError):
log_transform(np.array([-1, -5]))
Testing Encoders (Categorical → Numerical)
import pytest
from sklearn.preprocessing import LabelEncoder
def test_label_encoder_consistency():
"""Ensure encoder produces consistent mappings."""
encoder = LabelEncoder()
categories = ['red', 'blue', 'green', 'red', 'blue']
encoded = encoder.fit_transform(categories)
# Test inverse transform
decoded = encoder.inverse_transform(encoded)
assert list(decoded) == categories
def test_label_encoder_handles_unseen_category():
"""Encoder should handle or reject unseen categories gracefully."""
encoder = LabelEncoder()
encoder.fit(['red', 'blue', 'green'])
with pytest.raises(ValueError):
encoder.transform(['yellow']) # Unseen category
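The "handle gracefully" branch usually means mapping unseen categories to an all-zero encoding rather than failing the request. A sketch using scikit-learn's OneHotEncoder with handle_unknown='ignore' (the sparse_output argument requires scikit-learn 1.2+):
import numpy as np
from sklearn.preprocessing import OneHotEncoder

def test_one_hot_encoder_ignores_unseen_category():
    """Unseen categories map to an all-zero row instead of raising."""
    encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
    encoder.fit(np.array([['red'], ['blue'], ['green']]))

    encoded = encoder.transform(np.array([['yellow']]))  # unseen at fit time
    assert encoded.shape == (1, 3)
    assert encoded.sum() == 0  # no known category matched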
Testing Scalers (Normalization)
from sklearn.preprocessing import StandardScaler
import numpy as np
def test_standard_scaler_zero_mean():
"""After scaling, mean should be ~0."""
data = np.array([[1], [2], [3], [4], [5]])
scaler = StandardScaler()
scaled = scaler.fit_transform(data)
assert np.abs(scaled.mean()) < 1e-7
def test_standard_scaler_unit_variance():
"""After scaling, std should be ~1."""
data = np.array([[10], [20], [30], [40], [50]])
scaler = StandardScaler()
scaled = scaler.fit_transform(data)
assert np.abs(scaled.std() - 1.0) < 1e-7
Testing Pipelines (scikit-learn)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
def test_preprocessing_pipeline():
"""Test full preprocessing pipeline."""
pipeline = Pipeline([
('scaler', StandardScaler()),
('pca', PCA(n_components=2))
])
# Dummy data
X_train = np.random.randn(100, 5)
X_test = np.random.randn(20, 5)
# Fit on training data
pipeline.fit(X_train)
X_train_transformed = pipeline.transform(X_train)
# Validate output shape
assert X_train_transformed.shape == (100, 2)
# Test transform on new data
X_test_transformed = pipeline.transform(X_test)
assert X_test_transformed.shape == (20, 2)
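Component tests are also the right place to guard against train-serve skew: persist the fitted pipeline exactly as the serving path will load it, and assert that the round-tripped pipeline reproduces the training-time transform. A minimal sketch using joblib and pytest's tmp_path fixture:
import joblib
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

def test_pipeline_roundtrip_matches_training_transform(tmp_path):
    """The pipeline that serving loads from disk must reproduce the fitted one."""
    rng = np.random.default_rng(seed=0)
    X = rng.normal(size=(100, 5))

    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=2)),
    ])
    pipeline.fit(X)

    # Persist exactly as the serving path would, then reload
    path = tmp_path / "preprocessor.joblib"
    joblib.dump(pipeline, path)
    reloaded = joblib.load(path)

    np.testing.assert_allclose(pipeline.transform(X), reloaded.transform(X))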
19.1.5. Layer 3: Model Quality Tests
This is where ML testing diverges most dramatically from traditional software. We cannot assert output == expected_output because ML models are probabilistic.
Smoke Tests (Model Loads and Predicts)
The most basic test: Can the model be loaded and produce predictions without crashing?
import torch
import pytest
def test_model_loads_from_checkpoint():
"""Test that model can be loaded from saved checkpoint."""
    model = MyModel()  # MyModel: your model class
    # torch.load expects a local path, so the checkpoint is assumed to have been
    # downloaded from S3 (e.g., to /tmp) before the test runs
    checkpoint_path = "/tmp/model_v1.pt"
    # Load checkpoint
    state_dict = torch.load(checkpoint_path, map_location="cpu")
    model.load_state_dict(state_dict)
# Model should be in eval mode
model.eval()
# No exception should be raised
assert True
def test_model_inference_runs():
"""Test that model can perform inference on dummy input."""
model = load_model("s3://bucket/models/model_v1.pt")
dummy_input = torch.randn(1, 3, 224, 224) # Batch of 1 image
with torch.no_grad():
output = model(dummy_input)
# Output should have expected shape (1, num_classes)
assert output.shape == (1, 1000)
def test_model_output_is_valid_probability():
"""For classification, output should be valid probabilities."""
model = load_model("s3://bucket/models/model_v1.pt")
dummy_input = torch.randn(1, 3, 224, 224)
with torch.no_grad():
logits = model(dummy_input)
probs = torch.softmax(logits, dim=1)
# Probabilities should sum to 1
assert torch.allclose(probs.sum(dim=1), torch.tensor([1.0]), atol=1e-6)
# All probabilities should be in [0, 1]
assert torch.all(probs >= 0)
assert torch.all(probs <= 1)
Accuracy Threshold Tests
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error

def test_model_meets_minimum_accuracy():
    """Model must meet minimum accuracy on validation set."""
model = load_model("s3://bucket/models/model_v1.pt")
val_loader = load_validation_data()
accuracy = evaluate_accuracy(model, val_loader)
MIN_ACCURACY = 0.85
assert accuracy >= MIN_ACCURACY, f"Accuracy {accuracy:.3f} < {MIN_ACCURACY}"
def test_model_regression_metrics():
"""For regression, test MAE and RMSE."""
model = load_regression_model("s3://bucket/models/regressor_v1.pt")
val_data = load_validation_data()
predictions = model.predict(val_data.X)
mae = mean_absolute_error(val_data.y, predictions)
rmse = np.sqrt(mean_squared_error(val_data.y, predictions))
assert mae < 10.0, f"MAE {mae:.2f} exceeds threshold"
assert rmse < 15.0, f"RMSE {rmse:.2f} exceeds threshold"
Slice-Based Evaluation
Overall accuracy can hide problems in specific subgroups.
def test_model_performance_by_demographic_slice():
"""Test model accuracy across demographic slices."""
model = load_model("s3://bucket/models/model_v1.pt")
test_data = load_test_data_with_demographics()
# Evaluate on different slices
slices = {
"age_under_30": test_data[test_data['age'] < 30],
"age_30_to_50": test_data[(test_data['age'] >= 30) & (test_data['age'] < 50)],
"age_over_50": test_data[test_data['age'] >= 50]
}
MIN_ACCURACY_PER_SLICE = 0.80
for slice_name, slice_data in slices.items():
accuracy = evaluate_accuracy(model, slice_data)
assert accuracy >= MIN_ACCURACY_PER_SLICE, \
f"Accuracy on {slice_name} is {accuracy:.3f}, below threshold"
19.1.6. Behavioral Testing: The CheckList Paradigm
Behavioral Testing was formalized by Ribeiro et al. (2020) in the paper “Beyond Accuracy: Behavioral Testing of NLP Models with CheckList”.
The idea: Instead of just measuring aggregate accuracy, define specific behaviors the model must exhibit, then test them systematically.
Three Types of Behavioral Tests
1. Invariance Tests (INV): The output should NOT change when certain inputs are modified.
Example: Sentiment analysis should be invariant to typos.
- Input: “This movie is great!”
- Perturbed: “This movie is grate!” (typo)
- Expected: Both should have positive sentiment
2. Directional Tests (DIR): A specific change to input should cause a predictable change in output.
Example: Adding “not” should flip sentiment.
- Input: “This movie is great!”
- Modified: “This movie is not great!”
- Expected: Sentiment should flip from positive to negative
3. Minimum Functionality Tests (MFT): The model should handle simple, unambiguous cases correctly.
Example: Obviously positive sentences.
- Input: “I love this product, it’s amazing!”
- Expected: Positive sentiment (with high confidence)
Implementation of Behavioral Tests
import pytest
def test_sentiment_invariance_to_typos():
"""Sentiment should be invariant to common typos."""
model = load_sentiment_model()
test_cases = [
("This is fantastic", "This is fantastik"),
("I love this", "I luv this"),
("Amazing quality", "Amazng quality")
]
for original, perturbed in test_cases:
sentiment_original = model.predict(original)
sentiment_perturbed = model.predict(perturbed)
assert sentiment_original == sentiment_perturbed, \
f"Sentiment changed: {original} → {perturbed}"
def test_sentiment_directional_negation():
"""Adding 'not' should flip sentiment."""
model = load_sentiment_model()
test_cases = [
("This is great", "This is not great"),
("I love it", "I do not love it"),
("Excellent product", "Not an excellent product")
]
for positive, negative in test_cases:
sentiment_pos = model.predict(positive)
sentiment_neg = model.predict(negative)
assert sentiment_pos == "positive"
assert sentiment_neg == "negative", \
f"Negation failed: {positive} → {negative}"
def test_sentiment_minimum_functionality():
"""Model should handle obvious cases."""
model = load_sentiment_model()
positive_cases = [
"I absolutely love this!",
"Best purchase ever!",
"Five stars, highly recommend!"
]
negative_cases = [
"This is terrible.",
"Worst experience of my life.",
"Complete waste of money."
]
for text in positive_cases:
assert model.predict(text) == "positive", f"Failed on: {text}"
for text in negative_cases:
assert model.predict(text) == "negative", f"Failed on: {text}"
19.1.7. Metamorphic Testing
When you don’t have labeled test data, Metamorphic Testing defines relationships between inputs and outputs.
Metamorphic Relation: A transformation $T$ applied to the input $x$ should produce a predictable transformation $T'$ of the output $f(x)$.
Example: Image Classifier
Metamorphic Relation: Rotating an image by 360° should produce the same classification.
def test_image_classifier_rotation_invariance():
"""Classifier should be invariant to 360° rotation."""
model = load_image_classifier()
image = load_test_image("dog.jpg")
# Predict on original
pred_original = model.predict(image)
# Rotate 360° (identity transformation)
image_rotated = rotate_image(image, angle=360)
pred_rotated = model.predict(image_rotated)
assert pred_original == pred_rotated
Metamorphic Relation: Flipping an image horizontally should not change the class (for symmetric objects).
def test_image_classifier_horizontal_flip():
"""For symmetric classes (dogs, cats), horizontal flip should not change prediction."""
model = load_image_classifier()
symmetric_classes = ['dog', 'cat', 'bird']
for class_name in symmetric_classes:
image = load_test_image(f"{class_name}.jpg")
pred_original = model.predict(image)
image_flipped = flip_horizontal(image)
pred_flipped = model.predict(image_flipped)
assert pred_original == pred_flipped, \
f"Prediction changed on flip for {class_name}"
19.1.8. Layer 4: Integration Tests (End-to-End Pipeline)
Integration tests validate the entire ML pipeline from data ingestion to prediction serving.
End-to-End Training Pipeline Test
def test_training_pipeline_end_to_end():
"""Test full training pipeline: data → train → validate → save."""
# 1. Ingest data
raw_data = ingest_from_s3("s3://bucket/raw_data.csv")
assert len(raw_data) > 1000, "Insufficient training data"
# 2. Preprocess
processed_data = preprocess_pipeline(raw_data)
assert 'label' in processed_data.columns
assert processed_data.isnull().sum().sum() == 0, "Nulls remain after preprocessing"
# 3. Train model
model = train_model(processed_data)
assert model is not None
# 4. Evaluate
val_data = load_validation_data()
accuracy = evaluate_accuracy(model, val_data)
assert accuracy > 0.8, f"Trained model accuracy {accuracy:.3f} too low"
# 5. Save model
save_path = "s3://bucket/models/test_model.pt"
save_model(model, save_path)
# 6. Verify model can be loaded
loaded_model = load_model(save_path)
assert loaded_model is not None
End-to-End Inference Pipeline Test
def test_inference_pipeline_end_to_end():
"""Test full inference pipeline: request → preprocess → predict → postprocess → response."""
# 1. Simulate API request
request_payload = {
"user_id": 12345,
"features": {
"age": 35,
"spend_30d": 150.50,
"country": "US"
}
}
# 2. Validate request
validate_request_schema(request_payload)
# 3. Fetch additional features (e.g., from Feature Store)
enriched_features = fetch_features(request_payload['user_id'])
# 4. Preprocess
model_input = preprocess_for_inference(enriched_features)
# 5. Load model
model = load_model("s3://bucket/models/prod_model.pt")
# 6. Predict
prediction = model.predict(model_input)
# 7. Postprocess
response = {
"user_id": request_payload['user_id'],
"prediction": float(prediction),
"confidence": 0.92
}
# 8. Validate response
assert 'prediction' in response
assert 0 <= response['confidence'] <= 1
19.1.9. Shadow Mode Testing (Differential Testing)
Before deploying a new model to production, run it in shadow mode: serve predictions from both the old and new models, but only return the old model’s predictions to users. Compare outputs.
Architecture
User Request
     |
     v
Load Balancer
     |
     +----> Old Model (Production) -----> Return to User
     |
     +----> New Model (Shadow) ---------> Log predictions (don't serve);
                                          compare offline with Old Model
Implementation (AWS Lambda Example)
import boto3
import json
import time
sagemaker_runtime = boto3.client('sagemaker-runtime')
def lambda_handler(event, context):
"""
Invoke both prod and shadow models, return prod result, log comparison.
"""
input_data = json.loads(event['body'])
# Invoke production model
response_prod = sagemaker_runtime.invoke_endpoint(
EndpointName='prod-model-endpoint',
Body=json.dumps(input_data),
ContentType='application/json'
)
prediction_prod = json.loads(response_prod['Body'].read())
# Invoke shadow model
response_shadow = sagemaker_runtime.invoke_endpoint(
EndpointName='shadow-model-endpoint',
Body=json.dumps(input_data),
ContentType='application/json'
)
prediction_shadow = json.loads(response_shadow['Body'].read())
# Log comparison
comparison = {
'input': input_data,
'prod_prediction': prediction_prod,
'shadow_prediction': prediction_shadow,
'agreement': (prediction_prod['class'] == prediction_shadow['class'])
}
# Send to CloudWatch Logs or S3
log_comparison(comparison)
# Return production result to user
return {
'statusCode': 200,
'body': json.dumps(prediction_prod)
}
def log_comparison(comparison):
"""Log shadow mode comparison to S3."""
s3 = boto3.client('s3')
timestamp = int(time.time())
s3.put_object(
Bucket='ml-shadow-logs',
Key=f'comparisons/{timestamp}.json',
Body=json.dumps(comparison)
)
Analyzing Shadow Mode Results
import pandas as pd
def analyze_shadow_mode_logs():
"""Analyze agreement between prod and shadow models."""
# Load logs from S3
logs = load_logs_from_s3('ml-shadow-logs/comparisons/')
df = pd.DataFrame(logs)
# Calculate agreement rate
agreement_rate = df['agreement'].mean()
print(f"Agreement Rate: {agreement_rate:.2%}")
# Find cases of disagreement
    disagreements = df[~df['agreement']]
# Analyze patterns
print(f"Total disagreements: {len(disagreements)}")
print("\nSample disagreements:")
print(disagreements[['prod_prediction', 'shadow_prediction']].head(10))
# Statistical test: Is shadow model significantly better?
# (Requires human labels for a sample)
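For that last step, a paired test such as McNemar's is a natural fit: hand-label a sample of the logged requests, mark whether each model got them right, and test whether the disagreements are lopsided. A sketch assuming a labeled DataFrame with boolean prod_correct and shadow_correct columns, using statsmodels' mcnemar:
from statsmodels.stats.contingency_tables import mcnemar

def compare_models_mcnemar(labeled_df, alpha=0.05):
    """McNemar's test on a hand-labeled sample of shadow-mode traffic."""
    # Build the 2x2 contingency table of correct/incorrect counts for the two models
    both_right = (labeled_df['prod_correct'] & labeled_df['shadow_correct']).sum()
    prod_only = (labeled_df['prod_correct'] & ~labeled_df['shadow_correct']).sum()
    shadow_only = (~labeled_df['prod_correct'] & labeled_df['shadow_correct']).sum()
    both_wrong = (~labeled_df['prod_correct'] & ~labeled_df['shadow_correct']).sum()

    table = [[both_right, prod_only],
             [shadow_only, both_wrong]]
    result = mcnemar(table, exact=True)

    print(f"McNemar p-value: {result.pvalue:.4f}")
    if result.pvalue < alpha and shadow_only > prod_only:
        print("Shadow model is significantly better on the labeled sample")
    return result.pvalue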
19.1.10. Performance Testing (Latency & Throughput)
ML models must meet non-functional requirements: latency SLAs, throughput targets, memory limits.
Latency Testing
import time
import numpy as np
def test_inference_latency_p99():
"""Test that p99 latency is under SLA."""
model = load_model("s3://bucket/models/prod_model.pt")
test_inputs = generate_test_batch(size=1000)
latencies = []
for input_data in test_inputs:
start = time.perf_counter()
_ = model.predict(input_data)
end = time.perf_counter()
latencies.append((end - start) * 1000) # Convert to ms
p99_latency = np.percentile(latencies, 99)
SLA_MS = 100
assert p99_latency < SLA_MS, \
f"p99 latency {p99_latency:.2f}ms exceeds SLA of {SLA_MS}ms"
Throughput Testing
def test_batch_inference_throughput():
"""Test that model can process required throughput."""
model = load_model("s3://bucket/models/prod_model.pt")
batch_size = 32
num_batches = 100
start = time.time()
for _ in range(num_batches):
batch = generate_test_batch(size=batch_size)
_ = model.predict(batch)
end = time.time()
duration = end - start
total_samples = batch_size * num_batches
throughput = total_samples / duration # samples per second
MIN_THROUGHPUT = 500 # samples/sec
assert throughput >= MIN_THROUGHPUT, \
f"Throughput {throughput:.0f} samples/s < {MIN_THROUGHPUT}"
Memory Profiling
import torch
def test_model_memory_footprint():
"""Ensure model fits in available GPU memory."""
model = load_model("s3://bucket/models/prod_model.pt")
model = model.cuda()
# Measure GPU memory
torch.cuda.reset_peak_memory_stats()
dummy_input = torch.randn(32, 3, 224, 224).cuda()
_ = model(dummy_input)
peak_memory_mb = torch.cuda.max_memory_allocated() / (1024 ** 2)
MAX_MEMORY_MB = 10000 # 10 GB
assert peak_memory_mb < MAX_MEMORY_MB, \
f"Peak GPU memory {peak_memory_mb:.0f}MB exceeds limit {MAX_MEMORY_MB}MB"
19.1.11. Testing in CI/CD Pipelines
Tests are worthless if they’re not automated. Integrate ML tests into your CI/CD pipeline.
GitHub Actions Example
name: ML Model Tests
on:
pull_request:
paths:
- 'src/models/**'
- 'src/features/**'
- 'tests/**'
jobs:
data-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
pip install great-expectations pandas
- name: Run data validation tests
run: |
python tests/test_data_validation.py
component-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run feature engineering tests
run: |
pytest tests/test_features.py -v
model-quality-tests:
runs-on: [self-hosted, gpu]
steps:
- uses: actions/checkout@v3
- name: Download test model
run: |
aws s3 cp s3://ml-models/candidate_model.pt ./model.pt
- name: Run behavioral tests
run: |
pytest tests/test_model_behavior.py -v
- name: Run performance tests
run: |
pytest tests/test_model_performance.py -v
integration-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run end-to-end pipeline test
run: |
python tests/test_integration.py
19.1.12. Cloud-Native Testing Infrastructure
AWS SageMaker Processing for Test Execution
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
processor = ScriptProcessor(
    # image_uri must be an ECR image containing Python and your test dependencies (placeholder URI below)
    image_uri='<account-id>.dkr.ecr.<region>.amazonaws.com/ml-test-runner:latest',
    command=['python3'],
    role='SageMakerRole',
    instance_count=1,
    instance_type='ml.m5.xlarge'
)
processor.run(
code='tests/run_all_tests.py',
inputs=[
ProcessingInput(
source='s3://ml-data/test_data/',
destination='/opt/ml/processing/input'
)
],
outputs=[
ProcessingOutput(
source='/opt/ml/processing/output',
destination='s3://ml-test-results/'
)
],
arguments=['--test-suite', 'integration']
)
GCP Vertex AI Pipelines for Testing
from google.cloud import aiplatform
from kfp.v2 import dsl
@dsl.component
def run_data_validation_tests() -> str:
    import great_expectations as gx  # imports must live inside the component
    # ... validation logic ...
    return "PASSED"
@dsl.component
def run_model_tests(model_uri: str) -> str:
    # ... model testing logic ...
    return "PASSED"
@dsl.pipeline(name='ml-testing-pipeline')
def testing_pipeline():
data_validation = run_data_validation_tests()
model_tests = run_model_tests(
model_uri='gs://ml-models/candidate_model'
).after(data_validation)
# Compile the pipeline definition to JSON, then submit it to Vertex AI
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=testing_pipeline, package_path='pipeline.json')

aiplatform.PipelineJob(
    display_name='ml-testing-pipeline',
    template_path='pipeline.json',
    pipeline_root='gs://ml-pipelines/'
).run()
19.1.13. Regression Testing (Model Versioning)
When you update a model, ensure you don’t regress on previously-working cases.
Building a Test Suite Over Time
from datetime import datetime

class RegressionTestSuite:
"""Accumulate test cases from production failures."""
def __init__(self, storage_path='s3://ml-tests/regression/'):
self.storage_path = storage_path
self.test_cases = self.load_test_cases()
def load_test_cases(self):
"""Load all regression test cases from storage."""
        # Load from S3/GCS (load_from_storage is an assumed helper)
        return load_from_storage(self.storage_path)
def add_test_case(self, input_data, expected_output, description):
"""Add a new regression test case."""
test_case = {
'input': input_data,
'expected': expected_output,
'description': description,
'added_date': datetime.now().isoformat()
}
self.test_cases.append(test_case)
        self.save_test_cases()  # persist back to storage (implementation not shown)
def run_all_tests(self, model):
"""Run all regression tests on a new model."""
failures = []
for i, test_case in enumerate(self.test_cases):
prediction = model.predict(test_case['input'])
if prediction != test_case['expected']:
failures.append({
'test_id': i,
'description': test_case['description'],
'expected': test_case['expected'],
'got': prediction
})
if failures:
print(f"REGRESSION DETECTED: {len(failures)} tests failed")
for failure in failures:
print(f" - {failure['description']}")
return False
print(f"All {len(self.test_cases)} regression tests passed")
return True
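A typical usage pattern: add a case whenever a production incident is triaged, and gate every candidate model on the accumulated suite (the input values, expected label, and model path below are placeholders):
suite = RegressionTestSuite()

# Captured from a triaged production incident (hypothetical example case)
suite.add_test_case(
    input_data={"age": 34, "spend_30d": 0.0, "country": "US"},
    expected_output="negative",
    description="Zero-spend users were incorrectly flagged as high-value"
)

# Gate a candidate model on the accumulated suite before promotion
candidate_model = load_model("s3://ml-models/candidate_model.pt")
if not suite.run_all_tests(candidate_model):
    raise SystemExit("Candidate model regressed on known cases; blocking deployment")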
19.1.14. Summary: The Testing Strategy
Testing machine learning systems requires a paradigm shift from traditional software testing. The four-layer pyramid provides a structured approach:
1. Data Validation (Foundation):
- Schema validation (Great Expectations)
- Distribution drift detection (KS test, PSI)
- Integrity constraints
- Run on every batch, every day
2. Component Tests (Feature Engineering):
- Unit tests for transformers, encoders, scalers
- Pipeline integration tests
- Fast, deterministic, run on every commit
3. Model Quality Tests (Behavioral):
- Smoke tests (model loads, predicts)
- Accuracy threshold tests
- Slice-based evaluation
- Invariance, directional, and minimum functionality tests
- Run on every model candidate
4. Integration Tests (End-to-End):
- Full pipeline tests (data → train → serve)
- Shadow mode differential testing
- Performance tests (latency, throughput, memory)
- Run before production deployment
Key Principles:
- Automate Everything: Tests must run in CI/CD without human intervention
- Fail Fast: Catch issues in Layer 1 before they reach Layer 4
- Accumulate Knowledge: Build regression test suites from production failures
- Monitor in Production: Testing doesn’t end at deployment; continuous validation is required
Cloud Integration:
- AWS: SageMaker Processing, Glue DataBrew, CloudWatch
- GCP: Vertex AI Pipelines, Dataflow, Cloud Monitoring
The cost of a bug in production is orders of magnitude higher than the cost of catching it in testing. For ML systems, a silent accuracy degradation can cost millions in lost revenue or damaged reputation. Invest in comprehensive testing infrastructure: it's not overhead, it's insurance.
In the next chapter, we will explore Continuous Training (CT) Orchestration, where we automate the retraining and deployment of models as data evolves.