#!/usr/bin/env python3
import threading
import numpy as np
import can
import cantools
from enum import Enum
import rospy
import rospkg
import roslaunch
import rosnode
import time
import traceback
from can_msgs.msg import Frame
from as_can_msgs.msg import LapCount, ConesCountAll, ConesCountActual, ServiceBrakeState, SteeringControlEngage, \
AutonomousSystemState, AutonomousMission, Velocity, MissionFinished, FuseStatus, \
ShutdownUnitState, BrakePressure, PneumaticPressure, VelocityTorqueSwitch, Torque, \
ImuGyro, ImuAcceleration, SteeringWheelAngle, ResStatus, Wheelspeed, \
CheckupSequenceState, MonitoringStatus, NodeState, MissionMachineState
from utilities.msg import VehiclePose
[docs]class AutonomousSystemStateEnum(Enum):
STARTUP = 0
OFF = 1
READY = 2
DRIVING = 3
EMERGENCY = 4
FINISHED = 5
[docs]class ServiceBrakeStateEnum(Enum):
DISENGAGED = 1
ENGAGED = 2
AVAILABLE = 3
[docs]class CanBridge():
def __init__(self, path_to_log_dbc: str, path_to_per_dbc: str) -> None:
self.per: cantools.db.Database = cantools.database.load_file(path_to_per_dbc)
self.log: cantools.db.Database = cantools.database.load_file(path_to_log_dbc)
self.log_bus = can.interface.Bus(rospy.get_param('/can/log/log/can_device'), bustype='socketcan')
self.per_bus = can.interface.Bus(rospy.get_param('/can/per/per/can_device'), bustype='socketcan')
self.setup_variables()
self.setup_messages()
self.setup_log_rx()
self.setup_per_rx()
self.setup_per_tx()
self.setup_log_tx()
self.main_thread()
[docs] def setup_messages(self):
# per tx
self.steering_control_msg: cantools.db.Message = self.per.get_message_by_name('ACUsCTRL_ActcControl')
self.service_brake_msg: cantools.db.Message = self.per.get_message_by_name('ACUsCTRL_ServiceBrake_PRESSxREQ')
self.monitoring_status_msg: cantools.db.Message = self.per.get_message_by_name('ACUsMON_STATE')
self.dsb_msg: cantools.db.Message = self.per.get_message_by_name('ACUsMON_RosNodes_STATE')
self.ros_recording_msg: cantools.db.Message = self.per.get_message_by_name('ACUsCAN_RosbagRecording_STATUS')
# log tx
self.dv_driving_diagnostics_1_msg: cantools.db.Message = self.log.get_message_by_name('ACU_DvDrivingDiagnostics1')
self.dv_driving_diagnostics_2_msg: cantools.db.Message = self.log.get_message_by_name('ACU_DvDrivingDiagnostics2')
self.dv_system_diagnostics_msg: cantools.db.Message = self.log.get_message_by_name('ACU_DvSystemDiagnostics')
self.engine_control_msg: cantools.db.Message = self.log.get_message_by_name('ACUsCTRL_EngineControl')
self.mission_finished_msg: cantools.db.Message = self.log.get_message_by_name('ACUsMM_MissionFinished_STATUS')
[docs] def setup_variables(self):
# per rx
self.steering_wheel_angle_actual = 0
self.pneumatic_pressure_override_front = 0
self.pneumatic_pressure_override_rear = 0
self.hydraulic_pressure_front = 0
self.hydraulic_pressure_rear = 0
self.autonomous_mission = 1 # first possible state: Acceleration
self.imu_accel_x = 0
self.imu_accel_y = 0
self.gyro_z = 0
self.sdu_state = 1 # open
self.sdu_as_status = 0 # idle
self.asms_status = 0 # blown
self.as_state = 1 # AS off
self.ebs_state = 1 # unavailable
self.last_log_button_state = 0 # not pressed
self.log_button_state = 0 # not pressed
# per tx
self.camera_state = 0
self.lidar_state = 0
self.gps_state = 0
self.filtering_state = 0
self.slam_state = 0
self.control_state = 0
self.planning_state = 0
self.monitoring_state = 0
if('/monitoring' in rosnode.get_node_names()):
self.monitoring_state = 1
self.local_mapping_state = 0
self.lidar_perception_state = 0
self.perception_state = 0
self.mission_machine_state = 0
self.imu_state = 0
self.wheelspeed_front_state = 0
self.wheelspeed_rear_state = 0
self.steering_control_engage = False
self.steering_wheel_angle_request = 0
# log tx
self.torque_request = 0
self.wheel_radius = rospy.get_param('/vehicle/wheel_radius')
self.gear_ratio = rospy.get_param('/vehicle/rear_gear_ratio')
self.velocity_request = 0
self.slam_velocity = 0
self.torque_actual = 0
self.lap_count = 0
self.cones_all_count = 0
self.cones_actual_count = 0
self.use_velocity = True
[docs] def setup_per_rx(self):
self.per_receiving_frame_names= ['ACTC_Measurements', 'ASBCsSENS_PneumaticFrontxACT', 'ASBCsSENS_PneumaticRearxACT', 'ASBCsCUS_CheckupSequence_STATE',
'BPCF_PRESS', 'BPCR_PRESS', 'DSB_AutonomousMission_STATExSET', 'IMU_ACCEL', 'IMU_ANGLERATE',
'SDU', 'SUP_Fuse_STATUS', 'VCU_Statemachine_STATE', 'WB_WheelF_SPEED', 'DSB_Buttons_STATE']
self.steering_wheel_angle_pub = rospy.Publisher('/can/per/steering_wheel_angle/actual', SteeringWheelAngle, queue_size=10)
self.pneumatic_pressure_front_pub = rospy.Publisher('/can/per/pneumatic_pressure/front', PneumaticPressure, queue_size=10)
self.pneumatic_pressure_rear_pub = rospy.Publisher('/can/per/pneumatic_pressure/rear', PneumaticPressure, queue_size=10)
self.checkup_sequence_pub = rospy.Publisher('/can/per/checkup_sequence_state', CheckupSequenceState, queue_size=10)
self.hydraulic_pressure_front_pub = rospy.Publisher('/can/per/hydraulic_pressure/front', BrakePressure, queue_size=10)
self.hydraulic_pressure_rear_pub = rospy.Publisher('/can/per/hydraulic_pressure/rear', BrakePressure, queue_size=10)
self.autonomous_mission_pub = rospy.Publisher('/can/per/autonomous_mission', AutonomousMission, queue_size=10)
self.imu_accel_pub = rospy.Publisher('/can/per/imu/accel', ImuAcceleration, queue_size=10)
self.imu_gyro_pub = rospy.Publisher('/can/per/imu/gyro', ImuGyro, queue_size=10)
self.sdu_pub = rospy.Publisher('/can/per/sdu_state', ShutdownUnitState, queue_size=10)
self.asms_pub = rospy.Publisher('/can/per/asms_status', FuseStatus, queue_size=10)
self.autonomous_driving_pub = rospy.Publisher('/can/per/as_state', AutonomousSystemState, queue_size=10)
self.wheelspeed_pub = rospy.Publisher('/can/per/wheelspeed/front', Wheelspeed, queue_size=10)
rospy.Subscriber('/can/per/received_messages', Frame, self.per_frame_received_callback)
[docs] def setup_log_rx(self):
self.log_receiving_frame_names = ['RES_Status', 'VCUsINVx_Diagnostics']
self.res_pub = rospy.Publisher('/can/log/res_status', ResStatus, queue_size=10)
self.wheelspeed_rear_pub = rospy.Publisher('/can/log/wheelspeed/rear', Wheelspeed, queue_size=10)
self.torque_pub = rospy.Publisher('/can/log/torque/actual', Torque, queue_size=10)
rospy.Subscriber('/can/log/received_messages', Frame, self.log_frame_received_callback)
[docs] def setup_per_tx(self):
rospy.Subscriber('/control/steering_wheel_angle/request', SteeringWheelAngle, self.steering_wheel_angle_callback)
rospy.Subscriber('/mission_machine/steering_control_engage', SteeringControlEngage, self.steering_control_engange_callback)
self.server_brake_pneumatic_pressure_request = 0
rospy.Subscriber('/control/service_brake/request', BrakePressure, self.service_brake_callback)
rospy.Subscriber('/monitoring/status', MonitoringStatus, self.monitoring_status_callback)
# dashboard node status callbacks
self.monitoring_received_new = rospy.Time(secs=0)
self.monitoring_received_old = rospy.Time(secs=0)
rospy.Subscriber('/monitoring/node_states', NodeState, self.node_states_callback)
#rospy.Subscriber('/monitoring/sensor_states', SensorStates, self.sensor_states_callback)
rospy.Subscriber('/mission_machine/state', MissionMachineState, self.mission_machine_state_callback)
self.dsb_rate = rospy.Rate(10) # 100 ms cyclic time
self.dsb_publisher = threading.Thread(target=self.send_dashboard_msg, daemon=True)
self.dsb_publisher.start()
self.rosbag_recording_process = None
self.rosbag_recording_event = threading.Event()
self.ros_recording_rate = rospy.Rate(10) # 100 ms cyclic time
self.ros_recording_thread = threading.Thread(target=self.send_rosbag_record_status, daemon=True)
self.ros_recording_thread.start()
[docs] def setup_log_tx(self):
# torque
rospy.Subscriber('/control/torque/request', Torque, self.torque_callback)
# velocity
rospy.Subscriber('/control/velocity/request', Velocity, self.velocity_callback)
# vehicle velocity from slam
rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.slam_velocity_callback)
# actual torque
rospy.Subscriber('/can/log/torque/actual', Torque, self.actual_torque_callback)
# dv driving diagnostics 1
self.dv_driving_diagnostics_1_rate = rospy.Rate(10) # 100 ms cyclic time
self.dv_driving_diagnostics_1_publisher = threading.Thread(target=self.send_dv_driving_diagnostics_1, daemon=True)
self.dv_driving_diagnostics_1_publisher.start()
# dv driving diagnostics 2
self.dv_driving_diagnostics_2_rate = rospy.Rate(10) # 100 ms cyclic time
self.dv_driving_diagnostics_2_publisher = threading.Thread(target=self.send_dv_driving_diagnostics_2, daemon=True)
self.dv_driving_diagnostics_2_publisher.start()
# lap count
rospy.Subscriber('/finish_line_detection/lap_count', LapCount, self.lap_count_callback)
# cone counts
rospy.Subscriber('/slam/cone_count', ConesCountAll, self.slam_cones_count_callback)
rospy.Subscriber('/local_mapping/cone_count', ConesCountActual, self.local_mapping_cones_count_callback)
# dv system diagnostics
self.dv_system_diagnostics_rate = rospy.Rate(10) # 100 ms cyclic time
self.dv_system_diagnostics_publisher = threading.Thread(target=self.send_dv_system_diagnostics, daemon=True)
self.dv_system_diagnostics_publisher.start()
# engine control msg
# switch between torque and velocity from mission machine
rospy.Subscriber('/mission_machine/use_velocity', VelocityTorqueSwitch, self.velocity_torque_switch_callback)
rospy.Subscriber('/mission_machine/finished', MissionFinished, self.mission_finished_callback)
[docs] def generate_message(self, data, frame_definition):
"""
Used for sending it on the can device directly.
"""
return can.Message(arbitration_id=frame_definition.frame_id, data=data,
is_extended_id=frame_definition.is_extended_frame)
[docs] def generate_frame(self, data, frame_definition):
"""
Used for sending it on the can ros bridge.
"""
frame = Frame()
frame.header.stamp = rospy.Time.now()
frame.data = data
frame.dlc = frame_definition.length
frame.is_extended = frame_definition.is_extended_frame
frame.is_error = False
frame.is_rtr = False
frame.id = frame_definition.frame_id
return frame
[docs] def log_frame_received_callback(self, received_frame: Frame):
try:
dbc_frame: cantools.database.can.Message = self.log.get_message_by_frame_id(frame_id=received_frame.id)
except KeyError:
# there is no CAN Frame specified within the dbc file with this id, ignore this message
return
if dbc_frame.name not in self.log_receiving_frame_names:
# frame is not of interest for the ACU.
return
try:
# interprete the current frame according to the dbc file
interpreted_signals = self.log.decode_message(received_frame.id, received_frame.data, decode_choices=False)
except Exception:
rospy.loginfo(f'Cannot interprete log can message with id={received_frame.id} and data={received_frame.data}')
return
if dbc_frame.name == 'RES_Status':
res_message = ResStatus()
res_message.header.stamp = received_frame.header.stamp
res_message.switch = interpreted_signals.get('RES_Switch_STATUS')
res_message.button = interpreted_signals.get('RES_Button_STATUS')
res_message.emergency_button = interpreted_signals.get('RES_EmergencyButton0_STATUS')
self.res_pub.publish(res_message)
elif dbc_frame.name == 'VCUsINVx_Diagnostics':
wheelspeed_msg = Wheelspeed()
wheelspeed_msg.header.stamp = received_frame.header.stamp
wheelspeed_msg.left = interpreted_signals.get('VCUsINVL_EngineRL_SPEED')
wheelspeed_msg.right = interpreted_signals.get('VCUsINVR_EngineRR_SPEED')
self.wheelspeed_rear_pub.publish(wheelspeed_msg)
torque_msg = Torque()
torque_msg.header.stamp = received_frame.header.stamp
torque_msg.torque = interpreted_signals.get('VCUsINV_ActualTorque_CURR') / 0.64 # 0.64 is the scaling factor between torque and current
self.torque_pub.publish(torque_msg)
[docs] def per_frame_received_callback(self, received_frame):
try:
dbc_frame: cantools.database.can.Message = self.per.get_message_by_frame_id(frame_id=received_frame.id)
except KeyError:
# there is no CAN Frame specified within the dbc file with this id, ignore this message
return
if dbc_frame.name not in self.per_receiving_frame_names:
# frame is not of interest for the ACU.
return
try:
# interprete the current frame according to the dbc file
interpreted_signals = self.per.decode_message(received_frame.id, received_frame.data, decode_choices=False)
except Exception:
rospy.loginfo(f'Cannot interprete can message with id={received_frame.id} and data={received_frame.data}')
return
# switch case which message got received and publish the according data on the according ros topic
if dbc_frame.name == 'ACTC_Measurements':
steering_wheel_angle_msg = SteeringWheelAngle()
steering_wheel_angle_msg.header.stamp = received_frame.header.stamp
self.steering_wheel_angle_actual = interpreted_signals.get('ACTC_SteeringWheel_ANGLExACT')
steering_wheel_angle_msg.angle = self.steering_wheel_angle_actual
self.steering_wheel_angle_pub.publish(steering_wheel_angle_msg)
elif dbc_frame.name == 'ASBCsSENS_PneumaticFrontxACT':
pneumatic_pressure_front_msg = PneumaticPressure()
pneumatic_pressure_front_msg.header.stamp = received_frame.header.stamp
pneumatic_pressure_front_msg.pneumatic_pressure = interpreted_signals.get('ASBCsSENS_Front_Pressure_PRESS')
pneumatic_pressure_front_msg.SB_pressure = interpreted_signals.get('ASBCsSENS_Front_SB_PRESS')
pneumatic_pressure_front_msg.tank_pressure = interpreted_signals.get('ASBCsSENS_Front_Tank_PRESS')
self.pneumatic_pressure_override_front = interpreted_signals.get('ASBCsSENS_Front_Override_PRESS')
pneumatic_pressure_front_msg.override_pressure = self.pneumatic_pressure_override_front
self.pneumatic_pressure_front_pub.publish(pneumatic_pressure_front_msg)
elif dbc_frame.name == 'ASBCsSENS_PneumaticRearxACT':
pneumatic_pressure_rear_msg = PneumaticPressure()
pneumatic_pressure_rear_msg.header.stamp = received_frame.header.stamp
pneumatic_pressure_rear_msg.pneumatic_pressure = interpreted_signals.get('ASBCsSENS_Rear_Pressure_PRESS')
pneumatic_pressure_rear_msg.SB_pressure = interpreted_signals.get('ASBCsSENS_Rear_SB_PRESS')
pneumatic_pressure_rear_msg.tank_pressure = interpreted_signals.get('ASBCsSENS_Rear_Tank_PRESS')
self.pneumatic_pressure_override_rear = interpreted_signals.get('ASBCsSENS_Rear_Override_PRESS')
pneumatic_pressure_rear_msg.override_pressure = self.pneumatic_pressure_override_rear
self.pneumatic_pressure_rear_pub.publish(pneumatic_pressure_rear_msg)
elif dbc_frame.name == 'ASBCsCUS_CheckupSequence_STATE':
# rospy.loginfo(f'Received CUS state {interpreted_signals.get("ASBCsCUS_Cus_STATE")}')
checkup_sequence_msg = CheckupSequenceState()
checkup_sequence_msg.header.stamp = received_frame.header.stamp
checkup_sequence_msg.state = interpreted_signals.get('ASBCsCUS_Cus_STATE')
self.checkup_sequence_pub.publish(checkup_sequence_msg)
elif dbc_frame.name == 'BPCF_PRESS':
brake_pressure_front_msg = BrakePressure()
brake_pressure_front_msg.header.stamp = received_frame.header.stamp
self.hydraulic_pressure_front = interpreted_signals.get('BPCF_BrakeSystemF_PRESS')
brake_pressure_front_msg.pressure = self.hydraulic_pressure_front
self.hydraulic_pressure_front_pub.publish(brake_pressure_front_msg)
elif dbc_frame.name == 'BPCR_PRESS':
brake_pressure_rear_msg = BrakePressure()
brake_pressure_rear_msg.header.stamp = received_frame.header.stamp
self.hydraulic_pressure_rear = interpreted_signals.get('BPCR_BrakeSystemR_PRESS')
brake_pressure_rear_msg.pressure = self.hydraulic_pressure_rear
self.hydraulic_pressure_rear_pub.publish(brake_pressure_rear_msg)
elif dbc_frame.name == 'DSB_AutonomousMission_STATExSET':
autonomous_mission_msg = AutonomousMission()
autonomous_mission_msg.header.stamp = received_frame.header.stamp
self.autonomous_mission = interpreted_signals.get('DSB_AutonomousMission_STATExSET')
autonomous_mission_msg.mission = self.autonomous_mission
self.autonomous_mission_pub.publish(autonomous_mission_msg)
elif dbc_frame.name == 'IMU_ACCEL':
imu_acceleration_msg = ImuAcceleration()
imu_acceleration_msg.header.stamp = received_frame.header.stamp
self.imu_accel_x = interpreted_signals.get('IMU_X_ACCEL')
imu_acceleration_msg.x = self.imu_accel_x
self.imu_accel_y = interpreted_signals.get('IMU_Y_ACCEL')
imu_acceleration_msg.y = self.imu_accel_y
imu_acceleration_msg.z = interpreted_signals.get('IMU_Z_ACCEL')
self.imu_accel_pub.publish(imu_acceleration_msg)
elif dbc_frame.name == 'IMU_ANGLERATE':
imu_gyro_msg = ImuGyro()
imu_gyro_msg.header.stamp = received_frame.header.stamp
imu_gyro_msg.roll = interpreted_signals.get('IMU_X_ANGLERATE')
imu_gyro_msg.pitch = interpreted_signals.get('IMU_Y_ANGLERATE')
self.gyro_z = interpreted_signals.get('IMU_Z_ANGLERATE')
imu_gyro_msg.yaw = self.gyro_z
self.imu_gyro_pub.publish(imu_gyro_msg)
elif dbc_frame.name == 'SDU':
sdu_msg = ShutdownUnitState()
sdu_msg.header.stamp = received_frame.header.stamp
sdu_msg.state = interpreted_signals.get('SDU_StateMachine_STATE')
sdu_msg.as_status = interpreted_signals.get('SDU_SdcAs_STATUS')
self.sdu_pub.publish(sdu_msg)
elif dbc_frame.name == 'SUP_Fuse_STATUS':
asms_msg = FuseStatus()
asms_msg.header.stamp = received_frame.header.stamp
self.asms_status = interpreted_signals.get('SUP_AsmsFuse_STATUS')
asms_msg.intact = self.asms_status
self.asms_pub.publish(asms_msg)
elif dbc_frame.name == 'VCU_Statemachine_STATE':
as_state_msg = AutonomousSystemState()
as_state_msg.header.stamp = received_frame.header.stamp
self.as_state = interpreted_signals.get('VCUsSM_AutonomousSystem_STATE')
as_state_msg.as_state = self.as_state
self.ebs_state = interpreted_signals.get('VCUsSM_Ebs_STATE')
as_state_msg.ebs_state = self.ebs_state
self.autonomous_driving_pub.publish(as_state_msg)
elif dbc_frame.name == 'WB_WheelF_SPEED':
wheelspeed_msg = Wheelspeed()
wheelspeed_msg.header.stamp = received_frame.header.stamp
wheelspeed_msg.left = interpreted_signals.get('WB_WheelFL_SPEED')
wheelspeed_msg.right = interpreted_signals.get('WB_WheelFR_SPEED')
self.wheelspeed_pub.publish(wheelspeed_msg)
elif dbc_frame.name == 'DSB_Buttons_STATE':
self.handle_log_button_state(interpreted_signals.get('DSB_LogButton_STATE'))
[docs] def send_steering_msg(self):
data = self.steering_control_msg.encode({'ACU_ControlEngage_STATUSxSET': self.steering_control_engage,
'ACUsCTRL_SteeringWheel_ANGLExREQ': self.steering_wheel_angle_request})
self.per_bus.send(self.generate_message(data=data, frame_definition=self.steering_control_msg))
[docs] def steering_control_engange_callback(self, msg):
self.steering_control_engage = msg.engage
self.send_steering_msg()
[docs] def steering_wheel_angle_callback(self, msg):
self.steering_wheel_angle_request = msg.angle
self.send_steering_msg()
[docs] def service_brake_callback(self, msg):
data = self.service_brake_msg.encode({'ACUsCTRL_ServiceBrakeF_PRESSxREQ': msg.pressure,
'ACUsCTRL_ServiceBrakeR_PRESSxREQ': msg.pressure})
self.server_brake_pneumatic_pressure_request = msg.pressure
self.per_bus.send(self.generate_message(data=data, frame_definition=self.service_brake_msg))
[docs] def monitoring_status_callback(self, msg):
data = self.monitoring_status_msg.encode({'ACUsMON_Toggle': msg.toggle})
self.per_bus.send(self.generate_message(data=data, frame_definition=self.monitoring_status_msg))
[docs] def node_states_callback(self, msg: NodeState):
self.camera_state = msg.camera
self.lidar_state = msg.lidar
self.gps_state = msg.gps
self.perception_state = msg.perception
self.lidar_perception_state = msg.lidar_perception
self.local_mapping_state = msg.local_mapping
self.slam_state = msg.slam
self.filtering_state = msg.filtering
self.planning_state = msg.motion_planning
self.control_state = msg.control
self.monitoring_received_new = rospy.Time.now()
self.monitoring_state = 2
# def sensor_states_callback(self, msg: SensorStates):
# self.imu_state = msg.imu_state
# self.wheelspeed_front_state = msg.wheelspeed_front_state
# self.wheelspeed_rear_state = msg.wheelspeed_rear_state
[docs] def check_monitoring_state(self):
if(self.monitoring_state > 1 and self.monitoring_received_new == self.monitoring_received_old):
rospy.loginfo('[Monitoring] monitoring node stopped running.')
self.monitoring_state = 3
self.monitoring_received_old = self.monitoring_received_new
[docs] def mission_machine_state_callback(self, msg: NodeState):
self.mission_machine_state = msg.state
[docs] def send_dashboard_msg(self):
while not rospy.is_shutdown():
self.check_monitoring_state()
data = self.dsb_msg.encode({'ACUsMON_Camera_STATE': self.camera_state,
'ACUsMON_Lidar_STATE': self.lidar_state,
'ACUsMON_Gps_STATE': self.gps_state,
'ACUsMON_TrackFiltering_STATE': self.filtering_state,
'ACUsMON_Slam_STATE': self.slam_state,
'ACUsMON_Mpc_STATE': self.control_state,
#'ACUsMON_Mpc_STATE': self.wheelspeed_rear_state,
'ACUsMON_MotionPlanning_STATE': self.planning_state,
#'ACUsMON_MotionPlanning_STATE': self.wheelspeed_front_state,
'ACUsMON_Monitoring_STATE': self.monitoring_state,
'ACUsMON_LocalMapping_STATE': self.local_mapping_state,
'ACUsMON_LidarPerception_STATE': self.lidar_perception_state,
'ACUsMON_CameraPerception_STATE': self.lidar_perception_state,
'ACUsMM_AsMissionMachine_STATE': self.mission_machine_state})
#'ACUsMM_AsMissionMachine_STATE': self.imu_state})
self.per_bus.send(self.generate_message(data=data, frame_definition=self.dsb_msg))
self.dsb_rate.sleep()
[docs] def check_if_rosbag_recording_running(self):
return '/record_full_as' in rosnode.get_node_names()
[docs] def send_rosbag_record_status(self):
while not rospy.is_shutdown():
data = self.ros_recording_msg.encode({'ACUsCAN_RosbagRecording_STATUS': self.check_if_rosbag_recording_running()})
self.per_bus.send(self.generate_message(data=data, frame_definition=self.ros_recording_msg))
self.ros_recording_rate.sleep()
[docs] def start_rosbag_recording(self):
rospy.loginfo(f'[CanInterface] Start rosbag recording via launchfile')
uuid = roslaunch.rlutil.get_or_generate_uuid(None, False)
roslaunch.configure_logging(uuid)
rospack = rospkg.RosPack()
cli_args = [f"{rospack.get_path('utilities')}/launch/rosbag.launch", 'camera:=no']
self.rosbag_recording_process = roslaunch.parent.ROSLaunchParent(uuid, [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0], cli_args[1:])])
self.rosbag_recording_process.start()
[docs] def end_rosbag_recording(self):
rospy.loginfo(f'[CanInterface] End rosbag recording via launchfile')
self.rosbag_recording_process.shutdown()
[docs] def torque_callback(self, msg: Torque):
self.torque_request = msg.torque
self.send_engine_control_msg()
[docs] def m_per_s_to_rpm(self, velocity):
return ((60 * self.gear_ratio) / (2 * np.pi * self.wheel_radius)) * velocity
[docs] def velocity_callback(self, msg: Velocity):
self.velocity_request = self.m_per_s_to_rpm(velocity=msg.velocity)
self.send_engine_control_msg()
[docs] def slam_velocity_callback(self, msg: VehiclePose):
self.slam_velocity = msg.v_x
[docs] def actual_torque_callback(self, msg: Torque):
self.torque_actual = msg.torque
[docs] def send_dv_driving_diagnostics_1(self):
rospy.loginfo('[CAN Node] Start publishing dv driving dynamics 1 to the data logger.')
while not rospy.is_shutdown():
data = self.dv_driving_diagnostics_1_msg.encode(
{'ACUsCTRL_Ackermann_ANGLExREQ': max(-128, min(self.steering_wheel_angle_request, 127)),
'ACUsACTC_Ackermann_ANGLExACT': max(-128, min(self.steering_wheel_angle_actual, 127)),
'ACUsLOG_Vehicle_VELOxREQ': max(0, min(255, self.velocity_request * 3.6)),
'ACUsSLAM_Vehicle_VELOxACT': max(0, min(255, self.slam_velocity * 3.6)),
'ACUsLOG_Engine_TORQUExREQ': max(0, min(self.torque_request * 100 / 240, 100)),
'ACUsVCU_Engine_TORQUExACT': max(0, min(self.torque_actual * 100 / 240, 100)),
'ACUsCTRL_HydrBrake_PRESSxREQ': max(0, min(100, self.server_brake_pneumatic_pressure_request * 100 / 12)),
'ACUsASBC_HydrBrake_PRESSxACT': max(0, min(100, (self.pneumatic_pressure_override_front * 100 / 12 + \
self.pneumatic_pressure_override_rear * 100 / 12) / 2))}, strict=False)
self.log_bus.send(self.generate_message(data=data, frame_definition=self.dv_driving_diagnostics_1_msg))
self.dv_driving_diagnostics_1_rate.sleep()
[docs] def send_dv_driving_diagnostics_2(self):
rospy.loginfo('[CAN Node] Start publishing dv driving dynamics 2 to the data logger.')
while not rospy.is_shutdown():
data = self.dv_driving_diagnostics_2_msg.encode({'ACUsIMU_Z_ANGLERATE': max(-256, min(self.gyro_z, 255)),
'ACUsIMU_X_ACCEL': max(-64, min(self.imu_accel_x, 63)),
'ACUsIMU_Y_ACCEL': max(-64, min(self.imu_accel_y, 63))})
self.log_bus.send(self.generate_message(data=data, frame_definition=self.dv_driving_diagnostics_2_msg))
self.dv_driving_diagnostics_2_rate.sleep()
[docs] def lap_count_callback(self, msg: LapCount):
self.lap_count = msg.laps
[docs] def slam_cones_count_callback(self, msg: ConesCountAll):
self.cones_all_count = msg.count
[docs] def local_mapping_cones_count_callback(self, msg: ConesCountActual):
self.cones_actual_count = msg.count
[docs] def send_dv_system_diagnostics(self):
rospy.loginfo('[CAN Node] Start publishing dv system diagnostics to the data logger.')
while not rospy.is_shutdown():
# set service brake state based on 2020 rules and AS state
if self.as_state == AutonomousSystemStateEnum.DRIVING.value:
service_brake_state = ServiceBrakeStateEnum.AVAILABLE.value
elif self.as_state == AutonomousSystemStateEnum.READY.value or self.as_state == AutonomousSystemStateEnum.FINISHED.value:
service_brake_state = ServiceBrakeStateEnum.ENGAGED.value
else:
service_brake_state = ServiceBrakeStateEnum.DISENGAGED.value
data = self.dv_system_diagnostics_msg.encode({'ACUsVCU_AutonomousSystem_STATE': max(1, min(self.as_state, 5)),
'ACUsLOG_Ebs_STATE': max(1, min(self.ebs_state, 3)),
'ACUsDSB_AutonomousMission_STATE': max(1, min(self.autonomous_mission, 6)),
'ACUsMM_ActcControlEngage_STATUS': max(0, min(self.steering_control_engage, 1)),
'ACUsLOG_ServiceBrake_STATE': max(1, min(service_brake_state, 3)),
'ACUsFLD_Lap_COUNT': max(0, min(self.lap_count, 255)),
'ACUsLM_ActualCones_COUNT': max(0, min(self.cones_actual_count, 255)),
'ACUsSLAM_AllCones_COUNT': max(0, min(self.cones_all_count, 131071))})
self.log_bus.send(self.generate_message(data=data, frame_definition=self.dv_system_diagnostics_msg))
self.dv_system_diagnostics_rate.sleep()
[docs] def velocity_torque_switch_callback(self, msg: VelocityTorqueSwitch):
self.use_velocity = msg.use_velocity
self.send_engine_control_msg()
[docs] def send_engine_control_msg(self):
data = self.engine_control_msg.encode({'ACUsMM_EngineCtrlType_STATUSxSET': not self.use_velocity,
'ACUsCTRL_Engine_TORQUExREQ': self.torque_request,
'ACUsCTRL_Engine_SPEEDxREQ': self.velocity_request})
self.log_bus.send(self.generate_message(data=data, frame_definition=self.engine_control_msg))
[docs] def mission_finished_callback(self, msg: MissionFinished):
data = self.mission_finished_msg.encode({'ACUsMM_MissionFinished_STATUS': msg.finished})
self.log_bus.send(self.generate_message(data=data, frame_definition=self.mission_finished_msg))
[docs] def main_thread(self):
while not rospy.is_shutdown():
self.rosbag_recording_event.wait()
self.rosbag_recording_event.clear()
if self.log_button_state == 2:
if self.check_if_rosbag_recording_running() is True:
self.end_rosbag_recording()
elif self.log_button_state == 1:
if self.check_if_rosbag_recording_running() is False:
self.start_rosbag_recording()
if __name__ == '__main__':
rospy.init_node('can_interface', anonymous=True)
try:
rospack = rospkg.RosPack()
per_dbc_path = rospack.get_path('can_interface') + '/dbcs/per/PER.dbc'
log_dbc_path = rospack.get_path('can_interface') + '/dbcs/log/LOG.dbc'
CanBridge(path_to_log_dbc=log_dbc_path, path_to_per_dbc=per_dbc_path)
rospy.spin()
except Exception as e:
if not isinstance(e, rospy.exceptions.ROSInterruptException):
rospy.logfatal(traceback.format_exc())
time.sleep(2)
raise e