Source code for filtering_node

#!/usr/bin/python3
"""Filtering ROS node to calculate centerline and track widths based on the landmarks estimated by SLAM."""
from enum import Enum
from typing import Optional
import time
import traceback

import numpy as np
import numpy.typing as npt

import rospy

from std_msgs.msg import Header
from utilities.msg import ConeListWithCovariance, VehiclePose, CenterPoints

from filtering_lib import TrackFiltering


[docs]class AutonomousMissionEnum(Enum): """Enum to describe which Autonomous Mission is currently select.""" MANUAL_DRIVING = 0 ACCELERATION = 1 SKIDPAD = 2 TRACKDRIVE = 3 BRAKETEST = 4 INSPECTION = 5 AUTOCROSS = 6
[docs]class Filtering(): """ Filtering ROS node to calculate centerline and track widths based on the landmarks estimated by SLAM. Attributes ---------- track_filtering : TrackFiltering TrackFiltering object to centerline and track widths. vehicle_pose : VehiclePose Latest vehicle pose estimated by SLAM. slam_closed_track : bool Indicates whether SLAM assumes that the track is closed. filtering_closed_track : bool Indicates whether filtering assumes that the track is closed. cones : Optional[npt.NDArray] Latest by SLAM estimated cones. centerpoints_publisher : rospy.Publisher ROS publisher to publish calculated centerpoints and track widths. lr_cog : float Offset between rear axle and center of gravity (cog) of vehicle. Used for moving vehicle position to cog. """ def __init__(self) -> None: """Initialize filtering ROS node.""" self.track_filtering = TrackFiltering( local_fov_angle_rad=np.radians(rospy.get_param('/filtering/local_fov_angle_deg')), local_fov_range_m=rospy.get_param('/filtering/local_fov_range_m'), track_width_min=rospy.get_param('/filtering/track_width_min')) self.perceived_n_threshold = rospy.get_param('/filtering/perceived_n_threshold') self.vehicle_pose = VehiclePose() self.slam_closed_track = False self.filtering_closed_track = False self.cones: Optional[npt.NDArray] = None rospy.Subscriber('/slam/landmarks', ConeListWithCovariance, self.landmarks_callback, queue_size=10) rospy.Subscriber('/slam/vehicle_pose', VehiclePose, self.vehicle_pose_callback, queue_size=10) self.centerpoints_publisher = rospy.Publisher('/filtering/centerpoints', CenterPoints, queue_size=10) self.lr_cog = rospy.get_param('model/loads/lr_cog')
[docs] def publish_centerpoints(self, image_header: Header): """ Publish centerpoints to ROS. Parameters ---------- image_header : Header Image header the centerpoints have been calculated on. """ centerpoints_msg = CenterPoints() centerpoints_msg.header.stamp = rospy.Time.now() centerpoints_msg.header.frame_id = 'map' centerpoints_msg.closed_track = self.filtering_closed_track centerpoints_msg.left_track_width = self.track_width[:, 0].tolist() centerpoints_msg.right_track_width = self.track_width[:, 1].tolist() centerpoints_msg.x = self.centerpoints[:, 0].tolist() centerpoints_msg.y = self.centerpoints[:, 1].tolist() centerpoints_msg.vehicle_pose = self.vehicle_pose centerpoints_msg.image_header = image_header self.centerpoints_publisher.publish(centerpoints_msg)
[docs] def landmarks_callback(self, landmarks: ConeListWithCovariance): """ Buffer landmark positions estimated by SLAM and received via ROS. Parameters ---------- landmarks : ConeListWithCovariance Landmark positions estimated by SLAM. """ self.slam_closed_track = landmarks.closed_track cones = np.array([[cone.x, cone.y, cone.id] for cone in landmarks.cones if cone.perceived_n > self.perceived_n_threshold]) if cones.size == 0: rospy.loginfo(f'[Filtering] No cones left after filtering {len(landmarks.cones)} cones out ' f'since they have been perceived less than {self.perceived_n_threshold}') return if (self.filtering_closed_track is False or self.cones is None or (self.cones is not None and np.array_equal(self.cones, cones))): self.centerpoints, self.track_width, self.filtering_closed_track = self.track_filtering.filtering_main( left_cones=cones[cones[:, 2] == 1, :2], right_cones=cones[cones[:, 2] == 0, :2], orange_cones=cones[cones[:, 2] == 3, :2], x_m=self.vehicle_pose.x, y_m=self.vehicle_pose.y, psi_rad=self.vehicle_pose.psi, global_track=self.slam_closed_track) if self.filtering_closed_track: self.cones = cones self.publish_centerpoints(image_header=landmarks.header)
[docs] def vehicle_pose_callback(self, vehicle_pose: VehiclePose): """ Buffer vehicle pose estimated by SLAM and received via SLAM. Also moves vehicle position from rear axle to center of gravity. .. todo:: Get offset between rear axle and center of gravity from transform tree. Parameters ---------- msg : VehiclePose Current vehicle position estimated by SLAM and received via SLAM. """ self.vehicle_pose = vehicle_pose # Transform vehicle position from rear axle to COG self.vehicle_pose.x = self.vehicle_pose.x + self.lr_cog * np.cos(self.vehicle_pose.psi) self.vehicle_pose.y = self.vehicle_pose.y + self.lr_cog * np.sin(self.vehicle_pose.psi)
if __name__ == '__main__': rospy.init_node(name='filtering', anonymous=False) try: if AutonomousMissionEnum(rospy.get_param('/mission')) not in [AutonomousMissionEnum.SKIDPAD, AutonomousMissionEnum.ACCELERATION, AutonomousMissionEnum.BRAKETEST]: rospy.loginfo('[Filtering] Filtering started since we are driving' f'{AutonomousMissionEnum(rospy.get_param("/mission"))} *broom broom*') filtering = Filtering() rospy.spin() else: rospy.loginfo('[Filtering] Filtering not started since we are driving' f'{AutonomousMissionEnum(rospy.get_param("/mission"))} *broom broom*') except Exception as e: if not isinstance(e, rospy.exceptions.ROSInterruptException): rospy.logfatal(traceback.format_exc()) time.sleep(2) raise e