Training a Gym Game with DQN
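The complete script below trains a DQN agent on Gym's CartPole-v1 environment using PaddlePaddle's static-graph fluid API. It builds a small fully connected Q-network twice, once as the policy network and once as the target network (same architecture, separately named parameters), clones a prediction-only program before the loss is attached, constructs a separate program that hard-copies the policy parameters into the target network, and then alternates epsilon-greedy data collection into a replay buffer with mini-batch training, syncing the target network every 200 updates and evaluating every 50 episodes.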
import numpy as np
import paddle.fluid as fluid
import random
import gym
from collections import deque
from paddle.fluid.param_attr import ParamAttr


# Define the deep neural network: two hidden layers, Q-values for 2 actions
def Model(ipt, variable_field):
    fc1 = fluid.layers.fc(input=ipt,
                          size=24,
                          act='relu',
                          param_attr=ParamAttr(name='{}_fc1'.format(variable_field)),
                          bias_attr=ParamAttr(name='{}_fc1_b'.format(variable_field)))
    fc2 = fluid.layers.fc(input=fc1,
                          size=24,
                          act='relu',
                          param_attr=ParamAttr(name='{}_fc2'.format(variable_field)),
                          bias_attr=ParamAttr(name='{}_fc2_b'.format(variable_field)))
    out = fluid.layers.fc(input=fc2,
                          size=2,
                          param_attr=ParamAttr(name='{}_fc3'.format(variable_field)),
                          bias_attr=ParamAttr(name='{}_fc3_b'.format(variable_field)))
    return out


def dqn():
    # Define the input data
    state_data = fluid.data(name='state', shape=[None, 4], dtype='float32')
    action_data = fluid.data(name='action', shape=[None, 1], dtype='int64')
    reward_data = fluid.data(name='reward', shape=[None], dtype='float32')
    next_state_data = fluid.data(name='next_state', shape=[None, 4], dtype='float32')
    done_data = fluid.data(name='done', shape=[None], dtype='float32')

    # Build the policy network
    policyQ = Model(state_data, 'policy')

    # Clone a prediction-only program before the loss is attached
    predict_program = fluid.default_main_program().clone()

    # Select the Q-value of the action that was actually taken
    action_onehot = fluid.layers.one_hot(action_data, 2)
    action_value = fluid.layers.elementwise_mul(action_onehot, policyQ)
    pred_action_value = fluid.layers.reduce_sum(action_value, dim=1)

    # Build the target network
    targetQ = Model(next_state_data, 'target')
    best_v = fluid.layers.reduce_max(targetQ, dim=1)
    # Stop gradients from flowing into the target network
    best_v.stop_gradient = True

    # TD target: reward plus the discounted best next-state value
    gamma = 1.0
    target = reward_data + gamma * best_v * (1.0 - done_data)

    # Define the loss function
    cost = fluid.layers.square_error_cost(pred_action_value, target)
    avg_cost = fluid.layers.reduce_mean(cost)
    return predict_program, policyQ, avg_cost


# Build the program that copies the policy network's parameters to the target network
def _build_sync_target_network():
    # Get all variables of the main program
    vars = list(fluid.default_main_program().list_vars())
    # Separate the parameters of the two networks
    policy_vars = list(filter(lambda x: 'GRAD' not in x.name and 'policy' in x.name, vars))
    target_vars = list(filter(lambda x: 'GRAD' not in x.name and 'target' in x.name, vars))
    policy_vars.sort(key=lambda x: x.name)
    target_vars.sort(key=lambda x: x.name)

    # Clone a program from the main program for the parameter update
    sync_program = fluid.default_main_program().clone()
    with fluid.program_guard(sync_program):
        sync_ops = []
        for i, var in enumerate(policy_vars):
            sync_op = fluid.layers.assign(policy_vars[i], target_vars[i])
            sync_ops.append(sync_op)
    # Prune the cloned program down to just the assign ops
    sync_program = sync_program._prune(sync_ops)
    return sync_program


# Get the DQN programs
predict_program, policyQ, avg_cost = dqn()

# Get the parameter-sync program
_sync_program = _build_sync_target_network()

# Define the optimizer
optimizer = fluid.optimizer.Adam(learning_rate=1e-3, epsilon=1e-3)
opt = optimizer.minimize(avg_cost)

# Create the executor and run parameter initialization
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

# Training hyperparameters
batch_size = 64           # batch size
num_episodes = 10000      # number of training episodes
e_greed = 0.1             # initial exploration probability
e_greed_decrement = 1e-6  # decrease the exploration probability during training

update_num = 0


def run_train(env, replay_buffer):
    global update_num, e_greed
    total_reward = 0
    # Reset the game state
    state = env.reset()
    while True:
        # Render the game screen
        # env.render()
        state = np.expand_dims(state, axis=0)
        # Epsilon-greedy exploration
        e_greed = max(0.01, e_greed - e_greed_decrement)
        if np.random.rand() < e_greed:
            # With probability e_greed, pick a random action
            action = env.action_space.sample()
        else:
            # Otherwise, take the action predicted by the model
            action = exe.run(predict_program,
                             feed={'state': state.astype('float32')},
                             fetch_list=[policyQ])[0]
            action = np.squeeze(action, axis=0)
            action = np.argmax(action)
        # Execute the action in the game; get the next state, the reward,
        # whether the game has ended, and extra info
        next_state, reward, done, info = env.step(action)
        total_reward += reward
        # Record the transition as future training data
        replay_buffer.append((state, action, reward, next_state, done))
        state = next_state
        # If the game is over, start a new game
        if done:
            break
        # Once enough data has been collected, start training
        if len(replay_buffer) >= batch_size:
            batch_state, batch_action, batch_reward, batch_next_state, \
            batch_done = [np.array(a, np.float32) for a in
                          zip(*random.sample(replay_buffer, batch_size))]
            # Adjust the data dimensions
            batch_action = np.expand_dims(batch_action, axis=-1)
            batch_state = np.squeeze(batch_state, axis=1)
            # Run one training step
            exe.run(program=fluid.default_main_program(),
                    feed={'state': batch_state,
                          'action': batch_action.astype('int64'),
                          'reward': batch_reward,
                          'next_state': batch_next_state,
                          'done': batch_done})
            # Periodically sync the target network's parameters
            if update_num % 200 == 0:
                exe.run(program=_sync_program)
            update_num += 1
    return total_reward


# Evaluate the model
def evaluate(env):
    total_reward = 0
    # Reset the game state
    state = env.reset()
    while True:
        state = np.expand_dims(state, axis=0)
        # Take the action predicted by the model
        action = exe.run(predict_program,
                         feed={'state': state.astype('float32')},
                         fetch_list=[policyQ])[0]
        action = np.squeeze(action, axis=0)
        action = np.argmax(action)
        next_state, reward, done, info = env.step(action)
        state = next_state
        total_reward += reward
        if done:
            break
    return total_reward


# Create a game environment; the argument is the game's name
env = gym.make("CartPole-v1")
replay_buffer = deque(maxlen=10000)

# Start training
episode = 0
while episode < num_episodes:
    for t in range(50):
        train_reward = run_train(env, replay_buffer)
        episode += 1
        print('Episode: {}, Reward: {:.2f}, e_greed: {:.2f}'.format(episode, train_reward, e_greed))
    # Evaluate
    eval_reward = evaluate(env)
    print('episode:{} test_reward:{}'.format(episode, eval_reward))

env.close()
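The heart of dqn() is the one-step temporal-difference target and the squared error between that target and the Q-value of the action actually taken. To make the graph operations easier to follow, here is a minimal NumPy sketch of the same computation; the function names and arguments are illustrative, not part of the script above:

import numpy as np

def td_targets(q_target_next, rewards, dones, gamma=1.0):
    # best_v: the target network's highest Q-value for each next state
    best_v = q_target_next.max(axis=1)               # shape [batch]
    # terminal transitions (done=1) contribute only the immediate reward
    return rewards + gamma * best_v * (1.0 - dones)  # shape [batch]

def dqn_loss(q_policy, actions, targets):
    # one-hot masking picks the Q-value of the action that was taken,
    # mirroring one_hot + elementwise_mul + reduce_sum in the fluid graph
    onehot = np.eye(q_policy.shape[1])[actions.reshape(-1)]  # [batch, n_actions]
    pred_action_value = (q_policy * onehot).sum(axis=1)      # [batch]
    return np.mean((pred_action_value - targets) ** 2)

Note that gamma is set to 1.0 in the script, so CartPole rewards are not discounted; it is the (1.0 - done) factor that keeps the bootstrapped value from leaking past episode ends, while best_v.stop_gradient = True keeps gradients out of the target network.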
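_build_sync_target_network() is the fluid-specific way to express a hard target-network update: it clones the main program, adds one assign op per parameter pair (matched by sorted name), and prunes everything else, so running _sync_program performs only the copies. Stripped of the framework, the operation amounts to the following sketch, with illustrative names and a plain dict of arrays standing in for the parameter scope:

def hard_update(scope):
    # copy each 'policy_*' parameter into its 'target_*' counterpart,
    # pairing them by name just as _build_sync_target_network does
    for name in sorted(scope):
        if name.startswith('policy_'):
            scope[name.replace('policy_', 'target_', 1)] = scope[name].copy()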