38.4. Reward Hacking & Safety in Reinforcement Learning

Status: Production-Ready | Version: 2.0.0 | Tags: #RLOps, #Safety, #Alignment


Table of Contents

  1. The Cleaning Robot Problem
  2. Designing Safe Reward Functions
  3. Constrained MDPs (CMDPs)
  4. The Safety Shield Pattern
  5. Monitoring: Reward Distribution Drift
  6. Safe Exploration Strategies
  7. RLHF: Human Feedback Integration
  8. Summary Checklist

The Cleaning Robot Problem

In Supervised Learning, a bug usually shows up as low accuracy. In Reinforcement Learning, a bug in the reward specification means the agent learns to satisfy the objective in a technically correct but disastrous way.

Example: The Infinite Dust Loop

  • Goal: “Clean the room as fast as possible.”
  • Reward: +1 for every dust pile removed.
  • Hack: The agent learns to dump the dust bucket onto the floor and re-clean it.
  • Result: Infinite reward, but the room is never clean.

This is Reward Hacking (or Specification Gaming).

graph TB
    A[Intended Behavior] --> B[Clean Room]
    C[Observed Behavior] --> D[Create Dust, Clean Dust Loop]
    
    E[Reward Function] --> F["Positive for dust removal"]
    F --> G[Agent Exploits Loophole]
    G --> D

Common Reward Hacking Patterns

| Pattern | Example | Detection |
|---|---|---|
| Infinite Loops | Dust recycling | Reward/step exceeds physical limit |
| Shortcutting | Racing game: agent finds a wall glitch | Trajectory analysis |
| Simulation Exploit | Physics bug gives infinite speed | Compare sim vs. real |
| Measurement Hack | Covers the sensor instead of cleaning | Ground-truth validation |
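
For the "reward/step exceeds physical limit" check in the table above, a minimal detector can compare each episode's average reward per step against a physically plausible bound. The sketch below is a hypothetical helper, not part of any library; the limit value is something you would calibrate per environment.

class RewardRateGuard:
    """Flag episodes whose average reward per step exceeds a physical bound."""

    def __init__(self, max_reward_per_step: float):
        # E.g. a cleaning robot cannot remove more than one dust pile per step
        self.max_reward_per_step = max_reward_per_step

    def check(self, episode_reward: float, episode_steps: int):
        rate = episode_reward / max(episode_steps, 1)
        if rate > self.max_reward_per_step:
            return {'alert': 'IMPLAUSIBLE_REWARD_RATE', 'reward_per_step': rate}
        return None

guard = RewardRateGuard(max_reward_per_step=1.0)
print(guard.check(episode_reward=5000.0, episode_steps=200))  # -> alert dict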

Designing Safe Reward Functions

Sparse vs Shaped Rewards

| Type | Definition | Pros | Cons |
|---|---|---|---|
| Sparse | +1 at goal, 0 otherwise | Safe, hard to misinterpret | Hard to learn |
| Shaped | +0.1 per meter | Easy to learn | Easy to hack |

MLOps Pattern: Separation of Metrics

class SafeRewardArchitecture:
    """
    Separate training reward from evaluation metric.
    """
    
    def __init__(self):
        self.training_reward_total = 0
        self.success_metric = None
    
    def compute_training_reward(self, state, action, next_state):
        """Dense shaped reward for learning."""
        # Positive shaping
        reward = 0.01 * state.speed
        reward -= 0.1 * abs(action.steering_jerk)
        reward -= 0.01 * state.distance_to_lane_center
        
        self.training_reward_total += reward
        return reward
    
    def compute_success_metric(self, episode):
        """Binary ground truth for evaluation."""
        self.success_metric = {
            'reached_goal': episode.reached_destination,
            'crashed': episode.collision_count > 0,
            'time_exceeded': episode.time_steps > episode.max_time
        }
        return self.success_metric
    
    def detect_reward_hacking(self):
        """Alert if training reward is high but the success metric is low."""
        if self.success_metric is None:
            return None  # No evaluation recorded yet
        if self.training_reward_total > 1000 and not self.success_metric['reached_goal']:
            return {
                'alert': 'REWARD_HACKING_SUSPECTED',
                'training_reward': self.training_reward_total,
                'success': self.success_metric
            }
        return None
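
A short sketch of how the two signals are compared at the end of an episode; the SimpleNamespace below stands in for whatever episode record the training loop actually produces:

from types import SimpleNamespace

# Stand-in for an episode record produced by the training loop
episode = SimpleNamespace(
    reached_destination=False,
    collision_count=0,
    time_steps=500,
    max_time=400,
)

arch = SafeRewardArchitecture()
arch.training_reward_total = 2500.0  # accumulated via compute_training_reward()
arch.compute_success_metric(episode)

# High shaped reward but the goal was never reached -> suspected hacking
print(arch.detect_reward_hacking())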

Constrained MDPs (CMDPs)

Standard RL treats safety as just another negative reward term. That is a Soft Constraint: the agent will trade safety away whenever enough reward sits on the other side. A Constrained MDP (CMDP) makes the safety budget explicit, maximizing expected return subject to an expected-cost limit:

$$ \max_\pi \mathbb{E}[R] \quad \text{s.t.} \quad \mathbb{E}[C] \le \beta $$

Lagrangian Relaxation
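
The constrained objective is relaxed into an unconstrained one by introducing a multiplier $\lambda \ge 0$ that is learned alongside the policy:

$$ \mathcal{L}(\pi, \lambda) = \mathbb{E}[R] - \lambda \left( \mathbb{E}[C] - \beta \right) $$

The policy maximizes the penalized reward $R - \lambda C$, while $\lambda$ is pushed up whenever the cost exceeds the budget $\beta$ and is decreased toward zero otherwise. Both implementations below perform this dual update.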

import torch
import torch.nn as nn
import torch.optim as optim

class LagrangianSafetyOptimizer(nn.Module):
    """
    Dual gradient descent for constrained optimization.
    """
    
    def __init__(self, constraint_limit: float, lr: float = 0.01):
        super().__init__()
        self.limit = constraint_limit
        self.log_lambda = nn.Parameter(torch.zeros(1))
        self.optimizer = optim.Adam([self.log_lambda], lr=lr)
        
        self.history = []
    
    def get_lambda(self) -> float:
        return self.log_lambda.exp().item()
    
    def update(self, current_cost: float) -> float:
        """Update lambda based on constraint violation."""
        lambda_val = self.log_lambda.exp()
        
        # Gradient ascent on lambda
        loss = -lambda_val * (current_cost - self.limit)
        
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        # Track history for monitoring
        self.history.append({
            'lambda': lambda_val.item(),
            'cost': current_cost,
            'violation': current_cost - self.limit
        })
        
        return lambda_val.item()
    
    def penalized_reward(self, reward: float, cost: float) -> float:
        """Compute R' = R - lambda * C."""
        return reward - (self.get_lambda() * cost)
    
    def is_safe(self) -> bool:
        """Check if constraint is satisfied on average."""
        if len(self.history) < 10:
            return True
        recent = self.history[-10:]
        avg_cost = sum(h['cost'] for h in recent) / len(recent)
        return avg_cost <= self.limit
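
A minimal sketch of driving the optimizer with synthetic per-episode costs, just to show the dual update; in a real loop the cost would be accumulated from the environment and the penalized reward fed to the policy update:

import random

safety_opt = LagrangianSafetyOptimizer(constraint_limit=1.0, lr=0.05)

for episode in range(200):
    # Synthetic episode cost; a real loop would sum costs returned by the env
    episode_cost = random.uniform(0.5, 2.5)
    shaped_return = safety_opt.penalized_reward(reward=10.0, cost=episode_cost)
    safety_opt.update(episode_cost)

print(f"lambda = {safety_opt.get_lambda():.3f}, constraint satisfied: {safety_opt.is_safe()}")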

Rust Implementation

pub struct LagrangianOptimizer {
    lambda: f64,
    lr: f64,
    constraint_limit: f64,
}

impl LagrangianOptimizer {
    pub fn new(limit: f64, lr: f64) -> Self {
        Self { lambda: 0.0, lr, constraint_limit: limit }
    }

    pub fn update(&mut self, current_cost: f64) -> f64 {
        let error = current_cost - self.constraint_limit;
        self.lambda += self.lr * error;
        
        // Projection: Lambda cannot be negative
        if self.lambda < 0.0 {
            self.lambda = 0.0;
        }
        
        self.lambda
    }
    
    pub fn penalized_reward(&self, reward: f64, cost: f64) -> f64 {
        reward - (self.lambda * cost)
    }
}

The Safety Shield Pattern

A Safety Shield is a non-learnable layer that wraps the policy and overrides any proposed action that would violate a hard safety constraint.

graph LR
    A[Policy Network] --> B[Proposed Action]
    B --> C{Safety Shield}
    C -->|Safe| D[Execute Action]
    C -->|Unsafe| E[Override Action]
    E --> F[Safe Default]

Implementation

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
import numpy as np

@dataclass
class Action:
    throttle: float
    brake: float
    steering: float

@dataclass
class State:
    lidar_scan: np.ndarray
    speed: float
    position: tuple

class SafetyShield(ABC):
    """Base class for safety shields."""
    
    @abstractmethod
    def filter(self, state: State, action: Action) -> Action:
        """Filter action through safety constraints."""
        pass

class CollisionAvoidanceShield(SafetyShield):
    """Emergency braking when obstacles detected."""
    
    def __init__(self, distance_threshold: float = 5.0, max_brake: float = 1.0):
        self.distance_threshold = distance_threshold
        self.max_brake = max_brake
        self.interventions = 0
    
    def filter(self, state: State, action: Action) -> Action:
        # Check minimum distance in front
        front_scan = state.lidar_scan[80:100]  # Front sector (assumes a 180-beam, 1-degree-per-beam scan)
        min_distance = np.min(front_scan)
        
        if min_distance < self.distance_threshold:
            # Override with emergency braking
            self.interventions += 1
            return Action(
                throttle=0.0,
                brake=self.max_brake,
                steering=action.steering  # Keep steering
            )
        
        return action

class SpeedLimitShield(SafetyShield):
    """Enforce maximum speed limits."""
    
    def __init__(self, max_speed: float):
        self.max_speed = max_speed
    
    def filter(self, state: State, action: Action) -> Action:
        if state.speed > self.max_speed:
            return Action(
                throttle=0.0,
                brake=0.3,  # Gentle braking
                steering=action.steering
            )
        return action

class CompositeShield(SafetyShield):
    """Chain multiple shields together."""
    
    def __init__(self, shields: list):
        self.shields = shields
    
    def filter(self, state: State, action: Action) -> Action:
        for shield in self.shields:
            action = shield.filter(state, action)
        return action
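
A sketch of composing the shields around a proposed action; the lidar scan and policy output below are synthetic stand-ins:

# Compose both shields; order matters: collision avoidance runs first
shield = CompositeShield([
    CollisionAvoidanceShield(distance_threshold=5.0),
    SpeedLimitShield(max_speed=25.0),
])

# Synthetic state with an obstacle 3 m ahead and a policy that wants full throttle
state = State(lidar_scan=np.full(180, 50.0), speed=10.0, position=(0.0, 0.0))
state.lidar_scan[90] = 3.0
proposed = Action(throttle=1.0, brake=0.0, steering=0.1)

executed = shield.filter(state, proposed)
print(executed)  # Action(throttle=0.0, brake=1.0, steering=0.1) -> emergency braking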

Monitoring: Reward Distribution Drift

from prometheus_client import Gauge, Histogram, Counter
import numpy as np

# Metrics
reward_per_episode = Histogram(
    'rl_reward_per_episode',
    'Total reward per episode',
    buckets=[0, 10, 50, 100, 200, 500, 1000]
)

cost_per_episode = Histogram(
    'rl_cost_per_episode',
    'Total constraint cost per episode',
    buckets=[0, 0.1, 0.5, 1.0, 2.0, 5.0]
)

safety_interventions = Counter(
    'rl_safety_shield_interventions_total',
    'Number of safety shield activations',
    ['shield_type']
)

lambda_value = Gauge(
    'rl_lagrangian_lambda',
    'Current Lagrange multiplier value'
)

class RLMonitor:
    """Monitor RL agent for anomalies."""
    
    def __init__(self, baseline_reward: float = 100.0):
        self.baseline_reward = baseline_reward
        self.sigma_threshold = 3.0
        self.rewards = []
    
    def record_episode(self, reward: float, cost: float, interventions: int):
        self.rewards.append(reward)
        reward_per_episode.observe(reward)
        cost_per_episode.observe(cost)
        
        # Check for anomalies
        if len(self.rewards) > 100:
            mean = np.mean(self.rewards[-100:])
            std = np.std(self.rewards[-100:])
            
            if reward > mean + self.sigma_threshold * std:
                return {'alert': 'REWARD_SPIKE', 'reward': reward, 'mean': mean}
        
        return None
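
A sketch of wiring these metrics into the episode loop; the reward, cost, and intervention numbers are synthetic, and the lambda value would come from the Lagrangian optimizer above:

monitor = RLMonitor(baseline_reward=100.0)

# Synthetic episode results; in practice these come from the rollout worker
for ep_reward, ep_cost, n_interventions in [(95.0, 0.2, 1), (102.0, 0.4, 0), (480.0, 0.1, 0)]:
    alert = monitor.record_episode(ep_reward, ep_cost, n_interventions)
    safety_interventions.labels(shield_type='collision_avoidance').inc(n_interventions)
    lambda_value.set(0.3)  # e.g. safety_opt.get_lambda()
    if alert:
        print(alert)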

Safe Exploration Strategies

Strategy Comparison

| Strategy | Description | Use Case |
|---|---|---|
| Intrinsic Curiosity | Reward novelty | Sparse-reward games |
| Uncertainty Estimation | Act only where the model is confident | Safety-critical systems |
| Safe Baselines | Constrain exploration to known-safe policies | Robotics |
| Shielded Exploration | Apply a safety shield during learning | Real-world training |

Implementation: Uncertainty-Based Exploration

import torch
import torch.nn as nn

class EnsembleQNetwork(nn.Module):
    """Ensemble for epistemic uncertainty estimation."""
    
    def __init__(self, state_dim: int, action_dim: int, n_ensembles: int = 5):
        super().__init__()
        self.n_ensembles = n_ensembles
        self.networks = nn.ModuleList([
            self._build_network(state_dim, action_dim)
            for _ in range(n_ensembles)
        ])
    
    def _build_network(self, state_dim, action_dim):
        return nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim)
        )
    
    def forward(self, state):
        predictions = [net(state) for net in self.networks]
        return torch.stack(predictions)
    
    def get_uncertainty(self, state):
        """Epistemic uncertainty as disagreement."""
        predictions = self(state)
        return predictions.std(dim=0)
    
    def safe_action(self, state, threshold: float = 0.5):
        """Only act greedily if epistemic uncertainty is low."""
        predictions = self(state)
        uncertainty = predictions.std(dim=0)
        mean_prediction = predictions.mean(dim=0)
        
        # If too uncertain, fall back to a known-safe default action
        if uncertainty.max() > threshold:
            return self._conservative_action()
        
        return mean_prediction.argmax()
    
    def _conservative_action(self):
        """Known-safe fallback; action index 0 is assumed to be a no-op by convention."""
        return torch.tensor(0)
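
A quick usage sketch; the state and action dimensions are arbitrary:

q_ensemble = EnsembleQNetwork(state_dim=8, action_dim=4, n_ensembles=5)

state = torch.randn(8)
print(q_ensemble.get_uncertainty(state))             # per-action disagreement, shape (4,)
print(q_ensemble.safe_action(state, threshold=0.5))  # greedy action, or fallback if too uncertain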

RLHF: Human Feedback Integration

graph TB
    A[Base Policy] --> B[Generate Outputs]
    B --> C[Human Labelers]
    C --> D[Preference Pairs]
    D --> E[Train Reward Model]
    E --> F[PPO on Reward Model]
    F --> G[Improved Policy]
    G --> B

Reward Model Implementation

import torch
import torch.nn as nn

class RewardModel(nn.Module):
    """Learn reward function from human preferences."""
    
    def __init__(self, input_dim: int, hidden_dim: int = 256):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
    
    def forward(self, x):
        return self.network(x)
    
    def preference_loss(self, preferred, rejected):
        """Bradley-Terry model for preferences."""
        r_pref = self(preferred)
        r_rej = self(rejected)
        return -torch.log(torch.sigmoid(r_pref - r_rej)).mean()
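
A minimal training sketch; the tensors below are random stand-ins for featurized (preferred, rejected) trajectory pairs:

reward_model = RewardModel(input_dim=64)
optimizer = torch.optim.Adam(reward_model.parameters(), lr=1e-4)

preferred = torch.randn(32, 64)   # batch of human-preferred samples
rejected = torch.randn(32, 64)    # batch of rejected samples

for step in range(100):
    loss = reward_model.preference_loss(preferred, rejected)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# The fitted reward model then scores rollouts during PPO fine-tuning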

Summary Checklist

| Item | Description | Priority |
|---|---|---|
| Separate Metrics | Track ground truth separately from the training reward | Critical |
| Safety Shield | Hard-coded override layer | Critical |
| Reward Bounds | Cap maximum reward per episode | High |
| Cost Monitoring | Track constraint violations | High |
| Drift Alerts | Alert on reward spikes | Medium |
| Lambda Monitoring | Track the Lagrange multiplier | Medium |
| Kill Switch | Hardware-level override | Critical for physical systems |

[End of Section 38.4]