Source code for cli.visualizer

import os
import sys
from typing import Optional, List, Tuple, Union, Iterable, TYPE_CHECKING
import copy
import json
from fractions import Fraction

from functools import partial
from tqdm import tqdm

import yappi

import numpy as np
import numpy.typing as npt
import pandas as pd
from scipy.spatial.transform import Rotation as R
from cv_bridge import CvBridge
import cv2
import av
import qoi

from seaborn import color_palette

import pyproj

import rosbag
import rospy
import rospkg
import tf_conversions

from std_msgs.msg import ColorRGBA
from geometry_msgs.msg import Point, PoseStamped
from visualization_msgs.msg import Marker, MarkerArray
from sensor_msgs.msg import CompressedImage, Image
from tf2_msgs.msg import TFMessage
from gps_common.msg import GPSFix
from novatel_oem7_msgs.msg import HEADING2

from utilities.msg import (VehiclePose, PredictedMeasurements, ConePosition, ConePositionWithCovariance, MapAlignment,
                           ConeList, ConeListWithCovariance, CenterPoints, TrajectoryPoints, PredictedStates,
                           TrajectoryPathSlices, BoundingBoxes, BoundingBox)

sys.path.append(f'{os.path.join(os.path.dirname(__file__))}/..')

if TYPE_CHECKING:
    from ..transforms import TransformationHandler
else:
    from transforms import TransformationHandler


CONTROL_TIME_WINDOW = 1.5
DISABLE_PROGRESSBAR = False
LEAVE_JOB_PROGRESSBAR = True


yappi.set_clock_type('cpu')


[docs]class Visualizer(): _cone_colors = [ColorRGBA(r=1, g=1, b=0), ColorRGBA(r=0, g=0, b=1), ColorRGBA(r=1, g=0.6, b=0), ColorRGBA(r=1, g=0, b=0)] _cone_colors_cv2 = [(0, 255, 255), (255, 0, 0), (0, 165, 255), (0, 0, 255)] _cone_color_resources = ['small_cone_yellow.dae', 'small_cone_blue.dae', 'small_cone_orange.dae', 'big_cone_orange.dae'] _cone_size = [(0.228, 0.228, 0.325), (0.228, 0.228, 0.325), (0.228, 0.228, 0.325), (0.285, 0.285, 0.505)] # WxDxH camera_offset = 0.590 def __init__(self, input_rosbag: str, output_rosbag_suffix: str, use_header_time: bool, vehicle: str, test_day: str, track_layout: str, transforms: bool, n_stddev: float, recording: Optional[Tuple[str]], generate_detection_image: bool, gps: bool, start: float, duration: Optional[float], end: Optional[float], calibration_matrices_subdirectory: str, image_visualization: bool, local_motion_planning_color_scale: str) -> None: self.input_rosbag_path = input_rosbag self.input_rosbag = rosbag.Bag(input_rosbag) self.input_rosbag_start = self.input_rosbag.get_start_time() self.output_rosbag_path = f'{input_rosbag.rsplit(".")[0]}{output_rosbag_suffix}.{input_rosbag.rsplit(".")[1]}' self.use_header_time = use_header_time self.image_visualization = image_visualization self.last_camera_frame_for_image_visualization = None self.calibration_matrices_subdirectory = calibration_matrices_subdirectory self.generate_detection_image = generate_detection_image self.recording_topics = recording self.containers = {} self.gps = gps self.vehicle = vehicle self.transforms = transforms self.track_layout = track_layout self.test_day = test_day self.n_stddev = n_stddev self.start = self.input_rosbag_start + start self.end = end or self.input_rosbag.get_end_time() if duration: self.end = min(self.start + duration, self.end) self.local_motion_planning_color_scale = local_motion_planning_color_scale self.input_msg_n = 0 self.messages_to_add: List[Tuple[str, object, float]] = [] self.bridge = CvBridge() self.import_camera_calibration_matrices() # Used for image visualization self.is_using_mask_segmentation = False def __getstate__(self): state = self.__dict__.copy() state['input_rosbag'] = None state['bridge'] = None return state
[docs] def import_camera_calibration_matrices(self): rospack = rospkg.RosPack() base_path = f'{rospack.get_path("local_mapping")}/scripts/matrices/{self.calibration_matrices_subdirectory}' left_intrinsic_matrix = np.load(f'{base_path}/intrinsic_matrix_left.npy') right_intrinsic_matrix = np.load(f'{base_path}/intrinsic_matrix_right.npy') left_extrinsic_matrix = np.load(f'{base_path}/extrinsic_matrix_left.npy') right_extrinsic_matrix = np.load(f'{base_path}/extrinsic_matrix_right.npy') self.combined_matrix = { 'left': np.dot(np.c_[left_intrinsic_matrix, np.zeros(3)], np.linalg.inv(left_extrinsic_matrix)), 'right': np.dot(np.c_[right_intrinsic_matrix, np.zeros(3)], np.linalg.inv(right_extrinsic_matrix)) }
[docs] def create_vehicle_pose_entries(self, topic: str, vehicle_pose: VehiclePose, t): if self.use_header_time is True: time = vehicle_pose.header.stamp.to_sec() else: time = t.to_sec() pose = [vehicle_pose.x, vehicle_pose.y, vehicle_pose.v_x, vehicle_pose.psi] uncertainty = np.reshape(vehicle_pose.P, (4, 4)).tolist() entries = [(time, f'{topic}.pose', pose), (time, f'{topic}.uncertainty', uncertainty)] if topic == '/slam/vehicle_pose': entries.append((time, f'{topic}.pose.raw', vehicle_pose)) return entries
[docs] @staticmethod def cone_list_to_array(cone_list: Union[ConeList, List[ConePosition]]) -> List[Tuple[float, float, int, float, bool]]: if isinstance(cone_list, Iterable): cones = cone_list else: cones = cone_list.cones return [(cone.x, cone.y, cone.id, cone.probability, cone.is_lidar if hasattr(cone, 'is_lidar') else None) for cone in cones]
[docs] def create_predicted_measurements_entries(self, topic: str, predicted_measurements: PredictedMeasurements, t): if self.use_header_time is True: time = predicted_measurements.header.stamp.to_sec() else: time = t.to_sec() gps_position = [predicted_measurements.gps_y, predicted_measurements.gps_x, predicted_measurements.gps_speed] gps_heading = predicted_measurements.gps_heading return [(time, f'{topic}.gps_position', gps_position), (time, f'{topic}.gps_heading', gps_heading), (time, f'{topic}.all', predicted_measurements)]
[docs] def create_slam_landmark_entries(self, topic: str, landmarks: ConeListWithCovariance, t): if self.use_header_time is True: time = landmarks.header.stamp.to_sec() else: time = t.to_sec() return [(time, topic, landmarks)]
[docs] def create_local_mapping_entries(self, topic: str, cone_list: ConeList, t): if self.use_header_time is True: time = cone_list.image_header.stamp.to_sec() else: time = t.to_sec() cones = self.cone_list_to_array(cone_list=cone_list) return [(time, topic, cones)]
[docs] def create_centerpoints_entries(self, topic: str, center_points: CenterPoints, t): if self.use_header_time is True: time = center_points.image_header.stamp.to_sec() else: time = t.to_sec() centerpoints = (np.c_[center_points.x, center_points.y, center_points.left_track_width, center_points.right_track_width]).tolist() return [(time, topic, centerpoints)]
[docs] def create_trajectory_entries(self, topic: str, trajectory_points: TrajectoryPoints, t): if self.use_header_time is True: time = trajectory_points.header.stamp.to_sec() else: time = t.to_sec() centerpoints = (np.c_[trajectory_points.x, trajectory_points.y, trajectory_points.v]).tolist() return [(time, topic, centerpoints)]
[docs] def create_control_predicted_states_entries(self, topic: str, predicted_states: PredictedStates, t): if self.use_header_time is True: time = predicted_states.header.stamp.to_sec() else: time = t.to_sec() predicted_states_ = (np.c_[predicted_states.x_pos, predicted_states.y_pos, predicted_states.v_x, predicted_states.psi, predicted_states.steering_wheel_angle]).tolist() return [(time, topic, predicted_states_)]
[docs] def create_slam_map_alignment_entries(self, topic: str, map_alignment: MapAlignment, t: rospy.Time): if self.use_header_time is True: time = map_alignment.header.stamp.to_sec() else: time = t.to_sec() return [(time, topic, map_alignment)]
[docs] def create_local_motion_planning_path_slices_entries(self, topic: str, path_slices: TrajectoryPathSlices, t: rospy.Time ) -> List[Tuple[float, str, TrajectoryPathSlices]]: if self.use_header_time is True: time = path_slices.header.stamp.to_sec() else: time = t.to_sec() return [(time, topic, path_slices)]
[docs] def create_bounding_boxes_entries(self, topic: str, bounding_boxes: BoundingBoxes, t: rospy.Time) -> List[Tuple[float, str, TrajectoryPathSlices]]: return [(bounding_boxes.image_header.stamp.to_sec(), topic, bounding_boxes)]
[docs] def create_gps_entries(self, topic: str, gps: GPSFix, t: rospy.Time) -> List[Tuple[float, str, GPSFix]]: return [(gps.header.stamp.to_sec(), topic, gps)]
[docs] def create_gps_heading_entries(self, topic: str, gps: HEADING2, t: rospy.Time) -> List[Tuple[float, str, HEADING2]]: return [(gps.header.stamp.to_sec(), topic, gps)]
[docs] def import_rosbag(self, pbar: tqdm): data = [] pbar.reset(total=self.input_rosbag.get_message_count()) for topic, msg, t in self.input_rosbag.read_messages(): if t.to_sec() >= self.start: self.input_msg_n += 1 if t.to_sec() > self.end + 2: break if 'vehicle_pose' in topic: data += self.create_vehicle_pose_entries(topic=topic, vehicle_pose=msg, t=t) if 'predicted_measurements' in topic: data += self.create_predicted_measurements_entries(topic=topic, predicted_measurements=msg, t=t) if '/slam/landmarks' == topic: data += self.create_slam_landmark_entries(topic=topic, landmarks=msg, t=t) if '/local_mapping/cone_positions' == topic: data += self.create_local_mapping_entries(topic=topic, cone_list=msg, t=t) if '/filtering/centerpoints' == topic: data += self.create_centerpoints_entries(topic=topic, center_points=msg, t=t) if '/motion_planning/trajectory' == topic: data += self.create_trajectory_entries(topic=topic, trajectory_points=msg, t=t) if '/control/predicted_states' == topic: data += self.create_control_predicted_states_entries(topic=topic, predicted_states=msg, t=t) if '/slam/map_alignment' == topic: data += self.create_slam_map_alignment_entries(topic=topic, map_alignment=msg, t=t) if '/motion_planning/local/path_slices' == topic: data += self.create_local_motion_planning_path_slices_entries(topic=topic, path_slices=msg, t=t) if '/perception/bounding_boxes' == topic: data += self.create_bounding_boxes_entries(topic=topic, bounding_boxes=msg, t=t) if '/gps/gps' == topic: data += self.create_gps_entries(topic=topic, gps=msg, t=t) if '/novatel/oem7/heading2' == topic: data += self.create_gps_heading_entries(topic=topic, gps=msg, t=t) pbar.update(1) pbar.total = pbar.n pbar.refresh() self.rosbag_df = pd.DataFrame(data, columns=['time', 'name', 'data']).sort_values(by='time', ascending=True)
[docs] def save_rosbag(self): self.rosbag_df.html(f'{self.input_rosbag_path.rsplit(".")[0]}.html')
[docs] def get_values_from_rosbag_df(self, name: str, start_time: Optional[float] = None, end_time: Optional[float] = None, duration: Optional[float] = None): if self.rosbag_df.empty: return None sliced = self.rosbag_df[self.rosbag_df['name'] == name] if start_time is not None: sliced = sliced[sliced['time'].ge(start_time)] if start_time is not None and duration is not None: sliced = sliced[sliced['time'].le(start_time + duration)] if end_time is not None and duration is not None: sliced = sliced[sliced['time'].ge(end_time - duration)] if end_time is not None: sliced = sliced[sliced['time'].le(end_time)] if sliced.shape[0] == 0: return None return sliced['data']
[docs] def create_cone_marker(self, cone: Tuple[float, float, int, float, bool], frame_id: str, namespace: str, cone_id: int, time, stretch: bool = False) -> Marker: marker = Marker() marker.type = Marker.CYLINDER marker.action = marker.ADD marker.id = cone_id marker.ns = namespace marker.header.stamp = rospy.Time.from_sec(time) marker.header.frame_id = frame_id marker.pose.orientation.w = 1.0 marker.pose.position.x = cone[0] marker.pose.position.y = cone[1] cone_size = self._cone_size[cone[2]] marker.pose.position.z = cone_size[2] / 2 if stretch is False: marker.scale.x = cone_size[0] marker.scale.y = cone_size[1] marker.scale.z = cone_size[2] else: marker.scale.x = cone_size[0] / 1.5 marker.scale.y = cone_size[1] / 1.5 marker.scale.z = cone_size[2] * 1.5 marker.color = self._cone_colors[cone[2]] marker.color.a = 1 return marker
[docs] def create_cone_annotation_marker(self, cone: Tuple[float, float, int, float], annotation: str, frame_id: str, namespace: str, cone_id: int, time) -> Marker: marker = Marker() marker.type = Marker.TEXT_VIEW_FACING marker.action = marker.ADD marker.id = cone_id marker.ns = namespace marker.text = annotation marker.header.stamp = rospy.Time.from_sec(time) marker.header.frame_id = frame_id marker.pose.orientation.w = 1.0 marker.pose.position.x = cone[0] marker.pose.position.y = cone[1] cone_size = self._cone_size[cone[2]] marker.pose.position.z = cone_size[2] marker.scale.z = 0.3 marker.color = self._cone_colors[cone[2]] marker.color.a = 1 return marker
[docs] @staticmethod def create_mock_marker(frame_id: str, namespace: str, cone_id: int, time) -> Marker: marker = Marker() marker.ns = namespace marker.header.stamp = rospy.Time.from_sec(time) marker.header.frame_id = frame_id marker.id = cone_id marker.action = marker.DELETE return marker
[docs] def create_mock_markers(self, length: int, max_length: int, frame_id: str, namespace: str, time) -> List[Marker]: markers = [] for cone_id in range(length, max_length): markers.append(self.create_mock_marker(frame_id=frame_id, namespace=namespace, cone_id=cone_id, time=time)) return markers
[docs] def create_vehicle_marker(self, pbar: tqdm): pbar.reset(total=1) marker = Marker() marker.header.frame_id = 'vehicle' marker.ns = 'vehicle' # Shape (mesh resource type - 10) marker.type = 10 marker.id = 0 marker.action = 0 # Note: Must set mesh_resource to a valid URL for a model to appear marker.mesh_resource = f'http://localhost:7777/{self.vehicle}.dae' marker.mesh_use_embedded_materials = True marker.frame_locked = True # Scale marker.scale.x = 0.001 marker.scale.y = 0.001 marker.scale.z = 0.001 # Color marker.color.r = 1.0 marker.color.g = 1.0 marker.color.b = 1.0 marker.color.a = 1.0 marker.pose.orientation.y = 0.0 marker.pose.orientation.z = 0.0 marker.pose.orientation.w = 0.707 # Pose if self.vehicle == 'eva': marker.pose.orientation.x = 0.707 marker.pose.position.x = 0.2 marker.pose.position.y = 0 marker.pose.position.z = 0 if self.vehicle in ['emma', 'rennate'] : marker.pose.orientation.x = -0.707 marker.pose.position.x = 0.8 marker.pose.position.y = 0 marker.pose.position.z = 0 pbar.update(1) return [('/vehicle', marker, self.start)]
[docs] def create_ground_truth_marker(self, cone: Tuple[float, float, int]) -> Marker: marker = Marker() marker.ns = 'ground_truth_map' marker.header.frame_id = 'map' marker.action = Marker.ADD marker.type = Marker.MESH_RESOURCE marker.mesh_resource = f'http://localhost:7777/{self._cone_color_resources[cone[2]]}' marker.mesh_use_embedded_materials = True marker.frame_locked = True marker.scale.x = 1 marker.scale.y = 1 marker.scale.z = 1 marker.pose.orientation.x = 0.0 marker.pose.orientation.y = 0.0 marker.pose.orientation.z = 0.0 marker.pose.orientation.w = 1 marker.pose.position.x = cone[0] marker.pose.position.y = cone[1] marker.pose.position.z = 0 return marker
[docs] def create_ground_truth_markers(self, pbar: tqdm): pbar.reset(total=1) if self.test_day is None or self.track_layout is None: tqdm.write('[GroundTruth] Test day and/or track layout is not set. Ground Truth cannot be visualized') path = f'{os.path.abspath(os.path.dirname(__file__))}/../../../' \ f'slam/plots/maps/{self.test_day}/{self.track_layout}/ground_truth.json' if not os.path.exists(path): tqdm.write(f'[GroundTruth] {path} does not exist. Ground Truth cannot be loaded and visualized.') return [] with open(path, 'r') as input: ground_truth_map = json.load(input) markers = MarkerArray() for cone_i, cone in enumerate(zip(ground_truth_map['x'], ground_truth_map['y'], ground_truth_map['id'])): marker = self.create_ground_truth_marker(cone) marker.id = cone_i markers.markers.append(marker) tqdm.write(f'[GroundTruth] Visualized {path} as ground truth map!') pbar.update(1) return [('/visualization/ground_truth', markers, self.start)]
[docs] def create_local_mapping_markers(self, pbar: tqdm): messages_to_add = [] max_length = 0 cone_lists = self.rosbag_df[self.rosbag_df['name'] == '/local_mapping/cone_positions'][['time', 'data']] pbar.reset(total=cone_lists.shape[0]) for (time, cone_list) in cone_lists.itertuples(index=False, name=None): marker_array = MarkerArray() length = len(cone_list) for cone_id, cone in enumerate(cone_list): cone: ConePosition # Use lidar frame if is_lidar field is true frame_id = 'lidar' if (len(cone) >= 5 and cone[4]) else 'camera' marker_array.markers.append(self.create_cone_marker( cone=cone, frame_id=frame_id, time=time, cone_id=cone_id, namespace='local_mapping', stretch=True)) marker_array.markers.append( self.create_cone_annotation_marker(cone=cone, frame_id=frame_id, time=time, cone_id=cone_id, annotation=f'prob.: {cone[3]:.2f}', namespace='local_mapping_probability')) marker_array.markers += self.create_mock_markers(length=length, max_length=max_length, time=time, frame_id='camera', namespace='local_mapping') marker_array.markers += self.create_mock_markers(length=length, max_length=max_length, time=time, frame_id='camera', namespace='local_mapping_probability') max_length = max(length, max_length) messages_to_add.append(('/local_mapping/visualization', marker_array, time)) pbar.update(1) return messages_to_add
[docs] def create_path_marker(self, path: List[Tuple[float, float]], frame_id: str, namespace: str, cone_id: int, time, color: ColorRGBA, previous_path: Optional[List[Point]] = None) -> Marker: marker = Marker(color=color) marker.action = marker.ADD marker.type = marker.LINE_STRIP marker.ns = namespace marker.id = cone_id marker.header.frame_id = frame_id marker.header.stamp = rospy.Time.from_sec(time) if previous_path is not None: marker.points = previous_path + [Point(x=point[0], y=point[1], z=0) for point in path[len(previous_path):]] else: marker.points = [Point(x=point[0], y=point[1], z=0) for point in path] marker.scale.x = 0.2 return marker
[docs] def create_control_markers(self, pbar: tqdm): messages_to_add = [] predicted_states_color = ColorRGBA(r=0, g=1, b=1, a=1) main_slam_color = ColorRGBA(r=1, g=1, b=0, a=1) realtime_slam_color = ColorRGBA(r=0, g=0.5, b=0.5, a=1) predicted_states_list = self.rosbag_df[self.rosbag_df['name'] == '/control/predicted_states'][['time', 'data']] pbar.reset(total=predicted_states_list.shape[0]) for (time, predicted_states) in predicted_states_list.itertuples(index=False, name=None): marker_array = MarkerArray() marker_array.markers.append(self.create_path_marker(path=predicted_states, frame_id='map', namespace='predicted_path', time=time, cone_id=1, color=predicted_states_color)) realtime_future_driven_path = self.get_values_from_rosbag_df(name='/slam/vehicle_pose.pose', start_time=time, duration=CONTROL_TIME_WINDOW) if realtime_future_driven_path is not None: marker_array.markers.append(self.create_path_marker(path=realtime_future_driven_path, frame_id='map', namespace='realtime_future_driven_path', time=time, cone_id=1, color=realtime_slam_color)) main_future_driven_path = self.get_values_from_rosbag_df(name='/slam/vehicle_pose/main.pose', start_time=time, duration=CONTROL_TIME_WINDOW) if main_future_driven_path is not None: marker_array.markers.append(self.create_path_marker(path=main_future_driven_path, frame_id='map', namespace='main_future_driven_path', time=time, cone_id=1, color=main_slam_color)) messages_to_add.append(('/control/visualization', marker_array, time)) pbar.update(1) return messages_to_add
[docs] def create_centerpoints_width_marker(self, centerpoints: Tuple[float, float, float, float], time, namespace: str, color: ColorRGBA) -> List[Marker]: markers = [] time_obj = rospy.Time.from_sec(time) for index, centerpoint in enumerate(centerpoints): marker = Marker() marker.header.frame_id = 'map' marker.header.stamp = time_obj marker.ns = namespace marker.id = index marker.type = marker.CYLINDER marker.action = marker.ADD marker.scale.x = (centerpoint[2] + centerpoint[3]) / 2 marker.scale.y = marker.scale.x marker.scale.z = 0.05 marker.color = color marker.pose.orientation.w = 1.0 marker.pose.position.x = centerpoint[0] marker.pose.position.y = centerpoint[1] marker.pose.position.z = 0 markers.append(marker) return markers
[docs] def create_filtering_markers(self, pbar: tqdm): messages_to_add = [] centerpoints_color = ColorRGBA(r=0, g=1, b=0, a=0.3) centerpoints_width_color = ColorRGBA(r=0, g=1, b=0, a=0.1) max_length = 0 centerpoints_list = self.rosbag_df[self.rosbag_df['name'] == '/filtering/centerpoints'][['time', 'data']] pbar.reset(total=centerpoints_list.shape[0]) for (time, centerpoints) in centerpoints_list.itertuples(index=False, name=None): length = len(centerpoints) marker_array = MarkerArray() marker_array.markers.append(self.create_path_marker(path=centerpoints, frame_id='map', namespace='centerpoints', time=time, cone_id=1, color=centerpoints_color)) marker_array.markers += self.create_centerpoints_width_marker(centerpoints=centerpoints, time=time, color=centerpoints_width_color, namespace='centerpoints_width') marker_array.markers += self.create_mock_markers(length=length, max_length=max_length, time=time, frame_id='map', namespace='centerpoints_width') max_length = max(length, max_length) messages_to_add.append(('/filtering/visualization', marker_array, time)) pbar.update(1) return messages_to_add
[docs] def create_motion_planning_markers(self, pbar: tqdm): messages_to_add = [] trajectory_color = ColorRGBA(r=1, g=0, b=0.5, a=0.3) trajectories = self.rosbag_df[self.rosbag_df['name'] == '/motion_planning/trajectory'][['time', 'data']] pbar.reset(total=trajectories.shape[0]) for (time, trajectory) in trajectories.itertuples(index=False, name=None): marker_array = MarkerArray() marker_array.markers.append(self.create_path_marker(path=trajectory, frame_id='map', namespace='predicted_path', time=time, cone_id=1, color=trajectory_color)) messages_to_add.append(('/motion_planning/visualization', marker_array, time)) pbar.update(1) return messages_to_add
[docs] @staticmethod def compute_eigenvalues_and_angle_of_covariance(cov: npt.NDArray, n_std: float = 1.0 ) -> Tuple[npt.NDArray, npt.NDArray, float]: """ Compute the eigenvalues and the angle of the covariance matrix. https://stackoverflow.com/questions/20126061/creating-a-confidence-ellipses-in-a-sccatterplot-using-matplotlib. Parameters ---------- cov : npt.NDArray Covariance matrix. n_std : float, optional Number of standard deviations to plot, by default 1.0. Returns ------- Tuple[npt.NDArray, npt.NDArray, float] Weight, Height and angle of the covariance matrix. """ def eigsorted(cov): vals, vecs = np.linalg.eigh(cov) order = vals.argsort()[::-1] return vals[order], vecs[:, order] vals, vecs = eigsorted(cov[:2, :2]) theta = np.arctan2(*vecs[:, 0][::-1]) w, h = 2 * n_std * np.sqrt(vals) return w, h, theta
[docs] def create_pose_arrow_marker(self, vehicle_pose: Tuple[float, float, float], time, color: ColorRGBA, namespace: str, marker_id: int, frame_id: str, length: float = 1.) -> Marker: marker = Marker() marker.header.frame_id = frame_id marker.frame_locked = False marker.header.stamp = rospy.Time.from_sec(time) marker.ns = namespace marker.id = marker_id marker.type = marker.ARROW marker.color = color marker.points.append(Point(x=vehicle_pose[0], y=vehicle_pose[1], z=0)) marker.points.append(Point(x=vehicle_pose[0] + length * np.cos(vehicle_pose[3]), y=vehicle_pose[1] + length * np.sin(vehicle_pose[3]), z=0)) marker.scale.x = 0.1 * length marker.scale.y = 0.3 * length marker.scale.z = 0.2 * length return marker
[docs] def create_position_uncertainty_marker(self, position: Tuple[float, float], covariance: npt.NDArray, time, namespace: str, frame_id: str, marker_id: int, color: ColorRGBA, n_std: float = 1) -> Marker: marker = Marker() marker.header.frame_id = frame_id marker.frame_locked = True marker.header.stamp = rospy.Time.from_sec(time) marker.ns = namespace marker.id = marker_id marker.type = marker.CYLINDER marker.color = color w, h, theta = self.compute_eigenvalues_and_angle_of_covariance(cov=covariance, n_std=n_std) marker.scale.z = 0.05 marker.scale.x = w marker.scale.y = h q = tf_conversions.transformations.quaternion_from_euler(0, 0, theta) marker.pose.orientation.x = q[0] marker.pose.orientation.y = q[1] marker.pose.orientation.z = q[2] marker.pose.orientation.w = q[3] marker.pose.position.x = position[0] marker.pose.position.y = position[1] marker.pose.position.z = 0 return marker
[docs] def create_heading_uncertainity_marker(self, vehicle_pose: Tuple[float, float, float, float], time, heading_uncertainty: float, namespace: str, frame_id: str, marker_id: int, color: ColorRGBA, length: float) -> Marker: marker = Marker() marker.header.frame_id = frame_id marker.frame_locked = True marker.header.stamp = rospy.Time.from_sec(time) marker.ns = namespace marker.id = marker_id marker.type = marker.TRIANGLE_LIST marker.color = color stddev = np.sqrt(heading_uncertainty) marker.points.append(Point(x=vehicle_pose[0], y=vehicle_pose[1], z=0)) marker.points.append(Point(x=vehicle_pose[0] + length * np.cos(vehicle_pose[3] + self.n_stddev * stddev), y=vehicle_pose[1] + length * np.sin(vehicle_pose[3] + self.n_stddev * stddev), z=0)) marker.points.append(Point(x=vehicle_pose[0] + length * np.cos(vehicle_pose[3] - self.n_stddev * stddev), y=vehicle_pose[1] + length * np.sin(vehicle_pose[3] - self.n_stddev * stddev), z=0)) marker.pose.orientation.w = 1 return marker
[docs] def create_slam_pose_markers(self, filter: str, base_color: ColorRGBA, pbar: tqdm): messages_to_add = [] previous_path = [] vehicle_pose_history_color = copy.deepcopy(base_color) vehicle_pose_uncertainty_color = copy.deepcopy(base_color) vehicle_pose_uncertainty_color.a = 0.2 vehicle_heading_uncertainty_color = copy.deepcopy(base_color) vehicle_heading_uncertainty_color.a = 0.4 filter_slash = f'/{filter}' if filter != '' else '' filter_underscore = f'_{filter}' if filter != '' else '' vehicle_poses = self.rosbag_df[ self.rosbag_df['name'] == f'/slam/vehicle_pose{filter_slash}.pose'][['time', 'data']] vehicle_pose_uncertainties = self.rosbag_df[ self.rosbag_df['name'] == f'/slam/vehicle_pose{filter_slash}.uncertainty'][['time', 'data']] vehicle_pose_history = [] pbar.reset(total=vehicle_poses.shape[0]) for ((time, vehicle_pose), (_, vehicle_pose_uncertainty)) in zip(vehicle_poses.itertuples(index=False, name=None), vehicle_pose_uncertainties.itertuples(index=False, name=None)): marker_array = MarkerArray() vehicle_pose_history.append(vehicle_pose) marker_array.markers.append(self.create_pose_arrow_marker( vehicle_pose=vehicle_pose, time=time, length=2, color=base_color, marker_id=1, frame_id='map', namespace=f'vehicle_pose{filter_underscore}')) if len(vehicle_pose_history) > 1: marker_array.markers.append(self.create_path_marker( path=vehicle_pose_history, frame_id='map', cone_id=2, time=time, color=vehicle_pose_history_color, namespace=f'vehicle_pose{filter_underscore}', previous_path=previous_path)) previous_path = marker_array.markers[-1].points marker_array.markers.append(self.create_position_uncertainty_marker( position=vehicle_pose[:2], covariance=np.reshape(vehicle_pose_uncertainty, (4, 4)), time=time, marker_id=3, namespace=f'vehicle_pose{filter_underscore}_uncertainty', color=vehicle_pose_uncertainty_color, frame_id='map')) marker_array.markers.append(self.create_heading_uncertainity_marker( vehicle_pose=vehicle_pose, heading_uncertainty=vehicle_pose_uncertainty[3][3], time=time, marker_id=4, namespace=f'vehicle_pose{filter_underscore}_uncertainty', color=vehicle_heading_uncertainty_color, length=2, frame_id='map')) messages_to_add.append((f'/slam{filter_slash}/visualization', marker_array, time)) pbar.update(1) return messages_to_add
[docs] def create_landmark_position_and_uncertainty_markers(self, landmarks: List[ConePositionWithCovariance], base_namespace: str, frame_id: str, time) -> List[Marker]: markers: List[Marker] = [] for landmark_i, landmark in enumerate(landmarks): markers.append(self.create_cone_marker(cone=(landmark.x, landmark.y, landmark.id), frame_id=frame_id, namespace=base_namespace, cone_id=landmark_i, time=time)) color = copy.deepcopy(self._cone_colors[landmark.id]) color.a = 0.2 markers.append(self.create_position_uncertainty_marker( position=(landmark.x, landmark.y), covariance=np.reshape(landmark.covariance, (2, 2)), time=time, namespace=f'{base_namespace}_uncertainty', frame_id=frame_id, color=color, marker_id=landmark_i)) markers.append( self.create_cone_annotation_marker(cone=[landmark.x, landmark.y, landmark.id], frame_id='map', time=time, cone_id=landmark_i, annotation=f'perc. n: {landmark.perceived_n}', namespace=f'{base_namespace}_perceived_n')) return markers
[docs] def create_landmark_mapping_markers(self, frame_id: str, namespace: str, observed_landmarks: List[ConePosition], observable_landmarks: List[ConePosition], mapping, time) -> List[Marker]: marker = Marker() marker.header.stamp = rospy.Time.from_sec(time) marker.frame_locked = True marker.header.frame_id = frame_id marker.type = marker.LINE_LIST marker.action = marker.ADD marker.ns = namespace marker.id = 1 marker.scale.x = 0.1 for observed_landmark, observable_landmark_i in zip(observed_landmarks, mapping): if observable_landmark_i == -1: continue marker.colors.append(self._cone_colors[observed_landmark.id]) marker.colors.append(self._cone_colors[observed_landmark.id]) height = self._cone_size[observed_landmark.id][2] / 2 marker.points.append(Point(x=observed_landmark.x, y=observed_landmark.y, z=height)) marker.points.append(Point(x=observable_landmarks[observable_landmark_i].x, y=observable_landmarks[observable_landmark_i].y, z=height)) return [marker]
[docs] def create_landmark_compatibility_markers(self, frame_id: str, namespace: str, time, color: ColorRGBA, individual_compatibility: npt.NDArray, observed_landmarks: List[ConePosition], observable_landmarks: List[ConePosition]) -> List[Marker]: marker = Marker() marker.header.stamp = rospy.Time.from_sec(time) marker.frame_locked = True marker.header.frame_id = frame_id marker.type = marker.LINE_LIST marker.action = marker.ADD marker.ns = namespace marker.id = 1 marker.scale.x = 0.03 for observed_landmark_i, observable_landmark_i in np.argwhere(individual_compatibility != 0): if observable_landmark_i == -1: continue marker.colors.append(color) marker.colors.append(color) height = self._cone_size[observed_landmarks[observed_landmark_i].id][2] / 2 marker.points.append(Point(x=observed_landmarks[observed_landmark_i].x, y=observed_landmarks[observed_landmark_i].y, z=height)) marker.points.append(Point(x=observable_landmarks[observable_landmark_i].x, y=observable_landmarks[observable_landmark_i].y, z=height)) return [marker]
[docs] def create_predicted_observed_landmarks_markers(self, frame_id: str, namespace: str, time, predicted_observed_landmarks: List[ConePosition]) -> List[Marker]: marker = Marker() marker.header.stamp = rospy.Time.from_sec(time) marker.frame_locked = True marker.header.frame_id = frame_id marker.type = marker.SPHERE_LIST marker.action = marker.ADD marker.ns = namespace marker.id = 1 marker.scale.x = self._cone_size[0][0] marker.scale.y = self._cone_size[0][1] marker.scale.z = self._cone_size[0][2] for predicted_observed_landmark in predicted_observed_landmarks: marker.colors.append(self._cone_colors[predicted_observed_landmark.id]) marker.points.append(Point(x=predicted_observed_landmark.x, y=predicted_observed_landmark.y, z=self._cone_size[predicted_observed_landmark.id][2] / 2)) return [marker]
[docs] def create_observable_landmarks_markers(self, frame_id: str, namespace: str, time, observable_landmarks: List[ConePosition]) -> List[Marker]: marker = Marker() marker.header.stamp = rospy.Time.from_sec(time) marker.frame_locked = True marker.header.frame_id = frame_id marker.type = marker.CUBE_LIST marker.action = marker.ADD marker.ns = namespace marker.id = 1 marker.scale.x = self._cone_size[0][0] marker.scale.y = self._cone_size[0][1] marker.scale.z = self._cone_size[0][2] for observable_landmark in observable_landmarks: marker.colors.append(self._cone_colors[observable_landmark.id]) marker.points.append(Point(x=observable_landmark.x, y=observable_landmark.y, z=self._cone_size[observable_landmark.id][2] / 2)) return [marker]
[docs] def create_observed_landmarks_markers(self, frame_id: str, namespace: str, time, observed_landmarks: List[ConePosition]) -> List[Marker]: marker = Marker() marker.header.stamp = rospy.Time.from_sec(time) marker.frame_locked = True marker.header.frame_id = frame_id marker.type = marker.LINE_LIST marker.action = marker.ADD marker.ns = namespace marker.id = 1 marker.scale.x = self._cone_size[0][0] / 5 for observed_landmark in observed_landmarks: marker.colors.append(self._cone_colors[observed_landmark.id]) marker.colors.append(self._cone_colors[observed_landmark.id]) marker.points.append(Point(x=observed_landmark.x, y=observed_landmark.y, z=self._cone_size[observed_landmark.id][2] * 1.5)) marker.points.append(Point(x=observed_landmark.x, y=observed_landmark.y, z=self._cone_size[observed_landmark.id][2] * -0.5)) return [marker]
[docs] def create_slam_landmark_markers(self, pbar: tqdm): messages_to_add = [] max_length = 0 landmarks_list = self.rosbag_df[self.rosbag_df['name'] == '/slam/landmarks'][['time', 'data']] pbar.reset(total=landmarks_list.shape[0]) for time, landmarks in landmarks_list.itertuples(index=False, name=None): landmarks: ConeListWithCovariance length = len(landmarks.cones) marker_array = MarkerArray() marker_array.markers += self.create_landmark_position_and_uncertainty_markers(landmarks=landmarks.cones, base_namespace='landmarks', frame_id='map', time=time) marker_array.markers += self.create_mock_markers(length=length, max_length=max_length, frame_id='map', namespace='landmarks', time=time) marker_array.markers += self.create_mock_markers(length=length, max_length=max_length, frame_id='map', namespace='landmarks_uncertainty', time=time) marker_array.markers += self.create_mock_markers(length=length, max_length=max_length, frame_id='map', namespace='landmarks_perceived_n', time=time) max_length = max(max_length, length) predicted_measurements = self.rosbag_df[ (self.rosbag_df['name'] == '/slam/predicted_measurements/main.all') & (self.rosbag_df['time'] == time)] if not predicted_measurements.empty: predicted_measurements_msg: PredictedMeasurements = predicted_measurements.iloc[0]['data'] if predicted_measurements_msg.observed_landmarks: marker_array.markers += self.create_landmark_mapping_markers( frame_id='map', namespace='landmark_mapping', time=time, observed_landmarks=predicted_measurements_msg.observed_landmarks, observable_landmarks=predicted_measurements_msg.observable_landmarks, mapping=predicted_measurements_msg.mapping) individual_compatibility = np.reshape(predicted_measurements_msg.individual_compatibility, (len(predicted_measurements_msg.observed_landmarks), -1)) marker_array.markers += self.create_landmark_compatibility_markers( frame_id='map', namespace='landmark_mapping_compatibility', time=time, color=ColorRGBA(r=0.5, g=0.5, b=0.5, a=0.3), observed_landmarks=predicted_measurements_msg.observed_landmarks, observable_landmarks=predicted_measurements_msg.observable_landmarks, individual_compatibility=individual_compatibility) marker_array.markers += self.create_predicted_observed_landmarks_markers( frame_id='map', namespace='predicted_observed_landmark', time=time, predicted_observed_landmarks=predicted_measurements_msg.predicted_observed_landmarks ) marker_array.markers += self.create_observable_landmarks_markers( frame_id='map', namespace='observable_landmarks', time=time, observable_landmarks=predicted_measurements_msg.observable_landmarks ) marker_array.markers += self.create_observed_landmarks_markers( frame_id='map', namespace='observed_landmarks', time=time, observed_landmarks=predicted_measurements_msg.observed_landmarks ) messages_to_add.append(('/slam/landmarks/visualization', marker_array, time)) pbar.update(1) return messages_to_add
[docs] def create_map_alignment_pose(self, frame_id: str, namespace: str, time: rospy.Time, translation: Tuple[float, float], rotation_matrix: Tuple[float, float, float, float] ) -> PoseStamped: pose = PoseStamped() pose.header.frame_id = frame_id pose.header.stamp = rospy.Time.from_sec(time) pose.pose.position.x = translation[0] pose.pose.position.y = translation[1] rotation_matrix_3d = np.zeros((3, 3)) rotation_matrix_3d[:2, :2] = np.reshape(rotation_matrix, (2, 2)) q = R.from_matrix(rotation_matrix_3d).inv().as_quat() pose.pose.orientation.x = q[0] pose.pose.orientation.y = q[1] pose.pose.orientation.z = q[2] pose.pose.orientation.w = q[3] return pose
[docs] def create_map_alignment_arrow(self, frame_id: str, namespace: str, time: rospy.Time, translation: Tuple[float, float], rotation_matrix: Tuple[float, float, float, float] ) -> List[Marker]: marker = Marker() marker.header.frame_id = frame_id marker.frame_locked = False marker.header.stamp = rospy.Time.from_sec(time) marker.ns = namespace marker.id = 1 marker.type = marker.ARROW marker.color = ColorRGBA(r=0.5, g=0.5, b=0.5, a=1) marker.points.append(Point(x=0, y=0, z=0)) second_point = np.reshape(rotation_matrix, (2, 2)) @ translation marker.points.append(Point(x=second_point[0], y=second_point[1], z=0)) length = np.linalg.norm(translation) marker.scale.x = 0.1 * length marker.scale.y = 0.3 * length marker.scale.z = 0.2 * length return [marker]
[docs] def create_slam_map_alignment_markers(self, pbar: tqdm): messages_to_add = [] max_length = 0 map_alignment_list = self.rosbag_df[self.rosbag_df['name'] == '/slam/map_alignment'][['time', 'data']] pbar.reset(total=map_alignment_list.shape[0]) for time, map_alignment in map_alignment_list.itertuples(index=False, name=None): map_alignment: MapAlignment marker_array = MarkerArray() length = len(map_alignment.observable_preloaded_cones) for landmark_i, landmark in enumerate(map_alignment.observable_preloaded_cones): color = copy.deepcopy(self._cone_colors[landmark.id]) color.a = 0.2 marker_array.markers.append(self.create_position_uncertainty_marker( position=(landmark.x, landmark.y), covariance=np.reshape(landmark.covariance, (2, 2)), time=time, frame_id='map', color=color, marker_id=landmark_i, namespace='map_alignment_observable_landmark_uncertainty')) marker_array.markers += self.create_mock_markers( length=length, max_length=max_length, frame_id='map', time=time, namespace='map_alignment_observable_landmarks_uncertainty') max_length = max(max_length, length) marker_array.markers += self.create_landmark_mapping_markers( frame_id='map', namespace='map_alignment_mapping', time=time, observed_landmarks=map_alignment.tracked_observed_cones, observable_landmarks=map_alignment.observable_preloaded_cones, mapping=map_alignment.mapping) individual_compatibility = np.reshape(map_alignment.individual_compatibility, (len(map_alignment.tracked_observed_cones), -1)) marker_array.markers += self.create_landmark_compatibility_markers( frame_id='map', namespace='map_alignment_mapping_compatibility', time=time, color=ColorRGBA(r=0.5, g=0.5, b=0.5, a=0.3), observed_landmarks=map_alignment.tracked_observed_cones, observable_landmarks=map_alignment.observable_preloaded_cones, individual_compatibility=individual_compatibility) marker_array.markers += self.create_observable_landmarks_markers( frame_id='map', namespace='map_alignment_observable_landmarks', time=time, observable_landmarks=map_alignment.observable_preloaded_cones ) marker_array.markers += self.create_observed_landmarks_markers( frame_id='map', namespace='map_alignment_observed_landmarks', time=time, observed_landmarks=map_alignment.tracked_observed_cones ) marker_array.markers += self.create_map_alignment_arrow( frame_id='map', namespace='map_alignment', time=time, translation=map_alignment.translation, rotation_matrix=map_alignment.rotation_matrix ) pose = self.create_map_alignment_pose( frame_id='map', namespace='map_alignment', time=time, translation=map_alignment.translation, rotation_matrix=map_alignment.rotation_matrix ) messages_to_add.append(('/slam/map_alignment/visualization', marker_array, time)) messages_to_add.append(('/slam/map_alignment/transform_pose', pose, time)) pbar.update(1) return messages_to_add
[docs] def create_motion_planning_path_slices_markers(self, path_slices: TrajectoryPathSlices, time: float) -> List[Marker]: color_scale = color_palette("viridis", as_cmap=True) marker = Marker() marker.header.frame_id = 'map' marker.frame_locked = False marker.header.stamp = rospy.Time.from_sec(time) marker.ns = 'local_motion_planning_path_slices' marker.id = 1 marker.type = marker.LINE_LIST marker.scale.x = 0.005 for path_slice in path_slices.path_slices: if self.local_motion_planning_color_scale == 'TOTAL': color = ColorRGBA(*color_scale(path_slice.total_cost)) if self.local_motion_planning_color_scale == 'LENGTH': color = ColorRGBA(*color_scale(path_slice.length_cost)) if self.local_motion_planning_color_scale == 'ASC': color = ColorRGBA(*color_scale(path_slice.average_squared_curvature_cost)) if self.local_motion_planning_color_scale == 'PSC': color = ColorRGBA(*color_scale(path_slice.peak_squared_curvature_cost)) marker.colors.append(color) marker.points.append(Point(x=path_slice.x[0], y=path_slice.y[0], z=0)) for x, y in zip(path_slice.x[1:-1], path_slice.y[1:-1]): marker.colors.append(color) marker.colors.append(color) marker.points.append(Point(x=x, y=y, z=0)) marker.points.append(Point(x=x, y=y, z=0)) marker.colors.append(color) marker.points.append(Point(x=path_slice.x[-1], y=path_slice.y[-1], z=0)) return [marker]
[docs] def create_all_motion_planning_path_slices_markers(self, pbar: tqdm): messages_to_add = [] max_length = 0 path_slices_list = \ self.rosbag_df[self.rosbag_df['name'] == '/motion_planning/local/path_slices'][['time', 'data']] pbar.reset(total=path_slices_list.shape[0]) for time, path_slices in path_slices_list.itertuples(index=False, name=None): path_slices: TrajectoryPathSlices length = len(path_slices.path_slices) marker_array = MarkerArray() marker_array.markers += self.create_motion_planning_path_slices_markers(path_slices=path_slices, time=time) # marker_array.markers += self.create_mock_markers(length=length, max_length=0, frame_id='map', time=time, # namespace='local_motion_planning_path_slices') max_length = max(length, max_length) messages_to_add.append(('/motion_planning/local/visualization', marker_array, time)) pbar.update(1) return messages_to_add
[docs] def create_gps_markers(self, pbar: tqdm): messages_to_add = [] previous_path = [] gps_pos_list = self.rosbag_df[self.rosbag_df['name'] == '/gps/gps'][['time', 'data']] vehicle_pose_list = self.rosbag_df[self.rosbag_df['name'] == '/slam/vehicle_pose.pose.raw'][['time', 'data']] gps_heading_list = self.rosbag_df[self.rosbag_df['name'] == '/novatel/oem7/heading2'][['time', 'data']] # only continue if all values are present if gps_pos_list.empty or gps_heading_list.empty or vehicle_pose_list.empty: return [] # get the gps position message and vehicle pose message for the first heading message so that the gps transformer can be created gps_pos_for_first_heading_index = np.argmin(np.abs(gps_pos_list['time'] - gps_heading_list.iloc[0]['time'])) gps_pos_for_first_heading = gps_pos_list.iloc[gps_pos_for_first_heading_index]['data'] vehicle_pose_for_first_heading_index = np.argmin(np.abs(vehicle_pose_list['time'] - gps_heading_list.iloc[0]['time'])) vehicle_pose_for_first_heading = vehicle_pose_list.iloc[vehicle_pose_for_first_heading_index]['data'] lat0 = gps_pos_for_first_heading.latitude lon0 = gps_pos_for_first_heading.longitude # create gps transformer which will translate gps coordinates to local gps_map coordinates crs = pyproj.CRS.from_proj4(f'+proj=sterea +lat_0={lat0} +lon_0={lon0} +k=1 +x_0={0} +y_0={0} +ellps=GRS80 +towgs84=0,0,0,0,0,0,0 +units=m +no_defs') transformer = pyproj.Transformer.from_crs(crs_from=crs, crs_to='EPSG:4326') # calculate translation and rotation matrix to transform gps_map coordinates to map coordinates heading = gps_heading_list.iloc[0]['data'].heading * np.pi / 180 translation = np.array([vehicle_pose_for_first_heading.x, vehicle_pose_for_first_heading.y]) psi = vehicle_pose_for_first_heading.psi alpha = heading - psi - np.pi / 2 rotation_matrix = np.array([[np.cos(alpha), -np.sin(alpha)], [np.sin(alpha), np.cos(alpha)]]) gps_pos_history = [] gps_heading_for_pos = {} # create a dictionary that maps gps positions to gps headings, so that the heading can be visualized at the correct position for time, gps_heading in gps_heading_list.itertuples(index=False, name=None): gps_pos_time = gps_pos_list.iloc[np.argmin(np.abs(gps_pos_list['time'] - time))]['time'] gps_heading_for_pos[gps_pos_time] = gps_heading # create markers for gps positions, heading and uncertainty pbar.reset(total=gps_pos_list.shape[0]) for time, gps_pos in gps_pos_list.itertuples(index=False, name=None): marker_array = MarkerArray() # transform gps coordinates to map coordinates gps_map_frame = transformer.transform(xx=gps_pos.latitude, yy=gps_pos.longitude, errcheck=True, direction='INVERSE') map_frame_rotated = rotation_matrix @ (gps_map_frame - translation) gps_pos_history.append(map_frame_rotated.tolist()) marker_array.markers.append(self.create_path_marker( path=gps_pos_history, frame_id='map', cone_id=1, time=time, color=ColorRGBA(r=0, g=1, b=0, a=0.3), namespace='gps', previous_path=previous_path)) previous_path = marker_array.markers[-1].points cov = gps_pos.position_covariance cov_rot = rotation_matrix @ np.reshape(cov, (3, 3))[:2, :2] @ rotation_matrix.T marker_array.markers.append(self.create_position_uncertainty_marker( position=map_frame_rotated, covariance=cov_rot, time=time, marker_id=2, namespace='gps_uncertainty', color=ColorRGBA(r=0, g=1, b=0, a=0.2), frame_id='map')) if time in gps_heading_for_pos: # if heading is available, visualize it heading = - gps_heading_for_pos[time].heading * np.pi / 180 + alpha + np.pi / 2 marker_array.markers.append(self.create_pose_arrow_marker( vehicle_pose=(map_frame_rotated[0], map_frame_rotated[1], 0, heading), time=time, length=2, color=ColorRGBA(r=0, g=1, b=0, a=1), marker_id=3, frame_id='map', namespace='gps')) marker_array.markers.append(self.create_heading_uncertainity_marker( vehicle_pose=(map_frame_rotated[0], map_frame_rotated[1], 0, heading), time=time, heading_uncertainty=(gps_heading_for_pos[time].heading_stdev * np.pi / 180) ** 2, marker_id=4, namespace='gps_uncertainty', color=ColorRGBA(r=0, g=1, b=0, a=0.4), length=2, frame_id='map')) messages_to_add.append(('/gps/visualization', marker_array, time)) pbar.update(1) return messages_to_add
[docs] def add_transform_messages(self, transformation_handler: TransformationHandler, time: float): current_time_obj = rospy.Time.from_sec(time) for transform in transformation_handler.transforms: transform.header.stamp = current_time_obj transforms = transformation_handler.transforms + ([transformation_handler.map_rear_axle_transform] if transformation_handler.map_rear_axle_transform is not None else []) return [('/tf', copy.deepcopy(TFMessage(transforms)), time)]
[docs] def create_transforms(self, pbar: tqdm): messages_to_add = [] if self.transforms is False: return [] transformation_handler = TransformationHandler(vehicle_name=self.vehicle) delta_t = (1 / transformation_handler.rate) current_time = self.start - delta_t realtime_vehicle_poses = self.rosbag_df[ self.rosbag_df['name'] == '/slam/vehicle_pose.pose.raw'][['time', 'data']] pbar.reset(total=(self.end - self.start) / delta_t) for (time, vehicle_pose) in realtime_vehicle_poses.itertuples(index=False, name=None): while (current_time < time): messages_to_add += self.add_transform_messages(transformation_handler=transformation_handler, time=current_time) current_time += delta_t pbar.update(1) transformation_handler.vehicle_pose_callback(vehicle_pose=vehicle_pose) while (current_time < self.end): messages_to_add += self.add_transform_messages(transformation_handler=transformation_handler, time=current_time) current_time += delta_t pbar.update(1) return messages_to_add
[docs] def plot_detection_image(self, bounding_boxes: BoundingBoxes, image: npt.NDArray) -> None: """ Visualize detected bounding boxes and highest point of cone on an image along with their associated information. Parameters ---------- bounding_boxes : BoundingBoxes An object containing a list of `BoundingBox` objects, each of which has attributes defining the coordinates of the bounding box (xmin, ymin, xmax, ymax), probability of detection, and cone top coordinates (x_cone_top, y_cone_top). image : npt.NDArray A NumPy array representing the image to be annotated, where the image shape is expected to be in the form (height, width, num_channels) with `num_channels`. """ line_thickness = 1 font_thickness = max(line_thickness - 1, 1) for bounding_box in bounding_boxes.bounding_boxes: # Draw bounding box c1 = (bounding_box.xmin, bounding_box.ymin) c2 = (bounding_box.xmax, bounding_box.ymax) cv2.rectangle(image, c1, c2, self._cone_colors_cv2[bounding_box.id], line_thickness, lineType=cv2.LINE_AA) # Display probability prob_text = f'{bounding_box.probability:.2f}' c3 = (c1[0], c1[1] - 3) cv2.putText(image, prob_text, c3, 0, line_thickness / 3, self._cone_colors_cv2[bounding_box.id], font_thickness, lineType=cv2.LINE_AA) if hasattr(bounding_box, "x_cone_top"): # Draw the cone top cone_top = (bounding_box.x_cone_top, bounding_box.y_cone_top) cv2.circle(image, cone_top, radius=3, color=(255, 0, 0), thickness=-1) # Red color for the cone top else: if not self.is_using_mask_segmentation: print("No cone top coordinates available. Using Legacy mode.") self.is_using_mask_segmentation = True # Draw midpoint midpoint = ((bounding_box.xmin + bounding_box.xmax) // 2, bounding_box.ymin) cv2.circle(image, midpoint, radius=3, color=(0, 255, 0), thickness=-1) # Green color for the midpoint
[docs] @staticmethod def interpolate_multidimensional(x: npt.NDArray, xp: npt.NDArray, fp: npt.NDArray) -> npt.NDArray: """ Interpolates 2d function for given points. Parameters ---------- x : npt.NDArray Points for which the function should be interpolated, shape: (m, ) with m equal to number of to be interpolated points. xp : npt.NDArray X values of the 2d function, shape: (n, ) with n equal to the support points of the function. fp : npt.NDArray Y values of the 2d function, shape: (n, k) with n equal to the support points of the function and with k equal number of dimensions of function. Returns ------- npt.NDArray Interpolated function for the given points, shape: (m, 2) with m equal to number of to be interpolated points and with k equal number of dimensions of function. """ j = np.searchsorted(xp, x) - 1 d = ((x - xp[j]) / (xp[j + 1] - xp[j]))[:, np.newaxis] return (1 - d) * fp[j] + fp[j + 1] * d
[docs] def transform_global_to_local_coordinates_by_vehicle_pose(self, global_coordinates: npt.NDArray, vehicle_pose: npt.NDArray) -> npt.NDArray: angle = -vehicle_pose[2] if global_coordinates.shape[1] == 2: rotation = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]]) vehicle_position = np.array([vehicle_pose[0], vehicle_pose[1]]) camera_offset = np.array([self.camera_offset, 0]) elif global_coordinates.shape[1] == 3: rotation = np.array([[np.cos(angle), -np.sin(angle), 0], [np.sin(angle), np.cos(angle), 0], [0, 0, 1]]) vehicle_position = np.array([vehicle_pose[0], vehicle_pose[1], 0]) camera_offset = np.array([self.camera_offset, 0, 0]) return np.einsum('ab, cb -> ca', rotation, global_coordinates - vehicle_position) - camera_offset
[docs] def project_world_coordinates_to_image_pixels(self, coordinates: npt.NDArray, camera: str): image_pixels = np.einsum('ab, cb -> ca', self.combined_matrix[camera], np.c_[1000 * coordinates, np.ones(coordinates.shape[0])]) return (image_pixels / image_pixels[:, 2, None])[:, :2].astype(np.int32)
[docs] def plot_coordinates(self, image: npt.NDArray, global_coordinates_list: npt.NDArray, vehicle_pose: npt.NDArray, camera: str, colors: Tuple[int], thickness: int = 3): for global_coordinates, color in zip(global_coordinates_list, colors): local_coordinates = self.transform_global_to_local_coordinates_by_vehicle_pose( global_coordinates=global_coordinates, vehicle_pose=vehicle_pose ) angles = np.mod(np.arctan2(local_coordinates[:, 1], local_coordinates[:, 0]), 2 * np.pi) angles[angles > np.pi] -= 2 * np.pi image_pixels = self.project_world_coordinates_to_image_pixels( coordinates=local_coordinates[np.abs(angles) < np.radians(120 / 2)], camera=camera) cv2.polylines(img=image, pts=image_pixels[np.newaxis], isClosed=False, color=color, thickness=thickness, lineType=cv2.LINE_AA)
[docs] def plot_centerpoints_on_visualization_image(self, t: rospy.Time, image: npt.NDArray, vehicle_pose: npt.NDArray, camera: str): centerpoints_list = self.rosbag_df[(self.rosbag_df['name'] == '/filtering/centerpoints') & (self.rosbag_df['time'] <= t.to_sec())][['time', 'data']] if centerpoints_list.empty is True: return centerpoints = np.array(centerpoints_list.iloc[-1]['data']) global_coordinates = np.zeros((centerpoints.shape[0], 3)) global_coordinates[:, 0] = centerpoints[:, 0] global_coordinates[:, 1] = centerpoints[:, 1] self.plot_coordinates(image=image, global_coordinates_list=[global_coordinates], camera=camera, vehicle_pose=vehicle_pose, colors=[(0, 255, 0)])
[docs] def plot_future_driven_path_on_visualization_image(self, t: rospy.Time, image: npt.NDArray, vehicle_pose: npt.NDArray, camera): realtime_future_driven_path = self.get_values_from_rosbag_df(name='/slam/vehicle_pose.pose', start_time=t.to_sec(), duration=CONTROL_TIME_WINDOW) if realtime_future_driven_path.empty is True: return realtime_future_driven_points = np.zeros((realtime_future_driven_path.shape[0], 3)) realtime_future_driven_points[:, :2] = np.stack(realtime_future_driven_path)[:, :2] self.plot_coordinates(image=image, global_coordinates_list=[realtime_future_driven_points], camera=camera, vehicle_pose=vehicle_pose, colors=[(127, 127, 0)])
[docs] def plot_control_informations_on_visualization_image(self, t: rospy.Time, image: npt.NDArray, vehicle_pose: npt.NDArray, camera): predicted_states_list = self.rosbag_df[(self.rosbag_df['name'] == '/control/predicted_states') & (self.rosbag_df['time'] <= t.to_sec())][['time', 'data']] if predicted_states_list.empty is True: return predicted_states = np.array(predicted_states_list.iloc[-1]['data']) time = predicted_states_list.iloc[-1]['time'] global_coordinates = np.zeros((predicted_states.shape[0], 3)) global_coordinates[:, 0] = predicted_states[:, 0] global_coordinates[:, 1] = predicted_states[:, 1] self.plot_coordinates(image=image, global_coordinates_list=[global_coordinates], camera=camera, vehicle_pose=vehicle_pose, colors=[(255, 255, 0)]) self.plot_future_driven_path_on_visualization_image(t=rospy.Time.from_sec(time), image=image, vehicle_pose=vehicle_pose, camera=camera)
[docs] def plot_motion_planning_informations_on_visualization_image(self, t: rospy.Time, image: npt.NDArray, vehicle_pose: npt.NDArray, camera): trajectory_list = self.rosbag_df[(self.rosbag_df['name'] == '/motion_planning/trajectory') & (self.rosbag_df['time'] <= t.to_sec())][['time', 'data']] if trajectory_list.empty is True: return trajectory = np.array(trajectory_list.iloc[-1]['data']) global_coordinates = np.zeros((trajectory.shape[0], 3)) global_coordinates[:, 0] = trajectory[:, 0] global_coordinates[:, 1] = trajectory[:, 1] self.plot_coordinates(image=image, global_coordinates_list=[global_coordinates], camera=camera, vehicle_pose=vehicle_pose, colors=[(127, 0, 255)])
[docs] def plot_landmark_informations_on_visualization_image(self, t: rospy.Time, image: npt.NDArray, vehicle_pose: npt.NDArray, camera): landmark_list = self.rosbag_df[(self.rosbag_df['name'] == '/slam/landmarks') & (self.rosbag_df['time'] <= t.to_sec())][['time', 'data']] if landmark_list.empty is True: return landmarks: ConeListWithCovariance = landmark_list.iloc[-1]['data'] angles = np.linspace(0, 2 * np.pi, 100, endpoint=True) circle = np.zeros((100, 3)) circle[:, 0] = np.sin(angles) circle[:, 1] = np.cos(angles) global_coordinates = np.zeros((len(landmarks.cones), 100, 3)) colors = [] for cone_i, cone in enumerate(landmarks.cones): cone: ConePositionWithCovariance global_coordinates[cone_i] = ((circle) * (self._cone_size[cone.id][0] / 2)) + np.array([cone.x, cone.y, 0]) colors.append(self._cone_colors_cv2[cone.id]) self.plot_coordinates(image=image, global_coordinates_list=global_coordinates, camera=camera, vehicle_pose=vehicle_pose, colors=colors, thickness=1)
[docs] def create_image_visualization(self, outbag: rosbag.Bag, t: rospy.Time, topic: str, image_msg: Union[Image, CompressedImage]): bounding_box_msgs = self.rosbag_df[(self.rosbag_df['name'] == '/perception/bounding_boxes') & (self.rosbag_df['time'] == image_msg.header.stamp.to_sec())] if bounding_box_msgs.empty is True: bounding_boxes = None if (self.last_camera_frame_for_image_visualization is not None and image_msg.header.frame_id != self.last_camera_frame_for_image_visualization): return else: bounding_boxes: BoundingBoxes = bounding_box_msgs['data'].iloc[0] if image_msg.header.frame_id != bounding_boxes.header.frame_id: return if 'compressed' in topic: image = self.bridge.compressed_imgmsg_to_cv2(image_msg, desired_encoding='bgr8') elif 'qoi' in topic: image = qoi.decode(image_msg.data) else: image = self.bridge.imgmsg_to_cv2(image_msg, desired_encoding='bgr8') vehicle_poses = self.rosbag_df[self.rosbag_df['name'] == f'/slam/vehicle_pose.pose'][['time', 'data']] if vehicle_poses.iloc[-1]['time'] < image_msg.header.stamp.to_sec(): return camera = 'right' if 'right' in image_msg.header.frame_id else 'left' vehicle_pose = self.interpolate_multidimensional([image_msg.header.stamp.to_sec()], vehicle_poses['time'].to_numpy(), np.stack(vehicle_poses['data'].to_numpy())[:, (0, 1, 3)])[0] self.plot_centerpoints_on_visualization_image(t=t, image=image, vehicle_pose=vehicle_pose, camera=camera) self.plot_control_informations_on_visualization_image(t=t, image=image, vehicle_pose=vehicle_pose, camera=camera) self.plot_motion_planning_informations_on_visualization_image(t=t, image=image, vehicle_pose=vehicle_pose, camera=camera) self.plot_landmark_informations_on_visualization_image(t=t, image=image, vehicle_pose=vehicle_pose, camera=camera) if bounding_boxes is not None: self.plot_detection_image(bounding_boxes=bounding_boxes, image=image) visualization_image_msg = self.bridge.cv2_to_compressed_imgmsg(image, dst_format='jpeg') visualization_image_msg.header = image_msg.header outbag.write(topic='/visualization/image', msg=visualization_image_msg, t=t) self.add_image_to_video_stream(image_msg=visualization_image_msg, topic='/visualization/image') self.last_camera_frame_for_image_visualization = image_msg.header.frame_id
[docs] def create_detection_image(self, outbag: rosbag.Bag, t: rospy.Time, topic: str, image_msg: Union[Image, CompressedImage]): bounding_box_msgs = self.rosbag_df[(self.rosbag_df['name'] == '/perception/bounding_boxes') & (self.rosbag_df['time'] == image_msg.header.stamp.to_sec())] if bounding_box_msgs.empty is True: return else: bounding_boxes: BoundingBoxes = bounding_box_msgs['data'].iloc[0] if image_msg.header.frame_id != bounding_boxes.header.frame_id: return if 'compressed' in topic: image = self.bridge.compressed_imgmsg_to_cv2(image_msg, desired_encoding='bgr8') elif 'qoi' in topic: image = qoi.decode(image_msg.data) else: image = self.bridge.imgmsg_to_cv2(image_msg, desired_encoding='bgr8') if bounding_boxes is not None: self.plot_detection_image(bounding_boxes=bounding_boxes, image=image) detection_image_msg = self.bridge.cv2_to_compressed_imgmsg(image, dst_format='jpeg') detection_image_msg.header = image_msg.header outbag.write(topic='/perception/detection_image', msg=detection_image_msg, t=t) self.add_image_to_video_stream(image_msg=detection_image_msg, topic='/perception/detection_image')
[docs] def prepare_recordings(self): for recording_topic in self.recording_topics: container = av.open(f'{self.input_rosbag_path.rsplit(".")[0]}{recording_topic.replace("/", "_")}.mp4', mode='w') stream = container.add_stream('h264', rate=25) stream.bit_rate = 3000000 stream.pix_fmt = 'yuv420p' stream.codec_context.time_base = Fraction(1, 1000) self.containers[recording_topic] = { 'container': container, 'stream': stream, 'last_time': self.start, 'pts': 0 }
[docs] def add_image_to_video_stream(self, image_msg: Union[Image, CompressedImage], topic: str): if topic not in self.recording_topics: return container = self.containers[topic]['container'] stream = self.containers[topic]['stream'] last_time = self.containers[topic]['last_time'] time = image_msg.header.stamp.to_sec() delta_pts = int((time - last_time) / stream.codec_context.time_base) if delta_pts <= 0: return if 'Compressed' in image_msg._type: image = self.bridge.compressed_imgmsg_to_cv2(image_msg) else: image = self.bridge.imgmsg_to_cv2(image_msg) stream.width = image.shape[1] stream.height = image.shape[0] frame = av.VideoFrame.from_ndarray(image[:, :, :3], format='bgr24') self.containers[topic]['pts'] += delta_pts frame.pts = self.containers[topic]['pts'] for packet in stream.encode(frame): container.mux(packet) self.containers[topic]['last_time'] = time
[docs] def finish_recordings(self): for recording_topic in self.recording_topics: container = self.containers[recording_topic]['container'] stream = self.containers[recording_topic]['stream'] for packet in stream.encode(): container.mux(packet) container.close()
[docs] def generate_output_bag(self, pbar: tqdm): self.messages_to_add.sort(key=lambda message: message[2]) self.prepare_recordings() messages = (message for message in self.messages_to_add) next_message = next(messages, None) pbar.reset(total=self.input_msg_n + len(self.messages_to_add)) with rosbag.Bag(self.output_rosbag_path, 'w') as outbag: while next_message is not None and next_message[2] + 0.3 < self.start: next_message = next(messages, None) pbar.update(1) for topic, msg, t in self.input_rosbag.read_messages(): t: rospy.Time if t.to_sec() < self.start: continue if t.to_sec() > self.end + 2: break if topic == "/tf" and self.transforms is True: continue while next_message is not None and rospy.Time.from_sec(next_message[2]) < t: outbag.write(next_message[0], next_message[1], rospy.Time.from_sec(next_message[2])) self.add_image_to_video_stream(image_msg=next_message[1], topic=next_message[0]) next_message = next(messages, None) pbar.update(1) if self.image_visualization is True and '/image_rect_color' in topic: self.create_image_visualization(outbag=outbag, t=t, image_msg=msg, topic=topic) if self.generate_detection_image is True and '/image_rect_color' in topic: self.create_detection_image(outbag=outbag, t=t, image_msg=msg, topic=topic) outbag.write(topic, msg, t) self.add_image_to_video_stream(image_msg=msg, topic=topic) pbar.update(1) while next_message is not None and next_message[2] < t.to_sec(): outbag.write(next_message[0], next_message[1], rospy.Time.from_sec(next_message[2])) self.add_image_to_video_stream(image_msg=next_message[1], topic=next_message[0]) next_message = next(messages, None) pbar.update(1) self.finish_recordings()
[docs] def run(self): jobs = [ [self.create_gps_markers, "Creating GPS Markers", None], [self.create_vehicle_marker, "Creating Vehicle Marker", None], [self.create_ground_truth_markers, "Creating Ground Truth Markers", None], [self.create_local_mapping_markers, "Creating Local Mapping Markers", None], [self.create_filtering_markers, "Creating Filtering Markers", None], [self.create_motion_planning_markers, "Creating Motion Planning Markers", None], [self.create_control_markers, "Creating Control Markers", None], [partial(self.create_slam_pose_markers, base_color=ColorRGBA(r=1, g=0, b=0, a=1), filter=''), "Creating Slam Realtime Pose Markers", None], [partial(self.create_slam_pose_markers, base_color=ColorRGBA(r=1, g=0.5, b=0, a=1), filter='main'), "Creating Slam Main Pose Markers", None], [self.create_slam_landmark_markers, "Creating Slam Landmark Markers", None], [self.create_slam_map_alignment_markers, "Creating Slam Map Alignment Markers", None], [self.create_all_motion_planning_path_slices_markers, "Creating All Motion Planning Path Slices Markers", None], [self.create_transforms, "Creating Transforms", None], ] total_func_i = 2 import_pbar = tqdm(total=1, desc="Importing Rosbag", leave=True, disable=DISABLE_PROGRESSBAR, position=1) for job in jobs: job[2] = tqdm(total=1, desc=job[1], leave=True, disable=DISABLE_PROGRESSBAR, position=total_func_i) total_func_i += 1 outbag_pbar = tqdm(total=1, desc="Generating Output Rosbag", leave=True, disable=DISABLE_PROGRESSBAR, position=total_func_i) self.import_rosbag(import_pbar) for func, name, pbar in jobs: self.messages_to_add += func(pbar=pbar) self.generate_output_bag(outbag_pbar)
if __name__ == '__main__': # yappi.start() visualizer = Visualizer(input_rosbag='/workspace/as_ros/rosbags/run_811_migrated_2024-05-20-01-49-57.bag', vehicle='rennate', transforms=False, output_rosbag_suffix='_viz2', start=0, duration=10, end=None, use_header_time=True, gps=False, test_day='kw20', track_layout='run_811', n_stddev=1, image_visualization=False, calibration_matrices_subdirectory='', local_motion_planning_color_scale='TOTAL', generate_detection_image=False, recording=[]) visualizer.run() # yappi.stop() # path = '/workspace/as_ros/rosbags/profiling/callgrind.out_visualizer' # yappi.get_func_stats().save(path, type='callgrind') # os.system(f'gprof2dot -w -s -f callgrind {path} | dot -Tsvg -o {path}.svg')