使用actor-critic方法来控制CartPole-V0 游戏详解
CartPole 介绍
在一个光滑的轨道上有个推车,杆子垂直微置在推车上,随时有倒的风险。系统每次对推车施加向左或者向右的力,但我们的目标是让杆子保持直立。杆子保持直立的每个时间单位都会获得 +1 的奖励。但是当杆子与垂直方向成 15 度以上的位置,或者推车偏离中心点超过 2.4 个单位后,这一轮局游戏结束。因此我们可以获得的最高回报等于 200 。我们这里就是要通过使用 PPO 算法来训练一个强化学习模型 actor-critic ,通过对比模型训练前后的游戏运行 gif 图,可以看出来我们训练好的模型能长时间保持杆子处于垂直状态。
Actor Critic 介绍
当 agent 采取行动并在环境中移动时,它在观察到的环境状态的情况下,学习两个可能的输出:
- 接下来最合适的一个操作,actor 负责此部分输出。
- 未来可能获得的奖励总和,critic 负责此部分的输出。
actor 和 critic 通过不断地学习,以便使得 agent 在游戏中最终获得的奖励最大,这里的 agent 就是那个小车。
库准备
tensorflow-gpu==2.10.0 imageio==2.26.1 keras==2.10,0 gym==0.20.0 pyglet==1.5.20 scipy==1.10.1
设置超参数
这部分代码主要有:
(1)导入所需的Python库:gym、numpy、tensorflow 和 keras。
(2)设置整个环境的超参数:种子、折扣因子和每个回合的最大步数。
(3)创建 CartPole-v0 环境,并设置种子。
(4)定义一个非常小的值 eps ,表示的机器两个不同的数字之间的最小差值,用于检验数值稳定性。
import gym # 导入Gym库,用于开发和比较强化学习算法 import numpy as np # 导入NumPy库,用于进行科学计算 import tensorflow as tf # 导入TensorFlow库 from tensorflow import keras # 导入keras模块,这是一个高级神经网络API from tensorflow.keras import layers # 导入keras中的layers模块,用于创建神经网络层 seed = 42 # 设定随机种子,用于复现实验结果 gamma = 0.99 # 定义折扣率,用于计算未来奖励的现值 max_steps_per_episode = 10000 # 设定每个 episode 的最大步数 env = gym.make("CartPole-v0") # 创建 CartPole-v0 环境实例 env.seed(seed) # 设定环境的随机种子 eps = np.finfo(np.float32).eps.item() # 获取 float32 数据类型的误差最小值 epsilon
Actor Critic 结构搭建
(1)Actor:将环境的状态作为输入,返回操作空间中每个操作及其概率值,其实总共只有两个操作,往左和往右。
(2)Critic:将环境的状态作为输入,返回未来奖励综合的估计。
(3)在这里网络结构中我们在一开始接收 inputs 之后,我们的 Actor 和 Critic 共用了中间的部分隐藏层 common 层,然后在一个输出分支上连接了一个全连接进行动作分类作为 action ,另一个分支上连接了一个全连接层进行未来奖励计算作为 critic 。
num_inputs = 4 # 状态空间的维度,即输入层的节点数 num_actions = 2 # 行为空间的维度,即输出层的节点数 num_hidden = 128 # 隐藏层的节点数 inputs = layers.Input(shape=(num_inputs,)) # 创建输入层,指定输入的形状 common = layers.Dense(num_hidden, activation="relu")(inputs) # 创建一个全连接层,包含num_hidden 个神经元,使用 ReLU 作为激活函数 action = layers.Dense(num_actions, activation="softmax")(common) # 创建一个全连接层,包含 num_actions 个神经元,使用 softmax 作为激活函数 critic = layers.Dense(1)(common) # 创建一个全连接层,包含1个神经元 model = keras.Model(inputs=inputs, outputs=[action, critic]) # 创建一个 Keras 模型,包含输入层、共享的隐藏层和两个输出层
训练前的样子
import imageio start = env.reset() frames = [] for t in range(max_steps_per_episode): frames.append(env.render(mode='rgb_array')) start = start.reshape(1, -1) start, reward, done, _ = env.step(np.random.choice(num_actions, p=np.squeeze(action_probs))) if done: break with imageio.get_writer('未训练前的样子.gif', mode='I') as writer: for frame in frames: writer.append_data(frame)
模型训练
设置训练所需要的优化器,以及各种参数来记录每个时间步上的数据。
optimizer = keras.optimizers.Adam(learning_rate=0.01) # 创建 Adam 优化器实例,设置学习率为 0.01 huber_loss = keras.losses.Huber() # 创建损失函数实例 action_probs_history = [] # 创建一个列表,用于保存 action 网络在每个步骤中采取各个行动的概率 critic_value_history = [] # 创建一个列表,用于保存 critic 网络在每个步骤中对应的值 rewards_history = [] # 创建一个列表,用于保存每个步骤的奖励值 running_reward = 0 # 初始化运行过程中的每轮奖励 episode_count = 0 # 初始化 episode 计数器
一直训练下去,直到满足奖励大于 195 才会停下训练过程。
while True: state = env.reset() # 新一轮游戏开始,重置环境 episode_reward = 0 # 记录本轮游戏的总奖励值 with tf.GradientTape() as tape: # 构建 GradientTape 用于计算梯度 for timestep in range(1, max_steps_per_episode): # 本轮游戏如果一切正常会进行 max_steps_per_episode 步 state = tf.convert_to_tensor(state) # 将状态转换为张量 state = tf.expand_dims(state, 0) # 扩展维度,以适应模型的输入形状 action_probs, critic_value = model(state) # 前向传播,得到 action 网络输出的动作空间的概率分布,和 critic 网络预测的奖励值 critic_value_history.append(critic_value[0, 0]) # 将上面 critic 预测的奖励值记录在 critic_value_history 列表中 action = np.random.choice(num_actions, p=np.squeeze(action_probs)) # 依据概率分布抽样某个动作,当然了某个动作概率越大越容易被抽中,同时也保留了一定的随机性 action_probs_history.append(tf.math.log(action_probs[0, action])) # 将使用该动作的对数概率值记录在 action_probs_history 列表中 state, reward, done, _ = env.step(action) # 游戏环境使用选中的动作去执行,得到下一个游戏状态、奖励、是否终止和其他信息 rewards_history.append(reward) # 将该时刻的奖励记录在 rewards_history 列表中 episode_reward += reward # 累加本轮游戏的总奖励值 if done: # 如果到达终止状态,则结束循环 break running_reward = 0.05 * episode_reward + (1 - 0.05) * running_reward # 计算平均奖励 returns = [] # 存储折扣回报 discounted_sum = 0 for r in rewards_history[::-1]: # 从后往前遍历奖励的历史值 discounted_sum = r + gamma * discounted_sum # 计算折扣回报 returns.insert(0, discounted_sum) # 将折扣回报插入列表的开头,最后形成的还是从前往后的折扣奖励列表 returns = np.array(returns) # 将折扣回报转换为数组 returns = (returns - np.mean(returns)) / (np.std(returns) + eps) # 归一化折扣回报 returns = returns.tolist() # 将折扣回报转换为列表形式 history = zip(action_probs_history, critic_value_history, returns) # 将三个列表进行 zip 压缩 actor_losses = [] # 存储 action 网络的损失 critic_losses = [] # 存储 critic 网络的损失 for log_prob, value, ret in history: diff = ret - value actor_losses.append(-log_prob * diff) # 计算 actor 的损失函数 critic_losses.append( huber_loss(tf.expand_dims(value, 0), tf.expand_dims(ret, 0)) # 计算 critic 的损失函数 ) loss_value = sum(actor_losses) + sum(critic_losses) # 计算总损失函数 grads = tape.gradient(loss_value, model.trainable_variables) # 计算梯度 optimizer.apply_gradients(zip(grads, model.trainable_variables)) # 更新模型参数 action_probs_history.clear() # 清空之前的历史记录 critic_value_history.clear() # 清空之前的历史记录 rewards_history.clear() # 清空之前的历史记录 episode_count += 1 # 当一轮游戏结束时, episode 加一 if episode_count % 10 == 0: # 每训练 10 个 episode ,输出当前的平均奖励 template = "在第 {} 轮游戏中获得奖励: {:.2f} 分" print(template.format(episode_count, running_reward)) if running_reward > 195: # 如果平均奖励超过195,视为任务已经解决 print("奖励超过 195 ,训练结束") break
打印:
在第 10 轮游戏中获得奖励: 11.17 分
在第 20 轮游戏中获得奖励: 17.12 分
...
在第 170 轮游戏中获得奖励: 155.02 分
在第 180 轮游戏中获得奖励: 171.67 分
...
在第 220 轮游戏中获得奖励: 193.74 分
奖励超过 195 ,训练结束
训练后的样子
import imageio start = env.reset() frames = [] for t in range(max_steps_per_episode): frames.append(env.render(mode='rgb_array')) start = start.reshape(1, -1) action_probs, _ = model(start) action = np.random.choice(num_actions, p=np.squeeze(action_probs)) start, reward, done, _ = env.step(action) if done: break with imageio.get_writer('训练后的样子.gif', mode='I') as writer: for frame in frames: writer.append_data(frame)
以上就是使用actor-critic方法来控制CartPole-V0 游戏详解的详细内容,更多关于 actor-critic控制CartPole-V0的资料请关注脚本之家其它相关文章!
相关文章
利用Jmeter实现在请求param和body里面加入随机参数
本文介绍了如何使用jemeter实现新增接口压力测试中的随机参数生成,首先,使用函数助手对话框生成随机数,然后将生成的随机数作为参数放入请求中,无论请求格式是json、xml还是text等,如果param和body同时存在并需要随机生成参数,可以把参数写入到请求地址中2024-10-10
最新评论