Source code for utils.helpers

"""
Implementation of miscellaneous utility functions.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, List, Optional
if TYPE_CHECKING:
    import ukf

import os
from enum import Enum
import logging
from pathlib import Path
import pickle
import numpy as np
import numpy.typing as npt
import pandas as pd

import progressbar

from pyproj import Transformer
from pyproj.crs.crs import CRS


def local_to_gps(data: npt.NDArray[np.floating], lat_origin: float, lon_origin: float,
                 gamma: float = 0, colorize: bool = False) -> pd.DataFrame:
    """
    Converts a numpy array of local coordinates to a DataFrame of global coordinates.

    Uses a transformer object to convert local to global coordinates.

    Parameters
    ----------
    data : npt.NDArray[np.floating]
        Numpy array containing local x and y data.
    lat_origin : float
        Latitude origin of the transformer object.
    lon_origin : float
        Longitude origin of the transformer object.
    gamma : float, optional
        Azimuth of the centerline, clockwise from north of the rectified bearing
        of the centre line, by default 0.
    colorize : bool, optional
        Specifies whether a color should be calculated for every datapoint, by default False.

    Returns
    -------
    pd.DataFrame
        Pandas DataFrame containing global latitude and longitude as columns.
    """
    gps = np.zeros((data.shape[0], 2))
    crs_omerc = CRS.from_proj4(f'+proj=omerc +lat_0={lat_origin} +lonc={lon_origin} '
                               f'+k_0=1 +x_0=0 +y_0=0 +gamma={gamma} '
                               f'+ellps=WGS84 +towgs84=0,0,0,0,0,0,0')
    transformer_omerc = Transformer.from_crs(crs_from=crs_omerc, crs_to='EPSG:4326')
    for i in range(data.shape[0]):
        gps[i, :] = transformer_omerc.transform(xx=data[i, 0], yy=data[i, 1], errcheck=True)
    gps = pd.DataFrame(data=gps, columns=['lat', 'lon'])
    if colorize:
        import matplotlib.cm
        cmap = matplotlib.cm.get_cmap('magma')
        rgbs = np.array(cmap(np.linspace(0, 1, gps.shape[0])[::-1]))[:, 0:3]
        gps['color'] = ['#%02x%02x%02x' % tuple(map(int, tuple(rgb * 255))) for rgb in rgbs]
    return gps
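
# Usage sketch for local_to_gps. The origin coordinates below are illustrative
# assumptions, not values taken from this project:
#
#     local_xy = np.array([[0.0, 0.0], [10.0, 5.0]])  # local (x, y) in metres
#     gps = local_to_gps(local_xy, lat_origin=48.137, lon_origin=11.575)
#     gps[['lat', 'lon']]  # global coordinates, one row per input point
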
class Saver():
    """
    Stores copies of selected Kalman filter attributes at every prediction step
    so that runs can be pickled, restored, and compared later.
    """

    saving_attributes = ['x', 'u', 'sensor_z', 'sensor_R', 'current_global_landmarks', 't',
                         'x_R', 'landmarks', 'mapping', 'x_R_prior', 'x_R_post', 'P_R_prior',
                         'P_R_post', 'P', 'executed_steps', 'individual_compatability',
                         'observable_to_global_mapping', 'local_mapping_nodes_explored_n',
                         'local_mapping_compared_associations_n', 'observed_landmarks',
                         'observable_landmarks', 'observable_state_covariance']

    def __init__(self, filter: ukf.UKF) -> None:
        self.x: List[npt.NDArray] = []
        self.x_R: List[npt.NDArray] = []
        self.x_R_prior: List[npt.NDArray] = []
        self.x_R_post: List[npt.NDArray] = []
        self.P_R_post: List[npt.NDArray] = []
        self.P_R_prior: List[npt.NDArray] = []
        self.P: List[npt.NDArray] = []
        self.u: List[npt.NDArray] = []
        self.executed_steps: List[List[str]] = []
        self.sensor_z: List[npt.NDArray] = []
        self.sensor_R: List[npt.NDArray] = []
        self.landmarks: List[npt.NDArray] = []
        self.individual_compatability: List[npt.NDArray] = []
        self.observable_to_global_mapping: List[npt.NDArray] = []
        self.current_global_landmarks: List[npt.NDArray] = []
        self.observed_landmarks: List[npt.NDArray] = []
        self.observable_landmarks: List[npt.NDArray] = []
        self.observable_state_covariance: List[npt.NDArray] = []
        self.mapping: List[List[int]] = []
        self.t: List[float] = []
        self.local_mapping_compared_associations_n: List[int] = []
        self.local_mapping_nodes_explored_n: List[int] = []
        self._kf = filter

    def save(self):
        """
        Save the current values of the Kalman filter attributes listed in
        saving_attributes.
        """
        for attribute in self.saving_attributes:
            getattr(self, attribute).append(np.copy(getattr(self._kf, attribute)))

    def finalize(self):
        """
        Converts the lists of all saved attributes to numpy arrays. Currently unused.
        """
        for attribute in self.saving_attributes:
            setattr(self, attribute, np.asarray(getattr(self, attribute)))

    def pickle(self, filename: str = 'saver', test_case: str = 'default') -> None:
        """
        Pickle the saver object for later analysis.

        Parameters
        ----------
        filename : str, optional
            Filename the saver object should be saved to, by default 'saver'.
        test_case : str, optional
            Name of the subfolder where the pickle should be saved, by default 'default'.
        """
        target_dir = f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/saver/{test_case}'
        Path(target_dir).mkdir(parents=True, exist_ok=True)
        with open(f'{target_dir}/{filename}.pickle', 'wb') as output:
            pickle.dump(self, output)
        logging.info(msg=f'Saving pickle of the saver for test case \'{test_case}\'')

    def pickle_parameter_study(self, filename: str = 'saver', test_case: str = 'default',
                               parameter_study_name: str = 'default') -> None:
        """
        Pickle the saver object of a parameter study run for later analysis.

        Parameters
        ----------
        filename : str, optional
            Filename the saver object should be saved to, by default 'saver'.
        test_case : str, optional
            Name of the subfolder where the pickle should be saved, by default 'default'.
        parameter_study_name : str, optional
            Name of the parameter study folder the pickle should be saved to, by default 'default'.
        """
        target_dir = (f'{os.path.abspath(os.path.dirname(__file__))}/../../param_study/'
                      f'{parameter_study_name}/{test_case}')
        Path(target_dir).mkdir(parents=True, exist_ok=True)
        with open(f'{target_dir}/{filename}.pickle', 'wb') as output:
            pickle.dump(self, output)
        logging.info(msg=f'Saving pickle of the saver for test case \'{test_case}\'')

    def recover(self, delete_last_state: bool = True):
        """
        Recover from a state where the state covariance matrix is no longer
        positive semi-definite and can therefore not be updated or predicted anymore.

        Parameters
        ----------
        delete_last_state : bool, optional
            Specifies whether the last saved state should be deleted before the
            previous saved state is restored, by default True.
        """
        if delete_last_state is True:
            for attribute in self.saving_attributes:
                getattr(self, attribute).pop()
        self._kf.executed_steps = self.executed_steps[-1]
        for sensor in self._kf.sensors:
            sensor.execute_update_postprocess = False
        self._kf.P = self.P[-1]
        self._kf.x = self.x[-1]
        self._kf.mapping = self.mapping[-1]
        self._kf.observable_to_global_mapping = self.observable_to_global_mapping[-1]
        self._kf.individual_compatability = self.individual_compatability[-1]
        self._kf.landmark_ids = self._kf.landmark_ids[:self._kf.landmarks.shape[0]]
        self._kf.fake_landmark_ids = self._kf.fake_landmark_ids[:self._kf.landmarks.shape[0]]
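
    # Sketch of a possible recovery flow. The predict/update call names and the
    # exception type are assumptions for illustration, not this project's
    # confirmed ukf.UKF API:
    #
    #     saver = Saver(kf)
    #     saver.save()                          # snapshot a known-good state
    #     try:
    #         kf.predict()                      # assumed filter step names
    #         kf.update()
    #     except np.linalg.LinAlgError:
    #         # covariance no longer positive semi-definite: roll back
    #         saver.recover(delete_last_state=True)
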
    @staticmethod
    def read_pickle(filename: str = 'saver', test_case: str = 'default') -> Optional[Saver]:
        """
        Read the pickle of a previously pickled saver object. Returns None if the
        file does not exist.

        Parameters
        ----------
        filename : str, optional
            Filename of the pickle object to be read, by default 'saver'.
        test_case : str, optional
            Name of the subfolder where the pickle should be read from, by default 'default'.

        Returns
        -------
        Optional[Saver]
            The read saver object, or None if the file does not exist.
        """
        pickle_path = Path(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/saver/'
                           f'{test_case}/{filename}.pickle')
        if pickle_path.is_file():
            with open(pickle_path, 'rb') as input_file:
                return pickle.load(input_file)
        return None

    @staticmethod
    def read_pickle_parameter_study(filename: str = 'saver', test_case: str = 'default',
                                    parameter_study_name: str = 'default') -> Optional[Saver]:
        """
        Read the pickle of a previously pickled saver object of a parameter study.
        Returns None if the file does not exist.

        Parameters
        ----------
        filename : str, optional
            Filename of the pickle object to be read, by default 'saver'.
        test_case : str, optional
            Name of the subfolder where the pickle should be read from, by default 'default'.
        parameter_study_name : str, optional
            Name of the parameter study folder the pickle should be read from, by default 'default'.

        Returns
        -------
        Optional[Saver]
            The read saver object, or None if the file does not exist.
        """
        pickle_path = Path(f'{os.path.abspath(os.path.dirname(__file__))}/../../param_study/'
                           f'{parameter_study_name}/{test_case}/{filename}.pickle')
        if pickle_path.is_file():
            with open(pickle_path, 'rb') as input_file:
                return pickle.load(input_file)
        return None
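
    # Round-trip sketch for the pickle helpers. kf stands for a ukf.UKF
    # instance and the filenames are hypothetical:
    #
    #     saver = Saver(kf)
    #     saver.save()
    #     saver.pickle(filename='run_01', test_case='demo')
    #     restored = Saver.read_pickle(filename='run_01', test_case='demo')
    #     if restored is not None:
    #         print(len(restored.x))            # number of saved prediction steps
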
    def compare(self, cmp_saver: Saver, exclude: List[str] = []):
        """
        Compares the current saver with another saver. Finds the first prediction
        step with diverging attributes, prints them, and prints the executed steps
        of the previous and the current prediction step.

        Parameters
        ----------
        cmp_saver : Saver
            Saver to compare with.
        exclude : List[str], optional
            Attributes to exclude from the comparison, by default [].
        """
        diverging_attributes = []
        for i in range(len(self.x)):
            for attribute in self.saving_attributes:
                if attribute in exclude:
                    continue
                old_attribute = getattr(self, attribute)[i]
                new_attribute = getattr(cmp_saver, attribute)[i]
                if type(old_attribute) != type(new_attribute):
                    print(f'Type of {attribute} differs at {i}')
                if type(old_attribute) == np.ndarray and np.issubdtype(old_attribute.dtype, np.number):
                    if not np.array_equal(old_attribute, new_attribute, equal_nan=True):
                        diverging_attributes.append(attribute)
                else:
                    if old_attribute.shape != new_attribute.shape or np.any(old_attribute != new_attribute):
                        diverging_attributes.append(attribute)
            if len(diverging_attributes) > 0:
                print(f'Diverging attributes at prediction_step {i}: {diverging_attributes}')
                print(f'Executed steps at current prediction_step {i}: {self.executed_steps[i]}')
                if i > 0:
                    print(f'Executed steps at previous prediction_step {i-1}: {self.executed_steps[i-1]}')
                break
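
# Example comparison of two recorded runs, ignoring timestamps. The filenames
# are hypothetical:
#
#     a = Saver.read_pickle(filename='run_01', test_case='demo')
#     b = Saver.read_pickle(filename='run_02', test_case='demo')
#     if a is not None and b is not None:
#         a.compare(b, exclude=['t'])
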
class LandmarkUpdateStrategy(Enum):
    ALL = 1
    NO_HEADING = 2
    NO_POSE = 3
    DO_NOT_INITIALIZE = 4
    DO_NOT_UPDATE = 5

class UpdateStrategy(Enum):
    """
    Enum to describe which states to use during the update step.

    ALL_STATES means that all tracked states will be used as part of the sigma
    points for the update step.
    ONLY_ROBOT_POSE_AND_NECESSARY_STATES means that all robot pose states and the
    states that can be measured (e.g. by the local mapping) will be used as part
    of the sigma points for the update step.
    ONLY_ROBOT_POSE_OR_ALL_STATES means that either only the robot states will be
    used as part of the sigma points for the update step, or all states when there
    is at least one other state that can be measured.
    """
    ALL_STATES = 1
    ONLY_ROBOT_POSE_AND_NECESSARY_STATES = 2
    ONLY_ROBOT_POSE_OR_ALL_STATES = 3

class DataAssociation(Enum):
    """
    Enum to describe which data association algorithm to use during the update step.

    JCBB uses a slightly adapted Joint Compatibility Branch and Bound algorithm.
    ClusterJCBB uses the JCBB algorithm on previously formed compatibility clusters.
    """
    JCBB = 1
    ClusterJCBB = 2

def generate_progressbar(data_length: int) -> progressbar.ProgressBar:
    """
    Generates a progressbar object for the given data length that shows more
    information than the standard one.

    Parameters
    ----------
    data_length : int
        Number of data points to process.

    Returns
    -------
    progressbar.ProgressBar
        The configured progressbar object.
    """
    widgets = [
        progressbar.Percentage(), ' ',
        progressbar.SimpleProgress(), ' ',
        progressbar.Bar(), ' ',
        progressbar.Variable('prediction_step'), ' ',
        progressbar.Timer(), ' ',
        progressbar.ETA()
    ]
    return progressbar.ProgressBar(max_value=data_length, widgets=widgets, redirect_stdout=True)
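
# Usage sketch, assuming the progressbar2 package, where ProgressBar.update
# accepts named Variable widgets as keyword arguments:
#
#     bar = generate_progressbar(data_length=100)
#     for i in range(100):
#         ...                                   # one filter step
#         bar.update(i + 1, prediction_step=i)
#     bar.finish()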