"""
Implementation of all Sensors concerning landmarks.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Tuple
if TYPE_CHECKING:
import ukf
from typing import List, Optional
import numpy.typing as npt
import numpy as np
from utils.helpers import LandmarkUpdateStrategy
from .sensor import Sensor
from utils import geometry, math, data_association, helpers
from astropy.stats import circmean
[docs]class LocalMapping(Sensor):
"""
Sensor class for Landmarks from the local mapping.
Implements the LocalMapping sensor from EVA.
Attributes
----------
filter : ukf.UKF
KalmanFilter the sensor will be used for.
sensor_name : str
Name to identify the sensor by.
sensor_i : int
Index of the sensor as part of the Kalman Filter.
z_names : List[str]
Names of the measurements of the sensor.
z_dim : npt.NDArray
Dimension of measurement vector derived by length of z_names.
z_array : npt.NDArray
Numpy array of prerecorded measurements.
R_array : npt.NDArray
Numpy array of prerecorded measurement noise covariance matrices.
active : bool
Specifies whether the update for a sensor should be executed or only the measurements should be read in.
x_indices_to_update : Set
Indices of the state variables to use for predict and update. Based on base and situation dependent extended.
x_indices_to_update_base : Set
Base of indices of the state variables to always use for predict and update.
z_indices_to_update : npt.NDArray
Measurement indices to use for update.
z_indices_to_initialize : npt.NDArray
Measurement indices to initialize while post_processing update.
execute_update_postprocess : bool
Flag to decide whether the update of the sensor should be postprocessed. Used to intiialize intern variables
or new state variables.
R_azimut : float
Measurement noise variance for the azimut angle of the landmark measurement.
R_distance_factor : float
Factor by which the distance variance increases with distance.
R_offset : float
Offset of the line used to increase variance with distance.
camera_offset : npt.NDArray
Offset of the camera position relative to the vehicle origin (middle of the rear axle).
lidar_offset : npt.NDArray
Offset of the LiDAR position relative to the vehicle origin (middle of the rear axle).
fov_angle : float
Field of View angle of the camera.
fov_min_distance : float
Minimum field of view distance of the camera. Measured landmarks will only be considered if they are not closer.
fov_max_distance : float
Maximum field of view distance of the camera. Measured landmarks will only be considered if they are not farther away.
fov_scaling : float
Scaling factor for the Field of View angle and distance to extract the theoretically measurable landmarks.
probabilty_gate : List[float]
Minimum probabilty for which landmarks will be considered. Different for each cone color.
_colors : List[str]
Mapping of the landmark id/type to the respective color.
_size : List[int]
Mapping of the landmark id/type to the respective size.
chi2_quantile : float
Threshold for the mahalanobis distance to associate measured and tracked landmarks.
data_association : helpers.DataAssociation
Which data association algorithm to use.
use_kd_tree : bool
Whether to use the KD Tree for data association or not.
"""
_colors = ['gold', 'blue', 'orange', 'orange']
_size = [50, 50, 50, 80]
def __init__(self, filter: ukf.UKF, sensor_name: str, z_names: List[str], z_array: Optional[npt.NDArray] = None,
R_array: Optional[npt.NDArray] = None, active: bool = False,
data_association: helpers.DataAssociation = helpers.DataAssociation.ClusterJCBB,
use_kd_tree: bool=True, chi2_quantile: float=0.8, fov_min_distance: float=0,
fov_max_distance: float=12, fov_angle: float=100, fov_scaling: float=1.2,
probability_gate: List[float]=[0.8, 0.8, 0.8, 0.94], R_azimuth: float=10,
R_distance_factor: float=0.15, R_offset: float=0.15,
update_strategy: Optional[LandmarkUpdateStrategy] = LandmarkUpdateStrategy.ALL) -> None:
"""
Initialize an object of the LocalMapping sensor of autonomous system.
Parameters
----------
filter : ukf.UKF
KalmanFilter object the sensor will be used for.
sensor_name : str
Name to identify the sensor by.
z_names : List[str]
List of the names of the measurements with the measurement count as length.
z_array : npt.NDArray, optional
Numpy Array of the prerecorded measurements. Prerecorded measurements of
all sensor must be synchronized and have the same length, by default None.
R_array : Optional[npt.NDArray], optional
Numpy array of prerecorded measurement noises, by default None
active : bool, optional
Specifies whether the update for a sensor should be executed
or only the measurements should be read in, by default True
data_association : helpers.DataAssociation, optional
Which data association algorithm to use, by default helpers.DataAssociation.ClusterJCBB
use_kd_tree: bool, optional
Whether to use the KD Tree for data association or not.
chi2_quantile: float, optional
Threshold for the mahalanobis distance to associate measured and tracked landmarks.
fov_min_distance: float, optional
Minimum field of view distance of the camera. Measured landmarks will only be considered if they are not closer.
fov_max_distance: float, optional
Maximum field of view distance of the camera. Measured landmarks will only be considered if they are not farther away.
fov_angle: float, optional
Field of View angle of the camera.
fov_scaling: float, optional
Scaling factor for the Field of View angle and distance to extract the theoretically measurable landmarks.
probability_gate : List, Optional
Minimum probabilty for which landmarks will be considered. Different for each cone color.
R_azimuth: float, optional
Measurement noise variance for the azimut angle of the landmark measurement.
R_distance_factor: float, optional
Factor by which the distance variance increases with distance.
R_offset: float, optional
Offset of the line used to increase variance with distance.
"""
super().__init__(filter, z_array=z_array, R_array=R_array,
sensor_name=sensor_name, z_names=z_names, active=active)
self.camera_offset = np.array([0.59, 0])
self.lidar_offset = np.array([0.57, 0])
self.data_association = data_association
self.use_kd_tree = use_kd_tree
self.chi2_quantile = chi2_quantile
self.fov_min_distance = fov_min_distance
self.fov_max_distance = fov_max_distance
self.fov_angle = np.radians(fov_angle)
self.fov_scaling = fov_scaling
self.probability_gate = np.array(probability_gate)
self.R_azimut = np.radians(R_azimuth) ** 2
self.R_distance_factor = R_distance_factor
self.R_offset = R_offset
self.update_strategy = update_strategy
[docs] def copy(self, filter: ukf.UKF, timestamp: float) -> 'LocalMapping':
copy = LocalMapping(filter=filter, sensor_name=self.sensor_name, z_names=self.z_names, active=self.active,
data_association=self.data_association, use_kd_tree=self.use_kd_tree,
chi2_quantile=self.chi2_quantile, fov_min_distance=self.fov_min_distance,
fov_max_distance=self.fov_max_distance, fov_angle=self.fov_angle,
fov_scaling=self.fov_scaling, probability_gate=self.probability_gate,
R_azimuth=self.R_azimut, R_distance_factor=self.R_distance_factor, R_offset=self.R_offset,
update_strategy=self.update_strategy)
copy.fov_angle = self.fov_angle
copy.ros_z_array = self.ros_z_array
copy.ros_t_array = self.ros_t_array
copy.measurement_lock = self.measurement_lock
copy.camera_offset = self.camera_offset
copy.lidar_offset = self.lidar_offset
copy.R_azimut = self.R_azimut
return copy
[docs] def prepare_update(self) -> None:
"""
Prepares update step and returns whether the update for this sensor should be executed.
Doesn't save measurement and measurement noise for this sensor in the filter object, but calculates measurement
indices respectively for update and intiialization and state variable indices for predict and update.
This indices are based on the previous executed data association between tracked landmarks (evaporated to
obersavable landmarks) and observed landmarks.
Returns
-------
bool
Specifies wthether the update for this sensor should be executed.
"""
if self.update_strategy == LandmarkUpdateStrategy.DO_NOT_UPDATE:
self.x_indices_to_update = set()
self.x_indices_to_initialize = set()
self.execute_update_postprocess = False
return False
self.filter.current_global_landmarks = None
self.filter.observable_to_global_mapping = None
self.filter.individual_compatability = None
self.filter.observed_landmarks = None
self.filter.observable_landmarks = None
self.filter.observable_state_covariance = None
self.filter.mapping = []
self.filter.local_mapping_nodes_explored_n = 0
self.filter.local_mapping_compared_associations_n = 0
self.filter.sensor_z[self.sensor_i] = np.broadcast_to(0, self.filter.max_z_dim)
self.filter.sensor_R[self.sensor_i] = np.broadcast_to(0, (self.filter.max_z_dim, self.filter.max_z_dim))
if np.any(np.isnan(self.z)):
return False
# fov and probability gates
norm = np.linalg.norm(self.z[:, 0:2], axis=1)
# Conex, coney, coneid, coneprob, is_lidar
z = self.z[np.logical_and.reduce((self.fov_min_distance < norm,
norm < self.fov_max_distance,
self.z[:, 3] > np.choose(self.z[:, 2].astype(int), self.probability_gate)))]
global_observed_landmarks = self.convert_measurement_to_global_landmarks(z)
observable_landmarks, observable_mapping = self.limit_to_observable_landmarks()
observable_landmarks = np.append(observable_landmarks,
np.atleast_2d(np.take(np.array(self.filter.landmark_ids).astype(int),
observable_mapping)).T,
axis=1)
self.mapping = self.compute_landmark_mapping(global_observed_landmarks,
observable_landmarks=observable_landmarks,
observable_mapping=observable_mapping)
known_indices, unknown_indices = self.compute_known_indices(self.mapping)
temp_additional_state_indices = np.sort(self.mapping[known_indices])
additional_state_indices = np.empty((2 * temp_additional_state_indices.shape[0]))
additional_state_indices[::2] = 4 + 2 * temp_additional_state_indices
additional_state_indices[1::2] = 5 + 2 * temp_additional_state_indices
self.x_indices_to_update = self.x_indices_to_update_base.union(additional_state_indices.astype(int))
self.z_indices_to_update = known_indices
self.z_indices_to_initialize = unknown_indices
self.z = geometry.tranform_cartesian_to_polar_cone_coords(z)
self.z_cartesian = z
self.global_observed_landmarks = global_observed_landmarks
self.R = self.calculate_polar_cone_covariance(self.z)
self.execute_update_postprocess = True
return self.active and self.z_to_update.shape[0]
[docs] def postprocess_update(self) -> None:
"""
Postprocess update step for this sensor.
Used to intitialize newly observed landmarks as new state variables and save the current observed landmarks.
"""
global_observed_landmarks = self.convert_measurement_to_global_landmarks(self.z_cartesian)
polar_local_observed_coords = self.z
for landmark_i in self.mapping:
if landmark_i != -1:
self.filter.landmark_perceived_n[landmark_i] += 1
if not self.update_strategy == LandmarkUpdateStrategy.DO_NOT_INITIALIZE:
for global_observed_landmark, polar_local_observed_coord in \
zip(global_observed_landmarks[self.z_indices_to_initialize],
polar_local_observed_coords[self.z_indices_to_initialize]):
self.initialize_landmark(global_observed_landmark, polar_local_observed_coord)
self.filter.current_global_landmarks = global_observed_landmarks
self.execute_update_postprocess = False
[docs] def convert_measurement_to_global_landmarks(self, z: npt.NDArray) -> npt.NDArray:
"""
Converts cartesian coordinates of local observed coordinates to global cartesian coordinates.
Parameters
----------
z : npt.NDArray
Current measurement vector of the sensor. [(x, y, type, probability, is_lidar), ...].
Returns
-------
npt.NDArray
Global cartesian coordinates of observed landmarks.
"""
# move measured landmarks by camera offset, rotate around global heading and move by global position
is_lidar_index = z[:, 4].astype(bool)
global_landmarks = geometry.rotate_and_move(z[:, 0:2].copy(), rotation=self.filter.x[3],
offset_after_rotation=self.filter.x[0:2],
offset_before_rotation=self.camera_offset, offset_before_rotation_lidar=self.lidar_offset, is_lidar_index=is_lidar_index)
# append landmark id and probability again
global_landmarks = np.append(global_landmarks, z[:, 2:4], axis=1)
return global_landmarks
[docs] def initialize_landmark(self, global_landmark: npt.NDArray, local_coords_polar: npt.NDArray) -> None:
"""
Initializes new, unknown landmark. Expands state vector x and system noise covariance matrices P and Q.
Parameters
----------
global_landmark : npt.NDArray
Global position and type of landmark.
local_coords_polar : npt.NDArray
Local position of landmark in polar coordinates.
"""
dims_x = self.filter.x.shape[0]
# expand state vector, landmark id (color, size) and covariance matrix
self.filter.x = np.append(self.filter.x, global_landmark[0:2])
self.filter.landmark_ids.append(global_landmark[2])
self.filter.fake_landmark_ids.append(global_landmark[2])
self.filter.landmark_perceived_n.append(1)
self.filter.P = np.append(self.filter.P, np.zeros((2, dims_x)), axis=0)
self.filter.P = np.append(self.filter.P, np.zeros((dims_x+2, 2)), axis=1)
# calculate cartesian cone covariance
local_coords_polar = local_coords_polar.copy()
local_coords_polar[0] += self.filter.x[3]
cartesian_cone_covariance = self.calculate_cartesian_cone_covariance(np.atleast_2d(local_coords_polar))
self.filter.P[dims_x:dims_x+2, dims_x:dims_x+2] = np.around(cartesian_cone_covariance + self.filter.P[:2, :2],
decimals=self.filter.max_decimals)
[docs] @staticmethod
def measurement_mean(sigmas: npt.NDArray, Wm: npt.NDArray) -> npt.NDArray:
"""
Calculates the weighted mean of multiple measurement vectors.
Parameters
----------
sigmas : npt.NDArray
Sigma points of the measurement vector.
Wm : npt.NDArray
Weights for the calculation of the weighted mean of the measurement.
Returns
-------
npt.NDArray
Weighted mean of the sigma points of the measurement vector.
"""
z = np.empty(sigmas.shape[1])
# weighted circular mean for the azimuth angle of the polar coordinates
z[::2] = circmean(sigmas[:, ::2], axis=0, weights=np.repeat(np.atleast_2d(Wm), repeats=z.shape[0]/2, axis=0).T)
# normal weights mean for the distance of the polar coordinates
z[1::2] = np.average(sigmas[:, 1::2], axis=0, weights=Wm)
return z
[docs] @staticmethod
def residual_sigmash_and_z(sigmas_h: npt.NDArray[np.floating],
z: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
"""
Calculates the difference between the measurement sigma points and the measurement.
Parameters
----------
sigmas_h : npt.NDArray[np.floating]
Sigma points for the measurement.
z : npt.NDArray[np.floating]
Measurement
Returns
-------
npt.NDArray[np.floating]
Difference between sigma points and measurements.
"""
dz = sigmas_h - z
dz[..., ::2] = math.normalize_rad_angles(dz[..., ::2])
return dz
[docs] def hx(self, sigmas_f: npt.NDArray[np.floating], u: Optional[npt.NDArray[np.floating]]) -> npt.NDArray[np.floating]:
"""
Computes the measurement vector the sensor should measure based on the current state vector and camera offset.
Parameters
----------
sigmas_f : npt.NDArray[np.floating]
Sigma point of the current state vector of the system.
u : Optional[npt.NDArray[np.floating]]
Current control input vector of the system. Unused for this sensor.
Returns
-------
npt.NDArray[np.floating]
Measurement state vector that should be measured given the current state vector.
"""
if self.filter.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES:
mapping = np.argsort(np.argsort(self.mapping[self.z_indices_to_update]))
else:
mapping = self.mapping[self.z_indices_to_update]
if self.update_strategy == LandmarkUpdateStrategy.NO_POSE:
heading = sigmas_f[0, 3]
position = sigmas_f[0, np.newaxis, 0:2]
elif self.update_strategy == LandmarkUpdateStrategy.NO_HEADING:
heading = sigmas_f[0, 3]
position = sigmas_f[:, np.newaxis, 0:2]
else:
heading = sigmas_f[:, 3]
position = sigmas_f[:, np.newaxis, 0:2]
rot_matrix = np.empty((sigmas_f.shape[0], 2, 2))
cos, sin = np.cos(heading), np.sin(heading)
rot_matrix[:, 0, 0], rot_matrix[:, 0, 1], rot_matrix[:, 1, 0], rot_matrix[:, 1, 1] = cos, sin, -sin, cos
global_estimated_measured_landmarks = sigmas_f[:, 4:].reshape(
(sigmas_f.shape[0], -1, 2))[:, mapping] - position
local_estimated_measured_landmarks = np.einsum(
'ijk, ilk -> ilj', rot_matrix, global_estimated_measured_landmarks) - self.camera_offset
landmark_azimuth = np.arctan2(local_estimated_measured_landmarks[..., 1],
local_estimated_measured_landmarks[..., 0])
landmark_distance = np.hypot(local_estimated_measured_landmarks[..., 1],
local_estimated_measured_landmarks[..., 0])
return np.dstack((landmark_azimuth, landmark_distance)).reshape((sigmas_f.shape[0], -1))
[docs] def hx_inverse(self, z: np.ndarray, u: Optional[npt.NDArray] = None) -> npt.NDArray:
"""
Calculate the state vector based on the current measurement of the sensor.
# TODO: Implement :D .
Not every state can be calculated, so only part of the state vector will be calculated.
Use the transformer to convert lat, lon to local coordinates
Parameters
----------
z : np.ndarray
Measurement vector, can be a matrix with multiple time steps.
Returns
-------
np.ndarray
State vector, can be a matrix with multiple time steps.
"""
return z
[docs] def limit_to_observable_landmarks(self) -> Tuple[npt.NDArray, npt.NDArray]:
"""
Limits and returns the observable subset of all tracked landmarks based on the current vehicle state and FoV.
Returns
-------
Tuple[npt.NDArray, npt.NDArray]
Position of the landmarks and mapping between the observable landmarks and all tracked landmarks.
"""
assert self.filter.landmarks is not None
path = geometry.generate_fov_path(fov_min_distance=self.fov_min_distance, fov_max_distance=self.fov_max_distance,
fov_angle=self.fov_angle, fov_scaling=self.fov_scaling,
offset_after_rotation=self.filter.x[0:2],
offset_before_rotation=self.camera_offset, rotation=self.filter.x[3])
points, indices = geometry.limit_coordinates_inside_path(coordinates=self.filter.landmarks[:, 0:2], path=path)
return points, indices
[docs] def compute_landmark_mapping(self, z: npt.NDArray, observable_landmarks: npt.NDArray,
observable_mapping: npt.NDArray) -> npt.NDArray:
"""
Computes the mapping between the currently observed landmarks and all tracked landmarks.
Only uses the currently oberservable landmarks for the calculation.
Later translate this mapping into the desired one.
Very naive and inconsistent approach. Calculates the mahalanobis distance for every possible combination.
Parameters
----------
z : npt.NDArray
Current measurement vector of the sensor. [(x, y, type, propability), ...].
observable_landmarks : npt.NDArray
Subset of all tracked landmarks that could be observed right now.
observable_mapping : npt.NDArray
Mapping / Association between the observable landmarks and all tracked landmarks.
Returns
-------
npt.NDArray
Mapping / Associations between the observed landmarks and all tracked landmarks.
"""
if observable_landmarks.shape[0] == 0:
return np.full(z.shape[0], -1)
temp_observable_indices = 2 * observable_mapping
observable_indices = np.sort(np.append(4 + temp_observable_indices, 4 + temp_observable_indices + 1))
observable_state_covariance = self.filter.P[np.ix_(observable_indices, observable_indices)]
# use and execute data association between the observed and observable landmarks using the JCBB algorithm
if self.data_association == helpers.DataAssociation.ClusterJCBB:
JCBB = data_association.ClusterJCBB
elif self.data_association == helpers.DataAssociation.JCBB:
JCBB = data_association.JCBB
else:
logging.critical('The specified data association algorithm is not implemented. Exiting')
exit(137)
jcbb = JCBB(observed_landmarks=z, observable_landmarks=observable_landmarks,
observable_state_covariance=observable_state_covariance,
observable_to_global_mapping=observable_mapping,
use_kd_tree=self.use_kd_tree, chi2quantile=self.chi2_quantile, verifies_landmark_id=True)
self.filter.observed_landmarks = z
self.filter.observable_landmarks = observable_landmarks
self.filter.observable_state_covariance = observable_state_covariance
# save the mapping between the observable and all tracked landmarks for plotting later on
self.filter.observable_to_global_mapping = observable_mapping
# save the association between the observed and observable landmarks for plotting later on
self.filter.individual_compatability = jcbb.individual_compatibility
mapping = jcbb.best_associations.array
self.filter.mapping = mapping
self.filter.local_mapping_nodes_explored_n = jcbb.nodes_explored_n
self.filter.local_mapping_compared_associations_n = len(jcbb.associations_objects)
self.clusters = jcbb.clusters
return mapping
[docs] def compute_known_indices(self, mapping: npt.NDArray) -> Tuple[npt.NDArray, npt.NDArray]:
"""
Computes indices of which measured landmarks are known and which are unknown
Parameters
----------
mapping : npt.NDArray
Mapping / association between the observed landmarks and all tracked landmarks.
Returns
-------
Tuple[npt.NDArray, npt.NDArray]
Indices of all observed landmarks that are already tracked,
Indices of all observed landmarks that are not already tracked.
"""
indices = mapping >= 0
return indices, np.invert(indices)
[docs] def calculate_polar_cone_covariance(self, polar_cones: npt.NDArray) -> npt.NDArray:
"""
Computes the covariance matrix in polar coordinates for given set of cones in polar coordinates.
Uses a predefined variance for the azimut angle.
Uses a percentage of the distance for the standard deviation of the distance.
Parameters
----------
polar_cones : npt.NDArray
Coordinates of the cones in polar coordinates.
Returns
-------
npt.NDArray
Cones covariances in polar coordinates.
"""
diagonal = np.empty(polar_cones.shape[0]*2)
diagonal[::2] = np.full(polar_cones.shape[0], self.R_azimut)
diagonal[1::2] = (polar_cones[:, 1] * self.R_distance_factor + self.R_offset)**2
return np.diag(diagonal)
[docs] def calculate_cartesian_cone_covariance(self, polar_cone: npt.NDArray) -> npt.NDArray:
"""
Computes the covariance matrix in cartesian coordinates for a given cone in polar coordinates. :cite:p:`Hall1992`
Parameters
----------
polar_cone : npt.NDArray
Coordinates of the cone in polar coordinates.
Returns
-------
npt.NDArray
Cone covariance in cartesian coordinates.
"""
polar_covariance = self.calculate_polar_cone_covariance(polar_cone)
adapted_covariance = np.diag([polar_covariance[1, 1],
np.square(polar_cone[0, 1] * np.tan(np.sqrt(polar_covariance[0, 0])))])
rotation_matrix = np.array([[np.cos(polar_cone[0, 0]), -np.sin(polar_cone[0, 0])],
[np.sin(polar_cone[0, 0]), np.cos(polar_cone[0, 0])]])
return rotation_matrix @ adapted_covariance @ rotation_matrix.T
[docs] @staticmethod
def get_color_for_cone_ids(cone_ids: npt.NDArray) -> npt.NDArray:
return np.choose(cone_ids.astype(int), LocalMapping._colors)
[docs] @staticmethod
def get_size_for_cone_ids(cone_ids: npt.NDArray) -> npt.NDArray:
return np.choose(cone_ids.astype(int), LocalMapping._size)
[docs] def get_prerecorded_z_and_R(self, last_prediction_step: int, i: int) -> Tuple[npt.NDArray, npt.NDArray]:
"""
Just get the current measurement.
Parameters
----------
last_prediction_step : int
Last executed prediction step.
i : int
Current index of the prerecorded measurement and measurement noise array.
"""
self.z = self.z_array[i]
self.R = None
@property
def z_to_update(self):
"""
Returns measurements to update.
Returns
-------
npt.NDArray
Measurements to update
"""
return self.z[self.z_indices_to_update, 0:2].flatten()
@property
def R_to_update(self):
"""
Returns measurement noise to update.
Returns
-------
npt.NDArray
Measurement noise for the measurements to update.
"""
R_indices_to_update = np.empty(2*self.z_indices_to_update.shape[0], dtype=bool)
R_indices_to_update[::2] = self.z_indices_to_update
R_indices_to_update[1::2] = self.z_indices_to_update
return self.R[R_indices_to_update][:, R_indices_to_update]