import os
import sys
from typing import Optional , List , Tuple , Union , Iterable , TYPE_CHECKING
import copy
import json
from fractions import Fraction
from functools import partial
from tqdm import tqdm
import yappi
import numpy as np
import numpy.typing as npt
import pandas as pd
from scipy.spatial.transform import Rotation as R
from cv_bridge import CvBridge
import cv2
import av
import qoi
from seaborn import color_palette
import pyproj
import rosbag
import rospy
import rospkg
import tf_conversions
from std_msgs.msg import ColorRGBA
from geometry_msgs.msg import Point , PoseStamped
from visualization_msgs.msg import Marker , MarkerArray
from sensor_msgs.msg import CompressedImage , Image
from tf2_msgs.msg import TFMessage
from gps_common.msg import GPSFix
from novatel_oem7_msgs.msg import HEADING2
from utilities.msg import ( VehiclePose , PredictedMeasurements , ConePosition , ConePositionWithCovariance , MapAlignment ,
ConeList , ConeListWithCovariance , CenterPoints , TrajectoryPoints , PredictedStates ,
TrajectoryPathSlices , BoundingBoxes , BoundingBox )
sys . path . append ( f ' { os . path . join ( os . path . dirname ( __file__ )) } /..' )
if TYPE_CHECKING :
from ..transforms import TransformationHandler
else :
from transforms import TransformationHandler
CONTROL_TIME_WINDOW = 1.5
DISABLE_PROGRESSBAR = False
LEAVE_JOB_PROGRESSBAR = True
yappi . set_clock_type ( 'cpu' )
[docs] class Visualizer ():
_cone_colors = [ ColorRGBA ( r = 1 , g = 1 , b = 0 ), ColorRGBA ( r = 0 , g = 0 , b = 1 ),
ColorRGBA ( r = 1 , g = 0.6 , b = 0 ), ColorRGBA ( r = 1 , g = 0 , b = 0 )]
_cone_colors_cv2 = [( 0 , 255 , 255 ), ( 255 , 0 , 0 ), ( 0 , 165 , 255 ), ( 0 , 0 , 255 )]
_cone_color_resources = [ 'small_cone_yellow.dae' , 'small_cone_blue.dae' ,
'small_cone_orange.dae' , 'big_cone_orange.dae' ]
_cone_size = [( 0.228 , 0.228 , 0.325 ), ( 0.228 , 0.228 , 0.325 ), ( 0.228 , 0.228 , 0.325 ), ( 0.285 , 0.285 , 0.505 )] # WxDxH
camera_offset = 0.590
def __init__ ( self , input_rosbag : str , output_rosbag_suffix : str , use_header_time : bool ,
vehicle : str , test_day : str , track_layout : str , transforms : bool , n_stddev : float ,
recording : Optional [ Tuple [ str ]], generate_detection_image : bool , gps : bool ,
start : float , duration : Optional [ float ], end : Optional [ float ],
calibration_matrices_subdirectory : str , image_visualization : bool ,
local_motion_planning_color_scale : str ) -> None :
self . input_rosbag_path = input_rosbag
self . input_rosbag = rosbag . Bag ( input_rosbag )
self . input_rosbag_start = self . input_rosbag . get_start_time ()
self . output_rosbag_path = f ' { input_rosbag . rsplit ( "." )[ 0 ] }{ output_rosbag_suffix } . { input_rosbag . rsplit ( "." )[ 1 ] } '
self . use_header_time = use_header_time
self . image_visualization = image_visualization
self . last_camera_frame_for_image_visualization = None
self . calibration_matrices_subdirectory = calibration_matrices_subdirectory
self . generate_detection_image = generate_detection_image
self . recording_topics = recording
self . containers = {}
self . gps = gps
self . vehicle = vehicle
self . transforms = transforms
self . track_layout = track_layout
self . test_day = test_day
self . n_stddev = n_stddev
self . start = self . input_rosbag_start + start
self . end = end or self . input_rosbag . get_end_time ()
if duration :
self . end = min ( self . start + duration , self . end )
self . local_motion_planning_color_scale = local_motion_planning_color_scale
self . input_msg_n = 0
self . messages_to_add : List [ Tuple [ str , object , float ]] = []
self . bridge = CvBridge ()
self . import_camera_calibration_matrices ()
# Used for image visualization
self . is_using_mask_segmentation = False
def __getstate__ ( self ):
state = self . __dict__ . copy ()
state [ 'input_rosbag' ] = None
state [ 'bridge' ] = None
return state
[docs] def import_camera_calibration_matrices ( self ):
rospack = rospkg . RosPack ()
base_path = f ' { rospack . get_path ( "local_mapping" ) } /scripts/matrices/ { self . calibration_matrices_subdirectory } '
left_intrinsic_matrix = np . load ( f ' { base_path } /intrinsic_matrix_left.npy' )
right_intrinsic_matrix = np . load ( f ' { base_path } /intrinsic_matrix_right.npy' )
left_extrinsic_matrix = np . load ( f ' { base_path } /extrinsic_matrix_left.npy' )
right_extrinsic_matrix = np . load ( f ' { base_path } /extrinsic_matrix_right.npy' )
self . combined_matrix = {
'left' : np . dot ( np . c_ [ left_intrinsic_matrix , np . zeros ( 3 )], np . linalg . inv ( left_extrinsic_matrix )),
'right' : np . dot ( np . c_ [ right_intrinsic_matrix , np . zeros ( 3 )], np . linalg . inv ( right_extrinsic_matrix ))
}
[docs] def create_vehicle_pose_entries ( self , topic : str , vehicle_pose : VehiclePose , t ):
if self . use_header_time is True :
time = vehicle_pose . header . stamp . to_sec ()
else :
time = t . to_sec ()
pose = [ vehicle_pose . x , vehicle_pose . y , vehicle_pose . v_x , vehicle_pose . psi ]
uncertainty = np . reshape ( vehicle_pose . P , ( 4 , 4 )) . tolist ()
entries = [( time , f ' { topic } .pose' , pose ), ( time , f ' { topic } .uncertainty' , uncertainty )]
if topic == '/slam/vehicle_pose' :
entries . append (( time , f ' { topic } .pose.raw' , vehicle_pose ))
return entries
[docs] @staticmethod
def cone_list_to_array ( cone_list : Union [ ConeList , List [ ConePosition ]]) -> List [ Tuple [ float , float , int , float , bool ]]:
if isinstance ( cone_list , Iterable ):
cones = cone_list
else :
cones = cone_list . cones
return [( cone . x , cone . y , cone . id , cone . probability , cone . is_lidar if hasattr ( cone , 'is_lidar' ) else None ) for cone in cones ]
[docs] def create_predicted_measurements_entries ( self , topic : str , predicted_measurements : PredictedMeasurements , t ):
if self . use_header_time is True :
time = predicted_measurements . header . stamp . to_sec ()
else :
time = t . to_sec ()
gps_position = [ predicted_measurements . gps_y , predicted_measurements . gps_x , predicted_measurements . gps_speed ]
gps_heading = predicted_measurements . gps_heading
return [( time , f ' { topic } .gps_position' , gps_position ), ( time , f ' { topic } .gps_heading' , gps_heading ),
( time , f ' { topic } .all' , predicted_measurements )]
[docs] def create_slam_landmark_entries ( self , topic : str , landmarks : ConeListWithCovariance , t ):
if self . use_header_time is True :
time = landmarks . header . stamp . to_sec ()
else :
time = t . to_sec ()
return [( time , topic , landmarks )]
[docs] def create_local_mapping_entries ( self , topic : str , cone_list : ConeList , t ):
if self . use_header_time is True :
time = cone_list . image_header . stamp . to_sec ()
else :
time = t . to_sec ()
cones = self . cone_list_to_array ( cone_list = cone_list )
return [( time , topic , cones )]
[docs] def create_centerpoints_entries ( self , topic : str , center_points : CenterPoints , t ):
if self . use_header_time is True :
time = center_points . image_header . stamp . to_sec ()
else :
time = t . to_sec ()
centerpoints = ( np . c_ [ center_points . x , center_points . y ,
center_points . left_track_width ,
center_points . right_track_width ]) . tolist ()
return [( time , topic , centerpoints )]
[docs] def create_trajectory_entries ( self , topic : str , trajectory_points : TrajectoryPoints , t ):
if self . use_header_time is True :
time = trajectory_points . header . stamp . to_sec ()
else :
time = t . to_sec ()
centerpoints = ( np . c_ [ trajectory_points . x , trajectory_points . y , trajectory_points . v ]) . tolist ()
return [( time , topic , centerpoints )]
[docs] def create_control_predicted_states_entries ( self , topic : str , predicted_states : PredictedStates , t ):
if self . use_header_time is True :
time = predicted_states . header . stamp . to_sec ()
else :
time = t . to_sec ()
predicted_states_ = ( np . c_ [ predicted_states . x_pos , predicted_states . y_pos , predicted_states . v_x ,
predicted_states . psi , predicted_states . steering_wheel_angle ]) . tolist ()
return [( time , topic , predicted_states_ )]
[docs] def create_slam_map_alignment_entries ( self , topic : str , map_alignment : MapAlignment , t : rospy . Time ):
if self . use_header_time is True :
time = map_alignment . header . stamp . to_sec ()
else :
time = t . to_sec ()
return [( time , topic , map_alignment )]
[docs] def create_local_motion_planning_path_slices_entries ( self , topic : str , path_slices : TrajectoryPathSlices ,
t : rospy . Time
) -> List [ Tuple [ float , str , TrajectoryPathSlices ]]:
if self . use_header_time is True :
time = path_slices . header . stamp . to_sec ()
else :
time = t . to_sec ()
return [( time , topic , path_slices )]
[docs] def create_bounding_boxes_entries ( self , topic : str , bounding_boxes : BoundingBoxes ,
t : rospy . Time ) -> List [ Tuple [ float , str , TrajectoryPathSlices ]]:
return [( bounding_boxes . image_header . stamp . to_sec (), topic , bounding_boxes )]
[docs] def create_gps_entries ( self , topic : str , gps : GPSFix , t : rospy . Time ) -> List [ Tuple [ float , str , GPSFix ]]:
return [( gps . header . stamp . to_sec (), topic , gps )]
[docs] def create_gps_heading_entries ( self , topic : str , gps : HEADING2 , t : rospy . Time ) -> List [ Tuple [ float , str , HEADING2 ]]:
return [( gps . header . stamp . to_sec (), topic , gps )]
[docs] def import_rosbag ( self , pbar : tqdm ):
data = []
pbar . reset ( total = self . input_rosbag . get_message_count ())
for topic , msg , t in self . input_rosbag . read_messages ():
if t . to_sec () >= self . start :
self . input_msg_n += 1
if t . to_sec () > self . end + 2 :
break
if 'vehicle_pose' in topic :
data += self . create_vehicle_pose_entries ( topic = topic , vehicle_pose = msg , t = t )
if 'predicted_measurements' in topic :
data += self . create_predicted_measurements_entries ( topic = topic , predicted_measurements = msg , t = t )
if '/slam/landmarks' == topic :
data += self . create_slam_landmark_entries ( topic = topic , landmarks = msg , t = t )
if '/local_mapping/cone_positions' == topic :
data += self . create_local_mapping_entries ( topic = topic , cone_list = msg , t = t )
if '/filtering/centerpoints' == topic :
data += self . create_centerpoints_entries ( topic = topic , center_points = msg , t = t )
if '/motion_planning/trajectory' == topic :
data += self . create_trajectory_entries ( topic = topic , trajectory_points = msg , t = t )
if '/control/predicted_states' == topic :
data += self . create_control_predicted_states_entries ( topic = topic , predicted_states = msg , t = t )
if '/slam/map_alignment' == topic :
data += self . create_slam_map_alignment_entries ( topic = topic , map_alignment = msg , t = t )
if '/motion_planning/local/path_slices' == topic :
data += self . create_local_motion_planning_path_slices_entries ( topic = topic , path_slices = msg , t = t )
if '/perception/bounding_boxes' == topic :
data += self . create_bounding_boxes_entries ( topic = topic , bounding_boxes = msg , t = t )
if '/gps/gps' == topic :
data += self . create_gps_entries ( topic = topic , gps = msg , t = t )
if '/novatel/oem7/heading2' == topic :
data += self . create_gps_heading_entries ( topic = topic , gps = msg , t = t )
pbar . update ( 1 )
pbar . total = pbar . n
pbar . refresh ()
self . rosbag_df = pd . DataFrame ( data , columns = [ 'time' , 'name' , 'data' ]) . sort_values ( by = 'time' , ascending = True )
[docs] def save_rosbag ( self ):
self . rosbag_df . html ( f ' { self . input_rosbag_path . rsplit ( "." )[ 0 ] } .html' )
[docs] def get_values_from_rosbag_df ( self , name : str , start_time : Optional [ float ] = None ,
end_time : Optional [ float ] = None , duration : Optional [ float ] = None ):
if self . rosbag_df . empty :
return None
sliced = self . rosbag_df [ self . rosbag_df [ 'name' ] == name ]
if start_time is not None :
sliced = sliced [ sliced [ 'time' ] . ge ( start_time )]
if start_time is not None and duration is not None :
sliced = sliced [ sliced [ 'time' ] . le ( start_time + duration )]
if end_time is not None and duration is not None :
sliced = sliced [ sliced [ 'time' ] . ge ( end_time - duration )]
if end_time is not None :
sliced = sliced [ sliced [ 'time' ] . le ( end_time )]
if sliced . shape [ 0 ] == 0 :
return None
return sliced [ 'data' ]
[docs] def create_cone_marker ( self , cone : Tuple [ float , float , int , float , bool ], frame_id : str ,
namespace : str , cone_id : int , time , stretch : bool = False ) -> Marker :
marker = Marker ()
marker . type = Marker . CYLINDER
marker . action = marker . ADD
marker . id = cone_id
marker . ns = namespace
marker . header . stamp = rospy . Time . from_sec ( time )
marker . header . frame_id = frame_id
marker . pose . orientation . w = 1.0
marker . pose . position . x = cone [ 0 ]
marker . pose . position . y = cone [ 1 ]
cone_size = self . _cone_size [ cone [ 2 ]]
marker . pose . position . z = cone_size [ 2 ] / 2
if stretch is False :
marker . scale . x = cone_size [ 0 ]
marker . scale . y = cone_size [ 1 ]
marker . scale . z = cone_size [ 2 ]
else :
marker . scale . x = cone_size [ 0 ] / 1.5
marker . scale . y = cone_size [ 1 ] / 1.5
marker . scale . z = cone_size [ 2 ] * 1.5
marker . color = self . _cone_colors [ cone [ 2 ]]
marker . color . a = 1
return marker
[docs] def create_cone_annotation_marker ( self , cone : Tuple [ float , float , int , float ],
annotation : str , frame_id : str , namespace : str , cone_id : int , time ) -> Marker :
marker = Marker ()
marker . type = Marker . TEXT_VIEW_FACING
marker . action = marker . ADD
marker . id = cone_id
marker . ns = namespace
marker . text = annotation
marker . header . stamp = rospy . Time . from_sec ( time )
marker . header . frame_id = frame_id
marker . pose . orientation . w = 1.0
marker . pose . position . x = cone [ 0 ]
marker . pose . position . y = cone [ 1 ]
cone_size = self . _cone_size [ cone [ 2 ]]
marker . pose . position . z = cone_size [ 2 ]
marker . scale . z = 0.3
marker . color = self . _cone_colors [ cone [ 2 ]]
marker . color . a = 1
return marker
[docs] @staticmethod
def create_mock_marker ( frame_id : str , namespace : str , cone_id : int , time ) -> Marker :
marker = Marker ()
marker . ns = namespace
marker . header . stamp = rospy . Time . from_sec ( time )
marker . header . frame_id = frame_id
marker . id = cone_id
marker . action = marker . DELETE
return marker
[docs] def create_mock_markers ( self , length : int , max_length : int , frame_id : str , namespace : str , time ) -> List [ Marker ]:
markers = []
for cone_id in range ( length , max_length ):
markers . append ( self . create_mock_marker ( frame_id = frame_id , namespace = namespace , cone_id = cone_id , time = time ))
return markers
[docs] def create_vehicle_marker ( self , pbar : tqdm ):
pbar . reset ( total = 1 )
marker = Marker ()
marker . header . frame_id = 'vehicle'
marker . ns = 'vehicle'
# Shape (mesh resource type - 10)
marker . type = 10
marker . id = 0
marker . action = 0
# Note: Must set mesh_resource to a valid URL for a model to appear
marker . mesh_resource = f 'http://localhost:7777/ { self . vehicle } .dae'
marker . mesh_use_embedded_materials = True
marker . frame_locked = True
# Scale
marker . scale . x = 0.001
marker . scale . y = 0.001
marker . scale . z = 0.001
# Color
marker . color . r = 1.0
marker . color . g = 1.0
marker . color . b = 1.0
marker . color . a = 1.0
marker . pose . orientation . y = 0.0
marker . pose . orientation . z = 0.0
marker . pose . orientation . w = 0.707
# Pose
if self . vehicle == 'eva' :
marker . pose . orientation . x = 0.707
marker . pose . position . x = 0.2
marker . pose . position . y = 0
marker . pose . position . z = 0
if self . vehicle in [ 'emma' , 'rennate' ] :
marker . pose . orientation . x = - 0.707
marker . pose . position . x = 0.8
marker . pose . position . y = 0
marker . pose . position . z = 0
pbar . update ( 1 )
return [( '/vehicle' , marker , self . start )]
[docs] def create_ground_truth_marker ( self , cone : Tuple [ float , float , int ]) -> Marker :
marker = Marker ()
marker . ns = 'ground_truth_map'
marker . header . frame_id = 'map'
marker . action = Marker . ADD
marker . type = Marker . MESH_RESOURCE
marker . mesh_resource = f 'http://localhost:7777/ { self . _cone_color_resources [ cone [ 2 ]] } '
marker . mesh_use_embedded_materials = True
marker . frame_locked = True
marker . scale . x = 1
marker . scale . y = 1
marker . scale . z = 1
marker . pose . orientation . x = 0.0
marker . pose . orientation . y = 0.0
marker . pose . orientation . z = 0.0
marker . pose . orientation . w = 1
marker . pose . position . x = cone [ 0 ]
marker . pose . position . y = cone [ 1 ]
marker . pose . position . z = 0
return marker
[docs] def create_ground_truth_markers ( self , pbar : tqdm ):
pbar . reset ( total = 1 )
if self . test_day is None or self . track_layout is None :
tqdm . write ( '[GroundTruth] Test day and/or track layout is not set. Ground Truth cannot be visualized' )
path = f ' { os . path . abspath ( os . path . dirname ( __file__ )) } /../../../' \
f 'slam/plots/maps/ { self . test_day } / { self . track_layout } /ground_truth.json'
if not os . path . exists ( path ):
tqdm . write ( f '[GroundTruth] { path } does not exist. Ground Truth cannot be loaded and visualized.' )
return []
with open ( path , 'r' ) as input :
ground_truth_map = json . load ( input )
markers = MarkerArray ()
for cone_i , cone in enumerate ( zip ( ground_truth_map [ 'x' ], ground_truth_map [ 'y' ], ground_truth_map [ 'id' ])):
marker = self . create_ground_truth_marker ( cone )
marker . id = cone_i
markers . markers . append ( marker )
tqdm . write ( f '[GroundTruth] Visualized { path } as ground truth map!' )
pbar . update ( 1 )
return [( '/visualization/ground_truth' , markers , self . start )]
[docs] def create_local_mapping_markers ( self , pbar : tqdm ):
messages_to_add = []
max_length = 0
cone_lists = self . rosbag_df [ self . rosbag_df [ 'name' ] == '/local_mapping/cone_positions' ][[ 'time' , 'data' ]]
pbar . reset ( total = cone_lists . shape [ 0 ])
for ( time , cone_list ) in cone_lists . itertuples ( index = False , name = None ):
marker_array = MarkerArray ()
length = len ( cone_list )
for cone_id , cone in enumerate ( cone_list ):
cone : ConePosition
# Use lidar frame if is_lidar field is true
frame_id = 'lidar' if ( len ( cone ) >= 5 and cone [ 4 ]) else 'camera'
marker_array . markers . append ( self . create_cone_marker (
cone = cone , frame_id = frame_id , time = time , cone_id = cone_id , namespace = 'local_mapping' , stretch = True ))
marker_array . markers . append (
self . create_cone_annotation_marker ( cone = cone , frame_id = frame_id , time = time , cone_id = cone_id ,
annotation = f 'prob.: { cone [ 3 ] : .2f } ' ,
namespace = 'local_mapping_probability' ))
marker_array . markers += self . create_mock_markers ( length = length , max_length = max_length , time = time ,
frame_id = 'camera' , namespace = 'local_mapping' )
marker_array . markers += self . create_mock_markers ( length = length , max_length = max_length , time = time ,
frame_id = 'camera' , namespace = 'local_mapping_probability' )
max_length = max ( length , max_length )
messages_to_add . append (( '/local_mapping/visualization' , marker_array , time ))
pbar . update ( 1 )
return messages_to_add
[docs] def create_path_marker ( self , path : List [ Tuple [ float , float ]], frame_id : str ,
namespace : str , cone_id : int , time , color : ColorRGBA ,
previous_path : Optional [ List [ Point ]] = None ) -> Marker :
marker = Marker ( color = color )
marker . action = marker . ADD
marker . type = marker . LINE_STRIP
marker . ns = namespace
marker . id = cone_id
marker . header . frame_id = frame_id
marker . header . stamp = rospy . Time . from_sec ( time )
if previous_path is not None :
marker . points = previous_path + [ Point ( x = point [ 0 ], y = point [ 1 ], z = 0 ) for point in path [ len ( previous_path ):]]
else :
marker . points = [ Point ( x = point [ 0 ], y = point [ 1 ], z = 0 ) for point in path ]
marker . scale . x = 0.2
return marker
[docs] def create_control_markers ( self , pbar : tqdm ):
messages_to_add = []
predicted_states_color = ColorRGBA ( r = 0 , g = 1 , b = 1 , a = 1 )
main_slam_color = ColorRGBA ( r = 1 , g = 1 , b = 0 , a = 1 )
realtime_slam_color = ColorRGBA ( r = 0 , g = 0.5 , b = 0.5 , a = 1 )
predicted_states_list = self . rosbag_df [ self . rosbag_df [ 'name' ] == '/control/predicted_states' ][[ 'time' , 'data' ]]
pbar . reset ( total = predicted_states_list . shape [ 0 ])
for ( time , predicted_states ) in predicted_states_list . itertuples ( index = False , name = None ):
marker_array = MarkerArray ()
marker_array . markers . append ( self . create_path_marker ( path = predicted_states , frame_id = 'map' ,
namespace = 'predicted_path' , time = time ,
cone_id = 1 , color = predicted_states_color ))
realtime_future_driven_path = self . get_values_from_rosbag_df ( name = '/slam/vehicle_pose.pose' ,
start_time = time , duration = CONTROL_TIME_WINDOW )
if realtime_future_driven_path is not None :
marker_array . markers . append ( self . create_path_marker ( path = realtime_future_driven_path , frame_id = 'map' ,
namespace = 'realtime_future_driven_path' , time = time ,
cone_id = 1 , color = realtime_slam_color ))
main_future_driven_path = self . get_values_from_rosbag_df ( name = '/slam/vehicle_pose/main.pose' ,
start_time = time , duration = CONTROL_TIME_WINDOW )
if main_future_driven_path is not None :
marker_array . markers . append ( self . create_path_marker ( path = main_future_driven_path , frame_id = 'map' ,
namespace = 'main_future_driven_path' , time = time ,
cone_id = 1 , color = main_slam_color ))
messages_to_add . append (( '/control/visualization' , marker_array , time ))
pbar . update ( 1 )
return messages_to_add
[docs] def create_centerpoints_width_marker ( self , centerpoints : Tuple [ float , float , float , float ],
time , namespace : str , color : ColorRGBA ) -> List [ Marker ]:
markers = []
time_obj = rospy . Time . from_sec ( time )
for index , centerpoint in enumerate ( centerpoints ):
marker = Marker ()
marker . header . frame_id = 'map'
marker . header . stamp = time_obj
marker . ns = namespace
marker . id = index
marker . type = marker . CYLINDER
marker . action = marker . ADD
marker . scale . x = ( centerpoint [ 2 ] + centerpoint [ 3 ]) / 2
marker . scale . y = marker . scale . x
marker . scale . z = 0.05
marker . color = color
marker . pose . orientation . w = 1.0
marker . pose . position . x = centerpoint [ 0 ]
marker . pose . position . y = centerpoint [ 1 ]
marker . pose . position . z = 0
markers . append ( marker )
return markers
[docs] def create_filtering_markers ( self , pbar : tqdm ):
messages_to_add = []
centerpoints_color = ColorRGBA ( r = 0 , g = 1 , b = 0 , a = 0.3 )
centerpoints_width_color = ColorRGBA ( r = 0 , g = 1 , b = 0 , a = 0.1 )
max_length = 0
centerpoints_list = self . rosbag_df [ self . rosbag_df [ 'name' ] == '/filtering/centerpoints' ][[ 'time' , 'data' ]]
pbar . reset ( total = centerpoints_list . shape [ 0 ])
for ( time , centerpoints ) in centerpoints_list . itertuples ( index = False , name = None ):
length = len ( centerpoints )
marker_array = MarkerArray ()
marker_array . markers . append ( self . create_path_marker ( path = centerpoints , frame_id = 'map' ,
namespace = 'centerpoints' , time = time ,
cone_id = 1 , color = centerpoints_color ))
marker_array . markers += self . create_centerpoints_width_marker ( centerpoints = centerpoints , time = time ,
color = centerpoints_width_color ,
namespace = 'centerpoints_width' )
marker_array . markers += self . create_mock_markers ( length = length , max_length = max_length , time = time ,
frame_id = 'map' , namespace = 'centerpoints_width' )
max_length = max ( length , max_length )
messages_to_add . append (( '/filtering/visualization' , marker_array , time ))
pbar . update ( 1 )
return messages_to_add
[docs] def create_motion_planning_markers ( self , pbar : tqdm ):
messages_to_add = []
trajectory_color = ColorRGBA ( r = 1 , g = 0 , b = 0.5 , a = 0.3 )
trajectories = self . rosbag_df [ self . rosbag_df [ 'name' ] == '/motion_planning/trajectory' ][[ 'time' , 'data' ]]
pbar . reset ( total = trajectories . shape [ 0 ])
for ( time , trajectory ) in trajectories . itertuples ( index = False , name = None ):
marker_array = MarkerArray ()
marker_array . markers . append ( self . create_path_marker ( path = trajectory , frame_id = 'map' ,
namespace = 'predicted_path' , time = time ,
cone_id = 1 , color = trajectory_color ))
messages_to_add . append (( '/motion_planning/visualization' , marker_array , time ))
pbar . update ( 1 )
return messages_to_add
[docs] @staticmethod
def compute_eigenvalues_and_angle_of_covariance ( cov : npt . NDArray , n_std : float = 1.0
) -> Tuple [ npt . NDArray , npt . NDArray , float ]:
"""
Compute the eigenvalues and the angle of the covariance matrix.
https://stackoverflow.com/questions/20126061/creating-a-confidence-ellipses-in-a-sccatterplot-using-matplotlib.
Parameters
----------
cov : npt.NDArray
Covariance matrix.
n_std : float, optional
Number of standard deviations to plot, by default 1.0.
Returns
-------
Tuple[npt.NDArray, npt.NDArray, float]
Weight, Height and angle of the covariance matrix.
"""
def eigsorted ( cov ):
vals , vecs = np . linalg . eigh ( cov )
order = vals . argsort ()[:: - 1 ]
return vals [ order ], vecs [:, order ]
vals , vecs = eigsorted ( cov [: 2 , : 2 ])
theta = np . arctan2 ( * vecs [:, 0 ][:: - 1 ])
w , h = 2 * n_std * np . sqrt ( vals )
return w , h , theta
[docs] def create_pose_arrow_marker ( self , vehicle_pose : Tuple [ float , float , float ], time , color : ColorRGBA ,
namespace : str , marker_id : int , frame_id : str , length : float = 1. ) -> Marker :
marker = Marker ()
marker . header . frame_id = frame_id
marker . frame_locked = False
marker . header . stamp = rospy . Time . from_sec ( time )
marker . ns = namespace
marker . id = marker_id
marker . type = marker . ARROW
marker . color = color
marker . points . append ( Point ( x = vehicle_pose [ 0 ], y = vehicle_pose [ 1 ], z = 0 ))
marker . points . append ( Point ( x = vehicle_pose [ 0 ] + length * np . cos ( vehicle_pose [ 3 ]),
y = vehicle_pose [ 1 ] + length * np . sin ( vehicle_pose [ 3 ]), z = 0 ))
marker . scale . x = 0.1 * length
marker . scale . y = 0.3 * length
marker . scale . z = 0.2 * length
return marker
[docs] def create_position_uncertainty_marker ( self , position : Tuple [ float , float ], covariance : npt . NDArray , time ,
namespace : str , frame_id : str , marker_id : int , color : ColorRGBA , n_std : float = 1 ) -> Marker :
marker = Marker ()
marker . header . frame_id = frame_id
marker . frame_locked = True
marker . header . stamp = rospy . Time . from_sec ( time )
marker . ns = namespace
marker . id = marker_id
marker . type = marker . CYLINDER
marker . color = color
w , h , theta = self . compute_eigenvalues_and_angle_of_covariance ( cov = covariance , n_std = n_std )
marker . scale . z = 0.05
marker . scale . x = w
marker . scale . y = h
q = tf_conversions . transformations . quaternion_from_euler ( 0 , 0 , theta )
marker . pose . orientation . x = q [ 0 ]
marker . pose . orientation . y = q [ 1 ]
marker . pose . orientation . z = q [ 2 ]
marker . pose . orientation . w = q [ 3 ]
marker . pose . position . x = position [ 0 ]
marker . pose . position . y = position [ 1 ]
marker . pose . position . z = 0
return marker
[docs] def create_heading_uncertainity_marker ( self , vehicle_pose : Tuple [ float , float , float , float ], time ,
heading_uncertainty : float , namespace : str , frame_id : str ,
marker_id : int , color : ColorRGBA , length : float ) -> Marker :
marker = Marker ()
marker . header . frame_id = frame_id
marker . frame_locked = True
marker . header . stamp = rospy . Time . from_sec ( time )
marker . ns = namespace
marker . id = marker_id
marker . type = marker . TRIANGLE_LIST
marker . color = color
stddev = np . sqrt ( heading_uncertainty )
marker . points . append ( Point ( x = vehicle_pose [ 0 ], y = vehicle_pose [ 1 ], z = 0 ))
marker . points . append ( Point ( x = vehicle_pose [ 0 ] + length * np . cos ( vehicle_pose [ 3 ] + self . n_stddev * stddev ),
y = vehicle_pose [ 1 ] + length * np . sin ( vehicle_pose [ 3 ] + self . n_stddev * stddev ), z = 0 ))
marker . points . append ( Point ( x = vehicle_pose [ 0 ] + length * np . cos ( vehicle_pose [ 3 ] - self . n_stddev * stddev ),
y = vehicle_pose [ 1 ] + length * np . sin ( vehicle_pose [ 3 ] - self . n_stddev * stddev ), z = 0 ))
marker . pose . orientation . w = 1
return marker
[docs] def create_slam_pose_markers ( self , filter : str , base_color : ColorRGBA , pbar : tqdm ):
messages_to_add = []
previous_path = []
vehicle_pose_history_color = copy . deepcopy ( base_color )
vehicle_pose_uncertainty_color = copy . deepcopy ( base_color )
vehicle_pose_uncertainty_color . a = 0.2
vehicle_heading_uncertainty_color = copy . deepcopy ( base_color )
vehicle_heading_uncertainty_color . a = 0.4
filter_slash = f '/ { filter } ' if filter != '' else ''
filter_underscore = f '_ { filter } ' if filter != '' else ''
vehicle_poses = self . rosbag_df [
self . rosbag_df [ 'name' ] == f '/slam/vehicle_pose { filter_slash } .pose' ][[ 'time' , 'data' ]]
vehicle_pose_uncertainties = self . rosbag_df [
self . rosbag_df [ 'name' ] == f '/slam/vehicle_pose { filter_slash } .uncertainty' ][[ 'time' , 'data' ]]
vehicle_pose_history = []
pbar . reset ( total = vehicle_poses . shape [ 0 ])
for (( time , vehicle_pose ),
( _ , vehicle_pose_uncertainty )) in zip ( vehicle_poses . itertuples ( index = False , name = None ),
vehicle_pose_uncertainties . itertuples ( index = False , name = None )):
marker_array = MarkerArray ()
vehicle_pose_history . append ( vehicle_pose )
marker_array . markers . append ( self . create_pose_arrow_marker (
vehicle_pose = vehicle_pose , time = time , length = 2 , color = base_color , marker_id = 1 , frame_id = 'map' ,
namespace = f 'vehicle_pose { filter_underscore } ' ))
if len ( vehicle_pose_history ) > 1 :
marker_array . markers . append ( self . create_path_marker (
path = vehicle_pose_history , frame_id = 'map' , cone_id = 2 , time = time ,
color = vehicle_pose_history_color , namespace = f 'vehicle_pose { filter_underscore } ' ,
previous_path = previous_path ))
previous_path = marker_array . markers [ - 1 ] . points
marker_array . markers . append ( self . create_position_uncertainty_marker (
position = vehicle_pose [: 2 ], covariance = np . reshape ( vehicle_pose_uncertainty , ( 4 , 4 )),
time = time , marker_id = 3 , namespace = f 'vehicle_pose { filter_underscore } _uncertainty' ,
color = vehicle_pose_uncertainty_color , frame_id = 'map' ))
marker_array . markers . append ( self . create_heading_uncertainity_marker (
vehicle_pose = vehicle_pose , heading_uncertainty = vehicle_pose_uncertainty [ 3 ][ 3 ], time = time , marker_id = 4 ,
namespace = f 'vehicle_pose { filter_underscore } _uncertainty' , color = vehicle_heading_uncertainty_color ,
length = 2 , frame_id = 'map' ))
messages_to_add . append (( f '/slam { filter_slash } /visualization' , marker_array , time ))
pbar . update ( 1 )
return messages_to_add
[docs] def create_landmark_position_and_uncertainty_markers ( self , landmarks : List [ ConePositionWithCovariance ],
base_namespace : str , frame_id : str , time ) -> List [ Marker ]:
markers : List [ Marker ] = []
for landmark_i , landmark in enumerate ( landmarks ):
markers . append ( self . create_cone_marker ( cone = ( landmark . x , landmark . y , landmark . id ), frame_id = frame_id ,
namespace = base_namespace , cone_id = landmark_i , time = time ))
color = copy . deepcopy ( self . _cone_colors [ landmark . id ])
color . a = 0.2
markers . append ( self . create_position_uncertainty_marker (
position = ( landmark . x , landmark . y ), covariance = np . reshape ( landmark . covariance , ( 2 , 2 )), time = time ,
namespace = f ' { base_namespace } _uncertainty' , frame_id = frame_id , color = color , marker_id = landmark_i ))
markers . append (
self . create_cone_annotation_marker ( cone = [ landmark . x , landmark . y , landmark . id ],
frame_id = 'map' , time = time , cone_id = landmark_i ,
annotation = f 'perc. n: { landmark . perceived_n } ' ,
namespace = f ' { base_namespace } _perceived_n' ))
return markers
[docs] def create_landmark_mapping_markers ( self , frame_id : str , namespace : str , observed_landmarks : List [ ConePosition ],
observable_landmarks : List [ ConePosition ], mapping , time ) -> List [ Marker ]:
marker = Marker ()
marker . header . stamp = rospy . Time . from_sec ( time )
marker . frame_locked = True
marker . header . frame_id = frame_id
marker . type = marker . LINE_LIST
marker . action = marker . ADD
marker . ns = namespace
marker . id = 1
marker . scale . x = 0.1
for observed_landmark , observable_landmark_i in zip ( observed_landmarks , mapping ):
if observable_landmark_i == - 1 :
continue
marker . colors . append ( self . _cone_colors [ observed_landmark . id ])
marker . colors . append ( self . _cone_colors [ observed_landmark . id ])
height = self . _cone_size [ observed_landmark . id ][ 2 ] / 2
marker . points . append ( Point ( x = observed_landmark . x , y = observed_landmark . y , z = height ))
marker . points . append ( Point ( x = observable_landmarks [ observable_landmark_i ] . x ,
y = observable_landmarks [ observable_landmark_i ] . y , z = height ))
return [ marker ]
[docs] def create_landmark_compatibility_markers ( self , frame_id : str , namespace : str , time , color : ColorRGBA ,
individual_compatibility : npt . NDArray ,
observed_landmarks : List [ ConePosition ],
observable_landmarks : List [ ConePosition ]) -> List [ Marker ]:
marker = Marker ()
marker . header . stamp = rospy . Time . from_sec ( time )
marker . frame_locked = True
marker . header . frame_id = frame_id
marker . type = marker . LINE_LIST
marker . action = marker . ADD
marker . ns = namespace
marker . id = 1
marker . scale . x = 0.03
for observed_landmark_i , observable_landmark_i in np . argwhere ( individual_compatibility != 0 ):
if observable_landmark_i == - 1 :
continue
marker . colors . append ( color )
marker . colors . append ( color )
height = self . _cone_size [ observed_landmarks [ observed_landmark_i ] . id ][ 2 ] / 2
marker . points . append ( Point ( x = observed_landmarks [ observed_landmark_i ] . x ,
y = observed_landmarks [ observed_landmark_i ] . y , z = height ))
marker . points . append ( Point ( x = observable_landmarks [ observable_landmark_i ] . x ,
y = observable_landmarks [ observable_landmark_i ] . y , z = height ))
return [ marker ]
[docs] def create_predicted_observed_landmarks_markers ( self , frame_id : str , namespace : str , time ,
predicted_observed_landmarks : List [ ConePosition ]) -> List [ Marker ]:
marker = Marker ()
marker . header . stamp = rospy . Time . from_sec ( time )
marker . frame_locked = True
marker . header . frame_id = frame_id
marker . type = marker . SPHERE_LIST
marker . action = marker . ADD
marker . ns = namespace
marker . id = 1
marker . scale . x = self . _cone_size [ 0 ][ 0 ]
marker . scale . y = self . _cone_size [ 0 ][ 1 ]
marker . scale . z = self . _cone_size [ 0 ][ 2 ]
for predicted_observed_landmark in predicted_observed_landmarks :
marker . colors . append ( self . _cone_colors [ predicted_observed_landmark . id ])
marker . points . append ( Point ( x = predicted_observed_landmark . x , y = predicted_observed_landmark . y ,
z = self . _cone_size [ predicted_observed_landmark . id ][ 2 ] / 2 ))
return [ marker ]
[docs] def create_observable_landmarks_markers ( self , frame_id : str , namespace : str , time ,
observable_landmarks : List [ ConePosition ]) -> List [ Marker ]:
marker = Marker ()
marker . header . stamp = rospy . Time . from_sec ( time )
marker . frame_locked = True
marker . header . frame_id = frame_id
marker . type = marker . CUBE_LIST
marker . action = marker . ADD
marker . ns = namespace
marker . id = 1
marker . scale . x = self . _cone_size [ 0 ][ 0 ]
marker . scale . y = self . _cone_size [ 0 ][ 1 ]
marker . scale . z = self . _cone_size [ 0 ][ 2 ]
for observable_landmark in observable_landmarks :
marker . colors . append ( self . _cone_colors [ observable_landmark . id ])
marker . points . append ( Point ( x = observable_landmark . x , y = observable_landmark . y ,
z = self . _cone_size [ observable_landmark . id ][ 2 ] / 2 ))
return [ marker ]
[docs] def create_observed_landmarks_markers ( self , frame_id : str , namespace : str , time ,
observed_landmarks : List [ ConePosition ]) -> List [ Marker ]:
marker = Marker ()
marker . header . stamp = rospy . Time . from_sec ( time )
marker . frame_locked = True
marker . header . frame_id = frame_id
marker . type = marker . LINE_LIST
marker . action = marker . ADD
marker . ns = namespace
marker . id = 1
marker . scale . x = self . _cone_size [ 0 ][ 0 ] / 5
for observed_landmark in observed_landmarks :
marker . colors . append ( self . _cone_colors [ observed_landmark . id ])
marker . colors . append ( self . _cone_colors [ observed_landmark . id ])
marker . points . append ( Point ( x = observed_landmark . x , y = observed_landmark . y ,
z = self . _cone_size [ observed_landmark . id ][ 2 ] * 1.5 ))
marker . points . append ( Point ( x = observed_landmark . x , y = observed_landmark . y ,
z = self . _cone_size [ observed_landmark . id ][ 2 ] * - 0.5 ))
return [ marker ]
[docs] def create_slam_landmark_markers ( self , pbar : tqdm ):
messages_to_add = []
max_length = 0
landmarks_list = self . rosbag_df [ self . rosbag_df [ 'name' ] == '/slam/landmarks' ][[ 'time' , 'data' ]]
pbar . reset ( total = landmarks_list . shape [ 0 ])
for time , landmarks in landmarks_list . itertuples ( index = False , name = None ):
landmarks : ConeListWithCovariance
length = len ( landmarks . cones )
marker_array = MarkerArray ()
marker_array . markers += self . create_landmark_position_and_uncertainty_markers ( landmarks = landmarks . cones ,
base_namespace = 'landmarks' ,
frame_id = 'map' , time = time )
marker_array . markers += self . create_mock_markers ( length = length , max_length = max_length , frame_id = 'map' ,
namespace = 'landmarks' , time = time )
marker_array . markers += self . create_mock_markers ( length = length , max_length = max_length , frame_id = 'map' ,
namespace = 'landmarks_uncertainty' , time = time )
marker_array . markers += self . create_mock_markers ( length = length , max_length = max_length , frame_id = 'map' ,
namespace = 'landmarks_perceived_n' , time = time )
max_length = max ( max_length , length )
predicted_measurements = self . rosbag_df [
( self . rosbag_df [ 'name' ] == '/slam/predicted_measurements/main.all' ) & ( self . rosbag_df [ 'time' ] == time )]
if not predicted_measurements . empty :
predicted_measurements_msg : PredictedMeasurements = predicted_measurements . iloc [ 0 ][ 'data' ]
if predicted_measurements_msg . observed_landmarks :
marker_array . markers += self . create_landmark_mapping_markers (
frame_id = 'map' , namespace = 'landmark_mapping' , time = time ,
observed_landmarks = predicted_measurements_msg . observed_landmarks ,
observable_landmarks = predicted_measurements_msg . observable_landmarks ,
mapping = predicted_measurements_msg . mapping )
individual_compatibility = np . reshape ( predicted_measurements_msg . individual_compatibility ,
( len ( predicted_measurements_msg . observed_landmarks ), - 1 ))
marker_array . markers += self . create_landmark_compatibility_markers (
frame_id = 'map' , namespace = 'landmark_mapping_compatibility' ,
time = time , color = ColorRGBA ( r = 0.5 , g = 0.5 , b = 0.5 , a = 0.3 ),
observed_landmarks = predicted_measurements_msg . observed_landmarks ,
observable_landmarks = predicted_measurements_msg . observable_landmarks ,
individual_compatibility = individual_compatibility )
marker_array . markers += self . create_predicted_observed_landmarks_markers (
frame_id = 'map' , namespace = 'predicted_observed_landmark' , time = time ,
predicted_observed_landmarks = predicted_measurements_msg . predicted_observed_landmarks
)
marker_array . markers += self . create_observable_landmarks_markers (
frame_id = 'map' , namespace = 'observable_landmarks' , time = time ,
observable_landmarks = predicted_measurements_msg . observable_landmarks
)
marker_array . markers += self . create_observed_landmarks_markers (
frame_id = 'map' , namespace = 'observed_landmarks' , time = time ,
observed_landmarks = predicted_measurements_msg . observed_landmarks
)
messages_to_add . append (( '/slam/landmarks/visualization' , marker_array , time ))
pbar . update ( 1 )
return messages_to_add
[docs] def create_map_alignment_pose ( self , frame_id : str , namespace : str , time : rospy . Time ,
translation : Tuple [ float , float ], rotation_matrix : Tuple [ float , float , float , float ]
) -> PoseStamped :
pose = PoseStamped ()
pose . header . frame_id = frame_id
pose . header . stamp = rospy . Time . from_sec ( time )
pose . pose . position . x = translation [ 0 ]
pose . pose . position . y = translation [ 1 ]
rotation_matrix_3d = np . zeros (( 3 , 3 ))
rotation_matrix_3d [: 2 , : 2 ] = np . reshape ( rotation_matrix , ( 2 , 2 ))
q = R . from_matrix ( rotation_matrix_3d ) . inv () . as_quat ()
pose . pose . orientation . x = q [ 0 ]
pose . pose . orientation . y = q [ 1 ]
pose . pose . orientation . z = q [ 2 ]
pose . pose . orientation . w = q [ 3 ]
return pose
[docs] def create_map_alignment_arrow ( self , frame_id : str , namespace : str , time : rospy . Time ,
translation : Tuple [ float , float ], rotation_matrix : Tuple [ float , float , float , float ]
) -> List [ Marker ]:
marker = Marker ()
marker . header . frame_id = frame_id
marker . frame_locked = False
marker . header . stamp = rospy . Time . from_sec ( time )
marker . ns = namespace
marker . id = 1
marker . type = marker . ARROW
marker . color = ColorRGBA ( r = 0.5 , g = 0.5 , b = 0.5 , a = 1 )
marker . points . append ( Point ( x = 0 , y = 0 , z = 0 ))
second_point = np . reshape ( rotation_matrix , ( 2 , 2 )) @ translation
marker . points . append ( Point ( x = second_point [ 0 ], y = second_point [ 1 ], z = 0 ))
length = np . linalg . norm ( translation )
marker . scale . x = 0.1 * length
marker . scale . y = 0.3 * length
marker . scale . z = 0.2 * length
return [ marker ]
[docs] def create_slam_map_alignment_markers ( self , pbar : tqdm ):
messages_to_add = []
max_length = 0
map_alignment_list = self . rosbag_df [ self . rosbag_df [ 'name' ] == '/slam/map_alignment' ][[ 'time' , 'data' ]]
pbar . reset ( total = map_alignment_list . shape [ 0 ])
for time , map_alignment in map_alignment_list . itertuples ( index = False , name = None ):
map_alignment : MapAlignment
marker_array = MarkerArray ()
length = len ( map_alignment . observable_preloaded_cones )
for landmark_i , landmark in enumerate ( map_alignment . observable_preloaded_cones ):
color = copy . deepcopy ( self . _cone_colors [ landmark . id ])
color . a = 0.2
marker_array . markers . append ( self . create_position_uncertainty_marker (
position = ( landmark . x , landmark . y ), covariance = np . reshape ( landmark . covariance , ( 2 , 2 )), time = time ,
frame_id = 'map' , color = color , marker_id = landmark_i ,
namespace = 'map_alignment_observable_landmark_uncertainty' ))
marker_array . markers += self . create_mock_markers (
length = length , max_length = max_length , frame_id = 'map' , time = time ,
namespace = 'map_alignment_observable_landmarks_uncertainty' )
max_length = max ( max_length , length )
marker_array . markers += self . create_landmark_mapping_markers (
frame_id = 'map' , namespace = 'map_alignment_mapping' , time = time ,
observed_landmarks = map_alignment . tracked_observed_cones ,
observable_landmarks = map_alignment . observable_preloaded_cones ,
mapping = map_alignment . mapping )
individual_compatibility = np . reshape ( map_alignment . individual_compatibility ,
( len ( map_alignment . tracked_observed_cones ), - 1 ))
marker_array . markers += self . create_landmark_compatibility_markers (
frame_id = 'map' , namespace = 'map_alignment_mapping_compatibility' ,
time = time , color = ColorRGBA ( r = 0.5 , g = 0.5 , b = 0.5 , a = 0.3 ),
observed_landmarks = map_alignment . tracked_observed_cones ,
observable_landmarks = map_alignment . observable_preloaded_cones ,
individual_compatibility = individual_compatibility )
marker_array . markers += self . create_observable_landmarks_markers (
frame_id = 'map' , namespace = 'map_alignment_observable_landmarks' , time = time ,
observable_landmarks = map_alignment . observable_preloaded_cones
)
marker_array . markers += self . create_observed_landmarks_markers (
frame_id = 'map' , namespace = 'map_alignment_observed_landmarks' , time = time ,
observed_landmarks = map_alignment . tracked_observed_cones
)
marker_array . markers += self . create_map_alignment_arrow (
frame_id = 'map' , namespace = 'map_alignment' , time = time ,
translation = map_alignment . translation , rotation_matrix = map_alignment . rotation_matrix
)
pose = self . create_map_alignment_pose (
frame_id = 'map' , namespace = 'map_alignment' , time = time ,
translation = map_alignment . translation , rotation_matrix = map_alignment . rotation_matrix
)
messages_to_add . append (( '/slam/map_alignment/visualization' , marker_array , time ))
messages_to_add . append (( '/slam/map_alignment/transform_pose' , pose , time ))
pbar . update ( 1 )
return messages_to_add
[docs] def create_motion_planning_path_slices_markers ( self , path_slices : TrajectoryPathSlices ,
time : float ) -> List [ Marker ]:
color_scale = color_palette ( "viridis" , as_cmap = True )
marker = Marker ()
marker . header . frame_id = 'map'
marker . frame_locked = False
marker . header . stamp = rospy . Time . from_sec ( time )
marker . ns = 'local_motion_planning_path_slices'
marker . id = 1
marker . type = marker . LINE_LIST
marker . scale . x = 0.005
for path_slice in path_slices . path_slices :
if self . local_motion_planning_color_scale == 'TOTAL' :
color = ColorRGBA ( * color_scale ( path_slice . total_cost ))
if self . local_motion_planning_color_scale == 'LENGTH' :
color = ColorRGBA ( * color_scale ( path_slice . length_cost ))
if self . local_motion_planning_color_scale == 'ASC' :
color = ColorRGBA ( * color_scale ( path_slice . average_squared_curvature_cost ))
if self . local_motion_planning_color_scale == 'PSC' :
color = ColorRGBA ( * color_scale ( path_slice . peak_squared_curvature_cost ))
marker . colors . append ( color )
marker . points . append ( Point ( x = path_slice . x [ 0 ], y = path_slice . y [ 0 ], z = 0 ))
for x , y in zip ( path_slice . x [ 1 : - 1 ], path_slice . y [ 1 : - 1 ]):
marker . colors . append ( color )
marker . colors . append ( color )
marker . points . append ( Point ( x = x , y = y , z = 0 ))
marker . points . append ( Point ( x = x , y = y , z = 0 ))
marker . colors . append ( color )
marker . points . append ( Point ( x = path_slice . x [ - 1 ], y = path_slice . y [ - 1 ], z = 0 ))
return [ marker ]
[docs] def create_all_motion_planning_path_slices_markers ( self , pbar : tqdm ):
messages_to_add = []
max_length = 0
path_slices_list = \
self . rosbag_df [ self . rosbag_df [ 'name' ] == '/motion_planning/local/path_slices' ][[ 'time' , 'data' ]]
pbar . reset ( total = path_slices_list . shape [ 0 ])
for time , path_slices in path_slices_list . itertuples ( index = False , name = None ):
path_slices : TrajectoryPathSlices
length = len ( path_slices . path_slices )
marker_array = MarkerArray ()
marker_array . markers += self . create_motion_planning_path_slices_markers ( path_slices = path_slices , time = time )
# marker_array.markers += self.create_mock_markers(length=length, max_length=0, frame_id='map', time=time,
# namespace='local_motion_planning_path_slices')
max_length = max ( length , max_length )
messages_to_add . append (( '/motion_planning/local/visualization' , marker_array , time ))
pbar . update ( 1 )
return messages_to_add
[docs] def create_gps_markers ( self , pbar : tqdm ):
messages_to_add = []
previous_path = []
gps_pos_list = self . rosbag_df [ self . rosbag_df [ 'name' ] == '/gps/gps' ][[ 'time' , 'data' ]]
vehicle_pose_list = self . rosbag_df [ self . rosbag_df [ 'name' ] == '/slam/vehicle_pose.pose.raw' ][[ 'time' , 'data' ]]
gps_heading_list = self . rosbag_df [ self . rosbag_df [ 'name' ] == '/novatel/oem7/heading2' ][[ 'time' , 'data' ]]
# only continue if all values are present
if gps_pos_list . empty or gps_heading_list . empty or vehicle_pose_list . empty :
return []
# get the gps position message and vehicle pose message for the first heading message so that the gps transformer can be created
gps_pos_for_first_heading_index = np . argmin ( np . abs ( gps_pos_list [ 'time' ] - gps_heading_list . iloc [ 0 ][ 'time' ]))
gps_pos_for_first_heading = gps_pos_list . iloc [ gps_pos_for_first_heading_index ][ 'data' ]
vehicle_pose_for_first_heading_index = np . argmin ( np . abs ( vehicle_pose_list [ 'time' ] - gps_heading_list . iloc [ 0 ][ 'time' ]))
vehicle_pose_for_first_heading = vehicle_pose_list . iloc [ vehicle_pose_for_first_heading_index ][ 'data' ]
lat0 = gps_pos_for_first_heading . latitude
lon0 = gps_pos_for_first_heading . longitude
# create gps transformer which will translate gps coordinates to local gps_map coordinates
crs = pyproj . CRS . from_proj4 ( f '+proj=sterea +lat_0= { lat0 } +lon_0= { lon0 } +k=1 +x_0= { 0 } +y_0= { 0 } +ellps=GRS80 +towgs84=0,0,0,0,0,0,0 +units=m +no_defs' )
transformer = pyproj . Transformer . from_crs ( crs_from = crs , crs_to = 'EPSG:4326' )
# calculate translation and rotation matrix to transform gps_map coordinates to map coordinates
heading = gps_heading_list . iloc [ 0 ][ 'data' ] . heading * np . pi / 180
translation = np . array ([ vehicle_pose_for_first_heading . x , vehicle_pose_for_first_heading . y ])
psi = vehicle_pose_for_first_heading . psi
alpha = heading - psi - np . pi / 2
rotation_matrix = np . array ([[ np . cos ( alpha ), - np . sin ( alpha )], [ np . sin ( alpha ), np . cos ( alpha )]])
gps_pos_history = []
gps_heading_for_pos = {}
# create a dictionary that maps gps positions to gps headings, so that the heading can be visualized at the correct position
for time , gps_heading in gps_heading_list . itertuples ( index = False , name = None ):
gps_pos_time = gps_pos_list . iloc [ np . argmin ( np . abs ( gps_pos_list [ 'time' ] - time ))][ 'time' ]
gps_heading_for_pos [ gps_pos_time ] = gps_heading
# create markers for gps positions, heading and uncertainty
pbar . reset ( total = gps_pos_list . shape [ 0 ])
for time , gps_pos in gps_pos_list . itertuples ( index = False , name = None ):
marker_array = MarkerArray ()
# transform gps coordinates to map coordinates
gps_map_frame = transformer . transform ( xx = gps_pos . latitude , yy = gps_pos . longitude , errcheck = True , direction = 'INVERSE' )
map_frame_rotated = rotation_matrix @ ( gps_map_frame - translation )
gps_pos_history . append ( map_frame_rotated . tolist ())
marker_array . markers . append ( self . create_path_marker (
path = gps_pos_history , frame_id = 'map' , cone_id = 1 , time = time ,
color = ColorRGBA ( r = 0 , g = 1 , b = 0 , a = 0.3 ), namespace = 'gps' ,
previous_path = previous_path ))
previous_path = marker_array . markers [ - 1 ] . points
cov = gps_pos . position_covariance
cov_rot = rotation_matrix @ np . reshape ( cov , ( 3 , 3 ))[: 2 , : 2 ] @ rotation_matrix . T
marker_array . markers . append ( self . create_position_uncertainty_marker (
position = map_frame_rotated , covariance = cov_rot , time = time , marker_id = 2 ,
namespace = 'gps_uncertainty' , color = ColorRGBA ( r = 0 , g = 1 , b = 0 , a = 0.2 ), frame_id = 'map' ))
if time in gps_heading_for_pos :
# if heading is available, visualize it
heading = - gps_heading_for_pos [ time ] . heading * np . pi / 180 + alpha + np . pi / 2
marker_array . markers . append ( self . create_pose_arrow_marker (
vehicle_pose = ( map_frame_rotated [ 0 ], map_frame_rotated [ 1 ], 0 , heading ), time = time , length = 2 ,
color = ColorRGBA ( r = 0 , g = 1 , b = 0 , a = 1 ), marker_id = 3 , frame_id = 'map' , namespace = 'gps' ))
marker_array . markers . append ( self . create_heading_uncertainity_marker (
vehicle_pose = ( map_frame_rotated [ 0 ], map_frame_rotated [ 1 ], 0 , heading ), time = time ,
heading_uncertainty = ( gps_heading_for_pos [ time ] . heading_stdev * np . pi / 180 ) ** 2 , marker_id = 4 ,
namespace = 'gps_uncertainty' , color = ColorRGBA ( r = 0 , g = 1 , b = 0 , a = 0.4 ), length = 2 , frame_id = 'map' ))
messages_to_add . append (( '/gps/visualization' , marker_array , time ))
pbar . update ( 1 )
return messages_to_add
[docs] def plot_detection_image ( self , bounding_boxes : BoundingBoxes , image : npt . NDArray ) -> None :
"""
Visualize detected bounding boxes and highest point of cone on an image along with their associated information.
Parameters
----------
bounding_boxes : BoundingBoxes
An object containing a list of `BoundingBox` objects, each of which has attributes
defining the coordinates of the bounding box (xmin, ymin, xmax, ymax), probability
of detection, and cone top coordinates (x_cone_top, y_cone_top).
image : npt.NDArray
A NumPy array representing the image to be annotated, where the image shape is
expected to be in the form (height, width, num_channels) with `num_channels`.
"""
line_thickness = 1
font_thickness = max ( line_thickness - 1 , 1 )
for bounding_box in bounding_boxes . bounding_boxes :
# Draw bounding box
c1 = ( bounding_box . xmin , bounding_box . ymin )
c2 = ( bounding_box . xmax , bounding_box . ymax )
cv2 . rectangle ( image , c1 , c2 , self . _cone_colors_cv2 [ bounding_box . id ], line_thickness , lineType = cv2 . LINE_AA )
# Display probability
prob_text = f ' { bounding_box . probability : .2f } '
c3 = ( c1 [ 0 ], c1 [ 1 ] - 3 )
cv2 . putText ( image , prob_text , c3 , 0 , line_thickness / 3 , self . _cone_colors_cv2 [ bounding_box . id ], font_thickness , lineType = cv2 . LINE_AA )
if hasattr ( bounding_box , "x_cone_top" ):
# Draw the cone top
cone_top = ( bounding_box . x_cone_top , bounding_box . y_cone_top )
cv2 . circle ( image , cone_top , radius = 3 , color = ( 255 , 0 , 0 ), thickness =- 1 ) # Red color for the cone top
else :
if not self . is_using_mask_segmentation :
print ( "No cone top coordinates available. Using Legacy mode." )
self . is_using_mask_segmentation = True
# Draw midpoint
midpoint = (( bounding_box . xmin + bounding_box . xmax ) // 2 , bounding_box . ymin )
cv2 . circle ( image , midpoint , radius = 3 , color = ( 0 , 255 , 0 ), thickness =- 1 ) # Green color for the midpoint
[docs] @staticmethod
def interpolate_multidimensional ( x : npt . NDArray , xp : npt . NDArray , fp : npt . NDArray ) -> npt . NDArray :
"""
Interpolates 2d function for given points.
Parameters
----------
x : npt.NDArray
Points for which the function should be interpolated,
shape: (m, ) with m equal to number of to be interpolated points.
xp : npt.NDArray
X values of the 2d function, shape: (n, ) with n equal to the support points of the function.
fp : npt.NDArray
Y values of the 2d function, shape: (n, k)
with n equal to the support points of the function and
with k equal number of dimensions of function.
Returns
-------
npt.NDArray
Interpolated function for the given points, shape: (m, 2)
with m equal to number of to be interpolated points and
with k equal number of dimensions of function.
"""
j = np . searchsorted ( xp , x ) - 1
d = (( x - xp [ j ]) / ( xp [ j + 1 ] - xp [ j ]))[:, np . newaxis ]
return ( 1 - d ) * fp [ j ] + fp [ j + 1 ] * d
[docs] def project_world_coordinates_to_image_pixels ( self , coordinates : npt . NDArray , camera : str ):
image_pixels = np . einsum ( 'ab, cb -> ca' , self . combined_matrix [ camera ], np . c_ [ 1000 * coordinates ,
np . ones ( coordinates . shape [ 0 ])])
return ( image_pixels / image_pixels [:, 2 , None ])[:, : 2 ] . astype ( np . int32 )
[docs] def plot_coordinates ( self , image : npt . NDArray , global_coordinates_list : npt . NDArray ,
vehicle_pose : npt . NDArray , camera : str , colors : Tuple [ int ], thickness : int = 3 ):
for global_coordinates , color in zip ( global_coordinates_list , colors ):
local_coordinates = self . transform_global_to_local_coordinates_by_vehicle_pose (
global_coordinates = global_coordinates , vehicle_pose = vehicle_pose
)
angles = np . mod ( np . arctan2 ( local_coordinates [:, 1 ], local_coordinates [:, 0 ]), 2 * np . pi )
angles [ angles > np . pi ] -= 2 * np . pi
image_pixels = self . project_world_coordinates_to_image_pixels (
coordinates = local_coordinates [ np . abs ( angles ) < np . radians ( 120 / 2 )], camera = camera )
cv2 . polylines ( img = image , pts = image_pixels [ np . newaxis ], isClosed = False ,
color = color , thickness = thickness , lineType = cv2 . LINE_AA )
[docs] def plot_centerpoints_on_visualization_image ( self , t : rospy . Time , image : npt . NDArray ,
vehicle_pose : npt . NDArray , camera : str ):
centerpoints_list = self . rosbag_df [( self . rosbag_df [ 'name' ] == '/filtering/centerpoints' )
& ( self . rosbag_df [ 'time' ] <= t . to_sec ())][[ 'time' , 'data' ]]
if centerpoints_list . empty is True :
return
centerpoints = np . array ( centerpoints_list . iloc [ - 1 ][ 'data' ])
global_coordinates = np . zeros (( centerpoints . shape [ 0 ], 3 ))
global_coordinates [:, 0 ] = centerpoints [:, 0 ]
global_coordinates [:, 1 ] = centerpoints [:, 1 ]
self . plot_coordinates ( image = image , global_coordinates_list = [ global_coordinates ], camera = camera ,
vehicle_pose = vehicle_pose , colors = [( 0 , 255 , 0 )])
[docs] def plot_future_driven_path_on_visualization_image ( self , t : rospy . Time , image : npt . NDArray ,
vehicle_pose : npt . NDArray , camera ):
realtime_future_driven_path = self . get_values_from_rosbag_df ( name = '/slam/vehicle_pose.pose' ,
start_time = t . to_sec (), duration = CONTROL_TIME_WINDOW )
if realtime_future_driven_path . empty is True :
return
realtime_future_driven_points = np . zeros (( realtime_future_driven_path . shape [ 0 ], 3 ))
realtime_future_driven_points [:, : 2 ] = np . stack ( realtime_future_driven_path )[:, : 2 ]
self . plot_coordinates ( image = image , global_coordinates_list = [ realtime_future_driven_points ], camera = camera ,
vehicle_pose = vehicle_pose , colors = [( 127 , 127 , 0 )])
[docs] def create_image_visualization ( self , outbag : rosbag . Bag , t : rospy . Time , topic : str ,
image_msg : Union [ Image , CompressedImage ]):
bounding_box_msgs = self . rosbag_df [( self . rosbag_df [ 'name' ] == '/perception/bounding_boxes' )
& ( self . rosbag_df [ 'time' ] == image_msg . header . stamp . to_sec ())]
if bounding_box_msgs . empty is True :
bounding_boxes = None
if ( self . last_camera_frame_for_image_visualization is not None
and image_msg . header . frame_id != self . last_camera_frame_for_image_visualization ):
return
else :
bounding_boxes : BoundingBoxes = bounding_box_msgs [ 'data' ] . iloc [ 0 ]
if image_msg . header . frame_id != bounding_boxes . header . frame_id :
return
if 'compressed' in topic :
image = self . bridge . compressed_imgmsg_to_cv2 ( image_msg , desired_encoding = 'bgr8' )
elif 'qoi' in topic :
image = qoi . decode ( image_msg . data )
else :
image = self . bridge . imgmsg_to_cv2 ( image_msg , desired_encoding = 'bgr8' )
vehicle_poses = self . rosbag_df [ self . rosbag_df [ 'name' ] == f '/slam/vehicle_pose.pose' ][[ 'time' , 'data' ]]
if vehicle_poses . iloc [ - 1 ][ 'time' ] < image_msg . header . stamp . to_sec ():
return
camera = 'right' if 'right' in image_msg . header . frame_id else 'left'
vehicle_pose = self . interpolate_multidimensional ([ image_msg . header . stamp . to_sec ()],
vehicle_poses [ 'time' ] . to_numpy (),
np . stack ( vehicle_poses [ 'data' ] . to_numpy ())[:, ( 0 , 1 , 3 )])[ 0 ]
self . plot_centerpoints_on_visualization_image ( t = t , image = image , vehicle_pose = vehicle_pose , camera = camera )
self . plot_control_informations_on_visualization_image ( t = t , image = image ,
vehicle_pose = vehicle_pose , camera = camera )
self . plot_motion_planning_informations_on_visualization_image ( t = t , image = image ,
vehicle_pose = vehicle_pose , camera = camera )
self . plot_landmark_informations_on_visualization_image ( t = t , image = image ,
vehicle_pose = vehicle_pose , camera = camera )
if bounding_boxes is not None :
self . plot_detection_image ( bounding_boxes = bounding_boxes , image = image )
visualization_image_msg = self . bridge . cv2_to_compressed_imgmsg ( image , dst_format = 'jpeg' )
visualization_image_msg . header = image_msg . header
outbag . write ( topic = '/visualization/image' , msg = visualization_image_msg , t = t )
self . add_image_to_video_stream ( image_msg = visualization_image_msg , topic = '/visualization/image' )
self . last_camera_frame_for_image_visualization = image_msg . header . frame_id
[docs] def create_detection_image ( self , outbag : rosbag . Bag , t : rospy . Time , topic : str ,
image_msg : Union [ Image , CompressedImage ]):
bounding_box_msgs = self . rosbag_df [( self . rosbag_df [ 'name' ] == '/perception/bounding_boxes' )
& ( self . rosbag_df [ 'time' ] == image_msg . header . stamp . to_sec ())]
if bounding_box_msgs . empty is True :
return
else :
bounding_boxes : BoundingBoxes = bounding_box_msgs [ 'data' ] . iloc [ 0 ]
if image_msg . header . frame_id != bounding_boxes . header . frame_id :
return
if 'compressed' in topic :
image = self . bridge . compressed_imgmsg_to_cv2 ( image_msg , desired_encoding = 'bgr8' )
elif 'qoi' in topic :
image = qoi . decode ( image_msg . data )
else :
image = self . bridge . imgmsg_to_cv2 ( image_msg , desired_encoding = 'bgr8' )
if bounding_boxes is not None :
self . plot_detection_image ( bounding_boxes = bounding_boxes , image = image )
detection_image_msg = self . bridge . cv2_to_compressed_imgmsg ( image , dst_format = 'jpeg' )
detection_image_msg . header = image_msg . header
outbag . write ( topic = '/perception/detection_image' , msg = detection_image_msg , t = t )
self . add_image_to_video_stream ( image_msg = detection_image_msg , topic = '/perception/detection_image' )
[docs] def prepare_recordings ( self ):
for recording_topic in self . recording_topics :
container = av . open ( f ' { self . input_rosbag_path . rsplit ( "." )[ 0 ] }{ recording_topic . replace ( "/" , "_" ) } .mp4' ,
mode = 'w' )
stream = container . add_stream ( 'h264' , rate = 25 )
stream . bit_rate = 3000000
stream . pix_fmt = 'yuv420p'
stream . codec_context . time_base = Fraction ( 1 , 1000 )
self . containers [ recording_topic ] = {
'container' : container ,
'stream' : stream ,
'last_time' : self . start ,
'pts' : 0
}
[docs] def add_image_to_video_stream ( self , image_msg : Union [ Image , CompressedImage ], topic : str ):
if topic not in self . recording_topics :
return
container = self . containers [ topic ][ 'container' ]
stream = self . containers [ topic ][ 'stream' ]
last_time = self . containers [ topic ][ 'last_time' ]
time = image_msg . header . stamp . to_sec ()
delta_pts = int (( time - last_time ) / stream . codec_context . time_base )
if delta_pts <= 0 :
return
if 'Compressed' in image_msg . _type :
image = self . bridge . compressed_imgmsg_to_cv2 ( image_msg )
else :
image = self . bridge . imgmsg_to_cv2 ( image_msg )
stream . width = image . shape [ 1 ]
stream . height = image . shape [ 0 ]
frame = av . VideoFrame . from_ndarray ( image [:, :, : 3 ], format = 'bgr24' )
self . containers [ topic ][ 'pts' ] += delta_pts
frame . pts = self . containers [ topic ][ 'pts' ]
for packet in stream . encode ( frame ):
container . mux ( packet )
self . containers [ topic ][ 'last_time' ] = time
[docs] def finish_recordings ( self ):
for recording_topic in self . recording_topics :
container = self . containers [ recording_topic ][ 'container' ]
stream = self . containers [ recording_topic ][ 'stream' ]
for packet in stream . encode ():
container . mux ( packet )
container . close ()
[docs] def generate_output_bag ( self , pbar : tqdm ):
self . messages_to_add . sort ( key = lambda message : message [ 2 ])
self . prepare_recordings ()
messages = ( message for message in self . messages_to_add )
next_message = next ( messages , None )
pbar . reset ( total = self . input_msg_n + len ( self . messages_to_add ))
with rosbag . Bag ( self . output_rosbag_path , 'w' ) as outbag :
while next_message is not None and next_message [ 2 ] + 0.3 < self . start :
next_message = next ( messages , None )
pbar . update ( 1 )
for topic , msg , t in self . input_rosbag . read_messages ():
t : rospy . Time
if t . to_sec () < self . start :
continue
if t . to_sec () > self . end + 2 :
break
if topic == "/tf" and self . transforms is True :
continue
while next_message is not None and rospy . Time . from_sec ( next_message [ 2 ]) < t :
outbag . write ( next_message [ 0 ], next_message [ 1 ], rospy . Time . from_sec ( next_message [ 2 ]))
self . add_image_to_video_stream ( image_msg = next_message [ 1 ], topic = next_message [ 0 ])
next_message = next ( messages , None )
pbar . update ( 1 )
if self . image_visualization is True and '/image_rect_color' in topic :
self . create_image_visualization ( outbag = outbag , t = t , image_msg = msg , topic = topic )
if self . generate_detection_image is True and '/image_rect_color' in topic :
self . create_detection_image ( outbag = outbag , t = t , image_msg = msg , topic = topic )
outbag . write ( topic , msg , t )
self . add_image_to_video_stream ( image_msg = msg , topic = topic )
pbar . update ( 1 )
while next_message is not None and next_message [ 2 ] < t . to_sec ():
outbag . write ( next_message [ 0 ], next_message [ 1 ], rospy . Time . from_sec ( next_message [ 2 ]))
self . add_image_to_video_stream ( image_msg = next_message [ 1 ], topic = next_message [ 0 ])
next_message = next ( messages , None )
pbar . update ( 1 )
self . finish_recordings ()
[docs] def run ( self ):
jobs = [
[ self . create_gps_markers , "Creating GPS Markers" , None ],
[ self . create_vehicle_marker , "Creating Vehicle Marker" , None ],
[ self . create_ground_truth_markers , "Creating Ground Truth Markers" , None ],
[ self . create_local_mapping_markers , "Creating Local Mapping Markers" , None ],
[ self . create_filtering_markers , "Creating Filtering Markers" , None ],
[ self . create_motion_planning_markers , "Creating Motion Planning Markers" , None ],
[ self . create_control_markers , "Creating Control Markers" , None ],
[ partial ( self . create_slam_pose_markers , base_color = ColorRGBA ( r = 1 , g = 0 , b = 0 , a = 1 ), filter = '' ), "Creating Slam Realtime Pose Markers" , None ],
[ partial ( self . create_slam_pose_markers , base_color = ColorRGBA ( r = 1 , g = 0.5 , b = 0 , a = 1 ), filter = 'main' ), "Creating Slam Main Pose Markers" , None ],
[ self . create_slam_landmark_markers , "Creating Slam Landmark Markers" , None ],
[ self . create_slam_map_alignment_markers , "Creating Slam Map Alignment Markers" , None ],
[ self . create_all_motion_planning_path_slices_markers , "Creating All Motion Planning Path Slices Markers" , None ],
[ self . create_transforms , "Creating Transforms" , None ],
]
total_func_i = 2
import_pbar = tqdm ( total = 1 , desc = "Importing Rosbag" , leave = True , disable = DISABLE_PROGRESSBAR , position = 1 )
for job in jobs :
job [ 2 ] = tqdm ( total = 1 , desc = job [ 1 ], leave = True , disable = DISABLE_PROGRESSBAR , position = total_func_i )
total_func_i += 1
outbag_pbar = tqdm ( total = 1 , desc = "Generating Output Rosbag" , leave = True , disable = DISABLE_PROGRESSBAR , position = total_func_i )
self . import_rosbag ( import_pbar )
for func , name , pbar in jobs :
self . messages_to_add += func ( pbar = pbar )
self . generate_output_bag ( outbag_pbar )
if __name__ == '__main__' :
# yappi.start()
visualizer = Visualizer ( input_rosbag = '/workspace/as_ros/rosbags/run_811_migrated_2024-05-20-01-49-57.bag' ,
vehicle = 'rennate' , transforms = False , output_rosbag_suffix = '_viz2' ,
start = 0 , duration = 10 , end = None , use_header_time = True , gps = False ,
test_day = 'kw20' , track_layout = 'run_811' , n_stddev = 1 , image_visualization = False ,
calibration_matrices_subdirectory = '' , local_motion_planning_color_scale = 'TOTAL' ,
generate_detection_image = False , recording = [])
visualizer . run ()
# yappi.stop()
# path = '/workspace/as_ros/rosbags/profiling/callgrind.out_visualizer'
# yappi.get_func_stats().save(path, type='callgrind')
# os.system(f'gprof2dot -w -s -f callgrind {path} | dot -Tsvg -o {path}.svg')
Copy to clipboard