Source code for cli.merge_bags
from typing import List
import rosbag
import click
# Original:
# https://github.com/udacity/self-driving-car/blob/master/image-localization/community-code/roboauto/scripts/bagmerge.py
[docs]class RosbagMerger():
def __init__(self, main_input_rosbag: str, merge_input_rosbag: str,
output_rosbag_suffix: str, topics: List[str]) -> None:
self.main_input_rosbag_path = main_input_rosbag
self.main_input_rosbag = rosbag.Bag(main_input_rosbag, 'r')
self.merge_input_rosbag_path = merge_input_rosbag
self.merge_input_rosbag = rosbag.Bag(merge_input_rosbag, 'r')
self.output_rosbag_path = \
f'{main_input_rosbag.rsplit(".")[0]}{output_rosbag_suffix}.{main_input_rosbag.rsplit(".")[1]}'
self.topics = topics
self.main_limit = (self.main_input_rosbag.get_start_time(),
self.main_input_rosbag.get_end_time())
self.main_rosbag_iter = self.main_input_rosbag.read_messages()
self.merge_limit = (self.merge_input_rosbag.get_start_time(),
self.merge_input_rosbag.get_end_time())
self.merge_rosbag_iter = self.merge_input_rosbag.read_messages(connection_filter=self.merge_filter)
click.echo(f'Merging topics ({self.topics}) from {self.merge_input_rosbag_path}'
f' into {self.main_input_rosbag_path}. Results will be saved in {self.output_rosbag_path} ')
[docs] def merge_filter(self, topic: str, datatype: str, md5sum: str, msg_def: str, header: dict):
return topic in self.topics
[docs] def run(self):
next_main_message = next(self.main_rosbag_iter)
next_merge_message = next(self.merge_rosbag_iter)
with rosbag.Bag(self.output_rosbag_path, 'w') as outbag:
while next_main_message is not None or next_merge_message is not None:
if next_main_message is None:
outbag.write(*next_merge_message)
next_merge_message = next(self.merge_input_rosbag, None)
elif next_merge_message is None:
outbag.write(*next_main_message)
next_main_message = next(self.main_rosbag_iter, None)
elif next_main_message[2] < next_merge_message[2]:
outbag.write(*next_main_message)
next_main_message = next(self.main_rosbag_iter, None)
else:
outbag.write(*next_merge_message)
next_merge_message = next(self.merge_rosbag_iter, None)