Source code for finish_line_detection

#!/usr/bin/python3
"""ROS module to detect finish line in landmarks. Publish finish line, count laps and announce completed mission."""
import traceback
import time
from enum import Enum
from typing import Optional

import numpy as np
import numpy.typing as npt

import cv2

import rospy
from visualization_msgs.msg import Marker, MarkerArray
from geometry_msgs.msg import Point
from utilities.msg import ConeListWithCovariance, VehiclePose, MapAlignment
from as_can_msgs.msg import LapCount, MissionFinished


[docs]class AutonomousMissionEnum(Enum): """ Enum to describe which Autonomous Mission is currently select. .. csv-table:: Mission IDs :header: "Mission ID", "Mission Name" 0, Manual Driving 1, Acceleration 2, Skidpad 3, Trackdrive 4, Braketest / EBS Test 5, Inspection 6, Autocross """ MANUAL_DRIVING = 0 ACCELERATION = 1 SKIDPAD = 2 TRACKDRIVE = 3 BRAKETEST = 4 INSPECTION = 5 AUTOCROSS = 6
[docs]class FinishLineDetection(): """ ROS module to detect finish line in global landmarks. Publish finish line, count laps & announce completed mission. Uses landmarks (big orange cones) estimated by SLAM to detect finish line. Uses vehicle pose estimated by SLAM to count and publish laps when crossing the finish line. Therefore an area with a hysteris around the finish line cones is used to detect when the area is entered and left. When reaching the maximum laps for the respective Autonomous Mission, it is announced via ROS that the mission is completed. .. figure:: images/finish_line_boxes.svg :class: with-border Parameters and attributes for finish line search, enter and leave box when staging the vehicle. Attributes ---------- mission : AutonomousMissionEnum Current Autonomous Mission which is being driven. big_orange_id : int ID which identifies big orange cones. inside : bool Indicates whether the vehicle currently is inside or outside the area around the finish line. lap_count : int Current driven laps, can be negative. This signalizes that the first lap is still driven and the finish line has not been crossed, but will be during the first lap due to the track layout of the Autonomous Mission, e.g. skidpad or trackdrive. lap_count_publisher : rospy.Publusher ROS publisher to publish the current lap count. finish_line_enter : Optional[npt.NDArray] Polygon representing box to use to detect entering the finish line area. Smaller than finish_line_leave. finish_line_leave : Optional[npt.NDArray] Polygon representing box to use to detect entering the finish line area. Bigger than finish_line_enter. finish_line_marker_publisher : rospy.Publisher ROS publisher to publish visualization markers to visualize enter and leave box of finish line area. finish_line_search_marker_publisher : rospy.Publisher ROS Publisher to publish visualization markers to visualize search box where the finish line area is searched. mission_completed_publisher : rospy.Publisher ROS Publisher to announce that the mission is completed due to driving the required laps. landmark_subscriber : Optional ROS subscriber to receive landmarks to detect finish line. Gets unregistered after detecting the finish line. vehicle_subscriber : Optional ROS subscriber to receive current vehicle pose to detect entering and leaving finish line area to count laps. Gets registered after detecting the finish line. vehicle_x_offset : float Number of meters between foremost point of vehicle and rear axle of vehicle. Used to specify search area relative to foremost point of vehicle. search_box_additional_width : float Half of the width of the search box. search_box_additional_height : float Half of the height of the search box. enter_box_additional_width : float Width to add to polygon around detected finish line to use it as box to detect entering the finish line area. enter_box_additional_height : float Height to add to polygon around detected finish line to use it as box to detect entering the finish line area. leave_box_additional_width : float Width to add to polygon around detected finish line to use it as box to detect leaving the finish line area. leave_box_additional_height : float Height to add to polygon around detected finish line to use it as box to detect leaving the finish line area. max_laps : int Laps to be driven for the current Autonomous Mission. initial_distance : float Longitudinal distance between foremost point of vehicle and middle finish line when staging the vehicle. """ def __init__(self): """Initialize finish line detection ROS module.""" self.mission = AutonomousMissionEnum(rospy.get_param('/mission')) if self.mission == AutonomousMissionEnum.BRAKETEST: rospy.loginfo("[FLD] Not started since the WEG IST DAS ZIEL. (brake test)") return self.big_orange_id = rospy.get_param('/utilities/id_orange_big') self.inside: bool = False self.lap_count: int = 0 self.lap_count_publisher = rospy.Publisher('/finish_line_detection/lap_count', LapCount, queue_size=10, latch=True) self.finish_line_enter: Optional[npt.NDArray] = None self.finish_line_leave: Optional[npt.NDArray] = None self.finish_line_marker_publisher = rospy.Publisher('/finish_line_detection/finish_line_box', MarkerArray, queue_size=10, latch=True) self.finish_line_search_marker_publisher = rospy.Publisher('/finish_line_detection/finish_line_search_box', Marker, queue_size=10, latch=True) self.mission_completed_publisher = rospy.Publisher('/finish_line_detection/mission_completed', MissionFinished, queue_size=10, latch=True) self.vehicle_subscriber: Optional[rospy.Subscriber] = None self.landmark_subscriber: Optional[rospy.Subscriber] = None self.alignment_subscriber: Optional[rospy.Subscriber] = None self.vehicle_x_offset = rospy.get_param('/finish_line_detection/search_box/vehicle_x_offset') self.search_box_additional_width = rospy.get_param('/finish_line_detection/search_box/additional_width') self.search_box_additional_height = rospy.get_param('/finish_line_detection/search_box/additional_height') self.enter_box_additional_width = rospy.get_param( '/finish_line_detection/finish_line_box/enter_box_additional_width') self.enter_box_additional_height = rospy.get_param( '/finish_line_detection/finish_line_box/enter_box_additional_height') self.leave_box_additional_width = rospy.get_param( '/finish_line_detection/finish_line_box/leave_box_additional_width') self.leave_box_additional_height = rospy.get_param( '/finish_line_detection/finish_line_box/leave_box_additional_height') if self.mission == AutonomousMissionEnum.TRACKDRIVE: self.max_laps = rospy.get_param('/finish_line_detection/trackdrive/max_laps') self.initial_distance = rospy.get_param('/finish_line_detection/trackdrive/initial_distance') if self.mission == AutonomousMissionEnum.AUTOCROSS: self.max_laps = rospy.get_param('/finish_line_detection/autocross/max_laps') self.initial_distance = rospy.get_param('/finish_line_detection/autocross/initial_distance') if self.mission == AutonomousMissionEnum.SKIDPAD: self.max_laps = rospy.get_param('/finish_line_detection/skidpad/max_laps') self.initial_distance = rospy.get_param('/finish_line_detection/skidpad/initial_distance') if self.mission == AutonomousMissionEnum.ACCELERATION: self.max_laps = rospy.get_param('/finish_line_detection/acceleration/max_laps') self.initial_distance = rospy.get_param('/finish_line_detection/acceleration/initial_distance') self.search_x_min = self.initial_distance + self.vehicle_x_offset - self.search_box_additional_height self.search_x_max = self.initial_distance + self.vehicle_x_offset + self.search_box_additional_height self.search_y_min = -self.search_box_additional_width self.search_y_max = self.search_box_additional_width self.publish_finish_line_search_box() rospy.loginfo('[FLD] Searching for finish line in: ' f'{self.search_x_min} < x < {self.search_x_max} ' f'^ {self.search_y_min} < y < {self.search_y_max}')
[docs] def generate_finish_line_marker(self, points: npt.NDArray) -> Marker: """ Generate visualization marker representing finish line for given poins. Parameters ---------- points : npt.NDArray Points representing finish line as polygon. First point will be repeated to close the polyogon. Returns ------- Marker Visualization marker for ROS. """ marker = Marker() marker.header.stamp = rospy.Time.now() marker.header.frame_id = 'map' marker.ns = 'finish_line_detection' marker.action = marker.ADD marker.type = marker.LINE_STRIP marker.color.a = 1 marker.color.r = 207 / 255 marker.color.g = 47 / 255 marker.color.b = 116 / 255 marker.scale.x = 0.05 marker.points = [] for point in points: marker_point = Point() marker_point.x = point[0] marker_point.y = point[1] marker_point.z = 0 marker.points.append(marker_point) marker.points.append(marker.points[0]) return marker
[docs] def finish_line_landmark_callback(self, cone_list: ConeListWithCovariance): """ Receive landmarks estimated by SLAM and received via ROS. Generate finish line from them if possible. Passes cones in search area to function to generate finish line. Unregisters landmark subscriber as soon as finish line has been detected. Parameters ---------- cone_list : ConeListWithCovariance Landmarks estimated by SLAM. """ cones = np.array([[cone.x, cone.y] for cone in cone_list.cones if (cone.id == self.big_orange_id and self.search_x_min < cone.x < self.search_x_max and self.search_y_min < cone.y < self.search_y_max)], dtype=np.float32) self.generate_finish_line(cones=cones) if self.finish_line_enter is not None and self.finish_line_leave is not None: self.landmark_subscriber.unregister()
[docs] def map_alignment_callback(self, msg: MapAlignment): """ Receive map alignment estimated by SLAM and received via ROS. Update search box when vehicle is aligned. Parameters ---------- msg : MapAlignment Map alignment estimated by SLAM. """ rotation_matrix = np.array(msg.rotation_matrix).reshape(2,2) translation = np.array(msg.translation) point1 = np.array([self.search_x_min, self.search_y_min]) point2 = np.array([self.search_x_min, self.search_y_max]) point3 = np.array([self.search_x_max, self.search_y_min]) point4 = np.array([self.search_x_max, self.search_y_max]) point1_transformed = rotation_matrix @ point1 + translation point2_transformed = rotation_matrix @ point2 + translation point3_transformed = rotation_matrix @ point3 + translation point4_transformed = rotation_matrix @ point4 + translation self.search_x_min = min(point1_transformed[0], point2_transformed[0], point3_transformed[0], point4_transformed[0]) self.search_x_max = max(point1_transformed[0], point2_transformed[0], point3_transformed[0], point4_transformed[0]) self.search_y_min = -max(point1_transformed[1], point2_transformed[1], point3_transformed[1], point4_transformed[1]) self.search_y_max = -min(point1_transformed[1], point2_transformed[1], point3_transformed[1], point4_transformed[1]) self.publish_finish_line_search_box() rospy.loginfo('[FLD] Received Map alignment. Now searching for finish line in: ' f'{self.search_x_min} < x < {self.search_x_max} ' f'^ {self.search_y_min} < y < {self.search_y_max}') self.alignment_subscriber.unregister()
[docs] def generate_finish_line(self, cones: npt.NDArray): """ Generate finish line from cones in search box if possible. There need to be atleas four big orange cones and the resulting rectangle has to has atleats a specific size. Publishes enter and leave box representing finish line if finish has been detected. Parameters ---------- cones : npt.NDArray Big orange cones estimated by SLAM which are in the search box. """ if cones.shape[0] < 4: return center, (width, height), angle = cv2.minAreaRect(cones) if (width < 2 or height < 0.2) and (width < 0.2 or height < 2): rospy.loginfo_throttle_identical(0.2, f'[FLD] Width: {width}, Height: {height}, Angle: {angle}') return self.finish_line_enter = cv2.boxPoints((center, (width + self.enter_box_additional_width, height + self.enter_box_additional_height), angle)) finish_line_enter_marker = self.generate_finish_line_marker(self.finish_line_enter) finish_line_enter_marker.id = 2 self.finish_line_leave = cv2.boxPoints((center, (width + self.leave_box_additional_width, height + self.leave_box_additional_height), angle)) finish_line_leave_marker = self.generate_finish_line_marker(self.finish_line_leave) finish_line_leave_marker.id = 3 markers = MarkerArray() finish_line_enter_marker.color.r = 34 / 255 finish_line_enter_marker.color.g = 153 / 255 finish_line_enter_marker.color.b = 84 / 255 finish_line_leave_marker.color.r = 176 / 255 finish_line_leave_marker.color.g = 58 / 255 finish_line_leave_marker.color.b = 46 / 255 markers.markers = [finish_line_enter_marker, finish_line_leave_marker] self.finish_line_marker_publisher.publish(markers) rospy.loginfo('[FLD] Finish line detected and published.')
[docs] def lap_count_vehicle_callback(self, vehicle_pose: VehiclePose): """ Receive vehicle pose and increase lap count if leave box has been left after enter box has been entered before. Vehicle pose is estimated by SLAM and received via ROS. Publish lap count if increased. Publish that mission is completed when max laps has been reached. Parameters ---------- vehicle_pose : VehiclePose Received vehicle pose received via ROS and estimated by SLAM. """ pose = (vehicle_pose.x, vehicle_pose.y) # Ensure it's a tuple if not self.inside: if cv2.pointPolygonTest(self.finish_line_enter, pose, False) > 0: rospy.loginfo('[FLD] Entered finish line area.') self.inside = True else: if cv2.pointPolygonTest(self.finish_line_leave, pose, False) < 0: # leave self.inside = False self.lap_count += 1 rospy.loginfo(f'[FLD] Left finish line area: Finished lap {self.lap_count}') self.publish_lap_count() if self.lap_count >= self.max_laps: rospy.loginfo_once(f'[FLD] Finished {self.mission}') msg = MissionFinished() msg.header.stamp = rospy.Time.now() msg.finished = True self.mission_completed_publisher.publish(msg)
[docs] def trackdrive_and_autocross(self): """ Run finish line detection when driving trackdrive or autocross. Lap count is set to -1, since car is staged before the start & finish line and thus finish line is crossed while driving the first lap. Callback for landmarks estimated by SLAM is set. As soon as the finish line is detected, callback for vehicle pose is set so that finish line detection can count the driven laps. """ self.landmark_subscriber = rospy.Subscriber('/slam/landmarks', ConeListWithCovariance, self.finish_line_landmark_callback) while not rospy.is_shutdown(): if self.finish_line_enter is not None and self.finish_line_leave is not None: break rospy.sleep(0.01) self.lap_count = -1 self.vehicle_subscriber = rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.lap_count_vehicle_callback)
[docs] def skidpad(self): """ Run finish line detection when driving skidpad. Lap count is set to -1, since first lap is started when intersection has been crossed the first time. Callback for landmarks estimated by SLAM is set. As soon as the finish line is detected, callback for vehicle pose is set so that finish line detection can count the driven laps. """ self.landmark_subscriber = rospy.Subscriber('/slam/landmarks', ConeListWithCovariance, self.finish_line_landmark_callback) self.alignment_subscriber = rospy.Subscriber('/slam/map_alignment', MapAlignment, self.map_alignment_callback) while not rospy.is_shutdown(): if self.finish_line_enter is not None and self.finish_line_leave is not None: break rospy.sleep(0.01) self.lap_count = -1 self.vehicle_subscriber = rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.lap_count_vehicle_callback)
[docs] def acceleration(self): """ Run finish line detection when driving acceleration. Lap count is kept at 0, since first lap is finished when finish line is crossed. Callback for landmarks estimated by SLAM is set. As soon as the finish line is detected, callback for vehicle pose is set so that finish line detection can count the driven laps. """ self.landmark_subscriber = rospy.Subscriber('/slam/landmarks', ConeListWithCovariance, self.finish_line_landmark_callback) self.alignment_subscriber = rospy.Subscriber('/slam/map_alignment', MapAlignment, self.map_alignment_callback) while not rospy.is_shutdown(): if self.finish_line_enter is not None and self.finish_line_leave is not None: break rospy.sleep(0.01) self.vehicle_subscriber = rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.lap_count_vehicle_callback)
[docs] def publish_lap_count(self): """Publish current lap count to ROS.""" msg = LapCount() msg.header.stamp = rospy.Time.now() msg.laps = max(self.lap_count, 0) self.lap_count_publisher.publish(msg)
[docs] def run(self): """Run finish line detection by calling the respective function based on the current AS Mission.""" self.publish_lap_count() if self.mission == AutonomousMissionEnum.TRACKDRIVE or self.mission == AutonomousMissionEnum.AUTOCROSS: rospy.loginfo('[FLD] Driving Autocross or Trackdrive') self.trackdrive_and_autocross() if self.mission == AutonomousMissionEnum.SKIDPAD: rospy.loginfo('[FLD] Driving Skidpad') self.skidpad() if self.mission == AutonomousMissionEnum.ACCELERATION: rospy.loginfo('[FLD] Driving Acceleration') self.acceleration()
if __name__ == '__main__': rospy.init_node('finish_line_detection', anonymous=True) try: finish_line_detection = FinishLineDetection() if finish_line_detection.mission != AutonomousMissionEnum.BRAKETEST: finish_line_detection.run() rospy.spin() except Exception as e: if not isinstance(e, rospy.exceptions.ROSInterruptException): rospy.logfatal(traceback.format_exc()) time.sleep(2) raise e