Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Message Raw, ASCII. #2054

Merged
merged 1 commit into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pymodbus/framer/ascii_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pymodbus.exceptions import ModbusIOException
from pymodbus.framer.base import BYTE_ORDER, FRAME_HEADER, ModbusFramer
from pymodbus.logging import Log
from pymodbus.utilities import checkLRC, computeLRC
from pymodbus.message.ascii import MessageAscii


ASCII_FRAME_HEADER = BYTE_ORDER + FRAME_HEADER
Expand Down Expand Up @@ -73,7 +73,7 @@ def checkFrame(self):
self._header["uid"] = int(self._buffer[1:3], 16)
self._header["lrc"] = int(self._buffer[end - 2 : end], 16)
data = a2b_hex(self._buffer[start + 1 : end - 2])
return checkLRC(data, self._header["lrc"])
return MessageAscii.check_LRC(data, self._header["lrc"])
return False

def advanceFrame(self):
Expand Down Expand Up @@ -141,7 +141,7 @@ def buildPacket(self, message):
buffer = struct.pack(
ASCII_FRAME_HEADER, message.slave_id, message.function_code
)
checksum = computeLRC(encoded + buffer)
checksum = MessageAscii.compute_LRC(buffer + encoded)

packet = bytearray()
packet.extend(self._start)
Expand Down
69 changes: 58 additions & 11 deletions pymodbus/message/ascii.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,73 @@
"""
from __future__ import annotations

from binascii import a2b_hex, b2a_hex

from pymodbus.logging import Log
from pymodbus.message.base import MessageBase


class MessageAscii(MessageBase):
"""Modbus Socket frame type.
r"""Modbus ASCII Frame Controller.

[ Start ][Address ][ Function ][ Data ][ LRC ][ End ]
1c 2c 2c Nc 1c 2c

[ MBAP Header ] [ Function Code] [ Data ]
[ tid ][ pid ][ length ][ uid ]
2b 2b 2b 1b 1b Nb
* data can be 0 - 2x252 chars
* end is "\\r\\n" (Carriage return line feed), however the line feed
character can be changed via a special command
* start is ":"

* length = uid + function code + data
This framer is used for serial transmission. Unlike the RTU protocol,
the data in this framer is transferred in plain text ascii.
"""

def reset(self) -> None:
"""Clear internal handling."""
START = b':'
END = b'\r\n'


def decode(self, _data: bytes) -> tuple[int, int, int, bytes]:
def decode(self, data: bytes) -> tuple[int, int, int, bytes]:
"""Decode message."""
return 0, 0, 0, b''
if (used_len := len(data)) < 10:
Log.debug("Short frame: {} wait for more data", data, ":hex")
return 0, 0, 0, self.EMPTY
if data[0:1] != self.START:
if (start := data.find(self.START)) != -1:
used_len = start
Log.debug("Garble data before frame: {}, skip until start of frame", data, ":hex")
return used_len, 0, 0, self.EMPTY
if (used_len := data.find(self.END)) == -1:
Log.debug("Incomplete frame: {} wait for more data", data, ":hex")
return 0, 0, 0, self.EMPTY

def encode(self, _data: bytes, _device_id: int, _tid: int) -> bytes:
dev_id = int(data[1:3], 16)
lrc = int(data[used_len - 2: used_len], 16)
msg = a2b_hex(data[1 : used_len - 2])
if not self.check_LRC(msg, lrc):
Log.debug("LRC wrong in frame: {} skipping", data, ":hex")
return used_len+2, 0, 0, self.EMPTY
return used_len+2, 0, dev_id, msg[1:]

def encode(self, data: bytes, device_id: int, _tid: int) -> bytes:
"""Decode message."""
return b''
dev_id = device_id.to_bytes(1,'big')
checksum = self.compute_LRC(dev_id + data)
packet = bytearray()
packet.extend(self.START)
packet.extend(f"{device_id:02x}".encode())
packet.extend(b2a_hex(data))
packet.extend(f"{checksum:02x}".encode())
packet.extend(self.END)
return bytes(packet).upper()

@classmethod
def compute_LRC(cls, data: bytes) -> int:
"""Use to compute the longitudinal redundancy check against a string."""
lrc = sum(int(a) for a in data) & 0xFF
lrc = (lrc ^ 0xFF) + 1
return lrc & 0xFF

@classmethod
def check_LRC(cls, data: bytes, check: int) -> bool:
"""Check if the passed in data matches the LRC."""
return cls.compute_LRC(data) == check
8 changes: 5 additions & 3 deletions pymodbus/message/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
class MessageBase:
"""Intern base."""

EMPTY = b''

def __init__(
self,
device_ids: list[int] | None,
Expand All @@ -24,9 +26,9 @@ def __init__(
self.device_ids = device_ids
self.is_server = is_server

@abstractmethod
def reset(self) -> None:
"""Clear internal handling."""
def validate_device_id(self, dev_id: int) -> bool:
"""Check if device id is expected."""
return not (self.device_ids and dev_id not in self.device_ids)

@abstractmethod
def decode(self, _data: bytes) -> tuple[int, int, int, bytes]:
Expand Down
19 changes: 11 additions & 8 deletions pymodbus/message/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,17 @@ def __init__(self,

def callback_data(self, data: bytes, addr: tuple | None = None) -> int:
"""Handle received data."""
used_len, tid, device_id, data = self.msg_handle.decode(data)
if data:
self.callback_request_response(data, device_id, tid)
return used_len
tot_len = len(data)
start = 0
while True:
used_len, tid, device_id, msg = self.msg_handle.decode(data[start:])
if msg:
self.callback_request_response(msg, device_id, tid)
if not used_len:
return start
start += used_len
if start == tot_len:
return tot_len

# --------------------- #
# callbacks and helpers #
Expand All @@ -98,7 +105,3 @@ def build_send(self, data: bytes, device_id: int, tid: int, addr: tuple | None =
"""
send_data = self.msg_handle.encode(data, device_id, tid)
self.send(send_data, addr)

def reset(self) -> None:
"""Reset handling."""
self.msg_handle.reset()
26 changes: 15 additions & 11 deletions pymodbus/message/raw.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
"""ModbusMessage layer."""
from __future__ import annotations

from pymodbus.logging import Log
from pymodbus.message.base import MessageBase


class MessageRaw(MessageBase):
"""Raw header.
r"""Modbus RAW Frame Controller.

HEADER:
byte[0] = device_id
byte[1] = transaction_id
byte[2..] = request/response
[ Device id ][Transaction id ][ Data ]
1c 2c Nc

This is mainly for test purposes.
"""
* data can be 1 - X chars

def reset(self) -> None:
"""Clear internal handling."""
This framer is used for non modbus communication and testing purposes.
"""

def decode(self, data: bytes) -> tuple[int, int, int, bytes]:
"""Decode message."""
if len(data) < 3:
return 0, 0, 0, b''
return len(data), int(data[0]), int(data[1]), data[2:]
Log.debug("Short frame: {} wait for more data", data, ":hex")
return 0, 0, 0, self.EMPTY
dev_id = int(data[0])
tid = int(data[1])
if not self.validate_device_id(dev_id):
Log.debug("Device id: {} in frame {} unknown, skipping.", dev_id, data, ":hex")

return len(data), dev_id, tid, data[2:]

def encode(self, data: bytes, device_id: int, tid: int) -> bytes:
"""Decode message."""
Expand Down
3 changes: 0 additions & 3 deletions pymodbus/message/rtu.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ class MessageRTU(MessageBase):
neither when receiving nor when sending.
"""

def reset(self) -> None:
"""Clear internal handling."""

def decode(self, _data: bytes) -> tuple[int, int, int, bytes]:
"""Decode message."""
return 0, 0, 0, b''
Expand Down
3 changes: 0 additions & 3 deletions pymodbus/message/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ class MessageSocket(MessageBase):
* length = uid + function code + data
"""

def reset(self) -> None:
"""Clear internal handling."""

def decode(self, _data: bytes) -> tuple[int, int, int, bytes]:
"""Decode message."""
return 0, 0, 0, b''
Expand Down
3 changes: 0 additions & 3 deletions pymodbus/message/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ class MessageTLS(MessageBase):
1b Nb
"""

def reset(self) -> None:
"""Clear internal handling."""

def decode(self, _data: bytes) -> tuple[int, int, int, bytes]:
"""Decode message."""
return 0, 0, 0, b''
Expand Down
28 changes: 0 additions & 28 deletions pymodbus/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
"default",
"computeCRC",
"checkCRC",
"computeLRC",
"checkLRC",
"rtuFrameSize",
]

Expand Down Expand Up @@ -205,32 +203,6 @@ def checkCRC(data, check): # pylint: disable=invalid-name
return computeCRC(data) == check


def computeLRC(data): # pylint: disable=invalid-name
"""Use to compute the longitudinal redundancy check against a string.

This is only used on the serial ASCII
modbus protocol. A full description of this implementation
can be found in appendix B of the serial line modbus description.

:param data: The data to apply a lrc to
:returns: The calculated LRC

"""
lrc = sum(int(a) for a in data) & 0xFF
lrc = (lrc ^ 0xFF) + 1
return lrc & 0xFF


def checkLRC(data, check): # pylint: disable=invalid-name
"""Check if the passed in data matches the LRC.

:param data: The data to calculate
:param check: The LRC to validate
:returns: True if matched, False otherwise
"""
return computeLRC(data) == check


def rtuFrameSize(data, byte_count_pos): # pylint: disable=invalid-name
"""Calculate the size of the frame based on the byte count.

Expand Down
91 changes: 91 additions & 0 deletions test/message/test_ascii.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Test transport."""
import struct

import pytest

from pymodbus.message.ascii import MessageAscii


class TestMessageAscii:
"""Test message module."""

@staticmethod
@pytest.fixture(name="frame")
def prepare_frame():
"""Return message object."""
return MessageAscii([1], False)


def test_check_LRC(self):
"""Test check_LRC."""
data = struct.pack(">HHHH", 0x1234, 0x2345, 0x3456, 0x4567)
assert MessageAscii.check_LRC(data, 0x1C)

def test_check_noLRC(self):
"""Test check_LRC."""
data = struct.pack(">HHHH", 0x1234, 0x2345, 0x3456, 0x4567)
assert not MessageAscii.check_LRC(data, 0x0C)

def test_compute_LRC(self):
"""Test compute_LRC."""
data = struct.pack(">HHHH", 0x1234, 0x2345, 0x3456, 0x4567)
assert MessageAscii.compute_LRC(data) == 0x1c

def test_roundtrip_LRC(self):
"""Test combined compute/check LRC."""
data = struct.pack(">HHHH", 0x1234, 0x2345, 0x3456, 0x4567)
assert MessageAscii.compute_LRC(data) == 0x1c
assert MessageAscii.check_LRC(data, 0x1C)

@pytest.mark.parametrize(
("packet", "used_len", "res_id", "res"),
[
(b':010100010001FC\r\n', 17, 1, b'\x01\x00\x01\x00\x01'),
(b':00010001000AF4\r\n', 17, 0, b'\x01\x00\x01\x00\x0a'),
(b':01010001000AF3\r\n', 17, 1, b'\x01\x00\x01\x00\x0a'),
(b':61620001000A32\r\n', 17, 97, b'\x62\x00\x01\x00\x0a'),
(b':01270001000ACD\r\n', 17, 1, b'\x27\x00\x01\x00\x0a'),
(b':010100', 0, 0, b''), # short frame
(b':00010001000AF4', 0, 0, b''),
(b'abc:00010001000AF4', 3, 0, b''), # garble before frame
(b'abc00010001000AF4', 17, 0, b''), # only garble
(b':01010001000A00\r\n', 17, 0, b''),
],
)
def test_decode(self, frame, packet, used_len, res_id, res):
"""Test decode."""
res_len, tid, dev_id, data = frame.decode(packet)
assert res_len == used_len
assert data == res
assert not tid
assert dev_id == res_id

@pytest.mark.parametrize(
("data", "dev_id", "res_msg"),
[
(b'\x01\x05\x04\x00\x17', 1, b':010105040017DE\r\n'),
(b'\x03\x07\x06\x00\x73', 2, b':0203070600737B\r\n'),
(b'\x08\x00\x01', 3, b':03080001F4\r\n'),
],
)
def test_encode(self, frame, data, dev_id, res_msg):
"""Test encode."""
msg = frame.encode(data, dev_id, 0)
assert res_msg == msg
assert dev_id == int(msg[1:3], 16)

@pytest.mark.parametrize(
("data", "dev_id", "res_msg"),
[
(b'\x01\x05\x04\x00\x17', 1, b':010105040017DF\r\n'),
(b'\x03\x07\x06\x00\x73', 2, b':0203070600737D\r\n'),
(b'\x08\x00\x01', 3, b':03080001F7\r\n'),
],
)
def test_roundtrip(self, frame, data, dev_id, res_msg):
"""Test encode."""
msg = frame.encode(data, dev_id, 0)
res_len, _, res_id, res_data = frame.decode(msg)
assert data == res_data
assert dev_id == res_id
assert res_len == len(res_msg)
Loading