The recorders part of MDP
Table of Contents
- 1. Core Functionality
- 2. Main Components
- 2.1 Recorder Types (recorders.py)
- 2.2 Configuration Options (recorders_cfg.py)
- 3. Usage Steps
- First, configure go2_with_recorder_cfg.py
- Then, write go2_policy_obs_recorder_example.py
- 4. Usage Documentation
- Go2 Robot Policy Observations Recorder User Guide
- File Structure
- Quick Start
- Configuration Options
- Recorder Configuration Types
- Environment Configuration Types
- Custom Configuration
- Creating a Custom Recorder Configuration
- Using the Custom Configuration in an Environment
- Data Format
- Data Reading Example
- Observation Data Contents
The recorders folder is an important component of Isaac Lab for data recording and dataset generation. Its main purpose is to record various kinds of environment data during reinforcement learning training for later analysis, debugging, and dataset construction.
1. Core Functionality
Data recording system:
- Records environment state, action, and observation data at different stages
- Supports recording before/after environment resets and before/after steps
- Automatically manages data collection, storage, and export
A minimal sketch of turning recording on in an environment configuration follows this list.
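The sketch below assumes the import paths used elsewhere in this article and the ActionStateRecorderManagerCfg shown in section 2.2; the task id and output path are placeholders, so treat this as an illustration rather than a definitive recipe.

import gymnasium as gym

from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
from isaaclab_tasks.manager_based.locomotion.velocity.config.go2.rough_env_cfg import UnitreeGo2RoughEnvCfg

# Attach the bundled recorder manager configuration to an existing environment config.
env_cfg = UnitreeGo2RoughEnvCfg()
env_cfg.recorders = ActionStateRecorderManagerCfg()
# Where the HDF5 dataset will be written (placeholder path and filename).
env_cfg.recorders.dataset_export_dir_path = "/tmp/isaaclab/recorder_demo"
env_cfg.recorders.dataset_filename = "recorder_demo"

# Stepping this environment now records states, actions, and policy observations automatically.
env = gym.make("Isaac-Velocity-Rough-Unitree-Go2-v0", cfg=env_cfg)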
2. Main Components
2.1 Recorder Types (recorders.py)
# Initial state recorder - records the initial state after the environment resets
class InitialStateRecorder(RecorderTerm):
    def record_post_reset(self, env_ids):
        return "initial_state", self._env.scene.get_state(is_relative=True)

# Post-step state recorder - records the environment state at the end of each step
class PostStepStatesRecorder(RecorderTerm):
    def record_post_step(self):
        return "states", self._env.scene.get_state(is_relative=True)

# Pre-step action recorder - records the actions at the start of each step
class PreStepActionsRecorder(RecorderTerm):
    def record_pre_step(self):
        return "actions", self._env.action_manager.action

# Policy observations recorder - records the observation data of the policy network
class PreStepFlatPolicyObservationsRecorder(RecorderTerm):
    def record_pre_step(self):
        return "obs", self._env.obs_buf["policy"]
Recorders hook into four key moments of the environment life cycle:
- Pre-reset recording (record_pre_reset): records data before the environment resets, typically used to export the completed episode
- Post-reset recording (record_post_reset): records the initial state after the environment resets
- Pre-step recording (record_pre_step): records actions and observations at the start of each step
- Post-step recording (record_post_step): records states and results at the end of each step
A custom recorder term that follows this pattern is sketched below.
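As a concrete illustration, here is a hedged sketch of a custom term that hooks the post-step stage and records the per-step reward buffer. The names PostStepRewardsRecorder and PostStepRewardsRecorderCfg are made up for this example; the RecorderTerm/RecorderTermCfg imports and the reward_buf attribute are assumptions based on the pattern in recorders.py and recorders_cfg.py.

from isaaclab.managers import RecorderTerm, RecorderTermCfg
from isaaclab.utils import configclass


class PostStepRewardsRecorder(RecorderTerm):
    """Custom term that records the reward buffer at the end of every step."""

    def record_post_step(self):
        # Returning a (key, value) pair lets the recorder manager append the
        # tensor to the current episode under the "rewards" key.
        return "rewards", self._env.reward_buf


@configclass
class PostStepRewardsRecorderCfg(RecorderTermCfg):
    """Configuration for the custom reward recorder term."""

    class_type: type[RecorderTerm] = PostStepRewardsRecorder

Adding an instance of this cfg as an attribute of a RecorderManagerBaseCfg subclass (like the configurations in the next subsection) should be enough to activate it.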
2.2 Configuration Options (recorders_cfg.py)
@configclass
class ActionStateRecorderManagerCfg(RecorderManagerBaseCfg):
    """Recorder manager configuration for actions and states."""

    record_initial_state = InitialStateRecorderCfg()
    record_post_step_states = PostStepStatesRecorderCfg()
    record_pre_step_actions = PreStepActionsRecorderCfg()
    record_pre_step_flat_policy_observations = PreStepFlatPolicyObservationsRecorderCfg()
3. Usage Steps
First, configure go2_with_recorder_cfg.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Configuration for the Go2 robot environment with a policy observations recorder."""

from isaaclab.utils import configclass
from isaaclab.managers import RecorderManagerBaseCfg, DatasetExportMode
from isaaclab.envs.mdp.recorders import PreStepFlatPolicyObservationsRecorderCfg, PreStepActionsRecorderCfg

from .rough_env_cfg import UnitreeGo2RoughEnvCfg


@configclass
class Go2PolicyObsRecorderManagerCfg(RecorderManagerBaseCfg):
    """Policy observations recorder configuration for the Go2 robot."""

    # Record policy observations
    record_policy_observations = PreStepFlatPolicyObservationsRecorderCfg()
    # Also record actions for analysis
    record_actions = PreStepActionsRecorderCfg()

    # Data export configuration
    dataset_export_dir_path: str = "/tmp/isaaclab/go2_policy_data"
    dataset_filename: str = "go2_policy_observations"
    dataset_export_mode: DatasetExportMode = DatasetExportMode.EXPORT_ALL
    export_in_record_pre_reset: bool = True


@configclass
class UnitreeGo2RoughEnvCfg_WithRecorder(UnitreeGo2RoughEnvCfg):
    """Go2 rough-terrain environment configuration with a policy observations recorder."""

    def __post_init__(self):
        # Call the parent's post-initialization
        super().__post_init__()
        # Set the recorder configuration
        self.recorders = Go2PolicyObsRecorderManagerCfg()
        # Reduce the number of environments for better data collection
        self.scene.num_envs = 64
        # Extend the episode length to collect more data
        self.episode_length_s = 30.0


@configclass
class UnitreeGo2RoughEnvCfg_WithRecorder_PLAY(UnitreeGo2RoughEnvCfg_WithRecorder):
    """Go2 recorder environment configuration for demonstration/testing."""

    def __post_init__(self):
        # Call the parent's post-initialization
        super().__post_init__()
        # Smaller scene for testing
        self.scene.num_envs = 16
        self.scene.env_spacing = 2.5
        # Disable randomization for consistent data
        self.observations.policy.enable_corruption = False
        # Remove random pushing events
        self.events.base_external_force_torque = None
        self.events.push_robot = None
        # Shorter episodes for quick testing
        self.episode_length_s = 15.0
        # Custom data export path
        self.recorders.dataset_export_dir_path = "/tmp/isaaclab/go2_test_data"
        self.recorders.dataset_filename = "go2_test_observations"


@configclass
class Go2HighPerformanceRecorderManagerCfg(RecorderManagerBaseCfg):
    """High-performance Go2 recorder configuration optimized for large-scale data collection."""

    # Only record policy observations to reduce data volume
    record_policy_observations = PreStepFlatPolicyObservationsRecorderCfg()

    # Use a fast storage path
    dataset_export_dir_path: str = "/fast_ssd/go2_data"
    dataset_filename: str = "go2_policy_obs_optimized"
    # Only export successful episodes to reduce data volume
    dataset_export_mode: DatasetExportMode = DatasetExportMode.EXPORT_SUCCEEDED_ONLY
    # Reduce I/O frequency
    export_in_record_pre_reset: bool = False


@configclass
class UnitreeGo2RoughEnvCfg_HighPerformance(UnitreeGo2RoughEnvCfg):
    """High-performance Go2 environment configuration for large-scale data collection."""

    def __post_init__(self):
        # Call the parent's post-initialization
        super().__post_init__()
        # Set the high-performance recorder
        self.recorders = Go2HighPerformanceRecorderManagerCfg()
        # Large-scale parallel environments
        self.scene.num_envs = 2048
        # Performance optimization settings
        self.sim.physx.gpu_max_rigid_patch_count = 20 * 2**15
Then, write go2_policy_obs_recorder_example.py
#!/usr/bin/env python3

# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""
Example script for using PreStepFlatPolicyObservationsRecorder with the Go2 robot.

Usage:
    python examples/go2_policy_obs_recorder_example.py --num_envs 16 --num_episodes 5

Data analysis:
    python examples/go2_policy_obs_recorder_example.py --analyze_data /tmp/isaaclab/go2_test_data/go2_test_observations.hdf5
"""

import argparse
import gymnasium as gym
import h5py
import numpy as np
import torch
import os
from datetime import datetime

# Isaac Lab imports
import isaaclab
from isaaclab.utils.dict import print_dict
from isaaclab.utils.io import dump_pickle, dump_yaml

# Import our Go2 configurations
from isaaclab_tasks.manager_based.locomotion.velocity.config.go2.go2_with_recorder_cfg import (
    UnitreeGo2RoughEnvCfg_WithRecorder_PLAY,
    UnitreeGo2RoughEnvCfg_WithRecorder,
    UnitreeGo2RoughEnvCfg_HighPerformance,
)


def create_go2_env_with_recorder(config_name: str = "play", num_envs: int = 16):
    """Create a Go2 environment with a recorder.

    Args:
        config_name: Configuration name ("play", "standard", "high_performance").
        num_envs: Number of environments.
    """
    # Select the configuration
    if config_name == "play":
        env_cfg = UnitreeGo2RoughEnvCfg_WithRecorder_PLAY()
    elif config_name == "standard":
        env_cfg = UnitreeGo2RoughEnvCfg_WithRecorder()
    elif config_name == "high_performance":
        env_cfg = UnitreeGo2RoughEnvCfg_HighPerformance()
    else:
        raise ValueError(f"Unknown configuration: {config_name}")

    # Set the number of environments
    env_cfg.scene.num_envs = num_envs

    # Create the environment
    env = gym.make("Isaac-Velocity-Rough-Unitree-Go2-v0", cfg=env_cfg)
    return env, env_cfg


def collect_go2_policy_observations(env, num_episodes: int = 5):
    """Collect policy observation data from the Go2 robot.

    Args:
        env: Isaac Lab environment.
        num_episodes: Number of episodes to collect.
    """
    print("Starting collection of Go2 policy observations...")
    print(f"Number of environments: {env.num_envs}")
    print(f"Observation space shape: {env.observation_space.shape}")
    print(f"Action space shape: {env.action_space.shape}")
    print(f"Target number of episodes: {num_episodes}")

    episode_count = 0
    total_steps = 0

    while episode_count < num_episodes:
        print(f"\n=== Episode {episode_count + 1}/{num_episodes} ===")

        # Reset the environment
        obs, info = env.reset()
        done = False
        step_count = 0
        episode_rewards = []

        while not done:
            # Use random actions (in practice, use a trained policy)
            action = env.action_space.sample()

            # Execute the action
            obs, reward, terminated, truncated, info = env.step(action)

            # Record rewards
            episode_rewards.append(reward.mean().item())

            # Check if done
            done = terminated.any() or truncated.any()
            step_count += 1
            total_steps += 1

            # Print progress every 100 steps
            if step_count % 100 == 0:
                avg_reward = np.mean(episode_rewards[-100:])
                print(f"  Steps: {step_count:4d}, mean reward: {avg_reward:6.3f}, "
                      f"alive environments: {(~terminated).sum().item()}/{env.num_envs}")

        # Episode statistics
        avg_episode_reward = np.mean(episode_rewards)
        print(f"Episode {episode_count + 1} finished:")
        print(f"  Total steps: {step_count}")
        print(f"  Mean reward: {avg_episode_reward:.3f}")
        print(f"  Alive environments at the end: {(~terminated).sum().item()}")

        episode_count += 1

    print("\nData collection finished!")
    print(f"Total episodes: {episode_count}")
    print(f"Total steps: {total_steps}")
    return episode_count, total_steps


def analyze_recorded_data(dataset_path: str):
    """Analyze the recorded policy observation data.

    Args:
        dataset_path: Path to the HDF5 dataset.
    """
    if not os.path.exists(dataset_path):
        print(f"Error: dataset file does not exist: {dataset_path}")
        return

    print(f"Analyzing dataset file: {dataset_path}")

    with h5py.File(dataset_path, "r") as f:
        print("\nDataset overview:")
        print(f"  Number of episodes: {len(f.keys())}")

        # Analyze the data of the first episode
        episode_keys = list(f.keys())
        first_episode = f[episode_keys[0]]
        print(f"\nStructure of the first episode ({episode_keys[0]}):")
        print(f"  Data keys: {list(first_episode.keys())}")

        # Analyze the observation data
        if "obs" in first_episode:
            obs_data = first_episode["obs"][:]
            print("\nPolicy observation analysis:")
            print(f"  Shape: {obs_data.shape}")
            print(f"  Dtype: {obs_data.dtype}")
            print(f"  Range: [{obs_data.min():.3f}, {obs_data.max():.3f}]")
            print(f"  Mean: {obs_data.mean():.3f}")
            print(f"  Std: {obs_data.std():.3f}")

            # Analyze the distribution of observation features
            print("\nObservation feature statistics (first 10 features):")
            for i in range(min(10, obs_data.shape[1])):
                feature_data = obs_data[:, i]
                print(f"  Feature {i:2d}: mean={feature_data.mean():6.3f}, "
                      f"std={feature_data.std():6.3f}, "
                      f"range=[{feature_data.min():6.3f}, {feature_data.max():6.3f}]")

        # Analyze the action data
        if "actions" in first_episode:
            action_data = first_episode["actions"][:]
            print("\nAction data analysis:")
            print(f"  Shape: {action_data.shape}")
            print(f"  Range: [{action_data.min():.3f}, {action_data.max():.3f}]")
            print(f"  Mean: {action_data.mean():.3f}")
            print(f"  Std: {action_data.std():.3f}")

        # Statistics over all episodes
        total_steps = 0
        episode_lengths = []
        for episode_key in episode_keys:
            episode = f[episode_key]
            if "obs" in episode:
                episode_length = episode["obs"].shape[0]
                episode_lengths.append(episode_length)
                total_steps += episode_length

        print("\nStatistics over all episodes:")
        print(f"  Total steps: {total_steps}")
        print(f"  Mean episode length: {np.mean(episode_lengths):.1f}")
        print(f"  Episode length range: [{min(episode_lengths)}, {max(episode_lengths)}]")
        print(f"  Episode length std: {np.std(episode_lengths):.1f}")


def main():
    """Main function."""
    parser = argparse.ArgumentParser(description="Go2 policy observations recorder example")
    parser.add_argument("--config", type=str, default="play",
                        choices=["play", "standard", "high_performance"],
                        help="Environment configuration type")
    parser.add_argument("--num_envs", type=int, default=16, help="Number of parallel environments")
    parser.add_argument("--num_episodes", type=int, default=5, help="Number of episodes to collect")
    parser.add_argument("--analyze_data", type=str, default=None,
                        help="Path of a dataset file to analyze")
    parser.add_argument("--headless", action="store_true", default=False,
                        help="Run in headless mode")
    args = parser.parse_args()

    # If data analysis is requested, analyze the data directly
    if args.analyze_data:
        analyze_recorded_data(args.analyze_data)
        return

    print("=" * 60)
    print("Go2 policy observations recorder example")
    print("=" * 60)
    print(f"Configuration: {args.config}")
    print(f"Number of environments: {args.num_envs}")
    print(f"Target number of episodes: {args.num_episodes}")
    print(f"Headless mode: {args.headless}")
    print("=" * 60)

    try:
        # Create the environment
        print("Creating the Go2 environment...")
        env, env_cfg = create_go2_env_with_recorder(args.config, args.num_envs)

        # Print environment info
        print("Environment created successfully!")
        print(f"Data will be saved to: {env_cfg.recorders.dataset_export_dir_path}")
        print(f"Filename: {env_cfg.recorders.dataset_filename}.hdf5")

        # Collect data
        episode_count, total_steps = collect_go2_policy_observations(env, args.num_episodes)

        # Close the environment
        env.close()

        # Analyze the collected data
        dataset_path = os.path.join(
            env_cfg.recorders.dataset_export_dir_path,
            f"{env_cfg.recorders.dataset_filename}.hdf5",
        )
        print("\nAnalyzing the collected data...")
        analyze_recorded_data(dataset_path)

        print("\nExample finished!")
        print(f"Dataset file location: {dataset_path}")

    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
4. Usage Documentation
Go2 Robot Policy Observations Recorder User Guide
This guide explains in detail how to use the PreStepFlatPolicyObservationsRecorder in Isaac Lab to record the policy network's observation data for the Go2 robot.
Overview
PreStepFlatPolicyObservationsRecorder is a recorder component in Isaac Lab that records the observation data received by the policy network before each simulation step. This is useful for the following scenarios:
- Imitation learning data collection - gather expert demonstration data
- Behavior analysis - analyze the robot's observation patterns
- Offline reinforcement learning - prepare datasets for offline RL algorithms (see the sketch after this list)
- Debugging and visualization - understand the policy network's input data
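For the offline-RL use case, here is a minimal sketch (not part of Isaac Lab) that flattens the recorded HDF5 episodes, in the layout described under "Data Format" below, into (obs, action, next_obs) arrays; it assumes each episode group holds "obs" and "actions" datasets shaped [steps, dim].

import h5py
import numpy as np


def build_offline_dataset(dataset_path: str):
    """Collect aligned (o_t, a_t, o_{t+1}) transitions from a recorded dataset."""
    obs_list, act_list, next_obs_list = [], [], []
    with h5py.File(dataset_path, "r") as f:
        for episode_key in f.keys():
            obs = f[episode_key]["obs"][:]          # [steps, obs_dim]
            actions = f[episode_key]["actions"][:]  # [steps, action_dim]
            # Drop the last step so every (obs, action) has a next observation.
            obs_list.append(obs[:-1])
            act_list.append(actions[:-1])
            next_obs_list.append(obs[1:])
    return (
        np.concatenate(obs_list, axis=0),
        np.concatenate(act_list, axis=0),
        np.concatenate(next_obs_list, axis=0),
    )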
File Structure
source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/config/go2/
├── go2_with_recorder_cfg.py               # Go2 recorder configuration file
├── __init__.py                            # Updated init file
└── ...

examples/
├── go2_simple_recorder_demo.py            # Simple demo script
├── go2_policy_obs_recorder_example.py     # Full example script
└── README_Go2_Recorder.md                 # This document
Quick Start
# Basic usage
python examples/go2_policy_obs_recorder_example.py --num_envs 16 --num_episodes 3

# High-performance configuration
python examples/go2_policy_obs_recorder_example.py --config high_performance --num_envs 64 --num_episodes 10

# Analyze existing data
python examples/go2_policy_obs_recorder_example.py --analyze_data /tmp/isaaclab/go2_test_data/go2_test_observations.hdf5
Configuration Options
Recorder Configuration Types
- Go2PolicyObsRecorderManagerCfg - standard configuration
  - Records policy observations and action data
  - Suitable for general-purpose data collection
- Go2HighPerformanceRecorderManagerCfg - high-performance configuration
  - Records only policy observation data
  - Optimized for large-scale data collection
  - Exports only successful episodes
Environment Configuration Types
- UnitreeGo2RoughEnvCfg_WithRecorder_PLAY - demo configuration
  - 16 environments
  - 15-second episodes
  - Suitable for testing and demonstrations
- UnitreeGo2RoughEnvCfg_WithRecorder - standard configuration
  - 64 environments
  - 30-second episodes
  - Suitable for normal data collection
- UnitreeGo2RoughEnvCfg_HighPerformance - high-performance configuration
  - 2048 environments
  - Optimized physics settings
  - Suitable for large-scale data collection
Custom Configuration
Creating a Custom Recorder Configuration
from isaaclab.utils import configclass
from isaaclab.managers import RecorderManagerBaseCfg, DatasetExportMode
from isaaclab.envs.mdp.recorders import PreStepFlatPolicyObservationsRecorderCfg


@configclass
class MyCustomRecorderCfg(RecorderManagerBaseCfg):
    # Only record policy observations
    record_policy_observations = PreStepFlatPolicyObservationsRecorderCfg()

    # Custom data path
    dataset_export_dir_path: str = "/my/custom/path"
    dataset_filename: str = "my_go2_data"

    # Only export successful episodes
    dataset_export_mode: DatasetExportMode = DatasetExportMode.EXPORT_SUCCEEDED_ONLY
Using the Custom Configuration in an Environment
from isaaclab.utils import configclass
from isaaclab_tasks.manager_based.locomotion.velocity.config.go2.rough_env_cfg import UnitreeGo2RoughEnvCfg


@configclass
class MyGo2EnvCfg(UnitreeGo2RoughEnvCfg):
    def __post_init__(self):
        super().__post_init__()
        self.recorders = MyCustomRecorderCfg()
        self.scene.num_envs = 32  # Custom number of environments
Data Format
The recorded data is saved in HDF5 format with the following structure:
dataset.hdf5
├── episode_0/
│   ├── obs         # policy observation data [steps, obs_dim]
│   └── actions     # action data [steps, action_dim] (if enabled)
├── episode_1/
│   ├── obs
│   └── actions
└── ...
Data Reading Example
import h5py
import numpy as np

# Read the data
with h5py.File("go2_policy_observations.hdf5", "r") as f:
    # Get the observation data of the first episode
    obs_data = f["episode_0"]["obs"][:]
    print(f"Observation data shape: {obs_data.shape}")

    # Get the data from all episodes
    all_observations = []
    for episode_key in f.keys():
        episode_obs = f[episode_key]["obs"][:]
        all_observations.append(episode_obs)

    # Concatenate all data
    combined_obs = np.concatenate(all_observations, axis=0)
    print(f"Total observation data shape: {combined_obs.shape}")
Observation Data Contents
The Go2 robot's policy observation contains the following information:
- base_lin_vel - base linear velocity (3 dims)
- base_ang_vel - base angular velocity (3 dims)
- projected_gravity - projected gravity vector (3 dims)
- velocity_commands - velocity commands (3 dims)
- joint_pos - joint positions (12 dims)
- joint_vel - joint velocities (12 dims)
- actions - previous actions (12 dims)
- height_scan - height-scan data (187 dims)
Total observation dimension: 235
A sketch for slicing the flat observation vector back into these components follows.
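If you need the individual components back from the flat vector, the following hypothetical helper (not part of Isaac Lab) slices a recorded observation according to the layout above; the ordering and sizes are taken from this list and should be checked against your own observation configuration.

import numpy as np

# Assumed component order and sizes for the default Go2 rough-terrain policy observation.
OBS_LAYOUT = [
    ("base_lin_vel", 3),
    ("base_ang_vel", 3),
    ("projected_gravity", 3),
    ("velocity_commands", 3),
    ("joint_pos", 12),
    ("joint_vel", 12),
    ("actions", 12),
    ("height_scan", 187),
]  # total: 235


def split_observation(flat_obs: np.ndarray) -> dict[str, np.ndarray]:
    """Slice a [235]-dim (or [N, 235]) observation into named components."""
    parts, start = {}, 0
    for name, dim in OBS_LAYOUT:
        parts[name] = flat_obs[..., start:start + dim]
        start += dim
    assert start == flat_obs.shape[-1], "observation layout does not match the data"
    return parts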