#!/usr/bin/env python3
"""Motion Planning ROS node to plan a trajectory (path + velocity) based on centerpoints and vehicle pose."""
import threading
import traceback
import time
from typing import Optional, Tuple
import numpy as np
import numpy.typing as npt
import rospy
import rospkg
import multiprocessing as mp
import multiprocessing.connection as mpc
from enum import Enum
from std_msgs.msg import Header
from utilities.msg import CenterPoints, TrajectoryPoints, VehiclePose, TrajectoryPathSlice, TrajectoryPathSlices
from as_can_msgs.msg import MissionFinished
from local_motion import local_motion_planner, velocity_profile as local_velocity_profile
from global_motion import global_motion_planner
from planning_lib import gg_manager
[docs]class AutonomousMissionEnum(Enum):
"""Enum to describe which Autonomous Mission is currently select."""
MANUAL_DRIVING = 0
ACCELERATION = 1
SKIDPAD = 2
TRACKDRIVE = 3
BRAKETEST = 4
INSPECTION = 5
AUTOCROSS = 6
[docs]class PlanningMode(Enum):
"""
Enum to describe which Planning Mode is currently used.
.. csv-table:: Planning Modes
:header: "Value", "Name", "Meaning"
0, NO, "Nothing is planned yet"
1, LOCAL, "Currently only local tracks are planned"
2, PARALLEL, "Currently a global track is optimized and parallel local tracks are still planned"
3, GLOBAL, "A global, closed track is already been optimized, either optimizing another or waiting for one"
4, SKIDPAD, "Global skidpad tracks are being planned"
5, FINISH, "Motion is planned to brake as fast as possible and allowed"
"""
NO = 0
LOCAL = 1
PARALLEL = 2
GLOBAL = 3
SKIDPAD = 4
FINISH = 5
[docs]class MotionPlanning():
"""
Motion Planning ROS node to plan a trajectory (path + velocity) based on centerpoints and vehicle pose.
Attributes
----------
mission_completed : bool
Indicates whether the mission is already completed.
vehicle_pose : VehiclePose
Current vehicle pose estimated by SLAM.
vehicle_pose_lock : threading.Lock
Mutual exclusion lock for current vehicle pose estimated by SLAM.
centerpoints_msg : CenterPoints
Latest centerpoints filtered by track filtering.
centerpoints_lock : threading.Lock
Mutual exclusion lock for latest centerpoints filtered by track filtering.
send_trajectory_lock : threading.Lock
Mutual exclusion lock for publishing trajectory. Used to ensure that
**not** a local trajectory is send after a global trajectory by mistake.
optimizer_lock : threading.Lock
Mutual exclusion lock for optimizing a global track. Check what it is used for.
plan_locally_event : threading.Event
Threading event to signalize that a new centerpoints have been received for which should be planned locally.
plan_globally_event : threading.Event
Threading event to signalize that a new centerpoints have been received for which should be planned globally.
local_thread : threading.Thread
Thread for planning locally.
global_thread : threading.Thread
Thread for planning globally.
optimizer : Optional[global_optimization.GlobalOptimizer]
Global Optimizer which has been used for optimizing a global trajectory successfully.
send_debug_informations : bool
Indicates whether debug informations should be send via ROS.
last_planning_mode : PlanningMode
Planning mode of last Motion Planning.
mission : AutonomousMissionEnum
Mission which the vehicle currently drives.
optimize_skidpad_path : bool
Whether to only calculate the velocity profile for skidpad or also optimize the path for skidpad.
is_skidpad : bool
Indicates whether the current autonomous mission is skidpad.
max_local_track_length : float
Maximal track length for local paths for when they are limited.
min_local_track_gap : float
Mnimum gap between the nearest centerpoint and the vehicle for when centerpoints are limited.
lr_cog : float
Offset between rear axle and center of gravity (cog) of vehicle. Used for moving vehicle position to cog.
trajectory_publisher : rospy.Publisher
ROS Publisher to publish trajectories.
local_trajectory_path_slices_publisher : rospy.Publisher
ROS Publisher to publish debug informations for local motion planner
local_motion_planner_config : local_motion_planner.LocalMotionPlannerConfig
Configuration for local motion planner.
global_optimizer_config : global_optimization.GlobalOptimizerConfig
Configuration for global optimizer.
global_velocity_config : global_velocity.VelocityConfig
Configuration for velocity profiler for global planning.
local_velocity_config : local_velocity_profile.VelocityProfileConfig
Configuration for velocity profiler for local planning.
"""
def __init__(self) -> None:
"""Initialize motion planning ROS node."""
self.mission_completed = False
self.vehicle_pose = VehiclePose()
self.centerpoints_lock = threading.Lock()
self.vehicle_pose_lock = threading.Lock()
self.send_trajectory_lock = threading.Lock()
self.optimizer_lock = threading.Lock()
self.plan_locally_event = threading.Event()
self.plan_globally_event = threading.Event()
self.local_thread = threading.Thread(target=self.plan_locally, daemon=True)
self.local_thread.start()
self.global_thread = threading.Thread(target=self.plan_globally, daemon=True)
self.global_thread.start()
self.optimizer = None
self.centerpoints_msg = None
self.send_debug_informations = rospy.get_param('/motion_planning/send_debug_informations', default=False)
self.last_planning_mode = PlanningMode(0)
self.mission = AutonomousMissionEnum(rospy.get_param('/mission'))
self.optimize_skidpad_path = rospy.get_param('/local_motion_planning/skidpad/optimize_path')
self.is_skidpad = self.mission == AutonomousMissionEnum.SKIDPAD
self.max_local_track_length = rospy.get_param(
'/local_motion_planning/limit_global_centerpoints/max_track_length')
self.min_local_track_gap = rospy.get_param('/local_motion_planning/limit_global_centerpoints/min_track_gap')
self.lr_cog = rospy.get_param('model/loads/lr_cog')
self.local_motion_planner_config = local_motion_planner.LocalMotionPlannerConfig(
max_length_between_layer=rospy.get_param('/local_motion_planning/interpolation/max_length_between_layers'),
support_points_between_nodes_n=rospy.get_param(
'/local_motion_planning/interpolation/support_points_between_nodes_n'),
support_points_for_trajectory_n=rospy.get_param(
'/local_motion_planning/interpolation/support_points_for_trajectory_n'),
support_points_for_initial_interpolation_n=rospy.get_param(
'/local_motion_planning/interpolation/support_points_for_initial_interpolation_n'),
smooth_final_path=rospy.get_param('/local_motion_planning/interpolation/smooth_final_path'),
nodes_per_layer=rospy.get_param('/local_motion_planning/interpolation/nodes_per_layer'),
vehicle_width=rospy.get_param('/local_motion_planning/track_width/vehicle_width'),
buffer_to_trackbounds=rospy.get_param('/local_motion_planning/track_width/buffer_to_trackbounds'),
weight_length=rospy.get_param('/local_motion_planning/optimization/weights/length'),
weight_max_curvature=rospy.get_param('/local_motion_planning/optimization/weights/max_curvature'),
weight_average_curvature=rospy.get_param('/local_motion_planning/optimization/weights/average_curvature'),
)
self.local_velocity_config = local_velocity_profile.VelocityProfileConfig(
g=rospy.get_param('/local_motion_planning/velocity_profile/g'),
a_x_factor=rospy.get_param('/local_motion_planning/velocity_profile/a_x_factor'),
a_y_factor=rospy.get_param('/local_motion_planning/velocity_profile/a_y_factor'),
v_max=rospy.get_param('/local_motion_planning/velocity_profile/v_max'),
a_x_factor_completed=rospy.get_param('/local_motion_planning/velocity_profile/a_x_factor_completed'),
v_end=None if self.mission == AutonomousMissionEnum.BRAKETEST else rospy.get_param(
'/local_motion_planning/velocity_profile/v_end'),
stretch_time_for_mpc=rospy.get_param('/local_motion_planning/velocity_profile/stretch_time_for_mpc'),
mpc_min_time_horizont=rospy.get_param('/local_motion_planning/velocity_profile/mpc_min_time_horizont'),
)
self.global_motion_planner_config = global_motion_planner.GlobalMotionPlannerConfig(
vehicle_width=rospy.get_param('/global_motion_planning/track/vehicle_width'),
cone_width=rospy.get_param('/global_motion_planning/track/cone_width'),
safety_distance=rospy.get_param('/global_motion_planning/track/safety_distance'),
V_max=rospy.get_param('/global_motion_planning/ggV/v_max'),
gg_margin=rospy.get_param('/global_motion_planning/ggV/gg_margin'),
w_T=rospy.get_param('/global_motion_planning/optimization/weights/time'),
w_jx=rospy.get_param('/global_motion_planning/optimization/weights/jx'),
w_jy=rospy.get_param('/global_motion_planning/optimization/weights/jy'),
w_dOmega_z=rospy.get_param('/global_motion_planning/optimization/weights/dOmega_z'),
preperation_step_size=rospy.get_param('/global_motion_planning/optimization/step_size/preperation'),
second_preperation_step_size = rospy.get_param('/global_motion_planning/optimization/step_size/second_preperation'),
interpolation_after_optimization_step_size=rospy.get_param('/global_motion_planning/optimization/step_size/interpolation_after_optimization'))
rospack = rospkg.RosPack()
vehicle_name = rospy.get_param('/vehicle/name')
ggv_path = f'{rospack.get_path("utilities")}/config/ggv_constraints/{vehicle_name}.npz'
ax_max_limit = rospy.get_param('/global_motion_planning/ggV/ax_max_limit')
ax_min_limit = rospy.get_param('/global_motion_planning/ggV/ax_min_limit')
ay_max_limit = rospy.get_param('/global_motion_planning/ggV/ay_max_limit')
if ax_max_limit == 0.0:
ax_max_limit = None
if ax_min_limit == 0.0:
ax_min_limit = None
if ay_max_limit == 0.0:
ay_max_limit = None
self.gg_handler = gg_manager.GGManager(
gg_path=ggv_path,
use_linear_ggV=rospy.get_param('/global_motion_planning/ggV/use_linear_ggV'),
hard_limit_ax_max=ax_max_limit,
hard_limit_ax_min=ax_min_limit,
hard_limit_ay_max=ay_max_limit)
self.lap_time = 0.0
self.local_planning_calculation_time = 0.0
self.global_planning_calculation_time = 0.0
self.trajectory_publisher = rospy.Publisher('/motion_planning/trajectory', TrajectoryPoints,
queue_size=10, latch=True)
self.local_trajectory_path_slices_publisher = rospy.Publisher('/motion_planning/local/path_slices',
TrajectoryPathSlices, queue_size=10, latch=True)
rospy.Subscriber('/filtering/centerpoints', CenterPoints, self.centerpoints_callback, queue_size=2)
rospy.Subscriber('/finish_line_detection/mission_completed', MissionFinished,
self.mission_completed_callback, queue_size=10)
rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.vehicle_pose_callback, queue_size=10)
[docs] @staticmethod
def optimize_globally(pipe: mpc.Connection, global_motion_planner_config: global_motion_planner.GlobalMotionPlannerConfig,
gg_handler: gg_manager.GGManager):
"""
Optimize closed path globally.
Parameters
----------
pipe : mpc.Connection
Pipe where to receive the centerpoints message from.
global_motion_planner_config: global_motion_planner.GlobalMotionPlannerConfig
Configuration for global motion planner config.
gg_handler: gg_manager.GGManager
Class for managing for ggV constraints.
"""
centerpoints_msg: CenterPoints = pipe.recv()
centerpoints = np.array([centerpoints_msg.x, centerpoints_msg.y]).T
track_width = np.array([centerpoints_msg.left_track_width, centerpoints_msg.right_track_width]).T
global_planner = global_motion_planner.GlobalMotionPlanner(
global_motion_planner_config=global_motion_planner_config,
gg_handler=gg_handler)
global_planner.run(centerpoints, track_width)
pipe.send(global_planner)
[docs] def mission_completed_callback(self, msg: MissionFinished):
"""
Buffer whether the mission is completed calculated by finish line detection and received via ROS.
A completed mission signals that we should brake as fast as possible and allowed.
Parameters
----------
msg : MissionFinished
Message indicating whether mission is completed.
"""
rospy.loginfo_once('[MotionPlanning] Mission completed, planning to brake as fast as allowed.')
self.mission_completed = msg.finished
self.set_planning_mode(closed_track=False)
[docs] def vehicle_pose_callback(self, msg: VehiclePose):
"""
Buffer vehicle pose estimated by SLAM and received via SLAM.
Also moves vehicle position from rear axle to center of gravity.
.. todo:: Get offset between rear axle and center of gravity from transform tree.
Parameters
----------
msg : VehiclePose
Current vehicle position estimated by SLAM and received via SLAM.
"""
with self.vehicle_pose_lock:
self.vehicle_pose = msg
# Transform vehicle position from rear axle to COG
self.vehicle_pose.x = self.vehicle_pose.x + self.lr_cog * np.cos(self.vehicle_pose.psi)
self.vehicle_pose.y = self.vehicle_pose.y + self.lr_cog * np.sin(self.vehicle_pose.psi)
if self.last_planning_mode == PlanningMode.PARALLEL or self.last_planning_mode == PlanningMode.FINISH:
self.plan_locally_event.set()
[docs] def set_planning_mode(self, closed_track: bool):
"""
Set planning mode according to current state.
.. todo:: Enable planning finishing while driving skidpad. Control Node needs to support it first.
Parameters
----------
closed_track : bool
Indicates whether the track represented by the centerpoints is closed.
"""
if self.is_skidpad is True:
new_planning_mode = PlanningMode.SKIDPAD
elif self.mission_completed is True:
new_planning_mode = PlanningMode.FINISH
elif closed_track is True and self.optimizer is None:
new_planning_mode = PlanningMode.PARALLEL
elif closed_track is True and self.optimizer is not None:
new_planning_mode = PlanningMode.GLOBAL
else:
new_planning_mode = PlanningMode.LOCAL
if self.last_planning_mode != new_planning_mode:
rospy.loginfo(f'[MotionPlanning] Switched Planning mode to {new_planning_mode}')
self.last_planning_mode = new_planning_mode
[docs] def centerpoints_callback(self, centerpoints_msg: CenterPoints):
"""
Buffer centerpoints calculated by track filtering and received via ROS.
Parameters
----------
centerpoints_msg : CenterPoints
Received centerpoints message.
"""
self.set_planning_mode(closed_track=centerpoints_msg.closed_track)
if self.centerpoints_msg is not None and centerpoints_msg.closed_track is True:
msg_changed = self.check_msg_changed(self.centerpoints_msg, centerpoints_msg)
with self.centerpoints_lock:
self.centerpoints_msg = centerpoints_msg
if self.is_skidpad:
rospy.loginfo('[Motion Planning] Plan skidpad')
centerpoints = np.array([centerpoints_msg.x, centerpoints_msg.y]).T
trajectory = self.plan_skidpad(centerpoints=centerpoints)
self.send_trajectory(trajectory=trajectory, closed=centerpoints_msg.closed_track,
centerpoints_image_header=centerpoints_msg.image_header,
vehicle_pose_header=centerpoints_msg.vehicle_pose.header)
if not self.optimize_skidpad_path:
return
with self.send_trajectory_lock:
if self.optimizer is None:
self.plan_locally_event.set()
if centerpoints_msg.closed_track is True:
if ((self.optimizer is None) or
(self.global_motion_planner_config.preperation_step_size != self.global_motion_planner_config.second_preperation_step_size) or
(msg_changed is True)):
with self.send_trajectory_lock:
rospy.loginfo('[Motion Planning] Set event to plan globally')
self.plan_globally_event.set()
[docs] def check_msg_changed(self, centerpoints_msg_old, centerpoints_msg):
""" Check if the centerpoints message has changed.
Parameters
----------
centerpoints_msg_old : CenterPoints
Recently received centerpoints message.
centerpoints_msg : CenterPoints
Received centerpoints message.
Returns
-------
msg_changed : bool
Indicates whether the centerpoints message has changed.
"""
msg_changed = False
if (centerpoints_msg_old.x != centerpoints_msg.x):
msg_changed = True
elif (centerpoints_msg_old.y != centerpoints_msg.y):
msg_changed = True
elif (centerpoints_msg_old.left_track_width != centerpoints_msg.left_track_width):
msg_changed = True
elif (centerpoints_msg_old.right_track_width != centerpoints_msg.right_track_width):
msg_changed = True
return msg_changed
[docs] def plan_locally(self):
"""Plan motion locally as soon as event is set."""
while not rospy.is_shutdown():
self.plan_locally_event.wait()
try:
start = rospy.Time.now()
local_motion_planner_obj, vehicle_pose_header, centerpoints_image_header = self.plan_motion_locally()
self.local_planning_calculation_time = (rospy.Time.now() - start).to_sec()
with self.send_trajectory_lock:
if self.optimizer is None or self.mission_completed is True:
stamp = self.send_trajectory(trajectory=local_motion_planner_obj.trajectory, closed=False,
centerpoints_image_header=centerpoints_image_header,
vehicle_pose_header=vehicle_pose_header)
if self.send_debug_informations is True:
self.send_local_trajectory_debug_informations(local_motion_planner_obj=local_motion_planner_obj,
vehicle_pose_header=vehicle_pose_header,
centerpoints_image_header=centerpoints_image_header)
except Exception:
traceback.print_exc()
rospy.logerr('[MotionPlanning] Failed to plan motion locally.')
[docs] def plan_globally(self):
"""Plan motion globally as soon as event is set."""
while not rospy.is_shutdown():
self.plan_globally_event.wait()
rospy.loginfo('[MotionPlanning] Start optimizing globally')
start_time = rospy.Time.now()
optimizer, trajectory, centerpoints_image_header = self.plan_motion_globally()
self.global_planning_calculation_time = (rospy.Time.now() - start_time).to_sec()
rospy.loginfo('[MotionPlanning] Finished optimizing globally')
with self.send_trajectory_lock:
if self.mission_completed is False:
with self.optimizer_lock:
self.optimizer = optimizer
self.set_planning_mode(closed_track=True)
self.send_trajectory(trajectory=trajectory, closed=True,
centerpoints_image_header=centerpoints_image_header)
[docs] def find_nearest_centerpoint(self, centerpoints: npt.NDArray, vehicle_position: npt.NDArray) -> int:
"""
Find the nearest centerpoint to the current vehicle position in a list of centerpoint which.
Centerpoint must be atleast self.min_local_track_gap away.
Parameters
----------
centerpoints : npt.NDArray
Centerpoints for which the nearest should be found, shape: (n, 2) with n equal number of centerpoints.
vehicle_position : npt.NDArray
Current vehicle position, shape: (2, ).
Returns
-------
int
Index oif nearest centerpoint.
"""
distance_to_vehicle = np.sqrt(np.sum(np.power(centerpoints - vehicle_position, 2), axis=1))
nearest_index = min(np.argmin(distance_to_vehicle) + 1, centerpoints.shape[0] - 1)
return np.argmax(distance_to_vehicle[nearest_index:] > self.min_local_track_gap) + nearest_index
[docs] def limit_global_centerpoints_by_rolling(self, centerpoints: npt.NDArray,
vehicle_position: npt.NDArray) -> Tuple[int, int]:
"""
Limit global and closed centerpoints by rolling the centerpoints.
First find the nearest centerpoint. Roll centerpoints so that nearest centerpoints is the first one.
Calculate the cumulative distance for that an find the index
where the cumulative distance is greater than self.max_local_track_length.
Parameters
----------
centerpoints : npt.NDArray
Centerpoints which should be limited, shape: (n, 2) with n equal number of centerpoints.
vehicle_position : npt.NDArray
Current vehicle position, shape: (2, ).
Returns
-------
Tuple[int, int]
**Index of nearest centerpoint** of original centerpoints and
**Index of last centerpoint** of rolled centerpoints.
"""
nearest_centerpoint_index = self.find_nearest_centerpoint(centerpoints=centerpoints,
vehicle_position=vehicle_position)
rolled_centerpoints = np.roll(centerpoints, -(nearest_centerpoint_index), axis=0)
cum_distance = np.cumsum(np.sqrt(np.sum(np.power(np.diff(rolled_centerpoints, axis=0,
prepend=rolled_centerpoints[:1]), 2), axis=1)))
return -(nearest_centerpoint_index), np.argmax(cum_distance > self.max_local_track_length)
[docs] def limit_global_centerpoints_by_cutting(self, centerpoints: npt.NDArray, vehicle_position: npt.NDArray,
max_length: bool = True) -> Tuple[int, Optional[int]]:
"""
Limit global and open centerpoints by cutting the centerpoints.
First find the nearest centerpoint. All centerpoints forward are kept,
except when max_length is defined as centerline would then be longer than self.max_local_track_length.
In this case they get cut too.
Parameters
----------
centerpoints : npt.NDArray
Centerpoints which should be limited, shape, (n, 2) with n equal number of centerpoints.
vehicle_position : npt.NDArray
Current vehicle position, shape: (2, ).
max_length : bool, optional
Indicates whether centerline should not be longer than self.max_local_track_length, by default True
Returns
-------
Tuple[int, Optional[int]]
**Index of nearest centerpoint** and
**Index of last centerpoint**
Index of last centerpoint is None if max_length is not defined
or centerline is not longer self.max_local_track_length.
"""
nearest_centerpoint_index = self.find_nearest_centerpoint(centerpoints=centerpoints,
vehicle_position=vehicle_position)
if max_length is True:
cut_centerpoints = centerpoints[nearest_centerpoint_index:]
cum_distance = np.cumsum(np.sqrt(np.sum(np.power(np.diff(cut_centerpoints, axis=0,
prepend=cut_centerpoints[:1]), 2), axis=1)))
last_index = np.argmax(cum_distance > self.max_local_track_length)
if last_index == 0:
last_index = None
else:
last_index = None
return (nearest_centerpoint_index), last_index
[docs] def plan_motion_locally(self) -> Tuple[local_motion_planner.LocalMotionPlanner, Header, Header]:
"""
Plan and calculate trajectory for a local track based on centerpoints.
Returns
-------
Tuple[local_motion_planner.LocalMotionPlanner, Header, Header]
**LocalMotionPlanner object** used for planning,
**Header of Vehicle Pose** used for planning and
**Header of Centerpoints** used for planning
"""
with self.centerpoints_lock:
centerpoints_msg: CenterPoints = self.centerpoints_msg
self.plan_locally_event.clear()
centerpoints = np.array([centerpoints_msg.x, centerpoints_msg.y]).T
track_width = np.array([centerpoints_msg.left_track_width, centerpoints_msg.right_track_width]).T
if centerpoints_msg.closed_track is True or self.last_planning_mode == PlanningMode.FINISH:
with self.vehicle_pose_lock:
vehicle_position = np.array([self.vehicle_pose.x, self.vehicle_pose.y])
vehicle_v = self.vehicle_pose.v_x
vehicle_psi = self.vehicle_pose.psi
vehicle_pose_header = self.vehicle_pose.header
if centerpoints_msg.closed_track is True:
roll, end = self.limit_global_centerpoints_by_rolling(centerpoints=centerpoints,
vehicle_position=vehicle_position)
centerpoints = np.roll(centerpoints, roll, axis=0)[:end]
track_width = np.roll(track_width, roll, axis=0)[:end]
else:
start, end = self.limit_global_centerpoints_by_cutting(centerpoints=centerpoints,
vehicle_position=vehicle_position)
centerpoints = centerpoints[start:][:end]
track_width = track_width[start:][:end]
else:
vehicle_position = np.array([centerpoints_msg.vehicle_pose.x, centerpoints_msg.vehicle_pose.y])
vehicle_v = centerpoints_msg.vehicle_pose.v_x
vehicle_psi = centerpoints_msg.vehicle_pose.psi
vehicle_pose_header = centerpoints_msg.vehicle_pose.header
start, end = self.limit_global_centerpoints_by_cutting(centerpoints=centerpoints,
vehicle_position=vehicle_position,
max_length=False)
centerpoints = centerpoints[start:][:end]
track_width = track_width[start:][:end]
local_motion_planner_obj = local_motion_planner.LocalMotionPlanner(
local_motion_planner_config=self.local_motion_planner_config,
velocity_profile_config=self.local_velocity_config,
centerpoints=centerpoints, track_width=track_width, vehicle_v=vehicle_v,
vehicle_psi=vehicle_psi, vehicle_position=vehicle_position,
mission_completed=self.mission_completed)
local_motion_planner_obj.plan()
return local_motion_planner_obj, vehicle_pose_header, centerpoints_msg.image_header
[docs] def plan_motion_globally(self) -> Tuple[global_motion_planner.GlobalMotionPlanner, npt.NDArray, Header]:
"""
Plan and calculate trajectory for a global and closed track based on centerpoints.
Returns
-------
Tuple[global_optimization.GlobalOptimizer, npt.NDArray, Header]
**GlobalOptimizer** used for Optimization,
**global optimized trajectory** (shape: (n, 3) with n equal number of centerpoints)
and **header of the centerpoints** for which the trajectory has been calculated for.
"""
with self.centerpoints_lock:
centerpoints_msg = self.centerpoints_msg
self.plan_globally_event.clear()
parent_conn, child_conn = mp.Pipe()
parent_conn.send(centerpoints_msg)
process = mp.Process(target=self.optimize_globally, args=(child_conn, self.global_motion_planner_config,
self.gg_handler))
process.start()
global_planner: global_motion_planner.GlobalMotionPlanner = parent_conn.recv()
process.join()
global_trajectory = global_planner.trajectory
self.lap_time = global_planner.laptime
# set a higher resolution for the second optimization, because it's okay if this takes longer
self.global_motion_planner_config.preperation_step_size = self.global_motion_planner_config.second_preperation_step_size
return global_planner, global_trajectory, centerpoints_msg.image_header
[docs] def plan_skidpad(self, centerpoints: npt.NDArray) -> npt.NDArray:
"""
Calculate trajectory for skidpad based on centerpoints and maximum lateral acceleration.
Parameters
----------
centerpoints : npt.NDArray
Centerpoints for which the velocity should be calculated, shape: (n, 2) with n equal number of centerpoints.
Returns
-------
npt.NDArray
Trajectory contianing centerpoints and velocity, shape: (n, 3) with n equal number of centerpoints.
"""
velocity_obj = local_velocity_profile.VelocityProfile(path=centerpoints,
velocity_profile_config=self.local_velocity_config)
velocity_obj.limit_v_max_for_skidpad()
optimal_velocity = velocity_obj.calculate(v_end=0, v_initial=0)
global_trajectory = np.c_[centerpoints, optimal_velocity]
return global_trajectory
[docs] def send_trajectory(self, trajectory: npt.NDArray, closed: bool, centerpoints_image_header: Header,
vehicle_pose_header: Optional[Header] = None) -> rospy.Time:
"""
Publish planned trajectory on ROS.
Parameters
----------
trajectory : npt.NDArray
Trajectory to be published, shape: (n, 3) with n equal number of points of trajectory.
closed : bool
Indicates whether the trajectory is closed.
centerpoints_image_header : Header
Header of the centerpoints for which the trajectory has been planned.
vehicle_pose_header : Optional[Header], optional
Header of the vehicle pose for which the trajectory has been planned, by default None
Returns
-------
rospy.Time
Time of trajectory header.
"""
trajectory_msg = TrajectoryPoints()
trajectory_msg.closed = closed
trajectory_msg.x = trajectory[:, 0].tolist()
trajectory_msg.y = trajectory[:, 1].tolist()
trajectory_msg.v = trajectory[:, 2].tolist()
trajectory_msg.lap_time = self.lap_time
trajectory_msg.local_planning_calculation_time = self.local_planning_calculation_time
trajectory_msg.global_planning_calculation_time = self.global_planning_calculation_time
trajectory_msg.header.stamp = rospy.Time.now()
trajectory_msg.header.frame_id = 'map'
trajectory_msg.image_header = centerpoints_image_header
if vehicle_pose_header is not None:
trajectory_msg.pose_header = vehicle_pose_header
self.trajectory_publisher.publish(trajectory_msg)
return trajectory_msg.header.stamp
if __name__ == '__main__':
rospy.init_node('motion_planning', anonymous=False)
try:
rospy.loginfo('[MotionPlanning] MotionPlanning Node started.')
motion_planning = MotionPlanning()
rospy.spin()
except Exception as e:
if not isinstance(e, rospy.exceptions.ROSInterruptException):
rospy.logfatal(traceback.format_exc())
time.sleep(2)
raise e