#!/usr/bin/env python3
"""
ROS Node for control module.
Planned trajectory by Motion Planning, estimated vehicle pose by SLAM and current
steering wheel angle are input for a model predictive controller which uses a nonlinear bicycle model to predict
which are the optimal control inputs (ackermann angle rate and acceleration) to follow this trajectory.
The model predictive controller therefore looks one second into the future. The resulting predicted states and
control inputs are used to calculate reference values for steering actuator, motor and service brake.
"""
from typing import Optional
from enum import Enum
import threading
import traceback
import time
import numpy as np
import numpy.typing as npt
import rospy
import tf2_ros
import tf_conversions
from nav_msgs.msg import Path
from geometry_msgs.msg import PoseStamped
from as_can_msgs.msg import SteeringWheelAngle, Torque, BrakePressure, MissionMachineState, MissionFinished, Velocity
from utilities.msg import TrajectoryPoints, VehiclePose, PredictedStates, MPCStatistics
from mpc import MPC, NoIteration
from vehicle_model import VehicleModel
[docs]class AutonomousMissionEnum(Enum):
"""Enum to describe which Autonomous Mission is currently select."""
MANUAL_DRIVING = 0
ACCELERATION = 1
SKIDPAD = 2
TRACKDRIVE = 3
BRAKETEST = 4
INSPECTION = 5
AUTOCROSS = 6
[docs]class MissionMachineEnum(Enum):
"""
Enum to describe the current state of the Autonomous System intern Mission Machine.
IDLE means AS OFF.
READY equals AS READY but with any mission but Inspection Mission.
DRIVING equals AS Driving but with any mission but Inspection Mission.
PRE_FINISHED means that the mission is completed but the vehicle has to come to a standstill before it is finished.
FINISHED equals AS Finish.
EMERGENCY equals AS Emergency.
PRE_INSPECTION equals AS READY but with Inspection Mission.
INSPECTION equals AS Driving but with Inspection Mission.
"""
IDLE = 0
READY = 1
DRIVING = 2
PRE_FINISHED = 3
FINISHED = 4
EMERGENCY = 5
PRE_INSPECTION = 6
INSPECTION = 7
[docs]class ControlNode():
"""
Implement functionality for calculating reference values for actuators based on trajectory and vehicle pose.
Control node class to implement functionality for calculating reference values for steering actuator, motor and
service brake with MPC based on planned trajectory, estimated vehicle pose and actual ackermann angle.
Uses a nonlinear bicycle model within the MPC.
Attributes
----------
mission : AutonomousMissionEnum
Current selected Autonomous Mission by user on dashboard display.
steering_wheel_angle_control_value_time : float
Assumed dead time [s] of steering actuator and thus time to
select from prediction horizon for steering reference value.
torque_control_value_time : float
Assumed dead time [s] of inverter and motor and thus time to
select from prediction horizon for motor reference value.
service_brake_pressure_control_value_time : float
Assumed dead time [s] of service brake and thus time to
select from prediction horizon for brake reference value.
steering_wheel_angle_range : float
Range of steering [degrees] from most left to most right steering wheel angle.
steering_wheel_angle_offset : float
Offset of steering wheel angle, where the vehicle really drives straight.
steering_wheel_angle_upper_boundary : float
Allowed upper boundary for steering wheel angle for the current selected mission.
steering_wheel_angle_lower_boundary : float
Allowed lower boundary for steering wheel angle for the current selected mission.
vehicle_model : VehicleModel
Vehicle Model used for translating steering wheel angle to ackermann angle and the otherway around
as well as acceleration to motor torque and service brake pressure.
mpc : MPC
Model predictive controller instance to predict optimal control inputs for the prediction horizon.
steering_wheel_angle_pub : rospy.Publisher
ROS publisher to publish steering wheel angle reference values as requests.
torque_pub : rospy.Publisher
ROS publisher to publish motor torque reference values as requests.
service_brake_pressure_pub : rospy.Publisher
ROS publisher to publish service brake pressure reference values as requests.
predicted_path_pub : rospy.Publisher
ROS publisher to publish visualization of predicted path.
predicted_states_pub : rospy.Publisher
ROS publisher to publish predicted states and predicted control inputs.
mpc_statistics_pub : rospy.Publisher
ROS publisher to publish statistics of the model prediction controller for the current prediction.
tf_buffer : tf2_ros.Buffer
Transformation tree buffer for buffering transforms of the transformation tree.
Used to get offset between rear axle and center of gravity.
transform_listener : tf2_ros.TransformListener
Transformation tree listener to listen to transform of the transformation tree.
Used to get offset between rear axle and center of gravity.
rear_axle_to_cog_offset : float
Projected offset between rear axle and center of gravity in x direction.
state_vector : npt.NDArray
Current state vector of the the bicycle model of the MPC, shape: (5, ).
pred_x : Optional[npt.NDArray]
Current predicted states of the bicycle model of the MPC for the prediction horizon,
shape (n, 5) with n equals number of prediction steps. (x, y, psi, v_x, delta)
pred_u : Optional[npt.NDArray]
Current predicted control inputs of the bicycle model of the MPC for the prediction horizon,
shape (n, 2) with n equals number of prediction steps. (a_x, delta_rate)
pose_lock : threading.Lock
Mutual exclusion lock for current vehicle pose estimated by SLAM.
trajectory_lock : threading.Lock
Mutual exclusion lock for current trajectory planned by Motion Planning.
steering_lock : threading.Lock
Mutual exclusion lock for current measured actual steering wheel angle.
trajectory : Optional[TrajectoryPoints]
Current planned trajectory calculated by Motion Planning.
vehicle_pose : Optional[VehiclePose]
Current vehicle pose estimated by SLAM.
ackermann_angle_actual : Optional[float]
Current actual ackermann angle converted with vehicle model from measured actual steering wheel angle.
mission_machine_state : MissionMachineEnum
Current state of Autonomous System intern Mission Machine.
"""
def __init__(self) -> None:
"""Initialize Control Node."""
# Current mission
self.mission = AutonomousMissionEnum(rospy.get_param('/mission'))
self.steering_wheel_angle_control_value_time: float = rospy.get_param(
'/control/mpc/control_value_times/steering_wheel_angle')
self.torque_control_value_time: float = rospy.get_param(
'/control/mpc/control_value_times/torque')
self.service_brake_pressure_control_value_time: float = rospy.get_param(
'/control/mpc/control_value_times/service_brake_pressure')
self.steering_wheel_angle_range = rospy.get_param(
'/vehicle/steering_wheel_angle_range')
self.steering_wheel_angle_offset = rospy.get_param(
'/vehicle/steering_wheel_angle_offset')
if self.mission == AutonomousMissionEnum.BRAKETEST:
self.steering_wheel_angle_upper_boundary = rospy.get_param(
'control/steering_wheel_angle_boundaries/ebs_test/upper')
self.steering_wheel_angle_lower_boundary = rospy.get_param(
'control/steering_wheel_angle_boundaries/ebs_test/lower')
elif self.mission == AutonomousMissionEnum.ACCELERATION:
self.steering_wheel_angle_upper_boundary = rospy.get_param(
'control/steering_wheel_angle_boundaries/acceleration/upper')
self.steering_wheel_angle_lower_boundary = rospy.get_param(
'control/steering_wheel_angle_boundaries/acceleration/lower')
else:
self.steering_wheel_angle_upper_boundary = rospy.get_param('control/steering_wheel_angle_boundaries/upper')
self.steering_wheel_angle_lower_boundary = rospy.get_param('control/steering_wheel_angle_boundaries/lower')
self.vehicle_model = VehicleModel(
m=rospy.get_param('/vehicle/m'),
radius=rospy.get_param('/vehicle/wheel_radius'),
gear_ratio=rospy.get_param('/vehicle/rear_gear_ratio'),
steering_wheel_angle_range=self.steering_wheel_angle_range,
steering_wheel_angle_offset=self.steering_wheel_angle_offset,
steering_wheel_to_ackermann_angle_factor=rospy.get_param(
'/vehicle/steering_wheel_to_ackermann_angle_factor')
)
rospy.loginfo('[Control] Initialize MPC.')
self.mpc = MPC(prediction_steps_n=rospy.get_param('/control/mpc/prediction_steps_n'),
step_size=rospy.get_param('/control/mpc/step_size'),
obj_factor_x_pos=rospy.get_param('/control/mpc/obj_factor/x_pos'),
obj_factor_y_pos=rospy.get_param('/control/mpc/obj_factor/y_pos'),
obj_factor_a_x=rospy.get_param('/control/mpc/obj_factor/ax'),
obj_factor_delta=rospy.get_param('/control/mpc/obj_factor/delta'),
obj_factor_delta_rate=rospy.get_param('/control/mpc/obj_factor/delta_rate'),
l_r=rospy.get_param('/vehicle/l_r'), l_f=rospy.get_param('/vehicle/l_f'),
m=rospy.get_param('/vehicle/m'), c_aero=rospy.get_param('/vehicle/c_aero'),
ackermann_angle_lower_boundary=self.vehicle_model.steering_wheel_to_ackermann_angle(
-self.steering_wheel_angle_range / 2 + self.steering_wheel_angle_offset),
ackermann_angle_upper_boundary=self.vehicle_model.steering_wheel_to_ackermann_angle(
self.steering_wheel_angle_range / 2 + self.steering_wheel_angle_offset),
is_skidpad=self.mission == AutonomousMissionEnum.SKIDPAD)
rospy.loginfo('[Control] MPC initialized.')
# Initialize Request Publisher
self.steering_wheel_angle_pub = rospy.Publisher('/control/steering_wheel_angle/request', SteeringWheelAngle,
queue_size=5)
self.torque_pub = rospy.Publisher('/control/torque/request', Torque, queue_size=5)
self.service_brake_pressure_pub = rospy.Publisher('/control/service_brake/request', BrakePressure, queue_size=5)
# Initialize Visualizer Publisher
self.predicted_path_pub = rospy.Publisher('/control/predicted_path/visualization', Path, queue_size=5)
self.predicted_states_pub = rospy.Publisher('/control/predicted_states', PredictedStates, queue_size=5)
self.mpc_statistics_pub = rospy.Publisher('/control/statistics', MPCStatistics, queue_size=5)
self.use_old_control = rospy.get_param('/control/use_old_control')
if self.use_old_control is True:
self.velocity_pub = rospy.Publisher('/control/velocity/request', Velocity, queue_size=10)
self.publish_velocity_request(velocity=0, stamp=rospy.Time.now())
# Initialize buffer for tf tree transformations
self.rear_axle_to_cog_offset: Optional[float] = None
self.tf_buffer = tf2_ros.Buffer()
self.transform_listener = tf2_ros.TransformListener(self.tf_buffer)
# Initialize state vector
self.state_vector = np.zeros((5))
self.pred_x: Optional[npt.NDArray] = None
self.pred_u: Optional[npt.NDArray] = None
# Locks for mutual exclusion
self.pose_lock = threading.Lock()
self.trajectory_lock = threading.Lock()
self.steering_lock = threading.Lock()
# Initialize subscriber
self.trajectory: Optional[TrajectoryPoints] = None
rospy.Subscriber('/motion_planning/trajectory', TrajectoryPoints, self.motion_planning_callback)
self.vehicle_pose: Optional[VehiclePose] = None
rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.vehicle_pose_callback)
self.ackermann_angle_actual: Optional[float] = None
rospy.Subscriber('/can/per/steering_wheel_angle/actual', SteeringWheelAngle, self.steering_wheel_angle_callback)
self.mission_machine_state = MissionMachineEnum.IDLE
rospy.Subscriber('/mission_machine/state', MissionMachineState, self.mission_machine_state_callback)
self.mission_completed: bool = False
rospy.Subscriber('/finish_line_detection/mission_completed', MissionFinished,
self.mission_completed_callback, queue_size=10)
[docs] def mission_completed_callback(self, msg: MissionFinished):
"""
Buffer whether the mission is completed calculated by finish line detection and received via ROS.
A completed mission signals that we should brake as fast as possible and allowed.
Parameters
----------
msg : MissionFinished
Message indicating whether mission is completed.
"""
rospy.loginfo_once('[Control] Mission completed, not allowing acceleration anymore.')
self.mission_completed = msg.finished
[docs] def mission_machine_state_callback(self, mission_machine_state: MissionMachineState):
"""
Buffer mission machine state from received ros message.
Parameters
----------
mission_machine_state : MissionMachineState
Current State of the Autonomous System intern Mission Machine.
"""
self.mission_machine_state = MissionMachineEnum(mission_machine_state.state)
[docs] def motion_planning_callback(self, trajectory: TrajectoryPoints):
"""
Buffer planned trajectory calculated by motion planning from received ros message..
Parameters
----------
trajectory : TrajectoryPoints
Current by motion planning planned trajectory.
"""
with self.trajectory_lock:
self.trajectory = trajectory
[docs] def vehicle_pose_callback(self, vehicle_pose: VehiclePose):
"""
Buffer current vehicle pose estaimted by SLAM received from ros message.
Parameters
----------
vehicle_pose : VehiclePose
Current vehicle pose estimated by SLAM.
"""
with self.pose_lock:
self.vehicle_pose = vehicle_pose
[docs] def steering_wheel_angle_callback(self, steering_wheel_angle: SteeringWheelAngle):
"""
Buffer current actual steering wheel angle measured by ACTC and received via ros message.
Converts and buffers current steering wheel angle [degrees] to ackermann angle [rad].
Parameters
----------
steering_wheel_angle : SteeringWheelAngle
Current steering wheel angle in degrees.
"""
with self.steering_lock:
self.ackermann_angle_actual = self.vehicle_model.steering_wheel_to_ackermann_angle(
np.clip(steering_wheel_angle.angle,
-self.steering_wheel_angle_range / 2 + self.steering_wheel_angle_offset,
+self.steering_wheel_angle_range / 2 + self.steering_wheel_angle_offset))
[docs] def get_rear_axle_to_cog_offset(self):
"""Get offset between rear axle and cog from tf tree and saves the offset as attribute."""
try:
translation = self.tf_buffer.lookup_transform('rear_axle', 'cog', rospy.Time()).transform.translation
self.rear_axle_to_cog_offset = translation.x
except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException):
rospy.logwarn('Can not look up tansform from rear axle to cog.')
else:
self.transform_listener.unregister()
[docs] def publish_steering_wheel_angle_request(self, steering_wheel_angle: float, stamp: rospy.Time):
"""
Publish reference steering wheel angle on ROS topic.
Parameters
----------
steering_wheel_angle : float
Steering wheel angle to request.
stamp : rospy.Time
Timestamp for header of request message.
"""
steering_wheel_angle_msg = SteeringWheelAngle()
steering_wheel_angle_msg.header.stamp = stamp
steering_wheel_angle_msg.angle = steering_wheel_angle
self.steering_wheel_angle_pub.publish(steering_wheel_angle_msg)
[docs] def publish_torque_request(self, torque: float, stamp: rospy.Time):
"""
Publish reference motor torque on ROS topic.
Parameters
----------
torque : float
Motor torque to request.
stamp : rospy.Time
Timestamp for header of request message.
"""
torque_msg = Torque()
torque_msg.header.stamp = stamp
if self.mission_completed is False:
torque_msg.torque = torque
else:
torque_msg.torque = 0
self.torque_pub.publish(torque_msg)
[docs] def publish_velocity_request(self, velocity: float, stamp: rospy.Time):
"""
Publish reference velocity on ROS topic.
Parameters
----------
velocity : float
Vehicle velocity to request.
stamp : rospy.Time
Timestamp for header of request message.
"""
velocity_msg = Velocity()
velocity_msg.header.stamp = stamp
if self.mission_completed is False:
velocity_msg.velocity = velocity
else:
velocity_msg.velocity = 0
self.velocity_pub.publish(velocity_msg)
[docs] def publish_service_brake_pressure(self, service_brake_pressure: float, stamp: rospy.Time):
"""
Publish reference service brake pressure on ROS topic.
Parameters
----------
service_brake_pressure : float
Service Brake pressure to request.
stamp : rospy.Time
Timestamp for header of request message.
"""
service_brake_pressure_msg = BrakePressure()
service_brake_pressure_msg.header.stamp = stamp
service_brake_pressure_msg.pressure = service_brake_pressure
self.service_brake_pressure_pub.publish(service_brake_pressure_msg)
[docs] def get_state_vector_for_vehicle_pose_and_ackermann_angle(self, vehicle_pose: VehiclePose,
ackermann_angle: float) -> npt.NDArray:
"""
Get state vector (x, y, psi, v, delta) from ROS vehicle pose message and ackermann angle.
Translates rear axle position from vehicle pose message to center of gravity position.
Parameters
----------
vehicle_pose : VehiclePose
Vehicle pose message originated from SLAM estimation.
ackermann_angle : float
Ackermann angle.
Returns
-------
npt.NDArray
State vector (x, y, psi, v, delta), shape: (5, ).
"""
assert self.rear_axle_to_cog_offset is not None
new_position = np.array([vehicle_pose.x, vehicle_pose.y]) \
+ self.rear_axle_to_cog_offset * np.array([np.cos(vehicle_pose.psi), np.sin(vehicle_pose.psi)])
return np.array([new_position[0], new_position[1], vehicle_pose.psi, max(vehicle_pose.v_x, 0), ackermann_angle])
[docs] def publish_control_requests(self, stamp: rospy.Time, time_offset: float = 0.):
"""
Publish refrence values for steering, motor and brake based on the latest prediction of the mpc.
Interpolates the predicted states and control inputs based on the respective assumed deadtime of the actor
and the time since the last succesful prediction (time_offset). This can be used to still send a new
reference values after a failed prediction.
Also prevents requesting engine torque and service brake pressure at the same time.
Also prevents sending service brake pressure during EBS test.
Parameters
----------
stamp : rospy.Time
Timestamp for the header of the request messages.
time_offset : float, optional
Time since the last succesful prediction, by default 0.
"""
time_base = np.arange(0, self.mpc.step_size * self.mpc.prediction_steps_n, self.mpc.step_size)
# Get the respective base values for the low level controllers for their respective time delays
ackermann_angle = np.interp(self.steering_wheel_angle_control_value_time + time_offset,
time_base, self.pred_x[4])
acceleration_for_motor = np.interp(self.torque_control_value_time + time_offset, time_base, self.pred_u[0])
acceleration_for_brake = np.interp(self.service_brake_pressure_control_value_time + time_offset,
time_base, self.pred_u[0])
# calculate the control values for the low level controllers based on their respective base values
service_brake_pressure = self.vehicle_model.pneumatic_pressure(acceleration=acceleration_for_brake)
motor_torque = self.vehicle_model.motor_torque(acceleration=acceleration_for_motor)
steering_wheel_angle = self.vehicle_model.ackermann_to_steering_wheel_angle(ackermann_angle=ackermann_angle)
# Avoid simultaneous accelerating decelearting
if service_brake_pressure > 0:
motor_torque = 0.0
elif motor_torque > 0:
service_brake_pressure = 0.0
else:
service_brake_pressure = 0.0
motor_torque = 0.0
if self.use_old_control is True:
with self.trajectory_lock:
trajectory = self.trajectory
self.publish_velocity_request(velocity=trajectory.v, stamp=rospy.Time.now())
# During Braketest, the SB is disabled
if self.mission == AutonomousMissionEnum.BRAKETEST:
service_brake_pressure = 0
# Steering angle constrain
if steering_wheel_angle < self.steering_wheel_angle_lower_boundary:
steering_wheel_angle = self.steering_wheel_angle_lower_boundary
elif steering_wheel_angle > self.steering_wheel_angle_upper_boundary:
steering_wheel_angle = self.steering_wheel_angle_upper_boundary
self.publish_service_brake_pressure(service_brake_pressure=service_brake_pressure, stamp=stamp)
self.publish_steering_wheel_angle_request(steering_wheel_angle=steering_wheel_angle, stamp=stamp)
self.publish_torque_request(torque=motor_torque, stamp=stamp)
[docs] def publish_old_control_requests(self, failed_steps_n: int, stamp: rospy.Time):
"""
Publish new control requests based on an old prediction.
Used when an execution of the model predictive controller and it's respective predition was not succesful.
An old prediction is used but a point more in the future is used.
Parameters
----------
failed_steps_n : int
Number of predictions that failed since the last succesful prediction.
stamp : rospy.Time
Timestamp for the header of the request messages.
"""
if self.pred_u is not None and self.pred_x is not None and max(
[self.steering_wheel_angle_control_value_time, self.service_brake_pressure_control_value_time,
self.torque_control_value_time]) / self.mpc.step_size + failed_steps_n < self.mpc.prediction_steps_n:
self.publish_control_requests(time_offset=failed_steps_n * self.mpc.step_size, stamp=stamp)
[docs] def publish_predicted_path_visualization(self, stamp: rospy.Time):
"""
Publish predicted states as path for visualization.
Parameters
----------
stamp : rospy.Time
Timestamp for the header of the ROS message.
"""
path_msg = Path()
path_msg.header.stamp = stamp
path_msg.header.frame_id = 'map'
path_msg.poses = []
for i in range(self.mpc.prediction_steps_n):
# PredPathViz
poses = PoseStamped()
poses.header.stamp = stamp
poses.header.frame_id = 'map'
poses.pose.position.x = self.pred_x[0, i]
poses.pose.position.y = self.pred_x[1, i]
poses.pose.position.z = 0
q = tf_conversions.transformations.quaternion_from_euler(0, 0, self.pred_x[2, i])
poses.pose.orientation.x = q[0]
poses.pose.orientation.y = q[1]
poses.pose.orientation.z = q[2]
poses.pose.orientation.w = q[3]
path_msg.poses.append(poses)
self.predicted_path_pub.publish(path_msg)
[docs] def publish_output(self, trajectory: TrajectoryPoints, vehicle_pose: VehiclePose,
stamp: rospy.Time):
"""
Publish all output for the current prediction of the model predictive controller.
Parameters
----------
trajectory : TrajectoryPoints
Trajectory for which the prediction has been executed. Used for its image header.
vehicle_pose : VehiclePose
Vehicle pose for which the prediction has been executed. Used for its header.
stamp : rospy.Time
Timestamp when the prediction has been executed for the header of the ROS messages.
"""
self.publish_control_requests(stamp=stamp)
self.publish_predicted_states_and_inputs(trajectory=trajectory, vehicle_pose=vehicle_pose, stamp=stamp)
self.publish_predicted_path_visualization(stamp=stamp)
[docs] def publish_mpc_statistics(self, stamp: rospy.Time):
"""
Publish statistics of the prediction of the model predictive controller.
Parameters
----------
stamp : rospy.Time
Timestamp when the prediction has been executed for the header of the ROS message.
"""
mpc_statistics = MPCStatistics()
mpc_statistics.header.stamp = stamp
if self.mpc.exitflag is not None:
mpc_statistics.exitflag = self.mpc.exitflag
if self.mpc.info is not None:
mpc_statistics.iterations = self.mpc.info.it
mpc_statistics.solve_time = self.mpc.info.solvetime
mpc_statistics.function_evaluation_time = self.mpc.info.fevalstime
mpc_statistics.objective = self.mpc.info.pobj
if self.mpc.exitflag is not None or self.mpc.info is not None:
self.mpc_statistics_pub.publish(mpc_statistics)
[docs] def run(self):
"""
Run the control module.
Only executes the model predictive controller when MissionMachine State is
Driving, PreFinished (mission completed but not finished) or emergency.
Also Trajectory, vehicle pose and actual ackermann angle has to be received atleast once
and the offset between rear axle and center of gravity has to be known.
Newest trajectory, actual ackermann angle and vehicle pose are read by using mutual exclusion.
If the prediction of the model predictive controller is sucecesful all output is published.
If the prediction failed, but atleast on prediction had been succesful, new reference values
based on old predictions are published.
Statistics of the prediction are always publishing including the exit flag and time to solve.
"""
failed_steps_n = 0
rate = rospy.Rate(1 / self.mpc.step_size)
while not rospy.is_shutdown():
if self.mission_machine_state in [MissionMachineEnum.DRIVING, MissionMachineEnum.PRE_FINISHED,
MissionMachineEnum.EMERGENCY]\
and self.trajectory is not None and self.vehicle_pose is not None\
and self.ackermann_angle_actual is not None and self.rear_axle_to_cog_offset is not None:
with self.steering_lock:
ackermann_angle_actual = self.ackermann_angle_actual
with self.trajectory_lock:
trajectory = self.trajectory
self.mpc.set_trajectory(trajectory=np.c_[trajectory.x, trajectory.y, trajectory.v],
closed_track=trajectory.closed)
with self.pose_lock:
vehicle_pose = self.vehicle_pose
state_vector = self.get_state_vector_for_vehicle_pose_and_ackermann_angle(
vehicle_pose=vehicle_pose, ackermann_angle=ackermann_angle_actual)
try:
self.pred_u, self.pred_x = self.mpc.predict(state_vector=state_vector)
except Exception as e:
now = rospy.Time.now()
failed_steps_n += 1
if isinstance(e, NoIteration):
rospy.logwarn(f'[Control] No prediction executed: {self.mpc.exitflag}')
else:
rospy.logerr(traceback.format_exc())
self.publish_old_control_requests(failed_steps_n=failed_steps_n, stamp=now)
else:
now = rospy.Time.now()
self.publish_output(vehicle_pose=vehicle_pose, trajectory=trajectory, stamp=now)
failed_steps_n = 0
finally:
self.publish_mpc_statistics(stamp=now)
else:
if self.rear_axle_to_cog_offset is None:
self.get_rear_axle_to_cog_offset()
rate.sleep()
if __name__ == '__main__':
rospy.init_node('control_node', anonymous=False)
try:
rospy.loginfo('[Control] Started Node.')
control = ControlNode()
control.run()
except Exception as e:
if not isinstance(e, rospy.exceptions.ROSInterruptException):
rospy.logfatal(traceback.format_exc())
time.sleep(2)
raise e