Skip to content

Commit

Permalink
[thci] Convert python2 script to python3: convert .format(...) style …
Browse files Browse the repository at this point in the history
…string formatting to the modern f-strings
  • Loading branch information
yangsong-cnyn committed Jan 8, 2025
1 parent 6307528 commit a2bf96e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 80 deletions.
119 changes: 46 additions & 73 deletions tools/commissioner_thci/commissioner_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,15 @@ def __init__(self, config, handler, simulator=None):
self._handler = handler
self._lines = []

self._command('stty cols {}'.format(TTY_COLS))
self._command(f'stty cols {TTY_COLS}')

config_path = '/tmp/commissioner.{}.json'.format(uuid.uuid4())
config_path = f'/tmp/commissioner.{uuid.uuid4()}.json'
self._write_config(config_path=config_path, config=config)

response = self._command('{} init "{}"'.format(COMMISSIONER_CTL,
config_path))
response = self._command(f'{COMMISSIONER_CTL} init "{config_path}"')
if self._command('echo $?')[0] != '0':
raise commissioner.Error('Failed to init, error:\n{}'.format(
'\n'.join(response)))
raise commissioner.Error(
f"Failed to init, error:\n{'\n'.join(response)}")

@staticmethod
def makeLocalCommissioner(config, simulator):
Expand All @@ -166,12 +165,9 @@ def makeHarnessCommissioner(config, serial_handler):
return OTCommissioner(config, serial_handler)

def start(self, borderAgentAddr, borderAgentPort):
self._command('sudo rm {}'.format(self.log_file))
self._command('sudo touch {}'.format(self.log_file))
self._execute_and_check('start {} {}'.format(
borderAgentAddr,
borderAgentPort,
))
self._command(f'sudo rm {self.log_file}')
self._command(f'sudo touch {self.log_file}')
self._execute_and_check(f'start {borderAgentAddr} {borderAgentPort}')

def stop(self):
self._execute_and_check('stop')
Expand All @@ -183,8 +179,7 @@ def isActive(self):
elif 'false' in response[0]:
return False
else:
raise commissioner.Error('Unrecognized result "{}"'.format(
response[0]))
raise commissioner.Error(f'Unrecognized result "{response[0]}"')

def getSessionId(self):
response = self._execute_and_check('sessionid')
Expand Down Expand Up @@ -226,7 +221,7 @@ def MGMT_COMMISSIONER_SET(self, commDataset):
TLV_TYPE_TO_STRING[key]: commDataset[key] for key in commDataset
}
data = json.dumps(dataset)
self._execute_and_check("commdataset set '{}'".format(data))
self._execute_and_check(f"commdataset set '{data}'")

def enableJoiner(self, joinerType, eui64=None, password=None):
command = ['joiner', 'enable', JOINER_TYPE_TO_STRING[joinerType]]
Expand All @@ -253,8 +248,7 @@ def disableJoiner(self, joinerType, eui64=None):

def MGMT_ACTIVE_GET(self, tlvTypes):
types = ' '.join([TLV_TYPE_TO_STRING[x] for x in tlvTypes])
result = self._execute_and_check(
'opdataset get active {}'.format(types))
result = self._execute_and_check(f'opdataset get active {types}')

try:
return OTCommissioner._active_op_dataset_from_json(' '.join(
Expand All @@ -263,13 +257,13 @@ def MGMT_ACTIVE_GET(self, tlvTypes):
raise_(commissioner.Error, repr(e), sys.exc_info()[2])

def MGMT_ACTIVE_SET(self, activeOpDataset):
self._execute_and_check("opdataset set active '{}'".format(
OTCommissioner._active_op_dataset_to_json(activeOpDataset)))
self._execute_and_check(
f"opdataset set active '{OTCommissioner._active_op_dataset_to_json(activeOpDataset)}'"
)

def MGMT_PENDING_GET(self, tlvTypes):
types = ' '.join([TLV_TYPE_TO_STRING[x] for x in tlvTypes])
result = self._execute_and_check(
'opdataset get pending {}'.format(types))
result = self._execute_and_check(f'opdataset get pending {types}')

try:
return OTCommissioner._pending_op_dataset_from_json(' '.join(
Expand All @@ -278,12 +272,13 @@ def MGMT_PENDING_GET(self, tlvTypes):
raise_(commissioner.Error, repr(e), sys.exc_info()[2])

def MGMT_PENDING_SET(self, pendingOpDataset):
self._execute_and_check("opdataset set pending '{}'".format(
OTCommissioner._pending_op_dataset_to_json(pendingOpDataset)))
self._execute_and_check(
f"opdataset set pending '{OTCommissioner._pending_op_dataset_to_json(pendingOpDataset)}'"
)

def MGMT_BBR_GET(self, tlvTypes):
types = ' '.join([TLV_TYPE_TO_STRING[x] for x in tlvTypes])
result = self._execute_and_check('bbrdataset get {}'.format(types))
result = self._execute_and_check(f'bbrdataset get {types}')

try:
result = json.loads(' '.join(result[:-1]))
Expand All @@ -296,49 +291,33 @@ def MGMT_BBR_SET(self, bbrDataset):
TLV_TYPE_TO_STRING[key]: bbrDataset[key] for key in bbrDataset
}
dataset = json.dumps(dataset)
self._execute_and_check("bbrdataset set '{}'".format(dataset))
self._execute_and_check(f"bbrdataset set '{dataset}'")

def MLR(self, multicastAddrs, timeout):
self._execute_and_check('mlr {} {}'.format(
' '.join(multicastAddrs),
timeout,
),
self._execute_and_check(f"mlr {' '.join(multicastAddrs)} {timeout}",
check=False)

def MGMT_ANNOUNCE_BEGIN(self, channelMask, count, period, dstAddr):
self._execute_and_check('announce {} {} {} {}'.format(
channelMask,
count,
period,
dstAddr,
))
self._execute_and_check(
f'announce {channelMask} {count} {period} {dstAddr}')

def MGMT_PANID_QUERY(self, channelMask, panId, dstAddr, timeout):
self._execute_and_check('panid query {} {} {}'.format(
channelMask,
panId,
dstAddr,
))
self._execute_and_check(f'panid query {channelMask} {panId} {dstAddr}')
self._sleep(timeout)
result = self._execute_and_check('panid conflict {}'.format(panId))
result = self._execute_and_check(f'panid conflict {panId}')
result = int(result[0])
return False if result == 0 else True

def MGMT_ED_SCAN(self, channelMask, count, period, scanDuration, dstAddr,
timeout):
self._execute_and_check('energy scan {} {} {} {} {}'.format(
channelMask,
count,
period,
scanDuration,
dstAddr,
))
self._execute_and_check(
f'energy scan {channelMask} {count} {period} {scanDuration} {dstAddr}'
)

self._sleep(timeout)
result = self._execute_and_check('energy report {}'.format(dstAddr))
result = self._execute_and_check(f'energy report {dstAddr}')
if result[0] == 'null':
raise commissioner.Error(
'No energy report found for {}'.format(dstAddr))
raise commissioner.Error(f'No energy report found for {dstAddr}')

try:
result = json.loads(' '.join(result[:-1]))
Expand All @@ -352,31 +331,26 @@ def MGMT_ED_SCAN(self, channelMask, count, period, scanDuration, dstAddr,
raise_(commissioner.Error, repr(e), sys.exc_info()[2])

def MGMT_REENROLL(self, dstAddr):
self._execute_and_check('reenroll {}'.format(dstAddr))
self._execute_and_check(f'reenroll {dstAddr}')

def MGMT_DOMAIN_RESET(self, dstAddr):
self._execute_and_check('domainreset {}'.format(dstAddr))
self._execute_and_check(f'domainreset {dstAddr}')

def MGMT_NET_MIGRATE(self, dstAddr, designatedNetwork):
self._execute_and_check('migrate {} {}'.format(
dstAddr,
designatedNetwork,
))
self._execute_and_check(f'migrate {dstAddr} {designatedNetwork}')

def requestCOM_TOK(self, registrarAddr, registrarPort):
self._execute_and_check('token request {} {}'.format(
registrarAddr,
registrarPort,
))
self._execute_and_check(
f'token request {registrarAddr} {registrarPort}')

def setCOM_TOK(self, signedCOM_TOK):
path_token = '/tmp/commissioner.token.{}'.format(uuid.uuid4())
path_token = f'/tmp/commissioner.token.{uuid.uuid4()}'
step = 40
for i in range(0, len(signedCOM_TOK), step):
data = self._bytes_to_hex(signedCOM_TOK[i:i + step])
self._command('echo {} >> "{}"'.format(data, path_token))
self._command(f'echo {data} >> "{path_token}"')

self._execute_and_check('token set {}'.format(path_token))
self._execute_and_check(f'token set {path_token}')

def getCOM_TOK(self):
result = self._execute_and_check('token print')
Expand Down Expand Up @@ -422,13 +396,12 @@ def getMlrLogs(self):
return processed_logs

def _getThciLogs(self):
return self._command("grep \"\\[ thci \\]\" {}".format(self.log_file))
return self._command(f"grep \"\\[ thci \\]\" {self.log_file}")

def _execute_and_check(self, command, check=True):
# Escape quotes for bash
command = command.replace('"', r'"\""')
response = self._command('{} execute "{}"'.format(
COMMISSIONER_CTL, command))
response = self._command(f'{COMMISSIONER_CTL} execute "{command}"')
if check:
response = OTCommissioner._check_response(response)
return response
Expand Down Expand Up @@ -457,29 +430,29 @@ def _write_config(self, config_path, config):

if config.isCcmMode:
if config.privateKey:
path = '/tmp/commissioner.private_key.{}'.format(uuid.uuid4())
path = f'/tmp/commissioner.private_key.{uuid.uuid4()}'
self._send_file(local_path=config.privateKey, remote_path=path)
data['PrivateKeyFile'] = path
if config.cert:
path = '/tmp/commissioner.cert.{}'.format(uuid.uuid4())
path = f'/tmp/commissioner.cert.{uuid.uuid4()}'
self._send_file(local_path=config.cert, remote_path=path)
data['CertificateFile'] = path
if config.trustAnchor:
path = '/tmp/commissioner.trush_anchor.{}'.format(uuid.uuid4())
path = f'/tmp/commissioner.trush_anchor.{uuid.uuid4()}'
self._send_file(local_path=config.trustAnchor, remote_path=path)
data['TrustAnchorFile'] = path

self._command("echo '{}' >> '{}'".format(json.dumps(data), config_path))
self._command(f"echo '{json.dumps(data)}' >> '{config_path}'")

def _send_file(self, local_path, remote_path):
with open(local_path, 'rb') as f:
b64 = base64.b64encode(f.read()).decode()
self._command('echo "{}" | base64 -d - > "{}"'.format(b64, remote_path))
self._command(f'echo "{b64}" | base64 -d - > "{remote_path}"')

@staticmethod
def _check_response(response):
if response[-1] != '[done]':
raise commissioner.Error('Error message:\n{!r}'.format(response))
raise commissioner.Error(f'Error message:\n{response!r}')
return response

@staticmethod
Expand Down
8 changes: 3 additions & 5 deletions tools/commissioner_thci/commissionerd.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def _execute_command(self, command, wait_for_result=True):
if not self._process:
raise self._AppException('CLI not started yet')

print('Executing: "{}"'.format(command))
print(f'Executing: "{command}"')
self._process.sendline(command)
if wait_for_result:
try:
Expand All @@ -178,8 +178,7 @@ def _execute_command(self, command, wait_for_result=True):
self._process.kill(sig=signal.SIGINT)
try:
self._process.expect(r'> $', timeout=1)
return 'Timed out executing "{}"\n{}'.format(
command, self._process.before.decode())
return f'Timed out executing "{command}"\n{self._process.before.decode()}'
except pexpect.exceptions.TIMEOUT as e:
raise self._AppException(e)

Expand All @@ -201,8 +200,7 @@ def _exit(self):
def _discard_process_with_error(self):
message = self._process.before.decode()
self._reset_states()
raise self._AppException(
'CLI process exited with error:\n{}'.format(message))
raise self._AppException(f'CLI process exited with error:\n{message}')

def _reset_states(self):
self._process = None
Expand Down
3 changes: 1 addition & 2 deletions tools/commissioner_thci/example_send_mlr.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ def test_mlr():
comm.start(BORDER_AGENT_ADDR, BORDER_AGENT_PORT)
assert comm.isActive()

print(("commissioner connected, session ID = {}".format(
comm.getSessionId())))
print(f"commissioner connected, session ID = {comm.getSessionId()}")

## Send MLR.req
comm.MLR([MA1, MA2], 60)
Expand Down

0 comments on commit a2bf96e

Please sign in to comment.