Source code for sensors.novatel

"""
Implementation of all Sensors of the Novatel GPS.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import ukf

from typing import List, Optional

import numpy as np
import numpy.typing as npt
import pandas as pd

import pyproj

from .sensor import Sensor
from utils import math, helpers

import rospy


[docs]class NovatelGPSHeading(Sensor): """ Sensor class for the heading measurement of the novatel gps. Attributes ---------- filter : ukf.UKF KalmanFilter the sensor will be used for. sensor_name : str Name to identify the sensor by. sensor_i : int Index of the sensor as part of the Kalman Filter. z_names : List[str] Names of the measurements of the sensor. z_dim : npt.NDArray Dimension of measurement vector derived by length of z_names. z_array : npt.NDArray Numpy array of prerecorded measurements. R_array : npt.NDArray Numpy array of prerecorded measurement noise covariance matrices. active : bool Specifies whether the update for a sensor should be executed or only the measurements should be read in. x_indices_to_update : Set Indices of the state variables to use for predict and update. Based on base and situation dependent extended. x_indices_to_update_base : Set Base of indices of the state variables to always use for predict and update. z_indices_to_update : npt.NDArray Measurement indices to use for update. z_indices_to_initialize : npt.NDArray Measurement indices to initialize while post_processing update. execute_update_postprocess : bool Flag to decide whether the update of the sensor should be postprocessed. Used to intiialize intern variables or new state variables. R : npt.NDArray Covariance matrix. ros_R : npt.NDArray Variable to save R while another callback is executed, so R is not overwritten. offset : float Heading offset to correct. """ def __init__(self, filter: ukf.UKF, sensor_name: str, z_names: List[str], z_array: Optional[npt.NDArray] = None, R_array: Optional[npt.NDArray] = None, active: bool = True, offset: float = 0) -> None: """ Initialize an object of the position gps sensor of the novatel gps. Parameters ---------- filter : ukf.UKF KalmanFilter object the sensor will be used for. sensor_name : str Name to identify the sensor by. z_names : List[str] List of the names of the measurements with the measurement count as length. z_array : npt.NDArray, optional Numpy Array of the prerecorded measurements. Prerecorded measurements of all sensor must be synchronized and have the same length, by default None. R_array : Optional[npt.NDArray], optional Numpy array of prerecorded measurement noises, by default None active : bool, optional Specifies whether the update for a sensor should be executed or only the measurements should be read in, by default True offset : float Heading offset to correct. """ super().__init__(filter, z_array=z_array, R_array=R_array, sensor_name=sensor_name, z_names=z_names, active=active) self.R = np.full((self.z_dim, self.z_dim), np.nan) self.ros_R = np.full((self.z_dim, self.z_dim), np.nan) self.offset = offset
[docs] def copy(self, filter: ukf.UKF, timestamp: float) -> 'NovatelGPSHeading': copy = NovatelGPSHeading(filter=filter, sensor_name=self.sensor_name, z_names=self.z_names, active=self.active) copy.ros_t_array = self.ros_t_array copy.ros_R_array = self.ros_R_array copy.ros_z_array = self.ros_z_array copy.measurement_lock = self.measurement_lock return copy
[docs] def prepare_update(self) -> bool: """ Prepares update step and returns whether the update for this sensor should be executed. Saves measurement and measurement noise for this sensor in the filter object and calculates measurement indices respectively for update and intiialization and state variable indices for predict and update. If initial world and map heading have not been set yet and measurements are available, specify to postprocess. Returns ------- bool Specifies wthether the update for this sensor should be executed. """ self.filter.sensor_z[self.sensor_i] = np.broadcast_to(self.z, self.filter.max_z_dim) self.filter.sensor_R[self.sensor_i] = np.broadcast_to(self.R, (self.filter.max_z_dim, self.filter.max_z_dim)) if np.all(np.isfinite(self.z)) and self.z[0] != 0. + self.offset: self.x_indices_to_update = self.x_indices_to_update_base self.z_indices_to_update = np.arange(self.z_dim, dtype=int) self.z_indices_to_initialize = np.array([]) if self.filter.initial_world_heading is not None and self.filter.initial_map_heading is not None: return self.active else: self.execute_update_postprocess = True else: self.x_indices_to_update = set() self.z_indices_to_update = np.array([]) self.z_indices_to_initialize = np.array([]) return False
[docs] def postprocess_update(self) -> None: """ Postprocess update step for this sensor by initialzing initial world and map heading if not initialzed yet. """ if self.filter.initial_world_heading is None and self.filter.initial_map_heading is None: self.filter.initial_world_heading = self.z[0] self.filter.initial_map_heading = self.filter.x[3] self.filter.initial_heading_t = self.filter.t rospy.loginfo(f'[SLAM] Initialized map_heading = {self.filter.x[3]} and world_heading = {self.z[0]}') self.execute_update_postprocess = False
[docs] def hx(self, sigmas_f: npt.NDArray[np.floating], u: Optional[npt.NDArray[np.float32]]) -> npt.NDArray[np.float32]: """ Calculate the measurement the sensor should measure based on the current state vector and control input. Parameters ---------- sigmas_f : npt.NDArray[np.floating] Sigma point of the current state vector of the system. u : Optional[npt.NDArray] Current control input vector of the system. Returns ------- npt.NDArray Calculated measurement vector of the sensor. """ return math.normalize_degree_angles((self.filter.initial_world_heading - np.degrees(sigmas_f[:, 3:4] - self.filter.initial_map_heading)))
[docs] def hx_inverse(self, z: np.ndarray, u: Optional[npt.NDArray] = None) -> npt.NDArray: """ Calculate the state vector based on the current measurement of the sensor. Not every state can be calculated, so only part of the state vector will be calculated. Use the transformer to convert lat, lon to local coordinates Parameters ---------- z : np.ndarray Measurement vector, can be a matrix with multiple time steps. Returns ------- npt.NDArray State vector, can be a matrix with multiple time steps. """ return math.normalize_rad_angles(self.filter.initial_map_heading - np.radians(z[..., 0:1] - self.filter.initial_world_heading))
[docs] @staticmethod def residual_sigmash_and_z(sigmas_h: npt.NDArray[np.floating], z: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]: """ Calculates the difference between the measurement sigma points and the measurement. Parameters ---------- sigmas_h : npt.NDArray[np.floating] Sigma points for the measurement. z : npt.NDArray[np.floating] Measurement Returns ------- npt.NDArray[np.floating] Difference between sigma points and measurements. """ dz = sigmas_h - z if dz.ndim > 1: dz[:] = math.normalize_degree_angles(dz[:]) else: dz = math.normalize_degree_angle(dz) return dz
[docs] @staticmethod def measurement_mean(sigmas: npt.NDArray, Wm: npt.NDArray) -> npt.NDArray: """ Calculates the weighted mean of multiple measurement vectors. Parameters ---------- sigmas : npt.NDArray Sigma points of the measurement vector. Wm : npt.NDArray Weights for the calculation of the weighted mean of the measurement. Returns ------- npt.NDArray Weighted mean of the sigma points of the measurement vector. """ z = np.atleast_1d(np.degrees(math.circular_mean(np.radians(sigmas[:, 0]), Wm))) # heading return z
[docs] @staticmethod def generate_R_matrix_from_df(df: pd.DataFrame) -> npt.NDArray: """ Generate all measurement noise matrices based on the measurements of the dataframe object. Parameters ---------- df : pd.DataFrame DataFrame object containing the needed data to calculate the measurement noise matrices. Returns ------- npt.NDArray Measurement noise matrices for all measurements. """ R = np.empty((df.shape[0], 1, 1)) R[:, 0, 0] = df['heading_stdev'] return R
[docs] def get_prerecorded_z_and_R(self, last_prediction_step: int, i: int): """ Get prerecorded measurement and measurement noise for current prediction step. Using the last valid emasurement since last the last prediction step. Parameters ---------- last_prediction_step : int Last executed prediction step. i : int Current index of the prerecorded measurement and measurement noise array. """ measurements = self.z_array[last_prediction_step:i] last_measurement_index = (~np.isnan(measurements[:])).cumsum(0).argmax() self.z = np.atleast_1d(measurements[last_measurement_index].copy()) self.R = np.atleast_2d(measurements[last_measurement_index].copy()) self.filter.sensor_z[self.sensor_i] = np.broadcast_to(self.z, self.filter.max_z_dim) self.filter.sensor_R[self.sensor_i] = np.broadcast_to(self.R, (self.filter.max_z_dim, self.filter.max_z_dim))
[docs]class NovatelGPSPosition(Sensor): """ Sensor class for the position and speed measurement of the novatel gps. Attributes ---------- filter : ukf.UKF KalmanFilter the sensor will be used for. sensor_name : str Name to identify the sensor by. sensor_i : int Index of the sensor as part of the Kalman Filter. z_names : List[str] Names of the measurements of the sensor. z_dim : npt.NDArray Dimension of measurement vector derived by length of z_names. z_array : npt.NDArray Numpy array of prerecorded measurements. R_array : npt.NDArray Numpy array of prerecorded measurement noise covariance matrices. active : bool Specifies whether the update for a sensor should be executed or only the measurements should be read in. x_indices_to_update : Set Indices of the state variables to use for predict and update. Based on base and situation dependent extended. x_indices_to_update_base : Set Base of indices of the state variables to always use for predict and update. z_indices_to_update : npt.NDArray Measurement indices to use for update. z_indices_to_initialize : npt.NDArray Measurement indices to initialize while post_processing update. execute_update_postprocess : bool Flag to decide whether the update of the sensor should be postprocessed. Used to intiialize intern variables or new state variables. _transformer : pyproj.Transformer Transformer to convert gps measurements in world frame to map frame. R : npt.NDArray Covariance matrix. ros_R : npt.NDArray Variable to save R while another callback is executed, so R is not overwritten. """ def __init__(self, filter: ukf.UKF, sensor_name: str, z_names: List[str], z_array: Optional[npt.NDArray] = None, R_array: Optional[npt.NDArray] = None, active: bool = True) -> None: """ Initialize an object of the position gps sensor of the novatel gps. Parameters ---------- filter : ukf.UKF KalmanFilter object the sensor will be used for. sensor_name : str Name to identify the sensor by. z_names : List[str] List of the names of the measurements with the measurement count as length. z_array : npt.NDArray, optional Numpy Array of the prerecorded measurements. Prerecorded measurements of all sensor must be synchronized and have the same length, by default None. R_array : Optional[npt.NDArray], optional Numpy array of prerecorded measurement noises, by default None active : bool, optional Specifies whether the update for a sensor should be executed or only the measurements should be read in, by default True """ super().__init__(filter, z_array=z_array, R_array=R_array, sensor_name=sensor_name, z_names=z_names, active=active) self._transformer: Optional[pyproj.Transformer] = None self.transformer_rotation_matrix: Optional[npt.NDArray] = None self.transformer_t: Optional[float] = None self.R = np.full((self.z_dim, self.z_dim), np.nan) self.ros_R = np.full((self.z_dim, self.z_dim), np.nan)
[docs] def copy(self, filter: ukf.UKF, timestamp: float) -> 'NovatelGPSPosition': copy = NovatelGPSPosition(filter=filter, sensor_name=self.sensor_name, z_names=self.z_names, active=self.active) copy.ros_t_array = self.ros_t_array copy.ros_R_array = self.ros_R_array copy.ros_z_array = self.ros_z_array copy.measurement_lock = self.measurement_lock if self.transformer_t is not None and timestamp > self.transformer_t: copy._transformer = self._transformer copy.transformer_t = self.transformer_t copy.transformer_rotation_matrix = self.transformer_rotation_matrix return copy
[docs] def set_z_and_R_from_buffer(self, timestamp: float): with self.measurement_lock: length = len(self.ros_t_array) if length == 0: return timestamp_index = next((t_i for t_i, t in enumerate(self.ros_t_array[:length]) if t > timestamp), length) if timestamp_index == 0: return timestamp_index -= 1 if self.ros_t_array[timestamp_index] < timestamp - self.filter.predict_tolerance: return self.z = self.ros_z_array[timestamp_index].copy() if self.ros_R_array != []: self.R = self.ros_R_array[timestamp_index]
[docs] def prepare_update(self) -> bool: """ Prepares update step and returns whether the update for this sensor should be executed. Saves measurement and measurement noise for this sensor in the filter object and calculates measurement indices respectively for update and intiialization and state variable indices for predict and update. If initial world and map heading are initialized, but the transformer not yet and measurements are available, specify to postprocess to initialize the transformer object. If everything has been initialized already, transform gps measurement to map frame and specifiy to update. Returns ------- bool Specifies wthether the update for this sensor should be executed. """ self.filter.sensor_z[self.sensor_i] = np.broadcast_to(self.z, self.filter.max_z_dim) self.filter.sensor_R[self.sensor_i] = np.broadcast_to(self.R, (self.filter.max_z_dim, self.filter.max_z_dim)) if np.all(np.isfinite(self.z)): self.x_indices_to_update = self.x_indices_to_update_base self.z_indices_to_update = np.arange(self.z_dim, dtype=int) self.z_indices_to_initialize = np.array([]) if self._transformer is not None: z = self.z[0:2] transformed_coord = np.array(self._transformer.transform(xx=z[0], yy=z[1], errcheck=True, direction='INVERSE')) self.z[0:2] = np.dot(self.transformer_rotation_matrix, transformed_coord) return self.active else: self.execute_update_postprocess = True else: self.x_indices_to_update = set() self.z_indices_to_update = np.array([]) self.z_indices_to_initialize = np.array([]) return False
[docs] def postprocess_update(self) -> None: """ Postprocess update step for this sensor by initialzing transformer object if initial map and world heading have been intialized already. """ if (self._transformer is None and self.filter.initial_map_heading is not None and self.filter.initial_world_heading is not None): logging.info('Creating Transformer now!') alpha = self.filter.sensor('gps_heading')[1].hx(sigmas_f=np.zeros((1, 4)), u=None)[0, 0] self.create_transformer(lat_0=self.z[0], lon_0=self.z[1], alpha=alpha, x_0=self.filter.x[0], y_0=self.filter.x[1]) self.execute_update_postprocess = False
[docs] @staticmethod def hx(sigmas_f: npt.NDArray[np.floating], u: Optional[npt.NDArray[np.float32]]) -> npt.NDArray[np.float32]: """ Calculate the measurement the sensor should measure based on the current state vector and control input. Parameters ---------- sigmas_f : npt.NDArray[np.floating] Sigma points of the current state vector of the system. u : Optional[npt.NDArray] Current control input vector of the system. Returns ------- npt.NDArray Calculated measurement vector of the sensor. """ return sigmas_f[:, 0:3]
[docs] def hx_inverse(self, z: npt.NDArray, u: Optional[npt.NDArray] = None) -> npt.NDArray: """ Calculate the state vector based on the current measurement of the sensor. Not every state can be calculated, so only part of the state vector will be calculated. Use the transformer to convert lat, lon to local coordinates Parameters ---------- z : np.ndarray Measurement vector, can be a matrix with multiple time steps. Returns ------- npt.NDArray State vector, can be a matrix with multiple time steps. """ return z[..., 0:3]
[docs] def create_transformer(self, lat_0: float = 0, lon_0: float = 0, alpha: float = 0, x_0=0, y_0=0) -> pyproj.Transformer: """ Create a transformer object once to transform global and local coordinates. Initialize transformer with current global measurement, current estimated local position and global heading. Parameters ---------- lat_0 : float global origin of the latitude of the transformer object lon_0 : float global origin of the longitude of the transformer object alpha : float Azimuth of centerline clockwise from north at the center point of the line x_0 : int, optional local origin of the transformer object, by default 0 y_0 : int, optional local origin of the transformer object, by default 0 Returns ------- pyproj.Transformer Transformer to convert gps measurements in world frame to map frame. """ if self._transformer is not None: return crs = pyproj.CRS.from_proj4(f'+proj=sterea +lat_0={lat_0} +lon_0={lon_0} +k=1 +x_0={x_0} +y_0={y_0} +ellps=GRS80 +towgs84=0,0,0,0,0,0,0 +units=m +no_defs') self._transformer = pyproj.Transformer.from_crs(crs_from=crs, crs_to='EPSG:4326') angle = np.radians(alpha - 90) self.transformer_rotation_matrix = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]]) rospy.loginfo(f'[SLAM] Initialized GPS Transformer with lat_0={lat_0}, lon_0={lon_0} and alpha={alpha}; {x_0}, {y_0}') self.transformer_t = self.filter.t
def __getstate__(self) -> dict: """ the saver cannot pickle the transformer. the __get_state__ function defines which attributes get pickled. Returns ------- dict state dict """ d = dict(self.__dict__) del d['_transformer'] return d
[docs] @staticmethod def generate_R_matrix_from_df(df: pd.DataFrame) -> npt.NDArray: """ Generate all measurement noise matrices based on the measurements of the dataframe object. Parameters ---------- df : pd.DataFrame DataFrame object containing the needed data to calculate the measurement noise matrices. Returns ------- npt.NDArray Measurement noise matrices for all measurements. """ R = np.zeros((df.shape[0], 3, 3)) R[:, 0, 0] = df['position_covariance_0'] R[:, 0, 1] = df['position_covariance_1'] R[:, 1, 0] = df['position_covariance_3'] R[:, 1, 1] = df['position_covariance_4'] R[:, 2, 2] = 0.1 * df['speed'] return R
[docs] def postprocess(self, saver: helpers.Saver): """ Postprocess measurements and measurement noises of sensor. Transform position measurements in latitude in world frame to to position measurements in map frame. Parameters ---------- saver : helpers.Saver Saver object containing the data of the novatel gps position sensor measurements to be postprocessed. """ sensor_i = self.filter.sensor('novatel_gps_position')[0] last_unused_gps_measurement_index = 0 gps_measurements = np.array(saver.sensor_z)[:last_unused_gps_measurement_index, sensor_i, 0:2] result = np.array(self._transformer.transform(xx=gps_measurements[:, 0], yy=gps_measurements[:, 1], errcheck=True, direction='INVERSE')) local_coords_rotated = np.dot(self.transformer_rotation_matrix, result).T for i in range(last_unused_gps_measurement_index): saver.sensor_z[i][sensor_i][0:2] = local_coords_rotated[i]
[docs] def get_prerecorded_z_and_R(self, last_prediction_step: int, i: int): """ Get prerecorded measurement and measurement noise for current prediction step. Using the last valid emasurement since last the last prediction step. Parameters ---------- last_prediction_step : int Last executed prediction step. i : int Current index of the prerecorded measurement and measurement noise array. """ measurements = self.z_array[last_prediction_step:i] last_measurement_index = (~np.isnan(measurements[:, 0])).cumsum(0).argmax() self.z = measurements[last_measurement_index].copy() self.R = self.R_array[last_prediction_step:i][last_measurement_index].copy()