# This is my own code, written without the PARL framework
import numpy as np
import paddle.fluid as fluid
import gym
import matplotlib.pyplot as plt
LEARNING_RATE = 0.0001
class Agent(object):
def __init__(self, obs_dim, act_dim, learning_rate):
self.obs_dim = obs_dim
self.act_dim = act_dim
self.lr = learning_rate
self.exe = fluid.Executor(fluid.CPUPlace())
# ============================
self.predict_program = fluid.Program()
self.learn_program = fluid.Program()
self.act_prob = 0
self.cost = 0
with fluid.program_guard(self.predict_program, fluid.default_startup_program()):
obs = fluid.layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
self.act_prob = self.predict(obs)
with fluid.program_guard(self.learn_program, fluid.default_startup_program()):
obs = fluid.layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
            action = fluid.layers.data(name='action', shape=[1], dtype='int64')
            reward = fluid.layers.data(name='reward', shape=[], dtype='float32')
self.cost = self.learn(obs, action, reward)
self.exe.run(fluid.default_startup_program())
def forward(self, obs):
h1 = fluid.layers.fc(input=obs, size=self.act_dim*10, act='tanh')
h3 = fluid.layers.fc(input=h1, size=self.act_dim, act='softmax')
return h3
def predict(self, obs):
result = self.forward(obs)
return result
def learn(self, obs, action, reward):
act_prob = self.predict(obs)
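        # REINFORCE loss: log_prob below is the negative log-likelihood of the
        # taken action, so minimizing log_prob * G_t performs gradient ascent
        # on the expected return.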
log_prob = fluid.layers.reduce_sum(-1.0 * fluid.layers.log(act_prob) * fluid.layers.one_hot(action, act_prob.shape[1]), dim=1)
cost = log_prob * reward
        # cost = -1 * log_prob * reward  # tried as an experiment
avg_cost = fluid.layers.reduce_mean(cost)
optimizer = fluid.optimizer.AdamOptimizer(self.lr)
opt = optimizer.minimize(avg_cost)
return avg_cost
def choose_action(self, obs):
obs = np.expand_dims(obs, axis=0)
act_prob = self.exe.run(self.predict_program,
feed={'obs': obs.astype('float32')},
fetch_list=[self.act_prob])[0]
act_prob = np.squeeze(act_prob, axis=0)
act = np.random.choice(range(self.act_dim), p=act_prob)
return act
def update_learn(self, obs, action, reward):
action = np.expand_dims(action, axis=-1)
feed = {'obs': obs.astype('float32'),
'action': action.astype('int64'),
'reward': reward.astype('float32')}
cost = self.exe.run(self.learn_program, feed=feed, fetch_list=[self.cost])[0]
return cost
def run_episode(env, agent):
obs_ls, action_ls, reward_ls = [], [], []
obs = env.reset()
total_reward = 0
while True:
obs_ls.append(obs)
action = agent.choose_action(obs)
action_ls.append(action)
        obs, reward, done, info = env.step(action)
        reward_ls.append(reward)
        total_reward += reward
        if done:
            break
return obs_ls, action_ls, reward_ls, total_reward
def evaluate(env, agent, epoch=5):
    total_reward = 0
    for i in range(epoch):
        _obs, _act, _rew, episode_reward = run_episode(env, agent)
        total_reward += episode_reward
    return total_reward / epoch
def calc_reward_to_go(reward_list, gamma=1):
for i in range(len(reward_list) - 2, -1, -1):
# G_t = r_t + gamma * r_t+1 + .... = r_t + gamma*G_t+1
reward_list[i] += gamma * reward_list[i+1]
return np.array(reward_list)
def main():
env = gym.make('CartPole-v0')
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.n
agent = Agent(obs_dim, act_dim, learning_rate=LEARNING_RATE)
max_reward = 0
costs = []
for i in range(1000):
obs_ls, action_ls, reward_ls, total_reward = run_episode(env, agent)
batch_obs = np.array(obs_ls)
batch_action = np.array(action_ls)
batch_reward = calc_reward_to_go(reward_ls)
train_cost = agent.update_learn(batch_obs, batch_action, batch_reward)
costs.append(train_cost)
if max_reward < total_reward:
max_reward = total_reward
if i % 10 == 0:
            print('after {} episodes, the reward is {}, the max reward is {}'.format(i, total_reward, max_reward))
plt.plot(costs)
plt.show()
if __name__ == '__main__':
main()
#===================================================================
# This is the official PaddlePaddle (PARL) code
import os
import gym
import numpy as np
import paddle.fluid as fluid
import parl
from parl import layers
from parl.utils import logger
import matplotlib.pyplot as plt
LEARNING_RATE = 1e-3
class Model(parl.Model):
def __init__(self, act_dim):
act_dim = act_dim
hid1_size = act_dim * 10
self.fc1 = layers.fc(size=hid1_size, act='tanh')
self.fc2 = layers.fc(size=act_dim, act='softmax')
    def forward(self, obs):  # can be called directly, e.g. model = Model(5); model(obs)
out = self.fc1(obs)
out = self.fc2(out)
return out
class PolicyGradient(parl.Algorithm):
def __init__(self, model, lr=None):
""" Policy Gradient algorithm
Args:
model (parl.Model): policy的前向网络.
lr (float): 学习率.
"""
self.model = model
assert isinstance(lr, float)
self.lr = lr
def predict(self, obs):
""" 使用policy model预测输出的动作概率
"""
return self.model(obs)
def learn(self, obs, action, reward):
""" 用policy gradient 算法更新policy model
"""
        act_prob = self.model(obs)  # get the output action probabilities
        # log_prob = layers.cross_entropy(act_prob, action)  # cross entropy
log_prob = layers.reduce_sum(
-1.0 * layers.log(act_prob) * layers.one_hot(
action, act_prob.shape[1]),
dim=1)
cost = log_prob * reward
cost = layers.reduce_mean(cost)
optimizer = fluid.optimizer.Adam(self.lr)
optimizer.minimize(cost)
return cost
class Agent(parl.Agent):
def __init__(self, algorithm, obs_dim, act_dim):
self.obs_dim = obs_dim
self.act_dim = act_dim
super(Agent, self).__init__(algorithm)
def build_program(self):
self.pred_program = fluid.Program()
self.learn_program = fluid.Program()
        with fluid.program_guard(self.pred_program):  # build the graph for predicting actions; define input/output variables
obs = layers.data(
name='obs', shape=[self.obs_dim], dtype='float32')
self.act_prob = self.alg.predict(obs)
        with fluid.program_guard(
                self.learn_program):  # build the graph for updating the policy network; define input/output variables
obs = layers.data(
name='obs', shape=[self.obs_dim], dtype='float32')
act = layers.data(name='act', shape=[1], dtype='int64')
reward = layers.data(name='reward', shape=[], dtype='float32')
self.cost = self.alg.learn(obs, act, reward)
def sample(self, obs):
        obs = np.expand_dims(obs, axis=0)  # add a batch dimension
act_prob = self.fluid_executor.run(
self.pred_program,
feed={'obs': obs.astype('float32')},
fetch_list=[self.act_prob])[0]
        act_prob = np.squeeze(act_prob, axis=0)  # remove the batch dimension
        act = np.random.choice(range(self.act_dim), p=act_prob)  # sample an action according to its probability
return act
def predict(self, obs):
obs = np.expand_dims(obs, axis=0)
act_prob = self.fluid_executor.run(
self.pred_program,
feed={'obs': obs.astype('float32')},
fetch_list=[self.act_prob])[0]
act_prob = np.squeeze(act_prob, axis=0)
        act = np.argmax(act_prob)  # pick the action with the highest probability
return act
def learn(self, obs, act, reward):
act = np.expand_dims(act, axis=-1)
feed = {
'obs': obs.astype('float32'),
'act': act.astype('int64'),
'reward': reward.astype('float32')
}
cost = self.fluid_executor.run(
self.learn_program, feed=feed, fetch_list=[self.cost])[0]
return cost
def run_episode(env, agent):
obs_list, action_list, reward_list = [], [], []
obs = env.reset()
while True:
obs_list.append(obs)
        action = agent.sample(obs)  # sample an action
action_list.append(action)
obs, reward, done, info = env.step(action)
reward_list.append(reward)
if done:
break
return obs_list, action_list, reward_list
def evaluate(env, agent, render=False):
eval_reward = []
for i in range(5):
obs = env.reset()
episode_reward = 0
while True:
            action = agent.predict(obs)  # pick the greedy action
obs, reward, isOver, _ = env.step(action)
episode_reward += reward
if render:
env.render()
if isOver:
break
eval_reward.append(episode_reward)
return np.mean(eval_reward)
# Given the reward of every step in one episode, compute the return G_t for each step
def calc_reward_to_go(reward_list, gamma=1.0):
for i in range(len(reward_list) - 2, -1, -1):
# G_t = r_t + γ·r_t+1 + ... = r_t + γ·G_t+1
reward_list[i] += gamma * reward_list[i + 1] # Gt
return np.array(reward_list)
# create the environment
env = gym.make('CartPole-v0')
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.n
logger.info('obs_dim {}, act_dim {}'.format(obs_dim, act_dim))
# build the agent with the PARL framework
model = Model(act_dim=act_dim)
alg = PolicyGradient(model, lr=LEARNING_RATE)
agent = Agent(alg, obs_dim=obs_dim, act_dim=act_dim)
# load a saved model
# if os.path.exists('./model.ckpt'):
# agent.restore('./model.ckpt')
# run_episode(env, agent, train_or_test='test', render=True)
# exit()
costs = []
for i in range(1000):
obs_list, action_list, reward_list = run_episode(env, agent)
if i % 10 == 0:
logger.info("Episode {}, Reward Sum {}.".format(
i, sum(reward_list)))
batch_obs = np.array(obs_list)
batch_action = np.array(action_list)
batch_reward = calc_reward_to_go(reward_list)
cost = agent.learn(batch_obs, batch_action, batch_reward)
costs.append(cost)
if (i + 1) % 200 == 0:
        total_reward = evaluate(env, agent, render=False)  # set render=True to view the rendering; requires running locally, cannot be displayed on AI Studio
logger.info('Test reward: {}'.format(total_reward))
plt.plot(costs)
plt.show()
# save the model to ./model.ckpt
agent.save('./model.ckpt')
#====================================
Compared with the PARL framework, using fluid directly takes less code.
But the highest reward I get is 120, and in most episodes the reward is below 20.
Could someone analyze what is going wrong?
+1
+2
You could open an issue for this.
I ran into this problem too. PARL wraps another layer around fluid; there is still little material on how to use it and quite a few bugs, so it actually feels less convenient than plain fluid. It seems the wrapper does some special things internally.
Very puzzling. The network is the same as the original one, and I can't figure out where the problem is. I rewrote it in TensorFlow and the reward reaches 200 within 90 episodes. No idea where it went wrong.
The only part of PARL that feels useful to me is the step that syncs network parameters in DQN; I don't see much benefit elsewhere. If anything, reinforcement learning code ends up feeling like assembling building blocks: just follow the official steps. But I personally don't like writing code that way. A blogger on CSDN implemented DQN directly with Fluid: https://blog.csdn.net/qq_41427568/article/details/87614031?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase
The problem has been solved. The cause was that the network used to predict the agent's actions was never being updated.
If the code below keeps its reward under 50 for the first 100 episodes, just run it a few more times. Sometimes the reward shoots up to 200 within 30 episodes, but sometimes it stays below 50 throughout.
PS: the code above does run, but its logic is slightly wrong. It needs the following modifications:
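Before the full modified code, here is a minimal sketch of the idea, assuming the root cause is that every call to fluid.layers.fc auto-generates a fresh parameter name, so predict_program ends up with fc_0/fc_1 while learn_program trains a separate fc_2/fc_3 and the weights used to choose actions never change. One common raw-fluid pattern is to wrap each program build in fluid.unique_name.guard() (or give every fc layer an explicit ParamAttr name) so both programs refer to the same parameters in the executor's scope; this sketch is only an illustration, not necessarily the exact modification:

# Sketch: make predict_program and learn_program share parameters by resetting
# fluid's automatic name generator for each program, so both graphs create the
# same variable names (fc_0.w_0, fc_0.b_0, fc_1.w_0, ...) in one global scope.
with fluid.program_guard(self.predict_program, fluid.default_startup_program()):
    with fluid.unique_name.guard():
        obs = fluid.layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
        self.act_prob = self.predict(obs)
with fluid.program_guard(self.learn_program, fluid.default_startup_program()):
    with fluid.unique_name.guard():
        obs = fluid.layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
        action = fluid.layers.data(name='action', shape=[1], dtype='int64')
        reward = fluid.layers.data(name='reward', shape=[], dtype='float32')
        self.cost = self.learn(obs, action, reward)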