#!/usr/bin/python3
"""ROS module to detect finish line in landmarks. Publish finish line, count laps and announce completed mission."""
import traceback
import time
from enum import Enum
from typing import Optional
import numpy as np
import numpy.typing as npt
import cv2
import rospy
from visualization_msgs.msg import Marker, MarkerArray
from geometry_msgs.msg import Point
from utilities.msg import ConeListWithCovariance, VehiclePose, MapAlignment
from as_can_msgs.msg import LapCount, MissionFinished
[docs]class AutonomousMissionEnum(Enum):
"""
Enum to describe which Autonomous Mission is currently select.
.. csv-table:: Mission IDs
:header: "Mission ID", "Mission Name"
0, Manual Driving
1, Acceleration
2, Skidpad
3, Trackdrive
4, Braketest / EBS Test
5, Inspection
6, Autocross
"""
MANUAL_DRIVING = 0
ACCELERATION = 1
SKIDPAD = 2
TRACKDRIVE = 3
BRAKETEST = 4
INSPECTION = 5
AUTOCROSS = 6
[docs]class FinishLineDetection():
"""
ROS module to detect finish line in global landmarks. Publish finish line, count laps & announce completed mission.
Uses landmarks (big orange cones) estimated by SLAM to detect finish line.
Uses vehicle pose estimated by SLAM to count and publish laps when crossing the finish line.
Therefore an area with a hysteris around the finish line cones is used to detect when the area is entered and left.
When reaching the maximum laps for the respective Autonomous Mission,
it is announced via ROS that the mission is completed.
.. figure:: images/finish_line_boxes.svg
:class: with-border
Parameters and attributes for finish line search, enter and leave box when staging the vehicle.
Attributes
----------
mission : AutonomousMissionEnum
Current Autonomous Mission which is being driven.
big_orange_id : int
ID which identifies big orange cones.
inside : bool
Indicates whether the vehicle currently is inside or outside the area around the finish line.
lap_count : int
Current driven laps, can be negative. This signalizes that the first lap is still driven
and the finish line has not been crossed, but will be during the first lap due to
the track layout of the Autonomous Mission, e.g. skidpad or trackdrive.
lap_count_publisher : rospy.Publusher
ROS publisher to publish the current lap count.
finish_line_enter : Optional[npt.NDArray]
Polygon representing box to use to detect entering the finish line area. Smaller than finish_line_leave.
finish_line_leave : Optional[npt.NDArray]
Polygon representing box to use to detect entering the finish line area. Bigger than finish_line_enter.
finish_line_marker_publisher : rospy.Publisher
ROS publisher to publish visualization markers to visualize enter and leave box of finish line area.
finish_line_search_marker_publisher : rospy.Publisher
ROS Publisher to publish visualization markers to visualize search box where the finish line area is searched.
mission_completed_publisher : rospy.Publisher
ROS Publisher to announce that the mission is completed due to driving the required laps.
landmark_subscriber : Optional
ROS subscriber to receive landmarks to detect finish line. Gets unregistered after detecting the finish line.
vehicle_subscriber : Optional
ROS subscriber to receive current vehicle pose to detect entering and leaving finish line area to count laps.
Gets registered after detecting the finish line.
vehicle_x_offset : float
Number of meters between foremost point of vehicle and rear axle of vehicle.
Used to specify search area relative to foremost point of vehicle.
search_box_additional_width : float
Half of the width of the search box.
search_box_additional_height : float
Half of the height of the search box.
enter_box_additional_width : float
Width to add to polygon around detected finish line to use it as box to detect entering the finish line area.
enter_box_additional_height : float
Height to add to polygon around detected finish line to use it as box to detect entering the finish line area.
leave_box_additional_width : float
Width to add to polygon around detected finish line to use it as box to detect leaving the finish line area.
leave_box_additional_height : float
Height to add to polygon around detected finish line to use it as box to detect leaving the finish line area.
max_laps : int
Laps to be driven for the current Autonomous Mission.
initial_distance : float
Longitudinal distance between foremost point of vehicle and middle finish line when staging the vehicle.
"""
def __init__(self):
"""Initialize finish line detection ROS module."""
self.mission = AutonomousMissionEnum(rospy.get_param('/mission'))
if self.mission == AutonomousMissionEnum.BRAKETEST:
rospy.loginfo("[FLD] Not started since the WEG IST DAS ZIEL. (brake test)")
return
self.big_orange_id = rospy.get_param('/utilities/id_orange_big')
self.inside: bool = False
self.lap_count: int = 0
self.lap_count_publisher = rospy.Publisher('/finish_line_detection/lap_count',
LapCount, queue_size=10, latch=True)
self.finish_line_enter: Optional[npt.NDArray] = None
self.finish_line_leave: Optional[npt.NDArray] = None
self.finish_line_marker_publisher = rospy.Publisher('/finish_line_detection/finish_line_box',
MarkerArray, queue_size=10, latch=True)
self.finish_line_search_marker_publisher = rospy.Publisher('/finish_line_detection/finish_line_search_box',
Marker, queue_size=10, latch=True)
self.mission_completed_publisher = rospy.Publisher('/finish_line_detection/mission_completed',
MissionFinished, queue_size=10, latch=True)
self.vehicle_subscriber: Optional[rospy.Subscriber] = None
self.landmark_subscriber: Optional[rospy.Subscriber] = None
self.alignment_subscriber: Optional[rospy.Subscriber] = None
self.vehicle_x_offset = rospy.get_param('/finish_line_detection/search_box/vehicle_x_offset')
self.search_box_additional_width = rospy.get_param('/finish_line_detection/search_box/additional_width')
self.search_box_additional_height = rospy.get_param('/finish_line_detection/search_box/additional_height')
self.enter_box_additional_width = rospy.get_param(
'/finish_line_detection/finish_line_box/enter_box_additional_width')
self.enter_box_additional_height = rospy.get_param(
'/finish_line_detection/finish_line_box/enter_box_additional_height')
self.leave_box_additional_width = rospy.get_param(
'/finish_line_detection/finish_line_box/leave_box_additional_width')
self.leave_box_additional_height = rospy.get_param(
'/finish_line_detection/finish_line_box/leave_box_additional_height')
if self.mission == AutonomousMissionEnum.TRACKDRIVE:
self.max_laps = rospy.get_param('/finish_line_detection/trackdrive/max_laps')
self.initial_distance = rospy.get_param('/finish_line_detection/trackdrive/initial_distance')
if self.mission == AutonomousMissionEnum.AUTOCROSS:
self.max_laps = rospy.get_param('/finish_line_detection/autocross/max_laps')
self.initial_distance = rospy.get_param('/finish_line_detection/autocross/initial_distance')
if self.mission == AutonomousMissionEnum.SKIDPAD:
self.max_laps = rospy.get_param('/finish_line_detection/skidpad/max_laps')
self.initial_distance = rospy.get_param('/finish_line_detection/skidpad/initial_distance')
if self.mission == AutonomousMissionEnum.ACCELERATION:
self.max_laps = rospy.get_param('/finish_line_detection/acceleration/max_laps')
self.initial_distance = rospy.get_param('/finish_line_detection/acceleration/initial_distance')
self.search_x_min = self.initial_distance + self.vehicle_x_offset - self.search_box_additional_height
self.search_x_max = self.initial_distance + self.vehicle_x_offset + self.search_box_additional_height
self.search_y_min = -self.search_box_additional_width
self.search_y_max = self.search_box_additional_width
self.publish_finish_line_search_box()
rospy.loginfo('[FLD] Searching for finish line in: '
f'{self.search_x_min} < x < {self.search_x_max} '
f'^ {self.search_y_min} < y < {self.search_y_max}')
[docs] def generate_finish_line_marker(self, points: npt.NDArray) -> Marker:
"""
Generate visualization marker representing finish line for given poins.
Parameters
----------
points : npt.NDArray
Points representing finish line as polygon. First point will be repeated to close the polyogon.
Returns
-------
Marker
Visualization marker for ROS.
"""
marker = Marker()
marker.header.stamp = rospy.Time.now()
marker.header.frame_id = 'map'
marker.ns = 'finish_line_detection'
marker.action = marker.ADD
marker.type = marker.LINE_STRIP
marker.color.a = 1
marker.color.r = 207 / 255
marker.color.g = 47 / 255
marker.color.b = 116 / 255
marker.scale.x = 0.05
marker.points = []
for point in points:
marker_point = Point()
marker_point.x = point[0]
marker_point.y = point[1]
marker_point.z = 0
marker.points.append(marker_point)
marker.points.append(marker.points[0])
return marker
[docs] def finish_line_landmark_callback(self, cone_list: ConeListWithCovariance):
"""
Receive landmarks estimated by SLAM and received via ROS. Generate finish line from them if possible.
Passes cones in search area to function to generate finish line.
Unregisters landmark subscriber as soon as finish line has been detected.
Parameters
----------
cone_list : ConeListWithCovariance
Landmarks estimated by SLAM.
"""
cones = np.array([[cone.x, cone.y] for cone in cone_list.cones
if (cone.id == self.big_orange_id
and self.search_x_min < cone.x < self.search_x_max
and self.search_y_min < cone.y < self.search_y_max)], dtype=np.float32)
self.generate_finish_line(cones=cones)
if self.finish_line_enter is not None and self.finish_line_leave is not None:
self.landmark_subscriber.unregister()
[docs] def map_alignment_callback(self, msg: MapAlignment):
"""
Receive map alignment estimated by SLAM and received via ROS. Update search box when vehicle is aligned.
Parameters
----------
msg : MapAlignment
Map alignment estimated by SLAM.
"""
rotation_matrix = np.array(msg.rotation_matrix).reshape(2,2)
translation = np.array(msg.translation)
point1 = np.array([self.search_x_min, self.search_y_min])
point2 = np.array([self.search_x_min, self.search_y_max])
point3 = np.array([self.search_x_max, self.search_y_min])
point4 = np.array([self.search_x_max, self.search_y_max])
point1_transformed = rotation_matrix @ point1 + translation
point2_transformed = rotation_matrix @ point2 + translation
point3_transformed = rotation_matrix @ point3 + translation
point4_transformed = rotation_matrix @ point4 + translation
self.search_x_min = min(point1_transformed[0], point2_transformed[0], point3_transformed[0], point4_transformed[0])
self.search_x_max = max(point1_transformed[0], point2_transformed[0], point3_transformed[0], point4_transformed[0])
self.search_y_min = -max(point1_transformed[1], point2_transformed[1], point3_transformed[1], point4_transformed[1])
self.search_y_max = -min(point1_transformed[1], point2_transformed[1], point3_transformed[1], point4_transformed[1])
self.publish_finish_line_search_box()
rospy.loginfo('[FLD] Received Map alignment. Now searching for finish line in: '
f'{self.search_x_min} < x < {self.search_x_max} '
f'^ {self.search_y_min} < y < {self.search_y_max}')
self.alignment_subscriber.unregister()
[docs] def publish_finish_line_search_box(self):
"""Publish finish line search box as visualization marker to ROS."""
corners = [[self.search_x_min, self.search_y_min],
[self.search_x_min, self.search_y_max],
[self.search_x_max, self.search_y_max],
[self.search_x_max, self.search_y_min]]
marker = self.generate_finish_line_marker(points=corners)
marker.id = 1
marker.color.r = 46 / 255
marker.color.g = 134 / 255
marker.color.b = 193 / 255
self.finish_line_search_marker_publisher.publish(marker)
[docs] def generate_finish_line(self, cones: npt.NDArray):
"""
Generate finish line from cones in search box if possible.
There need to be atleas four big orange cones and the resulting rectangle has to has atleats a specific size.
Publishes enter and leave box representing finish line if finish has been detected.
Parameters
----------
cones : npt.NDArray
Big orange cones estimated by SLAM which are in the search box.
"""
if cones.shape[0] < 4:
return
center, (width, height), angle = cv2.minAreaRect(cones)
if (width < 2 or height < 0.2) and (width < 0.2 or height < 2):
rospy.loginfo_throttle_identical(0.2, f'[FLD] Width: {width}, Height: {height}, Angle: {angle}')
return
self.finish_line_enter = cv2.boxPoints((center, (width + self.enter_box_additional_width,
height + self.enter_box_additional_height), angle))
finish_line_enter_marker = self.generate_finish_line_marker(self.finish_line_enter)
finish_line_enter_marker.id = 2
self.finish_line_leave = cv2.boxPoints((center, (width + self.leave_box_additional_width,
height + self.leave_box_additional_height), angle))
finish_line_leave_marker = self.generate_finish_line_marker(self.finish_line_leave)
finish_line_leave_marker.id = 3
markers = MarkerArray()
finish_line_enter_marker.color.r = 34 / 255
finish_line_enter_marker.color.g = 153 / 255
finish_line_enter_marker.color.b = 84 / 255
finish_line_leave_marker.color.r = 176 / 255
finish_line_leave_marker.color.g = 58 / 255
finish_line_leave_marker.color.b = 46 / 255
markers.markers = [finish_line_enter_marker, finish_line_leave_marker]
self.finish_line_marker_publisher.publish(markers)
rospy.loginfo('[FLD] Finish line detected and published.')
[docs] def lap_count_vehicle_callback(self, vehicle_pose: VehiclePose):
"""
Receive vehicle pose and increase lap count if leave box has been left after enter box has been entered before.
Vehicle pose is estimated by SLAM and received via ROS.
Publish lap count if increased.
Publish that mission is completed when max laps has been reached.
Parameters
----------
vehicle_pose : VehiclePose
Received vehicle pose received via ROS and estimated by SLAM.
"""
pose = (vehicle_pose.x, vehicle_pose.y) # Ensure it's a tuple
if not self.inside:
if cv2.pointPolygonTest(self.finish_line_enter, pose, False) > 0:
rospy.loginfo('[FLD] Entered finish line area.')
self.inside = True
else:
if cv2.pointPolygonTest(self.finish_line_leave, pose, False) < 0:
# leave
self.inside = False
self.lap_count += 1
rospy.loginfo(f'[FLD] Left finish line area: Finished lap {self.lap_count}')
self.publish_lap_count()
if self.lap_count >= self.max_laps:
rospy.loginfo_once(f'[FLD] Finished {self.mission}')
msg = MissionFinished()
msg.header.stamp = rospy.Time.now()
msg.finished = True
self.mission_completed_publisher.publish(msg)
[docs] def trackdrive_and_autocross(self):
"""
Run finish line detection when driving trackdrive or autocross.
Lap count is set to -1, since car is staged before the start & finish line
and thus finish line is crossed while driving the first lap.
Callback for landmarks estimated by SLAM is set. As soon as the finish line is detected, callback for
vehicle pose is set so that finish line detection can count the driven laps.
"""
self.landmark_subscriber = rospy.Subscriber('/slam/landmarks', ConeListWithCovariance,
self.finish_line_landmark_callback)
while not rospy.is_shutdown():
if self.finish_line_enter is not None and self.finish_line_leave is not None:
break
rospy.sleep(0.01)
self.lap_count = -1
self.vehicle_subscriber = rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.lap_count_vehicle_callback)
[docs] def skidpad(self):
"""
Run finish line detection when driving skidpad.
Lap count is set to -1, since first lap is started when intersection has been crossed the first time.
Callback for landmarks estimated by SLAM is set. As soon as the finish line is detected, callback for
vehicle pose is set so that finish line detection can count the driven laps.
"""
self.landmark_subscriber = rospy.Subscriber('/slam/landmarks', ConeListWithCovariance,
self.finish_line_landmark_callback)
self.alignment_subscriber = rospy.Subscriber('/slam/map_alignment', MapAlignment,
self.map_alignment_callback)
while not rospy.is_shutdown():
if self.finish_line_enter is not None and self.finish_line_leave is not None:
break
rospy.sleep(0.01)
self.lap_count = -1
self.vehicle_subscriber = rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.lap_count_vehicle_callback)
[docs] def acceleration(self):
"""
Run finish line detection when driving acceleration.
Lap count is kept at 0, since first lap is finished when finish line is crossed.
Callback for landmarks estimated by SLAM is set. As soon as the finish line is detected, callback for
vehicle pose is set so that finish line detection can count the driven laps.
"""
self.landmark_subscriber = rospy.Subscriber('/slam/landmarks', ConeListWithCovariance,
self.finish_line_landmark_callback)
self.alignment_subscriber = rospy.Subscriber('/slam/map_alignment', MapAlignment,
self.map_alignment_callback)
while not rospy.is_shutdown():
if self.finish_line_enter is not None and self.finish_line_leave is not None:
break
rospy.sleep(0.01)
self.vehicle_subscriber = rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.lap_count_vehicle_callback)
[docs] def publish_lap_count(self):
"""Publish current lap count to ROS."""
msg = LapCount()
msg.header.stamp = rospy.Time.now()
msg.laps = max(self.lap_count, 0)
self.lap_count_publisher.publish(msg)
[docs] def run(self):
"""Run finish line detection by calling the respective function based on the current AS Mission."""
self.publish_lap_count()
if self.mission == AutonomousMissionEnum.TRACKDRIVE or self.mission == AutonomousMissionEnum.AUTOCROSS:
rospy.loginfo('[FLD] Driving Autocross or Trackdrive')
self.trackdrive_and_autocross()
if self.mission == AutonomousMissionEnum.SKIDPAD:
rospy.loginfo('[FLD] Driving Skidpad')
self.skidpad()
if self.mission == AutonomousMissionEnum.ACCELERATION:
rospy.loginfo('[FLD] Driving Acceleration')
self.acceleration()
if __name__ == '__main__':
rospy.init_node('finish_line_detection', anonymous=True)
try:
finish_line_detection = FinishLineDetection()
if finish_line_detection.mission != AutonomousMissionEnum.BRAKETEST:
finish_line_detection.run()
rospy.spin()
except Exception as e:
if not isinstance(e, rospy.exceptions.ROSInterruptException):
rospy.logfatal(traceback.format_exc())
time.sleep(2)
raise e