"""Implementation of all utility function needed for the import of data."""fromtypingimportOptional,Listimportos.pathimportnumpyasnpimportpandasaspdfromscipy.spatial.transformimportRotationasRimportjsonfromasammdfimportMDFfrombagpyimportbagreader
[docs]classRosbagAndMDFImporter():""" Helper Class to import data from a mdf file and if specified also from a bag file. Attributes ---------- mdf_filename : str Filename of the mdf file. rotate_acc : bool Specifies whether the accelerometer should be rotated, by default True. bag_filename : Optional[str] Filename of the bag file, specify if you want to merge bag data, by default None. time_offset : float Time offset between the mdf and bag file. Might need to adapt vor every combination-, by default 0.. df : pd.DataFrame Dataframe containing the imported data. """def__init__(self,mdf_filename:Optional[str]=None,rotate_acc:bool=True,bag_filename:Optional[str]=None,time_offset:float=0.)->None:""" Initialize object to import a mdf file and convert it to a pandas dataframe. Import the specified columns and subsequently rotate the accelerometer. If passed also imports a bag file for the cone positions. Parameters ---------- mdf_filename : Optional[str], optional Filename of the mdf file, by default None rotate_acc : bool, optional Specifies whether the accelerometer should be rotated, by default True bag_filename : Optional[str], optional Filename of the bag file, specify if you want to merge bag data, by default None time_offset : float, optional Time offset between the mdf and bag file. Might need to adapt vor every combination, by default 0. """self.mdf_filename=mdf_filenameself.bag_filename=bag_filenameself.time_offset=time_offsetself.rotate_acc=rotate_accself.df=self.import_data()self.calculate_acc_rotation_object()self.postprocess_data()
[docs]defimport_bag(self,start:Optional[float]=None,end:Optional[float]=None,offset:Optional[float]=None)->pd.DataFrame:""" Import the bag file and return it as dataframe. Bag is being cut, so that it matches with the mdf dataframe. Parameters ---------- start : float, optional Start timestamp that should be returned, by default None end : float, optional End timestamp that should be returned, by default None offset : float, optional Offset in seconds since 01.01.1970, by default None Returns ------- pd.DataFrame Dataframe containing the measured cone positions in cartesian coordinates. """assertself.bag_filenameisnotNonedefstring_to_python_list_of_dicts(string:str)->List[dict]:try:returnjson.loads(str(string).replace('[','[{"').replace(']','}]').replace(', ','},{"').replace('\n',',"').replace(':','":'))exceptException:return[]defexport_topic_from_bag_or_return_filename(bag_filename:str,topic:str)->str:filename=bag_filename.rsplit('.',1)[0]+'/'+topic[1:].replace('/','-')+'.csv'ifnotos.path.isfile(filename):bag=bagreader(bag_filename)returnbag.message_by_topic(topic)returnfilenametime_cols=['header.stamp.secs','header.stamp.nsecs']cones=export_topic_from_bag_or_return_filename(self.bag_filename,topic='/local_mapping/cone_positions')gps_position=export_topic_from_bag_or_return_filename(self.bag_filename,topic='/gps/gps')gps_heading=export_topic_from_bag_or_return_filename(self.bag_filename,topic='/novatel/oem7/heading2')wheelspeed_fr=export_topic_from_bag_or_return_filename(self.bag_filename,topic='/wheelspeed/right')wheelspeed_fl=export_topic_from_bag_or_return_filename(self.bag_filename,topic='/wheelspeed/left')engine_speed=export_topic_from_bag_or_return_filename(self.bag_filename,topic='/engine/rpm')imu_accel=export_topic_from_bag_or_return_filename(self.bag_filename,topic='/imu/accel')imu_gyro=export_topic_from_bag_or_return_filename(self.bag_filename,topic='/imu/gyro')steering_wheel_angle=export_topic_from_bag_or_return_filename(self.bag_filename,topic='/actc_pos')cones_df=pd.read_csv(cones)[time_cols+['cones']]gps_position_df=pd.read_csv(gps_position)[time_cols+['longitude','latitude','speed','track','position_covariance_0','position_covariance_1','position_covariance_3','position_covariance_4']]gps_heading_df=pd.read_csv(gps_heading)[time_cols+['heading','heading_stdev']]wheelspeed_fr_df=pd.read_csv(wheelspeed_fr)[time_cols+['data1']].rename(columns={'data1':'wheelspeed_fr'})wheelspeed_fl_df=pd.read_csv(wheelspeed_fl)[time_cols+['data1']].rename(columns={'data1':'wheelspeed_fl'})engine_speed_df=pd.read_csv(engine_speed)[time_cols+['data1','data2']].rename(columns={'data1':'wheelspeed_rl','data2':'wheelspeed_rr'})steering_wheel_angle_df=pd.read_csv(steering_wheel_angle)[time_cols+['data']].rename(columns={'data':'steering_wheel_angle'})imu_gyro_df=pd.read_csv(imu_gyro)[time_cols+['angular_velocity.z']]imu_accel_df=pd.read_csv(imu_accel)[time_cols+['linear_acceleration.x','linear_acceleration.y','linear_acceleration.z']]bag_df=pd.concat([cones_df,gps_position_df,gps_heading_df,wheelspeed_fr_df,wheelspeed_fl_df,engine_speed_df,steering_wheel_angle_df,imu_gyro_df,imu_accel_df],axis=0)bag_df['Time']=bag_df['header.stamp.secs']+bag_df['header.stamp.nsecs']*(10**(-9))bag_df.drop(time_cols,axis='columns',inplace=True)bag_df=bag_df.groupby('Time').first()ifoffsetisNone:offset=bag_df.index.min()bag_df.index-=offsetifstartisnotNoneandendisnotNone:bag_df=bag_df.loc[(bag_df['Time']>=start)&(bag_df['Time']<=end)]bag_df['cones']=bag_df['cones'].map(string_to_python_list_of_dicts)bag_df['cones']=bag_df['cones'].apply(lambdax:np.array([[cone['x']/1000,cone['y']/1000,cone['id'],cone['probability']]forconeinx])iflen(x)>0elsenp.nan)returnbag_df.sort_index(ascending=True)
[docs]defimport_data(self)->pd.DataFrame:""" Import the mdf file and optional an additional bag file and return it as dataframe. If the filename of the bag file is specified, the bag file is merged into the mdf file. Returns ------- pd.DataFrame mdf file as pandas dataframe. """ifself.mdf_filenameisnotNone:mdf=MDF(self.mdf_filename)mdf_filtered=mdf.filter(['Aceinna_AccX','Aceinna_AccY','Aceinna_AccZ','Aceinna_GyroX','Aceinna_GyroY','Aceinna_GyroZ','WFR_WHEELSPEED','WFL_WHEELSPEED','INVL_N_Actual_Filt','INVR_N_Actual_Filt','ACTC_POS'])mdf_df=mdf_filtered.to_dataframe(raster=None,use_interpolation=False,time_from_zero=False)ifself.bag_filenameisnotNone:offset=mdf.start_time.timestamp()-self.time_offsetstart,end=mdf_df.index[[0,-1]].to_numpy(dtype=np.float64)bag_df=self.import_bag(start=start,end=end,offset=offset)df=pd.concat([mdf_df,bag_df]).sort_index(ascending=True)else:df=mdf_dfelse:assertself.bag_filenameisnotNonedf=self.import_bag()returndf
[docs]defcalculate_acc_rotation_object(self)->None:""" Calculate rotation object for accelerometer. Uses the mean of the acceleration while standing still to calculate the gravitational acceleration. """start_ts=self.df['wheelspeed_fr'].dropna().ne(0).idxmax()# what the imu measured where g points tomeasured_g=np.nanmean(self.df[['linear_acceleration.x','linear_acceleration.y','linear_acceleration.z']].loc[:start_ts].values,axis=0)dest_vec=np.array([0,0,-1])# g should just point downwards# https://stackoverflow.com/a/59204638# Calculate rotation matrix to rotate measure g to dest_vectora,b=(measured_g/np.linalg.norm(measured_g)).reshape(3),(dest_vec/np.linalg.norm(dest_vec)).reshape(3)v=np.cross(a,b)c=np.dot(a,b)s=np.linalg.norm(v)kmat=np.array([[0,-v[2],v[1]],[v[2],0,-v[0]],[-v[1],v[0],0]])rotation_matrix=np.eye(3)+kmat+kmat.dot(kmat)*((1-c)/(s**2))self.rotation=R.from_matrix(rotation_matrix)
[docs]defpostprocess_data(self)->None:""" Postprocess the imported data. At the moment only rotate the accelerometer. """self.df.loc[:,['linear_acceleration.x','linear_acceleration.y','linear_acceleration.z']]= \
self.rotation.apply(self.df[['linear_acceleration.x','linear_acceleration.y','linear_acceleration.z']].values)
[docs]defimport_map_csv(map_csv_filename:str)->pd.DataFrame:""" Imports the ground truth of the landmark positions from a csv. CSV should be the export of the CURE intern track creator tool. Parameters ---------- map_csv_filename : str Filename and location of the map csv. Returns ------- pd.DataFrame Pandas Dataframe containing the cone positions of the ground truth. """df=pd.read_csv(f'{os.path.abspath(os.path.dirname(__file__))}/../../data/{map_csv_filename}')cones=df.to_numpy()returncones