Commit

Merge pull request #348 from PanDAWMS/resource_types
Clean up hardcoded resource types and fix a worker stats reporting concurrency issue
fbarreir authored Apr 16, 2024
2 parents 3c4297e + 974b9e8 commit 28fdd52
Showing 5 changed files with 149 additions and 85 deletions.
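The OraDBProxy.py hunks below rely on a ResourceSpecMapper class and a BASIC_RESOURCE_TYPE constant imported from pandaserver.taskbuffer.ResourceSpec, whose diff is not shown on this page. The following is a rough sketch of the interface the calling code appears to assume: the method names are taken from the calls in the diff, while the bodies simply mirror the old hardcoded name conventions removed elsewhere in this PR for illustration and are not the actual implementation.

```python
# Illustrative sketch only - the real pandaserver/taskbuffer/ResourceSpec.py is not shown on this page.
BASIC_RESOURCE_TYPE = "SCORE"  # assumed default single-core resource type


class ResourceSpecMapper:
    """Assumed interface, inferred from how OraDBProxy.py calls it in the hunks below."""

    def __init__(self, resource_types):
        # resource_types: resource type definitions loaded from the DB (see load_resource_types)
        self.resource_types = resource_types

    def translate_resourcetype_to_cores(self, resource_type, cores_queue):
        # Multi-core resource types occupy cores_queue cores per worker, single-core ones just 1.
        # The name check below only mirrors the old hardcoded convention removed from JobUtils.py;
        # the real mapper presumably derives this from the resource type table instead.
        if "MCORE" in resource_type:
            return cores_queue
        return 1

    def is_high_memory(self, resource_type):
        # Member of the HIMEM group (previously hardcoded as SCORE_HIMEM / MCORE_HIMEM).
        return resource_type.endswith("_HIMEM")
```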
8 changes: 0 additions & 8 deletions pandaserver/taskbuffer/JobUtils.py
@@ -26,14 +26,6 @@
priorityTasksToJumpOver = 1500


def translate_resourcetype_to_cores(resource_type, cores_queue):
# resolve the multiplying core factor
if "MCORE" in resource_type:
return cores_queue
else:
return 1


def translate_prodsourcelabel_to_jobtype(queue_type, prodsourcelabel):
if prodsourcelabel in analy_sources:
return ANALY_PS
168 changes: 101 additions & 67 deletions pandaserver/taskbuffer/OraDBProxy.py
@@ -43,7 +43,11 @@
from pandaserver.taskbuffer.FileSpec import FileSpec
from pandaserver.taskbuffer.HarvesterMetricsSpec import HarvesterMetricsSpec
from pandaserver.taskbuffer.JobSpec import JobSpec, push_status_changes
from pandaserver.taskbuffer.ResourceSpec import ResourceSpec
from pandaserver.taskbuffer.ResourceSpec import (
BASIC_RESOURCE_TYPE,
ResourceSpec,
ResourceSpecMapper,
)
from pandaserver.taskbuffer.SupErrors import SupErrors
from pandaserver.taskbuffer.Utils import create_shards
from pandaserver.taskbuffer.WorkerSpec import WorkerSpec
@@ -156,6 +160,11 @@ def __init__(self, useOtherError=False):
self.__hs_distribution = None # HS06s distribution of sites
self.__t_update_distribution = None # Timestamp when the HS06s distribution was last updated

# resource type mapper
# if you want to use it, you need to call __reload_resource_spec_mapper first
self.__resource_spec_mapper = None
self.__t_update_resource_type_mapper = None

# priority boost
self.job_prio_boost_dict = None
self.job_prio_boost_dict_update_time = None
@@ -166,9 +175,6 @@
# mb proxy
self.mb_proxy_dict = None

# self.__reload_shares()
# self.__reload_hs_distribution()

# connect to DB
def connect(
self,
@@ -16809,7 +16815,7 @@ def setSiteForEsMerge(self, jobSpec, isFakeCJ, methodName, comment):

# set score site to ES job
def setScoreSiteToEs(self, jobSpec, methodName, comment):
_logger.debug(f"{methodName} looking for SCORE site")
_logger.debug(f"{methodName} looking for single-core site")

# get score PQ in the nucleus associated to the site to run the small ES job
sqlSN = "SELECT /* use_json_type */ ps2.panda_site_name "
@@ -16867,9 +16873,9 @@ def setScoreSiteToEs(self, jobSpec, methodName, comment):
jobSpec.resource_type = self.get_resource_type_job(jobSpec)
newSiteName = jobSpec.computingSite
if newSiteName is not None:
_logger.info(f"{methodName} set SCORE site to {newSiteName}")
_logger.info(f"{methodName} set single-core site to {newSiteName}")
else:
_logger.info(f"{methodName} no SCORE site for {jobSpec.computingSite}")
_logger.info(f"{methodName} no single-core site for {jobSpec.computingSite}")
# return
return

@@ -22672,6 +22678,18 @@ def storePilotLog(self, panda_id, pilot_log):
_logger.error(f"{comment}: {type} {value}")
return -1

def __reload_resource_spec_mapper(self):
# update once per hour only
if self.__t_update_resource_type_mapper and self.__t_update_resource_type_mapper > datetime.datetime.now() - datetime.timedelta(hours=1):
return

# get the resource types from the DB and make the ResourceSpecMapper object
resource_types = self.load_resource_types()
if resource_types:
self.__resource_spec_mapper = ResourceSpecMapper(resource_types)
self.__t_update_resource_type_mapper = datetime.datetime.now()
return

def load_resource_types(self, formatting="spec"):
"""
Load the resource type table to memory
@@ -22872,57 +22890,74 @@ def reportWorkerStats(self, harvesterID, siteName, paramsList):
return False, "database error"

# update stat of workers with jobtype breakdown
def reportWorkerStats_jobtype(self, harvesterID, siteName, paramsList):
def reportWorkerStats_jobtype(self, harvesterID, siteName, parameter_list):
comment = " /* DBProxy.reportWorkerStats_jobtype */"
methodName = comment.split(" ")[-2].split(".")[-1]
tmpLog = LogWrapper(
method_name = comment.split(" ")[-2].split(".")[-1]
tmp_log = LogWrapper(
_logger,
methodName + f" < harvesterID={harvesterID} siteName={siteName} >",
f"{method_name} < harvesterID={harvesterID} siteName={siteName} >",
)
tmpLog.debug("start")
tmpLog.debug(f"params={str(paramsList)}")
tmp_log.debug("start")
tmp_log.debug(f"params={str(parameter_list)}")
try:
# load new site data
paramsList = json.loads(paramsList)
parameter_list = json.loads(parameter_list)
# set autocommit on
self.conn.begin()
# delete old site data
sqlDel = "DELETE FROM ATLAS_PANDA.Harvester_Worker_Stats "
sqlDel += "WHERE harvester_ID=:harvesterID AND computingSite=:siteName "
varMap = dict()
varMap[":harvesterID"] = harvesterID
varMap[":siteName"] = siteName
self.cur.execute(sqlDel + comment, varMap)

# lock the site data rows
var_map = dict()
var_map[":harvesterID"] = harvesterID
var_map[":siteName"] = siteName
sql_lock = "SELECT harvester_ID, computingSite FROM ATLAS_PANDA.Harvester_Worker_Stats "
sql_lock += "WHERE harvester_ID=:harvesterID AND computingSite=:siteName FOR UPDATE NOWAIT "
try:
self.cur.execute(sql_lock + comment, var_map)
except Exception:
self._rollback()
message = "rows locked by another update"
tmp_log.debug(message)
tmp_log.debug("done")
return False, message

# delete them
sql_delete = "DELETE FROM ATLAS_PANDA.Harvester_Worker_Stats "
sql_delete += "WHERE harvester_ID=:harvesterID AND computingSite=:siteName "
self.cur.execute(sql_delete + comment, var_map)

# insert new site data
sqlI = "INSERT INTO ATLAS_PANDA.Harvester_Worker_Stats (harvester_ID, computingSite, jobType, resourceType, status, n_workers, lastUpdate) "
sqlI += "VALUES (:harvester_ID, :siteName, :jobType, :resourceType, :status, :n_workers, CURRENT_DATE) "
sql_insert = "INSERT INTO ATLAS_PANDA.Harvester_Worker_Stats (harvester_ID, computingSite, jobType, resourceType, status, n_workers, lastUpdate) "
sql_insert += "VALUES (:harvester_ID, :siteName, :jobType, :resourceType, :status, :n_workers, CURRENT_DATE) "

for jobType in paramsList:
jt_params = paramsList[jobType]
var_map_list = []
for jobType in parameter_list:
jt_params = parameter_list[jobType]
for resourceType in jt_params:
params = jt_params[resourceType]
if resourceType == "Undefined":
continue
for status in params:
n_workers = params[status]
varMap = dict()
varMap[":harvester_ID"] = harvesterID
varMap[":siteName"] = siteName
varMap[":status"] = status
varMap[":jobType"] = jobType
varMap[":resourceType"] = resourceType
varMap[":n_workers"] = n_workers
self.cur.execute(sqlI + comment, varMap)
# commit
var_map = {
":harvester_ID": harvesterID,
":siteName": siteName,
":status": status,
":jobType": jobType,
":resourceType": resourceType,
":n_workers": n_workers,
}
var_map_list.append(var_map)

self.cur.executemany(sql_insert + comment, var_map_list)

if not self._commit():
raise RuntimeError("Commit error")
# return
tmpLog.debug("done")

tmp_log.debug("done")
return True, "OK"
except Exception:
# roll back
except Exception as e:
self._rollback()
self.dumpErrorMessage(tmpLog, methodName)
self.dumpErrorMessage(tmp_log, method_name)
return False, "database error"

# get stat of workers
@@ -23412,40 +23447,39 @@ def ups_new_worker_distribution(self, queue, worker_stats):

comment = " /* DBProxy.ups_new_worker_distribution */"
method_name = comment.split(" ")[-2].split(".")[-1]
tmpLog = LogWrapper(_logger, f"{method_name}-{queue}")
tmpLog.debug("start")
tmp_log = LogWrapper(_logger, f"{method_name}-{queue}")
tmp_log.debug("start")
n_cores_running = 0
workers_queued = {}
n_cores_queued = 0
harvester_ids_temp = list(worker_stats)

# HIMEM resource types group
HIMEM = "HIMEM"
HIMEM_RTS = ["SCORE_HIMEM", "MCORE_HIMEM"]
self.__reload_resource_spec_mapper()

# get the configuration for maximum workers of each type
pq_data_des = self.get_config_for_pq(queue)
resource_type_limits = {}
queue_type = "production"
if not pq_data_des:
tmpLog.debug("Error retrieving queue configuration from DB, limits can not be applied")
tmp_log.debug("Error retrieving queue configuration from DB, limits can not be applied")
else:
try:
resource_type_limits = pq_data_des["uconfig"]["resource_type_limits"]
except KeyError:
tmpLog.debug("No resource type limits")
tmp_log.debug("No resource type limits")
pass
try:
queue_type = pq_data_des["type"]
except KeyError:
tmpLog.error("No queue type")
tmp_log.error("No queue type")
pass
try:
cores_queue = pq_data_des["corecount"]
if not cores_queue:
cores_queue = 1
except KeyError:
tmpLog.error("No corecount")
tmp_log.error("No corecount")
pass

# Retrieve the assigned harvester instance and submit UPS commands only to this instance. We have had multiple
@@ -23470,7 +23504,7 @@ def ups_new_worker_distribution(self, queue, worker_stats):
for job_type in worker_stats[harvester_id]:
workers_queued.setdefault(job_type, {})
for resource_type in worker_stats[harvester_id][job_type]:
core_factor = JobUtils.translate_resourcetype_to_cores(resource_type, cores_queue)
core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)
try:
n_cores_running = n_cores_running + worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor

Expand All @@ -23479,14 +23513,14 @@ def ups_new_worker_distribution(self, queue, worker_stats):
resource_type_limits[resource_type] = (
resource_type_limits[resource_type] - worker_stats[harvester_id][job_type][resource_type]["running"]
)
tmpLog.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}")
tmp_log.debug(f"Limit for rt {resource_type} down to {resource_type_limits[resource_type]}")

# This limit is in #CORES, since it mixes single and multi core jobs
if resource_type in HIMEM_RTS and HIMEM in resource_type_limits:
if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits:
resource_type_limits[HIMEM] = (
resource_type_limits[HIMEM] - worker_stats[harvester_id][job_type][resource_type]["running"] * core_factor
)
tmpLog.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}")
tmp_log.debug(f"Limit for rt group {HIMEM} down to {resource_type_limits[HIMEM]}")

except KeyError:
pass
Expand All @@ -23509,7 +23543,7 @@ def ups_new_worker_distribution(self, queue, worker_stats):
except KeyError:
pass

tmpLog.debug(f"Queue {queue} queued worker overview: {workers_queued}")
tmp_log.debug(f"Queue {queue} queued worker overview: {workers_queued}")

# For queues that need more pressure towards reaching a target
n_cores_running_fake = 0
@@ -23519,15 +23553,15 @@
"brokeroff",
]: # don't flood test sites with workers
n_cores_running_fake = pq_data_des["params"]["ups_core_target"]
tmpLog.debug(f"Using ups_core_target {n_cores_running_fake} for queue {queue}")
except KeyError: # no value defined in AGIS
tmp_log.debug(f"Using ups_core_target {n_cores_running_fake} for queue {queue}")
except KeyError: # no value defined in CRIC
pass

n_cores_running = max(n_cores_running, n_cores_running_fake)

n_cores_target = max(int(n_cores_running * 0.4), 75 * cores_queue)
n_cores_to_submit = max(n_cores_target - n_cores_queued, 5 * cores_queue)
tmpLog.debug(f"IN CORES: nrunning {n_cores_running}, ntarget {n_cores_target}, nqueued {n_cores_queued}. We need to process {n_cores_to_submit} cores")
tmp_log.debug(f"IN CORES: nrunning {n_cores_running}, ntarget {n_cores_target}, nqueued {n_cores_queued}. We need to process {n_cores_to_submit} cores")

# Get the sorted global shares
sorted_shares = self.get_sorted_leaves()
Expand All @@ -23545,20 +23579,20 @@ def ups_new_worker_distribution(self, queue, worker_stats):
"""
self.cur.execute(sql + comment, var_map)
activated_jobs = self.cur.fetchall()
tmpLog.debug(f"Processing share: {share.name}. Got {len(activated_jobs)} activated jobs")
tmp_log.debug(f"Processing share: {share.name}. Got {len(activated_jobs)} activated jobs")
for gshare, prodsourcelabel, resource_type in activated_jobs:
core_factor = JobUtils.translate_resourcetype_to_cores(resource_type, cores_queue)
core_factor = self.__resource_spec_mapper.translate_resourcetype_to_cores(resource_type, cores_queue)

# translate prodsourcelabel to a subset of job types, typically 'user' and 'managed'
job_type = JobUtils.translate_prodsourcelabel_to_jobtype(queue_type, prodsourcelabel)
# if we reached the limit for the resource type, skip the job
if resource_type in resource_type_limits and resource_type_limits[resource_type] <= 0:
# tmpLog.debug('Reached resource type limit for {0}'.format(resource_type))
# tmp_log.debug('Reached resource type limit for {0}'.format(resource_type))
continue

# if we reached the limit for the HIMEM resource type group, skip the job
if resource_type in HIMEM_RTS and HIMEM in resource_type_limits and resource_type_limits[HIMEM] <= 0:
# tmpLog.debug('Reached resource type limit for {0}'.format(resource_type))
if self.__resource_spec_mapper.is_high_memory(resource_type) and HIMEM in resource_type_limits and resource_type_limits[HIMEM] <= 0:
# tmp_log.debug('Reached resource type limit for {0}'.format(resource_type))
continue

workers_queued.setdefault(job_type, {})
Expand All @@ -23570,15 +23604,15 @@ def ups_new_worker_distribution(self, queue, worker_stats):

# We reached the number of workers needed
if n_cores_to_submit <= 0:
tmpLog.debug("Reached cores needed (inner)")
tmp_log.debug("Reached cores needed (inner)")
break

# We reached the number of workers needed
if n_cores_to_submit <= 0:
tmpLog.debug("Reached cores needed (outer)")
tmp_log.debug("Reached cores needed (outer)")
break

tmpLog.debug(f"workers_queued: {workers_queued}")
tmp_log.debug(f"workers_queued: {workers_queued}")

new_workers = {}
for job_type in workers_queued:
Expand All @@ -23591,15 +23625,15 @@ def ups_new_worker_distribution(self, queue, worker_stats):
# we don't have enough workers for this resource type
new_workers[job_type][resource_type] = -workers_queued[job_type][resource_type] + 1

# We should still submit a SCORE worker, even if there are no activated jobs to avoid queue deactivation
# We should still submit a basic worker, even if there are no activated jobs to avoid queue deactivation
workers = False
for job_type in new_workers:
for resource_type in new_workers[job_type]:
if new_workers[job_type][resource_type] > 0:
workers = True
break
if not workers:
new_workers["managed"] = {"SCORE": 1}
new_workers["managed"] = {BASIC_RESOURCE_TYPE: 1}

# In case multiple harvester instances are serving a panda queue, split workers evenly between them
new_workers_per_harvester = {}
Expand All @@ -23612,8 +23646,8 @@ def ups_new_worker_distribution(self, queue, worker_stats):
math.ceil(new_workers[job_type][resource_type] * 1.0 / len(harvester_ids))
)

tmpLog.debug(f"Workers to submit: {new_workers_per_harvester}")
tmpLog.debug("done")
tmp_log.debug(f"Workers to submit: {new_workers_per_harvester}")
tmp_log.debug("done")
return new_workers_per_harvester
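As a quick sanity check on the core sizing used in this function (n_cores_target = max(int(n_cores_running * 0.4), 75 * cores_queue) and n_cores_to_submit = max(n_cores_target - n_cores_queued, 5 * cores_queue)), here is a small worked example; the numbers are made up purely for illustration.

```python
# Illustrative numbers only, not taken from any real queue.
cores_queue = 8          # core count of the panda queue
n_cores_running = 2000   # cores occupied by running workers
n_cores_queued = 300     # cores already covered by queued workers

n_cores_target = max(int(n_cores_running * 0.4), 75 * cores_queue)         # max(800, 600) -> 800
n_cores_to_submit = max(n_cores_target - n_cores_queued, 5 * cores_queue)  # max(500, 40) -> 500
```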

# get active consumers
(Diffs for the remaining 3 changed files were not loaded on this page.)
