9.3. Processing Engines: The Heavy Lifters

“The efficiency of an AI organization is inversely proportional to the amount of time its data scientists spend waiting for a progress bar. Distributed computing is the art of deleting that progress bar, at the cost of your sanity.”

In the MLOps Lifecycle, the Processing Layer is where the “Raw Material” (Log Data) is refined into “Fuel” (Tensors). This is the bridge between the messy reality of the world and the mathematical purity of the model.

If you get Storage (Chapter 3.2) wrong, your system is slow. If you get Processing (Chapter 3.3) wrong, your system is insolvent. A poorly written join in a distributed system does not just fail; it silently consumes 5,000 vCPUs for 12 hours before failing.

This chapter is a technical deep dive into the three dominant computation engines in modern MLOps: Apache Spark (AWS EMR), Apache Beam (GCP Dataflow), and the rising star, Ray. We will explore their internal architectures, their specific implementations on AWS and Google Cloud, and the “Black Magic” required to tune them.


3.3.1. The Physics of Distributed Compute

Before discussing specific tools, we must agree on the fundamental constraints of distributed data processing. Whether you use Spark, Flink, or Beam, you are fighting the same three physics problems:

1. The Shuffle (The Network Bottleneck)

The “Shuffle” is the process of redistributing data across the cluster so that all data belonging to a specific key (e.g., User_ID) ends up on the same physical machine.

  • The Cost: Shuffle requires serializing data, transmitting it over TCP/IP, and deserializing it.
  • The Failure Mode: If a single node holds 20% of the keys (Data Skew), that node becomes a straggler. The entire cluster waits for one machine.
  • MLOps Context: Doing a JOIN between a 1PB “Click Logs” table and a 500GB “User Metadata” table is the single most expensive operation in Feature Engineering.
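
The straggler effect described above can be reproduced without a cluster. The following is a minimal sketch, assuming plain hash partitioning; `zlib.crc32` stands in for the engine's real partitioner, and the workload (1,000 ordinary users plus one hot key) is hypothetical.

```python
import zlib
from collections import Counter

def partition_sizes(keys, num_partitions):
    """Count rows per partition under simple hash partitioning."""
    sizes = Counter()
    for key in keys:
        # crc32 is a stable stand-in for the engine's partitioner hash
        sizes[zlib.crc32(key.encode("utf-8")) % num_partitions] += 1
    return [sizes.get(p, 0) for p in range(num_partitions)]

# Hypothetical workload: 1,000 users with 10 rows each, plus one hot key
keys = [f"user_{i}" for i in range(1000) for _ in range(10)]
keys += ["whale_user"] * 10_000

sizes = partition_sizes(keys, num_partitions=8)
skew_ratio = max(sizes) / (sum(sizes) / len(sizes))
print(sizes, f"largest partition is {skew_ratio:.1f}x the mean")
```

Because every row of a key must land in the same partition, the partition that receives the hot key can never be smaller than that key's row count, however many partitions are added.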

2. Serialization (The CPU Bottleneck)

In the Cloud, CPU time is money. In Python-based MLOps (PySpark/Beam Python), 40% to 60% of CPU cycles are often spent converting data formats:

  • Java Object ↔ Pickle (Python)
  • Network Byte Stream ↔ In-Memory Object
  • Row-based format ↔ Columnar format (Parquet/Arrow)

3. State Management (The Memory Bottleneck)

Stream processing is stateful. Calculating “Clicks in the last 24 hours” requires holding 24 hours of history in memory.

  • The Challenge: What happens if the cluster crashes? The state must be checkpointed to durable storage (S3/GCS/HDFS).
  • The Trade-off: Frequent checkpointing guarantees correctness but kills throughput. Infrequent checkpointing risks data loss or long recovery times.
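
The trade-off can be made concrete with a first-order model. The sketch below is not taken from any engine; it assumes a fixed checkpoint cost, failures that on average cost half an interval of replay, and uses Young's classic approximation for the interval that balances the two. The 30-second checkpoint and 24-hour mean time between failures are illustrative numbers only.

```python
import math

def checkpoint_overhead(interval_s, checkpoint_cost_s, mtbf_s):
    """Approximate fraction of time lost to checkpoint writes plus replay."""
    write_cost = checkpoint_cost_s / interval_s   # time spent writing checkpoints
    replay_cost = interval_s / (2 * mtbf_s)       # about half an interval replayed per failure
    return write_cost + replay_cost

def young_interval(checkpoint_cost_s, mtbf_s):
    """Young's approximation of the interval that minimizes the overhead above."""
    return math.sqrt(2 * checkpoint_cost_s * mtbf_s)

# Assumed numbers: a 30 s checkpoint, one failure per 24 h on average
best = young_interval(30, 24 * 3600)
for interval in (60, best, 3600):
    overhead = checkpoint_overhead(interval, 30, 24 * 3600)
    print(f"interval={interval:7.0f}s  overhead={overhead:.2%}")
```

Under these assumptions, checkpointing every minute spends about half the time writing state, while the balanced interval keeps total overhead to a few percent.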

Real-World Example: The Cost of Poor Processing Design

Scenario: A mid-size e-commerce company needs to join two datasets for ML training:

  • Orders table: 100M rows, 50GB (user_id, order_id, timestamp, amount)
  • User features table: 10M rows, 5GB (user_id, age, country, lifetime_value)

Naive Implementation (Cost: $450/day):

# BAD: This causes a full shuffle of 100M rows
orders_df = spark.read.parquet("s3://data/orders/")
users_df = spark.read.parquet("s3://data/users/")

# The JOIN triggers a massive shuffle
result = orders_df.join(users_df, on="user_id")
result.write.parquet("s3://output/joined/")

# EMR Cluster: 50 x r5.4xlarge for 3 hours
# Cost: 50 × $1.008/hr × 3hr = $151/run × 3 runs/day = $450/day

Optimized Implementation (Cost: $15/day):

# GOOD: Broadcast the small table to avoid shuffle
from pyspark.sql.functions import broadcast

orders_df = spark.read.parquet("s3://data/orders/")
users_df = spark.read.parquet("s3://data/users/")

# Force broadcast join (users table < 8GB, fits in memory)
result = orders_df.join(broadcast(users_df), on="user_id")
result.write.parquet("s3://output/joined/")

# EMR Cluster: 10 x r5.4xlarge for 0.5 hours
# Cost: 10 × $1.008/hr × 0.5hr = $5/run × 3 runs/day = $15/day
# Plus data transfer savings

Savings: $435/day = $13,050/month = $156,600/year

The difference? Understanding that the smaller table can fit in memory on each executor, eliminating network shuffle.
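
The arithmetic in the two cost comments can be checked with a small helper. This is only a sketch: the function name is made up for illustration, and it uses the on-demand price quoted in the comments without any EMR surcharge or data transfer.

```python
def daily_cluster_cost(nodes, price_per_node_hour, hours_per_run, runs_per_day):
    """Daily compute cost of a transient cluster that is torn down after each run."""
    return nodes * price_per_node_hour * hours_per_run * runs_per_day

naive = daily_cluster_cost(nodes=50, price_per_node_hour=1.008, hours_per_run=3.0, runs_per_day=3)
optimized = daily_cluster_cost(nodes=10, price_per_node_hour=1.008, hours_per_run=0.5, runs_per_day=3)
savings = naive - optimized

print(f"naive=${naive:.2f}/day optimized=${optimized:.2f}/day savings=${savings:.2f}/day")
```

Unrounded, the figures are $453.60 and $15.12 per day, which the text rounds to $450 and $15.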


3.3.2. AWS Architecture: The EMR Ecosystem

Amazon Elastic MapReduce (EMR) is the Swiss Army Knife of AWS data processing. It is not a single engine; it is a managed platform for Hadoop, Spark, Hive, Presto, and Hudi.

1. EMR Deployment Modes

AWS offers three distinct ways to run EMR. Choosing the wrong one is a common architectural error.

| Mode | Architecture | Startup Time | Cost Model | Best For |
|---|---|---|---|---|
| EMR on EC2 | Traditional Clusters. You manage the OS/Nodes. | 7-15 mins | Per Instance/Hr | Massive, long-running batch jobs (Petabytes). |
| EMR on EKS | Dockerized Spark on Kubernetes. | 1-2 mins | Per vCPU/Hr | Iterative ML experiments, CI/CD pipelines. |
| EMR Serverless | Fully abstract. No instance management. | ~1 min | Premium | Sporadic, bursty workloads. |

2. EMR on EC2: The “Instance Fleet” Strategy

For training data preparation, we typically need massive throughput for short periods. The most cost-effective pattern is using Instance Fleets with Spot Allocation Strategies.

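The Terraform Configuration: This configuration lists more than one instance type for the Spot task fleet, so that if c5.4xlarge capacity is unavailable in the chosen Availability Zone, EMR can provision c5.9xlarge instead, and it falls back to On-Demand if no Spot capacity arrives within the timeout. This keeps a capacity shortage from failing the pipeline.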

resource "aws_emr_cluster" "feature_engineering" {
  name          = "mlops-feature-eng-prod"
  release_label = "emr-7.1.0" # Always use latest for Spark/Arrow optimizations
  applications  = ["Spark", "Hadoop", "Livy"]

  # 1. Master Node (The Brain) - ALWAYS ON-DEMAND
  master_instance_fleet {
    name = "Master-Fleet"
    instance_type_configs {
      instance_type = "m5.xlarge"
    }
    target_on_demand_capacity = 1
  }

  # 2. Core Nodes (HDFS Storage) - ON-DEMAND PREFERRED
  # We need HDFS for intermediate shuffle data, even if input/output is S3.
  core_instance_fleet {
    name = "Core-Fleet"
    instance_type_configs {
      instance_type = "r5.2xlarge"
    }
    target_on_demand_capacity = 2
  }

}

# 3. Task Nodes (Pure Compute) - SPOT INSTANCES
# This is where the heavy lifting happens. Task fleets are not a block inside
# aws_emr_cluster; they are declared with the separate aws_emr_instance_fleet resource.
resource "aws_emr_instance_fleet" "task" {
  cluster_id = aws_emr_cluster.feature_engineering.id
  name       = "Task-Fleet"

  # Diversify types to increase Spot availability probability
  instance_type_configs {
    instance_type     = "c5.4xlarge"
    weighted_capacity = 1
  }
  instance_type_configs {
    instance_type     = "c5.9xlarge"
    weighted_capacity = 2
  }

  target_spot_capacity = 100 # 100 weighted capacity units, acquired cheaply on Spot
  launch_specifications {
    spot_specification {
      allocation_strategy      = "capacity-optimized"
      timeout_action           = "SWITCH_TO_ON_DEMAND" # Failover safety
      timeout_duration_minutes = 10
    }
  }
}

3. Tuning Spark for Deep Learning Data

Standard Spark is tuned for ETL (aggregating sales numbers). MLOps Spark (processing images/embeddings) requires specific tuning.

The spark-defaults.conf Bible:

# 1. Memory Management for Large Tensors
# Increase overhead memory because Python processes (PyTorch/Numpy)
# run outside the JVM heap.
spark.executor.memoryOverhead = 4g
spark.executor.memory = 16g
spark.driver.memory = 8g

# 2. Apache Arrow (The Speedup)
# Critical for PySpark. Moves data between the JVM and Python in columnar
# batches instead of pickling it row by row.
spark.sql.execution.arrow.pyspark.enabled = true
spark.sql.execution.arrow.maxRecordsPerBatch = 10000

# 3. S3 Performance (The Commit Protocol)
# "Magic Committer" writes directly to S3, bypassing the "Rename" step.
# Without this, the final step of your job will hang for hours.
spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled = true
spark.sql.sources.commitProtocolClass = org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class = org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

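# 4. Shuffle Optimization
# Large ML datasets usually need more shuffle partitions than the default (200)
# so that each partition fits comfortably in executor memory.
# Keep comments on their own lines: a trailing "# ..." becomes part of the value.
spark.sql.shuffle.partitions = 500
spark.default.parallelism = 500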

4. Advanced Performance Tuning

Problem: Data Skew

Data skew occurs when one partition has significantly more data than others. This causes one executor to become a straggler while others sit idle.

Diagnosis:

# Check partition distribution
from pyspark.sql.functions import spark_partition_id

df.groupBy(spark_partition_id()).count().show()

# If one partition has 10x more rows than others, you have skew

Solutions:

A) Salting (for Join Skew):

from pyspark.sql.functions import rand, col, concat, lit

# Problem: user_id="whale_user" has 1M orders, all other users have <100
# This single user dominates one partition

# Solution: Add random "salt" to distribute the whale
skewed_df = orders_df.withColumn("salt", (rand() * 10).cast("int"))
skewed_df = skewed_df.withColumn("join_key", concat(col("user_id"), lit("_"), col("salt")))

# Replicate the smaller table once per salt value, then drop the raw key columns
# so the joined result does not carry duplicate user_id/salt columns
users_salted = users_df.crossJoin(
    spark.range(10).select(col("id").alias("salt"))
).withColumn(
    "join_key", concat(col("user_id"), lit("_"), col("salt"))
).drop("user_id", "salt")

# The whale's rows are now spread over 10 join keys instead of 1
result = skewed_df.join(users_salted, on="join_key")
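
To see why salting helps, it is enough to look at the size of the largest key group, since one task must process each group. The sketch below is plain Python rather than Spark; it assigns salts round-robin instead of with rand() so the result is deterministic, and the row counts are made up for illustration.

```python
from collections import Counter

NUM_SALTS = 10

def salted_keys(keys, num_salts=NUM_SALTS):
    """Append a salt suffix to each key, cycling through the salts per key."""
    seen = Counter()
    out = []
    for key in keys:
        salt = seen[key] % num_salts   # round-robin stand-in for rand()
        seen[key] += 1
        out.append(f"{key}_{salt}")
    return out

# Hypothetical orders: one whale with 100,000 rows, 100 users with 50 rows each
orders = ["whale_user"] * 100_000 + [f"user_{i}" for i in range(100)] * 50

largest_before = max(Counter(orders).values())
largest_after = max(Counter(salted_keys(orders)).values())
print(f"largest key group: {largest_before} rows before, {largest_after} after salting")
```

The price is that the small side of the join grows by a factor of `NUM_SALTS`, so the salt count should stay as low as the skew allows.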

B) Adaptive Query Execution (Spark 3.0+):

# Enable AQE (Adaptive Query Execution)
# Spark dynamically adjusts execution plan based on runtime statistics
spark.sql.adaptive.enabled = true
spark.sql.adaptive.coalescePartitions.enabled = true
spark.sql.adaptive.skewJoin.enabled = true
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB

With AQE enabled, Spark automatically detects skewed partitions and splits them during execution—no manual intervention required.

5. Memory Pressure Debugging

Symptom: Jobs fail with “OutOfMemoryError” or “Container killed by YARN for exceeding memory limits”

Diagnosis Tools:

# Enable detailed GC logging (Java 8 flags)
# On Java 9 and later these were replaced by unified logging: use -Xlog:gc* instead
spark.executor.extraJavaOptions = -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

# Access Spark UI
# http://<driver-node>:4040/executors/
# Look for:
# - High "GC Time" (> 10% of task time = memory pressure)
# - Frequent "Spill to Disk" (indicates not enough memory for shuffle)

Solutions:

# Option 1: Increase executor memory
spark.executor.memory = 32g
spark.executor.memoryOverhead = 8g

# Option 2: Reduce parallelism (fewer concurrent tasks = more memory per task)
# For example, down from 4 cores per executor:
spark.executor.cores = 2

# Option 3: Enable off-heap memory for shuffle
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 10g
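
When changing these values it helps to know how large the resulting container request is. The helper below is a rough sketch, assuming Spark 3.x on YARN, where the request is the sum of heap, overhead, and off-heap memory, and where the default overhead is the larger of 384 MB and 10% of the heap. The 120 GB node size is a hypothetical figure.

```python
def executor_container_mb(heap_mb, overhead_mb=None, offheap_mb=0):
    """Approximate YARN container size requested for one executor."""
    if overhead_mb is None:
        # Default overhead: 10% of the heap, but at least 384 MB
        overhead_mb = max(384, int(heap_mb * 0.10))
    return heap_mb + overhead_mb + offheap_mb

def executors_per_node(node_yarn_mb, container_mb):
    """How many such containers fit into the memory YARN manages on one node."""
    return node_yarn_mb // container_mb

# Options 1 and 3 combined: 32g heap + 8g overhead + 10g off-heap
container = executor_container_mb(32 * 1024, 8 * 1024, 10 * 1024)
print(container, "MB per executor;",
      executors_per_node(120 * 1024, container), "executors on a 120 GB node")
```

Raising memory per executor therefore reduces how many executors fit on a node, which is why Option 2 is sometimes the cheaper fix.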

6. Monitoring EMR Jobs

CloudWatch Metrics (Automatic):

  • IsIdle - Is the cluster running tasks?
  • MRActiveNodes - Number of active nodes
  • HDFSUtilization - Are we running out of HDFS space?

Custom Metrics (Push to CloudWatch):

import boto3

cloudwatch = boto3.client('cloudwatch')

def log_job_metrics(job_name, rows_processed, duration_sec):
    cloudwatch.put_metric_data(
        Namespace='MLOps/DataProcessing',
        MetricData=[
            {
                'MetricName': 'RowsProcessed',
                'Value': rows_processed,
                'Unit': 'Count',
                'Dimensions': [{'Name': 'Job', 'Value': job_name}]
            },
            {
                'MetricName': 'JobDuration',
                'Value': duration_sec,
                'Unit': 'Seconds',
                'Dimensions': [{'Name': 'Job', 'Value': job_name}]
            }
        ]
    )

# Use in Spark job (process_data() stands for your own pipeline function)
import time

start_time = time.time()
result_df = process_data()
rows = result_df.count()
duration = time.time() - start_time
log_job_metrics("feature_engineering", rows, duration)

Alerting Strategy:

  • Critical: Job fails (EMR Step State = FAILED)
  • Warning: Job duration > 2x baseline, Memory utilization > 90%
  • Info: Job completes successfully

3.3.3. GCP Architecture: The Dataflow Difference

Google Cloud Dataflow is a managed service for Apache Beam. Unlike Spark, which exposes the cluster (Drivers/Executors), Dataflow exposes a Job Service.

1. The Beam Programming Model

Understanding Beam is a prerequisite for GCP Dataflow. It unifies Batch and Stream into a single semantic model.

  • PCollection: A distributed dataset (bounded or unbounded).
  • PTransform: An operation (Map, Filter, Group).
  • Pipeline: The DAG (Directed Acyclic Graph) of transforms.
  • Window: How you slice time (Fixed, Sliding, Session).
  • Trigger: When you emit results (Early, On-time, Late).

2. The Streaming Engine & Shuffle Service

Dataflow separates compute from state.

  • Scenario: You are calculating a 24-hour sliding window of user activity.
  • In Spark: The state (24 hours of data) lives with the executors, in memory or on the worker node’s local disk (for example, in the RocksDB state store). If the node fails, the state must be recovered from a checkpoint, causing a latency spike.
  • In Dataflow: The state is offloaded to the Streaming Engine (a remote, managed tiered storage service).
    • Result: Compute is stateless. You can scale down from 100 workers to 1 worker instantly without losing data. The new worker simply queries the Streaming Engine for the state it needs.

3. Handling “Late Data” in MLOps

In ML Feature Engineering, “Event Time” matters more than “Processing Time”. If a user clicks an ad at 12:00, but the log arrives at 12:15 due to network lag, it must be counted in the 12:00 bucket.
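
The bucketing rule can be sketched in a few lines of plain Python. This is a simplified model, not Beam's implementation: timestamps are seconds since midnight, windows are fixed-size, and an element is treated as droppable once the watermark has passed the end of its window plus the allowed lateness.

```python
def window_start(event_ts, size_s):
    """Start of the fixed window that contains event_ts."""
    return event_ts - (event_ts % size_s)

def is_accepted(event_ts, watermark_ts, size_s, allowed_lateness_s):
    """True while the watermark has not passed window end + allowed lateness."""
    window_end = window_start(event_ts, size_s) + size_s
    return watermark_ts < window_end + allowed_lateness_s

NOON = 12 * 3600
click_ts = NOON + 30             # user clicks at 12:00:30 (event time)
watermark = NOON + 15 * 60 + 30  # the log shows up around 12:15:30

print(window_start(click_ts, 60) == NOON)         # counted in the 12:00 window
print(is_accepted(click_ts, watermark, 60, 3600)) # kept with 1 hour of lateness
print(is_accepted(click_ts, watermark, 60, 600))  # dropped with only 10 minutes
```

The window is chosen from the event timestamp alone; arrival time only decides whether the element is still accepted.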

Beam Python Code for Robust Feature Engineering:

import json

import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime, AfterWatermark

# pipeline_options, input_topic, SumFn, and WriteToRedis are assumed to be defined elsewhere
def run_pipeline():
    with beam.Pipeline(options=pipeline_options) as p:
        (p 
         | 'ReadPubSub' >> beam.io.ReadFromPubSub(topic=input_topic)
         | 'ParseJson' >> beam.Map(json.loads)
         # event_timestamp is assumed to be Unix epoch seconds
         | 'AddTimestamp' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['event_timestamp']))
         
         # THE MAGIC: Windowing with Late Data Handling
         | 'Window' >> beam.WindowInto(
             beam.window.FixedWindows(60), # 1-minute windows
             
             # Triggering Strategy:
             # 1. Emit purely speculative results every 10 seconds (for real-time dashboards)
             # 2. Emit the "Final" result when the Watermark passes (completeness)
             # 3. Update the result if late data arrives (correctness)
             trigger=AfterWatermark(
                 early=AfterProcessingTime(10),
                 late=AfterProcessingTime(10)
             ),
             
             # How strictly do we drop old data?
             allowed_lateness=3600, # Allow data up to 1 hour late
             accumulation_mode=AccumulationMode.ACCUMULATING # Add late data to previous sum
           )
           
         | 'CalculateFeature' >> beam.CombineGlobally(SumFn()).without_defaults()
         | 'WriteToFeatureStore' >> beam.ParDo(WriteToRedis())
        )

This level of granular control over time is why sophisticated ML teams (Spotify, Twitter/X, Lyft) prefer Beam/Dataflow for real-time features, despite the steeper learning curve compared to Spark.


3.3.4. The Emerging “GenAI” Stack: Ray

Spark and Beam were built for CPU-bound tasks (counting words, summing clicks). Large Language Models (LLMs) and Generative AI are GPU-bound.

Running an embedding model (e.g., BERT or CLIP) inside a Spark Executor is painful:

  1. Scheduling: Spark doesn’t understand “0.5 GPU”. It assumes 1 Task = 1 Core.
  2. Environment: Managing CUDA drivers inside YARN containers is “Linux dependency hell”.

Enter Ray

Ray is a distributed execution framework built for AI. It allows you to write Python code that scales from your laptop to a cluster of 1,000 GPUs.

Ray Architecture:

  • Head Node: Runs the Global Control Service (GCS, formerly the Global Control Store; unrelated to Google Cloud Storage).
  • Worker Nodes: Run the Raylet (Scheduler + Object Store).
  • Object Store (Plasma): A shared-memory store. Zero-copy reads between processes on the same node.

Ray Data (The Replacement for Spark?)

Ray Data (formerly Ray Datasets) is designed for “The Last Mile” of Deep Learning data loading.

Comparison: Image Processing Pipeline

Option A: The Old Way (PySpark)

# Spark Code
def process_image(row):
    # Setup TensorFlow/PyTorch here? Expensive overhead per row!
    model = load_model() 
    return model(row.image)

# This fails because you can't pickle a Model object to broadcast it 
# to workers easily, and loading it per-row is too slow.
df.rdd.map(process_image) 

Option B: The Modern Way (Ray)

import ray
from ray.data import ActorPoolStrategy

class GPUInferencer:
    def __init__(self):
        # Initialized ONCE per worker process
        self.model = LoadModel().cuda()
        
    def __call__(self, batch):
        # Process a whole batch on the GPU; Ray Data expects a dict of columns back
        return {"prediction": self.model.predict(batch["image"])}

ds = ray.data.read_parquet("s3://bucket/images")

# Ray intelligently manages the actors
transformed_ds = ds.map_batches(
    GPUInferencer,
    compute=ActorPoolStrategy(min_size=2, max_size=10), # Autoscaling
    num_gpus=1,     # Ray ensures each actor gets a dedicated GPU
    batch_size=64   # Optimal GPU batch size
)
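
The core difference between the two options is how often the model is constructed. The sketch below isolates that point in plain Python, without Ray or a GPU; `CountingModel` is a made-up stand-in that simply counts its own constructions.

```python
from itertools import islice

class CountingModel:
    """Stand-in for an expensive model; counts how often it is loaded."""
    loads = 0

    def __init__(self):
        CountingModel.loads += 1

    def predict(self, batch):
        return [x * 2 for x in batch]

def batches(items, size):
    """Yield consecutive lists of at most `size` items."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk

def per_row(items):
    # Option A pattern: a fresh model for every row
    return [CountingModel().predict([x])[0] for x in items]

def per_worker(items, batch_size):
    # Option B pattern: one model per worker, reused for every batch
    model = CountingModel()
    out = []
    for batch in batches(items, batch_size):
        out.extend(model.predict(batch))
    return out

rows = list(range(256))
CountingModel.loads = 0
slow = per_row(rows)
loads_per_row = CountingModel.loads
CountingModel.loads = 0
fast = per_worker(rows, batch_size=64)
loads_per_worker = CountingModel.loads
print(loads_per_row, "loads per-row vs", loads_per_worker, "load per-worker")
```

Both paths produce the same predictions; only the number of model loads differs, which is the cost an actor pool amortizes.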

Deploying Ray on Kubernetes (KubeRay)

The standard way to run Ray in production is via the KubeRay Operator.

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: genai-processing-cluster
spec:
  headGroupSpec:
    rayStartParams:
      dashboard-host: '0.0.0.0'
    template:
      spec:
        containers:
        - name: ray-head
          image: rayproject/ray-ml:2.9.0-gpu
          resources:
            requests:
              cpu: 2
              memory: 8Gi
  workerGroupSpecs:
  - replicas: 2
    minReplicas: 0
    maxReplicas: 10 # Autoscaling
    groupName: gpu-group
    rayStartParams: {}
    template:
      spec:
        containers:
        - name: ray-worker
          image: rayproject/ray-ml:2.9.0-gpu
          resources:
            limits:
              nvidia.com/gpu: 1 # Request 1 GPU per pod
            requests:
              cpu: 4
              memory: 16Gi

7. Dataflow Performance Optimization

Problem: Worker Thrashing

Symptom: Dataflow continuously scales up to maxWorkers, then back down, then up again.

Cause: Autoscaling based on CPU is too reactive for bursty ML workloads.

Solution: Use custom autoscaling parameters:

# Launch Dataflow job with tuned autoscaling
# THROUGHPUT_BASED scales on throughput/backlog rather than CPU alone.
# (A comment cannot follow a trailing backslash, so it is kept up here.)
python pipeline.py \
    --runner=DataflowRunner \
    --project=my-project \
    --region=us-central1 \
    --max_num_workers=100 \
    --autoscaling_algorithm=THROUGHPUT_BASED \
    --worker_machine_type=n1-standard-8 \
    --disk_size_gb=100

Problem: Slow Windowing Operations

Symptom: Windows accumulate too much state before triggering.

Solution: Use triggering strategies to emit partial results:

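import apache_beam as beam
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterProcessingTime, AfterWatermark)

windowed = input_data | beam.WindowInto(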
    beam.window.FixedWindows(60),
    trigger=AfterWatermark(
        early=AfterProcessingTime(10),  # Emit every 10 seconds
        late=AfterCount(100)             # Or after 100 late records
    ),
    accumulation_mode=AccumulationMode.ACCUMULATING
)

8. Dataflow Flex Templates

For production ML pipelines, use Flex Templates to version and deploy pipelines without recompiling.

Dockerfile:

FROM gcr.io/dataflow-templates-base/python38-template-launcher-base

# Copy into /template so the path in FLEX_TEMPLATE_PYTHON_PY_FILE resolves
WORKDIR /template
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY pipeline.py .
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/pipeline.py"

Deploy:

# Build and push container
gcloud builds submit --tag gcr.io/my-project/feature-pipeline:v1

# Create template
gcloud dataflow flex-template build gs://my-bucket/templates/feature-pipeline.json \
    --image gcr.io/my-project/feature-pipeline:v1 \
    --sdk-language PYTHON

# Run template (can be triggered by Cloud Scheduler)
gcloud dataflow flex-template run feature-job-$(date +%s) \
    --template-file-gcs-location gs://my-bucket/templates/feature-pipeline.json \
    --parameters input_topic=projects/my-project/topics/events \
    --parameters output_table=my_dataset.features \
    --region us-central1

3.3.5. Cost Optimization & FinOps Strategies

Compute costs are the silent killer. Here are the strategies to reduce the bill by 50-80%.

1. Spot Instance Handling (The “Graceful Death”)

Both AWS and GCP offer Spot/Preemptible instances at ~70% discount. However, they can be reclaimed at short notice: 2 minutes on AWS, 30 seconds on GCP.

  • Spark: Spark is resilient to task failure. If a node dies, the stage retries.
    • Risk: If the Driver (Master) node is on Spot and dies, the whole job dies.
    • Rule: Driver on On-Demand; Executors on Spot.
  • Ray: Ray supports object reconstruction. If a node holding an object in plasma memory dies, Ray looks at the lineage graph and re-computes just that object.

2. Autoscale Dampening

Autoscalers (especially in Dataflow and EMR Managed Scaling) can be “twitchy”—scaling up for a momentary burst and then paying for the billing minimum (usually 10 mins or 1 hour).

  • Strategy: Set scale-down-behavior to be aggressive, but scale-up-behavior to be conservative.
  • Dataflow: Use --max_num_workers (--maxNumWorkers in the Java SDK) to set a hard cap. Without this, a bad regular expression could cause Dataflow to spin up 1,000 nodes to process a single corrupt file.

3. The “Format” Optimization

The cost of reading data is determined by the format.

  • JSON/CSV: Expensive. Requires parsing every byte.
  • Parquet: Cheap. Columnar.
    • Predicate Pushdown: If you run SELECT * FROM data WHERE year=2024, the engine can skip every row group whose min/max statistics (stored in the file footer) exclude 2024, which may avoid reading 90% of the file.
  • Compression: Always use Snappy (speed focus) or ZSTD (compression focus). Avoid GZIP for large text files (CSV/JSON): a gzip stream is not splittable, so a single task must read the whole file.
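
How footer statistics let a reader skip data can be sketched without a Parquet library. The model below is deliberately simplified: each row group carries only a min and max for one column, and the ten yearly groups are invented to mirror the 90% figure above.

```python
from dataclasses import dataclass

@dataclass
class RowGroup:
    path: str
    year_min: int
    year_max: int
    num_bytes: int

def prune(row_groups, year):
    """Keep only row groups whose [min, max] range could contain `year`."""
    return [rg for rg in row_groups if rg.year_min <= year <= rg.year_max]

# Hypothetical file: ten equally sized row groups, one per year 2015..2024
groups = [RowGroup(f"part-{y}.parquet", y, y, 100) for y in range(2015, 2025)]

kept = prune(groups, 2024)
skipped_fraction = 1 - sum(rg.num_bytes for rg in kept) / sum(rg.num_bytes for rg in groups)
print(f"read {len(kept)} of {len(groups)} row groups, skipped {skipped_fraction:.0%} of bytes")
```

Pruning only works this well when the data is sorted or clustered on the filtered column; if every row group spans all years, nothing can be skipped.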

3.3.6. Synthetic Data Generation

Sometimes, you don’t have enough data. Or the data is PII (Personally Identifiable Information) restricted. A growing trend in MLOps is using the compute layer to generate data.

Use Cases

  1. Cold Start: Simulating user behavior for a new recommendation engine.
  2. Robustness Testing: Generating adversarial examples to test model stability.

The Tooling

  • Numpy/Scikit-learn: Good for simple statistical distributions.
  • Unity Perception / Unreal Engine: For Computer Vision. You can run headless Unity instances in Docker containers on EKS/GKE to render millions of synthetic images (e.g., “Person walking across street in rain”) with perfect pixel-level labels.
  • LLM Synthesis: Using GPT-4 or Llama-3 to generate synthetic customer support chat logs for training a smaller, private model (Distillation).

4. Committed Use Discounts (CUD) and Savings Plans

For predictable workloads, pre-commit to usage for deep discounts:

AWS:

  • Savings Plans: Commit to $X/hour of compute for 1-3 years
    • Flexibility: Compute Savings Plans work across EC2, Fargate, Lambda
    • Discount: Up to 66% off on-demand pricing (up to 72% with the less flexible EC2 Instance Savings Plans)
    • Recommendation: Start with 50% of baseline usage on 1-year plan

GCP:

  • Committed Use Discounts: Commit to vCPU and memory for 1-3 years
    • Discount: Up to 57% off on-demand pricing
    • Applies to: Compute Engine, Dataflow, GKE

Calculation Example:

# Baseline: Running 20 x n1-standard-8 VMs continuously
# Monthly cost: 20 × 8 vCPU × $0.0475/hr × 730 hrs = $5,548/month
# On-demand annual: $66,576

# With a 3-year CUD at the maximum 57% discount (1-year commitments discount less):
# Annual cost: $66,576 × 0.43 = $28,628
# Savings: $37,948/year
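
The same calculation as a reusable helper, so other fleet sizes or discount levels can be tried. This is a sketch with made-up function names; the discount is a plain parameter, set here to the 57% maximum quoted above.

```python
def annual_on_demand(vms, vcpus_per_vm, price_per_vcpu_hour, hours_per_month=730):
    """Annual on-demand cost of a fleet that runs continuously."""
    return vms * vcpus_per_vm * price_per_vcpu_hour * hours_per_month * 12

def with_commitment(on_demand_annual, discount):
    """Annual cost after applying a committed-use discount (0.57 means 57% off)."""
    return on_demand_annual * (1 - discount)

on_demand = annual_on_demand(vms=20, vcpus_per_vm=8, price_per_vcpu_hour=0.0475)
committed = with_commitment(on_demand, discount=0.57)
print(f"on-demand ${on_demand:,.0f}, committed ${committed:,.0f}, "
      f"savings ${on_demand - committed:,.0f}")
```

A commitment is billed whether or not the capacity is used, so the discount only pays off for the baseline that genuinely runs around the clock.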

3.3.6. Anti-Patterns and Common Mistakes

Anti-Pattern 1: “Using DataFrame.collect() on Large Datasets”

Symptom:

# BAD: Pulling 100GB into driver memory
df = spark.read.parquet("s3://data/large-dataset/")
all_data = df.collect()  # Driver OOM!

Why It Fails: collect() pulls all data to the driver node. If dataset > driver memory, crash.

Solution:

# GOOD: Process in distributed fashion
df.write.parquet("s3://output/processed/")

# Or sample if you need local inspection
sample = df.sample(fraction=0.001).collect()  # 0.1% sample

Anti-Pattern 2: “Reading from Database with Single Connection”

Symptom:

# BAD: Single connection, serial reads
df = spark.read.jdbc(
    url="jdbc:postgresql://db.example.com/prod",
    table="users",
    properties={"user": "read_only", "password": "xxx"}
)
# Spark creates ONE connection, reads ALL 100M rows serially

Why It Fails: No parallelism. All executors wait for single JDBC connection.

Solution:

# GOOD: Partition reads across executors
df = spark.read.jdbc(
    url="jdbc:postgresql://db.example.com/prod",
    table="users",
    column="user_id",           # Partition column (must be numeric)
    lowerBound=0,
    upperBound=100000000,       # Max user_id
    numPartitions=100,          # 100 parallel reads
    properties={"user": "read_only", "password": "xxx"}
)
# Spark creates 100 connections, each reads 1M rows in parallel
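
It can help to see what the bounds turn into. The sketch below approximates how a partitioned JDBC read is split into one WHERE clause per partition; it is an illustration of the idea, not Spark's exact code, and the function name is hypothetical.

```python
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    """One WHERE clause per partition, evenly striding from lower to upper."""
    if num_partitions < 2:
        raise ValueError("need at least 2 partitions to split a read")
    stride = (upper - lower) // num_partitions
    predicates = []
    for i in range(num_partitions):
        lo = lower + i * stride
        hi = lo + stride
        if i == 0:
            # First partition also collects NULLs and values below the lower bound
            predicates.append(f"{column} < {hi} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # Last partition is open-ended: values above the upper bound land here
            predicates.append(f"{column} >= {lo}")
        else:
            predicates.append(f"{column} >= {lo} AND {column} < {hi}")
    return predicates

preds = jdbc_partition_predicates("user_id", 0, 100_000_000, 100)
print(preds[0], preds[1], preds[-1], sep="\n")
```

The bounds only set the stride; they do not filter rows. If the real IDs are clustered in a narrow range, most partitions come back empty and one does all the work, so the bounds should match the actual min and max of the column.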

Anti-Pattern 3: “Not Caching Intermediate Results”

Symptom:

# BAD: Recomputing expensive transformation multiple times
raw_df = spark.read.parquet("s3://data/raw/")
cleaned_df = raw_df.filter(...).withColumn(...)  # Expensive operation

# Each action triggers full recomputation
cleaned_df.count()          # Reads + transforms raw_df
cleaned_df.write.parquet()  # Reads + transforms raw_df AGAIN

Solution:

# GOOD: Cache intermediate result
raw_df = spark.read.parquet("s3://data/raw/")
cleaned_df = raw_df.filter(...).withColumn(...).cache()  # Mark for caching

# First action materializes cache
cleaned_df.count()          # Reads + transforms + caches

# Subsequent actions read from cache
cleaned_df.write.parquet()  # Reads from cache (fast!)

# Clean up when done
cleaned_df.unpersist()

Anti-Pattern 4: “Ignoring Partitioning for Time-Series Data”

Symptom:

# BAD: Writing without partitioning
df.write.parquet("s3://data/events/")
# Creates one massive directory with 10,000 files

# Later queries are slow:
# "SELECT * FROM events WHERE date='2024-01-01'"
# Has to scan ALL 10,000 files to find the relevant ones

Solution:

# GOOD: Partition by common query patterns
df.write.partitionBy("year", "month", "day").parquet("s3://data/events/")
# Creates directory structure:
# s3://data/events/year=2024/month=01/day=01/*.parquet
# s3://data/events/year=2024/month=01/day=02/*.parquet

# Later queries are fast (partition pruning):
# "SELECT * FROM events WHERE year=2024 AND month=01 AND day=01"
# Only scans 1 day's worth of files

3.3.7. Case Study: Airbnb’s Data Processing Evolution

The Problem (2014)

Airbnb’s data scientists were spending 60% of their time waiting for Hive queries to complete. A simple feature engineering job (joining listings, bookings, and user data) took 12 hours.

Initial Solution: Migrating to Spark (2015)

Results:

  • 12-hour jobs reduced to 2 hours (6x speedup)
  • Cost increased 40% due to larger cluster requirements
  • New problem: Spark job failures due to memory pressure

Optimization Phase (2016-2017)

Key Changes:

  1. Broadcast Joins: Identified that “listings” table (5GB) was being shuffled repeatedly. Converted to broadcast join.
    • Result: 2-hour jobs reduced to 30 minutes
  2. Partition Tuning: Reduced shuffle partitions from default 200 to 50 for smaller intermediate datasets.
    • Result: Eliminated 1000s of small file writes
  3. Data Format: Migrated from JSON to Parquet.
    • Result: Storage reduced by 70%, read performance improved 5x

Current State (2023): Hybrid Architecture

  • Batch: EMR with Spark for daily feature engineering (100+ TB processed daily)
  • Streaming: Flink for real-time pricing updates (sub-second latency)
  • ML Inference: Ray for batch prediction (embedding generation for search)

Key Metrics:

  • Data processing cost: $200k/month (down from $800k/month in 2015)
  • Data scientist productivity: 5x improvement (measured by experiment velocity)
  • Feature freshness: From daily to hourly for critical features

3.3.8. Troubleshooting Guide

Problem: Job Stuck in “RUNNING” with No Progress

Diagnosis:

# Check Spark UI
# Look at "Stages" tab
# If one stage is at 99% complete for >30 mins, there's a straggler

# Identify the straggler task
# In Spark UI > Stages > Task Metrics
# Sort by "Duration" - the slowest task is the culprit

Common Causes:

  1. Data Skew: One partition has 10x more data
  2. Resource Starvation: Other jobs are consuming cluster resources
  3. Network Issues: Slow network to S3/GCS

Solutions:

  1. Enable Adaptive Query Execution (handles skew automatically)
  2. Kill competing jobs or increase cluster size
  3. Check S3/GCS request metrics for throttling
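
The manual "sort by Duration" check can be automated once task durations are exported, for example from the Spark UI. The sketch below assumes a plain dict of task ID to seconds; the 3x-median threshold and the sample durations are arbitrary illustrative choices.

```python
from statistics import median

def find_stragglers(task_durations_s, factor=3.0):
    """Return (task_id, duration) pairs slower than factor x the median, slowest first."""
    typical = median(task_durations_s.values())
    slow = [(tid, d) for tid, d in task_durations_s.items() if d > factor * typical]
    return sorted(slow, key=lambda pair: pair[1], reverse=True)

# Hypothetical stage: 199 tasks take 40-44 s, one takes more than half an hour
durations = {f"task_{i}": 40 + (i % 5) for i in range(199)}
durations["task_199"] = 1900

stragglers = find_stragglers(durations)
print(stragglers)
```

The median is used instead of the mean because a single extreme task would otherwise inflate the baseline it is compared against.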

Problem: “Py4JNetworkError” in PySpark

Symptom:

py4j.protocol.Py4JNetworkError: Error while sending or receiving

Cause: Python worker process crashed or timed out communicating with JVM.

Common Triggers:

  • Python function raising exception
  • Python dependency not installed on all workers
  • Memory leak in Python UDF

Solution:

# Add error handling in UDFs
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def safe_process(value):
    try:
        return complex_processing(value)
    except Exception as e:
        # Log error but don't crash
        return f"ERROR: {str(e)}"

Problem: Dataflow Job Stuck in “Draining”

Symptom: Dataflow streaming job won’t cancel, stuck in “Draining” state for hours.

Cause: Pipeline is waiting for in-flight elements to complete. Usually due to a PTransform that never completes.

Solution:

# Force cancel (data loss possible)
gcloud dataflow jobs cancel JOB_ID --force --region us-central1

# Better: Fix the pipeline
# Check for:
# 1. Stateful transforms that accumulate unbounded state
# 2. External API calls that time out
# 3. Missing watermark advancement

3.3.9. Best Practices Summary

  1. Start with Parquet: Always use columnar formats (Parquet, ORC) for intermediate data. JSON/CSV are for ingestion only.

  2. Partition Strategically: Partition by common query patterns (date/time, region, category). Avoid over-partitioning (partitions holding less than about 1GB each).

  3. Monitor Resource Utilization: Track CPU, memory, disk, network. Identify bottlenecks before they become outages.

  4. Test at Scale: Don’t just test on 1GB samples. Test on 10% of production data to catch performance issues.

  5. Separate Concerns: Use different clusters for experimentation vs. production. Don’t let ad-hoc queries slow down critical pipelines.

  6. Version Your Code: Use git tags for production pipelines. Know exactly what code ran when things break.

  7. Document Tuning Decisions: When you change spark.sql.shuffle.partitions, write a comment explaining why. Future you will thank you.

  8. Fail Fast: Add data quality checks early in the pipeline. Better to fail fast than process garbage for 3 hours.


3.3.10. Exercises for the Reader

Exercise 1: Join Optimization Take an existing Spark join in your codebase. Measure its execution time. Apply broadcast join optimization. Measure again. Calculate cost savings.

Exercise 2: Partitioning Analysis Query your Parquet data lake. How many partitions does a typical query scan? If > 1000, consider repartitioning by query patterns.

Exercise 3: Cost Attribution For one week, tag all EMR/Dataflow jobs with cost center tags. Generate a report of cost by team/project. Identify the top 3 most expensive jobs.

Exercise 4: Failure Simulation In a test environment, kill a worker node during a Spark job. Observe recovery behavior. How long until recovery? Is data lost?

Exercise 5: Data Skew Detection Write a Spark job that detects data skew automatically. For each key, compute: mean records per key, max records per key. Alert if max > 10x mean.


3.3.11. Decision Matrix: Selecting the Engine

| Feature | AWS EMR (Spark) | AWS Glue | GCP Dataflow | Ray (KubeRay) |
|---|---|---|---|---|
| Primary Use Case | Massive Historical Batch Processing (Petabytes) | Simple, irregular ETL tasks | Complex Streaming & Unified Batch/Stream | GenAI, LLM Fine-tuning, Reinforcement Learning |
| Language | Python (PySpark), Scala, SQL | Python, Scala | Python, Java | Python Native |
| Latency | Minutes (Batch) | Minutes (Cold Start) | Seconds to Sub-second | Milliseconds |
| Ops Complexity | High (Cluster Tuning) | Low (Serverless) | Medium (Managed Service) | High (Kubernetes Mgmt) |
| Cost | Low (if using Spot) | High (Premium pricing) | Medium | Medium (GPU costs) |
| Best For MLOps? | Data Prep (Pre-training) | Ad-hoc Scripts | Real-time Features | Deep Learning Jobs |

The Architect’s Verdict

  1. Standardize on Parquet: Regardless of the engine, store your intermediate data in Parquet on S3/GCS. This allows you to switch engines later (e.g., write with Spark, read with Ray) without migration.
  2. Separate Compute: Do not use your Training Cluster (Ray/Slurm) for Data Processing (Spark). GPUs are too expensive to be used for parsing JSON logs.
  3. Code for the Future: If you are building a new platform today, lean heavily towards Ray for Python-centric ML workflows, and Dataflow/Flink for streaming features. The dominance of Spark is waning in the GenAI era.

1. Serverless Spark

Both AWS (EMR Serverless) and GCP (Dataproc Serverless) are pushing toward “Spark without clusters.”

Benefits:

  • No cluster management
  • Auto-scaling from 0 to 1000s of workers
  • Pay only for execution time (second-level billing)

Trade-offs:

  • Higher per-vCPU cost (~2x on-demand)
  • Less control over instance types and networking
  • Cold start latency (30-60 seconds)

Recommendation: Use serverless for sporadic, unpredictable workloads. Use managed clusters for continuous production pipelines.
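The recommendation follows from simple arithmetic, sketched below under stated assumptions: the managed cluster is billed 24 hours a day whether or not jobs run, serverless is billed only while jobs run, and serverless costs about 2x per unit of compute (the premium quoted above). The function name and the $10/hour rate are hypothetical; cold starts, cluster autoscaling, and spot discounts are ignored.

```python
def cheaper_option(busy_hours_per_day, cluster_rate_per_hour, serverless_premium=2.0):
    """Compare the daily cost of an always-on cluster vs. serverless.

    Assumes the cluster is billed 24 hours a day and serverless is billed only
    while jobs run, at `serverless_premium` times the cluster's hourly rate.
    """
    cluster_cost = 24 * cluster_rate_per_hour
    serverless_cost = busy_hours_per_day * cluster_rate_per_hour * serverless_premium
    choice = "serverless" if serverless_cost < cluster_cost else "cluster"
    return choice, cluster_cost, serverless_cost


# Hypothetical rate of $10/hour for the cluster-equivalent capacity.
print(cheaper_option(busy_hours_per_day=3, cluster_rate_per_hour=10.0))
print(cheaper_option(busy_hours_per_day=20, cluster_rate_per_hour=10.0))
```

Under these assumptions the break-even point is 24 / 2 = 12 busy hours per day: sporadic workloads fall well below it, continuous pipelines well above.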

2. SQL-First Data Processing

The rise of tools like dbt (data build tool) is pushing toward “SQL as the processing layer.”

Instead of writing PySpark:

from pyspark.sql.functions import col

df = spark.read.parquet("events")
df.filter(col("date") > "2024-01-01").groupBy("user_id").agg(...)

You write SQL:

-- models/user_features.sql
SELECT
    user_id,
    COUNT(*) as event_count,
    MAX(timestamp) as last_event
FROM {{ ref('events') }}
WHERE date > '2024-01-01'
GROUP BY user_id

dbt compiles this to optimized Spark/Trino/BigQuery and handles dependencies, testing, and documentation.
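The dependency handling can be illustrated with a toy sketch. This is not dbt's implementation, only the underlying idea: each `ref()` call declares an edge in a dependency graph, and models are built in topological order. The `build_order` helper and the `churn_labels` model are hypothetical.

```python
import re
from graphlib import TopologicalSorter

# Matches {{ ref('model_name') }} with single or double quotes.
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")


def build_order(models):
    """Return model names in an order where every model follows its refs.

    models: dict mapping model name -> SQL text containing {{ ref('...') }} calls.
    """
    # Map each model to the set of models it depends on.
    graph = {name: set(REF_PATTERN.findall(sql)) for name, sql in models.items()}
    return list(TopologicalSorter(graph).static_order())


models = {
    "events": "SELECT * FROM raw.events",
    "user_features": "SELECT user_id, COUNT(*) FROM {{ ref('events') }} GROUP BY user_id",
    "churn_labels": "SELECT * FROM {{ ref('user_features') }} JOIN {{ ref('events') }} USING (user_id)",
}
print(build_order(models))  # ['events', 'user_features', 'churn_labels']
```

Because the order is derived from the SQL itself, adding a new model never requires editing a hand-maintained schedule.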

Trend: Data Scientists prefer SQL over Scala/Java. Tools that hide the complexity of distributed computing behind SQL will win.

3. In-Database ML

Running ML training directly in the data warehouse (BigQuery ML, Snowflake ML, Redshift ML) eliminates data movement.

Example:

-- Train a linear regression model in BigQuery
CREATE MODEL my_dataset.sales_model
OPTIONS(model_type='linear_reg', input_label_cols=['sales']) AS
SELECT
    price,
    marketing_spend,
    seasonality,
    sales
FROM my_dataset.historical_sales;

-- Predict
SELECT predicted_sales
FROM ML.PREDICT(MODEL my_dataset.sales_model, TABLE my_dataset.new_data);

Limitation: Built-in support is strongest for classical algorithms (linear regression, XGBoost, AutoML). For custom deep learning, you still need to export the data.

Trend: For simple ML on tabular data (churn prediction, fraud detection), in-database ML can cut engineering overhead substantially (on the order of 90% in simple cases), because the data never leaves the warehouse.

4. GPU-Native Processing

The next frontier: Processing engines that natively support GPU acceleration.

RAPIDS (NVIDIA):

  • cuDF: Pandas-like API on GPUs
  • cuML: Scikit-learn-like API on GPUs
  • Spark RAPIDS: GPU-accelerated Spark operations

Performance: Speedups of 10-50x are possible for operations like joins, sorts, and aggregations on GPUs vs. CPUs, depending on the workload.

Cost: An A100 instance costs roughly 10x more than a CPU instance, so it only pays off for compute-heavy operations (string parsing, regex, complex aggregations).
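Whether the GPU pays off can be estimated with a short calculation. The sketch below is an assumption-laden model, not a benchmark: it applies Amdahl's law (only the GPU-friendly share of the runtime shrinks) and multiplies the resulting runtime by the price ratio. The function name and the example fractions are hypothetical; the 10x price ratio and 50x speedup are the figures quoted above.

```python
def gpu_cost_ratio(accelerated_fraction, kernel_speedup, price_ratio=10.0):
    """Return GPU job cost divided by CPU job cost (below 1.0 means the GPU is cheaper).

    accelerated_fraction: share of CPU runtime spent in GPU-friendly operations.
    kernel_speedup: speedup of those operations on the GPU.
    price_ratio: GPU instance hourly price divided by CPU instance hourly price.
    """
    # Amdahl's law: only the accelerated share of the runtime shrinks.
    gpu_runtime = (1.0 - accelerated_fraction) + accelerated_fraction / kernel_speedup
    return price_ratio * gpu_runtime


print(round(gpu_cost_ratio(0.95, 50.0), 2))  # 0.69: GPU is cheaper
print(round(gpu_cost_ratio(0.50, 50.0), 2))  # 5.1: GPU costs about 5x more
```

Even with a 50x kernel speedup, a job that spends half its time in non-accelerated work (I/O, shuffle, Python UDFs) ends up far more expensive on the GPU.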


3.3.13. Implementation Checklist

Before deploying a production data processing pipeline:

Pre-Deployment:

  • Tested on 10% of production data volume
  • Configured monitoring (job duration, rows processed, cost per run)
  • Set up alerting (job failure, duration > 2x baseline)
  • Documented cluster configuration and tuning parameters
  • Implemented error handling and retry logic
  • Tagged resources for cost attribution
  • Estimated monthly cost at full production load

Post-Deployment:

  • Monitored first 5 runs for anomalies
  • Validated output data quality (row counts, schema, null rates)
  • Reviewed Spark UI / Dataflow metrics for bottlenecks
  • Created runbook for common issues
  • Established SLA (e.g., “Job must complete within 2 hours”)
  • Scheduled regular cost reviews (monthly)

Optimization Phase:

  • Applied broadcast join optimizations where applicable
  • Tuned partitioning strategy based on query patterns
  • Enabled caching for frequently accessed intermediate results
  • Migrated to spot/preemptible instances for non-critical workloads
  • Implemented data quality checks and early failure detection
  • Documented all performance tuning decisions
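The alerting item in the Pre-Deployment list ("duration > 2x baseline") can be sketched as a small rule. This is one possible interpretation, assuming the baseline is the median of recent successful runs; the function name and the sample durations are hypothetical.

```python
from statistics import median


def duration_alert(recent_durations_min, latest_duration_min, factor=2.0):
    """Return True if the latest run exceeds `factor` times the baseline.

    The baseline here is the median of recent successful runs, which is less
    sensitive to a single slow run than the mean.
    """
    if not recent_durations_min:
        return False  # no history yet, nothing to compare against
    baseline = median(recent_durations_min)
    return latest_duration_min > factor * baseline


history = [42, 45, 40, 44, 120]  # minutes; includes one past outlier
print(duration_alert(history, latest_duration_min=95))  # True (95 > 2 * 44)
print(duration_alert(history, latest_duration_min=60))  # False
```

With a mean-based baseline, the single 120-minute run would raise the threshold to about 116 minutes and the 95-minute run would go unnoticed.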

3.3.14. Summary: The Heavy Lifters

Processing engines are the workhorses of MLOps. They transform raw logs into training datasets, power real-time feature computation, and enable experimentation at scale.

Key Takeaways:

  1. Physics First: Understand the fundamental constraints (shuffle, serialization, state management) before choosing tools. These constraints apply to all engines.

  2. Match Tool to Use Case:

    • Spark/EMR: Massive batch ETL, cost-sensitive workloads
    • Beam/Dataflow: Complex streaming, unified batch/stream
    • Ray: GPU-bound ML workloads, GenAI pipelines
  3. Cost is a First-Class Concern: A poorly optimized join can cost $100k/year. Invest time in understanding broadcast joins, partition tuning, and spot instances.

  4. Start Simple, Optimize Pragmatically: Don’t prematurely optimize. Start with default settings, measure performance, identify bottlenecks, then tune. Most “performance issues” are actually “wrong algorithm” issues (O(n²) when O(n log n) was available).

  5. Observability is Non-Negotiable: If you can’t measure it, you can’t improve it. Instrument your pipelines with custom metrics. Track cost per job, rows processed per dollar, job duration trends.

  6. Embrace the Lakehouse: Use open file and table formats (Parquet, Iceberg, Delta Lake) as your intermediate storage layer. This gives you flexibility to switch processing engines without data migration.

  7. Test Failure Scenarios: Your pipeline will fail. Test how it recovers. Can it resume from checkpoint? Does it create duplicate data? Does it alert the right people?

  8. The Future is Serverless and GPU-Native: The industry is moving toward “processing without clusters” and “GPU-accelerated everything.” Build your platform to be portable (avoid vendor lock-in).

The Meta-Lesson:

Processing engines are not the differentiator. Your competitors use Spark too. The differentiator is:

  • How fast can your data scientists iterate (experimentation velocity)
  • How reliably do your pipelines run (uptime SLA)
  • How efficiently do you use compute (cost per prediction)

Optimize for these outcomes, not for “using the latest technology.”


In Part IV, we shift from Data to Models: how to train, tune, serve, and monitor machine learning models at production scale—without losing your mind or your budget.