diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py index 575fed3fdbb..9fc0f3533b9 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py @@ -1,8 +1,3 @@ -######################################################################## -# File : Watchdog.py -# Author: Stuart Paterson -######################################################################## - """ The Watchdog class is used by the Job Wrapper to resolve and monitor the system resource consumption. The Watchdog can determine if a running job is stalled and indicate this to the Job Wrapper. @@ -22,7 +17,6 @@ import math import os import re -import resource import socket import time from pathlib import Path @@ -287,12 +281,15 @@ def _performChecks(self): self.parameters["LoadAverage"] = [] self.parameters["LoadAverage"].append(loadAvg) - memoryUsed = self.getMemoryUsed() - msg += f"MemUsed: {memoryUsed:.1f} kb " - heartBeatDict["MemoryUsed"] = memoryUsed - if "MemoryUsed" not in self.parameters: - self.parameters["MemoryUsed"] = [] - self.parameters["MemoryUsed"].append(memoryUsed) + result = self.profiler.memoryUsage(withChildren=True) + if not result["OK"]: + self.log.warn("Could not get rss info from profiler", result["Message"]) + else: + msg += f"MemUsed: {result['Value']:.1f} MB " + heartBeatDict["MemoryUsed"] = result["Value"] + if "MemoryUsed" not in self.parameters: + self.parameters["MemoryUsed"] = [] + self.parameters["MemoryUsed"].append(result["Value"]) result = self.profiler.vSizeUsage(withChildren=True) if not result["OK"]: @@ -302,17 +299,7 @@ def _performChecks(self): heartBeatDict["Vsize"] = vsize self.parameters.setdefault("Vsize", []) self.parameters["Vsize"].append(vsize) - msg += f"Job Vsize: {vsize:.1f} kb " - - result = self.profiler.memoryUsage(withChildren=True) - if not result["OK"]: - self.log.warn("Could not get rss info from profiler", result["Message"]) - else: - rss = result["Value"] * 1024.0 - heartBeatDict["RSS"] = rss - self.parameters.setdefault("RSS", []) - self.parameters["RSS"].append(rss) - msg += f"Job RSS: {rss:.1f} kb " + msg += f"Job Vsize: {vsize:.1f} MB " if "DiskSpace" not in self.parameters: self.parameters["DiskSpace"] = [] @@ -666,7 +653,7 @@ def __checkMemoryLimit(self): if vsize and self.memoryLimit: if vsize > self.memoryLimit: # Just a warning for the moment - self.log.warn(f"Job has consumed {vsize:f}.2 KB of memory with the limit of {self.memoryLimit:f}.2 KB") + self.log.warn(f"Job has consumed {vsize:f}.2 MB of memory with the limit of {self.memoryLimit:f}.2 MB") return S_OK() @@ -744,27 +731,21 @@ def calibrate(self): self.initialValues["LoadAverage"] = float(os.getloadavg()[0]) self.parameters["LoadAverage"] = [] - memUsed = self.getMemoryUsed() - - self.initialValues["MemoryUsed"] = memUsed - self.parameters["MemoryUsed"] = [] - result = self.profiler.vSizeUsage(withChildren=True) if not result["OK"]: self.log.warn("Could not get vSize info from profiler", result["Message"]) else: vsize = result["Value"] * 1024.0 self.initialValues["Vsize"] = vsize - self.log.verbose("Vsize(kb)", f"{vsize:.1f}") + self.log.verbose("Vsize(MB)", f"{vsize:.1f}") self.parameters["Vsize"] = [] result = self.profiler.memoryUsage(withChildren=True) if not result["OK"]: self.log.warn("Could not get rss info from profiler", result["Message"]) else: - rss = result["Value"] * 1024.0 - self.initialValues["RSS"] = rss - self.log.verbose("RSS(kb)", f"{rss:.1f}") + self.initialValues["RSS"] = result["Value"] + self.log.verbose("RSS(MB)", f"{result['Value']:.1f}") self.parameters["RSS"] = [] # We exclude fuse so that mountpoints can be cleaned up by automount after a period unused @@ -847,9 +828,9 @@ def __getUsageSummary(self): if "MemoryUsed" in self.parameters: memory = self.parameters["MemoryUsed"] if memory: - summary["MemoryUsed(kb)"] = abs(float(memory[-1]) - float(self.initialValues["MemoryUsed"])) + summary["MemoryUsed(MB)"] = abs(float(memory[-1]) - float(self.initialValues["MemoryUsed"])) else: - summary["MemoryUsed(kb)"] = math.nan + summary["MemoryUsed(MB)"] = math.nan # LoadAverage if "LoadAverage" in self.parameters: laList = self.parameters["LoadAverage"] @@ -956,7 +937,7 @@ def getNodeInformation(self): """Retrieves all static system information""" result = {} result["HostName"] = socket.gethostname() - result["Memory(kB)"] = int(psutil.virtual_memory()[1] / 1024) + result["Memory(MB)"] = int(psutil.virtual_memory()[1] / 1024 / 1024) result["LocalAccount"] = getpass.getuser() path = Path("/proc/cpuinfo") @@ -968,14 +949,6 @@ def getNodeInformation(self): return result - ############################################################################# - def getMemoryUsed(self): - """Obtains the memory used.""" - mem = ( - resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss - ) - return float(mem) - ############################################################################# def getDiskSpace(self, exclude=None): """Obtains the available disk space."""