43.3. Cost Optimization & Spot Instances
Note
The Cloud Bill Shock: A data scientist spins up a p4d.24xlarge ($32/hour) to test a model, then goes home for the weekend. Monday morning bill: $1,536. Scale that to 10 scientists and roughly $15k is wasted.
43.3.1. The Economics of Cloud ML
Cost Structure Breakdown
| Cost Category | Typical % of ML Bill | Optimization Potential |
|---|---|---|
| Compute (GPU) | 50-70% | High (Spot, right-sizing) |
| Storage | 15-25% | Medium (lifecycle policies) |
| Data Transfer | 5-15% | Medium (region placement) |
| Managed Services | 5-10% | Low (negotiation) |
FinOps Maturity Model
| Level | Characteristic | Tools |
|---|---|---|
| 0 - Crawl | No visibility | None |
| 1 - Walk | Cost reports | AWS Cost Explorer |
| 2 - Run | Tagging + allocation | Kubecost, Infracost |
| 3 - Fly | Predictive optimization | Spot.io, Cast AI |
Cost Governance Workflow
graph TB
A[Engineer Request] --> B{Cost Gate}
B -->|< $100| C[Auto-approve]
B -->|$100-$1000| D[Manager Approval]
B -->|> $1000| E[FinOps Review]
C --> F[Provision]
D --> F
E --> F
F --> G[Tag Enforcement]
G --> H[Running Resource]
H --> I[Cost Anomaly Detection]
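The "Tag Enforcement" step in the workflow above is easy to automate. A minimal sketch using boto3 that flags running instances missing cost-allocation tags (the required keys `Team` and `Project` are illustrative, matching the tagging recommendation in the summary checklist):
import boto3
from typing import List

REQUIRED_TAGS = {"Team", "Project"}  # illustrative cost-allocation tag keys

def find_untagged_instances(region: str = "us-east-1") -> List[dict]:
    """Return running EC2 instances missing any required cost-allocation tag."""
    ec2 = boto3.client("ec2", region_name=region)
    offenders = []
    paginator = ec2.get_paginator("describe_instances")
    for page in paginator.paginate(
        Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
    ):
        for reservation in page["Reservations"]:
            for instance in reservation["Instances"]:
                tags = {t["Key"] for t in instance.get("Tags", [])}
                missing = REQUIRED_TAGS - tags
                if missing:
                    offenders.append({
                        "instance_id": instance["InstanceId"],
                        "instance_type": instance["InstanceType"],
                        "missing_tags": sorted(missing),
                    })
    return offenders

if __name__ == "__main__":
    for o in find_untagged_instances():
        print(f"{o['instance_id']} ({o['instance_type']}) missing tags: {o['missing_tags']}")
Feeding this list into a Slack alert or an automatic stop policy closes the loop between provisioning and cost allocation.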
CI/CD Cost Integration with Infracost
# .github/workflows/cost-check.yaml
name: Terraform Cost Check
on:
pull_request:
paths:
- 'terraform/**'
jobs:
infracost:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Infracost
uses: infracost/actions/setup@v2
with:
api-key: ${{ secrets.INFRACOST_API_KEY }}
- name: Generate cost breakdown
run: |
infracost breakdown --path=terraform/ \
--format=json \
--out-file=/tmp/infracost.json
- name: Post comment
uses: infracost/actions/comment@v1
with:
path: /tmp/infracost.json
behavior: update
- name: Check cost threshold
run: |
MONTHLY=$(jq '.totalMonthlyCost | tonumber' /tmp/infracost.json)
if (( $(echo "$MONTHLY > 10000" | bc -l) )); then
echo "::error::Estimated monthly cost $MONTHLY exceeds $10,000 threshold"
exit 1
fi
43.3.2. Spot Instance Economics
Cloud providers sell spare capacity at a 60-90% discount. The tradeoff: instances can be reclaimed at any time, with as little as a two-minute warning on AWS.
Probability of Interruption
$$ P(I) \propto \frac{1}{D \times S} $$
| Variable | Definition | Impact |
|---|---|---|
| D | Pool depth (available instances) | Higher = fewer interruptions |
| S | Spot price stability | Higher = fewer interruptions |
| P(I) | Probability of interruption | Lower = safer |
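As a quick sanity check on the proportionality: a pool that is twice as deep and twice as price-stable carries a quarter of the relative interruption risk.
$$ \frac{P(I)_{\text{new}}}{P(I)_{\text{old}}} = \frac{D_{\text{old}} \times S_{\text{old}}}{D_{\text{new}} \times S_{\text{new}}} = \frac{1}{2 \times 2} = 0.25 $$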
Instance Interruption Rates by Type
| Instance Family | Age | Typical Interruption Rate | Recommendation |
|---|---|---|---|
| p2.xlarge | Old | <5% | ✅ Very safe |
| p3.2xlarge | Medium | 5-10% | ✅ Safe |
| g4dn.xlarge | Popular | 10-15% | ⚠️ Diversify |
| g5.xlarge | New/Hot | 15-25% | ⚠️ Use fallback |
| p4d.24xlarge | New | 20-30% | ❌ On-demand for critical |
Allocation Strategies
| Strategy | Description | Best For |
|---|---|---|
| lowest-price | Cheapest pools first | Cost-only optimization |
| capacity-optimized | Deepest pools first | Workload reliability |
| diversified | Spread across pools | Balanced approach |
| price-capacity-optimized | Blend of price + depth | Recommended default |
Spot Price Advisor
The following advisor (illustrative; uses boto3 and approximate interruption rates) pulls Spot price history and scores instance types by a blend of savings and stability:
from dataclasses import dataclass
from typing import List, Dict, Optional
import boto3
from datetime import datetime, timedelta
@dataclass
class SpotPriceHistory:
instance_type: str
availability_zone: str
current_price: float
avg_price_24h: float
max_price_24h: float
interruption_rate: float # Estimated
class SpotAdvisor:
"""Analyze Spot pricing and recommend instances."""
# Historical interruption rates (approximate)
INTERRUPTION_RATES = {
"p2.xlarge": 0.05,
"p3.2xlarge": 0.08,
"g4dn.xlarge": 0.12,
"g4dn.2xlarge": 0.10,
"g5.xlarge": 0.18,
"g5.2xlarge": 0.15,
"p4d.24xlarge": 0.25,
}
def __init__(self, region: str = "us-east-1"):
self.ec2 = boto3.client("ec2", region_name=region)
self.region = region
def get_spot_prices(
self,
instance_types: List[str],
hours_back: int = 24
) -> Dict[str, SpotPriceHistory]:
"""Get Spot price history for instance types."""
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=hours_back)
response = self.ec2.describe_spot_price_history(
InstanceTypes=instance_types,
ProductDescriptions=["Linux/UNIX"],
StartTime=start_time,
EndTime=end_time
)
# Aggregate by instance type
prices_by_type: Dict[str, List[float]] = {}
latest_by_type: Dict[str, tuple] = {}  # (price, az, timestamp)
for record in response["SpotPriceHistory"]:
itype = record["InstanceType"]
price = float(record["SpotPrice"])
az = record["AvailabilityZone"]
timestamp = record["Timestamp"]
if itype not in prices_by_type:
prices_by_type[itype] = []
prices_by_type[itype].append(price)
if itype not in latest_by_type or timestamp > latest_by_type[itype][2]:
latest_by_type[itype] = (price, az, timestamp)
result = {}
for itype in instance_types:
prices = prices_by_type.get(itype, [0])
latest = latest_by_type.get(itype, (0, "unknown", None))
result[itype] = SpotPriceHistory(
instance_type=itype,
availability_zone=latest[1],
current_price=latest[0],
avg_price_24h=sum(prices) / len(prices) if prices else 0,
max_price_24h=max(prices) if prices else 0,
interruption_rate=self.INTERRUPTION_RATES.get(itype, 0.20)
)
return result
def recommend(
self,
min_gpu_memory: int,
min_vcpus: int,
max_price_per_hour: float,
prefer_stability: bool = True
) -> List[dict]:
"""Recommend Spot instances based on requirements."""
# GPU instance specs (simplified)
GPU_SPECS = {
"g4dn.xlarge": {"gpu_mem": 16, "vcpus": 4, "on_demand": 0.526},
"g4dn.2xlarge": {"gpu_mem": 16, "vcpus": 8, "on_demand": 0.752},
"g5.xlarge": {"gpu_mem": 24, "vcpus": 4, "on_demand": 1.006},
"g5.2xlarge": {"gpu_mem": 24, "vcpus": 8, "on_demand": 1.212},
"p3.2xlarge": {"gpu_mem": 16, "vcpus": 8, "on_demand": 3.06},
"p4d.24xlarge": {"gpu_mem": 320, "vcpus": 96, "on_demand": 32.77},
}
candidates = [
itype for itype, specs in GPU_SPECS.items()
if specs["gpu_mem"] >= min_gpu_memory and specs["vcpus"] >= min_vcpus
]
prices = self.get_spot_prices(candidates)
recommendations = []
for itype, price_info in prices.items():
if price_info.current_price > max_price_per_hour:
continue
on_demand = GPU_SPECS[itype]["on_demand"]
savings = (1 - price_info.current_price / on_demand) * 100
# Score: balance price and stability
if prefer_stability:
score = (1 - price_info.interruption_rate) * 0.6 + (savings / 100) * 0.4
else:
score = (savings / 100) * 0.8 + (1 - price_info.interruption_rate) * 0.2
recommendations.append({
"instance_type": itype,
"spot_price": round(price_info.current_price, 4),
"on_demand_price": on_demand,
"savings_percent": round(savings, 1),
"interruption_rate": round(price_info.interruption_rate * 100, 1),
"score": round(score, 3),
"availability_zone": price_info.availability_zone
})
return sorted(recommendations, key=lambda x: -x["score"])
def get_savings_report(self, instance_type: str, hours_used: float) -> dict:
"""Calculate actual savings from Spot usage."""
prices = self.get_spot_prices([instance_type])
price_info = prices.get(instance_type)
if not price_info:
return {"error": "Instance type not found"}
GPU_SPECS = {
"g4dn.xlarge": 0.526,
"g4dn.2xlarge": 0.752,
"g5.xlarge": 1.006,
"p3.2xlarge": 3.06,
}
on_demand = GPU_SPECS.get(instance_type, price_info.current_price * 3)
spot_cost = price_info.avg_price_24h * hours_used
on_demand_cost = on_demand * hours_used
savings = on_demand_cost - spot_cost
return {
"instance_type": instance_type,
"hours_used": hours_used,
"spot_cost": round(spot_cost, 2),
"on_demand_equivalent": round(on_demand_cost, 2),
"savings": round(savings, 2),
"savings_percent": round((savings / on_demand_cost) * 100, 1)
}
# Usage
advisor = SpotAdvisor()
recommendations = advisor.recommend(
min_gpu_memory=16,
min_vcpus=4,
max_price_per_hour=1.0,
prefer_stability=True
)
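Continuing the example, the same advisor can summarize realized savings after a run (the figures returned depend on live Spot prices at query time):
# After a 40-hour run on a g4dn.xlarge Spot instance
report = advisor.get_savings_report("g4dn.xlarge", hours_used=40)
print(f"Spot cost: ${report['spot_cost']}, "
      f"on-demand equivalent: ${report['on_demand_equivalent']}, "
      f"saved: ${report['savings']} ({report['savings_percent']}%)")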
43.3.3. Karpenter: Next-Gen Kubernetes Autoscaling
Cluster Autoscaler scales by resizing pre-defined node groups (ASGs), which can take minutes per scale-up. Karpenter watches pending pods and provisions right-sized nodes directly through the EC2 API in seconds.
Karpenter vs Cluster Autoscaler
| Feature | Cluster Autoscaler | Karpenter |
|---|---|---|
| Provisioning | Via ASG (slow) | Direct EC2 API (fast) |
| Node Groups | Required | Not needed |
| Instance selection | Pre-defined in ASG | Dynamic per pod |
| Spot Handling | Basic | Native with fallback |
| Consolidation | Manual | Automatic |
| GPU Support | ✓ | ✓ with better selection |
Production Karpenter Configuration
# karpenter/nodepool.yaml
apiVersion: karpenter.sh/v1beta1
kind: NodePool
metadata:
name: gpu-training
spec:
template:
metadata:
labels:
workload-type: gpu-training
spec:
requirements:
# GPU families
- key: "karpenter.k8s.aws/instance-category"
operator: In
values: ["g", "p"]
# Specific types for training
- key: "node.kubernetes.io/instance-type"
operator: In
values:
- "g4dn.xlarge"
- "g4dn.2xlarge"
- "g5.xlarge"
- "g5.2xlarge"
- "p3.2xlarge"
# Prefer Spot, fallback to On-Demand
- key: "karpenter.sh/capacity-type"
operator: In
values: ["spot", "on-demand"]
# Architecture
- key: "kubernetes.io/arch"
operator: In
values: ["amd64"]
nodeClassRef:
name: gpu-nodes
  # Resource limits
  limits:
    cpu: 1000
    memory: 4000Gi
    nvidia.com/gpu: 32
  # Disruption settings (expireAfter rotates nodes)
  disruption:
    consolidationPolicy: WhenUnderutilized
    consolidateAfter: 30s
    # Expiry for node rotation
    expireAfter: 720h # 30 days
    budgets:
      - nodes: "10%"
---
apiVersion: karpenter.k8s.aws/v1beta1
kind: EC2NodeClass
metadata:
name: gpu-nodes
spec:
amiFamily: AL2
subnetSelectorTerms:
- tags:
karpenter.sh/discovery: "ml-cluster"
securityGroupSelectorTerms:
- tags:
karpenter.sh/discovery: "ml-cluster"
instanceProfile: KarpenterNodeInstanceProfile
# GPU-specific settings
blockDeviceMappings:
- deviceName: /dev/xvda
ebs:
volumeSize: 200Gi
volumeType: gp3
iops: 10000
throughput: 500
deleteOnTermination: true
# Metadata options
metadataOptions:
httpEndpoint: enabled
httpProtocolIPv6: disabled
httpPutResponseHopLimit: 2
httpTokens: required
tags:
Environment: production
ManagedBy: karpenter
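Once the NodePool is live, it is worth checking the Spot/On-Demand mix Karpenter actually launched. A small sketch using the official kubernetes Python client (assumes kubeconfig access to the cluster; karpenter.sh/capacity-type is the same label used in the requirements above):
from collections import Counter
from kubernetes import client, config

def capacity_type_mix() -> Counter:
    """Count cluster nodes by Karpenter capacity type (spot vs. on-demand)."""
    config.load_kube_config()  # use config.load_incluster_config() when running in a pod
    v1 = client.CoreV1Api()
    mix = Counter()
    for node in v1.list_node().items:
        labels = node.metadata.labels or {}
        mix[labels.get("karpenter.sh/capacity-type", "unknown")] += 1
    return mix

if __name__ == "__main__":
    print(capacity_type_mix())  # e.g. Counter({'spot': 9, 'on-demand': 1})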
Terraform for Karpenter
# karpenter.tf
module "karpenter" {
source = "terraform-aws-modules/eks/aws//modules/karpenter"
version = "~> 19.0"
cluster_name = module.eks.cluster_name
irsa_oidc_provider_arn = module.eks.oidc_provider_arn
# Create IAM roles
create_iam_role = true
iam_role_name = "KarpenterController-${var.cluster_name}"
# Node IAM role
create_node_iam_role = true
node_iam_role_name = "KarpenterNode-${var.cluster_name}"
node_iam_role_additional_policies = {
AmazonSSMManagedInstanceCore = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
}
tags = var.tags
}
resource "helm_release" "karpenter" {
namespace = "karpenter"
create_namespace = true
name = "karpenter"
repository = "oci://public.ecr.aws/karpenter"
chart = "karpenter"
version = "v0.32.0"
set {
name = "settings.clusterName"
value = module.eks.cluster_name
}
set {
name = "settings.clusterEndpoint"
value = module.eks.cluster_endpoint
}
set {
name = "serviceAccount.annotations.eks\\.amazonaws\\.com/role-arn"
value = module.karpenter.iam_role_arn
}
set {
name = "settings.interruptionQueue"
value = module.karpenter.queue_name
}
}
43.3.4. Graceful Interruption Handling
When AWS reclaims a Spot instance, the interruption notice gives you two minutes to checkpoint and exit.
Signal Handler Pattern
import signal
import sys
import os
import time
import threading
from typing import Callable, Optional
from dataclasses import dataclass
import requests
import torch
@dataclass
class CheckpointConfig:
checkpoint_dir: str
s3_bucket: str
checkpoint_interval_epochs: int = 10
async_upload: bool = True
class SpotTerminationHandler:
"""Handle Spot instance termination gracefully."""
METADATA_URL = "http://169.254.169.254/latest/meta-data/spot/instance-action"
POLL_INTERVAL = 5 # seconds
def __init__(
self,
checkpoint_fn: Callable,
config: CheckpointConfig
):
self.checkpoint_fn = checkpoint_fn
self.config = config
self.terminating = False
self._setup_signal_handlers()
self._start_metadata_monitor()
def _setup_signal_handlers(self):
"""Register signal handlers."""
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
def _handle_signal(self, signum, frame):
"""Handle termination signal."""
print(f"Received signal {signum}, initiating graceful shutdown...")
self.terminating = True
    def _start_metadata_monitor(self):
        """Start background thread that polls instance metadata for an interruption notice."""
        def monitor():
            while not self.terminating:
                try:
                    # IMDSv2: obtain a session token, then query the spot action endpoint
                    token = requests.put(
                        "http://169.254.169.254/latest/api/token",
                        headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"},
                        timeout=1
                    ).text
                    response = requests.get(
                        self.METADATA_URL,
                        headers={"X-aws-ec2-metadata-token": token},
                        timeout=1
                    )
                    if response.status_code == 200:  # 404 means no interruption notice yet
                        print(f"Spot interruption notice: {response.json()}")
                        self.terminating = True
                        break
                except requests.exceptions.RequestException:
                    pass  # Metadata service unreachable; keep polling
                time.sleep(self.POLL_INTERVAL)
        thread = threading.Thread(target=monitor, daemon=True)
        thread.start()
def should_stop(self) -> bool:
"""Check if training should stop."""
return self.terminating
def checkpoint_and_exit(self, state: dict):
"""Save checkpoint and exit cleanly."""
print("Saving emergency checkpoint...")
# Save locally first
local_path = os.path.join(
self.config.checkpoint_dir,
"emergency_checkpoint.pt"
)
torch.save(state, local_path)
# Upload to S3
self._upload_to_s3(local_path)
print("Checkpoint saved. Exiting.")
sys.exit(0)
def _upload_to_s3(self, local_path: str):
"""Upload checkpoint to S3."""
import boto3
s3 = boto3.client("s3")
key = f"checkpoints/{os.path.basename(local_path)}"
s3.upload_file(local_path, self.config.s3_bucket, key)
print(f"Uploaded to s3://{self.config.s3_bucket}/{key}")
class ResilientTrainer:
"""Training loop with Spot interruption handling."""
def __init__(
self,
model: torch.nn.Module,
optimizer: torch.optim.Optimizer,
config: CheckpointConfig
):
self.model = model
self.optimizer = optimizer
self.config = config
self.current_epoch = 0
self.handler = SpotTerminationHandler(
checkpoint_fn=self.save_checkpoint,
config=config
)
# Try to resume from checkpoint
self._maybe_resume()
def _maybe_resume(self):
"""Resume from checkpoint if available."""
checkpoint_path = os.path.join(
self.config.checkpoint_dir,
"latest_checkpoint.pt"
)
if os.path.exists(checkpoint_path):
print(f"Resuming from {checkpoint_path}")
checkpoint = torch.load(checkpoint_path)
self.model.load_state_dict(checkpoint["model_state"])
self.optimizer.load_state_dict(checkpoint["optimizer_state"])
self.current_epoch = checkpoint["epoch"]
print(f"Resumed from epoch {self.current_epoch}")
def save_checkpoint(self, epoch: Optional[int] = None):
"""Save training checkpoint."""
if epoch is None:
epoch = self.current_epoch
checkpoint = {
"epoch": epoch,
"model_state": self.model.state_dict(),
"optimizer_state": self.optimizer.state_dict(),
"timestamp": time.time()
}
# Atomic save with temp file
temp_path = os.path.join(self.config.checkpoint_dir, "checkpoint_temp.pt")
final_path = os.path.join(self.config.checkpoint_dir, "latest_checkpoint.pt")
torch.save(checkpoint, temp_path)
        os.replace(temp_path, final_path)  # atomic overwrite, even if the file exists
print(f"Checkpoint saved for epoch {epoch}")
def train(self, dataloader, epochs: int):
"""Main training loop with interruption handling."""
for epoch in range(self.current_epoch, epochs):
self.current_epoch = epoch
# Check for interruption before each epoch
if self.handler.should_stop():
self.handler.checkpoint_and_exit({
"epoch": epoch,
"model_state": self.model.state_dict(),
"optimizer_state": self.optimizer.state_dict()
})
# Training epoch
for batch_idx, (data, target) in enumerate(dataloader):
# Mini-batch training
self.optimizer.zero_grad()
output = self.model(data)
loss = torch.nn.functional.cross_entropy(output, target)
loss.backward()
self.optimizer.step()
# Check interruption within epoch
if batch_idx % 100 == 0 and self.handler.should_stop():
self.handler.checkpoint_and_exit({
"epoch": epoch,
"batch": batch_idx,
"model_state": self.model.state_dict(),
"optimizer_state": self.optimizer.state_dict()
})
# Periodic checkpoint
if epoch % self.config.checkpoint_interval_epochs == 0:
self.save_checkpoint(epoch)
print(f"Epoch {epoch} complete")
# Usage
config = CheckpointConfig(
    checkpoint_dir="/tmp/checkpoints",
    s3_bucket="my-training-bucket",
    checkpoint_interval_epochs=5
)
os.makedirs(config.checkpoint_dir, exist_ok=True)
model = torch.nn.Linear(100, 10)
optimizer = torch.optim.Adam(model.parameters())
# Any DataLoader yielding (data, target) batches works here
dataloader = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(torch.randn(512, 100), torch.randint(0, 10, (512,))),
    batch_size=32
)
trainer = ResilientTrainer(model, optimizer, config)
trainer.train(dataloader, epochs=100)
43.3.5. Mixed Instance Strategies
Diversifying across multiple instance types and Availability Zones dramatically raises the odds that at least one Spot pool has capacity; the back-of-the-envelope calculation after the manifests below shows how availability can climb from roughly 50% to over 99%:
# mixed-instance-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: training-workers
spec:
replicas: 10
selector:
matchLabels:
app: training-worker
template:
metadata:
labels:
app: training-worker
spec:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
# Accept any of these GPU types
- key: node.kubernetes.io/instance-type
operator: In
values:
- g4dn.xlarge
- g4dn.2xlarge
- g5.xlarge
- g5.2xlarge
- p3.2xlarge
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
# Spread across nodes
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app: training-worker
topologyKey: kubernetes.io/hostname
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "spot"
operator: "Equal"
value: "true"
effect: "NoSchedule"
containers:
- name: trainer
image: training:latest
resources:
limits:
nvidia.com/gpu: 1
requests:
nvidia.com/gpu: 1
memory: "16Gi"
cpu: "4"
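A back-of-the-envelope calculation shows where the availability jump comes from. If each Spot pool independently lacks capacity some fraction of the time, the chance that every pool is unavailable at once shrinks geometrically (illustrative probabilities; real pools are not fully independent):
def availability(pool_unavailability):
    """Probability that at least one Spot pool has capacity,
    assuming pools fail independently."""
    p_all_down = 1.0
    for p in pool_unavailability:
        p_all_down *= p
    return 1.0 - p_all_down

# One hot pool that is unavailable half the time vs. five diversified pools
print(availability([0.5]))                      # 0.5
print(availability([0.5, 0.4, 0.3, 0.3, 0.2]))  # ~0.996
With five moderately reliable pools, the odds of finding capacity climb from roughly 50% to over 99%, which is the effect the Deployment above exploits.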
AWS Fleet Configuration
# spot_fleet.tf
resource "aws_launch_template" "gpu_training" {
name_prefix = "gpu-training-"
image_id = data.aws_ami.gpu.id
instance_type = "g4dn.xlarge" # Default, overridden by fleet
block_device_mappings {
device_name = "/dev/xvda"
ebs {
volume_size = 200
volume_type = "gp3"
iops = 10000
throughput = 500
}
}
iam_instance_profile {
name = aws_iam_instance_profile.training.name
}
tag_specifications {
resource_type = "instance"
tags = {
Name = "gpu-training-worker"
Environment = var.environment
}
}
}
resource "aws_ec2_fleet" "gpu_training" {
type = "maintain"
target_capacity_specification {
default_target_capacity_type = "spot"
total_target_capacity = var.worker_count
on_demand_target_capacity = 1 # 1 on-demand for stability
spot_target_capacity = var.worker_count - 1
}
launch_template_config {
launch_template_specification {
launch_template_id = aws_launch_template.gpu_training.id
version = "$Latest"
}
# Mixed instance types
override {
instance_type = "g4dn.xlarge"
weighted_capacity = 1
}
override {
instance_type = "g4dn.2xlarge"
weighted_capacity = 2
}
override {
instance_type = "g5.xlarge"
weighted_capacity = 1
}
override {
instance_type = "g5.2xlarge"
weighted_capacity = 2
}
override {
instance_type = "p3.2xlarge"
weighted_capacity = 1
}
}
spot_options {
allocation_strategy = "price-capacity-optimized"
instance_interruption_behavior = "terminate"
maintenance_strategies {
capacity_rebalance {
replacement_strategy = "launch-before-terminate"
}
}
}
# Terminate instances when fleet is deleted
terminate_instances = true
terminate_instances_with_expiration = true
}
43.3.6. Storage Cost Optimization
Storage is the “silent killer”—cheap per GB but accumulates forever.
Cost Breakdown by Storage Type
| Storage | Cost/GB/Month | Use Case | Lifecycle |
|---|---|---|---|
| S3 Standard | $0.023 | Active data | Transition after 30d |
| S3 IA | $0.0125 | Infrequent access | Transition after 90d |
| S3 Glacier | $0.004 | Archive | After 365d |
| EBS gp3 | $0.08 | Attached volumes | Delete on termination |
| EFS | $0.30 | Shared storage | Expensive! Avoid |
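To see how quickly "cheap per GB" compounds, consider a team writing 50 GB of checkpoints per day and never deleting anything (illustrative numbers at S3 Standard pricing):
S3_STANDARD = 0.023  # $/GB-month
GB_PER_DAY = 50

# Storage grows linearly, so the bill for month n covers everything written so far
for month in (1, 6, 12, 24):
    stored_gb = GB_PER_DAY * 30 * month
    print(f"Month {month:>2}: {stored_gb:>6} GB stored -> ${stored_gb * S3_STANDARD:,.0f}/month")
A prefix that costs pocket change in month one quietly grows into hundreds of dollars a month within two years, which is why the cleanup jobs and lifecycle policies below matter.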
Automated Cleanup Scripts
import boto3
from datetime import datetime, timedelta
from typing import List
from dataclasses import dataclass
@dataclass
class CleanupReport:
orphan_volumes_deleted: int
orphan_volumes_size_gb: int
snapshots_deleted: int
estimated_monthly_savings: float
class StorageCleaner:
"""Clean up orphaned storage resources."""
def __init__(self, region: str = "us-east-1", dry_run: bool = True):
self.ec2 = boto3.resource("ec2", region_name=region)
self.ec2_client = boto3.client("ec2", region_name=region)
self.s3 = boto3.client("s3", region_name=region)
self.dry_run = dry_run
def find_orphan_volumes(self, min_age_days: int = 7) -> List[dict]:
"""Find EBS volumes not attached to any instance."""
cutoff = datetime.utcnow() - timedelta(days=min_age_days)
orphans = []
volumes = self.ec2.volumes.filter(
Filters=[{"Name": "status", "Values": ["available"]}]
)
for vol in volumes:
if vol.create_time.replace(tzinfo=None) < cutoff:
orphans.append({
"volume_id": vol.id,
"size_gb": vol.size,
"created": vol.create_time.isoformat(),
"age_days": (datetime.utcnow() - vol.create_time.replace(tzinfo=None)).days,
"monthly_cost": vol.size * 0.08,
"tags": {t["Key"]: t["Value"] for t in (vol.tags or [])}
})
return orphans
def delete_orphan_volumes(self, min_age_days: int = 7) -> CleanupReport:
"""Delete orphaned volumes older than min_age_days."""
orphans = self.find_orphan_volumes(min_age_days)
total_deleted = 0
total_size = 0
for orphan in orphans:
print(f"{'Would delete' if self.dry_run else 'Deleting'} "
f"volume {orphan['volume_id']} ({orphan['size_gb']}GB)")
if not self.dry_run:
self.ec2_client.delete_volume(VolumeId=orphan["volume_id"])
total_deleted += 1
total_size += orphan["size_gb"]
return CleanupReport(
orphan_volumes_deleted=total_deleted,
orphan_volumes_size_gb=total_size,
snapshots_deleted=0,
estimated_monthly_savings=total_size * 0.08
)
def find_old_snapshots(
self,
min_age_days: int = 90,
exclude_tags: List[str] = None
) -> List[dict]:
"""Find old EBS snapshots."""
exclude_tags = exclude_tags or ["keep", "production"]
cutoff = datetime.utcnow() - timedelta(days=min_age_days)
snapshots = []
response = self.ec2_client.describe_snapshots(OwnerIds=["self"])
for snap in response["Snapshots"]:
if snap["StartTime"].replace(tzinfo=None) > cutoff:
continue
tags = {t["Key"]: t["Value"] for t in snap.get("Tags", [])}
# Skip if has exclude tags
if any(tag in tags for tag in exclude_tags):
continue
snapshots.append({
"snapshot_id": snap["SnapshotId"],
"volume_id": snap.get("VolumeId"),
"size_gb": snap["VolumeSize"],
"created": snap["StartTime"].isoformat(),
"age_days": (datetime.utcnow() - snap["StartTime"].replace(tzinfo=None)).days,
"description": snap.get("Description", "")
})
return snapshots
def cleanup_s3_checkpoints(
self,
bucket: str,
prefix: str,
keep_last_n: int = 5
) -> int:
"""Keep only last N checkpoints, delete older ones."""
paginator = self.s3.get_paginator("list_objects_v2")
all_objects = []
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
all_objects.append({
"Key": obj["Key"],
"LastModified": obj["LastModified"],
"Size": obj["Size"]
})
# Sort by date, newest first
all_objects.sort(key=lambda x: x["LastModified"], reverse=True)
# Delete old ones
to_delete = all_objects[keep_last_n:]
deleted_count = 0
for obj in to_delete:
print(f"{'Would delete' if self.dry_run else 'Deleting'} {obj['Key']}")
if not self.dry_run:
self.s3.delete_object(Bucket=bucket, Key=obj["Key"])
deleted_count += 1
return deleted_count
# Lambda handler for scheduled cleanup
def lambda_handler(event, context):
cleaner = StorageCleaner(dry_run=False)
# Clean orphan volumes older than 14 days
volume_report = cleaner.delete_orphan_volumes(min_age_days=14)
# Clean old checkpoints
checkpoint_deleted = cleaner.cleanup_s3_checkpoints(
bucket="ml-checkpoints",
prefix="training/",
keep_last_n=10
)
return {
"statusCode": 200,
"body": {
"volumes_deleted": volume_report.orphan_volumes_deleted,
"storage_freed_gb": volume_report.orphan_volumes_size_gb,
"checkpoints_deleted": checkpoint_deleted,
"monthly_savings": volume_report.estimated_monthly_savings
}
}
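To run this handler on the weekly cadence suggested in the summary checklist, one option is an EventBridge schedule targeting the Lambda. A minimal boto3 sketch, assuming the function is already deployed under the placeholder name storage-cleaner:
import boto3

events = boto3.client("events")
lam = boto3.client("lambda")

FUNCTION_NAME = "storage-cleaner"  # placeholder: name of the deployed cleanup Lambda
function_arn = lam.get_function(FunctionName=FUNCTION_NAME)["Configuration"]["FunctionArn"]

# Weekly schedule rule
rule_arn = events.put_rule(
    Name="weekly-storage-cleanup",
    ScheduleExpression="rate(7 days)",
    State="ENABLED",
)["RuleArn"]

# Allow EventBridge to invoke the function, then attach it as the rule's target
lam.add_permission(
    FunctionName=FUNCTION_NAME,
    StatementId="weekly-storage-cleanup",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule_arn,
)
events.put_targets(
    Rule="weekly-storage-cleanup",
    Targets=[{"Id": "storage-cleaner", "Arn": function_arn}],
)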
S3 Lifecycle Policy
# s3_lifecycle.tf
resource "aws_s3_bucket_lifecycle_configuration" "ml_data" {
bucket = aws_s3_bucket.ml_data.id
# Checkpoints - aggressive cleanup
rule {
id = "checkpoint-cleanup"
status = "Enabled"
filter {
prefix = "checkpoints/"
}
transition {
days = 7
storage_class = "STANDARD_IA"
}
transition {
days = 30
storage_class = "GLACIER"
}
expiration {
days = 90
}
}
# Training datasets - keep longer
rule {
id = "dataset-lifecycle"
status = "Enabled"
filter {
prefix = "datasets/"
}
transition {
days = 30
storage_class = "STANDARD_IA"
}
transition {
days = 180
storage_class = "GLACIER"
}
}
# Model artifacts - preserve
rule {
id = "model-lifecycle"
status = "Enabled"
filter {
prefix = "models/"
}
transition {
days = 90
storage_class = "STANDARD_IA"
}
# No expiration - models are valuable
}
# Logs - delete aggressively
rule {
id = "logs-cleanup"
status = "Enabled"
filter {
prefix = "logs/"
}
expiration {
days = 14
}
}
}
43.3.7. GPU Sharing with MIG
An A100 ships with 40 GB or 80 GB of memory, while many inference workloads need 10 GB or less. MIG (Multi-Instance GPU) partitions a single A100 into as many as seven isolated slices.
MIG Configuration
# nvidia-mig-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: nvidia-mig-config
namespace: gpu-operator
data:
config.yaml: |
version: v1
mig-configs:
all-1g.10gb:
- devices: all
mig-enabled: true
mig-devices:
"1g.10gb": 7
all-2g.20gb:
- devices: all
mig-enabled: true
mig-devices:
"2g.20gb": 3
mixed-mig:
- devices: [0]
mig-enabled: true
mig-devices:
"1g.10gb": 4
"2g.20gb": 1
- devices: [1]
mig-enabled: false
Cost Comparison
| Scenario | Hardware | Cost/Hour | Jobs Served | Cost/Job |
|---|---|---|---|---|
| Full A100 | 1x A100 | $4.10 | 1 | $4.10 |
| MIG 7x | 1x A100 (7 slices) | $4.10 | 7 | $0.59 |
| 7x Smaller GPUs | 7x T4 | $3.50 | 7 | $0.50 |
Verdict: MIG is optimal when you need A100 tensor cores but not full memory.
43.3.8. Budget Alerts and Governance
AWS Budget Terraform
# budgets.tf
resource "aws_budgets_budget" "ml_monthly" {
name = "ml-monthly-budget"
budget_type = "COST"
limit_amount = "10000"
limit_unit = "USD"
time_unit = "MONTHLY"
cost_filter {
name = "TagKeyValue"
values = [
"user:Team$DataScience"
]
}
notification {
comparison_operator = "GREATER_THAN"
threshold = 50
threshold_type = "PERCENTAGE"
notification_type = "ACTUAL"
subscriber_email_addresses = ["ml-team@company.com"]
}
notification {
comparison_operator = "GREATER_THAN"
threshold = 80
threshold_type = "PERCENTAGE"
notification_type = "ACTUAL"
subscriber_email_addresses = ["ml-lead@company.com", "finance@company.com"]
}
notification {
comparison_operator = "GREATER_THAN"
threshold = 100
threshold_type = "PERCENTAGE"
notification_type = "ACTUAL"
subscriber_email_addresses = ["cto@company.com"]
}
notification {
comparison_operator = "GREATER_THAN"
threshold = 100
threshold_type = "PERCENTAGE"
notification_type = "FORECASTED"
subscriber_email_addresses = ["ml-lead@company.com"]
}
}
resource "aws_budgets_budget_action" "stop_instances" {
budget_name = aws_budgets_budget.ml_monthly.name
action_type = "RUN_SSM_DOCUMENTS"
approval_model = "AUTOMATIC"
notification_type = "ACTUAL"
action_threshold {
action_threshold_type = "PERCENTAGE"
action_threshold_value = 120
}
definition {
ssm_action_definition {
action_sub_type = "STOP_EC2_INSTANCES"
region = var.region
instance_ids = [] # Populate with the instance IDs to stop; the SSM action only stops the IDs listed here
}
}
execution_role_arn = aws_iam_role.budget_action.arn
subscriber {
subscription_type = "EMAIL"
address = "emergency@company.com"
}
}
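Budgets catch overruns after the fact; it also helps to watch per-team spend directly. A small Cost Explorer sketch using boto3, assuming a Team cost-allocation tag has been activated in the billing console:
import boto3
from datetime import date, timedelta

def spend_by_team(days: int = 30) -> dict:
    """Unblended cost for the last N days, grouped by the Team cost-allocation tag."""
    ce = boto3.client("ce", region_name="us-east-1")  # Cost Explorer endpoint lives in us-east-1
    end = date.today()
    start = end - timedelta(days=days)
    resp = ce.get_cost_and_usage(
        TimePeriod={"Start": start.isoformat(), "End": end.isoformat()},
        Granularity="MONTHLY",
        Metrics=["UnblendedCost"],
        GroupBy=[{"Type": "TAG", "Key": "Team"}],
    )
    totals: dict = {}
    for period in resp["ResultsByTime"]:
        for group in period["Groups"]:
            team = group["Keys"][0]  # e.g. "Team$DataScience"
            totals[team] = totals.get(team, 0.0) + float(
                group["Metrics"]["UnblendedCost"]["Amount"]
            )
    return totals

if __name__ == "__main__":
    for team, cost in sorted(spend_by_team().items(), key=lambda kv: -kv[1]):
        print(f"{team}: ${cost:,.2f}")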
43.3.9. Summary Checklist
| Category | Action | Priority | Savings |
|---|---|---|---|
| Spot | Use price-capacity-optimized allocation | Critical | 60-90% |
| Spot | Implement graceful checkpointing | Critical | Prevents data loss |
| Spot | Diversify across 4+ instance types | High | Reduces interruptions |
| Autoscaling | Deploy Karpenter over Cluster Autoscaler | High | Faster scaling |
| Storage | Set S3 lifecycle policies | High | 50-80% on old data |
| Storage | Weekly orphan volume cleanup | Medium | Variable |
| Governance | Enable Infracost in CI/CD | High | Prevents surprises |
| Governance | Set budget alerts at 50/80/100% | Critical | Visibility |
| GPU | Use MIG for inference workloads | Medium | 7x efficiency |
| Tagging | Enforce Team/Project tags | High | Cost allocation |
Quick Decision Matrix
| Workload Type | Spot Safe? | Recommended Instance | Fallback |
|---|---|---|---|
| Training (long) | ⚠️ With checkpoints | p3, g5 | On-demand |
| Training (short) | ✅ | g4dn, g5 | Different AZ |
| Inference (batch) | ✅ | g4dn (T4), g5 | On-demand queue |
| Inference (real-time) | ❌ | On-demand or reserved | N/A |
| Dev/Experiments | ✅ | Spot only | Wait |
[End of Section 43.3]