Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0] Fix memory reporting #7518

Merged
2 commits merged on Mar 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 17 additions & 44 deletions src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
########################################################################
# File : Watchdog.py
# Author: Stuart Paterson
########################################################################

""" The Watchdog class is used by the Job Wrapper to resolve and monitor
the system resource consumption. The Watchdog can determine if
a running job is stalled and indicate this to the Job Wrapper.
Expand All @@ -22,7 +17,6 @@
import math
import os
import re
import resource
import socket
import time
from pathlib import Path
Expand Down Expand Up @@ -287,12 +281,15 @@ def _performChecks(self):
self.parameters["LoadAverage"] = []
self.parameters["LoadAverage"].append(loadAvg)

memoryUsed = self.getMemoryUsed()
msg += f"MemUsed: {memoryUsed:.1f} kb "
heartBeatDict["MemoryUsed"] = memoryUsed
if "MemoryUsed" not in self.parameters:
self.parameters["MemoryUsed"] = []
self.parameters["MemoryUsed"].append(memoryUsed)
result = self.profiler.memoryUsage(withChildren=True)
if not result["OK"]:
self.log.warn("Could not get rss info from profiler", result["Message"])
else:
msg += f"MemUsed: {result['Value']:.1f} MB "
heartBeatDict["MemoryUsed"] = result["Value"]
if "MemoryUsed" not in self.parameters:
self.parameters["MemoryUsed"] = []
self.parameters["MemoryUsed"].append(result["Value"])

result = self.profiler.vSizeUsage(withChildren=True)
if not result["OK"]:
Expand All @@ -302,17 +299,7 @@ def _performChecks(self):
heartBeatDict["Vsize"] = vsize
self.parameters.setdefault("Vsize", [])
self.parameters["Vsize"].append(vsize)
msg += f"Job Vsize: {vsize:.1f} kb "

result = self.profiler.memoryUsage(withChildren=True)
if not result["OK"]:
self.log.warn("Could not get rss info from profiler", result["Message"])
else:
rss = result["Value"] * 1024.0
heartBeatDict["RSS"] = rss
self.parameters.setdefault("RSS", [])
self.parameters["RSS"].append(rss)
msg += f"Job RSS: {rss:.1f} kb "
msg += f"Job Vsize: {vsize:.1f} MB "

if "DiskSpace" not in self.parameters:
self.parameters["DiskSpace"] = []
Expand Down Expand Up @@ -666,7 +653,7 @@ def __checkMemoryLimit(self):
if vsize and self.memoryLimit:
if vsize > self.memoryLimit:
# Just a warning for the moment
self.log.warn(f"Job has consumed {vsize:f}.2 KB of memory with the limit of {self.memoryLimit:f}.2 KB")
self.log.warn(f"Job has consumed {vsize:f}.2 MB of memory with the limit of {self.memoryLimit:f}.2 MB")

return S_OK()

Expand Down Expand Up @@ -744,27 +731,21 @@ def calibrate(self):
self.initialValues["LoadAverage"] = float(os.getloadavg()[0])
self.parameters["LoadAverage"] = []

memUsed = self.getMemoryUsed()

self.initialValues["MemoryUsed"] = memUsed
self.parameters["MemoryUsed"] = []

result = self.profiler.vSizeUsage(withChildren=True)
if not result["OK"]:
self.log.warn("Could not get vSize info from profiler", result["Message"])
else:
vsize = result["Value"] * 1024.0
self.initialValues["Vsize"] = vsize
self.log.verbose("Vsize(kb)", f"{vsize:.1f}")
self.log.verbose("Vsize(MB)", f"{vsize:.1f}")
self.parameters["Vsize"] = []

result = self.profiler.memoryUsage(withChildren=True)
if not result["OK"]:
self.log.warn("Could not get rss info from profiler", result["Message"])
else:
rss = result["Value"] * 1024.0
self.initialValues["RSS"] = rss
self.log.verbose("RSS(kb)", f"{rss:.1f}")
self.initialValues["RSS"] = result["Value"]
self.log.verbose("RSS(MB)", f"{result['Value']:.1f}")
self.parameters["RSS"] = []

# We exclude fuse so that mountpoints can be cleaned up by automount after a period unused
Expand Down Expand Up @@ -847,9 +828,9 @@ def __getUsageSummary(self):
if "MemoryUsed" in self.parameters:
memory = self.parameters["MemoryUsed"]
if memory:
summary["MemoryUsed(kb)"] = abs(float(memory[-1]) - float(self.initialValues["MemoryUsed"]))
summary["MemoryUsed(MB)"] = abs(float(memory[-1]) - float(self.initialValues["MemoryUsed"]))
else:
summary["MemoryUsed(kb)"] = math.nan
summary["MemoryUsed(MB)"] = math.nan
# LoadAverage
if "LoadAverage" in self.parameters:
laList = self.parameters["LoadAverage"]
Expand Down Expand Up @@ -956,7 +937,7 @@ def getNodeInformation(self):
"""Retrieves all static system information"""
result = {}
result["HostName"] = socket.gethostname()
result["Memory(kB)"] = int(psutil.virtual_memory()[1] / 1024)
result["Memory(MB)"] = int(psutil.virtual_memory()[1] / 1024 / 1024)
result["LocalAccount"] = getpass.getuser()

path = Path("/proc/cpuinfo")
Expand All @@ -968,14 +949,6 @@ def getNodeInformation(self):

return result

#############################################################################
def getMemoryUsed(self):
"""Obtains the memory used."""
mem = (
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss
)
return float(mem)

#############################################################################
def getDiskSpace(self, exclude=None):
"""Obtains the available disk space."""
Expand Down
Loading