Source code for ouster_telemetry

#!/usr/bin/python3

import argparse
import json
import os
import socket
import sys
import time
from dataclasses import dataclass
from typing import Optional, Sequence

import pandas as pd


@dataclass
class OusterTelemetry:
    """Represents the telemetry data of the ouster sensor.

    Instances are built from the JSON reply to the ``get_telemetry`` command;
    each field mirrors the JSON key of the same name.
    """

    # NOTE(review): units are inferred from the field-name suffixes
    # (presumably mA, mV and degrees Celsius) - confirm against the sensor docs.
    input_current_ma: int
    input_voltage_mv: int
    internal_temperature_deg_c: int
class OusterTelemetryRecorder:
    """
    Records the telemetry data of the ouster sensor.

    Use the method record_telemetry() to start recording.

    Attributes:
        host: IP address or hostname of the ouster sensor
        port: Port of the ouster sensor
        interval: Interval in seconds to record the telemetry data
        output: Path to the output file
        sock: Connected socket used to query the sensor
    """

    def __init__(self, host: str, port: int, interval: int, output: str):
        # The connection is opened eagerly, so constructing a recorder
        # already requires a reachable sensor.
        self.sock = create_ouster_client(host, port)
        self.host = host
        self.port = port
        self.interval = interval
        self.output = output

    def get_current_sensor_telemetry(self) -> OusterTelemetry:
        """Gets the telemetry data from the sensor.

        Sends ``get_telemetry`` and reads the reply until its terminating
        newline (or until the sensor closes the connection), so a reply that
        is split across several TCP segments is still parsed as a whole.

        Returns:
            The parsed telemetry values.

        Raises:
            ConnectionError: If the sensor closes the connection without
                sending any data.
            json.JSONDecodeError: If the reply is not valid JSON.
            KeyError: If an expected telemetry field is missing from the reply.
        """
        # read telemetry data from Ouster LiDAR
        self.sock.sendall(b"get_telemetry\n")

        # TCP is a byte stream: one recv() may return only part of the reply.
        chunks = []
        while True:
            chunk = self.sock.recv(512)
            if not chunk:
                # peer closed the connection
                break
            chunks.append(chunk)
            if chunk.endswith(b"\n"):
                break

        reply = b"".join(chunks)
        if not reply:
            raise ConnectionError("Ouster sensor closed the connection before replying.")

        # decode data and parse json to OusterTelemetry object
        data = json.loads(reply.decode("utf-8"))
        return OusterTelemetry(
            input_current_ma=data["input_current_ma"],
            input_voltage_mv=data["input_voltage_mv"],
            internal_temperature_deg_c=data["internal_temperature_deg_c"],
        )

    def prepare_telemetry_csv(self) -> None:
        """Prepares the csv file for recording the telemetry data.

        Writes a header-only csv to ``self.output``. If the file already
        exists, the user is asked for confirmation first; any answer other
        than "y" closes the socket and exits the process with status 0.
        """
        # Initialize DataFrame
        df = pd.DataFrame(
            columns=["input_current_ma", "input_voltage_mv", "internal_temperature_deg_c"]
        )
        df.index.name = "elapsed_time_s"

        # Check if the output file already exists
        if os.path.exists(self.output):
            user_response = input(
                f"The file already exists: {self.output}\nDo you want to overwrite it? (y/n): "
            )
            if user_response.lower() != "y":
                print("Recording aborted. Existing file was not overwritten.")
                self.sock.close()
                sys.exit(0)

        # write csv header
        df.to_csv(self.output)

    def record_telemetry(self) -> None:
        """Records the telemetry data from the sensor continuously.

        Runs until interrupted: every ``self.interval`` seconds one telemetry
        sample is printed and appended to the csv file. The recorded elapsed
        time is nominal (sample count times interval); it does not include the
        time spent querying the sensor or writing the file.
        """
        self.prepare_telemetry_csv()

        # keep track of number of records for elapsed time (index)
        num_records = 0

        # record telemetry data
        while True:
            elapsed_time = num_records * self.interval
            telemetry = self.get_current_sensor_telemetry()

            # Format minutes by hand: strftime('%M:%S') wraps back to 00:00
            # once a recording runs longer than an hour.
            minutes, seconds = divmod(int(elapsed_time), 60)
            print(
                f"Current temperature of Ouster LiDAR: {telemetry.internal_temperature_deg_c} °C "
                f"after {minutes:02d}:{seconds:02d} minutes"
            )

            # Append telemetry data to csv with elapsed time as index (name)
            row = pd.Series(vars(telemetry), name=elapsed_time).to_frame().T
            row.to_csv(self.output, mode='a', header=False)

            # wait for specified interval before recording next data point
            num_records += 1
            time.sleep(self.interval)
def create_ouster_client(host: str, port: int) -> socket.socket:
    """Creates a socket connection to the ouster sensor.

    Args:
        host: IP address or hostname of the ouster sensor.
        port: TCP port of the ouster sensor.

    Returns:
        A connected, blocking TCP socket. The caller is responsible for
        closing it.

    On a connection failure the error is printed and the process exits with
    status 1; no exception is propagated to the caller.
    """
    try:
        # create_connection resolves the host, connects, and closes the
        # half-initialised socket itself if the connection attempt fails.
        return socket.create_connection((host, port))
    except OSError as socket_err:  # socket.error is an alias of OSError
        print(f"Socket connection error: {socket_err}")
        sys.exit(1)
def _positive_int(value: str) -> int:
    """Parse *value* as an integer and reject anything below 1 (argparse type)."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {number}")
    return number


def get_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Gets the command line arguments.

    Args:
        argv: Argument strings to parse. Defaults to ``None``, in which case
            argparse reads them from ``sys.argv[1:]``.

    Returns:
        Namespace with ``host``, ``port``, ``interval`` and ``output``.

    Invalid or missing arguments make argparse print a usage message and
    exit the process with status 2.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--host", type=str, required=True,
        help="IP address or hostname of the ouster sensor",
    )
    parser.add_argument(
        "--port", type=int, required=True,
        help="Port of the ouster sensor",
    )
    # An interval of 0 would poll the sensor in a busy loop and a negative
    # one makes time.sleep() raise, so both are rejected up front.
    parser.add_argument(
        "--interval", type=_positive_int, required=True,
        help="Interval in seconds to record the telemetry data",
    )
    parser.add_argument(
        "--output", type=str, required=True,
        help="Path to the output file",
    )
    return parser.parse_args(argv)
if __name__ == "__main__":
    # Script entry point: parse the CLI arguments, connect to the sensor and
    # record telemetry until the user interrupts with Ctrl+C.
    args = get_args()
    recorder = OusterTelemetryRecorder(
        host=args.host,
        port=args.port,
        interval=args.interval,
        output=args.output,
    )

    exit_code = 0
    try:
        recorder.record_telemetry()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop a recording.
        print("\nRecording ended.")
    except Exception as err:
        print(f"Unexpected error: {err}")
        # Report the failure to the calling shell instead of exiting with 0.
        exit_code = 1
    finally:
        # Closing an already closed socket is harmless, so this is safe even
        # when the recorder closed it itself before exiting.
        recorder.sock.close()
        print("Socket closed.")
    sys.exit(exit_code)