Source code for mission_machine

#!/usr/bin/env python3

from matplotlib import use
import rospy
import roslaunch
import rospkg

from enum import Enum
import time
import traceback

from as_can_msgs.msg import AutonomousSystemState, AutonomousMission, MissionFinished, Wheelspeed, \
                            SteeringControlEngage, VelocityTorqueSwitch, MissionMachineState, Velocity, Torque


DEBUG = False


[docs]class AutonomousSystemStateEnum(Enum): STARTUP = 0 OFF = 1 READY = 2 DRIVING = 3 EMERGENCY = 4 FINISHED = 5
[docs]class AutonomousMissionEnum(Enum): MANUAL_DRIVING = 0 ACCELERATION = 1 SKIDPAD = 2 TRACKDRIVE = 3 BRAKETEST = 4 INSPECTION = 5 AUTOCROSS = 6
[docs]class MissionMachineEnum(Enum): IDLE = 0 READY = 1 DRIVING = 2 PRE_FINISHED = 3 FINISHED = 4 EMERGENCY = 5 PRE_INSPECTION = 6 INSPECTION = 7
[docs]class MissionMachine(): def __init__(self): self.state = MissionMachineEnum(0) self.as_running = False self.stop_threshold = rospy.get_param('/mission_machine/stop_threshold') self.basic_tests = rospy.get_param('/mission_machine/basic_tests') self.only_use_classic_control = rospy.get_param('/control/use_old_control') self.as_state = AutonomousSystemStateEnum(0) rospy.Subscriber('/can/per/as_state', AutonomousSystemState, self.as_state_callback) self.mission = AutonomousMissionEnum(0) rospy.Subscriber('/can/per/autonomous_mission', AutonomousMission, self.mission_callback) self.mission_completed = False rospy.Subscriber('/finish_line_detection/mission_completed', MissionFinished, self.completed_callback) self.motorspeed_rr = None self.motorspeed_rl = None rospy.Subscriber('/can/log/wheelspeed/rear', Wheelspeed, self.motorspeed_callback) self.wheelspeed_fr = None self.wheelspeed_fl = None rospy.Subscriber('/can/per/wheelspeed/front', Wheelspeed, self.wheelspeed_callback) self.finished = False self.mission_finished_pub = rospy.Publisher('/mission_machine/finished', MissionFinished, queue_size=10) self.steering_control_engage_pub = rospy.Publisher('/mission_machine/steering_control_engage', SteeringControlEngage, queue_size=10) self.velocity_torque_switch_pub = rospy.Publisher('/mission_machine/use_velocity', VelocityTorqueSwitch, queue_size=10) self.velocity_pub = rospy.Publisher('/control/velocity', Velocity, queue_size=10) self.torque_pub = rospy.Publisher('/control/torque', Torque, queue_size=10) self.mission_machine_state_pub = rospy.Publisher('/mission_machine/state', MissionMachineState, queue_size=10)
[docs] def as_state_callback(self, msg): # rospy.loginfo(f'Switch to state {AutonomousSystemStateEnum(msg.as_state)}') self.as_state = AutonomousSystemStateEnum(msg.as_state)
[docs] def mission_callback(self, msg): rospy.loginfo(f'Received mission {AutonomousMissionEnum(msg.mission)}') self.mission = AutonomousMissionEnum(msg.mission)
[docs] def completed_callback(self, msg): self.mission_completed = msg.finished
[docs] def wheelspeed_callback(self, msg): self.wheelspeed_fl = msg.left self.wheelspeed_fr = msg.right
[docs] def motorspeed_callback(self, msg): self.motorspeed_rl = msg.left self.motorspeed_rr = msg.right
[docs] def start_inspection(self): rospy.loginfo('[Mission Machine] Start Inspection Node.') uuid = roslaunch.rlutil.get_or_generate_uuid(None, False) roslaunch.configure_logging(uuid) rospack = rospkg.RosPack() cli_args = [f"{rospack.get_path('as_missions')}/launch/inspection.launch"] self.inspection_process = roslaunch.parent.ROSLaunchParent(uuid, [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0])]) self.inspection_process.start()
[docs] def kill_inspection(self): rospy.loginfo('[Mission Machine] Kill Inspection Node.') self.inspection_process.shutdown()
[docs] def check_if_inspection_finished(self): if self.motorspeed_rr is None or self.motorspeed_rl is None: rospy.logwarn('[Mission Machine] Cannot check if inspection is finished since the rear wheelspeed message is missing.') return False return self.motorspeed_rr <= 0 and self.motorspeed_rl <= 0
[docs] def check_if_driving_mission_finished(self): if self.motorspeed_rr is None or self.motorspeed_rl is None or self.wheelspeed_fr is None or self.wheelspeed_fl is None: rospy.logwarn('[Mission Machine] Cannot check if mission is finished since the rear or front wheelspeed message is missing.') return False return self.motorspeed_rr <= 0 and self.motorspeed_rl <= 0 and self.wheelspeed_fr <= 0 and self.wheelspeed_fl <= 0
[docs] def check_if_at_standstill_in_emergency(self) -> bool: if self.wheelspeed_fr is None or self.wheelspeed_fl is None: rospy.logwarn('[Mission Machine] Cannot check if at standstill in emergency since the front wheelspeed message is missing.') return False return self.wheelspeed_fr <= self.stop_threshold and self.wheelspeed_fl <= self.stop_threshold
[docs] def finish_mission(self): self.finished = True self.state = MissionMachineEnum.PRE_FINISHED
[docs] def publish_steering_control_engage_msg(self, engage_control: bool): steering_control_engage_msg = SteeringControlEngage() steering_control_engage_msg.header.stamp = rospy.Time.now() steering_control_engage_msg.engage = engage_control self.steering_control_engage_pub.publish(steering_control_engage_msg)
[docs] def publish_finished_msg(self): mission_finished_msg = MissionFinished() mission_finished_msg.header.stamp = rospy.Time.now() mission_finished_msg.finished = self.finished self.mission_finished_pub.publish(mission_finished_msg)
[docs] def publish_velocity_torque_switch(self, use_velocity: bool): msg = VelocityTorqueSwitch() msg.header.stamp = rospy.Time.now() msg.use_velocity = use_velocity self.velocity_torque_switch_pub.publish(msg)
[docs] def publish_velocity(self, velocity: float): msg = Velocity() msg.header.stamp = rospy.Time.now() msg.velocity = velocity self.velocity_pub.publish(msg)
[docs] def publish_torque(self, torque: float): msg = Torque() msg.header.stamp = rospy.Time.now() msg.torque = torque self.torque_pub.publish(msg)
[docs] def publish_mission_machine_state(self): msg = MissionMachineState() msg.header.stamp = rospy.Time.now() msg.state = self.state.value self.mission_machine_state_pub.publish(msg)
[docs] def spawn_basic_test(self): rospy.set_param('/mission', self.mission.value) rospy.loginfo(f'[MissionMachine] Spawning basic test via launchfile') uuid = roslaunch.rlutil.get_or_generate_uuid(None, False) roslaunch.configure_logging(uuid) rospack = rospkg.RosPack() cli_args = [f"{rospack.get_path('basic_tests')}/launch/basic_test.launch"] self.mission_process = roslaunch.parent.ROSLaunchParent(uuid, [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0])]) self.mission_process.start() self.as_running = True
[docs] def kill_basic_test(self): rospy.loginfo(f'[MissionMachine] Killing previously spawned basic test nodes!') self.mission_process.shutdown() self.as_running = False if DEBUG is True: self.rosbag_process.stop()
[docs] def spawn_as(self): self.publish_velocity_torque_switch(use_velocity=self.only_use_classic_control) #self.publish_velocity_torque_switch(use_velocity=False) if self.mission == AutonomousMissionEnum.BRAKETEST and self.basic_tests is True: self.spawn_basic_test() return rospy.set_param('/mission', self.mission.value) rospy.loginfo(f'[MissionMachine] Spawning AS nodes via launchfile for {self.mission._name_}') uuid = roslaunch.rlutil.get_or_generate_uuid(None, False) roslaunch.configure_logging(uuid) rospack = rospkg.RosPack() cli_args = [f"{rospack.get_path('as_missions')}/launch/dynamic_discipline.launch"] self.mission_process = roslaunch.parent.ROSLaunchParent(uuid, [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0])]) self.mission_process.start() self.as_running = True
[docs] def start_rosbag(self): rospy.loginfo('[Mission Machine] Start Rosbag.') rospack = rospkg.RosPack() node = roslaunch.core.Node('rosbag', 'play', args='/workspace/as_ros/rosbags/run_373_migrated.bag') launch = roslaunch.scriptapi.ROSLaunch() launch.start() self.rosbag_process = launch.launch(node)
[docs] def kill_as(self): if self.mission == AutonomousMissionEnum.BRAKETEST and self.basic_tests is True: self.kill_basic_test() return rospy.loginfo(f'[MissionMachine] Killing previously spawned AS nodes for {self.mission._name_}!') self.mission_process.shutdown() self.as_running = False if DEBUG is True: self.rosbag_process.stop()
[docs] def switch_state(self): if self.state == MissionMachineEnum.IDLE: if self.as_state == AutonomousSystemStateEnum.READY: if self.mission == AutonomousMissionEnum.INSPECTION: rospy.loginfo('[MissionMachine] IDLE -> PRE_INSPECTION') self.state = MissionMachineEnum.PRE_INSPECTION self.publish_velocity_torque_switch(use_velocity=True) elif self.mission in [AutonomousMissionEnum.BRAKETEST, AutonomousMissionEnum.ACCELERATION, AutonomousMissionEnum.SKIDPAD, AutonomousMissionEnum.TRACKDRIVE, AutonomousMissionEnum.AUTOCROSS]: rospy.loginfo('[MissionMachine] IDLE -> READY') self.spawn_as() self.state = MissionMachineEnum.READY self.publish_steering_control_engage_msg(engage_control=False) elif self.state == MissionMachineEnum.READY: if self.as_state == AutonomousSystemStateEnum.DRIVING: if DEBUG is True: self.start_rosbag() rospy.loginfo('[MissionMachine] READY -> DRIVING') self.publish_steering_control_engage_msg(engage_control=True) self.state = MissionMachineEnum.DRIVING if self.as_state == AutonomousSystemStateEnum.EMERGENCY: rospy.loginfo('[MissionMachine] READY -> EMERGENCY') self.state = MissionMachineEnum.EMERGENCY self.kill_as() elif self.state == MissionMachineEnum.DRIVING: if self.as_state == AutonomousSystemStateEnum.EMERGENCY: rospy.loginfo('[MissionMachine] DRIVING -> EMERGENCY') self.state = MissionMachineEnum.EMERGENCY if self.mission_completed is True: if self.check_if_driving_mission_finished(): rospy.loginfo('[MissionMachine] DRIVING -> PRE_FINISHED') self.finish_mission() self.kill_as() self.publish_steering_control_engage_msg(engage_control=False) self.state = MissionMachineEnum.PRE_FINISHED elif self.state == MissionMachineEnum.PRE_INSPECTION: if self.as_state == AutonomousSystemStateEnum.DRIVING: rospy.loginfo('[MissionMachine] PRE_INSPECTION -> INSPECTION') self.publish_steering_control_engage_msg(engage_control=True) self.state = MissionMachineEnum.INSPECTION self.start_inspection() elif self.as_state == AutonomousSystemStateEnum.EMERGENCY: rospy.loginfo('[MissionMachine] PRE_INSPECTION -> EMERGENCY') self.state = MissionMachineEnum.EMERGENCY elif self.state == MissionMachineEnum.INSPECTION: if self.as_state == AutonomousSystemStateEnum.EMERGENCY: rospy.loginfo('[MissionMachine] INSPECTION -> EMERGENCY') self.kill_inspection() self.publish_steering_control_engage_msg(engage_control=False) self.state = MissionMachineEnum.EMERGENCY if self.mission_completed is True: self.publish_steering_control_engage_msg(engage_control=False) if self.check_if_inspection_finished(): rospy.loginfo('[MissionMachine] INSPECTION -> PRE_FINISHED') self.finish_mission() elif self.state == MissionMachineEnum.EMERGENCY: if self.as_running is True: if self.check_if_at_standstill_in_emergency(): rospy.loginfo('[MissionMachine] EMERGENCY: Vehicle at standstill. Disabling steering.') self.kill_as() self.publish_steering_control_engage_msg(engage_control=False) self.publish_velocity_torque_switch(use_velocity=False) # use torque self.publish_torque(0) self.publish_velocity_torque_switch(use_velocity=True) # use velocity self.publish_velocity(0) if self.as_state == AutonomousSystemStateEnum.OFF: if self.as_running is True: self.kill_as() self.publish_steering_control_engage_msg(engage_control=False) rospy.loginfo('[MissionMachine] EMERGENCY -> IDLE') self.state = MissionMachineEnum.IDLE elif self.state == MissionMachineEnum.PRE_FINISHED: if self.as_state == AutonomousSystemStateEnum.FINISHED: rospy.loginfo('[MissionMachine] PRE_FINISHED -> FINISHED') self.state = MissionMachineEnum.FINISHED if self.as_state == AutonomousSystemStateEnum.EMERGENCY: rospy.loginfo('[MissionMachine] PRE_FINISHED -> EMERGENCY') self.state = MissionMachineEnum.EMERGENCY elif self.state == MissionMachineEnum.FINISHED: if self.as_state == AutonomousSystemStateEnum.OFF: self.mission_completed = False self.finished = False rospy.loginfo('[MissionMachine] FINISHED -> IDLE') self.state = MissionMachineEnum.IDLE
[docs] def run(self): rate = rospy.Rate(20) while not rospy.is_shutdown(): self.switch_state() self.publish_mission_machine_state() self.publish_finished_msg() rate.sleep()
if __name__ == '__main__': rospy.init_node('mission_machine', anonymous=False) try: rospy.loginfo('[Mission Machine] Mission Machine Node started.') mission_machine = MissionMachine() mission_machine.run() except Exception as e: if not isinstance(e, rospy.exceptions.ROSInterruptException): rospy.logfatal(traceback.format_exc()) time.sleep(2) raise e