PPO (Proximal Policy Optimization)#

In this lesson, we will explore Proximal Policy Optimization (PPO), a powerful reinforcement learning algorithm that builds on the Actor-Critic framework (like A2C) but introduces a key mechanism to stabilize learning. PPO is one of the most popular policy optimization algorithms because it balances ease of implementation and performance across a wide range of tasks.

PPO improves upon vanilla Policy Gradient methods and A2C by clipping updates to prevent large policy changes, leading to more stable learning. Let’s explore how PPO works and why it outperforms simpler approaches like A2C in many environments.

PPO Overview#

Similar to A2C, PPO is an Actor-Critic method, which means it uses:

  • Actor: The policy function \(\pi(a \mid s)\), which maps states to actions and decides which action to take in a given state.

  • Critic: The value function \(V(s)\), which evaluates the quality of the actions chosen by the actor and provides feedback to help improve the policy.

The core difference lies in how PPO updates the policy, using a clipping mechanism that controls how much the policy can change with each update, thus preventing instability.

The Clipping Mechanism#

The key idea in PPO is the clipping of the policy update to avoid large, sudden changes that could destabilize learning.

In traditional Actor-Critic methods (like A2C), we update the policy using the advantage function:

\[ \mathbb{E}\left[ A(s,a) \cdot \log \pi_\theta(a \mid s) \right] \]

Where \(A(s,a)\) is the advantage function, which measures how much better an action was compared to the baseline value of the state.
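
To make this concrete, here is a minimal sketch of how that objective becomes a loss in PyTorch; the tensors `log_probs` and `advantages` are illustrative stand-ins for values collected during a rollout:

import torch

# Illustrative rollout data: log pi_theta(a|s) and A(s,a) for three steps
log_probs = torch.tensor([-0.2, -1.5, -0.7], requires_grad=True)
advantages = torch.tensor([1.0, -0.5, 2.0])

# A2C-style actor loss: maximize E[A(s,a) * log pi(a|s)] by minimizing its negative
actor_loss = -(log_probs * advantages).mean()
actor_loss.backward()  # in a real agent, gradients flow back into the policy network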

However, in PPO, instead of directly maximizing this objective, we introduce the probability ratio:

\[ r_\theta = \frac{\pi_\theta(a \mid s)}{\pi_{\theta_{\text{old}}}(a \mid s)} \]

Where:

  • \(\pi_\theta(a \mid s)\) is the probability of the action under the new policy.

  • \(\pi_{\theta_{\text{old}}}(a \mid s)\) is the probability of the action under the old policy.

The ratio tells us how much the policy has changed from the previous update. PPO clips this ratio to stay within a safe range:

\[ L^{\text{CLIP}}(\theta) = \mathbb{E}\left[ \min\left( r_\theta\, A(s,a),\ \text{clip}(r_\theta,\, 1-\epsilon,\, 1+\epsilon)\, A(s,a) \right) \right] \]

Where:

  • \(\epsilon\) is a hyperparameter (e.g., 0.1 or 0.2) that limits the policy update.

  • \(\text{clip}(r_\theta, 1-\epsilon, 1+\epsilon)\) ensures the ratio stays close to 1, meaning that the policy only changes slightly.

The advantage of clipping is that it prevents overly large updates to the policy, ensuring stability while still allowing for some exploration of new actions.
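
As a small, self-contained illustration with made-up numbers and \(\epsilon = 0.2\), the snippet below computes the ratio from stored log-probabilities (the same trick used in the implementation later in this lesson) and applies the clipped objective:

import torch

epsilon = 0.2
old_log_probs = torch.tensor([-0.9, -0.9])        # log pi_old(a|s), illustrative
new_log_probs = torch.tensor([-0.4, -1.6])        # log pi_theta(a|s), illustrative
advantages = torch.tensor([1.0, 1.0])

ratio = torch.exp(new_log_probs - old_log_probs)  # r_theta ~ [1.65, 0.50]
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - epsilon, 1 + epsilon) * advantages
clipped_objective = torch.min(surr1, surr2)       # [1.20, 0.50]: the large increase is capped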

PPO Algorithm#

The process of training with PPO involves the following steps:

  1. Interact with the Environment

The agent collects data by interacting with the environment over multiple time steps:

  • It observes the current state \(s\).

  • The actor selects an action \(a\) based on the current policy \(\pi_\theta(a \mid s)\).

  • The environment provides the next state \(s'\) and a reward \(r\).

  2. Compute the Advantage

The agent computes the advantage function:

\[ A(s,a) = r + \gamma V(s') - V(s) \]

Where:

  • \(r\) is the reward.

  • \(\gamma\) is the discount factor.

  • \(V(s)\) and \(V(s')\) are the value estimates for the current and next states.
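
For example, with illustrative values \(r = 1\), \(\gamma = 0.99\), \(V(s) = 2.0\), and \(V(s') = 2.5\), the advantage is \(A(s,a) = 1 + 0.99 \cdot 2.5 - 2.0 \approx 1.48\): the action turned out better than the critic predicted, so its probability should be increased.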

  3. Update the Policy (Actor)

The actor is updated using the clipped objective. The policy gradient is computed as:

\[ L^{\text{CLIP}}(\theta) = \min\left( r_\theta\, A(s,a),\ \text{clip}(r_\theta,\, 1-\epsilon,\, 1+\epsilon)\, A(s,a) \right) \]

This ensures that the policy only changes gradually, preventing large updates that could destabilize the learning process.

  4. Update the Value Function (Critic)

The critic is updated to minimize the value loss. The loss function for the critic is:

\[ \text{Critic Loss} = \left( r + \gamma V(s') - V(s) \right)^2 \]

The critic minimizes the difference between the actual return (reward + discounted future rewards) and the value estimate, helping the actor to make better decisions over time.

  5. Total Loss

The total loss in PPO combines the clipped actor objective, the critic loss, and an optional entropy bonus (to encourage exploration). Since \(L^{\text{CLIP}}(\theta)\) is maximized while the total loss is minimized, it enters with a negative sign:

\[ \text{Total Loss} = -L^{\text{CLIP}}(\theta) + c_1 \cdot \text{Critic Loss} - c_2 \cdot \text{Entropy} \]

Where:

  • \(c_1\) and \(c_2\) are hyperparameters controlling the balance between the actor loss, the critic loss, and the entropy term.

  • Entropy encourages exploration by preventing the policy from becoming too deterministic early in training.
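
Putting the pieces together, here is a minimal sketch of how these three terms combine in PyTorch, with illustrative per-batch numbers and illustrative weights \(c_1 = 0.5\), \(c_2 = 0.01\):

import torch

clipped_objective = torch.tensor(0.8)  # L^CLIP(theta), to be maximized
critic_loss = torch.tensor(1.5)        # squared TD error
entropy = torch.tensor(1.1)            # policy entropy, to be encouraged

c1, c2 = 0.5, 0.01                     # illustrative weights
total_loss = -clipped_objective + c1 * critic_loss - c2 * entropy
# total_loss = -0.8 + 0.75 - 0.011 = -0.061; minimizing it raises L^CLIP and the entropy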

Coding PPO#

from torch import nn
import torch
from torch.distributions import Categorical


class ActorCritic(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(ActorCritic, self).__init__()
        self.shared = nn.Sequential(nn.Linear(input_size, hidden_size), nn.ReLU())
        self.actor = nn.Sequential(
            nn.Linear(hidden_size, output_size), nn.Softmax(dim=-1)
        )
        self.critic = nn.Linear(hidden_size, 1)

    def forward(self, x):
        shared = self.shared(x)
        return self.actor(shared), self.critic(shared)


class PPOCar():

    def __init__(
        self,
        input_size=5,
        hidden_size=5,
        output_size=4,
    ):
        super().__init__()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = ActorCritic(input_size, hidden_size, output_size).to(self.device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=3e-4)  # lr is illustrative

        # PPO hyperparameters used below (typical values)
        self.discount_factor = 0.99  # gamma
        self.epsilon = 0.2           # clipping range
        self.ppo_epochs = 4          # optimization passes per batch of experience
        self.batch_size = 32         # mini-batch size
        self.critic_weight = 0.5     # c1 in the total loss
        self.entropy_weight = 0.01   # c2 in the total loss

        self.reset_episode()

    def reset_episode(self):
        self.states = []
        self.actions = []
        self.log_probs = []
        self.values = []
        self.entropies = []

    def forward(self, state):
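        # Sample an action from the current policy and cache everything the
        # PPO update will need later (state, action, log-prob, value, entropy).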
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        probs, value = self.model(state)
        m = Categorical(probs)
        action = m.sample()
        log_prob = m.log_prob(action)
        entropy = m.entropy()

        self.states.append(state)
        self.actions.append(action)
        self.log_probs.append(log_prob)
        self.values.append(value)
        self.entropies.append(entropy)

        return action.item()

    def compute_gae(self, next_value, rewards):
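        # Generalized Advantage Estimation (GAE): discounted sum of TD errors,
        # with a hard-coded GAE lambda of 0.95 below.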
        gae = 0
        returns = []
        values = self.values + [next_value]
        for step in reversed(range(len(rewards))):
            delta = (
                rewards[step] + self.discount_factor * values[step + 1] - values[step]
            )
            gae = delta + self.discount_factor * 0.95 * gae
            returns.insert(0, gae + values[step])
        return returns

    def train(self, rewards):
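        # Bootstrap the value of the current state, compute GAE returns, then
        # run several epochs of clipped-surrogate updates over mini-batches.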
        next_state = torch.FloatTensor(self.get_data()).unsqueeze(0).to(self.device)
        _, next_value = self.model(next_state)
        returns = self.compute_gae(next_value, rewards)

        states = torch.cat(self.states)
        actions = torch.cat(self.actions)
        old_log_probs = torch.cat(self.log_probs).detach()
        # Stack the per-step returns into a single detached tensor
        returns = torch.cat(returns).detach().squeeze(-1).to(self.device)

        advantages = returns - torch.cat(self.values).detach().squeeze()

        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        for _ in range(self.ppo_epochs):
            # Create mini-batches
            indices = torch.randperm(len(states))
            for start in range(0, len(states), self.batch_size):
                end = min(start + self.batch_size, len(states))
                batch_indices = indices[start:end]

                batch_states = states[batch_indices]
                batch_actions = actions[batch_indices]
                batch_old_log_probs = old_log_probs[batch_indices]
                batch_returns = returns[batch_indices]
                batch_advantages = advantages[batch_indices]

                probs, values = self.model(batch_states)
                m = Categorical(probs)
                new_log_probs = m.log_prob(batch_actions)
                entropy = m.entropy().mean()

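                # Probability ratio r_theta = pi_new / pi_old, recovered from the stored log-probs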
                ratio = torch.exp(new_log_probs - batch_old_log_probs)
                surr1 = ratio * batch_advantages
                surr2 = (
                    torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon)
                    * batch_advantages
                )

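                # Actor: negative clipped surrogate (minimized); critic: squared error
                # against the GAE returns; entropy bonus encourages exploration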
                actor_loss = -torch.min(surr1, surr2).mean()
                critic_loss = (batch_returns - values.squeeze()).pow(2).mean()
                entropy_loss = -entropy

                loss = (
                    actor_loss
                    + self.critic_weight * critic_loss
                    + self.entropy_weight * entropy_loss
                )

                self.optimizer.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(self.model.parameters(), 0.5)
                self.optimizer.step()

        self.reset_episode()
        return loss.item()

    def action_train(self, state):

        action = self.forward(state)

        if action == 0:
            self.angle += 10  # Left
        elif action == 1:
            self.angle -= 10  # Right
        elif action == 2:
            if self.speed - 2 >= 6:
                self.speed -= 2  # Slow Down
        else:
            self.speed += 2  # Speed Up

        return action


class PPORace():

    def training_race(self, car: PPOCar, episodes, train_every):
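        # Run episodes, buffering rewards (and, inside the car, states/log-probs)
        # until a PPO update is performed every `train_every` episodes.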

        rewards = []

        for episode in range(1, episodes + 1):

            current_state = car.get_data()

            done = False
            episode_reward = 0
            while not done:

                action = car.action_train(current_state)
                car.update(self.game_map)
                new_state, reward, done = self.step(car)

                rewards.append(reward)
                episode_reward += reward

                current_state = new_state

            if episode % train_every == 0:
                loss = car.train(rewards)
                rewards = []  # keep the reward buffer aligned with the car's cleared episode data

Actual Training#