Skip to content

Commit

Permalink
RTU decoder, ready for test.
Browse files Browse the repository at this point in the history
  • Loading branch information
janiversen committed Apr 16, 2024
1 parent 9c4466b commit 3c0e9ed
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 304 deletions.
42 changes: 23 additions & 19 deletions pymodbus/framer/ascii.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,29 @@ class FramerAscii(FramerBase):

def decode(self, data: bytes) -> tuple[int, int, int, bytes]:
"""Decode ADU."""
if (used_len := len(data)) < self.MIN_SIZE:
Log.debug("Short frame: {} wait for more data", data, ":hex")
return 0, 0, 0, self.EMPTY
if data[0:1] != self.START:
if (start := data.find(self.START)) != -1:
used_len = start
Log.debug("Garble data before frame: {}, skip until start of frame", data, ":hex")
return used_len, 0, 0, self.EMPTY
if (used_len := data.find(self.END)) == -1:
Log.debug("Incomplete frame: {} wait for more data", data, ":hex")
return 0, 0, 0, self.EMPTY

dev_id = int(data[1:3], 16)
lrc = int(data[used_len - 2: used_len], 16)
msg = a2b_hex(data[1 : used_len - 2])
if not self.check_LRC(msg, lrc):
Log.debug("LRC wrong in frame: {} skipping", data, ":hex")
return used_len+2, 0, 0, self.EMPTY
return used_len+2, 0, dev_id, msg[1:]
buf_len = len(data)
used_len = 0
while True:
if buf_len - used_len < self.MIN_SIZE:
return used_len, 0, 0, self.EMPTY
buffer = data[used_len:]
if buffer[0:1] != self.START:
if (i := buffer.find(self.START)) == -1:
Log.debug("No frame start in data: {}, wait for data", data, ":hex")
return buf_len, 0, 0, self.EMPTY
used_len += i
continue
if (end := buffer.find(self.END)) == -1:
Log.debug("Incomplete frame: {} wait for more data", data, ":hex")
return used_len, 0, 0, self.EMPTY
dev_id = int(buffer[1:3], 16)
lrc = int(buffer[end - 2: end], 16)
msg = a2b_hex(buffer[1 : end - 2])
used_len += end + 2
if not self.check_LRC(msg, lrc):
Log.debug("LRC wrong in frame: {} skipping", data, ":hex")
continue
return used_len, dev_id, dev_id, msg[1:]

def encode(self, data: bytes, device_id: int, _tid: int) -> bytes:
"""Encode ADU."""
Expand Down
8 changes: 6 additions & 2 deletions pymodbus/framer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ class FramerBase:
def __init__(self) -> None:
"""Initialize a ADU instance."""

def set_dev_ids(self, _dev_ids: list[int]):
"""Set/update allowed device ids."""

def set_fc_calc(self, _fc: int, _msg_size: int, _count_pos: int):
"""Set/Update function code information."""

def decode(self, data: bytes) -> tuple[int, int, int, bytes]:
"""Decode ADU.
Expand All @@ -24,12 +30,10 @@ def decode(self, data: bytes) -> tuple[int, int, int, bytes]:
device_id (int) or 0
modbus request/response (bytes)
"""
raise RuntimeError("NOT IMPLEMENTED!")

def encode(self, pdu: bytes, dev_id: int, tid: int) -> bytes:
"""Encode ADU.
returns:
modbus ADU (bytes)
"""
raise RuntimeError("NOT IMPLEMENTED!")
22 changes: 9 additions & 13 deletions pymodbus/framer/framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,19 @@ def __init__(self,
}[framer_type]()


def validate_device_id(self, dev_id: int) -> bool:
"""Check if device id is expected."""
return self.broadcast or (dev_id in self.device_ids)


def callback_data(self, data: bytes, addr: tuple | None = None) -> int:
"""Handle received data."""
tot_len = len(data)
start = 0
tot_len = 0
buf_len = len(data)
while True:
used_len, tid, device_id, msg = self.handle.decode(data[start:])
used_len, tid, device_id, msg = self.handle.decode(data[tot_len:])
tot_len += used_len
if msg:
self.callback_request_response(msg, device_id, tid)
if not used_len:
return start
start += used_len
if start == tot_len:
if self.broadcast or device_id in self.device_ids:
self.callback_request_response(msg, device_id, tid)
if tot_len == buf_len:
return tot_len
else:
return tot_len

# --------------------- #
Expand Down
181 changes: 54 additions & 127 deletions pymodbus/framer/rtu.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
"""Modbus RTU frame implementation."""
from __future__ import annotations

import struct

from pymodbus.exceptions import ModbusIOException
from pymodbus.factory import ClientDecoder
from pymodbus.framer.base import FramerBase
from pymodbus.logging import Log

Expand All @@ -17,26 +13,46 @@ class FramerRTU(FramerBase):
* Note: due to the USB converter and the OS drivers, timing cannot be quaranteed
neither when receiving nor when sending.
Decoding is a complicated process because the RTU frame does not have a fixed prefix
only suffix, therefore it is nessecary to decode the content (PDU) to get length etc.
There are some restraints however that help the detection.
For client:
- a request causes 1 response !
- Multiple requests are NOT allowed (master-slave protocol)
- the server will not retransmit responses
this means decoding is always exactly 1 frame (response)
For server (Single device)
- only 1 request allowed (master-slave) protocol
- the client (master) may retransmit but in larger time intervals
this means decoding is always exactly 1 frame (request)
For server (Multidrop line --> devices in parallel)
- only 1 request allowed (master-slave) protocol
- other devices will send responses
- the client (master) may retransmit but in larger time intervals
this means decoding is always exactly 1 frame (request / response)
Recovery from bad cabling etc is important, the following scenarios is possible:
- garble data before frame
- garble data in frame
- garble data after frame
- data in frame garbled (wrong CRC)
decoding assumes the frame is sound, and if not enters a hunting mode.
The 3.5 byte wait betwen frames is 31ms at 1.200Bps and 1ms at 38.600bps,
so the decoder will wait 50ms for more data if not the transmission is
considered complete
"""

MIN_SIZE = 5

def __init__(self) -> None:
"""Initialize a ADU instance."""
super().__init__()
self.broadcast: bool = False
self.dev_ids: list[int]
self.fc_calc: dict[int, int]

def set_dev_ids(self, dev_ids: list[int]):
"""Set/update allowed device ids."""
if 0 in dev_ids:
self.broadcast = True
self.dev_ids = dev_ids

def set_fc_calc(self, fc: int, msg_size: int, count_pos: int):
"""Set/Update function code information."""
self.fc_calc[fc] = msg_size if not count_pos else -count_pos


@classmethod
Expand All @@ -58,120 +74,31 @@ def generate_crc16_table(cls) -> list[int]:
return result
crc16_table: list[int] = [0]

def _legacy_decode(self, callback, slave): # noqa: C901
"""Process new packet pattern."""

def is_frame_ready(self):
"""Check if we should continue decode logic."""
size = self._header.get("len", 0)
if not size and len(self._buffer) > self._hsize:
try:
self._header["uid"] = int(self._buffer[0])
self._header["tid"] = int(self._buffer[0])
func_code = int(self._buffer[1])
pdu_class = self.decoder.lookupPduClass(func_code)
size = pdu_class.calculateRtuFrameSize(self._buffer)
self._header["len"] = size

if len(self._buffer) < size:
raise IndexError
self._header["crc"] = self._buffer[size - 2 : size]
except IndexError:
return False
return len(self._buffer) >= size if size > 0 else False

def get_frame_start(self, slaves, broadcast, skip_cur_frame):
"""Scan buffer for a relevant frame start."""
start = 1 if skip_cur_frame else 0
if (buf_len := len(self._buffer)) < 4:
return False
for i in range(start, buf_len - 3): # <slave id><function code><crc 2 bytes>
if not broadcast and self._buffer[i] not in slaves:
continue
if (
self._buffer[i + 1] not in self.function_codes
and (self._buffer[i + 1] - 0x80) not in self.function_codes
):
continue
if i:
self._buffer = self._buffer[i:] # remove preceding trash.
return True
if buf_len > 3:
self._buffer = self._buffer[-3:]
return False

def check_frame(self):
"""Check if the next frame is available."""
try:
self._header["uid"] = int(self._buffer[0])
self._header["tid"] = int(self._buffer[0])
func_code = int(self._buffer[1])
pdu_class = self.decoder.lookupPduClass(func_code)
size = pdu_class.calculateRtuFrameSize(self._buffer)
self._header["len"] = size

if len(self._buffer) < size:
raise IndexError
self._header["crc"] = self._buffer[size - 2 : size]
frame_size = self._header["len"]
data = self._buffer[: frame_size - 2]
crc = self._header["crc"]
crc_val = (int(crc[0]) << 8) + int(crc[1])
return FramerRTU.check_CRC(data, crc_val)
except (IndexError, KeyError, struct.error):
return False

self._buffer = b'' # pylint: disable=attribute-defined-outside-init
broadcast = not slave[0]
skip_cur_frame = False
while get_frame_start(self, slave, broadcast, skip_cur_frame):
self._header: dict = {"uid": 0x00, "len": 0, "crc": b"\x00\x00"} # pylint: disable=attribute-defined-outside-init
if not is_frame_ready(self):
Log.debug("Frame - not ready")
break
if not check_frame(self):
Log.debug("Frame check failed, ignoring!!")
# x = self._buffer
# self.resetFrame()
# self._buffer = x
skip_cur_frame = True
continue
start = 0x01 # self._hsize
end = self._header["len"] - 2
buffer = self._buffer[start:end]
if end > 0:
Log.debug("Getting Frame - {}", buffer, ":hex")
data = buffer
else:
data = b""
if (result := ClientDecoder().decode(data)) is None:
raise ModbusIOException("Unable to decode request")
result.slave_id = self._header["uid"]
result.transaction_id = self._header["tid"]
self._buffer = self._buffer[self._header["len"] :] # pylint: disable=attribute-defined-outside-init
Log.debug("Frame advanced, resetting header!!")
callback(result) # defer or push to a thread?


def hunt_frame_start(self, skip_cur_frame: bool, data: bytes) -> int:
"""Scan buffer for a relevant frame start."""
buf_len = len(data)
for i in range(1 if skip_cur_frame else 0, buf_len - self.MIN_SIZE):
if not (self.broadcast or data[i] in self.dev_ids):
continue
if (_fc := data[i + 1]) not in self.fc_calc:
continue
return i
return -i

def decode(self, data: bytes) -> tuple[int, int, int, bytes]:
"""Decode ADU."""
if len(data) < self.MIN_SIZE:
if (buf_len := len(data)) < self.MIN_SIZE:
Log.debug("Short frame: {} wait for more data", data, ":hex")
return 0, 0, 0, b''

while (i := self.hunt_frame_start(False, data)) > 0:
pass
return -i, 0, 0, b''
i = -1
try:
while True:
i += 1
if i > buf_len - self.MIN_SIZE + 1:
break
dev_id = int(data[i])
fc_len = 5
msg_len = fc_len -2 if fc_len > 0 else int(data[i-fc_len])-fc_len+1
if msg_len + i + 2 > buf_len:
break
crc_val = (int(data[i+msg_len]) << 8) + int(data[i+msg_len+1])
if not self.check_CRC(data[i:i+msg_len], crc_val):
Log.debug("Skipping frame CRC with len {} at index {}!", msg_len, i)
raise KeyError
return i+msg_len+2, dev_id, dev_id, data[i+1:i+msg_len]
except KeyError:
i = buf_len
return i, 0, 0, b''


def encode(self, pdu: bytes, device_id: int, _tid: int) -> bytes:
Expand Down
43 changes: 33 additions & 10 deletions test/framers/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,28 @@

import pytest

from pymodbus.factory import ClientDecoder, ServerDecoder
from pymodbus.framer import Framer, FramerType
from pymodbus.transport import CommParams, ModbusProtocol


class DummyMessage(Framer):
class DummyFramer(Framer):
"""Implement use of ModbusProtocol."""

def __init__(self,
message_type: FramerType,
framer_type: FramerType,
params: CommParams,
is_server: bool,
device_ids: list[int] | None,
):
"""Initialize a message instance."""
super().__init__(message_type, params, is_server, device_ids)
"""Initialize a frame instance."""
super().__init__(framer_type, params, is_server, device_ids)
self.send = mock.Mock()
self.message_type = message_type
self.framer_type = framer_type

def callback_new_connection(self) -> ModbusProtocol:
"""Call when listener receive new connection request."""
return DummyMessage(self.message_type, self.comm_params, self.is_server, self.device_ids) # pragma: no cover
return DummyFramer(self.framer_type, self.comm_params, self.is_server, self.device_ids) # pragma: no cover

def callback_connected(self) -> None:
"""Call when connection is succcesfull."""
Expand All @@ -37,7 +38,29 @@ def callback_request_response(self, data: bytes, device_id: int, tid: int) -> No
"""Handle received modbus request/response."""


@pytest.fixture(name="dummy_message")
async def prepare_dummy_message():
"""Return message object."""
return DummyMessage
@pytest.fixture(name="entry")
def prepare_entry():
"""Return framer_type."""
return FramerType.RAW

@pytest.fixture(name="is_server")
def prepare_is_server():
"""Return client/server."""
return False

@pytest.fixture(name="dummy_framer")
async def prepare_test_framer(entry, is_server):
"""Return framer object."""
framer = DummyFramer(
entry,
CommParams(),
is_server,
[0, 1],
)
if entry == FramerType.RTU:
func_table = (ServerDecoder if is_server else ClientDecoder)().lookup
for key, ent in func_table.items():
fix_len = ent._rtu_frame_size if hasattr(ent, "_rtu_frame_size") else 0 # pylint: disable=protected-access
cnt_pos = ent. _rtu_byte_count_pos if hasattr(ent, "_rtu_byte_count_pos") else 0 # pylint: disable=protected-access
framer.handle.set_fc_calc(key, fix_len, cnt_pos)
return framer
2 changes: 1 addition & 1 deletion test/framers/test_ascii.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_decode(self, frame, packet, used_len, res_id, res):
res_len, tid, dev_id, data = frame.decode(packet)
assert res_len == used_len
assert data == res
assert not tid
assert tid == res_id
assert dev_id == res_id

@pytest.mark.parametrize(
Expand Down
Loading

0 comments on commit 3c0e9ed

Please sign in to comment.