#!/usr/bin/python3r"""SLAM ROS Module to track and estimate vehicle pose and track defined by landmarks in form of cones.Vehicle pose is defined as :math:`\vec{x_R} = \begin{pmatrix} x & y & v_x & \psi \end{pmatrix}`.An :term:`UKF` is used to track and estimate the vehicle pose and the track.This :term:`UKF` is implemented in :class:`ukf.UKF`.Since measurements containing the position of cones are delayed by upto :math:`400` ms, using only one filter toestimate the vehicle pose and the track at the same time would mean that the latest vehicle pose would also be delayedby upto :math:`400` ms and more.Thus SLAM uses two independent filters, one saved :attr:`slam.SLAM.realtime_filter` for estimating the vehicle posewithout using landmark measurements in realtime. The other one saved in :attr:`slam.SLAM.main_filter`estimates the vehicle pose and the landmarks in one state vector, defined as:math:`\vec{x} = \begin{pmatrix} x & y & v_x & \psi & x_{l,0} & y_{l,0} & \ldots & x_{l,n} & y_{l,n} \end{pmatrix}`with n equal the number of tracked landmarks.To enable this, the measurements of the different sensors cannot be used just in time and rather have to be buffereduntil they are used by the main filter. Therefore received measurements by wheelspeed sensors, IMU, GPS, local mappingand steering wheel angle sensors are buffered in :attr:`slam.SLAM.steering_u_array`, :attr:`slam.SLAM.accel_u_array`,:attr:`slam.SLAM.gyro_u_array` and the respective :attr:`sensors.sensor.Sensor.ros_z_array`. The respective timestampfor the buffered measurement is saved in the respective `t_array`. Also, if the Sensor publishes the uncertainty aswell,the uncertainty is buffered in the respective :attr:`sensors.sensor.Sensor.ros_R_array`.When :func:`executing a filter <slam.SLAM.execute_filter_for_timestamp>`, the respective buffered measurementsand control inputs for the timestamp are retrieved and set to the filter or sensors(:func:`slam.SLAM.set_measurements` and :func:`slam.SLAM.set_u_from_buffer`).In normal operation, all incoming measurements via ROS are buffered in mentioned buffered, except the local mappingmeasurements. They get discarded until the realtime filter has been executed atleast once.The module waits until all control inputs are received atleast once. Afterwards the realtime filter is executed reguarlywith a rate defined in :attr:`ukf.UKF.predict_tolerance`. The states and their uncertainty after each execution stepare buffered in :attr:`slam.SLAM.x_array` and :attr:`slam.SLAM.P_array` until the main filter is initialized.As soon as the first local mapping measurement is received, the thread for the main filter is started and themain filter is intialized with the states and their uncertainty buffered for the latest timestamp before thelocal mapping measurement. Afterwards the main filter is always executed when new local mapping measurementsare received. The main filter is executed with a maximum :math:`dt` equal :attr:`ukf.UKF.predict_tolerance`.**Preloading**In case preloading is activated, the map to be preloaded is loaded into :attr:`slam.SLAM.preload_filter` and:attr:`slam.SLAM.preloaded_centerpoints`. The logic where to maps are saved and the logic to decide which one to chooseis decribed in :func:`slam.SLAM.try_preload_map`. If the alignment of the preloaded map(:attr:`slam.SLAM.preloaded_map_alignment_active`) is deactived, the map from the preloaded filter is subsequentlyloaded into the main filter.Otherwise there are two ways to align the preloaded map, chosen via :attr:`slam.SLAM.use_gps_instead_of_tracked_map`.The first approach uses the first used GPS Measurement (by calling :func:`slam.SLAM.try_aligning_preloaded_map_via_gps`during execution of realtime filter) and the GPS Coordinates for the coordinate origin of the preloaded map, to alignboth. This assumes that the preloaded map contains the gps coordinates for the coordinate origin, which are saved under:attr:`slam.SLAM.preloaded_gps_measurements` after preloading the map.The second approach will execute the realtime and main filter as in normal operation. Except, when enough cones havebeen perceived and tracked (:attr:`slam.SLAM.minimum_preload_landmarks` and:attr:`slam.SLAM.alignment_perceived_n_threshold`, see :func:`slam.SLAM.try_aligning_preloaded_map_via_main_filter`),a data association between the tracked map and preloaded map is calculated. Subsequently the preloaded map is rotatedand translated to align both maps according to the data association. Afterwards the preloaded map is loaded into themain filter.After loading the preloaded map into the main filter, :attr:`slam.SLAM.preloading_keep_updating` decides whetherthe landmarks should still be updated and thus whether the main filter and the perception node should be kept active.Also, if the Autonomous System drives Skidpad or Acceleration, preloaded centerpoints are published to ROS. First, whenthe map has been preloaded into the preload filter and second, in case the preloaded map has been aligned and loadedinto the main filter. This assumes that the preloaded map contained centerpoints. This is necessary if the AutonomousSystem drives Acceleration or Skidpad, since Filtering will shutdown in these cases automatically.When the Autonomous System shutdowns (someone kills the simulation or the mission machine kills it), the tracked mapwill be saved on shutdown to the according directory with an auto-incrementing number so it can be preloaded forthe next, possible run.**Conclusion**We have two seperate and independent UKF instances, one for tracking the vehicle pose in realtime (realtime filter)and one to track the map as soon as the local mapping measurements are available (main filter).The map for the second filter can be preloaded from existings maps. These preloaded maps can be aligned either via GPSor via aligning a tracked map with the preloaded map.The second filter can be deactivated as soon as a preloaded map has been loaded into the main filter,or the Autonomous System perceived all parts of the map (Autocross: one lap driven, Skidpad: both circles driven).**Outlook**Since we use two independent filter their vehicle poses can diverge a lot over the time, especially when no GPSmeasurements are used. When this happens, the main filter probably will track the more precise vehicle pose sinceit can use the landmark measurements to improve the estimated vehicle pose.To solve this problem there exist a possible approach which already was implemented once, but the support was droppedsince the advantage was marginally when using GPS Measurements and the filter where not perfomant enough back then:The idea is to fork a new realtime filter from the main filter from time to time. This is not possible directly, sincethis realtime filter would be not upto date anymore. Thus the a third filter was introduces: The catchup filter.The catchup filter was introduced as soon as the last timestamp of the realtime filter with local mapping measurementswas older than x seconds and since then there were atleast one new timestamp with local mapping measurements. Then,the catchup filter was forked from the latest timestamp of the main filter with local mapping measurements and wasexecuted for with :attr:`ukf.UKF.predict_tolerance` incrementing timestamps until it reached the realtime filter.Then the catchup filter was used as new realtime filter, until a new catchup filter was forked from the main filter andreached the realtime filter. And so on. This might be an idea for the future. Please call it Ketchup filter in the nextimplementation. Just for the heck of it. Thanks."""fromtypingimportList,Optional,TuplefromenumimportEnumimporttimeimportthreadingimportcv2importvtkfromvtk.utilimportnumpy_supportfromtkinterimportTclErrorimporttracebackimportnumpyasnpimportnumpy.typingasnptfromscipy.spatial.transformimportRotationimportrospyimporttf2_rosfromgps_common.msgimportGPSFixfromnovatel_oem7_msgs.msgimportHEADING2fromsensor_msgs.msgimportImufromas_can_msgs.msgimportImuAcceleration,ImuGyro,Wheelspeed,SteeringWheelAngle,LapCount,ConesCountAllfromutilities.msgimport(ConeListWithCovariance,VehiclePose,ConePositionWithCovariance,ConePosition,ConeList,ClosedTrack,DisablePerception,CenterPoints,PredictedMeasurements,MapAlignment)fromutilsimporthelpers,plot,geometry,vehicle_dynamicsfromsensorsimportCUREDiffWheelspeed,NovatelGPSHeading,NovatelGPSPosition,LocalMappingfromukfimportUKF
[docs]classAutonomousMissionEnum(Enum):""" Enum to describe which Autonomous Mission is currently select. .. csv-table:: Mission IDs :header: "Mission ID", "Mission Name" 0, Manual Driving 1, Acceleration 2, Skidpad 3, Trackdrive 4, Braketest / EBS Test 5, Inspection 6, Autocross """MANUAL_DRIVING=0ACCELERATION=1SKIDPAD=2TRACKDRIVE=3BRAKETEST=4INSPECTION=5AUTOCROSS=6
[docs]classSLAM():r""" Test. Attributes ---------- mission : AutonomousMissionEnum Autonomous Mission which is currently driven. realtime_filter : UKF UKF to estimate and track vehicle pose in realtime with as little as possible delay. main_filter : Optional[UKF] UKF to estimate and track vehicle pose and track at the same time but delayed. main_filter_active : bool Indicates whether main filter is still activated. disable_perception_when_inactive : bool Indicates whether to disable perception when main filter is not updating landmarks anyway. send_debug_informations : bool Indicates whether debug informations should be send to ROS, e.g. prior vehicle poses, predicted measurements and map alignment information. imu_source : str Indicates whether to use GPS IMU or Aceinna IMU as source for control input :math:`u`. preload_filter : UKF UKF to act as container for preloaded map for buffering until preloaded map is aligned and loaded into main filter. preloaded_gps_measurements : dict GPS Measurements preloaded which would be measured at map origin, must not be present. preloaded_centerpoints : dict Centerpoints of preloaded map, must not be present. preload_aligned : bool Indicates whether preloaded map has already been aligned. use_landmark_for_alignment : Optional[npt.NDArray] Array indicating whether the respective landmark should be used for registration during alignment of preloaded map and estimated & tracked landmarks, shape: (n ) with n equal number of prelaoded landmarks. preloading_active : bool Indicates whether preloading is activated. use_gps_instead_of_tracked_map : bool Indicates whether preloaded map and estimated & tracked map should be aligned via GPS instead of registration. minimum_preload_landmarks : int Number of landmarks that must be estimated and tracked before preloaded map and estimated & tracked map are aligned. alignment_perceived_n_threshold : int Number of times a landmark has to been perceived before it is usable for map alignment. preloading_added_width : float Width [m] to be added around polygon containing tracked & estimated landmarks for computing observable preloaded landmarks. preloading_added_height : float Height [m] to be added around polygon containing tracked & estimated landmarks for computing observable preloaded landmarks. preloaded_map_alignment_active : bool Indicates whether preloaded map should be aligned rather than directly loaded into main filter. preloading_keep_updating : bool Indicates whether preloaded landmarks still should be updated. align_event : threading.Event Event to indicate that map alignment can be execute since enough landmarks are estimated & tracked. align_thread : threading.Thread Thread to execute the alignment of the estimated & tracked map and the preloaded map. max_age_of_local_mapping_measurements : float Maximum age of local mapping measurements [s] to be used during update of main filter, otherwise get discarded. x_array : List[npt.NDArray] Array containing buffered states :math:`\vec{x}` of realtime filter, shape: (x, ) with x equal number of buffered states. P_array : List[npt.NDArray] Array containing buffered covariance matrices :math:`\vec{P}` of realtime filter, shape: (x, ) with x equal number of buffered states. t_array : List[float] Array containing the timestamps states :math:`\vec{x}` and covariance matrices :math:`\vec{P}` of realtime filter have been buffered for, shape: (x, ) with x equal number of buffered states. wheelspeed_rr : CUREDiffWheelspeed Wheelspeed sensor representing the rear right wheelspeed sensor of the car. wheelspeed_rl : CUREDiffWheelspeed Wheelspeed sensor representing the rear left wheelspeed sensor of the car. wheelspeed_fr : CUREDiffWheelspeed Wheelspeed sensor representing the front right wheelspeed sensor of the car. wheelspeed_fl : CUREDiffWheelspeed Wheelspeed sensor representing the front left wheelspeed sensor of the car. gps_heading : NovatelGPSHeading Sensor representing the heading measurements of the Novatel GPS. gps_position : NovatelGPSHeading Sensor representing the position and speed measurements of the Novatel GPS. local_mapping : LocalMapping Sensor representing the measurements of the local mapping module. local_mapping_subscriber : rospy.Subscriber Subscriber for measurements from local mapping module. camera_offset : Optional[float] Offset [m] between rear axle and camera, retrieved from ROS transform tree. steering_u_array : List[float] Array containing buffered measured steering wheel angles [deg] for control input :math:`\vec{u}`, shape: (s, ) with s equal number of steering wheel angles. steering_t_array : List[float] Array containing timestamps for which measured steering wheel angles have been buffered, shape: (s, ) with s equal number of steering wheel angles. accel_u_array : List[float] Array containing buffered measured longitudinal accelerations [m/s] for control input :math:`\vec{u}`, shape: (a, ) with a equal number of longitudinal accelerations. accel_t_array : List[float] Array containing timestamps for which measured longitudinal accelerations have been buffered, shape: (a, ) with a equal number of longitudinal accelerations. gyro_u_array : List[float] Array containing buffered measured yaw rates [deg/s] for control input :math:`\vec{u}`, shape: (y, ) with y equal number of buffered yaw rates. gyro_t_array : List[float] Array containing timestamps for which measured yaw rates have been buffered, shape: (y, ) with y equal number of buffered yaw rates. rotation_matrix : npt.NDArray Rotation matrix to compensate for possible inclination of Aceinna IMU, retrieved from ROS transform tree. steering_u_lock : threading.Lock Mutual exclusion lock so that steering wheel measurements are not added, retrieved or deleted at the same time. accel_u_lock : threading.Lock Mutual exclusion lock so that acceleration measurements are not added, retrieved or deleted at the same time. gyro_u_lock : threading.Lock Mutual exclusion lock so that gyro measurements are not added, retrieved or deleted at the same time. realtime_lock : threading.Lock Mutual exclusion lock so that realtime filter is not used from mutliple threads at the same time. main_lock : threading.Lock Mutual exclusion lock so that main filter is not used from mutliple threads at the same time. main_thread : threading.Thread Thread for executing the main filter. steering_model : dict Steering model defined by steering characteristics. max_steering_wheel_angle : float Boundaries of steering wheel angle [deg]. steering_wheel_angle_offset : float Offset of steering wheel angle [deg] at which vehicle should drive straight. lap_count : int Number of laps driven received by finish line detection. transform_listener : tf2_ros.TransformListener Listener for ROS transform tree. Used to retrieve transform from rear axle to camera and Aceinna IMU. tf_buffer : tf2_ros.Buffer Buffer for :attr:`slam.SLAM.transform_listener`. """def__init__(self)->None:"""Initialize SLAM ros node."""self.mission=AutonomousMissionEnum(rospy.get_param('/mission'))# Create unscented kalman filter instace to track vehicle pose and landmarksself.steering_model=vehicle_dynamics.load_steering_model()self.max_steering_wheel_angle=np.degrees(self.steering_model['steering_range'])/2self.steering_wheel_angle_offset=rospy.get_param('/vehicle/steering_wheel_angle_offset')self.realtime_filter=UKF(dt=rospy.get_param('/slam/ukf/dt'),name='realtime',update_strategy=helpers.UpdateStrategy(rospy.get_param('/slam/ukf/update_strategy')),sigma_alpha=rospy.get_param('/slam/ukf/sigma_alpha'),minimal_cone_covariance_area=rospy.get_param('/slam/ukf/minimal_cone_covariance_area'),P=rospy.get_param('/slam/ukf/P'),template_Q=rospy.get_param('/slam/ukf/template_Q'),perceived_n_threshold=rospy.get_param('/slam/map_cleanup/perceived_n_threshold'),u_dim=3,steering_model=self.steering_model)self.main_filter:Optional[UKF]=Noneself.main_filter_active=False# self.main_filter_active = Falseself.disable_perception_when_inactive=rospy.get_param('/slam/local_mapping/disable_perception_when_inactive')self.send_debug_informations=rospy.get_param('/slam/send_debug_informations',default=False)self.imu_source=rospy.get_param('/slam/ukf/imu_source')self.preload_filter:Optional[UKF]=Noneself.preloaded_gps_measurements:Optional[dict]=Noneself.preloaded_centerpoints:Optional[dict]=Noneself.preload_aligned=Falseself.use_landmark_for_alignment:Optional[npt.NDArray]=Noneself.preloading_active=rospy.get_param('/slam/preloading/activated')self.use_gps_instead_of_tracked_map=rospy.get_param('/slam/preloading/use_gps_instead_of_tracked_map')self.minimum_preload_landmarks=rospy.get_param('/slam/preloading/minimum_landmarks')self.alignment_perceived_n_threshold=rospy.get_param('/slam/preloading/perceived_n_threshold')self.preloading_added_width=rospy.get_param('/slam/preloading/added_width')self.preloading_added_height=rospy.get_param('/slam/preloading/added_height')self.preloaded_map_alignment_active=rospy.get_param('/slam/preloading/align')self.preloading_keep_updating=rospy.get_param('/slam/preloading/keep_updating')self.align_event=threading.Event()self.align_thread=threading.Thread(target=self.align_preloaded_map,daemon=True)ifself.preloading_active \
andself.preloaded_map_alignment_active \
andself.use_gps_instead_of_tracked_mapisFalse:self.align_thread.start()self.max_age_of_local_mapping_measurements=rospy.get_param('/slam/local_mapping/max_age_of_measurements')self.t_array:List[float]=[]self.x_array:List[npt.NDArray]=[]self.P_array:List[npt.NDArray]=[]# Create and add sensors to ukf instanceself.wheelspeed_rr:CUREDiffWheelspeed=self.realtime_filter.add_sensor(CUREDiffWheelspeed(self.realtime_filter,'wheelspeed_rr',right=True,z_names=['wheelspeed'],gear_ratio=rospy.get_param('/vehicle/rear_gear_ratio'),active=rospy.get_param('/slam/wheelspeed/rr_active'),track_width=rospy.get_param('/vehicle/track_width'),wheel_radius=rospy.get_param('/vehicle/wheel_radius'),R=rospy.get_param('/vehicle/rear_gear_ratio')*rospy.get_param('/slam/wheelspeed/R')))self.wheelspeed_fr:CUREDiffWheelspeed=self.realtime_filter.add_sensor(CUREDiffWheelspeed(self.realtime_filter,'wheelspeed_fr',right=True,z_names=['wheelspeed'],gear_ratio=1,active=rospy.get_param('/slam/wheelspeed/fr_active'),track_width=rospy.get_param('/vehicle/track_width'),wheel_radius=rospy.get_param('/vehicle/wheel_radius'),R=rospy.get_param('/slam/wheelspeed/R')))self.wheelspeed_rl:CUREDiffWheelspeed=self.realtime_filter.add_sensor(CUREDiffWheelspeed(self.realtime_filter,'wheelspeed_rl',right=False,z_names=['wheelspeed'],gear_ratio=-rospy.get_param('/vehicle/rear_gear_ratio'),active=rospy.get_param('/slam/wheelspeed/rl_active'),track_width=rospy.get_param('/vehicle/track_width'),wheel_radius=rospy.get_param('/vehicle/wheel_radius'),R=rospy.get_param('/vehicle/rear_gear_ratio')*rospy.get_param('/slam/wheelspeed/R')))self.wheelspeed_fl:CUREDiffWheelspeed=self.realtime_filter.add_sensor(CUREDiffWheelspeed(self.realtime_filter,'wheelspeed_fl',right=False,z_names=['wheelspeed'],gear_ratio=1,active=rospy.get_param('/slam/wheelspeed/fl_active'),track_width=rospy.get_param('/vehicle/track_width'),wheel_radius=rospy.get_param('/vehicle/wheel_radius'),R=rospy.get_param('/slam/wheelspeed/R')))self.gps_heading:NovatelGPSHeading=self.realtime_filter.add_sensor(NovatelGPSHeading(self.realtime_filter,'gps_heading',z_names=['Heading'],active=rospy.get_param('/slam/novatel_heading/active'),offset=rospy.get_param('/slam/novatel_heading/offset')))self.gps_position:NovatelGPSPosition=self.realtime_filter.add_sensor(NovatelGPSPosition(self.realtime_filter,'gps_position',z_names=['x','y','v'],active=rospy.get_param('/slam/novatel_position/active')))self.local_mapping:LocalMapping=self.realtime_filter.add_sensor(LocalMapping(self.realtime_filter,'local_mapping',z_names=['landmarks'],active=rospy.get_param('/slam/local_mapping/active'),data_association=helpers.DataAssociation(rospy.get_param('/slam/local_mapping/data_association/method')),use_kd_tree=rospy.get_param('/slam/local_mapping/data_association/use_kd_tree'),chi2_quantile=rospy.get_param('/slam/local_mapping/data_association/chi2_quantile'),fov_min_distance=rospy.get_param('/slam/local_mapping/fov_gate/min_distance'),fov_max_distance=rospy.get_param('/slam/local_mapping/fov_gate/max_distance'),fov_angle=rospy.get_param('/slam/local_mapping/fov_gate/angle'),fov_scaling=rospy.get_param('/slam/local_mapping/fov_gate/scaling'),probability_gate=rospy.get_param('/slam/local_mapping/probability_gate'),R_azimuth=rospy.get_param('/slam/local_mapping/R_azimuth'),R_distance_factor=rospy.get_param('/slam/local_mapping/R_distance_factor'),R_offset=rospy.get_param('/slam/local_mapping/R_distance_offset'),update_strategy=helpers.LandmarkUpdateStrategy(rospy.get_param('/slam/local_mapping/update_strategy'))))# Initialize Attributes to save used sensor measurementsself.realtime_filter.sensor_z=[None]*len(self.realtime_filter.sensors)self.realtime_filter.sensor_R=[None]*len(self.realtime_filter.sensors)# Set maximal dimension of sensor measurementsself.realtime_filter.max_z_dim=3# Initialize Publisher for vehicle pose, cone positions with covariance, closed track and cone countself.realtime_vehicle_pose_publisher=rospy.Publisher('/slam/vehicle_pose',VehiclePose,queue_size=10)self.main_vehicle_pose_publisher=rospy.Publisher('/slam/vehicle_pose/main',VehiclePose,queue_size=10)self.realtime_prior_vehicle_pose_publisher=rospy.Publisher('/slam/prior_vehicle_pose',VehiclePose,queue_size=10)self.main_prior_vehicle_pose_publisher=rospy.Publisher('/slam/prior_vehicle_pose/main',VehiclePose,queue_size=10)self.landmark_publisher=rospy.Publisher('/slam/landmarks',ConeListWithCovariance,queue_size=10,latch=True)self.main_predicted_measurement_publisher=rospy.Publisher('/slam/predicted_measurements/main',PredictedMeasurements,queue_size=10,latch=True)self.predicted_measurement_publisher=rospy.Publisher('/slam/predicted_measurements',PredictedMeasurements,queue_size=10,latch=True)self.closed_track=Falseself.closed_track_publisher=rospy.Publisher('/slam/closed_track',ClosedTrack,queue_size=10,latch=True)self.all_cones_publisher=rospy.Publisher('/slam/cone_count',ConesCountAll,queue_size=10)self.disable_perception_publisher=rospy.Publisher('/slam/disable_perception',DisablePerception,queue_size=10,latch=True)self.centerpoints_publisher=rospy.Publisher('/filtering/centerpoints',CenterPoints,queue_size=10,latch=True)self.map_alignment_publisher=rospy.Publisher('/slam/map_alignment',MapAlignment,queue_size=10)# Initialize buffer for tf tree transformationsself.tf_buffer=tf2_ros.Buffer()self.transform_listener=tf2_ros.TransformListener(self.tf_buffer)self.steering_u_lock=threading.Lock()self.accel_u_lock=threading.Lock()self.gyro_u_lock=threading.Lock()self.realtime_lock=threading.Lock()self.main_lock=threading.Lock()# Initialize ros subscriber for all input valuesself.accel_t_array:List[float]=[]self.accel_u_array:List[float]=[]self.gyro_t_array:List[float]=[]self.gyro_u_array:List[float]=[]self.rotation_matrix:Optional[npt.NDArray]=Noneifself.imu_source=='aceinna':rospy.Subscriber('/can/per/imu/gyro',ImuGyro,self.imu_gyro_callback)rospy.Subscriber('/can/per/imu/accel',ImuAcceleration,self.imu_accel_callback)elifself.imu_source=='gps':rospy.Subscriber('/gps/imu',Imu,self.gps_imu_callback)else:rospy.logfatal('[SLAM] Specified not implemented imu source.')raiseException('Specified not implemented imu source.')rospy.Subscriber('/can/per/wheelspeed/front',Wheelspeed,self.wheelspeed_front_callback)self.last_wheelspeed_fl=Noneself.last_wheelspeed_fr=Noneself.last_wheelspeed_fl_time=Noneself.last_wheelspeed_fr_time=Noneself.wheel_radius=rospy.get_param('/vehicle/wheel_radius')self.ax_wheelspeed_max=50.0rospy.Subscriber('/can/log/wheelspeed/rear',Wheelspeed,self.wheelspeed_rear_callback)self.steering_t_array:List[float]=[]self.steering_u_array:List[float]=[]rospy.Subscriber('/can/per/steering_wheel_angle/actual',SteeringWheelAngle,self.steering_wheel_angle_callback)rospy.Subscriber('/gps/gps',GPSFix,self.gps_position_callback)rospy.Subscriber('/novatel/oem7/heading2',HEADING2,self.gps_heading_callback)self.camera_offset:Optional[float]=Noneself.local_mapping_subscriber=rospy.Subscriber('/local_mapping/cone_positions',ConeList,self.local_mapping_callback)self.lap_count=0rospy.Subscriber('/finish_line_detection/lap_count',LapCount,self.lap_count_callback)self.main_thread=threading.Thread(target=self.execute_main_filter,daemon=True)rospy.on_shutdown(self.on_shutdown)while(rospy.Time.now().to_sec()==0):time.sleep(0.01)self.try_preload_map()
[docs]@staticmethoddefget_u_from_input_buffer(t_array:List[float],u_array:List[float],timestamp:float)->float:""" Get latest control input :math:`u` for given control input buffer which lies before given timestamp. Parameters ---------- t_array : List[float] Buffer for timestamps for respective buffered control input :math:`u`, shape: (n, ) with n equal number of buffered timesteps. u_array : List[float] Buffer for control input :math:`u`, shape: (n, ) with n equal number of buffered timesteps. timestamp : float Timestamp for which the latest control input :math:`u` should be get. Returns ------- float Latest control input :math:`u` for given control input buffer which lies before given timestamp. """timestamp_index=next((t_ifort_i,tinenumerate(t_array)ift>timestamp),0)returnu_array[timestamp_index-1]
[docs]defcorrect_offsets(self):"""Correct camera and lidar offsets of local mapping sensor by extracting values from transformation tree."""camera_success=Falselidar_success=False# Attempt to correct camera offsettry:translation=self.tf_buffer.lookup_transform('rear_axle','camera',rospy.Time()).transform.translationself.local_mapping.camera_offset=np.array([translation.x,translation.y])camera_success=Trueexcept(tf2_ros.LookupException,tf2_ros.ConnectivityException,tf2_ros.ExtrapolationException):rospy.logwarn('Can not look up transform from rear axle to camera.')# Attempt to correct lidar offsettry:translation=self.tf_buffer.lookup_transform('rear_axle','os_sensor',rospy.Time()).transform.translationself.local_mapping.lidar_offset=np.array([translation.x,translation.y])lidar_success=Trueexcept(tf2_ros.LookupException,tf2_ros.ConnectivityException,tf2_ros.ExtrapolationException):rospy.logwarn('Can not look up transform from rear axle to os_sensor.')# Execute if both camera and lidar lookups are successfulifcamera_successandlidar_success:ifself.rotation_matrixisnotNoneorself.imu_source!='aceinna':self.transform_listener.unregister()
[docs]deflap_count_callback(self,lap_count:LapCount):""" Decide which filters to execute and whether to use perception based on lap count from finish line detection. Strategy decision is based on the current mission, lap count and whether the filter is even active. **Trackdrive and Autocross** When the first lap is driven and the main filter is active the following is done: #. Disable perception, #. Change LandmarkUpdateStrategy of main filter to not updating anymore. #. Deactivate main filter #. Assume track to be closed. #. Cleanup map, see :func:`ukf.UKF.cleanup_map` #. Publish cleaned map **Skidpad** When the first lap is driven (aka the right circle is driven the first time) and the main filter is active the following things are done: #. Disable perception, #. Change LandmarkUpdateStrategy of main filter to not updating anymore. #. Cleanup map, see :func:`ukf.UKF.cleanup_map` #. Publish cleaned map When the second lap is driven (aka the left circle has now to be driven the first time) and the main filter is active, the following things are done: #. Enable perception, #. Change LandmarkUpdateStrategy of main filter to update and initialize Landmarks but not vehicle pose anymore. When the third lap is driven (aka the left circle is driven the first time) and the main filter is active, the following things are done: #. Disable perception, #. Change LandmarkUpdateStrategy of main filter to not updating anymore. #. Deactivate main filter #. Cleanup map, see :func:`ukf.UKF.cleanup_map` #. Publish cleaned map Parameters ---------- lap_count : LapCount Lap count counted by finish line detection and received via ROS, (:ros:msg:`as_can_msgs.msg.LapCount`). """self.lap_count=lap_count.lapsifself.mission==AutonomousMissionEnum.TRACKDRIVEorself.mission==AutonomousMissionEnum.AUTOCROSS:ifself.lap_count>0andself.main_filter_activeisTrue:withself.main_lock:self.disable_perception()self.local_mapping_subscriber.unregister()self.update_landmark_update_strategy(strategy=helpers.LandmarkUpdateStrategy.DO_NOT_UPDATE)self.main_filter_active=Falseself.closed_track=Truecones_n=len(self.main_filter.landmark_ids)rospy.loginfo(f'[SLAM] Number of times global cones have been perceived: 'f'{self.main_filter.landmark_perceived_n}')self.main_filter.cleanup_map()rospy.loginfo(f'[SLAM] MapCleanup finished: Deleted 'f'{cones_n-len(self.main_filter.landmark_ids)} cones.')self.send_global_landmarks()ifself.mission==AutonomousMissionEnum.SKIDPAD:ifself.lap_count==1andself.main_filter_activeisTrue:self.disable_perception()self.update_landmark_update_strategy(strategy=helpers.LandmarkUpdateStrategy.DO_NOT_UPDATE)cones_n=len(self.main_filter.landmark_ids)rospy.loginfo(f'[SLAM] Number of times global cones have been perceived: 'f'{self.main_filter.landmark_perceived_n}')self.main_filter.cleanup_map()rospy.loginfo(f'[SLAM] MapCleanup finished: Deleted 'f'{cones_n-len(self.main_filter.landmark_ids)} cones.')self.send_global_landmarks()ifself.lap_count==2andself.main_filter_activeisTrue:self.enable_perception()self.update_landmark_update_strategy(strategy=helpers.LandmarkUpdateStrategy.NO_POSE)ifself.lap_count==3andself.main_filter_activeisTrue:self.disable_perception()self.update_landmark_update_strategy(strategy=helpers.LandmarkUpdateStrategy.DO_NOT_UPDATE)self.local_mapping_subscriber.unregister()self.main_filter_active=Falsecones_n=len(self.main_filter.landmark_ids)rospy.loginfo(f'[SLAM] Number of times global cones have been perceived: 'f'{self.main_filter.landmark_perceived_n}')self.main_filter.cleanup_map()rospy.loginfo(f'[SLAM] MapCleanup finished: Deleted 'f'{cones_n-len(self.main_filter.landmark_ids)} cones.')self.send_global_landmarks()
[docs]defwheelspeed_front_callback(self,wheelspeed_front:Wheelspeed):""" Buffer front wheelspeed measurements. Buffers new measurements to :attr:`sensors.wheelspeed.CUREDiffWheelspeed.ros_z_array` of the respective sensor (:attr:`slam.SLAM.wheelspeed_fl` and :attr:`slam.SLAM.wheelspeed_fr`). Parameters ---------- wheelspeed_front : Wheelspeed ROS message containing the front wheelspeed measurements, (:ros:msg:`as_can_msgs.msg.Wheelspeed`). """ifself.last_wheelspeed_flisnotNone:ax_fl=(wheelspeed_front.left-self.last_wheelspeed_fl)*2*np.pi*self.wheel_radius/60/(wheelspeed_front.header.stamp.to_sec()-self.last_wheelspeed_fl_time)ifax_fl<self.ax_wheelspeed_max:withself.wheelspeed_fl.measurement_lock:self.wheelspeed_fl.ros_t_array.append(wheelspeed_front.header.stamp.to_sec())self.wheelspeed_fl.ros_z_array.append(np.array([wheelspeed_front.left]))self.last_wheelspeed_fl=wheelspeed_front.leftself.last_wheelspeed_fl_time=wheelspeed_front.header.stamp.to_sec()else:withself.wheelspeed_fl.measurement_lock:self.wheelspeed_fl.ros_t_array.append(wheelspeed_front.header.stamp.to_sec())self.wheelspeed_fl.ros_z_array.append(np.array([wheelspeed_front.left]))self.last_wheelspeed_fl=wheelspeed_front.leftself.last_wheelspeed_fl_time=wheelspeed_front.header.stamp.to_sec()ifself.last_wheelspeed_frisnotNone:ax_fr=(wheelspeed_front.right-self.last_wheelspeed_fr)*2*np.pi*self.wheel_radius/60/(wheelspeed_front.header.stamp.to_sec()-self.last_wheelspeed_fr_time)ifax_fr<self.ax_wheelspeed_max:withself.wheelspeed_fr.measurement_lock:self.wheelspeed_fr.ros_t_array.append(wheelspeed_front.header.stamp.to_sec())self.wheelspeed_fr.ros_z_array.append(np.array([wheelspeed_front.right]))self.last_wheelspeed_fr=wheelspeed_front.rightself.last_wheelspeed_fr_time=wheelspeed_front.header.stamp.to_sec()else:withself.wheelspeed_fr.measurement_lock:self.wheelspeed_fr.ros_t_array.append(wheelspeed_front.header.stamp.to_sec())self.wheelspeed_fr.ros_z_array.append(np.array([wheelspeed_front.right]))self.last_wheelspeed_fr=wheelspeed_front.rightself.last_wheelspeed_fr_time=wheelspeed_front.header.stamp.to_sec()
[docs]defwheelspeed_rear_callback(self,wheelspeed_rear:Wheelspeed):""" Buffer rear wheelspeed measurements. Buffers new measurements to :attr:`sensors.wheelspeed.CUREDiffWheelspeed.ros_z_array` of the respective sensor (:attr:`slam.SLAM.wheelspeed_rl` and :attr:`slam.SLAM.wheelspeed_rr`). Parameters ---------- msg : Wheelspeed ROS message containing the rear wheelspeed measurements, (:ros:msg:`as_can_msgs.msg.Wheelspeed`). """withself.wheelspeed_rl.measurement_lock:self.wheelspeed_rl.ros_t_array.append(wheelspeed_rear.header.stamp.to_sec())self.wheelspeed_rl.ros_z_array.append(np.array([wheelspeed_rear.left]))withself.wheelspeed_rr.measurement_lock:self.wheelspeed_rr.ros_t_array.append(wheelspeed_rear.header.stamp.to_sec())self.wheelspeed_rr.ros_z_array.append(np.array([wheelspeed_rear.right]))
[docs]defsteering_wheel_angle_callback(self,steering_wheel_angle:SteeringWheelAngle):""" Buffer steering wheel angle measurements. Buffers new measurements to the :attr:`slam.SLAM.steering_u_array` attribute. Parameters ---------- steering_wheel_angle : SteeringWheelAngle ROS message containing steering wheel angle measurement, (:ros:msg:`as_can_msgs.msg.SteeringWheelAngle`). """angle=np.clip(steering_wheel_angle.angle-self.steering_wheel_angle_offset,-self.max_steering_wheel_angle,self.max_steering_wheel_angle)withself.steering_u_lock:self.steering_t_array.append(steering_wheel_angle.header.stamp.to_sec())self.steering_u_array.append(angle)
[docs]deflookup_rotation_matrix(self):""" Lookup rotation matrix for Aceinna IMU in transformation tree. Saves rotation matrix in :attr:`slam.SLAM.rotation_matrix`. Unregister :attr:`slam.SLAM.transform_listener` when :attr:`slam.SLAM.camera_offset` is also already looked up. """ifself.rotation_matrixisNone:try:rotation=self.tf_buffer.lookup_transform('imu','rear_axle',rospy.Time()).transform.rotationself.rotation_matrix=Rotation.from_quat([rotation.x,rotation.y,rotation.z,rotation.w]).as_matrix()except(tf2_ros.LookupException,tf2_ros.ConnectivityException,tf2_ros.ExtrapolationException):rospy.logwarn('Can not look up transform from imu to rear_axle.')else:ifself.camera_offsetisnotNoneandself.local_mapping.lidar_offsetisnotNone:self.transform_listener.unregister()
[docs]defimu_gyro_callback(self,imu_gyro:ImuGyro):""" Buffer gyro measurements from Aceinna IMU. Buffers new measurements to the :attr:`slam.SLAM.gyro_u_array` attribute. Only buffers when :attr:`slam.SLAM.rotation_matrix` is already looked up. Before buffering, measurements are rotated by :attr:`slam.SLAM.rotation_matrix`. Parameters ---------- imu_gyro : ImuGyro ROS message containing Aceinna IMU gyro measurement, (:ros:msg:`as_can_msgs.msg.ImuGyro`). """self.lookup_rotation_matrix()ifself.rotation_matrixisnotNone:gyro_vector=np.array([[imu_gyro.roll,imu_gyro.pitch,imu_gyro.yaw]])rotated_vector=np.dot(gyro_vector,self.rotation_matrix)withself.gyro_u_lock:self.gyro_t_array.append(imu_gyro.header.stamp.to_sec())self.gyro_u_array.append(rotated_vector[0,2])
[docs]defimu_accel_callback(self,imu_accel:ImuAcceleration):""" Buffer acceleration measurements from Aceinna IMU. Buffers new measurements to the :attr:`slam.SLAM.accel_u_array` attribute. Only buffers when :attr:`slam.SLAM.rotation_matrix` is already looked up. Before buffering, measurements are rotated by :attr:`slam.SLAM.rotation_matrix`. Parameters ---------- imu_accel : ImuAcceleration ROS message containing Aceinna IMU acceleleration measurement, (:ros:msg:`as_can_msgs.msg.ImuAcceleration`). """self.lookup_rotation_matrix()ifself.rotation_matrixisnotNone:accel_vector=np.array([[imu_accel.x,imu_accel.y,imu_accel.z]])rotated_vector=np.dot(accel_vector,self.rotation_matrix)withself.accel_u_lock:self.accel_t_array.append(imu_accel.header.stamp.to_sec())self.accel_u_array.append(rotated_vector[0,0])
[docs]defgps_imu_callback(self,gps_imu:Imu):""" Buffer acceleration measurements from GPS IMU. Buffers new measurements to the :attr:`slam.SLAM.accel_u_array` and :attr:`slam.SLAM.gyro_u_array` attribute. Measurements are not rotated since the GPS does this already internally. Parameters ---------- gps_imu : Imu ROS message containing GPS IMU measurements. """withself.accel_u_lock:self.accel_t_array.append(gps_imu.header.stamp.to_sec())self.accel_u_array.append(gps_imu.linear_acceleration.x)withself.gyro_u_lock:self.gyro_t_array.append(gps_imu.header.stamp.to_sec())self.gyro_u_array.append(np.degrees(gps_imu.angular_velocity.z))
[docs]defgps_position_callback(self,gps_position:GPSFix):""" Buffer gps position and speed measurements inclusive their uncertainty from the GPS. Buffers new measurements to the :attr:`sensors.novatel.NovatelGPSPosition.ros_z` and :attr:`sensors.novatel.NovatelGPSPosition.ros_R` attribute of the respective sensor (:attr:`slam.SLAM.gps_position`). Parameters ---------- gps_position : GPSFix ROS message containing gps position measurements and their uncertainty. """withself.gps_position.measurement_lock:self.gps_position.ros_t_array.append(gps_position.header.stamp.to_sec())self.gps_position.ros_z_array.append(np.array([gps_position.latitude,gps_position.longitude,gps_position.speed]))self.gps_position.ros_R_array.append(np.diag([gps_position.position_covariance[0],gps_position.position_covariance[4],gps_position.speed*0.1]))
[docs]defgps_heading_callback(self,gps_heading:HEADING2):""" Buffer gps heading measurement incusive its uncertainty. Buffers new measurements to the :attr:`sensors.novatel.NovatelGPSHeading.ros_z` and :attr:`sensors.novatel.NovatelGPSHeading.ros_R` attribute of the respective sensor (:attr:`slam.SLAM.gps_heading`). Before buffering, the :attr:`sensors.novatel.NovatelGPSHeading.offset` is added to the measurements, to compensate that the GPS antennas might be mounted not inline with the vehicle x axis. Parameters ---------- gps_heading : HEADING2 ROS message containing gps heading measurement and its uncertainty. """withself.gps_heading.measurement_lock:self.gps_heading.ros_t_array.append(gps_heading.header.stamp.to_sec())self.gps_heading.ros_z_array.append(np.array([gps_heading.heading+self.gps_heading.offset]))self.gps_heading.ros_R_array.append(np.array([[gps_heading.heading_stdev]]))
[docs]deflocal_mapping_callback(self,cone_list:ConeList):""" Buffer positions of perceived cones mapped by the local mapping module. Cones are discarded if: #. Main filter is deactived, #. Cones are empty #. Cones are older than :attr:`slam.SLAM.max_age_of_local_mapping_measurements` or #. Main filter is not initialized yet and measurement lies before first realtime filter execution. Buffers new measurements to the :attr:`sensors.landmarks.LocalMapping.ros_z` of the respective sensor (:attr:`slam.SLAM.local_mapping`). Thread for executing the main filter is started when main filter is not initialized yet and thread not started yet. Parameters ---------- cone_list : ConeList Ros message containing local mapping measurements, (:ros:msg:`utilities.msg.ConeList`). """ifself.main_filter_activeisFalseornotcone_list.cones:returntimestamp=cone_list.image_header.stamp.to_sec()iftimestamp+self.max_age_of_local_mapping_measurements>cone_list.header.stamp.to_sec():ros_z=np.array([[cone.x,cone.y,cone.id,cone.probability,cone.is_lidar]forconeincone_list.cones])ifself.main_filterisNoneand(notself.t_arrayortimestamp<self.t_array[0]):returnwithself.local_mapping.measurement_lock:self.local_mapping.ros_z_array.append(ros_z)self.local_mapping.ros_t_array.append(timestamp)ifself.main_filterisNoneandnotself.main_thread.is_alive():self.main_thread.start()else:rospy.loginfo(f'[SLAM] Local mapping msg discarded: {cone_list.header.stamp.to_sec()-timestamp} s old')
[docs]defupdate_landmark_update_strategy(self,strategy:helpers.LandmarkUpdateStrategy):""" Update the LandmarkUpdateStrategy of the local mapping sensor of the :attr:`slam.SLAM.main_filter`. Parameters ---------- strategy : helpers.LandmarkUpdateStrategy new LandmarkUpdateStrategy- """rospy.loginfo(f'[SLAM] Switch landmark update strategy to {strategy}')self.main_filter.sensor('local_mapping')[1].update_strategy=strategy
[docs]defset_u_from_buffer(self,filter:UKF,timestamp:float):# noqa: A002r""" Set all control inputs :math:`\vec{u}` to :attr:`ukf.UKF.u` of respective filter for given timestamp. Parameters ---------- filter : UKF Filter of which the control inputs :math:`\vec{u}` should be set. timestamp : float Timestamp for which the latest control inputs :math:`\vec{u}` should be set. """withself.steering_u_lock:steering=self.get_u_from_input_buffer(self.steering_t_array,self.steering_u_array,timestamp)withself.accel_u_lock:accel=self.get_u_from_input_buffer(self.accel_t_array,self.accel_u_array,timestamp)withself.gyro_u_lock:gyro=self.get_u_from_input_buffer(self.gyro_t_array,self.gyro_u_array,timestamp)filter.u=np.array([steering,accel,gyro])
[docs]defpop_u_from_buffer(self,local_mapping_timestamp:float):r""" Delete all buffered control inputs :math:`\vec{u}` before given timestamp except each latest. Parameters ---------- local_mapping_timestamp : float Timestamps before all buffered control inputs :math:`\vec{u}` should be deleted. """withself.steering_u_lock:timestamp_index=next((t_ifort_i,tinenumerate(self.steering_t_array)ift>local_mapping_timestamp),-1)self.steering_t_array=self.steering_t_array[timestamp_index:]self.steering_u_array=self.steering_u_array[timestamp_index:]withself.accel_u_lock:timestamp_index=next((t_ifort_i,tinenumerate(self.accel_t_array)ift>local_mapping_timestamp),-1)self.accel_t_array=self.accel_t_array[timestamp_index:]self.accel_u_array=self.accel_u_array[timestamp_index:]withself.gyro_u_lock:timestamp_index=next((t_ifort_i,tinenumerate(self.gyro_t_array)ift>local_mapping_timestamp),-1)self.gyro_t_array=self.gyro_t_array[timestamp_index:]self.gyro_u_array=self.gyro_u_array[timestamp_index:]
[docs]defset_measurements(self,filter:UKF,timestamp:float,use_local_mapping:bool=False):# noqa: A002r""" Set all measurements :math:`\vec{z}` to all sensors of respective filter for given timestamp. Only measurements between given timestamp and :attr:`ukf.UKF.predict_tolerance` before are used. Measurements :math:`\vec{z}` from :attr:`sensors.sensor.Sensor.ros_z_array` and its uncertainity :math:`\vec{R}` from :attr:`sensors.sensor.Sensor.ros_R_array` are set to :attr:`sensors.sensor.Sensor.ros_z` and :attr:`sensors.sensor.Sensor.ros_R`. Parameters ---------- filter : UKF Filter of which the measurements :math:`\\vec{z}` should be set. timestamp : float Timestamp for which the latest measurements :math:`\\vec{z}` since last execution step should be set. use_local_mapping : bool, optional Whether measurements from local mapping should be set aswell, by default False """forsensorinfilter.sensors:ifnotuse_local_mappingandisinstance(sensor,LocalMapping):continuesensor.set_z_and_R_from_buffer(timestamp=timestamp)self.set_u_from_buffer(filter=filter,timestamp=timestamp)
[docs]defdelete_measurements(self,filter:UKF):# noqa: A002r""" Delete all measurements :math:`\vec{z}` for all sensors by setting the array values to :attr:`np.nan`. """forsensorinfilter.sensors:sensor.z=np.full((sensor.z_dim),np.nan)
[docs]defsend_predicted_measurements(self,filter:UKF,publisher:rospy.Publisher):# noqa: A002""" Publish predicted measurements and more to ROS if configured by :attr:`slam.SLAM.send_debug_informations`. Among others statistics about data association. """ifself.send_debug_informationsisFalse:returnmsg=PredictedMeasurements()msg.header.stamp=rospy.Time.from_sec(filter.t)msg.header.frame_id='map'# Setting everything unused to np.nanmsg.wheelspeed_rr=[np.nan,np.nan,np.nan]msg.wheelspeed_fr=[np.nan,np.nan,np.nan]msg.wheelspeed_rl=[np.nan,np.nan,np.nan]msg.wheelspeed_fl=[np.nan,np.nan,np.nan]msg.gps_speed=[np.nan,np.nan,np.nan]msg.gps_heading=[np.nan,np.nan,np.nan]msg.gps_x=[np.nan,np.nan,np.nan]msg.gps_y=[np.nan,np.nan,np.nan]msg.used_sensors=filter.executed_stepsmsg.cluster_sizes=[]msg.landmark_association_nodes_explored_n=0msg.landmark_association_compared_associations_n=0msg.u=filter.u.tolist()iflen(filter.executed_steps)>1:R=np.sqrt(filter.R)# First value is the predicted measurements, second one the real and used measurements and third the uncertaintyforsensor_name,z_indicesinzip(filter.executed_steps[1:],filter.sensor_updated_z_indices):ifsensor_name=='wheelspeed_rr':msg.wheelspeed_rr[0]=filter.zp[z_indices][0]msg.wheelspeed_rr[1]=filter.z[z_indices][0]msg.wheelspeed_rr[2]=R[z_indices][:,z_indices][0,0]ifsensor_name=='wheelspeed_rl':msg.wheelspeed_rl[0]=filter.zp[z_indices][0]msg.wheelspeed_rl[1]=filter.z[z_indices][0]msg.wheelspeed_rl[2]=R[z_indices][:,z_indices][0,0]ifsensor_name=='wheelspeed_fr':msg.wheelspeed_fr[0]=filter.zp[z_indices][0]msg.wheelspeed_fr[1]=filter.z[z_indices][0]msg.wheelspeed_fr[2]=R[z_indices][:,z_indices][0,0]ifsensor_name=='wheelspeed_fl':msg.wheelspeed_fl[0]=filter.zp[z_indices][0]msg.wheelspeed_fl[1]=filter.z[z_indices][0]msg.wheelspeed_fl[2]=R[z_indices][:,z_indices][0,0]ifsensor_name=='gps_heading':msg.gps_heading[0]=(filter.zp[z_indices][0]-self.gps_heading.offset)%360msg.gps_heading[1]=(filter.z[z_indices][0]-self.gps_heading.offset)%360msg.gps_heading[2]=R[z_indices][:,z_indices][0,0]ifsensor_name=='gps_position':msg.gps_x[0]=filter.zp[z_indices][0]msg.gps_x[1]=filter.z[z_indices][0]msg.gps_x[2]=R[z_indices][:,z_indices][0,0]msg.gps_y[0]=filter.zp[z_indices][1]msg.gps_y[1]=filter.z[z_indices][1]msg.gps_y[2]=R[z_indices][:,z_indices][1,1]msg.gps_speed[0]=filter.zp[z_indices][2]msg.gps_speed[1]=filter.z[z_indices][2]msg.gps_speed[2]=R[z_indices][:,z_indices][2,2]ifsensor_name=='local_mapping':local_mapping_sensor:LocalMapping=filter.sensor('local_mapping')[1]observed_landmarks=local_mapping_sensor.global_observed_landmarksmsg.observed_landmarks=[ConePosition(x=cone[0],y=cone[1],id=int(cone[2]),probability=cone[3])forconeinobserved_landmarks]msg.observable_landmarks=[ConePosition(x=cone[0],y=cone[1],id=int(cone[2]),probability=1)forconeinfilter.observable_landmarks]msg.individual_compatibility=filter.individual_compatability.flatten().tolist()mapping=np.full(filter.mapping.shape[0],-1)helping_mask=np.zeros(filter.landmarks.shape[0])helping_mask[filter.observable_to_global_mapping]=np.arange(filter.observable_to_global_mapping.shape[0])mapping[filter.mapping!=-1]=helping_mask[filter.mapping[filter.mapping!=-1]]msg.mapping=mapping.astype(int).tolist()local_polar_coords=np.reshape(filter.zp[z_indices],(-1,2))local_cartesian_coords=np.c_[local_polar_coords[:,1]*np.cos(local_polar_coords[:,0]),local_polar_coords[:,1]*np.sin(local_polar_coords[:,0])]global_cartesian_coords=geometry.rotate_and_move(local_cartesian_coords.copy(),rotation=filter.x_prior[3],offset_after_rotation=filter.x_prior[0:2],offset_before_rotation=local_mapping_sensor.camera_offset)msg.predicted_observed_landmarks=[ConePosition(x=cone[0],y=cone[1],id=int(cone_id),probability=1)forcone,cone_idinzip(global_cartesian_coords,observed_landmarks[filter.mapping!=-1,2])]msg.cluster_sizes=[cluster.best_associations.sizeforclusterinlocal_mapping_sensor.clusters]msg.landmark_association_nodes_explored_n=filter.local_mapping_nodes_explored_nmsg.landmark_association_compared_associations_n=filter.local_mapping_compared_associations_npublisher.publish(msg)
[docs]defsend_vehicle_pose(self,filter:UKF,publisher:rospy.Publisher):# noqa: A002r""" Publish vehicle pose :math:`\vec{x_R}` and its uncertainty :math:`\vec{P_R}` of the respective filter to ROS. Parameters ---------- filter : UKF Filter of which vehicle pose should be published. publisher : rospy.Publisher Publisher for the vehicle pose of respective filter. """msg=VehiclePose()msg.header.frame_id='map'msg.header.stamp=rospy.Time.from_sec(filter.t)msg.x=filter.x[0]msg.y=filter.x[1]msg.v_x=filter.x[2]msg.psi=filter.x[3]msg.P=filter.P_R.flatten().tolist()publisher.publish(msg)
[docs]defsend_prior_vehicle_pose(self,filter:UKF,publisher:rospy.Publisher):# noqa: A002r""" Publish prior vehicle pose and its uncertainty of the respective filter to ROS. Publish prior vehicle pose :math:`\vec{x_{R, prior}}` and its uncertainty :math:`\vec{P_{R, prior}}` of the respective filter to ROS. Parameters ---------- filter : UKF Filter of which prior vehicle pose should be published. publisher : rospy.Publisher Publisher for the prior vehicle pose of respective filter. """ifself.send_debug_informationsisFalse:returnmsg=VehiclePose()msg.header.frame_id='map'msg.header.stamp=rospy.Time.from_sec(filter.t)msg.x=filter.x_R_prior[0]msg.y=filter.x_R_prior[1]msg.v_x=filter.x_R_prior[2]msg.psi=filter.x_R_prior[3]msg.P=filter.P_R_prior.flatten().tolist()publisher.publish(msg)
[docs]defsend_realtime_vehicle_pose(self):r""" Publish vehicle pose :math:`\vec{x_R}` and its uncertainty :math:`\vec{P_R}` of the realtime filter to ROS. """self.send_vehicle_pose(filter=self.realtime_filter,publisher=self.realtime_vehicle_pose_publisher)
[docs]defsend_main_vehicle_pose(self):r"""Publish vehicle pose :math:`\vec{x_R}` and its uncertainty :math:`\vec{P_R}` of the main filter to ROS."""self.send_vehicle_pose(filter=self.main_filter,publisher=self.main_vehicle_pose_publisher)
[docs]defsend_realtime_prior_vehicle_pose(self):r""" Publish prior vehicle pose and its uncertainty of the realtime filter to ROS. Publish prior vehicle pose :math:`\vec{x_{R, prior}}` and its uncertainty :math:`\vec{P_{R, prior}}` of the realtime filter to ROS. """self.send_prior_vehicle_pose(filter=self.realtime_filter,publisher=self.realtime_prior_vehicle_pose_publisher)
[docs]defsend_main_prior_vehicle_pose(self):r""" Publish prior vehicle pose and its uncertainty of the main filter to ROS. Publish prior vehicle pose :math:`\vec{x_{R, prior}}` and its uncertainty :math:`\vec{P_{R, prior}}` of the main filter to ROS. """self.send_prior_vehicle_pose(filter=self.main_filter,publisher=self.main_prior_vehicle_pose_publisher)
[docs]defsend_global_landmarks(self):"""Publish tracked landmarks, flag whether track is closed and cone count to ROS."""msg=ConeListWithCovariance()msg.header.stamp=rospy.Time.from_sec(self.main_filter.t)msg.header.frame_id='map'cones=[]landmarks=self.main_filter.landmarkslandmark_ids=self.main_filter.fake_landmark_idsperceived_n=self.main_filter.landmark_perceived_nforiinrange(landmarks.shape[0]):pos=ConePositionWithCovariance()pos.x=landmarks[i,0]pos.y=landmarks[i,1]pos.id=int(landmark_ids[i])pos.perceived_n=perceived_n[i]index=self.main_filter.x_r_dim+2*ipos.covariance=self.main_filter.P[index:index+2,index:index+2].flatten().tolist()cones.append(pos)msg.closed_track=self.closed_trackmsg.cones=conesself.landmark_publisher.publish(msg)closed_track_msg=ClosedTrack()closed_track_msg.closed=self.closed_trackclosed_track_msg.header.stamp=rospy.Time.from_sec(self.main_filter.t)self.closed_track_publisher.publish(closed_track_msg)# publish cone count allcone_count_msg=ConesCountAll()cone_count_msg.header.stamp=rospy.Time.from_sec(self.main_filter.t)cone_count_msg.header.frame_id='map'cone_count_msg.count=landmarks.shape[0]self.all_cones_publisher.publish(cone_count_msg)
[docs]defexecute_real_time_filter(self):r""" Execute realtime filter. Repeats the following until ROS is shutdown: #. Execute realtime filter for current timestamp #. Send vehicle pose :math:`\vec{x_R}` to ROS #. Send predicted measurements and more debug informations to ROS, if configured. #. Try to align preloaded map via gps, in case its configured and GPS measurements already received. #. Send prior vehicle pose :math:`\vec{x_{R, prior}}` to ROS, if configured #. Buffer estimated states :math:`\vec{x}` and their uncertainty :math:`\vec{P}` if main filter not existing yet #. Pop previously buffered states and their uncertainty if main filter is deactivated """rate=rospy.Rate(1/self.realtime_filter.predict_tolerance)rospy.loginfo('[SLAM] Start executing real time filter.')whilenotrospy.is_shutdown():now=rospy.Time.now().to_sec()self.execute_filter_for_timestamp(filter=self.realtime_filter,timestamp=now)self.send_realtime_vehicle_pose()self.send_predicted_measurements(filter=self.realtime_filter,publisher=self.predicted_measurement_publisher)self.try_aligning_preloaded_map_via_gps()self.send_realtime_prior_vehicle_pose()ifself.main_filterisNone:self.buffer_x_and_P(current_timestamp=now)elifself.main_filter_activeisFalse:self.pop_from_buffer(local_mapping_timestamp=now)rate.sleep()
[docs]defexecute_main_filter(self):r""" Execute main filter. Repeats the following as long as main filter is activated and until ROS is shutdown: #. If main filter is not set yet but local mapping measurements available a. Get vehicle pose :math:`\vec{x}` and its uncertainty :math:`\vec{P}` from the buffers :attr:`slam.SLAM.x_array` and :attr:`slam.SLAM.P_array` b. Copy realtime filter to main filter for respective timestamp with respective :math:`\vec{x}` and :math:`\vec{P}` #. Until current timestamp of main filter is less than :attr:`ukf.UKF.predict_tolerance` away from oldest unused local mapping measurement a. Execute main filter for :attr:`ukf.UKF.t` + :attr:`ukf.UKF.predict_tolerance` b. Send vehicle pose :math:`\vec{x_R}` to ROS c. Send prior vehicle pose :math:`\vec{x_{R, prior}}` to ROS d. Send predicted measurements and more debug informations to ROS, if configured. #. Execute main filter for timestamp of oldest unused local mapping measurement #. Send vehicle pose :math:`\vec{x_R}` to ROS #. Send prior vehicle pose :math:`\vec{x_{R, prior}}` to ROS #. Send predicted measurements and more debug informations to ROS, if configured. #. Send tracked landmarks to ROS if main filter is still active. """rate=rospy.Rate(1/self.realtime_filter.predict_tolerance)whilenotrospy.is_shutdown():ifself.main_filter_activeisTrue:ifself.local_mapping.ros_t_array:ifself.main_filterisNone:rospy.loginfo('[SLAM] Initialize main filter.')x,P,t=self.get_x_and_P_from_buffer(local_mapping_timestamp=self.local_mapping.ros_t_array[0])self.main_filter=self.realtime_filter.copy(x=x,P=P,timestamp=t,name='main')withself.main_lock:local_mapping_timestamp=self.local_mapping.ros_t_array[0]whilelocal_mapping_timestamp-self.main_filter.t>self.main_filter.predict_tolerance:self.execute_filter_for_timestamp(filter=self.main_filter,use_local_mapping=True,timestamp=self.main_filter.t+self.main_filter.predict_tolerance)self.send_main_vehicle_pose()self.send_main_prior_vehicle_pose()self.send_predicted_measurements(filter=self.main_filter,publisher=self.main_predicted_measurement_publisher)self.execute_filter_for_timestamp(filter=self.main_filter,use_local_mapping=True,timestamp=local_mapping_timestamp)self.send_main_vehicle_pose()self.send_main_prior_vehicle_pose()self.send_predicted_measurements(filter=self.main_filter,publisher=self.main_predicted_measurement_publisher)self.pop_from_buffer(local_mapping_timestamp=local_mapping_timestamp)ifself.main_filter_active:self.send_global_landmarks()self.try_aligning_preloaded_map_via_main_filter()else:time.sleep(0.005)else:rate.sleep()
[docs]defalign_preloaded_map(self):""" Align preloaded map via perceived and tracked landmarks. First, wait for the event to be set that enough landmarks have been tracked that preloaded map can be aligned. #. Get the data from the main filter #. Compute data association between preloaded map and tracked landmarks via JCBB #. Compute best transform between preloaded map and associated tracked landmarks #. Align centerpoints if they have been preloaded previously #. Load preloaded map into main filter #. Send debug informations of map alignment to ROS, if configured """# Initialize vtk object for transformation via vtk.landmark_transform=vtk.vtkLandmarkTransform()source_points=vtk.vtkPoints()target_points=vtk.vtkPoints()landmark_transform.SetModeToRigidBody()rospy.loginfo('[SLAM] [MapAlignment] Finished initializing map transformer.')self.align_event.wait()rospy.loginfo('[SLAM] [MapAlignment] Start aligning preloaded map.')# get data from main filterwithself.main_lock:landmark_condition=np.logical_and(np.array(self.main_filter.landmark_ids)!=3,np.array(self.main_filter.landmark_perceived_n)>self.alignment_perceived_n_threshold)main_landmarks=self.main_filter.landmarks[landmark_condition]main_landmark_ids=np.array(self.main_filter.landmark_ids)[landmark_condition]# JCBB# Calculate polygon around perceived and tracked landmarkscenter,(width,height),angle=cv2.minAreaRect(main_landmarks.astype(np.float32))# Calculate which landmarks of preloaded track might be observableobserved_landmarks_rectangle=cv2.boxPoints((center,(width+self.preloading_added_width,height+self.preloading_added_height),angle))observable_landmarks_mask=np.array([cv2.pointPolygonTest(observed_landmarks_rectangle,landmark,False)>0forlandmarkinself.preload_filter.landmarks])# Mapping which observable landmark belongs to which preloaded landmarkobservable_mapping=np.argwhere(observable_landmarks_mask)rospy.loginfo(f'[SLAM] [MapAlignment] Calculating mapping for aligning the preloaded map with 'f' {main_landmarks.shape[0]} observed cones and {np.count_nonzero(observable_landmarks_mask)} 'f'observable cones with observable retangle = {observed_landmarks_rectangle}')# Observable preloaded landmarkobservable_landmarks=self.preload_filter.landmarks[observable_landmarks_mask]observable_landmark_ids=np.array(self.preload_filter.landmark_ids,dtype=int)[observable_landmarks_mask]# Data association between tracked landmark and observable preloaded landmarkmapping=self.preload_filter.sensor('local_mapping')[1].compute_landmark_mapping(z=np.c_[main_landmarks,main_landmark_ids],observable_mapping=observable_mapping,observable_landmarks=np.c_[observable_landmarks,observable_landmark_ids])rospy.loginfo(f'[SLAM] [MapAlignment] Finished calculating mapping for aligning the preloaded map.'f' Effectly using {np.count_nonzero(mapping!=-1)} cones.')# plot observed & observable cones and the mapping between themtry:plot.plot_observed_landmark_vs_observable_landmarks(observed_landmarks=np.c_[main_landmarks,main_landmark_ids],observable_landmarks=np.c_[self.preload_filter.landmarks,np.array(self.preload_filter.landmark_ids)],mapping=mapping,test_day=rospy.get_param('/test_track/test_day'),track_layout=rospy.get_param('/test_track/track_layout'))exceptTclError:rospy.logwarn('[SLAM] [MapAlignment] Could not create mapping plot for map alignment.''Execute "sudo xhost +si:localuser:root" on host system please!')# set data in VTK Transformersource_points.SetData(numpy_support.numpy_to_vtk(np.c_[main_landmarks[mapping!=-1],np.zeros(main_landmarks[mapping!=-1].shape[0])],deep=True))target_points.SetData(numpy_support.numpy_to_vtk(np.c_[self.preload_filter.landmarks[mapping[mapping!=-1]],np.zeros(self.preload_filter.landmarks[mapping[mapping!=-1]].shape[0])],deep=True))# Calculate transform to align preloaded map and tracked landmarkslandmark_transform.SetSourceLandmarks(source_points)landmark_transform.SetTargetLandmarks(target_points)landmark_transform.Update()landmark_transform.MakeTransform()# get rotation and translation from transformrotation_matrix=np.empty((2,2))offset=np.empty((2))vtk_transform_matrix=landmark_transform.GetMatrix()foriinrange(2):forjinrange(2):rotation_matrix[i,j]=vtk_transform_matrix.GetElement(i,j)offset[0]=vtk_transform_matrix.GetElement(0,3)offset[1]=vtk_transform_matrix.GetElement(1,3)rospy.loginfo(f'[SLAM] [MapAlignment] Using preloaded map with offset: {offset[0]},'f' {offset[1]} and rotation {np.degrees(np.arccos(rotation_matrix[0,0]))}° 'f'and {self.preload_filter.landmarks.shape[0]} cones.')self.preload_aligned=True# if centerpoints are also preloaded, align them alsoifself.preloaded_centerpointsisnotNone:centerpoints=np.c_[self.preloaded_centerpoints['x'],self.preloaded_centerpoints['y']]centerpoints=np.dot(centerpoints-offset,rotation_matrix)self.preloaded_centerpoints['x']=centerpoints[:,0].tolist()self.preloaded_centerpoints['y']=centerpoints[:,1].tolist()# Load aligned and transformed preloaded landmarks into main filterself.load_preloaded_filter_into_main_filter(landmarks=np.dot(self.preload_filter.landmarks-offset,rotation_matrix))# If configured, send informations about map alignment to ROSifself.send_debug_informationsisFalse:returnmap_alignment_msg=MapAlignment()map_alignment_msg.header.frame_id='map'map_alignment_msg.header.stamp=rospy.Time.from_sec(self.main_filter.t)map_alignment_msg.translation=-offsetmap_alignment_msg.rotation_matrix=rotation_matrix.flatten()alignment_mapping=np.full(mapping.shape[0],-1)helping_mask=np.zeros(self.preload_filter.landmarks.shape[0])helping_mask[observable_mapping[:,0]]=np.arange(observable_mapping.shape[0])alignment_mapping[mapping!=-1]=helping_mask[mapping[mapping!=-1]]map_alignment_msg.mapping=alignment_mapping.astype(int).tolist()map_alignment_msg.individual_compatibility=self.preload_filter.individual_compatability.flatten()observable_landmark_covariances=[self.preload_filter.observable_state_covariance[2*observable_landmark_i:2*observable_landmark_i+2,2*observable_landmark_i:2*observable_landmark_i+2]forobservable_landmark_iinrange(observable_landmark_ids.shape[0])]map_alignment_msg.observable_preloaded_cones=[ConePositionWithCovariance(x=cone[0],y=cone[1],id=int(cone_id),covariance=covariance.flatten().tolist())forcone,cone_id,covarianceinzip(observable_landmarks,observable_landmark_ids,observable_landmark_covariances)]map_alignment_msg.tracked_observed_cones=[ConePosition(x=cone[0],y=cone[1],id=int(cone_id),probability=1)forcone,cone_idinzip(main_landmarks,main_landmark_ids)]self.map_alignment_publisher.publish(map_alignment_msg)
[docs]deftry_aligning_preloaded_map_via_gps(self):""" Try to align preloaded map via gps measurements. .. todo: Check if function for transforming is still up to date or another must be used. Preload filter must be set and alignment via gps configured. Also GPS measurements must exist and transformer for GPS measurements exist. If so, translate and rotate preloaded map and afterwards load preloaded map into main filter. """ifself.preload_filterisNoneorself.preloaded_gps_measurementsisNoneorself.preload_alignedisTrue:returnifself.preloaded_map_alignment_activeandself.use_gps_instead_of_tracked_mapisTrue:gps_position_sensor:NovatelGPSPosition=self.realtime_filter.sensor('gps_position')[1]ifself.realtime_filter.initial_heading_tisnotNoneandgps_position_sensor._transformerisnotNone:gps_heading_sensor:NovatelGPSHeading=self.realtime_filter.sensor('gps_heading')[1]offset=gps_position_sensor._transformer.transform(xx=self.preloaded_gps_measurements['lat'],yy=self.preloaded_gps_measurements['lon'],errcheck=True,direction='INVERSE')rotation=gps_heading_sensor.hx_inverse(np.array([self.preloaded_gps_measurements['heading']]))[0]rospy.loginfo(f'[SLAM] [MapAlignment] Using preloaded map with offset = {offset}, 'f'rotation = {np.degrees(rotation)} ° and {self.preload_filter.landmarks.shape[0]} cones.'' Used GPS for alignment.')self.preload_aligned=Trueself.load_preloaded_filter_into_main_filter(landmarks=geometry.rotate_and_move(xy=self.preload_filter.landmarks,offset_before_rotation=offset,rotation=rotation))
[docs]deftry_aligning_preloaded_map_via_main_filter(self):""" Try to align preloaded map via the perceived and tracked cones in the main filter. Preload filter must be set, alignment configured but not via gps. Also atleast a configured amount of landmarks must be perceived excluding big orange cones. If so, alignment event is set so other thread can align preloaded map. """ifself.preload_filterisNone:returnifself.preloaded_map_alignment_activeandself.use_gps_instead_of_tracked_mapisFalse:usable_landmarks_for_alignment=np.count_nonzero(np.logical_and(np.array(self.main_filter.landmark_ids)!=3,np.array(self.main_filter.landmark_perceived_n)>self.alignment_perceived_n_threshold))ifusable_landmarks_for_alignment>self.minimum_preload_landmarks:ifnotself.align_event.is_set():rospy.loginfo('[SLAM] [MapAlignment] Set event to align maps.')self.align_event.set()
[docs]defexecute_filter_for_timestamp(self,filter:UKF,timestamp:float,use_local_mapping:bool=False):# noqa:A002""" Execute predict and update for a filter. #. Delete measurements for all sensors for the given filter used in the last timestamp. #. Set measurements from the buffered ROS messages for the given timestamp and filter. #. Prepare the update by preparing every sensor and calcuting which states have to be predicted and updated. #. Predict. #. Update. #. Postprocess the update by post processing every sensor. """self.delete_measurements(filter=filter)self.set_measurements(filter=filter,timestamp=timestamp,use_local_mapping=use_local_mapping)filter.prepare_update()filter.try_predict(timestamp-filter.t,filter.u)filter.t=timestampfilter.try_update(filter.u)filter.postprocess_update()
[docs]defbuffer_x_and_P(self,current_timestamp:float):""" Buffer current state vector :math:`x` and covariance matrix :math:`P` of realtime filter for current timestamp. Buffer current state vector :math:`x` to :attr:`slam.SLAM.x_array` and covariance matrix :math:`P` to to :attr:`slam.SLAM.P_array`. Parameters ---------- current_timestamp : float Timestmap for which the current state vector :math:`x` and covariance matrix :math:`P` should be buffered. """self.t_array.append(current_timestamp)self.x_array.append(self.realtime_filter.x)self.P_array.append(self.realtime_filter.P)
[docs]defget_x_and_P_from_buffer(self,local_mapping_timestamp:float)->Tuple[npt.NDArray,npt.NDArray,float]:""" Get latest buffered state vector :math:`x` and covariance matrix :math:`P` which are before the given timestamp. Parameters ---------- local_mapping_timestamp : float Timestamp for which the latest state vector and covariance matrix should be returned. Returns ------- Tuple[npt.NDArray, npt.NDArray, float] State vector :math:`x`, covariance matrix :math:`P` and timestamp of both. Raises ------ RuntimeError No timestamp buffered suitable for given timestamp. """timestamp_index=next((t_ifort_i,tinenumerate(self.t_array)ift>local_mapping_timestamp),-1)iftimestamp_index==0:raiseRuntimeError('Necessary timestamp for local mapping update not buffered.')else:timestamp_index-=1returnself.x_array[timestamp_index],self.P_array[timestamp_index],self.t_array[timestamp_index]
[docs]defpop_from_buffer(self,local_mapping_timestamp:float):""" Pop all buffered messages from buffers for control inputs and measurements preceding specified timestamp. Parameters ---------- local_mapping_timestamp : float Timestamp until which all buffered messages should be popped. """self.pop_u_from_buffer(local_mapping_timestamp=local_mapping_timestamp)forsensorinself.realtime_filter.sensors:sensor.pop_z_and_R_from_buffer(local_mapping_timestamp=local_mapping_timestamp)
[docs]defrun(self):""" Run SLAM. First waits for every control input to be set, then executes the filter according to the defined dt / rate. Also correct camera/LiDAR offset according to transformation tree. """rospy.loginfo('[SLAM] SLAM started.')whilenotrospy.is_shutdown():iflen(self.gyro_u_array)>0andlen(self.accel_u_array)>0andlen(self.steering_u_array)>0:breaktime.sleep(self.realtime_filter.predict_tolerance)rospy.loginfo('[SLAM] Control inputs initialized. Go.')start=rospy.Time.now().to_sec()self.buffer_x_and_P(start-self.realtime_filter.predict_tolerance)self.realtime_filter_start=startself.realtime_filter.t=start# overwrite camera and LiDAR offsetself.correct_offsets()whilenotrospy.is_shutdown():self.execute_real_time_filter()
[docs]defload_preloaded_filter_into_main_filter(self,landmarks:npt.NDArray):""" Load preloading filter into main filter. Afterwards send preloaded landmarks and publish preloaded centerpoints if necessary. If configured, disable perception and main filter after loading landmarks into main filter. Otherwise stop filter from initializing any new landmarks when seeing new ones. Parameters ---------- landmarks : npt.NDArray Landmarks to preload, shape: (n, 2) with n equals number of landmarks. """withself.main_lock:ifself.main_filterisNone:# if realtime filter was not execute yet, assume that now is the current time of main filter.timestamp=getattr(self.realtime_filter,'t',rospy.Time.now().to_sec())self.main_filter=self.realtime_filter.copy(x=self.realtime_filter.x,P=self.realtime_filter.P,timestamp=timestamp,name='main')else:self.main_filter.t=rospy.Time.now().to_sec()# assume that the preloaded and tracked map is closed when driving autocross or trackdrive.self.closed_track=self.missionin[AutonomousMissionEnum.AUTOCROSS,AutonomousMissionEnum.TRACKDRIVE]ifnotself.preloading_keep_updating:self.main_filter_active=Falseself.disable_perception()self.local_mapping_subscriber.unregister()self.update_landmark_update_strategy(strategy=helpers.LandmarkUpdateStrategy.DO_NOT_UPDATE)else:self.update_landmark_update_strategy(strategy=helpers.LandmarkUpdateStrategy.DO_NOT_INITIALIZE)# load landmarks from preloading filter into main filter but keep robot state from main filter.x_R=self.main_filter.x_RP_R=self.main_filter.P_Rself.main_filter.x=np.empty_like(self.preload_filter.x)self.main_filter.P=np.zeros_like(self.preload_filter.P)self.main_filter.x_R=x_Rself.main_filter.landmarks=landmarksself.main_filter.P_R=P_Rself.main_filter.P[self.main_filter.x_r_dim:,self.main_filter.x_r_dim:]= \
self.preload_filter.P[self.main_filter.x_r_dim:,self.main_filter.x_r_dim:]self.main_filter.landmark_ids=self.preload_filter.landmark_idsself.main_filter.fake_landmark_ids=self.preload_filter.fake_landmark_idsself.main_filter.landmark_perceived_n=self.preload_filter.landmark_perceived_n# delete preloading filterself.preloading_filter=None# send preloaded landmarks and publish centerpoints if necessaryself.send_global_landmarks()self.publish_centerpoints_if_necessary()
[docs]defpublish_centerpoints_if_necessary(self):""" Publish preloaded and (if configured and applicable) aligned centerpoints to ROS if necessary. It is necessary if we drive Skidpad, Acceleration or Braketest (EBS Test). Fatal if for those mission no preloaded centerpoints exists although a track was preloaded. """ifself.missionin[AutonomousMissionEnum.SKIDPAD,AutonomousMissionEnum.ACCELERATION,AutonomousMissionEnum.BRAKETEST]:ifself.preloaded_centerpointsisNone:rospy.logfatal('[SLAM] So you thought it is a good idea to drive skidpad, acceleration or ebs test but''not create a manual preloading map with centerpoints. Man this is going to be awful.')returnrospy.loginfo('[SLAM] Publishing centerpoints since we are driving skidpad, acceleration or ebs test.')self.publish_centerpoints()
[docs]defpublish_centerpoints(self):""" Publish preloaded and (if configured and applicable) aligned centerpoints to ROS. This is especially important for skidpad, since it is very difficult to plan a centerline including a crossing. It is way easiert to preload this track and its centerline. """centerpoints_msg=CenterPoints()centerpoints_msg.image_header.frame_id='map'centerpoints_msg.header.frame_id='map'centerpoints_msg.x=self.preloaded_centerpoints['x']centerpoints_msg.y=self.preloaded_centerpoints['y']centerpoints_msg.left_track_width=self.preloaded_centerpoints['left_track_width']centerpoints_msg.right_track_width=self.preloaded_centerpoints['right_track_width']centerpoints_msg.closed_track=self.preloaded_centerpoints['closed_track']centerpoints_msg.vehicle_pose.header.frame_id='map'withself.realtime_lock:ifhasattr(self.realtime_filter,'t')andself.realtime_filter.t!=0:timestamp=rospy.Time.from_sec(self.realtime_filter.t)else:timestamp=rospy.Time().now()centerpoints_msg.vehicle_pose.x=self.realtime_filter.x[0]centerpoints_msg.vehicle_pose.y=self.realtime_filter.x[1]centerpoints_msg.vehicle_pose.v_x=self.realtime_filter.x[2]centerpoints_msg.vehicle_pose.psi=self.realtime_filter.x[3]centerpoints_msg.header.stamp=timestampcenterpoints_msg.image_header.stamp=timestampcenterpoints_msg.vehicle_pose.header.stamp=timestampself.centerpoints_publisher.publish(centerpoints_msg)
[docs]deftry_preload_map(self):""" Try preloading map (when activated) from previously saved json file. Preloadable maps lie under ``as_ros/src/slam/plots/maps/<test_day>/<track_layout>/<file_name>.json``. Filename can be: #. ``manual.json``, being created by :ref:`track-creator` or :ref:`map-generation` #. ``ground_truth.json``, being created by :ref:`track-creator` #. ``map_<number>.json`` with number being auto incrementing. The highest number will have the highest priority. Tracks are loaded according to this order. When a filename does not exist, the next possible filename in order is chosen. An extra UKF, called peload filter, is used to load the preloaded map into. If alignment of the preloaded map is deactived, the preloaded map saved in the preloading filter is afterwards saved into the main filter. Also, if necessary and possible, the preloaded centerpoints are published. """ifself.preloading_activeisFalse:rospy.loginfo('[SLAM] Preloading deactivated.')returntry:file_path,preloaded_map=plot.load_preloading_map(test_day=rospy.get_param('/test_track/test_day'),track_layout=rospy.get_param('/test_track/track_layout'))iffile_pathisNoneormapisNone:rospy.logwarn(f'[SLAM] No Map to preload!: {rospy.get_param("/test_track/test_day")}/'f'{rospy.get_param("/test_track/track_layout")}')exceptException:rospy.logerr('[SLAM] Preloading Map failed!')return# Load map information into attributes and preloading filter.self.preloaded_gps_measurements=preloaded_map.get('gps',None)self.preloaded_centerpoints=preloaded_map.get('centerpoints',None)self.use_landmark_for_alignment=preloaded_map.get('registration')self.preload_filter=self.realtime_filter.copy(timestamp=0,x=self.realtime_filter.x,P=self.realtime_filter.P)self.preload_filter.landmark_ids=preloaded_map['id']self.preload_filter.fake_landmark_ids=preloaded_map['faked_id']self.preload_filter.landmark_perceived_n=preloaded_map.get('perceived_n',np.full(len(self.preload_filter.landmark_ids),500))landmarks_n=len(self.preload_filter.landmark_ids)states_n=2*landmarks_n+self.preload_filter.x_r_dim# load landmark position into states of preloading filterx=self.preload_filter.xself.preload_filter.x=np.empty(states_n)self.preload_filter.x_R=xself.preload_filter.x[self.preload_filter.x_r_dim::2]=np.array(preloaded_map['x'])self.preload_filter.x[self.preload_filter.x_r_dim+1::2]=np.array(preloaded_map['y'])# load landmark uncertainity into covariance matrix of preloading filterP=self.preload_filter.Pself.preload_filter.P=np.zeros((states_n,states_n))self.preload_filter.P_R=PP_x=rospy.get_param('/slam/preloading/P_x')P_y=rospy.get_param('/slam/preloading/P_y')foriinrange(self.preload_filter.x_r_dim,states_n,2):self.preload_filter.P[i:i+2,i:i+2]=np.array([[P_x,0],[0,P_y]])rospy.loginfo(f'[SLAM] Successfully imported map from {file_path}!')# if alignment is deactivated, preloading filter is directly loaded into main filterifself.preloaded_map_alignment_activeisFalse:rospy.loginfo('[SLAM] Alignment is deactivated, load preloading filter into main filter.')self.load_preloaded_filter_into_main_filter(landmarks=self.preload_filter.landmarks)# publish centerpoints if necessary, condition is explained in respective functionself.publish_centerpoints_if_necessary()
[docs]defsave_tracked_map(self):""" Save tracked map to json file. Tracked maps are saved under ``as_ros/src/slam/plots/maps/<test_day>/<track_layout>/map_<number>.json``. Number is auto incrementing for the giben test day and track layout. """withself.main_lock:file_path=plot.save_tracked_map(filter=self.main_filter,gps_filter=self.realtime_filter,test_day=rospy.get_param('/test_track/test_day'),track_layout=rospy.get_param('/test_track/track_layout'))rospy.logwarn(f'[SLAM] Saved tracked map to {file_path}.')
[docs]defdisable_perception(self):"""Disabling perception by sending a ROS message to inference node."""ifself.disable_perception_when_inactiveisTrue:msg=DisablePerception()msg.disable_perception=Truemsg.header.stamp=rospy.Time.now()self.disable_perception_publisher.publish(msg)rospy.loginfo('[SLAM] Disabling Perception!')else:rospy.loginfo('[SLAM] Not disabling perception again since option is disabled!')
[docs]defenable_perception(self):"""Enable perception again by sending a ROS message to inference node after disabling it."""ifself.disable_perception_when_inactiveisTrue:msg=DisablePerception()msg.disable_perception=Falsemsg.header.stamp=rospy.Time.now()self.disable_perception_publisher.publish(msg)rospy.loginfo('[SLAM] Enabling Perception again!')else:rospy.loginfo('[SLAM] Not enabling perception again since we should not have disabled at anyway.')
[docs]defon_shutdown(self):"""Save tracked map on shutdown."""self.save_tracked_map()