Source code for can_interface

#!/usr/bin/env python3
import threading
import numpy as np
import can
import cantools
from enum import Enum
import rospy
import rospkg
import roslaunch
import rosnode
import time
import traceback

from can_msgs.msg import Frame
from as_can_msgs.msg import LapCount, ConesCountAll, ConesCountActual, ServiceBrakeState, SteeringControlEngage, \
                            AutonomousSystemState, AutonomousMission, Velocity, MissionFinished, FuseStatus, \
                            ShutdownUnitState, BrakePressure, PneumaticPressure, VelocityTorqueSwitch, Torque, \
                            ImuGyro, ImuAcceleration, SteeringWheelAngle, ResStatus, Wheelspeed, \
                            CheckupSequenceState, MonitoringStatus, NodeState, MissionMachineState
from utilities.msg import VehiclePose                            


[docs]class AutonomousSystemStateEnum(Enum): STARTUP = 0 OFF = 1 READY = 2 DRIVING = 3 EMERGENCY = 4 FINISHED = 5
[docs]class ServiceBrakeStateEnum(Enum): DISENGAGED = 1 ENGAGED = 2 AVAILABLE = 3
[docs]class CanBridge(): def __init__(self, path_to_log_dbc: str, path_to_per_dbc: str) -> None: self.per: cantools.db.Database = cantools.database.load_file(path_to_per_dbc) self.log: cantools.db.Database = cantools.database.load_file(path_to_log_dbc) self.log_bus = can.interface.Bus(rospy.get_param('/can/log/log/can_device'), bustype='socketcan') self.per_bus = can.interface.Bus(rospy.get_param('/can/per/per/can_device'), bustype='socketcan') self.setup_variables() self.setup_messages() self.setup_log_rx() self.setup_per_rx() self.setup_per_tx() self.setup_log_tx() self.main_thread()
[docs] def setup_messages(self): # per tx self.steering_control_msg: cantools.db.Message = self.per.get_message_by_name('ACUsCTRL_ActcControl') self.service_brake_msg: cantools.db.Message = self.per.get_message_by_name('ACUsCTRL_ServiceBrake_PRESSxREQ') self.monitoring_status_msg: cantools.db.Message = self.per.get_message_by_name('ACUsMON_STATE') self.dsb_msg: cantools.db.Message = self.per.get_message_by_name('ACUsMON_RosNodes_STATE') self.ros_recording_msg: cantools.db.Message = self.per.get_message_by_name('ACUsCAN_RosbagRecording_STATUS') # log tx self.dv_driving_diagnostics_1_msg: cantools.db.Message = self.log.get_message_by_name('ACU_DvDrivingDiagnostics1') self.dv_driving_diagnostics_2_msg: cantools.db.Message = self.log.get_message_by_name('ACU_DvDrivingDiagnostics2') self.dv_system_diagnostics_msg: cantools.db.Message = self.log.get_message_by_name('ACU_DvSystemDiagnostics') self.engine_control_msg: cantools.db.Message = self.log.get_message_by_name('ACUsCTRL_EngineControl') self.mission_finished_msg: cantools.db.Message = self.log.get_message_by_name('ACUsMM_MissionFinished_STATUS')
[docs] def setup_variables(self): # per rx self.steering_wheel_angle_actual = 0 self.pneumatic_pressure_override_front = 0 self.pneumatic_pressure_override_rear = 0 self.hydraulic_pressure_front = 0 self.hydraulic_pressure_rear = 0 self.autonomous_mission = 1 # first possible state: Acceleration self.imu_accel_x = 0 self.imu_accel_y = 0 self.gyro_z = 0 self.sdu_state = 1 # open self.sdu_as_status = 0 # idle self.asms_status = 0 # blown self.as_state = 1 # AS off self.ebs_state = 1 # unavailable self.last_log_button_state = 0 # not pressed self.log_button_state = 0 # not pressed # per tx self.camera_state = 0 self.lidar_state = 0 self.gps_state = 0 self.filtering_state = 0 self.slam_state = 0 self.control_state = 0 self.planning_state = 0 self.monitoring_state = 0 if('/monitoring' in rosnode.get_node_names()): self.monitoring_state = 1 self.local_mapping_state = 0 self.lidar_perception_state = 0 self.perception_state = 0 self.mission_machine_state = 0 self.imu_state = 0 self.wheelspeed_front_state = 0 self.wheelspeed_rear_state = 0 self.steering_control_engage = False self.steering_wheel_angle_request = 0 # log tx self.torque_request = 0 self.wheel_radius = rospy.get_param('/vehicle/wheel_radius') self.gear_ratio = rospy.get_param('/vehicle/rear_gear_ratio') self.velocity_request = 0 self.slam_velocity = 0 self.torque_actual = 0 self.lap_count = 0 self.cones_all_count = 0 self.cones_actual_count = 0 self.use_velocity = True
[docs] def setup_per_rx(self): self.per_receiving_frame_names= ['ACTC_Measurements', 'ASBCsSENS_PneumaticFrontxACT', 'ASBCsSENS_PneumaticRearxACT', 'ASBCsCUS_CheckupSequence_STATE', 'BPCF_PRESS', 'BPCR_PRESS', 'DSB_AutonomousMission_STATExSET', 'IMU_ACCEL', 'IMU_ANGLERATE', 'SDU', 'SUP_Fuse_STATUS', 'VCU_Statemachine_STATE', 'WB_WheelF_SPEED', 'DSB_Buttons_STATE'] self.steering_wheel_angle_pub = rospy.Publisher('/can/per/steering_wheel_angle/actual', SteeringWheelAngle, queue_size=10) self.pneumatic_pressure_front_pub = rospy.Publisher('/can/per/pneumatic_pressure/front', PneumaticPressure, queue_size=10) self.pneumatic_pressure_rear_pub = rospy.Publisher('/can/per/pneumatic_pressure/rear', PneumaticPressure, queue_size=10) self.checkup_sequence_pub = rospy.Publisher('/can/per/checkup_sequence_state', CheckupSequenceState, queue_size=10) self.hydraulic_pressure_front_pub = rospy.Publisher('/can/per/hydraulic_pressure/front', BrakePressure, queue_size=10) self.hydraulic_pressure_rear_pub = rospy.Publisher('/can/per/hydraulic_pressure/rear', BrakePressure, queue_size=10) self.autonomous_mission_pub = rospy.Publisher('/can/per/autonomous_mission', AutonomousMission, queue_size=10) self.imu_accel_pub = rospy.Publisher('/can/per/imu/accel', ImuAcceleration, queue_size=10) self.imu_gyro_pub = rospy.Publisher('/can/per/imu/gyro', ImuGyro, queue_size=10) self.sdu_pub = rospy.Publisher('/can/per/sdu_state', ShutdownUnitState, queue_size=10) self.asms_pub = rospy.Publisher('/can/per/asms_status', FuseStatus, queue_size=10) self.autonomous_driving_pub = rospy.Publisher('/can/per/as_state', AutonomousSystemState, queue_size=10) self.wheelspeed_pub = rospy.Publisher('/can/per/wheelspeed/front', Wheelspeed, queue_size=10) rospy.Subscriber('/can/per/received_messages', Frame, self.per_frame_received_callback)
[docs] def setup_log_rx(self): self.log_receiving_frame_names = ['RES_Status', 'VCUsINVx_Diagnostics'] self.res_pub = rospy.Publisher('/can/log/res_status', ResStatus, queue_size=10) self.wheelspeed_rear_pub = rospy.Publisher('/can/log/wheelspeed/rear', Wheelspeed, queue_size=10) self.torque_pub = rospy.Publisher('/can/log/torque/actual', Torque, queue_size=10) rospy.Subscriber('/can/log/received_messages', Frame, self.log_frame_received_callback)
[docs] def setup_per_tx(self): rospy.Subscriber('/control/steering_wheel_angle/request', SteeringWheelAngle, self.steering_wheel_angle_callback) rospy.Subscriber('/mission_machine/steering_control_engage', SteeringControlEngage, self.steering_control_engange_callback) self.server_brake_pneumatic_pressure_request = 0 rospy.Subscriber('/control/service_brake/request', BrakePressure, self.service_brake_callback) rospy.Subscriber('/monitoring/status', MonitoringStatus, self.monitoring_status_callback) # dashboard node status callbacks self.monitoring_received_new = rospy.Time(secs=0) self.monitoring_received_old = rospy.Time(secs=0) rospy.Subscriber('/monitoring/node_states', NodeState, self.node_states_callback) #rospy.Subscriber('/monitoring/sensor_states', SensorStates, self.sensor_states_callback) rospy.Subscriber('/mission_machine/state', MissionMachineState, self.mission_machine_state_callback) self.dsb_rate = rospy.Rate(10) # 100 ms cyclic time self.dsb_publisher = threading.Thread(target=self.send_dashboard_msg, daemon=True) self.dsb_publisher.start() self.rosbag_recording_process = None self.rosbag_recording_event = threading.Event() self.ros_recording_rate = rospy.Rate(10) # 100 ms cyclic time self.ros_recording_thread = threading.Thread(target=self.send_rosbag_record_status, daemon=True) self.ros_recording_thread.start()
[docs] def setup_log_tx(self): # torque rospy.Subscriber('/control/torque/request', Torque, self.torque_callback) # velocity rospy.Subscriber('/control/velocity/request', Velocity, self.velocity_callback) # vehicle velocity from slam rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.slam_velocity_callback) # actual torque rospy.Subscriber('/can/log/torque/actual', Torque, self.actual_torque_callback) # dv driving diagnostics 1 self.dv_driving_diagnostics_1_rate = rospy.Rate(10) # 100 ms cyclic time self.dv_driving_diagnostics_1_publisher = threading.Thread(target=self.send_dv_driving_diagnostics_1, daemon=True) self.dv_driving_diagnostics_1_publisher.start() # dv driving diagnostics 2 self.dv_driving_diagnostics_2_rate = rospy.Rate(10) # 100 ms cyclic time self.dv_driving_diagnostics_2_publisher = threading.Thread(target=self.send_dv_driving_diagnostics_2, daemon=True) self.dv_driving_diagnostics_2_publisher.start() # lap count rospy.Subscriber('/finish_line_detection/lap_count', LapCount, self.lap_count_callback) # cone counts rospy.Subscriber('/slam/cone_count', ConesCountAll, self.slam_cones_count_callback) rospy.Subscriber('/local_mapping/cone_count', ConesCountActual, self.local_mapping_cones_count_callback) # dv system diagnostics self.dv_system_diagnostics_rate = rospy.Rate(10) # 100 ms cyclic time self.dv_system_diagnostics_publisher = threading.Thread(target=self.send_dv_system_diagnostics, daemon=True) self.dv_system_diagnostics_publisher.start() # engine control msg # switch between torque and velocity from mission machine rospy.Subscriber('/mission_machine/use_velocity', VelocityTorqueSwitch, self.velocity_torque_switch_callback) rospy.Subscriber('/mission_machine/finished', MissionFinished, self.mission_finished_callback)
[docs] def generate_message(self, data, frame_definition): """ Used for sending it on the can device directly. """ return can.Message(arbitration_id=frame_definition.frame_id, data=data, is_extended_id=frame_definition.is_extended_frame)
[docs] def generate_frame(self, data, frame_definition): """ Used for sending it on the can ros bridge. """ frame = Frame() frame.header.stamp = rospy.Time.now() frame.data = data frame.dlc = frame_definition.length frame.is_extended = frame_definition.is_extended_frame frame.is_error = False frame.is_rtr = False frame.id = frame_definition.frame_id return frame
[docs] def log_frame_received_callback(self, received_frame: Frame): try: dbc_frame: cantools.database.can.Message = self.log.get_message_by_frame_id(frame_id=received_frame.id) except KeyError: # there is no CAN Frame specified within the dbc file with this id, ignore this message return if dbc_frame.name not in self.log_receiving_frame_names: # frame is not of interest for the ACU. return try: # interprete the current frame according to the dbc file interpreted_signals = self.log.decode_message(received_frame.id, received_frame.data, decode_choices=False) except Exception: rospy.loginfo(f'Cannot interprete log can message with id={received_frame.id} and data={received_frame.data}') return if dbc_frame.name == 'RES_Status': res_message = ResStatus() res_message.header.stamp = received_frame.header.stamp res_message.switch = interpreted_signals.get('RES_Switch_STATUS') res_message.button = interpreted_signals.get('RES_Button_STATUS') res_message.emergency_button = interpreted_signals.get('RES_EmergencyButton0_STATUS') self.res_pub.publish(res_message) elif dbc_frame.name == 'VCUsINVx_Diagnostics': wheelspeed_msg = Wheelspeed() wheelspeed_msg.header.stamp = received_frame.header.stamp wheelspeed_msg.left = interpreted_signals.get('VCUsINVL_EngineRL_SPEED') wheelspeed_msg.right = interpreted_signals.get('VCUsINVR_EngineRR_SPEED') self.wheelspeed_rear_pub.publish(wheelspeed_msg) torque_msg = Torque() torque_msg.header.stamp = received_frame.header.stamp torque_msg.torque = interpreted_signals.get('VCUsINV_ActualTorque_CURR') / 0.64 # 0.64 is the scaling factor between torque and current self.torque_pub.publish(torque_msg)
[docs] def per_frame_received_callback(self, received_frame): try: dbc_frame: cantools.database.can.Message = self.per.get_message_by_frame_id(frame_id=received_frame.id) except KeyError: # there is no CAN Frame specified within the dbc file with this id, ignore this message return if dbc_frame.name not in self.per_receiving_frame_names: # frame is not of interest for the ACU. return try: # interprete the current frame according to the dbc file interpreted_signals = self.per.decode_message(received_frame.id, received_frame.data, decode_choices=False) except Exception: rospy.loginfo(f'Cannot interprete can message with id={received_frame.id} and data={received_frame.data}') return # switch case which message got received and publish the according data on the according ros topic if dbc_frame.name == 'ACTC_Measurements': steering_wheel_angle_msg = SteeringWheelAngle() steering_wheel_angle_msg.header.stamp = received_frame.header.stamp self.steering_wheel_angle_actual = interpreted_signals.get('ACTC_SteeringWheel_ANGLExACT') steering_wheel_angle_msg.angle = self.steering_wheel_angle_actual self.steering_wheel_angle_pub.publish(steering_wheel_angle_msg) elif dbc_frame.name == 'ASBCsSENS_PneumaticFrontxACT': pneumatic_pressure_front_msg = PneumaticPressure() pneumatic_pressure_front_msg.header.stamp = received_frame.header.stamp pneumatic_pressure_front_msg.pneumatic_pressure = interpreted_signals.get('ASBCsSENS_Front_Pressure_PRESS') pneumatic_pressure_front_msg.SB_pressure = interpreted_signals.get('ASBCsSENS_Front_SB_PRESS') pneumatic_pressure_front_msg.tank_pressure = interpreted_signals.get('ASBCsSENS_Front_Tank_PRESS') self.pneumatic_pressure_override_front = interpreted_signals.get('ASBCsSENS_Front_Override_PRESS') pneumatic_pressure_front_msg.override_pressure = self.pneumatic_pressure_override_front self.pneumatic_pressure_front_pub.publish(pneumatic_pressure_front_msg) elif dbc_frame.name == 'ASBCsSENS_PneumaticRearxACT': pneumatic_pressure_rear_msg = PneumaticPressure() pneumatic_pressure_rear_msg.header.stamp = received_frame.header.stamp pneumatic_pressure_rear_msg.pneumatic_pressure = interpreted_signals.get('ASBCsSENS_Rear_Pressure_PRESS') pneumatic_pressure_rear_msg.SB_pressure = interpreted_signals.get('ASBCsSENS_Rear_SB_PRESS') pneumatic_pressure_rear_msg.tank_pressure = interpreted_signals.get('ASBCsSENS_Rear_Tank_PRESS') self.pneumatic_pressure_override_rear = interpreted_signals.get('ASBCsSENS_Rear_Override_PRESS') pneumatic_pressure_rear_msg.override_pressure = self.pneumatic_pressure_override_rear self.pneumatic_pressure_rear_pub.publish(pneumatic_pressure_rear_msg) elif dbc_frame.name == 'ASBCsCUS_CheckupSequence_STATE': # rospy.loginfo(f'Received CUS state {interpreted_signals.get("ASBCsCUS_Cus_STATE")}') checkup_sequence_msg = CheckupSequenceState() checkup_sequence_msg.header.stamp = received_frame.header.stamp checkup_sequence_msg.state = interpreted_signals.get('ASBCsCUS_Cus_STATE') self.checkup_sequence_pub.publish(checkup_sequence_msg) elif dbc_frame.name == 'BPCF_PRESS': brake_pressure_front_msg = BrakePressure() brake_pressure_front_msg.header.stamp = received_frame.header.stamp self.hydraulic_pressure_front = interpreted_signals.get('BPCF_BrakeSystemF_PRESS') brake_pressure_front_msg.pressure = self.hydraulic_pressure_front self.hydraulic_pressure_front_pub.publish(brake_pressure_front_msg) elif dbc_frame.name == 'BPCR_PRESS': brake_pressure_rear_msg = BrakePressure() brake_pressure_rear_msg.header.stamp = received_frame.header.stamp self.hydraulic_pressure_rear = interpreted_signals.get('BPCR_BrakeSystemR_PRESS') brake_pressure_rear_msg.pressure = self.hydraulic_pressure_rear self.hydraulic_pressure_rear_pub.publish(brake_pressure_rear_msg) elif dbc_frame.name == 'DSB_AutonomousMission_STATExSET': autonomous_mission_msg = AutonomousMission() autonomous_mission_msg.header.stamp = received_frame.header.stamp self.autonomous_mission = interpreted_signals.get('DSB_AutonomousMission_STATExSET') autonomous_mission_msg.mission = self.autonomous_mission self.autonomous_mission_pub.publish(autonomous_mission_msg) elif dbc_frame.name == 'IMU_ACCEL': imu_acceleration_msg = ImuAcceleration() imu_acceleration_msg.header.stamp = received_frame.header.stamp self.imu_accel_x = interpreted_signals.get('IMU_X_ACCEL') imu_acceleration_msg.x = self.imu_accel_x self.imu_accel_y = interpreted_signals.get('IMU_Y_ACCEL') imu_acceleration_msg.y = self.imu_accel_y imu_acceleration_msg.z = interpreted_signals.get('IMU_Z_ACCEL') self.imu_accel_pub.publish(imu_acceleration_msg) elif dbc_frame.name == 'IMU_ANGLERATE': imu_gyro_msg = ImuGyro() imu_gyro_msg.header.stamp = received_frame.header.stamp imu_gyro_msg.roll = interpreted_signals.get('IMU_X_ANGLERATE') imu_gyro_msg.pitch = interpreted_signals.get('IMU_Y_ANGLERATE') self.gyro_z = interpreted_signals.get('IMU_Z_ANGLERATE') imu_gyro_msg.yaw = self.gyro_z self.imu_gyro_pub.publish(imu_gyro_msg) elif dbc_frame.name == 'SDU': sdu_msg = ShutdownUnitState() sdu_msg.header.stamp = received_frame.header.stamp sdu_msg.state = interpreted_signals.get('SDU_StateMachine_STATE') sdu_msg.as_status = interpreted_signals.get('SDU_SdcAs_STATUS') self.sdu_pub.publish(sdu_msg) elif dbc_frame.name == 'SUP_Fuse_STATUS': asms_msg = FuseStatus() asms_msg.header.stamp = received_frame.header.stamp self.asms_status = interpreted_signals.get('SUP_AsmsFuse_STATUS') asms_msg.intact = self.asms_status self.asms_pub.publish(asms_msg) elif dbc_frame.name == 'VCU_Statemachine_STATE': as_state_msg = AutonomousSystemState() as_state_msg.header.stamp = received_frame.header.stamp self.as_state = interpreted_signals.get('VCUsSM_AutonomousSystem_STATE') as_state_msg.as_state = self.as_state self.ebs_state = interpreted_signals.get('VCUsSM_Ebs_STATE') as_state_msg.ebs_state = self.ebs_state self.autonomous_driving_pub.publish(as_state_msg) elif dbc_frame.name == 'WB_WheelF_SPEED': wheelspeed_msg = Wheelspeed() wheelspeed_msg.header.stamp = received_frame.header.stamp wheelspeed_msg.left = interpreted_signals.get('WB_WheelFL_SPEED') wheelspeed_msg.right = interpreted_signals.get('WB_WheelFR_SPEED') self.wheelspeed_pub.publish(wheelspeed_msg) elif dbc_frame.name == 'DSB_Buttons_STATE': self.handle_log_button_state(interpreted_signals.get('DSB_LogButton_STATE'))
[docs] def send_steering_msg(self): data = self.steering_control_msg.encode({'ACU_ControlEngage_STATUSxSET': self.steering_control_engage, 'ACUsCTRL_SteeringWheel_ANGLExREQ': self.steering_wheel_angle_request}) self.per_bus.send(self.generate_message(data=data, frame_definition=self.steering_control_msg))
[docs] def steering_control_engange_callback(self, msg): self.steering_control_engage = msg.engage self.send_steering_msg()
[docs] def steering_wheel_angle_callback(self, msg): self.steering_wheel_angle_request = msg.angle self.send_steering_msg()
[docs] def service_brake_callback(self, msg): data = self.service_brake_msg.encode({'ACUsCTRL_ServiceBrakeF_PRESSxREQ': msg.pressure, 'ACUsCTRL_ServiceBrakeR_PRESSxREQ': msg.pressure}) self.server_brake_pneumatic_pressure_request = msg.pressure self.per_bus.send(self.generate_message(data=data, frame_definition=self.service_brake_msg))
[docs] def monitoring_status_callback(self, msg): data = self.monitoring_status_msg.encode({'ACUsMON_Toggle': msg.toggle}) self.per_bus.send(self.generate_message(data=data, frame_definition=self.monitoring_status_msg))
[docs] def node_states_callback(self, msg: NodeState): self.camera_state = msg.camera self.lidar_state = msg.lidar self.gps_state = msg.gps self.perception_state = msg.perception self.lidar_perception_state = msg.lidar_perception self.local_mapping_state = msg.local_mapping self.slam_state = msg.slam self.filtering_state = msg.filtering self.planning_state = msg.motion_planning self.control_state = msg.control self.monitoring_received_new = rospy.Time.now() self.monitoring_state = 2
# def sensor_states_callback(self, msg: SensorStates): # self.imu_state = msg.imu_state # self.wheelspeed_front_state = msg.wheelspeed_front_state # self.wheelspeed_rear_state = msg.wheelspeed_rear_state
[docs] def check_monitoring_state(self): if(self.monitoring_state > 1 and self.monitoring_received_new == self.monitoring_received_old): rospy.loginfo('[Monitoring] monitoring node stopped running.') self.monitoring_state = 3 self.monitoring_received_old = self.monitoring_received_new
[docs] def mission_machine_state_callback(self, msg: NodeState): self.mission_machine_state = msg.state
[docs] def send_dashboard_msg(self): while not rospy.is_shutdown(): self.check_monitoring_state() data = self.dsb_msg.encode({'ACUsMON_Camera_STATE': self.camera_state, 'ACUsMON_Lidar_STATE': self.lidar_state, 'ACUsMON_Gps_STATE': self.gps_state, 'ACUsMON_TrackFiltering_STATE': self.filtering_state, 'ACUsMON_Slam_STATE': self.slam_state, 'ACUsMON_Mpc_STATE': self.control_state, #'ACUsMON_Mpc_STATE': self.wheelspeed_rear_state, 'ACUsMON_MotionPlanning_STATE': self.planning_state, #'ACUsMON_MotionPlanning_STATE': self.wheelspeed_front_state, 'ACUsMON_Monitoring_STATE': self.monitoring_state, 'ACUsMON_LocalMapping_STATE': self.local_mapping_state, 'ACUsMON_LidarPerception_STATE': self.lidar_perception_state, 'ACUsMON_CameraPerception_STATE': self.lidar_perception_state, 'ACUsMM_AsMissionMachine_STATE': self.mission_machine_state}) #'ACUsMM_AsMissionMachine_STATE': self.imu_state}) self.per_bus.send(self.generate_message(data=data, frame_definition=self.dsb_msg)) self.dsb_rate.sleep()
[docs] def check_if_rosbag_recording_running(self): return '/record_full_as' in rosnode.get_node_names()
[docs] def send_rosbag_record_status(self): while not rospy.is_shutdown(): data = self.ros_recording_msg.encode({'ACUsCAN_RosbagRecording_STATUS': self.check_if_rosbag_recording_running()}) self.per_bus.send(self.generate_message(data=data, frame_definition=self.ros_recording_msg)) self.ros_recording_rate.sleep()
[docs] def start_rosbag_recording(self): rospy.loginfo(f'[CanInterface] Start rosbag recording via launchfile') uuid = roslaunch.rlutil.get_or_generate_uuid(None, False) roslaunch.configure_logging(uuid) rospack = rospkg.RosPack() cli_args = [f"{rospack.get_path('utilities')}/launch/rosbag.launch", 'camera:=no'] self.rosbag_recording_process = roslaunch.parent.ROSLaunchParent(uuid, [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0], cli_args[1:])]) self.rosbag_recording_process.start()
[docs] def end_rosbag_recording(self): rospy.loginfo(f'[CanInterface] End rosbag recording via launchfile') self.rosbag_recording_process.shutdown()
[docs] def handle_log_button_state(self, state: int): if self.last_log_button_state != state: self.log_button_state = state self.rosbag_recording_event.set() self.last_log_button_state = state
[docs] def torque_callback(self, msg: Torque): self.torque_request = msg.torque self.send_engine_control_msg()
[docs] def m_per_s_to_rpm(self, velocity): return ((60 * self.gear_ratio) / (2 * np.pi * self.wheel_radius)) * velocity
[docs] def velocity_callback(self, msg: Velocity): self.velocity_request = self.m_per_s_to_rpm(velocity=msg.velocity) self.send_engine_control_msg()
[docs] def slam_velocity_callback(self, msg: VehiclePose): self.slam_velocity = msg.v_x
[docs] def actual_torque_callback(self, msg: Torque): self.torque_actual = msg.torque
[docs] def send_dv_driving_diagnostics_1(self): rospy.loginfo('[CAN Node] Start publishing dv driving dynamics 1 to the data logger.') while not rospy.is_shutdown(): data = self.dv_driving_diagnostics_1_msg.encode( {'ACUsCTRL_Ackermann_ANGLExREQ': max(-128, min(self.steering_wheel_angle_request, 127)), 'ACUsACTC_Ackermann_ANGLExACT': max(-128, min(self.steering_wheel_angle_actual, 127)), 'ACUsLOG_Vehicle_VELOxREQ': max(0, min(255, self.velocity_request * 3.6)), 'ACUsSLAM_Vehicle_VELOxACT': max(0, min(255, self.slam_velocity * 3.6)), 'ACUsLOG_Engine_TORQUExREQ': max(0, min(self.torque_request * 100 / 240, 100)), 'ACUsVCU_Engine_TORQUExACT': max(0, min(self.torque_actual * 100 / 240, 100)), 'ACUsCTRL_HydrBrake_PRESSxREQ': max(0, min(100, self.server_brake_pneumatic_pressure_request * 100 / 12)), 'ACUsASBC_HydrBrake_PRESSxACT': max(0, min(100, (self.pneumatic_pressure_override_front * 100 / 12 + \ self.pneumatic_pressure_override_rear * 100 / 12) / 2))}, strict=False) self.log_bus.send(self.generate_message(data=data, frame_definition=self.dv_driving_diagnostics_1_msg)) self.dv_driving_diagnostics_1_rate.sleep()
[docs] def send_dv_driving_diagnostics_2(self): rospy.loginfo('[CAN Node] Start publishing dv driving dynamics 2 to the data logger.') while not rospy.is_shutdown(): data = self.dv_driving_diagnostics_2_msg.encode({'ACUsIMU_Z_ANGLERATE': max(-256, min(self.gyro_z, 255)), 'ACUsIMU_X_ACCEL': max(-64, min(self.imu_accel_x, 63)), 'ACUsIMU_Y_ACCEL': max(-64, min(self.imu_accel_y, 63))}) self.log_bus.send(self.generate_message(data=data, frame_definition=self.dv_driving_diagnostics_2_msg)) self.dv_driving_diagnostics_2_rate.sleep()
[docs] def lap_count_callback(self, msg: LapCount): self.lap_count = msg.laps
[docs] def slam_cones_count_callback(self, msg: ConesCountAll): self.cones_all_count = msg.count
[docs] def local_mapping_cones_count_callback(self, msg: ConesCountActual): self.cones_actual_count = msg.count
[docs] def send_dv_system_diagnostics(self): rospy.loginfo('[CAN Node] Start publishing dv system diagnostics to the data logger.') while not rospy.is_shutdown(): # set service brake state based on 2020 rules and AS state if self.as_state == AutonomousSystemStateEnum.DRIVING.value: service_brake_state = ServiceBrakeStateEnum.AVAILABLE.value elif self.as_state == AutonomousSystemStateEnum.READY.value or self.as_state == AutonomousSystemStateEnum.FINISHED.value: service_brake_state = ServiceBrakeStateEnum.ENGAGED.value else: service_brake_state = ServiceBrakeStateEnum.DISENGAGED.value data = self.dv_system_diagnostics_msg.encode({'ACUsVCU_AutonomousSystem_STATE': max(1, min(self.as_state, 5)), 'ACUsLOG_Ebs_STATE': max(1, min(self.ebs_state, 3)), 'ACUsDSB_AutonomousMission_STATE': max(1, min(self.autonomous_mission, 6)), 'ACUsMM_ActcControlEngage_STATUS': max(0, min(self.steering_control_engage, 1)), 'ACUsLOG_ServiceBrake_STATE': max(1, min(service_brake_state, 3)), 'ACUsFLD_Lap_COUNT': max(0, min(self.lap_count, 255)), 'ACUsLM_ActualCones_COUNT': max(0, min(self.cones_actual_count, 255)), 'ACUsSLAM_AllCones_COUNT': max(0, min(self.cones_all_count, 131071))}) self.log_bus.send(self.generate_message(data=data, frame_definition=self.dv_system_diagnostics_msg)) self.dv_system_diagnostics_rate.sleep()
[docs] def velocity_torque_switch_callback(self, msg: VelocityTorqueSwitch): self.use_velocity = msg.use_velocity self.send_engine_control_msg()
[docs] def send_engine_control_msg(self): data = self.engine_control_msg.encode({'ACUsMM_EngineCtrlType_STATUSxSET': not self.use_velocity, 'ACUsCTRL_Engine_TORQUExREQ': self.torque_request, 'ACUsCTRL_Engine_SPEEDxREQ': self.velocity_request}) self.log_bus.send(self.generate_message(data=data, frame_definition=self.engine_control_msg))
[docs] def mission_finished_callback(self, msg: MissionFinished): data = self.mission_finished_msg.encode({'ACUsMM_MissionFinished_STATUS': msg.finished}) self.log_bus.send(self.generate_message(data=data, frame_definition=self.mission_finished_msg))
[docs] def main_thread(self): while not rospy.is_shutdown(): self.rosbag_recording_event.wait() self.rosbag_recording_event.clear() if self.log_button_state == 2: if self.check_if_rosbag_recording_running() is True: self.end_rosbag_recording() elif self.log_button_state == 1: if self.check_if_rosbag_recording_running() is False: self.start_rosbag_recording()
if __name__ == '__main__': rospy.init_node('can_interface', anonymous=True) try: rospack = rospkg.RosPack() per_dbc_path = rospack.get_path('can_interface') + '/dbcs/per/PER.dbc' log_dbc_path = rospack.get_path('can_interface') + '/dbcs/log/LOG.dbc' CanBridge(path_to_log_dbc=log_dbc_path, path_to_per_dbc=per_dbc_path) rospy.spin() except Exception as e: if not isinstance(e, rospy.exceptions.ROSInterruptException): rospy.logfatal(traceback.format_exc()) time.sleep(2) raise e