45.9. MLOps Tooling: Building the Platform

Tip

The Mindset: In Python, you write scripts. In Rust, you build tools. A Python script breaks when you change a CUDA version; a compiled Rust binary keeps working for years. This chapter covers how to build professional-grade CLI tools for MLOps.

45.9.1. The User Interface: clap

Documentation is good. Self-documenting CLIs are better. clap (Command Line Argument Parser) is the standard.

Defining the CLI

use clap::{Parser, Subcommand, ValueEnum};

#[derive(Parser)]
#[command(name = "ml-platform")]
#[command(about = "The Corporate MLOps CLI", long_about = None)]
struct Cli {
    #[command(subcommand)]
    command: Commands,

    /// Verbosity level
    #[arg(short, long, global = true, action = clap::ArgAction::Count)]
    verbose: u8,
}

#[derive(Subcommand)]
enum Commands {
    /// Train a model on a dataset
    Train {
        /// Path to dataset
        #[arg(short, long)]
        dataset: String,

        /// Learning Rate
        #[arg(long, default_value_t = 0.001)]
        lr: f64,

        /// Optimizer type
        #[arg(long, value_enum, default_value_t = Optimizer::Adam)]
        optim: Optimizer,
    },
    /// Serve a trained model
    Serve {
        /// Port to bind to
        #[arg(short, long, default_value_t = 8080)]
        port: u16,
    },
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
enum Optimizer {
    Adam,
    Sgd,
    RmsProp,
}

fn main() {
    let cli = Cli::parse();

    match &cli.command {
        Commands::Train { dataset, lr, optim } => {
            println!("Training on {} with lr={} optim={:?}", dataset, lr, optim);
        }
        Commands::Serve { port } => {
            println!("Serving on port {}", port);
        }
    }
}

Why this matters: Type ml-platform --help. You get complete, formatted help text generated for you. Type ml-platform train --optim foo. You get "error: invalid value 'foo'". This prevents "config drift" where colleagues run scripts with invalid arguments.
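
The rejection looks roughly like this (exact wording varies by clap version; the kebab-case variant names come from ValueEnum's default renaming):

$ ml-platform train --optim foo
error: invalid value 'foo' for '--optim <OPTIM>'
  [possible values: adam, sgd, rms-prop]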

45.9.2. Configuration Management: config

Hardcoding paths is bad. Environment variables are better. Layered config is best. The config crate merges:

  1. config/default.toml
  2. config/production.toml
  3. APP_DATABASE__URL environment variable (matching the prefix and separator configured below).
#![allow(unused)]
fn main() {
use config::{Config, File, Environment};
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Settings {
    database: DatabaseSettings,
    s3: S3Settings,
}

#[derive(Debug, Deserialize)]
struct DatabaseSettings {
    url: String,
    pool_size: u32,
}

#[derive(Debug, Deserialize)]
struct S3Settings {
    bucket: String,
    region: String,
}

impl Settings {
    pub fn new() -> Result<Self, config::ConfigError> {
        let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into());

        let s = Config::builder()
            // Start with defaults
            .add_source(File::with_name("config/default"))
            // Add environment specific config
            .add_source(File::with_name(&format!("config/{}", run_mode)).required(false))
            // Add Environment Variables (e.g. APP_DATABASE__URL=...)
            .add_source(Environment::with_prefix("APP").separator("__"))
            .build()?;

        s.try_deserialize()
    }
}
}
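
For illustration, a config/default.toml matching the Settings struct above (the values are placeholders):

# config/default.toml
[database]
url = "postgres://localhost/ml_dev"
pool_size = 5

[s3]
bucket = "ml-artifacts-dev"
region = "us-east-1"

In production, exporting APP_DATABASE__URL=postgres://prod-host/ml overrides the file value without rebuilding anything.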

45.9.3. Terminal UIs (TUI): ratatui

Sometimes you need to monitor training on a remote server via SSH. A simple progress bar (tqdm in Python, indicatif in Rust) is okay. A full TUI dashboard is professional. Ratatui is the successor to tui-rs.

Designing a Dashboard

#![allow(unused)]
fn main() {
use ratatui::{
    backend::{Backend, CrosstermBackend},
    layout::{Constraint, Direction, Layout},
    style::{Color, Style},
    symbols,
    widgets::{Block, Borders, Chart, Dataset, Gauge},
    Frame, Terminal,
};

// AppState is assumed to be defined elsewhere in the application:
// loss_history: Vec<(f64, f64)>, gpu_util: u16 (0-100).
fn draw_ui<B: Backend>(f: &mut Frame<B>, state: &AppState) {
    let chunks = Layout::default()
        .direction(Direction::Vertical)
        .constraints([Constraint::Percentage(50), Constraint::Percentage(50)].as_ref())
        .split(f.size());

    // 1. Loss Chart
    let datasets = vec![
        Dataset::default()
            .name("Training Loss")
            .marker(symbols::Marker::Braille)
            .style(Style::default().fg(Color::Cyan))
            .data(&state.loss_history),
    ];
    let chart = Chart::new(datasets)
        .block(Block::default().title("Loss").borders(Borders::ALL));
    f.render_widget(chart, chunks[0]);

    // 2. GPU Utilization Gauge
    let gauge = Gauge::default()
        .block(Block::default().title("GPU Usage").borders(Borders::ALL))
        .gauge_style(Style::default().fg(Color::Red))
        .percent(state.gpu_util);
    f.render_widget(gauge, chunks[1]);
}
}

45.9.4. Docker Optimization: The 20MB Container

Python containers are huge (1GB+ with PyTorch). Rust containers can be tiny (scratch images) or small (distroless).

Technique 1: cargo-chef (Layer Caching)

Compiling dependencies takes time, and Docker's layer cache doesn't help a naive build: COPY . . invalidates the layer whenever any source file changes, so every build recompiles all dependencies even though Cargo.toml rarely changes. cargo-chef computes a "recipe" (the dependency tree) so dependencies can be compiled and cached in their own layer.

Technique 2: Distroless

Google’s Distroless images contain GLIBC and SSL certs, but no Shell. Perfect for security.

The Ultimate Dockerfile

# Stage 1: Plan
FROM lukemathwalker/cargo-chef:latest-rust-1.75 AS planner
WORKDIR /app
COPY . .
RUN cargo chef prepare --recipe-path recipe.json

# Stage 2: Cache Dependencies
FROM lukemathwalker/cargo-chef:latest-rust-1.75 AS cacher
WORKDIR /app
COPY --from=planner /app/recipe.json recipe.json
# Build dependencies - this is the caching layer!
RUN cargo chef cook --release --recipe-path recipe.json

# Stage 3: Builder
FROM lukemathwalker/cargo-chef:latest-rust-1.75 AS builder
WORKDIR /app
COPY . .
# Copy compiled dependencies
COPY --from=cacher /app/target target
COPY --from=cacher /usr/local/cargo /usr/local/cargo
RUN cargo build --release --bin ml-platform

# Stage 4: Runtime
# Use 'cc-debian12' for GLIBC compatibility
FROM gcr.io/distroless/cc-debian12
COPY --from=builder /app/target/release/ml-platform /
CMD ["./ml-platform"]

Result: a Docker image of roughly 20-25MB that runs your entire ML pipeline.

45.9.5. Cross Compilation: Building for ARM on x86

You develop on Mac (ARM). You deploy to Linux (x86). In Python, this is fine. In C++, this is hell. In Rust, we use cross.

# Install Cross
cargo install cross

# Build for Linux x86_64
cross build --target x86_64-unknown-linux-gnu --release

# Build for Raspberry Pi
cross build --target aarch64-unknown-linux-gnu --release

cross uses Docker transparently to provide the toolchain.
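
The scratch images mentioned earlier require a fully static binary, which usually means the musl target. A sketch (cross's prebuilt images include the musl toolchain):

# Fully static binary (no GLIBC) - can run in a FROM scratch image
cross build --target x86_64-unknown-linux-musl --release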

45.9.6. CI/CD: Testing and Linting

Rust’s CI is fast if you use nextest. cargo-nextest runs each test in its own process, isolating failures and improving parallelism.

The GitHub Actions Workflow

name: Rust CI
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: dtolnay/rust-toolchain@stable
        with:
          components: clippy, rustfmt
      
      # Cache Cargo Registry + Target
      - uses: Swatinem/rust-cache@v2
      
      # Install Nextest
      - uses: taiki-e/install-action@nextest
      
      # Run Tests
      - run: cargo nextest run
      
      # Linting (Clippy)
      - run: cargo clippy -- -D warnings
      
      # Formatting
      - run: cargo fmt --check

45.9.7. Property Based Testing: proptest

Unit tests (assert_eq!(add(2, 2), 4)) are weak. Property tests (assert_eq!(add(a, b), add(b, a))) are strong. proptest generates thousands of random inputs trying to break your code.

#![allow(unused)]
fn main() {
use proptest::prelude::*;

proptest! {
    #[test]
    fn test_normalization_invariants(data in prop::collection::vec(0.0f32..10.0f32, 1..100)) {
        // `normalize` is the function under test (a sample implementation follows below)
        let normalized = normalize(&data);

        // Invariant 1: Max is 1.0 (approx)
        let max = normalized.iter().fold(f32::NEG_INFINITY, |m, &v| v.max(m));
        prop_assert!(max <= 1.0 + 1e-6);
        
        // Invariant 2: Length preserved
        prop_assert_eq!(data.len(), normalized.len());
    }
}
}

This finds edge cases (Empty vector? NaN? Infinity?) that humans miss.
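
For reference, here is a minimal normalize the test above could exercise (an illustrative max-scaling implementation, not part of the original):

#![allow(unused)]
fn main() {
// Illustrative max-normalization: scale so the largest value becomes 1.0.
fn normalize(data: &[f32]) -> Vec<f32> {
    let max = data.iter().fold(f32::NEG_INFINITY, |m, &v| v.max(m));
    if !max.is_finite() || max == 0.0 {
        return data.to_vec(); // empty, all-zero, or non-finite input: nothing to scale
    }
    data.iter().map(|v| v / max).collect()
}
}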

45.9.8. Error Handling Best Practices

Do not use unwrap() in production code paths. Do not use String as an error type.

Library Code: thiserror

If you are writing a crate for others (my-ml-lib), use thiserror.

#![allow(unused)]
fn main() {
#[derive(thiserror::Error, Debug)]
pub enum ModelError {
    #[error("tensor shape mismatch: expected {expected:?}, got {found:?}")]
    ShapeMismatch { expected: Vec<usize>, found: Vec<usize> },
    
    #[error("io error")]
    Io(#[from] std::io::Error),
}
}
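
The #[from] attribute gives automatic conversion with the ? operator; a brief illustration:

#![allow(unused)]
fn main() {
fn load_weights(path: &str) -> Result<Vec<u8>, ModelError> {
    // std::io::Error converts into ModelError::Io automatically via #[from]
    Ok(std::fs::read(path)?)
}
}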

Application Code: anyhow / eyre

If you are writing the CLI (ml-platform), use anyhow. It attaches human-readable context to the error chain.

use anyhow::Context;

fn main() -> anyhow::Result<()> {
    load_model().context("Failed to load initial model")?;
    Ok(())
}

Output:

Error: Failed to load initial model

Caused by:
    file not found

45.9.9. Release Engineering: cargo-dist

Shipping binaries to users is hard (building MSIs, DEBs, Homebrew taps). cargo-dist automates this. It generates a CI workflow that:

  1. Builds for all platforms.
  2. Zips them up.
  3. Creates a GitHub Release.
  4. Generates a shell installer script.

Run cargo dist init and commit the workflow. Users can now: curl --proto '=https' --tlsv1.2 -LsSf https://github.com/myorg/ml-platform/releases/download/v0.1.0/ml-platform-installer.sh | sh

45.9.10. Final Checklist for MLOps Tooling

  1. Safety: Use clippy to enforce best practices.
  2. Config: Use config crate for layered settings.
  3. Observability: Use tracing for structured logs.
  4. UI: Use clap for CLI and ratatui for dashboards.
  5. Distribution: Use cargo-dist + Distroless Docker images.


45.9.11. Structured Logging with tracing

println! is for scripts. tracing is for production. It provides structured, contextual logging with spans.

Setting Up Tracing

#![allow(unused)]
fn main() {
use tracing::{error, info, span, Instrument, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};

fn init_tracing() {
    tracing_subscriber::registry()
        .with(fmt::layer()
            .json()  // JSON output for log aggregation
            .with_target(false)
            .with_thread_ids(true))
        .init();
}

async fn train_model(config: &TrainConfig) -> Result<Model, Error> {
    // Span for the entire training run. In async code, attach spans with
    // .instrument() rather than holding span.enter() across an .await.
    let span = span!(Level::INFO, "train",
        model_name = %config.model_name,
        dataset = %config.dataset_path
    );

    async {
        info!(learning_rate = config.lr, epochs = config.epochs, "Starting training");

        let mut model = Model::new(config);  // assumed constructor
        for epoch in 0..config.epochs {
            // Each epoch gets its own child span
            let epoch_span = span!(Level::DEBUG, "epoch", epoch);
            // train_epoch is an assumed async helper returning the epoch loss
            let loss = train_epoch(&mut model, config).instrument(epoch_span).await?;

            info!(loss, "Epoch completed");

            if loss.is_nan() {
                error!(lr = config.lr, "Loss diverged!");
                return Err(Error::TrainingDiverged);
            }
        }

        Ok(model)
    }
    .instrument(span)
    .await
}
}

Output (JSON Format)

{
  "timestamp": "2024-01-15T10:30:45Z",
  "level": "INFO",
  "span": {"name": "train", "model_name": "bert-base", "dataset": "squad"},
  "fields": {"message": "Starting training", "learning_rate": 0.001, "epochs": 10}
}

Distributed Tracing with OpenTelemetry

#![allow(unused)]
fn main() {
// Note: this matches the pre-0.20 opentelemetry-otlp pipeline API;
// newer releases have reorganized these modules.
use opentelemetry_otlp::WithExportConfig;
use tracing_opentelemetry::OpenTelemetryLayer;

fn init_otel_tracing() {
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint("http://jaeger:4317")
        )
        .install_batch(opentelemetry::runtime::Tokio)
        .unwrap();
    
    tracing_subscriber::registry()
        .with(OpenTelemetryLayer::new(tracer))
        .with(fmt::layer().json())
        .init();
}
}

Now your spans appear in Jaeger or Grafana Tempo with full trace context!

45.9.12. Metrics with metrics Crate

Beyond logs, you need metrics for dashboards.

#![allow(unused)]
fn main() {
// Note: the handle-based macro API below (gauge!(...).set(...)) is metrics 0.22+.
use metrics::{counter, gauge, histogram};
use metrics_exporter_prometheus::PrometheusBuilder;

fn init_metrics() {
    // Start Prometheus exporter on :9090/metrics
    PrometheusBuilder::new()
        .with_http_listener(([0, 0, 0, 0], 9090))
        .install()
        .expect("Failed to install Prometheus recorder");
}

fn record_training_metrics(epoch: u32, loss: f64, lr: f64) {
    gauge!("training_epoch").set(epoch as f64);
    histogram!("training_loss").record(loss);
    gauge!("training_learning_rate").set(lr);
    counter!("training_steps_total").increment(1);
}

fn record_inference_metrics(model: &str, latency: std::time::Duration, success: bool) {
    let labels = vec![("model", model.to_string())];
    
    histogram!("inference_latency_seconds", &labels)
        .record(latency.as_secs_f64());
    
    if success {
        counter!("inference_success_total", &labels).increment(1);
    } else {
        counter!("inference_failure_total", &labels).increment(1);
    }
}
}
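
Scraping :9090/metrics then returns standard Prometheus exposition text, abridged here (histogram rendering depends on the exporter's bucket/summary configuration):

# TYPE training_epoch gauge
training_epoch 7
# TYPE training_steps_total counter
training_steps_total 11200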

45.9.13. Model Registry

Track model versions, metadata, and lineage.

#![allow(unused)]
fn main() {
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};

use async_trait::async_trait;

#[derive(Debug, Serialize, Deserialize)]
pub struct ModelVersion {
    pub name: String,
    pub version: String,
    pub created_at: chrono::DateTime<chrono::Utc>,
    pub metrics: HashMap<String, f64>,
    pub parameters: HashMap<String, serde_json::Value>,
    pub artifact_path: PathBuf,
    pub git_commit: String,
    pub tags: Vec<String>,
}

pub struct ModelRegistry {
    storage: Box<dyn ModelStorage>,
}

impl ModelRegistry {
    pub async fn register(
        &self,
        name: &str,
        model_path: &Path,
        metrics: HashMap<String, f64>,
        params: HashMap<String, serde_json::Value>,
    ) -> Result<ModelVersion, Error> {
        // Generate version (semantic or timestamp-based)
        let version = self.next_version(name).await?;
        
        // Get git commit
        let git_commit = std::process::Command::new("git")
            .args(["rev-parse", "HEAD"])
            .output()
            .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
            .unwrap_or_else(|_| "unknown".to_string());
        
        // Upload artifact
        let artifact_path = self.storage.upload(name, &version, model_path).await?;
        
        let model_version = ModelVersion {
            name: name.to_string(),
            version,
            created_at: chrono::Utc::now(),
            metrics,
            parameters: params,
            artifact_path,
            git_commit,
            tags: vec![],
        };
        
        // Store metadata
        self.storage.save_metadata(&model_version).await?;
        
        tracing::info!(
            name = %model_version.name,
            version = %model_version.version,
            "Model registered"
        );
        
        Ok(model_version)
    }
    
    pub async fn load(&self, name: &str, version: &str) -> Result<PathBuf, Error> {
        let metadata = self.storage.get_metadata(name, version).await?;
        let local_path = self.storage.download(&metadata.artifact_path).await?;
        Ok(local_path)
    }
    
    pub async fn promote(&self, name: &str, version: &str, stage: &str) -> Result<(), Error> {
        // Add tag like "production" or "staging"
        self.storage.add_tag(name, version, stage).await
    }
    
    pub async fn list_versions(&self, name: &str) -> Result<Vec<ModelVersion>, Error> {
        self.storage.list_versions(name).await
    }
}

// Storage backends
#[async_trait]
pub trait ModelStorage: Send + Sync {
    async fn upload(&self, name: &str, version: &str, path: &Path) -> Result<PathBuf, Error>;
    async fn download(&self, path: &PathBuf) -> Result<PathBuf, Error>;
    async fn save_metadata(&self, model: &ModelVersion) -> Result<(), Error>;
    async fn get_metadata(&self, name: &str, version: &str) -> Result<ModelVersion, Error>;
    async fn list_versions(&self, name: &str) -> Result<Vec<ModelVersion>, Error>;
    async fn add_tag(&self, name: &str, version: &str, tag: &str) -> Result<(), Error>;
}

// S3 implementation
pub struct S3ModelStorage {
    client: aws_sdk_s3::Client,
    bucket: String,
}

#[async_trait]
impl ModelStorage for S3ModelStorage {
    async fn upload(&self, name: &str, version: &str, path: &Path) -> Result<PathBuf, Error> {
        let key = format!("models/{}/{}/model.tar.gz", name, version);
        
        let body = aws_sdk_s3::primitives::ByteStream::from_path(path).await?;
        
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(&key)
            .body(body)
            .send()
            .await?;
        
        Ok(PathBuf::from(format!("s3://{}/{}", self.bucket, key)))
    }
    
    // ... other implementations
}
}
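
The next_version helper used by register is not shown above. A minimal timestamp-based sketch (a hypothetical scheme; semantic versioning would derive from list_versions instead):

#![allow(unused)]
fn main() {
impl ModelRegistry {
    // Timestamp versions are monotonic and need no coordination.
    async fn next_version(&self, _name: &str) -> Result<String, Error> {
        Ok(chrono::Utc::now().format("v%Y%m%d-%H%M%S").to_string())
    }
}
}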

45.9.14. Secret Management

Never put API keys in code or environment variables directly.

#![allow(unused)]
fn main() {
use aws_sdk_secretsmanager::Client as SecretsClient;
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::collections::HashMap;

pub struct SecretManager {
    client: SecretsClient,
    cache: tokio::sync::RwLock<HashMap<String, CachedSecret>>,
}

struct CachedSecret {
    value: String,
    expires_at: std::time::Instant,
}

impl SecretManager {
    pub async fn get(&self, secret_name: &str) -> Result<String, Error> {
        // Check cache first
        {
            let cache = self.cache.read().await;
            if let Some(cached) = cache.get(secret_name) {
                if cached.expires_at > std::time::Instant::now() {
                    return Ok(cached.value.clone());
                }
            }
        }
        
        // Fetch from AWS Secrets Manager
        let response = self.client
            .get_secret_value()
            .secret_id(secret_name)
            .send()
            .await?;
        
        let value = response.secret_string()
            .ok_or(Error::SecretNotFound)?
            .to_string();
        
        // Cache for 5 minutes
        let mut cache = self.cache.write().await;
        cache.insert(secret_name.to_string(), CachedSecret {
            value: value.clone(),
            expires_at: std::time::Instant::now() + std::time::Duration::from_secs(300),
        });
        
        Ok(value)
    }
}

// Usage
async fn connect_database(secrets: &SecretManager) -> Result<PgPool, Error> {
    let db_url = secrets.get("prod/database/url").await?;
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect(&db_url)
        .await?;
    Ok(pool)
}
}

45.9.15. Plugin Architecture

Allow users to extend your CLI.

#![allow(unused)]
fn main() {
use libloading::{Library, Symbol};
use std::path::Path;

pub trait Plugin: Send + Sync {
    fn name(&self) -> &str;
    fn version(&self) -> &str;
    fn execute(&self, args: &[String]) -> Result<(), Box<dyn std::error::Error>>;
}

pub struct PluginManager {
    plugins: Vec<(Library, Box<dyn Plugin>)>,
}

impl PluginManager {
    pub fn load_from_directory(dir: &Path) -> Result<Self, Error> {
        let mut plugins = vec![];
        
        for entry in std::fs::read_dir(dir)? {
            let path = entry?.path();
            if path.extension().map(|e| e == "so" || e == "dylib").unwrap_or(false) {
                match Self::load_plugin(&path) {
                    Ok((lib, plugin)) => {
                        tracing::info!(
                            name = plugin.name(),
                            version = plugin.version(),
                            "Loaded plugin"
                        );
                        plugins.push((lib, plugin));
                    }
                    Err(e) => {
                        tracing::warn!(path = ?path, error = ?e, "Failed to load plugin");
                    }
                }
            }
        }
        
        Ok(Self { plugins })
    }
    
    fn load_plugin(path: &Path) -> Result<(Library, Box<dyn Plugin>), Error> {
        unsafe {
            let lib = Library::new(path)?;
            // The symbol type must match the plugin's `extern "C"` declaration
            let create_fn: Symbol<unsafe extern "C" fn() -> Box<dyn Plugin>> =
                lib.get(b"create_plugin")?;
            let plugin = create_fn();
            // Keep `lib` alive alongside the plugin: dropping the Library
            // unloads the code the trait object points into.
            Ok((lib, plugin))
        }
    }
    
    pub fn execute(&self, plugin_name: &str, args: &[String]) -> Result<(), Error> {
        for (_, plugin) in &self.plugins {
            if plugin.name() == plugin_name {
                return plugin.execute(args).map_err(Error::PluginError);
            }
        }
        Err(Error::PluginNotFound(plugin_name.to_string()))
    }
}

// Example plugin (separate crate compiled to .so/.dylib).
// Note: Box<dyn Plugin> is not strictly FFI-safe; loader and plugin must be
// built with the same compiler version for this pattern to be sound.
#[no_mangle]
pub extern "C" fn create_plugin() -> Box<dyn Plugin> {
    Box::new(MyCustomPlugin)
}

struct MyCustomPlugin;

impl Plugin for MyCustomPlugin {
    fn name(&self) -> &str { "custom-exporter" }
    fn version(&self) -> &str { "0.1.0" }
    
    fn execute(&self, args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
        // Custom export logic
        Ok(())
    }
}
}

45.9.16. Feature Flags

Control rollouts without redeploying.

#![allow(unused)]
fn main() {
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeatureFlag {
    pub name: String,
    pub enabled: bool,
    pub percentage: f32,  // 0.0-100.0 for gradual rollout
    pub user_whitelist: Vec<String>,
}

pub struct FeatureFlagService {
    flags: Arc<RwLock<HashMap<String, FeatureFlag>>>,
}

impl FeatureFlagService {
    pub async fn is_enabled(&self, flag_name: &str, user_id: Option<&str>) -> bool {
        let flags = self.flags.read().await;
        
        if let Some(flag) = flags.get(flag_name) {
            // Whitelisted users bypass all other checks
            if let Some(uid) = user_id {
                if flag.user_whitelist.contains(&uid.to_string()) {
                    return true;
                }
            }

            // A disabled flag stays off regardless of rollout percentage
            if !flag.enabled {
                return false;
            }

            // Gradual rollout: deterministic per-user bucket for consistency
            if flag.percentage < 100.0 {
                if let Some(uid) = user_id {
                    let hash = fxhash::hash64(uid.as_bytes());
                    let bucket = (hash % 10000) as f32 / 100.0;
                    return bucket < flag.percentage;
                }
                // No user id to bucket on: stay conservative below 100%
                return false;
            }

            true
        } else {
            false
        }
    }
    
    pub async fn refresh(&self) -> Result<(), Error> {
        // Fetch from remote config (e.g., LaunchDarkly, internal service)
        let new_flags = fetch_flags_from_remote().await?;
        let mut flags = self.flags.write().await;
        *flags = new_flags;
        Ok(())
    }
}

// Usage
async fn serve_model(flags: &FeatureFlagService, user_id: &str) {
    let model = if flags.is_enabled("new-model-v2", Some(user_id)).await {
        load_model("v2")
    } else {
        load_model("v1")
    };
    
    // ...
}
}
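
Flags only help if they stay fresh. A background task that polls refresh periodically (an illustrative sketch; the 30-second interval is arbitrary):

#![allow(unused)]
fn main() {
use std::sync::Arc;

fn spawn_flag_refresher(service: Arc<FeatureFlagService>) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
        loop {
            interval.tick().await;
            if let Err(e) = service.refresh().await {
                tracing::warn!(error = ?e, "Failed to refresh feature flags");
            }
        }
    });
}
}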

45.9.17. Health Checks and Readiness Probes

Production services need proper health endpoints.

#![allow(unused)]
fn main() {
use axum::{extract::State, http::StatusCode, routing::get, Json, Router};
use serde::Serialize;
use std::collections::HashMap;

#[derive(Serialize)]
struct HealthResponse {
    status: String,
    checks: HashMap<String, CheckResult>,
    version: String,
    uptime_seconds: u64,
}

#[derive(Serialize)]
struct CheckResult {
    status: String,
    latency_ms: u64,
    message: Option<String>,
}

async fn health_check(State(state): State<AppState>) -> (StatusCode, Json<HealthResponse>) {
    let mut checks = HashMap::new();
    let mut all_healthy = true;
    
    // Database check
    let db_start = std::time::Instant::now();
    let db_healthy = sqlx::query("SELECT 1")
        .fetch_one(&state.db_pool)
        .await
        .is_ok();
    checks.insert("database".to_string(), CheckResult {
        status: if db_healthy { "healthy" } else { "unhealthy" }.to_string(),
        latency_ms: db_start.elapsed().as_millis() as u64,
        message: None,
    });
    all_healthy &= db_healthy;
    
    // Model loaded check
    let model_loaded = state.model.read().await.is_some();
    checks.insert("model".to_string(), CheckResult {
        status: if model_loaded { "healthy" } else { "unhealthy" }.to_string(),
        latency_ms: 0,
        message: if model_loaded { None } else { Some("Model not loaded".to_string()) },
    });
    all_healthy &= model_loaded;
    
    // Redis check
    let redis_start = std::time::Instant::now();
    let redis_healthy = state.redis.ping().await.is_ok();
    checks.insert("redis".to_string(), CheckResult {
        status: if redis_healthy { "healthy" } else { "unhealthy" }.to_string(),
        latency_ms: redis_start.elapsed().as_millis() as u64,
        message: None,
    });
    all_healthy &= redis_healthy;
    
    let response = HealthResponse {
        status: if all_healthy { "healthy" } else { "unhealthy" }.to_string(),
        checks,
        version: env!("CARGO_PKG_VERSION").to_string(),
        uptime_seconds: state.start_time.elapsed().as_secs(),
    };
    
    let status = if all_healthy { StatusCode::OK } else { StatusCode::SERVICE_UNAVAILABLE };
    (status, Json(response))
}

// Kubernetes-style probes
async fn liveness() -> StatusCode {
    // Just check if the process is alive
    StatusCode::OK
}

async fn readiness(State(state): State<AppState>) -> StatusCode {
    // Check if ready to serve traffic
    if state.model.read().await.is_some() && !state.db_pool.is_closed() {
        StatusCode::OK
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    }
}

fn create_router(state: AppState) -> Router {
    Router::new()
        .route("/health", get(health_check))
        .route("/healthz", get(liveness))      // Kubernetes liveness
        .route("/readyz", get(readiness))      // Kubernetes readiness
        .with_state(state)
}
}
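
A healthy /health response serializes roughly as:

{
  "status": "healthy",
  "checks": {
    "database": {"status": "healthy", "latency_ms": 2, "message": null},
    "model": {"status": "healthy", "latency_ms": 0, "message": null},
    "redis": {"status": "healthy", "latency_ms": 1, "message": null}
  },
  "version": "0.1.0",
  "uptime_seconds": 86400
}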

45.9.18. Graceful Shutdown

Handle termination signals properly.

use std::sync::{atomic::Ordering, Arc};
use tokio::signal;

async fn graceful_shutdown(state: Arc<AppState>) {
    let ctrl_c = async {
        signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("Failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            tracing::info!("Received Ctrl+C, starting graceful shutdown");
        }
        _ = terminate => {
            tracing::info!("Received SIGTERM, starting graceful shutdown");
        }
    }

    // 1. Stop accepting new requests
    state.accepting_requests.store(false, Ordering::SeqCst);
    
    // 2. Wait for in-flight requests (max 30 seconds)
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
    while state.active_requests.load(Ordering::SeqCst) > 0 {
        if std::time::Instant::now() > deadline {
            tracing::warn!("Timeout waiting for requests, forcing shutdown");
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    
    // 3. Flush metrics
    if let Some(reporter) = &state.metrics_reporter {
        reporter.flush().await;
    }
    
    // 4. Close database connections
    state.db_pool.close().await;
    
    // 5. Save state if needed
    if let Some(checkpoint) = &state.checkpoint_manager {
        checkpoint.save().await.ok();
    }
    
    tracing::info!("Graceful shutdown complete");
}

#[tokio::main]
async fn main() {
    let state = Arc::new(AppState::new().await);
    let app = create_app(state.clone());
    
    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap();
    
    axum::serve(listener, app)
        .with_graceful_shutdown(graceful_shutdown(state))
        .await
        .unwrap();
}

45.9.19. Final Production Checklist

Before Deploying

  • Logging: JSON structured logs with tracing
  • Metrics: Prometheus endpoint exposed
  • Health checks: /healthz, /readyz endpoints
  • Graceful shutdown: Handle SIGTERM properly
  • Configuration: Layered config (file + env)
  • Secrets: Use Secrets Manager, not env vars
  • Error handling: anyhow + proper context

Deployment

  • Docker: Multi-stage build, distroless base
  • Size: < 50MB final image
  • Cross-compile: Test on target architecture
  • CI/CD: Automated builds with cargo-nextest

Operations

  • Version: Embed git commit in binary (see the build.rs sketch below)
  • Feature flags: Gradual rollout capability
  • Model registry: Track model versions
  • Rollback: Ability to revert quickly

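Embedding the git commit can be done with a build script; a minimal sketch (the GIT_COMMIT variable name is our choice):

// build.rs - embed the current git commit as a compile-time env var
fn main() {
    let commit = std::process::Command::new("git")
        .args(["rev-parse", "--short", "HEAD"])
        .output()
        .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
        .unwrap_or_else(|_| "unknown".into());
    println!("cargo:rustc-env=GIT_COMMIT={commit}");
}

The binary can then report env!("GIT_COMMIT") alongside its version.
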
[End of Section 45.9]