Source code for cli.migrator

from typing import Any, List, Optional, Tuple

import progressbar

import numpy as np
import cv2
import qoi

import rosbag
import rospy

from sensor_msgs.msg import Image, CompressedImage, NavSatFix
from as_can_msgs.msg import SteeringWheelAngle, Wheelspeed, ImuAcceleration, ImuGyro


[docs]def intersects(set1: set, set2: set) -> bool: return not set1.isdisjoint(set2)
[docs]class RosbagMigrator(): _cone_colors = [(0, 255, 255), (255, 0, 0), (0, 165, 255), (0, 0, 255)] def __init__(self, input_rosbag: str, output_rosbag_suffix: str, uncompress_images: bool, compress_images: Optional[str], start: float, duration: Optional[float], end: Optional[float], delete_pipeline: Optional[Tuple[str]], delete_lidar_points: bool, delete_images: bool, delete_transforms: bool, delete_visualization: bool, migration_strategy: Optional[Tuple[str]], can_time_offset: Optional[float], gps: bool, fix_clock: bool, rename_vehicle: Optional[str], fixing_strategy: Optional[Tuple[str]], header_time_delta_topics: Optional[Tuple[str]], header_time_deltas: Optional[Tuple[int]]) -> None: self.input_rosbag_path = input_rosbag self.input_rosbag = rosbag.Bag(input_rosbag) self.input_rosbag_start = self.input_rosbag.get_start_time() self.output_rosbag_path = f'{input_rosbag.rsplit(".")[0]}{output_rosbag_suffix}.{input_rosbag.rsplit(".")[1]}' self.uncompress_images = uncompress_images self.compress_images = compress_images self.delete_transforms = delete_transforms self.delete_pipeline = set(delete_pipeline) if delete_pipeline is not None else None self.migration_strategy = migration_strategy self.fixing_strategy = fixing_strategy self.delete_visualization = delete_visualization self.delete_lidar_points = delete_lidar_points self.delete_images = delete_images self.start = self.input_rosbag_start + start self.end = end or self.input_rosbag.get_end_time() self.can_time_offset = can_time_offset self.rename_vehicle = rename_vehicle if duration: self.end = min(self.start + duration, self.end) self.v1_to_v2_migration_wheelspeed_fr = 0 self.fix_clock = fix_clock self.first_clock = None self.last_clock = None self.gps = gps self.gps_topics = [] if self.gps is True: for key, value in self.input_rosbag.get_type_and_topic_info()[1].items(): if value.msg_type == 'gps_common/GPSFix': self.gps_topics.append(key) # Set values for header time correction self.header_time_deltas = {} self.adjust_time_topics = [] self.used_topics_for_time_delta = [] if len(header_time_delta_topics) != len(header_time_deltas): rospy.logfatal("The lengths of header_time_delta_topics and header_time_deltas do not match.") exit(-1) if len(header_time_delta_topics) > 0: self.header_time_deltas = dict(zip(header_time_delta_topics, header_time_deltas)) self.adjust_time_topics = list(self.header_time_deltas.keys())
[docs] def adjust_header_time(self, header: str, delta: int): # Duration to add to the header duration = rospy.Duration(nsecs=delta*1000*1000) # Adjust the header's stamp header.stamp += duration return header
[docs] def uncompress_qoi_image(self, compressed_image: CompressedImage, topic: str) -> Tuple[Image, str]: uncompressed_image = qoi.decode(compressed_image.data) image = Image() image.encoding = 'bgra8' image.data = uncompressed_image.tobytes() image.header = compressed_image.header image.height, image.width, channels = uncompressed_image.shape image.step = image.width * channels image.is_bigendian = True return image, topic.split('/qoi')[0]
[docs] def uncompress_image(self, compressed_image: CompressedImage, topic: str) -> Tuple[Image, str]: uncompressed_image = cv2.cvtColor(cv2.imdecode(np.frombuffer(compressed_image.data, np.uint8), cv2.IMREAD_COLOR), cv2.COLOR_BGR2BGRA) image = Image() image.encoding = 'bgra8' image.data = uncompressed_image.tobytes() image.header = compressed_image.header image.height, image.width, channels = uncompressed_image.shape image.step = image.width * channels image.is_bigendian = True return image, topic.split('/compressed')[0]
[docs] def compress_qoi_image(self, image: Image, topic: str) -> Tuple[CompressedImage, str]: compressed_image = CompressedImage() compressed_image.header = image.header compressed_image.format = image.encoding + '; qoi compressed bgra8' data = np.frombuffer(image.data, np.uint8).reshape((image.height, image.width, 4)).copy() compressed_image.data = qoi.encode(data) return compressed_image, topic + '/qoi'
[docs] def compress_jpeg_image(self, image: Image, topic: str) -> Tuple[CompressedImage, str]: compressed_image = CompressedImage() compressed_image.header = image.header compressed_image.format = image.encoding + '; jpeg compressed bgr8' compressed_image.data = np.array( cv2.imencode('.' + self.compress_images, np.frombuffer(image.data, np.uint8).reshape((image.height, image.width, 4)), [cv2.IMWRITE_JPEG_QUALITY, 90])[1]).tostring() return compressed_image, topic + '/compressed'
[docs] def compress_png_image(self, image: Image, topic:str) -> Tuple[CompressedImage, str]: compressed_image = CompressedImage() compressed_image.header = image.header compressed_image.format = image.encoding + '; png compressed bgr8' compressed_image.data = np.array( cv2.imencode('.' + self.compress_images, np.frombuffer(image.data, np.uint8).reshape((image.height, image.width, 4)), [cv2.IMWRITE_PNG_COMPRESSION, 6])[1]).tostring() return compressed_image, topic + '/compressed'
[docs] def compress_image(self, image: Image, topic: str) -> Tuple[CompressedImage, str]: if self.compress_images == 'qoi': return self.compress_qoi_image(image=image, topic=topic) elif self.compress_images == 'jpeg': return self.compress_jpeg_image(image=image, topic=topic) elif self.compress_images == 'png': return self.compress_png_image(image=image, topic=topic)
[docs] def filter_ros_msg_by_pipeline(self, topic: str, msg: Any, t: rospy.Time) -> bool: if intersects({'PER'}, self.delete_pipeline): if 'perception' in topic: return False if intersects({'PER', 'LOMA'}, self.delete_pipeline): if 'local_mapping' in topic: return False if intersects({'PER', 'LOMA', 'SLAM'}, self.delete_pipeline): if 'slam' in topic or 'finish_line_detection' in topic: return False if intersects({'PER', 'LOMA', 'SLAM', 'FILT'}, self.delete_pipeline): if 'filtering' in topic: return False if intersects({'PER', 'LOMA', 'SLAM', 'FILT', 'MOPA'}, self.delete_pipeline): if 'motion_planning' in topic: return False if intersects({'PER', 'LOMA', 'SLAM', 'FILT', 'MOPA', 'CTRL'}, self.delete_pipeline): if 'control' in topic: return False return True
[docs] def filter_ros_msg(self, topic: str, msg: Any, t: rospy.Time) -> bool: if self.delete_visualization is True or len(self.delete_pipeline): if 'visualization' in topic or topic == '/vehicle': return False if self.delete_transforms is True: if topic == '/tf': return False if self.delete_lidar_points is True: if topic == '/velodyne_points' or topic == '/ouster/points': return False if self.delete_images is True: if 'Image' in msg._type: return False if self.delete_pipeline is not None: if self.filter_ros_msg_by_pipeline(topic=topic, msg=msg, t=t) is False: return False return True
[docs] def migrate_v1_to_v2_ros_message(self, old_topic: str, old_msg: Any, t: rospy.Time ) -> Tuple[bool, List[Tuple[str, Any, rospy.Time]]]: new_topic = old_topic new_msg = old_msg if '/zed/' in old_topic: new_topic = old_topic.replace('/zed/', '/zed2i/') new_msg.header.frame_id.replace('zed_', 'zed2i') if old_topic == '/actc_pos': new_topic = '/can/per/steering_wheel_angle/actual' new_msg = SteeringWheelAngle() new_msg.header = old_msg.header new_msg.angle = old_msg.data elif old_topic == '/wheelspeed/left': new_topic = '/can/per/wheelspeed/front' new_msg = Wheelspeed() new_msg.header = old_msg.header new_msg.right = self.v1_to_v2_migration_wheelspeed_fr new_msg.left = old_msg.data1 elif old_topic == '/wheelspeed/right': self.v1_to_v2_migration_wheelspeed_fr = old_msg.data1 return False, [] elif old_topic == '/imu/accel': new_topic = '/can/per/imu/accel' new_msg = ImuAcceleration() new_msg.header = old_msg.header new_msg.x = old_msg.linear_acceleration.x new_msg.y = old_msg.linear_acceleration.y new_msg.z = old_msg.linear_acceleration.z elif old_topic == '/imu/gyro': new_topic = '/can/per/imu/gyro' new_msg = ImuGyro() new_msg.header = old_msg.header new_msg.roll = old_msg.angular_velocity.x new_msg.pitch = old_msg.angular_velocity.y new_msg.yaw = old_msg.angular_velocity.z elif old_topic == '/engine/rpm': new_topic = '/can/log/wheelspeed/rear' new_msg = Wheelspeed() new_msg.header = old_msg.header new_msg.left = old_msg.data1 new_msg.right = old_msg.data2 return True, [(new_topic, new_msg, t)]
[docs] def migrate_gps_message(self, old_topic: str, old_msg: Any, t: rospy.Time) -> Tuple[str, Any, rospy.Time]: nav_sat_fix_msg = NavSatFix(header=old_msg.header, latitude=old_msg.latitude, longitude=old_msg.longitude, altitude=old_msg.altitude, position_covariance=old_msg.position_covariance, position_covariance_type=old_msg.position_covariance_type) return (old_topic + '/migrated', nav_sat_fix_msg, t)
[docs] def migrate_ros_msg(self, old_topic: str, old_msg: Any, t: rospy.Time): if not self.migration_strategy: return True, [(old_topic, old_msg, t)] all_messages = [] keep_at_all = False if 'V1_to_V2' in self.migration_strategy: keep, messages = self.migrate_v1_to_v2_ros_message(old_topic=old_topic, old_msg=old_msg, t=t) keep_at_all |= keep all_messages += messages if not keep_at_all: return False, [] else: return True, all_messages
[docs] def fix_ros_msg(self, topic: str, msg: Any, t: rospy.Time): if 'rear_wheelspeed' in self.fixing_strategy: if topic == '/can/log/wheelspeed/rear': msg.right = (5500 / 6000) * msg.right msg.left = (5500 / 6000) * msg.left if 'inverse_rl_wheelspeed' in self.fixing_strategy: if topic == '/can/log/wheelspeed/rear': msg.left *= -1 if 'fix_lidar_time' in self.fixing_strategy: if topic.startswith('/ouster') and hasattr(msg, 'header'): msg.header.stamp = t
[docs] def process_ros_msg(self, old_topic: str, old_msg: Any, t: rospy.Time) -> List[Tuple[str, Any, rospy.Time]]: messages = [] if self.fix_clock is True and '/clock' == old_topic: if self.last_clock is not None and old_msg.clock < self.last_clock: return () else: self.last_clock = old_msg.clock keep, migrated_messages = self.migrate_ros_msg(old_topic=old_topic, old_msg=old_msg, t=t) if keep is False: return messages if self.fixing_strategy: for migrated_topic, migrated_msg, t in migrated_messages: self.fix_ros_msg(topic=migrated_topic, msg=migrated_msg, t=t) for migrated_topic, migrated_msg, t in migrated_messages: if self.can_time_offset is not None and hasattr(migrated_msg, 'header'): if '/can/' in migrated_topic: migrated_msg.header.stamp += rospy.Duration.from_sec(self.can_time_offset) if self.rename_vehicle is not None and migrated_topic == '/vehicle': migrated_msg.mesh_resource = f'http://localhost:7777/{self.rename_vehicle}.dae' if self.rename_vehicle == 'emma': migrated_msg.pose.orientation.x = 0 migrated_msg.pose.orientation.y = 0 migrated_msg.pose.orientation.z = 0 migrated_msg.pose.orientation.w = 1 migrated_msg.pose.position.x = 0.8 migrated_msg.pose.position.y = 0 migrated_msg.pose.position.z = 0 elif self.rename_vehicle == 'eva': migrated_msg.pose.orientation.x = 0.707 migrated_msg.pose.orientation.y = 0.0 migrated_msg.pose.orientation.z = 0.0 migrated_msg.pose.orientation.w = 0.707 migrated_msg.pose.position.x = 0.2 migrated_msg.pose.position.y = 0 migrated_msg.pose.position.z = 0 new_msg = migrated_msg new_topic = migrated_topic if self.gps is True and migrated_topic in self.gps_topics: messages.append(self.migrate_gps_message(old_topic=migrated_topic, old_msg=migrated_msg, t=t)) if self.uncompress_images is True: if '/compressed' in migrated_topic: new_msg, new_topic = self.uncompress_image(compressed_image=migrated_msg, topic=migrated_topic) elif '/qoi' in migrated_topic: new_msg, new_topic = self.uncompress_qoi_image(compressed_image=migrated_msg, topic=migrated_topic) elif migrated_topic in ['/perception/detection_image', '/visualization/image']: if 'qoi' in migrated_msg.format: new_msg, new_topic = self.uncompress_qoi_image(compressed_image=migrated_msg, topic=migrated_topic) elif 'jpeg' in migrated_msg.format or 'png' in migrated_msg.format: new_msg, new_topic = self.uncompress_image(compressed_image=migrated_msg, topic=migrated_topic) if (self.compress_images is not None and ('image_rect_color' in new_topic or new_topic == '/perception/detection_image' or new_topic == '/visualization/image') and new_msg._type == 'sensor_msgs/Image'): new_msg, new_topic = self.compress_image(image=new_msg, topic=new_topic) if (self.compress_images is not None and ('image_rect_color' in migrated_topic or migrated_topic == '/perception/detection_image' or migrated_topic == '/visualization/image') and migrated_msg._type == 'sensor_msgs/Image'): new_msg, new_topic = self.compress_image(image=migrated_msg, topic=migrated_topic) if new_topic in self.adjust_time_topics: if new_topic not in self.used_topics_for_time_delta: print(f"Adjusting time for {new_topic} by {self.header_time_deltas[new_topic]} ms.") self.used_topics_for_time_delta.append(new_topic) new_msg.header = self.adjust_header_time(new_msg.header, int(self.header_time_deltas[new_topic])) messages.append((new_topic, new_msg, t)) return messages
[docs] def run(self): with progressbar.ProgressBar(max_value=self.input_rosbag.get_message_count()) as bar: bar.start() with rosbag.Bag(self.output_rosbag_path, 'w') as outbag: for old_topic, old_msg, t in self.input_rosbag.read_messages(): if t.to_sec() < self.start: bar.max_value -= 1 bar.update(bar.value) continue if t.to_sec() > self.end: break new_messages = self.process_ros_msg(old_topic=old_topic, old_msg=old_msg, t=t) for new_topic, new_msg, t in new_messages: if self.filter_ros_msg(topic=new_topic, msg=new_msg, t=t): outbag.write(new_topic, new_msg, t) bar.update(bar.value + 1) bar.finish()
if __name__ == '__main__': migrator = RosbagMigrator(input_rosbag='/workspace/as_ros/rosbags/trackdrive_slam_detection.bag', output_rosbag_suffix='_migrated', start=0, duration=None, end=None, delete_lidar_points=False, delete_images=False, generate_detection_image=False, uncompress_images=False, delete_transforms=False, delete_visualization=False, delete_pipeline=(), migration_strategy=(), can_time_offset=0, recording_topics=['/perception/detection_image'], fix_clock=False) migrator.run()