3.3. Processing Engines: The Heavy Lifters
“The efficiency of an AI organization is inversely proportional to the amount of time its data scientists spend waiting for a progress bar. Distributed computing is the art of deleting that progress bar, at the cost of your sanity.”
In the MLOps Lifecycle, the Processing Layer is where the “Raw Material” (Log Data) is refined into “Fuel” (Tensors). This is the bridge between the messy reality of the world and the mathematical purity of the model.
If you get Storage (Chapter 3.2) wrong, your system is slow. If you get Processing (Chapter 3.3) wrong, your system is insolvent. A poorly written join in a distributed system does not just fail; it silently consumes 5,000 vCPUs for 12 hours before failing.
This chapter is a technical deep dive into the three dominant computation engines in modern MLOps: Apache Spark (AWS EMR), Apache Beam (GCP Dataflow), and the rising star, Ray. We will explore their internal architectures, their specific implementations on AWS and Google Cloud, and the “Black Magic” required to tune them.
3.3.1. The Physics of Distributed Compute
Before discussing specific tools, we must agree on the fundamental constraints of distributed data processing. Whether you use Spark, Flink, or Beam, you are fighting the same three physics problems:
1. The Shuffle (The Network Bottleneck)
The “Shuffle” is the process of redistributing data across the cluster so that all data belonging to a specific key (e.g., User_ID) ends up on the same physical machine.
- The Cost: Shuffle requires serializing data, transmitting it over TCP/IP, and deserializing it.
- The Failure Mode: If a single node holds 20% of the keys (Data Skew), that node becomes a straggler. The entire cluster waits for one machine.
- MLOps Context: Doing a `JOIN` between a 1PB “Click Logs” table and a 500GB “User Metadata” table is the single most expensive operation in Feature Engineering. A quick pre-join skew check is sketched below.
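A cheap way to see a skewed join coming is to histogram the join key before paying for the shuffle. A minimal PySpark sketch, where the path and column names are illustrative:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("skew-check").getOrCreate()

# Illustrative path; substitute your own click-log table
clicks = spark.read.parquet("s3://data/click_logs/")

# Rows per join key, heaviest keys first. A handful of keys holding a large
# share of the rows is the classic "straggler" setup described above.
key_histogram = (
    clicks.groupBy("user_id")
    .count()
    .orderBy(col("count").desc())
)
key_histogram.show(20)
```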
2. Serialization (The CPU Bottleneck)
In the Cloud, CPU time is money. In Python-based MLOps (PySpark/Beam Python), 40% to 60% of CPU cycles are often spent converting data formats:
- Java Object ↔ Pickle (Python)
- Network Byte Stream ↔ In-Memory Object
- Row-based format ↔ Columnar format (Parquet/Arrow)
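The practical mitigation is to keep data in columnar batches end to end. A minimal sketch of the idea, using a vectorized (Arrow-backed) pandas UDF instead of a row-at-a-time Python UDF; the data and function are illustrative:

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()
# Arrow keeps JVM <-> Python transfers columnar instead of pickling row by row
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

@pandas_udf("double")
def z_score(amount: pd.Series) -> pd.Series:
    # Operates on whole Arrow batches (per-batch stats, for illustration only)
    return (amount - amount.mean()) / amount.std()

df = spark.range(1_000_000).selectExpr("id", "rand() * 100 AS amount")
df.select(z_score("amount").alias("z")).show(5)
```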
3. State Management (The Memory Bottleneck)
Stream processing is stateful. Calculating “Clicks in the last 24 hours” requires holding 24 hours of history in memory.
- The Challenge: What happens if the cluster crashes? The state must be checkpointed to durable storage (S3/GCS/HDFS).
- The Trade-off: Frequent checkpointing guarantees correctness but kills throughput. Infrequent checkpointing risks data loss or long recovery times.
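To make the trade-off concrete, here is a minimal Structured Streaming sketch of the “clicks in the last 24 hours” feature. The Kafka source, paths, and intervals are illustrative (the Kafka connector package is assumed to be on the classpath); the trigger and checkpoint cadence is exactly the correctness-versus-throughput knob described above.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("clicks-24h").getOrCreate()

events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
    .option("subscribe", "clicks")
    .load()
    .selectExpr("CAST(key AS STRING) AS user_id", "timestamp")
)

clicks_24h = (
    events
    .withWatermark("timestamp", "1 hour")                # bound how much state we keep
    .groupBy(window(col("timestamp"), "24 hours", "1 hour"), "user_id")
    .count()
)

query = (
    clicks_24h.writeStream
    .outputMode("update")
    .option("checkpointLocation", "s3://bucket/checkpoints/clicks_24h/")  # durable state
    .trigger(processingTime="1 minute")                  # checkpoint/commit cadence
    .format("console")
    .start()
)
```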
Real-World Example: The Cost of Poor Processing Design
Scenario: A mid-size e-commerce company needs to join two datasets for ML training:
- Orders table: 100M rows, 50GB (user_id, order_id, timestamp, amount)
- User features table: 10M rows, 5GB (user_id, age, country, lifetime_value)
Naive Implementation (Cost: ~$450/day):
# BAD: This causes a full shuffle of 100M rows
orders_df = spark.read.parquet("s3://data/orders/")
users_df = spark.read.parquet("s3://data/users/")
# The JOIN triggers a massive shuffle
result = orders_df.join(users_df, on="user_id")
result.write.parquet("s3://output/joined/")
# EMR Cluster: 50 x r5.4xlarge for 3 hours
# Cost: 50 × $1.008/hr × 3hr = $151/run × 3 runs/day = $450/day
Optimized Implementation (Cost: ~$15/day):
# GOOD: Broadcast the small table to avoid shuffle
from pyspark.sql.functions import broadcast
orders_df = spark.read.parquet("s3://data/orders/")
users_df = spark.read.parquet("s3://data/users/")
# Force broadcast join (users table < 8GB, fits in memory)
result = orders_df.join(broadcast(users_df), on="user_id")
result.write.parquet("s3://output/joined/")
# EMR Cluster: 10 x r5.4xlarge for 0.5 hours
# Cost: 10 × $1.008/hr × 0.5hr = $5/run × 3 runs/day = $15/day
# Plus data transfer savings
Savings: $435/day = $13,050/month = $156,600/year
The difference? Understanding that the smaller table can fit in memory on each executor, eliminating network shuffle.
3.3.2. AWS Architecture: The EMR Ecosystem
Amazon Elastic MapReduce (EMR) is the Swiss Army Knife of AWS data processing. It is not a single engine; it is a managed platform for Hadoop, Spark, Hive, Presto, and Hudi.
1. EMR Deployment Modes
AWS offers three distinct ways to run EMR. Choosing the wrong one is a common architectural error.
| Mode | Architecture | Startup Time | Cost Model | Best For |
|---|---|---|---|---|
| EMR on EC2 | Traditional Clusters. You manage the OS/Nodes. | 7-15 mins | Per Instance/Hr | Massive, long-running batch jobs (Petabytes). |
| EMR on EKS | Dockerized Spark on Kubernetes. | 1-2 mins | Per vCPU/Hr | Iterative ML experiments, CI/CD pipelines. |
| EMR Serverless | Fully abstracted. No instance management. | ~1 min | Per vCPU-hr + GB-hr (premium rate) | Sporadic, bursty workloads. |
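For comparison, submitting a Spark script as an EMR Serverless job is a single API call. A minimal boto3 sketch, where the application ID, role ARN, and S3 paths are placeholders:

```python
import boto3

emr_serverless = boto3.client("emr-serverless")

# IDs, ARNs, and paths below are placeholders
response = emr_serverless.start_job_run(
    applicationId="00fabcdexample",
    executionRoleArn="arn:aws:iam::123456789012:role/EMRServerlessJobRole",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://my-bucket/jobs/feature_engineering.py",
            "sparkSubmitParameters": "--conf spark.executor.memory=16g",
        }
    },
)
print(response["jobRunId"])
```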
2. EMR on EC2: The “Instance Fleet” Strategy
For training data preparation, we typically need massive throughput for short periods. The most cost-effective pattern is using Instance Fleets with Spot Allocation Strategies.
The Terraform Configuration:
This configuration diversifies instance types so that if c5.4xlarge capacity is unavailable in the chosen Availability Zone, EMR can fill the fleet with c5.9xlarge instead (or, after the timeout, fall back to On-Demand), preventing pipeline failure.
resource "aws_emr_cluster" "feature_engineering" {
name = "mlops-feature-eng-prod"
release_label = "emr-7.1.0" # Always use latest for Spark/Arrow optimizations
applications = ["Spark", "Hadoop", "Livy"]
# 1. Master Node (The Brain) - ALWAYS ON-DEMAND
master_instance_fleet {
name = "Master-Fleet"
instance_type_configs {
instance_type = "m5.xlarge"
}
target_on_demand_capacity = 1
}
# 2. Core Nodes (HDFS Storage) - ON-DEMAND PREFERRED
# We need HDFS for intermediate shuffle data, even if input/output is S3.
core_instance_fleet {
name = "Core-Fleet"
instance_type_configs {
instance_type = "r5.2xlarge"
}
target_on_demand_capacity = 2
}
# 3. Task Nodes (Pure Compute) - SPOT INSTANCES
# This is where the heavy lifting happens. We bid on Spot.
task_instance_fleet {
name = "Task-Fleet"
# Diversify types to increase Spot availability probability
instance_type_configs {
instance_type = "c5.4xlarge"
weighted_capacity = 1
}
instance_type_configs {
instance_type = "c5.9xlarge"
weighted_capacity = 2
}
target_spot_capacity = 100 # Spin up 100 nodes cheaply
launch_specifications {
spot_specification {
allocation_strategy = "capacity-optimized"
timeout_action = "SWITCH_TO_ON_DEMAND" # Failover safety
timeout_duration_minutes = 10
}
}
}
}
3. Tuning Spark for Deep Learning Data
Standard Spark is tuned for ETL (aggregating sales numbers). MLOps Spark (processing images/embeddings) requires specific tuning.
The spark-defaults.conf Bible:
# 1. Memory Management for Large Tensors
# Increase overhead memory because Python processes (PyTorch/Numpy)
# run outside the JVM heap.
spark.executor.memoryOverhead = 4g
spark.executor.memory = 16g
spark.driver.memory = 8g
# 2. Apache Arrow (The Speedup)
# Critical for PySpark. Enables zero-copy data transfer between JVM and Python.
spark.sql.execution.arrow.pyspark.enabled = true
spark.sql.execution.arrow.maxRecordsPerBatch = 10000
# 3. S3 Performance (The Commit Protocol)
# "Magic Committer" writes directly to S3, bypassing the "Rename" step.
# Without this, the final step of your job will hang for hours.
spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled = true
spark.sql.sources.commitProtocolClass = org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class = org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
# 4. Shuffle Optimization
# For ML, we often have fewer, larger partitions.
spark.sql.shuffle.partitions = 500 # Default is 200
spark.default.parallelism = 500
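If you cannot edit spark-defaults.conf (for example, on a shared cluster), most of these settings can also be applied per application at session creation. A minimal sketch; note that driver memory generally has to be set earlier, at spark-submit time:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("feature-eng")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")
    .config("spark.sql.shuffle.partitions", "500")
    .config("spark.executor.memory", "16g")        # applied when executors launch
    .config("spark.executor.memoryOverhead", "4g")
    .getOrCreate()
)
```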
4. Advanced Performance Tuning
Problem: Data Skew
Data skew occurs when one partition has significantly more data than others. This causes one executor to become a straggler while others sit idle.
Diagnosis:
# Check partition distribution
from pyspark.sql.functions import spark_partition_id

df.groupBy(spark_partition_id()).count().show()
# If one partition has 10x more rows than others, you have skew
Solutions:
A) Salting (for Join Skew):
from pyspark.sql.functions import rand, col, concat, lit
# Problem: user_id="whale_user" has 1M orders, all other users have <100
# This single user dominates one partition
# Solution: Add random "salt" to distribute the whale
skewed_df = orders_df.withColumn("salt", (rand() * 10).cast("int"))
skewed_df = skewed_df.withColumn("join_key", concat(col("user_id"), lit("_"), col("salt")))
# Replicate the smaller table with all salt values
users_salted = users_df.crossJoin(
spark.range(10).select(col("id").alias("salt"))
).withColumn("join_key", concat(col("user_id"), lit("_"), col("salt")))
# Now join is distributed across 10 partitions instead of 1
result = skewed_df.join(users_salted, on="join_key")
B) Adaptive Query Execution (Spark 3.0+):
# Enable AQE (Adaptive Query Execution)
# Spark dynamically adjusts execution plan based on runtime statistics
spark.sql.adaptive.enabled = true
spark.sql.adaptive.coalescePartitions.enabled = true
spark.sql.adaptive.skewJoin.enabled = true
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB
With AQE enabled, Spark automatically detects skewed partitions and splits them during execution—no manual intervention required.
5. Memory Pressure Debugging
Symptom: Jobs fail with “OutOfMemoryError” or “Container killed by YARN for exceeding memory limits”
Diagnosis Tools:
# Enable detailed GC logging
spark.executor.extraJavaOptions = -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
# Access Spark UI
# http://<driver-node>:4040/executors/
# Look for:
# - High "GC Time" (> 10% of task time = memory pressure)
# - Frequent "Spill to Disk" (indicates not enough memory for shuffle)
Solutions:
# Option 1: Increase executor memory
spark.executor.memory = 32g
spark.executor.memoryOverhead = 8g
# Option 2: Reduce parallelism (fewer concurrent tasks = less memory per executor)
spark.executor.cores = 2 # Down from default 4
# Option 3: Enable off-heap memory for shuffle
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 10g
6. Monitoring EMR Jobs
CloudWatch Metrics (Automatic):
- `IsIdle` - Is the cluster running tasks?
- `MRActiveNodes` - Number of active nodes
- `HDFSUtilization` - Are we running out of HDFS space?
Custom Metrics (Push to CloudWatch):
import time

import boto3

cloudwatch = boto3.client('cloudwatch')

def log_job_metrics(job_name, rows_processed, duration_sec):
    cloudwatch.put_metric_data(
        Namespace='MLOps/DataProcessing',
        MetricData=[
            {
                'MetricName': 'RowsProcessed',
                'Value': rows_processed,
                'Unit': 'Count',
                'Dimensions': [{'Name': 'Job', 'Value': job_name}]
            },
            {
                'MetricName': 'JobDuration',
                'Value': duration_sec,
                'Unit': 'Seconds',
                'Dimensions': [{'Name': 'Job', 'Value': job_name}]
            }
        ]
    )

# Use in Spark job
start_time = time.time()
result_df = process_data()
rows = result_df.count()
duration = time.time() - start_time
log_job_metrics("feature_engineering", rows, duration)
Alerting Strategy:
- Critical: Job fails (EMR Step State = FAILED)
- Warning: Job duration > 2x baseline, Memory utilization > 90%
- Info: Job completes successfully
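The warning threshold above can be wired directly to the custom metric with a single CloudWatch alarm. A minimal boto3 sketch, assuming a 1-hour baseline (so the alarm fires at 2 hours) and a placeholder SNS topic:

```python
import boto3

cloudwatch = boto3.client("cloudwatch")

# Warning-level alarm: job duration above ~2x the usual baseline
cloudwatch.put_metric_alarm(
    AlarmName="feature-engineering-duration-warning",
    Namespace="MLOps/DataProcessing",
    MetricName="JobDuration",
    Dimensions=[{"Name": "Job", "Value": "feature_engineering"}],
    Statistic="Maximum",
    Period=3600,                 # evaluate hourly
    EvaluationPeriods=1,
    Threshold=7200,              # seconds; assumed 1-hour baseline x 2
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:mlops-alerts"],  # placeholder topic
)
```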
3.3.3. GCP Architecture: The Dataflow Difference
Google Cloud Dataflow is a managed service for Apache Beam. Unlike Spark, which exposes the cluster (Drivers/Executors), Dataflow exposes a Job Service.
1. The Beam Programming Model
Understanding Beam is a prerequisite for GCP Dataflow. It unifies Batch and Stream into a single semantic model.
- PCollection: A distributed dataset (bounded or unbounded).
- PTransform: An operation (Map, Filter, Group).
- Pipeline: The DAG (Directed Acyclic Graph) of transforms.
- Window: How you slice time (Fixed, Sliding, Session).
- Trigger: When you emit results (Early, On-time, Late).
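To make the vocabulary concrete, here is a minimal batch Beam pipeline (it runs locally on the DirectRunner) that builds a bounded PCollection and applies two PTransforms; the data is illustrative:

```python
import apache_beam as beam

with beam.Pipeline() as p:  # the Pipeline (DAG)
    (
        p
        | "Create" >> beam.Create([("u1", 3), ("u2", 5), ("u1", 2)])  # bounded PCollection
        | "SumPerUser" >> beam.CombinePerKey(sum)                     # a PTransform
        | "Print" >> beam.Map(print)
    )
```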
2. The Streaming Engine & Shuffle Service
Dataflow separates compute from state.
- Scenario: You are calculating a 24-hour sliding window of user activity.
- In Spark: The state (24 hours of data) is stored on the Worker Node’s local disk (RocksDB). If the node fails, the state must be recovered from a checkpoint, causing a latency spike.
- In Dataflow: The state is offloaded to the Streaming Engine (a remote, managed tiered storage service).
- Result: Compute is stateless. You can scale down from 100 workers to 1 worker instantly without losing data. The new worker simply queries the Streaming Engine for the state it needs.
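On the Python SDK, the Streaming Engine is opt-in via pipeline options. A minimal sketch (project and region are placeholders); the resulting options object is what you pass to beam.Pipeline(options=options):

```python
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",          # placeholder
    region="us-central1",
    streaming=True,
    enable_streaming_engine=True,  # offload shuffle and window state to the service
)
```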
3. Handling “Late Data” in MLOps
In ML Feature Engineering, “Event Time” matters more than “Processing Time”. If a user clicks an ad at 12:00, but the log arrives at 12:15 due to network lag, it must be counted in the 12:00 bucket.
Beam Python Code for Robust Feature Engineering:
import json

import apache_beam as beam
from apache_beam.transforms.trigger import (
    AfterWatermark,
    AfterProcessingTime,
    AccumulationMode,
)

# pipeline_options / input_topic are defined elsewhere;
# SumFn and WriteToRedis are user-defined (combiner + sink), omitted here.
def run_pipeline():
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'ReadPubSub' >> beam.io.ReadFromPubSub(topic=input_topic)
         | 'ParseJson' >> beam.Map(json.loads)
         | 'AddTimestamp' >> beam.Map(
             lambda x: beam.window.TimestampedValue(x, x['event_timestamp']))
         # THE MAGIC: Windowing with Late Data Handling
         | 'Window' >> beam.WindowInto(
             beam.window.FixedWindows(60),  # 1-minute windows
             # Triggering Strategy:
             # 1. Emit speculative results every 10 seconds (for real-time dashboards)
             # 2. Emit the "Final" result when the Watermark passes (completeness)
             # 3. Update the result if late data arrives (correctness)
             trigger=AfterWatermark(
                 early=AfterProcessingTime(10),
                 late=AfterProcessingTime(10)
             ),
             # How strictly do we drop old data?
             allowed_lateness=3600,  # Allow data up to 1 hour late
             # Add late data to the previously emitted sum
             accumulation_mode=AccumulationMode.ACCUMULATING
         )
         | 'CalculateFeature' >> beam.CombineGlobally(SumFn()).without_defaults()
         | 'WriteToFeatureStore' >> beam.ParDo(WriteToRedis())
        )
This level of granular control over time is why sophisticated ML teams (Spotify, Twitter/X, Lyft) prefer Beam/Dataflow for real-time features, despite the steeper learning curve compared to Spark.
3.3.4. The Emerging “GenAI” Stack: Ray
Spark and Beam were built for CPU-bound tasks (counting words, summing clicks). Large Language Models (LLMs) and Generative AI are GPU-bound.
Running an embedding model (e.g., BERT or CLIP) inside a Spark Executor is painful:
- Scheduling: Spark doesn’t understand “0.5 GPU”. It assumes 1 Task = 1 Core.
- Environment: Managing CUDA drivers inside YARN containers is “Linux dependency hell”.
Enter Ray
Ray is a distributed execution framework built for AI. It allows you to write Python code that scales from your laptop to a cluster of 1,000 GPUs.
Ray Architecture:
- Head Node: Runs the Global Control Store (GCS).
- Worker Nodes: Run the Raylet (Scheduler + Object Store).
- Object Store (Plasma): A shared-memory store. Zero-copy reads between processes on the same node.
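A minimal sketch of the object store in action: a large array is put into shared memory once and read zero-copy by tasks on the same node (sizes are illustrative):

```python
import numpy as np
import ray

ray.init()  # local mode here; on a cluster this connects to the head node

# Store a large array in the shared-memory object store once
embeddings_ref = ray.put(np.random.rand(100_000, 128))

@ray.remote
def mean_norm(block):
    # ObjectRef arguments arrive as zero-copy, read-only numpy views
    return float(np.linalg.norm(block, axis=1).mean())

print(ray.get(mean_norm.remote(embeddings_ref)))
```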
Ray Data (The Replacement for Spark?)
Ray Data (formerly Ray Datasets) is designed for “The Last Mile” of Deep Learning data loading.
Comparison: Image Processing Pipeline
Option A: The Old Way (PySpark)
# Spark Code
def process_image(row):
    # Setup TensorFlow/PyTorch here? Expensive overhead per row!
    model = load_model()
    return model(row.image)

# This fails in practice: you can't easily pickle a Model object to broadcast
# it to workers, and loading it per-row is far too slow.
df.rdd.map(process_image)
Option B: The Modern Way (Ray)
import ray
from ray.data import ActorPoolStrategy

class GPUInferencer:
    def __init__(self):
        # Initialized ONCE per worker process
        self.model = LoadModel().cuda()

    def __call__(self, batch):
        # Process a whole batch on the GPU
        return self.model.predict(batch["image"])

ds = ray.data.read_parquet("s3://bucket/images")

# Ray intelligently manages the actors
transformed_ds = ds.map_batches(
    GPUInferencer,
    compute=ActorPoolStrategy(min_size=2, max_size=10),  # Autoscaling
    num_gpus=1,     # Ray ensures each actor gets a dedicated GPU
    batch_size=64   # Optimal GPU batch size
)
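Continuing the example above, materializing the output is one more call (the bucket path is illustrative); in recent Ray versions execution is lazy until a consuming operation like this runs:

```python
# Triggers execution of the actor pool and persists the embeddings
transformed_ds.write_parquet("s3://bucket/embeddings/")
```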
Deploying Ray on Kubernetes (KubeRay)
The standard way to run Ray in production is via the KubeRay Operator.
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: genai-processing-cluster
spec:
  headGroupSpec:
    rayStartParams:
      dashboard-host: '0.0.0.0'
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray-ml:2.9.0-gpu
            resources:
              requests:
                cpu: 2
                memory: 8Gi
  workerGroupSpecs:
    - replicas: 2
      minReplicas: 0
      maxReplicas: 10  # Autoscaling
      groupName: gpu-group
      rayStartParams: {}
      template:
        spec:
          containers:
            - name: ray-worker
              image: rayproject/ray-ml:2.9.0-gpu
              resources:
                limits:
                  nvidia.com/gpu: 1  # Request 1 GPU per pod
                requests:
                  cpu: 4
                  memory: 16Gi
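Once the cluster is up, work can be submitted against the head node’s dashboard port. A minimal sketch using Ray’s job submission client, assuming KubeRay’s default head service naming (`<cluster-name>-head-svc`) and a hypothetical entrypoint script:

```python
from ray.job_submission import JobSubmissionClient

# Address is illustrative: the KubeRay head service, dashboard port 8265
client = JobSubmissionClient("http://genai-processing-cluster-head-svc:8265")

job_id = client.submit_job(
    entrypoint="python batch_embed.py",              # hypothetical processing script
    runtime_env={"pip": ["torch", "transformers"]},  # per-job dependencies
)
print(job_id)
```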
7. Dataflow Performance Optimization
Problem: Worker Thrashing
Symptom: Dataflow continuously scales up to maxWorkers, then back down, then up again.
Cause: Autoscaling based on CPU is too reactive for bursty ML workloads.
Solution: Use custom autoscaling parameters:
# Launch Dataflow job with tuned autoscaling
# (THROUGHPUT_BASED scales on backlog/throughput rather than CPU)
python pipeline.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-central1 \
  --max_num_workers=100 \
  --autoscaling_algorithm=THROUGHPUT_BASED \
  --worker_machine_type=n1-standard-8 \
  --disk_size_gb=100
Problem: Slow Windowing Operations
Symptom: Windows accumulate too much state before triggering.
Solution: Use triggering strategies to emit partial results:
from apache_beam.transforms.trigger import (
    AfterWatermark, AfterProcessingTime, AfterCount, AccumulationMode
)

windowed = input_data | beam.WindowInto(
    beam.window.FixedWindows(60),
    trigger=AfterWatermark(
        early=AfterProcessingTime(10),  # Emit every 10 seconds
        late=AfterCount(100)            # Or after every 100 late records
    ),
    accumulation_mode=AccumulationMode.ACCUMULATING
)
8. Dataflow Flex Templates
For production ML pipelines, use Flex Templates to version and deploy pipelines without recompiling.
Dockerfile:
FROM gcr.io/dataflow-templates-base/python38-template-launcher-base
WORKDIR /template
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY pipeline.py .
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/pipeline.py"
Deploy:
# Build and push container
gcloud builds submit --tag gcr.io/my-project/feature-pipeline:v1
# Create template
gcloud dataflow flex-template build gs://my-bucket/templates/feature-pipeline.json \
--image gcr.io/my-project/feature-pipeline:v1 \
--sdk-language PYTHON
# Run template (can be triggered by Cloud Scheduler)
gcloud dataflow flex-template run feature-job-$(date +%s) \
--template-file-gcs-location gs://my-bucket/templates/feature-pipeline.json \
--parameters input_topic=projects/my-project/topics/events \
--parameters output_table=my_dataset.features \
--region us-central1
3.3.5. Cost Optimization & FinOps Strategies
Compute costs are the silent killer. Here are the strategies to reduce the bill by 50-80%.
1. Spot Instance Handling (The “Graceful Death”)
Both AWS and GCP offer Spot/Preemptible instances at ~70% discount. However, they can be reclaimed on short notice: a 2-minute warning on AWS Spot, and roughly 30 seconds for GCP Preemptible/Spot VMs.
- Spark: Spark is resilient to task failure. If a node dies, the stage retries.
- Risk: If the Driver (Master) node is on Spot and dies, the whole job dies.
- Rule: Driver on On-Demand; Executors on Spot.
- Ray: Ray supports object reconstruction. If a node holding an object in plasma memory dies, Ray looks at the lineage graph and re-computes just that object.
2. Autoscale Dampening
Autoscalers (especially in Dataflow and EMR Managed Scaling) can be “twitchy”—scaling up for a momentary burst and then paying for the billing minimum (usually 10 mins or 1 hour).
- Strategy: Set scale-down behavior to be aggressive, but scale-up behavior to be conservative.
- Dataflow: Use `--maxNumWorkers` to set a hard cap. Without this, a bad regular expression could cause Dataflow to spin up 1,000 nodes to process a single corrupt file.
3. The “Format” Optimization
The cost of reading data is determined by the format.
- JSON/CSV: Expensive. Requires parsing every byte.
- Parquet: Cheap. Columnar.
  - Predicate Pushdown: If you run `SELECT * FROM data WHERE year=2024`, the engine skips reading most of the file because the footer metadata tells it which row groups can contain “2024”.
- Compression: Always use Snappy (speed focus) or ZSTD (compression focus). Avoid GZIP for large files: it is not splittable, so a GZIP file cannot be read in parallel. (A minimal write-and-scan sketch follows.)
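A minimal sketch of the write-and-scan pattern above; all paths are illustrative:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
events = spark.read.json("s3://data/raw_events/")  # expensive: parses every byte

# Columnar + ZSTD: cheaper to store and cheaper to scan
events.write.option("compression", "zstd").parquet("s3://data/events_parquet/")

# Predicate pushdown: only row groups whose footer statistics
# can contain year=2024 are actually read
spark.read.parquet("s3://data/events_parquet/").filter("year = 2024").count()
```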
3.3.6. Synthetic Data Generation
Sometimes, you don’t have enough data. Or the data is PII (Personally Identifiable Information) restricted. A growing trend in MLOps is using the compute layer to generate data.
Use Cases
- Cold Start: Simulating user behavior for a new recommendation engine.
- Robustness Testing: Generating adversarial examples to test model stability.
The Tooling
- Numpy/Scikit-learn: Good for simple statistical distributions.
- Unity Perception / Unreal Engine: For Computer Vision. You can run headless Unity instances in Docker containers on EKS/GKE to render millions of synthetic images (e.g., “Person walking across street in rain”) with perfect pixel-level labels.
- LLM Synthesis: Using GPT-4 or Llama-3 to generate synthetic customer support chat logs for training a smaller, private model (Distillation).
4. Committed Use Discounts (CUD) and Savings Plans
For predictable workloads, pre-commit to usage for deep discounts:
AWS:
- Savings Plans: Commit to $X/hour of compute for 1-3 years
- Flexibility: Works across EC2, Fargate, Lambda
- Discount: Up to 72% off on-demand pricing
- Recommendation: Start with 50% of baseline usage on 1-year plan
GCP:
- Committed Use Discounts: Commit to vCPU and memory for 1-3 years
- Discount: Up to 57% off on-demand pricing
- Applies to: Compute Engine, Dataflow, GKE
Calculation Example:
# Baseline: Running 20 x n1-standard-8 VMs continuously
# Monthly cost: 20 × 8 vCPU × $0.0475/hr × 730 hrs = $5,548/month
# On-demand annual: $66,576
# With a 3-year CUD (57% discount):
# Annual cost: $66,576 × 0.43 = $28,628
# Savings: $37,948/year
3.3.7. Anti-Patterns and Common Mistakes
Anti-Pattern 1: “Using DataFrame.collect() on Large Datasets”
Symptom:
# BAD: Pulling 100GB into driver memory
df = spark.read.parquet("s3://data/large-dataset/")
all_data = df.collect() # Driver OOM!
Why It Fails: collect() pulls all data to the driver node. If dataset > driver memory, crash.
Solution:
# GOOD: Process in distributed fashion
df.write.parquet("s3://output/processed/")
# Or sample if you need local inspection
sample = df.sample(fraction=0.001).collect() # 0.1% sample
Anti-Pattern 2: “Reading from Database with Single Connection”
Symptom:
# BAD: Single connection, serial reads
df = spark.read.jdbc(
url="jdbc:postgresql://db.example.com/prod",
table="users",
properties={"user": "read_only", "password": "xxx"}
)
# Spark creates ONE connection, reads ALL 100M rows serially
Why It Fails: No parallelism. All executors wait for single JDBC connection.
Solution:
# GOOD: Partition reads across executors
df = spark.read.jdbc(
url="jdbc:postgresql://db.example.com/prod",
table="users",
column="user_id", # Partition column (must be numeric)
lowerBound=0,
upperBound=100000000, # Max user_id
numPartitions=100, # 100 parallel reads
properties={"user": "read_only", "password": "xxx"}
)
# Spark creates 100 connections, each reads 1M rows in parallel
Anti-Pattern 3: “Not Caching Intermediate Results”
Symptom:
# BAD: Recomputing expensive transformation multiple times
raw_df = spark.read.parquet("s3://data/raw/")
cleaned_df = raw_df.filter(...).withColumn(...) # Expensive operation
# Each action triggers full recomputation
cleaned_df.count() # Reads + transforms raw_df
cleaned_df.write.parquet("s3://output/cleaned/")  # Reads + transforms raw_df AGAIN
Solution:
# GOOD: Cache intermediate result
raw_df = spark.read.parquet("s3://data/raw/")
cleaned_df = raw_df.filter(...).withColumn(...).cache() # Mark for caching
# First action materializes cache
cleaned_df.count() # Reads + transforms + caches
# Subsequent actions read from cache
cleaned_df.write.parquet("s3://output/cleaned/")  # Reads from cache (fast!)
# Clean up when done
cleaned_df.unpersist()
Anti-Pattern 4: “Ignoring Partitioning for Time-Series Data”
Symptom:
# BAD: Writing without partitioning
df.write.parquet("s3://data/events/")
# Creates one massive directory with 10,000 files
# Later queries are slow:
# "SELECT * FROM events WHERE date='2024-01-01'"
# Has to scan ALL 10,000 files to find the relevant ones
Solution:
# GOOD: Partition by common query patterns
df.write.partitionBy("year", "month", "day").parquet("s3://data/events/")
# Creates directory structure:
# s3://data/events/year=2024/month=01/day=01/*.parquet
# s3://data/events/year=2024/month=01/day=02/*.parquet
# Later queries are fast (predicate pushdown):
# "SELECT * FROM events WHERE year=2024 AND month=01 AND day=01"
# Only scans 1 day's worth of files
3.3.8. Case Study: Airbnb’s Data Processing Evolution
The Problem (2014)
Airbnb’s data scientists were spending 60% of their time waiting for Hive queries to complete. A simple feature engineering job (joining listings, bookings, and user data) took 12 hours.
Initial Solution: Migrating to Spark (2015)
Results:
- 12-hour jobs reduced to 2 hours (6x speedup)
- Cost increased 40% due to larger cluster requirements
- New problem: Spark job failures due to memory pressure
Optimization Phase (2016-2017)
Key Changes:
- Broadcast Joins: Identified that “listings” table (5GB) was being shuffled repeatedly. Converted to broadcast join.
- Result: 2-hour jobs reduced to 30 minutes
- Partition Tuning: Reduced shuffle partitions from default 200 to 50 for smaller intermediate datasets.
- Result: Eliminated 1000s of small file writes
- Data Format: Migrated from JSON to Parquet.
- Result: Storage reduced by 70%, read performance improved 5x
Current State (2023): Hybrid Architecture
- Batch: EMR with Spark for daily feature engineering (100+ TB processed daily)
- Streaming: Flink for real-time pricing updates (sub-second latency)
- ML Inference: Ray for batch prediction (embedding generation for search)
Key Metrics:
- Data processing cost: $200k/month (down from $800k/month in 2015)
- Data scientist productivity: 5x improvement (measured by experiment velocity)
- Feature freshness: From daily to hourly for critical features
3.3.9. Troubleshooting Guide
Problem: Job Stuck in “RUNNING” with No Progress
Diagnosis:
# Check Spark UI
# Look at "Stages" tab
# If one stage is at 99% complete for >30 mins, there's a straggler
# Identify the straggler task
# In Spark UI > Stages > Task Metrics
# Sort by "Duration" - the slowest task is the culprit
Common Causes:
- Data Skew: One partition has 10x more data
- Resource Starvation: Other jobs are consuming cluster resources
- Network Issues: Slow network to S3/GCS
Solutions:
- Enable Adaptive Query Execution (handles skew automatically)
- Kill competing jobs or increase cluster size
- Check S3/GCS request metrics for throttling
Problem: “Py4JNetworkError” in PySpark
Symptom:
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Cause: Python worker process crashed or timed out communicating with JVM.
Common Triggers:
- Python function raising exception
- Python dependency not installed on all workers
- Memory leak in Python UDF
Solution:
# Add error handling in UDFs
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
@udf(returnType=StringType())
def safe_process(value):
    try:
        return complex_processing(value)
    except Exception as e:
        # Log error but don't crash
        return f"ERROR: {str(e)}"
Problem: Dataflow Job Stuck in “Draining”
Symptom: Dataflow streaming job won’t cancel, stuck in “Draining” state for hours.
Cause: Pipeline is waiting for in-flight elements to complete. Usually due to a PTransform that never completes.
Solution:
# Force cancel (data loss possible)
gcloud dataflow jobs cancel JOB_ID --force --region us-central1
# Better: Fix the pipeline
# Check for:
# 1. Stateful transforms that accumulate unbounded state
# 2. External API calls that time out
# 3. Missing watermark advancement
3.3.10. Best Practices Summary
- Start with Parquet: Always use columnar formats (Parquet, ORC) for intermediate data. JSON/CSV are for ingestion only.
- Partition Strategically: Partition by common query patterns (date/time, region, category). Avoid over-partitioning (partitions well under ~1GB are a warning sign).
- Monitor Resource Utilization: Track CPU, memory, disk, network. Identify bottlenecks before they become outages.
- Test at Scale: Don’t just test on 1GB samples. Test on 10% of production data to catch performance issues.
- Separate Concerns: Use different clusters for experimentation vs. production. Don’t let ad-hoc queries slow down critical pipelines.
- Version Your Code: Use git tags for production pipelines. Know exactly what code ran when things break.
- Document Tuning Decisions: When you change `spark.sql.shuffle.partitions`, write a comment explaining why. Future you will thank you.
- Fail Fast: Add data quality checks early in the pipeline (see the sketch below). Better to fail fast than process garbage for 3 hours.
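A minimal sketch of the “fail fast” practice: a couple of cheap assertions at the top of the pipeline. The path and thresholds are illustrative:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3://data/features/")  # illustrative path

# Fail fast: cheap assertions before hours of downstream compute
row_count = df.count()
null_user_ids = df.filter("user_id IS NULL").count()

if row_count == 0:
    raise ValueError("Input is empty; refusing to run the pipeline")
if null_user_ids / row_count > 0.01:
    raise ValueError(f"{null_user_ids} rows (> 1%) have NULL user_id")
```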
3.3.11. Exercises for the Reader
Exercise 1: Join Optimization. Take an existing Spark join in your codebase. Measure its execution time. Apply broadcast join optimization. Measure again. Calculate cost savings.
Exercise 2: Partitioning Analysis. Query your Parquet data lake. How many partitions does a typical query scan? If > 1000, consider repartitioning by query patterns.
Exercise 3: Cost Attribution. For one week, tag all EMR/Dataflow jobs with cost center tags. Generate a report of cost by team/project. Identify the top 3 most expensive jobs.
Exercise 4: Failure Simulation. In a test environment, kill a worker node during a Spark job. Observe recovery behavior. How long until recovery? Is data lost?
Exercise 5: Data Skew Detection. Write a Spark job that detects data skew automatically. For each key, compute: mean records per key, max records per key. Alert if max > 10x mean.
3.3.12. Decision Matrix: Selecting the Engine
| Feature | AWS EMR (Spark) | AWS Glue | GCP Dataflow | Ray (KubeRay) |
|---|---|---|---|---|
| Primary Use Case | Massive Historical Batch Processing (Petabytes) | Simple, irregular ETL tasks | Complex Streaming & Unified Batch/Stream | GenAI, LLM Fine-tuning, Reinforcement Learning |
| Language | Python (PySpark), Scala, SQL | Python, Scala | Python, Java | Python Native |
| Latency | Minutes (Batch) | Minutes (Cold Start) | Seconds to Sub-second | Milliseconds |
| Ops Complexity | High (Cluster Tuning) | Low (Serverless) | Medium (Managed Service) | High (Kubernetes Mgmt) |
| Cost | Low (if using Spot) | High (Premium pricing) | Medium | Medium (GPU costs) |
| Best For MLOps? | Data Prep (Pre-training) | Ad-hoc Scripts | Real-time Features | Deep Learning Jobs |
The Architect’s Verdict
- Standardize on Parquet: Regardless of the engine, store your intermediate data in Parquet on S3/GCS. This allows you to switch engines later (e.g., write with Spark, read with Ray) without migration.
- Separate Compute: Do not use your Training Cluster (Ray/Slurm) for Data Processing (Spark). GPUs are too expensive to be used for parsing JSON logs.
- Code for the Future: If you are building a new platform today, lean heavily towards Ray for Python-centric ML workflows, and Dataflow/Flink for streaming features. The dominance of Spark is waning in the GenAI era.
3.3.13. Future Trends
1. Serverless Spark
Both AWS (EMR Serverless) and GCP (Dataproc Serverless) are pushing toward “Spark without clusters.”
Benefits:
- No cluster management
- Auto-scaling from 0 to 1000s of workers
- Pay only for execution time (second-level billing)
Trade-offs:
- Higher per-vCPU cost (~2x on-demand)
- Less control over instance types and networking
- Cold start latency (30-60 seconds)
Recommendation: Use serverless for sporadic, unpredictable workloads. Use managed clusters for continuous production pipelines.
2. SQL-First Data Processing
The rise of tools like dbt (data build tool) is pushing toward “SQL as the processing layer.”
Instead of writing PySpark:
df = spark.read.parquet("events")
df.filter(col("date") > "2024-01-01").groupBy("user_id").agg(...)
You write SQL:
-- models/user_features.sql
SELECT
user_id,
COUNT(*) as event_count,
MAX(timestamp) as last_event
FROM {{ ref('events') }}
WHERE date > '2024-01-01'
GROUP BY user_id
dbt compiles this to optimized Spark/Trino/BigQuery and handles dependencies, testing, and documentation.
Trend: Data Scientists prefer SQL over Scala/Java. Tools that hide the complexity of distributed computing behind SQL will win.
3. In-Database ML
Running ML training directly in the data warehouse (BigQuery ML, Snowflake ML, Redshift ML) eliminates data movement.
Example:
-- Train a linear regression model in BigQuery
CREATE MODEL my_dataset.sales_model
OPTIONS(model_type='linear_reg', input_label_cols=['sales']) AS
SELECT
price,
marketing_spend,
seasonality,
sales
FROM my_dataset.historical_sales;
-- Predict
SELECT predicted_sales
FROM ML.PREDICT(MODEL my_dataset.sales_model, TABLE my_dataset.new_data);
Limitation: Only supports basic ML algorithms (linear regression, XGBoost, AutoML). For deep learning, still need to export data.
Trend: For simple ML (churn prediction, fraud detection with tabular data), in-database ML reduces engineering overhead by 90%.
4. GPU-Native Processing
The next frontier: Processing engines that natively support GPU acceleration.
RAPIDS (NVIDIA):
- cuDF: Pandas-like API on GPUs
- cuML: Scikit-learn-like API on GPUs
- Spark RAPIDS: GPU-accelerated Spark operations
Performance: 10-50x speedup for operations like joins, sorts, and aggregations on GPUs vs. CPUs.
Cost: A100 instance costs 10x more than CPU instance. Only pays off for compute-heavy operations (string parsing, regex, complex aggregations).
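A minimal sketch of what the switch looks like in code, assuming a CUDA-capable host with RAPIDS installed; the path and column names are illustrative:

```python
import cudf  # RAPIDS GPU DataFrame; requires a CUDA-capable host

# Same pandas-style API, executed on the GPU
gdf = cudf.read_parquet("events.parquet")  # illustrative local or s3:// path
top_users = gdf.groupby("user_id")["amount"].sum().nlargest(10)
print(top_users)
```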
3.3.14. Implementation Checklist
Before deploying a production data processing pipeline:
Pre-Deployment:
- Tested on 10% of production data volume
- Configured monitoring (job duration, rows processed, cost per run)
- Set up alerting (job failure, duration > 2x baseline)
- Documented cluster configuration and tuning parameters
- Implemented error handling and retry logic
- Tagged resources for cost attribution
- Estimated monthly cost at full production load
Post-Deployment:
- Monitored first 5 runs for anomalies
- Validated output data quality (row counts, schema, null rates)
- Reviewed Spark UI / Dataflow metrics for bottlenecks
- Created runbook for common issues
- Established SLA (e.g., “Job must complete within 2 hours”)
- Scheduled regular cost reviews (monthly)
Optimization Phase:
- Applied broadcast join optimizations where applicable
- Tuned partitioning strategy based on query patterns
- Enabled caching for frequently accessed intermediate results
- Migrated to spot/preemptible instances for non-critical workloads
- Implemented data quality checks and early failure detection
- Documented all performance tuning decisions
3.3.15. Summary: The Heavy Lifters
Processing engines are the workhorses of MLOps. They transform raw logs into training datasets, power real-time feature computation, and enable experimentation at scale.
Key Takeaways:
- Physics First: Understand the fundamental constraints (shuffle, serialization, state management) before choosing tools. These constraints apply to all engines.
- Match Tool to Use Case:
  - Spark/EMR: Massive batch ETL, cost-sensitive workloads
  - Beam/Dataflow: Complex streaming, unified batch/stream
  - Ray: GPU-bound ML workloads, GenAI pipelines
- Cost is a First-Class Concern: A poorly optimized join can cost $100k/year. Invest time in understanding broadcast joins, partition tuning, and spot instances.
- Start Simple, Optimize Pragmatically: Don’t prematurely optimize. Start with default settings, measure performance, identify bottlenecks, then tune. Most “performance issues” are actually “wrong algorithm” issues (O(n²) when O(n log n) was available).
- Observability is Non-Negotiable: If you can’t measure it, you can’t improve it. Instrument your pipelines with custom metrics. Track cost per job, rows processed per dollar, job duration trends.
- Embrace the Lakehouse: Use open file and table formats (Parquet, Iceberg, Delta Lake) as your intermediate storage layer. This gives you the flexibility to switch processing engines without data migration.
- Test Failure Scenarios: Your pipeline will fail. Test how it recovers. Can it resume from checkpoint? Does it create duplicate data? Does it alert the right people?
- The Future is Serverless and GPU-Native: The industry is moving toward “processing without clusters” and “GPU-accelerated everything.” Build your platform to be portable (avoid vendor lock-in).
The Meta-Lesson:
Processing engines are not the differentiator. Your competitors use Spark too. The differentiator is:
- How fast can your data scientists iterate (experimentation velocity)
- How reliably do your pipelines run (uptime SLA)
- How efficiently do you use compute (cost per prediction)
Optimize for these outcomes, not for “using the latest technology.”
In Part IV, we shift from Data to Models: how to train, tune, serve, and monitor machine learning models at production scale—without losing your mind or your budget.