# Source code for calibration.calibrate_extrinsic_matrix

#!/usr/bin/env python3
import signal
import threading
from typing import Any, Optional
import click
from pathlib import Path
import os.path
import json

import numpy as np
import numpy.typing as npt
import cv2
from cv_bridge import CvBridge
import matplotlib.pyplot as plt

import rospkg
import rosbag
import tf.transformations as tft

from sensor_msgs.msg import Image, CameraInfo


"""
Usage:
- Set up a pattern on the floor and measure its positions -> points_in_world
- Extract one image from the rosbag using export.launch (image is saved to .ros)
- Use GIMP or a similar tool to get the positions in the image -> points_in_image
- Get the intrinsic matrix K from the camera_info topic -> intrinsic_matrix
- Run the script, the extrinsic matrix is saved automatically
"""

# Calibration-board target points in the world frame: a 3x3 grid with
# x in {0, 296.5, 593} and y in {-510, 0, 510}, z on the floor.
# Units are presumably millimetres (the manual points below are metres * 1000
# and the CLI prompts for a distance in [mm]) — TODO confirm.
BOARD_POINTS_IN_WORLD = np.array([[593, 510, 0], [593, 0, 0], [593, -510, 0],
                                  [296.5, 510, 0], [296.5, 0, 0], [296.5, -510, 0],
                                  [0,   510, 0], [0,   0, 0], [0,   -510, 0]], dtype=float)  # measure by hand

# Four extra board points used only as a visual re-projection check
# (drawn as the "control polygon" after calibration).
BOARD_CONTROL_POINTS_IN_WORLD = np.array([[148, 378, 0], [410, 378, 0],
                                          [519, -365, 0], [148, -305, 0]], dtype=float)  # measure by hand

# Hand-measured world points for the 'manual' data source, given in metres
# and converted to millimetres; z = 0.325 m matches the small-cone height
# used elsewhere in this file.
MANUAL_POINTS_IN_WORLD = np.array([[6.33670616, 1.38793337, 0.325], [6.49126244, -1.46336901, 0.325],
                                   [4.55862474, 1.38464952, 0.325], [4.62539816, -1.59668756, 0.325],
                                   [2.87427402,   1.38464952, 0.325], [2.94407439, -1.59668756, 0.325]], dtype=float) * 1000  # measure by hand

# Control points for the 'manual' data source (metres -> millimetres),
# used only for the visual re-projection check.
MANUAL_CONTROL_POINTS_IN_WORLD = np.array([[5, 0.5, 0.325], [5, -0.5, 0.325],
                                           [3, -0.5, 0.325], [3, 0.5, 0.325]], dtype=float) * 1000  # measure by hand


class GracefulKiller:
    """Convert SIGINT/SIGTERM into a polled flag so loops can exit cleanly."""

    # Shutdown request flag; read by long-running loops, raised by the handler.
    kill_now = False

    def __init__(self):
        # Route both termination signals to the same flag-raising handler.
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self.exit_gracefully)

    def exit_gracefully(self, *args):
        """Signal handler: request shutdown (extra handler args are ignored)."""
        self.kill_now = True
class FeatureSelector():
    """Interactive OpenCV window for hand-labelling calibration points.

    The user left-clicks one image pixel per entry of ``points_in_world``;
    when the counts match, ``event`` is set so the calibration thread can
    proceed.  Enter confirms a finished selection, Backspace undoes the last
    click while selection is still in progress.
    """

    # BGR marker colors indexed by cone id — presumably yellow/blue/orange/red,
    # matching the matplotlib colors on Calibration — TODO confirm mapping.
    _cone_colors = [(0, 255, 255), (255, 0, 0), (0, 165, 255), (0, 0, 255)]

    def __init__(self, image: npt.NDArray, killer: GracefulKiller, control_cone_ids: npt.NDArray,
                 calibration_cone_ids: npt.NDArray, event: threading.Event, points_in_world: npt.NDArray):
        self.image = image
        self.original_image = self.image.copy()  # pristine copy used to redraw after an undo
        self.killer = killer
        self.event = event
        self.features = []  # selected (x, y) pixel positions, in click order
        self.finished = False
        self.stop = False
        self.points_in_world = points_in_world
        self.control_cone_ids = control_cone_ids
        self.calibration_cone_ids = calibration_cone_ids
        cv2.namedWindow('feature_selector_window', cv2.WINDOW_NORMAL)
        cv2.setMouseCallback('feature_selector_window', self.add_feature)

    def show(self):
        """Run the display loop until confirmed (Enter), stopped, or killed."""
        while not self.killer.kill_now and self.stop is False:
            cv2.imshow('feature_selector_window', self.image)
            key = cv2.waitKey(50)
            if key == ord('\r') and self.finished:
                self.stop = True
            # Bug fix: guard against Backspace before any feature was selected,
            # which raised IndexError on `del self.features[-1]`.
            elif key == ord('\b') and self.finished is False and self.features:
                del self.features[-1]
                # Redraw the remaining markers on a clean copy of the image.
                self.image = self.original_image.copy()
                for index, feature in enumerate(self.features):
                    self.draw_marker(feature, index)
        cv2.destroyWindow('feature_selector_window')

    def draw_marker(self, pos, index):
        """Draw a cross marker at pixel ``pos``, colored by cone id when known."""
        if self.calibration_cone_ids is None:
            color = (0, 0, 255)
        else:
            color = self._cone_colors[self.calibration_cone_ids[index]]
        cv2.drawMarker(self.image, (pos[0], pos[1]), color, markerType=cv2.MARKER_CROSS,
                       markerSize=10, thickness=1, line_type=cv2.LINE_AA)

    def add_feature(self, event, x, y, flags, param):
        """Mouse callback: record one left-click per world point until all are set."""
        if event == cv2.EVENT_LBUTTONDOWN and self.finished is False:
            self.draw_marker((x, y), len(self.features))
            self.features.append((x, y))
            if len(self.features) == len(self.points_in_world):
                print(self.features)
                self.finished = True
                self.event.set()  # unblock the calibration thread

    def draw_control_polygon(self, image_pixels: npt.NDArray, draw_polygon: bool):
        """Overlay re-projected control points (optionally joined into a closed polygon)."""
        print(f'Drawing control polygon: {image_pixels}')  # fixed typo: was 'Dawing'
        for image_pixel_i, image_pixel in enumerate(image_pixels):
            if self.control_cone_ids is None:
                color = (255, 0, 0)
            else:
                color = self._cone_colors[self.control_cone_ids[image_pixel_i]]
            # Bug fix: cv2.drawMarker expects a tuple of Python ints, not a numpy row.
            cv2.drawMarker(self.image, tuple(int(v) for v in image_pixel), color,
                           markerType=cv2.MARKER_TILTED_CROSS, markerSize=10,
                           thickness=1, line_type=cv2.LINE_AA)
        if draw_polygon is True:
            cv2.polylines(img=self.image, pts=image_pixels[np.newaxis], isClosed=True,
                          color=(255, 0, 0), thickness=2, lineType=cv2.LINE_AA)
class ExtrinsicCalibration():
    """Solve the camera extrinsics from 3D/2D point correspondences via PnP."""

    def __init__(self, intrinsic_matrix, points_in_world, points_in_image):
        self.intrinsic_matrix = intrinsic_matrix  # 3x3 camera matrix K
        self.points_in_world = points_in_world    # Nx3 world coordinates
        self.points_in_image = points_in_image    # Nx2 pixel coordinates

    def calculate_extrinsic_matrix(self, extrinsic_matrix: npt.NDArray):
        """Write the 4x4 extrinsic transform into ``extrinsic_matrix`` in place.

        solvePnP (no distortion coefficients) yields the world-to-camera pose;
        the stored result is the inverse of translation * rotation.
        """
        _, rodrigues_vec, translation_vec = cv2.solvePnP(
            self.points_in_world, self.points_in_image, self.intrinsic_matrix, None)
        rot3x3, _ = cv2.Rodrigues(rodrigues_vec)
        # Embed the 3x3 rotation into a 4x4 homogeneous transform.
        rot4x4 = np.eye(4)
        rot4x4[:3, :3] = rot3x3
        world_f_camera = tft.concatenate_matrices(
            tft.translation_matrix(translation_vec.flatten()), rot4x4)
        extrinsic_matrix[:, :] = tft.inverse_matrix(world_f_camera)
class Calibration():
    """Drive extrinsic calibration for one or both ZED cameras from a rosbag.

    Workflow per camera: extract an image and camera_info from the bag, let the
    user click the known world points (FeatureSelector), solve PnP in a
    background thread (ExtrinsicCalibration), re-project control points for a
    visual check, and save the matrices on user confirmation.
    """

    # Cone height in metres per cone id; last entry presumably the big orange
    # cone — TODO confirm id mapping.
    _cone_heights = [0.325, 0.325, 0.325, 0.505]
    # Matplotlib plot colors per cone id (used only by plot_ground_truth).
    _cone_colors = ['gold', 'blue', 'orange', 'orange']

    def __init__(self, input_rosbag: str, start: float, duration: float, end: float,
                 data_source: str, track_layout: str, test_day: str, distance: float,
                 camera: str, calibration_matrices_subdirectory: str) -> None:
        self.input_rosbag_path = input_rosbag
        self.input_rosbag = rosbag.Bag(input_rosbag)
        self.input_rosbag_start = self.input_rosbag.get_start_time()
        # `start` is relative to the bag start; self.start is absolute time.
        self.start = self.input_rosbag_start + start
        # NOTE(review): `end` is used as an absolute timestamp here, but the CLI
        # documents it as *relative* — confirm intended semantics.
        self.end = end or self.input_rosbag.get_end_time()
        if duration:
            self.end = min(self.start + duration, self.end)
        # Offset from camera to the calibration pattern origin, along x (mm).
        self.distance = np.array([distance, 0, 0])
        self.calibration_matrices_subdirectory = calibration_matrices_subdirectory
        if camera.lower() == 'both':
            self.cameras_to_calibrate = ['left', 'right']
        else:
            self.cameras_to_calibrate = [camera.lower()]
        # Cone ids stay None unless the ground-truth data source fills them in.
        self.control_cone_ids = None
        self.calibration_cone_ids = None
        self.data_source = data_source
        if data_source == 'board':
            self.points_in_world = BOARD_POINTS_IN_WORLD
            self.control_points_in_world = BOARD_CONTROL_POINTS_IN_WORLD
        elif data_source == 'manual':
            self.points_in_world = MANUAL_POINTS_IN_WORLD
            self.control_points_in_world = MANUAL_CONTROL_POINTS_IN_WORLD
        elif data_source == 'ground_truth':
            if test_day is None or track_layout is None:
                print('Testday and Tracklayout must be specified if ground truth data source is chosen.')
                exit()
            self.test_day = test_day
            self.track_layout = track_layout
            self.import_ground_truth()
        self.bridge = CvBridge()
        self.killer = GracefulKiller()

    def plot_ground_truth(self):
        """Show a non-blocking plot of calibration and control cones, with arrows
        indicating the click order expected from the user."""
        plt.scatter(self.points_in_world[:, 0], self.points_in_world[:, 1], marker='+',
                    label='Cones for calibration',
                    color=np.choose(self.calibration_cone_ids, self._cone_colors),
                    s=800, linewidths=5)
        plt.scatter(self.control_points_in_world[:, 0], self.control_points_in_world[:, 1],
                    marker='x', linewidths=5, label='Cones for control',
                    color=np.choose(self.control_cone_ids, self._cone_colors), s=800)
        # Number each calibration cone so the user clicks them in order.
        for index, points in enumerate(self.points_in_world[:, :2]):
            plt.annotate(index, points, size=30)
        # Arrows from each calibration cone to the next (80% length so heads
        # don't overlap the markers).
        for first_point, second_point in zip(self.points_in_world[:-1, :2], self.points_in_world[1:, :2]):
            distance = (second_point - first_point)
            head_length = 0.2 * np.sqrt(distance[0]**2 + distance[1]**2)
            plt.arrow(first_point[0], first_point[1], 0.8 * distance[0], 0.8 * distance[1],
                      head_width=0.5*head_length, head_length=head_length,
                      length_includes_head=False)
        plt.axis('equal')
        plt.legend()
        plt.show(block=False)

    def import_ground_truth(self):
        """Load cone positions/ids from the slam package's ground-truth JSON and
        split them into calibration points (registration == 1) and control points."""
        rospack = rospkg.RosPack()
        path = f'{rospack.get_path("slam")}/plots/maps/{self.test_day}/{self.track_layout}/ground_truth.json'
        if not os.path.exists(path):
            # NOTE(review): execution continues after this message and open()
            # below will raise FileNotFoundError anyway — consider aborting here.
            print(f'Ground truth track does not exist: {path}')
        with open(path, 'r') as input:
            ground_truth = json.load(input)
        cone_coordinates = np.c_[ground_truth['x'], ground_truth['y']]
        cone_ids = np.array(ground_truth['id'], dtype=int)
        calibrating_cones_mask = np.array(ground_truth['registration']) == 1
        control_cones_mask = np.array(ground_truth['registration']) != 1
        # Append per-cone height as z and convert metres -> millimetres.
        self.points_in_world = np.c_[cone_coordinates[calibrating_cones_mask],
                                     np.choose(cone_ids[calibrating_cones_mask], self._cone_heights)] * 1000
        self.control_points_in_world = np.c_[cone_coordinates[control_cones_mask],
                                             np.choose(cone_ids[control_cones_mask], self._cone_heights)] * 1000
        self.calibration_cone_ids = cone_ids[calibrating_cones_mask]
        self.control_cone_ids = cone_ids[control_cones_mask]
        self.plot_ground_truth()

    def find_first_message(self, searched_topic) -> Optional[Any]:
        """Return the first message on `searched_topic` within [self.start, self.end],
        or None when the window contains no such message."""
        for topic, msg, t in self.input_rosbag.read_messages():
            if t.to_sec() < self.start:
                continue
            if t.to_sec() > self.end:
                break
            if topic == searched_topic:
                return msg

    def find_image(self, camera: str) -> Optional[npt.NDArray]:
        """Return the first rectified color image for `camera`, preferring the raw
        topic and falling back to the compressed one; None if neither exists."""
        image_msg: Image = self.find_first_message(searched_topic=f'/zed2i/zed_node/{camera}/image_rect_color')
        if image_msg is not None:
            return self.bridge.imgmsg_to_cv2(image_msg)
        image_msg: Image = self.find_first_message(searched_topic=f'/zed2i/zed_node/{camera}/image_rect_color/compressed')
        if image_msg is not None:
            # Decode JPEG/PNG payload, then add an alpha channel to match the
            # BGRA layout the raw topic delivers.
            return cv2.cvtColor(cv2.imdecode(np.frombuffer(image_msg.data, np.uint8), cv2.IMREAD_COLOR), cv2.COLOR_BGR2BGRA)
        else:
            print(f'Did not find any suitable image for {camera} camera. Skip')

    def find_intrinsic_matrix(self, camera: str) -> Optional[npt.NDArray]:
        """Return the 3x3 intrinsic matrix K (left block of the 3x4 projection
        matrix P from camera_info), or None when no camera_info is found."""
        camera_info_msg: CameraInfo = self.find_first_message(searched_topic=f'/zed2i/zed_node/{camera}/camera_info')
        if camera_info_msg is not None:
            return np.array(camera_info_msg.P).reshape((3, 4))[:3, :3]
        else:
            print(f'Did not find any suitable camera info for {camera} camera. Skip')

    def project_world_coordinates_to_image_pixels(self, coordinates: npt.NDArray, camera: str,
                                                  extrinsic_matrix: npt.NDArray,
                                                  intrinsic_matrix: npt.NDArray):
        """Project Nx3 world coordinates to Nx2 integer pixel positions using
        K @ inv(extrinsic).  NOTE(review): `camera` is unused here."""
        combined_matrix = np.dot(np.c_[intrinsic_matrix, np.zeros(3)], np.linalg.inv(extrinsic_matrix))
        # Apply the combined 3x4 matrix to homogeneous world points (row-wise).
        image_pixels = np.einsum('ab, cb -> ca', combined_matrix,
                                 np.c_[coordinates, np.ones(coordinates.shape[0])])
        # Perspective divide, then drop the homogeneous coordinate.
        return (image_pixels / image_pixels[:, 2, None])[:, :2].astype(np.int32)

    def calibrate_extrinsic_matrix(self, feature_selector: FeatureSelector, camera: str,
                                   intrinsic_matrix: npt.NDArray, extrinsic_matrix: npt.NDArray):
        """Background-thread worker: wait until all features are clicked, solve the
        extrinsics (written into `extrinsic_matrix` in place), then draw the
        re-projected control points for visual verification."""
        self.feature_selector_event.wait()
        self.feature_selector_event.clear()
        extrinsic_calibration = ExtrinsicCalibration(intrinsic_matrix=intrinsic_matrix,
                                                     points_in_world=self.points_in_world + self.distance,
                                                     points_in_image=np.array(feature_selector.features, dtype=np.float32))
        extrinsic_calibration.calculate_extrinsic_matrix(extrinsic_matrix)
        # Only connect control points into a polygon for board/manual sources;
        # ground-truth cones are scattered, not a closed shape.
        feature_selector.draw_control_polygon(image_pixels=self.project_world_coordinates_to_image_pixels(
            coordinates=self.control_points_in_world + self.distance,
            camera=camera,
            extrinsic_matrix=extrinsic_matrix,
            intrinsic_matrix=intrinsic_matrix
        ), draw_polygon=self.data_source != 'ground_truth')

    def calibrate_camera(self, camera: str):
        """Calibrate one camera end-to-end.

        Returns True on success, False when the user wants to retry, and None
        when calibration is skipped/aborted (missing data or user decline).
        """
        print(f'Calibrating {camera} camera')
        image = self.find_image(camera=camera)
        if image is None:
            return None
        intrinsic_matrix = self.find_intrinsic_matrix(camera=camera)
        if intrinsic_matrix is None:
            return None
        print(f'Image and camera info for {camera} found')
        self.feature_selector_event = threading.Event()
        feature_selector = FeatureSelector(image=image, killer=self.killer,
                                           points_in_world=self.points_in_world,
                                           event=self.feature_selector_event,
                                           calibration_cone_ids=self.calibration_cone_ids,
                                           control_cone_ids=self.control_cone_ids)
        print('Start feature selection')
        # Filled in place by the worker thread once features are selected.
        extrinsic_matrix = np.empty((4, 4))
        extrinsic_calibration_thread = threading.Thread(target=self.calibrate_extrinsic_matrix,
                                                        daemon=True,
                                                        args=(feature_selector, camera, intrinsic_matrix, extrinsic_matrix))
        extrinsic_calibration_thread.start()
        # Blocks until the user confirms (Enter) or the process is killed.
        feature_selector.show()
        if not click.confirm('Did the control polygon look good?', default=True):
            if click.confirm('Do you want to try again?', default=True):
                return False
            return None
        self.save_matrices(camera=camera, extrinsic_matrix=extrinsic_matrix, intrinsic_matrix=intrinsic_matrix)
        print(f'Sucessfully calibrated {camera} camera.')  # NOTE(review): typo 'Sucessfully' in output
        return True

    def save_matrices(self, camera: str, extrinsic_matrix: npt.NDArray, intrinsic_matrix: npt.NDArray):
        """Persist both matrices as .npy files under the local_mapping package."""
        rospack = rospkg.RosPack()
        base_path = f'{rospack.get_path("local_mapping")}/scripts/matrices/{self.calibration_matrices_subdirectory}/'
        Path(base_path).mkdir(parents=True, exist_ok=True)
        np.save(f'{base_path}extrinsic_matrix_{camera}.npy', extrinsic_matrix)
        np.save(f'{base_path}intrinsic_matrix_{camera}.npy', intrinsic_matrix)
        print(f'Matrices for {camera} camera saved')

    def run(self):
        """Calibrate each selected camera, repeating while the user asks to retry
        (calibrate_camera returns False) and no kill signal was received."""
        for camera in self.cameras_to_calibrate:
            while self.calibrate_camera(camera=camera) is False and self.killer.kill_now is False:
                pass
@click.command()
@click.argument('input_rosbag', type=click.Path(exists=True, file_okay=True, dir_okay=False))
@click.option('-s', '--start', default=0, type=float, help='relative start time of the new bag')
@click.option('-d', '--duration', type=float, help='maximal duration of the new bag')
@click.option('-e', '--end', type=float, help='relative end time of the new bag')
@click.option('-l', '--distance', type=float, help='Distance between camera and begin of calibration board',
              prompt='Distance in [mm] from camera to begin of calibration board')
@click.option('-cms', '--calibration-matrices-subdirectory', type=str, default='',
              help='subdirectory for camera calibration matrices')
@click.option('-c', '--camera', help='camera to calibrate', default='both',
              type=click.Choice(['left', 'right', 'both'], case_sensitive=False))
@click.option('-ds', '--data-source',
              help='Data source for Calibration. Standardized calibration board, manual points or cones from ground truth',
              default='board', type=click.Choice(['board', 'manual', 'ground_truth'], case_sensitive=False))
@click.option('-tl', '--track-layout', type=str, help='Track layout the ground truth is saved under')
@click.option('-td', '--test-day', type=str, help='Test day the ground truth is saved under')
def init(input_rosbag: str, camera: str, start: float, duration: float, end: float, track_layout: str,
         test_day: str, calibration_matrices_subdirectory: str, distance: float, data_source: str):
    """CLI entry point: build a Calibration from the parsed options and run it."""
    calibration = Calibration(input_rosbag=input_rosbag, camera=camera, distance=distance, start=start,
                              duration=duration, end=end, data_source=data_source, test_day=test_day,
                              track_layout=track_layout,
                              calibration_matrices_subdirectory=calibration_matrices_subdirectory)
    calibration.run()


if __name__ == '__main__':
    # Bug fix: the previous __main__ constructed Calibration directly with a
    # hard-coded developer bag path and omitted the required data_source /
    # track_layout / test_day arguments, which raised TypeError before any
    # calibration could start.  Delegate to the click command instead so both
    # entry paths share one argument set.
    init()