"""
Implementation of all utility functions needed for the import of data.
"""
from typing import Optional, List
import os.path
import numpy as np
import pandas as pd
from scipy.spatial.transform import Rotation as R
import json
from asammdf import MDF
from bagpy import bagreader
class RosbagAndMDFImporter:
    """
    Helper class to import data from a mdf file and, if specified, also from a bag file.

    Attributes
    ----------
    mdf_filename : Optional[str]
        Filename of the mdf file.
    rotate_acc : bool
        Specifies whether the accelerometer should be rotated, by default True.
    bag_filename : Optional[str]
        Filename of the bag file, specify if you want to merge bag data, by default None.
    time_offset : float
        Time offset between the mdf and bag file. Might need to be adapted for every
        combination, by default 0.
    df : pd.DataFrame
        Dataframe containing the imported data.
    rotation : scipy.spatial.transform.Rotation
        Rotation that maps the mean acceleration measured before the first non-zero
        ``wheelspeed_fr`` sample onto the direction (0, 0, -1).
    """

    # Column names of the ROS header timestamp in the exported topic csv files.
    _TIME_COLS = ('header.stamp.secs', 'header.stamp.nsecs')

    # Names of the three accelerometer columns that get rotated.
    _ACC_COLS = ('linear_acceleration.x', 'linear_acceleration.y', 'linear_acceleration.z')

    # (topic, value columns to keep, column renames), listed in the order in which the
    # per-topic frames are concatenated (this fixes the column order of the result).
    _BAG_TOPICS = (
        ('/local_mapping/cone_positions', ('cones',), {}),
        ('/gps/gps', ('longitude', 'latitude', 'speed', 'track',
                      'position_covariance_0', 'position_covariance_1',
                      'position_covariance_3', 'position_covariance_4'), {}),
        ('/novatel/oem7/heading2', ('heading', 'heading_stdev'), {}),
        ('/wheelspeed/right', ('data1',), {'data1': 'wheelspeed_fr'}),
        ('/wheelspeed/left', ('data1',), {'data1': 'wheelspeed_fl'}),
        ('/engine/rpm', ('data1', 'data2'), {'data1': 'wheelspeed_rl', 'data2': 'wheelspeed_rr'}),
        ('/actc_pos', ('data',), {'data': 'steering_wheel_angle'}),
        ('/imu/gyro', ('angular_velocity.z',), {}),
        ('/imu/accel', _ACC_COLS, {}),
    )

    def __init__(self, mdf_filename: Optional[str] = None, rotate_acc: bool = True,
                 bag_filename: Optional[str] = None, time_offset: float = 0.) -> None:
        """
        Initialize object to import a mdf file and convert it to a pandas dataframe.

        Imports the data, calculates the accelerometer rotation and post-processes the
        imported data. If passed, also imports a bag file for the cone positions.

        Parameters
        ----------
        mdf_filename : Optional[str], optional
            Filename of the mdf file, by default None
        rotate_acc : bool, optional
            Specifies whether the accelerometer should be rotated, by default True
        bag_filename : Optional[str], optional
            Filename of the bag file, specify if you want to merge bag data, by default None
        time_offset : float, optional
            Time offset between the mdf and bag file. Might need to be adapted for every
            combination, by default 0.
        """
        self.mdf_filename = mdf_filename
        self.bag_filename = bag_filename
        self.time_offset = time_offset
        self.rotate_acc = rotate_acc
        self.df = self.import_data()
        # NOTE(review): the two steps below read the columns 'wheelspeed_fr' and
        # 'linear_acceleration.*', which only import_bag() produces; an mdf-only import
        # selects differently named channels - confirm that case is supported.
        self.calculate_acc_rotation_object()
        self.postprocess_data()

    @staticmethod
    def _topic_csv_path(bag_filename: str, topic: str) -> str:
        """Return the csv path this importer expects for ``topic`` next to the bag file."""
        return bag_filename.rsplit('.', 1)[0] + '/' + topic[1:].replace('/', '-') + '.csv'

    @staticmethod
    def _parse_cones(raw: object):
        """
        Convert one raw ``cones`` cell into an array with one row per cone.

        The cell text is rewritten into JSON (a list of objects) by plain string
        replacement. Each row of the result is ``[x / 1000, y / 1000, id, probability]``
        (presumably converting millimetres to metres - TODO confirm). Cells that cannot
        be parsed, or that contain no cone, yield ``np.nan``.
        """
        as_json = (str(raw).replace('[', '[{"').replace(']', '}]')
                   .replace(', ', '},{"').replace('\n', ',"').replace(':', '":'))
        try:
            cones = json.loads(as_json)
        except ValueError:
            # Best effort: missing or malformed cells are treated as "no cones".
            return np.nan
        if len(cones) == 0:
            return np.nan
        return np.array([[cone['x'] / 1000, cone['y'] / 1000, cone['id'], cone['probability']]
                         for cone in cones])

    def import_bag(self, start: Optional[float] = None, end: Optional[float] = None,
                   offset: Optional[float] = None) -> pd.DataFrame:
        """
        Import the bag file and return it as dataframe.

        Every topic is read from its exported csv file; topics without an existing csv
        are exported from the bag first. The bag can be cut so that it matches the mdf
        dataframe.

        Parameters
        ----------
        start : float, optional
            Start timestamp that should be returned, by default None
        end : float, optional
            End timestamp that should be returned, by default None.
            The data is only cut when both ``start`` and ``end`` are given.
        offset : float, optional
            Offset in seconds since 01.01.1970 that is subtracted from the bag timestamps,
            by default None (the smallest bag timestamp is used).

        Returns
        -------
        pd.DataFrame
            Dataframe indexed by time containing the bag signals and the measured cone
            positions in cartesian coordinates.
        """
        assert self.bag_filename is not None, 'bag_filename is required to import a bag file'

        time_cols = list(self._TIME_COLS)
        reader = None  # created lazily and shared by all topics that still need an export
        frames = []
        for topic, columns, renames in self._BAG_TOPICS:
            csv_path = self._topic_csv_path(self.bag_filename, topic)
            if not os.path.isfile(csv_path):
                if reader is None:
                    reader = bagreader(self.bag_filename)
                csv_path = reader.message_by_topic(topic)
            frames.append(pd.read_csv(csv_path)[time_cols + list(columns)].rename(columns=renames))

        bag_df = pd.concat(frames, axis=0)
        bag_df['Time'] = bag_df['header.stamp.secs'] + bag_df['header.stamp.nsecs'] * (10**(-9))
        bag_df = bag_df.drop(time_cols, axis='columns')
        # Merge rows sharing a timestamp; 'Time' becomes the index from here on.
        bag_df = bag_df.groupby('Time').first()

        if offset is None:
            offset = bag_df.index.min()
        bag_df.index = bag_df.index - offset

        if start is not None and end is not None:
            # 'Time' is the index (not a column) after the groupby, so filter on the index.
            in_window = (bag_df.index >= start) & (bag_df.index <= end)
            bag_df = bag_df.loc[in_window].copy()

        bag_df['cones'] = bag_df['cones'].map(self._parse_cones)
        return bag_df.sort_index(ascending=True)

    def import_data(self) -> pd.DataFrame:
        """
        Import the mdf file and optionally an additional bag file and return it as dataframe.

        If the filename of the bag file is specified, the bag file is merged into the mdf
        data. Without a mdf file only the bag file is imported.

        Returns
        -------
        pd.DataFrame
            Imported data as pandas dataframe, sorted by its time index.
        """
        if self.mdf_filename is None:
            assert self.bag_filename is not None, 'either mdf_filename or bag_filename is required'
            return self.import_bag()

        mdf = MDF(self.mdf_filename)
        mdf_filtered = mdf.filter(['Aceinna_AccX', 'Aceinna_AccY', 'Aceinna_AccZ',
                                   'Aceinna_GyroX', 'Aceinna_GyroY', 'Aceinna_GyroZ',
                                   'WFR_WHEELSPEED', 'WFL_WHEELSPEED', 'INVL_N_Actual_Filt',
                                   'INVR_N_Actual_Filt', 'ACTC_POS'])
        mdf_df = mdf_filtered.to_dataframe(raster=None, use_interpolation=False, time_from_zero=False)
        if self.bag_filename is None:
            return mdf_df

        # Shift the bag timestamps by the mdf start time (corrected by the configured
        # time offset) and cut the bag to the time span covered by the mdf data.
        offset = mdf.start_time.timestamp() - self.time_offset
        start, end = mdf_df.index[[0, -1]].to_numpy(dtype=np.float64)
        bag_df = self.import_bag(start=start, end=end, offset=offset)
        return pd.concat([mdf_df, bag_df]).sort_index(ascending=True)

    def calculate_acc_rotation_object(self) -> None:
        """
        Calculate rotation object for accelerometer.

        Uses the mean of the acceleration up to the first non-zero ``wheelspeed_fr``
        sample (i.e. while standing still) as the measured gravitational acceleration and
        stores in ``self.rotation`` the rotation that turns it onto (0, 0, -1).
        """
        # First timestamp with a non-zero wheel speed; idxmax returns the first index
        # label if no sample is non-zero.
        start_ts = self.df['wheelspeed_fr'].dropna().ne(0).idxmax()
        # what the imu measured where g points to
        measured_g = np.nanmean(self.df[list(self._ACC_COLS)].loc[:start_ts].values, axis=0)
        dest_vec = np.array([0, 0, -1])  # g should just point downwards

        # https://stackoverflow.com/a/59204638
        # Calculate rotation matrix to rotate measured g to dest_vec
        a = (measured_g / np.linalg.norm(measured_g)).reshape(3)
        b = (dest_vec / np.linalg.norm(dest_vec)).reshape(3)
        c = np.dot(a, b)  # cosine of the angle between a and b

        if 1.0 + c < 1e-12:
            # a and b are antiparallel: the cross product vanishes and the formula below
            # is undefined. Rotate by 180 degrees about any axis perpendicular to a.
            helper = np.eye(3)[np.argmin(np.abs(a))]
            axis = np.cross(a, helper)
            axis = axis / np.linalg.norm(axis)
            rotation_matrix = 2.0 * np.outer(axis, axis) - np.eye(3)
        else:
            v = np.cross(a, b)
            kmat = np.array([[0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0]])
            # (1 - c) / |v|**2 equals 1 / (1 + c); this form stays finite when a and b
            # are already aligned (|v| == 0) and then yields the identity.
            rotation_matrix = np.eye(3) + kmat + kmat.dot(kmat) / (1.0 + c)
        self.rotation = R.from_matrix(rotation_matrix)

    def postprocess_data(self) -> None:
        """
        Postprocess the imported data.

        At the moment only rotate the accelerometer, and only if ``rotate_acc`` is set.
        """
        if not self.rotate_acc:
            return
        acc_cols = list(self._ACC_COLS)
        self.df.loc[:, acc_cols] = self.rotation.apply(self.df[acc_cols].values)
def import_map_csv(map_csv_filename: str) -> np.ndarray:
    """
    Imports the ground truth of the landmark positions from a csv.

    CSV should be the export of the CURE intern track creator tool.

    Parameters
    ----------
    map_csv_filename : str
        Filename of the map csv, resolved relative to the ``data`` directory located two
        levels above this module. An absolute path is used as-is.

    Returns
    -------
    np.ndarray
        Array holding the rows of the csv (the cone positions of the ground truth),
        as returned by ``DataFrame.to_numpy``.
    """
    module_dir = os.path.abspath(os.path.dirname(__file__))
    csv_path = os.path.join(module_dir, '..', '..', 'data', map_csv_filename)
    return pd.read_csv(csv_path).to_numpy()