45.4. High-Performance Inference Serving

Important

The Goal: Serve 100,000 req/sec with < 5ms latency. Python (Uvicorn) caps out at ~5,000 req/sec due to GIL contention. Go is faster but suffers from GC pauses (latency spikes). Rust has neither a GIL nor a garbage collector, which makes it the natural choice for predictable, low-latency AI serving.

45.4.1. The Architecture of Speed

A modern AI Inference Server is not just “Flask with a model”. It is a distributed system component that must handle:

  1. Backpressure: Reject requests if the GPU queue is full (“Shed Load”).
  2. Concurrency: Handle 10k connections waiting for IO.
  3. Batching: Group 32 requests into 1 GPU call (Dynamic Batching).
  4. Observability: Trace ID propagation.

The Stack

  • Web Framework: axum (Ergonomic, built on Tokio).
  • Runtime: tokio (Work-Stealing Async Runtime).
  • gRPC: tonic (High performance RPC).
  • Observability: tower-http + tracing.

45.4.2. Production Server Boilerplate

Do not call axum::serve on a bare Router. Wrap it in the production boilerplate below: structured logging, a tracing layer, and graceful shutdown.

use axum::{Router, routing::post};
use tokio::signal;
use tower_http::trace::TraceLayer;
use std::net::SocketAddr;

#[tokio::main]
async fn main() {
    // 1. Initialize Tracing (JSON logs)
    tracing_subscriber::fmt()
        .with_target(false)
        .json()
        .init();

    // 2. Build Router
    let app = Router::new()
        .route("/predict", post(predict_handler))
        .layer(TraceLayer::new_for_http()); // Access Logs

    // 3. Bind Address
    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
    println!("listening on {}", addr);

    let listener = tokio::net::TcpListener::bind(addr).await.unwrap();

    // 4. Graceful Shutdown
    // This allows in-flight requests to finish before killing the pod.
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await
        .unwrap();
}

async fn shutdown_signal() {
    let ctrl_c = async {
        signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }
    println!("Signal received, starting graceful shutdown");
}

45.4.3. Middleware: The tower Ecosystem

Rust’s middleware ecosystem is distinct from Python’s. Middlewares are “Services” that wrap other Services.

Adding Body Limits, Timeouts, and Compression

#![allow(unused)]
fn main() {
use tower_http::{
    compression::CompressionLayer,
    limit::RequestBodyLimitLayer,
    timeout::TimeoutLayer,
};
use std::time::Duration;

let app = Router::new()
    .route("/", post(handler))
    // 1. Defend against DoS (Max 10MB Load)
    .layer(RequestBodyLimitLayer::new(1024 * 1024 * 10))
    // 2. Bound handler time, including slow request bodies (5 sec timeout)
    .layer(TimeoutLayer::new(Duration::from_secs(5)))
    // 3. Save Bandwidth (Gzip/Brotli)
    .layer(CompressionLayer::new());
}

45.4.4. The Model Actor Pattern

The most critical mistake is putting the Model inside the Request Handler directly. If model.forward() takes 100ms and holds the GIL (in PyO3) or blocks the thread (in Burn), you starve the web server.

Solution: The Actor Pattern.

  1. Web Handler: Receives Request -> Sends to Channel -> Awaits Response.
  2. Model Actor: Looping on Channel -> Batches Inputs -> Runs Inference -> Sends Response.

The Actor Implementation

#![allow(unused)]
fn main() {
use tokio::sync::{mpsc, oneshot};
use burn::tensor::Tensor;

struct InferenceActor {
    receiver: mpsc::Receiver<ActorMessage>,
    model: MyBurnModel,
}

struct ActorMessage {
    input: Tensor<Backend, 2>,
    responder: oneshot::Sender<Tensor<Backend, 2>>,
}

impl InferenceActor {
    async fn run(mut self) {
        while let Some(msg) = self.receiver.recv().await {
            // In a real actor, we would accumulate 'msg' into a Vec
            // and perform Dynamic Batching here.
            
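            // forward() is synchronous: run this actor on its own thread or via
            // spawn_blocking so a slow model does not stall other Tokio tasks.
            let output = self.model.forward(msg.input);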
            let _ = msg.responder.send(output);
        }
    }
}
}

The Web Handler (Lightweight)

#![allow(unused)]
fn main() {
#[derive(Clone)]
struct AppState {
    sender: mpsc::Sender<ActorMessage>, // Cheap to clone
}

async fn predict(
    State(state): State<AppState>,
    Json(payload): Json<Payload>,
) -> Result<Json<Response>, StatusCode> {
    // 1. Create OneShot channel for the reply
    let (tx, rx) = oneshot::channel();
    
    // 2. Send to Actor without waiting
    // `try_send` fails immediately when the bounded channel is full,
    // so a saturated queue becomes a fast 503 instead of a slow request.
    let msg = ActorMessage {
        input: payload.to_tensor(),
        responder: tx,
    };
    
    if state.sender.try_send(msg).is_err() {
        return Err(StatusCode::SERVICE_UNAVAILABLE);
    }
    
    // 3. Wait for result; an error here means the Actor dropped the responder
    let result = rx.await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    Ok(Json(result.into()))
}
}

45.4.5. gRPC with Tonic

REST is great for public APIs. For internal microservices (Embeddings -> Reranker -> LLM), use gRPC. tonic is a pure Rust gRPC implementation.

The inference.proto

syntax = "proto3";
package inference;

service ModelService {
  rpc Predict (PredictRequest) returns (PredictResponse);
}

message PredictRequest {
  repeated float data = 1;
  repeated int64 shape = 2;
}

message PredictResponse {
  repeated float logits = 1;
}

The Code Generation (build.rs)

fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/inference.proto")?;
    Ok(())
}

The Implementation

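use tonic::{transport::Server, Request, Response, Status};

// Pull in the code that build.rs generated for `package inference`
pub mod inference {
    tonic::include_proto!("inference");
}

use inference::model_service_server::{ModelService, ModelServiceServer};
use inference::{PredictRequest, PredictResponse};

#[derive(Default)] // main() below calls MyService::default()
pub struct MyService;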

#[tonic::async_trait]
impl ModelService for MyService {
    async fn predict(&self, request: Request<PredictRequest>) -> Result<Response<PredictResponse>, Status> {
        let req = request.into_inner();
        // ... inference logic ...
        Ok(Response::new(PredictResponse { logits: vec![0.1, 0.9] }))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let service = MyService::default();

    Server::builder()
        .add_service(ModelServiceServer::new(service))
        .serve(addr)
        .await?;

    Ok(())
}

45.4.6. Dynamic Batching Implementation

This is where Rust shines. In Python, a “wait 5ms or until batch size 32” loop is hard to keep on schedule, because any task that holds the GIL delays the event loop and with it the timer. In Rust, tokio::select! makes it trivial.

#![allow(unused)]
fn main() {
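use tokio::time::{sleep_until, Instant};

// `process_batch` is expected to drain `batch`.
async fn run_batcher(mut rx: mpsc::Receiver<Request>) {
    let max_batch_size = 32;
    let timeout = Duration::from_millis(5);
    let mut batch = Vec::with_capacity(max_batch_size);
    // Deadline of the current batch; only consulted while the batch is non-empty
    let mut deadline = Instant::now();

    loop {
        tokio::select! {
            maybe_req = rx.recv() => match maybe_req {
                // Case 1: New Request Arrived
                Some(req) => {
                    if batch.is_empty() {
                        // The 5ms window starts with the first request of a batch
                        // and is not reset by later arrivals.
                        deadline = Instant::now() + timeout;
                    }
                    batch.push(req);
                    if batch.len() >= max_batch_size {
                        process_batch(&mut batch).await;
                    }
                }
                // Case 2: All senders dropped. Flush what is left and stop.
                None => {
                    if !batch.is_empty() {
                        process_batch(&mut batch).await;
                    }
                    return;
                }
            },
            // Case 3: Timeout Expired
            _ = sleep_until(deadline), if !batch.is_empty() => {
                process_batch(&mut batch).await;
            }
        }
    }
}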
}

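The task uses no CPU while it waits: the runtime parks it until a message arrives or the timer fires. Tokio timers have millisecond granularity, which is sufficient for a 5ms window.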

45.4.7. Reference Architecture: The Dynamic Batcher Actor

This is a production-grade implementation of Dynamic Batching. It uses tokio::select! to handle timeouts (latency budget) and Vec::with_capacity to prevent allocation churn.

src/batcher.rs

#![allow(unused)]
fn main() {
use tokio::sync::{mpsc, oneshot};
use tokio::time::{timeout, Duration};
use ndarray::{Array2, Axis};
use std::sync::Arc;

// Configuration
const MAX_BATCH_SIZE: usize = 32;
const MAX_LATENCY_MS: u64 = 5;

// The Request Object
pub struct BatchRequest {
    pub input: Vec<f32>,
    pub tx: oneshot::Sender<Vec<f32>>, // Send result back
}

// The Batcher Struct (Handle)
#[derive(Clone)]
pub struct Batcher {
    tx: mpsc::Sender<BatchRequest>,
}

impl Batcher {
    // Spawns the Actor Loop
    pub fn new(model: Arc<Model>) -> Self {
        let (tx, rx) = mpsc::channel(1024); // Backpressure buffer
        
        tokio::spawn(async move {
            run_actor_loop(rx, model).await;
        });
        
        Self { tx }
    }
    
    // Public API
    pub async fn predict(&self, input: Vec<f32>) -> Result<Vec<f32>, String> {
        let (resp_tx, resp_rx) = oneshot::channel();
        
        let req = BatchRequest {
            input,
            tx: resp_tx,
        };
        
        // Send to Actor
        self.tx.send(req).await.map_err(|_| "Actor died".to_string())?;
        
        // Wait for Actor response
        resp_rx.await.map_err(|_| "Response dropped".to_string())
    }
}

// The Actor Loop (the request buffer is reused across batches)
async fn run_actor_loop(mut rx: mpsc::Receiver<BatchRequest>, model: Arc<Model>) {
    // Pre-allocate buffer to avoid reallocating every loop
    let mut buffer: Vec<BatchRequest> = Vec::with_capacity(MAX_BATCH_SIZE);
    
    loop {
        // 1. Fetch first item (wait indefinitely)
        let first = match rx.recv().await {
            Some(req) => req,
            None => break, // Channel closed
        };
        buffer.push(first);
        
        // 2. Deadline for the batch
        let deadline = tokio::time::Instant::now() + Duration::from_millis(MAX_LATENCY_MS);
        
        // 3. Fill the rest of the batch (up to MAX_BATCH_SIZE) or Timeout
        while buffer.len() < MAX_BATCH_SIZE {
            let time_left = deadline.saturating_duration_since(tokio::time::Instant::now());
            if time_left.is_zero() {
                break;
            }
            
            // Wait for next item OR timeout
            match timeout(time_left, rx.recv()).await {
                Ok(Some(req)) => buffer.push(req),
                Ok(None) => break, // Channel closed: flush this batch, the next recv() ends the loop
                Err(_) => break, // Timeout reached! Commit batch.
            }
        }
        
        // 4. BATCH IS READY. EXECUTE.
        // Convert [Vec<f32>] -> Array2<f32> (Batch Tensor)
        // This copy is necessary unless we use 'Bytes' (Scatter/Gather)
        let batch_size = buffer.len();
        let flat_input: Vec<f32> = buffer.iter().flat_map(|r| r.input.iter().copied()).collect();
        // Assuming 512 dims; validate the input length before enqueueing, or this unwrap panics
        let tensor = Array2::from_shape_vec((batch_size, 512), flat_input).unwrap();
        
        // Run Inference (Global Interpreter Lock Free!)
        // forward() is synchronous, so it occupies this runtime thread until it returns.
        let results = model.forward(tensor);
        
        // 5. Distribute Results
        // 'results' is (Batch, 10)
        for (i, req) in buffer.drain(..).enumerate() {
            let row = results.index_axis(Axis(0), i).to_vec();
            let _ = req.tx.send(row);
        }
        
        // Buffer is empty (drain), capacity is preserved.
        // Loop continues.
    }
}
}

45.4.8. Why this beats Python

In Python, implementing this loop with asyncio is possible, but:

  1. Event Loop Overhead: Python’s loop wakes up, acquires GIL, checks recv, releases GIL.
  2. Latency Jitter: If the GC runs during rx.recv(), your 5ms deadline becomes 50ms.
  3. Throughput: Rust handles channel messages in nanoseconds. Python handles them in microseconds.

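The Rust implementation keeps the batching delay bounded: when the actor is idle, a request waits at most 5ms for its batch to close and then T_inference for the result, so its latency is at most 5ms + T_inference. A request that arrives while the previous batch is still running additionally waits for that batch to finish. There are no GC spikes on top of this.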

45.4.9. Scaling to Multi-GPU

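Dynamic Batching is trivial to shard. Just instantiate Batcher multiple times, one per GPU. Or, make the Batcher send full batches round-robin to 4 GPU workers. A single tokio mpsc::channel has only one consumer, so several workers cannot share it directly: give every worker its own channel, or use a multi-consumer channel so that idle workers pull the next batch.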
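The round-robin fan-out can be pictured as a dispatcher that owns one sender per GPU worker and rotates through them. The following is a minimal sketch under stated assumptions: it uses only the standard library (std::sync::mpsc and an atomic counter) instead of Tokio, and RoundRobin, Batch, and dispatch are illustrative names that do not appear in the code above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, SendError, Sender};

/// A batch of flattened inputs (hypothetical stand-in for a real tensor).
pub type Batch = Vec<f32>;

/// Fans batches out to N workers, one channel per worker.
pub struct RoundRobin {
    workers: Vec<Sender<Batch>>,
    next: AtomicUsize,
}

impl RoundRobin {
    /// Creates the dispatcher plus one receiver per worker.
    pub fn new(num_workers: usize) -> (Self, Vec<Receiver<Batch>>) {
        assert!(num_workers > 0, "need at least one worker");
        let (workers, receivers): (Vec<Sender<Batch>>, Vec<Receiver<Batch>>) =
            (0..num_workers).map(|_| channel()).unzip();
        (Self { workers, next: AtomicUsize::new(0) }, receivers)
    }

    /// Sends the batch to the next worker in rotation and returns its index.
    /// Fails only if that worker has dropped its receiver.
    pub fn dispatch(&self, batch: Batch) -> Result<usize, SendError<Batch>> {
        let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
        self.workers[idx].send(batch)?;
        Ok(idx)
    }
}
```

Each worker thread would loop over its own receiver and run the model on the batches it gets. Strict rotation ignores how busy a worker is; if batch run times vary a lot, a shared multi-consumer queue may balance better.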

45.4.10. Handling Cancellation

If the User disconnects (HTTP client closes socket), the handler future is dropped and with it resp_rx in predict. When the Actor later calls req.tx.send(row), the call fails. Rust handles this gracefully (Result::Err), and the Actor ignores the error. As an optional optimization, the Actor can skip such “Zombie Requests” before running inference by checking req.tx.is_closed() when it assembles the batch.
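The failure mode is easy to reproduce without a server. This sketch assumes a std::sync::mpsc channel as a stand-in for the Tokio oneshot channel (its send fails in the same way once the receiver is gone); Pending and reply_all are hypothetical names, and the "inference" is a placeholder that echoes the input.

```rust
use std::sync::mpsc::Sender;

/// A queued request: the input plus the channel used to reply.
pub struct Pending {
    pub input: Vec<f32>,
    pub reply: Sender<Vec<f32>>,
}

/// Replies to every request and returns how many callers had already gone away.
pub fn reply_all(batch: Vec<Pending>) -> usize {
    let mut abandoned = 0;
    for req in batch {
        // Placeholder "inference": echo the input back.
        let output = req.input;
        // `send` returns Err when the receiving half was dropped,
        // i.e. when the caller stopped waiting for the answer.
        if req.reply.send(output).is_err() {
            abandoned += 1;
        }
    }
    abandoned
}
```

Counting abandoned replies like this is also a cheap metric: a rising count suggests clients are timing out before the server answers.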

45.4.11. Observability: Metrics that Matter

A dashboard with “CPU Usage” is useless. You need “Queue Depth” and “Token Latency”. We use metrics and metrics-exporter-prometheus.

Instrumentation

#![allow(unused)]
fn main() {
use metrics::{histogram, counter, gauge};

async fn run_actor_loop(...) {
    loop {
        let queue_len = rx.len();
        // Handle-style macros (metrics 0.22+), matching the counter! call in 45.4.25
        gauge!("inference_queue_depth").set(queue_len as f64);
        
        let start = Instant::now();
        // ... inference ...
        let latency = start.elapsed();
        histogram!("inference_latency_seconds").record(latency.as_secs_f64());
        
        counter!("inference_requests_total").increment(batch_size as u64);
    }
}
}

Exposing /metrics Endpoint

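use axum::{routing::get, Router};
use metrics_exporter_prometheus::PrometheusBuilder;

#[tokio::main]
async fn main() {
    let builder = PrometheusBuilder::new();
    let handle = builder.install_recorder().expect("failed to install recorder");
    
    let app = Router::new()
        .route("/metrics", get(move || std::future::ready(handle.render())));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Now configure Prometheus to scrape localhost:3000/metrics and add it as a data source in Grafana.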

45.4.12. Performance Tuning: The OS Layer

You can write the fastest Rust code, but if the Linux Kernel blocks you, you lose.

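1. TCP Listen Backlog

The listen backlog (connections that have completed the handshake but have not been accepted yet) is capped by net.core.somaxconn, which defaulted to 128 before Linux 5.4 and to 4096 since. For 10k RPS with bursts of new connections, you need to bump it.

#![allow(unused)]
fn main() {
use tokio::net::TcpSocket;

// TcpListener::bind picks the backlog for you; TcpSocket lets you choose it.
// The kernel still clamps the value to net.core.somaxconn.
let socket = TcpSocket::new_v4().unwrap();
socket.set_reuseaddr(true).unwrap();
socket.bind(addr).unwrap();
let listener = socket.listen(4096).unwrap();
}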

Sysctl Config:

# /etc/sysctl.conf
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535
net.ipv4.ip_local_port_range = 1024 65535

2. File Descriptors

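Every socket is a file. The default soft limit is often 1024 per process. With 5000 concurrent users, accept() starts failing with Too many open files and new connections are refused.

Fix: raise the limit where the process is started, e.g. ulimit -n 100000 in the entrypoint script, --ulimit nofile=100000:100000 for docker run, or LimitNOFILE=100000 in the systemd unit.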

45.4.13. Load Shedding: Survival of the Fittest

When the GPU is saturated, accepting more requests just increases latency for everyone. It is better to return 503 Service Unavailable instantly.

#![allow(unused)]
fn main() {
use tower::{load_shed::LoadShedLayer, ServiceBuilder};

let service = ServiceBuilder::new()
    .layer(LoadShedLayer::new()) // Reject if inner service is not ready
    .service(inner_service);
}

Implementing Backpressure in Actor:

#![allow(unused)]
fn main() {
// Web Handler
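// try_send checks for space and enqueues in one step, so no other task can
// fill the queue between "is there room?" and "send".
if state.sender.try_send(msg).is_err() {
    // Queue is full (or the Actor is gone). Shed load.
    return StatusCode::SERVICE_UNAVAILABLE;
}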
}
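The admission decision itself can be exercised in isolation. The sketch below is an illustration under assumptions, not the handler from this section: it uses the standard library's bounded sync_channel in place of the Tokio channel, and Admission, admit, and bounded_queue are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Outcome of trying to enqueue a job.
#[derive(Debug, PartialEq)]
pub enum Admission {
    Accepted,
    Overloaded,   // queue full: answer 503 right away
    ShuttingDown, // consumer is gone
}

/// Creates a queue that holds at most `capacity` waiting jobs.
pub fn bounded_queue<T>(capacity: usize) -> (SyncSender<T>, Receiver<T>) {
    sync_channel(capacity)
}

/// Enqueues the job if there is room, without ever blocking the caller.
pub fn admit<T>(queue: &SyncSender<T>, job: T) -> Admission {
    match queue.try_send(job) {
        Ok(()) => Admission::Accepted,
        Err(TrySendError::Full(_)) => Admission::Overloaded,
        Err(TrySendError::Disconnected(_)) => Admission::ShuttingDown,
    }
}
```

The queue capacity is the tuning knob: it should hold no more work than the model can finish within the latency budget, since anything beyond that would miss the deadline anyway.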

45.4.14. Streaming Responses (LLM Style)

For LLMs, waiting 5 seconds for the full text is bad UX. We need Server-Sent Events (SSE).

#![allow(unused)]
fn main() {
use axum::response::sse::{Event, Sse};
use futures::stream::{Stream, StreamExt}; // StreamExt provides `.map`
use std::convert::Infallible;

async fn stream_handler(
    State(state): State<AppState>,
    Json(payload): Json<Payload>
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    
    // Create a channel for tokens
    let (tx, rx) = mpsc::channel(100);
    
    // Send request to Actor (Actor must support streaming)
    state.sender.send(StreamingRequest { input: payload, tx }).await.unwrap();
    
    // Convert Receiver to Stream
    let stream = tokio_stream::wrappers::ReceiverStream::new(rx)
        .map(|token| {
            Ok(Event::default().data(token))
        });
        
    Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default())
}
}

This pipes the tokio::sync::mpsc channel directly into the HTTP response body: each token is flushed to the client as soon as the Actor produces it.

45.4.15. Advanced Pattern: Redis Caching Layer

Inference is expensive. Looking up a key in Redis is cheap. We use bb8 (Connection Pool) + redis crate.

#![allow(unused)]
fn main() {
use bb8_redis::{RedisConnectionManager, bb8};

type Pool = bb8::Pool<RedisConnectionManager>;

async fn predict_cached(
    State(pool): State<Pool>,
    Json(payload): Json<Payload>
) -> Json<Response> {
    // 1. Hash the Input
    let key = format!("cache:{}", hash(&payload));
    
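    // 2. Check Cache
    let mut conn = pool.get().await.unwrap();
    // A missing key comes back as nil, which fails the String conversion and counts as a miss
    let cached: Result<String, _> = redis::cmd("GET").arg(&key).query_async(&mut *conn).await;
    if let Ok(cached) = cached {
        if let Ok(hit) = serde_json::from_str(&cached) {
            return Json(hit);
        }
    }
    
    // 3. Miss? Run Inference
    let result = run_inference(payload).await;
    
    // 4. Write Cache (TTL 1 hour); a failed write is not fatal
    let json_str = serde_json::to_string(&result).unwrap();
    let _: Result<(), _> = redis::cmd("SETEX").arg(&key).arg(3600).arg(json_str).query_async(&mut *conn).await;
    
    Json(result)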
}
}
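The hash helper is left undefined above. Because the cache is shared between pods and survives restarts, the key has to be identical in every process and every build. A minimal sketch, assuming the payload has already been serialized to canonical bytes: it uses 64-bit FNV-1a, chosen here only because it is short and fully specified. The names fnv1a_64 and cache_key and the model_version key segment are illustrative assumptions, not part of the code above.

```rust
/// 64-bit FNV-1a. The constants are the published FNV offset basis and prime.
pub fn fnv1a_64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = OFFSET_BASIS;
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// Builds the Redis key. Including the model version (an assumption of this
/// sketch) keeps results of an old model from being served after a rollout.
pub fn cache_key(model_version: &str, canonical_input: &[u8]) -> String {
    format!("cache:{}:{:016x}", model_version, fnv1a_64(canonical_input))
}
```

FNV-1a is not collision resistant, and a collision here means returning another request's prediction. For untrusted input a cryptographic hash is the safer choice.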

45.4.16. Request De-duplication (Singleflight)

If 100 users ask “What is the capital of France?” at the exact same millisecond:

  • Naive Server: Runs inference 100 times.
  • Smart Server: Runs inference 1 time, returns result to 100 users.

This is called Singleflight.

#![allow(unused)]
fn main() {
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};

// Map: QueryHash -> WaitBuffer
type InFlight = Arc<Mutex<HashMap<String, Vec<oneshot::Sender<Response>>>>>;

async fn deduplicated_handler(
    State(inflight): State<InFlight>,
    Json(payload): Json<Payload>
) -> Json<Response> {
    let key = hash(&payload);
    let (tx, rx) = oneshot::channel();
    let mut is_leader = false;
    
    {
        let mut map = inflight.lock().await;
        if let Some(waiters) = map.get_mut(&key) {
           waiters.push(tx); // I am a follower
        } else {
           map.insert(key.clone(), vec![tx]); // I am the leader
           is_leader = true;
        }
    }
    
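    if is_leader {
        // Caveat: if this future is cancelled or run_model panics, the entry is
        // never removed and the followers wait forever; production code needs a guard.
        let result = run_model(payload).await;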
        let mut map = inflight.lock().await;
        if let Some(waiters) = map.remove(&key) {
            for waiter in waiters {
                let _ = waiter.send(result.clone());
            }
        }
    }
    
    Json(rx.await.unwrap())
}
}

45.4.17. Authentication Middleware (JWT)

Unless you are giving away free compute, you need Auth. Axum middleware makes this clean.

#![allow(unused)]
fn main() {
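use axum::{extract::Request, http::StatusCode, middleware::Next, response::Response};
use jsonwebtoken::{decode, DecodingKey, Validation};
use serde::Deserialize;

#[derive(Deserialize)]
struct Claims {
    user_id: String,
    exp: usize, // Validation::default() requires an expiry claim
}

// axum 0.7 signature: `Request` and `Next` are no longer generic over the body
async fn auth_middleware(
    request: Request,
    next: Next,
) -> Result<Response, StatusCode> {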
    let headers = request.headers();
    let auth_header = headers.get("Authorization")
        .and_then(|h| h.to_str().ok())
        .and_then(|h| h.strip_prefix("Bearer "));
        
    let token = match auth_header {
        Some(t) => t,
        None => return Err(StatusCode::UNAUTHORIZED),
    };
    
    // CPU-intensive crypto, but negligible compared to inference
    // ("secret" is a placeholder; load the real key from configuration)
    let token_data = decode::<Claims>(
        token,
        &DecodingKey::from_secret("secret".as_ref()),
        &Validation::default(),
    ).map_err(|_| StatusCode::UNAUTHORIZED)?;
    
    // Inject UserID into Request Extensions
    let mut request = request;
    request.extensions_mut().insert(token_data.claims.user_id);
    
    Ok(next.run(request).await)
}
}

45.4.18. Load Balancing Strategies

When running a cluster of Rust pods:

  1. Round Robin: Good for homogeneous requests.
  2. Least Connections: Better for variable length generation.
  3. Peak EWMA (Exponential Weighted Moving Average): The gold standard.

In Rust, you handle this at the tower layer in your Gateway.

#![allow(unused)]
fn main() {
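use tower::balance::p2c::Balance;
use tower::load::{CompleteOnResponse, PeakEwmaDiscover};

// `discover` is the stream of backend services (tower::discover::Discover).
// First attach a Peak-EWMA load estimate to every backend...
let discover = PeakEwmaDiscover::new(discover, default_rtt, decay, CompleteOnResponse::default());
// ...then let the power-of-two-choices balancer pick the less loaded of two.
let service = Balance::new(discover);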
}

This is built into the ecosystem, so a Rust Gateway can take over the balancing that would otherwise need Nginx or Envoy.
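To make the Peak EWMA idea concrete, here is a small standard-library sketch of the estimator and of the comparison a power-of-two-choices balancer performs. It is an illustration of the technique under simplifying assumptions (times are plain f64 milliseconds and the caller supplies the elapsed time); it is not tower's implementation, and PeakEwma, observe, load, and pick are names chosen for this example.

```rust
/// Latency estimate that jumps up on slow responses and decays slowly.
pub struct PeakEwma {
    estimate_ms: f64,
    decay_ms: f64,
}

impl PeakEwma {
    pub fn new(default_rtt_ms: f64, decay_ms: f64) -> Self {
        Self { estimate_ms: default_rtt_ms, decay_ms }
    }

    /// Records one round-trip time, `elapsed_ms` after the previous update.
    pub fn observe(&mut self, rtt_ms: f64, elapsed_ms: f64) {
        if rtt_ms > self.estimate_ms {
            // "Peak": a slow response replaces the estimate immediately.
            self.estimate_ms = rtt_ms;
        } else {
            // Otherwise blend towards the new sample. The weight of the old
            // estimate shrinks exponentially with the time since the last update.
            let w = (-elapsed_ms / self.decay_ms).exp();
            self.estimate_ms = self.estimate_ms * w + rtt_ms * (1.0 - w);
        }
    }

    /// Cost used to compare backends: estimated latency scaled by queued work.
    pub fn load(&self, pending_requests: usize) -> f64 {
        self.estimate_ms * (pending_requests as f64 + 1.0)
    }
}

/// Returns 0 or 1: the cheaper of two candidate backends, each given as
/// (estimator, pending requests). Ties go to the first candidate.
pub fn pick(a: (&PeakEwma, usize), b: (&PeakEwma, usize)) -> usize {
    if a.0.load(a.1) <= b.0.load(b.1) { 0 } else { 1 }
}
```

The asymmetry is the point: one slow response marks a backend as expensive at once, and it only earns traffic back gradually as fast responses pull the estimate down.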

45.4.19. Handling Large Payloads (Multipart)

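Sending an Image (5MB) as Base64 inside JSON is slow: Base64 adds about 33% to the payload, and the server has to decode it again. Use Multipart. Note that axum caps request bodies at 2MB by default, so a 5MB upload also needs a larger DefaultBodyLimit on the route.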

#![allow(unused)]
fn main() {
use axum::extract::Multipart;

async fn upload_image(mut multipart: Multipart) {
    while let Some(field) = multipart.next_field().await.unwrap() {
        let name = field.name().unwrap().to_string();
        let data = field.bytes().await.unwrap();
        
        println!("Received {} bytes for {}", data.len(), name);
        // Zero-Copy conversion to Tensor
        // ...
    }
}
}

45.4.20. Final Exam: The 100k RPS Architecture

Scenario: You are serving a Spam Detection Model (DistilBERT). Traffic: 100k emails/sec. SLA: P99 < 50ms.

The Rust Solution:

  1. Ingress: Cloudflare -> Rust Gateway (Axum).
  2. Gateway:
    • Auth (JWT).
    • Deduplication (10% cache hit).
    • Sharding (Hash email -> Specific Worker Pod).
  3. Worker (Pod):
    • tokio::mpsc Actor (Batch Size 128).
    • ONNX Runtime (Int8 Quantization).
    • Metrics Reporter.

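Why Python Fails: Python’s uvicorn creates a new asyncio Task for every request, and all of them are scheduled by a single interpreter thread. At 100k RPS, the Scheduler overhead kills the CPU before the model even runs. Rust’s tokio compiles each request into a small state machine (a Future, typically a few hundred bytes) and spreads them across all cores. It scales close to linearly until the NIC or the model saturates.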
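How many worker pods the scenario needs follows from simple arithmetic once the time per batch is known. The sketch below spells it out. The 10 ms per batch of 128 used in the note after the code is an illustrative assumption, not a measurement, and worker_throughput and workers_needed are hypothetical helper names.

```rust
/// Requests per second one worker sustains if it runs full batches back to back.
pub fn worker_throughput(batch_size: u32, batch_latency_ms: f64) -> f64 {
    f64::from(batch_size) * 1000.0 / batch_latency_ms
}

/// Workers needed for the target rate while keeping `headroom` spare
/// (0.3 means 30% of each worker's capacity is left unused).
pub fn workers_needed(
    target_rps: f64,
    batch_size: u32,
    batch_latency_ms: f64,
    headroom: f64,
) -> u32 {
    let usable = worker_throughput(batch_size, batch_latency_ms) * (1.0 - headroom);
    (target_rps / usable).ceil() as u32
}
```

Under the assumed 10 ms per batch, one worker sustains 12,800 req/sec, so 100k req/sec needs 8 workers with no headroom and 12 with 30% headroom. Measure the real batch latency of the quantized model before sizing the cluster.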


45.4.21. Connection Pooling and Resource Management

Managing connections efficiently is critical for high-throughput serving.

HTTP Client Pooling

#![allow(unused)]
fn main() {
use reqwest::Client;
use std::sync::Arc;

pub struct InferenceClient {
    client: Arc<Client>,
    endpoints: Vec<String>,
    current_idx: std::sync::atomic::AtomicUsize,
}

impl InferenceClient {
    pub fn new(endpoints: Vec<String>, max_connections: usize) -> Self {
        let client = Client::builder()
            .pool_max_idle_per_host(max_connections)
            .pool_idle_timeout(std::time::Duration::from_secs(30))
            .timeout(std::time::Duration::from_secs(5))
            .tcp_keepalive(std::time::Duration::from_secs(60))
            .build()
            .unwrap();
        
        Self {
            client: Arc::new(client),
            endpoints,
            current_idx: std::sync::atomic::AtomicUsize::new(0),
        }
    }
    
    fn next_endpoint(&self) -> &str {
        let idx = self.current_idx.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        &self.endpoints[idx % self.endpoints.len()]
    }
    
    pub async fn predict(&self, input: &[f32]) -> Result<Vec<f32>, reqwest::Error> {
        let endpoint = self.next_endpoint();
        
        let response = self.client
            .post(format!("{}/predict", endpoint))
            .json(&serde_json::json!({ "input": input }))
            .send()
            .await?
            .json()
            .await?;
        
        Ok(response)
    }
}
}

GPU Memory Pool

#![allow(unused)]
fn main() {
use std::collections::VecDeque;
use tokio::sync::{Mutex, Semaphore};

pub struct GpuMemoryPool {
    available: Mutex<VecDeque<GpuBuffer>>,
    // One permit per buffer that may be checked out at the same time
    permits: Semaphore,
    buffer_size: usize,
}

impl GpuMemoryPool {
    pub fn new(buffer_size: usize, max_buffers: usize) -> Self {
        Self {
            available: Mutex::new(VecDeque::new()),
            permits: Semaphore::new(max_buffers),
            buffer_size,
        }
    }

    pub async fn acquire(&self) -> GpuBuffer {
        // Sleeps until fewer than max_buffers are checked out (no polling).
        // The permit is handed back manually in release().
        self.permits.acquire().await.expect("pool closed").forget();

        // Reuse an idle buffer if there is one, otherwise allocate.
        // Every buffer in use holds a permit, so at most max_buffers ever exist.
        let mut available = self.available.lock().await;
        available
            .pop_front()
            .unwrap_or_else(|| GpuBuffer::allocate(self.buffer_size))
    }

    pub async fn release(&self, buffer: GpuBuffer) {
        self.available.lock().await.push_back(buffer);
        self.permits.add_permits(1);
    }
}
}

45.4.22. Zero-Copy Request/Response

Avoid copying data between network and model.

Using Bytes Crate

#![allow(unused)]
fn main() {
use axum::body::Body;
use axum::response::{IntoResponse, Response};

async fn predict_zero_copy(body: Body) -> impl IntoResponse {
    // Collect the body into one buffer (capped at 10 MB); nothing is parsed or re-encoded
    let bytes = axum::body::to_bytes(body, 10_000_000).await.unwrap();
    
    // `Bytes` is reference-counted, so clones and slices share the buffer without copying
    let result = process_input(&bytes).await;
    
    // Return response using same mechanism
    Response::builder()
        .header("Content-Type", "application/octet-stream")
        .body(Body::from(result))
        .unwrap()
}
}

Memory-Mapped Input

#![allow(unused)]
fn main() {
use memmap2::Mmap;

pub struct MappedModel {
    weights: Mmap,
}

impl MappedModel {
    pub fn load(path: &str) -> Result<Self, Error> {
        let file = std::fs::File::open(path)?;
        
        // Memory-map the file
        // OS handles paging, we don't load entire 7GB into RAM
        let weights = unsafe { Mmap::map(&file)? };
        
        Ok(Self { weights })
    }
    
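    pub fn get_layer(&self, offset: usize, size: usize) -> &[f32] {
        // Direct pointer into mapped memory
        // No copy, no allocation
        let bytes = &self.weights[offset..offset + size * 4];
        // The mapping is page-aligned, so a 4-byte-aligned offset keeps the f32 view aligned.
        // The file must store f32 values in the host's byte order.
        assert_eq!(offset % std::mem::align_of::<f32>(), 0, "unaligned layer offset");
        unsafe {
            std::slice::from_raw_parts(
                bytes.as_ptr() as *const f32,
                size
            )
        }
    }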
}
}
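Reinterpreting raw bytes as f32 is only sound when the pointer is aligned and the length is a whole number of elements. The following sketch shows a checked version of the cast plus a copying fallback. It uses only the standard library, and as_f32_slice and decode_f32_le are hypothetical helpers, not part of memmap2 or of the code above.

```rust
use std::mem::{align_of, size_of};

/// Views `bytes` as `f32`s without copying. Returns None if the slice is
/// misaligned or its length is not a multiple of 4. Values are read in the
/// machine's native byte order.
pub fn as_f32_slice(bytes: &[u8]) -> Option<&[f32]> {
    let aligned = bytes.as_ptr() as usize % align_of::<f32>() == 0;
    let whole = bytes.len() % size_of::<f32>() == 0;
    if !(aligned && whole) {
        return None;
    }
    // SAFETY: the pointer is aligned for f32, the new length covers exactly
    // the bytes of the input slice, every bit pattern is a valid f32, and the
    // returned slice borrows from `bytes`.
    Some(unsafe {
        std::slice::from_raw_parts(
            bytes.as_ptr().cast::<f32>(),
            bytes.len() / size_of::<f32>(),
        )
    })
}

/// Copying fallback: works for any alignment and always reads little-endian.
/// Trailing bytes that do not form a whole f32 are ignored.
pub fn decode_f32_le(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
        .collect()
}
```

A loader could try the zero-copy view first and fall back to the copying decoder when the file layout does not allow it.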

45.4.23. Structured Concurrency

Manage complex async workflows safely.

#![allow(unused)]
fn main() {
use tokio::task::JoinSet;

async fn parallel_inference(inputs: Vec<Input>) -> Vec<Output> {
    let mut set = JoinSet::new();
    
    for input in inputs {
        set.spawn(async move {
            run_single_inference(input).await
        });
    }
    
    // Results arrive in completion order, not input order
    let mut results = Vec::with_capacity(set.len());
    
    while let Some(result) = set.join_next().await {
        match result {
            Ok(output) => results.push(output),
            Err(e) => {
                tracing::error!("Task panicked: {:?}", e);
                // Continue with other tasks
            }
        }
    }
    
    results
}
}

Cancellation-Safe Operations

#![allow(unused)]
fn main() {
use tokio_util::sync::CancellationToken;

pub struct InferenceService {
    cancel_token: CancellationToken,
}

impl InferenceService {
    pub async fn run(&self, input: Input) -> Result<Output, Error> {
        tokio::select! {
            result = self.do_inference(input) => {
                result
            }
            _ = self.cancel_token.cancelled() => {
                Err(Error::Cancelled)
            }
        }
    }
    
    pub fn shutdown(&self) {
        self.cancel_token.cancel();
    }
}
}

45.4.24. Comprehensive Health Checks

Production services need detailed health information.

#![allow(unused)]
fn main() {
use serde::Serialize;
use std::collections::HashMap;
use std::sync::atomic::Ordering;

#[derive(Serialize)]
pub struct HealthStatus {
    status: String,
    components: HashMap<String, ComponentHealth>,
    metadata: Metadata,
}

#[derive(Serialize)]
pub struct ComponentHealth {
    status: String,
    latency_ms: Option<f64>,
    error: Option<String>,
}

#[derive(Serialize)]
pub struct Metadata {
    version: String,
    uptime_seconds: u64,
    requests_total: u64,
    requests_failed: u64,
}

async fn health_check(State(state): State<AppState>) -> Json<HealthStatus> {
    let mut components = HashMap::new();
    
    // Check model
    let model_health = check_model_health(&state.model).await;
    components.insert("model".to_string(), model_health);
    
    // Check GPU
    let gpu_health = check_gpu_health().await;
    components.insert("gpu".to_string(), gpu_health);
    
    // Check dependencies
    let redis_health = check_redis_health(&state.redis).await;
    components.insert("redis".to_string(), redis_health);
    
    // Aggregate status
    let all_healthy = components.values().all(|c| c.status == "healthy");
    
    Json(HealthStatus {
        status: if all_healthy { "healthy" } else { "degraded" }.to_string(),
        components,
        metadata: Metadata {
            version: env!("CARGO_PKG_VERSION").to_string(),
            uptime_seconds: state.start_time.elapsed().as_secs(),
            requests_total: state.metrics.requests_total.load(Ordering::Relaxed),
            requests_failed: state.metrics.requests_failed.load(Ordering::Relaxed),
        },
    })
}

async fn check_gpu_health() -> ComponentHealth {
    match get_gpu_utilization() {
        Ok(util) => ComponentHealth {
            status: if util < 95.0 { "healthy" } else { "degraded" }.to_string(),
            latency_ms: None,
            error: None,
        },
        Err(e) => ComponentHealth {
            status: "unhealthy".to_string(),
            latency_ms: None,
            error: Some(e.to_string()),
        },
    }
}
}

45.4.25. A/B Testing in the Serving Layer

Route traffic to different model versions.

#![allow(unused)]
fn main() {
pub struct ABRouter {
    models: HashMap<String, Arc<dyn Model>>,
    // (model_name, percentage) in a fixed order. A HashMap iterates in a
    // different order in every process, so replicas would disagree on routing.
    traffic_split: Vec<(String, f32)>,
}

impl ABRouter {
    pub fn route(&self, request_id: &str) -> &Arc<dyn Model> {
        // Deterministic routing based on request ID
        let hash = fxhash::hash64(request_id.as_bytes());
        let normalized = (hash % 10000) as f32 / 100.0; // 0-100
        
        let mut cumulative = 0.0;
        for (model_name, percentage) in &self.traffic_split {
            cumulative += *percentage;
            if normalized < cumulative {
                return self.models.get(model_name).unwrap();
            }
        }
        
        // Fallback to the first configured model
        let (first, _) = &self.traffic_split[0];
        self.models.get(first).unwrap()
    }
    
    pub async fn predict(&self, request_id: &str, input: Input) -> Output {
        let model = self.route(request_id);
        
        // Record which variant was used
        metrics::counter!("ab_variant", "variant" => model.name()).increment(1);
        
        model.predict(input).await
    }
}
}
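The bucket logic can be checked on its own, without models or metrics. This is a minimal sketch assuming an ordered list of (variant, percentage) pairs and a hash that is stable across processes. It uses a hand-written 64-bit FNV-1a instead of the fxhash crate so that it depends only on the standard library, and pick_variant is an illustrative name.

```rust
/// 64-bit FNV-1a, used here only as a hash that is stable across processes.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}

/// Picks a variant for `request_id`. `splits` is an ordered list of
/// (variant name, percentage) pairs whose percentages should sum to 100.
/// Returns None only when `splits` is empty.
pub fn pick_variant<'a>(splits: &[(&'a str, f32)], request_id: &str) -> Option<&'a str> {
    // Map the ID to a bucket in [0, 100) with two decimal places.
    let bucket = (fnv1a_64(request_id.as_bytes()) % 10_000) as f32 / 100.0;
    let mut cumulative = 0.0;
    for &(name, percentage) in splits {
        cumulative += percentage;
        if bucket < cumulative {
            return Some(name);
        }
    }
    // The percentages summed to less than 100: fall back to the last variant.
    splits.last().map(|&(name, _)| name)
}
```

Hashing a user ID instead of a request ID would keep each user on one variant for the whole experiment, which is usually what an A/B analysis expects.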

45.4.26. Production Deployment Checklist

Pre-Deployment

  • Load test at 2x expected peak traffic
  • Verify graceful shutdown behavior
  • Test circuit breaker activation
  • Validate health check endpoints
  • Review timeout configurations

Deployment

  • Blue-green or canary deployment
  • Monitor error rates during rollout
  • Verify metrics are flowing
  • Check log aggregation

Post-Deployment

  • Establish baseline latency
  • Set up alerting thresholds
  • Document runbook for incidents
  • Schedule chaos engineering tests

45.4.27. Final Architecture Summary

┌─────────────────────────────────────────────────────────────────────┐
│                High-Performance Inference Architecture               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Internet Traffic                                                    │
│       │                                                              │
│       ▼                                                              │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │  Load Balancer (with SSL termination)                           ││
│  └──────────────────────────┬──────────────────────────────────────┘│
│                             │                                        │
│  ┌──────────────────────────▼──────────────────────────────────────┐│
│  │  API Gateway (Axum)                                             ││
│  │  • Rate Limiting  • Auth (JWT)  • Request Validation            ││
│  │  • Deduplication  • Response Caching                            ││
│  └──────────────────────────┬──────────────────────────────────────┘│
│                             │                                        │
│  ┌──────────────────────────▼──────────────────────────────────────┐│
│  │  Request Router                                                  ││
│  │  • A/B Testing  • Model Selection  • Load Balancing             ││
│  └──────────────────────────┬──────────────────────────────────────┘│
│                             │                                        │
│  ┌──────────────────────────▼──────────────────────────────────────┐│
│  │  Dynamic Batcher (Actor Pattern)                                ││
│  │  • Accumulate requests  • Timeout handling  • Backpressure      ││
│  └──────────────────────────┬──────────────────────────────────────┘│
│                             │                                        │
│  ┌──────────────────────────▼──────────────────────────────────────┐│
│  │  Model Execution (GPU/CPU)                                       ││
│  │  • CUDA Kernels  • Quantization  • Memory-mapped weights        ││
│  └─────────────────────────────────────────────────────────────────┘│
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

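This architecture is designed for 100k+ RPS. The sub-5ms latency target applies to the serving overhead around the model; end-to-end P99 also includes the batching window and the inference time.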

[End of Section 45.4]