Chapter 9.7: Data Lineage & Governance
“You can’t govern what you can’t see. And you can’t trust what you can’t trace.” — KPMG Data & Analytics Report, 2023
Data lineage and governance are foundational for regulatory compliance, impact analysis, and building trustworthy ML systems. This section covers practical strategies for tracking data provenance and enforcing governance across the ML lifecycle.
9.7.1. The Governance Imperative
Why Governance Matters Now
| Driver | Impact on ML |
|---|---|
| Regulations | EU AI Act, GDPR, CCPA, HIPAA require explainability |
| Model Risk | Regulators want to trace predictions to source data |
| Audits | “Show me where this model’s training data came from” |
| Debugging | “Why did the model make that prediction?” |
| Trust | Stakeholders need to verify data sources |
The Cost of Ungoverned ML
| Issue | Real-World Impact |
|---|---|
| No lineage | Bank fined $400M for inability to explain credit decisions |
| Unknown data sources | Healthcare model trained on biased subset, recalled |
| Stale metadata | Insurance pricing model used deprecated field, $50M loss |
| Missing consent tracking | GDPR violation, €20M fine |
9.7.2. Data Lineage Fundamentals
What Lineage Tracks
┌─────────────────────────────────────────────────────────────────────┐
│ DATA LINEAGE COMPONENTS │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ SOURCE │───▶│ TRANSFORM │───▶│ TARGET │ │
│ │ │ │ │ │ │ │
│ │ (Origin) │ │ (Processing) │ │ (Destination)│ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ METADATA CAPTURED: │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ • Schema (columns, types) │ │
│ │ • Ownership (team, individual) │ │
│ │ • Freshness (last updated) │ │
│ │ • Quality metrics │ │
│ │ • Classification (PII, sensitive) │ │
│ │ • Transformations applied │ │
│ │ • Consumers (who uses this data) │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Types of Lineage
| Type | Description | Use Case |
|---|---|---|
| Table-level | Relationships between tables/datasets | Impact analysis |
| Column-level | Field-to-field mappings | Detailed debugging |
| Transformation | Logic applied to data | Audit compliance |
| Operational | Runtime execution details | Performance analysis |
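To make the table-level versus column-level distinction concrete, here is a minimal, tool-agnostic sketch of the same pipeline expressed both ways (all dataset and column names are illustrative):
# Table-level lineage: one edge between datasets
table_level = {
    "source": "raw.user_events",
    "target": "features.user_features",
}

# Column-level lineage: which source fields fed each output field
column_level = {
    "features.user_features.user_id": ["raw.user_events.user_id"],
    "features.user_features.purchase_count_7d": [
        "raw.user_events.event_type", "raw.user_events.timestamp"
    ],
    "features.user_features.total_spend_7d": [
        "raw.user_events.amount", "raw.user_events.timestamp"
    ],
}

# Column-level lineage answers "which source fields fed this feature?",
# which table-level lineage alone cannot.
for target_col, source_cols in column_level.items():
    print(f"{target_col} <- {', '.join(source_cols)}")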
9.7.3. OpenLineage: The Industry Standard
OpenLineage is an open standard for collecting and exchanging lineage metadata: producers such as Airflow, Spark, and dbt emit events describing each job run, and any compliant backend can store and query them.
OpenLineage Event Structure
{
"eventType": "COMPLETE",
"eventTime": "2024-01-15T10:30:00.000Z",
"run": {
"runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
},
"job": {
"namespace": "ml-training",
"name": "user-feature-pipeline"
},
"inputs": [
{
"namespace": "s3://raw-data",
"name": "user_events",
"facets": {
"schema": {
"fields": [
{"name": "user_id", "type": "STRING"},
{"name": "event_type", "type": "STRING"},
{"name": "timestamp", "type": "TIMESTAMP"},
{"name": "amount", "type": "DOUBLE"}
]
},
"dataSource": {
"name": "production-kafka",
"uri": "kafka://prod-cluster/user-events"
}
}
}
],
"outputs": [
{
"namespace": "s3://feature-store",
"name": "user_features",
"facets": {
"schema": {
"fields": [
{"name": "user_id", "type": "STRING"},
{"name": "purchase_count_7d", "type": "INTEGER"},
{"name": "total_spend_7d", "type": "DOUBLE"},
{"name": "avg_session_duration", "type": "DOUBLE"}
]
},
"dataQuality": {
"rowCount": 1500000,
"nullCount": {"purchase_count_7d": 0, "total_spend_7d": 1523}
}
}
}
],
"producer": "airflow-scheduler"
}
Implementing OpenLineage
from openlineage.client import OpenLineageClient
from openlineage.client.facet import (
SchemaDatasetFacet,
SchemaField,
DataQualityMetricsInputDatasetFacet,
ColumnMetric,
SqlJobFacet,
)
from openlineage.client.run import (
RunEvent,
RunState,
Job,
Run,
Dataset,
InputDataset,
OutputDataset,
)
import uuid
from datetime import datetime
# Initialize client
client = OpenLineageClient.from_environment()
# Create unique run ID
run_id = str(uuid.uuid4())
# Define job
job = Job(
namespace="ml-pipelines",
name="feature-engineering"
)
# Define input dataset with facets
input_schema = SchemaDatasetFacet(
fields=[
SchemaField(name="user_id", type="STRING"),
SchemaField(name="event_type", type="STRING"),
SchemaField(name="timestamp", type="TIMESTAMP"),
SchemaField(name="amount", type="DOUBLE"),
]
)
input_dataset = InputDataset(
namespace="s3://raw-data",
name="user_events",
facets={"schema": input_schema}
)
# Emit START event
start_event = RunEvent(
eventType=RunState.START,
eventTime=datetime.utcnow().isoformat() + "Z",
run=Run(runId=run_id),
job=job,
inputs=[input_dataset],
outputs=[],
producer="feature-pipeline"
)
client.emit(start_event)
# ... Run your pipeline ...
# Define output dataset with quality metrics
output_schema = SchemaDatasetFacet(
fields=[
SchemaField(name="user_id", type="STRING"),
SchemaField(name="purchase_count_7d", type="INTEGER"),
SchemaField(name="total_spend_7d", type="DOUBLE"),
]
)
quality_facet = DataQualityMetricsInputDatasetFacet(
rowCount=1500000,
columnMetrics={
"user_id": ColumnMetric(nullCount=0, distinctCount=1200000),
"purchase_count_7d": ColumnMetric(nullCount=0, min=0, max=127),
"total_spend_7d": ColumnMetric(nullCount=1523, min=0.0, max=50000.0),
}
)
output_dataset = OutputDataset(
namespace="s3://feature-store",
name="user_features",
facets={
"schema": output_schema,
"dataQuality": quality_facet,
}
)
# Emit COMPLETE event
complete_event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=datetime.utcnow().isoformat() + "Z",
run=Run(runId=run_id),
job=job,
inputs=[input_dataset],
outputs=[output_dataset],
producer="feature-pipeline"
)
client.emit(complete_event)
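Real pipelines fail, and a lineage trail that only ever records COMPLETE events hides that. A minimal sketch of the same pattern with a FAIL event, reusing the client, run ID, job, and input dataset defined above (run_pipeline is a hypothetical entry point for the pipeline body):
try:
    run_pipeline()  # hypothetical: the actual pipeline body goes here
except Exception:
    # Emit FAIL instead of COMPLETE so the failed run is visible in lineage
    fail_event = RunEvent(
        eventType=RunState.FAIL,
        eventTime=datetime.utcnow().isoformat() + "Z",
        run=Run(runId=run_id),
        job=job,
        inputs=[input_dataset],
        outputs=[],
        producer="feature-pipeline",
    )
    client.emit(fail_event)
    raise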
9.7.4. Marquez: OpenLineage Backend
Marquez is the reference OpenLineage backend for storing and querying lineage.
Deploying Marquez
# docker-compose.yml
version: "3"
services:
marquez:
image: marquezproject/marquez:0.41.0
ports:
- "5000:5000"
- "5001:5001"
environment:
- MARQUEZ_CONFIG=/opt/marquez/marquez.yml
volumes:
- ./marquez.yml:/opt/marquez/marquez.yml
depends_on:
- db
marquez-web:
image: marquezproject/marquez-web:0.41.0
ports:
- "3000:3000"
environment:
- MARQUEZ_HOST=marquez
- MARQUEZ_PORT=5000
db:
image: postgres:14
environment:
- POSTGRES_USER=marquez
- POSTGRES_PASSWORD=marquez
- POSTGRES_DB=marquez
volumes:
- marquez-data:/var/lib/postgresql/data
volumes:
marquez-data:
# marquez.yml
server:
applicationConnectors:
- type: http
port: 5000
adminConnectors:
- type: http
port: 5001
db:
driverClass: org.postgresql.Driver
url: jdbc:postgresql://db:5432/marquez
user: marquez
password: marquez
migrateOnStartup: true
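With Marquez running, the OpenLineage client from 9.7.3 can be pointed at it by setting OPENLINEAGE_URL=http://localhost:5000. Raw events shaped like the JSON example in 9.7.3 can also be posted directly to Marquez's lineage endpoint; a minimal sketch:
import requests

def emit_raw_event(event: dict) -> None:
    """Post a raw OpenLineage event dict to Marquez's lineage endpoint."""
    resp = requests.post(
        "http://localhost:5000/api/v1/lineage",
        json=event,
        timeout=10,
    )
    resp.raise_for_status()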
Querying Lineage
import requests
MARQUEZ_URL = "http://localhost:5000/api/v1"
# Get all namespaces
namespaces = requests.get(f"{MARQUEZ_URL}/namespaces").json()
# Get datasets in a namespace
datasets = requests.get(
f"{MARQUEZ_URL}/namespaces/ml-pipelines/datasets"
).json()
# Get lineage for a specific dataset
lineage = requests.get(
f"{MARQUEZ_URL}/lineage",
params={
"nodeId": "dataset:s3://feature-store:user_features",
"depth": 5
}
).json()
# Visualize upstream dependencies
for node in lineage["graph"]:
if node["type"] == "DATASET":
print(f"Dataset: {node['data']['name']}")
elif node["type"] == "JOB":
print(f" ← Job: {node['data']['name']}")
9.7.5. AWS Glue Data Catalog Lineage
AWS Glue provides native lineage tracking for ETL jobs.
Enabling Lineage in Glue
# Terraform: Glue job with lineage
resource "aws_glue_job" "feature_pipeline" {
name = "feature-engineering"
role_arn = aws_iam_role.glue_role.arn
command {
script_location = "s3://scripts/feature_pipeline.py"
python_version = "3"
}
default_arguments = {
"--enable-glue-datacatalog" = "true"
"--enable-continuous-cloudwatch-log" = "true"
"--enable-metrics" = "true"
    # Job run insights aid debugging; lineage itself comes from the Data Catalog reads/writes in the script below
    "--enable-job-insights" = "true"
}
glue_version = "4.0"
}
Glue Data Catalog Integration
# Glue ETL with catalog lineage
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import functions as F          # needed for the aggregation below
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame   # needed to convert back from a DataFrame
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Read from catalog (lineage auto-tracked)
source = glueContext.create_dynamic_frame.from_catalog(
database="ml_database",
table_name="user_events",
transformation_ctx="source" # Important for lineage!
)
# Transform
transformed = source.apply_mapping([
("user_id", "string", "user_id", "string"),
("event_type", "string", "event_type", "string"),
("amount", "double", "amount", "double"),
])
# Aggregate
aggregated = transformed.toDF() \
.groupBy("user_id") \
.agg(
F.count("*").alias("event_count"),
F.sum("amount").alias("total_amount")
)
# Write to catalog (lineage auto-tracked)
output = DynamicFrame.fromDF(aggregated, glueContext, "output")
glueContext.write_dynamic_frame.from_catalog(
frame=output,
database="ml_database",
table_name="user_features",
transformation_ctx="output" # Important for lineage!
)
job.commit()
Querying Glue Lineage
import boto3
glue = boto3.client('glue')
# Get suggested field-to-field mappings between source and target tables
response = glue.get_mapping(
Source={
'DatabaseName': 'ml_database',
'TableName': 'user_events'
},
Sinks=[{
'DatabaseName': 'ml_database',
'TableName': 'user_features'
}]
)
for mapping in response['Mapping']:
    print(f"{mapping['SourcePath']} → {mapping['TargetPath']}")
9.7.6. GCP Dataplex Lineage
Google Cloud Dataplex provides integrated lineage through Data Catalog.
Dataplex Lineage API
import time

from google.cloud import datacatalog_lineage_v1

project = "your-gcp-project"   # placeholder
location = "us-central1"       # placeholder

client = datacatalog_lineage_v1.LineageClient()
# Create a process (represents a transformation)
process = datacatalog_lineage_v1.Process(
name=f"projects/{project}/locations/{location}/processes/feature-pipeline",
display_name="Feature Engineering Pipeline",
attributes={
"author": datacatalog_lineage_v1.AttributeValue(value_string="ml-team"),
"pipeline_version": datacatalog_lineage_v1.AttributeValue(value_string="1.2.3"),
}
)
created_process = client.create_process(
parent=f"projects/{project}/locations/{location}",
process=process
)
# Create a run
run = datacatalog_lineage_v1.Run(
display_name="Daily Run 2024-01-15",
state=datacatalog_lineage_v1.Run.State.STARTED,
start_time={"seconds": int(time.time())},
)
created_run = client.create_run(
parent=created_process.name,
run=run
)
# Create lineage events
lineage_event = datacatalog_lineage_v1.LineageEvent(
start_time={"seconds": int(time.time())},
links=[
datacatalog_lineage_v1.EventLink(
source=datacatalog_lineage_v1.EntityReference(
fully_qualified_name="bigquery:project.dataset.user_events"
),
target=datacatalog_lineage_v1.EntityReference(
fully_qualified_name="bigquery:project.dataset.user_features"
),
)
]
)
client.create_lineage_event(
parent=created_run.name,
lineage_event=lineage_event
)
# Complete the run
client.update_run(
run=datacatalog_lineage_v1.Run(
name=created_run.name,
state=datacatalog_lineage_v1.Run.State.COMPLETED,
end_time={"seconds": int(time.time())},
),
update_mask={"paths": ["state", "end_time"]}
)
Terraform: Data Classification with Policy Tags
# Data Catalog taxonomy for classification
resource "google_data_catalog_taxonomy" "ml_classifications" {
provider = google-beta
region = var.region
display_name = "ML Data Classifications"
description = "Classification taxonomy for ML data governance"
activated_policy_types = ["FINE_GRAINED_ACCESS_CONTROL"]
}
# Policy tags for data classification
resource "google_data_catalog_policy_tag" "pii" {
provider = google-beta
taxonomy = google_data_catalog_taxonomy.ml_classifications.id
display_name = "PII"
description = "Personally Identifiable Information"
}
resource "google_data_catalog_policy_tag" "sensitive" {
provider = google-beta
taxonomy = google_data_catalog_taxonomy.ml_classifications.id
parent_policy_tag = google_data_catalog_policy_tag.pii.id
display_name = "Sensitive"
description = "Sensitive personal data"
}
# Apply tags to BigQuery columns
resource "google_bigquery_table" "user_features" {
dataset_id = google_bigquery_dataset.ml_features.dataset_id
table_id = "user_features"
schema = jsonencode([
{
name = "user_id"
type = "STRING"
mode = "REQUIRED"
policyTags = {
names = [google_data_catalog_policy_tag.pii.name]
}
},
{
name = "purchase_count_7d"
type = "INTEGER"
mode = "NULLABLE"
},
{
name = "total_spend_7d"
type = "FLOAT"
mode = "NULLABLE"
}
])
}
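Once policy tags are applied, they can be read back to audit which columns carry sensitive classifications. A sketch using the BigQuery Python client (the table ID is a placeholder):
from google.cloud import bigquery

bq = bigquery.Client()
table = bq.get_table("your-project.ml_features.user_features")  # placeholder table ID

# Print every column that carries a policy tag (e.g., the PII tag defined above)
for field in table.schema:
    if field.policy_tags and field.policy_tags.names:
        print(f"{field.name}: {list(field.policy_tags.names)}")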
9.7.7. Data Classification and PII Tracking
Automated PII Detection
from google.cloud import dlp_v2

project_id = "your-gcp-project"  # placeholder for your GCP project ID

dlp = dlp_v2.DlpServiceClient()
# Configure inspection
inspect_config = dlp_v2.InspectConfig(
info_types=[
dlp_v2.InfoType(name="EMAIL_ADDRESS"),
dlp_v2.InfoType(name="PHONE_NUMBER"),
dlp_v2.InfoType(name="CREDIT_CARD_NUMBER"),
dlp_v2.InfoType(name="US_SOCIAL_SECURITY_NUMBER"),
dlp_v2.InfoType(name="PERSON_NAME"),
dlp_v2.InfoType(name="STREET_ADDRESS"),
],
min_likelihood=dlp_v2.Likelihood.LIKELY,
include_quote=True,
)
# Inspect a BigQuery table
job_config = dlp_v2.InspectJobConfig(
storage_config=dlp_v2.StorageConfig(
big_query_options=dlp_v2.BigQueryOptions(
table_reference=dlp_v2.BigQueryTable(
project_id=project_id,
dataset_id="ml_data",
table_id="user_profiles"
)
)
),
inspect_config=inspect_config,
actions=[
dlp_v2.Action(
save_findings=dlp_v2.Action.SaveFindings(
output_config=dlp_v2.OutputStorageConfig(
table=dlp_v2.BigQueryTable(
project_id=project_id,
dataset_id="dlp_findings",
table_id="pii_scan_results"
)
)
)
),
dlp_v2.Action(
publish_to_stackdriver={}
)
]
)
# Create the inspection job
parent = f"projects/{project_id}/locations/global"
response = dlp.create_dlp_job(
parent=parent,
inspect_job=job_config
)
AWS Macie for PII Discovery
# Enable Macie for S3 data classification
resource "aws_macie2_account" "ml_data" {}
resource "aws_macie2_classification_job" "pii_discovery" {
name = "ml-data-pii-discovery"
job_type = "SCHEDULED"
  schedule_frequency {
    weekly_schedule = "MONDAY"
  }
s3_job_definition {
bucket_definitions {
account_id = data.aws_caller_identity.current.account_id
buckets = [aws_s3_bucket.ml_data.id]
}
scoping {
includes {
and {
simple_scope_term {
comparator = "STARTS_WITH"
key = "OBJECT_KEY"
values = ["raw/", "features/", "training/"]
}
}
}
}
}
custom_data_identifier_ids = [
aws_macie2_custom_data_identifier.customer_id.id
]
sampling_percentage = 100
}
# Custom identifier for internal customer IDs
resource "aws_macie2_custom_data_identifier" "customer_id" {
name = "internal-customer-id"
regex = "CUST-[A-Z0-9]{8}"
description = "Internal customer identifier format"
maximum_match_distance = 50
}
9.7.8. ML Model Lineage
Connecting data lineage to models.
Model Training Lineage
from datetime import datetime

import mlflow
from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Job, Run, Dataset
# Log model lineage
with mlflow.start_run() as run:
# Capture data lineage
data_version = "lakefs://ml-data/main@abc123"
feature_version = "feast://user_features/v1.2"
mlflow.log_param("data_version", data_version)
mlflow.log_param("feature_version", feature_version)
mlflow.log_param("training_date", datetime.now().isoformat())
# Train model
model = train_model(X_train, y_train)
# Log model with lineage tags
mlflow.sklearn.log_model(
model,
"model",
registered_model_name="fraud_detector",
signature=signature,
metadata={
"training_data": data_version,
"feature_store": feature_version,
"columns_used": X_train.columns.tolist(),
}
)
# Emit OpenLineage event connecting data to model
lineage_client = OpenLineageClient.from_environment()
lineage_event = RunEvent(
eventType="COMPLETE",
job=Job(namespace="ml-training", name="fraud-detector-training"),
run=Run(runId=run.info.run_id),
inputs=[
Dataset(
namespace="lakefs://ml-data",
name="training_data",
facets={"version": {"version": "abc123"}}
),
Dataset(
namespace="feast://",
name="user_features",
facets={"version": {"version": "v1.2"}}
)
],
outputs=[
Dataset(
namespace="mlflow://",
name="fraud_detector",
facets={
"version": {"version": run.info.run_id},
"model_type": {"type": "random_forest"}
}
)
],
producer="mlflow"
)
lineage_client.emit(lineage_event)
Model Cards with Lineage
# Generate model card with data lineage
model_card = {
"model_details": {
"name": "fraud_detector",
"version": "1.3.0",
"type": "RandomForestClassifier",
"trained_on": "2024-01-15",
},
"data_lineage": {
"training_data": {
"source": "lakefs://ml-data/main",
"version": "abc123def456",
"rows": 1_500_000,
"columns": 45,
"date_range": "2022-01-01 to 2024-01-01",
},
"features": {
"source": "feast://user_features",
"version": "v1.2",
"feature_count": 25,
"feature_names": ["purchase_count_7d", "total_spend_7d", ...],
},
"labels": {
"source": "s3://labels/fraud_labels",
"labeling_method": "Manual review + production feedback",
"fraud_rate": "2.3%",
}
},
"evaluation": {
"test_set_version": "abc123def456",
"metrics": {
"auc_roc": 0.94,
"precision": 0.78,
"recall": 0.82,
}
},
"governance": {
"owner": "fraud-detection-team",
"approved_by": "model-risk-committee",
"approval_date": "2024-01-20",
"next_review": "2024-04-20",
}
}
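The model card is most useful when it is stored next to the model rather than in a wiki. A sketch that attaches it to the MLflow run from 9.7.8 as an artifact (assumes it runs in the same process, where run.info.run_id is available):
import json
import mlflow

# Write the card to disk; default=str keeps any non-JSON values serializable
with open("model_card.json", "w") as f:
    json.dump(model_card, f, indent=2, default=str)

# Attach it to the training run so lineage, metrics, and governance travel together
with mlflow.start_run(run_id=run.info.run_id):
    mlflow.log_artifact("model_card.json", artifact_path="governance")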
9.7.9. Governance Automation
Schema Change Alerts
# Monitor for schema drift
def check_schema_changes(table_name: str, current_schema: dict) -> list:
"""Compare current schema to catalog and alert on changes."""
catalog_schema = get_catalog_schema(table_name)
alerts = []
current_cols = set(current_schema.keys())
catalog_cols = set(catalog_schema.keys())
# New columns
new_cols = current_cols - catalog_cols
if new_cols:
alerts.append({
"type": "SCHEMA_ADDITION",
"table": table_name,
"columns": list(new_cols),
"severity": "INFO"
})
# Removed columns
removed_cols = catalog_cols - current_cols
if removed_cols:
alerts.append({
"type": "SCHEMA_REMOVAL",
"table": table_name,
"columns": list(removed_cols),
"severity": "WARNING"
})
# Type changes
for col in current_cols & catalog_cols:
if current_schema[col]["type"] != catalog_schema[col]["type"]:
alerts.append({
"type": "TYPE_CHANGE",
"table": table_name,
"column": col,
"from": catalog_schema[col]["type"],
"to": current_schema[col]["type"],
"severity": "ERROR"
})
return alerts
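A sketch of how this check might be wired into a pipeline: fail hard on breaking changes, and route everything else to the table's owners (notify_data_owners is a hypothetical alerting hook):
# Illustrative schema pulled from the incoming batch
current_schema = {
    "user_id": {"type": "STRING"},
    "purchase_count_7d": {"type": "INTEGER"},
    "total_spend_7d": {"type": "STRING"},  # drifted from DOUBLE -> ERROR alert
}

alerts = check_schema_changes("user_features", current_schema)
for alert in alerts:
    notify_data_owners(alert)  # hypothetical alerting hook (Slack, PagerDuty, ...)

if any(alert["severity"] == "ERROR" for alert in alerts):
    raise RuntimeError("Breaking schema change detected; halting the pipeline")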
Impact Analysis
def analyze_impact(dataset_name: str) -> dict:
"""Analyze downstream impact of changes to a dataset."""
# Query lineage graph
lineage = get_lineage_graph(dataset_name)
downstream = []
for edge in lineage["edges"]:
if edge["source"] == dataset_name:
downstream.append({
"type": edge["target_type"],
"name": edge["target_name"],
"owner": get_owner(edge["target_name"]),
})
# Categorize by type
impacted_tables = [d for d in downstream if d["type"] == "table"]
impacted_models = [d for d in downstream if d["type"] == "model"]
impacted_dashboards = [d for d in downstream if d["type"] == "dashboard"]
# Generate impact report
return {
"dataset": dataset_name,
"total_downstream": len(downstream),
"impacted_tables": impacted_tables,
"impacted_models": impacted_models,
"impacted_dashboards": impacted_dashboards,
"owners_to_notify": list(set(d["owner"] for d in downstream)),
}
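A sketch of using the impact report as a change-management gate before modifying a dataset (require_approval is a hypothetical hook into your change process):
impact = analyze_impact("user_features")

if impact["impacted_models"]:
    print(
        f"Change affects {len(impact['impacted_models'])} model(s); "
        f"notify: {', '.join(impact['owners_to_notify'])}"
    )
    # require_approval(impact)  # hypothetical change-management hook
else:
    print("No downstream models affected; safe to proceed")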
9.7.10. Key Takeaways
- Lineage is mandatory for compliance: GDPR, EU AI Act, financial regulations require it.
- Use OpenLineage for interoperability: Standard format, works across tools.
- Marquez or cloud-native for storage: Both work; choose based on cloud strategy.
- Track column-level lineage: Table-level isn’t enough for debugging.
- Classify data automatically: Use DLP/Macie to find PII.
- Connect data lineage to models: ML lineage requires both.
- Automate governance: Schema alerts, impact analysis, compliance checks.
- Model cards complete the picture: Document lineage for every model.
9.7.11. Chapter 9 Summary
| Section | Key Content |
|---|---|
| 9.1 Lambda & Kappa | Batch/streaming unification architectures |
| 9.2 Cloud Storage | S3, GCS, FSx, Filestore optimization |
| 9.3 Processing Engines | Glue, EMR, Dataflow, Dataproc |
| 9.4 Synthetic Data | GANs, simulation for data augmentation |
| 9.5 Data Quality | Great Expectations, drift detection |
| 9.6 Data Versioning | lakeFS, Delta Lake, DVC |
| 9.7 Lineage & Governance | OpenLineage, PII, compliance |
The Data Pipeline Formula:
Reliable ML =
Robust Ingestion +
Quality Validation +
Versioning +
Lineage +
Governance
End of Chapter 9: Advanced Data Pipeline Architecture
Continue to Chapter 10: LabelOps (The Human-in-the-Loop)