"""
Python module to implement bicycle model and cost function for MPC as model and respective MPC.
This MPC predicts vehicle behaviour to follow a planned trajectory.
"""
import sys
import os
from typing import Optional, Any, Tuple
import numpy as np
import numpy.typing as npt
from scipy.ndimage import uniform_filter1d
import casadi
sys.path.insert(0, f'{os.path.abspath(os.path.dirname(__file__))}/../forces_pro_client') # On Unix
import forcespro # noqa: E402
import forcespro.nlp # noqa: E402
[docs]class NoIteration(Exception):
"""Exception to signal that the MPC did not solve the problem since no iteration was calculated."""
pass
[docs]class Model():
"""
Bicycle model for the model predictive controller.
Implements system propagation function (continues_dynamics / F_x),
objective functions and functions to create Forces Pro Model and Solver.
Attributes
----------
prediction_steps_n : int
Number of prediction steps for prediction.
step_size : float
Delta t [s] between two subsequent prediction steps.
ackermann_angle_upper_boundary : float
Allowed upper boundary of ackermann angle.
ackermann_angle_lower_boundary : float
Allowed lower boundary of ackermann angle.
l_r : float
tbd.
l_f : float
tbd.
m : float
Mass of vehicle.
c_aero : float
C Aero of vehicle.
"""
def __init__(self, prediction_steps_n: int, step_size: float, ackermann_angle_upper_boundary: float,
ackermann_angle_lower_boundary: float, l_r: float, l_f: float, m: float, c_aero: float) -> None:
"""
Initialize bicycle model.
Parameters
----------
prediction_steps_n : int
Number of prediction steps for prediction.
step_size : float
Delta t [s] between two subsequent prediction steps.
ackermann_angle_upper_boundary : float
Allowed upper boundary of ackermann angle.
ackermann_angle_lower_boundary : float
Allowed lower boundary of ackermann angle.
l_r : float
tbd.
l_f : float
tbd.
m : float
Mass of vehicle.
c_aero : float
C Aero of vehicle.
"""
self.prediction_steps_n = prediction_steps_n
self.step_size = step_size
self.l_r = l_r
self.l_f = l_f
self.m = m
self.c_aero = c_aero
self.ackermann_angle_upper_boundary = ackermann_angle_upper_boundary
self.ackermann_angle_lower_boundary = ackermann_angle_lower_boundary
[docs] def continuous_dynamics(self, x: Any, u: Any) -> Any: # noqa: ANN401
"""
Calculate derivation of state vector based on state vector and control inputs.
Parameters
----------
x : Any
State vector (x, y, psi, v_x, delta), shape: (5, ).
u : Any
Control input vector (a_x, delta_rate), shape: (2, ).
Returns
-------
Any
Derivation of state vector (dx, dy, dpsi, a_x, delta_rate), shape: (5, ).
"""
# transfer variables to variables with clear names
x_pos = x[0] # noqa: F841
y_pos = x[1] # noqa: F841
psi = x[2]
v_x = x[3]
delta = x[4]
ax = u[0]
delta_rate = u[1]
# longitudinal force
F_x = self. m * ax - self.c_aero * (v_x**2)
F_reib = -68.204 - 0.2541 * F_x
F_x = F_x + F_reib
# lateral velocity
v_y = casadi.tan(delta) * v_x * self.l_r / (self.l_r + self.l_f)
# calculate dx/dt
return casadi.vertcat(v_x * casadi.cos(psi) - v_y * casadi.sin(psi), # dxPos/dt = v_x*cos(psi)-v_y*sin(psi)
v_x * casadi.sin(psi) + v_y * casadi.cos(psi), # dyPos/dt = v_x*sin(psi)+v_y*cos(psi)
casadi.tan(delta) * v_x / (self.l_r + self.l_f), # dpsi/dt = tan(delta)*v_x/(l_r+l_f)
F_x / self.m, # dvdt = F_x/m
delta_rate)
[docs] def objective(self, z: Any, runtime_params: Any) -> float: # noqa: ANN401
"""
Calculate cost for prediction step based on predictes states and control inputs and current target.
.. todo:: Which types do they really have?
Parameters
----------
z : Any
Predicted states and control inputs, shape: (7, ) (a_x, delta_rate, x, y, psi, v_x, delta).
runtime_params : Any
Current target path points and objective factors, shape: (7, )
(x, y, obj_factor_x_pos, obj_factor_y_pos, obj_factor_a_x, obj_factor_delta_rate, obj_factor_delta).
Returns
-------
float
Cost for prediction step.
"""
"""
"""
return (runtime_params[2] * ((z[2] - runtime_params[0])**2)
+ runtime_params[3] * ((z[3] - runtime_params[1])**2)
+ runtime_params[4] * (z[0] ** 2)
+ runtime_params[5] * (z[1] ** 2)
+ runtime_params[6] * (z[6] ** 2))
[docs] def objective_n(self, z: Any, runtime_params: Any) -> float: # noqa: ANN401
"""
Calculate cost for last prediction step based on predictes states and control inputs and current target.
.. todo:: Which types do they really have?
Parameters
----------
z : Any
Predicted states and control inputs, shape: (7, ) (a_x, delta_rate, x, y, psi, v_x, delta).
runtime_params : Any
Current target path points and objective factors, shape: (7, )
(x, y, obj_factor_x_pos, obj_factor_y_pos, obj_factor_a_x, obj_factor_delta_rate, obj_factor_delta).
Returns
-------
float
Cost for the last prediction step.
"""
return (2 * runtime_params[2] * ((z[2] - runtime_params[0])**2)
+ 2 * runtime_params[3] * ((z[3] - runtime_params[1])**2)
+ runtime_params[4] * (z[0] ** 2)
+ runtime_params[5] * (z[1] ** 2)
+ runtime_params[6] * (z[6] ** 2))
[docs] def generate_model(self) -> forcespro.nlp.SymbolicModel:
"""
Generate Model based on bicycle model for the Forces Pro Solver.
Returns
-------
forcespro.nlp.SymbolicModel
Model for Forces Pro Solver.
"""
# Model Definition
# ----------------
# Problem dimensions
model = forcespro.nlp.SymbolicModel()
model.N = self.prediction_steps_n # horizon length
model.nvar = 7 # number of variables
model.neq = 5 # number of equality constraints
model.npar = 7 # number of runtime parameters
# Objective function
model.objective = self.objective
model.objectiveN = self.objective_n # increased costs for the last stage
# The function must be able to handle symbolic evaluation,
# by passing in CasADi symbols. This means certain numpy funcions are not
# available.
# Explicit RK4 integrator here to discretize continuous dynamics
model.eq = lambda z: forcespro.nlp.integrate(self.continuous_dynamics, z[2:7], z[0:2],
integrator=forcespro.nlp.integrators.RK4,
stepsize=self.step_size)
# Indices on LHS of dynamical constraint - for efficiency reasons, make
# sure the matrix E has structure [0 I] where I is the identity matrix.
model.E = np.concatenate([np.zeros((5, 2)), np.eye(5)], axis=1)
# Inequality constraints
# upper/lower variable bounds lb <= z <= ub
model.lb = np.array([-0.8 * 9.81, # ax
np.deg2rad(-60.), # deltarate
-np.inf, # x
-np.inf, # y
-np.inf, # psi
0.0, # v_x
self.ackermann_angle_lower_boundary]) # delta
model.ub = np.array([0.8 * 9.81, # ax
np.deg2rad(+60.), # deltarate
+np.inf, # x
+np.inf, # y
+np.inf, # psi
np.inf, # v_x
self.ackermann_angle_upper_boundary]) # delta
# ax maximum boundaries: -23....9.81
# Initial condition on vehicle states x
model.xinitidx = range(2, 7) # use this to specify on which variables initial conditions are imposed
return model
[docs] def generate_solver(self) -> forcespro.nlp.Solver:
"""
Generate Forces Pro Solver for bicycle model.
Returns
-------
forcespro.nlp.Solver
Generated Forces Pro Solver.
"""
model = self.generate_model()
# Solver generation
# -----------------
# Set solver options
codeoptions = forcespro.CodeOptions('AbsoluterHighLevelShitSolver')
codeoptions.maxit = 200 # Maximum number of iterations
codeoptions.printlevel = 0 # Use printlevel = 2 to print progress (but not for timings)
codeoptions.optlevel = 0 # 0 no optimization, 1 optimize for size,
# 2 optimize for speed, 3 optimize for size & speed
codeoptions.cleanup = False
codeoptions.timing = 1
codeoptions.nlp.hessian_approximation = 'bfgs'
codeoptions.solvemethod = 'SQP_NLP' # choose the solver method Sequential Quadratic Programming
model.bfgs_init = 2.5 * np.identity(7) # codeoptions.nlp.bfgs_init = 2.5 * np.identity(7)
codeoptions.sqp_nlp.maxqps = 1 # maximum number of quadratic problems to be solved
codeoptions.sqp_nlp.reg_hessian = 5e-9 # increase this if exitflag=-8
codeoptions.sse = 1 # enable SIMD instructions for accelerated execution
# change this to your server or leave uncommented for using the
# standard embotech server at https://forces.embotech.com
# codeoptions.server = 'https://forces.embotech.com' # noqa: E800
# codeoptions.license.print_system_info = 1 # noqa: E800
# codeoptions.server = 'https://forces-beta-cure.embotech.com' # noqa: E800
# codeoptions.license.static_license_file_name = 'FORCESPRO' # noqa: E800
# Creates code for symbolic model formulation given above, then contacts
solver = model.generate_solver(options=codeoptions)
return solver
[docs]class MPC():
"""
Implement functionality to predict states and control inputs with MPC based on trajectory, vehicle pose and model.
Model Predictive Controller class to implement functionality to predict states and control inputs with
Model Predictive Controller based on planned trajectory, current vehicle pose and actual steering wheel angle
and non linear bicycle model.
Attributes
----------
trajectory : Optional[npt.NDArray]
Current planned trajectory calculated by Motion Planning, shape: (n, 3) with n equal number trajectory points.
cumulative_arc_length : Optional[npt.NDArray]
Respective cumulative arc length of the trajectory, shape: (n, ) with n equal number trajectory points.
heading : Optional[npt.NDArray]
Respective vehicle heading on the trajectory, shape: (n, ) with n equal number trajectory points.
curvature : Optional[npt.NDArray]
Respective curvature of the trajectory, shape: (n, ) with n equal number trajectory points.
time : Optional[npt.NDArray]
Respective time when the vehicle will be there on the trajectory,
shape: (n, ) with n equal number trajectory points.
closed_track : Optional[bool]
Indicates whether the trajectory is closed or open.
ackermann_angle_upper_boundary : float
Allowed upper boundary of ackermann angle.
ackermann_angle_lower_boundary : float
Allowed lower boundary of ackermann angle.
is_skidpad : bool
Indicates whether the current selected Autonomous Mission is Skidpad.
last_trajectory_index : Optional[int]
Trajectory index of last closest trajectory point to the vehicle. Used for driving Skidpad.
exitflag : Optional[int]
Exitflag of last prediction.
info : Optional[Any]
Information of last prediction.
x : npt.NDArray
Current state vector of the bicycle model (x, y, psi, v_x, delta), shape: (5, ).
prediction_steps_n : int
Number of prediction steps for prediction.
step_size : float
Delta t [s] between two subsequent prediction steps.
obj_factor_x_pos : float
Factor for objective function for deviation of x position.
obj_factor_y_pos : float
Factor for objective function for deviation of y position.
obj_factor_a_x : float
Factor for objective function for acceleration in x direction.
obj_factor_delta_rate : float
Factor for objective function for delta rate.
obj_factor_delta : float
Factor for objective function for delta.
l_r : float
tbd.
l_f : float
tbd.
m : float
Mass of vehicle.
c_aero : float
C Aero of vehicle.
model : Model
Bicycle model used as basis for the model predictive controller.
solver : forcespro.nlp.Solver
Forces Pro Solver for solving the optimization problem of the model predictive controller prediction.
"""
def __init__(self, prediction_steps_n: int, step_size: float, obj_factor_x_pos: float, obj_factor_y_pos: float,
obj_factor_a_x: float, obj_factor_delta_rate: float, obj_factor_delta: float,
ackermann_angle_upper_boundary: float, ackermann_angle_lower_boundary: float,
l_r: float, l_f: float, m: float, c_aero: float, is_skidpad: bool) -> None:
"""
Initialize Model Predictive Controller.
Intialize all parameters and intializes bicycle model and Forces Pro Solver.
Parameters
----------
prediction_steps_n : int
Number of prediction steps for prediction.
step_size : float
Delta t [s] between two subsequent prediction steps.
obj_factor_x_pos : float
Factor for objective function for deviation of x position.
obj_factor_y_pos : float
Factor for objective function for deviation of y position.
obj_factor_a_x : float
Factor for objective function for acceleration in x direction.
obj_factor_delta_rate : float
Factor for objective function for delta rate.
obj_factor_delta : float
Factor for objective function for delta.
ackermann_angle_upper_boundary : float
Allowed upper boundary of ackermann angle.
ackermann_angle_lower_boundary : float
Allowed lower boundary of ackermann angle.
l_r : float
tbd.
l_f : float
tbd.
m : float
Mass of vehicle.
c_aero : float
C Aero of vehicle.
is_skidpad : bool
Indicates whether the current selected Autonomous Mission is Skidpad.
"""
self.trajectory: Optional[npt.NDArray] = None
self.cumulative_arc_length: Optional[npt.NDArray] = None
self.heading: Optional[npt.NDArray] = None
self.curvature: Optional[npt.NDArray] = None
self.time: Optional[npt.NDArray] = None
self.closed_track: Optional[bool] = None
self.ackermann_angle_upper_boundary = ackermann_angle_upper_boundary
self.ackermann_angle_lower_boundary = ackermann_angle_lower_boundary
self.is_skidpad = is_skidpad
self.last_trajectory_index = 0
self.exitflag: Optional[int] = None
self.info: Optional[Any] = None
self.x = np.zeros((5))
self.prediction_steps_n = prediction_steps_n
self.step_size = step_size
self.obj_factor_x_pos = obj_factor_x_pos
self.obj_factor_y_pos = obj_factor_y_pos
self.obj_factor_a_x = obj_factor_a_x
self.obj_factor_delta_rate = obj_factor_delta_rate
self.obj_factor_delta = obj_factor_delta
self.l_r = l_r
self.l_f = l_f
self.m = m
self.c_aero = c_aero
self.model, self.solver = self.generate_pathplanner()
assert self.prediction_steps_n == self.solver.nstages, ('Number of Predictions steps of configuration and'
' solver differ. Correct config or regenerate solver!')
self.problem = {
'x0': np.zeros((self.prediction_steps_n, self.model.nvar)),
'xinit': np.zeros((5))
}
[docs] @staticmethod
def interp_2d(x: npt.NDArray, xp: npt.NDArray, fp: npt.NDArray) -> npt.NDArray:
"""
Interpolates 2d function for given points.
Parameters
----------
x : npt.NDArray
Points for which the function should be interpolated,
shape: (m, ) with m equal to number of to be interpolated points.
xp : npt.NDArray
X values of the 2d function, shape: (n, ) with n equal to the support points of the function.
fp : npt.NDArray
Y values of the 2d function, shape: (n, 2) with n equal to the support points of the function.
Returns
-------
npt.NDArray
Interpolated function for the given points,
shape: (m, 2) with m equal to number of to be interpolated points.
"""
j = np.searchsorted(xp, x) - 1
d = ((x - xp[j]) / (xp[j + 1] - xp[j]))[:, np.newaxis]
return (1 - d) * fp[j] + fp[j + 1] * d
[docs] def generate_pathplanner(self) -> Tuple[Model, forcespro.nlp.Solver]:
"""
Generate FORCESPRO solver to calculate paths based on constraints and dynamics by minimizing an objective func.
Returns
-------
Tuple[Model, forcespro.nlp.Solver]
Bicycle model used for the MPC and the respective solver for the MPC.
"""
model = Model(prediction_steps_n=self.prediction_steps_n, step_size=self.step_size,
ackermann_angle_lower_boundary=self.ackermann_angle_lower_boundary,
ackermann_angle_upper_boundary=self.ackermann_angle_upper_boundary,
l_f=self.l_f, l_r=self.l_r, m=self.m, c_aero=self.c_aero).generate_model()
# if the solver is already generated
solver = forcespro.nlp.Solver.from_directory(
f'{os.path.abspath(os.path.dirname(__file__))}/AbsoluterHighLevelShitSolver')
return model, solver
[docs] def predict(self, state_vector: npt.NDArray) -> Tuple[npt.NDArray, npt.NDArray]:
"""
Predict control inputs for the prediction horizon with the MPC based on the states & previously set trajectory.
Parameters
----------
state_vector : npt.NDArray
Current state vector of the bicycle model (x, y, psi, v_x, delta).
Returns
-------
Tuple[npt.NDArray, npt.NDArray]
Predicted states (x, y, psi, v_x, delta) (shape: (n, 5)) with n equal number of prediction steps
and control inputs (a_x, delta_rate) (shape: (n, 2)) with n equal number of prediction steps
Raises
------
NoIteration
Indicates that the prediction of the model predictive controller was not succesful.
"""
# Set initial condition (xPos, yPos, psi, vx, delta)
self.x = state_vector
self.problem['xinit'] = state_vector
# Set runtime parameters (here, the next N points on the path)
next_path_points = self.resample_path_for_tracker()
obj_factors = np.tile(
[self.obj_factor_x_pos, self.obj_factor_y_pos, self.obj_factor_a_x, self.obj_factor_delta_rate, self.obj_factor_delta],
(self.model.N,1))
runtime_parameters = np.hstack((next_path_points, obj_factors))
self.problem['all_parameters'] = np.reshape(runtime_parameters, (7 * self.model.N, 1))
# Time to solve the NLP!
output, self.exitflag, self.info = self.solver.solve(self.problem) # TODO: Types
if self.info.it == 0:
raise NoIteration(f'No Iterations Executed, Exitflag: {self.exitflag}')
# Extract output
temp_output = np.empty((self.model.nvar, self.prediction_steps_n))
for i in range(0, self.prediction_steps_n):
temp_output[:, i] = output[f'x{i + 1:02d}']
pred_u = temp_output[:2]
pred_x = temp_output[2:]
return pred_u, pred_x
[docs] def resample_path_for_tracker(self) -> npt.NDArray:
"""
Resamples path of trajectory for the tracker of the model predictive controller.
Therefore interpolates at which point of the path the vehicle
should be for the respective predictions steps of the prediction horizon.
Returns
-------
npt.NDArray
Planned path for the prediction horizon, shape: (n, 2) with n equal number of prediction steps.
"""
# Localizes the vehicle on the map and resamples it for the tracker's prediction horizon
# Localize
arc_length_localization = self.localize_vehicle_on_path()
# Get map time corresponding to current localization
time_reference = np.interp(arc_length_localization, self.cumulative_arc_length, self.time)
# Time vector of prediction horizon
time_horizon = np.arange(0, self.prediction_steps_n * self.step_size, self.step_size) + time_reference
if self.closed_track is False:
time_horizon = np.minimum(self.time[-1], time_horizon)
else:
time_horizon = np.mod(time_horizon, self.time[-1])
return self.interp_2d(time_horizon, self.time, self.trajectory[:, :2])
[docs] def localize_vehicle_on_path(self) -> float:
"""
Localizes vehicle on the current path.
Returns which path point is closest to the vehicle
by returning the respective cumulative arc length for this point.
Calculates the closest pathpoint based only on the the distance.
In case a skidpad is driven, only the next few points since the last prediction steps are allowed.
Calculates the respective cumulative arc length for this point.
Returns
-------
float
Respective arc length for the closest point on the path to the vehicle.
"""
distance_from_waypoints = np.sqrt(np.sum(np.power((self.x[:2] - self.trajectory[:, :2]), 2), axis=1))
# Find the waypoint with the smallest distance to the vehicle
# If the current missions is skidpad, add the offset due to previous negleted already driven trajectory
if self.is_skidpad is True:
closest_trajectory_index = np.argmin(
distance_from_waypoints[self.last_trajectory_index:self.last_trajectory_index + 9])
closest_trajectory_index += self.last_trajectory_index
if closest_trajectory_index >= self.last_trajectory_index:
self.last_trajectory_index = closest_trajectory_index
else:
closest_trajectory_index = self.last_trajectory_index
else:
closest_trajectory_index = np.argmin(distance_from_waypoints)
# Identify second closest waypoint w.r.t. the vehicle
# trajectory_index1 is the index of the first of the two points
# which make up the straight onto which the vehicle'sposition is projected
if self.closed_track is False:
if (distance_from_waypoints[closest_trajectory_index - 2]
< distance_from_waypoints[closest_trajectory_index]):
trajectory_index1 = closest_trajectory_index - 2
trajectory_index2 = closest_trajectory_index - 1
else:
trajectory_index1 = closest_trajectory_index - 1
trajectory_index2 = closest_trajectory_index
else:
nElements = self.trajectory.shape[0]
if (distance_from_waypoints[np.mod(nElements + closest_trajectory_index - 2, nElements)]
< distance_from_waypoints[np.mod(nElements + closest_trajectory_index, nElements)]):
trajectory_index1 = np.mod(nElements + closest_trajectory_index - 2, nElements)
trajectory_index2 = closest_trajectory_index - 1
else:
trajectory_index1 = closest_trajectory_index - 1
trajectory_index2 = np.mod(nElements + closest_trajectory_index, nElements)
point1 = self.trajectory[trajectory_index1, :2]
point2 = self.trajectory[trajectory_index2, :2]
delta_points = point2 - point1
ds = np.linalg.norm(delta_points)
# Calculate the projection and the corresponding arclength
if ds < 1e-4:
lambda_ = 0
else:
lambda_ = -(delta_points[0] * (point1[0] - self.x[0])
+ delta_points[1] * (point1[1] - self.x[1])) / ds**2
if self.closed_track is False:
arc_length_localization = self.cumulative_arc_length[trajectory_index1] + lambda_ * ds
else:
arc_length_localization = np.mod(self.cumulative_arc_length[trajectory_index1] + lambda_ * ds,
self.cumulative_arc_length[-1])
return arc_length_localization
[docs] def set_trajectory(self, trajectory: npt.NDArray, closed_track: bool) -> None:
"""
Set current planned trajectory and calculate other important values like curvature based on it.
Calculates cumulative arc length, heading at each point,
curvature and a respective time for each trajectory point.
Parameters
----------
trajectory : npt.NDArray
Planned trajectory calculated by Motion Planning, shape: (n, 3) with n equal number trajectory points.
closed_track : bool
Indicates whether the trajectory is closed.
"""
self.trajectory = trajectory
self.closed_track = closed_track
delta_arc_length = np.sqrt(np.sum(np.power(np.diff(trajectory[:,:2], axis=0, prepend=trajectory[:1,:2]), 2), axis=1))
self.cumulative_arc_length = np.cumsum(delta_arc_length)
derivation1 = np.gradient(self.trajectory, axis=0, edge_order=1)
derivation2 = np.gradient(derivation1, axis=0, edge_order=1)
self.heading = np.arctan2(derivation1[:, 1], derivation1[:, 0])
self.curvature = np.divide((np.multiply(derivation1[:, 0], derivation2[:, 1])
- np.multiply(derivation1[:, 1], derivation2[:, 0])),
np.power(np.sum(np.square(derivation1), axis=1), 3 / 2))
smoothed_velocity = np.maximum(uniform_filter1d(trajectory[:, 2], 2, mode='nearest'), 0.0001)
dt = np.divide(delta_arc_length, smoothed_velocity)
self.time = np.cumsum(dt)