Keyboard shortcuts

Press or to navigate between chapters

Press ? to show this help

Press Esc to hide this help

15.1 Managed Real-Time Inference: SageMaker & Vertex AI

15.1.1 Introduction to Managed Inference Services

When a user clicks “Buy Now” on an e-commerce site, swipes a credit card, or uploads an X-ray for diagnosis, they expect an immediate response. This is the domain of Real-Time Inference—synchronous, low-latency prediction serving where milliseconds matter and reliability is non-negotiable.

Managed inference services abstract away the operational complexity of running production ML systems. They handle load balancing, auto-scaling, health monitoring, and infrastructure provisioning, allowing ML teams to focus on model quality rather than DevOps toil. However, “managed” does not mean “zero-ops.” Understanding the architecture, configuration options, and operational patterns of these services is critical for building production-grade systems.

This chapter provides an exhaustive technical deep dive into the two dominant managed platforms: Amazon SageMaker Real-time Inference and Google Cloud Vertex AI Prediction. We will explore their architectures, implementation patterns, security models, cost structures, and operational best practices at a level suitable for Principal Engineers and Platform Architects.

The Promise and Reality of Managed Services

Managed inference services promise to handle:

  1. Infrastructure Provisioning: Automatic allocation of EC2/Compute Engine instances with the correct GPU drivers and ML frameworks.
  2. Load Balancing: Distributing traffic across multiple instances with health checking and automatic failover.
  3. Auto-Scaling: Dynamic adjustment of fleet size based on traffic patterns and custom metrics.
  4. Availability: Multi-AZ/Multi-Zone deployment with SLA guarantees (typically 99.9% or 99.95%).
  5. Patching: Automated OS and container runtime security updates.

However, the user still owns critical responsibilities:

  • Model Container Code: The serving logic, pre/post-processing, and error handling.
  • IAM and Security: Network policies, encryption, and access control.
  • Cost Optimization: Instance selection, auto-scaling policies, and utilization monitoring.
  • Performance Tuning: Batch size configuration, worker count, and memory allocation.

Understanding where the provider’s responsibilities end and yours begin is the key to successful deployments.


15.1.2 Amazon SageMaker Real-Time Inference

SageMaker Real-time Inference is AWS’s flagship managed serving solution. It is engineered for high availability and supports complex deployment patterns like multi-model endpoints and production variants.

Architecture: The Three-Tier Stack

A SageMaker Endpoint is a logical abstraction over a complex physical infrastructure:

graph TD
    Client[Client Application] -->|HTTPS| ALB[SageMaker ALB<br/>TLS Termination]
    ALB -->|Route| AZ1[Availability Zone 1]
    ALB -->|Route| AZ2[Availability Zone 2]
    
    subgraph AZ1
        Inst1[ml.g4dn.xlarge]
        Agent1[SageMaker Agent]
        Container1[Model Container]
        Model1[Loaded Model]
        
        Agent1 -->|Lifecycle| Container1
        Container1 -->|Inference| Model1
    end
    
    subgraph AZ2
        Inst2[ml.g4dn.xlarge]
        Agent2[SageMaker Agent]
        Container2[Model Container]
        Model2[Loaded Model]
        
        Agent2 -->|Lifecycle| Container2
        Container2 -->|Inference| Model2
    end

Key Components:

  1. Application Load Balancer (ALB): A managed, invisible ALB sits in front of your endpoint. It handles:

    • TLS termination (using AWS-managed certificates or customer-provided certs via ACM).
    • Health checking (periodic pings to the /ping endpoint of each instance).
    • Cross-AZ load balancing for high availability.
  2. SageMaker Agent: A sidecar process running on each instance that:

    • Manages the lifecycle of the model container (start, stop, health checks).
    • Collects CloudWatch metrics (invocations, latency, errors).
    • Handles Data Capture for Model Monitor.
  3. Model Container: Your Docker image (or a pre-built framework image) that implements the serving logic.

  4. Instance Fleet: EC2 instances (with the ml.* prefix) optimized for ML workloads, often with attached GPUs or AWS-custom accelerators (Inferentia, Trainium).

The Model Artifact Structure

SageMaker expects model artifacts to be packaged as a compressed tarball (.tar.gz) and stored in S3. The structure depends on whether you’re using a pre-built framework container or a custom container.

For Framework Containers (PyTorch, TensorFlow, Sklearn):

model.tar.gz
├── model.pth (or model.joblib, saved_model/, etc.)
├── code/
│   ├── inference.py
│   └── requirements.txt
└── (optional) config files

Example: Packaging a PyTorch Model:

# Directory structure
model/
├── code/
│   ├── inference.py
│   └── requirements.txt
└── model.pth

# Create the tarball (IMPORTANT: tar from inside the directory)
cd model
tar -czf ../model.tar.gz .
cd ..

# Upload to S3 with versioning
aws s3 cp model.tar.gz s3://my-mlops-bucket/models/fraud-detector/v1.2.3/model.tar.gz

Best Practice: Never overwrite artifacts. Use semantic versioning in S3 keys (/v1.2.3/) to ensure immutability and enable rollback.

The Inference Script Contract

The inference.py script (or equivalent) must implement a specific contract for framework containers. This contract consists of four functions:

# inference.py
import os
import json
import logging
import torch
import torch.nn.functional as F
from transformers import BertTokenizer, BertForSequenceClassification

# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

# Global variables (loaded once per container lifecycle)
MODEL = None
TOKENIZER = None
DEVICE = None

def model_fn(model_dir):
    """
    Loads the model from disk into memory.
    This function is called ONCE when the container starts.
    
    Args:
        model_dir (str): Path to the directory containing model artifacts
        
    Returns:
        The loaded model object
    """
    global MODEL, TOKENIZER, DEVICE
    
    # Determine device
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info(f"Loading model on device: {DEVICE}")
    
    try:
        # Load the model architecture and weights
        model_path = os.path.join(model_dir, 'model.pth')
        
        # Option 1: If you saved the entire model
        # MODEL = torch.load(model_path, map_location=DEVICE)
        
        # Option 2: If you saved state_dict (recommended)
        MODEL = BertForSequenceClassification.from_pretrained(model_dir)
        MODEL.load_state_dict(torch.load(model_path, map_location=DEVICE))
        
        MODEL.to(DEVICE)
        MODEL.eval()  # Set to evaluation mode (disables dropout, etc.)
        
        # Load tokenizer
        TOKENIZER = BertTokenizer.from_pretrained(model_dir)
        
        logger.info("Model loaded successfully")
        return MODEL
        
    except Exception as e:
        logger.error(f"Failed to load model: {str(e)}", exc_info=True)
        raise

def input_fn(request_body, request_content_type):
    """
    Deserializes the request payload.
    This function is called for EVERY request.
    
    Args:
        request_body: The raw request body (bytes or str)
        request_content_type: The Content-Type header value
        
    Returns:
        Deserialized input data (any Python object)
    """
    logger.debug(f"Received request with content-type: {request_content_type}")
    
    if request_content_type == 'application/json':
        try:
            data = json.loads(request_body)
            
            # Expect {"inputs": ["text1", "text2", ...]} or {"inputs": "single text"}
            if 'inputs' not in data:
                raise ValueError("Request must contain 'inputs' field")
            
            inputs = data['inputs']
            # Normalize to list
            if isinstance(inputs, str):
                inputs = [inputs]
            
            return inputs
            
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON: {str(e)}")
    
    elif request_content_type == 'text/csv':
        # Simple CSV handling (one column)
        return [line.strip() for line in request_body.decode('utf-8').split('\n') if line.strip()]
    
    elif request_content_type == 'text/plain':
        # Single text input
        return [request_body.decode('utf-8').strip()]
    
    else:
        raise ValueError(f"Unsupported content type: {request_content_type}")

def predict_fn(input_object, model):
    """
    Performs the actual inference.
    This function is called for EVERY request.
    
    Args:
        input_object: The output of input_fn
        model: The output of model_fn
        
    Returns:
        Inference results (any Python object)
    """
    global TOKENIZER, DEVICE
    
    logger.info(f"Running prediction on {len(input_object)} inputs")
    
    try:
        # Tokenize the batch
        encoded = TOKENIZER(
            input_object,
            padding="max_length",
            truncation=True,
            max_length=128,
            return_tensors="pt"
        )
        
        input_ids = encoded['input_ids'].to(DEVICE)
        attention_mask = encoded['attention_mask'].to(DEVICE)
        
        # Run inference (no gradient computation)
        with torch.no_grad():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            probs = F.softmax(logits, dim=1)
        
        return probs
        
    except Exception as e:
        logger.error(f"Prediction failed: {str(e)}", exc_info=True)
        raise RuntimeError(f"Inference error: {str(e)}")

def output_fn(predictions, response_content_type):
    """
    Serializes the prediction results.
    This function is called for EVERY request.
    
    Args:
        predictions: The output of predict_fn
        response_content_type: The Accept header value
        
    Returns:
        Serialized response body (str or bytes)
    """
    logger.debug("Serializing output")
    
    if response_content_type == 'application/json':
        # Convert tensor to list
        result = predictions.cpu().numpy().tolist()
        return json.dumps({'predictions': result})
    
    elif response_content_type == 'text/csv':
        # Return as CSV (one row per input)
        result = predictions.cpu().numpy()
        csv_rows = [','.join(map(str, row)) for row in result]
        return '\n'.join(csv_rows)
    
    else:
        raise ValueError(f"Unsupported accept type: {response_content_type}")

Performance Considerations:

  1. Global Variables: Load heavy resources (models, tokenizers) in the global scope or in model_fn. They persist across requests, avoiding repeated loading.

  2. GPU Warmup: The first inference on a cold container may be slower due to CUDA initialization. Consider running a dummy inference in model_fn.

  3. Batch-Aware Code: If using batching (via SageMaker’s built-in batching or multi-model endpoints), ensure your code handles lists of inputs efficiently.

  4. Error Handling: Wrap critical sections in try/except to return meaningful error messages rather than crashing the container.

Infrastructure as Code: Terraform

While the SageMaker Python SDK is convenient for exploration, production deployments demand Infrastructure as Code. Terraform provides declarative, version-controlled infrastructure.

Complete Terraform Example:

# variables.tf
variable "model_name" {
  description = "Name of the model"
  type        = string
  default     = "fraud-detector"
}

variable "model_version" {
  description = "Model version"
  type        = string
  default     = "v1.2.3"
}

variable "instance_type" {
  description = "SageMaker instance type"
  type        = string
  default     = "ml.g4dn.xlarge"
}

variable "instance_count" {
  description = "Initial instance count"
  type        = number
  default     = 2
}

# iam.tf
data "aws_iam_policy_document" "sagemaker_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]
    
    principals {
      type        = "Service"
      identifiers = ["sagemaker.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "sagemaker_execution_role" {
  name               = "${var.model_name}-sagemaker-role"
  assume_role_policy = data.aws_iam_policy_document.sagemaker_assume_role.json
}

resource "aws_iam_role_policy_attachment" "sagemaker_full_access" {
  role       = aws_iam_role.sagemaker_execution_role.name
  policy_arn = "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"
}

# Additional policy for S3 access
resource "aws_iam_role_policy" "s3_access" {
  name = "${var.model_name}-s3-access"
  role = aws_iam_role.sagemaker_execution_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:ListBucket"
        ]
        Resource = [
          "arn:aws:s3:::my-mlops-bucket/*",
          "arn:aws:s3:::my-mlops-bucket"
        ]
      }
    ]
  })
}

# model.tf
resource "aws_sagemaker_model" "model" {
  name               = "${var.model_name}-${var.model_version}"
  execution_role_arn = aws_iam_role.sagemaker_execution_role.arn

  primary_container {
    image          = "763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.0-gpu-py310-cu118-ubuntu20.04-sagemaker"
    model_data_url = "s3://my-mlops-bucket/models/${var.model_name}/${var.model_version}/model.tar.gz"
    
    environment = {
      "SAGEMAKER_PROGRAM"           = "inference.py"
      "SAGEMAKER_SUBMIT_DIRECTORY"  = "s3://my-mlops-bucket/models/${var.model_name}/${var.model_version}/model.tar.gz"
      "SAGEMAKER_REGION"            = "us-east-1"
      "TS_MAX_RESPONSE_SIZE"        = "20971520"      # 20MB
      "TS_MAX_REQUEST_SIZE"         = "10485760"      # 10MB
      "TS_DEFAULT_WORKERS_PER_MODEL"= "1"             # One worker per GPU
      "OMP_NUM_THREADS"             = "1"             # Prevent CPU over-subscription
      "MKL_NUM_THREADS"             = "1"
    }
  }

  tags = {
    Environment = "production"
    Model       = var.model_name
    Version     = var.model_version
  }
}

# endpoint_config.tf
resource "aws_sagemaker_endpoint_configuration" "config" {
  name = "${var.model_name}-config-${var.model_version}"

  production_variants {
    variant_name           = "AllTraffic"
    model_name             = aws_sagemaker_model.model.name
    initial_instance_count = var.instance_count
    instance_type          = var.instance_type
    
    # Optional: Serverless config
    # serverless_config {
    #   max_concurrency       = 10
    #   memory_size_in_mb     = 6144
    #   provisioned_concurrency = 2
    # }
  }

  # Data Capture for Model Monitor
  data_capture_config {
    enable_capture              = true
    initial_sampling_percentage = 100
    destination_s3_uri          = "s3://my-mlops-bucket/model-monitor/${var.model_name}"
    
    capture_options {
      capture_mode = "InputAndOutput"
    }
    
    capture_content_type_header {
      csv_content_types  = ["text/csv"]
      json_content_types = ["application/json"]
    }
  }

  tags = {
    Environment = "production"
    Model       = var.model_name
  }
}

# endpoint.tf
resource "aws_sagemaker_endpoint" "endpoint" {
  name                 = "${var.model_name}-prod"
  endpoint_config_name = aws_sagemaker_endpoint_configuration.config.name

  tags = {
    Environment = "production"
    Model       = var.model_name
    CostCenter  = "ML-Platform"
  }
}

# autoscaling.tf
resource "aws_appautoscaling_target" "sagemaker_target" {
  max_capacity       = 20
  min_capacity       = var.instance_count
  resource_id        = "endpoint/${aws_sagemaker_endpoint.endpoint.name}/variant/AllTraffic"
  scalable_dimension = "sagemaker:variant:DesiredInstanceCount"
  service_namespace  = "sagemaker"

  depends_on = [aws_sagemaker_endpoint.endpoint]
}

resource "aws_appautoscaling_policy" "sagemaker_scaling_policy" {
  name               = "${var.model_name}-scaling-policy"
  policy_type        = "TargetTrackingScaling"
  resource_id        = aws_appautoscaling_target.sagemaker_target.resource_id
  scalable_dimension = aws_appautoscaling_target.sagemaker_target.scalable_dimension
  service_namespace  = aws_appautoscaling_target.sagemaker_target.service_namespace

  target_tracking_scaling_policy_configuration {
    predefined_metric_specification {
      predefined_metric_type = "SageMakerVariantInvocationsPerInstance"
    }
    
    target_value       = 1000.0  # Target 1000 invocations per minute per instance
    scale_in_cooldown  = 300     # Wait 5 minutes before scaling down
    scale_out_cooldown = 60      # Wait 1 minute before scaling up again
  }
}

# outputs.tf
output "endpoint_name" {
  value = aws_sagemaker_endpoint.endpoint.name
}

output "endpoint_arn" {
  value = aws_sagemaker_endpoint.endpoint.arn
}

Deploying:

terraform init
terraform plan -var="model_version=v1.2.4"
terraform apply -var="model_version=v1.2.4"

Auto-Scaling Deep Dive

Auto-scaling is critical for cost optimization and reliability. SageMaker uses AWS Application Auto Scaling, which supports several scaling strategies.

Target Tracking Scaling (Most Common):

This maintains a specified metric (like InvocationsPerInstance) at a target value. If the metric exceeds the target, it scales out. If it falls below, it scales in.

Determining the Target Value:

  1. Load Test: Use tools like Locust or k6 to simulate realistic traffic.
  2. Measure Max Throughput: Find the RPS where P99 latency stays below your SLA (e.g., 200ms).
  3. Add Safety Factor: Multiply by 0.7 to leave headroom for spikes.
  4. Convert to Invocations Per Minute:
    Target = (Max RPS * 60) * 0.7
    

Example: If your model on ml.g4dn.xlarge handles 10 RPS comfortably:

Target = (10 * 60) * 0.7 = 420 invocations/minute

Step Scaling (For Finer Control):

Step scaling allows you to define different scaling behaviors for different metric ranges.

resource "aws_appautoscaling_policy" "step_scaling" {
  name               = "${var.model_name}-step-scaling"
  policy_type        = "StepScaling"
  resource_id        = aws_appautoscaling_target.sagemaker_target.resource_id
  scalable_dimension = aws_appautoscaling_target.sagemaker_target.scalable_dimension
  service_namespace  = aws_appautoscaling_target.sagemaker_target.service_namespace

  step_scaling_policy_configuration {
    adjustment_type         = "PercentChangeInCapacity"
    cooldown                = 60
    metric_aggregation_type = "Average"

    step_adjustment {
      metric_interval_lower_bound = 0
      metric_interval_upper_bound = 10
      scaling_adjustment          = 10  # Add 10% capacity
    }

    step_adjustment {
      metric_interval_lower_bound = 10
      metric_interval_upper_bound = 20
      scaling_adjustment          = 20  # Add 20% capacity
    }

    step_adjustment {
      metric_interval_lower_bound = 20
      scaling_adjustment          = 30  # Add 30% capacity
    }
  }
}

# CloudWatch Alarm to trigger scaling
resource "aws_cloudwatch_metric_alarm" "high_invocations" {
  alarm_name          = "${var.model_name}-high-invocations"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "ModelLatency"
  namespace           = "AWS/SageMaker"
  period              = 60
  statistic           = "Average"
  threshold           = 200  # 200ms

  dimensions = {
    EndpointName = aws_sagemaker_endpoint.endpoint.name
    VariantName  = "AllTraffic"
  }

  alarm_actions = [aws_appautoscaling_policy.step_scaling.arn]
}

Multi-Model Endpoints (MME)

Multi-Model Endpoints are a game-changer for SaaS platforms that need to serve thousands of models (e.g., one model per customer).

How MME Works:

  1. You have a fleet of instances (e.g., 5 x ml.m5.xlarge).
  2. You store thousands of model artifacts in S3 under a prefix: s3://bucket/models/customer-1/, s3://bucket/models/customer-2/, etc.
  3. When an inference request arrives with TargetModel=customer-1.tar.gz, SageMaker:
    • Checks if the model is already loaded in memory on an instance.
    • If yes, routes to that instance.
    • If no, downloads it from S3 to an instance, loads it, and then runs inference.
  4. When memory fills up, Least-Recently-Used (LRU) models are evicted.

Configuration:

from sagemaker.pytorch import PyTorchModel

model = PyTorchModel(
    model_data="s3://my-bucket/models/",  # Note: Directory, not .tar.gz
    role=role,
    framework_version="2.0.0",
    entry_point="inference.py",
    py_version="py310"
)

predictor = model.deploy(
    initial_instance_count=5,
    instance_type="ml.m5.2xlarge",
    endpoint_name="multi-model-endpoint"
)

Invoking with a Specific Model:

import boto3

runtime_client = boto3.client('sagemaker-runtime')

response = runtime_client.invoke_endpoint(
    EndpointName='multi-model-endpoint',
    TargetModel='customer-123/model.tar.gz',  # Specify which model
    ContentType='application/json',
    Body=json.dumps({'inputs': ['Sample text']})
)

Trade-offs:

  • Pros: Massive cost savings (serving 1000 models on 5 instances instead of 1000 endpoints).
  • Cons: Cold start latency for models not in memory (5-30 seconds depending on model size).

Best For: B2B SaaS where each customer has a custom-trained model and queries are infrequent enough that cold starts are acceptable.


15.1.3 Google Cloud Vertex AI Prediction

Vertex AI Prediction is GCP’s answer to SageMaker Real-time Inference. It emphasizes separation of concerns: Models (the artifacts) are distinct from Endpoints (the serving infrastructure).

Architecture: The Model-Endpoint Duality

graph TD
    Client[Client] -->|HTTPS| LB[Load Balancer]
    LB -->|Route| Endpoint[Vertex AI Endpoint]
    
    Endpoint -->|90% Traffic| DM1[DeployedModel v1.0]
    Endpoint -->|10% Traffic| DM2[DeployedModel v2.0]
    
    DM1 -->|References| Model1[Model Resource v1.0]
    DM2 -->|References| Model2[Model Resource v2.0]
    
    Model1 -->|Artifacts| GCS1[gs://bucket/models/v1/]
    Model2 -->|Artifacts| GCS2[gs://bucket/models/v2/]

Key Concepts:

  1. Model: A registry entry pointing to artifacts in GCS and specifying a serving container.
  2. Endpoint: A URL and compute resource pool.
  3. DeployedModel: The association between a Model and an Endpoint, with traffic percentage.

This allows you to deploy multiple model versions to the same endpoint and split traffic for A/B testing or canary rollouts.

Custom Prediction Routines (CPR)

While Vertex AI supports pre-built containers (TensorFlow, scikit-learn, XGBoost), production systems often require custom logic. CPR provides a Pythonic interface for building custom serving containers.

The Predictor Class:

# predictor.py
from google.cloud.aiplatform.prediction.predictor import Predictor
from google.cloud.aiplatform.utils import prediction_utils
import numpy as np
import joblib
import os

class CustomPredictor(Predictor):
    """
    Custom predictor implementing the CPR interface.
    """

    def __init__(self):
        """
        Constructor. Do NOT load model here (not yet available).
        """
        self._model = None
        self._preprocessor = None

    def load(self, artifacts_uri: str) -> None:
        """
        Loads the model from the artifacts directory.
        Called ONCE when the container starts.
        
        Args:
            artifacts_uri: GCS path (e.g., gs://bucket/model/) or local path
        """
        # Download artifacts from GCS if needed
        prediction_utils.download_model_artifacts(artifacts_uri)
        
        # Load model
        model_path = os.path.join(artifacts_uri, 'model.joblib')
        self._model = joblib.load(model_path)
        
        # Load preprocessor
        preprocessor_path = os.path.join(artifacts_uri, 'preprocessor.joblib')
        if os.path.exists(preprocessor_path):
            self._preprocessor = joblib.load(preprocessor_path)

    def preprocess(self, prediction_input: dict) -> np.ndarray:
        """
        Preprocesses the input.
        Called for EVERY request.
        
        Args:
            prediction_input: {"instances": [[f1, f2, ...], ...]}
            
        Returns:
            Numpy array ready for model.predict()
        """
        instances = prediction_input["instances"]
        arr = np.array(instances)
        
        if self._preprocessor:
            arr = self._preprocessor.transform(arr)
        
        return arr

    def predict(self, instances: np.ndarray) -> np.ndarray:
        """
        Runs inference.
        Called for EVERY request.
        
        Args:
            instances: Preprocessed input array
            
        Returns:
            Predictions as numpy array
        """
        return self._model.predict(instances)

    def postprocess(self, prediction_results: np.ndarray) -> dict:
        """
        Formats the output.
        Called for EVERY request.
        
        Args:
            prediction_results: Raw model outputs
            
        Returns:
            {"predictions": [...]}
        """
        return {"predictions": prediction_results.tolist()}

Building and Uploading the Model:

from google.cloud import aiplatform
from google.cloud.aiplatform.prediction import LocalModel

# Build the container locally
local_model = LocalModel.build_cpr_model(
    source_dir="src",  # Directory containing predictor.py
    output_image_uri=f"us-docker.pkg.dev/{PROJECT_ID}/ml-repo/custom-predictor:v1",
    predictor=CustomPredictor,
    requirements_path="src/requirements.txt",
    extra_packages=[]
)

# Push to Artifact Registry
local_model.push_image()

# Upload to Vertex AI Model Registry
model = local_model.upload(
    display_name="fraud-detector-v1",
    artifact_uri=f"gs://{BUCKET_NAME}/models/fraud-detector/v1",
    serving_container_ports=[8080],
)

print(f"Model uploaded: {model.resource_name}")

Deploying to an Endpoint

Step 1: Create an Endpoint

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

endpoint = aiplatform.Endpoint.create(
    display_name="fraud-detection-endpoint",
    description="Production fraud detection endpoint",
    labels={"env": "prod", "team": "ml-platform"}
)

Step 2: Deploy the Model

model.deploy(
    endpoint=endpoint,
    deployed_model_display_name="fraud-v1",
    machine_type="n1-standard-4",
    min_replica_count=2,
    max_replica_count=10,
    accelerator_type="NVIDIA_TESLA_T4",  # Optional GPU
    accelerator_count=1,
    traffic_percentage=100,
    
    # Auto-scaling settings
    autoscaling_target_cpu_utilization=60,  # Scale when CPU > 60%
    autoscaling_target_accelerator_duty_cycle=80,  # Scale when GPU > 80%
)

Traffic Splitting for A/B Testing

Vertex AI makes canary deployments trivial.

Scenario: Deploy v2 with 10% traffic, v1 keeps 90%.

# Deploy v2 to the same endpoint
model_v2.deploy(
    endpoint=endpoint,
    deployed_model_display_name="fraud-v2",
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=5,
    traffic_percentage=10,  # 10% to v2
    traffic_split={
        "fraud-v1": 90,  # 90% to v1
        "fraud-v2": 10   # 10% to v2
    }
)

Monitoring the Split:

# Get traffic allocation
endpoint.list_deployed_models()
# Returns: [
#   {"id": "...", "display_name": "fraud-v1", "traffic_split": 90},
#   {"id": "...", "display_name": "fraud-v2", "traffic_split": 10}
# ]

Promoting v2:

# Send 100% traffic to v2
endpoint.update_traffic_split({"fraud-v2": 100})

# Optionally undeploy v1
endpoint.undeploy(deployed_model_id="fraud-v1-id")

Private Endpoints and VPC Service Controls

Enterprise deployments require private networking.

Private Service Connect (PSC):

from google.cloud import aiplatform

endpoint = aiplatform.Endpoint.create(
    display_name="private-fraud-endpoint",
    network="projects/{PROJECT_NUMBER}/global/networks/{VPC_NAME}",
    encryption_spec_key_name=f"projects/{PROJECT_ID}/locations/{REGION}/keyRings/my-kr/cryptoKeys/my-key"
)

This creates an endpoint accessible only within your VPC, with no public internet exposure.


15.1.4 Comparative Analysis

FeatureAWS SageMakerGCP Vertex AI
Billing ModelInstance-hour (24/7 running)Node-hour (24/7 running)
Deployment AbstractionModel → EndpointConfig → EndpointModel → Endpoint → DeployedModel
Multi-Model ServingMulti-Model Endpoints (MME) - Very efficientManual (deploy multiple Models to one Endpoint)
Traffic SplittingProduction Variants (cumbersome)Native, elegant traffic_percentage
ProtocolHTTP/REST (gRPC via custom setup)HTTP/REST and gRPC native
Private NetworkingVPC Endpoints (PrivateLink)Private Service Connect (PSC)
Log LatencyCloudWatch (1-5 min delay)Cloud Logging (near real-time)
GPU VarietyT4, A10G, V100, A100, Inferentia, TrainiumT4, L4, A100, H100, TPU

Key Differentiator: MME: For multi-tenant SaaS (one model per customer), SageMaker’s MME is a 10x cost saver. Vertex AI doesn’t have an equivalent.

Key Differentiator: Traffic Splitting: Vertex AI’s traffic splitting is far more elegant and Pythonic than SageMaker’s Production Variants.


15.1.5 Monitoring and Observability

Deploying is 10% of the work. Keeping the system healthy is the other 90%.

The Four Golden Signals

  1. Latency: How long does it take to return a prediction?
  2. Traffic: How many requests per second?
  3. Errors: What percentage of requests fail?
  4. Saturation: Are resources (CPU/GPU/Memory) approaching limits?

SageMaker CloudWatch Metrics:

import boto3

cloudwatch = boto3.client('cloudwatch')

# Query P99 latency
response = cloudwatch.get_metric_statistics(
    Namespace='AWS/SageMaker',
    MetricName='ModelLatency',
    Dimensions=[
        {'Name': 'EndpointName', 'Value': 'fraud-detector-prod'},
        {'Name': 'VariantName', 'Value': 'AllTraffic'}
    ],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=['Average', 'Maximum'],
    ExtendedStatistics=['p99']
)

Vertex AI Monitoring (Cloud Monitoring):

from google.cloud import monitoring_v3

client = monitoring_v3.MetricServiceClient()
project_name = f"projects/{PROJECT_ID}"

# Query request count
query = monitoring_v3.TimeSeriesQuery(
    query=f'''
    fetch aiplatform.googleapis.com/prediction/online/prediction_count
    | filter resource.endpoint_id == "{ENDPOINT_ID}"
    | group_by 1m, mean(val())
    '''
)

results = client.query_time_series(request={"name": project_name, "query": query.query})

SageMaker Model Monitor

Model Monitor automatically detects data drift and model quality degradation.

Setup:

from sagemaker.model_monitor import DefaultModelMonitor, CronExpressionGenerator

monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600
)

monitor.create_monitoring_schedule(
    endpoint_input=predictor.endpoint_name,
    output_s3_uri=f's3://my-bucket/model-monitor/reports',
    statistics=baseline_statistics_path,
    constraints=baseline_constraints_path,
    schedule_cron_expression=CronExpressionGenerator.hourly()
)

This runs hourly jobs to compare live traffic against the training baseline.


15.1.6 Cost Optimization Strategies

1. Instance Right-Sizing:

Use CloudWatch GPU Utilization metrics. If consistently < 20%, downgrade to CPU or smaller GPU.

2. Spot Instances (Experimental):

Not officially supported, but you can deploy custom containers on EC2 Spot behind your own ALB.

3. Serverless Inference (SageMaker):

For sporadic workloads, use SageMaker Serverless:

from sagemaker.serverless import ServerlessInferenceConfig

serverless_config = ServerlessInferenceConfig(
    memory_size_in_mb=4096,
    max_concurrency=10,
    provisioned_concurrency=2  # Keep 2 warm
)

predictor = model.deploy(
    serverless_inference_config=serverless_config
)

Cost Comparison:

  • Real-time: $0.736/hour = $531/month (24/7)
  • Serverless: $0.20/hour compute + $0.000001/request (scales to zero)

15.1.7 Conclusion

Managed real-time inference services provide a robust foundation for production ML systems. SageMaker excels in multi-tenant scenarios with MME, while Vertex AI provides a cleaner API and superior traffic splitting. Both require deep understanding of their operational knobs—auto-scaling policies, instance selection, and monitoring—to deliver cost-effective, reliable predictions at scale.