Chapter 9.6: Advanced Data Versioning

“Without versioning, you can’t reproduce results. Without reproducibility, you don’t have science—you have anecdotes.” — Pete Warden, Former Google Staff Engineer

Data versioning is the foundation of ML reproducibility. This chapter takes a deep dive into lakeFS, Delta Lake, and DVC, and the versioning strategies that enable time travel, rollback, and experiment tracking for data.


9.6.1. Why Data Versioning Matters for ML

The Reproducibility Crisis

Problem | Without Versioning | With Versioning
------- | ------------------ | ---------------
“What data trained this model?” | Unknown | Exact commit hash
“Can we reproduce last month’s results?” | No | Yes, check out that data version
“Something broke—what changed?” | Manual investigation | Diff between versions
“Can we roll back bad data?” | Restore from backup (hours) | Instant rollback

Data Versioning vs. Code Versioning

Aspect | Code (Git) | Data
------ | ---------- | ----
Size | MBs | TBs-PBs
Change frequency | Commits | Continuous streams
Diff granularity | Line-by-line | Row/column/partition
Storage model | Full copies | Copy-on-write/delta
Branching | Cheap | Must be efficient
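
The storage-model row is the crux: at TB-PB scale, no tool can afford a full copy per version, so each object is stored once under a content address and versions simply share unchanged objects. A toy illustration of the idea in Python (this is a conceptual sketch, not any specific tool's on-disk format):

import hashlib
from pathlib import Path

def content_address(path: str) -> str:
    """Hash a file's bytes; identical content always maps to the same address."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

# A "version" is then just a manifest mapping logical paths to content addresses;
# two versions that share most files share most of their storage.
version_manifest = {
    str(p): content_address(p) for p in Path("data/training").glob("*.parquet")
}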

9.6.2. lakeFS: Git for Data

lakeFS provides Git-like operations (branch, commit, merge) for data lakes.

Core Concepts

Concept | lakeFS Implementation
------- | ---------------------
Repository | A bucket or prefix in object storage
Branch | Pointer to a commit, mutable
Commit | Immutable snapshot of data
Object | Individual file in the lake
Merge | Combine branches (three-way merge)

Architecture Overview

┌─────────────────────────────────────────────────────────────────────┐
│                         lakeFS Architecture                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌────────────────┐                                                │
│   │    Clients     │                                                │
│   │ (S3/GCS API)   │                                                │
│   └───────┬────────┘                                                │
│           │                                                         │
│           ▼                                                         │
│   ┌────────────────┐   ┌────────────────┐                          │
│   │    lakeFS      │◄──│   Metadata     │                          │
│   │    Gateway     │   │   Store        │                          │
│   │                │   │(PostgreSQL/    │                          │
│   │ (S3 Protocol)  │   │ DynamoDB)      │                          │
│   └───────┬────────┘   └────────────────┘                          │
│           │                                                         │
│           ▼                                                         │
│   ┌────────────────────────────────────────────────────────┐       │
│   │              Object Storage (S3/GCS/Azure)              │       │
│   │   ┌──────────┐  ┌──────────┐  ┌──────────┐            │       │
│   │   │  Branch  │  │  Branch  │  │  Branch  │  ...       │       │
│   │   │  main    │  │  develop │  │  feature │            │       │
│   │   └──────────┘  └──────────┘  └──────────┘            │       │
│   └────────────────────────────────────────────────────────┘       │
└─────────────────────────────────────────────────────────────────────┘

Installing lakeFS

# docker-compose.yml for lakeFS
version: "3"
services:
  lakefs:
    image: treeverse/lakefs:0.110.0
    ports:
      - "8000:8000"
    environment:
      - LAKEFS_DATABASE_TYPE=local
      - LAKEFS_BLOCKSTORE_TYPE=s3
      - LAKEFS_BLOCKSTORE_S3_REGION=us-east-1
      - LAKEFS_AUTH_ENCRYPT_SECRET_KEY=${LAKEFS_SECRET}
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
    volumes:
      - ./lakefs-data:/var/lib/lakefs
    command: run --db.local.path /var/lib/lakefs/metadata
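
Once the container is up, the gateway can be exercised with any S3 client. A quick sanity check with boto3, assuming the default local endpoint and the access keys created during lakeFS setup (repositories should appear as buckets through the gateway):

import boto3

# Point a standard S3 client at the lakeFS gateway
# (replace the placeholder keys with the credentials generated by lakeFS setup)
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:8000",
    aws_access_key_id="<lakefs-access-key>",
    aws_secret_access_key="<lakefs-secret-key>",
)

# Each lakeFS repository is exposed as a bucket through the S3 gateway
print([b["Name"] for b in s3.list_buckets()["Buckets"]])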

lakeFS with Terraform (AWS)

# lakeFS on EKS
resource "helm_release" "lakefs" {
  name       = "lakefs"
  repository = "https://charts.lakefs.io"
  chart      = "lakefs"
  namespace  = "lakefs"

  values = [
    yamlencode({
      lakefsConfig = {
        database = {
          type = "postgres"
          postgres = {
            connection_string = "postgres://${var.db_user}:${var.db_password}@${aws_db_instance.lakefs.endpoint}/lakefs"
          }
        }
        blockstore = {
          type = "s3"
          s3 = {
            region = var.region
          }
        }
      }
      service = {
        type = "LoadBalancer"
        annotations = {
          "service.beta.kubernetes.io/aws-load-balancer-type" = "nlb"
        }
      }
    })
  ]
}

# S3 bucket for lakeFS storage
resource "aws_s3_bucket" "lakefs_data" {
  bucket = "lakefs-ml-data-${var.environment}"
}

# IAM role for lakeFS
resource "aws_iam_role_policy" "lakefs_s3_access" {
  name = "lakefs-s3-access"
  role = aws_iam_role.lakefs.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:DeleteObject",
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.lakefs_data.arn,
          "${aws_s3_bucket.lakefs_data.arn}/*"
        ]
      }
    ]
  })
}

Using lakeFS in Python

import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient

# Configure client
configuration = lakefs_client.Configuration()
configuration.host = "http://lakefs:8000/api/v1"
configuration.username = "AKIAIOSFODNN7EXAMPLE"
configuration.password = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"

client = LakeFSClient(configuration)

# Create a repository (the LakeFSClient wrapper exposes per-resource API attributes)
client.repositories.create_repository(
    models.RepositoryCreation(
        name="ml-training-data",
        storage_namespace="s3://lakefs-ml-data-prod/repos/ml-training-data",
        default_branch="main"
    )
)

# Create a branch for experimentation
client.branches.create_branch(
    repository="ml-training-data",
    branch_creation=models.BranchCreation(
        name="experiment-new-features",
        source="main"
    )
)

# Upload data using S3 API (lakeFS speaks S3!)
import boto3

s3 = boto3.client(
    's3',
    endpoint_url='http://lakefs:8000',
    aws_access_key_id='AKIAIOSFODNN7EXAMPLE',
    aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
)

# Upload to a branch
s3.upload_file(
    'training_data.parquet',
    'ml-training-data',
    'experiment-new-features/data/training_data.parquet'
)

# Commit the changes
client.commits.commit(
    repository="ml-training-data",
    branch="experiment-new-features",
    commit_creation=models.CommitCreation(
        message="Add new feature engineering pipeline output",
        metadata={"experiment_id": "exp-123", "author": "data-team"}
    )
)

# Diff between branches
diff_result = client.refs.diff_refs(
    repository="ml-training-data",
    left_ref="main",
    right_ref="experiment-new-features"
)

for diff in diff_result.results:
    print(f"{diff.type}: {diff.path}")

# Merge if the experiment is successful
client.refs.merge_into_branch(
    repository="ml-training-data",
    source_ref="experiment-new-features",
    destination_branch="main"
)
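
After a successful merge, the short-lived branch can be cleaned up, mirroring a typical Git workflow. A small sketch, assuming this SDK version also exposes a branches.delete_branch call:

# Remove the experiment branch now that its changes live on main
# (assumes client.branches.delete_branch exists in this SDK version)
client.branches.delete_branch(
    repository="ml-training-data",
    branch="experiment-new-features"
)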

lakeFS for ML Training

# Reading versioned data in training
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.hadoop.fs.s3a.endpoint", "http://lakefs:8000") \
    .config("spark.hadoop.fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE") \
    .config("spark.hadoop.fs.s3a.secret.key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

# Read from a specific commit (reproducible!)
commit_id = "abc123def456"
training_data = spark.read.parquet(
    f"s3a://ml-training-data/{commit_id}/data/training/"
)

# Or read from a branch
training_data = spark.read.parquet(
    "s3a://ml-training-data/main/data/training/"
)

# Log the data version with the model
import mlflow

with mlflow.start_run():
    mlflow.log_param("data_commit", commit_id)
    mlflow.log_param("data_branch", "main")
    # Train model...
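
Branch names are moving targets, so for strict reproducibility it helps to resolve the branch to its current commit ID at training time and log that instead. A minimal sketch, assuming the client wrapper exposes branches.get_branch and that it returns the branch head with a commit_id attribute:

# Resolve the branch head to an immutable commit ID
# (assumes client.branches.get_branch returns a ref object with commit_id)
branch_info = client.branches.get_branch(
    repository="ml-training-data",
    branch="main"
)
data_commit = branch_info.commit_id

with mlflow.start_run():
    # Logging the commit ID pins the exact data snapshot used for this run
    mlflow.log_param("data_commit", data_commit)
    training_data = spark.read.parquet(
        f"s3a://ml-training-data/{data_commit}/data/training/"
    )
    # Train model...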

9.6.3. Delta Lake: ACID Transactions for Big Data

Delta Lake brings ACID transactions to data lakes.

Core Features

Feature | Description
------- | -----------
ACID Transactions | Concurrent reads/writes without corruption
Time Travel | Query historical versions
Schema Evolution | Add columns without breaking
Unified Batch/Streaming | Same table for both
Audit Log | Transaction history

Delta Lake on AWS

# Using Delta Lake with Spark on EMR/Glue
from delta import *
from pyspark.sql import SparkSession

# Configure Spark with Delta
spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Write data as Delta table
df = spark.read.parquet("s3://raw-data/events/")
df.write.format("delta").save("s3://delta-lake/events/")

# Read Delta table
events = spark.read.format("delta").load("s3://delta-lake/events/")

# Time travel: read historical version
events_v0 = spark.read \
    .format("delta") \
    .option("versionAsOf", 0) \
    .load("s3://delta-lake/events/")

# Time travel by timestamp
events_yesterday = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2024-01-14 00:00:00") \
    .load("s3://delta-lake/events/")
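
The schema evolution feature listed above can be exercised with Delta's mergeSchema write option. A brief sketch (the new-events path is illustrative):

# Append data that adds a new column; mergeSchema evolves the table schema in place
new_events = spark.read.parquet("s3://raw-data/new-events/")  # illustrative path

new_events.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("s3://delta-lake/events/")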

Delta Lake Operations

from delta.tables import DeltaTable

# Create Delta table reference
delta_table = DeltaTable.forPath(spark, "s3://delta-lake/events/")

# UPSERT (merge)
updates_df = spark.read.parquet("s3://updates/")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.event_id = source.event_id"
).whenMatchedUpdate(
    set={"amount": "source.amount", "timestamp": "source.timestamp"}
).whenNotMatchedInsert(
    values={
        "event_id": "source.event_id",
        "user_id": "source.user_id",
        "amount": "source.amount",
        "timestamp": "source.timestamp"
    }
).execute()

# Delete
delta_table.delete("timestamp < '2023-01-01'")

# Vacuum (clean up old files)
delta_table.vacuum(retentionHours=168)  # 7 days

# Get history
history = delta_table.history()
history.show()
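
Because the same Delta table serves batch and streaming, the events table above can also be consumed and produced with Structured Streaming. A minimal sketch (the checkpoint and output paths are illustrative):

# Stream new rows out of the Delta table as they are committed
events_stream = spark.readStream \
    .format("delta") \
    .load("s3://delta-lake/events/")

# Write a streaming aggregate back to another Delta table
query = events_stream.groupBy("user_id").count() \
    .writeStream \
    .format("delta") \
    .outputMode("complete") \
    .option("checkpointLocation", "s3://delta-lake/_checkpoints/user_counts/") \
    .start("s3://delta-lake/user_counts/")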

Delta Lake on GCP with Dataproc

# Dataproc cluster with Delta Lake
resource "google_dataproc_cluster" "delta_cluster" {
  name   = "delta-processing"
  region = var.region

  cluster_config {
    master_config {
      num_instances = 1
      machine_type  = "n2-standard-4"
    }

    worker_config {
      num_instances = 4
      machine_type  = "n2-standard-8"
    }

    software_config {
      image_version = "2.1-debian11"
      optional_components = ["JUPYTER"]
      
      override_properties = {
        "spark:spark.sql.extensions"   = "io.delta.sql.DeltaSparkSessionExtension"
        "spark:spark.sql.catalog.spark_catalog" = "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        "spark:spark.jars.packages"    = "io.delta:delta-core_2.12:2.4.0"
      }
    }

    gce_cluster_config {
      zone = "${var.region}-a"
      
      service_account_scopes = [
        "cloud-platform"
      ]
    }
  }
}

9.6.4. DVC: Git-Based Data Versioning

DVC (Data Version Control) extends Git for large files.

How DVC Works

┌─────────────────────────────────────────────────────────────────────┐
│                      DVC Architecture                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Git Repository                  Remote Storage                    │
│   ┌──────────────┐               ┌──────────────┐                  │
│   │ code/        │               │   S3/GCS/    │                  │
│   │ ├── train.py │               │   Azure Blob │                  │
│   │ ├── model.py │               ├──────────────┤                  │
│   │              │               │ data/        │                  │
│   │ data.dvc ◄───┼──────.dvc────▶│ └── v1/     │                  │
│   │  (pointer)   │    files      │     training/│                  │
│   │              │               │   └── v2/    │                  │
│   │ dvc.lock     │               │     training/│                  │
│   │  (pipeline)  │               └──────────────┘                  │
│   └──────────────┘                                                  │
└─────────────────────────────────────────────────────────────────────┘

DVC Setup and Usage

# Initialize DVC in a Git repo
git init
dvc init

# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage

# Track large data files
dvc add data/training/
# Creates data/training.dvc (pointer file, tracked by Git)
# Actual data goes to .dvc/cache and remote

git add data/training.dvc .gitignore
git commit -m "Add training data v1"

# Push data to remote
dvc push

# Update data
cp new_data/* data/training/
dvc add data/training/
git add data/training.dvc
git commit -m "Update training data v2"
dvc push

# Switch to an old data version (assumes the earlier commit was tagged v1.0)
git checkout v1.0
dvc checkout
# Now data/training/ has the version from the v1.0 tag
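
Training code can also read a pinned version directly through DVC's Python API instead of checking files out. A short sketch, assuming a tracked file at data/training/train.csv in this repo (the file path and revision are illustrative):

import dvc.api
import pandas as pd

# Stream a specific revision of a tracked file straight from the DVC remote
with dvc.api.open(
    "data/training/train.csv",   # illustrative file inside the tracked directory
    repo=".",                    # or a Git URL for a remote repo
    rev="v1.0",                  # any Git revision: tag, branch, or commit
) as f:
    train_df = pd.read_csv(f)

# Or resolve the remote storage URL for the artifact at that revision
url = dvc.api.get_url("data/training/train.csv", repo=".", rev="v1.0")
print(url)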

DVC Pipelines

# dvc.yaml - Define reproducible pipelines
stages:
  prepare:
    cmd: python src/prepare.py data/raw data/prepared
    deps:
      - data/raw
      - src/prepare.py
    outs:
      - data/prepared

  featurize:
    cmd: python src/featurize.py data/prepared data/features
    deps:
      - data/prepared
      - src/featurize.py
    params:
      - featurize.window_size
      - featurize.aggregations
    outs:
      - data/features

  train:
    cmd: python src/train.py data/features models/model.pkl
    deps:
      - data/features
      - src/train.py
    params:
      - train.learning_rate
      - train.n_estimators
    outs:
      - models/model.pkl
    metrics:
      - metrics/train_metrics.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py models/model.pkl data/test
    deps:
      - models/model.pkl
      - data/test
      - src/evaluate.py
    metrics:
      - metrics/eval_metrics.json:
          cache: false
    plots:
      - plots/roc_curve.json:
          x: fpr
          y: tpr

Running DVC Pipeline

# Run the full pipeline
dvc repro

# Run specific stage
dvc repro train

# See pipeline DAG
dvc dag

# Compare metrics across experiments
dvc metrics diff

# Show parameter changes
dvc params diff

9.6.5. Versioning Strategy Selection

Comparison Matrix

Feature | lakeFS | Delta Lake | DVC
------- | ------ | ---------- | ---
Primary Use | Data lake versioning | ACID tables | ML experiments
Branching | Full Git-like | No native branching | Git-based
Time Travel | Via commits | Built-in | Via Git tags
Scalability | PB scale | PB scale | TB scale
Integration | S3 API compatible | Spark native | CLI + Python
Schema | Schema-agnostic | Schema-aware | File-based
Overhead | Low (metadata only) | Moderate (transaction log) | Low

Decision Framework

                              ┌─────────────────┐
                              │ Need ACID for   │
                          ┌───│ concurrent      │
                          │Yes│ updates?        │
                          │   └────────┬────────┘
                          │            │No
                          ▼            ▼
                   ┌─────────────┐  ┌─────────────┐
                   │ Delta Lake  │  │ Need branch │
                   │             │  │ workflows?  │
                   └─────────────┘  └──────┬──────┘
                                       Yes │ No
                                           │
                              ┌────────────┴───────────┐
                              ▼                        ▼
                       ┌─────────────┐          ┌─────────────┐
                       │   lakeFS    │          │    DVC      │
                       │             │          │   (small)   │
                       └─────────────┘          └─────────────┘

Use Case | Recommended Stack
-------- | -----------------
ML experiments | DVC + Git + S3
Data lake governance | lakeFS + Delta Lake
Streaming + batch | Delta Lake
Feature engineering | Delta Lake + Feast
Multi-environment | lakeFS (branch per env)

9.6.6. Data Lineage and Governance

Why Lineage Matters

Question | Without Lineage | With Lineage
-------- | --------------- | ------------
“Where did this data come from?” | Unknown | Full trace to sources
“What does this field mean?” | Tribal knowledge | Catalog metadata
“Who changed this?” | Audit logs (maybe) | Full history
“If I change X, what breaks?” | Trial and error | Impact analysis

OpenLineage Standard

# Emit lineage events using OpenLineage
import uuid
from datetime import datetime, timezone

from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Job, Run, Dataset

# Reads OPENLINEAGE_URL (and optional OPENLINEAGE_API_KEY) from the environment
client = OpenLineageClient.from_environment()

# Define job
job = Job(
    namespace="ml-training",
    name="feature-engineering-pipeline"
)

# Start run (reuse the same run_id for the START and COMPLETE events)
run_id = str(uuid.uuid4())
run_start = RunEvent(
    eventType=RunState.START,
    eventTime=datetime.now(timezone.utc).isoformat(),
    job=job,
    run=Run(runId=run_id),
    producer="my-pipeline"
)
client.emit(run_start)

# Complete with lineage
run_complete = RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    job=job,
    run=Run(runId=run_id),
    inputs=[
        Dataset(
            namespace="s3://raw-data",
            name="user_events",
            facets={"schema": {"fields": [...]}}
        )
    ],
    outputs=[
        Dataset(
            namespace="s3://feature-store",
            name="user_features",
            facets={"schema": {"fields": [...]}}
        )
    ],
    producer="my-pipeline"
)
client.emit(run_complete)
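
Spark jobs like the ones earlier in this chapter can emit equivalent events automatically through the openlineage-spark listener instead of hand-written events. A sketch, with the caveat that the package coordinates and configuration key names vary between openlineage-spark releases, so treat the values below as illustrative:

from pyspark.sql import SparkSession

# Listener-based lineage: the OpenLineage Spark agent observes reads/writes and
# emits START/COMPLETE events per job (coordinates and config keys vary by version)
spark = SparkSession.builder \
    .config("spark.jars.packages", "io.openlineage:openlineage-spark:1.4.1") \
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener") \
    .config("spark.openlineage.transport.type", "http") \
    .config("spark.openlineage.transport.url", "http://marquez:5000") \
    .config("spark.openlineage.namespace", "ml-training") \
    .getOrCreate()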

AWS Glue Data Catalog Lineage

# Enable lineage in Glue Catalog
resource "aws_glue_catalog_database" "ml_database" {
  name = "ml_data_catalog"
  
  create_table_default_permission {
    permissions = ["ALL"]
    
    principal {
      data_lake_principal_identifier = "IAM_ALLOWED_PRINCIPALS"
    }
  }
}

# Glue job with lineage tracking
resource "aws_glue_job" "feature_pipeline" {
  name     = "feature-engineering"
  role_arn = aws_iam_role.glue_role.arn

  command {
    script_location = "s3://scripts/feature_pipeline.py"
    python_version  = "3"
  }

  default_arguments = {
    "--enable-continuous-cloudwatch-log" = "true"
    "--enable-metrics"                   = "true"
    "--enable-glue-datacatalog"         = "true"
    # Job run insights (diagnostics); lineage itself comes from the Data Catalog integration
    "--enable-job-insights"             = "true"
  }
}

9.6.7. Key Takeaways

  1. Data versioning is non-negotiable for ML: Reproducibility requires it.

  2. lakeFS for Git-like workflows: Branch, commit, merge for data.

  3. Delta Lake for ACID and time travel: Best for concurrent access.

  4. DVC for ML experiments: Integrates with Git, tracks data + models.

  5. Choose based on use case: Different tools excel at different things.

  6. Lineage completes the picture: Know where data came from and where it goes.

  7. Combine tools: lakeFS + Delta Lake + Feast is common.

  8. Start small, scale up: DVC for experiments → lakeFS for production.


Next: 9.7 Data Lineage & Governance — Automated compliance and impact analysis.