Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup ReadWriteMultipleRegistersResponse and testing #2261

Merged
merged 3 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 1 addition & 36 deletions pymodbus/pdu/register_read_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ def __str__(self):
)


class ReadWriteMultipleRegistersResponse(ModbusResponse):
class ReadWriteMultipleRegistersResponse(ReadHoldingRegistersResponse):
"""Read/write multiple registers.

The normal response contains the data from the group of registers that
Expand All @@ -362,38 +362,3 @@ class ReadWriteMultipleRegistersResponse(ModbusResponse):
"""

function_code = 23
_rtu_byte_count_pos = 2

def __init__(self, values=None, slave=0, transaction=0, protocol=0, skip_encode=False):
"""Initialize a new instance.

:param values: The register values to write
"""
super().__init__(slave, transaction, protocol, skip_encode)
self.registers = values or []

def encode(self):
"""Encode the response packet.

:returns: The encoded packet
"""
result = struct.pack(">B", len(self.registers) * 2)
for register in self.registers:
result += struct.pack(">H", register)
return result

def decode(self, data):
"""Decode the register response packet.

:param data: The response to decode
"""
bytecount = int(data[0])
for i in range(1, bytecount, 2):
self.registers.append(struct.unpack(">H", data[i : i + 2])[0])

def __str__(self):
"""Return a string representation of the instance.

:returns: A string representation of the instance
"""
return f"ReadWriteNRegisterResponse ({len(self.registers)})"
25 changes: 6 additions & 19 deletions test/sub_function_codes/test_register_read_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ class TestReadRegisterMessages:
* Read Holding Registers
"""

value = None
values = None
request_read = None
response_read = None
values: list
request_read: dict
response_read: dict

def setup_method(self):
"""Initialize the test environment and builds request/result encoding pairs."""
Expand All @@ -42,7 +41,6 @@ def setup_method(self):
"read_count": 5,
"write_address": 1,
}
self.value = 0xABCD
self.values = [0xA, 0xB, 0xC]
self.request_read = {
ReadRegistersRequestBase(1, 5): b"\x00\x01\x00\x05",
Expand Down Expand Up @@ -84,20 +82,9 @@ def test_register_read_responses(self):

def test_register_read_response_decode(self):
"""Test register read response."""
registers = [
[0x0A, 0x0B, 0x0C],
[0x0A, 0x0B, 0x0C],
[0x0A, 0x0B, 0x0C],
[0x0A, 0x0B, 0x0C, 0x0A, 0x0B, 0x0C],
]
values = sorted(
self.response_read.items(),
key=lambda x: str(x), # pylint: disable=unnecessary-lambda
)
for packet, register in zip(values, registers):
request, response = packet
request.decode(response)
assert request.registers == register
for response, packet in self.response_read.items():
response.decode(packet)
assert response.registers == self.values

async def test_register_read_requests_count_errors(self):
"""This tests that the register request messages.
Expand Down