from typing import Any, List, Optional, Tuple
import progressbar
import numpy as np
import cv2
import qoi
import rosbag
import rospy
from sensor_msgs.msg import Image, CompressedImage, NavSatFix
from as_can_msgs.msg import SteeringWheelAngle, Wheelspeed, ImuAcceleration, ImuGyro
[docs]def intersects(set1: set, set2: set) -> bool:
return not set1.isdisjoint(set2)
[docs]class RosbagMigrator():
_cone_colors = [(0, 255, 255), (255, 0, 0), (0, 165, 255), (0, 0, 255)]
def __init__(self, input_rosbag: str, output_rosbag_suffix: str, uncompress_images: bool, compress_images: Optional[str],
start: float, duration: Optional[float], end: Optional[float], delete_pipeline: Optional[Tuple[str]],
delete_lidar_points: bool, delete_images: bool, delete_transforms: bool, delete_visualization: bool,
migration_strategy: Optional[Tuple[str]], can_time_offset: Optional[float], gps: bool, fix_clock: bool,
rename_vehicle: Optional[str], fixing_strategy: Optional[Tuple[str]],
header_time_delta_topics: Optional[Tuple[str]], header_time_deltas: Optional[Tuple[int]]) -> None:
self.input_rosbag_path = input_rosbag
self.input_rosbag = rosbag.Bag(input_rosbag)
self.input_rosbag_start = self.input_rosbag.get_start_time()
self.output_rosbag_path = f'{input_rosbag.rsplit(".")[0]}{output_rosbag_suffix}.{input_rosbag.rsplit(".")[1]}'
self.uncompress_images = uncompress_images
self.compress_images = compress_images
self.delete_transforms = delete_transforms
self.delete_pipeline = set(delete_pipeline) if delete_pipeline is not None else None
self.migration_strategy = migration_strategy
self.fixing_strategy = fixing_strategy
self.delete_visualization = delete_visualization
self.delete_lidar_points = delete_lidar_points
self.delete_images = delete_images
self.start = self.input_rosbag_start + start
self.end = end or self.input_rosbag.get_end_time()
self.can_time_offset = can_time_offset
self.rename_vehicle = rename_vehicle
if duration:
self.end = min(self.start + duration, self.end)
self.v1_to_v2_migration_wheelspeed_fr = 0
self.fix_clock = fix_clock
self.first_clock = None
self.last_clock = None
self.gps = gps
self.gps_topics = []
if self.gps is True:
for key, value in self.input_rosbag.get_type_and_topic_info()[1].items():
if value.msg_type == 'gps_common/GPSFix':
self.gps_topics.append(key)
# Set values for header time correction
self.header_time_deltas = {}
self.adjust_time_topics = []
self.used_topics_for_time_delta = []
if len(header_time_delta_topics) != len(header_time_deltas):
rospy.logfatal("The lengths of header_time_delta_topics and header_time_deltas do not match.")
exit(-1)
if len(header_time_delta_topics) > 0:
self.header_time_deltas = dict(zip(header_time_delta_topics, header_time_deltas))
self.adjust_time_topics = list(self.header_time_deltas.keys())
[docs] def uncompress_qoi_image(self, compressed_image: CompressedImage, topic: str) -> Tuple[Image, str]:
uncompressed_image = qoi.decode(compressed_image.data)
image = Image()
image.encoding = 'bgra8'
image.data = uncompressed_image.tobytes()
image.header = compressed_image.header
image.height, image.width, channels = uncompressed_image.shape
image.step = image.width * channels
image.is_bigendian = True
return image, topic.split('/qoi')[0]
[docs] def uncompress_image(self, compressed_image: CompressedImage, topic: str) -> Tuple[Image, str]:
uncompressed_image = cv2.cvtColor(cv2.imdecode(np.frombuffer(compressed_image.data, np.uint8),
cv2.IMREAD_COLOR),
cv2.COLOR_BGR2BGRA)
image = Image()
image.encoding = 'bgra8'
image.data = uncompressed_image.tobytes()
image.header = compressed_image.header
image.height, image.width, channels = uncompressed_image.shape
image.step = image.width * channels
image.is_bigendian = True
return image, topic.split('/compressed')[0]
[docs] def compress_qoi_image(self, image: Image, topic: str) -> Tuple[CompressedImage, str]:
compressed_image = CompressedImage()
compressed_image.header = image.header
compressed_image.format = image.encoding + '; qoi compressed bgra8'
data = np.frombuffer(image.data, np.uint8).reshape((image.height, image.width, 4)).copy()
compressed_image.data = qoi.encode(data)
return compressed_image, topic + '/qoi'
[docs] def compress_jpeg_image(self, image: Image, topic: str) -> Tuple[CompressedImage, str]:
compressed_image = CompressedImage()
compressed_image.header = image.header
compressed_image.format = image.encoding + '; jpeg compressed bgr8'
compressed_image.data = np.array(
cv2.imencode('.' + self.compress_images, np.frombuffer(image.data, np.uint8).reshape((image.height, image.width, 4)),
[cv2.IMWRITE_JPEG_QUALITY, 90])[1]).tostring()
return compressed_image, topic + '/compressed'
[docs] def compress_png_image(self, image: Image, topic:str) -> Tuple[CompressedImage, str]:
compressed_image = CompressedImage()
compressed_image.header = image.header
compressed_image.format = image.encoding + '; png compressed bgr8'
compressed_image.data = np.array(
cv2.imencode('.' + self.compress_images, np.frombuffer(image.data, np.uint8).reshape((image.height, image.width, 4)),
[cv2.IMWRITE_PNG_COMPRESSION, 6])[1]).tostring()
return compressed_image, topic + '/compressed'
[docs] def compress_image(self, image: Image, topic: str) -> Tuple[CompressedImage, str]:
if self.compress_images == 'qoi':
return self.compress_qoi_image(image=image, topic=topic)
elif self.compress_images == 'jpeg':
return self.compress_jpeg_image(image=image, topic=topic)
elif self.compress_images == 'png':
return self.compress_png_image(image=image, topic=topic)
[docs] def filter_ros_msg_by_pipeline(self, topic: str, msg: Any, t: rospy.Time) -> bool:
if intersects({'PER'}, self.delete_pipeline):
if 'perception' in topic:
return False
if intersects({'PER', 'LOMA'}, self.delete_pipeline):
if 'local_mapping' in topic:
return False
if intersects({'PER', 'LOMA', 'SLAM'}, self.delete_pipeline):
if 'slam' in topic or 'finish_line_detection' in topic:
return False
if intersects({'PER', 'LOMA', 'SLAM', 'FILT'}, self.delete_pipeline):
if 'filtering' in topic:
return False
if intersects({'PER', 'LOMA', 'SLAM', 'FILT', 'MOPA'}, self.delete_pipeline):
if 'motion_planning' in topic:
return False
if intersects({'PER', 'LOMA', 'SLAM', 'FILT', 'MOPA', 'CTRL'}, self.delete_pipeline):
if 'control' in topic:
return False
return True
[docs] def filter_ros_msg(self, topic: str, msg: Any, t: rospy.Time) -> bool:
if self.delete_visualization is True or len(self.delete_pipeline):
if 'visualization' in topic or topic == '/vehicle':
return False
if self.delete_transforms is True:
if topic == '/tf':
return False
if self.delete_lidar_points is True:
if topic == '/velodyne_points' or topic == '/ouster/points':
return False
if self.delete_images is True:
if 'Image' in msg._type:
return False
if self.delete_pipeline is not None:
if self.filter_ros_msg_by_pipeline(topic=topic, msg=msg, t=t) is False:
return False
return True
[docs] def migrate_v1_to_v2_ros_message(self, old_topic: str, old_msg: Any, t: rospy.Time
) -> Tuple[bool, List[Tuple[str, Any, rospy.Time]]]:
new_topic = old_topic
new_msg = old_msg
if '/zed/' in old_topic:
new_topic = old_topic.replace('/zed/', '/zed2i/')
new_msg.header.frame_id.replace('zed_', 'zed2i')
if old_topic == '/actc_pos':
new_topic = '/can/per/steering_wheel_angle/actual'
new_msg = SteeringWheelAngle()
new_msg.header = old_msg.header
new_msg.angle = old_msg.data
elif old_topic == '/wheelspeed/left':
new_topic = '/can/per/wheelspeed/front'
new_msg = Wheelspeed()
new_msg.header = old_msg.header
new_msg.right = self.v1_to_v2_migration_wheelspeed_fr
new_msg.left = old_msg.data1
elif old_topic == '/wheelspeed/right':
self.v1_to_v2_migration_wheelspeed_fr = old_msg.data1
return False, []
elif old_topic == '/imu/accel':
new_topic = '/can/per/imu/accel'
new_msg = ImuAcceleration()
new_msg.header = old_msg.header
new_msg.x = old_msg.linear_acceleration.x
new_msg.y = old_msg.linear_acceleration.y
new_msg.z = old_msg.linear_acceleration.z
elif old_topic == '/imu/gyro':
new_topic = '/can/per/imu/gyro'
new_msg = ImuGyro()
new_msg.header = old_msg.header
new_msg.roll = old_msg.angular_velocity.x
new_msg.pitch = old_msg.angular_velocity.y
new_msg.yaw = old_msg.angular_velocity.z
elif old_topic == '/engine/rpm':
new_topic = '/can/log/wheelspeed/rear'
new_msg = Wheelspeed()
new_msg.header = old_msg.header
new_msg.left = old_msg.data1
new_msg.right = old_msg.data2
return True, [(new_topic, new_msg, t)]
[docs] def migrate_gps_message(self, old_topic: str, old_msg: Any, t: rospy.Time) -> Tuple[str, Any, rospy.Time]:
nav_sat_fix_msg = NavSatFix(header=old_msg.header, latitude=old_msg.latitude,
longitude=old_msg.longitude, altitude=old_msg.altitude,
position_covariance=old_msg.position_covariance,
position_covariance_type=old_msg.position_covariance_type)
return (old_topic + '/migrated', nav_sat_fix_msg, t)
[docs] def migrate_ros_msg(self, old_topic: str, old_msg: Any, t: rospy.Time):
if not self.migration_strategy:
return True, [(old_topic, old_msg, t)]
all_messages = []
keep_at_all = False
if 'V1_to_V2' in self.migration_strategy:
keep, messages = self.migrate_v1_to_v2_ros_message(old_topic=old_topic, old_msg=old_msg, t=t)
keep_at_all |= keep
all_messages += messages
if not keep_at_all:
return False, []
else:
return True, all_messages
[docs] def fix_ros_msg(self, topic: str, msg: Any, t: rospy.Time):
if 'rear_wheelspeed' in self.fixing_strategy:
if topic == '/can/log/wheelspeed/rear':
msg.right = (5500 / 6000) * msg.right
msg.left = (5500 / 6000) * msg.left
if 'inverse_rl_wheelspeed' in self.fixing_strategy:
if topic == '/can/log/wheelspeed/rear':
msg.left *= -1
if 'fix_lidar_time' in self.fixing_strategy:
if topic.startswith('/ouster') and hasattr(msg, 'header'):
msg.header.stamp = t
[docs] def process_ros_msg(self, old_topic: str, old_msg: Any, t: rospy.Time) -> List[Tuple[str, Any, rospy.Time]]:
messages = []
if self.fix_clock is True and '/clock' == old_topic:
if self.last_clock is not None and old_msg.clock < self.last_clock:
return ()
else:
self.last_clock = old_msg.clock
keep, migrated_messages = self.migrate_ros_msg(old_topic=old_topic, old_msg=old_msg, t=t)
if keep is False:
return messages
if self.fixing_strategy:
for migrated_topic, migrated_msg, t in migrated_messages:
self.fix_ros_msg(topic=migrated_topic, msg=migrated_msg, t=t)
for migrated_topic, migrated_msg, t in migrated_messages:
if self.can_time_offset is not None and hasattr(migrated_msg, 'header'):
if '/can/' in migrated_topic:
migrated_msg.header.stamp += rospy.Duration.from_sec(self.can_time_offset)
if self.rename_vehicle is not None and migrated_topic == '/vehicle':
migrated_msg.mesh_resource = f'http://localhost:7777/{self.rename_vehicle}.dae'
if self.rename_vehicle == 'emma':
migrated_msg.pose.orientation.x = 0
migrated_msg.pose.orientation.y = 0
migrated_msg.pose.orientation.z = 0
migrated_msg.pose.orientation.w = 1
migrated_msg.pose.position.x = 0.8
migrated_msg.pose.position.y = 0
migrated_msg.pose.position.z = 0
elif self.rename_vehicle == 'eva':
migrated_msg.pose.orientation.x = 0.707
migrated_msg.pose.orientation.y = 0.0
migrated_msg.pose.orientation.z = 0.0
migrated_msg.pose.orientation.w = 0.707
migrated_msg.pose.position.x = 0.2
migrated_msg.pose.position.y = 0
migrated_msg.pose.position.z = 0
new_msg = migrated_msg
new_topic = migrated_topic
if self.gps is True and migrated_topic in self.gps_topics:
messages.append(self.migrate_gps_message(old_topic=migrated_topic, old_msg=migrated_msg, t=t))
if self.uncompress_images is True:
if '/compressed' in migrated_topic:
new_msg, new_topic = self.uncompress_image(compressed_image=migrated_msg, topic=migrated_topic)
elif '/qoi' in migrated_topic:
new_msg, new_topic = self.uncompress_qoi_image(compressed_image=migrated_msg, topic=migrated_topic)
elif migrated_topic in ['/perception/detection_image', '/visualization/image']:
if 'qoi' in migrated_msg.format:
new_msg, new_topic = self.uncompress_qoi_image(compressed_image=migrated_msg, topic=migrated_topic)
elif 'jpeg' in migrated_msg.format or 'png' in migrated_msg.format:
new_msg, new_topic = self.uncompress_image(compressed_image=migrated_msg, topic=migrated_topic)
if (self.compress_images is not None and ('image_rect_color' in new_topic
or new_topic == '/perception/detection_image' or new_topic == '/visualization/image')
and new_msg._type == 'sensor_msgs/Image'):
new_msg, new_topic = self.compress_image(image=new_msg, topic=new_topic)
if (self.compress_images is not None and ('image_rect_color' in migrated_topic
or migrated_topic == '/perception/detection_image' or migrated_topic == '/visualization/image')
and migrated_msg._type == 'sensor_msgs/Image'):
new_msg, new_topic = self.compress_image(image=migrated_msg, topic=migrated_topic)
if new_topic in self.adjust_time_topics:
if new_topic not in self.used_topics_for_time_delta:
print(f"Adjusting time for {new_topic} by {self.header_time_deltas[new_topic]} ms.")
self.used_topics_for_time_delta.append(new_topic)
new_msg.header = self.adjust_header_time(new_msg.header, int(self.header_time_deltas[new_topic]))
messages.append((new_topic, new_msg, t))
return messages
[docs] def run(self):
with progressbar.ProgressBar(max_value=self.input_rosbag.get_message_count()) as bar:
bar.start()
with rosbag.Bag(self.output_rosbag_path, 'w') as outbag:
for old_topic, old_msg, t in self.input_rosbag.read_messages():
if t.to_sec() < self.start:
bar.max_value -= 1
bar.update(bar.value)
continue
if t.to_sec() > self.end:
break
new_messages = self.process_ros_msg(old_topic=old_topic, old_msg=old_msg, t=t)
for new_topic, new_msg, t in new_messages:
if self.filter_ros_msg(topic=new_topic, msg=new_msg, t=t):
outbag.write(new_topic, new_msg, t)
bar.update(bar.value + 1)
bar.finish()
if __name__ == '__main__':
migrator = RosbagMigrator(input_rosbag='/workspace/as_ros/rosbags/trackdrive_slam_detection.bag',
output_rosbag_suffix='_migrated',
start=0, duration=None, end=None, delete_lidar_points=False, delete_images=False,
generate_detection_image=False, uncompress_images=False, delete_transforms=False,
delete_visualization=False, delete_pipeline=(), migration_strategy=(),
can_time_offset=0, recording_topics=['/perception/detection_image'], fix_clock=False)
migrator.run()