#!/usr/bin/env python3
from matplotlib import use
import rospy
import roslaunch
import rospkg
from enum import Enum
import time
import traceback
from as_can_msgs.msg import AutonomousSystemState , AutonomousMission , MissionFinished , Wheelspeed , \
SteeringControlEngage , VelocityTorqueSwitch , MissionMachineState , Velocity , Torque
DEBUG = False
[docs] class AutonomousSystemStateEnum ( Enum ):
STARTUP = 0
OFF = 1
READY = 2
DRIVING = 3
EMERGENCY = 4
FINISHED = 5
[docs] class AutonomousMissionEnum ( Enum ):
MANUAL_DRIVING = 0
ACCELERATION = 1
SKIDPAD = 2
TRACKDRIVE = 3
BRAKETEST = 4
INSPECTION = 5
AUTOCROSS = 6
[docs] class MissionMachineEnum ( Enum ):
IDLE = 0
READY = 1
DRIVING = 2
PRE_FINISHED = 3
FINISHED = 4
EMERGENCY = 5
PRE_INSPECTION = 6
INSPECTION = 7
[docs] class MissionMachine ():
def __init__ ( self ):
self . state = MissionMachineEnum ( 0 )
self . as_running = False
self . stop_threshold = rospy . get_param ( '/mission_machine/stop_threshold' )
self . basic_tests = rospy . get_param ( '/mission_machine/basic_tests' )
self . only_use_classic_control = rospy . get_param ( '/control/use_old_control' )
self . as_state = AutonomousSystemStateEnum ( 0 )
rospy . Subscriber ( '/can/per/as_state' , AutonomousSystemState , self . as_state_callback )
self . mission = AutonomousMissionEnum ( 0 )
rospy . Subscriber ( '/can/per/autonomous_mission' , AutonomousMission , self . mission_callback )
self . mission_completed = False
rospy . Subscriber ( '/finish_line_detection/mission_completed' , MissionFinished , self . completed_callback )
self . motorspeed_rr = None
self . motorspeed_rl = None
rospy . Subscriber ( '/can/log/wheelspeed/rear' , Wheelspeed , self . motorspeed_callback )
self . wheelspeed_fr = None
self . wheelspeed_fl = None
rospy . Subscriber ( '/can/per/wheelspeed/front' , Wheelspeed , self . wheelspeed_callback )
self . finished = False
self . mission_finished_pub = rospy . Publisher ( '/mission_machine/finished' , MissionFinished , queue_size = 10 )
self . steering_control_engage_pub = rospy . Publisher ( '/mission_machine/steering_control_engage' , SteeringControlEngage , queue_size = 10 )
self . velocity_torque_switch_pub = rospy . Publisher ( '/mission_machine/use_velocity' , VelocityTorqueSwitch , queue_size = 10 )
self . velocity_pub = rospy . Publisher ( '/control/velocity' , Velocity , queue_size = 10 )
self . torque_pub = rospy . Publisher ( '/control/torque' , Torque , queue_size = 10 )
self . mission_machine_state_pub = rospy . Publisher ( '/mission_machine/state' , MissionMachineState , queue_size = 10 )
[docs] def as_state_callback ( self , msg ):
# rospy.loginfo(f'Switch to state {AutonomousSystemStateEnum(msg.as_state)}')
self . as_state = AutonomousSystemStateEnum ( msg . as_state )
[docs] def mission_callback ( self , msg ):
rospy . loginfo ( f 'Received mission { AutonomousMissionEnum ( msg . mission ) } ' )
self . mission = AutonomousMissionEnum ( msg . mission )
[docs] def completed_callback ( self , msg ):
self . mission_completed = msg . finished
[docs] def wheelspeed_callback ( self , msg ):
self . wheelspeed_fl = msg . left
self . wheelspeed_fr = msg . right
[docs] def motorspeed_callback ( self , msg ):
self . motorspeed_rl = msg . left
self . motorspeed_rr = msg . right
[docs] def start_inspection ( self ):
rospy . loginfo ( '[Mission Machine] Start Inspection Node.' )
uuid = roslaunch . rlutil . get_or_generate_uuid ( None , False )
roslaunch . configure_logging ( uuid )
rospack = rospkg . RosPack ()
cli_args = [ f " { rospack . get_path ( 'as_missions' ) } /launch/inspection.launch" ]
self . inspection_process = roslaunch . parent . ROSLaunchParent ( uuid , [( roslaunch . rlutil . resolve_launch_arguments ( cli_args )[ 0 ])])
self . inspection_process . start ()
[docs] def kill_inspection ( self ):
rospy . loginfo ( '[Mission Machine] Kill Inspection Node.' )
self . inspection_process . shutdown ()
[docs] def check_if_inspection_finished ( self ):
if self . motorspeed_rr is None or self . motorspeed_rl is None :
rospy . logwarn ( '[Mission Machine] Cannot check if inspection is finished since the rear wheelspeed message is missing.' )
return False
return self . motorspeed_rr <= 0 and self . motorspeed_rl <= 0
[docs] def check_if_driving_mission_finished ( self ):
if self . motorspeed_rr is None or self . motorspeed_rl is None or self . wheelspeed_fr is None or self . wheelspeed_fl is None :
rospy . logwarn ( '[Mission Machine] Cannot check if mission is finished since the rear or front wheelspeed message is missing.' )
return False
return self . motorspeed_rr <= 0 and self . motorspeed_rl <= 0 and self . wheelspeed_fr <= 0 and self . wheelspeed_fl <= 0
[docs] def check_if_at_standstill_in_emergency ( self ) -> bool :
if self . wheelspeed_fr is None or self . wheelspeed_fl is None :
rospy . logwarn ( '[Mission Machine] Cannot check if at standstill in emergency since the front wheelspeed message is missing.' )
return False
return self . wheelspeed_fr <= self . stop_threshold and self . wheelspeed_fl <= self . stop_threshold
[docs] def finish_mission ( self ):
self . finished = True
self . state = MissionMachineEnum . PRE_FINISHED
[docs] def publish_steering_control_engage_msg ( self , engage_control : bool ):
steering_control_engage_msg = SteeringControlEngage ()
steering_control_engage_msg . header . stamp = rospy . Time . now ()
steering_control_engage_msg . engage = engage_control
self . steering_control_engage_pub . publish ( steering_control_engage_msg )
[docs] def publish_finished_msg ( self ):
mission_finished_msg = MissionFinished ()
mission_finished_msg . header . stamp = rospy . Time . now ()
mission_finished_msg . finished = self . finished
self . mission_finished_pub . publish ( mission_finished_msg )
[docs] def publish_velocity_torque_switch ( self , use_velocity : bool ):
msg = VelocityTorqueSwitch ()
msg . header . stamp = rospy . Time . now ()
msg . use_velocity = use_velocity
self . velocity_torque_switch_pub . publish ( msg )
[docs] def publish_velocity ( self , velocity : float ):
msg = Velocity ()
msg . header . stamp = rospy . Time . now ()
msg . velocity = velocity
self . velocity_pub . publish ( msg )
[docs] def publish_torque ( self , torque : float ):
msg = Torque ()
msg . header . stamp = rospy . Time . now ()
msg . torque = torque
self . torque_pub . publish ( msg )
[docs] def publish_mission_machine_state ( self ):
msg = MissionMachineState ()
msg . header . stamp = rospy . Time . now ()
msg . state = self . state . value
self . mission_machine_state_pub . publish ( msg )
[docs] def spawn_basic_test ( self ):
rospy . set_param ( '/mission' , self . mission . value )
rospy . loginfo ( f '[MissionMachine] Spawning basic test via launchfile' )
uuid = roslaunch . rlutil . get_or_generate_uuid ( None , False )
roslaunch . configure_logging ( uuid )
rospack = rospkg . RosPack ()
cli_args = [ f " { rospack . get_path ( 'basic_tests' ) } /launch/basic_test.launch" ]
self . mission_process = roslaunch . parent . ROSLaunchParent ( uuid , [( roslaunch . rlutil . resolve_launch_arguments ( cli_args )[ 0 ])])
self . mission_process . start ()
self . as_running = True
[docs] def kill_basic_test ( self ):
rospy . loginfo ( f '[MissionMachine] Killing previously spawned basic test nodes!' )
self . mission_process . shutdown ()
self . as_running = False
if DEBUG is True :
self . rosbag_process . stop ()
[docs] def spawn_as ( self ):
self . publish_velocity_torque_switch ( use_velocity = self . only_use_classic_control )
#self.publish_velocity_torque_switch(use_velocity=False)
if self . mission == AutonomousMissionEnum . BRAKETEST and self . basic_tests is True :
self . spawn_basic_test ()
return
rospy . set_param ( '/mission' , self . mission . value )
rospy . loginfo ( f '[MissionMachine] Spawning AS nodes via launchfile for { self . mission . _name_ } ' )
uuid = roslaunch . rlutil . get_or_generate_uuid ( None , False )
roslaunch . configure_logging ( uuid )
rospack = rospkg . RosPack ()
cli_args = [ f " { rospack . get_path ( 'as_missions' ) } /launch/dynamic_discipline.launch" ]
self . mission_process = roslaunch . parent . ROSLaunchParent ( uuid , [( roslaunch . rlutil . resolve_launch_arguments ( cli_args )[ 0 ])])
self . mission_process . start ()
self . as_running = True
[docs] def start_rosbag ( self ):
rospy . loginfo ( '[Mission Machine] Start Rosbag.' )
rospack = rospkg . RosPack ()
node = roslaunch . core . Node ( 'rosbag' , 'play' , args = '/workspace/as_ros/rosbags/run_373_migrated.bag' )
launch = roslaunch . scriptapi . ROSLaunch ()
launch . start ()
self . rosbag_process = launch . launch ( node )
[docs] def kill_as ( self ):
if self . mission == AutonomousMissionEnum . BRAKETEST and self . basic_tests is True :
self . kill_basic_test ()
return
rospy . loginfo ( f '[MissionMachine] Killing previously spawned AS nodes for { self . mission . _name_ } !' )
self . mission_process . shutdown ()
self . as_running = False
if DEBUG is True :
self . rosbag_process . stop ()
[docs] def switch_state ( self ):
if self . state == MissionMachineEnum . IDLE :
if self . as_state == AutonomousSystemStateEnum . READY :
if self . mission == AutonomousMissionEnum . INSPECTION :
rospy . loginfo ( '[MissionMachine] IDLE -> PRE_INSPECTION' )
self . state = MissionMachineEnum . PRE_INSPECTION
self . publish_velocity_torque_switch ( use_velocity = True )
elif self . mission in [ AutonomousMissionEnum . BRAKETEST , AutonomousMissionEnum . ACCELERATION , AutonomousMissionEnum . SKIDPAD , AutonomousMissionEnum . TRACKDRIVE , AutonomousMissionEnum . AUTOCROSS ]:
rospy . loginfo ( '[MissionMachine] IDLE -> READY' )
self . spawn_as ()
self . state = MissionMachineEnum . READY
self . publish_steering_control_engage_msg ( engage_control = False )
elif self . state == MissionMachineEnum . READY :
if self . as_state == AutonomousSystemStateEnum . DRIVING :
if DEBUG is True :
self . start_rosbag ()
rospy . loginfo ( '[MissionMachine] READY -> DRIVING' )
self . publish_steering_control_engage_msg ( engage_control = True )
self . state = MissionMachineEnum . DRIVING
if self . as_state == AutonomousSystemStateEnum . EMERGENCY :
rospy . loginfo ( '[MissionMachine] READY -> EMERGENCY' )
self . state = MissionMachineEnum . EMERGENCY
self . kill_as ()
elif self . state == MissionMachineEnum . DRIVING :
if self . as_state == AutonomousSystemStateEnum . EMERGENCY :
rospy . loginfo ( '[MissionMachine] DRIVING -> EMERGENCY' )
self . state = MissionMachineEnum . EMERGENCY
if self . mission_completed is True :
if self . check_if_driving_mission_finished ():
rospy . loginfo ( '[MissionMachine] DRIVING -> PRE_FINISHED' )
self . finish_mission ()
self . kill_as ()
self . publish_steering_control_engage_msg ( engage_control = False )
self . state = MissionMachineEnum . PRE_FINISHED
elif self . state == MissionMachineEnum . PRE_INSPECTION :
if self . as_state == AutonomousSystemStateEnum . DRIVING :
rospy . loginfo ( '[MissionMachine] PRE_INSPECTION -> INSPECTION' )
self . publish_steering_control_engage_msg ( engage_control = True )
self . state = MissionMachineEnum . INSPECTION
self . start_inspection ()
elif self . as_state == AutonomousSystemStateEnum . EMERGENCY :
rospy . loginfo ( '[MissionMachine] PRE_INSPECTION -> EMERGENCY' )
self . state = MissionMachineEnum . EMERGENCY
elif self . state == MissionMachineEnum . INSPECTION :
if self . as_state == AutonomousSystemStateEnum . EMERGENCY :
rospy . loginfo ( '[MissionMachine] INSPECTION -> EMERGENCY' )
self . kill_inspection ()
self . publish_steering_control_engage_msg ( engage_control = False )
self . state = MissionMachineEnum . EMERGENCY
if self . mission_completed is True :
self . publish_steering_control_engage_msg ( engage_control = False )
if self . check_if_inspection_finished ():
rospy . loginfo ( '[MissionMachine] INSPECTION -> PRE_FINISHED' )
self . finish_mission ()
elif self . state == MissionMachineEnum . EMERGENCY :
if self . as_running is True :
if self . check_if_at_standstill_in_emergency ():
rospy . loginfo ( '[MissionMachine] EMERGENCY: Vehicle at standstill. Disabling steering.' )
self . kill_as ()
self . publish_steering_control_engage_msg ( engage_control = False )
self . publish_velocity_torque_switch ( use_velocity = False ) # use torque
self . publish_torque ( 0 )
self . publish_velocity_torque_switch ( use_velocity = True ) # use velocity
self . publish_velocity ( 0 )
if self . as_state == AutonomousSystemStateEnum . OFF :
if self . as_running is True :
self . kill_as ()
self . publish_steering_control_engage_msg ( engage_control = False )
rospy . loginfo ( '[MissionMachine] EMERGENCY -> IDLE' )
self . state = MissionMachineEnum . IDLE
elif self . state == MissionMachineEnum . PRE_FINISHED :
if self . as_state == AutonomousSystemStateEnum . FINISHED :
rospy . loginfo ( '[MissionMachine] PRE_FINISHED -> FINISHED' )
self . state = MissionMachineEnum . FINISHED
if self . as_state == AutonomousSystemStateEnum . EMERGENCY :
rospy . loginfo ( '[MissionMachine] PRE_FINISHED -> EMERGENCY' )
self . state = MissionMachineEnum . EMERGENCY
elif self . state == MissionMachineEnum . FINISHED :
if self . as_state == AutonomousSystemStateEnum . OFF :
self . mission_completed = False
self . finished = False
rospy . loginfo ( '[MissionMachine] FINISHED -> IDLE' )
self . state = MissionMachineEnum . IDLE
[docs] def run ( self ):
rate = rospy . Rate ( 20 )
while not rospy . is_shutdown ():
self . switch_state ()
self . publish_mission_machine_state ()
self . publish_finished_msg ()
rate . sleep ()
if __name__ == '__main__' :
rospy . init_node ( 'mission_machine' , anonymous = False )
try :
rospy . loginfo ( '[Mission Machine] Mission Machine Node started.' )
mission_machine = MissionMachine ()
mission_machine . run ()
except Exception as e :
if not isinstance ( e , rospy . exceptions . ROSInterruptException ):
rospy . logfatal ( traceback . format_exc ())
time . sleep ( 2 )
raise e
Copy to clipboard