Chapter 9.5: Data Quality Management

“Garbage in, garbage out. The difference between a good ML model and a disaster is data quality.” — DJ Patil, Former U.S. Chief Data Scientist

Data quality is the foundation of ML success. This chapter covers comprehensive strategies for validation, drift detection, and quality management at scale across AWS and GCP.


9.5.1. The Data Quality Crisis in ML

Why Data Quality Matters More for ML

| Traditional Software             | Machine Learning                  |
|----------------------------------|-----------------------------------|
| Explicit rules handle edge cases | Model learns from data patterns   |
| Bugs are deterministic           | Bugs are probabilistic            |
| Testing catches issues           | Bad data creates silent failures  |
| Fix the code                     | Fix the data AND the code         |

Common Data Quality Issues

| Issue                | Description                               | Impact on ML                           |
|----------------------|-------------------------------------------|----------------------------------------|
| Missing values       | Null, empty, or placeholder values        | Biased predictions, training failures  |
| Outliers             | Extreme values outside the normal range   | Skewed model weights                   |
| Duplicates           | Same record appears multiple times        | Overfitting to duplicates              |
| Inconsistent formats | Dates as strings, mixed encodings         | Feature engineering failures           |
| Schema drift         | Column added/removed/renamed              | Pipeline breaks                        |
| Range violations     | Age = -5, Price = $999,999,999            | Nonsense predictions                   |
| Referential breaks   | Foreign keys pointing to deleted records  | Join failures                          |
| Stale data           | Old data presented as current             | Outdated predictions                   |
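
Several of these issues can be surfaced with a quick pandas audit before any training run. The sketch below is illustrative only: the column names (user_id, amount, timestamp) are placeholders for your own dataset, not a fixed schema.

# Quick audit of common data quality issues (illustrative sketch; column names
# are placeholders for your own dataset).
import pandas as pd

def audit_dataframe(df: pd.DataFrame) -> dict:
    return {
        # Missing values: share of nulls per column
        "missing_share": df.isna().mean().to_dict(),
        # Duplicates: exact duplicate rows
        "duplicate_rows": int(df.duplicated().sum()),
        # Range violations on a known-bounded column
        "negative_amounts": int((df["amount"] < 0).sum()) if "amount" in df else None,
        # Stale data: newest timestamp observed in the batch
        "max_timestamp": df["timestamp"].max() if "timestamp" in df else None,
    }

# Example usage:
# report = audit_dataframe(user_events_df)
# print(report)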

The Cost of Bad Data

A 2022 Gartner study found:

  • Poor data quality costs organizations an average of $12.9M annually
  • 60% of data scientists spend more time cleaning data than building models
  • 20% of ML models fail in production due to data quality issues

9.5.2. The Data Quality Framework

The Five Dimensions of Data Quality

| Dimension    | Definition                          | ML Relevance               |
|--------------|-------------------------------------|----------------------------|
| Accuracy     | Data correctly represents reality   | Model learns true patterns |
| Completeness | All required data is present        | No missing-feature issues  |
| Consistency  | Data is uniform across sources      | Clean joins, no conflicts  |
| Timeliness   | Data is current and fresh           | Predictions reflect reality|
| Validity     | Data conforms to rules/formats      | Pipeline stability         |
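
Each dimension can be scored numerically so it can be tracked over time (the same scores reappear as monitoring metrics in section 9.5.8). A minimal sketch, assuming a pandas DataFrame with a naive datetime `timestamp` column and a caller-supplied validity predicate:

# Minimal sketch: scoring completeness, validity and timeliness for one dataset.
# The DataFrame, required columns and validity predicate are assumptions, not a
# library API.
import pandas as pd

def quality_scores(df: pd.DataFrame, required_cols: list[str], is_valid) -> dict:
    completeness = 1.0 - df[required_cols].isna().mean().mean()  # share of non-null cells
    validity = float(df.apply(is_valid, axis=1).mean())          # share of rows passing rules
    # Freshness: seconds since the newest record (assumes naive timestamps)
    freshness_s = (pd.Timestamp.now() - df["timestamp"].max()).total_seconds()
    return {
        "completeness": completeness,
        "validity": validity,
        "freshness_seconds": freshness_s,
    }

# Example predicate: amount must be non-negative when present
# scores = quality_scores(df, ["user_id", "event_type"],
#                         lambda r: pd.isna(r["amount"]) or r["amount"] >= 0)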

The Quality Pipeline

┌─────────────────────────────────────────────────────────────────────┐
│                    DATA QUALITY PIPELINE                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │
│  │   SOURCE     │───▶│  VALIDATION  │───▶│  TRANSFORM   │          │
│  │   DATA       │    │   LAYER      │    │   LAYER      │          │
│  └──────────────┘    └──────────────┘    └──────────────┘          │
│                             │                    │                  │
│                             ▼                    ▼                  │
│                      ┌──────────────┐    ┌──────────────┐          │
│                      │   QUALITY    │    │   QUALITY    │          │
│                      │   METRICS    │    │   METRICS    │          │
│                      └──────────────┘    └──────────────┘          │
│                             │                    │                  │
│                             └────────┬───────────┘                  │
│                                      ▼                              │
│                            ┌─────────────────┐                      │
│                            │   MONITORING &  │                      │
│                            │   ALERTING      │                      │
│                            └─────────────────┘                      │
│                                      │                              │
│                            ┌─────────▼─────────┐                    │
│                            │  Pass: Continue   │                    │
│                            │  Fail: Alert/Stop │                    │
│                            └───────────────────┘                    │
└─────────────────────────────────────────────────────────────────────┘
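
The gate at the bottom of the diagram is deliberately blunt: if validation fails, the batch is quarantined, someone is alerted, and nothing downstream runs. A minimal sketch of that control flow, using trivial stand-in helpers rather than any specific framework:

# Minimal quality-gate sketch. The validate() stand-in and the injected
# load/quarantine/alert callables are placeholders for a real framework.
def validate(batch):
    # Stand-in rule: batch must be non-empty and every record needs a user_id
    failures = [r for r in batch if not r.get("user_id")]
    return {"success": bool(batch) and not failures, "failures": failures}

def run_with_quality_gate(batch, load, quarantine, alert):
    result = validate(batch)
    if not result["success"]:
        quarantine(batch)            # keep the bad data for debugging, don't discard it
        alert(result["failures"])    # notify the owning team
        raise RuntimeError("Data quality gate failed; stopping pipeline")
    load(batch)                      # only validated data reaches the next stage

# Example wiring (the callables are hypothetical):
# run_with_quality_gate(records, load=write_clean, quarantine=write_quarantine, alert=notify_team)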

9.5.3. Great Expectations: The Validation Standard

Great Expectations is one of the most widely adopted data validation frameworks for ML pipelines.

Core Concepts

| Concept           | Definition                                |
|-------------------|-------------------------------------------|
| Expectation       | A verifiable assertion about data         |
| Expectation Suite | Collection of expectations for a dataset  |
| Checkpoint        | Validation run configuration              |
| Data Docs         | Auto-generated documentation              |
| Profiler          | Automatic expectation generation          |

Setting Up Great Expectations

# Install
# pip install great_expectations

import great_expectations as gx

# Create context
context = gx.get_context()

# Connect to data source
datasource = context.sources.add_pandas("pandas_datasource")

# Add a data asset
data_asset = datasource.add_dataframe_asset(name="user_events")

# Build batch request (user_events_df is an existing pandas DataFrame of events)
batch_request = data_asset.build_batch_request(dataframe=user_events_df)

Defining Expectations

# Create an expectation suite
context.add_expectation_suite("user_events_quality")

# Get a validator bound to the batch and the suite
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="user_events_quality",
)

# Column existence
validator.expect_column_to_exist("user_id")
validator.expect_column_to_exist("event_type")
validator.expect_column_to_exist("timestamp")
validator.expect_column_to_exist("amount")

# Type checks
validator.expect_column_values_to_be_of_type("user_id", "str")
validator.expect_column_values_to_be_of_type("amount", "float")
validator.expect_column_values_to_be_of_type("timestamp", "datetime64")

# Null checks
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_not_be_null("event_type")
# Allow some nulls in amount for non-purchase events
validator.expect_column_values_to_not_be_null("amount", mostly=0.80)

# Value constraints
validator.expect_column_values_to_be_in_set(
    "event_type", 
    ["page_view", "click", "purchase", "add_to_cart", "checkout"]
)

# Range checks
validator.expect_column_values_to_be_between(
    "amount", 
    min_value=0, 
    max_value=100000,
    mostly=0.99  # Allow 1% outliers
)

# Uniqueness
validator.expect_column_values_to_be_unique("event_id")

# Freshness check (for streaming data)
from datetime import datetime, timedelta
one_hour_ago = datetime.now() - timedelta(hours=1)
validator.expect_column_max_to_be_between(
    "timestamp",
    min_value=one_hour_ago,
    max_value=datetime.now()
)

# Statistical expectations
validator.expect_column_mean_to_be_between("amount", min_value=10, max_value=500)
validator.expect_column_stdev_to_be_between("amount", min_value=5, max_value=200)

# Distribution expectations (reference_distribution is a partition object
# precomputed from the training data)
validator.expect_column_kl_divergence_to_be_less_than(
    "amount",
    partition_object=reference_distribution,
    threshold=0.1
)

# Save the suite
validator.save_expectation_suite(discard_failed_expectations=False)

Running Validations in Production

# Create a checkpoint
checkpoint = context.add_or_update_checkpoint(
    name="daily_quality_check",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "user_events_quality",
        }
    ],
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "store_evaluation_params",
            "action": {"class_name": "StoreEvaluationParametersAction"},
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"},
        },
        # Slack notification on failure
        {
            "name": "send_slack_notification",
            "action": {
                "class_name": "SlackNotificationAction",
                "slack_webhook": "${SLACK_WEBHOOK}",
                "notify_on": "failure",
            },
        },
    ],
)

# Run the checkpoint
result = checkpoint.run()

# Check the result and stop the pipeline on failure
if not result.success:
    raise RuntimeError(
        f"Data quality validation failed: {result.list_validation_results()}"
    )

9.5.4. AWS Glue Data Quality

AWS Glue provides native data quality capabilities integrated with ETL.

Data Quality Rules in Glue

# Glue Data Quality rule syntax
rules = """
Rules = [
    # Completeness
    ColumnExists "user_id",
    Completeness "user_id" >= 1.0,
    Completeness "amount" >= 0.8,
    
    # Uniqueness
    Uniqueness "event_id" >= 0.99,
    
    # Range
    ColumnValues "amount" between 0 and 100000,
    ColumnValues "quantity" > 0,
    
    # Distribution
    StandardDeviation "amount" between 5 and 200,
    Mean "amount" between 10 and 500,
    
    # Pattern
    ColumnValues "email" matches "^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$",
    
    # Freshness
    DataFreshness "timestamp" <= 1 hours,
    
    # Referential Integrity
    ReferentialIntegrity "user_id" "users.id" >= 0.99,
    
    # Custom SQL
    CustomSql "SELECT COUNT(*) FROM primary WHERE amount < 0" = 0
]
"""

Glue Job with Data Quality

# Glue ETL job with quality checks
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsgluedq.transforms import EvaluateDataQuality
from pyspark.context import SparkContext
from pyspark.sql import functions as F

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read data
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="ml_database",
    table_name="user_events"
)

# Define DQ rules
ruleset = """
Rules = [
    Completeness "user_id" >= 1.0,
    Completeness "event_type" >= 1.0,
    ColumnValues "amount" >= 0,
    Uniqueness "event_id" >= 0.99
]
"""

# Evaluate data quality with row-level results so individual records can be routed
dq_multiframe = EvaluateDataQuality().process_rows(
    frame=datasource,
    ruleset=ruleset,
    publishing_options={
        "dataQualityEvaluationContext": "user_events_dq",
        "enableDataQualityCloudwatchMetrics": True,
        "enableDataQualityResultsPublishing": True,
    },
)

# Row-level outcomes tag each record with a DataQualityEvaluationResult column
row_outcomes = SelectFromCollection.apply(
    dfc=dq_multiframe,
    key="rowLevelOutcomes",
).toDF()

passing_df = row_outcomes.filter(F.col("DataQualityEvaluationResult") == "Passed")
failing_df = row_outcomes.filter(F.col("DataQualityEvaluationResult") == "Failed")

# Convert back to DynamicFrames for the Glue writers below
passing_records = DynamicFrame.fromDF(passing_df, glueContext, "passing_records")
failing_records = DynamicFrame.fromDF(failing_df, glueContext, "failing_records")

# Route passing records to destination
glueContext.write_dynamic_frame.from_options(
    frame=passing_records,
    connection_type="s3",
    connection_options={"path": "s3://bucket/clean/"},
    format="parquet"
)

# Route failing records to quarantine
glueContext.write_dynamic_frame.from_options(
    frame=failing_records,
    connection_type="s3",
    connection_options={"path": "s3://bucket/quarantine/"},
    format="parquet"
)

job.commit()

CloudWatch Integration

# Terraform: CloudWatch alarm for data quality
resource "aws_cloudwatch_metric_alarm" "data_quality_failure" {
  alarm_name          = "glue-data-quality-failure"
  comparison_operator = "LessThanThreshold"
  evaluation_periods  = 1
  metric_name         = "glue.driver.aggregate.dq.rowsPassedPercentage"
  namespace           = "AWS/Glue"
  period              = 300
  statistic           = "Average"
  threshold           = 95  # Alert if less than 95% of rows pass
  alarm_description   = "Data quality check failure"

  dimensions = {
    JobName = "user-events-etl"
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
}

9.5.5. GCP Data Quality with Dataplex

Google’s Dataplex provides integrated data quality management.

Dataplex Data Profile

# Using Dataplex Data Quality API
from google.cloud import dataplex_v1

client = dataplex_v1.DataScanServiceClient()

# Create a data quality scan
data_quality_spec = dataplex_v1.DataQualitySpec(
    rules=[
        # Null check
        dataplex_v1.DataQualityRule(
            column="user_id",
            non_null_expectation=dataplex_v1.DataQualityRule.NonNullExpectation(),
            threshold=1.0,  # 100% non-null
        ),
        # Range check
        dataplex_v1.DataQualityRule(
            column="amount",
            range_expectation=dataplex_v1.DataQualityRule.RangeExpectation(
                min_value="0",
                max_value="100000",
            ),
            threshold=0.99,  # 99% within range
        ),
        # Set membership
        dataplex_v1.DataQualityRule(
            column="event_type",
            set_expectation=dataplex_v1.DataQualityRule.SetExpectation(
                values=["page_view", "click", "purchase", "add_to_cart"]
            ),
            threshold=1.0,
        ),
        # Regex pattern
        dataplex_v1.DataQualityRule(
            column="email",
            regex_expectation=dataplex_v1.DataQualityRule.RegexExpectation(
                regex=r"^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$"
            ),
            threshold=0.95,
        ),
        # Uniqueness
        dataplex_v1.DataQualityRule(
            column="event_id",
            uniqueness_expectation=dataplex_v1.DataQualityRule.UniquenessExpectation(),
            threshold=0.99,
        ),
        # Statistical checks
        dataplex_v1.DataQualityRule(
            column="amount",
            statistic_range_expectation=dataplex_v1.DataQualityRule.StatisticRangeExpectation(
                statistic=dataplex_v1.DataQualityRule.StatisticRangeExpectation.Statistic.MEAN,
                min_value="10",
                max_value="500",
            ),
            threshold=1.0,
        ),
        # Row-level checks with SQL
        dataplex_v1.DataQualityRule(
            row_condition_expectation=dataplex_v1.DataQualityRule.RowConditionExpectation(
                sql_expression="amount >= 0 AND timestamp IS NOT NULL"
            ),
            threshold=0.99,
        ),
    ],
    sampling_percent=100.0,  # Check all data
)

# Create the scan
scan = dataplex_v1.DataScan(
    data=dataplex_v1.DataSource(
        entity=f"projects/{project}/locations/{location}/lakes/{lake}/zones/{zone}/entities/user_events"
    ),
    data_quality_spec=data_quality_spec,
    execution_spec=dataplex_v1.DataScan.ExecutionSpec(
        trigger=dataplex_v1.Trigger(
            schedule=dataplex_v1.Trigger.Schedule(
                cron="0 */6 * * *"  # Every 6 hours
            )
        )
    ),
)

operation = client.create_data_scan(
    parent=f"projects/{project}/locations/{location}",
    data_scan=scan,
    data_scan_id="user-events-dq-scan"
)
result = operation.result()

Terraform: Dataplex Data Quality

# Dataplex Data Quality Scan
resource "google_dataplex_datascan" "user_events_quality" {
  location = var.region
  project  = var.project_id

  data_scan_id = "user-events-quality"

  data {
    entity = google_dataplex_entity.user_events.id
  }

  execution_spec {
    trigger {
      schedule {
        cron = "0 */6 * * *"
      }
    }
  }

  data_quality_spec {
    sampling_percent = 100

    rules {
      column = "user_id"
      non_null_expectation {}
      threshold = 1.0
    }

    rules {
      column = "amount"
      range_expectation {
        min_value = "0"
        max_value = "100000"
      }
      threshold = 0.99
    }

    rules {
      column = "event_type"
      set_expectation {
        values = ["page_view", "click", "purchase", "add_to_cart"]
      }
      threshold = 1.0
    }

    rules {
      column = "event_id"
      uniqueness_expectation {}
      threshold = 0.99
    }
  }
}

9.5.6. Data Drift Detection

Drift means that the statistical properties of data change over time, so production data gradually stops resembling the data the model was trained on.

Types of Drift

| Type            | Definition                            | Detection Method                                |
|-----------------|---------------------------------------|-------------------------------------------------|
| Covariate Drift | Input feature distribution changes    | Statistical tests (KS, PSI; see sketch below)   |
| Prior Drift     | Target distribution changes           | Label distribution monitoring                   |
| Concept Drift   | Relationship between X and Y changes  | Model performance monitoring                    |
| Schema Drift    | Data structure changes                | Schema validation                               |
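
The Population Stability Index (PSI) mentioned in the table is simple enough to compute directly: bin the reference and current values with the same edges, then sum (current% − reference%) · ln(current%/reference%) over the bins. A minimal NumPy sketch; the bin count and the 0.1/0.25 thresholds are common rules of thumb, not a library API:

# Population Stability Index (PSI) for one numeric feature.
# PSI = sum over bins of (p_cur - p_ref) * ln(p_cur / p_ref)
import numpy as np

def psi(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    # Bin edges come from the reference (training) distribution
    edges = np.histogram_bin_edges(reference, bins=bins)
    ref_counts, _ = np.histogram(reference, bins=edges)
    cur_counts, _ = np.histogram(current, bins=edges)

    # Convert to proportions, with a small floor to avoid division by zero
    eps = 1e-6
    p_ref = np.clip(ref_counts / ref_counts.sum(), eps, None)
    p_cur = np.clip(cur_counts / cur_counts.sum(), eps, None)

    return float(np.sum((p_cur - p_ref) * np.log(p_cur / p_ref)))

# Rule of thumb: PSI < 0.1 stable, 0.1-0.25 moderate drift, > 0.25 significant drift
# score = psi(training_data["amount"].to_numpy(), production_data["amount"].to_numpy())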

Drift Detection with Evidently

# Install evidently
# pip install evidently

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset

# Define column mapping
column_mapping = ColumnMapping(
    prediction="prediction",
    numerical_features=["amount", "age", "session_duration"],
    categorical_features=["event_type", "device", "country"]
)

# Create drift report
report = Report(metrics=[
    DataDriftPreset(),
    DataQualityPreset(),
])

# Compare reference (training) and current (production) data
report.run(
    reference_data=training_data,
    current_data=production_data,
    column_mapping=column_mapping
)

# Save report
report.save_html("drift_report.html")

# Get JSON for programmatic access
report_json = report.json()
drift_results = report.as_dict()

# Check if drift was detected (look up preset metrics by name rather than index)
for metric in drift_results["metrics"]:
    if metric["metric"] == "DatasetDriftMetric" and metric["result"]["dataset_drift"]:
        print("⚠️ Data drift detected!")
    if metric["metric"] == "DataDriftTable":
        for col, drift in metric["result"]["drift_by_columns"].items():
            if drift["drift_detected"]:
                print(f"  - {col}: drift score = {drift['drift_score']:.4f}")

Test Suite for CI/CD

# Drift tests for automated validation
from evidently.tests import (
    TestNumberOfRows,
    TestNumberOfColumns,
    TestColumnDrift,
    TestColumnShareOfMissingValues,
    TestMeanInNSigmas,
)

tests = TestSuite(tests=[
    # Row count should be within expected range
    TestNumberOfRows(gte=10000, lte=100000),
    
    # Schema stability
    TestNumberOfColumns(eq=15),
    
    # Missing value thresholds
    TestColumnShareOfMissingValues(column_name="user_id", lte=0.0),
    TestColumnShareOfMissingValues(column_name="amount", lte=0.2),
    
    # Statistical stability
    TestMeanInNSigmas(column_name="amount", n_sigmas=3),
    
    # Drift detection
    TestColumnDrift(column_name="amount", stattest_threshold=0.05),
    TestColumnDrift(column_name="event_type", stattest_threshold=0.05),
])

tests.run(reference_data=training_data, current_data=production_data)

# For CI/CD integration
if not tests.as_dict()['summary']['all_passed']:
    raise Exception("Data drift tests failed!")

9.5.7. Schema Validation and Evolution

Schema Validation with Pandera

import pandera as pa
from pandera import Column, DataFrameSchema, Check

# Define schema
user_events_schema = DataFrameSchema(
    columns={
        "event_id": Column(str, Check.str_matches(r"^[a-f0-9-]{36}$")),
        "user_id": Column(str, nullable=False),
        "event_type": Column(
            str, 
            Check.isin(["page_view", "click", "purchase", "add_to_cart"])
        ),
        "timestamp": Column(pa.DateTime, nullable=False),
        "amount": Column(
            float, 
            Check.in_range(0, 100000),
            nullable=True
        ),
        "device": Column(str, Check.isin(["mobile", "desktop", "tablet"])),
        "country": Column(str, Check.str_length(min_value=2, max_value=2)),
    },
    coerce=True,  # Coerce types if possible
    strict=True,   # No extra columns allowed
)

# Validate
try:
    validated_df = user_events_schema.validate(df)
except pa.errors.SchemaError as e:
    print(f"Schema validation failed: {e}")
    # Send alert, quarantine data

Schema Evolution with Schema Registry

# Apache Avro schema for event data
schema_v1 = {
    "type": "record",
    "name": "UserEvent",
    "namespace": "com.company.ml",
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "user_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "amount", "type": ["null", "double"], "default": None},
    ]
}

# Compatible evolution: add optional field
schema_v2 = {
    "type": "record",
    "name": "UserEvent",
    "namespace": "com.company.ml",
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "user_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "amount", "type": ["null", "double"], "default": None},
        # New optional field (backward compatible)
        {"name": "session_id", "type": ["null", "string"], "default": None},
    ]
}

# Register with Confluent Schema Registry
import json

from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry = SchemaRegistryClient(schema_registry_conf)

# Register the new version; the registry enforces the subject's compatibility mode
schema_registry.register_schema(
    "user-events-value",
    Schema(json.dumps(schema_v2), "AVRO")
)
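
You can also ask the registry explicitly whether the new version is compatible before registering. Recent confluent-kafka-python releases expose a test_compatibility call on the client; the short sketch below reuses the schema_registry client, Schema, json, and schema_v2 defined above, and the method's availability should be treated as version-dependent:

# Ask the registry whether schema_v2 is compatible with the latest version
# already stored under the subject (reuses schema_registry, Schema, json, schema_v2).
if not schema_registry.test_compatibility(
    "user-events-value",
    Schema(json.dumps(schema_v2), "AVRO"),
):
    raise RuntimeError("schema_v2 is not compatible with subject 'user-events-value'")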

9.5.8. Data Quality Metrics and Monitoring

Key Metrics to Track

| Metric            | Description                          | Target                    |
|-------------------|--------------------------------------|---------------------------|
| Completeness Rate | % of non-null values                 | >99% for required fields  |
| Validity Rate     | % passing validation rules           | >99%                      |
| Freshness         | Time since last update               | <1 hour for real-time     |
| Consistency Score | Match rate across sources            | >99%                      |
| Drift Score       | Statistical distance from baseline   | <0.1                      |

Prometheus Metrics

from prometheus_client import Counter, Gauge, Histogram

# Quality metrics
rows_processed = Counter(
    'data_quality_rows_processed_total',
    'Total rows processed',
    ['dataset', 'status']
)

quality_score = Gauge(
    'data_quality_score',
    'Overall quality score (0-100)',
    ['dataset']
)

validation_duration = Histogram(
    'data_quality_validation_duration_seconds',
    'Time to run validation',
    ['dataset']
)

drift_score = Gauge(
    'data_quality_drift_score',
    'Drift score by column',
    ['dataset', 'column']
)

# Update metrics after validation
def update_quality_metrics(dataset, results):
    rows_processed.labels(dataset=dataset, status='pass').inc(results['passed'])
    rows_processed.labels(dataset=dataset, status='fail').inc(results['failed'])
    quality_score.labels(dataset=dataset).set(results['score'])
    
    for col, score in results['drift_scores'].items():
        drift_score.labels(dataset=dataset, column=col).set(score)

9.5.9. Key Takeaways

  1. Data quality is foundational: Bad data → bad models, no exceptions.

  2. Validate at every stage: Ingestion, transformation, serving.

  3. Use Great Expectations or cloud-native tools: Proven frameworks save time.

  4. Monitor for drift continuously: Data changes; detect it early.

  5. Schema evolution requires planning: Use registries, version schemas.

  6. Automate quality gates: Block bad data from entering pipelines.

  7. Track quality metrics: What you measure improves.

  8. Quarantine, don’t discard: Save bad data for debugging.


Next: 9.6 Advanced Data Versioning — lakeFS, Delta Lake, and reproducibility.