Commit
[RQD] Support multiple GPUs with nvidia-smi
Co-authored-by: Lars van der Bijl <[email protected]>
splhack and larsbijl committed Feb 22, 2021
1 parent 45cdc0f commit a51ef0b
Showing 4 changed files with 102 additions and 47 deletions.
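For context on the diff below: this commit drops the site-specific cudaInfo helper and queries nvidia-smi directly, summing memory across every GPU on the host and tracking assignable GPU ids in a set, mirroring the existing hyper-threading tasksets. The following is a condensed, standalone sketch of the parsing added to rqmachine.py; the query string and sample figures come from the diff and its tests, while the function name query_gpu_memory is illustrative. The 1048.576 factor converts MiB to KB, so 16130 MiB rounds up to 16913531 KB, the value the new tests assert.

import math
import subprocess

def query_gpu_memory():
    # Same nvidia-smi invocation as the diff; one CSV line per GPU,
    # e.g. "16130 MiB, 16103 MiB, 8" (memory.total, memory.free, GPU count).
    output = subprocess.getoutput(
        'nvidia-smi --query-gpu=memory.total,memory.free,count --format=csv,noheader')
    total = free = count = 0
    for line in output.splitlines():
        fields = line.split()
        # MiB -> KB: 1 MiB = 1048.576 KB, rounded up per GPU.
        total += math.ceil(int(fields[0]) * 1048.576)
        free += math.ceil(int(fields[2]) * 1048.576)
        # The count column repeats the host's GPU count on every line,
        # so it is read, not summed.
        count = int(fields[-1])
    return {'count': count, 'total': total, 'free': free}
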
19 changes: 16 additions & 3 deletions rqd/rqd/rqcore.py
@@ -91,7 +91,7 @@ def __createEnvVariables(self):
self.frameEnv["maxframetime"] = "0"
self.frameEnv["minspace"] = "200"
self.frameEnv["CUE3"] = "True"
self.frameEnv["CUE_GPU_MEMORY"] = str(self.rqCore.machine.getGpuMemory())
self.frameEnv["CUE_GPU_MEMORY"] = str(self.rqCore.machine.getGpuMemoryFree())
self.frameEnv["SP_NOMYCSHRC"] = "1"

for key in self.runFrame.environment:
@@ -104,6 +104,10 @@ def __createEnvVariables(self):
len(self.runFrame.attributes['CPU_LIST'].split(','))))
self.frameEnv['CUE_HT'] = "True"

# Add GPUs to use all assigned GPU cores
if 'GPU_LIST' in self.runFrame.attributes:
self.frameEnv['CUE_GPU_CORES'] = self.runFrame.attributes['GPU_LIST']

def _createCommandFile(self, command):
"""Creates a file that subprocess. Popen then executes.
@type command: string
@@ -511,7 +515,8 @@ def run(self):
# Delay keeps the cuebot from spamming failing booking requests
time.sleep(10)
finally:
self.rqCore.releaseCores(self.runFrame.num_cores, runFrame.attributes.get('CPU_LIST'))
self.rqCore.releaseCores(self.runFrame.num_cores, runFrame.attributes.get('CPU_LIST'),
runFrame.attributes.get('GPU_LIST') if 'GPU_LIST' in self.runFrame.attributes else None)

self.rqCore.deleteFrame(self.runFrame.frame_id)

@@ -701,7 +706,7 @@ def killAllFrame(self, reason):
pass
time.sleep(1)

def releaseCores(self, reqRelease, releaseHT=None):
def releaseCores(self, reqRelease, releaseHT=None, releaseGpus=None):
"""The requested number of cores are released
@type reqRelease: int
@param reqRelease: Number of cores to release, 100 = 1 physical core"""
@@ -719,6 +724,9 @@ def releaseCores(self, reqRelease, releaseHT=None):
if releaseHT:
self.machine.releaseHT(releaseHT)

if releaseGpus:
self.machine.releaseGpus(releaseGpus)

finally:
self.__threadLock.release()

@@ -811,6 +819,11 @@ def launchFrame(self, runFrame):
if reserveHT:
runFrame.attributes['CPU_LIST'] = reserveHT

if runFrame.num_gpus:
reserveGpus = self.machine.reserveGpus(runFrame.num_gpus)
if reserveGpus:
runFrame.attributes['GPU_LIST'] = reserveGpus

# They must be available at this point, reserve them
self.cores.idle_cores -= runFrame.num_cores
self.cores.booked_cores += runFrame.num_cores
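On the rqcore.py side, launchFrame now reserves GPU ids into GPU_LIST, __createEnvVariables exports them to the frame as CUE_GPU_CORES, and releaseCores hands them back when the frame ends. One plausible way a frame wrapper could consume the variable, not something this commit does, is to map it onto CUDA_VISIBLE_DEVICES so the render process only sees its assigned boards; my_render_command below is a hypothetical placeholder.

import os
import subprocess

# Hypothetical frame-side usage (not part of this commit): restrict the
# render process to the GPUs RQD assigned via CUE_GPU_CORES, e.g. "0,3".
env = dict(os.environ)
gpu_cores = env.get('CUE_GPU_CORES', '')
if gpu_cores:
    # CUDA reads CUDA_VISIBLE_DEVICES as a comma-separated device list.
    env['CUDA_VISIBLE_DEVICES'] = gpu_cores
subprocess.run(['my_render_command'], env=env, check=True)
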
76 changes: 59 additions & 17 deletions rqd/rqd/rqmachine.py
@@ -71,6 +71,7 @@ def __init__(self, rqCore, coreInfo):
self.__rqCore = rqCore
self.__coreInfo = coreInfo
self.__tasksets = set()
self.__gpusets = set()

if platform.system() == 'Linux':
self.__vmstat = rqd.rqswap.VmStat()
@@ -90,6 +91,7 @@ def __init__(self, rqCore, coreInfo):
self.__pidHistory = {}

self.setupHT()
self.setupGpu()

def isNimbySafeToRunJobs(self):
"""Returns False if nimby should be triggered due to resource limits"""
@@ -296,40 +298,55 @@ def getBootTime(self):
return int(line.split()[1])
return 0

@rqd.rqutil.Memoize
def getGpuCount(self):
"""Returns the total gpu's on the machine"""
return self.__getGpuValues()['count']

@rqd.rqutil.Memoize
def getGpuMemoryTotal(self):
"""Returns the total gpu memory in kb for CUE_GPU_MEMORY"""
return self.__getGpuValues()['total']

def getGpuMemory(self):
def getGpuMemoryFree(self):
"""Returns the available gpu memory in kb for CUE_GPU_MEMORY"""
return self.__getGpuValues()['free']

def __resetGpuResults(self):
self.gpuResults = {'count': 0, 'total': 0, 'free': 0, 'updated': 0}

def __getGpuValues(self):
if not hasattr(self, 'gpuNotSupported'):
if not hasattr(self, 'gpuResults'):
self.gpuResults = {'total': 0, 'free': 0, 'updated': 0}
self.__resetGpuResults()
if not rqd.rqconstants.ALLOW_GPU:
self.gpuNotSupported = True
return self.gpuResults
if self.gpuResults['updated'] > time.time() - 60:
if self.gpuResults['updated'] > int(time.time()) - 60:
return self.gpuResults
try:
# /shots/spi/home/bin/spinux1/cudaInfo
# /shots/spi/home/bin/rhel7/cudaInfo
cudaInfo = subprocess.getoutput('/usr/local/spi/rqd3/cudaInfo')
if 'There is no device supporting CUDA' in cudaInfo:
self.gpuNotSupported = True
else:
results = cudaInfo.splitlines()[-1].split()
# TotalMem 1023 Mb FreeMem 968 Mb
# The int(math.ceil(int(x) / 32.0) * 32) rounds up to the next multiple of 32
self.gpuResults['total'] = int(math.ceil(int(results[1]) / 32.0) * 32) * KILOBYTE
self.gpuResults['free'] = int(results[4]) * KILOBYTE
self.gpuResults['updated'] = time.time()
nvidia_smi = subprocess.getoutput('nvidia-smi --query-gpu=memory.total,memory.free,count --format=csv,noheader')
total = 0
free = 0
count = 0
for line in nvidia_smi.splitlines():
# Example "16130 MiB, 16103 MiB, 8"
l = line.split()
total += math.ceil(int(l[0]) * 1048.576)
free += math.ceil(int(l[2]) * 1048.576)
count = int(l[-1])

self.gpuResults['total'] = int(total)
self.gpuResults['free'] = int(free)
self.gpuResults['count'] = count
self.gpuResults['updated'] = int(time.time())
except Exception as e:
log.warning('Failed to get FreeMem from cudaInfo due to: %s at %s' % \
self.gpuNotSupported = True
self.__resetGpuResults()
log.warning('Failed to query nvidia-smi due to: %s at %s' % \
(e, traceback.extract_tb(sys.exc_info()[2])))
else:
self.__resetGpuResults()
return self.gpuResults

def __getSwapout(self):
Expand Down Expand Up @@ -566,7 +583,9 @@ def updateMachineStats(self):

self.__renderHost.free_swap = freeSwapMem
self.__renderHost.free_mem = freeMem + cachedMem
self.__renderHost.attributes['freeGpu'] = str(self.getGpuMemory())
self.__renderHost.total_gpu_mem = self.getGpuMemoryTotal()
self.__renderHost.free_gpu_mem = self.getGpuMemoryFree()

self.__renderHost.attributes['swapout'] = self.__getSwapout()

elif platform.system() == 'Darwin':
Expand Down Expand Up @@ -621,6 +640,10 @@ def setupHT(self):
if self.__enabledHT():
self.__tasksets = set(range(self.__coreInfo.total_cores // 100))

def setupGpu(self):
""" Setup rqd for Gpus """
self.__gpusets = set(range(self.getGpuCount()))

def reserveHT(self, reservedCores):
""" Reserve cores for use by taskset
taskset -c 0,1,8,9 COMMAND
@@ -670,3 +693,22 @@ def releaseHT(self, reservedHT):
for core in reservedHT.split(','):
if int(core) < self.__coreInfo.total_cores // 100:
self.__tasksets.add(int(core))

def reserveGpus(self, reservedGpus):
if len(self.__gpusets) < reservedGpus:
err = 'Not launching, insufficient GPUs to reserve based on reservedGpus'
log.critical(err)
raise CoreReservationFailureException(err)

gpusets = []
for x in range(reservedGpus):
gpu = self.__gpusets.pop()
gpusets.append(str(gpu))

return ','.join(gpusets)

def releaseGpus(self, reservedGpus):
log.debug('GPU set: Releasing gpu - %s' % reservedGpus)
for gpu in reservedGpus.split(','):
if int(gpu) < self.getGpuCount():
self.__gpusets.add(int(gpu))
2 changes: 0 additions & 2 deletions rqd/tests/rqconstants_tests.py
@@ -39,7 +39,6 @@

from .rqmachine_tests import (
CPUINFO,
CUDAINFO,
LOADAVG_LOW_USAGE,
MEMINFO_MODERATE_USAGE,
PROC_STAT,
@@ -76,7 +75,6 @@ def decorator(*args, **kwargs):
return decorator


@mock.patch("subprocess.getoutput", new=mock.MagicMock(return_value=CUDAINFO))
@mock.patch.object(
rqd.rqutil.Memoize, "isCached", new=mock.MagicMock(return_value=False)
)
52 changes: 27 additions & 25 deletions rqd/tests/rqmachine_tests.py
@@ -150,17 +150,13 @@
'16781318 0 0 0 0 17 4 0 0 0 0 0 6303248 6304296 23932928 140725890743234 '
'140725890743420 140725890743420 140725890744298 0')

CUDAINFO = ' TotalMem 1023 Mb FreeMem 968 Mb'


@mock.patch('subprocess.getoutput', new=mock.MagicMock(return_value=CUDAINFO))
@mock.patch.object(rqd.rqutil.Memoize, 'isCached', new=mock.MagicMock(return_value=False))
@mock.patch('platform.system', new=mock.MagicMock(return_value='Linux'))
@mock.patch('os.statvfs', new=mock.MagicMock())
@mock.patch('rqd.rqutil.getHostname', new=mock.MagicMock(return_value='arbitrary-hostname'))
class MachineTests(pyfakefs.fake_filesystem_unittest.TestCase):

@mock.patch('subprocess.getoutput', new=mock.MagicMock(return_value=CUDAINFO))
@mock.patch('os.statvfs', new=mock.MagicMock())
@mock.patch('platform.system', new=mock.MagicMock(return_value='Linux'))
def setUp(self):
@@ -314,29 +310,39 @@ def test_getBootTime(self):

self.assertEqual(1569882758, self.machine.getBootTime())

@mock.patch(
'subprocess.getoutput',
new=mock.MagicMock(return_value=' TotalMem 1023 Mb FreeMem 968 Mb'))
def test_getGpuMemoryTotal(self):
def _resetGpuStat(self):
if hasattr(self.machine, 'gpuNotSupported'):
delattr(self.machine, 'gpuNotSupported')
if hasattr(self.machine, 'gpuResults'):
delattr(self.machine, 'gpuResults')
rqd.rqconstants.ALLOW_GPU = True

self.assertEqual(1048576, self.machine.getGpuMemoryTotal())

@mock.patch(
'subprocess.getoutput',
new=mock.MagicMock(return_value=' TotalMem 1023 Mb FreeMem 968 Mb'))
def test_getGpuMemory(self):
if hasattr(self.machine, 'gpuNotSupported'):
delattr(self.machine, 'gpuNotSupported')
if hasattr(self.machine, 'gpuResults'):
delattr(self.machine, 'gpuResults')
rqd.rqconstants.ALLOW_GPU = True
@mock.patch.object(
rqd.rqconstants, 'ALLOW_GPU', new=mock.MagicMock(return_value=True))
@mock.patch('subprocess.getoutput',
new=mock.MagicMock(return_value='16130 MiB, 16119 MiB, 1'))
def test_getGpuStat(self):
self._resetGpuStat()
self.assertEqual(1, self.machine.getGpuCount())
self.assertEqual(16913531, self.machine.getGpuMemoryTotal())
self.assertEqual(16901997, self.machine.getGpuMemoryFree())

self.assertEqual(991232, self.machine.getGpuMemory())
@mock.patch.object(
rqd.rqconstants, 'ALLOW_GPU', new=mock.MagicMock(return_value=True))
@mock.patch('subprocess.getoutput',
new=mock.MagicMock(return_value="""\
16130 MiB, 16103 MiB, 8
16130 MiB, 16119 MiB, 8
16130 MiB, 16119 MiB, 8
16130 MiB, 16119 MiB, 8
16130 MiB, 4200 MiB, 8
16130 MiB, 16119 MiB, 8
16130 MiB, 16119 MiB, 8
16130 MiB, 16119 MiB, 8"""))
def test_multipleGpus(self):
self._resetGpuStat()
self.assertEqual(8, self.machine.getGpuCount())
self.assertEqual(135308248, self.machine.getGpuMemoryTotal())
self.assertEqual(122701222, self.machine.getGpuMemoryFree())

def test_getPathEnv(self):
self.assertEqual(
@@ -361,15 +367,11 @@ def test_reboot(self, popenMock):

popenMock.assert_called_with(['/usr/bin/sudo', '/sbin/reboot', '-f'])

@mock.patch(
'subprocess.getoutput',
new=mock.MagicMock(return_value=' TotalMem 1023 Mb FreeMem 968 Mb'))
def test_getHostInfo(self):
hostInfo = self.machine.getHostInfo()

self.assertEqual(4105212, hostInfo.free_swap)
self.assertEqual(25699176, hostInfo.free_mem)
self.assertEqual('991232', hostInfo.attributes['freeGpu'])
self.assertEqual('0', hostInfo.attributes['swapout'])
self.assertEqual(25, hostInfo.load)
self.assertEqual(False, hostInfo.nimby_enabled)
