From ed46df6a2d1bde4e8924d5e94a658bc1e42b11d0 Mon Sep 17 00:00:00 2001 From: EricGaoo Date: Sat, 23 Nov 2024 19:27:22 -0500 Subject: [PATCH] Implemented decoder with cantools but more testing required --- py/decoder/__init__.py | 0 py/decoder/decoder.py | 89 ++++++++++++++++++++++++++++++++++++++++++ py/decoder/test.py | 26 ++++++++++++ 3 files changed, 115 insertions(+) create mode 100644 py/decoder/__init__.py create mode 100644 py/decoder/decoder.py create mode 100644 py/decoder/test.py diff --git a/py/decoder/__init__.py b/py/decoder/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py/decoder/decoder.py b/py/decoder/decoder.py new file mode 100644 index 000000000..3645be126 --- /dev/null +++ b/py/decoder/decoder.py @@ -0,0 +1,89 @@ +import os +import pty +import cantools +import serial + +class State: + SOM = "SOM" + ID = "ID" + DLC = "DLC" + DATA = "DATA" + EOM = "EOM" + VALID = "VALID" + +class DatagramDecoder: + def __init__(self): + self.message_state = State.SOM + self.buffer = [] + self.datagram = None + self.init_serial() + self.init_dbc() + + def init_serial(self): + self.master, self.slave = pty.openpty() + self.slave_name = os.ttyname(self.slave) + self.slave_serial = serial.Serial(self.slave_name) + + def init_dbc(self): + dbc_path = "/Users/ericgao/box/shared/fwxv/libraries/codegen/system_can.dbc" + self.db = cantools.database.load_file(dbc_path) + + def write(self, packet): + self.slave_serial.write(bytes(packet)) + + def read(self): + recv = os.read(self.master,1000) + if self.parse_byte(recv): + message = self.db.get_message_by_frame_id(self.datagram["id"]) + decoded_data = message.decode(self.datagram['data']) + print(decoded_data) + + def read_test(self, byte): + if self.parse_byte(byte): + message = self.db.get_message_by_frame_id(self.datagram["id"]) + decoded_data = message.decode(bytes(self.datagram['data'])) + print(decoded_data) + + def is_valid_id(self, id): + try: + self.db.get_message_by_frame_id(id) + return True + except KeyError: + return False + + def parse_byte(self, byte): + match self.message_state: + case (State.SOM | State.VALID): + self.buffer = [] + self.datagram = None + if byte == 0xAA: + self.message_state = State.ID + case State.ID: + self.buffer.append(byte) + if len(self.buffer) == 4: + message_id = int.from_bytes(self.buffer, byteorder='big') + if self.is_valid_id(message_id): + self.datagram = {"id": message_id} + self.buffer = [] + self.message_state = State.DLC + else: + self.message_state = State.SOM + case State.DLC: + self.datagram["dlc"] = byte + if self.datagram["dlc"] <= 9: + self.datagram["data"] = [] + self.message_state = State.DATA + else: + self.message_state = State.SOM + case State.DATA: + self.buffer.append(byte) + if len(self.buffer) == self.datagram["dlc"]: + self.datagram["data"] = self.buffer + self.message_state = State.EOM + case State.EOM: + if byte == 0xBB: + self.message_state = State.VALID + else: + self.message_state = State.SOM + return self.message_state == State.VALID + \ No newline at end of file diff --git a/py/decoder/test.py b/py/decoder/test.py new file mode 100644 index 000000000..f0f63e65f --- /dev/null +++ b/py/decoder/test.py @@ -0,0 +1,26 @@ +from decoder import DatagramDecoder + +decoder = DatagramDecoder() + +decoder.init_serial() +decoder.init_dbc() + +invalid_message = [0xAA, 0xFF, 0x00, 0x00, 0x00, 0x01, 0x06, 0xAA, 0x01, 0x01, 0x01, 0x01, 0x01, 0xBB] + +for byte in invalid_message: + decoder.read_test(byte) + +valid_message = [0xAA, 0x00, 0x00, 0x00, 0x01, 0x07, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0xBB] + +for byte in valid_message: + decoder.read_test(byte) + +invalid_message = [0xAA, 0x00, 0x00, 0x00, 0x01, 0x07, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0xAA] + +for byte in invalid_message: + decoder.read_test(byte) + +valid_message_2 = [0xAA, 0x00, 0x00, 0x00, 0x01, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xBB] + +for byte in valid_message_2: + decoder.read_test(byte) \ No newline at end of file