Source code for utils.data_import

"""
Implementation of all utility functions needed for the import of data.
"""
import json
import os.path
from typing import List, Optional

import numpy as np
import pandas as pd
from asammdf import MDF
from bagpy import bagreader
from scipy.spatial.transform import Rotation as R

class RosbagAndMDFImporter:
    """
    Helper class to import data from an mdf file and, if specified, also from a bag file.

    Attributes
    ----------
    mdf_filename : str
        Filename of the mdf file.
    rotate_acc : bool
        Specifies whether the accelerometer should be rotated, by default True.
    bag_filename : Optional[str]
        Filename of the bag file, specify if you want to merge bag data, by default None.
    time_offset : float
        Time offset between the mdf and bag file. Might need to be adapted for every combination,
        by default 0.
    df : pd.DataFrame
        Dataframe containing the imported data.
    """

    def __init__(self, mdf_filename: Optional[str] = None, rotate_acc: bool = True,
                 bag_filename: Optional[str] = None, time_offset: float = 0.) -> None:
        """
        Initialize object to import an mdf file and convert it to a pandas dataframe.

        Import the specified columns and subsequently rotate the accelerometer.
        If a bag filename is passed, also import the bag file for the cone positions.

        Parameters
        ----------
        mdf_filename : Optional[str], optional
            Filename of the mdf file, by default None
        rotate_acc : bool, optional
            Specifies whether the accelerometer should be rotated, by default True
        bag_filename : Optional[str], optional
            Filename of the bag file, specify if you want to merge bag data, by default None
        time_offset : float, optional
            Time offset between the mdf and bag file. Might need to be adapted for every
            combination, by default 0.
        """
        self.mdf_filename = mdf_filename
        self.bag_filename = bag_filename
        self.time_offset = time_offset
        self.rotate_acc = rotate_acc

        self.df = self.import_data()
        self.calculate_acc_rotation_object()
        self.postprocess_data()
    def import_bag(self, start: Optional[float] = None, end: Optional[float] = None,
                   offset: Optional[float] = None) -> pd.DataFrame:
        """
        Import the bag file and return it as dataframe.

        The bag data is cut so that it matches the mdf dataframe.

        Parameters
        ----------
        start : float, optional
            Start timestamp that should be returned, by default None
        end : float, optional
            End timestamp that should be returned, by default None
        offset : float, optional
            Offset in seconds since 01.01.1970, by default None

        Returns
        -------
        pd.DataFrame
            Dataframe containing the measured cone positions in cartesian coordinates.
        """
        assert self.bag_filename is not None

        def string_to_python_list_of_dicts(string: str) -> List[dict]:
            try:
                return json.loads(str(string).replace('[', '[{"').replace(']', '}]')
                                  .replace(', ', '},{"').replace('\n', ',"').replace(':', '":'))
            except Exception:
                return []

        def export_topic_from_bag_or_return_filename(bag_filename: str, topic: str) -> str:
            filename = bag_filename.rsplit('.', 1)[0] + '/' + topic[1:].replace('/', '-') + '.csv'
            if not os.path.isfile(filename):
                bag = bagreader(bag_filename)
                return bag.message_by_topic(topic)
            return filename

        time_cols = ['header.stamp.secs', 'header.stamp.nsecs']

        cones = export_topic_from_bag_or_return_filename(self.bag_filename,
                                                         topic='/local_mapping/cone_positions')
        gps_position = export_topic_from_bag_or_return_filename(self.bag_filename, topic='/gps/gps')
        gps_heading = export_topic_from_bag_or_return_filename(self.bag_filename,
                                                               topic='/novatel/oem7/heading2')
        wheelspeed_fr = export_topic_from_bag_or_return_filename(self.bag_filename, topic='/wheelspeed/right')
        wheelspeed_fl = export_topic_from_bag_or_return_filename(self.bag_filename, topic='/wheelspeed/left')
        engine_speed = export_topic_from_bag_or_return_filename(self.bag_filename, topic='/engine/rpm')
        imu_accel = export_topic_from_bag_or_return_filename(self.bag_filename, topic='/imu/accel')
        imu_gyro = export_topic_from_bag_or_return_filename(self.bag_filename, topic='/imu/gyro')
        steering_wheel_angle = export_topic_from_bag_or_return_filename(self.bag_filename, topic='/actc_pos')

        cones_df = pd.read_csv(cones)[time_cols + ['cones']]
        gps_position_df = pd.read_csv(gps_position)[time_cols + ['longitude', 'latitude', 'speed', 'track',
                                                                 'position_covariance_0', 'position_covariance_1',
                                                                 'position_covariance_3', 'position_covariance_4']]
        gps_heading_df = pd.read_csv(gps_heading)[time_cols + ['heading', 'heading_stdev']]
        wheelspeed_fr_df = pd.read_csv(wheelspeed_fr)[time_cols + ['data1']].rename(
            columns={'data1': 'wheelspeed_fr'})
        wheelspeed_fl_df = pd.read_csv(wheelspeed_fl)[time_cols + ['data1']].rename(
            columns={'data1': 'wheelspeed_fl'})
        engine_speed_df = pd.read_csv(engine_speed)[time_cols + ['data1', 'data2']].rename(
            columns={'data1': 'wheelspeed_rl', 'data2': 'wheelspeed_rr'})
        steering_wheel_angle_df = pd.read_csv(steering_wheel_angle)[time_cols + ['data']].rename(
            columns={'data': 'steering_wheel_angle'})
        imu_gyro_df = pd.read_csv(imu_gyro)[time_cols + ['angular_velocity.z']]
        imu_accel_df = pd.read_csv(imu_accel)[time_cols + ['linear_acceleration.x', 'linear_acceleration.y',
                                                           'linear_acceleration.z']]

        bag_df = pd.concat([cones_df, gps_position_df, gps_heading_df, wheelspeed_fr_df, wheelspeed_fl_df,
                            engine_speed_df, steering_wheel_angle_df, imu_gyro_df, imu_accel_df], axis=0)

        bag_df['Time'] = bag_df['header.stamp.secs'] + bag_df['header.stamp.nsecs'] * (10**(-9))
        bag_df.drop(time_cols, axis='columns', inplace=True)
        bag_df = bag_df.groupby('Time').first()

        if offset is None:
            offset = bag_df.index.min()
        bag_df.index -= offset

        if start is not None and end is not None:
            # 'Time' became the index after the groupby, so filter on the index
            bag_df = bag_df.loc[(bag_df.index >= start) & (bag_df.index <= end)]

        bag_df['cones'] = bag_df['cones'].map(string_to_python_list_of_dicts)
        bag_df['cones'] = bag_df['cones'].apply(
            lambda x: np.array([[cone['x'] / 1000, cone['y'] / 1000, cone['id'], cone['probability']]
                                for cone in x]) if len(x) > 0 else np.nan)

        return bag_df.sort_index(ascending=True)
    def import_data(self) -> pd.DataFrame:
        """
        Import the mdf file and optionally an additional bag file and return them as dataframe.

        If the filename of the bag file is specified, the bag file is merged into the mdf file.

        Returns
        -------
        pd.DataFrame
            mdf file as pandas dataframe.
        """
        if self.mdf_filename is not None:
            mdf = MDF(self.mdf_filename)
            mdf_filtered = mdf.filter(['Aceinna_AccX', 'Aceinna_AccY', 'Aceinna_AccZ',
                                       'Aceinna_GyroX', 'Aceinna_GyroY', 'Aceinna_GyroZ',
                                       'WFR_WHEELSPEED', 'WFL_WHEELSPEED',
                                       'INVL_N_Actual_Filt', 'INVR_N_Actual_Filt', 'ACTC_POS'])
            mdf_df = mdf_filtered.to_dataframe(raster=None, use_interpolation=False, time_from_zero=False)

            if self.bag_filename is not None:
                offset = mdf.start_time.timestamp() - self.time_offset
                start, end = mdf_df.index[[0, -1]].to_numpy(dtype=np.float64)
                bag_df = self.import_bag(start=start, end=end, offset=offset)
                df = pd.concat([mdf_df, bag_df]).sort_index(ascending=True)
            else:
                df = mdf_df
        else:
            assert self.bag_filename is not None
            df = self.import_bag()

        return df
    def calculate_acc_rotation_object(self) -> None:
        """
        Calculate rotation object for the accelerometer.

        Uses the mean of the acceleration while standing still to calculate the gravitational
        acceleration.
        """
        start_ts = self.df['wheelspeed_fr'].dropna().ne(0).idxmax()
        # what the imu measured, i.e. the direction in which g points
        measured_g = np.nanmean(self.df[['linear_acceleration.x', 'linear_acceleration.y',
                                         'linear_acceleration.z']].loc[:start_ts].values, axis=0)
        dest_vec = np.array([0, 0, -1])  # g should just point downwards

        # https://stackoverflow.com/a/59204638
        # Calculate rotation matrix that rotates measured_g onto dest_vec
        a = (measured_g / np.linalg.norm(measured_g)).reshape(3)
        b = (dest_vec / np.linalg.norm(dest_vec)).reshape(3)
        v = np.cross(a, b)
        c = np.dot(a, b)
        s = np.linalg.norm(v)
        kmat = np.array([[0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0]])
        rotation_matrix = np.eye(3) + kmat + kmat.dot(kmat) * ((1 - c) / (s ** 2))
        self.rotation = R.from_matrix(rotation_matrix)
    def postprocess_data(self) -> None:
        """
        Postprocess the imported data.

        At the moment this only rotates the accelerometer readings (if ``rotate_acc`` is set).
        """
        if self.rotate_acc:
            self.df.loc[:, ['linear_acceleration.x', 'linear_acceleration.y', 'linear_acceleration.z']] = \
                self.rotation.apply(self.df[['linear_acceleration.x', 'linear_acceleration.y',
                                             'linear_acceleration.z']].values)

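# Usage sketch (illustrative only; the file names and the time offset below are
# placeholders, not values taken from this module):
#
#   importer = RosbagAndMDFImporter(mdf_filename='measurement.mf4',
#                                   bag_filename='measurement.bag',
#                                   time_offset=0.5)
#   df = importer.df                  # merged, time-indexed dataframe
#   cones = df['cones'].dropna()      # per-timestamp arrays of [x, y, id, probability]
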
def import_map_csv(map_csv_filename: str) -> np.ndarray:
    """
    Import the ground truth of the landmark positions from a csv.

    The csv should be the export of the CURE-internal track creator tool.

    Parameters
    ----------
    map_csv_filename : str
        Filename and location of the map csv.

    Returns
    -------
    np.ndarray
        Numpy array containing the cone positions of the ground truth.
    """
    df = pd.read_csv(f'{os.path.abspath(os.path.dirname(__file__))}/../../data/{map_csv_filename}')
    cones = df.to_numpy()
    return cones
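
# Standalone sketch (not part of the importer API): ``calculate_acc_rotation_object``
# builds its rotation with the Rodrigues formula, mapping the gravity direction measured
# by the IMU at standstill onto [0, 0, -1]. The check below repeats that construction
# for a made-up gravity reading and prints the re-rotated unit vector, which should
# point straight down.
if __name__ == '__main__':
    measured_g = np.array([0.4, -0.3, -9.7])  # hypothetical accelerometer mean at standstill
    dest_vec = np.array([0., 0., -1.])        # gravity should point straight down afterwards

    a = measured_g / np.linalg.norm(measured_g)
    b = dest_vec / np.linalg.norm(dest_vec)
    v = np.cross(a, b)                        # rotation axis (unnormalised)
    c = np.dot(a, b)                          # cosine of the rotation angle
    s = np.linalg.norm(v)                     # sine of the rotation angle
    kmat = np.array([[0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0]])
    rotation_matrix = np.eye(3) + kmat + kmat.dot(kmat) * ((1 - c) / (s ** 2))

    rotation = R.from_matrix(rotation_matrix)
    print(rotation.apply(a))                  # approximately [0, 0, -1]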