@dataclass
class OusterTelemetry:
    """Snapshot of the telemetry values reported by the ouster sensor.

    Attributes:
        input_current_ma: Input current (presumably milliamps, going by the
            field name).
        input_voltage_mv: Input voltage (presumably millivolts, going by the
            field name).
        internal_temperature_deg_c: Internal temperature in degrees Celsius.
    """

    input_current_ma: int
    input_voltage_mv: int
    internal_temperature_deg_c: int
class OusterTelemetryRecorder:
    """Records the telemetry data of the ouster sensor to a CSV file.

    Use the method record_telemetry() to start recording.

    Attributes:
        sock: Socket connected to the sensor; opened when the recorder is
            constructed and closed when recording stops.
        host: IP address or hostname of the ouster sensor
        port: Port of the ouster sensor
        interval: Interval in seconds to record the telemetry data
        output: Path to the output file
    """

    # Number of bytes requested from the socket per recv() call.
    _RECV_CHUNK_SIZE = 512

    def __init__(self, host: str, port: int, interval: int, output: str):
        """Connects to the sensor and stores the recording settings.

        Args:
            host: IP address or hostname of the ouster sensor.
            port: Port of the ouster sensor.
            interval: Interval in seconds between two recorded data points.
            output: Path to the CSV file the telemetry is written to.
        """
        self.sock = create_ouster_client(host, port)
        self.host = host
        # Keep the port as well; it is documented as an attribute of the class.
        self.port = port
        self.interval = interval
        self.output = output

    def _read_response(self) -> bytes:
        """Reads one complete reply from the sensor socket.

        TCP is a byte stream, so a single recv() may deliver only part of the
        reply. Chunks are accumulated until one ends with the newline that
        terminates a reply, or until the peer closes the connection.

        Returns:
            The raw reply bytes (empty if the connection was closed before
            any data arrived).
        """
        chunks = []
        while True:
            chunk = self.sock.recv(self._RECV_CHUNK_SIZE)
            if not chunk:
                # Connection closed by the peer; return what was received and
                # let the JSON parser report an incomplete payload.
                break
            chunks.append(chunk)
            if chunk.endswith(b"\n"):
                break
        return b"".join(chunks)

    def get_current_sensor_telemetry(self) -> OusterTelemetry:
        """Gets the telemetry data from the sensor.

        Sends the ``get_telemetry`` command over the socket and parses the
        JSON reply.

        Returns:
            The telemetry values contained in the reply.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
            KeyError: If an expected telemetry field is missing in the reply.
        """
        # read telemetry data from Ouster LiDAR
        self.sock.sendall(b"get_telemetry\n")
        raw_reply = self._read_response()

        # decode data and parse json to OusterTelemetry object
        data = json.loads(raw_reply.decode("utf-8"))
        return OusterTelemetry(
            input_current_ma=data["input_current_ma"],
            input_voltage_mv=data["input_voltage_mv"],
            internal_temperature_deg_c=data["internal_temperature_deg_c"],
        )

    def prepare_telemetry_csv(self) -> None:
        """Prepares the csv file for recording the telemetry data.

        If the output file already exists the user is asked on the console
        whether it may be overwritten. Any answer other than ``y`` closes the
        socket and exits the process with status 0. Otherwise the file is
        (re)created containing only the header row.
        """
        # Check if the output file already exists
        if os.path.exists(self.output):
            user_response = input(
                f"The file already exists: {self.output}\n"
                "Do you want to overwrite it? (y/n): "
            )
            if user_response.lower() != "y":
                print("Recording aborted. Existing file was not overwritten.")
                self.sock.close()
                sys.exit(0)

        # An empty DataFrame is used only to emit the header row
        df = pd.DataFrame(
            columns=[
                "input_current_ma",
                "input_voltage_mv",
                "internal_temperature_deg_c",
            ]
        )
        df.index.name = "elapsed_time_s"

        # write csv header
        df.to_csv(self.output)

    def record_telemetry(self) -> None:
        """Records the telemetry data from the sensor continuously.

        Runs until it is interrupted (e.g. Ctrl-C) or an error occurs. Each
        data point is appended to the output file as one CSV row whose index
        is the nominal elapsed time (record count times the interval). The
        socket is closed when the loop is left, whatever the reason.
        """
        try:
            self.prepare_telemetry_csv()

            # keep track of number of records for elapsed time (index)
            num_records = 0

            # record telemetry data
            while True:
                elapsed_time = num_records * self.interval
                telemetry = self.get_current_sensor_telemetry()

                # Format manually: strftime("%M") would wrap back to 00 once
                # the recording runs for an hour or longer.
                minutes, seconds = divmod(int(elapsed_time), 60)
                print(
                    "Current temperature of Ouster LiDAR: "
                    f"{telemetry.internal_temperature_deg_c} °C "
                    f"after {minutes:02d}:{seconds:02d} minutes"
                )

                # Append telemetry data to csv with elapsed time as index (name)
                row = pd.Series(vars(telemetry), name=elapsed_time).to_frame().T
                row.to_csv(self.output, mode="a", header=False)

                # wait for specified interval before recording next data point
                num_records += 1
                time.sleep(self.interval)
        finally:
            # The loop only ends through an exception or interrupt; release
            # the connection instead of leaking it. Closing twice is harmless.
            self.sock.close()
def create_ouster_client(host: str, port: int) -> socket.socket:
    """Creates a socket connection to the ouster sensor.

    Args:
        host: IP address or hostname of the ouster sensor.
        port: TCP port of the ouster sensor.

    Returns:
        A connected TCP socket. The caller is responsible for closing it.

    On a connection error the error is printed and the process exits with
    status 1.
    """
    try:
        # create_connection() tries every address the host resolves to and
        # closes the half-open socket itself when a connection attempt fails,
        # so nothing is leaked on the error path.
        return socket.create_connection((host, port))
    except OSError as socket_err:  # socket.error is an alias of OSError
        print(f"Socket connection error: {socket_err}")
        sys.exit(1)
def get_args() -> argparse.Namespace:
    """Parses the command line options of the telemetry recorder.

    All four options (--host, --port, --interval, --output) are required.

    Returns:
        The parsed arguments with the attributes ``host``, ``port``,
        ``interval`` and ``output``.
    """
    # (flag, value type, help text) for every supported option
    option_specs = (
        ("--host", str, "IP address or hostname of the ouster sensor"),
        ("--port", int, "Port of the ouster sensor"),
        ("--interval", int, "Interval in seconds to record the telemetry data"),
        ("--output", str, "Path to the output file"),
    )

    parser = argparse.ArgumentParser()
    for flag, value_type, help_text in option_specs:
        parser.add_argument(flag, type=value_type, required=True, help=help_text)
    return parser.parse_args()