"""Implements multiple classes and function used for the data association of landmarks"""from__future__importannotationsfromenumimportEnumfromtypingimportList,Optionalimportnumpy.typingasnptimportnumpyasnpfromscipy.spatialimportdistance,KDTreefromscipy.stats.distributionsimportchi2
class DataAssociationMetric(Enum):
    """Enum to describe which metric to use to measure the compatibility of two landmarks."""

    MAHALANOBIS_DISTANCE = 1
    MAXIMUM_LIKELIHOOD = 2

class Associations():
    """
    Class to represent a set of established associations between observed and observable landmarks.

    The associations are saved in a numpy array. Each index represents an observed landmark identified by its
    index and the respective value describes the associated observable landmark identified by its index.

    Attributes
    ----------
    array : npt.NDArray
        Numpy array saving the established associations between the observed and observable landmarks.
    size : int
        Number of established associations.
    """

    def __init__(self, observed_landmarks_n: Optional[int] = None, array: Optional[npt.NDArray] = None,
                 size: Optional[int] = None) -> None:
        """
        Initialization function for a new set of associations.

        One either passes the number of observed landmarks in this update step to generate a completely new set
        or an already existing array to generate a new copy.

        Parameters
        ----------
        observed_landmarks_n : Optional[int], optional
            Number of observed landmarks in this update step to initialize the numpy array, by default None
        array : Optional[npt.NDArray], optional
            Associations array to create a copy of an existing associations object, by default None
        size : Optional[int], optional
            Number of established associations in the passed array, required for the copy constructor,
            by default None
        """
        if array is None:
            assert observed_landmarks_n is not None
            self.size = 0
            self.array = np.full(shape=(observed_landmarks_n,), fill_value=np.nan, dtype=float)
        else:
            # copy constructor
            assert size is not None
            self.size = size
            self.array = array.copy()

    @property
    def is_empty(self) -> bool:
        """
        Gets whether no association has been established yet.

        Returns
        -------
        bool
            Whether no association has been established yet.
        """
        return self.size == 0

    @property
    def copy(self) -> Associations:
        """
        Gets a deepcopy of the current Associations object by generating a new one with a copy of the array.

        Returns
        -------
        Associations
            Copy of the current associations object.
        """
        return Associations(array=self.array, size=self.size)

    @property
    def flat_observed_landmark_indices(self) -> npt.NDArray:
        """
        Gets the state vector and state covariance indices of the observed landmarks as a flat numpy array.

        Returns
        -------
        npt.NDArray
            Indices of observed landmarks as flat numpy array.
        """
        indices = np.argwhere(~np.isnan(self.array))[:, 0]
        return self.coords_indices_to_flat_indices(indices)

    @property
    def flat_observable_landmark_indices(self) -> npt.NDArray:
        """
        Gets the state vector and state covariance indices of the observable landmarks as a flat numpy array.

        Returns
        -------
        npt.NDArray
            Indices of observable landmarks as flat numpy array.
        """
        values = self.array[np.where(~np.isnan(self.array))[0]]
        return self.coords_indices_to_flat_indices(values)
    @staticmethod
    def coords_indices_to_flat_indices(indices: npt.NDArray) -> npt.NDArray:
        """
        Converts landmark indices to a flat numpy array containing the state vector indices of these landmarks.

        Parameters
        ----------
        indices : npt.NDArray
            Indices of landmarks to generate the flat numpy array for.

        Returns
        -------
        npt.NDArray
            Indices of landmarks as flat numpy array.
        """
        length = indices.shape[0]
        flatten_indices = np.empty((2 * length))
        flatten_indices[::2] = 2 * indices
        flatten_indices[1::2] = 2 * indices + 1
        return flatten_indices.astype(dtype=int)
    def translate_to_global_mapping(self, observable_to_global_mapping: Optional[List[int]] = None):
        """
        Translates the associations between the observed and observable landmarks to associations between the
        observed landmarks and all tracked landmarks.

        Parameters
        ----------
        observable_to_global_mapping : Optional[List[int]], optional
            Mapping between the observable landmarks and all tracked landmarks, by default None
        """
        if observable_to_global_mapping is not None:
            self.array[np.isnan(self.array)] = -1
            self.array = np.take(np.append(observable_to_global_mapping, -1), self.array.astype(dtype=int))

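
# Illustrative usage sketch, not part of the original module: shows how the Associations container and the
# flat-index conversion are meant to be used. Landmark i occupies entries 2*i and 2*i + 1 of the flat state
# vector; all values below are invented for demonstration.
def _associations_usage_sketch() -> None:
    """Hedged usage example for the Associations container."""
    associations = Associations(observed_landmarks_n=3)
    assert associations.is_empty
    # associate observed landmark 0 with observable landmark 2 and observed landmark 1 with observable landmark 0
    associations.array[0] = 2
    associations.array[1] = 0
    associations.size = 2
    print(associations.flat_observed_landmark_indices)    # [0 1 2 3]
    print(associations.flat_observable_landmark_indices)  # [4 5 0 1]
    # translate observable indices to the indices of all tracked landmarks, e.g. observable 0 -> tracked 7,
    # observable 2 -> tracked 12; unassociated observed landmarks become -1
    associations.translate_to_global_mapping(observable_to_global_mapping=[7, 9, 12])
    print(associations.array)  # [12  7 -1]
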
class JCBB():
    """
    Class to represent and execute the Joint Compatibility Branch and Bound (JCBB) algorithm for a given set of
    observed and observable landmarks.

    The code has been ported from the C++ implementation of the Mobile Robot Programming Toolkit:
    https://github.com/MRPT/mrpt/blob/develop/libs/slam/src/slam/data_association.cpp

    Three changes have been made compared to the implementation above:

    1. The joint compatibility (mahalanobis distance of all associations) is only calculated at the end, for all
       associations sets with the highest number of established associations, to determine the best associations set.
    2. A feature has been prepared that should, we believe, make the calculation more efficient: we take into
       consideration that an observed landmark might not have any individually compatible observable landmark left,
       since all of its individually compatible observable landmarks have either already been associated or were
       deliberately left unassociated.
    3. This implementation only supports the mahalanobis distance and not the maximum likelihood.

    Attributes
    ----------
    observed_landmarks : npt.NDArray
        Mx(O+1) numpy array representing the coordinates and the id of the observed landmarks, M being the number of
        landmarks and O the dimension of the coordinates.
    observable_landmarks : npt.NDArray
        Nx(O+1) numpy array representing the coordinates and the id of the observable landmarks, N being the number of
        landmarks and O the dimension of the coordinates.
    observable_state_covariance : npt.NDArray
        Covariance matrix of the observable landmarks. Dimension and explanation will follow.
    observed_landmarks_n : int
        Number of observed landmarks.
    observable_landmarks_n : int
        Number of observable landmarks.
    associations_objects : List[Associations]
        List of Associations objects (sets) that have the highest number of established associations and thus are
        candidates for the best associations set.
    best_associations : Associations
        Best Associations object, gets assigned at the end and must be an element of associations_objects.
    best_distance : float
        Mahalanobis distance of the currently best associations set.
    individual_compatibility : npt.NDArray
        Compatibility matrix between the observed and observable landmarks based on the individual distance and color.
    individual_distances : npt.NDArray
        Matrix containing the individual distances between the observed and observable landmarks.
    individual_compatibility_counts : npt.NDArray
        Array containing the count of compatible observable landmarks for the respective observed landmark.
    nodes_explored_n : int
        Number of explored nodes of the search tree for the best associations set.
    association_metric : DataAssociationMetric
        Which metric to use to compare different associations sets.
    use_kd_tree : bool
        Option to specify whether a kd tree should be used for evaluating which pairings should be considered.
    verifies_landmark_id : bool
        Specifies whether the landmark id (color) of observed and observable landmark should match.
    inverse_observable_landmark_covariances : npt.NDArray
        Inverses of the 2x2 covariance blocks of all observable landmarks so that they only have to be calculated once.
    chi2thres : float
        Threshold to evaluate whether two landmarks are compatible given their mahalanobis distance.
    observable_landmarks_flat_coords : npt.NDArray
        Coordinates of all observable landmarks flattened.
    observed_landmarks_flat_coords : npt.NDArray
        Coordinates of all observed landmarks flattened.
    """

    def __init__(self, observed_landmarks: npt.NDArray, observable_landmarks: npt.NDArray,
                 observable_state_covariance: npt.NDArray, use_kd_tree: bool = True, chi2quantile: float = 0.5,
                 verifies_landmark_id: bool = True,
                 observable_to_global_mapping: Optional[List[int]] = None) -> None:
        """
        Initializes the JCBB class.

        Parameters
        ----------
        observed_landmarks : npt.NDArray
            Mx(O+1) numpy array representing the coordinates and the id of the observed landmarks, M being the number
            of landmarks and O the dimension of the coordinates.
        observable_landmarks : npt.NDArray
            Nx(O+1) numpy array representing the coordinates and the id of the observable landmarks, N being the
            number of landmarks and O the dimension of the coordinates.
        observable_state_covariance : npt.NDArray
            Covariance matrix of the observable landmarks. Dimension and explanation will follow.
        use_kd_tree : bool, optional
            Option to specify whether a kd tree should be used for evaluating which pairings should be considered,
            by default True
        chi2quantile : float, optional
            Chi^2 quantile to calculate the threshold for evaluating the compatibility of two landmarks,
            by default 0.5
        verifies_landmark_id : bool, optional
            Specifies whether the landmark id (color) of observed and observable landmark should match,
            by default True
        observable_to_global_mapping : Optional[List[int]], optional
            Mapping between the observable landmarks and all tracked landmarks, by default None
        """
        self.observed_landmarks = observed_landmarks
        self.observable_landmarks = observable_landmarks  # filtered with FOV gate
        self.observable_state_covariance = observable_state_covariance
        self.observed_landmarks_n = observed_landmarks.shape[0]
        self.observable_landmarks_n = observable_landmarks.shape[0]
        self.observable_landmarks_flat_coords: npt.NDArray
        self.observed_landmarks_flat_coords: npt.NDArray
        self.associations_objects: List[Associations]  # candidates for the best associations
        self.best_associations: Associations
        self.best_distance = np.nan
        self.individual_compatibility = np.zeros((self.observed_landmarks_n, self.observable_landmarks_n), dtype=bool)
        self.individual_distances = np.full((self.observed_landmarks_n, self.observable_landmarks_n), np.nan)
        self.individual_compatibility_counts = np.zeros(self.observed_landmarks_n, dtype=int)
        self.nodes_explored_n = 0
        self.association_metric = DataAssociationMetric.MAHALANOBIS_DISTANCE
        self.use_kd_tree = use_kd_tree
        self.verifies_landmark_id = verifies_landmark_id
        self.inverse_observable_landmark_covariances = np.empty((self.observable_landmarks_n, 2, 2))
        self.chi2thres = chi2.ppf(chi2quantile, df=2)  # TODO: what value to use? :(
        self.prepare()
        self.associate_with_full_covariance()
        self.clusters: List[Cluster] = [
            Cluster(observed_landmark_indices=np.full((self.observed_landmarks_n), True),
                    individual_compatibility=self.individual_compatibility,
                    observable_landmark_indices=np.full((self.observable_landmarks_n), True),
                    order=np.arange(self.observed_landmarks_n))]
        self.clusters[0].best_associations = self.best_associations
        self.best_associations.translate_to_global_mapping(observable_to_global_mapping=observable_to_global_mapping)
    def prepare(self):
        """
        Function to prepare the data for the JCBB algorithm to save some computation time.
        """
        matrices = [self.observable_state_covariance[2 * observable_landmark_i:2 * observable_landmark_i + 2,
                                                     2 * observable_landmark_i:2 * observable_landmark_i + 2]
                    for observable_landmark_i in range(self.observable_landmarks_n)]
        self.inverse_observable_landmark_covariances = np.linalg.inv(matrices)
        self.observable_landmarks_flat_coords = self.observable_landmarks[:, 0:2].flatten()
        self.observed_landmarks_flat_coords = self.observed_landmarks[:, 0:2].flatten()
    def recursive(self, observed_landmark_i: int, current_associations: Associations):
        """
        Function to search for the best associations set recursively.

        Parameters
        ----------
        observed_landmark_i : int
            Index of the current observed landmark for which all possible, not already associated observable
            landmarks are tried.
        current_associations : Associations
            Current already established associations for this branch of the search tree.
        """
        # Is it the end of the iteration? --> one possible termination condition
        if observed_landmark_i >= self.observed_landmarks_n:
            if current_associations.size > self.associations_objects[0].size:
                # This must be a better choice since more features are matched,
                # so all other candidates can be forgotten.
                self.associations_objects = [current_associations]
            elif not current_associations.is_empty and current_associations.size == self.associations_objects[0].size:
                # otherwise just add this associations set to the candidates for the best associations set
                self.associations_objects.append(current_associations)
        else:
            # normal iteration
            # how many associations can still be established at this point
            potentials = np.count_nonzero(self.individual_compatibility_counts[observed_landmark_i:])
            # if the current highest number of established associations can still be topped, continue.
            if current_associations.size + potentials >= self.associations_objects[0].size:
                # loop over all observable landmarks for this observed landmark
                for observable_landmark_i in range(self.observable_landmarks_n):
                    # continue if this observed landmark is compatible with this observable landmark
                    if self.individual_compatibility[observed_landmark_i, observable_landmark_i]:
                        # if there isn't already an association within the associations set
                        # for this observable landmark
                        if observable_landmark_i not in current_associations.array:
                            # search along this branch then
                            new_current_associations = current_associations.copy
                            new_current_associations.array[observed_landmark_i] = observable_landmark_i
                            new_current_associations.size += 1
                            self.nodes_explored_n += 1
                            self.recursive(observed_landmark_i=observed_landmark_i + 1,
                                           current_associations=new_current_associations)
                # if this observed landmark had any compatible observable landmark, there is one potential less
                # by not associating any observable landmark with this observed landmark.
                if self.individual_compatibility_counts[observed_landmark_i] > 0:
                    potentials -= 1
                if current_associations.size + potentials >= self.associations_objects[0].size:
                    self.nodes_explored_n += 1
                    self.recursive(observed_landmark_i=observed_landmark_i + 1,
                                   current_associations=current_associations)
    def mahalanobis_distance_of_associations(self, current_associations: Associations) -> float:
        """
        Mahalanobis distance for a given set of associations. Is equivalent to the joint compatibility.

        Parameters
        ----------
        current_associations : Associations
            Associations set to calculate the joint compatibility for.

        Returns
        -------
        float
            Mahalanobis distance for the given set of associations.
        """
        associations_size = current_associations.size
        assert associations_size > 0
        flat_observed_landmarks_indices = current_associations.flat_observed_landmark_indices
        flat_observable_landmarks_indices = current_associations.flat_observable_landmark_indices
        observed_landmarks_coords = self.observed_landmarks_flat_coords[flat_observed_landmarks_indices]
        observable_landmarks_coords = self.observable_landmarks_flat_coords[flat_observable_landmarks_indices]
        observable_landmarks_covariance = self.observable_state_covariance[
            np.ix_(flat_observable_landmarks_indices, flat_observable_landmarks_indices)]
        return self.calculate_metric(observed_landmarks_coords, observable_landmarks_coords,
                                     np.linalg.inv(observable_landmarks_covariance), self.association_metric)
    def verify_landmark_id(self, observed_landmark_i: int, observable_landmark_i: int) -> bool:
        """
        Verifies whether the landmark ids (colors) of the given observed and observable landmark match.

        Always returns True if it has been specified that the landmark id shouldn't be verified.

        Parameters
        ----------
        observed_landmark_i : int
            Index of the observed landmark the landmark id should be verified for.
        observable_landmark_i : int
            Index of the observable landmark the landmark id should be verified for.

        Returns
        -------
        bool
            Whether the landmark ids (colors) of the given observed and observable landmark match.
        """
        if not self.verifies_landmark_id:
            return True
        return self.observed_landmarks[observed_landmark_i, 2] == self.observable_landmarks[observable_landmark_i, 2]
    def associate_with_full_covariance(self):
        """
        Calculate the best association between the observed and observable landmarks.
        """
        # at this moment only the mahalanobis distance is implemented.
        assert self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE
        N_KD_RESULTS = self.observable_landmarks_n
        kd_tree = KDTree(self.observable_landmarks[:, 0:2])
        # the worst-case distance has already been initialized to np.nan in __init__
        # First calculate the individual compatibility between the observed and observable landmarks.
        for observed_landmark_i in range(self.observed_landmarks_n):
            if self.use_kd_tree is False:
                # if the kd tree should not be used, loop over every observable landmark for this observed landmark
                for observable_landmark_i in range(self.observable_landmarks_n):
                    # cannot be compatible if the landmark id (color) doesn't match
                    if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i):
                        continue
                    # only the (x, y) coordinates enter the distance; the id column is checked by verify_landmark_id
                    mahalanobis_distance = distance.mahalanobis(
                        self.observed_landmarks[observed_landmark_i, 0:2],
                        self.observable_landmarks[observable_landmark_i, 0:2],
                        self.inverse_observable_landmark_covariances[observable_landmark_i])
                    # if the mahalanobis distance is larger than the specified threshold, they are not compatible.
                    if (mahalanobis_distance > self.chi2thres):
                        continue
                    # save that they are compatible together with their individual distance.
                    self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance
                    self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True
                    self.individual_compatibility_counts[observed_landmark_i] += 1
            elif self.use_kd_tree is True:
                # if the kd tree should be used, loop over all observable landmarks beginning with the nearest
                # and break if one is too far from the observed landmark. Everything else stays the same as above.
                _, indices = kd_tree.query(self.observed_landmarks[observed_landmark_i, 0:2], k=N_KD_RESULTS)
                indices = np.atleast_1d(indices)
                for result_index in range(indices.shape[-1]):
                    observable_landmark_i = indices[result_index]
                    if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i):
                        continue
                    mahalanobis_distance = distance.mahalanobis(
                        self.observed_landmarks[observed_landmark_i, 0:2],
                        self.observable_landmarks[observable_landmark_i, 0:2],
                        self.inverse_observable_landmark_covariances[observable_landmark_i])
                    if (mahalanobis_distance > self.chi2thres):
                        break
                    self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance
                    self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True
                    self.individual_compatibility_counts[observed_landmark_i] += 1
        # initialize the candidates for the best associations set with an empty one.
        self.associations_objects = [Associations(observed_landmarks_n=self.observed_landmarks_n)]
        # start the recursive search of the search tree for the best associations sets
        self.recursive(observed_landmark_i=0, current_associations=self.associations_objects[0].copy)
        # if there are multiple candidates for the best associations set (same number of associations)
        if len(self.associations_objects) > 1:
            self.best_associations = None
            # choose the one with the lowest mahalanobis distance
            for associations_object in self.associations_objects:
                mahalanobis_distance = self.mahalanobis_distance_of_associations(associations_object)
                if self.is_closer(mahalanobis_distance, self.best_distance):
                    self.best_associations = associations_object
                    self.best_distance = mahalanobis_distance
        else:
            # otherwise take the only candidate.
            self.best_associations = self.associations_objects[0]
    @staticmethod
    def calculate_metric(observation: npt.NDArray, prediction: npt.NDArray, covariance: npt.NDArray,
                         metric: DataAssociationMetric) -> float:
        """
        Calculate the metric between an observation and a prediction scaled by the covariance of the prediction.

        Parameters
        ----------
        observation : npt.NDArray
            Observed landmark(s).
        prediction : npt.NDArray
            Observable landmark(s).
        covariance : npt.NDArray
            Inverse of the covariance matrix of the observable landmark(s).
        metric : DataAssociationMetric
            Metric to calculate.

        Returns
        -------
        float
            Calculated metric.
        """
        mahalanobis_distance = distance.mahalanobis(observation, prediction, covariance)
        if metric == DataAssociationMetric.MAHALANOBIS_DISTANCE:
            return mahalanobis_distance
        assert metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD
        covariance_det = np.linalg.det(covariance)
        maximum_likelihood = np.exp(np.sqrt(mahalanobis_distance) / -2) / (np.sqrt(2 * np.pi) * np.sqrt(covariance_det))
        return maximum_likelihood
    def is_closer(self, distance1: float, distance2: float) -> bool:
        """
        Returns whether the first distance is smaller than the second distance and thus its landmarks are closer.

        Based on the metric to use.

        Parameters
        ----------
        distance1 : float
            First distance.
        distance2 : float
            Second distance.

        Returns
        -------
        bool
            Whether the first distance is smaller than the second distance and thus its landmarks are closer.
        """
        if np.isnan(distance2):
            return True
        if np.isnan(distance1):
            return False
        if self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE:
            return distance1 < distance2
        elif self.association_metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD:
            return distance1 > distance2

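
# Illustrative usage sketch, not part of the original module: runs plain JCBB on a tiny, made-up scene.
# Landmarks are rows of [x, y, color_id]; the state covariance is block diagonal with one 2x2 block per
# observable landmark, matching the layout assumed by prepare(). All numbers are invented for demonstration.
def _jcbb_usage_sketch() -> None:
    """Hedged usage example for JCBB."""
    observed_landmarks = np.array([[1.0, 0.4, 0.0],
                                   [4.1, -0.9, 1.0]])
    observable_landmarks = np.array([[1.1, 0.5, 0.0],
                                     [4.0, -1.0, 1.0],
                                     [9.0, 3.0, 0.0]])
    # 6x6 block-diagonal covariance, 0.2 m standard deviation per coordinate
    observable_state_covariance = np.kron(np.eye(3), 0.04 * np.eye(2))
    jcbb = JCBB(observed_landmarks=observed_landmarks,
                observable_landmarks=observable_landmarks,
                observable_state_covariance=observable_state_covariance,
                chi2quantile=0.95)
    # for this toy input both observed landmarks match their nearest observable landmark of the same color: [0. 1.]
    print(jcbb.best_associations.array)
    print(jcbb.nodes_explored_n)
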
class Cluster():

    def __init__(self, observed_landmark_indices: npt.NDArray, individual_compatibility: npt.NDArray,
                 observable_landmark_indices: npt.NDArray, order: npt.NDArray):
        """
        Class to represent a cluster of observed landmarks that can be searched for the best associations set
        independently from the other landmark clusters.

        Parameters
        ----------
        observed_landmark_indices : npt.NDArray
            Numpy boolean array masking which observed landmarks are part of this cluster.
        individual_compatibility : npt.NDArray
            Individual compatibility matrix between the observed and observable landmarks.
        observable_landmark_indices : npt.NDArray
            Numpy boolean array masking which observable landmarks are part of this cluster.
        order : npt.NDArray
            Numpy array containing the order by which the compatibility matrix has been sorted for the clustering.
        """
        observed_order = np.argsort(np.count_nonzero(
            individual_compatibility[observed_landmark_indices][:, observable_landmark_indices], axis=1))[::-1]
        self.observed_landmark_indices = np.nonzero(observed_landmark_indices[np.argsort(order)])[0][observed_order]
        observable_order = np.argsort(np.count_nonzero(
            individual_compatibility[observed_landmark_indices][:, observable_landmark_indices], axis=0))[::-1]
        self.observable_landmark_indices = np.nonzero(observable_landmark_indices)[0][observable_order]
        self.observed_landmarks_n = np.count_nonzero(observed_landmark_indices)
        self.observable_landmarks_n = np.count_nonzero(observable_landmark_indices)
        self.associations_objects: List[Associations]
        self.best_associations: Associations
        self.best_distance = np.nan

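
# Illustrative sketch, not part of the original module: shows how Cluster orders its landmark indices by the
# number of compatible partners (descending). The compatibility matrix below is made up.
def _cluster_ordering_sketch() -> None:
    """Hedged example of the index ordering performed by Cluster."""
    # observed landmark 0 is compatible with observables 0 and 1, observed landmark 1 only with observable 1
    individual_compatibility = np.array([[True, True, False],
                                         [False, True, False]])
    cluster = Cluster(observed_landmark_indices=np.array([True, True]),
                      individual_compatibility=individual_compatibility,
                      observable_landmark_indices=np.array([True, True, False]),
                      order=np.arange(2))
    print(cluster.observed_landmark_indices)    # [0 1]: observed 0 has two compatible partners, observed 1 only one
    print(cluster.observable_landmark_indices)  # [1 0]: observable 1 is compatible with both observed landmarks
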
class ClusterJCBB():
    """
    Class to represent and execute the Joint Compatibility Branch and Bound (JCBB) algorithm for a given set of
    observed and observable landmarks, splitting the problem into independent clusters of landmarks that are
    searched separately.

    The code has been ported from the C++ implementation of the Mobile Robot Programming Toolkit:
    https://github.com/MRPT/mrpt/blob/develop/libs/slam/src/slam/data_association.cpp

    Three changes have been made compared to the implementation above:

    1. The joint compatibility (mahalanobis distance of all associations) is only calculated at the end, for all
       associations sets with the highest number of established associations, to determine the best associations set.
    2. A feature has been prepared that should, we believe, make the calculation more efficient: we take into
       consideration that an observed landmark might not have any individually compatible observable landmark left,
       since all of its individually compatible observable landmarks have either already been associated or were
       deliberately left unassociated.
    3. This implementation only supports the mahalanobis distance and not the maximum likelihood.

    Attributes
    ----------
    observed_landmarks : npt.NDArray
        Mx(O+1) numpy array representing the coordinates and the id of the observed landmarks, M being the number of
        landmarks and O the dimension of the coordinates.
    observable_landmarks : npt.NDArray
        Nx(O+1) numpy array representing the coordinates and the id of the observable landmarks, N being the number of
        landmarks and O the dimension of the coordinates.
    observable_state_covariance : npt.NDArray
        Covariance matrix of the observable landmarks. Dimension and explanation will follow.
    observed_landmarks_n : int
        Number of observed landmarks.
    observable_landmarks_n : int
        Number of observable landmarks.
    associations_objects : List[Associations]
        List of Associations objects (sets) that have the highest number of established associations and thus are
        candidates for the best associations set.
    best_associations : Associations
        Best Associations object, gets assigned at the end and must be an element of associations_objects.
    best_distance : float
        Mahalanobis distance of the currently best associations set.
    individual_compatibility : npt.NDArray
        Compatibility matrix between the observed and observable landmarks based on the individual distance and color.
    individual_distances : npt.NDArray
        Matrix containing the individual distances between the observed and observable landmarks.
    individual_compatibility_counts : npt.NDArray
        Array containing the count of compatible observable landmarks for the respective observed landmark.
    nodes_explored_n : int
        Number of explored nodes of the search tree for the best associations set.
    association_metric : DataAssociationMetric
        Which metric to use to compare different associations sets.
    use_kd_tree : bool
        Option to specify whether a kd tree should be used for evaluating which pairings should be considered.
    verifies_landmark_id : bool
        Specifies whether the landmark id (color) of observed and observable landmark should match.
    inverse_observable_landmark_covariances : npt.NDArray
        Inverses of the 2x2 covariance blocks of all observable landmarks so that they only have to be calculated once.
    chi2thres : float
        Threshold to evaluate whether two landmarks are compatible given their mahalanobis distance.
    observable_landmarks_flat_coords : npt.NDArray
        Coordinates of all observable landmarks flattened.
    observed_landmarks_flat_coords : npt.NDArray
        Coordinates of all observed landmarks flattened.
    clusters : List[Cluster]
        List of all landmark clusters.
    """

    def __init__(self, observed_landmarks: npt.NDArray, observable_landmarks: npt.NDArray,
                 observable_state_covariance: npt.NDArray, use_kd_tree: bool = True, chi2quantile: float = 0.5,
                 verifies_landmark_id: bool = True,
                 observable_to_global_mapping: Optional[List[int]] = None) -> None:
        """
        Initializes the ClusterJCBB class.

        Parameters
        ----------
        observed_landmarks : npt.NDArray
            Mx(O+1) numpy array representing the coordinates and the id of the observed landmarks, M being the number
            of landmarks and O the dimension of the coordinates.
        observable_landmarks : npt.NDArray
            Nx(O+1) numpy array representing the coordinates and the id of the observable landmarks, N being the
            number of landmarks and O the dimension of the coordinates.
        observable_state_covariance : npt.NDArray
            Covariance matrix of the observable landmarks. Dimension and explanation will follow.
        use_kd_tree : bool, optional
            Option to specify whether a kd tree should be used for evaluating which pairings should be considered,
            by default True
        chi2quantile : float, optional
            Chi^2 quantile to calculate the threshold for evaluating the compatibility of two landmarks,
            by default 0.5
        verifies_landmark_id : bool, optional
            Specifies whether the landmark id (color) of observed and observable landmark should match,
            by default True
        observable_to_global_mapping : Optional[List[int]], optional
            Mapping between the observable landmarks and all tracked landmarks, by default None
        """
        self.observed_landmarks = observed_landmarks
        self.observable_landmarks = observable_landmarks  # filtered with FOV gate
        self.observable_state_covariance = observable_state_covariance
        self.observed_landmarks_n = observed_landmarks.shape[0]
        self.observable_landmarks_n = observable_landmarks.shape[0]
        self.observable_landmarks_flat_coords: npt.NDArray
        self.observed_landmarks_flat_coords: npt.NDArray
        self.associations_objects: List[Associations] = []  # candidates for the best associations
        self.best_associations: Associations
        self.best_distance = np.nan
        self.individual_compatibility = np.zeros((self.observed_landmarks_n, self.observable_landmarks_n), dtype=bool)
        self.individual_distances = np.full((self.observed_landmarks_n, self.observable_landmarks_n), np.nan)
        self.individual_compatibility_counts = np.zeros(self.observed_landmarks_n, dtype=int)
        self.nodes_explored_n = 0
        self.association_metric = DataAssociationMetric.MAHALANOBIS_DISTANCE
        self.use_kd_tree = use_kd_tree
        self.verifies_landmark_id = verifies_landmark_id
        self.inverse_observable_landmark_covariances = np.empty((self.observable_landmarks_n, 2, 2))
        self.chi2thres = chi2.ppf(chi2quantile, df=2)  # TODO: what value to use? :(
        self.clusters: List[Cluster] = []
        self.prepare()
        self.associate_with_full_covariance()
        self.best_associations.translate_to_global_mapping(observable_to_global_mapping=observable_to_global_mapping)
    def prepare(self):
        """
        Function to prepare the data for the JCBB algorithm to save some computation time.
        """
        matrices = [self.observable_state_covariance[2 * observable_landmark_i:2 * observable_landmark_i + 2,
                                                     2 * observable_landmark_i:2 * observable_landmark_i + 2]
                    for observable_landmark_i in range(self.observable_landmarks_n)]
        self.inverse_observable_landmark_covariances = np.linalg.inv(matrices)
        self.observable_landmarks_flat_coords = self.observable_landmarks[:, 0:2].flatten()
        self.observed_landmarks_flat_coords = self.observed_landmarks[:, 0:2].flatten()
    def recursive(self, cluster_observed_landmark_i: int, current_associations: Associations, cluster: Cluster):
        """
        Function to search for the best associations set recursively.

        Parameters
        ----------
        cluster_observed_landmark_i : int
            Index of the current observed landmark within the cluster for which all possible, not already
            associated observable landmarks are tried.
        current_associations : Associations
            Current already established associations for this branch of the search tree.
        cluster : Cluster
            Cluster for which the best associations are currently searched.
        """
        # Is it the end of the iteration? --> one possible termination condition
        if cluster_observed_landmark_i >= cluster.observed_landmarks_n:
            if current_associations.size > cluster.associations_objects[0].size:
                # This must be a better choice since more features are matched,
                # so all other candidates can be forgotten.
                cluster.associations_objects = [current_associations]
            elif not current_associations.is_empty and current_associations.size == cluster.associations_objects[0].size:
                # otherwise just add this associations set to the candidates for the best associations set
                cluster.associations_objects.append(current_associations)
        else:
            # normal iteration
            # how many associations can still be established at this point
            observed_landmark_i = cluster.observed_landmark_indices[cluster_observed_landmark_i]
            potentials = np.count_nonzero(
                self.individual_compatibility_counts[cluster.observed_landmark_indices[cluster_observed_landmark_i:]])
            # if the current highest number of established associations can still be topped, continue.
            if current_associations.size + potentials >= cluster.associations_objects[0].size:
                # loop over all observable landmarks of the cluster for this observed landmark
                for observable_landmark_i in cluster.observable_landmark_indices:
                    # continue if this observed landmark is compatible with this observable landmark
                    if self.individual_compatibility[observed_landmark_i, observable_landmark_i]:
                        # if there isn't already an association within the associations set
                        # for this observable landmark
                        if observable_landmark_i not in current_associations.array:
                            # search along this branch then
                            new_current_associations = current_associations.copy
                            new_current_associations.array[observed_landmark_i] = observable_landmark_i
                            new_current_associations.size += 1
                            self.nodes_explored_n += 1
                            self.recursive(cluster_observed_landmark_i=cluster_observed_landmark_i + 1,
                                           current_associations=new_current_associations, cluster=cluster)
                # if this observed landmark had any compatible observable landmark, there is one potential less
                # by not associating any observable landmark with this observed landmark.
                if self.individual_compatibility_counts[observed_landmark_i] > 0:
                    potentials -= 1
                if current_associations.size + potentials >= cluster.associations_objects[0].size:
                    self.nodes_explored_n += 1
                    self.recursive(cluster_observed_landmark_i=cluster_observed_landmark_i + 1,
                                   current_associations=current_associations, cluster=cluster)
    def mahalanobis_distance_of_associations(self, current_associations: Associations) -> float:
        """
        Mahalanobis distance for a given set of associations. Is equivalent to the joint compatibility.

        Parameters
        ----------
        current_associations : Associations
            Associations set to calculate the joint compatibility for.

        Returns
        -------
        float
            Mahalanobis distance for the given set of associations.
        """
        associations_size = current_associations.size
        assert associations_size > 0
        flat_observed_landmarks_indices = current_associations.flat_observed_landmark_indices
        flat_observable_landmarks_indices = current_associations.flat_observable_landmark_indices
        observed_landmarks_coords = self.observed_landmarks_flat_coords[flat_observed_landmarks_indices]
        observable_landmarks_coords = self.observable_landmarks_flat_coords[flat_observable_landmarks_indices]
        observable_landmarks_covariance = self.observable_state_covariance[
            np.ix_(flat_observable_landmarks_indices, flat_observable_landmarks_indices)]
        return self.calculate_metric(observed_landmarks_coords, observable_landmarks_coords,
                                     np.linalg.inv(observable_landmarks_covariance), self.association_metric)
    def verify_landmark_id(self, observed_landmark_i: int, observable_landmark_i: int) -> bool:
        """
        Verifies whether the landmark ids (colors) of the given observed and observable landmark match.

        Always returns True if it has been specified that the landmark id shouldn't be verified.

        Parameters
        ----------
        observed_landmark_i : int
            Index of the observed landmark the landmark id should be verified for.
        observable_landmark_i : int
            Index of the observable landmark the landmark id should be verified for.

        Returns
        -------
        bool
            Whether the landmark ids (colors) of the given observed and observable landmark match.
        """
        if not self.verifies_landmark_id:
            return True
        return self.observed_landmarks[observed_landmark_i, 2] == self.observable_landmarks[observable_landmark_i, 2]
    def associate_with_full_covariance(self):
        """
        Calculate the best association between the observed and observable landmarks.
        """
        # at this moment only the mahalanobis distance is implemented.
        assert self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE
        N_KD_RESULTS = self.observable_landmarks_n
        kd_tree = KDTree(self.observable_landmarks[:, 0:2])
        # the worst-case distance has already been initialized to np.nan in __init__
        # First calculate the individual compatibility between the observed and observable landmarks.
        for observed_landmark_i in range(self.observed_landmarks_n):
            if self.use_kd_tree is False:
                # if the kd tree should not be used, loop over every observable landmark for this observed landmark
                for observable_landmark_i in range(self.observable_landmarks_n):
                    # cannot be compatible if the landmark id (color) doesn't match
                    if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i):
                        continue
                    # only the (x, y) coordinates enter the distance; the id column is checked by verify_landmark_id
                    mahalanobis_distance = distance.mahalanobis(
                        self.observed_landmarks[observed_landmark_i, 0:2],
                        self.observable_landmarks[observable_landmark_i, 0:2],
                        self.inverse_observable_landmark_covariances[observable_landmark_i])
                    # if the mahalanobis distance is larger than the specified threshold, they are not compatible.
                    if (mahalanobis_distance > self.chi2thres):
                        continue
                    # save that they are compatible together with their individual distance.
                    self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance
                    self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True
                    self.individual_compatibility_counts[observed_landmark_i] += 1
            elif self.use_kd_tree is True:
                # if the kd tree should be used, loop over all observable landmarks beginning with the nearest
                # and break if one is too far from the observed landmark. Everything else stays the same as above.
                _, indices = kd_tree.query(self.observed_landmarks[observed_landmark_i, 0:2], k=N_KD_RESULTS)
                indices = np.atleast_1d(indices)
                for result_index in range(indices.shape[-1]):
                    observable_landmark_i = indices[result_index]
                    if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i):
                        continue
                    mahalanobis_distance = distance.mahalanobis(
                        self.observed_landmarks[observed_landmark_i, 0:2],
                        self.observable_landmarks[observable_landmark_i, 0:2],
                        self.inverse_observable_landmark_covariances[observable_landmark_i])
                    if (mahalanobis_distance > self.chi2thres):
                        break
                    self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance
                    self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True
                    self.individual_compatibility_counts[observed_landmark_i] += 1
        if np.count_nonzero(self.individual_compatibility) == 0:
            # nothing is individually compatible, so no association can be established at all
            self.best_associations = Associations(array=np.full(self.observed_landmarks_n, -1), size=0)
            return
        self.find_clusters()
        # find the best associations set for every cluster
        for cluster in self.clusters:
            # initialize the candidates for the best associations set with an empty one.
            cluster.associations_objects = [Associations(observed_landmarks_n=self.observed_landmarks_n)]
            # start the recursive search of the search tree for the best associations sets
            self.recursive(cluster_observed_landmark_i=0, cluster=cluster,
                           current_associations=cluster.associations_objects[0].copy)
            self.associations_objects += cluster.associations_objects
            # if there are multiple candidates for the best associations set (same number of associations)
            if len(cluster.associations_objects) > 1:
                cluster.best_associations = None
                # choose the one with the lowest mahalanobis distance
                for associations_object in cluster.associations_objects:
                    mahalanobis_distance = self.mahalanobis_distance_of_associations(associations_object)
                    if self.is_closer(mahalanobis_distance, cluster.best_distance):
                        cluster.best_associations = associations_object
                        cluster.best_distance = mahalanobis_distance
            else:
                # otherwise take the only candidate.
                cluster.best_associations = cluster.associations_objects[0]
        # combine the best associations set of each cluster into one associations set
        arrays = [cluster.best_associations.array for cluster in self.clusters]
        array = np.atleast_1d(np.nansum(arrays, axis=0))
        array[np.all(np.isnan(arrays), axis=0)] = np.nan
        # the number of established associations equals the number of non-NaN entries
        self.best_associations = Associations(array=array, size=int(np.count_nonzero(~np.isnan(array))))
    def find_clusters(self):
        """
        Find clusters of compatible landmarks in which the best associations can be searched for independently.
        """
        order = np.argsort(self.individual_compatibility_counts)
        to_be_examined_individual_compatibility = self.individual_compatibility[order]
        cluster_observed_indices_array = []
        cluster_observable_indices_array = []
        while (to_be_examined_individual_compatibility.shape[0] > 0):
            if np.count_nonzero(to_be_examined_individual_compatibility[-1]) == 0:
                break
            old_observable_landmarks_in_cluster = to_be_examined_individual_compatibility[-1]
            while True:
                cluster_observed_indices = np.count_nonzero(
                    to_be_examined_individual_compatibility[:, old_observable_landmarks_in_cluster], axis=1) > 0
                new_observable_landmarks_in_cluster = np.logical_or.reduce(
                    to_be_examined_individual_compatibility[cluster_observed_indices])
                if np.array_equal(new_observable_landmarks_in_cluster, old_observable_landmarks_in_cluster):
                    break
                old_observable_landmarks_in_cluster = new_observable_landmarks_in_cluster
            if len(cluster_observed_indices_array) > 0:
                cluster_observed_indices_mask = np.zeros(self.individual_compatibility.shape[0], dtype=bool)
                cluster_observed_indices_mask[~np.logical_or.reduce(cluster_observed_indices_array)] = \
                    cluster_observed_indices
            else:
                cluster_observed_indices_mask = cluster_observed_indices
            cluster_observed_indices_array.append(cluster_observed_indices_mask)
            cluster_observable_indices_array.append(old_observable_landmarks_in_cluster)
            self.clusters.append(Cluster(observed_landmark_indices=cluster_observed_indices_mask,
                                         observable_landmark_indices=old_observable_landmarks_in_cluster,
                                         order=order,
                                         individual_compatibility=self.individual_compatibility[order]))
            to_be_examined_individual_compatibility = \
                to_be_examined_individual_compatibility[~cluster_observed_indices]
    @staticmethod
    def calculate_metric(observation: npt.NDArray, prediction: npt.NDArray, covariance: npt.NDArray,
                         metric: DataAssociationMetric) -> float:
        """
        Calculate the metric between an observation and a prediction scaled by the covariance of the prediction.

        Parameters
        ----------
        observation : npt.NDArray
            Observed landmark(s).
        prediction : npt.NDArray
            Observable landmark(s).
        covariance : npt.NDArray
            Inverse of the covariance matrix of the observable landmark(s).
        metric : DataAssociationMetric
            Metric to calculate.

        Returns
        -------
        float
            Calculated metric.
        """
        mahalanobis_distance = distance.mahalanobis(observation, prediction, covariance)
        if metric == DataAssociationMetric.MAHALANOBIS_DISTANCE:
            return mahalanobis_distance
        assert metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD
        covariance_det = np.linalg.det(covariance)
        maximum_likelihood = np.exp(np.sqrt(mahalanobis_distance) / -2) / (np.sqrt(2 * np.pi) * np.sqrt(covariance_det))
        return maximum_likelihood
    def is_closer(self, distance1: float, distance2: float) -> bool:
        """
        Returns whether the first distance is smaller than the second distance and thus its landmarks are closer.

        Based on the metric to use.

        Parameters
        ----------
        distance1 : float
            First distance.
        distance2 : float
            Second distance.

        Returns
        -------
        bool
            Whether the first distance is smaller than the second distance and thus its landmarks are closer.
        """
        if np.isnan(distance2):
            return True
        if np.isnan(distance1):
            return False
        if self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE:
            return distance1 < distance2
        elif self.association_metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD:
            return distance1 > distance2

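
# Illustrative usage sketch, not part of the original module: ClusterJCBB on a made-up scene that splits into
# two independent clusters (one landmark pair near x = 1 m, one near x = 20 m), which are then searched
# separately before the per-cluster results are merged. All numbers are invented for demonstration.
def _cluster_jcbb_usage_sketch() -> None:
    """Hedged usage example for ClusterJCBB."""
    observed_landmarks = np.array([[1.0, 0.4, 0.0],
                                   [20.1, 5.1, 1.0]])
    observable_landmarks = np.array([[1.1, 0.5, 0.0],
                                     [20.0, 5.0, 1.0],
                                     [40.0, -3.0, 0.0]])
    observable_state_covariance = np.kron(np.eye(3), 0.04 * np.eye(2))
    cluster_jcbb = ClusterJCBB(observed_landmarks=observed_landmarks,
                               observable_landmarks=observable_landmarks,
                               observable_state_covariance=observable_state_covariance,
                               chi2quantile=0.95,
                               observable_to_global_mapping=[3, 8, 11])
    print(len(cluster_jcbb.clusters))            # expected for this toy input: 2 independent clusters
    # after translate_to_global_mapping: observed 0 -> tracked landmark 3, observed 1 -> tracked landmark 8
    print(cluster_jcbb.best_associations.array)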