Chapter 9.5: Data Quality Management
“Garbage in, garbage out. The difference between a good ML model and a disaster is data quality.” — DJ Patil, Former U.S. Chief Data Scientist
Data quality is the foundation of ML success. This chapter covers comprehensive strategies for validation, drift detection, and quality management at scale across AWS and GCP.
9.5.1. The Data Quality Crisis in ML
Why Data Quality Matters More for ML
| Traditional Software | Machine Learning |
|---|---|
| Explicit rules handle edge cases | Model learns from data patterns |
| Bugs are deterministic | Bugs are probabilistic |
| Testing catches issues | Bad data creates silent failures |
| Fix the code | Fix the data AND the code |
Common Data Quality Issues
| Issue | Description | Impact on ML |
|---|---|---|
| Missing values | Null, empty, or placeholder values | Biased predictions, training failures |
| Outliers | Extreme values outside normal range | Skewed model weights |
| Duplicates | Same record multiple times | Overfitting to duplicates |
| Inconsistent formats | Dates as strings, mixed encodings | Feature engineering failures |
| Schema drift | Column added/removed/renamed | Pipeline breaks |
| Range violations | Age = -5, Price = $999,999,999 | Nonsense predictions |
| Referential breaks | Foreign keys pointing to deleted records | Join failures |
| Stale data | Old data presented as current | Outdated predictions |
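Many of these issues can be caught with a few lines of pandas before a batch ever reaches feature engineering. A minimal sketch of a pre-flight check (the column names are illustrative, and user_events_df stands in for your raw batch):
import pandas as pd
def detect_common_issues(df: pd.DataFrame) -> dict:
    """Quick scan of a raw batch for several of the issues listed above."""
    return {
        # Missing values: null rate per column
        "missing_rate": df.isna().mean().to_dict(),
        # Duplicates: exact duplicate rows
        "duplicate_rows": int(df.duplicated().sum()),
        # Range violations (illustrative column)
        "negative_amounts": int((df["amount"] < 0).sum()) if "amount" in df.columns else None,
        # Inconsistent formats: timestamps that fail to parse
        "unparseable_timestamps": int(pd.to_datetime(df["timestamp"], errors="coerce").isna().sum())
        if "timestamp" in df.columns else None,
    }
issues = detect_common_issues(user_events_df)  # user_events_df: a raw events batch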
The Cost of Bad Data
A 2022 Gartner study found:
- Poor data quality costs organizations an average of $12.9M annually
- 60% of data scientists spend more time cleaning data than building models
- 20% of ML models fail in production due to data quality issues
9.5.2. The Data Quality Framework
The Five Dimensions of Data Quality
| Dimension | Definition | ML Relevance |
|---|---|---|
| Accuracy | Data correctly represents reality | Model learns true patterns |
| Completeness | All required data is present | No missing feature issues |
| Consistency | Data is uniform across sources | Clean joins, no conflicts |
| Timeliness | Data is current and fresh | Predictions reflect reality |
| Validity | Data conforms to rules/formats | Pipeline stability |
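These dimensions become actionable once they are computed as per-batch scores. A minimal sketch for three of them on a pandas DataFrame (the column names, event types, and one-hour freshness threshold are assumptions to adapt):
import pandas as pd
def quality_dimensions(df: pd.DataFrame, required_cols, valid_event_types) -> dict:
    """Score a batch on completeness, validity, and timeliness (0.0-1.0)."""
    # Completeness: share of rows with all required fields present
    completeness = 1.0 - df[required_cols].isna().any(axis=1).mean()
    # Validity: share of rows whose event_type is in the allowed set
    validity = df["event_type"].isin(valid_event_types).mean()
    # Timeliness: is the newest record less than an hour old?
    age_hours = (
        pd.Timestamp.now(tz="UTC") - pd.to_datetime(df["timestamp"], utc=True).max()
    ).total_seconds() / 3600
    timeliness = 1.0 if age_hours <= 1 else 0.0
    return {"completeness": completeness, "validity": validity, "timeliness": timeliness}
scores = quality_dimensions(
    user_events_df,
    required_cols=["user_id", "event_type", "timestamp"],
    valid_event_types=["page_view", "click", "purchase", "add_to_cart"],
)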
The Quality Pipeline
┌─────────────────────────────────────────────────────────────────────┐
│ DATA QUALITY PIPELINE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ SOURCE │───▶│ VALIDATION │───▶│ TRANSFORM │ │
│ │ DATA │ │ LAYER │ │ LAYER │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ QUALITY │ │ QUALITY │ │
│ │ METRICS │ │ METRICS │ │
│ └──────────────┘ └──────────────┘ │
│ │ │ │
│ └────────┬───────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ MONITORING & │ │
│ │ ALERTING │ │
│ └─────────────────┘ │
│ │ │
│ ┌─────────▼─────────┐ │
│ │ Pass: Continue │ │
│ │ Fail: Alert/Stop │ │
│ └───────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
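In code, the gate at the bottom of this diagram is usually a small routing function: batches that pass continue downstream, batches that fail are quarantined and alerted on. A minimal sketch with placeholder callables (validate, quarantine, alert, and emit_metrics stand in for the concrete tools covered in the rest of this chapter):
def quality_gate(batch, validate, quarantine, alert, emit_metrics):
    """Run validation and route the batch: continue on pass, quarantine and alert on fail."""
    result = validate(batch)     # e.g. a Great Expectations checkpoint run (9.5.3)
    emit_metrics(result)         # e.g. publish pass/fail counts and scores (9.5.8)
    if result["success"]:
        return batch             # pass: hand off to the transform layer
    quarantine(batch, result)    # fail: park the raw data for debugging
    alert(result)                # notify the on-call owner
    raise RuntimeError("Data quality gate failed; pipeline stopped")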
9.5.3. Great Expectations: The Validation Standard
Great Expectations is one of the most widely adopted open-source data validation frameworks for ML pipelines.
Core Concepts
| Concept | Definition |
|---|---|
| Expectation | A verifiable assertion about data |
| Expectation Suite | Collection of expectations for a dataset |
| Checkpoint | Validation run configuration |
| Data Docs | Auto-generated documentation |
| Profiler | Automatic expectation generation |
Setting Up Great Expectations
# Install
# pip install great_expectations
import great_expectations as gx
# Create context
context = gx.get_context()
# Connect to data source
datasource = context.sources.add_pandas("pandas_datasource")
# Add a data asset
data_asset = datasource.add_dataframe_asset(name="user_events")
# Build batch request
batch_request = data_asset.build_batch_request(dataframe=user_events_df)
Defining Expectations
# Create an expectation suite
suite = context.add_expectation_suite("user_events_quality")
# Add expectations via a validator bound to the batch and suite
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="user_events_quality",
)
# Column existence
validator.expect_column_to_exist("user_id")
validator.expect_column_to_exist("event_type")
validator.expect_column_to_exist("timestamp")
validator.expect_column_to_exist("amount")
# Type checks
validator.expect_column_values_to_be_of_type("user_id", "str")
validator.expect_column_values_to_be_of_type("amount", "float")
validator.expect_column_values_to_be_of_type("timestamp", "datetime64")
# Null checks
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_not_be_null("event_type")
# amount may be null for non-purchase events (at least 80% of rows must be non-null)
validator.expect_column_values_to_not_be_null("amount", mostly=0.80)
# Value constraints
validator.expect_column_values_to_be_in_set(
"event_type",
["page_view", "click", "purchase", "add_to_cart", "checkout"]
)
# Range checks
validator.expect_column_values_to_be_between(
"amount",
min_value=0,
max_value=100000,
mostly=0.99 # Allow 1% outliers
)
# Uniqueness
validator.expect_column_values_to_be_unique("event_id")
# Freshness check (for streaming data)
from datetime import datetime, timedelta
one_hour_ago = datetime.now() - timedelta(hours=1)
validator.expect_column_max_to_be_between(
"timestamp",
min_value=one_hour_ago,
max_value=datetime.now()
)
# Statistical expectations
validator.expect_column_mean_to_be_between("amount", min_value=10, max_value=500)
validator.expect_column_stdev_to_be_between("amount", min_value=5, max_value=200)
# Distribution expectations (reference_distribution is a partition object precomputed from training data)
validator.expect_column_kl_divergence_to_be_less_than(
"amount",
partition_object=reference_distribution,
threshold=0.1
)
# Save the suite
validator.save_expectation_suite(discard_failed_expectations=False)
Running Validations in Production
# Create a checkpoint
checkpoint = context.add_or_update_checkpoint(
name="daily_quality_check",
validations=[
{
"batch_request": batch_request,
"expectation_suite_name": "user_events_quality",
}
],
action_list=[
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "store_evaluation_params",
"action": {"class_name": "StoreEvaluationParametersAction"},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"},
},
# Slack notification on failure
        {
            "name": "send_slack_notification",
            "action": {
                "class_name": "SlackNotificationAction",
                "slack_webhook": "${SLACK_WEBHOOK}",
                "notify_on": "failure",
                "renderer": {
                    "module_name": "great_expectations.render.renderer.slack_renderer",
                    "class_name": "SlackRenderer",
                },
            },
        },
],
)
# Run the checkpoint
result = checkpoint.run()
# Check result
if not result.success:
    # Stop the pipeline (DataQualityError is a custom exception defined by your pipeline code)
raise DataQualityError("Validation failed", result.list_validation_results())
9.5.4. AWS Glue Data Quality
AWS Glue provides native data quality capabilities integrated with ETL.
Data Quality Rules in Glue
# Glue Data Quality rule syntax
rules = """
Rules = [
# Completeness
ColumnExists "user_id",
Completeness "user_id" >= 1.0,
Completeness "amount" >= 0.8,
# Uniqueness
Uniqueness "event_id" >= 0.99,
# Range
ColumnValues "amount" between 0 and 100000,
ColumnValues "quantity" > 0,
# Distribution
StandardDeviation "amount" between 5 and 200,
Mean "amount" between 10 and 500,
# Pattern
ColumnValues "email" matches "^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$",
# Freshness
DataFreshness "timestamp" <= 1 hours,
# Referential Integrity
ReferentialIntegrity "user_id" "users.id" >= 0.99,
# Custom SQL
CustomSql "SELECT COUNT(*) FROM primary WHERE amount < 0" = 0
]
"""
Glue Job with Data Quality
# Glue ETL job with quality checks
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Read data
datasource = glueContext.create_dynamic_frame.from_catalog(
database="ml_database",
table_name="user_events"
)
# Define DQ rules
ruleset = """
Rules = [
Completeness "user_id" >= 1.0,
Completeness "event_type" >= 1.0,
ColumnValues "amount" >= 0,
Uniqueness "event_id" >= 0.99
]
"""
# Evaluate data quality with row-level outcomes
dq_multiframe = EvaluateDataQuality().process_rows(
    frame=datasource,
    ruleset=ruleset,
    publishing_options={
        "dataQualityEvaluationContext": "user_events_dq",
        "enableDataQualityCloudwatchMetrics": True,
        "enableDataQualityResultsPublishing": True,
    },
)
# Pull out the per-row results and split passing from failing records
row_outcomes = SelectFromCollection.apply(dfc=dq_multiframe, key="rowLevelOutcomes").toDF()
passing_records = DynamicFrame.fromDF(
    row_outcomes.filter(row_outcomes.DataQualityEvaluationResult == "Passed"),
    glueContext, "passing_records")
failing_records = DynamicFrame.fromDF(
    row_outcomes.filter(row_outcomes.DataQualityEvaluationResult != "Passed"),
    glueContext, "failing_records")
# Route passing records to destination
glueContext.write_dynamic_frame.from_options(
frame=passing_records,
connection_type="s3",
connection_options={"path": "s3://bucket/clean/"},
format="parquet"
)
# Route failing records to quarantine
glueContext.write_dynamic_frame.from_options(
frame=failing_records,
connection_type="s3",
connection_options={"path": "s3://bucket/quarantine/"},
format="parquet"
)
job.commit()
CloudWatch Integration
# Terraform: CloudWatch alarm for data quality
resource "aws_cloudwatch_metric_alarm" "data_quality_failure" {
alarm_name = "glue-data-quality-failure"
comparison_operator = "LessThanThreshold"
evaluation_periods = 1
metric_name = "glue.driver.aggregate.dq.rowsPassedPercentage"
namespace = "AWS/Glue"
period = 300
statistic = "Average"
threshold = 95 # Alert if less than 95% of rows pass
alarm_description = "Data quality check failure"
dimensions = {
JobName = "user-events-etl"
}
alarm_actions = [aws_sns_topic.alerts.arn]
}
9.5.5. GCP Data Quality with Dataplex
Google’s Dataplex provides integrated data quality management.
Dataplex Data Profile
# Using Dataplex Data Quality API
from google.cloud import dataplex_v1
client = dataplex_v1.DataScanServiceClient()
# Create a data quality scan
data_quality_spec = dataplex_v1.DataQualitySpec(
rules=[
# Null check
dataplex_v1.DataQualityRule(
column="user_id",
non_null_expectation=dataplex_v1.DataQualityRule.NonNullExpectation(),
threshold=1.0, # 100% non-null
),
# Range check
dataplex_v1.DataQualityRule(
column="amount",
range_expectation=dataplex_v1.DataQualityRule.RangeExpectation(
min_value="0",
max_value="100000",
),
threshold=0.99, # 99% within range
),
# Set membership
dataplex_v1.DataQualityRule(
column="event_type",
set_expectation=dataplex_v1.DataQualityRule.SetExpectation(
values=["page_view", "click", "purchase", "add_to_cart"]
),
threshold=1.0,
),
# Regex pattern
dataplex_v1.DataQualityRule(
column="email",
regex_expectation=dataplex_v1.DataQualityRule.RegexExpectation(
regex=r"^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$"
),
threshold=0.95,
),
# Uniqueness
dataplex_v1.DataQualityRule(
column="event_id",
uniqueness_expectation=dataplex_v1.DataQualityRule.UniquenessExpectation(),
threshold=0.99,
),
# Statistical checks
dataplex_v1.DataQualityRule(
column="amount",
statistic_range_expectation=dataplex_v1.DataQualityRule.StatisticRangeExpectation(
statistic=dataplex_v1.DataQualityRule.StatisticRangeExpectation.Statistic.MEAN,
min_value="10",
max_value="500",
),
threshold=1.0,
),
# Row-level checks with SQL
dataplex_v1.DataQualityRule(
            dimension="VALIDITY",
            row_condition_expectation=dataplex_v1.DataQualityRule.RowConditionExpectation(
sql_expression="amount >= 0 AND timestamp IS NOT NULL"
),
threshold=0.99,
),
],
sampling_percent=100.0, # Check all data
)
# Create the scan
scan = dataplex_v1.DataScan(
data=dataplex_v1.DataSource(
entity=f"projects/{project}/locations/{location}/lakes/{lake}/zones/{zone}/entities/user_events"
),
data_quality_spec=data_quality_spec,
execution_spec=dataplex_v1.DataScan.ExecutionSpec(
trigger=dataplex_v1.Trigger(
schedule=dataplex_v1.Trigger.Schedule(
cron="0 */6 * * *" # Every 6 hours
)
)
),
)
operation = client.create_data_scan(
parent=f"projects/{project}/locations/{location}",
data_scan=scan,
data_scan_id="user-events-dq-scan"
)
result = operation.result()
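Scheduled scans can also be triggered on demand, for example as a gate inside a training pipeline. A minimal sketch, assuming the scan above was created; method, enum, and result field names follow the google-cloud-dataplex client and may differ slightly between versions:
import time
scan_name = f"projects/{project}/locations/{location}/dataScans/user-events-dq-scan"
# Trigger an on-demand run of the scan
job = client.run_data_scan(name=scan_name).job
# The scan runs asynchronously; poll until the job reaches a terminal state
while job.state in (
    dataplex_v1.DataScanJob.State.PENDING,
    dataplex_v1.DataScanJob.State.RUNNING,
):
    time.sleep(30)
    # Note: request the FULL job view if quality results are not populated by default
    job = client.get_data_scan_job(name=job.name)
# Gate the pipeline on the overall result
if not job.data_quality_result.passed:
    raise RuntimeError("Dataplex data quality scan failed")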
Terraform: Dataplex Data Quality
# Dataplex Data Quality Scan
resource "google_dataplex_datascan" "user_events_quality" {
location = var.region
project = var.project_id
data_scan_id = "user-events-quality"
data {
entity = google_dataplex_entity.user_events.id
}
execution_spec {
trigger {
schedule {
cron = "0 */6 * * *"
}
}
}
data_quality_spec {
sampling_percent = 100
    rules {
      column    = "user_id"
      dimension = "COMPLETENESS"
      non_null_expectation {}
      threshold = 1.0
    }
    rules {
      column    = "amount"
      dimension = "VALIDITY"
      range_expectation {
        min_value = "0"
        max_value = "100000"
      }
      threshold = 0.99
    }
    rules {
      column    = "event_type"
      dimension = "VALIDITY"
      set_expectation {
        values = ["page_view", "click", "purchase", "add_to_cart"]
      }
      threshold = 1.0
    }
    rules {
      column    = "event_id"
      dimension = "UNIQUENESS"
      uniqueness_expectation {}
      threshold = 0.99
    }
}
}
9.5.6. Data Drift Detection
Drift occurs when the statistical properties of data change over time; a model trained on the old distribution quietly degrades on the new one.
Types of Drift
| Type | Definition | Detection Method |
|---|---|---|
| Covariate Drift | Input feature distribution changes | Statistical tests (KS, PSI) |
| Prior Drift | Target distribution changes | Label distribution monitoring |
| Concept Drift | Relationship between X and Y changes | Model performance monitoring |
| Schema Drift | Data structure changes | Schema validation |
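Before reaching for a full framework, covariate drift on a single numeric feature can be checked with a two-sample Kolmogorov-Smirnov test and the Population Stability Index (PSI). A minimal sketch using numpy and scipy; the reference/current split mirrors the training vs. production data used below, and the 0.2 PSI cut-off is a common rule of thumb rather than a universal threshold:
import numpy as np
from scipy import stats
def population_stability_index(reference, current, bins=10):
    """PSI between a reference (training) and current (production) numeric sample."""
    edges = np.histogram_bin_edges(reference, bins=bins)
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    ref_pct = np.clip(ref_pct, 1e-6, None)  # avoid log(0) and division by zero
    cur_pct = np.clip(cur_pct, 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))
reference = training_data["amount"].to_numpy()     # training distribution
current = production_data["amount"].to_numpy()     # production distribution
ks_stat, p_value = stats.ks_2samp(reference, current)
psi = population_stability_index(reference, current)
print(f"KS p-value: {p_value:.4f}, PSI: {psi:.3f}")  # PSI > 0.2 usually warrants investigation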
Drift Detection with Evidently
# Install evidently
# pip install evidently
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset
# Define column mapping
column_mapping = ColumnMapping(
prediction="prediction",
numerical_features=["amount", "age", "session_duration"],
categorical_features=["event_type", "device", "country"]
)
# Create drift report
report = Report(metrics=[
DataDriftPreset(),
DataQualityPreset(),
])
# Compare reference (training) and current (production) data
report.run(
reference_data=training_data,
current_data=production_data,
column_mapping=column_mapping
)
# Save report
report.save_html("drift_report.html")
# Get JSON for programmatic access
report_json = report.json()
drift_results = report.as_dict()
# Check if drift detected (metrics[0] is the dataset-level DatasetDriftMetric,
# metrics[1] is the per-column DataDriftTable)
if drift_results['metrics'][0]['result']['dataset_drift']:
    print("⚠️ Data drift detected!")
    for col, drift in drift_results['metrics'][1]['result']['drift_by_columns'].items():
        if drift['drift_detected']:
            print(f"  - {col}: drift score = {drift['drift_score']:.4f}")
Test Suite for CI/CD
# Drift tests for automated validation
from evidently.tests import (
    TestNumberOfRows,
    TestNumberOfColumns,
    TestColumnDrift,
    TestColumnShareOfMissingValues,
    TestMeanInNSigmas,
)
tests = TestSuite(tests=[
    # Row count should be within expected range
    TestNumberOfRows(gte=10000, lte=100000),
    # Schema stability
    TestNumberOfColumns(eq=15),
    # Missing value thresholds
    TestColumnShareOfMissingValues(column_name="user_id", lte=0.0),
    TestColumnShareOfMissingValues(column_name="amount", lte=0.2),
    # Statistical stability
    TestMeanInNSigmas(column_name="amount", n_sigmas=3),
    # Drift detection
    TestColumnDrift(column_name="amount", stattest_threshold=0.05),
    TestColumnDrift(column_name="event_type", stattest_threshold=0.05),
])
tests.run(reference_data=training_data, current_data=production_data)
# For CI/CD integration
if not tests.as_dict()['summary']['all_passed']:
raise Exception("Data drift tests failed!")
9.5.7. Schema Validation and Evolution
Schema Validation with Pandera
import pandera as pa
from pandera import Column, DataFrameSchema, Check
# Define schema
user_events_schema = DataFrameSchema(
columns={
"event_id": Column(str, Check.str_matches(r"^[a-f0-9-]{36}$")),
"user_id": Column(str, nullable=False),
"event_type": Column(
str,
Check.isin(["page_view", "click", "purchase", "add_to_cart"])
),
"timestamp": Column(pa.DateTime, nullable=False),
"amount": Column(
float,
Check.in_range(0, 100000),
nullable=True
),
"device": Column(str, Check.isin(["mobile", "desktop", "tablet"])),
"country": Column(str, Check.str_length(min_value=2, max_value=2)),
},
coerce=True, # Coerce types if possible
strict=True, # No extra columns allowed
)
# Validate
try:
validated_df = user_events_schema.validate(df)
except pa.errors.SchemaError as e:
print(f"Schema validation failed: {e}")
# Send alert, quarantine data
Schema Evolution with Schema Registry
# Apache Avro schema for event data
schema_v1 = {
"type": "record",
"name": "UserEvent",
"namespace": "com.company.ml",
"fields": [
{"name": "event_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "amount", "type": ["null", "double"], "default": None},
]
}
# Compatible evolution: add optional field
schema_v2 = {
"type": "record",
"name": "UserEvent",
"namespace": "com.company.ml",
"fields": [
{"name": "event_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "amount", "type": ["null", "double"], "default": None},
# New optional field (backward compatible)
{"name": "session_id", "type": ["null", "string"], "default": None},
]
}
# Register with Confluent Schema Registry
import json
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry = SchemaRegistryClient(schema_registry_conf)
# Register the new version; the registry enforces the subject's compatibility setting
schema_registry.register_schema(
    "user-events-value",
    Schema(json.dumps(schema_v2), "AVRO")
)
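Before registering, the client can also ask the registry whether a candidate schema is compatible with the latest version already stored for the subject. A short check using the same client (test_compatibility returns a boolean):
# Check compatibility against the latest registered version before rolling out producers
is_compatible = schema_registry.test_compatibility(
    "user-events-value",
    Schema(json.dumps(schema_v2), "AVRO"),
)
if not is_compatible:
    raise RuntimeError("schema_v2 breaks compatibility for subject user-events-value")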
9.5.8. Data Quality Metrics and Monitoring
Key Metrics to Track
| Metric | Description | Target |
|---|---|---|
| Completeness Rate | % of non-null values | >99% for required fields |
| Validity Rate | % passing validation rules | >99% |
| Freshness | Time since last update | <1 hour for real-time |
| Consistency Score | Match rate across sources | >99% |
| Drift Score | Statistical distance from baseline | <0.1 |
Prometheus Metrics
from prometheus_client import Counter, Gauge, Histogram
# Quality metrics
rows_processed = Counter(
'data_quality_rows_processed_total',
'Total rows processed',
['dataset', 'status']
)
quality_score = Gauge(
'data_quality_score',
'Overall quality score (0-100)',
['dataset']
)
validation_duration = Histogram(
'data_quality_validation_duration_seconds',
'Time to run validation',
['dataset']
)
drift_score = Gauge(
'data_quality_drift_score',
'Drift score by column',
['dataset', 'column']
)
# Update metrics after validation
def update_quality_metrics(dataset, results):
rows_processed.labels(dataset=dataset, status='pass').inc(results['passed'])
rows_processed.labels(dataset=dataset, status='fail').inc(results['failed'])
quality_score.labels(dataset=dataset).set(results['score'])
for col, score in results['drift_scores'].items():
drift_score.labels(dataset=dataset, column=col).set(score)
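In a batch validation job, these metrics are typically exposed on a scrape endpoint (or pushed to a Pushgateway for short-lived jobs). A minimal usage sketch; the results dict mirrors the structure update_quality_metrics expects above:
from prometheus_client import start_http_server
# Expose /metrics for Prometheus to scrape while the validation job runs
start_http_server(8000)
results = {
    "passed": 98500,
    "failed": 1500,
    "score": 98.5,
    "drift_scores": {"amount": 0.04, "event_type": 0.01},
}
update_quality_metrics("user_events", results)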
9.5.9. Key Takeaways
- Data quality is foundational: Bad data → bad models, no exceptions.
- Validate at every stage: Ingestion, transformation, serving.
- Use Great Expectations or cloud-native tools: Proven frameworks save time.
- Monitor for drift continuously: Data changes; detect it early.
- Schema evolution requires planning: Use registries, version schemas.
- Automate quality gates: Block bad data from entering pipelines.
- Track quality metrics: What you measure improves.
- Quarantine, don’t discard: Save bad data for debugging.
Next: 9.6 Advanced Data Versioning — lakeFS, Delta Lake, and reproducibility.