"""
Implements multiple classes and function used for the data association of landmarks
"""
from __future__ import annotations
from enum import Enum
from typing import List, Optional
import numpy.typing as npt
import numpy as np
from scipy.spatial import distance, KDTree
from scipy.stats.distributions import chi2
[docs]class DataAssociationMetric(Enum):
"""
Enum to describe which metric to use to measure the compatilibity of two landmarks.
"""
MAHALANOBIS_DISTANCE = 1
MAXIMUM_LIKELIHOOD = 2
[docs]class Associations():
"""
Class to represent a set of established associations between observed and observable landmarks.
The associations are saved in a numpy array. Each index represents an observed landmark identified by its index
and the respective value describes the associated observable landmark identified by its index.
Attributes
----------
array : npt.NDArray
Numpy array saving the established associations between the observed and observable landmarks.
size : int
Number of established associations.
"""
def __init__(self, observed_landmarks_n: Optional[int] = None,
array: npt.NDArray = None, size: Optional[int] = None) -> None:
"""
Initilization function for a new set of associations.
One either passes the number of observed landmarks in this update step to generate a completely new set or
an already existing array to generate a new copy.
Parameters
----------
observed_landmarks_n : Optional[int], optional
Number of observed landmarks in this update step to initialize the numpy array, by default None
array : npt.NDArray, optional
Associations array to create a copy of an existing associations object, by default None
"""
if array is None:
assert observed_landmarks_n is not None
self.size = 0
self.array = np.full(shape=(observed_landmarks_n, ), fill_value=np.nan, dtype=float)
else: # copy constructor
assert size is not None
self.size = size
self.array = array.copy()
@property
def is_empty(self) -> bool:
"""
Gets whether at least one association has been established yet.
Returns
-------
bool
Whether at least one association has been established yet.
"""
return self.size == 0
@property
def copy(self) -> Associations:
"""
Gets a deepcopy of the current Associations object by generating a new one with a copy of the array.
Returns
-------
Associations
Copy of the current associations object.
"""
return Associations(array=self.array, size=self.size)
@property
def flat_observed_landmark_indices(self) -> npt.NDArray:
"""
Gets indices of state vector and state covariance observed landmarks as flat numpy array.
Returns
-------
npt.NDArray
Indices of observed landmarks as flat numpy array.
"""
indices = np.argwhere(~np.isnan(self.array))[:, 0]
return self.coords_indices_to_flat_indices(indices)
@property
def flat_observable_landmark_indices(self) -> npt.NDArray:
"""
Gets indices for state vector and state covariance of observable landmarks as flat numpy array.
Returns
-------
npt.NDArray
Indices of observable landmarks as flat numpy array.
"""
values = self.array[np.where(~np.isnan(self.array))[0]]
return self.coords_indices_to_flat_indices(values)
[docs] @staticmethod
def coords_indices_to_flat_indices(indices: npt.NDArray) -> npt.NDArray:
"""
Converts landmark indices to flat numpy array containing the indices of this landmarks for the state vector.
Parameters
----------
indices : npt.NDArray
Indices of landmarks to generate the flat numpy array for.
Returns
-------
npt.NDArray
Indices of landmarks as flat numpy array.
"""
length = indices.shape[0]
flatten_indices = np.empty((2*length))
flatten_indices[::2] = 2*indices
flatten_indices[1::2] = 2*indices + 1
return flatten_indices.astype(dtype=int)
[docs] def translate_to_global_mapping(self, observable_to_global_mapping: Optional[List[int]] = None):
"""
Translates the associations between the observed and observable landmarks
to associations between the observed landmarks and all tracked landmarks.
Parameters
----------
observable_to_global_mapping : Optional[List[int]], optional
Mapping between the observable landmarks to all tracked landmarks, by default None
"""
if observable_to_global_mapping is not None:
self.array[np.isnan(self.array)] = -1
self.array = np.take(np.append(observable_to_global_mapping, -1), self.array.astype(dtype=int))
[docs]class JCBB():
"""
Class to represent and execute the Joint compatibility branch and bound (JCBB) algorithm for a given set of
observed and observable landmarks.
Code has been ported from the C++ Implementation of the mobile robot programming toolkit.
https://github.com/MRPT/mrpt/blob/develop/libs/slam/src/slam/data_association.cpp
There have been made three changes compared to the implementation above.
1. The joint complatibility (mahalanobis distance of all associations) is only calculated at the end for all
associations sets with the highest number of established associations to determine the best association set.
2. There has been prepared a feature to calculate in a more efficient way, so we believe: We take into
consideration that an observed landmark might not have any individual compatible observable landmark left since
all individual compatible observable landmarks have either already been associated or willingly not.
3. This implementation only supports the mahalanobis distance and not the maximum likelihood.
Attributes
----------
observed_landmarks : npt.NDArray
Mx(O+1) Numpy array representing the corodinates and the id of the observed landmarks.
M being the number of landmarks and O the dimension of coordinates.
observable_landmarks : npt.NDArray
Nx(O+1) Numpy array representing the coordinates and the id of the observable landmarks.
M being the number of landmarks and O the dimension of coordinates.
observable_state_covariance : npt.NDArray
Covariance matrix of the observable landmarks. Dimension and explanation will follow.
observed_landmarks_n : int
Number of observed landmarks.
observable_landmarks_n : int
Number of observable landmarks.
associations_objects : List[Associations]
List of Associations objects (sets) that have the highest number of established associations
and thus are candidates for the best associations set.
best_associations : Associations
Best Associations object, gets assigned at the end and must be an element of associations_objects.
best_distance : float
Mahalanobis distance of the currently best associations set.
individual_compatibility : npt.NDArray
Compatibility matrix between the observed and observable landmarks based on the individual distance and color.
individual_distances : npt.NDArray
Matrix containing the individual distances between the observed and observable landmarks.
individual_compatibility_counts : npt.NDArray
Array containing the count of compatible observable landmarks for the respective observed landmark.
nodes_explored_n : int
Number of explored nodes of the search tree for the best associations set.
association_metric : AssociationMetric
Which metric to use to compare different associations sets.
use_kd_tree : bool
Option to specify whether a kd tree should be used for evaluating which pairings should be considered.
verifies_landmark_id : bool
Specified whether the landmark id (color) of observed and observable landmark should match.
inverse_observable_landmark_covariances : npt.NDArray
Inverse of the covariance matrix for all observable covariances so that it only has to be calculated once.
chi2thres : float
Threshold to evaluate whether two landmarks are compatible given their mahalanobis distance.
observable_landmarks_flat_coords : npt.NDArray
Coordinates of all observable landmarks flattened.
observed_landmarks_flat_coords : npt.NDArray
Coordinates of all observed landmarks flattened.
"""
def __init__(self, observed_landmarks: npt.NDArray, observable_landmarks: npt.NDArray,
observable_state_covariance: npt.NDArray, use_kd_tree: bool = True,
chi2quantile: float = 0.5, verifies_landmark_id: bool = True,
observable_to_global_mapping: Optional[List[int]] = None) -> None:
"""
Initizalizes the JCBB class.
Parameters
----------
observed_landmarks : npt.NDArray
Mx(O+1) Numpy array representing the corodinates and the id of the observed landmarks.
M being the number of landmarks and O the dimension of coordinates.
observable_landmarks : npt.NDArray
Nx(O+1) Numpy array representing the coordinates and the id of the observable landmarks.
M being the number of landmarks and O the dimension of coordinates.
observable_state_covariance : npt.NDArray
Covariance matrix of the observable landmarks. Dimension and explanation will follow.
use_kd_tree : bool, optional
Option to specify whether a kd tree should be used for evaluating
which pairings should be considered, by default True
chi2quantile : float, optional
Chi^2 quantile to calculate the threshold for evaluate the compatibility of two landmarks, by default 0.5
verifies_landmark_id : bool, optional
Specified whether the landmark id (color) of observed and observable landmark should match., by default True
observable_to_global_mapping : Optional[List[int]], optional
Mapping between the observable landmarks to all tracked landmarks., by default None
"""
self.observed_landmarks = observed_landmarks
self.observable_landmarks = observable_landmarks # filtered with FOV gate
self.observable_state_covariance = observable_state_covariance
self.observed_landmarks_n = observed_landmarks.shape[0]
self.observable_landmarks_n = observable_landmarks.shape[0]
self.observable_landmarks_flat_coords: npt.NDArray
self.observed_landmarks_flat_coords: npt.NDArray
self.associations_objects: List[Associations] # canditates for best associations
self.best_associations: Associations
self.best_distance = np.nan
self.individual_compatibility = np.zeros((self.observed_landmarks_n, self.observable_landmarks_n), dtype=bool)
self.individual_distances = np.full((self.observed_landmarks_n, self.observable_landmarks_n), np.nan)
self.individual_compatibility_counts = np.zeros(self.observed_landmarks_n, dtype=int)
self.nodes_explored_n = 0
self.association_metric = DataAssociationMetric.MAHALANOBIS_DISTANCE
self.use_kd_tree = use_kd_tree
self.verifies_landmark_id = verifies_landmark_id
self.inverse_observable_landmark_covariances = np.empty((self.observable_landmarks_n, 2, 2))
self.chi2thres = chi2.ppf(chi2quantile, df=2) # TODO: what value to use? :(
self.prepare()
self.associate_with_full_covariance()
self.clusters: List[Cluster] = [Cluster(observed_landmark_indices=np.full((self.observed_landmarks_n), True),
individual_compatibility=self.individual_compatibility,
observable_landmark_indices=np.full((self.observable_landmarks_n), True),
order=np.arange(self.observed_landmarks_n))]
self.clusters[0].best_associations = self.best_associations
self.best_associations.translate_to_global_mapping(observable_to_global_mapping=observable_to_global_mapping)
[docs] def prepare(self):
"""
Function to prepare the data for the JCBB algorithm to save some computation time.
"""
matrices = [self.observable_state_covariance[2*observable_landmark_i:2*observable_landmark_i + 2,
2*observable_landmark_i:2*observable_landmark_i + 2]
for observable_landmark_i in range(self.observable_landmarks_n)]
self.inverse_observable_landmark_covariances = np.linalg.inv(matrices)
self.observable_landmarks_flat_coords = self.observable_landmarks[:, 0:2].flatten()
self.observed_landmarks_flat_coords = self.observed_landmarks[:, 0:2].flatten()
[docs] def recursive(self, observed_landmark_i: int, current_associations: Associations):
"""
Function to search for the best associations set recursivly.
Parameters
----------
observed_landmark_i : int
Index of the current observed landmark to try all possible not already associated observable landmarks.
current_associations : Associations
Current already established associations for this branch of the search tree.
"""
# Is it the end of the iteration? --> one possible termination condition
if observed_landmark_i >= self.observed_landmarks_n:
if current_associations.size > self.associations_objects[0].size:
# This must be a better choice since more features are matched, so all other candidates can be forgotten
self.associations_objects = [current_associations]
elif not current_associations.is_empty and current_associations.size == self.associations_objects[0].size:
# otherwise just add his associations set to the candidates for the best associations set
self.associations_objects.append(current_associations)
else: # normal iteration
# how many association can be still established at this point
potentials = np.count_nonzero(self.individual_compatibility_counts[observed_landmark_i:])
# if the current highest number of established association can still be topped, continue.
if current_associations.size + potentials >= self.associations_objects[0].size:
# loop over all observable landmarks for this observed landmark
for observable_landmark_i in range(self.observable_landmarks_n):
# continue if this observed landmark is compatible with this observable landmark
if self.individual_compatibility[observed_landmark_i, observable_landmark_i]:
# if there isn't already an association within the associations set for this observable landmark
if observable_landmark_i not in current_associations.array:
# search along this branch then
new_current_associations = current_associations.copy
new_current_associations.array[observed_landmark_i] = observable_landmark_i
new_current_associations.size += 1
self.nodes_explored_n += 1
self.recursive(observed_landmark_i=observed_landmark_i + 1,
current_associations=new_current_associations)
# if this observed landmark had any compatible observable landmark, there is one potential less
# by not associating any observable landmark with this observed landmark.
if self.individual_compatibility_counts[observed_landmark_i] > 0:
potentials -= 1
if current_associations.size + potentials >= self.associations_objects[0].size:
self.nodes_explored_n += 1
self.recursive(observed_landmark_i=observed_landmark_i + 1, current_associations=current_associations)
[docs] def mahalanobis_distance_of_associations(self, current_associations: Associations) -> float:
"""
Mahalanobis distance for a given set of associations. Is equivalent to the joint compatibility.
Parameters
----------
current_associations : Associations
Associations set to calculate the joint compatibility for.
Returns
-------
float
Mahalanobis distance for the given set of associations.
"""
associations_size = current_associations.size
assert(associations_size > 0)
flat_observed_landmarks_indices = current_associations.flat_observed_landmark_indices
flat_observable_landmarks_indices = current_associations.flat_observable_landmark_indices
observed_landmarks_coords = self.observed_landmarks_flat_coords[flat_observed_landmarks_indices]
observable_landmarks_coords = self.observable_landmarks_flat_coords[flat_observable_landmarks_indices]
observable_landmarks_covarince = self.observable_state_covariance[np.ix_(flat_observable_landmarks_indices,
flat_observable_landmarks_indices)]
return self.calculate_metric(observed_landmarks_coords, observable_landmarks_coords,
np.linalg.inv(observable_landmarks_covarince), self.association_metric)
[docs] def verify_landmark_id(self, observed_landmark_i: int, observable_landmark_i: int) -> bool:
"""
Verifies whether the landmark id (color) of the given observed and observable landmark match.
Returns always True if it has been specified that the landmark id shouldn't be verified.
Parameters
----------
observed_landmark_i : int
Index of the observed landmark the landmark id should be verified for.
observable_landmark_i : int
Index of the observable landmark the landmark id should be verified for.
Returns
-------
bool
Whether the landmark id (color) of the given observed and observable landmark match.
"""
if not self.verifies_landmark_id:
return True
return self.observed_landmarks[observed_landmark_i, 2] == self.observable_landmarks[observable_landmark_i, 2]
[docs] def associate_with_full_covariance(self):
"""
Calculate best association between the observed and observable landmarks.
"""
# at this moment only the mahalanobis distance is implemented.
assert self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE
# somehow
N_KD_RESULTS = self.observable_landmarks_n
kd_tree = KDTree(self.observable_landmarks[:, 0:2])
# init worst case distance? hopefully np.nan, what is done in the init
# First calculate the individual compatibility between the observed and observable landmarks.
for observed_landmark_i in range(self.observed_landmarks_n):
if self.use_kd_tree is False:
# if the kd tree should not be used, loop over every observable landmark for this observed landmark
for observable_landmark_i in range(self.observable_landmarks_n):
# cannot be compatible if the landmark id (color) doesn't match
if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i):
continue
mahalanobis_distance = distance.mahalanobis(
self.observed_landmarks[observed_landmark_i], self.observable_landmarks[observable_landmark_i],
self.inverse_observable_landmark_covariances[observable_landmark_i])
# if the mahalanobis distance is larger than the specified threshold, they are not compatible.
if(mahalanobis_distance > self.chi2thres):
continue
# save that they are compatible and their compatibility.
self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance
self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True
self.individual_compatibility_counts[observed_landmark_i] += 1
elif self.use_kd_tree is True:
# if the kd tree should not be used, loop over all observable landmarks beginning with the nearest
# and break if one is too far from the observed landmark. Everything else stays the same as above.
_, indices = kd_tree.query(self.observed_landmarks[observed_landmark_i, 0:2], k=N_KD_RESULTS)
indices = np.atleast_1d(indices)
for result_index in range(indices.shape[-1]):
observable_landmark_i = indices[result_index]
if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i):
continue
mahalanobis_distance = distance.mahalanobis(
self.observed_landmarks[observed_landmark_i, 0:2],
self.observable_landmarks[observable_landmark_i, 0:2],
self.inverse_observable_landmark_covariances[observable_landmark_i])
if(mahalanobis_distance > self.chi2thres):
break
self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance
self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True
self.individual_compatibility_counts[observed_landmark_i] += 1
# initialize the candidates for the best associations set with an empty one.
self.associations_objects = [Associations(observed_landmarks_n=self.observed_landmarks_n)]
# start the recursive search of the search tree for the best associations sets
self.recursive(observed_landmark_i=0, current_associations=self.associations_objects[0].copy)
# if there are multiple candidates for the best associations set (same number of associations)
if len(self.associations_objects) > 1:
self.best_associations = None
# choose the one with the lowest mahalanobis distance
for associations_object in self.associations_objects:
mahalanobis_distance = self.mahalanobis_distance_of_associations(associations_object)
if self.is_closer(mahalanobis_distance, self.best_distance):
self.best_associations = associations_object
self.best_distance = mahalanobis_distance
else:
# otherwise take the one best.
self.best_associations = self.associations_objects[0]
[docs] @staticmethod
def calculate_metric(observation: npt.NDArray, prediction: npt.NDArray,
covariance: npt.NDArray, metric: DataAssociationMetric) -> float:
"""
Calculate metric between an observation and a prediction scaled by the covariance of the prediction.
Parameters
----------
observation : npt.NDArray
Observed landmark(s).
prediction : npt.NDArray
Observable landmark(s).
covariance : npt.NDArray
Covariance of the observable landmark(s).
metric : DataAssociationMetric
Metric to calculate.
Returns
-------
float
Calculated metric.
"""
mahalanobis_distance = distance.mahalanobis(observation, prediction, covariance)
if metric == DataAssociationMetric.MAHALANOBIS_DISTANCE:
return mahalanobis_distance
assert metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD
dovariance_det = np.linalg.det(covariance)
maximum_likelihood = np.exp(np.sqrt(mahalanobis_distance) / -2) / (np.sqrt(2 * np.pi) * np.sqrt(dovariance_det))
return maximum_likelihood
[docs] def is_closer(self, distance1: float, distance2: float) -> bool:
"""
Returns whether the first distance is smaller than the second distance and thus its landmarks are closer.
Based on the metric to use.
Parameters
----------
distance1 : float
First distance.
distance2 : float
First distance.
Returns
-------
bool
Whether the first distance is smaller than the second distance and thus its landmarks are closer.
"""
if np.isnan(distance2):
return True
if np.isnan(distance1):
return False
if self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE:
return distance1 < distance2
elif self.association_metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD:
return distance1 > distance2
[docs]class Cluster():
def __init__(self, observed_landmark_indices: npt.NDArray, individual_compatibility: npt.NDArray,
observable_landmark_indices: npt.NDArray, order: npt.NDArray):
"""
Class to represent a cluster of observed landmark that can be searched for the best associations set
independently from other landmarks cluster.
Parameters
----------
observed_landmark_indices : npt.NDArray
Numpy boolean array masking which observed landmarks are part of this cluster.
observable_landmark_indices : npt.NDArray
Numpy boolean array masking which observable landmarks are part of this cluster.
order : npt.NDArray
Numpy array containing the array by which the the compatibility matrix has been ordered for the clustering.
"""
observed_order = np.argsort(np.count_nonzero(individual_compatibility[observed_landmark_indices][:, observable_landmark_indices], axis=1))[::-1]
self.observed_landmark_indices = np.nonzero(observed_landmark_indices[np.argsort(order)])[0][observed_order]
observable_order = np.argsort(np.count_nonzero(individual_compatibility[observed_landmark_indices][:, observable_landmark_indices], axis=0))[::-1]
self.observable_landmark_indices = np.nonzero(observable_landmark_indices)[0][observable_order]
self.observed_landmarks_n = np.count_nonzero(observed_landmark_indices)
self.observable_landmarks_n = np.count_nonzero(observable_landmark_indices)
self.associations_objects = List[Associations]
self.best_associations: Associations
self.best_distance = np.nan
[docs]class ClusterJCBB():
"""
Class to represent and execute the Joint compatibility branch and bound (JCBB) algorithm for a given set of
observed and observable landmarks.
Code has been ported from the C++ Implementation of the mobile robot programming toolkit.
https://github.com/MRPT/mrpt/blob/develop/libs/slam/src/slam/data_association.cpp
There have been made three changes compared to the implementation above.
1. The joint complatibility (mahalanobis distance of all associations) is only calculated at the end for all
associations sets with the highest number of established associations to determine the best association set.
2. There has been prepared a feature to calculate in a more efficient way, so we believe: We take into
consideration that an observed landmark might not have any individual compatible observable landmark left since
all individual compatible observable landmarks have either already been associated or willingly not.
3. This implementation only supports the mahalanobis distance and not the maximum likelihood.
Attributes
----------
observed_landmarks : npt.NDArray
Mx(O+1) Numpy array representing the corodinates and the id of the observed landmarks.
M being the number of landmarks and O the dimension of coordinates.
observable_landmarks : npt.NDArray
Nx(O+1) Numpy array representing the coordinates and the id of the observable landmarks.
M being the number of landmarks and O the dimension of coordinates.
observable_state_covariance : npt.NDArray
Covariance matrix of the observable landmarks. Dimension and explanation will follow.
observed_landmarks_n : int
Number of observed landmarks.
observable_landmarks_n : int
Number of observable landmarks.
associations_objects : List[Associations]
List of Associations objects (sets) that have the highest number of established associations
and thus are candidates for the best associations set.
best_associations : Associations
Best Associations object, gets assigned at the end and must be an element of associations_objects.
best_distance : float
Mahalanobis distance of the currently best associations set.
individual_compatibility : npt.NDArray
Compatibility matrix between the observed and observable landmarks based on the individual distance and color.
individual_distances : npt.NDArray
Matrix containing the individual distances between the observed and observable landmarks.
individual_compatibility_counts : npt.NDArray
Array containing the count of compatible observable landmarks for the respective observed landmark.
nodes_explored_n : int
Number of explored nodes of the search tree for the best associations set.
association_metric : AssociationMetric
Which metric to use to compare different associations sets.
use_kd_tree : bool
Option to specify whether a kd tree should be used for evaluating which pairings should be considered.
verifies_landmark_id : bool
Specified whether the landmark id (color) of observed and observable landmark should match.
inverse_observable_landmark_covariances : npt.NDArray
Inverse of the covariance matrix for all observable covariances so that it only has to be calculated once.
chi2thres : float
Threshold to evaluate whether two landmarks are compatible given their mahalanobis distance.
observable_landmarks_flat_coords : npt.NDArray
Coordinates of all observable landmarks flattened.
observed_landmarks_flat_coords : npt.NDArray
Coordinates of all observed landmarks flattened.
clusters : List[Cluster]
List of all landmark clusters.
"""
def __init__(self, observed_landmarks: npt.NDArray, observable_landmarks: npt.NDArray,
observable_state_covariance: npt.NDArray, use_kd_tree: bool = True,
chi2quantile: float = 0.5, verifies_landmark_id: bool = True,
observable_to_global_mapping: Optional[List[int]] = None) -> None:
"""
Initizalizes the JCBB class.
Parameters
----------
observed_landmarks : npt.NDArray
Mx(O+1) Numpy array representing the coordinates and the id of the observed landmarks.
M being the number of landmarks and O the dimension of coordinates.
observable_landmarks : npt.NDArray
Nx(O+1) Numpy array representing the coordinates and the id of the observable landmarks.
M being the number of landmarks and O the dimension of coordinates.
observable_state_covariance : npt.NDArray
Covariance matrix of the observable landmarks. Dimension and explanation will follow.
use_kd_tree : bool, optional
Option to specify whether a kd tree should be used for evaluating
which pairings should be considered, by default True
chi2quantile : float, optional
Chi^2 quantile to calculate the threshold for evaluating the compatibility of two landmarks, by default 0.5
verifies_landmark_id : bool, optional
Specified whether the landmark id (color) of observed and observable landmark should match, by default True
observable_to_global_mapping : Optional[List[int]], optional
Mapping between the observable landmarks to all tracked landmarks, by default None
"""
self.observed_landmarks = observed_landmarks
self.observable_landmarks = observable_landmarks # filtered with FOV gate
self.observable_state_covariance = observable_state_covariance
self.observed_landmarks_n = observed_landmarks.shape[0]
self.observable_landmarks_n = observable_landmarks.shape[0]
self.observable_landmarks_flat_coords: npt.NDArray
self.observed_landmarks_flat_coords: npt.NDArray
self.associations_objects: List[Associations] = [] # candidates for best associations
self.best_associations: Associations
self.best_distance = np.nan
self.individual_compatibility = np.zeros((self.observed_landmarks_n, self.observable_landmarks_n), dtype=bool)
self.individual_distances = np.full((self.observed_landmarks_n, self.observable_landmarks_n), np.nan)
self.individual_compatibility_counts = np.zeros(self.observed_landmarks_n, dtype=int)
self.nodes_explored_n = 0
self.association_metric = DataAssociationMetric.MAHALANOBIS_DISTANCE
self.use_kd_tree = use_kd_tree
self.verifies_landmark_id = verifies_landmark_id
self.inverse_observable_landmark_covariances = np.empty((self.observable_landmarks_n, 2, 2))
self.chi2thres = chi2.ppf(chi2quantile, df=2) # TODO: what value to use? :(
self.clusters: List[Cluster] = []
self.prepare()
self.associate_with_full_covariance()
self.best_associations.translate_to_global_mapping(observable_to_global_mapping=observable_to_global_mapping)
[docs] def prepare(self):
"""
Function to prepare the data for the JCBB algorithm to save some computation time.
"""
matrices = [self.observable_state_covariance[2*observable_landmark_i:2*observable_landmark_i + 2,
2*observable_landmark_i:2*observable_landmark_i + 2]
for observable_landmark_i in range(self.observable_landmarks_n)]
self.inverse_observable_landmark_covariances = np.linalg.inv(matrices)
self.observable_landmarks_flat_coords = self.observable_landmarks[:, 0:2].flatten()
self.observed_landmarks_flat_coords = self.observed_landmarks[:, 0:2].flatten()
[docs] def recursive(self, cluster_observed_landmark_i: int, current_associations: Associations, cluster: Cluster):
"""
Function to search for the best associations set recursivly.
Parameters
----------
cluster_observed_landmark_i : int
Index of the current observed landmark to try all possible not already associated observable landmarks.
current_associations : Associations
Current already established associations for this branch of the search tree.
cluster : Cluster
Cluster for which the best associations are currently searched for.
"""
# Is it the end of the iteration? --> one possible termination condition
if cluster_observed_landmark_i >= cluster.observed_landmarks_n:
if current_associations.size > cluster.associations_objects[0].size:
# This must be a better choice since more features are matched, so all other candidates can be forgotten
cluster.associations_objects = [current_associations]
elif not current_associations.is_empty and current_associations.size == cluster.associations_objects[0].size: # noqa: E501
# otherwise just add his associations set to the candidates for the best associations set
cluster.associations_objects.append(current_associations)
else: # normal iteration
# how many association can be still established at this point
observed_landmark_i = cluster.observed_landmark_indices[cluster_observed_landmark_i]
potentials = np.count_nonzero(self.individual_compatibility_counts[cluster.observed_landmark_indices[cluster_observed_landmark_i:]])
# if the current highest number of established association can still be topped, continue.
if current_associations.size + potentials >= cluster.associations_objects[0].size:
# loop over all observable landmarks for this observed landmark
for observable_landmark_i in cluster.observable_landmark_indices:
# continue if this observed landmark is compatible with this observable landmark
if self.individual_compatibility[observed_landmark_i, observable_landmark_i]:
# if there isn't already an association within the associations set for this observable landmark
if observable_landmark_i not in current_associations.array:
# search along this branch then
new_current_associations = current_associations.copy
new_current_associations.array[observed_landmark_i] = observable_landmark_i
new_current_associations.size += 1
self.nodes_explored_n += 1
self.recursive(cluster_observed_landmark_i=cluster_observed_landmark_i + 1,
current_associations=new_current_associations, cluster=cluster)
# if this observed landmark had any compatible observable landmark, there is one potential less
# by not associating any observable landmark with this observed landmark.
if self.individual_compatibility_counts[observed_landmark_i] > 0:
potentials -= 1
if current_associations.size + potentials >= cluster.associations_objects[0].size:
self.nodes_explored_n += 1
self.recursive(cluster_observed_landmark_i=cluster_observed_landmark_i + 1,
current_associations=current_associations, cluster=cluster)
[docs] def mahalanobis_distance_of_associations(self, current_associations: Associations) -> float:
"""
Mahalanobis distance for a given set of associations. Is equivalent to the joint compatibility.
Parameters
----------
current_associations : Associations
Associations set to calculate the joint compatibility for.
Returns
-------
float
Mahalanobis distance for the given set of associations.
"""
associations_size = current_associations.size
assert(associations_size > 0)
flat_observed_landmarks_indices = current_associations.flat_observed_landmark_indices
flat_observable_landmarks_indices = current_associations.flat_observable_landmark_indices
observed_landmarks_coords = self.observed_landmarks_flat_coords[flat_observed_landmarks_indices]
observable_landmarks_coords = self.observable_landmarks_flat_coords[flat_observable_landmarks_indices]
observable_landmarks_covarince = self.observable_state_covariance[np.ix_(flat_observable_landmarks_indices,
flat_observable_landmarks_indices)]
return self.calculate_metric(observed_landmarks_coords, observable_landmarks_coords,
np.linalg.inv(observable_landmarks_covarince), self.association_metric)
[docs] def verify_landmark_id(self, observed_landmark_i: int, observable_landmark_i: int) -> bool:
"""
Verifies whether the landmark id (color) of the given observed and observable landmark match.
Returns always True if it has been specified that the landmark id shouldn't be verified.
Parameters
----------
observed_landmark_i : int
Index of the observed landmark the landmark id should be verified for.
observable_landmark_i : int
Index of the observable landmark the landmark id should be verified for.
Returns
-------
bool
Whether the landmark id (color) of the given observed and observable landmark match.
"""
if not self.verifies_landmark_id:
return True
return self.observed_landmarks[observed_landmark_i, 2] == self.observable_landmarks[observable_landmark_i, 2]
[docs] def associate_with_full_covariance(self):
"""
Calculate best association between the observed and observable landmarks.
"""
# at this moment only the mahalanobis distance is implemented.
assert self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE
# somehow
N_KD_RESULTS = self.observable_landmarks_n
kd_tree = KDTree(self.observable_landmarks[:, 0:2])
# init worst case distance? hopefully np.nan, what is done in the init
# First calculate the individual compatibility between the observed and observable landmarks.
for observed_landmark_i in range(self.observed_landmarks_n):
if self.use_kd_tree is False:
# if the kd tree should not be used, loop over every observable landmark for this observed landmark
for observable_landmark_i in range(self.observable_landmarks_n):
# cannot be compatible if the landmark id (color) doesn't match
if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i):
continue
mahalanobis_distance = distance.mahalanobis(
self.observed_landmarks[observed_landmark_i], self.observable_landmarks[observable_landmark_i],
self.inverse_observable_landmark_covariances[observable_landmark_i])
# if the mahalanobis distance is larger than the specified threshold, they are not compatible.
if(mahalanobis_distance > self.chi2thres):
continue
# save that they are compatible and their compatibility.
self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance
self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True
self.individual_compatibility_counts[observed_landmark_i] += 1
elif self.use_kd_tree is True:
# if the kd tree should not be used, loop over all observable landmarks beginning with the nearest
# and break if one is too far from the observed landmark. Everything else stays the same as above.
_, indices = kd_tree.query(self.observed_landmarks[observed_landmark_i, 0:2], k=N_KD_RESULTS)
indices = np.atleast_1d(indices)
for result_index in range(indices.shape[-1]):
observable_landmark_i = indices[result_index]
if not self.verify_landmark_id(observed_landmark_i, observable_landmark_i):
continue
mahalanobis_distance = distance.mahalanobis(
self.observed_landmarks[observed_landmark_i, 0:2],
self.observable_landmarks[observable_landmark_i, 0:2],
self.inverse_observable_landmark_covariances[observable_landmark_i])
if(mahalanobis_distance > self.chi2thres):
break
self.individual_distances[observed_landmark_i, observable_landmark_i] = mahalanobis_distance
self.individual_compatibility[observed_landmark_i, observable_landmark_i] = True
self.individual_compatibility_counts[observed_landmark_i] += 1
if np.count_nonzero(self.individual_compatibility) == 0:
self.best_associations = Associations(array=np.full(self.observed_landmarks_n, -1), size=0)
return
self.find_clusters()
# find the best associations set for every cluster
for cluster in self.clusters:
# initialize the candidates for the best associations set with an empty one.
cluster.associations_objects = [Associations(observed_landmarks_n=self.observed_landmarks_n)]
# start the recursive search of the search tree for the best associations sets
self.recursive(cluster_observed_landmark_i=0, cluster=cluster,
current_associations=cluster.associations_objects[0].copy)
self.associations_objects += cluster.associations_objects
# if there are multiple candidates for the best associations set (same number of associations)
if len(cluster.associations_objects) > 1:
cluster.best_associations = None
# choose the one with the lowest mahalanobis distance
for associations_object in cluster.associations_objects:
mahalanobis_distance = self.mahalanobis_distance_of_associations(associations_object)
if self.is_closer(mahalanobis_distance, cluster.best_distance):
cluster.best_associations = associations_object
cluster.best_distance = mahalanobis_distance
else:
# otherwise take the one best.
cluster.best_associations = cluster.associations_objects[0]
# combine every best association sets for each cluster to one associations set
arrays = [cluster.best_associations.array for cluster in self.clusters]
array = np.atleast_1d(np.nansum(arrays, axis=0))
array[np.all(np.isnan(arrays), axis=0)] = np.nan
self.best_associations = Associations(array=array, size=np.count_nonzero(array))
[docs] def find_clusters(self):
"""
Find clusters of compatible landmarks in which the best associations can be searched for independently.
"""
order = np.argsort(self.individual_compatibility_counts)
to_be_examined_individual_compatibility = self.individual_compatibility[order]
cluster_oberved_indices_array = []
cluster_obervable_indices_array = []
while(to_be_examined_individual_compatibility.shape[0] > 0):
if np.count_nonzero(to_be_examined_individual_compatibility[-1]) == 0:
break
old_observable_landmarks_in_cluster = to_be_examined_individual_compatibility[-1]
while True:
cluster_oberved_indices = np.count_nonzero(
to_be_examined_individual_compatibility[:, old_observable_landmarks_in_cluster], axis=1) > 0
new_observable_landmarks_in_cluster = np.logical_or.reduce(
to_be_examined_individual_compatibility[cluster_oberved_indices])
if np.array_equal(new_observable_landmarks_in_cluster, old_observable_landmarks_in_cluster):
break
old_observable_landmarks_in_cluster = new_observable_landmarks_in_cluster
if len(cluster_oberved_indices_array) > 0:
cluster_oberved_indices_mask = np.zeros(self.individual_compatibility.shape[0], dtype=bool)
cluster_oberved_indices_mask[~np.logical_or.reduce(cluster_oberved_indices_array)] = \
cluster_oberved_indices
else:
cluster_oberved_indices_mask = cluster_oberved_indices
cluster_oberved_indices_array.append(cluster_oberved_indices_mask)
cluster_obervable_indices_array.append(old_observable_landmarks_in_cluster)
self.clusters.append(Cluster(observed_landmark_indices=cluster_oberved_indices_mask,
observable_landmark_indices=old_observable_landmarks_in_cluster, order=order,
individual_compatibility=self.individual_compatibility[order]))
to_be_examined_individual_compatibility = to_be_examined_individual_compatibility[~cluster_oberved_indices]
[docs] @staticmethod
def calculate_metric(observation: npt.NDArray, prediction: npt.NDArray,
covariance: npt.NDArray, metric: DataAssociationMetric) -> float:
"""
Calculate metric between an observation and a prediction scaled by the covariance of the prediction.
Parameters
----------
observation : npt.NDArray
Observed landmark(s).
prediction : npt.NDArray
Observable landmark(s).
covariance : npt.NDArray
Covariance of the observable landmark(s).
metric : DataAssociationMetric
Metric to calculate.
Returns
-------
float
Calculated metric.
"""
mahalanobis_distance = distance.mahalanobis(observation, prediction, covariance)
if metric == DataAssociationMetric.MAHALANOBIS_DISTANCE:
return mahalanobis_distance
assert metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD
dovariance_det = np.linalg.det(covariance)
maximum_likelihood = np.exp(np.sqrt(mahalanobis_distance) / -2) / (np.sqrt(2 * np.pi) * np.sqrt(dovariance_det))
return maximum_likelihood
[docs] def is_closer(self, distance1: float, distance2: float) -> bool:
"""
Returns whether the first distance is smaller than the second distance and thus its landmarks are closer.
Based on the metric to use.
Parameters
----------
distance1 : float
First distance.
distance2 : float
First distance.
Returns
-------
bool
Whether the first distance is smaller than the second distance and thus its landmarks are closer.
"""
if np.isnan(distance2):
return True
if np.isnan(distance1):
return False
if self.association_metric == DataAssociationMetric.MAHALANOBIS_DISTANCE:
return distance1 < distance2
elif self.association_metric == DataAssociationMetric.MAXIMUM_LIKELIHOOD:
return distance1 > distance2