PPO (Proximal Policy Optimization)#
In this lesson, we will explore Proximal Policy Optimization (PPO), a powerful reinforcement learning algorithm that builds on the Actor-Critic framework (like A2C) but introduces a key mechanism to stabilize learning. PPO is one of the most popular policy optimization algorithms because it balances ease of implementation and performance across a wide range of tasks.
PPO improves upon vanilla Policy Gradient methods and A2C by clipping updates to prevent large policy changes, leading to more stable learning. Let’s explore how PPO works and why it outperforms simpler approaches like A2C in many environments.
PPO Overview#
Similar to A2C, PPO is an Actor-Critic method, which means it uses:
Actor: The policy function \(\pi(a \mid s)\), which maps states to actions and decides which action to take in a given state.
Critic: The value function \(V(s)\), which evaluates the quality of the actions chosen by the actor and provides feedback to help improve the policy.
The core difference lies in how PPO updates the policy, using a clipping mechanism that controls how much the policy can change with each update, thus preventing instability.
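To make the two roles concrete, here is a minimal PyTorch sketch of a two-headed actor-critic network; the names and sizes below are made up for illustration, and the full `ActorCritic` network used in this lesson appears in the Coding PPO section.

```python
import torch
from torch import nn

# Hypothetical two-headed network: a shared body, an actor head, and a critic head
class TinyActorCritic(nn.Module):
    def __init__(self, n_inputs=4, n_actions=2):
        super().__init__()
        self.body = nn.Sequential(nn.Linear(n_inputs, 16), nn.ReLU())
        self.actor = nn.Sequential(nn.Linear(16, n_actions), nn.Softmax(dim=-1))  # pi(a|s)
        self.critic = nn.Linear(16, 1)                                            # V(s)

    def forward(self, state):
        h = self.body(state)
        return self.actor(h), self.critic(h)

state = torch.randn(1, 4)                  # a single made-up observation
probs, value = TinyActorCritic()(state)    # actor: action probabilities, critic: V(s)
```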
The Clipping Mechanism#
The key idea in PPO is the clipping of the policy update to avoid large, sudden changes that could destabilize learning.
In traditional Actor-Critic methods (like A2C), we update the policy by maximizing the advantage-weighted log-probability of the actions taken:

\[ L^{PG}(\theta) = \mathbb{E}\left[\log \pi_\theta(a \mid s)\, A(s,a)\right] \]

Where \(A(s,a)\) is the advantage function, which measures how much better an action was compared to the baseline value of the state.
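For comparison with PPO's clipped objective below, here is a minimal PyTorch sketch of this A2C-style loss; the tensors are made up and stand in for real rollout data:

```python
import torch

# Toy example: log-probabilities of the actions actually taken, and their advantages
log_probs = torch.tensor([-0.3, -1.2, -0.7])
advantages = torch.tensor([0.5, -0.2, 1.1])

# Vanilla policy-gradient (A2C-style) loss: maximize E[log pi(a|s) * A(s,a)],
# so we minimize its negative.
pg_loss = -(log_probs * advantages).mean()
print(pg_loss)  # scalar loss; gradients push up the probability of high-advantage actions
```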
However, in PPO, instead of directly maximizing this objective, we introduce the probability ratio:

\[ r_\theta = \frac{\pi_\theta(a \mid s)}{\pi_{\theta_{\text{old}}}(a \mid s)} \]

Where:
\(\pi_\theta(a \mid s)\) is the probability of the action under the new policy.
\(\pi_{\theta_{\text{old}}}(a \mid s)\) is the probability of the action under the old policy.
The ratio tells us how much the policy has changed since the previous update. PPO clips this ratio so that updates stay within a safe range:

\[ L^{\text{CLIP}}(\theta) = \mathbb{E}\left[\min\left(r_\theta\, A(s,a),\ \text{clip}\left(r_\theta,\, 1-\epsilon,\, 1+\epsilon\right) A(s,a)\right)\right] \]
Where:
\(\epsilon\) is a hyperparameter (e.g., 0.1 or 0.2) that limits the policy update.
\(\text{clip}(r_\theta,\, 1-\epsilon,\, 1+\epsilon)\) ensures the ratio stays close to 1, meaning that the policy only changes slightly.
The advantage of clipping is that it prevents overly large updates to the policy, ensuring stability while still allowing for some exploration of new actions.
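Here is a minimal PyTorch sketch of the clipped surrogate; the log-probabilities and advantages are made up and stand in for quantities collected during rollouts:

```python
import torch

# Illustrative values: log-probs of the same actions under the old and current policy
old_log_probs = torch.tensor([-0.9, -1.1, -0.4])
new_log_probs = torch.tensor([-0.7, -1.3, -0.1])
advantages = torch.tensor([1.0, -0.5, 2.0])
epsilon = 0.2  # clipping range

# Probability ratio r_theta = pi_new / pi_old, computed in log space for stability
ratio = torch.exp(new_log_probs - old_log_probs)

# Clipped surrogate: take the minimum of the unclipped and clipped terms,
# then negate because optimizers minimize
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * advantages
actor_loss = -torch.min(surr1, surr2).mean()
```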
PPO Algorithm#
The process of training with PPO involves the following steps:
1. Interact with the Environment
The agent collects data by interacting with the environment over multiple time steps (a minimal sketch of this loop follows the list):
It observes the current state \(s\).
The actor selects an action \(a\) based on the current policy \(\pi_\theta(a \mid s)\).
The environment provides the next state \(s'\) and a reward \(r\).
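As referenced above, here is a minimal sketch of such a data-collection loop. It assumes a Gymnasium-style environment (`env.reset()`, `env.step()`) and a hypothetical `policy_net` that returns action probabilities; all names here are placeholders, not part of the lesson's project.

```python
import torch
from torch.distributions import Categorical

def collect_rollout(env, policy_net, num_steps):
    """Toy rollout collector (Gymnasium-style API and placeholder names assumed)."""
    states, actions, log_probs, rewards = [], [], [], []
    state, _ = env.reset()
    for _ in range(num_steps):
        state_t = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
        probs = policy_net(state_t)              # actor outputs action probabilities
        dist = Categorical(probs)
        action = dist.sample()                   # sample a ~ pi_theta(a|s)
        next_state, reward, terminated, truncated, _ = env.step(action.item())
        states.append(state_t)
        actions.append(action)
        log_probs.append(dist.log_prob(action))  # needed later for the PPO ratio
        rewards.append(reward)
        state = next_state
        if terminated or truncated:
            state, _ = env.reset()
    return states, actions, log_probs, rewards
```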
2. Compute the Advantage
The agent computes the advantage function (a small numeric example follows the list):

\[ A(s,a) = r + \gamma V(s') - V(s) \]

Where:
\(r\) is the reward.
\(\gamma\) is the discount factor.
\(V(s)\) and \(V(s')\) are the value estimates for the current and next states.
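As a quick numeric check of this formula, with made-up numbers:

```python
# One-step advantage A(s, a) = r + gamma * V(s') - V(s)
reward = 1.0        # r
gamma = 0.99        # discount factor
value_s = 2.0       # V(s): critic's estimate for the current state
value_s_next = 2.5  # V(s'): critic's estimate for the next state

advantage = reward + gamma * value_s_next - value_s
print(advantage)  # ≈ 1.475: positive, so the action did better than the critic expected
```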
3. Update the Policy (Actor)
The actor is updated using the clipped objective. Since optimizers minimize, the objective is negated to form the actor loss:

\[ L^{\text{actor}}(\theta) = -\mathbb{E}\left[\min\left(r_\theta\, A(s,a),\ \text{clip}\left(r_\theta,\, 1-\epsilon,\, 1+\epsilon\right) A(s,a)\right)\right] \]
This ensures that the policy only changes gradually, preventing large updates that could destabilize the learning process.
4. Update the Value Function (Critic)
The critic is updated to minimize the value loss. The loss function for the critic is the squared error between the return and the value estimate:

\[ L^{\text{critic}} = \left(R - V(s)\right)^2 \]
The critic minimizes the difference between the actual return \(R\) (reward + discounted future rewards) and the value estimate, helping the actor to make better decisions over time.
5. Total Loss
The total loss in PPO is a combination of the actor loss, the critic loss, and an optional entropy bonus (to encourage exploration); a short sketch of this combination follows the list below:

\[ L = L^{\text{actor}} + c_1 L^{\text{critic}} - c_2 H(\pi_\theta) \]
Where:
\(c_1\) and \(c_2\) are hyperparameters controlling the balance between the actor loss, the critic loss, and the entropy term \(H(\pi_\theta)\).
Entropy encourages exploration by preventing the policy from becoming too deterministic early in training.
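A short sketch of how these three terms are typically combined in code; the loss values and weights below are made up for illustration:

```python
import torch

# Made-up per-batch quantities
actor_loss = torch.tensor(0.12)   # clipped surrogate loss (already negated)
critic_loss = torch.tensor(0.80)  # squared error between returns and V(s)
entropy = torch.tensor(1.05)      # mean policy entropy of the batch

c1, c2 = 0.5, 0.01  # example weights for the critic loss and entropy bonus

# Subtracting the entropy term means higher entropy lowers the loss,
# which encourages exploration early in training.
total_loss = actor_loss + c1 * critic_loss - c2 * entropy
```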
Coding PPO#
```python
from torch import nn
import torch
from torch.distributions import Categorical


class ActorCritic(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(ActorCritic, self).__init__()
        # Shared feature extractor feeding both heads
        self.shared = nn.Sequential(nn.Linear(input_size, hidden_size), nn.ReLU())
        # Actor head: action probabilities
        self.actor = nn.Sequential(
            nn.Linear(hidden_size, output_size), nn.Softmax(dim=-1)
        )
        # Critic head: state-value estimate
        self.critic = nn.Linear(hidden_size, 1)

    def forward(self, x):
        shared = self.shared(x)
        return self.actor(shared), self.critic(shared)
class PPOCar():
    # Note: self.get_data(), self.update(), self.speed, and self.angle used below are
    # assumed to come from the surrounding car-racing project (e.g., a Car base class).
    def __init__(
        self,
        input_size=5,
        hidden_size=5,
        output_size=4,
    ):
        super().__init__()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # PPO hyperparameters (example values, assumed here; tune for your environment)
        self.discount_factor = 0.99  # gamma
        self.epsilon = 0.2           # clipping range
        self.ppo_epochs = 4          # optimization passes per update
        self.batch_size = 64         # mini-batch size
        self.critic_weight = 0.5     # c1
        self.entropy_weight = 0.01   # c2
        self.model = ActorCritic(input_size, hidden_size, output_size).to(
            self.device
        )
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=3e-4)
        self.reset_episode()

    def reset_episode(self):
        # Clear the rollout buffers collected since the last update
        self.states = []
        self.actions = []
        self.log_probs = []
        self.values = []
        self.entropies = []
    def forward(self, state):
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        probs, value = self.model(state)
        m = Categorical(probs)
        action = m.sample()
        log_prob = m.log_prob(action)
        entropy = m.entropy()
        # Store everything needed later for the PPO update
        self.states.append(state)
        self.actions.append(action)
        self.log_probs.append(log_prob)
        self.values.append(value)
        self.entropies.append(entropy)
        return action.item()
    def compute_gae(self, next_value, rewards):
        # Generalized Advantage Estimation (GAE) with lambda = 0.95
        gae = 0
        returns = []
        values = self.values + [next_value]
        for step in reversed(range(len(rewards))):
            delta = (
                rewards[step] + self.discount_factor * values[step + 1] - values[step]
            )
            gae = delta + self.discount_factor * 0.95 * gae
            returns.insert(0, gae + values[step])
        return returns
    def train(self, rewards):
        # Bootstrap the value of the last state, then build GAE-based returns
        next_state = torch.FloatTensor(self.get_data()).unsqueeze(0).to(self.device)
        _, next_value = self.model(next_state)
        returns = self.compute_gae(next_value, rewards)

        states = torch.cat(self.states)
        actions = torch.cat(self.actions)
        old_log_probs = torch.cat(self.log_probs).detach()
        returns = torch.cat(returns).detach().squeeze(-1).to(self.device)
        advantages = returns - torch.cat(self.values).detach().squeeze()
        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        for _ in range(self.ppo_epochs):
            # Create mini-batches
            indices = torch.randperm(len(states))
            for start in range(0, len(states), self.batch_size):
                end = min(start + self.batch_size, len(states))
                batch_indices = indices[start:end]
                batch_states = states[batch_indices]
                batch_actions = actions[batch_indices]
                batch_old_log_probs = old_log_probs[batch_indices]
                batch_returns = returns[batch_indices]
                batch_advantages = advantages[batch_indices]

                # Re-evaluate the stored actions under the current policy
                probs, values = self.model(batch_states)
                m = Categorical(probs)
                new_log_probs = m.log_prob(batch_actions)
                entropy = m.entropy().mean()

                # Probability ratio and clipped surrogate objective
                ratio = torch.exp(new_log_probs - batch_old_log_probs)
                surr1 = ratio * batch_advantages
                surr2 = (
                    torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon)
                    * batch_advantages
                )
                actor_loss = -torch.min(surr1, surr2).mean()
                critic_loss = (batch_returns - values.squeeze()).pow(2).mean()
                entropy_loss = -entropy
                loss = (
                    actor_loss
                    + self.critic_weight * critic_loss
                    + self.entropy_weight * entropy_loss
                )

                self.optimizer.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(self.model.parameters(), 0.5)
                self.optimizer.step()

        self.reset_episode()
        return loss.item()
    def action_train(self, state):
        action = self.forward(state)
        if action == 0:
            self.angle += 10  # Left
        elif action == 1:
            self.angle -= 10  # Right
        elif action == 2:
            if self.speed - 2 >= 6:
                self.speed -= 2  # Slow Down
        else:
            self.speed += 2  # Speed Up
        return action
class PPORace():
    # Note: self.game_map and self.step() are assumed to come from the surrounding
    # racing framework (e.g., a Race base class).
    def training_race(self, car: PPOCar, episodes, train_every):
        for episode in range(1, episodes + 1):
            current_state = car.get_data()
            rewards = []
            done = False
            episode_reward = 0
            while not done:
                action = car.action_train(current_state)
                car.update(self.game_map)
                new_state, reward, done = self.step(car)
                rewards.append(reward)
                episode_reward += reward
                current_state = new_state
            # Update the policy every `train_every` episodes
            if episode % train_every == 0:
                loss = car.train(rewards)
```
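The `PPOCar` and `PPORace` classes above depend on the rest of the car-racing project (`get_data`, `update`, `self.game_map`, `self.step`, and so on). As a self-contained sanity check, the following toy snippet exercises only the `ActorCritic` network and a single clipped PPO-style update; every tensor in it is fabricated for illustration.

```python
import torch
from torch.distributions import Categorical

# Assumes the ActorCritic class defined above is in scope
torch.manual_seed(0)
model = ActorCritic(input_size=5, hidden_size=5, output_size=4)
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)

# Fake batch of states, actions, returns, and advantages
states = torch.randn(8, 5)
actions = torch.randint(0, 4, (8,))
returns = torch.randn(8)
advantages = torch.randn(8)
with torch.no_grad():
    old_probs, _ = model(states)
    old_log_probs = Categorical(old_probs).log_prob(actions)

# One clipped PPO update on the fake batch (epsilon = 0.2)
probs, values = model(states)
dist = Categorical(probs)
new_log_probs = dist.log_prob(actions)
ratio = torch.exp(new_log_probs - old_log_probs)
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 0.8, 1.2) * advantages
loss = (
    -torch.min(surr1, surr2).mean()
    + 0.5 * (returns - values.squeeze()).pow(2).mean()
    - 0.01 * dist.entropy().mean()
)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"toy loss: {loss.item():.4f}")
```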