Source code for utils.plot

"""
Implementation of all utility functions regarding the plotting of any data regarding the kalman filter.
"""
import os
import glob
import logging
from pathlib import Path
from typing import Callable, List, Tuple, Optional, Dict

import numpy as np
import numpy.typing as npt
import pandas as pd
import json

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import matplotlib.pyplot as plt
import matplotlib.animation as animation
import matplotlib.transforms as transforms
import matplotlib.collections as clt
import matplotlib.widgets
import matplotlib.figure
from matplotlib.patches import Ellipse
from matplotlib.patches import Patch, PathPatch, FancyArrow
from matplotlib.lines import Line2D
from matplotlib.ticker import MultipleLocator, FormatStrFormatter, AutoMinorLocator
import mpl_toolkits.axes_grid1


from sensors import Sensor, LocalMapping, NovatelGPSHeading, NovatelGPSPosition
import ukf
from utils import geometry, helpers


[docs]def title_generator(saver: helpers.Saver, x_i: List[int] = [0], u_i: List[int] = [], z_i: List[int] = []) -> str: """ Generate title of plot. Uses the names of states, control inputs and measurements to generate plot title. Parameters ---------- saver : helpers.Saver helpers.Saver object containing the data of the kalman filter. x_i : List[int], optional Indices of the states being plotted., by default [0] u_i : List[int], optional Indices of the control inputs being plotted., by default [] z_i : List[int], optional Indices of the measurements being plotted., by default [] Returns ------- str Title of the plot. """ states = ', '.join([saver._kf.x_names[i] for i in x_i]) return f'Plot states {states} against control inputs'
[docs]def save_flow_json(flow_dict: Dict[int, List[str]], test_case: str = 'default'): """ Saves the flow of the kalman filter as json. Parameters ---------- flow_dict : Dict[int, List[str]] Dictionary containing the flow of the kalman filter. test_case : str, optional Name of the subfolder where the plot should be saved to., by default 'default' """ logging.info(msg=f'Saving flow json for test case \'{test_case}\'') Path(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/flow/{test_case}').mkdir(parents=True, exist_ok=True) with open(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/flow/{test_case}/flow.json', 'w') as flow_json_file: json.dump(flow_dict, flow_json_file, indent=4)
[docs]def plot_flow(saver: helpers.Saver, test_case: str = 'default'): """ _summary_ Parameters ---------- saver : helpers.Saver helpers.Saver object containing the data of the KalmanFilter. test_case : str, optional Name of the subfolder where the plot should be saved to., by default 'default' """ flow_dict = {} for prediction_step, executed_steps in enumerate(saver.executed_steps): flow_dict[prediction_step] = [str(executed_step) for executed_step in executed_steps] save_flow_json(flow_dict=flow_dict, test_case=test_case)
[docs]def save_plot(fig: go.Figure, filename: str, test_case: str = 'default', plot_html: bool = True, plot_png: bool = True) -> None: """ Saves the plot. Saves the plot as html and/or png. Parameters ---------- fig : go.Figure Plot to be saved. filename : str Name of the file where the plot should be saved to. test_case : str, optional Name of the subfolder where the plot should be saved to., by default 'default' plot_html : bool, optional Specifies whether the plot should be saved as html., by default True plot_png : bool, optional Specifies whether the plot should be saved as png., by default True """ logging.info(msg=f'Saving \'{filename}\' plots for test case \'{test_case}\'') if plot_html: Path(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/html/{test_case}').mkdir(parents=True, exist_ok=True) fig.write_html(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/html/{test_case}/{filename}.html') if plot_png: Path(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/images/{test_case}').mkdir(parents=True, exist_ok=True) fig.write_image(f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/images/{test_case}/{filename}.png')
[docs]class SensorPlotConfig(): """ Class to specify how to plot the measurements of a sensor. Specify what the sensor name is, that should be used, which measurements to convert with the hx_inverse function and subsequently which converted states to plot. Attributes ---------- sensor_name : str Name of the sensor to be plotted. z_i : List[int] Indices of the measurements to be converted. x_i : List[int] Indices of the converted states to be plotted. """ def __init__(self, sensor_name: str, z_i: List[int], x_i: List[int]) -> None: """ Initialize sensor plot config. ... Parameters ---------- sensor_name : str Name of the sensor to be plotted. z_i : List[int] Indices of the measurements to be converted. x_i : List[int] Indices of the converted states to be plotted. """ self.sensor_name = sensor_name self.z_i = z_i self.x_i = x_i
[docs]def plot_state_measurement_control( saver: helpers.Saver, x_i: List[int], u_i: List[int] = [], sensor_configs: List[SensorPlotConfig] = [], start: int = 0, end: Optional[int] = None, filename: str = 'plot', test_case: str = 'default'): """ Plot some state(s) against some measurement(s) and control input(s). Uses sensor plot configs to plot measurements. Plot the control inputs on a secondary axis. Parameters ---------- saver : helpers.Saver helpers.Saver object containing the data of the KalmanFilter x_i : List[int] Indices of the state(s) to be plotted. u_i : List[int], optional Indices of the control input(s) to be plotted., by default [] sensor_configs : List[SensorPlotConfig], optional Sensor configs of the measurements to be plotted., by default [] start : int, optional Timestamp from where to plot the data., by default 0 end : Optional[int], optional Timestamp until where to plot the data., by default None filename : str, optional Name of the file where the plot should be saved to., by default 'plot' test_case : str, optional Name of the subfolder where the plot should be saved to., by default 'default' """ fig = make_subplots(specs=[[{'secondary_y': True}]]) t = saver.t[start:end] # Plot posterior incl. variances for i in x_i: fig.add_trace(go.Scatter(x=t, y=np.array(saver.x_R_post)[start:end, i], mode='lines+markers', name=f'posterior of x[{i}]', error_y=dict( type='data', # value of error bar given in data coordinates array=np.array(saver.P_R_post)[start:end, i, i], visible=True))) fig.add_trace(go.Scatter(x=t, y=np.array(saver.x_R_prior)[start:end, i], mode='markers', name=f'prior of x[{i}]', error_y=dict( type='data', # value of error bar given in data coordinates array=np.array(saver.P_R_prior)[start:end, i, i], visible=True))) fig.add_trace(go.Scatter(x=t, y=np.array(saver.x_R)[start:end, i], mode='markers', name=f'x[{i}]')) for i in u_i: fig.add_trace(go.Scatter(x=saver.t, y=np.array(saver.u)[:, i], mode='markers', name=f'control input u[{i}]'), secondary_y=True) for sensor_conf in sensor_configs: sensor: Sensor sensor_i: int sensor_i, sensor = saver._kf.sensor(sensor_conf.sensor_name) y = sensor.hx_inverse(z=np.array(saver.sensor_z)[start:end, sensor_i, sensor_conf.z_i], u=np.array(saver.u)) for _x_i in sensor_conf.x_i: fig.add_trace( go.Scatter(x=t, y=y[:, _x_i], mode='lines+markers', name=f'measurements of {sensor.sensor_name}', error_y=dict( type='data', # value of error bar given in data coordinates array=sensor.hx_inverse(z=np.sqrt( np.array(saver.sensor_R)[start:end, sensor_i, sensor_conf.z_i, sensor_conf.z_i]), u=np.array(saver.u))[:, _x_i], visible=True))) fig.update_layout( title=title_generator(saver, x_i), xaxis_title="Time t / s", yaxis_title=', '.join([saver._kf.x_names[i] for i in x_i]), # legend_title="Legend Title", font=dict( family="Courier New, monospace", size=18, color="RebeccaPurple" ) ) save_plot(fig=fig, filename=filename, test_case=test_case)
[docs]def plot_track(saver: helpers.Saver, filename='track', test_case: str = 'default'): """ Plot and save the local estimated trajectory. ... Parameters ---------- saver : helpers.Saver helpers.Saver object containing the estimated data of the KalmanFilter. filename : str, optional Name of the file where the plot should be saved to, by default 'track'. test_case : str, optional Name of the subfolder where the plot should be saved to, by default 'default'. """ # local_mapping: LocalMapping = saver._kf.sensor('local_mapping')[1] fig = go.Figure() fig.add_trace(go.Scatter(x=np.array(saver.x_R)[:, 0], y=np.array(saver.x_R)[:, 1], mode='lines+markers', name='track', hovertext=saver.t, marker=dict( color=list(range(np.array(saver.x_R).shape[0])), colorscale="matter"))) saver._kf.landmark_ids fig.add_trace(go.Scatter(x=saver.landmarks[-1][:, 0], y=saver.landmarks[-1][:, 1], name='Landmarks', mode='markers', marker=dict(color=LocalMapping.get_color_for_cone_ids(np.array(saver._kf.landmark_ids)), size=LocalMapping.get_size_for_cone_ids(np.array(saver._kf.landmark_ids)), symbol='triangle-up'))) fig.update_yaxes(scaleanchor='x', scaleratio=1) save_plot(fig=fig, filename=filename, test_case=test_case)
[docs]def plot_gps_coordinates(gps_coordinates: pd.DataFrame, lat_key: str = 'lat', lon_key: str = 'lon', filename: str = 'gps', test_case: str = 'default'): """ Plot GPS Coordinates. ... Parameters ---------- gps_coordinates : pd.DataFrame Pandas dataframe containing the gps coordinates. lat_key : str, optional Key for the latitudes in the pandas dataframe., by default 'lat' lon_key : str, optional Key for the longitudes in the pandas dataframe., by default 'lon' filename : str, optional Name of the file where the plot should be saved to., by default 'gps' test_case : str, optional Name of the subfolder where the plot should be saved to., by default 'default' """ px.set_mapbox_access_token( 'pk.eyJ1Ijoib2xla2V0dGVybiIsImEiOiJja3kzajI1dzYwMmo3MnFvM3psYWg3YzM5In0.CveZ2yvlXSMo7r3R9ECAqQ') fig = px.line_mapbox(gps_coordinates, lat=lat_key, lon=lon_key, zoom=18, height=1000, # color_discrete_map='identity', # color='color' ) fig.update_layout(mapbox_style="satellite", mapbox_zoom=14, mapbox_center_lat=gps_coordinates[lat_key].iloc[0], margin={'r': 0, 't': 0, 'l': 0, 'b': 0}) save_plot(fig=fig, filename=filename, test_case=test_case, plot_png=False)
[docs]def plot_animation(saver: helpers.Saver, filename: str = 'animation', test_case: str = 'default', map_cones: pd.DataFrame = None, map_x_offset: float = 0, map_y_offset: float = 0, map_rotation: float = 0) -> None: """ Plot and save animation of the vehicle position and landmark position and the respective uncertainty. ... Parameters ---------- saver : helpers.Saver helpers.Saver object containing the estimated data of the KalmanFilter. filename : str, optional Name of the file where the plot should be saved to, by default 'track'. test_case : str, optional Name of the subfolder where the plot should be saved to, by default 'default'. map_cones : pd.DataFrame, optional Pandas dataframe containing the ground truth positions of the landmarks of the map, by default None. map_x_offset : float, optional Offset to move the ground truth map along the x axis., by default 0 map_y_offset : float, optional Offset to move the ground truth map along the y axis., by default 0 map_rotation : float, optional Angle in degrees to rotate the ground truth map around after moving it., by default 0 """ a = KalmanFilterAnimation(saver=saver, save_count=len(saver.x_R)-1, step_size=1, map_cones=map_cones, # noqa: F841 map_x_offset=map_x_offset, map_y_offset=map_y_offset, map_rotation=map_rotation) # FFwriter = animation.FFMpegWriter(fps=10) # a.fig.set_size_inches(20, 10, True) # a.ani.save(f'plots/images/{test_case}/{filename}.mp4', writer=FFwriter, dpi=200) plt.show()
[docs]def plot_standard_plots(saver: helpers.Saver, start: int = 0, end: Optional[int] = None, test_case: str = 'default', map_cones=None, map_x_offset: float = 0, map_y_offset: float = 0, map_rotation: float = 0) -> None: """ Plot all standard plots for the saver of the KalmanFilter. Plot the estimated acceleration, angular rate, velocity, position, local and global trajectory Parameters ---------- saver : helpers.Saver helpers.Saver with the KalmanFilter data. start : int, optional Timestamp from where to plot the data., by default 0 end : Optional[int], optional Timestamp until where to plot the data., by default None test_case : str, optional Name of the test case., by default 'default' map_x_offset : float, optional Offset to move the ground truth map along the x axis., by default 0 map_y_offset : float, optional Offset to move the ground truth map along the y axis., by default 0 map_rotation : float, optional Angle in degrees to rotate the ground truth map around after moving it., by default 0 """ plot_track(saver, filename='track', test_case=test_case) # plot_gps_coordinates(helpers.local_to_gps(np.array(saver.x_R), gamma=saver.x_R[0][5], # lat_origin=49.5104631, lon_origin=7.8769798), # test_case=test_case) # # plot.plot_gps_coordinates(df[['GPS_LAT', 'GPS_LON']].dropna(), lat_key='GPS_LAT', lon_key='GPS_LON', # # filename='gps_measurement', test_case=test_case) plot_state_measurement_control( saver, x_i=[2], start=start, end=end, filename='v_x', test_case=test_case, u_i=[1], sensor_configs=[ SensorPlotConfig(sensor_name='wheelspeed_fl', z_i=[0], x_i=[0]), SensorPlotConfig(sensor_name='wheelspeed_fr', z_i=[0], x_i=[0]), SensorPlotConfig(sensor_name='wheelspeed_rl', z_i=[0], x_i=[0]), SensorPlotConfig(sensor_name='wheelspeed_rr', z_i=[0], x_i=[0]), SensorPlotConfig(sensor_name='gps_position', z_i=[0, 1, 2], x_i=[2])]) plot_state_measurement_control(saver, x_i=[3], start=start, end=end, filename='psi', test_case=test_case, u_i=[2], sensor_configs=[SensorPlotConfig( sensor_name='gps_heading', z_i=[0], x_i=[0])]) plot_state_measurement_control(saver, x_i=[0], start=start, end=end, filename='x_pos', test_case=test_case, sensor_configs=[SensorPlotConfig(sensor_name='gps_position', z_i=[0, 1, 2], x_i=[0])]) plot_state_measurement_control(saver, x_i=[1], start=start, end=end, filename='y_pos', test_case=test_case, sensor_configs=[SensorPlotConfig(sensor_name='gps_position', z_i=[0, 1, 2], x_i=[1])]) plot_flow(saver=saver, test_case=test_case) plot_animation(saver, filename='animation', test_case=test_case, map_cones=map_cones, map_x_offset=map_x_offset, map_y_offset=map_y_offset, map_rotation=map_rotation)
[docs]class KalmanFilterAnimation(object): """An animated scatter plot using matplotlib.animations.FuncAnimation.""" plot_heatmap = False plot_fov = True def __init__(self, saver: helpers.Saver, save_count: int = 1000, step_size: int = 1, map_cones=None, map_x_offset: float = 0, map_y_offset: float = 0, map_rotation: float = 0): """ Initializes kalman filter animation. Parameters ---------- saver : helpers.Saver Saver containing all data of the kalman filter to animate. save_count : int, optional Number of frame to presave, by default 1000. step_size : int, optional Step size of animation, by default 1. map_cones : _type_, optional Pandas Dataframe containing the ground truth position of the landmarks, by default None. map_x_offset : float, optional Offset to move the ground truth map along the x axis., by default 0 map_y_offset : float, optional Offset to move the ground truth map along the y axis., by default 0 map_rotation : float, optional Angle in degrees to rotate the ground truth map around after moving it., by default 0 """ self.saver = saver self.sensor_z = np.array(saver.sensor_z) self.map_x_offset = map_x_offset self.map_y_offset = map_y_offset self.map_rotation = map_rotation self.gps_position_sensor: NovatelGPSPosition self.gps_heading_sensor: NovatelGPSHeading self.gps_position_sensor_index, self.gps_position_sensor = self.saver._kf.sensor('gps_position') self.gps_heading_sensor_index, self.gps_heading_sensor = self.saver._kf.sensor('gps_heading') # size of vehicle l_h, l_v, l_y = 0.5, 2.5, 0.7 l_gps_h, l_gps_v, l_gps_y = 0.5, 2.5, 0.7 self.vehicle_points = np.array([[-l_h, -l_y], [l_v, -l_y], [l_v, l_y], [-l_h, l_y], [-l_h, -l_y]]) self.vehicle_gps_points = np.array([[-l_gps_h, -l_gps_y], [l_gps_v, -l_gps_y], [l_gps_v, l_gps_y], [-l_gps_h, l_gps_y], [-l_gps_h, -l_gps_y]]) if self.plot_heatmap: self.fig, (self.ax, self.heatmap_ax) = plt.subplots(1, 2) else: self.fig, self.ax = plt.subplots() self.raw_map_cones = map_cones self.map_cones = map_cones.copy() self.move_map_cones() self.ax.set_aspect('equal', adjustable='box') self.step_size = step_size # Then setup FuncAnimation. self.ani = Player(self.fig, self.update, step_size=step_size, interval=10, save_count=int((save_count-1)/step_size), init_func=self.setup_plot, maxi=save_count, map_update_transmation_func=self.update_map_transformation, map_x_offset=map_x_offset, map_y_offset=map_y_offset, map_rotation=map_rotation)
[docs] def update_map_transformation(self, map_x_offset: Optional[float] = None, map_y_offset: Optional[float] = None, map_rotation: Optional[float] = None): """ Uupdate all transformation attributes for the ground truth with the. Parameters ---------- map_x_offset : Optional[float], optional Offset to move the ground truth map along the x axis., by default None map_y_offset : Optional[float], optional Offset to move the ground truth map along the y axis., by default None map_rotation : Optional[float], optional Angle in degrees to rotate the ground truth map around after moving it., by default None """ if map_x_offset is not None: self.map_x_offset = map_x_offset if map_y_offset is not None: self.map_y_offset = map_y_offset if map_rotation is not None: self.map_rotation = map_rotation self.move_map_cones() self.replot_map_cones()
[docs] def replot_map_cones(self): """ Replots the ground truth map after changig map transformation parameters. """ if self.map_cones is not None: size = LocalMapping.get_size_for_cone_ids(self.raw_map_cones[:, 2]) colors = LocalMapping.get_color_for_cone_ids(self.raw_map_cones[:, 2]) self.set_cones_data_for_scatter(self.ground_truth_scatter, (self.map_cones[:, 0:2], colors, size))
[docs] def move_map_cones(self): """ Moves and rotates the ground truth map according to the current map transformation parameters. """ if self.map_cones is not None: self.map_cones[:, 0:2] = geometry.rotate_and_move(xy=self.raw_map_cones.copy()[:, 0:2], rotation=np.radians(self.map_rotation), offset_after_rotation=np.array([self.map_x_offset, self.map_y_offset]))
[docs] def setup_plot(self): """ Setup all plots. """ # Setup scatter plot to plot tacked landmarks self.tracked_cones_scatter = self.ax.scatter(np.array([np.nan]), np.array([np.nan]), c=np.array(['blue']), s=np.array([np.nan]), vmin=0, vmax=1, cmap="jet", edgecolor=None, marker="^") # Setup line plot to plot position and heading of the vehicle self.vehicle_plot = self.ax.plot(self.vehicle_points[:, 0], self.vehicle_points[:, 1]) # Setup line plot to plot position and heading of the vehicle according to the novatel gps, # if both measurements available self.vehicle_gps_with_heading_plot = self.ax.plot(self.vehicle_gps_points[:, 0], self.vehicle_gps_points[:, 1], alpha=0, c='red') # Setup scatter plot to plot position of the vehicle according to the novatel gps, # if only the position measurement is available self.vehicle_gps_without_heading_plot = self.ax.scatter(0, 0, alpha=0, marker='x', s=100, c='red') # Setup arrow plot to plot heading of the vehicle according to the novatel gps, # if only the heading measurement is avaiblable self.gps_heading_plot = self.ax.arrow(0, 0, 4, 0, alpha=0, color='red', width=0.1) # Setup scatter plot to plot observed landmarks self.observed_cones_scatter = self.ax.scatter(np.array([np.nan]), np.array([np.nan]), c=np.array(['blue']), s=np.array([np.nan]), vmin=0, vmax=1, cmap="jet", edgecolor=None, marker='2') # Plot ground truth once if specified if self.map_cones is not None: landmark_ids = self.map_cones[:, 2].astype(int) # Setup scatter plot to plot externally generated ground truth self.ground_truth_scatter = self.ax.scatter(self.map_cones[:, 0], self.map_cones[:, 1], edgecolor=None, marker="x", c=LocalMapping.get_color_for_cone_ids(landmark_ids), s=LocalMapping.get_size_for_cone_ids(landmark_ids)) # Setup info text with time_step and time self.time_text = self.ax.text(-5, -3.5, 't = 0 s', fontsize=15) # Setup heatmap to plot the state noise covariance matrix if specified if self.plot_heatmap: self.P_heatmap = self.heatmap_ax.imshow(self.saver.P[0], cmap='hot', interpolation='nearest') labels = ['x', 'y', 'v_x', 'psi'] + \ np.array([[f'x for Landmark {i}', f'y for Landmark {i}'] for i in range(int((self.saver.P[0].shape[0] - self.saver._kf.x_r_dim)/2))] ).flatten().tolist() self.heatmap_ax.set_xticks(np.arange(len(labels))) self.heatmap_ax.set_yticks(np.arange(len(labels))) self.heatmap_ax.set_xticklabels(labels) self.heatmap_ax.set_yticklabels(labels) # Setup Axis max_and_mins = np.array([[min(global_landmarks[:, 0]), max(global_landmarks[:, 0]), min(global_landmarks[:, 1]), max(global_landmarks[:, 1])] for global_landmarks in self.saver.current_global_landmarks if global_landmarks is not None and global_landmarks.any()]) axis_max_mins = [min(min(max_and_mins[:, 0]), -5) - 2, max(max_and_mins[:, 1]) + 2, min(max_and_mins[:, 2]) - 2, max(max_and_mins[:, 3]) + 2] self.ax.axis(axis_max_mins) # Setup Legend legend_elements = [Line2D([0], [0], color='b', marker='^', lw=0, label='Tracked Landmarks'), Line2D([0], [0], marker='2', color='b', lw=0, label='Measured Landmarks relative to new position'), Line2D([0], [0], color='b', marker='x', lw=0, label='"Ground Truth"'), Patch(facecolor='orange', alpha=0.1, edgecolor='blue', label='Predicted FoV Gate'), Patch(facecolor='purple', alpha=0.1, edgecolor='blue', label='FoV Gate'), Patch(facecolor='blue', edgecolor='blue', label='Vehicle'), Patch(facecolor='red', edgecolor='red', label='GPS Position and Heading'), FancyArrow(0, 0, 1, 0, color='red', width=0.3, label='GPS Heading'), Line2D([0], [0], color='red', marker='x', lw=0, label='GPS Position')] self.ax.legend(handles=legend_elements) # Setup Grid self.ax.minorticks_on() self.ax.xaxis.set_major_locator(MultipleLocator(6)) self.ax.xaxis.set_major_formatter(FormatStrFormatter('%d')) self.ax.xaxis.set_minor_locator(AutoMinorLocator(1)) # self.ax.xaxis.set_minor_locator(MultipleLocator(1)) self.ax.yaxis.set_major_locator(MultipleLocator(6)) self.ax.yaxis.set_major_formatter(FormatStrFormatter('%d')) self.ax.yaxis.set_minor_locator(AutoMinorLocator(1)) # self.ax.yaxis.set_minor_locator(MultipleLocator(1)) self.ax.set_axisbelow(True) self.ax.grid(which='major', color='red') self.ax.grid(which='minor', color='grey') return self.observed_cones_scatter, self.tracked_cones_scatter, self.vehicle_plot[0]
[docs] def set_cones_data_for_scatter(self, scatter: clt.PathCollection, cones_data: Tuple[npt.NDArray, npt.NDArray, npt.NDArray]) -> None: """ Set cones data for a specified scatter plot. Sets coordinates of scatter points, marker color and size. Parameters ---------- scatter : clt.PathCollection Scatter object to be modified. cones_data : Tuple[npt.NDArray, npt.NDArray, npt.NDArray] Landmark positions, marker color and marker size. """ scatter.set_offsets(cones_data[0]) scatter.set_facecolor(cones_data[1]) scatter.set_sizes(cones_data[2]) scatter.set_edgecolor('face')
[docs] def get_cones_data(self, cone_coords: npt.NDArray, cone_ids: npt.NDArray) -> Tuple[npt.NDArray, npt.NDArray, npt.NDArray]: """ Get landmark position, color and size. Parameters ---------- cone_coords : npt.NDArray Cartesian oordinates of the cones. cone_ids : npt.NDArray ID of the cones. Returns ------- Tuple[npt.NDArray, npt.NDArray, npt.NDArray] Landmark positions, marker color and marker size. """ size = LocalMapping.get_size_for_cone_ids(cone_ids) colors = LocalMapping.get_color_for_cone_ids(cone_ids) return np.array(cone_coords), np.array(colors), np.array(size)
[docs] def get_tracked_cones_data(self, time_step: int) -> Tuple[npt.NDArray, npt.NDArray, npt.NDArray]: """ Get position, color and size for all tracked landmarks of the specified time step. Parameters ---------- time_step : int Time step to be animated. Returns ------- Tuple[npt.NDArray, npt.NDArray, npt.NDArray] Tracked landmark positions, marker color and marker size. """ landmarks = self.saver.landmarks[time_step] if landmarks is None or not landmarks.any(): return np.array([[np.nan, np.nan]]), np.array(['blue']), np.array([np.nan]) return self.get_cones_data(landmarks, np.array(self.saver._kf.landmark_ids[:len(landmarks)], dtype=int))
[docs] def get_observed_cones_data(self, time_step: int) -> Tuple[npt.NDArray, npt.NDArray, npt.NDArray]: """ Get position, color and size for all observed landmarks of the specified time step. Parameters ---------- time_step : int Time step to be animated. Returns ------- Tuple[npt.NDArray, npt.NDArray, npt.NDArray] Observed landmark positions, marker color and marker size. """ landmarks = self.saver.current_global_landmarks[time_step] if landmarks is None or not landmarks.any(): return np.array([[np.nan, np.nan]]), np.array(['blue']), np.array([np.nan]) return self.get_cones_data(landmarks[:, 0:2], landmarks[:, 2])
[docs] def create_confidence_ellipse(self, pos: npt.NDArray, cov: npt.NDArray, n_std: float = 3.0, facecolor: str = 'none', **kwargs) -> Ellipse: """ Create confidence ellipse to plot position uncertainty. Using https://matplotlib.org/devdocs/gallery/statistics/confidence_ellipse.html. Parameters ---------- pos : npt.NDArray Cartesian coordinates of the position. cov : npt.NDArray Uncertainty / covariance matrix of the positon. n_std : float, optional Number of standard deviations to plot, by default 3.0. facecolor : str, optional Face color of ellipse to plot, by default 'none'. Returns ------- Ellipse Confidence Ellipse to plot. """ pearson = cov[0, 1]/np.sqrt(cov[0, 0] * cov[1, 1]) # Using a special case to obtain the eigenvalues of this # two-dimensionl dataset. ell_radius_x = np.sqrt(1 + pearson) ell_radius_y = np.sqrt(1 - pearson) ellipse = Ellipse((0, 0), width=ell_radius_x * 2, height=ell_radius_y * 2, facecolor=facecolor, **kwargs) # Calculating the stdandard deviation of x from # the squareroot of the variance and multiplying # with the given number of standard deviations. scale_x = np.sqrt(cov[0, 0]) * n_std mean_x = pos[0] # calculating the stdandard deviation of y ... scale_y = np.sqrt(cov[1, 1]) * n_std mean_y = pos[1] transf = transforms.Affine2D() \ .rotate_deg(45) \ .scale(scale_x, scale_y) \ .translate(mean_x, mean_y) ellipse.set_transform(transf + self.ax.transData) return ellipse
[docs] def update_gps_measurements(self, i): gps_world_position_measurement = self.saver.sensor_z[i][self.gps_position_sensor_index] gps_world_heading_measurement = self.saver.sensor_z[i][self.gps_heading_sensor_index] if np.all(np.isfinite(gps_world_heading_measurement)): gps_map_heading_measurement = self.gps_heading_sensor.hx_inverse(z=gps_world_heading_measurement) else: gps_map_heading_measurement = None if np.all(np.isfinite(gps_world_position_measurement)): gps_map_position_measurement = self.gps_position_sensor.hx_inverse(z=gps_world_position_measurement) else: gps_map_position_measurement = None if gps_map_position_measurement is not None: if gps_map_heading_measurement is not None: vehicle_gps_points = geometry.rotate_and_move( xy=self.vehicle_gps_points.copy(), offset_after_rotation=gps_map_position_measurement[0:2].copy(), rotation=gps_map_heading_measurement[0]) self.vehicle_gps_with_heading_plot[0].set_data(vehicle_gps_points[:, 0], vehicle_gps_points[:, 1]) self.vehicle_gps_with_heading_plot[0].set_alpha(1) self.vehicle_gps_without_heading_plot.set_alpha(0) self.gps_heading_plot.set_alpha(0) else: self.vehicle_gps_without_heading_plot.set_offsets(gps_map_position_measurement[0:2]) self.vehicle_gps_with_heading_plot[0].set_alpha(0) self.vehicle_gps_without_heading_plot.set_alpha(1) self.gps_heading_plot.set_alpha(0) elif gps_map_heading_measurement is not None: self.gps_heading_plot.set_data(x=self.saver.x_R[i][0], y=self.saver.x_R[i][1], dx=np.cos(gps_map_heading_measurement)[0]*3, dy=np.sin(gps_map_heading_measurement)[0]*3) self.vehicle_gps_with_heading_plot[0].set_alpha(0) self.vehicle_gps_without_heading_plot.set_alpha(0) self.gps_heading_plot.set_alpha(1) else: self.vehicle_gps_with_heading_plot[0].set_alpha(0) self.vehicle_gps_without_heading_plot.set_alpha(0) self.gps_heading_plot.set_alpha(0)
[docs] def update(self, i: int): """ Function to update the plots of the animation / figure. Parameters ---------- i : int Current timestep to plot and animate. """ i *= self.step_size if i + self.step_size > len(self.saver.t) or i - self.step_size < 0: self.ani.stop() return # Update info text with timestep and time self.time_text.set_text(f't = {self.saver.t[i]} s, prediction_step = {i}') # Get cone data for observed and tracked cones for this timestep and plot accordingly tracked_cones_data = self.get_tracked_cones_data(i) observed_cones_data = self.get_observed_cones_data(i) self.set_cones_data_for_scatter(self.tracked_cones_scatter, tracked_cones_data) self.set_cones_data_for_scatter(self.observed_cones_scatter, observed_cones_data) # Get vehicle data for this timestep and plot accordingly vehicle_points = geometry.rotate_and_move(self.vehicle_points, rotation=self.saver.x_R[i][3], offset_after_rotation=self.saver.x_R[i][0:2]) self.vehicle_plot[0].set_data(vehicle_points[:, 0], vehicle_points[:, 1]) self.update_gps_measurements(i) # Delete all mapping lines and replot them for the current timestep (keep tracked and gps vehicle plot) for line in self.ax.lines[2:]: line.remove() for observed_landmark_i, tracked_landmark_i in enumerate(self.saver.mapping[i]): if tracked_landmark_i == -1: continue self.ax.plot( [observed_cones_data[0][observed_landmark_i, 0], tracked_cones_data[0][tracked_landmark_i, 0]], [observed_cones_data[0][observed_landmark_i, 1], tracked_cones_data[0][tracked_landmark_i, 1]], 'k-') if self.saver.individual_compatability[i].ndim == 2 and self.saver.observable_to_global_mapping[i].ndim == 1: for observed_landmark_i, observed_landmark_compatability in enumerate(self.saver.individual_compatability[i]): # noqa: E501 for observable_landmark_i, compatability in enumerate(observed_landmark_compatability): if not compatability: continue tracked_landmark_i = self.saver.observable_to_global_mapping[i][observable_landmark_i] self.ax.plot( [observed_cones_data[0][observed_landmark_i, 0], tracked_cones_data[0][tracked_landmark_i, 0]], [observed_cones_data[0][observed_landmark_i, 1], tracked_cones_data[0][tracked_landmark_i, 1]], c='grey', alpha=0.5) # Delete all uncertainty ellipses for collection in self.ax.collections: if isinstance(collection, matplotlib.collections.EllipseCollection): collection.remove() # Calculate uncertainty ellipses for tracked landmarks if np.isfinite(tracked_cones_data[0]).any(): ellipses = np.empty((tracked_cones_data[1].shape[0] + 1, 5)) for tracked_pos_i, pos in enumerate(tracked_cones_data[0]): start_index = self.saver._kf.x_r_dim + 2 * tracked_pos_i covariance = self.saver.P[i][start_index:start_index+2, start_index:start_index+2] ellipses[tracked_pos_i, 0:3] = self.compute_eigenvalues_and_angle_of_covariance(covariance) ellipses[tracked_pos_i, 3:5] = pos else: ellipses = np.empty((1, 5)) # Calculate uncertainty ellipses for vehicle position ellipses[-1, 0:3] = self.compute_eigenvalues_and_angle_of_covariance(self.saver.P[i][0:2, 0:2]) ellipses[-1, 3:5] = self.saver.x[i][0:2] # Plot all new uncertainity ellipses ec = clt.EllipseCollection(ellipses[:, 0], ellipses[:, 1], ellipses[:, 2], offsets=ellipses[:, 3:5], units='xy', transOffset=self.ax.transData, alpha=0.2) self.ax.add_collection(ec) # If specified, delete heatmap image for state noise covariance and replot it if self.plot_heatmap: for image in self.heatmap_ax.images: image.remove() P = self.saver.P[i] self.P_heatmap = self.heatmap_ax.imshow(P, cmap='hot', interpolation='nearest') labels = ['x', 'y', 'v_x', 'psi'] + np.array([[f'x for Landmark {i}', f'y for Landmark {i}'] for i in range(int((P.shape[0] - self.saver._kf.x_r_dim)/2))] ).flatten().tolist() self.heatmap_ax.set_xticks(np.arange(len(labels))) self.heatmap_ax.set_yticks(np.arange(len(labels))) self.heatmap_ax.set_xticklabels(labels) self.heatmap_ax.set_yticklabels(labels) # If specified, plot field of view gate for observing and mapping if self.plot_fov: local_mapping: LocalMapping = self.saver._kf.sensor('local_mapping')[1] # keep gps heading arrow while len(self.ax.patches) > 1: self.ax.patches[1].remove() # Plot field of view gate for observing path = geometry.generate_fov_path(fov_max_distance=local_mapping.fov_max_distance, fov_min_distance=local_mapping.fov_min_distance, fov_angle=local_mapping.fov_angle, offset_after_rotation=self.saver.x[i][0:2], offset_before_rotation=local_mapping.camera_offset, rotation=self.saver.x[i][3]) patch = PathPatch(path, facecolor='purple', lw=2, alpha=0.1) self.ax.add_patch(patch) # Plot field of view gate for mapping path = geometry.generate_fov_path(fov_max_distance=local_mapping.fov_max_distance, fov_angle=local_mapping.fov_angle, fov_scaling=local_mapping.fov_scaling, offset_after_rotation=self.saver.x[i][0:2], offset_before_rotation=local_mapping.camera_offset, rotation=self.saver.x[i][3]) patch = PathPatch(path, facecolor='orange', lw=2, alpha=0.1) self.ax.add_patch(patch) return self.observed_cones_scatter, self.tracked_cones_scatter, self.vehicle_plot[0]
[docs] @staticmethod def compute_eigenvalues_and_angle_of_covariance(cov: npt.NDArray, n_std: float = 1.0) -> Tuple[npt.NDArray, npt.NDArray, float]: """ Compute the eigenvalues and the angle of the covariance matrix. :cite:p:`Kington2012` Parameters ---------- cov : npt.NDArray Covariance matrix. n_std : float, optional Number of standard deviations to plot, by default 1.0. Returns ------- Tuple[npt.NDArray, npt.NDArray, float] Weight, Height and angle of the covariance matrix. """ def eigsorted(cov): vals, vecs = np.linalg.eigh(cov) order = vals.argsort()[::-1] return vals[order], vecs[:, order] vals, vecs = eigsorted(cov) theta = np.degrees(np.arctan2(*vecs[:, 0][::-1])) w, h = 2 * n_std * np.sqrt(vals) return w, h, theta
[docs]class Player(animation.FuncAnimation): """ Wrapper class to extend animation by interactive menu. Attributes ---------- fig : matplotlib.figure.Figure Figure that should be animated. func : Callable Function to call to update figure. min : int Minimal frame number to be animated. max : int Maximal frame number to be animated. step_size : int Number of frames to skip during animation per step. i : int Current frame number. forwards : bool Boolean to specify whether the animation currently runs forward. runs : bool Boolean to specify whether the animation currently is animated / runs. """ def __init__(self, fig: matplotlib.figure.Figure, func: Callable, map_update_transmation_func: Callable, fargs: Optional[Tuple] = None, init_func: Callable = None, save_count: int = None, mini: int = 0, maxi: int = 100, frames=None, pos: Tuple[float, float] = (0.125, 0.92), step_size: int = 1, map_x_offset: float = 0, map_y_offset: float = 0, map_rotation: float = 0, **kwargs): """ Initializes Player with an interactive menu for an animation. Parameters ---------- fig : matplotlib.figure.Figure Figure that should be animated. func : Callable Function to call to update figure. map_update_transmation_func: Callable Function to call to update the parameters for the ground truth map transformation. frames : Optional[Iterable, int, Generator], optional No idea yet, by default None. init_func : Callable, optional Function to initialize figure, by default None. args : Optional[Tuple], optional args to pass to update function, by default None. save_count : int, optional Integer how many frames should be presaved, by default None. mini : int, optional Minimal frame number to be animated, by default 0. maxi : int, optional Maximal frame number to be animated, by default 100. pos : Tuple[float, float], optional Position of interactive menu, by default (0.125, 0.92). step_size : int, optional Number of frames to skip during animation per step, by default 1. map_x_offset : float, optional Offset to move the ground truth map along the x axis., by default 0 map_y_offset : float, optional Offset to move the ground truth map along the y axis., by default 0 map_rotation : float, optional Angle in degrees to rotate the ground truth map around after moving it., by default 0 """ self.i = 0 self.min = mini self.max = maxi self.runs = True self.forwards = True self.step_size = step_size self.fig = fig self.func = func self.map_update_transmation_func = map_update_transmation_func self.setup(pos, map_x_offset=map_x_offset, map_y_offset=map_y_offset, map_rotation=map_rotation) animation.FuncAnimation.__init__(self, self.fig, self.func, frames=self.play(), init_func=init_func, fargs=fargs, save_count=save_count, **kwargs)
[docs] def play(self): """ Play animation. """ while self.runs: self.i = self.i + self.forwards * int(self.step_size_input.val)\ - (not self.forwards) * int(self.step_size_input.val) if self.i > self.min and self.i < self.max: yield self.i else: self.stop() yield self.i
[docs] def start(self): """ Start animation. """ self.runs = True self.event_source.start()
[docs] def stop(self, event=None): """ Stop animation. """ self.runs = False self.event_source.stop()
[docs] def forward(self, event=None): """ Play animation forward. """ self.forwards = True self.start()
[docs] def backward(self, event=None): """ Play animation backward. """ self.forwards = False self.start()
[docs] def oneforward(self, event=None): """ Step one frame forward by setting direction and calling onestep. """ self.forwards = True self.onestep()
[docs] def onebackward(self, event=None): """ Step one frame backward by setting direction and calling onestep. """ self.forwards = False self.onestep()
[docs] def onestep(self): """ Step one frame forward or backward. Direction depends on previous specification. """ if self.i > self.min and self.i < self.max: self.i = self.i+self.forwards-(not self.forwards) elif self.i == self.min and self.forwards: self.i += 1 elif self.i == self.max and not self.forwards: self.i -= 1 self.func(self.i) self.fig.canvas.draw_idle()
[docs] def map_x_offset_changed(self, map_x_offset: str): """ Callback function to change the x offset map transformation parameter. Parameters ---------- map_x_offset : float, optional Offset to move the ground truth map along the x axis. """ self.map_update_transmation_func(map_x_offset=float(map_x_offset))
[docs] def map_y_offset_changed(self, map_y_offset: str): """ Callback function to change the y offset map transformation parameter. Parameters ---------- map_y_offset : float, optional Offset to move the ground truth map along the y axis. """ self.map_update_transmation_func(map_y_offset=float(map_y_offset))
[docs] def map_rotation_changed(self, map_rotation: str): """ Callback function to change the rotation angle map transformation parameter. Parameters ---------- map_rotation : float, optional Angle in degrees to rotate the ground truth map around after moving it. """ self.map_update_transmation_func(map_rotation=float(map_rotation))
[docs] def set_label_for_textbox(self, textbox): label = textbox.ax.get_children()[0] label.set_position([0.5, 1.35]) label.set_verticalalignment('top') label.set_horizontalalignment('center')
[docs] def setup(self, pos: Tuple[float, float], map_x_offset: float = 0, map_y_offset: float = 0, map_rotation: float = 0): """ Setup player and interactive menu. Parameters ---------- pos : Tuple[float, float] Position of the interactive menu. map_x_offset : float, optional Offset to move the ground truth map along the x axis., by default 0 map_y_offset : float, optional Offset to move the ground truth map along the y axis., by default 0 map_rotation : float, optional Angle in degrees to rotate the ground truth map around after moving it., by default 0 """ playerax = self.fig.add_axes([pos[0], pos[1], 0.5, 0.04]) divider = mpl_toolkits.axes_grid1.make_axes_locatable(playerax) bax = divider.append_axes("right", size="100%", pad=0.05) sax = divider.append_axes("right", size="100%", pad=0.05) fax = divider.append_axes("right", size="100%", pad=0.05) ofax = divider.append_axes("right", size="100%", pad=0.1) sssax = divider.append_axes("right", size="100%", pad=0.1) map_x_offset_ax = divider.append_axes("right", size="100%", pad=0.2) map_y_offset_ax = divider.append_axes("right", size="100%", pad=0.2) rotation_ax = divider.append_axes("right", size="100%", pad=0.2) self.button_oneback = matplotlib.widgets.Button(playerax, label=u'$\u29CF$') self.button_back = matplotlib.widgets.Button(bax, label=u'$\u25C0$') self.button_stop = matplotlib.widgets.Button(sax, label=u'$\u25A0$') self.button_forward = matplotlib.widgets.Button(fax, label=u'$\u25B6$') self.button_oneforward = matplotlib.widgets.Button(ofax, label=u'$\u29D0$') self.step_size_input = matplotlib.widgets.Slider(sssax, label='StepSize', valmin=1, valmax=10, valinit=self.step_size, orientation='vertical') self.map_x_offset_input = matplotlib.widgets.TextBox(map_x_offset_ax, 'MapOffsetX', textalignment='center', initial=str(map_x_offset)) self.map_y_offset_input = matplotlib.widgets.TextBox(map_y_offset_ax, 'MapOffsetY', textalignment='center', initial=str(map_y_offset)) self.map_rotation_input = matplotlib.widgets.TextBox(rotation_ax, 'MapRotation', textalignment='center', initial=str(map_rotation)) # move labels above boxes self.set_label_for_textbox(self.map_x_offset_input) self.set_label_for_textbox(self.map_y_offset_input) self.set_label_for_textbox(self.map_rotation_input) self.button_oneback.on_clicked(self.onebackward) self.button_back.on_clicked(self.backward) self.button_stop.on_clicked(self.stop) self.button_forward.on_clicked(self.forward) self.button_oneforward.on_clicked(self.oneforward) self.map_x_offset_input.on_submit(self.map_x_offset_changed) self.map_rotation_input.on_submit(self.map_rotation_changed) self.map_y_offset_input.on_submit(self.map_y_offset_changed)
[docs]def save_tracked_map(filter: ukf.UKF, gps_filter: Optional[ukf.UKF], test_day: str = 'default', track_layout: str = 'default') -> str: base_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/maps/{test_day}/{track_layout}' Path(base_path).mkdir(parents=True, exist_ok=True) gps = {} if gps_filter is not None and gps_filter.initial_heading_t is not None: gps_position_sensor: NovatelGPSPosition = gps_filter.sensor('gps_position')[1] gps_heading_sensor: NovatelGPSHeading = gps_filter.sensor('gps_heading')[1] if gps_position_sensor is not None and gps_position_sensor._transformer is not None and gps_heading_sensor is not None: lat, lon = gps_position_sensor._transformer.transform(xx=0, yy=0, errcheck=True, direction='FORWARD') heading = gps_heading_sensor.hx(sigmas_f=np.zeros((1, 4)), u=None)[0] gps = { 'lat': lat, 'lon': lon, 'heading': heading[0], } map_number = max([-1] + [int(filename.split('map_')[1].split('.json')[0]) for filename in glob.glob(f'{base_path}/map_*.json')]) + 1 perceived_n = filter.landmark_perceived_n.tolist() if type(filter.landmark_perceived_n) == np.ndarray else filter.landmark_perceived_n data = { 'x': filter.x[filter.x_r_dim::2].tolist(), 'y': filter.x[filter.x_r_dim+1::2].tolist(), 'perceived_n': perceived_n, 'registration': (np.array(filter.landmark_ids) != 2).tolist(), 'faked_id': filter.landmark_ids, 'id': filter.landmark_ids, 'gps': gps } with open(f'{base_path}/map_{map_number}.json', 'w') as output: json.dump(data, output, indent=4) return f'{base_path}/map_{map_number}.json'
[docs]def load_preloading_map(test_day: str = 'default', track_layout: str = 'default') -> Tuple[Optional[str], Optional[pd.DataFrame]]: base_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/maps/{test_day}/{track_layout}/' if not os.path.isdir(base_path): return None, None if os.path.isfile(f'{base_path}/manual.json'): file_path = f'{base_path}/manual.json' elif os.path.isfile(f'{base_path}/ground_truth.json'): file_path = f'{base_path}/ground_truth.json' else: map_number = max([-1] + [int(filename.split('map_')[1].split('.json')[0]) for filename in glob.glob(f'{base_path}/map_*.json')]) if map_number == -1: return None, None file_path = f'{base_path}/map_{map_number}.json' with open(file_path, 'r') as input: data = json.load(input) return file_path, data
[docs]def plot_observed_landmark_vs_observable_landmarks(observed_landmarks, observable_landmarks, mapping, test_day='default', track_layout='default'): base_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../plots/maps/{test_day}/{track_layout}' Path(base_path).mkdir(parents=True, exist_ok=True) plt.figure() plt.scatter(observed_landmarks[:, 0], observed_landmarks[:, 1], color=np.choose(observed_landmarks.astype(int)[:, 2], LocalMapping._colors), marker='2', label='observed cones') plt.scatter(observable_landmarks[:, 0], observable_landmarks[:, 1], color=np.choose(observable_landmarks.astype(int)[:, 2], LocalMapping._colors), marker='^', label='observable cones') for observed_landmark_i, observable_landmark_i in enumerate(mapping): if observable_landmark_i == -1: continue plt.plot([observed_landmarks[observed_landmark_i][0], observable_landmarks[observable_landmark_i][0]], [observed_landmarks[observed_landmark_i][1], observable_landmarks[observable_landmark_i][1]], color='black') plt.legend() plt.axis('equal') plt.savefig(f'{base_path}/mapping.png', dpi=400)