Source code for utils.data_association

"""
Implements multiple classes and function used for the data association of landmarks
"""
from __future__ import annotations

from enum import Enum
from typing import List, Optional

import numpy.typing as npt
import numpy as np

from scipy.spatial import distance, KDTree
from scipy.stats.distributions import chi2


[docs]class DataAssociationMetric(Enum): """ Enum to describe which metric to use to measure the compatilibity of two landmarks. """ MAHALANOBIS_DISTANCE = 1 MAXIMUM_LIKELIHOOD = 2
[docs]class Associations(): """ Class to represent a set of established associations between observed and observable landmarks. The associations are saved in a numpy array. Each index represents an observed landmark identified by its index and the respective value describes the associated observable landmark identified by its index. Attributes ---------- array : npt.NDArray Numpy array saving the established associations between the observed and observable landmarks. size : int Number of established associations. """ def __init__(self, observed_landmarks_n: Optional[int] = None, array: npt.NDArray = None, size: Optional[int] = None) -> None: """ Initilization function for a new set of associations. One either passes the number of observed landmarks in this update step to generate a completely new set or an already existing array to generate a new copy. Parameters ---------- observed_landmarks_n : Optional[int], optional Number of observed landmarks in this update step to initialize the numpy array, by default None array : npt.NDArray, optional Associations array to create a copy of an existing associations object, by default None """ if array is None: assert observed_landmarks_n is not None self.size = 0 self.array = np.full(shape=(observed_landmarks_n, ), fill_value=np.nan, dtype=float) else: # copy constructor assert size is not None self.size = size self.array = array.copy() @property def is_empty(self) -> bool: """ Gets whether at least one association has been established yet. Returns ------- bool Whether at least one association has been established yet. """ return self.size == 0 @property def copy(self) -> Associations: """ Gets a deepcopy of the current Associations object by generating a new one with a copy of the array. Returns ------- Associations Copy of the current associations object. """ return Associations(array=self.array, size=self.size) @property def flat_observed_landmark_indices(self) -> npt.NDArray: """ Gets indices of state vector and state covariance observed landmarks as flat numpy array. Returns ------- npt.NDArray Indices of observed landmarks as flat numpy array. """ indices = np.argwhere(~np.isnan(self.array))[:, 0] return self.coords_indices_to_flat_indices(indices) @property def flat_observable_landmark_indices(self) -> npt.NDArray: """ Gets indices for state vector and state covariance of observable landmarks as flat numpy array. Returns ------- npt.NDArray Indices of observable landmarks as flat numpy array. """ values = self.array[np.where(~np.isnan(self.array))[0]] return self.coords_indices_to_flat_indices(values)
[docs] @staticmethod def coords_indices_to_flat_indices(indices: npt.NDArray) -> npt.NDArray: """ Converts landmark indices to flat numpy array containing the indices of this landmarks for the state vector. Parameters ---------- indices : npt.NDArray Indices of landmarks to generate the flat numpy array for. Returns ------- npt.NDArray Indices of landmarks as flat numpy array. """ length = indices.shape[0] flatten_indices = np.empty((2*length)) flatten_indices[::2] = 2*indices flatten_indices[1::2] = 2*indices + 1 return flatten_indices.astype(dtype=int)
[docs] def translate_to_global_mapping(self, observable_to_global_mapping: Optional[List[int]] = None): """ Translates the associations between the observed and observable landmarks to associations between the observed landmarks and all tracked landmarks. Parameters ---------- observable_to_global_mapping : Optional[List[int]], optional Mapping between the observable landmarks to all tracked landmarks, by default None """ if observable_to_global_mapping is not None: self.array[np.isnan(self.array)] = -1 self.array = np.take(np.append(observable_to_global_mapping, -1), self.array.astype(dtype=int))
[docs]class JCBB(): """ Class to represent and execute the Joint compatibility branch and bound (JCBB) algorithm for a given set of observed and observable landmarks. Code has been ported from the C++ Implementation of the mobile robot programming toolkit. https://github.com/MRPT/mrpt/blob/develop/libs/slam/src/slam/data_association.cpp There have been made three changes compared to the implementation above. 1. The joint complatibility (mahalanobis distance of all associations) is only calculated at the end for all associations sets with the highest number of established associations to determine the best association set. 2. There has been prepared a feature to calculate in a more efficient way, so we believe: We take into consideration that an observed landmark might not have any individual compatible observable landmark left since all individual compatible observable landmarks have either already been associated or willingly not. 3. This implementation only supports the mahalanobis distance and not the maximum likelihood. Attributes ---------- observed_landmarks : npt.NDArray Mx(O+1) Numpy array representing the corodinates and the id of the observed landmarks. M being the number of landmarks and O the dimension of coordinates. observable_landmarks : npt.NDArray Nx(O+1) Numpy array representing the coordinates and the id of the observable landmarks. M being the number of landmarks and O the dimension of coordinates. observable_state_covariance : npt.NDArray Covariance matrix of the observable landmarks. Dimension and explanation will follow. observed_landmarks_n : int Number of observed landmarks. observable_landmarks_n : int Number of observable landmarks. associations_objects : List[Associations] List of Associations objects (sets) that have the highest number of established associations and thus are candidates for the best associations set. best_associations : Associations Best Associations object, gets assigned at the end and must be an element of associations_objects. best_distance : float Mahalanobis distance of the currently best associations set. individual_compatibility : npt.NDArray Compatibility matrix between the observed and observable landmarks based on the individual distance and color. individual_distances : npt.NDArray Matrix containing the individual distances between the observed and observable landmarks. individual_compatibility_counts : npt.NDArray Array containing the count of compatible observable landmarks for the respective observed landmark. nodes_explored_n : int Number of explored nodes of the search tree for the best associations set. association_metric : AssociationMetric Which metric to use to compare different associations sets. use_kd_tree : bool Option to specify whether a kd tree should be used for evaluating which pairings should be considered. verifies_landmark_id : bool Specified whether the landmark id (color) of observed and observable landmark should match. inverse_observable_landmark_covariances : npt.NDArray Inverse of the covariance matrix for all observable covariances so that it only has to be calculated once. chi2thres : float Threshold to evaluate whether two landmarks are compatible given their mahalanobis distance. observable_landmarks_flat_coords : npt.NDArray Coordinates of all observable landmarks flattened. observed_landmarks_flat_coords : npt.NDArray Coordinates of all observed landmarks flattened. """ def __init__(self, observed_landmarks: npt.NDArray, observable_landmarks: npt.NDArray, observable_state_covariance: npt.NDArray, use_kd_tree: bool = True, chi2quantile: float = 0.5, verifies_landmark_id: bool = True, observable_to_global_mapping: Optional[List[int]] = None) -> None: """ Initizalizes the JCBB class. Parameters ---------- observed_landmarks : npt.NDArray Mx(O+1) Numpy array representing the corodinates and the id of the observed landmarks. M being the number of landmarks and O the dimension of coordinates. observable_landmarks : npt.NDArray Nx(O+1) Numpy array representing the coordinates and the id of the observable landmarks. M being the number of landmarks and O the dimension of coordinates. observable_state_covariance : npt.NDArray Covariance matrix of the observable landmarks. Dimension and explanation will follow. use_kd_tree : bool, optional Option to specify whether a kd tree should be used for evaluating which pairings should be considered, by default True chi2quantile : float, optional Chi^2 quantile to calculate the threshold for evaluate the compatibility of two landmarks, by default 0.5 verifies_landmark_id : bool, optional Specified whether the landmark id (color) of observed and observable landmark should match., by default True observable_to_global_mapping : Optional[List[int]], optional Mapping between the observable landmarks to all tracked landmarks., by default None """ self.observed_landmarks = observed_landmarks self.observable_landmarks = observable_landmarks # filtered with FOV gate self.observable_state_covariance = observable_state_covariance self.observed_landmarks_n = observed_landmarks.shape[0] self.observable_landmarks_n = observable_landmarks.shape[0] self.observable_landmarks_flat_coords: npt.NDArray self.observed_landmarks_flat_coords: npt.NDArray self.associations_objects: List[Associations] # canditates for best associations self.best_associations: Associations self.best_distance = np.nan self.individual_compatibility = np.zeros((self.observed_landmarks_n, self.observable_landmarks_n), dtype=bool) self.individual_distances = np.full((self.observed_landmarks_n, self.observable_landmarks_n), np.nan) self.individual_compatibility_counts = np.zeros(self.observed_landmarks_n, dtype=int) self.nodes_explored_n = 0 self.association_metric = DataAssociationMetric.MAHALANOBIS_DISTANCE self.use_kd_tree = use_kd_tree self.verifies_landmark_id = verifies_landmark_id self.inverse_observable_landmark_covariances = np.empty((self.observable_landmarks_n, 2, 2)) self.chi2thres = chi2.ppf(chi2quantile, df=2) # TODO: what value to use? :( self.prepare() self.associate_with_full_covariance() self.clusters: List[Cluster] = [Cluster(observed_landmark_indices=np.full((self.observed_landmarks_n), True), individual_compatibility=self.individual_compatibility, observable_landmark_indices=np.full((self.observable_landmarks_n), True), order=np.arange(self.observed_landmarks_n))] self.clusters[0].best_associations = self.best_associations self.best_associations.translate_to_global_mapping(observable_to_global_mapping=observable_to_global_mapping)
[docs] def prepare(self): """ Function to prepare the data for the JCBB algorithm to save some computation time. """ matrices = [self.observable_state_covariance[2*observable_landmark_i:2*observable_landmark_i + 2, 2*observable_landmark_i:2*observable_landmark_i + 2] for observable_landmark_i in range(self.observable_landmarks_n)] self.inverse_observable_landmark_covariances = np.linalg.inv(matrices) self.observable_landmarks_flat_coords = self.observable_landmarks[:, 0:2].flatten() self.observed_landmarks_flat_coords = self.observed_landmarks[:, 0:2].flatten()
[docs] def recursive(self, observed_landmark_i: int, current_associations: Associations): """ Function to search for the best associations set recursivly. Parameters ---------- observed_landmark_i : int Index of the current observed landmark to try all possible not already associated observable landmarks. current_associations : Associations Current already established associations for this branch of the search tree. """ # Is it the end of the iteration? --> one possible termination condition if observed_landmark_i >= self.observed_landmarks_n: if current_associations.size > self.associations_objects[0].size: # This must be a better choice since more features are matched, so all other candidates can be forgotten self.associations_objects = [current_associations] elif not current_associations.is_empty and current_associations.size == self.associations_objects[0].size: # otherwise just add his associations set to the candidates for the best associations set self.associations_objects.append(current_associations) else: # normal iteration # how many association can be still established at this point potentials = np.count_nonzero(self.individual_compatibility_counts[observed_landmark_i:]) # if the current highest number of established association can still be topped, continue. if current_associations.size + potentials >= self.associations_objects[0].size: # loop over all observable landmarks for this observed landmark for observable_landmark_i in range(self.observable_landmarks_n): # continue if this observed landmark is compatible with this observable landmark if self.individual_compatibility[observed_landmark_i, observable_landmark_i]: # if there isn't already an association within the associations set for this observable landmark if observable_landmark_i not in current_associations.array: # search along this branch then new_current_associations = current_associations.copy new_current_associations.array[observed_landmark_i] = observable_landmark_i new_current_associations.size += 1 self.nodes_explored_n += 1 self.recursive(observed_landmark_i=observed_landmark_i + 1, current_associations=new_current_associations) # if this observed landmark had any compatible observable landmark, there is one potential less # by not associating any observable landmark with this observed landmark. if self.individual_compatibility_counts[observed_landmark_i] > 0: potentials -= 1 if current_associations.size + potentials >= self.associations_objects[0].size: self.nodes_explored_n += 1 self.recursive(observed_landmark_i=observed_landmark_i + 1, current_associations=current_associations)
[docs] def mahalanobis_distance_of_associations(self, current_associations: Associations) -> float: """ Mahalanobis distance for a given set of associations. Is equivalent to the joint compatibility. Parameters ---------- current_associations : Associations Associations set to calculate the joint compatibility for. Returns ------- float Mahalanobis distance for the given set of associations. """ associations_size = current_associations.size assert(associations_size > 0) flat_observed_landmarks_indices = current_associations.flat_observed_landmark_indices flat_observable_landmarks_indices = current_associations.flat_observable_landmark_indices observed_landmarks_coords = self.observed_landmarks_flat_coords[flat_observed_landmarks_indices] observable_landmarks_coords = self.observable_landmarks_flat_coords[flat_observable_landmarks_indices] observable_landmarks_covarince = self.observable_state_covariance[np.ix_(flat_observable_landmarks_indices, flat_observable_landmarks_indices)] return self.calculate_metric(observed_landmarks_coords, observable_landmarks_coords, np.linalg.inv(observable_landmarks_covarince), self.association_metric)
[docs] def verify_landmark_id(self, observed_landmark_i: int, observable_landmark_i: int) -> bool: """ Verifies whether the landmark id (color) of the given observed and observable landmark match. Returns always True if it has been specified that the landmark id shouldn't be verified. Parameters ---------- observed_landmark_i : int Index of the observed landmark the landmark id should be verified for. observable_landmark_i : int Index of the observable landmark the landmark id should be verified for. Returns ------- bool Whether the landmark id (color) of the given observed and observable landmark match. """ if not self.verifies_landmark_id: return True return self.observed_landmarks[observed_landmark_i, 2] == self.observable_landmarks[observable_landmark_i, 2]
[docs] def associate_with_full_covariance(self): """ Calculate best association between the observed and observable landmarks. """ # at this moment only the mahalanobis distance is implemented. assert self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE # somehow N_KD_RESULTS = self.observable_landmarks_n kd_tree = KDTree(self.observable_landmarks[:, 0:2]) # init worst case distance? hopefully np.nan, what is done in the init # First calculate the individual compatibility between the observed and observable landmarks. for observed_landmark_i in range(self.observed_landmarks_n): if self.use_kd_tree is False: # if the kd tree should not be used, loop over every observable landmark for this observed landmark for observable_landmark_i in range(self.observable_landmarks_n): # cannot be compatible if the landmark id (color) doesn't match if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i): continue mahalanobis_distance = distance.mahalanobis( self.observed_landmarks[observed_landmark_i], self.observable_landmarks[observable_landmark_i], self.inverse_observable_landmark_covariances[observable_landmark_i]) # if the mahalanobis distance is larger than the specified threshold, they are not compatible. if(mahalanobis_distance > self.chi2thres): continue # save that they are compatible and their compatibility. self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True self.individual_compatibility_counts[observed_landmark_i] += 1 elif self.use_kd_tree is True: # if the kd tree should not be used, loop over all observable landmarks beginning with the nearest # and break if one is too far from the observed landmark. Everything else stays the same as above. _, indices = kd_tree.query(self.observed_landmarks[observed_landmark_i, 0:2], k=N_KD_RESULTS) indices = np.atleast_1d(indices) for result_index in range(indices.shape[-1]): observable_landmark_i = indices[result_index] if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i): continue mahalanobis_distance = distance.mahalanobis( self.observed_landmarks[observed_landmark_i, 0:2], self.observable_landmarks[observable_landmark_i, 0:2], self.inverse_observable_landmark_covariances[observable_landmark_i]) if(mahalanobis_distance > self.chi2thres): break self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True self.individual_compatibility_counts[observed_landmark_i] += 1 # initialize the candidates for the best associations set with an empty one. self.associations_objects = [Associations(observed_landmarks_n=self.observed_landmarks_n)] # start the recursive search of the search tree for the best associations sets self.recursive(observed_landmark_i=0, current_associations=self.associations_objects[0].copy) # if there are multiple candidates for the best associations set (same number of associations) if len(self.associations_objects) > 1: self.best_associations = None # choose the one with the lowest mahalanobis distance for associations_object in self.associations_objects: mahalanobis_distance = self.mahalanobis_distance_of_associations(associations_object) if self.is_closer(mahalanobis_distance, self.best_distance): self.best_associations = associations_object self.best_distance = mahalanobis_distance else: # otherwise take the one best. self.best_associations = self.associations_objects[0]
[docs] @staticmethod def calculate_metric(observation: npt.NDArray, prediction: npt.NDArray, covariance: npt.NDArray, metric: DataAssociationMetric) -> float: """ Calculate metric between an observation and a prediction scaled by the covariance of the prediction. Parameters ---------- observation : npt.NDArray Observed landmark(s). prediction : npt.NDArray Observable landmark(s). covariance : npt.NDArray Covariance of the observable landmark(s). metric : DataAssociationMetric Metric to calculate. Returns ------- float Calculated metric. """ mahalanobis_distance = distance.mahalanobis(observation, prediction, covariance) if metric == DataAssociationMetric.MAHALANOBIS_DISTANCE: return mahalanobis_distance assert metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD dovariance_det = np.linalg.det(covariance) maximum_likelihood = np.exp(np.sqrt(mahalanobis_distance) / -2) / (np.sqrt(2 * np.pi) * np.sqrt(dovariance_det)) return maximum_likelihood
[docs] def is_closer(self, distance1: float, distance2: float) -> bool: """ Returns whether the first distance is smaller than the second distance and thus its landmarks are closer. Based on the metric to use. Parameters ---------- distance1 : float First distance. distance2 : float First distance. Returns ------- bool Whether the first distance is smaller than the second distance and thus its landmarks are closer. """ if np.isnan(distance2): return True if np.isnan(distance1): return False if self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE: return distance1 < distance2 elif self.association_metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD: return distance1 > distance2
[docs]class Cluster(): def __init__(self, observed_landmark_indices: npt.NDArray, individual_compatibility: npt.NDArray, observable_landmark_indices: npt.NDArray, order: npt.NDArray): """ Class to represent a cluster of observed landmark that can be searched for the best associations set independently from other landmarks cluster. Parameters ---------- observed_landmark_indices : npt.NDArray Numpy boolean array masking which observed landmarks are part of this cluster. observable_landmark_indices : npt.NDArray Numpy boolean array masking which observable landmarks are part of this cluster. order : npt.NDArray Numpy array containing the array by which the the compatibility matrix has been ordered for the clustering. """ observed_order = np.argsort(np.count_nonzero(individual_compatibility[observed_landmark_indices][:, observable_landmark_indices], axis=1))[::-1] self.observed_landmark_indices = np.nonzero(observed_landmark_indices[np.argsort(order)])[0][observed_order] observable_order = np.argsort(np.count_nonzero(individual_compatibility[observed_landmark_indices][:, observable_landmark_indices], axis=0))[::-1] self.observable_landmark_indices = np.nonzero(observable_landmark_indices)[0][observable_order] self.observed_landmarks_n = np.count_nonzero(observed_landmark_indices) self.observable_landmarks_n = np.count_nonzero(observable_landmark_indices) self.associations_objects = List[Associations] self.best_associations: Associations self.best_distance = np.nan
[docs]class ClusterJCBB(): """ Class to represent and execute the Joint compatibility branch and bound (JCBB) algorithm for a given set of observed and observable landmarks. Code has been ported from the C++ Implementation of the mobile robot programming toolkit. https://github.com/MRPT/mrpt/blob/develop/libs/slam/src/slam/data_association.cpp There have been made three changes compared to the implementation above. 1. The joint complatibility (mahalanobis distance of all associations) is only calculated at the end for all associations sets with the highest number of established associations to determine the best association set. 2. There has been prepared a feature to calculate in a more efficient way, so we believe: We take into consideration that an observed landmark might not have any individual compatible observable landmark left since all individual compatible observable landmarks have either already been associated or willingly not. 3. This implementation only supports the mahalanobis distance and not the maximum likelihood. Attributes ---------- observed_landmarks : npt.NDArray Mx(O+1) Numpy array representing the corodinates and the id of the observed landmarks. M being the number of landmarks and O the dimension of coordinates. observable_landmarks : npt.NDArray Nx(O+1) Numpy array representing the coordinates and the id of the observable landmarks. M being the number of landmarks and O the dimension of coordinates. observable_state_covariance : npt.NDArray Covariance matrix of the observable landmarks. Dimension and explanation will follow. observed_landmarks_n : int Number of observed landmarks. observable_landmarks_n : int Number of observable landmarks. associations_objects : List[Associations] List of Associations objects (sets) that have the highest number of established associations and thus are candidates for the best associations set. best_associations : Associations Best Associations object, gets assigned at the end and must be an element of associations_objects. best_distance : float Mahalanobis distance of the currently best associations set. individual_compatibility : npt.NDArray Compatibility matrix between the observed and observable landmarks based on the individual distance and color. individual_distances : npt.NDArray Matrix containing the individual distances between the observed and observable landmarks. individual_compatibility_counts : npt.NDArray Array containing the count of compatible observable landmarks for the respective observed landmark. nodes_explored_n : int Number of explored nodes of the search tree for the best associations set. association_metric : AssociationMetric Which metric to use to compare different associations sets. use_kd_tree : bool Option to specify whether a kd tree should be used for evaluating which pairings should be considered. verifies_landmark_id : bool Specified whether the landmark id (color) of observed and observable landmark should match. inverse_observable_landmark_covariances : npt.NDArray Inverse of the covariance matrix for all observable covariances so that it only has to be calculated once. chi2thres : float Threshold to evaluate whether two landmarks are compatible given their mahalanobis distance. observable_landmarks_flat_coords : npt.NDArray Coordinates of all observable landmarks flattened. observed_landmarks_flat_coords : npt.NDArray Coordinates of all observed landmarks flattened. clusters : List[Cluster] List of all landmark clusters. """ def __init__(self, observed_landmarks: npt.NDArray, observable_landmarks: npt.NDArray, observable_state_covariance: npt.NDArray, use_kd_tree: bool = True, chi2quantile: float = 0.5, verifies_landmark_id: bool = True, observable_to_global_mapping: Optional[List[int]] = None) -> None: """ Initizalizes the JCBB class. Parameters ---------- observed_landmarks : npt.NDArray Mx(O+1) Numpy array representing the coordinates and the id of the observed landmarks. M being the number of landmarks and O the dimension of coordinates. observable_landmarks : npt.NDArray Nx(O+1) Numpy array representing the coordinates and the id of the observable landmarks. M being the number of landmarks and O the dimension of coordinates. observable_state_covariance : npt.NDArray Covariance matrix of the observable landmarks. Dimension and explanation will follow. use_kd_tree : bool, optional Option to specify whether a kd tree should be used for evaluating which pairings should be considered, by default True chi2quantile : float, optional Chi^2 quantile to calculate the threshold for evaluating the compatibility of two landmarks, by default 0.5 verifies_landmark_id : bool, optional Specified whether the landmark id (color) of observed and observable landmark should match, by default True observable_to_global_mapping : Optional[List[int]], optional Mapping between the observable landmarks to all tracked landmarks, by default None """ self.observed_landmarks = observed_landmarks self.observable_landmarks = observable_landmarks # filtered with FOV gate self.observable_state_covariance = observable_state_covariance self.observed_landmarks_n = observed_landmarks.shape[0] self.observable_landmarks_n = observable_landmarks.shape[0] self.observable_landmarks_flat_coords: npt.NDArray self.observed_landmarks_flat_coords: npt.NDArray self.associations_objects: List[Associations] = [] # candidates for best associations self.best_associations: Associations self.best_distance = np.nan self.individual_compatibility = np.zeros((self.observed_landmarks_n, self.observable_landmarks_n), dtype=bool) self.individual_distances = np.full((self.observed_landmarks_n, self.observable_landmarks_n), np.nan) self.individual_compatibility_counts = np.zeros(self.observed_landmarks_n, dtype=int) self.nodes_explored_n = 0 self.association_metric = DataAssociationMetric.MAHALANOBIS_DISTANCE self.use_kd_tree = use_kd_tree self.verifies_landmark_id = verifies_landmark_id self.inverse_observable_landmark_covariances = np.empty((self.observable_landmarks_n, 2, 2)) self.chi2thres = chi2.ppf(chi2quantile, df=2) # TODO: what value to use? :( self.clusters: List[Cluster] = [] self.prepare() self.associate_with_full_covariance() self.best_associations.translate_to_global_mapping(observable_to_global_mapping=observable_to_global_mapping)
[docs] def prepare(self): """ Function to prepare the data for the JCBB algorithm to save some computation time. """ matrices = [self.observable_state_covariance[2*observable_landmark_i:2*observable_landmark_i + 2, 2*observable_landmark_i:2*observable_landmark_i + 2] for observable_landmark_i in range(self.observable_landmarks_n)] self.inverse_observable_landmark_covariances = np.linalg.inv(matrices) self.observable_landmarks_flat_coords = self.observable_landmarks[:, 0:2].flatten() self.observed_landmarks_flat_coords = self.observed_landmarks[:, 0:2].flatten()
[docs] def recursive(self, cluster_observed_landmark_i: int, current_associations: Associations, cluster: Cluster): """ Function to search for the best associations set recursivly. Parameters ---------- cluster_observed_landmark_i : int Index of the current observed landmark to try all possible not already associated observable landmarks. current_associations : Associations Current already established associations for this branch of the search tree. cluster : Cluster Cluster for which the best associations are currently searched for. """ # Is it the end of the iteration? --> one possible termination condition if cluster_observed_landmark_i >= cluster.observed_landmarks_n: if current_associations.size > cluster.associations_objects[0].size: # This must be a better choice since more features are matched, so all other candidates can be forgotten cluster.associations_objects = [current_associations] elif not current_associations.is_empty and current_associations.size == cluster.associations_objects[0].size: # noqa: E501 # otherwise just add his associations set to the candidates for the best associations set cluster.associations_objects.append(current_associations) else: # normal iteration # how many association can be still established at this point observed_landmark_i = cluster.observed_landmark_indices[cluster_observed_landmark_i] potentials = np.count_nonzero(self.individual_compatibility_counts[cluster.observed_landmark_indices[cluster_observed_landmark_i:]]) # if the current highest number of established association can still be topped, continue. if current_associations.size + potentials >= cluster.associations_objects[0].size: # loop over all observable landmarks for this observed landmark for observable_landmark_i in cluster.observable_landmark_indices: # continue if this observed landmark is compatible with this observable landmark if self.individual_compatibility[observed_landmark_i, observable_landmark_i]: # if there isn't already an association within the associations set for this observable landmark if observable_landmark_i not in current_associations.array: # search along this branch then new_current_associations = current_associations.copy new_current_associations.array[observed_landmark_i] = observable_landmark_i new_current_associations.size += 1 self.nodes_explored_n += 1 self.recursive(cluster_observed_landmark_i=cluster_observed_landmark_i + 1, current_associations=new_current_associations, cluster=cluster) # if this observed landmark had any compatible observable landmark, there is one potential less # by not associating any observable landmark with this observed landmark. if self.individual_compatibility_counts[observed_landmark_i] > 0: potentials -= 1 if current_associations.size + potentials >= cluster.associations_objects[0].size: self.nodes_explored_n += 1 self.recursive(cluster_observed_landmark_i=cluster_observed_landmark_i + 1, current_associations=current_associations, cluster=cluster)
[docs] def mahalanobis_distance_of_associations(self, current_associations: Associations) -> float: """ Mahalanobis distance for a given set of associations. Is equivalent to the joint compatibility. Parameters ---------- current_associations : Associations Associations set to calculate the joint compatibility for. Returns ------- float Mahalanobis distance for the given set of associations. """ associations_size = current_associations.size assert(associations_size > 0) flat_observed_landmarks_indices = current_associations.flat_observed_landmark_indices flat_observable_landmarks_indices = current_associations.flat_observable_landmark_indices observed_landmarks_coords = self.observed_landmarks_flat_coords[flat_observed_landmarks_indices] observable_landmarks_coords = self.observable_landmarks_flat_coords[flat_observable_landmarks_indices] observable_landmarks_covarince = self.observable_state_covariance[np.ix_(flat_observable_landmarks_indices, flat_observable_landmarks_indices)] return self.calculate_metric(observed_landmarks_coords, observable_landmarks_coords, np.linalg.inv(observable_landmarks_covarince), self.association_metric)
[docs] def verify_landmark_id(self, observed_landmark_i: int, observable_landmark_i: int) -> bool: """ Verifies whether the landmark id (color) of the given observed and observable landmark match. Returns always True if it has been specified that the landmark id shouldn't be verified. Parameters ---------- observed_landmark_i : int Index of the observed landmark the landmark id should be verified for. observable_landmark_i : int Index of the observable landmark the landmark id should be verified for. Returns ------- bool Whether the landmark id (color) of the given observed and observable landmark match. """ if not self.verifies_landmark_id: return True return self.observed_landmarks[observed_landmark_i, 2] == self.observable_landmarks[observable_landmark_i, 2]
[docs] def associate_with_full_covariance(self): """ Calculate best association between the observed and observable landmarks. """ # at this moment only the mahalanobis distance is implemented. assert self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE # somehow N_KD_RESULTS = self.observable_landmarks_n kd_tree = KDTree(self.observable_landmarks[:, 0:2]) # init worst case distance? hopefully np.nan, what is done in the init # First calculate the individual compatibility between the observed and observable landmarks. for observed_landmark_i in range(self.observed_landmarks_n): if self.use_kd_tree is False: # if the kd tree should not be used, loop over every observable landmark for this observed landmark for observable_landmark_i in range(self.observable_landmarks_n): # cannot be compatible if the landmark id (color) doesn't match if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i): continue mahalanobis_distance = distance.mahalanobis( self.observed_landmarks[observed_landmark_i], self.observable_landmarks[observable_landmark_i], self.inverse_observable_landmark_covariances[observable_landmark_i]) # if the mahalanobis distance is larger than the specified threshold, they are not compatible. if(mahalanobis_distance > self.chi2thres): continue # save that they are compatible and their compatibility. self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True self.individual_compatibility_counts[observed_landmark_i] += 1 elif self.use_kd_tree is True: # if the kd tree should not be used, loop over all observable landmarks beginning with the nearest # and break if one is too far from the observed landmark. Everything else stays the same as above. _, indices = kd_tree.query(self.observed_landmarks[observed_landmark_i, 0:2], k=N_KD_RESULTS) indices = np.atleast_1d(indices) for result_index in range(indices.shape[-1]): observable_landmark_i = indices[result_index] if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i): continue mahalanobis_distance = distance.mahalanobis( self.observed_landmarks[observed_landmark_i, 0:2], self.observable_landmarks[observable_landmark_i, 0:2], self.inverse_observable_landmark_covariances[observable_landmark_i]) if(mahalanobis_distance > self.chi2thres): break self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True self.individual_compatibility_counts[observed_landmark_i] += 1 if np.count_nonzero(self.individual_compatibility) == 0: self.best_associations = Associations(array=np.full(self.observed_landmarks_n, -1), size=0) return self.find_clusters() # find the best associations set for every cluster for cluster in self.clusters: # initialize the candidates for the best associations set with an empty one. cluster.associations_objects = [Associations(observed_landmarks_n=self.observed_landmarks_n)] # start the recursive search of the search tree for the best associations sets self.recursive(cluster_observed_landmark_i=0, cluster=cluster, current_associations=cluster.associations_objects[0].copy) self.associations_objects += cluster.associations_objects # if there are multiple candidates for the best associations set (same number of associations) if len(cluster.associations_objects) > 1: cluster.best_associations = None # choose the one with the lowest mahalanobis distance for associations_object in cluster.associations_objects: mahalanobis_distance = self.mahalanobis_distance_of_associations(associations_object) if self.is_closer(mahalanobis_distance, cluster.best_distance): cluster.best_associations = associations_object cluster.best_distance = mahalanobis_distance else: # otherwise take the one best. cluster.best_associations = cluster.associations_objects[0] # combine every best association sets for each cluster to one associations set arrays = [cluster.best_associations.array for cluster in self.clusters] array = np.atleast_1d(np.nansum(arrays, axis=0)) array[np.all(np.isnan(arrays), axis=0)] = np.nan self.best_associations = Associations(array=array, size=np.count_nonzero(array))
[docs] def find_clusters(self): """ Find clusters of compatible landmarks in which the best associations can be searched for independently. """ order = np.argsort(self.individual_compatibility_counts) to_be_examined_individual_compatibility = self.individual_compatibility[order] cluster_oberved_indices_array = [] cluster_obervable_indices_array = [] while(to_be_examined_individual_compatibility.shape[0] > 0): if np.count_nonzero(to_be_examined_individual_compatibility[-1]) == 0: break old_observable_landmarks_in_cluster = to_be_examined_individual_compatibility[-1] while True: cluster_oberved_indices = np.count_nonzero( to_be_examined_individual_compatibility[:, old_observable_landmarks_in_cluster], axis=1) > 0 new_observable_landmarks_in_cluster = np.logical_or.reduce( to_be_examined_individual_compatibility[cluster_oberved_indices]) if np.array_equal(new_observable_landmarks_in_cluster, old_observable_landmarks_in_cluster): break old_observable_landmarks_in_cluster = new_observable_landmarks_in_cluster if len(cluster_oberved_indices_array) > 0: cluster_oberved_indices_mask = np.zeros(self.individual_compatibility.shape[0], dtype=bool) cluster_oberved_indices_mask[~np.logical_or.reduce(cluster_oberved_indices_array)] = \ cluster_oberved_indices else: cluster_oberved_indices_mask = cluster_oberved_indices cluster_oberved_indices_array.append(cluster_oberved_indices_mask) cluster_obervable_indices_array.append(old_observable_landmarks_in_cluster) self.clusters.append(Cluster(observed_landmark_indices=cluster_oberved_indices_mask, observable_landmark_indices=old_observable_landmarks_in_cluster, order=order, individual_compatibility=self.individual_compatibility[order])) to_be_examined_individual_compatibility = to_be_examined_individual_compatibility[~cluster_oberved_indices]
[docs] @staticmethod def calculate_metric(observation: npt.NDArray, prediction: npt.NDArray, covariance: npt.NDArray, metric: DataAssociationMetric) -> float: """ Calculate metric between an observation and a prediction scaled by the covariance of the prediction. Parameters ---------- observation : npt.NDArray Observed landmark(s). prediction : npt.NDArray Observable landmark(s). covariance : npt.NDArray Covariance of the observable landmark(s). metric : DataAssociationMetric Metric to calculate. Returns ------- float Calculated metric. """ mahalanobis_distance = distance.mahalanobis(observation, prediction, covariance) if metric == DataAssociationMetric.MAHALANOBIS_DISTANCE: return mahalanobis_distance assert metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD dovariance_det = np.linalg.det(covariance) maximum_likelihood = np.exp(np.sqrt(mahalanobis_distance) / -2) / (np.sqrt(2 * np.pi) * np.sqrt(dovariance_det)) return maximum_likelihood
[docs] def is_closer(self, distance1: float, distance2: float) -> bool: """ Returns whether the first distance is smaller than the second distance and thus its landmarks are closer. Based on the metric to use. Parameters ---------- distance1 : float First distance. distance2 : float First distance. Returns ------- bool Whether the first distance is smaller than the second distance and thus its landmarks are closer. """ if np.isnan(distance2): return True if np.isnan(distance1): return False if self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE: return distance1 < distance2 elif self.association_metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD: return distance1 > distance2