45.4. High-Performance Inference Serving
Important
The Goal: Serve 100,000 req/sec with < 5 ms latency. Python (Uvicorn) tops out around ~5,000 req/sec due to GIL contention. Go is faster but suffers from GC pauses (latency spikes). Rust is the only choice for predictable, low-latency AI serving.
45.4.1. The Architecture of Speed
A modern AI Inference Server is not just “Flask with a model”. It is a distributed system component that must handle:
- Backpressure: Reject requests if the GPU queue is full (“Shed Load”).
- Concurrency: Handle 10k connections waiting for IO.
- Batching: Group 32 requests into 1 GPU call (Dynamic Batching).
- Observability: Trace ID propagation.
The Stack
- Web Framework: axum (ergonomic, built on Tokio).
- Runtime: tokio (work-stealing async runtime).
- gRPC: tonic (high-performance RPC).
- Observability: tower-http + tracing.
45.4.2. Production Server Boilerplate
Do not ship the bare axum::serve quick-start as-is. Wire in structured logging and graceful shutdown from the start, as in the boilerplate below.
use axum::{Router, routing::post};
use tokio::signal;
use tower_http::trace::TraceLayer;
use std::net::SocketAddr;
#[tokio::main]
async fn main() {
// 1. Initialize Tracing (JSON logs)
tracing_subscriber::fmt()
.with_target(false)
.json()
.init();
// 2. Build Router
let app = Router::new()
.route("/predict", post(predict_handler))
.layer(TraceLayer::new_for_http()); // Access Logs
// 3. Bind Address
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
tracing::info!("listening on {}", addr); // keep logs in the structured (JSON) format configured above
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
// 4. Graceful Shutdown
// This allows in-flight requests to finish before killing the pod.
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.unwrap();
}
async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
println!("Signal received, starting graceful shutdown");
}
45.4.3. Middleware: The tower Ecosystem
Rust’s middleware ecosystem is distinct from Python’s: middleware are Tower “Services” that wrap other Services.
Adding Rate Limiting and Compression
#![allow(unused)]
fn main() {
use tower_http::{
compression::CompressionLayer,
limit::RequestBodyLimitLayer,
timeout::TimeoutLayer,
};
use std::time::Duration;
let app = Router::new()
.route("/", post(handler))
// 1. Defend against DoS (Max 10MB Load)
.layer(RequestBodyLimitLayer::new(1024 * 1024 * 10))
// 2. Defend against Slowloris (5 sec timeout)
.layer(TimeoutLayer::new(Duration::from_secs(5)))
// 3. Save Bandwidth (Gzip/Brotli)
.layer(CompressionLayer::new());
}
45.4.4. The Model Actor Pattern
The most critical mistake is putting the Model inside the Request Handler directly.
If model.forward() takes 100ms and holds the GIL (in PyO3) or blocks the thread (in Burn), you starve the web server.
Solution: The Actor Pattern.
- Web Handler: Receives Request -> Sends to Channel -> Awaits Response.
- Model Actor: Looping on Channel -> Batches Inputs -> Runs Inference -> Sends Response.
The Actor Implementation
#![allow(unused)]
fn main() {
use tokio::sync::{mpsc, oneshot};
use burn::tensor::Tensor;
struct InferenceActor {
receiver: mpsc::Receiver<ActorMessage>,
model: MyBurnModel,
}
struct ActorMessage {
input: Tensor<Backend, 2>,
responder: oneshot::Sender<Tensor<Backend, 2>>,
}
impl InferenceActor {
async fn run(mut self) {
while let Some(msg) = self.receiver.recv().await {
// In a real actor, we would accumulate 'msg' into a Vec
// and perform Dynamic Batching here.
let output = self.model.forward(msg.input);
let _ = msg.responder.send(output);
}
}
}
}
The Web Handler (Lightweight)
#![allow(unused)]
fn main() {
#[derive(Clone)]
struct AppState {
sender: mpsc::Sender<ActorMessage>, // Cheap to clone
}
async fn predict(
State(state): State<AppState>,
Json(payload): Json<Payload>
) -> Result<Json<Response>, StatusCode> {
// 1. Create a oneshot channel for the reply
let (tx, rx) = oneshot::channel();
// 2. Send to the Actor.
// `.send().await` would wait when the channel is full (backpressure);
// `try_send()` fails immediately instead, so we shed load with a 503.
let msg = ActorMessage {
input: payload.to_tensor(),
responder: tx,
};
if state.sender.try_send(msg).is_err() {
return Err(StatusCode::SERVICE_UNAVAILABLE);
}
// 3. Wait for the result (if the Actor dropped the sender, return 500)
let result = rx.await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(result.into()))
}
}
45.4.5. gRPC with Tonic
REST is great for public APIs. For internal microservices (Embeddings -> Reranker -> LLM), use gRPC.
tonic is a pure Rust gRPC implementation.
The inference.proto
syntax = "proto3";
package inference;
service ModelService {
rpc Predict (PredictRequest) returns (PredictResponse);
}
message PredictRequest {
repeated float data = 1;
repeated int64 shape = 2;
}
message PredictResponse {
repeated float logits = 1;
}
The Code Generation (build.rs)
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/inference.proto")?;
Ok(())
}
The Implementation
use tonic::{transport::Server, Request, Response, Status};
// Generated by tonic-build from proto/inference.proto
pub mod inference {
tonic::include_proto!("inference");
}
use inference::model_service_server::{ModelService, ModelServiceServer};
use inference::{PredictRequest, PredictResponse};
#[derive(Default)]
pub struct MyService;
#[tonic::async_trait]
impl ModelService for MyService {
async fn predict(&self, request: Request<PredictRequest>) -> Result<Response<PredictResponse>, Status> {
let req = request.into_inner();
// ... inference logic ...
Ok(Response::new(PredictResponse { logits: vec![0.1, 0.9] }))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let service = MyService::default();
Server::builder()
.add_service(ModelServiceServer::new(service))
.serve(addr)
.await?;
Ok(())
}
45.4.6. Dynamic Batching Implementation
This is where Rust shines.
In Python, implementing a “wait 5ms or until batch size 32” loop is hard because the GIL interferes with the timer.
In Rust, tokio::select! makes it trivial.
#![allow(unused)]
fn main() {
async fn run_batcher(mut rx: mpsc::Receiver<Request>) {
let mut batch = Vec::new();
let max_batch_size = 32;
let timeout = Duration::from_millis(5);
loop {
tokio::select! {
// Case 1: New Request Arrived
Some(req) = rx.recv() => {
batch.push(req);
if batch.len() >= max_batch_size {
process_batch(&mut batch).await;
}
}
// Case 2: Timeout expired with a partial batch.
// (Simplified: this timer restarts on every loop iteration; the production
// version in 45.4.7 uses a fixed per-batch deadline instead.)
_ = tokio::time::sleep(timeout), if !batch.is_empty() => {
process_batch(&mut batch).await;
}
}
}
}
}
This loop consumes essentially zero CPU while waiting: the task is parked and woken precisely by Tokio’s timer rather than by polling.
45.4.7. Reference Architecture: The Dynamic Batcher Actor
This is a production-grade implementation of Dynamic Batching.
It uses tokio::select! to handle timeouts (latency budget) and Vec::with_capacity to prevent allocation churn.
src/batcher.rs
#![allow(unused)]
fn main() {
use tokio::sync::{mpsc, oneshot};
use tokio::time::{timeout, Duration};
use ndarray::{Array2, Axis};
use std::sync::Arc;
// Configuration
const MAX_BATCH_SIZE: usize = 32;
const MAX_LATENCY_MS: u64 = 5;
// The Request Object
pub struct BatchRequest {
pub input: Vec<f32>,
pub tx: oneshot::Sender<Vec<f32>>, // Send result back
}
// The Batcher Struct (Handle)
#[derive(Clone)]
pub struct Batcher {
tx: mpsc::Sender<BatchRequest>,
}
impl Batcher {
// Spawns the Actor Loop
pub fn new(model: Arc<Model>) -> Self {
let (tx, rx) = mpsc::channel(1024); // Backpressure buffer
tokio::spawn(async move {
run_actor_loop(rx, model).await;
});
Self { tx }
}
// Public API
pub async fn predict(&self, input: Vec<f32>) -> Result<Vec<f32>, String> {
let (resp_tx, resp_rx) = oneshot::channel();
let req = BatchRequest {
input,
tx: resp_tx,
};
// Send to Actor
self.tx.send(req).await.map_err(|_| "Actor died".to_string())?;
// Wait for Actor response
resp_rx.await.map_err(|_| "Response dropped".to_string())
}
}
// The Actor Loop (the request buffer is reused across batches)
async fn run_actor_loop(mut rx: mpsc::Receiver<BatchRequest>, model: Arc<Model>) {
// Pre-allocate buffer to avoid reallocating every loop
let mut buffer: Vec<BatchRequest> = Vec::with_capacity(MAX_BATCH_SIZE);
loop {
// 1. Fetch first item (wait indefinitely)
let first = match rx.recv().await {
Some(req) => req,
None => break, // Channel closed
};
buffer.push(first);
// 2. Deadline for the batch
let deadline = tokio::time::Instant::now() + Duration::from_millis(MAX_LATENCY_MS);
// 3. Fill the rest of the batch (up to MAX_BATCH_SIZE) or Timeout
while buffer.len() < MAX_BATCH_SIZE {
let time_left = deadline.saturating_duration_since(tokio::time::Instant::now());
if time_left.is_zero() {
break;
}
// Wait for next item OR timeout
match timeout(time_left, rx.recv()).await {
Ok(Some(req)) => buffer.push(req),
Ok(None) => return, // Channel closed
Err(_) => break, // Timeout reached! Commit batch.
}
}
// 4. BATCH IS READY. EXECUTE.
// Convert [Vec<f32>] -> Array2<f32> (Batch Tensor)
// This copy is necessary unless we use 'Bytes' (Scatter/Gather)
let batch_size = buffer.len();
let flat_input: Vec<f32> = buffer.iter().flat_map(|r| r.input.clone()).collect();
// Assuming 512 dims
let tensor = Array2::from_shape_vec((batch_size, 512), flat_input).unwrap();
// Run Inference (Global Interpreter Lock Free!)
let results = model.forward(tensor);
// 5. Distribute Results
// 'results' is (Batch, 10)
for (i, req) in buffer.drain(..).enumerate() {
let row = results.index_axis(Axis(0), i).to_vec();
let _ = req.tx.send(row);
}
// Buffer is empty (drain), capacity is preserved.
// Loop continues.
}
}
}
45.4.8. Why this beats Python
In Python, implementing this loop with asyncio is possible, but:
- Event Loop Overhead: Python’s loop wakes up, acquires the GIL, checks recv, releases the GIL.
- Latency Jitter: if the GC runs during rx.recv(), your 5 ms deadline becomes 50 ms.
- Throughput: Rust handles channel messages in nanoseconds; Python handles them in microseconds.
The Rust implementation bounds the extra batching delay at 5 ms plus T_inference, with no GC spikes.
45.4.9. Scaling to Multi-GPU
Dynamic Batching is trivial to shard: either instantiate one Batcher per GPU, or have a single batcher round-robin completed batches across per-GPU worker channels, as sketched below.
Because each worker’s mpsc::channel is bounded, a saturated GPU pushes back on the sender, so the channels double as the load balancer.
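A minimal sketch of this fan-out, assuming one worker task per GPU; the Batch type, queue size, and run_batch_on_gpu function are illustrative placeholders:
use tokio::sync::mpsc;

// Placeholder batch type: the inputs plus the oneshot responders collected by the batcher.
struct Batch;

async fn run_batch_on_gpu(gpu_id: usize, _batch: Batch) {
    // Placeholder: run the forward pass on device `gpu_id`.
    let _ = gpu_id;
}

// Spawn one worker task per GPU, each with its own bounded queue.
fn spawn_gpu_workers(num_gpus: usize) -> Vec<mpsc::Sender<Batch>> {
    (0..num_gpus)
        .map(|gpu_id| {
            let (tx, mut rx) = mpsc::channel::<Batch>(8); // small buffer => backpressure per GPU
            tokio::spawn(async move {
                while let Some(batch) = rx.recv().await {
                    run_batch_on_gpu(gpu_id, batch).await;
                }
            });
            tx
        })
        .collect()
}

// In the batcher: round-robin completed batches across the workers.
async fn dispatch(batch: Batch, workers: &[mpsc::Sender<Batch>], next: &mut usize) {
    let idx = *next % workers.len();
    *next += 1;
    // `.send().await` blocks when that GPU's queue is full (backpressure propagates upstream).
    let _ = workers[idx].send(batch).await;
}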
45.4.10. Handling Cancellation
If the user disconnects (the HTTP client closes the socket), resp_rx in predict is dropped.
When the Actor later calls req.tx.send(row), the send simply returns Err; Rust handles this gracefully.
As an optional optimization, you can avoid running inference for these “zombie requests” at all: before the batch executes, drop any request whose oneshot sender reports a closed receiver, as sketched below.
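A minimal sketch of that pruning step, reusing the BatchRequest shape from the batcher above:
use tokio::sync::oneshot;

pub struct BatchRequest {
    pub input: Vec<f32>,
    pub tx: oneshot::Sender<Vec<f32>>,
}

/// Drop requests whose client has already disconnected (the oneshot Receiver
/// was dropped), so the GPU never computes answers nobody will read.
fn prune_disconnected(buffer: &mut Vec<BatchRequest>) {
    buffer.retain(|req| !req.tx.is_closed());
}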
45.4.11. Observability: Metrics that Matter
A dashboard with “CPU Usage” is useless. You need “Queue Depth” and “Token Latency”.
We use metrics and metrics-exporter-prometheus.
Instrumentation
#![allow(unused)]
fn main() {
use metrics::{histogram, counter, gauge};
async fn run_actor_loop(...) {
loop {
let queue_len = rx.len();
gauge!("inference_queue_depth").set(queue_len as f64);
let start = Instant::now();
// ... inference ...
let latency = start.elapsed();
histogram!("inference_latency_seconds").record(latency.as_secs_f64());
counter!("inference_requests_total").increment(batch_size as u64);
}
}
}
Exposing /metrics Endpoint
use axum::{routing::get, Router};
use metrics_exporter_prometheus::PrometheusBuilder;
#[tokio::main]
async fn main() {
let builder = PrometheusBuilder::new();
let handle = builder.install_recorder().expect("failed to install recorder");
let app = Router::new()
.route("/metrics", get(move || std::future::ready(handle.render())));
}
Point Prometheus at localhost:3000/metrics, then build your Grafana dashboards on top of it.
45.4.12. Performance Tuning: The OS Layer
You can write the fastest Rust code, but if the Linux Kernel blocks you, you lose.
1. TCP Keepalives & Backlog
By default, the backlog (pending connections) is small (128). For 10k RPS, you need to bump it.
#![allow(unused)]
fn main() {
let listener = TcpListener::bind(addr).await.unwrap();
// TcpListener::bind uses tokio's default backlog. To set it explicitly,
// build the socket via tokio::net::TcpSocket and call .listen(backlog);
// the kernel caps the value via sysctl (below).
}
Sysctl Config:
# /etc/sysctl.conf
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535
net.ipv4.ip_local_port_range = 1024 65535
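On the application side, a sketch of binding with an explicit backlog using tokio::net::TcpSocket (the kernel still caps the value at net.core.somaxconn):
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpSocket};

fn bind_with_backlog(addr: SocketAddr, backlog: u32) -> std::io::Result<TcpListener> {
    let socket = TcpSocket::new_v4()?;
    socket.set_reuseaddr(true)?; // fast restarts without lingering TIME_WAIT pain
    socket.bind(addr)?;
    // The kernel truncates `backlog` to net.core.somaxconn.
    socket.listen(backlog)
}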
2. File Descriptors
Every socket is a file. The default limit is 1024.
If you have 5000 concurrent users, the server crashes with Too many open files.
Fix: raise the limit to something like 100000, e.g. LimitNOFILE= in your systemd unit, --ulimit nofile= on docker run, or ulimit -n in the container entrypoint.
45.4.13. Load Shedding: Survival of the Fittest
When the GPU is saturated, accepting more requests just increases latency for everyone.
It is better to return 503 Service Unavailable instantly.
#![allow(unused)]
fn main() {
use tower::ServiceBuilder;
use tower::load_shed::LoadShedLayer;
let service = ServiceBuilder::new()
.layer(LoadShedLayer::new()) // Reject if inner service is not ready
.service(inner_service);
}
Implementing Backpressure in Actor:
#![allow(unused)]
fn main() {
// Web Handler
if state.sender.capacity() == 0 {
// Queue is full. Shed load.
// (try_send, as in 45.4.4, performs this check-and-send atomically.)
return StatusCode::SERVICE_UNAVAILABLE;
}
state.sender.send(msg).await;
}
45.4.14. Streaming Responses (LLM Style)
For LLMs, waiting 5 seconds for the full text is bad UX. We need Server-Sent Events (SSE).
#![allow(unused)]
fn main() {
use axum::response::sse::{Event, Sse};
use futures::stream::{Stream, StreamExt};
use std::convert::Infallible;
use tokio::sync::mpsc;
async fn stream_handler(
State(state): State<AppState>,
Json(payload): Json<Payload>
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
// Create a channel for tokens
let (tx, rx) = mpsc::channel(100);
// Send request to Actor (Actor must support streaming)
state.sender.send(StreamingRequest { input: payload, tx }).await.unwrap();
// Convert Receiver to Stream
let stream = tokio_stream::wrappers::ReceiverStream::new(rx)
.map(|token| {
Ok(Event::default().data(token))
});
Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default())
}
}
This pipes the tokio::sync::mpsc channel directly into the HTTP response body with no intermediate buffering, something the Rust/Tokio stack makes unusually straightforward.
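For completeness, a sketch of the actor side of streaming, assuming a hypothetical StreamingRequest type and a placeholder generate_next_token decoding step:
use tokio::sync::mpsc;

// Hypothetical streaming request: the prompt plus a channel for emitted tokens.
pub struct StreamingRequest {
    pub input: String,
    pub tx: mpsc::Sender<String>,
}

async fn generate_next_token(_prompt: &str, _step: usize) -> String {
    // Placeholder for one decoding step of the model.
    "<eos>".to_string()
}

async fn handle_streaming_request(req: StreamingRequest) {
    for step in 0.. {
        let token = generate_next_token(&req.input, step).await;
        let is_eos = token == "<eos>";
        // If the client disconnected, the SSE stream (and its Receiver) is dropped,
        // send() returns Err, and we stop generating instead of burning GPU time.
        if req.tx.send(token).await.is_err() || is_eos {
            break;
        }
    }
}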
45.4.15. Advanced Pattern: Redis Caching Layer
Inference is expensive. Looking up a key in Redis is cheap.
We use bb8 (Connection Pool) + redis crate.
#![allow(unused)]
fn main() {
use bb8_redis::{RedisConnectionManager, bb8};
type Pool = bb8::Pool<RedisConnectionManager>;
async fn predict_cached(
State(pool): State<Pool>,
Json(payload): Json<Payload>
) -> Json<Response> {
// 1. Hash the Input
let key = format!("cache:{}", hash(&payload));
// 2. Check Cache
let mut conn = pool.get().await.unwrap();
let cached: Result<String, _> = redis::cmd("GET").arg(&key).query_async(&mut *conn).await;
if let Ok(cached) = cached {
return Json(serde_json::from_str(&cached).unwrap());
}
// 3. Miss? Run Inference
let result = run_inference(payload).await;
// 4. Write Cache (TTL 1 hour)
let json_str = serde_json::to_string(&result).unwrap();
let _: Result<(), _> = redis::cmd("SETEX").arg(&key).arg(3600).arg(&json_str).query_async(&mut *conn).await;
Json(result)
}
}
45.4.16. Request De-duplication (Singleflight)
If 100 users ask “What is the capital of France?” at the exact same millisecond:
- Naive Server: Runs inference 100 times.
- Smart Server: Runs inference 1 time, returns result to 100 users.
This is called Singleflight.
#![allow(unused)]
fn main() {
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
// Map: QueryHash -> WaitBuffer
type InFlight = Arc<Mutex<HashMap<String, Vec<oneshot::Sender<Response>>>>>;
async fn deduplicated_handler(
State(inflight): State<InFlight>,
Json(payload): Json<Payload>
) -> Json<Response> {
let key = hash(&payload);
let (tx, rx) = oneshot::channel();
let mut is_leader = false;
{
let mut map = inflight.lock().await;
if let Some(waiters) = map.get_mut(&key) {
waiters.push(tx); // I am a follower
} else {
map.insert(key.clone(), vec![tx]); // I am the leader
is_leader = true;
}
}
if is_leader {
let result = run_model(payload).await;
let mut map = inflight.lock().await;
if let Some(waiters) = map.remove(&key) {
for waiter in waiters {
let _ = waiter.send(result.clone());
}
}
}
Json(rx.await.unwrap())
}
}
45.4.17. Authentication Middleware (JWT)
Unless you are giving away free compute, you need Auth. Axum middleware makes this clean.
#![allow(unused)]
fn main() {
use axum::{extract::Request, http::StatusCode, middleware::Next, response::Response};
use jsonwebtoken::{decode, DecodingKey, Validation};
use serde::Deserialize;
#[derive(Deserialize)]
struct Claims { user_id: String, exp: usize }
async fn auth_middleware(
request: Request,
next: Next,
) -> Result<Response, StatusCode> {
let headers = request.headers();
let auth_header = headers.get("Authorization")
.and_then(|h| h.to_str().ok())
.and_then(|h| h.strip_prefix("Bearer "));
let token = match auth_header {
Some(t) => t,
None => return Err(StatusCode::UNAUTHORIZED),
};
// CPU-intensive crypto, but negligible compared to inference
let token_data = decode::<Claims>(
token,
&DecodingKey::from_secret("secret".as_ref()),
&Validation::default(),
).map_err(|_| StatusCode::UNAUTHORIZED)?;
// Inject UserID into Request Extensions
let mut request = request;
request.extensions_mut().insert(token_data.claims.user_id);
Ok(next.run(request).await)
}
}
45.4.18. Load Balancing Strategies
When running a cluster of Rust pods:
- Round Robin: Good for homogenous requests.
- Least Connections: Better for variable length generation.
- Peak EWMA (Exponential Weighted Moving Average): The gold standard.
In Rust, you handle this at the tower layer in your Gateway.
#![allow(unused)]
fn main() {
use tower::balance::p2c::Balance;
use tower::load::{CompleteOnResponse, PeakEwmaDiscover};
// Measure each discovered endpoint with Peak-EWMA latency, then let
// power-of-two-choices (p2c) pick the less loaded of two random replicas.
let load = PeakEwmaDiscover::new(discover, default_rtt, decay, CompleteOnResponse::default());
let service = Balance::new(load);
}
This is built into the ecosystem; there is no need for Nginx or Envoy if you build a Rust gateway.
45.4.19. Handling Large Payloads (Multipart)
Sending an Image (5MB) via JSON is slow (Base64 overhead + 33% bloat).
Use Multipart.
#![allow(unused)]
fn main() {
use axum::extract::Multipart;
async fn upload_image(mut multipart: Multipart) {
while let Some(field) = multipart.next_field().await.unwrap() {
let name = field.name().unwrap().to_string();
let data = field.bytes().await.unwrap();
println!("Received {} bytes for {}", data.len(), name);
// Zero-Copy conversion to Tensor
// ...
}
}
}
45.4.20. Final Exam: The 100k RPS Architecture
Scenario: You are serving a Spam Detection Model (DistilBERT). Traffic: 100k emails/sec. SLA: P99 < 50ms.
The Rust Solution:
- Ingress: Cloudflare -> Rust Gateway (Axum).
- Gateway:
  - Auth (JWT).
  - Deduplication (10% cache hit).
  - Sharding (hash email -> specific worker pod; see the sketch below).
- Worker (Pod):
  - tokio::mpsc Actor (batch size 128).
  - ONNX Runtime (Int8 quantization).
  - Metrics reporter.
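A minimal sketch of the gateway’s shard-selection step; the pod URLs and hashing scheme are illustrative:
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Deterministically map an email to one worker pod, so identical senders
/// always land on the same dynamic batcher.
fn select_worker<'a>(email: &str, workers: &'a [String]) -> &'a str {
    let mut hasher = DefaultHasher::new();
    email.hash(&mut hasher);
    let idx = (hasher.finish() as usize) % workers.len();
    &workers[idx]
}

fn main() {
    let workers = vec![
        "http://worker-0.internal:3000".to_string(), // illustrative pod URLs
        "http://worker-1.internal:3000".to_string(),
    ];
    println!("{}", select_worker("alice@example.com", &workers));
}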
Why Python Fails:
Python’s uvicorn creates a new Task for every request. At 100k RPS, the Scheduler overhead kills the CPU before the model even runs.
Rust’s tokio creates a lightweight Future (a ~200-byte state machine), so it scales roughly linearly until the NIC saturates.
45.4.21. Connection Pooling and Resource Management
Managing connections efficiently is critical for high-throughput serving.
HTTP Client Pooling
#![allow(unused)]
fn main() {
use reqwest::Client;
use std::sync::Arc;
pub struct InferenceClient {
client: Arc<Client>,
endpoints: Vec<String>,
current_idx: std::sync::atomic::AtomicUsize,
}
impl InferenceClient {
pub fn new(endpoints: Vec<String>, max_connections: usize) -> Self {
let client = Client::builder()
.pool_max_idle_per_host(max_connections)
.pool_idle_timeout(std::time::Duration::from_secs(30))
.timeout(std::time::Duration::from_secs(5))
.tcp_keepalive(std::time::Duration::from_secs(60))
.build()
.unwrap();
Self {
client: Arc::new(client),
endpoints,
current_idx: std::sync::atomic::AtomicUsize::new(0),
}
}
fn next_endpoint(&self) -> &str {
let idx = self.current_idx.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
&self.endpoints[idx % self.endpoints.len()]
}
pub async fn predict(&self, input: &[f32]) -> Result<Vec<f32>, reqwest::Error> {
let endpoint = self.next_endpoint();
let response: Vec<f32> = self.client
.post(format!("{}/predict", endpoint))
.json(&serde_json::json!({ "input": input }))
.send()
.await?
.json()
.await?;
Ok(response)
}
}
}
GPU Memory Pool
#![allow(unused)]
fn main() {
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
pub struct GpuMemoryPool {
available: tokio::sync::Mutex<VecDeque<GpuBuffer>>,
allocated: AtomicUsize, // total buffers handed out by this pool
buffer_size: usize,
max_buffers: usize,
}
impl GpuMemoryPool {
pub async fn acquire(&self) -> GpuBuffer {
// Fast path: reuse a buffer that was released earlier
if let Some(buffer) = self.available.lock().await.pop_front() {
return buffer;
}
// Allocate a new buffer if the pool is still under its limit
if self.allocated.fetch_add(1, Ordering::SeqCst) < self.max_buffers {
return GpuBuffer::allocate(self.buffer_size);
}
self.allocated.fetch_sub(1, Ordering::SeqCst); // undo the optimistic increment
// Pool exhausted: poll until another task releases a buffer
loop {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
if let Some(buffer) = self.available.lock().await.pop_front() {
return buffer;
}
}
}
pub async fn release(&self, buffer: GpuBuffer) {
let mut available = self.available.lock().await;
available.push_back(buffer);
}
}
}
45.4.22. Zero-Copy Request/Response
Avoid copying data between network and model.
Using Bytes Crate
#![allow(unused)]
fn main() {
use bytes::Bytes;
use axum::body::Body;
use axum::response::{IntoResponse, Response};
async fn predict_zero_copy(body: Body) -> impl IntoResponse {
// Stream body directly without buffering
let bytes = axum::body::to_bytes(body, 10_000_000).await.unwrap();
// bytes is Arc-backed, can be shared without copying
let result = process_input(&bytes).await;
// Return response using same mechanism
Response::builder()
.header("Content-Type", "application/octet-stream")
.body(Body::from(result))
.unwrap()
}
}
Memory-Mapped Input
#![allow(unused)]
fn main() {
use memmap2::Mmap;
pub struct MappedModel {
weights: Mmap,
}
impl MappedModel {
pub fn load(path: &str) -> Result<Self, Error> {
let file = std::fs::File::open(path)?;
// Memory-map the file
// OS handles paging, we don't load entire 7GB into RAM
let weights = unsafe { Mmap::map(&file)? };
Ok(Self { weights })
}
pub fn get_layer(&self, offset: usize, size: usize) -> &[f32] {
// Direct pointer into mapped memory: no copy, no allocation.
// Safety: the caller must ensure `offset` is 4-byte aligned and that the
// mapped region really contains `size` little-endian f32 values here.
let bytes = &self.weights[offset..offset + size * 4];
unsafe {
std::slice::from_raw_parts(
bytes.as_ptr() as *const f32,
size
)
}
}
}
}
45.4.23. Structured Concurrency
Manage complex async workflows safely.
#![allow(unused)]
fn main() {
use tokio::task::JoinSet;
async fn parallel_inference(inputs: Vec<Input>) -> Vec<Output> {
let mut set = JoinSet::new();
for input in inputs {
set.spawn(async move {
run_single_inference(input).await
});
}
let mut results = Vec::with_capacity(set.len());
while let Some(result) = set.join_next().await {
match result {
Ok(output) => results.push(output),
Err(e) => {
tracing::error!("Task panicked: {:?}", e);
// Continue with other tasks
}
}
}
results
}
}
Cancellation-Safe Operations
#![allow(unused)]
fn main() {
use tokio_util::sync::CancellationToken;
pub struct InferenceService {
cancel_token: CancellationToken,
}
impl InferenceService {
pub async fn run(&self, input: Input) -> Result<Output, Error> {
tokio::select! {
result = self.do_inference(input) => {
result
}
_ = self.cancel_token.cancelled() => {
Err(Error::Cancelled)
}
}
}
pub fn shutdown(&self) {
self.cancel_token.cancel();
}
}
}
45.4.24. Comprehensive Health Checks
Production services need detailed health information.
#![allow(unused)]
fn main() {
use serde::Serialize;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
#[derive(Serialize)]
pub struct HealthStatus {
status: String,
components: HashMap<String, ComponentHealth>,
metadata: Metadata,
}
#[derive(Serialize)]
pub struct ComponentHealth {
status: String,
latency_ms: Option<f64>,
error: Option<String>,
}
#[derive(Serialize)]
pub struct Metadata {
version: String,
uptime_seconds: u64,
requests_total: u64,
requests_failed: u64,
}
async fn health_check(State(state): State<AppState>) -> Json<HealthStatus> {
let mut components = HashMap::new();
// Check model
let model_health = check_model_health(&state.model).await;
components.insert("model".to_string(), model_health);
// Check GPU
let gpu_health = check_gpu_health().await;
components.insert("gpu".to_string(), gpu_health);
// Check dependencies
let redis_health = check_redis_health(&state.redis).await;
components.insert("redis".to_string(), redis_health);
// Aggregate status
let all_healthy = components.values().all(|c| c.status == "healthy");
Json(HealthStatus {
status: if all_healthy { "healthy" } else { "degraded" }.to_string(),
components,
metadata: Metadata {
version: env!("CARGO_PKG_VERSION").to_string(),
uptime_seconds: state.start_time.elapsed().as_secs(),
requests_total: state.metrics.requests_total.load(Ordering::Relaxed),
requests_failed: state.metrics.requests_failed.load(Ordering::Relaxed),
},
})
}
async fn check_gpu_health() -> ComponentHealth {
match get_gpu_utilization() {
Ok(util) => ComponentHealth {
status: if util < 95.0 { "healthy" } else { "degraded" }.to_string(),
latency_ms: None,
error: None,
},
Err(e) => ComponentHealth {
status: "unhealthy".to_string(),
latency_ms: None,
error: Some(e.to_string()),
},
}
}
}
45.4.25. A/B Testing in the Serving Layer
Route traffic to different model versions.
#![allow(unused)]
fn main() {
pub struct ABRouter {
models: HashMap<String, Arc<dyn Model>>,
traffic_split: HashMap<String, f32>, // model_name -> percentage
}
impl ABRouter {
pub fn route(&self, request_id: &str) -> &Arc<dyn Model> {
// Deterministic routing based on request ID
let hash = fxhash::hash64(request_id.as_bytes());
let normalized = (hash % 10000) as f32 / 100.0; // 0-100
let mut cumulative = 0.0;
// Note: HashMap iteration order is not stable across process restarts; use a
// BTreeMap (or a sorted Vec) if variant assignment must stay reproducible.
for (model_name, percentage) in &self.traffic_split {
cumulative += percentage;
if normalized < cumulative {
return self.models.get(model_name).unwrap();
}
}
// Fallback to first model
self.models.values().next().unwrap()
}
pub async fn predict(&self, request_id: &str, input: Input) -> Output {
let model = self.route(request_id);
// Record which variant was used
metrics::counter!("ab_variant", "variant" => model.name()).increment(1);
model.predict(input).await
}
}
}
45.4.26. Production Deployment Checklist
Pre-Deployment
- Load test at 2x expected peak traffic
- Verify graceful shutdown behavior
- Test circuit breaker activation
- Validate health check endpoints
- Review timeout configurations
Deployment
- Blue-green or canary deployment
- Monitor error rates during rollout
- Verify metrics are flowing
- Check log aggregation
Post-Deployment
- Establish baseline latency
- Set up alerting thresholds
- Document runbook for incidents
- Schedule chaos engineering tests
45.4.27. Final Architecture Summary
┌─────────────────────────────────────────────────────────────────────┐
│ High-Performance Inference Architecture │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Internet Traffic │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │ Load Balancer (with SSL termination) ││
│ └──────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ┌──────────────────────────▼──────────────────────────────────────┐│
│ │ API Gateway (Axum) ││
│ │ • Rate Limiting • Auth (JWT) • Request Validation ││
│ │ • Deduplication • Response Caching ││
│ └──────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ┌──────────────────────────▼──────────────────────────────────────┐│
│ │ Request Router ││
│ │ • A/B Testing • Model Selection • Load Balancing ││
│ └──────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ┌──────────────────────────▼──────────────────────────────────────┐│
│ │ Dynamic Batcher (Actor Pattern) ││
│ │ • Accumulate requests • Timeout handling • Backpressure ││
│ └──────────────────────────┬──────────────────────────────────────┘│
│ │ │
│ ┌──────────────────────────▼──────────────────────────────────────┐│
│ │ Model Execution (GPU/CPU) ││
│ │ • CUDA Kernels • Quantization • Memory-mapped weights ││
│ └─────────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────────┘
This architecture handles 100k+ RPS with sub-5ms P99 latency.
[End of Section 45.4]