"""
"""
from __future__ import annotations
import os
import cProfile
import warnings
from pathlib import Path
from typing import List, Optional, Set, Tuple
import logging
import coloredlogs
import numpy as np
import numpy.typing as npt
from scipy import interpolate
from statsmodels.stats.correlation_tools import cov_nearest
import yaml
from filterpy.kalman.UKF import UnscentedKalmanFilter
from sigma_points import MerwePoints
from utils import plot, vehicle_dynamics, math, helpers
from sensors import CUREDiffWheelspeed, Sensor, LocalMapping, NovatelGPSPosition, NovatelGPSHeading
profiling = True
plotting = True
saving_saver = True
test_case = 'improve_viz'
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
coloredlogs.install(level='INFO', logger=logger)
[docs]class UKF(UnscentedKalmanFilter):
"""
Unscented Kalman Filter class which inherits from the UKF implementation in filterpy.
Filter used to combine control inputs from the steering wheel and measurements from the IMU,
the wheel speed sensors and the GPS. The goal is to approximate the driven track of our car
as accurately as possible. The control inputs are used to predict the next state vector which
is then updated using the measurements from the sensors.
Attributes
----------
x : npt.NDArray
State vector of x, y, v_x, psi.
x_names : List[str]
Names of the different parameters in x.
u : npt.NDArray
Control input at one timestamp.
ros_u : npt.NDArray
Variable to prevent u from being overwritten during ROS callbacks.
u_array : npt.NDArray
Control inputs at all timestamps.
u_names : List[str]
Name of the control input in u.
t_array : npt.NDArray
Timestamps where measurements or control inputs exist.
sensors : List[Sensor]
The sensors used for the update step.
They hold the measurements z, covariances R and the transformation function Hx.
last_prediction : int
Index of the control input for which the last prediction was done.
predict_tolerance : float
Time in which the same prediction can be used. If exceeded, a new prediction is necessary.
P : npt.NDArray
State covariance matrix. Holds the covariances of all components of x.
Q : npt.NDArray
Process covariance matrix.
landmark_ids : List[float]
List of ids of tracked landmarks.
fake_landmark_ids : List[float]
List of ids of tracked landmarks to send out. Used to fake ids for skidpad cones for planning.
landmark_perceived_n : List[int]
List of how often this specifc landmark has already been perceived.
mapping : List[npt.NDArray]
Mapping / association between the current observed landmarks and all tracked landmarks.
local_mapping_updates : List[int]
List of all time steps the kalman filter has been updated with the local mapping.
local_mapping_nodes_explored_n : int
Number of explored nodes during data association.
local_mapping_compared_associations_n : int
Number of finally compared best associations object during data association.
individual_compatability : Optional[npt.NDArray]
Individual compatibility between the observed and observable landmarks of the last local mapping update.
observable_to_global_mapping : Optional[npt.NDArray]
Mapping between the observable and all tracked landmarks of the last local mapping update.
update_strategy : helpers.UpdateStrategy
Update strategy to use during the update step.
initial_world_heading : Optional[float]
Vehicle heading in world frame during first measurement.
initial_map_heading : Optional[float]
Vehicle heading in map frame during first vehicle heading in world frame measurement.
executed_steps : List[str]
String representation of executed steps (predict, update for sensors) for this step.
template_Q : npt.NDArray
Template for robot pose process noise Q which only needs to be multiplied by dt for current predict step.
max_z_dim : int
Maximum dimension of measurements of all sensors.
max_decimals : int
Maximum number of decimal places behind the comma for the state noise matrix.
observed_landmarks : npt.NDArray
Currently observed landmark relative to vehicle pose.
observable_landmarks : npt.NDArray
Currently observable landmarks relative to vehicle pose.
observable_state_covariance : npt.NDArray
Covariance of currently observable landmark.
sigma_alpha : float
Parameter for Merwe Sigma Points.
minimal_cone_covariance_area : float
Minimal size of the covariance ellipse.
perceived_n_threshold : int
How many times the landmark should been perceived to not be discarded during map clean up.
steering_wheel_to_ackermann_lut :
Look up table to convert steering wheel angle (degrees) to ackermann angle (degrees).
"""
def __init__(self, dt: float = 0.01, update_strategy: helpers.UpdateStrategy = helpers.UpdateStrategy.ALL_STATES,
sigma_alpha: float = 0.1, minimal_cone_covariance_area: float = 0.15, u_dim: int = None,
u_array: npt.NDArray = None, t_array: npt.NDArray = None, name: str = 'ukf',
P: List[float] = [0.1, 0.1, 0.2, 3], template_Q: List[float] = [1, 1, 5, 15],
perceived_n_threshold: int = 1, steering_model: Optional[dict] = None,
steering_wheel_to_ackermann_lut: Optional[interpolate.interp1d] = None) -> None:
"""
Initialization function.
Parameters
----------
dt : float, optional
Time in which the same prediction can be used. If exceeded, a new prediction is necessary, by default 0.01
update_strategy : UpdateStrategy, optional
Update strategy to use during the update step, by default UpdateStrategy.ALL_STATES
u_dim : int, optional
Dimension of control inputs, by default None
u_array : npt.NDArray, optional
Control inputs at all timestamps, by default None
t_array : npt.NDArray, optional
Timestamps where measurements or control inputs exist, by default None
P: List[float], Optional
State covariance matrix. Holds the covariances of all components of x.
template_Q: List[float], optional
Template for robot pose process noise Q which only needs to be multiplied by dt for current predict step.
perceived_n_threshold : int, optional
How many times the landmark should been perceived to not be discarded during map clean up.
"""
assert steering_wheel_to_ackermann_lut is not None or steering_model is not None
self.x_r_dim = 4 # robot pose x
self.x = np.zeros(self.x_r_dim)
# Logic to decide where to get the dimension of u from
if u_array is not None:
u_array = u_array.reshape((u_array.shape[0], u_array.shape[-1]))
if u_dim is not None and u_array is not None and u_dim != u_array.shape[-1]:
u_dim = u_array.shape[-1]
warnings.warn('Warning: The specified dimension of u is not the same as the dimension of the given u_array.'
' We used the dimension of the u_array.')
elif u_dim is None and u_array is not None:
u_dim = u_array.shape[-1]
elif u_dim is None:
u_dim = 0
self.u = np.full(u_dim, np.nan)
self.ros_u = np.full(u_dim, np.nan)
self.max_z_dim = 1
self.u_array = u_array
self.t_array = t_array
self.sensors: List[Sensor] = []
self.last_prediction: int = 0
self.update_strategy = update_strategy
self.executed_steps: List[str] = []
self.sensor_updated_z_indices: List[npt.NDArray] = []
self.predict_tolerance = dt
self.x_names = ['x', 'y', 'v_x', 'psi']
self.u_names = ['delta', 'a_x', 'psi_dot']
self.current_global_landmarks: Optional[npt.NDArray] = None
self.initial_heading_t: Optional[float] = None
self.initial_world_heading: Optional[float] = None
self.initial_map_heading: Optional[float] = None
self.landmark_ids: List[float] = []
self.fake_landmark_ids: List[float] = []
self.landmark_perceived_n: List[int] = []
self.perceived_n_threshold = perceived_n_threshold
self.mapping: List[npt.NDArray] = []
self.individual_compatability: Optional[npt.NDArray] = None
self.observable_to_global_mapping: Optional[npt.NDArray] = None
self.local_mapping_updates: List[int] = []
self.local_mapping_nodes_explored_n: int = 0
self.local_mapping_compared_associations_n: int = 0
template_Q[3] = np.radians(template_Q[3])
self.template_Q = np.diag(np.array(template_Q)**2)
self.observed_landmarks: npt.NDArray = None
self.observable_landmarks: npt.NDArray = None
self.observable_state_covariance: npt.NDArray = None
self.sigma_alpha = sigma_alpha
self.minimal_cone_covariance_area = minimal_cone_covariance_area
self.name = name
if steering_wheel_to_ackermann_lut is not None:
self.steering_wheel_to_ackermann_lut = steering_wheel_to_ackermann_lut
else:
self.calculate_inverse_steering_lookup_table(model=steering_model)
super().__init__(dim_x=self.x.shape[0],
dim_z=0,
dt=dt,
hx=Sensor.hx,
fx=self.f_x,
points=MerwePoints(n=self.x.shape[0], alpha=self.sigma_alpha, beta=2,
kappa=3-self.x.shape[0], subtract=self.residual_sigmaf_and_x),
x_mean_fn=self.state_mean)
P[3] = np.radians(P[3])
self.P = np.diag(np.array(P)**2)
self.max_decimals = 7
def __repr__(self):
return f'<UKF {self.name}>'
[docs] def calculate_inverse_steering_lookup_table(self, model: dict, num: int = 50):
# define the range of the steering_wheel
steering_wheel_angle = np.linspace(-0.5 * model['steering_range'], 0.5 * model['steering_range'], num=num)
# calculate the corresponding Ackermann-Angle (between x-axis an tire) for each tire
ackermann_angle = np.empty_like(steering_wheel_angle)
for i in range(ackermann_angle.shape[0]):
ackermann_angle[i] = vehicle_dynamics.steering_wheel_to_ackermann(steering_wheel_angle[i], model=model)
# return the array for the steering_wheel_anngle and the corresponding angle of the tire
self.steering_wheel_to_ackermann_lut = interpolate.interp1d(np.degrees(steering_wheel_angle), ackermann_angle)
[docs] def copy(self, x: npt.NDArray, P: npt.NDArray, timestamp: float, name: str = 'default') -> 'UKF':
copy = UKF(dt=self.predict_tolerance, update_strategy=self.update_strategy, sigma_alpha=self.sigma_alpha,
minimal_cone_covariance_area=self.minimal_cone_covariance_area, u_dim=self.u.shape[0],
steering_wheel_to_ackermann_lut=self.steering_wheel_to_ackermann_lut,
perceived_n_threshold=self.perceived_n_threshold)
copy.template_Q = self.template_Q
copy.P = P.copy()
copy.x = x.copy()
copy.landmark_ids = self.landmark_ids.copy()
copy.fake_landmark_ids = self.fake_landmark_ids.copy()
copy.landmark_perceived_n = self.landmark_perceived_n.copy()
copy.t = timestamp
copy.name = name
# only set world and map heading if timestamp of copy is later than transformer definition
if self.initial_heading_t is not None and timestamp > self.initial_heading_t:
copy.initial_world_heading = self.initial_world_heading
copy.initial_map_heading = self.initial_map_heading
copy.initial_heading_t = self.initial_heading_t
for sensor in self.sensors:
copy.add_sensor(sensor.copy(filter=copy, timestamp=timestamp))
copy.sensor_z = self.sensor_z.copy()
copy.sensor_R = self.sensor_R.copy()
copy.max_z_dim = self.max_z_dim
return copy
[docs] def sensor(self, sensor_name: str) -> Tuple[int, Sensor]:
"""
Return the index and the sensor object given the sensor name.
...
Parameters
----------
sensor_name : str
Name of the sensor.
Returns
-------
Tuple[int, Sensor]
The index and the sensor object.
Raises
------
KeyError
Raised if the sensor can not be found in the list of the filter's sensors.
"""
for i, sensor in enumerate(self.sensors):
if sensor.sensor_name == sensor_name:
return i, sensor
raise KeyError(f'The sensor {sensor_name} is not in the list of sensors.')
@property
def x_necessary(self) -> npt.NDArray:
"""
Returns the necessary states for the prediction and update based on self.x_indices_to_update.
Returns
-------
npt.NDArray
Necessary states for the prediction and update based on self.x_indices_to_update.
"""
if self.update_strategy == helpers.UpdateStrategy.ALL_STATES:
return self.x
elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES:
if len(self.x_indices_to_update) <= self.x_r_dim:
return self.x[:self.x_r_dim]
else:
return self.x
elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES:
state_indices = np.sort(np.array(list(self.x_indices_to_update), dtype=int))
return self.x[state_indices]
@x_necessary.setter
def x_necessary(self, value: npt.NDArray):
"""
Sets the necessary states for the prediction and update based on self.x_indices_to_update.
Parameters
----------
value : npt.NDArray
State variables values to set.
"""
if self.update_strategy == helpers.UpdateStrategy.ALL_STATES:
self.x = value
elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES:
if len(self.x_indices_to_update) <= self.x_r_dim:
self.x[:self.x_r_dim] = value
else:
self.x = value
elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES:
state_indices = np.sort(np.array(list(self.x_indices_to_update), dtype=int))
self.x[state_indices] = value
@property
def P_necessary(self) -> npt.NDArray:
"""
Returns the necessary state noises for the prediction and update based on self.x_indices_to_update.
Returns
-------
npt.NDArray
Necessary state noises for the prediction and update based on self.x_indices_to_update.
"""
if self.update_strategy == helpers.UpdateStrategy.ALL_STATES:
return self.P
elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES:
if len(self.x_indices_to_update) <= self.x_r_dim:
robot_pose_state_indices = np.arange(self.x_r_dim, dtype=int)
return self.P[np.ix_(robot_pose_state_indices, robot_pose_state_indices)]
else:
return self.P
elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES:
state_indices = np.sort(np.array(list(self.x_indices_to_update), dtype=int))
return self.P[np.ix_(state_indices, state_indices)]
@P_necessary.setter
def P_necessary(self, value: npt.NDArray):
"""
Sets the necessary state noises for the prediction and update based on self.x_indices_to_update.
Parameters
----------
value : npt.NDArray
State noises to set.
"""
if self.update_strategy == helpers.UpdateStrategy.ALL_STATES:
self.P = value
elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES:
if len(self.x_indices_to_update) <= self.x_r_dim:
robot_pose_state_indices = np.arange(self.x_r_dim, dtype=int)
self.P[np.ix_(robot_pose_state_indices, robot_pose_state_indices)] = value
else:
self.P = value
elif self.update_strategy == helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES:
state_indices = np.sort(np.array(list(self.x_indices_to_update), dtype=int))
self.P[np.ix_(state_indices, state_indices)] = value
@property
def x_R(self) -> npt.NDArray:
"""
Returns all physical state variables concerning the robot pose.
Returns
-------
npt.NDArray
State vector of the robot pose. [x, y, v_x, psi].
"""
return self.x[:self.x_r_dim]
@x_R.setter
def x_R(self, value: npt.NDArray):
"""
Sets all physical state variables concerning the robot pose.
"""
self.x[:self.x_r_dim] = value
@property
def landmarks(self) -> npt.NDArray:
"""
Returns cartesian coordinates of all tracked landmarks.
Returns
-------
npt.NDArray
Cartesian coordinates of all tracked landmarks.
"""
return np.reshape(self.x[self.x_r_dim:], (-1, 2))
@property
def landmarks_prior(self) -> npt.NDArray:
"""
Returns cartesian coordinates of all prior tracked landmarks.
Returns
-------
npt.NDArray
Cartesian coordinates of all prior tracked landmarks.
"""
return np.reshape(self.x_prior[self.x_r_dim:], (-1, 2))
@landmarks.setter
def landmarks(self, value: npt.NDArray):
self.x[self.x_r_dim:] = value.flatten()
@property
def x_R_prior(self) -> npt.NDArray:
"""
Returns prior of the state vector of the robot pose.
Returns
-------
npt.NDArray
Prior of the state vector of the robot pose.
"""
return self.x_prior[:self.x_r_dim]
@property
def x_R_post(self) -> npt.NDArray:
"""
Returns posterior of the state vector of the robot pose.
Returns
-------
npt.NDArray
Post of the state vector of the robot pose.
"""
return self.x_post[:self.x_r_dim]
@property
def P_R(self) -> npt.NDArray:
"""
Returns state noise covariance matrix of the robot pose state vector.
Returns
-------
npt.NDArray
State noise covariance matrix of the robot pose state vector.
"""
return self.P[:self.x_r_dim, :self.x_r_dim]
@P_R.setter
def P_R(self, value: npt.NDArray):
"""
Sets state noise covariance matrix of the robot pose state vector.
"""
self.P[:self.x_r_dim, :self.x_r_dim] = value
@property
def P_R_post(self) -> npt.NDArray:
"""
Returns posterior of the state noise covariance matrix of the robot pose state vector.
Returns
-------
npt.NDArray
Post of the state noise covariance matrix of the robot pose state vector.
"""
return self.P_post[:self.x_r_dim, :self.x_r_dim]
@property
def P_R_prior(self) -> npt.NDArray:
"""
Returns prior of the state noise covariance matrix of the robot pose state vector.
Returns
-------
npt.NDArray
Prior of the state noise covariance matrix of the robot pose state vector.
"""
return self.P_prior[:self.x_r_dim, :self.x_r_dim]
[docs] def calculate_Q(self, dt: float, x_dim: int) -> None:
"""
Calculate state transition noise covariance matrix Q based on the current dt.
Parameters
----------
dt : float
Current dt of the prediction step.
x_dim : int
Number of state variable dimension to use for prediction.
"""
self.Q = np.zeros((x_dim, x_dim))
self.Q[:self.x_r_dim, :self.x_r_dim] = self.template_Q * dt
[docs] @staticmethod
def f_x(sigma_points: npt.NDArray, dt: int, u: npt.NDArray, steering_wheel_to_ackermann_lut) -> npt.NDArray:
"""
Calculate the predicted next state based on the current state x,the timestep t and the control input u.
Using a simple bicycle model that disregards any side slip angle.
.. figure:: images/f_x_model.png
:class: with-border
This is the caption.
Parameters
----------
sigma_points : npt.NDArray
All sigma points around the current state vector.
dt : int
Timestep since last predict.
u : npt.NDArray
Current control input vector of the system.
Returns
-------
npt.NDArray
New estimated state vector of the system
"""
sigma_points[:, 2] += u[1] * dt
sigma_points[:, 3] += np.radians(u[2]) * dt
steering_wheel_angle = u[0]
velocity = sigma_points[:, 2]
distance = velocity * dt
ackermann = steering_wheel_to_ackermann_lut(steering_wheel_angle)
if ackermann != 0:
turning_radius = vehicle_dynamics.ackermann_to_radius(ackermann)
alpha = distance / np.abs(turning_radius)
local_pos = np.array([np.abs(turning_radius) * np.sin(alpha), turning_radius * (1 - np.cos(alpha))]).T
else:
local_pos = np.stack([distance, np.zeros(sigma_points.shape[0])], axis=1)
rot_matrix = np.empty((sigma_points.shape[0], 2, 2))
cos, sin = np.cos(-sigma_points[0, 3]), np.sin(-sigma_points[0, 3])
rot_matrix[:, 0, 0], rot_matrix[:, 0, 1], rot_matrix[:, 1, 0], rot_matrix[:, 1, 1] = cos, sin, -sin, cos
rot_pos = np.einsum("ijk, ijk -> ij", rot_matrix, local_pos.reshape(-1, 1, 2))
sigma_points[:, 0:2] += rot_pos
return sigma_points
[docs] @staticmethod
def state_mean(sigmas: npt.NDArray, Wm: npt.NDArray) -> npt.NDArray:
"""
Calculate the mean of the sigma points and weights.
The mean for the angle in the state vector has to be calculated seperately.
Parameters
----------
sigmas : npt.NDArray
Sigma Points used in the filter.
Wm : npt.NDArray
Weights of the sigma points.
Returns
-------
npt.NDArray
Mean of the sigma points.
"""
x = np.average(sigmas, axis=0, weights=Wm)
# since psi is an angle, the mean has to be calculated in a very special way
x[3] = math.circular_mean(sigmas[:, 3], Wm, normalize=False) # psi
return x
[docs] def cross_variance(self, x, z, sigmas_f, sigmas_h):
"""
Compute cross variance of the state `x` and measurement `z`.
"""
dx = self.residual_sigmaf_and_x(sigmas_f=sigmas_f, x=x)
dz = self.residual_z(sigmas_h, z)
Pxz = np.around(np.einsum('l,ln,lm->nm', self.Wc, dx, dz), decimals=self.max_decimals)
return Pxz
[docs] @staticmethod
def residual_sigmaf_and_x(sigmas_f: npt.NDArray[np.floating],
x: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
"""
Calculate the difference the sigma points and the state vector.
As the state vector contains an angle, the result from the subtraction has to be
normalized to the interval [-pi:pi].
Parameters
----------
sigmas_f : npt.NDArray
Sigma point.
x : npt.NDArray
Current state vector.
Returns
-------
npt.NDArray
Difference between sigma points and state vector.
"""
dx = sigmas_f - x
return dx
[docs] def add_sensor(self, sensor_obj: Sensor) -> Sensor:
"""
Add a new sensor object to the filter.
Possible options for sensor classes are defined in the sensors folder.
Parameters
----------
sensor_obj : Sensor
The sensor object to add.
Returns
-------
sensor_obj : Sensor
The added sensor.
"""
sensor_obj.sensor_i = len(self.sensors)
self.sensors.append(sensor_obj)
return sensor_obj
[docs] def recreate_sigma_points(self, x_dim: int):
self.points_fn = MerwePoints(n=x_dim, alpha=self.sigma_alpha, beta=2,
kappa=3-x_dim, subtract=self.residual_sigmaf_and_x)
self._num_sigmas = self.points_fn.num_sigmas()
self._dim_x = x_dim
self.sigmas_f = np.zeros((self._num_sigmas, self._dim_x))
self.sigmas_h = np.zeros((self._num_sigmas, self._dim_z))
self.Wm, self.Wc = self.points_fn.Wm, self.points_fn.Wc
[docs] def compute_process_sigmas(self, dt, fx=None, **fx_args):
"""
computes the values of sigmas_f. Normally a user would not call
this, but it is useful if you need to call update more than once
between calls to predict (to update for multiple simultaneous
measurements), so the sigmas correctly reflect the updated state
x, P.
"""
if fx is None:
fx = self.fx
# calculate sigma points for given mean and covariance
sigmas = self.points_fn.sigma_points(self.x_necessary, self.P_necessary)
self.sigmas_f = self.fx(sigmas, dt,
steering_wheel_to_ackermann_lut=self.steering_wheel_to_ackermann_lut, **fx_args)
[docs] def residual_sigmash_and_z(self, sigmas_h: npt.NDArray, z: npt.NDArray) -> npt.NDArray:
"""
Calculates the difference between the measurement sigma points and the measurement for all sensors.
Parameters
----------
sigmas_h : npt.NDArray
Sigma points for all measurements.
z : npt.NDArray
Measurements of all sensors.
Returns
-------
npt.NDArray
Difference between sigma points and measurements of all sensors.
"""
y = np.empty_like(sigmas_h)
last_z_index = 0
for sensor in self.sensors_to_update:
z_end_index = last_z_index + sensor.z_to_update.shape[0]
z_indices = np.arange(last_z_index, z_end_index)
y[..., z_indices] = sensor.residual_sigmash_and_z(sigmas_h=sigmas_h[..., z_indices], z=z[z_indices])
last_z_index = z_end_index
return y
[docs] def measurement_mean(self, sigmas: npt.NDArray, Wm: npt.NDArray) -> npt.NDArray:
"""
Calculates the weighted mean of all sigma points of the measurement vectors of all sensors.
Parameters
----------
sigmas : npt.NDArray
Sigma points of the measurement vector of all sensors..
Wm : npt.NDArray
Weights for the calculation of the weighted mean of the measurement.
Returns
-------
npt.NDArray
Weighted mean of the sigma points of the measurement vectors of all sensors.
"""
zp = np.empty((sigmas.shape[1]))
last_z_index = 0
for sensor in self.sensors_to_update:
z_end_index = last_z_index + sensor.z_to_update.shape[0]
z_indices = np.arange(last_z_index, z_end_index)
zp[z_indices] = sensor.measurement_mean(sigmas=sigmas[:, z_indices], Wm=Wm)
last_z_index = z_end_index
return zp
[docs] def update(self, u: Optional[npt.NDArray] = None):
"""
Update the UKF with the given measurements. On return,
self.x and self.P contain the new mean and covariance of the filter.
Parameters
----------
u : Optional[npt.NDArray]
Current control input vector of the system.
"""
# pass prior sigmas through h(x) to get measurement sigmas
# the shape of sigmas_h will vary if the shape of z varies, so
# recreate each time
x = self.x_necessary
P = self.P_necessary
# Calculate how many measurements there will be and
# initialize enitre measurement vector, measurement noise matrix and sigma points
z_n = sum([sensor.z_to_update.shape[0] for sensor in self.sensors_to_update])
sigmas_h = np.empty((self.sigmas_f.shape[0], z_n))
R = np.zeros((z_n, z_n))
z = np.empty((z_n))
# calculate sigma points for based on the respective masurement for every sensor
last_z_index = 0
self.sensor_updated_z_indices = []
for sensor in self.sensors_to_update:
z_end_index = last_z_index + sensor.z_to_update.shape[0]
z_indices = np.arange(last_z_index, z_end_index)
sigmas_h[:, z_indices] = sensor.hx(sigmas_f=self.sigmas_f, u=u)
R[np.ix_(z_indices, z_indices)] = sensor.R_to_update
z[z_indices] = sensor.z_to_update
last_z_index = z_end_index
self.executed_steps.append(sensor.sensor_name)
self.sensor_updated_z_indices.append(z_indices)
self.sigmas_h = np.atleast_2d(sigmas_h)
# mean and covariance of prediction passed through unscented transform
zp, self.S = self.unscented_transform(self.sigmas_h, self.Wm, self.Wc, R,
self.measurement_mean, self.residual_z)
self.zp = zp
self.SI = self.inv(self.S)
# compute cross variance of the state and the measurements
Pxz = self.cross_variance(x, zp, self.sigmas_f, self.sigmas_h)
self.K = np.dot(Pxz, self.SI) # Kalman gain
self.y = self.residual_sigmash_and_z(z, zp) # residual
# update Gaussian state estimate (x, P)
x = x + np.dot(self.K, self.y)
P -= np.around(np.dot(self.K, np.dot(self.S, self.K.T)), decimals=self.max_decimals)
# limit minimal cone covariance
for i in range(self.x_r_dim, x.shape[0], 2):
factor = np.pi * np.product(np.linalg.eigvals(P[i:i+2, i:i+2])) / self.minimal_cone_covariance_area
if factor < 1:
P[i:i+2, i:i+2] /= factor
self.x_necessary = x
self.P_necessary = P
# save measurement and posterior state
self.z = z.copy()
self.R = R.copy()
self.x_post = self.x.copy()
self.P_post = self.P.copy()
# set to None to force recompute
self._log_likelihood = None
self._likelihood = None
self._mahalanobis = None
[docs] def predict(self, dt=None, UT=None, fx=None, **fx_args):
r"""
Performs the predict step of the UKF. On return, self.x and
self.P contain the predicted state (x) and covariance (P). '
Important: this MUST be called before update() is called for the first
time.
Parameters
----------
dt : double, optional
If specified, the time step to be used for this prediction.
self._dt is used if this is not provided.
fx : callable f(x, **fx_args), optional
State transition function. If not provided, the default
function passed in during construction will be used.
UT : function(sigmas, Wm, Wc, noise_cov), optional
Optional function to compute the unscented transform for the sigma
points passed through hx. Typically the default function will
work - you can use x_mean_fn and z_mean_fn to alter the behavior
of the unscented transform.
**fx_args : keyword arguments
optional keyword arguments to be passed into f(x).
"""
if dt is None:
dt = self._dt
if UT is None:
UT = self.unscented_transform
# calculate sigma points for given mean and covariance
self.compute_process_sigmas(dt, fx, **fx_args)
# and pass sigmas through the unscented transform to compute prior
# only use the robot pose states for the predict step.
self.x_necessary, self.P_necessary = UT(self.sigmas_f, self.Wm, self.Wc, self.Q,
self.x_mean, self.residual_sigmaf_and_x)
# save prior
self.x_prior = np.copy(self.x)
self.P_prior = np.copy(self.P)
[docs] def prepare_update(self):
"""
Prepare the updates for all sensors.
"""
self.sensors_to_update: List[Sensor] = []
self.x_indices_to_update: Set[int] = set(np.arange(self.x_r_dim, dtype=int))
for sensor in self.sensors:
if sensor.prepare_update():
self.sensors_to_update.append(sensor)
self.x_indices_to_update.update(sensor.x_indices_to_update)
[docs] def try_predict(self, dt: float, u: npt.NDArray):
"""
Try to predict based on timestep and control input, correct state noise
if there is a non-semi-positive-definite matrix and try again.
Parameters
----------
dt : float
Timestep since last predict.
u : npt.NDArray
Control input.
"""
x_dim = self.x_necessary.shape[0]
self.calculate_Q(dt=dt, x_dim=x_dim)
if self.points_fn.n != x_dim:
self.recreate_sigma_points(x_dim=x_dim)
# TODO: vllt vorgenerieren
try:
self.predict(dt=dt, u=u, UT=self.unscented_transform)
except np.linalg.LinAlgError:
logging.critical(f'P is not semi-positive-definite (predict after {self.executed_steps[-1]}).'
' Finding nearest covariance matrix and trying again!')
self.P = cov_nearest(self.P)
self.predict(dt=dt, u=u, UT=self.unscented_transform)
self.executed_steps = ['predict']
[docs] def try_update(self, u: npt.NDArray):
"""
Try to predict based on control input, correct state noise
if there is a non-semi-positive-definite matrix and try again.
Parameters
----------
u : npt.NDArray
Control input.
"""
try:
self.update(u=u)
except np.linalg.LinAlgError:
logging.critical('P is not positive definite (update).'
'Finding nearest covariance matrix and trying again!')
self.P = cov_nearest(self.P)
self.update(u=u)
[docs] def postprocess_update(self):
"""
Postprocess update for all sensors if the respective flag specifies.
"""
for sensor in self.sensors:
if sensor.execute_update_postprocess is True:
sensor.postprocess_update()
[docs] def cleanup_map(self):
landmark_perceived_n = np.array(self.landmark_perceived_n)
condition = landmark_perceived_n >= self.perceived_n_threshold
state_condition = np.concatenate([np.full(self.x_r_dim, True), np.dstack([condition, condition]).flatten()])
self.x = self.x[state_condition]
self.P = self.P[np.ix_(state_condition, state_condition)]
self.landmark_ids = np.array(self.landmark_ids)[condition].tolist()
self.fake_landmark_ids = np.array(self.fake_landmark_ids)[condition].tolist()
self.landmark_perceived_n = landmark_perceived_n[condition].tolist()
[docs] def run_batch(self, saver: helpers.Saver) -> None:
"""
Run predict and update steps for the given control inputs and measurements.
Using the prerecorded measurements that have been passed during the initialization of the filter and sensors.
If there are values for all control inputs and the time since the last predict step is bigger than the predict
tolerance, a new prediction is made.
Updates are done with the mean of the meausrements since the last prediction.
Parameters
----------
saver : helpers.Saver
Saver object to save the filter object at every timestep.
"""
assert self.u_array is not None
assert self.t_array is not None
self.sensor_z = [None] * len(self.sensors)
self.sensor_R = [None] * len(self.sensors)
# Calculate the maximal z dimension based on the prerecorded measurement arrays.
self.max_z_dim = max([sensor.z_array.shape[1]
if sensor.z_array is not None and sensor.z_array.ndim > 1
else 1 for sensor in self.sensors])
local_mapping = self.sensor('local_mapping')[1]
prediction_step = 0
bar = helpers.generate_progressbar(self.u_array.shape[0]).start()
for i in range(self.u_array.shape[0]):
# if any control input is finite, remember it
if np.any(np.isfinite(self.u_array[i])):
mask = np.isfinite(self.u_array[i])
self.u[mask] = self.u_array[i][mask]
time_since_predict = self.t_array[i] - self.t_array[self.last_prediction]
# the following code contains the calling of the predict and update procedure
# it will only be executed if all control inputs have been defined already
# and either there has not been a predict in the last self.predict_tolerance seconds
# or if there is new data from the local_mapping
if np.all(np.isfinite(self.u)) and (time_since_predict > self.predict_tolerance
or np.all(np.isfinite(local_mapping.z_array[i]))):
# Gets the measurements of all sensors for this particular step,
for sensor in self.sensors:
sensor.get_prerecorded_z_and_R(last_prediction_step=self.last_prediction, i=i)
self.prepare_update()
self.try_predict(dt=time_since_predict, u=self.u)
self.try_update(u=self.u)
self.postprocess_update()
self.last_prediction = i
self.t = self.t_array[i]
saver.save()
prediction_step += 1
bar.update(value=i, prediction_step=prediction_step)
bar.finish()
if __name__ == '__main__':
from utils import data_import
df = data_import.RosbagAndMDFImporter(bag_filename='/home/martina/CURE/as_ros/rosbags/run_373_per.bag').df.loc[0:15]
with open('src/slam/param_study/test/configs/test_case_0.yaml', 'r') as config_file:
settings = yaml.safe_load(config_file)
ukf = UKF(u_array=df[['steering_wheel_angle', 'linear_acceleration.x', 'angular_velocity.z']].to_numpy(),
update_strategy=helpers.UpdateStrategy(settings['ukf']['update_strategy']),
sigma_alpha=settings['ukf']['sigma_alpha'],
minimal_cone_covariance_area=settings['ukf']['minimal_cone_covariance_area'],
P=settings['ukf']['P'],
template_Q=settings['ukf']['template_Q'],
u_dim=3,
t_array=np.asarray(df.index.to_numpy(), dtype=np.float32))
ukf.add_sensor(NovatelGPSHeading(filter=ukf,
z_array=df['heading'].to_numpy(),
sensor_name='novatel_gps_heading',
z_names=['world_heading'],
R_array=NovatelGPSHeading.generate_R_matrix_from_df(df),
active=settings['novatel_heading']['active']))
ukf.add_sensor(LocalMapping(filter=ukf,
z_array=df['cones'].to_numpy(),
active=settings['local_mapping']['active'],
sensor_name='local_mapping',
z_names=['landmarks'],
data_association=helpers.DataAssociation(
settings['local_mapping']['data_association']['method']),
use_kd_tree=settings['local_mapping']['data_association']['use_kd_tree'],
chi2_quantile=settings['local_mapping']['data_association']['chi2_quantile'],
fov_min_distance=settings['local_mapping']['fov_gate']['min_distance'],
fov_max_distance=settings['local_mapping']['fov_gate']['max_distance'],
fov_angle=settings['local_mapping']['fov_gate']['angle'],
fov_scaling=settings['local_mapping']['fov_gate']['scaling'],
probability_gate=settings['local_mapping']['probability_gate'],
R_azimuth=settings['local_mapping']['R_azimuth'],
R_distance_factor=settings['local_mapping']['R_distance_factor'],
R_offset=settings['local_mapping']['R_distance_offset']))
ukf.add_sensor(NovatelGPSPosition(filter=ukf,
z_array=df[['latitude', 'longitude', 'speed']].to_numpy(),
sensor_name='novatel_gps_position',
z_names=['latitude', 'longitude', 'velocity'],
R_array=NovatelGPSPosition.generate_R_matrix_from_df(df),
active=settings['novatel_position']['active']))
ukf.add_sensor(CUREDiffWheelspeed(filter=ukf,
z_array=df['wheelspeed_fl'].to_numpy(),
active=settings['wheelspeed']['fl_active'],
right=False,
sensor_name='ros_wheelspeed_fl',
z_names=['omega_fl'],
gear_ratio=1,
track_width=settings['wheelspeed']['track_width'],
wheel_radius=settings['wheelspeed']['wheel_radius'],
R=settings['wheelspeed']['R']))
ukf.add_sensor(CUREDiffWheelspeed(filter=ukf,
z_array=df['wheelspeed_fr'].to_numpy(),
active=settings['wheelspeed']['fr_active'],
right=True,
sensor_name='ros_wheelspeed_fr',
z_names=['omega_fr'],
gear_ratio=1,
track_width=settings['wheelspeed']['track_width'],
wheel_radius=settings['wheelspeed']['wheel_radius'],
R=settings['wheelspeed']['R']))
ukf.add_sensor(CUREDiffWheelspeed(filter=ukf,
z_array=df['wheelspeed_rl'].to_numpy(),
active=settings['wheelspeed']['rl_active'],
right=False,
sensor_name='ros_wheelspeed_rl',
z_names=['omega_rl'],
gear_ratio=settings['wheelspeed']['rear_gear_ratio'],
track_width=settings['wheelspeed']['track_width'],
wheel_radius=settings['wheelspeed']['wheel_radius'],
R=settings['wheelspeed']['R']))
ukf.add_sensor(CUREDiffWheelspeed(filter=ukf,
z_array=df['wheelspeed_rr'].to_numpy(),
active=settings['wheelspeed']['rr_active'],
right=True,
sensor_name='ros_wheelspeed_rr',
z_names=['omega_rr'],
gear_ratio=settings['wheelspeed']['rear_gear_ratio'],
track_width=settings['wheelspeed']['track_width'],
wheel_radius=settings['wheelspeed']['wheel_radius'],
R=settings['wheelspeed']['R']))
saver = helpers.Saver(ukf)
if profiling is True:
Path(f'plots/profiling/{test_case}').mkdir(parents=True, exist_ok=True)
cProfile.runctx(
'ukf.run_batch(saver)',
globals(),
locals(),
f'plots/profiling/{test_case}/profiling.pstats'
)
os.system(f'python scripts/utils/gprof2dot.py -f pstats plots/profiling/{test_case}/profiling.pstats'
f'| dot -Tsvg -o plots/profiling/{test_case}/{test_case}.svg')
else:
ukf.run_batch(saver=saver)
# for sensor in ukf.sensors:
# sensor.postprocess(saver)
if saving_saver is True:
saver.pickle(test_case=test_case)
if plotting is True:
plot.plot_standard_plots(saver=saver, start=0, end=None, test_case=test_case,
map_cones=data_import.import_map_csv('data/cure/may/trackdrive/run_373_moved.csv'),
map_x_offset=0, map_y_offset=0, map_rotation=0)