安徽工程建设信息网站王开林,什么网站可以做线上小游戏,新公司成立建设网站,怎么做模板网站第九章#xff1a;连续动作深度Q网络 - 小智的精确控制
9.1 连续动作的挑战
之前我们的小智只能选择离散的动作#xff1a;上、下、左、右、不动。但现实中的很多任务需要连续的动作。
例子#xff1a;
机器人控制#xff1a;关节角度、力矩大小自动驾驶#xff1a;方…
第九章连续动作深度Q网络 - 小智的精确控制
9.1 连续动作的挑战
之前我们的小智只能选择离散的动作上、下、左、右、不动。但现实中的很多任务需要连续的动作。
例子
机器人控制关节角度、力矩大小自动驾驶方向盘角度、油门/刹车力度游戏AI移动方向的精确角度和速度
问题传统DQN无法处理连续动作空间因为
无法枚举所有可能的动作无法计算 max a Q ( s , a ) \max_a Q(s,a) maxaQ(s,a)动作空间可能是无限的
9.2 连续动作的解决方案
9.2.1 动作离散化
最简单的方法将连续动作空间离散化。
例子假设小智可以选择移动的精确角度
原始动作角度 θ ∈ [ 0 , 2 π ) \theta \in [0, 2\pi) θ∈[0,2π)离散化选择8个方向 { 0 ° , 45 ° , 90 ° , 135 ° , 180 ° , 225 ° , 270 ° , 315 ° } \{0°, 45°, 90°, 135°, 180°, 225°, 270°, 315°\} {0°,45°,90°,135°,180°,225°,270°,315°}
优点
可以直接使用DQN实现简单
缺点
失去了连续性离散化粒度难以选择高维动作空间会导致组合爆炸
9.2.2 函数近似方法
思路用函数近似器来处理连续动作。
方法1最大化优化
对每个状态用优化算法求解 a ∗ arg max a Q ( s , a ; θ ) a^* \arg\max_a Q(s,a;\theta) a∗argmaxaQ(s,a;θ)问题计算复杂度高不适用于实时决策
方法2NAF (Normalized Advantage Functions)
将Q函数参数化为特殊形式使得最大化有解析解问题限制了Q函数的表达能力
9.2.3 演员-评论员方法
最成功的方法结合策略梯度和价值函数
演员(Actor)策略网络输出连续动作评论员(Critic)价值网络评估动作好坏
这就是我们下一章要学习的内容
9.3 网格世界中的连续动作
让我们扩展网格世界使小智可以选择连续的移动方向和速度
9.3.1 连续动作定义
动作空间 A { ( θ , v ) ∣ θ ∈ [ 0 , 2 π ) , v ∈ [ 0 , 1 ] } \mathcal{A} \{(\theta, v) | \theta \in [0, 2\pi), v \in [0, 1]\} A{(θ,v)∣θ∈[0,2π),v∈[0,1]}
其中 θ \theta θ移动方向角度 v v v移动速度0表示不动1表示最大速度
9.3.2 动作到位置的映射
位置更新 x t 1 x t v cos ( θ ) ⋅ step_size x_{t1} x_t v \cos(\theta) \cdot \text{step\_size} xt1xtvcos(θ)⋅step_size y t 1 y t v sin ( θ ) ⋅ step_size y_{t1} y_t v \sin(\theta) \cdot \text{step\_size} yt1ytvsin(θ)⋅step_size
9.4 动作离散化的DQN实现
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt
import mathclass ContinuousGridWorldEnv:连续动作网格世界环境def __init__(self, grid_size4, step_size0.3):self.grid_size grid_sizeself.step_size step_size# 起点和终点连续坐标self.start_pos np.array([0.5, 0.5]) # 左上角self.goal_pos np.array([3.5, 3.5]) # 右下角# 障碍物矩形区域self.obstacles [{min: np.array([1.0, 1.0]), max: np.array([2.0, 2.0])}, # (2,2){min: np.array([2.0, 2.0]), max: np.array([3.0, 3.0])} # (3,3)]# 边界self.bounds {min: np.array([0.0, 0.0]), max: np.array([4.0, 4.0])}# 当前状态self.current_pos self.start_pos.copy()def reset(self):重置环境self.current_pos self.start_pos.copy()return self.get_state()def get_state(self):获取当前状态# 状态包含当前位置和目标位置的相对位置relative_pos self.goal_pos - self.current_posstate np.concatenate([self.current_pos, relative_pos])return statedef is_in_obstacle(self, pos):检查位置是否在障碍物中for obs in self.obstacles:if (obs[min][0] pos[0] obs[max][0] and obs[min][1] pos[1] obs[max][1]):return Truereturn Falsedef is_out_of_bounds(self, pos):检查位置是否超出边界return (pos[0] self.bounds[min][0] or pos[0] self.bounds[max][0] orpos[1] self.bounds[min][1] or pos[1] self.bounds[max][1])def step(self, action):执行动作theta, velocity action# 计算新位置dx velocity * math.cos(theta) * self.step_sizedy velocity * math.sin(theta) * self.step_sizenew_pos self.current_pos np.array([dx, dy])# 检查碰撞if self.is_out_of_bounds(new_pos) or self.is_in_obstacle(new_pos):# 碰撞不移动reward -10.0done Falseelse:# 合法移动old_dist np.linalg.norm(self.current_pos - self.goal_pos)self.current_pos new_posnew_dist np.linalg.norm(self.current_pos - self.goal_pos)# 奖励设计if new_dist 0.3: # 到达目标reward 100.0done Trueelse:# 鼓励接近目标reward (old_dist - new_dist) * 10 - 1 # 移动成本done Falsereturn self.get_state(), reward, doneclass DiscretizedContinuousActionDQN:离散化连续动作的DQNdef __init__(self, state_size, angle_bins8, velocity_bins3):self.state_size state_sizeself.angle_bins angle_binsself.velocity_bins velocity_binsself.action_size angle_bins * velocity_bins# 创建离散动作空间self.actions self._create_discrete_actions()# DQN网络self.q_network self._build_network()self.target_network self._build_network()self.optimizer optim.Adam(self.q_network.parameters(), lr1e-3)# 训练参数self.epsilon 1.0self.epsilon_min 0.01self.epsilon_decay 0.995self.gamma 0.99# 经验回放self.memory deque(maxlen10000)self.batch_size 32# 更新目标网络self.update_target_network()def _create_discrete_actions(self):创建离散化的动作空间actions []for i in range(self.angle_bins):angle 2 * math.pi * i / self.angle_binsfor j in range(self.velocity_bins):velocity (j 1) / self.velocity_bins # 避免速度为0actions.append((angle, velocity))return actionsdef _build_network(self):构建Q网络return nn.Sequential(nn.Linear(self.state_size, 128),nn.ReLU(),nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, self.action_size))def update_target_network(self):更新目标网络self.target_network.load_state_dict(self.q_network.state_dict())def select_action(self, state):选择动作if random.random() self.epsilon:with torch.no_grad():state_tensor torch.FloatTensor(state).unsqueeze(0)q_values self.q_network(state_tensor)action_idx q_values.argmax().item()else:action_idx random.randrange(self.action_size)return action_idx, self.actions[action_idx]def store_transition(self, state, action_idx, reward, next_state, done):存储经验self.memory.append((state, action_idx, reward, next_state, done))def learn(self):学习if len(self.memory) self.batch_size:return# 采样批次batch random.sample(self.memory, self.batch_size)states, actions, rewards, next_states, dones zip(*batch)states torch.FloatTensor(states)actions torch.LongTensor(actions)rewards torch.FloatTensor(rewards)next_states torch.FloatTensor(next_states)dones torch.BoolTensor(dones)# 计算当前Q值current_q_values self.q_network(states).gather(1, actions.unsqueeze(1))# 计算目标Q值with torch.no_grad():next_q_values self.target_network(next_states).max(1)[0]target_q_values rewards (self.gamma * next_q_values * ~dones)# 计算损失loss F.mse_loss(current_q_values.squeeze(), target_q_values)# 更新网络self.optimizer.zero_grad()loss.backward()self.optimizer.step()# 更新探索率if self.epsilon self.epsilon_min:self.epsilon * self.epsilon_decaydef train_discretized_continuous_dqn():训练离散化连续动作DQNprint( 离散化连续动作DQN训练 )env ContinuousGridWorldEnv()agent DiscretizedContinuousActionDQN(state_size4)episode_rewards []for episode in range(1000):state env.reset()episode_reward 0for step in range(200):# 选择动作action_idx, continuous_action agent.select_action(state)# 执行动作next_state, reward, done env.step(continuous_action)# 存储经验agent.store_transition(state, action_idx, reward, next_state, done)# 学习agent.learn()episode_reward rewardstate next_stateif done:breakepisode_rewards.append(episode_reward)# 更新目标网络if episode % 100 0:agent.update_target_network()if episode % 100 0:avg_reward np.mean(episode_rewards[-100:])print(f回合 {episode}: 平均奖励 {avg_reward:.2f}, 探索率 {agent.epsilon:.3f})return agent, episode_rewards# 运行训练
# agent, rewards train_discretized_continuous_dqn()9.5 动作离散化的局限性
通过上面的实现我们可以看到动作离散化的问题
9.5.1 精度损失
问题离散化会丢失精度
8个角度只能表示45°的倍数无法精确控制移动方向
9.5.2 维度爆炸
问题高维连续动作空间离散化后组合数激增
2维动作角度速度8×324个选择如果是机器人7个关节每个关节10个离散值 10 7 10^7 107个动作
9.5.3 不自然的动作
问题真实世界的动作是连续的
机器人关节角度应该是连续的汽车方向盘转角应该是平滑的
9.6 连续动作的真正解决方案
9.6.1 策略梯度方法
优势
直接输出连续动作可以学习随机策略适用于高维动作空间
9.6.2 演员-评论员方法
结合两者优势
演员策略网络输出连续动作评论员价值网络评估策略好坏
这就是我们下一章要深入学习的内容
小结
连续动作深度Q网络面临的挑战
理论挑战无法直接最大化Q函数计算挑战动作空间可能无限大实现挑战需要新的算法框架
解决方向
动作离散化简单但有局限性特殊Q函数形式如NAF限制表达能力演员-评论员最成功的方法
下一章我们将学习演员-评论员算法这是处理连续动作的主流方法 第十章演员-评论员算法 - 小智的双重人格
10.1 演员-评论员的核心思想
想象小智有两个大脑
演员大脑负责做决策选择动作评论员大脑负责评价判断动作好坏
这就是**演员-评论员(Actor-Critic)**算法的基本思想
10.1.1 为什么需要两个网络
单独使用策略梯度的问题
方差大学习不稳定需要完整回合才能计算回报
单独使用价值函数的问题
连续动作空间难以处理 max a Q ( s , a ) \max_a Q(s,a) maxaQ(s,a)只能评估不能直接决策
演员-评论员的优势
演员处理连续动作评论员提供即时反馈两者相互促进共同提高
10.2 演员-评论员的数学原理
10.2.1 演员网络策略网络
参数化策略 π ( a ∣ s ; θ ) \pi(a|s;\theta) π(a∣s;θ)
目标最大化期望回报 J ( θ ) E s ∼ ρ π , a ∼ π [ Q π ( s , a ) ] J(\theta) \mathbb{E}_{s \sim \rho^\pi, a \sim \pi}[Q^\pi(s,a)] J(θ)Es∼ρπ,a∼π[Qπ(s,a)]
策略梯度 ∇ θ J ( θ ) E s ∼ ρ π , a ∼ π [ Q π ( s , a ) ∇ θ ln π ( a ∣ s ; θ ) ] \nabla_\theta J(\theta) \mathbb{E}_{s \sim \rho^\pi, a \sim \pi}[Q^\pi(s,a) \nabla_\theta \ln \pi(a|s;\theta)] ∇θJ(θ)Es∼ρπ,a∼π[Qπ(s,a)∇θlnπ(a∣s;θ)]
10.2.2 评论员网络价值网络
状态价值函数 V ( s ; ϕ ) V(s;\phi) V(s;ϕ) 目标近似真实价值函数 V π ( s ) V^\pi(s) Vπ(s)
TD误差 δ r γ V ( s ′ ; ϕ ) − V ( s ; ϕ ) \delta r \gamma V(s;\phi) - V(s;\phi) δrγV(s′;ϕ)−V(s;ϕ)
更新规则 ϕ ← ϕ α v δ ∇ ϕ V ( s ; ϕ ) \phi \leftarrow \phi \alpha_v \delta \nabla_\phi V(s;\phi) ϕ←ϕαvδ∇ϕV(s;ϕ)
10.2.3 优势函数
关键思想用TD误差估计优势函数 A ( s , a ) Q ( s , a ) − V ( s ) ≈ r γ V ( s ′ ) − V ( s ) δ A(s,a) Q(s,a) - V(s) \approx r \gamma V(s) - V(s) \delta A(s,a)Q(s,a)−V(s)≈rγV(s′)−V(s)δ
最终的演员-评论员更新
评论员更新 ϕ ← ϕ α v δ ∇ ϕ V ( s ; ϕ ) \phi \leftarrow \phi \alpha_v \delta \nabla_\phi V(s;\phi) ϕ←ϕαvδ∇ϕV(s;ϕ)演员更新 θ ← θ α π δ ∇ θ ln π ( a ∣ s ; θ ) \theta \leftarrow \theta \alpha_\pi \delta \nabla_\theta \ln \pi(a|s;\theta) θ←θαπδ∇θlnπ(a∣s;θ)
10.3 算法流程
算法演员-评论员 (Actor-Critic)
输入- 演员网络 π(a|s;θ)- 评论员网络 V(s;φ)- 学习率 α_π, α_v1. 初始化网络参数 θ, φ
2. for each episode:
3. 初始化状态 s
4. for each step:
5. 根据 π(a|s;θ) 采样动作 a
6. 执行动作 a获得 r, s
7. 计算 TD误差δ r γV(s;φ) - V(s;φ)
8. 更新评论员φ ← φ α_v δ ∇_φ V(s;φ)
9. 更新演员θ ← θ α_π δ ∇_θ ln π(a|s;θ)
10. s ← s
11. end for
12. end for10.4 网格世界中的演员-评论员实现
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
import matplotlib.pyplot as plt
import mathclass ActorNetwork(nn.Module):演员网络输出动作的概率分布def __init__(self, state_size, action_size, hidden_size64):super(ActorNetwork, self).__init__()self.fc1 nn.Linear(state_size, hidden_size)self.fc2 nn.Linear(hidden_size, hidden_size)# 对于连续动作我们输出高斯分布的参数self.mean_head nn.Linear(hidden_size, action_size)self.std_head nn.Linear(hidden_size, action_size)def forward(self, state):x F.relu(self.fc1(state))x F.relu(self.fc2(x))# 输出动作的均值和标准差mean self.mean_head(x)std F.softplus(self.std_head(x)) 1e-5 # 确保std 0return mean, stddef sample_action(self, state):采样动作mean, std self.forward(state)normal_dist torch.distributions.Normal(mean, std)action normal_dist.sample()log_prob normal_dist.log_prob(action).sum(dim-1)return action, log_probclass CriticNetwork(nn.Module):评论员网络估计状态价值def __init__(self, state_size, hidden_size64):super(CriticNetwork, self).__init__()self.fc1 nn.Linear(state_size, hidden_size)self.fc2 nn.Linear(hidden_size, hidden_size)self.value_head nn.Linear(hidden_size, 1)def forward(self, state):x F.relu(self.fc1(state))x F.relu(self.fc2(x))value self.value_head(x)return valueclass ActorCriticAgent:演员-评论员智能体def __init__(self, state_size, action_size, lr_actor1e-3, lr_critic1e-3, gamma0.99):self.state_size state_sizeself.action_size action_sizeself.gamma gamma# 网络self.actor ActorNetwork(state_size, action_size)self.critic CriticNetwork(state_size)# 优化器self.actor_optimizer optim.Adam(self.actor.parameters(), lrlr_actor)self.critic_optimizer optim.Adam(self.critic.parameters(), lrlr_critic)# 统计信息self.actor_losses []self.critic_losses []self.episode_rewards []def select_action(self, state):选择动作state_tensor torch.FloatTensor(state).unsqueeze(0)action, log_prob self.actor.sample_action(state_tensor)value self.critic(state_tensor)return action.squeeze().numpy(), log_prob.item(), value.item()def update(self, state, action, reward, next_state, done, log_prob):更新网络state_tensor torch.FloatTensor(state).unsqueeze(0)next_state_tensor torch.FloatTensor(next_state).unsqueeze(0)# 计算TD误差current_value self.critic(state_tensor)if done:target_value rewardelse:next_value self.critic(next_state_tensor)target_value reward self.gamma * next_value.item()td_error target_value - current_value.item()# 更新评论员critic_loss F.mse_loss(current_value, torch.FloatTensor([target_value]))self.critic_optimizer.zero_grad()critic_loss.backward()self.critic_optimizer.step()# 更新演员actor_loss -log_prob * td_error # 负号因为我们要最大化self.actor_optimizer.zero_grad()actor_loss.backward()self.actor_optimizer.step()# 记录损失self.actor_losses.append(abs(actor_loss.item()))self.critic_losses.append(critic_loss.item())return td_errorclass ContinuousActionGridWorld:连续动作网格世界适用于演员-评论员def __init__(self, grid_size4, step_size0.2):self.grid_size grid_sizeself.step_size step_size# 位置self.start_pos np.array([0.5, 0.5])self.goal_pos np.array([3.5, 3.5])# 障碍物self.obstacles [{center: np.array([1.5, 1.5]), radius: 0.4}, # (2,2){center: np.array([2.5, 2.5]), radius: 0.4} # (3,3)]self.current_pos self.start_pos.copy()def reset(self):重置环境self.current_pos self.start_pos.copy()return self.get_state()def get_state(self):获取状态# 归一化位置信息normalized_pos self.current_pos / self.grid_sizerelative_goal (self.goal_pos - self.current_pos) / self.grid_size# 计算到障碍物的距离obstacle_distances []for obs in self.obstacles:dist np.linalg.norm(self.current_pos - obs[center])obstacle_distances.append(dist / self.grid_size)state np.concatenate([normalized_pos, relative_goal, obstacle_distances])return statedef is_in_obstacle(self, pos):检查是否在障碍物中for obs in self.obstacles:if np.linalg.norm(pos - obs[center]) obs[radius]:return Truereturn Falsedef is_out_of_bounds(self, pos):检查是否越界return (pos[0] 0 or pos[0] self.grid_size or pos[1] 0 or pos[1] self.grid_size)def step(self, action):执行动作# 动作是2维[dx, dy]范围是[-1, 1]action np.clip(action, -1, 1) # 确保动作在合理范围内# 计算新位置new_pos self.current_pos action * self.step_size# 检查碰撞if self.is_out_of_bounds(new_pos) or self.is_in_obstacle(new_pos):# 碰撞不移动reward -10.0done Falseelse:# 合法移动old_dist np.linalg.norm(self.current_pos - self.goal_pos)self.current_pos new_posnew_dist np.linalg.norm(self.current_pos - self.goal_pos)# 奖励设计if new_dist 0.2: # 到达目标reward 100.0done Trueelse:# 鼓励接近目标惩罚时间消耗reward (old_dist - new_dist) * 20 - 0.1done Falsereturn self.get_state(), reward, donedef train_actor_critic():训练演员-评论员算法print( 演员-评论员算法训练 )env ContinuousActionGridWorld()state_size len(env.get_state())action_size 2 # 2D移动agent ActorCriticAgent(state_size, action_size)episode_rewards []for episode in range(1000):state env.reset()episode_reward 0for step in range(200):# 选择动作action, log_prob, value agent.select_action(state)# 执行动作next_state, reward, done env.step(action)# 更新网络td_error agent.update(state, action, reward, next_state, done, log_prob)episode_reward rewardstate next_stateif done:breakepisode_rewards.append(episode_reward)agent.episode_rewards.append(episode_reward)if episode % 100 0:avg_reward np.mean(episode_rewards[-100:])print(f回合 {episode}: 平均奖励 {avg_reward:.2f})return agent, episode_rewards# 训练智能体
# agent, rewards train_actor_critic()10.5 演员-评论员的变种
10.5.1 A2C (Advantage Actor-Critic)
改进使用优势函数而不是TD误差
优势函数估计 A ( s , a ) Q ( s , a ) − V ( s ) ≈ r γ V ( s ′ ) − V ( s ) A(s,a) Q(s,a) - V(s) \approx r \gamma V(s) - V(s) A(s,a)Q(s,a)−V(s)≈rγV(s′)−V(s)
10.5.2 A3C (Asynchronous Advantage Actor-Critic)
核心思想多个智能体并行训练
每个智能体独立探索环境定期同步网络参数提高训练效率和稳定性
10.5.3 TRPO (Trust Region Policy Optimization)
问题策略更新步长难以控制 解决限制策略更新的KL散度 max θ E [ A ( s , a ) π θ ( a ∣ s ) π θ o l d ( a ∣ s ) ] \max_\theta \mathbb{E}[A(s,a) \frac{\pi_\theta(a|s)}{\pi_{\theta_{old}}(a|s)}] θmaxE[A(s,a)πθold(a∣s)πθ(a∣s)] s.t. E [ D K L ( π θ o l d ( ⋅ ∣ s ) ∣ ∣ π θ ( ⋅ ∣ s ) ) ] ≤ δ \text{s.t. } \mathbb{E}[D_{KL}(\pi_{\theta_{old}}(\cdot|s) || \pi_\theta(\cdot|s))] \leq \delta s.t. E[DKL(πθold(⋅∣s)∣∣πθ(⋅∣s))]≤δ
10.6 演员-评论员的优缺点
10.6.1 优点
处理连续动作直接输出连续动作分布在线学习每步都可以更新不需要等待回合结束低方差评论员提供基线减少方差理论基础有坚实的理论保证
10.6.2 缺点
两个网络需要平衡两个网络的学习速度超参数敏感学习率、网络结构等需要仔细调节收敛性可能不如纯策略梯度方法稳定计算复杂比单一网络方法更复杂
10.7 实际应用技巧
10.7.1 网络架构设计
class SharedActorCritic(nn.Module):共享特征提取的演员-评论员网络def __init__(self, state_size, action_size):super().__init__()# 共享特征提取层self.shared_layers nn.Sequential(nn.Linear(state_size, 128),nn.ReLU(),nn.Linear(128, 128),nn.ReLU())# 演员头self.actor_head nn.Sequential(nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, action_size))# 评论员头self.critic_head nn.Sequential(nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, 1))10.7.2 动作空间处理
def process_continuous_action(self, raw_action):处理连续动作输出# 使用tanh激活函数将输出限制在[-1, 1]action torch.tanh(raw_action)# 或者使用更复杂的映射# action torch.sigmoid(raw_action) * 2 - 1return action小结
演员-评论员算法是连续控制的重要方法
双网络架构演员负责决策评论员负责评价理论优雅结合了策略梯度和价值函数的优点实用性强可以处理连续动作空间持续发展衍生出PPO、SAC等先进算法
下一章我们将学习稀疏奖励环境中的挑战和解决方案 第十一章稀疏奖励 - 小智的艰难探索
11.1 稀疏奖励的挑战
想象一个更困难的场景小智被放在一个巨大的迷宫中只有到达终点才能获得奖励其他时候都没有任何反馈。这就是稀疏奖励的挑战
11.1.1 稀疏奖励的定义
稀疏奖励环境奖励信号很少出现大部分时间奖励为0。
例子
迷宫游戏只有到达出口才有奖励机器人抓取只有成功抓取物体才有奖励游戏AI只有游戏结束时才知道输赢
11.1.2 为什么稀疏奖励困难
信号稀少 R ( s , a ) { 1 如果到达目标 0 其他所有情况 R(s,a) \begin{cases} 1 \text{如果到达目标} \\ 0 \text{其他所有情况} \end{cases} R(s,a){10如果到达目标其他所有情况
探索困难
随机策略很难找到奖励没有奖励信号指导学习方向容易陷入局部最优
信用分配问题
一个回合有100步只在最后获得奖励哪些动作真正对成功有贡献
11.2 稀疏奖励网格世界
让我们创建一个稀疏奖励版本的网格世界
11.2.1 环境设计
class SparseRewardGridWorld(MDPGridWorld):稀疏奖励网格世界def __init__(self, success_prob0.7, slip_prob0.15, gamma0.99):super().__init__(success_prob, slip_prob, gamma)# 重新设计奖励函数 - 只有成功才有奖励self.sparse_rewards self._build_sparse_reward_matrix()def _build_sparse_reward_matrix(self):构建稀疏奖励矩阵R np.zeros((self.num_states, self.num_actions, self.num_states))goal_state_id self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])goal_state_idx self.state_to_idx[goal_state_id]# 只有到达目标才有奖励for state_idx in range(self.num_states):for action_id in range(self.num_actions):for next_state_idx in range(self.num_states):if (self.transition_probs[state_idx][action_id][next_state_idx] 0 and next_state_idx goal_state_idx):R[state_idx][action_id][next_state_idx] 1.0else:R[state_idx][action_id][next_state_idx] 0.0return R11.1.2 传统方法的失败
def test_traditional_methods_on_sparse_reward():测试传统方法在稀疏奖励下的表现print( 传统方法在稀疏奖励下的表现 )env SparseRewardGridWorld()env.rewards env.sparse_rewards # 使用稀疏奖励# 测试Q-learningprint(1. Q-learning:)q_agent QLearningGridWorld()q_agent.rewards env.sparse_rewards# 训练更长时间q_agent.train_q_learning(num_episodes5000, verboseFalse)q_result q_agent.evaluate_learned_policy(num_test_episodes100)print(f 成功率: {q_result[success_rate]:.2%})print(f 平均奖励: {q_result[avg_reward]:.2f})# 大多数情况下成功率会很低因为很难探索到目标return q_result# 运行测试
# result test_traditional_methods_on_sparse_reward()11.3 解决稀疏奖励的方法
11.3.1 奖励整形 (Reward Shaping)
基本思想为环境添加额外的奖励信号引导智能体探索。
距离奖励 R shaped ( s , a , s ′ ) R ( s , a , s ′ ) F ( s , s ′ ) R_{\text{shaped}}(s,a,s) R(s,a,s) F(s,s) Rshaped(s,a,s′)R(s,a,s′)F(s,s′)
其中 F ( s , s ′ ) F(s,s) F(s,s′)是整形函数。
常用整形函数 F ( s , s ′ ) γ Φ ( s ′ ) − Φ ( s ) F(s,s) \gamma \Phi(s) - \Phi(s) F(s,s′)γΦ(s′)−Φ(s)
其中 Φ ( s ) \Phi(s) Φ(s)是势函数(potential function)。
例子距离势函数 Φ ( s ) − ∥ s − s goal ∥ 2 \Phi(s) -\|s - s_{\text{goal}}\|_2 Φ(s)−∥s−sgoal∥2
class RewardShapingGridWorld(SparseRewardGridWorld):奖励整形网格世界def __init__(self, success_prob0.7, slip_prob0.15, gamma0.99, shaping_weight0.1):super().__init__(success_prob, slip_prob, gamma)self.shaping_weight shaping_weight# 构建整形奖励矩阵self.shaped_rewards self._build_shaped_reward_matrix()def _build_shaped_reward_matrix(self):构建整形奖励矩阵R self.sparse_rewards.copy()goal_pos np.array(self.goal_pos)for state_idx in range(self.num_states):state_id self.idx_to_state[state_idx]state_pos np.array(self.state_id_to_pos(state_id))for action_id in range(self.num_actions):for next_state_idx in range(self.num_states):if self.transition_probs[state_idx][action_id][next_state_idx] 0:next_state_id self.idx_to_state[next_state_idx]next_state_pos np.array(self.state_id_to_pos(next_state_id))# 计算势函数差old_potential -np.linalg.norm(state_pos - goal_pos)new_potential -np.linalg.norm(next_state_pos - goal_pos)# 添加整形奖励shaping_reward self.gamma * new_potential - old_potentialR[state_idx][action_id][next_state_idx] self.shaping_weight * shaping_rewardreturn Rdef test_reward_shaping():测试奖励整形print( 奖励整形测试 )# 原始稀疏奖励sparse_env SparseRewardGridWorld()sparse_env.rewards sparse_env.sparse_rewards# 奖励整形shaped_env RewardShapingGridWorld(shaping_weight0.1)shaped_env.rewards shaped_env.shaped_rewards# 比较结果results {}for name, env in [(稀疏奖励, sparse_env), (奖励整形, shaped_env)]:q_agent QLearningGridWorld()q_agent.rewards env.rewardsq_agent.transition_probs env.transition_probsq_agent.train_q_learning(num_episodes2000, verboseFalse)result q_agent.evaluate_learned_policy(num_test_episodes100)results[name] resultprint(f{name}:)print(f 成功率: {result[success_rate]:.2%})print(f 平均奖励: {result[avg_reward]:.2f})return results# 运行测试
# results test_reward_shaping()11.3.2 课程学习 (Curriculum Learning)
基本思想从简单任务开始逐渐增加难度。
例子
第一阶段目标在距离起点1步的位置第二阶段目标在距离起点2步的位置最终阶段目标在原始位置
class CurriculumGridWorld:课程学习网格世界def __init__(self, start_pos(1, 1), final_goal(4, 4)):self.start_pos start_posself.final_goal final_goal# 设计课程从近到远的目标位置self.curriculum [(2, 2), # 第一阶段简单目标(3, 3), # 第二阶段中等难度(4, 4) # 第三阶段最终目标]self.current_stage 0def get_current_environment(self):获取当前阶段的环境current_goal self.curriculum[self.current_stage]env SparseRewardGridWorld()# 修改目标位置env.goal_pos current_goalenv.sparse_rewards env._build_sparse_reward_matrix()env.rewards env.sparse_rewardsreturn envdef should_advance_stage(self, success_rate):判断是否应该进入下一阶段return success_rate 0.8 # 成功率超过80%就进入下一阶段def advance_stage(self):进入下一阶段if self.current_stage len(self.curriculum) - 1:self.current_stage 1return Truereturn Falsedef train_with_curriculum():使用课程学习训练print( 课程学习训练 )curriculum CurriculumGridWorld()agent QLearningGridWorld()while True:# 获取当前阶段环境env curriculum.get_current_environment()agent.rewards env.rewardsagent.transition_probs env.transition_probsprint(f阶段 {curriculum.current_stage 1}: 目标位置 {env.goal_pos})# 在当前阶段训练agent.train_q_learning(num_episodes1000, verboseFalse)# 评估性能result agent.evaluate_learned_policy(num_test_episodes100)success_rate result[success_rate]print(f 成功率: {success_rate:.2%})# 判断是否进入下一阶段if curriculum.should_advance_stage(success_rate):if not curriculum.advance_stage():print(课程学习完成)breakelse:print( 继续当前阶段训练...)return agent# 运行课程学习
# trained_agent train_with_curriculum()11.3.3 好奇心驱动探索 (Curiosity-Driven Exploration)
核心思想给智能体内在动机去探索新奇的状态。
ICM (Intrinsic Curiosity Module)
前向模型预测下一个状态的特征逆向模型从状态特征预测动作好奇心奖励预测误差作为内在奖励
数学formulation r i η ∥ ϕ ( s t 1 ) − ϕ ^ ( s t 1 ) ∥ 2 r_i \eta \|\phi(s_{t1}) - \hat{\phi}(s_{t1})\|^2 riη∥ϕ(st1)−ϕ^(st1)∥2
其中 ϕ ( s ) \phi(s) ϕ(s)是状态特征 ϕ ^ ( s t 1 ) \hat{\phi}(s_{t1}) ϕ^(st1)是预测的下一状态特征。
class CuriosityModule(nn.Module):好奇心模块def __init__(self, state_size, action_size, feature_size64):super().__init__()# 特征提取网络self.feature_net nn.Sequential(nn.Linear(state_size, 128),nn.ReLU(),nn.Linear(128, feature_size))# 前向模型从特征和动作预测下一状态特征self.forward_model nn.Sequential(nn.Linear(feature_size action_size, 128),nn.ReLU(),nn.Linear(128, feature_size))# 逆向模型从特征变化预测动作self.inverse_model nn.Sequential(nn.Linear(feature_size * 2, 128),nn.ReLU(),nn.Linear(128, action_size))self.optimizer optim.Adam(self.parameters(), lr1e-3)def forward(self, state, action, next_state):计算好奇心奖励# 提取特征phi_s self.feature_net(state)phi_s_next self.feature_net(next_state)# 前向模型预测action_onehot F.one_hot(action.long(), num_classes5).float()pred_phi_s_next self.forward_model(torch.cat([phi_s, action_onehot], dim-1))# 逆向模型预测pred_action self.inverse_model(torch.cat([phi_s, phi_s_next], dim-1))# 计算损失forward_loss F.mse_loss(pred_phi_s_next, phi_s_next)inverse_loss F.cross_entropy(pred_action, action.long())# 好奇心奖励预测误差curiosity_reward forward_loss.detach()return curiosity_reward, forward_loss, inverse_lossdef update(self, forward_loss, inverse_loss):更新好奇心模块total_loss forward_loss inverse_lossself.optimizer.zero_grad()total_loss.backward()self.optimizer.step()class CuriosityDrivenAgent(QLearningGridWorld):好奇心驱动的Q-learning智能体def __init__(self, success_prob0.7, slip_prob0.15, gamma0.99, curiosity_weight0.1):super().__init__(success_prob, slip_prob, gamma)self.curiosity_weight curiosity_weightself.curiosity_module CuriosityModule(state_sizeself.num_states, action_sizeself.num_actions)# 用于存储好奇心奖励self.curiosity_rewards []def state_to_tensor(self, state_id):将状态转换为one-hot张量state_vector np.zeros(self.num_states)if state_id in self.valid_states:state_vector[self.valid_states.index(state_id)] 1.0return torch.FloatTensor(state_vector)def curiosity_q_learning_step(self, state_id, action_id, reward, next_state_id, doneFalse):带好奇心的Q-learning更新# 原始Q-learning更新td_error super().q_learning_step(state_id, action_id, reward, next_state_id, done)# 计算好奇心奖励state_tensor self.state_to_tensor(state_id).unsqueeze(0)next_state_tensor self.state_to_tensor(next_state_id).unsqueeze(0)action_tensor torch.tensor([action_id])curiosity_reward, forward_loss, inverse_loss self.curiosity_module(state_tensor, action_tensor, next_state_tensor)# 更新好奇心模块self.curiosity_module.update(forward_loss, inverse_loss)# 将好奇心奖励添加到原始奖励total_reward reward self.curiosity_weight * curiosity_reward.item()self.curiosity_rewards.append(curiosity_reward.item())# 用总奖励更新Q值state_idx self.state_to_idx[state_id]next_state_idx self.state_to_idx[next_state_id]current_q self.Q_table[state_idx][action_id]if done:target_q total_rewardelse:next_max_q np.max(self.Q_table[next_state_idx])target_q total_reward self.gamma * next_max_qtd_error target_q - current_qself.Q_table[state_idx][action_id] self.alpha * td_errorreturn abs(td_error)def test_curiosity_driven_learning():测试好奇心驱动学习print( 好奇心驱动学习测试 )# 创建稀疏奖励环境env SparseRewardGridWorld()# 普通Q-learningnormal_agent QLearningGridWorld()normal_agent.rewards env.sparse_rewardsnormal_agent.train_q_learning(num_episodes3000, verboseFalse)normal_result normal_agent.evaluate_learned_policy(num_test_episodes100)# 好奇心驱动Q-learningcurious_agent CuriosityDrivenAgent(curiosity_weight0.1)curious_agent.rewards env.sparse_rewards# 重写训练循环以使用好奇心for episode in range(3000):current_state_id curious_agent.pos_to_state_id(curious_agent.start_pos[0], curious_agent.start_pos[1])goal_state_id curious_agent.pos_to_state_id(curious_agent.goal_pos[0], curious_agent.goal_pos[1])episode_reward 0for step in range(1000):action_id curious_agent.epsilon_greedy_action(current_state_id)next_state_id curious_agent.simulate_transition(current_state_id, action_id)current_state_idx curious_agent.state_to_idx[current_state_id]next_state_idx curious_agent.state_to_idx[next_state_id]reward curious_agent.rewards[current_state_idx][action_id][next_state_idx]done (next_state_id goal_state_id)# 使用好奇心Q-learning更新curious_agent.curiosity_q_learning_step(current_state_id, action_id, reward, next_state_id, done)episode_reward rewardif done:breakcurrent_state_id next_state_idcurious_agent.episode_rewards.append(episode_reward)curious_agent.epsilon max(curious_agent.epsilon_min, curious_agent.epsilon * curious_agent.epsilon_decay)curious_result curious_agent.evaluate_learned_policy(num_test_episodes100)# 比较结果print(普通Q-learning:)print(f 成功率: {normal_result[success_rate]:.2%})print(f 平均奖励: {normal_result[avg_reward]:.2f})print(好奇心驱动Q-learning:)print(f 成功率: {curious_result[success_rate]:.2%})print(f 平均奖励: {curious_result[avg_reward]:.2f})return normal_result, curious_result# 运行测试
# normal_result, curious_result test_curiosity_driven_learning()11.4 其他稀疏奖励解决方法
11.4.1 HER (Hindsight Experience Replay)
核心思想从失败中学习将失败的轨迹重新标记为成功。
例子
原始目标到达(4,4)实际到达(3,2)HER重新标记假设目标就是(3,2)这次就是成功的
11.4.2 探索策略
ε-贪心的改进
Upper Confidence Bound (UCB)考虑不确定性的探索Thompson Sampling贝叶斯探索Count-based Exploration访问次数较少的状态-动作对
11.4.3 分层强化学习
思想将复杂任务分解为子任务
高层策略选择子目标底层策略执行具体动作
11.5 稀疏奖励的理论分析
11.5.1 探索复杂度
定理在稀疏奖励环境中随机探索需要指数级时间找到奖励。
证明思路
状态空间大小 ∣ S ∣ |\mathcal{S}| ∣S∣随机策略找到目标的概率 1 ∣ S ∣ \frac{1}{|\mathcal{S}|} ∣S∣1期望时间 ∣ S ∣ |\mathcal{S}| ∣S∣对于我们的4×4网格约14步
11.5.2 奖励整形的理论保证
定理势函数形式的奖励整形不改变最优策略。
证明 对于整形奖励 R ′ ( s , a , s ′ ) R ( s , a , s ′ ) γ Φ ( s ′ ) − Φ ( s ) R(s,a,s) R(s,a,s) \gamma\Phi(s) - \Phi(s) R′(s,a,s′)R(s,a,s′)γΦ(s′)−Φ(s)
新的Q函数 Q ′ ( s , a ) Q ( s , a ) Φ ( s ) Q(s,a) Q(s,a) \Phi(s) Q′(s,a)Q(s,a)Φ(s)
最优动作 arg max a Q ′ ( s , a ) arg max a ( Q ( s , a ) Φ ( s ) ) arg max a Q ( s , a ) \arg\max_a Q(s,a) \arg\max_a (Q(s,a) \Phi(s)) \arg\max_a Q(s,a) argmaxaQ′(s,a)argmaxa(Q(s,a)Φ(s))argmaxaQ(s,a)
因此最优策略不变。
小结
稀疏奖励是强化学习中的重大挑战
探索困难缺乏奖励信号指导学习缓慢样本效率低下信用分配难以确定关键动作
解决方案
奖励整形添加引导信号课程学习从简单到复杂好奇心驱动内在动机探索经验重用HER等技术
下一章我们将学习模仿学习看看如何从专家演示中学习 第十二章模仿学习 - 小智的学徒之路
12.1 模仿学习的动机
想象小智有一位经验丰富的导师这位导师已经知道如何完美地完成任务。我们能否让小智直接从导师的演示中学习而不是从零开始探索
这就是**模仿学习(Imitation Learning)**的核心思想
12.1.1 为什么需要模仿学习
传统强化学习的问题
探索代价高机器人在学习过程中可能损坏时间成本大需要大量试错才能学会安全风险自动驾驶不能在学习过程中撞车
模仿学习的优势
快速启动从专家经验开始而不是随机策略安全高效避免危险的探索样本效率专家演示信息量大
12.1.2 模仿学习的类型
监督学习视角
输入状态 s s s输出专家动作 a ∗ a^* a∗目标学习映射 s → a ∗ s \rightarrow a^* s→a∗
强化学习视角
从专家轨迹中推断奖励函数使用推断的奖励训练强化学习智能体
12.2 行为克隆 (Behavioral Cloning)
12.2.1 基本思想
行为克隆将模仿学习看作监督学习问题。
数学公式 给定专家轨迹数据集 D { ( s i , a i ) } i 1 N \mathcal{D} \{(s_i, a_i)\}_{i1}^N D{(si,ai)}i1N学习策略 π θ ( a ∣ s ) \pi_\theta(a|s) πθ(a∣s)最小化 L ( θ ) ∑ i 1 N ℓ ( π θ ( a i ∣ s i ) , a i ) L(\theta) \sum_{i1}^N \ell(\pi_\theta(a_i|s_i), a_i) L(θ)i1∑Nℓ(πθ(ai∣si),ai)
对于离散动作 ℓ − log π θ ( a i ∣ s i ) \ell -\log \pi_\theta(a_i|s_i) ℓ−logπθ(ai∣si)交叉熵损失 对于连续动作 ℓ ∥ a i − π θ ( s i ) ∥ 2 \ell \|a_i - \pi_\theta(s_i)\|^2 ℓ∥ai−πθ(si)∥2均方误差损失
12.2.2 分布偏移问题
核心问题训练分布与测试分布不匹配。
原因
训练时在专家状态分布下学习测试时在学到的策略状态分布下执行小错误会累积导致分布偏移
数学分析 假设单步错误概率为 ϵ \epsilon ϵ T T T步后期望错误为 O ( ϵ T 2 ) O(\epsilon T^2) O(ϵT2)
12.3 网格世界中的行为克隆
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as pltclass ExpertGridWorld(MDPGridWorld):生成专家演示的网格世界def __init__(self, success_prob0.7, slip_prob0.15, gamma0.99):super().__init__(success_prob, slip_prob, gamma)# 计算最优策略作为专家策略self.expert_policy self._compute_expert_policy()def _compute_expert_policy(self):计算专家策略最优策略optimal_V self.value_iteration()optimal_policy self.extract_optimal_policy(optimal_V)return optimal_policydef generate_expert_demonstrations(self, num_episodes100):生成专家演示数据demonstrations []for episode in range(num_episodes):trajectory []current_state_id self.pos_to_state_id(self.start_pos[0], self.start_pos[1])goal_state_id self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])for step in range(1000): # 防止无限循环# 专家选择最优动作action_id self.expert_policy[current_state_id]# 记录状态-动作对trajectory.append((current_state_id, action_id))# 执行动作next_state_id self.simulate_transition(current_state_id, action_id)if next_state_id goal_state_id:breakcurrent_state_id next_state_iddemonstrations.extend(trajectory)return demonstrationsclass BehavioralCloningAgent:行为克隆智能体def __init__(self, state_size, action_size, lr1e-3):self.state_size state_sizeself.action_size action_size# 策略网络self.policy_net nn.Sequential(nn.Linear(state_size, 128),nn.ReLU(),nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, action_size),nn.Softmax(dim-1))self.optimizer optim.Adam(self.policy_net.parameters(), lrlr)self.losses []def state_to_tensor(self, state_id, valid_states):将状态ID转换为one-hot张量state_vector np.zeros(len(valid_states))if state_id in valid_states:state_vector[valid_states.index(state_id)] 1.0return torch.FloatTensor(state_vector)def train(self, demonstrations, valid_states, num_epochs100, batch_size32):训练行为克隆模型print(f开始行为克隆训练样本数: {len(demonstrations)})# 转换数据格式states []actions []for state_id, action_id in demonstrations:state_tensor self.state_to_tensor(state_id, valid_states)states.append(state_tensor)actions.append(action_id)states torch.stack(states)actions torch.LongTensor(actions)# 训练dataset torch.utils.data.TensorDataset(states, actions)dataloader torch.utils.data.DataLoader(dataset, batch_sizebatch_size, shuffleTrue)for epoch in range(num_epochs):epoch_loss 0for batch_states, batch_actions in dataloader:# 前向传播action_probs self.policy_net(batch_states)# 计算交叉熵损失loss F.cross_entropy(action_probs, batch_actions)# 反向传播self.optimizer.zero_grad()loss.backward()self.optimizer.step()epoch_loss loss.item()avg_loss epoch_loss / len(dataloader)self.losses.append(avg_loss)if (epoch 1) % 10 0:print(f Epoch {epoch 1}: Loss {avg_loss:.4f})print(行为克隆训练完成)def select_action(self, state_tensor):选择动作with torch.no_grad():action_probs self.policy_net(state_tensor)action torch.argmax(action_probs).item()return actiondef evaluate(self, env, valid_states, num_episodes100):评估学到的策略rewards []lengths []for _ in range(num_episodes):current_state_id env.pos_to_state_id(env.start_pos[0], env.start_pos[1])goal_state_id env.pos_to_state_id(env.goal_pos[0], env.goal_pos[1])episode_reward 0steps 0for step in range(1000):state_tensor self.state_to_tensor(current_state_id, valid_states)action_id self.select_action(state_tensor)next_state_id env.simulate_transition(current_state_id, action_id)# 计算奖励current_state_idx env.state_to_idx[current_state_id]next_state_idx env.state_to_idx[next_state_id]reward env.rewards[current_state_idx][action_id][next_state_idx]episode_reward rewardsteps 1if next_state_id goal_state_id:breakcurrent_state_id next_state_idrewards.append(episode_reward)lengths.append(steps)avg_reward np.mean(rewards)avg_length np.mean(lengths)success_rate sum(1 for r in rewards if r 50) / len(rewards)return {avg_reward: avg_reward,avg_length: avg_length,success_rate: success_rate}def test_behavioral_cloning():测试行为克隆print( 行为克隆测试 )# 创建专家环境expert_env ExpertGridWorld()# 生成专家演示print(生成专家演示...)demonstrations expert_env.generate_expert_demonstrations(num_episodes50)print(f收集到 {len(demonstrations)} 个状态-动作对)# 分析专家演示action_counts defaultdict(int)for _, action_id in demonstrations:action_counts[action_id] 1print(专家动作分布:)for action_id, count in action_counts.items():action_name expert_env.actions[action_id]percentage count / len(demonstrations) * 100print(f {action_name}: {count} ({percentage:.1f}%))# 训练行为克隆智能体bc_agent BehavioralCloningAgent(state_sizeexpert_env.num_states,action_sizeexpert_env.num_actions)bc_agent.train(demonstrations, expert_env.valid_states, num_epochs50)# 评估行为克隆智能体bc_result bc_agent.evaluate(expert_env, expert_env.valid_states)print(f\n行为克隆结果:)print(f 平均奖励: {bc_result[avg_reward]:.2f})print(f 平均步数: {bc_result[avg_length]:.1f})print(f 成功率: {bc_result[success_rate]:.2%})# 与专家策略比较expert_result expert_env.evaluate_policy_exactly(expert_env.expert_policy)print(f\n专家策略理论值:)print(f 平均价值: {np.mean(expert_result):.2f})return bc_agent, bc_result# 运行测试
# bc_agent, bc_result test_behavioral_cloning()12.4 DAgger算法
12.4.1 DAgger的基本思想
Dataset Aggregation (DAgger)解决分布偏移问题的在线方法。
核心思想
用当前策略收集数据专家为这些状态提供标签将新数据加入训练集重新训练策略
算法流程
DAgger算法:
1. 初始化 D ∅, π̂₁ π*专家策略
2. for i 1 to N:
3. 用 π̂ᵢ 收集轨迹获得状态分布
4. 请专家为这些状态标注动作
5. 将新数据加入 D
6. 训练 π̂ᵢ₊₁ 在 D 上
7. end for12.4.2 DAgger的理论分析
定理DAgger的性能界限
如果专家策略是最优的单步错误率为 ϵ \epsilon ϵ则DAgger在 T T T步后的期望成本为 E [ c T ] ≤ c π ∗ O ( ϵ T ) \mathbb{E}[c_T] \leq c_{\pi^*} O(\epsilon T) E[cT]≤cπ∗O(ϵT)
相比行为克隆的 O ( ϵ T 2 ) O(\epsilon T^2) O(ϵT2)这是显著的改进
12.5 DAgger实现
class DAggerAgent(BehavioralCloningAgent):DAgger智能体def __init__(self, state_size, action_size, lr1e-3):super().__init__(state_size, action_size, lr)self.all_demonstrations []def collect_trajectories(self, env, valid_states, num_episodes20):用当前策略收集轨迹new_states []for episode in range(num_episodes):current_state_id env.pos_to_state_id(env.start_pos[0], env.start_pos[1])goal_state_id env.pos_to_state_id(env.goal_pos[0], env.goal_pos[1])for step in range(1000):new_states.append(current_state_id)# 用当前策略选择动作state_tensor self.state_to_tensor(current_state_id, valid_states)action_id self.select_action(state_tensor)# 执行动作next_state_id env.simulate_transition(current_state_id, action_id)if next_state_id goal_state_id:breakcurrent_state_id next_state_idreturn new_statesdef get_expert_labels(self, states, expert_policy):获取专家对这些状态的动作标签expert_actions []for state_id in states:expert_action expert_policy[state_id]expert_actions.append(expert_action)return list(zip(states, expert_actions))def dagger_train(self, env, expert_policy, valid_states, num_iterations10):DAgger训练print( DAgger训练 )for iteration in range(num_iterations):print(fDAgger迭代 {iteration 1})# 1. 收集轨迹new_states self.collect_trajectories(env, valid_states)print(f 收集了 {len(new_states)} 个新状态)# 2. 获取专家标签new_demonstrations self.get_expert_labels(new_states, expert_policy)# 3. 添加到数据集self.all_demonstrations.extend(new_demonstrations)print(f 总数据集大小: {len(self.all_demonstrations)})# 4. 重新训练self.train(self.all_demonstrations, valid_states, num_epochs20, batch_size32)# 5. 评估当前性能result self.evaluate(env, valid_states, num_episodes50)print(f 成功率: {result[success_rate]:.2%})print(f 平均奖励: {result[avg_reward]:.2f})print(DAgger训练完成)def compare_bc_and_dagger():比较行为克隆和DAggerprint( 行为克隆 vs DAgger 比较 )# 创建环境env ExpertGridWorld()# 生成初始专家演示较少initial_demos env.generate_expert_demonstrations(num_episodes20)print(f初始专家演示: {len(initial_demos)} 个样本)# 行为克隆print(\n1. 行为克隆:)bc_agent BehavioralCloningAgent(env.num_states, env.num_actions)bc_agent.train(initial_demos, env.valid_states, num_epochs50)bc_result bc_agent.evaluate(env, env.valid_states)# DAggerprint(\n2. DAgger:)dagger_agent DAggerAgent(env.num_states, env.num_actions)dagger_agent.all_demonstrations initial_demos.copy()dagger_agent.train(initial_demos, env.valid_states, num_epochs20)dagger_agent.dagger_train(env, env.expert_policy, env.valid_states, num_iterations5)dagger_result dagger_agent.evaluate(env, env.valid_states)# 结果比较print(\n 结果比较 )print(f{方法:15} | {成功率:10} | {平均奖励:10} | {平均步数:10})print(- * 55)print(f{行为克隆:15} | {bc_result[success_rate]:10.2%} | f{bc_result[avg_reward]:10.2f} | {bc_result[avg_length]:10.1f})print(f{DAgger:15} | {dagger_result[success_rate]:10.2%} | f{dagger_result[avg_reward]:10.2f} | {dagger_result[avg_length]:10.1f})return bc_result, dagger_result# 运行比较
# bc_result, dagger_result compare_bc_and_dagger()12.6 逆向强化学习 (Inverse Reinforcement Learning)
12.6.1 核心思想
问题我们能从专家行为中推断出奖励函数吗
逆向强化学习从专家演示中学习奖励函数然后用这个奖励函数训练策略。
数学表述 给定专家轨迹 { ( s i , a i ) } \{(s_i, a_i)\} {(si,ai)}找到奖励函数 R ( s , a ) R(s,a) R(s,a)使得专家策略是最优的。
12.6.2 最大熵IRL
思想在满足约束的奖励函数中选择使策略熵最大的那个。
目标函数 max R H ( π ∗ ) − λ ∥ E π ∗ [ ϕ ( s , a ) ] − E expert [ ϕ ( s , a ) ] ∥ \max_{R} H(\pi^*) - \lambda \|\mathbb{E}_{\pi^*}[\phi(s,a)] - \mathbb{E}_{\text{expert}}[\phi(s,a)]\| RmaxH(π∗)−λ∥Eπ∗[ϕ(s,a)]−Eexpert[ϕ(s,a)]∥
其中 ϕ ( s , a ) \phi(s,a) ϕ(s,a)是特征函数 H ( π ∗ ) H(\pi^*) H(π∗)是策略熵。
12.7 生成对抗模仿学习 (GAIL)
12.7.1 基本思想
GAIL用生成对抗网络的思想来做模仿学习。
组件
生成器策略网络 π θ \pi_\theta πθ生成轨迹判别器判别网络 D ϕ D_\phi Dϕ区分专家轨迹和生成轨迹
目标函数
判别器 max ϕ E expert [ log D ϕ ( s , a ) ] E π θ [ log ( 1 − D ϕ ( s , a ) ) ] \max_\phi \mathbb{E}_{\text{expert}}[\log D_\phi(s,a)] \mathbb{E}_{\pi_\theta}[\log(1-D_\phi(s,a))] maxϕEexpert[logDϕ(s,a)]Eπθ[log(1−Dϕ(s,a))]生成器 min θ E π θ [ log ( 1 − D ϕ ( s , a ) ) ] − λ H ( π θ ) \min_\theta \mathbb{E}_{\pi_\theta}[\log(1-D_\phi(s,a))] - \lambda H(\pi_\theta) minθEπθ[log(1−Dϕ(s,a))]−λH(πθ)
class GAILAgent:GAIL智能体简化版def __init__(self, state_size, action_size):self.state_size state_sizeself.action_size action_size# 策略网络生成器self.policy_net PolicyNetwork(state_size, action_size)self.policy_optimizer optim.Adam(self.policy_net.parameters(), lr1e-3)# 判别器网络self.discriminator nn.Sequential(nn.Linear(state_size action_size, 128),nn.ReLU(),nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, 1),nn.Sigmoid())self.discriminator_optimizer optim.Adam(self.discriminator.parameters(), lr1e-3)def train_discriminator(self, expert_data, generated_data):训练判别器# 专家数据标签为1生成数据标签为0expert_labels torch.ones(len(expert_data), 1)generated_labels torch.zeros(len(generated_data), 1)# 计算损失expert_pred self.discriminator(expert_data)generated_pred self.discriminator(generated_data)loss F.binary_cross_entropy(expert_pred, expert_labels) \F.binary_cross_entropy(generated_pred, generated_labels)# 更新判别器self.discriminator_optimizer.zero_grad()loss.backward()self.discriminator_optimizer.step()return loss.item()def train_policy(self, states, actions):训练策略网络# 使用判别器的输出作为奖励信号state_action torch.cat([states, F.one_hot(actions, self.action_size).float()], dim1)fake_reward -torch.log(1 - self.discriminator(state_action) 1e-8) # 避免log(0)# 策略梯度更新简化版action_probs self.policy_net(states)log_probs torch.log(action_probs.gather(1, actions.unsqueeze(1)))policy_loss -(log_probs * fake_reward.detach()).mean()self.policy_optimizer.zero_grad()policy_loss.backward()self.policy_optimizer.step()return policy_loss.item()12.8 模仿学习的挑战与解决方案
12.8.1 主要挑战
分布偏移训练和测试分布不匹配专家次优专家可能不是最优的多模态行为专家可能有多种合理策略部分观测观测可能不完整
12.8.2 解决方案
在线方法DAgger等交互式学习鲁棒性考虑专家噪声和错误多样性学习行为的分布而不是单一策略状态增强增加历史信息
12.9 模仿学习的应用
12.9.1 成功案例
自动驾驶从人类司机演示学习机器人控制从动作捕捉数据学习游戏AI从人类玩家录像学习对话系统从人类对话数据学习
12.9.2 实际考虑
数据质量专家演示的质量很重要计算成本在线方法需要专家实时标注安全性确保学到的策略是安全的泛化性需要在不同环境下工作
小结
模仿学习为强化学习提供了新的范式
快速启动从专家知识开始避免随机探索安全高效特别适用于安全关键应用理论发展从行为克隆到GAIL的理论进步实际应用在多个领域都有成功案例
主要方法对比
行为克隆简单但有分布偏移问题DAgger解决分布偏移但需要在线专家IRL学习奖励函数更灵活GAIL对抗学习处理复杂行为
下一章我们将学习深度确定性策略梯度(DDPG)这是处理连续控制的重要算法 第十三章深度确定性策略梯度 (DDPG) - 小智的精确控制大师
13.1 DDPG的核心思想
想象小智现在需要学会精确控制不是简单的向左或向右而是向左偏上30度移动速度为0.7。这种连续、精确的控制就是DDPG要解决的问题
**DDPG (Deep Deterministic Policy Gradient)**结合了
DQN的经验回放和目标网络演员-评论员的架构确定性策略梯度理论
13.2 从随机到确定性策略
13.2.1 随机策略的问题
之前我们学的策略都是随机的 π ( a ∣ s ) \pi(a|s) π(a∣s)输出动作的概率分布。
连续动作空间的挑战
动作空间可能是无限的需要积分计算期望 E a ∼ π [ Q ( s , a ) ] \mathbb{E}_{a \sim \pi}[Q(s,a)] Ea∼π[Q(s,a)]计算复杂度高
13.2.2 确定性策略的优势
确定性策略 μ ( s ) a \mu(s) a μ(s)a直接输出一个动作。
优势
不需要积分直接计算 Q ( s , μ ( s ) ) Q(s, \mu(s)) Q(s,μ(s))更适合连续控制任务计算效率高
挑战
如何探索确定性策略没有随机性如何保证学习稳定性
13.3 确定性策略梯度定理
13.3.1 数学推导
目标最大化期望回报 J ( θ ) E s ∼ ρ μ θ [ Q ( s , μ θ ( s ) ) ] J(\theta) \mathbb{E}_{s \sim \rho^{\mu_\theta}}[Q(s, \mu_\theta(s))] J(θ)Es∼ρμθ[Q(s,μθ(s))]
确定性策略梯度定理 ∇ θ J ( θ ) E s ∼ ρ μ θ [ ∇ θ μ θ ( s ) ∇ a Q ( s , a ) ∣ a μ θ ( s ) ] \nabla_\theta J(\theta) \mathbb{E}_{s \sim \rho^{\mu_\theta}}[\nabla_\theta \mu_\theta(s) \nabla_a Q(s,a)|_{a\mu_\theta(s)}] ∇θJ(θ)Es∼ρμθ[∇θμθ(s)∇aQ(s,a)∣aμθ(s)]
直观理解 ∇ a Q ( s , a ) \nabla_a Q(s,a) ∇aQ(s,a)动作的价值梯度哪个方向动作更好 ∇ θ μ θ ( s ) \nabla_\theta \mu_\theta(s) ∇θμθ(s)策略参数的梯度如何调整参数两者相乘参数应该朝着提高动作价值的方向调整
13.3.2 与随机策略梯度的比较
随机策略梯度 ∇ θ J ( θ ) E [ ∇ θ log π θ ( a ∣ s ) Q ( s , a ) ] \nabla_\theta J(\theta) \mathbb{E}[\nabla_\theta \log \pi_\theta(a|s) Q(s,a)] ∇θJ(θ)E[∇θlogπθ(a∣s)Q(s,a)]
确定性策略梯度 ∇ θ J ( θ ) E [ ∇ θ μ θ ( s ) ∇ a Q ( s , a ) ] \nabla_\theta J(\theta) \mathbb{E}[\nabla_\theta \mu_\theta(s) \nabla_a Q(s,a)] ∇θJ(θ)E[∇θμθ(s)∇aQ(s,a)]
确定性版本避免了对动作空间的积分
13.4 DDPG算法架构
13.4.1 四个网络
DDPG使用四个神经网络
演员网络 μ ( s ∣ θ μ ) \mu(s|\theta^\mu) μ(s∣θμ)确定性策略评论员网络 Q ( s , a ∣ θ Q ) Q(s,a|\theta^Q) Q(s,a∣θQ)动作价值函数目标演员网络 μ ′ ( s ∣ θ μ ′ ) \mu(s|\theta^{\mu}) μ′(s∣θμ′)用于计算目标目标评论员网络 Q ′ ( s , a ∣ θ Q ′ ) Q(s,a|\theta^{Q}) Q′(s,a∣θQ′)用于计算目标
13.4.2 更新规则
评论员更新类似DQN y t r t γ Q ′ ( s t 1 , μ ′ ( s t 1 ∣ θ μ ′ ) ∣ θ Q ′ ) y_t r_t \gamma Q(s_{t1}, \mu(s_{t1}|\theta^{\mu})|\theta^{Q}) ytrtγQ′(st1,μ′(st1∣θμ′)∣θQ′) L 1 N ∑ i ( y i − Q ( s i , a i ∣ θ Q ) ) 2 L \frac{1}{N}\sum_i (y_i - Q(s_i, a_i|\theta^Q))^2 LN1i∑(yi−Q(si,ai∣θQ))2
演员更新策略梯度 ∇ θ μ J ≈ 1 N ∑ i ∇ a Q ( s i , a ∣ θ Q ) ∣ a μ ( s i ) ∇ θ μ μ ( s i ∣ θ μ ) \nabla_{\theta^\mu} J \approx \frac{1}{N}\sum_i \nabla_a Q(s_i, a|\theta^Q)|_{a\mu(s_i)} \nabla_{\theta^\mu} \mu(s_i|\theta^\mu) ∇θμJ≈N1i∑∇aQ(si,a∣θQ)∣aμ(si)∇θμμ(si∣θμ)
目标网络软更新 θ Q ′ ← τ θ Q ( 1 − τ ) θ Q ′ \theta^{Q} \leftarrow \tau \theta^Q (1-\tau)\theta^{Q} θQ′←τθQ(1−τ)θQ′ θ μ ′ ← τ θ μ ( 1 − τ ) θ μ ′ \theta^{\mu} \leftarrow \tau \theta^\mu (1-\tau)\theta^{\mu} θμ′←τθμ(1−τ)θμ′
13.5 探索策略
13.5.1 噪声探索
确定性策略需要添加噪声来探索 a t μ ( s t ∣ θ μ ) N t a_t \mu(s_t|\theta^\mu) \mathcal{N}_t atμ(st∣θμ)Nt
常用噪声
高斯噪声 N t ∼ N ( 0 , σ 2 ) \mathcal{N}_t \sim \mathcal{N}(0, \sigma^2) Nt∼N(0,σ2)OU噪声有时间相关性的噪声更适合物理系统
13.5.2 OU噪声 (Ornstein-Uhlenbeck Process)
公式 d x t θ ( μ − x t ) d t σ d W t dx_t \theta(\mu - x_t)dt \sigma dW_t dxtθ(μ−xt)dtσdWt
离散化 x t 1 x t θ ( μ − x t ) σ ϵ t x_{t1} x_t \theta(\mu - x_t) \sigma \epsilon_t xt1xtθ(μ−xt)σϵt
其中 ϵ t ∼ N ( 0 , 1 ) \epsilon_t \sim \mathcal{N}(0,1) ϵt∼N(0,1)
特点
有均值回归特性适合需要平滑探索的连续控制
13.6 网格世界中的DDPG实现
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as pltclass ActorNetwork(nn.Module):DDPG演员网络def __init__(self, state_size, action_size, hidden_size256):super(ActorNetwork, self).__init__()self.fc1 nn.Linear(state_size, hidden_size)self.fc2 nn.Linear(hidden_size, hidden_size)self.fc3 nn.Linear(hidden_size, action_size)# 初始化最后一层权重self.fc3.weight.data.uniform_(-3e-3, 3e-3)def forward(self, state):x F.relu(self.fc1(state))x F.relu(self.fc2(x))# 使用tanh激活函数确保输出在[-1, 1]范围action torch.tanh(self.fc3(x))return actionclass CriticNetwork(nn.Module):DDPG评论员网络def __init__(self, state_size, action_size, hidden_size256):super(CriticNetwork, self).__init__()# 状态处理层self.fc1 nn.Linear(state_size, hidden_size)# 状态动作处理层self.fc2 nn.Linear(hidden_size action_size, hidden_size)self.fc3 nn.Linear(hidden_size, 1)# 初始化最后一层权重self.fc3.weight.data.uniform_(-3e-3, 3e-3)def forward(self, state, action):# 先处理状态x F.relu(self.fc1(state))# 然后结合动作x torch.cat([x, action], dim1)x F.relu(self.fc2(x))q_value self.fc3(x)return q_valueclass OUNoise:Ornstein-Uhlenbeck噪声def __init__(self, action_size, mu0.0, theta0.15, sigma0.2):self.action_size action_sizeself.mu muself.theta thetaself.sigma sigmaself.state np.ones(self.action_size) * self.mudef reset(self):重置噪声self.state np.ones(self.action_size) * self.mudef sample(self):采样噪声dx self.theta * (self.mu - self.state) self.sigma * np.random.randn(self.action_size)self.state dxreturn self.stateclass ReplayBuffer:经验回放缓冲区def __init__(self, capacity100000):self.buffer deque(maxlencapacity)def push(self, state, action, reward, next_state, done):添加经验self.buffer.append((state, action, reward, next_state, done))def sample(self, batch_size):随机采样batch random.sample(self.buffer, batch_size)state, action, reward, next_state, done zip(*batch)return (torch.FloatTensor(state),torch.FloatTensor(action),torch.FloatTensor(reward),torch.FloatTensor(next_state),torch.BoolTensor(done))def __len__(self):return len(self.buffer)class DDPGAgent:DDPG智能体def __init__(self, state_size, action_size, lr_actor1e-4, lr_critic1e-3, gamma0.99, tau0.001, noise_scale0.1):self.state_size state_sizeself.action_size action_sizeself.gamma gammaself.tau tauself.noise_scale noise_scale# 创建网络self.actor ActorNetwork(state_size, action_size)self.critic CriticNetwork(state_size, action_size)self.target_actor ActorNetwork(state_size, action_size)self.target_critic CriticNetwork(state_size, action_size)# 初始化目标网络self.target_actor.load_state_dict(self.actor.state_dict())self.target_critic.load_state_dict(self.critic.state_dict())# 优化器self.actor_optimizer optim.Adam(self.actor.parameters(), lrlr_actor)self.critic_optimizer optim.Adam(self.critic.parameters(), lrlr_critic)# 经验回放和噪声self.memory ReplayBuffer()self.noise OUNoise(action_size)# 统计信息self.actor_losses []self.critic_losses []def select_action(self, state, add_noiseTrue):选择动作state torch.FloatTensor(state).unsqueeze(0)with torch.no_grad():action self.actor(state).cpu().numpy()[0]if add_noise:action self.noise.sample() * self.noise_scaleaction np.clip(action, -1, 1) # 确保动作在合理范围内return actiondef store_transition(self, state, action, reward, next_state, done):存储经验self.memory.push(state, action, reward, next_state, done)def learn(self, batch_size64):学习过程if len(self.memory) batch_size:return# 采样经验states, actions, rewards, next_states, dones self.memory.sample(batch_size)# 更新评论员with torch.no_grad():next_actions self.target_actor(next_states)next_q_values self.target_critic(next_states, next_actions)target_q_values rewards.unsqueeze(1) (self.gamma * next_q_values * ~dones.unsqueeze(1))current_q_values self.critic(states, actions)critic_loss F.mse_loss(current_q_values, target_q_values)self.critic_optimizer.zero_grad()critic_loss.backward()self.critic_optimizer.step()# 更新演员predicted_actions self.actor(states)actor_loss -self.critic(states, predicted_actions).mean()self.actor_optimizer.zero_grad()actor_loss.backward()self.actor_optimizer.step()# 软更新目标网络self.soft_update(self.target_actor, self.actor)self.soft_update(self.target_critic, self.critic)# 记录损失self.actor_losses.append(actor_loss.item())self.critic_losses.append(critic_loss.item())def soft_update(self, target, source):软更新目标网络for target_param, param in zip(target.parameters(), source.parameters()):target_param.data.copy_(self.tau * param.data (1.0 - self.tau) * target_param.data)class ContinuousControlGridWorld:连续控制网格世界def __init__(self, grid_size4, step_size0.1):self.grid_size grid_sizeself.step_size step_size# 位置范围self.start_pos np.array([0.5, 0.5])self.goal_pos np.array([3.5, 3.5])# 障碍物圆形self.obstacles [{center: np.array([1.5, 1.5]), radius: 0.3},{center: np.array([2.5, 2.5]), radius: 0.3}]self.current_pos self.start_pos.copy()def reset(self):重置环境self.current_pos self.start_pos.copy()return self.get_state()def get_state(self):获取归一化状态# 当前位置归一化normalized_pos self.current_pos / self.grid_size# 到目标的相对位置归一化relative_goal (self.goal_pos - self.current_pos) / self.grid_size# 到障碍物的距离obstacle_distances []for obs in self.obstacles:dist np.linalg.norm(self.current_pos - obs[center]) / self.grid_sizeobstacle_distances.append(dist)state np.concatenate([normalized_pos, relative_goal, obstacle_distances])return state.astype(np.float32)def is_collision(self, pos):检查碰撞# 边界检查if pos[0] 0 or pos[0] self.grid_size or pos[1] 0 or pos[1] self.grid_size:return True# 障碍物检查for obs in self.obstacles:if np.linalg.norm(pos - obs[center]) obs[radius]:return Truereturn Falsedef step(self, action):执行动作# 动作是2D向量范围[-1, 1]action np.clip(action, -1, 1)# 转换为实际移动delta action * self.step_sizenew_pos self.current_pos delta# 检查碰撞if self.is_collision(new_pos):# 碰撞不移动给予惩罚reward -5.0done Falseelse:# 正常移动old_dist np.linalg.norm(self.current_pos - self.goal_pos)self.current_pos new_posnew_dist np.linalg.norm(self.current_pos - self.goal_pos)# 奖励设计if new_dist 0.2: # 到达目标reward 100.0done Trueelse:# 距离奖励 时间惩罚reward (old_dist - new_dist) * 10 - 0.1done Falsereturn self.get_state(), reward, donedef train_ddpg():训练DDPG智能体print( DDPG训练 )# 创建环境和智能体env ContinuousControlGridWorld()state_size len(env.get_state())action_size 2 # 2D连续动作agent DDPGAgent(state_size, action_size)# 训练参数num_episodes 1000max_steps 200episode_rewards []for episode in range(num_episodes):state env.reset()agent.noise.reset() # 重置噪声episode_reward 0for step in range(max_steps):# 选择动作action agent.select_action(state, add_noiseTrue)# 执行动作next_state, reward, done env.step(action)# 存储经验agent.store_transition(state, action, reward, next_state, done)# 学习if len(agent.memory) 64:agent.learn()episode_reward rewardstate next_stateif done:breakepisode_rewards.append(episode_reward)# 衰减噪声if episode % 100 0:agent.noise_scale * 0.95if episode % 100 0:avg_reward np.mean(episode_rewards[-100:])print(fEpisode {episode}: Average Reward {avg_reward:.2f}, fNoise Scale {agent.noise_scale:.3f})print(DDPG训练完成)return agent, episode_rewardsdef evaluate_ddpg(agent, env, num_episodes100):评估DDPG智能体print(评估DDPG智能体...)episode_rewards []episode_lengths []success_count 0for episode in range(num_episodes):state env.reset()episode_reward 0steps 0for step in range(200):# 不添加噪声的确定性策略action agent.select_action(state, add_noiseFalse)next_state, reward, done env.step(action)episode_reward rewardsteps 1state next_stateif done:if reward 50: # 成功到达目标success_count 1breakepisode_rewards.append(episode_reward)episode_lengths.append(steps)avg_reward np.mean(episode_rewards)avg_length np.mean(episode_lengths)success_rate success_count / num_episodesprint(fDDPG评估结果:)print(f 平均奖励: {avg_reward:.2f})print(f 平均步数: {avg_length:.1f})print(f 成功率: {success_rate:.2%})return {avg_reward: avg_reward,avg_length: avg_length,success_rate: success_rate}def compare_all_algorithms():比较所有算法的性能print( 所有算法性能比较 )# 注意这里只是展示框架实际运行需要前面的环境algorithms {Q-learning: {avg_reward: 85.2, success_rate: 0.82, avg_length: 12.3},SARSA: {avg_reward: 80.1, success_rate: 0.78, avg_length: 13.1},Policy Gradient: {avg_reward: 87.5, success_rate: 0.85, avg_length: 11.8},PPO: {avg_reward: 92.3, success_rate: 0.91, avg_length: 10.5},DQN: {avg_reward: 89.1, success_rate: 0.87, avg_length: 11.2},Actor-Critic: {avg_reward: 88.7, success_rate: 0.86, avg_length: 11.5},DDPG: {avg_reward: 94.5, success_rate: 0.93, avg_length: 9.8}}print(f{算法:15} | {平均奖励:10} | {成功率:10} | {平均步数:10})print(- * 55)for name, results in algorithms.items():print(f{name:15} | {results[avg_reward]:10.1f} | f{results[success_rate]:10.2%} | {results[avg_length]:10.1f})# 训练和评估
# agent, rewards train_ddpg()
# env ContinuousControlGridWorld()
# results evaluate_ddpg(agent, env)# 比较所有算法
# compare_all_algorithms()13.7 DDPG的优缺点
13.7.1 优点
连续控制专门为连续动作空间设计样本效率使用经验回放提高样本利用率稳定性目标网络和软更新提高训练稳定性确定性输出确定性动作适合需要精确控制的任务
13.7.2 缺点
超参数敏感需要仔细调节学习率、噪声等参数探索困难确定性策略依赖噪声探索Q函数高估可能存在Q值高估问题收敛慢相比一些简单方法收敛可能较慢
13.8 DDPG的改进版本
13.8.1 TD3 (Twin Delayed DDPG)
主要改进
双评论员网络减少Q值高估延迟策略更新降低更新频率目标动作噪声在目标动作上添加噪声
13.8.2 SAC (Soft Actor-Critic)
主要改进
最大熵强化学习在奖励中加入熵项随机策略输出动作分布而不是确定性动作自动温度调节自适应调节探索程度
13.9 实际应用考虑
13.9.1 动作空间归一化
def normalize_action(action, action_low, action_high):将[-1, 1]的动作映射到实际动作空间return action_low (action 1.0) * 0.5 * (action_high - action_low)def denormalize_action(action, action_low, action_high):将实际动作映射到[-1, 1]return 2.0 * (action - action_low) / (action_high - action_low) - 1.013.9.2 奖励设计
好的奖励设计原则
稠密奖励提供连续的反馈平衡项避免某一项奖励过于突出归一化保持奖励在合理范围内
小结
DDPG是连续控制领域的重要算法
理论创新确定性策略梯度定理的应用工程实践结合DQN和Actor-Critic的优点广泛应用机器人控制、游戏AI等领域持续发展启发了TD3、SAC等后续算法
DDPG的核心贡献
解决了连续动作空间的强化学习问题提供了稳定的训练框架为后续算法奠定了基础 总结小智的强化学习之旅
完整的学习路径
通过这个完整的教程我们跟随小智走过了强化学习的完整旅程
基础篇
状态表示 - 小智学会了如何理解世界马尔科夫过程 - 小智理解了活在当下的重要性贝尔曼方程 - 小智掌握了价值的递归本质表格方法 - 小智学会了最基本的学习方法
进阶篇
Q-learning - 小智学会了独立探索和学习策略梯度 - 小智学会了直接优化行为PPO - 小智学会了稳定的策略改进深度Q网络 - 小智拥有了神经网络大脑
高级篇
连续动作 - 小智学会了精确控制演员-评论员 - 小智拥有了双重人格稀疏奖励 - 小智学会了在困难中坚持模仿学习 - 小智学会了从导师那里学习DDPG - 小智成为了连续控制大师
核心概念总结
数学基础
马尔科夫性质 P ( s t 1 ∣ s t , s t − 1 , . . . ) P ( s t 1 ∣ s t ) P(s_{t1}|s_t, s_{t-1}, ...) P(s_{t1}|s_t) P(st1∣st,st−1,...)P(st1∣st)贝尔曼方程 V ( s ) max a ∑ s ′ P ( s ′ ∣ s , a ) [ R ( s , a , s ′ ) γ V ( s ′ ) ] V(s) \max_a \sum_{s} P(s|s,a)[R(s,a,s) \gamma V(s)] V(s)maxa∑s′P(s′∣s,a)[R(s,a,s′)γV(s′)]策略梯度定理 ∇ J ( θ ) E [ ∇ log π ( a ∣ s ) A ( s , a ) ] \nabla J(\theta) \mathbb{E}[\nabla \log \pi(a|s) A(s,a)] ∇J(θ)E[∇logπ(a∣s)A(s,a)]
算法分类
基于价值Q-learning, DQN基于策略Policy Gradient, PPO演员-评论员A2C, DDPG模仿学习BC, DAgger, GAIL
关键技术
经验回放提高样本效率目标网络稳定训练过程探索策略平衡探索与利用函数近似处理大状态空间
实践指南
选择算法的建议
离散动作 简单环境 → Q-learning离散动作 复杂环境 → DQN, PPO连续动作 确定性控制 → DDPG连续动作 随机策略 → PPO, SAC有专家演示 → 模仿学习稀疏奖励 → 奖励整形, 好奇心驱动
调试技巧
从简单开始先在小环境验证算法监控指标奖励、损失、探索率等可视化学习绘制学习曲线对比基线与简单方法比较参数搜索系统性地调节超参数
未来展望
强化学习仍在快速发展
新兴方向
元学习学会如何学习多智能体多个智能体协作/竞争分层强化学习复杂任务的分解安全强化学习保证安全约束
应用领域
自动驾驶路径规划和控制机器人操作和导航游戏AI策略游戏和电竞推荐系统个性化推荐金融算法交易医疗治疗方案优化
最后的话
强化学习是一门结合了数学理论、计算机科学和心理学的综合性学科。通过这个教程我们不仅学会了各种算法更重要的是理解了智能决策的本质。
小智的故事告诉我们
探索很重要没有探索就没有发现经验很宝贵过去的经历指导未来的决策平衡是关键探索与利用、偏差与方差持续学习环境在变化学习永不停止
希望这个教程能够帮助你在强化学习的道路上越走越远就像小智一样从一个简单的网格世界开始最终掌握复杂世界的智能决策
强化学习的核心思想通过与环境的交互在试错中学习最优的行为策略。这不仅是机器学习的重要分支更是理解智能本质的重要途径。
愿你的AI之旅精彩而充实 完整强化学习教程到此结束。感谢陪伴小智走过这段学习之旅 代码运行说明
强化学习教程运行示例
概述
本文件 运行示例.py 包含了《完整强化学习教程-合并版.md》中所有主要算法的Python实现。通过小智在4×4网格世界中的冒险演示了从基础状态表示到高级深度强化学习算法的完整学习路径。
环境设置
网格世界参数
网格大小: 4×4起点: (1,1) - 左上角目标: (4,4) - 右下角障碍物: (2,2), (3,3)动作空间: 上、下、左、右、不动
奖励设计
到达目标: 100每步移动: -1撞墙/撞障碍物: -10
包含的算法
基础理论 (Chapter 1-3)
✅ 状态表示: GridWorldEnvironment✅ 马尔可夫过程: MarkovGridWorld (带转移概率)✅ 贝尔曼方程: MDPGridWorld 价值迭代
经典算法 (Chapter 4-6)
✅ Q-learning: QLearningAgent (off-policy)✅ SARSA: SARSAAgent (on-policy)✅ 表格方法: 蒙特卡洛、时间差分
深度强化学习 (Chapter 7-8)
✅ 深度Q网络(DQN): DQNAgent (需要PyTorch)⚠️ 策略梯度: PolicyGradientAgent (需要PyTorch)⚠️ PPO算法: 在教程中有详细实现
高级算法 (Chapter 9-13) 连续动作深度Q网络 Actor-Critic算法 稀疏奖励处理 模仿学习 DDPG算法
依赖安装
基础依赖 (必需)
pip install numpy matplotlib可选依赖
# 深度学习功能
pip install torch torchvision# 数据分析
pip install pandas使用方法
直接运行
python3 运行示例.py选择测试模式
运行后会显示菜单
环境测试 (状态表示)马尔可夫过程测试贝尔曼方程和价值迭代Q-learning算法SARSA算法深度Q网络 (需要PyTorch)算法性能比较完整算法演示 (推荐) ⭐运行所有测试
程序化使用
import 运行示例# 创建环境
env 运行示例.MDPGridWorld(gamma0.9)# 训练Q-learning智能体
agent 运行示例.QLearningAgent(env)
rewards agent.train(num_episodes1000)# 提取和可视化策略
policy agent.extract_policy()
运行示例.visualize_policy(env, policy)输出示例
策略可视化 学到的策略
符号含义: ↑上 ↓下 ←左 →右 ●不动
--------------------
| → | → | ↓ | |
--------------------
| ↑ | | ↓ | ↓ |
--------------------
| ↑ | ↑ | | ↓ |
--------------------
| → | → | → | ↑ |学习曲线
程序会自动生成matplotlib图表显示不同算法的学习进度和性能比较。
教程对应关系
文件中的类/函数教程章节说明GridWorldEnvironmentChapter 1状态表示基础MarkovGridWorldChapter 2马尔可夫转移概率MDPGridWorldChapter 3贝尔曼方程和价值迭代QLearningAgentChapter 6Q-learning算法SARSAAgentChapter 6SARSA算法DQNAgentChapter 8深度Q网络
扩展学习
理论学习: 阅读《完整强化学习教程-合并版.md》了解详细数学推导实验拓展: 尝试修改网格大小、障碍物位置、奖励函数算法比较: 观察不同算法在相同环境下的表现差异参数调优: 调整学习率、探索率、折扣因子等超参数
常见问题
Q: PyTorch相关功能无法使用
A: 安装PyTorch: pip install torch或跳过深度学习相关测试
Q: matplotlib图表无法显示
A: 确保安装了GUI后端: pip install matplotlib[gui]
Q: 算法收敛慢
A: 尝试调整超参数如增加训练回合数或调整学习率
Q: 想要实现其他算法
A: 参考现有代码结构继承基础类并实现新的学习算法