Source code for transforms
#!/usr/bin/python3
import time
import threading
import traceback
import yaml
import os
from typing import Optional, List
import numpy as np
import rospy
# Because of transformations
import tf_conversions
import tf2_ros
from geometry_msgs.msg import TransformStamped
from utilities.msg import VehiclePose
[docs]class TransformationHandler():
rate = 50
def __init__(self, vehicle_name: str):
self.vehicle_name = vehicle_name
self.map_rear_axle_lock = threading.Lock()
self.map_rear_axle_transform: Optional[TransformStamped] = None
self.transforms: List[TransformStamped] = []
self.prepare_transforms()
rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.vehicle_pose_callback)
self.transform_broadcaster = tf2_ros.TransformBroadcaster()
self.transform_broadcast_thread = threading.Thread(target=self.broadcast_transforms, daemon=True)
[docs] def create_transform_from_config(self, frame_id: str, child_frame_id: str, transform_config: dict):
t = TransformStamped()
t.header.frame_id = frame_id
t.child_frame_id = child_frame_id
t.transform.translation.x = transform_config['translation']['x']
t.transform.translation.y = transform_config['translation']['y']
t.transform.translation.z = transform_config['translation']['z']
q = tf_conversions.transformations.quaternion_from_euler(np.radians(transform_config['rotation']['x']),
np.radians(transform_config['rotation']['y']),
np.radians(transform_config['rotation']['z']))
t.transform.rotation.x = q[0]
t.transform.rotation.y = q[1]
t.transform.rotation.z = q[2]
t.transform.rotation.w = q[3]
return t
[docs] def prepare_transforms(self):
config_path = f'{os.path.abspath(os.path.dirname(__file__))}/../config/transforms/{self.vehicle_name}.yaml'
with open(config_path, 'r') as config_file:
config: dict = yaml.safe_load(config_file)
for frame_id, child_frames in config['tf'].items():
for child_frame_id, transform_config in child_frames.items():
self.transforms.append(self.create_transform_from_config(frame_id=frame_id,
child_frame_id=child_frame_id,
transform_config=transform_config))
[docs] def vehicle_pose_callback(self, vehicle_pose: VehiclePose):
q = tf_conversions.transformations.quaternion_from_euler(0, 0, vehicle_pose.psi)
if self.map_rear_axle_transform is None:
with self.map_rear_axle_lock:
self.map_rear_axle_transform = TransformStamped()
self.map_rear_axle_transform.child_frame_id = 'rear_axle'
self.map_rear_axle_transform.header.frame_id = 'map'
if self.map_rear_axle_lock:
self.map_rear_axle_transform.header.stamp = vehicle_pose.header.stamp
self.map_rear_axle_transform.transform.translation.x = vehicle_pose.x
self.map_rear_axle_transform.transform.translation.y = vehicle_pose.y
self.map_rear_axle_transform.transform.translation.z = 0.0
self.map_rear_axle_transform.transform.rotation.x = q[0]
self.map_rear_axle_transform.transform.rotation.y = q[1]
self.map_rear_axle_transform.transform.rotation.z = q[2]
self.map_rear_axle_transform.transform.rotation.w = q[3]
[docs] def broadcast_transforms(self):
rate = rospy.Rate(self.rate)
while not rospy.is_shutdown():
self.send_transforms()
rate.sleep()
[docs] def send_transforms(self):
now = rospy.Time.now()
for transform in self.transforms:
transform.header.stamp = now
with self.map_rear_axle_lock:
self.transform_broadcaster.sendTransform(
self.transforms + ([self.map_rear_axle_transform] if self.map_rear_axle_transform is not None else []))
[docs] def wait_for_start(self):
while rospy.get_time() == 0 and not rospy.is_shutdown():
time.sleep(0.01)
if not rospy.is_shutdown():
self.transform_broadcast_thread.start()
if __name__ == '__main__':
rospy.init_node('map_rear_axle_transform_publisher')
try:
vehicle_name = rospy.get_param('/vehicle/name')
transformation_handler = TransformationHandler(vehicle_name=vehicle_name)
transformation_handler.wait_for_start()
rospy.spin()
except Exception as e:
if not isinstance(e, rospy.exceptions.ROSInterruptException):
rospy.logfatal(traceback.format_exc())
time.sleep(2)
raise e