Source code for motion_planning_node

#!/usr/bin/env python3
"""Motion Planning ROS node to plan a trajectory (path + velocity) based on centerpoints and vehicle pose."""
import threading
import traceback
import time
from typing import Optional, Tuple

import numpy as np
import numpy.typing as npt

import rospy
import rospkg

import multiprocessing as mp
import multiprocessing.connection as mpc

from enum import Enum

from std_msgs.msg import Header

from utilities.msg import CenterPoints, TrajectoryPoints, VehiclePose, TrajectoryPathSlice, TrajectoryPathSlices
from as_can_msgs.msg import MissionFinished

from local_motion import local_motion_planner, velocity_profile as local_velocity_profile
from global_motion import global_motion_planner
from planning_lib import gg_manager

[docs]class AutonomousMissionEnum(Enum): """Enum to describe which Autonomous Mission is currently select.""" MANUAL_DRIVING = 0 ACCELERATION = 1 SKIDPAD = 2 TRACKDRIVE = 3 BRAKETEST = 4 INSPECTION = 5 AUTOCROSS = 6
[docs]class PlanningMode(Enum): """ Enum to describe which Planning Mode is currently used. .. csv-table:: Planning Modes :header: "Value", "Name", "Meaning" 0, NO, "Nothing is planned yet" 1, LOCAL, "Currently only local tracks are planned" 2, PARALLEL, "Currently a global track is optimized and parallel local tracks are still planned" 3, GLOBAL, "A global, closed track is already been optimized, either optimizing another or waiting for one" 4, SKIDPAD, "Global skidpad tracks are being planned" 5, FINISH, "Motion is planned to brake as fast as possible and allowed" """ NO = 0 LOCAL = 1 PARALLEL = 2 GLOBAL = 3 SKIDPAD = 4 FINISH = 5
[docs]class MotionPlanning(): """ Motion Planning ROS node to plan a trajectory (path + velocity) based on centerpoints and vehicle pose. Attributes ---------- mission_completed : bool Indicates whether the mission is already completed. vehicle_pose : VehiclePose Current vehicle pose estimated by SLAM. vehicle_pose_lock : threading.Lock Mutual exclusion lock for current vehicle pose estimated by SLAM. centerpoints_msg : CenterPoints Latest centerpoints filtered by track filtering. centerpoints_lock : threading.Lock Mutual exclusion lock for latest centerpoints filtered by track filtering. send_trajectory_lock : threading.Lock Mutual exclusion lock for publishing trajectory. Used to ensure that **not** a local trajectory is send after a global trajectory by mistake. optimizer_lock : threading.Lock Mutual exclusion lock for optimizing a global track. Check what it is used for. plan_locally_event : threading.Event Threading event to signalize that a new centerpoints have been received for which should be planned locally. plan_globally_event : threading.Event Threading event to signalize that a new centerpoints have been received for which should be planned globally. local_thread : threading.Thread Thread for planning locally. global_thread : threading.Thread Thread for planning globally. optimizer : Optional[global_optimization.GlobalOptimizer] Global Optimizer which has been used for optimizing a global trajectory successfully. send_debug_informations : bool Indicates whether debug informations should be send via ROS. last_planning_mode : PlanningMode Planning mode of last Motion Planning. mission : AutonomousMissionEnum Mission which the vehicle currently drives. optimize_skidpad_path : bool Whether to only calculate the velocity profile for skidpad or also optimize the path for skidpad. is_skidpad : bool Indicates whether the current autonomous mission is skidpad. max_local_track_length : float Maximal track length for local paths for when they are limited. min_local_track_gap : float Mnimum gap between the nearest centerpoint and the vehicle for when centerpoints are limited. lr_cog : float Offset between rear axle and center of gravity (cog) of vehicle. Used for moving vehicle position to cog. trajectory_publisher : rospy.Publisher ROS Publisher to publish trajectories. local_trajectory_path_slices_publisher : rospy.Publisher ROS Publisher to publish debug informations for local motion planner local_motion_planner_config : local_motion_planner.LocalMotionPlannerConfig Configuration for local motion planner. global_optimizer_config : global_optimization.GlobalOptimizerConfig Configuration for global optimizer. global_velocity_config : global_velocity.VelocityConfig Configuration for velocity profiler for global planning. local_velocity_config : local_velocity_profile.VelocityProfileConfig Configuration for velocity profiler for local planning. """ def __init__(self) -> None: """Initialize motion planning ROS node.""" self.mission_completed = False self.vehicle_pose = VehiclePose() self.centerpoints_lock = threading.Lock() self.vehicle_pose_lock = threading.Lock() self.send_trajectory_lock = threading.Lock() self.optimizer_lock = threading.Lock() self.plan_locally_event = threading.Event() self.plan_globally_event = threading.Event() self.local_thread = threading.Thread(target=self.plan_locally, daemon=True) self.local_thread.start() self.global_thread = threading.Thread(target=self.plan_globally, daemon=True) self.global_thread.start() self.optimizer = None self.centerpoints_msg = None self.send_debug_informations = rospy.get_param('/motion_planning/send_debug_informations', default=False) self.last_planning_mode = PlanningMode(0) self.mission = AutonomousMissionEnum(rospy.get_param('/mission')) self.optimize_skidpad_path = rospy.get_param('/local_motion_planning/skidpad/optimize_path') self.is_skidpad = self.mission == AutonomousMissionEnum.SKIDPAD self.max_local_track_length = rospy.get_param( '/local_motion_planning/limit_global_centerpoints/max_track_length') self.min_local_track_gap = rospy.get_param('/local_motion_planning/limit_global_centerpoints/min_track_gap') self.lr_cog = rospy.get_param('model/loads/lr_cog') self.local_motion_planner_config = local_motion_planner.LocalMotionPlannerConfig( max_length_between_layer=rospy.get_param('/local_motion_planning/interpolation/max_length_between_layers'), support_points_between_nodes_n=rospy.get_param( '/local_motion_planning/interpolation/support_points_between_nodes_n'), support_points_for_trajectory_n=rospy.get_param( '/local_motion_planning/interpolation/support_points_for_trajectory_n'), support_points_for_initial_interpolation_n=rospy.get_param( '/local_motion_planning/interpolation/support_points_for_initial_interpolation_n'), smooth_final_path=rospy.get_param('/local_motion_planning/interpolation/smooth_final_path'), nodes_per_layer=rospy.get_param('/local_motion_planning/interpolation/nodes_per_layer'), vehicle_width=rospy.get_param('/local_motion_planning/track_width/vehicle_width'), buffer_to_trackbounds=rospy.get_param('/local_motion_planning/track_width/buffer_to_trackbounds'), weight_length=rospy.get_param('/local_motion_planning/optimization/weights/length'), weight_max_curvature=rospy.get_param('/local_motion_planning/optimization/weights/max_curvature'), weight_average_curvature=rospy.get_param('/local_motion_planning/optimization/weights/average_curvature'), ) self.local_velocity_config = local_velocity_profile.VelocityProfileConfig( g=rospy.get_param('/local_motion_planning/velocity_profile/g'), a_x_factor=rospy.get_param('/local_motion_planning/velocity_profile/a_x_factor'), a_y_factor=rospy.get_param('/local_motion_planning/velocity_profile/a_y_factor'), v_max=rospy.get_param('/local_motion_planning/velocity_profile/v_max'), a_x_factor_completed=rospy.get_param('/local_motion_planning/velocity_profile/a_x_factor_completed'), v_end=None if self.mission == AutonomousMissionEnum.BRAKETEST else rospy.get_param( '/local_motion_planning/velocity_profile/v_end'), stretch_time_for_mpc=rospy.get_param('/local_motion_planning/velocity_profile/stretch_time_for_mpc'), mpc_min_time_horizont=rospy.get_param('/local_motion_planning/velocity_profile/mpc_min_time_horizont'), ) self.global_motion_planner_config = global_motion_planner.GlobalMotionPlannerConfig( vehicle_width=rospy.get_param('/global_motion_planning/track/vehicle_width'), cone_width=rospy.get_param('/global_motion_planning/track/cone_width'), safety_distance=rospy.get_param('/global_motion_planning/track/safety_distance'), V_max=rospy.get_param('/global_motion_planning/ggV/v_max'), gg_margin=rospy.get_param('/global_motion_planning/ggV/gg_margin'), w_T=rospy.get_param('/global_motion_planning/optimization/weights/time'), w_jx=rospy.get_param('/global_motion_planning/optimization/weights/jx'), w_jy=rospy.get_param('/global_motion_planning/optimization/weights/jy'), w_dOmega_z=rospy.get_param('/global_motion_planning/optimization/weights/dOmega_z'), preperation_step_size=rospy.get_param('/global_motion_planning/optimization/step_size/preperation'), second_preperation_step_size = rospy.get_param('/global_motion_planning/optimization/step_size/second_preperation'), interpolation_after_optimization_step_size=rospy.get_param('/global_motion_planning/optimization/step_size/interpolation_after_optimization')) rospack = rospkg.RosPack() vehicle_name = rospy.get_param('/vehicle/name') ggv_path = f'{rospack.get_path("utilities")}/config/ggv_constraints/{vehicle_name}.npz' ax_max_limit = rospy.get_param('/global_motion_planning/ggV/ax_max_limit') ax_min_limit = rospy.get_param('/global_motion_planning/ggV/ax_min_limit') ay_max_limit = rospy.get_param('/global_motion_planning/ggV/ay_max_limit') if ax_max_limit == 0.0: ax_max_limit = None if ax_min_limit == 0.0: ax_min_limit = None if ay_max_limit == 0.0: ay_max_limit = None self.gg_handler = gg_manager.GGManager( gg_path=ggv_path, use_linear_ggV=rospy.get_param('/global_motion_planning/ggV/use_linear_ggV'), hard_limit_ax_max=ax_max_limit, hard_limit_ax_min=ax_min_limit, hard_limit_ay_max=ay_max_limit) self.lap_time = 0.0 self.local_planning_calculation_time = 0.0 self.global_planning_calculation_time = 0.0 self.trajectory_publisher = rospy.Publisher('/motion_planning/trajectory', TrajectoryPoints, queue_size=10, latch=True) self.local_trajectory_path_slices_publisher = rospy.Publisher('/motion_planning/local/path_slices', TrajectoryPathSlices, queue_size=10, latch=True) rospy.Subscriber('/filtering/centerpoints', CenterPoints, self.centerpoints_callback, queue_size=2) rospy.Subscriber('/finish_line_detection/mission_completed', MissionFinished, self.mission_completed_callback, queue_size=10) rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.vehicle_pose_callback, queue_size=10)
[docs] @staticmethod def optimize_globally(pipe: mpc.Connection, global_motion_planner_config: global_motion_planner.GlobalMotionPlannerConfig, gg_handler: gg_manager.GGManager): """ Optimize closed path globally. Parameters ---------- pipe : mpc.Connection Pipe where to receive the centerpoints message from. global_motion_planner_config: global_motion_planner.GlobalMotionPlannerConfig Configuration for global motion planner config. gg_handler: gg_manager.GGManager Class for managing for ggV constraints. """ centerpoints_msg: CenterPoints = pipe.recv() centerpoints = np.array([centerpoints_msg.x, centerpoints_msg.y]).T track_width = np.array([centerpoints_msg.left_track_width, centerpoints_msg.right_track_width]).T global_planner = global_motion_planner.GlobalMotionPlanner( global_motion_planner_config=global_motion_planner_config, gg_handler=gg_handler) global_planner.run(centerpoints, track_width) pipe.send(global_planner)
[docs] def mission_completed_callback(self, msg: MissionFinished): """ Buffer whether the mission is completed calculated by finish line detection and received via ROS. A completed mission signals that we should brake as fast as possible and allowed. Parameters ---------- msg : MissionFinished Message indicating whether mission is completed. """ rospy.loginfo_once('[MotionPlanning] Mission completed, planning to brake as fast as allowed.') self.mission_completed = msg.finished self.set_planning_mode(closed_track=False)
[docs] def vehicle_pose_callback(self, msg: VehiclePose): """ Buffer vehicle pose estimated by SLAM and received via SLAM. Also moves vehicle position from rear axle to center of gravity. .. todo:: Get offset between rear axle and center of gravity from transform tree. Parameters ---------- msg : VehiclePose Current vehicle position estimated by SLAM and received via SLAM. """ with self.vehicle_pose_lock: self.vehicle_pose = msg # Transform vehicle position from rear axle to COG self.vehicle_pose.x = self.vehicle_pose.x + self.lr_cog * np.cos(self.vehicle_pose.psi) self.vehicle_pose.y = self.vehicle_pose.y + self.lr_cog * np.sin(self.vehicle_pose.psi) if self.last_planning_mode == PlanningMode.PARALLEL or self.last_planning_mode == PlanningMode.FINISH: self.plan_locally_event.set()
[docs] def set_planning_mode(self, closed_track: bool): """ Set planning mode according to current state. .. todo:: Enable planning finishing while driving skidpad. Control Node needs to support it first. Parameters ---------- closed_track : bool Indicates whether the track represented by the centerpoints is closed. """ if self.is_skidpad is True: new_planning_mode = PlanningMode.SKIDPAD elif self.mission_completed is True: new_planning_mode = PlanningMode.FINISH elif closed_track is True and self.optimizer is None: new_planning_mode = PlanningMode.PARALLEL elif closed_track is True and self.optimizer is not None: new_planning_mode = PlanningMode.GLOBAL else: new_planning_mode = PlanningMode.LOCAL if self.last_planning_mode != new_planning_mode: rospy.loginfo(f'[MotionPlanning] Switched Planning mode to {new_planning_mode}') self.last_planning_mode = new_planning_mode
[docs] def centerpoints_callback(self, centerpoints_msg: CenterPoints): """ Buffer centerpoints calculated by track filtering and received via ROS. Parameters ---------- centerpoints_msg : CenterPoints Received centerpoints message. """ self.set_planning_mode(closed_track=centerpoints_msg.closed_track) if self.centerpoints_msg is not None and centerpoints_msg.closed_track is True: msg_changed = self.check_msg_changed(self.centerpoints_msg, centerpoints_msg) with self.centerpoints_lock: self.centerpoints_msg = centerpoints_msg if self.is_skidpad: rospy.loginfo('[Motion Planning] Plan skidpad') centerpoints = np.array([centerpoints_msg.x, centerpoints_msg.y]).T trajectory = self.plan_skidpad(centerpoints=centerpoints) self.send_trajectory(trajectory=trajectory, closed=centerpoints_msg.closed_track, centerpoints_image_header=centerpoints_msg.image_header, vehicle_pose_header=centerpoints_msg.vehicle_pose.header) if not self.optimize_skidpad_path: return with self.send_trajectory_lock: if self.optimizer is None: self.plan_locally_event.set() if centerpoints_msg.closed_track is True: if ((self.optimizer is None) or (self.global_motion_planner_config.preperation_step_size != self.global_motion_planner_config.second_preperation_step_size) or (msg_changed is True)): with self.send_trajectory_lock: rospy.loginfo('[Motion Planning] Set event to plan globally') self.plan_globally_event.set()
[docs] def check_msg_changed(self, centerpoints_msg_old, centerpoints_msg): """ Check if the centerpoints message has changed. Parameters ---------- centerpoints_msg_old : CenterPoints Recently received centerpoints message. centerpoints_msg : CenterPoints Received centerpoints message. Returns ------- msg_changed : bool Indicates whether the centerpoints message has changed. """ msg_changed = False if (centerpoints_msg_old.x != centerpoints_msg.x): msg_changed = True elif (centerpoints_msg_old.y != centerpoints_msg.y): msg_changed = True elif (centerpoints_msg_old.left_track_width != centerpoints_msg.left_track_width): msg_changed = True elif (centerpoints_msg_old.right_track_width != centerpoints_msg.right_track_width): msg_changed = True return msg_changed
[docs] def send_local_trajectory_debug_informations(self, centerpoints_image_header: Header, vehicle_pose_header: Header, local_motion_planner_obj: local_motion_planner.LocalMotionPlanner): """ Publish debug informations for local motion planning. Parameters ---------- centerpoints_image_header : Header Header of centerpoints for which the trajectory has been planned. vehicle_pose_header : Header Header of vehicle pose for which the trajectory has been planned. local_motion_planner_obj : local_motion_planner.LocalMotionPlanner Local Motion Planner object which was used to plan the trajectory. """ path_slices = TrajectoryPathSlices() path_slices.header.stamp = rospy.Time.now() path_slices.image_header = centerpoints_image_header path_slices.pose_header = vehicle_pose_header point_indices = np.linspace( 0, self.local_motion_planner_config.support_points_between_nodes_n, 10, dtype=int, endpoint=False) if local_motion_planner_obj.costs is not None: costs = np.c_[local_motion_planner_obj.costs, local_motion_planner_obj.length_prozentual, local_motion_planner_obj.average_sqaured_curvature_prozentual, local_motion_planner_obj.peak_sqaured_curvature_prozentual] path_slices.path_slices = [TrajectoryPathSlice(x=trajectory[point_indices, 0], y=trajectory[point_indices, 1], total_cost=cost[0], length_cost=cost[1], average_squared_curvature_cost=cost[2], peak_squared_curvature_cost=cost[3]) for trajectory, cost in zip(local_motion_planner_obj.splines.swapaxes(0, 1), costs)] else: path_slices.path_slices = [TrajectoryPathSlice(x=local_motion_planner_obj.improved_path[point_indices, 0], y=local_motion_planner_obj.improved_path[point_indices, 1], total_cost=0.5, length_cost=0.5, average_squared_curvature_cost=0.5, peak_squared_curvature_cost=0.5)] self.local_trajectory_path_slices_publisher.publish(path_slices)
[docs] def plan_locally(self): """Plan motion locally as soon as event is set.""" while not rospy.is_shutdown(): self.plan_locally_event.wait() try: start = rospy.Time.now() local_motion_planner_obj, vehicle_pose_header, centerpoints_image_header = self.plan_motion_locally() self.local_planning_calculation_time = (rospy.Time.now() - start).to_sec() with self.send_trajectory_lock: if self.optimizer is None or self.mission_completed is True: stamp = self.send_trajectory(trajectory=local_motion_planner_obj.trajectory, closed=False, centerpoints_image_header=centerpoints_image_header, vehicle_pose_header=vehicle_pose_header) if self.send_debug_informations is True: self.send_local_trajectory_debug_informations(local_motion_planner_obj=local_motion_planner_obj, vehicle_pose_header=vehicle_pose_header, centerpoints_image_header=centerpoints_image_header) except Exception: traceback.print_exc() rospy.logerr('[MotionPlanning] Failed to plan motion locally.')
[docs] def plan_globally(self): """Plan motion globally as soon as event is set.""" while not rospy.is_shutdown(): self.plan_globally_event.wait() rospy.loginfo('[MotionPlanning] Start optimizing globally') start_time = rospy.Time.now() optimizer, trajectory, centerpoints_image_header = self.plan_motion_globally() self.global_planning_calculation_time = (rospy.Time.now() - start_time).to_sec() rospy.loginfo('[MotionPlanning] Finished optimizing globally') with self.send_trajectory_lock: if self.mission_completed is False: with self.optimizer_lock: self.optimizer = optimizer self.set_planning_mode(closed_track=True) self.send_trajectory(trajectory=trajectory, closed=True, centerpoints_image_header=centerpoints_image_header)
[docs] def find_nearest_centerpoint(self, centerpoints: npt.NDArray, vehicle_position: npt.NDArray) -> int: """ Find the nearest centerpoint to the current vehicle position in a list of centerpoint which. Centerpoint must be atleast self.min_local_track_gap away. Parameters ---------- centerpoints : npt.NDArray Centerpoints for which the nearest should be found, shape: (n, 2) with n equal number of centerpoints. vehicle_position : npt.NDArray Current vehicle position, shape: (2, ). Returns ------- int Index oif nearest centerpoint. """ distance_to_vehicle = np.sqrt(np.sum(np.power(centerpoints - vehicle_position, 2), axis=1)) nearest_index = min(np.argmin(distance_to_vehicle) + 1, centerpoints.shape[0] - 1) return np.argmax(distance_to_vehicle[nearest_index:] > self.min_local_track_gap) + nearest_index
[docs] def limit_global_centerpoints_by_rolling(self, centerpoints: npt.NDArray, vehicle_position: npt.NDArray) -> Tuple[int, int]: """ Limit global and closed centerpoints by rolling the centerpoints. First find the nearest centerpoint. Roll centerpoints so that nearest centerpoints is the first one. Calculate the cumulative distance for that an find the index where the cumulative distance is greater than self.max_local_track_length. Parameters ---------- centerpoints : npt.NDArray Centerpoints which should be limited, shape: (n, 2) with n equal number of centerpoints. vehicle_position : npt.NDArray Current vehicle position, shape: (2, ). Returns ------- Tuple[int, int] **Index of nearest centerpoint** of original centerpoints and **Index of last centerpoint** of rolled centerpoints. """ nearest_centerpoint_index = self.find_nearest_centerpoint(centerpoints=centerpoints, vehicle_position=vehicle_position) rolled_centerpoints = np.roll(centerpoints, -(nearest_centerpoint_index), axis=0) cum_distance = np.cumsum(np.sqrt(np.sum(np.power(np.diff(rolled_centerpoints, axis=0, prepend=rolled_centerpoints[:1]), 2), axis=1))) return -(nearest_centerpoint_index), np.argmax(cum_distance > self.max_local_track_length)
[docs] def limit_global_centerpoints_by_cutting(self, centerpoints: npt.NDArray, vehicle_position: npt.NDArray, max_length: bool = True) -> Tuple[int, Optional[int]]: """ Limit global and open centerpoints by cutting the centerpoints. First find the nearest centerpoint. All centerpoints forward are kept, except when max_length is defined as centerline would then be longer than self.max_local_track_length. In this case they get cut too. Parameters ---------- centerpoints : npt.NDArray Centerpoints which should be limited, shape, (n, 2) with n equal number of centerpoints. vehicle_position : npt.NDArray Current vehicle position, shape: (2, ). max_length : bool, optional Indicates whether centerline should not be longer than self.max_local_track_length, by default True Returns ------- Tuple[int, Optional[int]] **Index of nearest centerpoint** and **Index of last centerpoint** Index of last centerpoint is None if max_length is not defined or centerline is not longer self.max_local_track_length. """ nearest_centerpoint_index = self.find_nearest_centerpoint(centerpoints=centerpoints, vehicle_position=vehicle_position) if max_length is True: cut_centerpoints = centerpoints[nearest_centerpoint_index:] cum_distance = np.cumsum(np.sqrt(np.sum(np.power(np.diff(cut_centerpoints, axis=0, prepend=cut_centerpoints[:1]), 2), axis=1))) last_index = np.argmax(cum_distance > self.max_local_track_length) if last_index == 0: last_index = None else: last_index = None return (nearest_centerpoint_index), last_index
[docs] def plan_motion_locally(self) -> Tuple[local_motion_planner.LocalMotionPlanner, Header, Header]: """ Plan and calculate trajectory for a local track based on centerpoints. Returns ------- Tuple[local_motion_planner.LocalMotionPlanner, Header, Header] **LocalMotionPlanner object** used for planning, **Header of Vehicle Pose** used for planning and **Header of Centerpoints** used for planning """ with self.centerpoints_lock: centerpoints_msg: CenterPoints = self.centerpoints_msg self.plan_locally_event.clear() centerpoints = np.array([centerpoints_msg.x, centerpoints_msg.y]).T track_width = np.array([centerpoints_msg.left_track_width, centerpoints_msg.right_track_width]).T if centerpoints_msg.closed_track is True or self.last_planning_mode == PlanningMode.FINISH: with self.vehicle_pose_lock: vehicle_position = np.array([self.vehicle_pose.x, self.vehicle_pose.y]) vehicle_v = self.vehicle_pose.v_x vehicle_psi = self.vehicle_pose.psi vehicle_pose_header = self.vehicle_pose.header if centerpoints_msg.closed_track is True: roll, end = self.limit_global_centerpoints_by_rolling(centerpoints=centerpoints, vehicle_position=vehicle_position) centerpoints = np.roll(centerpoints, roll, axis=0)[:end] track_width = np.roll(track_width, roll, axis=0)[:end] else: start, end = self.limit_global_centerpoints_by_cutting(centerpoints=centerpoints, vehicle_position=vehicle_position) centerpoints = centerpoints[start:][:end] track_width = track_width[start:][:end] else: vehicle_position = np.array([centerpoints_msg.vehicle_pose.x, centerpoints_msg.vehicle_pose.y]) vehicle_v = centerpoints_msg.vehicle_pose.v_x vehicle_psi = centerpoints_msg.vehicle_pose.psi vehicle_pose_header = centerpoints_msg.vehicle_pose.header start, end = self.limit_global_centerpoints_by_cutting(centerpoints=centerpoints, vehicle_position=vehicle_position, max_length=False) centerpoints = centerpoints[start:][:end] track_width = track_width[start:][:end] local_motion_planner_obj = local_motion_planner.LocalMotionPlanner( local_motion_planner_config=self.local_motion_planner_config, velocity_profile_config=self.local_velocity_config, centerpoints=centerpoints, track_width=track_width, vehicle_v=vehicle_v, vehicle_psi=vehicle_psi, vehicle_position=vehicle_position, mission_completed=self.mission_completed) local_motion_planner_obj.plan() return local_motion_planner_obj, vehicle_pose_header, centerpoints_msg.image_header
[docs] def plan_motion_globally(self) -> Tuple[global_motion_planner.GlobalMotionPlanner, npt.NDArray, Header]: """ Plan and calculate trajectory for a global and closed track based on centerpoints. Returns ------- Tuple[global_optimization.GlobalOptimizer, npt.NDArray, Header] **GlobalOptimizer** used for Optimization, **global optimized trajectory** (shape: (n, 3) with n equal number of centerpoints) and **header of the centerpoints** for which the trajectory has been calculated for. """ with self.centerpoints_lock: centerpoints_msg = self.centerpoints_msg self.plan_globally_event.clear() parent_conn, child_conn = mp.Pipe() parent_conn.send(centerpoints_msg) process = mp.Process(target=self.optimize_globally, args=(child_conn, self.global_motion_planner_config, self.gg_handler)) process.start() global_planner: global_motion_planner.GlobalMotionPlanner = parent_conn.recv() process.join() global_trajectory = global_planner.trajectory self.lap_time = global_planner.laptime # set a higher resolution for the second optimization, because it's okay if this takes longer self.global_motion_planner_config.preperation_step_size = self.global_motion_planner_config.second_preperation_step_size return global_planner, global_trajectory, centerpoints_msg.image_header
[docs] def plan_skidpad(self, centerpoints: npt.NDArray) -> npt.NDArray: """ Calculate trajectory for skidpad based on centerpoints and maximum lateral acceleration. Parameters ---------- centerpoints : npt.NDArray Centerpoints for which the velocity should be calculated, shape: (n, 2) with n equal number of centerpoints. Returns ------- npt.NDArray Trajectory contianing centerpoints and velocity, shape: (n, 3) with n equal number of centerpoints. """ velocity_obj = local_velocity_profile.VelocityProfile(path=centerpoints, velocity_profile_config=self.local_velocity_config) velocity_obj.limit_v_max_for_skidpad() optimal_velocity = velocity_obj.calculate(v_end=0, v_initial=0) global_trajectory = np.c_[centerpoints, optimal_velocity] return global_trajectory
[docs] def send_trajectory(self, trajectory: npt.NDArray, closed: bool, centerpoints_image_header: Header, vehicle_pose_header: Optional[Header] = None) -> rospy.Time: """ Publish planned trajectory on ROS. Parameters ---------- trajectory : npt.NDArray Trajectory to be published, shape: (n, 3) with n equal number of points of trajectory. closed : bool Indicates whether the trajectory is closed. centerpoints_image_header : Header Header of the centerpoints for which the trajectory has been planned. vehicle_pose_header : Optional[Header], optional Header of the vehicle pose for which the trajectory has been planned, by default None Returns ------- rospy.Time Time of trajectory header. """ trajectory_msg = TrajectoryPoints() trajectory_msg.closed = closed trajectory_msg.x = trajectory[:, 0].tolist() trajectory_msg.y = trajectory[:, 1].tolist() trajectory_msg.v = trajectory[:, 2].tolist() trajectory_msg.lap_time = self.lap_time trajectory_msg.local_planning_calculation_time = self.local_planning_calculation_time trajectory_msg.global_planning_calculation_time = self.global_planning_calculation_time trajectory_msg.header.stamp = rospy.Time.now() trajectory_msg.header.frame_id = 'map' trajectory_msg.image_header = centerpoints_image_header if vehicle_pose_header is not None: trajectory_msg.pose_header = vehicle_pose_header self.trajectory_publisher.publish(trajectory_msg) return trajectory_msg.header.stamp
if __name__ == '__main__': rospy.init_node('motion_planning', anonymous=False) try: rospy.loginfo('[MotionPlanning] MotionPlanning Node started.') motion_planning = MotionPlanning() rospy.spin() except Exception as e: if not isinstance(e, rospy.exceptions.ROSInterruptException): rospy.logfatal(traceback.format_exc()) time.sleep(2) raise e