当前位置: 首页 > news >正文

MDP的recoders部分

文章目录

  • 1. 核心功能
  • 2. 主要组件
  • 2.1 记录器类型 (recorders.py)
    • 2.2 配置选项 (recorders_cfg.py)
  • 3.使用步骤
    • 先配置go2_with_recorder_cfg.py
    • 再编写go2_policy_obs_recorder_example.py
  • 4.使用文档
    • Go2机器人策略观测记录器使用指南
    • 文件结构
    • 快速开始
    • 配置选项
      • 记录器配置类型
      • 环境配置类型
    • 自定义配置
      • 创建自定义记录器配置
      • 在环境中使用自定义配置
    • 数据格式
    • 读取数据示例
    • 观测数据内容

recorders 文件夹是 Isaac Lab 中用于数据记录和数据集生成的重要组件。它的主要作用是在强化学习训练过程中记录各种环境数据,用于后续的分析、调试和数据集构建。
在这里插入图片描述

1. 核心功能

数据记录系统:
记录环境在不同阶段的状态、动作、观测等数据
支持在环境重置前后、步骤前后的数据记录
自动管理数据的收集、存储和导出

2. 主要组件

2.1 记录器类型 (recorders.py)

# 初始状态记录器 - 记录环境重置后的初始状态
class InitialStateRecorder(RecorderTerm):def record_post_reset(self, env_ids):return "initial_state", self._env.scene.get_state(is_relative=True)# 步骤后状态记录器 - 记录每步结束时的环境状态
class PostStepStatesRecorder(RecorderTerm):def record_post_step(self):return "states", self._env.scene.get_state(is_relative=True)# 步骤前动作记录器 - 记录每步开始时的动作
class PreStepActionsRecorder(RecorderTerm):def record_pre_step(self):return "actions", self._env.action_manager.action# 策略观测记录器 - 记录策略网络的观测数据
class PreStepFlatPolicyObservationsRecorder(RecorderTerm):def record_pre_step(self):return "obs", self._env.obs_buf["policy"]

记录器在环境生命周期的四个关键时刻工作:
重置前记录 (record_pre_reset): 在环境重置前记录数据,通常用于导出完整的回合数据
重置后记录 (record_post_reset): 在环境重置后记录初始状态
步骤前记录 (record_pre_step): 在每步开始时记录动作和观测
步骤后记录 (record_post_step): 在每步结束时记录状态和结果

2.2 配置选项 (recorders_cfg.py)

@configclass
class ActionStateRecorderManagerCfg(RecorderManagerBaseCfg):"""动作和状态记录器配置"""record_initial_state = InitialStateRecorderCfg()record_post_step_states = PostStepStatesRecorderCfg()record_pre_step_actions = PreStepActionsRecorderCfg()record_pre_step_flat_policy_observations = PreStepFlatPolicyObservationsRecorderCfg()

3.使用步骤

在这里插入图片描述

先配置go2_with_recorder_cfg.py

# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause"""
Go2机器人环境配置,集成策略观测记录器
Configuration for Go2 robot environment with policy observations recorder
"""from isaaclab.utils import configclass
from isaaclab.managers import RecorderManagerBaseCfg, DatasetExportMode
from isaaclab.envs.mdp.recorders import PreStepFlatPolicyObservationsRecorderCfg, PreStepActionsRecorderCfgfrom .rough_env_cfg import UnitreeGo2RoughEnvCfg@configclass
class Go2PolicyObsRecorderManagerCfg(RecorderManagerBaseCfg):"""Go2机器人策略观测记录器配置Policy observations recorder configuration for Go2 robot"""# 记录策略观测数据 - Record policy observationsrecord_policy_observations = PreStepFlatPolicyObservationsRecorderCfg()# 同时记录动作数据用于分析 - Also record actions for analysisrecord_actions = PreStepActionsRecorderCfg()# 数据导出配置 - Data export configurationdataset_export_dir_path: str = "/tmp/isaaclab/go2_policy_data"dataset_filename: str = "go2_policy_observations"dataset_export_mode: DatasetExportMode = DatasetExportMode.EXPORT_ALLexport_in_record_pre_reset: bool = True@configclass
class UnitreeGo2RoughEnvCfg_WithRecorder(UnitreeGo2RoughEnvCfg):"""带有策略观测记录器的Go2粗糙地形环境配置Go2 rough terrain environment configuration with policy observations recorder"""def __post_init__(self):# 调用父类的后初始化 - Call parent's post initializationsuper().__post_init__()# 设置记录器配置 - Set recorder configurationself.recorders = Go2PolicyObsRecorderManagerCfg()# 为了更好的数据收集,减少环境数量 - Reduce number of environments for better data collectionself.scene.num_envs = 64# 延长回合时间以收集更多数据 - Extend episode length to collect more dataself.episode_length_s = 30.0@configclass
class UnitreeGo2RoughEnvCfg_WithRecorder_PLAY(UnitreeGo2RoughEnvCfg_WithRecorder):"""用于演示/测试的Go2记录器环境配置Go2 recorder environment configuration for demonstration/testing"""def __post_init__(self):# 调用父类的后初始化 - Call parent's post initializationsuper().__post_init__()# 更小的场景用于测试 - Smaller scene for testingself.scene.num_envs = 16self.scene.env_spacing = 2.5# 禁用随机化以获得一致的数据 - Disable randomization for consistent dataself.observations.policy.enable_corruption = False# 移除随机推力事件 - Remove random pushing eventsself.events.base_external_force_torque = Noneself.events.push_robot = None# 设置更短的回合时间用于快速测试 - Shorter episodes for quick testingself.episode_length_s = 15.0# 自定义数据导出路径 - Custom data export pathself.recorders.dataset_export_dir_path = "/tmp/isaaclab/go2_test_data"self.recorders.dataset_filename = "go2_test_observations"@configclass
class Go2HighPerformanceRecorderManagerCfg(RecorderManagerBaseCfg):"""高性能Go2记录器配置,优化大规模数据收集High-performance Go2 recorder configuration optimized for large-scale data collection"""# 只记录策略观测,减少数据量 - Only record policy observations to reduce data volumerecord_policy_observations = PreStepFlatPolicyObservationsRecorderCfg()# 使用快速存储路径 - Use fast storage pathdataset_export_dir_path: str = "/fast_ssd/go2_data"dataset_filename: str = "go2_policy_obs_optimized"# 只导出成功的回合以减少数据量 - Only export successful episodes to reduce data volumedataset_export_mode: DatasetExportMode = DatasetExportMode.EXPORT_SUCCEEDED_ONLY# 减少I/O频率 - Reduce I/O frequencyexport_in_record_pre_reset: bool = False@configclass
class UnitreeGo2RoughEnvCfg_HighPerformance(UnitreeGo2RoughEnvCfg):"""高性能Go2环境配置,用于大规模数据收集High-performance Go2 environment configuration for large-scale data collection"""def __post_init__(self):# 调用父类的后初始化 - Call parent's post initializationsuper().__post_init__()# 设置高性能记录器 - Set high-performance recorderself.recorders = Go2HighPerformanceRecorderManagerCfg()# 大规模并行环境 - Large-scale parallel environmentsself.scene.num_envs = 2048# 优化性能设置 - Performance optimization settingsself.sim.physx.gpu_max_rigid_patch_count = 20 * 2**15 

再编写go2_policy_obs_recorder_example.py

#!/usr/bin/env python3# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause"""
Go2机器人策略观测记录器使用示例
Example script for using PreStepFlatPolicyObservationsRecorder with Go2 robot使用方法 Usage:
python examples/go2_policy_obs_recorder_example.py --num_envs 16 --num_episodes 5数据分析 Data Analysis:
python examples/go2_policy_obs_recorder_example.py --analyze_data /tmp/isaaclab/go2_test_data/go2_test_observations.hdf5
"""import argparse
import gymnasium as gym
import h5py
import numpy as np
import torch
import os
from datetime import datetime# Isaac Lab imports
import isaaclab
from isaaclab.utils.dict import print_dict
from isaaclab.utils.io import dump_pickle, dump_yaml# 导入我们创建的Go2配置 - Import our Go2 configurations
from isaaclab_tasks.manager_based.locomotion.velocity.config.go2.go2_with_recorder_cfg import (UnitreeGo2RoughEnvCfg_WithRecorder_PLAY,UnitreeGo2RoughEnvCfg_WithRecorder,UnitreeGo2RoughEnvCfg_HighPerformance
)def create_go2_env_with_recorder(config_name: str = "play", num_envs: int = 16):"""创建带有记录器的Go2环境Create Go2 environment with recorderArgs:config_name: 配置名称 ("play", "standard", "high_performance")num_envs: 环境数量"""# 选择配置 - Select configurationif config_name == "play":env_cfg = UnitreeGo2RoughEnvCfg_WithRecorder_PLAY()elif config_name == "standard":env_cfg = UnitreeGo2RoughEnvCfg_WithRecorder()elif config_name == "high_performance":env_cfg = UnitreeGo2RoughEnvCfg_HighPerformance()else:raise ValueError(f"未知配置: {config_name}")# 设置环境数量 - Set number of environmentsenv_cfg.scene.num_envs = num_envs# 创建环境 - Create environmentenv = gym.make("Isaac-Velocity-Rough-Unitree-Go2-v0", cfg=env_cfg)return env, env_cfgdef collect_go2_policy_observations(env, num_episodes: int = 5):"""收集Go2机器人的策略观测数据Collect policy observations data from Go2 robotArgs:env: Isaac Lab环境num_episodes: 收集的回合数"""print(f"开始收集Go2策略观测数据...")print(f"环境数量: {env.num_envs}")print(f"观测空间维度: {env.observation_space.shape}")print(f"动作空间维度: {env.action_space.shape}")print(f"目标回合数: {num_episodes}")episode_count = 0total_steps = 0while episode_count < num_episodes:print(f"\n=== 回合 {episode_count + 1}/{num_episodes} ===")# 重置环境 - Reset environmentobs, info = env.reset()done = Falsestep_count = 0episode_rewards = []while not done:# 使用随机动作(实际应用中可以使用训练好的策略)# Use random actions (in practice, use trained policy)action = env.action_space.sample()# 执行动作 - Execute actionobs, reward, terminated, truncated, info = env.step(action)# 记录奖励 - Record rewardsepisode_rewards.append(reward.mean().item())# 检查是否结束 - Check if donedone = terminated.any() or truncated.any()step_count += 1total_steps += 1# 每100步打印进度 - Print progress every 100 stepsif step_count % 100 == 0:avg_reward = np.mean(episode_rewards[-100:])print(f"  步数: {step_count:4d}, 平均奖励: {avg_reward:6.3f}, "f"存活环境: {(~terminated).sum().item()}/{env.num_envs}")# 回合统计 - Episode statisticsavg_episode_reward = np.mean(episode_rewards)print(f"回合 {episode_count + 1} 完成:")print(f"  总步数: {step_count}")print(f"  平均奖励: {avg_episode_reward:.3f}")print(f"  最终存活环境数: {(~terminated).sum().item()}")episode_count += 1print(f"\n数据收集完成!")print(f"总回合数: {episode_count}")print(f"总步数: {total_steps}")return episode_count, total_stepsdef analyze_recorded_data(dataset_path: str):"""分析记录的策略观测数据Analyze recorded policy observations dataArgs:dataset_path: HDF5数据集路径"""if not os.path.exists(dataset_path):print(f"错误: 数据文件不存在: {dataset_path}")returnprint(f"分析数据文件: {dataset_path}")with h5py.File(dataset_path, "r") as f:print(f"\n数据集概览:")print(f"  回合数量: {len(f.keys())}")# 分析第一个回合的数据 - Analyze first episode dataepisode_keys = list(f.keys())first_episode = f[episode_keys[0]]print(f"\n第一个回合 ({episode_keys[0]}) 数据结构:")print(f"  数据类型: {list(first_episode.keys())}")# 分析观测数据 - Analyze observation dataif "obs" in first_episode:obs_data = first_episode["obs"][:]print(f"\n策略观测数据分析:")print(f"  数据形状: {obs_data.shape}")print(f"  数据类型: {obs_data.dtype}")print(f"  数据范围: [{obs_data.min():.3f}, {obs_data.max():.3f}]")print(f"  数据均值: {obs_data.mean():.3f}")print(f"  数据标准差: {obs_data.std():.3f}")# 分析观测特征的分布 - Analyze observation feature distributionprint(f"\n观测特征统计 (前10个特征):")for i in range(min(10, obs_data.shape[1])):feature_data = obs_data[:, i]print(f"  特征 {i:2d}: 均值={feature_data.mean():6.3f}, "f"标准差={feature_data.std():6.3f}, "f"范围=[{feature_data.min():6.3f}, {feature_data.max():6.3f}]")# 分析动作数据 - Analyze action dataif "actions" in first_episode:action_data = first_episode["actions"][:]print(f"\n动作数据分析:")print(f"  数据形状: {action_data.shape}")print(f"  数据范围: [{action_data.min():.3f}, {action_data.max():.3f}]")print(f"  数据均值: {action_data.mean():.3f}")print(f"  数据标准差: {action_data.std():.3f}")# 统计所有回合 - Statistics for all episodestotal_steps = 0episode_lengths = []for episode_key in episode_keys:episode = f[episode_key]if "obs" in episode:episode_length = episode["obs"].shape[0]episode_lengths.append(episode_length)total_steps += episode_lengthprint(f"\n所有回合统计:")print(f"  总步数: {total_steps}")print(f"  平均回合长度: {np.mean(episode_lengths):.1f}")print(f"  回合长度范围: [{min(episode_lengths)}, {max(episode_lengths)}]")print(f"  回合长度标准差: {np.std(episode_lengths):.1f}")def main():"""主函数 - Main function"""parser = argparse.ArgumentParser(description="Go2策略观测记录器示例")parser.add_argument("--config", type=str, default="play", choices=["play", "standard", "high_performance"],help="环境配置类型")parser.add_argument("--num_envs", type=int, default=16, help="并行环境数量")parser.add_argument("--num_episodes", type=int, default=5, help="收集的回合数")parser.add_argument("--analyze_data", type=str, default=None,help="分析指定的数据文件路径")parser.add_argument("--headless", action="store_true", default=False,help="无头模式运行")args = parser.parse_args()# 如果指定了数据分析,直接分析数据 - If data analysis is specified, analyze data directlyif args.analyze_data:analyze_recorded_data(args.analyze_data)returnprint("=" * 60)print("Go2机器人策略观测记录器示例")print("=" * 60)print(f"配置: {args.config}")print(f"环境数量: {args.num_envs}")print(f"目标回合数: {args.num_episodes}")print(f"无头模式: {args.headless}")print("=" * 60)try:# 创建环境 - Create environmentprint("创建Go2环境...")env, env_cfg = create_go2_env_with_recorder(args.config, args.num_envs)# 打印环境信息 - Print environment infoprint(f"环境创建成功!")print(f"数据将保存到: {env_cfg.recorders.dataset_export_dir_path}")print(f"文件名: {env_cfg.recorders.dataset_filename}.hdf5")# 收集数据 - Collect dataepisode_count, total_steps = collect_go2_policy_observations(env, args.num_episodes)# 关闭环境 - Close environmentenv.close()# 分析收集的数据 - Analyze collected datadataset_path = os.path.join(env_cfg.recorders.dataset_export_dir_path,f"{env_cfg.recorders.dataset_filename}.hdf5")print(f"\n正在分析收集的数据...")analyze_recorded_data(dataset_path)print(f"\n示例完成!")print(f"数据文件位置: {dataset_path}")except Exception as e:print(f"错误: {e}")import tracebacktraceback.print_exc()if __name__ == "__main__":main() 


4.使用文档

Go2机器人策略观测记录器使用指南

本指南详细说明如何在Isaac Lab中为Go2机器人使用 PreStepFlatPolicyObservationsRecorder 来记录策略网络的观测数据。

概述

PreStepFlatPolicyObservationsRecorder 是Isaac Lab中的一个记录器组件,它可以在每个仿真步骤之前记录策略网络接收到的观测数据。这对于以下应用场景非常有用:

  • 模仿学习数据收集 - 收集专家演示数据
  • 行为分析 - 分析机器人的观测模式
  • 离线强化学习 - 为离线RL算法准备数据集
  • 调试和可视化 - 理解策略网络的输入数据

文件结构

source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/config/go2/
├── go2_with_recorder_cfg.py          # Go2记录器配置文件
├── __init__.py                       # 更新的初始化文件
└── ...examples/
├── go2_simple_recorder_demo.py       # 简单演示脚本
├── go2_policy_obs_recorder_example.py # 完整示例脚本
└── README_Go2_Recorder.md            # 本文档

快速开始

# 基本用法
python examples/go2_policy_obs_recorder_example.py --num_envs 16 --num_episodes 3# 高性能配置
python examples/go2_policy_obs_recorder_example.py --config high_performance --num_envs 64 --num_episodes 10# 分析已有数据
python examples/go2_policy_obs_recorder_example.py --analyze_data /tmp/isaaclab/go2_test_data/go2_test_observations.hdf5

配置选项

记录器配置类型

  1. Go2PolicyObsRecorderManagerCfg - 标准配置

    • 记录策略观测和动作数据
    • 适合一般用途的数据收集
  2. Go2HighPerformanceRecorderManagerCfg - 高性能配置

    • 只记录策略观测数据
    • 优化大规模数据收集
    • 只导出成功的回合

环境配置类型

  1. UnitreeGo2RoughEnvCfg_WithRecorder_PLAY - 演示配置

    • 16个环境
    • 15秒回合长度
    • 适合测试和演示
  2. UnitreeGo2RoughEnvCfg_WithRecorder - 标准配置

    • 64个环境
    • 30秒回合长度
    • 适合正常数据收集
  3. UnitreeGo2RoughEnvCfg_HighPerformance - 高性能配置

    • 2048个环境
    • 优化的物理设置
    • 适合大规模数据收集

自定义配置

创建自定义记录器配置

from isaaclab.utils import configclass
from isaaclab.managers import RecorderManagerBaseCfg, DatasetExportMode
from isaaclab.envs.mdp.recorders import PreStepFlatPolicyObservationsRecorderCfg@configclass
class MyCustomRecorderCfg(RecorderManagerBaseCfg):# 只记录策略观测record_policy_observations = PreStepFlatPolicyObservationsRecorderCfg()# 自定义数据路径dataset_export_dir_path: str = "/my/custom/path"dataset_filename: str = "my_go2_data"# 只导出成功的回合dataset_export_mode: DatasetExportMode = DatasetExportMode.EXPORT_SUCCEEDED_ONLY

在环境中使用自定义配置

from isaaclab_tasks.manager_based.locomotion.velocity.config.go2.rough_env_cfg import UnitreeGo2RoughEnvCfg@configclass
class MyGo2EnvCfg(UnitreeGo2RoughEnvCfg):def __post_init__(self):super().__post_init__()self.recorders = MyCustomRecorderCfg()self.scene.num_envs = 32  # 自定义环境数量

数据格式

记录的数据以HDF5格式保存,结构如下:

dataset.hdf5
├── episode_0/
│   ├── obs          # 策略观测数据 [steps, obs_dim]
│   └── actions      # 动作数据 [steps, action_dim] (如果启用)
├── episode_1/
│   ├── obs
│   └── actions
└── ...

读取数据示例

import h5py
import numpy as np# 读取数据
with h5py.File("go2_policy_observations.hdf5", "r") as f:# 获取第一个回合的观测数据obs_data = f["episode_0"]["obs"][:]print(f"观测数据形状: {obs_data.shape}")# 获取所有回合的数据all_observations = []for episode_key in f.keys():episode_obs = f[episode_key]["obs"][:]all_observations.append(episode_obs)# 合并所有数据combined_obs = np.concatenate(all_observations, axis=0)print(f"总观测数据形状: {combined_obs.shape}")

观测数据内容

Go2机器人的策略观测包含以下信息:

  1. base_lin_vel - 基座线速度 (3维)
  2. base_ang_vel - 基座角速度 (3维)
  3. projected_gravity - 投影重力向量 (3维)
  4. velocity_commands - 速度命令 (3维)
  5. joint_pos - 关节位置 (12维)
  6. joint_vel - 关节速度 (12维)
  7. actions - 上一步动作 (12维)
  8. height_scan - 高度扫描数据 (187维)

总观测维度:235维

http://www.lqws.cn/news/122365.html

相关文章:

  • Python基础:文件简单操作
  • .Net Framework 4/C# 属性和方法
  • 【手写系列】手写动态代理
  • C++——智能指针 weak_ptr
  • Java多线程:ThreadPoolTaskExecutor线程池 与CompletableFuture如何打出组合拳
  • Redisson - 实现延迟队列
  • [特殊字符] 深度剖析 n8n 与 Dify:使用场景、优劣势及技术选型建议
  • DA14531_beacon_大小信标设备开发
  • mysql 悲观锁和乐观锁(—悲观锁)
  • 电路设计基础-2
  • Redis-旁路缓存策略详解
  • JSON基础知识
  • Java 线程池原理详解
  • 分布式互斥算法
  • 使用ArcPy进行栅格数据分析
  • Axios 取消请求的演进:CancelToken vs. AbortController
  • rknn优化教程(一)
  • 海信IP810N-海思MV320芯片-安卓9-2+16G-免拆优盘卡刷固件包
  • 瀚文机械键盘固件开发详解:HWKeyboard.cpp文件解析与应用
  • Async-profiler 内存采样机制解析:从原理到实现
  • Docker慢慢学
  • Java 中 ArrayList、Vector、LinkedList 的核心区别与应用场景
  • 【Docker 从入门到实战全攻略(二):核心概念 + 命令详解 + 部署案例】
  • Spring Boot 从Socket 到Netty网络编程(下):Netty基本开发与改进【心跳、粘包与拆包、闲置连接】
  • java从azure中读取用户信息
  • Docker 常用命令详解
  • docker生命周期
  • Elasticsearch的搜索流程描述
  • 微软的新系统Windows12未来有哪些新特性
  • Python 隐藏法宝:双下划线 _ _Dunder_ _