import signal
from enum import Enum
from typing import Tuple, List, Optional
from pathlib import Path
import os.path
import json
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import cv2
import av
import mimetypes
# Number of pixels that represent one meter.
# NOTE(review): not referenced in the visible part of this file - presumably
# consumed by the transform helpers that TrackCreator.run() calls; confirm.
PIXEL_PER_METER = 100
class GracefulKiller:
    """Remember that the process was asked to terminate.

    Creating an instance installs :meth:`exit_gracefully` as the handler for
    ``SIGINT`` and ``SIGTERM``. Interactive loops poll ``kill_now`` to find
    out when they should stop.
    """

    # Class-level default; an instance attribute of the same name shadows it
    # once a signal has been handled.
    kill_now = False

    def __init__(self):
        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, self.exit_gracefully)

    def exit_gracefully(self, *args):
        """Mark this instance as killed.

        Args:
            *args: Arguments supplied by the signal machinery; ignored.
        """
        self.kill_now = True
class ImageSelector():
    """Let the user pick one key frame of a video in an OpenCV window."""

    # Key code returned by ``cv2.waitKey`` -> number of key frames to step.
    # NOTE(review): presumably left/right arrow and page up/down on the GTK
    # backend (the window title calls them "arrow and picture keys") - confirm.
    _KEY_STEPS = {81: -1, 83: 1, 85: 5, 86: -5}

    def __init__(self, input_video_path: str, killer: 'GracefulKiller') -> None:
        """Store the video path and the kill switch.

        Args:
            input_video_path: Path of the video that is opened with PyAV.
            killer: Object whose ``kill_now`` flag aborts the selection loop.
        """
        self.input_video_path = input_video_path
        self.killer = killer

    def find_image_i(self, image_i: int, key: int):
        """Return the key-frame index that results from pressing ``key``.

        Args:
            image_i: Index of the currently displayed key frame.
            key: Key code as returned by ``cv2.waitKey``.

        Returns:
            The new index, never below zero. Unknown keys leave ``image_i``
            unchanged. No upper bound is applied here; the caller clamps to
            the number of frames that could actually be decoded.
        """
        step = self._KEY_STEPS.get(key)
        if step is None:
            return image_i
        return max(0, image_i + step)

    def select_image(self) -> npt.NDArray:
        """Show key frames until the user confirms one with Enter.

        Key frames are decoded lazily and cached, so stepping backwards does
        not decode again.

        Returns:
            The selected key frame as a BGR image array.

        Raises:
            IndexError: If no key frame was decoded (empty video or the kill
                flag was already set).
        """
        window = 'image_selector_window'
        cv2.namedWindow(window, cv2.WINDOW_NORMAL)
        cv2.setWindowTitle(window, 'Select the image to use for creating the plot. '
                                   'Use arrow and picture keys to choose a suitable image')
        key_images = []
        key_image_i = 0
        try:
            with av.open(self.input_video_path) as container:
                key_stream = container.streams.video[0]
                # Only decode key frames, all other frames are skipped by the codec.
                key_stream.codec_context.skip_frame = "NONKEY"
                key_frame_generator = container.decode(key_stream)
                while self.killer.kill_now is False:
                    while key_image_i >= len(key_images):
                        # A default avoids the StopIteration that used to
                        # escape when stepping past the last key frame.
                        frame = next(key_frame_generator, None)
                        if frame is None:
                            break
                        key_images.append(frame.to_ndarray(format='bgr24'))
                    if not key_images:
                        break
                    # Stay on the last frame when the video has no more key frames.
                    key_image_i = min(key_image_i, len(key_images) - 1)
                    cv2.imshow(window, key_images[key_image_i])
                    key = cv2.waitKey()
                    if key == ord('\r'):
                        break
                    key_image_i = self.find_image_i(image_i=key_image_i, key=key)
        finally:
            # Close the window even if decoding or display failed.
            cv2.destroyWindow(window)
        if not key_images:
            raise IndexError(f'no key frame could be decoded from {self.input_video_path!r}')
        # The index may point one step ahead if the loop was left via the kill flag.
        return key_images[min(key_image_i, len(key_images) - 1)]
class FeatureSelectorMode(Enum):
    """Kinds of features that a ``FeatureSelector`` can collect."""

    # Four points; selection finishes automatically after the fourth.
    BOUNDARIES = 1
    # Two points; selection finishes automatically after the second.
    VEHICLE_POSE = 2
    # Any number of cones; selection is ended with Enter.
    CONES = 3
    # Five support points; selection finishes automatically after the fifth.
    SKIDPAD_CENTERPOINTS = 4
    # Two support points; selection finishes automatically after the second.
    ACCELERATION_CENTERPOINTS = 5
class FeatureSelector():
    """Collect image features with the mouse in an OpenCV window.

    What a feature is depends on ``mode``:

    * ``BOUNDARIES``, ``VEHICLE_POSE``, ``SKIDPAD_CENTERPOINTS`` and
      ``ACCELERATION_CENTERPOINTS`` store ``(x, y)`` pixel positions.
    * ``CONES`` stores ``(x, y, cone_id, fake_cone_id, use_for_registration)``.

    A double click with the left button adds a feature. Pressing and
    releasing the middle button spans a rectangle; every feature strictly
    inside it is deleted.
    """

    # BGR drawing colors and display names, both indexed by cone id.
    _cone_colors = [(0, 255, 255), (255, 0, 0), (0, 165, 255), (0, 0, 255)]
    _cone_color_representations = ['yellow', 'blue', 'small orange', 'big orange']
    # Keys that select a cone color (real and fake color are set together).
    _cone_key_ids = {ord('y'): 0, ord('b'): 1, ord('o'): 2, ord('f'): 3}

    def __init__(self, image: npt.NDArray, mode: 'FeatureSelectorMode', killer: 'GracefulKiller') -> None:
        """Prepare the selector for one image.

        Args:
            image: Image on which features are selected; it is copied.
            mode: Kind of feature to collect.
            killer: Object whose ``kill_now`` flag aborts the event loop.
        """
        self.mode = mode
        self.original_image = image.copy()
        self.plot_order()
        self.image = self.original_image.copy()
        self.features = []
        self.finished = False
        self.current_cone_id = 0
        self.current_fake_cone_id = 0
        self.use_for_registration = True
        # Start corner of a pending rectangle deletion. Must exist before the
        # first mouse event, otherwise a middle-button-up without a preceding
        # middle-button-down raises AttributeError in the callback.
        self.last_position = None
        self.killer = killer

    def plot_order(self):
        """Draw arrows along three image edges onto the base image (BOUNDARIES mode only).

        The window title in BOUNDARIES mode refers to these arrows as the
        "visualized order".
        """
        if self.mode == FeatureSelectorMode.BOUNDARIES:
            rows, cols = self.original_image.shape[:2]
            cv2.arrowedLine(self.original_image, (20, 20), (cols - 50, 20), color=(130, 130, 0),
                            thickness=5, tipLength=0.01)
            cv2.arrowedLine(self.original_image, (cols - 20, 20), (cols - 20, rows - 50),
                            color=(130, 130, 0), thickness=5, tipLength=0.01)
            cv2.arrowedLine(self.original_image, (cols - 20, rows - 20), (50, rows - 20),
                            color=(130, 130, 0), thickness=5, tipLength=0.01)

    def plot_features(self):
        """Redraw ``self.image`` from the base image and the current features."""
        self.image = self.original_image.copy()
        for feature in self.features:
            if self.mode == FeatureSelectorMode.BOUNDARIES:
                cv2.drawMarker(self.image, feature, (0, 255, 0), markerType=cv2.MARKER_STAR,
                               markerSize=40, thickness=2, line_type=cv2.LINE_AA)
            elif self.mode == FeatureSelectorMode.VEHICLE_POSE:
                cv2.circle(self.image, feature, radius=3, color=(0, 0, 255), thickness=-1)
            elif self.mode == FeatureSelectorMode.CONES:
                # Upright cross: used for registration, tilted cross: not used.
                if feature[4] is True:
                    marker_type = cv2.MARKER_CROSS
                else:
                    marker_type = cv2.MARKER_TILTED_CROSS
                cv2.drawMarker(self.image, feature[:2], self._cone_colors[feature[2]], markerType=marker_type,
                               markerSize=20, thickness=2, line_type=cv2.LINE_AA)
                # The surrounding ring shows the fake color.
                cv2.circle(img=self.image, center=feature[:2], color=self._cone_colors[feature[3]],
                           radius=10, thickness=2, lineType=cv2.LINE_AA)
        if self.mode == FeatureSelectorMode.SKIDPAD_CENTERPOINTS:
            if len(self.features) == 1:
                cv2.circle(self.image, self.features[0], radius=5, color=(0, 0, 255), thickness=-1)
            if len(self.features) >= 2:
                cv2.arrowedLine(self.image, self.features[0], self.features[1], color=(0, 0, 255), thickness=3)
            if len(self.features) >= 3:
                # Circle around the third point that passes through the second point.
                cv2.circle(self.image, self.features[2], color=(0, 0, 255), thickness=3,
                           radius=int(np.linalg.norm(np.array(self.features[2]) - np.array(self.features[1]))))
            if len(self.features) >= 4:
                cv2.circle(self.image, self.features[3], color=(0, 0, 255), thickness=3,
                           radius=int(np.linalg.norm(np.array(self.features[3]) - np.array(self.features[1]))))
            if len(self.features) >= 5:
                cv2.arrowedLine(self.image, self.features[1], self.features[4], color=(0, 0, 255), thickness=3)
        if self.mode == FeatureSelectorMode.ACCELERATION_CENTERPOINTS:
            if len(self.features) == 1:
                cv2.circle(self.image, self.features[0], radius=5, color=(0, 0, 255), thickness=-1)
            if len(self.features) >= 2:
                cv2.arrowedLine(self.image, self.features[0], self.features[1], color=(0, 0, 255), thickness=3)

    def delete_features_in_rectangle(self, position1: Tuple[int, int], position2: Tuple[int, int]):
        """Remove every feature that lies strictly inside the given rectangle.

        Args:
            position1: One corner of the rectangle as ``(x, y)``.
            position2: The opposite corner as ``(x, y)``.
        """
        if self.features:
            features = np.array(self.features)[:, :2]
            y_min = min(position1[1], position2[1])
            x_min = min(position1[0], position2[0])
            y_max = max(position1[1], position2[1])
            x_max = max(position1[0], position2[0])
            inside = np.all(np.c_[features > (x_min, y_min), features < (x_max, y_max)], axis=1)
            self.features = [feature for is_inside, feature in zip(inside, self.features) if not is_inside]

    def add_feature(self, event, x, y, flags, param):
        """Mouse callback registered via ``cv2.setMouseCallback``.

        Args:
            event: OpenCV mouse event code.
            x: Horizontal pixel position of the event.
            y: Vertical pixel position of the event.
            flags: Unused.
            param: Unused.
        """
        if self.finished is False:
            if event == cv2.EVENT_MBUTTONDOWN:
                self.last_position = (x, y)
            elif event == cv2.EVENT_MBUTTONUP and self.last_position is not None:
                last_position = self.last_position
                self.last_position = None
                self.delete_features_in_rectangle(last_position, (x, y))
                self.plot_features()
            if event == cv2.EVENT_LBUTTONDBLCLK:
                if self.mode == FeatureSelectorMode.BOUNDARIES:
                    cv2.drawMarker(self.image, (x, y), (0, 255, 0), markerType=cv2.MARKER_STAR,
                                   markerSize=40, thickness=2, line_type=cv2.LINE_AA)
                    self.features.append((x, y))
                    if len(self.features) == 4:
                        self.finished = True
                elif self.mode == FeatureSelectorMode.VEHICLE_POSE:
                    cv2.circle(self.image, (x, y), radius=3, color=(0, 0, 255), thickness=-1)
                    self.features.append((x, y))
                    if len(self.features) == 2:
                        cv2.arrowedLine(self.image, self.features[0], self.features[1], color=(0, 0, 255),
                                        thickness=2)
                        self.finished = True
                elif self.mode == FeatureSelectorMode.CONES:
                    self.features.append((x, y, self.current_cone_id, self.current_fake_cone_id,
                                          self.use_for_registration))
                    self.plot_features()
                elif self.mode == FeatureSelectorMode.SKIDPAD_CENTERPOINTS:
                    self.features.append((x, y))
                    self.plot_features()
                    if len(self.features) == 5:
                        self.finished = True
                elif self.mode == FeatureSelectorMode.ACCELERATION_CENTERPOINTS:
                    self.features.append((x, y))
                    self.plot_features()
                    if len(self.features) == 2:
                        self.finished = True

    def recover_features(self, features: List[Tuple[int, int, int, int, int]]):
        """Preload features from an earlier session and draw them.

        Only the CONES and the two centerpoint modes are handled; other modes
        ignore the call. Entries are stored as tuples so that they have the
        same form as features added by mouse click.

        Args:
            features: Previously stored features. For CONES the fifth value is
                converted to a bool (``value == 1``).
        """
        if self.mode == FeatureSelectorMode.CONES:
            self.features = [(*feature[:4], feature[4] == 1) for feature in features]
            self.plot_features()
        elif self.mode in [FeatureSelectorMode.SKIDPAD_CENTERPOINTS, FeatureSelectorMode.ACCELERATION_CENTERPOINTS]:
            self.features = [tuple(feature) for feature in features]
            self.plot_features()

    def _remove_trailing_cones(self) -> int:
        """Drop the most recently added cones that have the current cone color.

        Removal stops at the first cone (from the end) with a different color.

        Returns:
            Number of removed cones.
        """
        removed = 0
        while self.features and self.features[-1][2] == self.current_cone_id:
            self.features.pop()
            removed += 1
        return removed

    def run(self):
        """Open the window and process input until the selection is complete.

        In CONES mode the loop ends with Enter. The keys ``y``/``b``/``o``/``f``
        choose the cone color, ``p``/``x`` override only the fake color, ``r``
        toggles the registration flag and backspace removes the trailing cones
        of the current color. In all other modes backspace removes the last
        feature, the selection finishes automatically, and the result stays
        on screen until Enter is pressed.
        """
        window = 'feature_selector_window'
        cv2.namedWindow(window, cv2.WINDOW_NORMAL)
        try:
            cv2.setMouseCallback(window, self.add_feature)
            if self.mode == FeatureSelectorMode.BOUNDARIES:
                cv2.setWindowTitle(window,
                                   'Select the track boundaries according to the visualized order')
            elif self.mode == FeatureSelectorMode.VEHICLE_POSE:
                cv2.setWindowTitle(window,
                                   'Select initial position of the vehicle: Centerpoint of rear axle '
                                   'and some centerpoint in the front')
            elif self.mode == FeatureSelectorMode.SKIDPAD_CENTERPOINTS:
                cv2.setWindowTitle(window,
                                   'Select keypoints for skidpad centerpoints: First point, centerpoint of finish line, '
                                   'circle center of right and left cirle and last point')
            elif self.mode == FeatureSelectorMode.ACCELERATION_CENTERPOINTS:
                cv2.setWindowTitle(window,
                                   'Select keypoints for acceleration centerpoints: First and last point')

            while not self.killer.kill_now and not self.finished:
                cv2.imshow(window, self.image)
                key = cv2.waitKey(50)
                if self.mode != FeatureSelectorMode.CONES:
                    if key == ord('\b') and self.features:
                        del self.features[-1]
                        self.plot_features()
                    continue
                if key == ord('\r'):
                    break
                if key in self._cone_key_ids:
                    self.current_cone_id = self._cone_key_ids[key]
                    self.current_fake_cone_id = self._cone_key_ids[key]
                elif key == ord('p'):
                    self.current_fake_cone_id = 1
                elif key == ord('x'):
                    self.current_fake_cone_id = 0
                elif key == ord('r'):
                    self.use_for_registration = not self.use_for_registration
                elif key == ord('\b'):
                    # Previously ``del feature`` only unbound the loop variable,
                    # so nothing was ever removed from the list.
                    if self._remove_trailing_cones():
                        self.plot_features()
                cv2.setWindowTitle(window,
                                   f'color: {self._cone_color_representations[self.current_cone_id]}, '
                                   f'fake color: {self._cone_color_representations[self.current_fake_cone_id]}, '
                                   f'use for registration: {self.use_for_registration}')

            # Keep the finished selection visible until it is confirmed with
            # Enter. CONES mode is already confirmed with Enter above.
            if self.mode != FeatureSelectorMode.CONES:
                while not self.killer.kill_now:
                    cv2.imshow(window, self.image)
                    if cv2.waitKey(3000) == ord('\r'):
                        break
        finally:
            cv2.destroyWindow(window)
class TrackCreator():
    """Create a ground-truth cone map from a picture or video of a track.

    The user interactively selects track boundaries, the vehicle pose, the
    cones and optionally support points for centerpoints. Selected pixel
    positions are converted with ``self.world_to_map_transform`` and written
    to JSON. Intermediate selections are stored as ``.npy`` files in a
    directory named after the input file (path without extension) so that a
    later run can recover them.
    """

    # Matplotlib color names indexed by cone id.
    _cone_colors = ['gold', 'blue', 'orange', 'orange']

    def __init__(self, input_file_path: str, killer: 'GracefulKiller', plot: bool, test_day: str, track_layout: str,
                 track_height: float, track_width: float, improve_world_cones: bool, centerpoints_width: float,
                 centerpoints: Optional[Tuple[str]], recover_centerpoints: bool, improve_centerpoints: bool,
                 recover_map_origin: bool, recover_world_cones: bool, recover_track_boundaries: bool,
                 manual_track: bool) -> None:
        """Store the configuration; no I/O happens here.

        Args:
            input_file_path: Image or video file of the track.
            killer: Object whose ``kill_now`` flag aborts interactive loops.
            plot: Show a matplotlib plot of the result when true.
            test_day: With ``track_layout`` selects an additional export directory.
            track_layout: See ``test_day``.
            track_height: Stored as ``self.track_height``; not used in the visible code.
            track_width: Stored as ``self.track_width``; not used in the visible code.
            improve_world_cones: Re-open the cone selector preloaded with recovered cones.
            centerpoints_width: Total track width exported with the centerpoints.
            centerpoints: Mission for which centerpoints are created; compared
                against ``'skidpad'`` and ``'acceleration'``.
            recover_centerpoints: Load stored support centerpoints if present.
            improve_centerpoints: Re-open the selector preloaded with recovered support points.
            recover_map_origin: Load the stored map origin if present.
            recover_world_cones: Load stored cones if present.
            recover_track_boundaries: Load stored boundaries if present.
            manual_track: Additionally write ``manual.json`` next to ``ground_truth.json``.
        """
        self.input_file_path = input_file_path
        # Input path without extension; used as directory for all outputs.
        self.base_path = f'{self.input_file_path.rsplit(".", maxsplit=1)[0]}'
        self.track_height = track_height
        self.track_width = track_width
        self.plot = plot
        self.image: npt.NDArray
        self.map_origin: List[Tuple[int, int]]
        self.map_boundaries: List[Tuple[int, int]]
        self.cones: List[Tuple[int, int, int]]
        self.improve_world_cones = improve_world_cones
        self.recover_map_origin = recover_map_origin
        self.recover_world_cones = recover_world_cones
        self.recover_track_boundaries = recover_track_boundaries
        self.recover_centerpoints = recover_centerpoints
        self.improve_centerpoints = improve_centerpoints
        self.centerpoints_missions = centerpoints
        self.centerpoints_width = centerpoints_width
        self.test_day = test_day
        self.track_layout = track_layout
        self.manual_track = manual_track
        self.killer = killer

    def read_in_image(self):
        """Load ``self.image`` from the input file.

        Videos open an ``ImageSelector``; images are read with ``cv2.imread``.

        Raises:
            ValueError: If the file is neither a video nor an image according
                to its guessed MIME type, or if the image cannot be read.
        """
        # guess_type returns None for unknown extensions.
        mime_type = mimetypes.guess_type(self.input_file_path)[0] or ''
        if mime_type.startswith('video'):
            image_selector = ImageSelector(input_video_path=self.input_file_path,
                                           killer=self.killer)
            self.image = image_selector.select_image()
        elif mime_type.startswith('image'):
            image = cv2.imread(self.input_file_path)
            if image is None:
                raise ValueError(f'could not read image {self.input_file_path!r}')
            self.image = image
        else:
            raise ValueError(f'{self.input_file_path!r} is neither a video nor an image '
                             f'(guessed type: {mime_type or None})')

    def select_track_boundaries(self):
        """Set ``self.map_boundaries`` from the recovery file or interactively."""
        recovery_file = self.base_path + '/boundaries.npy'
        if self.recover_track_boundaries is True and os.path.isfile(recovery_file):
            self.map_boundaries = np.load(recovery_file)
            return
        boundaries_selector = FeatureSelector(image=self.image, mode=FeatureSelectorMode.BOUNDARIES,
                                              killer=self.killer)
        boundaries_selector.run()
        self.map_boundaries = np.array(boundaries_selector.features)

    def select_map_origin(self):
        """Set ``self.map_origin`` from the recovery file or interactively."""
        recovery_file = self.base_path + '/origin.npy'
        if self.recover_map_origin is True and os.path.isfile(recovery_file):
            self.map_origin = np.load(recovery_file)
            return
        map_origin_selector = FeatureSelector(image=self.image, mode=FeatureSelectorMode.VEHICLE_POSE,
                                              killer=self.killer)
        map_origin_selector.run()
        self.map_origin = np.array(map_origin_selector.features)

    def select_cones(self):
        """Set ``self.world_cones`` from the recovery file and/or interactively.

        Recovered cones are used as they are unless ``improve_world_cones`` is
        set; in that case the selector is preloaded with them.
        """
        recovery_file = self.base_path + '/world_cones.npy'
        if self.recover_world_cones is True and os.path.isfile(recovery_file):
            self.world_cones = np.load(recovery_file)
            if self.improve_world_cones is False:
                return
        cone_selector = FeatureSelector(image=self.image, mode=FeatureSelectorMode.CONES,
                                        killer=self.killer)
        if self.improve_world_cones is True and hasattr(self, 'world_cones'):
            cone_selector.recover_features(self.world_cones.tolist())
        cone_selector.run()
        self.world_cones = np.array(cone_selector.features)

    def select_support_centerpoints(self):
        """Set ``self.support_centerpoints`` from the recovery file and/or interactively.

        Raises:
            ValueError: If a selector is needed but the configured mission is
                neither ``'skidpad'`` nor ``'acceleration'``.
        """
        recovery_file = self.base_path + '/support_centerpoints.npy'
        if self.recover_centerpoints is True and os.path.isfile(recovery_file):
            self.support_centerpoints = np.load(recovery_file)
            if self.improve_centerpoints is False:
                return
        if self.centerpoints_missions == 'skidpad':
            mode = FeatureSelectorMode.SKIDPAD_CENTERPOINTS
        elif self.centerpoints_missions == 'acceleration':
            mode = FeatureSelectorMode.ACCELERATION_CENTERPOINTS
        else:
            # Without this branch ``mode`` would be unbound below.
            raise ValueError(f'unsupported centerpoints mission: {self.centerpoints_missions!r}')
        centerpoints_selector = FeatureSelector(image=self.image, mode=mode, killer=self.killer)
        if self.improve_centerpoints is True and hasattr(self, 'support_centerpoints'):
            centerpoints_selector.recover_features(self.support_centerpoints.tolist())
        centerpoints_selector.run()
        self.support_centerpoints = np.array(centerpoints_selector.features)

    def calculate_map_coordinates_of_cones(self):
        """Transform the cone pixel positions with ``self.world_to_map_transform`` into ``self.map_cones``."""
        self.map_cones = cv2.perspectiveTransform(src=self.world_cones[np.newaxis, :, :2].astype(np.float32),
                                                  m=self.world_to_map_transform)[0]

    def plot_results(self):
        """Show the mapped cones and, if present, the centerpoints with matplotlib."""
        palette = np.asarray(self._cone_colors)
        cone_ids = self.world_cones[:, 2].astype(int)
        fake_cone_ids = self.world_cones[:, 3].astype(int)
        use_for_registration = self.world_cones[:, 4].astype(bool)
        not_use_for_registration = np.logical_not(use_for_registration)
        plt.scatter(self.map_cones[:, 0], self.map_cones[:, 1], marker='o', label='Fake Cone Color',
                    facecolors='none', edgecolors=palette[fake_cone_ids])
        plt.scatter(self.map_cones[use_for_registration, 0], self.map_cones[use_for_registration, 1],
                    marker='+', label='Cone Color, use for registration',
                    color=palette[cone_ids[use_for_registration]])
        plt.scatter(self.map_cones[not_use_for_registration, 0], self.map_cones[not_use_for_registration, 1],
                    marker='x', label='Cone color, dont use for registration',
                    color=palette[cone_ids[not_use_for_registration]])
        if hasattr(self, 'centerpoints') is True:
            plt.plot(self.centerpoints[:, 0], self.centerpoints[:, 1])
        plt.axis('equal')
        plt.legend()
        plt.show()

    def _write_map_files(self, directory: str, output_dict: dict):
        """Write ``ground_truth.json`` (and ``manual.json`` for manual tracks) into ``directory``."""
        Path(directory).mkdir(parents=True, exist_ok=True)
        with open(f'{directory}/ground_truth.json', 'w') as output:
            json.dump(output_dict, output, indent=4)
        if self.manual_track is True:
            with open(f'{directory}/manual.json', 'w') as output:
                json.dump(output_dict, output, indent=4)

    def export_map(self):
        """Write the map as JSON.

        The files always go to ``self.base_path``. If both ``test_day`` and
        ``track_layout`` are set they are additionally written to the
        ``slam/plots/maps`` directory addressed relative to this module.
        """
        output_dict = {}
        output_dict['x'] = self.map_cones[:, 0].tolist()
        output_dict['y'] = self.map_cones[:, 1].tolist()
        output_dict['id'] = self.world_cones[:, 2].astype(int).tolist()
        output_dict['faked_id'] = self.world_cones[:, 3].astype(int).tolist()
        output_dict['gps'] = None
        output_dict['registration'] = self.world_cones[:, 4].astype(bool).tolist()
        if hasattr(self, 'centerpoints') is True:
            half_width = np.full(self.centerpoints.shape[0], self.centerpoints_width/2).tolist()
            output_dict['centerpoints'] = {
                'x': self.centerpoints[:, 0].tolist(),
                'y': self.centerpoints[:, 1].tolist(),
                'left_track_width': half_width,
                'right_track_width': list(half_width),
                'closed_track': False
            }
        # The manual export used to read a local ``base_path`` that was only
        # assigned further down, which raised UnboundLocalError.
        self._write_map_files(self.base_path, output_dict)
        if self.test_day is not None and self.track_layout is not None:
            base_path = f'{os.path.abspath(os.path.dirname(__file__))}/../../../slam/plots/maps/{self.test_day}/{self.track_layout}'
            self._write_map_files(base_path, output_dict)

    def save_recovery(self):
        """Store all selections as ``.npy`` files in ``self.base_path``."""
        Path(self.base_path).mkdir(parents=True, exist_ok=True)
        np.save(self.base_path + '/boundaries.npy', self.map_boundaries)
        np.save(self.base_path + '/origin.npy', self.map_origin)
        np.save(self.base_path + '/world_cones.npy', self.world_cones)
        if self.centerpoints_missions:
            np.save(self.base_path + '/support_centerpoints.npy', self.support_centerpoints)

    @staticmethod
    def generate_and_move_base_circle(radius: float, points_n: int, angle: float, inverse: bool,
                                      offset: npt.NDArray) -> npt.NDArray:
        """Sample a circle, rotate it and move it to ``offset``.

        Args:
            radius: Circle radius.
            points_n: Number of evenly spaced samples.
            angle: The samples are multiplied as row vectors with the rotation
                matrix of this angle, so the first sample ends up at
                ``(radius * cos(angle), -radius * sin(angle))`` before the offset.
            inverse: Reverse the order of the samples.
            offset: Circle center that is added to every sample.

        Returns:
            Array of shape ``(points_n, 2)``.
        """
        sample_angles = np.arange(points_n) * (2 * np.pi / points_n)
        points = np.c_[radius * np.cos(sample_angles), radius * np.sin(sample_angles)]
        rotation_matrix = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]])
        rotated_points = np.dot(points, rotation_matrix)
        if inverse:
            rotated_points = rotated_points[::-1]
        return rotated_points + offset

    @staticmethod
    def generate_circle_from_two_points(centerpoint: npt.NDArray, circlepoint: npt.NDArray,
                                        inverse: bool) -> npt.NDArray:
        """Sample the circle around ``centerpoint`` that passes through ``circlepoint``.

        Args:
            centerpoint: Circle center as ``(x, y)``.
            circlepoint: Point on the circle; it is the first sample, or the
                last one if ``inverse`` is set.
            inverse: Reverse the order of the samples.

        Returns:
            Array of shape ``(80, 2)``.
        """
        distance = circlepoint - centerpoint
        radius = np.linalg.norm(distance)
        # arctan2 keeps the sign of the y offset. The former arccos(dx / r)
        # could not, so for dy > 0 the circle started at the mirrored point.
        # The y component is negated because the base circle is rotated by
        # right-multiplication, i.e. by -angle.
        angle = np.arctan2(-distance[1], distance[0])
        return TrackCreator.generate_and_move_base_circle(radius=radius, points_n=80, angle=angle,
                                                          inverse=inverse, offset=centerpoint)

    def calculate_skidpad_world_centerpoints(self):
        """Build the skidpad path from the five support points.

        Returns:
            Concatenation of the straight from support point 0 to 1, the
            circle around support point 2 twice, the reversed circle around
            support point 3 twice and the straight from support point 1 to 4.
        """
        left_circle = self.generate_circle_from_two_points(centerpoint=self.support_centerpoints[3],
                                                           circlepoint=self.support_centerpoints[1],
                                                           inverse=True)
        right_circle = self.generate_circle_from_two_points(centerpoint=self.support_centerpoints[2],
                                                            circlepoint=self.support_centerpoints[1],
                                                            inverse=False)
        acceleration_part = np.linspace(self.support_centerpoints[0], self.support_centerpoints[1], 80)
        deceleration_part = np.linspace(self.support_centerpoints[1], self.support_centerpoints[4], 80)
        return np.r_[acceleration_part, right_circle, right_circle,
                     left_circle, left_circle, deceleration_part]

    def calculate_acceleration_world_centerpoints(self):
        """Return 100 evenly spaced points between the two support points."""
        return np.linspace(self.support_centerpoints[0], self.support_centerpoints[1], 100)

    def calculate_centerpoints(self):
        """Create the centerpoints for the configured mission and store them in ``self.centerpoints``.

        Raises:
            ValueError: If the mission is neither ``'skidpad'`` nor ``'acceleration'``.
        """
        if self.centerpoints_missions == 'skidpad':
            world_centerpoints = self.calculate_skidpad_world_centerpoints()
        elif self.centerpoints_missions == 'acceleration':
            world_centerpoints = self.calculate_acceleration_world_centerpoints()
        else:
            # An explicit error instead of an assert, which is stripped under -O.
            raise ValueError(f'unsupported centerpoints mission: {self.centerpoints_missions!r}')
        self.centerpoints = cv2.perspectiveTransform(src=world_centerpoints[np.newaxis, :, :2].astype(np.float32),
                                                     m=self.world_to_map_transform)[0]

    def run(self):
        """Execute the whole pipeline: select, transform, optionally plot, export and save.

        NOTE(review): ``calculate_perspective_transform`` and
        ``calculate_world_to_map_transform`` are called here but are not
        defined in the visible part of this class - confirm they exist.
        """
        self.read_in_image()
        self.select_track_boundaries()
        self.select_map_origin()
        self.calculate_perspective_transform()
        self.calculate_world_to_map_transform()
        self.select_cones()
        self.calculate_map_coordinates_of_cones()
        if self.centerpoints_missions:
            self.select_support_centerpoints()
            self.calculate_centerpoints()
        if self.plot is True:
            self.plot_results()
        self.export_map()
        self.save_recovery()
# Script entry point.
# NOTE(review): init() is not defined in the visible part of this file -
# confirm that it is provided elsewhere before running this module directly.
if __name__ == '__main__':
    init()