Source code for sensors.sensor

"""
Implementation of the baseline sensor class.
"""
from __future__ import annotations
import threading
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    import ukf

import rospy

import numpy.typing as npt
from typing import List, Optional, Set
import numpy as np

from utils import helpers


[docs]class Sensor(): """ Baseline sensor class. This class is used to extend the KalmanFilter class to handle multiple measurements dynamically. Attributes ---------- filter : ukf.UKF KalmanFilter the sensor will be used for. sensor_name : str Name to identify the sensor by. sensor_i : int Index of the sensor as part of the Kalman Filter. z_names : List[str] Names of the measurements of the sensor. z_dim : npt.NDArray Dimension of measurement vector derived by length of z_names. z_array : npt.NDArray Numpy array of prerecorded measurements. R_array : npt.NDArray Numpy array of prerecorded measurement noise covariance matrices. active : bool Specifies whether the update for a sensor should be executed or only the measurements should be read in. x_indices_to_update : Set Indices of the state variables to use for predict and update. Based on base and situation dependent extended. x_indices_to_update_base : Set Base of indices of the state variables to always use for predict and update. z_indices_to_update : npt.NDArray Measurement indices to use for update. z_indices_to_initialize : npt.NDArray Measurement indices to initialize while post_processing update. execute_update_postprocess : bool Flag to decide whether the update of the sensor should be postprocessed. Used to intiialize intern variables or new state variables. z : npt.NDArray Measurement. ros_z : npt.NDArray Variable to save z while another callback is executed, so z is not overwritten. ros_t_array : List[float] List to store timestamps of received sensor measurements. ros_z_array: List[npt.NDArray] List to store received sensor measurements corresponding to the timestamps in ros_t_array. ros_R_array : List[npt.NDArray] List to store the covariance matrices of the sensor measurements in ros_z_array at times in ros_t_array. """ def __init__(self, filter: ukf.UKF, sensor_name: str, z_names: List[str], z_array: Optional[npt.NDArray] = None, R_array: Optional[npt.NDArray] = None, active: bool = True) -> None: """ Initializes a Sensor object. Parameters ---------- filter : ukf.UKF KalmanFilter object the sensor will be used for. sensor_name : str Name to identify the sensor by. z_names : List[str] List of the names of the measurements with the measurement count as length. z_array : npt.NDArray, optional Numpy Array of the prerecorded measurements. Prerecorded measurements of all sensor must be synchronized and have the same length, by default None. R_array : Optional[npt.NDArray], optional Numpy array of prerecorded measurement noises, by default None active : bool, optional Specifies whether the update for a sensor should be executed or only the measurements should be read in, by default True """ self.R_array = R_array self.filter = filter self.z_array = z_array self.sensor_name = sensor_name self.z_names = z_names self.active = active self.sensor_i: Optional[int] = None self.z_dim = len(z_names) self.z = np.full(self.z_dim, np.nan) self.ros_t_array: List[float] = [] self.ros_z_array: List[npt.NDArray] = [] self.ros_R_array: List[npt.NDArray] = [] self.x_indices_to_update_base = {x_i for x_i in range(self.filter.x_r_dim)} self.x_indices_to_update: Set self.z_indices_to_update: npt.NDArray self.z_indices_to_initialize: npt.NDArray self.execute_update_postprocess = False self.measurement_lock = threading.Lock()
[docs] def set_z_and_R_from_buffer(self, timestamp: float): with self.measurement_lock: length = len(self.ros_t_array) if length == 0: return timestamp_index = next((t_i for t_i, t in enumerate(self.ros_t_array[:length]) if t > timestamp), length) if timestamp_index == 0: return # elif timestamp_index != -1: timestamp_index -= 1 if self.ros_t_array[timestamp_index] < timestamp - self.filter.predict_tolerance: return self.z = self.ros_z_array[timestamp_index].copy() if self.ros_R_array: self.R = self.ros_R_array[timestamp_index].copy()
[docs] def pop_z_and_R_from_buffer(self, local_mapping_timestamp: float): with self.measurement_lock: timestamp_index = next((t_i for t_i, t in enumerate(self.ros_t_array) if t > local_mapping_timestamp), None) del self.ros_t_array[:timestamp_index] del self.ros_z_array[:timestamp_index] del self.ros_R_array[:timestamp_index]
[docs] def hx(self, sigmas_f: npt.NDArray[np.floating], u: Optional[npt.NDArray]) -> npt.NDArray: """ Calculate the measurement the sensor should measure based on the current state vector and control input. Dummy function which only returns the current state vector. The detailed sensors will implement this function for the according context. Parameters ---------- sigmas_f : npt.NDArray[npt.NDArray] Sigma points of the current state vector of the system. u : Optional[npt.NDArray] Current control input vector of the system. Returns ------- npt.NDArray Calculated measurement vector of the sensor. """ return sigmas_f
[docs] def prepare_update(self) -> bool: """ Prepares update step and returns whether the update for this sensor should be executed. Saves measurement and measurement noise for this sensor in the filter object and calculates measurement indices respectively for update and intiialization and state variable indices for predict and update. Returns ------- bool Specifies wthether the update for this sensor should be executed. """ self.filter.sensor_z[self.sensor_i] = np.broadcast_to(self.z, self.filter.max_z_dim) self.filter.sensor_R[self.sensor_i] = np.broadcast_to(self.R, (self.filter.max_z_dim, self.filter.max_z_dim)) if np.all(np.isfinite(self.z)): self.x_indices_to_update = self.x_indices_to_update_base self.z_indices_to_update = np.arange(self.z_dim, dtype=int) self.z_indices_to_initialize = np.array([]) else: self.x_indices_to_update = set() self.z_indices_to_update = np.array([]) self.z_indices_to_initialize = np.array([]) return self.active and np.all(np.isfinite(self.z))
[docs] def postprocess_update(self) -> None: """ Postprocess update step for this sensor. Used to intitialize intern variable or a new state variable. """ pass
[docs] def hx_inverse(self, z: npt.NDArray, u: Optional[npt.NDArray] = None) -> npt.NDArray: """ Calculate the state vector based on the current measurement of the sensor. Not every state can be calculated, so only part of the state vector will be calculated. Parameters ---------- z : np.ndarray Measurement vector, can be a matrix with multiple time steps. Returns ------- np.ndarray State vector, can be a matrix with multiple time steps. """ return z
[docs] def postprocess(self, saver: helpers.Saver): """ Postprocess measurements and measurement noises of sensor. Parameters ---------- saver : helpers.Saver Saver object containing the data to postprocess. """ pass
[docs] def get_prerecorded_z_and_R(self, last_prediction_step: int, i: int): """ Get prerecorded measurement and measurement noise for current prediction step. Parameters ---------- last_prediction_step : int Last executed prediction step. i : int Current index of the prerecorded measurement and measurement noise array. """ self.z = np.atleast_1d(np.nanmean(self.z_array[last_prediction_step:i], axis=0)) self.R = np.atleast_1d(np.nanmean(self.R_array[last_prediction_step:i][:, last_prediction_step:i], axis=0))
def __repr__(self) -> str: """ Return representation of Sensor containing sensor class and sensor name. Returns ------- str Representation of Sensor containing sensor class and sensor name. """ return f'<{self.__class__.__name__}: {self.sensor_name}>' @property def z_to_update(self) -> npt.NDArray: """ Returns measurements to update. Returns ------- npt.NDArray Measurements to update """ return self.z[self.z_indices_to_update] @property def R_to_update(self): """ Returns measurement noise to update. Returns ------- npt.NDArray Measurement noise for the measurements to update. """ return self.R[self.z_indices_to_update][:, self.z_indices_to_update]
[docs] @staticmethod def measurement_mean(sigmas: npt.NDArray, Wm: npt.NDArray) -> npt.NDArray: """ Calculates the weighted mean of multiple measurement vectors. Parameters ---------- sigmas : npt.NDArray Sigma points of the measurement vector. Wm : npt.NDArray Weights for the calculation of the weighted mean of the measurement. Returns ------- npt.NDArray Weighted mean of the sigma points of the measurement vector. """ return np.dot(Wm, sigmas)
[docs] @staticmethod def residual_sigmash_and_z(sigmas_h: npt.NDArray[np.floating], z: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]: """ Calculates the difference between the measurement sigma points and the measurement. Parameters ---------- sigmas_h : npt.NDArray[np.floating] Sigma points for the measurement. z : npt.NDArray[np.floating] Measurement. Returns ------- npt.NDArray[np.floating] Difference between sigma points and measurements. """ return np.subtract(sigmas_h, z)