Source code for inspection_node

#!/usr/bin/env python3
import rospy
import math
import time
import traceback

from as_can_msgs.msg import SteeringWheelAngle, Velocity, MissionFinished, BrakePressure


[docs]class InspectionMission(): def __init__(self): self.steering_wheel_angle_request_pub = rospy.Publisher('/control/steering_wheel_angle/request', SteeringWheelAngle, queue_size=10) self.velocity_request_pub = rospy.Publisher('/control/velocity/request', Velocity, queue_size=10) self.service_brake_request_pub = rospy.Publisher('/control/service_brake/request', BrakePressure, queue_size=10) self.mission_completed_pub = rospy.Publisher('/finish_line_detection/mission_completed', MissionFinished, queue_size=10) self.run_time = rospy.Duration(secs=25) # T 14.12.3: 25-30s self.read_config() self.completed = False self.start_time = None
[docs] def read_config(self): self.max_angle = rospy.get_param('/inspection/max_steering_angle') self.velocity = rospy.get_param('/inspection/velocity') self.service_brake_pressure = rospy.get_param('/inspection/service_brake_pressure') self.steering_frequency = rospy.get_param('/inspection/steering_frequency')
[docs] def control_steering_actuator(self): steering_wheel_angle_request_msg = SteeringWheelAngle() now = rospy.Time.now() steering_wheel_angle_request_msg.header.stamp = now if self.start_time is None: self.start_time = now if self.completed is False: angle = math.sin(2 * math.pi * self.steering_frequency * (now - self.start_time).to_sec()) * self.max_angle else: angle = 0 steering_wheel_angle_request_msg.angle = angle self.steering_wheel_angle_request_pub.publish(steering_wheel_angle_request_msg)
[docs] def control_motor(self): velocity_request_msg = Velocity() velocity_request_msg.header.stamp = rospy.Time.now() if self.completed is False: velocity_request_msg.velocity = self.velocity else: velocity_request_msg.velocity = 0 self.velocity_request_pub.publish(velocity_request_msg)
[docs] def control_service_brake(self): service_brake_request_msg = BrakePressure() service_brake_request_msg.header.stamp = rospy.Time.now() if self.completed is False: # rospy.loginfo('[Inspection] Request zero pressure.') service_brake_request_msg.pressure = 0 else: service_brake_request_msg.pressure = self.service_brake_pressure self.service_brake_request_pub.publish(service_brake_request_msg)
[docs] def complete(self): self.completed = True self.control_steering_actuator() self.control_motor() self.control_service_brake() mission_completed_msg = MissionFinished() mission_completed_msg.header.stamp = rospy.Time.now() mission_completed_msg.finished = 1 self.mission_completed_pub.publish(mission_completed_msg)
[docs] def run(self): rospy.loginfo('[Inspection Node] Inspection Mission started.') start_time = rospy.Time.now() run_rate = rospy.Rate(50) while rospy.Time.now() < (start_time + rospy.Duration(secs=5)) and not rospy.is_shutdown(): self.control_service_brake() while rospy.Time.now() < (start_time + self.run_time + rospy.Duration(secs=5)) and not rospy.is_shutdown(): self.control_steering_actuator() self.control_motor() self.control_service_brake() run_rate.sleep() if not rospy.is_shutdown(): self.complete() rospy.loginfo('[Inspection Node] Inspection Mission finished.') else: rospy.loginfo('[Inspection Node] Inspection Mission cancelled.')
if __name__ == '__main__': rospy.init_node('inspection_mission', anonymous=False) try: inspection_mission = InspectionMission() inspection_mission.run() rospy.signal_shutdown('Node was shut down since the inspection mission is completed.') except Exception as e: if not isinstance(e, rospy.exceptions.ROSInterruptException): rospy.logfatal(traceback.format_exc()) time.sleep(2) raise e