Skip to content

Commit

Permalink
add typing hints and doc strings to functions file functions, see #27
Browse files Browse the repository at this point in the history
  • Loading branch information
brainelectronics committed Nov 19, 2022
1 parent 7dfda22 commit ca4a0ee
Showing 1 changed file with 207 additions and 47 deletions.
254 changes: 207 additions & 47 deletions umodbus/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,104 @@
# custom packages
from . import const as Const

# typing not natively supported on MicroPython
from .typing import List, Union

def read_coils(starting_address, quantity):

def read_coils(starting_address: int, quantity: int) -> bytes:
"""
Create Modbus Protocol Data Unit for reading coils.
:param starting_address: The starting address
:type starting_address: int
:param quantity: Quantity of coils
:type quantity: int
:returns: Packed Modbus message
:rtype: bytes
"""
if not (1 <= quantity <= 2000):
raise ValueError('invalid number of coils')
raise ValueError('Invalid number of coils')

return struct.pack('>BHH', Const.READ_COILS, starting_address, quantity)


def read_discrete_inputs(starting_address, quantity):
def read_discrete_inputs(starting_address: int, quantity: int) -> bytes:
"""
Create Modbus Protocol Data Unit for reading discrete inputs.
:param starting_address: The starting address
:type starting_address: int
:param quantity: Quantity of coils
:type quantity: int
:returns: Packed Modbus message
:rtype: bytes
"""
if not (1 <= quantity <= 2000):
raise ValueError('invalid number of discrete inputs')
raise ValueError('Invalid number of discrete inputs')

return struct.pack('>BHH', Const.READ_DISCRETE_INPUTS, starting_address, quantity)
return struct.pack('>BHH',
Const.READ_DISCRETE_INPUTS,
starting_address,
quantity)


def read_holding_registers(starting_address, quantity):
def read_holding_registers(starting_address: int, quantity: int) -> bytes:
"""
Create Modbus Protocol Data Unit for reading holding registers.
:param starting_address: The starting address
:type starting_address: int
:param quantity: Quantity of coils
:type quantity: int
:returns: Packed Modbus message
:rtype: bytes
"""
if not (1 <= quantity <= 125):
raise ValueError('invalid number of holding registers')
raise ValueError('Invalid number of holding registers')

return struct.pack('>BHH',
Const.READ_HOLDING_REGISTERS,
starting_address,
quantity)

return struct.pack('>BHH', Const.READ_HOLDING_REGISTERS, starting_address, quantity)

def read_input_registers(starting_address: int, quantity: int) -> bytes:
"""
Create Modbus Protocol Data Unit for reading input registers.
def read_input_registers(starting_address, quantity):
:param starting_address: The starting address
:type starting_address: int
:param quantity: Quantity of coils
:type quantity: int
:returns: Packed Modbus message
:rtype: bytes
"""
if not (1 <= quantity <= 125):
raise ValueError('invalid number of input registers')
raise ValueError('Invalid number of input registers')

return struct.pack('>BHH',
Const.READ_INPUT_REGISTER,
starting_address,
quantity)

return struct.pack('>BHH', Const.READ_INPUT_REGISTER, starting_address, quantity)

def write_single_coil(output_address: int,
output_value: Union[int, bool]) -> bytes:
"""
Create Modbus message to update single coil
def write_single_coil(output_address, output_value):
:param output_address: The output address
:type output_address: int
:param output_value: The output value
:type output_value: Union[int, bool]
:returns: Packed Modbus message
:rtype: bytes
"""
if output_value not in [0x0000, 0xFF00, True]:
raise ValueError('Illegal coil value')

Expand All @@ -53,17 +121,50 @@ def write_single_coil(output_address, output_value):
else:
output_value = 0x0000

return struct.pack('>BHH', Const.WRITE_SINGLE_COIL, output_address, output_value)
return struct.pack('>BHH',
Const.WRITE_SINGLE_COIL,
output_address,
output_value)


def write_single_register(register_address: int,
register_value: int,
signed=True) -> bytes:
"""
Create Modbus message to writes a single register
:param register_address: The register address
:type register_address: int
:param register_value: The register value
:type register_value: int
:param signed: Flag whether data is signed or not
:type signed: bool
def write_single_register(register_address, register_value, signed=True):
:returns: Packed Modbus message
:rtype: bytes
"""
fmt = 'h' if signed else 'H'

return struct.pack('>BH' + fmt, Const.WRITE_SINGLE_REGISTER, register_address, register_value)
return struct.pack('>BH' + fmt,
Const.WRITE_SINGLE_REGISTER,
register_address,
register_value)


def write_multiple_coils(starting_address: int,
value_list: List[int, bool]) -> bytes:
"""
Create Modbus message to update multiple coils
:param starting_address: The starting address
:type starting_address: int
:param value_list: The list of output values
:type value_list: List[int, bool]
def write_multiple_coils(starting_address, value_list):
sectioned_list = [value_list[i:i + 8] for i in range(0, len(value_list), 8)]
:returns: Packed Modbus message
:rtype: bytes
"""
sectioned_list = [value_list[i:i + 8] for i in range(0, len(value_list), 8)] # noqa: E501

output_value = []
for index, byte in enumerate(sectioned_list):
Expand All @@ -72,33 +173,77 @@ def write_multiple_coils(starting_address, value_list):

fmt = 'B' * len(output_value)

return struct.pack('>BHHB' + fmt, Const.WRITE_MULTIPLE_COILS, starting_address, len(value_list), ((len(value_list) - 1) // 8) + 1, *output_value)


def write_multiple_registers(starting_address, register_values, signed=True):
return struct.pack('>BHHB' + fmt,
Const.WRITE_MULTIPLE_COILS,
starting_address,
len(value_list), # quantity of outputs
((len(value_list) - 1) // 8) + 1, # byte count
*output_value)


def write_multiple_registers(starting_address: int,
register_values: List[int],
signed=True):
"""
Create Modbus message to update multiple coils
:param starting_address: The starting address
:type starting_address: int
:param register_values: The list of output value
:type register_values: List[int, bool]
:param signed: Flag whether data is signed or not
:type signed: bool
:returns: Packed Modbus message
:rtype: bytes
"""
quantity = len(register_values)

if not (1 <= quantity <= 123):
raise ValueError('invalid number of registers')

fmt = ('h' if signed else 'H') * quantity
return struct.pack('>BHHB' + fmt, Const.WRITE_MULTIPLE_REGISTERS, starting_address, quantity, quantity * 2, *register_values)


def validate_resp_data(data,
function_code,
address,
value=None,
quantity=None,
signed=True):
return struct.pack('>BHHB' + fmt,
Const.WRITE_MULTIPLE_REGISTERS,
starting_address,
quantity,
quantity * 2,
*register_values)


def validate_resp_data(data: bytes,
function_code: int,
address: int,
value: int = None,
quantity: int = None,
signed: bool = True) -> bool:
"""
Validate the response data.
:param data: The data
:type data: bytes
:param function_code: The function code
:type function_code: int
:param address: The address
:type address: int
:param value: The value
:type value: int
:param quantity: The quantity
:type quantity: int
:param signed: Indicates if signed
:type signed: bool
:returns: True if valid, False otherwise
:rtype: bool
"""
if function_code in [Const.WRITE_SINGLE_COIL, Const.WRITE_SINGLE_REGISTER]:
fmt = '>H' + ('h' if signed else 'H')
resp_addr, resp_value = struct.unpack(fmt, data)

if (address == resp_addr) and (value == resp_value):
return True

elif function_code in [Const.WRITE_MULTIPLE_COILS, Const.WRITE_MULTIPLE_REGISTERS]:
elif function_code in [Const.WRITE_MULTIPLE_COILS,
Const.WRITE_MULTIPLE_REGISTERS]:
resp_addr, resp_qty = struct.unpack('>HH', data)

if (address == resp_addr) and (quantity == resp_qty):
Expand All @@ -107,24 +252,28 @@ def validate_resp_data(data,
return False


def response(function_code,
request_register_addr,
request_register_qty,
def response(function_code: int,
request_register_addr: int,
request_register_qty: int,
request_data,
value_list=None,
signed=True):
if function_code in [Const.READ_COILS, Const.READ_DISCRETE_INPUTS]:
output_value = []
sectioned_list = [value_list[i:i + 8] for i in range(0, len(value_list), 8)]
sectioned_list = [value_list[i:i + 8] for i in range(0, len(value_list), 8)] # noqa: E501

for index, byte in enumerate(sectioned_list):
output = sum(v << i for i, v in enumerate(byte))
output_value.append(output)

fmt = 'B' * len(output_value)
return struct.pack('>BB' + fmt, function_code, ((len(value_list) - 1) // 8) + 1, *output_value)
return struct.pack('>BB' + fmt,
function_code,
((len(value_list) - 1) // 8) + 1,
*output_value)

elif function_code in [Const.READ_HOLDING_REGISTERS, Const.READ_INPUT_REGISTER]:
elif function_code in [Const.READ_HOLDING_REGISTERS,
Const.READ_INPUT_REGISTER]:
quantity = len(value_list)

if not (0x0001 <= quantity <= 0x007D):
Expand All @@ -137,26 +286,37 @@ def response(function_code,
for s in signed:
fmt += 'h' if s else 'H'

return struct.pack('>BB' + fmt, function_code, quantity * 2, *value_list)
return struct.pack('>BB' + fmt,
function_code,
quantity * 2,
*value_list)

elif function_code in [Const.WRITE_SINGLE_COIL, Const.WRITE_SINGLE_REGISTER]:
return struct.pack('>BHBB', function_code, request_register_addr, *request_data)
elif function_code in [Const.WRITE_SINGLE_COIL,
Const.WRITE_SINGLE_REGISTER]:
return struct.pack('>BHBB',
function_code,
request_register_addr,
*request_data)

elif function_code in [Const.WRITE_MULTIPLE_COILS, Const.WRITE_MULTIPLE_REGISTERS]:
return struct.pack('>BHH', function_code, request_register_addr, request_register_qty)
elif function_code in [Const.WRITE_MULTIPLE_COILS,
Const.WRITE_MULTIPLE_REGISTERS]:
return struct.pack('>BHH',
function_code,
request_register_addr,
request_register_qty)


def exception_response(function_code, exception_code):
def exception_response(function_code: int, exception_code: int) -> bytes:
return struct.pack('>BB', Const.ERROR_BIAS + function_code, exception_code)


def float_to_bin(num):
def float_to_bin(num: float) -> bin:
return bin(struct.unpack('!I', struct.pack('!f', num))[0])[2:].zfill(32)


def bin_to_float(binary):
def bin_to_float(binary: bin) -> float:
return struct.unpack('!f', struct.pack('!I', int(binary, 2)))[0]


def int_to_bin(num):
def int_to_bin(num: int) -> str:
return "{0:b}".format(num)

0 comments on commit ca4a0ee

Please sign in to comment.