From da2fe011cb3a005df27a1ac053f6553619979266 Mon Sep 17 00:00:00 2001 From: Kern Attila GERMAIN <5556461+KernAttila@users.noreply.github.com> Date: Wed, 25 Sep 2024 20:31:01 +0200 Subject: [PATCH] Cuebot reserve all cores (#1313) **Link the Issue(s) this Pull Request is related to.** Fixes #1297 **Summarize your change.** As in many render engines, we should be able to set a negative core requirement. minCores=8 > reserve 8 cores minCores=0 > reserve all cores minCores=-2 > reserve all cores minus 2 This PR addresses this feature by handling negative core requests. Cuebot will try to match this number against the number of cores on each host. The frame will be booked only if all cores are available in this scenario. If the host is busy (even slightly), the frame is **not** booked, to avoid filling the remaining cores. **Testing** I would need some guidance to create proper tests for cuebot. **Screenshot** ![negative_cores](https://github.com/AcademySoftwareFoundation/OpenCue/assets/5556461/d9c4400c-824a-40cc-9ba9-2f76a3fd8ceb) Update: There is now a "ALL" text for zero cores, or "ALL (-2)" for negative cores reservation. ![core_reservation](https://github.com/user-attachments/assets/88802b15-3ccd-4cb5-90b7-58e532523ae6) (cuesubmit feature in another PR #1284) --------- Signed-off-by: Kern Attila GERMAIN <5556461+KernAttila@users.noreply.github.com> --- .../com/imageworks/spcue/DispatchHost.java | 48 ++++++++++++++++++- .../imageworks/spcue/LocalHostAssignment.java | 32 ++++++++++++- .../com/imageworks/spcue/SortableShow.java | 8 ++-- .../com/imageworks/spcue/VirtualProc.java | 18 ++++++- .../com/imageworks/spcue/dao/LayerDao.java | 27 ++++++----- .../spcue/dao/postgres/LayerDaoJdbc.java | 8 +++- .../spcue/dispatcher/CoreUnitDispatcher.java | 10 +++- .../spcue/dispatcher/HostReportHandler.java | 9 ++-- .../spcue/service/JobManagerService.java | 2 +- .../com/imageworks/spcue/service/JobSpec.java | 12 +++-- cuegui/cuegui/FilterDialog.py | 2 +- cuegui/cuegui/LayerMonitorTree.py | 14 +++++- cuesubmit/cuesubmit/Submission.py | 2 +- cuesubmit/cuesubmit/Validators.py | 2 +- cuesubmit/tests/Validators_tests.py | 1 + 15 files changed, 159 insertions(+), 36 deletions(-) diff --git a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java index 495d0a9b1..f01724e17 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java +++ b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java @@ -24,9 +24,14 @@ import com.imageworks.spcue.grpc.host.LockState; import com.imageworks.spcue.util.CueUtil; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + public class DispatchHost extends Entity implements HostInterface, FacilityInterface, ResourceContainer { + private static final Logger logger = LogManager.getLogger(DispatchHost.class); + public String facilityId; public String allocationId; public LockState lockState; @@ -76,12 +81,53 @@ public String getFacilityId() { return facilityId; } + public boolean canHandleNegativeCoresRequest(int requestedCores) { + // Request is positive, no need to test further. + if (requestedCores > 0) { + logger.debug(getName() + " can handle the job with " + requestedCores + " cores."); + return true; + } + // All cores are available, validate the request. + if (cores == idleCores) { + logger.debug(getName() + " can handle the job with " + requestedCores + " cores."); + return true; + } + // Some or all cores are busy, avoid booking again. + logger.debug(getName() + " cannot handle the job with " + requestedCores + " cores."); + return false; + } + + public int handleNegativeCoresRequirement(int requestedCores) { + // If we request a <=0 amount of cores, return positive core count. + // Request -2 on a 24 core machine will return 22. + + if (requestedCores > 0) { + // Do not process positive core requests. + logger.debug("Requested " + requestedCores + " cores."); + return requestedCores; + } + if (requestedCores <=0 && idleCores < cores) { + // If request is negative but cores are already used, return 0. + // We don't want to overbook the host. + logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs."); + return 0; + } + // Book all cores minus the request + int totalCores = idleCores + requestedCores; + logger.debug("Requested " + requestedCores + " cores <= 0, " + + idleCores + " cores are free, booking " + totalCores + " cores"); + return totalCores; + } + @Override public boolean hasAdditionalResources(int minCores, long minMemory, int minGpus, long minGpuMemory) { - + minCores = handleNegativeCoresRequirement(minCores); if (idleCores < minCores) { return false; } + if (minCores <= 0) { + return false; + } else if (idleMemory < minMemory) { return false; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java b/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java index 3e073fa73..65ce05c7e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java +++ b/cuebot/src/main/java/com/imageworks/spcue/LocalHostAssignment.java @@ -22,6 +22,9 @@ import com.imageworks.spcue.dispatcher.ResourceContainer; import com.imageworks.spcue.grpc.renderpartition.RenderPartitionType; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + /** * Contains information about local desktop cores a user has * assigned to the given job. @@ -33,6 +36,8 @@ public class LocalHostAssignment extends Entity implements ResourceContainer { + private static final Logger logger = LogManager.getLogger(LocalHostAssignment.class); + private int idleCoreUnits; private long idleMemory; private int idleGpuUnits; @@ -62,12 +67,37 @@ public LocalHostAssignment(int maxCores, int threads, long maxMemory, int maxGpu this.maxGpuMemory = maxGpuMemory; } + public int handleNegativeCoresRequirement(int requestedCores) { + // If we request a <=0 amount of cores, return positive core count. + // Request -2 on a 24 core machine will return 22. + + if (requestedCores > 0) { + // Do not process positive core requests. + logger.debug("Requested " + requestedCores + " cores."); + return requestedCores; + } + if (requestedCores <=0 && idleCoreUnits < threads) { + // If request is negative but cores are already used, return 0. + // We don't want to overbook the host. + logger.debug("Requested " + requestedCores + " cores, but the host is busy and cannot book more jobs."); + return 0; + } + // Book all cores minus the request + int totalCores = idleCoreUnits + requestedCores; + logger.debug("Requested " + requestedCores + " cores <= 0, " + + idleCoreUnits + " cores are free, booking " + totalCores + " cores"); + return totalCores; + } + @Override public boolean hasAdditionalResources(int minCores, long minMemory, int minGpus, long minGpuMemory) { - + minCores = handleNegativeCoresRequirement(minCores); if (idleCoreUnits < minCores) { return false; } + if (minCores <= 0) { + return false; + } else if (idleMemory < minMemory) { return false; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/SortableShow.java b/cuebot/src/main/java/com/imageworks/spcue/SortableShow.java index f13fbaae2..83798f079 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/SortableShow.java +++ b/cuebot/src/main/java/com/imageworks/spcue/SortableShow.java @@ -54,12 +54,12 @@ public boolean isSkipped(String tags, long cores, long memory) { try { if (failed.containsKey(tags)) { long [] mark = failed.get(tags); - if (cores <= mark[0]) { - logger.info("skipped due to not enough cores " + cores + " <= " + mark[0]); + if (cores < mark[0]) { + logger.info("skipped due to not enough cores " + cores + " < " + mark[0]); return true; } - else if (memory <= mark[1]) { - logger.info("skipped due to not enough memory " + memory + " <= " + mark[1]); + else if (memory < mark[1]) { + logger.info("skipped due to not enough memory " + memory + " < " + mark[1]); return true; } } diff --git a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java index ea0f5b98e..8205f3021 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java @@ -22,8 +22,13 @@ import com.imageworks.spcue.dispatcher.Dispatcher; import com.imageworks.spcue.grpc.host.ThreadMode; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + public class VirtualProc extends FrameEntity implements ProcInterface { + private static final Logger logger = LogManager.getLogger(VirtualProc.class); + public String hostId; public String allocationId; public String frameId; @@ -31,6 +36,7 @@ public class VirtualProc extends FrameEntity implements ProcInterface { public String os; public byte[] childProcesses; + public boolean canHandleNegativeCoresRequest; public int coresReserved; public long memoryReserved; public long memoryUsed; @@ -111,7 +117,17 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame, St proc.coresReserved = proc.coresReserved + host.strandedCores; } - if (proc.coresReserved >= 100) { + proc.canHandleNegativeCoresRequest = host.canHandleNegativeCoresRequest(proc.coresReserved); + + if (proc.coresReserved == 0) { + logger.debug("Reserving all cores"); + proc.coresReserved = host.cores; + } + else if (proc.coresReserved < 0) { + logger.debug("Reserving all cores minus " + proc.coresReserved); + proc.coresReserved = host.cores + proc.coresReserved; + } + else if (proc.coresReserved >= 100) { int originalCores = proc.coresReserved; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java index 9343c3aa0..c4b07edf9 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/LayerDao.java @@ -59,7 +59,7 @@ public interface LayerDao { public List getLayerDetails(JobInterface job); /** - * Returns true if supplied layer is compelte. + * Returns true if supplied layer is complete. * * @param layer * @return boolean @@ -82,7 +82,7 @@ public interface LayerDao { void insertLayerDetail(LayerDetail l); /** - * gets a layer detail from an object that implments layer + * gets a layer detail from an object that implements layer * * @param layer * @return LayerDetail @@ -167,7 +167,7 @@ public interface LayerDao { void updateLayerTags(LayerInterface layer, Set tags); /** - * Insert a key/valye pair into the layer environment + * Insert a key/value pair into the layer environment * * @param layer * @param key @@ -292,7 +292,7 @@ public interface LayerDao { /** * Update all layers of the set type in the specified job - * with the new min cores requirement. + * with the new min gpu requirement. * * @param job * @param gpus @@ -304,9 +304,8 @@ public interface LayerDao { * Update a layer's max cores value, which limits how * much threading can go on. * - * @param job - * @param cores - * @param type + * @param layer + * @param threadable */ void updateThreadable(LayerInterface layer, boolean threadable); @@ -314,7 +313,7 @@ public interface LayerDao { * Update a layer's timeout value, which limits how * much the frame can run on a host. * - * @param job + * @param layer * @param timeout */ void updateTimeout(LayerInterface layer, int timeout); @@ -323,8 +322,8 @@ public interface LayerDao { * Update a layer's LLU timeout value, which limits how * much the frame can run on a host without updates in the log file. * - * @param job - * @param timeout + * @param layer + * @param timeout_llu */ void updateTimeoutLLU(LayerInterface layer, int timeout_llu); @@ -341,7 +340,7 @@ public interface LayerDao { /** * Appends a tag to the current set of tags. If the tag - * already exists than nothing happens. + * already exists then nothing happens. * * @param layer * @param val @@ -363,8 +362,9 @@ public interface LayerDao { * Update layer usage with processor time usage. * This happens when the proc has completed or failed some work. * - * @param proc + * @param layer * @param newState + * @param exitStatus */ void updateUsage(LayerInterface layer, ResourceUsage usage, int exitStatus); @@ -387,6 +387,9 @@ public interface LayerDao { /** * Enable/disable memory optimizer. + * + * @param layer + * @param state */ void enableMemoryOptimizer(LayerInterface layer, boolean state); diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java index f555bef6e..78753f578 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/LayerDaoJdbc.java @@ -51,8 +51,12 @@ import com.imageworks.spcue.util.CueUtil; import com.imageworks.spcue.util.SqlUtil; -public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao { +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao { + private static final Logger logger = LogManager.getLogger(LayerDaoJdbc.class); private static final String INSERT_OUTPUT_PATH = "INSERT INTO " + "layer_output " + @@ -77,7 +81,7 @@ public void insertLayerOutput(LayerInterface layer, String filespec) { "FROM " + "layer_output " + "WHERE " + - "pk_layer = ?" + + "pk_layer = ? " + "ORDER BY " + "ser_order"; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java index e55a76865..226d9466c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/CoreUnitDispatcher.java @@ -264,10 +264,16 @@ public List dispatchHost(DispatchHost host, JobInterface job) { VirtualProc proc = VirtualProc.build(host, frame, selfishServices); - if (host.idleCores < frame.minCores || + if (frame.minCores <= 0 && !proc.canHandleNegativeCoresRequest) { + logger.debug("Cannot dispatch job, host is busy."); + break; + } + + if (host.idleCores < host.handleNegativeCoresRequirement(frame.minCores) || host.idleMemory < frame.minMemory || host.idleGpus < frame.minGpus || host.idleGpuMemory < frame.minGpuMemory) { + logger.debug("Cannot dispatch, insufficient resources."); break; } @@ -283,6 +289,8 @@ public List dispatchHost(DispatchHost host, JobInterface job) { boolean success = new DispatchFrameTemplate(proc, job, frame, false) { public void wrapDispatchFrame() { + logger.debug("Dispatching frame with " + frame.minCores + " minCores on proc with " + + proc.coresReserved + " coresReserved"); dispatch(frame, proc); dispatchSummary(proc, frame, "Booking"); return; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index 46d56929f..b0a7ccd9c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -245,6 +245,7 @@ public void handleHostReport(HostReport report, boolean isBoot) { */ String msg = null; boolean hasLocalJob = bookingManager.hasLocalHostAssignment(host); + int coresToReserve = host.handleNegativeCoresRequirement(Dispatcher.CORE_POINTS_RESERVED_MIN); if (hasLocalJob) { List lcas = @@ -253,13 +254,13 @@ public void handleHostReport(HostReport report, boolean isBoot) { bookingManager.removeInactiveLocalHostAssignment(lca); } } - + if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), report.getHost().getFreeMcp(), host.os)) { msg = String.format( - "%s doens't have enough free space in the temporary directory (mcp), %dMB", + "%s doesn't have enough free space in the temporary directory (mcp), %dMB", host.name, (report.getHost().getFreeMcp()/1024)); } - else if (host.idleCores < Dispatcher.CORE_POINTS_RESERVED_MIN) { + else if (coresToReserve <= 0 || host.idleCores < Dispatcher.CORE_POINTS_RESERVED_MIN) { msg = String.format("%s doesn't have enough idle cores, %d needs %d", host.name, host.idleCores, Dispatcher.CORE_POINTS_RESERVED_MIN); } @@ -268,7 +269,7 @@ else if (host.idleMemory < Dispatcher.MEM_RESERVED_MIN) { host.name, host.idleMemory, Dispatcher.MEM_RESERVED_MIN); } else if (report.getHost().getFreeMem() < CueUtil.MB512) { - msg = String.format("%s doens't have enough free system mem, %d needs %d", + msg = String.format("%s doesn't have enough free system mem, %d needs %d", host.name, report.getHost().getFreeMem(), Dispatcher.MEM_RESERVED_MIN); } else if(!host.hardwareState.equals(HardwareState.UP)) { diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java b/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java index 844f67635..2c9c14425 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/JobManagerService.java @@ -274,7 +274,7 @@ public JobDetail createJob(BuildableJob buildableJob) { } } - if (layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) { + if (layer.minimumCores > 0 && layer.minimumCores < Dispatcher.CORE_POINTS_RESERVED_MIN) { layer.minimumCores = Dispatcher.CORE_POINTS_RESERVED_MIN; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java b/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java index 7be581d1b..2e2fa0801 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/JobSpec.java @@ -117,7 +117,7 @@ public class JobSpec { public JobSpec() { } - public static final String NAME_REGEX = "^([\\w\\.]{3,})$"; + public static final String NAME_REGEX = "^([\\w\\.-]{3,})$"; public static final Pattern NAME_PATTERN = Pattern.compile(NAME_REGEX); @@ -607,12 +607,16 @@ private void determineMinimumCores(Element layerTag, LayerDetail layer) { int corePoints = layer.minimumCores; if (cores.contains(".")) { - corePoints = (int) (Double.valueOf(cores) * 100 + .5); + if (cores.contains("-")) { + corePoints = (int) (Double.valueOf(cores) * 100 - .5); + } else { + corePoints = (int) (Double.valueOf(cores) * 100 + .5); + } } else { corePoints = Integer.valueOf(cores); } - if (corePoints < Dispatcher.CORE_POINTS_RESERVED_MIN) { + if (corePoints > 0 && corePoints < Dispatcher.CORE_POINTS_RESERVED_MIN) { corePoints = Dispatcher.CORE_POINTS_RESERVED_DEFAULT; } else if (corePoints > Dispatcher.CORE_POINTS_RESERVED_MAX) { @@ -649,7 +653,7 @@ private void determineChunkSize(Element layerTag, LayerDetail layer) { */ private void determineThreadable(Element layerTag, LayerDetail layer) { // Must have at least 1 core to thread. - if (layer.minimumCores < 100) { + if (layer.minimumCores > 0 && layer.minimumCores < 100) { layer.isThreadable = false; } else if (layerTag.getChildTextTrim("threadable") != null) { diff --git a/cuegui/cuegui/FilterDialog.py b/cuegui/cuegui/FilterDialog.py index 90c72e263..ab4d4d25b 100644 --- a/cuegui/cuegui/FilterDialog.py +++ b/cuegui/cuegui/FilterDialog.py @@ -459,7 +459,7 @@ def createAction(self): "Create Action", "What value should this property be set to?", 0, - 0, + -8, # Minimum core value can be <=0, booking all cores minus this value. 50000, 2) value = float(value) diff --git a/cuegui/cuegui/LayerMonitorTree.py b/cuegui/cuegui/LayerMonitorTree.py index 6ddd6cf18..0f110f874 100644 --- a/cuegui/cuegui/LayerMonitorTree.py +++ b/cuegui/cuegui/LayerMonitorTree.py @@ -67,10 +67,12 @@ def __init__(self, parent): data=lambda layer: displayRange(layer), tip="The range of frames that the layer should render.") self.addColumn("Cores", 45, id=6, - data=lambda layer: "%.2f" % layer.data.min_cores, + data=lambda layer: self.labelCoresColumn(layer.data.min_cores), sort=lambda layer: layer.data.min_cores, tip="The number of cores that the frames in this layer\n" - "will reserve as a minimum.") + "will reserve as a minimum." + "Zero or negative value indicate that the layer will use\n" + "all available cores on the machine, minus this value.") self.addColumn("Memory", 60, id=7, data=lambda layer: cuegui.Utils.memoryToString(layer.data.min_memory), sort=lambda layer: layer.data.min_memory, @@ -181,6 +183,14 @@ def updateRequest(self): since last updated""" self.ticksWithoutUpdate = 9999 + def labelCoresColumn(self, reserved_cores): + """Returns the reserved cores for a job""" + if reserved_cores > 0: + return "%.2f" % reserved_cores + if reserved_cores == 0: + return "ALL" + return "ALL (%.2f)" % reserved_cores + # pylint: disable=inconsistent-return-statements def setJob(self, job): """Sets the current job. diff --git a/cuesubmit/cuesubmit/Submission.py b/cuesubmit/cuesubmit/Submission.py index dc5e64d0f..f064b4cd3 100644 --- a/cuesubmit/cuesubmit/Submission.py +++ b/cuesubmit/cuesubmit/Submission.py @@ -97,7 +97,7 @@ def buildLayer(layerData, command, lastLayer=None): @type lastLayer: outline.layer.Layer @param lastLayer: layer that this new layer should be dependent on if dependType is set. """ - threadable = float(layerData.cores) >= 2 + threadable = float(layerData.cores) >= 2 or float(layerData.cores) <= 0 layer = outline.modules.shell.Shell( layerData.name, command=command.split(), chunk=layerData.chunk, threads=float(layerData.cores), range=str(layerData.layerRange), threadable=threadable) diff --git a/cuesubmit/cuesubmit/Validators.py b/cuesubmit/cuesubmit/Validators.py index 540f92e21..0b0bfb6f8 100644 --- a/cuesubmit/cuesubmit/Validators.py +++ b/cuesubmit/cuesubmit/Validators.py @@ -53,7 +53,7 @@ def matchNoSpaces(value): def matchNumbersOnly(value): """Matches strings with numbers and '.' only.""" - if re.match(r'^[0-9.]+$', value): + if re.match(r'^-?[0-9.]+$', value): return True return False diff --git a/cuesubmit/tests/Validators_tests.py b/cuesubmit/tests/Validators_tests.py index 0a5ef78eb..cbbf0b9cd 100644 --- a/cuesubmit/tests/Validators_tests.py +++ b/cuesubmit/tests/Validators_tests.py @@ -77,6 +77,7 @@ def testMatchNoSpaces(self): def testMatchNumbersOnly(self): self.assertTrue(matchNumbersOnly('0123')) self.assertTrue(matchNumbersOnly('3.14')) + self.assertTrue(matchNumbersOnly('-3.14')) # bit weird, but that's how the function is written self.assertTrue(matchNumbersOnly('800.555.555'))