From 8f27e339cb6c49dfa2ce68ede5b47a366e692531 Mon Sep 17 00:00:00 2001
From: Fernando Harald Barreiro Megino
Date: Wed, 27 Mar 2024 13:55:05 +0100
Subject: [PATCH 1/7] resource type mapper work

---
 pandaserver/taskbuffer/OraDBProxy.py   |  7 ++++---
 pandaserver/taskbuffer/ResourceSpec.py | 29 ++++++++++++++++++++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py
index ec0e4ddfb..dbc11e429 100644
--- a/pandaserver/taskbuffer/OraDBProxy.py
+++ b/pandaserver/taskbuffer/OraDBProxy.py
@@ -156,6 +156,10 @@ def __init__(self, useOtherError=False):
         self.__hs_distribution = None  # HS06s distribution of sites
         self.__t_update_distribution = None  # Timestamp when the HS06s distribution was last updated
 
+        # resource type mapper
+        self.__resource_type_mapper = None
+        self.__t_update_resource_type_mapper = None
+
         # priority boost
         self.job_prio_boost_dict = None
         self.job_prio_boost_dict_update_time = None
@@ -166,9 +170,6 @@ def __init__(self, useOtherError=False):
         # mb proxy
         self.mb_proxy_dict = None
 
-        # self.__reload_shares()
-        # self.__reload_hs_distribution()
-
     # connect to DB
     def connect(
         self,
diff --git a/pandaserver/taskbuffer/ResourceSpec.py b/pandaserver/taskbuffer/ResourceSpec.py
index 2f5c5b22a..08b8af09f 100644
--- a/pandaserver/taskbuffer/ResourceSpec.py
+++ b/pandaserver/taskbuffer/ResourceSpec.py
@@ -6,6 +6,26 @@
 from . import JobUtils
 
 
+class ResourceSpecMapper(object):
+    def __init__(self, resource_types):
+        """
+        :param resource_types: list of ResourceSpec objects
+        """
+        self.resource_types = resource_types
+
+    def is_single_core(self, resource_name):
+        for resource_type in self.resource_types:
+            if resource_type.resource_name == resource_name:
+                return resource_type.is_single_core()
+        return False
+
+    def is_multi_core(self, resource_name):
+        for resource_type in self.resource_types:
+            if resource_type.resource_name == resource_name:
+                return resource_type.is_multi_core()
+        return False
+
+
 class ResourceSpec(object):
     # attributes
     attributes = (
@@ -105,6 +125,15 @@ def match_job(self, job_spec):
 
         return True
 
+    def is_single_core(self):
+        if self.mincore == 1 and self.maxcore == 1:
+            return True
+        return False
+
+    def is_multi_core(self):
+        if self.mincore > 1:
+            return True
+
     def column_names(cls, prefix=None):
         """
         return column names for DB interactions

From c3928a312c20f07e0cb4317444dbd901c9ed8c60 Mon Sep 17 00:00:00 2001
From: Fernando Harald Barreiro Megino
Date: Wed, 3 Apr 2024 15:31:38 +0200
Subject: [PATCH 2/7] Removing hardcoded resource types

---
 pandaserver/taskbuffer/JobUtils.py     |  8 -----
 pandaserver/taskbuffer/OraDBProxy.py   | 43 ++++++++++++++++++--------
 pandaserver/taskbuffer/ResourceSpec.py | 17 ++++++++++
 pandaserver/taskbuffer/SiteSpec.py     | 10 +-----
 pandaserver/taskbuffer/TaskBuffer.py   |  2 +-
 5 files changed, 49 insertions(+), 31 deletions(-)

diff --git a/pandaserver/taskbuffer/JobUtils.py b/pandaserver/taskbuffer/JobUtils.py
index a185768df..9d8080f42 100644
--- a/pandaserver/taskbuffer/JobUtils.py
+++ b/pandaserver/taskbuffer/JobUtils.py
@@ -26,14 +26,6 @@
 priorityTasksToJumpOver = 1500
 
 
-def translate_resourcetype_to_cores(resource_type, cores_queue):
-    # resolve the multiplying core factor
-    if "MCORE" in resource_type:
-        return cores_queue
-    else:
-        return 1
-
-
 def translate_prodsourcelabel_to_jobtype(queue_type, prodsourcelabel):
     if prodsourcelabel in analy_sources:
         return ANALY_PS
diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py
index dbc11e429..3b3a687cf 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -43,7 +43,11 @@ from pandaserver.taskbuffer.FileSpec import FileSpec from pandaserver.taskbuffer.HarvesterMetricsSpec import HarvesterMetricsSpec from pandaserver.taskbuffer.JobSpec import JobSpec, push_status_changes -from pandaserver.taskbuffer.ResourceSpec import ResourceSpec +from pandaserver.taskbuffer.ResourceSpec import ( + BASIC_RESOURCE_TYPE, + ResourceSpec, + ResourceSpecMapper, +) from pandaserver.taskbuffer.SupErrors import SupErrors from pandaserver.taskbuffer.Utils import create_shards from pandaserver.taskbuffer.WorkerSpec import WorkerSpec @@ -157,7 +161,8 @@ def __init__(self, useOtherError=False): self.__t_update_distribution = None # Timestamp when the HS06s distribution was last updated # resource type mapper - self.__resource_type_mapper = None + # if you want to use it, you need to call __reload_resource_spec_mapper first + self.__resource_spec_mapper = None self.__t_update_resource_type_mapper = None # priority boost @@ -16808,7 +16813,7 @@ def setSiteForEsMerge(self, jobSpec, isFakeCJ, methodName, comment): # set score site to ES job def setScoreSiteToEs(self, jobSpec, methodName, comment): - _logger.debug(f"{methodName} looking for SCORE site") + _logger.debug(f"{methodName} looking for single-core site") # get score PQ in the nucleus associated to the site to run the small ES job sqlSN = "SELECT /* use_json_type */ ps2.panda_site_name " @@ -16866,9 +16871,9 @@ def setScoreSiteToEs(self, jobSpec, methodName, comment): jobSpec.resource_type = self.get_resource_type_job(jobSpec) newSiteName = jobSpec.computingSite if newSiteName is not None: - _logger.info(f"{methodName} set SCORE site to {newSiteName}") + _logger.info(f"{methodName} set single-core site to {newSiteName}") else: - _logger.info(f"{methodName} no SCORE site for {jobSpec.computingSite}") + _logger.info(f"{methodName} no single-core site for {jobSpec.computingSite}") # return return @@ -22671,6 +22676,18 @@ def storePilotLog(self, panda_id, pilot_log): _logger.error(f"{comment}: {type} {value}") return -1 + def __reload_resource_spec_mapper(self): + # update once per hour only + if self.__t_update_resource_type_mapper and self.__t_update_resource_type_mapper > datetime.datetime.now() - datetime.timedelta(hours=1): + return + + # get the resource types from the DB and make the ResourceSpecMapper object + resource_types = self.load_resource_types() + if resource_types: + self.__resource_spec_mapper = ResourceSpecMapper(resource_types) + self.__t_update_resource_type_mapper = datetime.datetime.now() + return + def load_resource_types(self, formatting="spec"): """ Load the resource type table to memory @@ -23418,9 +23435,9 @@ def ups_new_worker_distribution(self, queue, worker_stats): n_cores_queued = 0 harvester_ids_temp = list(worker_stats) - # HIMEM resource types group + # HIMEM limit HIMEM = "HIMEM" - HIMEM_RTS = ["SCORE_HIMEM", "MCORE_HIMEM"] + self.__reload_resource_spec_mapper() # get the configuration for maximum workers of each type pq_data_des = self.get_config_for_pq(queue) @@ -23469,7 +23486,7 @@ def ups_new_worker_distribution(self, queue, worker_stats): for job_type in worker_stats[harvester_id]: workers_queued.setdefault(job_type, {}) for resource_type in worker_stats[harvester_id][job_type]: - core_factor = JobUtils.translate_resourcetype_to_cores(resource_type, cores_queue) + core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, 
cores_queue) try: n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor @@ -23481,7 +23498,7 @@ def ups_new_worker_distribution(self, queue, worker_stats): tmpLog.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}") # This limit is in #CORES, since it mixes single and multi core jobs - if resource_type in HIMEM_RTS and HIMEM in resource_type_limits: + if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits: resource_type_limits[HIMEM] = ( resource_type_limits[HIMEM] - worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor ) @@ -23546,7 +23563,7 @@ def ups_new_worker_distribution(self, queue, worker_stats): activated_jobs = self.cur.fetchall() tmpLog.debug(f"Processing share: {share.name}. Got {len(activated_jobs)} activated jobs") for gshare, prodsourcelabel, resource_type in activated_jobs: - core_factor = JobUtils.translate_resourcetype_to_cores(resource_type, cores_queue) + core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue) # translate prodsourcelabel to a subset of job types, typically 'user' and 'managed' job_type = JobUtils.translate_prodsourcelabel_to_jobtype(queue_type, prodsourcelabel) @@ -23556,7 +23573,7 @@ def ups_new_worker_distribution(self, queue, worker_stats): continue # if we reached the limit for the HIMEM resource type group, skip the job - if resource_type in HIMEM_RTS and HIMEM in resource_type_limits and resource_type_limits[HIMEM] <= 0: + if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits and resource_type_limits[HIMEM] <= 0: # tmpLog.debug('Reached resource type limit for {0}'.format(resource_type)) continue @@ -23590,7 +23607,7 @@ def ups_new_worker_distribution(self, queue, worker_stats): # we don't have enough workers for this resource type new_workers[job_type][resource_type] = -workers_queued[job_type][resource_type] + 1 - # We should still submit a SCORE worker, even if there are no activated jobs to avoid queue deactivation + # We should still submit a basic worker, even if there are no activated jobs to avoid queue deactivation workers = False for job_type in new_workers: for resource_type in new_workers[job_type]: @@ -23598,7 +23615,7 @@ def ups_new_worker_distribution(self, queue, worker_stats): workers = True break if not workers: - new_workers["managed"] = {"SCORE": 1} + new_workers["managed"] = {BASIC_RESOURCE_TYPE: 1} # In case multiple harvester instances are serving a panda queue, split workers evenly between them new_workers_per_harvester = {} diff --git a/pandaserver/taskbuffer/ResourceSpec.py b/pandaserver/taskbuffer/ResourceSpec.py index 08b8af09f..dc5d683a5 100644 --- a/pandaserver/taskbuffer/ResourceSpec.py +++ b/pandaserver/taskbuffer/ResourceSpec.py @@ -5,6 +5,9 @@ from . 
import JobUtils +HIMEM_THRESHOLD = 2000 # MB per core +BASIC_RESOURCE_TYPE = "SCORE" + class ResourceSpecMapper(object): def __init__(self, resource_types): @@ -25,6 +28,20 @@ def is_multi_core(self, resource_name): return resource_type.is_multi_core() return False + def is_high_memory(self, resource_name): + for resource_type in self.resource_types: + if resource_type.resource_name == resource_name: + if resource_type.minrampercore > HIMEM_THRESHOLD: + return True + return False + + def translate_resourcetype_to_cores(self, resource_name, cores_queue): + # if the resource type is multi-core, return the number of cores in the queue + if self.is_multi_core(resource_name): + return cores_queue + + return 1 + class ResourceSpec(object): # attributes diff --git a/pandaserver/taskbuffer/SiteSpec.py b/pandaserver/taskbuffer/SiteSpec.py index 409dbda50..a31d4a146 100644 --- a/pandaserver/taskbuffer/SiteSpec.py +++ b/pandaserver/taskbuffer/SiteSpec.py @@ -82,14 +82,6 @@ def isDirectIO(self): return True return False - # get resource type - def getResourceType(self): - if self.type == "analysis": - return "ANALY" - if self.coreCount > 1: - return "MCORE" - return "SCORE" - # check what type of jobs are allowed def getJobSeed(self): tmpVal = self.jobseed @@ -157,7 +149,7 @@ def get_n_sim_events(self): return None return int(tmpVal) - # get minimum of remainig events for jumbo jobs + # get minimum of remaining events for jumbo jobs def getMinEventsForJumbo(self): tmpVal = self.getValueFromCatchall("minEventsForJumbo") if tmpVal is None: diff --git a/pandaserver/taskbuffer/TaskBuffer.py b/pandaserver/taskbuffer/TaskBuffer.py index aa3db9123..58a9bd57d 100755 --- a/pandaserver/taskbuffer/TaskBuffer.py +++ b/pandaserver/taskbuffer/TaskBuffer.py @@ -3580,7 +3580,7 @@ def commandToHarvester( def getResourceTypes(self): """ - Get resource types (SCORE, MCORE, SCORE_HIMEM, MCORE_HIMEM) and their definitions + Get resource types (SCORE, MCORE, ...) 
and their definitions """ # get DB proxy proxy = self.proxyPool.getProxy() From ba070cb27b17f71ce6a4ee0ad7c8710c97b81c63 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Wed, 3 Apr 2024 18:06:27 +0200 Subject: [PATCH 3/7] Removing hardcoded resource types --- pandaserver/taskbuffer/OraDBProxy.py | 347 ++++++++++++++------------- 1 file changed, 176 insertions(+), 171 deletions(-) diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 3b3a687cf..56b76f531 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -23428,209 +23428,214 @@ def ups_new_worker_distribution(self, queue, worker_stats): comment = " /* DBProxy.ups_new_worker_distribution */" method_name = comment.split(" ")[-2].split(".")[-1] - tmpLog = LogWrapper(_logger, f"{method_name}-{queue}") - tmpLog.debug("start") + tmp_log = LogWrapper(_logger, f"{method_name}-{queue}") + tmp_log.debug("start") n_cores_running = 0 workers_queued = {} n_cores_queued = 0 harvester_ids_temp = list(worker_stats) - # HIMEM limit - HIMEM = "HIMEM" - self.__reload_resource_spec_mapper() + try: + HIMEM = "HIMEM" + self.__reload_resource_spec_mapper() - # get the configuration for maximum workers of each type - pq_data_des = self.get_config_for_pq(queue) - resource_type_limits = {} - queue_type = "production" - if not pq_data_des: - tmpLog.debug("Error retrieving queue configuration from DB, limits can not be applied") - else: - try: - resource_type_limits = pq_data_des["uconfig"]["resource_type_limits"] - except KeyError: - tmpLog.debug("No resource type limits") - pass - try: - queue_type = pq_data_des["type"] - except KeyError: - tmpLog.error("No queue type") - pass + # get the configuration for maximum workers of each type + pq_data_des = self.get_config_for_pq(queue) + resource_type_limits = {} + queue_type = "production" + if not pq_data_des: + tmp_log.debug("Error retrieving queue configuration from DB, limits can not be applied") + else: + try: + resource_type_limits = pq_data_des["uconfig"]["resource_type_limits"] + except KeyError: + tmp_log.debug("No resource type limits") + pass + try: + queue_type = pq_data_des["type"] + except KeyError: + tmp_log.error("No queue type") + pass + try: + cores_queue = pq_data_des["corecount"] + if not cores_queue: + cores_queue = 1 + except KeyError: + tmp_log.error("No corecount") + pass + + # Retrieve the assigned harvester instance and submit UPS commands only to this instance. We have had multiple + # cases of test instances submitting to large queues in classic pull mode and not following commands. try: - cores_queue = pq_data_des["corecount"] - if not cores_queue: - cores_queue = 1 - except KeyError: - tmpLog.error("No corecount") - pass + assigned_harvester_id = pq_data_des["harvester"] + except KeyErrorException: + assigned_harvester_id = None - # Retrieve the assigned harvester instance and submit UPS commands only to this instance. We have had multiple - # cases of test instances submitting to large queues in classic pull mode and not following commands. 
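For orientation while reading this hunk: the series replaces the old substring test (`if "MCORE" in resource_type`) with table-driven lookups on ResourceSpecMapper. Below is a standalone sketch of that mapping logic — single/multi-core from mincore/maxcore, high memory from minrampercore against the 2000 MB-per-core threshold, and the core factor used in the accounting that follows. The Spec/Mapper classes and all values here are hypothetical stand-ins; the real ResourceSpec rows are loaded from the DB by load_resource_types().

HIMEM_THRESHOLD = 2000  # MB per core, as defined in ResourceSpec.py

class Spec:
    """Minimal stand-in for ResourceSpec: only the fields the mapper reads."""
    def __init__(self, resource_name, mincore=None, maxcore=None, minrampercore=None):
        self.resource_name = resource_name
        self.mincore = mincore
        self.maxcore = maxcore
        self.minrampercore = minrampercore

class Mapper:
    """Minimal stand-in for ResourceSpecMapper."""
    def __init__(self, resource_types):
        self.resource_types = resource_types

    def is_multi_core(self, name):
        # multi-core means the type requires more than one core
        return any(rt.resource_name == name and rt.mincore is not None and rt.mincore > 1
                   for rt in self.resource_types)

    def is_high_memory(self, name):
        # high memory means more RAM per core than the threshold
        return any(rt.resource_name == name and rt.minrampercore is not None and rt.minrampercore > HIMEM_THRESHOLD
                   for rt in self.resource_types)

    def translate_resourcetype_to_cores(self, name, cores_queue):
        # a multi-core worker occupies the queue's full core count, else one core
        return cores_queue if self.is_multi_core(name) else 1

mapper = Mapper([
    Spec("SCORE", mincore=1, maxcore=1),
    Spec("MCORE", mincore=2),
    Spec("MCORE_HIMEM", mincore=2, minrampercore=3000),
])
assert mapper.translate_resourcetype_to_cores("MCORE", 8) == 8
assert mapper.translate_resourcetype_to_cores("SCORE", 8) == 1
assert mapper.is_high_memory("MCORE_HIMEM") and not mapper.is_high_memory("MCORE")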
- try: - assigned_harvester_id = pq_data_des["harvester"] - except KeyErrorException: - assigned_harvester_id = None + harvester_ids = [] + # If the assigned instance is working, use it for the statistics + if assigned_harvester_id in harvester_ids_temp: + harvester_ids = [assigned_harvester_id] - harvester_ids = [] - # If the assigned instance is working, use it for the statistics - if assigned_harvester_id in harvester_ids_temp: - harvester_ids = [assigned_harvester_id] + # Filter central harvester instances that support UPS model + else: + for harvester_id in harvester_ids_temp: + if "ACT" not in harvester_id and "test_fbarreir" not in harvester_id and "cern_cloud" not in harvester_id: + harvester_ids.append(harvester_id) + + for harvester_id in harvester_ids: + for job_type in worker_stats[harvester_id]: + workers_queued.setdefault(job_type, {}) + for resource_type in worker_stats[harvester_id][job_type]: + core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue) + try: + n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor - # Filter central harvester instances that support UPS model - else: - for harvester_id in harvester_ids_temp: - if "ACT" not in harvester_id and "test_fbarreir" not in harvester_id and "cern_cloud" not in harvester_id: - harvester_ids.append(harvester_id) - - for harvester_id in harvester_ids: - for job_type in worker_stats[harvester_id]: - workers_queued.setdefault(job_type, {}) - for resource_type in worker_stats[harvester_id][job_type]: - core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue) - try: - n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor + # This limit is in #JOBS or #WORKERS + if resource_type in resource_type_limits: + resource_type_limits[resource_type] = ( + resource_type_limits[resource_type] - worker_stats[harvester_id][job_type][resource_type]["running"] + ) + tmp_log.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}") - # This limit is in #JOBS or #WORKERS - if resource_type in resource_type_limits: - resource_type_limits[resource_type] = ( - resource_type_limits[resource_type] - worker_stats[harvester_id][job_type][resource_type]["running"] + # This limit is in #CORES, since it mixes single and multi core jobs + if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits: + resource_type_limits[HIMEM] = ( + resource_type_limits[HIMEM] - worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor + ) + tmp_log.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}") + + except KeyError: + pass + + try: # submitted + workers_queued[job_type].setdefault(resource_type, 0) + workers_queued[job_type][resource_type] = ( + workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["submitted"] ) - tmpLog.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}") + n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["submitted"] * core_factor + except KeyError: + pass - # This limit is in #CORES, since it mixes single and multi core jobs - if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits: - resource_type_limits[HIMEM] = ( - resource_type_limits[HIMEM] - 
worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor + try: # ready + workers_queued[job_type].setdefault(resource_type, 0) + workers_queued[job_type][resource_type] = ( + workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["ready"] ) - tmpLog.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}") + n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["ready"] * core_factor + except KeyError: + pass - except KeyError: - pass + tmp_log.debug(f"Queue {queue} queued worker overview: {workers_queued}") - try: # submitted - workers_queued[job_type].setdefault(resource_type, 0) - workers_queued[job_type][resource_type] = ( - workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["submitted"] - ) - n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["submitted"] * core_factor - except KeyError: - pass - - try: # ready - workers_queued[job_type].setdefault(resource_type, 0) - workers_queued[job_type][resource_type] = ( - workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["ready"] - ) - n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["ready"] * core_factor - except KeyError: - pass + # For queues that need more pressure towards reaching a target + n_cores_running_fake = 0 + try: + if pq_data_des["status"] in [ + "online", + "brokeroff", + ]: # don't flood test sites with workers + n_cores_running_fake = pq_data_des["params"]["ups_core_target"] + tmp_log.debug(f"Using ups_core_target {n_cores_running_fake} for queue {queue}") + except KeyError: # no value defined in CRIC + pass - tmpLog.debug(f"Queue {queue} queued worker overview: {workers_queued}") + n_cores_running = max(n_cores_running, n_cores_running_fake) - # For queues that need more pressure towards reaching a target - n_cores_running_fake = 0 - try: - if pq_data_des["status"] in [ - "online", - "brokeroff", - ]: # don't flood test sites with workers - n_cores_running_fake = pq_data_des["params"]["ups_core_target"] - tmpLog.debug(f"Using ups_core_target {n_cores_running_fake} for queue {queue}") - except KeyError: # no value defined in AGIS - pass + n_cores_target = max(int(n_cores_running * 0.4), 75 * cores_queue) + n_cores_to_submit = max(n_cores_target - n_cores_queued, 5 * cores_queue) + tmp_log.debug( + f"IN CORES: nrunning {n_cores_running}, ntarget {n_cores_target}, nqueued {n_cores_queued}. We need to process {n_cores_to_submit} cores" + ) - n_cores_running = max(n_cores_running, n_cores_running_fake) + # Get the sorted global shares + sorted_shares = self.get_sorted_leaves() - n_cores_target = max(int(n_cores_running * 0.4), 75 * cores_queue) - n_cores_to_submit = max(n_cores_target - n_cores_queued, 5 * cores_queue) - tmpLog.debug(f"IN CORES: nrunning {n_cores_running}, ntarget {n_cores_target}, nqueued {n_cores_queued}. 
We need to process {n_cores_to_submit} cores") + # Run over the activated jobs by gshare & priority, and substract them from the queued + # A negative value for queued will mean more pilots of that resource type are missing + for share in sorted_shares: + var_map = {":queue": queue, ":gshare": share.name} + sql = f""" + SELECT gshare, prodsourcelabel, resource_type FROM {panda_config.schemaPANDA}.jobsactive4 + WHERE jobstatus = 'activated' + AND computingsite=:queue + AND gshare=:gshare + ORDER BY currentpriority DESC + """ + self.cur.execute(sql + comment, var_map) + activated_jobs = self.cur.fetchall() + tmp_log.debug(f"Processing share: {share.name}. Got {len(activated_jobs)} activated jobs") + for gshare, prodsourcelabel, resource_type in activated_jobs: + core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue) - # Get the sorted global shares - sorted_shares = self.get_sorted_leaves() + # translate prodsourcelabel to a subset of job types, typically 'user' and 'managed' + job_type = JobUtils.translate_prodsourcelabel_to_jobtype(queue_type, prodsourcelabel) + # if we reached the limit for the resource type, skip the job + if resource_type in resource_type_limits and resource_type_limits[resource_type] <= 0: + # tmp_log.debug('Reached resource type limit for {0}'.format(resource_type)) + continue - # Run over the activated jobs by gshare & priority, and substract them from the queued - # A negative value for queued will mean more pilots of that resource type are missing - for share in sorted_shares: - var_map = {":queue": queue, ":gshare": share.name} - sql = f""" - SELECT gshare, prodsourcelabel, resource_type FROM {panda_config.schemaPANDA}.jobsactive4 - WHERE jobstatus = 'activated' - AND computingsite=:queue - AND gshare=:gshare - ORDER BY currentpriority DESC - """ - self.cur.execute(sql + comment, var_map) - activated_jobs = self.cur.fetchall() - tmpLog.debug(f"Processing share: {share.name}. 
Got {len(activated_jobs)} activated jobs") - for gshare, prodsourcelabel, resource_type in activated_jobs: - core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue) - - # translate prodsourcelabel to a subset of job types, typically 'user' and 'managed' - job_type = JobUtils.translate_prodsourcelabel_to_jobtype(queue_type, prodsourcelabel) - # if we reached the limit for the resource type, skip the job - if resource_type in resource_type_limits and resource_type_limits[resource_type] <= 0: - # tmpLog.debug('Reached resource type limit for {0}'.format(resource_type)) - continue + # if we reached the limit for the HIMEM resource type group, skip the job + if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits and resource_type_limits[HIMEM] <= 0: + # tmp_log.debug('Reached resource type limit for {0}'.format(resource_type)) + continue - # if we reached the limit for the HIMEM resource type group, skip the job - if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits and resource_type_limits[HIMEM] <= 0: - # tmpLog.debug('Reached resource type limit for {0}'.format(resource_type)) - continue + workers_queued.setdefault(job_type, {}) + workers_queued[job_type].setdefault(resource_type, 0) + workers_queued[job_type][resource_type] = workers_queued[job_type][resource_type] - 1 + if workers_queued[job_type][resource_type] <= 0: + # we've gone over the jobs that already have a queued worker, now we go for new workers + n_cores_to_submit = n_cores_to_submit - core_factor - workers_queued.setdefault(job_type, {}) - workers_queued[job_type].setdefault(resource_type, 0) - workers_queued[job_type][resource_type] = workers_queued[job_type][resource_type] - 1 - if workers_queued[job_type][resource_type] <= 0: - # we've gone over the jobs that already have a queued worker, now we go for new workers - n_cores_to_submit = n_cores_to_submit - core_factor + # We reached the number of workers needed + if n_cores_to_submit <= 0: + tmp_log.debug("Reached cores needed (inner)") + break # We reached the number of workers needed if n_cores_to_submit <= 0: - tmpLog.debug("Reached cores needed (inner)") - break - - # We reached the number of workers needed - if n_cores_to_submit <= 0: - tmpLog.debug("Reached cores needed (outer)") - break - - tmpLog.debug(f"workers_queued: {workers_queued}") - - new_workers = {} - for job_type in workers_queued: - new_workers.setdefault(job_type, {}) - for resource_type in workers_queued[job_type]: - if workers_queued[job_type][resource_type] >= 0: - # we have too many workers queued already, don't submit more - new_workers[job_type][resource_type] = 0 - elif workers_queued[job_type][resource_type] < 0: - # we don't have enough workers for this resource type - new_workers[job_type][resource_type] = -workers_queued[job_type][resource_type] + 1 - - # We should still submit a basic worker, even if there are no activated jobs to avoid queue deactivation - workers = False - for job_type in new_workers: - for resource_type in new_workers[job_type]: - if new_workers[job_type][resource_type] > 0: - workers = True + tmp_log.debug("Reached cores needed (outer)") break - if not workers: - new_workers["managed"] = {BASIC_RESOURCE_TYPE: 1} - # In case multiple harvester instances are serving a panda queue, split workers evenly between them - new_workers_per_harvester = {} - for harvester_id in harvester_ids: - new_workers_per_harvester.setdefault(harvester_id, {}) + 
tmp_log.debug(f"workers_queued: {workers_queued}") + + new_workers = {} + for job_type in workers_queued: + new_workers.setdefault(job_type, {}) + for resource_type in workers_queued[job_type]: + if workers_queued[job_type][resource_type] >= 0: + # we have too many workers queued already, don't submit more + new_workers[job_type][resource_type] = 0 + elif workers_queued[job_type][resource_type] < 0: + # we don't have enough workers for this resource type + new_workers[job_type][resource_type] = -workers_queued[job_type][resource_type] + 1 + + # We should still submit a basic worker, even if there are no activated jobs to avoid queue deactivation + workers = False for job_type in new_workers: - new_workers_per_harvester[harvester_id].setdefault(job_type, {}) for resource_type in new_workers[job_type]: - new_workers_per_harvester[harvester_id][job_type][resource_type] = int( - math.ceil(new_workers[job_type][resource_type] * 1.0 / len(harvester_ids)) - ) + if new_workers[job_type][resource_type] > 0: + workers = True + break + if not workers: + new_workers["managed"] = {BASIC_RESOURCE_TYPE: 1} + + # In case multiple harvester instances are serving a panda queue, split workers evenly between them + new_workers_per_harvester = {} + for harvester_id in harvester_ids: + new_workers_per_harvester.setdefault(harvester_id, {}) + for job_type in new_workers: + new_workers_per_harvester[harvester_id].setdefault(job_type, {}) + for resource_type in new_workers[job_type]: + new_workers_per_harvester[harvester_id][job_type][resource_type] = int( + math.ceil(new_workers[job_type][resource_type] * 1.0 / len(harvester_ids)) + ) - tmpLog.debug(f"Workers to submit: {new_workers_per_harvester}") - tmpLog.debug("done") - return new_workers_per_harvester + tmp_log.debug(f"Workers to submit: {new_workers_per_harvester}") + tmp_log.debug("done") + return new_workers_per_harvester + except: + tmp_log.error(traceback.format_exc()) + return {} # get active consumers def getActiveConsumers(self, jediTaskID, jobsetID, myPandaID): From 4dded983c4fee527f08976562131ec433e952a61 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Wed, 3 Apr 2024 18:32:49 +0200 Subject: [PATCH 4/7] Removing hardcoded resource types - bugs --- pandaserver/taskbuffer/ResourceSpec.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pandaserver/taskbuffer/ResourceSpec.py b/pandaserver/taskbuffer/ResourceSpec.py index dc5d683a5..9c19d5691 100644 --- a/pandaserver/taskbuffer/ResourceSpec.py +++ b/pandaserver/taskbuffer/ResourceSpec.py @@ -31,7 +31,7 @@ def is_multi_core(self, resource_name): def is_high_memory(self, resource_name): for resource_type in self.resource_types: if resource_type.resource_name == resource_name: - if resource_type.minrampercore > HIMEM_THRESHOLD: + if resource_type.minrampercore is not None and resource_type.minrampercore > HIMEM_THRESHOLD: return True return False @@ -143,12 +143,12 @@ def match_job(self, job_spec): return True def is_single_core(self): - if self.mincore == 1 and self.maxcore == 1: + if self.mincore is not None and self.mincore == 1 and self.maxcore is not None and self.maxcore == 1: return True return False def is_multi_core(self): - if self.mincore > 1: + if self.mincore is not None and self.mincore > 1: return True def column_names(cls, prefix=None): From 7e1263b3ffc685f5236599505adbef8bac0da553 Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Wed, 3 Apr 2024 18:40:11 +0200 Subject: [PATCH 5/7] Removing hardcoded resource types --- 
pandaserver/taskbuffer/OraDBProxy.py | 344 +++++++++++++-------------- 1 file changed, 169 insertions(+), 175 deletions(-) diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 56b76f531..ae5b4a6ad 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -23435,207 +23435,201 @@ def ups_new_worker_distribution(self, queue, worker_stats): n_cores_queued = 0 harvester_ids_temp = list(worker_stats) - try: - HIMEM = "HIMEM" - self.__reload_resource_spec_mapper() - - # get the configuration for maximum workers of each type - pq_data_des = self.get_config_for_pq(queue) - resource_type_limits = {} - queue_type = "production" - if not pq_data_des: - tmp_log.debug("Error retrieving queue configuration from DB, limits can not be applied") - else: - try: - resource_type_limits = pq_data_des["uconfig"]["resource_type_limits"] - except KeyError: - tmp_log.debug("No resource type limits") - pass - try: - queue_type = pq_data_des["type"] - except KeyError: - tmp_log.error("No queue type") - pass - try: - cores_queue = pq_data_des["corecount"] - if not cores_queue: - cores_queue = 1 - except KeyError: - tmp_log.error("No corecount") - pass - - # Retrieve the assigned harvester instance and submit UPS commands only to this instance. We have had multiple - # cases of test instances submitting to large queues in classic pull mode and not following commands. + HIMEM = "HIMEM" + self.__reload_resource_spec_mapper() + + # get the configuration for maximum workers of each type + pq_data_des = self.get_config_for_pq(queue) + resource_type_limits = {} + queue_type = "production" + if not pq_data_des: + tmp_log.debug("Error retrieving queue configuration from DB, limits can not be applied") + else: try: - assigned_harvester_id = pq_data_des["harvester"] - except KeyErrorException: - assigned_harvester_id = None - - harvester_ids = [] - # If the assigned instance is working, use it for the statistics - if assigned_harvester_id in harvester_ids_temp: - harvester_ids = [assigned_harvester_id] - - # Filter central harvester instances that support UPS model - else: - for harvester_id in harvester_ids_temp: - if "ACT" not in harvester_id and "test_fbarreir" not in harvester_id and "cern_cloud" not in harvester_id: - harvester_ids.append(harvester_id) - - for harvester_id in harvester_ids: - for job_type in worker_stats[harvester_id]: - workers_queued.setdefault(job_type, {}) - for resource_type in worker_stats[harvester_id][job_type]: - core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue) - try: - n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor + resource_type_limits = pq_data_des["uconfig"]["resource_type_limits"] + except KeyError: + tmp_log.debug("No resource type limits") + pass + try: + queue_type = pq_data_des["type"] + except KeyError: + tmp_log.error("No queue type") + pass + try: + cores_queue = pq_data_des["corecount"] + if not cores_queue: + cores_queue = 1 + except KeyError: + tmp_log.error("No corecount") + pass - # This limit is in #JOBS or #WORKERS - if resource_type in resource_type_limits: - resource_type_limits[resource_type] = ( - resource_type_limits[resource_type] - worker_stats[harvester_id][job_type][resource_type]["running"] - ) - tmp_log.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}") + # Retrieve the assigned harvester instance and submit UPS commands only to this 
instance. We have had multiple + # cases of test instances submitting to large queues in classic pull mode and not following commands. + try: + assigned_harvester_id = pq_data_des["harvester"] + except KeyErrorException: + assigned_harvester_id = None - # This limit is in #CORES, since it mixes single and multi core jobs - if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits: - resource_type_limits[HIMEM] = ( - resource_type_limits[HIMEM] - worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor - ) - tmp_log.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}") + harvester_ids = [] + # If the assigned instance is working, use it for the statistics + if assigned_harvester_id in harvester_ids_temp: + harvester_ids = [assigned_harvester_id] - except KeyError: - pass + # Filter central harvester instances that support UPS model + else: + for harvester_id in harvester_ids_temp: + if "ACT" not in harvester_id and "test_fbarreir" not in harvester_id and "cern_cloud" not in harvester_id: + harvester_ids.append(harvester_id) + + for harvester_id in harvester_ids: + for job_type in worker_stats[harvester_id]: + workers_queued.setdefault(job_type, {}) + for resource_type in worker_stats[harvester_id][job_type]: + core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue) + try: + n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor - try: # submitted - workers_queued[job_type].setdefault(resource_type, 0) - workers_queued[job_type][resource_type] = ( - workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["submitted"] + # This limit is in #JOBS or #WORKERS + if resource_type in resource_type_limits: + resource_type_limits[resource_type] = ( + resource_type_limits[resource_type] - worker_stats[harvester_id][job_type][resource_type]["running"] ) - n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["submitted"] * core_factor - except KeyError: - pass + tmp_log.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}") - try: # ready - workers_queued[job_type].setdefault(resource_type, 0) - workers_queued[job_type][resource_type] = ( - workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["ready"] + # This limit is in #CORES, since it mixes single and multi core jobs + if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits: + resource_type_limits[HIMEM] = ( + resource_type_limits[HIMEM] - worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor ) - n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["ready"] * core_factor - except KeyError: - pass + tmp_log.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}") - tmp_log.debug(f"Queue {queue} queued worker overview: {workers_queued}") + except KeyError: + pass - # For queues that need more pressure towards reaching a target - n_cores_running_fake = 0 - try: - if pq_data_des["status"] in [ - "online", - "brokeroff", - ]: # don't flood test sites with workers - n_cores_running_fake = pq_data_des["params"]["ups_core_target"] - tmp_log.debug(f"Using ups_core_target {n_cores_running_fake} for queue {queue}") - except KeyError: # no value defined in CRIC - pass + try: # submitted + 
workers_queued[job_type].setdefault(resource_type, 0) + workers_queued[job_type][resource_type] = ( + workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["submitted"] + ) + n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["submitted"] * core_factor + except KeyError: + pass + + try: # ready + workers_queued[job_type].setdefault(resource_type, 0) + workers_queued[job_type][resource_type] = ( + workers_queued[job_type][resource_type] + worker_stats[harvester_id][job_type][resource_type]["ready"] + ) + n_cores_queued = n_cores_queued + worker_stats[harvester_id][job_type][resource_type]["ready"] * core_factor + except KeyError: + pass - n_cores_running = max(n_cores_running, n_cores_running_fake) + tmp_log.debug(f"Queue {queue} queued worker overview: {workers_queued}") - n_cores_target = max(int(n_cores_running * 0.4), 75 * cores_queue) - n_cores_to_submit = max(n_cores_target - n_cores_queued, 5 * cores_queue) - tmp_log.debug( - f"IN CORES: nrunning {n_cores_running}, ntarget {n_cores_target}, nqueued {n_cores_queued}. We need to process {n_cores_to_submit} cores" - ) + # For queues that need more pressure towards reaching a target + n_cores_running_fake = 0 + try: + if pq_data_des["status"] in [ + "online", + "brokeroff", + ]: # don't flood test sites with workers + n_cores_running_fake = pq_data_des["params"]["ups_core_target"] + tmp_log.debug(f"Using ups_core_target {n_cores_running_fake} for queue {queue}") + except KeyError: # no value defined in CRIC + pass - # Get the sorted global shares - sorted_shares = self.get_sorted_leaves() + n_cores_running = max(n_cores_running, n_cores_running_fake) - # Run over the activated jobs by gshare & priority, and substract them from the queued - # A negative value for queued will mean more pilots of that resource type are missing - for share in sorted_shares: - var_map = {":queue": queue, ":gshare": share.name} - sql = f""" - SELECT gshare, prodsourcelabel, resource_type FROM {panda_config.schemaPANDA}.jobsactive4 - WHERE jobstatus = 'activated' - AND computingsite=:queue - AND gshare=:gshare - ORDER BY currentpriority DESC - """ - self.cur.execute(sql + comment, var_map) - activated_jobs = self.cur.fetchall() - tmp_log.debug(f"Processing share: {share.name}. Got {len(activated_jobs)} activated jobs") - for gshare, prodsourcelabel, resource_type in activated_jobs: - core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue) + n_cores_target = max(int(n_cores_running * 0.4), 75 * cores_queue) + n_cores_to_submit = max(n_cores_target - n_cores_queued, 5 * cores_queue) + tmp_log.debug(f"IN CORES: nrunning {n_cores_running}, ntarget {n_cores_target}, nqueued {n_cores_queued}. 
We need to process {n_cores_to_submit} cores") - # translate prodsourcelabel to a subset of job types, typically 'user' and 'managed' - job_type = JobUtils.translate_prodsourcelabel_to_jobtype(queue_type, prodsourcelabel) - # if we reached the limit for the resource type, skip the job - if resource_type in resource_type_limits and resource_type_limits[resource_type] <= 0: - # tmp_log.debug('Reached resource type limit for {0}'.format(resource_type)) - continue + # Get the sorted global shares + sorted_shares = self.get_sorted_leaves() - # if we reached the limit for the HIMEM resource type group, skip the job - if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits and resource_type_limits[HIMEM] <= 0: - # tmp_log.debug('Reached resource type limit for {0}'.format(resource_type)) - continue + # Run over the activated jobs by gshare & priority, and substract them from the queued + # A negative value for queued will mean more pilots of that resource type are missing + for share in sorted_shares: + var_map = {":queue": queue, ":gshare": share.name} + sql = f""" + SELECT gshare, prodsourcelabel, resource_type FROM {panda_config.schemaPANDA}.jobsactive4 + WHERE jobstatus = 'activated' + AND computingsite=:queue + AND gshare=:gshare + ORDER BY currentpriority DESC + """ + self.cur.execute(sql + comment, var_map) + activated_jobs = self.cur.fetchall() + tmp_log.debug(f"Processing share: {share.name}. Got {len(activated_jobs)} activated jobs") + for gshare, prodsourcelabel, resource_type in activated_jobs: + core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue) + + # translate prodsourcelabel to a subset of job types, typically 'user' and 'managed' + job_type = JobUtils.translate_prodsourcelabel_to_jobtype(queue_type, prodsourcelabel) + # if we reached the limit for the resource type, skip the job + if resource_type in resource_type_limits and resource_type_limits[resource_type] <= 0: + # tmp_log.debug('Reached resource type limit for {0}'.format(resource_type)) + continue - workers_queued.setdefault(job_type, {}) - workers_queued[job_type].setdefault(resource_type, 0) - workers_queued[job_type][resource_type] = workers_queued[job_type][resource_type] - 1 - if workers_queued[job_type][resource_type] <= 0: - # we've gone over the jobs that already have a queued worker, now we go for new workers - n_cores_to_submit = n_cores_to_submit - core_factor + # if we reached the limit for the HIMEM resource type group, skip the job + if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits and resource_type_limits[HIMEM] <= 0: + # tmp_log.debug('Reached resource type limit for {0}'.format(resource_type)) + continue - # We reached the number of workers needed - if n_cores_to_submit <= 0: - tmp_log.debug("Reached cores needed (inner)") - break + workers_queued.setdefault(job_type, {}) + workers_queued[job_type].setdefault(resource_type, 0) + workers_queued[job_type][resource_type] = workers_queued[job_type][resource_type] - 1 + if workers_queued[job_type][resource_type] <= 0: + # we've gone over the jobs that already have a queued worker, now we go for new workers + n_cores_to_submit = n_cores_to_submit - core_factor # We reached the number of workers needed if n_cores_to_submit <= 0: - tmp_log.debug("Reached cores needed (outer)") + tmp_log.debug("Reached cores needed (inner)") + break + + # We reached the number of workers needed + if n_cores_to_submit <= 0: + tmp_log.debug("Reached 
cores needed (outer)") + break + + tmp_log.debug(f"workers_queued: {workers_queued}") + + new_workers = {} + for job_type in workers_queued: + new_workers.setdefault(job_type, {}) + for resource_type in workers_queued[job_type]: + if workers_queued[job_type][resource_type] >= 0: + # we have too many workers queued already, don't submit more + new_workers[job_type][resource_type] = 0 + elif workers_queued[job_type][resource_type] < 0: + # we don't have enough workers for this resource type + new_workers[job_type][resource_type] = -workers_queued[job_type][resource_type] + 1 + + # We should still submit a basic worker, even if there are no activated jobs to avoid queue deactivation + workers = False + for job_type in new_workers: + for resource_type in new_workers[job_type]: + if new_workers[job_type][resource_type] > 0: + workers = True break + if not workers: + new_workers["managed"] = {BASIC_RESOURCE_TYPE: 1} - tmp_log.debug(f"workers_queued: {workers_queued}") - - new_workers = {} - for job_type in workers_queued: - new_workers.setdefault(job_type, {}) - for resource_type in workers_queued[job_type]: - if workers_queued[job_type][resource_type] >= 0: - # we have too many workers queued already, don't submit more - new_workers[job_type][resource_type] = 0 - elif workers_queued[job_type][resource_type] < 0: - # we don't have enough workers for this resource type - new_workers[job_type][resource_type] = -workers_queued[job_type][resource_type] + 1 - - # We should still submit a basic worker, even if there are no activated jobs to avoid queue deactivation - workers = False + # In case multiple harvester instances are serving a panda queue, split workers evenly between them + new_workers_per_harvester = {} + for harvester_id in harvester_ids: + new_workers_per_harvester.setdefault(harvester_id, {}) for job_type in new_workers: + new_workers_per_harvester[harvester_id].setdefault(job_type, {}) for resource_type in new_workers[job_type]: - if new_workers[job_type][resource_type] > 0: - workers = True - break - if not workers: - new_workers["managed"] = {BASIC_RESOURCE_TYPE: 1} - - # In case multiple harvester instances are serving a panda queue, split workers evenly between them - new_workers_per_harvester = {} - for harvester_id in harvester_ids: - new_workers_per_harvester.setdefault(harvester_id, {}) - for job_type in new_workers: - new_workers_per_harvester[harvester_id].setdefault(job_type, {}) - for resource_type in new_workers[job_type]: - new_workers_per_harvester[harvester_id][job_type][resource_type] = int( - math.ceil(new_workers[job_type][resource_type] * 1.0 / len(harvester_ids)) - ) + new_workers_per_harvester[harvester_id][job_type][resource_type] = int( + math.ceil(new_workers[job_type][resource_type] * 1.0 / len(harvester_ids)) + ) - tmp_log.debug(f"Workers to submit: {new_workers_per_harvester}") - tmp_log.debug("done") - return new_workers_per_harvester - except: - tmp_log.error(traceback.format_exc()) - return {} + tmp_log.debug(f"Workers to submit: {new_workers_per_harvester}") + tmp_log.debug("done") + return new_workers_per_harvester # get active consumers def getActiveConsumers(self, jediTaskID, jobsetID, myPandaID): From 81b5d3e20838b185a7e8ee75219e25bdcc68256a Mon Sep 17 00:00:00 2001 From: Fernando Harald Barreiro Megino Date: Fri, 5 Apr 2024 13:43:31 +0200 Subject: [PATCH 6/7] Locking rows in reportWorkerStats_jobtype --- pandaserver/taskbuffer/OraDBProxy.py | 72 ++++++++++++++++------------ 1 file changed, 41 insertions(+), 31 deletions(-) diff --git 
a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py index 4df5583dd..2b04a824c 100644 --- a/pandaserver/taskbuffer/OraDBProxy.py +++ b/pandaserver/taskbuffer/OraDBProxy.py @@ -22890,57 +22890,67 @@ def reportWorkerStats(self, harvesterID, siteName, paramsList): return False, "database error" # update stat of workers with jobtype breakdown - def reportWorkerStats_jobtype(self, harvesterID, siteName, paramsList): + def reportWorkerStats_jobtype(self, harvesterID, siteName, parameter_list): comment = " /* DBProxy.reportWorkerStats_jobtype */" - methodName = comment.split(" ")[-2].split(".")[-1] - tmpLog = LogWrapper( + method_name = comment.split(" ")[-2].split(".")[-1] + tmp_log = LogWrapper( _logger, - methodName + f" < harvesterID={harvesterID} siteName={siteName} >", + f"{method_name} < harvesterID={harvesterID} siteName={siteName} >", ) - tmpLog.debug("start") - tmpLog.debug(f"params={str(paramsList)}") + tmp_log.debug("start") + tmp_log.debug(f"params={str(parameter_list)}") try: # load new site data - paramsList = json.loads(paramsList) + parameter_list = json.loads(parameter_list) # set autocommit on self.conn.begin() - # delete old site data - sqlDel = "DELETE FROM ATLAS_PANDA.Harvester_Worker_Stats " - sqlDel += "WHERE harvester_ID=:harvesterID AND computingSite=:siteName " - varMap = dict() - varMap[":harvesterID"] = harvesterID - varMap[":siteName"] = siteName - self.cur.execute(sqlDel + comment, varMap) + + # lock the site data rows + var_map = dict() + var_map[":harvesterID"] = harvesterID + var_map[":siteName"] = siteName + sql_lock = "SELECT harvester_ID, computingSite FROM ATLAS_PANDA.Harvester_Worker_Stats " + sql_lock += "WHERE harvester_ID=:harvesterID AND computingSite=:siteName FOR UPDATE NOWAIT " + self.cur.execute(sql_lock + comment, var_map) + + # delete them + sql_delete = "DELETE FROM ATLAS_PANDA.Harvester_Worker_Stats " + sql_delete += "WHERE harvester_ID=:harvesterID AND computingSite=:siteName " + self.cur.execute(sql_delete + comment, var_map) + # insert new site data - sqlI = "INSERT INTO ATLAS_PANDA.Harvester_Worker_Stats (harvester_ID, computingSite, jobType, resourceType, status, n_workers, lastUpdate) " - sqlI += "VALUES (:harvester_ID, :siteName, :jobType, :resourceType, :status, :n_workers, CURRENT_DATE) " + sql_insert = "INSERT INTO ATLAS_PANDA.Harvester_Worker_Stats (harvester_ID, computingSite, jobType, resourceType, status, n_workers, lastUpdate) " + sql_insert += "VALUES (:harvester_ID, :siteName, :jobType, :resourceType, :status, :n_workers, CURRENT_DATE) " - for jobType in paramsList: - jt_params = paramsList[jobType] + var_map_list = [] + for jobType in parameter_list: + jt_params = parameter_list[jobType] for resourceType in jt_params: params = jt_params[resourceType] if resourceType == "Undefined": continue for status in params: n_workers = params[status] - varMap = dict() - varMap[":harvester_ID"] = harvesterID - varMap[":siteName"] = siteName - varMap[":status"] = status - varMap[":jobType"] = jobType - varMap[":resourceType"] = resourceType - varMap[":n_workers"] = n_workers - self.cur.execute(sqlI + comment, varMap) - # commit + var_map = { + ":harvester_ID": harvesterID, + ":siteName": siteName, + ":status": status, + ":jobType": jobType, + ":resourceType": resourceType, + ":n_workers": n_workers, + } + var_map_list.append(var_map) + + self.cur.executemany(sql_insert + comment, var_map_list) + if not self._commit(): raise RuntimeError("Commit error") - # return - tmpLog.debug("done") + + tmp_log.debug("done") 
             return True, "OK"
         except Exception:
-            # roll back
             self._rollback()
-            self.dumpErrorMessage(tmpLog, methodName)
+            self.dumpErrorMessage(tmp_log, method_name)
             return False, "database error"
 
     # get stat of workers

From d9510e39c2b993dc22b331b7ea4899670a188722 Mon Sep 17 00:00:00 2001
From: Fernando Harald Barreiro Megino
Date: Fri, 5 Apr 2024 14:11:42 +0200
Subject: [PATCH 7/7] Locking rows in reportWorkerStats_jobtype

---
 pandaserver/taskbuffer/OraDBProxy.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/pandaserver/taskbuffer/OraDBProxy.py b/pandaserver/taskbuffer/OraDBProxy.py
index 2b04a824c..454a293a7 100644
--- a/pandaserver/taskbuffer/OraDBProxy.py
+++ b/pandaserver/taskbuffer/OraDBProxy.py
@@ -22911,7 +22911,14 @@ def reportWorkerStats_jobtype(self, harvesterID, siteName, parameter_list):
         var_map[":siteName"] = siteName
         sql_lock = "SELECT harvester_ID, computingSite FROM ATLAS_PANDA.Harvester_Worker_Stats "
         sql_lock += "WHERE harvester_ID=:harvesterID AND computingSite=:siteName FOR UPDATE NOWAIT "
-        self.cur.execute(sql_lock + comment, var_map)
+        try:
+            self.cur.execute(sql_lock + comment, var_map)
+        except Exception:
+            self._rollback()
+            message = "rows locked by another update"
+            tmp_log.debug(message)
+            tmp_log.debug("done")
+            return False, message
 
         # delete them
         sql_delete = "DELETE FROM ATLAS_PANDA.Harvester_Worker_Stats "
         sql_delete += "WHERE harvester_ID=:harvesterID AND computingSite=:siteName "
@@ -22948,7 +22955,7 @@ def reportWorkerStats_jobtype(self, harvesterID, siteName, parameter_list):
 
         tmp_log.debug("done")
         return True, "OK"
-        except Exception:
+        except Exception as e:
             self._rollback()
             self.dumpErrorMessage(tmp_log, method_name)
             return False, "database error"
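The net effect of patches 6 and 7 is a lock-then-replace pattern: take the row locks with FOR UPDATE NOWAIT so a concurrent reporter fails fast instead of blocking or deadlocking, then delete and bulk-insert inside one transaction. A condensed sketch of that pattern, assuming a generic DB-API connection with cx_Oracle-style named binds; the helper function itself and its bind names are illustrative, only the table and column names mirror the patch:

def replace_worker_stats(conn, harvester_id, site, rows):
    # rows: list of dicts with keys jobType, resourceType, status, n_workers
    cur = conn.cursor()
    try:
        # take the row locks up front; NOWAIT raises immediately if another
        # session already holds them, instead of waiting on the lock
        cur.execute(
            "SELECT harvester_ID, computingSite FROM ATLAS_PANDA.Harvester_Worker_Stats "
            "WHERE harvester_ID=:hid AND computingSite=:site FOR UPDATE NOWAIT",
            {"hid": harvester_id, "site": site},
        )
    except Exception:
        conn.rollback()
        return False, "rows locked by another update"
    # replace the stats atomically: delete the old rows ...
    cur.execute(
        "DELETE FROM ATLAS_PANDA.Harvester_Worker_Stats "
        "WHERE harvester_ID=:hid AND computingSite=:site",
        {"hid": harvester_id, "site": site},
    )
    # ... and bulk-insert the new ones in a single round trip (executemany,
    # as patch 6 does, instead of one execute() per row)
    cur.executemany(
        "INSERT INTO ATLAS_PANDA.Harvester_Worker_Stats "
        "(harvester_ID, computingSite, jobType, resourceType, status, n_workers, lastUpdate) "
        "VALUES (:hid, :site, :jobType, :resourceType, :status, :n_workers, CURRENT_DATE)",
        [dict(r, hid=harvester_id, site=site) for r in rows],
    )
    conn.commit()
    return True, "OK"

Holding the locks for the whole delete-plus-insert keeps a competing harvester instance from interleaving between the two statements, and the NOWAIT failure path simply reports back so the caller can retry on its next cycle.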