import os
import glob
import warnings
from typing import Optional, Any
import functools
from pathlib import Path
import click
import progressbar
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from ruamel.yaml import YAML
import yaml
from scipy.interpolate import interp1d
from scipy.spatial.transform import Rotation as R
import rosbag
import rospy
[docs]def rgetattr(obj, attr, *args):
def _getattr(obj, attr):
return getattr(obj, attr, *args)
return functools.reduce(_getattr, [obj] + attr.split('.'))
[docs]def save_intrinsic_rotation(vehicle_name: str, intrinsic_rotation: np.ndarray) -> None:
"""
Save intrinsic rotation to correct transforms yaml.
"""
yaml = YAML()
yaml.preserve_quotes = True
# Load transforms yaml
transforms_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../config/transforms/{vehicle_name}.yaml'
with open(transforms_path, 'r') as transforms_file:
transforms: dict = yaml.load(transforms_file)
# Update transforms yaml
for i, letter in enumerate(("x", "y", "z")):
transforms['tf']['vehicle']["imu"]["rotation"][letter] = float(intrinsic_rotation[i])
# Save transforms yaml
with open(transforms_path, 'w') as transforms_file:
yaml.dump(transforms, transforms_file)
[docs]def save_rotated_gggv_data(vehicle_name: str, ggg_data: list, intrinsic_rotation: np.ndarray, velocities: list) -> None:
accel_data = np.array(ggg_data)
rotation_matrix = R.from_quat([intrinsic_rotation[0], intrinsic_rotation[1], intrinsic_rotation[2], 0.01]).as_matrix()
rotated_data = np.zeros(accel_data.shape)
for i in range(accel_data.shape[0]):
rotated_vector = np.dot(accel_data[i,:], rotation_matrix)
rotated_data[i,:] = rotated_vector
base_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../../../rosbags/gggv_data/{vehicle_name}'
os.makedirs(base_path, exist_ok=True)
# file_number = max([-1] + [int(filename.split('gggv_')[1].split('.npz')[0]) for filename in glob.glob(f'{base_path}/gggv_*.npz')]) + 1
file_number = len(glob.glob(f'{base_path}/gggv_*.npz'))
np.savez(f'{base_path}/gggv_{file_number}.npz',
ax=rotated_data[:,0], ay=rotated_data[:,1], az=rotated_data[:,2], v=np.array(velocities))
[docs]class RosbagAnalyzer():
def __init__(self, input_rosbag: str, time_delay: bool, actuator_delay: bool, imu_tilt: bool,
parameter_dump: bool, calibration_matrices: bool, gggv_dump: bool, vehicle_name: str,
start: float, duration: Optional[float], end: Optional[float]):
# Raise error if no analyzing parameter is set
if not imu_tilt and not time_delay and not actuator_delay and not parameter_dump and not calibration_matrices and not gggv_dump:
warnings.warn('No analyzing parameter is set. Nothing will be analyzed.')
# Variables needed for class wide usage
self.input_rosbag_path = input_rosbag
self.input_rosbag = rosbag.Bag(input_rosbag)
self.input_rosbag_start = self.input_rosbag.get_start_time()
self.start = self.input_rosbag_start + start
self.end = self.input_rosbag.get_end_time()
self.rosbag_name = Path(input_rosbag).name
# Calculate end time
if end:
self.end = self.input_rosbag_start + end
if duration:
self.end = min(self.start + duration, self.end)
# Variables needed for actuator time delay
self.actuator_delay = actuator_delay
self.actuator_request_list = []
self.actuator_request_time_list = []
self.actuator_actual_list = []
self.actuator_actual_time_list = []
# Variables needed for IMU tilt
self.rotation = None
self.started_driving = False
self.imu_tilt = imu_tilt
self.imu_tilt_data = []
# variables needed for parameter and calibration matrices
self.parameter_dump = parameter_dump
self.extrinsic_matrix_left = np.array([])
self.extrinsic_matrix_right = np.array([])
self.intrinsic_matrix_left = np.array([])
self.intrinsic_matrix_right = np.array([])
self.calibration_matrices = calibration_matrices
# variables needed for ggv dump
self.gggv_dump = gggv_dump
self.accel_data = []
self.velocities = []
if self.gggv_dump or self.imu_tilt:
if vehicle_name == "":
self.vehicle_name = click.prompt('Please enter the vehicle name')
else:
self.vehicle_name = vehicle_name
if self.vehicle_name not in ['eva', 'emma', 'rennate']:
raise ValueError(f"Vehicle name {self.vehicle_name} not supported.")
if self.gggv_dump:
vehicle_config_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../config/{self.vehicle_name}.yaml'
try:
with open(vehicle_config_path, 'r') as config_file:
vehicle_config: dict = yaml.load(config_file, Loader=yaml.SafeLoader)
self.wheel_radius = vehicle_config['vehicle']['wheel_radius']
self.gear_ratio = vehicle_config['vehicle']['rear_gear_ratio']
except FileNotFoundError:
print(f"The file {vehicle_config_path} does not exist.")
except yaml.YAMLError as exc:
print(f"Error parsing YAML file: {exc}")
# Variables needed for time delay
self.time_delay = time_delay
self.time_delay_list = []
self.time_delay_config = {
'/can/log/wheelspeed/rear': [
('receive_time', 'header')
],
'/slam/vehicle_pose': [
('receive_time', 'header')
],
'/zed2i/zed_node/left/image_rect_color': [
('receive_time', 'header')
],
'/zed2i/zed_node/right/image_rect_color': [
('receive_time', 'header')
],
'/zed2i/zed_node/left/image_rect_color/compressed': [
('receive_time', 'header')
],
'/zed2i/zed_node/right/image_rect_color/compressed': [
('receive_time', 'header')
],
'/zed2i/zed_node/left/image_rect_color/qoi': [
('receive_time', 'header')
],
'/zed2i/zed_node/right/image_rect_color/qoi': [
('receive_time', 'header')
],
'/perception/bounding_boxes': [
('header', 'image_header')
],
'/local_mapping/cone_positions': [
('header', 'image_header')
],
'/slam/landmarks': [
('receive_time', 'header')
],
'/filtering/centerpoints': [
('header', 'image_header'),
('header', 'vehicle_pose.header'),
],
'/motion_planning/trajectory': [
('receive_time', 'pose_header'),
('receive_time', 'image_header')
],
'/control/predicted_states': [
('header', 'pose_header'),
('header', 'image_header')
]
}
[docs] def process(self, topic: str, msg: Any, t: rospy.Time):
if self.time_delay:
for end_time, start_time in self.time_delay_config.get(topic, {}):
try:
if end_time == 'receive_time':
end: rospy.Time = t
else:
end: rospy.Time = rgetattr(msg, f'{end_time}.stamp')
if start_time == 'receive_time':
start: rospy.Time = t
else:
start: rospy.Time = rgetattr(msg, f'{start_time}.stamp')
delay = end.to_sec() - start.to_sec()
self.time_delay_list.append((topic, delay, f'{topic}: {start_time} -> {end_time}'))
except Exception:
click.echo(f"Error while processing end or start time for topic: {topic}")
if self.actuator_delay:
if topic == '/control/steering_wheel_angle/request':
try:
if hasattr(msg, 'angle'):
self.actuator_request_list.append(getattr(msg, 'angle'))
self.actuator_request_time_list.append(msg.header.stamp.to_sec() - self.input_rosbag_start)
except Exception:
click.echo(f"Error while loading topic: {topic}")
if topic == '/can/per/steering_wheel_angle/actual':
try:
if hasattr(msg, 'angle'):
self.actuator_actual_list.append(getattr(msg, 'angle'))
self.actuator_actual_time_list.append(msg.header.stamp.to_sec() - self.input_rosbag_start)
except Exception:
click.echo(f"Error while loading topic: {topic}")
if self.imu_tilt and not self.started_driving:
if topic == '/can/per/wheelspeed/front':
try:
if hasattr(msg, 'right'):
speed = getattr(msg, 'right')
self.started_driving = speed > 0
except Exception:
click.echo(f"Error while loading topic: {topic}")
if topic == '/can/per/imu/accel':
try:
if hasattr(msg, 'x') and hasattr(msg, 'y') and hasattr(msg, 'z'):
x, y, z = getattr(msg, 'x'), getattr(msg, 'y'), getattr(msg, 'z')
acc = [x, y, z]
self.imu_tilt_data.append(acc)
except Exception:
click.echo(f"Error while loading topic: {topic}")
if self.parameter_dump:
if topic == '/parameters':
try:
if hasattr(msg, 'data'):
self.parameters_string = getattr(msg, 'data')
except Exception:
click.echo(f"Error while loading topic: {topic}")
if self.calibration_matrices:
if topic == '/calibration_matrices':
try:
if hasattr(msg, 'extrinsic_matrix_left'):
self.extrinsic_matrix_left = np.array([getattr(msg, 'extrinsic_matrix_left')]).reshape(4,4)
if hasattr(msg, 'extrinsic_matrix_right'):
self.extrinsic_matrix_right = np.array([getattr(msg, 'extrinsic_matrix_right')]).reshape(4,4)
if hasattr(msg, 'intrinsic_matrix_left'):
self.intrinsic_matrix_left = np.array([getattr(msg, 'intrinsic_matrix_left')]).reshape(3,3)
if hasattr(msg, 'intrinsic_matrix_right'):
self.intrinsic_matrix_right = np.array([getattr(msg, 'intrinsic_matrix_right')]).reshape(3,3)
except Exception:
click.echo(f"Error while loading topic: {topic}")
if self.gggv_dump:
if topic == '/can/per/wheelspeed/front':
try:
if hasattr(msg, 'right') or hasattr(msg, 'left'):
n_right = getattr(msg, 'right')
n_left = getattr(msg, 'left')
n_front = (abs(n_right) + abs(n_left)) / 2
v = (2 * n_front * np.pi * self.wheel_radius) / 60
self.velocities.append(v)
except Exception:
click.echo(f"Error while loading topic: {topic}")
if topic == '/can/per/imu/accel':
try:
if hasattr(msg, 'x') and hasattr(msg, 'y') and hasattr(msg, 'z'):
x, y, z = getattr(msg, 'x'), getattr(msg, 'y'), getattr(msg, 'z')
acc = [x, y, z]
self.accel_data.append(acc)
except Exception:
click.echo(f"Error while loading topic: {topic}")
[docs] def run(self):
with progressbar.ProgressBar(max_value=self.input_rosbag.get_message_count()) as bar:
bar.start()
for topic, msg, t in self.input_rosbag.read_messages():
if t.to_sec() < self.start:
bar.max_value -= 1
bar.update(bar.value)
continue
if t.to_sec() > self.end:
break
self.process(topic=topic, msg=msg, t=t)
bar.update(bar.value + 1)
bar.finish()
[docs] def plot_time_delay(self):
fig = go.Figure()
names = self.time_delay_df['name'].unique()
means = [self.time_delay_df[self.time_delay_df['name'] == name]['delay'].median() for name in names]
for name in names[np.argsort(means)]:
sub_df = self.time_delay_df[self.time_delay_df['name'] == name]
fig.add_trace(go.Box(
y=sub_df['delay'],
name=name,
boxmean='sd'
))
base_path = f'{self.input_rosbag_path.rsplit(".", 1)[0]}'
Path(base_path).mkdir(parents=True, exist_ok=True)
fig.write_html(f'{base_path}/time_delay.html')
plt.rcParams.update({'font.size': 10})
fig = plt.figure(figsize=(6, 2), dpi=600)
ax = fig.add_subplot(111)
ax.set_title(f'Median Time Delay Distribution for {self.rosbag_name.split(".")[0]}')
names = ['/zed2i/zed_node/left/image_rect_color/compressed: header -> receive_time',
# '/zed2i/zed_node/right/image_rect_color: header -> receive_time', # noqa: E800
'/perception/bounding_boxes: image_header -> header',
'/local_mapping/cone_positions: image_header -> header',
'/slam/landmarks: header -> receive_time',
'/filtering/centerpoints: image_header -> header',
'/motion_planning/trajectory: image_header -> receive_time',
'/control/predicted_states: image_header -> header']
labels = ['Camera Wrapper', 'Inference', 'Local Mapping', 'SLAM', 'Filtering', 'Motion Planning', 'Control']
colors = ['blue', 'orange', 'green', 'red', 'purple', 'brown', 'pink']
left = 0
used_labels = []
for name, label, color in zip(names, labels, colors):
# Calculate the median and quartiles
median = self.time_delay_df[self.time_delay_df['name'] == name]['delay'].median()
q1_val = self.time_delay_df[self.time_delay_df['name'] == name]['delay'].quantile(0.25)
q3_val = self.time_delay_df[self.time_delay_df['name'] == name]['delay'].quantile(0.75)
if np.isfinite(median):
# Calculate the error bars
error1 = median - q1_val
error2 = q3_val - median
# Plot the bar
width = median - left
ax.barh([-len(used_labels)], [median], height=0.05, color='grey')
bar = ax.barh([-len(used_labels)], [width], left=left, height=1,
color=color, xerr=np.array(error1, error2).T, capsize=5)
# Update the left position
left += width
used_labels.append(label)
ax.bar_label(bar, fmt='%1.3f', padding=3, labels=[f'{width:0.3f}'])
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.set_yticks(-np.arange(len(used_labels)))
ax.set_yticklabels(used_labels)
ax.tick_params(
axis='y', # changes apply to the x-axis
which='both', # both major and minor ticks are affected
bottom=False, # ticks along the bottom edge are off
top=False, # ticks along the top edge are off
labelbottom=False,
length=0) # labels along the bottom edge are off
base_path = f'{self.input_rosbag_path.rsplit(".", 1)[0]}'
Path(base_path).mkdir(parents=True, exist_ok=True)
fig.savefig(f'{base_path}/time_delay_distribution.pdf', bbox_inches='tight')
[docs] def plot_actuator_delay(self):
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6))
ax1.plot(self.sampling_time_array, self.actuator_request_corrected_array, label='requested angle')
ax1.plot(self.sampling_time_array, self.actuator_actural_corrected_array, label='actual angle')
ax1.set_xlabel('timestamp')
ax1.set_ylabel('angle')
ax1.set_title(f'requested and actual steering wheel angle')
ax2.plot(self.actuator_cross_cov_time, self.actuator_cross_covariance)
ax2.set_xlabel(f'timestamp, maximum at {self.time_max_cross_cov} ms')
ax2.set_ylabel('cross covariance')
ax2.set_title('cross covariance of actuator delay')
plt.tight_layout()
base_path = f'{self.input_rosbag_path.rsplit(".", 1)[0]}'
Path(base_path).mkdir(parents=True, exist_ok=True)
fig.savefig(f'{base_path}/actuator_delay.pdf',
bbox_inches='tight')
data_for_csv = pd.DataFrame({'Rosbag': [self.input_rosbag_path], 'Start': [self.start - self.input_rosbag_start], \
'End': [self.end - self.input_rosbag_start], 'delay': [self.time_max_cross_cov]})
data_for_csv.to_csv(f'{base_path}/../cross_cov_max.csv', mode='a', header=False, index=False)
[docs] def plot(self):
if self.time_delay:
self.plot_time_delay()
if self.actuator_delay:
self.plot_actuator_delay()
[docs] def calculate_acc_rotation_object(self) -> None:
"""
Calculate rotation object for accelerometer.
Uses the mean of the acceleration while standing still to calculate the gravitational acceleration.
"""
# Calculate index
index = int(0.95 * len(self.imu_tilt_data)) # 95% of the data to be sure to be standing still
# what the imu measured where g points to
measured_g = np.nanmean(self.imu_tilt_data[: index], axis=0)
dest_vec = np.array([0, 0, -1]) # g should just point downwards
# https://stackoverflow.com/a/59204638
# Calculate rotation matrix to rotate measure g to dest_vector
a, b = (measured_g / np.linalg.norm(measured_g)).reshape(3), (dest_vec / np.linalg.norm(dest_vec)).reshape(3)
v = np.cross(a, b)
c = np.dot(a, b)
s = np.linalg.norm(v)
kmat = np.array([[0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0]])
rotation_matrix = np.eye(3) + kmat + kmat.dot(kmat) * ((1 - c) / (s ** 2))
self.rotation = R.from_matrix(rotation_matrix)
[docs] def finish(self):
if self.time_delay:
self.time_delay_df = pd.DataFrame(self.time_delay_list, columns=['topic', 'delay', 'name'])
if self.actuator_delay:
actuator_request_array = np.array(self.actuator_request_list)
actuator_request_time_array = np.array(self.actuator_request_time_list)
actuator_actual_array = np.array(self.actuator_actual_list)
actuator_actual_time_array = np.array(self.actuator_actual_time_list)
first_time_index = np.argmax(actuator_actual_time_array > actuator_request_time_array[0])
last_time_index = np.argmax(actuator_actual_time_array > actuator_request_time_array[-1])
actuator_actual_time_sliced_array = actuator_actual_time_array[first_time_index + 1:last_time_index - 1]
actuator_actual_sliced_array = actuator_actual_array[first_time_index + 1:last_time_index - 1]
actuator_request_lut = interp1d(actuator_request_time_array, actuator_request_array)
actuator_actual_lut = interp1d(actuator_actual_time_sliced_array, actuator_actual_sliced_array)
sampling_dt = 0.002
n_samples_over_time_frame = \
int(round((actuator_actual_time_sliced_array[-1] - actuator_actual_time_sliced_array[0]) / sampling_dt))
self.sampling_time_array = np.linspace(actuator_actual_time_sliced_array[0], \
actuator_actual_time_sliced_array[-1], \
n_samples_over_time_frame)
self.actuator_request_corrected_array = actuator_request_lut(self.sampling_time_array)
self.actuator_actural_corrected_array = actuator_actual_lut(self.sampling_time_array)
self.actuator_cross_covariance = \
np.correlate(self.actuator_request_corrected_array - np.mean(self.actuator_request_corrected_array), \
self.actuator_actural_corrected_array - np.mean(self.actuator_actural_corrected_array), mode='full')
self.actuator_cross_cov_time = np.arange(-len(self.sampling_time_array) + 1, len(self.sampling_time_array)) \
* sampling_dt
self.time_max_cross_cov = self.actuator_cross_cov_time[np.argmax(self.actuator_cross_covariance)] * 1000
if self.parameter_dump:
parameters_dict = yaml.safe_load(self.parameters_string)
restructured_parameters_dict = {}
for key, value in parameters_dict.items():
current_dict = restructured_parameters_dict
keys = key.split("/")
for k in keys[1:-1]:
if k not in current_dict:
current_dict[k] = {}
current_dict = current_dict[k]
current_dict[keys[-1]] = value
parameter_path = '/workspace/as_ros/rosbags/parameters'
Path(f'{parameter_path}').mkdir(parents=True, exist_ok=True)
with open(f'{parameter_path}/{self.rosbag_name.split(".")[0]}.yaml', 'w') as parameter_file:
yaml.dump(restructured_parameters_dict, parameter_file, default_flow_style=False)
if self.calibration_matrices:
matrices_path = f'/workspace/as_ros/src/local_mapping/scripts/matrices/{self.rosbag_name.split(".")[0]}'
Path(f'{matrices_path}').mkdir(parents=True, exist_ok=True)
np.save(f'{matrices_path}/extrinsic_matrix_left.npy', self.extrinsic_matrix_left)
np.save(f'{matrices_path}/extrinsic_matrix_right.npy', self.extrinsic_matrix_right)
np.save(f'{matrices_path}/intrinsic_matrix_left.npy', self.intrinsic_matrix_left)
np.save(f'{matrices_path}/intrinsic_matrix_right.npy', self.intrinsic_matrix_right)
self.plot()
if self.imu_tilt:
# Calculate Rotation
self.calculate_acc_rotation_object()
# Extrinsic rotation
extrinsic_rotation = self.rotation.as_euler('xyz', degrees=True)
# Intrinsic rotation
intrinsic_rotation = self.rotation.as_euler('XYZ', degrees=True)
# Ask user if values look good and save them
click.echo(f"\nExtrinsic rotation: x:{extrinsic_rotation[0]}, y:{extrinsic_rotation[1]}, z:{extrinsic_rotation[2]}")
click.echo(f"Intrinsic rotation (will be saved): X:{intrinsic_rotation[0]}, Y:{intrinsic_rotation[1]}, Z:{intrinsic_rotation[2]}")
answer = click.prompt('Do the values look good?', type=click.Choice(['y', 'n']), show_choices=True)
if answer == 'y':
save_intrinsic_rotation(self.vehicle_name, intrinsic_rotation)
click.echo(f"Saving imu tilt values for vehicle: {self.vehicle_name}")
elif answer == 'n':
click.echo("Values not saved")
if self.gggv_dump:
transforms_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../config/transforms/{self.vehicle_name}.yaml'
try:
with open(transforms_path, 'r') as transforms_file:
transforms: dict = yaml.load(transforms_file, Loader=yaml.SafeLoader)
# read imu rotation from transforms yaml
intrinsic_rotation = np.zeros(3)
for i, letter in enumerate(("x", "y", "z")):
intrinsic_rotation[i] = transforms['tf']['vehicle']["imu"]["rotation"][letter]
except FileNotFoundError:
print(f"The file {transforms_path} does not exist.")
except yaml.YAMLError as exc:
print(f"Error parsing YAML file: {exc}")
save_rotated_gggv_data(self.vehicle_name, self.accel_data, intrinsic_rotation, self.velocities)
click.echo(f"\n Saved gggv values for vehicle: {self.vehicle_name}")
if __name__ == '__main__':
analyzer = RosbagAnalyzer(input_rosbag='/workspace/as_ros/rosbags/rosbag_name.bag',
start=0, duration=None, end=None, time_delay=False, actuator_delay=False, imu_tilt=False)
analyzer.run()
analyzer.finish()