Source code for cli.merge_bags

import os
from typing import List

import click
import rosbag

# Original:
# https://github.com/udacity/self-driving-car/blob/master/image-localization/community-code/roboauto/scripts/bagmerge.py


class RosbagMerger:
    """Merge selected topics of one rosbag into another, ordered by timestamp.

    Every message of the main bag is copied to the output bag. Messages of the
    merge bag are copied only when their topic is listed in ``topics``. The
    output bag is written next to the main bag, with ``output_rosbag_suffix``
    inserted in front of the file extension.

    Both input bags are opened in the constructor and their message iterators
    are created there, so an instance can drive a single :meth:`run`. Call
    :meth:`close` afterwards to release the input bags.
    """

    def __init__(self, main_input_rosbag: str, merge_input_rosbag: str, output_rosbag_suffix: str,
                 topics: List[str]) -> None:
        """Open both input bags and prepare the message iterators.

        Args:
            main_input_rosbag: Path of the bag whose messages are all kept.
            merge_input_rosbag: Path of the bag from which ``topics`` are taken.
            output_rosbag_suffix: Text inserted between the main bag's base name
                and its extension to form the output path.
            topics: Topic names to copy from the merge bag.
        """
        self.main_input_rosbag_path = main_input_rosbag
        self.main_input_rosbag = rosbag.Bag(main_input_rosbag, 'r')

        self.merge_input_rosbag_path = merge_input_rosbag
        self.merge_input_rosbag = rosbag.Bag(merge_input_rosbag, 'r')

        self.output_rosbag_path = self._build_output_path(main_input_rosbag, output_rosbag_suffix)
        self.topics = topics

        # (start, end) time of each input bag, as reported by the bag itself.
        self.main_limit = (self.main_input_rosbag.get_start_time(),
                           self.main_input_rosbag.get_end_time())
        self.main_rosbag_iter = self.main_input_rosbag.read_messages()

        self.merge_limit = (self.merge_input_rosbag.get_start_time(),
                            self.merge_input_rosbag.get_end_time())
        # Only connections accepted by merge_filter are read from the merge bag.
        self.merge_rosbag_iter = self.merge_input_rosbag.read_messages(connection_filter=self.merge_filter)

        click.echo(f'Merging topics ({self.topics}) from {self.merge_input_rosbag_path}'
                   f' into {self.main_input_rosbag_path}. Results will be saved in {self.output_rosbag_path} ')

    @staticmethod
    def _build_output_path(input_path: str, suffix: str) -> str:
        """Return ``input_path`` with ``suffix`` inserted before its extension.

        Only the final extension is split off, so dots in directory names or
        earlier in the file name are left alone
        (``./v1.2/run.bag`` + ``_merged`` -> ``./v1.2/run_merged.bag``).
        A path without an extension simply gets the suffix appended.
        """
        base, extension = os.path.splitext(input_path)
        return f'{base}{suffix}{extension}'

    @staticmethod
    def _merge_by_timestamp(main_messages, merge_messages):
        """Yield the messages of two time-ordered iterators as one ordered stream.

        Each message is indexed at position 2 for its timestamp (rosbag yields
        ``(topic, message, timestamp)`` tuples). A main message is emitted first
        only when its timestamp is strictly smaller; on a tie the merge message
        goes first. Either iterator may be empty.
        """
        pending_main = next(main_messages, None)
        pending_merge = next(merge_messages, None)

        while pending_main is not None or pending_merge is not None:
            take_main = pending_merge is None or (
                pending_main is not None and pending_main[2] < pending_merge[2]
            )
            if take_main:
                yield pending_main
                pending_main = next(main_messages, None)
            else:
                yield pending_merge
                pending_merge = next(merge_messages, None)

    def merge_filter(self, topic: str, datatype: str, md5sum: str, msg_def: str, header: dict) -> bool:
        """Connection filter for the merge bag: accept only the requested topics.

        The signature matches what ``read_messages(connection_filter=...)``
        passes in; only ``topic`` is inspected.
        """
        return topic in self.topics

    def run(self):
        """Write the time-ordered merge of both inputs to the output bag.

        Consumes the iterators created in the constructor, so it is meant to be
        called once per instance. Empty inputs (including a merge bag without
        any message on the requested topics) are handled and simply contribute
        nothing. The input bags stay open; use :meth:`close` to release them.
        """
        with rosbag.Bag(self.output_rosbag_path, 'w') as outbag:
            for message in self._merge_by_timestamp(self.main_rosbag_iter, self.merge_rosbag_iter):
                outbag.write(*message)

    def close(self) -> None:
        """Close both input bags opened by the constructor."""
        self.main_input_rosbag.close()
        self.merge_input_rosbag.close()