Source code for simulate_local_mapping_for_alignment

#!/usr/bin/env python

import rospy
from std_msgs.msg import Header
from utilities.msg import ConePosition, ConeList  # Adjust according to your package name
import json
import traceback
import numpy as np
import os


[docs]class NoiseType: GAUSSIAN = 0 SYSTEMATIC = 1
APPLY_NOISE = True NOISE_TYPE = NoiseType.SYSTEMATIC TRANSFORM_ANGLE = 2 # degrees TRANSFORM_TRANSLATION = [0.25, 0] # meters
[docs]def publish_cone_positions(test_track, test_day): # Initialize the ROS node rospy.init_node('cone_positions_publisher', anonymous=True) # Create a publisher object pub = rospy.Publisher('/local_mapping/cone_positions', ConeList, queue_size=10) rospy.sleep(1) # Create the message cone_list_msg = ConeList() # Fill in the header fields current_time = rospy.Time.now() cone_list_msg.header = Header(stamp=current_time, frame_id='base_link') cone_list_msg.image_header = Header(stamp=current_time, frame_id='camera_link') cone_list_msg.lidar_header = Header(stamp=current_time, frame_id='lidar_link') # Fill in the processing times cone_list_msg.processing_time = 0.1 # Example value cone_list_msg.lidar_processing_time = 0.05 # Example value cone_list_msg.sensor_time_delta = 0.01 # Example value # Check if manual json file exists if not os.path.exists(f'/workspace/as_ros/src/slam/plots/maps/{test_day}/{test_track}/manual.json'): rospy.logerr("[Local Mapping Simulation] We might need to talk about creating a manual.json.") exit(69) # Load the cone positions from the manual json file with open(f'/workspace/as_ros/src/slam/plots/maps/{test_day}/{test_track}/manual.json') as f: data = json.load(f) # Apply a rigid body transformation to the cone positions cone_positions = [] for x, y in zip(data['x'], data['y']): position = np.array([x, y]) position = position angle = np.deg2rad(TRANSFORM_ANGLE) rotation_matrix = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]]) new_position = np.dot(rotation_matrix, position) + TRANSFORM_TRANSLATION cone_positions.append(new_position) cone_positions_np = np.array(cone_positions) data["x"] = cone_positions_np[:, 0].tolist() data["y"] = cone_positions_np[:, 1].tolist() # Save the transformed cone positions to a new json ground truth file with open(f'/workspace/as_ros/src/slam/plots/maps/{test_day}/{test_track}/ground_truth.json', 'w') as f: json.dump(data, f) # Apply noise to the cone positions for (x, y), id in zip(cone_positions, data['id']): # Move the cones according to the distance between rear axle and lidar x -= 0.57 # Save the cone positions directly to the message if noise is not applied if not APPLY_NOISE: cone_list_msg.cones.append(ConePosition(x=x, y=y, id=int(id), probability=0.9, is_lidar=True)) continue # calculate polar coordinates angle = np.arctan2(y, x) distance = np.sqrt(x**2 + y**2) # Only consider cones within 17 meters if distance > 17: continue # Add noise to the cone positions if NOISE_TYPE == NoiseType.SYSTEMATIC: pass new_angle = angle new_distance = 0.9*distance elif NOISE_TYPE == NoiseType.GAUSSIAN: angle_uncertainty = np.deg2rad(1) distance_uncertainty = 0.15 + 0.05 * distance # sample from the normal distribution new_angle = np.random.normal(angle + np.deg2rad(5), angle_uncertainty) new_distance = np.random.normal(distance, distance_uncertainty) x = new_distance * np.cos(new_angle) y = new_distance * np.sin(new_angle) cone_list_msg.cones.append(ConePosition(x=x, y=y, id=int(id), probability=0.9, is_lidar=True)) # Publish the message rospy.loginfo("[Local Mapping] Publishing cone positions message") pub.publish(cone_list_msg) # Send the message again after a short duration to make slam send landmarks after alignment rospy.sleep(0.5) cone_list_msg.header.stamp = rospy.Time.now() cone_list_msg.image_header.stamp = rospy.Time.now() cone_list_msg.lidar_header.stamp = rospy.Time.now() pub.publish(cone_list_msg) # Sleep for a short duration to ensure the message is sent rospy.sleep(1)
if __name__ == '__main__': # Check if test_track/test_day is set if not rospy.has_param('/test_track/track_layout') or not rospy.has_param('/test_track/test_day'): rospy.logerr("[Local Mapping Simulation] Let's talk about starting SLAM first.") exit(69) test_track = rospy.get_param('/test_track/track_layout') test_day = rospy.get_param('/test_track/test_day') try: publish_cone_positions(test_track, test_day) except rospy.ROSInterruptException: traceback.print_exc() pass