45.9. MLOps Tooling: Building the Platform
Tip
The Mindset: In Python, you write scripts. In Rust, you build Tools. A Python script breaks when you change a CUDA version. A Rust binary works forever. This chapter covers how to build professional-grade CLI tools for MLOps.
45.9.1. The User Interface: clap
Documentation is good. Self-documenting CLIs are better.
clap (Command Line Argument Parser) is the standard.
Defining the CLI
use clap::{Parser, Subcommand, ValueEnum};
#[derive(Parser)]
#[command(name = "ml-platform")]
#[command(about = "The Corporate MLOps CLI", long_about = None)]
struct Cli {
#[command(subcommand)]
command: Commands,
/// Verbosity level
#[arg(short, long, global = true, action = clap::ArgAction::Count)]
verbose: u8,
}
#[derive(Subcommand)]
enum Commands {
/// Train a model on a dataset
Train {
/// Path to dataset
#[arg(short, long)]
dataset: String,
/// Learning Rate
#[arg(long, default_value_t = 0.001)]
lr: f64,
/// Optimizer type
#[arg(long, value_enum, default_value_t = Optimizer::Adam)]
optim: Optimizer,
},
/// Serve a trained model
Serve {
/// Port to bind to
#[arg(short, long, default_value_t = 8080)]
port: u16,
},
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
enum Optimizer {
Adam,
Sgd,
RmsProp,
}
fn main() {
let cli = Cli::parse();
match &cli.command {
Commands::Train { dataset, lr, optim } => {
println!("Training on {} with lr={} optim={:?}", dataset, lr, optim);
}
Commands::Serve { port } => {
println!("Serving on port {}", port);
}
}
}
Why this matters:
Type ml-platform --help. You get a man-page generated for you.
Type ml-platform train --optim foo. You get “error: invalid value ‘foo’”.
This prevents “Config Drift” where colleagues run scripts with invalid arguments.
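As a bonus, the same Cli definition can emit shell completions via the companion clap_complete crate. A minimal sketch (assumes clap_complete is added to Cargo.toml):
use clap::CommandFactory;
use clap_complete::{generate, shells::Bash};

// Print a bash completion script for the CLI defined above to stdout
fn print_completions() {
    let mut cmd = Cli::command();
    generate(Bash, &mut cmd, "ml-platform", &mut std::io::stdout());
}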
45.9.2. Configuration Management: config
Hardcoding paths is bad. Environment variables are better. Layered config is best.
The config crate merges, from lowest to highest precedence:
- config/default.toml
- config/{RUN_MODE}.toml (e.g. config/production.toml)
- environment variables (e.g. APP_DATABASE__URL, matching the APP prefix below)
use config::{Config, File, Environment};
use serde::Deserialize;
#[derive(Debug, Deserialize)]
struct Settings {
database: DatabaseSettings,
s3: S3Settings,
}
#[derive(Debug, Deserialize)]
struct DatabaseSettings {
url: String,
pool_size: u32,
}
#[derive(Debug, Deserialize)]
struct S3Settings {
bucket: String,
region: String,
}
impl Settings {
pub fn new() -> Result<Self, config::ConfigError> {
let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into());
let s = Config::builder()
// Start with defaults
.add_source(File::with_name("config/default"))
// Add environment specific config
.add_source(File::with_name(&format!("config/{}", run_mode)).required(false))
// Add Environment Variables (e.g. APP_DATABASE__URL=...)
.add_source(Environment::with_prefix("APP").separator("__"))
.build()?;
s.try_deserialize()
}
}
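For reference, here is what the loading call looks like at startup, with an assumed config/default.toml shown in comments (the values are placeholders):
// config/default.toml (assumed contents):
//   [database]
//   url = "postgres://localhost/ml"
//   pool_size = 5
//
//   [s3]
//   bucket = "ml-artifacts"
//   region = "us-east-1"

fn main() {
    let settings = Settings::new().expect("Failed to load configuration");
    // RUN_MODE=production merges config/production.toml on top;
    // APP_DATABASE__URL=... overrides database.url last.
    println!("Connecting to {}", settings.database.url);
}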
45.9.3. Terminal UIs (TUI): ratatui
Sometimes you need to monitor training on a remote server via SSH.
Using tqdm is okay. Using a full TUI Dashboard is professional.
Ratatui is the successor to tui-rs.
Designing a Dashboard
use ratatui::{
    backend::Backend,
    layout::{Constraint, Direction, Layout},
    style::{Color, Style},
    symbols,
    widgets::{Block, Borders, Chart, Dataset, Gauge},
    Frame,
};
// `AppState` is assumed to hold `loss_history: Vec<(f64, f64)>` and `gpu_util: u16`
fn draw_ui<B: Backend>(f: &mut Frame<B>, state: &AppState) {
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Percentage(50), Constraint::Percentage(50)].as_ref())
.split(f.size());
// 1. Loss Chart
let datasets = vec![
Dataset::default()
.name("Training Loss")
.marker(symbols::Marker::Braille)
.style(Style::default().fg(Color::Cyan))
.data(&state.loss_history),
];
let chart = Chart::new(datasets)
.block(Block::default().title("Loss").borders(Borders::ALL));
f.render_widget(chart, chunks[0]);
// 2. GPU Utilization Gauge
let gauge = Gauge::default()
.block(Block::default().title("GPU Usage").borders(Borders::ALL))
.gauge_style(Style::default().fg(Color::Red))
.percent(state.gpu_util);
f.render_widget(gauge, chunks[1]);
}
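The draw function needs a host loop: enter the alternate screen, redraw, and poll for input. A minimal sketch using crossterm (the 'q'-to-quit binding and 250ms poll interval are arbitrary choices):
use std::{io, time::Duration};
use crossterm::{
    event::{self, Event, KeyCode},
    terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
    ExecutableCommand,
};
use ratatui::{backend::CrosstermBackend, Terminal};

fn run_dashboard(state: &AppState) -> io::Result<()> {
    enable_raw_mode()?;
    io::stdout().execute(EnterAlternateScreen)?;
    let mut terminal = Terminal::new(CrosstermBackend::new(io::stdout()))?;
    loop {
        // Redraw the dashboard, then poll briefly so new metrics keep flowing in
        terminal.draw(|f| draw_ui(f, state))?;
        if event::poll(Duration::from_millis(250))? {
            if let Event::Key(key) = event::read()? {
                if key.code == KeyCode::Char('q') {
                    break;
                }
            }
        }
    }
    disable_raw_mode()?;
    io::stdout().execute(LeaveAlternateScreen)?;
    Ok(())
}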
45.9.4. Docker Optimization: The 20MB Container
Python containers are huge (1GB+ with PyTorch).
Rust containers can be tiny (scratch images) or small (distroless).
Technique 1: cargo-chef (Layer Caching)
Compiling dependencies takes time, and Docker's layer cache doesn't help by default: COPY . . invalidates the layer whenever src/ changes, so every source edit recompiles every dependency, even though Cargo.toml rarely changes.
cargo-chef computes a “recipe” (the dependency tree) so dependencies can be built and cached in their own layer.
Technique 2: Distroless
Google’s Distroless images contain glibc and SSL certificates, but no shell. Perfect for security.
The Ultimate Dockerfile
# Stage 1: Plan
FROM lukemathwalker/cargo-chef:latest-rust-1.75 AS planner
WORKDIR /app
COPY . .
RUN cargo chef prepare --recipe-path recipe.json
# Stage 2: Cache Dependencies
FROM lukemathwalker/cargo-chef:latest-rust-1.75 AS cacher
WORKDIR /app
COPY --from=planner /app/recipe.json recipe.json
# Build dependencies - this is the caching layer!
RUN cargo chef cook --release --recipe-path recipe.json
# Stage 3: Builder
FROM lukemathwalker/cargo-chef:latest-rust-1.75 AS builder
WORKDIR /app
COPY . .
# Copy compiled dependencies
COPY --from=cacher /app/target target
COPY --from=cacher /usr/local/cargo /usr/local/cargo
RUN cargo build --release --bin ml-platform
# Stage 4: Runtime
# Use 'cc-debian12' for GLIBC compatibility
FROM gcr.io/distroless/cc-debian12
COPY --from=builder /app/target/release/ml-platform /
CMD ["./ml-platform"]
Result: a Docker image in the 20–25MB range that runs your entire ML pipeline.
45.9.5. Cross Compilation: Building for ARM on x86
You develop on Mac (ARM). You deploy to Linux (x86).
In Python, this is fine. In C++, this is hell.
In Rust, we use cross.
# Install Cross
cargo install cross
# Build for Linux x86_64
cross build --target x86_64-unknown-linux-gnu --release
# Build for Raspberry Pi
cross build --target aarch64-unknown-linux-gnu --release
cross uses Docker transparently to provide the toolchain.
45.9.6. CI/CD: Testing and Linting
Rust’s CI is fast if you use nextest.
cargo-nextest runs tests in parallel processes (isolating failures).
The GitHub Actions Workflow
name: Rust CI
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
# Cache Cargo Registry + Target
- uses: Swatinem/rust-cache@v2
# Install Nextest
- uses: taiki-e/install-action@nextest
# Run Tests
- run: cargo nextest run
# Linting (Clippy)
- run: cargo clippy -- -D warnings
# Formatting
- run: cargo fmt --check
45.9.7. Property Based Testing: proptest
Unit tests (assert_eq!(add(2, 2), 4)) are weak.
Property tests (assert_eq!(add(a, b), add(b, a))) are strong.
proptest generates thousands of random inputs trying to break your code.
use proptest::prelude::*;
proptest! {
#[test]
fn test_normalization_invariants(data in prop::collection::vec(0.0f32..10.0f32, 1..100)) {
let normalized = normalize(&data);
// Invariant 1: max is at most 1.0 (within float tolerance)
let max = normalized.iter().fold(f32::NAN, |m, v| v.max(m));
prop_assert!(max <= 1.0 + 1e-6);
// Invariant 2: Length preserved
prop_assert_eq!(data.len(), normalized.len());
}
}
This finds edge cases (Empty vector? NaN? Infinity?) that humans miss.
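For reference, a normalize that satisfies both invariants might look like this (a sketch; the section doesn't fix an implementation):
/// Scale values so the maximum becomes 1.0; all-zero input is returned unchanged.
fn normalize(data: &[f32]) -> Vec<f32> {
    let max = data.iter().cloned().fold(f32::MIN, f32::max);
    if max <= 0.0 {
        return data.to_vec();
    }
    data.iter().map(|v| v / max).collect()
}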
45.9.8. Error Handling Best Practices
Do not use unwrap() outside of tests and quick prototypes.
Do not use a bare String as an error type.
Library Code: thiserror
If you are writing a crate for others (my-ml-lib), use thiserror.
#[derive(thiserror::Error, Debug)]
pub enum ModelError {
#[error("tensor shape mismatch: expected {expected:?}, got {found:?}")]
ShapeMismatch { expected: Vec<usize>, found: Vec<usize> },
#[error("io error")]
Io(#[from] std::io::Error),
}
Application Code: anyhow / eyre
If you are writing the CLI (ml-platform), use anyhow.
It attaches context to errors as they bubble up the call stack.
use anyhow::Context;

fn main() -> anyhow::Result<()> {
    // load_model() is a fallible loader defined elsewhere in the crate
    load_model().context("Failed to load initial model")?;
Ok(())
}
Output:
Error: Failed to load initial model
Caused by: file not found
45.9.9. Release Engineering: cargo-dist
Shipping binaries to users is hard (building MSIs, DEBs, Homebrew taps).
cargo-dist automates this.
It generates a CI workflow that:
- Builds for all platforms.
- Zips them up.
- Creates a GitHub Release.
- Generates a shell installer script.
Run cargo dist init and commit the workflow.
Users can now:
curl --proto '=https' --tlsv1.2 -LsSf https://github.com/myorg/ml-platform/releases/download/v0.1.0/ml-platform-installer.sh | sh
45.9.10. Final Checklist for MLOps Tooling
- Safety: Use clippy to enforce best practices.
- Config: Use the config crate for layered settings.
- Observability: Use tracing for structured logs.
- UI: Use clap for the CLI and ratatui for dashboards.
- Distribution: Use cargo-dist + Distroless Docker images.
45.9.11. Structured Logging with tracing
println! is for scripts. tracing is for production.
It provides structured, contextual logging with spans.
Setting Up Tracing
use tracing::{error, info, span, Instrument, Level};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, fmt};
fn init_tracing() {
tracing_subscriber::registry()
.with(fmt::layer()
.json() // JSON output for log aggregation
.with_target(false)
.with_thread_ids(true))
.init();
}
async fn train_model(config: &TrainConfig) -> Result<Model, Error> {
    // Span for the entire training run. Holding a `span.enter()` guard across
    // an `.await` point is a classic tracing pitfall, so the span is attached
    // with `Instrument::instrument` instead.
    let span = span!(Level::INFO, "train",
        model_name = %config.model_name,
        dataset = %config.dataset_path
    );
    async move {
        info!(learning_rate = config.lr, epochs = config.epochs, "Starting training");
        for epoch in 0..config.epochs {
            let epoch_span = span!(Level::DEBUG, "epoch", epoch);
            let loss = train_epoch(config).instrument(epoch_span).await?;
            info!(loss, "Epoch completed");
            if loss.is_nan() {
                error!(lr = config.lr, "Loss diverged!");
                return Err(Error::TrainingDiverged);
            }
        }
        // `model` is produced by the training loop (construction elided here)
        Ok(model)
    }
    .instrument(span)
    .await
}
Output (JSON Format)
{
"timestamp": "2024-01-15T10:30:45Z",
"level": "INFO",
"span": {"train": {"model_name": "bert-base", "dataset": "squad"}},
"message": "Starting training",
"fields": {"learning_rate": 0.001, "epochs": 10}
}
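In production you usually also want runtime log filtering. A variant of init_tracing using EnvFilter (a sketch; assumes tracing_subscriber's env-filter feature and reuses the imports above):
use tracing_subscriber::EnvFilter;

fn init_tracing_filtered() {
    // Honors RUST_LOG (e.g. RUST_LOG=ml_platform=debug); falls back to "info"
    tracing_subscriber::registry()
        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
        .with(fmt::layer().json())
        .init();
}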
Distributed Tracing with OpenTelemetry
use opentelemetry_otlp::WithExportConfig;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};
fn init_otel_tracing() {
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("http://jaeger:4317")
)
.install_batch(opentelemetry::runtime::Tokio)
.expect("Failed to initialize OTLP pipeline");
tracing_subscriber::registry()
.with(OpenTelemetryLayer::new(tracer))
.with(fmt::layer().json())
.init();
}
Now your logs appear in Jaeger/Grafana Tempo with full trace context!
45.9.12. Metrics with metrics Crate
Beyond logs, you need metrics for dashboards.
use metrics::{counter, gauge, histogram};
use metrics_exporter_prometheus::PrometheusBuilder;
fn init_metrics() {
// Start Prometheus exporter on :9090/metrics
PrometheusBuilder::new()
.with_http_listener(([0, 0, 0, 0], 9090))
.install()
.expect("Failed to install Prometheus recorder");
}
fn record_training_metrics(epoch: u32, loss: f64, lr: f64) {
gauge!("training_epoch").set(epoch as f64);
histogram!("training_loss").record(loss);
gauge!("training_learning_rate").set(lr);
counter!("training_steps_total").increment(1);
}
fn record_inference_metrics(model: &str, latency: std::time::Duration, success: bool) {
let labels = vec![("model", model.to_string())];
histogram!("inference_latency_seconds", &labels)
.record(latency.as_secs_f64());
if success {
counter!("inference_success_total", &labels).increment(1);
} else {
counter!("inference_failure_total", &labels).increment(1);
}
}
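To avoid sprinkling timers through business logic, a small wrapper can centralize the pattern. A hypothetical helper around record_inference_metrics:
// Hypothetical wrapper: time an inference closure and record success/failure.
fn timed_inference<T, E>(model: &str, f: impl FnOnce() -> Result<T, E>) -> Result<T, E> {
    let start = std::time::Instant::now();
    let result = f();
    record_inference_metrics(model, start.elapsed(), result.is_ok());
    result
}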
45.9.13. Model Registry
Track model versions, metadata, and lineage.
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
#[derive(Debug, Serialize, Deserialize)]
pub struct ModelVersion {
pub name: String,
pub version: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub metrics: HashMap<String, f64>,
pub parameters: HashMap<String, serde_json::Value>,
pub artifact_path: PathBuf,
pub git_commit: String,
pub tags: Vec<String>,
}
pub struct ModelRegistry {
storage: Box<dyn ModelStorage>,
}
impl ModelRegistry {
pub async fn register(
&self,
name: &str,
model_path: &Path,
metrics: HashMap<String, f64>,
params: HashMap<String, serde_json::Value>,
) -> Result<ModelVersion, Error> {
// Generate version (semantic or timestamp-based)
let version = self.next_version(name).await?;
// Get git commit
let git_commit = std::process::Command::new("git")
.args(["rev-parse", "HEAD"])
.output()
.map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
.unwrap_or_else(|_| "unknown".to_string());
// Upload artifact
let artifact_path = self.storage.upload(name, &version, model_path).await?;
let model_version = ModelVersion {
name: name.to_string(),
version,
created_at: chrono::Utc::now(),
metrics,
parameters: params,
artifact_path,
git_commit,
tags: vec![],
};
// Store metadata
self.storage.save_metadata(&model_version).await?;
tracing::info!(
name = %model_version.name,
version = %model_version.version,
"Model registered"
);
Ok(model_version)
}
pub async fn load(&self, name: &str, version: &str) -> Result<PathBuf, Error> {
let metadata = self.storage.get_metadata(name, version).await?;
let local_path = self.storage.download(&metadata.artifact_path).await?;
Ok(local_path)
}
pub async fn promote(&self, name: &str, version: &str, stage: &str) -> Result<(), Error> {
// Add tag like "production" or "staging"
self.storage.add_tag(name, version, stage).await
}
pub async fn list_versions(&self, name: &str) -> Result<Vec<ModelVersion>, Error> {
self.storage.list_versions(name).await
}
}
// Storage backends
#[async_trait]
pub trait ModelStorage: Send + Sync {
async fn upload(&self, name: &str, version: &str, path: &Path) -> Result<PathBuf, Error>;
async fn download(&self, path: &PathBuf) -> Result<PathBuf, Error>;
async fn save_metadata(&self, model: &ModelVersion) -> Result<(), Error>;
async fn get_metadata(&self, name: &str, version: &str) -> Result<ModelVersion, Error>;
async fn list_versions(&self, name: &str) -> Result<Vec<ModelVersion>, Error>;
async fn add_tag(&self, name: &str, version: &str, tag: &str) -> Result<(), Error>;
}
// S3 implementation
pub struct S3ModelStorage {
client: aws_sdk_s3::Client,
bucket: String,
}
#[async_trait]
impl ModelStorage for S3ModelStorage {
async fn upload(&self, name: &str, version: &str, path: &Path) -> Result<PathBuf, Error> {
let key = format!("models/{}/{}/model.tar.gz", name, version);
let body = aws_sdk_s3::primitives::ByteStream::from_path(path).await?;
self.client
.put_object()
.bucket(&self.bucket)
.key(&key)
.body(body)
.send()
.await?;
Ok(PathBuf::from(format!("s3://{}/{}", self.bucket, key)))
}
// ... other implementations
}
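Putting the registry to work, a training job might register and promote an artifact like this (a sketch reusing the types above; the model name, path, and metric are placeholders):
async fn publish(registry: &ModelRegistry) -> Result<(), Error> {
    let mut metrics = HashMap::new();
    metrics.insert("val_loss".to_string(), 0.042);
    // Upload the artifact, record metadata, then tag the new version
    let version = registry
        .register("bert-base", Path::new("out/model.tar.gz"), metrics, HashMap::new())
        .await?;
    registry.promote(&version.name, &version.version, "staging").await?;
    Ok(())
}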
45.9.14. Secret Management
Never put API keys in code or environment variables directly.
use aws_sdk_secretsmanager::Client as SecretsClient;
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::collections::HashMap;
pub struct SecretManager {
client: SecretsClient,
cache: tokio::sync::RwLock<HashMap<String, CachedSecret>>,
}
struct CachedSecret {
value: String,
expires_at: std::time::Instant,
}
impl SecretManager {
pub async fn get(&self, secret_name: &str) -> Result<String, Error> {
// Check cache first
{
let cache = self.cache.read().await;
if let Some(cached) = cache.get(secret_name) {
if cached.expires_at > std::time::Instant::now() {
return Ok(cached.value.clone());
}
}
}
// Fetch from AWS Secrets Manager
let response = self.client
.get_secret_value()
.secret_id(secret_name)
.send()
.await?;
let value = response.secret_string()
.ok_or(Error::SecretNotFound)?
.to_string();
// Cache for 5 minutes
let mut cache = self.cache.write().await;
cache.insert(secret_name.to_string(), CachedSecret {
value: value.clone(),
expires_at: std::time::Instant::now() + std::time::Duration::from_secs(300),
});
Ok(value)
}
}
// Usage
async fn connect_database(secrets: &SecretManager) -> Result<PgPool, Error> {
let db_url = secrets.get("prod/database/url").await?;
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(&db_url)
.await?;
Ok(pool)
}
45.9.15. Plugin Architecture
Allow users to extend your CLI.
use libloading::{Library, Symbol};
use std::path::Path;
pub trait Plugin: Send + Sync {
fn name(&self) -> &str;
fn version(&self) -> &str;
fn execute(&self, args: &[String]) -> Result<(), Box<dyn std::error::Error>>;
}
pub struct PluginManager {
plugins: Vec<(Library, Box<dyn Plugin>)>,
}
impl PluginManager {
pub fn load_from_directory(dir: &Path) -> Result<Self, Error> {
let mut plugins = vec![];
for entry in std::fs::read_dir(dir)? {
let path = entry?.path();
if path.extension().map(|e| e == "so" || e == "dylib").unwrap_or(false) {
match Self::load_plugin(&path) {
Ok((lib, plugin)) => {
tracing::info!(
name = plugin.name(),
version = plugin.version(),
"Loaded plugin"
);
plugins.push((lib, plugin));
}
Err(e) => {
tracing::warn!(path = ?path, error = ?e, "Failed to load plugin");
}
}
}
}
Ok(Self { plugins })
}
fn load_plugin(path: &Path) -> Result<(Library, Box<dyn Plugin>), Error> {
    // SAFETY: the plugin must be compiled by the same Rust toolchain as the
    // host, because `Box<dyn Plugin>` has no stable ABI across compilers.
    unsafe {
        let lib = Library::new(path)?;
        let create_fn: Symbol<extern "C" fn() -> Box<dyn Plugin>> =
            lib.get(b"create_plugin")?;
let plugin = create_fn();
Ok((lib, plugin))
}
}
pub fn execute(&self, plugin_name: &str, args: &[String]) -> Result<(), Error> {
for (_, plugin) in &self.plugins {
if plugin.name() == plugin_name {
return plugin.execute(args).map_err(Error::PluginError);
}
}
Err(Error::PluginNotFound(plugin_name.to_string()))
}
}
// Example plugin (separate crate compiled to .so/.dylib)
#[no_mangle]
pub extern "C" fn create_plugin() -> Box<dyn Plugin> {
Box::new(MyCustomPlugin)
}
struct MyCustomPlugin;
impl Plugin for MyCustomPlugin {
fn name(&self) -> &str { "custom-exporter" }
fn version(&self) -> &str { "0.1.0" }
fn execute(&self, args: &[String]) -> Result<(), Box<dyn std::error::Error>> {
// Custom export logic
Ok(())
}
}
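Wiring this into the CLI is then a single call (a sketch; the ./plugins directory is an assumed convention):
fn run_plugin_command(name: &str, args: &[String]) -> Result<(), Error> {
    // Load every .so/.dylib from ./plugins, then dispatch by plugin name
    let manager = PluginManager::load_from_directory(Path::new("./plugins"))?;
    manager.execute(name, args)
}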
45.9.16. Feature Flags
Control rollouts without redeploying.
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeatureFlag {
pub name: String,
pub enabled: bool,
pub percentage: f32, // 0.0-100.0 for gradual rollout
pub user_whitelist: Vec<String>,
}
pub struct FeatureFlagService {
flags: Arc<RwLock<HashMap<String, FeatureFlag>>>,
}
impl FeatureFlagService {
pub async fn is_enabled(&self, flag_name: &str, user_id: Option<&str>) -> bool {
let flags = self.flags.read().await;
if let Some(flag) = flags.get(flag_name) {
// Check whitelist
if let Some(uid) = user_id {
if flag.user_whitelist.contains(&uid.to_string()) {
return true;
}
}
// Check percentage rollout
if flag.percentage >= 100.0 {
return flag.enabled;
}
if flag.percentage > 0.0 {
// Deterministic based on user_id for consistency
if let Some(uid) = user_id {
let hash = fxhash::hash64(uid.as_bytes());
let bucket = (hash % 10000) as f32 / 100.0;
return bucket < flag.percentage;
}
}
flag.enabled
} else {
false
}
}
pub async fn refresh(&self) -> Result<(), Error> {
// Fetch from remote config (e.g., LaunchDarkly, internal service)
let new_flags = fetch_flags_from_remote().await?;
let mut flags = self.flags.write().await;
*flags = new_flags;
Ok(())
}
}
// Usage
async fn serve_model(flags: &FeatureFlagService, user_id: &str) {
let model = if flags.is_enabled("new-model-v2", Some(user_id)).await {
load_model("v2")
} else {
load_model("v1")
};
// ...
}
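Flags go stale unless something refreshes them. A background task sketch (the 30-second interval is arbitrary):
fn spawn_flag_refresh(service: Arc<FeatureFlagService>) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
        loop {
            interval.tick().await;
            // Pull the latest flag set; keep the old one on failure
            if let Err(e) = service.refresh().await {
                tracing::warn!(error = ?e, "Feature flag refresh failed");
            }
        }
    });
}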
45.9.17. Health Checks and Readiness Probes
Production services need proper health endpoints.
use axum::{extract::State, http::StatusCode, routing::get, Json, Router};
use serde::Serialize;
use std::collections::HashMap;
#[derive(Serialize)]
struct HealthResponse {
status: String,
checks: HashMap<String, CheckResult>,
version: String,
uptime_seconds: u64,
}
#[derive(Serialize)]
struct CheckResult {
status: String,
latency_ms: u64,
message: Option<String>,
}
async fn health_check(State(state): State<AppState>) -> (StatusCode, Json<HealthResponse>) {
let mut checks = HashMap::new();
let mut all_healthy = true;
// Database check
let db_start = std::time::Instant::now();
let db_healthy = sqlx::query("SELECT 1")
.fetch_one(&state.db_pool)
.await
.is_ok();
checks.insert("database".to_string(), CheckResult {
status: if db_healthy { "healthy" } else { "unhealthy" }.to_string(),
latency_ms: db_start.elapsed().as_millis() as u64,
message: None,
});
all_healthy &= db_healthy;
// Model loaded check
let model_loaded = state.model.read().await.is_some();
checks.insert("model".to_string(), CheckResult {
status: if model_loaded { "healthy" } else { "unhealthy" }.to_string(),
latency_ms: 0,
message: if model_loaded { None } else { Some("Model not loaded".to_string()) },
});
all_healthy &= model_loaded;
// Redis check
let redis_start = std::time::Instant::now();
let redis_healthy = state.redis.ping().await.is_ok();
checks.insert("redis".to_string(), CheckResult {
status: if redis_healthy { "healthy" } else { "unhealthy" }.to_string(),
latency_ms: redis_start.elapsed().as_millis() as u64,
message: None,
});
all_healthy &= redis_healthy;
let response = HealthResponse {
status: if all_healthy { "healthy" } else { "unhealthy" }.to_string(),
checks,
version: env!("CARGO_PKG_VERSION").to_string(),
uptime_seconds: state.start_time.elapsed().as_secs(),
};
let status = if all_healthy { StatusCode::OK } else { StatusCode::SERVICE_UNAVAILABLE };
(status, Json(response))
}
// Kubernetes-style probes
async fn liveness() -> StatusCode {
// Just check if the process is alive
StatusCode::OK
}
async fn readiness(State(state): State<AppState>) -> StatusCode {
// Check if ready to serve traffic
if state.model.read().await.is_some() && !state.db_pool.is_closed() {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
}
}
fn create_router(state: AppState) -> Router {
Router::new()
.route("/health", get(health_check))
.route("/healthz", get(liveness)) // Kubernetes liveness
.route("/readyz", get(readiness)) // Kubernetes readiness
.with_state(state)
}
45.9.18. Graceful Shutdown
Handle termination signals properly.
use std::sync::{atomic::Ordering, Arc};
use tokio::signal;
async fn graceful_shutdown(state: Arc<AppState>) {
let ctrl_c = async {
signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Failed to install SIGTERM handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {
tracing::info!("Received Ctrl+C, starting graceful shutdown");
}
_ = terminate => {
tracing::info!("Received SIGTERM, starting graceful shutdown");
}
}
// 1. Stop accepting new requests
state.accepting_requests.store(false, Ordering::SeqCst);
// 2. Wait for in-flight requests (max 30 seconds)
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30);
while state.active_requests.load(Ordering::SeqCst) > 0 {
if std::time::Instant::now() > deadline {
tracing::warn!("Timeout waiting for requests, forcing shutdown");
break;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
// 3. Flush metrics
if let Some(reporter) = &state.metrics_reporter {
reporter.flush().await;
}
// 4. Close database connections
state.db_pool.close().await;
// 5. Save state if needed
if let Some(checkpoint) = &state.checkpoint_manager {
checkpoint.save().await.ok();
}
tracing::info!("Graceful shutdown complete");
}
#[tokio::main]
async fn main() {
let state = Arc::new(AppState::new().await);
let app = create_app(state.clone());
let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap();
axum::serve(listener, app)
.with_graceful_shutdown(graceful_shutdown(state))
.await
.unwrap();
}
45.9.19. Final Production Checklist
Before Deploying
- Logging: JSON structured logs with tracing
- Metrics: Prometheus endpoint exposed
- Health checks: /healthz, /readyz endpoints
- Graceful shutdown: Handle SIGTERM properly
- Configuration: Layered config (file + env)
- Secrets: Use Secrets Manager, not env vars
- Error handling: anyhow + proper context
Deployment
- Docker: Multi-stage build, distroless base
- Size: < 50MB final image
- Cross-compile: Test on target architecture
- CI/CD: Automated builds with cargo-nextest
Operations
- Version: Embed git commit in binary
- Feature flags: Gradual rollout capability
- Model registry: Track model versions
- Rollback: Ability to revert quickly
[End of Section 45.9]