Skip to content

Commit

Permalink
move common framer to base.
Browse files Browse the repository at this point in the history
  • Loading branch information
janiversen committed Jul 2, 2023
1 parent 7caf42d commit b55a751
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 99 deletions.
14 changes: 0 additions & 14 deletions pymodbus/framer/ascii_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ def __init__(self, decoder, client=None):
:param decoder: The decoder implementation to use
"""
super().__init__(decoder, client)
self._buffer = b""
self._header = {"lrc": "0000", "len": 0, "uid": 0x00}
self._hsize = 0x02
self._start = b":"
self._end = b"\r\n"
Expand Down Expand Up @@ -117,18 +115,6 @@ def getFrame(self):
return a2b_hex(buffer)
return b""

def resetFrame(self):
"""Reset the entire message frame.
This allows us to skip ovver errors that may be in the stream.
It is hard to know if we are simply out of sync or if there is
an error in the stream as we have no way to check the start or
end of the message (python just doesn't have the resolution to
check for millisecond delays).
"""
self._buffer = b""
self._header = {"lrc": "0000", "len": 0, "uid": 0x00}

def populateResult(self, result):
"""Populate the modbus result header.
Expand Down
33 changes: 32 additions & 1 deletion pymodbus/framer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Dict, Union

from pymodbus.factory import ClientDecoder, ServerDecoder
from pymodbus.logging import Log


# Unit ID, Function Code
Expand Down Expand Up @@ -32,7 +33,15 @@ def __init__(
"""
self.decoder = decoder
self.client = client
self._header: Dict[str, Any] = {}
self._header: Dict[str, Any] = {
"lrc": "0000",
"len": 0,
"uid": 0x00,
"tid": 0,
"pid": 0,
"crc": b"\x00\x00",
}
self._buffer = b""

def _validate_slave_id(self, slaves: list, single: bool) -> bool:
"""Validate if the received data is valid for the client.
Expand Down Expand Up @@ -66,3 +75,25 @@ def recvPacket(self, size):
:return:
"""
return self.client.recv(size)

def resetFrame(self):
"""Reset the entire message frame.
This allows us to skip ovver errors that may be in the stream.
It is hard to know if we are simply out of sync or if there is
an error in the stream as we have no way to check the start or
end of the message (python just doesn't have the resolution to
check for millisecond delays).
"""
Log.debug(
"Resetting frame - Current Frame in buffer - {}", self._buffer, ":hex"
)
self._buffer = b""
self._header = {
"lrc": "0000",
"crc": b"\x00\x00",
"len": 0,
"uid": 0x00,
"pid": 0,
"tid": 0,
}
15 changes: 1 addition & 14 deletions pymodbus/framer/binary_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ def __init__(self, decoder, client=None):
:param decoder: The decoder implementation to use
"""
super().__init__(decoder, client)
self._buffer = b""
self._header = {"crc": 0x0000, "len": 0, "uid": 0x00}
# self._header.update({"crc": 0x0000})
self._hsize = 0x01
self._start = b"\x7b" # {
self._end = b"\x7d" # }
Expand Down Expand Up @@ -209,17 +208,5 @@ def _preflight(self, data):
array.append(item)
return bytes(array)

def resetFrame(self):
"""Reset the entire message frame.
This allows us to skip ovver errors that may be in the stream.
It is hard to know if we are simply out of sync or if there is
an error in the stream as we have no way to check the start or
end of the message (python just doesn't have the resolution to
check for millisecond delays).
"""
self._buffer = b""
self._header = {"crc": 0x0000, "len": 0, "uid": 0x00}


# __END__
10 changes: 3 additions & 7 deletions pymodbus/framer/rtu_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ def __init__(self, decoder, client=None):
:param decoder: The decoder factory implementation to use
"""
super().__init__(decoder, client)
self._buffer = b""
self._header = {"uid": 0x00, "len": 0, "crc": b"\x00\x00"}
self._hsize = 0x01
self._end = b"\x0d\x0a"
self._min_frame_size = 4
Expand Down Expand Up @@ -115,11 +113,9 @@ def resetFrame(self):
end of the message (python just doesn't have the resolution to
check for millisecond delays).
"""
Log.debug(
"Resetting frame - Current Frame in buffer - {}", self._buffer, ":hex"
)
# self._buffer = b""
self._header = {"uid": 0x00, "len": 0, "crc": b"\x00\x00"}
x = self._buffer
super().resetFrame()
self._buffer = x

def isFrameReady(self):
"""Check if we should continue decode logic.
Expand Down
14 changes: 0 additions & 14 deletions pymodbus/framer/socket_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ def __init__(self, decoder, client=None):
:param decoder: The decoder factory implementation to use
"""
super().__init__(decoder, client)
self._buffer = b""
self._header = {"tid": 0, "pid": 0, "len": 0, "uid": 0}
self._hsize = 0x07

# ----------------------------------------------------------------------- #
Expand Down Expand Up @@ -187,18 +185,6 @@ def _process(self, callback, tid, error=False):
else:
callback(result) # defer or push to a thread?

def resetFrame(self):
"""Reset the entire message frame.
This allows us to skip ovver errors that may be in the stream.
It is hard to know if we are simply out of sync or if there is
an error in the stream as we have no way to check the start or
end of the message (python just doesn't have the resolution to
check for millisecond delays).
"""
self._buffer = b""
self._header = {"tid": 0, "pid": 0, "len": 0, "uid": 0}

def getRawFrame(self):
"""Return the complete buffer."""
return self._buffer
Expand Down
13 changes: 0 additions & 13 deletions pymodbus/framer/tls_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ def __init__(self, decoder, client=None):
:param decoder: The decoder factory implementation to use
"""
super().__init__(decoder, client)
self._buffer = b""
self._header = {}
self._hsize = 0x0

# ----------------------------------------------------------------------- #
Expand Down Expand Up @@ -148,17 +146,6 @@ def _process(self, callback, error=False):
self.advanceFrame()
callback(result) # defer or push to a thread?

def resetFrame(self):
"""Reset the entire message frame.
This allows us to skip ovver errors that may be in the stream.
It is hard to know if we are simply out of sync or if there is
an error in the stream as we have no way to check the start or
end of the message (python just doesn't have the resolution to
check for millisecond delays).
"""
self._buffer = b""

def getRawFrame(self):
"""Return the complete buffer."""
return self._buffer
Expand Down
2 changes: 1 addition & 1 deletion pymodbus/other_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def __init__(self, status=0x00, **kwargs):
:param status: The status response to report
"""
ModbusResponse.__init__(self, **kwargs)
self.status = status
self.status = status if status < 256 else 255

def encode(self):
"""Encode the response.
Expand Down
64 changes: 34 additions & 30 deletions pymodbus/server/async_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,33 @@ def callback_disconnected(self, call_exc: Exception) -> None:
traceback.format_exc(),
)

async def inner_handle(self):
"""Handle handler."""
slaves = self.server.context.slaves()
# this is an asyncio.Queue await, it will never fail
data = await self._recv_()
if isinstance(data, tuple):
# addr is populated when talking over UDP
data, *addr = data
else:
addr = (None,) # empty tuple

# if broadcast is enabled make sure to
# process requests to address 0
if self.server.broadcast_enable: # pragma: no cover
if 0 not in slaves:
slaves.append(0)

Log.debug("Handling data: {}", data, ":hex")

single = self.server.context.single
self.framer.processIncomingPacket(
data=data,
callback=lambda x: self.execute(x, *addr),
slave=slaves,
single=single,
)

async def handle(self):
"""Return Asyncio coroutine which represents a single conversation.
Expand All @@ -145,31 +172,7 @@ async def handle(self):
reset_frame = False
while self.running:
try:
slaves = self.server.context.slaves()
# this is an asyncio.Queue await, it will never fail
data = await self._recv_()
if isinstance(data, tuple):
# addr is populated when talking over UDP
data, *addr = data
else:
addr = (None,) # empty tuple

# if broadcast is enabled make sure to
# process requests to address 0
if self.server.broadcast_enable: # pragma: no cover
if 0 not in slaves:
slaves.append(0)

Log.debug("Handling data: {}", data, ":hex")

single = self.server.context.single
self.framer.processIncomingPacket(
data=data,
callback=lambda x: self.execute(x, *addr),
slave=slaves,
single=single,
)

await self.inner_handle()
except asyncio.CancelledError:
# catch and ignore cancellation errors
if self.running:
Expand Down Expand Up @@ -251,7 +254,6 @@ def __send(msg, *addr):
if kwargs.get("skip_encoding", False):
__send(message, *addr)
elif message.should_respond:
# self.server.control.Counter.BusMessage += 1
pdu = self.framer.buildPacket(message)
__send(pdu, *addr)
else:
Expand Down Expand Up @@ -394,10 +396,12 @@ async def server_close(self):
active_connecions = self.local_active_connections.copy()
for k_item, v_item in active_connecions.items():
Log.warning("aborting active session {}", k_item)
v_item.transport.close()
await asyncio.sleep(0.1)
v_item.handler_task.cancel()
await v_item.handler_task
if v_item.transport:
v_item.transport.close()
await asyncio.sleep(0.1)
if v_item.handler_task:
v_item.handler_task.cancel()
await v_item.handler_task
self.local_active_connections = {}
self.transport_close()

Expand Down
36 changes: 31 additions & 5 deletions test/test_framers.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,21 @@ def test_framer_initialization(framer):
assert framer.decoder == decoder
if isinstance(framer, ModbusAsciiFramer):
assert framer._header == { # pylint: disable=protected-access
"tid": 0,
"pid": 0,
"lrc": "0000",
"len": 0,
"uid": 0x00,
"crc": b"\x00\x00",
}
assert framer._hsize == 0x02 # pylint: disable=protected-access
assert framer._start == b":" # pylint: disable=protected-access
assert framer._end == b"\r\n" # pylint: disable=protected-access
elif isinstance(framer, ModbusRtuFramer):
assert framer._header == { # pylint: disable=protected-access
"tid": 0,
"pid": 0,
"lrc": "0000",
"uid": 0x00,
"len": 0,
"crc": b"\x00\x00",
Expand All @@ -73,7 +79,10 @@ def test_framer_initialization(framer):
assert framer._min_frame_size == 4 # pylint: disable=protected-access
else:
assert framer._header == { # pylint: disable=protected-access
"crc": 0x0000,
"tid": 0,
"pid": 0,
"lrc": "0000",
"crc": b"\x00\x00",
"len": 0,
"uid": 0x00,
}
Expand Down Expand Up @@ -143,9 +152,12 @@ def test_rtu_reset_framer(rtu_framer, data):
rtu_framer._buffer = data # pylint: disable=protected-access
rtu_framer.resetFrame()
assert rtu_framer._header == { # pylint: disable=protected-access
"uid": 0x00,
"len": 0,
"lrc": "0000",
"crc": b"\x00\x00",
"len": 0,
"uid": 0x00,
"pid": 0,
"tid": 0,
}


Expand Down Expand Up @@ -190,11 +202,25 @@ def test_rtu_populate_header_fail(rtu_framer, data):
[
(
b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD",
{"crc": b"\x49\xAD", "uid": 17, "len": 11},
{
"crc": b"\x49\xAD",
"uid": 17,
"len": 11,
"lrc": "0000",
"tid": 0,
"pid": 0,
},
),
(
b"\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03",
{"crc": b"\x49\xAD", "uid": 17, "len": 11},
{
"crc": b"\x49\xAD",
"uid": 17,
"len": 11,
"lrc": "0000",
"tid": 0,
"pid": 0,
},
),
],
)
Expand Down

0 comments on commit b55a751

Please sign in to comment.