43.2. Serverless MLOps (Lambda / Cloud Run)
Tip
Scale-to-zero is the most valuable serverless property for a pre-product-market-fit startup: you can deploy 50 experimental models at near-zero idle cost and pay only when a user actually sends a request.
43.2.1. The Economics of Serverless vs Serverful
Cost Comparison by Traffic Pattern
| Traffic Pattern | Serverful (EC2) | Serverless (Lambda) | Winner |
|---|---|---|---|
| 0 requests/day | $180/month | $0/month | Lambda |
| 1,000 requests/day | $180/month | $3/month | Lambda |
| 100,000 requests/day | $180/month | $15/month | Lambda |
| 1M requests/day | $180/month | $150/month | Lambda |
| 10M requests/day | $180/month | $1,500/month | EC2 |
| 100M requests/day | $360/month (+ scale) | $15,000/month | EC2 |
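Break-Even Sketch
The table values are illustrative; here is a minimal sketch for checking the break-even point against your own workload, assuming on-demand Lambda pricing of roughly $0.0000166667 per GB-second plus $0.20 per 1M requests, and an illustrative $180/month instance (adjust the constants for your region, memory size, and instance type):
# breakeven.py - rough Lambda vs. fixed-instance comparison (illustrative prices)
def lambda_monthly_cost(requests_per_day: int, avg_duration_s: float, memory_gb: float) -> float:
    """Approximate monthly Lambda bill: compute (GB-seconds) plus the per-request fee."""
    monthly_requests = requests_per_day * 30
    gb_seconds = monthly_requests * avg_duration_s * memory_gb
    return gb_seconds * 0.0000166667 + monthly_requests / 1_000_000 * 0.20

def breakeven_requests_per_day(instance_cost_month: float, avg_duration_s: float, memory_gb: float) -> int:
    """Daily request volume at which Lambda costs as much as the fixed instance."""
    cost_per_request = avg_duration_s * memory_gb * 0.0000166667 + 0.20 / 1_000_000
    return int(instance_cost_month / 30 / cost_per_request)

# Example: 1 GB memory, 200 ms average inference
lambda_monthly_cost(100_000, 0.2, 1.0)        # ~ $11/month
breakeven_requests_per_day(180.0, 0.2, 1.0)   # ~ 1.7M requests/day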
Little’s Law for Concurrency
$$ L = \lambda \times W $$
| Variable | Definition | Example |
|---|---|---|
| L | Concurrent executions | 200 |
| λ | Request rate (req/sec) | 100 |
| W | Execution time (seconds) | 2 |
def calculate_concurrency(requests_per_second: float, execution_time_s: float) -> dict:
    """Calculate Lambda concurrency requirements via Little's Law (L = lambda * W)."""
    concurrent = requests_per_second * execution_time_s
    return {
        "concurrent_executions": int(concurrent),
        "default_limit": 1000,  # default per-region account concurrency quota
        "needs_quota_increase": concurrent > 1000,
        # Compute charge only, assuming 128 MB memory at $0.0000166667/GB-second;
        # the per-request fee (~$0.20 per 1M requests) is not included.
        "estimated_cost_per_1m": round(
            1_000_000 * (128 / 1024) * execution_time_s * 0.0000166667, 2
        )
    }
# Example
calc = calculate_concurrency(requests_per_second=100, execution_time_s=2)
# {'concurrent_executions': 200, 'needs_quota_increase': False, ...}
Decision Framework
graph TD
A[New ML Endpoint] --> B{Daily Requests?}
B -->|< 100K| C[Serverless]
B -->|100K - 1M| D{Latency Critical?}
B -->|> 1M| E[Serverful]
D -->|No| C
D -->|Yes| F{Cold Start OK?}
F -->|Yes| G[Lambda + Provisioned]
F -->|No| E
C --> H[Lambda / Cloud Run]
G --> H
E --> I[ECS / K8s]
43.2.2. The Lambdalith Pattern
Avoid “Micro-Lambdas” (one function per endpoint). Use the Lambdalith: a single Lambda function running a full FastAPI app behind one handler.
Why Lambdalith?
| Approach | Cold Start Penalty | Memory Efficiency | Complexity |
|---|---|---|---|
| Micro-Lambdas (10 functions) | 10× model loads | 10× memory | High |
| Lambdalith (1 function) | 1× model load | 1× memory | Low |
FastAPI + Mangum Implementation
# app.py
from fastapi import FastAPI, HTTPException
from mangum import Mangum
from pydantic import BaseModel, Field
from typing import List, Optional
import torch
import boto3
import os
app = FastAPI(
title="ML Inference API",
description="Serverless ML inference endpoint",
version="1.0.0"
)
# Global model cache
_model = None
_tokenizer = None
def get_model():
"""Lazy load model on first request."""
global _model, _tokenizer
if _model is None:
model_path = os.environ.get("MODEL_PATH", "/opt/ml/model")
# Load from S3 if needed
if model_path.startswith("s3://"):
s3 = boto3.client("s3")
bucket, key = model_path.replace("s3://", "").split("/", 1)
local_path = "/tmp/model.pt"
s3.download_file(bucket, key, local_path)
model_path = local_path
_model = torch.jit.load(model_path)
_model.eval()
return _model
class PredictRequest(BaseModel):
text: str = Field(..., min_length=1, max_length=1000)
threshold: float = Field(0.5, ge=0, le=1)
class PredictResponse(BaseModel):
prediction: str
confidence: float
model_version: str
class BatchRequest(BaseModel):
items: List[PredictRequest] = Field(..., max_items=100)
class BatchResponse(BaseModel):
predictions: List[PredictResponse]
processed: int
latency_ms: float
@app.get("/health")
async def health():
"""Health check for load balancer."""
return {"status": "healthy"}
@app.post("/predict", response_model=PredictResponse)
async def predict(request: PredictRequest):
"""Single prediction endpoint."""
import time
start = time.perf_counter()
model = get_model()
# Tokenize and predict
with torch.no_grad():
# Simplified - real implementation would tokenize
input_tensor = torch.randn(1, 768)
output = model(input_tensor)
confidence = torch.sigmoid(output).item()
prediction = "positive" if confidence > request.threshold else "negative"
return PredictResponse(
prediction=prediction,
confidence=round(confidence, 4),
model_version=os.environ.get("MODEL_VERSION", "1.0.0")
)
@app.post("/batch", response_model=BatchResponse)
async def batch_predict(request: BatchRequest):
"""Batch prediction for efficiency."""
import time
start = time.perf_counter()
model = get_model()
predictions = []
for item in request.items:
with torch.no_grad():
input_tensor = torch.randn(1, 768)
output = model(input_tensor)
confidence = torch.sigmoid(output).item()
predictions.append(PredictResponse(
prediction="positive" if confidence > item.threshold else "negative",
confidence=round(confidence, 4),
model_version=os.environ.get("MODEL_VERSION", "1.0.0")
))
latency = (time.perf_counter() - start) * 1000
return BatchResponse(
predictions=predictions,
processed=len(predictions),
latency_ms=round(latency, 2)
)
# Lambda handler
handler = Mangum(app, lifespan="off")
# Handle warmup pings
def lambda_handler(event, context):
# CloudWatch keep-warm event
if event.get("source") == "aws.events":
print("Warmup ping received")
get_model() # Pre-load model
return {"statusCode": 200, "body": "warm"}
return handler(event, context)
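Before building the container, the same app can be smoke-tested locally with FastAPI's TestClient (no Lambda or Mangum involved). A minimal sketch, assuming the dependencies are installed locally and a TorchScript checkpoint exists at model.pt:
# local_test.py - exercise the FastAPI app directly (hypothetical local check)
import os
os.environ["MODEL_PATH"] = "model.pt"  # assumption: a local TorchScript file

from fastapi.testclient import TestClient
from app import app

client = TestClient(app)

def test_health():
    assert client.get("/health").json() == {"status": "healthy"}

def test_predict():
    resp = client.post("/predict", json={"text": "great product", "threshold": 0.5})
    assert resp.status_code == 200
    assert resp.json()["prediction"] in {"positive", "negative"}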
Optimized Dockerfile
# Dockerfile
FROM public.ecr.aws/lambda/python:3.11
# CPU-only PyTorch first (the default CUDA build adds ~1.5GB)
RUN pip install --no-cache-dir torch torchvision --index-url https://download.pytorch.org/whl/cpu
# Install remaining dependencies (keep torch out of requirements.txt so the CPU build is not replaced)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY app.py ${LAMBDA_TASK_ROOT}/
COPY models/ ${LAMBDA_TASK_ROOT}/models/
# Set handler
CMD ["app.lambda_handler"]
Size Optimization Tips
| Technique | Size Reduction | Impact |
|---|---|---|
| CPU-only PyTorch | -1.5GB | Critical |
| Strip .so files | -200MB | Medium |
| Remove tests/docs | -100MB | Low |
| Use python:slim base | -500MB | Medium |
| Quantize model (INT8) | -75% model size | High |
# Strip shared libraries
find /opt/python -name "*.so" -exec strip --strip-unneeded {} \;
# Remove unnecessary files
find /opt/python -name "tests" -type d -exec rm -rf {} +
find /opt/python -name "__pycache__" -type d -exec rm -rf {} +
find /opt/python -name "*.pyc" -delete
43.2.3. GPU Serverless: Modal, Replicate, Beam
AWS Lambda has no GPUs. For LLMs/Diffusion, use GPU serverless providers.
Provider Comparison
| Provider | GPU Types | Cold Start | Pricing | Lock-in |
|---|---|---|---|---|
| Modal | A10G, A100, H100 | 1-5s | $0.0005/s A10G | High (DSL) |
| Replicate | A40, A100 | 5-30s | $0.00115/s A40 | Low (API) |
| Beam | T4, A10G | 2-10s | Variable | Medium |
| Banana (service discontinued in 2024) | A10G | 5-15s | $0.0004/s | Medium |
| RunPod Serverless | Various | 2-10s | Variable | Low |
Modal Implementation
# modal_inference.py
import modal
from modal import Image, Stub, web_endpoint
from typing import Optional
# Define container image
image = Image.debian_slim().pip_install(
"torch",
"transformers",
"diffusers",
"accelerate"
)
stub = Stub("ml-inference", image=image)
# Persistent model storage
volume = modal.Volume.from_name("model-cache", create_if_missing=True)
@stub.cls(
gpu="A10G",
container_idle_timeout=300, # Keep warm for 5 minutes
volumes={"/models": volume}
)
class StableDiffusionService:
"""Serverless Stable Diffusion inference."""
def __enter__(self):
"""Load model on container startup."""
import torch
from diffusers import StableDiffusionPipeline
self.pipe = StableDiffusionPipeline.from_pretrained(
"runwayml/stable-diffusion-v1-5",
torch_dtype=torch.float16,
cache_dir="/models"
)
self.pipe = self.pipe.to("cuda")
self.pipe.enable_attention_slicing()
@modal.method()
def generate(
self,
prompt: str,
negative_prompt: str = "",
num_inference_steps: int = 30,
guidance_scale: float = 7.5
) -> bytes:
"""Generate image from prompt."""
import io
image = self.pipe(
prompt,
negative_prompt=negative_prompt,
num_inference_steps=num_inference_steps,
guidance_scale=guidance_scale
).images[0]
buffer = io.BytesIO()
image.save(buffer, format="PNG")
return buffer.getvalue()
@modal.web_endpoint()
def api(self, prompt: str, steps: int = 30):
"""HTTP endpoint for image generation."""
import base64
image_bytes = self.generate(prompt, num_inference_steps=steps)
return {
"image": base64.b64encode(image_bytes).decode(),
"prompt": prompt
}
@stub.function(gpu="A10G", timeout=300)
def batch_generate(prompts: list) -> list:
"""Batch generation for multiple prompts."""
service = StableDiffusionService()
results = []
for prompt in prompts:
with service:
image = service.generate(prompt)
results.append(image)
return results
# LLM Inference
@stub.cls(
gpu="A100",
container_idle_timeout=600
)
class LLMService:
"""Serverless LLM inference."""
def __enter__(self):
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
model_id = "meta-llama/Llama-2-7b-chat-hf"
self.tokenizer = AutoTokenizer.from_pretrained(model_id)
self.model = AutoModelForCausalLM.from_pretrained(
model_id,
torch_dtype=torch.float16,
device_map="auto"
)
@modal.method()
def generate(self, prompt: str, max_tokens: int = 256) -> str:
inputs = self.tokenizer(prompt, return_tensors="pt").to("cuda")
outputs = self.model.generate(
**inputs,
max_new_tokens=max_tokens,
do_sample=True,
temperature=0.7
)
return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# Deploy
if __name__ == "__main__":
stub.deploy()
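For local iteration and deployment, the Modal CLI drives everything from this file (assuming the modal client is installed and authenticated):
modal serve modal_inference.py    # hot-reloading dev mode for the web endpoint
modal deploy modal_inference.py   # deploy; web endpoints receive a public *.modal.run URL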
Replicate Integration
# replicate_client.py
import replicate
from typing import List
import asyncio
class ReplicateClient:
"""Client for Replicate serverless inference."""
def __init__(self, api_token: str):
self.client = replicate.Client(api_token=api_token)
def run_stable_diffusion(
self,
prompt: str,
negative_prompt: str = "",
width: int = 512,
height: int = 512,
num_outputs: int = 1
) -> List[str]:
"""Run Stable Diffusion on Replicate."""
output = self.client.run(
"stability-ai/stable-diffusion:db21e45d3f7023abc2a46ee38a23973f6dce16bb082a930b0c49861f96d1e5bf",
input={
"prompt": prompt,
"negative_prompt": negative_prompt,
"width": width,
"height": height,
"num_outputs": num_outputs
}
)
return list(output)
def run_llama(
self,
prompt: str,
max_tokens: int = 256,
temperature: float = 0.7
) -> str:
"""Run Llama on Replicate."""
output = self.client.run(
"meta/llama-2-70b-chat:02e509c789964a7ea8736978a43525956ef40397be9033abf9fd2badfe68c9e3",
input={
"prompt": prompt,
"max_new_tokens": max_tokens,
"temperature": temperature
}
)
return "".join(output)
async def run_async(self, model: str, inputs: dict) -> dict:
"""Run model asynchronously."""
prediction = self.client.predictions.create(
model=model,
input=inputs
)
# Poll for completion
while prediction.status not in ["succeeded", "failed", "canceled"]:
await asyncio.sleep(0.5)
prediction.reload()
if prediction.status == "failed":
raise Exception(f"Prediction failed: {prediction.error}")
return prediction.output
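Example usage of the client above; reading the token from the REPLICATE_API_TOKEN environment variable is an assumption, any token source works:
import os

client = ReplicateClient(api_token=os.environ["REPLICATE_API_TOKEN"])

image_urls = client.run_stable_diffusion("an astronaut riding a horse, watercolor")
print(image_urls[0])  # URL of the first generated image

answer = client.run_llama("Explain scale-to-zero in one sentence.", max_tokens=64)
print(answer)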
43.2.4. Terraform: Async Inference Stack
A synchronous API Gateway integration times out after 29 seconds (and Lambda itself after 15 minutes). ML inference often exceeds this, so use an async pattern.
graph LR
A[API Gateway] --> B[Lambda: Enqueue]
B --> C[SQS Queue]
C --> D[Lambda: Process]
D --> E[DynamoDB: Results]
F[Webhook/Poll] --> E
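Client-Side Submit-and-Poll
From the caller's perspective the pattern is submit, receive a request_id, then poll. A minimal client sketch against the /predict and /status routes defined below, assuming the requests library and a placeholder endpoint URL:
# async_client.py - submit a job, then poll until the worker writes a result
import time
import requests

API = "https://<api-id>.execute-api.us-east-1.amazonaws.com"  # placeholder endpoint

def predict(payload: dict, timeout_s: int = 300) -> dict:
    # The enqueue Lambda returns 202 Accepted with a request_id immediately
    job = requests.post(f"{API}/predict", json=payload).json()
    poll_url = f"{API}/status/{job['request_id']}"

    deadline = time.time() + timeout_s
    while time.time() < deadline:
        status = requests.get(poll_url).json()
        if status["status"] != "pending":
            return status
        time.sleep(2)  # fixed backoff keeps the sketch simple; exponential is nicer
    raise TimeoutError(f"No result for {job['request_id']} after {timeout_s}s")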
Full Terraform Configuration
# main.tf
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
}
provider "aws" {
region = var.region
}
# ECR Repository
resource "aws_ecr_repository" "ml_inference" {
name = "ml-inference-${var.environment}"
image_tag_mutability = "IMMUTABLE"
image_scanning_configuration {
scan_on_push = true
}
encryption_configuration {
encryption_type = "AES256"
}
}
# SQS Queue for async processing
resource "aws_sqs_queue" "inference_queue" {
name = "ml-inference-queue-${var.environment}"
  visibility_timeout_seconds = 1800 # AWS recommends >= 6x the function timeout for SQS-triggered Lambdas
message_retention_seconds = 86400
receive_wait_time_seconds = 20 # Long polling
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.dlq.arn
maxReceiveCount = 3
})
}
resource "aws_sqs_queue" "dlq" {
name = "ml-inference-dlq-${var.environment}"
message_retention_seconds = 1209600 # 14 days
}
# DynamoDB for results
resource "aws_dynamodb_table" "inference_results" {
name = "ml-inference-results-${var.environment}"
billing_mode = "PAY_PER_REQUEST"
hash_key = "request_id"
attribute {
name = "request_id"
type = "S"
}
ttl {
attribute_name = "ttl"
enabled = true
}
}
# Lambda IAM Role
resource "aws_iam_role" "lambda_role" {
name = "ml-lambda-role-${var.environment}"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = { Service = "lambda.amazonaws.com" }
}]
})
}
resource "aws_iam_role_policy" "lambda_policy" {
name = "ml-lambda-policy"
role = aws_iam_role.lambda_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Resource = "arn:aws:logs:*:*:*"
},
{
Effect = "Allow"
Action = [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:SendMessage"
]
Resource = [
aws_sqs_queue.inference_queue.arn,
aws_sqs_queue.dlq.arn
]
},
{
Effect = "Allow"
Action = [
"dynamodb:PutItem",
"dynamodb:GetItem",
"dynamodb:UpdateItem"
]
Resource = aws_dynamodb_table.inference_results.arn
},
{
Effect = "Allow"
Action = ["s3:GetObject"]
Resource = "arn:aws:s3:::${var.model_bucket}/*"
}
]
})
}
# Lambda Function
resource "aws_lambda_function" "inference_worker" {
function_name = "ml-inference-worker-${var.environment}"
role = aws_iam_role.lambda_role.arn
package_type = "Image"
  image_uri = "${aws_ecr_repository.ml_inference.repository_url}:latest" # pin a versioned tag in CI; the repo's IMMUTABLE setting blocks re-pushing :latest
timeout = 300 # 5 minutes
  memory_size = 3008 # ~2 vCPUs; Lambda supports up to 10,240 MB
environment {
variables = {
MODEL_BUCKET = var.model_bucket
RESULTS_TABLE = aws_dynamodb_table.inference_results.name
ENVIRONMENT = var.environment
}
}
# VPC config if needed
dynamic "vpc_config" {
for_each = var.vpc_enabled ? [1] : []
content {
subnet_ids = var.subnet_ids
security_group_ids = var.security_group_ids
}
}
}
# Connect SQS to Lambda
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
event_source_arn = aws_sqs_queue.inference_queue.arn
function_name = aws_lambda_function.inference_worker.arn
batch_size = 1
maximum_batching_window_in_seconds = 0
scaling_config {
maximum_concurrency = 10
}
}
# API Gateway for submitting requests
resource "aws_apigatewayv2_api" "inference_api" {
name = "ml-inference-api-${var.environment}"
protocol_type = "HTTP"
cors_configuration {
allow_origins = ["*"]
allow_methods = ["POST", "GET"]
allow_headers = ["Content-Type"]
}
}
resource "aws_apigatewayv2_stage" "default" {
api_id = aws_apigatewayv2_api.inference_api.id
name = "$default"
auto_deploy = true
access_log_settings {
destination_arn = aws_cloudwatch_log_group.api_logs.arn
format = jsonencode({
requestId = "$context.requestId"
ip = "$context.identity.sourceIp"
requestTime = "$context.requestTime"
httpMethod = "$context.httpMethod"
routeKey = "$context.routeKey"
status = "$context.status"
responseLength = "$context.responseLength"
})
}
}
resource "aws_cloudwatch_log_group" "api_logs" {
name = "/aws/apigateway/ml-inference-${var.environment}"
retention_in_days = 14
}
# Enqueue Lambda
resource "aws_lambda_function" "enqueue" {
function_name = "ml-inference-enqueue-${var.environment}"
role = aws_iam_role.lambda_role.arn
runtime = "python3.11"
handler = "enqueue.handler"
filename = "lambda/enqueue.zip"
source_code_hash = filebase64sha256("lambda/enqueue.zip")
timeout = 10
memory_size = 256
environment {
variables = {
QUEUE_URL = aws_sqs_queue.inference_queue.url
RESULTS_TABLE = aws_dynamodb_table.inference_results.name
}
}
}
# API Gateway routes
resource "aws_apigatewayv2_integration" "enqueue" {
api_id = aws_apigatewayv2_api.inference_api.id
integration_type = "AWS_PROXY"
integration_uri = aws_lambda_function.enqueue.invoke_arn
payload_format_version = "2.0"
}
resource "aws_apigatewayv2_route" "submit" {
api_id = aws_apigatewayv2_api.inference_api.id
route_key = "POST /predict"
target = "integrations/${aws_apigatewayv2_integration.enqueue.id}"
}
resource "aws_apigatewayv2_route" "status" {
api_id = aws_apigatewayv2_api.inference_api.id
route_key = "GET /status/{request_id}"
target = "integrations/${aws_apigatewayv2_integration.enqueue.id}"
}
resource "aws_lambda_permission" "api_gateway" {
statement_id = "AllowAPIGateway"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.enqueue.function_name
principal = "apigateway.amazonaws.com"
source_arn = "${aws_apigatewayv2_api.inference_api.execution_arn}/*/*"
}
# Outputs
output "api_endpoint" {
value = aws_apigatewayv2_stage.default.invoke_url
}
output "ecr_repository" {
value = aws_ecr_repository.ml_inference.repository_url
}
Enqueue Handler
# lambda/enqueue.py
import json
import boto3
import uuid
import os
import time
sqs = boto3.client("sqs")
dynamodb = boto3.resource("dynamodb")
QUEUE_URL = os.environ["QUEUE_URL"]
RESULTS_TABLE = os.environ["RESULTS_TABLE"]
def handler(event, context):
"""Handle API Gateway requests."""
method = event.get("requestContext", {}).get("http", {}).get("method")
path = event.get("rawPath", "")
if method == "POST" and "/predict" in path:
return submit_request(event)
elif method == "GET" and "/status/" in path:
request_id = event.get("pathParameters", {}).get("request_id")
return get_status(request_id)
return {"statusCode": 404, "body": "Not found"}
def submit_request(event):
"""Submit prediction request to queue."""
try:
body = json.loads(event.get("body", "{}"))
except json.JSONDecodeError:
return {"statusCode": 400, "body": "Invalid JSON"}
request_id = str(uuid.uuid4())
# Store pending status
table = dynamodb.Table(RESULTS_TABLE)
table.put_item(Item={
"request_id": request_id,
"status": "pending",
"submitted_at": int(time.time()),
"ttl": int(time.time()) + 86400 # 24 hour TTL
})
# Send to queue
sqs.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps({
"request_id": request_id,
"payload": body
})
)
return {
"statusCode": 202,
"body": json.dumps({
"request_id": request_id,
"status": "pending",
"poll_url": f"/status/{request_id}"
})
}
def get_status(request_id):
"""Get prediction status/result."""
if not request_id:
return {"statusCode": 400, "body": "Missing request_id"}
table = dynamodb.Table(RESULTS_TABLE)
response = table.get_item(Key={"request_id": request_id})
if "Item" not in response:
return {"statusCode": 404, "body": "Request not found"}
item = response["Item"]
return {
"statusCode": 200,
"body": json.dumps({
"request_id": request_id,
"status": item.get("status"),
"result": item.get("result"),
"error": item.get("error")
})
}
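Worker Handler
The stack wires SQS to the inference worker, but only the enqueue side is shown above. Here is a sketch of the worker (hypothetical lambda/worker.py); run_inference stands in for the model code from 43.2.2, and the "completed"/"failed" status values are this sketch's convention:
# lambda/worker.py - consume SQS messages, run inference, write results to DynamoDB
import json
import os
import time

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["RESULTS_TABLE"])

def run_inference(payload: dict) -> dict:
    """Placeholder: lazy-load the model (see 43.2.5) and return predictions."""
    raise NotImplementedError

def handler(event, context):
    # batch_size = 1 in the event source mapping, but iterate defensively
    for record in event["Records"]:
        message = json.loads(record["body"])
        request_id = message["request_id"]
        try:
            result = run_inference(message["payload"])
            table.update_item(
                Key={"request_id": request_id},
                UpdateExpression="SET #s = :s, #r = :r, completed_at = :t",
                ExpressionAttributeNames={"#s": "status", "#r": "result"},
                ExpressionAttributeValues={
                    ":s": "completed",
                    ":r": json.dumps(result),
                    ":t": int(time.time()),
                },
            )
        except Exception as exc:
            table.update_item(
                Key={"request_id": request_id},
                UpdateExpression="SET #s = :s, #e = :e",
                ExpressionAttributeNames={"#s": "status", "#e": "error"},
                ExpressionAttributeValues={":s": "failed", ":e": str(exc)},
            )
            raise  # let SQS retry and eventually route to the DLQ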
43.2.5. Cold Start Optimization
Cold starts kill UX. Here’s how to minimize them.
Cold Start Sources
| Source | Typical Delay | Mitigation |
|---|---|---|
| Container init | 500-2000ms | Smaller image |
| Python import | 500-5000ms | Lazy imports |
| Model load | 2000-30000ms | Provisioned concurrency |
| VPC ENI attach | 5000-10000ms | Avoid VPC if possible |
Provisioned Concurrency
# provisioned_concurrency.tf
# Note: set publish = true on the function, since provisioned concurrency
# must target a published version or alias, never $LATEST.
resource "aws_lambda_alias" "live" {
  name             = "live"
  function_name    = aws_lambda_function.inference_worker.function_name
  function_version = aws_lambda_function.inference_worker.version
}
resource "aws_lambda_provisioned_concurrency_config" "warm" {
function_name = aws_lambda_function.inference_worker.function_name
qualifier = aws_lambda_alias.live.name
provisioned_concurrent_executions = 5
}
# Cost: roughly $11 per GB of memory, per warm instance, per month (us-east-1), plus normal invocation charges
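The figure scales with memory; a back-of-the-envelope estimator, assuming the us-east-1 provisioned-concurrency rate of about $0.0000041667 per GB-second (verify against current pricing):
def provisioned_concurrency_cost(memory_mb: int, instances: int, hours_per_month: float = 730) -> float:
    """Approximate monthly cost of keeping `instances` execution environments warm."""
    gb = memory_mb / 1024
    rate_per_gb_second = 0.0000041667  # us-east-1 at time of writing
    return round(gb * instances * hours_per_month * 3600 * rate_per_gb_second, 2)

provisioned_concurrency_cost(memory_mb=3008, instances=5)  # ~ $160/month
provisioned_concurrency_cost(memory_mb=1024, instances=1)  # ~ $11/month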
The Poor Man’s Warmer
# warmer.py
import json
import boto3
from typing import List
lambda_client = boto3.client("lambda")
def warm_functions(function_names: List[str], concurrency: int = 5):
"""Send warmup pings to multiple Lambda instances."""
for func_name in function_names:
for i in range(concurrency):
lambda_client.invoke(
FunctionName=func_name,
InvocationType="Event", # Async
Payload=json.dumps({
"source": "aws.events",
"detail-type": "Warmup",
"instance": i
})
)
return {"warmed": len(function_names) * concurrency}
# CloudWatch Events Rule (Terraform)
"""
resource "aws_cloudwatch_event_rule" "warmer" {
name = "lambda-warmer"
schedule_expression = "rate(4 minutes)"
}
resource "aws_cloudwatch_event_target" "warmer" {
rule = aws_cloudwatch_event_rule.warmer.name
arn = aws_lambda_function.warmer.arn
input = jsonencode({
functions = ["ml-inference-worker-prod"]
concurrency = 3
})
}
"""
Lazy Loading Pattern
# lazy_loading.py
import os
# Don't import heavy libraries at module level
# BAD: import torch, transformers, scipy, numpy
class LazyLoader:
"""Lazy load heavy dependencies."""
_torch = None
_model = None
_tokenizer = None
@classmethod
def get_torch(cls):
if cls._torch is None:
import torch
cls._torch = torch
return cls._torch
    @classmethod
def get_model(cls):
if cls._model is None:
torch = cls.get_torch()
# Import here, not at module level
from transformers import AutoModel
model_path = os.environ.get("MODEL_PATH", "model.pt")
if model_path.endswith(".pt"):
cls._model = torch.jit.load(model_path)
else:
cls._model = AutoModel.from_pretrained(model_path)
cls._model.eval()
return cls._model
def handler(event, context):
# Warmup ping - just load model
if event.get("source") == "aws.events":
LazyLoader.get_model()
return {"statusCode": 200, "body": "warm"}
# Real request - model already loaded
model = LazyLoader.get_model()
# ... inference logic
43.2.6. Event-Driven Architecture
Replace service-to-service calls with event flows.
graph TB
A[S3: Video Upload] --> B[EventBridge]
B --> C[Lambda: Transcode]
B --> D[Lambda: Thumbnail]
B --> E[Lambda: Whisper Transcribe]
B --> F[Lambda: Object Detection]
C --> G[S3: Processed]
D --> G
E --> H[DynamoDB: Metadata]
F --> H
G --> I[CloudFront CDN]
H --> J[API: Video Details]
Fan-Out Implementation
# eventbridge.tf
resource "aws_s3_bucket_notification" "video_upload" {
bucket = aws_s3_bucket.uploads.id
eventbridge = true
}
resource "aws_cloudwatch_event_rule" "video_uploaded" {
name = "video-uploaded-${var.environment}"
event_pattern = jsonencode({
source = ["aws.s3"]
detail-type = ["Object Created"]
detail = {
bucket = { name = [aws_s3_bucket.uploads.id] }
object = { key = [{ prefix = "videos/" }] }
}
})
}
# Transcode Lambda
resource "aws_cloudwatch_event_target" "transcode" {
rule = aws_cloudwatch_event_rule.video_uploaded.name
arn = aws_lambda_function.transcode.arn
}
# Thumbnail Lambda
resource "aws_cloudwatch_event_target" "thumbnail" {
rule = aws_cloudwatch_event_rule.video_uploaded.name
arn = aws_lambda_function.thumbnail.arn
}
# Transcription Lambda
resource "aws_cloudwatch_event_target" "transcribe" {
rule = aws_cloudwatch_event_rule.video_uploaded.name
arn = aws_lambda_function.transcribe.arn
}
# Object Detection Lambda
resource "aws_cloudwatch_event_target" "detect_objects" {
rule = aws_cloudwatch_event_rule.video_uploaded.name
arn = aws_lambda_function.detect_objects.arn
}
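Each target Lambda also needs a resource-based permission so EventBridge can invoke it; one example is shown below (repeat per function, or use for_each):
# Allow EventBridge to invoke the transcode Lambda
resource "aws_lambda_permission" "allow_eventbridge_transcode" {
  statement_id  = "AllowEventBridgeInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.transcode.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.video_uploaded.arn
}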
43.2.7. Troubleshooting
Common Issues
| Problem | Symptom | Cause | Solution |
|---|---|---|---|
| Timeout | 15min limit hit | Long inference | Use Fargate or Step Functions |
| OOM | signal: killed | Model > memory | Increase to 10GB or quantize |
| Cold Start | 10s+ latency | Heavy imports | Provisioned concurrency |
| ENI Exhaustion | Stuck in Pending | VPC Lambda limit | Run outside VPC |
| Payload limit | 413 error | >6MB sync payload | Use S3 presigned URLs |
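S3 Presigned URLs for Large Payloads
For the 6MB synchronous payload limit, the usual workaround is to have the client upload inputs to S3 through a presigned URL and pass only the object key through API Gateway. A minimal sketch; the bucket name is a placeholder:
# presign.py - sidestep the 6MB payload limit with S3 presigned URLs
import boto3

s3 = boto3.client("s3")
BUCKET = "ml-inference-uploads"  # placeholder bucket name

def presigned_upload_url(key: str, expires_s: int = 900) -> str:
    """Client PUTs the large input here, then calls /predict with just the key."""
    return s3.generate_presigned_url(
        "put_object",
        Params={"Bucket": BUCKET, "Key": key},
        ExpiresIn=expires_s,
    )

def presigned_download_url(key: str, expires_s: int = 900) -> str:
    """Return large results the same way instead of inlining them in the response."""
    return s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": BUCKET, "Key": key},
        ExpiresIn=expires_s,
    )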
Debug Pattern
import logging
import json
import traceback
import time
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def handler(event, context):
request_id = context.aws_request_id
start = time.perf_counter()
logger.info(json.dumps({
"event": "request_start",
"request_id": request_id,
"memory_limit_mb": context.memory_limit_in_mb,
"remaining_time_ms": context.get_remaining_time_in_millis()
}))
    try:
        result = process(event)  # process() stands in for your model inference logic
logger.info(json.dumps({
"event": "request_complete",
"request_id": request_id,
"duration_ms": (time.perf_counter() - start) * 1000,
"remaining_time_ms": context.get_remaining_time_in_millis()
}))
return {"statusCode": 200, "body": json.dumps(result)}
except Exception as e:
logger.error(json.dumps({
"event": "request_error",
"request_id": request_id,
"error": str(e),
"traceback": traceback.format_exc()
}))
return {"statusCode": 500, "body": json.dumps({"error": str(e)})}
43.2.8. Summary Checklist
| Step | Action | Priority |
|---|---|---|
| 1 | Use the Lambdalith pattern (single function) | Critical |
| 2 | CPU-only PyTorch for Lambda | Critical |
| 3 | Async pattern for >30s workloads | High |
| 4 | Provisioned concurrency for production | High |
| 5 | Lazy load models on first request | High |
| 6 | Modal/Replicate for GPU inference | Medium |
| 7 | S3 presigned URLs for large payloads | Medium |
| 8 | Event-driven for pipelines | Medium |
| 9 | Structured logging for debugging | Medium |
| 10 | Avoid VPC unless necessary | Low |
Platform Selection Guide
| Requirement | AWS | GCP | Modal | Replicate |
|---|---|---|---|---|
| CPU inference | Lambda | Cloud Run | ✓ | ✗ |
| GPU inference | SageMaker | Cloud Run GPU | ✓ | ✓ |
| Scale-to-zero | ✓ | ✓ | ✓ | ✓ |
| Cold start | 1-10s | 1-5s | 1-5s | 5-30s |
| Max memory | 10GB | 32GB | 256GB | Varies |
| Max timeout | 15 min | 60 min | Up to 24 h | Long (model-dependent) |
[End of Section 43.2]