Source code for transforms

#!/usr/bin/python3
import time
import threading
import traceback
import yaml
import os
from typing import Optional, List
import numpy as np

import rospy

# Because of transformations
import tf_conversions
import tf2_ros

from geometry_msgs.msg import TransformStamped
from utilities.msg import VehiclePose


[docs]class TransformationHandler(): rate = 50 def __init__(self, vehicle_name: str): self.vehicle_name = vehicle_name self.map_rear_axle_lock = threading.Lock() self.map_rear_axle_transform: Optional[TransformStamped] = None self.transforms: List[TransformStamped] = [] self.prepare_transforms() rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.vehicle_pose_callback) self.transform_broadcaster = tf2_ros.TransformBroadcaster() self.transform_broadcast_thread = threading.Thread(target=self.broadcast_transforms, daemon=True)
[docs] def create_transform_from_config(self, frame_id: str, child_frame_id: str, transform_config: dict): t = TransformStamped() t.header.frame_id = frame_id t.child_frame_id = child_frame_id t.transform.translation.x = transform_config['translation']['x'] t.transform.translation.y = transform_config['translation']['y'] t.transform.translation.z = transform_config['translation']['z'] q = tf_conversions.transformations.quaternion_from_euler(np.radians(transform_config['rotation']['x']), np.radians(transform_config['rotation']['y']), np.radians(transform_config['rotation']['z'])) t.transform.rotation.x = q[0] t.transform.rotation.y = q[1] t.transform.rotation.z = q[2] t.transform.rotation.w = q[3] return t
[docs] def prepare_transforms(self): config_path = f'{os.path.abspath(os.path.dirname(__file__))}/../config/transforms/{self.vehicle_name}.yaml' with open(config_path, 'r') as config_file: config: dict = yaml.safe_load(config_file) for frame_id, child_frames in config['tf'].items(): for child_frame_id, transform_config in child_frames.items(): self.transforms.append(self.create_transform_from_config(frame_id=frame_id, child_frame_id=child_frame_id, transform_config=transform_config))
[docs] def vehicle_pose_callback(self, vehicle_pose: VehiclePose): q = tf_conversions.transformations.quaternion_from_euler(0, 0, vehicle_pose.psi) if self.map_rear_axle_transform is None: with self.map_rear_axle_lock: self.map_rear_axle_transform = TransformStamped() self.map_rear_axle_transform.child_frame_id = 'rear_axle' self.map_rear_axle_transform.header.frame_id = 'map' if self.map_rear_axle_lock: self.map_rear_axle_transform.header.stamp = vehicle_pose.header.stamp self.map_rear_axle_transform.transform.translation.x = vehicle_pose.x self.map_rear_axle_transform.transform.translation.y = vehicle_pose.y self.map_rear_axle_transform.transform.translation.z = 0.0 self.map_rear_axle_transform.transform.rotation.x = q[0] self.map_rear_axle_transform.transform.rotation.y = q[1] self.map_rear_axle_transform.transform.rotation.z = q[2] self.map_rear_axle_transform.transform.rotation.w = q[3]
[docs] def broadcast_transforms(self): rate = rospy.Rate(self.rate) while not rospy.is_shutdown(): self.send_transforms() rate.sleep()
[docs] def send_transforms(self): now = rospy.Time.now() for transform in self.transforms: transform.header.stamp = now with self.map_rear_axle_lock: self.transform_broadcaster.sendTransform( self.transforms + ([self.map_rear_axle_transform] if self.map_rear_axle_transform is not None else []))
[docs] def wait_for_start(self): while rospy.get_time() == 0 and not rospy.is_shutdown(): time.sleep(0.01) if not rospy.is_shutdown(): self.transform_broadcast_thread.start()
if __name__ == '__main__': rospy.init_node('map_rear_axle_transform_publisher') try: vehicle_name = rospy.get_param('/vehicle/name') transformation_handler = TransformationHandler(vehicle_name=vehicle_name) transformation_handler.wait_for_start() rospy.spin() except Exception as e: if not isinstance(e, rospy.exceptions.ROSInterruptException): rospy.logfatal(traceback.format_exc()) time.sleep(2) raise e