Chapter 19: Testing for Machine Learning

19.1. The Pyramid of ML Tests: Data, Component, Model Quality, Integration

“Testing leads to failure, and failure leads to understanding.” — Burt Rutan

In traditional software engineering, the testing pyramid is a well-established pattern: a wide base of unit tests, a narrower layer of integration tests, and a small apex of end-to-end tests. The rationale is economic—unit tests are fast, cheap, and pinpoint bugs. E2E tests are slow, expensive, and fragile.

Machine learning systems break this model in fundamental ways. The code is often simple (sometimes little more than a call to model.fit()), but the behavior is determined largely by data. The characteristic ML bug is not a syntax error or a null pointer exception; it is a silent degradation in prediction quality that surfaces weeks after deployment, for example when user engagement drops by 3% with no obvious cause.

The traditional pyramid must be inverted, expanded, and specialized. This chapter presents The Pyramid of ML Tests—a layered testing strategy that addresses the unique challenges of non-deterministic, data-dependent systems in production.


13.1.1. The Anatomy of ML System Failures

Before we can test effectively, we must understand the failure modes unique to ML.

Traditional Software vs. ML Software

| Dimension | Traditional Software | ML Software |
|---|---|---|
| Bug Source | Code logic errors | Data distribution shifts |
| Failure Mode | Crashes, exceptions | Silent accuracy degradation |
| Reproducibility | Deterministic | Stochastic (model init, data sampling) |
| Root Cause | Stack trace points to line | Model internals are opaque |
| Validation | Assert output == expected | Assert accuracy > threshold |

The “Nine Circles” of ML Failures

1. Data Validation Failures:

  • Schema drift: A new feature appears in production that wasn’t in training data.
  • Missing values: 15% of rows have NULL in a critical feature.
  • Distribution shift: Mean of a feature changes from 50 to 500.

2. Feature Engineering Bugs:

  • Train-serve skew: Feature normalization at inference uses different statistics than the ones computed during training.
  • Leakage: Target variable accidentally encoded in features.
  • Temporal leakage: Using “future” data to predict the past.

3. Model Training Bugs:

  • Overfitting: Perfect training accuracy, 50% validation accuracy.
  • Underfitting: Model hasn’t converged; stopped training too early.
  • Class imbalance ignored: 99.9% of data is negative, model predicts all negative.

4. Model Evaluation Errors:

  • Wrong metric: Optimizing accuracy when you need recall.
  • Data leakage in validation split.
  • Test set contamination: Accidentally including training samples in test set.

5. Serialization/Deserialization Bugs:

  • Model saved in PyTorch 1.9, loaded in PyTorch 2.0 (incompatibility).
  • Pickle security vulnerabilities.
  • Quantization applied incorrectly, destroying accuracy.

6. Integration Failures:

  • Preprocessing pipeline mismatch between training and serving.
  • API contract violation: Serving expects JSON, client sends CSV.
  • Timeout: Model takes 5 seconds to infer, but SLA is 100ms.

7. Infrastructure Failures:

  • GPU out of memory during batch inference.
  • Disk full when saving checkpoints.
  • Network partition splits distributed training cluster.

8. Monitoring Blind Spots:

  • No drift detection; model degrades for 3 months unnoticed.
  • Latency regression: p99 latency creeps from 50ms to 500ms.
  • Cost explosion: Inference costs 10x more than expected.

9. Adversarial and Safety Failures:

  • Model outputs toxic content.
  • Prompt injection bypasses safety filters.
  • Adversarial examples fool the model (e.g., sticker on stop sign).
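Failure modes 3 and 4 often interact: under heavy class imbalance, accuracy alone can look excellent for a model that never finds a single positive. The sketch below illustrates this with made-up labels (1 positive in 1,000 examples) and a hypothetical always-negative predictor; the helper names are illustrative, not from any library.

```python
# Synthetic, illustrative data: 1 positive among 1,000 examples.
y_true = [1] + [0] * 999
# A degenerate "model" that always predicts the negative class.
y_pred = [0] * 1000

def accuracy(y_true, y_pred):
    """Fraction of predictions that match the labels."""
    correct = sum(t == p for t, p in zip(y_true, y_pred))
    return correct / len(y_true)

def recall(y_true, y_pred, positive=1):
    """Fraction of actual positives that were predicted positive."""
    true_pos = sum(t == positive and p == positive for t, p in zip(y_true, y_pred))
    actual_pos = sum(t == positive for t in y_true)
    return true_pos / actual_pos if actual_pos else 0.0

acc = accuracy(y_true, y_pred)
rec = recall(y_true, y_pred)
print(f"accuracy={acc:.3f} recall={rec:.3f}")  # accuracy=0.999 recall=0.000
```

An accuracy threshold test would pass here, which is why the metric asserted in a test has to match the business requirement.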

13.1.2. The ML Testing Pyramid (The Four Layers)

Unlike the traditional three-layer pyramid, ML systems require four distinct testing layers:

                    ▲
                   / \
                  /   \
                 /  4  \    Integration Tests
                /───────\   (End-to-End Scenarios)
               /    3    \  Model Quality Tests
              /───────────\ (Behavioral, Invariance, Minimum Functionality)
             /      2      \ Component Tests
            /───────────────\ (Feature Engineering, Preprocessing, Postprocessing)
           /        1        \ Data Validation Tests
          /───────────────────\ (Schema, Distribution, Integrity)
         /_____________________\

Layer 1: Data Validation Tests (Foundation)

  • Volume: Thousands of tests, run on every data batch.
  • Speed: Milliseconds per test.
  • Purpose: Catch data quality issues before they poison the model.

Layer 2: Component Tests

  • Volume: Hundreds of tests.
  • Speed: Seconds per test suite.
  • Purpose: Ensure feature engineering, preprocessing, and postprocessing logic is correct.

Layer 3: Model Quality Tests

  • Volume: Tens of tests.
  • Speed: Minutes to hours (requires model training/inference).
  • Purpose: Validate model behavior, performance, and robustness.

Layer 4: Integration Tests

  • Volume: A few critical paths.
  • Speed: Hours (full pipeline execution).
  • Purpose: Verify the entire ML pipeline works end-to-end.

13.1.3. Layer 1: Data Validation Tests

Data is the fuel of ML. Poisoned fuel destroys the engine. Data validation must be continuous, automated, and granular.

Schema Validation

The first line of defense: Does the data match the expected structure?

What to Test:

  • Column names and order
  • Data types (int, float, string, categorical)
  • Nullability constraints
  • Categorical value domains (e.g., status must be in ['active', 'inactive', 'pending'])

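Implementation with Great Expectations (the calls below follow the library's older, pre-1.0 API; method names have changed across releases, so check them against the installed version):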

import great_expectations as gx

# Define expectations
context = gx.get_context()

# Create expectation suite
suite = context.create_expectation_suite("user_features_v1")

# Add expectations
suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_ordered_list",
        kwargs={
            "column_list": ["user_id", "age", "spend_30d", "country", "label"]
        }
    )
)

suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_of_type",
        kwargs={"column": "age", "type_": "int"}
    )
)

suite.add_expectation(
    gx.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "user_id"}
    )
)

# Validate data
batch = context.get_batch(
    datasource_name="my_datasource",
    data_asset_name="user_features",
    batch_spec_passthrough={"path": "s3://bucket/data.parquet"}
)

results = context.run_validation_operator(
    "action_list_operator",
    assets_to_validate=[batch],
    expectation_suite_name="user_features_v1"
)

if not results["success"]:
    raise ValueError("Data validation failed!")
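The same schema checks can also be expressed without a validation framework, which is sometimes convenient inside a unit test. The sketch below is a stand-in under stated assumptions, not the Great Expectations API: `EXPECTED_SCHEMA`, `ALLOWED_COUNTRIES`, and `validate_schema` are hypothetical names, and rows are plain dictionaries.

```python
# Hypothetical schema: column name -> (expected Python type, nullable?)
EXPECTED_SCHEMA = {
    "user_id": (int, False),
    "age": (int, True),
    "spend_30d": (float, True),
    "country": (str, False),
    "label": (int, False),
}
# Hypothetical categorical domain for the country column.
ALLOWED_COUNTRIES = {"US", "DE", "IN"}

def validate_schema(rows):
    """Return a list of human-readable schema violations (empty if clean)."""
    errors = []
    for i, row in enumerate(rows):
        # Column names and order must match the schema exactly.
        if list(row.keys()) != list(EXPECTED_SCHEMA.keys()):
            errors.append(f"row {i}: columns {list(row.keys())} do not match schema")
            continue
        for column, (expected_type, nullable) in EXPECTED_SCHEMA.items():
            value = row[column]
            if value is None:
                if not nullable:
                    errors.append(f"row {i}: {column} is null")
            elif not isinstance(value, expected_type):
                errors.append(f"row {i}: {column} has type {type(value).__name__}")
        # Categorical domain check.
        if row["country"] is not None and row["country"] not in ALLOWED_COUNTRIES:
            errors.append(f"row {i}: country {row['country']!r} not in allowed domain")
    return errors

good = {"user_id": 1, "age": 35, "spend_30d": 150.5, "country": "US", "label": 0}
bad = {"user_id": None, "age": "35", "spend_30d": 10.0, "country": "XX", "label": 1}
violations = validate_schema([good, bad])
for violation in violations:
    print(violation)
```

Returning every violation, rather than raising on the first, makes a failed batch easier to diagnose.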

Distribution Validation (Drift Detection)

Schema can be correct, but the statistical properties might have shifted.

What to Test:

  • Mean, median, std deviation of numeric features
  • Min/max bounds
  • Cardinality of categorical features
  • Percentage of missing values
  • Correlations between features

Statistical Tests:

  1. Kolmogorov-Smirnov (KS) Test: Detects if two distributions are different.

    • Null Hypothesis: Training and production distributions are the same.
    • If p-value < 0.05, reject null → distribution drift detected.
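  2. Population Stability Index (PSI): Measures how far the binned production distribution has moved from the training distribution, where $\text{Train}_i$ and $\text{Production}_i$ are the fractions of values falling in bin $i$ and $n$ is the number of bins: $$\text{PSI} = \sum_{i=1}^{n} (\text{Production}_i - \text{Train}_i) \times \ln\left(\frac{\text{Production}_i}{\text{Train}_i}\right)$$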

    • PSI < 0.1: No significant change
    • PSI 0.1-0.2: Moderate drift
    • PSI > 0.2: Significant drift (retrain model)
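To make the PSI formula concrete, here is a small worked calculation over three pre-binned fractions. The numbers are made up for illustration, and `psi` is a hypothetical helper that assumes no bin is empty.

```python
import math

def psi(train_fracs, prod_fracs):
    """Population Stability Index over pre-binned fractions (each list sums to 1)."""
    return sum(
        (p - t) * math.log(p / t)
        for t, p in zip(train_fracs, prod_fracs)
    )

# Illustrative three-bin distributions (made-up numbers).
train = [0.5, 0.3, 0.2]
prod = [0.4, 0.3, 0.3]

# Bin 1: (0.4 - 0.5) * ln(0.4 / 0.5) ~ 0.0223
# Bin 2: (0.3 - 0.3) * ln(1)         = 0
# Bin 3: (0.3 - 0.2) * ln(0.3 / 0.2) ~ 0.0405
value = psi(train, prod)
print(f"PSI = {value:.4f}")  # PSI = 0.0629
```

A value of about 0.063 falls in the "no significant change" band, even though ten percentage points of mass moved between the first and last bins.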

Implementation:

import numpy as np
from scipy.stats import ks_2samp

def detect_numerical_drift(train_data, prod_data, feature_name, threshold=0.05):
    """
    Use KS test to detect drift in a numerical feature.
    """
    train_values = train_data[feature_name].dropna()
    prod_values = prod_data[feature_name].dropna()

    statistic, p_value = ks_2samp(train_values, prod_values)

    if p_value < threshold:
        print(f"DRIFT DETECTED in {feature_name}: p-value={p_value:.4f}")
        return True
    return False

def calculate_psi(train_data, prod_data, feature_name, bins=10):
    """
    Calculate Population Stability Index for a feature.
    """
    # Bin the data
    train_values = train_data[feature_name].dropna()
    prod_values = prod_data[feature_name].dropna()

    # Create bin edges from training-data quantiles; np.unique drops the
    # duplicate edges that appear when many values are tied
    bin_edges = np.unique(np.quantile(train_values, np.linspace(0, 1, bins + 1)))

    # Calculate distributions
    train_hist, _ = np.histogram(train_values, bins=bin_edges)
    prod_hist, _ = np.histogram(prod_values, bins=bin_edges)

    # Normalize to percentages
    train_pct = train_hist / train_hist.sum()
    prod_pct = prod_hist / prod_hist.sum()

    # Avoid log(0)
    train_pct = np.where(train_pct == 0, 0.0001, train_pct)
    prod_pct = np.where(prod_pct == 0, 0.0001, prod_pct)

    # Calculate PSI
    psi = np.sum((prod_pct - train_pct) * np.log(prod_pct / train_pct))

    print(f"PSI for {feature_name}: {psi:.4f}")
    if psi > 0.2:
        print(f"  WARNING: Significant drift detected!")

    return psi

Data Integrity Tests

Beyond schema and distribution, test the semantic correctness of data.

Examples:

  • age must be between 0 and 120
  • email must match regex pattern
  • timestamp must not be in the future
  • total_price must equal quantity * unit_price
  • Referential integrity: user_id in transactions must exist in users table
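The integrity rules listed above can be sketched as a plain row-level checker. This is a minimal illustration, assuming rows arrive as dictionaries with timezone-aware timestamps; `check_row` and the rule names are hypothetical.

```python
import re
from datetime import datetime, timezone

EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")

def check_row(row, known_user_ids, now=None):
    """Return the names of the integrity rules that this row violates."""
    now = now or datetime.now(timezone.utc)
    failed = []
    if not 0 <= row["age"] <= 120:
        failed.append("age_range")
    if not EMAIL_RE.match(row["email"]):
        failed.append("email_format")
    if row["timestamp"] > now:
        failed.append("timestamp_in_future")
    # Compare with a tolerance to avoid floating-point false alarms.
    if abs(row["total_price"] - row["quantity"] * row["unit_price"]) > 1e-6:
        failed.append("total_price_mismatch")
    # Referential integrity against the set of known users.
    if row["user_id"] not in known_user_ids:
        failed.append("unknown_user_id")
    return failed

# Fixed reference time so the example is reproducible.
NOW = datetime(2024, 1, 1, tzinfo=timezone.utc)
good_row = {
    "user_id": 1, "age": 35, "email": "a.user@example.com",
    "timestamp": datetime(2023, 6, 1, tzinfo=timezone.utc),
    "quantity": 2, "unit_price": 4.5, "total_price": 9.0,
}
bad_row = {
    "user_id": 99, "age": 130, "email": "not-an-email",
    "timestamp": datetime(2030, 1, 1, tzinfo=timezone.utc),
    "quantity": 2, "unit_price": 4.5, "total_price": 10.0,
}
print(check_row(good_row, {1, 2}, now=NOW))  # []
print(check_row(bad_row, {1, 2}, now=NOW))
```

Passing `now` explicitly keeps the future-timestamp rule deterministic under test.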

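AWS Implementation: AWS Glue DataBrew (schematic: the actual create_ruleset call also requires a TargetArn for the dataset, and each rule is expressed with CheckExpression and SubstitutionMap fields rather than the RuleConditions key shown here):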

import boto3

databrew = boto3.client('databrew')

# Create a data quality ruleset
ruleset = databrew.create_ruleset(
    Name='user_features_validation',
    Rules=[
        {
            'Name': 'age_range_check',
            'ColumnSelectors': [{'Name': 'age'}],
            'RuleConditions': [
                {
                    'Condition': 'GREATER_THAN_OR_EQUAL',
                    'Value': '0'
                },
                {
                    'Condition': 'LESS_THAN_OR_EQUAL',
                    'Value': '120'
                }
            ]
        },
        {
            'Name': 'email_format_check',
            'ColumnSelectors': [{'Name': 'email'}],
            'RuleConditions': [
                {
                    'Condition': 'MATCHES_PATTERN',
                    'Value': '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'
                }
            ]
        }
    ]
)

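GCP Implementation: Vertex AI Data Quality (an illustrative specification; the dictionary below only describes the checks and is not submitted to any service in this snippet):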

from google.cloud import aiplatform

aiplatform.init(project='my-project', location='us-central1')

# Create data quality spec
data_quality_spec = {
    "dataset": "bq://my-project.my_dataset.user_features",
    "validations": [
        {
            "column": "age",
            "checks": [
                {"min": 0, "max": 120}
            ]
        },
        {
            "column": "email",
            "checks": [
                {"regex": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"}
            ]
        },
        {
            "column": "user_id",
            "checks": [
                {"not_null": True}
            ]
        }
    ]
}

13.1.4. Layer 2: Component Tests (Feature Engineering & Preprocessing)

The ML pipeline has many components beyond the model: feature transformers, encoders, scalers. Each must be tested in isolation.

Testing Feature Transformations

Example: Log Transform

import numpy as np
import pytest

def log_transform(x):
    """Apply log1p transformation; negative inputs are rejected."""
    x = np.asarray(x, dtype=float)
    if np.any(x < 0):
        # np.log1p would return -inf or NaN here with only a warning, so fail loudly
        raise ValueError("log_transform expects non-negative values")
    return np.log1p(x)

def test_log_transform_positive_values():
    """Test that log transform works correctly on positive values."""
    input_val = np.array([0, 1, 10, 100])
    expected = np.array([0, 0.693147, 2.397895, 4.615120])
    result = log_transform(input_val)
    np.testing.assert_array_almost_equal(result, expected, decimal=5)

def test_log_transform_handles_zero():
    """Test that log1p(0) = 0."""
    assert log_transform(0) == 0

def test_log_transform_rejects_negative():
    """Negative inputs should raise ValueError."""
    with pytest.raises(ValueError):
        log_transform(np.array([-1, -5]))

Testing Encoders (Categorical → Numerical)

from sklearn.preprocessing import LabelEncoder

def test_label_encoder_consistency():
    """Ensure encoder produces consistent mappings."""
    encoder = LabelEncoder()
    categories = ['red', 'blue', 'green', 'red', 'blue']
    encoded = encoder.fit_transform(categories)

    # Test inverse transform
    decoded = encoder.inverse_transform(encoded)
    assert list(decoded) == categories

def test_label_encoder_handles_unseen_category():
    """Encoder should handle or reject unseen categories gracefully."""
    encoder = LabelEncoder()
    encoder.fit(['red', 'blue', 'green'])

    with pytest.raises(ValueError):
        encoder.transform(['yellow'])  # Unseen category
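scikit-learn's LabelEncoder rejects unseen labels, as the test above asserts. If the serving path should degrade gracefully instead, one option is to reserve an index for unknown categories. The class below is a hypothetical pure-Python sketch of that idea, not a scikit-learn API.

```python
class SafeLabelEncoder:
    """Label encoder that maps unseen categories to a reserved index 0."""

    UNKNOWN = 0

    def fit(self, values):
        # Sort for a deterministic mapping; real categories start at 1.
        self.mapping_ = {v: i + 1 for i, v in enumerate(sorted(set(values)))}
        return self

    def transform(self, values):
        # Unseen categories fall back to the reserved UNKNOWN index.
        return [self.mapping_.get(v, self.UNKNOWN) for v in values]

def test_safe_encoder_maps_unseen_to_unknown():
    encoder = SafeLabelEncoder().fit(["red", "blue", "green"])
    assert encoder.transform(["yellow"]) == [SafeLabelEncoder.UNKNOWN]

encoder = SafeLabelEncoder().fit(["red", "blue", "green"])
codes = encoder.transform(["blue", "yellow", "red"])
print(codes)  # [1, 0, 3]
test_safe_encoder_maps_unseen_to_unknown()
```

Whether to reject or bucket unseen values is a design decision; either way, the chosen behavior deserves an explicit test.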

Testing Scalers (Normalization)

from sklearn.preprocessing import StandardScaler
import numpy as np

def test_standard_scaler_zero_mean():
    """After scaling, mean should be ~0."""
    data = np.array([[1], [2], [3], [4], [5]])
    scaler = StandardScaler()
    scaled = scaler.fit_transform(data)

    assert np.abs(scaled.mean()) < 1e-7

def test_standard_scaler_unit_variance():
    """After scaling, std should be ~1."""
    data = np.array([[10], [20], [30], [40], [50]])
    scaler = StandardScaler()
    scaled = scaler.fit_transform(data)

    assert np.abs(scaled.std() - 1.0) < 1e-7

Testing Pipelines (scikit-learn)

import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

def test_preprocessing_pipeline():
    """Test full preprocessing pipeline."""
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=2))
    ])

    # Dummy data
    X_train = np.random.randn(100, 5)
    X_test = np.random.randn(20, 5)

    # Fit on training data
    pipeline.fit(X_train)
    X_train_transformed = pipeline.transform(X_train)

    # Validate output shape
    assert X_train_transformed.shape == (100, 2)

    # Test transform on new data
    X_test_transformed = pipeline.transform(X_test)
    assert X_test_transformed.shape == (20, 2)

13.1.5. Layer 3: Model Quality Tests

This is where ML testing diverges most dramatically from traditional software. We can rarely assert output == expected_output: a model's predictions are learned approximations, and retraining can change individual outputs even when aggregate quality holds.

Smoke Tests (Model Loads and Predicts)

The most basic test: Can the model be loaded and produce predictions without crashing?

import torch
import pytest

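def test_model_loads_from_checkpoint():
    """Test that model can be loaded from saved checkpoint."""
    model = MyModel()  # placeholder for the project's model class
    # torch.load reads a local path or file-like object, so the checkpoint is
    # assumed to have been downloaded from s3://bucket/models/model_v1.pt first
    checkpoint_path = "model_v1.pt"

    # Load checkpoint onto CPU so the test does not require a GPU
    state_dict = torch.load(checkpoint_path, map_location="cpu")
    model.load_state_dict(state_dict)

    # Switch to eval mode (disables dropout, freezes batch-norm statistics)
    model.eval()
    assert not model.training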

def test_model_inference_runs():
    """Test that model can perform inference on dummy input."""
    model = load_model("s3://bucket/models/model_v1.pt")
    dummy_input = torch.randn(1, 3, 224, 224)  # Batch of 1 image

    with torch.no_grad():
        output = model(dummy_input)

    # Output should have expected shape (1, num_classes)
    assert output.shape == (1, 1000)

def test_model_output_is_valid_probability():
    """For classification, output should be valid probabilities."""
    model = load_model("s3://bucket/models/model_v1.pt")
    dummy_input = torch.randn(1, 3, 224, 224)

    with torch.no_grad():
        logits = model(dummy_input)
        probs = torch.softmax(logits, dim=1)

    # Probabilities should sum to 1
    assert torch.allclose(probs.sum(dim=1), torch.tensor([1.0]), atol=1e-6)

    # All probabilities should be in [0, 1]
    assert torch.all(probs >= 0)
    assert torch.all(probs <= 1)

Accuracy Threshold Tests

import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error

def test_model_meets_minimum_accuracy():
    """Model must meet minimum accuracy on validation set."""
    model = load_model("s3://bucket/models/model_v1.pt")
    val_loader = load_validation_data()

    accuracy = evaluate_accuracy(model, val_loader)

    MIN_ACCURACY = 0.85
    assert accuracy >= MIN_ACCURACY, f"Accuracy {accuracy:.3f} < {MIN_ACCURACY}"

def test_model_regression_metrics():
    """For regression, test MAE and RMSE."""
    model = load_regression_model("s3://bucket/models/regressor_v1.pt")
    val_data = load_validation_data()

    predictions = model.predict(val_data.X)
    mae = mean_absolute_error(val_data.y, predictions)
    rmse = np.sqrt(mean_squared_error(val_data.y, predictions))

    assert mae < 10.0, f"MAE {mae:.2f} exceeds threshold"
    assert rmse < 15.0, f"RMSE {rmse:.2f} exceeds threshold"

Slice-Based Evaluation

Overall accuracy can hide problems in specific subgroups.

def test_model_performance_by_demographic_slice():
    """Test model accuracy across demographic slices."""
    model = load_model("s3://bucket/models/model_v1.pt")
    test_data = load_test_data_with_demographics()

    # Evaluate on different slices
    slices = {
        "age_under_30": test_data[test_data['age'] < 30],
        "age_30_to_50": test_data[(test_data['age'] >= 30) & (test_data['age'] < 50)],
        "age_over_50": test_data[test_data['age'] >= 50]
    }

    MIN_ACCURACY_PER_SLICE = 0.80

    for slice_name, slice_data in slices.items():
        accuracy = evaluate_accuracy(model, slice_data)
        assert accuracy >= MIN_ACCURACY_PER_SLICE, \
            f"Accuracy on {slice_name} is {accuracy:.3f}, below threshold"
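Because an assert inside the loop stops at the first failing slice, it can help to compute every slice first and then report all that fall short. The sketch below does this in plain Python; the records are made up, and `accuracy_by_slice` and `age_slice` are hypothetical helpers.

```python
from collections import defaultdict

def accuracy_by_slice(records, slice_fn):
    """Group (features, label, prediction) records by slice and compute accuracy."""
    hits = defaultdict(int)
    totals = defaultdict(int)
    for features, label, prediction in records:
        name = slice_fn(features)
        totals[name] += 1
        hits[name] += int(label == prediction)
    return {name: hits[name] / totals[name] for name in totals}

def age_slice(features):
    """Assign a record to one of three age buckets."""
    age = features["age"]
    if age < 30:
        return "age_under_30"
    if age < 50:
        return "age_30_to_50"
    return "age_over_50"

# Made-up records: overall accuracy is 5/6, but one slice sits at 50%.
records = [
    ({"age": 22}, 1, 1), ({"age": 27}, 0, 0),
    ({"age": 35}, 1, 1), ({"age": 44}, 0, 0),
    ({"age": 61}, 1, 0), ({"age": 70}, 0, 0),
]
per_slice = accuracy_by_slice(records, age_slice)
failing = {name: acc for name, acc in per_slice.items() if acc < 0.80}
print(per_slice)
print("below threshold:", failing)
```

Here the aggregate accuracy of roughly 83% would pass a 0.80 threshold while the over-50 slice fails it.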

13.1.6. Behavioral Testing: The Checklist Paradigm

Behavioral Testing was formalized by Ribeiro et al. (2020) in the paper “Beyond Accuracy: Behavioral Testing of NLP Models with CheckList”.

The idea: Instead of just measuring aggregate accuracy, define specific behaviors the model must exhibit, then test them systematically.

Three Types of Behavioral Tests

1. Invariance Tests (INV): The output should NOT change when certain inputs are modified.

Example: Sentiment analysis should be invariant to typos.

  • Input: “This movie is great!”
  • Perturbed: “This movie is grate!” (typo)
  • Expected: Both should have positive sentiment

2. Directional Tests (DIR): A specific change to input should cause a predictable change in output.

Example: Adding “not” should flip sentiment.

  • Input: “This movie is great!”
  • Modified: “This movie is not great!”
  • Expected: Sentiment should flip from positive to negative

3. Minimum Functionality Tests (MFT): The model should handle simple, unambiguous cases correctly.

Example: Obviously positive sentences.

  • Input: “I love this product, it’s amazing!”
  • Expected: Positive sentiment (with high confidence)

Implementation of Behavioral Tests

import pytest

def test_sentiment_invariance_to_typos():
    """Sentiment should be invariant to common typos."""
    model = load_sentiment_model()

    test_cases = [
        ("This is fantastic", "This is fantastik"),
        ("I love this", "I luv this"),
        ("Amazing quality", "Amazng quality")
    ]

    for original, perturbed in test_cases:
        sentiment_original = model.predict(original)
        sentiment_perturbed = model.predict(perturbed)

        assert sentiment_original == sentiment_perturbed, \
            f"Sentiment changed: {original} → {perturbed}"

def test_sentiment_directional_negation():
    """Adding 'not' should flip sentiment."""
    model = load_sentiment_model()

    test_cases = [
        ("This is great", "This is not great"),
        ("I love it", "I do not love it"),
        ("Excellent product", "Not an excellent product")
    ]

    for positive, negative in test_cases:
        sentiment_pos = model.predict(positive)
        sentiment_neg = model.predict(negative)

        assert sentiment_pos == "positive"
        assert sentiment_neg == "negative", \
            f"Negation failed: {positive} → {negative}"

def test_sentiment_minimum_functionality():
    """Model should handle obvious cases."""
    model = load_sentiment_model()

    positive_cases = [
        "I absolutely love this!",
        "Best purchase ever!",
        "Five stars, highly recommend!"
    ]

    negative_cases = [
        "This is terrible.",
        "Worst experience of my life.",
        "Complete waste of money."
    ]

    for text in positive_cases:
        assert model.predict(text) == "positive", f"Failed on: {text}"

    for text in negative_cases:
        assert model.predict(text) == "negative", f"Failed on: {text}"
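CheckList-style suites are usually summarized as a failure rate per test type rather than a single pass or fail. The sketch below shows one way to collect those rates. `ToySentimentModel` is a deliberately crude, hypothetical lexicon model included only so the harness runs end to end; it stands in for `load_sentiment_model()`.

```python
class ToySentimentModel:
    """Tiny lexicon-based stand-in for a real sentiment model (illustrative only)."""

    POSITIVE = {"great", "love", "excellent", "amazing"}
    NEGATIVE = {"terrible", "worst", "waste"}

    def predict(self, text):
        words = text.lower().replace("!", "").replace(".", "").split()
        score = sum(w in self.POSITIVE for w in words) - sum(w in self.NEGATIVE for w in words)
        if "not" in words:
            score = -score  # crude negation handling
        return "positive" if score > 0 else "negative"

def failure_rate(model, cases, passes):
    """Return (fraction of failing cases, list of failing cases)."""
    failures = [case for case in cases if not passes(model, case)]
    return len(failures) / len(cases), failures

model = ToySentimentModel()

# INV: a small typo should not change the prediction.
inv_cases = [("This is great", "This is grat"), ("I love this", "I luv this")]
inv_rate, inv_fail = failure_rate(
    model, inv_cases, lambda m, c: m.predict(c[0]) == m.predict(c[1])
)

# DIR: adding "not" should flip positive to negative.
dir_cases = [("This is great", "This is not great"), ("I love it", "I do not love it")]
dir_rate, dir_fail = failure_rate(
    model, dir_cases,
    lambda m, c: m.predict(c[0]) == "positive" and m.predict(c[1]) == "negative",
)

print(f"INV failure rate: {inv_rate:.0%}")  # 100%: the lexicon model breaks on typos
print(f"DIR failure rate: {dir_rate:.0%}")  # 0%: negation is handled
```

Keeping the failing cases alongside the rate gives reviewers concrete examples to inspect.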

13.1.7. Metamorphic Testing

When you don’t have labeled test data, Metamorphic Testing defines relationships between inputs and outputs.

Metamorphic Relation: A transformation $T$ applied to input $x$ should produce a predictable transformation $T'$ to the output $f(x)$.

Example: Image Classifier

Metamorphic Relation: Rotating an image by 360° should produce the same classification.

def test_image_classifier_rotation_invariance():
    """Classifier should be invariant to 360° rotation."""
    model = load_image_classifier()
    image = load_test_image("dog.jpg")

    # Predict on original
    pred_original = model.predict(image)

    # Rotate 360° (identity transformation)
    image_rotated = rotate_image(image, angle=360)
    pred_rotated = model.predict(image_rotated)

    assert pred_original == pred_rotated

Metamorphic Relation: Flipping an image horizontally should not change the class (for symmetric objects).

def test_image_classifier_horizontal_flip():
    """For symmetric classes (dogs, cats), horizontal flip should not change prediction."""
    model = load_image_classifier()
    symmetric_classes = ['dog', 'cat', 'bird']

    for class_name in symmetric_classes:
        image = load_test_image(f"{class_name}.jpg")
        pred_original = model.predict(image)

        image_flipped = flip_horizontal(image)
        pred_flipped = model.predict(image_flipped)

        assert pred_original == pred_flipped, \
            f"Prediction changed on flip for {class_name}"
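Metamorphic relations can be checked on unlabeled inputs, and that is easy to demonstrate without an image model. The sketch below uses a hypothetical pure-Python nearest-centroid classifier (not scikit-learn's) and the relation "translating the training points and the query by the same offset must not change the prediction".

```python
import math

class TinyCentroidClassifier:
    """Minimal nearest-centroid classifier, used only to illustrate the test."""

    def fit(self, points, labels):
        sums, counts = {}, {}
        for (x, y), label in zip(points, labels):
            sx, sy = sums.get(label, (0.0, 0.0))
            sums[label] = (sx + x, sy + y)
            counts[label] = counts.get(label, 0) + 1
        self.centroids_ = {
            label: (sx / counts[label], sy / counts[label])
            for label, (sx, sy) in sums.items()
        }
        return self

    def predict(self, point):
        # Pick the label whose centroid is closest to the query point.
        return min(self.centroids_, key=lambda lbl: math.dist(point, self.centroids_[lbl]))

def translate(points, dx, dy):
    """Shift every point by the same offset."""
    return [(x + dx, y + dy) for x, y in points]

train_points = [(0, 0), (1, 0), (5, 5), (6, 5)]
train_labels = ["a", "a", "b", "b"]
queries = [(0.5, 0.2), (5.5, 4.0), (2.0, 2.0)]  # no labels needed

base = TinyCentroidClassifier().fit(train_points, train_labels)
shifted = TinyCentroidClassifier().fit(translate(train_points, 10, -3), train_labels)

source_preds = [base.predict(q) for q in queries]
follow_up_preds = [shifted.predict(q) for q in translate(queries, 10, -3)]
assert source_preds == follow_up_preds, "translation changed a prediction"
print(source_preds)  # ['a', 'b', 'a']
```

The test never states what the correct class is; it only requires the source and follow-up predictions to agree.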

13.1.8. Layer 4: Integration Tests (End-to-End Pipeline)

Integration tests validate the entire ML pipeline from data ingestion to prediction serving.

End-to-End Training Pipeline Test

def test_training_pipeline_end_to_end():
    """Test full training pipeline: data → train → validate → save."""
    # 1. Ingest data
    raw_data = ingest_from_s3("s3://bucket/raw_data.csv")
    assert len(raw_data) > 1000, "Insufficient training data"

    # 2. Preprocess
    processed_data = preprocess_pipeline(raw_data)
    assert 'label' in processed_data.columns
    assert processed_data.isnull().sum().sum() == 0, "Nulls remain after preprocessing"

    # 3. Train model
    model = train_model(processed_data)
    assert model is not None

    # 4. Evaluate
    val_data = load_validation_data()
    accuracy = evaluate_accuracy(model, val_data)
    assert accuracy > 0.8, f"Trained model accuracy {accuracy:.3f} too low"

    # 5. Save model
    save_path = "s3://bucket/models/test_model.pt"
    save_model(model, save_path)

    # 6. Verify model can be loaded
    loaded_model = load_model(save_path)
    assert loaded_model is not None

End-to-End Inference Pipeline Test

def test_inference_pipeline_end_to_end():
    """Test full inference pipeline: request → preprocess → predict → postprocess → response."""
    # 1. Simulate API request
    request_payload = {
        "user_id": 12345,
        "features": {
            "age": 35,
            "spend_30d": 150.50,
            "country": "US"
        }
    }

    # 2. Validate request
    validate_request_schema(request_payload)

    # 3. Fetch additional features (e.g., from Feature Store)
    enriched_features = fetch_features(request_payload['user_id'])

    # 4. Preprocess
    model_input = preprocess_for_inference(enriched_features)

    # 5. Load model
    model = load_model("s3://bucket/models/prod_model.pt")

    # 6. Predict
    prediction = model.predict(model_input)

    # 7. Postprocess
    response = {
        "user_id": request_payload['user_id'],
        "prediction": float(prediction),
        "confidence": 0.92
    }

    # 8. Validate response
    assert 'prediction' in response
    assert 0 <= response['confidence'] <= 1

13.1.9. Shadow Mode Testing (Differential Testing)

Before deploying a new model to production, run it in shadow mode: serve predictions from both the old and new models, but only return the old model’s predictions to users. Compare outputs.

Architecture

User Request
     |
     v
Load Balancer
     |
     +----> Old Model (Production)  -----> Return to User
     |
     +----> New Model (Shadow)      -----> Log predictions (don't serve)
                                            Compare with Old Model

Implementation (AWS Lambda Example)

import boto3
import json
import time

sagemaker_runtime = boto3.client('sagemaker-runtime')

def lambda_handler(event, context):
    """
    Invoke both prod and shadow models, return prod result, log comparison.
    """
    input_data = json.loads(event['body'])

    # Invoke production model
    response_prod = sagemaker_runtime.invoke_endpoint(
        EndpointName='prod-model-endpoint',
        Body=json.dumps(input_data),
        ContentType='application/json'
    )
    prediction_prod = json.loads(response_prod['Body'].read())

    # Invoke shadow model
    response_shadow = sagemaker_runtime.invoke_endpoint(
        EndpointName='shadow-model-endpoint',
        Body=json.dumps(input_data),
        ContentType='application/json'
    )
    prediction_shadow = json.loads(response_shadow['Body'].read())

    # Log comparison
    comparison = {
        'input': input_data,
        'prod_prediction': prediction_prod,
        'shadow_prediction': prediction_shadow,
        'agreement': (prediction_prod['class'] == prediction_shadow['class'])
    }

    # Send to CloudWatch Logs or S3
    log_comparison(comparison)

    # Return production result to user
    return {
        'statusCode': 200,
        'body': json.dumps(prediction_prod)
    }

def log_comparison(comparison):
    """Log shadow mode comparison to S3."""
    s3 = boto3.client('s3')
    timestamp = int(time.time())
    s3.put_object(
        Bucket='ml-shadow-logs',
        Key=f'comparisons/{timestamp}.json',
        Body=json.dumps(comparison)
    )

Analyzing Shadow Mode Results

import pandas as pd

def analyze_shadow_mode_logs():
    """Analyze agreement between prod and shadow models."""
    # Load logs from S3
    logs = load_logs_from_s3('ml-shadow-logs/comparisons/')

    df = pd.DataFrame(logs)

    # Calculate agreement rate
    agreement_rate = df['agreement'].mean()
    print(f"Agreement Rate: {agreement_rate:.2%}")

    # Find cases of disagreement
    disagreements = df[df['agreement'] == False]

    # Analyze patterns
    print(f"Total disagreements: {len(disagreements)}")
    print("\nSample disagreements:")
    print(disagreements[['prod_prediction', 'shadow_prediction']].head(10))

    # Statistical test: Is shadow model significantly better?
    # (Requires human labels for a sample)
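Once a labeled sample exists, one common choice for the paired comparison is McNemar's test, which looks only at the cases where exactly one model is correct. This is a suggested technique rather than something the analysis above prescribes. The sketch implements the exact (binomial) form with the standard library, on made-up counts.

```python
from math import comb

def mcnemar_exact(prod_correct, shadow_correct):
    """Two-sided exact McNemar test on paired correctness flags.

    Returns (b, c, p_value), where b counts cases only the shadow model got
    right and c counts cases only the production model got right.
    """
    b = sum(s and not p for p, s in zip(prod_correct, shadow_correct))
    c = sum(p and not s for p, s in zip(prod_correct, shadow_correct))
    n = b + c
    if n == 0:
        return b, c, 1.0  # the models never disagree on correctness
    # Under the null hypothesis, discordant pairs split 50/50 (Binomial(n, 0.5)).
    tail = sum(comb(n, k) for k in range(min(b, c) + 1)) / 2 ** n
    return b, c, min(1.0, 2 * tail)

# Made-up labeled sample of 100 requests: 15 where only shadow is right,
# 5 where only prod is right, 80 where both are right.
prod_correct = [False] * 15 + [True] * 5 + [True] * 80
shadow_correct = [True] * 15 + [False] * 5 + [True] * 80

b, c, p_value = mcnemar_exact(prod_correct, shadow_correct)
print(f"b={b}, c={c}, p={p_value:.4f}")  # b=15, c=5, p=0.0414
```

With these counts the difference is significant at the 0.05 level, but only the 20 discordant cases carry information, so the labeled sample needs to be large enough to contain a reasonable number of them.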

13.1.10. Performance Testing (Latency & Throughput)

ML models must meet non-functional requirements: latency SLAs, throughput targets, memory limits.

Latency Testing

import time
import numpy as np

def test_inference_latency_p99():
    """Test that p99 latency is under SLA."""
    model = load_model("s3://bucket/models/prod_model.pt")
    test_inputs = generate_test_batch(size=1000)

    latencies = []

    for input_data in test_inputs:
        start = time.perf_counter()
        _ = model.predict(input_data)
        end = time.perf_counter()

        latencies.append((end - start) * 1000)  # Convert to ms

    p99_latency = np.percentile(latencies, 99)
    SLA_MS = 100

    assert p99_latency < SLA_MS, \
        f"p99 latency {p99_latency:.2f}ms exceeds SLA of {SLA_MS}ms"
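Tail-latency numbers are sensitive to one-off start-up costs, so a measurement harness often discards a few warm-up calls before timing. The sketch below shows that pattern with the standard library; `measure_latency_ms` and `dummy_predict` are hypothetical stand-ins for the real harness and `model.predict`.

```python
import statistics
import time

def measure_latency_ms(predict, inputs, warmup=10):
    """Time each call to `predict` and return p50/p95/p99 latency in milliseconds."""
    # Warm-up calls are not timed: the first requests often pay one-off costs
    # such as lazy initialization or cache fills.
    for x in inputs[:warmup]:
        predict(x)

    latencies = []
    for x in inputs:
        start = time.perf_counter()
        predict(x)
        latencies.append((time.perf_counter() - start) * 1000)

    # quantiles(n=100) returns the 99 cut points between percentiles.
    cuts = statistics.quantiles(latencies, n=100)
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}

def dummy_predict(x):
    """Stand-in for model.predict; does a small amount of arithmetic."""
    return sum(i * x for i in range(1000))

stats = measure_latency_ms(dummy_predict, list(range(500)))
print({name: round(value, 4) for name, value in stats.items()})
```

Reporting p50 and p95 next to p99 helps distinguish a uniformly slow model from one with occasional outliers.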

Throughput Testing

def test_batch_inference_throughput():
    """Test that model can process required throughput."""
    model = load_model("s3://bucket/models/prod_model.pt")
    batch_size = 32
    num_batches = 100

    start = time.time()

    for _ in range(num_batches):
        batch = generate_test_batch(size=batch_size)
        _ = model.predict(batch)

    end = time.time()
    duration = end - start

    total_samples = batch_size * num_batches
    throughput = total_samples / duration  # samples per second

    MIN_THROUGHPUT = 500  # samples/sec
    assert throughput >= MIN_THROUGHPUT, \
        f"Throughput {throughput:.0f} samples/s < {MIN_THROUGHPUT}"

Memory Profiling

import torch

def test_model_memory_footprint():
    """Ensure model fits in available GPU memory."""
    model = load_model("s3://bucket/models/prod_model.pt")
    model = model.cuda()

    # Measure GPU memory
    torch.cuda.reset_peak_memory_stats()

    dummy_input = torch.randn(32, 3, 224, 224).cuda()
    _ = model(dummy_input)

    peak_memory_mb = torch.cuda.max_memory_allocated() / (1024 ** 2)
    MAX_MEMORY_MB = 10000  # 10 GB

    assert peak_memory_mb < MAX_MEMORY_MB, \
        f"Peak GPU memory {peak_memory_mb:.0f}MB exceeds limit {MAX_MEMORY_MB}MB"

13.1.11. Testing in CI/CD Pipelines

Tests are worthless if they’re not automated. Integrate ML tests into your CI/CD pipeline.

GitHub Actions Example

name: ML Model Tests

on:
  pull_request:
    paths:
      - 'src/models/**'
      - 'src/features/**'
      - 'tests/**'

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          pip install great-expectations pandas

      - name: Run data validation tests
        run: |
          python tests/test_data_validation.py

  component-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Run feature engineering tests
        run: |
          pytest tests/test_features.py -v

  model-quality-tests:
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v3

      - name: Download test model
        run: |
          aws s3 cp s3://ml-models/candidate_model.pt ./model.pt

      - name: Run behavioral tests
        run: |
          pytest tests/test_model_behavior.py -v

      - name: Run performance tests
        run: |
          pytest tests/test_model_performance.py -v

  integration-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Run end-to-end pipeline test
        run: |
          python tests/test_integration.py

13.1.12. Cloud-Native Testing Infrastructure

AWS SageMaker Processing for Test Execution

from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput

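processor = ScriptProcessor(
    image_uri='python:3.10',  # placeholder: SageMaker needs a container image URI it can pull (typically Amazon ECR)
    command=['python3'],      # required: the command used to run the script
    role='SageMakerRole',
    instance_count=1,
    instance_type='ml.m5.xlarge'
)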

processor.run(
    code='tests/run_all_tests.py',
    inputs=[
        ProcessingInput(
            source='s3://ml-data/test_data/',
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output',
            destination='s3://ml-test-results/'
        )
    ],
    arguments=['--test-suite', 'integration']
)

GCP Vertex AI Pipelines for Testing

from google.cloud import aiplatform
from kfp.v2 import compiler, dsl

@dsl.component
def run_data_validation_tests() -> str:
    import great_expectations as gx
    # ... validation logic ...
    return "PASSED"

@dsl.component
def run_model_tests(model_uri: str) -> str:
    # ... model testing logic ...
    return "PASSED"

@dsl.pipeline(name='ml-testing-pipeline')
def testing_pipeline():
    data_validation = run_data_validation_tests()

    # Run model tests only after data validation has finished
    model_tests = run_model_tests(
        model_uri='gs://ml-models/candidate_model'
    ).after(data_validation)

# Compile the pipeline definition to the file that PipelineJob submits
compiler.Compiler().compile(
    pipeline_func=testing_pipeline,
    package_path='pipeline.json'
)

aiplatform.PipelineJob(
    display_name='ml-testing-pipeline',
    template_path='pipeline.json',
    pipeline_root='gs://ml-pipelines/'
).run()

13.1.13. Regression Testing (Model Versioning)

When you update a model, ensure you don’t regress on previously-working cases.

Building a Test Suite Over Time

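from datetime import datetime

class RegressionTestSuite:
    """Accumulate test cases from production failures."""

    def __init__(self, storage_path='s3://ml-tests/regression/'):
        self.storage_path = storage_path
        self.test_cases = self.load_test_cases()

    def load_test_cases(self):
        """Load all regression test cases from storage."""
        # Load from S3/GCS (load_from_storage is a project-specific helper)
        return load_from_storage(self.storage_path)

    def save_test_cases(self):
        """Persist the current test cases back to storage."""
        # save_to_storage is assumed to be the counterpart of load_from_storage
        save_to_storage(self.storage_path, self.test_cases)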

    def add_test_case(self, input_data, expected_output, description):
        """Add a new regression test case."""
        test_case = {
            'input': input_data,
            'expected': expected_output,
            'description': description,
            'added_date': datetime.now().isoformat()
        }
        self.test_cases.append(test_case)
        self.save_test_cases()

    def run_all_tests(self, model):
        """Run all regression tests on a new model."""
        failures = []

        for i, test_case in enumerate(self.test_cases):
            prediction = model.predict(test_case['input'])

            if prediction != test_case['expected']:
                failures.append({
                    'test_id': i,
                    'description': test_case['description'],
                    'expected': test_case['expected'],
                    'got': prediction
                })

        if failures:
            print(f"REGRESSION DETECTED: {len(failures)} tests failed")
            for failure in failures:
                print(f"  - {failure['description']}")
            return False

        print(f"All {len(self.test_cases)} regression tests passed")
        return True
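The `prediction != test_case['expected']` comparison in the suite works for class labels, but exact equality on numeric scores tends to fail across hardware or library versions. One possible refinement is a tolerance-aware comparison. `outputs_match` below is a hypothetical helper, and its tolerances are assumptions to tune per model.

```python
import math

def outputs_match(prediction, expected, rel_tol=1e-6, abs_tol=1e-8):
    """Compare labels exactly and numeric outputs within a tolerance."""
    numeric = (int, float)
    if isinstance(prediction, numeric) and isinstance(expected, numeric):
        return math.isclose(prediction, expected, rel_tol=rel_tol, abs_tol=abs_tol)
    if isinstance(prediction, (list, tuple)) and isinstance(expected, (list, tuple)):
        # Compare sequences element by element, recursing for nested outputs.
        return len(prediction) == len(expected) and all(
            outputs_match(p, e, rel_tol, abs_tol) for p, e in zip(prediction, expected)
        )
    return prediction == expected

print(outputs_match("positive", "positive"))   # True
print(outputs_match(0.1 + 0.2, 0.3))           # True
print(outputs_match([0.7, 0.3], [0.7, 0.31]))  # False
```

In `run_all_tests`, the failure condition would then read `not outputs_match(prediction, test_case['expected'])`.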

13.1.14. Summary: The Testing Strategy

Testing machine learning systems requires a paradigm shift from traditional software testing. The four-layer pyramid provides a structured approach:

1. Data Validation (Foundation):

  • Schema validation (Great Expectations)
  • Distribution drift detection (KS test, PSI)
  • Integrity constraints
  • Run on every batch, every day

2. Component Tests (Feature Engineering):

  • Unit tests for transformers, encoders, scalers
  • Pipeline integration tests
  • Fast, deterministic, run on every commit

3. Model Quality Tests (Behavioral):

  • Smoke tests (model loads, predicts)
  • Accuracy threshold tests
  • Slice-based evaluation
  • Invariance, directional, and minimum functionality tests
  • Run on every model candidate

4. Integration Tests (End-to-End):

  • Full pipeline tests (data → train → serve)
  • Shadow mode differential testing
  • Performance tests (latency, throughput, memory)
  • Run before production deployment

Key Principles:

  • Automate Everything: Tests must run in CI/CD without human intervention
  • Fail Fast: Catch issues in Layer 1 before they reach Layer 4
  • Accumulate Knowledge: Build regression test suites from production failures
  • Monitor in Production: Testing doesn’t end at deployment; continuous validation is required

Cloud Integration:

  • AWS: SageMaker Processing, Glue DataBrew, CloudWatch
  • GCP: Vertex AI Pipelines, Dataflow, Cloud Monitoring

A bug that reaches production typically costs far more to fix than one caught in testing. For ML systems the risk is sharper still, because a silent accuracy degradation can run for weeks and cost substantial revenue or reputation before anyone notices. Comprehensive testing infrastructure is not overhead; it is insurance.

In the next chapter, we will explore Continuous Training (CT) Orchestration, where we automate the retraining and deployment of models as data evolves.