Source code for mpc

"""
Python module to implement bicycle model and cost function for MPC as model and respective MPC.

This MPC predicts vehicle behaviour to follow a planned trajectory.
"""
import sys
import os
from typing import Optional, Any, Tuple

import numpy as np
import numpy.typing as npt
from scipy.ndimage import uniform_filter1d
import casadi

sys.path.insert(0, f'{os.path.abspath(os.path.dirname(__file__))}/../forces_pro_client')  # On Unix
import forcespro  # noqa: E402
import forcespro.nlp  # noqa: E402


[docs]class NoIteration(Exception): """Exception to signal that the MPC did not solve the problem since no iteration was calculated.""" pass
[docs]class Model(): """ Bicycle model for the model predictive controller. Implements system propagation function (continues_dynamics / F_x), objective functions and functions to create Forces Pro Model and Solver. Attributes ---------- prediction_steps_n : int Number of prediction steps for prediction. step_size : float Delta t [s] between two subsequent prediction steps. ackermann_angle_upper_boundary : float Allowed upper boundary of ackermann angle. ackermann_angle_lower_boundary : float Allowed lower boundary of ackermann angle. l_r : float tbd. l_f : float tbd. m : float Mass of vehicle. c_aero : float C Aero of vehicle. """ def __init__(self, prediction_steps_n: int, step_size: float, ackermann_angle_upper_boundary: float, ackermann_angle_lower_boundary: float, l_r: float, l_f: float, m: float, c_aero: float) -> None: """ Initialize bicycle model. Parameters ---------- prediction_steps_n : int Number of prediction steps for prediction. step_size : float Delta t [s] between two subsequent prediction steps. ackermann_angle_upper_boundary : float Allowed upper boundary of ackermann angle. ackermann_angle_lower_boundary : float Allowed lower boundary of ackermann angle. l_r : float tbd. l_f : float tbd. m : float Mass of vehicle. c_aero : float C Aero of vehicle. """ self.prediction_steps_n = prediction_steps_n self.step_size = step_size self.l_r = l_r self.l_f = l_f self.m = m self.c_aero = c_aero self.ackermann_angle_upper_boundary = ackermann_angle_upper_boundary self.ackermann_angle_lower_boundary = ackermann_angle_lower_boundary
[docs] def continuous_dynamics(self, x: Any, u: Any) -> Any: # noqa: ANN401 """ Calculate derivation of state vector based on state vector and control inputs. Parameters ---------- x : Any State vector (x, y, psi, v_x, delta), shape: (5, ). u : Any Control input vector (a_x, delta_rate), shape: (2, ). Returns ------- Any Derivation of state vector (dx, dy, dpsi, a_x, delta_rate), shape: (5, ). """ # transfer variables to variables with clear names x_pos = x[0] # noqa: F841 y_pos = x[1] # noqa: F841 psi = x[2] v_x = x[3] delta = x[4] ax = u[0] delta_rate = u[1] # longitudinal force F_x = self. m * ax - self.c_aero * (v_x**2) F_reib = -68.204 - 0.2541 * F_x F_x = F_x + F_reib # lateral velocity v_y = casadi.tan(delta) * v_x * self.l_r / (self.l_r + self.l_f) # calculate dx/dt return casadi.vertcat(v_x * casadi.cos(psi) - v_y * casadi.sin(psi), # dxPos/dt = v_x*cos(psi)-v_y*sin(psi) v_x * casadi.sin(psi) + v_y * casadi.cos(psi), # dyPos/dt = v_x*sin(psi)+v_y*cos(psi) casadi.tan(delta) * v_x / (self.l_r + self.l_f), # dpsi/dt = tan(delta)*v_x/(l_r+l_f) F_x / self.m, # dvdt = F_x/m delta_rate)
[docs] def objective(self, z: Any, runtime_params: Any) -> float: # noqa: ANN401 """ Calculate cost for prediction step based on predictes states and control inputs and current target. .. todo:: Which types do they really have? Parameters ---------- z : Any Predicted states and control inputs, shape: (7, ) (a_x, delta_rate, x, y, psi, v_x, delta). runtime_params : Any Current target path points and objective factors, shape: (7, ) (x, y, obj_factor_x_pos, obj_factor_y_pos, obj_factor_a_x, obj_factor_delta_rate, obj_factor_delta). Returns ------- float Cost for prediction step. """ """ """ return (runtime_params[2] * ((z[2] - runtime_params[0])**2) + runtime_params[3] * ((z[3] - runtime_params[1])**2) + runtime_params[4] * (z[0] ** 2) + runtime_params[5] * (z[1] ** 2) + runtime_params[6] * (z[6] ** 2))
[docs] def objective_n(self, z: Any, runtime_params: Any) -> float: # noqa: ANN401 """ Calculate cost for last prediction step based on predictes states and control inputs and current target. .. todo:: Which types do they really have? Parameters ---------- z : Any Predicted states and control inputs, shape: (7, ) (a_x, delta_rate, x, y, psi, v_x, delta). runtime_params : Any Current target path points and objective factors, shape: (7, ) (x, y, obj_factor_x_pos, obj_factor_y_pos, obj_factor_a_x, obj_factor_delta_rate, obj_factor_delta). Returns ------- float Cost for the last prediction step. """ return (2 * runtime_params[2] * ((z[2] - runtime_params[0])**2) + 2 * runtime_params[3] * ((z[3] - runtime_params[1])**2) + runtime_params[4] * (z[0] ** 2) + runtime_params[5] * (z[1] ** 2) + runtime_params[6] * (z[6] ** 2))
[docs] def generate_model(self) -> forcespro.nlp.SymbolicModel: """ Generate Model based on bicycle model for the Forces Pro Solver. Returns ------- forcespro.nlp.SymbolicModel Model for Forces Pro Solver. """ # Model Definition # ---------------- # Problem dimensions model = forcespro.nlp.SymbolicModel() model.N = self.prediction_steps_n # horizon length model.nvar = 7 # number of variables model.neq = 5 # number of equality constraints model.npar = 7 # number of runtime parameters # Objective function model.objective = self.objective model.objectiveN = self.objective_n # increased costs for the last stage # The function must be able to handle symbolic evaluation, # by passing in CasADi symbols. This means certain numpy funcions are not # available. # Explicit RK4 integrator here to discretize continuous dynamics model.eq = lambda z: forcespro.nlp.integrate(self.continuous_dynamics, z[2:7], z[0:2], integrator=forcespro.nlp.integrators.RK4, stepsize=self.step_size) # Indices on LHS of dynamical constraint - for efficiency reasons, make # sure the matrix E has structure [0 I] where I is the identity matrix. model.E = np.concatenate([np.zeros((5, 2)), np.eye(5)], axis=1) # Inequality constraints # upper/lower variable bounds lb <= z <= ub model.lb = np.array([-0.8 * 9.81, # ax np.deg2rad(-60.), # deltarate -np.inf, # x -np.inf, # y -np.inf, # psi 0.0, # v_x self.ackermann_angle_lower_boundary]) # delta model.ub = np.array([0.8 * 9.81, # ax np.deg2rad(+60.), # deltarate +np.inf, # x +np.inf, # y +np.inf, # psi np.inf, # v_x self.ackermann_angle_upper_boundary]) # delta # ax maximum boundaries: -23....9.81 # Initial condition on vehicle states x model.xinitidx = range(2, 7) # use this to specify on which variables initial conditions are imposed return model
[docs] def generate_solver(self) -> forcespro.nlp.Solver: """ Generate Forces Pro Solver for bicycle model. Returns ------- forcespro.nlp.Solver Generated Forces Pro Solver. """ model = self.generate_model() # Solver generation # ----------------- # Set solver options codeoptions = forcespro.CodeOptions('AbsoluterHighLevelShitSolver') codeoptions.maxit = 200 # Maximum number of iterations codeoptions.printlevel = 0 # Use printlevel = 2 to print progress (but not for timings) codeoptions.optlevel = 0 # 0 no optimization, 1 optimize for size, # 2 optimize for speed, 3 optimize for size & speed codeoptions.cleanup = False codeoptions.timing = 1 codeoptions.nlp.hessian_approximation = 'bfgs' codeoptions.solvemethod = 'SQP_NLP' # choose the solver method Sequential Quadratic Programming model.bfgs_init = 2.5 * np.identity(7) # codeoptions.nlp.bfgs_init = 2.5 * np.identity(7) codeoptions.sqp_nlp.maxqps = 1 # maximum number of quadratic problems to be solved codeoptions.sqp_nlp.reg_hessian = 5e-9 # increase this if exitflag=-8 codeoptions.sse = 1 # enable SIMD instructions for accelerated execution # change this to your server or leave uncommented for using the # standard embotech server at https://forces.embotech.com # codeoptions.server = 'https://forces.embotech.com' # noqa: E800 # codeoptions.license.print_system_info = 1 # noqa: E800 # codeoptions.server = 'https://forces-beta-cure.embotech.com' # noqa: E800 # codeoptions.license.static_license_file_name = 'FORCESPRO' # noqa: E800 # Creates code for symbolic model formulation given above, then contacts solver = model.generate_solver(options=codeoptions) return solver
[docs]class MPC(): """ Implement functionality to predict states and control inputs with MPC based on trajectory, vehicle pose and model. Model Predictive Controller class to implement functionality to predict states and control inputs with Model Predictive Controller based on planned trajectory, current vehicle pose and actual steering wheel angle and non linear bicycle model. Attributes ---------- trajectory : Optional[npt.NDArray] Current planned trajectory calculated by Motion Planning, shape: (n, 3) with n equal number trajectory points. cumulative_arc_length : Optional[npt.NDArray] Respective cumulative arc length of the trajectory, shape: (n, ) with n equal number trajectory points. heading : Optional[npt.NDArray] Respective vehicle heading on the trajectory, shape: (n, ) with n equal number trajectory points. curvature : Optional[npt.NDArray] Respective curvature of the trajectory, shape: (n, ) with n equal number trajectory points. time : Optional[npt.NDArray] Respective time when the vehicle will be there on the trajectory, shape: (n, ) with n equal number trajectory points. closed_track : Optional[bool] Indicates whether the trajectory is closed or open. ackermann_angle_upper_boundary : float Allowed upper boundary of ackermann angle. ackermann_angle_lower_boundary : float Allowed lower boundary of ackermann angle. is_skidpad : bool Indicates whether the current selected Autonomous Mission is Skidpad. last_trajectory_index : Optional[int] Trajectory index of last closest trajectory point to the vehicle. Used for driving Skidpad. exitflag : Optional[int] Exitflag of last prediction. info : Optional[Any] Information of last prediction. x : npt.NDArray Current state vector of the bicycle model (x, y, psi, v_x, delta), shape: (5, ). prediction_steps_n : int Number of prediction steps for prediction. step_size : float Delta t [s] between two subsequent prediction steps. obj_factor_x_pos : float Factor for objective function for deviation of x position. obj_factor_y_pos : float Factor for objective function for deviation of y position. obj_factor_a_x : float Factor for objective function for acceleration in x direction. obj_factor_delta_rate : float Factor for objective function for delta rate. obj_factor_delta : float Factor for objective function for delta. l_r : float tbd. l_f : float tbd. m : float Mass of vehicle. c_aero : float C Aero of vehicle. model : Model Bicycle model used as basis for the model predictive controller. solver : forcespro.nlp.Solver Forces Pro Solver for solving the optimization problem of the model predictive controller prediction. """ def __init__(self, prediction_steps_n: int, step_size: float, obj_factor_x_pos: float, obj_factor_y_pos: float, obj_factor_a_x: float, obj_factor_delta_rate: float, obj_factor_delta: float, ackermann_angle_upper_boundary: float, ackermann_angle_lower_boundary: float, l_r: float, l_f: float, m: float, c_aero: float, is_skidpad: bool) -> None: """ Initialize Model Predictive Controller. Intialize all parameters and intializes bicycle model and Forces Pro Solver. Parameters ---------- prediction_steps_n : int Number of prediction steps for prediction. step_size : float Delta t [s] between two subsequent prediction steps. obj_factor_x_pos : float Factor for objective function for deviation of x position. obj_factor_y_pos : float Factor for objective function for deviation of y position. obj_factor_a_x : float Factor for objective function for acceleration in x direction. obj_factor_delta_rate : float Factor for objective function for delta rate. obj_factor_delta : float Factor for objective function for delta. ackermann_angle_upper_boundary : float Allowed upper boundary of ackermann angle. ackermann_angle_lower_boundary : float Allowed lower boundary of ackermann angle. l_r : float tbd. l_f : float tbd. m : float Mass of vehicle. c_aero : float C Aero of vehicle. is_skidpad : bool Indicates whether the current selected Autonomous Mission is Skidpad. """ self.trajectory: Optional[npt.NDArray] = None self.cumulative_arc_length: Optional[npt.NDArray] = None self.heading: Optional[npt.NDArray] = None self.curvature: Optional[npt.NDArray] = None self.time: Optional[npt.NDArray] = None self.closed_track: Optional[bool] = None self.ackermann_angle_upper_boundary = ackermann_angle_upper_boundary self.ackermann_angle_lower_boundary = ackermann_angle_lower_boundary self.is_skidpad = is_skidpad self.last_trajectory_index = 0 self.exitflag: Optional[int] = None self.info: Optional[Any] = None self.x = np.zeros((5)) self.prediction_steps_n = prediction_steps_n self.step_size = step_size self.obj_factor_x_pos = obj_factor_x_pos self.obj_factor_y_pos = obj_factor_y_pos self.obj_factor_a_x = obj_factor_a_x self.obj_factor_delta_rate = obj_factor_delta_rate self.obj_factor_delta = obj_factor_delta self.l_r = l_r self.l_f = l_f self.m = m self.c_aero = c_aero self.model, self.solver = self.generate_pathplanner() assert self.prediction_steps_n == self.solver.nstages, ('Number of Predictions steps of configuration and' ' solver differ. Correct config or regenerate solver!') self.problem = { 'x0': np.zeros((self.prediction_steps_n, self.model.nvar)), 'xinit': np.zeros((5)) }
[docs] @staticmethod def interp_2d(x: npt.NDArray, xp: npt.NDArray, fp: npt.NDArray) -> npt.NDArray: """ Interpolates 2d function for given points. Parameters ---------- x : npt.NDArray Points for which the function should be interpolated, shape: (m, ) with m equal to number of to be interpolated points. xp : npt.NDArray X values of the 2d function, shape: (n, ) with n equal to the support points of the function. fp : npt.NDArray Y values of the 2d function, shape: (n, 2) with n equal to the support points of the function. Returns ------- npt.NDArray Interpolated function for the given points, shape: (m, 2) with m equal to number of to be interpolated points. """ j = np.searchsorted(xp, x) - 1 d = ((x - xp[j]) / (xp[j + 1] - xp[j]))[:, np.newaxis] return (1 - d) * fp[j] + fp[j + 1] * d
[docs] def generate_pathplanner(self) -> Tuple[Model, forcespro.nlp.Solver]: """ Generate FORCESPRO solver to calculate paths based on constraints and dynamics by minimizing an objective func. Returns ------- Tuple[Model, forcespro.nlp.Solver] Bicycle model used for the MPC and the respective solver for the MPC. """ model = Model(prediction_steps_n=self.prediction_steps_n, step_size=self.step_size, ackermann_angle_lower_boundary=self.ackermann_angle_lower_boundary, ackermann_angle_upper_boundary=self.ackermann_angle_upper_boundary, l_f=self.l_f, l_r=self.l_r, m=self.m, c_aero=self.c_aero).generate_model() # if the solver is already generated solver = forcespro.nlp.Solver.from_directory( f'{os.path.abspath(os.path.dirname(__file__))}/AbsoluterHighLevelShitSolver') return model, solver
[docs] def predict(self, state_vector: npt.NDArray) -> Tuple[npt.NDArray, npt.NDArray]: """ Predict control inputs for the prediction horizon with the MPC based on the states & previously set trajectory. Parameters ---------- state_vector : npt.NDArray Current state vector of the bicycle model (x, y, psi, v_x, delta). Returns ------- Tuple[npt.NDArray, npt.NDArray] Predicted states (x, y, psi, v_x, delta) (shape: (n, 5)) with n equal number of prediction steps and control inputs (a_x, delta_rate) (shape: (n, 2)) with n equal number of prediction steps Raises ------ NoIteration Indicates that the prediction of the model predictive controller was not succesful. """ # Set initial condition (xPos, yPos, psi, vx, delta) self.x = state_vector self.problem['xinit'] = state_vector # Set runtime parameters (here, the next N points on the path) next_path_points = self.resample_path_for_tracker() obj_factors = np.tile( [self.obj_factor_x_pos, self.obj_factor_y_pos, self.obj_factor_a_x, self.obj_factor_delta_rate, self.obj_factor_delta], (self.model.N,1)) runtime_parameters = np.hstack((next_path_points, obj_factors)) self.problem['all_parameters'] = np.reshape(runtime_parameters, (7 * self.model.N, 1)) # Time to solve the NLP! output, self.exitflag, self.info = self.solver.solve(self.problem) # TODO: Types if self.info.it == 0: raise NoIteration(f'No Iterations Executed, Exitflag: {self.exitflag}') # Extract output temp_output = np.empty((self.model.nvar, self.prediction_steps_n)) for i in range(0, self.prediction_steps_n): temp_output[:, i] = output[f'x{i + 1:02d}'] pred_u = temp_output[:2] pred_x = temp_output[2:] return pred_u, pred_x
[docs] def resample_path_for_tracker(self) -> npt.NDArray: """ Resamples path of trajectory for the tracker of the model predictive controller. Therefore interpolates at which point of the path the vehicle should be for the respective predictions steps of the prediction horizon. Returns ------- npt.NDArray Planned path for the prediction horizon, shape: (n, 2) with n equal number of prediction steps. """ # Localizes the vehicle on the map and resamples it for the tracker's prediction horizon # Localize arc_length_localization = self.localize_vehicle_on_path() # Get map time corresponding to current localization time_reference = np.interp(arc_length_localization, self.cumulative_arc_length, self.time) # Time vector of prediction horizon time_horizon = np.arange(0, self.prediction_steps_n * self.step_size, self.step_size) + time_reference if self.closed_track is False: time_horizon = np.minimum(self.time[-1], time_horizon) else: time_horizon = np.mod(time_horizon, self.time[-1]) return self.interp_2d(time_horizon, self.time, self.trajectory[:, :2])
[docs] def localize_vehicle_on_path(self) -> float: """ Localizes vehicle on the current path. Returns which path point is closest to the vehicle by returning the respective cumulative arc length for this point. Calculates the closest pathpoint based only on the the distance. In case a skidpad is driven, only the next few points since the last prediction steps are allowed. Calculates the respective cumulative arc length for this point. Returns ------- float Respective arc length for the closest point on the path to the vehicle. """ distance_from_waypoints = np.sqrt(np.sum(np.power((self.x[:2] - self.trajectory[:, :2]), 2), axis=1)) # Find the waypoint with the smallest distance to the vehicle # If the current missions is skidpad, add the offset due to previous negleted already driven trajectory if self.is_skidpad is True: closest_trajectory_index = np.argmin( distance_from_waypoints[self.last_trajectory_index:self.last_trajectory_index + 9]) closest_trajectory_index += self.last_trajectory_index if closest_trajectory_index >= self.last_trajectory_index: self.last_trajectory_index = closest_trajectory_index else: closest_trajectory_index = self.last_trajectory_index else: closest_trajectory_index = np.argmin(distance_from_waypoints) # Identify second closest waypoint w.r.t. the vehicle # trajectory_index1 is the index of the first of the two points # which make up the straight onto which the vehicle'sposition is projected if self.closed_track is False: if (distance_from_waypoints[closest_trajectory_index - 2] < distance_from_waypoints[closest_trajectory_index]): trajectory_index1 = closest_trajectory_index - 2 trajectory_index2 = closest_trajectory_index - 1 else: trajectory_index1 = closest_trajectory_index - 1 trajectory_index2 = closest_trajectory_index else: nElements = self.trajectory.shape[0] if (distance_from_waypoints[np.mod(nElements + closest_trajectory_index - 2, nElements)] < distance_from_waypoints[np.mod(nElements + closest_trajectory_index, nElements)]): trajectory_index1 = np.mod(nElements + closest_trajectory_index - 2, nElements) trajectory_index2 = closest_trajectory_index - 1 else: trajectory_index1 = closest_trajectory_index - 1 trajectory_index2 = np.mod(nElements + closest_trajectory_index, nElements) point1 = self.trajectory[trajectory_index1, :2] point2 = self.trajectory[trajectory_index2, :2] delta_points = point2 - point1 ds = np.linalg.norm(delta_points) # Calculate the projection and the corresponding arclength if ds < 1e-4: lambda_ = 0 else: lambda_ = -(delta_points[0] * (point1[0] - self.x[0]) + delta_points[1] * (point1[1] - self.x[1])) / ds**2 if self.closed_track is False: arc_length_localization = self.cumulative_arc_length[trajectory_index1] + lambda_ * ds else: arc_length_localization = np.mod(self.cumulative_arc_length[trajectory_index1] + lambda_ * ds, self.cumulative_arc_length[-1]) return arc_length_localization
[docs] def set_trajectory(self, trajectory: npt.NDArray, closed_track: bool) -> None: """ Set current planned trajectory and calculate other important values like curvature based on it. Calculates cumulative arc length, heading at each point, curvature and a respective time for each trajectory point. Parameters ---------- trajectory : npt.NDArray Planned trajectory calculated by Motion Planning, shape: (n, 3) with n equal number trajectory points. closed_track : bool Indicates whether the trajectory is closed. """ self.trajectory = trajectory self.closed_track = closed_track delta_arc_length = np.sqrt(np.sum(np.power(np.diff(trajectory[:,:2], axis=0, prepend=trajectory[:1,:2]), 2), axis=1)) self.cumulative_arc_length = np.cumsum(delta_arc_length) derivation1 = np.gradient(self.trajectory, axis=0, edge_order=1) derivation2 = np.gradient(derivation1, axis=0, edge_order=1) self.heading = np.arctan2(derivation1[:, 1], derivation1[:, 0]) self.curvature = np.divide((np.multiply(derivation1[:, 0], derivation2[:, 1]) - np.multiply(derivation1[:, 1], derivation2[:, 0])), np.power(np.sum(np.square(derivation1), axis=1), 3 / 2)) smoothed_velocity = np.maximum(uniform_filter1d(trajectory[:, 2], 2, mode='nearest'), 0.0001) dt = np.divide(delta_arc_length, smoothed_velocity) self.time = np.cumsum(dt)