[docs]classRosbagMigrator():_cone_colors=[(0,255,255),(255,0,0),(0,165,255),(0,0,255)]def__init__(self,input_rosbag:str,output_rosbag_suffix:str,uncompress_images:bool,compress_images:Optional[str],start:float,duration:Optional[float],end:Optional[float],delete_pipeline:Optional[Tuple[str]],delete_lidar_points:bool,delete_images:bool,delete_transforms:bool,delete_visualization:bool,migration_strategy:Optional[Tuple[str]],can_time_offset:Optional[float],gps:bool,fix_clock:bool,rename_vehicle:Optional[str],fixing_strategy:Optional[Tuple[str]],header_time_delta_topics:Optional[Tuple[str]],header_time_deltas:Optional[Tuple[int]])->None:self.input_rosbag_path=input_rosbagself.input_rosbag=rosbag.Bag(input_rosbag)self.input_rosbag_start=self.input_rosbag.get_start_time()self.output_rosbag_path=f'{input_rosbag.rsplit(".")[0]}{output_rosbag_suffix}.{input_rosbag.rsplit(".")[1]}'self.uncompress_images=uncompress_imagesself.compress_images=compress_imagesself.delete_transforms=delete_transformsself.delete_pipeline=set(delete_pipeline)ifdelete_pipelineisnotNoneelseNoneself.migration_strategy=migration_strategyself.fixing_strategy=fixing_strategyself.delete_visualization=delete_visualizationself.delete_lidar_points=delete_lidar_pointsself.delete_images=delete_imagesself.start=self.input_rosbag_start+startself.end=endorself.input_rosbag.get_end_time()self.can_time_offset=can_time_offsetself.rename_vehicle=rename_vehicleifduration:self.end=min(self.start+duration,self.end)self.v1_to_v2_migration_wheelspeed_fr=0self.fix_clock=fix_clockself.first_clock=Noneself.last_clock=Noneself.gps=gpsself.gps_topics=[]ifself.gpsisTrue:forkey,valueinself.input_rosbag.get_type_and_topic_info()[1].items():ifvalue.msg_type=='gps_common/GPSFix':self.gps_topics.append(key)# Set values for header time correctionself.header_time_deltas={}self.adjust_time_topics=[]self.used_topics_for_time_delta=[]iflen(header_time_delta_topics)!=len(header_time_deltas):rospy.logfatal("The lengths of header_time_delta_topics and header_time_deltas do not match.")exit(-1)iflen(header_time_delta_topics)>0:self.header_time_deltas=dict(zip(header_time_delta_topics,header_time_deltas))self.adjust_time_topics=list(self.header_time_deltas.keys())
[docs]defadjust_header_time(self,header:str,delta:int):# Duration to add to the headerduration=rospy.Duration(nsecs=delta*1000*1000)# Adjust the header's stampheader.stamp+=durationreturnheader
[docs]defprocess_ros_msg(self,old_topic:str,old_msg:Any,t:rospy.Time)->List[Tuple[str,Any,rospy.Time]]:messages=[]ifself.fix_clockisTrueand'/clock'==old_topic:ifself.last_clockisnotNoneandold_msg.clock<self.last_clock:return()else:self.last_clock=old_msg.clockkeep,migrated_messages=self.migrate_ros_msg(old_topic=old_topic,old_msg=old_msg,t=t)ifkeepisFalse:returnmessagesifself.fixing_strategy:formigrated_topic,migrated_msg,tinmigrated_messages:self.fix_ros_msg(topic=migrated_topic,msg=migrated_msg,t=t)formigrated_topic,migrated_msg,tinmigrated_messages:ifself.can_time_offsetisnotNoneandhasattr(migrated_msg,'header'):if'/can/'inmigrated_topic:migrated_msg.header.stamp+=rospy.Duration.from_sec(self.can_time_offset)ifself.rename_vehicleisnotNoneandmigrated_topic=='/vehicle':migrated_msg.mesh_resource=f'http://localhost:7777/{self.rename_vehicle}.dae'ifself.rename_vehicle=='emma':migrated_msg.pose.orientation.x=0migrated_msg.pose.orientation.y=0migrated_msg.pose.orientation.z=0migrated_msg.pose.orientation.w=1migrated_msg.pose.position.x=0.8migrated_msg.pose.position.y=0migrated_msg.pose.position.z=0elifself.rename_vehicle=='eva':migrated_msg.pose.orientation.x=0.707migrated_msg.pose.orientation.y=0.0migrated_msg.pose.orientation.z=0.0migrated_msg.pose.orientation.w=0.707migrated_msg.pose.position.x=0.2migrated_msg.pose.position.y=0migrated_msg.pose.position.z=0new_msg=migrated_msgnew_topic=migrated_topicifself.gpsisTrueandmigrated_topicinself.gps_topics:messages.append(self.migrate_gps_message(old_topic=migrated_topic,old_msg=migrated_msg,t=t))ifself.uncompress_imagesisTrue:if'/compressed'inmigrated_topic:new_msg,new_topic=self.uncompress_image(compressed_image=migrated_msg,topic=migrated_topic)elif'/qoi'inmigrated_topic:new_msg,new_topic=self.uncompress_qoi_image(compressed_image=migrated_msg,topic=migrated_topic)elifmigrated_topicin['/perception/detection_image','/visualization/image']:if'qoi'inmigrated_msg.format:new_msg,new_topic=self.uncompress_qoi_image(compressed_image=migrated_msg,topic=migrated_topic)elif'jpeg'inmigrated_msg.formator'png'inmigrated_msg.format:new_msg,new_topic=self.uncompress_image(compressed_image=migrated_msg,topic=migrated_topic)if(self.compress_imagesisnotNoneand('image_rect_color'innew_topicornew_topic=='/perception/detection_image'ornew_topic=='/visualization/image')andnew_msg._type=='sensor_msgs/Image'):new_msg,new_topic=self.compress_image(image=new_msg,topic=new_topic)if(self.compress_imagesisnotNoneand('image_rect_color'inmigrated_topicormigrated_topic=='/perception/detection_image'ormigrated_topic=='/visualization/image')andmigrated_msg._type=='sensor_msgs/Image'):new_msg,new_topic=self.compress_image(image=migrated_msg,topic=migrated_topic)ifnew_topicinself.adjust_time_topics:ifnew_topicnotinself.used_topics_for_time_delta:print(f"Adjusting time for {new_topic} by {self.header_time_deltas[new_topic]} ms.")self.used_topics_for_time_delta.append(new_topic)new_msg.header=self.adjust_header_time(new_msg.header,int(self.header_time_deltas[new_topic]))messages.append((new_topic,new_msg,t))returnmessages