Skip to content

Commit

Permalink
pymodbus-dev#368 adds broadcast support for sync client and server
Browse files Browse the repository at this point in the history
Adds broadcast_enable parameter to client and server, default value is False. When true it will treat unit_id 0 as broadcast and execute requests on all server slave contexts and not send a response and on the client side will send the request and not try to receive a response.
  • Loading branch information
muhlbaier committed Jan 31, 2019
1 parent 249ad8f commit 60aca50
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 69 deletions.
1 change: 1 addition & 0 deletions pymodbus/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, framer, **kwargs):
self.transaction = FifoTransactionManager(self, **kwargs)
self._debug = False
self._debugfd = None
self.broadcast_enable = kwargs.get('broadcast_enable', Defaults.broadcast_enable)

# ----------------------------------------------------------------------- #
# Client interface
Expand Down
10 changes: 10 additions & 0 deletions pymodbus/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ class Defaults(Singleton):
should be returned or simply ignored. This is useful for the case of a
serial server emulater where a request to a non-existant slave on a bus
will never respond. The client in this case will simply timeout.
.. attribute:: broadcast_enable
When False unit_id 0 will be treated as any other unit_id. When True and
the unit_id is 0 the server will execute all requests on all server
contexts and not respond and the client will skip trying to receive a
response. Default value False does not conform to Modbus spec but maintains
legacy behavior for existing pymodbus users.
'''
Port = 502
Retries = 3
Expand All @@ -104,6 +113,7 @@ class Defaults(Singleton):
ZeroMode = False
IgnoreMissingSlaves = False
ReadSize = 1024
broadcast_enable = False

class ModbusStatus(Singleton):
'''
Expand Down
45 changes: 36 additions & 9 deletions pymodbus/server/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,15 @@ def execute(self, request):
:param request: The decoded request message
"""
try:
context = self.server.context[request.unit_id]
response = request.execute(context)
if self.server.broadcast_enable and request.unit_id == 0:
broadcast = True
# if broadcasting then execute on all slave contexts, note response will be ignored
for unit_id in self.server.context.slaves():
response = request.execute(self.server.context[unit_id])
else:
broadcast = False
context = self.server.context[request.unit_id]
response = request.execute(context)
except NoSuchSlaveException as ex:
_logger.debug("requested slave does "
"not exist: %s" % request.unit_id )
Expand All @@ -71,9 +78,11 @@ def execute(self, request):
_logger.debug("Datastore unable to fulfill request: "
"%s; %s", ex, traceback.format_exc())
response = request.doException(merror.SlaveFailure)
response.transaction_id = request.transaction_id
response.unit_id = request.unit_id
self.send(response)
# no response when broadcasting
if not broadcast:
response.transaction_id = request.transaction_id
response.unit_id = request.unit_id
self.send(response)

# ----------------------------------------------------------------------- #
# Base class implementations
Expand Down Expand Up @@ -105,6 +114,12 @@ def handle(self):
data = self.request.recv(1024)
if data:
units = self.server.context.slaves()
if not isinstance(units, (list, tuple)):
units = [units]
# if broadcast is enabled make sure to process requests to address 0
if self.server.broadcast_enable:
if 0 not in units:
units.append(0)
single = self.server.context.single
self.framer.processIncomingPacket(data, self.execute,
units, single=single)
Expand Down Expand Up @@ -288,8 +303,10 @@ def __init__(self, context, framer=None, identity=None,
ModbusConnectedRequestHandler
:param allow_reuse_address: Whether the server will allow the
reuse of an address.
:param ignore_missing_slaves: True to not send errors on a request
to a missing slave
:param ignore_missing_slaves: True to not send errors on a request
to a missing slave
:param broadcast_enable: True to treat unit_id 0 as broadcast address,
False to treat 0 as any other unit_id
"""
self.threads = []
self.allow_reuse_address = allow_reuse_address
Expand All @@ -301,6 +318,8 @@ def __init__(self, context, framer=None, identity=None,
self.handler = handler or ModbusConnectedRequestHandler
self.ignore_missing_slaves = kwargs.get('ignore_missing_slaves',
Defaults.IgnoreMissingSlaves)
self.broadcast_enable = kwargs.get('broadcast_enable',
Defaults.broadcast_enable)

if isinstance(identity, ModbusDeviceIdentification):
self.control.Identity.update(identity)
Expand Down Expand Up @@ -358,7 +377,9 @@ def __init__(self, context, framer=None, identity=None, address=None,
:param handler: A handler for each client session; default is
ModbusDisonnectedRequestHandler
:param ignore_missing_slaves: True to not send errors on a request
to a missing slave
to a missing slave
:param broadcast_enable: True to treat unit_id 0 as broadcast address,
False to treat 0 as any other unit_id
"""
self.threads = []
self.decoder = ServerDecoder()
Expand All @@ -369,6 +390,8 @@ def __init__(self, context, framer=None, identity=None, address=None,
self.handler = handler or ModbusDisconnectedRequestHandler
self.ignore_missing_slaves = kwargs.get('ignore_missing_slaves',
Defaults.IgnoreMissingSlaves)
self.broadcast_enable = kwargs.get('broadcast_enable',
Defaults.broadcast_enable)

if isinstance(identity, ModbusDeviceIdentification):
self.control.Identity.update(identity)
Expand Down Expand Up @@ -423,7 +446,9 @@ def __init__(self, context, framer=None, identity=None, **kwargs):
:param baudrate: The baud rate to use for the serial device
:param timeout: The timeout to use for the serial device
:param ignore_missing_slaves: True to not send errors on a request
to a missing slave
to a missing slave
:param broadcast_enable: True to treat unit_id 0 as broadcast address,
False to treat 0 as any other unit_id
"""
self.threads = []
self.decoder = ServerDecoder()
Expand All @@ -442,6 +467,8 @@ def __init__(self, context, framer=None, identity=None, **kwargs):
self.timeout = kwargs.get('timeout', Defaults.Timeout)
self.ignore_missing_slaves = kwargs.get('ignore_missing_slaves',
Defaults.IgnoreMissingSlaves)
self.broadcast_enable = kwargs.get('broadcast_enable',
Defaults.broadcast_enable)
self.socket = None
if self._connect():
self.is_running = True
Expand Down
125 changes: 65 additions & 60 deletions pymodbus/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,53 +118,53 @@ def execute(self, request):
_logger.debug("Clearing current Frame : - {}".format(_buffer))
self.client.framer.resetFrame()

expected_response_length = None
if not isinstance(self.client.framer, ModbusSocketFramer):
if hasattr(request, "get_response_pdu_size"):
response_pdu_size = request.get_response_pdu_size()
if isinstance(self.client.framer, ModbusAsciiFramer):
response_pdu_size = response_pdu_size * 2
if response_pdu_size:
expected_response_length = self._calculate_response_length(response_pdu_size)
if request.unit_id == 0:
full = True
expected_response_length = 0
elif request.unit_id in self._no_response_devices:
full = True
if request.unit_id == 0 and self.client.broadcast_enable:
response, last_exception = self._transact(request, None)
response = b'Broadcast write sent - no response expected'
else:
full = False
c_str = str(self.client)
if "modbusudpclient" in c_str.lower().strip():
full = True
if not expected_response_length:
expected_response_length = Defaults.ReadSize
response, last_exception = self._transact(request,
expected_response_length,
full=full
)
if not response and (
request.unit_id not in self._no_response_devices):
self._no_response_devices.append(request.unit_id)
elif request.unit_id in self._no_response_devices and response:
self._no_response_devices.remove(request.unit_id)
if not response and self.retry_on_empty and retries:
while retries > 0:
if hasattr(self.client, "state"):
_logger.debug("RESETTING Transaction state to "
"'IDLE' for retry")
self.client.state = ModbusTransactionState.IDLE
_logger.debug("Retry on empty - {}".format(retries))
response, last_exception = self._transact(
request,
expected_response_length
)
if not response:
retries -= 1
continue
# Remove entry
expected_response_length = None
if not isinstance(self.client.framer, ModbusSocketFramer):
if hasattr(request, "get_response_pdu_size"):
response_pdu_size = request.get_response_pdu_size()
if isinstance(self.client.framer, ModbusAsciiFramer):
response_pdu_size = response_pdu_size * 2
if response_pdu_size:
expected_response_length = self._calculate_response_length(response_pdu_size)
if request.unit_id in self._no_response_devices:
full = True
else:
full = False
c_str = str(self.client)
if "modbusudpclient" in c_str.lower().strip():
full = True
if not expected_response_length:
expected_response_length = Defaults.ReadSize
response, last_exception = self._transact(request,
expected_response_length,
full=full
)
if not response and (
request.unit_id not in self._no_response_devices):
self._no_response_devices.append(request.unit_id)
elif request.unit_id in self._no_response_devices and response:
self._no_response_devices.remove(request.unit_id)
break
if expected_response_length > 0:
if not response and self.retry_on_empty and retries:
while retries > 0:
if hasattr(self.client, "state"):
_logger.debug("RESETTING Transaction state to "
"'IDLE' for retry")
self.client.state = ModbusTransactionState.IDLE
_logger.debug("Retry on empty - {}".format(retries))
response, last_exception = self._transact(
request,
expected_response_length
)
if not response:
retries -= 1
continue
# Remove entry
self._no_response_devices.remove(request.unit_id)
break
addTransaction = partial(self.addTransaction,
tid=request.transaction_id)
self.client.framer.processIncomingPacket(response,
Expand All @@ -180,14 +180,12 @@ def execute(self, request):
"/Unable to decode response")
response = ModbusIOException(last_exception,
request.function_code)
else:
_logger.debug("No response expected when sending to broadcast address 0")
if hasattr(self.client, "state"):
_logger.debug("Changing transaction state from "
"'PROCESSING REPLY' to "
"'TRANSACTION_COMPLETE'")
self.client.state = (
ModbusTransactionState.TRANSACTION_COMPLETE)
if hasattr(self.client, "state"):
_logger.debug("Changing transaction state from "
"'PROCESSING REPLY' to "
"'TRANSACTION_COMPLETE'")
self.client.state = (
ModbusTransactionState.TRANSACTION_COMPLETE)
return response
except ModbusIOException as ex:
# Handle decode errors in processIncomingPacket method
Expand All @@ -211,13 +209,20 @@ def _transact(self, packet, response_length, full=False):
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("SEND: " + hexlify_packets(packet))
size = self._send(packet)
if size:
_logger.debug("Changing transaction state from 'SENDING' "
"to 'WAITING FOR REPLY'")
self.client.state = ModbusTransactionState.WAITING_FOR_REPLY
result = self._recv(response_length, full)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("RECV: " + hexlify_packets(result))
if response_length is not None:
if size:
_logger.debug("Changing transaction state from 'SENDING' "
"to 'WAITING FOR REPLY'")
self.client.state = ModbusTransactionState.WAITING_FOR_REPLY
result = self._recv(response_length, full)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("RECV: " + hexlify_packets(result))
else:
if size:
_logger.debug("Changing transaction state from 'SENDING' "
"to 'TRANSACTION_COMPLETE'")
self.client.state = ModbusTransactionState.TRANSACTION_COMPLETE
result = b''
except (socket.error, ModbusIOException,
InvalidMessageReceivedException) as msg:
self.client.close()
Expand Down

0 comments on commit 60aca50

Please sign in to comment.