Source code for control_node

#!/usr/bin/env python3
"""
ROS Node for control module.

Planned trajectory by Motion Planning, estimated vehicle pose by SLAM and current
steering wheel angle are input for a model predictive controller which uses a nonlinear bicycle model to predict
which are the optimal control inputs (ackermann angle rate and acceleration) to follow this trajectory.
The model predictive controller therefore looks one second into the future. The resulting predicted states and
control inputs are used to calculate reference values for steering actuator, motor and service brake.
"""

from typing import Optional
from enum import Enum
import threading
import traceback
import time

import numpy as np
import numpy.typing as npt

import rospy
import tf2_ros

import tf_conversions

from nav_msgs.msg import Path
from geometry_msgs.msg import PoseStamped
from as_can_msgs.msg import SteeringWheelAngle, Torque, BrakePressure, MissionMachineState, MissionFinished, Velocity
from utilities.msg import TrajectoryPoints, VehiclePose, PredictedStates, MPCStatistics

from mpc import MPC, NoIteration
from vehicle_model import VehicleModel


[docs]class AutonomousMissionEnum(Enum): """Enum to describe which Autonomous Mission is currently select.""" MANUAL_DRIVING = 0 ACCELERATION = 1 SKIDPAD = 2 TRACKDRIVE = 3 BRAKETEST = 4 INSPECTION = 5 AUTOCROSS = 6
[docs]class MissionMachineEnum(Enum): """ Enum to describe the current state of the Autonomous System intern Mission Machine. IDLE means AS OFF. READY equals AS READY but with any mission but Inspection Mission. DRIVING equals AS Driving but with any mission but Inspection Mission. PRE_FINISHED means that the mission is completed but the vehicle has to come to a standstill before it is finished. FINISHED equals AS Finish. EMERGENCY equals AS Emergency. PRE_INSPECTION equals AS READY but with Inspection Mission. INSPECTION equals AS Driving but with Inspection Mission. """ IDLE = 0 READY = 1 DRIVING = 2 PRE_FINISHED = 3 FINISHED = 4 EMERGENCY = 5 PRE_INSPECTION = 6 INSPECTION = 7
[docs]class ControlNode(): """ Implement functionality for calculating reference values for actuators based on trajectory and vehicle pose. Control node class to implement functionality for calculating reference values for steering actuator, motor and service brake with MPC based on planned trajectory, estimated vehicle pose and actual ackermann angle. Uses a nonlinear bicycle model within the MPC. Attributes ---------- mission : AutonomousMissionEnum Current selected Autonomous Mission by user on dashboard display. steering_wheel_angle_control_value_time : float Assumed dead time [s] of steering actuator and thus time to select from prediction horizon for steering reference value. torque_control_value_time : float Assumed dead time [s] of inverter and motor and thus time to select from prediction horizon for motor reference value. service_brake_pressure_control_value_time : float Assumed dead time [s] of service brake and thus time to select from prediction horizon for brake reference value. steering_wheel_angle_range : float Range of steering [degrees] from most left to most right steering wheel angle. steering_wheel_angle_offset : float Offset of steering wheel angle, where the vehicle really drives straight. steering_wheel_angle_upper_boundary : float Allowed upper boundary for steering wheel angle for the current selected mission. steering_wheel_angle_lower_boundary : float Allowed lower boundary for steering wheel angle for the current selected mission. vehicle_model : VehicleModel Vehicle Model used for translating steering wheel angle to ackermann angle and the otherway around as well as acceleration to motor torque and service brake pressure. mpc : MPC Model predictive controller instance to predict optimal control inputs for the prediction horizon. steering_wheel_angle_pub : rospy.Publisher ROS publisher to publish steering wheel angle reference values as requests. torque_pub : rospy.Publisher ROS publisher to publish motor torque reference values as requests. service_brake_pressure_pub : rospy.Publisher ROS publisher to publish service brake pressure reference values as requests. predicted_path_pub : rospy.Publisher ROS publisher to publish visualization of predicted path. predicted_states_pub : rospy.Publisher ROS publisher to publish predicted states and predicted control inputs. mpc_statistics_pub : rospy.Publisher ROS publisher to publish statistics of the model prediction controller for the current prediction. tf_buffer : tf2_ros.Buffer Transformation tree buffer for buffering transforms of the transformation tree. Used to get offset between rear axle and center of gravity. transform_listener : tf2_ros.TransformListener Transformation tree listener to listen to transform of the transformation tree. Used to get offset between rear axle and center of gravity. rear_axle_to_cog_offset : float Projected offset between rear axle and center of gravity in x direction. state_vector : npt.NDArray Current state vector of the the bicycle model of the MPC, shape: (5, ). pred_x : Optional[npt.NDArray] Current predicted states of the bicycle model of the MPC for the prediction horizon, shape (n, 5) with n equals number of prediction steps. (x, y, psi, v_x, delta) pred_u : Optional[npt.NDArray] Current predicted control inputs of the bicycle model of the MPC for the prediction horizon, shape (n, 2) with n equals number of prediction steps. (a_x, delta_rate) pose_lock : threading.Lock Mutual exclusion lock for current vehicle pose estimated by SLAM. trajectory_lock : threading.Lock Mutual exclusion lock for current trajectory planned by Motion Planning. steering_lock : threading.Lock Mutual exclusion lock for current measured actual steering wheel angle. trajectory : Optional[TrajectoryPoints] Current planned trajectory calculated by Motion Planning. vehicle_pose : Optional[VehiclePose] Current vehicle pose estimated by SLAM. ackermann_angle_actual : Optional[float] Current actual ackermann angle converted with vehicle model from measured actual steering wheel angle. mission_machine_state : MissionMachineEnum Current state of Autonomous System intern Mission Machine. """ def __init__(self) -> None: """Initialize Control Node.""" # Current mission self.mission = AutonomousMissionEnum(rospy.get_param('/mission')) self.steering_wheel_angle_control_value_time: float = rospy.get_param( '/control/mpc/control_value_times/steering_wheel_angle') self.torque_control_value_time: float = rospy.get_param( '/control/mpc/control_value_times/torque') self.service_brake_pressure_control_value_time: float = rospy.get_param( '/control/mpc/control_value_times/service_brake_pressure') self.steering_wheel_angle_range = rospy.get_param( '/vehicle/steering_wheel_angle_range') self.steering_wheel_angle_offset = rospy.get_param( '/vehicle/steering_wheel_angle_offset') if self.mission == AutonomousMissionEnum.BRAKETEST: self.steering_wheel_angle_upper_boundary = rospy.get_param( 'control/steering_wheel_angle_boundaries/ebs_test/upper') self.steering_wheel_angle_lower_boundary = rospy.get_param( 'control/steering_wheel_angle_boundaries/ebs_test/lower') elif self.mission == AutonomousMissionEnum.ACCELERATION: self.steering_wheel_angle_upper_boundary = rospy.get_param( 'control/steering_wheel_angle_boundaries/acceleration/upper') self.steering_wheel_angle_lower_boundary = rospy.get_param( 'control/steering_wheel_angle_boundaries/acceleration/lower') else: self.steering_wheel_angle_upper_boundary = rospy.get_param('control/steering_wheel_angle_boundaries/upper') self.steering_wheel_angle_lower_boundary = rospy.get_param('control/steering_wheel_angle_boundaries/lower') self.vehicle_model = VehicleModel( m=rospy.get_param('/vehicle/m'), radius=rospy.get_param('/vehicle/wheel_radius'), gear_ratio=rospy.get_param('/vehicle/rear_gear_ratio'), steering_wheel_angle_range=self.steering_wheel_angle_range, steering_wheel_angle_offset=self.steering_wheel_angle_offset, steering_wheel_to_ackermann_angle_factor=rospy.get_param( '/vehicle/steering_wheel_to_ackermann_angle_factor') ) rospy.loginfo('[Control] Initialize MPC.') self.mpc = MPC(prediction_steps_n=rospy.get_param('/control/mpc/prediction_steps_n'), step_size=rospy.get_param('/control/mpc/step_size'), obj_factor_x_pos=rospy.get_param('/control/mpc/obj_factor/x_pos'), obj_factor_y_pos=rospy.get_param('/control/mpc/obj_factor/y_pos'), obj_factor_a_x=rospy.get_param('/control/mpc/obj_factor/ax'), obj_factor_delta=rospy.get_param('/control/mpc/obj_factor/delta'), obj_factor_delta_rate=rospy.get_param('/control/mpc/obj_factor/delta_rate'), l_r=rospy.get_param('/vehicle/l_r'), l_f=rospy.get_param('/vehicle/l_f'), m=rospy.get_param('/vehicle/m'), c_aero=rospy.get_param('/vehicle/c_aero'), ackermann_angle_lower_boundary=self.vehicle_model.steering_wheel_to_ackermann_angle( -self.steering_wheel_angle_range / 2 + self.steering_wheel_angle_offset), ackermann_angle_upper_boundary=self.vehicle_model.steering_wheel_to_ackermann_angle( self.steering_wheel_angle_range / 2 + self.steering_wheel_angle_offset), is_skidpad=self.mission == AutonomousMissionEnum.SKIDPAD) rospy.loginfo('[Control] MPC initialized.') # Initialize Request Publisher self.steering_wheel_angle_pub = rospy.Publisher('/control/steering_wheel_angle/request', SteeringWheelAngle, queue_size=5) self.torque_pub = rospy.Publisher('/control/torque/request', Torque, queue_size=5) self.service_brake_pressure_pub = rospy.Publisher('/control/service_brake/request', BrakePressure, queue_size=5) # Initialize Visualizer Publisher self.predicted_path_pub = rospy.Publisher('/control/predicted_path/visualization', Path, queue_size=5) self.predicted_states_pub = rospy.Publisher('/control/predicted_states', PredictedStates, queue_size=5) self.mpc_statistics_pub = rospy.Publisher('/control/statistics', MPCStatistics, queue_size=5) self.use_old_control = rospy.get_param('/control/use_old_control') if self.use_old_control is True: self.velocity_pub = rospy.Publisher('/control/velocity/request', Velocity, queue_size=10) self.publish_velocity_request(velocity=0, stamp=rospy.Time.now()) # Initialize buffer for tf tree transformations self.rear_axle_to_cog_offset: Optional[float] = None self.tf_buffer = tf2_ros.Buffer() self.transform_listener = tf2_ros.TransformListener(self.tf_buffer) # Initialize state vector self.state_vector = np.zeros((5)) self.pred_x: Optional[npt.NDArray] = None self.pred_u: Optional[npt.NDArray] = None # Locks for mutual exclusion self.pose_lock = threading.Lock() self.trajectory_lock = threading.Lock() self.steering_lock = threading.Lock() # Initialize subscriber self.trajectory: Optional[TrajectoryPoints] = None rospy.Subscriber('/motion_planning/trajectory', TrajectoryPoints, self.motion_planning_callback) self.vehicle_pose: Optional[VehiclePose] = None rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.vehicle_pose_callback) self.ackermann_angle_actual: Optional[float] = None rospy.Subscriber('/can/per/steering_wheel_angle/actual', SteeringWheelAngle, self.steering_wheel_angle_callback) self.mission_machine_state = MissionMachineEnum.IDLE rospy.Subscriber('/mission_machine/state', MissionMachineState, self.mission_machine_state_callback) self.mission_completed: bool = False rospy.Subscriber('/finish_line_detection/mission_completed', MissionFinished, self.mission_completed_callback, queue_size=10)
[docs] def mission_completed_callback(self, msg: MissionFinished): """ Buffer whether the mission is completed calculated by finish line detection and received via ROS. A completed mission signals that we should brake as fast as possible and allowed. Parameters ---------- msg : MissionFinished Message indicating whether mission is completed. """ rospy.loginfo_once('[Control] Mission completed, not allowing acceleration anymore.') self.mission_completed = msg.finished
[docs] def mission_machine_state_callback(self, mission_machine_state: MissionMachineState): """ Buffer mission machine state from received ros message. Parameters ---------- mission_machine_state : MissionMachineState Current State of the Autonomous System intern Mission Machine. """ self.mission_machine_state = MissionMachineEnum(mission_machine_state.state)
[docs] def motion_planning_callback(self, trajectory: TrajectoryPoints): """ Buffer planned trajectory calculated by motion planning from received ros message.. Parameters ---------- trajectory : TrajectoryPoints Current by motion planning planned trajectory. """ with self.trajectory_lock: self.trajectory = trajectory
[docs] def vehicle_pose_callback(self, vehicle_pose: VehiclePose): """ Buffer current vehicle pose estaimted by SLAM received from ros message. Parameters ---------- vehicle_pose : VehiclePose Current vehicle pose estimated by SLAM. """ with self.pose_lock: self.vehicle_pose = vehicle_pose
[docs] def steering_wheel_angle_callback(self, steering_wheel_angle: SteeringWheelAngle): """ Buffer current actual steering wheel angle measured by ACTC and received via ros message. Converts and buffers current steering wheel angle [degrees] to ackermann angle [rad]. Parameters ---------- steering_wheel_angle : SteeringWheelAngle Current steering wheel angle in degrees. """ with self.steering_lock: self.ackermann_angle_actual = self.vehicle_model.steering_wheel_to_ackermann_angle( np.clip(steering_wheel_angle.angle, -self.steering_wheel_angle_range / 2 + self.steering_wheel_angle_offset, +self.steering_wheel_angle_range / 2 + self.steering_wheel_angle_offset))
[docs] def get_rear_axle_to_cog_offset(self): """Get offset between rear axle and cog from tf tree and saves the offset as attribute.""" try: translation = self.tf_buffer.lookup_transform('rear_axle', 'cog', rospy.Time()).transform.translation self.rear_axle_to_cog_offset = translation.x except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException): rospy.logwarn('Can not look up tansform from rear axle to cog.') else: self.transform_listener.unregister()
[docs] def publish_steering_wheel_angle_request(self, steering_wheel_angle: float, stamp: rospy.Time): """ Publish reference steering wheel angle on ROS topic. Parameters ---------- steering_wheel_angle : float Steering wheel angle to request. stamp : rospy.Time Timestamp for header of request message. """ steering_wheel_angle_msg = SteeringWheelAngle() steering_wheel_angle_msg.header.stamp = stamp steering_wheel_angle_msg.angle = steering_wheel_angle self.steering_wheel_angle_pub.publish(steering_wheel_angle_msg)
[docs] def publish_torque_request(self, torque: float, stamp: rospy.Time): """ Publish reference motor torque on ROS topic. Parameters ---------- torque : float Motor torque to request. stamp : rospy.Time Timestamp for header of request message. """ torque_msg = Torque() torque_msg.header.stamp = stamp if self.mission_completed is False: torque_msg.torque = torque else: torque_msg.torque = 0 self.torque_pub.publish(torque_msg)
[docs] def publish_velocity_request(self, velocity: float, stamp: rospy.Time): """ Publish reference velocity on ROS topic. Parameters ---------- velocity : float Vehicle velocity to request. stamp : rospy.Time Timestamp for header of request message. """ velocity_msg = Velocity() velocity_msg.header.stamp = stamp if self.mission_completed is False: velocity_msg.velocity = velocity else: velocity_msg.velocity = 0 self.velocity_pub.publish(velocity_msg)
[docs] def publish_service_brake_pressure(self, service_brake_pressure: float, stamp: rospy.Time): """ Publish reference service brake pressure on ROS topic. Parameters ---------- service_brake_pressure : float Service Brake pressure to request. stamp : rospy.Time Timestamp for header of request message. """ service_brake_pressure_msg = BrakePressure() service_brake_pressure_msg.header.stamp = stamp service_brake_pressure_msg.pressure = service_brake_pressure self.service_brake_pressure_pub.publish(service_brake_pressure_msg)
[docs] def get_state_vector_for_vehicle_pose_and_ackermann_angle(self, vehicle_pose: VehiclePose, ackermann_angle: float) -> npt.NDArray: """ Get state vector (x, y, psi, v, delta) from ROS vehicle pose message and ackermann angle. Translates rear axle position from vehicle pose message to center of gravity position. Parameters ---------- vehicle_pose : VehiclePose Vehicle pose message originated from SLAM estimation. ackermann_angle : float Ackermann angle. Returns ------- npt.NDArray State vector (x, y, psi, v, delta), shape: (5, ). """ assert self.rear_axle_to_cog_offset is not None new_position = np.array([vehicle_pose.x, vehicle_pose.y]) \ + self.rear_axle_to_cog_offset * np.array([np.cos(vehicle_pose.psi), np.sin(vehicle_pose.psi)]) return np.array([new_position[0], new_position[1], vehicle_pose.psi, max(vehicle_pose.v_x, 0), ackermann_angle])
[docs] def publish_control_requests(self, stamp: rospy.Time, time_offset: float = 0.): """ Publish refrence values for steering, motor and brake based on the latest prediction of the mpc. Interpolates the predicted states and control inputs based on the respective assumed deadtime of the actor and the time since the last succesful prediction (time_offset). This can be used to still send a new reference values after a failed prediction. Also prevents requesting engine torque and service brake pressure at the same time. Also prevents sending service brake pressure during EBS test. Parameters ---------- stamp : rospy.Time Timestamp for the header of the request messages. time_offset : float, optional Time since the last succesful prediction, by default 0. """ time_base = np.arange(0, self.mpc.step_size * self.mpc.prediction_steps_n, self.mpc.step_size) # Get the respective base values for the low level controllers for their respective time delays ackermann_angle = np.interp(self.steering_wheel_angle_control_value_time + time_offset, time_base, self.pred_x[4]) acceleration_for_motor = np.interp(self.torque_control_value_time + time_offset, time_base, self.pred_u[0]) acceleration_for_brake = np.interp(self.service_brake_pressure_control_value_time + time_offset, time_base, self.pred_u[0]) # calculate the control values for the low level controllers based on their respective base values service_brake_pressure = self.vehicle_model.pneumatic_pressure(acceleration=acceleration_for_brake) motor_torque = self.vehicle_model.motor_torque(acceleration=acceleration_for_motor) steering_wheel_angle = self.vehicle_model.ackermann_to_steering_wheel_angle(ackermann_angle=ackermann_angle) # Avoid simultaneous accelerating decelearting if service_brake_pressure > 0: motor_torque = 0.0 elif motor_torque > 0: service_brake_pressure = 0.0 else: service_brake_pressure = 0.0 motor_torque = 0.0 if self.use_old_control is True: with self.trajectory_lock: trajectory = self.trajectory self.publish_velocity_request(velocity=trajectory.v, stamp=rospy.Time.now()) # During Braketest, the SB is disabled if self.mission == AutonomousMissionEnum.BRAKETEST: service_brake_pressure = 0 # Steering angle constrain if steering_wheel_angle < self.steering_wheel_angle_lower_boundary: steering_wheel_angle = self.steering_wheel_angle_lower_boundary elif steering_wheel_angle > self.steering_wheel_angle_upper_boundary: steering_wheel_angle = self.steering_wheel_angle_upper_boundary self.publish_service_brake_pressure(service_brake_pressure=service_brake_pressure, stamp=stamp) self.publish_steering_wheel_angle_request(steering_wheel_angle=steering_wheel_angle, stamp=stamp) self.publish_torque_request(torque=motor_torque, stamp=stamp)
[docs] def publish_old_control_requests(self, failed_steps_n: int, stamp: rospy.Time): """ Publish new control requests based on an old prediction. Used when an execution of the model predictive controller and it's respective predition was not succesful. An old prediction is used but a point more in the future is used. Parameters ---------- failed_steps_n : int Number of predictions that failed since the last succesful prediction. stamp : rospy.Time Timestamp for the header of the request messages. """ if self.pred_u is not None and self.pred_x is not None and max( [self.steering_wheel_angle_control_value_time, self.service_brake_pressure_control_value_time, self.torque_control_value_time]) / self.mpc.step_size + failed_steps_n < self.mpc.prediction_steps_n: self.publish_control_requests(time_offset=failed_steps_n * self.mpc.step_size, stamp=stamp)
[docs] def publish_predicted_states_and_inputs(self, trajectory: TrajectoryPoints, vehicle_pose: VehiclePose, stamp: rospy.Time): """ Publish predicted states and control inputs to ROS topic. Publishes all states, all control inputs and all converted control inputs. Parameters ---------- trajectory : TrajectoryPoints Trajectory for which the prediction has been executed. Used for its image header. vehicle_pose : VehiclePose Vehicle pose for which the prediction has been executed. Used for its header. stamp : rospy.Time Timestamp for the header of the ROS message. """ predicted_states = PredictedStates() predicted_states.pose_header = vehicle_pose.header predicted_states.image_header = trajectory.image_header predicted_states.header.stamp = stamp predicted_states.header.frame_id = 'map' predicted_states.t = np.arange(0, self.mpc.step_size * self.mpc.prediction_steps_n, self.mpc.step_size).tolist() predicted_states.x_pos = self.pred_x[0].tolist() predicted_states.y_pos = self.pred_x[1].tolist() predicted_states.psi = self.pred_x[2].tolist() predicted_states.v_x = self.pred_x[3].tolist() predicted_states.steering_wheel_angle = \ self.vehicle_model.ackermann_to_steering_wheel_angle(self.pred_x[4]).tolist() predicted_states.delta_rate = self.pred_u[1].tolist() predicted_states.a_x = self.pred_u[0].tolist() predicted_states.service_brake_pressure = self.vehicle_model.pneumatic_pressure(self.pred_u[0]).tolist() predicted_states.torque = self.vehicle_model.motor_torque(self.pred_u[0]).tolist() self.predicted_states_pub.publish(predicted_states)
[docs] def publish_predicted_path_visualization(self, stamp: rospy.Time): """ Publish predicted states as path for visualization. Parameters ---------- stamp : rospy.Time Timestamp for the header of the ROS message. """ path_msg = Path() path_msg.header.stamp = stamp path_msg.header.frame_id = 'map' path_msg.poses = [] for i in range(self.mpc.prediction_steps_n): # PredPathViz poses = PoseStamped() poses.header.stamp = stamp poses.header.frame_id = 'map' poses.pose.position.x = self.pred_x[0, i] poses.pose.position.y = self.pred_x[1, i] poses.pose.position.z = 0 q = tf_conversions.transformations.quaternion_from_euler(0, 0, self.pred_x[2, i]) poses.pose.orientation.x = q[0] poses.pose.orientation.y = q[1] poses.pose.orientation.z = q[2] poses.pose.orientation.w = q[3] path_msg.poses.append(poses) self.predicted_path_pub.publish(path_msg)
[docs] def publish_output(self, trajectory: TrajectoryPoints, vehicle_pose: VehiclePose, stamp: rospy.Time): """ Publish all output for the current prediction of the model predictive controller. Parameters ---------- trajectory : TrajectoryPoints Trajectory for which the prediction has been executed. Used for its image header. vehicle_pose : VehiclePose Vehicle pose for which the prediction has been executed. Used for its header. stamp : rospy.Time Timestamp when the prediction has been executed for the header of the ROS messages. """ self.publish_control_requests(stamp=stamp) self.publish_predicted_states_and_inputs(trajectory=trajectory, vehicle_pose=vehicle_pose, stamp=stamp) self.publish_predicted_path_visualization(stamp=stamp)
[docs] def publish_mpc_statistics(self, stamp: rospy.Time): """ Publish statistics of the prediction of the model predictive controller. Parameters ---------- stamp : rospy.Time Timestamp when the prediction has been executed for the header of the ROS message. """ mpc_statistics = MPCStatistics() mpc_statistics.header.stamp = stamp if self.mpc.exitflag is not None: mpc_statistics.exitflag = self.mpc.exitflag if self.mpc.info is not None: mpc_statistics.iterations = self.mpc.info.it mpc_statistics.solve_time = self.mpc.info.solvetime mpc_statistics.function_evaluation_time = self.mpc.info.fevalstime mpc_statistics.objective = self.mpc.info.pobj if self.mpc.exitflag is not None or self.mpc.info is not None: self.mpc_statistics_pub.publish(mpc_statistics)
[docs] def run(self): """ Run the control module. Only executes the model predictive controller when MissionMachine State is Driving, PreFinished (mission completed but not finished) or emergency. Also Trajectory, vehicle pose and actual ackermann angle has to be received atleast once and the offset between rear axle and center of gravity has to be known. Newest trajectory, actual ackermann angle and vehicle pose are read by using mutual exclusion. If the prediction of the model predictive controller is sucecesful all output is published. If the prediction failed, but atleast on prediction had been succesful, new reference values based on old predictions are published. Statistics of the prediction are always publishing including the exit flag and time to solve. """ failed_steps_n = 0 rate = rospy.Rate(1 / self.mpc.step_size) while not rospy.is_shutdown(): if self.mission_machine_state in [MissionMachineEnum.DRIVING, MissionMachineEnum.PRE_FINISHED, MissionMachineEnum.EMERGENCY]\ and self.trajectory is not None and self.vehicle_pose is not None\ and self.ackermann_angle_actual is not None and self.rear_axle_to_cog_offset is not None: with self.steering_lock: ackermann_angle_actual = self.ackermann_angle_actual with self.trajectory_lock: trajectory = self.trajectory self.mpc.set_trajectory(trajectory=np.c_[trajectory.x, trajectory.y, trajectory.v], closed_track=trajectory.closed) with self.pose_lock: vehicle_pose = self.vehicle_pose state_vector = self.get_state_vector_for_vehicle_pose_and_ackermann_angle( vehicle_pose=vehicle_pose, ackermann_angle=ackermann_angle_actual) try: self.pred_u, self.pred_x = self.mpc.predict(state_vector=state_vector) except Exception as e: now = rospy.Time.now() failed_steps_n += 1 if isinstance(e, NoIteration): rospy.logwarn(f'[Control] No prediction executed: {self.mpc.exitflag}') else: rospy.logerr(traceback.format_exc()) self.publish_old_control_requests(failed_steps_n=failed_steps_n, stamp=now) else: now = rospy.Time.now() self.publish_output(vehicle_pose=vehicle_pose, trajectory=trajectory, stamp=now) failed_steps_n = 0 finally: self.publish_mpc_statistics(stamp=now) else: if self.rear_axle_to_cog_offset is None: self.get_rear_axle_to_cog_offset() rate.sleep()
if __name__ == '__main__': rospy.init_node('control_node', anonymous=False) try: rospy.loginfo('[Control] Started Node.') control = ControlNode() control.run() except Exception as e: if not isinstance(e, rospy.exceptions.ROSInterruptException): rospy.logfatal(traceback.format_exc()) time.sleep(2) raise e