Source code for cli.drone_visualization

import json
import os
from typing import Optional, Tuple, List
from pathlib import Path
import progressbar
import signal
from enum import Enum
from fractions import Fraction
import click

from matplotlib.lines import Line2D
from matplotlib.backend_bases import PickEvent
import matplotlib.pyplot as plt

import numpy as np
import numpy.typing as npt
import pandas as pd
import cv2
import av
import av.datasets

import rosbag

PIXEL_PER_METER = 1000
N_STDDEV = 1
ALPHA = 0.2
LEGEND_TEXT_SCALE = 3
LEGEND_TEXT_DISTANCE = 15

MAX_PNEUMATIC_PRESSURE = 3
MAX_VELOCITY = 100
MAX_TORQUE = 280
MAX_ACCELERATION = 2 * 9.81
MAX_STEERING_WHEEL_ANGLE = 115

CONTROL_TIME_WINDOW = 1.5


def generate_progressbar(name: str, data_length: int,
                         variable_widgets: Optional[List[progressbar.Variable]] = None) -> progressbar.ProgressBar:
    """
    Generates a progressbar object with the given data length with more information than the standard one.

    Parameters
    ----------
    name : str
        Label shown in front of the progressbar.
    data_length : int
        Length of data points to process.
    variable_widgets : Optional[List[progressbar.Variable]], optional
        Additional variable widgets shown between the bar and the timing widgets, by default None.

    Returns
    -------
    progressbar.ProgressBar
        Progressbar object with the given data length with more information than the standard one.
    """
    if variable_widgets is None:
        variable_widgets = []  # avoid a mutable default argument
    widgets1 = [
        name,
        progressbar.Percentage(), ' ',
        progressbar.SimpleProgress(), ' ',
        progressbar.Bar(), ' ']
    # progressbar.Variable('prediction_step'), ' ',  # noqa: E800
    widgets2 = [
        progressbar.Timer(), ' ',
        progressbar.ETA()
    ]
    widgets = widgets1 + variable_widgets + widgets2
    return progressbar.ProgressBar(max_value=data_length, widgets=widgets, redirect_stdout=True)

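# Usage sketch (illustrative only; 'items' is a hypothetical sequence):
#     with generate_progressbar(name='Processing | ', data_length=len(items)) as bar:
#         for item in items:
#             ...
#             bar.update(bar.value + 1)
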
class GracefulKiller:
    kill_now = False

    def __init__(self):
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, *args):
        self.kill_now = True

class OffsetFinder():
    def __init__(self, video_path: str, input_rosbag_path: str, killer: GracefulKiller) -> None:
        self.video_path = video_path
        self.input_rosbag_path = input_rosbag_path
        self.rosbag_start_time = None
        self.video_start_time = None
        self.offset = None
        self.video_start_pts = None
        self.video_start_index = None
        self.killer = killer

    def find_image_i(self, image_i: int, key: int) -> int:
        if key == 81:    # left arrow: one frame back
            image_i = max(0, image_i - 1)
        elif key == 83:  # right arrow: one frame forward
            image_i = max(0, image_i + 1)
        elif key == 85:  # page up: five frames forward
            image_i = max(0, image_i + 5)
        elif key == 86:  # page down: five frames back
            image_i = max(0, image_i - 5)
        return image_i

    def calculate_offset(self):
        if self.video_start_time is not None and self.rosbag_start_time is not None:
            self.offset = -(self.rosbag_start_time - self.video_start_time)

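    # Worked example (hedged): if the first movement is picked at 12.0 s after the bag start
    # and the matching video frame lies 3.5 s into the video, calculate_offset() yields
    # offset = -(12.0 - 3.5) = -8.5 s, i.e. video time = rosbag-relative time - 8.5 s.
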
    def find_video_start(self):
        cv2.namedWindow('offset_finder_window', cv2.WINDOW_NORMAL)
        with av.open(self.video_path) as container:
            key_stream = container.streams.video[0]
            key_stream.codec_context.skip_frame = "NONKEY"
            stream = container.streams.video[0]
            key_frame_generator = container.decode(key_stream)
            key_images = []
            key_frame_pts = []
            key_image_i = 0
            while self.killer.kill_now is False:
                while key_image_i + 1 > len(key_images):
                    frame = next(key_frame_generator)
                    key_images.append(frame.to_ndarray(format='bgr24'))
                    key_frame_pts.append(frame.pts)
                cv2.imshow('offset_finder_window', key_images[key_image_i])
                key = cv2.waitKey()
                if key == ord('\r'):
                    break
                elif key is not None:
                    key_image_i = self.find_image_i(image_i=key_image_i, key=key)
            stream.codec_context.skip_frame = "DEFAULT"
            assert key_image_i > 0
            container.seek(offset=key_frame_pts[key_image_i - 1], stream=stream)
            image_i = 0
            images = []
            frame_pts = []
            frame_generator = container.decode(stream)
            while self.killer.kill_now is False:
                while image_i + 1 > len(images):
                    frame = next(frame_generator)
                    images.append(frame.to_ndarray(format='bgr24'))
                    frame_pts.append(frame.pts)
                cv2.imshow('offset_finder_window', images[image_i])
                key = cv2.waitKey()
                if key == ord('\r'):
                    break
                elif key is not None:
                    image_i = self.find_image_i(image_i=image_i, key=key)
            if self.killer.kill_now is False:
                assert image_i > 0
                self.video_start_time = frame.time
                self.video_start_pts = frame.pts
                self.video_start_index = frame.index
        cv2.destroyWindow('offset_finder_window')

    def select_rosbag_start_time(self, event: PickEvent):
        if isinstance(event.artist, Line2D):
            x, y = event.mouseevent.xdata, event.mouseevent.ydata
            self.ax.scatter(x, y, color='r', s=500, marker='x')
            self.fig.canvas.draw()
            self.rosbag_start_time = x

    def find_rosbag_start(self):
        if self.killer.kill_now is True:
            return
        bag = rosbag.Bag(self.input_rosbag_path)
        input_rosbag_start = bag.get_start_time()
        first_movement: Optional[float] = None
        messages = []
        with progressbar.ProgressBar(max_value=bag.get_message_count()) as bar:
            bar.start()
            for topic, msg, t in bag.read_messages():
                if first_movement is not None and t.to_sec() > first_movement + 1:
                    break
                if topic == '/can/log/wheelspeed/rear':
                    if first_movement is None and (abs(msg.right) > 100 or abs(msg.left) > 100):
                        first_movement = t.to_sec()
                    messages.append((t.to_sec(), 'wheelspeed_rear_right', msg.right))
                    messages.append((t.to_sec(), 'wheelspeed_rear_left', msg.left))
                if topic == '/can/per/wheelspeed/front':
                    if first_movement is None and (abs(msg.right) > 100 or abs(msg.left) > 100):
                        first_movement = t.to_sec()
                    messages.append((t.to_sec(), 'wheelspeed_front_right', msg.right))
                    messages.append((t.to_sec(), 'wheelspeed_front_left', msg.left))
                bar.update(bar.value + 1)
        assert first_movement is not None
        dataframe = pd.DataFrame(messages, columns=['time', 'topic', 'data'])
        dataframe['time'] -= input_rosbag_start
        dataframe = dataframe[dataframe['time'] > first_movement - input_rosbag_start - 1]
        self.fig = plt.figure()
        self.ax = self.fig.add_subplot(111)
        self.ax.set_title('Pick point of first movement, afterwards close window', picker=True)
        topics = ['wheelspeed_rear_right', 'wheelspeed_rear_left', 'wheelspeed_front_right', 'wheelspeed_front_left']
        colors = ['g', 'b', 'm', 'c']
        for topic, color in zip(topics, colors):
            subdf = dataframe[dataframe['topic'] == topic]
            self.ax.plot(subdf['time'], subdf['data'], 'o', picker=5, label=topic, c=color)
        self.ax.legend()
        self.fig.canvas.mpl_connect('pick_event', self.select_rosbag_start_time)
        while self.rosbag_start_time is None and self.killer.kill_now is False:
            plt.show()

class FeatureSelectorMode(Enum):
    BOUNDARIES = 1
    POSE_DEFINITION = 2
    VEHICLE_MASK = 3

class FeatureSelector():
    def __init__(self, image: npt.NDArray, killer: GracefulKiller, track_width: float, track_height: float,
                 mode: FeatureSelectorMode) -> None:
        self.features = []
        self.killer = killer
        self.finished = False
        self.track_width = track_width
        self.track_height = track_height
        self.mode = mode
        self.image = image
        self.plot_order()
        self.original_image = image.copy()
        cv2.namedWindow('feature_selector_window', cv2.WINDOW_NORMAL)
        cv2.setMouseCallback('feature_selector_window', self.add_feature)
        self.last_position: Optional[Tuple[int, int]] = None

    def plot_order(self):
        if self.mode == FeatureSelectorMode.BOUNDARIES:
            rows, cols = self.image.shape[:2]
            cv2.arrowedLine(self.image, (20, 20), (cols - 50, 20), color=(130, 130, 0), thickness=5, tipLength=0.01)
            cv2.arrowedLine(self.image, (cols - 20, 20), (cols - 20, rows - 50), color=(130, 130, 0), thickness=5,
                            tipLength=0.01)
            cv2.arrowedLine(self.image, (cols - 20, rows - 20), (50, rows - 20), color=(130, 130, 0), thickness=5,
                            tipLength=0.01)

    def plot_features(self):
        self.image = self.original_image.copy()
        for feature in self.features:
            if self.mode == FeatureSelectorMode.BOUNDARIES:
                cv2.drawMarker(self.image, feature, (0, 255, 0), markerType=cv2.MARKER_STAR, markerSize=40,
                               thickness=2, line_type=cv2.LINE_AA)
            elif self.mode == FeatureSelectorMode.POSE_DEFINITION:
                cv2.circle(self.image, feature, radius=3, color=(0, 0, 255), thickness=-1)
            elif self.mode == FeatureSelectorMode.VEHICLE_MASK:
                cv2.polylines(img=self.image, pts=np.array(self.features)[np.newaxis], isClosed=True,
                              color=(0, 0, 255), thickness=3, lineType=cv2.LINE_AA)

    def show(self):
        while not self.killer.kill_now and not self.finished:
            cv2.imshow('feature_selector_window', self.image)
            key = cv2.waitKey(50)
            if key == ord('\b'):
                if self.features:
                    del self.features[-1]
                    self.plot_features()
            if self.mode == FeatureSelectorMode.VEHICLE_MASK and key == ord('\r'):
                break
        while not self.killer.kill_now:
            if self.mode == FeatureSelectorMode.BOUNDARIES:
                cv2.imshow('feature_selector_window', self.new_image)
            elif self.mode == FeatureSelectorMode.POSE_DEFINITION:
                cv2.imshow('feature_selector_window', self.image)
            elif self.mode == FeatureSelectorMode.VEHICLE_MASK:
                break
            key = cv2.waitKey(3000)
            if key == ord('\r'):
                break
        cv2.destroyWindow('feature_selector_window')

    def add_feature(self, event, x, y, flags, param):
        if self.finished is False:
            if event == cv2.EVENT_MBUTTONDOWN:
                self.last_position = (x, y)
            elif event == cv2.EVENT_MBUTTONUP and self.last_position is not None:
                last_position = self.last_position
                self.last_position = None
                self.find_good_features_to_track_in_rectangle(last_position, (x, y))
            if event == cv2.EVENT_LBUTTONDBLCLK:
                if self.mode == FeatureSelectorMode.BOUNDARIES:
                    cv2.drawMarker(self.image, (x, y), (0, 255, 0), markerType=cv2.MARKER_STAR, markerSize=40,
                                   thickness=2, line_type=cv2.LINE_AA)
                    self.features.append((x, y))
                    if len(self.features) == 4:
                        self.transform_image_by_features()
                        self.finished = True
                elif self.mode == FeatureSelectorMode.POSE_DEFINITION:
                    cv2.circle(self.image, (x, y), radius=3, color=(0, 0, 255), thickness=-1)
                    self.features.append((x, y))
                    if len(self.features) == 2:
                        cv2.arrowedLine(self.image, self.features[0], self.features[1], color=(0, 0, 255),
                                        thickness=2)
                        self.finished = True
                elif self.mode == FeatureSelectorMode.VEHICLE_MASK:
                    self.features.append((x, y))
                    self.plot_features()

    def transform_image_by_features(self):
        _, cols, _ = self.image.shape
        rows = int((self.track_height / self.track_width) * cols)
        pts1 = np.float32(self.features)
        pts2 = np.float32([[0, 0], [cols, 0], [cols, rows], [0, rows]])
        M = cv2.getPerspectiveTransform(pts1, pts2)
        self.new_image = cv2.warpPerspective(self.original_image, M, (cols, rows))

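    # Note (hedged): pts2 orders the corners top-left, top-right, bottom-right, bottom-left,
    # so the four clicked boundary features are expected in the same order, which the arrows
    # drawn by plot_order() appear to indicate.
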
    def find_good_features_to_track_in_rectangle(self, point1, point2):
        y_min = min(point1[1], point2[1])
        x_min = min(point1[0], point2[0])
        x_max = max(point1[0], point2[0])
        y_max = max(point1[1], point2[1])
        mask = np.zeros(self.image.shape[:2], dtype=np.uint8)
        mask[y_min:y_max, x_min:x_max] = 255
        corners = cv2.goodFeaturesToTrack(cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY), maxCorners=10,
                                          qualityLevel=0.0001, minDistance=4, mask=mask)
        cv2.rectangle(self.image, pt1=point1, pt2=point2, color=(255, 0, 0), thickness=4)
        if corners is not None:
            for corner in corners:
                x, y = np.int0(corner).ravel()
                cv2.circle(self.image, (x, y), 1, (0, 0, 255), -1)

class DroneVisualization():
    _cone_colors = [(0, 255, 255), (255, 0, 0), (0, 165, 255), (0, 0, 255)]

    def __init__(self, input_rosbag: str, input_video: str, recalculate_transforms: bool, no_new_transforms: bool,
                 track_height: float, track_width: float, recover_offset: bool, recover_initial_features: bool,
                 killer: GracefulKiller, start: int, duration: Optional[int], end: Optional[int], output: str,
                 save_recovery: bool, recover_initial_position: bool, recover_initial_vehicle_mask: bool,
                 visualize_only_debug_information: bool) -> None:
        self.killer = killer
        self.input_video_path = input_video
        self.incrementing_transforms: Optional[npt.NDArray] = None
        self.total_transforms: Optional[npt.NDArray] = None
        self.video_pts_and_time_to_index: Optional[npt.NDArray] = None
        self.chained_transforms: Optional[npt.NDArray] = None
        # initial features transformed to world for every time step
        self.transformed_features: Optional[npt.NDArray] = None
        # from normalized world to world
        self.perspective_transforms: Optional[npt.NDArray] = None
        self.map_to_world_transforms: Optional[npt.NDArray] = None
        self.recalculate_transforms = recalculate_transforms
        self.no_new_transforms = no_new_transforms
        self.start = start
        self.duration = duration
        self.end = end
        self.input_rosbag_path = input_rosbag
        self.input_rosbag = rosbag.Bag(input_rosbag)
        self.input_rosbag_start = self.input_rosbag.get_start_time()
        self.rosbag_start: Optional[float] = None
        self.rosbag_end: Optional[float] = None
        self.save_recovery = save_recovery
        self.recover_offset = recover_offset
        self.offset = None
        self.video_start = start
        self.video_end = end
        self.input_video_start_pts = None
        self.input_video_start_index = None
        self.track_width = track_width
        self.track_height = track_height
        self.recover_initial_features = recover_initial_features
        self.initial_features: Optional[npt.NDArray] = None
        self.recover_initial_position = recover_initial_position
        self.initial_position: Optional[npt.NDArray] = None
        self.recover_initial_vehicle_mask = recover_initial_vehicle_mask
        self.initial_vehicle_mask: Optional[npt.NDArray] = None
        self.rosbag_df = pd.DataFrame()
        self.legend = {}
        self.recover_information()
        self.output_suffix = output
        self.visualize_only_debug_information = visualize_only_debug_information

    def recover_information(self):
        if self.recalculate_transforms is False:
            self.recover_transforms_from_storage()
        path = f'{self.input_video_path.rsplit(".", maxsplit=1)[0]}/recovery.json'
        if os.path.isfile(path):
            with open(path, 'r') as input_file:
                recovery = json.load(input_file)
        else:
            return
        if self.recover_offset is True and self.offset is None:
            if 'offset' in recovery:
                self.offset = recovery['offset']
            if 'video_start_pts' in recovery:
                self.input_video_start_pts = recovery['video_start_pts']
                self.input_video_start_index = recovery['video_start_index']
        if self.recover_initial_features is True and self.initial_features is None:
            if 'initial_features' in recovery:
                self.initial_features = np.array(recovery['initial_features'])
        if self.recover_initial_position is True and self.initial_position is None:
            if 'initial_position' in recovery:
                self.initial_position = np.array(recovery['initial_position'])
        if self.recover_initial_vehicle_mask is True and self.initial_vehicle_mask is None:
            if 'initial_vehicle_mask' in recovery:
                self.initial_vehicle_mask = np.array(recovery['initial_vehicle_mask'], dtype=int)

    def save_recovery_information(self):
        recovery = {}
        if self.offset is not None:
            recovery['offset'] = self.offset
        if self.input_video_start_pts is not None:
            recovery['video_start_pts'] = self.input_video_start_pts
            recovery['video_start_index'] = self.input_video_start_index
        if self.initial_features is not None:
            recovery['initial_features'] = self.initial_features.tolist()
        if self.initial_position is not None:
            recovery['initial_position'] = self.initial_position.tolist()
        if self.initial_vehicle_mask is not None:
            recovery['initial_vehicle_mask'] = self.initial_vehicle_mask.tolist()
        base_path = self.input_video_path.rsplit(".", maxsplit=1)[0]
        path = f'{base_path}/recovery.json'
        Path(base_path).mkdir(parents=True, exist_ok=True)
        with open(path, 'w') as output:
            json.dump(recovery, output, indent=4)

    def run(self):
        if self.offset is None or self.input_video_start_pts is None or self.input_video_start_index is None:
            self.find_offset()
        if self.offset is not None:
            self.find_start_and_end()
        if self.initial_features is None:
            self.find_initial_features()
        if self.initial_vehicle_mask is None:
            self.find_initial_vehicle_mask()
        if self.no_new_transforms is False:
            self.calculate_transforms_from_video()
        if self.initial_position is None:
            self.find_initial_position()
        if self.save_recovery is True:
            print('Saving recovery information!')
            self.save_recovery_information()
        if self.total_transforms is not None:
            self.affine_transform_initial_features()
        if self.transformed_features is not None:
            self.calculate_perspective_transform()
        if self.transformed_features is not None:
            self.calculate_map_to_world_transform()
        if self.map_to_world_transforms is not None:
            self.import_rosbag()
        if self.transformed_features is not None:
            self.generate_output()

    def recover_transforms_from_storage(self):
        base_path = self.input_video_path.rsplit('.', maxsplit=1)[0]
        total_transforms_path = f'{base_path}/total_transforms.npy'
        pts_path = f'{base_path}/pts.npy'
        transform_points_path = f'{base_path}/transform_points.npy'
        transform_status_path = f'{base_path}/transform_status.npy'
        if os.path.isfile(total_transforms_path):
            self.total_transforms = np.load(total_transforms_path)
            self.video_pts_and_time_to_index = np.load(pts_path)
            self.transform_points = np.load(transform_points_path)
            self.transform_status = np.load(transform_status_path).astype(int)
            click.echo(f'Previously saved total transforms recovered: {self.total_transforms.shape[0]}')
        else:
            self.recalculate_transforms = True
            click.echo('No previously saved transforms available')

    def save_incrementing_transforms(self):
        base_path = self.input_video_path.rsplit('.', maxsplit=1)[0]
        Path(base_path).mkdir(parents=True, exist_ok=True)
        np.save(f'{base_path}/total_transforms.npy', self.total_transforms)
        np.save(f'{base_path}/pts.npy', self.video_pts_and_time_to_index)
        np.save(f'{base_path}/transform_points.npy', self.transform_points)
        np.save(f'{base_path}/transform_status.npy', self.transform_status)

    def calculate_transforms_from_video(self):
        def show_frame_info(data: Optional[Tuple[av.VideoFrame, Optional[int], Optional[int]]]):
            if data is None:
                return ''
            frame, matched_corners, total_corners = data
            if matched_corners is not None:
                return f'#{frame.index}: {frame.time:.2f} s ({matched_corners}/{total_corners} corners matched)'
            return f'#{frame.index}: {frame.time:.2f} s'

        with av.open(self.input_video_path) as container:
            stream = container.streams.video[0]
            if self.recalculate_transforms is False and self.total_transforms.shape[0] == stream.frames:
                click.echo('Already recovered all transforms. Not calculating anymore')
                return
            first_frame = next(container.decode(stream))
            first_gray = cv2.cvtColor(first_frame.to_ndarray(format='rgb24'), cv2.COLOR_RGB2GRAY)
            mask = np.full(first_gray.shape[:2], fill_value=255, dtype=np.uint8)
            cv2.fillPoly(img=mask, pts=self.initial_vehicle_mask[np.newaxis], color=(0), lineType=cv2.LINE_AA)
            first_pts = cv2.goodFeaturesToTrack(first_gray, maxCorners=80, qualityLevel=0.2, minDistance=200,
                                                mask=mask)
            curr_orig_pts = np.empty_like(first_pts)
            curr_orig_status = np.empty(first_pts.shape[0])
            curr_orig_err = np.empty(first_pts.shape[0])
            max_value = stream.frames
            if self.recalculate_transforms is False:
                max_value -= self.video_pts_and_time_to_index.shape[0]
                container.seek(offset=int(self.video_pts_and_time_to_index[-1, 0]), stream=stream)
            frame_generator = container.decode(stream)
            atleast_video_end = max(float(self.input_video_start_pts * stream.time_base), self.video_end)
            with click.progressbar(length=max_value, item_show_func=show_frame_info,
                                   label=f'TransformCalculation ({atleast_video_end:.2f})') as bar:
                if self.incrementing_transforms is not None:
                    frame = next(frame_generator)
                    while frame.pts < self.video_pts_and_time_to_index[-1, 0]:
                        frame = next(frame_generator)
                        if self.killer.kill_now is True:
                            self.killer.kill_now = False
                            return
                    total_transforms = []
                    pts = []
                    points = []
                    statuses = []
                else:
                    frame = next(frame_generator)
                    total_transforms = [np.array([[1., 0., 0.], [0., 1., 0.]])]
                    pts = [(frame.pts, frame.time)]
                    points = [first_pts]
                    statuses = [np.ones(first_pts.shape[0])]
                bar.update(n_steps=1, current_item=(frame, None, None))
                for frame in frame_generator:
                    if self.killer.kill_now is True:
                        self.killer.kill_now = False
                        bar.finish()
                        break
                    pts.append((frame.pts, frame.time))
                    curr_gray = cv2.cvtColor(frame.to_ndarray(format='rgb24'), cv2.COLOR_RGB2GRAY)
                    curr_orig_pts, curr_orig_status, curr_orig_err = cv2.calcOpticalFlowPyrLK(
                        prevImg=first_gray, nextImg=curr_gray, prevPts=first_pts, nextPts=curr_orig_pts,
                        status=curr_orig_status, err=curr_orig_err, maxLevel=3)
                    points.append(curr_orig_pts.copy())
                    statuses.append(curr_orig_status.copy()[:, 0])
                    total_transforms.append(cv2.estimateAffine2D(from_=first_pts[curr_orig_status == 1],
                                                                 to=curr_orig_pts[curr_orig_status == 1],
                                                                 method=cv2.LMEDS)[0])
                    bar.update(n_steps=1, current_item=(frame, np.count_nonzero(curr_orig_status),
                                                        curr_orig_status.shape[0]))
        if self.total_transforms is None:
            self.total_transforms = np.array(total_transforms)
            self.video_pts_and_time_to_index = np.array(pts)
            self.transform_points = np.stack(points, axis=0)
            self.transform_status = np.stack(statuses, axis=0).astype(int)
        else:
            self.total_transforms = np.r_[self.total_transforms, np.array(total_transforms)]
            self.video_pts_and_time_to_index = np.r_[self.video_pts_and_time_to_index, np.array(pts)]
            self.transform_points = np.r_[self.transform_points, np.array(points)]
            self.transform_status = np.r_[self.transform_status, np.stack(statuses, axis=0).astype(int)]
        self.save_incrementing_transforms()

    @staticmethod
    def chain_affine_transforms(first_transform: npt.NDArray, second_transform: npt.NDArray):
        padding_array = np.array([[0, 0, 1]])
        # chain in homogeneous coordinates: matrix product, not element-wise multiplication
        return (np.r_[second_transform, padding_array] @ np.r_[first_transform, padding_array])[:2]

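    # Worked example (hedged): chaining a scaling followed by a translation in homogeneous
    # coordinates; applying the result equals applying first_transform, then second_transform:
    #     scale = np.array([[2., 0., 0.], [0., 2., 0.]])  # scale by 2
    #     shift = np.array([[1., 0., 3.], [0., 1., 4.]])  # translate by (3, 4)
    #     chained = DroneVisualization.chain_affine_transforms(scale, shift)
    #     chained[:, :2] @ np.array([1., 1.]) + chained[:, 2]  # -> array([5., 6.])
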
    @staticmethod
    def chain_perspective_transforms(first_transform: npt.NDArray, second_transform: npt.NDArray):
        if first_transform.shape[0] == 2:
            first_transform = np.r_[first_transform, np.array([[0, 0, 1]])]
        if second_transform.shape[0] == 2:
            second_transform = np.r_[second_transform, np.array([[0, 0, 1]])]
        return second_transform @ first_transform

    def chain_incrementing_transforms(self):
        self.chained_transforms = np.empty_like(self.incrementing_transforms)
        self.inverse_chained_transforms = np.empty_like(self.chained_transforms)
        with progressbar.ProgressBar(max_value=self.incrementing_transforms.shape[0]) as bar:
            for transform_i, transform in enumerate(self.total_transforms):
                self.chained_transforms[transform_i] = transform
                cv2.invertAffineTransform(transform, self.inverse_chained_transforms[transform_i])
                bar.update(bar.value + 1)
                if self.killer.kill_now is True:
                    self.killer.kill_now = False
                    break
        self.chained_transforms = self.chained_transforms[:transform_i + 1]
        self.inverse_chained_transforms = self.inverse_chained_transforms[:transform_i + 1]

    def find_initial_features(self):
        with av.open(self.input_video_path) as container:
            stream = container.streams.video[0]
            frame_generator = container.decode(stream)
            frame = next(frame_generator)
            image = frame.to_ndarray(format='bgr24')
            self.feature_selector = FeatureSelector(image=image, killer=self.killer, track_height=self.track_height,
                                                    track_width=self.track_width,
                                                    mode=FeatureSelectorMode.BOUNDARIES)
            self.feature_selector.show()
            self.initial_features = np.array(self.feature_selector.features)

    def find_initial_vehicle_mask(self):
        with av.open(self.input_video_path) as container:
            stream = container.streams.video[0]
            frame_generator = container.decode(stream)
            frame = next(frame_generator)
            image = frame.to_ndarray(format='bgr24')
            self.feature_selector = FeatureSelector(image=image, killer=self.killer, track_height=self.track_height,
                                                    track_width=self.track_width,
                                                    mode=FeatureSelectorMode.VEHICLE_MASK)
            self.feature_selector.show()
            self.initial_vehicle_mask = np.array(self.feature_selector.features, dtype=int)

    def find_initial_position(self):
        with av.open(self.input_video_path) as container:
            stream = container.streams.video[0]
            container.seek(offset=self.input_video_start_pts, stream=stream)
            frame_generator = container.decode(stream)
            frame = next(frame_generator)
            while frame.pts != self.input_video_start_pts:
                frame = next(frame_generator)
            image = frame.to_ndarray(format='bgr24')
            self.initial_pose_selector = FeatureSelector(image=image, killer=self.killer,
                                                         track_height=self.track_height,
                                                         track_width=self.track_width,
                                                         mode=FeatureSelectorMode.POSE_DEFINITION)
            self.initial_pose_selector.show()
            self.initial_position = np.array(self.initial_pose_selector.features)

    def find_offset(self):
        offset_finder = OffsetFinder(video_path=self.input_video_path, input_rosbag_path=self.input_rosbag_path,
                                     killer=self.killer)
        if self.offset is None or self.input_video_start_pts is None:
            offset_finder.find_video_start()
            self.input_video_start_pts = offset_finder.video_start_pts
        if self.input_video_start_index is None:
            self.find_input_video_start_index()
        if self.offset is None:
            offset_finder.find_rosbag_start()
            offset_finder.calculate_offset()
            if offset_finder.offset is not None:
                self.offset = offset_finder.offset

    def find_input_video_start_index(self):
        assert self.input_video_start_pts is not None
        if self.video_pts_and_time_to_index is not None:
            index = np.argwhere(self.video_pts_and_time_to_index[:, 0] == self.input_video_start_pts)
            if index.size != 0:
                self.input_video_start_index = index[0, 0]
                return
        with av.open(self.input_video_path) as container:
            stream = container.streams.video[0]
            container.seek(offset=0, stream=stream)
            frame_generator = container.decode(stream)
            frame = next(frame_generator)
            with progressbar.ProgressBar(max_value=stream.frames) as bar:
                bar.start()
                while frame.pts != self.input_video_start_pts:
                    frame = next(frame_generator)
                    bar.update(bar.value + 1)
                bar.finish(dirty=True)
            self.input_video_start_index = frame.index

    def find_start_and_end(self):
        if self.start + self.offset < 0:
            self.rosbag_start = self.input_rosbag_start - self.offset
            self.video_start = 0
        else:
            self.rosbag_start = self.input_rosbag_start + self.start
            self.video_start = self.start + self.offset
        with av.open(self.input_video_path) as container:
            stream = container.streams.video[0]
            input_video_length = float(stream.time_base * stream.duration)
        duration = min(value for value in [self.duration, input_video_length - self.video_start,
                                           self.input_rosbag.get_end_time() - self.rosbag_start]
                       if value is not None)
        self.rosbag_end = self.rosbag_start + duration
        self.video_end = self.video_start + duration

    def affine_transform_initial_features(self):
        assert self.total_transforms is not None
        self.transformed_features = np.empty((self.total_transforms.shape[0], 4, 2))
        with generate_progressbar(name='Transform initial features | ',
                                  data_length=self.total_transforms.shape[0]) as bar:
            for transform_i, transform in enumerate(self.total_transforms):
                for point_i, point in enumerate(self.initial_features):
                    self.transformed_features[transform_i][point_i] = transform[:, :2] @ point + transform[:, 2]
                bar.update(bar.value + 1)
        self.transformed_features = self.transformed_features[:transform_i + 1]

    def calculate_perspective_transform(self):
        assert self.transformed_features is not None
        cols = int(self.track_width * PIXEL_PER_METER)
        rows = int(self.track_height * PIXEL_PER_METER)
        pts1 = np.float32([[0, 0], [cols, 0], [cols, rows], [0, rows]])
        self.perspective_transforms = np.empty((self.transformed_features.shape[0], 3, 3))
        with generate_progressbar(name='Calculate perspective transform | ',
                                  data_length=self.transformed_features.shape[0]) as bar:
            for features_i, features in enumerate(self.transformed_features):
                self.perspective_transforms[features_i] = cv2.getPerspectiveTransform(pts1,
                                                                                      features.astype(np.float32))
                bar.update(bar.value + 1)
        self.perspective_transforms = self.perspective_transforms[:features_i + 1]

    def calculate_map_to_world_transform(self):
        assert self.initial_position is not None
        assert self.perspective_transforms is not None
        assert len(self.perspective_transforms) - 1 > self.input_video_start_index
        inverse_perspective_transform = np.linalg.inv(self.perspective_transforms[self.input_video_start_index])
        projected_initial_position = cv2.perspectiveTransform(
            src=self.initial_position.astype(np.float64)[np.newaxis], m=inverse_perspective_transform)[0]
        angle = np.arctan2(projected_initial_position[0, 1] - projected_initial_position[1, 1],
                           projected_initial_position[0, 0] - projected_initial_position[1, 0])
        rotation = np.array([[np.cos(angle), -np.sin(angle)],
                             [np.sin(angle), np.cos(angle)]])
        map_points = np.array([[0, 0], [0, 1], [-1, 0]], dtype=np.float32)
        projected_world_points = np.empty_like(map_points)
        projected_world_points[:] = projected_initial_position[0].copy()[np.newaxis]
        projected_world_points[1] += (rotation @ [0, PIXEL_PER_METER])
        projected_world_points[2] += (rotation @ [PIXEL_PER_METER, 0])
        self.map_to_projected_world_transform = cv2.getAffineTransform(map_points, projected_world_points)
        self.map_to_world_transforms = np.empty_like(self.perspective_transforms)
        for transform_i, transform in enumerate(self.perspective_transforms):
            self.map_to_world_transforms[transform_i] = self.chain_perspective_transforms(
                self.map_to_projected_world_transform, transform)
        self.map_to_world_transforms = self.map_to_world_transforms[:transform_i + 1]

    def transform_map_coordinated_to_world_coordinates(self, map_coordinates: npt.NDArray, frame_i: int):
        reduce = False
        if map_coordinates.ndim == 2:
            map_coordinates = map_coordinates[np.newaxis]
            reduce = True
        world_coordinates = cv2.perspectiveTransform(src=map_coordinates, m=self.map_to_world_transforms[frame_i])
        if reduce is True:
            return world_coordinates[0]
        return world_coordinates

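    # Note (hedged): cv2.perspectiveTransform expects a 3-D float array, so this wrapper
    # temporarily adds a leading axis for (N, 2) input and strips it again, i.e. passing
    # map_coordinates of shape (N, 2) returns world coordinates of shape (N, 2).
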
    def import_rosbag(self):
        data = []
        with generate_progressbar(name='Importing Rosbag | ',
                                  data_length=self.input_rosbag.get_message_count()) as bar:
            bar.start()
            perception_disabled = False
            for topic, msg, t in self.input_rosbag.read_messages():
                if t.to_sec() > 1667726358.271653935:
                    perception_disabled = True
                if t.to_sec() > self.rosbag_end + 2:
                    break
                if topic == '/slam/vehicle_pose':
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start, '/slam/vehicle_pose.pose',
                                 np.array([msg.x, msg.y, msg.psi])))
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start, '/slam/vehicle_pose.v_x',
                                 np.array([msg.v_x])))
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start,
                                 '/slam/vehicle_pose.pose.uncertainty', np.array(msg.P).reshape((4, 4))))
                elif topic == '/slam/vehicle_pose/main':
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start, '/slam/vehicle_pose/main.pose',
                                 np.array([msg.x, msg.y, msg.psi])))
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start, '/slam/vehicle_pose/main.v_x',
                                 np.array([msg.v_x])))
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start,
                                 '/slam/vehicle_pose/main.pose.uncertainty', np.array(msg.P).reshape((4, 4))))
                elif topic == '/slam/landmarks':
                    landmark_positions = np.array([(cone.x, cone.y, cone.id) for cone in msg.cones])
                    landmark_uncertainty = np.array([np.reshape(cone.covariance, (2, 2)) for cone in msg.cones])
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start, '/slam/landmarks',
                                 (landmark_positions, landmark_uncertainty)))
                elif topic == '/slam/disable_perception':
                    perception_disabled = msg.disable_perception
                elif topic == '/filtering/centerpoints':
                    header = msg.header if msg.image_header.stamp.to_sec() == 0 else msg.image_header
                    data.append((header.stamp.to_sec() - self.rosbag_start, '/filtering/centerpoints',
                                 (np.c_[msg.x, msg.y], msg.closed_track)))
                elif topic == '/motion_planning/trajectory':
                    header = (msg.header if perception_disabled is True or msg.image_header.stamp.to_sec() == 0
                              else msg.image_header)
                    data.append((header.stamp.to_sec() - self.rosbag_start, '/motion_planning/trajectory',
                                 (np.c_[msg.x, msg.y, msg.v], msg.closed)))
                elif topic == '/control/predicted_states':
                    header = (msg.header if perception_disabled is True or msg.image_header.stamp.to_sec() == 0
                              else msg.image_header)
                    data.append((header.stamp.to_sec() - self.rosbag_start, '/control/predicted_states.trajectory',
                                 np.c_[msg.x_pos, msg.y_pos, msg.v_x]))
                elif topic == '/slam/predicted_measurements/main':
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start,
                                 '/slam/predicted_measurements/main.gps_position',
                                 np.array([msg.gps_x[1], msg.gps_y[1], msg.gps_speed[1]])))
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start,
                                 '/slam/predicted_measurements/main.gps_position.uncertainty',
                                 np.array([msg.gps_x[2], msg.gps_y[2], msg.gps_speed[2]])))
                elif topic == '/control/torque/request':
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start, '/control/torque/request.torque',
                                 np.array([msg.torque])))
                elif topic == '/control/service_brake/request':
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start,
                                 '/control/service_brake/request.pressure', np.array([msg.pressure])))
                elif topic == '/can/per/imu/accel':
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start, '/can/per/imu/accel',
                                 np.array([msg.x, msg.y, msg.z])))
                elif topic == '/control/steering_wheel_angle/request':
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start,
                                 '/control/steering_wheel_angle/request.angle', msg.angle))
                elif topic == '/can/per/steering_wheel_angle/actual':
                    data.append((msg.header.stamp.to_sec() - self.rosbag_start,
                                 '/can/per/steering_wheel_angle/actual.angle', msg.angle))
                bar.update(bar.value + 1)
            bar.finish(dirty=True)
        self.rosbag_df = pd.DataFrame(data, columns=['time', 'name', 'data']).sort_values(by='time', ascending=True)

    def get_value_from_rosbag_df(self, time: float, name: str):
        if self.rosbag_df.empty:
            return None
        sliced = self.rosbag_df[(self.rosbag_df['time'].lt(time)) & (self.rosbag_df['name'] == name)]
        if sliced.shape[0] == 0:
            return None
        return sliced['data'].iloc[-1]

    def get_values_from_rosbag_df(self, name: str, start_time: Optional[float] = None,
                                  end_time: Optional[float] = None):
        if self.rosbag_df.empty:
            return None
        sliced = self.rosbag_df[self.rosbag_df['name'] == name]
        if start_time is not None:
            sliced = sliced[sliced['time'].ge(start_time)]
        if end_time is not None:
            sliced = sliced[sliced['time'].le(end_time)]
        if sliced.shape[0] == 0:
            return None
        return sliced['data']

    def visualize_feature(self, image: npt.NDArray, frame_i: int):
        for point in self.transformed_features[frame_i]:
            cv2.drawMarker(image, (int(point[0]), int(point[1])), (0, 0, 255), markerType=cv2.MARKER_TILTED_CROSS,
                           markerSize=40, thickness=2, line_type=cv2.LINE_AA)
        for point in self.initial_features:
            cv2.drawMarker(image, (int(point[0]), int(point[1])), (255, 0, 0), markerType=cv2.MARKER_CROSS,
                           markerSize=40, thickness=2, line_type=cv2.LINE_AA)

    def visualize_transform_points(self, image: npt.NDArray, frame_i: int):
        colors = [(255, 255, 0), (255, 0, 255)]
        for point_i, point in enumerate(self.transform_points[frame_i]):
            cv2.drawMarker(image, (int(point[0, 0]), int(point[0, 1])),
                           colors[self.transform_status[frame_i, point_i]], markerType=cv2.MARKER_STAR,
                           markerSize=20, thickness=1, line_type=cv2.LINE_AA)
        cv2.putText(img=image, text=f'{np.count_nonzero(self.transform_status[frame_i])}',
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX, thickness=2, lineType=cv2.LINE_AA, org=(200, 2000),
                    color=(0, 0, 255), fontScale=20)

    @staticmethod
    def rotate_and_move(xy: npt.NDArray, rotation: float = 0.,
                        offset_before_rotation: npt.NDArray = np.array([0, 0]),
                        offset_after_rotation: npt.NDArray = np.array([0, 0])) -> npt.NDArray:
        """
        Moves and rotates the given coordinates.

        First moves the coordinates, then rotates them around the origin and then moves them again,
        using a rotation matrix built with numpy.

        Parameters
        ----------
        xy : npt.NDArray
            Coordinates to move, rotate and move again.
        rotation : float, optional
            Rotation angle to rotate the coordinates around the origin, by default 0.
        offset_before_rotation : npt.NDArray, optional
            Offset to move the coordinates before rotating them, by default np.array([0, 0]).
        offset_after_rotation : npt.NDArray, optional
            Offset to move the coordinates after rotating them, by default np.array([0, 0]).

        Returns
        -------
        npt.NDArray
            Moved, rotated and moved coordinates.
        """
        c, s = np.cos(rotation), np.sin(rotation)
        rotation_matrix = np.array([[c, -s], [s, c]])
        points = np.einsum('ab, cb -> ca', rotation_matrix, xy + offset_before_rotation)
        points += offset_after_rotation
        return points

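    # Worked example (hedged): rotating the unit x-vector by 90 degrees, then shifting by (2, 0):
    #     rotate_and_move(xy=np.array([[1., 0.]]), rotation=np.pi / 2,
    #                     offset_after_rotation=np.array([2., 0.]))
    #     # -> approximately array([[2., 1.]])
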
    def visualize_fov_gate(self, image: npt.NDArray, frame_i: int, frame_t: float, add_to_legend: bool = False):
        color = (255, 0, 0)
        if add_to_legend is True:
            self.legend['Field of View Gate'] = color
        self.visualize_fov(image=image, frame_i=frame_i, frame_t=frame_t, fov_angle=np.radians(100),
                           fov_distance=10, color=color)

    def visualize_fov(self, image: npt.NDArray, frame_i: int, frame_t: float, fov_angle: float,
                      fov_distance: float, color: Tuple[int, int, int]):
        vehicle_pose = self.get_value_from_rosbag_df(time=frame_t, name='/slam/vehicle_pose.pose')
        if vehicle_pose is None:
            return
        point_n = 50
        path_points_map = np.empty((point_n + 1, 2))
        path_points_map[0] = [0, 0]
        for i, angle in enumerate(np.linspace(fov_angle / 2, -fov_angle / 2, point_n)):
            y = fov_distance * np.sin(angle)
            x = fov_distance * np.cos(angle)
            path_points_map[i + 1] = [x, y]
        rotated_path_points_map = self.rotate_and_move(xy=path_points_map, offset_after_rotation=vehicle_pose[:2],
                                                       rotation=vehicle_pose[2])
        path_points_world = self.transform_map_coordinated_to_world_coordinates(
            map_coordinates=rotated_path_points_map, frame_i=frame_i).astype(int)
        cv2.polylines(img=image, pts=path_points_world[np.newaxis], isClosed=True, color=color,
                      lineType=cv2.LINE_AA, thickness=3)

    def visualize_position_covariance(self, image: npt.NDArray, frame_i: int, color: Tuple[int, int, int],
                                      uncertainty: npt.NDArray, center: npt.NDArray):
        axes_size, angle = self.compute_eigenvalues_and_angle_of_covariance(uncertainty[:2, :2])
        ellipse_poly_map = cv2.ellipse2Poly(center=(center[:2] * PIXEL_PER_METER).astype(int),
                                            axes=(np.array(axes_size) * PIXEL_PER_METER).astype(int), delta=1,
                                            angle=int(np.degrees(angle)), arcStart=0, arcEnd=360) / PIXEL_PER_METER
        ellipse_poly_world = self.transform_map_coordinated_to_world_coordinates(
            map_coordinates=ellipse_poly_map[:, :2], frame_i=frame_i).astype(int)
        cv2.polylines(img=image, pts=ellipse_poly_world[np.newaxis], isClosed=True, thickness=3,
                      lineType=cv2.LINE_AA, color=color)
        image_copy = image.copy()
        cv2.fillPoly(img=image_copy, pts=ellipse_poly_world[np.newaxis], lineType=cv2.LINE_AA, color=color)
        cv2.addWeighted(src1=image, alpha=1 - ALPHA, src2=image_copy, beta=ALPHA, gamma=0, dst=image)

    def visualize_heading_covariance(self, image: npt.NDArray, frame_i: int, color: Tuple[int, int, int],
                                     pose: npt.NDArray, uncertainty: float):
        points_n = 50
        position_map = np.empty((points_n + 1, 2))
        position_map[:] = pose[np.newaxis, :2]
        angles = pose[2] + np.linspace(start=-uncertainty, stop=uncertainty, num=points_n)
        position_map[1:, 0] += np.cos(angles)
        position_map[1:, 1] += np.sin(angles)
        position_world = self.transform_map_coordinated_to_world_coordinates(
            map_coordinates=position_map, frame_i=frame_i).astype(int)
        cv2.polylines(img=image, pts=position_world[np.newaxis], isClosed=True, thickness=3, lineType=cv2.LINE_AA,
                      color=color)
        image_copy = image.copy()
        cv2.fillPoly(img=image_copy, pts=position_world[np.newaxis], lineType=cv2.LINE_AA, color=color)
        cv2.addWeighted(src1=image, alpha=1 - ALPHA, src2=image_copy, beta=ALPHA, gamma=0, dst=image)

    def visualize_vehicle_position(self, image: npt.NDArray, frame_i: int, frame_t: float,
                                   uncertainty: bool = False, add_to_legend: bool = False):
        assert not self.rosbag_df.empty
        color = (0, 0, 255)
        vehicle_pose_map = self.get_value_from_rosbag_df(time=frame_t, name='/slam/vehicle_pose.pose')
        if vehicle_pose_map is None:
            return
        if add_to_legend is True:
            self.legend['Vehicle Position'] = color
        vehicle_position_map = np.array([vehicle_pose_map[:2], vehicle_pose_map[:2]])
        vehicle_position_map[1] += [np.cos(vehicle_pose_map[2]) * 3, np.sin(vehicle_pose_map[2]) * 3]
        vehicle_position_world = self.transform_map_coordinated_to_world_coordinates(
            map_coordinates=vehicle_position_map, frame_i=frame_i).astype(int)
        cv2.arrowedLine(img=image, pt1=vehicle_position_world[0], pt2=vehicle_position_world[1],
                        color=(0, 0, 255), line_type=cv2.LINE_AA, thickness=3)
        if uncertainty is True:
            vehicle_uncertainty_map = self.get_value_from_rosbag_df(time=frame_t,
                                                                    name='/slam/vehicle_pose.pose.uncertainty')
            if vehicle_uncertainty_map is None:
                return
            self.visualize_position_covariance(image=image, frame_i=frame_i, center=vehicle_position_map[0],
                                               uncertainty=vehicle_uncertainty_map, color=color)
            self.visualize_heading_covariance(image=image, frame_i=frame_i, pose=vehicle_pose_map,
                                              uncertainty=vehicle_uncertainty_map[3, 3], color=color)

    def compute_eigenvalues_and_angle_of_covariance(self, covariance: npt.NDArray):
        def eigsorted(cov):
            vals, vecs = np.linalg.eigh(cov)
            order = vals.argsort()[::-1]
            return vals[order], vecs[:, order]

        vals, vecs = eigsorted(covariance)
        theta = np.arctan2(*vecs[:, 0][::-1])
        w, h = 2 * N_STDDEV * np.sqrt(vals)
        return (w, h), theta

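    # Worked example (hedged): for covariance diag(4, 1) and N_STDDEV = 1, the eigenvalues
    # sort to (4, 1), so the ellipse axes are (2 * sqrt(4), 2 * sqrt(1)) = (4, 2) and the
    # returned angle is 0 because the dominant eigenvector points along x.
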
    def visualize_landmarks(self, image: npt.NDArray, frame_i: int, frame_t: float, uncertainty: bool = False):
        assert not self.rosbag_df.empty
        landmark_info = self.get_value_from_rosbag_df(time=frame_t, name='/slam/landmarks')
        if landmark_info is None:
            return
        landmarks_map = landmark_info[0]
        landmarks_world = self.transform_map_coordinated_to_world_coordinates(
            map_coordinates=landmarks_map[:, :2], frame_i=frame_i).astype(int)
        for landmark_i, landmark in enumerate(landmarks_world):
            cv2.circle(img=image, center=landmark[:2], color=self._cone_colors[int(landmarks_map[landmark_i, 2])],
                       radius=5, thickness=-1, lineType=cv2.LINE_AA)
        if uncertainty is False:
            return
        landmarks_uncertainty_map = landmark_info[1]
        for landmark_i, (landmark, landmark_uncertainty) in enumerate(zip(landmarks_map,
                                                                          landmarks_uncertainty_map)):
            self.visualize_position_covariance(image=image, frame_i=frame_i, center=landmark[:2],
                                               uncertainty=landmark_uncertainty,
                                               color=self._cone_colors[int(landmarks_map[landmark_i, 2])])

    def visualize_centerpoints(self, image: npt.NDArray, frame_i: int, frame_t: float,
                               add_to_legend: bool = False):
        assert not self.rosbag_df.empty
        color = (0, 255, 0)
        centerpoints_info = self.get_value_from_rosbag_df(time=frame_t, name='/filtering/centerpoints')
        if centerpoints_info is None:
            return
        if add_to_legend is True:
            self.legend['Centerpoints'] = color
        centerpoints_map, closed_track = centerpoints_info
        centerpoints_world = self.transform_map_coordinated_to_world_coordinates(
            map_coordinates=centerpoints_map, frame_i=frame_i).astype(int)
        cv2.polylines(img=image, pts=centerpoints_world[np.newaxis], isClosed=closed_track, thickness=3,
                      lineType=cv2.LINE_AA, color=color)

    def visualize_planned_path(self, image: npt.NDArray, frame_i: int, frame_t: float,
                               add_to_legend: bool = False):
        assert not self.rosbag_df.empty
        color = (232, 52, 235)
        trajectory_info = self.get_value_from_rosbag_df(time=frame_t, name='/motion_planning/trajectory')
        if trajectory_info is None:
            return
        if add_to_legend is True:
            self.legend['Planned path'] = color
        trajectory_map, closed = trajectory_info
        trajectory_world = self.transform_map_coordinated_to_world_coordinates(
            map_coordinates=trajectory_map[:, :2], frame_i=frame_i).astype(int)
        cv2.polylines(img=image, pts=trajectory_world[np.newaxis], isClosed=closed, thickness=3,
                      lineType=cv2.LINE_AA, color=color)

    def visualize_predicted_path(self, image: npt.NDArray, frame_i: int, frame_t: float,
                                 add_to_legend: bool = False):
        assert not self.rosbag_df.empty
        color = (255, 255, 0)
        predicted_trajectory_map = self.get_value_from_rosbag_df(time=frame_t,
                                                                 name='/control/predicted_states.trajectory')
        if predicted_trajectory_map is None:
            return
        if add_to_legend is True:
            self.legend['Predicted path'] = color
        trajectory_world = self.transform_map_coordinated_to_world_coordinates(
            map_coordinates=predicted_trajectory_map[:, :2], frame_i=frame_i).astype(int)
        cv2.polylines(img=image, pts=trajectory_world[np.newaxis], isClosed=False, thickness=3,
                      lineType=cv2.LINE_AA, color=color)

    def visualize_driven_path(self, image: npt.NDArray, frame_i: int, frame_t: float,
                              add_to_legend: bool = False):
        assert not self.rosbag_df.empty
        color = (255, 0, 128)
        vehicle_poses_map = self.get_values_from_rosbag_df(name='/slam/vehicle_pose.pose', start_time=0,
                                                           end_time=frame_t)
        if vehicle_poses_map is None:
            return
        if add_to_legend is True:
            self.legend['Driven path'] = color
        vehicle_poses_world = self.transform_map_coordinated_to_world_coordinates(
            map_coordinates=np.stack(vehicle_poses_map.to_numpy(), axis=0)[:, :2], frame_i=frame_i).astype(int)
        cv2.polylines(img=image, pts=vehicle_poses_world[np.newaxis], isClosed=False, thickness=3,
                      lineType=cv2.LINE_AA, color=color)

    def visualize_will_have_driven_path(self, image: npt.NDArray, frame_i: int, frame_t: float,
                                        add_to_legend: bool = False):
        assert not self.rosbag_df.empty
        color = (52, 131, 235)
        vehicle_poses_map = self.get_values_from_rosbag_df(name='/slam/vehicle_pose.pose', start_time=frame_t,
                                                           end_time=frame_t + CONTROL_TIME_WINDOW)
        if vehicle_poses_map is None:
            return
        if add_to_legend is True:
            self.legend['Future driven path'] = color
        vehicle_poses_world = self.transform_map_coordinated_to_world_coordinates(
            map_coordinates=np.stack(vehicle_poses_map.to_numpy(), axis=0)[:, :2], frame_i=frame_i).astype(int)
        cv2.polylines(img=image, pts=vehicle_poses_world[np.newaxis], isClosed=False, thickness=3,
                      lineType=cv2.LINE_AA, color=color)

    def visualize_gps_measurement(self, image: npt.NDArray, frame_i: int, frame_t: float, uncertainty: bool,
                                  add_to_legend: bool = False):
        color = (0, 128, 255)
        assert not self.rosbag_df.empty
        gps_measurement_map = self.get_value_from_rosbag_df(
            name='/slam/predicted_measurements/main.gps_position', time=frame_t)
        if gps_measurement_map is None or np.any(np.isnan(gps_measurement_map)):
            return
        if add_to_legend is True:
            self.legend['GPS Measurement'] = color
        gps_measurement_world = self.transform_map_coordinated_to_world_coordinates(
            map_coordinates=gps_measurement_map[np.newaxis, :2], frame_i=frame_i).astype(int)
        cv2.drawMarker(img=image, position=gps_measurement_world[0], color=color, markerType=cv2.MARKER_CROSS,
                       markerSize=40, thickness=3, line_type=cv2.LINE_AA)
        if uncertainty is True:
            gps_uncertainty_map = self.get_value_from_rosbag_df(
                time=frame_t, name='/slam/predicted_measurements/main.gps_position.uncertainty')
            if gps_uncertainty_map is None:
                return
            covariance = np.square(np.array([[gps_uncertainty_map[0], 0], [0, gps_uncertainty_map[1]]]))
            self.visualize_position_covariance(image=image, frame_i=frame_i, center=gps_measurement_map[:2],
                                               color=color, uncertainty=covariance)

    def visualize_speed(self, image: npt.NDArray, frame_t: float, origin: Tuple[int, int]):
        color = (0, 128, 255)
        origin = np.array((250, 250))  # note: the origin parameter is currently overridden here
        radius = 200
        font_scale = 3
        alpha = 0.6
        assert not self.rosbag_df.empty
        vehicle_velocity_map = self.get_value_from_rosbag_df(time=frame_t, name='/slam/vehicle_pose.v_x')
        if vehicle_velocity_map is None:
            return
        vehicle_velocity = max(vehicle_velocity_map[0] * 3.6, 0)  # m/s to km/h, clamped at zero
        image_copy = image.copy()
        middle_angle = int(vehicle_velocity / MAX_VELOCITY * 180 + 180)
        cv2.ellipse(img=image, center=origin, axes=(radius, radius), angle=0, startAngle=middle_angle,
                    endAngle=360, color=(0, 0, 0), thickness=6, lineType=cv2.LINE_AA)
        cv2.addWeighted(src1=image, alpha=alpha, src2=image_copy, beta=1 - alpha, gamma=0, dst=image)
        cv2.ellipse(img=image, center=origin, axes=(radius, radius), angle=0, startAngle=180,
                    endAngle=middle_angle, color=(255, 255, 255), thickness=6, lineType=cv2.LINE_AA)
        if vehicle_velocity < 10:
            text = f'{vehicle_velocity:.2f}'
        elif vehicle_velocity < 100:
            text = f'{vehicle_velocity:.1f}'
        else:
            text = f'{int(vehicle_velocity)}'
        (font_width2, font_height2), _ = cv2.getTextSize(text='km/h', fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                                                         fontScale=1, thickness=2)
        cv2.putText(img=image, text='km/h', fontFace=cv2.FONT_HERSHEY_SIMPLEX, thickness=2, lineType=cv2.LINE_AA,
                    org=(origin - (font_width2, 0)), color=color, fontScale=1)
        cv2.putText(img=image, text=text, fontFace=cv2.FONT_HERSHEY_SIMPLEX, thickness=2, lineType=cv2.LINE_AA,
                    org=(origin - (font_width2, font_height2 + 20)), color=color, fontScale=font_scale)

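    # Worked example (hedged): the speed gauge maps [0, MAX_VELOCITY] km/h onto the arc
    # [180, 360] degrees. At 50 km/h with MAX_VELOCITY = 100:
    #     middle_angle = int(50 / 100 * 180 + 180) = 270
    # so the white (filled) arc spans 180..270 and the dark remainder 270..360.
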
    def draw_rounded_rectangle(self, image: npt.NDArray, origin: Tuple[int, int], size: Tuple[int, int],
                               radius: int, color: Tuple[int, int, int], alpha: float):
        image_copy = image.copy()
        cv2.rectangle(img=image, color=color, thickness=-1, pt1=origin + (radius, 0),
                      pt2=origin + size - (radius, 0))
        cv2.rectangle(img=image, color=color, thickness=-1, pt1=origin + (0, radius),
                      pt2=origin + size - (0, radius))
        cv2.ellipse(img=image, center=origin + (radius, radius), axes=(radius, radius), angle=180.0, startAngle=0,
                    endAngle=90, color=color, thickness=-1, lineType=cv2.LINE_AA)
        cv2.ellipse(img=image, center=origin + (size[0], 0) + (-radius, radius), axes=(radius, radius),
                    angle=270.0, startAngle=0, endAngle=90, color=color, thickness=-1, lineType=cv2.LINE_AA)
        cv2.ellipse(img=image, center=origin + size - (radius, radius), axes=(radius, radius), angle=0.0,
                    startAngle=0, endAngle=90, color=color, thickness=-1, lineType=cv2.LINE_AA)
        cv2.ellipse(img=image, center=origin + (0, size[1]) + (radius, -radius), axes=(radius, radius), angle=90.0,
                    startAngle=0, endAngle=90, color=color, thickness=-1, lineType=cv2.LINE_AA)
        cv2.addWeighted(src1=image, alpha=alpha, src2=image_copy, beta=1 - alpha, gamma=0, dst=image)

    def visualize_torque(self, image: npt.NDArray, frame_t: float, origin: Tuple[int, int], height: int):
        assert not self.rosbag_df.empty
        torque = self.get_value_from_rosbag_df(time=frame_t, name='/control/torque/request.torque')
        self.draw_rounded_rectangle(image=image, origin=origin, size=(30, height), radius=5, color=(0, 0, 0),
                                    alpha=0.5)
        if torque is None or torque == 0:
            return
        torque = max(torque[0], 0)
        middle_height = int(height - torque / MAX_TORQUE * height)
        self.draw_rounded_rectangle(image=image, origin=origin + (0, middle_height),
                                    size=(30, height - middle_height), radius=5, color=(0, 255, 0), alpha=0.5)

    def visualize_brake(self, image: npt.NDArray, frame_t: float, origin: Tuple[int, int], height: int):
        assert not self.rosbag_df.empty
        pneumatic_pressure = self.get_value_from_rosbag_df(time=frame_t,
                                                           name='/control/service_brake/request.pressure')
        self.draw_rounded_rectangle(image=image, origin=origin, size=(30, height), radius=5, color=(0, 0, 0),
                                    alpha=0.5)
        if pneumatic_pressure is None or pneumatic_pressure == 0:
            return
        pneumatic_pressure = max(pneumatic_pressure[0], 0)
        middle_height = int(height - pneumatic_pressure / MAX_PNEUMATIC_PRESSURE * height)
        self.draw_rounded_rectangle(image=image, origin=origin + (0, middle_height),
                                    size=(30, height - middle_height), radius=5, color=(0, 0, 255), alpha=0.5)

    def visualize_acceleration(self, image: npt.NDArray, frame_t: float, origin: Tuple[int, int], radius: int):
        assert not self.rosbag_df.empty
        acceleration = self.get_value_from_rosbag_df(time=frame_t, name='/can/per/imu/accel')
        if acceleration is None:
            return
        axes = np.array((radius, radius))
        alpha = 0.4
        image_copy = image.copy()
        acc_x = acceleration[0]
        acc_y = acceleration[1]
        acc_offset = np.array((acc_y, -acc_x)) / MAX_ACCELERATION
        cv2.ellipse(img=image, center=origin + axes, axes=(0.25 * axes).astype(int), angle=0, startAngle=0,
                    endAngle=360, color=(255, 255, 255), thickness=3, lineType=cv2.LINE_AA)
        cv2.ellipse(img=image, center=origin + axes, axes=(0.75 * axes).astype(int), angle=0, startAngle=0,
                    endAngle=360, color=(255, 255, 255), thickness=3, lineType=cv2.LINE_AA)
        cv2.addWeighted(src1=image, alpha=alpha, src2=image_copy, beta=1 - alpha, gamma=0, dst=image)
        cv2.ellipse(img=image, center=origin + axes, axes=(0.5 * axes).astype(int), angle=0, startAngle=0,
                    endAngle=360, color=(255, 255, 255), thickness=3, lineType=cv2.LINE_AA)
        cv2.ellipse(img=image, center=origin + axes, axes=axes.astype(int), angle=0, startAngle=0, endAngle=360,
                    color=(255, 255, 255), thickness=3, lineType=cv2.LINE_AA)
        cv2.line(img=image, pt1=origin + (0, radius), pt2=origin + (2 * radius, radius), color=(255, 255, 255),
                 thickness=3, lineType=cv2.LINE_AA)
        cv2.line(img=image, pt1=origin + (radius, 0), pt2=origin + (radius, 2 * radius), color=(255, 255, 255),
                 thickness=3, lineType=cv2.LINE_AA)
        cv2.circle(img=image, center=(origin + axes + acc_offset * axes).astype(int), radius=6, color=(255, 0, 0),
                   thickness=-1, lineType=cv2.LINE_AA)
        cv2.line(img=image, pt1=origin + axes, pt2=(origin + axes + acc_offset * axes).astype(int),
                 color=(255, 0, 0), thickness=3, lineType=cv2.LINE_AA)

    def visualize_steering_wheel_angle(self, image: npt.NDArray, frame_t: float, origin: Tuple[int, int],
                                       height: int):
        assert not self.rosbag_df.empty
        steering_wheel_request = self.get_value_from_rosbag_df(
            time=frame_t, name='/control/steering_wheel_angle/request.angle')
        steering_wheel_actual = self.get_value_from_rosbag_df(
            time=frame_t, name='/can/per/steering_wheel_angle/actual.angle')
        if steering_wheel_request is None and steering_wheel_actual is None:
            return
        image_copy = image.copy()
        alpha = 0.4
        original_ellipse = cv2.ellipse2Poly(center=(500, 500), axes=(500, 500), angle=-90, arcStart=-115,
                                            arcEnd=115, delta=1)
        original_ellipse_height = abs(min(original_ellipse[:, 1]) - max(original_ellipse[:, 1]))
        ellipse = (0.8 * (original_ellipse / (original_ellipse_height / height))
                   + (origin + (0.1 * height, 0.1 * height)))
        center = np.array((ellipse[np.argmin(original_ellipse[:, 1])][0],
                           ellipse[np.argmin(original_ellipse[:, 0])][1]))
        radius = int(abs(center[0] - origin[0]))
        pt2 = center + (radius * np.cos(np.radians(90)), -radius * np.sin(np.radians(90)))
        cv2.line(img=image, pt1=center.astype(int), pt2=pt2.astype(int), thickness=3, lineType=cv2.LINE_AA,
                 color=(255, 0, 0))
        cv2.addWeighted(src1=image, alpha=alpha, src2=image_copy, beta=1 - alpha, gamma=0, dst=image)
        cv2.polylines(img=image, pts=(ellipse[np.newaxis] * 8).astype(int), isClosed=False, color=(255, 255, 255),
                      thickness=3, lineType=cv2.LINE_AA, shift=3)
        if steering_wheel_request is not None and steering_wheel_actual is not None:
            original_error_ellipse = cv2.ellipse2Poly(center=(500, 500), axes=(500, 500), angle=-90,
                                                      arcStart=-int(steering_wheel_request),
                                                      arcEnd=-int(steering_wheel_actual), delta=1)
            error_ellipse = (0.8 * (original_error_ellipse / (original_ellipse_height / height))
                             + (origin + (0.1 * height, 0.1 * height)))
            cv2.polylines(img=image, pts=(error_ellipse[np.newaxis] * 8).astype(int), isClosed=False,
                          color=(0, 0, 255), thickness=3, lineType=cv2.LINE_AA, shift=3)
        if steering_wheel_actual is not None:
            pt2 = center + (radius * np.cos(np.radians(steering_wheel_actual + 90)),
                            -radius * np.sin(np.radians(steering_wheel_actual + 90)))
            cv2.line(img=image, pt1=center.astype(int), pt2=pt2.astype(int), thickness=3, lineType=cv2.LINE_AA,
                     color=(255, 0, 0))
        if steering_wheel_request is not None:
            pt2 = center + (radius * np.cos(np.radians(steering_wheel_request + 90)),
                            -radius * np.sin(np.radians(steering_wheel_request + 90)))
            cv2.line(img=image, pt1=center.astype(int), pt2=pt2.astype(int), thickness=3, lineType=cv2.LINE_AA,
                     color=(0, 255, 0))

    def visualize_overlay(self, image: npt.NDArray, frame_t: float):
        self.draw_rounded_rectangle(image=image, origin=np.array((25, 25)), size=(1150, 250), radius=25,
                                    color=(0, 0, 0), alpha=0.5)
        self.visualize_speed(image=image, frame_t=frame_t, origin=np.array((250, 250)))
        self.visualize_torque(image=image, frame_t=frame_t, origin=np.array((500, 50)), height=205)
        self.visualize_brake(image=image, frame_t=frame_t, origin=np.array((550, 50)), height=205)
        self.visualize_acceleration(image=image, frame_t=frame_t, origin=np.array((650, 50)), radius=100)
        self.visualize_steering_wheel_angle(image=image, frame_t=frame_t, origin=np.array((900, 50)), height=200)

    def visualize_legend(self, image: npt.NDArray):
        org = np.array([image.shape[1] - 50, image.shape[0] - 50])
        total_height = 0
        max_width = 0
        for text, color in self.legend.items():
            (font_width, font_height), _ = cv2.getTextSize(text=text, fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                                                           fontScale=LEGEND_TEXT_SCALE, thickness=2)
            total_height += font_height + LEGEND_TEXT_DISTANCE
            max_width = max(max_width, font_width)
        size = (max_width + 50, total_height + 50)
        self.draw_rounded_rectangle(image=image, origin=org + (25, 25) - size, size=size, radius=25,
                                    color=(0, 0, 0), alpha=0.5)
        for text, color in self.legend.items():
            (font_width, font_height), _ = cv2.getTextSize(text=text, fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                                                           fontScale=LEGEND_TEXT_SCALE, thickness=2)
            cv2.putText(img=image, text=text, fontFace=cv2.FONT_HERSHEY_SIMPLEX, thickness=2, lineType=cv2.LINE_AA,
                        org=org - (font_width, 0), color=color, fontScale=LEGEND_TEXT_SCALE)
            org -= (0, font_height + LEGEND_TEXT_DISTANCE)

    def visualize_image(self, image: npt.NDArray, frame_i: int, frame_t: float):
        # self.visualize_feature(image=image, frame_i=frame_i)  # noqa: E800
        # self.visualize_transform_points(image=image, frame_i=frame_i)  # noqa: E800
        if self.visualize_only_debug_information is True:
            return
        self.visualize_vehicle_position(image=image, frame_i=frame_i, frame_t=frame_t, uncertainty=True,
                                        add_to_legend=True)
        # self.visualize_fov_gate(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True)  # noqa: E800
        self.visualize_landmarks(image=image, frame_i=frame_i, frame_t=frame_t, uncertainty=False)
        self.visualize_centerpoints(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True)
        self.visualize_planned_path(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True)
        self.visualize_predicted_path(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True)
        self.visualize_driven_path(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True)
        self.visualize_will_have_driven_path(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True)
        # self.visualize_gps_measurement(image=image, frame_i=frame_i, frame_t=frame_t, uncertainty=True)  # noqa: E800
        self.visualize_legend(image=image)
        self.visualize_overlay(image=image, frame_t=frame_t)

    def generate_output(self):
        max_value = np.argwhere((self.video_pts_and_time_to_index[:, 1] >= self.video_start)
                                & (self.video_pts_and_time_to_index[:, 1] <= self.video_end)).size
        variable_widgets = [progressbar.Variable('input_time', format='{name}: {formatted_value}'), ' ',
                            progressbar.Variable('output_time', format='{name}: {formatted_value}'), ' ']
        first = True
        with av.open(self.input_video_path) as input_container, \
                av.open(f'{self.input_video_path.rsplit(".")[0]}{self.output_suffix}.mp4',
                        mode='w') as output_container, \
                generate_progressbar(name='Generating output video | ', data_length=max_value,
                                     variable_widgets=variable_widgets) as bar:
            input_stream = input_container.streams.video[0]
            input_container.seek(offset=int(self.video_start / input_stream.time_base), stream=input_stream)
            output_stream = output_container.add_stream('h264', rate=input_stream.base_rate)
            output_stream.pix_fmt = 'yuv420p'
            output_stream.codec_context.time_base = Fraction(1, 30000)
            output_stream.height = input_stream.height
            output_stream.width = input_stream.width
            output_stream.bit_rate = input_stream.bit_rate
            start_pts = None
            artificial_video_end = self.video_pts_and_time_to_index[-1, 1]
            for input_frame in input_container.decode(input_stream):
                if input_frame.time < self.video_start:
                    continue
                if input_frame.time > self.video_end or input_frame.time > artificial_video_end:
                    break
                if start_pts is None:
                    start_pts = input_frame.pts
                if first is True:
                    first = False
                frame_time = input_frame.time - self.video_start
                frame_i = np.argwhere(self.video_pts_and_time_to_index[:, 0] == input_frame.pts)[0, 0]
                if self.killer.kill_now is True:
                    self.killer.kill_now = False
                    break
                image = input_frame.to_ndarray(format='bgr24')
                # debug dump of the current raw frame (np.save appends '.npy' to the given name)
                np.save(f'{self.input_video_path.rsplit(".")[0]}{self.output_suffix}.JPG', image)
                self.visualize_image(image=image, frame_i=frame_i, frame_t=frame_time)
                output_frame = av.VideoFrame.from_ndarray(image, format='bgr24')
                output_frame.pts = input_frame.pts - start_pts
                for packet in output_stream.encode(output_frame):
                    output_container.mux(packet)
                bar.update(bar.value + 1, input_time=input_frame.time, output_time=frame_time)
                if input_frame.index >= self.transformed_features.shape[0] - 1:
                    break
            for packet in output_stream.encode():
                output_container.mux(packet)
            bar.finish(dirty=True)

if __name__ == '__main__':
    drone_visualization = DroneVisualization(input_rosbag='/workspace/as_ros/rosbags/run_370_sim_ni.bag',
                                             input_video='/workspace/as_ros/rosbags/videos/run_370.MP4',
                                             recalculate_transforms=False, killer=GracefulKiller(),
                                             no_new_transforms=False, save_recovery=True, track_height=30.5,
                                             track_width=60, recover_initial_features=True, recover_offset=True,
                                             recover_initial_position=True,
                                             # the two arguments below are required by __init__ but were missing
                                             # from this call; the values given here are assumptions:
                                             recover_initial_vehicle_mask=True,
                                             visualize_only_debug_information=False,
                                             start=5, duration=5, end=None, output='_5')
    drone_visualization.run()