Implementing DQN with PARL
agent.py
import numpy as np
import paddle.fluid as fluid
import parl
from parl import layers


class Agent(parl.Agent):
    def __init__(self, algorithm, obs_dim, act_dim, e_greed=0.1, e_greed_decrement=0):
        assert isinstance(obs_dim, int)
        assert isinstance(act_dim, int)
        # Dimension of the observation vector
        self.obs_dim = obs_dim
        # Number of available actions
        self.act_dim = act_dim
        super(Agent, self).__init__(algorithm)

        self.global_step = 0
        self.update_target_steps = 200
        # Exploration (epsilon) decay parameters
        self.e_greed = e_greed
        self.e_greed_decrement = e_greed_decrement

    def build_program(self):
        self.pred_program = fluid.Program()
        self.learn_program = fluid.Program()

        with fluid.program_guard(self.pred_program):
            obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
            self.value = self.alg.predict(obs)

        with fluid.program_guard(self.learn_program):
            obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
            action = layers.data(name='act', shape=[1], dtype='int32')
            reward = layers.data(name='reward', shape=[], dtype='float32')
            next_obs = layers.data(name='next_obs', shape=[self.obs_dim], dtype='float32')
            isOver = layers.data(name='isOver', shape=[], dtype='bool')
            self.cost = self.alg.learn(obs, action, reward, next_obs, isOver)

    # Choose an action with epsilon-greedy exploration
    def sample(self, obs):
        sample = np.random.rand()
        if sample < self.e_greed:
            # Random exploratory action
            act = np.random.randint(self.act_dim)
        else:
            # Greedy action from the Q-network
            act = self.predict(obs)
        self.e_greed = max(0.01, self.e_greed - self.e_greed_decrement)
        return act

    # Predict the best action for a single observation
    def predict(self, obs):
        obs = np.expand_dims(obs, axis=0)
        pred_Q = self.fluid_executor.run(program=self.pred_program,
                                         feed={'obs': obs.astype('float32')},
                                         fetch_list=[self.value])
        pred_Q = np.squeeze(pred_Q)
        action = np.argmax(pred_Q)
        return action

    # Train the model; every update_target_steps steps, sync the parameters to the target network
    def learn(self, obs, act, reward, next_obs, isOver):
        if self.global_step % self.update_target_steps == 0:
            # Update the target network parameters
            self.alg.sync_target()
        self.global_step += 1

        feed = {
            'obs': obs.astype('float32'),
            'act': act.astype('int32'),
            'reward': reward,
            'next_obs': next_obs.astype('float32'),
            'isOver': isOver,
        }
        cost = self.fluid_executor.run(self.learn_program,
                                       feed=feed,
                                       fetch_list=[self.cost])[0]
        return cost
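The heavy lifting in learn() is done by parl.algorithms.DQN: conceptually it minimizes the squared error between Q(obs, act) and the standard DQN target reward + gamma * max_a' Q_target(next_obs, a') * (1 - isOver). The snippet below is only a NumPy sketch of that target computation for intuition; dqn_target and its arguments are illustrative names, not PARL's internals.

import numpy as np

def dqn_target(reward, next_q_target_values, isOver, gamma=0.99):
    # Q-learning target: r + gamma * max_a' Q_target(s', a'), with no bootstrap on terminal steps
    best_next_q = np.max(next_q_target_values, axis=1)
    return reward + gamma * best_next_q * (1.0 - isOver)

# Example with a batch of two transitions and two actions (as in CartPole)
reward = np.array([1.0, 1.0], dtype='float32')
next_q = np.array([[0.5, 0.8], [0.2, 0.1]], dtype='float32')  # Q_target(s', a)
isOver = np.array([0.0, 1.0], dtype='float32')
print(dqn_target(reward, next_q, isOver))  # approximately [1.792, 1.0]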
model.py
import parl
from parl import layers


class Model(parl.Model):
    def __init__(self, act_dim):
        super().__init__()
        # Two 128-unit ReLU hidden layers followed by a linear output of one Q-value per action
        self.fc1 = layers.fc(size=128, act='relu')
        self.fc2 = layers.fc(size=128, act='relu')
        self.fc3 = layers.fc(size=act_dim, act=None)

    def value(self, obs):
        h1 = self.fc1(obs)
        h2 = self.fc2(h1)
        Q = self.fc3(h2)
        return Q
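For CartPole the observation has 4 features and there are 2 actions, so value() maps a (batch, 4) input through two 128-unit ReLU layers to a (batch, 2) vector of Q-values. Below is a plain NumPy sketch of the same forward pass with random, untrained weights, just to make the shapes concrete; the fluid layers above learn these weights during training.

import numpy as np

rng = np.random.default_rng(0)
W1, b1 = rng.standard_normal((4, 128)), np.zeros(128)
W2, b2 = rng.standard_normal((128, 128)), np.zeros(128)
W3, b3 = rng.standard_normal((128, 2)), np.zeros(2)

def value(obs):
    h1 = np.maximum(obs @ W1 + b1, 0)   # fc1: ReLU
    h2 = np.maximum(h1 @ W2 + b2, 0)    # fc2: ReLU
    return h2 @ W3 + b3                 # fc3: linear Q-values

obs = rng.standard_normal((32, 4))
print(value(obs).shape)  # (32, 2)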
replay_memory.py
import random
import collections
import numpy as np


class ReplayMemory(object):
    def __init__(self, max_size):
        self.buffer = collections.deque(maxlen=max_size)

    # Add one transition
    def append(self, exp):
        self.buffer.append(exp)

    # Sample a random mini-batch of transitions
    def sample(self, batch_size):
        mini_batch = random.sample(self.buffer, batch_size)
        obs_batch, action_batch, reward_batch, next_obs_batch, isOver_batch = [], [], [], [], []

        for experience in mini_batch:
            s, a, r, s_p, isOver = experience
            obs_batch.append(s)
            action_batch.append(a)
            reward_batch.append(r)
            next_obs_batch.append(s_p)
            isOver_batch.append(isOver)

        return np.array(obs_batch).astype('float32'), \
            np.array(action_batch).astype('float32'), np.array(reward_batch).astype('float32'), \
            np.array(next_obs_batch).astype('float32'), np.array(isOver_batch).astype('float32')

    # Current number of stored transitions
    def __len__(self):
        return len(self.buffer)
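A quick usage sketch of ReplayMemory with made-up CartPole-sized transitions; the numbers are arbitrary, but the tuple layout matches what run_train() in train.py appends.

import numpy as np
from replay_memory import ReplayMemory

rpm = ReplayMemory(max_size=1000)
# Each entry is (obs, [action], reward, next_obs, isOver), as in run_train()
for _ in range(300):
    obs = np.random.randn(4).astype('float32')
    next_obs = np.random.randn(4).astype('float32')
    rpm.append((obs, [np.random.randint(2)], 1.0, next_obs, False))

obs_b, act_b, rew_b, next_obs_b, done_b = rpm.sample(64)
print(obs_b.shape, act_b.shape)  # (64, 4) (64, 1)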
train.py
import gym
import numpy as np
from parl.utils import logger
import parl

from model import Model
from agent import Agent
from replay_memory import ReplayMemory

LEARN_FREQ = 5  # learn every LEARN_FREQ environment steps
MEMORY_SIZE = 20000  # replay memory capacity
MEMORY_WARMUP_SIZE = 200  # transitions to collect before training starts
BATCH_SIZE = 64  # mini-batch size
LEARNING_RATE = 0.0005  # learning rate
GAMMA = 0.99  # reward discount factor
E_GREED = 0.1  # initial exploration probability
E_GREED_DECREMENT = 1e-6  # per-step decay of the exploration probability during training
MAX_EPISODE = 10000  # number of training episodes


def run_train(agent, env, rpm):
    total_reward = 0
    obs = env.reset()
    step = 0
    while True:
        step += 1
        # Choose an action (epsilon-greedy) and step the environment
        action = agent.sample(obs)
        next_obs, reward, isOver, _ = env.step(action)
        # Store the transition
        rpm.append((obs, [action], reward, next_obs, isOver))

        # Train the model
        if (len(rpm) > MEMORY_WARMUP_SIZE) and (step % LEARN_FREQ == 0):
            (batch_obs, batch_action, batch_reward, batch_next_obs, batch_isOver) = rpm.sample(BATCH_SIZE)
            agent.learn(batch_obs, batch_action, batch_reward, batch_next_obs, batch_isOver)

        total_reward += reward
        obs = next_obs
        # Episode finished
        if isOver:
            break
    return total_reward


# Evaluate the model over several episodes
def evaluate(agent, env):
    eval_reward = []
    for i in range(5):
        obs = env.reset()
        episode_reward = 0
        isOver = False
        while not isOver:
            action = agent.predict(obs)
            env.render()
            obs, reward, isOver, _ = env.step(action)
            episode_reward += reward
        eval_reward.append(episode_reward)
    return np.mean(eval_reward)


def main():
    # Create the game environment; the argument is the environment name
    env = gym.make('CartPole-v1')
    # Observation shape and number of actions
    action_dim = env.action_space.n
    obs_shape = env.observation_space.shape

    # Replay memory for storing game transitions
    rpm = ReplayMemory(MEMORY_SIZE)

    # Build the model, algorithm and agent
    model = Model(act_dim=action_dim)
    algorithm = parl.algorithms.DQN(model, act_dim=action_dim, gamma=GAMMA, lr=LEARNING_RATE)
    agent = Agent(algorithm=algorithm,
                  obs_dim=obs_shape[0],
                  act_dim=action_dim,
                  e_greed=E_GREED,
                  e_greed_decrement=E_GREED_DECREMENT)

    # Warm up the replay memory
    print("Warming up the replay memory...")
    while len(rpm) < MEMORY_WARMUP_SIZE:
        run_train(agent, env, rpm)

    # Start training
    print("Starting training...")
    episode = 0
    while episode < MAX_EPISODE:
        # Train
        for i in range(0, 50):
            train_reward = run_train(agent, env, rpm)
            episode += 1
            logger.info('Episode: {}, Reward: {:.2f}, e_greed: {:.2f}'.format(episode, train_reward, agent.e_greed))
        # Evaluate
        eval_reward = evaluate(agent, env)
        logger.info('episode:{} test_reward:{}'.format(episode, eval_reward))


if __name__ == '__main__':
    main()
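One detail worth noting about the hyperparameters: Agent.sample() subtracts E_GREED_DECREMENT from e_greed on every sampled step and clips it at 0.01, so with the values above exploration decays from 0.1 to its 0.01 floor after about 90,000 environment steps.

E_GREED = 0.1
E_GREED_DECREMENT = 1e-6
E_GREED_MIN = 0.01  # floor hard-coded in Agent.sample()

steps_to_floor = (E_GREED - E_GREED_MIN) / E_GREED_DECREMENT
print(round(steps_to_floor))  # 90000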
The example uses a Gym game environment (CartPole-v1).