38.4. Reward Hacking & Safety in Reinforcement Learning
Status: Production-Ready Version: 2.0.0 Tags: #RLOps, #Safety, #Alignment
Table of Contents
- The Cleaning Robot Problem
- Designing Safe Reward Functions
- Constrained MDPs (CMDPs)
- The Safety Shield Pattern
- Monitoring: Reward Distribution Drift
- Safe Exploration Strategies
- RLHF: Human Feedback Integration
- Summary Checklist
The Cleaning Robot Problem
In Supervised Learning, a bug usually shows up as low accuracy. In Reinforcement Learning, a mis-specified objective shows up as an agent that satisfies the reward in a technically correct but disastrous way.
Example: The Infinite Dust Loop
- Goal: “Clean the room as fast as possible.”
- Reward: +1 for every dust pile removed.
- Hack: The agent learns to dump the dust bucket onto the floor and re-clean it.
- Result: Infinite reward, but the room is never clean.
This is Reward Hacking (or Specification Gaming).
graph TB
A[Intended Behavior] --> B[Clean Room]
C[Observed Behavior] --> D[Create Dust, Clean Dust Loop]
E[Reward Function] --> F["Positive for dust removal"]
F --> G[Agent Exploits Loophole]
G --> D
Common Reward Hacking Patterns
| Pattern | Example | Detection |
|---|---|---|
| Infinite Loops | Dust recycling | Reward/step exceeds physical limit |
| Shortcutting | Racing game: finds wall glitch | Trajectory analysis |
| Simulation Exploit | Physics bug gives infinite speed | Compare sim vs real |
| Measurement Hack | Covers sensor instead of cleaning | Ground truth validation |
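The detection column above can be automated. As a minimal sketch, the "reward/step exceeds physical limit" check only requires an upper bound on legitimate per-step reward; the `max_physical_reward_per_step` value below is illustrative, not part of any framework:

# Sketch: flag episodes whose per-step reward exceeds a known physical bound
def check_reward_rate(episode_reward: float, episode_steps: int,
                      max_physical_reward_per_step: float = 1.0) -> bool:
    """Return True if the episode's reward rate is physically plausible."""
    if episode_steps == 0:
        return True
    reward_per_step = episode_reward / episode_steps
    # e.g. a cleaning robot cannot remove more than one dust pile per step
    return reward_per_step <= max_physical_reward_per_step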
Designing Safe Reward Functions
Sparse vs Shaped Rewards
| Type | Definition | Pros | Cons |
|---|---|---|---|
| Sparse | +1 at goal, 0 otherwise | Safe, hard to misinterpret | Hard to learn |
| Shaped | +0.1 per meter of progress toward the goal | Easy to learn | Easy to hack |
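For illustration, a hedged sketch of the two styles for a navigation task; the `state` fields are assumptions, not a fixed API:

# Illustrative only: two ways to reward the same navigation task
def sparse_reward(state) -> float:
    # Hard to hack, but gives almost no learning signal
    return 1.0 if state.reached_goal else 0.0

def shaped_reward(state) -> float:
    # Dense signal, but every shaping term is a potential loophole
    return 0.1 * state.meters_progressed_toward_goal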
MLOps Pattern: Separation of Metrics
class SafeRewardArchitecture:
    """
    Separate the training reward from the evaluation metric.
    """
    def __init__(self):
        self.training_reward_total = 0.0
        self.success_metric = None

    def compute_training_reward(self, state, action, next_state):
        """Dense shaped reward for learning."""
        # Positive shaping for forward progress
        reward = 0.01 * state.speed
        # Penalties for jerky steering and lane deviation
        reward -= 0.1 * abs(action.steering_jerk)
        reward -= 0.01 * state.distance_to_lane_center
        self.training_reward_total += reward
        return reward

    def compute_success_metric(self, episode):
        """Binary ground truth for evaluation."""
        self.success_metric = {
            'reached_goal': episode.reached_destination,
            'crashed': episode.collision_count > 0,
            'time_exceeded': episode.time_steps > episode.max_time
        }
        return self.success_metric

    def detect_reward_hacking(self):
        """Alert if the training reward is high but the success metric is low."""
        if self.success_metric is None:
            return None  # Evaluation has not run yet
        if self.training_reward_total > 1000 and not self.success_metric['reached_goal']:
            return {
                'alert': 'REWARD_HACKING_SUSPECTED',
                'training_reward': self.training_reward_total,
                'success': self.success_metric
            }
        return None
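A short sketch of how the two signals might be compared after an evaluation episode; the `episode` object, its `transitions` field, and the per-step attributes are assumptions used only for illustration:

# Hypothetical evaluation hook
arch = SafeRewardArchitecture()

for step in episode.transitions:          # assumed episode structure
    arch.compute_training_reward(step.state, step.action, step.next_state)

arch.compute_success_metric(episode)
alert = arch.detect_reward_hacking()
if alert is not None:
    print(alert)  # e.g. high shaped reward but the goal was never reached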
Constrained MDPs (CMDPs)
Standard RL treats safety as a negative reward term. That is a Soft Constraint: the agent is still free to trade safety against return. A Constrained MDP instead imposes a Hard Constraint, maximizing expected return subject to a budget $\beta$ on the expected safety cost:
$$ \max_\pi \mathbb{E}[R] \quad \text{s.t.} \quad \mathbb{E}[C] \leq \beta $$
Lagrangian Relaxation
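The relaxation replaces the hard constraint with an adaptive penalty: a multiplier $\lambda \geq 0$ is raised by dual gradient ascent whenever the average cost exceeds the budget $\beta$, and lowered when the constraint holds:
$$ \mathcal{L}(\pi, \lambda) = \mathbb{E}[R] - \lambda \left( \mathbb{E}[C] - \beta \right), \qquad \lambda \geq 0 $$
The optimizer below implements the $\lambda$-update side of this scheme; the policy itself is trained on the penalized reward $R - \lambda C$.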
import torch
import torch.nn as nn
import torch.optim as optim


class LagrangianSafetyOptimizer(nn.Module):
    """
    Dual gradient descent for constrained optimization.
    """
    def __init__(self, constraint_limit: float, lr: float = 0.01):
        super().__init__()
        self.limit = constraint_limit
        # Parametrize lambda = exp(log_lambda) so it stays non-negative
        self.log_lambda = nn.Parameter(torch.zeros(1))
        self.optimizer = optim.Adam([self.log_lambda], lr=lr)
        self.history = []

    def get_lambda(self) -> float:
        return self.log_lambda.exp().item()

    def update(self, current_cost: float) -> float:
        """Update lambda based on the constraint violation."""
        lambda_val = self.log_lambda.exp()
        # Gradient ascent on lambda: minimize -lambda * (cost - limit)
        loss = -lambda_val * (current_cost - self.limit)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # Track history for monitoring
        self.history.append({
            'lambda': lambda_val.item(),
            'cost': current_cost,
            'violation': current_cost - self.limit
        })
        # Return the freshly updated multiplier
        return self.get_lambda()

    def penalized_reward(self, reward: float, cost: float) -> float:
        """Compute R' = R - lambda * C."""
        return reward - (self.get_lambda() * cost)

    def is_safe(self) -> bool:
        """Check if the constraint is satisfied on average."""
        if len(self.history) < 10:
            return True
        recent = self.history[-10:]
        avg_cost = sum(h['cost'] for h in recent) / len(recent)
        return avg_cost <= self.limit
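A minimal sketch of how this optimizer might sit in a training loop; `env`, `agent`, and `run_episode` are assumed helpers, not part of the class above:

# Hypothetical training loop (env, agent, run_episode are assumed to exist)
safety_opt = LagrangianSafetyOptimizer(constraint_limit=1.0)

for episode in range(1000):
    episode_reward, episode_cost = run_episode(env, agent)
    # The policy is trained on the penalized reward R - lambda * C
    agent.update(safety_opt.penalized_reward(episode_reward, episode_cost))
    # Dual ascent: lambda rises while the cost budget is violated
    current_lambda = safety_opt.update(episode_cost)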
Rust Implementation
pub struct LagrangianOptimizer {
    lambda: f64,
    lr: f64,
    constraint_limit: f64,
}

impl LagrangianOptimizer {
    pub fn new(limit: f64, lr: f64) -> Self {
        Self { lambda: 0.0, lr, constraint_limit: limit }
    }

    pub fn update(&mut self, current_cost: f64) -> f64 {
        // Dual ascent: move lambda proportionally to the constraint violation
        let error = current_cost - self.constraint_limit;
        self.lambda += self.lr * error;
        // Projection: lambda cannot be negative
        if self.lambda < 0.0 {
            self.lambda = 0.0;
        }
        self.lambda
    }

    pub fn penalized_reward(&self, reward: f64, cost: f64) -> f64 {
        reward - (self.lambda * cost)
    }
}
The Safety Shield Pattern
A Safety Shield is a non-learnable layer that wraps the policy: every proposed action passes through it, and actions judged unsafe are replaced with a safe default before execution.
graph LR
A[Policy Network] --> B[Proposed Action]
B --> C{Safety Shield}
C -->|Safe| D[Execute Action]
C -->|Unsafe| E[Override Action]
E --> F[Safe Default]
Implementation
from abc import ABC, abstractmethod
from dataclasses import dataclass
import numpy as np


@dataclass
class Action:
    throttle: float
    brake: float
    steering: float


@dataclass
class State:
    lidar_scan: np.ndarray
    speed: float
    position: tuple


class SafetyShield(ABC):
    """Base class for safety shields."""

    @abstractmethod
    def filter(self, state: State, action: Action) -> Action:
        """Filter an action through safety constraints."""
        ...


class CollisionAvoidanceShield(SafetyShield):
    """Emergency braking when obstacles are detected."""

    def __init__(self, distance_threshold: float = 5.0, max_brake: float = 1.0):
        self.distance_threshold = distance_threshold
        self.max_brake = max_brake
        self.interventions = 0

    def filter(self, state: State, action: Action) -> Action:
        # Check minimum distance in front
        # (assumes a 180-beam scan at ~1 degree per beam, so indices 80:100 cover the front 20 degrees)
        front_scan = state.lidar_scan[80:100]
        min_distance = np.min(front_scan)
        if min_distance < self.distance_threshold:
            # Override with emergency braking
            self.interventions += 1
            return Action(
                throttle=0.0,
                brake=self.max_brake,
                steering=action.steering  # Keep steering
            )
        return action


class SpeedLimitShield(SafetyShield):
    """Enforce maximum speed limits."""

    def __init__(self, max_speed: float):
        self.max_speed = max_speed

    def filter(self, state: State, action: Action) -> Action:
        if state.speed > self.max_speed:
            return Action(
                throttle=0.0,
                brake=0.3,  # Gentle braking
                steering=action.steering
            )
        return action


class CompositeShield(SafetyShield):
    """Chain multiple shields together."""

    def __init__(self, shields: list):
        self.shields = shields

    def filter(self, state: State, action: Action) -> Action:
        for shield in self.shields:
            action = shield.filter(state, action)
        return action
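A brief usage sketch, chaining both shields around a policy; `policy`, `state`, and `execute` are assumed to come from the surrounding serving code:

# Hypothetical wiring: policy, state, and execute are assumed to exist
shield = CompositeShield([
    CollisionAvoidanceShield(distance_threshold=5.0),
    SpeedLimitShield(max_speed=30.0),
])

proposed = policy.act(state)                 # assumed policy API
safe_action = shield.filter(state, proposed)
execute(safe_action)                         # assumed actuator call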
Monitoring: Reward Distribution Drift
from prometheus_client import Gauge, Histogram, Counter
import numpy as np

# Metrics
reward_per_episode = Histogram(
    'rl_reward_per_episode',
    'Total reward per episode',
    buckets=[0, 10, 50, 100, 200, 500, 1000]
)
cost_per_episode = Histogram(
    'rl_cost_per_episode',
    'Total constraint cost per episode',
    buckets=[0, 0.1, 0.5, 1.0, 2.0, 5.0]
)
safety_interventions = Counter(
    'rl_safety_shield_interventions_total',
    'Number of safety shield activations',
    ['shield_type']
)
lambda_value = Gauge(
    'rl_lagrangian_lambda',
    'Current Lagrange multiplier value'
)


class RLMonitor:
    """Monitor an RL agent for anomalies."""

    def __init__(self, baseline_reward: float = 100.0):
        self.baseline_reward = baseline_reward
        self.sigma_threshold = 3.0
        self.rewards = []

    def record_episode(self, reward: float, cost: float, interventions: int):
        self.rewards.append(reward)
        reward_per_episode.observe(reward)
        cost_per_episode.observe(cost)
        # Attribute interventions to the composite shield (adjust the label per shield type)
        safety_interventions.labels(shield_type='composite').inc(interventions)
        # Check for anomalies: a sudden reward spike often signals reward hacking
        if len(self.rewards) > 100:
            mean = np.mean(self.rewards[-100:])
            std = np.std(self.rewards[-100:])
            if reward > mean + self.sigma_threshold * std:
                return {'alert': 'REWARD_SPIKE', 'reward': reward, 'mean': mean}
        return None
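A possible wiring into an evaluation loop, sketched under the assumption of an `episode` summary object, a `trainer` with a pause method, and an alerting hook; none of these names come from a specific library:

# Hypothetical evaluation loop: pause training if a reward spike is detected
monitor = RLMonitor(baseline_reward=100.0)

for episode in collected_episodes:          # assumed iterable of episode summaries
    alert = monitor.record_episode(
        reward=episode.total_reward,
        cost=episode.total_cost,
        interventions=episode.shield_interventions,
    )
    if alert is not None:
        trainer.pause()                     # assumed trainer API
        notify_oncall(alert)                # assumed alerting hook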
Safe Exploration Strategies
Strategy Comparison
| Strategy | Description | Use Case |
|---|---|---|
| Intrinsic Curiosity | Reward novelty to drive exploration | Sparse-reward games |
| Uncertainty Estimation | Act only where the model is confident | Safety-critical systems |
| Safe Baselines | Constrain exploration to known-safe policies | Robotics |
| Shielded Exploration | Keep the safety shield active during learning | Real-world training |
Implementation: Uncertainty-Based Exploration
import torch
import torch.nn as nn


class EnsembleQNetwork(nn.Module):
    """Ensemble for epistemic uncertainty estimation."""

    def __init__(self, state_dim: int, action_dim: int, n_ensembles: int = 5):
        super().__init__()
        self.n_ensembles = n_ensembles
        self.networks = nn.ModuleList([
            self._build_network(state_dim, action_dim)
            for _ in range(n_ensembles)
        ])

    def _build_network(self, state_dim, action_dim):
        return nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim)
        )

    def forward(self, state):
        # Stack per-member predictions: (n_ensembles, batch, action_dim)
        predictions = [net(state) for net in self.networks]
        return torch.stack(predictions)

    def get_uncertainty(self, state):
        """Epistemic uncertainty as disagreement across ensemble members."""
        predictions = self(state)
        return predictions.std(dim=0)

    def _conservative_action(self):
        """Fallback when the ensemble disagrees. Which action counts as
        conservative (e.g. brake / no-op) is environment-specific; index 0
        here is only a placeholder."""
        return torch.tensor(0)

    def safe_action(self, state, threshold: float = 0.5):
        """Only act on the ensemble's choice if uncertainty is low."""
        predictions = self(state)
        uncertainty = predictions.std(dim=0)
        mean_prediction = predictions.mean(dim=0)
        # If too uncertain, take the conservative action instead
        if uncertainty.max() > threshold:
            return self._conservative_action()
        return mean_prediction.argmax()
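A quick sketch of the intended use during a rollout, assuming a flat 8-dimensional observation and 4 discrete actions (both dimensions are illustrative):

# Assumes an environment with 8-dimensional states and 4 discrete actions
q_ensemble = EnsembleQNetwork(state_dim=8, action_dim=4, n_ensembles=5)

state = torch.randn(1, 8)                     # placeholder observation
action = q_ensemble.safe_action(state, threshold=0.5)
print(action, q_ensemble.get_uncertainty(state).max())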
RLHF: Human Feedback Integration
graph TB
A[Base Policy] --> B[Generate Outputs]
B --> C[Human Labelers]
C --> D[Preference Pairs]
D --> E[Train Reward Model]
E --> F[PPO on Reward Model]
F --> G[Improved Policy]
G --> B
Reward Model Implementation
import torch
import torch.nn as nn
import torch.nn.functional as F


class RewardModel(nn.Module):
    """Learn a reward function from human preferences."""

    def __init__(self, input_dim: int, hidden_dim: int = 256):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, x):
        return self.network(x)

    def preference_loss(self, preferred, rejected):
        """Bradley-Terry model for pairwise preferences."""
        r_pref = self(preferred)
        r_rej = self(rejected)
        # Maximize the log-probability that the preferred sample scores higher
        # (logsigmoid is the numerically stable form of log(sigmoid(x)))
        return -F.logsigmoid(r_pref - r_rej).mean()
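The loss above corresponds to the Bradley-Terry preference probability, where $r_\theta$ is the learned reward, $y_w$ the preferred sample, and $y_l$ the rejected one:
$$ P(y_w \succ y_l) = \sigma\big(r_\theta(y_w) - r_\theta(y_l)\big), \qquad \mathcal{L} = -\mathbb{E}\big[\log \sigma\big(r_\theta(y_w) - r_\theta(y_l)\big)\big] $$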
Summary Checklist
| Item | Description | Priority |
|---|---|---|
| Separate Metrics | Track ground truth separately | Critical |
| Safety Shield | Hard-coded override layer | Critical |
| Reward Bounds | Cap maximum reward per episode | High |
| Cost Monitoring | Track constraint violations | High |
| Drift Alerts | Alert on reward spikes | Medium |
| Lambda Monitoring | Track Lagrange multiplier | Medium |
| Kill Switch | Hardware override independent of the policy | Critical (physical systems) |
[End of Section 38.4]