Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
MaskWriteRegisterRequest/MaskWriteRegisterResponse  to register_write_message.py from file_message.py
  • Loading branch information
dhoomakethu committed Jun 22, 2017
1 parent 746a9cd commit 3801477
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 158 deletions.
11 changes: 6 additions & 5 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
Version 1.3.1
------------------------------------------------------------
* Recall socket recv until get a complete response
* register_write_message.py: Observe skip_encode option when encoding a single register request
* fix wrong expected response length for coils and discrete inputs
* fix decode errors with ReadDeviceInformationRequest and ReportSlaveIdRequest on Python3
* python3 compatible examples [WIP]
* misc updates with examples
* Register_write_message.py: Observe skip_encode option when encoding a single register request
* Fix wrong expected response length for coils and discrete inputs
* Fix decode errors with ReadDeviceInformationRequest and ReportSlaveIdRequest on Python3
* Move MaskWriteRegisterRequest/MaskWriteRegisterResponse to register_write_message.py from file_message.py
* Python3 compatible examples [WIP]
* Misc updates with examples

Version 1.3.0.rc2
------------------------------------------------------------
Expand Down
91 changes: 0 additions & 91 deletions pymodbus/file_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,96 +279,6 @@ def decode(self, data):
if decoded[0] == 0x06: self.records.append(record)


class MaskWriteRegisterRequest(ModbusRequest):
'''
This function code is used to modify the contents of a specified holding
register using a combination of an AND mask, an OR mask, and the
register's current contents. The function can be used to set or clear
individual bits in the register.
'''
function_code = 0x16
_rtu_frame_size = 10


def __init__(self, address=0x0000, and_mask=0xffff, or_mask=0x0000, **kwargs):
''' Initializes a new instance
:param address: The mask pointer address (0x0000 to 0xffff)
:param and_mask: The and bitmask to apply to the register address
:param or_mask: The or bitmask to apply to the register address
'''
ModbusRequest.__init__(self, **kwargs)
self.address = address
self.and_mask = and_mask
self.or_mask = or_mask

def encode(self):
''' Encodes the request packet
:returns: The byte encoded packet
'''
return struct.pack('>HHH', self.address, self.and_mask, self.or_mask)

def decode(self, data):
''' Decodes the incoming request
:param data: The data to decode into the address
'''
self.address, self.and_mask, self.or_mask = struct.unpack('>HHH', data)

def execute(self, context):
''' Run a mask write register request against the store
:param context: The datastore to request from
:returns: The populated response
'''
if not (0x0000 <= self.and_mask <= 0xffff):
return self.doException(merror.IllegalValue)
if not (0x0000 <= self.or_mask <= 0xffff):
return self.doException(merror.IllegalValue)
if not context.validate(self.function_code, self.address, 1):
return self.doException(merror.IllegalAddress)
values = context.getValues(self.function_code, self.address, 1)[0]
values = ((values & self.and_mask) | self.or_mask)
context.setValues(self.function_code, self.address, [values])
return MaskWriteRegisterResponse(self.address, self.and_mask, self.or_mask)


class MaskWriteRegisterResponse(ModbusResponse):
'''
The normal response is an echo of the request. The response is returned
after the register has been written.
'''
function_code = 0x16
_rtu_frame_size = 10

def __init__(self, address=0x0000, and_mask=0xffff, or_mask=0x0000, **kwargs):
''' Initializes a new instance
:param address: The mask pointer address (0x0000 to 0xffff)
:param and_mask: The and bitmask applied to the register address
:param or_mask: The or bitmask applied to the register address
'''
ModbusResponse.__init__(self, **kwargs)
self.address = address
self.and_mask = and_mask
self.or_mask = or_mask

def encode(self):
''' Encodes the response
:returns: The byte encoded message
'''
return struct.pack('>HHH', self.address, self.and_mask, self.or_mask)

def decode(self, data):
''' Decodes a the response
:param data: The packet data to decode
'''
self.address, self.and_mask, self.or_mask = struct.unpack('>HHH', data)


class ReadFifoQueueRequest(ModbusRequest):
'''
This function code allows to read the contents of a First-In-First-Out
Expand Down Expand Up @@ -481,6 +391,5 @@ def decode(self, data):
"FileRecord",
"ReadFileRecordRequest", "ReadFileRecordResponse",
"WriteFileRecordRequest", "WriteFileRecordResponse",
"MaskWriteRegisterRequest", "MaskWriteRegisterResponse",
"ReadFifoQueueRequest", "ReadFifoQueueResponse",
]
97 changes: 97 additions & 0 deletions pymodbus/register_write_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,107 @@ def __str__(self):
params = (self.address, self.count)
return "WriteMultipleRegisterResponse (%d,%d)" % params

class MaskWriteRegisterRequest(ModbusRequest):
'''
This function code is used to modify the contents of a specified holding
register using a combination of an AND mask, an OR mask, and the
register's current contents. The function can be used to set or clear
individual bits in the register.
'''
function_code = 0x16
_rtu_frame_size = 10

def __init__(self, address=0x0000, and_mask=0xffff, or_mask=0x0000,
**kwargs):
''' Initializes a new instance
:param address: The mask pointer address (0x0000 to 0xffff)
:param and_mask: The and bitmask to apply to the register address
:param or_mask: The or bitmask to apply to the register address
'''
ModbusRequest.__init__(self, **kwargs)
self.address = address
self.and_mask = and_mask
self.or_mask = or_mask

def encode(self):
''' Encodes the request packet
:returns: The byte encoded packet
'''
return struct.pack('>HHH', self.address, self.and_mask,
self.or_mask)

def decode(self, data):
''' Decodes the incoming request
:param data: The data to decode into the address
'''
self.address, self.and_mask, self.or_mask = struct.unpack('>HHH',
data)

def execute(self, context):
''' Run a mask write register request against the store
:param context: The datastore to request from
:returns: The populated response
'''
if not (0x0000 <= self.and_mask <= 0xffff):
return self.doException(merror.IllegalValue)
if not (0x0000 <= self.or_mask <= 0xffff):
return self.doException(merror.IllegalValue)
if not context.validate(self.function_code, self.address, 1):
return self.doException(merror.IllegalAddress)
values = context.getValues(self.function_code, self.address, 1)[0]
values = ((values & self.and_mask) | self.or_mask)
context.setValues(self.function_code, self.address, [values])
return MaskWriteRegisterResponse(self.address, self.and_mask,
self.or_mask)


class MaskWriteRegisterResponse(ModbusResponse):
'''
The normal response is an echo of the request. The response is returned
after the register has been written.
'''
function_code = 0x16
_rtu_frame_size = 10

def __init__(self, address=0x0000, and_mask=0xffff, or_mask=0x0000,
**kwargs):
''' Initializes a new instance
:param address: The mask pointer address (0x0000 to 0xffff)
:param and_mask: The and bitmask applied to the register address
:param or_mask: The or bitmask applied to the register address
'''
ModbusResponse.__init__(self, **kwargs)
self.address = address
self.and_mask = and_mask
self.or_mask = or_mask

def encode(self):
''' Encodes the response
:returns: The byte encoded message
'''
return struct.pack('>HHH', self.address, self.and_mask,
self.or_mask)

def decode(self, data):
''' Decodes a the response
:param data: The packet data to decode
'''
self.address, self.and_mask, self.or_mask = struct.unpack('>HHH',
data)


#---------------------------------------------------------------------------#
# Exported symbols
#---------------------------------------------------------------------------#
__all__ = [
"WriteSingleRegisterRequest", "WriteSingleRegisterResponse",
"WriteMultipleRegistersRequest", "WriteMultipleRegistersResponse",
"MaskWriteRegisterRequest", "MaskWriteRegisterResponse"
]
62 changes: 0 additions & 62 deletions test/test_file_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,68 +243,6 @@ def testWriteFileRecordResponseRtuFrameSize(self):
size = handle.calculateRtuFrameSize(request)
self.assertEqual(size, 0x0d + 5)

#-----------------------------------------------------------------------#
# Mask Write Register Request
#-----------------------------------------------------------------------#

def testMaskWriteRegisterRequestEncode(self):
''' Test basic bit message encoding/decoding '''
handle = MaskWriteRegisterRequest(0x0000, 0x0101, 0x1010)
result = handle.encode()
self.assertEqual(result, b'\x00\x00\x01\x01\x10\x10')

def testMaskWriteRegisterRequestDecode(self):
''' Test basic bit message encoding/decoding '''
request = b'\x00\x04\x00\xf2\x00\x25'
handle = MaskWriteRegisterRequest()
handle.decode(request)
self.assertEqual(handle.address, 0x0004)
self.assertEqual(handle.and_mask, 0x00f2)
self.assertEqual(handle.or_mask, 0x0025)

def testMaskWriteRegisterRequestExecute(self):
''' Test write register request valid execution '''
context = MockContext(valid=True, default=0x0000)
handle = MaskWriteRegisterRequest(0x0000, 0x0101, 0x1010)
result = handle.execute(context)
self.assertTrue(isinstance(result, MaskWriteRegisterResponse))

def testMaskWriteRegisterRequestInvalidExecute(self):
''' Test write register request execute with invalid data '''
context = MockContext(valid=False, default=0x0000)
handle = MaskWriteRegisterRequest(0x0000, -1, 0x1010)
result = handle.execute(context)
self.assertEqual(ModbusExceptions.IllegalValue,
result.exception_code)

handle = MaskWriteRegisterRequest(0x0000, 0x0101, -1)
result = handle.execute(context)
self.assertEqual(ModbusExceptions.IllegalValue,
result.exception_code)

handle = MaskWriteRegisterRequest(0x0000, 0x0101, 0x1010)
result = handle.execute(context)
self.assertEqual(ModbusExceptions.IllegalAddress,
result.exception_code)

#-----------------------------------------------------------------------#
# Mask Write Register Response
#-----------------------------------------------------------------------#

def testMaskWriteRegisterResponseEncode(self):
''' Test basic bit message encoding/decoding '''
handle = MaskWriteRegisterResponse(0x0000, 0x0101, 0x1010)
result = handle.encode()
self.assertEqual(result, b'\x00\x00\x01\x01\x10\x10')

def testMaskWriteRegisterResponseDecode(self):
''' Test basic bit message encoding/decoding '''
request = b'\x00\x04\x00\xf2\x00\x25'
handle = MaskWriteRegisterResponse()
handle.decode(request)
self.assertEqual(handle.address, 0x0004)
self.assertEqual(handle.and_mask, 0x00f2)
self.assertEqual(handle.or_mask, 0x0025)

#---------------------------------------------------------------------------#
# Main
Expand Down
65 changes: 65 additions & 0 deletions test/test_register_write_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,71 @@ def testWriteMultipleRegisterRequest(self):
result = request.execute(context)
self.assertEqual(result.function_code, request.function_code)

# -----------------------------------------------------------------------#
# Mask Write Register Request
# -----------------------------------------------------------------------#

def testMaskWriteRegisterRequestEncode(self):
''' Test basic bit message encoding/decoding '''
handle = MaskWriteRegisterRequest(0x0000, 0x0101, 0x1010)
result = handle.encode()
self.assertEqual(result, b'\x00\x00\x01\x01\x10\x10')

def testMaskWriteRegisterRequestDecode(self):
''' Test basic bit message encoding/decoding '''
request = b'\x00\x04\x00\xf2\x00\x25'
handle = MaskWriteRegisterRequest()
handle.decode(request)
self.assertEqual(handle.address, 0x0004)
self.assertEqual(handle.and_mask, 0x00f2)
self.assertEqual(handle.or_mask, 0x0025)

def testMaskWriteRegisterRequestExecute(self):
''' Test write register request valid execution '''
context = MockContext(valid=True, default=0x0000)
handle = MaskWriteRegisterRequest(0x0000, 0x0101, 0x1010)
result = handle.execute(context)
self.assertTrue(isinstance(result, MaskWriteRegisterResponse))

def testMaskWriteRegisterRequestInvalidExecute(self):
''' Test write register request execute with invalid data '''
context = MockContext(valid=False, default=0x0000)
handle = MaskWriteRegisterRequest(0x0000, -1, 0x1010)
result = handle.execute(context)
self.assertEqual(ModbusExceptions.IllegalValue,
result.exception_code)

handle = MaskWriteRegisterRequest(0x0000, 0x0101, -1)
result = handle.execute(context)
self.assertEqual(ModbusExceptions.IllegalValue,
result.exception_code)

handle = MaskWriteRegisterRequest(0x0000, 0x0101, 0x1010)
result = handle.execute(context)
self.assertEqual(ModbusExceptions.IllegalAddress,
result.exception_code)

# -----------------------------------------------------------------------#
# Mask Write Register Response
# -----------------------------------------------------------------------#

def testMaskWriteRegisterResponseEncode(self):
''' Test basic bit message encoding/decoding '''
handle = MaskWriteRegisterResponse(0x0000, 0x0101, 0x1010)
result = handle.encode()
self.assertEqual(result, b'\x00\x00\x01\x01\x10\x10')

def testMaskWriteRegisterResponseDecode(self):
''' Test basic bit message encoding/decoding '''
request = b'\x00\x04\x00\xf2\x00\x25'
handle = MaskWriteRegisterResponse()
handle.decode(request)
self.assertEqual(handle.address, 0x0004)
self.assertEqual(handle.and_mask, 0x00f2)
self.assertEqual(handle.or_mask, 0x0025)



#---------------------------------------------------------------------------#
# Main
#---------------------------------------------------------------------------#
Expand Down

0 comments on commit 3801477

Please sign in to comment.