"""Implementation of all misc utility functions."""from__future__importannotationsfromtypingimportTYPE_CHECKING,List,OptionalifTYPE_CHECKING:importukfimportosfromenumimportEnumimportloggingfrompathlibimportPathimportpickleimportnumpyasnpimportnumpy.typingasnptimportpandasaspdimportprogressbarfrompyprojimportTransformerfrompyproj.crs.crsimportCRS
def local_to_gps(data: npt.NDArray[np.floating], lat_origin: float, lon_origin: float, gamma: float = 0,
                 colorize: bool = False) -> pd.DataFrame:
    """
    Converts numpy array of local coordinates to DataFrame of global coordinates.

    An oblique Mercator CRS is built around the given origin and every local
    point is transformed from that CRS to 'EPSG:4326'.

    Parameters
    ----------
    data : npt.NDArray[np.floating]
        Numpy array containing local x and y data. Column 0 is passed to the
        transformer as ``xx`` and column 1 as ``yy``.
    lat_origin : float
        Latitude origin of the transformer object.
    lon_origin : float
        Longitude origin of the transformer object.
    gamma : float, optional
        Azimuth of centerline clockwise from north of the rectified bearing of centre line, by default 0
    colorize : bool, optional
        Specifies whether a color for every datapoint should be calculated, by default False

    Returns
    -------
    pd.DataFrame
        Pandas Dataframe with the two transformer outputs stored as the columns 'lat' and 'lon' (in that
        order). When ``colorize`` is set, an additional 'color' column holds one '#rrggbb' string per row,
        sampled from the 'magma' colormap in reversed order.
    """
    n_points = data.shape[0]
    gps = np.zeros((n_points, 2))

    crs_omerc = CRS.from_proj4(f'''+proj=omerc +lat_0={lat_origin} +lonc={lon_origin} +k_0=1 +x_0=0 +y_0=0 +gamma={gamma} +ellps=WGS84 +towgs84=0,0,0,0,0,0,0''')
    transformer_omerc = Transformer.from_crs(crs_from=crs_omerc, crs_to='EPSG:4326')

    # Transform all points with a single call instead of one Python-level call per row.
    # The guard keeps the zero-row result identical to the preallocated empty array.
    if n_points:
        gps[:, 0], gps[:, 1] = transformer_omerc.transform(xx=data[:, 0], yy=data[:, 1], errcheck=True)

    gps = pd.DataFrame(data=gps, columns=['lat', 'lon'])

    if colorize:
        import matplotlib
        import matplotlib.cm

        # ``matplotlib.cm.get_cmap`` is deprecated in newer matplotlib releases; prefer the colormap
        # registry when it exists and fall back to the old call otherwise.
        registry = getattr(matplotlib, 'colormaps', None)
        cmap = registry['magma'] if registry is not None else matplotlib.cm.get_cmap('magma')

        # Sample the colormap from its end to its start and drop the alpha channel.
        rgbs = np.array(cmap(np.linspace(0, 1, n_points)[::-1]))[:, 0:3]
        gps['color'] = ['#%02x%02x%02x' % tuple(map(int, tuple(rgb * 255))) for rgb in rgbs]

    return gps
def save(self):
    """
    Append a snapshot of every tracked kalman filter attribute to the saver.

    For each name in ``self.saving_attributes`` the current value is read from
    ``self._kf``, copied with ``np.copy`` and appended to the attribute of the
    same name on the saver.
    """
    kalman_filter = self._kf
    for name in self.saving_attributes:
        history = getattr(self, name)
        # Copy so that later changes to the filter value do not alter the stored snapshot.
        snapshot = np.copy(getattr(kalman_filter, name))
        history.append(snapshot)
def finalize(self):
    """
    Replace every saved attribute by the result of ``np.asarray`` on it.

    Only the attributes named in ``self.saving_attributes`` are touched.
    Currently unused.
    """
    for name in self.saving_attributes:
        history = getattr(self, name)
        setattr(self, name, np.asarray(history))
def pickle(self, filename: str = 'saver', test_case: str = 'default') -> None:
    """
    Pickle the saver object for analysis later on.

    The target folder ``../../plots/saver/<test_case>`` (relative to the
    directory of this file) is created if it does not exist yet.

    Parameters
    ----------
    filename : str, optional
        Filename the saver object should be saved to, by default 'saver'.
        The suffix '.pickle' is appended.
    test_case : str, optional
        Name of the subfolder where the pickle should be saved to, by default 'default'.
    """
    # This def shares its name with the ``pickle`` module; importing the module under a local
    # alias keeps the reference unambiguous in whichever scope the def is bound.
    import pickle as pickle_module

    target_dir = f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/saver/{test_case}'
    Path(target_dir).mkdir(parents=True, exist_ok=True)

    # The context manager closes the file handle even when dumping fails.
    with open(f'{target_dir}/{filename}.pickle', 'wb') as output:
        pickle_module.dump(self, output)

    logging.info(msg=f'Saving pickle of the saver for test case \'{test_case}\'')
def pickle_parameter_study(self, filename: str = 'saver', test_case: str = 'default',
                           parameter_study_name: str = 'default') -> None:
    """
    Pickle the saver object into the folder of a parameter study.

    The target folder ``../../param_study/<parameter_study_name>/<test_case>``
    (relative to the directory of this file) is created if it does not exist yet.

    Parameters
    ----------
    filename : str, optional
        Filename the saver object should be saved to, by default 'saver'.
        The suffix '.pickle' is appended.
    test_case : str, optional
        Name of the subfolder where the pickle should be saved to, by default 'default'.
    parameter_study_name : str, optional
        Name of the parameter study folder that contains the test case subfolder, by default 'default'.
    """
    module_dir = os.path.abspath(os.path.dirname(__file__))
    target_dir = f'{module_dir}/../../param_study/{parameter_study_name}/{test_case}'
    Path(target_dir).mkdir(parents=True, exist_ok=True)

    pickle_path = f'{target_dir}/{filename}.pickle'
    with open(pickle_path, 'wb') as output:
        pickle.dump(self, output)

    logging.info(msg=f'Saving pickle of the saver for test case \'{test_case}\'')
def recover(self, delete_last_state: bool = True):
    """
    Reset the kalman filter to the most recent saved state.

    Intended for recovering from a state where the state covariance matrix is
    not semi-positive definite and thus cannot be updated or predicted anymore.

    Parameters
    ----------
    delete_last_state : bool, optional
        Specifies whether the last saved entry of every saving attribute should be removed before the
        (then) last saved state is restored, by default True. Only the literal ``True`` triggers the
        removal.
    """
    kf = self._kf

    if delete_last_state is True:
        for name in self.saving_attributes:
            getattr(self, name).pop()

    kf.executed_steps = self.executed_steps[-1]

    for sensor in kf.sensors:
        sensor.execute_update_postprocess = False

    # Restore the remaining saved values in their original order.
    # NOTE(review): the saved objects are handed over without copying - confirm that the filter does
    # not modify them in place afterwards.
    for name in ('P', 'x', 'mapping', 'observable_to_global_mapping', 'individual_compatability'):
        setattr(kf, name, getattr(self, name)[-1])

    # Trim both id collections to the number of rows of the filter's landmarks.
    kf.landmark_ids = kf.landmark_ids[:kf.landmarks.shape[0]]
    kf.fake_landmark_ids = kf.fake_landmark_ids[:kf.landmarks.shape[0]]
@staticmethod
def read_pickle(filename: str = 'saver', test_case: str = 'default') -> Optional[Saver]:
    """
    Read the pickle of a previous pickled saver object. Returns None if the file does not exist.

    The file is looked up at ``../../plots/saver/<test_case>/<filename>.pickle``
    relative to the directory of this file.

    Parameters
    ----------
    filename : str, optional
        Filename of the pickle object to be read, by default 'saver'
    test_case : str, optional
        Name of the subfolder where the pickle should be read from, by default 'default'

    Returns
    -------
    Optional[Saver]
        Read in saver object, or None when no such file exists.
    """
    pickle_path = Path(
        f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/saver/{test_case}/{filename}.pickle')
    if not pickle_path.is_file():
        return None

    # NOTE: unpickling can execute arbitrary code - only read files written by a trusted run.
    # The context manager makes sure the file handle is closed again after loading.
    with open(pickle_path, 'rb') as pickle_file:
        return pickle.load(pickle_file)
@staticmethod
def read_pickle_parameter_study(filename: str = 'saver', test_case: str = 'default',
                                parameter_study_name: str = 'default') -> Optional[Saver]:
    """
    Read the pickle of a previous pickled saver object of a parameter study. Returns None if the file
    does not exist.

    The file is looked up at
    ``../../param_study/<parameter_study_name>/<test_case>/<filename>.pickle``
    relative to the directory of this file.

    Parameters
    ----------
    filename : str, optional
        Filename of the pickle object to be read, by default 'saver'
    test_case : str, optional
        Name of the subfolder where the pickle should be read from, by default 'default'
    parameter_study_name : str, optional
        Name of the parameter study folder that contains the test case subfolder, by default 'default'

    Returns
    -------
    Optional[Saver]
        Read in saver object, or None when no such file exists.
    """
    pickle_path = Path(
        f'{os.path.abspath(os.path.dirname(__file__))}/../../param_study/{parameter_study_name}/{test_case}/{filename}.pickle')
    if not pickle_path.is_file():
        return None

    # NOTE: unpickling can execute arbitrary code - only read files written by a trusted run.
    # The context manager makes sure the file handle is closed again after loading.
    with open(pickle_path, 'rb') as pickle_file:
        return pickle.load(pickle_file)
def compare(self, cmp_saver: Saver, exclude: Optional[List[str]] = None):
    """
    Compares the current saver with another saver.

    Walks through the saved prediction steps and stops at the first step where at least one saved
    attribute differs. Prints the diverging attributes and the executed steps in the current and (if
    there is one) the previous prediction step. Nothing is printed when no step diverges.

    Parameters
    ----------
    cmp_saver : Saver
        Saver to compare with
    exclude : Optional[List[str]], optional
        Names of saving attributes that should be skipped during the comparison, by default None
        (nothing is skipped).
    """
    # ``None`` replaces the former mutable ``[]`` default; an omitted argument still excludes nothing.
    excluded = () if exclude is None else exclude

    for i in range(len(self.x)):
        diverging_attributes = []
        for attribute in self.saving_attributes:
            if attribute in excluded:
                continue

            old_attribute = getattr(self, attribute)[i]
            new_attribute = getattr(cmp_saver, attribute)[i]

            if type(old_attribute) is not type(new_attribute):
                print(f'Type different of {attribute} at {i}')

            if isinstance(old_attribute, np.ndarray) and np.issubdtype(old_attribute.dtype, np.number):
                # Numeric arrays: NaN entries at the same position count as equal.
                differs = not np.array_equal(old_attribute, new_attribute, equal_nan=True)
            else:
                # Everything else is compared by shape first and element-wise afterwards.
                differs = old_attribute.shape != new_attribute.shape or np.any(old_attribute != new_attribute)

            if differs:
                diverging_attributes.append(attribute)

        if diverging_attributes:
            print(f'Diverging attributes at prediction_step {i}: {diverging_attributes}')
            print(f'Executed steps at current prediction_step {i}: {self.executed_steps[i]}')
            if i > 0:
                print(f'Executed steps at previous prediction_step {i-1}: {self.executed_steps[i-1]}')
            # Only the first diverging prediction step is reported.
            break
class UpdateStrategy(Enum):
    """
    Enum to describe which states to use during the update step.

    ALL_STATES
        All tracked states are used for the update step as part of the sigma points.
    ONLY_ROBOT_POSE_AND_NECESSARY_STATES
        All robot pose states and the states that can be measured (e.g. by the local mapping) are used
        for the update step as part of the sigma points.
    ONLY_ROBOT_POSE_OR_ALL_STATES
        Either only the robot states are used for the update step as part of the sigma points, or all
        states when there is at least one other state that can be measured.
    """

    ALL_STATES = 1
    ONLY_ROBOT_POSE_AND_NECESSARY_STATES = 2
    ONLY_ROBOT_POSE_OR_ALL_STATES = 3
class DataAssociation(Enum):
    """
    Enum to describe which data association algorithm to use.

    JCBB
        Uses the slightly adapted Jointly Compatibility Branch and Bound algorithm.
    ClusterJCBB
        Uses the JCBB algorithm for previously formed compatibility clusters.
    """

    JCBB = 1
    ClusterJCBB = 2
def generate_progressbar(data_length: int) -> progressbar.ProgressBar:
    """
    Build a progressbar that shows more information than the standard one.

    The bar displays, separated by single spaces: percentage, simple progress, the bar itself, a
    variable named 'prediction_step', a timer and the ETA. Stdout is redirected through the bar.

    Parameters
    ----------
    data_length : int
        Length of data points to process; used as the maximum value of the bar.

    Returns
    -------
    progressbar.ProgressBar
        Progressbar object with the given data length and the widgets listed above.
    """
    indicators = (
        progressbar.Percentage(),
        progressbar.SimpleProgress(),
        progressbar.Bar(),
        progressbar.Variable('prediction_step'),
        progressbar.Timer(),
        progressbar.ETA(),
    )

    # Interleave the indicators with a single-space separator.
    widgets = []
    for indicator in indicators:
        if widgets:
            widgets.append(' ')
        widgets.append(indicator)

    return progressbar.ProgressBar(max_value=data_length, widgets=widgets, redirect_stdout=True)