首页 > 代码库 > Deep Q-Network 学习笔记(四)—— 改进②:double dqn

Deep Q-Network 学习笔记(四)—— 改进②:double dqn

这篇的原理我还没有完全弄懂……这里只对实现做记录。

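修改的地方只有一处:在上一篇的基础上,调整“记忆回放”函数里计算 target Q 的取值方式。Nature DQN 直接取 target 网络输出的最大值,即 $y = r + \gamma \max_{a} Q_{target}(s', a)$;Double DQN 则先用 eval 网络选出下一状态的最优动作,再用 target 网络给这个动作估值,即 $y = r + \gamma\, Q_{target}\big(s', \arg\max_{a} Q_{eval}(s', a)\big)$,以减轻 Q 值被高估的问题。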

 

    def experience_replay(self):
        """
        Replay a random minibatch of stored transitions and train the eval network.

        Targets follow the Double DQN rule: the eval network picks the greedy
        action for each next state and the target network supplies the value of
        that action::

            target = reward + gamma * Q_target(s', argmax_a Q_eval(s', a))

        Transitions whose reward is <= -1 (a move to a position that cannot be
        entered) are not bootstrapped; their target is the raw reward.

        :return: None
        """
        # Periodically copy the eval network's weights into the target network.
        if self.learn_step_counter % self.network.replace_target_stepper == 0:
            self.network.replace_target_params()

        # Sample at most BATCH transitions, and never more than are actually stored.
        batch = min(self.BATCH, len(self.replay_memory_store))
        if batch == 0:
            return
        minibatch = random.sample(self.replay_memory_store, batch)

        # Each memory is (state, action, reward, next_state, done). The done flag
        # is stored but not used when building targets.
        states, actions, rewards, next_states, _ = zip(*minibatch)
        batch_state = np.vstack(states)
        batch_action = np.vstack(actions)
        batch_next_state = np.vstack(next_states)
        batch_reward = np.asarray(rewards, dtype=np.float64).reshape(-1)

        # Target network values and eval network values for the next states.
        q_next = np.asarray(self.network.get_next_q(batch_next_state))
        q_eval4next = np.asarray(self.network.get_q(batch_next_state))

        # Greedy action per sample according to the eval network.
        max_act4next = np.argmax(q_eval4next, axis=1)

        # Double DQN: for every row i take q_next[i, max_act4next[i]], i.e. the
        # target network's estimate of the action chosen for that same sample.
        selected_q_next = q_next[np.arange(batch), max_act4next]

        q_value = batch_reward + self.gamma * selected_q_next
        q_target = np.where(batch_reward <= -1, batch_reward, q_value)

        self.network.train(batch_state, q_target, batch_action)
        self.learn_step_counter += 1

 

完整代码

神经网络部分:

import numpy as np
import tensorflow as tf


class DeepQNetwork:
    """
    Eval/target network pair for DQN, built with the TensorFlow 1.x graph API.

    Both networks share the same architecture (one ReLU hidden layer followed
    by a linear output layer with one unit per action). Only the eval network
    is trained; the target network is refreshed from it by
    ``replace_target_params``.
    """

    # State input placeholder of the eval network.
    q_eval_input = None

    # Action input placeholder of the eval network (one row per sample).
    q_action_input = None

    # Placeholder for the training targets of the eval network.
    q_eval_target = None

    # Output tensor of the eval network (one value per action).
    q_eval_output = None

    # Index of the highest-valued action in the eval network's output.
    q_predict = None

    # Value the eval network assigns to the action that was actually taken.
    reward_action = None

    # Loss of the eval network.
    loss = None

    # Training op of the eval network.
    train_op = None

    # State input placeholder of the target network.
    q_target_input = None

    # Output tensor of the target network.
    q_target_output = None

    # Number of learning steps between target network refreshes.
    replace_target_stepper = 0

    def __init__(self, input_num, output_num, learning_rate=0.001, replace_target_stepper=300, session=None):
        """
        Build both networks and initialise their variables.

        :param input_num: width of the state input.
        :param output_num: number of actions (width of the output layer).
        :param learning_rate: learning rate of the gradient descent optimizer.
        :param replace_target_stepper: learning steps between target refreshes;
            stored for the caller, this class does not count steps itself.
        :param session: optional existing TensorFlow session. When omitted, a
            new ``tf.InteractiveSession`` is created.
        """
        self.learning_rate = learning_rate
        self.INPUT_NUM = input_num
        self.OUTPUT_NUM = output_num
        self.replace_target_stepper = replace_target_stepper

        self.create()

        if session is None:
            self.session = tf.InteractiveSession()
            self.session.run(tf.global_variables_initializer())
        else:
            # A caller-supplied session used to be dropped, leaving self.session
            # undefined. Keep it, and initialise only this network's variables so
            # nothing else living in the caller's graph is reset.
            self.session = session
            own_variables = (tf.get_collection("eval_net_params")
                             + tf.get_collection("target_net_params"))
            self.session.run(tf.variables_initializer(own_variables))

    def _build_layers(self, net_input, collections, hidden_units, w_init, b_init):
        """Create the l1 (ReLU) and l2 (linear) layers in the current scope and return the output tensor."""
        with tf.variable_scope("l1"):
            w1 = tf.get_variable("w1", [self.INPUT_NUM, hidden_units], initializer=w_init, collections=collections)
            b1 = tf.get_variable("b1", [1, hidden_units], initializer=b_init, collections=collections)
            l1 = tf.nn.relu(tf.matmul(net_input, w1) + b1)

        with tf.variable_scope("l2"):
            w2 = tf.get_variable("w2", [hidden_units, self.OUTPUT_NUM], initializer=w_init, collections=collections)
            b2 = tf.get_variable("b2", [1, self.OUTPUT_NUM], initializer=b_init, collections=collections)
            return tf.matmul(l1, w2) + b2

    def create(self):
        """Build the eval network, its loss and training op, the target network and the parameter copy ops."""
        neuro_layer_1 = 3
        w_init = tf.random_normal_initializer(0, 0.3)
        b_init = tf.constant_initializer(0.1)

        # -------------- eval network: trained on every learning step -------------- #
        self.q_eval_input = tf.placeholder(shape=[None, self.INPUT_NUM], dtype=tf.float32, name="q_eval_input")
        self.q_action_input = tf.placeholder(shape=[None, self.OUTPUT_NUM], dtype=tf.float32)
        self.q_eval_target = tf.placeholder(shape=[None], dtype=tf.float32, name="q_target")

        with tf.variable_scope("eval_net"):
            q_name = ["eval_net_params", tf.GraphKeys.GLOBAL_VARIABLES]
            self.q_eval_output = self._build_layers(self.q_eval_input, q_name, neuro_layer_1, w_init, b_init)
            self.q_predict = tf.argmax(self.q_eval_output, 1)

        with tf.variable_scope("loss"):
            # Multiply by the action input and sum per row to pick out the value
            # of the action that was taken.
            self.reward_action = tf.reduce_sum(tf.multiply(self.q_eval_output, self.q_action_input), axis=1)
            self.loss = tf.reduce_mean(tf.square(self.q_eval_target - self.reward_action))

        with tf.variable_scope("train"):
            self.train_op = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss)

        # -------------- target network: only changed by replace_target_params -------------- #
        self.q_target_input = tf.placeholder(shape=[None, self.INPUT_NUM], dtype=tf.float32, name="q_target_input")

        with tf.variable_scope("target_net"):
            t_name = ["target_net_params", tf.GraphKeys.GLOBAL_VARIABLES]
            self.q_target_output = self._build_layers(self.q_target_input, t_name, neuro_layer_1, w_init, b_init)

        # Build the eval -> target assign ops once, so that calling
        # replace_target_params repeatedly does not keep adding nodes to the graph.
        t_params = tf.get_collection("target_net_params")
        e_params = tf.get_collection("eval_net_params")
        self._replace_target_ops = [tf.assign(t, e) for t, e in zip(t_params, e_params)]

    def replace_target_params(self):
        """
        Copy every eval network parameter into the matching target network parameter.

        :return: None
        """
        self.session.run(self._replace_target_ops)

    def get_q(self, input_data):
        """Return the eval network's output for ``input_data``."""
        return self.session.run(self.q_eval_output, {self.q_eval_input: input_data})

    def get_next_q(self, input_data):
        """Return the target network's output for ``input_data``."""
        return self.session.run(self.q_target_output, {self.q_target_input: input_data})

    def get_predict(self, input_data):
        """Return the largest value in the eval network's output for ``input_data``."""
        return np.max(self.get_q(input_data))

    def get_action(self, input_data):
        """Return the index of the largest value in the eval network's (flattened) output for ``input_data``."""
        return np.argmax(self.get_q(input_data))

    def train(self, input_data, y_, action_input):
        """
        Run one optimisation step on the eval network.

        :param input_data: batch of states fed to the eval network.
        :param y_: training target for each sample.
        :param action_input: action rows fed to ``q_action_input``.
        :return: the loss value computed for this batch.
        """
        _, cost = self.session.run([self.train_op, self.loss],
                                   feed_dict={self.q_eval_input: input_data,
                                              self.q_action_input: action_input,
                                              self.q_eval_target: y_})
        return cost

主逻辑实现:

import random
from collections import deque

import numpy as np

from q_network import DeepQNetwork


class Agent:
    """
    Double DQN agent for a six-room path-finding task.

    States and actions are both room indices: taking action ``a`` moves the
    agent to room ``a`` and an episode ends when action 5 is taken.
    """

    # Reward matrix indexed as r[state][action].
    # NOTE(review): row 4 holds a 1 where the other passable moves use 0 --
    # possibly a typo in the original data; left unchanged, please confirm.
    r = np.array([[-1, -1, -1, -1, 0, -1],
                  [-1, -1, -1, 0, -1, 100.0],
                  [-1, -1, -1, 0, -1, -1],
                  [-1, 0, 0, -1, 0, -1],
                  [0, -1, -1, 1, -1, 100],
                  [-1, 0, -1, -1, 0, 100],
                  ])

    # The neural network pair (created in __init__).
    network = None

    def __init__(self):
        # Number of environment steps executed so far.
        self.step_index = 0

        # Number of states.
        self.STATE_NUM = 6

        # Number of actions.
        self.ACTION_NUM = 6

        # Maximum number of stored memories.
        self.memory_size = 5000

        # Total number of memories saved so far (keeps growing past memory_size).
        self.memory_counter = 0

        # Replay memory: transitions the agent has experienced.
        self.replay_memory_store = deque()

        # Number of steps to observe before training starts.
        self.OBSERVE = 5000

        # Number of learning steps performed.
        self.learn_step_counter = 0

        # Minibatch size used for training.
        self.BATCH = 20

        # Discount factor gamma.
        self.gamma = 0.9

        # -------------------- exploration policy -------------------- #
        # Lower bound of epsilon; decay stops once epsilon is no longer above it.
        self.FINAL_EPSILON = 0.0001

        # Initial value of epsilon; epsilon decreases gradually from here.
        self.INITIAL_EPSILON = 0.1

        # Total number of steps over which epsilon decays.
        self.EXPLORE = 3000000.

        # Current epsilon (set to INITIAL_EPSILON when training starts).
        self.epsilon = 0
        # -------------------- exploration policy -------------------- #

        # Build the neural networks.
        self.network = DeepQNetwork(input_num=self.STATE_NUM,
                                    output_num=self.ACTION_NUM,
                                    learning_rate=0.001,
                                    replace_target_stepper=300,
                                    session=None)

        # State matrix (6 x 6): each row is the one-hot encoding of one state.
        self.state_list = np.identity(self.STATE_NUM)

        # Action matrix: each row is the one-hot encoding of one action.
        self.action_list = np.identity(self.ACTION_NUM)

    def select_action(self, current_state_index):
        """
        Choose an action with an epsilon-greedy policy.

        :param current_state_index: index of the current state.
        :return: index of the chosen action.
        """
        # One-hot row for the current state, kept 2-D for the network.
        current_state = self.state_list[current_state_index:current_state_index + 1]

        # Greedy action according to the eval network.
        current_action_index = self.network.get_action(current_state)

        # With probability epsilon, replace it with a uniformly random action.
        if np.random.uniform() < self.epsilon:
            current_action_index = np.random.randint(0, self.ACTION_NUM)

        # Once training has started, decay epsilon until it reaches FINAL_EPSILON.
        if self.step_index > self.OBSERVE and self.epsilon > self.FINAL_EPSILON:
            self.epsilon -= (self.INITIAL_EPSILON - self.FINAL_EPSILON) / self.EXPLORE

        return current_action_index

    def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done):
        """
        Store one transition in the replay memory.

        :param current_state_index: index of the current state.
        :param current_action_index: index of the action taken.
        :param current_reward: reward received.
        :param next_state_index: index of the next state.
        :param done: whether the episode ended.
        :return: None
        """
        current_state = self.state_list[current_state_index:current_state_index + 1]
        current_action = self.action_list[current_action_index:current_action_index + 1]
        next_state = self.state_list[next_state_index:next_state_index + 1]

        # Remember (state, action, reward, next state, done).
        self.replay_memory_store.append((
            current_state,
            current_action,
            current_reward,
            next_state,
            done))

        # Drop the oldest memory once the capacity is exceeded.
        if len(self.replay_memory_store) > self.memory_size:
            self.replay_memory_store.popleft()

        self.memory_counter += 1

    def run_game(self, state_index, action_index):
        """
        Execute an action in the environment.

        :param state_index: current state.
        :param action_index: action to execute.
        :return: tuple ``(next_state, reward, done)``.
        """
        reward = self.r[state_index][action_index]

        # The chosen action is the room the agent moves to.
        next_state = action_index

        done = action_index == 5

        return next_state, reward, done

    def experience_replay(self):
        """
        Replay a random minibatch of stored transitions and train the eval network.

        Targets follow the Double DQN rule: the eval network picks the greedy
        action for each next state and the target network supplies the value of
        that action::

            target = reward + gamma * Q_target(s', argmax_a Q_eval(s', a))

        Transitions whose reward is <= -1 (a move to a position that cannot be
        entered) are not bootstrapped; their target is the raw reward.

        :return: None
        """
        # Periodically copy the eval network's weights into the target network.
        if self.learn_step_counter % self.network.replace_target_stepper == 0:
            self.network.replace_target_params()

        # Sample at most BATCH transitions, and never more than are actually stored.
        batch = min(self.BATCH, len(self.replay_memory_store))
        if batch == 0:
            return
        minibatch = random.sample(self.replay_memory_store, batch)

        # Each memory is (state, action, reward, next_state, done). The done flag
        # is stored but not used when building targets.
        states, actions, rewards, next_states, _ = zip(*minibatch)
        batch_state = np.vstack(states)
        batch_action = np.vstack(actions)
        batch_next_state = np.vstack(next_states)
        batch_reward = np.asarray(rewards, dtype=np.float64).reshape(-1)

        # Target network values and eval network values for the next states.
        q_next = np.asarray(self.network.get_next_q(batch_next_state))
        q_eval4next = np.asarray(self.network.get_q(batch_next_state))

        # Greedy action per sample according to the eval network.
        max_act4next = np.argmax(q_eval4next, axis=1)

        # Double DQN: for every row i take q_next[i, max_act4next[i]], i.e. the
        # target network's estimate of the action chosen for that same sample.
        selected_q_next = q_next[np.arange(batch), max_act4next]

        q_value = batch_reward + self.gamma * selected_q_next
        q_target = np.where(batch_reward <= -1, batch_reward, q_value)

        self.network.train(batch_state, q_target, batch_action)
        self.learn_step_counter += 1

    def train(self):
        """
        Run the training loop.

        :return: None
        """
        # Random start state in 0..4 (randint excludes the upper bound).
        current_state = np.random.randint(0, self.ACTION_NUM - 1)
        self.epsilon = self.INITIAL_EPSILON

        while True:
            # Choose an action.
            action = self.select_action(current_state)

            # Execute it: next state, reward, and whether the episode ended.
            next_state, reward, done = self.run_game(current_state, action)

            # Store the transition.
            self.save_store(current_state, action, reward, next_state, done)

            # Observe for a while to accumulate memories before training.
            if self.step_index > self.OBSERVE:
                self.experience_replay()

            if self.step_index - self.OBSERVE > 15000:
                break

            if done:
                current_state = np.random.randint(0, self.ACTION_NUM - 1)
            else:
                current_state = next_state

            self.step_index += 1

    def pay(self):
        """
        Train the agent, then print the greedy path from each start room to room 5.

        :return: None
        """
        self.train()

        # Show the R matrix.
        print(self.r)

        for index in range(5):

            start_room = index

            print("#############################", "Agent 在", start_room, "开始行动", "#############################")

            current_state = start_room

            step = 0

            target_state = 5

            # NOTE(review): this loop has no step limit; if the learned greedy
            # policy never reaches room 5 it will not terminate.
            while current_state != target_state:
                next_state = self.network.get_action(self.state_list[current_state:current_state + 1])

                print("Agent 由", current_state, "号房间移动到了", next_state, "号房间")

                current_state = next_state

                step += 1

            print("Agent 在", start_room, "号房间开始移动了", step, "步到达了目标房间 5")

            print("#############################", "Agent 在", 5, "结束行动", "#############################")


if __name__ == "__main__":
    agent = Agent()
    agent.pay()

 

Deep Q-Network 学习笔记(四)—— 改进②:double dqn