Skip to content

Commit

Permalink
Frames killed for OOM should be retried
Browse files Browse the repository at this point in the history
(cherry picked from commit b88f7bcb1ad43f83fb8357576c33483dc2bf4952)
  • Loading branch information
DiegoTavares committed Oct 4, 2023
1 parent 4377272 commit a4c832d
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 87 deletions.
7 changes: 7 additions & 0 deletions cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ boolean updateFrameStopped(FrameInterface frame, FrameState state, int exitStatu
* @return
*/
boolean updateFrameCleared(FrameInterface frame);
/**
* Sets a frame exitStatus to EXIT_STATUS_MEMORY_FAILURE
*
* @param frame
* @return
*/
boolean updateFrameMemoryError(FrameInterface frame);

/**
* Sets a frame to an unreserved waiting state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,24 @@ public boolean updateFrameCleared(FrameInterface frame) {
return updateFrame(frame, Dispatcher.EXIT_STATUS_FRAME_CLEARED) > 0;
}

private static final String UPDATE_FRAME_MEMORY_ERROR =
"UPDATE "+
"frame "+
"SET " +
"int_exit_status = ?, " +
"int_version = int_version + 1 " +
"WHERE " +
"frame.pk_frame = ? ";
@Override
public boolean updateFrameMemoryError(FrameInterface frame) {
int result = getJdbcTemplate().update(
UPDATE_FRAME_MEMORY_ERROR,
Dispatcher.EXIT_STATUS_MEMORY_FAILURE,
frame.getFrameId());

return result > 0;
}

private static final String UPDATE_FRAME_STARTED =
"UPDATE " +
"frame " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,13 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, VirtualProc pro
*/
void clearFrame(DispatchFrame frame);

/**
* Sets the frame state exitStatus to EXIT_STATUS_MEMORY_FAILURE
*
* @param frame
*/
void updateFrameMemoryError(FrameInterface frame);

/**
* Update Memory usage data and LLU time for the given frame.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,12 @@ public void clearFrame(DispatchFrame frame) {
frameDao.updateFrameCleared(frame);
}

@Override
@Transactional(propagation = Propagation.REQUIRED)
public void updateFrameMemoryError(FrameInterface frame) {
frameDao.updateFrameMemoryError(frame);
}

@Transactional(propagation = Propagation.SUPPORTS)
public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) {
int threads = proc.coresReserved / 100;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,49 +143,35 @@ public void handleFrameCompleteReport(final FrameCompleteReport report) {
}

try {

final VirtualProc proc;

try {

proc = hostManager.getVirtualProc(
report.getFrame().getResourceId());
}
catch (EmptyResultDataAccessException e) {
/*
* Do not propagate this exception to RQD. This
* usually means the cue lost connectivity to
* the host and cleared out the record of the proc.
* If this is propagated back to RQD, RQD will
* keep retrying the operation forever.
*/
logger.info("failed to acquire data needed to " +
"process completed frame: " +
report.getFrame().getFrameName() + " in job " +
report.getFrame().getJobName() + "," + e);
return;
}

final VirtualProc proc = hostManager.getVirtualProc(report.getFrame().getResourceId());
final DispatchJob job = jobManager.getDispatchJob(proc.getJobId());
final LayerDetail layer = jobManager.getLayerDetail(report.getFrame().getLayerId());
final FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId());
final DispatchFrame frame = jobManager.getDispatchFrame(report.getFrame().getFrameId());
final FrameState newFrameState = determineFrameState(job, layer, frame, report);
final String key = proc.getJobId() + "_" + report.getFrame().getLayerId() +
"_" + report.getFrame().getFrameId();

if (dispatchSupport.stopFrame(frame, newFrameState, report.getExitStatus(),
report.getFrame().getMaxRss())) {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState);
} catch (Exception e) {
logger.warn("Exception during handlePostFrameCompleteOperations " +
"in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e));
if (dispatcher.isTestMode()) {
// Database modifications on a threadpool cannot be captured by the test thread
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState, frameDetail);
} else {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState, frameDetail);
} catch (Exception e) {
logger.warn("Exception during handlePostFrameCompleteOperations " +
"in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e));
}
}
}
});
});
}
}
else {
/*
Expand Down Expand Up @@ -222,6 +208,19 @@ public void run() {
}
}
}
catch (EmptyResultDataAccessException e) {
/*
* Do not propagate this exception to RQD. This
* usually means the cue lost connectivity to
* the host and cleared out the record of the proc.
* If this is propagated back to RQD, RQD will
* keep retrying the operation forever.
*/
logger.info("failed to acquire data needed to " +
"process completed frame: " +
report.getFrame().getFrameName() + " in job " +
report.getFrame().getJobName() + "," + e);
}
catch (Exception e) {

/*
Expand Down Expand Up @@ -259,7 +258,7 @@ public void run() {
*/
public void handlePostFrameCompleteOperations(VirtualProc proc,
FrameCompleteReport report, DispatchJob job, DispatchFrame frame,
FrameState newFrameState) {
FrameState newFrameState, FrameDetail frameDetail) {
try {

/*
Expand Down Expand Up @@ -313,7 +312,8 @@ public void handlePostFrameCompleteOperations(VirtualProc proc,
* specified in the show's service override, service or 2GB.
*/
if (report.getExitStatus() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) {
|| report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| frameDetail.exitStatus == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) {
long increase = CueUtil.GB2;

// since there can be multiple services, just going for the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import com.imageworks.spcue.service.CommentManager;
import com.imageworks.spcue.service.HostManager;
import com.imageworks.spcue.service.JobManager;
import com.imageworks.spcue.service.JobManagerSupport;
import com.imageworks.spcue.util.CueExceptionUtil;
import com.imageworks.spcue.util.CueUtil;

Expand All @@ -83,7 +82,6 @@ public class HostReportHandler {
private Dispatcher localDispatcher;
private RqdClient rqdClient;
private JobManager jobManager;
private JobManagerSupport jobManagerSupport;
private JobDao jobDao;
private LayerDao layerDao;
@Autowired
Expand Down Expand Up @@ -497,7 +495,7 @@ private void handleMemoryUsage(final DispatchHost host, final HostReport report)
// them accordingly
for (final RunningFrameInfo frame: runningFrames) {
if (isFrameOverboard(frame)) {
if (!killFrame(frame, host.getName())) {
if (!killFrameOverusingMemory(frame, host.getName())) {
logger.warn("Frame " + frame.getJobName() + "." + frame.getFrameName() +
" is overboard but could not be killed");
}
Expand All @@ -508,7 +506,7 @@ private void handleMemoryUsage(final DispatchHost host, final HostReport report)
}
}

private boolean killFrame(RunningFrameInfo frame, String hostName) {
private boolean killFrameOverusingMemory(RunningFrameInfo frame, String hostname) {
try {
VirtualProc proc = hostManager.getVirtualProc(frame.getResourceId());

Expand All @@ -519,13 +517,30 @@ private boolean killFrame(RunningFrameInfo frame, String hostName) {

logger.info("Killing frame on " + frame.getJobName() + "." + frame.getFrameName() +
", using too much memory.");
DispatchSupport.killedOffenderProcs.incrementAndGet();
return killProcForMemory(proc, hostname, "This frame is using way more than it had reserved.");
} catch (EmptyResultDataAccessException e) {
return false;
}
}

if (!dispatcher.isTestMode()) {
jobManagerSupport.kill(proc, new Source("This frame is using way more than it had reserved."));
private boolean killProcForMemory(VirtualProc proc, String hostname, String reason) {
try {
FrameInterface frame = jobManager.getFrame(proc.frameId);

if (dispatcher.isTestMode()) {
// For testing, don't run on a different threadpool, as different threads don't share
// the same database state
(new DispatchRqdKillFrameMemory(proc.hostName, frame, reason, rqdClient,
dispatchSupport, dispatcher.isTestMode())).run();
} else {
killQueue.execute(new DispatchRqdKillFrameMemory(proc.hostName, frame, reason, rqdClient,
dispatchSupport, dispatcher.isTestMode()));
prometheusMetrics.incrementFrameOomKilledCounter(hostname);
}
DispatchSupport.killedOffenderProcs.incrementAndGet();
return true;
} catch (EmptyResultDataAccessException e) {
} catch (TaskRejectedException e) {
logger.warn("Unable to queue RQD kill, task rejected, " + e);
return false;
}
}
Expand All @@ -534,26 +549,23 @@ private boolean killFrame(RunningFrameInfo frame, String hostName) {
* Kill proc with the worst user/reserved memory ratio.
*
* @param host
* @return killed proc, or null if none could be found
* @return killed proc, or null if none could be found or failed to be killed
*/
private VirtualProc killWorstMemoryOffender(final DispatchHost host) {
VirtualProc proc;
try {
proc = hostManager.getWorstMemoryOffender(host);
VirtualProc proc = hostManager.getWorstMemoryOffender(host);
logger.info("Killing frame on " + proc.getName() + ", host is under stress.");

if (!killProcForMemory(proc, host.getName(), "The host was dangerously low on memory and swapping.")) {
// Returning null will prevent the caller from overflowing the kill queue with more messages
proc = null;
}
return proc;
}
catch (EmptyResultDataAccessException e) {
logger.error(host.name + " is under OOM and no proc is running on it.");
return null;
}

logger.info("Killing frame on " + proc.getName() + ", host is under stress.");
DispatchSupport.killedOffenderProcs.incrementAndGet();

if (!dispatcher.isTestMode()) {
jobManagerSupport.kill(proc, new Source("The host was dangerously low on memory and swapping."));
}

return proc;
}

/**
Expand Down Expand Up @@ -962,14 +974,6 @@ public void setJobManager(JobManager jobManager) {
this.jobManager = jobManager;
}

public JobManagerSupport getJobManagerSupport() {
return jobManagerSupport;
}

public void setJobManagerSupport(JobManagerSupport jobManagerSupport) {
this.jobManagerSupport = jobManagerSupport;
}

public JobDao getJobDao() {
return jobDao;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ public class DispatchRqdKillFrame extends KeyRunnable {

private static final Logger logger = LogManager.getLogger(DispatchRqdKillFrame.class);

private VirtualProc proc = null;
private String message;

private String hostname;
private String frameId;

Expand All @@ -47,28 +45,14 @@ public DispatchRqdKillFrame(String hostname, String frameId, String message, Rqd
this.rqdClient = rqdClient;
}

public DispatchRqdKillFrame(VirtualProc proc, String message, RqdClient rqdClient) {
super("disp_rqd_kill_frame_" + proc.getProcId() + "_" + rqdClient.toString());
this.proc = proc;
this.hostname = proc.hostName;
this.message = message;
this.rqdClient = rqdClient;
}

@Override
public void run() {
long startTime = System.currentTimeMillis();
try {
if (proc != null) {
rqdClient.killFrame(proc, message);
}
else {
rqdClient.killFrame(hostname, frameId, message);
}
rqdClient.killFrame(hostname, frameId, message);
} catch (RqdClientException e) {
logger.info("Failed to contact host " + hostname + ", " + e);
}
finally {
} finally {
long elapsedTime = System.currentTimeMillis() - startTime;
logger.info("RQD communication with " + hostname +
" took " + elapsedTime + "ms");
Expand Down
Loading

0 comments on commit a4c832d

Please sign in to comment.