Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove callback from framer. #2374

Merged
merged 19 commits into from
Oct 13, 2024
2 changes: 1 addition & 1 deletion examples/message_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def decode(self, message):
print(f"{decoder.decoder.__class__.__name__}")
print("-" * 80)
try:
decoder.processIncomingPacket(message, self.report)
self.report(decoder.processIncomingFrame(message))
except Exception: # pylint: disable=broad-except
self.check_errors(decoder, message)

Expand Down
2 changes: 1 addition & 1 deletion pymodbus/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def async_execute(self, request) -> ModbusResponse:
:meta private:
"""
request.transaction_id = self.ctx.transaction.getNextTID()
packet = self.ctx.framer.buildPacket(request)
packet = self.ctx.framer.buildFrame(request)

count = 0
while count <= self.retries:
Expand Down
3 changes: 2 additions & 1 deletion pymodbus/client/modbusclientprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ def callback_data(self, data: bytes, addr: tuple | None = None) -> int:

returns number of bytes consumed
"""
self.framer.processIncomingPacket(data, self._handle_response)
if (pdu := self.framer.processIncomingFrame(data)):
self._handle_response(pdu)
return len(data)

def __str__(self):
Expand Down
19 changes: 10 additions & 9 deletions pymodbus/framer/ascii.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,44 +33,45 @@ class FramerAscii(FramerBase):
MIN_SIZE = 10


def specific_decode(self, data: bytes, data_len: int) -> tuple[int, bytes]:
def decode(self, data: bytes) -> tuple[int, int, int, bytes]:
"""Decode ADU."""
used_len = 0
data_len = len(data)
while True:
if data_len - used_len < self.MIN_SIZE:
return used_len, self.EMPTY
Log.debug("Short frame: {} wait for more data", data, ":hex")
return used_len, 0, 0, self.EMPTY
buffer = data[used_len:]
if buffer[0:1] != self.START:
if (i := buffer.find(self.START)) == -1:
Log.debug("No frame start in data: {}, wait for data", data, ":hex")
return data_len, self.EMPTY
return data_len, 0, 0, self.EMPTY
used_len += i
continue
if (end := buffer.find(self.END)) == -1:
Log.debug("Incomplete frame: {} wait for more data", data, ":hex")
return used_len, self.EMPTY
self.incoming_dev_id = int(buffer[1:3], 16)
self.incoming_tid = self.incoming_dev_id
return used_len, 0, 0, self.EMPTY
dev_id = int(buffer[1:3], 16)
lrc = int(buffer[end - 2: end], 16)
msg = a2b_hex(buffer[1 : end - 2])
used_len += end + 2
if not self.check_LRC(msg, lrc):
Log.debug("LRC wrong in frame: {} skipping", data, ":hex")
continue
return used_len, msg[1:]
return used_len, dev_id, 0, msg[1:]

def encode(self, data: bytes, device_id: int, _tid: int) -> bytes:
"""Encode ADU."""
dev_id = device_id.to_bytes(1,'big')
checksum = self.compute_LRC(dev_id + data)
packet = (
frame = (
self.START +
f"{device_id:02x}".encode() +
b2a_hex(data) +
f"{checksum:02x}".encode() +
self.END
).upper()
return packet
return frame

@classmethod
def compute_LRC(cls, data: bytes) -> int:
Expand Down
96 changes: 45 additions & 51 deletions pymodbus/framer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pymodbus.exceptions import ModbusIOException
from pymodbus.factory import ClientDecoder, ServerDecoder
from pymodbus.logging import Log
from pymodbus.pdu import ModbusRequest, ModbusResponse
from pymodbus.pdu import ModbusPDU


class FramerType(str, Enum):
Expand Down Expand Up @@ -39,89 +39,83 @@ def __init__(
if 0 in dev_ids:
dev_ids = []
self.dev_ids = dev_ids
self.incoming_dev_id = 0
self.incoming_tid = 0
self.databuffer = b""

def decode(self, data: bytes) -> tuple[int, bytes]:
def decode(self, _data: bytes) -> tuple[int, int, int, bytes]:
"""Decode ADU.

returns:
used_len (int) or 0 to read more
dev_id,
tid,
modbus request/response (bytes)
"""
if (data_len := len(data)) < self.MIN_SIZE:
Log.debug("Very short frame (NO MBAP): {} wait for more data", data, ":hex")
return 0, self.EMPTY
used_len, res_data = self.specific_decode(data, data_len)
if not res_data:
self.incoming_dev_id = 0
self.incoming_tid = 0
return used_len, res_data

def specific_decode(self, data: bytes, data_len: int) -> tuple[int, bytes]:
"""Decode ADU.
return 0, 0, 0, self.EMPTY

returns:
used_len (int) or 0 to read more
modbus request/response (bytes)
"""
return data_len, data


def encode(self, pdu: bytes, dev_id: int, _tid: int) -> bytes:
def encode(self, data: bytes, dev_id: int, _tid: int) -> bytes:
"""Encode ADU.

returns:
modbus ADU (bytes)
"""
if dev_id and dev_id not in self.dev_ids:
self.dev_ids.append(dev_id)
return pdu
return data

def buildPacket(self, message: ModbusRequest | ModbusResponse) -> bytes:
def buildFrame(self, message: ModbusPDU) -> bytes:
"""Create a ready to send modbus packet.

:param message: The populated request/response to send
"""
data = message.function_code.to_bytes(1,'big') + message.encode()
packet = self.encode(data, message.slave_id, message.transaction_id)
return packet
frame = self.encode(data, message.slave_id, message.transaction_id)
return frame

def processIncomingPacket(self, data: bytes, callback, tid=None):
def processIncomingFrame(self, data: bytes, tid=None) -> ModbusPDU | None:
"""Process new packet pattern.

This takes in a new request packet, adds it to the current
packet stream, and performs framing on it. That is, checks
for complete messages, and once found, will process all that
exist. This handles the case when we read N + 1 or 1 // N
messages at a time instead of 1.

The processed and decoded messages are pushed to the callback
function to process and send.
exist.
"""
Log.debug("Processing: {}", data, ":hex")
self.databuffer += data
while True:
if self.databuffer == b'':
return
used_len, data = self.decode(self.databuffer)
try:
used_len, pdu = self._processIncomingFrame(self.databuffer, tid=tid)
if not used_len:
return None
if pdu:
self.databuffer = self.databuffer[used_len:]
return pdu
except ModbusIOException as exc:
self.databuffer = self.EMPTY
raise exc
self.databuffer = self.databuffer[used_len:]

def _processIncomingFrame(self, data: bytes, tid=None) -> tuple[int, ModbusPDU | None]:
"""Process new packet pattern.

This takes in a new request packet, adds it to the current
packet stream, and performs framing on it. That is, checks
for complete messages, and once found, will process all that
exist.
"""
Log.debug("Processing: {}", data, ":hex")
while True:
if not data:
if used_len:
continue
return
if self.dev_ids and self.incoming_dev_id not in self.dev_ids:
Log.debug("Not a valid slave id - {}, ignoring!!", self.incoming_dev_id)
self.databuffer = b''
continue
if (result := self.decoder.decode(data)) is None:
self.databuffer = b''
return 0, None
used_len, dev_id, tid, frame_data = self.decode(self.databuffer)
if not frame_data:
return used_len, None
if self.dev_ids and dev_id not in self.dev_ids:
Log.debug("Not a valid slave id - {}, ignoring!!", dev_id)
return used_len, None
if (result := self.decoder.decode(frame_data)) is None:
raise ModbusIOException("Unable to decode request")
result.slave_id = self.incoming_dev_id
result.transaction_id = self.incoming_tid
result.slave_id = dev_id
result.transaction_id = tid
Log.debug("Frame advanced, resetting header!!")
if tid and result.transaction_id and tid != result.transaction_id:
self.databuffer = b''
else:
callback(result) # defer or push to a thread?
return used_len, None
return used_len, result
22 changes: 10 additions & 12 deletions pymodbus/framer/rtu.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,39 +97,37 @@ def generate_crc16_table(cls) -> list[int]:
crc16_table: list[int] = [0]


def specific_decode(self, data: bytes, data_len: int) -> tuple[int, bytes]:
def decode(self, data: bytes) -> tuple[int, int, int, bytes]:
"""Decode ADU."""
data_len = len(data)
for used_len in range(data_len):
if data_len - used_len < self.MIN_SIZE:
Log.debug("Short frame: {} wait for more data", data, ":hex")
return used_len, self.EMPTY
self.incoming_dev_id = int(data[used_len])
return used_len, 0, 0, self.EMPTY
dev_id = int(data[used_len])
func_code = int(data[used_len + 1])
if (self.dev_ids and self.incoming_dev_id not in self.dev_ids) or func_code & 0x7F not in self.decoder.lookup:
if func_code & 0x7F not in self.decoder.lookup:
continue
pdu_class = self.decoder.lookupPduClass(func_code)
if not (size := pdu_class.calculateRtuFrameSize(data[used_len:])):
size = data_len +1
if data_len < used_len +size:
Log.debug("Frame - not ready")
# if no_recur:
# return used_len, self.EMPTY
# res_len, res_data = self.hunt_second_frame(data[used_len:], data_len - used_len)
return used_len, self.EMPTY
return used_len, dev_id, 0, self.EMPTY
start_crc = used_len + size -2
crc = data[start_crc : start_crc + 2]
crc_val = (int(crc[0]) << 8) + int(crc[1])
if not FramerRTU.check_CRC(data[used_len : start_crc], crc_val):
Log.debug("Frame check failed, ignoring!!")
continue
return start_crc + 2, data[used_len + 1 : start_crc]
return 0, self.EMPTY
return start_crc + 2, dev_id, 0, data[used_len + 1 : start_crc]
return 0, 0, 0, self.EMPTY


def encode(self, pdu: bytes, device_id: int, _tid: int) -> bytes:
"""Encode ADU."""
packet = device_id.to_bytes(1,'big') + pdu
return packet + FramerRTU.compute_CRC(packet).to_bytes(2,'big')
frame = device_id.to_bytes(1,'big') + pdu
return frame + FramerRTU.compute_CRC(frame).to_bytes(2,'big')

@classmethod
def check_CRC(cls, data: bytes, check: int) -> bool:
Expand Down
17 changes: 10 additions & 7 deletions pymodbus/framer/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,28 @@ class FramerSocket(FramerBase):

MIN_SIZE = 8

def specific_decode(self, data: bytes, data_len: int) -> tuple[int, bytes]:
def decode(self, data: bytes) -> tuple[int, int, int, bytes]:
"""Decode ADU."""
self.incoming_tid = int.from_bytes(data[0:2], 'big')
if (data_len := len(data)) < self.MIN_SIZE:
Log.debug("Very short frame (NO MBAP): {} wait for more data", data, ":hex")
return 0, 0, 0, self.EMPTY
tid = int.from_bytes(data[0:2], 'big')
msg_len = int.from_bytes(data[4:6], 'big') + 6
self.incoming_dev_id = int(data[6])
dev_id = int(data[6])
if data_len < msg_len:
Log.debug("Short frame: {} wait for more data", data, ":hex")
return 0, self.EMPTY
return 0, 0, 0, self.EMPTY
if msg_len == 8 and data_len == 9:
msg_len = 9
return msg_len, data[7:msg_len]
return msg_len, dev_id, tid, data[7:msg_len]

def encode(self, pdu: bytes, device_id: int, tid: int) -> bytes:
"""Encode ADU."""
packet = (
frame = (
tid.to_bytes(2, 'big') +
b'\x00\x00' +
(len(pdu) + 1).to_bytes(2, 'big') +
device_id.to_bytes(1, 'big') +
pdu
)
return packet
return frame
4 changes: 2 additions & 2 deletions pymodbus/framer/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ class FramerTLS(FramerBase):
1b Nb
"""

def specific_decode(self, data: bytes, data_len: int) -> tuple[int, bytes]:
def decode(self, data: bytes) -> tuple[int, int, int, bytes]:
"""Decode ADU."""
return data_len, data
return len(data), 0, 0, data

def encode(self, pdu: bytes, _device_id: int, _tid: int) -> bytes:
"""Encode ADU."""
Expand Down
2 changes: 2 additions & 0 deletions pymodbus/pdu/pdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class ModbusPDU:
of encoding it again.
"""

function_code = -1

def __init__(self, slave, transaction, skip_encode):
"""Initialize the base data for a modbus request.

Expand Down
10 changes: 4 additions & 6 deletions pymodbus/server/async_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,8 @@ async def inner_handle(self):
# if broadcast is enabled make sure to
# process requests to address 0
Log.debug("Handling data: {}", data, ":hex")
self.framer.processIncomingPacket(
data=data,
callback=lambda x: self.execute(x, *addr),
)
if (pdu := self.framer.processIncomingFrame(data)):
self.execute(pdu, *addr)

async def handle(self) -> None:
"""Coroutine which represents a single master <=> slave conversation.
Expand All @@ -152,7 +150,7 @@ async def handle(self) -> None:
self._log_exception()
self.running = False
except Exception as exc: # pylint: disable=broad-except
# force TCP socket termination as processIncomingPacket
# force TCP socket termination as framer
# should handle application layer errors
Log.error(
'Unknown exception "{}" on stream {} forcing disconnect',
Expand Down Expand Up @@ -212,7 +210,7 @@ def server_send(self, message, addr, **kwargs):
if kwargs.get("skip_encoding", False):
self.send(message, addr=addr)
elif message.should_respond:
pdu = self.framer.buildPacket(message)
pdu = self.framer.buildFrame(message)
self.send(pdu, addr=addr)
else:
Log.debug("Skipping sending response!!")
Expand Down
Loading