- 符号
T
: 预测轨迹长度
eval_policy.py
调用deploy_policy.py:eval()
,其调用RobotRunner.get_action()
RobotRunner.get_action()
(robot_runner.py)obs = self.get_n_steps_obs()
- obs <- update_obs() 就是 append <- Base_task.get_obs()
- [ai]
- observation - 包含来自各种相机的观察数据
- 相机数据包括
head_camera
,left_camera
,right_camera
,front_camera
等 - 每个相机可以包含以下数据(取决于
data_type
设置):rgb
- RGB图像数据mesh_segmentation
- 网格分割数据actor_segmentation
- 实体分割数据depth
- 深度图像数据- 相机内参和外参矩阵
- 相机数据包括
- pointcloud - 点云数据
- 如果
data_type['pointcloud']
为 True,则包含点云数据 - 可以选择是否合并多个相机的点云数据
- 如果
- joint_action - 机器人关节状态
- 如果
data_type['qpos']
为 True,包含机器人关节角度 - 双臂模式下,包含左臂和右臂的关节角度
- 单臂模式下,仅包含右臂的关节角度
- 如果
- endpose - 机器人末端执行器姿态
- 如果
data_type['endpose']
为 True,包含末端执行器的位置和姿态 - 双臂模式下,包含左右两个末端执行器的信息(位置x,y,z,欧拉角roll,pitch,yaw,以及夹爪状态)
- 单臂模式下,仅包含右臂末端执行器信息
- 如果
- vision_tactile - 视觉触觉传感器数据(当
TACTILE_ON
为 True 时)- 如果
data_type['vision_tactile']
为 True,包含触觉传感器的RGB图像数据
- 如果
- 随后拿出两个数据并重命名: pointcloud -> point_cloud, joint_action -> agent_pos
- 得到 obs:
Dict
- each_key => 将最近 n 个观测的 key 在第 0 维度拼接. 形状为
(n_steps, ) + shape_of_the_value
- n_steps 在参数 yaml 里为 n_obs_steps = 3
- each_key => 将最近 n 个观测的 key 在第 0 维度拼接. 形状为
- 在前面 unsqueeze 一个长度为 1 的维度后送进
DP3.predict_action()
(应该是因为推理的时候 batch 必是 1)
- obs <- update_obs() 就是 append <- Base_task.get_obs()
1class DP3:2 predict_action() and (dp3.py)3 [arg] (obs_dict):4 'point_cloud': (1, 3, 1024, 6)5 'agent_pos': (3, 14) 就是关节角度6
7 normalize()8 if @?global_cond:9 point_cloud & agent_pos 都送入 DP3Encoder,得到 (3, 192),压扁成 (1, 576)10 mask 就是全部 mask 掉, 所有动作都需要通过扩散模型生成11 else:12 mask 观察特征保持可见13 送入 self.condition_sapmle()14 return. 实测表明一次预测 6 步且会把这 6 步执行完,再预测下 6 步.15
11 collapsed lines
16 ...17 action_pred: (B, T, action_dim) e.g. (1, 8, 14)18
19 start = To - 120 end = start + self.n_action_steps21 action = action_pred[:, start:end] e.g. (1, 6, 14)22
23 condition_sample():24 生成的 traj shape 是 (B, T, action_dim) = (1, 8, 14)25 每个去噪步 model(sample=trajectory, timestep=t, local_cond=local_cond(必为 None), global_cond=global_cond)26 model is @ConditionalUnet1D.forward():
1[ConditionalUnet1D.forward()]:2 ...3
4 timestep: (形状 (B, ) or int)5 encoding: (SinusoidalPosEmb, Linear, Mish, Linear)6
7 if global_cond: global_feature = cat([timestep_embed, global_cond], axis=-1) }8
9 [Downsample]10 x = sample (生成 trajactory)11 for idx, (@resnet, @resnet2, @downsample) in enumerate(self.down_modules):12 if self.use_down_condition:13 x = resnet(x, global_feature)14 if idx == 0 and len(h_local) > 0:15 x = x + h_local[0]39 collapsed lines
16 x = resnet2(x, global_feature)17 else:18 x = resnet(x)19 if idx == 0 and len(h_local) > 0:20 x = x + h_local[0]21 x = resnet2(x)22 h.append(x)23 x = downsample(x)24
25 [mid_module (@ConditionalResidualBlock1D)]26 ...27
28 [Upsample]29 对称的30
31 [return]32 x = self.final_conv(x)33 return x34
35[resnet] = @ConditionalResidualBlock1D(36 dim_in, dim_out, cond_dim=cond_dim,37 kernel_size=kernel_size, n_groups=n_groups,38 condition_type=condition_type39),40[resnet2] = ConditionalResidualBlock1D(41 dim_out, dim_out, cond_dim=cond_dim,42 kernel_size=kernel_size, n_groups=n_groups,43 condition_type=condition_type44),45[downsample] = Downsample1d(dim_out) if not is_last else nn.Identity()46
47[ConditionalResidualBlock1D].forward():48 out = Conv1dBlock() (x)49 if `cross_attention_add`50 embed = CrossAttention() (x)51 out = out + embed (是 tensor 值加)52 out = another Conv1dBlock() (x)53 out = out + self.residual_conv(x)54 return out
1DP3Encoder:2 'point_cloud' => B x N x 3的 点云 (B: batch) = (3, 1024, 3) 送入 PointNetEncoderXYZ:3
4class PointNetEncoderXYZ5 MLP: [Linear + LayerNorm + ReLU] x 36 channels: 3 => 64 => 128 => 256 => max => Linear + LayerNorm (128)7
8 forward():9 (B, N, 3) = (3, 1024, 3)10 mlp => (3, 1024, 256)11 max => (3, 256)12 Linear + LayerNorm => (3, 128)13 self.state_mlp: 简单的 MLP (Linear + ReLU). state_mlp_size = (64, 64).14 最后 cat 成 (3, 192)