Chapter 9.7: Data Lineage & Governance

“You can’t govern what you can’t see. And you can’t trust what you can’t trace.” — KPMG Data & Analytics Report, 2023

Data lineage and governance are foundational for regulatory compliance, impact analysis, and building trustworthy ML systems. This chapter covers comprehensive strategies for tracking data provenance across the ML lifecycle.


9.7.1. The Governance Imperative

Why Governance Matters Now

Driver       | Impact on ML
-------------|-------------------------------------------------------------
Regulations  | EU AI Act, GDPR, CCPA, HIPAA require explainability
Model Risk   | Regulators want to trace predictions to source data
Audits       | “Show me where this model’s training data came from”
Debugging    | “Why did the model make that prediction?”
Trust        | Stakeholders need to verify data sources

The Cost of Ungoverned ML

Issue                    | Real-World Impact
-------------------------|------------------------------------------------------------
No lineage               | Bank fined $400M for inability to explain credit decisions
Unknown data sources     | Healthcare model trained on biased subset, recalled
Stale metadata           | Insurance pricing model used deprecated field, $50M loss
Missing consent tracking | GDPR violation, €20M fine

9.7.2. Data Lineage Fundamentals

What Lineage Tracks

┌─────────────────────────────────────────────────────────────────────┐
│                      DATA LINEAGE COMPONENTS                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │
│  │   SOURCE     │───▶│ TRANSFORM    │───▶│   TARGET     │          │
│  │              │    │              │    │              │          │
│  │  (Origin)    │    │ (Processing) │    │ (Destination)│          │
│  └──────────────┘    └──────────────┘    └──────────────┘          │
│                                                                     │
│  METADATA CAPTURED:                                                 │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │ • Schema (columns, types)                                    │   │
│  │ • Ownership (team, individual)                               │   │
│  │ • Freshness (last updated)                                   │   │
│  │ • Quality metrics                                            │   │
│  │ • Classification (PII, sensitive)                            │   │
│  │ • Transformations applied                                    │   │
│  │ • Consumers (who uses this data)                             │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Types of Lineage

Type           | Description                            | Use Case
---------------|----------------------------------------|----------------------
Table-level    | Relationships between tables/datasets  | Impact analysis
Column-level   | Field-to-field mappings                | Detailed debugging
Transformation | Logic applied to data                  | Audit compliance
Operational    | Runtime execution details              | Performance analysis
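
Column-level lineage is ultimately a mapping from each output field to the input fields and logic that produced it. A minimal sketch of one such record in Python (the ColumnLineage dataclass and its field names are illustrative, not part of any standard):

from dataclasses import dataclass

@dataclass
class ColumnLineage:
    """One output column traced back to its inputs and the logic applied."""
    output_column: str        # fully qualified as table.column
    input_columns: list       # upstream columns this field is derived from
    transformation: str       # human-readable or SQL expression

# Example: how purchase_count_7d in user_features is derived
purchase_count_lineage = ColumnLineage(
    output_column="user_features.purchase_count_7d",
    input_columns=["user_events.user_id", "user_events.event_type", "user_events.timestamp"],
    transformation="COUNT(*) WHERE event_type = 'purchase' AND timestamp >= NOW() - INTERVAL 7 DAY",
)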

9.7.3. OpenLineage: The Industry Standard

OpenLineage is an open standard for collecting and exchanging lineage metadata. Producers such as schedulers, processing engines, and ML pipelines emit lineage events in a common format, so any compatible backend can store and query them.

OpenLineage Event Structure

{
  "eventType": "COMPLETE",
  "eventTime": "2024-01-15T10:30:00.000Z",
  "run": {
    "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
  },
  "job": {
    "namespace": "ml-training",
    "name": "user-feature-pipeline"
  },
  "inputs": [
    {
      "namespace": "s3://raw-data",
      "name": "user_events",
      "facets": {
        "schema": {
          "fields": [
            {"name": "user_id", "type": "STRING"},
            {"name": "event_type", "type": "STRING"},
            {"name": "timestamp", "type": "TIMESTAMP"},
            {"name": "amount", "type": "DOUBLE"}
          ]
        },
        "dataSource": {
          "name": "production-kafka",
          "uri": "kafka://prod-cluster/user-events"
        }
      }
    }
  ],
  "outputs": [
    {
      "namespace": "s3://feature-store",
      "name": "user_features",
      "facets": {
        "schema": {
          "fields": [
            {"name": "user_id", "type": "STRING"},
            {"name": "purchase_count_7d", "type": "INTEGER"},
            {"name": "total_spend_7d", "type": "DOUBLE"},
            {"name": "avg_session_duration", "type": "DOUBLE"}
          ]
        },
        "dataQuality": {
          "rowCount": 1500000,
          "nullCount": {"purchase_count_7d": 0, "total_spend_7d": 1523}
        }
      }
    }
  ],
  "producer": "airflow-scheduler"
}

Implementing OpenLineage

from openlineage.client import OpenLineageClient
from openlineage.client.facet import (
    SchemaDatasetFacet,
    SchemaField,
    DataQualityMetricsInputDatasetFacet,
    ColumnMetric,
    SqlJobFacet,
)
from openlineage.client.run import (
    RunEvent, 
    RunState, 
    Job, 
    Run, 
    Dataset,
    InputDataset,
    OutputDataset,
)
import uuid
from datetime import datetime

# Initialize client
client = OpenLineageClient.from_environment()

# Create unique run ID
run_id = str(uuid.uuid4())

# Define job
job = Job(
    namespace="ml-pipelines",
    name="feature-engineering"
)

# Define input dataset with facets
input_schema = SchemaDatasetFacet(
    fields=[
        SchemaField(name="user_id", type="STRING"),
        SchemaField(name="event_type", type="STRING"),
        SchemaField(name="timestamp", type="TIMESTAMP"),
        SchemaField(name="amount", type="DOUBLE"),
    ]
)

input_dataset = InputDataset(
    namespace="s3://raw-data",
    name="user_events",
    facets={"schema": input_schema}
)

# Emit START event
start_event = RunEvent(
    eventType=RunState.START,
    eventTime=datetime.utcnow().isoformat() + "Z",
    run=Run(runId=run_id),
    job=job,
    inputs=[input_dataset],
    outputs=[],
    producer="feature-pipeline"
)
client.emit(start_event)

# ... Run your pipeline ...

# Define output dataset with quality metrics
output_schema = SchemaDatasetFacet(
    fields=[
        SchemaField(name="user_id", type="STRING"),
        SchemaField(name="purchase_count_7d", type="INTEGER"),
        SchemaField(name="total_spend_7d", type="DOUBLE"),
    ]
)

quality_facet = DataQualityMetricsInputDatasetFacet(
    rowCount=1500000,
    columnMetrics={
        "user_id": ColumnMetric(nullCount=0, distinctCount=1200000),
        "purchase_count_7d": ColumnMetric(nullCount=0, min=0, max=127),
        "total_spend_7d": ColumnMetric(nullCount=1523, min=0.0, max=50000.0),
    }
)

output_dataset = OutputDataset(
    namespace="s3://feature-store",
    name="user_features",
    facets={
        "schema": output_schema,
        "dataQuality": quality_facet,
    }
)

# Emit COMPLETE event
complete_event = RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.utcnow().isoformat() + "Z",
    run=Run(runId=run_id),
    job=job,
    inputs=[input_dataset],
    outputs=[output_dataset],
    producer="feature-pipeline"
)
client.emit(complete_event)
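
Pipelines fail, and failed runs are exactly the ones audits ask about. Continuing the example above, a minimal sketch of emitting a FAIL event when the pipeline body raises (run_pipeline() is a placeholder for your own logic):

try:
    run_pipeline()  # placeholder for the actual feature-engineering step
except Exception:
    fail_event = RunEvent(
        eventType=RunState.FAIL,
        eventTime=datetime.utcnow().isoformat() + "Z",
        run=Run(runId=run_id),
        job=job,
        inputs=[input_dataset],
        outputs=[],
        producer="feature-pipeline"
    )
    client.emit(fail_event)
    raise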

9.7.4. Marquez: OpenLineage Backend

Marquez is the reference OpenLineage backend for storing and querying lineage.

Deploying Marquez

# docker-compose.yml
version: "3"
services:
  marquez:
    image: marquezproject/marquez:0.41.0
    ports:
      - "5000:5000"
      - "5001:5001"
    environment:
      - MARQUEZ_CONFIG=/opt/marquez/marquez.yml
    volumes:
      - ./marquez.yml:/opt/marquez/marquez.yml
    depends_on:
      - db

  marquez-web:
    image: marquezproject/marquez-web:0.41.0
    ports:
      - "3000:3000"
    environment:
      - MARQUEZ_HOST=marquez
      - MARQUEZ_PORT=5000

  db:
    image: postgres:14
    environment:
      - POSTGRES_USER=marquez
      - POSTGRES_PASSWORD=marquez
      - POSTGRES_DB=marquez
    volumes:
      - marquez-data:/var/lib/postgresql/data

volumes:
  marquez-data:

# marquez.yml
server:
  applicationConnectors:
    - type: http
      port: 5000
  adminConnectors:
    - type: http
      port: 5001

db:
  driverClass: org.postgresql.Driver
  url: jdbc:postgresql://db:5432/marquez
  user: marquez
  password: marquez

migrateOnStartup: true

Querying Lineage

import requests

MARQUEZ_URL = "http://localhost:5000/api/v1"

# Get all namespaces
namespaces = requests.get(f"{MARQUEZ_URL}/namespaces").json()

# Get datasets in a namespace
datasets = requests.get(
    f"{MARQUEZ_URL}/namespaces/ml-pipelines/datasets"
).json()

# Get lineage for a specific dataset
lineage = requests.get(
    f"{MARQUEZ_URL}/lineage",
    params={
        "nodeId": "dataset:s3://feature-store:user_features",
        "depth": 5
    }
).json()

# Visualize upstream dependencies
for node in lineage["graph"]:
    if node["type"] == "DATASET":
        print(f"Dataset: {node['data']['name']}")
    elif node["type"] == "JOB":
        print(f"  ← Job: {node['data']['name']}")

9.7.5. AWS Glue Data Catalog Lineage

AWS Glue provides native lineage tracking for ETL jobs.

Enabling Lineage in Glue

# Terraform: Glue job with lineage
resource "aws_glue_job" "feature_pipeline" {
  name     = "feature-engineering"
  role_arn = aws_iam_role.glue_role.arn

  command {
    script_location = "s3://scripts/feature_pipeline.py"
    python_version  = "3"
  }

  default_arguments = {
    "--enable-glue-datacatalog"         = "true"
    "--enable-continuous-cloudwatch-log" = "true"
    "--enable-metrics"                   = "true"
    
    # Enable lineage tracking
    "--enable-job-insights"             = "true"
  }

  glue_version = "4.0"
}

Glue Data Catalog Integration

# Glue ETL with catalog lineage
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read from catalog (lineage auto-tracked)
source = glueContext.create_dynamic_frame.from_catalog(
    database="ml_database",
    table_name="user_events",
    transformation_ctx="source"  # Important for lineage!
)

# Transform
transformed = source.apply_mapping([
    ("user_id", "string", "user_id", "string"),
    ("event_type", "string", "event_type", "string"),
    ("amount", "double", "amount", "double"),
])

# Aggregate
aggregated = transformed.toDF() \
    .groupBy("user_id") \
    .agg(
        F.count("*").alias("event_count"),
        F.sum("amount").alias("total_amount")
    )

# Write to catalog (lineage auto-tracked)
output = DynamicFrame.fromDF(aggregated, glueContext, "output")
glueContext.write_dynamic_frame.from_catalog(
    frame=output,
    database="ml_database",
    table_name="user_features",
    transformation_ctx="output"  # Important for lineage!
)

job.commit()

Querying Glue Lineage

import boto3

glue = boto3.client('glue')

# Get column-level lineage
response = glue.get_mapping(
    Source={
        'DatabaseName': 'ml_database',
        'TableName': 'user_events'
    },
    Sinks=[{
        'DatabaseName': 'ml_database',
        'TableName': 'user_features'
    }]
)

# Each mapping entry pairs a source field path with a target field path
for mapping in response['Mapping']:
    print(f"{mapping['SourcePath']} → {mapping['TargetPath']}")

9.7.6. GCP Dataplex Lineage

Google Cloud Dataplex provides integrated lineage through Data Catalog.

Dataplex Lineage API

import time

from google.cloud import datacatalog_lineage_v1

# project and location are assumed to be defined elsewhere, e.g.
# project, location = "my-project", "us-central1"
client = datacatalog_lineage_v1.LineageClient()

# Create a process (represents a transformation)
process = datacatalog_lineage_v1.Process(
    name=f"projects/{project}/locations/{location}/processes/feature-pipeline",
    display_name="Feature Engineering Pipeline",
    attributes={
        "author": datacatalog_lineage_v1.AttributeValue(value_string="ml-team"),
        "pipeline_version": datacatalog_lineage_v1.AttributeValue(value_string="1.2.3"),
    }
)

created_process = client.create_process(
    parent=f"projects/{project}/locations/{location}",
    process=process
)

# Create a run
run = datacatalog_lineage_v1.Run(
    display_name="Daily Run 2024-01-15",
    state=datacatalog_lineage_v1.Run.State.STARTED,
    start_time={"seconds": int(time.time())},
)

created_run = client.create_run(
    parent=created_process.name,
    run=run
)

# Create lineage events
lineage_event = datacatalog_lineage_v1.LineageEvent(
    start_time={"seconds": int(time.time())},
    links=[
        datacatalog_lineage_v1.EventLink(
            source=datacatalog_lineage_v1.EntityReference(
                fully_qualified_name="bigquery:project.dataset.user_events"
            ),
            target=datacatalog_lineage_v1.EntityReference(
                fully_qualified_name="bigquery:project.dataset.user_features"
            ),
        )
    ]
)

client.create_lineage_event(
    parent=created_run.name,
    lineage_event=lineage_event
)

# Complete the run
client.update_run(
    run=datacatalog_lineage_v1.Run(
        name=created_run.name,
        state=datacatalog_lineage_v1.Run.State.COMPLETED,
        end_time={"seconds": int(time.time())},
    ),
    update_mask={"paths": ["state", "end_time"]}
)
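
Once events are recorded, the same API can answer impact questions. A hedged sketch using search_links to list upstream links ending at the user_features table (reusing the project and location placeholders from above):

request = datacatalog_lineage_v1.SearchLinksRequest(
    parent=f"projects/{project}/locations/{location}",
    target=datacatalog_lineage_v1.EntityReference(
        fully_qualified_name="bigquery:project.dataset.user_features"
    ),
)

# Each link is a source → target edge recorded by a lineage event
for link in client.search_links(request=request):
    print(f"{link.source.fully_qualified_name} → {link.target.fully_qualified_name}")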

Terraform: Data Classification with Policy Tags

# Data Catalog taxonomy for classification
resource "google_data_catalog_taxonomy" "ml_classifications" {
  provider = google-beta
  region   = var.region
  
  display_name = "ML Data Classifications"
  description  = "Classification taxonomy for ML data governance"
  
  activated_policy_types = ["FINE_GRAINED_ACCESS_CONTROL"]
}

# Policy tags for data classification
resource "google_data_catalog_policy_tag" "pii" {
  provider = google-beta
  taxonomy = google_data_catalog_taxonomy.ml_classifications.id
  
  display_name = "PII"
  description  = "Personally Identifiable Information"
}

resource "google_data_catalog_policy_tag" "sensitive" {
  provider     = google-beta
  taxonomy     = google_data_catalog_taxonomy.ml_classifications.id
  parent_policy_tag = google_data_catalog_policy_tag.pii.id
  
  display_name = "Sensitive"
  description  = "Sensitive personal data"
}

# Apply tags to BigQuery columns
resource "google_bigquery_table" "user_features" {
  dataset_id = google_bigquery_dataset.ml_features.dataset_id
  table_id   = "user_features"
  
  schema = jsonencode([
    {
      name        = "user_id"
      type        = "STRING"
      mode        = "REQUIRED"
      policyTags  = {
        names = [google_data_catalog_policy_tag.pii.name]
      }
    },
    {
      name = "purchase_count_7d"
      type = "INTEGER"
      mode = "NULLABLE"
    },
    {
      name = "total_spend_7d"
      type = "FLOAT"
      mode = "NULLABLE"
    }
  ])
}

9.7.7. Data Classification and PII Tracking

Automated PII Detection

from google.cloud import dlp_v2

dlp = dlp_v2.DlpServiceClient()

# Configure inspection
inspect_config = dlp_v2.InspectConfig(
    info_types=[
        dlp_v2.InfoType(name="EMAIL_ADDRESS"),
        dlp_v2.InfoType(name="PHONE_NUMBER"),
        dlp_v2.InfoType(name="CREDIT_CARD_NUMBER"),
        dlp_v2.InfoType(name="US_SOCIAL_SECURITY_NUMBER"),
        dlp_v2.InfoType(name="PERSON_NAME"),
        dlp_v2.InfoType(name="STREET_ADDRESS"),
    ],
    min_likelihood=dlp_v2.Likelihood.LIKELY,
    include_quote=True,
)

# Inspect a BigQuery table
job_config = dlp_v2.InspectJobConfig(
    storage_config=dlp_v2.StorageConfig(
        big_query_options=dlp_v2.BigQueryOptions(
            table_reference=dlp_v2.BigQueryTable(
                project_id=project_id,
                dataset_id="ml_data",
                table_id="user_profiles"
            )
        )
    ),
    inspect_config=inspect_config,
    actions=[
        dlp_v2.Action(
            save_findings=dlp_v2.Action.SaveFindings(
                output_config=dlp_v2.OutputStorageConfig(
                    table=dlp_v2.BigQueryTable(
                        project_id=project_id,
                        dataset_id="dlp_findings",
                        table_id="pii_scan_results"
                    )
                )
            )
        ),
        dlp_v2.Action(
            publish_to_stackdriver={}
        )
    ]
)

# Create the inspection job
parent = f"projects/{project_id}/locations/global"
response = dlp.create_dlp_job(
    parent=parent,
    inspect_job=job_config
)
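
create_dlp_job returns immediately; the scan runs asynchronously. A minimal polling sketch (the 30-second interval is arbitrary):

import time

# Poll until the inspection job reaches a terminal state
while True:
    job = dlp.get_dlp_job(name=response.name)
    if job.state in (
        dlp_v2.DlpJob.JobState.DONE,
        dlp_v2.DlpJob.JobState.FAILED,
        dlp_v2.DlpJob.JobState.CANCELED,
    ):
        break
    time.sleep(30)

print(f"DLP scan finished with state: {job.state.name}")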

AWS Macie for PII Discovery

# Enable Macie for S3 data classification
resource "aws_macie2_account" "ml_data" {}

resource "aws_macie2_classification_job" "pii_discovery" {
  name     = "ml-data-pii-discovery"
  job_type = "SCHEDULED"

  schedule_frequency {
    weekly_schedule = "MONDAY"
  }
  
  s3_job_definition {
    bucket_definitions {
      account_id = data.aws_caller_identity.current.account_id
      buckets    = [aws_s3_bucket.ml_data.id]
    }
    
    scoping {
      includes {
        and {
          simple_scope_term {
            comparator       = "STARTS_WITH"
            key             = "OBJECT_KEY"
            values          = ["raw/", "features/", "training/"]
          }
        }
      }
    }
  }
  
  custom_data_identifier_ids = [
    aws_macie2_custom_data_identifier.customer_id.id
  ]
  
  sampling_percentage = 100
}

# Custom identifier for internal customer IDs
resource "aws_macie2_custom_data_identifier" "customer_id" {
  name                   = "internal-customer-id"
  regex                  = "CUST-[A-Z0-9]{8}"
  description            = "Internal customer identifier format"
  maximum_match_distance = 50
}
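
Macie publishes its results as findings. A hedged sketch of pulling recent sensitive-data findings with boto3 so they can feed the catalog or an alerting workflow (the Macie2 API uses lowerCamelCase parameter and field names; the filter criteria shown are an assumption about what you want to surface):

import boto3

macie = boto3.client("macie2")

# List IDs of sensitive-data (classification) findings, newest first
finding_ids = macie.list_findings(
    findingCriteria={"criterion": {"category": {"eq": ["CLASSIFICATION"]}}},
    sortCriteria={"attributeName": "updatedAt", "orderBy": "DESC"},
)["findingIds"]

if finding_ids:
    findings = macie.get_findings(findingIds=finding_ids[:10])["findings"]
    for finding in findings:
        bucket = finding["resourcesAffected"]["s3Bucket"]["name"]
        key = finding["resourcesAffected"]["s3Object"]["key"]
        print(f"{finding['type']}: s3://{bucket}/{key}")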

9.7.8. ML Model Lineage

Data lineage alone stops at the feature store. Model lineage extends the graph so that every trained model points back to the exact data and feature versions used to produce it.

Model Training Lineage

import mlflow
from datetime import datetime
from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Job, Run, Dataset

# Log model lineage
with mlflow.start_run() as run:
    # Capture data lineage
    data_version = "lakefs://ml-data/main@abc123"
    feature_version = "feast://user_features/v1.2"
    
    mlflow.log_param("data_version", data_version)
    mlflow.log_param("feature_version", feature_version)
    mlflow.log_param("training_date", datetime.now().isoformat())
    
    # Train model
    model = train_model(X_train, y_train)
    
    # Log model with lineage tags
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="fraud_detector",
        signature=signature,
        metadata={
            "training_data": data_version,
            "feature_store": feature_version,
            "columns_used": X_train.columns.tolist(),
        }
    )

    # Emit OpenLineage event connecting data to model
    lineage_client = OpenLineageClient.from_environment()
    
    lineage_event = RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.utcnow().isoformat() + "Z",
        job=Job(namespace="ml-training", name="fraud-detector-training"),
        run=Run(runId=run.info.run_id),
        inputs=[
            Dataset(
                namespace="lakefs://ml-data",
                name="training_data",
                facets={"version": {"version": "abc123"}}
            ),
            Dataset(
                namespace="feast://",
                name="user_features",
                facets={"version": {"version": "v1.2"}}
            )
        ],
        outputs=[
            Dataset(
                namespace="mlflow://",
                name="fraud_detector",
                facets={
                    "version": {"version": run.info.run_id},
                    "model_type": {"type": "random_forest"}
                }
            )
        ],
        producer="mlflow"
    )
    lineage_client.emit(lineage_event)
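
With the parameters and lineage events in place, an auditor can walk back from a deployed model to its exact data versions. A minimal sketch using the MLflow tracking client (run_id here would come from the registered model version under review):

from mlflow.tracking import MlflowClient

mlflow_client = MlflowClient()
training_run = mlflow_client.get_run(run_id)  # run_id of the training run under audit

print("Training data version:", training_run.data.params["data_version"])
print("Feature version:", training_run.data.params["feature_version"])
print("Trained on:", training_run.data.params["training_date"])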

Model Cards with Lineage

# Generate model card with data lineage
model_card = {
    "model_details": {
        "name": "fraud_detector",
        "version": "1.3.0",
        "type": "RandomForestClassifier",
        "trained_on": "2024-01-15",
    },
    "data_lineage": {
        "training_data": {
            "source": "lakefs://ml-data/main",
            "version": "abc123def456",
            "rows": 1_500_000,
            "columns": 45,
            "date_range": "2022-01-01 to 2024-01-01",
        },
        "features": {
            "source": "feast://user_features",
            "version": "v1.2",
            "feature_count": 25,
            "feature_names": ["purchase_count_7d", "total_spend_7d", ...],
        },
        "labels": {
            "source": "s3://labels/fraud_labels",
            "labeling_method": "Manual review + production feedback",
            "fraud_rate": "2.3%",
        }
    },
    "evaluation": {
        "test_set_version": "abc123def456",
        "metrics": {
            "auc_roc": 0.94,
            "precision": 0.78,
            "recall": 0.82,
        }
    },
    "governance": {
        "owner": "fraud-detection-team",
        "approved_by": "model-risk-committee",
        "approval_date": "2024-01-20",
        "next_review": "2024-04-20",
    }
}
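
The card is most useful when it lives next to the model it describes. One option, sketched here, is attaching it to the training run as an MLflow artifact (run_id refers to the training run from the previous example):

import mlflow

# Attach the model card to the run that produced the model
with mlflow.start_run(run_id=run_id):
    mlflow.log_dict(model_card, "model_card.json")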

9.7.9. Governance Automation

Schema Change Alerts

# Monitor for schema drift
def check_schema_changes(table_name: str, current_schema: dict) -> list:
    """Compare current schema to catalog and alert on changes."""
    
    catalog_schema = get_catalog_schema(table_name)
    alerts = []
    
    current_cols = set(current_schema.keys())
    catalog_cols = set(catalog_schema.keys())
    
    # New columns
    new_cols = current_cols - catalog_cols
    if new_cols:
        alerts.append({
            "type": "SCHEMA_ADDITION",
            "table": table_name,
            "columns": list(new_cols),
            "severity": "INFO"
        })
    
    # Removed columns
    removed_cols = catalog_cols - current_cols
    if removed_cols:
        alerts.append({
            "type": "SCHEMA_REMOVAL",
            "table": table_name,
            "columns": list(removed_cols),
            "severity": "WARNING"
        })
    
    # Type changes
    for col in current_cols & catalog_cols:
        if current_schema[col]["type"] != catalog_schema[col]["type"]:
            alerts.append({
                "type": "TYPE_CHANGE",
                "table": table_name,
                "column": col,
                "from": catalog_schema[col]["type"],
                "to": current_schema[col]["type"],
                "severity": "ERROR"
            })
    
    return alerts
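
The check above assumes a get_catalog_schema helper. A possible implementation against the AWS Glue Data Catalog, shown as a sketch (the ml_database default is an assumption; any catalog with a schema API works the same way):

import boto3

def get_catalog_schema(table_name: str, database: str = "ml_database") -> dict:
    """Fetch the registered schema for a table from the Glue Data Catalog."""
    glue = boto3.client("glue")
    table = glue.get_table(DatabaseName=database, Name=table_name)["Table"]
    return {
        col["Name"]: {"type": col["Type"]}
        for col in table["StorageDescriptor"]["Columns"]
    }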

Impact Analysis

def analyze_impact(dataset_name: str) -> dict:
    """Analyze downstream impact of changes to a dataset."""
    
    # Query lineage graph
    lineage = get_lineage_graph(dataset_name)
    
    downstream = []
    for edge in lineage["edges"]:
        if edge["source"] == dataset_name:
            downstream.append({
                "type": edge["target_type"],
                "name": edge["target_name"],
                "owner": get_owner(edge["target_name"]),
            })
    
    # Categorize by type
    impacted_tables = [d for d in downstream if d["type"] == "table"]
    impacted_models = [d for d in downstream if d["type"] == "model"]
    impacted_dashboards = [d for d in downstream if d["type"] == "dashboard"]
    
    # Generate impact report
    return {
        "dataset": dataset_name,
        "total_downstream": len(downstream),
        "impacted_tables": impacted_tables,
        "impacted_models": impacted_models,
        "impacted_dashboards": impacted_dashboards,
        "owners_to_notify": list(set(d["owner"] for d in downstream)),
    }
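
A typical use is gating schema changes: run the analysis before a migration ships and notify every downstream owner. A small usage sketch (notify() is a placeholder for your alerting integration):

report = analyze_impact("user_events")

if report["impacted_models"]:
    print(f"{len(report['impacted_models'])} model(s) depend on user_events:")
    for model in report["impacted_models"]:
        print(f"  - {model['name']} (owner: {model['owner']})")

# Alert affected owners before the change ships
for owner in report["owners_to_notify"]:
    notify(owner, "Planned schema change to user_events affects assets you own")  # notify() is a placeholder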

9.7.10. Key Takeaways

  1. Lineage is mandatory for compliance: GDPR, EU AI Act, financial regulations require it.

  2. Use OpenLineage for interoperability: Standard format, works across tools.

  3. Marquez or cloud-native for storage: Both work; choose based on cloud strategy.

  4. Track column-level lineage: Table-level isn’t enough for debugging.

  5. Classify data automatically: Use DLP/Macie to find PII.

  6. Connect data lineage to models: ML lineage requires both.

  7. Automate governance: Schema alerts, impact analysis, compliance checks.

  8. Model cards complete the picture: Document lineage for every model.


9.7.11. Chapter 9 Summary

Section                  | Key Content
-------------------------|-------------------------------------------
9.1 Lambda & Kappa       | Batch/streaming unification architectures
9.2 Cloud Storage        | S3, GCS, FSx, Filestore optimization
9.3 Processing Engines   | Glue, EMR, Dataflow, Dataproc
9.4 Synthetic Data       | GANs, simulation for data augmentation
9.5 Data Quality         | Great Expectations, drift detection
9.6 Data Versioning      | lakeFS, Delta Lake, DVC
9.7 Lineage & Governance | OpenLineage, PII, compliance

The Data Pipeline Formula:

Reliable ML = 
    Robust Ingestion + 
    Quality Validation + 
    Versioning + 
    Lineage + 
    Governance

End of Chapter 9: Advanced Data Pipeline Architecture

Continue to Chapter 10: LabelOps (The Human-in-the-Loop)