From ea2b4453c0419bcfcaae914b6c32e8c0129f955a Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Mon, 18 Mar 2024 11:27:41 +0100 Subject: [PATCH 1/2] fix: only use psutil for memory reporting --- .../JobWrapper/Watchdog.py | 44 ++++--------------- 1 file changed, 8 insertions(+), 36 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py index 575fed3fdbb..7133fcb5c29 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py @@ -1,8 +1,3 @@ -######################################################################## -# File : Watchdog.py -# Author: Stuart Paterson -######################################################################## - """ The Watchdog class is used by the Job Wrapper to resolve and monitor the system resource consumption. The Watchdog can determine if a running job is stalled and indicate this to the Job Wrapper. @@ -22,7 +17,6 @@ import math import os import re -import resource import socket import time from pathlib import Path @@ -287,12 +281,14 @@ def _performChecks(self): self.parameters["LoadAverage"] = [] self.parameters["LoadAverage"].append(loadAvg) - memoryUsed = self.getMemoryUsed() - msg += f"MemUsed: {memoryUsed:.1f} kb " - heartBeatDict["MemoryUsed"] = memoryUsed + result = self.profiler.memoryUsage(withChildren=True) + if not result["OK"]: + self.log.warn("Could not get rss info from profiler", result["Message"]) + msg += f"MemUsed: {result['Value']:.1f} kb " + heartBeatDict["MemoryUsed"] = result["Value"] if "MemoryUsed" not in self.parameters: self.parameters["MemoryUsed"] = [] - self.parameters["MemoryUsed"].append(memoryUsed) + self.parameters["MemoryUsed"].append(result["Value"]) result = self.profiler.vSizeUsage(withChildren=True) if not result["OK"]: @@ -304,16 +300,6 @@ def _performChecks(self): self.parameters["Vsize"].append(vsize) msg += f"Job Vsize: {vsize:.1f} kb " - result = self.profiler.memoryUsage(withChildren=True) - if not result["OK"]: - self.log.warn("Could not get rss info from profiler", result["Message"]) - else: - rss = result["Value"] * 1024.0 - heartBeatDict["RSS"] = rss - self.parameters.setdefault("RSS", []) - self.parameters["RSS"].append(rss) - msg += f"Job RSS: {rss:.1f} kb " - if "DiskSpace" not in self.parameters: self.parameters["DiskSpace"] = [] @@ -744,11 +730,6 @@ def calibrate(self): self.initialValues["LoadAverage"] = float(os.getloadavg()[0]) self.parameters["LoadAverage"] = [] - memUsed = self.getMemoryUsed() - - self.initialValues["MemoryUsed"] = memUsed - self.parameters["MemoryUsed"] = [] - result = self.profiler.vSizeUsage(withChildren=True) if not result["OK"]: self.log.warn("Could not get vSize info from profiler", result["Message"]) @@ -762,9 +743,8 @@ def calibrate(self): if not result["OK"]: self.log.warn("Could not get rss info from profiler", result["Message"]) else: - rss = result["Value"] * 1024.0 - self.initialValues["RSS"] = rss - self.log.verbose("RSS(kb)", f"{rss:.1f}") + self.initialValues["RSS"] = result["Value"] + self.log.verbose("RSS(mb)", f"{result['Value']:.1f}") self.parameters["RSS"] = [] # We exclude fuse so that mountpoints can be cleaned up by automount after a period unused @@ -968,14 +948,6 @@ def getNodeInformation(self): return result - ############################################################################# - def getMemoryUsed(self): - """Obtains the memory used.""" - mem = ( - resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss - ) - return float(mem) - ############################################################################# def getDiskSpace(self, exclude=None): """Obtains the available disk space.""" From 27f2f18dac6908ae7b4600bd24911bdaa2c5a289 Mon Sep 17 00:00:00 2001 From: Federico Stagni Date: Mon, 18 Mar 2024 15:11:26 +0100 Subject: [PATCH 2/2] fix: printouts: from kB to MB --- .../JobWrapper/Watchdog.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py index 7133fcb5c29..9fc0f3533b9 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py @@ -284,11 +284,12 @@ def _performChecks(self): result = self.profiler.memoryUsage(withChildren=True) if not result["OK"]: self.log.warn("Could not get rss info from profiler", result["Message"]) - msg += f"MemUsed: {result['Value']:.1f} kb " - heartBeatDict["MemoryUsed"] = result["Value"] - if "MemoryUsed" not in self.parameters: - self.parameters["MemoryUsed"] = [] - self.parameters["MemoryUsed"].append(result["Value"]) + else: + msg += f"MemUsed: {result['Value']:.1f} MB " + heartBeatDict["MemoryUsed"] = result["Value"] + if "MemoryUsed" not in self.parameters: + self.parameters["MemoryUsed"] = [] + self.parameters["MemoryUsed"].append(result["Value"]) result = self.profiler.vSizeUsage(withChildren=True) if not result["OK"]: @@ -298,7 +299,7 @@ def _performChecks(self): heartBeatDict["Vsize"] = vsize self.parameters.setdefault("Vsize", []) self.parameters["Vsize"].append(vsize) - msg += f"Job Vsize: {vsize:.1f} kb " + msg += f"Job Vsize: {vsize:.1f} MB " if "DiskSpace" not in self.parameters: self.parameters["DiskSpace"] = [] @@ -652,7 +653,7 @@ def __checkMemoryLimit(self): if vsize and self.memoryLimit: if vsize > self.memoryLimit: # Just a warning for the moment - self.log.warn(f"Job has consumed {vsize:f}.2 KB of memory with the limit of {self.memoryLimit:f}.2 KB") + self.log.warn(f"Job has consumed {vsize:f}.2 MB of memory with the limit of {self.memoryLimit:f}.2 MB") return S_OK() @@ -736,7 +737,7 @@ def calibrate(self): else: vsize = result["Value"] * 1024.0 self.initialValues["Vsize"] = vsize - self.log.verbose("Vsize(kb)", f"{vsize:.1f}") + self.log.verbose("Vsize(MB)", f"{vsize:.1f}") self.parameters["Vsize"] = [] result = self.profiler.memoryUsage(withChildren=True) @@ -744,7 +745,7 @@ def calibrate(self): self.log.warn("Could not get rss info from profiler", result["Message"]) else: self.initialValues["RSS"] = result["Value"] - self.log.verbose("RSS(mb)", f"{result['Value']:.1f}") + self.log.verbose("RSS(MB)", f"{result['Value']:.1f}") self.parameters["RSS"] = [] # We exclude fuse so that mountpoints can be cleaned up by automount after a period unused @@ -827,9 +828,9 @@ def __getUsageSummary(self): if "MemoryUsed" in self.parameters: memory = self.parameters["MemoryUsed"] if memory: - summary["MemoryUsed(kb)"] = abs(float(memory[-1]) - float(self.initialValues["MemoryUsed"])) + summary["MemoryUsed(MB)"] = abs(float(memory[-1]) - float(self.initialValues["MemoryUsed"])) else: - summary["MemoryUsed(kb)"] = math.nan + summary["MemoryUsed(MB)"] = math.nan # LoadAverage if "LoadAverage" in self.parameters: laList = self.parameters["LoadAverage"] @@ -936,7 +937,7 @@ def getNodeInformation(self): """Retrieves all static system information""" result = {} result["HostName"] = socket.gethostname() - result["Memory(kB)"] = int(psutil.virtual_memory()[1] / 1024) + result["Memory(MB)"] = int(psutil.virtual_memory()[1] / 1024 / 1024) result["LocalAccount"] = getpass.getuser() path = Path("/proc/cpuinfo")