From d86973f319acc8343462dbc41e3a4818b0dbab9e Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Thu, 24 Jun 2021 19:50:51 +0530 Subject: [PATCH] report, lock.ops, lock.query: add retry mechanism to paddles requests We need requests to paddles to retry multiple times in case of 500 errors from the DB server. This change adds a retry mechanism to the paddles requests that previously did not have one. Fixes: https://tracker.ceph.com/issues/50921 Signed-off-by: Aishwarya Mathuria --- teuthology/lock/ops.py | 111 +++++++++++++++++++-------------------- teuthology/lock/query.py | 23 ++++---- teuthology/report.py | 48 +++++++++++++---- 3 files changed, 100 insertions(+), 82 deletions(-) diff --git a/teuthology/lock/ops.py b/teuthology/lock/ops.py index 27fb68935e..df2d296a13 100644 --- a/teuthology/lock/ops.py +++ b/teuthology/lock/ops.py @@ -110,39 +110,42 @@ def lock_many(ctx, num, machine_type, user=None, description=None, if arch: data['arch'] = arch log.debug("lock_many request: %s", repr(data)) - response = requests.post( - uri, - data=json.dumps(data), - headers={'content-type': 'application/json'}, - ) - if response.ok: - machines = {misc.canonicalize_hostname(machine['name']): - machine['ssh_pub_key'] for machine in response.json()} - log.debug('locked {machines}'.format( - machines=', '.join(machines.keys()))) - if machine_type in vm_types: - ok_machs = {} - update_nodes(machines, True) - for machine in machines: - if teuthology.provision.create_if_vm(ctx, machine): - ok_machs[machine] = machines[machine] - else: - log.error('Unable to create virtual machine: %s', - machine) - unlock_one(ctx, machine, user) - ok_machs = do_update_keys(list(ok_machs.keys()))[1] - update_nodes(ok_machs) - return ok_machs - elif reimage and machine_type in reimage_types: - return reimage_machines(ctx, machines, machine_type) - return machines - elif response.status_code == 503: - log.error('Insufficient nodes available to lock %d %s nodes.', - num, machine_type) - log.error(response.text) - else: - log.error('Could not lock %d %s nodes, reason: unknown.', - num, machine_type) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'lock many {machine_type}') as proceed: + while proceed(): + response = requests.post( + uri, + data=json.dumps(data), + headers={'content-type': 'application/json'}, + ) + if response.ok: + machines = {misc.canonicalize_hostname(machine['name']): + machine['ssh_pub_key'] for machine in response.json()} + log.debug('locked {machines}'.format( + machines=', '.join(machines.keys()))) + if machine_type in vm_types: + ok_machs = {} + update_nodes(machines, True) + for machine in machines: + if teuthology.provision.create_if_vm(ctx, machine): + ok_machs[machine] = machines[machine] + else: + log.error('Unable to create virtual machine: %s', + machine) + unlock_one(ctx, machine, user) + ok_machs = do_update_keys(list(ok_machs.keys()))[1] + update_nodes(ok_machs) + return ok_machs + elif reimage and machine_type in reimage_types: + return reimage_machines(ctx, machines, machine_type) + return machines + elif response.status_code == 503: + log.error('Insufficient nodes available to lock %d %s nodes.', + num, machine_type) + log.error(response.text) + log.error('Could not lock %d %s nodes, reason: unknown.', + num, machine_type) return [] @@ -153,18 +156,18 @@ def lock_one(name, user=None, description=None): request = dict(name=name, locked=True, locked_by=user, description=description) uri = os.path.join(config.lock_server, 'nodes', name, 'lock', '') - response = requests.put(uri, json.dumps(request)) - success = response.ok - if success: - log.debug('locked %s as %s', name, user) - else: - try: - reason = response.json().get('message') - except ValueError: - reason = str(response.status_code) - log.error('failed to lock {node}. reason: {reason}'.format( - node=name, reason=reason)) - return response + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'lock one {machine_type}') as proceed: + while proceed(): + response = requests.put(uri, json.dumps(request)) + success = response.ok + if success: + log.debug('locked %s as %s', name, user) + return response + log.error('failed to lock {node}'.format( + node=name)) + response.raise_for_status() def unlock_many(names, user): @@ -204,21 +207,15 @@ def unlock_one(ctx, name, user, description=None): while proceed(): try: response = requests.put(uri, json.dumps(request)) - break + if response.ok: + log.info('Unlocked %s', name) + return True # Work around https://github.com/kennethreitz/requests/issues/2364 except requests.ConnectionError as e: log.warn("Saw %s while unlocking; retrying...", str(e)) - success = response.ok - if success: - log.info('unlocked %s', name) - else: - try: - reason = response.json().get('message') - except ValueError: - reason = str(response.status_code) - log.error('failed to unlock {node}. reason: {reason}'.format( - node=name, reason=reason)) - return success + log.error('Failed to unlock {node}.'.format( + node=name)) + return False def update_lock(name, description=None, status=None, ssh_pub_key=None): diff --git a/teuthology/lock/query.py b/teuthology/lock/query.py index bb1044c2b3..f077bd1c86 100644 --- a/teuthology/lock/query.py +++ b/teuthology/lock/query.py @@ -68,15 +68,13 @@ def list_locks(keyed_by_name=False, **kwargs): try: response = requests.get(uri) if response.ok: - break + if not keyed_by_name: + return response.json() + else: + return {node['name']: node + for node in response.json()} except requests.ConnectionError: log.exception("Could not contact lock server: %s, retrying...", config.lock_server) - if response.ok: - if not keyed_by_name: - return response.json() - else: - return {node['name']: node - for node in response.json()} return dict() @@ -132,13 +130,10 @@ def node_job_is_active(node, cache): while proceed(): resp = requests.get(url) if resp.ok: - break - if not resp.ok: - return False - job_info = resp.json() - if job_info['status'] in ('running', 'waiting'): - cache.add(description) - return True + job_info = resp.json() + if job_info['status'] in ('running', 'waiting'): + cache.add(description) + return True return False result = list() diff --git a/teuthology/report.py b/teuthology/report.py index 2d06356772..8995a1ce58 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -252,11 +252,15 @@ def report_run(self, run_name, dead=False): )) if jobs: if not self.refresh: - response = self.session.head("{base}/runs/{name}/".format( - base=self.base_uri, name=run_name)) - if response.status_code == 200: - self.log.info(" already present; skipped") - return 0 + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'report run {name}') as proceed: + while proceed(): + response = self.session.head("{base}/runs/{name}/".format( + base=self.base_uri, name=run_name)) + if response.status_code == 200: + self.log.info(" already present; skipped") + return 0 self.report_jobs(run_name, jobs.keys(), dead=dead) elif not jobs: self.log.debug(" no jobs; skipped") @@ -369,9 +373,14 @@ def get_jobs(self, run_name, job_id=None, fields=None): if 'job_id' not in fields: fields.append('job_id') uri += "?fields=" + ','.join(fields) - response = self.session.get(uri) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'get jobs for {run_name}') as proceed: + while proceed(): + response = self.session.get(uri) + if response.status_code == 200: + return response.json() response.raise_for_status() - return response.json() def get_run(self, run_name, fields=None): """ @@ -384,9 +393,14 @@ def get_run(self, run_name, fields=None): uri = "{base}/runs/{name}".format(base=self.base_uri, name=run_name) if fields: uri += "?fields=" + ','.join(fields) - response = self.session.get(uri) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'get run {run_name}') as proceed: + while proceed(): + response = self.session.get(uri) + if response.status_code == 200: + return response.json() response.raise_for_status() - return response.json() def _parse_log_line(self, line, prefix): # parse log lines like @@ -429,7 +443,13 @@ def delete_job(self, run_name, job_id): """ uri = "{base}/runs/{name}/jobs/{job_id}/".format( base=self.base_uri, name=run_name, job_id=job_id) - response = self.session.delete(uri) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'delete job {job_id}') as proceed: + while proceed(): + response = self.session.delete(uri) + if response.status_code == 200: + return response.raise_for_status() def delete_jobs(self, run_name, job_ids): @@ -450,7 +470,13 @@ def delete_run(self, run_name): """ uri = "{base}/runs/{name}/".format( base=self.base_uri, name=run_name) - response = self.session.delete(uri) + inc = random.uniform(0, 1) + with safe_while( + sleep=1, increment=inc, action=f'delete run {run_name}') as proceed: + while proceed(): + response = self.session.delete(uri) + if response.status_code == 200: + return response.raise_for_status()