#!/usr/bin/env python3
from matplotlib import use
import rospy
import roslaunch
import rospkg
from enum import Enum
import time
import traceback
from as_can_msgs.msg import AutonomousSystemState, AutonomousMission, MissionFinished, Wheelspeed, \
SteeringControlEngage, VelocityTorqueSwitch, MissionMachineState, Velocity, Torque
DEBUG = False
[docs]class AutonomousSystemStateEnum(Enum):
STARTUP = 0
OFF = 1
READY = 2
DRIVING = 3
EMERGENCY = 4
FINISHED = 5
[docs]class AutonomousMissionEnum(Enum):
MANUAL_DRIVING = 0
ACCELERATION = 1
SKIDPAD = 2
TRACKDRIVE = 3
BRAKETEST = 4
INSPECTION = 5
AUTOCROSS = 6
[docs]class MissionMachineEnum(Enum):
IDLE = 0
READY = 1
DRIVING = 2
PRE_FINISHED = 3
FINISHED = 4
EMERGENCY = 5
PRE_INSPECTION = 6
INSPECTION = 7
[docs]class MissionMachine():
def __init__(self):
self.state = MissionMachineEnum(0)
self.as_running = False
self.stop_threshold = rospy.get_param('/mission_machine/stop_threshold')
self.basic_tests = rospy.get_param('/mission_machine/basic_tests')
self.only_use_classic_control = rospy.get_param('/control/use_old_control')
self.as_state = AutonomousSystemStateEnum(0)
rospy.Subscriber('/can/per/as_state', AutonomousSystemState, self.as_state_callback)
self.mission = AutonomousMissionEnum(0)
rospy.Subscriber('/can/per/autonomous_mission', AutonomousMission, self.mission_callback)
self.mission_completed = False
rospy.Subscriber('/finish_line_detection/mission_completed', MissionFinished, self.completed_callback)
self.motorspeed_rr = None
self.motorspeed_rl = None
rospy.Subscriber('/can/log/wheelspeed/rear', Wheelspeed, self.motorspeed_callback)
self.wheelspeed_fr = None
self.wheelspeed_fl = None
rospy.Subscriber('/can/per/wheelspeed/front', Wheelspeed, self.wheelspeed_callback)
self.finished = False
self.mission_finished_pub = rospy.Publisher('/mission_machine/finished', MissionFinished, queue_size=10)
self.steering_control_engage_pub = rospy.Publisher('/mission_machine/steering_control_engage', SteeringControlEngage, queue_size=10)
self.velocity_torque_switch_pub = rospy.Publisher('/mission_machine/use_velocity', VelocityTorqueSwitch, queue_size=10)
self.velocity_pub = rospy.Publisher('/control/velocity', Velocity, queue_size=10)
self.torque_pub = rospy.Publisher('/control/torque', Torque, queue_size=10)
self.mission_machine_state_pub = rospy.Publisher('/mission_machine/state', MissionMachineState, queue_size=10)
[docs] def as_state_callback(self, msg):
# rospy.loginfo(f'Switch to state {AutonomousSystemStateEnum(msg.as_state)}')
self.as_state = AutonomousSystemStateEnum(msg.as_state)
[docs] def mission_callback(self, msg):
rospy.loginfo(f'Received mission {AutonomousMissionEnum(msg.mission)}')
self.mission = AutonomousMissionEnum(msg.mission)
[docs] def completed_callback(self, msg):
self.mission_completed = msg.finished
[docs] def wheelspeed_callback(self, msg):
self.wheelspeed_fl = msg.left
self.wheelspeed_fr = msg.right
[docs] def motorspeed_callback(self, msg):
self.motorspeed_rl = msg.left
self.motorspeed_rr = msg.right
[docs] def start_inspection(self):
rospy.loginfo('[Mission Machine] Start Inspection Node.')
uuid = roslaunch.rlutil.get_or_generate_uuid(None, False)
roslaunch.configure_logging(uuid)
rospack = rospkg.RosPack()
cli_args = [f"{rospack.get_path('as_missions')}/launch/inspection.launch"]
self.inspection_process = roslaunch.parent.ROSLaunchParent(uuid, [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0])])
self.inspection_process.start()
[docs] def kill_inspection(self):
rospy.loginfo('[Mission Machine] Kill Inspection Node.')
self.inspection_process.shutdown()
[docs] def check_if_inspection_finished(self):
if self.motorspeed_rr is None or self.motorspeed_rl is None:
rospy.logwarn('[Mission Machine] Cannot check if inspection is finished since the rear wheelspeed message is missing.')
return False
return self.motorspeed_rr <= 0 and self.motorspeed_rl <= 0
[docs] def check_if_driving_mission_finished(self):
if self.motorspeed_rr is None or self.motorspeed_rl is None or self.wheelspeed_fr is None or self.wheelspeed_fl is None:
rospy.logwarn('[Mission Machine] Cannot check if mission is finished since the rear or front wheelspeed message is missing.')
return False
return self.motorspeed_rr <= 0 and self.motorspeed_rl <= 0 and self.wheelspeed_fr <= 0 and self.wheelspeed_fl <= 0
[docs] def check_if_at_standstill_in_emergency(self) -> bool:
if self.wheelspeed_fr is None or self.wheelspeed_fl is None:
rospy.logwarn('[Mission Machine] Cannot check if at standstill in emergency since the front wheelspeed message is missing.')
return False
return self.wheelspeed_fr <= self.stop_threshold and self.wheelspeed_fl <= self.stop_threshold
[docs] def finish_mission(self):
self.finished = True
self.state = MissionMachineEnum.PRE_FINISHED
[docs] def publish_steering_control_engage_msg(self, engage_control: bool):
steering_control_engage_msg = SteeringControlEngage()
steering_control_engage_msg.header.stamp = rospy.Time.now()
steering_control_engage_msg.engage = engage_control
self.steering_control_engage_pub.publish(steering_control_engage_msg)
[docs] def publish_finished_msg(self):
mission_finished_msg = MissionFinished()
mission_finished_msg.header.stamp = rospy.Time.now()
mission_finished_msg.finished = self.finished
self.mission_finished_pub.publish(mission_finished_msg)
[docs] def publish_velocity_torque_switch(self, use_velocity: bool):
msg = VelocityTorqueSwitch()
msg.header.stamp = rospy.Time.now()
msg.use_velocity = use_velocity
self.velocity_torque_switch_pub.publish(msg)
[docs] def publish_velocity(self, velocity: float):
msg = Velocity()
msg.header.stamp = rospy.Time.now()
msg.velocity = velocity
self.velocity_pub.publish(msg)
[docs] def publish_torque(self, torque: float):
msg = Torque()
msg.header.stamp = rospy.Time.now()
msg.torque = torque
self.torque_pub.publish(msg)
[docs] def publish_mission_machine_state(self):
msg = MissionMachineState()
msg.header.stamp = rospy.Time.now()
msg.state = self.state.value
self.mission_machine_state_pub.publish(msg)
[docs] def spawn_basic_test(self):
rospy.set_param('/mission', self.mission.value)
rospy.loginfo(f'[MissionMachine] Spawning basic test via launchfile')
uuid = roslaunch.rlutil.get_or_generate_uuid(None, False)
roslaunch.configure_logging(uuid)
rospack = rospkg.RosPack()
cli_args = [f"{rospack.get_path('basic_tests')}/launch/basic_test.launch"]
self.mission_process = roslaunch.parent.ROSLaunchParent(uuid, [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0])])
self.mission_process.start()
self.as_running = True
[docs] def kill_basic_test(self):
rospy.loginfo(f'[MissionMachine] Killing previously spawned basic test nodes!')
self.mission_process.shutdown()
self.as_running = False
if DEBUG is True:
self.rosbag_process.stop()
[docs] def spawn_as(self):
self.publish_velocity_torque_switch(use_velocity=self.only_use_classic_control)
#self.publish_velocity_torque_switch(use_velocity=False)
if self.mission == AutonomousMissionEnum.BRAKETEST and self.basic_tests is True:
self.spawn_basic_test()
return
rospy.set_param('/mission', self.mission.value)
rospy.loginfo(f'[MissionMachine] Spawning AS nodes via launchfile for {self.mission._name_}')
uuid = roslaunch.rlutil.get_or_generate_uuid(None, False)
roslaunch.configure_logging(uuid)
rospack = rospkg.RosPack()
cli_args = [f"{rospack.get_path('as_missions')}/launch/dynamic_discipline.launch"]
self.mission_process = roslaunch.parent.ROSLaunchParent(uuid, [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0])])
self.mission_process.start()
self.as_running = True
[docs] def start_rosbag(self):
rospy.loginfo('[Mission Machine] Start Rosbag.')
rospack = rospkg.RosPack()
node = roslaunch.core.Node('rosbag', 'play', args='/workspace/as_ros/rosbags/run_373_migrated.bag')
launch = roslaunch.scriptapi.ROSLaunch()
launch.start()
self.rosbag_process = launch.launch(node)
[docs] def kill_as(self):
if self.mission == AutonomousMissionEnum.BRAKETEST and self.basic_tests is True:
self.kill_basic_test()
return
rospy.loginfo(f'[MissionMachine] Killing previously spawned AS nodes for {self.mission._name_}!')
self.mission_process.shutdown()
self.as_running = False
if DEBUG is True:
self.rosbag_process.stop()
[docs] def switch_state(self):
if self.state == MissionMachineEnum.IDLE:
if self.as_state == AutonomousSystemStateEnum.READY:
if self.mission == AutonomousMissionEnum.INSPECTION:
rospy.loginfo('[MissionMachine] IDLE -> PRE_INSPECTION')
self.state = MissionMachineEnum.PRE_INSPECTION
self.publish_velocity_torque_switch(use_velocity=True)
elif self.mission in [AutonomousMissionEnum.BRAKETEST, AutonomousMissionEnum.ACCELERATION, AutonomousMissionEnum.SKIDPAD, AutonomousMissionEnum.TRACKDRIVE, AutonomousMissionEnum.AUTOCROSS]:
rospy.loginfo('[MissionMachine] IDLE -> READY')
self.spawn_as()
self.state = MissionMachineEnum.READY
self.publish_steering_control_engage_msg(engage_control=False)
elif self.state == MissionMachineEnum.READY:
if self.as_state == AutonomousSystemStateEnum.DRIVING:
if DEBUG is True:
self.start_rosbag()
rospy.loginfo('[MissionMachine] READY -> DRIVING')
self.publish_steering_control_engage_msg(engage_control=True)
self.state = MissionMachineEnum.DRIVING
if self.as_state == AutonomousSystemStateEnum.EMERGENCY:
rospy.loginfo('[MissionMachine] READY -> EMERGENCY')
self.state = MissionMachineEnum.EMERGENCY
self.kill_as()
elif self.state == MissionMachineEnum.DRIVING:
if self.as_state == AutonomousSystemStateEnum.EMERGENCY:
rospy.loginfo('[MissionMachine] DRIVING -> EMERGENCY')
self.state = MissionMachineEnum.EMERGENCY
if self.mission_completed is True:
if self.check_if_driving_mission_finished():
rospy.loginfo('[MissionMachine] DRIVING -> PRE_FINISHED')
self.finish_mission()
self.kill_as()
self.publish_steering_control_engage_msg(engage_control=False)
self.state = MissionMachineEnum.PRE_FINISHED
elif self.state == MissionMachineEnum.PRE_INSPECTION:
if self.as_state == AutonomousSystemStateEnum.DRIVING:
rospy.loginfo('[MissionMachine] PRE_INSPECTION -> INSPECTION')
self.publish_steering_control_engage_msg(engage_control=True)
self.state = MissionMachineEnum.INSPECTION
self.start_inspection()
elif self.as_state == AutonomousSystemStateEnum.EMERGENCY:
rospy.loginfo('[MissionMachine] PRE_INSPECTION -> EMERGENCY')
self.state = MissionMachineEnum.EMERGENCY
elif self.state == MissionMachineEnum.INSPECTION:
if self.as_state == AutonomousSystemStateEnum.EMERGENCY:
rospy.loginfo('[MissionMachine] INSPECTION -> EMERGENCY')
self.kill_inspection()
self.publish_steering_control_engage_msg(engage_control=False)
self.state = MissionMachineEnum.EMERGENCY
if self.mission_completed is True:
self.publish_steering_control_engage_msg(engage_control=False)
if self.check_if_inspection_finished():
rospy.loginfo('[MissionMachine] INSPECTION -> PRE_FINISHED')
self.finish_mission()
elif self.state == MissionMachineEnum.EMERGENCY:
if self.as_running is True:
if self.check_if_at_standstill_in_emergency():
rospy.loginfo('[MissionMachine] EMERGENCY: Vehicle at standstill. Disabling steering.')
self.kill_as()
self.publish_steering_control_engage_msg(engage_control=False)
self.publish_velocity_torque_switch(use_velocity=False) # use torque
self.publish_torque(0)
self.publish_velocity_torque_switch(use_velocity=True) # use velocity
self.publish_velocity(0)
if self.as_state == AutonomousSystemStateEnum.OFF:
if self.as_running is True:
self.kill_as()
self.publish_steering_control_engage_msg(engage_control=False)
rospy.loginfo('[MissionMachine] EMERGENCY -> IDLE')
self.state = MissionMachineEnum.IDLE
elif self.state == MissionMachineEnum.PRE_FINISHED:
if self.as_state == AutonomousSystemStateEnum.FINISHED:
rospy.loginfo('[MissionMachine] PRE_FINISHED -> FINISHED')
self.state = MissionMachineEnum.FINISHED
if self.as_state == AutonomousSystemStateEnum.EMERGENCY:
rospy.loginfo('[MissionMachine] PRE_FINISHED -> EMERGENCY')
self.state = MissionMachineEnum.EMERGENCY
elif self.state == MissionMachineEnum.FINISHED:
if self.as_state == AutonomousSystemStateEnum.OFF:
self.mission_completed = False
self.finished = False
rospy.loginfo('[MissionMachine] FINISHED -> IDLE')
self.state = MissionMachineEnum.IDLE
[docs] def run(self):
rate = rospy.Rate(20)
while not rospy.is_shutdown():
self.switch_state()
self.publish_mission_machine_state()
self.publish_finished_msg()
rate.sleep()
if __name__ == '__main__':
rospy.init_node('mission_machine', anonymous=False)
try:
rospy.loginfo('[Mission Machine] Mission Machine Node started.')
mission_machine = MissionMachine()
mission_machine.run()
except Exception as e:
if not isinstance(e, rospy.exceptions.ROSInterruptException):
rospy.logfatal(traceback.format_exc())
time.sleep(2)
raise e