From 7819a7b6821c8f66dea800c06be7d3402b612542 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Fri, 22 Nov 2024 11:44:23 +0100 Subject: [PATCH] feat (WMS): Improve caching performance of Limiter --- environment.yml | 4 +- .../Client/Limiter.py | 63 +++++++++++++++---- 2 files changed, 52 insertions(+), 15 deletions(-) diff --git a/environment.yml b/environment.yml index c412023f70d..a3f140203b0 100644 --- a/environment.yml +++ b/environment.yml @@ -104,10 +104,10 @@ dependencies: - dominate - pyjwt # This is a fork of tornado with a patch to allow for configurable iostream - - git+https://github.com/DIRACGrid/tornado.git@iostreamConfigurable + - tornado @ git+https://github.com/DIRACGrid/tornado.git@iostreamConfigurable # This is an extension of Tornado to use M2Crypto # It should eventually be part of DIRACGrid - - git+https://github.com/DIRACGrid/tornado_m2crypto + - tornado_m2crypto @ git+https://github.com/DIRACGrid/tornado_m2crypto - -e .[server] # Add diracdoctools - -e docs/ diff --git a/src/DIRAC/WorkloadManagementSystem/Client/Limiter.py b/src/DIRAC/WorkloadManagementSystem/Client/Limiter.py index 9037a4aa669..86c26a2a10d 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/Limiter.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/Limiter.py @@ -2,6 +2,13 @@ Utilities and classes here are used by the Matcher """ +import threading +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, wait +from functools import partial + +from cachetools import TTLCache + from DIRAC import S_OK, S_ERROR from DIRAC import gLogger @@ -12,10 +19,40 @@ from DIRAC.WorkloadManagementSystem.Client import JobStatus +class TwoLevelCache: + def __init__(self, soft_ttl: int, hard_ttl: int): + self.soft_cache = TTLCache(1_000_000, soft_ttl) + self.hard_cache = TTLCache(1_000_000, hard_ttl) + self.locks = defaultdict(threading.Lock) + self.futures = {} + self.pool = ThreadPoolExecutor(max_workers=10) + + def get(self, key, populate_func): + if result := self.soft_cache.get(key): + return result + with self.locks[key]: + if key not in self.futures: + self.futures[key] = self.pool.submit(self._work, key, populate_func) + if result := self.hard_cache.get(key): + self.soft_cache[key] = result + return result + future = self.futures[key] + wait([future]) + return self.hard_cache[key] + + def _work(self, key, func): + result = func() + with self.locks[key]: + self.futures.pop(key) + self.hard_cache[key] = result + self.soft_cache[key] = result + + class Limiter: # static variables shared between all instances of this class csDictCache = DictCache() condCache = DictCache() + newCache = TwoLevelCache() delayMem = {} def __init__(self, jobDB=None, opsHelper=None, pilotRef=None): @@ -177,19 +214,7 @@ def __getRunningCondition(self, siteName, gridCE=None): if attName not in self.jobDB.jobAttributeNames: self.log.error("Attribute does not exist", f"({attName}). Check the job limits") continue - cK = f"Running:{siteName}:{attName}" - data = self.condCache.get(cK) - if not data: - result = self.jobDB.getCounters( - "Jobs", - [attName], - {"Site": siteName, "Status": [JobStatus.RUNNING, JobStatus.MATCHED, JobStatus.STALLED]}, - ) - if not result["OK"]: - return result - data = result["Value"] - data = {k[0][attName]: k[1] for k in data} - self.condCache.add(cK, 10, data) + data = self.newCache.get(f"Running:{siteName}:{attName}", partial(self._countsByJobType, siteName, attName)) for attValue in limitsDict[attName]: limit = limitsDict[attName][attValue] running = data.get(attValue, 0) @@ -249,3 +274,15 @@ def __getDelayCondition(self, siteName): negCond[attName] = [] negCond[attName].append(attValue) return S_OK(negCond) + + def _countsByJobType(self, siteName, attName): + result = self.jobDB.getCounters( + "Jobs", + [attName], + {"Site": siteName, "Status": [JobStatus.RUNNING, JobStatus.MATCHED, JobStatus.STALLED]}, + ) + if not result["OK"]: + return result + data = result["Value"] + data = {k[0][attName]: k[1] for k in data} + return data