PARL实现DQN
收藏
agent.py
import numpy as np
import paddle.fluid as fluid
import parl
from parl import layers
class Agent(parl.Agent):
def __init__(self,
algorithm,
obs_dim,
act_dim,
e_greed=0.1,
e_greed_decrement=0):
assert isinstance(obs_dim, int)
assert isinstance(act_dim, int)
# 预测图像的shape
self.obs_dim = obs_dim
# 动作组合的数量
self.act_dim = act_dim
super(Agent, self).__init__(algorithm)
self.global_step = 0
self.update_target_steps = 200
# 探索衰减参数
self.e_greed = e_greed
self.e_greed_decrement = e_greed_decrement
def build_program(self):
self.pred_program = fluid.Program()
self.learn_program = fluid.Program()
with fluid.program_guard(self.pred_program):
obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
self.value = self.alg.predict(obs)
with fluid.program_guard(self.learn_program):
obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
action = layers.data(name='act', shape=[1], dtype='int32')
reward = layers.data(name='reward', shape=[], dtype='float32')
next_obs = layers.data(name='next_obs', shape=[self.obs_dim], dtype='float32')
isOver = layers.data(name='isOver', shape=[], dtype='bool')
self.cost = self.alg.learn(obs, action, reward, next_obs, isOver)
# 获取动作
def sample(self, obs):
sample = np.random.rand()
if sample < self.e_greed:
# 随机生成动作
act = np.random.randint(self.act_dim)
else:
# 预测动作
act = self.predict(obs)
self.e_greed = max(0.01, self.e_greed - self.e_greed_decrement)
return act
# 预测动作
def predict(self, obs):
obs = np.expand_dims(obs, axis=0)
pred_Q = self.fluid_executor.run(program=self.pred_program,
feed={'obs': obs.astype('float32')},
fetch_list=[self.value])
pred_Q = np.squeeze(pred_Q)
action = np.argmax(pred_Q)
return action
# 训练模型,在固定训练次数将参数更新到目标模型
def learn(self, obs, act, reward, next_obs, isOver):
if self.global_step % self.update_target_steps == 0:
# 更新目标模型参数
self.alg.sync_target()
self.global_step += 1
feed = {
'obs': obs.astype('float32'),
'act': act.astype('int32'),
'reward': reward,
'next_obs': next_obs.astype('float32'),
'isOver': isOver,
}
cost = self.fluid_executor.run(self.learn_program, feed=feed, fetch_list=[self.cost])[0]
return cost
model.py
import parl
from parl import layers
class Model(parl.Model):
def __init__(self, act_dim):
super().__init__()
self.fc1 = layers.fc(size=128, act='relu')
self.fc2 = layers.fc(size=128, act='relu')
self.fc3 = layers.fc(size=act_dim, act=None)
def value(self, obs):
h1 = self.fc1(obs)
h2 = self.fc2(h1)
Q = self.fc3(h2)
return Q
replay_memory.py
import random
import collections
import numpy as np
class ReplayMemory(object):
def __init__(self, max_size):
self.buffer = collections.deque(maxlen=max_size)
# 添加数据
def append(self, exp):
self.buffer.append(exp)
# 获取一批数据
def sample(self, batch_size):
mini_batch = random.sample(self.buffer, batch_size)
obs_batch, action_batch, reward_batch, next_obs_batch, isOver_batch = [], [], [], [], []
for experience in mini_batch:
s, a, r, s_p, isOver = experience
obs_batch.append(s)
action_batch.append(a)
reward_batch.append(r)
next_obs_batch.append(s_p)
isOver_batch.append(isOver)
return np.array(obs_batch).astype('float32'), \
np.array(action_batch).astype('float32'), np.array(reward_batch).astype('float32'),\
np.array(next_obs_batch).astype('float32'), np.array(isOver_batch).astype('float32')
# 获取当前数据记录的大小
def __len__(self):
return len(self.buffer)
train.py
import gym
import numpy as np
from parl.utils import logger
import parl
from model import Model
from agent import Agent
from replay_memory import ReplayMemory
LEARN_FREQ = 5 # 更新参数步数
MEMORY_SIZE = 20000 # 内存记忆
MEMORY_WARMUP_SIZE = 200 # 热身大小
BATCH_SIZE = 64 # batch大小
LEARNING_RATE = 0.0005 # 学习率大小
GAMMA = 0.99 # 奖励系数
E_GREED = 0.1 # 探索初始概率
E_GREED_DECREMENT = 1e-6 # 在训练过程中,降低探索的概率
MAX_EPISODE = 10000 # 训练次数
def run_train(agent, env, rpm):
total_reward = 0
obs = env.reset()
step = 0
while True:
step += 1
# 获取随机动作和执行游戏
action = agent.sample(obs)
next_obs, reward, isOver, _ = env.step(action)
# 记录数据
rpm.append((obs, [action], reward, next_obs, isOver))
# train model
if (len(rpm) > MEMORY_WARMUP_SIZE) and (step % LEARN_FREQ == 0):
(batch_obs, batch_action, batch_reward, batch_next_obs, batch_isOver) = rpm.sample(BATCH_SIZE)
agent.learn(batch_obs, batch_action, batch_reward, batch_next_obs, batch_isOver)
total_reward += reward
obs = next_obs
# 结束游戏
if isOver:
break
return total_reward
# 评估模型
def evaluate(agent, env):
eval_reward = []
for i in range(5):
obs = env.reset()
episode_reward = 0
isOver = False
while not isOver:
action = agent.predict(obs)
env.render()
obs, reward, isOver, _ = env.step(action)
episode_reward += reward
eval_reward.append(episode_reward)
return np.mean(eval_reward)
def main():
# 实例化一个游戏环境,参数为游戏名称
env = gym.make('CartPole-v1')
# 图像输入形状和动作维度
action_dim = env.action_space.n
obs_shape = env.observation_space.shape
# 创建存储执行游戏的内存
rpm = ReplayMemory(MEMORY_SIZE)
# 创建模型
model = Model(act_dim=action_dim)
algorithm = parl.algorithms.DQN(model, act_dim=action_dim, gamma=GAMMA, lr=LEARNING_RATE)
agent = Agent(algorithm=algorithm,
obs_dim=obs_shape[0],
act_dim=action_dim,
e_greed=E_GREED,
e_greed_decrement=E_GREED_DECREMENT)
# 预热
print("开始预热...")
while len(rpm) < MEMORY_WARMUP_SIZE:
run_train(agent, env, rpm)
# 开始训练
print("开始正式训练...")
episode = 0
while episode < MAX_EPISODE:
# 训练
for i in range(0, 50):
train_reward = run_train(agent, env, rpm)
episode += 1
logger.info('Episode: {}, Reward: {:.2f}, e_greed: {:.2f}'.format(episode, train_reward, agent.e_greed))
# 评估
eval_reward = evaluate(agent, env)
logger.info('episode:{} test_reward:{}'.format(episode, eval_reward))
if __name__ == '__main__':
main()
0
收藏
请登录后评论
使用的是gym游戏
赞