"""
Implementation of all utility functions regarding the plotting of any data regarding the kalman filter.
"""
import os
import glob
import logging
from pathlib import Path
from typing import Callable, List, Tuple, Optional, Dict
import numpy as np
import numpy.typing as npt
import pandas as pd
import json
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import matplotlib.transforms as transforms
import matplotlib.collections as clt
import matplotlib.widgets
import matplotlib.figure
from matplotlib.patches import Ellipse
from matplotlib.patches import Patch, PathPatch, FancyArrow
from matplotlib.lines import Line2D
from matplotlib.ticker import MultipleLocator, FormatStrFormatter, AutoMinorLocator
import mpl_toolkits.axes_grid1
from sensors import Sensor, LocalMapping, NovatelGPSHeading, NovatelGPSPosition
import ukf
from utils import geometry, helpers
[docs]def title_generator(saver: helpers.Saver, x_i: List[int] = [0], u_i: List[int] = [], z_i: List[int] = []) -> str:
"""
Generate title of plot.
Uses the names of states, control inputs and measurements to generate plot title.
Parameters
----------
saver : helpers.Saver
helpers.Saver object containing the data of the kalman filter.
x_i : List[int], optional
Indices of the states being plotted., by default [0]
u_i : List[int], optional
Indices of the control inputs being plotted., by default []
z_i : List[int], optional
Indices of the measurements being plotted., by default []
Returns
-------
str
Title of the plot.
"""
states = ', '.join([saver._kf.x_names[i] for i in x_i])
return f'Plot states {states} against control inputs'
[docs]def save_flow_json(flow_dict: Dict[int, List[str]], test_case: str = 'default'):
"""
Saves the flow of the kalman filter as json.
Parameters
----------
flow_dict : Dict[int, List[str]]
Dictionary containing the flow of the kalman filter.
test_case : str, optional
Name of the subfolder where the plot should be saved to., by default 'default'
"""
logging.info(msg=f'Saving flow json for test case \'{test_case}\'')
Path(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/flow/{test_case}').mkdir(parents=True, exist_ok=True)
with open(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/flow/{test_case}/flow.json', 'w') as flow_json_file:
json.dump(flow_dict, flow_json_file, indent=4)
[docs]def plot_flow(saver: helpers.Saver, test_case: str = 'default'):
"""
_summary_
Parameters
----------
saver : helpers.Saver
helpers.Saver object containing the data of the KalmanFilter.
test_case : str, optional
Name of the subfolder where the plot should be saved to., by default 'default'
"""
flow_dict = {}
for prediction_step, executed_steps in enumerate(saver.executed_steps):
flow_dict[prediction_step] = [str(executed_step) for executed_step in executed_steps]
save_flow_json(flow_dict=flow_dict, test_case=test_case)
[docs]def save_plot(fig: go.Figure, filename: str, test_case: str = 'default',
plot_html: bool = True, plot_png: bool = True) -> None:
"""
Saves the plot.
Saves the plot as html and/or png.
Parameters
----------
fig : go.Figure
Plot to be saved.
filename : str
Name of the file where the plot should be saved to.
test_case : str, optional
Name of the subfolder where the plot should be saved to., by default 'default'
plot_html : bool, optional
Specifies whether the plot should be saved as html., by default True
plot_png : bool, optional
Specifies whether the plot should be saved as png., by default True
"""
logging.info(msg=f'Saving \'{filename}\' plots for test case \'{test_case}\'')
if plot_html:
Path(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/html/{test_case}').mkdir(parents=True, exist_ok=True)
fig.write_html(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/html/{test_case}/{filename}.html')
if plot_png:
Path(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/images/{test_case}').mkdir(parents=True, exist_ok=True)
fig.write_image(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/images/{test_case}/{filename}.png')
[docs]class SensorPlotConfig():
"""
Class to specify how to plot the measurements of a sensor.
Specify what the sensor name is, that should be used, which measurements to convert with the hx_inverse function
and subsequently which converted states to plot.
Attributes
----------
sensor_name : str
Name of the sensor to be plotted.
z_i : List[int]
Indices of the measurements to be converted.
x_i : List[int]
Indices of the converted states to be plotted.
"""
def __init__(self, sensor_name: str, z_i: List[int], x_i: List[int]) -> None:
"""
Initialize sensor plot config.
...
Parameters
----------
sensor_name : str
Name of the sensor to be plotted.
z_i : List[int]
Indices of the measurements to be converted.
x_i : List[int]
Indices of the converted states to be plotted.
"""
self.sensor_name = sensor_name
self.z_i = z_i
self.x_i = x_i
[docs]def plot_state_measurement_control(
saver: helpers.Saver, x_i: List[int],
u_i: List[int] = [], sensor_configs: List[SensorPlotConfig] = [],
start: int = 0, end: Optional[int] = None, filename: str = 'plot', test_case: str = 'default'):
"""
Plot some state(s) against some measurement(s) and control input(s).
Uses sensor plot configs to plot measurements.
Plot the control inputs on a secondary axis.
Parameters
----------
saver : helpers.Saver
helpers.Saver object containing the data of the KalmanFilter
x_i : List[int]
Indices of the state(s) to be plotted.
u_i : List[int], optional
Indices of the control input(s) to be plotted., by default []
sensor_configs : List[SensorPlotConfig], optional
Sensor configs of the measurements to be plotted., by default []
start : int, optional
Timestamp from where to plot the data., by default 0
end : Optional[int], optional
Timestamp until where to plot the data., by default None
filename : str, optional
Name of the file where the plot should be saved to., by default 'plot'
test_case : str, optional
Name of the subfolder where the plot should be saved to., by default 'default'
"""
fig = make_subplots(specs=[[{'secondary_y': True}]])
t = saver.t[start:end]
# Plot posterior incl. variances
for i in x_i:
fig.add_trace(go.Scatter(x=t, y=np.array(saver.x_R_post)[start:end, i],
mode='lines+markers',
name=f'posterior of x[{i}]',
error_y=dict(
type='data', # value of error bar given in data coordinates
array=np.array(saver.P_R_post)[start:end, i, i],
visible=True)))
fig.add_trace(go.Scatter(x=t, y=np.array(saver.x_R_prior)[start:end, i],
mode='markers',
name=f'prior of x[{i}]',
error_y=dict(
type='data', # value of error bar given in data coordinates
array=np.array(saver.P_R_prior)[start:end, i, i],
visible=True)))
fig.add_trace(go.Scatter(x=t, y=np.array(saver.x_R)[start:end, i],
mode='markers',
name=f'x[{i}]'))
for i in u_i:
fig.add_trace(go.Scatter(x=saver.t, y=np.array(saver.u)[:, i],
mode='markers',
name=f'control input u[{i}]'),
secondary_y=True)
for sensor_conf in sensor_configs:
sensor: Sensor
sensor_i: int
sensor_i, sensor = saver._kf.sensor(sensor_conf.sensor_name)
y = sensor.hx_inverse(z=np.array(saver.sensor_z)[start:end, sensor_i, sensor_conf.z_i], u=np.array(saver.u))
for _x_i in sensor_conf.x_i:
fig.add_trace(
go.Scatter(x=t, y=y[:, _x_i],
mode='lines+markers',
name=f'measurements of {sensor.sensor_name}',
error_y=dict(
type='data', # value of error bar given in data coordinates
array=sensor.hx_inverse(z=np.sqrt(
np.array(saver.sensor_R)[start:end, sensor_i, sensor_conf.z_i, sensor_conf.z_i]),
u=np.array(saver.u))[:, _x_i],
visible=True)))
fig.update_layout(
title=title_generator(saver, x_i),
xaxis_title="Time t / s",
yaxis_title=', '.join([saver._kf.x_names[i] for i in x_i]),
# legend_title="Legend Title",
font=dict(
family="Courier New, monospace",
size=18,
color="RebeccaPurple"
)
)
save_plot(fig=fig, filename=filename, test_case=test_case)
[docs]def plot_track(saver: helpers.Saver, filename='track', test_case: str = 'default'):
"""
Plot and save the local estimated trajectory.
...
Parameters
----------
saver : helpers.Saver
helpers.Saver object containing the estimated data of the KalmanFilter.
filename : str, optional
Name of the file where the plot should be saved to, by default 'track'.
test_case : str, optional
Name of the subfolder where the plot should be saved to, by default 'default'.
"""
# local_mapping: LocalMapping = saver._kf.sensor('local_mapping')[1]
fig = go.Figure()
fig.add_trace(go.Scatter(x=np.array(saver.x_R)[:, 0], y=np.array(saver.x_R)[:, 1],
mode='lines+markers',
name='track',
hovertext=saver.t,
marker=dict(
color=list(range(np.array(saver.x_R).shape[0])),
colorscale="matter")))
saver._kf.landmark_ids
fig.add_trace(go.Scatter(x=saver.landmarks[-1][:, 0], y=saver.landmarks[-1][:, 1],
name='Landmarks',
mode='markers',
marker=dict(color=LocalMapping.get_color_for_cone_ids(np.array(saver._kf.landmark_ids)),
size=LocalMapping.get_size_for_cone_ids(np.array(saver._kf.landmark_ids)),
symbol='triangle-up')))
fig.update_yaxes(scaleanchor='x', scaleratio=1)
save_plot(fig=fig, filename=filename, test_case=test_case)
[docs]def plot_gps_coordinates(gps_coordinates: pd.DataFrame, lat_key: str = 'lat', lon_key: str = 'lon',
filename: str = 'gps', test_case: str = 'default'):
"""
Plot GPS Coordinates.
...
Parameters
----------
gps_coordinates : pd.DataFrame
Pandas dataframe containing the gps coordinates.
lat_key : str, optional
Key for the latitudes in the pandas dataframe., by default 'lat'
lon_key : str, optional
Key for the longitudes in the pandas dataframe., by default 'lon'
filename : str, optional
Name of the file where the plot should be saved to., by default 'gps'
test_case : str, optional
Name of the subfolder where the plot should be saved to., by default 'default'
"""
px.set_mapbox_access_token(
'pk.eyJ1Ijoib2xla2V0dGVybiIsImEiOiJja3kzajI1dzYwMmo3MnFvM3psYWg3YzM5In0.CveZ2yvlXSMo7r3R9ECAqQ')
fig = px.line_mapbox(gps_coordinates,
lat=lat_key,
lon=lon_key,
zoom=18,
height=1000,
# color_discrete_map='identity',
# color='color'
)
fig.update_layout(mapbox_style="satellite", mapbox_zoom=14, mapbox_center_lat=gps_coordinates[lat_key].iloc[0],
margin={'r': 0, 't': 0, 'l': 0, 'b': 0})
save_plot(fig=fig, filename=filename, test_case=test_case, plot_png=False)
[docs]def plot_animation(saver: helpers.Saver, filename: str = 'animation',
test_case: str = 'default', map_cones: pd.DataFrame = None,
map_x_offset: float = 0, map_y_offset: float = 0, map_rotation: float = 0) -> None:
"""
Plot and save animation of the vehicle position and landmark position and the respective uncertainty.
...
Parameters
----------
saver : helpers.Saver
helpers.Saver object containing the estimated data of the KalmanFilter.
filename : str, optional
Name of the file where the plot should be saved to, by default 'track'.
test_case : str, optional
Name of the subfolder where the plot should be saved to, by default 'default'.
map_cones : pd.DataFrame, optional
Pandas dataframe containing the ground truth positions of the landmarks of the map, by default None.
map_x_offset : float, optional
Offset to move the ground truth map along the x axis., by default 0
map_y_offset : float, optional
Offset to move the ground truth map along the y axis., by default 0
map_rotation : float, optional
Angle in degrees to rotate the ground truth map around after moving it., by default 0
"""
a = KalmanFilterAnimation(saver=saver, save_count=len(saver.x_R)-1, step_size=1, map_cones=map_cones, # noqa: F841
map_x_offset=map_x_offset, map_y_offset=map_y_offset, map_rotation=map_rotation)
# FFwriter = animation.FFMpegWriter(fps=10)
# a.fig.set_size_inches(20, 10, True)
# a.ani.save(f'plots/images/{test_case}/{filename}.mp4', writer=FFwriter, dpi=200)
plt.show()
[docs]def plot_standard_plots(saver: helpers.Saver, start: int = 0, end: Optional[int] = None,
test_case: str = 'default', map_cones=None, map_x_offset: float = 0,
map_y_offset: float = 0, map_rotation: float = 0) -> None:
"""
Plot all standard plots for the saver of the KalmanFilter.
Plot the estimated acceleration, angular rate, velocity, position, local and global trajectory
Parameters
----------
saver : helpers.Saver
helpers.Saver with the KalmanFilter data.
start : int, optional
Timestamp from where to plot the data., by default 0
end : Optional[int], optional
Timestamp until where to plot the data., by default None
test_case : str, optional
Name of the test case., by default 'default'
map_x_offset : float, optional
Offset to move the ground truth map along the x axis., by default 0
map_y_offset : float, optional
Offset to move the ground truth map along the y axis., by default 0
map_rotation : float, optional
Angle in degrees to rotate the ground truth map around after moving it., by default 0
"""
plot_track(saver, filename='track', test_case=test_case)
# plot_gps_coordinates(helpers.local_to_gps(np.array(saver.x_R), gamma=saver.x_R[0][5],
# lat_origin=49.5104631, lon_origin=7.8769798),
# test_case=test_case)
# # plot.plot_gps_coordinates(df[['GPS_LAT', 'GPS_LON']].dropna(), lat_key='GPS_LAT', lon_key='GPS_LON',
# # filename='gps_measurement', test_case=test_case)
plot_state_measurement_control(
saver, x_i=[2], start=start, end=end, filename='v_x', test_case=test_case, u_i=[1],
sensor_configs=[
SensorPlotConfig(sensor_name='wheelspeed_fl', z_i=[0], x_i=[0]),
SensorPlotConfig(sensor_name='wheelspeed_fr', z_i=[0], x_i=[0]),
SensorPlotConfig(sensor_name='wheelspeed_rl', z_i=[0], x_i=[0]),
SensorPlotConfig(sensor_name='wheelspeed_rr', z_i=[0], x_i=[0]),
SensorPlotConfig(sensor_name='gps_position', z_i=[0, 1, 2], x_i=[2])])
plot_state_measurement_control(saver, x_i=[3], start=start, end=end, filename='psi', test_case=test_case, u_i=[2],
sensor_configs=[SensorPlotConfig(
sensor_name='gps_heading', z_i=[0], x_i=[0])])
plot_state_measurement_control(saver, x_i=[0], start=start, end=end, filename='x_pos', test_case=test_case,
sensor_configs=[SensorPlotConfig(sensor_name='gps_position',
z_i=[0, 1, 2], x_i=[0])])
plot_state_measurement_control(saver, x_i=[1], start=start, end=end, filename='y_pos', test_case=test_case,
sensor_configs=[SensorPlotConfig(sensor_name='gps_position',
z_i=[0, 1, 2], x_i=[1])])
plot_flow(saver=saver, test_case=test_case)
plot_animation(saver, filename='animation', test_case=test_case, map_cones=map_cones,
map_x_offset=map_x_offset, map_y_offset=map_y_offset, map_rotation=map_rotation)
[docs]class KalmanFilterAnimation(object):
"""An animated scatter plot using matplotlib.animations.FuncAnimation."""
plot_heatmap = False
plot_fov = True
def __init__(self, saver: helpers.Saver, save_count: int = 1000, step_size: int = 1,
map_cones=None, map_x_offset: float = 0, map_y_offset: float = 0, map_rotation: float = 0):
"""
Initializes kalman filter animation.
Parameters
----------
saver : helpers.Saver
Saver containing all data of the kalman filter to animate.
save_count : int, optional
Number of frame to presave, by default 1000.
step_size : int, optional
Step size of animation, by default 1.
map_cones : _type_, optional
Pandas Dataframe containing the ground truth position of the landmarks, by default None.
map_x_offset : float, optional
Offset to move the ground truth map along the x axis., by default 0
map_y_offset : float, optional
Offset to move the ground truth map along the y axis., by default 0
map_rotation : float, optional
Angle in degrees to rotate the ground truth map around after moving it., by default 0
"""
self.saver = saver
self.sensor_z = np.array(saver.sensor_z)
self.map_x_offset = map_x_offset
self.map_y_offset = map_y_offset
self.map_rotation = map_rotation
self.gps_position_sensor: NovatelGPSPosition
self.gps_heading_sensor: NovatelGPSHeading
self.gps_position_sensor_index, self.gps_position_sensor = self.saver._kf.sensor('gps_position')
self.gps_heading_sensor_index, self.gps_heading_sensor = self.saver._kf.sensor('gps_heading')
# size of vehicle
l_h, l_v, l_y = 0.5, 2.5, 0.7
l_gps_h, l_gps_v, l_gps_y = 0.5, 2.5, 0.7
self.vehicle_points = np.array([[-l_h, -l_y], [l_v, -l_y], [l_v, l_y], [-l_h, l_y], [-l_h, -l_y]])
self.vehicle_gps_points = np.array([[-l_gps_h, -l_gps_y], [l_gps_v, -l_gps_y], [l_gps_v, l_gps_y],
[-l_gps_h, l_gps_y], [-l_gps_h, -l_gps_y]])
if self.plot_heatmap:
self.fig, (self.ax, self.heatmap_ax) = plt.subplots(1, 2)
else:
self.fig, self.ax = plt.subplots()
self.raw_map_cones = map_cones
self.map_cones = map_cones.copy()
self.move_map_cones()
self.ax.set_aspect('equal', adjustable='box')
self.step_size = step_size
# Then setup FuncAnimation.
self.ani = Player(self.fig, self.update, step_size=step_size, interval=10,
save_count=int((save_count-1)/step_size), init_func=self.setup_plot, maxi=save_count,
map_update_transmation_func=self.update_map_transformation, map_x_offset=map_x_offset,
map_y_offset=map_y_offset, map_rotation=map_rotation)
[docs] def replot_map_cones(self):
"""
Replots the ground truth map after changig map transformation parameters.
"""
if self.map_cones is not None:
size = LocalMapping.get_size_for_cone_ids(self.raw_map_cones[:, 2])
colors = LocalMapping.get_color_for_cone_ids(self.raw_map_cones[:, 2])
self.set_cones_data_for_scatter(self.ground_truth_scatter, (self.map_cones[:, 0:2], colors, size))
[docs] def move_map_cones(self):
"""
Moves and rotates the ground truth map according to the current map transformation parameters.
"""
if self.map_cones is not None:
self.map_cones[:, 0:2] = geometry.rotate_and_move(xy=self.raw_map_cones.copy()[:, 0:2],
rotation=np.radians(self.map_rotation),
offset_after_rotation=np.array([self.map_x_offset,
self.map_y_offset]))
[docs] def setup_plot(self):
"""
Setup all plots.
"""
# Setup scatter plot to plot tacked landmarks
self.tracked_cones_scatter = self.ax.scatter(np.array([np.nan]), np.array([np.nan]), c=np.array(['blue']),
s=np.array([np.nan]), vmin=0, vmax=1,
cmap="jet", edgecolor=None, marker="^")
# Setup line plot to plot position and heading of the vehicle
self.vehicle_plot = self.ax.plot(self.vehicle_points[:, 0], self.vehicle_points[:, 1])
# Setup line plot to plot position and heading of the vehicle according to the novatel gps,
# if both measurements available
self.vehicle_gps_with_heading_plot = self.ax.plot(self.vehicle_gps_points[:, 0],
self.vehicle_gps_points[:, 1], alpha=0, c='red')
# Setup scatter plot to plot position of the vehicle according to the novatel gps,
# if only the position measurement is available
self.vehicle_gps_without_heading_plot = self.ax.scatter(0, 0, alpha=0, marker='x', s=100, c='red')
# Setup arrow plot to plot heading of the vehicle according to the novatel gps,
# if only the heading measurement is avaiblable
self.gps_heading_plot = self.ax.arrow(0, 0, 4, 0, alpha=0, color='red', width=0.1)
# Setup scatter plot to plot observed landmarks
self.observed_cones_scatter = self.ax.scatter(np.array([np.nan]), np.array([np.nan]), c=np.array(['blue']),
s=np.array([np.nan]), vmin=0, vmax=1,
cmap="jet", edgecolor=None, marker='2')
# Plot ground truth once if specified
if self.map_cones is not None:
landmark_ids = self.map_cones[:, 2].astype(int)
# Setup scatter plot to plot externally generated ground truth
self.ground_truth_scatter = self.ax.scatter(self.map_cones[:, 0], self.map_cones[:, 1],
edgecolor=None, marker="x",
c=LocalMapping.get_color_for_cone_ids(landmark_ids),
s=LocalMapping.get_size_for_cone_ids(landmark_ids))
# Setup info text with time_step and time
self.time_text = self.ax.text(-5, -3.5, 't = 0 s', fontsize=15)
# Setup heatmap to plot the state noise covariance matrix if specified
if self.plot_heatmap:
self.P_heatmap = self.heatmap_ax.imshow(self.saver.P[0], cmap='hot', interpolation='nearest')
labels = ['x', 'y', 'v_x', 'psi'] + \
np.array([[f'x for Landmark {i}', f'y for Landmark {i}']
for i in range(int((self.saver.P[0].shape[0] - self.saver._kf.x_r_dim)/2))]
).flatten().tolist()
self.heatmap_ax.set_xticks(np.arange(len(labels)))
self.heatmap_ax.set_yticks(np.arange(len(labels)))
self.heatmap_ax.set_xticklabels(labels)
self.heatmap_ax.set_yticklabels(labels)
# Setup Axis
max_and_mins = np.array([[min(global_landmarks[:, 0]), max(global_landmarks[:, 0]),
min(global_landmarks[:, 1]), max(global_landmarks[:, 1])]
for global_landmarks in self.saver.current_global_landmarks
if global_landmarks is not None and global_landmarks.any()])
axis_max_mins = [min(min(max_and_mins[:, 0]), -5) - 2, max(max_and_mins[:, 1]) + 2,
min(max_and_mins[:, 2]) - 2, max(max_and_mins[:, 3]) + 2]
self.ax.axis(axis_max_mins)
# Setup Legend
legend_elements = [Line2D([0], [0], color='b', marker='^', lw=0, label='Tracked Landmarks'),
Line2D([0], [0], marker='2', color='b', lw=0,
label='Measured Landmarks relative to new position'),
Line2D([0], [0], color='b', marker='x', lw=0, label='"Ground Truth"'),
Patch(facecolor='orange', alpha=0.1, edgecolor='blue', label='Predicted FoV Gate'),
Patch(facecolor='purple', alpha=0.1, edgecolor='blue', label='FoV Gate'),
Patch(facecolor='blue', edgecolor='blue', label='Vehicle'),
Patch(facecolor='red', edgecolor='red', label='GPS Position and Heading'),
FancyArrow(0, 0, 1, 0, color='red', width=0.3, label='GPS Heading'),
Line2D([0], [0], color='red', marker='x', lw=0, label='GPS Position')]
self.ax.legend(handles=legend_elements)
# Setup Grid
self.ax.minorticks_on()
self.ax.xaxis.set_major_locator(MultipleLocator(6))
self.ax.xaxis.set_major_formatter(FormatStrFormatter('%d'))
self.ax.xaxis.set_minor_locator(AutoMinorLocator(1))
# self.ax.xaxis.set_minor_locator(MultipleLocator(1))
self.ax.yaxis.set_major_locator(MultipleLocator(6))
self.ax.yaxis.set_major_formatter(FormatStrFormatter('%d'))
self.ax.yaxis.set_minor_locator(AutoMinorLocator(1))
# self.ax.yaxis.set_minor_locator(MultipleLocator(1))
self.ax.set_axisbelow(True)
self.ax.grid(which='major', color='red')
self.ax.grid(which='minor', color='grey')
return self.observed_cones_scatter, self.tracked_cones_scatter, self.vehicle_plot[0]
[docs] def set_cones_data_for_scatter(self, scatter: clt.PathCollection,
cones_data: Tuple[npt.NDArray, npt.NDArray, npt.NDArray]) -> None:
"""
Set cones data for a specified scatter plot.
Sets coordinates of scatter points, marker color and size.
Parameters
----------
scatter : clt.PathCollection
Scatter object to be modified.
cones_data : Tuple[npt.NDArray, npt.NDArray, npt.NDArray]
Landmark positions, marker color and marker size.
"""
scatter.set_offsets(cones_data[0])
scatter.set_facecolor(cones_data[1])
scatter.set_sizes(cones_data[2])
scatter.set_edgecolor('face')
[docs] def get_cones_data(self, cone_coords: npt.NDArray,
cone_ids: npt.NDArray) -> Tuple[npt.NDArray, npt.NDArray, npt.NDArray]:
"""
Get landmark position, color and size.
Parameters
----------
cone_coords : npt.NDArray
Cartesian oordinates of the cones.
cone_ids : npt.NDArray
ID of the cones.
Returns
-------
Tuple[npt.NDArray, npt.NDArray, npt.NDArray]
Landmark positions, marker color and marker size.
"""
size = LocalMapping.get_size_for_cone_ids(cone_ids)
colors = LocalMapping.get_color_for_cone_ids(cone_ids)
return np.array(cone_coords), np.array(colors), np.array(size)
[docs] def get_tracked_cones_data(self, time_step: int) -> Tuple[npt.NDArray, npt.NDArray, npt.NDArray]:
"""
Get position, color and size for all tracked landmarks of the specified time step.
Parameters
----------
time_step : int
Time step to be animated.
Returns
-------
Tuple[npt.NDArray, npt.NDArray, npt.NDArray]
Tracked landmark positions, marker color and marker size.
"""
landmarks = self.saver.landmarks[time_step]
if landmarks is None or not landmarks.any():
return np.array([[np.nan, np.nan]]), np.array(['blue']), np.array([np.nan])
return self.get_cones_data(landmarks,
np.array(self.saver._kf.landmark_ids[:len(landmarks)],
dtype=int))
[docs] def get_observed_cones_data(self, time_step: int) -> Tuple[npt.NDArray, npt.NDArray, npt.NDArray]:
"""
Get position, color and size for all observed landmarks of the specified time step.
Parameters
----------
time_step : int
Time step to be animated.
Returns
-------
Tuple[npt.NDArray, npt.NDArray, npt.NDArray]
Observed landmark positions, marker color and marker size.
"""
landmarks = self.saver.current_global_landmarks[time_step]
if landmarks is None or not landmarks.any():
return np.array([[np.nan, np.nan]]), np.array(['blue']), np.array([np.nan])
return self.get_cones_data(landmarks[:, 0:2], landmarks[:, 2])
[docs] def create_confidence_ellipse(self, pos: npt.NDArray, cov: npt.NDArray, n_std: float = 3.0,
facecolor: str = 'none', **kwargs) -> Ellipse:
"""
Create confidence ellipse to plot position uncertainty.
Using https://matplotlib.org/devdocs/gallery/statistics/confidence_ellipse.html.
Parameters
----------
pos : npt.NDArray
Cartesian coordinates of the position.
cov : npt.NDArray
Uncertainty / covariance matrix of the positon.
n_std : float, optional
Number of standard deviations to plot, by default 3.0.
facecolor : str, optional
Face color of ellipse to plot, by default 'none'.
Returns
-------
Ellipse
Confidence Ellipse to plot.
"""
pearson = cov[0, 1]/np.sqrt(cov[0, 0] * cov[1, 1])
# Using a special case to obtain the eigenvalues of this
# two-dimensionl dataset.
ell_radius_x = np.sqrt(1 + pearson)
ell_radius_y = np.sqrt(1 - pearson)
ellipse = Ellipse((0, 0), width=ell_radius_x * 2, height=ell_radius_y * 2,
facecolor=facecolor, **kwargs)
# Calculating the stdandard deviation of x from
# the squareroot of the variance and multiplying
# with the given number of standard deviations.
scale_x = np.sqrt(cov[0, 0]) * n_std
mean_x = pos[0]
# calculating the stdandard deviation of y ...
scale_y = np.sqrt(cov[1, 1]) * n_std
mean_y = pos[1]
transf = transforms.Affine2D() \
.rotate_deg(45) \
.scale(scale_x, scale_y) \
.translate(mean_x, mean_y)
ellipse.set_transform(transf + self.ax.transData)
return ellipse
[docs] def update_gps_measurements(self, i):
gps_world_position_measurement = self.saver.sensor_z[i][self.gps_position_sensor_index]
gps_world_heading_measurement = self.saver.sensor_z[i][self.gps_heading_sensor_index]
if np.all(np.isfinite(gps_world_heading_measurement)):
gps_map_heading_measurement = self.gps_heading_sensor.hx_inverse(z=gps_world_heading_measurement)
else:
gps_map_heading_measurement = None
if np.all(np.isfinite(gps_world_position_measurement)):
gps_map_position_measurement = self.gps_position_sensor.hx_inverse(z=gps_world_position_measurement)
else:
gps_map_position_measurement = None
if gps_map_position_measurement is not None:
if gps_map_heading_measurement is not None:
vehicle_gps_points = geometry.rotate_and_move(
xy=self.vehicle_gps_points.copy(), offset_after_rotation=gps_map_position_measurement[0:2].copy(),
rotation=gps_map_heading_measurement[0])
self.vehicle_gps_with_heading_plot[0].set_data(vehicle_gps_points[:, 0], vehicle_gps_points[:, 1])
self.vehicle_gps_with_heading_plot[0].set_alpha(1)
self.vehicle_gps_without_heading_plot.set_alpha(0)
self.gps_heading_plot.set_alpha(0)
else:
self.vehicle_gps_without_heading_plot.set_offsets(gps_map_position_measurement[0:2])
self.vehicle_gps_with_heading_plot[0].set_alpha(0)
self.vehicle_gps_without_heading_plot.set_alpha(1)
self.gps_heading_plot.set_alpha(0)
elif gps_map_heading_measurement is not None:
self.gps_heading_plot.set_data(x=self.saver.x_R[i][0], y=self.saver.x_R[i][1],
dx=np.cos(gps_map_heading_measurement)[0]*3,
dy=np.sin(gps_map_heading_measurement)[0]*3)
self.vehicle_gps_with_heading_plot[0].set_alpha(0)
self.vehicle_gps_without_heading_plot.set_alpha(0)
self.gps_heading_plot.set_alpha(1)
else:
self.vehicle_gps_with_heading_plot[0].set_alpha(0)
self.vehicle_gps_without_heading_plot.set_alpha(0)
self.gps_heading_plot.set_alpha(0)
[docs] def update(self, i: int):
"""
Function to update the plots of the animation / figure.
Parameters
----------
i : int
Current timestep to plot and animate.
"""
i *= self.step_size
if i + self.step_size > len(self.saver.t) or i - self.step_size < 0:
self.ani.stop()
return
# Update info text with timestep and time
self.time_text.set_text(f't = {self.saver.t[i]} s, prediction_step = {i}')
# Get cone data for observed and tracked cones for this timestep and plot accordingly
tracked_cones_data = self.get_tracked_cones_data(i)
observed_cones_data = self.get_observed_cones_data(i)
self.set_cones_data_for_scatter(self.tracked_cones_scatter, tracked_cones_data)
self.set_cones_data_for_scatter(self.observed_cones_scatter, observed_cones_data)
# Get vehicle data for this timestep and plot accordingly
vehicle_points = geometry.rotate_and_move(self.vehicle_points, rotation=self.saver.x_R[i][3],
offset_after_rotation=self.saver.x_R[i][0:2])
self.vehicle_plot[0].set_data(vehicle_points[:, 0], vehicle_points[:, 1])
self.update_gps_measurements(i)
# Delete all mapping lines and replot them for the current timestep (keep tracked and gps vehicle plot)
for line in self.ax.lines[2:]:
line.remove()
for observed_landmark_i, tracked_landmark_i in enumerate(self.saver.mapping[i]):
if tracked_landmark_i == -1:
continue
self.ax.plot(
[observed_cones_data[0][observed_landmark_i, 0], tracked_cones_data[0][tracked_landmark_i, 0]],
[observed_cones_data[0][observed_landmark_i, 1], tracked_cones_data[0][tracked_landmark_i, 1]], 'k-')
if self.saver.individual_compatability[i].ndim == 2 and self.saver.observable_to_global_mapping[i].ndim == 1:
for observed_landmark_i, observed_landmark_compatability in enumerate(self.saver.individual_compatability[i]): # noqa: E501
for observable_landmark_i, compatability in enumerate(observed_landmark_compatability):
if not compatability:
continue
tracked_landmark_i = self.saver.observable_to_global_mapping[i][observable_landmark_i]
self.ax.plot(
[observed_cones_data[0][observed_landmark_i, 0], tracked_cones_data[0][tracked_landmark_i, 0]],
[observed_cones_data[0][observed_landmark_i, 1], tracked_cones_data[0][tracked_landmark_i, 1]],
c='grey', alpha=0.5)
# Delete all uncertainty ellipses
for collection in self.ax.collections:
if isinstance(collection, matplotlib.collections.EllipseCollection):
collection.remove()
# Calculate uncertainty ellipses for tracked landmarks
if np.isfinite(tracked_cones_data[0]).any():
ellipses = np.empty((tracked_cones_data[1].shape[0] + 1, 5))
for tracked_pos_i, pos in enumerate(tracked_cones_data[0]):
start_index = self.saver._kf.x_r_dim + 2 * tracked_pos_i
covariance = self.saver.P[i][start_index:start_index+2, start_index:start_index+2]
ellipses[tracked_pos_i, 0:3] = self.compute_eigenvalues_and_angle_of_covariance(covariance)
ellipses[tracked_pos_i, 3:5] = pos
else:
ellipses = np.empty((1, 5))
# Calculate uncertainty ellipses for vehicle position
ellipses[-1, 0:3] = self.compute_eigenvalues_and_angle_of_covariance(self.saver.P[i][0:2, 0:2])
ellipses[-1, 3:5] = self.saver.x[i][0:2]
# Plot all new uncertainity ellipses
ec = clt.EllipseCollection(ellipses[:, 0], ellipses[:, 1], ellipses[:, 2], offsets=ellipses[:, 3:5],
units='xy', transOffset=self.ax.transData, alpha=0.2)
self.ax.add_collection(ec)
# If specified, delete heatmap image for state noise covariance and replot it
if self.plot_heatmap:
for image in self.heatmap_ax.images:
image.remove()
P = self.saver.P[i]
self.P_heatmap = self.heatmap_ax.imshow(P, cmap='hot', interpolation='nearest')
labels = ['x', 'y', 'v_x', 'psi'] + np.array([[f'x for Landmark {i}', f'y for Landmark {i}']
for i in range(int((P.shape[0] - self.saver._kf.x_r_dim)/2))]
).flatten().tolist()
self.heatmap_ax.set_xticks(np.arange(len(labels)))
self.heatmap_ax.set_yticks(np.arange(len(labels)))
self.heatmap_ax.set_xticklabels(labels)
self.heatmap_ax.set_yticklabels(labels)
# If specified, plot field of view gate for observing and mapping
if self.plot_fov:
local_mapping: LocalMapping = self.saver._kf.sensor('local_mapping')[1]
# keep gps heading arrow
while len(self.ax.patches) > 1:
self.ax.patches[1].remove()
# Plot field of view gate for observing
path = geometry.generate_fov_path(fov_max_distance=local_mapping.fov_max_distance,
fov_min_distance=local_mapping.fov_min_distance,
fov_angle=local_mapping.fov_angle,
offset_after_rotation=self.saver.x[i][0:2],
offset_before_rotation=local_mapping.camera_offset,
rotation=self.saver.x[i][3])
patch = PathPatch(path, facecolor='purple', lw=2, alpha=0.1)
self.ax.add_patch(patch)
# Plot field of view gate for mapping
path = geometry.generate_fov_path(fov_max_distance=local_mapping.fov_max_distance,
fov_angle=local_mapping.fov_angle,
fov_scaling=local_mapping.fov_scaling,
offset_after_rotation=self.saver.x[i][0:2],
offset_before_rotation=local_mapping.camera_offset,
rotation=self.saver.x[i][3])
patch = PathPatch(path, facecolor='orange', lw=2, alpha=0.1)
self.ax.add_patch(patch)
return self.observed_cones_scatter, self.tracked_cones_scatter, self.vehicle_plot[0]
[docs] @staticmethod
def compute_eigenvalues_and_angle_of_covariance(cov: npt.NDArray,
n_std: float = 1.0) -> Tuple[npt.NDArray, npt.NDArray, float]:
"""
Compute the eigenvalues and the angle of the covariance matrix. :cite:p:`Kington2012`
Parameters
----------
cov : npt.NDArray
Covariance matrix.
n_std : float, optional
Number of standard deviations to plot, by default 1.0.
Returns
-------
Tuple[npt.NDArray, npt.NDArray, float]
Weight, Height and angle of the covariance matrix.
"""
def eigsorted(cov):
vals, vecs = np.linalg.eigh(cov)
order = vals.argsort()[::-1]
return vals[order], vecs[:, order]
vals, vecs = eigsorted(cov)
theta = np.degrees(np.arctan2(*vecs[:, 0][::-1]))
w, h = 2 * n_std * np.sqrt(vals)
return w, h, theta
[docs]class Player(animation.FuncAnimation):
"""
Wrapper class to extend animation by interactive menu.
Attributes
----------
fig : matplotlib.figure.Figure
Figure that should be animated.
func : Callable
Function to call to update figure.
min : int
Minimal frame number to be animated.
max : int
Maximal frame number to be animated.
step_size : int
Number of frames to skip during animation per step.
i : int
Current frame number.
forwards : bool
Boolean to specify whether the animation currently runs forward.
runs : bool
Boolean to specify whether the animation currently is animated / runs.
"""
def __init__(self, fig: matplotlib.figure.Figure, func: Callable, map_update_transmation_func: Callable,
fargs: Optional[Tuple] = None, init_func: Callable = None, save_count: int = None, mini: int = 0,
maxi: int = 100, frames=None, pos: Tuple[float, float] = (0.125, 0.92), step_size: int = 1,
map_x_offset: float = 0, map_y_offset: float = 0, map_rotation: float = 0, **kwargs):
"""
Initializes Player with an interactive menu for an animation.
Parameters
----------
fig : matplotlib.figure.Figure
Figure that should be animated.
func : Callable
Function to call to update figure.
map_update_transmation_func: Callable
Function to call to update the parameters for the ground truth map transformation.
frames : Optional[Iterable, int, Generator], optional
No idea yet, by default None.
init_func : Callable, optional
Function to initialize figure, by default None.
args : Optional[Tuple], optional
args to pass to update function, by default None.
save_count : int, optional
Integer how many frames should be presaved, by default None.
mini : int, optional
Minimal frame number to be animated, by default 0.
maxi : int, optional
Maximal frame number to be animated, by default 100.
pos : Tuple[float, float], optional
Position of interactive menu, by default (0.125, 0.92).
step_size : int, optional
Number of frames to skip during animation per step, by default 1.
map_x_offset : float, optional
Offset to move the ground truth map along the x axis., by default 0
map_y_offset : float, optional
Offset to move the ground truth map along the y axis., by default 0
map_rotation : float, optional
Angle in degrees to rotate the ground truth map around after moving it., by default 0
"""
self.i = 0
self.min = mini
self.max = maxi
self.runs = True
self.forwards = True
self.step_size = step_size
self.fig = fig
self.func = func
self.map_update_transmation_func = map_update_transmation_func
self.setup(pos, map_x_offset=map_x_offset, map_y_offset=map_y_offset, map_rotation=map_rotation)
animation.FuncAnimation.__init__(self, self.fig, self.func, frames=self.play(), init_func=init_func,
fargs=fargs, save_count=save_count, **kwargs)
[docs] def play(self):
"""
Play animation.
"""
while self.runs:
self.i = self.i + self.forwards * int(self.step_size_input.val)\
- (not self.forwards) * int(self.step_size_input.val)
if self.i > self.min and self.i < self.max:
yield self.i
else:
self.stop()
yield self.i
[docs] def start(self):
"""
Start animation.
"""
self.runs = True
self.event_source.start()
[docs] def stop(self, event=None):
"""
Stop animation.
"""
self.runs = False
self.event_source.stop()
[docs] def forward(self, event=None):
"""
Play animation forward.
"""
self.forwards = True
self.start()
[docs] def backward(self, event=None):
"""
Play animation backward.
"""
self.forwards = False
self.start()
[docs] def oneforward(self, event=None):
"""
Step one frame forward by setting direction and calling onestep.
"""
self.forwards = True
self.onestep()
[docs] def onebackward(self, event=None):
"""
Step one frame backward by setting direction and calling onestep.
"""
self.forwards = False
self.onestep()
[docs] def onestep(self):
"""
Step one frame forward or backward. Direction depends on previous specification.
"""
if self.i > self.min and self.i < self.max:
self.i = self.i+self.forwards-(not self.forwards)
elif self.i == self.min and self.forwards:
self.i += 1
elif self.i == self.max and not self.forwards:
self.i -= 1
self.func(self.i)
self.fig.canvas.draw_idle()
[docs] def map_x_offset_changed(self, map_x_offset: str):
"""
Callback function to change the x offset map transformation parameter.
Parameters
----------
map_x_offset : float, optional
Offset to move the ground truth map along the x axis.
"""
self.map_update_transmation_func(map_x_offset=float(map_x_offset))
[docs] def map_y_offset_changed(self, map_y_offset: str):
"""
Callback function to change the y offset map transformation parameter.
Parameters
----------
map_y_offset : float, optional
Offset to move the ground truth map along the y axis.
"""
self.map_update_transmation_func(map_y_offset=float(map_y_offset))
[docs] def map_rotation_changed(self, map_rotation: str):
"""
Callback function to change the rotation angle map transformation parameter.
Parameters
----------
map_rotation : float, optional
Angle in degrees to rotate the ground truth map around after moving it.
"""
self.map_update_transmation_func(map_rotation=float(map_rotation))
[docs] def set_label_for_textbox(self, textbox):
label = textbox.ax.get_children()[0]
label.set_position([0.5, 1.35])
label.set_verticalalignment('top')
label.set_horizontalalignment('center')
[docs] def setup(self, pos: Tuple[float, float], map_x_offset: float = 0,
map_y_offset: float = 0, map_rotation: float = 0):
"""
Setup player and interactive menu.
Parameters
----------
pos : Tuple[float, float]
Position of the interactive menu.
map_x_offset : float, optional
Offset to move the ground truth map along the x axis., by default 0
map_y_offset : float, optional
Offset to move the ground truth map along the y axis., by default 0
map_rotation : float, optional
Angle in degrees to rotate the ground truth map around after moving it., by default 0
"""
playerax = self.fig.add_axes([pos[0], pos[1], 0.5, 0.04])
divider = mpl_toolkits.axes_grid1.make_axes_locatable(playerax)
bax = divider.append_axes("right", size="100%", pad=0.05)
sax = divider.append_axes("right", size="100%", pad=0.05)
fax = divider.append_axes("right", size="100%", pad=0.05)
ofax = divider.append_axes("right", size="100%", pad=0.1)
sssax = divider.append_axes("right", size="100%", pad=0.1)
map_x_offset_ax = divider.append_axes("right", size="100%", pad=0.2)
map_y_offset_ax = divider.append_axes("right", size="100%", pad=0.2)
rotation_ax = divider.append_axes("right", size="100%", pad=0.2)
self.button_oneback = matplotlib.widgets.Button(playerax, label=u'$\u29CF$')
self.button_back = matplotlib.widgets.Button(bax, label=u'$\u25C0$')
self.button_stop = matplotlib.widgets.Button(sax, label=u'$\u25A0$')
self.button_forward = matplotlib.widgets.Button(fax, label=u'$\u25B6$')
self.button_oneforward = matplotlib.widgets.Button(ofax, label=u'$\u29D0$')
self.step_size_input = matplotlib.widgets.Slider(sssax, label='StepSize', valmin=1, valmax=10,
valinit=self.step_size, orientation='vertical')
self.map_x_offset_input = matplotlib.widgets.TextBox(map_x_offset_ax, 'MapOffsetX',
textalignment='center', initial=str(map_x_offset))
self.map_y_offset_input = matplotlib.widgets.TextBox(map_y_offset_ax, 'MapOffsetY', textalignment='center',
initial=str(map_y_offset))
self.map_rotation_input = matplotlib.widgets.TextBox(rotation_ax, 'MapRotation', textalignment='center',
initial=str(map_rotation))
# move labels above boxes
self.set_label_for_textbox(self.map_x_offset_input)
self.set_label_for_textbox(self.map_y_offset_input)
self.set_label_for_textbox(self.map_rotation_input)
self.button_oneback.on_clicked(self.onebackward)
self.button_back.on_clicked(self.backward)
self.button_stop.on_clicked(self.stop)
self.button_forward.on_clicked(self.forward)
self.button_oneforward.on_clicked(self.oneforward)
self.map_x_offset_input.on_submit(self.map_x_offset_changed)
self.map_rotation_input.on_submit(self.map_rotation_changed)
self.map_y_offset_input.on_submit(self.map_y_offset_changed)
[docs]def save_tracked_map(filter: ukf.UKF, gps_filter: Optional[ukf.UKF], test_day: str = 'default', track_layout: str = 'default') -> str:
base_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/maps/{test_day}/{track_layout}'
Path(base_path).mkdir(parents=True, exist_ok=True)
gps = {}
if gps_filter is not None and gps_filter.initial_heading_t is not None:
gps_position_sensor: NovatelGPSPosition = gps_filter.sensor('gps_position')[1]
gps_heading_sensor: NovatelGPSHeading = gps_filter.sensor('gps_heading')[1]
if gps_position_sensor is not None and gps_position_sensor._transformer is not None and gps_heading_sensor is not None:
lat, lon = gps_position_sensor._transformer.transform(xx=0, yy=0, errcheck=True, direction='FORWARD')
heading = gps_heading_sensor.hx(sigmas_f=np.zeros((1, 4)), u=None)[0]
gps = {
'lat': lat,
'lon': lon,
'heading': heading[0],
}
map_number = max([-1] + [int(filename.split('map_')[1].split('.json')[0]) for filename in glob.glob(f'{base_path}/map_*.json')]) + 1
perceived_n = filter.landmark_perceived_n.tolist() if type(filter.landmark_perceived_n) == np.ndarray else filter.landmark_perceived_n
data = {
'x': filter.x[filter.x_r_dim::2].tolist(),
'y': filter.x[filter.x_r_dim+1::2].tolist(),
'perceived_n': perceived_n,
'registration': (np.array(filter.landmark_ids) != 2).tolist(),
'faked_id': filter.landmark_ids,
'id': filter.landmark_ids,
'gps': gps
}
with open(f'{base_path}/map_{map_number}.json', 'w') as output:
json.dump(data, output, indent=4)
return f'{base_path}/map_{map_number}.json'
[docs]def load_preloading_map(test_day: str = 'default', track_layout: str = 'default') -> Tuple[Optional[str], Optional[pd.DataFrame]]:
base_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/maps/{test_day}/{track_layout}/'
if not os.path.isdir(base_path):
return None, None
if os.path.isfile(f'{base_path}/manual.json'):
file_path = f'{base_path}/manual.json'
elif os.path.isfile(f'{base_path}/ground_truth.json'):
file_path = f'{base_path}/ground_truth.json'
else:
map_number = max([-1] + [int(filename.split('map_')[1].split('.json')[0]) for filename in glob.glob(f'{base_path}/map_*.json')])
if map_number == -1:
return None, None
file_path = f'{base_path}/map_{map_number}.json'
with open(file_path, 'r') as input:
data = json.load(input)
return file_path, data
[docs]def plot_observed_landmark_vs_observable_landmarks(observed_landmarks, observable_landmarks, mapping,
test_day='default', track_layout='default'):
base_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/maps/{test_day}/{track_layout}'
Path(base_path).mkdir(parents=True, exist_ok=True)
plt.figure()
plt.scatter(observed_landmarks[:, 0], observed_landmarks[:, 1], color=np.choose(observed_landmarks.astype(int)[:, 2], LocalMapping._colors), marker='2', label='observed cones')
plt.scatter(observable_landmarks[:, 0], observable_landmarks[:, 1], color=np.choose(observable_landmarks.astype(int)[:, 2], LocalMapping._colors), marker='^', label='observable cones')
for observed_landmark_i, observable_landmark_i in enumerate(mapping):
if observable_landmark_i == -1:
continue
plt.plot([observed_landmarks[observed_landmark_i][0], observable_landmarks[observable_landmark_i][0]],
[observed_landmarks[observed_landmark_i][1], observable_landmarks[observable_landmark_i][1]],
color='black')
plt.legend()
plt.axis('equal')
plt.savefig(f'{base_path}/mapping.png', dpi=400)