# Source code for cli.track_creator

import signal
from enum import Enum
from typing import Tuple, List, Optional
from pathlib import Path
import os.path

import json

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import cv2

import av
import mimetypes


# Resolution of the generated map plane: image pixels per world meter,
# used by the perspective / world-to-map transforms below.
PIXEL_PER_METER = 100


class GracefulKiller:
    """Tracks SIGINT/SIGTERM so long-running UI loops can stop cleanly."""

    # Flag polled by the interactive loops; flipped by the signal handler.
    kill_now = False

    def __init__(self):
        # Route both interrupt and terminate signals to the same handler.
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, *args):
        """Signal handler: request that the program wind down."""
        self.kill_now = True
class ImageSelector():
    """Interactively pick one keyframe from a video in an OpenCV window."""

    def __init__(self, input_video_path: str, killer: 'GracefulKiller') -> None:
        self.input_video_path = input_video_path
        self.killer = killer

    def find_image_i(self, image_i: int, key: int) -> int:
        """Map a pressed key code to a new keyframe index (clamped at 0).

        Key codes are the X11/OpenCV codes: 81 (left arrow) steps back one
        frame, 83 (right arrow) forward one, 85 (page up) forward five,
        86 (page down) back five; any other key leaves the index unchanged.
        """
        # BUG FIX: the parameter was annotated ``str`` although cv2.waitKey
        # returns an ``int`` key code.
        deltas = {81: -1, 83: +1, 85: +5, 86: -5}
        return max(0, image_i + deltas.get(key, 0))

    def select_image(self) -> npt.NDArray:
        """Decode keyframes lazily and return the one confirmed with Enter."""
        cv2.namedWindow('image_selector_window', cv2.WINDOW_NORMAL)
        cv2.setWindowTitle('image_selector_window',
                           'Select the image to use for creating the plot. '
                           'Use arrow and picture keys to choose a suitable image')
        with av.open(self.input_video_path) as container:
            key_stream = container.streams.video[0]
            # Decoding only keyframes keeps stepping through the video fast.
            key_stream.codec_context.skip_frame = "NONKEY"
            key_frame_generator = container.decode(key_stream)
            key_images = []
            key_image_i = 0
            while self.killer.kill_now is False:
                # Decode lazily until the requested frame is cached.
                while key_image_i + 1 > len(key_images):
                    try:
                        frame = next(key_frame_generator)
                    except StopIteration:
                        # BUG FIX: stepping past the last keyframe used to
                        # raise StopIteration; clamp to the last decoded frame.
                        key_image_i = len(key_images) - 1
                        break
                    key_images.append(frame.to_ndarray(format='bgr24'))
                cv2.imshow('image_selector_window', key_images[key_image_i])
                key = cv2.waitKey()
                if key == ord('\r'):
                    break
                # BUG FIX: cv2.waitKey returns -1 when no key was pressed,
                # never None, so the original ``key is not None`` was always true.
                if key != -1:
                    key_image_i = self.find_image_i(image_i=key_image_i, key=key)
        cv2.destroyWindow('image_selector_window')
        return key_images[key_image_i]
class FeatureSelectorMode(Enum):
    """Which kind of track feature the FeatureSelector window collects."""

    BOUNDARIES = 1
    VEHICLE_POSE = 2
    CONES = 3
    SKIDPAD_CENTERPOINTS = 4
    ACCELERATION_CENTERPOINTS = 5
class FeatureSelector():
    """Collect image features (boundaries, pose, cones, centerpoints) by mouse.

    Opens an OpenCV window, registers a mouse callback, and lets the user
    double-click features depending on ``mode``.  A middle-button drag deletes
    every feature inside the spanned rectangle.
    """

    # BGR drawing colors per cone class: yellow, blue, small orange, big orange.
    _cone_colors = [(0, 255, 255), (255, 0, 0), (0, 165, 255), (0, 0, 255)]
    _cone_color_representations = ['yellow', 'blue', 'small orange', 'big orange']

    def __init__(self, image: npt.NDArray, mode: 'FeatureSelectorMode',
                 killer: 'GracefulKiller') -> None:
        self.mode = mode
        self.original_image = image.copy()
        self.plot_order()
        self.image = self.original_image.copy()
        self.features = []
        self.finished = False
        self.current_cone_id = 0
        self.current_fake_cone_id = 0
        self.use_for_registration = True
        # BUG FIX: initialize so a middle-button release without a preceding
        # press cannot raise AttributeError in add_feature().
        self.last_position = None
        self.killer = killer

    def plot_order(self):
        """Draw arrows on the boundary image hinting at the selection order."""
        if self.mode == FeatureSelectorMode.BOUNDARIES:
            rows, cols = self.original_image.shape[:2]
            cv2.arrowedLine(self.original_image, (20, 20), (cols - 50, 20),
                            color=(130, 130, 0), thickness=5, tipLength=0.01)
            cv2.arrowedLine(self.original_image, (cols - 20, 20), (cols - 20, rows - 50),
                            color=(130, 130, 0), thickness=5, tipLength=0.01)
            cv2.arrowedLine(self.original_image, (cols - 20, rows - 20), (50, rows - 20),
                            color=(130, 130, 0), thickness=5, tipLength=0.01)

    def plot_features(self):
        """Redraw all collected features onto a fresh copy of the base image."""
        self.image = self.original_image.copy()
        for feature in self.features:
            if self.mode == FeatureSelectorMode.BOUNDARIES:
                cv2.drawMarker(self.image, feature, (0, 255, 0), markerType=cv2.MARKER_STAR,
                               markerSize=40, thickness=2, line_type=cv2.LINE_AA)
            elif self.mode == FeatureSelectorMode.VEHICLE_POSE:
                cv2.circle(self.image, feature, radius=3, color=(0, 0, 255), thickness=-1)
            elif self.mode == FeatureSelectorMode.CONES:
                # feature = (x, y, cone_id, fake_cone_id, use_for_registration)
                if feature[4] is True:
                    cv2.drawMarker(self.image, feature[:2], self._cone_colors[feature[2]],
                                   markerType=cv2.MARKER_CROSS, markerSize=20, thickness=2,
                                   line_type=cv2.LINE_AA)
                else:
                    cv2.drawMarker(self.image, feature[:2], self._cone_colors[feature[2]],
                                   markerType=cv2.MARKER_TILTED_CROSS, markerSize=20, thickness=2,
                                   line_type=cv2.LINE_AA)
                # Outer ring shows the "fake" color used to confuse detection.
                cv2.circle(img=self.image, center=feature[:2],
                           color=self._cone_colors[feature[3]],
                           radius=10, thickness=2, lineType=cv2.LINE_AA)
        if self.mode == FeatureSelectorMode.SKIDPAD_CENTERPOINTS:
            if len(self.features) == 1:
                cv2.circle(self.image, self.features[0], radius=5, color=(0, 0, 255), thickness=-1)
            if len(self.features) >= 2:
                cv2.arrowedLine(self.image, self.features[0], self.features[1],
                                color=(0, 0, 255), thickness=3)
            if len(self.features) >= 3:
                cv2.circle(self.image, self.features[2], color=(0, 0, 255), thickness=3,
                           radius=int(np.linalg.norm(np.array(self.features[2])
                                                     - np.array(self.features[1]))))
            if len(self.features) >= 4:
                cv2.circle(self.image, self.features[3], color=(0, 0, 255), thickness=3,
                           radius=int(np.linalg.norm(np.array(self.features[3])
                                                     - np.array(self.features[1]))))
            if len(self.features) >= 5:
                cv2.arrowedLine(self.image, self.features[1], self.features[4],
                                color=(0, 0, 255), thickness=3)
        if self.mode == FeatureSelectorMode.ACCELERATION_CENTERPOINTS:
            if len(self.features) == 1:
                cv2.circle(self.image, self.features[0], radius=5, color=(0, 0, 255), thickness=-1)
            if len(self.features) >= 2:
                cv2.arrowedLine(self.image, self.features[0], self.features[1],
                                color=(0, 0, 255), thickness=3)

    def delete_features_in_rectangle(self, position1: Tuple[int, int],
                                     position2: Tuple[int, int]):
        """Remove every feature strictly inside the rectangle spanned by the two corners."""
        if self.features:
            features = np.array(self.features)[:, :2]
            y_min = min(position1[1], position2[1])
            x_min = min(position1[0], position2[0])
            y_max = max(position1[1], position2[1])
            x_max = max(position1[0], position2[0])
            keep = np.logical_not(np.all(np.c_[features > (x_min, y_min),
                                               features < (x_max, y_max)], axis=1))
            self.features = [feature for keep_feature, feature in zip(keep, self.features)
                             if keep_feature]

    def add_feature(self, event, x, y, flags, param):
        """OpenCV mouse callback: add features, or delete via middle-button drag."""
        if self.finished is False:
            if event == cv2.EVENT_MBUTTONDOWN:
                self.last_position = (x, y)
            elif event == cv2.EVENT_MBUTTONUP and self.last_position is not None:
                last_position = self.last_position
                self.last_position = None
                self.delete_features_in_rectangle(last_position, (x, y))
                self.plot_features()
            if event == cv2.EVENT_LBUTTONDBLCLK:
                if self.mode == FeatureSelectorMode.BOUNDARIES:
                    cv2.drawMarker(self.image, (x, y), (0, 255, 0), markerType=cv2.MARKER_STAR,
                                   markerSize=40, thickness=2, line_type=cv2.LINE_AA)
                    self.features.append((x, y))
                    if len(self.features) == 4:
                        self.finished = True
                elif self.mode == FeatureSelectorMode.VEHICLE_POSE:
                    cv2.circle(self.image, (x, y), radius=3, color=(0, 0, 255), thickness=-1)
                    self.features.append((x, y))
                    if len(self.features) == 2:
                        cv2.arrowedLine(self.image, self.features[0], self.features[1],
                                        color=(0, 0, 255), thickness=2)
                        self.finished = True
                elif self.mode == FeatureSelectorMode.CONES:
                    self.features.append((x, y, self.current_cone_id,
                                          self.current_fake_cone_id, self.use_for_registration))
                    self.plot_features()
                elif self.mode == FeatureSelectorMode.SKIDPAD_CENTERPOINTS:
                    self.features.append((x, y))
                    self.plot_features()
                    if len(self.features) == 5:
                        self.finished = True
                elif self.mode == FeatureSelectorMode.ACCELERATION_CENTERPOINTS:
                    self.features.append((x, y))
                    self.plot_features()
                    if len(self.features) == 2:
                        self.finished = True

    def recover_features(self, features: List[Tuple[int, int, int, int, int]]):
        """Re-load previously saved features (e.g. from a recovery ``.npy`` file)."""
        if self.mode == FeatureSelectorMode.CONES:
            # The fifth column is stored numerically; convert it back to bool.
            self.features = [[*feature[:4], feature[4] == 1] for feature in features]
            self.plot_features()
        elif self.mode in [FeatureSelectorMode.SKIDPAD_CENTERPOINTS,
                           FeatureSelectorMode.ACCELERATION_CENTERPOINTS]:
            self.features = features
            self.plot_features()

    def run(self):
        """Event loop: show the image, handle keys, block until finished or killed."""
        cv2.namedWindow('feature_selector_window', cv2.WINDOW_NORMAL)
        cv2.setMouseCallback('feature_selector_window', self.add_feature)
        if self.mode == FeatureSelectorMode.BOUNDARIES:
            cv2.setWindowTitle('feature_selector_window',
                               'Select the track boundaries according to the visualized order')
        elif self.mode == FeatureSelectorMode.VEHICLE_POSE:
            cv2.setWindowTitle('feature_selector_window',
                               'Select initial position of the vehicle: Centerpoint of rear axle '
                               'and some centerpoint in the front')
        elif self.mode == FeatureSelectorMode.SKIDPAD_CENTERPOINTS:
            cv2.setWindowTitle('feature_selector_window',
                               'Select keypoints for skidpad centerpoints: First point, centerpoint of finish line, '
                               'circle center of right and left cirle and last point')
        elif self.mode == FeatureSelectorMode.ACCELERATION_CENTERPOINTS:
            cv2.setWindowTitle('feature_selector_window',
                               'Select keypoints for acceleration centerpoints: First and last point')
        while not self.killer.kill_now and not self.finished:
            cv2.imshow('feature_selector_window', self.image)
            key = cv2.waitKey(50)
            if key == ord('\b') and self.mode != FeatureSelectorMode.CONES:
                if self.features:
                    del self.features[-1]
                    self.plot_features()
            if self.mode == FeatureSelectorMode.CONES:
                if key == ord('\r'):
                    break
                elif key == ord('y'):
                    self.current_cone_id = 0
                    self.current_fake_cone_id = 0
                elif key == ord('b'):
                    self.current_cone_id = 1
                    self.current_fake_cone_id = 1
                elif key == ord('o'):
                    self.current_cone_id = 2
                    self.current_fake_cone_id = 2
                elif key == ord('f'):
                    self.current_cone_id = 3
                    self.current_fake_cone_id = 3
                elif key == ord('p'):
                    self.current_fake_cone_id = 1
                elif key == ord('x'):
                    self.current_fake_cone_id = 0
                elif key == ord('r'):
                    self.use_for_registration = not self.use_for_registration
                elif key == ord('\b'):
                    # BUG FIX: the original ``del feature`` only unbound the
                    # loop variable and never touched the list; actually drop
                    # the trailing run of cones of the currently selected color.
                    while self.features and self.features[-1][2] == self.current_cone_id:
                        del self.features[-1]
                    self.plot_features()
                cv2.setWindowTitle(
                    'feature_selector_window',
                    f'color: {self._cone_color_representations[self.current_cone_id]}, '
                    f'fake color: {self._cone_color_representations[self.current_fake_cone_id]}, '
                    f'use for registration: {self.use_for_registration}')
        # Keep showing the result until Enter/kill; cone mode exits immediately.
        while not self.killer.kill_now:
            if self.mode in [FeatureSelectorMode.BOUNDARIES, FeatureSelectorMode.VEHICLE_POSE,
                             FeatureSelectorMode.SKIDPAD_CENTERPOINTS,
                             FeatureSelectorMode.ACCELERATION_CENTERPOINTS]:
                cv2.imshow('feature_selector_window', self.image)
            elif self.mode == FeatureSelectorMode.CONES:
                break
            key = cv2.waitKey(3000)
            if key == ord('\r'):
                break
        cv2.destroyWindow('feature_selector_window')
class TrackCreator():
    """Build a ground-truth cone map (JSON) from an overhead image or video.

    Pipeline: pick a base image, click the track boundary corners and the
    initial vehicle pose, click all cones, optionally build a mission
    centerline, then export everything as ``ground_truth.json``.
    """

    # Matplotlib colors per cone id: yellow, blue, small orange, big orange.
    _cone_colors = ['gold', 'blue', 'orange', 'orange']

    def __init__(self, input_file_path: str, killer: 'GracefulKiller', plot: bool,
                 test_day: str, track_layout: str, track_height: float,
                 track_width: float, improve_world_cones: bool,
                 centerpoints_width: float, centerpoints: Optional[Tuple[str]],
                 recover_centerpoints: bool, improve_centerpoints: bool,
                 recover_map_origin: bool, recover_world_cones: bool,
                 recover_track_boundaries: bool, manual_track: bool) -> None:
        self.input_file_path = input_file_path
        # Output/recovery directory: the input path without its extension.
        self.base_path = f'{self.input_file_path.rsplit(".", maxsplit=1)[0]}'
        self.track_height = track_height
        self.track_width = track_width
        self.plot = plot
        self.image: npt.NDArray
        self.map_origin: List[Tuple[int, int]]
        self.map_boundaries: List[Tuple[int, int]]
        self.cones: List[Tuple[int, int, int]]
        self.improve_world_cones = improve_world_cones
        self.recover_map_origin = recover_map_origin
        self.recover_world_cones = recover_world_cones
        self.recover_track_boundaries = recover_track_boundaries
        self.recover_centerpoints = recover_centerpoints
        self.improve_centerpoints = improve_centerpoints
        self.centerpoints_missions = centerpoints
        self.centerpoints_width = centerpoints_width
        self.test_day = test_day
        self.track_layout = track_layout
        self.manual_track = manual_track
        self.killer = killer

    def read_in_image(self):
        """Load the base image from a video (interactive pick) or an image file."""
        mime_type = mimetypes.guess_type(self.input_file_path)[0]
        # BUG FIX: guess_type returns (None, None) for unknown extensions,
        # which used to crash with AttributeError on ``.startswith``.
        if mime_type is not None and mime_type.startswith('video'):
            image_selector = ImageSelector(input_video_path=self.input_file_path,
                                           killer=self.killer)
            self.image = image_selector.select_image()
        elif mime_type is not None and mime_type.startswith('image'):
            self.image = cv2.imread(self.input_file_path)
        else:
            raise ValueError(f'Unsupported input file type: {self.input_file_path}')

    def select_track_boundaries(self):
        """Select (or recover from disk) the four track boundary corner points."""
        if self.recover_track_boundaries is True:
            if os.path.isfile(self.base_path + '/boundaries.npy') is True:
                self.map_boundaries = np.load(self.base_path + '/boundaries.npy')
                return
        boundaries_selector = FeatureSelector(image=self.image,
                                              mode=FeatureSelectorMode.BOUNDARIES,
                                              killer=self.killer)
        boundaries_selector.run()
        self.map_boundaries = np.array(boundaries_selector.features)

    def select_map_origin(self):
        """Select (or recover) the initial vehicle pose (rear axle + front point)."""
        if self.recover_map_origin is True:
            if os.path.isfile(self.base_path + '/origin.npy') is True:
                self.map_origin = np.load(self.base_path + '/origin.npy')
                return
        map_origin_selector = FeatureSelector(image=self.image,
                                              mode=FeatureSelectorMode.VEHICLE_POSE,
                                              killer=self.killer)
        map_origin_selector.run()
        self.map_origin = np.array(map_origin_selector.features)

    def select_cones(self):
        """Select cone positions/colors, optionally recovering and refining old ones."""
        if self.recover_world_cones is True:
            if os.path.isfile(self.base_path + '/world_cones.npy') is True:
                self.world_cones = np.load(self.base_path + '/world_cones.npy')
                if self.improve_world_cones is False:
                    return
        cone_selector = FeatureSelector(image=self.image,
                                        mode=FeatureSelectorMode.CONES,
                                        killer=self.killer)
        if self.improve_world_cones is True and hasattr(self, 'world_cones'):
            cone_selector.recover_features(self.world_cones.tolist())
        cone_selector.run()
        self.world_cones = np.array(cone_selector.features)

    def select_support_centerpoints(self):
        """Select (or recover) the mission keypoints used to build the centerline."""
        if self.recover_centerpoints is True:
            if os.path.isfile(self.base_path + '/support_centerpoints.npy') is True:
                self.support_centerpoints = np.load(
                    self.base_path + '/support_centerpoints.npy')
                if self.improve_centerpoints is False:
                    return
        if self.centerpoints_missions == 'skidpad':
            mode = FeatureSelectorMode.SKIDPAD_CENTERPOINTS
        elif self.centerpoints_missions == 'acceleration':
            mode = FeatureSelectorMode.ACCELERATION_CENTERPOINTS
        else:
            # BUG FIX: an unknown mission used to crash later with NameError
            # on ``mode``; fail explicitly instead.
            raise ValueError(f'Unknown centerpoints mission: {self.centerpoints_missions}')
        centerpoints_selector = FeatureSelector(image=self.image, mode=mode,
                                                killer=self.killer)
        if self.improve_centerpoints is True and hasattr(self, 'support_centerpoints'):
            centerpoints_selector.recover_features(self.support_centerpoints.tolist())
        centerpoints_selector.run()
        self.support_centerpoints = np.array(centerpoints_selector.features)

    @staticmethod
    def chain_perspective_transforms(first_transform: npt.NDArray,
                                     second_transform: npt.NDArray):
        """Compose two transforms, promoting any 2x3 affine to 3x3 homogeneous."""
        if first_transform.shape[0] == 2:
            first_transform = np.r_[first_transform, np.array([[0, 0, 1]])]
        if second_transform.shape[0] == 2:
            second_transform = np.r_[second_transform, np.array([[0, 0, 1]])]
        return second_transform @ first_transform

    def calculate_perspective_transform(self):
        """Compute the map-plane-rectangle -> clicked-image-corners homography."""
        cols = int(self.track_width * PIXEL_PER_METER)
        rows = int(self.track_height * PIXEL_PER_METER)
        pts1 = np.float32([[0, 0], [cols, 0], [cols, rows], [0, rows]])
        self.perspective_transforms = cv2.getPerspectiveTransform(
            pts1, self.map_boundaries.astype(np.float32))

    def calculate_world_to_map_transform(self):
        """Derive the homogeneous world->map transform from the vehicle pose."""
        inverse_perspective_transform = np.linalg.inv(self.perspective_transforms)
        projected_map_origin = cv2.perspectiveTransform(
            src=self.map_origin.astype(np.float64)[np.newaxis],
            m=inverse_perspective_transform)[0]
        # Vehicle heading from the two clicked pose points (rear axle, front).
        angle = np.arctan2(projected_map_origin[0, 1] - projected_map_origin[1, 1],
                           projected_map_origin[0, 0] - projected_map_origin[1, 0])
        rotation = np.array([[np.cos(angle), -np.sin(angle)],
                             [np.sin(angle), np.cos(angle)]])
        map_points = np.array([[0, 0], [0, 1], [-1, 0]], dtype=np.float32)
        projected_world_points = np.empty_like(map_points)
        projected_world_points[:] = projected_map_origin[0].copy()[np.newaxis]
        projected_world_points[1] += (rotation @ [0, PIXEL_PER_METER])
        projected_world_points[2] += (rotation @ [PIXEL_PER_METER, 0])
        map_to_projected_world_transform = cv2.getAffineTransform(
            map_points, projected_world_points)
        map_to_world_transform = self.chain_perspective_transforms(
            map_to_projected_world_transform, self.perspective_transforms)
        self.world_to_map_transform = np.linalg.inv(map_to_world_transform)

    def calculate_map_coordinates_of_cones(self):
        """Project the selected cone pixel positions into map coordinates."""
        self.map_cones = cv2.perspectiveTransform(
            src=self.world_cones[np.newaxis, :, :2].astype(np.float32),
            m=self.world_to_map_transform)[0]

    def plot_results(self):
        """Scatter-plot cones (true/fake color, registration flag) and centerline."""
        use_for_registration = self.world_cones[:, 4].astype(bool)
        not_use_for_registration = np.logical_not(use_for_registration)
        plt.scatter(self.map_cones[:, 0], self.map_cones[:, 1], marker='o',
                    label='Fake Cone Color', facecolors='none',
                    edgecolors=np.choose(self.world_cones[:, 3].astype(int),
                                         self._cone_colors))
        plt.scatter(self.map_cones[use_for_registration, 0],
                    self.map_cones[use_for_registration, 1],
                    marker='+', label='Cone Color, use for registration',
                    color=np.choose(self.world_cones[use_for_registration, 2].astype(int),
                                    self._cone_colors))
        plt.scatter(self.map_cones[not_use_for_registration, 0],
                    self.map_cones[not_use_for_registration, 1],
                    marker='x', label='Cone color, dont use for registration',
                    color=np.choose(self.world_cones[not_use_for_registration, 2].astype(int),
                                    self._cone_colors))
        if hasattr(self, 'centerpoints') is True:
            plt.plot(self.centerpoints[:, 0], self.centerpoints[:, 1])
        plt.axis('equal')
        plt.legend()
        plt.show()

    def export_map(self):
        """Write the ground-truth map JSON next to the input and, when a test
        day / layout is given, into the slam plots directory as well."""
        # Assemble the JSON-serializable ground-truth dictionary.
        output_dict = {}
        output_dict['x'] = self.map_cones[:, 0].tolist()
        output_dict['y'] = self.map_cones[:, 1].tolist()
        output_dict['id'] = self.world_cones[:, 2].astype(int).tolist()
        output_dict['faked_id'] = self.world_cones[:, 3].astype(int).tolist()
        output_dict['gps'] = None
        output_dict['registration'] = self.world_cones[:, 4].astype(bool).tolist()
        if hasattr(self, 'centerpoints') is True:
            output_dict['centerpoints'] = {
                'x': self.centerpoints[:, 0].tolist(),
                'y': self.centerpoints[:, 1].tolist(),
                'left_track_width': np.full(self.centerpoints.shape[0],
                                            self.centerpoints_width / 2).tolist(),
                'right_track_width': np.full(self.centerpoints.shape[0],
                                             self.centerpoints_width / 2).tolist(),
                'closed_track': False
            }
        Path(self.base_path).mkdir(parents=True, exist_ok=True)
        with open(f'{self.base_path}/ground_truth.json', 'w') as output:
            json.dump(output_dict, output, indent=4)
        if self.manual_track is True:
            # BUG FIX: the original referenced the undefined local ``base_path``
            # here (NameError); the manual copy belongs next to ground_truth.json.
            with open(f'{self.base_path}/manual.json', 'w') as output:
                json.dump(output_dict, output, indent=4)
        if self.test_day is not None and self.track_layout is not None:
            base_path = (f'{os.path.abspath(os.path.dirname(__file__))}'
                         f'/../../../slam/plots/maps/{self.test_day}/{self.track_layout}')
            Path(base_path).mkdir(parents=True, exist_ok=True)
            with open(f'{base_path}/ground_truth.json', 'w') as output:
                json.dump(output_dict, output, indent=4)
            if self.manual_track is True:
                with open(f'{base_path}/manual.json', 'w') as output:
                    json.dump(output_dict, output, indent=4)

    def save_recovery(self):
        """Persist the raw selections so later runs can recover and refine them."""
        Path(self.base_path).mkdir(parents=True, exist_ok=True)
        np.save(self.base_path + '/boundaries.npy', self.map_boundaries)
        np.save(self.base_path + '/origin.npy', self.map_origin)
        np.save(self.base_path + '/world_cones.npy', self.world_cones)
        if self.centerpoints_missions:
            np.save(self.base_path + '/support_centerpoints.npy', self.support_centerpoints)

    @staticmethod
    def generate_and_move_base_circle(radius: float, points_n: int, angle: float,
                                      inverse: bool, offset: npt.NDArray) -> npt.NDArray:
        """Sample ``points_n`` points on a circle, rotate by ``angle``, shift by ``offset``.

        ``inverse`` reverses the traversal direction of the returned points.
        """
        def rtpairs(r, n):
            # Yield (radius, theta) pairs evenly spaced around the circle.
            for j in range(n):
                yield r, j * (2 * np.pi / n)

        rotation_matrix = np.array([[np.cos(angle), -np.sin(angle)],
                                    [np.sin(angle), np.cos(angle)]])
        points = np.array([[r * np.cos(t), r * np.sin(t)]
                           for r, t in rtpairs(radius, points_n)])
        if inverse:
            rotated_points = np.dot(points, rotation_matrix)[::-1]
        else:
            rotated_points = np.dot(points, rotation_matrix)
        return rotated_points + offset

    @staticmethod
    def generate_circle_from_two_points(centerpoint: npt.NDArray,
                                        circlepoint: npt.NDArray,
                                        inverse: bool) -> npt.NDArray:
        """Sample a circle around ``centerpoint`` that passes through ``circlepoint``."""
        distance = circlepoint - centerpoint
        radius = np.linalg.norm(distance)
        # NOTE(review): arccos uses only the x-component, so the sign of the
        # y-offset is lost; the skidpad keypoints presumably lie level with
        # the circle centers — confirm before using with arbitrary geometry.
        angle = np.arccos(distance[0] / radius)
        return TrackCreator.generate_and_move_base_circle(radius=radius, points_n=80,
                                                          angle=angle, inverse=inverse,
                                                          offset=centerpoint)

    def calculate_skidpad_world_centerpoints(self):
        """Skidpad centerline: entry straight, two right laps, two left laps, exit."""
        left_circle = self.generate_circle_from_two_points(
            centerpoint=self.support_centerpoints[3],
            circlepoint=self.support_centerpoints[1], inverse=True)
        right_circle = self.generate_circle_from_two_points(
            centerpoint=self.support_centerpoints[2],
            circlepoint=self.support_centerpoints[1], inverse=False)
        acceleration_part = np.linspace(self.support_centerpoints[0],
                                        self.support_centerpoints[1], 80)
        deceleration_part = np.linspace(self.support_centerpoints[1],
                                        self.support_centerpoints[4], 80)
        # Each circle appears twice: the mission drives every circle two laps.
        return np.r_[acceleration_part, right_circle, right_circle,
                     left_circle, left_circle, deceleration_part]

    def calculate_acceleration_world_centerpoints(self):
        """Acceleration centerline: a straight line between the two keypoints."""
        return np.linspace(self.support_centerpoints[0], self.support_centerpoints[1], 100)

    def calculate_centerpoints(self):
        """Project the mission's world centerpoints into map coordinates."""
        world_centerpoints = None
        if self.centerpoints_missions == 'skidpad':
            world_centerpoints = self.calculate_skidpad_world_centerpoints()
        elif self.centerpoints_missions == 'acceleration':
            world_centerpoints = self.calculate_acceleration_world_centerpoints()
        assert world_centerpoints is not None
        self.centerpoints = cv2.perspectiveTransform(
            src=world_centerpoints[np.newaxis, :, :2].astype(np.float32),
            m=self.world_to_map_transform)[0]

    def run(self):
        """Full pipeline: pick image, select features, transform, plot, export."""
        self.read_in_image()
        self.select_track_boundaries()
        self.select_map_origin()
        self.calculate_perspective_transform()
        self.calculate_world_to_map_transform()
        self.select_cones()
        self.calculate_map_coordinates_of_cones()
        if self.centerpoints_missions:
            self.select_support_centerpoints()
            self.calculate_centerpoints()
        if self.plot is True:
            self.plot_results()
        self.export_map()
        self.save_recovery()
# NOTE(review): ``init`` is not defined in this module view — presumably the
# CLI entry point defined elsewhere (or lost in extraction); confirm before
# running this file as a script.
if __name__ == '__main__': init()