15.1 Managed Real-Time Inference: SageMaker & Vertex AI
15.1.1 Introduction to Managed Inference Services
When a user clicks “Buy Now” on an e-commerce site, swipes a credit card, or uploads an X-ray for diagnosis, they expect an immediate response. This is the domain of Real-Time Inference—synchronous, low-latency prediction serving where milliseconds matter and reliability is non-negotiable.
Managed inference services abstract away the operational complexity of running production ML systems. They handle load balancing, auto-scaling, health monitoring, and infrastructure provisioning, allowing ML teams to focus on model quality rather than DevOps toil. However, “managed” does not mean “zero-ops.” Understanding the architecture, configuration options, and operational patterns of these services is critical for building production-grade systems.
This chapter provides an exhaustive technical deep dive into the two dominant managed platforms: Amazon SageMaker Real-time Inference and Google Cloud Vertex AI Prediction. We will explore their architectures, implementation patterns, security models, cost structures, and operational best practices at a level suitable for Principal Engineers and Platform Architects.
The Promise and Reality of Managed Services
Managed inference services promise to handle:
- Infrastructure Provisioning: Automatic allocation of EC2/Compute Engine instances with the correct GPU drivers and ML frameworks.
- Load Balancing: Distributing traffic across multiple instances with health checking and automatic failover.
- Auto-Scaling: Dynamic adjustment of fleet size based on traffic patterns and custom metrics.
- Availability: Multi-AZ/Multi-Zone deployment with SLA guarantees (typically 99.9% or 99.95%).
- Patching: Automated OS and container runtime security updates.
However, the user still owns critical responsibilities:
- Model Container Code: The serving logic, pre/post-processing, and error handling.
- IAM and Security: Network policies, encryption, and access control.
- Cost Optimization: Instance selection, auto-scaling policies, and utilization monitoring.
- Performance Tuning: Batch size configuration, worker count, and memory allocation.
Understanding where the provider’s responsibilities end and yours begin is the key to successful deployments.
15.1.2 Amazon SageMaker Real-Time Inference
SageMaker Real-time Inference is AWS’s flagship managed serving solution. It is engineered for high availability and supports complex deployment patterns like multi-model endpoints and production variants.
Architecture: The Three-Tier Stack
A SageMaker Endpoint is a logical abstraction over a complex physical infrastructure:
graph TD
Client[Client Application] -->|HTTPS| ALB[SageMaker ALB<br/>TLS Termination]
ALB -->|Route| AZ1[Availability Zone 1]
ALB -->|Route| AZ2[Availability Zone 2]
subgraph AZ1
Inst1[ml.g4dn.xlarge]
Agent1[SageMaker Agent]
Container1[Model Container]
Model1[Loaded Model]
Agent1 -->|Lifecycle| Container1
Container1 -->|Inference| Model1
end
subgraph AZ2
Inst2[ml.g4dn.xlarge]
Agent2[SageMaker Agent]
Container2[Model Container]
Model2[Loaded Model]
Agent2 -->|Lifecycle| Container2
Container2 -->|Inference| Model2
end
Key Components:
- Application Load Balancer (ALB): An AWS-managed load balancer (not visible in your account) sits in front of your endpoint. It handles:
  - TLS termination (using AWS-managed certificates or customer-provided certificates via ACM).
  - Health checking (periodic pings to the /ping endpoint of each instance).
  - Cross-AZ load balancing for high availability.
- SageMaker Agent: A sidecar process running on each instance that:
  - Manages the lifecycle of the model container (start, stop, health checks).
  - Collects CloudWatch metrics (invocations, latency, errors).
  - Handles Data Capture for Model Monitor.
- Model Container: Your Docker image (or a pre-built framework image) that implements the serving logic.
- Instance Fleet: EC2 instances (with the ml.* prefix) optimized for ML workloads, often with attached GPUs or AWS custom accelerators (Inferentia, Trainium).
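To make the health-check and invocation contract concrete, here is a minimal sketch of the web server a custom serving container must expose: GET /ping returning 200 when healthy and POST /invocations on port 8080. The Flask app and handler logic are illustrative placeholders, not SageMaker's own code:
# Minimal sketch of the SageMaker container HTTP contract (Flask assumed installed).
import json
import flask

app = flask.Flask(__name__)
model = None  # in practice, load your model from /opt/ml/model at startup

def get_model():
    global model
    if model is None:
        # Placeholder: replace with framework-specific loading from /opt/ml/model
        model = lambda payload: {"echo": payload}
    return model

@app.route("/ping", methods=["GET"])
def ping():
    # The managed load balancer expects HTTP 200 when the container can serve traffic.
    return flask.Response(status=200 if get_model() is not None else 503)

@app.route("/invocations", methods=["POST"])
def invocations():
    payload = json.loads(flask.request.data)
    result = get_model()(payload)
    return flask.Response(
        response=json.dumps(result),
        status=200,
        mimetype="application/json",
    )

if __name__ == "__main__":
    # SageMaker routes traffic to port 8080 inside the container.
    app.run(host="0.0.0.0", port=8080)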
The Model Artifact Structure
SageMaker expects model artifacts to be packaged as a compressed tarball (.tar.gz) and stored in S3. The structure depends on whether you’re using a pre-built framework container or a custom container.
For Framework Containers (PyTorch, TensorFlow, Sklearn):
model.tar.gz
├── model.pth (or model.joblib, saved_model/, etc.)
├── code/
│ ├── inference.py
│ └── requirements.txt
└── (optional) config files
Example: Packaging a PyTorch Model:
# Directory structure
model/
├── code/
│ ├── inference.py
│ └── requirements.txt
└── model.pth
# Create the tarball (IMPORTANT: tar from inside the directory)
cd model
tar -czf ../model.tar.gz .
cd ..
# Upload to S3 with versioning
aws s3 cp model.tar.gz s3://my-mlops-bucket/models/fraud-detector/v1.2.3/model.tar.gz
Best Practice: Never overwrite artifacts. Use semantic versioning in S3 keys (/v1.2.3/) to ensure immutability and enable rollback.
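A small optional pre-upload check catches a common packaging mistake (tarring the parent directory instead of its contents). This sketch uses only the standard library and assumes the layout shown above:
# Verify the tarball contains code/inference.py before uploading to S3.
import sys
import tarfile

def verify_model_tarball(path: str) -> None:
    """List the archive contents and fail fast if the inference script is missing."""
    with tarfile.open(path, "r:gz") as tar:
        names = tar.getnames()
    print("\n".join(sorted(names)))
    if not any(name.endswith("code/inference.py") for name in names):
        sys.exit("ERROR: code/inference.py not found in the archive")

verify_model_tarball("model.tar.gz")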
The Inference Script Contract
The inference.py script (or equivalent) must implement a specific contract for framework containers. This contract consists of four functions:
# inference.py
import os
import json
import logging
import torch
import torch.nn.functional as F
from transformers import BertConfig, BertTokenizer, BertForSequenceClassification
# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# Global variables (loaded once per container lifecycle)
MODEL = None
TOKENIZER = None
DEVICE = None
def model_fn(model_dir):
"""
Loads the model from disk into memory.
This function is called ONCE when the container starts.
Args:
model_dir (str): Path to the directory containing model artifacts
Returns:
The loaded model object
"""
global MODEL, TOKENIZER, DEVICE
# Determine device
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Loading model on device: {DEVICE}")
try:
# Load the model architecture and weights
model_path = os.path.join(model_dir, 'model.pth')
        # Option 1: If you saved the entire model object
        # MODEL = torch.load(model_path, map_location=DEVICE)

        # Option 2: If you saved a state_dict (recommended): rebuild the
        # architecture from the bundled config, then load the weights
        config = BertConfig.from_pretrained(model_dir)
        MODEL = BertForSequenceClassification(config)
        MODEL.load_state_dict(torch.load(model_path, map_location=DEVICE))
MODEL.to(DEVICE)
MODEL.eval() # Set to evaluation mode (disables dropout, etc.)
# Load tokenizer
TOKENIZER = BertTokenizer.from_pretrained(model_dir)
logger.info("Model loaded successfully")
return MODEL
except Exception as e:
logger.error(f"Failed to load model: {str(e)}", exc_info=True)
raise
def input_fn(request_body, request_content_type):
"""
Deserializes the request payload.
This function is called for EVERY request.
Args:
request_body: The raw request body (bytes or str)
request_content_type: The Content-Type header value
Returns:
Deserialized input data (any Python object)
"""
logger.debug(f"Received request with content-type: {request_content_type}")
if request_content_type == 'application/json':
try:
data = json.loads(request_body)
# Expect {"inputs": ["text1", "text2", ...]} or {"inputs": "single text"}
if 'inputs' not in data:
raise ValueError("Request must contain 'inputs' field")
inputs = data['inputs']
# Normalize to list
if isinstance(inputs, str):
inputs = [inputs]
return inputs
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON: {str(e)}")
    elif request_content_type == 'text/csv':
        # Simple CSV handling (one column); the body may arrive as bytes or str
        body = request_body.decode('utf-8') if isinstance(request_body, bytes) else request_body
        return [line.strip() for line in body.split('\n') if line.strip()]
    elif request_content_type == 'text/plain':
        # Single text input
        body = request_body.decode('utf-8') if isinstance(request_body, bytes) else request_body
        return [body.strip()]
else:
raise ValueError(f"Unsupported content type: {request_content_type}")
def predict_fn(input_object, model):
"""
Performs the actual inference.
This function is called for EVERY request.
Args:
input_object: The output of input_fn
model: The output of model_fn
Returns:
Inference results (any Python object)
"""
global TOKENIZER, DEVICE
logger.info(f"Running prediction on {len(input_object)} inputs")
try:
# Tokenize the batch
encoded = TOKENIZER(
input_object,
padding="max_length",
truncation=True,
max_length=128,
return_tensors="pt"
)
input_ids = encoded['input_ids'].to(DEVICE)
attention_mask = encoded['attention_mask'].to(DEVICE)
# Run inference (no gradient computation)
with torch.no_grad():
outputs = model(input_ids=input_ids, attention_mask=attention_mask)
logits = outputs.logits
probs = F.softmax(logits, dim=1)
return probs
except Exception as e:
logger.error(f"Prediction failed: {str(e)}", exc_info=True)
raise RuntimeError(f"Inference error: {str(e)}")
def output_fn(predictions, response_content_type):
"""
Serializes the prediction results.
This function is called for EVERY request.
Args:
predictions: The output of predict_fn
response_content_type: The Accept header value
Returns:
Serialized response body (str or bytes)
"""
logger.debug("Serializing output")
if response_content_type == 'application/json':
# Convert tensor to list
result = predictions.cpu().numpy().tolist()
return json.dumps({'predictions': result})
elif response_content_type == 'text/csv':
# Return as CSV (one row per input)
result = predictions.cpu().numpy()
csv_rows = [','.join(map(str, row)) for row in result]
return '\n'.join(csv_rows)
else:
raise ValueError(f"Unsupported accept type: {response_content_type}")
Performance Considerations:
- Global Variables: Load heavy resources (models, tokenizers) in the global scope or in model_fn. They persist across requests, avoiding repeated loading.
- GPU Warmup: The first inference on a cold container may be slower due to CUDA initialization. Consider running a dummy inference in model_fn (see the sketch below).
- Batch-Aware Code: If using batching (via SageMaker's built-in batching or multi-model endpoints), ensure your code handles lists of inputs efficiently.
- Error Handling: Wrap critical sections in try/except to return meaningful error messages rather than crashing the container.
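A minimal warmup sketch, assuming the BERT model and tokenizer loaded above; calling this helper at the end of model_fn pays the CUDA initialization cost once at startup rather than on the first live request:
import torch

def _warmup(model, tokenizer, device):
    """Run one dummy forward pass so CUDA kernels are initialized at startup."""
    dummy = tokenizer(
        ["warmup"],
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt",
    )
    with torch.no_grad():
        model(
            input_ids=dummy["input_ids"].to(device),
            attention_mask=dummy["attention_mask"].to(device),
        )

# Call at the end of model_fn, after MODEL, TOKENIZER, and DEVICE are set:
#     _warmup(MODEL, TOKENIZER, DEVICE)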
Infrastructure as Code: Terraform
While the SageMaker Python SDK is convenient for exploration, production deployments demand Infrastructure as Code. Terraform provides declarative, version-controlled infrastructure.
Complete Terraform Example:
# variables.tf
variable "model_name" {
description = "Name of the model"
type = string
default = "fraud-detector"
}
variable "model_version" {
description = "Model version"
type = string
default = "v1.2.3"
}
variable "instance_type" {
description = "SageMaker instance type"
type = string
default = "ml.g4dn.xlarge"
}
variable "instance_count" {
description = "Initial instance count"
type = number
default = 2
}
# iam.tf
data "aws_iam_policy_document" "sagemaker_assume_role" {
statement {
actions = ["sts:AssumeRole"]
principals {
type = "Service"
identifiers = ["sagemaker.amazonaws.com"]
}
}
}
resource "aws_iam_role" "sagemaker_execution_role" {
name = "${var.model_name}-sagemaker-role"
assume_role_policy = data.aws_iam_policy_document.sagemaker_assume_role.json
}
resource "aws_iam_role_policy_attachment" "sagemaker_full_access" {
role = aws_iam_role.sagemaker_execution_role.name
policy_arn = "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"
}
# Additional policy for S3 access
resource "aws_iam_role_policy" "s3_access" {
name = "${var.model_name}-s3-access"
role = aws_iam_role.sagemaker_execution_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"s3:GetObject",
"s3:ListBucket"
]
Resource = [
"arn:aws:s3:::my-mlops-bucket/*",
"arn:aws:s3:::my-mlops-bucket"
]
}
]
})
}
# model.tf
resource "aws_sagemaker_model" "model" {
name = "${var.model_name}-${var.model_version}"
execution_role_arn = aws_iam_role.sagemaker_execution_role.arn
primary_container {
image = "763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.0-gpu-py310-cu118-ubuntu20.04-sagemaker"
model_data_url = "s3://my-mlops-bucket/models/${var.model_name}/${var.model_version}/model.tar.gz"
environment = {
"SAGEMAKER_PROGRAM" = "inference.py"
"SAGEMAKER_SUBMIT_DIRECTORY" = "s3://my-mlops-bucket/models/${var.model_name}/${var.model_version}/model.tar.gz"
"SAGEMAKER_REGION" = "us-east-1"
"TS_MAX_RESPONSE_SIZE" = "20971520" # 20MB
"TS_MAX_REQUEST_SIZE" = "10485760" # 10MB
"TS_DEFAULT_WORKERS_PER_MODEL"= "1" # One worker per GPU
"OMP_NUM_THREADS" = "1" # Prevent CPU over-subscription
"MKL_NUM_THREADS" = "1"
}
}
tags = {
Environment = "production"
Model = var.model_name
Version = var.model_version
}
}
# endpoint_config.tf
resource "aws_sagemaker_endpoint_configuration" "config" {
name = "${var.model_name}-config-${var.model_version}"
production_variants {
variant_name = "AllTraffic"
model_name = aws_sagemaker_model.model.name
initial_instance_count = var.instance_count
instance_type = var.instance_type
# Optional: Serverless config
# serverless_config {
# max_concurrency = 10
# memory_size_in_mb = 6144
# provisioned_concurrency = 2
# }
}
# Data Capture for Model Monitor
data_capture_config {
enable_capture = true
initial_sampling_percentage = 100
destination_s3_uri = "s3://my-mlops-bucket/model-monitor/${var.model_name}"
capture_options {
capture_mode = "InputAndOutput"
}
capture_content_type_header {
csv_content_types = ["text/csv"]
json_content_types = ["application/json"]
}
}
tags = {
Environment = "production"
Model = var.model_name
}
}
# endpoint.tf
resource "aws_sagemaker_endpoint" "endpoint" {
name = "${var.model_name}-prod"
endpoint_config_name = aws_sagemaker_endpoint_configuration.config.name
tags = {
Environment = "production"
Model = var.model_name
CostCenter = "ML-Platform"
}
}
# autoscaling.tf
resource "aws_appautoscaling_target" "sagemaker_target" {
max_capacity = 20
min_capacity = var.instance_count
resource_id = "endpoint/${aws_sagemaker_endpoint.endpoint.name}/variant/AllTraffic"
scalable_dimension = "sagemaker:variant:DesiredInstanceCount"
service_namespace = "sagemaker"
depends_on = [aws_sagemaker_endpoint.endpoint]
}
resource "aws_appautoscaling_policy" "sagemaker_scaling_policy" {
name = "${var.model_name}-scaling-policy"
policy_type = "TargetTrackingScaling"
resource_id = aws_appautoscaling_target.sagemaker_target.resource_id
scalable_dimension = aws_appautoscaling_target.sagemaker_target.scalable_dimension
service_namespace = aws_appautoscaling_target.sagemaker_target.service_namespace
target_tracking_scaling_policy_configuration {
predefined_metric_specification {
predefined_metric_type = "SageMakerVariantInvocationsPerInstance"
}
target_value = 1000.0 # Target 1000 invocations per minute per instance
scale_in_cooldown = 300 # Wait 5 minutes before scaling down
scale_out_cooldown = 60 # Wait 1 minute before scaling up again
}
}
# outputs.tf
output "endpoint_name" {
value = aws_sagemaker_endpoint.endpoint.name
}
output "endpoint_arn" {
value = aws_sagemaker_endpoint.endpoint.arn
}
Deploying:
terraform init
terraform plan -var="model_version=v1.2.4"
terraform apply -var="model_version=v1.2.4"
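After terraform apply, a short smoke test confirms the endpoint actually serves traffic. This sketch assumes the endpoint name produced by the Terraform above (fraud-detector-prod) and a JSON-serving container:
# Wait for the endpoint to reach InService, then run one test invocation.
import json
import boto3

sm = boto3.client("sagemaker")
runtime = boto3.client("sagemaker-runtime")

endpoint_name = "fraud-detector-prod"

# Block until the endpoint leaves Creating/Updating and reaches InService.
sm.get_waiter("endpoint_in_service").wait(EndpointName=endpoint_name)

# End-to-end smoke test of the serving stack.
response = runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="application/json",
    Accept="application/json",
    Body=json.dumps({"inputs": ["smoke test"]}),
)
print(response["Body"].read().decode("utf-8"))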
Auto-Scaling Deep Dive
Auto-scaling is critical for cost optimization and reliability. SageMaker uses AWS Application Auto Scaling, which supports several scaling strategies.
Target Tracking Scaling (Most Common):
This maintains a specified metric (like InvocationsPerInstance) at a target value. If the metric exceeds the target, it scales out. If it falls below, it scales in.
Determining the Target Value:
- Load Test: Use tools like Locust or k6 to simulate realistic traffic.
- Measure Max Throughput: Find the RPS where P99 latency stays below your SLA (e.g., 200ms).
- Add a Safety Factor: Plan to run at roughly 70% of that maximum to leave headroom for spikes.
- Convert to Invocations Per Minute (safety factor included):
Target = Max RPS * 60 * 0.7
Example: If your model on ml.g4dn.xlarge handles 10 RPS comfortably:
Target = (10 * 60) * 0.7 = 420 invocations/minute
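A rough load-test sketch for the first two steps, assuming the fraud-detector-prod endpoint and a JSON payload. Dedicated tools like Locust or k6 produce more realistic traffic shapes; this is only enough to estimate per-instance throughput and a starting target value:
# Crude concurrent load test against a single-instance endpoint.
import json
import time
from concurrent.futures import ThreadPoolExecutor

import boto3
import numpy as np

runtime = boto3.client("sagemaker-runtime")
payload = json.dumps({"inputs": ["sample transaction text"]})

def one_call(_):
    start = time.perf_counter()
    runtime.invoke_endpoint(
        EndpointName="fraud-detector-prod",
        ContentType="application/json",
        Body=payload,
    )
    return time.perf_counter() - start

with ThreadPoolExecutor(max_workers=16) as pool:
    latencies = list(pool.map(one_call, range(500)))

p99 = np.percentile(latencies, 99)
rps = len(latencies) * 16 / sum(latencies)  # crude aggregate throughput estimate
target = rps * 60 * 0.7                     # invocations/minute/instance, with headroom
print(f"P99: {p99 * 1000:.0f} ms, ~{rps:.1f} RPS, suggested target: {target:.0f}")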
Step Scaling (For Finer Control):
Step scaling allows you to define different scaling behaviors for different metric ranges.
resource "aws_appautoscaling_policy" "step_scaling" {
name = "${var.model_name}-step-scaling"
policy_type = "StepScaling"
resource_id = aws_appautoscaling_target.sagemaker_target.resource_id
scalable_dimension = aws_appautoscaling_target.sagemaker_target.scalable_dimension
service_namespace = aws_appautoscaling_target.sagemaker_target.service_namespace
step_scaling_policy_configuration {
adjustment_type = "PercentChangeInCapacity"
cooldown = 60
metric_aggregation_type = "Average"
step_adjustment {
metric_interval_lower_bound = 0
metric_interval_upper_bound = 10
scaling_adjustment = 10 # Add 10% capacity
}
step_adjustment {
metric_interval_lower_bound = 10
metric_interval_upper_bound = 20
scaling_adjustment = 20 # Add 20% capacity
}
step_adjustment {
metric_interval_lower_bound = 20
scaling_adjustment = 30 # Add 30% capacity
}
}
}
# CloudWatch Alarm to trigger scaling
resource "aws_cloudwatch_metric_alarm" "high_invocations" {
alarm_name = "${var.model_name}-high-invocations"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 2
metric_name = "ModelLatency"
namespace = "AWS/SageMaker"
period = 60
statistic = "Average"
threshold = 200 # 200ms
dimensions = {
EndpointName = aws_sagemaker_endpoint.endpoint.name
VariantName = "AllTraffic"
}
alarm_actions = [aws_appautoscaling_policy.step_scaling.arn]
}
Multi-Model Endpoints (MME)
Multi-Model Endpoints are a game-changer for SaaS platforms that need to serve thousands of models (e.g., one model per customer).
How MME Works:
- You have a fleet of instances (e.g., 5 x ml.m5.xlarge).
- You store thousands of model artifacts in S3 under a prefix: s3://bucket/models/customer-1/, s3://bucket/models/customer-2/, etc.
- When an inference request arrives with TargetModel=customer-1/model.tar.gz, SageMaker:
  - Checks whether the model is already loaded in memory on an instance.
  - If yes, routes the request to that instance.
  - If no, downloads the artifact from S3 to an instance, loads it, and then runs inference.
- When instance memory fills up, least-recently-used (LRU) models are evicted.
Configuration:
from sagemaker.pytorch import PyTorchModel
from sagemaker.multidatamodel import MultiDataModel

# Container definition shared by every model behind the endpoint
pytorch_model = PyTorchModel(
    model_data="s3://my-bucket/models/customer-1/model.tar.gz",  # any representative artifact
    role=role,
    framework_version="2.0.0",
    py_version="py310",
    entry_point="inference.py",
)

# The multi-model endpoint points at the S3 prefix, not a single .tar.gz
mme = MultiDataModel(
    name="multi-model-endpoint",
    model_data_prefix="s3://my-bucket/models/",
    model=pytorch_model,
)

predictor = mme.deploy(
    initial_instance_count=5,
    instance_type="ml.m5.2xlarge",
    endpoint_name="multi-model-endpoint",
)
Invoking with a Specific Model:
import json
import boto3

runtime_client = boto3.client('sagemaker-runtime')
response = runtime_client.invoke_endpoint(
EndpointName='multi-model-endpoint',
TargetModel='customer-123/model.tar.gz', # Specify which model
ContentType='application/json',
Body=json.dumps({'inputs': ['Sample text']})
)
Trade-offs:
- Pros: Massive cost savings (serving 1000 models on 5 instances instead of 1000 endpoints).
- Cons: Cold start latency for models not in memory (5-30 seconds depending on model size).
Best For: B2B SaaS where each customer has a custom-trained model and queries are infrequent enough that cold starts are acceptable.
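A quick way to observe the cold-start penalty is to time two consecutive invocations of the same target model; this sketch assumes the endpoint and artifact layout shown above:
# Compare cold vs. warm invocation latency on a multi-model endpoint.
import json
import time
import boto3

runtime = boto3.client("sagemaker-runtime")

def timed_invoke(target_model: str) -> float:
    start = time.perf_counter()
    runtime.invoke_endpoint(
        EndpointName="multi-model-endpoint",
        TargetModel=target_model,
        ContentType="application/json",
        Body=json.dumps({"inputs": ["Sample text"]}),
    )
    return time.perf_counter() - start

# The first call may pay the S3 download and load cost; the second should hit memory.
print(f"cold: {timed_invoke('customer-123/model.tar.gz'):.2f}s")
print(f"warm: {timed_invoke('customer-123/model.tar.gz'):.2f}s")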
15.1.3 Google Cloud Vertex AI Prediction
Vertex AI Prediction is GCP’s answer to SageMaker Real-time Inference. It emphasizes separation of concerns: Models (the artifacts) are distinct from Endpoints (the serving infrastructure).
Architecture: The Model-Endpoint Duality
graph TD
Client[Client] -->|HTTPS| LB[Load Balancer]
LB -->|Route| Endpoint[Vertex AI Endpoint]
Endpoint -->|90% Traffic| DM1[DeployedModel v1.0]
Endpoint -->|10% Traffic| DM2[DeployedModel v2.0]
DM1 -->|References| Model1[Model Resource v1.0]
DM2 -->|References| Model2[Model Resource v2.0]
Model1 -->|Artifacts| GCS1[gs://bucket/models/v1/]
Model2 -->|Artifacts| GCS2[gs://bucket/models/v2/]
Key Concepts:
- Model: A registry entry pointing to artifacts in GCS and specifying a serving container.
- Endpoint: A URL and compute resource pool.
- DeployedModel: The association between a Model and an Endpoint, with traffic percentage.
This allows you to deploy multiple model versions to the same endpoint and split traffic for A/B testing or canary rollouts.
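As a point of reference before the custom-container path below, registering a Model that uses one of Google's pre-built serving containers is a single call; the image URI here is illustrative and varies by framework version and region:
# Minimal sketch: register a Model backed by a pre-built sklearn serving container.
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

model = aiplatform.Model.upload(
    display_name="fraud-detector",
    artifact_uri=f"gs://{BUCKET_NAME}/models/fraud-detector/v1/",
    # Assumed pre-built image URI; pick the one matching your framework/version.
    serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest",
)
print(model.resource_name)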
Custom Prediction Routines (CPR)
While Vertex AI supports pre-built containers (TensorFlow, scikit-learn, XGBoost), production systems often require custom logic. CPR provides a Pythonic interface for building custom serving containers.
The Predictor Class:
# predictor.py
from google.cloud.aiplatform.prediction.predictor import Predictor
from google.cloud.aiplatform.utils import prediction_utils
import numpy as np
import joblib
import os
class CustomPredictor(Predictor):
"""
Custom predictor implementing the CPR interface.
"""
def __init__(self):
"""
Constructor. Do NOT load model here (not yet available).
"""
self._model = None
self._preprocessor = None
def load(self, artifacts_uri: str) -> None:
"""
Loads the model from the artifacts directory.
Called ONCE when the container starts.
Args:
artifacts_uri: GCS path (e.g., gs://bucket/model/) or local path
"""
        # Download artifacts from GCS into the current working directory
        prediction_utils.download_model_artifacts(artifacts_uri)

        # Artifacts are now local, so load them by filename
        self._model = joblib.load("model.joblib")

        # Load the optional preprocessor if it was packaged with the model
        if os.path.exists("preprocessor.joblib"):
            self._preprocessor = joblib.load("preprocessor.joblib")
def preprocess(self, prediction_input: dict) -> np.ndarray:
"""
Preprocesses the input.
Called for EVERY request.
Args:
prediction_input: {"instances": [[f1, f2, ...], ...]}
Returns:
Numpy array ready for model.predict()
"""
instances = prediction_input["instances"]
arr = np.array(instances)
if self._preprocessor:
arr = self._preprocessor.transform(arr)
return arr
def predict(self, instances: np.ndarray) -> np.ndarray:
"""
Runs inference.
Called for EVERY request.
Args:
instances: Preprocessed input array
Returns:
Predictions as numpy array
"""
return self._model.predict(instances)
def postprocess(self, prediction_results: np.ndarray) -> dict:
"""
Formats the output.
Called for EVERY request.
Args:
prediction_results: Raw model outputs
Returns:
{"predictions": [...]}
"""
return {"predictions": prediction_results.tolist()}
Building and Uploading the Model:
from google.cloud import aiplatform
from google.cloud.aiplatform.prediction import LocalModel

from src.predictor import CustomPredictor  # the Predictor class defined above

# Build the CPR container image locally
local_model = LocalModel.build_cpr_model(
    "src",                                                          # directory containing predictor.py
    f"us-docker.pkg.dev/{PROJECT_ID}/ml-repo/custom-predictor:v1",  # target image URI
    predictor=CustomPredictor,
    requirements_path="src/requirements.txt",
)
# Push to Artifact Registry
local_model.push_image()
# Upload to the Vertex AI Model Registry, reusing the container spec from local_model
model = aiplatform.Model.upload(
    local_model=local_model,
    display_name="fraud-detector-v1",
    artifact_uri=f"gs://{BUCKET_NAME}/models/fraud-detector/v1",
)
print(f"Model uploaded: {model.resource_name}")
Deploying to an Endpoint
Step 1: Create an Endpoint
from google.cloud import aiplatform
aiplatform.init(project=PROJECT_ID, location=REGION)
endpoint = aiplatform.Endpoint.create(
display_name="fraud-detection-endpoint",
description="Production fraud detection endpoint",
labels={"env": "prod", "team": "ml-platform"}
)
Step 2: Deploy the Model
model.deploy(
endpoint=endpoint,
deployed_model_display_name="fraud-v1",
machine_type="n1-standard-4",
min_replica_count=2,
max_replica_count=10,
accelerator_type="NVIDIA_TESLA_T4", # Optional GPU
accelerator_count=1,
traffic_percentage=100,
# Auto-scaling settings
autoscaling_target_cpu_utilization=60, # Scale when CPU > 60%
autoscaling_target_accelerator_duty_cycle=80, # Scale when GPU > 80%
)
Traffic Splitting for A/B Testing
Vertex AI makes canary deployments trivial.
Scenario: Deploy v2 with 10% traffic, v1 keeps 90%.
# Deploy v2 to the same endpoint with 10% of traffic; the SDK rebalances the
# remaining 90% across the models already deployed (here, fraud-v1)
model_v2.deploy(
    endpoint=endpoint,
    deployed_model_display_name="fraud-v2",
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=5,
    traffic_percentage=10,
)
Monitoring the Split:
# Traffic allocation, keyed by DeployedModel ID
print(endpoint.traffic_split)
# e.g. {"<deployed-model-id-v1>": 90, "<deployed-model-id-v2>": 10}

# Inspect the deployed models behind the endpoint
for deployed in endpoint.list_models():
    print(deployed.id, deployed.display_name)
Promoting v2:
# Send 100% of traffic to v2 (traffic_split keys are DeployedModel IDs, not display names)
endpoint.update(traffic_split={"fraud-v2-id": 100})
# Optionally undeploy v1
endpoint.undeploy(deployed_model_id="fraud-v1-id")
Private Endpoints and VPC Service Controls
Enterprise deployments require private networking.
Private Endpoint attached to your VPC (VPC Network Peering):
from google.cloud import aiplatform
endpoint = aiplatform.Endpoint.create(
    display_name="private-fraud-endpoint",
    network=f"projects/{PROJECT_NUMBER}/global/networks/{VPC_NAME}",
    encryption_spec_key_name=f"projects/{PROJECT_ID}/locations/{REGION}/keyRings/my-kr/cryptoKeys/my-key",
)
This creates an endpoint accessible only within your VPC, with no public internet exposure.
15.1.4 Comparative Analysis
| Feature | AWS SageMaker | GCP Vertex AI |
|---|---|---|
| Billing Model | Instance-hour (24/7 running) | Node-hour (24/7 running) |
| Deployment Abstraction | Model → EndpointConfig → Endpoint | Model → Endpoint → DeployedModel |
| Multi-Model Serving | Multi-Model Endpoints (MME) - Very efficient | Manual (deploy multiple Models to one Endpoint) |
| Traffic Splitting | Production Variants (cumbersome) | Native, elegant traffic_percentage |
| Protocol | HTTP/REST (gRPC via custom setup) | HTTP/REST and gRPC native |
| Private Networking | VPC Endpoints (PrivateLink) | Private Service Connect (PSC) |
| Log Latency | CloudWatch (1-5 min delay) | Cloud Logging (near real-time) |
| GPU Variety | T4, A10G, V100, A100, Inferentia, Trainium | T4, L4, A100, H100, TPU |
Key Differentiator: MME: For multi-tenant SaaS (one model per customer), SageMaker’s MME is a 10x cost saver. Vertex AI doesn’t have an equivalent.
Key Differentiator: Traffic Splitting: Vertex AI’s traffic splitting is far more elegant and Pythonic than SageMaker’s Production Variants.
15.1.5 Monitoring and Observability
Deploying is 10% of the work. Keeping the system healthy is the other 90%.
The Four Golden Signals
- Latency: How long does it take to return a prediction?
- Traffic: How many requests per second?
- Errors: What percentage of requests fail?
- Saturation: Are resources (CPU/GPU/Memory) approaching limits?
SageMaker CloudWatch Metrics:
import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch')

# Query P99 latency (ModelLatency is reported in microseconds)
response = cloudwatch.get_metric_statistics(
    Namespace='AWS/SageMaker',
    MetricName='ModelLatency',
    Dimensions=[
        {'Name': 'EndpointName', 'Value': 'fraud-detector-prod'},
        {'Name': 'VariantName', 'Value': 'AllTraffic'}
    ],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    ExtendedStatistics=['p99']  # percentiles require ExtendedStatistics, not Statistics
)
Vertex AI Monitoring (Cloud Monitoring):
from google.cloud import monitoring_v3
client = monitoring_v3.MetricServiceClient()
project_name = f"projects/{PROJECT_ID}"
# Query request count
query = monitoring_v3.TimeSeriesQuery(
query=f'''
fetch aiplatform.googleapis.com/prediction/online/prediction_count
| filter resource.endpoint_id == "{ENDPOINT_ID}"
| group_by 1m, mean(val())
'''
)
results = client.query_time_series(request={"name": project_name, "query": query.query})
SageMaker Model Monitor
Model Monitor automatically detects data drift and model quality degradation.
Setup:
from sagemaker.model_monitor import DefaultModelMonitor, CronExpressionGenerator
monitor = DefaultModelMonitor(
role=role,
instance_count=1,
instance_type='ml.m5.xlarge',
volume_size_in_gb=20,
max_runtime_in_seconds=3600
)
monitor.create_monitoring_schedule(
endpoint_input=predictor.endpoint_name,
output_s3_uri=f's3://my-bucket/model-monitor/reports',
statistics=baseline_statistics_path,
constraints=baseline_constraints_path,
schedule_cron_expression=CronExpressionGenerator.hourly()
)
This runs hourly jobs to compare live traffic against the training baseline.
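The schedule above references baseline statistics and constraints. These are typically produced beforehand by a baselining job over the training dataset; a sketch, assuming the S3 paths shown:
# Generate baseline statistics/constraints with the same monitor object.
from sagemaker.model_monitor.dataset_format import DatasetFormat

monitor.suggest_baseline(
    baseline_dataset="s3://my-bucket/training-data/train.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri="s3://my-bucket/model-monitor/baseline",
    wait=True,
)

# These URIs feed the statistics/constraints arguments of the schedule above.
baseline_statistics_path = monitor.baseline_statistics().file_s3_uri
baseline_constraints_path = monitor.suggested_constraints().file_s3_uri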
15.1.6 Cost Optimization Strategies
1. Instance Right-Sizing:
Use CloudWatch GPU Utilization metrics. If consistently < 20%, downgrade to CPU or smaller GPU.
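A sketch of that right-sizing check, assuming the fraud-detector-prod endpoint; instance-level metrics such as GPUUtilization live in the /aws/sagemaker/Endpoints namespace:
# Average GPU utilization over the past week as a right-sizing signal.
from datetime import datetime, timedelta
import boto3

cloudwatch = boto3.client("cloudwatch")

resp = cloudwatch.get_metric_statistics(
    Namespace="/aws/sagemaker/Endpoints",
    MetricName="GPUUtilization",
    Dimensions=[
        {"Name": "EndpointName", "Value": "fraud-detector-prod"},
        {"Name": "VariantName", "Value": "AllTraffic"},
    ],
    StartTime=datetime.utcnow() - timedelta(days=7),
    EndTime=datetime.utcnow(),
    Period=3600,
    Statistics=["Average", "Maximum"],
)
points = resp["Datapoints"]
avg = sum(p["Average"] for p in points) / max(len(points), 1)
print(f"7-day average GPU utilization: {avg:.1f}%")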
2. Spot Instances (Experimental):
Not officially supported, but you can deploy custom containers on EC2 Spot behind your own ALB.
3. Serverless Inference (SageMaker):
For sporadic workloads, use SageMaker Serverless:
from sagemaker.serverless import ServerlessInferenceConfig
serverless_config = ServerlessInferenceConfig(
memory_size_in_mb=4096,
max_concurrency=10,
provisioned_concurrency=2 # Keep 2 warm
)
predictor = model.deploy(
serverless_inference_config=serverless_config
)
Illustrative Cost Comparison (approximate; always check current regional pricing):
- Real-time (ml.g4dn.xlarge, 24/7): ~$0.736/hour ≈ $530/month
- Serverless: ~$0.20 per hour of compute actually used plus ~$0.000001 per request, scaling to zero between bursts
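A back-of-the-envelope break-even calculation using the illustrative rates above (treat them as assumptions, not quoted prices):
# Compare always-on real-time cost to usage-based serverless cost.
REALTIME_HOURLY = 0.736            # ml.g4dn.xlarge, always on (assumed rate)
HOURS_PER_MONTH = 730

SERVERLESS_HOURLY = 0.20           # compute billed only while serving (assumed rate)
SERVERLESS_PER_REQUEST = 0.000001  # small per-request charge (assumed rate)

def monthly_cost(requests_per_month: int, avg_seconds_per_request: float):
    realtime = REALTIME_HOURLY * HOURS_PER_MONTH
    busy_hours = requests_per_month * avg_seconds_per_request / 3600
    serverless = busy_hours * SERVERLESS_HOURLY + requests_per_month * SERVERLESS_PER_REQUEST
    return realtime, serverless

for monthly_requests in (10_000, 1_000_000, 50_000_000):
    rt, sl = monthly_cost(monthly_requests, avg_seconds_per_request=0.1)
    print(f"{monthly_requests:>10,} req/mo  real-time ${rt:,.0f}  serverless ${sl:,.2f}")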
15.1.7 Conclusion
Managed real-time inference services provide a robust foundation for production ML systems. SageMaker excels in multi-tenant scenarios with MME, while Vertex AI provides a cleaner API and superior traffic splitting. Both require deep understanding of their operational knobs—auto-scaling policies, instance selection, and monitoring—to deliver cost-effective, reliable predictions at scale.