Source code for ukf

"""

"""
from __future__ import annotations

import os
import cProfile
import warnings
from pathlib import Path
from typing import List, Optional, Set, Tuple
import logging
import coloredlogs

import numpy as np
import numpy.typing as npt
from scipy import interpolate
from statsmodels.stats.correlation_tools import cov_nearest
import yaml

from filterpy.kalman.UKF import UnscentedKalmanFilter
from sigma_points import MerwePoints

from utils import plot, vehicle_dynamics, math, helpers
from sensors import CUREDiffWheelspeed, Sensor, LocalMapping, NovatelGPSPosition, NovatelGPSHeading

profiling = True
plotting = True
saving_saver = True
test_case = 'improve_viz'
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
coloredlogs.install(level='INFO', logger=logger)


[docs]class UKF(UnscentedKalmanFilter): """ Unscented Kalman Filter class which inherits from the UKF implementation in filterpy. Filter used to combine control inputs from the steering wheel and measurements from the IMU, the wheel speed sensors and the GPS. The goal is to approximate the driven track of our car as accurately as possible. The control inputs are used to predict the next state vector which is then updated using the measurements from the sensors. Attributes ---------- x : npt.NDArray State vector of x, y, v_x, psi. x_names : List[str] Names of the different parameters in x. u : npt.NDArray Control input at one timestamp. ros_u : npt.NDArray Variable to prevent u from being overwritten during ROS callbacks. u_array : npt.NDArray Control inputs at all timestamps. u_names : List[str] Name of the control input in u. t_array : npt.NDArray Timestamps where measurements or control inputs exist. sensors : List[Sensor] The sensors used for the update step. They hold the measurements z, covariances R and the transformation function Hx. last_prediction : int Index of the control input for which the last prediction was done. predict_tolerance : float Time in which the same prediction can be used. If exceeded, a new prediction is necessary. P : npt.NDArray State covariance matrix. Holds the covariances of all components of x. Q : npt.NDArray Process covariance matrix. landmark_ids : List[float] List of ids of tracked landmarks. fake_landmark_ids : List[float] List of ids of tracked landmarks to send out. Used to fake ids for skidpad cones for planning. landmark_perceived_n : List[int] List of how often this specifc landmark has already been perceived. mapping : List[npt.NDArray] Mapping / association between the current observed landmarks and all tracked landmarks. local_mapping_updates : List[int] List of all time steps the kalman filter has been updated with the local mapping. local_mapping_nodes_explored_n : int Number of explored nodes during data association. local_mapping_compared_associations_n : int Number of finally compared best associations object during data association. individual_compatability : Optional[npt.NDArray] Individual compatibility between the observed and observable landmarks of the last local mapping update. observable_to_global_mapping : Optional[npt.NDArray] Mapping between the observable and all tracked landmarks of the last local mapping update. update_strategy : helpers.UpdateStrategy Update strategy to use during the update step. initial_world_heading : Optional[float] Vehicle heading in world frame during first measurement. initial_map_heading : Optional[float] Vehicle heading in map frame during first vehicle heading in world frame measurement. executed_steps : List[str] String representation of executed steps (predict, update for sensors) for this step. template_Q : npt.NDArray Template for robot pose process noise Q which only needs to be multiplied by dt for current predict step. max_z_dim : int Maximum dimension of measurements of all sensors. max_decimals : int Maximum number of decimal places behind the comma for the state noise matrix. observed_landmarks : npt.NDArray Currently observed landmark relative to vehicle pose. observable_landmarks : npt.NDArray Currently observable landmarks relative to vehicle pose. observable_state_covariance : npt.NDArray Covariance of currently observable landmark. sigma_alpha : float Parameter for Merwe Sigma Points. minimal_cone_covariance_area : float Minimal size of the covariance ellipse. perceived_n_threshold : int How many times the landmark should been perceived to not be discarded during map clean up. steering_wheel_to_ackermann_lut : Look up table to convert steering wheel angle (degrees) to ackermann angle (degrees). """ def __init__(self, dt: float = 0.01, update_strategy: helpers.UpdateStrategy = helpers.UpdateStrategy.ALL_STATES, sigma_alpha: float = 0.1, minimal_cone_covariance_area: float = 0.15, u_dim: int = None, u_array: npt.NDArray = None, t_array: npt.NDArray = None, name: str = 'ukf', P: List[float] = [0.1, 0.1, 0.2, 3], template_Q: List[float] = [1, 1, 5, 15], perceived_n_threshold: int = 1, steering_model: Optional[dict] = None, steering_wheel_to_ackermann_lut: Optional[interpolate.interp1d] = None) -> None: """ Initialization function. Parameters ---------- dt : float, optional Time in which the same prediction can be used. If exceeded, a new prediction is necessary, by default 0.01 update_strategy : UpdateStrategy, optional Update strategy to use during the update step, by default UpdateStrategy.ALL_STATES u_dim : int, optional Dimension of control inputs, by default None u_array : npt.NDArray, optional Control inputs at all timestamps, by default None t_array : npt.NDArray, optional Timestamps where measurements or control inputs exist, by default None P: List[float], Optional State covariance matrix. Holds the covariances of all components of x. template_Q: List[float], optional Template for robot pose process noise Q which only needs to be multiplied by dt for current predict step. perceived_n_threshold : int, optional How many times the landmark should been perceived to not be discarded during map clean up. """ assert steering_wheel_to_ackermann_lut is not None or steering_model is not None self.x_r_dim = 4 # robot pose x self.x = np.zeros(self.x_r_dim) # Logic to decide where to get the dimension of u from if u_array is not None: u_array = u_array.reshape((u_array.shape[0], u_array.shape[-1])) if u_dim is not None and u_array is not None and u_dim != u_array.shape[-1]: u_dim = u_array.shape[-1] warnings.warn('Warning: The specified dimension of u is not the same as the dimension of the given u_array.' ' We used the dimension of the u_array.') elif u_dim is None and u_array is not None: u_dim = u_array.shape[-1] elif u_dim is None: u_dim = 0 self.u = np.full(u_dim, np.nan) self.ros_u = np.full(u_dim, np.nan) self.max_z_dim = 1 self.u_array = u_array self.t_array = t_array self.sensors: List[Sensor] = [] self.last_prediction: int = 0 self.update_strategy = update_strategy self.executed_steps: List[str] = [] self.sensor_updated_z_indices: List[npt.NDArray] = [] self.predict_tolerance = dt self.x_names = ['x', 'y', 'v_x', 'psi'] self.u_names = ['delta', 'a_x', 'psi_dot'] self.current_global_landmarks: Optional[npt.NDArray] = None self.initial_heading_t: Optional[float] = None self.initial_world_heading: Optional[float] = None self.initial_map_heading: Optional[float] = None self.landmark_ids: List[float] = [] self.fake_landmark_ids: List[float] = [] self.landmark_perceived_n: List[int] = [] self.perceived_n_threshold = perceived_n_threshold self.mapping: List[npt.NDArray] = [] self.individual_compatability: Optional[npt.NDArray] = None self.observable_to_global_mapping: Optional[npt.NDArray] = None self.local_mapping_updates: List[int] = [] self.local_mapping_nodes_explored_n: int = 0 self.local_mapping_compared_associations_n: int = 0 template_Q[3] = np.radians(template_Q[3]) self.template_Q = np.diag(np.array(template_Q)**2) self.observed_landmarks: npt.NDArray = None self.observable_landmarks: npt.NDArray = None self.observable_state_covariance: npt.NDArray = None self.sigma_alpha = sigma_alpha self.minimal_cone_covariance_area = minimal_cone_covariance_area self.name = name if steering_wheel_to_ackermann_lut is not None: self.steering_wheel_to_ackermann_lut = steering_wheel_to_ackermann_lut else: self.calculate_inverse_steering_lookup_table(model=steering_model) super().__init__(dim_x=self.x.shape[0], dim_z=0, dt=dt, hx=Sensor.hx, fx=self.f_x, points=MerwePoints(n=self.x.shape[0], alpha=self.sigma_alpha, beta=2, kappa=3-self.x.shape[0], subtract=self.residual_sigmaf_and_x), x_mean_fn=self.state_mean) P[3] = np.radians(P[3]) self.P = np.diag(np.array(P)**2) self.max_decimals = 7 def __repr__(self): return f'<UKF {self.name}>'
[docs] def calculate_inverse_steering_lookup_table(self, model: dict, num: int = 50): # define the range of the steering_wheel steering_wheel_angle = np.linspace(-0.5 * model['steering_range'], 0.5 * model['steering_range'], num=num) # calculate the corresponding Ackermann-Angle (between x-axis an tire) for each tire ackermann_angle = np.empty_like(steering_wheel_angle) for i in range(ackermann_angle.shape[0]): ackermann_angle[i] = vehicle_dynamics.steering_wheel_to_ackermann(steering_wheel_angle[i], model=model) # return the array for the steering_wheel_anngle and the corresponding angle of the tire self.steering_wheel_to_ackermann_lut = interpolate.interp1d(np.degrees(steering_wheel_angle), ackermann_angle)
[docs] def copy(self, x: npt.NDArray, P: npt.NDArray, timestamp: float, name: str = 'default') -> 'UKF': copy = UKF(dt=self.predict_tolerance, update_strategy=self.update_strategy, sigma_alpha=self.sigma_alpha, minimal_cone_covariance_area=self.minimal_cone_covariance_area, u_dim=self.u.shape[0], steering_wheel_to_ackermann_lut=self.steering_wheel_to_ackermann_lut, perceived_n_threshold=self.perceived_n_threshold) copy.template_Q = self.template_Q copy.P = P.copy() copy.x = x.copy() copy.landmark_ids = self.landmark_ids.copy() copy.fake_landmark_ids = self.fake_landmark_ids.copy() copy.landmark_perceived_n = self.landmark_perceived_n.copy() copy.t = timestamp copy.name = name # only set world and map heading if timestamp of copy is later than transformer definition if self.initial_heading_t is not None and timestamp > self.initial_heading_t: copy.initial_world_heading = self.initial_world_heading copy.initial_map_heading = self.initial_map_heading copy.initial_heading_t = self.initial_heading_t for sensor in self.sensors: copy.add_sensor(sensor.copy(filter=copy, timestamp=timestamp)) copy.sensor_z = self.sensor_z.copy() copy.sensor_R = self.sensor_R.copy() copy.max_z_dim = self.max_z_dim return copy
[docs] def sensor(self, sensor_name: str) -> Tuple[int, Sensor]: """ Return the index and the sensor object given the sensor name. ... Parameters ---------- sensor_name : str Name of the sensor. Returns ------- Tuple[int, Sensor] The index and the sensor object. Raises ------ KeyError Raised if the sensor can not be found in the list of the filter's sensors. """ for i, sensor in enumerate(self.sensors): if sensor.sensor_name == sensor_name: return i, sensor raise KeyError(f'The sensor {sensor_name} is not in the list of sensors.')
@property def x_necessary(self) -> npt.NDArray: """ Returns the necessary states for the prediction and update based on self.x_indices_to_update. Returns ------- npt.NDArray Necessary states for the prediction and update based on self.x_indices_to_update. """ if self.update_strategy == helpers.UpdateStrategy.ALL_STATES: return self.x elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES: if len(self.x_indices_to_update) <= self.x_r_dim: return self.x[:self.x_r_dim] else: return self.x elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES: state_indices = np.sort(np.array(list(self.x_indices_to_update), dtype=int)) return self.x[state_indices] @x_necessary.setter def x_necessary(self, value: npt.NDArray): """ Sets the necessary states for the prediction and update based on self.x_indices_to_update. Parameters ---------- value : npt.NDArray State variables values to set. """ if self.update_strategy == helpers.UpdateStrategy.ALL_STATES: self.x = value elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES: if len(self.x_indices_to_update) <= self.x_r_dim: self.x[:self.x_r_dim] = value else: self.x = value elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES: state_indices = np.sort(np.array(list(self.x_indices_to_update), dtype=int)) self.x[state_indices] = value @property def P_necessary(self) -> npt.NDArray: """ Returns the necessary state noises for the prediction and update based on self.x_indices_to_update. Returns ------- npt.NDArray Necessary state noises for the prediction and update based on self.x_indices_to_update. """ if self.update_strategy == helpers.UpdateStrategy.ALL_STATES: return self.P elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES: if len(self.x_indices_to_update) <= self.x_r_dim: robot_pose_state_indices = np.arange(self.x_r_dim, dtype=int) return self.P[np.ix_(robot_pose_state_indices, robot_pose_state_indices)] else: return self.P elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES: state_indices = np.sort(np.array(list(self.x_indices_to_update), dtype=int)) return self.P[np.ix_(state_indices, state_indices)] @P_necessary.setter def P_necessary(self, value: npt.NDArray): """ Sets the necessary state noises for the prediction and update based on self.x_indices_to_update. Parameters ---------- value : npt.NDArray State noises to set. """ if self.update_strategy == helpers.UpdateStrategy.ALL_STATES: self.P = value elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES: if len(self.x_indices_to_update) <= self.x_r_dim: robot_pose_state_indices = np.arange(self.x_r_dim, dtype=int) self.P[np.ix_(robot_pose_state_indices, robot_pose_state_indices)] = value else: self.P = value elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES: state_indices = np.sort(np.array(list(self.x_indices_to_update), dtype=int)) self.P[np.ix_(state_indices, state_indices)] = value @property def x_R(self) -> npt.NDArray: """ Returns all physical state variables concerning the robot pose. Returns ------- npt.NDArray State vector of the robot pose. [x, y, v_x, psi]. """ return self.x[:self.x_r_dim] @x_R.setter def x_R(self, value: npt.NDArray): """ Sets all physical state variables concerning the robot pose. """ self.x[:self.x_r_dim] = value @property def landmarks(self) -> npt.NDArray: """ Returns cartesian coordinates of all tracked landmarks. Returns ------- npt.NDArray Cartesian coordinates of all tracked landmarks. """ return np.reshape(self.x[self.x_r_dim:], (-1, 2)) @property def landmarks_prior(self) -> npt.NDArray: """ Returns cartesian coordinates of all prior tracked landmarks. Returns ------- npt.NDArray Cartesian coordinates of all prior tracked landmarks. """ return np.reshape(self.x_prior[self.x_r_dim:], (-1, 2)) @landmarks.setter def landmarks(self, value: npt.NDArray): self.x[self.x_r_dim:] = value.flatten() @property def x_R_prior(self) -> npt.NDArray: """ Returns prior of the state vector of the robot pose. Returns ------- npt.NDArray Prior of the state vector of the robot pose. """ return self.x_prior[:self.x_r_dim] @property def x_R_post(self) -> npt.NDArray: """ Returns posterior of the state vector of the robot pose. Returns ------- npt.NDArray Post of the state vector of the robot pose. """ return self.x_post[:self.x_r_dim] @property def P_R(self) -> npt.NDArray: """ Returns state noise covariance matrix of the robot pose state vector. Returns ------- npt.NDArray State noise covariance matrix of the robot pose state vector. """ return self.P[:self.x_r_dim, :self.x_r_dim] @P_R.setter def P_R(self, value: npt.NDArray): """ Sets state noise covariance matrix of the robot pose state vector. """ self.P[:self.x_r_dim, :self.x_r_dim] = value @property def P_R_post(self) -> npt.NDArray: """ Returns posterior of the state noise covariance matrix of the robot pose state vector. Returns ------- npt.NDArray Post of the state noise covariance matrix of the robot pose state vector. """ return self.P_post[:self.x_r_dim, :self.x_r_dim] @property def P_R_prior(self) -> npt.NDArray: """ Returns prior of the state noise covariance matrix of the robot pose state vector. Returns ------- npt.NDArray Prior of the state noise covariance matrix of the robot pose state vector. """ return self.P_prior[:self.x_r_dim, :self.x_r_dim]
[docs] def calculate_Q(self, dt: float, x_dim: int) -> None: """ Calculate state transition noise covariance matrix Q based on the current dt. Parameters ---------- dt : float Current dt of the prediction step. x_dim : int Number of state variable dimension to use for prediction. """ self.Q = np.zeros((x_dim, x_dim)) self.Q[:self.x_r_dim, :self.x_r_dim] = self.template_Q * dt
[docs] @staticmethod def f_x(sigma_points: npt.NDArray, dt: int, u: npt.NDArray, steering_wheel_to_ackermann_lut) -> npt.NDArray: """ Calculate the predicted next state based on the current state x,the timestep t and the control input u. Using a simple bicycle model that disregards any side slip angle. .. figure:: images/f_x_model.png :class: with-border This is the caption. Parameters ---------- sigma_points : npt.NDArray All sigma points around the current state vector. dt : int Timestep since last predict. u : npt.NDArray Current control input vector of the system. Returns ------- npt.NDArray New estimated state vector of the system """ sigma_points[:, 2] += u[1] * dt sigma_points[:, 3] += np.radians(u[2]) * dt steering_wheel_angle = u[0] velocity = sigma_points[:, 2] distance = velocity * dt ackermann = steering_wheel_to_ackermann_lut(steering_wheel_angle) if ackermann != 0: turning_radius = vehicle_dynamics.ackermann_to_radius(ackermann) alpha = distance / np.abs(turning_radius) local_pos = np.array([np.abs(turning_radius) * np.sin(alpha), turning_radius * (1 - np.cos(alpha))]).T else: local_pos = np.stack([distance, np.zeros(sigma_points.shape[0])], axis=1) rot_matrix = np.empty((sigma_points.shape[0], 2, 2)) cos, sin = np.cos(-sigma_points[0, 3]), np.sin(-sigma_points[0, 3]) rot_matrix[:, 0, 0], rot_matrix[:, 0, 1], rot_matrix[:, 1, 0], rot_matrix[:, 1, 1] = cos, sin, -sin, cos rot_pos = np.einsum("ijk, ijk -> ij", rot_matrix, local_pos.reshape(-1, 1, 2)) sigma_points[:, 0:2] += rot_pos return sigma_points
[docs] @staticmethod def state_mean(sigmas: npt.NDArray, Wm: npt.NDArray) -> npt.NDArray: """ Calculate the mean of the sigma points and weights. The mean for the angle in the state vector has to be calculated seperately. Parameters ---------- sigmas : npt.NDArray Sigma Points used in the filter. Wm : npt.NDArray Weights of the sigma points. Returns ------- npt.NDArray Mean of the sigma points. """ x = np.average(sigmas, axis=0, weights=Wm) # since psi is an angle, the mean has to be calculated in a very special way x[3] = math.circular_mean(sigmas[:, 3], Wm, normalize=False) # psi return x
[docs] def unscented_transform(self, sigmas, Wm, Wc, noise_cov=None, mean_fn=None, residual_fn=None): if mean_fn is None: # new mean is just the sum of the sigmas * weight x = np.dot(Wm, sigmas) # dot = \Sigma^n_1 (W[k]*Xi[k]) else: x = mean_fn(sigmas, Wm) # new covariance is the sum of the outer product of the residuals # times the weights y = residual_fn(sigmas, x) P = np.around(np.einsum('l,ln,lm->nm', Wc, y, y), decimals=self.max_decimals) if noise_cov is not None: P += noise_cov return (x, P)
[docs] def cross_variance(self, x, z, sigmas_f, sigmas_h): """ Compute cross variance of the state `x` and measurement `z`. """ dx = self.residual_sigmaf_and_x(sigmas_f=sigmas_f, x=x) dz = self.residual_z(sigmas_h, z) Pxz = np.around(np.einsum('l,ln,lm->nm', self.Wc, dx, dz), decimals=self.max_decimals) return Pxz
[docs] @staticmethod def residual_sigmaf_and_x(sigmas_f: npt.NDArray[np.floating], x: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]: """ Calculate the difference the sigma points and the state vector. As the state vector contains an angle, the result from the subtraction has to be normalized to the interval [-pi:pi]. Parameters ---------- sigmas_f : npt.NDArray Sigma point. x : npt.NDArray Current state vector. Returns ------- npt.NDArray Difference between sigma points and state vector. """ dx = sigmas_f - x return dx
[docs] def add_sensor(self, sensor_obj: Sensor) -> Sensor: """ Add a new sensor object to the filter. Possible options for sensor classes are defined in the sensors folder. Parameters ---------- sensor_obj : Sensor The sensor object to add. Returns ------- sensor_obj : Sensor The added sensor. """ sensor_obj.sensor_i = len(self.sensors) self.sensors.append(sensor_obj) return sensor_obj
[docs] def recreate_sigma_points(self, x_dim: int): self.points_fn = MerwePoints(n=x_dim, alpha=self.sigma_alpha, beta=2, kappa=3-x_dim, subtract=self.residual_sigmaf_and_x) self._num_sigmas = self.points_fn.num_sigmas() self._dim_x = x_dim self.sigmas_f = np.zeros((self._num_sigmas, self._dim_x)) self.sigmas_h = np.zeros((self._num_sigmas, self._dim_z)) self.Wm, self.Wc = self.points_fn.Wm, self.points_fn.Wc
[docs] def compute_process_sigmas(self, dt, fx=None, **fx_args): """ computes the values of sigmas_f. Normally a user would not call this, but it is useful if you need to call update more than once between calls to predict (to update for multiple simultaneous measurements), so the sigmas correctly reflect the updated state x, P. """ if fx is None: fx = self.fx # calculate sigma points for given mean and covariance sigmas = self.points_fn.sigma_points(self.x_necessary, self.P_necessary) self.sigmas_f = self.fx(sigmas, dt, steering_wheel_to_ackermann_lut=self.steering_wheel_to_ackermann_lut, **fx_args)
[docs] def residual_sigmash_and_z(self, sigmas_h: npt.NDArray, z: npt.NDArray) -> npt.NDArray: """ Calculates the difference between the measurement sigma points and the measurement for all sensors. Parameters ---------- sigmas_h : npt.NDArray Sigma points for all measurements. z : npt.NDArray Measurements of all sensors. Returns ------- npt.NDArray Difference between sigma points and measurements of all sensors. """ y = np.empty_like(sigmas_h) last_z_index = 0 for sensor in self.sensors_to_update: z_end_index = last_z_index + sensor.z_to_update.shape[0] z_indices = np.arange(last_z_index, z_end_index) y[..., z_indices] = sensor.residual_sigmash_and_z(sigmas_h=sigmas_h[..., z_indices], z=z[z_indices]) last_z_index = z_end_index return y
[docs] def measurement_mean(self, sigmas: npt.NDArray, Wm: npt.NDArray) -> npt.NDArray: """ Calculates the weighted mean of all sigma points of the measurement vectors of all sensors. Parameters ---------- sigmas : npt.NDArray Sigma points of the measurement vector of all sensors.. Wm : npt.NDArray Weights for the calculation of the weighted mean of the measurement. Returns ------- npt.NDArray Weighted mean of the sigma points of the measurement vectors of all sensors. """ zp = np.empty((sigmas.shape[1])) last_z_index = 0 for sensor in self.sensors_to_update: z_end_index = last_z_index + sensor.z_to_update.shape[0] z_indices = np.arange(last_z_index, z_end_index) zp[z_indices] = sensor.measurement_mean(sigmas=sigmas[:, z_indices], Wm=Wm) last_z_index = z_end_index return zp
[docs] def update(self, u: Optional[npt.NDArray] = None): """ Update the UKF with the given measurements. On return, self.x and self.P contain the new mean and covariance of the filter. Parameters ---------- u : Optional[npt.NDArray] Current control input vector of the system. """ # pass prior sigmas through h(x) to get measurement sigmas # the shape of sigmas_h will vary if the shape of z varies, so # recreate each time x = self.x_necessary P = self.P_necessary # Calculate how many measurements there will be and # initialize enitre measurement vector, measurement noise matrix and sigma points z_n = sum([sensor.z_to_update.shape[0] for sensor in self.sensors_to_update]) sigmas_h = np.empty((self.sigmas_f.shape[0], z_n)) R = np.zeros((z_n, z_n)) z = np.empty((z_n)) # calculate sigma points for based on the respective masurement for every sensor last_z_index = 0 self.sensor_updated_z_indices = [] for sensor in self.sensors_to_update: z_end_index = last_z_index + sensor.z_to_update.shape[0] z_indices = np.arange(last_z_index, z_end_index) sigmas_h[:, z_indices] = sensor.hx(sigmas_f=self.sigmas_f, u=u) R[np.ix_(z_indices, z_indices)] = sensor.R_to_update z[z_indices] = sensor.z_to_update last_z_index = z_end_index self.executed_steps.append(sensor.sensor_name) self.sensor_updated_z_indices.append(z_indices) self.sigmas_h = np.atleast_2d(sigmas_h) # mean and covariance of prediction passed through unscented transform zp, self.S = self.unscented_transform(self.sigmas_h, self.Wm, self.Wc, R, self.measurement_mean, self.residual_z) self.zp = zp self.SI = self.inv(self.S) # compute cross variance of the state and the measurements Pxz = self.cross_variance(x, zp, self.sigmas_f, self.sigmas_h) self.K = np.dot(Pxz, self.SI) # Kalman gain self.y = self.residual_sigmash_and_z(z, zp) # residual # update Gaussian state estimate (x, P) x = x + np.dot(self.K, self.y) P -= np.around(np.dot(self.K, np.dot(self.S, self.K.T)), decimals=self.max_decimals) # limit minimal cone covariance for i in range(self.x_r_dim, x.shape[0], 2): factor = np.pi * np.product(np.linalg.eigvals(P[i:i+2, i:i+2])) / self.minimal_cone_covariance_area if factor < 1: P[i:i+2, i:i+2] /= factor self.x_necessary = x self.P_necessary = P # save measurement and posterior state self.z = z.copy() self.R = R.copy() self.x_post = self.x.copy() self.P_post = self.P.copy() # set to None to force recompute self._log_likelihood = None self._likelihood = None self._mahalanobis = None
[docs] def predict(self, dt=None, UT=None, fx=None, **fx_args): r""" Performs the predict step of the UKF. On return, self.x and self.P contain the predicted state (x) and covariance (P). ' Important: this MUST be called before update() is called for the first time. Parameters ---------- dt : double, optional If specified, the time step to be used for this prediction. self._dt is used if this is not provided. fx : callable f(x, **fx_args), optional State transition function. If not provided, the default function passed in during construction will be used. UT : function(sigmas, Wm, Wc, noise_cov), optional Optional function to compute the unscented transform for the sigma points passed through hx. Typically the default function will work - you can use x_mean_fn and z_mean_fn to alter the behavior of the unscented transform. **fx_args : keyword arguments optional keyword arguments to be passed into f(x). """ if dt is None: dt = self._dt if UT is None: UT = self.unscented_transform # calculate sigma points for given mean and covariance self.compute_process_sigmas(dt, fx, **fx_args) # and pass sigmas through the unscented transform to compute prior # only use the robot pose states for the predict step. self.x_necessary, self.P_necessary = UT(self.sigmas_f, self.Wm, self.Wc, self.Q, self.x_mean, self.residual_sigmaf_and_x) # save prior self.x_prior = np.copy(self.x) self.P_prior = np.copy(self.P)
[docs] def prepare_update(self): """ Prepare the updates for all sensors. """ self.sensors_to_update: List[Sensor] = [] self.x_indices_to_update: Set[int] = set(np.arange(self.x_r_dim, dtype=int)) for sensor in self.sensors: if sensor.prepare_update(): self.sensors_to_update.append(sensor) self.x_indices_to_update.update(sensor.x_indices_to_update)
[docs] def try_predict(self, dt: float, u: npt.NDArray): """ Try to predict based on timestep and control input, correct state noise if there is a non-semi-positive-definite matrix and try again. Parameters ---------- dt : float Timestep since last predict. u : npt.NDArray Control input. """ x_dim = self.x_necessary.shape[0] self.calculate_Q(dt=dt, x_dim=x_dim) if self.points_fn.n != x_dim: self.recreate_sigma_points(x_dim=x_dim) # TODO: vllt vorgenerieren try: self.predict(dt=dt, u=u, UT=self.unscented_transform) except np.linalg.LinAlgError: logging.critical(f'P is not semi-positive-definite (predict after {self.executed_steps[-1]}).' ' Finding nearest covariance matrix and trying again!') self.P = cov_nearest(self.P) self.predict(dt=dt, u=u, UT=self.unscented_transform) self.executed_steps = ['predict']
[docs] def try_update(self, u: npt.NDArray): """ Try to predict based on control input, correct state noise if there is a non-semi-positive-definite matrix and try again. Parameters ---------- u : npt.NDArray Control input. """ try: self.update(u=u) except np.linalg.LinAlgError: logging.critical('P is not positive definite (update).' 'Finding nearest covariance matrix and trying again!') self.P = cov_nearest(self.P) self.update(u=u)
[docs] def postprocess_update(self): """ Postprocess update for all sensors if the respective flag specifies. """ for sensor in self.sensors: if sensor.execute_update_postprocess is True: sensor.postprocess_update()
[docs] def cleanup_map(self): landmark_perceived_n = np.array(self.landmark_perceived_n) condition = landmark_perceived_n >= self.perceived_n_threshold state_condition = np.concatenate([np.full(self.x_r_dim, True), np.dstack([condition, condition]).flatten()]) self.x = self.x[state_condition] self.P = self.P[np.ix_(state_condition, state_condition)] self.landmark_ids = np.array(self.landmark_ids)[condition].tolist() self.fake_landmark_ids = np.array(self.fake_landmark_ids)[condition].tolist() self.landmark_perceived_n = landmark_perceived_n[condition].tolist()
[docs] def run_batch(self, saver: helpers.Saver) -> None: """ Run predict and update steps for the given control inputs and measurements. Using the prerecorded measurements that have been passed during the initialization of the filter and sensors. If there are values for all control inputs and the time since the last predict step is bigger than the predict tolerance, a new prediction is made. Updates are done with the mean of the meausrements since the last prediction. Parameters ---------- saver : helpers.Saver Saver object to save the filter object at every timestep. """ assert self.u_array is not None assert self.t_array is not None self.sensor_z = [None] * len(self.sensors) self.sensor_R = [None] * len(self.sensors) # Calculate the maximal z dimension based on the prerecorded measurement arrays. self.max_z_dim = max([sensor.z_array.shape[1] if sensor.z_array is not None and sensor.z_array.ndim > 1 else 1 for sensor in self.sensors]) local_mapping = self.sensor('local_mapping')[1] prediction_step = 0 bar = helpers.generate_progressbar(self.u_array.shape[0]).start() for i in range(self.u_array.shape[0]): # if any control input is finite, remember it if np.any(np.isfinite(self.u_array[i])): mask = np.isfinite(self.u_array[i]) self.u[mask] = self.u_array[i][mask] time_since_predict = self.t_array[i] - self.t_array[self.last_prediction] # the following code contains the calling of the predict and update procedure # it will only be executed if all control inputs have been defined already # and either there has not been a predict in the last self.predict_tolerance seconds # or if there is new data from the local_mapping if np.all(np.isfinite(self.u)) and (time_since_predict > self.predict_tolerance or np.all(np.isfinite(local_mapping.z_array[i]))): # Gets the measurements of all sensors for this particular step, for sensor in self.sensors: sensor.get_prerecorded_z_and_R(last_prediction_step=self.last_prediction, i=i) self.prepare_update() self.try_predict(dt=time_since_predict, u=self.u) self.try_update(u=self.u) self.postprocess_update() self.last_prediction = i self.t = self.t_array[i] saver.save() prediction_step += 1 bar.update(value=i, prediction_step=prediction_step) bar.finish()
if __name__ == '__main__': from utils import data_import df = data_import.RosbagAndMDFImporter(bag_filename='/home/martina/CURE/as_ros/rosbags/run_373_per.bag').df.loc[0:15] with open('src/slam/param_study/test/configs/test_case_0.yaml', 'r') as config_file: settings = yaml.safe_load(config_file) ukf = UKF(u_array=df[['steering_wheel_angle', 'linear_acceleration.x', 'angular_velocity.z']].to_numpy(), update_strategy=helpers.UpdateStrategy(settings['ukf']['update_strategy']), sigma_alpha=settings['ukf']['sigma_alpha'], minimal_cone_covariance_area=settings['ukf']['minimal_cone_covariance_area'], P=settings['ukf']['P'], template_Q=settings['ukf']['template_Q'], u_dim=3, t_array=np.asarray(df.index.to_numpy(), dtype=np.float32)) ukf.add_sensor(NovatelGPSHeading(filter=ukf, z_array=df['heading'].to_numpy(), sensor_name='novatel_gps_heading', z_names=['world_heading'], R_array=NovatelGPSHeading.generate_R_matrix_from_df(df), active=settings['novatel_heading']['active'])) ukf.add_sensor(LocalMapping(filter=ukf, z_array=df['cones'].to_numpy(), active=settings['local_mapping']['active'], sensor_name='local_mapping', z_names=['landmarks'], data_association=helpers.DataAssociation( settings['local_mapping']['data_association']['method']), use_kd_tree=settings['local_mapping']['data_association']['use_kd_tree'], chi2_quantile=settings['local_mapping']['data_association']['chi2_quantile'], fov_min_distance=settings['local_mapping']['fov_gate']['min_distance'], fov_max_distance=settings['local_mapping']['fov_gate']['max_distance'], fov_angle=settings['local_mapping']['fov_gate']['angle'], fov_scaling=settings['local_mapping']['fov_gate']['scaling'], probability_gate=settings['local_mapping']['probability_gate'], R_azimuth=settings['local_mapping']['R_azimuth'], R_distance_factor=settings['local_mapping']['R_distance_factor'], R_offset=settings['local_mapping']['R_distance_offset'])) ukf.add_sensor(NovatelGPSPosition(filter=ukf, z_array=df[['latitude', 'longitude', 'speed']].to_numpy(), sensor_name='novatel_gps_position', z_names=['latitude', 'longitude', 'velocity'], R_array=NovatelGPSPosition.generate_R_matrix_from_df(df), active=settings['novatel_position']['active'])) ukf.add_sensor(CUREDiffWheelspeed(filter=ukf, z_array=df['wheelspeed_fl'].to_numpy(), active=settings['wheelspeed']['fl_active'], right=False, sensor_name='ros_wheelspeed_fl', z_names=['omega_fl'], gear_ratio=1, track_width=settings['wheelspeed']['track_width'], wheel_radius=settings['wheelspeed']['wheel_radius'], R=settings['wheelspeed']['R'])) ukf.add_sensor(CUREDiffWheelspeed(filter=ukf, z_array=df['wheelspeed_fr'].to_numpy(), active=settings['wheelspeed']['fr_active'], right=True, sensor_name='ros_wheelspeed_fr', z_names=['omega_fr'], gear_ratio=1, track_width=settings['wheelspeed']['track_width'], wheel_radius=settings['wheelspeed']['wheel_radius'], R=settings['wheelspeed']['R'])) ukf.add_sensor(CUREDiffWheelspeed(filter=ukf, z_array=df['wheelspeed_rl'].to_numpy(), active=settings['wheelspeed']['rl_active'], right=False, sensor_name='ros_wheelspeed_rl', z_names=['omega_rl'], gear_ratio=settings['wheelspeed']['rear_gear_ratio'], track_width=settings['wheelspeed']['track_width'], wheel_radius=settings['wheelspeed']['wheel_radius'], R=settings['wheelspeed']['R'])) ukf.add_sensor(CUREDiffWheelspeed(filter=ukf, z_array=df['wheelspeed_rr'].to_numpy(), active=settings['wheelspeed']['rr_active'], right=True, sensor_name='ros_wheelspeed_rr', z_names=['omega_rr'], gear_ratio=settings['wheelspeed']['rear_gear_ratio'], track_width=settings['wheelspeed']['track_width'], wheel_radius=settings['wheelspeed']['wheel_radius'], R=settings['wheelspeed']['R'])) saver = helpers.Saver(ukf) if profiling is True: Path(f'plots/profiling/{test_case}').mkdir(parents=True, exist_ok=True) cProfile.runctx( 'ukf.run_batch(saver)', globals(), locals(), f'plots/profiling/{test_case}/profiling.pstats' ) os.system(f'python scripts/utils/gprof2dot.py -f pstats plots/profiling/{test_case}/profiling.pstats' f'| dot -Tsvg -o plots/profiling/{test_case}/{test_case}.svg') else: ukf.run_batch(saver=saver) # for sensor in ukf.sensors: # sensor.postprocess(saver) if saving_saver is True: saver.pickle(test_case=test_case) if plotting is True: plot.plot_standard_plots(saver=saver, start=0, end=None, test_case=test_case, map_cones=data_import.import_map_csv('data/cure/may/trackdrive/run_373_moved.csv'), map_x_offset=0, map_y_offset=0, map_rotation=0)