Chapter 9.6: Advanced Data Versioning
“Without versioning, you can’t reproduce results. Without reproducibility, you don’t have science—you have anecdotes.” — Pete Warden, Former Google Staff Engineer
Data versioning is the foundation of ML reproducibility. This chapter dives deep into lakeFS, Delta Lake, and DVC, and into the versioning strategies that enable time travel, rollback, and experiment tracking for data.
9.6.1. Why Data Versioning Matters for ML
The Reproducibility Crisis
| Problem | Without Versioning | With Versioning |
|---|---|---|
| “What data trained this model?” | Unknown | Exact commit hash |
| “Can we reproduce last month’s results?” | No | Yes, checkout data version |
| “Something broke—what changed?” | Manual investigation | Diff between versions |
| “Can we rollback bad data?” | Restore from backup (hours) | Instant rollback |
Data Versioning vs. Code Versioning
| Aspect | Code (Git) | Data |
|---|---|---|
| Size | MBs | TBs-PBs |
| Change frequency | Commits | Continuous streams |
| Diff granularity | Line-by-line | Row/column/partition |
| Storage model | Full copies | Copy-on-write/delta |
| Branching | Cheap | Must be efficient |
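The storage-model row above is the practical heart of the difference: data versioning systems never copy terabytes per commit. Instead, each commit is a small manifest of pointers to immutable objects, so branching is cheap and a "diff" is just a comparison of two manifests. The sketch below is a hypothetical illustration of that idea; the manifest format and helper are illustrative, not any tool's actual API.
# Hypothetical illustration of copy-on-write versioning:
# a "commit" is just a manifest mapping paths to content hashes.
v1 = {"data/part-000.parquet": "sha256:aa11", "data/part-001.parquet": "sha256:bb22"}
v2 = {"data/part-000.parquet": "sha256:aa11",   # unchanged -> stored once, shared by both versions
      "data/part-001.parquet": "sha256:cc33",   # changed   -> only this object is written again
      "data/part-002.parquet": "sha256:dd44"}   # added

def diff_manifests(old: dict, new: dict) -> dict:
    """Compare two commit manifests without touching the underlying data."""
    return {
        "added":   sorted(set(new) - set(old)),
        "removed": sorted(set(old) - set(new)),
        "changed": sorted(p for p in set(old) & set(new) if old[p] != new[p]),
    }

print(diff_manifests(v1, v2))
# {'added': ['data/part-002.parquet'], 'removed': [], 'changed': ['data/part-001.parquet']}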
9.6.2. lakeFS: Git for Data
lakeFS provides Git-like operations (branch, commit, merge) for data lakes.
Core Concepts
| Concept | lakeFS Implementation |
|---|---|
| Repository | A bucket or prefix in object storage |
| Branch | Pointer to a commit, mutable |
| Commit | Immutable snapshot of data |
| Object | Individual file in the lake |
| Merge | Combine branches (three-way merge) |
Architecture Overview
┌─────────────────────────────────────────────────────────────────────┐
│ lakeFS Architecture │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────┐ │
│ │ Clients │ │
│ │ (S3/GCS API) │ │
│ └───────┬────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────┐ ┌────────────────┐ │
│ │ lakeFS │◄──│ Metadata │ │
│ │ Gateway │ │ Store │ │
│ │ │ │(PostgreSQL/ │ │
│ │ (S3 Protocol) │ │ DynamoDB) │ │
│ └───────┬────────┘ └────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Object Storage (S3/GCS/Azure) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Branch │ │ Branch │ │ Branch │ ... │ │
│ │ │ main │ │ develop │ │ feature │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Installing lakeFS
# docker-compose.yml for lakeFS
version: "3"
services:
  lakefs:
    image: treeverse/lakefs:0.110.0
    ports:
      - "8000:8000"
    environment:
      - LAKEFS_DATABASE_TYPE=local
      - LAKEFS_BLOCKSTORE_TYPE=s3
      - LAKEFS_BLOCKSTORE_S3_REGION=us-east-1
      - LAKEFS_AUTH_ENCRYPT_SECRET_KEY=${LAKEFS_SECRET}
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
    volumes:
      - ./lakefs-data:/var/lib/lakefs
    command: run --db.local.path /var/lib/lakefs/metadata
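Once the container is up, a quick smoke test confirms the API is reachable before creating any repositories. A minimal sketch, assuming lakeFS is listening on localhost:8000 and exposes its standard healthcheck endpoint:
# Minimal smoke test for a local lakeFS instance (assumes localhost:8000)
import requests

resp = requests.get("http://localhost:8000/api/v1/healthcheck", timeout=5)
resp.raise_for_status()  # a 2xx response means the server and its metadata store are reachable
print("lakeFS is up")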
lakeFS with Terraform (AWS)
# lakeFS on EKS
resource "helm_release" "lakefs" {
  name       = "lakefs"
  repository = "https://charts.lakefs.io"
  chart      = "lakefs"
  namespace  = "lakefs"

  values = [
    yamlencode({
      lakefsConfig = {
        database = {
          type = "postgres"
          postgres = {
            connection_string = "postgres://${var.db_user}:${var.db_password}@${aws_db_instance.lakefs.endpoint}/lakefs"
          }
        }
        blockstore = {
          type = "s3"
          s3 = {
            region = var.region
          }
        }
      }
      service = {
        type = "LoadBalancer"
        annotations = {
          "service.beta.kubernetes.io/aws-load-balancer-type" = "nlb"
        }
      }
    })
  ]
}
# S3 bucket for lakeFS storage
resource "aws_s3_bucket" "lakefs_data" {
  bucket = "lakefs-ml-data-${var.environment}"
}

# IAM policy granting lakeFS access to its storage bucket
resource "aws_iam_role_policy" "lakefs_s3_access" {
  name = "lakefs-s3-access"
  role = aws_iam_role.lakefs.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:DeleteObject",
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.lakefs_data.arn,
          "${aws_s3_bucket.lakefs_data.arn}/*"
        ]
      }
    ]
  })
}
Using lakeFS in Python
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient

# Configure client
configuration = lakefs_client.Configuration()
configuration.host = "http://lakefs:8000/api/v1"
configuration.username = "AKIAIOSFODNN7EXAMPLE"
configuration.password = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
client = LakeFSClient(configuration)

# Create a repository
client.repositories.create_repository(
    models.RepositoryCreation(
        name="ml-training-data",
        storage_namespace="s3://lakefs-ml-data-prod/repos/ml-training-data",
        default_branch="main"
    )
)

# Create a branch for experimentation
client.branches.create_branch(
    repository="ml-training-data",
    branch_creation=models.BranchCreation(
        name="experiment-new-features",
        source="main"
    )
)

# Upload data using S3 API (lakeFS speaks S3!)
import boto3

s3 = boto3.client(
    's3',
    endpoint_url='http://lakefs:8000',
    aws_access_key_id='AKIAIOSFODNN7EXAMPLE',
    aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
)

# Upload to a branch: the first path segment after the repository is the branch name
s3.upload_file(
    'training_data.parquet',
    'ml-training-data',
    'experiment-new-features/data/training_data.parquet'
)

# Commit the changes
client.commits.commit(
    repository="ml-training-data",
    branch="experiment-new-features",
    commit_creation=models.CommitCreation(
        message="Add new feature engineering pipeline output",
        metadata={"experiment_id": "exp-123", "author": "data-team"}
    )
)

# Diff between branches
diff_result = client.refs.diff_refs(
    repository="ml-training-data",
    left_ref="main",
    right_ref="experiment-new-features"
)
for diff in diff_result.results:
    print(f"{diff.type}: {diff.path}")

# Merge if experiment is successful
client.refs.merge_into_branch(
    repository="ml-training-data",
    source_ref="experiment-new-features",
    destination_branch="main"
)
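lakeFS also supports tags, which are useful for pinning the exact dataset state behind a model release. A short sketch, assuming the tags API group on the same client; the tag name is illustrative:
# Tag the merged state so a model release can reference an immutable dataset version
client.tags.create_tag(
    repository="ml-training-data",
    tag_creation=models.TagCreation(
        id="training-set-2024-01",  # illustrative tag name
        ref="main"                  # resolved to the current commit on main
    )
)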
lakeFS for ML Training
# Reading versioned data in training
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.hadoop.fs.s3a.endpoint", "http://lakefs:8000") \
.config("spark.hadoop.fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE") \
.config("spark.hadoop.fs.s3a.secret.key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.getOrCreate()
# Read from a specific commit (reproducible!)
commit_id = "abc123def456"
training_data = spark.read.parquet(
f"s3a://ml-training-data/{commit_id}/data/training/"
)
# Or read from a branch
training_data = spark.read.parquet(
"s3a://ml-training-data/main/data/training/"
)
# Log the data version with the model
import mlflow

with mlflow.start_run():
    mlflow.log_param("data_commit", commit_id)
    mlflow.log_param("data_branch", "main")
    # Train model...
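Logging only a branch name is not fully reproducible, because the branch keeps moving. A hedged sketch of resolving the branch to its current commit before training, so the exact version lands in the experiment record; it assumes the lakeFS client from the previous section:
# Resolve the branch head to a concrete commit so the logged version never drifts
branch_info = client.branches.get_branch(
    repository="ml-training-data",
    branch="main"
)
commit_id = branch_info.commit_id

with mlflow.start_run():
    mlflow.log_param("data_commit", commit_id)  # immutable reference to the data
    # Train on s3a://ml-training-data/{commit_id}/data/training/ ...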
9.6.3. Delta Lake: ACID Transactions for Big Data
Delta Lake brings ACID transactions to data lakes.
Core Features
| Feature | Description |
|---|---|
| ACID Transactions | Concurrent reads/writes without corruption |
| Time Travel | Query historical versions |
| Schema Evolution | Add columns without breaking |
| Unified Batch/Streaming | Same table for both |
| Audit Log | Transaction history |
Delta Lake on AWS
# Using Delta Lake with Spark on EMR/Glue
from delta import *
from pyspark.sql import SparkSession
# Configure Spark with Delta
spark = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Write data as Delta table
df = spark.read.parquet("s3://raw-data/events/")
df.write.format("delta").save("s3://delta-lake/events/")
# Read Delta table
events = spark.read.format("delta").load("s3://delta-lake/events/")
# Time travel: read historical version
events_v0 = spark.read \
.format("delta") \
.option("versionAsOf", 0) \
.load("s3://delta-lake/events/")
# Time travel by timestamp
events_yesterday = spark.read \
.format("delta") \
.option("timestampAsOf", "2024-01-14 00:00:00") \
.load("s3://delta-lake/events/")
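The schema-evolution and unified batch/streaming rows from the feature table can be exercised on the same table with a couple of options. A brief sketch; the source path with the extra columns is illustrative:
# Schema evolution: append a DataFrame that carries additional columns
new_events = spark.read.parquet("s3://raw-data/events_with_new_columns/")  # illustrative path
new_events.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("s3://delta-lake/events/")

# Unified batch/streaming: the same Delta table can also be read as a stream
events_stream = spark.readStream \
    .format("delta") \
    .load("s3://delta-lake/events/")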
Delta Lake Operations
from delta.tables import DeltaTable
# Create Delta table reference
delta_table = DeltaTable.forPath(spark, "s3://delta-lake/events/")
# UPSERT (merge)
updates_df = spark.read.parquet("s3://updates/")
delta_table.alias("target").merge(
updates_df.alias("source"),
"target.event_id = source.event_id"
).whenMatchedUpdate(
set={"amount": "source.amount", "timestamp": "source.timestamp"}
).whenNotMatchedInsert(
values={
"event_id": "source.event_id",
"user_id": "source.user_id",
"amount": "source.amount",
"timestamp": "source.timestamp"
}
).execute()
# Delete
delta_table.delete("timestamp < '2023-01-01'")
# Vacuum (clean up old files)
delta_table.vacuum(retentionHours=168) # 7 days
# Get history
history = delta_table.history()
history.show()
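The transaction log also makes the "instant rollback" promise from the start of the chapter concrete: RESTORE rewinds a table to an earlier version without touching backups. A minimal sketch on the same table, assuming Delta Lake 1.2 or later; the version number is illustrative:
# Roll the table back to an earlier version listed in the history output above
delta_table.restoreToVersion(3)  # illustrative version number

# Or roll back to the state as of a timestamp
delta_table.restoreToTimestamp("2024-01-14")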
Delta Lake on GCP with Dataproc
# Dataproc cluster with Delta Lake
resource "google_dataproc_cluster" "delta_cluster" {
  name   = "delta-processing"
  region = var.region

  cluster_config {
    master_config {
      num_instances = 1
      machine_type  = "n2-standard-4"
    }

    worker_config {
      num_instances = 4
      machine_type  = "n2-standard-8"
    }

    software_config {
      image_version       = "2.1-debian11"
      optional_components = ["JUPYTER"]
      override_properties = {
        "spark:spark.sql.extensions"            = "io.delta.sql.DeltaSparkSessionExtension"
        "spark:spark.sql.catalog.spark_catalog" = "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        "spark:spark.jars.packages"             = "io.delta:delta-core_2.12:2.4.0"
      }
    }

    gce_cluster_config {
      zone = "${var.region}-a"
      service_account_scopes = [
        "cloud-platform"
      ]
    }
  }
}
9.6.4. DVC: Git-Based Data Versioning
DVC (Data Version Control) extends Git for large files.
How DVC Works
┌─────────────────────────────────────────────────────────────────────┐
│ DVC Architecture │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Git Repository Remote Storage │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ code/ │ │ S3/GCS/ │ │
│ │ ├── train.py │ │ Azure Blob │ │
│ │ ├── model.py │ ├──────────────┤ │
│ │ │ │ data/ │ │
│ │ data.dvc ◄───┼──────.dvc────▶│ └── v1/ │ │
│ │ (pointer) │ files │ training/│ │
│ │ │ │ └── v2/ │ │
│ │ dvc.lock │ │ training/│ │
│ │ (pipeline) │ └──────────────┘ │
│ └──────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
DVC Setup and Usage
# Initialize DVC in a Git repo
git init
dvc init
# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
# Track large data files
dvc add data/training/
# Creates data/training.dvc (pointer file, tracked by Git)
# Actual data goes to .dvc/cache and remote
git add data/training.dvc .gitignore
git commit -m "Add training data v1"
# Push data to remote
dvc push
# Update data
cp new_data/* data/training/
dvc add data/training/
git add data/training.dvc
git commit -m "Update training data v2"
dvc push
# Switch to old data version
git checkout v1.0
dvc checkout
# Now data/training/ has version from v1.0 tag
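Training code does not have to shell out to dvc checkout; DVC also ships a small Python API for reading a tracked file at a specific Git revision. A sketch, with an illustrative repository URL and file path:
# Read a DVC-tracked file at an exact Git revision (tag, branch, or commit)
import dvc.api

with dvc.api.open(
    "data/training/train.csv",               # illustrative path tracked by DVC
    repo="https://github.com/org/ml-repo",   # illustrative repository URL
    rev="v1.0"                               # the tag used in the checkout example above
) as f:
    first_line = f.readline()

# Or resolve just a URL to the object in remote storage
url = dvc.api.get_url("data/training/train.csv", repo="https://github.com/org/ml-repo", rev="v1.0")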
DVC Pipelines
# dvc.yaml - Define reproducible pipelines
stages:
  prepare:
    cmd: python src/prepare.py data/raw data/prepared
    deps:
      - data/raw
      - src/prepare.py
    outs:
      - data/prepared

  featurize:
    cmd: python src/featurize.py data/prepared data/features
    deps:
      - data/prepared
      - src/featurize.py
    params:
      - featurize.window_size
      - featurize.aggregations
    outs:
      - data/features

  train:
    cmd: python src/train.py data/features models/model.pkl
    deps:
      - data/features
      - src/train.py
    params:
      - train.learning_rate
      - train.n_estimators
    outs:
      - models/model.pkl
    metrics:
      - metrics/train_metrics.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py models/model.pkl data/test
    deps:
      - models/model.pkl
      - data/test
      - src/evaluate.py
    metrics:
      - metrics/eval_metrics.json:
          cache: false
    plots:
      - plots/roc_curve.json:
          x: fpr
          y: tpr
Running DVC Pipeline
# Run the full pipeline
dvc repro
# Run specific stage
dvc repro train
# See pipeline DAG
dvc dag
# Compare metrics across experiments
dvc metrics diff
# Show parameter changes
dvc params diff
9.6.5. Versioning Strategy Selection
Comparison Matrix
| Feature | lakeFS | Delta Lake | DVC |
|---|---|---|---|
| Primary Use | Data lake versioning | ACID tables | ML experiments |
| Branching | Full Git-like | No native branching | Git-based |
| Time Travel | Via commits | Built-in | Via Git tags |
| Scalability | PB scale | PB scale | TB scale |
| Integration | S3 API compatible | Spark native | CLI + Python |
| Schema | Schema-agnostic | Schema-aware | File-based |
| Overhead | Low (metadata only) | Moderate (transaction log) | Low |
Decision Framework
┌─────────────────┐
│ Need ACID for │
┌───│ concurrent │
│Yes│ updates? │
│ └────────┬────────┘
│ │No
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Delta Lake │ │ Need branch │
│ │ │ workflows? │
└─────────────┘ └──────┬──────┘
Yes │ No
│
┌────────────┴───────────┐
▼ ▼
┌─────────────┐ ┌─────────────┐
│ lakeFS │ │ DVC │
│ │ │ (small) │
└─────────────┘ └─────────────┘
Recommended Combinations
| Use Case | Recommended Stack |
|---|---|
| ML experiments | DVC + Git + S3 |
| Data lake governance | lakeFS + Delta Lake |
| Streaming + batch | Delta Lake |
| Feature engineering | Delta Lake + Feast |
| Multi-environment | lakeFS (branch per env) |
9.6.6. Data Lineage and Governance
Why Lineage Matters
| Question | Without Lineage | With Lineage |
|---|---|---|
| “Where did this data come from?” | Unknown | Full trace to sources |
| “What does this field mean?” | Tribal knowledge | Catalog metadata |
| “Who changed this?” | Audit logs (maybe) | Full history |
| “If I change X, what breaks?” | Trial and error | Impact analysis |
OpenLineage Standard
# Emit lineage events using OpenLineage
import uuid
from datetime import datetime, timezone

from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Job, Run, Dataset

client = OpenLineageClient.from_environment()

# Define job
job = Job(
    namespace="ml-training",
    name="feature-engineering-pipeline"
)

# One run ID ties the START and COMPLETE events together
run_id = str(uuid.uuid4())

# Start run
run_start = RunEvent(
    eventType=RunState.START,
    eventTime=datetime.now(timezone.utc).isoformat(),
    job=job,
    run=Run(runId=run_id),
    producer="my-pipeline"
)
client.emit(run_start)

# Complete with lineage
run_complete = RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    job=job,
    run=Run(runId=run_id),
    inputs=[
        Dataset(
            namespace="s3://raw-data",
            name="user_events",
            facets={"schema": {"fields": [...]}}
        )
    ],
    outputs=[
        Dataset(
            namespace="s3://feature-store",
            name="user_features",
            facets={"schema": {"fields": [...]}}
        )
    ],
    producer="my-pipeline"
)
client.emit(run_complete)
AWS Glue Data Catalog Lineage
# Enable lineage in Glue Catalog
resource "aws_glue_catalog_database" "ml_database" {
  name = "ml_data_catalog"

  create_table_default_permission {
    permissions = ["ALL"]
    principal {
      data_lake_principal_identifier = "IAM_ALLOWED_PRINCIPALS"
    }
  }
}

# Glue job with lineage tracking
resource "aws_glue_job" "feature_pipeline" {
  name     = "feature-engineering"
  role_arn = aws_iam_role.glue_role.arn

  command {
    script_location = "s3://scripts/feature_pipeline.py"
    python_version  = "3"
  }

  default_arguments = {
    "--enable-continuous-cloudwatch-log" = "true"
    "--enable-metrics"                   = "true"
    "--enable-glue-datacatalog"          = "true"
    # Lineage tracking
    "--enable-job-insights" = "true"
  }
}
9.6.7. Key Takeaways
- Data versioning is non-negotiable for ML: Reproducibility requires it.
- lakeFS for Git-like workflows: Branch, commit, merge for data.
- Delta Lake for ACID and time travel: Best for concurrent access.
- DVC for ML experiments: Integrates with Git, tracks data + models.
- Choose based on use case: Different tools excel at different things.
- Lineage completes the picture: Know where data came from and where it goes.
- Combine tools: lakeFS + Delta Lake + Feast is common.
- Start small, scale up: DVC for experiments → lakeFS for production.
Next: 9.7 Data Lineage & Governance — Automated compliance and impact analysis.