import json
import os
from typing import Optional, Tuple, List
from pathlib import Path
import progressbar
import signal
from enum import Enum
from fractions import Fraction
import click
from matplotlib.lines import Line2D
from matplotlib.backend_bases import PickEvent
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import pandas as pd
import cv2
import av
import av.datasets
import rosbag
PIXEL_PER_METER = 1000  # rasterisation scale used when building covariance-ellipse polygons
N_STDDEV = 1  # number of standard deviations covered by uncertainty ellipses
ALPHA = 0.2  # opacity of filled uncertainty overlays
LEGEND_TEXT_SCALE = 3  # font scale of legend entries
LEGEND_TEXT_DISTANCE = 15  # vertical spacing between legend entries, in pixels
MAX_PNEUMATIC_PRESSURE = 3  # full scale of the brake-pressure bar (presumably bar)
MAX_VELOCITY = 100  # full scale of the speed gauge, in km/h
MAX_TORQUE = 280  # full scale of the torque bar (presumably Nm)
MAX_ACCELERATION = 2 * 9.81  # full scale of the acceleration diagram, in m/s^2 (2 g)
MAX_STEERING_WHEEL_ANGLE = 115  # steering wheel angle limit in degrees (matches the +/-115 deg arc drawn below)
CONTROL_TIME_WINDOW = 1.5  # seconds of future driven path to visualize
def generate_progressbar(name: str, data_length: int,
variable_widgets: List[progressbar.Variable] = []) -> progressbar.ProgressBar:
"""
Generates a progressbar object with the given data length with more information than the standard one.
Parameters
----------
data_length : int
Length of data points to process.
Returns
-------
progressbar.ProgressBar
Progressbar object with the given data length with more information than the standard one.
"""
widgets1 = [
name,
progressbar.Percentage(), ' ',
progressbar.SimpleProgress(), ' ',
progressbar.Bar(), ' ']
# progressbar.Variable('prediction_step'), ' ', # noqa: E800
widgets2 = [
progressbar.Timer(), ' ',
progressbar.ETA()
]
widgets = widgets1 + variable_widgets + widgets2
return progressbar.ProgressBar(max_value=data_length, widgets=widgets, redirect_stdout=True)
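# Usage sketch (hypothetical loop and length), mirroring how import_rosbag and
# generate_output below drive the bar:
#     with generate_progressbar(name='Demo | ', data_length=1000) as bar:
#         for _ in range(1000):
#             bar.update(bar.value + 1)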
class GracefulKiller:
kill_now = False
def __init__(self):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, *args):
self.kill_now = True
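# Usage sketch: poll `kill_now` inside long-running loops so SIGINT/SIGTERM end the loop
# cleanly instead of interrupting it, e.g. (do_work is a placeholder):
#     killer = GracefulKiller()
#     while not killer.kill_now:
#         do_work()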
class OffsetFinder():
def __init__(self, video_path: str, input_rosbag_path: str, killer: GracefulKiller) -> None:
self.video_path = video_path
self.input_rosbag_path = input_rosbag_path
self.rosbag_start_time = None
self.video_start_time = None
self.offset = None
self.video_start_pts = None
self.video_start_index = None
self.killer = killer
def find_image_i(self, image_i: int, key: int):
# key codes as returned by cv2.waitKey on this backend: 81/83 = left/right arrow, 85/86 = page up/down
if key == 81:
image_i = max(0, image_i - 1)
elif key == 83:
image_i = max(0, image_i + 1)
elif key == 85:
image_i = max(0, image_i + 5)
elif key == 86:
image_i = max(0, image_i - 5)
return image_i
def calculate_offset(self):
if self.video_start_time is not None and self.rosbag_start_time is not None:
# offset maps a time relative to the rosbag start to video time: video_t = rosbag_relative_t + offset
self.offset = self.video_start_time - self.rosbag_start_time
def find_video_start(self):
cv2.namedWindow('offset_finder_window', cv2.WINDOW_NORMAL)
with av.open(self.video_path) as container:
key_stream = container.streams.video[0]
key_stream.codec_context.skip_frame = "NONKEY"
stream = container.streams.video[0]
key_frame_generator = container.decode(key_stream)
key_images = []
key_frame_pts = []
key_image_i = 0
while self.killer.kill_now is False:
while key_image_i + 1 > len(key_images):
frame = next(key_frame_generator)
key_images.append(frame.to_ndarray(format='bgr24'))
key_frame_pts.append(frame.pts)
cv2.imshow('offset_finder_window', key_images[key_image_i])
key = cv2.waitKey()
if key == ord('\r'):
break
elif key is not None:
key_image_i = self.find_image_i(image_i=key_image_i, key=key)
stream.codec_context.skip_frame = "DEFAULT"
assert key_image_i > 0
container.seek(offset=key_frame_pts[key_image_i - 1], stream=stream)
image_i = 0
images = []
frame_pts = []
frame_generator = container.decode(stream)
while self.killer.kill_now is False:
while image_i + 1 > len(images):
frame = next(frame_generator)
images.append(frame.to_ndarray(format='bgr24'))
frame_pts.append(frame.pts)
cv2.imshow('offset_finder_window', images[image_i])
key = cv2.waitKey()
if key == ord('\r'):
break
elif key is not None:
image_i = self.find_image_i(image_i=image_i, key=key)
if self.killer.kill_now is False:
assert image_i > 0
self.video_start_time = frame.time
self.video_start_pts = frame.pts
self.video_start_index = frame.index
cv2.destroyWindow('offset_finder_window')
def select_rosbag_start_time(self, event: PickEvent):
if isinstance(event.artist, Line2D):
x, y = event.mouseevent.xdata, event.mouseevent.ydata
self.ax.scatter(x, y, color='r', s=500, marker='x')
self.fig.canvas.draw()
self.rosbag_start_time = x
def find_rosbag_start(self):
if self.killer.kill_now is True:
return
bag = rosbag.Bag(self.input_rosbag_path)
input_rosbag_start = bag.get_start_time()
first_movement: Optional[float] = None
messages = []
with progressbar.ProgressBar(max_value=bag.get_message_count()) as bar:
bar.start()
for topic, msg, t in bag.read_messages():
if first_movement is not None and t.to_sec() > first_movement + 1:
break
if topic == '/can/log/wheelspeed/rear':
if first_movement is None and (abs(msg.right) > 100 or abs(msg.left) > 100):
first_movement = t.to_sec()
messages.append((t.to_sec(), 'wheelspeed_rear_right', msg.right))
messages.append((t.to_sec(), 'wheelspeed_rear_left', msg.left))
if topic == '/can/per/wheelspeed/front':
if first_movement is None and (abs(msg.right) > 100 or abs(msg.left) > 100):
first_movement = t.to_sec()
messages.append((t.to_sec(), 'wheelspeed_front_right', msg.right))
messages.append((t.to_sec(), 'wheelspeed_front_left', msg.left))
bar.update(bar.value + 1)
assert first_movement is not None
dataframe = pd.DataFrame(messages, columns=['time', 'topic', 'data'])
dataframe['time'] -= input_rosbag_start
dataframe = dataframe[dataframe['time'] > first_movement - input_rosbag_start - 1]
self.fig = plt.figure()
self.ax = self.fig.add_subplot(111)
self.ax.set_title('Pick point of first movement, afterwards close window', picker=True)
topics = ['wheelspeed_rear_right', 'wheelspeed_rear_left',
'wheelspeed_front_right', 'wheelspeed_front_left']
colors = ['g', 'b', 'm', 'c']
for topic, color in zip(topics, colors):
subdf = dataframe[dataframe['topic'] == topic]
self.ax.plot(subdf['time'], subdf['data'], 'o', picker=5, label=topic, c=color)
self.ax.legend()
self.fig.canvas.mpl_connect('pick_event', self.select_rosbag_start_time)
while self.rosbag_start_time is None and self.killer.kill_now is False:
plt.show()
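# Typical flow (mirrors DroneVisualization.find_offset below), with video/bag_path/killer
# as placeholders: step to the video frame of first vehicle movement, pick the matching
# wheelspeed sample in the plot, then compute the offset between the two time bases.
#     finder = OffsetFinder(video_path=video, input_rosbag_path=bag_path, killer=killer)
#     finder.find_video_start()    # arrow keys step frames, Enter confirms
#     finder.find_rosbag_start()   # pick the first-movement point, then close the plot
#     finder.calculate_offset()    # offset = video_start_time - rosbag_start_time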
class FeatureSelectorMode(Enum):
BOUNDARIES = 1  # pick the four track corner points used for the perspective transform
POSE_DEFINITION = 2  # pick two points defining the initial vehicle position and heading
VEHICLE_MASK = 3  # outline the vehicle with a closed polygon
class FeatureSelector():
def __init__(self, image: npt.NDArray, killer: GracefulKiller,
track_width: float, track_height: float, mode: FeatureSelectorMode) -> None:
self.features = []
self.killer = killer
self.finished = False
self.track_width = track_width
self.track_height = track_height
self.mode = mode
self.image = image
self.plot_order()
self.original_image = image.copy()
cv2.namedWindow('feature_selector_window', cv2.WINDOW_NORMAL)
cv2.setMouseCallback('feature_selector_window', self.add_feature)
self.last_position: Optional[Tuple[int, int]] = None
def plot_order(self):
if self.mode == FeatureSelectorMode.BOUNDARIES:
rows, cols = self.image.shape[:2]
cv2.arrowedLine(self.image, (20, 20), (cols - 50, 20), color=(130, 130, 0),
thickness=5, tipLength=0.01)
cv2.arrowedLine(self.image, (cols - 20, 20), (cols - 20, rows - 50),
color=(130, 130, 0), thickness=5, tipLength=0.01)
cv2.arrowedLine(self.image, (cols - 20, rows - 20), (50, rows - 20),
color=(130, 130, 0), thickness=5, tipLength=0.01)
def plot_features(self):
self.image = self.original_image.copy()
for feature in self.features:
if self.mode == FeatureSelectorMode.BOUNDARIES:
cv2.drawMarker(self.image, feature, (0, 255, 0), markerType=cv2.MARKER_STAR,
markerSize=40, thickness=2, line_type=cv2.LINE_AA)
elif self.mode == FeatureSelectorMode.POSE_DEFINITION:
cv2.circle(self.image, feature, radius=3, color=(0, 0, 255), thickness=-1)
elif self.mode == FeatureSelectorMode.VEHICLE_MASK:
cv2.polylines(img=self.image, pts=np.array(self.features)[np.newaxis], isClosed=True,
color=(0, 0, 255), thickness=3, lineType=cv2.LINE_AA)
def show(self):
while not self.killer.kill_now and not self.finished:
cv2.imshow('feature_selector_window', self.image)
key = cv2.waitKey(50)
if key == ord('\b'):
if self.features:
del self.features[-1]
self.plot_features()
if self.mode == FeatureSelectorMode.VEHICLE_MASK and key == ord('\r'):
break
while not self.killer.kill_now:
if self.mode == FeatureSelectorMode.BOUNDARIES:
cv2.imshow('feature_selector_window', self.new_image)
elif self.mode == FeatureSelectorMode.POSE_DEFINITION:
cv2.imshow('feature_selector_window', self.image)
elif self.mode == FeatureSelectorMode.VEHICLE_MASK:
break
key = cv2.waitKey(3000)
if key == ord('\r'):
break
cv2.destroyWindow('feature_selector_window')
def add_feature(self, event, x, y, flags, param):
if self.finished is False:
if event == cv2.EVENT_MBUTTONDOWN:
self.last_position = (x, y)
elif event == cv2.EVENT_MBUTTONUP and self.last_position is not None:
last_position = self.last_position
self.last_position = None
self.find_good_features_to_track_in_rectangle(last_position, (x, y))
if event == cv2.EVENT_LBUTTONDBLCLK:
if self.mode == FeatureSelectorMode.BOUNDARIES:
cv2.drawMarker(self.image, (x, y), (0, 255, 0), markerType=cv2.MARKER_STAR,
markerSize=40, thickness=2, line_type=cv2.LINE_AA)
self.features.append((x, y))
if len(self.features) == 4:
self.transform_image_by_features()
self.finished = True
elif self.mode == FeatureSelectorMode.POSE_DEFINITION:
cv2.circle(self.image, (x, y), radius=3, color=(0, 0, 255), thickness=-1)
self.features.append((x, y))
if len(self.features) == 2:
cv2.arrowedLine(self.image, self.features[0], self.features[1], color=(0, 0, 255), thickness=2)
self.finished = True
elif self.mode == FeatureSelectorMode.VEHICLE_MASK:
self.features.append((x, y))
self.plot_features()
def find_good_features_to_track_in_rectangle(self, point1, point2):
y_min = min(point1[1], point2[1])
x_min = min(point1[0], point2[0])
x_max = max(point1[0], point2[0])
y_max = max(point1[1], point2[1])
mask = np.zeros(self.image.shape[:2], dtype=np.uint8)
mask[y_min:y_max, x_min:x_max] = 255
corners = cv2.goodFeaturesToTrack(cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY),
maxCorners=10, qualityLevel=0.0001, minDistance=4, mask=mask)
cv2.rectangle(self.image, pt1=point1, pt2=point2, color=(255, 0, 0), thickness=4)
if corners is not None:
for corner in corners:
x, y = corner.astype(int).ravel()
cv2.circle(self.image, (x, y), 1, (0, 0, 255), -1)
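# Typical use (see DroneVisualization.find_initial_features below): construct with a frame
# and a mode, call show() so the user can double-click features, then read `.features`.
#     selector = FeatureSelector(image=frame_bgr, killer=killer, track_width=60.0,
#                                track_height=30.5, mode=FeatureSelectorMode.BOUNDARIES)
#     selector.show()
#     corners = selector.features  # in BOUNDARIES mode: the four clicked (x, y) pixel corners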
class DroneVisualization():
_cone_colors = [(0, 255, 255), (255, 0, 0), (0, 165, 255), (0, 0, 255)]
def __init__(self, input_rosbag: str, input_video: str, recalculate_transforms: bool, no_new_transforms: bool,
track_height: float, track_width: float, recover_offset: bool, recover_initial_features: bool,
killer: GracefulKiller, start: int, duration: Optional[int], end: Optional[int], output: str,
save_recovery: bool, recover_initial_position: bool, recover_initial_vehicle_mask: bool,
visualize_only_debug_information: bool) -> None:
self.killer = killer
self.input_video_path = input_video
self.incrementing_transforms: Optional[npt.NDArray] = None
self.total_transforms: Optional[npt.NDArray] = None
self.video_pts_and_time_to_index: Optional[npt.NDArray] = None
self.chained_transforms: Optional[npt.NDArray] = None
# initial features transformed to world for every time step
self.transformed_features: Optional[npt.NDArray] = None
# from normalized world to world
self.perspective_transforms: Optional[npt.NDArray] = None
self.map_to_world_transforms: Optional[npt.NDArray] = None
self.recalculate_transforms = recalculate_transforms
self.no_new_transforms = no_new_transforms
self.start = start
self.duration = duration
self.end = end
self.input_rosbag_path = input_rosbag
self.input_rosbag = rosbag.Bag(input_rosbag)
self.input_rosbag_start = self.input_rosbag.get_start_time()
self.rosbag_start: Optional[float] = None
self.rosbag_end: Optional[float] = None
self.save_recovery = save_recovery
self.recover_offset = recover_offset
self.offset = None
self.video_start = start
self.video_end = end
self.input_video_start_pts = None
self.input_video_start_index = None
self.track_width = track_width
self.track_height = track_height
self.recover_initial_features = recover_initial_features
self.initial_features: Optional[npt.NDArray] = None
self.recover_initial_position = recover_initial_position
self.initial_position: Optional[npt.NDArray] = None
self.recover_initial_vehicle_mask = recover_initial_vehicle_mask
self.initial_vehicle_mask: Optional[npt.NDArray] = None
self.rosbag_df = pd.DataFrame()
self.legend = {}
self.recover_information()
self.output_suffix = output
self.visualize_only_debug_information = visualize_only_debug_information
def run(self):
if self.offset is None or self.input_video_start_pts is None or self.input_video_start_index is None:
self.find_offset()
if self.offset is not None:
self.find_start_and_end()
if self.initial_features is None:
self.find_initial_features()
if self.initial_vehicle_mask is None:
self.find_initial_vehicle_mask()
if self.no_new_transforms is False:
self.calculate_transforms_from_video()
if self.initial_position is None:
self.find_initial_position()
if self.save_recovery is True:
print('Saving recovery information!')
self.save_recovery_information()
if self.total_transforms is not None:
self.affine_transform_initial_features()
if self.transformed_features is not None:
self.calculate_perspective_transform()
if self.transformed_features is not None:
self.calculate_map_to_world_transform()
if self.map_to_world_transforms is not None:
self.import_rosbag()
if self.transformed_features is not None:
self.generate_output()
def find_initial_features(self):
with av.open(self.input_video_path) as container:
stream = container.streams.video[0]
frame_generator = container.decode(stream)
frame = next(frame_generator)
image = frame.to_ndarray(format='bgr24')
self.feature_selector = FeatureSelector(image=image, killer=self.killer,
track_height=self.track_height, track_width=self.track_width,
mode=FeatureSelectorMode.BOUNDARIES)
self.feature_selector.show()
self.initial_features = np.array(self.feature_selector.features)
def find_initial_vehicle_mask(self):
with av.open(self.input_video_path) as container:
stream = container.streams.video[0]
frame_generator = container.decode(stream)
frame = next(frame_generator)
image = frame.to_ndarray(format='bgr24')
self.feature_selector = FeatureSelector(image=image, killer=self.killer,
track_height=self.track_height, track_width=self.track_width,
mode=FeatureSelectorMode.VEHICLE_MASK)
self.feature_selector.show()
self.initial_vehicle_mask = np.array(self.feature_selector.features, dtype=int)
def find_initial_position(self):
with av.open(self.input_video_path) as container:
stream = container.streams.video[0]
container.seek(offset=self.input_video_start_pts, stream=stream)
frame_generator = container.decode(stream)
frame = next(frame_generator)
while frame.pts != self.input_video_start_pts:
frame = next(frame_generator)
image = frame.to_ndarray(format='bgr24')
self.initial_pose_selector = FeatureSelector(image=image, killer=self.killer,
track_height=self.track_height, track_width=self.track_width,
mode=FeatureSelectorMode.POSE_DEFINITION)
self.initial_pose_selector.show()
self.initial_position = np.array(self.initial_pose_selector.features)
def find_offset(self):
offset_finder = OffsetFinder(video_path=self.input_video_path, input_rosbag_path=self.input_rosbag_path,
killer=self.killer)
if self.offset is None or self.input_video_start_pts is None:
offset_finder.find_video_start()
self.input_video_start_pts = offset_finder.video_start_pts
if self.input_video_start_index is None:
self.find_input_video_start_index()
if self.offset is None:
offset_finder.find_rosbag_start()
offset_finder.calculate_offset()
if offset_finder.offset is not None:
self.offset = offset_finder.offset
def find_start_and_end(self):
if self.start + self.offset < 0:
self.rosbag_start = self.input_rosbag_start - self.offset
self.video_start = 0
else:
self.rosbag_start = self.input_rosbag_start + self.start
self.video_start = self.start + self.offset
with av.open(self.input_video_path) as container:
stream = container.streams.video[0]
input_video_length = float(stream.time_base * stream.duration)
duration = min(value for value in [self.duration, input_video_length - self.video_start,
self.input_rosbag.get_end_time() - self.rosbag_start]
if value is not None)
self.rosbag_end = self.rosbag_start + duration
self.video_end = self.video_start + duration
def import_rosbag(self):
data = []
with generate_progressbar(name='Importing Rosbag | ', data_length=self.input_rosbag.get_message_count()) as bar:
bar.start()
perception_disabled = False
for topic, msg, t in self.input_rosbag.read_messages():
if t.to_sec() > 1667726358.271653935:  # hard-coded epoch time specific to this recording, after which perception is treated as disabled
perception_disabled = True
if t.to_sec() > self.rosbag_end + 2:
break
if topic == '/slam/vehicle_pose':
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/slam/vehicle_pose.pose', np.array([msg.x, msg.y, msg.psi])))
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/slam/vehicle_pose.v_x', np.array([msg.v_x])))
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/slam/vehicle_pose.pose.uncertainty', np.array(msg.P).reshape((4, 4))))
elif topic == '/slam/vehicle_pose/main':
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/slam/vehicle_pose/main.pose', np.array([msg.x, msg.y, msg.psi])))
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/slam/vehicle_pose/main.v_x', np.array([msg.v_x])))
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/slam/vehicle_pose/main.pose.uncertainty', np.array(msg.P).reshape((4, 4))))
elif topic == '/slam/landmarks':
landmark_positions = np.array([(cone.x, cone.y, cone.id) for cone in msg.cones])
landmark_uncertainty = np.array([np.reshape(cone.covariance, (2, 2)) for cone in msg.cones])
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/slam/landmarks', (landmark_positions, landmark_uncertainty)))
elif topic == '/slam/disable_perception':
perception_disabled = msg.disable_perception
elif topic == '/filtering/centerpoints':
header = msg.header if msg.image_header.stamp.to_sec() == 0 else msg.image_header
data.append((header.stamp.to_sec() - self.rosbag_start,
'/filtering/centerpoints', (np.c_[msg.x, msg.y], msg.closed_track)))
elif topic == '/motion_planning/trajectory':
header = (msg.header if perception_disabled is True or msg.image_header.stamp.to_sec() == 0
else msg.image_header)
data.append((header.stamp.to_sec() - self.rosbag_start, '/motion_planning/trajectory',
(np.c_[msg.x, msg.y, msg.v], msg.closed)))
elif topic == '/control/predicted_states':
header = (msg.header if perception_disabled is True or msg.image_header.stamp.to_sec() == 0
else msg.image_header)
data.append((header.stamp.to_sec() - self.rosbag_start, '/control/predicted_states.trajectory',
np.c_[msg.x_pos, msg.y_pos, msg.v_x]))
elif topic == '/slam/predicted_measurements/main':
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/slam/predicted_measurements/main.gps_position',
np.array([msg.gps_x[1], msg.gps_y[1], msg.gps_speed[1]])))
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/slam/predicted_measurements/main.gps_position.uncertainty',
np.array([msg.gps_x[2], msg.gps_y[2], msg.gps_speed[2]])))
elif topic == '/control/torque/request':
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/control/torque/request.torque', np.array([msg.torque])))
elif topic == '/control/service_brake/request':
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/control/service_brake/request.pressure', np.array([msg.pressure])))
elif topic == '/can/per/imu/accel':
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/can/per/imu/accel', np.array([msg.x, msg.y, msg.z])))
elif topic == '/control/steering_wheel_angle/request':
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/control/steering_wheel_angle/request.angle', msg.angle))
elif topic == '/can/per/steering_wheel_angle/actual':
data.append((msg.header.stamp.to_sec() - self.rosbag_start,
'/can/per/steering_wheel_angle/actual.angle', msg.angle))
bar.update(bar.value + 1)
bar.finish(dirty=True)
self.rosbag_df = pd.DataFrame(data, columns=['time', 'name', 'data']).sort_values(by='time', ascending=True)
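# Resulting self.rosbag_df columns:
#   time -- message (or image header) stamp in seconds relative to self.rosbag_start
#   name -- topic-derived key, e.g. '/slam/vehicle_pose.pose'
#   data -- extracted payload (numpy array or tuple, depending on the topic)
# It is sorted by time so the lookup helpers below can take the latest row before a query time.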
def get_value_from_rosbag_df(self, time: float, name: str):
if self.rosbag_df.empty:
return None
sliced = self.rosbag_df[(self.rosbag_df['time'].lt(time)) & (self.rosbag_df['name'] == name)]
if sliced.shape[0] == 0:
return None
return sliced['data'].iloc[-1]
def get_values_from_rosbag_df(self, name: str, start_time: Optional[float] = None,
end_time: Optional[float] = None):
if self.rosbag_df.empty:
return None
sliced = self.rosbag_df[self.rosbag_df['name'] == name]
if start_time is not None:
sliced = sliced[sliced['time'].ge(start_time)]
if end_time is not None:
sliced = sliced[sliced['time'].le(end_time)]
if sliced.shape[0] == 0:
return None
return sliced['data']
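# Lookup semantics, e.g. for the vehicle pose 12.3 s after the chosen rosbag start (the time
# value is a placeholder):
#     pose = self.get_value_from_rosbag_df(time=12.3, name='/slam/vehicle_pose.pose')
# returns the 'data' of the last matching row strictly before that time, or None if there is
# none; get_values_from_rosbag_df instead returns the whole time-filtered 'data' Series.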
def visualize_feature(self, image: npt.NDArray, frame_i: int):
for point in self.transformed_features[frame_i]:
cv2.drawMarker(image, (int(point[0]), int(point[1])), (0, 0, 255), markerType=cv2.MARKER_TILTED_CROSS,
markerSize=40, thickness=2, line_type=cv2.LINE_AA)
for point in self.initial_features:
cv2.drawMarker(image, (int(point[0]), int(point[1])), (255, 0, 0), markerType=cv2.MARKER_CROSS,
markerSize=40, thickness=2, line_type=cv2.LINE_AA)
@staticmethod
def rotate_and_move(xy: npt.NDArray, rotation: float = 0.,
offset_before_rotation: npt.NDArray = np.array([0, 0]),
offset_after_rotation: npt.NDArray = np.array([0, 0])) -> npt.NDArray:
"""
Function to move and rotate given coordinates.
First shifts the coordinates, then rotates them around the origin, then shifts them again,
using a rotation matrix built with NumPy.
Parameters
----------
xy : npt.NDArray
Coordinates to move, rotate and move again.
rotation : float, optional
Rotation angle to rotate the coordinates around the origin, by default 0.
offset_before_rotation : npt.NDArray, optional
Offset to move the coordinates before rotating them, by default np.array([0, 0]).
offset_after_rotation : npt.NDArray, optional
Offset to move the coordinates after rotating them, by default np.array([0, 0]).
Returns
-------
npt.NDArray
The shifted, rotated and shifted coordinates.
"""
c, s = np.cos(rotation), np.sin(rotation)
rotation = np.array([[c, -s], [s, c]])
points = np.einsum('ab, cb -> ca', rotation, xy + offset_before_rotation)
points += offset_after_rotation
return points
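# Worked example: rotating the point (1, 0) by 90 degrees about the origin and then shifting
# it by (10, 0) yields approximately (10, 1):
#     DroneVisualization.rotate_and_move(np.array([[1.0, 0.0]]), rotation=np.pi / 2,
#                                        offset_after_rotation=np.array([10.0, 0.0]))
#     # -> array([[10., 1.]]) up to floating-point error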
def visualize_fov_gate(self, image: npt.NDArray, frame_i: int, frame_t: int, add_to_legend: bool = False):
color = (255, 0, 0)
if add_to_legend is True:
self.legend['Field of View Gate'] = color
self.visualize_fov(image=image, frame_i=frame_i, frame_t=frame_t,
fov_angle=np.radians(100), fov_distance=10, color=color)
def visualize_fov(self, image: npt.NDArray, frame_i: int, frame_t: int,
fov_angle: float, fov_distance: float, color: Tuple[int, int, int]):
vehicle_pose = self.get_value_from_rosbag_df(time=frame_t, name='/slam/vehicle_pose.pose')
if vehicle_pose is None:
return
point_n = 50
path_points_map = np.empty((point_n + 1, 2))
path_points_map[0] = [0, 0]
for i, angle in enumerate(np.linspace(fov_angle / 2, - fov_angle / 2, point_n)):
y = fov_distance * np.sin(angle)
x = fov_distance * np.cos(angle)
path_points_map[i + 1] = [x, y]
rotated_path_points_map = self.rotate_and_move(xy=path_points_map, offset_after_rotation=vehicle_pose[:2],
rotation=vehicle_pose[2])
path_points_world = self.transform_map_coordinated_to_world_coordinates(
map_coordinates=rotated_path_points_map, frame_i=frame_i).astype(int)
cv2.polylines(img=image, pts=path_points_world[np.newaxis], isClosed=True, color=color,
lineType=cv2.LINE_AA, thickness=3)
def visualize_position_covariance(self, image: npt.NDArray, frame_i: int, color: Tuple[int, int, int],
uncertainty: npt.NDArray, center: npt.NDArray):
axes_size, angle = self.compute_eigenvalues_and_angle_of_covariance(uncertainty[:2, :2])
ellipse_poly_map = cv2.ellipse2Poly(center=(center[:2] * PIXEL_PER_METER).astype(int),
axes=(np.array(axes_size) * PIXEL_PER_METER).astype(int), delta=1,
angle=int(np.degrees(angle)), arcStart=0, arcEnd=360) / PIXEL_PER_METER
ellipse_poly_world = self.transform_map_coordinated_to_world_coordinates(
map_coordinates=ellipse_poly_map[:, :2], frame_i=frame_i).astype(int)
cv2.polylines(img=image, pts=ellipse_poly_world[np.newaxis], isClosed=True, thickness=3,
lineType=cv2.LINE_AA, color=color)
image_copy = image.copy()
cv2.fillPoly(img=image_copy, pts=ellipse_poly_world[np.newaxis], lineType=cv2.LINE_AA, color=color)
cv2.addWeighted(src1=image, alpha=1 - ALPHA, src2=image_copy, beta=ALPHA, gamma=0, dst=image)
def visualize_heading_covariance(self, image: npt.NDArray, frame_i: int, color: Tuple[int, int, int],
pose: npt.NDArray, uncertainty: float):
points_n = 50
position_map = np.empty((points_n + 1, 2))
position_map[:] = pose[np.newaxis, :2]
angles = pose[2] + np.linspace(start=-uncertainty, stop=uncertainty, num=points_n)
position_map[1:, 0] += np.cos(angles)
position_map[1:, 1] += np.sin(angles)
position_world = self.transform_map_coordinated_to_world_coordinates(
map_coordinates=position_map, frame_i=frame_i).astype(int)
cv2.polylines(img=image, pts=position_world[np.newaxis], isClosed=True, thickness=3,
lineType=cv2.LINE_AA, color=color)
image_copy = image.copy()
cv2.fillPoly(img=image_copy, pts=position_world[np.newaxis], lineType=cv2.LINE_AA, color=color)
cv2.addWeighted(src1=image, alpha=1 - ALPHA, src2=image_copy, beta=ALPHA, gamma=0, dst=image)
def visualize_vehicle_position(self, image: npt.NDArray, frame_i: int, frame_t: int,
uncertainty: bool = False, add_to_legend: bool = False):
assert not self.rosbag_df.empty
color = (0, 0, 255)
vehicle_pose_map = self.get_value_from_rosbag_df(time=frame_t, name='/slam/vehicle_pose.pose')
if vehicle_pose_map is None:
return
if add_to_legend is True:
self.legend['Vehicle Position'] = color
vehicle_position_map = np.array([vehicle_pose_map[:2], vehicle_pose_map[:2]])
vehicle_position_map[1] += [np.cos(vehicle_pose_map[2]) * 3, np.sin(vehicle_pose_map[2]) * 3]
vehicle_position_world = self.transform_map_coordinated_to_world_coordinates(
map_coordinates=vehicle_position_map, frame_i=frame_i).astype(int)
cv2.arrowedLine(img=image, pt1=vehicle_position_world[0], pt2=vehicle_position_world[1],
color=(0, 0, 255), line_type=cv2.LINE_AA, thickness=3)
if uncertainty is True:
vehicle_uncertainty_map = self.get_value_from_rosbag_df(time=frame_t,
name='/slam/vehicle_pose.pose.uncertainty')
if vehicle_uncertainty_map is None:
return
self.visualize_position_covariance(image=image, frame_i=frame_i, center=vehicle_position_map[0],
uncertainty=vehicle_uncertainty_map, color=color)
self.visualize_heading_covariance(image=image, frame_i=frame_i, pose=vehicle_pose_map,
uncertainty=vehicle_uncertainty_map[3, 3], color=color)
def compute_eigenvalues_and_angle_of_covariance(self, covariance: npt.NDArray):
def eigsorted(cov):
vals, vecs = np.linalg.eigh(cov)
order = vals.argsort()[::-1]
return vals[order], vecs[:, order]
vals, vecs = eigsorted(covariance)
theta = np.arctan2(*vecs[:, 0][::-1])
w, h = 2 * N_STDDEV * np.sqrt(vals)
return (w, h), theta
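# Worked example: for the diagonal covariance diag(4, 1) the sorted eigenvalues are 4 and 1,
# so with N_STDDEV = 1 the ellipse axes are (2 * sqrt(4), 2 * sqrt(1)) = (4, 2) and the
# major-axis angle is 0 rad (aligned with the x axis).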
def visualize_landmarks(self, image: npt.NDArray, frame_i: int, frame_t: int, uncertainty: bool = False):
assert not self.rosbag_df.empty
landmark_info = self.get_value_from_rosbag_df(time=frame_t, name='/slam/landmarks')
if landmark_info is None:
return
landmarks_map = landmark_info[0]
landmarks_world = self.transform_map_coordinated_to_world_coordinates(
map_coordinates=landmarks_map[:, :2], frame_i=frame_i).astype(int)
for landmark_i, landmark in enumerate(landmarks_world):
cv2.circle(img=image, center=landmark[:2], color=self._cone_colors[int(landmarks_map[landmark_i, 2])],
radius=5, thickness=-1, lineType=cv2.LINE_AA)
if uncertainty is False:
return
landmarks_uncertainty_map = landmark_info[1]
for landmark_i, (landmark, landmark_uncertainty) in enumerate(zip(landmarks_map, landmarks_uncertainty_map)):
self.visualize_position_covariance(image=image, frame_i=frame_i, center=landmark[:2],
uncertainty=landmark_uncertainty,
color=self._cone_colors[int(landmarks_map[landmark_i, 2])])
def visualize_centerpoints(self, image: npt.NDArray, frame_i: int, frame_t: int, add_to_legend: bool = False):
assert not self.rosbag_df.empty
color = (0, 255, 0)
centerpoints_info = self.get_value_from_rosbag_df(time=frame_t, name='/filtering/centerpoints')
if centerpoints_info is None:
return
if add_to_legend is True:
self.legend['Centerpoints'] = color
centerpoints_map, closed_track = centerpoints_info
centerpoints_world = self.transform_map_coordinated_to_world_coordinates(
map_coordinates=centerpoints_map, frame_i=frame_i).astype(int)
cv2.polylines(img=image, pts=centerpoints_world[np.newaxis], isClosed=closed_track,
thickness=3, lineType=cv2.LINE_AA, color=color)
def visualize_planned_path(self, image: npt.NDArray, frame_i: int, frame_t: int, add_to_legend: bool = False):
assert not self.rosbag_df.empty
color = (232, 52, 235)
trajectory_info = self.get_value_from_rosbag_df(time=frame_t, name='/motion_planning/trajectory')
if trajectory_info is None:
return
if add_to_legend is True:
self.legend['Planned path'] = color
trajectory_map, closed = trajectory_info
trajectory_world = self.transform_map_coordinated_to_world_coordinates(
map_coordinates=trajectory_map[:, :2], frame_i=frame_i).astype(int)
cv2.polylines(img=image, pts=trajectory_world[np.newaxis], isClosed=closed,
thickness=3, lineType=cv2.LINE_AA, color=color)
def visualize_predicted_path(self, image: npt.NDArray, frame_i: int, frame_t: int, add_to_legend: bool = False):
assert not self.rosbag_df.empty
color = (255, 255, 0)
predicted_trajectory_map = self.get_value_from_rosbag_df(time=frame_t,
name='/control/predicted_states.trajectory')
if predicted_trajectory_map is None:
return
if add_to_legend is True:
self.legend['Predicted path'] = color
trajectory_world = self.transform_map_coordinated_to_world_coordinates(
map_coordinates=predicted_trajectory_map[:, :2], frame_i=frame_i).astype(int)
cv2.polylines(img=image, pts=trajectory_world[np.newaxis], isClosed=False,
thickness=3, lineType=cv2.LINE_AA, color=color)
def visualize_driven_path(self, image: npt.NDArray, frame_i: int, frame_t: int, add_to_legend: bool = False):
assert not self.rosbag_df.empty
color = (255, 0, 128)
vehicle_poses_map = self.get_values_from_rosbag_df(name='/slam/vehicle_pose.pose',
start_time=0, end_time=frame_t)
if vehicle_poses_map is None:
return
if add_to_legend is True:
self.legend['Driven path'] = color
vehicle_poses_world = self.transform_map_coordinated_to_world_coordinates(
map_coordinates=np.stack(vehicle_poses_map.to_numpy(), axis=0)[:, :2], frame_i=frame_i).astype(int)
cv2.polylines(img=image, pts=vehicle_poses_world[np.newaxis], isClosed=False,
thickness=3, lineType=cv2.LINE_AA, color=color)
def visualize_will_have_driven_path(self, image: npt.NDArray, frame_i: int,
frame_t: int, add_to_legend: bool = False):
assert not self.rosbag_df.empty
color = (52, 131, 235)
vehicle_poses_map = self.get_values_from_rosbag_df(name='/slam/vehicle_pose.pose',
start_time=frame_t, end_time=frame_t + CONTROL_TIME_WINDOW)
if vehicle_poses_map is None:
return
if add_to_legend is True:
self.legend['Future driven path'] = color
vehicle_poses_world = self.transform_map_coordinated_to_world_coordinates(
map_coordinates=np.stack(vehicle_poses_map.to_numpy(), axis=0)[:, :2], frame_i=frame_i).astype(int)
cv2.polylines(img=image, pts=vehicle_poses_world[np.newaxis], isClosed=False,
thickness=3, lineType=cv2.LINE_AA, color=color)
def visualize_gps_measurement(self, image: npt.NDArray, frame_i: int, frame_t: int,
uncertainty: bool, add_to_legend: bool = False):
color = (0, 128, 255)
assert not self.rosbag_df.empty
gps_measurement_map = self.get_value_from_rosbag_df(name='/slam/predicted_measurements/main.gps_position',
time=frame_t)
if gps_measurement_map is None or np.any(np.isnan(gps_measurement_map)):
return
if add_to_legend is True:
self.legend['GPS Measurement'] = color
gps_measurement_world = self.transform_map_coordinated_to_world_coordinates(
map_coordinates=gps_measurement_map[np.newaxis, :2], frame_i=frame_i).astype(int)
cv2.drawMarker(img=image, position=gps_measurement_world[0], color=color, markerType=cv2.MARKER_CROSS,
markerSize=40, thickness=3, line_type=cv2.LINE_AA)
if uncertainty is True:
gps_uncertainty_map = self.get_value_from_rosbag_df(
time=frame_t, name='/slam/predicted_measurements/main.gps_position.uncertainty')
if gps_uncertainty_map is None:
return
covariance = np.square(np.array([[gps_uncertainty_map[0], 0], [0, gps_uncertainty_map[1]]]))
self.visualize_position_covariance(image=image, frame_i=frame_i, center=gps_measurement_map[:2],
color=color, uncertainty=covariance)
def visualize_speed(self, image: npt.NDArray, frame_t: int, origin: Tuple[int, int]):
color = (0, 128, 255)
origin = np.asarray(origin)  # use the caller-supplied origin; array form is needed for the arithmetic below
radius = 200
font_scale = 3
alpha = 0.6
assert not self.rosbag_df.empty
vehicle_velocity = self.get_value_from_rosbag_df(time=frame_t, name='/slam/vehicle_pose.v_x')
if vehicle_velocity is None:
return
vehicle_velocity = max(vehicle_velocity[0] * 3.6, 0)  # m/s -> km/h, clamped at zero
image_copy = image.copy()
middle_angle = int(vehicle_velocity / MAX_VELOCITY * 180 + 180)
cv2.ellipse(img=image, center=origin, axes=(radius, radius), angle=0, startAngle=middle_angle,
endAngle=360, color=(0, 0, 0), thickness=6, lineType=cv2.LINE_AA)
cv2.addWeighted(src1=image, alpha=alpha, src2=image_copy, beta=1 - alpha, gamma=0, dst=image)
cv2.ellipse(img=image, center=origin, axes=(radius, radius), angle=0, startAngle=180,
endAngle=middle_angle, color=(255, 255, 255), thickness=6, lineType=cv2.LINE_AA)
if vehicle_velocity < 10:
text = f'{vehicle_velocity:.2f}'
elif vehicle_velocity < 100:
text = f'{vehicle_velocity:.1f}'
else:
text = f'{int(vehicle_velocity)}'
(font_width2, font_height2), _ = cv2.getTextSize(text='km/h', fontFace=cv2.FONT_HERSHEY_SIMPLEX,
fontScale=1, thickness=2)
cv2.putText(img=image, text='km/h', fontFace=cv2.FONT_HERSHEY_SIMPLEX, thickness=2,
lineType=cv2.LINE_AA, org=(origin - (font_width2, 0)), color=color, fontScale=1)
cv2.putText(img=image, text=text, fontFace=cv2.FONT_HERSHEY_SIMPLEX, thickness=2,
lineType=cv2.LINE_AA, org=(origin - (font_width2, font_height2 + 20)), color=color,
fontScale=font_scale)
def draw_rounded_rectangle(self, image: npt.NDArray, origin: Tuple[int, int], size: Tuple[int, int],
radius: int, color: Tuple[int, int, int], alpha: float):
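# Draws the rounded rectangle as two overlapping axis-aligned rectangles (inset by `radius`
# horizontally and vertically) plus four filled quarter-ellipse corners, then alpha-blends
# the result back over the original frame with the given opacity.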
image_copy = image.copy()
cv2.rectangle(img=image, color=color, thickness=-1,
pt1=origin + (radius, 0), pt2=origin + size - (radius, 0))
cv2.rectangle(img=image, color=color, thickness=-1,
pt1=origin + (0, radius), pt2=origin + size - (0, radius))
cv2.ellipse(img=image, center=origin + (radius, radius), axes=(radius, radius), angle=180.0,
startAngle=0, endAngle=90, color=color, thickness=-1, lineType=cv2.LINE_AA)
cv2.ellipse(img=image, center=origin + (size[0], 0) + (-radius, radius), axes=(radius, radius), angle=270.0,
startAngle=0, endAngle=90, color=color, thickness=-1, lineType=cv2.LINE_AA)
cv2.ellipse(img=image, center=origin + size - (radius, radius), axes=(radius, radius), angle=0.0,
startAngle=0, endAngle=90, color=color, thickness=-1, lineType=cv2.LINE_AA)
cv2.ellipse(img=image, center=origin + (0, size[1]) + (radius, -radius), axes=(radius, radius), angle=90.0,
startAngle=0, endAngle=90, color=color, thickness=-1, lineType=cv2.LINE_AA)
cv2.addWeighted(src1=image, alpha=alpha, src2=image_copy, beta=1 - alpha, gamma=0, dst=image)
def visualize_torque(self, image: npt.NDArray, frame_t: int, origin: Tuple[int, int], height: int):
assert not self.rosbag_df.empty
torque = self.get_value_from_rosbag_df(time=frame_t, name='/control/torque/request.torque')
self.draw_rounded_rectangle(image=image, origin=origin, size=(30, height), radius=5, color=(0, 0, 0), alpha=0.5)
if torque is None or torque == 0:
return
torque = max(torque[0], 0)
middle_height = int(height - torque / MAX_TORQUE * height)
self.draw_rounded_rectangle(image=image, origin=origin + (0, middle_height),
size=(30, height - middle_height), radius=5, color=(0, 255, 0), alpha=0.5)
def visualize_brake(self, image: npt.NDArray, frame_t: int, origin: Tuple[int, int], height: int):
assert not self.rosbag_df.empty
pneumatic_pressure = self.get_value_from_rosbag_df(time=frame_t, name='/control/service_brake/request.pressure')
self.draw_rounded_rectangle(image=image, origin=origin, size=(30, height), radius=5, color=(0, 0, 0), alpha=0.5)
if pneumatic_pressure is None or pneumatic_pressure == 0:
return
pneumatic_pressure = max(pneumatic_pressure[0], 0)
middle_height = int(height - pneumatic_pressure / MAX_PNEUMATIC_PRESSURE * height)
self.draw_rounded_rectangle(image=image, origin=origin + (0, middle_height),
size=(30, height - middle_height), radius=5, color=(0, 0, 255), alpha=0.5)
def visualize_acceleration(self, image: npt.NDArray, frame_t: int, origin: Tuple[int, int], radius: int):
assert not self.rosbag_df.empty
acceleration = self.get_value_from_rosbag_df(time=frame_t, name='/can/per/imu/accel')
if acceleration is None:
return
axes = np.array((radius, radius))
alpha = 0.4
image_copy = image.copy()
acc_x = acceleration[0]
acc_y = acceleration[1]
acc_offset = np.array((acc_y, -acc_x)) / MAX_ACCELERATION
cv2.ellipse(img=image, center=origin + axes, axes=(0.25 * axes).astype(int), angle=0, startAngle=0,
endAngle=360, color=(255, 255, 255), thickness=3, lineType=cv2.LINE_AA)
cv2.ellipse(img=image, center=origin + axes, axes=(0.75 * axes).astype(int), angle=0, startAngle=0,
endAngle=360, color=(255, 255, 255), thickness=3, lineType=cv2.LINE_AA)
cv2.addWeighted(src1=image, alpha=alpha, src2=image_copy, beta=1 - alpha, gamma=0, dst=image)
cv2.ellipse(img=image, center=origin + axes, axes=(0.5 * axes).astype(int), angle=0, startAngle=0,
endAngle=360, color=(255, 255, 255), thickness=3, lineType=cv2.LINE_AA)
cv2.ellipse(img=image, center=origin + axes, axes=axes.astype(int), angle=0, startAngle=0,
endAngle=360, color=(255, 255, 255), thickness=3, lineType=cv2.LINE_AA)
cv2.line(img=image, pt1=origin + (0, radius), pt2=origin + (2 * radius, radius),
color=(255, 255, 255), thickness=3, lineType=cv2.LINE_AA)
cv2.line(img=image, pt1=origin + (radius, 0), pt2=origin + (radius, 2 * radius),
color=(255, 255, 255), thickness=3, lineType=cv2.LINE_AA)
cv2.circle(img=image, center=(origin + axes + acc_offset * axes).astype(int), radius=6,
color=(255, 0, 0), thickness=-1, lineType=cv2.LINE_AA)
cv2.line(img=image, pt1=origin + axes, pt2=(origin + axes + acc_offset * axes).astype(int),
color=(255, 0, 0), thickness=3, lineType=cv2.LINE_AA)
def visualize_steering_wheel_angle(self, image: npt.NDArray, frame_t: int, origin: Tuple[int, int], height: int):
assert not self.rosbag_df.empty
steering_wheel_request = self.get_value_from_rosbag_df(
time=frame_t, name='/control/steering_wheel_angle/request.angle')
steering_wheel_actual = self.get_value_from_rosbag_df(
time=frame_t, name='/can/per/steering_wheel_angle/actual.angle')
if steering_wheel_request is None and steering_wheel_actual is None:
return
image_copy = image.copy()
alpha = 0.4
original_ellipse = cv2.ellipse2Poly(center=(500, 500), axes=(500, 500), angle=-90,
arcStart=-115, arcEnd=115, delta=1)
original_ellipse_height = abs(min(original_ellipse[:, 1]) - max(original_ellipse[:, 1]))
ellipse = (0.8 * (original_ellipse / (original_ellipse_height / (height)))
+ (origin + (0.1 * height, 0.1 * height)))
center = np.array((ellipse[np.argmin(original_ellipse[:, 1])][0],
ellipse[np.argmin(original_ellipse[:, 0])][1]))
radius = int(abs(center[0] - origin[0]))
pt2 = center + (radius * np.cos(np.radians(90)), - radius * np.sin(np.radians(90)))
cv2.line(img=image, pt1=center.astype(int), pt2=pt2.astype(int), thickness=3,
lineType=cv2.LINE_AA, color=(255, 0, 0))
cv2.addWeighted(src1=image, alpha=alpha, src2=image_copy, beta=1 - alpha, gamma=0, dst=image)
cv2.polylines(img=image, pts=(ellipse[np.newaxis] * 8).astype(int), isClosed=False,
color=(255, 255, 255), thickness=3, lineType=cv2.LINE_AA, shift=3)
if steering_wheel_request is not None and steering_wheel_actual is not None:
original_error_ellipse = cv2.ellipse2Poly(center=(500, 500), axes=(500, 500),
angle=-90, arcStart=-int(steering_wheel_request),
arcEnd=-int(steering_wheel_actual), delta=1)
error_ellipse = 0.8 * (original_error_ellipse / (original_ellipse_height / (height))) \
+ (origin + (0.1 * height, 0.1 * height))
cv2.polylines(img=image, pts=(error_ellipse[np.newaxis] * 8).astype(int), isClosed=False,
color=(0, 0, 255), thickness=3, lineType=cv2.LINE_AA, shift=3)
if steering_wheel_actual is not None:
pt2 = center + (radius * np.cos(np.radians(steering_wheel_actual + 90)),
- radius * np.sin(np.radians(steering_wheel_actual + 90)))
cv2.line(img=image, pt1=center.astype(int), pt2=pt2.astype(int), thickness=3,
lineType=cv2.LINE_AA, color=(255, 0, 0))
if steering_wheel_request is not None:
pt2 = center + (radius * np.cos(np.radians(steering_wheel_request + 90)),
- radius * np.sin(np.radians(steering_wheel_request + 90)))
cv2.line(img=image, pt1=center.astype(int), pt2=pt2.astype(int), thickness=3,
lineType=cv2.LINE_AA, color=(0, 255, 0))
def visualize_overlay(self, image: npt.NDArray, frame_t: int):
self.draw_rounded_rectangle(image=image, origin=np.array((25, 25)),
size=(1150, 250), radius=25, color=(0, 0, 0), alpha=0.5)
self.visualize_speed(image=image, frame_t=frame_t, origin=np.array((250, 250)))
self.visualize_torque(image=image, frame_t=frame_t, origin=np.array((500, 50)), height=205)
self.visualize_brake(image=image, frame_t=frame_t, origin=np.array((550, 50)), height=205)
self.visualize_acceleration(image=image, frame_t=frame_t, origin=np.array((650, 50)), radius=100)
self.visualize_steering_wheel_angle(image=image, frame_t=frame_t, origin=np.array((900, 50)), height=200)
def visualize_legend(self, image: npt.NDArray):
org = np.array([image.shape[1] - 50, image.shape[0] - 50])
total_height = 0
max_width = 0
for text, color in self.legend.items():
(font_width, font_height), _ = cv2.getTextSize(text=text, fontFace=cv2.FONT_HERSHEY_SIMPLEX,
fontScale=LEGEND_TEXT_SCALE, thickness=2)
total_height += font_height + LEGEND_TEXT_DISTANCE
max_width = max(max_width, font_width)
size = (max_width + 50, total_height + 50)
self.draw_rounded_rectangle(image=image, origin=org + (25, 25) - size,
size=size, radius=25, color=(0, 0, 0), alpha=0.5)
for text, color in self.legend.items():
(font_width, font_height), _ = cv2.getTextSize(text=text, fontFace=cv2.FONT_HERSHEY_SIMPLEX,
fontScale=LEGEND_TEXT_SCALE, thickness=2)
cv2.putText(img=image, text=text, fontFace=cv2.FONT_HERSHEY_SIMPLEX, thickness=2,
lineType=cv2.LINE_AA, org=org - (font_width, 0), color=color, fontScale=LEGEND_TEXT_SCALE)
org -= (0, font_height + LEGEND_TEXT_DISTANCE)
def visualize_image(self, image: npt.NDArray, frame_i: int, frame_t: int):
# self.visualize_feature(image=image, frame_i=frame_i) # noqa: E800
# self.visualize_transform_points(image=image, frame_i=frame_i) # noqa: E800
if self.visualize_only_debug_information is True:
return
self.visualize_vehicle_position(image=image, frame_i=frame_i, frame_t=frame_t,
uncertainty=True, add_to_legend=True)
# self.visualize_fov_gate(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True) # noqa: E800
self.visualize_landmarks(image=image, frame_i=frame_i, frame_t=frame_t, uncertainty=False)
self.visualize_centerpoints(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True)
self.visualize_planned_path(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True)
self.visualize_predicted_path(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True)
self.visualize_driven_path(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True)
self.visualize_will_have_driven_path(image=image, frame_i=frame_i, frame_t=frame_t, add_to_legend=True)
# self.visualize_gps_measurement(image=image, frame_i=frame_i, frame_t=frame_t, uncertainty=True) # noqa: E800
self.visualize_legend(image=image)
self.visualize_overlay(image=image, frame_t=frame_t)
def generate_output(self):
max_value = np.argwhere((self.video_pts_and_time_to_index[:, 1] >= self.video_start)
& (self.video_pts_and_time_to_index[:, 1] <= self.video_end)).size
variable_widgets = [progressbar.Variable('input_time', format='{name}: {formatted_value}'), ' ',
progressbar.Variable('output_time', format='{name}: {formatted_value}'), ' ']
first = True
with av.open(self.input_video_path) as input_container, \
av.open(f'{self.input_video_path.rsplit(".", 1)[0]}{self.output_suffix}.mp4', mode='w') as output_container, \
generate_progressbar(name='Generating output video | ', data_length=max_value,
variable_widgets=variable_widgets) as bar:
input_stream = input_container.streams.video[0]
input_container.seek(offset=int(self.video_start / input_stream.time_base), stream=input_stream)
output_stream = output_container.add_stream('h264', rate=input_stream.base_rate)
output_stream.pix_fmt = 'yuv420p'
output_stream.codec_context.time_base = Fraction(1, 30000)
output_stream.height = input_stream.height
output_stream.width = input_stream.width
output_stream.bit_rate = input_stream.bit_rate
start_pts = None
artificial_video_end = self.video_pts_and_time_to_index[-1, 1]
for input_frame in input_container.decode(input_stream):
if input_frame.time < self.video_start:
continue
if input_frame.time > self.video_end or input_frame.time > artificial_video_end:
break
if start_pts is None:
start_pts = input_frame.pts
if first is True:
first = False
frame_time = input_frame.time - self.video_start
frame_i = np.argwhere(self.video_pts_and_time_to_index[:, 0] == input_frame.pts)[0, 0]
if self.killer.kill_now is True:
self.killer.kill_now = False
break
image = input_frame.to_ndarray(format='bgr24')
np.save(f'{self.input_video_path.rsplit(".", 1)[0]}{self.output_suffix}.JPG', image)  # debug dump of the raw frame; np.save writes .npy data regardless of the .JPG suffix
self.visualize_image(image=image, frame_i=frame_i, frame_t=frame_time)
output_frame = av.VideoFrame.from_ndarray(image, format='bgr24')
output_frame.pts = input_frame.pts - start_pts
for packet in output_stream.encode(output_frame):
output_container.mux(packet)
bar.update(bar.value + 1, input_time=input_frame.time, output_time=frame_time)
if input_frame.index >= self.transformed_features.shape[0] - 1:
break
for packet in output_stream.encode():
output_container.mux(packet)
bar.finish(dirty=True)
if __name__ == '__main__':
# recover_initial_vehicle_mask and visualize_only_debug_information are required by __init__;
# the values passed here are assumptions chosen to match the other recover_* flags.
drone_visualization = DroneVisualization(input_rosbag='/workspace/as_ros/rosbags/run_370_sim_ni.bag',
input_video='/workspace/as_ros/rosbags/videos/run_370.MP4',
recalculate_transforms=False, killer=GracefulKiller(),
no_new_transforms=False, save_recovery=True,
track_height=30.5, track_width=60, recover_initial_features=True,
recover_offset=True, recover_initial_position=True,
recover_initial_vehicle_mask=True, visualize_only_debug_information=False,
start=5, duration=5, end=None, output='_5')
drone_visualization.run()