Keyboard shortcuts

Press or to navigate between chapters

Press ? to show this help

Press Esc to hide this help

43.2. Serverless MLOps (Lambda / Cloud Run)

Tip

Scale-to-Zero is the most critical feature for pre-PMF startups. Deploy 50 experimental models for near-zero cost—you only pay when a user actually clicks.


43.2.1. The Economics of Serverless vs Serverful

Cost Comparison by Traffic Pattern

Traffic PatternServerful (EC2)Serverless (Lambda)Winner
0 requests/day$180/month$0/monthLambda
1,000 requests/day$180/month$3/monthLambda
100,000 requests/day$180/month$15/monthLambda
1M requests/day$180/month$150/monthLambda
10M requests/day$180/month$1,500/monthEC2
100M requests/day$360/month (+ scale)$15,000/monthEC2

Little’s Law for Concurrency

$$ L = \lambda \times W $$

VariableDefinitionExample
LConcurrent executions200
λRequest rate (req/sec)100
WExecution time (seconds)2
def calculate_concurrency(requests_per_second: float, execution_time_s: float) -> dict:
    """Calculate Lambda concurrency requirements."""
    concurrent = requests_per_second * execution_time_s
    
    return {
        "concurrent_executions": int(concurrent),
        "default_limit": 1000,
        "needs_quota_increase": concurrent > 1000,
        "estimated_cost_per_1m": round(
            1_000_000 * (128 / 1024) * execution_time_s * 0.0000166667, 2
        )
    }

# Example
calc = calculate_concurrency(requests_per_second=100, execution_time_s=2)
# {'concurrent_executions': 200, 'needs_quota_increase': False, ...}

Decision Framework

graph TD
    A[New ML Endpoint] --> B{Daily Requests?}
    B -->|< 100K| C[Serverless]
    B -->|100K - 1M| D{Latency Critical?}
    B -->|> 1M| E[Serverful]
    
    D -->|No| C
    D -->|Yes| F{Cold Start OK?}
    
    F -->|Yes| G[Lambda + Provisioned]
    F -->|No| E
    
    C --> H[Lambda / Cloud Run]
    G --> H
    E --> I[ECS / K8s]

43.2.2. The Lambdaith Pattern

Avoid “Micro-Lambdas” (one function per endpoint). Use the Lambdaith: a single Lambda running FastAPI.

Why Lambdaith?

ApproachCold Start PenaltyMemory EfficiencyComplexity
Micro-Lambdas (10 functions)10× model loads10× memoryHigh
Lambdaith (1 function)1× model load1× memoryLow

FastAPI + Mangum Implementation

# app.py
from fastapi import FastAPI, HTTPException
from mangum import Mangum
from pydantic import BaseModel, Field
from typing import List, Optional
import torch
import boto3
import os

app = FastAPI(
    title="ML Inference API",
    description="Serverless ML inference endpoint",
    version="1.0.0"
)

# Global model cache
_model = None
_tokenizer = None

def get_model():
    """Lazy load model on first request."""
    global _model, _tokenizer
    
    if _model is None:
        model_path = os.environ.get("MODEL_PATH", "/opt/ml/model")
        
        # Load from S3 if needed
        if model_path.startswith("s3://"):
            s3 = boto3.client("s3")
            bucket, key = model_path.replace("s3://", "").split("/", 1)
            local_path = "/tmp/model.pt"
            s3.download_file(bucket, key, local_path)
            model_path = local_path
        
        _model = torch.jit.load(model_path)
        _model.eval()
    
    return _model


class PredictRequest(BaseModel):
    text: str = Field(..., min_length=1, max_length=1000)
    threshold: float = Field(0.5, ge=0, le=1)

class PredictResponse(BaseModel):
    prediction: str
    confidence: float
    model_version: str

class BatchRequest(BaseModel):
    items: List[PredictRequest] = Field(..., max_items=100)

class BatchResponse(BaseModel):
    predictions: List[PredictResponse]
    processed: int
    latency_ms: float


@app.get("/health")
async def health():
    """Health check for load balancer."""
    return {"status": "healthy"}


@app.post("/predict", response_model=PredictResponse)
async def predict(request: PredictRequest):
    """Single prediction endpoint."""
    import time
    start = time.perf_counter()
    
    model = get_model()
    
    # Tokenize and predict
    with torch.no_grad():
        # Simplified - real implementation would tokenize
        input_tensor = torch.randn(1, 768)
        output = model(input_tensor)
        confidence = torch.sigmoid(output).item()
    
    prediction = "positive" if confidence > request.threshold else "negative"
    
    return PredictResponse(
        prediction=prediction,
        confidence=round(confidence, 4),
        model_version=os.environ.get("MODEL_VERSION", "1.0.0")
    )


@app.post("/batch", response_model=BatchResponse)
async def batch_predict(request: BatchRequest):
    """Batch prediction for efficiency."""
    import time
    start = time.perf_counter()
    
    model = get_model()
    predictions = []
    
    for item in request.items:
        with torch.no_grad():
            input_tensor = torch.randn(1, 768)
            output = model(input_tensor)
            confidence = torch.sigmoid(output).item()
        
        predictions.append(PredictResponse(
            prediction="positive" if confidence > item.threshold else "negative",
            confidence=round(confidence, 4),
            model_version=os.environ.get("MODEL_VERSION", "1.0.0")
        ))
    
    latency = (time.perf_counter() - start) * 1000
    
    return BatchResponse(
        predictions=predictions,
        processed=len(predictions),
        latency_ms=round(latency, 2)
    )


# Lambda handler
handler = Mangum(app, lifespan="off")

# Handle warmup pings
def lambda_handler(event, context):
    # CloudWatch keep-warm event
    if event.get("source") == "aws.events":
        print("Warmup ping received")
        get_model()  # Pre-load model
        return {"statusCode": 200, "body": "warm"}
    
    return handler(event, context)

Optimized Dockerfile

# Dockerfile
FROM public.ecr.aws/lambda/python:3.11

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# CPU-only PyTorch (smaller image)
RUN pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu

# Copy application
COPY app.py ${LAMBDA_TASK_ROOT}/
COPY models/ ${LAMBDA_TASK_ROOT}/models/

# Set handler
CMD ["app.lambda_handler"]

Size Optimization Tips

TechniqueSize ReductionImpact
CPU-only PyTorch-1.5GBCritical
Strip .so files-200MBMedium
Remove tests/docs-100MBLow
Use python:slim base-500MBMedium
Quantize model (INT8)-75% model sizeHigh
# Strip shared libraries
find /opt/python -name "*.so" -exec strip --strip-unneeded {} \;

# Remove unnecessary files
find /opt/python -name "tests" -type d -exec rm -rf {} +
find /opt/python -name "__pycache__" -type d -exec rm -rf {} +
find /opt/python -name "*.pyc" -delete

43.2.3. GPU Serverless: Modal, Replicate, Beam

AWS Lambda has no GPUs. For LLMs/Diffusion, use GPU serverless providers.

Provider Comparison

ProviderGPU TypesCold StartPricingLock-in
ModalA10G, A100, H1001-5s$0.0005/s A10GHigh (DSL)
ReplicateA40, A1005-30s$0.00115/s A40Low (API)
BeamT4, A10G2-10sVariableMedium
BananaA10G5-15s$0.0004/sMedium
RunPod ServerlessVarious2-10sVariableLow
# modal_inference.py
import modal
from modal import Image, Stub, web_endpoint
from typing import Optional

# Define container image
image = Image.debian_slim().pip_install(
    "torch",
    "transformers",
    "diffusers",
    "accelerate"
)

stub = Stub("ml-inference", image=image)

# Persistent model storage
volume = modal.Volume.from_name("model-cache", create_if_missing=True)

@stub.cls(
    gpu="A10G",
    container_idle_timeout=300,  # Keep warm for 5 minutes
    volumes={"/models": volume}
)
class StableDiffusionService:
    """Serverless Stable Diffusion inference."""
    
    def __enter__(self):
        """Load model on container startup."""
        import torch
        from diffusers import StableDiffusionPipeline
        
        self.pipe = StableDiffusionPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5",
            torch_dtype=torch.float16,
            cache_dir="/models"
        )
        self.pipe = self.pipe.to("cuda")
        self.pipe.enable_attention_slicing()
    
    @modal.method()
    def generate(
        self, 
        prompt: str,
        negative_prompt: str = "",
        num_inference_steps: int = 30,
        guidance_scale: float = 7.5
    ) -> bytes:
        """Generate image from prompt."""
        import io
        
        image = self.pipe(
            prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale
        ).images[0]
        
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        return buffer.getvalue()
    
    @modal.web_endpoint()
    def api(self, prompt: str, steps: int = 30):
        """HTTP endpoint for image generation."""
        import base64
        
        image_bytes = self.generate(prompt, num_inference_steps=steps)
        
        return {
            "image": base64.b64encode(image_bytes).decode(),
            "prompt": prompt
        }


@stub.function(gpu="A10G", timeout=300)
def batch_generate(prompts: list) -> list:
    """Batch generation for multiple prompts."""
    service = StableDiffusionService()
    
    results = []
    for prompt in prompts:
        with service:
            image = service.generate(prompt)
            results.append(image)
    
    return results


# LLM Inference
@stub.cls(
    gpu="A100",
    container_idle_timeout=600
)
class LLMService:
    """Serverless LLM inference."""
    
    def __enter__(self):
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
        
        model_id = "meta-llama/Llama-2-7b-chat-hf"
        
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.float16,
            device_map="auto"
        )
    
    @modal.method()
    def generate(self, prompt: str, max_tokens: int = 256) -> str:
        inputs = self.tokenizer(prompt, return_tensors="pt").to("cuda")
        
        outputs = self.model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            do_sample=True,
            temperature=0.7
        )
        
        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)


# Deploy
if __name__ == "__main__":
    stub.deploy()

Replicate Integration

# replicate_client.py
import replicate
from typing import Optional, List
import asyncio
import httpx

class ReplicateClient:
    """Client for Replicate serverless inference."""
    
    def __init__(self, api_token: str):
        self.client = replicate.Client(api_token=api_token)
    
    def run_stable_diffusion(
        self,
        prompt: str,
        negative_prompt: str = "",
        width: int = 512,
        height: int = 512,
        num_outputs: int = 1
    ) -> List[str]:
        """Run Stable Diffusion on Replicate."""
        output = self.client.run(
            "stability-ai/stable-diffusion:db21e45d3f7023abc2a46ee38a23973f6dce16bb082a930b0c49861f96d1e5bf",
            input={
                "prompt": prompt,
                "negative_prompt": negative_prompt,
                "width": width,
                "height": height,
                "num_outputs": num_outputs
            }
        )
        return list(output)
    
    def run_llama(
        self,
        prompt: str,
        max_tokens: int = 256,
        temperature: float = 0.7
    ) -> str:
        """Run Llama on Replicate."""
        output = self.client.run(
            "meta/llama-2-70b-chat:02e509c789964a7ea8736978a43525956ef40397be9033abf9fd2badfe68c9e3",
            input={
                "prompt": prompt,
                "max_new_tokens": max_tokens,
                "temperature": temperature
            }
        )
        return "".join(output)
    
    async def run_async(self, model: str, inputs: dict) -> dict:
        """Run model asynchronously."""
        prediction = self.client.predictions.create(
            model=model,
            input=inputs
        )
        
        # Poll for completion
        while prediction.status not in ["succeeded", "failed", "canceled"]:
            await asyncio.sleep(0.5)
            prediction.reload()
        
        if prediction.status == "failed":
            raise Exception(f"Prediction failed: {prediction.error}")
        
        return prediction.output

43.2.4. Terraform: Async Inference Stack

Sync Lambda has 29s hard timeout. ML often exceeds this. Use async pattern.

graph LR
    A[API Gateway] --> B[Lambda: Enqueue]
    B --> C[SQS Queue]
    C --> D[Lambda: Process]
    D --> E[DynamoDB: Results]
    F[Webhook/Poll] --> E

Full Terraform Configuration

# main.tf

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

provider "aws" {
  region = var.region
}

# ECR Repository
resource "aws_ecr_repository" "ml_inference" {
  name                 = "ml-inference-${var.environment}"
  image_tag_mutability = "IMMUTABLE"
  
  image_scanning_configuration {
    scan_on_push = true
  }
  
  encryption_configuration {
    encryption_type = "AES256"
  }
}

# SQS Queue for async processing
resource "aws_sqs_queue" "inference_queue" {
  name                       = "ml-inference-queue-${var.environment}"
  visibility_timeout_seconds = 360  # 6 minutes (> Lambda timeout)
  message_retention_seconds  = 86400
  receive_wait_time_seconds  = 20  # Long polling
  
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 3
  })
}

resource "aws_sqs_queue" "dlq" {
  name                      = "ml-inference-dlq-${var.environment}"
  message_retention_seconds = 1209600  # 14 days
}

# DynamoDB for results
resource "aws_dynamodb_table" "inference_results" {
  name         = "ml-inference-results-${var.environment}"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "request_id"
  
  attribute {
    name = "request_id"
    type = "S"
  }
  
  ttl {
    attribute_name = "ttl"
    enabled        = true
  }
}

# Lambda IAM Role
resource "aws_iam_role" "lambda_role" {
  name = "ml-lambda-role-${var.environment}"
  
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy" "lambda_policy" {
  name = "ml-lambda-policy"
  role = aws_iam_role.lambda_role.id
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect = "Allow"
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes",
          "sqs:SendMessage"
        ]
        Resource = [
          aws_sqs_queue.inference_queue.arn,
          aws_sqs_queue.dlq.arn
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "dynamodb:PutItem",
          "dynamodb:GetItem",
          "dynamodb:UpdateItem"
        ]
        Resource = aws_dynamodb_table.inference_results.arn
      },
      {
        Effect = "Allow"
        Action = ["s3:GetObject"]
        Resource = "arn:aws:s3:::${var.model_bucket}/*"
      }
    ]
  })
}

# Lambda Function
resource "aws_lambda_function" "inference_worker" {
  function_name = "ml-inference-worker-${var.environment}"
  role          = aws_iam_role.lambda_role.arn
  package_type  = "Image"
  image_uri     = "${aws_ecr_repository.ml_inference.repository_url}:latest"
  
  timeout     = 300  # 5 minutes
  memory_size = 3008  # Max memory = 2 vCPUs
  
  environment {
    variables = {
      MODEL_BUCKET    = var.model_bucket
      RESULTS_TABLE   = aws_dynamodb_table.inference_results.name
      ENVIRONMENT     = var.environment
    }
  }
  
  # VPC config if needed
  dynamic "vpc_config" {
    for_each = var.vpc_enabled ? [1] : []
    content {
      subnet_ids         = var.subnet_ids
      security_group_ids = var.security_group_ids
    }
  }
}

# Connect SQS to Lambda
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn                   = aws_sqs_queue.inference_queue.arn
  function_name                      = aws_lambda_function.inference_worker.arn
  batch_size                         = 1
  maximum_batching_window_in_seconds = 0
  
  scaling_config {
    maximum_concurrency = 10
  }
}

# API Gateway for submitting requests
resource "aws_apigatewayv2_api" "inference_api" {
  name          = "ml-inference-api-${var.environment}"
  protocol_type = "HTTP"
  
  cors_configuration {
    allow_origins = ["*"]
    allow_methods = ["POST", "GET"]
    allow_headers = ["Content-Type"]
  }
}

resource "aws_apigatewayv2_stage" "default" {
  api_id      = aws_apigatewayv2_api.inference_api.id
  name        = "$default"
  auto_deploy = true
  
  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.api_logs.arn
    format = jsonencode({
      requestId      = "$context.requestId"
      ip             = "$context.identity.sourceIp"
      requestTime    = "$context.requestTime"
      httpMethod     = "$context.httpMethod"
      routeKey       = "$context.routeKey"
      status         = "$context.status"
      responseLength = "$context.responseLength"
    })
  }
}

resource "aws_cloudwatch_log_group" "api_logs" {
  name              = "/aws/apigateway/ml-inference-${var.environment}"
  retention_in_days = 14
}

# Enqueue Lambda
resource "aws_lambda_function" "enqueue" {
  function_name = "ml-inference-enqueue-${var.environment}"
  role          = aws_iam_role.lambda_role.arn
  runtime       = "python3.11"
  handler       = "enqueue.handler"
  
  filename         = "lambda/enqueue.zip"
  source_code_hash = filebase64sha256("lambda/enqueue.zip")
  
  timeout     = 10
  memory_size = 256
  
  environment {
    variables = {
      QUEUE_URL     = aws_sqs_queue.inference_queue.url
      RESULTS_TABLE = aws_dynamodb_table.inference_results.name
    }
  }
}

# API Gateway routes
resource "aws_apigatewayv2_integration" "enqueue" {
  api_id                 = aws_apigatewayv2_api.inference_api.id
  integration_type       = "AWS_PROXY"
  integration_uri        = aws_lambda_function.enqueue.invoke_arn
  payload_format_version = "2.0"
}

resource "aws_apigatewayv2_route" "submit" {
  api_id    = aws_apigatewayv2_api.inference_api.id
  route_key = "POST /predict"
  target    = "integrations/${aws_apigatewayv2_integration.enqueue.id}"
}

resource "aws_apigatewayv2_route" "status" {
  api_id    = aws_apigatewayv2_api.inference_api.id
  route_key = "GET /status/{request_id}"
  target    = "integrations/${aws_apigatewayv2_integration.enqueue.id}"
}

resource "aws_lambda_permission" "api_gateway" {
  statement_id  = "AllowAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.enqueue.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${aws_apigatewayv2_api.inference_api.execution_arn}/*/*"
}

# Outputs
output "api_endpoint" {
  value = aws_apigatewayv2_stage.default.invoke_url
}

output "ecr_repository" {
  value = aws_ecr_repository.ml_inference.repository_url
}

Enqueue Handler

# lambda/enqueue.py
import json
import boto3
import uuid
import os
import time

sqs = boto3.client("sqs")
dynamodb = boto3.resource("dynamodb")

QUEUE_URL = os.environ["QUEUE_URL"]
RESULTS_TABLE = os.environ["RESULTS_TABLE"]

def handler(event, context):
    """Handle API Gateway requests."""
    method = event.get("requestContext", {}).get("http", {}).get("method")
    path = event.get("rawPath", "")
    
    if method == "POST" and "/predict" in path:
        return submit_request(event)
    elif method == "GET" and "/status/" in path:
        request_id = event.get("pathParameters", {}).get("request_id")
        return get_status(request_id)
    
    return {"statusCode": 404, "body": "Not found"}


def submit_request(event):
    """Submit prediction request to queue."""
    try:
        body = json.loads(event.get("body", "{}"))
    except json.JSONDecodeError:
        return {"statusCode": 400, "body": "Invalid JSON"}
    
    request_id = str(uuid.uuid4())
    
    # Store pending status
    table = dynamodb.Table(RESULTS_TABLE)
    table.put_item(Item={
        "request_id": request_id,
        "status": "pending",
        "submitted_at": int(time.time()),
        "ttl": int(time.time()) + 86400  # 24 hour TTL
    })
    
    # Send to queue
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({
            "request_id": request_id,
            "payload": body
        })
    )
    
    return {
        "statusCode": 202,
        "body": json.dumps({
            "request_id": request_id,
            "status": "pending",
            "poll_url": f"/status/{request_id}"
        })
    }


def get_status(request_id):
    """Get prediction status/result."""
    if not request_id:
        return {"statusCode": 400, "body": "Missing request_id"}
    
    table = dynamodb.Table(RESULTS_TABLE)
    response = table.get_item(Key={"request_id": request_id})
    
    if "Item" not in response:
        return {"statusCode": 404, "body": "Request not found"}
    
    item = response["Item"]
    
    return {
        "statusCode": 200,
        "body": json.dumps({
            "request_id": request_id,
            "status": item.get("status"),
            "result": item.get("result"),
            "error": item.get("error")
        })
    }

43.2.5. Cold Start Optimization

Cold starts kill UX. Here’s how to minimize them.

Cold Start Sources

SourceTypical DelayMitigation
Container init500-2000msSmaller image
Python import500-5000msLazy imports
Model load2000-30000msProvisioned concurrency
VPC ENI attach5000-10000msAvoid VPC if possible

Provisioned Concurrency

# provisioned_concurrency.tf

resource "aws_lambda_alias" "live" {
  name             = "live"
  function_name    = aws_lambda_function.inference_worker.function_name
  function_version = aws_lambda_function.inference_worker.version
}

resource "aws_lambda_provisioned_concurrency_config" "warm" {
  function_name                     = aws_lambda_function.inference_worker.function_name
  qualifier                         = aws_lambda_alias.live.name
  provisioned_concurrent_executions = 5
}

# Cost: ~$15/month per instance

The Poor Man’s Warmer

# warmer.py
import json
import boto3
from typing import List

lambda_client = boto3.client("lambda")

def warm_functions(function_names: List[str], concurrency: int = 5):
    """Send warmup pings to multiple Lambda instances."""
    
    for func_name in function_names:
        for i in range(concurrency):
            lambda_client.invoke(
                FunctionName=func_name,
                InvocationType="Event",  # Async
                Payload=json.dumps({
                    "source": "aws.events",
                    "detail-type": "Warmup",
                    "instance": i
                })
            )
    
    return {"warmed": len(function_names) * concurrency}


# CloudWatch Events Rule (Terraform)
"""
resource "aws_cloudwatch_event_rule" "warmer" {
  name                = "lambda-warmer"
  schedule_expression = "rate(4 minutes)"
}

resource "aws_cloudwatch_event_target" "warmer" {
  rule = aws_cloudwatch_event_rule.warmer.name
  arn  = aws_lambda_function.warmer.arn
  
  input = jsonencode({
    functions = ["ml-inference-worker-prod"]
    concurrency = 3
  })
}
"""

Lazy Loading Pattern

# lazy_loading.py
import os
from functools import lru_cache
from typing import Optional

# Don't import heavy libraries at module level
# BAD: import torch, transformers, scipy, numpy

class LazyLoader:
    """Lazy load heavy dependencies."""
    
    _torch = None
    _model = None
    _tokenizer = None
    
    @classmethod
    def get_torch(cls):
        if cls._torch is None:
            import torch
            cls._torch = torch
        return cls._torch
    
    @classmethod
    @lru_cache(maxsize=1)
    def get_model(cls):
        if cls._model is None:
            torch = cls.get_torch()
            
            # Import here, not at module level
            from transformers import AutoModel
            
            model_path = os.environ.get("MODEL_PATH", "model.pt")
            
            if model_path.endswith(".pt"):
                cls._model = torch.jit.load(model_path)
            else:
                cls._model = AutoModel.from_pretrained(model_path)
            
            cls._model.eval()
        
        return cls._model


def handler(event, context):
    # Warmup ping - just load model
    if event.get("source") == "aws.events":
        LazyLoader.get_model()
        return {"statusCode": 200, "body": "warm"}
    
    # Real request - model already loaded
    model = LazyLoader.get_model()
    # ... inference logic

43.2.6. Event-Driven Architecture

Replace service-to-service calls with event flows.

graph TB
    A[S3: Video Upload] --> B[EventBridge]
    B --> C[Lambda: Transcode]
    B --> D[Lambda: Thumbnail]
    B --> E[Lambda: Whisper Transcribe]
    B --> F[Lambda: Object Detection]
    
    C --> G[S3: Processed]
    D --> G
    E --> H[DynamoDB: Metadata]
    F --> H
    
    G --> I[CloudFront CDN]
    H --> J[API: Video Details]

Fan-Out Implementation

# eventbridge.tf

resource "aws_s3_bucket_notification" "video_upload" {
  bucket = aws_s3_bucket.uploads.id
  
  eventbridge = true
}

resource "aws_cloudwatch_event_rule" "video_uploaded" {
  name = "video-uploaded-${var.environment}"
  
  event_pattern = jsonencode({
    source      = ["aws.s3"]
    detail-type = ["Object Created"]
    detail = {
      bucket = { name = [aws_s3_bucket.uploads.id] }
      object = { key = [{ prefix = "videos/" }] }
    }
  })
}

# Transcode Lambda
resource "aws_cloudwatch_event_target" "transcode" {
  rule = aws_cloudwatch_event_rule.video_uploaded.name
  arn  = aws_lambda_function.transcode.arn
}

# Thumbnail Lambda
resource "aws_cloudwatch_event_target" "thumbnail" {
  rule = aws_cloudwatch_event_rule.video_uploaded.name
  arn  = aws_lambda_function.thumbnail.arn
}

# Transcription Lambda
resource "aws_cloudwatch_event_target" "transcribe" {
  rule = aws_cloudwatch_event_rule.video_uploaded.name
  arn  = aws_lambda_function.transcribe.arn
}

# Object Detection Lambda
resource "aws_cloudwatch_event_target" "detect_objects" {
  rule = aws_cloudwatch_event_rule.video_uploaded.name
  arn  = aws_lambda_function.detect_objects.arn
}

43.2.7. Troubleshooting

Common Issues

ProblemSymptomCauseSolution
Timeout15min limit hitLong inferenceUse Fargate or Step Functions
OOMsignal: killedModel > memoryIncrease to 10GB or quantize
Cold Start10s+ latencyHeavy importsProvisioned concurrency
ENI ExhaustionStuck in PendingVPC Lambda limitRun outside VPC
Payload limit413 error>6MB sync payloadUse S3 presigned URLs

Debug Pattern

import logging
import json
import traceback
import time

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
    request_id = context.aws_request_id
    start = time.perf_counter()
    
    logger.info(json.dumps({
        "event": "request_start",
        "request_id": request_id,
        "memory_limit_mb": context.memory_limit_in_mb,
        "remaining_time_ms": context.get_remaining_time_in_millis()
    }))
    
    try:
        result = process(event)
        
        logger.info(json.dumps({
            "event": "request_complete",
            "request_id": request_id,
            "duration_ms": (time.perf_counter() - start) * 1000,
            "remaining_time_ms": context.get_remaining_time_in_millis()
        }))
        
        return {"statusCode": 200, "body": json.dumps(result)}
    
    except Exception as e:
        logger.error(json.dumps({
            "event": "request_error",
            "request_id": request_id,
            "error": str(e),
            "traceback": traceback.format_exc()
        }))
        
        return {"statusCode": 500, "body": json.dumps({"error": str(e)})}

43.2.8. Summary Checklist

StepActionPriority
1Use Lambdaith pattern (single function)Critical
2CPU-only PyTorch for LambdaCritical
3Async pattern for >30s workloadsHigh
4Provisioned concurrency for productionHigh
5Lazy load models on first requestHigh
6Modal/Replicate for GPU inferenceMedium
7S3 presigned URLs for large payloadsMedium
8Event-driven for pipelinesMedium
9Structured logging for debuggingMedium
10Avoid VPC unless necessaryLow

Platform Selection Guide

RequirementAWSGCPModalReplicate
CPU inferenceLambdaCloud Run
GPU inferenceSageMakerCloud Run GPU
Scale-to-zero
Cold start1-10s1-5s1-5s5-30s
Max memory10GB32GB256GBVaries
Max timeout15min60minUnlimitedUnlimited

[End of Section 43.2]