编写rosbag脚本记录雷达与imu数据包
由于雷达比较大坨,而且是需要搭配 rk3588,以及外部的 imu 数据,我是直接拿了一个飞控来发送 imu 数据给 3588,(我的雷达内部没有imu),因此携带着测试不是很方便,需要编写一个脚本来采集雷达与imu数据,后续就可以在座位上调试。
主要需要几个重要的功能:
订阅雷达 (PointCloud2 类型) 和 IMU (Imu 类型) 数据
自动创建保存 rosbag 的目录
使用带时间戳的文件名保存 rosbag
支持指定记录时长,达到时长后自动停止
包含完善的异常处理和资源清理
在实现功能时有以下几个注意点:
线程保护
雷达使用的是探维的雷达,他们驱动中发布的是 PointCloud2 类型的包,imu就直接通过mavros来获取飞控的imu数据,脚本中订阅数据然后在回调中写入即可,代码比较简单。需要注意的是,如果直接在回调中操作 rosbag,可能会发生线程问题,因为有两个数据回调,因此需要在回调中,增加线程保护:
def radar_callback(self, msg):# 使用锁保护对rosbag的访问with self.bag_lock:# 检查是否需要开始新的bag文件if self.bag is None:self.bag = rosbag.Bag(self.bag_filename, 'w')# 写入雷达消息到rosbagself.bag.write(self.radar_topic, msg)# 检查是否达到记录时间self.check_duration()
退出脚本时,关闭rosbag
在退出脚本时,使用 ctrl+C 来退出,但可能会让 rosbag 包缺少索引,在回放时报错:
Bag file radar_imu_20250701_135822.bag is unindexed. Run rosbag reindex.
这个表示 rosbag 没有正常关闭,找不到索引信息,可以使用命令来修复包:
rosbag reindex xxxx.bag
但这样比较麻烦,正常需要在脚本关闭时,调用 close 函数来关闭,就能正常保存文件。
关闭bag前,先停止话题订阅
关闭bag是一个过程,在关闭时,如果没有停止话题订阅,可能还会进入回调,此时回调中调用写入函数,就会报错,因此在关闭前,比较好的是先停止话题的订阅,然后再关闭:
def shutdown(self):# 取消话题订阅(停止接收新消息)避免在关闭bag时,进入消息回调self.radar_sub.unregister()self.imu_sub.unregister()# 确保所有挂起的回调完成rospy.sleep(0.1) # 短暂等待,让可能正在执行的回调完成with self.bag_lock:if self.bag:self.bag.close() # 关闭包rospy.loginfo(f"Rosbag closed successfully: {self.bag_filename}")
完整代码如下:
#!/usr/bin/env python3
import rospy
import rosbag
from sensor_msgs.msg import Imu
from sensor_msgs.msg import PointCloud2
import os
from datetime import datetime
import threading
class RadarIMURecorder:def __init__(self):# 初始化ROS节点rospy.init_node('fastlio_helper', anonymous=True)# 获取ROS参数或设置默认值self.radar_topic = '/tanwaylidar_pointcloud'self.imu_topic = '/mavros/imu/data'self.bag_directory = 'data' # 保存目录self.record_duration = 0 # 0表示无限期记录# 创建保存rosbag的目录(如果不存在)if not os.path.exists(self.bag_directory):os.makedirs(self.bag_directory)# 生成带时间戳的rosbag文件名timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")self.bag_filename = os.path.join(self.bag_directory, f"radar_imu_{timestamp}.bag")# 初始化rosbag对象self.bag = Noneself.bag_lock = threading.Lock()# 注册ROS关闭回调rospy.on_shutdown(self.shutdown)# 订阅话题self.radar_sub = rospy.Subscriber(self.radar_topic, PointCloud2, self.radar_callback)self.imu_sub = rospy.Subscriber(self.imu_topic, Imu, self.imu_callback)# 记录开始时间self.start_time = rospy.Time.now()rospy.loginfo(f"Recorder started. Saving to {self.bag_filename}")rospy.loginfo(f"Recording topics: {self.radar_topic} and {self.imu_topic}")def radar_callback(self, msg):# 使用锁保护对rosbag的访问with self.bag_lock:# 检查是否需要开始新的bag文件if self.bag is None:self.bag = rosbag.Bag(self.bag_filename, 'w')# 写入雷达消息到rosbagself.bag.write(self.radar_topic, msg)def imu_callback(self, msg):# 使用锁保护对rosbag的访问with self.bag_lock:# 检查是否需要开始新的bag文件if self.bag is None:self.bag = rosbag.Bag(self.bag_filename, 'w')# 写入IMU消息到rosbagself.bag.write(self.imu_topic, msg)def shutdown(self):# 取消话题订阅(停止接收新消息)避免在关闭bag时,进入消息回调self.radar_sub.unregister()self.imu_sub.unregister()# 确保所有挂起的回调完成rospy.sleep(0.1) # 短暂等待,让可能正在执行的回调完成with self.bag_lock:if self.bag:self.bag.close() # 关闭包rospy.loginfo(f"Rosbag closed successfully: {self.bag_filename}")
if __name__ == '__main__':try:recorder = RadarIMURecorder()rospy.spin()except rospy.ROSInterruptException:recorder.shutdown()rospy.loginfo("Recording stopped by user")except Exception as e:rospy.logerr(f"An error occurred: {str(e)}")if hasattr(recorder, 'shutdown'):recorder.shutdown()
运行后,使用 rosbag info 可以看到保存的信息
注意: 要回放日志时,先运行roscore,不然会报错。