[docs]classUKF(UnscentedKalmanFilter):""" Unscented Kalman Filter class which inherits from the UKF implementation in filterpy. Filter used to combine control inputs from the steering wheel and measurements from the IMU, the wheel speed sensors and the GPS. The goal is to approximate the driven track of our car as accurately as possible. The control inputs are used to predict the next state vector which is then updated using the measurements from the sensors. Attributes ---------- x : npt.NDArray State vector of x, y, v_x, psi. x_names : List[str] Names of the different parameters in x. u : npt.NDArray Control input at one timestamp. ros_u : npt.NDArray Variable to prevent u from being overwritten during ROS callbacks. u_array : npt.NDArray Control inputs at all timestamps. u_names : List[str] Name of the control input in u. t_array : npt.NDArray Timestamps where measurements or control inputs exist. sensors : List[Sensor] The sensors used for the update step. They hold the measurements z, covariances R and the transformation function Hx. last_prediction : int Index of the control input for which the last prediction was done. predict_tolerance : float Time in which the same prediction can be used. If exceeded, a new prediction is necessary. P : npt.NDArray State covariance matrix. Holds the covariances of all components of x. Q : npt.NDArray Process covariance matrix. landmark_ids : List[float] List of ids of tracked landmarks. fake_landmark_ids : List[float] List of ids of tracked landmarks to send out. Used to fake ids for skidpad cones for planning. landmark_perceived_n : List[int] List of how often this specifc landmark has already been perceived. mapping : List[npt.NDArray] Mapping / association between the current observed landmarks and all tracked landmarks. local_mapping_updates : List[int] List of all time steps the kalman filter has been updated with the local mapping. local_mapping_nodes_explored_n : int Number of explored nodes during data association. local_mapping_compared_associations_n : int Number of finally compared best associations object during data association. individual_compatability : Optional[npt.NDArray] Individual compatibility between the observed and observable landmarks of the last local mapping update. observable_to_global_mapping : Optional[npt.NDArray] Mapping between the observable and all tracked landmarks of the last local mapping update. update_strategy : helpers.UpdateStrategy Update strategy to use during the update step. initial_world_heading : Optional[float] Vehicle heading in world frame during first measurement. initial_map_heading : Optional[float] Vehicle heading in map frame during first vehicle heading in world frame measurement. executed_steps : List[str] String representation of executed steps (predict, update for sensors) for this step. template_Q : npt.NDArray Template for robot pose process noise Q which only needs to be multiplied by dt for current predict step. max_z_dim : int Maximum dimension of measurements of all sensors. max_decimals : int Maximum number of decimal places behind the comma for the state noise matrix. observed_landmarks : npt.NDArray Currently observed landmark relative to vehicle pose. observable_landmarks : npt.NDArray Currently observable landmarks relative to vehicle pose. observable_state_covariance : npt.NDArray Covariance of currently observable landmark. sigma_alpha : float Parameter for Merwe Sigma Points. minimal_cone_covariance_area : float Minimal size of the covariance ellipse. perceived_n_threshold : int How many times the landmark should been perceived to not be discarded during map clean up. steering_wheel_to_ackermann_lut : Look up table to convert steering wheel angle (degrees) to ackermann angle (degrees). """def__init__(self,dt:float=0.01,update_strategy:helpers.UpdateStrategy=helpers.UpdateStrategy.ALL_STATES,sigma_alpha:float=0.1,minimal_cone_covariance_area:float=0.15,u_dim:int=None,u_array:npt.NDArray=None,t_array:npt.NDArray=None,name:str='ukf',P:List[float]=[0.1,0.1,0.2,3],template_Q:List[float]=[1,1,5,15],perceived_n_threshold:int=1,steering_model:Optional[dict]=None,steering_wheel_to_ackermann_lut:Optional[interpolate.interp1d]=None)->None:""" Initialization function. Parameters ---------- dt : float, optional Time in which the same prediction can be used. If exceeded, a new prediction is necessary, by default 0.01 update_strategy : UpdateStrategy, optional Update strategy to use during the update step, by default UpdateStrategy.ALL_STATES u_dim : int, optional Dimension of control inputs, by default None u_array : npt.NDArray, optional Control inputs at all timestamps, by default None t_array : npt.NDArray, optional Timestamps where measurements or control inputs exist, by default None P: List[float], Optional State covariance matrix. Holds the covariances of all components of x. template_Q: List[float], optional Template for robot pose process noise Q which only needs to be multiplied by dt for current predict step. perceived_n_threshold : int, optional How many times the landmark should been perceived to not be discarded during map clean up. """assertsteering_wheel_to_ackermann_lutisnotNoneorsteering_modelisnotNoneself.x_r_dim=4# robot pose xself.x=np.zeros(self.x_r_dim)# Logic to decide where to get the dimension of u fromifu_arrayisnotNone:u_array=u_array.reshape((u_array.shape[0],u_array.shape[-1]))ifu_dimisnotNoneandu_arrayisnotNoneandu_dim!=u_array.shape[-1]:u_dim=u_array.shape[-1]warnings.warn('Warning: The specified dimension of u is not the same as the dimension of the given u_array.'' We used the dimension of the u_array.')elifu_dimisNoneandu_arrayisnotNone:u_dim=u_array.shape[-1]elifu_dimisNone:u_dim=0self.u=np.full(u_dim,np.nan)self.ros_u=np.full(u_dim,np.nan)self.max_z_dim=1self.u_array=u_arrayself.t_array=t_arrayself.sensors:List[Sensor]=[]self.last_prediction:int=0self.update_strategy=update_strategyself.executed_steps:List[str]=[]self.sensor_updated_z_indices:List[npt.NDArray]=[]self.predict_tolerance=dtself.x_names=['x','y','v_x','psi']self.u_names=['delta','a_x','psi_dot']self.current_global_landmarks:Optional[npt.NDArray]=Noneself.initial_heading_t:Optional[float]=Noneself.initial_world_heading:Optional[float]=Noneself.initial_map_heading:Optional[float]=Noneself.landmark_ids:List[float]=[]self.fake_landmark_ids:List[float]=[]self.landmark_perceived_n:List[int]=[]self.perceived_n_threshold=perceived_n_thresholdself.mapping:List[npt.NDArray]=[]self.individual_compatability:Optional[npt.NDArray]=Noneself.observable_to_global_mapping:Optional[npt.NDArray]=Noneself.local_mapping_updates:List[int]=[]self.local_mapping_nodes_explored_n:int=0self.local_mapping_compared_associations_n:int=0template_Q[3]=np.radians(template_Q[3])self.template_Q=np.diag(np.array(template_Q)**2)self.observed_landmarks:npt.NDArray=Noneself.observable_landmarks:npt.NDArray=Noneself.observable_state_covariance:npt.NDArray=Noneself.sigma_alpha=sigma_alphaself.minimal_cone_covariance_area=minimal_cone_covariance_areaself.name=nameifsteering_wheel_to_ackermann_lutisnotNone:self.steering_wheel_to_ackermann_lut=steering_wheel_to_ackermann_lutelse:self.calculate_inverse_steering_lookup_table(model=steering_model)super().__init__(dim_x=self.x.shape[0],dim_z=0,dt=dt,hx=Sensor.hx,fx=self.f_x,points=MerwePoints(n=self.x.shape[0],alpha=self.sigma_alpha,beta=2,kappa=3-self.x.shape[0],subtract=self.residual_sigmaf_and_x),x_mean_fn=self.state_mean)P[3]=np.radians(P[3])self.P=np.diag(np.array(P)**2)self.max_decimals=7def__repr__(self):returnf'<UKF {self.name}>'
[docs]defcalculate_inverse_steering_lookup_table(self,model:dict,num:int=50):# define the range of the steering_wheelsteering_wheel_angle=np.linspace(-0.5*model['steering_range'],0.5*model['steering_range'],num=num)# calculate the corresponding Ackermann-Angle (between x-axis an tire) for each tireackermann_angle=np.empty_like(steering_wheel_angle)foriinrange(ackermann_angle.shape[0]):ackermann_angle[i]=vehicle_dynamics.steering_wheel_to_ackermann(steering_wheel_angle[i],model=model)# return the array for the steering_wheel_anngle and the corresponding angle of the tireself.steering_wheel_to_ackermann_lut=interpolate.interp1d(np.degrees(steering_wheel_angle),ackermann_angle)
[docs]defcopy(self,x:npt.NDArray,P:npt.NDArray,timestamp:float,name:str='default')->'UKF':copy=UKF(dt=self.predict_tolerance,update_strategy=self.update_strategy,sigma_alpha=self.sigma_alpha,minimal_cone_covariance_area=self.minimal_cone_covariance_area,u_dim=self.u.shape[0],steering_wheel_to_ackermann_lut=self.steering_wheel_to_ackermann_lut,perceived_n_threshold=self.perceived_n_threshold)copy.template_Q=self.template_Qcopy.P=P.copy()copy.x=x.copy()copy.landmark_ids=self.landmark_ids.copy()copy.fake_landmark_ids=self.fake_landmark_ids.copy()copy.landmark_perceived_n=self.landmark_perceived_n.copy()copy.t=timestampcopy.name=name# only set world and map heading if timestamp of copy is later than transformer definitionifself.initial_heading_tisnotNoneandtimestamp>self.initial_heading_t:copy.initial_world_heading=self.initial_world_headingcopy.initial_map_heading=self.initial_map_headingcopy.initial_heading_t=self.initial_heading_tforsensorinself.sensors:copy.add_sensor(sensor.copy(filter=copy,timestamp=timestamp))copy.sensor_z=self.sensor_z.copy()copy.sensor_R=self.sensor_R.copy()copy.max_z_dim=self.max_z_dimreturncopy
[docs]defsensor(self,sensor_name:str)->Tuple[int,Sensor]:""" Return the index and the sensor object given the sensor name. ... Parameters ---------- sensor_name : str Name of the sensor. Returns ------- Tuple[int, Sensor] The index and the sensor object. Raises ------ KeyError Raised if the sensor can not be found in the list of the filter's sensors. """fori,sensorinenumerate(self.sensors):ifsensor.sensor_name==sensor_name:returni,sensorraiseKeyError(f'The sensor {sensor_name} is not in the list of sensors.')
@propertydefx_necessary(self)->npt.NDArray:""" Returns the necessary states for the prediction and update based on self.x_indices_to_update. Returns ------- npt.NDArray Necessary states for the prediction and update based on self.x_indices_to_update. """ifself.update_strategy==helpers.UpdateStrategy.ALL_STATES:returnself.xelifself.update_strategy==helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES:iflen(self.x_indices_to_update)<=self.x_r_dim:returnself.x[:self.x_r_dim]else:returnself.xelifself.update_strategy==helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES:state_indices=np.sort(np.array(list(self.x_indices_to_update),dtype=int))returnself.x[state_indices]@x_necessary.setterdefx_necessary(self,value:npt.NDArray):""" Sets the necessary states for the prediction and update based on self.x_indices_to_update. Parameters ---------- value : npt.NDArray State variables values to set. """ifself.update_strategy==helpers.UpdateStrategy.ALL_STATES:self.x=valueelifself.update_strategy==helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES:iflen(self.x_indices_to_update)<=self.x_r_dim:self.x[:self.x_r_dim]=valueelse:self.x=valueelifself.update_strategy==helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES:state_indices=np.sort(np.array(list(self.x_indices_to_update),dtype=int))self.x[state_indices]=value@propertydefP_necessary(self)->npt.NDArray:""" Returns the necessary state noises for the prediction and update based on self.x_indices_to_update. Returns ------- npt.NDArray Necessary state noises for the prediction and update based on self.x_indices_to_update. """ifself.update_strategy==helpers.UpdateStrategy.ALL_STATES:returnself.Pelifself.update_strategy==helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES:iflen(self.x_indices_to_update)<=self.x_r_dim:robot_pose_state_indices=np.arange(self.x_r_dim,dtype=int)returnself.P[np.ix_(robot_pose_state_indices,robot_pose_state_indices)]else:returnself.Pelifself.update_strategy==helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES:state_indices=np.sort(np.array(list(self.x_indices_to_update),dtype=int))returnself.P[np.ix_(state_indices,state_indices)]@P_necessary.setterdefP_necessary(self,value:npt.NDArray):""" Sets the necessary state noises for the prediction and update based on self.x_indices_to_update. Parameters ---------- value : npt.NDArray State noises to set. """ifself.update_strategy==helpers.UpdateStrategy.ALL_STATES:self.P=valueelifself.update_strategy==helpers.UpdateStrategy.ONLY_ROBOT_POSE_OR_ALL_STATES:iflen(self.x_indices_to_update)<=self.x_r_dim:robot_pose_state_indices=np.arange(self.x_r_dim,dtype=int)self.P[np.ix_(robot_pose_state_indices,robot_pose_state_indices)]=valueelse:self.P=valueelifself.update_strategy==helpers.UpdateStrategy.ONLY_ROBOT_POSE_AND_NECESSARY_STATES:state_indices=np.sort(np.array(list(self.x_indices_to_update),dtype=int))self.P[np.ix_(state_indices,state_indices)]=value@propertydefx_R(self)->npt.NDArray:""" Returns all physical state variables concerning the robot pose. Returns ------- npt.NDArray State vector of the robot pose. [x, y, v_x, psi]. """returnself.x[:self.x_r_dim]@x_R.setterdefx_R(self,value:npt.NDArray):""" Sets all physical state variables concerning the robot pose. """self.x[:self.x_r_dim]=value@propertydeflandmarks(self)->npt.NDArray:""" Returns cartesian coordinates of all tracked landmarks. Returns ------- npt.NDArray Cartesian coordinates of all tracked landmarks. """returnnp.reshape(self.x[self.x_r_dim:],(-1,2))@propertydeflandmarks_prior(self)->npt.NDArray:""" Returns cartesian coordinates of all prior tracked landmarks. Returns ------- npt.NDArray Cartesian coordinates of all prior tracked landmarks. """returnnp.reshape(self.x_prior[self.x_r_dim:],(-1,2))@landmarks.setterdeflandmarks(self,value:npt.NDArray):self.x[self.x_r_dim:]=value.flatten()@propertydefx_R_prior(self)->npt.NDArray:""" Returns prior of the state vector of the robot pose. Returns ------- npt.NDArray Prior of the state vector of the robot pose. """returnself.x_prior[:self.x_r_dim]@propertydefx_R_post(self)->npt.NDArray:""" Returns posterior of the state vector of the robot pose. Returns ------- npt.NDArray Post of the state vector of the robot pose. """returnself.x_post[:self.x_r_dim]@propertydefP_R(self)->npt.NDArray:""" Returns state noise covariance matrix of the robot pose state vector. Returns ------- npt.NDArray State noise covariance matrix of the robot pose state vector. """returnself.P[:self.x_r_dim,:self.x_r_dim]@P_R.setterdefP_R(self,value:npt.NDArray):""" Sets state noise covariance matrix of the robot pose state vector. """self.P[:self.x_r_dim,:self.x_r_dim]=value@propertydefP_R_post(self)->npt.NDArray:""" Returns posterior of the state noise covariance matrix of the robot pose state vector. Returns ------- npt.NDArray Post of the state noise covariance matrix of the robot pose state vector. """returnself.P_post[:self.x_r_dim,:self.x_r_dim]@propertydefP_R_prior(self)->npt.NDArray:""" Returns prior of the state noise covariance matrix of the robot pose state vector. Returns ------- npt.NDArray Prior of the state noise covariance matrix of the robot pose state vector. """returnself.P_prior[:self.x_r_dim,:self.x_r_dim]
[docs]defcalculate_Q(self,dt:float,x_dim:int)->None:""" Calculate state transition noise covariance matrix Q based on the current dt. Parameters ---------- dt : float Current dt of the prediction step. x_dim : int Number of state variable dimension to use for prediction. """self.Q=np.zeros((x_dim,x_dim))self.Q[:self.x_r_dim,:self.x_r_dim]=self.template_Q*dt
[docs]@staticmethoddeff_x(sigma_points:npt.NDArray,dt:int,u:npt.NDArray,steering_wheel_to_ackermann_lut)->npt.NDArray:""" Calculate the predicted next state based on the current state x,the timestep t and the control input u. Using a simple bicycle model that disregards any side slip angle. .. figure:: images/f_x_model.png :class: with-border This is the caption. Parameters ---------- sigma_points : npt.NDArray All sigma points around the current state vector. dt : int Timestep since last predict. u : npt.NDArray Current control input vector of the system. Returns ------- npt.NDArray New estimated state vector of the system """sigma_points[:,2]+=u[1]*dtsigma_points[:,3]+=np.radians(u[2])*dtsteering_wheel_angle=u[0]velocity=sigma_points[:,2]distance=velocity*dtackermann=steering_wheel_to_ackermann_lut(steering_wheel_angle)ifackermann!=0:turning_radius=vehicle_dynamics.ackermann_to_radius(ackermann)alpha=distance/np.abs(turning_radius)local_pos=np.array([np.abs(turning_radius)*np.sin(alpha),turning_radius*(1-np.cos(alpha))]).Telse:local_pos=np.stack([distance,np.zeros(sigma_points.shape[0])],axis=1)rot_matrix=np.empty((sigma_points.shape[0],2,2))cos,sin=np.cos(-sigma_points[0,3]),np.sin(-sigma_points[0,3])rot_matrix[:,0,0],rot_matrix[:,0,1],rot_matrix[:,1,0],rot_matrix[:,1,1]=cos,sin,-sin,cosrot_pos=np.einsum("ijk, ijk -> ij",rot_matrix,local_pos.reshape(-1,1,2))sigma_points[:,0:2]+=rot_posreturnsigma_points
[docs]@staticmethoddefstate_mean(sigmas:npt.NDArray,Wm:npt.NDArray)->npt.NDArray:""" Calculate the mean of the sigma points and weights. The mean for the angle in the state vector has to be calculated seperately. Parameters ---------- sigmas : npt.NDArray Sigma Points used in the filter. Wm : npt.NDArray Weights of the sigma points. Returns ------- npt.NDArray Mean of the sigma points. """x=np.average(sigmas,axis=0,weights=Wm)# since psi is an angle, the mean has to be calculated in a very special wayx[3]=math.circular_mean(sigmas[:,3],Wm,normalize=False)# psireturnx
[docs]defunscented_transform(self,sigmas,Wm,Wc,noise_cov=None,mean_fn=None,residual_fn=None):ifmean_fnisNone:# new mean is just the sum of the sigmas * weightx=np.dot(Wm,sigmas)# dot = \Sigma^n_1 (W[k]*Xi[k])else:x=mean_fn(sigmas,Wm)# new covariance is the sum of the outer product of the residuals# times the weightsy=residual_fn(sigmas,x)P=np.around(np.einsum('l,ln,lm->nm',Wc,y,y),decimals=self.max_decimals)ifnoise_covisnotNone:P+=noise_covreturn(x,P)
[docs]defcross_variance(self,x,z,sigmas_f,sigmas_h):""" Compute cross variance of the state `x` and measurement `z`. """dx=self.residual_sigmaf_and_x(sigmas_f=sigmas_f,x=x)dz=self.residual_z(sigmas_h,z)Pxz=np.around(np.einsum('l,ln,lm->nm',self.Wc,dx,dz),decimals=self.max_decimals)returnPxz
[docs]@staticmethoddefresidual_sigmaf_and_x(sigmas_f:npt.NDArray[np.floating],x:npt.NDArray[np.floating])->npt.NDArray[np.floating]:""" Calculate the difference the sigma points and the state vector. As the state vector contains an angle, the result from the subtraction has to be normalized to the interval [-pi:pi]. Parameters ---------- sigmas_f : npt.NDArray Sigma point. x : npt.NDArray Current state vector. Returns ------- npt.NDArray Difference between sigma points and state vector. """dx=sigmas_f-xreturndx
[docs]defadd_sensor(self,sensor_obj:Sensor)->Sensor:""" Add a new sensor object to the filter. Possible options for sensor classes are defined in the sensors folder. Parameters ---------- sensor_obj : Sensor The sensor object to add. Returns ------- sensor_obj : Sensor The added sensor. """sensor_obj.sensor_i=len(self.sensors)self.sensors.append(sensor_obj)returnsensor_obj
[docs]defcompute_process_sigmas(self,dt,fx=None,**fx_args):""" computes the values of sigmas_f. Normally a user would not call this, but it is useful if you need to call update more than once between calls to predict (to update for multiple simultaneous measurements), so the sigmas correctly reflect the updated state x, P. """iffxisNone:fx=self.fx# calculate sigma points for given mean and covariancesigmas=self.points_fn.sigma_points(self.x_necessary,self.P_necessary)self.sigmas_f=self.fx(sigmas,dt,steering_wheel_to_ackermann_lut=self.steering_wheel_to_ackermann_lut,**fx_args)
[docs]defresidual_sigmash_and_z(self,sigmas_h:npt.NDArray,z:npt.NDArray)->npt.NDArray:""" Calculates the difference between the measurement sigma points and the measurement for all sensors. Parameters ---------- sigmas_h : npt.NDArray Sigma points for all measurements. z : npt.NDArray Measurements of all sensors. Returns ------- npt.NDArray Difference between sigma points and measurements of all sensors. """y=np.empty_like(sigmas_h)last_z_index=0forsensorinself.sensors_to_update:z_end_index=last_z_index+sensor.z_to_update.shape[0]z_indices=np.arange(last_z_index,z_end_index)y[...,z_indices]=sensor.residual_sigmash_and_z(sigmas_h=sigmas_h[...,z_indices],z=z[z_indices])last_z_index=z_end_indexreturny
[docs]defmeasurement_mean(self,sigmas:npt.NDArray,Wm:npt.NDArray)->npt.NDArray:""" Calculates the weighted mean of all sigma points of the measurement vectors of all sensors. Parameters ---------- sigmas : npt.NDArray Sigma points of the measurement vector of all sensors.. Wm : npt.NDArray Weights for the calculation of the weighted mean of the measurement. Returns ------- npt.NDArray Weighted mean of the sigma points of the measurement vectors of all sensors. """zp=np.empty((sigmas.shape[1]))last_z_index=0forsensorinself.sensors_to_update:z_end_index=last_z_index+sensor.z_to_update.shape[0]z_indices=np.arange(last_z_index,z_end_index)zp[z_indices]=sensor.measurement_mean(sigmas=sigmas[:,z_indices],Wm=Wm)last_z_index=z_end_indexreturnzp
[docs]defupdate(self,u:Optional[npt.NDArray]=None):""" Update the UKF with the given measurements. On return, self.x and self.P contain the new mean and covariance of the filter. Parameters ---------- u : Optional[npt.NDArray] Current control input vector of the system. """# pass prior sigmas through h(x) to get measurement sigmas# the shape of sigmas_h will vary if the shape of z varies, so# recreate each timex=self.x_necessaryP=self.P_necessary# Calculate how many measurements there will be and# initialize enitre measurement vector, measurement noise matrix and sigma pointsz_n=sum([sensor.z_to_update.shape[0]forsensorinself.sensors_to_update])sigmas_h=np.empty((self.sigmas_f.shape[0],z_n))R=np.zeros((z_n,z_n))z=np.empty((z_n))# calculate sigma points for based on the respective masurement for every sensorlast_z_index=0self.sensor_updated_z_indices=[]forsensorinself.sensors_to_update:z_end_index=last_z_index+sensor.z_to_update.shape[0]z_indices=np.arange(last_z_index,z_end_index)sigmas_h[:,z_indices]=sensor.hx(sigmas_f=self.sigmas_f,u=u)R[np.ix_(z_indices,z_indices)]=sensor.R_to_updatez[z_indices]=sensor.z_to_updatelast_z_index=z_end_indexself.executed_steps.append(sensor.sensor_name)self.sensor_updated_z_indices.append(z_indices)self.sigmas_h=np.atleast_2d(sigmas_h)# mean and covariance of prediction passed through unscented transformzp,self.S=self.unscented_transform(self.sigmas_h,self.Wm,self.Wc,R,self.measurement_mean,self.residual_z)self.zp=zpself.SI=self.inv(self.S)# compute cross variance of the state and the measurementsPxz=self.cross_variance(x,zp,self.sigmas_f,self.sigmas_h)self.K=np.dot(Pxz,self.SI)# Kalman gainself.y=self.residual_sigmash_and_z(z,zp)# residual# update Gaussian state estimate (x, P)x=x+np.dot(self.K,self.y)P-=np.around(np.dot(self.K,np.dot(self.S,self.K.T)),decimals=self.max_decimals)# limit minimal cone covarianceforiinrange(self.x_r_dim,x.shape[0],2):factor=np.pi*np.product(np.linalg.eigvals(P[i:i+2,i:i+2]))/self.minimal_cone_covariance_areaiffactor<1:P[i:i+2,i:i+2]/=factorself.x_necessary=xself.P_necessary=P# save measurement and posterior stateself.z=z.copy()self.R=R.copy()self.x_post=self.x.copy()self.P_post=self.P.copy()# set to None to force recomputeself._log_likelihood=Noneself._likelihood=Noneself._mahalanobis=None
[docs]defpredict(self,dt=None,UT=None,fx=None,**fx_args):r""" Performs the predict step of the UKF. On return, self.x and self.P contain the predicted state (x) and covariance (P). ' Important: this MUST be called before update() is called for the first time. Parameters ---------- dt : double, optional If specified, the time step to be used for this prediction. self._dt is used if this is not provided. fx : callable f(x, **fx_args), optional State transition function. If not provided, the default function passed in during construction will be used. UT : function(sigmas, Wm, Wc, noise_cov), optional Optional function to compute the unscented transform for the sigma points passed through hx. Typically the default function will work - you can use x_mean_fn and z_mean_fn to alter the behavior of the unscented transform. **fx_args : keyword arguments optional keyword arguments to be passed into f(x). """ifdtisNone:dt=self._dtifUTisNone:UT=self.unscented_transform# calculate sigma points for given mean and covarianceself.compute_process_sigmas(dt,fx,**fx_args)# and pass sigmas through the unscented transform to compute prior# only use the robot pose states for the predict step.self.x_necessary,self.P_necessary=UT(self.sigmas_f,self.Wm,self.Wc,self.Q,self.x_mean,self.residual_sigmaf_and_x)# save priorself.x_prior=np.copy(self.x)self.P_prior=np.copy(self.P)
[docs]defprepare_update(self):""" Prepare the updates for all sensors. """self.sensors_to_update:List[Sensor]=[]self.x_indices_to_update:Set[int]=set(np.arange(self.x_r_dim,dtype=int))forsensorinself.sensors:ifsensor.prepare_update():self.sensors_to_update.append(sensor)self.x_indices_to_update.update(sensor.x_indices_to_update)
[docs]deftry_predict(self,dt:float,u:npt.NDArray):""" Try to predict based on timestep and control input, correct state noise if there is a non-semi-positive-definite matrix and try again. Parameters ---------- dt : float Timestep since last predict. u : npt.NDArray Control input. """x_dim=self.x_necessary.shape[0]self.calculate_Q(dt=dt,x_dim=x_dim)ifself.points_fn.n!=x_dim:self.recreate_sigma_points(x_dim=x_dim)# TODO: vllt vorgenerierentry:self.predict(dt=dt,u=u,UT=self.unscented_transform)exceptnp.linalg.LinAlgError:logging.critical(f'P is not semi-positive-definite (predict after {self.executed_steps[-1]}).'' Finding nearest covariance matrix and trying again!')self.P=cov_nearest(self.P)self.predict(dt=dt,u=u,UT=self.unscented_transform)self.executed_steps=['predict']
[docs]deftry_update(self,u:npt.NDArray):""" Try to predict based on control input, correct state noise if there is a non-semi-positive-definite matrix and try again. Parameters ---------- u : npt.NDArray Control input. """try:self.update(u=u)exceptnp.linalg.LinAlgError:logging.critical('P is not positive definite (update).''Finding nearest covariance matrix and trying again!')self.P=cov_nearest(self.P)self.update(u=u)
[docs]defpostprocess_update(self):""" Postprocess update for all sensors if the respective flag specifies. """forsensorinself.sensors:ifsensor.execute_update_postprocessisTrue:sensor.postprocess_update()
[docs]defrun_batch(self,saver:helpers.Saver)->None:""" Run predict and update steps for the given control inputs and measurements. Using the prerecorded measurements that have been passed during the initialization of the filter and sensors. If there are values for all control inputs and the time since the last predict step is bigger than the predict tolerance, a new prediction is made. Updates are done with the mean of the meausrements since the last prediction. Parameters ---------- saver : helpers.Saver Saver object to save the filter object at every timestep. """assertself.u_arrayisnotNoneassertself.t_arrayisnotNoneself.sensor_z=[None]*len(self.sensors)self.sensor_R=[None]*len(self.sensors)# Calculate the maximal z dimension based on the prerecorded measurement arrays.self.max_z_dim=max([sensor.z_array.shape[1]ifsensor.z_arrayisnotNoneandsensor.z_array.ndim>1else1forsensorinself.sensors])local_mapping=self.sensor('local_mapping')[1]prediction_step=0bar=helpers.generate_progressbar(self.u_array.shape[0]).start()foriinrange(self.u_array.shape[0]):# if any control input is finite, remember itifnp.any(np.isfinite(self.u_array[i])):mask=np.isfinite(self.u_array[i])self.u[mask]=self.u_array[i][mask]time_since_predict=self.t_array[i]-self.t_array[self.last_prediction]# the following code contains the calling of the predict and update procedure# it will only be executed if all control inputs have been defined already# and either there has not been a predict in the last self.predict_tolerance seconds# or if there is new data from the local_mappingifnp.all(np.isfinite(self.u))and(time_since_predict>self.predict_toleranceornp.all(np.isfinite(local_mapping.z_array[i]))):# Gets the measurements of all sensors for this particular step,forsensorinself.sensors:sensor.get_prerecorded_z_and_R(last_prediction_step=self.last_prediction,i=i)self.prepare_update()self.try_predict(dt=time_since_predict,u=self.u)self.try_update(u=self.u)self.postprocess_update()self.last_prediction=iself.t=self.t_array[i]saver.save()prediction_step+=1bar.update(value=i,prediction_step=prediction_step)bar.finish()
if__name__=='__main__':fromutilsimportdata_importdf=data_import.RosbagAndMDFImporter(bag_filename='/home/martina/CURE/as_ros/rosbags/run_373_per.bag').df.loc[0:15]withopen('src/slam/param_study/test/configs/test_case_0.yaml','r')asconfig_file:settings=yaml.safe_load(config_file)ukf=UKF(u_array=df[['steering_wheel_angle','linear_acceleration.x','angular_velocity.z']].to_numpy(),update_strategy=helpers.UpdateStrategy(settings['ukf']['update_strategy']),sigma_alpha=settings['ukf']['sigma_alpha'],minimal_cone_covariance_area=settings['ukf']['minimal_cone_covariance_area'],P=settings['ukf']['P'],template_Q=settings['ukf']['template_Q'],u_dim=3,t_array=np.asarray(df.index.to_numpy(),dtype=np.float32))ukf.add_sensor(NovatelGPSHeading(filter=ukf,z_array=df['heading'].to_numpy(),sensor_name='novatel_gps_heading',z_names=['world_heading'],R_array=NovatelGPSHeading.generate_R_matrix_from_df(df),active=settings['novatel_heading']['active']))ukf.add_sensor(LocalMapping(filter=ukf,z_array=df['cones'].to_numpy(),active=settings['local_mapping']['active'],sensor_name='local_mapping',z_names=['landmarks'],data_association=helpers.DataAssociation(settings['local_mapping']['data_association']['method']),use_kd_tree=settings['local_mapping']['data_association']['use_kd_tree'],chi2_quantile=settings['local_mapping']['data_association']['chi2_quantile'],fov_min_distance=settings['local_mapping']['fov_gate']['min_distance'],fov_max_distance=settings['local_mapping']['fov_gate']['max_distance'],fov_angle=settings['local_mapping']['fov_gate']['angle'],fov_scaling=settings['local_mapping']['fov_gate']['scaling'],probability_gate=settings['local_mapping']['probability_gate'],R_azimuth=settings['local_mapping']['R_azimuth'],R_distance_factor=settings['local_mapping']['R_distance_factor'],R_offset=settings['local_mapping']['R_distance_offset']))ukf.add_sensor(NovatelGPSPosition(filter=ukf,z_array=df[['latitude','longitude','speed']].to_numpy(),sensor_name='novatel_gps_position',z_names=['latitude','longitude','velocity'],R_array=NovatelGPSPosition.generate_R_matrix_from_df(df),active=settings['novatel_position']['active']))ukf.add_sensor(CUREDiffWheelspeed(filter=ukf,z_array=df['wheelspeed_fl'].to_numpy(),active=settings['wheelspeed']['fl_active'],right=False,sensor_name='ros_wheelspeed_fl',z_names=['omega_fl'],gear_ratio=1,track_width=settings['wheelspeed']['track_width'],wheel_radius=settings['wheelspeed']['wheel_radius'],R=settings['wheelspeed']['R']))ukf.add_sensor(CUREDiffWheelspeed(filter=ukf,z_array=df['wheelspeed_fr'].to_numpy(),active=settings['wheelspeed']['fr_active'],right=True,sensor_name='ros_wheelspeed_fr',z_names=['omega_fr'],gear_ratio=1,track_width=settings['wheelspeed']['track_width'],wheel_radius=settings['wheelspeed']['wheel_radius'],R=settings['wheelspeed']['R']))ukf.add_sensor(CUREDiffWheelspeed(filter=ukf,z_array=df['wheelspeed_rl'].to_numpy(),active=settings['wheelspeed']['rl_active'],right=False,sensor_name='ros_wheelspeed_rl',z_names=['omega_rl'],gear_ratio=settings['wheelspeed']['rear_gear_ratio'],track_width=settings['wheelspeed']['track_width'],wheel_radius=settings['wheelspeed']['wheel_radius'],R=settings['wheelspeed']['R']))ukf.add_sensor(CUREDiffWheelspeed(filter=ukf,z_array=df['wheelspeed_rr'].to_numpy(),active=settings['wheelspeed']['rr_active'],right=True,sensor_name='ros_wheelspeed_rr',z_names=['omega_rr'],gear_ratio=settings['wheelspeed']['rear_gear_ratio'],track_width=settings['wheelspeed']['track_width'],wheel_radius=settings['wheelspeed']['wheel_radius'],R=settings['wheelspeed']['R']))saver=helpers.Saver(ukf)ifprofilingisTrue:Path(f'plots/profiling/{test_case}').mkdir(parents=True,exist_ok=True)cProfile.runctx('ukf.run_batch(saver)',globals(),locals(),f'plots/profiling/{test_case}/profiling.pstats')os.system(f'python scripts/utils/gprof2dot.py -f pstats plots/profiling/{test_case}/profiling.pstats'f'| dot -Tsvg -o plots/profiling/{test_case}/{test_case}.svg')else:ukf.run_batch(saver=saver)# for sensor in ukf.sensors:# sensor.postprocess(saver)ifsaving_saverisTrue:saver.pickle(test_case=test_case)ifplottingisTrue:plot.plot_standard_plots(saver=saver,start=0,end=None,test_case=test_case,map_cones=data_import.import_map_csv('data/cure/may/trackdrive/run_373_moved.csv'),map_x_offset=0,map_y_offset=0,map_rotation=0)