Skip to content

Commit

Permalink
update message tests (incorporate all old tests). (#2088)
Browse files Browse the repository at this point in the history
  • Loading branch information
janiversen authored Mar 6, 2024
1 parent 647b270 commit 7c4a310
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 1,371 deletions.
21 changes: 0 additions & 21 deletions test/message/test_ascii.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ def prepare_frame():
return MessageAscii([1], False)


def test_roundtrip_LRC(self):
"""Test combined compute/check LRC."""
data = b'\x12\x34\x23\x45\x34\x56\x45\x67'
assert MessageAscii.compute_LRC(data) == 0x1c
assert MessageAscii.check_LRC(data, 0x1C)

@pytest.mark.parametrize(
("packet", "used_len", "res_id", "res"),
[
Expand All @@ -43,21 +37,6 @@ def test_decode(self, frame, packet, used_len, res_id, res):
assert not tid
assert dev_id == res_id

@pytest.mark.parametrize(
("data", "dev_id", "res_msg"),
[
(b'\x01\x05\x04\x00\x17', 1, b':010105040017DE\r\n'),
(b'\x03\x07\x06\x00\x73', 2, b':0203070600737B\r\n'),
(b'\x08\x00\x01', 3, b':03080001F4\r\n'),
(b'\x84\x01', 2, b':02840179\r\n'),
],
)
def test_encode(self, frame, data, dev_id, res_msg):
"""Test encode."""
msg = frame.encode(data, dev_id, 0)
assert res_msg == msg
assert dev_id == int(msg[1:3], 16)

@pytest.mark.parametrize(
("data", "dev_id", "res_msg"),
[
Expand Down
129 changes: 126 additions & 3 deletions test/message/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,27 @@ def test_LRC_CRC(self, func, lrc, expect):
data = b'\x12\x34\x23\x45\x34\x56\x45\x67'
assert expect == func(data, lrc) if lrc else func(data)

def test_roundtrip_LRC(self):
"""Test combined compute/check LRC."""
data = b'\x12\x34\x23\x45\x34\x56\x45\x67'
assert MessageAscii.compute_LRC(data) == 0x1c
assert MessageAscii.check_LRC(data, 0x1C)

def test_crc16_table(self):
"""Test the crc16 table is prefilled."""
assert len(MessageRTU.crc16_table) == 256
assert isinstance(MessageRTU.crc16_table[0], int)
assert isinstance(MessageRTU.crc16_table[255], int)

def test_roundtrip_CRC(self):
"""Test combined compute/check CRC."""
data = b'\x12\x34\x23\x45\x34\x56\x45\x67'
assert MessageRTU.compute_CRC(data) == 0xE2DB
assert MessageRTU.check_CRC(data, 0xE2DB)

class TestMessages: # pylint: disable=too-few-public-methods


class TestMessages:
"""Test message classes."""

@pytest.mark.parametrize(
Expand Down Expand Up @@ -198,10 +217,114 @@ class TestMessages: # pylint: disable=too-few-public-methods
def test_encode(self, frame, frame_expected, data, dev_id, tid, inx1, inx2, inx3):
"""Test encode method."""
if frame != MessageSocket and tid:
return
pytest.skip("Not supported")
if frame == MessageTLS and (tid or dev_id):
return
pytest.skip("Not supported")
frame_obj = frame(None, True)
expected = frame_expected[inx1 + inx2 + inx3]
encoded_data = frame_obj.encode(data, dev_id, tid)
assert encoded_data == expected

@pytest.mark.parametrize(
("msg_type", "data", "dev_id", "tid", "expected"),
[
(MessageType.ASCII, b':0003007C00027F\r\n', 0, 0, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.ASCII, b':000304008D008EDE\r\n', 0, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.ASCII, b':0083027B\r\n', 0, 0, b'\x83\x02',), # Exception
(MessageType.ASCII, b':1103007C00026E\r\n', 17, 0, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.ASCII, b':110304008D008ECD\r\n', 17, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.ASCII, b':1183026A\r\n', 17, 0, b'\x83\x02',), # Exception
(MessageType.ASCII, b':FF03007C000280\r\n', 255, 0, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.ASCII, b':FF0304008D008EDF\r\n', 255, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.ASCII, b':FF83027C\r\n', 255, 0, b'\x83\x02',), # Exception
(MessageType.RTU, b'\x00\x03\x00\x7c\x00\x02\x04\x02', 0, 0, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.RTU, b'\x00\x03\x04\x00\x8d\x00\x8e\xfa\xbc', 0, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.RTU, b'\x00\x83\x02\x91\x31', 0, 0, b'\x83\x02',), # Exception
(MessageType.RTU, b'\x11\x03\x00\x7c\x00\x02\x07\x43', 17, 0, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.RTU, b'\x11\x03\x04\x00\x8d\x00\x8e\xfb\xbd', 17, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.RTU, b'\x11\x83\x02\xc1\x34', 17, 0, b'\x83\x02',), # Exception
(MessageType.RTU, b'\xff\x03\x00|\x00\x02\x10\x0d', 255, 0, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.RTU, b'\xff\x03\x04\x00\x8d\x00\x8e\xf5\xb3', 255, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.RTU, b'\xff\x83\x02\xa1\x01', 255, 0, b'\x83\x02',), # Exception
(MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x06\x00\x03\x00\x7c\x00\x02', 0, 0, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x07\x00\x03\x04\x00\x8d\x00\x8e', 0, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x03\x00\x83\x02', 0, 0, b'\x83\x02',), # Exception
(MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x06\x11\x03\x00\x7c\x00\x02', 17, 0, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x07\x11\x03\x04\x00\x8d\x00\x8e', 17, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x03\x11\x83\x02', 17, 0, b'\x83\x02',), # Exception
(MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x06\xff\x03\x00\x7c\x00\x02', 255, 0, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x07\xff\x03\x04\x00\x8d\x00\x8e', 255, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.SOCKET, b'\x00\x00\x00\x00\x00\x03\xff\x83\x02', 255, 0, b'\x83\x02',), # Exception
(MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x06\x00\x03\x00\x7c\x00\x02', 0, 3077, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x07\x00\x03\x04\x00\x8d\x00\x8e', 0, 3077, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x03\x00\x83\x02', 0, 3077, b'\x83\x02',), # Exception
(MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x06\x11\x03\x00\x7c\x00\x02', 17, 3077, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x07\x11\x03\x04\x00\x8d\x00\x8e', 17, 3077, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x03\x11\x83\x02', 17, 3077, b'\x83\x02',), # Exception
(MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x06\xff\x03\x00\x7c\x00\x02', 255, 3077, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x07\xff\x03\x04\x00\x8d\x00\x8e', 255, 3077, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.SOCKET, b'\x0c\x05\x00\x00\x00\x03\xff\x83\x02', 255, 3077, b'\x83\x02',), # Exception
(MessageType.TLS, b'\x03\x00\x7c\x00\x02', 0, 0, b"\x03\x00\x7c\x00\x02",), # Request
(MessageType.TLS, b'\x03\x04\x00\x8d\x00\x8e', 0, 0, b"\x03\x04\x00\x8d\x00\x8e",), # Response
(MessageType.TLS, b'\x83\x02', 0, 0, b'\x83\x02',), # Exception
]
)
@pytest.mark.parametrize(
("split"),
[
"no",
"half",
"single",
]
)
async def test_decode(self, dummy_message, msg_type, data, dev_id, tid, expected, split):
"""Test encode method."""
if msg_type == MessageType.RTU:
pytest.skip("Waiting on implementation!")
if msg_type == MessageType.TLS and split != "no":
pytest.skip("Not supported.")
frame = dummy_message(
msg_type,
CommParams(),
False,
[1],
)
frame.callback_request_response = mock.Mock()
if split == "no":
used_len = frame.callback_data(data)

elif split == "half":
split_len = int(len(data) / 2)
assert not frame.callback_data(data[0:split_len])
frame.callback_request_response.assert_not_called()
used_len = frame.callback_data(data)
else:
last = len(data)
for i in range(0, last -1):
assert not frame.callback_data(data[0:i+1])
frame.callback_request_response.assert_not_called()
used_len = frame.callback_data(data)
assert used_len == len(data)
frame.callback_request_response.assert_called_with(expected, dev_id, tid)

@pytest.mark.parametrize(
("frame", "data", "exp_len"),
[
(MessageAscii, b':0003007C00017F\r\n', 17), # bad crc
# (MessageAscii, b'abc:0003007C00027F\r\n', 3), # garble in front
# (MessageAscii, b':0003007C00017F\r\nabc', 17), # bad crc, garble after
# (MessageAscii, b':0003007C00017F\r\n:0003', 17), # part second message
(MessageRTU, b'\x00\x83\x02\x91\x31', 0), # bad crc
# (MessageRTU, b'\x00\x83\x02\x91\x31', 0), # garble in front
# (MessageRTU, b'\x00\x83\x02\x91\x31', 0), # garble after
# (MessageRTU, b'\x00\x83\x02\x91\x31', 0), # part second message
]
)
async def test_decode_bad_crc(self, frame, data, exp_len):
"""Test encode method."""
if frame == MessageRTU:
pytest.skip("Waiting for implementation.")
frame_obj = frame(None, True)
used_len, _, _, data = frame_obj.decode(data)
assert used_len == exp_len
assert not data
70 changes: 0 additions & 70 deletions test/message/test_rtu.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,76 +13,6 @@ def prepare_frame():
"""Return message object."""
return MessageRTU([1], False)

def test_crc16_table(self):
"""Test the crc16 table is prefilled."""
assert len(MessageRTU.crc16_table) == 256
assert isinstance(MessageRTU.crc16_table[0], int)
assert isinstance(MessageRTU.crc16_table[255], int)

def test_roundtrip_CRC(self):
"""Test combined compute/check CRC."""
data = b'\x12\x34\x23\x45\x34\x56\x45\x67'
assert MessageRTU.compute_CRC(data) == 0xE2DB
assert MessageRTU.check_CRC(data, 0xE2DB)

# b"\x02\x01\x01\x00Q\xcc"
# b"\x01\x01\x03\x01\x00\n\xed\x89"
# b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x43"
# b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD"

# b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03" # good frame + part of next frame

# b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC" # invalid frame CRC
# b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC" # bad crc
# b"\x61\x62\x00\x01\x00\n\xec\x1c" # bad function code
# b"\x01\x03\x03\x01\x00\n\x94\x49" # Not ok

# test frame ready
# (b"", False),
# (b"\x11", False),
# (b"\x11\x03", False),
# (b"\x11\x03\x06", False),
# (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49", False),
# (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD", True),
# (b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\xAB\xCD", True),


@pytest.mark.parametrize(
("packet", "used_len", "res_id", "res"),
[
(b':010100010001FC\r\n', 17, 1, b'\x01\x00\x01\x00\x01'),
(b':00010001000AF4\r\n', 17, 0, b'\x01\x00\x01\x00\x0a'),
(b':01010001000AF3\r\n', 17, 1, b'\x01\x00\x01\x00\x0a'),
(b':61620001000A32\r\n', 17, 97, b'\x62\x00\x01\x00\x0a'),
(b':01270001000ACD\r\n', 17, 1, b'\x27\x00\x01\x00\x0a'),
(b':010100', 0, 0, b''), # short frame
(b':00010001000AF4', 0, 0, b''),
(b'abc:00010001000AF4', 3, 0, b''), # garble before frame
(b'abc00010001000AF4', 17, 0, b''), # only garble
(b':01010001000A00\r\n', 17, 0, b''),
],
)
def xtest_decode(self, frame, packet, used_len, res_id, res):
"""Test decode."""
res_len, tid, dev_id, data = frame.decode(packet)
assert res_len == used_len
assert data == res
assert not tid
assert dev_id == res_id

@pytest.mark.parametrize(
("data", "dev_id", "res_msg"),
[
(b'\x01\x01\x00', 2, b'\x02\x01\x01\x00\x51\xcc'),
(b'\x03\x06\xAE\x41\x56\x52\x43\x40', 17, b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD'),
(b'\x01\x03\x01\x00\x0a', 1, b'\x01\x01\x03\x01\x00\x0a\xed\x89'),
],
)
def test_encode(self, frame, data, dev_id, res_msg):
"""Test encode."""
msg = frame.encode(data, dev_id, 0)
assert res_msg == msg
assert dev_id == int(msg[0])

@pytest.mark.parametrize(
("data", "dev_id", "res_msg"),
Expand Down
15 changes: 0 additions & 15 deletions test/message/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,6 @@ def test_decode(self, frame, packet, used_len, res_id, res_tid, res):
assert res_tid == tid
assert dev_id == res_id

@pytest.mark.parametrize(
("data", "dev_id", "tid", "res_msg"),
[
(b'\x01\x05\x04\x00\x17', 7, 5, b'\x00\x05\x00\x00\x00\x06\x07\x01\x05\x04\x00\x17'),
(b'\x03\x07\x06\x00\x73', 2, 9, b'\x00\x09\x00\x00\x00\x06\x02\x03\x07\x06\x00\x73'),
(b'\x08\x00\x01', 3, 6, b'\x00\x06\x00\x00\x00\x04\x03\x08\x00\x01'),
(b'\x84\x01', 4, 8, b'\x00\x08\x00\x00\x00\x03\x04\x84\x01'),
],
)
def test_encode(self, frame, data, dev_id, tid, res_msg):
"""Test encode."""
msg = frame.encode(data, dev_id, tid)
assert res_msg == msg
assert dev_id == int(msg[6])
assert tid == int.from_bytes(msg[0:2], 'big')

@pytest.mark.parametrize(
("data", "dev_id", "tid", "res_msg"),
Expand Down
13 changes: 0 additions & 13 deletions test/message/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,6 @@ def test_decode(self, frame, packet, used_len,):
assert not tid
assert not dev_id

@pytest.mark.parametrize(
("data"),
[
(b'\x01\x05\x04\x00\x17'),
(b'\x03\x07\x06\x00\x73'),
(b'\x08\x00\x01'),
(b'\x84\x01'),
],
)
def test_encode(self, frame, data):
"""Test encode."""
msg = frame.encode(data, 0, 0)
assert data == msg

@pytest.mark.parametrize(
("data"),
Expand Down
Loading

0 comments on commit 7c4a310

Please sign in to comment.