diff --git a/canopen/emcy.py b/canopen/emcy.py index 118ede4f..1bbdeb75 100644 --- a/canopen/emcy.py +++ b/canopen/emcy.py @@ -130,7 +130,7 @@ def get_desc(self) -> str: return "" def __str__(self): - text = "Code 0x{:04X}".format(self.code) + text = f"Code 0x{self.code:04X}" description = self.get_desc() if description: text = text + ", " + description diff --git a/canopen/lss.py b/canopen/lss.py index d197a5a7..830823a1 100644 --- a/canopen/lss.py +++ b/canopen/lss.py @@ -353,7 +353,7 @@ def __send_configure(self, req_cs, value1=0, value2=0): raise LssError("Response message is not for the request") if error_code != ERROR_NONE: - error_msg = "LSS Error: %d" % error_code + error_msg = f"LSS Error: {error_code}" raise LssError(error_msg) def __send_command(self, message): @@ -368,9 +368,7 @@ def __send_command(self, message): :rtype: bytes """ - message_str = " ".join(["{:02x}".format(x) for x in message]) - logger.info( - "Sending LSS message {}".format(message_str)) + logger.info("Sending LSS message %s", message.hex(" ").upper()) response = None if not self.responses.empty(): diff --git a/canopen/node/local.py b/canopen/node/local.py index de3e23b9..eb74b98d 100644 --- a/canopen/node/local.py +++ b/canopen/node/local.py @@ -86,7 +86,7 @@ def get_data( return obj.encode_raw(obj.default) # Resource not available - logger.info("Resource unavailable for 0x%X:%d", index, subindex) + logger.info("Resource unavailable for 0x%04X:%02X", index, subindex) raise SdoAbortedError(0x060A0023) def set_data( diff --git a/canopen/node/remote.py b/canopen/node/remote.py index 07462422..37eb2ab2 100644 --- a/canopen/node/remote.py +++ b/canopen/node/remote.py @@ -122,18 +122,13 @@ def __load_configuration_helper(self, index, subindex, name, value): """ try: if subindex is not None: - logger.info(str('SDO [{index:#06x}][{subindex:#06x}]: {name}: {value:#06x}'.format( - index=index, - subindex=subindex, - name=name, - value=value))) + logger.info('SDO [0x%04X][0x%02X]: %s: %#06x', + index, subindex, name, value) self.sdo[index][subindex].raw = value else: self.sdo[index].raw = value - logger.info(str('SDO [{index:#06x}]: {name}: {value:#06x}'.format( - index=index, - name=name, - value=value))) + logger.info('SDO [0x%04X]: %s: %#06x', + index, name, value) except SdoCommunicationError as e: logger.warning(str(e)) except SdoAbortedError as e: @@ -142,7 +137,8 @@ def __load_configuration_helper(self, index, subindex, name, value): if e.code != 0x06010002: # Abort codes other than "Attempt to write a read-only object" # should still be reported. - logger.warning('[ERROR SETTING object {0:#06x}:{1:#06x}] {2}'.format(index, subindex, str(e))) + logger.warning('[ERROR SETTING object 0x%04X:%02X] %s', + index, subindex, e) raise def load_configuration(self): diff --git a/canopen/objectdictionary/__init__.py b/canopen/objectdictionary/__init__.py index 19196951..103ebf19 100644 --- a/canopen/objectdictionary/__init__.py +++ b/canopen/objectdictionary/__init__.py @@ -8,6 +8,7 @@ from canopen.objectdictionary.datatypes import * from canopen.objectdictionary.datatypes_24bit import Integer24, Unsigned24 +from canopen.utils import pretty_index logger = logging.getLogger(__name__) @@ -108,8 +109,7 @@ def __getitem__( if isinstance(index, str) and '.' in index: idx, sub = index.split('.', maxsplit=1) return self[idx][sub] - name = "0x%X" % index if isinstance(index, int) else index - raise KeyError("%s was not found in Object Dictionary" % name) + raise KeyError(f"{pretty_index(index)} was not found in Object Dictionary") return item def __setitem__( @@ -180,12 +180,12 @@ def __init__(self, name: str, index: int): self.names = {} def __repr__(self) -> str: - return f"<{type(self).__qualname__} {self.name!r} at 0x{self.index:04X}>" + return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>" def __getitem__(self, subindex: Union[int, str]) -> "ODVariable": item = self.names.get(subindex) or self.subindices.get(subindex) if item is None: - raise KeyError("Subindex %s was not found" % subindex) + raise KeyError(f"Subindex {pretty_index(None, subindex)} was not found") return item def __setitem__(self, subindex: Union[int, str], var: "ODVariable"): @@ -239,7 +239,7 @@ def __init__(self, name: str, index: int): self.names = {} def __repr__(self) -> str: - return f"<{type(self).__qualname__} {self.name!r} at 0x{self.index:04X}>" + return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index)}>" def __getitem__(self, subindex: Union[int, str]) -> "ODVariable": var = self.names.get(subindex) or self.subindices.get(subindex) @@ -249,7 +249,7 @@ def __getitem__(self, subindex: Union[int, str]) -> "ODVariable": elif isinstance(subindex, int) and 0 < subindex < 256: # Create a new variable based on first array item template = self.subindices[1] - name = "%s_%x" % (template.name, subindex) + name = f"{template.name}_{subindex:x}" var = ODVariable(name, self.index, subindex) var.parent = self for attr in ("data_type", "unit", "factor", "min", "max", "default", @@ -258,7 +258,7 @@ def __getitem__(self, subindex: Union[int, str]) -> "ODVariable": if attr in template.__dict__: var.__dict__[attr] = template.__dict__[attr] else: - raise KeyError("Could not find subindex %r" % subindex) + raise KeyError(f"Could not find subindex {pretty_index(None, subindex)}") return var def __len__(self) -> int: @@ -337,8 +337,8 @@ def __init__(self, name: str, index: int, subindex: int = 0): self.pdo_mappable = False def __repr__(self) -> str: - suffix = f":{self.subindex:02X}" if isinstance(self.parent, (ODRecord, ODArray)) else "" - return f"<{type(self).__qualname__} {self.qualname!r} at 0x{self.index:04X}{suffix}>" + subindex = self.subindex if isinstance(self.parent, (ODRecord, ODArray)) else None + return f"<{type(self).__qualname__} {self.qualname!r} at {pretty_index(self.index, subindex)}>" @property def qualname(self) -> str: @@ -417,8 +417,7 @@ def encode_raw(self, value: Union[int, float, str, bytes, bytearray]) -> bytes: if self.max is not None and value > self.max: logger.warning( "Value %d is greater than max value %d", - value, - self.max) + value, self.max) try: return self.STRUCT_TYPES[self.data_type].pack(value) except struct.error: @@ -427,8 +426,7 @@ def encode_raw(self, value: Union[int, float, str, bytes, bytearray]) -> bytes: raise ObjectDictionaryError("Data type has not been specified") else: raise TypeError( - "Do not know how to encode %r to data type %Xh" % ( - value, self.data_type)) + f"Do not know how to encode {value!r} to data type 0x{self.data_type:X}") def decode_phys(self, value: int) -> Union[int, bool, float, str, bytes]: if self.data_type in INTEGER_TYPES: @@ -446,7 +444,7 @@ def decode_desc(self, value: int) -> str: raise ObjectDictionaryError("No value descriptions exist") elif value not in self.value_descriptions: raise ObjectDictionaryError( - "No value description exists for %d" % value) + f"No value description exists for {value}") else: return self.value_descriptions[value] @@ -458,8 +456,8 @@ def encode_desc(self, desc: str) -> int: if description == desc: return value valid_values = ", ".join(self.value_descriptions.values()) - error_text = "No value corresponds to '%s'. Valid values are: %s" - raise ValueError(error_text % (desc, valid_values)) + raise ValueError( + f"No value corresponds to '{desc}'. Valid values are: {valid_values}") def decode_bits(self, value: int, bits: List[int]) -> int: try: diff --git a/canopen/objectdictionary/eds.py b/canopen/objectdictionary/eds.py index a952c8a5..ea133121 100644 --- a/canopen/objectdictionary/eds.py +++ b/canopen/objectdictionary/eds.py @@ -45,7 +45,7 @@ def import_eds(source, node_id): if eds.has_section("Comments"): linecount = int(eds.get("Comments", "Lines"), 0) od.comments = '\n'.join([ - eds.get("Comments", "Line%i" % line) + eds.get("Comments", f"Line{line}") for line in range(1, linecount+1) ]) @@ -54,7 +54,7 @@ def import_eds(source, node_id): else: for rate in [10, 20, 50, 125, 250, 500, 800, 1000]: baudPossible = int( - eds.get("DeviceInfo", "BaudRate_%i" % rate, fallback='0'), 0) + eds.get("DeviceInfo", f"BaudRate_{rate}", fallback='0'), 0) if baudPossible != 0: od.device_information.allowed_baudrates.add(rate*1000) @@ -95,7 +95,7 @@ def import_eds(source, node_id): match = re.match(r"^[Dd]ummy[Uu]sage$", section) if match is not None: for i in range(1, 8): - key = "Dummy%04d" % i + key = f"Dummy{i:04d}" if eds.getint(section, key) == 1: var = objectdictionary.ODVariable(key, i, 0) var.data_type = i @@ -241,7 +241,7 @@ def _revert_variable(var_type, value): elif var_type in datatypes.FLOAT_TYPES: return value else: - return "0x%02X" % value + return f"0x{value:02X}" def build_variable(eds, section, node_id, index, subindex=0): @@ -266,9 +266,9 @@ def build_variable(eds, section, node_id, index, subindex=0): # The eds.get function gives us 0x00A0 now convert to String without hex representation and upper case # The sub2 part is then the section where the type parameter stands try: - var.data_type = int(eds.get("%Xsub1" % var.data_type, "DefaultValue"), 0) + var.data_type = int(eds.get(f"{var.data_type:X}sub1", "DefaultValue"), 0) except NoSectionError: - logger.warning("%s has an unknown or unsupported data type (%X)", name, var.data_type) + logger.warning("%s has an unknown or unsupported data type (0x%X)", name, var.data_type) # Assume DOMAIN to force application to interpret the byte data var.data_type = datatypes.DOMAIN @@ -356,15 +356,15 @@ def export_common(var, eds, section): def export_variable(var, eds): if isinstance(var.parent, ObjectDictionary): # top level variable - section = "%04X" % var.index + section = f"{var.index:04X}" else: # nested variable - section = "%04Xsub%X" % (var.index, var.subindex) + section = f"{var.index:04X}sub{var.subindex:X}" export_common(var, eds, section) - eds.set(section, "ObjectType", "0x%X" % VAR) + eds.set(section, "ObjectType", f"0x{VAR:X}") if var.data_type: - eds.set(section, "DataType", "0x%04X" % var.data_type) + eds.set(section, "DataType", f"0x{var.data_type:04X}") if var.access_type: eds.set(section, "AccessType", var.access_type) @@ -381,7 +381,7 @@ def export_variable(var, eds): eds.set(section, "ParameterValue", _revert_variable(var.data_type, var.value)) - eds.set(section, "DataType", "0x%04X" % var.data_type) + eds.set(section, "DataType", f"0x{var.data_type:04X}") eds.set(section, "PDOMapping", hex(var.pdo_mappable)) if getattr(var, 'min', None) is not None: @@ -397,11 +397,11 @@ def export_variable(var, eds): eds.set(section, "Unit", var.unit) def export_record(var, eds): - section = "%04X" % var.index + section = f"{var.index:04X}" export_common(var, eds, section) - eds.set(section, "SubNumber", "0x%X" % len(var.subindices)) + eds.set(section, "SubNumber", f"0x{len(var.subindices):X}") ot = RECORD if isinstance(var, objectdictionary.ODRecord) else ARR - eds.set(section, "ObjectType", "0x%X" % ot) + eds.set(section, "ObjectType", f"0x{ot:X}") for i in var: export_variable(var[i], eds) @@ -463,7 +463,7 @@ def export_record(var, eds): for rate in od.device_information.allowed_baudrates.union( {10e3, 20e3, 50e3, 125e3, 250e3, 500e3, 800e3, 1000e3}): eds.set( - "DeviceInfo", "BaudRate_%i" % (rate/1000), + "DeviceInfo", f"BaudRate_{rate//1000}", int(rate in od.device_information.allowed_baudrates)) if device_commisioning and (od.bitrate or od.node_id): @@ -477,12 +477,12 @@ def export_record(var, eds): i = 0 for line in od.comments.splitlines(): i += 1 - eds.set("Comments", "Line%i" % i, line) + eds.set("Comments", f"Line{i}", line) eds.set("Comments", "Lines", i) eds.add_section("DummyUsage") for i in range(1, 8): - key = "Dummy%04d" % i + key = f"Dummy{i:04d}" eds.set("DummyUsage", key, 1 if (key in od) else 0) def mandatory_indices(x): @@ -506,7 +506,7 @@ def add_list(section, list): eds.add_section(section) eds.set(section, "SupportedObjects", len(list)) for i in range(0, len(list)): - eds.set(section, (i + 1), "0x%04X" % list[i]) + eds.set(section, (i + 1), f"0x{list[i]:04X}") for index in list: export_object(od[index], eds) diff --git a/canopen/pdo/__init__.py b/canopen/pdo/__init__.py index 8002945a..1c951cf5 100644 --- a/canopen/pdo/__init__.py +++ b/canopen/pdo/__init__.py @@ -39,7 +39,7 @@ class RPDO(PdoBase): def __init__(self, node): super(RPDO, self).__init__(node) self.map = PdoMaps(0x1400, 0x1600, self, 0x200) - logger.debug('RPDO Map as {0}'.format(len(self.map))) + logger.debug('RPDO Map as %d', len(self.map)) def stop(self): """Stop transmission of all RPDOs. @@ -64,7 +64,7 @@ class TPDO(PdoBase): def __init__(self, node): super(TPDO, self).__init__(node) self.map = PdoMaps(0x1800, 0x1A00, self, 0x180) - logger.debug('TPDO Map as {0}'.format(len(self.map))) + logger.debug('TPDO Map as %d', len(self.map)) def stop(self): """Stop transmission of all TPDOs. diff --git a/canopen/pdo/base.py b/canopen/pdo/base.py index 7312f660..1f4bf3e4 100644 --- a/canopen/pdo/base.py +++ b/canopen/pdo/base.py @@ -42,7 +42,7 @@ def __getitem__(self, key): except KeyError: # ignore if one specific PDO does not have the key and try the next one continue - raise KeyError("PDO: {0} was not found in any map".format(key)) + raise KeyError(f"PDO: {key} was not found in any map") def __len__(self): return len(self.map) @@ -201,8 +201,8 @@ def __getitem_by_index(self, value): valid_values.append(var.index) if var.index == value: return var - raise KeyError('{0} not found in map. Valid entries are {1}'.format( - value, ', '.join(str(v) for v in valid_values))) + raise KeyError(f"{value} not found in map. Valid entries are " + f"{', '.join(str(v) for v in valid_values)}") def __getitem_by_name(self, value): valid_values = [] @@ -211,8 +211,8 @@ def __getitem_by_name(self, value): valid_values.append(var.name) if var.name == value: return var - raise KeyError('{0} not found in map. Valid entries are {1}'.format( - value, ', '.join(valid_values))) + raise KeyError(f"{value} not found in map. Valid entries are " + f"{', '.join(valid_values)}") def __getitem__(self, key: Union[int, str]) -> "PdoVariable": var = None @@ -272,7 +272,7 @@ def name(self) -> str: if direction == "Rx": map_id -= 1 node_id = self.cob_id & 0x7F - return "%sPDO%d_node%d" % (direction, map_id, node_id) + return f"{direction}PDO{map_id}_node{node_id}" @property def is_periodic(self) -> bool: @@ -398,7 +398,7 @@ def save(self) -> None: self._fill_map(self.map_array[0].raw) subindex = 1 for var in self.map: - logger.info("Writing %s (0x%X:%d, %d bits) to PDO map", + logger.info("Writing %s (0x%04X:%02X, %d bits) to PDO map", var.name, var.index, var.subindex, var.length) if hasattr(self.pdo_node.node, "curtis_hack") and self.pdo_node.node.curtis_hack: # Curtis HACK: mixed up field order self.map_array[subindex].raw = (var.index | @@ -467,7 +467,7 @@ def add_variable( # We want to see the bit fields within the PDO start_bit = var.offset end_bit = start_bit + var.length - 1 - logger.info("Adding %s (0x%X:%d) at bits %d - %d to PDO map", + logger.info("Adding %s (0x%04X:%02X) at bits %d - %d to PDO map", var.name, var.index, var.subindex, start_bit, end_bit) self.map.append(var) self.length += var.length diff --git a/canopen/profiles/p402.py b/canopen/profiles/p402.py index d5b4ce5e..c94e5356 100644 --- a/canopen/profiles/p402.py +++ b/canopen/profiles/p402.py @@ -249,7 +249,7 @@ def _init_tpdo_values(self): if tpdo.enabled: tpdo.add_callback(self.on_TPDOs_update_callback) for obj in tpdo: - logger.debug('Configured TPDO: {0}'.format(obj.index)) + logger.debug('Configured TPDO: 0x%04X', obj.index) if obj.index not in self.tpdo_values: self.tpdo_values[obj.index] = 0 self.tpdo_pointers[obj.index] = obj @@ -260,31 +260,31 @@ def _init_rpdo_pointers(self): for rpdo in self.rpdo.values(): if rpdo.enabled: for obj in rpdo: - logger.debug('Configured RPDO: {0}'.format(obj.index)) + logger.debug('Configured RPDO: 0x%04X', obj.index) if obj.index not in self.rpdo_pointers: self.rpdo_pointers[obj.index] = obj def _check_controlword_configured(self): if 0x6040 not in self.rpdo_pointers: # Controlword logger.warning( - "Controlword not configured in node {0}'s PDOs. Using SDOs can cause slow performance.".format( - self.id)) + "Controlword not configured in node %s's PDOs. Using SDOs can cause slow performance.", + self.id) def _check_statusword_configured(self): if 0x6041 not in self.tpdo_values: # Statusword logger.warning( - "Statusword not configured in node {0}'s PDOs. Using SDOs can cause slow performance.".format( - self.id)) + "Statusword not configured in node %s's PDOs. Using SDOs can cause slow performance.", + self.id) def _check_op_mode_configured(self): if 0x6060 not in self.rpdo_pointers: # Operation Mode logger.warning( - "Operation Mode not configured in node {0}'s PDOs. Using SDOs can cause slow performance.".format( - self.id)) + "Operation Mode not configured in node %s's PDOs. Using SDOs can cause slow performance.", + self.id) if 0x6061 not in self.tpdo_values: # Operation Mode Display logger.warning( - "Operation Mode Display not configured in node {0}'s PDOs. Using SDOs can cause slow performance.".format( - self.id)) + "Operation Mode Display not configured in node %s's PDOs. Using SDOs can cause slow performance.", + self.id) def reset_from_fault(self): """Reset node from fault and set it to Operation Enable state.""" @@ -357,7 +357,7 @@ def homing(self, timeout=None, restore_op_mode=False): homingstatus = self._homing_status() if homingstatus in ('INTERRUPTED', 'ERROR VELOCITY IS NOT ZERO', 'ERROR VELOCITY IS ZERO'): - raise RuntimeError('Unable to home. Reason: {0}'.format(homingstatus)) + raise RuntimeError(f'Unable to home. Reason: {homingstatus}') if timeout and time.monotonic() > t: raise RuntimeError('Unable to home, timeout reached') logger.info('Homing mode carried out successfully.') @@ -397,8 +397,7 @@ def op_mode(self): if pdo.is_periodic: timestamp = pdo.wait_for_reception(timeout=self.TIMEOUT_CHECK_TPDO) if timestamp is None: - raise RuntimeError("Timeout getting node {0}'s mode of operation.".format( - self.id)) + raise RuntimeError(f"Timeout getting node {self.id}'s mode of operation.") code = self.tpdo_values[0x6061] except KeyError: logger.warning('The object 0x6061 is not a configured TPDO, fallback to SDO') @@ -410,7 +409,7 @@ def op_mode(self, mode): try: if not self.is_op_mode_supported(mode): raise TypeError( - 'Operation mode {m} not suppported on node {n}.'.format(n=self.id, m=mode)) + f'Operation mode {mode} not suppported on node {self.id}.') # Update operation mode in RPDO if possible, fall back to SDO if 0x6060 in self.rpdo_pointers: self.rpdo_pointers[0x6060].raw = OperationMode.NAME2CODE[mode] @@ -423,13 +422,12 @@ def op_mode(self, mode): while self.op_mode != mode: if time.monotonic() > timeout: raise RuntimeError( - "Timeout setting node {0}'s new mode of operation to {1}.".format( - self.id, mode)) - logger.info('Set node {n} operation mode to {m}.'.format(n=self.id, m=mode)) + f"Timeout setting node {self.id}'s new mode of operation to {mode}.") + logger.info('Set node %s operation mode to %s.', self.id, mode) except SdoCommunicationError as e: - logger.warning('[SDO communication error] Cause: {0}'.format(str(e))) + logger.warning('[SDO communication error] Cause: %s', e) except (RuntimeError, ValueError) as e: - logger.warning('{0}'.format(str(e))) + logger.warning(str(e)) def _clear_target_values(self): # [target velocity, target position, target torque] @@ -450,8 +448,8 @@ def is_op_mode_supported(self, mode): if not hasattr(self, '_op_mode_support'): # Cache value only on first lookup, this object should never change. self._op_mode_support = self.sdo[0x6502].raw - logger.info('Caching node {n} supported operation modes 0x{m:04X}'.format( - n=self.id, m=self._op_mode_support)) + logger.info('Caching node %s supported operation modes 0x%04X', + self.id, self._op_mode_support) bits = OperationMode.SUPPORTED[mode] return self._op_mode_support & bits == bits @@ -561,7 +559,7 @@ def _next_state(self, target_state): 'FAULT REACTION ACTIVE', 'FAULT'): raise ValueError( - 'Target state {} cannot be entered programmatically'.format(target_state)) + f'Target state {target_state} cannot be entered programmatically') from_state = self.state if (from_state, target_state) in State402.TRANSITIONTABLE: return target_state @@ -573,7 +571,7 @@ def _change_state(self, target_state): self.controlword = State402.TRANSITIONTABLE[(self.state, target_state)] except KeyError: raise ValueError( - 'Illegal state transition from {f} to {t}'.format(f=self.state, t=target_state)) + f'Illegal state transition from {self.state} to {target_state}') timeout = time.monotonic() + self.TIMEOUT_SWITCH_STATE_SINGLE while self.state != target_state: if time.monotonic() > timeout: diff --git a/canopen/profiles/tools/test_p402_states.py b/canopen/profiles/tools/test_p402_states.py index 39f085f5..721865db 100644 --- a/canopen/profiles/tools/test_p402_states.py +++ b/canopen/profiles/tools/test_p402_states.py @@ -19,19 +19,19 @@ if target_state == from_state: continue if (from_state, target_state) in State402.TRANSITIONTABLE: - print('direct:\t{} -> {}'.format(from_state, target_state)) + print(f'direct:\t{from_state} -> {target_state}') else: next_state = State402.next_state_indirect(from_state) if not next_state: - print('FAIL:\t{} -> {}'.format(from_state, next_state)) + print(f'FAIL:\t{from_state} -> {next_state}') else: - print('\t{} -> {} ...'.format(from_state, next_state)) + print(f'\t{from_state} -> {next_state} ...') try: while from_state != target_state: n.tpdo_values[0x6041] = State402.SW_MASK[from_state][1] next_state = n._next_state(target_state) - print('\t\t-> {}'.format(next_state)) + print(f'\t\t-> {next_state}') from_state = next_state except ValueError: print('\t\t-> disallowed!') diff --git a/canopen/sdo/base.py b/canopen/sdo/base.py index f9daebf5..1099a144 100644 --- a/canopen/sdo/base.py +++ b/canopen/sdo/base.py @@ -5,6 +5,7 @@ from canopen import objectdictionary from canopen.objectdictionary import ObjectDictionary from canopen import variable +from canopen.utils import pretty_index class CrcXmodem: @@ -97,7 +98,7 @@ def __init__(self, sdo_node: SdoBase, od: ObjectDictionary): self.od = od def __repr__(self) -> str: - return f"<{type(self).__qualname__} {self.od.name!r} at 0x{self.od.index:04X}>" + return f"<{type(self).__qualname__} {self.od.name!r} at {pretty_index(self.od.index)}>" def __getitem__(self, subindex: Union[int, str]) -> "SdoVariable": return SdoVariable(self.sdo_node, self.od[subindex]) @@ -119,7 +120,7 @@ def __init__(self, sdo_node: SdoBase, od: ObjectDictionary): self.od = od def __repr__(self) -> str: - return f"<{type(self).__qualname__} {self.od.name!r} at 0x{self.od.index:04X}>" + return f"<{type(self).__qualname__} {self.od.name!r} at {pretty_index(self.od.index)}>" def __getitem__(self, subindex: Union[int, str]) -> "SdoVariable": return SdoVariable(self.sdo_node, self.od[subindex]) diff --git a/canopen/sdo/client.py b/canopen/sdo/client.py index 519bbd5f..8d64a517 100644 --- a/canopen/sdo/client.py +++ b/canopen/sdo/client.py @@ -7,6 +7,7 @@ from canopen.network import CanError from canopen import objectdictionary from canopen.sdo.base import SdoBase +from canopen.utils import pretty_index from canopen.sdo.constants import * from canopen.sdo.exceptions import * @@ -97,7 +98,7 @@ def abort(self, abort_code=0x08000000): # TODO: Is it necessary to include index and subindex? struct.pack_into(" bytes: """May be called to make a read operation without an Object Dictionary. @@ -244,7 +245,7 @@ def __init__(self, sdo_client, index, subindex=0): self._toggle = 0 self.pos = 0 - logger.debug("Reading 0x%X:%d from node %d", index, subindex, + logger.debug("Reading 0x%04X:%02X from node %d", index, subindex, sdo_client.rx_cobid - 0x600) request = bytearray(8) SDO_STRUCT.pack_into(request, 0, REQUEST_UPLOAD, index, subindex) @@ -253,14 +254,14 @@ def __init__(self, sdo_client, index, subindex=0): res_data = response[4:8] if res_command & 0xE0 != RESPONSE_UPLOAD: - raise SdoCommunicationError("Unexpected response 0x%02X" % res_command) + raise SdoCommunicationError(f"Unexpected response 0x{res_command:02X}") # Check that the message is for us if res_index != index or res_subindex != subindex: - raise SdoCommunicationError(( - "Node returned a value for 0x{:X}:{:d} instead, " + raise SdoCommunicationError( + f"Node returned a value for {pretty_index(res_index, res_subindex)} instead, " "maybe there is another SDO client communicating " - "on the same SDO channel?").format(res_index, res_subindex)) + "on the same SDO channel?") self.exp_data = None if res_command & EXPEDITED: @@ -301,7 +302,7 @@ def read(self, size=-1): response = self.sdo_client.request_response(request) res_command, = struct.unpack_from("B", response) if res_command & 0xE0 != RESPONSE_SEGMENT_UPLOAD: - raise SdoCommunicationError("Unexpected response 0x%02X" % res_command) + raise SdoCommunicationError(f"Unexpected response 0x{res_command:02X}") if res_command & TOGGLE_BIT != self._toggle: raise SdoCommunicationError("Toggle bit mismatch") length = 7 - ((res_command >> 1) & 0x7) @@ -362,7 +363,7 @@ def __init__(self, sdo_client, index, subindex=0, size=None, force_segment=False res_command, = struct.unpack_from("B", response) if res_command != RESPONSE_DOWNLOAD: raise SdoCommunicationError( - "Unexpected response 0x%02X" % res_command) + f"Unexpected response 0x{res_command:02X}") else: # Expedited download # Prepare header (first 4 bytes in CAN message) @@ -390,7 +391,7 @@ def write(self, b): res_command, = struct.unpack_from("B", response) if res_command & 0xE0 != RESPONSE_DOWNLOAD: raise SdoCommunicationError( - "Unexpected response 0x%02X" % res_command) + f"Unexpected response 0x{res_command:02X}") bytes_sent = len(b) self._done = True else: @@ -414,8 +415,8 @@ def write(self, b): res_command, = struct.unpack("B", response[0:1]) if res_command & 0xE0 != RESPONSE_SEGMENT_DOWNLOAD: raise SdoCommunicationError( - "Unexpected response 0x%02X (expected 0x%02X)" % - (res_command, RESPONSE_SEGMENT_DOWNLOAD)) + f"Unexpected response 0x{res_command:02X} " + f"(expected 0x{RESPONSE_SEGMENT_DOWNLOAD:02X})") # Advance position self.pos += bytes_sent return bytes_sent @@ -473,7 +474,7 @@ def __init__(self, sdo_client, index, subindex=0, request_crc_support=True): self._ackseq = 0 self._error = False - logger.debug("Reading 0x%X:%d from node %d", index, subindex, + logger.debug("Reading 0x%04X:%02X from node %d", index, subindex, sdo_client.rx_cobid - 0x600) # Initiate Block Upload request = bytearray(8) @@ -487,14 +488,14 @@ def __init__(self, sdo_client, index, subindex=0, request_crc_support=True): if res_command & 0xE0 != RESPONSE_BLOCK_UPLOAD: self._error = True self.sdo_client.abort(0x05040001) - raise SdoCommunicationError("Unexpected response 0x%02X" % res_command) + raise SdoCommunicationError(f"Unexpected response 0x{res_command:02X}") # Check that the message is for us if res_index != index or res_subindex != subindex: self._error = True - raise SdoCommunicationError(( - "Node returned a value for 0x{:X}:{:d} instead, " + raise SdoCommunicationError( + f"Node returned a value for {pretty_index(res_index, res_subindex)} instead, " "maybe there is another SDO client communicating " - "on the same SDO channel?").format(res_index, res_subindex)) + "on the same SDO channel?") if res_command & BLOCK_SIZE_SPECIFIED: self.size, = struct.unpack_from("> 2) & 0x3) else: size = 4 self._node.set_data(index, subindex, request[4:4 + size], check_writable=True) else: - logger.info("Initiating segmented download for 0x%X:%d", index, subindex) + logger.info("Initiating segmented download for 0x%04X:%02X", index, subindex) if command & SIZE_SPECIFIED: size, = struct.unpack_from(" bytes: """May be called to make a read operation without an Object Dictionary. diff --git a/canopen/utils.py b/canopen/utils.py new file mode 100644 index 00000000..37feda93 --- /dev/null +++ b/canopen/utils.py @@ -0,0 +1,22 @@ +"""Additional utility functions for canopen.""" +from typing import Optional, Union + + +def pretty_index(index: Optional[Union[int, str]], + sub: Optional[Union[int, str]] = None): + """Format an index and subindex as a string.""" + + index_str = "" + if isinstance(index, int): + index_str = f"0x{index:04X}" + elif index: + index_str = f"{index!r}" + + sub_str = "" + if isinstance(sub, int): + # Need 0x prefix if index is not present + sub_str = f"{'0x' if not index_str else ''}{sub:02X}" + elif sub: + sub_str = f"{sub!r}" + + return ":".join(s for s in (index_str, sub_str) if s) diff --git a/canopen/variable.py b/canopen/variable.py index a75b68c0..bf584373 100644 --- a/canopen/variable.py +++ b/canopen/variable.py @@ -3,6 +3,7 @@ from collections.abc import Mapping from canopen import objectdictionary +from canopen.utils import pretty_index logger = logging.getLogger(__name__) @@ -23,10 +24,10 @@ def __init__(self, od: objectdictionary.ODVariable): self.subindex = od.subindex def __repr__(self) -> str: - suffix = f":{self.subindex:02X}" if isinstance(self.od.parent, + subindex = self.subindex if isinstance(self.od.parent, (objectdictionary.ODRecord, objectdictionary.ODArray) - ) else "" - return f"<{type(self).__qualname__} {self.name!r} at 0x{self.index:04X}{suffix}>" + ) else None + return f"<{type(self).__qualname__} {self.name!r} at {pretty_index(self.index, subindex)}>" def get_data(self) -> bytes: raise NotImplementedError("Variable is not readable") @@ -76,17 +77,15 @@ def raw(self) -> Union[int, bool, float, str, bytes]: written as :class:`bytes`. """ value = self.od.decode_raw(self.data) - text = "Value of %s (0x%X:%d) is %r" % ( - self.name, self.index, - self.subindex, value) + text = f"Value of {self.name!r} ({pretty_index(self.index, self.subindex)}) is {value!r}" if value in self.od.value_descriptions: - text += " (%s)" % self.od.value_descriptions[value] + text += f" ({self.od.value_descriptions[value]})" logger.debug(text) return value @raw.setter def raw(self, value: Union[int, bool, float, str, bytes]): - logger.debug("Writing %s (0x%X:%d) = %r", + logger.debug("Writing %r (0x%04X:%02X) = %r", self.name, self.index, self.subindex, value) self.data = self.od.encode_raw(value) diff --git a/doc/network.rst b/doc/network.rst index 4fad9cd4..7efab634 100644 --- a/doc/network.rst +++ b/doc/network.rst @@ -51,7 +51,7 @@ To automatically detect which nodes are present on the network, there is the # We may need to wait a short while here to allow all nodes to respond time.sleep(0.05) for node_id in network.scanner.nodes: - print("Found node %d!" % node_id) + print(f"Found node {node_id}!") Finally, make sure to disconnect after you are done:: diff --git a/doc/od.rst b/doc/od.rst index f31c7813..6040bb5e 100644 --- a/doc/od.rst +++ b/doc/od.rst @@ -39,10 +39,10 @@ Here is an example where the entire object dictionary gets printed out:: node = network.add_node(6, 'od.eds') for obj in node.object_dictionary.values(): - print('0x%X: %s' % (obj.index, obj.name)) + print(f'0x{obj.index:X}: {obj.name}') if isinstance(obj, canopen.objectdictionary.ODRecord): for subobj in obj.values(): - print(' %d: %s' % (subobj.subindex, subobj.name)) + print(f' {subobj.subindex}: {subobj.name}') You can access the objects using either index/subindex or names:: diff --git a/doc/pdo.rst b/doc/pdo.rst index bcef197b..d718ea5f 100644 --- a/doc/pdo.rst +++ b/doc/pdo.rst @@ -65,14 +65,14 @@ starts at 1, not 0):: for i in range(50): node.tpdo[4].wait_for_reception() speed = node.tpdo['Application Status.Actual Speed'].phys - f.write('%s\n' % speed) + f.write(f'{speed}\n') # Using a callback to asynchronously receive values # Do not do any blocking operations here! def print_speed(message): - print('%s received' % message.name) + print(f'{message.name} received') for var in message: - print('%s = %d' % (var.name, var.raw)) + print(f'{var.name} = {var.raw}') node.tpdo[4].add_callback(print_speed) time.sleep(5) diff --git a/doc/sdo.rst b/doc/sdo.rst index 8634d28c..a1436572 100644 --- a/doc/sdo.rst +++ b/doc/sdo.rst @@ -50,7 +50,7 @@ The code below only creates objects, no messages are sent or received yet:: To actually read or write the variables, use the ``.raw``, ``.phys``, ``.desc``, or ``.bits`` attributes:: - print("The device type is 0x%X" % device_type.raw) + print(f"The device type is 0x{device_type.raw:X}") # Using value descriptions instead of integers (if supported by OD) control_mode.desc = 'Speed Mode' @@ -59,11 +59,11 @@ or ``.bits`` attributes:: command_all.bits[3] = 1 # Read and write physical values scaled by a factor (if supported by OD) - print("The actual speed is %f rpm" % actual_speed.phys) + print(f"The actual speed is {actual_speed.phys} rpm") # Iterate over arrays or records for error in error_log.values(): - print("Error 0x%X was found in the log" % error.raw) + print(f"Error 0x{error.raw:X} was found in the log") It is also possible to read and write to variables that are not in the Object Dictionary, but only using raw bytes:: diff --git a/examples/simple_ds402_node.py b/examples/simple_ds402_node.py index c99831a0..5847923d 100644 --- a/examples/simple_ds402_node.py +++ b/examples/simple_ds402_node.py @@ -26,17 +26,17 @@ #node.nmt.state = 'RESET' node.nmt.wait_for_bootup(15) - print('node state 1) = {0}'.format(node.nmt.state)) + print(f'node state 1) = {node.nmt.state}') # Iterate over arrays or records error_log = node.sdo[0x1003] for error in error_log.values(): - print("Error {0} was found in the log".format(error.raw)) + print(f"Error {error.raw} was found in the log") for node_id in network: print(network[node_id]) - print('node state 2) = {0}'.format(node.nmt.state)) + print(f'node state 2) = {node.nmt.state}') # Read a variable using SDO @@ -51,7 +51,7 @@ node.load_configuration() - print('node state 3) = {0}'.format(node.nmt.state)) + print(f'node state 3) = {node.nmt.state}') node.setup_402_state_machine() @@ -63,7 +63,7 @@ node.state = 'SWITCH ON DISABLED' - print('node state 4) = {0}'.format(node.nmt.state)) + print(f'node state 4) = {node.nmt.state}') # Read PDO configuration from node node.tpdo.read() @@ -115,7 +115,7 @@ raise Exception('Timeout when trying to change state') time.sleep(0.001) - print('Node Status {0}'.format(node.powerstate_402.state)) + print(f'Node Status {node.powerstate_402.state}') # ----------------------------------------------------------------------------------------- node.nmt.start_node_guarding(0.01) @@ -132,8 +132,8 @@ # Read the state of the Statusword statusword = node.sdo[0x6041].raw - print('statusword: {0}'.format(statusword)) - print('VEL: {0}'.format(speed)) + print(f'statusword: {statusword}') + print(f'VEL: {speed}') time.sleep(0.01) diff --git a/test/test_eds.py b/test/test_eds.py index a35cbb99..1fe8a6c7 100644 --- a/test/test_eds.py +++ b/test/test_eds.py @@ -2,6 +2,7 @@ import unittest import canopen from canopen.objectdictionary.eds import _signed_int_from_hex +from canopen.utils import pretty_index EDS_PATH = os.path.join(os.path.dirname(__file__), 'sample.eds') @@ -185,7 +186,7 @@ def test_export_eds(self): for doctype in {"eds", "dcf"}: tempfile = str(Path(tempdir, "test." + doctype)) with open(tempfile, "w+") as tempeds: - print("exporting %s to " % doctype + tempeds.name) + print(f"exporting {doctype} to {tempeds.name}") canopen.export_od(self.od, tempeds, doc_type=doctype) exported_od = canopen.import_od(tempfile) @@ -236,15 +237,15 @@ def test_export_eds(self): for evar, avar in zip(expected_vars, actual_vars): self.assertEqual(getattr(avar, "data_type", None), getattr(evar, "data_type", None), - " mismatch on %04X:%X" % (evar.index, evar.subindex)) + f" mismatch on {pretty_index(evar.index, evar.subindex)}") self.assertEqual(getattr(avar, "default_raw", None), getattr(evar, "default_raw", None), - " mismatch on %04X:%X" % (evar.index, evar.subindex)) + f" mismatch on {pretty_index(evar.index, evar.subindex)}") self.assertEqual(getattr(avar, "min", None), getattr(evar, "min", None), - " mismatch on %04X:%X" % (evar.index, evar.subindex)) + f" mismatch on {pretty_index(evar.index, evar.subindex)}") self.assertEqual(getattr(avar, "max", None), getattr(evar, "max", None), - " mismatch on %04X:%X" % (evar.index, evar.subindex)) + f" mismatch on {pretty_index(evar.index, evar.subindex)}") if doctype == "dcf": self.assertEqual(getattr(avar, "value", None), getattr(evar, "value", None), - " mismatch on %04X:%X" % (evar.index, evar.subindex)) + f" mismatch on {pretty_index(evar.index, evar.subindex)}") self.assertEqual(self.od.comments, exported_od.comments) diff --git a/test/test_network.py b/test/test_network.py index 04129271..095648ad 100644 --- a/test/test_network.py +++ b/test/test_network.py @@ -61,7 +61,7 @@ def test_send_perodic(self): time.sleep(0.1) # FIXME: This test is a little fragile, as the number of elements # depends on the timing of the machine. - print("Queue size: %s" % (bus.queue.qsize(),)) + print(f"Queue size: {bus.queue.qsize()}") self.assertTrue(9 <= bus.queue.qsize() <= 13) msg = bus.recv(0) self.assertIsNotNone(msg) diff --git a/test/test_sdo.py b/test/test_sdo.py index c0ba086b..31759c22 100644 --- a/test/test_sdo.py +++ b/test/test_sdo.py @@ -23,11 +23,11 @@ def _send_message(self, can_id, data, remote=False): """ next_data = self.data.pop(0) self.assertEqual(next_data[0], TX, "No transmission was expected") - # print("> %s (%s)" % (binascii.hexlify(data), binascii.hexlify(next_data[1]))) + # print(f"> {binascii.hexlify(data)} ({binascii.hexlify(next_data[1])})") self.assertSequenceEqual(data, next_data[1]) self.assertEqual(can_id, 0x602) while self.data and self.data[0][0] == RX: - # print("< %s" % binascii.hexlify(self.data[0][1])) + # print(f"< {binascii.hexlify(self.data[0][1])}") self.network.notify(0x582, self.data.pop(0)[1], 0.0) def setUp(self): diff --git a/test/test_utils.py b/test/test_utils.py new file mode 100644 index 00000000..b412e365 --- /dev/null +++ b/test/test_utils.py @@ -0,0 +1,20 @@ +import unittest +from canopen.utils import pretty_index + + +class TestUtils(unittest.TestCase): + + def test_pretty_index(self): + self.assertEqual(pretty_index(0x12ab), "0x12AB") + self.assertEqual(pretty_index(0x12ab, 0xcd), "0x12AB:CD") + self.assertEqual(pretty_index(0x12ab, ""), "0x12AB") + self.assertEqual(pretty_index("test"), "'test'") + self.assertEqual(pretty_index("test", 0xcd), "'test':CD") + self.assertEqual(pretty_index(None), "") + self.assertEqual(pretty_index(""), "") + self.assertEqual(pretty_index("", ""), "") + self.assertEqual(pretty_index(None, 0xab), "0xAB") + + +if __name__ == "__main__": + unittest.main()