38.2. Policy Serving Architecture
Status: Draft | Version: 1.0.0 | Tags: #RLOps, #Serving, #Rust, #gRPC | Author: MLOps Team
Table of Contents
- The Stateful Paradox
- Project Structure: High-Performance Rust Policy Server
- Architecture 1: The Actor-Learner Decomposition
- Architecture 2: Inference-Only Serving
- Dynamic Batching Middleware
- Infrastructure: Kubernetes Deployment
- Shadow Mode (Dark Launch)
- Canary Deployment Strategy
- The Latency Hierarchy
- Summary Checklist
Prerequisites
Before diving into this chapter, ensure you have the following installed:
- Rust: 1.70+ (`cargo`, `rustc`)
- Protobuf: the `protoc` compiler
- Kubernetes: `kubectl` and `minikube` (optional)
- gRPC Client: `grpcurl` for testing
The Stateful Paradox
In Supervised Learning, serving is easy: f(x) -> y. It’s a stateless function.
In Reinforcement Learning, serving is hard: f(state_t, hidden_t) -> (action_t, hidden_{t+1}).
It is Stateful, Sequential, and extremely Latency Sensitive.
Sticky Sessions vs Stateless
Most ML serving infrastructure (KServe, TorchServe) assumes stateless REST/gRPC calls. Load balancers distribute requests to any available pod.
- The Problem: If Agents use RNNs (LSTMs/Transformers) for memory, the “Hidden State” $h_t$ must be carried from one step to the next.
- Failed Pattern: Client-Side State. Passing $h_t$ over the network (Client sends state back and forth) causes bandwidth explosion. For a Transformer KV cache, $h_t$ can be megabytes per user.
- Correct Pattern: Sticky Sessions. The request for `Episode_123` must always go to `Pod_A`, where the state resides (a minimal routing sketch follows below).
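What sticky routing can look like at the client or gateway is sketched below, assuming a hypothetical static list of pod addresses; a production setup would use consistent hashing or a session table so that scaling events do not remap live episodes.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Deterministically map an episode to one of N backends so that every
/// request for that episode lands on the same pod.
fn route_episode(episode_id: &str, backends: &[String]) -> String {
    let mut hasher = DefaultHasher::new();
    episode_id.hash(&mut hasher);
    let idx = (hasher.finish() as usize) % backends.len();
    backends[idx].clone()
}

fn main() {
    // Hypothetical pod addresses; in production these come from service discovery.
    let backends = vec![
        "pod-a:50051".to_string(),
        "pod-b:50051".to_string(),
        "pod-c:50051".to_string(),
    ];
    // Every step of Episode_123 resolves to the same backend.
    assert_eq!(
        route_episode("Episode_123", &backends),
        route_episode("Episode_123", &backends)
    );
}
```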
Project Structure: High-Performance Rust Policy Server
To achieve <2ms latency, we use Rust with tonic (gRPC) and tokio.
policy-server/
├── Cargo.toml
├── build.rs
├── proto/
│ └── policy_service.proto
└── src/
├── main.rs
├── model.rs
└── server.rs
Cargo.toml:
[package]
name = "policy-server"
version = "0.1.0"
edition = "2021"
[dependencies]
tonic = "0.9" # gRPC
prost = "0.11" # Protobuf
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tch = "0.13" # PyTorch bindings (LibTorch)
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
metrics = "0.21" # Low latency metrics
anyhow = "1.0"
[build-dependencies]
tonic-build = "0.9"
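The `build.rs` listed in the project tree only needs to invoke tonic-build; a minimal version looks like this:

```rust
// build.rs — generate the Rust gRPC stubs from the proto at compile time.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/policy_service.proto")?;
    Ok(())
}
```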
proto/policy_service.proto:
syntax = "proto3";
package rl.policy.v1;
service PolicyService {
// Unary call for stateless agents (MLP)
rpc Predict (Observation) returns (Action);
// Streaming for Stateful/Session-based (LSTM/Transformer)
// Client opens stream, sends obs_t, receives act_t (keeps connection open)
rpc PredictStream (stream Observation) returns (stream Action);
}
message Observation {
string episode_id = 1;
repeated float features = 2;
int64 step_index = 3;
// Optional: Client-side state (if strict sticky sessions unavailable)
// bytes hidden_state = 4; // Warning: High Bandwidth
}
message Action {
repeated float continuous_actions = 1;
int32 discrete_action = 2;
float value_estimate = 3; // For monitoring
// Debug info
string model_version = 4;
}
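`src/server.rs` implements the generated `policy_service_server::PolicyService` trait (`predict` and `predict_stream`) on top of a model wrapper. Below is a minimal sketch of that wrapper in `src/model.rs`, assuming the policy was exported as TorchScript and loaded with `tch`; the `Model` name and single-output layout are illustrative, not prescribed by the proto.

```rust
// src/model.rs — a sketch: load a TorchScript policy and run one forward pass.
use anyhow::Result;
use tch::{CModule, Device, Tensor};

pub struct Model {
    module: CModule,
}

impl Model {
    pub fn load(path: &str) -> Result<Self> {
        // Frozen TorchScript export of the trained policy.
        let module = CModule::load_on_device(path, Device::Cpu)?;
        Ok(Self { module })
    }

    /// Forward pass for a single observation vector; returns the raw action head.
    pub fn forward(&self, features: &[f32]) -> Result<Vec<f32>> {
        let input = Tensor::of_slice(features).unsqueeze(0); // shape (1, obs_dim)
        let output = self.module.forward_ts(&[input])?.squeeze(); // shape (act_dim)
        let n = output.size1()? as usize;
        let mut actions = Vec::with_capacity(n);
        for i in 0..n {
            actions.push(output.double_value(&[i as i64]) as f32);
        }
        Ok(actions)
    }
}
```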
Architecture 1: The Actor-Learner Decomposition (Training)
During training (e.g., PPO/DQN), “Serving” means generating experience. We decouple the system into:
+-----------+       +-----------+       +-------------+
|  Actor 1  | ----> |  Learner  | <---- |   Actor N   |
|   (CPU)   |       |   (GPU)   |       |    (CPU)    |
+-----------+       +-----------+       +-------------+
      ^                   |                    ^
      |                   v                    |
      +------------ Parameter Server ----------+
- Actors (CPU): Interact with the Environment. Lightweight.
- Learner (GPU): Batches trajectories, computes Gradients, updates Weights.
- Parameter Server: Broadcasts new weights from Learner to Actors.
Rust Implementation: Async Actor
Using tokio to handle the environment loop asynchronously. This mimics the Ray RLlib architecture but with zero-overhead Rust channels.
#![allow(unused)]
fn main() {
use tokio::sync::{mpsc, watch};
use crate::model::Weights;
struct Trajectory {
    obs: Vec<Vec<f32>>,
    actions: Vec<i32>,
    rewards: Vec<f32>,
}

impl Trajectory {
    fn new() -> Self {
        Self { obs: Vec::new(), actions: Vec::new(), rewards: Vec::new() }
    }

    fn push(&mut self, obs: Vec<f32>, action: i32, reward: f32) {
        self.obs.push(obs);
        self.actions.push(action);
        self.rewards.push(reward);
    }
}
struct Actor {
id: usize,
env: Box<dyn Environment>, // The Simulator
policy_net: PolicyNetwork, // Local copy of weights
experience_sender: mpsc::Sender<Trajectory>,
weights_receiver: watch::Receiver<Weights>, // For weight sync
}
impl Actor {
pub async fn run(&mut self) {
loop {
// 1. Sync Weights (Optimistic)
// If new weights are available, grab them. If not, preserve old weights.
if self.weights_receiver.has_changed().unwrap_or(false) {
let new_weights = self.weights_receiver.borrow_and_update();
self.policy_net.update(&new_weights);
}
// 2. Interact loop (The Rollout)
let mut obs = self.env.reset();
let mut trajectory = Trajectory::new();
let mut done = false;
while !done {
// Inference (Forward Pass)
// This typically runs on CPU or localized inference accelerator
let action = self.policy_net.predict(&obs);
let (next_obs, reward, is_done) = self.env.step(action);
trajectory.push(obs, action, reward);
obs = next_obs;
done = is_done;
}
// 3. Send Experience to Learner
if let Err(e) = self.experience_sender.send(trajectory).await {
    eprintln!("Actor {}: Learner channel closed ({e}); exiting.", self.id);
    break;
}
}
}
}
}
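The Learner side of this wiring is symmetric: drain trajectories from the actors, run a gradient step, and broadcast fresh weights through the watch channel. A minimal sketch, where `Learner` and its `train_on` method are illustrative placeholders for the GPU-side PPO/DQN update:

```rust
use tokio::sync::{mpsc, watch};

// Consume trajectories from all actors, periodically broadcast fresh weights.
async fn learner_loop(
    mut experience_rx: mpsc::Receiver<Trajectory>,
    weights_tx: watch::Sender<Weights>,
    mut learner: Learner, // hypothetical GPU-side trainer
) {
    let batch_size = 32;
    let mut buffer = Vec::with_capacity(batch_size);

    while let Some(traj) = experience_rx.recv().await {
        buffer.push(traj);
        if buffer.len() >= batch_size {
            // One gradient update on the collected trajectories (illustrative API).
            let new_weights = learner.train_on(std::mem::take(&mut buffer));
            // Acts as the "Parameter Server": every Actor's watch::Receiver
            // observes this update on its next has_changed() check.
            let _ = weights_tx.send(new_weights);
        }
    }
}
```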
Architecture 2: Inference-Only Serving (Deployment)
When deploying to a Robot or an HFT Trading Engine, the “Learner” is gone. The weights are frozen. Constraints:
- Latency: Critical. 100ms lag might crash the drone. In HFT, 1ms is eternity.
- Reliability: What if the Neural Network outputs NaN? (A guard for this is sketched after the Safety Cage below.)
The “Safety Cage” Pattern
Do not connect the Policy Network directly to the Actuators. Wrap it in a hard-coded Safety Layer (The “Lizard Brain”).
#![allow(unused)]
fn main() {
struct SafetyCage {
policy: PolicyModel,
config: SafetyConfig,
}
struct SafetyConfig {
min_altitude: f64,
max_throttle: f64,
}
impl SafetyCage {
fn act(&self, state: &State) -> Action {
// 1. Policy Inference (The "Cortex")
let proposed = self.policy.predict(state);
// 2. Safety Interlock (The "Reflex")
let mut final_action = proposed;
// Constraint: Anti-Crash Logic
// If altitude is critical, ignore policy and apply Full Throttle
if state.altitude < self.config.min_altitude {
println!("SAFETY INTERVENE: Low Altitude Recovery");
final_action.throttle = 1.0;
final_action.pitch = 0.0;
}
// Constraint: Hardware Limit
if final_action.throttle > self.config.max_throttle {
final_action.throttle = self.config.max_throttle;
}
final_action
}
}
}
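The NaN question raised above belongs in this same layer. A minimal guard, assuming (as in the cage above) that `Action` exposes its components as plain floats and that a hard-coded fallback action is available:

```rust
impl SafetyCage {
    /// Reject non-finite policy outputs before they ever reach the actuators.
    fn sanitize(&self, proposed: Action, fallback: Action) -> Action {
        if proposed.throttle.is_finite() && proposed.pitch.is_finite() {
            proposed
        } else {
            println!("SAFETY INTERVENE: non-finite policy output, using fallback");
            fallback
        }
    }
}
```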
Dynamic Batching Middleware
In high-throughput serving (e.g., Ad Bidding with Contextual Bandits), processing requests one-by-one under-utilizes the hardware's matrix-multiplication throughput. We need Dynamic Batching: wait up to 5 ms to collect up to 64 requests, then run one big matrix multiply.
Rust Implementation: The Batcher
#![allow(unused)]
fn main() {
use tokio::sync::{oneshot, mpsc};
use tokio::time::{sleep, Duration};
struct Request {
input: Vec<f32>,
response_tx: oneshot::Sender<f32>,
}
struct Batcher {
queue_tx: mpsc::Sender<Request>,
}
impl Batcher {
// Background Task
async fn run_loop(mut queue_rx: mpsc::Receiver<Request>) {
let mut batch = Vec::new();
let max_batch_size = 64;
let timeout = Duration::from_millis(5);
loop {
// Collect batch with timeout
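// Note: this timeout restarts on every recv, so under a steady trickle a
// partial batch can wait longer than 5 ms before flushing; a production
// batcher would track a deadline from the first queued request instead.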
match tokio::time::timeout(timeout, queue_rx.recv()).await {
Ok(Some(req)) => {
batch.push(req);
if batch.len() >= max_batch_size {
let full_batch = std::mem::take(&mut batch);
process_batch(full_batch).await;
}
}
Err(_) => {
// Timeout hit, process whatever we have
if !batch.is_empty() {
let partial_batch = std::mem::take(&mut batch);
process_batch(partial_batch).await;
}
}
Ok(None) => break, // Channel closed
}
}
}
}
async fn process_batch(mut batch: Vec<Request>) {
// 1. Stack inputs into Tensor (BatchSize, InputDim)
// 2. Run Model Inference (Simulated)
// 3. Send results back via oneshot channels
for req in batch.drain(..) {
let _ = req.response_tx.send(0.99);
}
}
}
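On the handler side, using the batcher is just a channel round-trip. A sketch, adding an illustrative `submit` method that is not part of the listing above:

```rust
impl Batcher {
    /// Called from each gRPC handler: enqueue one input and await its result.
    async fn submit(&self, input: Vec<f32>) -> Option<f32> {
        let (response_tx, response_rx) = oneshot::channel();
        // Hand the request to the background batching loop.
        self.queue_tx.send(Request { input, response_tx }).await.ok()?;
        // Wait for process_batch to fan the result back out.
        response_rx.await.ok()
    }
}
```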
Infrastructure: Kubernetes Deployment
For standard serving, we use K8s. For HFT, we use bare metal.
deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: policy-server
spec:
  replicas: 10
  selector:
    matchLabels:
      app: policy-server
  template:
    metadata:
      labels:
        app: policy-server
    spec:
      containers:
        - name: policy-server
          image: my-policy-server:v1
          ports:
            - containerPort: 50051
          resources:
            limits:
              nvidia.com/gpu: 1
          env:
            - name: MODEL_PATH
              value: "/models/v1.pt"
          volumeMounts:
            - name: model-volume
              mountPath: /models
      volumes:
        # Note: emptyDir starts empty; in practice the model is pulled in by an
        # initContainer or mounted from a PersistentVolumeClaim.
        - name: model-volume
          emptyDir: {}
Shadow Mode (Dark Launch)
How do you deploy a new RL policy without risking the robot? Shadow Mode:
- Run `Old_Policy` connected to the motors.
- Run `New_Policy` in parallel, receiving the same observations.
- Log `New_Policy` actions but do not execute them.
- Offline Analysis: “If we had executed `New_Policy`, would it have crashed?”
Rust Implementation:
#![allow(unused)]
fn main() {
fn step_shadow(
    state: &State,
    production_policy: &PolicyModel,
    experimental_policy: &PolicyModel,
) -> Action {
    let safe_action = production_policy.predict(state);
    let risky_action = experimental_policy.predict(state);
    // Log divergence between the two policies (scalar action assumed here)
    if (safe_action - risky_action).abs() > 0.1 {
        log_divergence(state, safe_action, risky_action);
    }
    // Only the production policy's action is ever executed
    safe_action
}
}
The Latency Hierarchy
In HFT or Robotics, your latency budget determines the architecture.
| Tier | Latency | Technology | Typical Use Case |
|---|---|---|---|
| Micro | < 10µs | FPGA / ASIC | High Frequency Trading (Market Making) |
| Embedded | < 1ms | Embedded Rust/C++ (no_std) | Drone Flight Controller / ABS Brakes |
| Near-RT | < 20ms | Local Server (Rust/gRPC) | Industrial robot arms |
| Interactive | < 200ms | Cloud API (Python/FastAPI) | Recommender Systems / Chatbots |
Summary Checklist
- Latency Test: Measure P99 latency. Ideally, inference should take < 20% of the control loop period (e.g., a 50 Hz loop has a 20 ms period, so inference must be < 4 ms).
- Sticky Sessions: Ensure stateful RNNs use sticky routing or pass state explicitly.
- Safety Cage: Never deploy a neural net directly to motors without a hard-coded clamp layer.
- Obs Normalization: Export your running mean/std stats alongside the model weights; evaluating without them is a common bug (see the sketch below).
- Fallback: If the model server times out, does the robot fail gracefully (hover/stop) or crash?
- Shadow Mode: Always shadow a new policy for 24h before enabling actuators.
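A minimal sketch of the normalization point above, assuming the running statistics were exported next to the weights and must be applied exactly as during training:

```rust
/// Observation normalization statistics exported from training.
/// Serving must apply the exact same transform the policy saw during training.
struct ObsNormalizer {
    mean: Vec<f32>,
    std: Vec<f32>,
}

impl ObsNormalizer {
    fn normalize(&self, obs: &[f32]) -> Vec<f32> {
        obs.iter()
            .zip(self.mean.iter().zip(self.std.iter()))
            .map(|(x, (m, s))| (x - m) / s.max(1e-8)) // guard against zero std
            .collect()
    }
}
```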