#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Built-in modules
import os
import threading
import multiprocessing
# External modules
import cv2
import numpy as np
import matplotlib.cm
import matplotlib.pyplot as plt
import yaml
# ROS modules
PKG = 'local_mapping'
import roslib; roslib.load_manifest(PKG)
import rospkg
import rospy
import tf2_ros
import ros_numpy
import image_geometry
import message_filters
from cv_bridge import CvBridge, CvBridgeError
from tf.transformations import euler_from_matrix
from sensor_msgs.msg import Image, CameraInfo, PointCloud2
# Config
PLOT_REP_DEBUG = True       # Plot re-projected calibration points
PLOT_POINT_CLOUD = True     # Plot complete re-projected point cloud
USE_EXTRINSIC_GUESS = True  # Seed PnP RANSAC with the previously saved extrinsic
FLIP_LIDAR = False
SUBDIRECTORY = '2k'
# Where the 4x4 lidar->camera extrinsic matrix is persisted between runs
EXTRINSIC_MATRIX_PATH = os.path.join(
    rospkg.RosPack().get_path('local_mapping'),
    'scripts/matrices', SUBDIRECTORY, 'lidar_to_camera.npy')
# ROS topic names
CAMERA_INFO = '/zed2i/zed_node/left/camera_info'
IMAGE_COLOR = '/zed2i/zed_node/left/image_rect_color'
OUSTER_POINTS = '/ouster/points'
RANSAC_ITERATIONS_COUNT = 100_000
USE_NATIVE_POINTS = False  # native points include all 9 components (xyzti, etc.), otherwise xyzi is used
# Constraints to show limited LiDAR points in m
CONSTRAINTS_FOR_IN_RANGE = {
    'Xmin': 0,
    'Xmax': 15,
    'Y': 10,  # Both left and right (abs)
    'Zmax': 1.5
}
# Global variables
PAUSE = False       # Set by the keyboard thread; drives the callback into calibration mode
FIRST_TIME = True   # One-time camera-model / TF setup flag
KEY_LOCK = threading.Lock()
TF_BUFFER = None
TF_LISTENER = None
CV_BRIDGE = CvBridge()
CAMERA_MODEL = image_geometry.PinholeCameraModel()
# Global paths
PKG_PATH = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
CALIB_PATH = 'lidar_camera_calibration/calibration_data/lidar_camera_calibration'
directory, filename = os.path.split(EXTRINSIC_MATRIX_PATH)
# Make the directories if they don't exist (exist_ok avoids the exists/makedirs race)
os.makedirs(directory, exist_ok=True)
# Make the file if it doesn't exist.
# NOTE(review): an empty placeholder is NOT a valid .npy file — np.load() on it
# raises; USE_EXTRINSIC_GUESS only works after a first calibration saved a real
# matrix. TODO: confirm the intended bootstrap flow.
if not os.path.exists(EXTRINSIC_MATRIX_PATH):
    open(EXTRINSIC_MATRIX_PATH, 'w').close()
def handle_keyboard():
    """
    Block on stdin and pause the main loop once the user presses [ENTER].

    Runs on a daemon thread (see start_keyboard_handler); only sets the
    shared PAUSE flag under KEY_LOCK, the callback does the actual work.
    :return: None
    """
    global PAUSE  # KEY_LOCK is only read, not rebound, so no global needed for it
    input('Press [ENTER] to pause and pick points\n')
    with KEY_LOCK:
        PAUSE = True
def start_keyboard_handler():
    """
    Spawn the keyboard-listener daemon thread.

    Daemonized so a blocked input() never prevents process exit.
    :return: None
    """
    keyboard_t = threading.Thread(target=handle_keyboard, daemon=True)
    keyboard_t.start()
def save_data(data, filename, folder, is_image=False):
    """
    Save the point correspondences or image data under PKG_PATH/folder.

    Points data will be appended (vstacked) if the file already exists.
    :param data: [numpy array] - points or opencv image
    :param filename: [str] - filename to save
    :param folder: [str] - folder to save at, relative to PKG_PATH
    :param is_image: [bool] - to specify whether points or image data
    :return: None
    """
    # Empty data - nothing to do
    if not len(data):
        return
    target_dir = os.path.join(PKG_PATH, folder)
    filepath = os.path.join(target_dir, filename)
    # Create folder; exist_ok replaces the try/except-OSError dance and still
    # raises if the path exists but is not a directory
    os.makedirs(target_dir, exist_ok=True)
    # Save image
    if is_image:
        cv2.imwrite(filepath, data)
        return
    # Save points data, appending to any existing file
    if os.path.isfile(filepath):
        rospy.logwarn('Updating file: %s' % filepath)
        data = np.vstack((np.load(filepath), data))
    np.save(filepath, data)
def _reprojection_rmse(points3D, points2D, rotation_vector, translation_vector,
                       camera_matrix, inliers):
    """
    Compute the RMSE (px) between picked 2D points and re-projected 3D points.

    Error is evaluated over the RANSAC inliers only.
    :return: [float] - root-mean-square re-projection error in pixels
    """
    points2D_reproj = cv2.projectPoints(points3D, rotation_vector,
        translation_vector, camera_matrix, None)[0].squeeze(1)
    assert points2D_reproj.shape == points2D.shape
    error = np.reshape((points2D_reproj - points2D)[inliers], (-1, 2))
    return np.sqrt(np.mean(error[:, 0] ** 2 + error[:, 1] ** 2))


def calibrate(ouster, image, queue):
    """
    Calibrate the LiDAR and image points using OpenCV PnP RANSAC.
    Requires minimum 5 point correspondences.
    :param ouster: [sensor_msgs/PointCloud2] - ROS ouster PCL2 message
    :param image: [numpy array] - image to display
    :param queue: [multiprocessing.Queue] - queue to hand the extrinsic matrix back
    :return: None
    """
    # Load corresponding points picked in earlier sessions
    folder = os.path.join(PKG_PATH, CALIB_PATH)
    points2D = np.load(os.path.join(folder, 'img_points.npy'))
    points3D = np.load(os.path.join(folder, 'pcl_points.npy'))
    # Check points shape
    assert points2D.shape[0] == points3D.shape[0]
    if points2D.shape[0] < 5:
        rospy.logwarn('PnP RANSAC Requires minimum 5 points')
        return
    np.set_printoptions(suppress=True)
    # Obtain camera matrix and distortion coefficients
    camera_matrix = CAMERA_MODEL.intrinsicMatrix()
    dist_coeffs = CAMERA_MODEL.distortionCoeffs()
    # Estimate extrinsics
    if USE_EXTRINSIC_GUESS:
        # Seed RANSAC with the previously saved extrinsic matrix
        old_extrinsic = np.load(EXTRINSIC_MATRIX_PATH)
        old_translation_vector = old_extrinsic[:3, 3].flatten()
        old_rotation, _ = cv2.Rodrigues(old_extrinsic[:3, :3])
        success, rotation_vector, translation_vector, inliers = cv2.solvePnPRansac(
            points3D, points2D, camera_matrix, None,
            flags=cv2.SOLVEPNP_ITERATIVE, useExtrinsicGuess=True,
            rvec=old_rotation, tvec=old_translation_vector,
            iterationsCount=RANSAC_ITERATIONS_COUNT)
    else:
        success, rotation_vector, translation_vector, inliers = cv2.solvePnPRansac(
            points3D, points2D, camera_matrix, None,
            flags=cv2.SOLVEPNP_ITERATIVE, iterationsCount=RANSAC_ITERATIONS_COUNT)
    # solvePnPRansac returns inliers=None when no consensus set is found;
    # indexing with None below would crash, so bail out early instead.
    if inliers is None:
        rospy.logwarn('PnP RANSAC found no inliers, aborting calibration')
        return
    rmse = _reprojection_rmse(points3D, points2D, rotation_vector,
                              translation_vector, camera_matrix, inliers)
    rospy.loginfo('Re-projection error before LM refinement (RMSE) in px: ' + str(rmse))
    # Refine estimate using LM
    if not success:
        rospy.logwarn('Initial estimation unsuccessful, skipping refinement')
    elif not hasattr(cv2, 'solvePnPRefineLM'):
        rospy.logwarn('solvePnPRefineLM requires OpenCV >= 4.1.1, skipping refinement')
    elif len(inliers) < 3:
        # Explicit check instead of assert (asserts vanish under python -O)
        rospy.logwarn('LM refinement requires at least 3 inlier points, skipping refinement')
    else:
        # NOTE(review): dist_coeffs is used here while None is passed everywhere
        # else (the image topic is rectified) — confirm this asymmetry is intended.
        rotation_vector, translation_vector = cv2.solvePnPRefineLM(
            points3D[inliers], points2D[inliers], camera_matrix, dist_coeffs,
            rotation_vector, translation_vector)
        rmse = _reprojection_rmse(points3D, points2D, rotation_vector,
                                  translation_vector, camera_matrix, inliers)
        rospy.loginfo('Re-projection error after LM refinement (RMSE) in px: ' + str(rmse))
    # Plot calibration markers only
    if PLOT_REP_DEBUG:
        plot_projected_calibration_points(points3D, rotation_vector, translation_vector,
                                          camera_matrix, image, color="green")
    # Plot point cloud with used extrinsic matrix
    if PLOT_POINT_CLOUD:
        # Extract points data
        ouster = ros_numpy.point_cloud2.pointcloud2_to_array(ouster)
        ouster = np.asarray(ouster.tolist())
        # Group all beams together and pick the first 4 columns for X, Y, Z, intensity.
        ouster = ouster.reshape(-1, 9)[:, :4] if USE_NATIVE_POINTS else ouster.reshape(-1, 4)
        plot_projected_calibration_points(ouster, rotation_vector, translation_vector,
                                          camera_matrix, image, color="gradient")
    # Convert rotation vector to matrix and Euler angles
    rotation_matrix = cv2.Rodrigues(rotation_vector)[0]
    euler = euler_from_matrix(rotation_matrix)
    # Display results
    print('Euler angles (RPY):', euler)
    print('Rotation Matrix:', rotation_matrix)
    print('Translation Offsets:', translation_vector.T)
    # Pack rotation matrix + translation vector into a homogeneous 4x4 extrinsic;
    # np.eye already supplies the [3,3] = 1 padding
    transformation_matrix = np.eye(4)
    transformation_matrix[:3, :3] = rotation_matrix
    transformation_matrix[:3, 3] = translation_vector.flatten()
    # Hand the result back to the parent process
    queue.put(transformation_matrix)
def plot_projected_calibration_points(points3D, rotation_vector, translation_vector, camera_matrix, image, color):
    """
    Generates a plot with 3D points projected onto the image.
    :param points3D: [numpy array] - (N, >=3) array of 3D points; the LAST column
        (intensity) is used when color="gradient"
    :param rotation_vector: [numpy array] - (3, 1)
    :param translation_vector: [numpy array] - (3, 1)
    :param camera_matrix: [numpy array] - (3, 3) intrinsic camera matrix
    :param image: [cv Mat] - camera image
    :param color: [str] - matplotlib color for points, or "gradient" to color by intensity
    :return: None
    """
    colors = None
    if color == "gradient":
        # Normalize intensities to [0, 1]; guard against an all-zero intensity
        # column which would otherwise divide by zero and yield NaN colors
        max_intensity = np.max(points3D[:, -1]) or 1.0
        cmap = matplotlib.colormaps.get_cmap('jet')
        colors = cmap(points3D[:, -1] / max_intensity).reshape(-1, 4)  # Nx4 RGBA
    points = points3D[:, :3].astype(np.float32)
    reprojected_points = cv2.projectPoints(points, rotation_vector, translation_vector,
                                           camera_matrix, None)[0].squeeze(1)
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.imshow(image)
    # Clamp the view to the image extent (y inverted for image coordinates)
    ax.set_xlim(0, image.shape[1])
    ax.set_ylim(image.shape[0], 0)
    # Plotting the 2D points on the image
    if colors is not None:
        ax.scatter(reprojected_points[:, 0], reprojected_points[:, 1], c=colors, marker='o')
    else:
        # Use a single color for all points
        ax.scatter(reprojected_points[:, 0], reprojected_points[:, 1], c=color, marker='o')
    ax.set_title("Calibrated image", color="black")
    plt.show()
def _save_matrix_to_transforms(transformation_matrix, car):
    """
    Write the extrinsic as RPY degrees + translation into a car's transforms YAML.
    :param transformation_matrix: [numpy array] - 4x4 extrinsic matrix
    :param car: [str] - transforms file basename under config/transforms/
    :return: None
    """
    car_transforms = os.path.join(rospkg.RosPack().get_path('utilities'),
                                  f'config/transforms/{car}.yaml')
    # Load the yaml file
    with open(car_transforms, 'r') as file:
        data = yaml.safe_load(file)
    # Rotation matrix -> Euler angles in degrees
    euler = np.degrees(euler_from_matrix(transformation_matrix[:3, :3])).tolist()
    translation_vector = transformation_matrix[:3, 3].tolist()
    tf_entry = data['tf']['zed2i_left_camera_optical_frame']['os_sensor']
    for i, axis in enumerate(['x', 'y', 'z']):
        tf_entry['rotation'][axis] = euler[i]
        tf_entry['translation'][axis] = translation_vector[i]
    # Save the updated yaml back to the file
    with open(car_transforms, 'w') as file:
        yaml.dump(data, file, sort_keys=False)


def callback(image, camera_info, ouster):
    """
    Callback to run point picking + calibration and save the new extrinsic matrix.
    :param image: [sensor_msgs/Image] - ROS sensor image message
    :param camera_info: [sensor_msgs/CameraInfo] - ROS sensor camera info message
    :param ouster: [sensor_msgs/PointCloud2] - ROS ouster PCL2 message
    :return: None
    """
    global CAMERA_MODEL, FIRST_TIME, PAUSE, TF_BUFFER, TF_LISTENER
    # One-time setup of the pinhole camera model and TF listener
    if FIRST_TIME:
        FIRST_TIME = False
        rospy.loginfo('Setting up camera model')
        CAMERA_MODEL.fromCameraInfo(camera_info)
        rospy.loginfo('Setting up static transform listener')
        TF_BUFFER = tf2_ros.Buffer()
        TF_LISTENER = tf2_ros.TransformListener(TF_BUFFER)
    # Calibration mode
    elif PAUSE:
        now = rospy.get_rostime()
        # Read image using CV bridge
        try:
            img = CV_BRIDGE.imgmsg_to_cv2(image, 'bgr8')
        except CvBridgeError as e:
            rospy.logerr(e)
            return
        disp = cv2.cvtColor(img.copy(), cv2.COLOR_BGR2RGB)
        # Queue for passing the extrinsic matrix out of the calibration process
        queue = multiprocessing.Queue()
        # Two processes to simultaneously extract 2D and 3D points.
        # NOTE(review): extract_points_2D/extract_points_3D are expected to be
        # defined elsewhere — they are not visible in this file; confirm.
        img_p = multiprocessing.Process(target=extract_points_2D, args=[disp, now])
        pcl_p = multiprocessing.Process(target=extract_points_3D, args=[ouster, now])
        img_p.start(); pcl_p.start()
        img_p.join(); pcl_p.join()
        # Calibrate for existing corresponding points
        calib_p = multiprocessing.Process(target=calibrate, args=[ouster, disp, queue])
        calib_p.start(); calib_p.join()
        # calibrate() may return early without putting a result; a bare
        # queue.get() would then block this callback forever.
        if queue.empty():
            rospy.logwarn('Calibration produced no extrinsic matrix')
        else:
            transformation_matrix = queue.get()
            # Input whether to save the extrinsic matrix
            save = input('Save extrinsic matrix? [y/n]: ')
            if save == 'y':
                np.save(EXTRINSIC_MATRIX_PATH, transformation_matrix)
                save_to_transforms = input('Save matrix to transforms [n/car]: ')
                if save_to_transforms != 'n':
                    _save_matrix_to_transforms(transformation_matrix, save_to_transforms)
        # Resume listener
        with KEY_LOCK:
            PAUSE = False
        start_keyboard_handler()
def listener(camera_info, image_color, ouster_points):
    """
    The main ROS node which subscribes to and time-synchronizes the topics.
    :param camera_info: [str] - ROS sensor camera info topic
    :param image_color: [str] - ROS sensor image topic
    :param ouster_points: [str] - ROS ouster PCL2 topic
    :return: None
    """
    # Start node
    rospy.init_node('calibrate_camera_lidar', anonymous=True)
    rospy.loginfo('Current PID: [%d]' % os.getpid())
    rospy.loginfo('CameraInfo topic: %s' % camera_info)
    rospy.loginfo('Image topic: %s' % image_color)
    rospy.loginfo('PointCloud2 topic: %s' % ouster_points)
    # Subscribe to topics
    info_sub = message_filters.Subscriber(camera_info, CameraInfo)
    image_sub = message_filters.Subscriber(image_color, Image)
    ouster_sub = message_filters.Subscriber(ouster_points, PointCloud2)
    # Synchronize the topics by time (approximate, 100 ms slop)
    ats = message_filters.ApproximateTimeSynchronizer(
        [image_sub, info_sub, ouster_sub], queue_size=5, slop=0.1)
    ats.registerCallback(callback)
    # Keep python from exiting until this node is stopped
    try:
        rospy.spin()
    except rospy.ROSInterruptException:
        rospy.loginfo('Shutting down')
if __name__ == '__main__':
    # Start keyboard handler thread (daemon; flips PAUSE when [ENTER] is pressed)
    start_keyboard_handler()
    # Start subscriber (blocks in rospy.spin until node shutdown)
    listener(CAMERA_INFO, IMAGE_COLOR, OUSTER_POINTS)