Source code for param_study.execute

import signal
import time

import rospy
import roslaunch
import rospkg

import database

parameter_study_name = 'test'

[docs]class GracefulKiller: kill_now = False def __init__(self): signal.signal(signal.SIGINT, self.exit_gracefully) signal.signal(signal.SIGTERM, self.exit_gracefully)
[docs] def exit_gracefully(self, *args): self.kill_now = True
[docs]class ParameterStudyWorker(): def __init__(self, parameter_study_name: str): self.parameter_study_name = parameter_study_name self.session = database.Session() self.killer = GracefulKiller()
[docs] def execute_test_case(self, test_case: database.TestCase): rospy.loginfo(f'[ParameterStudyWorker] Spawning SLAM debug for "{self.parameter_study_name}" study with {test_case.index} test_case') uuid = roslaunch.rlutil.get_or_generate_uuid(None, False) roslaunch.configure_logging(uuid) rospack = rospkg.RosPack() cli_args = [f"{rospack.get_path('slam')}/launch/slam_debug.launch", 'input_rosbag:=/workspace/as_ros/rosbags/run_373_migrated.bag', f'slam_config:=../param_study/{self.parameter_study_name}/configs/test_case_{test_case.index}.yaml', f'output_rosbag:=/workspace/as_ros/src/slam/param_study/{self.parameter_study_name}/{test_case.index}/'] self.test_case_process = roslaunch.parent.ROSLaunchParent(uuid, [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0], cli_args[1:])]) self.test_case_process.start()
[docs] def wait_for_test_case_to_finish(self): rospy.loginfo(f'[ParameterStudyWorker] Waiting for test case to finish') while self.killer.kill_now is False and self.test_case_process.pm.is_alive() is True: time.sleep(0.1) rospy.loginfo(f'[ParameterStudyWorker] Test case finished. Next....')
[docs] def run_next_test_case(self) -> bool: test_case = database.TestCase.find_next_unstarted_for_study(session=self.session, parameter_study_name=self.parameter_study_name) if test_case is None: return False test_case.started = True self.session.commit() self.execute_test_case(test_case=test_case) self.wait_for_test_case_to_finish() rospy.loginfo(f'[ParameterStudyWorker] Marking test case {test_case.index} as finished.') test_case.finished = True self.session.commit() return True
[docs] def run(self): while self.killer.kill_now is False: continue_running = self.run_next_test_case() if continue_running is False: break
if __name__ == '__main__': rospy.init_node('parameter_study_worker', anonymous=False) rospy.loginfo('[ParameterStudyWorker] Parameter Study Worker Node started.') parameter_study_worker = ParameterStudyWorker(parameter_study_name=parameter_study_name) parameter_study_worker.run()