Source code for inspection_node
#!/usr/bin/env python3
import rospy
import math
import time
import traceback
from as_can_msgs.msg import SteeringWheelAngle, Velocity, MissionFinished, BrakePressure
[docs]class InspectionMission():
def __init__(self):
self.steering_wheel_angle_request_pub = rospy.Publisher('/control/steering_wheel_angle/request',
SteeringWheelAngle, queue_size=10)
self.velocity_request_pub = rospy.Publisher('/control/velocity/request', Velocity, queue_size=10)
self.service_brake_request_pub = rospy.Publisher('/control/service_brake/request', BrakePressure, queue_size=10)
self.mission_completed_pub = rospy.Publisher('/finish_line_detection/mission_completed', MissionFinished, queue_size=10)
self.run_time = rospy.Duration(secs=25) # T 14.12.3: 25-30s
self.read_config()
self.completed = False
self.start_time = None
[docs] def read_config(self):
self.max_angle = rospy.get_param('/inspection/max_steering_angle')
self.velocity = rospy.get_param('/inspection/velocity')
self.service_brake_pressure = rospy.get_param('/inspection/service_brake_pressure')
self.steering_frequency = rospy.get_param('/inspection/steering_frequency')
[docs] def control_steering_actuator(self):
steering_wheel_angle_request_msg = SteeringWheelAngle()
now = rospy.Time.now()
steering_wheel_angle_request_msg.header.stamp = now
if self.start_time is None:
self.start_time = now
if self.completed is False:
angle = math.sin(2 * math.pi * self.steering_frequency * (now - self.start_time).to_sec()) * self.max_angle
else:
angle = 0
steering_wheel_angle_request_msg.angle = angle
self.steering_wheel_angle_request_pub.publish(steering_wheel_angle_request_msg)
[docs] def control_motor(self):
velocity_request_msg = Velocity()
velocity_request_msg.header.stamp = rospy.Time.now()
if self.completed is False:
velocity_request_msg.velocity = self.velocity
else:
velocity_request_msg.velocity = 0
self.velocity_request_pub.publish(velocity_request_msg)
[docs] def control_service_brake(self):
service_brake_request_msg = BrakePressure()
service_brake_request_msg.header.stamp = rospy.Time.now()
if self.completed is False:
# rospy.loginfo('[Inspection] Request zero pressure.')
service_brake_request_msg.pressure = 0
else:
service_brake_request_msg.pressure = self.service_brake_pressure
self.service_brake_request_pub.publish(service_brake_request_msg)
[docs] def complete(self):
self.completed = True
self.control_steering_actuator()
self.control_motor()
self.control_service_brake()
mission_completed_msg = MissionFinished()
mission_completed_msg.header.stamp = rospy.Time.now()
mission_completed_msg.finished = 1
self.mission_completed_pub.publish(mission_completed_msg)
[docs] def run(self):
rospy.loginfo('[Inspection Node] Inspection Mission started.')
start_time = rospy.Time.now()
run_rate = rospy.Rate(50)
while rospy.Time.now() < (start_time + rospy.Duration(secs=5)) and not rospy.is_shutdown():
self.control_service_brake()
while rospy.Time.now() < (start_time + self.run_time + rospy.Duration(secs=5)) and not rospy.is_shutdown():
self.control_steering_actuator()
self.control_motor()
self.control_service_brake()
run_rate.sleep()
if not rospy.is_shutdown():
self.complete()
rospy.loginfo('[Inspection Node] Inspection Mission finished.')
else:
rospy.loginfo('[Inspection Node] Inspection Mission cancelled.')
if __name__ == '__main__':
rospy.init_node('inspection_mission', anonymous=False)
try:
inspection_mission = InspectionMission()
inspection_mission.run()
rospy.signal_shutdown('Node was shut down since the inspection mission is completed.')
except Exception as e:
if not isinstance(e, rospy.exceptions.ROSInterruptException):
rospy.logfatal(traceback.format_exc())
time.sleep(2)
raise e