强化学习整理

原创
03/14 06:27
阅读数 1.3K

价值函数估计(Value Function Approximation)

状态和行动

我们用Q(S,A)来表示value action function,在有限个状态(state)和行动(action)的情况下,我们其实等价于在不断维护一个Q-table,不断更新,直至其收敛。

  \(a_1\) \(a_2\) ... \(a_m\)
\(s_1\) \(Q(s_1,a_1)\) \(Q(s_1,a_2)\) ... \(Q(s_1,a_m)\)
\(s_2\) \(Q(s_2,a_1)\) \(Q(s_2,a_2)\) ... \(Q(s_2,a_m)\)
\(s_3\) \(Q(s_3,a_1)\) \(Q(s_3,a_2)\) ... \(Q(s_3,a_m)\)
... ... ... ... ...
\(s_n\) \(Q(s_n,a_1)\) \(Q(s_n,a_2)\) ... \(Q(s_n,a_m)\)

举例来说

这里白色的圈表示状态,黑色的圈表示行动,那么我们的初始状态\(s_1\)为"考试前",这个时候有两个行动可供选择"玩游戏"\(a_1\)和"复习"\(a_2\),如果我们选择\(a_1\),则状态就可以更新为"愉快"\(s_2=Q(s_1,a_1)\);如果选择\(a_2\),状态就会变更为"悲伤"\(s_3=Q(s_1,a_2)\)。然后的动作是"考试"\(a_3\),由\(s_2\)的途径,则变为"挂科"\(s_4=Q(s_2,a_3)\),由\(s_3\)的途径,则变为"及格"\(s_5=Q(s_3,a_3)\)

但是这个路径并不是绝对的

这里每一个状态都会有一个价值,由游戏得到的愉快,这里的价值为10,由复习到悲伤,价值为-20,由考试到挂科,价值为-100,由考试到及格,价值为200。所以在第一张图中的第一条链路,最终价值为-90,第二条链路的最终价值为180。那么这里\(Q(s_1,a_1)=10\)\(Q(s_2,a_3)=-100\)\(Q(s_1,a_2)=-20\)\(Q(s_3,a_3)=200\)

状态和价值公式

\(Q_π(s,a)=R_s^a+γ\sum_{s'}P_{ss'}^av_π(s')\)

这里\(Q_π(s,a)\)表示价值v,\(R_s^a\)表示奖励,γ为折扣率,\(P_{ss'}^a\)表示状态转移概率,\(v_π(s')\)表示后续价值Q。

举例来说,\(Q_π(s,a)\)中的s为你是一名高中生,a为考上大学,\(R_s^a\)为即时奖励,你爸给了你1000块,s'为后续的状态,比如读研、工作、出国;\(P_{ss'}^a\)为你从大学生到后续状态(读研、工作、出国)的可能的概率,\(v_π(s')\)就是你变更为后续状态的价值。γ≤1,表示之后的状态对当前的状态的影响会逐步衰减。这其实就是一个马尔科夫过程,有关马尔科夫链的内容可以参考概率论整理(三) 中的马尔科夫过程

Q-learning算法

边探索边学习,经过一些算法,已经有了一个Q表,该表是通过不断的探索环境得来的,它比较准,但是又没有那么精确。

上式的random action是选动作的过程。它会有一定的概率ε进行探索,不取下一步动作的最大价值,不探索的时候即利用(概率1-ε),取下一步动作的最大价值。

RL算法的最终目标是学习每种状态下最优的动作,而在训练过程中,收敛(到最优策略\(π^*\))前的当前策略π并非最优,所以它提供的动作并非最优,为了找到动作空间里潜在的最优动作,算法必须尝试当前策略π认为的非最优动作,因此RL算法中的策略需要有随机探索(Exploration)的能力。

学习策略

\(Q(S,A)\ <-\ Q(S,A)+α[R+γmax_aQ(S',a)-Q(S,A)]\)

上式中Q(S,A)表示当前状态为S,做了一个动作A所得到的价值,R是动作A本身的奖励。做完动作A后,状态更新为S',a是一个动作集合,它包括了很多的动作(\(a_1,a_2,...,a_n\)),\(max_aQ(S',a)\)表示状态S'做这所有动作的价值最大值,γ依然为折扣率,差值\(R+γmax_aQ(S',a)-Q(S,A)\)是对Q(S,A)的修正,如果该差值为正,表示该路径比较好,可以增大Q(S,A);如果该差值为负,表示该路径不是很好,会减少Q(S,A)。α为衰减因子。

Q-learning是一种非常乐观的估计算法

示例场景

初始状态:s1(成绩优秀,有创业想法但缺资金)
可选动作:a1(考研)或 a2(创业)

​步骤1:选择动作(探索与利用)​

  • ​ε-greedy策略:假设以90%概率选择当前最优动作(利用),10%随机尝试(探索)。
    • 若初始Q值均为0,可能随机选择创业(探索)。

​步骤2:执行动作并观察结果

  • 选择 a2(创业),进入状态 s4(创业初期,面临市场风险)。
  • 获得即时奖励 R=50(初期亏损)。

​步骤3:更新Q值

  • 假设学习率 α=0.5,折扣因子 γ=0.8
  • 新状态 s4 的最大Q值:假设 maxaQ(s4,a)=60(例如继续坚持创业可能未来收益高)。
  • ​计算更新:Q(s1,a2)=0+0.5[50+0.8×600]=0.5×(50+48)=1
  • 结果:Q(s1,a2)=1,说明在 s1 下创业的预期收益下降。

​步骤4:后续决策调整

  • 下一次处于 s1 时,由于 Q(s1,a2)=1,而 Q(s1,a1)=0,大概率选择考研(利用)。

​3. 长期策略形成

通过多次尝试,Q值逐步收敛到稳定策略:

状态(s) 最优动作(a) 最终Q值 解释
s1 考研 (a1) +70 短期风险低,长期收益稳定
s2 创业 (a2) +90 家庭支持降低风险,成功概率高
s3 深耕科研 +60 学术领域积累带来持续收益
s4 坚持创业 +80 初期投入后,市场逐渐打开

Q-learning棋盘格找出口代码样例 

import matplotlib.pyplot as plt

import numpy as np
import random
import copy


class GridEnvironment:
    def __init__(self):
        # 初始化10x10的棋盘
        self.grid_size = 10
        self.grid = np.zeros((self.grid_size, self.grid_size))

        # 初始位置
        self.start_position = (0, 0)
        self.current_position = self.start_position

        # 随机生成障碍物和出口位置
        self.generate_random_positions()

        # Action space: Up(0), Right(1), Down(2), Left(3)
        self.actions = [(-1, 0), (0, 1), (1, 0), (0, -1)]
        self.action_names = ["Up", "Right", "Down", "Left"]

        # 修改奖励设置,增强正向激励
        self.obstacle_reward = -100
        self.exit_reward = 200  # 增加出口奖励
        self.step_reward = -0.1  # 进一步减小每步惩罚

    def generate_random_positions(self):
        # 所有可能的位置(除了起始位置)
        all_positions = [(i, j) for i in range(self.grid_size) for j in range(self.grid_size) if
                         (i, j) != self.start_position]

        # 筛选出离起点有一定距离的位置(使用曼哈顿距离)
        min_distance = 5  # 设置最小距离阈值
        distant_positions = [pos for pos in all_positions
                             if self.calculate_distance(self.start_position, pos) >= min_distance]

        # 如果没有足够的远距离位置,则使用所有可用位置
        if len(distant_positions) < 2:
            distant_positions = all_positions

        # 随机选择2个位置作为障碍物
        self.obstacles = random.sample(distant_positions, 2)

        # 从剩余位置中选择一个作为出口,优先选择远离起点的位置
        remaining_positions = [pos for pos in all_positions if pos not in self.obstacles]
        # 按照与起点的距离对剩余位置进行排序(从远到近)
        remaining_positions.sort(key=lambda pos: self.calculate_distance(self.start_position, pos), reverse=True)
        # 从前半部分选择出口位置(确保出口也相对远离起点)
        exit_candidates = remaining_positions[:len(remaining_positions) // 2]
        self.exit = random.choice(exit_candidates)

        # 更新网格
        self.grid = np.zeros((self.grid_size, self.grid_size))
        for obs in self.obstacles:
            self.grid[obs] = -1
        self.grid[self.exit] = 1

    def reset(self):
        # 重置智能体位置到起点
        self.current_position = self.start_position

        # 训练时保持环境一致,不要每次都重新生成随机位置
        # 只在初始化时生成一次随机位置
        # self.generate_random_positions()  # 注释掉这一行

        return self.current_position

    def calculate_distance(self, pos1, pos2):
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])  # 曼哈顿距离

    def step(self, action):
        # 保存移动前的位置和到出口的距离
        prev_position = self.current_position
        prev_distance = self.calculate_distance(prev_position, self.exit)

        # 执行原有的移动逻辑
        row, col = self.current_position
        d_row, d_col = self.actions[action]

        # 计算新位置
        new_row = row + d_row
        new_col = col + d_col

        # 检查是否超出边界
        if new_row < 0 or new_row >= self.grid_size or new_col < 0 or new_col >= self.grid_size:
            # 如果超出边界,保持原位置不变,给予惩罚,并结束游戏
            return self.current_position, self.obstacle_reward, True  # 撞到围栏也算失败

        # 更新位置
        self.current_position = (new_row, new_col)

        # 检查是否撞到障碍物
        if self.current_position in self.obstacles:
            return self.current_position, self.obstacle_reward, True

        # 检查是否到达出口
        if self.current_position == self.exit:
            return self.current_position, self.exit_reward, True

        # 计算新的距离
        new_distance = self.calculate_distance(self.current_position, self.exit)

        # 基于距离变化的奖励,增强正向激励
        distance_reward = 0
        if new_distance < prev_distance:
            distance_reward = 2  # 增加朝出口方向移动的奖励
        elif new_distance > prev_distance:
            distance_reward = -1  # 保持远离出口方向的惩罚

        # 返回新状态、奖励和是否结束
        return self.current_position, self.step_reward + distance_reward, False

    def render(self, q_table=None, path=None):
        # 可视化棋盘
        fig, ax = plt.subplots(figsize=(8, 8))

        # 创建颜色映射
        cmap = plt.cm.colors.ListedColormap(['white', 'yellow', 'black', 'red'])
        bounds = [-0.5, 0.5, 1.5, 2.5, 3.5]
        norm = plt.cm.colors.BoundaryNorm(bounds, cmap.N)

        # 复制棋盘用于显示
        display_grid = self.grid.copy()

        # 标记当前位置为红色
        if path:
            for pos in path:
                if pos != self.exit:  # 不覆盖出口
                    display_grid[pos] = 2
        else:
            display_grid[self.current_position] = 2

        # 显示棋盘
        ax.imshow(display_grid, cmap=cmap, norm=norm)

        # 添加网格线
        ax.grid(which='major', axis='both', linestyle='-', color='k', linewidth=2)
        ax.set_xticks(np.arange(-0.5, self.grid_size, 1))
        ax.set_yticks(np.arange(-0.5, self.grid_size, 1))

        # 隐藏刻度标签
        ax.set_xticklabels([])
        ax.set_yticklabels([])

        # 如果提供了Q表,显示最佳动作
        if q_table is not None:
            for i in range(self.grid_size):
                for j in range(self.grid_size):
                    if (i, j) in self.obstacles or (i, j) == self.exit:
                        continue

                    # 获取该状态的最佳动作
                    state = (i, j)
                    if state in q_table:
                        best_action = np.argmax(q_table[state])

                        # 在格子中心绘制箭头
                        dx, dy = self.actions[best_action]
                        ax.arrow(j, i, dy * 0.3, dx * 0.3, head_width=0.2, head_length=0.2, fc='blue', ec='blue')

        plt.title("红色:当前位置,黑色:障碍物,黄色:出口")
        plt.tight_layout()
        plt.show()


class QLearningAgent:
    def __init__(self, env, learning_rate=0.1, discount_factor=0.95, exploration_rate=1.0, exploration_decay=0.997):
        # 增加折扣因子,减缓探索率衰减
        self.env = env
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor  # 增加折扣因子
        self.exploration_rate = exploration_rate
        self.exploration_decay = exploration_decay  # 减缓探索率衰减

        # 初始化Q表为空字典
        self.q_table = {}

    def get_q_value(self, state, action):
        # 如果状态-动作对不在Q表中,初始化为0
        if state not in self.q_table:
            self.q_table[state] = np.zeros(len(self.env.actions))
        return self.q_table[state][action]

    def choose_action(self, state):
        # 使用ε-greedy策略选择动作
        if random.uniform(0, 1) < self.exploration_rate:
            # 探索:随机选择动作
            return random.randint(0, len(self.env.actions) - 1)
        else:
            # 利用:选择Q值最大的动作
            if state not in self.q_table:
                self.q_table[state] = np.zeros(len(self.env.actions))
            return np.argmax(self.q_table[state])

    def update_q_value(self, state, action, reward, next_state):
        # 更新Q        if next_state not in self.q_table:
            self.q_table[next_state] = np.zeros(len(self.env.actions))

        # Q-learning更新公式
        current_q = self.get_q_value(state, action)
        max_next_q = np.max(self.q_table[next_state])
        new_q = current_q + self.learning_rate * (reward + self.discount_factor * max_next_q - current_q)

        # 更新Q        if state not in self.q_table:
            self.q_table[state] = np.zeros(len(self.env.actions))
        self.q_table[state][action] = new_q

    def decay_exploration(self):
        # 衰减探索率
        self.exploration_rate *= self.exploration_decay


def print_q_table(q_table, env):
    """打印Q表中的所有值"""
    print("\n===== Q表的所有值 =====")
    print(f"状态数量: {len(q_table)}")
    print("格式: 状态 -> [, , , ] (最佳动作标记为 *)")
    print("-" * 50)

    # 按状态坐标排序
    sorted_states = sorted(q_table.keys())

    for state in sorted_states:
        q_values = q_table[state]
        best_action = np.argmax(q_values)

        # 创建带有最佳动作标记的Q值字符串
        q_values_str = "["
        for i, q_val in enumerate(q_values):
            if i == best_action:
                q_values_str += f"{q_val:.2f}* "
            else:
                q_values_str += f"{q_val:.2f} "
        q_values_str = q_values_str.strip() + "]"

        # 打印状态和对应的Q        print(f"状态 {state} -> {q_values_str} (最佳动作: {env.action_names[best_action]})")

    print("-" * 50)


def train(agent, episodes=5000):
    # 训练智能体,默认训练5000个回合
    rewards_per_episode = []  # 创建列表存储每个回合的累计奖励
    success_count = 0  # 记录成功到达出口的次数
    best_q_table = None  # 保存训练过程中表现最佳的Q    best_success_rate = 0  # 记录最佳成功率
    best_env_config = None  # 保存产生最佳结果的环境配置
    best_episode = 0  # 记录产生最佳策略的训练回合数

    # 为训练生成固定的环境布局
    agent.env.generate_random_positions()  # 随机生成障碍物和出口位置
    print(f"Training Environment - Obstacles: {agent.env.obstacles}, Exit: {agent.env.exit}")  # 打印环境配置信息

    # 保存初始环境配置,用于后续恢复
    initial_env_config = {
        'obstacles': agent.env.obstacles.copy(),
        'exit': agent.env.exit
    }

    for episode in range(episodes):  # 开始训练循环
        state = agent.env.reset()  # 重置环境并获取初始状态
        total_reward = 0  # 初始化当前回合的累计奖励
        done = False  # 初始化回合结束标志
        steps = 0  # 初始化步数计数器

        while not done and steps < 300:  # 单回合循环,直到结束或达到最大步数
            action = agent.choose_action(state)  # 智能体根据当前状态选择动作
            next_state, reward, done = agent.env.step(action)  # 执行动作并获取结果

            agent.update_q_value(state, action, reward, next_state)  # 更新Q值表

            state = next_state  # 更新当前状态
            total_reward += reward  # 累加奖励
            steps += 1  # 增加步数计数

        if done and state == agent.env.exit:  # 检查是否成功到达出口
            success_count += 1  # 增加成功计数

        agent.decay_exploration()  # 衰减探索率,减少随机探索的概率
        rewards_per_episode.append(total_reward)  # 记录当前回合的累计奖励

        if (episode + 1) % 100 == 0:  # 100个回合评估一次性能
            current_success_rate = success_count / 100 if episode >= 99 else success_count / (episode + 1)  # 计算当前成功率
            print(
                f"Episode {episode + 1}/{episodes}, Average Reward: {np.mean(rewards_per_episode[-100:]):.2f}, Success Rate: {current_success_rate:.2f}, Exploration Rate: {agent.exploration_rate:.4f}")  # 打印训练进度

            if current_success_rate > best_success_rate:  # 检查是否找到更好的策略
                best_success_rate = current_success_rate  # 更新最佳成功率
                best_q_table = copy.deepcopy(agent.q_table)  # 保存当前Q表为最佳Q                best_env_config = initial_env_config  # 保存环境配置
                best_episode = episode + 1  # 记录最佳回合数
                print(
                    f"Found Better Strategy! Success Rate: {best_success_rate:.2f}, Training Episode: {best_episode}")  # 打印发现更好策略的信息

            success_count = 0  # 重置成功计数,准备下一个100回合的评估

    print(
        f"Training Complete! Best Success Rate: {best_success_rate:.2f}, Achieved at Episode {best_episode}")  # 训练完成后打印最佳结果

    # 恢复最佳环境配置
    agent.env.obstacles = best_env_config['obstacles']  # 恢复障碍物配置
    agent.env.exit = best_env_config['exit']  # 恢复出口位置
    agent.env.grid = np.zeros((agent.env.grid_size, agent.env.grid_size))  # 重新初始化环境网格
    for obs in agent.env.obstacles:  # 遍历所有障碍物
        agent.env.grid[obs] = -1  # 在网格中标记障碍物位置
    agent.env.grid[agent.env.exit] = 1  # 在网格中标记出口位置

    # 打印最佳Q表的所有值
    print_q_table(best_q_table, agent.env)

    return best_q_table, rewards_per_episode, best_env_config, best_episode  # 返回训练结果


def test(env, q_table, env_config=None, best_episode=None):
    if env_config:
        env.obstacles = env_config['obstacles']
        env.exit = env_config['exit']
        env.grid = np.zeros((env.grid_size, env.grid_size))
        for obs in env.obstacles:
            env.grid[obs] = -1
        env.grid[env.exit] = 1
        print(f"Using Best Strategy Environment - Obstacles: {env.obstacles}, Exit: {env.exit}")
    else:
        env.generate_random_positions()
        print(f"Test Environment - Obstacles: {env.obstacles}, Exit: {env.exit}")

    # 使用训练好的Q表进行测试
    state = env.reset()
    done = False
    steps = 0
    total_reward = 0
    path = [state]
    game_result = "进行中"  # 用于跟踪游戏结果

    # 创建动画效果所需的图形对象
    fig, ax = plt.subplots(figsize=(8, 8))
    plt.ion()  # 打开交互模式

    # 创建颜色映射 - 修改颜色以更好地区分不同元素
    cmap = plt.cm.colors.ListedColormap(['white', 'yellow', 'gray', 'red', 'lightblue'])
    bounds = [-0.5, 0.5, 1.5, 2.5, 3.5, 4.5]
    norm = plt.cm.colors.BoundaryNorm(bounds, cmap.N)

    # 初始化图像
    display_grid = env.grid.copy()
    display_grid[state] = 2  # 标记当前位置为红色

    # 确保障碍物显示为灰色
    for obs in env.obstacles:
        display_grid[obs] = 2  # 使用灰色显示障碍物

    img = ax.imshow(display_grid, cmap=cmap, norm=norm)

    # 添加网格线
    ax.grid(which='major', axis='both', linestyle='-', color='k', linewidth=2)
    ax.set_xticks(np.arange(-0.5, env.grid_size, 1))
    ax.set_yticks(np.arange(-0.5, env.grid_size, 1))

    # 隐藏刻度标签
    ax.set_xticklabels([])
    ax.set_yticklabels([])

    # Modify legend colors and labels
    legend_elements = [
        plt.Rectangle((0, 0), 1, 1, color='white', label='Empty'),
        plt.Rectangle((0, 0), 1, 1, color='yellow', label='Exit'),
        plt.Rectangle((0, 0), 1, 1, color='gray', label='Obstacle'),
        plt.Rectangle((0, 0), 1, 1, color='red', label='Current'),
        plt.Rectangle((0, 0), 1, 1, color='lightblue', label='Path')
    ]
    ax.legend(handles=legend_elements, loc='upper center', bbox_to_anchor=(0.5, -0.05), ncol=5)

    plt.title(f"Q-learning Agent (Best Strategy from Episode {best_episode})")
    plt.tight_layout()
    plt.draw()
    plt.pause(0.5)  # 初始暂停,让用户看清起始状态

    while not done and steps < 100:
        if state not in q_table:
            action = random.randint(0, len(env.actions) - 1)
        else:
            action = np.argmax(q_table[state])

        print(f"Position: {state}, Action: {env.action_names[action]}")
        next_state, reward, done = env.step(action)

        state = next_state
        path.append(state)
        total_reward += reward
        steps += 1

        # 更新动画 - 修改显示逻辑
        display_grid = env.grid.copy()

        # 首先确保障碍物显示
        for obs in env.obstacles:
            display_grid[obs] = 2  # 使用灰色显示障碍物

        # 然后显示路径
        for pos in path[:-1]:
            if pos != env.exit and pos not in env.obstacles:  # 不覆盖出口和障碍物
                display_grid[pos] = 3  # 使用浅蓝色标记路径

        # 最后显示当前位置
        display_grid[state] = 2  # 标记当前位置为红色

        img.set_data(display_grid)
        plt.draw()
        plt.pause(0.3)  # 控制动画速度

        # 判断游戏结果
        if done:
            if state == env.exit:
                game_result = "Success"
            else:
                game_result = "Failure"

    # 显示最终结果
    if game_result == "Success":
        plt.title(
            f"Successfully Reached Exit! Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")
    elif game_result == "Failure":
        if state in env.obstacles:
            plt.title(
                f"Hit Obstacle, Game Over. Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")
        else:
            plt.title(
                f"Hit Wall, Game Over. Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")
    else:
        plt.title(
            f"Failed to Reach Exit Within Step Limit. Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")

    plt.ioff()  # 关闭交互模式
    plt.show()  # 显示最终结果

    return path, game_result


if __name__ == "__main__":
    # Create environment and agent
    env = GridEnvironment()
    agent = QLearningAgent(env)

    # Train agent (no display)
    print("Starting Training...")
    q_table, rewards, best_env_config, best_episode = train(agent, episodes=5000)
    print("Training Complete!")

    # Test agent (using best strategy configuration)
    print("Starting Test...")
    path, result = test(env, q_table, best_env_config, best_episode)

运行结果

Starting Training...
Training Environment - Obstacles: [(9, 1), (9, 6)], Exit: (5, 6)
Episode 100/5000, Average Reward: -53.59, Success Rate: 0.13, Exploration Rate: 0.7405
Found Better Strategy! Success Rate: 0.13, Training Episode: 100
Episode 200/5000, Average Reward: 18.45, Success Rate: 0.36, Exploration Rate: 0.5483
Found Better Strategy! Success Rate: 0.36, Training Episode: 200
Episode 300/5000, Average Reward: 62.07, Success Rate: 0.50, Exploration Rate: 0.4060
Found Better Strategy! Success Rate: 0.50, Training Episode: 300
Episode 400/5000, Average Reward: 131.78, Success Rate: 0.72, Exploration Rate: 0.3007
Found Better Strategy! Success Rate: 0.72, Training Episode: 400
Episode 500/5000, Average Reward: 144.08, Success Rate: 0.76, Exploration Rate: 0.2226
Found Better Strategy! Success Rate: 0.76, Training Episode: 500
Episode 600/5000, Average Reward: 197.80, Success Rate: 0.93, Exploration Rate: 0.1649
Found Better Strategy! Success Rate: 0.93, Training Episode: 600
Episode 700/5000, Average Reward: 184.52, Success Rate: 0.89, Exploration Rate: 0.1221
Episode 800/5000, Average Reward: 206.69, Success Rate: 0.96, Exploration Rate: 0.0904
Found Better Strategy! Success Rate: 0.96, Training Episode: 800
Episode 900/5000, Average Reward: 206.64, Success Rate: 0.96, Exploration Rate: 0.0669
Episode 1000/5000, Average Reward: 203.39, Success Rate: 0.95, Exploration Rate: 0.0496
Episode 1100/5000, Average Reward: 215.98, Success Rate: 0.99, Exploration Rate: 0.0367
Found Better Strategy! Success Rate: 0.99, Training Episode: 1100
Episode 1200/5000, Average Reward: 212.69, Success Rate: 0.98, Exploration Rate: 0.0272
Episode 1300/5000, Average Reward: 212.77, Success Rate: 0.98, Exploration Rate: 0.0201
Episode 1400/5000, Average Reward: 219.08, Success Rate: 1.00, Exploration Rate: 0.0149
Found Better Strategy! Success Rate: 1.00, Training Episode: 1400
Episode 1500/5000, Average Reward: 215.88, Success Rate: 0.99, Exploration Rate: 0.0110
Episode 1600/5000, Average Reward: 215.87, Success Rate: 0.99, Exploration Rate: 0.0082
Episode 1700/5000, Average Reward: 219.03, Success Rate: 1.00, Exploration Rate: 0.0061
Episode 1800/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0045
Episode 1900/5000, Average Reward: 219.02, Success Rate: 1.00, Exploration Rate: 0.0033
Episode 2000/5000, Average Reward: 215.84, Success Rate: 0.99, Exploration Rate: 0.0025
Episode 2100/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0018
Episode 2200/5000, Average Reward: 219.02, Success Rate: 1.00, Exploration Rate: 0.0013
Episode 2300/5000, Average Reward: 219.01, Success Rate: 1.00, Exploration Rate: 0.0010
Episode 2400/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0007
Episode 2500/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0005
Episode 2600/5000, Average Reward: 219.01, Success Rate: 1.00, Exploration Rate: 0.0004
Episode 2700/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0003
Episode 2800/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0002
Episode 2900/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0002
Episode 3000/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0001
Episode 3100/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0001
Episode 3200/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0001
Episode 3300/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3400/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3500/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3600/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3700/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3800/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3900/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4000/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4100/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4200/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4300/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4400/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4500/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4600/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4700/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4800/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4900/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 5000/5000, Average Reward: 219.00, Success Rate: 1.00, Exploration Rate: 0.0000
Training Complete! Best Success Rate: 1.00, Achieved at Episode 1400

===== Q表的所有值 =====
状态数量: 81
格式: 状态 -> [上, 右, 下, 左] (最佳动作标记为 *)
--------------------------------------------------
状态 (0, 0) -> [19.04 135.00* 118.49 9.83] (最佳动作: Right)
状态 (0, 1) -> [25.52 131.85 140.10* 122.45] (最佳动作: Down)
状态 (0, 2) -> [-38.65 6.17 145.32* 41.33] (最佳动作: Down)
状态 (0, 3) -> [-34.16 0.82 6.31 43.20*] (最佳动作: Left)
状态 (0, 4) -> [-18.90 1.04* 0.81 -0.12] (最佳动作: Right)
状态 (0, 5) -> [-9.98 0.19 0.91* -0.21] (最佳动作: Down)
状态 (0, 6) -> [-10.00 0.00* 0.00 -0.17] (最佳动作: Right)
状态 (0, 7) -> [0.00 0.00 0.00 0.19*] (最佳动作: Left)
状态 (1, 0) -> [35.70 139.16* 2.04 -67.16] (最佳动作: Right)
状态 (1, 1) -> [118.94 145.47* 136.74 113.84] (最佳动作: Right)
状态 (1, 2) -> [130.77 103.40 151.13* 129.66] (最佳动作: Down)
状态 (1, 3) -> [4.81 5.23 139.04* 38.92] (最佳动作: Down)
状态 (1, 4) -> [-0.31 0.84 19.22* 1.23] (最佳动作: Down)
状态 (1, 5) -> [-0.13 1.75* 0.19 0.00] (最佳动作: Right)
状态 (1, 6) -> [-0.11 -0.16 6.02* -0.06] (最佳动作: Down)
状态 (1, 7) -> [-0.11 0.00 0.83* 0.35] (最佳动作: Down)
状态 (1, 8) -> [0.00 0.00 0.00 0.24*] (最佳动作: Left)
状态 (1, 9) -> [0.00* -10.00 0.00 0.00] (最佳动作: Up)
状态 (2, 0) -> [1.05 0.23 3.93* -34.04] (最佳动作: Down)
状态 (2, 1) -> [14.84 150.97* 35.24 0.15] (最佳动作: Right)
状态 (2, 2) -> [140.09 143.45 157.08* 135.89] (最佳动作: Down)
状态 (2, 3) -> [42.93 27.80 162.34* 30.06] (最佳动作: Down)
状态 (2, 4) -> [1.17 11.99 87.36* 1.54] (最佳动作: Down)
状态 (2, 5) -> [-0.08 1.89 60.05* -0.03] (最佳动作: Down)
状态 (2, 6) -> [-0.19 -0.24 64.05* -0.07] (最佳动作: Down)
状态 (2, 7) -> [-0.22 -0.21 2.88 4.04*] (最佳动作: Left)
状态 (2, 8) -> [-0.11 -0.11 0.00* 0.00] (最佳动作: Down)
状态 (2, 9) -> [-0.11 0.00 0.19* 0.00] (最佳动作: Down)
状态 (3, 0) -> [-0.07 0.96 18.12* -51.35] (最佳动作: Down)
状态 (3, 1) -> [21.91 152.99* 34.25 1.59] (最佳动作: Right)
状态 (3, 2) -> [144.48 151.30 163.35* 128.65] (最佳动作: Down)
状态 (3, 3) -> [66.95 74.46 169.94* 101.86] (最佳动作: Down)
状态 (3, 4) -> [15.17 39.30 174.45* 53.51] (最佳动作: Down)
状态 (3, 5) -> [1.88 54.69 175.77* 3.32] (最佳动作: Down)
状态 (3, 6) -> [5.24 8.85 188.04* 2.89] (最佳动作: Down)
状态 (3, 7) -> [0.04 -0.19 51.96* 0.00] (最佳动作: Down)
状态 (3, 8) -> [0.00 -0.09 0.38* 0.19] (最佳动作: Down)
状态 (3, 9) -> [-0.09 0.00 0.19* 0.00] (最佳动作: Down)
状态 (4, 0) -> [0.05 70.01* 0.59 -36.58] (最佳动作: Right)
状态 (4, 1) -> [41.70 161.92* 11.55 8.51] (最佳动作: Right)
状态 (4, 2) -> [151.93 169.95* 150.30 146.86] (最佳动作: Right)
状态 (4, 3) -> [153.93 176.89* 169.65 159.69] (最佳动作: Right)
状态 (4, 4) -> [151.42 184.20* 180.03 164.12] (最佳动作: Right)
状态 (4, 5) -> [136.77 191.90* 188.97 170.40] (最佳动作: Right)
状态 (4, 6) -> [159.82 161.73 200.00* 169.69] (最佳动作: Down)
状态 (4, 7) -> [3.63 -0.20 10.83 187.48*] (最佳动作: Left)
状态 (4, 8) -> [-0.19 -0.17 0.00 15.78*] (最佳动作: Left)
状态 (4, 9) -> [0.00 -9.98 0.00 0.38*] (最佳动作: Left)
状态 (5, 0) -> [0.00 1.26* -0.18 -9.88] (最佳动作: Right)
状态 (5, 1) -> [11.44 68.65* -0.13 -0.29] (最佳动作: Right)
状态 (5, 2) -> [159.88* 28.48 0.93 9.11] (最佳动作: Up)
状态 (5, 3) -> [45.94 182.74* 5.88 59.66] (最佳动作: Right)
状态 (5, 4) -> [101.77 191.86* 10.64 75.66] (最佳动作: Right)
状态 (5, 5) -> [105.31 200.00* 48.24 104.01] (最佳动作: Right)
状态 (5, 6) -> [0.00* 0.00 0.00 0.00] (最佳动作: Up)
状态 (5, 7) -> [-0.09 -0.11 -0.09 68.78*] (最佳动作: Left)
状态 (5, 8) -> [-0.11 -0.11 0.00* 0.00] (最佳动作: Down)
状态 (5, 9) -> [0.00 0.00 -0.28 0.19*] (最佳动作: Left)
状态 (6, 0) -> [0.19 0.96* 0.00 0.00] (最佳动作: Right)
状态 (6, 1) -> [0.36 2.89* -0.17 -0.20] (最佳动作: Right)
状态 (6, 2) -> [31.40* 0.00 -0.09 -0.11] (最佳动作: Up)
状态 (6, 3) -> [41.69* 0.75 -0.11 0.00] (最佳动作: Up)
状态 (6, 4) -> [12.66 41.93* -0.08 -0.26] (最佳动作: Right)
状态 (6, 5) -> [135.57* 8.91 -0.14 0.41] (最佳动作: Up)
状态 (6, 6) -> [68.78* 0.66 0.02 3.64] (最佳动作: Up)
状态 (6, 7) -> [0.19 -0.09 0.00 11.84*] (最佳动作: Left)
状态 (6, 8) -> [0.00 -0.11 0.00 0.19*] (最佳动作: Left)
状态 (6, 9) -> [0.78* -9.97 0.00 0.00] (最佳动作: Up)
状态 (7, 1) -> [0.46* 0.19 0.00 0.00] (最佳动作: Up)
状态 (7, 2) -> [0.19* 0.19 -0.11 -0.11] (最佳动作: Up)
状态 (7, 3) -> [0.00* 0.00 0.00 -0.19] (最佳动作: Up)
状态 (7, 4) -> [0.00 0.72* 0.00 0.00] (最佳动作: Right)
状态 (7, 5) -> [0.21 0.69* -0.11 -0.14] (最佳动作: Right)
状态 (7, 6) -> [7.37* -0.21 0.00 -0.08] (最佳动作: Up)
状态 (7, 7) -> [0.19* -0.11 -0.11 0.00] (最佳动作: Up)
状态 (7, 8) -> [0.19* 0.00 0.00 0.00] (最佳动作: Up)
状态 (8, 2) -> [0.00 0.19* 0.00 0.00] (最佳动作: Right)
状态 (8, 3) -> [0.00 0.19* 0.00 0.00] (最佳动作: Right)
状态 (8, 4) -> [0.19* 0.00 0.00 0.00] (最佳动作: Up)
状态 (8, 5) -> [0.19* 0.00 0.00 0.00] (最佳动作: Up)
状态 (8, 7) -> [0.19* 0.00 0.00 0.00] (最佳动作: Up)
--------------------------------------------------
Training Complete!
Starting Test...
Using Best Strategy Environment - Obstacles: [(9, 1), (9, 6)], Exit: (5, 6)
Position: (0, 0), Action: Right
Position: (0, 1), Action: Down
Position: (1, 1), Action: Right
Position: (1, 2), Action: Down
Position: (2, 2), Action: Down
Position: (3, 2), Action: Down
Position: (4, 2), Action: Right
Position: (4, 3), Action: Right
Position: (4, 4), Action: Right
Position: (4, 5), Action: Right
Position: (4, 6), Action: Down

Q-learning网格环境代码解析
这段代码实现了一个基于Q-learning的强化学习系统,用于训练智能体在有障碍物的网格环境中找到从起点到终点的最优路径。
主要组件
1. GridEnvironment类
这是一个10x10的网格环境:
初始化:创建网格,设置起点(0,0),随机生成障碍物和出口
动作空间:上(0)、右(1)、下(2)、左(3)
奖励机制:
撞到障碍物/边界:-100
到达出口:+200
每一步:-0.1
朝出口方向移动:+2
远离出口方向移动:-1
环境生成:确保障碍物和出口与起点有一定距离
可视化:提供网格环境的图形化展示
2. QLearningAgent类
实现Q-learning算法的智能体:
参数:学习率(0.1)、折扣因子(0.95)、探索率(初始1.0)、探索率衰减(0.997)
Q表:使用字典存储状态-动作对的价值
动作选择:使用ε-greedy策略平衡探索与利用
Q值更新:使用标准Q-learning公式更新状态-动作价值
3. train函数
训练智能体的主要流程:
运行多个训练回合(默认5000次)
记录每个回合的奖励和成功率
保存最佳策略(Q表)和环境配置
定期输出训练进度
4. test函数
测试训练好的智能体:
使用最佳环境配置
创建动画效果展示智能体移动路径
显示最终结果(成功/失败)
工作流程
创建环境和智能体
训练阶段:
智能体在环境中探索并学习
通过Q-learning更新状态-动作价值
探索率逐渐降低,增加利用最优策略的概率
保存最佳策略
测试阶段:
使用训练好的Q表选择最优动作
可视化展示智能体从起点到终点的路径

Sarsa算法

SARSA跟Q-learning核心思想基本是一样的,只不过SARSA在更新Q值时并不是取后续动作的最大值而是取某一动作的实际值。

学习策略

\(Q(S,A)\ <-\ Q(S,A)+α[R+γQ(S',A')-Q(S,A)]\)

这里的A'就是一个实际动作。

Q-learning和Sarsa的共同点都是探索方法一致,更新Q表——动作价值,目标为\(R+Q_{next}\),不同之处就在于\(Q_{next}\)的定义不一样。Q-learning中\(Q_{next}\)取最大,Sarsa中\(Q_{next}\)取真实。

因为Q-learning是乐观估计算法,所以Q-learning一般都是奔着最大收益去的,而Sarsa则是趋向于保守。比如在投资理财中,如果采用Sarsa,则更多的考虑的是平均收益,而采用Q-learning的话,则更多考虑的是最大收益。

用考研/创业场景理解Sarsa的保守性

​场景设定

  • ​状态:s1(成绩优秀,有创业想法但缺资金)
  • ​动作:a2(创业)
  • ​执行结果:进入状态s4(创业初期,面临市场风险),获得即时奖励R=50

​关键步骤:下一步动作的选择

  • ​Q-learning的更新逻辑:

    • 假设在s4下,最优动作是“坚持创业”(Q(s4,a2)=80)。
    • 更新时直接使用最大值80,无论实际下一步是否选择该动作。
    • ​公式体现:R+γmaxaQ(st+1,a)
  • ​Sarsa的更新逻辑:

    • s4下,根据当前策略(如ε-greedy)​实际选择动作,例如:
      • 探索性选择“放弃创业,转为考研”(a1),对应Q(s4,a1)=0
    • 更新时使用实际选择的动作的Q值0。
    • ​公式体现:R+γQ(st+1,at+1)

​更新结果对比

假设学习率α=0.5,折扣因子γ=0.8

  • ​Q-learning更新:

    Q(s1,a2)=0+0.5[50+0.8×800]=0.5×(50+64)=7

    ​结果:创业动作的Q值上升,鼓励冒险。

  • ​Sarsa更新:

    Q(s1,a2)=0+0.5[50+0.8×00]=0.5×(50)=25

    ​结果:创业动作的Q值大幅下降,策略趋于保守。


​3. Sarsa的“路径依赖”特性

Sarsa的更新严格依赖于实际执行的路径,因此对高风险场景更敏感。

​实例:创业失败链

  • ​路径1​(积极策略):
    s1a2s4a2s5(创业成功),总奖励50+100=50

    • 最终Q值更新为正,鼓励持续创业。
  • ​路径2​(保守策略):
    s1a2s4a1s3(放弃创业转考研),总奖励50+40=10

    • 最终Q值更新为负,抑制创业行为。

结论:
Sarsa会根据实际经历的路径调整策略:

  • 若多次经历创业失败后转考研,Q值会抑制创业动作;
  • 若偶然成功一次,后续可能重新鼓励创业。

​4. Sarsa与Q-learning的核心区别

​对比维度 ​Q-learning ​Sarsa
​目标策略 学习理想最优策略(即使未实际执行) 学习当前执行策略(依赖实际动作选择)
​风险偏好 更激进(假设下一步总是最优动作) 更保守(考虑下一步实际动作的风险)
​适用场景 高风险高回报、需长期规划(如创业决策) 安全敏感、需避免即时损失(如医疗决策)
​公式差异 maxaQ(st+1,a) Q(st+1,at+1)

​5. 核心启发:人生决策中的Sarsa思维

  1. ​谨慎试错:

    • Sarsa的保守性类似“走一步看一步”,适合资源有限、失败成本高的场景(如家庭经济压力大的学生更倾向考研而非创业)。
  2. ​路径依赖的影响:

    • 若多次尝试某路径均失败(如创业→亏损→放弃),策略会自我强化保守选择;
    • 需主动引入探索(提高ε),避免陷入局部最优。
  3. ​策略与环境的匹配:

    • 在稳定环境(如考研成功率可预测)中,Q-learning更高效;
    • 在动态环境(如市场快速变化)中,Sarsa能更快适应实际风险。

On-policy和Off-policy

Q-learning是一种Off-policy,Sarsa是一种On-policy。

Off-policy的方法将收集数据和学习数据作为两个不同的任务。它准备两个策略:行为策略(behavior policy)与目标策略(target policy)。

行为策略是专门负责学习数据的获取,具有一定的随机性,总是有一定的概率选出潜在的最优动作。目标策略借助行为策略收集到的样本以及策略提升方法自身性能,并最终成为最优策略。

Off-policy(离策略)和On-policy(同策略)是强化学习中两种核心学习策略,它们的核心区别在于“行为策略”​​(实际选择动作的策略)和“目标策略”​​(被优化更新的策略)是否一致。

1.核心定义

​On-policy(同策略)​

  • ​行为策略 = 目标策略:智能体通过当前正在执行的策略​(例如带探索的ε-greedy策略)生成数据,并直接用这些数据更新同一策略。
  • ​特点:
    • 必须使用“新鲜”数据(当前策略生成的数据)。
    • 更新过程与探索直接绑定(例如探索动作会影响策略优化)。
  • ​典型算法:Sarsa、REINFORCE(策略梯度)。

​Off-policy(离策略)​

  • ​行为策略 ≠ 目标策略:智能体通过某个策略​(例如随机策略)生成数据,但用这些数据更新另一个不同的策略​(例如贪婪策略)。
  • ​特点:
    • 可以复用历史数据(例如经验回放)。
    • 目标策略可以完全贪婪,而行为策略可以保持探索。
  • ​典型算法:Q-learning、DQN、DDPG。

​2. 类比理解:学生备考策略

​场景设定

  • ​目标策略:学生最终想达到的最优学习策略(例如“每天高效学习8小时”)。
  • ​行为策略:学生实际执行的动作(例如“今天玩游戏2小时,学习6小时”)。

​On-policy(同策略)​

  • ​学习方式:
    学生必须根据当前的实际行为​(例如边玩边学)来调整目标策略。如果今天玩了游戏,就要从“边玩边学”的经验中反思如何优化。
  • ​结果:
    策略更新受探索行为直接影响,比如偶尔玩游戏可能导致策略偏向“减少学习时间”。

​Off-policy(离策略)​

  • ​学习方式:
    学生可以观察其他学霸的行为(例如“每天学习10小时”),或利用自己过去的数据(例如上周的学习记录),来优化目标策略,而无需自己实际执行“每天学习10小时”。
  • ​结果:
    目标策略可以独立于实际行为,比如即使自己今天偷懒了,仍能学习如何做到“高效学习8小时”。

​3. 关键差异对比

​维度 ​On-policy ​Off-policy
​数据来源 必须用当前策略生成的新数据 可以用旧数据或其他策略生成的数据
​探索与利用 探索直接影响策略更新(如ε-greedy) 探索和更新策略解耦(行为策略可自由探索)
​数据效率 低(需要持续生成新数据) 高(可复用历史数据)
​稳定性 更稳定(策略更新与行为一致) 可能不稳定(目标与行为策略差异大)
​典型场景 实时交互环境(如机器人控制) 数据驱动的场景(如游戏AI)

​4. 通过算法实例深入理解

​例1:Sarsa(On-policy)​

  • ​行为策略:ε-greedy策略(例如90%概率选当前最优动作,10%随机探索)。
  • ​目标策略:更新的策略仍然是ε-greedy策略。
  • ​更新逻辑:
    • 在状态st选择动作at(可能探索),进入状态st+1后,继续用ε-greedy选择动作at+1
    • 用实际选择的at+1对应的Q值更新当前策略。
  • ​结果:策略优化会考虑探索带来的风险(例如探索到危险动作会抑制后续选择)。

​例2:Q-learning(Off-policy)​

  • ​行为策略:ε-greedy策略(例如随机探索)。
  • ​目标策略:完全贪婪策略(只选最优动作)。
  • ​更新逻辑:
    • 在状态st选择动作at(可能探索),进入状态st+1后,假设下一步选最优动作at+1
    • at+1对应的Q值(即maxaQ(st+1,a))更新策略,而不管实际是否执行了at+1
  • ​结果:策略优化趋向理想最优路径,忽略探索风险。

​5. 生活中的类比

​On-policy:新手司机学车

  • ​行为策略:实际驾驶时小心翼翼,偶尔犯错(例如急刹车)。
  • ​目标策略:从自己的驾驶错误中学习,调整成更谨慎的策略。
  • ​结果:学会的策略会规避实际犯过的错误,但进步较慢(依赖自身经验)。

​Off-policy:老司机看教学视频

  • ​行为策略:自己开车时按习惯驾驶(可能不完美)。
  • ​目标策略:观看专业教练的视频,学习理想驾驶技巧。
  • ​结果:即使自己没尝试过漂移,也能通过观察学习到高级技巧。

​6. 如何选择?

  • ​选On-policy:
    • 环境交互成本低(如模拟器),需在线学习。
    • 策略需要实时适应探索风险(如无人机避障)。
  • ​选Off-policy:
    • 数据宝贵或环境交互成本高(如医疗决策)。
    • 希望复用历史数据或并行学习多个策略(如游戏AI训练)。

​总结

  • ​On-policy:​​“边做边学”​,策略更新与实际行为强绑定,适合安全敏感场景。
  • ​Off-policy:​​“站在巨人的肩膀上”​,可利用他人或历史经验,适合数据驱动场景。
  • ​核心口诀:
    • On-policy:我怎么做,就怎么学。
    • Off-policy:别人怎么做,我学;自己怎么做,随意。

Sarsa棋盘格找出口代码样例

import matplotlib.pyplot as plt

import numpy as np
import random
import copy


class GridEnvironment:
    def __init__(self):
        # 初始化10x10的棋盘
        self.grid_size = 10
        self.grid = np.zeros((self.grid_size, self.grid_size))

        # 初始位置
        self.start_position = (0, 0)
        self.current_position = self.start_position

        # 随机生成障碍物和出口位置
        self.generate_random_positions()

        # Action space: Up(0), Right(1), Down(2), Left(3)
        self.actions = [(-1, 0), (0, 1), (1, 0), (0, -1)]
        self.action_names = ["Up", "Right", "Down", "Left"]

        # 修改奖励设置,增强正向激励
        self.obstacle_reward = -100
        self.exit_reward = 200  # 增加出口奖励
        self.step_reward = -0.1  # 进一步减小每步惩罚

    def generate_random_positions(self):
        # 所有可能的位置(除了起始位置)
        all_positions = [(i, j) for i in range(self.grid_size) for j in range(self.grid_size) if
                         (i, j) != self.start_position]

        # 筛选出离起点有一定距离的位置(使用曼哈顿距离)
        min_distance = 5  # 设置最小距离阈值
        distant_positions = [pos for pos in all_positions
                             if self.calculate_distance(self.start_position, pos) >= min_distance]

        # 如果没有足够的远距离位置,则使用所有可用位置
        if len(distant_positions) < 2:
            distant_positions = all_positions

        # 随机选择2个位置作为障碍物
        self.obstacles = random.sample(distant_positions, 2)

        # 从剩余位置中选择一个作为出口,优先选择远离起点的位置
        remaining_positions = [pos for pos in all_positions if pos not in self.obstacles]
        # 按照与起点的距离对剩余位置进行排序(从远到近)
        remaining_positions.sort(key=lambda pos: self.calculate_distance(self.start_position, pos), reverse=True)
        # 从前半部分选择出口位置(确保出口也相对远离起点)
        exit_candidates = remaining_positions[:len(remaining_positions) // 2]
        self.exit = random.choice(exit_candidates)

        # 更新网格
        self.grid = np.zeros((self.grid_size, self.grid_size))
        for obs in self.obstacles:
            self.grid[obs] = -1
        self.grid[self.exit] = 1

    def reset(self):
        # 重置智能体位置到起点
        self.current_position = self.start_position
        return self.current_position

    def calculate_distance(self, pos1, pos2):
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])  # 曼哈顿距离

    def step(self, action):
        # 保存移动前的位置和到出口的距离
        prev_position = self.current_position
        prev_distance = self.calculate_distance(prev_position, self.exit)

        # 执行原有的移动逻辑
        row, col = self.current_position
        d_row, d_col = self.actions[action]

        # 计算新位置
        new_row = row + d_row
        new_col = col + d_col

        # 检查是否超出边界
        if new_row < 0 or new_row >= self.grid_size or new_col < 0 or new_col >= self.grid_size:
            # 如果超出边界,保持原位置不变,给予惩罚,并结束游戏
            return self.current_position, self.obstacle_reward, True  # 撞到围栏也算失败

        # 更新位置
        self.current_position = (new_row, new_col)

        # 检查是否撞到障碍物
        if self.current_position in self.obstacles:
            return self.current_position, self.obstacle_reward, True

        # 检查是否到达出口
        if self.current_position == self.exit:
            return self.current_position, self.exit_reward, True

        # 计算新的距离
        new_distance = self.calculate_distance(self.current_position, self.exit)

        # 基于距离变化的奖励,增强正向激励
        distance_reward = 0
        if new_distance < prev_distance:
            distance_reward = 2  # 增加朝出口方向移动的奖励
        elif new_distance > prev_distance:
            distance_reward = -1  # 保持远离出口方向的惩罚

        # 返回新状态、奖励和是否结束
        return self.current_position, self.step_reward + distance_reward, False

    def render(self, q_table=None, path=None):
        # 可视化棋盘
        fig, ax = plt.subplots(figsize=(8, 8))

        # 创建颜色映射
        cmap = plt.cm.colors.ListedColormap(['white', 'yellow', 'black', 'red'])
        bounds = [-0.5, 0.5, 1.5, 2.5, 3.5]
        norm = plt.cm.colors.BoundaryNorm(bounds, cmap.N)

        # 复制棋盘用于显示
        display_grid = self.grid.copy()

        # 标记当前位置为红色
        if path:
            for pos in path:
                if pos != self.exit:  # 不覆盖出口
                    display_grid[pos] = 2
        else:
            display_grid[self.current_position] = 2

        # 显示棋盘
        ax.imshow(display_grid, cmap=cmap, norm=norm)

        # 添加网格线
        ax.grid(which='major', axis='both', linestyle='-', color='k', linewidth=2)
        ax.set_xticks(np.arange(-0.5, self.grid_size, 1))
        ax.set_yticks(np.arange(-0.5, self.grid_size, 1))

        # 隐藏刻度标签
        ax.set_xticklabels([])
        ax.set_yticklabels([])

        # 如果提供了Q表,显示最佳动作
        if q_table is not None:
            for i in range(self.grid_size):
                for j in range(self.grid_size):
                    if (i, j) in self.obstacles or (i, j) == self.exit:
                        continue

                    # 获取该状态的最佳动作
                    state = (i, j)
                    if state in q_table:
                        best_action = np.argmax(q_table[state])

                        # 在格子中心绘制箭头
                        dx, dy = self.actions[best_action]
                        ax.arrow(j, i, dy * 0.3, dx * 0.3, head_width=0.2, head_length=0.2, fc='blue', ec='blue')

        plt.title("Red: Current Position, Black: Obstacles, Yellow: Exit")
        plt.tight_layout()
        plt.show()


class SarsaAgent:
    def __init__(self, env, learning_rate=0.1, discount_factor=0.95, exploration_rate=1.0, exploration_decay=0.997):
        # 增加折扣因子,减缓探索率衰减
        self.env = env
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor  # 增加折扣因子
        self.exploration_rate = exploration_rate
        self.exploration_decay = exploration_decay  # 减缓探索率衰减

        # 初始化Q表为空字典
        self.q_table = {}

    def get_q_value(self, state, action):
        # 如果状态-动作对不在Q表中,初始化为0
        if state not in self.q_table:
            self.q_table[state] = np.zeros(len(self.env.actions))
        return self.q_table[state][action]

    def choose_action(self, state):
        # 使用ε-greedy策略选择动作
        if random.uniform(0, 1) < self.exploration_rate:
            # 探索:随机选择动作
            return random.randint(0, len(self.env.actions) - 1)
        else:
            # 利用:选择Q值最大的动作
            if state not in self.q_table:
                self.q_table[state] = np.zeros(len(self.env.actions))
            return np.argmax(self.q_table[state])

    def update_q_value(self, state, action, reward, next_state, next_action):
        # 更新Q - SARSA使用实际选择的下一个动作
        if next_state not in self.q_table:
            self.q_table[next_state] = np.zeros(len(self.env.actions))

        # SARSA更新公式 - 使用下一个状态的实际选择动作
        current_q = self.get_q_value(state, action)
        next_q = self.get_q_value(next_state, next_action)
        new_q = current_q + self.learning_rate * (reward + self.discount_factor * next_q - current_q)

        # 更新Q        if state not in self.q_table:
            self.q_table[state] = np.zeros(len(self.env.actions))
        self.q_table[state][action] = new_q

    def decay_exploration(self):
        # 衰减探索率
        self.exploration_rate *= self.exploration_decay


def train(agent, episodes=5000):
    # 训练智能体
    rewards_per_episode = []
    success_count = 0  # 记录成功到达出口的次数
    best_q_table = None  # 保存最佳Q    best_success_rate = 0  # 最佳成功率
    best_env_config = None  # 保存最佳环境配置
    best_episode = 0  # 记录最佳策略的训练回合数

    # Fixed environment layout for training
    agent.env.generate_random_positions()
    print(f"Training Environment - Obstacles: {agent.env.obstacles}, Exit: {agent.env.exit}")

    # 保存初始环境配置
    initial_env_config = {
        'obstacles': agent.env.obstacles.copy(),
        'exit': agent.env.exit
    }

    for episode in range(episodes):
        state = agent.env.reset()
        # SARSA需要在循环外选择第一个动作
        action = agent.choose_action(state)
        total_reward = 0
        done = False
        steps = 0

        while not done and steps < 300:
            next_state, reward, done = agent.env.step(action)

            # 如果没有结束,选择下一个动作
            if not done:
                next_action = agent.choose_action(next_state)
            else:
                next_action = 0  # 任意值,因为在终止状态不会使用

            # SARSA更新使用当前状态-动作对和下一个状态-动作对
            agent.update_q_value(state, action, reward, next_state, next_action)

            # 更新状态和动作
            state = next_state
            action = next_action

            total_reward += reward
            steps += 1

        if done and state == agent.env.exit:
            success_count += 1

        agent.decay_exploration()
        rewards_per_episode.append(total_reward)

        if (episode + 1) % 100 == 0:
            current_success_rate = success_count / 100 if episode >= 99 else success_count / (episode + 1)
            print(
                f"Episode {episode + 1}/{episodes}, Average Reward: {np.mean(rewards_per_episode[-100:]):.2f}, Success Rate: {current_success_rate:.2f}, Exploration Rate: {agent.exploration_rate:.4f}")

            if current_success_rate > best_success_rate:
                best_success_rate = current_success_rate
                best_q_table = copy.deepcopy(agent.q_table)
                best_env_config = initial_env_config
                best_episode = episode + 1
                print(f"Found Better Strategy! Success Rate: {best_success_rate:.2f}, Training Episode: {best_episode}")

            success_count = 0  # 重置成功计数

    print(f"Training Complete! Best Success Rate: {best_success_rate:.2f}, Achieved at Episode {best_episode}")

    # 恢复最佳环境配置
    agent.env.obstacles = best_env_config['obstacles']
    agent.env.exit = best_env_config['exit']
    agent.env.grid = np.zeros((agent.env.grid_size, agent.env.grid_size))
    for obs in agent.env.obstacles:
        agent.env.grid[obs] = -1
    agent.env.grid[agent.env.exit] = 1

    return best_q_table, rewards_per_episode, best_env_config, best_episode


def test(env, q_table, env_config=None, best_episode=None):
    if env_config:
        env.obstacles = env_config['obstacles']
        env.exit = env_config['exit']
        env.grid = np.zeros((env.grid_size, env.grid_size))
        for obs in env.obstacles:
            env.grid[obs] = -1
        env.grid[env.exit] = 1
        print(f"Using Best Strategy Environment - Obstacles: {env.obstacles}, Exit: {env.exit}")
    else:
        env.generate_random_positions()
        print(f"Test Environment - Obstacles: {env.obstacles}, Exit: {env.exit}")

    # 使用训练好的Q表进行测试
    state = env.reset()
    done = False
    steps = 0
    total_reward = 0
    path = [state]
    game_result = "In Progress"  # 用于跟踪游戏结果

    # 创建动画效果所需的图形对象
    fig, ax = plt.subplots(figsize=(8, 8))
    plt.ion()  # 打开交互模式

    # 创建颜色映射 - 修改颜色以更好地区分不同元素
    cmap = plt.cm.colors.ListedColormap(['white', 'yellow', 'gray', 'red', 'lightblue'])
    bounds = [-0.5, 0.5, 1.5, 2.5, 3.5, 4.5]
    norm = plt.cm.colors.BoundaryNorm(bounds, cmap.N)

    # 初始化图像
    display_grid = env.grid.copy()
    display_grid[state] = 2  # 标记当前位置为红色

    # 确保障碍物显示为灰色
    for obs in env.obstacles:
        display_grid[obs] = 2  # 使用灰色显示障碍物

    img = ax.imshow(display_grid, cmap=cmap, norm=norm)

    # 添加网格线
    ax.grid(which='major', axis='both', linestyle='-', color='k', linewidth=2)
    ax.set_xticks(np.arange(-0.5, env.grid_size, 1))
    ax.set_yticks(np.arange(-0.5, env.grid_size, 1))

    # 隐藏刻度标签
    ax.set_xticklabels([])
    ax.set_yticklabels([])

    # 修改图例颜色和标签
    legend_elements = [
        plt.Rectangle((0, 0), 1, 1, color='white', label='Empty'),
        plt.Rectangle((0, 0), 1, 1, color='yellow', label='Exit'),
        plt.Rectangle((0, 0), 1, 1, color='gray', label='Obstacle'),
        plt.Rectangle((0, 0), 1, 1, color='red', label='Current'),
        plt.Rectangle((0, 0), 1, 1, color='lightblue', label='Path')
    ]
    ax.legend(handles=legend_elements, loc='upper center', bbox_to_anchor=(0.5, -0.05), ncol=5)

    plt.title(f"SARSA Agent (Best Strategy from Episode {best_episode})")
    plt.tight_layout()
    plt.draw()
    plt.pause(0.5)  # 初始暂停,让用户看清起始状态

    while not done and steps < 100:
        if state not in q_table:
            action = random.randint(0, len(env.actions) - 1)
        else:
            action = np.argmax(q_table[state])

        print(f"Position: {state}, Action: {env.action_names[action]}")
        next_state, reward, done = env.step(action)

        state = next_state
        path.append(state)
        total_reward += reward
        steps += 1

        # 更新动画 - 修改显示逻辑
        display_grid = env.grid.copy()

        # 首先确保障碍物显示
        for obs in env.obstacles:
            display_grid[obs] = 2  # 使用灰色显示障碍物

        # 然后显示路径
        for pos in path[:-1]:
            if pos != env.exit and pos not in env.obstacles:  # 不覆盖出口和障碍物
                display_grid[pos] = 3  # 使用浅蓝色标记路径

        # 最后显示当前位置
        display_grid[state] = 2  # 标记当前位置为红色

        img.set_data(display_grid)
        plt.draw()
        plt.pause(0.3)  # 控制动画速度

        # 判断游戏结果
        if done:
            if state == env.exit:
                game_result = "Success"
            else:
                game_result = "Failure"

    # 显示最终结果
    if game_result == "Success":
        plt.title(
            f"Successfully Reached Exit! Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")
    elif game_result == "Failure":
        if state in env.obstacles:
            plt.title(
                f"Hit Obstacle, Game Over. Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")
        else:
            plt.title(
                f"Hit Wall, Game Over. Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")
    else:
        plt.title(
            f"Failed to Reach Exit Within Step Limit. Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")

    plt.ioff()  # 关闭交互模式
    plt.show()  # 显示最终结果

    return path, game_result


if __name__ == "__main__":
    # 创建环境和智能体
    env = GridEnvironment()
    agent = SarsaAgent(env)

    # 训练智能体(无显示)
    print("Starting Training...")
    q_table, rewards, best_env_config, best_episode = train(agent, episodes=5000)
    print("Training Complete!")

    # 测试智能体(使用最佳策略配置)
    print("Starting Test...")
    path, result = test(env, q_table, best_env_config, best_episode)

运行结果

Starting Training...
Training Environment - Obstacles: [(5, 0), (4, 7)], Exit: (1, 9)
Episode 100/5000, Average Reward: -88.44, Success Rate: 0.02, Exploration Rate: 0.7405
Found Better Strategy! Success Rate: 0.02, Training Episode: 100
Episode 200/5000, Average Reward: -27.54, Success Rate: 0.21, Exploration Rate: 0.5483
Found Better Strategy! Success Rate: 0.21, Training Episode: 200
Episode 300/5000, Average Reward: 47.24, Success Rate: 0.45, Exploration Rate: 0.4060
Found Better Strategy! Success Rate: 0.45, Training Episode: 300
Episode 400/5000, Average Reward: 99.36, Success Rate: 0.62, Exploration Rate: 0.3007
Found Better Strategy! Success Rate: 0.62, Training Episode: 400
Episode 500/5000, Average Reward: 143.51, Success Rate: 0.76, Exploration Rate: 0.2226
Found Better Strategy! Success Rate: 0.76, Training Episode: 500
Episode 600/5000, Average Reward: 177.01, Success Rate: 0.87, Exploration Rate: 0.1649
Found Better Strategy! Success Rate: 0.87, Training Episode: 600
Episode 700/5000, Average Reward: 173.56, Success Rate: 0.86, Exploration Rate: 0.1221
Episode 800/5000, Average Reward: 198.62, Success Rate: 0.94, Exploration Rate: 0.0904
Found Better Strategy! Success Rate: 0.94, Training Episode: 800
Episode 900/5000, Average Reward: 198.51, Success Rate: 0.94, Exploration Rate: 0.0669
Episode 1000/5000, Average Reward: 195.40, Success Rate: 0.93, Exploration Rate: 0.0496
Episode 1100/5000, Average Reward: 211.00, Success Rate: 0.98, Exploration Rate: 0.0367
Found Better Strategy! Success Rate: 0.98, Training Episode: 1100
Episode 1200/5000, Average Reward: 204.53, Success Rate: 0.96, Exploration Rate: 0.0272
Episode 1300/5000, Average Reward: 217.20, Success Rate: 1.00, Exploration Rate: 0.0201
Found Better Strategy! Success Rate: 1.00, Training Episode: 1300
Episode 1400/5000, Average Reward: 204.52, Success Rate: 0.96, Exploration Rate: 0.0149
Episode 1500/5000, Average Reward: 210.82, Success Rate: 0.98, Exploration Rate: 0.0110
Episode 1600/5000, Average Reward: 210.93, Success Rate: 0.98, Exploration Rate: 0.0082
Episode 1700/5000, Average Reward: 217.11, Success Rate: 1.00, Exploration Rate: 0.0061
Episode 1800/5000, Average Reward: 213.97, Success Rate: 0.99, Exploration Rate: 0.0045
Episode 1900/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0033
Episode 2000/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0025
Episode 2100/5000, Average Reward: 217.11, Success Rate: 1.00, Exploration Rate: 0.0018
Episode 2200/5000, Average Reward: 217.12, Success Rate: 1.00, Exploration Rate: 0.0013
Episode 2300/5000, Average Reward: 217.12, Success Rate: 1.00, Exploration Rate: 0.0010
Episode 2400/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0007
Episode 2500/5000, Average Reward: 217.11, Success Rate: 1.00, Exploration Rate: 0.0005
Episode 2600/5000, Average Reward: 217.12, Success Rate: 1.00, Exploration Rate: 0.0004
Episode 2700/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0003
Episode 2800/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0002
Episode 2900/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0002
Episode 3000/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0001
Episode 3100/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0001
Episode 3200/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0001
Episode 3300/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3400/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3500/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3600/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3700/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3800/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 3900/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4000/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4100/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4200/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4300/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4400/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4500/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4600/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4700/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4800/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 4900/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Episode 5000/5000, Average Reward: 217.10, Success Rate: 1.00, Exploration Rate: 0.0000
Training Complete! Best Success Rate: 1.00, Achieved at Episode 1300
Training Complete!
Starting Test...
Using Best Strategy Environment - Obstacles: [(5, 0), (4, 7)], Exit: (1, 9)
Position: (0, 0), Action: Down
Position: (1, 0), Action: Right
Position: (1, 1), Action: Right
Position: (1, 2), Action: Right
Position: (1, 3), Action: Right
Position: (1, 4), Action: Right
Position: (1, 5), Action: Right
Position: (1, 6), Action: Right
Position: (1, 7), Action: Right
Position: (1, 8), Action: Right

这段代码实现了一个基于SARSA算法的强化学习系统,用于训练智能体在10x10网格环境中从起点导航到出口,同时避开障碍物。
主要组件
1. GridEnvironment类
这是环境模拟器,定义了智能体所处的世界:
创建10x10网格,随机生成障碍物和出口位置
定义动作空间:上(0)、右(1)、下(2)、左(3)
设置奖励机制:
到达出口:+200分
撞到障碍物:-100分
每走一步:-0.1分
朝出口方向移动:+2分
远离出口方向移动:-1分
提供step()方法执行动作并返回新状态、奖励和是否结束
提供render()方法可视化网格环境和智能体路径
2. SarsaAgent类
实现SARSA算法的智能体:
维护Q表(状态-动作价值函数)
使用ε-greedy策略选择动作(平衡探索与利用)
根据SARSA更新公式更新Q值:
  Q(s,a) ← Q(s,a) + α[r + γQ(s',a') - Q(s,a)]
其中s'是下一状态,a'是在s'状态下选择的实际动作
3. 训练函数(train)
训练智能体5000个回合
记录奖励和成功率
保存最佳Q表和环境配置
每100个回合输出训练进度
4. 测试函数(test)
使用训练好的Q表在环境中导航
创建动画效果显示智能体的路径
输出测试结果(成功/失败)
SARSA算法特点
SARSA是一种"在策略"(on-policy)的时序差分学习算法,与Q-learning的主要区别:
SARSA使用实际选择的下一个动作a'来更新Q值
Q-learning使用下一状态中最大Q值的动作来更新
代码执行流程
创建环境和智能体
训练智能体5000个回合,保存最佳策略
使用最佳策略在环境中进行可视化测试
显示智能体导航路径和最终结果

DQN算法

DQN的全称为Deep Q-Network。对于Q-learning,稍微复杂的环境,state的数量很容易到达无穷多:只要state的表示中有一个连续变量。对此,有两种解决方案

  1. 对连续变量离散化,按照其所在的区间进行分类,从而退化回之前的Q-table解决,这个方法会损失精度,并在维度升高时,table大小指数级增长。
  2. 用函数f(S,A;θ)来估计Q(S,A),其中θ为可学习的参数。

对θ的学习方式很多,比如:

  1. 线性函数可以直接用最小二乘法拟合。
  2. 定义损失函数后,可以通过类似于梯度下降法的方式优化。
  3. 类似于进化的方法学习。

DQN对于Q-learning的改进在于将Q-table变成了一个神经网络,用Q-learning算法来更新网络参数。神经网络的本质是使用非线性函数来拟合数据的规律,达到举一反三,更加具有泛化能力。它学习的数据是动态平衡的数据,好的探索策略可以带来好的正样本,不好的探索策略可以带来好的负样本,都可以提供给神经网络进行学习。在训练初期,智能体通常会执行一段纯探索阶段​(例如前1万步),完全通过随机动作(或高ε值的ε-greedy策略)与环境交互,目的是快速填充经验回放缓冲区(Replay Buffer),确保后续训练时有足够多样化的数据可用。一旦缓冲区达到最小容量(例如1万条经验),智能体进入探索与学习并行阶段。有别于人类事先标注的监督学习,它属于一种半监督学习

  1. 观察一个状态\((s_t,a_t,r_t,s_{t+1})\)
  2. Q-learning的学习策略为\(Q(S,A)\ <-\ Q(S,A)+α[R+γmax_aQ(S',a)-Q(S,A)]\),当前状态的动作价值为\(Q(s_t,a_t)\),在DQN中就为\(Q(s_t,a_t;θ)\);下一步动作的动作价值为\(R+γmax_aQ(S',a)\),在DQN中就为\(y_t=r_t+γmax_aQ(s_{t+1},a;θ)\).
  3. 它们之间的差值\(δ_t=Q(s_t,a_t;θ)-y_t\)就是误差,我们希望当前状态的动作价值和下一步动作的价值越接近越好,即该误差越接小越好。
  4. 使用梯度下降法\(θ <-\ θ-αδ_t{∂Q(s_t,a_t;θ)\over ∂θ}\)来更新θ。
  5. \(δ_t{∂Q(s_t,a_t;θ)\over ∂θ}\)这一项是误差的平方关于θ的梯度。

简单来说,就是\(Q(s_t,a_t;θ)\)要达到目标值——价值最大化。但是θ作为参数,它在学习的过程中是会不断变化的,由于下一个动作的价值\(Q(s_{t+1},a;θ)\)与当前动作价值\(Q(s_t,a_t;θ)\)的θ是参数共用的,是同一个神经网络,这就造成了我们要接近终点,但是我一行动会使得终点会跟着一起动。这样就会使得\(θ <-\ θ-αδ_t{∂Q(s_t,a_t;θ)\over ∂θ}\)非常不好收敛。它跟Q-table不同,Q-table所有的价值是独立互相不影响的。

对于Q-learning来说

它是探索一次,获取一条数据就更新一次

    for episode in range(episodes):  # 开始训练循环
        state = agent.env.reset()  # 重置环境并获取初始状态
        total_reward = 0  # 初始化当前回合的累计奖励
        done = False  # 初始化回合结束标志
        steps = 0  # 初始化步数计数器

        while not done and steps < 300:  # 单回合循环,直到结束或达到最大步数
            action = agent.choose_action(state)  # 智能体根据当前状态选择动作
            next_state, reward, done = agent.env.step(action)  # 执行动作并获取结果

            agent.update_q_value(state, action, reward, next_state)  # 更新Q值表

            state = next_state  # 更新当前状态
            total_reward += reward  # 累加奖励
            steps += 1  # 增加步数计数

通过以上的代码,我们可以看到在训练中每一次的动作(移动)后,都会更新一次Q-table。

对于DQN来说

它会有两个网络,一个叫做评估网络,一个叫目标网络。评估网络就是我们要学习的神经网络,每隔5步训练一次。目标网络是产生目标函数的网络。

DQN的训练流程:强化与监督的融合

  1. ​数据收集:
    • 智能体探索环境,生成经验(st,at,rt+1,st+1)
  2. ​经验回放:
    • 随机抽取一批历史数据,打破时序相关性(类似监督学习的mini-batch)。
  3. ​目标值计算:
    • 使用目标网络(Target Network)生成稳定的目标Q值yj
  4. ​监督式训练:
    • 通过梯度下降最小化Q(sj,aj;θ)yj的误差,更新主网络。

DQN棋盘格找出口代码样例

import matplotlib.pyplot as plt
import numpy as np
import random
import copy
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import torch.nn.functional as F


class GridEnvironment:
    def __init__(self):
        # 初始化10x10的棋盘
        self.grid_size = 10
        self.grid = np.zeros((self.grid_size, self.grid_size))

        # 初始位置
        self.start_position = (0, 0)
        self.current_position = self.start_position

        # 随机生成障碍物和出口位置
        self.generate_random_positions()

        # Action space: Up(0), Right(1), Down(2), Left(3)
        self.actions = [(-1, 0), (0, 1), (1, 0), (0, -1)]
        self.action_names = ["Up", "Right", "Down", "Left"]

        # 修改奖励设置,增强正向激励
        self.obstacle_reward = -100
        self.exit_reward = 200  # 增加出口奖励
        self.step_reward = -0.1  # 进一步减小每步惩罚

    def generate_random_positions(self):
        # 所有可能的位置(除了起始位置)
        all_positions = [(i, j) for i in range(self.grid_size) for j in range(self.grid_size) if
                         (i, j) != self.start_position]

        # 筛选出离起点有一定距离的位置(使用曼哈顿距离)
        min_distance = 5  # 设置最小距离阈值
        distant_positions = [pos for pos in all_positions
                             if self.calculate_distance(self.start_position, pos) >= min_distance]

        # 如果没有足够的远距离位置,则使用所有可用位置
        if len(distant_positions) < 2:
            distant_positions = all_positions

        # 随机选择2个位置作为障碍物
        self.obstacles = random.sample(distant_positions, 2)

        # 从剩余位置中选择一个作为出口,优先选择远离起点的位置
        remaining_positions = [pos for pos in all_positions if pos not in self.obstacles]
        # 按照与起点的距离对剩余位置进行排序(从远到近)
        remaining_positions.sort(key=lambda pos: self.calculate_distance(self.start_position, pos), reverse=True)
        # 从前半部分选择出口位置(确保出口也相对远离起点)
        exit_candidates = remaining_positions[:len(remaining_positions) // 2]
        self.exit = random.choice(exit_candidates)

        # 更新网格
        self.grid = np.zeros((self.grid_size, self.grid_size))
        for obs in self.obstacles:
            self.grid[obs] = -1
        self.grid[self.exit] = 1

    def reset(self):
        # 重置智能体位置到起点
        self.current_position = self.start_position

        # 训练时保持环境一致,不要每次都重新生成随机位置
        # 只在初始化时生成一次随机位置
        # self.generate_random_positions()  # 注释掉这一行

        return self.current_position

    def calculate_distance(self, pos1, pos2):
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])  # 曼哈顿距离

    def step(self, action):
        # 保存移动前的位置和到出口的距离
        prev_position = self.current_position
        prev_distance = self.calculate_distance(prev_position, self.exit)

        # 执行原有的移动逻辑
        row, col = self.current_position
        d_row, d_col = self.actions[action]

        # 计算新位置
        new_row = row + d_row
        new_col = col + d_col

        # 检查是否超出边界
        if new_row < 0 or new_row >= self.grid_size or new_col < 0 or new_col >= self.grid_size:
            # 如果超出边界,保持原位置不变,给予惩罚,并结束游戏
            return self.current_position, self.obstacle_reward, True  # 撞到围栏也算失败

        # 更新位置
        self.current_position = (new_row, new_col)

        # 检查是否撞到障碍物
        if self.current_position in self.obstacles:
            return self.current_position, self.obstacle_reward, True

        # 检查是否到达出口
        if self.current_position == self.exit:
            return self.current_position, self.exit_reward, True

        # 计算新的距离
        new_distance = self.calculate_distance(self.current_position, self.exit)

        # 基于距离变化的奖励,增强正向激励
        distance_reward = 0
        if new_distance < prev_distance:
            distance_reward = 3  # 增加朝出口方向移动的奖励
        elif new_distance > prev_distance:
            distance_reward = -1.5  # 增加远离出口方向的惩罚

        # 返回新状态、奖励和是否结束
        return self.current_position, self.step_reward + distance_reward, False

    def render(self, q_table=None, path=None):
        # 可视化棋盘
        fig, ax = plt.subplots(figsize=(8, 8))

        # 创建颜色映射
        cmap = plt.cm.colors.ListedColormap(['white', 'yellow', 'black', 'red'])
        bounds = [-0.5, 0.5, 1.5, 2.5, 3.5]
        norm = plt.cm.colors.BoundaryNorm(bounds, cmap.N)

        # 复制棋盘用于显示
        display_grid = self.grid.copy()

        # 标记当前位置为红色
        if path:
            for pos in path:
                if pos != self.exit:  # 不覆盖出口
                    display_grid[pos] = 2
        else:
            display_grid[self.current_position] = 2

        # 显示棋盘
        ax.imshow(display_grid, cmap=cmap, norm=norm)

        # 添加网格线
        ax.grid(which='major', axis='both', linestyle='-', color='k', linewidth=2)
        ax.set_xticks(np.arange(-0.5, self.grid_size, 1))
        ax.set_yticks(np.arange(-0.5, self.grid_size, 1))

        # 隐藏刻度标签
        ax.set_xticklabels([])
        ax.set_yticklabels([])

        # 如果提供了Q表,显示最佳动作
        if q_table is not None:
            for i in range(self.grid_size):
                for j in range(self.grid_size):
                    if (i, j) in self.obstacles or (i, j) == self.exit:
                        continue

                    # 获取该状态的最佳动作
                    state = (i, j)
                    if state in q_table:
                        best_action = np.argmax(q_table[state])

                        # 在格子中心绘制箭头
                        dx, dy = self.actions[best_action]
                        ax.arrow(j, i, dy * 0.3, dx * 0.3, head_width=0.2, head_length=0.2, fc='blue', ec='blue')

        plt.title("红色:当前位置,黑色:障碍物,黄色:出口")
        plt.tight_layout()
        plt.show()


# 定义DQN神经网络模型
class DQNetwork(nn.Module):
    def __init__(self, input_size, output_size):
        super(DQNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 64)  # 添加一层
        self.fc4 = nn.Linear(64, output_size)

        # 初始化权重,帮助收敛
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                module.bias.data.fill_(0.01)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return self.fc4(x)


# 经验回放缓冲区
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)


# DQN智能体
class DQNAgent:
    def __init__(self, env, learning_rate=0.001, discount_factor=0.99,
                 exploration_rate=1.0, exploration_decay=0.998,
                 min_exploration_rate=0.05, buffer_size=10000, batch_size=64,
                 target_update=20):
        self.env = env
        self.state_size = 8  # 更新为新的状态大小: 位置(2) + 相对出口位置(2) + 障碍物信息(4)
        self.action_size = len(env.actions)
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.exploration_decay = exploration_decay
        self.min_exploration_rate = min_exploration_rate
        self.batch_size = batch_size
        self.target_update = target_update
        self.update_counter = 0

        # 创建策略网络和目标网络
        self.policy_net = DQNetwork(self.state_size, self.action_size)
        self.target_net = DQNetwork(self.state_size, self.action_size)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()  # 目标网络设置为评估模式

        # 优化器
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)

        # 经验回放缓冲区
        self.memory = ReplayBuffer(buffer_size)

        # 用于存储训练过程中的最佳模型
        self.best_model_state = None

    def state_to_tensor(self, state):
        """将状态转换为增强的张量表示"""
        # 获取当前位置
        row, col = state

        # 计算到出口的相对位置
        exit_row, exit_col = self.env.exit
        rel_exit_row = exit_row - row
        rel_exit_col = exit_col - col

        # 周围是否有障碍物的信息
        obstacles_info = []
        for d_row, d_col in self.env.actions:
            new_row, new_col = row + d_row, col + d_col
            # 检查是否是障碍物或边界
            is_obstacle = 1.0 if (new_row < 0 or new_row >= self.env.grid_size or
                                  new_col < 0 or new_col >= self.env.grid_size or
                                  (new_row, new_col) in self.env.obstacles) else 0.0
            obstacles_info.append(is_obstacle)

        # 合并所有信息
        return torch.FloatTensor([row, col, rel_exit_row, rel_exit_col] + obstacles_info)

    def choose_action(self, state):
        """使用ε-greedy策略选择动作"""
        if random.uniform(0, 1) < self.exploration_rate:
            # 探索:随机选择动作
            return random.randint(0, self.action_size - 1)
        else:
            # 利用:选择Q值最大的动作
            state_tensor = self.state_to_tensor(state)
            with torch.no_grad():
                q_values = self.policy_net(state_tensor)
                return torch.argmax(q_values).item()

    def store_experience(self, state, action, reward, next_state, done):
        """存储经验到回放缓冲区"""
        self.memory.add(state, action, reward, next_state, done)

    def learn(self):
        """从经验回放缓冲区中批量学习"""
        if len(self.memory) < self.batch_size:
            return

        # 从缓冲区中采样一批经验
        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # 转换为张量
        state_tensors = torch.stack([self.state_to_tensor(s) for s in states])
        action_tensors = torch.LongTensor(actions).unsqueeze(1)
        reward_tensors = torch.FloatTensor(rewards)
        next_state_tensors = torch.stack([self.state_to_tensor(s) for s in next_states])
        done_tensors = torch.FloatTensor(dones)

        # 计算当前Q值
        current_q_values = self.policy_net(state_tensors).gather(1, action_tensors)

        # 计算目标Q值
        with torch.no_grad():
            next_q_values = self.target_net(next_state_tensors).max(1)[0]
            target_q_values = reward_tensors + (1 - done_tensors) * self.discount_factor * next_q_values

        # 计算损失
        loss = F.smooth_l1_loss(current_q_values.squeeze(), target_q_values)

        # 优化模型
        self.optimizer.zero_grad()
        loss.backward()

        # 梯度检查 - 移到 backward() 之后
        total_norm = 0
        for p in self.policy_net.parameters():
            if p.grad is not None:  # 添加检查,确保梯度存在
                param_norm = p.grad.data.norm(2)
                total_norm += param_norm.item() ** 2
        total_norm = total_norm ** (1. / 2)

        # 如果梯度过大,跳过此次更新
        if total_norm > 10.0:
            print(f"警告:梯度范数过大 ({total_norm:.2f}),跳过此次更新")
            return

        # 梯度裁剪,防止梯度爆炸
        for param in self.policy_net.parameters():
            if param.grad is not None:  # 添加检查,确保梯度存在
                param.grad.data.clamp_(-1, 1)

        self.optimizer.step()

        # 更新目标网络
        self.update_counter += 1
        if self.update_counter % self.target_update == 0:
            self.target_net.load_state_dict(self.policy_net.state_dict())

    def decay_exploration(self):
        """衰减探索率"""
        self.exploration_rate = max(self.min_exploration_rate,
                                    self.exploration_rate * self.exploration_decay)

    def save_model(self, path):
        """保存模型"""
        torch.save(self.policy_net.state_dict(), path)

    def load_model(self, path):
        """加载模型"""
        self.policy_net.load_state_dict(torch.load(path))
        self.target_net.load_state_dict(self.policy_net.state_dict())


def train(agent, episodes=10000):
    """训练DQN智能体"""
    rewards_per_episode = []
    success_count = 0
    best_success_rate = 0
    best_env_config = None
    best_episode = 0

    # 为训练生成固定的环境布局
    agent.env.generate_random_positions()
    print(f"Training Environment - Obstacles: {agent.env.obstacles}, Exit: {agent.env.exit}")

    # 保存初始环境配置
    initial_env_config = {
        'obstacles': agent.env.obstacles.copy(),
        'exit': agent.env.exit
    }

    for episode in range(episodes):
        state = agent.env.reset()
        total_reward = 0
        done = False
        steps = 0

        while not done and steps < 1000:
            action = agent.choose_action(state)
            next_state, reward, done = agent.env.step(action)

            # 存储经验
            agent.store_experience(state, action, reward, next_state, done)

            # 学习
            agent.learn()

            state = next_state
            total_reward += reward
            steps += 1

        if done and state == agent.env.exit:
            success_count += 1

        agent.decay_exploration()
        rewards_per_episode.append(total_reward)

        if (episode + 1) % 100 == 0:
            current_success_rate = success_count / 100 if episode >= 99 else success_count / (episode + 1)
            print(f"Episode {episode + 1}/{episodes}, Average Reward: {np.mean(rewards_per_episode[-100:]):.2f}, "
                  f"Success Rate: {current_success_rate:.2f}, Exploration Rate: {agent.exploration_rate:.4f}")

            if current_success_rate > best_success_rate:
                best_success_rate = current_success_rate
                agent.best_model_state = copy.deepcopy(agent.policy_net.state_dict())
                best_env_config = initial_env_config
                best_episode = episode + 1
                print(f"Found Better Strategy! Success Rate: {best_success_rate:.2f}, Training Episode: {best_episode}")

            success_count = 0

    print(f"Training Complete! Best Success Rate: {best_success_rate:.2f}, Achieved at Episode {best_episode}")

    # 恢复最佳模型
    if agent.best_model_state:
        agent.policy_net.load_state_dict(agent.best_model_state)
        agent.target_net.load_state_dict(agent.best_model_state)

    # 恢复最佳环境配置
    agent.env.obstacles = best_env_config['obstacles']
    agent.env.exit = best_env_config['exit']
    agent.env.grid = np.zeros((agent.env.grid_size, agent.env.grid_size))
    for obs in agent.env.obstacles:
        agent.env.grid[obs] = -1
    agent.env.grid[agent.env.exit] = 1

    return agent.policy_net, rewards_per_episode, best_env_config, best_episode


def asd(env, dqn_model, env_config=None, best_episode=None):
    """测试DQN智能体"""
    if env_config:
        env.obstacles = env_config['obstacles']
        env.exit = env_config['exit']
        env.grid = np.zeros((env.grid_size, env.grid_size))
        for obs in env.obstacles:
            env.grid[obs] = -1
        env.grid[env.exit] = 1
        print(f"Using Best Strategy Environment - Obstacles: {env.obstacles}, Exit: {env.exit}")
    else:
        env.generate_random_positions()
        print(f"Test Environment - Obstacles: {env.obstacles}, Exit: {env.exit}")

    state = env.reset()
    done = False
    steps = 0
    total_reward = 0
    path = [state]
    game_result = "进行中"

    # 创建动画效果所需的图形对象
    fig, ax = plt.subplots(figsize=(8, 8))
    plt.ion()

    # 创建颜色映射
    cmap = plt.cm.colors.ListedColormap(['white', 'yellow', 'gray', 'red', 'lightblue'])
    bounds = [-0.5, 0.5, 1.5, 2.5, 3.5, 4.5]
    norm = plt.cm.colors.BoundaryNorm(bounds, cmap.N)

    # 初始化图像
    display_grid = env.grid.copy()
    display_grid[state] = 2

    for obs in env.obstacles:
        display_grid[obs] = 2

    img = ax.imshow(display_grid, cmap=cmap, norm=norm)

    # 添加网格线
    ax.grid(which='major', axis='both', linestyle='-', color='k', linewidth=2)
    ax.set_xticks(np.arange(-0.5, env.grid_size, 1))
    ax.set_yticks(np.arange(-0.5, env.grid_size, 1))

    # 隐藏刻度标签
    ax.set_xticklabels([])
    ax.set_yticklabels([])

    # 图例
    legend_elements = [
        plt.Rectangle((0, 0), 1, 1, color='white', label='Empty'),
        plt.Rectangle((0, 0), 1, 1, color='yellow', label='Exit'),
        plt.Rectangle((0, 0), 1, 1, color='gray', label='Obstacle'),
        plt.Rectangle((0, 0), 1, 1, color='red', label='Current'),
        plt.Rectangle((0, 0), 1, 1, color='lightblue', label='Path')
    ]
    ax.legend(handles=legend_elements, loc='upper center', bbox_to_anchor=(0.5, -0.05), ncol=5)

    plt.title(f"DQN Agent (Best Strategy from Episode {best_episode})")
    plt.tight_layout()
    plt.draw()
    plt.pause(0.5)

    while not done and steps < 100:
        # 将状态转换为张量 - 这里需要修改
        state_tensor = agent.state_to_tensor(state)  # 使用增强的状态表示

        # 使用DQN模型选择动作
        with torch.no_grad():
            q_values = dqn_model(state_tensor)
            action = torch.argmax(q_values).item()

        print(f"Position: {state}, Action: {env.action_names[action]}")
        next_state, reward, done = env.step(action)

        state = next_state
        path.append(state)
        total_reward += reward
        steps += 1

        # 更新动画
        display_grid = env.grid.copy()

        for obs in env.obstacles:
            display_grid[obs] = 2

        for pos in path[:-1]:
            if pos != env.exit and pos not in env.obstacles:
                display_grid[pos] = 3

        display_grid[state] = 2

        img.set_data(display_grid)
        plt.draw()
        plt.pause(0.3)

        # 判断游戏结果
        if done:
            if state == env.exit:
                game_result = "Success"
            else:
                game_result = "Failure"

    # 显示最终结果
    if game_result == "Success":
        plt.title(
            f"Successfully Reached Exit! Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")
    elif game_result == "Failure":
        if state in env.obstacles:
            plt.title(
                f"Hit Obstacle, Game Over. Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")
        else:
            plt.title(
                f"Hit Wall, Game Over. Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")
    else:
        plt.title(
            f"Failed to Reach Exit Within Step Limit. Steps: {steps}, Total Reward: {total_reward:.2f}\n(Best Strategy from Episode {best_episode})")

    plt.ioff()
    plt.show()

    return path, game_result


if __name__ == "__main__":
    # 创建环境和DQN智能体
    env = GridEnvironment()
    agent = DQNAgent(env)

    # 训练智能体
    print("开始训练...")
    policy_net, rewards, best_env_config, best_episode = train(agent, episodes=10000)
    print("训练完成!")

    # 测试智能体
    print("开始测试...")
    path, result = asd(env, policy_net, best_env_config, best_episode)

运行结果

Episode 10000/10000, Average Reward: 191.69, Success Rate: 0.82, Exploration Rate: 0.0500
Training Complete! Best Success Rate: 0.92, Achieved at Episode 7700
训练完成!
开始测试...
Using Best Strategy Environment - Obstacles: [(4, 8), (7, 1)], Exit: (9, 3)
Position: (0, 0), Action: Right
Position: (0, 1), Action: Right
Position: (0, 2), Action: Right
Position: (0, 3), Action: Down
Position: (1, 3), Action: Down
Position: (2, 3), Action: Down
Position: (3, 3), Action: Down
Position: (4, 3), Action: Down
Position: (5, 3), Action: Down
Position: (6, 3), Action: Down
Position: (7, 3), Action: Down
Position: (8, 3), Action: Down

整体流程
这段代码实现了一个深度Q网络(DQN)强化学习算法,用于解决10x10网格环境中的路径规划问题。智能体需要从起点出发,避开障碍物,找到出口。
整体流程分为以下几个部分:
环境构建
深度Q网络模型定义
经验回放缓冲区实现
智能体行为定义
智能体训练
测试与可视化
主要组件详解
1. 环境设计 GridEnvironment
初始化:10x10网格,起点固定在(0,0)
随机元素:随机生成2个障碍物和1个出口,且距离起点一定距离
状态转换:通过step方法实现行动后的状态转换
奖励设计:
撞到障碍物或边界:-100
达到出口:+200
每一步:-0.1
靠近出口:+3
远离出口:-1.5
2. 深度Q网络 DQNetwork
网络结构:4层全连接网络(128→128→64→输出层)
输入特征:8维向量(当前位置坐标2维+相对出口位置2维+四周障碍信息4维)
输出:4个动作的Q值(上、右、下、左)
权重初始化:使用Xavier均匀初始化,提高收敛性能
3. 经验回放缓冲区 ReplayBuffer
使用双端队列实现固定大小的经验存储
支持随机采样,打破经验之间的相关性
4. DQN智能体 DQNAgent
双网络结构:策略网络和目标网络,提高训练稳定性
增强状态表示:将原始坐标转换为包含环境信息的特征向量
探索策略:ε-greedy策略,随时间衰减探索率
学习过程:从经验回放中批量学习,更新Q值
梯度处理:包含梯度检查和裁剪机制,防止训练不稳定
5. 训练过程 train
训练10000个回合
每100回合评估一次成功率,保存最佳模型
跟踪并记录奖励和成功率,用于评估学习进展
6. 测试与可视化 asd
使用训练好的模型在环境中测试
动态可视化智能体移动路径和决策过程
重要改进与注意点
增强状态表示:
不仅包含坐标信息,还包含出口位置和障碍物信息
使智能体能够"看见"环境,而不仅仅是知道自己的位置
网络结构优化:
添加额外隐藏层(64个节点)
使用权重初始化技术提高收敛性能
奖励设计调整:
增加朝出口方向移动的正奖励
增加远离出口方向的负惩罚
稳定性措施:
梯度检查和裁剪机制
在backward()之后进行梯度处理
加入NULL检查,避免访问不存在的梯度
固定环境布局:
训练时使用固定布局,帮助智能体学习特定环境
保存最佳环境配置,用于测试时复现
测试时状态表示一致性:
确保测试时使用与训练时相同的状态表示方法
修复了原代码中测试函数使用错误状态表示的问题
可能的进一步改进方向
使用CNN:对于网格环境,可考虑使用卷积神经网络处理空间信息
增加内在激励:加入好奇心机制,鼓励探索未访问区域
多步学习:实现n步DQN或TD(λ)方法,加速价值传播
优先经验回放:根据TD误差大小优先采样重要经验
多任务学习:在不同环境布局下训练,提高泛化能力
通过这些设计和改进,这个DQN算法应该能够成功学习到避开障碍物并找到出口的策略。
  • ε-greedy探索策略的动态平衡 
  1. 探索率衰减机制:DQNAgent类通过exploration_decay参数(默认0.998)实现指数衰减,使探索率从初始值1.0逐渐降至最小值0.05。这种设计在训练初期鼓励更多随机探索,后期转向利用已有知识:
    1. self.exploration_rate = max(self.min_exploration_rate,
                                  self.exploration_rate * self.exploration_decay)
    2. 这种动态衰减平衡了早期探索未知状态和后期利用最优策略的需求        
  2. 随机动作选择逻辑,在choose_action方法中,当随机数小于探索率时,智能体会在动作空间中随机选择动作:
    1. if random.uniform(0, 1) < self.exploration_rate:
          return random.randint(0, self.action_size - 1)  # 完全随机探索
    2. 这种方式确保了即使在训练后期仍保留5%的随机探索概率,避免策略完全固化
  • 多维度奖励函数设计
  1. 基础奖励结构
    1. 稀疏奖励:到达出口获得+200,触碰障碍/边界获得-100
    2. ​密集奖励:每步固定惩罚-0.1,避免无效徘徊
    3. self.obstacle_reward = -100  # 障碍惩罚
      self.exit_reward = 200       # 出口奖励 
      self.step_reward = -0.1      # 步长惩罚
  2. 距离引导奖励
    1. 通过计算曼哈顿距离变化,给予方向性引导:
    2. if new_distance < prev_distance:
          distance_reward = 3     # 靠近出口奖励
      elif new_distance > prev_distance:
          distance_reward = -1.5  # 远离出口惩罚
    3. 这种设计属于奖励塑形技术,通过附加奖励加速学习
  3. 复合奖励计算
    1. 最终奖励是基础步长惩罚与距离奖励的叠加:
    2. return self.step_reward + distance_reward  # 综合奖励=-0.1±3/-1.5
    3. 这种设计实现了"密集奖励+方向引导"的混合策略
  •  智能体在探索后的数据保存到经验回放缓冲区的具体机制
  1. 存储的数据内容,每次智能体与环境交互后,会保存以下5个关键数据元素到经验回放缓冲区
    1. **state**:当前状态,表示智能体的位置(以二维坐标元组形式存储,如 (row, col)
    2. ​**action**:执行的动作(整数,0-3对应上下左右)
    3. ​**reward**:执行动作后获得的即时奖励(浮点数,如-100表示撞障碍,+200表示到达出口)
    4. ​**next_state**:下一时刻的状态(同样为二维坐标元组)
    5. ​**done**:终止标志(布尔值,True表示回合结束)
  2.  ​存储方式与流程
    1. 触发时机:每次执行动作后,在 DQNAgent  store_experience 方法中调用 memory.add()
    2. 缓冲区结构:使用双端队列 deque 实现(容量上限为10,000),通过 ReplayBuffer 类管理
    3. class ReplayBuffer:
          def add(self, state, action, reward, next_state, done):
              self.buffer.append((state, action, reward, next_state, done))
    4. 其中 buffer 存储的是元组 (state, action, reward, next_state, done) 的集合
  3. 数据格式特点
    1. ​原始状态保存:存储的是未经处理的原始坐标(如 (3,5)),而非神经网络输入的8维张量(张量转换在 state_to_tensor 方法中动态完成)
    2. 批量采样机制:训练时通过 sample(batch_size) 随机抽取一批经验,返回格式为 (states, actions, rewards, next_states, dones) 的元组列表
    3. ​终止状态处理:done=True 时表示后续无 next_state,此时目标Q值仅包含当前奖励
    4. 原始存储: state=(2,3), next_state=(2,4)
      转换为张量: [2, 3, 6, 2, 0, 1, 0, 0]  # 含位置(2,3)、相对出口(6,2)、四方向障碍检测(右方向存在障碍)
  • DQN网络特征向量的特点

DQN网络接收的输入特征向量是一个8维向量,具体组成如下:

  1. 当前位置坐标 (2维)
    1. row: 当前格子的行坐标 (0-9)
    2. col: 当前格子的列坐标 (0-9)
  2. 相对出口位置 (2维)
    1. rel_exit_row: 出口行坐标减去当前行坐标 (即 exit_row - row)
    2. rel_exit_col: 出口列坐标减去当前列坐标 (即 exit_col - col)
    3. 这两个值表示从当前位置到出口的相对方向和距离
  3. 周围障碍物信息 (4维)
    1. 上方是否有障碍物或边界 (1.0表示有,0.0表示没有)
    2. 右方是否有障碍物或边界 (1.0表示有,0.0表示没有)
    3. 下方是否有障碍物或边界 (1.0表示有,0.0表示没有)
    4. 左方是否有障碍物或边界 (1.0表示有,0.0表示没有)

特征设计的重要性

这种增强的状态表示方式相比仅使用坐标(row, col)的简单表示有几个关键优势

  1. 环境感知:智能体能够"看到"出口在哪里以及周围有哪些障碍物
  2. 方向性信:通过相对出口位置,智能体可以知道应该向哪个方向移动才能接近出口
  3. 碍物避让:通过周围障碍物信息,智能体可以学习避开障碍物的策略
  4. 空间感知:结合当前位置和相对位置,智能体能够在网格中建立起自己的空间感知

这种丰富的状态表示是这个DQN算法能够成功学习最优策略的关键因素之一,使得智能体能够在复杂环境中找到从起点到出口的最优路径

def state_to_tensor(self, state):
    """将状态转换为增强的张量表示"""
    # 获取当前位置
    row, col = state
    
    # 计算到出口的相对位置
    exit_row, exit_col = self.env.exit
    rel_exit_row = exit_row - row
    rel_exit_col = exit_col - col
    
    # 周围是否有障碍物的信息
    obstacles_info = []
    for d_row, d_col in self.env.actions:
        new_row, new_col = row + d_row, col + d_col
        # 检查是否是障碍物或边界
        is_obstacle = 1.0 if (new_row < 0 or new_row >= self.env.grid_size or 
                             new_col < 0 or new_col >= self.env.grid_size or 
                             (new_row, new_col) in self.env.obstacles) else 0.0
        obstacles_info.append(is_obstacle)
    
    # 合并所有信息
    return torch.FloatTensor([row, col, rel_exit_row, rel_exit_col] + obstacles_info)
  • DQN网络的输出是什么?

DQN网络接收8维特征向量作为输入,经过神经网络处理后,输出的是一个4维向量,每个维度对应一个动作(上、右、下、左)的Q值

q_values = [Q值(上), Q值(右), Q值(下), Q值(左)]

其中,Q值(Q-value)代表在当前状态下,选择该动作后能够获得的预期累积奖励。这是强化学习中的核心概念,表示从当前状态出发,采取特定动作,然后遵循最优策略能够获得的长期回报。

def forward(self, x):
    x = F.relu(self.fc1(x))
    x = F.relu(self.fc2(x))
    x = F.relu(self.fc3(x))
    return self.fc4(x)  # 返回4个动作的Q值
  • 输出对测试结果的意义

在测试阶段,DQN网络的输出被用来决定智能体的行动方式,这种方式对测试结果有以下几个关键意义:

  1. 策依据
    1. Q值越高表示该动作预期带来的长期收益越大
    2. 选择Q值最高的动作意味着选择"最优"的行动方向
    3. 这种贪婪策略(选择最大Q值)反映了DQN学到的策略质量
  2. 路径规划能力评估
    1. 如果DQN模型学习得好,它应该能为每个状态都选出合理的动作
    2. 这些动作串联起来应该形成一条从起点到出口的可行路径
    3. 试过程可以直观展示模型学习到的路径规划能力
  3. 略质量验证
    1. 功找到出口表明模型学习到了有效策
    2. 撞到障碍物或边界表明模型策略存在缺陷
    3. 在固定环境中的表现可以量化模型的策略质量(如步数、总奖励等)
  4. 环境理解程度
    1. Q值分布反映了模型对环境的理解程度
    2. 对危险区域(如障碍物附近)的正确评估表明模型理解了环境结构
    3. 沿着高Q值路径行走应该能够安全到达目标
  5. 际测试中的意义,在实际测试可视化中,我们可以观察到
    1. 路径规划:智能体是否能找到一条通往出口的有效路径
    2. 障碍物避让:智能体是否能成功避开障碍物和边界
    3. :智能体从起点到出口用了多少步(越少越好
    4. 一致性:多次测试中是否能稳定表现出高质量的决策能力

如果测试结果显示智能体能够稳定地找到最短或接近最短的路径到达出口,同时成功避开所有障碍物,这就表明DQN网络已经成功学习到了环境中的最优策略,其输出的Q值能够准确地引导智能体做出正确的决策

# 使用DQN模型选择动作
with torch.no_grad():
    q_values = dqn_model(state_tensor)
    action = torch.argmax(q_values).item()  # 选择Q值最高的动作

蒙特卡洛树算法

1987年Bruce Abramson在他的博士论文中提出了基于蒙特卡洛方法的树搜索这一想法。这种算法简而言之是用蒙特卡洛方法估算每一种走法的胜率。如果描述的再具体一些,通过不断的模拟每一种走法,直至终局,该走法的模拟总次数N,与胜局次数W,即可推算出该走法的胜率为\(W\over N\)。通过随机的对游戏进行推演来逐渐建立一颗不对称的搜索树的过程。

蒙特卡洛算法本身并非强化学习,具体可以参考算法可视化 中的蒙特卡洛方法,蒙特卡洛树强化学习算法通常指结合了蒙特卡洛树搜索(Monte Carlo Tree Search, MCTS)​与强化学习(Reinforcement Learning, RL)​的混合算法,典型代表如DeepMind的AlphaGo、AlphaZero等。这类算法通过将MCTS的搜索能力与强化学习的策略优化相结合,能够在复杂决策问题中实现高效探索和策略迭代。以下从核心概念、算法流程、关键技术和应用场景展开详细说明。

一、核心概念

  1. ​蒙特卡洛树搜索(MCTS)​:

    • 一种基于随机模拟的树搜索算法,通过反复模拟可能的状态转移路径,逐步构建一棵非对称的搜索树,最终选择最优动作。
    • ​核心步骤:选择(Selection)、扩展(Expansion)、模拟(Simulation)、回溯(Backpropagation)。
  2. ​强化学习(RL)

    • 通过智能体与环境的交互学习最优策略,目标是最大化累积奖励。RL的关键要素包括状态(State)、动作(Action)、奖励(Reward)和策略(Policy)。
  3. ​MCTS与RL的结合:

    • ​MCTS为RL提供搜索能力:通过模拟探索状态空间,减少对经验数据的依赖。
    • ​RL优化MCTS的策略:用强化学习的策略网络和价值网络引导搜索方向,提升效率。
import matplotlib
matplotlib.use('Agg')  # 使用非交互式后端
import matplotlib.pyplot as plt
import numpy as np
import random
import copy
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque, defaultdict
import torch.nn.functional as F
import math
import time


class GridEnvironment:
    def __init__(self):
        # 初始化10x10的棋盘
        self.grid_size = 10
        self.grid = np.zeros((self.grid_size, self.grid_size))

        # 初始位置
        self.start_position = (0, 0)
        self.current_position = self.start_position

        # 随机生成障碍物和出口位置
        self.generate_random_positions()

        # Action space: Up(0), Right(1), Down(2), Left(3)
        self.actions = [(-1, 0), (0, 1), (1, 0), (0, -1)]
        self.action_names = ["Up", "Right", "Down", "Left"]

        # 修改奖励设置,增强正向激励
        self.obstacle_reward = -100
        self.exit_reward = 200  # 增加出口奖励
        self.step_reward = -0.1  # 进一步减小每步惩罚

    def generate_random_positions(self):
        # 所有可能的位置(除了起始位置)
        all_positions = [(i, j) for i in range(self.grid_size) for j in range(self.grid_size) if
                         (i, j) != self.start_position]

        # 筛选出离起点有一定距离的位置(使用曼哈顿距离)
        min_distance = 5  # 设置最小距离阈值
        distant_positions = [pos for pos in all_positions
                             if self.calculate_distance(self.start_position, pos) >= min_distance]

        # 如果没有足够的远距离位置,则使用所有可用位置
        if len(distant_positions) < 8:  # 需要7个障碍物和1个出口
            distant_positions = all_positions

        # 随机选择7个位置作为障碍物(减少到7个)
        self.obstacles = random.sample(distant_positions, 7)

        # 从剩余位置中选择一个作为出口,优先选择远离起点的位置
        remaining_positions = [pos for pos in all_positions if pos not in self.obstacles]
        # 按照与起点的距离对剩余位置进行排序(从远到近)
        remaining_positions.sort(key=lambda pos: self.calculate_distance(self.start_position, pos), reverse=True)
        # 从前半部分选择出口位置(确保出口也相对远离起点)
        exit_candidates = remaining_positions[:len(remaining_positions) // 2]
        self.exit = random.choice(exit_candidates)

        # 更新网格
        self.grid = np.zeros((self.grid_size, self.grid_size))
        for obs in self.obstacles:
            self.grid[obs] = -1
        self.grid[self.exit] = 1

    def reset(self):
        # 重置智能体位置到起点
        self.current_position = self.start_position
        return self.current_position

    def calculate_distance(self, pos1, pos2):
        return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])  # 曼哈顿距离

    def step(self, action):
        # 保存移动前的位置和到出口的距离
        prev_position = self.current_position
        prev_distance = self.calculate_distance(prev_position, self.exit)

        # 执行原有的移动逻辑
        row, col = self.current_position
        d_row, d_col = self.actions[action]

        # 计算新位置
        new_row = row + d_row
        new_col = col + d_col

        # 检查是否超出边界
        if new_row < 0 or new_row >= self.grid_size or new_col < 0 or new_col >= self.grid_size:
            # 如果超出边界,保持原位置不变,给予惩罚,并结束游戏
            return self.current_position, self.obstacle_reward, True  # 撞到围栏也算失败

        # 更新位置
        self.current_position = (new_row, new_col)

        # 检查是否撞到障碍物
        if self.current_position in self.obstacles:
            return self.current_position, self.obstacle_reward, True

        # 检查是否到达出口
        if self.current_position == self.exit:
            return self.current_position, self.exit_reward, True

        # 计算新的距离
        new_distance = self.calculate_distance(self.current_position, self.exit)

        # 基于距离变化的奖励,增强正向激励
        distance_reward = 0
        if new_distance < prev_distance:
            distance_reward = 3  # 增加朝出口方向移动的奖励
        elif new_distance > prev_distance:
            distance_reward = -1.5  # 增加远离出口方向的惩罚

        # 返回新状态、奖励和是否结束
        return self.current_position, self.step_reward + distance_reward, False

    def render(self, path=None):
        # 可视化棋盘
        fig, ax = plt.subplots(figsize=(8, 8))

        # 创建颜色映射
        cmap = plt.cm.colors.ListedColormap(['white', 'yellow', 'black', 'red'])
        bounds = [-0.5, 0.5, 1.5, 2.5, 3.5]
        norm = plt.cm.colors.BoundaryNorm(bounds, cmap.N)

        # 复制棋盘用于显示
        display_grid = self.grid.copy()

        # 标记当前位置为红色
        if path:
            for pos in path:
                if pos != self.exit:  # 不覆盖出口
                    display_grid[pos] = 2
        else:
            display_grid[self.current_position] = 2

        # 显示棋盘
        ax.imshow(display_grid, cmap=cmap, norm=norm)

        # 添加网格线
        ax.grid(which='major', axis='both', linestyle='-', color='k', linewidth=2)
        ax.set_xticks(np.arange(-0.5, self.grid_size, 1))
        ax.set_yticks(np.arange(-0.5, self.grid_size, 1))

        # 隐藏刻度标签
        ax.set_xticklabels([])
        ax.set_yticklabels([])

        plt.title("红色:当前位置,黑色:障碍物,黄色:出口")
        plt.tight_layout()
        plt.show()


# MCTS节点类
class MCTSNode:
    def __init__(self, state, parent=None, action=None):
        self.state = state  # 状态
        self.parent = parent  # 父节点
        self.action = action  # 从父节点到达此节点的动作
        self.children = {}  # 子节点 {action: node}
        self.visits = 0  # 访问次数
        self.value = 0.0  # 节点价值
        self.untried_actions = None  # 未尝试的动作

    def is_fully_expanded(self):
        """检查是否所有可能的动作都已经被尝试"""
        return self.untried_actions is not None and len(self.untried_actions) == 0

    def best_child(self, exploration_weight=1.0):
        """选择最佳子节点,使用UCB1公式"""
        if not self.children:
            return None
            
        # 使用UCB1公式选择最佳子节点
        def ucb_score(n):
            exploitation = n.value / n.visits if n.visits > 0 else 0
            exploration = exploration_weight * math.sqrt(2 * math.log(self.visits) / n.visits) if n.visits > 0 else float('inf')
            return exploitation + exploration
            
        return max(self.children.values(), key=ucb_score)

    def expand(self, action, next_state):
        """扩展一个新的子节点"""
        child = MCTSNode(next_state, parent=self, action=action)
        self.children[action] = child
        if self.untried_actions is not None:
            self.untried_actions.remove(action)
        return child

    def update(self, result):
        """更新节点的统计信息"""
        self.visits += 1
        self.value += result


# MCTS智能体
class MCTSAgent:
    def __init__(self, env, simulation_count=100, max_depth=50, exploration_weight=1.0):
        self.env = env
        self.simulation_count = simulation_count  # 每次决策的模拟次数
        self.max_depth = max_depth  # 最大模拟深度
        self.exploration_weight = exploration_weight  # UCB1公式中的探索权重
        self.value_table = defaultdict(float)  # 状态-价值表
        self.visit_count = defaultdict(int)  # 状态访问计数
        
        # 用于存储训练过程中的最佳模型
        self.best_model_state = None
        self.best_success_rate = 0

    def get_action(self, state):
        """使用MCTS选择最佳动作"""
        root = MCTSNode(state)
        root.untried_actions = list(range(len(self.env.actions)))
        
        # 执行模拟
        for _ in range(self.simulation_count):
            # 选择阶段
            node = self._select(root)
            
            # 扩展阶段
            if not node.is_fully_expanded() and node.visits > 0:
                node = self._expand(node)
            
            # 模拟阶段
            reward = self._simulate(node.state)
            
            # 回溯阶段
            self._backpropagate(node, reward)
        
        # 选择访问次数最多的子节点对应的动作
        if not root.children:
            return random.randint(0, len(self.env.actions) - 1)
            
        return max(root.children.items(), key=lambda x: x[1].visits)[0]

    def _select(self, node):
        """选择阶段:从根节点选择到叶节点"""
        while not self._is_terminal(node.state) and node.is_fully_expanded():
            child = node.best_child(self.exploration_weight)
            if child is None:
                break
            node = child
        return node

    def _expand(self, node):
        """扩展阶段:为当前节点添加一个新的子节点"""
        if node.untried_actions is None:
            node.untried_actions = list(range(len(self.env.actions)))
            
        action = random.choice(node.untried_actions)
        
        # 复制环境状态
        env_copy = copy.deepcopy(self.env)
        env_copy.current_position = node.state
        
        # 执行动作
        next_state, _, done = env_copy.step(action)
        
        # 如果游戏结束,不扩展
        if done:
            return node
            
        # 扩展新节点
        return node.expand(action, next_state)

    def _simulate(self, state):
        """改进的模拟阶段:使用简单启发式策略而不是纯随机"""
        env_copy = copy.deepcopy(self.env)
        env_copy.current_position = state
        
        current_state = state
        total_reward = 0
        depth = 0
        done = False
        
        # 使用启发式策略模拟直到游戏结束或达到最大深度
        while not done and depth < self.max_depth:
            # 计算到出口的方向
            row, col = current_state
            exit_row, exit_col = env_copy.exit
            
            # 计算每个动作的启发式得分
            action_scores = []
            for i, (d_row, d_col) in enumerate(env_copy.actions):
                new_row, new_col = row + d_row, col + d_col
                
                # 检查是否超出边界或撞到障碍物
                if (new_row < 0 or new_row >= env_copy.grid_size or 
                    new_col < 0 or new_col >= env_copy.grid_size or
                    (new_row, new_col) in env_copy.obstacles):
                    action_scores.append(-1000)  # 非常低的分数
                    continue
                
                # 计算新位置到出口的距离
                new_dist = abs(new_row - exit_row) + abs(new_col - exit_col)
                curr_dist = abs(row - exit_row) + abs(col - exit_col)
                
                # 距离减少得高分,增加得低分
                if new_dist < curr_dist:
                    action_scores.append(10)
                elif new_dist > curr_dist:
                    action_scores.append(1)
                else:
                    action_scores.append(5)
            
            # 如果所有动作都不可行,随机选择
            if all(score == -1000 for score in action_scores):
                action = random.randint(0, len(env_copy.actions) - 1)
            else:
                # 按概率选择动作(得分高的概率大)
                valid_actions = [i for i, score in enumerate(action_scores) if score > -1000]
                valid_scores = [action_scores[i] for i in valid_actions]
                
                # 转换为概率
                total = sum(valid_scores)
                if total == 0:
                    probs = [1.0 / len(valid_actions)] * len(valid_actions)
                else:
                    probs = [score / total for score in valid_scores]
                
                action = random.choices(valid_actions, weights=probs, k=1)[0]
            
            next_state, reward, done = env_copy.step(action)
            current_state = next_state
            total_reward += reward
            depth += 1
            
            # 如果到达出口,给予额外奖励
            if done and current_state == self.env.exit:
                total_reward += 100
        
        return total_reward

    def _backpropagate(self, node, reward):
        """回溯阶段:更新节点的统计信息"""
        while node is not None:
            node.update(reward)
            # 更新状态-价值表
            state_key = str(node.state)
            self.value_table[state_key] = ((self.value_table[state_key] * self.visit_count[state_key]) + reward) / (self.visit_count[state_key] + 1)
            self.visit_count[state_key] += 1
            node = node.parent

    def _is_terminal(self, state):
        """检查状态是否为终止状态"""
        return state == self.env.exit or state in self.env.obstacles

    def save_model(self, path):
        """保存模型(状态-价值表)"""
        model_data = {
            'value_table': dict(self.value_table),
            'visit_count': dict(self.visit_count)
        }
        torch.save(model_data, path)

    def load_model(self, path):
        """加载模型(状态-价值表)"""
        model_data = torch.load(path)
        self.value_table = defaultdict(float, model_data['value_table'])
        self.visit_count = defaultdict(int, model_data['visit_count'])


def train_mcts(agent, episodes=1000):
    """训练MCTS智能体"""
    rewards_per_episode = []
    success_count = 0
    best_success_rate = 0
    best_episode = 0

    # 为训练生成固定的环境布局
    agent.env.generate_random_positions()
    print(f"训练环境 - 障碍物: {agent.env.obstacles}, 出口: {agent.env.exit}")

    # 保存初始环境配置
    best_env_config = {
        'obstacles': agent.env.obstacles.copy(),
        'exit': agent.env.exit
    }

    for episode in range(episodes):
        state = agent.env.reset()
        total_reward = 0
        done = False
        steps = 0

        while not done and steps < 100:
            action = agent.get_action(state)
            next_state, reward, done = agent.env.step(action)

            state = next_state
            total_reward += reward
            steps += 1

        if done and state == agent.env.exit:
            success_count += 1

        rewards_per_episode.append(total_reward)

        if (episode + 1) % 10 == 0:
            current_success_rate = success_count / 10 if episode >= 9 else success_count / (episode + 1)
            print(f"Episode {episode + 1}/{episodes}, 平均奖励: {np.mean(rewards_per_episode[-10:]):.2f}, "
                  f"成功率: {current_success_rate:.2f}")

            if current_success_rate > best_success_rate:
                best_success_rate = current_success_rate
                agent.best_model_state = {
                    'value_table': dict(agent.value_table),
                    'visit_count': dict(agent.visit_count)
                }
                best_env_config = {
                    'obstacles': agent.env.obstacles.copy(),
                    'exit': agent.env.exit
                }
                best_episode = episode + 1
                print(f"找到更好的策略! 成功率: {best_success_rate:.2f}, 训练轮次: {best_episode}")

            success_count = 0

    print(f"训练完成! 最佳成功率: {best_success_rate:.2f}, 在第 {best_episode} 轮达成")

    # 恢复最佳模型
    if agent.best_model_state:
        agent.value_table = defaultdict(float, agent.best_model_state['value_table'])
        agent.visit_count = defaultdict(int, agent.best_model_state['visit_count'])

    # 恢复最佳环境配置
    agent.env.obstacles = best_env_config['obstacles']
    agent.env.exit = best_env_config['exit']
    agent.env.grid = np.zeros((agent.env.grid_size, agent.env.grid_size))
    for obs in agent.env.obstacles:
        agent.env.grid[obs] = -1
    agent.env.grid[agent.env.exit] = 1

    return rewards_per_episode, best_env_config, best_episode


def test_mcts(env, agent, env_config=None, best_episode=None, headless=False):
    """测试MCTS智能体,支持无图形界面模式"""
    if env_config:
        env.obstacles = env_config['obstacles']
        env.exit = env_config['exit']
        env.grid = np.zeros((env.grid_size, env.grid_size))
        for obs in env.obstacles:
            env.grid[obs] = -1
        env.grid[env.exit] = 1
        print(f"使用最佳策略环境 - 障碍物: {env.obstacles}, 出口: {env.exit}")
    else:
        env.generate_random_positions()
        print(f"测试环境 - 障碍物: {env.obstacles}, 出口: {env.exit}")

    state = env.reset()
    done = False
    steps = 0
    total_reward = 0
    path = [state]
    game_result = "进行中"

    # 如果是无图形界面模式,跳过可视化部分
    if not headless:
        try:
            # 创建动画效果所需的图形对象
            fig, ax = plt.subplots(figsize=(8, 8))
            plt.ion()

            # 创建颜色映射
            cmap = plt.cm.colors.ListedColormap(['white', 'yellow', 'gray', 'red', 'lightblue'])
            bounds = [-0.5, 0.5, 1.5, 2.5, 3.5, 4.5]
            norm = plt.cm.colors.BoundaryNorm(bounds, cmap.N)

            # 初始化图像
            display_grid = env.grid.copy()
            display_grid[state] = 2

            for obs in env.obstacles:
                display_grid[obs] = 2

            img = ax.imshow(display_grid, cmap=cmap, norm=norm)

            # 添加网格线
            ax.grid(which='major', axis='both', linestyle='-', color='k', linewidth=2)
            ax.set_xticks(np.arange(-0.5, env.grid_size, 1))
            ax.set_yticks(np.arange(-0.5, env.grid_size, 1))

            # 隐藏刻度标签
            ax.set_xticklabels([])
            ax.set_yticklabels([])

            # 图例
            legend_elements = [
                plt.Rectangle((0, 0), 1, 1, color='white', label='空白'),
                plt.Rectangle((0, 0), 1, 1, color='yellow', label='出口'),
                plt.Rectangle((0, 0), 1, 1, color='gray', label='障碍物'),
                plt.Rectangle((0, 0), 1, 1, color='red', label='当前位置'),
                plt.Rectangle((0, 0), 1, 1, color='lightblue', label='路径')
            ]
            ax.legend(handles=legend_elements, loc='upper center', bbox_to_anchor=(0.5, -0.05), ncol=5)

            plt.title(f"MCTS智能体 (来自第 {best_episode} 轮的最佳策略)")
            plt.tight_layout()
            plt.draw()
            plt.pause(0.5)
        except Exception as e:
            print(f"可视化初始化失败,切换到无图形界面模式: {e}")
            headless = True

    while not done and steps < 100:
        # 使用MCTS选择动作
        action = agent.get_action(state)

        print(f"位置: {state}, 动作: {env.action_names[action]}")
        next_state, reward, done = env.step(action)

        state = next_state
        path.append(state)
        total_reward += reward
        steps += 1

        # 更新动画(如果不是无图形界面模式)
        if not headless:
            try:
                display_grid = env.grid.copy()

                for obs in env.obstacles:
                    display_grid[obs] = 2

                for pos in path[:-1]:
                    if pos != env.exit and pos not in env.obstacles:
                        display_grid[pos] = 3

                display_grid[state] = 2

                img.set_data(display_grid)
                plt.draw()
                plt.pause(0.3)
            except Exception as e:
                print(f"可视化更新失败: {e}")
                headless = True

        # 判断游戏结果
        if done:
            if state == env.exit:
                game_result = "成功"
            else:
                game_result = "失败"

    # 显示最终结果(如果不是无图形界面模式)
    if not headless:
        try:
            if game_result == "成功":
                plt.title(
                    f"成功到达出口! 步数: {steps}, 总奖励: {total_reward:.2f}\n(来自第 {best_episode} 轮的最佳策略)")
            elif game_result == "失败":
                if state in env.obstacles:
                    plt.title(
                        f"撞到障碍物,游戏结束。步数: {steps}, 总奖励: {total_reward:.2f}\n(来自第 {best_episode} 轮的最佳策略)")
                else:
                    plt.title(
                        f"撞到墙壁,游戏结束。步数: {steps}, 总奖励: {total_reward:.2f}\n(来自第 {best_episode} 轮的最佳策略)")
            else:
                plt.title(
                    f"未能在步数限制内到达出口。步数: {steps}, 总奖励: {total_reward:.2f}\n(来自第 {best_episode} 轮的最佳策略)")

            plt.ioff()
            plt.savefig(f"mcts_result_episode_{best_episode}.png")  # 保存图像而不是显示
            plt.close()
            print(f"结果图像已保存为 mcts_result_episode_{best_episode}.png")
        except Exception as e:
            print(f"保存结果图像失败: {e}")

    # 打印文本结果
    print(f"测试结果: {game_result}")
    print(f"总步数: {steps}")
    print(f"总奖励: {total_reward:.2f}")
    print(f"路径: {path}")

    return path, game_result


if __name__ == "__main__":
    # 创建环境和MCTS智能体
    env = GridEnvironment()
    
    # 修改环境的generate_random_positions方法来减少障碍物数量
    original_generate = env.generate_random_positions
    
    def generate_fewer_obstacles():
        # 所有可能的位置(除了起始位置)
        all_positions = [(i, j) for i in range(env.grid_size) for j in range(env.grid_size) if
                         (i, j) != env.start_position]

        # 筛选出离起点有一定距离的位置(使用曼哈顿距离)
        min_distance = 5  # 设置最小距离阈值
        distant_positions = [pos for pos in all_positions
                             if env.calculate_distance(env.start_position, pos) >= min_distance]

        # 如果没有足够的远距离位置,则使用所有可用位置
        if len(distant_positions) < 8:  # 需要7个障碍物和1个出口
            distant_positions = all_positions

        # 随机选择7个位置作为障碍物(减少到7个)
        env.obstacles = random.sample(distant_positions, 7)

        # 从剩余位置中选择一个作为出口,优先选择远离起点的位置
        remaining_positions = [pos for pos in all_positions if pos not in env.obstacles]
        # 按照与起点的距离对剩余位置进行排序(从远到近)
        remaining_positions.sort(key=lambda pos: env.calculate_distance(env.start_position, pos), reverse=True)
        # 从前半部分选择出口位置(确保出口也相对远离起点)
        exit_candidates = remaining_positions[:len(remaining_positions) // 2]
        env.exit = random.choice(exit_candidates)

        # 更新网格
        env.grid = np.zeros((env.grid_size, env.grid_size))
        for obs in env.obstacles:
            env.grid[obs] = -1
        env.grid[env.exit] = 1
    
    # 替换原始方法
    env.generate_random_positions = generate_fewer_obstacles
    
    # 重新生成环境
    env.generate_random_positions()
    
    # 创建智能体,增加模拟次数
    agent = MCTSAgent(env, simulation_count=1000, max_depth=100)  # 进一步增加模拟次数
    
    # 训练智能体
    print("开始训练...")
    start_time = time.time()
    rewards, best_env_config, best_episode = train_mcts(agent, episodes=500)
    end_time = time.time()
    print(f"训练完成! 耗时: {end_time - start_time:.2f}秒")

    # 测试智能体,添加headless参数
    print("开始测试...")
    try:
        path, result = test_mcts(env, agent, best_env_config, best_episode, headless=False)
    except Exception as e:
        print(f"图形界面测试失败,切换到无图形界面模式: {e}")
        path, result = test_mcts(env, agent, best_env_config, best_episode, headless=True)
Traceback (most recent call last):
  File "/home/ubuntu/Downloads/yolov5-rk_opt-v6.2/mtkl.py", line 634, in <module>
    agent._simulate = improved_simulate.__get__(agent, MCTSAgent)
NameError: name 'improved_simulate' is not defined

 

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
1
分享
返回顶部
顶部