# Source code for calibration.lidar_camera_calibration.calibrate_camera_lidar

#!/usr/bin/env python
# -*- coding: utf-8 -*-


# Built-in modules
import os
import threading
import multiprocessing

# External modules
import cv2
import numpy as np
import matplotlib.cm
import matplotlib.pyplot as plt
import yaml

# ROS modules
PKG = 'local_mapping'
import roslib; roslib.load_manifest(PKG)
import rospkg
import rospy
import tf2_ros
import ros_numpy
import image_geometry
import message_filters
from cv_bridge import CvBridge, CvBridgeError
from tf.transformations import euler_from_matrix
from sensor_msgs.msg import Image, CameraInfo, PointCloud2

# Config
PLOT_REP_DEBUG = True # Plot re-projected points
PLOT_POINT_CLOUD = True # Plot Complete re-projected point cloud
USE_EXTRINSIC_GUESS = True  # Seed PnP RANSAC with the previously saved extrinsic matrix
FLIP_LIDAR = False  # Mirror all three axes in the 3D picker view (see extract_points_3D)
SUBDIRECTORY = '2k'  # Sub-folder under scripts/matrices holding the saved extrinsic matrix
# Resolved at import time via rospkg; requires the 'local_mapping' ROS package to be findable
EXTRINSIC_MATRIX_PATH = os.path.join(rospkg.RosPack().get_path('local_mapping'), 'scripts/matrices', SUBDIRECTORY) + '/lidar_to_camera.npy'
# ROS topic names consumed by listener()
CAMERA_INFO = '/zed2i/zed_node/left/camera_info'
IMAGE_COLOR = '/zed2i/zed_node/left/image_rect_color'
OUSTER_POINTS = '/ouster/points'
RANSAC_ITERATIONS_COUNT = 100_000  # iterationsCount passed to cv2.solvePnPRansac
USE_NATIVE_POINTS = False  # native points include all 9 components (xyzti, etc.), otherwise xyzi is used

# Constraints to show limited LiDAR points in m
CONSTRAINTS_FOR_IN_RANGE = {
    'Xmin': 0,
    'Xmax': 15,
    'Y': 10, # Both left and right (abs),
    'Zmax': 1.5
}

# Global variables
PAUSE = False        # Set by the keyboard handler; switches callback() into calibration mode
FIRST_TIME = True    # First callback() invocation performs one-time camera/TF setup
KEY_LOCK = threading.Lock()  # Guards writes to PAUSE
TF_BUFFER = None     # tf2_ros.Buffer, created on first callback
TF_LISTENER = None   # tf2_ros.TransformListener, created on first callback
CV_BRIDGE = CvBridge()
CAMERA_MODEL = image_geometry.PinholeCameraModel()

# Global paths
PKG_PATH = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
CALIB_PATH = 'lidar_camera_calibration/calibration_data/lidar_camera_calibration'

directory, filename = os.path.split(EXTRINSIC_MATRIX_PATH)

# Make the directories if they don't exist
if not os.path.exists(directory):
    os.makedirs(directory)

# Make the file if it doesn't exist
# NOTE(review): this creates an EMPTY file, which np.load() cannot parse - the
# first run with USE_EXTRINSIC_GUESS=True will fail to load it as a .npy array;
# confirm whether pre-creating the file is needed at all.
if not os.path.exists(EXTRINSIC_MATRIX_PATH):
    open(EXTRINSIC_MATRIX_PATH, 'w').close()

def handle_keyboard():
    """
    Block until the user presses [ENTER], then raise the global PAUSE flag so
    that callback() switches into point-picking / calibration mode.

    :return: None
    """
    global PAUSE
    # The return value is irrelevant; input() is used purely to block this
    # thread until ENTER is pressed (the original bound it to an unused local).
    input('Press [ENTER] to pause and pick points\n')
    # KEY_LOCK is only read here (no rebinding), so no `global` needed for it
    with KEY_LOCK:
        PAUSE = True
def start_keyboard_handler():
    """
    Spawn the keyboard handler in a daemon thread so it never blocks shutdown.

    :return: None
    """
    threading.Thread(target=handle_keyboard, daemon=True).start()
def save_data(data, filename, folder, is_image=False):
    """
    Save the point correspondences and image data.
    Points data will be appended if file already exists.

    :param data: [numpy array] - points or opencv image
    :param filename: [str] - filename to save
    :param folder: [str] - folder to save at (relative to PKG_PATH)
    :param is_image: [bool] - to specify whether points or image data
    :return: None
    """
    # Empty data - nothing to save
    if not len(data):
        return

    # Handle filename (os.path.join accepts multiple components directly)
    target_dir = os.path.join(PKG_PATH, folder)
    filepath = os.path.join(target_dir, filename)

    # Create folder; exist_ok replaces the original try/except OSError dance
    # and is equally race-safe (still raises if the path exists as a file)
    os.makedirs(target_dir, exist_ok=True)

    # Save image
    if is_image:
        cv2.imwrite(filepath, data)
        return

    # Save points data, appending to any previously picked points
    if os.path.isfile(filepath):
        rospy.logwarn('Updating file: %s' % filepath)
        data = np.vstack((np.load(filepath), data))
    np.save(filepath, data)
def extract_points_2D(disp, now):
    """
    Runs the image point selection GUI process. Saves them in img_points.npy.

    Runs in its own multiprocessing.Process; blocks on plt.show() until the
    operator closes the window, then persists the picked corners.

    :param disp: [numpy array] - image to display
    :param now: [rospy.Time] - ROS bag time (``now.secs`` is shown in the title)
    :return: None
    """
    # Log PID so the operator can identify/kill this picker process
    rospy.loginfo('2D Picker PID: [%d]' % os.getpid())

    # Setup matplotlib GUI
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.set_title('Select 2D Image Points - %d' % now.secs)
    ax.set_axis_off()
    ax.imshow(disp)

    # Pick points
    # `picked` holds a sliding window of the last two clicks (for line drawing),
    # `corners` accumulates every click, `lines` keeps handles for undo.
    picked, corners = [], []
    lines = []

    def onclick(event):
        # Mouse click handler: record the point and connect it to the previous one
        x = event.xdata
        y = event.ydata
        # Clicks outside the axes have no data coordinates
        if (x is None) or (y is None):
            return

        # Display the picked point
        picked.append((x, y))
        corners.append((x, y))
        rospy.loginfo('IMG: %s', str(picked[-1]))

        if len(picked) > 1:
            # Draw the line
            temp = np.array(picked)
            line, = ax.plot(temp[:, 0], temp[:, 1])
            lines.append(line)
            ax.figure.canvas.draw_idle()

            # Reset list for future pick events
            del picked[0]

    # Undo last point
    def onkey(event):
        # 'r' removes the most recent point and its connecting line
        if event.key == 'r':
            if corners:
                # Remove the last picked point
                popped_point = corners.pop()
                rospy.loginfo('Removed IMG: %s', str(popped_point))
            if lines:
                # Remove the last drawn line
                line = lines.pop()
                line.remove()
                ax.figure.canvas.draw_idle()

    # Display GUI (blocks until the window is closed)
    fig.canvas.mpl_connect('button_press_event', onclick)
    fig.canvas.mpl_connect('key_press_event', onkey)
    plt.show()

    # Warn if duplicate points
    # NOTE(review): nothing is actually removed here despite the message text;
    # duplicates are only detected - confirm intended behavior.
    if len(corners) != len(list(set(corners))):
        rospy.logwarn('Removed last duplicate. Make sure to select also the same LiDAR points.')

    # Save image points
    save_data(corners, 'img_points.npy', CALIB_PATH)
def extract_points_3D(ouster, now):
    """
    Runs the LiDAR point selection GUI process. Saves them in pcl_points.npy.

    Runs in its own multiprocessing.Process; filters the cloud to the
    CONSTRAINTS_FOR_IN_RANGE box, shows an intensity-colored 3D scatter, and
    blocks on plt.show() until the operator closes the window.

    :param ouster: [sensor_msgs/PointCloud2] - ROS ouster PCL2 message
    :param now: [rospy.Time] - ROS bag time (``now.secs`` is shown in the title)
    :return: None
    """
    # Unpack the in-range filter box (meters, LiDAR frame)
    global CONSTRAINTS_FOR_IN_RANGE
    Xmin, Xmax, Y, Zmax = (CONSTRAINTS_FOR_IN_RANGE['Xmin'],
                           CONSTRAINTS_FOR_IN_RANGE['Xmax'],
                           CONSTRAINTS_FOR_IN_RANGE['Y'],
                           CONSTRAINTS_FOR_IN_RANGE['Zmax'])

    # Log PID so the operator can identify/kill this picker process
    rospy.loginfo('3D Picker PID: [%d]' % os.getpid())

    # Extract points data
    points = ros_numpy.point_cloud2.pointcloud2_to_array(ouster)
    points = np.asarray(points.tolist())
    # Group all beams together and pick the first 4 columns for X, Y, Z, intensity.
    points = points.reshape(-1,9)[:, :4] if USE_NATIVE_POINTS else points.reshape(-1, 4)

    # Select points within chessboard range
    inrange = np.where((points[:, 0] > Xmin) & # X
                       (points[:, 0] < Xmax) & # X
                       (np.abs(points[:, 1]) < Y) & # Y
                       (points[:, 2] < Zmax)) # Z
    points = points[inrange[0]]
    if points.shape[0] > 5:
        rospy.loginfo('PCL points available: %d', points.shape[0])
    else:
        rospy.logwarn('Very few PCL points available in range')
        return

    # Color map for the points, normalized by the maximum intensity (last column)
    cmap = matplotlib.cm.get_cmap('hsv')
    colors = cmap(points[:, -1] / np.max(points[:, -1]))

    # Setup matplotlib GUI
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    ax.set_title('Select 3D LiDAR Points - %d Selection mode: disabled ' % now.secs, color='black')
    ax.set_axis_off()
    ax.set_facecolor((0, 0, 0))
    # picker=5 makes points clickable within a 5pt tolerance
    # NOTE(review): scalar `picker` is deprecated in newer matplotlib in favor
    # of pickradius - confirm against the pinned matplotlib version.
    ax.scatter(points[:, 0], points[:, 1], points[:, 2], c=colors, s=5, picker=5)

    # Equalize display aspect ratio for all axes
    max_range = (np.array([points[:, 0].max() - points[:, 0].min(),
                           points[:, 1].max() - points[:, 1].min(),
                           points[:, 2].max() - points[:, 2].min()]).max() / 2.0)
    mid_x = (points[:, 0].max() + points[:, 0].min()) * 0.5
    mid_y = (points[:, 1].max() + points[:, 1].min()) * 0.5
    mid_z = (points[:, 2].max() + points[:, 2].min()) * 0.5
    ax.set_xlim(mid_x - max_range, mid_x + max_range)
    ax.set_ylim(mid_y - max_range, mid_y + max_range)
    ax.set_zlim(mid_z - max_range, mid_z + max_range)
    # Mirror the view if the LiDAR is mounted flipped
    if FLIP_LIDAR:
        ax.set_box_aspect([-1,-1,-1])

    # Pick points
    # `picked` holds a sliding window of the last two picks (for line drawing),
    # `corners` accumulates every pick, `lines` keeps handles for undo.
    picked, corners = [], []
    lines = []
    # Picking is disabled until the operator presses 'd', to avoid accidental
    # picks while rotating the view.
    selection_mode = False

    def onpick(event):
        # Point-pick handler (fires on clicks near a scatter point)
        nonlocal selection_mode
        if not selection_mode:
            return
        ind = event.ind[0]
        # _offsets3d is a private Axes3D attribute holding the scatter coordinates
        x, y, z = event.artist._offsets3d

        # Ignore if same point selected again
        if picked and (x[ind] == picked[-1][0] and y[ind] == picked[-1][1] and z[ind] == picked[-1][2]):
            return

        # Display picked point
        picked.append((x[ind], y[ind], z[ind]))
        corners.append((x[ind], y[ind], z[ind]))
        rospy.loginfo('PCL: %s', str(picked[-1]))

        if len(picked) > 1:
            # Draw the line
            temp = np.array(picked)
            line, = ax.plot(temp[:, 0], temp[:, 1], temp[:, 2])
            lines.append(line)
            ax.figure.canvas.draw_idle()

            # Reset list for future pick events
            del picked[0]

    def toggle_selection_mode():
        # Flip picking on/off and reflect the state in the window title
        nonlocal selection_mode
        selection_mode = not selection_mode
        title = f"Select 3D LiDAR Points - Time: {now.secs} Selection mode: {'enabled' if selection_mode else 'disabled'}"
        ax.set_title(title, color='black')
        ax.figure.canvas.draw_idle()
        print(f"Selection mode {'enabled' if selection_mode else 'disabled'}.")

    # Undo last LiDAR point
    def onkey(event):
        # 'r' removes the most recent point/line; 'd' toggles selection mode
        if event.key == 'r':
            if corners:
                # Remove the last picked point
                popped_point = corners.pop()
                rospy.loginfo('Removed PCL: %s', str(popped_point))
            if lines:
                # Remove the last drawn line
                line = lines.pop()
                line.remove()
                ax.figure.canvas.draw_idle()
        elif event.key == 'd':
            toggle_selection_mode()

    # Display GUI (blocks until the window is closed)
    fig.canvas.mpl_connect('pick_event', onpick)
    fig.canvas.mpl_connect('key_press_event', onkey)
    # NOTE(review): showMaximized() is Qt-backend specific - confirm the
    # matplotlib backend in use.
    fig.canvas.manager.window.showMaximized()
    plt.show()

    # Warn if duplicate points
    # NOTE(review): nothing is actually removed here despite the message text;
    # duplicates are only detected - confirm intended behavior.
    if len(corners) != len(list(set(corners))):
        rospy.logwarn('Removed last duplicate. Make sure to select also the same Camera points.')

    # Save LiDAR points
    save_data(corners, 'pcl_points.npy', CALIB_PATH)
def _reprojection_rmse(points3D, points2D, rotation_vector, translation_vector, camera_matrix, inliers):
    """
    Compute the re-projection RMSE in pixels, evaluated over the RANSAC inliers.

    :param points3D: [numpy array] - (N, 3) picked LiDAR points
    :param points2D: [numpy array] - (N, 2) picked image points
    :param rotation_vector: [numpy array] - (3, 1) Rodrigues rotation vector
    :param translation_vector: [numpy array] - (3, 1) translation vector
    :param camera_matrix: [numpy array] - (3, 3) intrinsic camera matrix
    :param inliers: [numpy array] - inlier indices returned by solvePnPRansac
    :return: [float] - root-mean-square re-projection error in px
    """
    points2D_reproj = cv2.projectPoints(points3D, rotation_vector, translation_vector,
                                        camera_matrix, None)[0].squeeze(1)
    assert points2D_reproj.shape == points2D.shape
    # Compute error only over inliers.
    error = np.reshape((points2D_reproj - points2D)[inliers], (-1, 2))
    return np.sqrt(np.mean(error[:, 0] ** 2 + error[:, 1] ** 2))


def calibrate(ouster, image, queue):
    """
    Calibrate the LiDAR and image points using OpenCV PnP RANSAC.
    Requires minimum 5 point correspondences.

    :param ouster: [sensor_msgs/PointCloud2] - ROS ouster PCL2 message
    :param image: [numpy array] - image to display
    :param queue: [multiprocessing.Queue] - queue to save the extrinsic matrix
    :return: None
    """
    # Load corresponding points picked earlier by the GUI processes
    folder = os.path.join(PKG_PATH, CALIB_PATH)
    points2D = np.load(os.path.join(folder, 'img_points.npy'))
    points3D = np.load(os.path.join(folder, 'pcl_points.npy'))

    # Check points shape
    assert points2D.shape[0] == points3D.shape[0]
    if not (points2D.shape[0] >= 5):
        rospy.logwarn('PnP RANSAC Requires minimum 5 points')
        return
    np.set_printoptions(suppress=True)

    # Obtain camera matrix and distortion coefficients
    camera_matrix = CAMERA_MODEL.intrinsicMatrix()
    dist_coeffs = CAMERA_MODEL.distortionCoeffs()

    # Estimate extrinsics, optionally seeding RANSAC with the previous result
    guess = None
    if USE_EXTRINSIC_GUESS:
        try:
            # BUGFIX: the module creates EXTRINSIC_MATRIX_PATH as an EMPTY file
            # on first run, which np.load() cannot parse - fall back to an
            # unseeded estimate instead of crashing.
            old_extrinsic = np.load(EXTRINSIC_MATRIX_PATH)
            old_translation_vector = old_extrinsic[:3, 3].flatten()
            # Rodrigues converts the 3x3 rotation matrix into a rotation vector
            old_rotation, _ = cv2.Rodrigues(old_extrinsic[:3, :3])
            guess = (old_rotation, old_translation_vector)
        except (IOError, OSError, ValueError):
            rospy.logwarn('Could not load previous extrinsic matrix, estimating without a guess')
    if guess is not None:
        success, rotation_vector, translation_vector, inliers = cv2.solvePnPRansac(
            points3D, points2D, camera_matrix, None,
            flags=cv2.SOLVEPNP_ITERATIVE, useExtrinsicGuess=True,
            rvec=guess[0], tvec=guess[1],
            iterationsCount=RANSAC_ITERATIONS_COUNT)
    else:
        success, rotation_vector, translation_vector, inliers = cv2.solvePnPRansac(
            points3D, points2D, camera_matrix, None,
            flags=cv2.SOLVEPNP_ITERATIVE,
            iterationsCount=RANSAC_ITERATIONS_COUNT)

    # BUGFIX: inliers is None when RANSAC finds no consensus set; the original
    # code crashed on the inlier fancy-indexing below in that case.
    if inliers is None:
        rospy.logwarn('PnP RANSAC found no inliers, aborting calibration')
        return

    # Compute re-projection error.
    rmse = _reprojection_rmse(points3D, points2D, rotation_vector, translation_vector,
                              camera_matrix, inliers)
    rospy.loginfo('Re-projection error before LM refinement (RMSE) in px: ' + str(rmse))

    # Refine estimate using LM
    if not success:
        rospy.logwarn('Initial estimation unsuccessful, skipping refinement')
    elif not hasattr(cv2, 'solvePnPRefineLM'):
        rospy.logwarn('solvePnPRefineLM requires OpenCV >= 4.1.1, skipping refinement')
    elif len(inliers) < 3:
        # Explicit check instead of assert: asserts are stripped under `python -O`
        rospy.logwarn('LM refinement requires at least 3 inlier points, skipping refinement')
    else:
        # NOTE(review): dist_coeffs is used here while None (zero distortion) is
        # passed to solvePnPRansac/projectPoints above - presumably fine for the
        # rectified image topic where D is all zeros, but confirm.
        rotation_vector, translation_vector = cv2.solvePnPRefineLM(
            points3D[inliers], points2D[inliers], camera_matrix, dist_coeffs,
            rotation_vector, translation_vector)

    # Compute re-projection error after (possible) refinement.
    rmse = _reprojection_rmse(points3D, points2D, rotation_vector, translation_vector,
                              camera_matrix, inliers)
    rospy.loginfo('Re-projection error after LM refinement (RMSE) in px: ' + str(rmse))

    # Plot calibration markers only
    if PLOT_REP_DEBUG:
        plot_projected_calibration_points(points3D, rotation_vector, translation_vector,
                                          camera_matrix, image, color="green")

    # Plot point cloud with used extrinsic matrix
    if PLOT_POINT_CLOUD:
        # Extract points data
        ouster = ros_numpy.point_cloud2.pointcloud2_to_array(ouster)
        ouster = np.asarray(ouster.tolist())
        # Group all beams together and pick the first 4 columns for X, Y, Z, intensity.
        ouster = ouster.reshape(-1, 9)[:, :4] if USE_NATIVE_POINTS else ouster.reshape(-1, 4)
        plot_projected_calibration_points(ouster, rotation_vector, translation_vector,
                                          camera_matrix, image, color="gradient")

    # Convert rotation vector to a matrix and Euler angles for display
    rotation_matrix = cv2.Rodrigues(rotation_vector)[0]
    euler = euler_from_matrix(rotation_matrix)

    # Display results
    print('Euler angles (RPY):', euler)
    print('Rotation Matrix:', rotation_matrix)
    print('Translation Offsets:', translation_vector.T)

    # Pack rotation and translation into a homogeneous 4x4 extrinsic matrix.
    # np.eye(4) already sets the bottom-right 1, so no extra assignment needed.
    transformation_matrix = np.eye(4)
    transformation_matrix[:3, :3] = rotation_matrix
    transformation_matrix[:3, 3] = translation_vector.flatten()

    # Hand the result back to the parent process
    queue.put(transformation_matrix)
def plot_projected_calibration_points(points3D, rotation_vector, translation_vector, camera_matrix, image, color):
    """
    Generates a plot with 3D points projected onto the image.

    :param points3D: [numpy array] - (N, 3+) array of 3D points (last column
        is intensity when color == "gradient")
    :param rotation_vector: [numpy array] - (3, 1)
    :param translation_vector: [numpy array] - (3, 1)
    :param camera_matrix: [numpy array] - (3, 3) intrinsic camera matrix
    :param image: [cv Mat] - camera image
    :param color: [str] - "gradient" for intensity colouring, otherwise a
        single matplotlib colour used for all points
    :return: None
    """
    # Choose per-point colours: intensity gradient or one flat colour
    if color == "gradient":
        intensity = points3D[:, -1]
        rgba = matplotlib.colormaps.get_cmap('jet')(intensity / np.max(intensity))
        point_colors = rgba.reshape(-1, 4)  # Ensure an Nx4 RGBA array
    else:
        point_colors = color

    # Project the XYZ coordinates into pixel space (no distortion applied)
    xyz = points3D[:, :3].astype(np.float32)
    pixels = cv2.projectPoints(xyz, rotation_vector, translation_vector,
                               camera_matrix, None)[0].squeeze(1)

    # Draw the image with the projected points on top, clipped to image bounds
    figure = plt.figure()
    axes = figure.add_subplot(111)
    axes.imshow(image)
    axes.set_xlim(0, image.shape[1])
    axes.set_ylim(image.shape[0], 0)
    axes.scatter(pixels[:, 0], pixels[:, 1], c=point_colors, marker='o')
    axes.set_title("Calibrated image", color="black")
    plt.show()
def callback(image, camera_info, ouster):
    """
    Callback function to handle calibration and save new calibration matrix.

    First invocation performs one-time setup; subsequent invocations do
    nothing unless PAUSE was raised by the keyboard handler, in which case a
    full pick-and-calibrate cycle is run in child processes.

    :param image: [sensor_msgs/Image] - ROS sensor image message
    :param camera_info: [sensor_msgs/CameraInfo] - ROS sensor camera info message
    :param ouster: [sensor_msgs/PointCloud2] - ROS ouster PCL2 message
    :return: None
    """
    global CAMERA_MODEL, FIRST_TIME, PAUSE, TF_BUFFER, TF_LISTENER

    # Setup the pinhole camera model
    if FIRST_TIME:
        FIRST_TIME = False
        # Setup camera model
        rospy.loginfo('Setting up camera model')
        CAMERA_MODEL.fromCameraInfo(camera_info)
        # TF listener
        rospy.loginfo('Setting up static transform listener')
        TF_BUFFER = tf2_ros.Buffer()
        TF_LISTENER = tf2_ros.TransformListener(TF_BUFFER)

    # Calibration mode
    elif PAUSE:
        # Create GUI processes
        now = rospy.get_rostime()

        # Read image using CV bridge
        try:
            img = CV_BRIDGE.imgmsg_to_cv2(image, 'bgr8')
        except CvBridgeError as e:
            rospy.logerr(e)
            return
        # matplotlib expects RGB ordering
        disp = cv2.cvtColor(img.copy(), cv2.COLOR_BGR2RGB)

        # Create queue used to receive the extrinsic matrix from the child process
        queue = multiprocessing.Queue()

        # Start two processes to simultaneously extract points
        img_p = multiprocessing.Process(target=extract_points_2D, args=[disp, now])
        pcl_p = multiprocessing.Process(target=extract_points_3D, args=[ouster, now])
        img_p.start(); pcl_p.start()
        img_p.join(); pcl_p.join()

        # Calibrate for existing corresponding points
        calib_p = multiprocessing.Process(target=calibrate, args=[ouster, disp, queue])
        calib_p.start(); calib_p.join()

        # Input whether to save the extrinsic matrix
        save = input('Save extrinsic matrix? [y/n]: ')
        if save == 'y':
            # Save the transformation matrix
            # NOTE(review): queue.get() blocks forever if calibrate() returned
            # early without putting a result - confirm this cannot happen here.
            transformation_matrix = queue.get()
            np.save(EXTRINSIC_MATRIX_PATH, transformation_matrix)
            # Anything other than 'n' is treated as a car name / yaml basename
            save_to_transforms = input('Save matrix to transforms [n/car]: ')
            if save_to_transforms != 'n':
                # Save the transformation matrix to the car's transforms
                car_transforms = os.path.join(rospkg.RosPack().get_path('utilities'),
                                              f'config/transforms/{save_to_transforms}.yaml')
                # Load the yaml file
                with open(car_transforms, 'r') as file:
                    data = yaml.safe_load(file)
                # Update the matrix
                rotation_matrix = transformation_matrix[:3, :3]
                # Get degrees from the rotation matrix
                euler = euler_from_matrix(rotation_matrix)
                # Convert to degrees
                euler = np.degrees(euler).tolist()
                translation_vector = transformation_matrix[:3, 3].tolist()
                for i, axis in enumerate(['x', 'y', 'z']):
                    data['tf']['zed2i_left_camera_optical_frame']['os_sensor']['rotation'][axis] = euler[i]
                    data['tf']['zed2i_left_camera_optical_frame']['os_sensor']['translation'][axis] = translation_vector[i]
                # Save the updated yaml back to the file
                with open(car_transforms, 'w') as file:
                    yaml.dump(data, file, sort_keys=False)

        # Resume listener
        with KEY_LOCK:
            PAUSE = False
        start_keyboard_handler()
def listener(camera_info, image_color, ouster_points):
    """
    The main ROS node which handles the topics.

    :param camera_info: [str] - ROS sensor camera info topic
    :param image_color: [str] - ROS sensor image topic
    :param ouster_points: [str] - ROS ouster PCL2 topic
    :return: None
    """
    # Bring up the node and report what it listens to
    rospy.init_node('calibrate_camera_lidar', anonymous=True)
    rospy.loginfo('Current PID: [%d]' % os.getpid())
    rospy.loginfo('CameraInfo topic: %s' % camera_info)
    rospy.loginfo('Image topic: %s' % image_color)
    rospy.loginfo('PointCloud2 topic: %s' % ouster_points)

    # One subscriber per topic, matched approximately by message timestamp.
    # Order here fixes the argument order of callback(image, camera_info, ouster).
    subscribers = [
        message_filters.Subscriber(image_color, Image),
        message_filters.Subscriber(camera_info, CameraInfo),
        message_filters.Subscriber(ouster_points, PointCloud2),
    ]
    synchronizer = message_filters.ApproximateTimeSynchronizer(
        subscribers, queue_size=5, slop=0.1)
    synchronizer.registerCallback(callback)

    # Keep python from exiting until this node is stopped
    try:
        rospy.spin()
    except rospy.ROSInterruptException:
        rospy.loginfo('Shutting down')
if __name__ == '__main__':
    # Start keyboard handler thread (lets the operator pause into calibration mode)
    start_keyboard_handler()
    # Start subscriber (blocks in rospy.spin until shutdown)
    listener(CAMERA_INFO, IMAGE_COLOR, OUSTER_POINTS)