"""
Implementation of all misc utility functions.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, List, Optional
if TYPE_CHECKING:
import ukf
import os
from enum import Enum
import logging
from pathlib import Path
import pickle
import numpy as np
import numpy.typing as npt
import pandas as pd
import progressbar
from pyproj import Transformer
from pyproj.crs.crs import CRS
def local_to_gps(data: npt.NDArray[np.floating], lat_origin: float, lon_origin: float,
                 gamma: float = 0, colorize: bool = False) -> pd.DataFrame:
    """
    Converts a numpy array of local coordinates to a DataFrame of global coordinates.

    An oblique Mercator CRS centred on the given origin is built and a transformer from
    that CRS to 'EPSG:4326' is used to convert all points in a single vectorised call.

    Parameters
    ----------
    data : npt.NDArray[np.floating]
        Numpy array containing local x data in column 0 and local y data in column 1.
    lat_origin : float
        Latitude origin of the transformer object.
    lon_origin : float
        Longitude origin of the transformer object.
    gamma : float, optional
        Value passed as ``+gamma`` to the oblique Mercator projection
        (azimuth of the rectified bearing of the centre line), by default 0
    colorize : bool, optional
        Specifies whether a color for every datapoint should be calculated, by default False

    Returns
    -------
    pd.DataFrame
        Pandas DataFrame with the columns 'lat' and 'lon' and, if ``colorize`` is set,
        an additional 'color' column holding one '#rrggbb' string per row.
    """
    crs_omerc = CRS.from_proj4(f'''+proj=omerc +lat_0={lat_origin} +lonc={lon_origin} +k_0=1
                               +x_0=0 +y_0=0 +gamma={gamma} +ellps=WGS84 +towgs84=0,0,0,0,0,0,0''')
    transformer_omerc = Transformer.from_crs(crs_from=crs_omerc, crs_to='EPSG:4326')

    gps = np.zeros((data.shape[0], 2))
    if data.shape[0] > 0:
        # Transform all points at once instead of once per row; the two returned
        # arrays are stored in the same column order as the per-point tuple was before.
        gps[:, 0], gps[:, 1] = transformer_omerc.transform(xx=data[:, 0], yy=data[:, 1], errcheck=True)
    gps = pd.DataFrame(data=gps, columns=['lat', 'lon'])

    if colorize:
        import matplotlib
        import matplotlib.cm
        try:
            # Registry access; matplotlib.cm.get_cmap was removed in Matplotlib 3.9.
            cmap = matplotlib.colormaps['magma']
        except AttributeError:
            # Older Matplotlib releases without the colormap registry.
            cmap = matplotlib.cm.get_cmap('magma')
        # Sample the colormap from its upper end (first row) to its lower end (last row), dropping alpha.
        rgbs = np.array(cmap(np.linspace(0, 1, gps.shape[0])[::-1]))[:, 0:3]
        gps['color'] = ['#%02x%02x%02x' % tuple(map(int, tuple(rgb*255))) for rgb in rgbs]
    return gps
class Saver():
    """
    Keeps a history of selected attributes of a kalman filter.

    Every call to ``save`` appends a copy of each attribute named in ``saving_attributes``
    to the list of the same name on this object. The history can be pickled to disk,
    read back, compared against another saver and used to roll the filter back.
    """

    # Names of the filter attributes that are recorded; each has a same-named list on the saver.
    saving_attributes = ['x', 'u', 'sensor_z', 'sensor_R', 'current_global_landmarks', 't', 'x_R', 'landmarks',
                         'mapping', 'x_R_prior', 'x_R_post', 'P_R_prior', 'P_R_post', 'P', 'executed_steps',
                         'individual_compatability', 'observable_to_global_mapping', 'local_mapping_nodes_explored_n',
                         'local_mapping_compared_associations_n', 'observed_landmarks', 'observable_landmarks', 'observable_state_covariance']

    def __init__(self, filter: ukf.UKF) -> None:
        """
        Creates empty histories for all saved attributes and stores the filter to record.

        Parameters
        ----------
        filter : ukf.UKF
            Kalman filter whose attributes are recorded by ``save``.
        """
        self.x: List[npt.NDArray] = []
        self.x_R: List[npt.NDArray] = []
        self.x_R_prior: List[npt.NDArray] = []
        self.x_R_post: List[npt.NDArray] = []
        self.P_R_post: List[npt.NDArray] = []
        self.P_R_prior: List[npt.NDArray] = []
        self.P: List[npt.NDArray] = []
        self.u: List[npt.NDArray] = []
        self.executed_steps: List[List[str]] = []
        self.sensor_z: List[npt.NDArray] = []
        self.sensor_R: List[npt.NDArray] = []
        self.landmarks: List[npt.NDArray] = []
        self.individual_compatability: List[npt.NDArray] = []
        self.observable_to_global_mapping: List[npt.NDArray] = []
        self.current_global_landmarks: List[npt.NDArray] = []
        self.observed_landmarks: List[npt.NDArray] = []
        self.observable_landmarks: List[npt.NDArray] = []
        self.observable_state_covariance: List[npt.NDArray] = []
        self.mapping: List[List[int]] = []
        self.t: List[float] = []
        self.local_mapping_compared_associations_n: List[int] = []
        self.local_mapping_nodes_explored_n: List[int] = []
        self._kf = filter

    @staticmethod
    def _saver_dir(test_case: str) -> str:
        """Returns the directory used for saver pickles of the given test case."""
        return f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/saver/{test_case}'

    @staticmethod
    def _param_study_dir(parameter_study_name: str, test_case: str) -> str:
        """Returns the directory used for saver pickles of the given parameter study and test case."""
        return f'{os.path.abspath(os.path.dirname(__file__))}/../../param_study/{parameter_study_name}/{test_case}'

    def save(self):
        """
        Save the current values of the predefined attributes to be saved of the kalman filter.

        Each value is copied with ``np.copy`` before it is appended, so later changes
        to the filter do not alter the stored history.
        """
        for attribute in self.saving_attributes:
            getattr(self, attribute).append(np.copy(getattr(self._kf, attribute)))

    def finalize(self):
        """
        Converts the list of all attributes to numpy arrays. Currently unused.

        Afterwards the histories are arrays rather than lists, so ``save`` and ``recover``
        (which append to and pop from lists) can no longer be used on this object.
        """
        for attribute in self.saving_attributes:
            setattr(self, attribute, np.asarray(getattr(self, attribute)))

    def pickle(self, filename: str = 'saver', test_case: str = 'default') -> None:
        """
        Pickle the saver object for analysis later on.

        The target directory is created if it does not exist yet.

        Parameters
        ----------
        filename : str, optional
            Filename the saver object should be saved to, by default 'saver'.
        test_case : str, optional
            Name of the subfolder where the pickle should be saved to, by default 'default'.
        """
        directory = self._saver_dir(test_case)
        Path(directory).mkdir(parents=True, exist_ok=True)
        # Context manager guarantees the file is flushed and closed, even if dumping fails.
        with open(f'{directory}/{filename}.pickle', 'wb') as output:
            pickle.dump(self, output)
        logging.info(msg=f'Saving pickle of the saver for test case \'{test_case}\'')

    def pickle_parameter_study(self, filename: str = 'saver', test_case: str = 'default', parameter_study_name: str = 'default') -> None:
        """
        Pickle the saver object of a parameter study run for analysis later on.

        The target directory is created if it does not exist yet.

        Parameters
        ----------
        filename : str, optional
            Filename the saver object should be saved to, by default 'saver'.
        test_case : str, optional
            Name of the subfolder where the pickle should be saved to, by default 'default'.
        parameter_study_name : str, optional
            Name of the parameter study folder containing the test case subfolder, by default 'default'.
        """
        directory = self._param_study_dir(parameter_study_name, test_case)
        Path(directory).mkdir(parents=True, exist_ok=True)
        with open(f'{directory}/{filename}.pickle', 'wb') as output:
            pickle.dump(self, output)
        logging.info(msg=f'Saving pickle of the saver for test case \'{test_case}\'')

    def recover(self, delete_last_state: bool = True):
        """
        Function to recover from a state where the state covariance matrix is not positive semi-definite
        and thus cannot be updated or predicted anymore.

        The filter is reset to the most recent saved entry and the update postprocessing
        of all its sensors is switched off.

        Parameters
        ----------
        delete_last_state : bool, optional
            Specifies whether the last saved state should be deleted before recovering last saved state, by default True
        """
        if delete_last_state is True:
            for attribute in self.saving_attributes:
                getattr(self, attribute).pop()
        self._kf.executed_steps = self.executed_steps[-1]
        for sensor in self._kf.sensors:
            sensor.execute_update_postprocess = False
        self._kf.P = self.P[-1]
        self._kf.x = self.x[-1]
        self._kf.mapping = self.mapping[-1]
        self._kf.observable_to_global_mapping = self.observable_to_global_mapping[-1]
        self._kf.individual_compatability = self.individual_compatability[-1]
        # Truncate the id lists to the number of rows of the filter's landmark array.
        self._kf.landmark_ids = self._kf.landmark_ids[:self._kf.landmarks.shape[0]]
        self._kf.fake_landmark_ids = self._kf.fake_landmark_ids[:self._kf.landmarks.shape[0]]

    @staticmethod
    def read_pickle(filename: str = 'saver', test_case: str = 'default') -> Optional[Saver]:
        """
        Read the pickle of a previous pickled saver object.
        Returns None if the file does not exist.

        Parameters
        ----------
        filename : str, optional
            Filename of the pickle object to be read, by default 'saver'
        test_case : str, optional
            Name of the subfolder where the pickle should be read from, by default 'default'

        Returns
        -------
        Optional[Saver]
            Read in saver object.
        """
        pickle_path = f'{Saver._saver_dir(test_case)}/{filename}.pickle'
        if not Path(pickle_path).is_file():
            return None
        # NOTE: unpickling can execute arbitrary code; only read files created by this project.
        with open(pickle_path, 'rb') as pickled:
            return pickle.load(pickled)

    @staticmethod
    def read_pickle_parameter_study(filename: str = 'saver', test_case: str = 'default', parameter_study_name: str = 'default') -> Optional[Saver]:
        """
        Read the pickle of a previous pickled saver object of a parameter study run.
        Returns None if the file does not exist.

        Parameters
        ----------
        filename : str, optional
            Filename of the pickle object to be read, by default 'saver'
        test_case : str, optional
            Name of the subfolder where the pickle should be read from, by default 'default'
        parameter_study_name : str, optional
            Name of the parameter study folder containing the test case subfolder, by default 'default'

        Returns
        -------
        Optional[Saver]
            Read in saver object.
        """
        pickle_path = f'{Saver._param_study_dir(parameter_study_name, test_case)}/{filename}.pickle'
        if not Path(pickle_path).is_file():
            return None
        # NOTE: unpickling can execute arbitrary code; only read files created by this project.
        with open(pickle_path, 'rb') as pickled:
            return pickle.load(pickled)

    def compare(self, cmp_saver: Saver, exclude: Optional[List[str]] = None):
        """
        Compares the current saver with another saver. Finds prediction step with diverging attributes.
        Prints them and the executed steps in the previous and current prediction step.
        Only the first diverging prediction step is reported.

        Parameters
        ----------
        cmp_saver : Saver
            Saver to compare with
        exclude : Optional[List[str]], optional
            Names of saved attributes that should be skipped, by default None (skip nothing)
        """
        excluded = () if exclude is None else exclude
        for i in range(len(self.x)):
            diverging_attributes = []
            for attribute in self.saving_attributes:
                if attribute in excluded:
                    continue
                old_attribute = getattr(self, attribute)[i]
                new_attribute = getattr(cmp_saver, attribute)[i]
                if type(old_attribute) != type(new_attribute):
                    print(f'Type different of {attribute} at {i}')
                if isinstance(old_attribute, np.ndarray) and np.issubdtype(old_attribute.dtype, np.number):
                    # Numeric arrays: NaN entries at the same position count as equal.
                    if not np.array_equal(old_attribute, new_attribute, equal_nan=True):
                        diverging_attributes.append(attribute)
                else:
                    if old_attribute.shape != new_attribute.shape or np.any(old_attribute != new_attribute):
                        diverging_attributes.append(attribute)
            if len(diverging_attributes) > 0:
                print(f'Diverging attributes at prediction_steo {i}: {diverging_attributes}')
                print(f'Executed steps at current prediction_step {i}: {self.executed_steps[i]}')
                if i > 0:
                    print(f'Executed steps at previous prediction_step {i-1}: {self.executed_steps[i-1]}')
                break
class LandmarkUpdateStrategy(Enum):
    """
    Enum of the available landmark update strategies.

    NOTE(review): the exact effect of each member is defined by the code that
    consumes this enum, which is not part of this module - confirm there.
    """
    ALL = 1
    NO_HEADING = 2
    NO_POSE = 3
    DO_NOT_INITIALIZE = 4
    DO_NOT_UPDATE = 5
class UpdateStrategy(Enum):
    """
    Enum to describe which states to use during the update step.

    ALL_STATES means that all tracked states will be used for the update step as part of the sigma points.
    ONLY_ROBOT_POSE_AND_NECESSARY_STATES means that all robot pose states and the states that can be measured
    (e.g. by the local mapping) will be used for the update step as part of the sigma points.
    ONLY_ROBOT_POSE_OR_ALL_STATES means that either only the robot states will be used for the update step as part
    of the sigma points or all states when there is at least one other state that can be measured.
    """
    ALL_STATES = 1
    ONLY_ROBOT_POSE_AND_NECESSARY_STATES = 2
    ONLY_ROBOT_POSE_OR_ALL_STATES = 3
class DataAssociation(Enum):
    """
    Enum to describe which data association algorithm to use.

    JCBB uses the just little adapted Jointly Compatibility Branch and Bound algorithm.
    ClusterJCBB uses the JCBB algorithm for previous formed compatibility clusters.
    """
    JCBB = 1
    ClusterJCBB = 2
def generate_progressbar(data_length: int) -> progressbar.ProgressBar:
    """
    Generates a progressbar object with the given data length with more information than the standard one.

    The bar shows percentage, simple progress, the bar itself, a 'prediction_step' variable,
    a timer and an ETA, and is created with ``redirect_stdout=True``.

    Parameters
    ----------
    data_length : int
        Length of data points to process.

    Returns
    -------
    progressbar.ProgressBar
        Progressbar object with the given data length with more information than the standard one.
    """
    widgets = [
        progressbar.Percentage(), ' ',
        progressbar.SimpleProgress(), ' ',
        progressbar.Bar(), ' ',
        progressbar.Variable('prediction_step'), ' ',
        progressbar.Timer(), ' ',
        progressbar.ETA()
    ]
    return progressbar.ProgressBar(max_value=data_length, widgets=widgets, redirect_stdout=True)