38.2. Policy Serving Architecture

Status: Draft | Version: 1.0.0 | Tags: #RLOps, #Serving, #Rust, #gRPC | Author: MLOps Team


Table of Contents

  1. The Stateful Paradox
  2. Project Structure: High-Performance Rust Policy Server
  3. Architecture 1: The Actor-Learner Decomposition
  4. Architecture 2: Inference-Only Serving
  5. Dynamic Batching Middleware
  6. Infrastructure: Kubernetes Deployment
  7. Shadow Mode (Dark Launch)
  8. Canary Deployment Strategy
  9. The Latency Hierarchy
  10. Summary Checklist

Prerequisites

Before diving into this chapter, ensure you have the following installed:

  • Rust: 1.70+ (cargo, rustc)
  • Protobuf: protoc compiler
  • Kubernetes: kubectl and minikube (optional)
  • gRPC Client: grpcurl for testing

The Stateful Paradox

In Supervised Learning, serving is easy: $f(x) \rightarrow y$ is a stateless function. In Reinforcement Learning, serving is hard: $f(s_t, h_t) \rightarrow (a_t, h_{t+1})$, where $h_t$ is the agent's hidden state. Serving is stateful, sequential, and extremely latency-sensitive.

Sticky Sessions vs Stateless

Most ML serving infrastructure (KServe, TorchServe) assumes stateless REST/gRPC calls. Load balancers distribute requests to any available pod.

  • The Problem: If agents use RNNs (LSTMs/Transformers) for memory, the “Hidden State” $h_t$ must be passed from Step 1 to Step 2.
  • Failed Pattern: Client-Side State. Passing $h_t$ over the network (the client sends the state back and forth) causes bandwidth explosion. For a Transformer KV cache, $h_t$ can be megabytes per user.
  • Correct Pattern: Sticky Sessions. Every request for Episode_123 must go to Pod_A, where the state resides. A minimal routing sketch follows below.
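
To make “sticky” concrete, here is a minimal routing sketch (not production code): hash the episode_id onto a fixed list of pod addresses so every step of one episode lands on the pod that holds its state. Real deployments typically use consistent hashing or a long-lived gRPC stream so that scaling the pool does not reshuffle in-flight episodes.

// Sketch only: a naive modulo-hash router. `backends` is assumed to be the
// current list of pod addresses; consistent hashing would survive churn better.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn route(episode_id: &str, backends: &[String]) -> String {
    let mut h = DefaultHasher::new();
    episode_id.hash(&mut h);
    let idx = (h.finish() as usize) % backends.len();
    backends[idx].clone()
}

// route("Episode_123", &pods) returns the same pod on every call while `pods` is unchanged.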

Project Structure: High-Performance Rust Policy Server

To achieve <2ms latency, we use Rust with tonic (gRPC) and tokio.

policy-server/
├── Cargo.toml
├── build.rs
├── proto/
│   └── policy_service.proto
└── src/
    ├── main.rs
    ├── model.rs
    └── server.rs

Cargo.toml:

[package]
name = "policy-server"
version = "0.1.0"
edition = "2021"

[dependencies]
tonic = "0.9"           # gRPC
prost = "0.11"          # Protobuf
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tch = "0.13"            # PyTorch bindings (LibTorch)
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
metrics = "0.21"        # Low latency metrics
anyhow = "1.0"

[build-dependencies]
tonic-build = "0.9"
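
build.rs (listed in the tree above but not shown in the chapter; a minimal version that asks tonic-build to generate the gRPC stubs at compile time):

// build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Generate Rust client/server code for proto/policy_service.proto.
    tonic_build::compile_protos("proto/policy_service.proto")?;
    Ok(())
}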

proto/policy_service.proto:

syntax = "proto3";

package rl.policy.v1;

service PolicyService {
  // Unary call for stateless agents (MLP)
  rpc Predict (Observation) returns (Action);
  
  // Streaming for Stateful/Session-based (LSTM/Transformer)
  // Client opens stream, sends obs_t, receives act_t (keeps connection open)
  rpc PredictStream (stream Observation) returns (stream Action);
}

message Observation {
  string episode_id = 1;
  repeated float features = 2;
  int64 step_index = 3;
  
  // Optional: Client-side state (if strict sticky sessions unavailable)
  // bytes hidden_state = 4; // Warning: High Bandwidth
}

message Action {
  repeated float continuous_actions = 1;
  int32 discrete_action = 2;
  float value_estimate = 3; // For monitoring
  
  // Debug info
  string model_version = 4;
}
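
src/main.rs is also listed in the tree but not shown; the sketch below shows one way the tonic wiring could look. It assumes src/server.rs provides a PolicyServer type (a placeholder name) implementing the generated PolicyService trait, with a hypothetical load constructor.

// src/main.rs -- sketch only. `server::PolicyServer` and its `load` constructor are assumptions.
use tonic::transport::Server;

pub mod pb {
    // Generated by build.rs / tonic-build from proto/policy_service.proto.
    tonic::include_proto!("rl.policy.v1");
}

mod model;
mod server;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "0.0.0.0:50051".parse()?;
    // MODEL_PATH matches the env var set in the Kubernetes manifest later in this chapter.
    let svc = server::PolicyServer::load(std::env::var("MODEL_PATH")?)?;

    Server::builder()
        .add_service(pb::policy_service_server::PolicyServiceServer::new(svc))
        .serve(addr)
        .await?;

    Ok(())
}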

Architecture 1: The Actor-Learner Decomposition (Training)

During training (e.g., PPO/DQN), “Serving” means generating experience. We decouple the system into:

+-----------+       +-----------+       +-------------+
|  Actor 1  | ----> |  Learner  | <---- |   Actor N   |
|   (CPU)   |       |   (GPU)   |       |    (CPU)    |
+-----------+       +-----------+       +-------------+
      ^                   |                    ^
      |                   v                    |
      +------------ Parameter Server ----------+

  1. Actors (CPU): Interact with the Environment. Lightweight.
  2. Learner (GPU): Batches trajectories, computes Gradients, updates Weights.
  3. Parameter Server: Broadcasts new weights from Learner to Actors.

Rust Implementation: Async Actor

Using tokio to handle the environment loop asynchronously. This mimics the Ray/RLlib actor-learner architecture, but with low-overhead Rust channels.

use tokio::sync::{mpsc, watch};
use crate::model::Weights;

struct Trajectory {
    obs: Vec<Vec<f32>>,
    actions: Vec<i32>,
    rewards: Vec<f32>,
}

struct Actor {
    id: usize,
    env: Box<dyn Environment>, // The Simulator
    policy_net: PolicyNetwork, // Local copy of weights
    experience_sender: mpsc::Sender<Trajectory>,
    weights_receiver: watch::Receiver<Weights>, // For weight sync
}

impl Actor {
    pub async fn run(&mut self) {
        loop {
            // 1. Sync Weights (Optimistic)
            // If new weights are available, grab them. If not, preserve old weights.
            if self.weights_receiver.has_changed().unwrap_or(false) {
                let new_weights = self.weights_receiver.borrow_and_update();
                self.policy_net.update(&new_weights);
            }

            // 2. Interact loop (The Rollout)
            let mut obs = self.env.reset();
            let mut trajectory = Trajectory::new();
            let mut done = false;
            
            while !done {
                // Inference (Forward Pass)
                // This typically runs on CPU or localized inference accelerator
                let action = self.policy_net.predict(&obs);
                
                let (next_obs, reward, is_done) = self.env.step(action);
                
                trajectory.push(obs, action, reward);
                
                obs = next_obs;
                done = is_done;
            }

            // 3. Send Experience to Learner
            if self.experience_sender.send(trajectory).await.is_err() {
                eprintln!("Actor {}: Learner is dead, exiting.", self.id);
                break;
            }
        }
    }
}
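
For completeness, a minimal sketch of the Learner side of the same channels: it drains trajectories from the mpsc receiver and broadcasts fresh weights through the watch channel every Actor subscribes to. The train_on and weights methods on PolicyNetwork are placeholders for the real optimizer step (e.g. a PPO update via tch).

// Sketch only; reuses Trajectory, Weights, PolicyNetwork and the channel imports from the Actor code above.
async fn learner_loop(
    mut experience_rx: mpsc::Receiver<Trajectory>,
    weights_tx: watch::Sender<Weights>,
    mut learner_net: PolicyNetwork,
) {
    let batch_size = 32;
    let mut batch = Vec::with_capacity(batch_size);

    while let Some(traj) = experience_rx.recv().await {
        batch.push(traj);
        if batch.len() >= batch_size {
            // Gradient step on the collected trajectories (details elided).
            learner_net.train_on(std::mem::take(&mut batch));
            // Broadcast the updated weights; Actors pick them up on their next loop iteration.
            let _ = weights_tx.send(learner_net.weights());
        }
    }
}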

Architecture 2: Inference-Only Serving (Deployment)

When deploying to a Robot or an HFT Trading Engine, the “Learner” is gone. The weights are frozen. Constraints:

  • Latency: Critical. A 100ms lag might crash the drone; in HFT, 1ms is an eternity.
  • Reliability: What if the Neural Network outputs NaN? (A guard for this case is sketched after the Safety Cage code below.)

The “Safety Cage” Pattern

Do not connect the Policy Network directly to the Actuators. Wrap it in a hard-coded Safety Layer (The “Lizard Brain”).

struct SafetyCage {
    policy: PolicyModel,
    config: SafetyConfig,
}

struct SafetyConfig {
    min_altitude: f64,
    max_throttle: f64,
}

impl SafetyCage {
    fn act(&self, state: &State) -> Action {
        // 1. Policy Inference (The "Cortex")
        let proposed = self.policy.predict(state);
        
        // 2. Safety Interlock (The "Reflex")
        let mut final_action = proposed;
        
        // Constraint: Anti-Crash Logic
        // If altitude is critical, ignore policy and apply Full Throttle
        if state.altitude < self.config.min_altitude {
            println!("SAFETY INTERVENE: Low Altitude Recovery");
            final_action.throttle = 1.0; 
            final_action.pitch = 0.0;
        }
        
        // Constraint: Hardware Limit
        if final_action.throttle > self.config.max_throttle {
            final_action.throttle = self.config.max_throttle;
        }
        
        final_action
    }
}
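
The Reliability bullet above asked what happens if the network outputs NaN; the cage as written does not check for that. One possible extension, assuming Action exposes throttle and pitch as floats, is to sanitize the proposal before the interlocks in act run (the 0.5 fallback throttle is an arbitrary “hold steady” value, not a recommendation):

impl SafetyCage {
    // Sketch: replace non-finite network outputs with a neutral fallback
    // before the altitude/throttle interlocks are applied.
    fn sanitize(&self, mut proposed: Action) -> Action {
        if !proposed.throttle.is_finite() || !proposed.pitch.is_finite() {
            println!("SAFETY INTERVENE: Non-finite policy output, using fallback");
            proposed.throttle = 0.5; // assumed "hold steady" value
            proposed.pitch = 0.0;
        }
        proposed
    }
}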

Dynamic Batching Middleware

In high-throughput serving (e.g., Ad Bidding with Contextual Bandits), processing requests one-by-one wastes the hardware: matrix multiplication only becomes efficient at batch sizes greater than one. We need Dynamic Batching: wait up to 5ms to collect up to 64 requests, then run one big matrix multiply.

Rust Implementation: The Batcher

use tokio::sync::{oneshot, mpsc};
use tokio::time::Duration;

struct Request {
    input: Vec<f32>,
    response_tx: oneshot::Sender<f32>,
}

struct Batcher {
    queue_tx: mpsc::Sender<Request>,
}

impl Batcher {
    // Background Task
    async fn run_loop(mut queue_rx: mpsc::Receiver<Request>) {
        let mut batch = Vec::new();
        let max_batch_size = 64;
        let timeout = Duration::from_millis(5);
        
        loop {
            // Collect batch with timeout
            match tokio::time::timeout(timeout, queue_rx.recv()).await {
                Ok(Some(req)) => {
                    batch.push(req);
                    if batch.len() >= max_batch_size {
                        let full_batch = std::mem::take(&mut batch);
                        process_batch(full_batch).await;
                    }
                }
                Err(_) => {
                    // Timeout hit, process whatever we have
                    if !batch.is_empty() {
                        let partial_batch = std::mem::take(&mut batch);
                        process_batch(partial_batch).await;
                    }
                }
                Ok(None) => break, // Channel closed
            }
        }
    }
}

async fn process_batch(mut batch: Vec<Request>) {
    // 1. Stack inputs into Tensor (BatchSize, InputDim)
    // 2. Run Model Inference (Simulated)
    // 3. Send results back via oneshot channels
    for req in batch.drain(..) {
        let _ = req.response_tx.send(0.99);
    }
}
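
The caller side of the Batcher is not shown above: a request handler creates a oneshot channel, enqueues the request, and awaits the batched result. A sketch (the submit helper is an assumption, not part of the code above):

// Sketch only; uses the Batcher, Request and oneshot imports from the block above.
impl Batcher {
    async fn submit(&self, input: Vec<f32>) -> Result<f32, &'static str> {
        let (tx, rx) = oneshot::channel();
        self.queue_tx
            .send(Request { input, response_tx: tx })
            .await
            .map_err(|_| "batcher task has shut down")?;
        rx.await.map_err(|_| "batch was dropped before responding")
    }
}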

Infrastructure: Kubernetes Deployment

For standard serving, we use K8s. For HFT, we use bare metal.

deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: policy-server
spec:
  replicas: 10
  selector:
    matchLabels:
      app: policy-server
  template:
    metadata:
      labels:
        app: policy-server
    spec:
      containers:
      - name: policy-server
        image: my-policy-server:v1
        ports:
        - containerPort: 50051
        resources:
          limits:
            nvidia.com/gpu: 1
        env:
        - name: MODEL_PATH
          value: "/models/v1.pt"
        volumeMounts:
        - name: model-volume
          mountPath: /models
      volumes:
      - name: model-volume
        emptyDir: {}

Shadow Mode (Dark Launch)

How do you deploy a new RL policy without risking the robot? Shadow Mode:

  1. Run Old_Policy connected to the motors.
  2. Run New_Policy in parallel, receiving the same observations.
  3. Log New_Policy actions but do not execute them.
  4. Offline Analysis: “If we had executed New_Policy, would it have crashed?”

Rust Implementation:

// Assumes scalar actions so the divergence check below is a simple abs().
fn step_shadow(
    production_policy: &PolicyModel,
    experimental_policy: &PolicyModel,
    state: &State,
) -> Action {
    let safe_action = production_policy.predict(state);
    let risky_action = experimental_policy.predict(state);

    // Log divergence for offline analysis
    if (safe_action - risky_action).abs() > 0.1 {
        log_divergence(state, safe_action, risky_action);
    }

    // Execute only the production (safe) action; the risky one is merely recorded
    safe_action
}

The Latency Hierarchy

In HFT or Robotics, your specific budget determines the architecture.

Tier         Latency    Technology                    Typical Use Case
Micro        < 10µs     FPGA / ASIC                   High Frequency Trading (Market Making)
Embedded     < 1ms      Embedded Rust/C++ (no_std)    Drone Flight Controller / ABS Brakes
Near-RT      < 20ms     Local Server (Rust/gRPC)      Industrial Robotics Arms
Interactive  < 200ms    Cloud API (Python/FastAPI)    Recommender Systems / Chatbots

Summary Checklist

  1. Latency Test: Measure P99 latency. Ideally inference takes < 20% of the control-loop period (e.g., a 50Hz loop has a 20ms period, so inference must stay under 4ms). A measurement sketch follows after this checklist.
  2. Sticky Sessions: Ensure stateful RNNs use sticky routing or pass state explicitly.
  3. Safety Cage: Never deploy a neural net directly to motors without a hard-coded clamp layer.
  4. Obs Normalization: Export your running mean/std stats alongside model weights. Evaluating without them is a common bug.
  5. Fallback: If the model server times out, does the robot fail gracefully (hover/stop) or crash?
  6. Shadow Mode: Always shadow a new policy for 24h before enabling actuators.
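
A minimal sketch for item 1: time each forward pass with std::time and record it through the metrics crate already listed in Cargo.toml (the 0.21 macro form is assumed); P99 then comes from whatever exporter backs the recorder. PolicyModel and State are the same placeholder types used in the Safety Cage example.

use std::time::Instant;

fn timed_predict(policy: &PolicyModel, state: &State) -> Action {
    let start = Instant::now();
    let action = policy.predict(state);
    // Record seconds as a histogram; the metrics exporter computes P50/P99.
    metrics::histogram!("policy_inference_seconds", start.elapsed().as_secs_f64());
    action
}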