# Source code for sensors.landmarks

"""
Implementation of all Sensors concerning landmarks.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Tuple

if TYPE_CHECKING:
    import ukf

from typing import List, Optional, Sequence
import numpy.typing as npt
import numpy as np

from utils.helpers import LandmarkUpdateStrategy

from .sensor import Sensor
from utils import geometry, math, data_association, helpers

from astropy.stats import circmean


class LocalMapping(Sensor):
    """
    Sensor class for Landmarks from the local mapping.

    Implements the LocalMapping sensor from EVA.

    Attributes
    ----------
    filter : ukf.UKF
        KalmanFilter the sensor will be used for.
    sensor_name : str
        Name to identify the sensor by.
    sensor_i : int
        Index of the sensor as part of the Kalman Filter.
    z_names : List[str]
        Names of the measurements of the sensor.
    z_dim : npt.NDArray
        Dimension of measurement vector derived by length of z_names.
    z_array : npt.NDArray
        Numpy array of prerecorded measurements.
    R_array : npt.NDArray
        Numpy array of prerecorded measurement noise covariance matrices.
    active : bool
        Specifies whether the update for a sensor should be executed or only
        the measurements should be read in.
    x_indices_to_update : Set
        Indices of the state variables to use for predict and update.
        Based on base and situation dependent extended.
    x_indices_to_update_base : Set
        Base of indices of the state variables to always use for predict and update.
    z_indices_to_update : npt.NDArray
        Measurement indices to use for update.
    z_indices_to_initialize : npt.NDArray
        Measurement indices to initialize while post_processing update.
    execute_update_postprocess : bool
        Flag to decide whether the update of the sensor should be postprocessed.
        Used to initialize intern variables or new state variables.
    R_azimut : float
        Measurement noise variance for the azimuth angle of the landmark measurement.
    R_distance_factor : float
        Factor by which the distance variance increases with distance.
    R_offset : float
        Offset of the line used to increase variance with distance.
    camera_offset : npt.NDArray
        Offset of the camera position relative to the vehicle origin
        (middle of the rear axle).
    lidar_offset : npt.NDArray
        Offset of the LiDAR position relative to the vehicle origin
        (middle of the rear axle).
    fov_angle : float
        Field of View angle of the camera.
    fov_min_distance : float
        Minimum field of view distance of the camera.
        Measured landmarks will only be considered if they are not closer.
    fov_max_distance : float
        Maximum field of view distance of the camera.
        Measured landmarks will only be considered if they are not farther away.
    fov_scaling : float
        Scaling factor for the Field of View angle and distance to extract the
        theoretically measurable landmarks.
    probability_gate : List[float]
        Minimum probability for which landmarks will be considered.
        Different for each cone color.
    _colors : List[str]
        Mapping of the landmark id/type to the respective color.
    _size : List[int]
        Mapping of the landmark id/type to the respective size.
    chi2_quantile : float
        Threshold for the mahalanobis distance to associate measured and
        tracked landmarks.
    data_association : helpers.DataAssociation
        Which data association algorithm to use.
    use_kd_tree : bool
        Whether to use the KD Tree for data association or not.
    """

    # Mapping of the landmark id/type to the respective plot color and marker size.
    _colors = ['gold', 'blue', 'orange', 'orange']
    _size = [50, 50, 50, 80]

    def __init__(self, filter: ukf.UKF, sensor_name: str, z_names: List[str],
                 z_array: Optional[npt.NDArray] = None,
                 R_array: Optional[npt.NDArray] = None,
                 active: bool = False,
                 data_association: helpers.DataAssociation = helpers.DataAssociation.ClusterJCBB,
                 use_kd_tree: bool = True,
                 chi2_quantile: float = 0.8,
                 fov_min_distance: float = 0,
                 fov_max_distance: float = 12,
                 fov_angle: float = 100,
                 fov_scaling: float = 1.2,
                 probability_gate: Sequence[float] = (0.8, 0.8, 0.8, 0.94),
                 R_azimuth: float = 10,
                 R_distance_factor: float = 0.15,
                 R_offset: float = 0.15,
                 update_strategy: Optional[LandmarkUpdateStrategy] = LandmarkUpdateStrategy.ALL) -> None:
        """
        Initialize an object of the LocalMapping sensor of autonomous system.

        Parameters
        ----------
        filter : ukf.UKF
            KalmanFilter object the sensor will be used for.
        sensor_name : str
            Name to identify the sensor by.
        z_names : List[str]
            List of the names of the measurements with the measurement count as length.
        z_array : npt.NDArray, optional
            Numpy Array of the prerecorded measurements.
            Prerecorded measurements of all sensor must be synchronized and have
            the same length, by default None.
        R_array : Optional[npt.NDArray], optional
            Numpy array of prerecorded measurement noises, by default None
        active : bool, optional
            Specifies whether the update for a sensor should be executed or only
            the measurements should be read in, by default True
        data_association : helpers.DataAssociation, optional
            Which data association algorithm to use,
            by default helpers.DataAssociation.ClusterJCBB
        use_kd_tree : bool, optional
            Whether to use the KD Tree for data association or not.
        chi2_quantile : float, optional
            Threshold for the mahalanobis distance to associate measured and
            tracked landmarks.
        fov_min_distance : float, optional
            Minimum field of view distance of the camera.
            Measured landmarks will only be considered if they are not closer.
        fov_max_distance : float, optional
            Maximum field of view distance of the camera.
            Measured landmarks will only be considered if they are not farther away.
        fov_angle : float, optional
            Field of View angle of the camera (degrees).
        fov_scaling : float, optional
            Scaling factor for the Field of View angle and distance to extract
            the theoretically measurable landmarks.
        probability_gate : Sequence[float], optional
            Minimum probability for which landmarks will be considered.
            Different for each cone color.
        R_azimuth : float, optional
            Measurement noise standard deviation (degrees) for the azimuth angle
            of the landmark measurement.
        R_distance_factor : float, optional
            Factor by which the distance variance increases with distance.
        R_offset : float, optional
            Offset of the line used to increase variance with distance.
        """
        super().__init__(filter, z_array=z_array, R_array=R_array,
                         sensor_name=sensor_name, z_names=z_names, active=active)
        # Sensor mounting offsets relative to the vehicle origin (middle of the rear axle).
        self.camera_offset = np.array([0.59, 0])
        self.lidar_offset = np.array([0.57, 0])
        self.data_association = data_association
        self.use_kd_tree = use_kd_tree
        self.chi2_quantile = chi2_quantile
        self.fov_min_distance = fov_min_distance
        self.fov_max_distance = fov_max_distance
        # Configured in degrees, stored in radians.
        self.fov_angle = np.radians(fov_angle)
        self.fov_scaling = fov_scaling
        # Default is an immutable tuple (a mutable list default would be shared
        # across all instances); converted to an array here either way.
        self.probability_gate = np.array(probability_gate)
        # Configured as a std in degrees, stored as a variance in rad^2.
        self.R_azimut = np.radians(R_azimuth) ** 2
        self.R_distance_factor = R_distance_factor
        self.R_offset = R_offset
        self.update_strategy = update_strategy
    def copy(self, filter: ukf.UKF, timestamp: float) -> 'LocalMapping':
        """
        Create a copy of this sensor bound to another filter instance.

        Parameters
        ----------
        filter : ukf.UKF
            KalmanFilter object the copied sensor will be used for.
        timestamp : float
            Timestamp of the copy.  # NOTE(review): currently unused here — confirm callers rely on the parameter only for a uniform copy() signature.

        Returns
        -------
        LocalMapping
            New LocalMapping instance with the same configuration.
        """
        # NOTE: fov_angle and R_azimut are already converted (radians / rad^2
        # variance) on self, but __init__ applies np.radians(...) again to its
        # arguments.  The direct attribute assignments below therefore overwrite
        # the doubly-converted values with the correct ones.
        copy = LocalMapping(filter=filter, sensor_name=self.sensor_name, z_names=self.z_names, active=self.active, data_association=self.data_association, use_kd_tree=self.use_kd_tree, chi2_quantile=self.chi2_quantile, fov_min_distance=self.fov_min_distance, fov_max_distance=self.fov_max_distance, fov_angle=self.fov_angle, fov_scaling=self.fov_scaling, probability_gate=self.probability_gate, R_azimuth=self.R_azimut, R_distance_factor=self.R_distance_factor, R_offset=self.R_offset, update_strategy=self.update_strategy)
        copy.fov_angle = self.fov_angle
        # Share the recorded measurement buffers and their lock with the copy.
        copy.ros_z_array = self.ros_z_array
        copy.ros_t_array = self.ros_t_array
        copy.measurement_lock = self.measurement_lock
        copy.camera_offset = self.camera_offset
        copy.lidar_offset = self.lidar_offset
        copy.R_azimut = self.R_azimut
        return copy
    def prepare_update(self) -> None:
        """
        Prepares update step and returns whether the update for this sensor should be executed.

        Doesn't save measurement and measurement noise for this sensor in the filter
        object, but calculates measurement indices respectively for update and
        initialization and state variable indices for predict and update.
        These indices are based on the previously executed data association between
        tracked landmarks (evaporated to observable landmarks) and observed landmarks.

        Returns
        -------
        bool
            Specifies whether the update for this sensor should be executed.
        """
        # NOTE(review): annotated "-> None" but the method actually returns a
        # truthiness flag (False or "active and count"); the final return value
        # may be an int, not a bool — callers appear to use it only truthily.
        if self.update_strategy == LandmarkUpdateStrategy.DO_NOT_UPDATE:
            self.x_indices_to_update = set()
            self.x_indices_to_initialize = set()
            self.execute_update_postprocess = False
            return False
        # Reset all per-step diagnostics/state on the filter before this cycle.
        self.filter.current_global_landmarks = None
        self.filter.observable_to_global_mapping = None
        self.filter.individual_compatability = None
        self.filter.observed_landmarks = None
        self.filter.observable_landmarks = None
        self.filter.observable_state_covariance = None
        self.filter.mapping = []
        self.filter.local_mapping_nodes_explored_n = 0
        self.filter.local_mapping_compared_associations_n = 0
        # Zero out this sensor's slot in the filter's measurement buffers.
        self.filter.sensor_z[self.sensor_i] = np.broadcast_to(0, self.filter.max_z_dim)
        self.filter.sensor_R[self.sensor_i] = np.broadcast_to(0, (self.filter.max_z_dim, self.filter.max_z_dim))
        # No (valid) measurement available for this step.
        if np.any(np.isnan(self.z)):
            return False
        # fov and probability gates
        norm = np.linalg.norm(self.z[:, 0:2], axis=1)
        # Measurement row layout: cone_x, cone_y, cone_id, cone_prob, is_lidar.
        # Keep only cones inside the [min, max) range window whose probability
        # exceeds the per-color gate (selected via the cone id).
        z = self.z[np.logical_and.reduce((self.fov_min_distance < norm, norm < self.fov_max_distance, self.z[:, 3] > np.choose(self.z[:, 2].astype(int), self.probability_gate)))]
        global_observed_landmarks = self.convert_measurement_to_global_landmarks(z)
        observable_landmarks, observable_mapping = self.limit_to_observable_landmarks()
        # Append the integer landmark id as an extra column for the association.
        observable_landmarks = np.append(observable_landmarks, np.atleast_2d(np.take(np.array(self.filter.landmark_ids).astype(int), observable_mapping)).T, axis=1)
        self.mapping = self.compute_landmark_mapping(global_observed_landmarks, observable_landmarks=observable_landmarks, observable_mapping=observable_mapping)
        known_indices, unknown_indices = self.compute_known_indices(self.mapping)
        # State vector layout: 4 robot-pose entries followed by (x, y) per
        # landmark — landmark i occupies state indices 4+2i and 5+2i.
        temp_additional_state_indices = np.sort(self.mapping[known_indices])
        additional_state_indices = np.empty((2 * temp_additional_state_indices.shape[0]))
        additional_state_indices[::2] = 4 + 2 * temp_additional_state_indices
        additional_state_indices[1::2] = 5 + 2 * temp_additional_state_indices
        self.x_indices_to_update = self.x_indices_to_update_base.union(additional_state_indices.astype(int))
        self.z_indices_to_update = known_indices
        self.z_indices_to_initialize = unknown_indices
        # Work in polar cone coordinates from here on; keep the cartesian
        # measurement for the postprocess step.
        self.z = geometry.tranform_cartesian_to_polar_cone_coords(z)
        self.z_cartesian = z
        self.global_observed_landmarks = global_observed_landmarks
        self.R = self.calculate_polar_cone_covariance(self.z)
        self.execute_update_postprocess = True
        return self.active and self.z_to_update.shape[0]
    def postprocess_update(self) -> None:
        """
        Postprocess update step for this sensor.

        Used to initialize newly observed landmarks as new state variables and
        save the current observed landmarks.
        """
        global_observed_landmarks = self.convert_measurement_to_global_landmarks(self.z_cartesian)
        polar_local_observed_coords = self.z
        # Count a (re-)observation for every successfully associated landmark;
        # -1 marks an observation that was not matched to a tracked landmark.
        for landmark_i in self.mapping:
            if landmark_i != -1:
                self.filter.landmark_perceived_n[landmark_i] += 1
        # Unless initialization is disabled, add every unmatched observation
        # as a new tracked landmark (expands the filter state).
        if not self.update_strategy == LandmarkUpdateStrategy.DO_NOT_INITIALIZE:
            for global_observed_landmark, polar_local_observed_coord in \
                    zip(global_observed_landmarks[self.z_indices_to_initialize], polar_local_observed_coords[self.z_indices_to_initialize]):
                self.initialize_landmark(global_observed_landmark, polar_local_observed_coord)
        self.filter.current_global_landmarks = global_observed_landmarks
        self.execute_update_postprocess = False
[docs] def convert_measurement_to_global_landmarks(self, z: npt.NDArray) -> npt.NDArray: """ Converts cartesian coordinates of local observed coordinates to global cartesian coordinates. Parameters ---------- z : npt.NDArray Current measurement vector of the sensor. [(x, y, type, probability, is_lidar), ...]. Returns ------- npt.NDArray Global cartesian coordinates of observed landmarks. """ # move measured landmarks by camera offset, rotate around global heading and move by global position is_lidar_index = z[:, 4].astype(bool) global_landmarks = geometry.rotate_and_move(z[:, 0:2].copy(), rotation=self.filter.x[3], offset_after_rotation=self.filter.x[0:2], offset_before_rotation=self.camera_offset, offset_before_rotation_lidar=self.lidar_offset, is_lidar_index=is_lidar_index) # append landmark id and probability again global_landmarks = np.append(global_landmarks, z[:, 2:4], axis=1) return global_landmarks
    def initialize_landmark(self, global_landmark: npt.NDArray, local_coords_polar: npt.NDArray) -> None:
        """
        Initializes new, unknown landmark.

        Expands state vector x and system noise covariance matrices P and Q.

        Parameters
        ----------
        global_landmark : npt.NDArray
            Global position and type of landmark.
        local_coords_polar : npt.NDArray
            Local position of landmark in polar coordinates (azimuth, distance).
        """
        dims_x = self.filter.x.shape[0]
        # expand state vector, landmark id (color, size) and covariance matrix
        self.filter.x = np.append(self.filter.x, global_landmark[0:2])
        self.filter.landmark_ids.append(global_landmark[2])
        self.filter.fake_landmark_ids.append(global_landmark[2])
        self.filter.landmark_perceived_n.append(1)
        # Grow P by a zero-filled 2x2 block (row block first, then column block).
        self.filter.P = np.append(self.filter.P, np.zeros((2, dims_x)), axis=0)
        self.filter.P = np.append(self.filter.P, np.zeros((dims_x+2, 2)), axis=1)
        # calculate cartesian cone covariance
        # Copy before mutating: the caller's polar coordinates must stay intact.
        local_coords_polar = local_coords_polar.copy()
        # Rotate the measured azimuth into the global frame via the heading state.
        local_coords_polar[0] += self.filter.x[3]
        cartesian_cone_covariance = self.calculate_cartesian_cone_covariance(np.atleast_2d(local_coords_polar))
        # New landmark uncertainty = measurement covariance plus the vehicle
        # position covariance (P[:2, :2]), rounded to the filter precision.
        self.filter.P[dims_x:dims_x+2, dims_x:dims_x+2] = np.around(cartesian_cone_covariance + self.filter.P[:2, :2], decimals=self.filter.max_decimals)
[docs] @staticmethod def measurement_mean(sigmas: npt.NDArray, Wm: npt.NDArray) -> npt.NDArray: """ Calculates the weighted mean of multiple measurement vectors. Parameters ---------- sigmas : npt.NDArray Sigma points of the measurement vector. Wm : npt.NDArray Weights for the calculation of the weighted mean of the measurement. Returns ------- npt.NDArray Weighted mean of the sigma points of the measurement vector. """ z = np.empty(sigmas.shape[1]) # weighted circular mean for the azimuth angle of the polar coordinates z[::2] = circmean(sigmas[:, ::2], axis=0, weights=np.repeat(np.atleast_2d(Wm), repeats=z.shape[0]/2, axis=0).T) # normal weights mean for the distance of the polar coordinates z[1::2] = np.average(sigmas[:, 1::2], axis=0, weights=Wm) return z
[docs] @staticmethod def residual_sigmash_and_z(sigmas_h: npt.NDArray[np.floating], z: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]: """ Calculates the difference between the measurement sigma points and the measurement. Parameters ---------- sigmas_h : npt.NDArray[np.floating] Sigma points for the measurement. z : npt.NDArray[np.floating] Measurement Returns ------- npt.NDArray[np.floating] Difference between sigma points and measurements. """ dz = sigmas_h - z dz[..., ::2] = math.normalize_rad_angles(dz[..., ::2]) return dz
    def hx(self, sigmas_f: npt.NDArray[np.floating], u: Optional[npt.NDArray[np.floating]]) -> npt.NDArray[np.floating]:
        """
        Computes the measurement vector the sensor should measure based on the
        current state vector and camera offset.

        Parameters
        ----------
        sigmas_f : npt.NDArray[np.floating]
            Sigma points of the current state vector of the system.
        u : Optional[npt.NDArray[np.floating]]
            Current control input vector of the system. Unused for this sensor.

        Returns
        -------
        npt.NDArray[np.floating]
            Measurement vector that should be measured given the current state
            vector: interleaved (azimuth, distance) pairs per sigma point.
        """
        # When only the necessary states are kept in the sigma points, the
        # landmark blocks are compacted — re-derive their positions via a
        # double argsort (rank transform) of the association mapping.
        if self.filter.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES:
            mapping = np.argsort(np.argsort(self.mapping[self.z_indices_to_update]))
        else:
            mapping = self.mapping[self.z_indices_to_update]
        # Depending on the landmark update strategy, freeze heading and/or
        # position to the first (mean) sigma point instead of varying them.
        if self.update_strategy == LandmarkUpdateStrategy.NO_POSE:
            heading = sigmas_f[0, 3]
            position = sigmas_f[0, np.newaxis, 0:2]
        elif self.update_strategy == LandmarkUpdateStrategy.NO_HEADING:
            heading = sigmas_f[0, 3]
            position = sigmas_f[:, np.newaxis, 0:2]
        else:
            heading = sigmas_f[:, 3]
            position = sigmas_f[:, np.newaxis, 0:2]
        # Batch of 2x2 world->vehicle rotation matrices, one per sigma point.
        rot_matrix = np.empty((sigmas_f.shape[0], 2, 2))
        cos, sin = np.cos(heading), np.sin(heading)
        rot_matrix[:, 0, 0], rot_matrix[:, 0, 1], rot_matrix[:, 1, 0], rot_matrix[:, 1, 1] = cos, sin, -sin, cos
        # Gather the associated landmark (x, y) pairs from the state tail
        # (states 4..) and express them relative to the vehicle position.
        global_estimated_measured_landmarks = sigmas_f[:, 4:].reshape(
            (sigmas_f.shape[0], -1, 2))[:, mapping] - position
        # Rotate into the vehicle frame (batched matrix-vector product) and
        # shift into the camera frame.
        local_estimated_measured_landmarks = np.einsum(
            'ijk, ilk -> ilj', rot_matrix, global_estimated_measured_landmarks) - self.camera_offset
        landmark_azimuth = np.arctan2(local_estimated_measured_landmarks[..., 1], local_estimated_measured_landmarks[..., 0])
        landmark_distance = np.hypot(local_estimated_measured_landmarks[..., 1], local_estimated_measured_landmarks[..., 0])
        # Interleave back to (azimuth, distance, azimuth, distance, ...).
        return np.dstack((landmark_azimuth, landmark_distance)).reshape((sigmas_f.shape[0], -1))
[docs] def hx_inverse(self, z: np.ndarray, u: Optional[npt.NDArray] = None) -> npt.NDArray: """ Calculate the state vector based on the current measurement of the sensor. # TODO: Implement :D . Not every state can be calculated, so only part of the state vector will be calculated. Use the transformer to convert lat, lon to local coordinates Parameters ---------- z : np.ndarray Measurement vector, can be a matrix with multiple time steps. Returns ------- np.ndarray State vector, can be a matrix with multiple time steps. """ return z
[docs] def limit_to_observable_landmarks(self) -> Tuple[npt.NDArray, npt.NDArray]: """ Limits and returns the observable subset of all tracked landmarks based on the current vehicle state and FoV. Returns ------- Tuple[npt.NDArray, npt.NDArray] Position of the landmarks and mapping between the observable landmarks and all tracked landmarks. """ assert self.filter.landmarks is not None path = geometry.generate_fov_path(fov_min_distance=self.fov_min_distance, fov_max_distance=self.fov_max_distance, fov_angle=self.fov_angle, fov_scaling=self.fov_scaling, offset_after_rotation=self.filter.x[0:2], offset_before_rotation=self.camera_offset, rotation=self.filter.x[3]) points, indices = geometry.limit_coordinates_inside_path(coordinates=self.filter.landmarks[:, 0:2], path=path) return points, indices
    def compute_landmark_mapping(self, z: npt.NDArray, observable_landmarks: npt.NDArray, observable_mapping: npt.NDArray) -> npt.NDArray:
        """
        Computes the mapping between the currently observed landmarks and all tracked landmarks.

        Only uses the currently observable landmarks for the calculation.
        Later translates this mapping into the desired one.
        Very naive and inconsistent approach. Calculates the mahalanobis
        distance for every possible combination.

        Parameters
        ----------
        z : npt.NDArray
            Current measurement vector of the sensor. [(x, y, type, probability), ...].
        observable_landmarks : npt.NDArray
            Subset of all tracked landmarks that could be observed right now.
        observable_mapping : npt.NDArray
            Mapping / Association between the observable landmarks and all tracked landmarks.

        Returns
        -------
        npt.NDArray
            Mapping / Associations between the observed landmarks and all
            tracked landmarks; -1 for unassociated observations.
        """
        # Nothing observable: every observation is unassociated.
        if observable_landmarks.shape[0] == 0:
            return np.full(z.shape[0], -1)
        # State indices of the observable landmarks: landmark i occupies
        # state entries 4+2i and 4+2i+1.
        temp_observable_indices = 2 * observable_mapping
        observable_indices = np.sort(np.append(4 + temp_observable_indices, 4 + temp_observable_indices + 1))
        observable_state_covariance = self.filter.P[np.ix_(observable_indices, observable_indices)]
        # use and execute data association between the observed and observable landmarks using the JCBB algorithm
        if self.data_association == helpers.DataAssociation.ClusterJCBB:
            JCBB = data_association.ClusterJCBB
        elif self.data_association == helpers.DataAssociation.JCBB:
            JCBB = data_association.JCBB
        else:
            logging.critical('The specified data association algorithm is not implemented. Exiting')
            exit(137)
        jcbb = JCBB(observed_landmarks=z, observable_landmarks=observable_landmarks, observable_state_covariance=observable_state_covariance, observable_to_global_mapping=observable_mapping, use_kd_tree=self.use_kd_tree, chi2quantile=self.chi2_quantile, verifies_landmark_id=True)
        self.filter.observed_landmarks = z
        self.filter.observable_landmarks = observable_landmarks
        self.filter.observable_state_covariance = observable_state_covariance
        # save the mapping between the observable and all tracked landmarks for plotting later on
        self.filter.observable_to_global_mapping = observable_mapping
        # save the association between the observed and observable landmarks for plotting later on
        self.filter.individual_compatability = jcbb.individual_compatibility
        mapping = jcbb.best_associations.array
        self.filter.mapping = mapping
        # Diagnostics: search effort of the JCBB run.
        self.filter.local_mapping_nodes_explored_n = jcbb.nodes_explored_n
        self.filter.local_mapping_compared_associations_n = len(jcbb.associations_objects)
        self.clusters = jcbb.clusters
        return mapping
[docs] def compute_known_indices(self, mapping: npt.NDArray) -> Tuple[npt.NDArray, npt.NDArray]: """ Computes indices of which measured landmarks are known and which are unknown Parameters ---------- mapping : npt.NDArray Mapping / association between the observed landmarks and all tracked landmarks. Returns ------- Tuple[npt.NDArray, npt.NDArray] Indices of all observed landmarks that are already tracked, Indices of all observed landmarks that are not already tracked. """ indices = mapping >= 0 return indices, np.invert(indices)
[docs] def calculate_polar_cone_covariance(self, polar_cones: npt.NDArray) -> npt.NDArray: """ Computes the covariance matrix in polar coordinates for given set of cones in polar coordinates. Uses a predefined variance for the azimut angle. Uses a percentage of the distance for the standard deviation of the distance. Parameters ---------- polar_cones : npt.NDArray Coordinates of the cones in polar coordinates. Returns ------- npt.NDArray Cones covariances in polar coordinates. """ diagonal = np.empty(polar_cones.shape[0]*2) diagonal[::2] = np.full(polar_cones.shape[0], self.R_azimut) diagonal[1::2] = (polar_cones[:, 1] * self.R_distance_factor + self.R_offset)**2 return np.diag(diagonal)
    def calculate_cartesian_cone_covariance(self, polar_cone: npt.NDArray) -> npt.NDArray:
        """
        Computes the covariance matrix in cartesian coordinates for a given
        cone in polar coordinates. :cite:p:`Hall1992`

        Parameters
        ----------
        polar_cone : npt.NDArray
            Coordinates of the cone in polar coordinates (azimuth, distance),
            as a 2-D array with one row.

        Returns
        -------
        npt.NDArray
            Cone covariance in cartesian coordinates.
        """
        polar_covariance = self.calculate_polar_cone_covariance(polar_cone)
        # Axis-aligned covariance: radial variance along the range axis and
        # the arc length (r * tan(sigma_azimuth)) squared across it.
        adapted_covariance = np.diag([polar_covariance[1, 1], np.square(polar_cone[0, 1] * np.tan(np.sqrt(polar_covariance[0, 0])))])
        # Rotate the axis-aligned ellipse by the cone azimuth into the
        # cartesian frame: R * C * R^T.
        rotation_matrix = np.array([[np.cos(polar_cone[0, 0]), -np.sin(polar_cone[0, 0])], [np.sin(polar_cone[0, 0]), np.cos(polar_cone[0, 0])]])
        return rotation_matrix @ adapted_covariance @ rotation_matrix.T
[docs] @staticmethod def get_color_for_cone_ids(cone_ids: npt.NDArray) -> npt.NDArray: return np.choose(cone_ids.astype(int), LocalMapping._colors)
[docs] @staticmethod def get_size_for_cone_ids(cone_ids: npt.NDArray) -> npt.NDArray: return np.choose(cone_ids.astype(int), LocalMapping._size)
    def get_prerecorded_z_and_R(self, last_prediction_step: int, i: int) -> None:
        """
        Just get the current measurement.

        Stores the prerecorded measurement in ``self.z`` and clears ``self.R``
        (the noise is recomputed later from the measurement itself); returns
        nothing.  The return annotation is corrected accordingly.

        Parameters
        ----------
        last_prediction_step : int
            Last executed prediction step.  # NOTE(review): unused here — presumably kept for a uniform Sensor interface; confirm.
        i : int
            Current index of the prerecorded measurement and measurement noise array.
        """
        self.z = self.z_array[i]
        self.R = None
@property def z_to_update(self): """ Returns measurements to update. Returns ------- npt.NDArray Measurements to update """ return self.z[self.z_indices_to_update, 0:2].flatten() @property def R_to_update(self): """ Returns measurement noise to update. Returns ------- npt.NDArray Measurement noise for the measurements to update. """ R_indices_to_update = np.empty(2*self.z_indices_to_update.shape[0], dtype=bool) R_indices_to_update[::2] = self.z_indices_to_update R_indices_to_update[1::2] = self.z_indices_to_update return self.R[R_indices_to_update][:, R_indices_to_update]