
43.3. Cost Optimization & Spot Instances

Note

The cloud bill shock: a data scientist spins up a p4d.24xlarge ($32/hour) to test a model, then goes home for the weekend. Monday morning's bill: 48 hours × $32 = $1,536. Scale that to 10 scientists and roughly $15k is wasted.


43.3.1. The Economics of Cloud ML

Cost Structure Breakdown

| Cost Category | Typical % of ML Bill | Optimization Potential |
| --- | --- | --- |
| Compute (GPU) | 50-70% | High (Spot, right-sizing) |
| Storage | 15-25% | Medium (lifecycle policies) |
| Data Transfer | 5-15% | Medium (region placement) |
| Managed Services | 5-10% | Low (negotiation) |

FinOps Maturity Model

| Level | Characteristic | Tools |
| --- | --- | --- |
| 0 - Crawl | No visibility | None |
| 1 - Walk | Cost reports | AWS Cost Explorer |
| 2 - Run | Tagging + allocation | Kubecost, Infracost |
| 3 - Fly | Predictive optimization | Spot.io, Cast AI |

A typical cost-governance workflow routes requests through approval gates before provisioning:

graph TB
    A[Engineer Request] --> B{Cost Gate}
    B -->|< $100| C[Auto-approve]
    B -->|$100-$1000| D[Manager Approval]
    B -->|> $1000| E[FinOps Review]
    
    C --> F[Provision]
    D --> F
    E --> F
    
    F --> G[Tag Enforcement]
    G --> H[Running Resource]
    H --> I[Cost Anomaly Detection]
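
The approval gate in the diagram is straightforward to encode. A minimal sketch, using the thresholds from the diagram; the function and route names are illustrative, not from any particular tool:

from enum import Enum

class ApprovalRoute(Enum):
    AUTO_APPROVE = "auto-approve"
    MANAGER = "manager-approval"
    FINOPS = "finops-review"

def route_request(estimated_monthly_cost: float) -> ApprovalRoute:
    """Route a provisioning request based on its estimated monthly cost."""
    if estimated_monthly_cost < 100:
        return ApprovalRoute.AUTO_APPROVE
    if estimated_monthly_cost <= 1000:
        return ApprovalRoute.MANAGER
    return ApprovalRoute.FINOPS

# Example: a $450/month GPU instance goes to the manager queue
print(route_request(450.0))  # ApprovalRoute.MANAGER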

CI/CD Cost Integration with Infracost

# .github/workflows/cost-check.yaml
name: Terraform Cost Check

on:
  pull_request:
    paths:
      - 'terraform/**'

jobs:
  infracost:
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Infracost
        uses: infracost/actions/setup@v2
        with:
          api-key: ${{ secrets.INFRACOST_API_KEY }}
      
      - name: Generate cost breakdown
        run: |
          infracost breakdown --path=terraform/ \
            --format=json \
            --out-file=/tmp/infracost.json
      
      - name: Post comment
        uses: infracost/actions/comment@v1
        with:
          path: /tmp/infracost.json
          behavior: update
      
      - name: Check cost threshold
        run: |
          MONTHLY=$(jq '.totalMonthlyCost | tonumber' /tmp/infracost.json)
          if (( $(echo "$MONTHLY > 10000" | bc -l) )); then
            echo "::error::Estimated monthly cost $MONTHLY exceeds $10,000 threshold"
            exit 1
          fi

43.3.2. Spot Instance Economics

Cloud providers sell spare capacity at a 60-90% discount. The tradeoff: a 2-minute termination notice.

Probability of Interruption

$$ P(I) \propto \frac{1}{D \times S} $$

| Variable | Definition | Impact |
| --- | --- | --- |
| $D$ | Pool depth (available instances) | Higher = fewer interruptions |
| $S$ | Spot price stability | Higher = fewer interruptions |
| $P(I)$ | Probability of interruption | Lower = safer |
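
The proportionality is enough to rank pools even without absolute probabilities. A toy comparison, with hypothetical depth and stability values chosen purely for illustration:

def relative_interruption_risk(pool_depth: float, price_stability: float) -> float:
    """Relative risk score following P(I) ∝ 1 / (D × S). Lower is safer."""
    return 1.0 / (pool_depth * price_stability)

# A deep, stable pool vs. a shallow, volatile one (illustrative numbers)
deep_stable = relative_interruption_risk(pool_depth=100, price_stability=0.9)
shallow_volatile = relative_interruption_risk(pool_depth=10, price_stability=0.3)
print(f"Risk ratio: {shallow_volatile / deep_stable:.0f}x")  # ~30x riskier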

Instance Interruption Rates by Type

| Instance Family | Age | Typical Interruption Rate | Recommendation |
| --- | --- | --- | --- |
| p2.xlarge | Old | <5% | ✅ Very safe |
| p3.2xlarge | Medium | 5-10% | ✅ Safe |
| g4dn.xlarge | Popular | 10-15% | ⚠️ Diversify |
| g5.xlarge | New/Hot | 15-25% | ⚠️ Use fallback |
| p4d.24xlarge | New | 20-30% | ❌ On-demand for critical |

Allocation Strategies

| Strategy | Description | Best For |
| --- | --- | --- |
| lowest-price | Cheapest pools first | Cost-only optimization |
| capacity-optimized | Deepest pools first | Workload reliability |
| diversified | Spread across pools | Balanced approach |
| price-capacity-optimized | Blend of price + depth | Recommended default |
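
The advisor below is a sketch, not production code: it pulls recent Spot price history from the EC2 API and scores candidate GPU instances by blending estimated savings against an approximate interruption rate. The hard-coded interruption rates and on-demand prices are rough illustrative figures.
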
from dataclasses import dataclass
from typing import List, Dict, Optional
import boto3
from datetime import datetime, timedelta

@dataclass
class SpotPriceHistory:
    instance_type: str
    availability_zone: str
    current_price: float
    avg_price_24h: float
    max_price_24h: float
    interruption_rate: float  # Estimated

class SpotAdvisor:
    """Analyze Spot pricing and recommend instances."""
    
    # Historical interruption rates (approximate)
    INTERRUPTION_RATES = {
        "p2.xlarge": 0.05,
        "p3.2xlarge": 0.08,
        "g4dn.xlarge": 0.12,
        "g4dn.2xlarge": 0.10,
        "g5.xlarge": 0.18,
        "g5.2xlarge": 0.15,
        "p4d.24xlarge": 0.25,
    }
    
    def __init__(self, region: str = "us-east-1"):
        self.ec2 = boto3.client("ec2", region_name=region)
        self.region = region
    
    def get_spot_prices(
        self, 
        instance_types: List[str],
        hours_back: int = 24
    ) -> Dict[str, SpotPriceHistory]:
        """Get Spot price history for instance types."""
        
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(hours=hours_back)
        
        response = self.ec2.describe_spot_price_history(
            InstanceTypes=instance_types,
            ProductDescriptions=["Linux/UNIX"],
            StartTime=start_time,
            EndTime=end_time
        )
        
        # Aggregate by instance type
        prices_by_type: Dict[str, List[float]] = {}
        latest_by_type: Dict[str, tuple] = {}  # (price, az, timestamp)
        
        for record in response["SpotPriceHistory"]:
            itype = record["InstanceType"]
            price = float(record["SpotPrice"])
            az = record["AvailabilityZone"]
            timestamp = record["Timestamp"]
            
            if itype not in prices_by_type:
                prices_by_type[itype] = []
            prices_by_type[itype].append(price)
            
            if itype not in latest_by_type or timestamp > latest_by_type[itype][2]:
                latest_by_type[itype] = (price, az, timestamp)
        
        result = {}
        for itype in instance_types:
            prices = prices_by_type.get(itype, [0])
            latest = latest_by_type.get(itype, (0, "unknown", None))
            
            result[itype] = SpotPriceHistory(
                instance_type=itype,
                availability_zone=latest[1],
                current_price=latest[0],
                avg_price_24h=sum(prices) / len(prices) if prices else 0,
                max_price_24h=max(prices) if prices else 0,
                interruption_rate=self.INTERRUPTION_RATES.get(itype, 0.20)
            )
        
        return result
    
    def recommend(
        self,
        min_gpu_memory: int,
        min_vcpus: int,
        max_price_per_hour: float,
        prefer_stability: bool = True
    ) -> List[dict]:
        """Recommend Spot instances based on requirements."""
        
        # GPU instance specs (simplified)
        GPU_SPECS = {
            "g4dn.xlarge": {"gpu_mem": 16, "vcpus": 4, "on_demand": 0.526},
            "g4dn.2xlarge": {"gpu_mem": 16, "vcpus": 8, "on_demand": 0.752},
            "g5.xlarge": {"gpu_mem": 24, "vcpus": 4, "on_demand": 1.006},
            "g5.2xlarge": {"gpu_mem": 24, "vcpus": 8, "on_demand": 1.212},
            "p3.2xlarge": {"gpu_mem": 16, "vcpus": 8, "on_demand": 3.06},
            "p4d.24xlarge": {"gpu_mem": 320, "vcpus": 96, "on_demand": 32.77},
        }
        
        candidates = [
            itype for itype, specs in GPU_SPECS.items()
            if specs["gpu_mem"] >= min_gpu_memory and specs["vcpus"] >= min_vcpus
        ]
        
        prices = self.get_spot_prices(candidates)
        
        recommendations = []
        for itype, price_info in prices.items():
            if price_info.current_price > max_price_per_hour:
                continue
            
            on_demand = GPU_SPECS[itype]["on_demand"]
            savings = (1 - price_info.current_price / on_demand) * 100
            
            # Score: balance price and stability
            if prefer_stability:
                score = (1 - price_info.interruption_rate) * 0.6 + (savings / 100) * 0.4
            else:
                score = (savings / 100) * 0.8 + (1 - price_info.interruption_rate) * 0.2
            
            recommendations.append({
                "instance_type": itype,
                "spot_price": round(price_info.current_price, 4),
                "on_demand_price": on_demand,
                "savings_percent": round(savings, 1),
                "interruption_rate": round(price_info.interruption_rate * 100, 1),
                "score": round(score, 3),
                "availability_zone": price_info.availability_zone
            })
        
        return sorted(recommendations, key=lambda x: -x["score"])
    
    def get_savings_report(self, instance_type: str, hours_used: float) -> dict:
        """Calculate actual savings from Spot usage."""
        prices = self.get_spot_prices([instance_type])
        price_info = prices.get(instance_type)
        
        if not price_info:
            return {"error": "Instance type not found"}
        
        GPU_SPECS = {
            "g4dn.xlarge": 0.526,
            "g4dn.2xlarge": 0.752,
            "g5.xlarge": 1.006,
            "p3.2xlarge": 3.06,
        }
        
        on_demand = GPU_SPECS.get(instance_type, price_info.current_price * 3)
        
        spot_cost = price_info.avg_price_24h * hours_used
        on_demand_cost = on_demand * hours_used
        savings = on_demand_cost - spot_cost
        
        return {
            "instance_type": instance_type,
            "hours_used": hours_used,
            "spot_cost": round(spot_cost, 2),
            "on_demand_equivalent": round(on_demand_cost, 2),
            "savings": round(savings, 2),
            "savings_percent": round((savings / on_demand_cost) * 100, 1)
        }


# Usage
advisor = SpotAdvisor()
recommendations = advisor.recommend(
    min_gpu_memory=16,
    min_vcpus=4,
    max_price_per_hour=1.0,
    prefer_stability=True
)

43.3.3. Karpenter: Next-Gen Kubernetes Autoscaling

Cluster Autoscaler is slow: it scales pre-defined node groups through ASGs and only reacts after pods go pending. Karpenter watches the same pending pods but provisions right-sized nodes directly through the EC2 API, typically in under a minute.

Karpenter vs Cluster Autoscaler

| Feature | Cluster Autoscaler | Karpenter |
| --- | --- | --- |
| Provisioning | Via ASG (slow) | Direct EC2 API (fast) |
| Node groups | Required | Not needed |
| Instance selection | Pre-defined in ASG | Dynamic per pod |
| Spot handling | Basic | Native with fallback |
| Consolidation | Manual | Automatic |
| GPU support | ✓ | ✓ with better selection |

Production Karpenter Configuration

# karpenter/nodepool.yaml
apiVersion: karpenter.sh/v1beta1
kind: NodePool
metadata:
  name: gpu-training
spec:
  template:
    metadata:
      labels:
        workload-type: gpu-training
    spec:
      requirements:
        # GPU families
        - key: "karpenter.k8s.aws/instance-category"
          operator: In
          values: ["g", "p"]
        
        # Specific types for training
        - key: "node.kubernetes.io/instance-type"
          operator: In
          values: 
            - "g4dn.xlarge"
            - "g4dn.2xlarge"
            - "g5.xlarge"
            - "g5.2xlarge"
            - "p3.2xlarge"
        
        # Prefer Spot, fallback to On-Demand
        - key: "karpenter.sh/capacity-type"
          operator: In
          values: ["spot", "on-demand"]
        
        # Architecture
        - key: "kubernetes.io/arch"
          operator: In
          values: ["amd64"]
      
      nodeClassRef:
        name: gpu-nodes
  
  # Resource limits
  limits:
    cpu: 1000
    memory: 4000Gi
    nvidia.com/gpu: 32
  
  # Disruption settings
  disruption:
    consolidationPolicy: WhenUnderutilized
    # consolidateAfter is only valid with consolidationPolicy: WhenEmpty in v1beta1
    expireAfter: 720h  # 30 days - rotate nodes
    budgets:
      - nodes: "10%"

---
apiVersion: karpenter.k8s.aws/v1beta1
kind: EC2NodeClass
metadata:
  name: gpu-nodes
spec:
  amiFamily: AL2
  
  subnetSelectorTerms:
    - tags:
        karpenter.sh/discovery: "ml-cluster"
  
  securityGroupSelectorTerms:
    - tags:
        karpenter.sh/discovery: "ml-cluster"
  
  instanceProfile: KarpenterNodeInstanceProfile
  
  # GPU-specific settings
  blockDeviceMappings:
    - deviceName: /dev/xvda
      ebs:
        volumeSize: 200Gi
        volumeType: gp3
        iops: 10000
        throughput: 500
        deleteOnTermination: true
  
  # Metadata options
  metadataOptions:
    httpEndpoint: enabled
    httpProtocolIPv6: disabled
    httpPutResponseHopLimit: 2
    httpTokens: required
  
  tags:
    Environment: production
    ManagedBy: karpenter

Terraform for Karpenter

# karpenter.tf

module "karpenter" {
  source  = "terraform-aws-modules/eks/aws//modules/karpenter"
  version = "~> 19.0"
  
  cluster_name           = module.eks.cluster_name
  irsa_oidc_provider_arn = module.eks.oidc_provider_arn
  
  # Create IAM roles
  create_iam_role = true
  iam_role_name   = "KarpenterController-${var.cluster_name}"
  
  # Node IAM role
  create_node_iam_role = true
  node_iam_role_name   = "KarpenterNode-${var.cluster_name}"
  
  node_iam_role_additional_policies = {
    AmazonSSMManagedInstanceCore = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
  }
  
  tags = var.tags
}

resource "helm_release" "karpenter" {
  namespace        = "karpenter"
  create_namespace = true
  name             = "karpenter"
  repository       = "oci://public.ecr.aws/karpenter"
  chart            = "karpenter"
  version          = "v0.32.0"
  
  set {
    name  = "settings.clusterName"
    value = module.eks.cluster_name
  }
  
  set {
    name  = "settings.clusterEndpoint"
    value = module.eks.cluster_endpoint
  }
  
  set {
    name  = "serviceAccount.annotations.eks\\.amazonaws\\.com/role-arn"
    value = module.karpenter.iam_role_arn
  }
  
  set {
    name  = "settings.interruptionQueue"
    value = module.karpenter.queue_name
  }
}

43.3.4. Graceful Interruption Handling

When AWS reclaims a Spot instance, you have exactly 2 minutes to checkpoint.

Signal Handler Pattern

import signal
import sys
import os
import time
import threading
from typing import Callable, Optional
from dataclasses import dataclass
import requests
import torch

@dataclass
class CheckpointConfig:
    checkpoint_dir: str
    s3_bucket: str
    checkpoint_interval_epochs: int = 10
    async_upload: bool = True

class SpotTerminationHandler:
    """Handle Spot instance termination gracefully."""
    
    METADATA_TOKEN_URL = "http://169.254.169.254/latest/api/token"
    METADATA_URL = "http://169.254.169.254/latest/meta-data/spot/instance-action"
    POLL_INTERVAL = 5  # seconds
    
    def __init__(
        self,
        checkpoint_fn: Callable,
        config: CheckpointConfig
    ):
        self.checkpoint_fn = checkpoint_fn
        self.config = config
        self.terminating = False
        self._setup_signal_handlers()
        self._start_metadata_monitor()
    
    def _setup_signal_handlers(self):
        """Register signal handlers."""
        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)
    
    def _handle_signal(self, signum, frame):
        """Handle termination signal."""
        print(f"Received signal {signum}, initiating graceful shutdown...")
        self.terminating = True
    
    def _start_metadata_monitor(self):
        """Start background thread to poll instance metadata for a Spot notice."""
        def monitor():
            while not self.terminating:
                try:
                    # IMDSv2: fetch a session token first, then query with it
                    token = requests.put(
                        self.METADATA_TOKEN_URL,
                        headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"},
                        timeout=1
                    ).text
                    response = requests.get(
                        self.METADATA_URL,
                        timeout=1,
                        headers={"X-aws-ec2-metadata-token": token}
                    )
                    # The endpoint returns 404 until an interruption is scheduled
                    if response.status_code == 200:
                        print(f"Spot interruption notice: {response.json()}")
                        self.terminating = True
                        break
                except requests.exceptions.RequestException:
                    pass  # Metadata service unreachable; no notice yet
                
                time.sleep(self.POLL_INTERVAL)
        
        thread = threading.Thread(target=monitor, daemon=True)
        thread.start()
    
    def should_stop(self) -> bool:
        """Check if training should stop."""
        return self.terminating
    
    def checkpoint_and_exit(self, state: dict):
        """Save checkpoint and exit cleanly."""
        print("Saving emergency checkpoint...")
        
        # Save locally first
        local_path = os.path.join(
            self.config.checkpoint_dir,
            "emergency_checkpoint.pt"
        )
        torch.save(state, local_path)
        
        # Upload to S3
        self._upload_to_s3(local_path)
        
        print("Checkpoint saved. Exiting.")
        sys.exit(0)
    
    def _upload_to_s3(self, local_path: str):
        """Upload checkpoint to S3."""
        import boto3
        
        s3 = boto3.client("s3")
        key = f"checkpoints/{os.path.basename(local_path)}"
        
        s3.upload_file(local_path, self.config.s3_bucket, key)
        print(f"Uploaded to s3://{self.config.s3_bucket}/{key}")


class ResilientTrainer:
    """Training loop with Spot interruption handling."""
    
    def __init__(
        self,
        model: torch.nn.Module,
        optimizer: torch.optim.Optimizer,
        config: CheckpointConfig
    ):
        self.model = model
        self.optimizer = optimizer
        self.config = config
        self.current_epoch = 0
        
        self.handler = SpotTerminationHandler(
            checkpoint_fn=self.save_checkpoint,
            config=config
        )
        
        # Try to resume from checkpoint
        self._maybe_resume()
    
    def _maybe_resume(self):
        """Resume from checkpoint if available."""
        checkpoint_path = os.path.join(
            self.config.checkpoint_dir,
            "latest_checkpoint.pt"
        )
        
        if os.path.exists(checkpoint_path):
            print(f"Resuming from {checkpoint_path}")
            checkpoint = torch.load(checkpoint_path)
            
            self.model.load_state_dict(checkpoint["model_state"])
            self.optimizer.load_state_dict(checkpoint["optimizer_state"])
            self.current_epoch = checkpoint["epoch"]
            
            print(f"Resumed from epoch {self.current_epoch}")
    
    def save_checkpoint(self, epoch: Optional[int] = None):
        """Save training checkpoint."""
        if epoch is None:
            epoch = self.current_epoch
        
        checkpoint = {
            "epoch": epoch,
            "model_state": self.model.state_dict(),
            "optimizer_state": self.optimizer.state_dict(),
            "timestamp": time.time()
        }
        
        # Atomic save with temp file
        temp_path = os.path.join(self.config.checkpoint_dir, "checkpoint_temp.pt")
        final_path = os.path.join(self.config.checkpoint_dir, "latest_checkpoint.pt")
        
        torch.save(checkpoint, temp_path)
        os.rename(temp_path, final_path)
        
        print(f"Checkpoint saved for epoch {epoch}")
    
    def train(self, dataloader, epochs: int):
        """Main training loop with interruption handling."""
        
        for epoch in range(self.current_epoch, epochs):
            self.current_epoch = epoch
            
            # Check for interruption before each epoch
            if self.handler.should_stop():
                self.handler.checkpoint_and_exit({
                    "epoch": epoch,
                    "model_state": self.model.state_dict(),
                    "optimizer_state": self.optimizer.state_dict()
                })
            
            # Training epoch
            for batch_idx, (data, target) in enumerate(dataloader):
                # Mini-batch training
                self.optimizer.zero_grad()
                output = self.model(data)
                loss = torch.nn.functional.cross_entropy(output, target)
                loss.backward()
                self.optimizer.step()
                
                # Check interruption within epoch
                if batch_idx % 100 == 0 and self.handler.should_stop():
                    self.handler.checkpoint_and_exit({
                        "epoch": epoch,
                        "batch": batch_idx,
                        "model_state": self.model.state_dict(),
                        "optimizer_state": self.optimizer.state_dict()
                    })
            
            # Periodic checkpoint
            if epoch % self.config.checkpoint_interval_epochs == 0:
                self.save_checkpoint(epoch)
            
            print(f"Epoch {epoch} complete")


# Usage
config = CheckpointConfig(
    checkpoint_dir="/tmp/checkpoints",
    s3_bucket="my-training-bucket",
    checkpoint_interval_epochs=5
)
os.makedirs(config.checkpoint_dir, exist_ok=True)

model = torch.nn.Linear(100, 10)
optimizer = torch.optim.Adam(model.parameters())

# Toy dataset so the example is self-contained
dataset = torch.utils.data.TensorDataset(
    torch.randn(1000, 100), torch.randint(0, 10, (1000,))
)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=32)

trainer = ResilientTrainer(model, optimizer, config)
trainer.train(dataloader, epochs=100)

43.3.5. Mixed Instance Strategies

Diversification is the biggest reliability lever for Spot. If a single instance pool has capacity roughly half the time, drawing on several independent pools pushes availability toward 99%, as the sketch below shows; the Kubernetes manifest after it spreads workers across five GPU instance types:
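
A back-of-the-envelope check of that claim, assuming (hypothetically) that each pool independently has capacity 50% of the time:

def spot_availability(p_single_pool: float, num_pools: int) -> float:
    """Probability that at least one of N independent pools has capacity."""
    return 1 - (1 - p_single_pool) ** num_pools

for n in (1, 3, 5, 7):
    print(f"{n} pool(s): {spot_availability(0.5, n):.1%}")
# 1 pool(s): 50.0%, 3 pool(s): 87.5%, 5 pool(s): 96.9%, 7 pool(s): 99.2%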

# mixed-instance-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: training-workers
spec:
  replicas: 10
  selector:
    matchLabels:
      app: training-worker
  template:
    metadata:
      labels:
        app: training-worker
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  # Accept any of these GPU types
                  - key: node.kubernetes.io/instance-type
                    operator: In
                    values:
                      - g4dn.xlarge
                      - g4dn.2xlarge
                      - g5.xlarge
                      - g5.2xlarge
                      - p3.2xlarge
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            # Spread across nodes
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    app: training-worker
                topologyKey: kubernetes.io/hostname
      
      tolerations:
        - key: "nvidia.com/gpu"
          operator: "Exists"
          effect: "NoSchedule"
        - key: "spot"
          operator: "Equal"
          value: "true"
          effect: "NoSchedule"
      
      containers:
        - name: trainer
          image: training:latest
          resources:
            limits:
              nvidia.com/gpu: 1
            requests:
              nvidia.com/gpu: 1
              memory: "16Gi"
              cpu: "4"

AWS Fleet Configuration

# spot_fleet.tf

resource "aws_launch_template" "gpu_training" {
  name_prefix   = "gpu-training-"
  image_id      = data.aws_ami.gpu.id
  instance_type = "g4dn.xlarge"  # Default, overridden by fleet
  
  block_device_mappings {
    device_name = "/dev/xvda"
    ebs {
      volume_size = 200
      volume_type = "gp3"
      iops        = 10000
      throughput  = 500
    }
  }
  
  iam_instance_profile {
    name = aws_iam_instance_profile.training.name
  }
  
  tag_specifications {
    resource_type = "instance"
    tags = {
      Name        = "gpu-training-worker"
      Environment = var.environment
    }
  }
}

resource "aws_ec2_fleet" "gpu_training" {
  type = "maintain"
  
  target_capacity_specification {
    default_target_capacity_type = "spot"
    total_target_capacity        = var.worker_count
    on_demand_target_capacity    = 1  # 1 on-demand for stability
    spot_target_capacity         = var.worker_count - 1
  }
  
  launch_template_config {
    launch_template_specification {
      launch_template_id = aws_launch_template.gpu_training.id
      version            = "$Latest"
    }
    
    # Mixed instance types
    override {
      instance_type     = "g4dn.xlarge"
      weighted_capacity = 1
    }
    override {
      instance_type     = "g4dn.2xlarge"
      weighted_capacity = 2
    }
    override {
      instance_type     = "g5.xlarge"
      weighted_capacity = 1
    }
    override {
      instance_type     = "g5.2xlarge"
      weighted_capacity = 2
    }
    override {
      instance_type     = "p3.2xlarge"
      weighted_capacity = 1
    }
  }
  
  spot_options {
    allocation_strategy                 = "price-capacity-optimized"
    instance_interruption_behavior      = "terminate"
    maintenance_strategies {
      capacity_rebalance {
        replacement_strategy = "launch-before-terminate"
      }
    }
  }
  
  # Terminate instances when fleet is deleted
  terminate_instances                 = true
  terminate_instances_with_expiration = true
}

43.3.6. Storage Cost Optimization

Storage is the “silent killer”—cheap per GB but accumulates forever.

Cost Breakdown by Storage Type

| Storage | Cost/GB/Month | Use Case | Lifecycle |
| --- | --- | --- | --- |
| S3 Standard | $0.023 | Active data | Transition after 30d |
| S3 IA | $0.0125 | Infrequent access | Transition after 90d |
| S3 Glacier | $0.004 | Archive | After 365d |
| EBS gp3 | $0.08 | Attached volumes | Delete on termination |
| EFS | $0.30 | Shared storage | Expensive! Avoid |
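
To see what a lifecycle policy is worth, a rough savings estimate using the per-GB prices from the table above; the 50 TB figure is a hypothetical workload:

# Monthly cost per GB by tier (from the table above)
TIER_COST = {"standard": 0.023, "ia": 0.0125, "glacier": 0.004}

def monthly_cost(size_gb: float, tier: str) -> float:
    """Monthly storage cost in USD for size_gb stored in the given tier."""
    return size_gb * TIER_COST[tier]

# 50 TB of old checkpoints: keep in Standard vs. age down to Glacier
size_gb = 50_000
all_standard = monthly_cost(size_gb, "standard")
aged_to_glacier = monthly_cost(size_gb, "glacier")
print(f"Standard: ${all_standard:,.0f}/mo, Glacier: ${aged_to_glacier:,.0f}/mo, "
      f"savings: {1 - aged_to_glacier / all_standard:.0%}")
# Standard: $1,150/mo, Glacier: $200/mo, savings: 83%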

Automated Cleanup Scripts

import boto3
from datetime import datetime, timedelta
from typing import List, Optional
from dataclasses import dataclass

@dataclass
class CleanupReport:
    orphan_volumes_deleted: int
    orphan_volumes_size_gb: int
    snapshots_deleted: int
    estimated_monthly_savings: float

class StorageCleaner:
    """Clean up orphaned storage resources."""
    
    def __init__(self, region: str = "us-east-1", dry_run: bool = True):
        self.ec2 = boto3.resource("ec2", region_name=region)
        self.ec2_client = boto3.client("ec2", region_name=region)
        self.s3 = boto3.client("s3", region_name=region)
        self.dry_run = dry_run
    
    def find_orphan_volumes(self, min_age_days: int = 7) -> List[dict]:
        """Find EBS volumes not attached to any instance."""
        cutoff = datetime.utcnow() - timedelta(days=min_age_days)
        
        orphans = []
        volumes = self.ec2.volumes.filter(
            Filters=[{"Name": "status", "Values": ["available"]}]
        )
        
        for vol in volumes:
            if vol.create_time.replace(tzinfo=None) < cutoff:
                orphans.append({
                    "volume_id": vol.id,
                    "size_gb": vol.size,
                    "created": vol.create_time.isoformat(),
                    "age_days": (datetime.utcnow() - vol.create_time.replace(tzinfo=None)).days,
                    "monthly_cost": vol.size * 0.08,
                    "tags": {t["Key"]: t["Value"] for t in (vol.tags or [])}
                })
        
        return orphans
    
    def delete_orphan_volumes(self, min_age_days: int = 7) -> CleanupReport:
        """Delete orphaned volumes older than min_age_days."""
        orphans = self.find_orphan_volumes(min_age_days)
        
        total_deleted = 0
        total_size = 0
        
        for orphan in orphans:
            print(f"{'Would delete' if self.dry_run else 'Deleting'} "
                  f"volume {orphan['volume_id']} ({orphan['size_gb']}GB)")
            
            if not self.dry_run:
                self.ec2_client.delete_volume(VolumeId=orphan["volume_id"])
            
            total_deleted += 1
            total_size += orphan["size_gb"]
        
        return CleanupReport(
            orphan_volumes_deleted=total_deleted,
            orphan_volumes_size_gb=total_size,
            snapshots_deleted=0,
            estimated_monthly_savings=total_size * 0.08
        )
    
    def find_old_snapshots(
        self, 
        min_age_days: int = 90,
        exclude_tags: Optional[List[str]] = None
    ) -> List[dict]:
        """Find old EBS snapshots."""
        exclude_tags = exclude_tags or ["keep", "production"]
        cutoff = datetime.utcnow() - timedelta(days=min_age_days)
        
        snapshots = []
        response = self.ec2_client.describe_snapshots(OwnerIds=["self"])
        
        for snap in response["Snapshots"]:
            if snap["StartTime"].replace(tzinfo=None) > cutoff:
                continue
            
            tags = {t["Key"]: t["Value"] for t in snap.get("Tags", [])}
            
            # Skip if has exclude tags
            if any(tag in tags for tag in exclude_tags):
                continue
            
            snapshots.append({
                "snapshot_id": snap["SnapshotId"],
                "volume_id": snap.get("VolumeId"),
                "size_gb": snap["VolumeSize"],
                "created": snap["StartTime"].isoformat(),
                "age_days": (datetime.utcnow() - snap["StartTime"].replace(tzinfo=None)).days,
                "description": snap.get("Description", "")
            })
        
        return snapshots
    
    def cleanup_s3_checkpoints(
        self,
        bucket: str,
        prefix: str,
        keep_last_n: int = 5
    ) -> int:
        """Keep only last N checkpoints, delete older ones."""
        paginator = self.s3.get_paginator("list_objects_v2")
        
        all_objects = []
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                all_objects.append({
                    "Key": obj["Key"],
                    "LastModified": obj["LastModified"],
                    "Size": obj["Size"]
                })
        
        # Sort by date, newest first
        all_objects.sort(key=lambda x: x["LastModified"], reverse=True)
        
        # Delete old ones
        to_delete = all_objects[keep_last_n:]
        deleted_count = 0
        
        for obj in to_delete:
            print(f"{'Would delete' if self.dry_run else 'Deleting'} {obj['Key']}")
            
            if not self.dry_run:
                self.s3.delete_object(Bucket=bucket, Key=obj["Key"])
            
            deleted_count += 1
        
        return deleted_count


# Lambda handler for scheduled cleanup
def lambda_handler(event, context):
    cleaner = StorageCleaner(dry_run=False)
    
    # Clean orphan volumes older than 14 days
    volume_report = cleaner.delete_orphan_volumes(min_age_days=14)
    
    # Clean old checkpoints
    checkpoint_deleted = cleaner.cleanup_s3_checkpoints(
        bucket="ml-checkpoints",
        prefix="training/",
        keep_last_n=10
    )
    
    return {
        "statusCode": 200,
        "body": {
            "volumes_deleted": volume_report.orphan_volumes_deleted,
            "storage_freed_gb": volume_report.orphan_volumes_size_gb,
            "checkpoints_deleted": checkpoint_deleted,
            "monthly_savings": volume_report.estimated_monthly_savings
        }
    }

S3 Lifecycle Policy

# s3_lifecycle.tf

resource "aws_s3_bucket_lifecycle_configuration" "ml_data" {
  bucket = aws_s3_bucket.ml_data.id
  
  # Checkpoints - aggressive cleanup
  rule {
    id     = "checkpoint-cleanup"
    status = "Enabled"
    
    filter {
      prefix = "checkpoints/"
    }
    
    transition {
      days          = 30  # S3 requires >= 30 days before STANDARD_IA
      storage_class = "STANDARD_IA"
    }
    
    transition {
      days          = 60
      storage_class = "GLACIER"
    }
    
    expiration {
      days = 90
    }
  }
  
  # Training datasets - keep longer
  rule {
    id     = "dataset-lifecycle"
    status = "Enabled"
    
    filter {
      prefix = "datasets/"
    }
    
    transition {
      days          = 30
      storage_class = "STANDARD_IA"
    }
    
    transition {
      days          = 180
      storage_class = "GLACIER"
    }
  }
  
  # Model artifacts - preserve
  rule {
    id     = "model-lifecycle"
    status = "Enabled"
    
    filter {
      prefix = "models/"
    }
    
    transition {
      days          = 90
      storage_class = "STANDARD_IA"
    }
    
    # No expiration - models are valuable
  }
  
  # Logs - delete aggressively
  rule {
    id     = "logs-cleanup"
    status = "Enabled"
    
    filter {
      prefix = "logs/"
    }
    
    expiration {
      days = 14
    }
  }
}

43.3.7. GPU Sharing with MIG

An A100 has up to 80 GB of memory; most inference workloads need around 10 GB. MIG (Multi-Instance GPU) partitions one A100 into as many as seven isolated slices.

MIG Configuration

# nvidia-mig-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: nvidia-mig-config
  namespace: gpu-operator
data:
  config.yaml: |
    version: v1
    mig-configs:
      all-1g.10gb:
        - devices: all
          mig-enabled: true
          mig-devices:
            "1g.10gb": 7
      
      all-2g.20gb:
        - devices: all
          mig-enabled: true
          mig-devices:
            "2g.20gb": 3
      
      mixed-mig:
        - devices: [0]
          mig-enabled: true
          mig-devices:
            "1g.10gb": 4
            "2g.20gb": 1
        - devices: [1]
          mig-enabled: false

Cost Comparison

| Scenario | Hardware | Cost/Hour | Jobs Served | Cost/Job |
| --- | --- | --- | --- | --- |
| Full A100 | 1x A100 | $4.10 | 1 | $4.10 |
| MIG 7x | 1x A100 (7 slices) | $4.10 | 7 | $0.59 |
| 7x Smaller GPUs | 7x T4 | $3.50 | 7 | $0.50 |

Verdict: MIG is optimal when you need A100 tensor cores but not full memory.
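
The arithmetic behind the table, as a quick sketch; the hourly prices are the approximate figures above:

def cost_per_job_hour(hourly_cost: float, concurrent_jobs: int) -> float:
    """Effective cost per job-hour when one GPU serves several jobs at once."""
    return hourly_cost / concurrent_jobs

print(f"Full A100: ${cost_per_job_hour(4.10, 1):.2f}")  # $4.10
print(f"MIG 7x:    ${cost_per_job_hour(4.10, 7):.2f}")  # $0.59
print(f"7x T4:     ${cost_per_job_hour(3.50, 7):.2f}")  # $0.50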


43.3.8. Budget Alerts and Governance

AWS Budget Terraform

# budgets.tf

resource "aws_budgets_budget" "ml_monthly" {
  name              = "ml-monthly-budget"
  budget_type       = "COST"
  limit_amount      = "10000"
  limit_unit        = "USD"
  time_unit         = "MONTHLY"
  
  cost_filter {
    name = "TagKeyValue"
    values = [
      "user:Team$DataScience"
    ]
  }
  
  notification {
    comparison_operator        = "GREATER_THAN"
    threshold                  = 50
    threshold_type             = "PERCENTAGE"
    notification_type          = "ACTUAL"
    subscriber_email_addresses = ["ml-team@company.com"]
  }
  
  notification {
    comparison_operator        = "GREATER_THAN"
    threshold                  = 80
    threshold_type             = "PERCENTAGE"
    notification_type          = "ACTUAL"
    subscriber_email_addresses = ["ml-lead@company.com", "finance@company.com"]
  }
  
  notification {
    comparison_operator        = "GREATER_THAN"
    threshold                  = 100
    threshold_type             = "PERCENTAGE"
    notification_type          = "ACTUAL"
    subscriber_email_addresses = ["cto@company.com"]
  }
  
  notification {
    comparison_operator        = "GREATER_THAN"
    threshold                  = 100
    threshold_type             = "PERCENTAGE"
    notification_type          = "FORECASTED"
    subscriber_email_addresses = ["ml-lead@company.com"]
  }
}

resource "aws_budgets_budget_action" "stop_instances" {
  budget_name        = aws_budgets_budget.ml_monthly.name
  action_type        = "RUN_SSM_DOCUMENTS"
  approval_model     = "AUTOMATIC"
  notification_type  = "ACTUAL"
  
  action_threshold {
    action_threshold_type  = "PERCENTAGE"
    action_threshold_value = 120
  }
  
  definition {
    ssm_action_definition {
      action_sub_type = "STOP_EC2_INSTANCES"
      region          = var.region
      instance_ids    = []  # SSM actions require explicit IDs; populate from a data source or variable
    }
  }
  
  execution_role_arn = aws_iam_role.budget_action.arn
  
  subscriber {
    subscription_type = "EMAIL"
    address           = "emergency@company.com"
  }
}

43.3.9. Summary Checklist

| Category | Action | Priority | Savings |
| --- | --- | --- | --- |
| Spot | Use price-capacity-optimized allocation | Critical | 60-90% |
| Spot | Implement graceful checkpointing | Critical | Prevents data loss |
| Spot | Diversify across 4+ instance types | High | Reduces interruptions |
| Autoscaling | Deploy Karpenter over Cluster Autoscaler | High | Faster scaling |
| Storage | Set S3 lifecycle policies | High | 50-80% on old data |
| Storage | Weekly orphan volume cleanup | Medium | Variable |
| Governance | Enable Infracost in CI/CD | High | Prevents surprises |
| Governance | Set budget alerts at 50/80/100% | Critical | Visibility |
| GPU | Use MIG for inference workloads | Medium | 7x efficiency |
| Tagging | Enforce Team/Project tags | High | Cost allocation |

Quick Decision Matrix

| Workload Type | Spot Safe? | Recommended Instance | Fallback |
| --- | --- | --- | --- |
| Training (long) | ⚠️ With checkpoints | p3, g5 | On-demand |
| Training (short) | ✅ | g4dn, g5 | Different AZ |
| Inference (batch) | ✅ | g4dn, T4 | On-demand queue |
| Inference (real-time) | ❌ | On-demand or reserved | N/A |
| Dev/Experiments | ✅ | Spot only | Wait |

[End of Section 43.3]