Skip to content

Commit

Permalink
Merge pull request #7518 from fstagni/fixMemoryReporting
Browse files Browse the repository at this point in the history
[8.0] Fix memory reporting
  • Loading branch information
fstagni authored Mar 18, 2024
2 parents b8e8dfa + 27f2f18 commit 108861c
Showing 1 changed file with 17 additions and 44 deletions.
61 changes: 17 additions & 44 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
########################################################################
# File : Watchdog.py
# Author: Stuart Paterson
########################################################################

""" The Watchdog class is used by the Job Wrapper to resolve and monitor
the system resource consumption. The Watchdog can determine if
a running job is stalled and indicate this to the Job Wrapper.
Expand All @@ -22,7 +17,6 @@
import math
import os
import re
import resource
import socket
import time
from pathlib import Path
Expand Down Expand Up @@ -287,12 +281,15 @@ def _performChecks(self):
self.parameters["LoadAverage"] = []
self.parameters["LoadAverage"].append(loadAvg)

memoryUsed = self.getMemoryUsed()
msg += f"MemUsed: {memoryUsed:.1f} kb "
heartBeatDict["MemoryUsed"] = memoryUsed
if "MemoryUsed" not in self.parameters:
self.parameters["MemoryUsed"] = []
self.parameters["MemoryUsed"].append(memoryUsed)
result = self.profiler.memoryUsage(withChildren=True)
if not result["OK"]:
self.log.warn("Could not get rss info from profiler", result["Message"])
else:
msg += f"MemUsed: {result['Value']:.1f} MB "
heartBeatDict["MemoryUsed"] = result["Value"]
if "MemoryUsed" not in self.parameters:
self.parameters["MemoryUsed"] = []
self.parameters["MemoryUsed"].append(result["Value"])

result = self.profiler.vSizeUsage(withChildren=True)
if not result["OK"]:
Expand All @@ -302,17 +299,7 @@ def _performChecks(self):
heartBeatDict["Vsize"] = vsize
self.parameters.setdefault("Vsize", [])
self.parameters["Vsize"].append(vsize)
msg += f"Job Vsize: {vsize:.1f} kb "

result = self.profiler.memoryUsage(withChildren=True)
if not result["OK"]:
self.log.warn("Could not get rss info from profiler", result["Message"])
else:
rss = result["Value"] * 1024.0
heartBeatDict["RSS"] = rss
self.parameters.setdefault("RSS", [])
self.parameters["RSS"].append(rss)
msg += f"Job RSS: {rss:.1f} kb "
msg += f"Job Vsize: {vsize:.1f} MB "

if "DiskSpace" not in self.parameters:
self.parameters["DiskSpace"] = []
Expand Down Expand Up @@ -666,7 +653,7 @@ def __checkMemoryLimit(self):
if vsize and self.memoryLimit:
if vsize > self.memoryLimit:
# Just a warning for the moment
self.log.warn(f"Job has consumed {vsize:f}.2 KB of memory with the limit of {self.memoryLimit:f}.2 KB")
self.log.warn(f"Job has consumed {vsize:f}.2 MB of memory with the limit of {self.memoryLimit:f}.2 MB")

return S_OK()

Expand Down Expand Up @@ -744,27 +731,21 @@ def calibrate(self):
self.initialValues["LoadAverage"] = float(os.getloadavg()[0])
self.parameters["LoadAverage"] = []

memUsed = self.getMemoryUsed()

self.initialValues["MemoryUsed"] = memUsed
self.parameters["MemoryUsed"] = []

result = self.profiler.vSizeUsage(withChildren=True)
if not result["OK"]:
self.log.warn("Could not get vSize info from profiler", result["Message"])
else:
vsize = result["Value"] * 1024.0
self.initialValues["Vsize"] = vsize
self.log.verbose("Vsize(kb)", f"{vsize:.1f}")
self.log.verbose("Vsize(MB)", f"{vsize:.1f}")
self.parameters["Vsize"] = []

result = self.profiler.memoryUsage(withChildren=True)
if not result["OK"]:
self.log.warn("Could not get rss info from profiler", result["Message"])
else:
rss = result["Value"] * 1024.0
self.initialValues["RSS"] = rss
self.log.verbose("RSS(kb)", f"{rss:.1f}")
self.initialValues["RSS"] = result["Value"]
self.log.verbose("RSS(MB)", f"{result['Value']:.1f}")
self.parameters["RSS"] = []

# We exclude fuse so that mountpoints can be cleaned up by automount after a period unused
Expand Down Expand Up @@ -847,9 +828,9 @@ def __getUsageSummary(self):
if "MemoryUsed" in self.parameters:
memory = self.parameters["MemoryUsed"]
if memory:
summary["MemoryUsed(kb)"] = abs(float(memory[-1]) - float(self.initialValues["MemoryUsed"]))
summary["MemoryUsed(MB)"] = abs(float(memory[-1]) - float(self.initialValues["MemoryUsed"]))
else:
summary["MemoryUsed(kb)"] = math.nan
summary["MemoryUsed(MB)"] = math.nan
# LoadAverage
if "LoadAverage" in self.parameters:
laList = self.parameters["LoadAverage"]
Expand Down Expand Up @@ -956,7 +937,7 @@ def getNodeInformation(self):
"""Retrieves all static system information"""
result = {}
result["HostName"] = socket.gethostname()
result["Memory(kB)"] = int(psutil.virtual_memory()[1] / 1024)
result["Memory(MB)"] = int(psutil.virtual_memory()[1] / 1024 / 1024)
result["LocalAccount"] = getpass.getuser()

path = Path("/proc/cpuinfo")
Expand All @@ -968,14 +949,6 @@ def getNodeInformation(self):

return result

#############################################################################
def getMemoryUsed(self):
"""Obtains the memory used."""
mem = (
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss
)
return float(mem)

#############################################################################
def getDiskSpace(self, exclude=None):
"""Obtains the available disk space."""
Expand Down

0 comments on commit 108861c

Please sign in to comment.