#!/usr/bin/python3
import argparse
import json
import os
import socket
import sys
import time
from dataclasses import dataclass
from typing import Optional, Sequence

import pandas as pd
@dataclass
class OusterTelemetry:
    """Represents the telemetry data of the ouster sensor.

    The field names are identical to the JSON keys of the sensor's telemetry
    reply, and the units are the ones encoded in each name.

    Attributes:
        input_current_ma: Input current, in milliamperes per the field name.
        input_voltage_mv: Input voltage, in millivolts per the field name.
        internal_temperature_deg_c: Internal temperature, in degrees Celsius.
    """

    # NOTE: keep this a plain (non-frozen, non-slots) dataclass. The recorder
    # serialises instances with vars(), which needs an instance __dict__ whose
    # key order follows the field order below.
    input_current_ma: int
    input_voltage_mv: int
    internal_temperature_deg_c: int
class OusterTelemetryRecorder:
    """
    Records the telemetry data of the ouster sensor.

    Use the method record_telemetry() to start recording.

    Attributes:
        sock: Connected TCP socket used to query the sensor
        host: IP address or hostname of the ouster sensor
        port: Port of the ouster sensor
        interval: Interval in seconds to record the telemetry data
        output: Path to the output file
    """

    # Column order of the CSV; matches the field order of OusterTelemetry.
    _CSV_COLUMNS = ["input_current_ma", "input_voltage_mv", "internal_temperature_deg_c"]
    # Number of bytes requested per recv() call while collecting a reply.
    _RECV_CHUNK_SIZE = 512

    def __init__(self, host: str, port: int, interval: int, output: str):
        """Connects to the sensor and stores the recording settings.

        Args:
            host: IP address or hostname of the ouster sensor
            port: Port of the ouster sensor
            interval: Interval in seconds to record the telemetry data
            output: Path to the output file
        """
        self.sock = create_ouster_client(host, port)
        self.host = host
        self.port = port
        self.interval = interval
        self.output = output

    @staticmethod
    def _format_elapsed(seconds: int) -> str:
        """Formats a duration in seconds as MM:SS without wrapping at one hour."""
        minutes, remaining_seconds = divmod(int(seconds), 60)
        return f"{minutes:02d}:{remaining_seconds:02d}"

    def _read_reply(self) -> bytes:
        """Reads one newline-terminated reply from the sensor socket.

        TCP is a byte stream, so a reply may arrive split over several recv()
        calls. Data is collected until the terminating newline is seen.

        Returns:
            The raw reply bytes, including the trailing newline.

        Raises:
            ConnectionError: If the peer closes the connection before a
                complete reply was received.
        """
        chunks = []
        while True:
            chunk = self.sock.recv(self._RECV_CHUNK_SIZE)
            if not chunk:
                # recv() returning b"" means the peer closed the connection.
                raise ConnectionError("Connection closed by the sensor before a complete reply was received")
            chunks.append(chunk)
            if chunk.endswith(b"\n"):
                return b"".join(chunks)

    def get_current_sensor_telemetry(self) -> "OusterTelemetry":
        """Gets the telemetry data from the sensor.

        Returns:
            The telemetry values parsed from the sensor's JSON reply.

        Raises:
            ConnectionError: If the sensor closes the connection mid-reply.
            json.JSONDecodeError: If the reply is not valid JSON.
            KeyError: If an expected telemetry key is missing from the reply.
        """
        # read telemetry data from Ouster LiDAR
        self.sock.sendall(b"get_telemetry\n")
        # decode data and parse json to OusterTelemetry object
        reply = json.loads(self._read_reply().decode("utf-8"))
        return OusterTelemetry(
            input_current_ma=reply["input_current_ma"],
            input_voltage_mv=reply["input_voltage_mv"],
            internal_temperature_deg_c=reply["internal_temperature_deg_c"],
        )

    def prepare_telemetry_csv(self) -> None:
        """Prepares the csv file for recording the telemetry data.

        Asks for confirmation before overwriting an existing file. If the user
        declines, the socket is closed and the process exits with status 0.
        Otherwise the file is (re)created containing only the header row.
        """
        # Check if the output file already exists
        if os.path.exists(self.output):
            user_response = input(f"The file already exists: {self.output}\nDo you want to overwrite it? (y/n): ")
            if user_response.strip().lower() != "y":
                print("Recording aborted. Existing file was not overwritten.")
                self.sock.close()
                sys.exit(0)
        # write csv header (an empty frame serialises to just the header row)
        header = pd.DataFrame(columns=self._CSV_COLUMNS)
        header.index.name = "elapsed_time_s"
        header.to_csv(self.output)

    def record_telemetry(self) -> None:
        """Records the telemetry data from the sensor continuously.

        Runs until interrupted or until an exception is raised. Each sample is
        printed and appended to the csv file, indexed by its nominal elapsed
        time (sample number * interval).
        """
        self.prepare_telemetry_csv()
        # Reference point for the sampling schedule; monotonic so wall-clock
        # adjustments cannot disturb it.
        started_at = time.monotonic()
        # keep track of number of records for elapsed time (index)
        num_records = 0
        # record telemetry data
        while True:
            elapsed_time = num_records * self.interval
            telemetry = self.get_current_sensor_telemetry()
            print(
                f"Current temperature of Ouster LiDAR: {telemetry.internal_temperature_deg_c} °C "
                f"after {self._format_elapsed(elapsed_time)} minutes"
            )
            # Append telemetry data to csv with elapsed time as index (name)
            pd.Series(vars(telemetry), name=elapsed_time).to_frame().T.to_csv(self.output, mode="a", header=False)
            num_records += 1
            # Sleep until the next scheduled sample instead of a fixed
            # interval, so the time spent querying the sensor and writing the
            # csv does not make the recorded elapsed time drift from reality.
            next_sample_at = started_at + num_records * self.interval
            time.sleep(max(0.0, next_sample_at - time.monotonic()))
def create_ouster_client(host: str, port: int) -> socket.socket:
    """Creates a socket connection to the ouster sensor.

    Args:
        host: IP address or hostname of the ouster sensor
        port: Port of the ouster sensor

    Returns:
        A connected TCP socket. The caller is responsible for closing it.

    Raises:
        SystemExit: With status 1 if the connection cannot be established.
    """
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, port))
    except (socket.error, OverflowError) as socket_err:
        # OverflowError is what connect() raises for a port outside 0-65535;
        # report it like any other connection failure instead of a traceback.
        if sock is not None:
            # Release the file descriptor of the half-initialised socket.
            sock.close()
        print(f"Socket connection error: {socket_err}", file=sys.stderr)
        sys.exit(1)
    return sock
def _non_negative_int(value: str) -> int:
    """Parses an argparse value as an integer >= 0."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 0:
        # A negative interval would make time.sleep() raise later on.
        raise argparse.ArgumentTypeError(f"must not be negative, got {number}")
    return number


def _port_number(value: str) -> int:
    """Parses an argparse value as a TCP port in the range 1-65535."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if not 1 <= number <= 65535:
        raise argparse.ArgumentTypeError(f"must be between 1 and 65535, got {number}")
    return number


def get_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Gets the command line arguments.

    Args:
        argv: Argument strings to parse. Defaults to None, in which case
            argparse reads them from sys.argv[1:].

    Returns:
        Namespace with the attributes host, port, interval and output.

    Raises:
        SystemExit: Raised by argparse (status 2) when an argument is missing
            or invalid, after printing a usage message.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, required=True, help="IP address or hostname of the ouster sensor")
    parser.add_argument("--port", type=_port_number, required=True, help="Port of the ouster sensor")
    parser.add_argument(
        "--interval",
        type=_non_negative_int,
        required=True,
        help="Interval in seconds to record the telemetry data",
    )
    parser.add_argument("--output", type=str, required=True, help="Path to the output file")
    return parser.parse_args(argv)
def main() -> int:
    """Runs the telemetry recorder from the command line.

    Returns:
        The process exit status: 0 when recording was stopped with Ctrl+C,
        1 when recording ended because of an unexpected error.
    """
    args = get_args()
    recorder = OusterTelemetryRecorder(
        host=args.host,
        port=args.port,
        interval=args.interval,
        output=args.output,
    )
    exit_code = 0
    try:
        recorder.record_telemetry()
    except KeyboardInterrupt:
        # Ctrl+C is the regular way to stop the endless recording loop.
        print("\nRecording ended.")
    except Exception as err:
        # Top-level boundary: report the failure and signal it through the
        # exit status instead of pretending the run succeeded.
        print(f"Unexpected error: {err}", file=sys.stderr)
        exit_code = 1
    finally:
        recorder.sock.close()
        print("Socket closed.")
    return exit_code


if __name__ == "__main__":
    sys.exit(main())