Implementing DQN with PARL

agent.py

import numpy as np
import paddle.fluid as fluid
import parl
from parl import layers


class Agent(parl.Agent):
    def __init__(self,
                 algorithm,
                 obs_dim,
                 act_dim,
                 e_greed=0.1,
                 e_greed_decrement=0):
        assert isinstance(obs_dim, int)
        assert isinstance(act_dim, int)
        # dimension of the observation vector (a CartPole state, not an image)
        self.obs_dim = obs_dim
        # number of discrete actions
        self.act_dim = act_dim
        super(Agent, self).__init__(algorithm)

        self.global_step = 0
        # sync the target network parameters every 200 calls to learn()
        self.update_target_steps = 200

        # ε-greedy exploration probability and its per-step decrement
        self.e_greed = e_greed
        self.e_greed_decrement = e_greed_decrement

    def build_program(self):
        self.pred_program = fluid.Program()
        self.learn_program = fluid.Program()

        # pred_program: forward pass only, used by predict()
        with fluid.program_guard(self.pred_program):
            obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
            self.value = self.alg.predict(obs)

        # learn_program: builds the DQN loss via the algorithm, used by learn()
        with fluid.program_guard(self.learn_program):
            obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')
            action = layers.data(name='act', shape=[1], dtype='int32')
            reward = layers.data(name='reward', shape=[], dtype='float32')
            next_obs = layers.data(name='next_obs', shape=[self.obs_dim], dtype='float32')
            isOver = layers.data(name='isOver', shape=[], dtype='bool')
            self.cost = self.alg.learn(obs, action, reward, next_obs, isOver)

    # choose an action with ε-greedy exploration
    def sample(self, obs):
        sample = np.random.rand()
        if sample < self.e_greed:
            # explore: pick a uniformly random action
            act = np.random.randint(self.act_dim)
        else:
            # exploit: pick the greedy action from the Q-network
            act = self.predict(obs)
        # decay ε after every sampled step, with a floor of 0.01
        self.e_greed = max(0.01, self.e_greed - self.e_greed_decrement)
        return act

    # greedy action: argmax over the predicted Q-values
    def predict(self, obs):
        obs = np.expand_dims(obs, axis=0)  # add a batch dimension
        pred_Q = self.fluid_executor.run(program=self.pred_program,
                                         feed={'obs': obs.astype('float32')},
                                         fetch_list=[self.value])
        pred_Q = np.squeeze(pred_Q)
        action = np.argmax(pred_Q)
        return action

    # one training step; periodically syncs parameters into the target network
    def learn(self, obs, act, reward, next_obs, isOver):
        if self.global_step % self.update_target_steps == 0:
            # copy the latest parameters into the target network
            self.alg.sync_target()
        self.global_step += 1

        feed = {
            'obs': obs.astype('float32'),
            'act': act.astype('int32'),
            'reward': reward,
            'next_obs': next_obs.astype('float32'),
            'isOver': isOver,
        }
        cost = self.fluid_executor.run(self.learn_program, feed=feed, fetch_list=[self.cost])[0]
        return cost
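
Side note: agent.py only feeds batches into the learn program; the TD target and the loss are built inside parl.algorithms.DQN (instantiated in train.py). As a rough, library-independent sketch of that update rule (function and variable names below are made up for illustration, not PARL's source):

import numpy as np

def dqn_td_target(reward, next_q_target, isOver, gamma=0.99):
    # Bellman target used by DQN: r + gamma * max_a' Q_target(s', a'),
    # with the bootstrap term masked out on terminal transitions.
    best_v = np.max(next_q_target, axis=1)              # max_a' Q_target(s', a')
    return reward + (1.0 - isOver) * gamma * best_v     # shape: (batch,)

# example with a batch of two transitions (made-up numbers)
reward = np.array([1.0, 1.0], dtype='float32')
next_q_target = np.array([[0.2, 0.5], [0.1, 0.3]], dtype='float32')
isOver = np.array([0.0, 1.0], dtype='float32')
print(dqn_td_target(reward, next_q_target, isOver))     # [1.495 1.0]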

 

model.py

import parl
from parl import layers


class Model(parl.Model):
    def __init__(self, act_dim):
        super().__init__()
        # three-layer MLP: obs -> 128 -> 128 -> one Q-value per action
        self.fc1 = layers.fc(size=128, act='relu')
        self.fc2 = layers.fc(size=128, act='relu')
        self.fc3 = layers.fc(size=act_dim, act=None)

    def value(self, obs):
        # forward pass: returns Q(obs, a) for every action a
        h1 = self.fc1(obs)
        h2 = self.fc2(h1)
        Q = self.fc3(h2)
        return Q
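
As a quick standalone check of the network's output shape, a minimal sketch under the old static-graph API, assuming a CartPole-like setup (4-dim observation, 2 actions):

import numpy as np
import paddle.fluid as fluid
from model import Model

model = Model(act_dim=2)                     # CartPole-v1 has 2 actions
obs = fluid.layers.data(name='obs', shape=[4], dtype='float32')
q = model.value(obs)                         # one Q-value per action

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())     # initialize the fc parameters
q_np = exe.run(feed={'obs': np.random.rand(1, 4).astype('float32')},
               fetch_list=[q])[0]
print(q_np.shape)                            # expected: (1, 2)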

 

replay_memory.py

import random
import collections
import numpy as np


class ReplayMemory(object):
    def __init__(self, max_size):
        self.buffer = collections.deque(maxlen=max_size)

    # store one transition (obs, action, reward, next_obs, isOver)
    def append(self, exp):
        self.buffer.append(exp)

    # sample a random mini-batch of transitions
    def sample(self, batch_size):
        mini_batch = random.sample(self.buffer, batch_size)
        obs_batch, action_batch, reward_batch, next_obs_batch, isOver_batch = [], [], [], [], []

        for experience in mini_batch:
            s, a, r, s_p, isOver = experience
            obs_batch.append(s)
            action_batch.append(a)
            reward_batch.append(r)
            next_obs_batch.append(s_p)
            isOver_batch.append(isOver)

        return np.array(obs_batch).astype('float32'), \
            np.array(action_batch).astype('float32'), np.array(reward_batch).astype('float32'),\
            np.array(next_obs_batch).astype('float32'), np.array(isOver_batch).astype('float32')

    # number of transitions currently stored
    def __len__(self):
        return len(self.buffer)
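
A minimal usage sketch of ReplayMemory on its own (the numbers and shapes are illustrative, assuming CartPole-like 4-dim observations):

import numpy as np
from replay_memory import ReplayMemory

rpm = ReplayMemory(max_size=100)
obs = np.random.rand(4)
for _ in range(10):
    rpm.append((obs, [0], 1.0, obs, False))    # (obs, [action], reward, next_obs, isOver)

obs_b, act_b, rew_b, next_obs_b, done_b = rpm.sample(batch_size=5)
print(obs_b.shape, act_b.shape, rew_b.shape)   # (5, 4) (5, 1) (5,)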

 

train.py

import gym
import numpy as np
from parl.utils import logger
import parl
from model import Model
from agent import Agent
from replay_memory import ReplayMemory

LEARN_FREQ = 5  # run a learning step every 5 environment steps
MEMORY_SIZE = 20000  # replay memory capacity
MEMORY_WARMUP_SIZE = 200  # minimum number of stored transitions before learning starts
BATCH_SIZE = 64  # mini-batch size
LEARNING_RATE = 0.0005  # learning rate
GAMMA = 0.99  # reward discount factor
E_GREED = 0.1  # initial exploration probability for ε-greedy
E_GREED_DECREMENT = 1e-6  # per-step decrease of the exploration probability during training
MAX_EPISODE = 10000  # maximum number of training episodes
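
# Illustrative arithmetic: with E_GREED_DECREMENT applied once per sampled step
# and the floor of 0.01 inside Agent.sample, epsilon takes roughly
# (0.1 - 0.01) / 1e-6 = 90,000 environment steps to reach its minimum.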


def run_train(agent, env, rpm):
    total_reward = 0
    obs = env.reset()
    step = 0
    while True:
        step += 1
        # sample an ε-greedy action and step the environment
        action = agent.sample(obs)
        next_obs, reward, isOver, _ = env.step(action)

        # store the transition in the replay memory
        rpm.append((obs, [action], reward, next_obs, isOver))

        # learn every LEARN_FREQ steps once the memory is warmed up
        if (len(rpm) > MEMORY_WARMUP_SIZE) and (step % LEARN_FREQ == 0):
            (batch_obs, batch_action, batch_reward, batch_next_obs, batch_isOver) = rpm.sample(BATCH_SIZE)
            agent.learn(batch_obs, batch_action, batch_reward, batch_next_obs, batch_isOver)

        total_reward += reward
        obs = next_obs

        # episode finished
        if isOver:
            break
    return total_reward


# evaluate the greedy policy over 5 episodes (rendering each step)
def evaluate(agent, env):
    eval_reward = []
    for i in range(5):
        obs = env.reset()
        episode_reward = 0
        isOver = False
        while not isOver:
            action = agent.predict(obs)
            env.render()
            obs, reward, isOver, _ = env.step(action)
            episode_reward += reward
        eval_reward.append(episode_reward)
    return np.mean(eval_reward)


def main():
    # create the gym environment by name
    env = gym.make('CartPole-v1')

    # observation dimension and number of discrete actions
    action_dim = env.action_space.n
    obs_shape = env.observation_space.shape

    # create the replay memory that stores transitions
    rpm = ReplayMemory(MEMORY_SIZE)

    # build the model, the DQN algorithm, and the agent
    model = Model(act_dim=action_dim)
    algorithm = parl.algorithms.DQN(model, act_dim=action_dim, gamma=GAMMA, lr=LEARNING_RATE)
    agent = Agent(algorithm=algorithm,
                  obs_dim=obs_shape[0],
                  act_dim=action_dim,
                  e_greed=E_GREED,
                  e_greed_decrement=E_GREED_DECREMENT)

    # warm up: fill the replay memory before learning starts
    print("Warming up the replay memory...")
    while len(rpm) < MEMORY_WARMUP_SIZE:
        run_train(agent, env, rpm)

    # start training
    print("Starting training...")
    episode = 0
    while episode < MAX_EPISODE:
        # train for 50 episodes
        for i in range(50):
            train_reward = run_train(agent, env, rpm)
            episode += 1
            logger.info('Episode: {}, Reward: {:.2f}, e_greed: {:.2f}'.format(episode, train_reward, agent.e_greed))

        # evaluate
        eval_reward = evaluate(agent, env)
        logger.info('episode:{}    test_reward:{}'.format(episode, eval_reward))


if __name__ == '__main__':
    main()
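
The script above never saves the learned weights. If your PARL version exposes Agent.save / Agent.restore (PARL 1.x does for fluid-based agents), a checkpoint could be written at the end of main(); the path here is only an example:

    # at the end of main(), after the training loop:
    save_path = './dqn_model.ckpt'
    agent.save(save_path)
    # later, to reload for evaluation only:
    # agent.restore(save_path)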

 

All comments (2)
夜雨飘零1 (#2, replied 2020-11):
The environment used is a gym game.

thinc (#3, replied 2020-11):
