Source code for cli.analyzer

import os
import glob
import warnings
from typing import Optional, Any
import functools
from pathlib import Path

import click
import progressbar

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go

from ruamel.yaml import YAML
import yaml
from scipy.interpolate import interp1d
from scipy.spatial.transform import Rotation as R

import rosbag
import rospy


[docs]def rgetattr(obj, attr, *args): def _getattr(obj, attr): return getattr(obj, attr, *args) return functools.reduce(_getattr, [obj] + attr.split('.'))
[docs]def save_intrinsic_rotation(vehicle_name: str, intrinsic_rotation: np.ndarray) -> None: """ Save intrinsic rotation to correct transforms yaml. """ yaml = YAML() yaml.preserve_quotes = True # Load transforms yaml transforms_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../config/transforms/{vehicle_name}.yaml' with open(transforms_path, 'r') as transforms_file: transforms: dict = yaml.load(transforms_file) # Update transforms yaml for i, letter in enumerate(("x", "y", "z")): transforms['tf']['vehicle']["imu"]["rotation"][letter] = float(intrinsic_rotation[i]) # Save transforms yaml with open(transforms_path, 'w') as transforms_file: yaml.dump(transforms, transforms_file)
[docs]def save_rotated_gggv_data(vehicle_name: str, ggg_data: list, intrinsic_rotation: np.ndarray, velocities: list) -> None: accel_data = np.array(ggg_data) rotation_matrix = R.from_quat([intrinsic_rotation[0], intrinsic_rotation[1], intrinsic_rotation[2], 0.01]).as_matrix() rotated_data = np.zeros(accel_data.shape) for i in range(accel_data.shape[0]): rotated_vector = np.dot(accel_data[i,:], rotation_matrix) rotated_data[i,:] = rotated_vector base_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../../../rosbags/gggv_data/{vehicle_name}' os.makedirs(base_path, exist_ok=True) # file_number = max([-1] + [int(filename.split('gggv_')[1].split('.npz')[0]) for filename in glob.glob(f'{base_path}/gggv_*.npz')]) + 1 file_number = len(glob.glob(f'{base_path}/gggv_*.npz')) np.savez(f'{base_path}/gggv_{file_number}.npz', ax=rotated_data[:,0], ay=rotated_data[:,1], az=rotated_data[:,2], v=np.array(velocities))
[docs]class RosbagAnalyzer(): def __init__(self, input_rosbag: str, time_delay: bool, actuator_delay: bool, imu_tilt: bool, parameter_dump: bool, calibration_matrices: bool, gggv_dump: bool, vehicle_name: str, start: float, duration: Optional[float], end: Optional[float]): # Raise error if no analyzing parameter is set if not imu_tilt and not time_delay and not actuator_delay and not parameter_dump and not calibration_matrices and not gggv_dump: warnings.warn('No analyzing parameter is set. Nothing will be analyzed.') # Variables needed for class wide usage self.input_rosbag_path = input_rosbag self.input_rosbag = rosbag.Bag(input_rosbag) self.input_rosbag_start = self.input_rosbag.get_start_time() self.start = self.input_rosbag_start + start self.end = self.input_rosbag.get_end_time() self.rosbag_name = Path(input_rosbag).name # Calculate end time if end: self.end = self.input_rosbag_start + end if duration: self.end = min(self.start + duration, self.end) # Variables needed for actuator time delay self.actuator_delay = actuator_delay self.actuator_request_list = [] self.actuator_request_time_list = [] self.actuator_actual_list = [] self.actuator_actual_time_list = [] # Variables needed for IMU tilt self.rotation = None self.started_driving = False self.imu_tilt = imu_tilt self.imu_tilt_data = [] # variables needed for parameter and calibration matrices self.parameter_dump = parameter_dump self.extrinsic_matrix_left = np.array([]) self.extrinsic_matrix_right = np.array([]) self.intrinsic_matrix_left = np.array([]) self.intrinsic_matrix_right = np.array([]) self.calibration_matrices = calibration_matrices # variables needed for ggv dump self.gggv_dump = gggv_dump self.accel_data = [] self.velocities = [] if self.gggv_dump or self.imu_tilt: if vehicle_name == "": self.vehicle_name = click.prompt('Please enter the vehicle name') else: self.vehicle_name = vehicle_name if self.vehicle_name not in ['eva', 'emma', 'rennate']: raise ValueError(f"Vehicle name {self.vehicle_name} not supported.") if self.gggv_dump: vehicle_config_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../config/{self.vehicle_name}.yaml' try: with open(vehicle_config_path, 'r') as config_file: vehicle_config: dict = yaml.load(config_file, Loader=yaml.SafeLoader) self.wheel_radius = vehicle_config['vehicle']['wheel_radius'] self.gear_ratio = vehicle_config['vehicle']['rear_gear_ratio'] except FileNotFoundError: print(f"The file {vehicle_config_path} does not exist.") except yaml.YAMLError as exc: print(f"Error parsing YAML file: {exc}") # Variables needed for time delay self.time_delay = time_delay self.time_delay_list = [] self.time_delay_config = { '/can/log/wheelspeed/rear': [ ('receive_time', 'header') ], '/slam/vehicle_pose': [ ('receive_time', 'header') ], '/zed2i/zed_node/left/image_rect_color': [ ('receive_time', 'header') ], '/zed2i/zed_node/right/image_rect_color': [ ('receive_time', 'header') ], '/zed2i/zed_node/left/image_rect_color/compressed': [ ('receive_time', 'header') ], '/zed2i/zed_node/right/image_rect_color/compressed': [ ('receive_time', 'header') ], '/zed2i/zed_node/left/image_rect_color/qoi': [ ('receive_time', 'header') ], '/zed2i/zed_node/right/image_rect_color/qoi': [ ('receive_time', 'header') ], '/perception/bounding_boxes': [ ('header', 'image_header') ], '/local_mapping/cone_positions': [ ('header', 'image_header') ], '/slam/landmarks': [ ('receive_time', 'header') ], '/filtering/centerpoints': [ ('header', 'image_header'), ('header', 'vehicle_pose.header'), ], '/motion_planning/trajectory': [ ('receive_time', 'pose_header'), ('receive_time', 'image_header') ], '/control/predicted_states': [ ('header', 'pose_header'), ('header', 'image_header') ] }
[docs] def process(self, topic: str, msg: Any, t: rospy.Time): if self.time_delay: for end_time, start_time in self.time_delay_config.get(topic, {}): try: if end_time == 'receive_time': end: rospy.Time = t else: end: rospy.Time = rgetattr(msg, f'{end_time}.stamp') if start_time == 'receive_time': start: rospy.Time = t else: start: rospy.Time = rgetattr(msg, f'{start_time}.stamp') delay = end.to_sec() - start.to_sec() self.time_delay_list.append((topic, delay, f'{topic}: {start_time} -> {end_time}')) except Exception: click.echo(f"Error while processing end or start time for topic: {topic}") if self.actuator_delay: if topic == '/control/steering_wheel_angle/request': try: if hasattr(msg, 'angle'): self.actuator_request_list.append(getattr(msg, 'angle')) self.actuator_request_time_list.append(msg.header.stamp.to_sec() - self.input_rosbag_start) except Exception: click.echo(f"Error while loading topic: {topic}") if topic == '/can/per/steering_wheel_angle/actual': try: if hasattr(msg, 'angle'): self.actuator_actual_list.append(getattr(msg, 'angle')) self.actuator_actual_time_list.append(msg.header.stamp.to_sec() - self.input_rosbag_start) except Exception: click.echo(f"Error while loading topic: {topic}") if self.imu_tilt and not self.started_driving: if topic == '/can/per/wheelspeed/front': try: if hasattr(msg, 'right'): speed = getattr(msg, 'right') self.started_driving = speed > 0 except Exception: click.echo(f"Error while loading topic: {topic}") if topic == '/can/per/imu/accel': try: if hasattr(msg, 'x') and hasattr(msg, 'y') and hasattr(msg, 'z'): x, y, z = getattr(msg, 'x'), getattr(msg, 'y'), getattr(msg, 'z') acc = [x, y, z] self.imu_tilt_data.append(acc) except Exception: click.echo(f"Error while loading topic: {topic}") if self.parameter_dump: if topic == '/parameters': try: if hasattr(msg, 'data'): self.parameters_string = getattr(msg, 'data') except Exception: click.echo(f"Error while loading topic: {topic}") if self.calibration_matrices: if topic == '/calibration_matrices': try: if hasattr(msg, 'extrinsic_matrix_left'): self.extrinsic_matrix_left = np.array([getattr(msg, 'extrinsic_matrix_left')]).reshape(4,4) if hasattr(msg, 'extrinsic_matrix_right'): self.extrinsic_matrix_right = np.array([getattr(msg, 'extrinsic_matrix_right')]).reshape(4,4) if hasattr(msg, 'intrinsic_matrix_left'): self.intrinsic_matrix_left = np.array([getattr(msg, 'intrinsic_matrix_left')]).reshape(3,3) if hasattr(msg, 'intrinsic_matrix_right'): self.intrinsic_matrix_right = np.array([getattr(msg, 'intrinsic_matrix_right')]).reshape(3,3) except Exception: click.echo(f"Error while loading topic: {topic}") if self.gggv_dump: if topic == '/can/per/wheelspeed/front': try: if hasattr(msg, 'right') or hasattr(msg, 'left'): n_right = getattr(msg, 'right') n_left = getattr(msg, 'left') n_front = (abs(n_right) + abs(n_left)) / 2 v = (2 * n_front * np.pi * self.wheel_radius) / 60 self.velocities.append(v) except Exception: click.echo(f"Error while loading topic: {topic}") if topic == '/can/per/imu/accel': try: if hasattr(msg, 'x') and hasattr(msg, 'y') and hasattr(msg, 'z'): x, y, z = getattr(msg, 'x'), getattr(msg, 'y'), getattr(msg, 'z') acc = [x, y, z] self.accel_data.append(acc) except Exception: click.echo(f"Error while loading topic: {topic}")
[docs] def run(self): with progressbar.ProgressBar(max_value=self.input_rosbag.get_message_count()) as bar: bar.start() for topic, msg, t in self.input_rosbag.read_messages(): if t.to_sec() < self.start: bar.max_value -= 1 bar.update(bar.value) continue if t.to_sec() > self.end: break self.process(topic=topic, msg=msg, t=t) bar.update(bar.value + 1) bar.finish()
[docs] def plot_time_delay(self): fig = go.Figure() names = self.time_delay_df['name'].unique() means = [self.time_delay_df[self.time_delay_df['name'] == name]['delay'].median() for name in names] for name in names[np.argsort(means)]: sub_df = self.time_delay_df[self.time_delay_df['name'] == name] fig.add_trace(go.Box( y=sub_df['delay'], name=name, boxmean='sd' )) base_path = f'{self.input_rosbag_path.rsplit(".", 1)[0]}' Path(base_path).mkdir(parents=True, exist_ok=True) fig.write_html(f'{base_path}/time_delay.html') plt.rcParams.update({'font.size': 10}) fig = plt.figure(figsize=(6, 2), dpi=600) ax = fig.add_subplot(111) ax.set_title(f'Median Time Delay Distribution for {self.rosbag_name.split(".")[0]}') names = ['/zed2i/zed_node/left/image_rect_color/compressed: header -> receive_time', # '/zed2i/zed_node/right/image_rect_color: header -> receive_time', # noqa: E800 '/perception/bounding_boxes: image_header -> header', '/local_mapping/cone_positions: image_header -> header', '/slam/landmarks: header -> receive_time', '/filtering/centerpoints: image_header -> header', '/motion_planning/trajectory: image_header -> receive_time', '/control/predicted_states: image_header -> header'] labels = ['Camera Wrapper', 'Inference', 'Local Mapping', 'SLAM', 'Filtering', 'Motion Planning', 'Control'] colors = ['blue', 'orange', 'green', 'red', 'purple', 'brown', 'pink'] left = 0 used_labels = [] for name, label, color in zip(names, labels, colors): # Calculate the median and quartiles median = self.time_delay_df[self.time_delay_df['name'] == name]['delay'].median() q1_val = self.time_delay_df[self.time_delay_df['name'] == name]['delay'].quantile(0.25) q3_val = self.time_delay_df[self.time_delay_df['name'] == name]['delay'].quantile(0.75) if np.isfinite(median): # Calculate the error bars error1 = median - q1_val error2 = q3_val - median # Plot the bar width = median - left ax.barh([-len(used_labels)], [median], height=0.05, color='grey') bar = ax.barh([-len(used_labels)], [width], left=left, height=1, color=color, xerr=np.array(error1, error2).T, capsize=5) # Update the left position left += width used_labels.append(label) ax.bar_label(bar, fmt='%1.3f', padding=3, labels=[f'{width:0.3f}']) ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.spines['left'].set_visible(False) ax.set_yticks(-np.arange(len(used_labels))) ax.set_yticklabels(used_labels) ax.tick_params( axis='y', # changes apply to the x-axis which='both', # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, length=0) # labels along the bottom edge are off base_path = f'{self.input_rosbag_path.rsplit(".", 1)[0]}' Path(base_path).mkdir(parents=True, exist_ok=True) fig.savefig(f'{base_path}/time_delay_distribution.pdf', bbox_inches='tight')
[docs] def plot_actuator_delay(self): fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6)) ax1.plot(self.sampling_time_array, self.actuator_request_corrected_array, label='requested angle') ax1.plot(self.sampling_time_array, self.actuator_actural_corrected_array, label='actual angle') ax1.set_xlabel('timestamp') ax1.set_ylabel('angle') ax1.set_title(f'requested and actual steering wheel angle') ax2.plot(self.actuator_cross_cov_time, self.actuator_cross_covariance) ax2.set_xlabel(f'timestamp, maximum at {self.time_max_cross_cov} ms') ax2.set_ylabel('cross covariance') ax2.set_title('cross covariance of actuator delay') plt.tight_layout() base_path = f'{self.input_rosbag_path.rsplit(".", 1)[0]}' Path(base_path).mkdir(parents=True, exist_ok=True) fig.savefig(f'{base_path}/actuator_delay.pdf', bbox_inches='tight') data_for_csv = pd.DataFrame({'Rosbag': [self.input_rosbag_path], 'Start': [self.start - self.input_rosbag_start], \ 'End': [self.end - self.input_rosbag_start], 'delay': [self.time_max_cross_cov]}) data_for_csv.to_csv(f'{base_path}/../cross_cov_max.csv', mode='a', header=False, index=False)
[docs] def plot(self): if self.time_delay: self.plot_time_delay() if self.actuator_delay: self.plot_actuator_delay()
[docs] def calculate_acc_rotation_object(self) -> None: """ Calculate rotation object for accelerometer. Uses the mean of the acceleration while standing still to calculate the gravitational acceleration. """ # Calculate index index = int(0.95 * len(self.imu_tilt_data)) # 95% of the data to be sure to be standing still # what the imu measured where g points to measured_g = np.nanmean(self.imu_tilt_data[: index], axis=0) dest_vec = np.array([0, 0, -1]) # g should just point downwards # https://stackoverflow.com/a/59204638 # Calculate rotation matrix to rotate measure g to dest_vector a, b = (measured_g / np.linalg.norm(measured_g)).reshape(3), (dest_vec / np.linalg.norm(dest_vec)).reshape(3) v = np.cross(a, b) c = np.dot(a, b) s = np.linalg.norm(v) kmat = np.array([[0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0]]) rotation_matrix = np.eye(3) + kmat + kmat.dot(kmat) * ((1 - c) / (s ** 2)) self.rotation = R.from_matrix(rotation_matrix)
[docs] def finish(self): if self.time_delay: self.time_delay_df = pd.DataFrame(self.time_delay_list, columns=['topic', 'delay', 'name']) if self.actuator_delay: actuator_request_array = np.array(self.actuator_request_list) actuator_request_time_array = np.array(self.actuator_request_time_list) actuator_actual_array = np.array(self.actuator_actual_list) actuator_actual_time_array = np.array(self.actuator_actual_time_list) first_time_index = np.argmax(actuator_actual_time_array > actuator_request_time_array[0]) last_time_index = np.argmax(actuator_actual_time_array > actuator_request_time_array[-1]) actuator_actual_time_sliced_array = actuator_actual_time_array[first_time_index + 1:last_time_index - 1] actuator_actual_sliced_array = actuator_actual_array[first_time_index + 1:last_time_index - 1] actuator_request_lut = interp1d(actuator_request_time_array, actuator_request_array) actuator_actual_lut = interp1d(actuator_actual_time_sliced_array, actuator_actual_sliced_array) sampling_dt = 0.002 n_samples_over_time_frame = \ int(round((actuator_actual_time_sliced_array[-1] - actuator_actual_time_sliced_array[0]) / sampling_dt)) self.sampling_time_array = np.linspace(actuator_actual_time_sliced_array[0], \ actuator_actual_time_sliced_array[-1], \ n_samples_over_time_frame) self.actuator_request_corrected_array = actuator_request_lut(self.sampling_time_array) self.actuator_actural_corrected_array = actuator_actual_lut(self.sampling_time_array) self.actuator_cross_covariance = \ np.correlate(self.actuator_request_corrected_array - np.mean(self.actuator_request_corrected_array), \ self.actuator_actural_corrected_array - np.mean(self.actuator_actural_corrected_array), mode='full') self.actuator_cross_cov_time = np.arange(-len(self.sampling_time_array) + 1, len(self.sampling_time_array)) \ * sampling_dt self.time_max_cross_cov = self.actuator_cross_cov_time[np.argmax(self.actuator_cross_covariance)] * 1000 if self.parameter_dump: parameters_dict = yaml.safe_load(self.parameters_string) restructured_parameters_dict = {} for key, value in parameters_dict.items(): current_dict = restructured_parameters_dict keys = key.split("/") for k in keys[1:-1]: if k not in current_dict: current_dict[k] = {} current_dict = current_dict[k] current_dict[keys[-1]] = value parameter_path = '/workspace/as_ros/rosbags/parameters' Path(f'{parameter_path}').mkdir(parents=True, exist_ok=True) with open(f'{parameter_path}/{self.rosbag_name.split(".")[0]}.yaml', 'w') as parameter_file: yaml.dump(restructured_parameters_dict, parameter_file, default_flow_style=False) if self.calibration_matrices: matrices_path = f'/workspace/as_ros/src/local_mapping/scripts/matrices/{self.rosbag_name.split(".")[0]}' Path(f'{matrices_path}').mkdir(parents=True, exist_ok=True) np.save(f'{matrices_path}/extrinsic_matrix_left.npy', self.extrinsic_matrix_left) np.save(f'{matrices_path}/extrinsic_matrix_right.npy', self.extrinsic_matrix_right) np.save(f'{matrices_path}/intrinsic_matrix_left.npy', self.intrinsic_matrix_left) np.save(f'{matrices_path}/intrinsic_matrix_right.npy', self.intrinsic_matrix_right) self.plot() if self.imu_tilt: # Calculate Rotation self.calculate_acc_rotation_object() # Extrinsic rotation extrinsic_rotation = self.rotation.as_euler('xyz', degrees=True) # Intrinsic rotation intrinsic_rotation = self.rotation.as_euler('XYZ', degrees=True) # Ask user if values look good and save them click.echo(f"\nExtrinsic rotation: x:{extrinsic_rotation[0]}, y:{extrinsic_rotation[1]}, z:{extrinsic_rotation[2]}") click.echo(f"Intrinsic rotation (will be saved): X:{intrinsic_rotation[0]}, Y:{intrinsic_rotation[1]}, Z:{intrinsic_rotation[2]}") answer = click.prompt('Do the values look good?', type=click.Choice(['y', 'n']), show_choices=True) if answer == 'y': save_intrinsic_rotation(self.vehicle_name, intrinsic_rotation) click.echo(f"Saving imu tilt values for vehicle: {self.vehicle_name}") elif answer == 'n': click.echo("Values not saved") if self.gggv_dump: transforms_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../config/transforms/{self.vehicle_name}.yaml' try: with open(transforms_path, 'r') as transforms_file: transforms: dict = yaml.load(transforms_file, Loader=yaml.SafeLoader) # read imu rotation from transforms yaml intrinsic_rotation = np.zeros(3) for i, letter in enumerate(("x", "y", "z")): intrinsic_rotation[i] = transforms['tf']['vehicle']["imu"]["rotation"][letter] except FileNotFoundError: print(f"The file {transforms_path} does not exist.") except yaml.YAMLError as exc: print(f"Error parsing YAML file: {exc}") save_rotated_gggv_data(self.vehicle_name, self.accel_data, intrinsic_rotation, self.velocities) click.echo(f"\n Saved gggv values for vehicle: {self.vehicle_name}")
if __name__ == '__main__': analyzer = RosbagAnalyzer(input_rosbag='/workspace/as_ros/rosbags/rosbag_name.bag', start=0, duration=None, end=None, time_delay=False, actuator_delay=False, imu_tilt=False) analyzer.run() analyzer.finish()