Skip to content

Commit

Permalink
[rqd/cuebot] Hard and Soft memory limits (AcademySoftwareFoundation#1589
Browse files Browse the repository at this point in the history
)

Currently, frames are created with minimum memory requirements, but no
limits are enforced. This PR implements soft and hard memory limits in RQD
when running in Docker mode.

For more information about soft and hard limits, see the
[Docker resource constraints documentation](https://docs.docker.com/engine/containers/resource_constraints/).

Limits are calculated from the minimum memory defined for the layer and
a multiplier that can be tuned in Dispatcher.java. Ideally, these values
would be read from opencue.properties, but they are used in a static
context and evaluated before that file is actually read.

---------

Signed-off-by: Diego Tavares <[email protected]>
  • Loading branch information
DiegoTavares authored Nov 20, 2024
1 parent 633df41 commit c00d214
Show file tree
Hide file tree
Showing 16 changed files with 177 additions and 24 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ rest_gateway/*.tar\.gz
.eggs/*
/cuebot/bin/*
/logs/*
/.gradle/*
/.gradle/*
cuebot/.settings/*
cuebot/.classpath
cuebot/.project
2 changes: 1 addition & 1 deletion VERSION.in
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.2
1.3
22 changes: 21 additions & 1 deletion cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package com.imageworks.spcue;

import java.util.Optional;

import com.imageworks.spcue.dispatcher.Dispatcher;
import com.imageworks.spcue.grpc.job.FrameState;

public class DispatchFrame extends FrameEntity implements FrameInterface {
Expand All @@ -42,7 +44,6 @@ public class DispatchFrame extends FrameEntity implements FrameInterface {
public int minCores;
public int maxCores;
public boolean threadable;
public long minMemory;
public int minGpus;
public int maxGpus;
public long minGpuMemory;
Expand All @@ -52,5 +53,24 @@ public class DispatchFrame extends FrameEntity implements FrameInterface {

// The operating system this frame is expected to run on
public String os;

// Memory requirement for this frame in KB (RQD converts to bytes for Docker)
private long minMemory;

// Soft limit to be enforced for this frame in KB (RQD converts to bytes for Docker)
public long softMemoryLimit;

// Hard limit to be enforced for this frame in KB (RQD converts to bytes for Docker)
public long hardMemoryLimit;

/**
 * Stores the minimum memory for this frame and derives both memory limits from it.
 *
 * The soft and hard limits are the minimum memory scaled by
 * {@code Dispatcher.SOFT_MEMORY_MULTIPLIER} and
 * {@code Dispatcher.HARD_MEMORY_MULTIPLIER} respectively, truncated to a
 * whole number.
 *
 * @param minMemory memory requirement for this frame
 */
public void setMinMemory(long minMemory) {
    // Widen once so both limits are scaled from the same floating-point base.
    final double requested = minMemory;
    this.minMemory = minMemory;
    this.softMemoryLimit = (long) (requested * Dispatcher.SOFT_MEMORY_MULTIPLIER);
    this.hardMemoryLimit = (long) (requested * Dispatcher.HARD_MEMORY_MULTIPLIER);
}

/**
 * Returns the minimum memory stored for this frame.
 *
 * @return the current value of the minMemory field
 */
public long getMinMemory() {
    return minMemory;
}
}

8 changes: 4 additions & 4 deletions cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public static final VirtualProc build(DispatchHost host,
proc.isLocalDispatch = host.isLocalDispatch;

proc.coresReserved = frame.minCores;
proc.memoryReserved = frame.minMemory;
proc.memoryReserved = frame.getMinMemory();
proc.gpusReserved = frame.minGpus;
proc.gpuMemoryReserved = frame.minGpuMemory;

Expand Down Expand Up @@ -156,11 +156,11 @@ else if (proc.coresReserved >= 100) {
proc.coresReserved = wholeCores * 100;
}
else {
if (host.idleMemory - frame.minMemory
if (host.idleMemory - frame.getMinMemory()
<= Dispatcher.MEM_STRANDED_THRESHHOLD) {
proc.coresReserved = wholeCores * 100;
} else {
proc.coresReserved = getCoreSpan(host, frame.minMemory);
proc.coresReserved = getCoreSpan(host, frame.getMinMemory());
}
}
if (host.threadMode == ThreadMode.VARIABLE_VALUE
Expand Down Expand Up @@ -247,7 +247,7 @@ public static final VirtualProc build(DispatchHost host,
proc.isLocalDispatch = host.isLocalDispatch;

proc.coresReserved = lja.getThreads() * 100;
proc.memoryReserved = frame.minMemory;
proc.memoryReserved = frame.getMinMemory();
proc.gpusReserved = frame.minGpus;
proc.gpuMemoryReserved = frame.minGpuMemory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public DispatchFrame mapRow(ResultSet rs, int rowNum) throws SQLException {
frame.minCores = rs.getInt("int_cores_min");
frame.maxCores = rs.getInt("int_cores_max");
frame.threadable = rs.getBoolean("b_threadable");
frame.minMemory = rs.getLong("int_mem_min");
frame.setMinMemory(rs.getLong("int_mem_min"));
frame.minGpus = rs.getInt("int_gpus_min");
frame.maxGpus = rs.getInt("int_gpus_max");
frame.minGpuMemory = rs.getLong("int_gpu_mem_min");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public List<VirtualProc> dispatchHost(DispatchHost host, JobInterface job) {
}

if (host.idleCores < host.handleNegativeCoresRequirement(frame.minCores) ||
host.idleMemory < frame.minMemory ||
host.idleMemory < frame.getMinMemory() ||
host.idleGpus < frame.minGpus ||
host.idleGpuMemory < frame.minGpuMemory) {
logger.debug("Cannot dispatch, insufficient resources.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@ public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) {
.setStartTime(System.currentTimeMillis())
.setIgnoreNimby(proc.isLocalDispatch)
.setOs(proc.os)
.setSoftMemoryLimit(frame.softMemoryLimit)
.setHardMemoryLimit(frame.hardMemoryLimit)
.putAllEnvironment(jobDao.getEnvironment(frame))
.putAllEnvironment(layerDao.getLayerEnvironment(frame))
.putEnvironment("CUE3", "1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public interface Dispatcher {
// Upgrade the memory on the layer by 1g and retry.
public static final int EXIT_STATUS_MEMORY_FAILURE = 33;

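// Exit status 137 (128 + SIGKILL) is what Docker reports when a container is
// killed, e.g. for exceeding its hard memory limit. Handled like
// EXIT_STATUS_MEMORY_FAILURE: upgrade the memory on the layer and retry.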
public static final int DOCKER_EXIT_STATUS_MEMORY_FAILURE = 137;

// max retry time
public static final int FRAME_TIME_NO_RETRY = 3600 * 8;

Expand Down Expand Up @@ -112,6 +115,9 @@ public interface Dispatcher {
// memory
public static final long MINIMUM_MEMORY_INCREASE = CueUtil.GB2;

public static final double SOFT_MEMORY_MULTIPLIER = 1.1;
public static final double HARD_MEMORY_MULTIPLIER = 1.4;

/**
* Dispatch a host to the facility.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,15 @@ public void handlePostFrameCompleteOperations(VirtualProc proc,
}

/*
* An exit status of 33 indicates that the frame was killed by the
* Some exit statuses indicate that a frame was killed by the
* application due to a memory issue and should be retried. In this
* case, disable the optimizer and raise the memory by what is
* specified in the show's service override, service or 2GB.
*/
if (report.getExitStatus() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| frameDetail.exitStatus == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) {
|| frameDetail.exitStatus == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| report.getExitStatus() == Dispatcher.DOCKER_EXIT_STATUS_MEMORY_FAILURE) {
long increase = CueUtil.GB2;

// since there can be multiple services, just going for the
Expand Down Expand Up @@ -641,7 +642,8 @@ else if (frame.state.equals(FrameState.DEAD)) {
newState = FrameState.DEAD;
} else if (frame.retries >= job.maxRetries) {
if (!(report.getExitStatus() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE))
|| report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| report.getExitStatus() == Dispatcher.DOCKER_EXIT_STATUS_MEMORY_FAILURE))
newState = FrameState.DEAD;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private List<VirtualProc> dispatchHost(DispatchHost host, JobInterface job,
* not move on.
*/
if (!lha.hasAdditionalResources(lha.getThreads() * 100,
frame.minMemory,
frame.getMinMemory(),
frame.minGpus,
frame.minGpuMemory)) {
continue;
Expand Down Expand Up @@ -209,7 +209,7 @@ private List<VirtualProc> dispatchHost(DispatchHost host, LayerInterface layer,
* not move on.
*/
if (!lha.hasAdditionalResources(lha.getThreads() * 100,
frame.minMemory,
frame.getMinMemory(),
frame.minGpus,
frame.minGpuMemory)) {
continue;
Expand Down Expand Up @@ -294,7 +294,7 @@ private List<VirtualProc> dispatchHost(DispatchHost host, FrameInterface frame,
*/
DispatchFrame dframe = jobManager.getDispatchFrame(frame.getId());
if (!lha.hasAdditionalResources(lha.getMaxCoreUnits(),
dframe.minMemory,
dframe.getMinMemory(),
lha.getMaxGpuUnits(),
dframe.minGpuMemory)) {
return procs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public void testGpuReportOver() {
(jobManager.isLayerComplete(layer1_0) ? 1 : 0) +
(jobManager.isLayerComplete(layer2_0) ? 1 : 0));
assertEquals(1,
(jobManager.isJobComplete(job1) ? 1 : 0) +
(jobManager.isJobComplete(job1) ? 1 : 0) +
(jobManager.isJobComplete(job2) ? 1 : 0));
}

Expand Down Expand Up @@ -415,6 +415,63 @@ private void executeMinMemIncrease(int expected, boolean override) {
assertEquals(expected, ulayer.getMinimumMemory());
}



/**
 * Dispatches the first frame of the min-memory test job, reports it as finished with
 * {@code Dispatcher.DOCKER_EXIT_STATUS_MEMORY_FAILURE}, runs the post-frame-complete
 * handling and then checks the layer's minimum memory.
 *
 * @param expected minimum memory the layer is expected to have after the report is handled
 * @param override when true, an "apitest" service override carrying its own
 *        minMemoryIncrease is created before the frame is dispatched
 */
private void executeMinMemIncreaseDocker(int expected, boolean override) {
    if (override) {
        ServiceOverrideEntity serviceOverride = new ServiceOverrideEntity();
        serviceOverride.showId = "00000000-0000-0000-0000-000000000000";
        serviceOverride.name = "apitest";
        serviceOverride.threadable = false;
        serviceOverride.minCores = 10;
        serviceOverride.minMemory = (int) CueUtil.GB2;
        serviceOverride.tags = new LinkedHashSet<>();
        serviceOverride.tags.add("general");
        serviceOverride.minMemoryIncrease = (int) CueUtil.GB8;

        serviceManager.createService(serviceOverride);
    }

    // Look up the test job and unpause it so it can be dispatched.
    final String testJobName = "pipe-default-testuser_min_mem_test";
    JobDetail jobDetail = jobManager.findJobDetail(testJobName);
    LayerDetail layerDetail = layerDao.findLayerDetail(jobDetail, "test_layer");
    FrameDetail firstFrame = frameDao.findFrameDetail(jobDetail, "0000-test_layer");
    jobManager.setJobPaused(jobDetail, false);

    // Exactly one proc is expected, booked on the frame looked up above.
    DispatchHost dispatchHost = getHost(HOSTNAME2);
    List<VirtualProc> bookedProcs = dispatcher.dispatchHost(dispatchHost);
    assertEquals(1, bookedProcs.size());
    VirtualProc proc = bookedProcs.get(0);
    assertEquals(jobDetail.getId(), proc.getJobId());
    assertEquals(layerDetail.getId(), proc.getLayerId());
    assertEquals(firstFrame.getId(), proc.getFrameId());

    // Build a completion report that carries the Docker memory-failure exit status.
    RunningFrameInfo runningFrame = RunningFrameInfo.newBuilder()
            .setJobId(proc.getJobId())
            .setLayerId(proc.getLayerId())
            .setFrameId(proc.getFrameId())
            .setResourceId(proc.getProcId())
            .build();
    FrameCompleteReport report = FrameCompleteReport.newBuilder()
            .setFrame(runningFrame)
            .setExitStatus(Dispatcher.DOCKER_EXIT_STATUS_MEMORY_FAILURE)
            .build();

    RunningFrameInfo reportedFrame = report.getFrame();
    DispatchJob dispatchJob = jobManager.getDispatchJob(proc.getJobId());
    DispatchFrame dispatchFrame = jobManager.getDispatchFrame(reportedFrame.getFrameId());
    FrameDetail frameDetail = jobManager.getFrameDetail(reportedFrame.getFrameId());
    dispatchSupport.stopFrame(
            dispatchFrame,
            FrameState.DEAD,
            report.getExitStatus(),
            reportedFrame.getMaxRss());
    frameCompleteHandler.handlePostFrameCompleteOperations(
            proc, report, dispatchJob, dispatchFrame, FrameState.WAITING, frameDetail);

    assertFalse(jobManager.isLayerComplete(layerDetail));

    // Re-read the job and layer so the assertion sees the updated minimum memory.
    JobDetail refreshedJob = jobManager.findJobDetail(testJobName);
    LayerDetail refreshedLayer = layerDao.findLayerDetail(refreshedJob, "test_layer");
    assertEquals(expected, refreshedLayer.getMinimumMemory());
}

@Test
@Transactional
@Rollback(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testCoreAndMemorySaturation1() {
DispatchFrame frame = new DispatchFrame();
frame.services = "NOTarnold";
frame.minCores = 100;
frame.minMemory = CueUtil.GB * 7;
frame.setMinMemory(CueUtil.GB * 7);
frame.threadable = true;

VirtualProc proc = VirtualProc.build(host, frame);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testCoreSpan() {

DispatchFrame frame = new DispatchFrame();
frame.minCores = 100;
frame.minMemory = CueUtil.GB * 7;
frame.setMinMemory(CueUtil.GB * 7);
frame.threadable = true;

VirtualProc proc = VirtualProc.build(host, frame);
Expand All @@ -70,7 +70,7 @@ public void testCoreSpanTest1(){

DispatchFrame frame = new DispatchFrame();
frame.minCores = 100;
frame.minMemory = CueUtil.GB;
frame.setMinMemory(CueUtil.GB);

VirtualProc proc = VirtualProc.build(host, frame);
assertEquals(100, proc.coresReserved);
Expand All @@ -84,7 +84,7 @@ public void testCoreSpanTest2() {

DispatchFrame frame = new DispatchFrame();
frame.minCores = 100;
frame.minMemory = CueUtil.GB4;
frame.setMinMemory(CueUtil.GB4);
frame.threadable = true;

VirtualProc proc = VirtualProc.build(host, frame);
Expand All @@ -102,7 +102,7 @@ public void testCoreSpanTest3() {

DispatchFrame frame = new DispatchFrame();
frame.minCores = 100;
frame.minMemory = memReservedDefault;
frame.setMinMemory(memReservedDefault);
frame.threadable = true;

VirtualProc proc = VirtualProc.build(host, frame);
Expand All @@ -117,7 +117,7 @@ public void testCoreSpanTest4() {

DispatchFrame frame = new DispatchFrame();
frame.minCores = 100;
frame.minMemory = CueUtil.GB * 8;
frame.setMinMemory(CueUtil.GB * 8);
frame.threadable = true;

VirtualProc proc = VirtualProc.build(host, frame);
Expand All @@ -141,7 +141,7 @@ public void testBuildVirtualProc() {

DispatchFrame frame = new DispatchFrame();
frame.minCores = 100;
frame.minMemory = memReservedDefault;
frame.setMinMemory(memReservedDefault);
frame.threadable = true;

proc = VirtualProc.build(host, frame);
Expand Down
2 changes: 2 additions & 0 deletions proto/rqd.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ message RunFrame {
int32 num_gpus = 23;
report.ChildrenProcStats children = 24;
string os = 25;
int64 soft_memory_limit = 26;
int64 hard_memory_limit = 27;
}

message RunFrameSeq {
Expand Down
16 changes: 16 additions & 0 deletions rqd/rqd/rqcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,20 @@ def runDocker(self):
command.replace(tempPassword, "[password]").replace(";", "\n"),
prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP)

# Handle memory limits. Cuebot uses KB, Docker uses bytes.
# Docker's minimum memory requirement is 6MB (6291456 bytes = 6 * 1024 * 1024). If a
# request is at or below that, give the frame a reasonable fallback amount of memory
# instead, and log the fallback that is actually applied.
# NOTE(review): the KB -> bytes conversion multiplies by 1000; confirm whether Cuebot's
# KB are 1024-byte units, in which case the factor should be 1024.
soft_memory_limit = runFrame.soft_memory_limit * 1000
if soft_memory_limit <= 6291456:
    logging.warning("Frame requested %s bytes of soft_memory_limit, which is lower than "
                    "minimum required. Running with 1GB", soft_memory_limit)
    soft_memory_limit = "1GB"
hard_memory_limit = runFrame.hard_memory_limit * 1000
if hard_memory_limit <= 6291456:
    logging.warning("Frame requested %s bytes of hard_memory_limit, which is lower than "
                    "minimum required. Running with 2GB", hard_memory_limit)
    hard_memory_limit = "2GB"

# Write command to a file on the job tmpdir to simplify replaying a frame
command = self._createCommandFile(command)
docker_client = self.rqCore.docker.from_env()
Expand All @@ -1058,6 +1072,8 @@ def runDocker(self):
network="host",
stderr=True,
hostname=self.frameEnv["jobhost"],
mem_reservation=soft_memory_limit,
mem_limit=hard_memory_limit,
entrypoint=command)

log_stream = container.logs(stream=True)
Expand Down
Loading

0 comments on commit c00d214

Please sign in to comment.