diff --git a/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java b/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java index eff22768f..945685444 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java +++ b/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java @@ -1,4 +1,3 @@ - /* * Copyright Contributors to the OpenCue Project * diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java index 7c2e3c050..4dbb0e987 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java @@ -202,6 +202,13 @@ boolean updateFrameStopped(FrameInterface frame, FrameState state, int exitStatu * @return */ boolean updateFrameCleared(FrameInterface frame); + /** + * Sets a frame exitStatus to EXIT_STATUS_MEMORY_FAILURE + * + * @param frame + * @return whether the frame has been updated + */ + boolean updateFrameMemoryError(FrameInterface frame); /** * Sets a frame to an unreserved waiting state. diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java index 5ed18947e..94ba316b1 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java @@ -252,15 +252,6 @@ public interface HostDao { */ void updateThreadMode(HostInterface host, ThreadMode mode); - /** - * When a host is in kill mode that means its 256MB+ into the swap and the - * the worst memory offender is killed. - * - * @param h HostInterface - * @return boolean - */ - boolean isKillMode(HostInterface h); - /** * Update the specified host's hardware information. * diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java index 3c752ad96..21c197a3b 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java @@ -155,6 +155,24 @@ public boolean updateFrameCleared(FrameInterface frame) { return updateFrame(frame, Dispatcher.EXIT_STATUS_FRAME_CLEARED) > 0; } + private static final String UPDATE_FRAME_MEMORY_ERROR = + "UPDATE "+ + "frame "+ + "SET " + + "int_exit_status = ?, " + + "int_version = int_version + 1 " + + "WHERE " + + "frame.pk_frame = ? "; + @Override + public boolean updateFrameMemoryError(FrameInterface frame) { + int result = getJdbcTemplate().update( + UPDATE_FRAME_MEMORY_ERROR, + Dispatcher.EXIT_STATUS_MEMORY_FAILURE, + frame.getFrameId()); + + return result > 0; + } + private static final String UPDATE_FRAME_STARTED = "UPDATE " + "frame " + diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java index 6fe898b44..f703416b2 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java @@ -612,15 +612,6 @@ public void updateHostOs(HostInterface host, String os) { os, host.getHostId()); } - @Override - public boolean isKillMode(HostInterface h) { - return getJdbcTemplate().queryForObject( - "SELECT COUNT(1) FROM host_stat WHERE pk_host = ? " + - "AND int_swap_total - int_swap_free > ? 
AND int_mem_free < ?", - Integer.class, h.getHostId(), Dispatcher.KILL_MODE_SWAP_THRESHOLD, - Dispatcher.KILL_MODE_MEM_THRESHOLD) > 0; - } - @Override public int getStrandedCoreUnits(HostInterface h) { try { diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java index 5af292fb3..8f6322690 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java @@ -564,7 +564,7 @@ public boolean increaseReservedMemory(ProcInterface p, long value) { value, p.getProcId(), value) == 1; } catch (Exception e) { // check by trigger erify_host_resources - throw new ResourceReservationFailureException("failed to increase memory reserveration for proc " + throw new ResourceReservationFailureException("failed to increase memory reservation for proc " + p.getProcId() + " to " + value + ", proc does not have that much memory to spare."); } } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java index 4accd0f8d..aa20e6266 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java @@ -415,6 +415,14 @@ List findNextDispatchFrames(LayerInterface layer, VirtualProc pro */ void clearFrame(DispatchFrame frame); + /** + * Sets the frame state exitStatus to EXIT_STATUS_MEMORY_FAILURE + * + * @param frame + * @return whether the frame has been updated + */ + boolean updateFrameMemoryError(FrameInterface frame); + /** * Update Memory usage data and LLU time for the given frame. 
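The point of the new updateFrameMemoryError call chain is ordering: the frame row is stamped with EXIT_STATUS_MEMORY_FAILURE before RQD is asked to kill the process, so that when the completion report comes back, FrameCompleteHandler can tell an OOM kill apart from an ordinary failure. A minimal sketch of that ordering, assuming a FrameDao and RqdClient are wired in as in the surrounding classes; the wrapper class and method names here are hypothetical:

    import com.imageworks.spcue.FrameInterface;
    import com.imageworks.spcue.dao.FrameDao;
    import com.imageworks.spcue.rqd.RqdClient;

    /** Hypothetical wrapper showing the mark-then-kill ordering. */
    public class MemoryKillSketch {
        private final FrameDao frameDao;
        private final RqdClient rqdClient;

        public MemoryKillSketch(FrameDao frameDao, RqdClient rqdClient) {
            this.frameDao = frameDao;
            this.rqdClient = rqdClient;
        }

        public boolean killForMemory(FrameInterface frame, String hostname, String reason) {
            // Stamp EXIT_STATUS_MEMORY_FAILURE first; if the update does not land,
            // cancel the kill so the frame is not reaped under the wrong status.
            if (!frameDao.updateFrameMemoryError(frame)) {
                return false;
            }
            rqdClient.killFrame(hostname, frame.getFrameId(), reason);
            return true;
        }
    }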
* diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index 887d0c29e..713f6c86c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -42,6 +42,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.dao.DataAccessException; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -184,7 +185,11 @@ public boolean increaseReservedMemory(ProcInterface p, long value) { @Override public boolean clearVirtualProcAssignement(ProcInterface proc) { - return procDao.clearVirtualProcAssignment(proc); + try { + return procDao.clearVirtualProcAssignment(proc); + } catch (DataAccessException e) { + return false; + } } @Transactional(propagation = Propagation.REQUIRED) @@ -343,6 +348,12 @@ public void clearFrame(DispatchFrame frame) { frameDao.updateFrameCleared(frame); } + @Override + @Transactional(propagation = Propagation.REQUIRED) + public boolean updateFrameMemoryError(FrameInterface frame) { + return frameDao.updateFrameMemoryError(frame); + } + @Transactional(propagation = Propagation.SUPPORTS) public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) { int threads = proc.coresReserved / 100; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java index 6ac703a41..072b04113 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java @@ -1,4 +1,3 @@ - /* * Copyright Contributors to the OpenCue Project * @@ -108,13 +107,8 @@ public interface Dispatcher { // without being penalized for it. public static final long VIRTUAL_MEM_THRESHHOLD = CueUtil.GB2; - // The amount of swap that must be used before a host can go - // into kill mode. - public static final long KILL_MODE_SWAP_THRESHOLD = CueUtil.MB128; - - // When the amount of free memory drops below this point, the - // host can go into kill mode. - public static final long KILL_MODE_MEM_THRESHOLD = CueUtil.MB512; + // How long to keep track of a frame kill request + public static final int FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES = 3; // A higher number gets more deep booking but less spread on the cue. 
public static final int DEFAULT_MAX_FRAMES_PER_PASS = 4; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java index 55336aaf4..c405a9e31 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java @@ -33,6 +33,7 @@ import com.imageworks.spcue.DispatchFrame; import com.imageworks.spcue.DispatchHost; import com.imageworks.spcue.DispatchJob; +import com.imageworks.spcue.FrameDetail; import com.imageworks.spcue.JobDetail; import com.imageworks.spcue.LayerDetail; import com.imageworks.spcue.LayerInterface; @@ -143,49 +144,35 @@ public void handleFrameCompleteReport(final FrameCompleteReport report) { } try { - - final VirtualProc proc; - - try { - - proc = hostManager.getVirtualProc( - report.getFrame().getResourceId()); - } - catch (EmptyResultDataAccessException e) { - /* - * Do not propagate this exception to RQD. This - * usually means the cue lost connectivity to - * the host and cleared out the record of the proc. - * If this is propagated back to RQD, RQD will - * keep retrying the operation forever. - */ - logger.info("failed to acquire data needed to " + - "process completed frame: " + - report.getFrame().getFrameName() + " in job " + - report.getFrame().getJobName() + "," + e); - return; - } - + final VirtualProc proc = hostManager.getVirtualProc(report.getFrame().getResourceId()); final DispatchJob job = jobManager.getDispatchJob(proc.getJobId()); final LayerDetail layer = jobManager.getLayerDetail(report.getFrame().getLayerId()); + final FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId()); final DispatchFrame frame = jobManager.getDispatchFrame(report.getFrame().getFrameId()); final FrameState newFrameState = determineFrameState(job, layer, frame, report); final String key = proc.getJobId() + "_" + report.getFrame().getLayerId() + "_" + report.getFrame().getFrameId(); + if (dispatchSupport.stopFrame(frame, newFrameState, report.getExitStatus(), report.getFrame().getMaxRss())) { - dispatchQueue.execute(new KeyRunnable(key) { - @Override - public void run() { - try { - handlePostFrameCompleteOperations(proc, report, job, frame, - newFrameState); - } catch (Exception e) { - logger.warn("Exception during handlePostFrameCompleteOperations " + - "in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e)); + if (dispatcher.isTestMode()) { + // Database modifications on a threadpool cannot be captured by the test thread + handlePostFrameCompleteOperations(proc, report, job, frame, + newFrameState, frameDetail); + } else { + dispatchQueue.execute(new KeyRunnable(key) { + @Override + public void run() { + try { + handlePostFrameCompleteOperations(proc, report, job, frame, + newFrameState, frameDetail); + } catch (Exception e) { + logger.warn("Exception during handlePostFrameCompleteOperations " + + "in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e)); + } } - } - }); + }); + } } else { /* @@ -222,6 +209,19 @@ public void run() { } } } + catch (EmptyResultDataAccessException e) { + /* + * Do not propagate this exception to RQD. This + * usually means the cue lost connectivity to + * the host and cleared out the record of the proc. + * If this is propagated back to RQD, RQD will + * keep retrying the operation forever. 
+ */ + logger.info("failed to acquire data needed to " + + "process completed frame: " + + report.getFrame().getFrameName() + " in job " + + report.getFrame().getJobName() + "," + e); + } catch (Exception e) { /* @@ -259,7 +259,7 @@ public void run() { */ public void handlePostFrameCompleteOperations(VirtualProc proc, FrameCompleteReport report, DispatchJob job, DispatchFrame frame, - FrameState newFrameState) { + FrameState newFrameState, FrameDetail frameDetail) { try { /* @@ -313,7 +313,8 @@ public void handlePostFrameCompleteOperations(VirtualProc proc, * specified in the show's service override, service or 2GB. */ if (report.getExitStatus() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE - || report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) { + || report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE + || frameDetail.exitStatus == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) { long increase = CueUtil.GB2; // since there can be multiple services, just going for the diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index 2adef34fb..997e32fd4 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -21,12 +21,16 @@ import java.sql.Timestamp; import java.util.ArrayList; -import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.imageworks.spcue.JobInterface; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.springframework.beans.factory.annotation.Autowired; @@ -50,6 +54,7 @@ import com.imageworks.spcue.dispatcher.commands.DispatchBookHostLocal; import com.imageworks.spcue.dispatcher.commands.DispatchHandleHostReport; import com.imageworks.spcue.dispatcher.commands.DispatchRqdKillFrame; +import com.imageworks.spcue.dispatcher.commands.DispatchRqdKillFrameMemory; import com.imageworks.spcue.grpc.host.HardwareState; import com.imageworks.spcue.grpc.host.LockState; import com.imageworks.spcue.grpc.report.BootReport; @@ -63,10 +68,11 @@ import com.imageworks.spcue.service.CommentManager; import com.imageworks.spcue.service.HostManager; import com.imageworks.spcue.service.JobManager; -import com.imageworks.spcue.service.JobManagerSupport; import com.imageworks.spcue.util.CueExceptionUtil; import com.imageworks.spcue.util.CueUtil; +import static com.imageworks.spcue.dispatcher.Dispatcher.*; + public class HostReportHandler { private static final Logger logger = LogManager.getLogger(HostReportHandler.class); @@ -81,7 +87,6 @@ public class HostReportHandler { private Dispatcher localDispatcher; private RqdClient rqdClient; private JobManager jobManager; - private JobManagerSupport jobManagerSupport; private JobDao jobDao; private LayerDao layerDao; @Autowired @@ -93,6 +98,13 @@ public class HostReportHandler { "space on the temporary directory (mcp)"; private static final String CUEBOT_COMMENT_USER = "cuebot"; + // A cache to store kill requests and count the number of occurrences. + // The cache expires after write to avoid growing unbounded. If a request for a host-frame doesn't appear + // for a period of time, the entry will be removed. 
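The cache declared just below gives the handler a cheap, self-expiring retry counter: read with a default, add one, and write back, which both bumps the count and pushes the expiry window forward. A self-contained sketch of the same Guava pattern, with illustrative names and the three-minute window hard-coded in place of FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    /** Illustrative expiring per-key retry counter, like killRequestCounterCache. */
    public class RetryCounter {
        private final Cache<String, Long> counts = CacheBuilder.newBuilder()
                .expireAfterWrite(3, TimeUnit.MINUTES) // entries vanish if not re-written
                .build();

        /** Returns the new count for this key, or -1 if the cache lookup failed. */
        public long bump(String key) {
            try {
                long next = 1 + counts.get(key, () -> 0L); // loads 0 when absent
                counts.put(key, next);                     // the write resets the expiry clock
                return next;
            } catch (ExecutionException e) {
                return -1;
            }
        }
    }

Because expireAfterWrite is measured from the last write, an entry only disappears once no kill request for that host-frame pair has been seen for the full window, which is exactly the reset behavior the comment above describes.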
+ Cache&lt;String, Long&gt; killRequestCounterCache = CacheBuilder.newBuilder() + .expireAfterWrite(FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES, TimeUnit.MINUTES) + .build(); + /** * Boolean to toggle if this class is accepting data or not. */ @@ -143,7 +155,6 @@ public void queueHostReport(HostReport report) { reportQueue.execute(new DispatchHandleHostReport(report, this)); } - public void handleHostReport(HostReport report, boolean isBoot) { long startTime = System.currentTimeMillis(); try { @@ -211,9 +222,9 @@ public void handleHostReport(HostReport report, boolean isBoot) { killTimedOutFrames(report); /* - * Increase/decreased reserved memory. + * Prevent OOM (Out-Of-Memory) issues on the host and manage frame reserved memory */ - handleMemoryReservations(host, report); + handleMemoryUsage(host, report); /* * The checks are done in order of least CPU intensive to @@ -456,103 +467,279 @@ private void changeLockState(DispatchHost host, CoreDetail coreInfo) { } /** - * Handle memory reservations for the given host. This will re-balance memory - * reservations on the machine and kill and frames that are out of control. - * + * Prevent the host from entering an OOM state where oom-killer might start killing important OS processes. + * The kill logic will kick in when one of the following conditions is met: + * - The host has less than (1 - dispatcher.oom_max_safe_used_memory_threshold) of its total memory available + * - A frame is using more than dispatcher.oom_frame_overboard_allowed_threshold above what it had reserved + * For frames that are using more than they had reserved but are not above the threshold, negotiate expanding + * the reservations with other frames on the same host * @param host * @param report */ - private void handleMemoryReservations(final DispatchHost host, final HostReport report) { + private void handleMemoryUsage(final DispatchHost host, final HostReport report) { + // Don't keep memory balances on nimby hosts + if (host.isNimby) { + return; + } - // TODO: GPU: Need to keep frames from growing into space reserved for GPU frames - // However all this is done in the database without a chance to edit the values here + final double OOM_MAX_SAFE_USED_MEMORY_THRESHOLD = + env.getRequiredProperty("dispatcher.oom_max_safe_used_memory_threshold", Double.class); + final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = + env.getRequiredProperty("dispatcher.oom_frame_overboard_allowed_threshold", Double.class); + RenderHost renderHost = report.getHost(); + List&lt;RunningFrameInfo&gt; runningFrames = report.getFramesList(); + + boolean memoryWarning = renderHost.getTotalMem() > 0 && + ((double)renderHost.getFreeMem()/renderHost.getTotalMem() < + (1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD)); + + if (memoryWarning) { + long memoryAvailable = renderHost.getFreeMem(); + long minSafeMemoryAvailable = (long)(renderHost.getTotalMem() * (1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD)); + // Only allow killing up to 10 frames at a time + int killAttemptsRemaining = 10; + VirtualProc killedProc = null; + do { + killedProc = killWorstMemoryOffender(host); + killAttemptsRemaining -= 1; + if (killedProc != null) { + memoryAvailable = memoryAvailable + killedProc.memoryUsed; + } + } while (killAttemptsRemaining > 0 && + memoryAvailable < minSafeMemoryAvailable && + killedProc != null); + } else { + // When no mass cleaning was required, check for frames going overboard; + // if a frame didn't go overboard, manage its reservations, trying to increase + // them accordingly + for (final RunningFrameInfo frame : runningFrames) { + if (OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD > 0 && isFrameOverboard(frame)) { + if (!killFrameOverusingMemory(frame, host.getName())) { + logger.warn("Frame " + frame.getJobName() + "." + frame.getFrameName() + + " is overboard but could not be killed"); + } + } else { + handleMemoryReservations(frame); + } + } + } + } - /* - * Check to see if we enable kill mode to free up memory. - */ - boolean killMode = hostManager.isSwapping(host); + public enum KillCause { + FrameOverboard("This frame is using more memory than it had reserved."), + HostUnderOom("Frame killed by host under OOM pressure"), + FrameTimedOut("Frame timed out"), + FrameLluTimedOut("Frame LLU timed out"), + FrameVerificationFailure("Frame failed to be verified on the database"); + private final String message; - for (final RunningFrameInfo f: report.getFramesList()) { + private KillCause(String message) { + this.message = message; + } + @Override + public String toString() { + return message; + } + } - VirtualProc proc = null; + private boolean killFrameOverusingMemory(RunningFrameInfo frame, String hostname) { + try { + VirtualProc proc = hostManager.getVirtualProc(frame.getResourceId()); + + // Don't mess with localDispatch procs + if (proc.isLocalDispatch) { + return false; + } + + logger.info("Killing frame on " + frame.getJobName() + "." + frame.getFrameName() + + ", using too much memory."); + return killProcForMemory(proc, hostname, KillCause.FrameOverboard); + } catch (EmptyResultDataAccessException e) { + return false; + } + } + + private boolean getKillClearance(String hostname, String frameId) { + String cacheKey = hostname + "-" + frameId; + final int FRAME_KILL_RETRY_LIMIT = + env.getRequiredProperty("dispatcher.frame_kill_retry_limit", Integer.class); + + // Cache the frame+host of each kill request and count how many times the request has been retried; + // repeated retries mean rqd is probably failing at attempting to kill the related proc + long cachedCount; + try { + cachedCount = 1 + killRequestCounterCache.get(cacheKey, () -> 0L); + } catch (ExecutionException e) { + return false; + } + killRequestCounterCache.put(cacheKey, cachedCount); + if (cachedCount > FRAME_KILL_RETRY_LIMIT) { + FrameInterface frame = jobManager.getFrame(frameId); + JobInterface job = jobManager.getJob(frame.getJobId()); + + logger.warn("KillRequest blocked for " + job.getName() + "." + frame.getName() + + " on host " + hostname + ". The kill retry limit has been reached."); + return false; + } + return true; + } + + private boolean killProcForMemory(VirtualProc proc, String hostname, KillCause killCause) { + if (!getKillClearance(hostname, proc.frameId)) { + return false; + } + + FrameInterface frame = jobManager.getFrame(proc.frameId); + if (dispatcher.isTestMode()) { + // Different threads don't share the same database state on the test environment + (new DispatchRqdKillFrameMemory(hostname, frame, killCause.toString(), rqdClient, + dispatchSupport, dispatcher.isTestMode())).run(); + } else { try { - proc = hostManager.getVirtualProc(f.getResourceId()); + killQueue.execute(new DispatchRqdKillFrameMemory(hostname, frame, killCause.toString(), rqdClient, + dispatchSupport, dispatcher.isTestMode())); + } catch (TaskRejectedException e) { + logger.warn("Unable to add a DispatchRqdKillFrameMemory request, task rejected, " + e); + return false; + } + } + DispatchSupport.killedOffenderProcs.incrementAndGet(); + return true; + } - // TODO: handle memory management for local dispatches - // Skip local dispatches for now.
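Taken together, handleMemoryUsage and isFrameOverboard reduce to a little threshold arithmetic. The sketch below restates both checks as pure functions so the cutoffs can be sanity-checked in isolation; the class and method names are illustrative, and the constants are the shipped default (0.95) and the test-configuration value (0.6) rather than values read from the environment as the real handler does:

    /**
     * Standalone sketch of the two OOM checks in HostReportHandler.
     * Memory values are in KB, matching the RenderHost/RunningFrameInfo reports.
     */
    public final class OomThresholdSketch {
        // dispatcher.oom_max_safe_used_memory_threshold (shipped default)
        static final double MAX_SAFE_USED = 0.95;
        // dispatcher.oom_frame_overboard_allowed_threshold (test config; -1.0 disables)
        static final double FRAME_OVERBOARD = 0.6;

        /** Host-level trigger: true when less than 5% of total memory is free. */
        static boolean memoryWarning(long freeMem, long totalMem) {
            return totalMem > 0 && (double) freeMem / totalMem < (1.0 - MAX_SAFE_USED);
        }

        /** The generic ratio test shared by both overboard branches. */
        static boolean isOverboard(double value, double reserved, double threshold) {
            return value / reserved >= (1 + threshold);
        }

        /** Frame-level trigger: current usage 60%+ over the reservation, or a past
         *  spike 120%+ over it while current usage still sits at 90%+ of it. */
        static boolean frameOverboard(double rss, double maxRss, double reserved) {
            if (isOverboard(rss, reserved, FRAME_OVERBOARD)) {
                return true;
            }
            return isOverboard(maxRss, reserved, 2 * FRAME_OVERBOARD)
                    && isOverboard(rss, reserved, -0.1);
        }

        public static void main(String[] args) {
            long totalKb = 8L * 1024 * 1024;                          // an 8GB host
            System.out.println(memoryWarning(256L * 1024, totalKb));  // ~3% free -> true
            System.out.println(memoryWarning(512L * 1024, totalKb));  // ~6% free -> false
            System.out.println(frameOverboard(3.3e6, 3.3e6, 2.0e6));  // 65% over -> true
            System.out.println(frameOverboard(1.9e6, 4.5e6, 2.0e6));  // spike case -> true
            System.out.println(frameOverboard(2.2e6, 2.2e6, 2.0e6));  // only 10% over -> false
        }
    }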
- if (proc.isLocalDispatch) { - continue; - } + private boolean killFrame(String frameId, String hostname, KillCause killCause) { + if (!getKillClearance(hostname, frameId)) { + return false; + } + if (dispatcher.isTestMode()) { + // Different threads don't share the same database state on the test environment + (new DispatchRqdKillFrame(hostname, frameId, killCause.toString(), rqdClient)).run(); + } else { + try { + killQueue.execute(new DispatchRqdKillFrame(hostname, + frameId, + killCause.toString(), + rqdClient)); + } catch (TaskRejectedException e) { + logger.warn("Unable to add a DispatchRqdKillFrame request, task rejected, " + e); + } + } + DispatchSupport.killedOffenderProcs.incrementAndGet(); + return true; + } - if (f.getRss() > host.memory) { - try{ - logger.info("Killing frame " + f.getJobName() + "/" + f.getFrameName() + ", " - + proc.getName() + " was OOM"); - try { - killQueue.execute(new DispatchRqdKillFrame(proc, "The frame required " + - CueUtil.KbToMb(f.getRss()) + " but the machine only has " + - CueUtil.KbToMb(host.memory), rqdClient)); - } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); - } - DispatchSupport.killedOomProcs.incrementAndGet(); - } catch (Exception e) { - logger.info("failed to kill frame on " + proc.getName() + - "," + e); - } - } + /** + * Kill proc with the worst user/reserved memory ratio. + * + * @param host + * @return killed proc, or null if none could be found or failed to be killed + */ + private VirtualProc killWorstMemoryOffender(final DispatchHost host) { + try { + VirtualProc proc = hostManager.getWorstMemoryOffender(host); + logger.info("Killing frame on " + proc.getName() + ", host is under stress."); - if (dispatchSupport.increaseReservedMemory(proc, f.getRss())) { - proc.memoryReserved = f.getRss(); - logger.info("frame " + f.getFrameName() + " on job " + f.getJobName() - + " increased its reserved memory to " + - CueUtil.KbToMb(f.getRss())); - } + if (!killProcForMemory(proc, host.getName(), KillCause.HostUnderOom)) { + proc = null; + } + return proc; + } + catch (EmptyResultDataAccessException e) { + logger.error(host.name + " is under OOM and no proc is running on it."); + return null; + } + } + + /** + * Check frame memory usage comparing the amount used with the amount it had reserved + * @param frame + * @return + */ + private boolean isFrameOverboard(final RunningFrameInfo frame) { + final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = + env.getRequiredProperty("dispatcher.oom_frame_overboard_allowed_threshold", Double.class); + + if (OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD < 0) { + return false; + } + + double rss = (double)frame.getRss(); + double maxRss = (double)frame.getMaxRss(); + final double MAX_RSS_OVERBOARD_THRESHOLD = OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD * 2; + final double RSS_AVAILABLE_FOR_MAX_RSS_TRIGGER = 0.1; + + try { + VirtualProc proc = hostManager.getVirtualProc(frame.getResourceId()); + double reserved = (double)proc.memoryReserved; + + // Last memory report is higher than the threshold + if (isOverboard(rss, reserved, OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD)) { + return true; + } + // If rss is not overboard, handle the situation where the frame might be going overboard from + // time to time but the last report wasn't during a spike. For this case, consider a combination + // of rss and maxRss. 
maxRss > 2 * threshold and rss > 0.9 + else { + return isOverboard(maxRss, reserved, MAX_RSS_OVERBOARD_THRESHOLD) && + isOverboard(rss, reserved, -RSS_AVAILABLE_FOR_MAX_RSS_TRIGGER); + } + } catch (EmptyResultDataAccessException e) { + logger.info("HostReportHandler(isFrameOverboard): Virtual proc for frame " + + frame.getFrameName() + " on job " + frame.getJobName() + " doesn't exist on the database"); + // Not able to mark the frame overboard as it couldn't be found in the db. + // Proc accounting (verifyRunningProc) should take care of it + return false; + } + } - } catch (ResourceReservationFailureException e) { + private boolean isOverboard(double value, double total, double threshold) { + return value/total >= (1 + threshold); + } - long memNeeded = f.getRss() - proc.memoryReserved; + /** + * Handle memory reservations for the given frame + * + * @param frame + */ + private void handleMemoryReservations(final RunningFrameInfo frame) { + VirtualProc proc = null; + try { + proc = hostManager.getVirtualProc(frame.getResourceId()); - logger.info("frame " + f.getFrameName() + " on job " + f.getJobName() + if (proc.isLocalDispatch) { + return; + } + + if (dispatchSupport.increaseReservedMemory(proc, frame.getRss())) { + proc.memoryReserved = frame.getRss(); + logger.info("frame " + frame.getFrameName() + " on job " + frame.getJobName() + + " increased its reserved memory to " + + CueUtil.KbToMb(frame.getRss())); + } + } catch (ResourceReservationFailureException e) { + if (proc != null) { + long memNeeded = frame.getRss() - proc.memoryReserved; + logger.info("frame " + frame.getFrameName() + " on job " + frame.getJobName() + " was unable to reserve an additional " + CueUtil.KbToMb(memNeeded) + " on proc " + proc.getName() + ", " + e); - try { if (dispatchSupport.balanceReservedMemory(proc, memNeeded)) { - proc.memoryReserved = f.getRss(); + proc.memoryReserved = frame.getRss(); logger.info("was able to balance host: " + proc.getName()); - } - else { + } else { logger.info("failed to balance host: " + proc.getName()); } } catch (Exception ex) { logger.warn("failed to balance host: " + proc.getName() + ", " + e); } - } catch (EmptyResultDataAccessException e) { - logger.info("HostReportHandler: frame " + f.getFrameName() + - " on job " + f.getJobName() + - " was unable be processed" + - " because the proc could not be found"); - } - } - - if (killMode) { - VirtualProc proc; - try { - proc = hostManager.getWorstMemoryOffender(host); - } - catch (EmptyResultDataAccessException e) { - logger.info(host.name + " is swapping and no proc is running on it."); - return; + } else { + logger.info("frame " + frame.getFrameName() + " on job " + frame.getJobName() + + " was unable to reserve additional memory.
Proc could not be found"); } - - logger.info("Killing frame on " + - proc.getName() + ", host is distressed."); - - DispatchSupport.killedOffenderProcs.incrementAndGet(); - jobManagerSupport.kill(proc, new Source( - "The host was dangerously low on memory and swapping.")); + } catch (EmptyResultDataAccessException e) { + logger.info("HostReportHandler: Memory reservations for frame " + frame.getFrameName() + + " on job " + frame.getJobName() + " proc could not be found"); } } @@ -562,7 +749,6 @@ private void handleMemoryReservations(final DispatchHost host, final HostReport * @param rFrames */ private void killTimedOutFrames(HostReport report) { - final Map layers = new HashMap(5); for (RunningFrameInfo frame: report.getFramesList()) { @@ -570,36 +756,16 @@ private void killTimedOutFrames(HostReport report) { LayerDetail layer = layerDao.getLayerDetail(layerId); long runtimeMinutes = ((System.currentTimeMillis() - frame.getStartTime()) / 1000l) / 60; - if (layer.timeout != 0 && runtimeMinutes > layer.timeout){ - try { - killQueue.execute(new DispatchRqdKillFrame(report.getHost().getName(), - frame.getFrameId(), - "This frame has reached it timeout.", - rqdClient)); - } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); - } - } + String hostname = report.getHost().getName(); - if (layer.timeout_llu == 0){ - continue; - } - - if (frame.getLluTime() == 0){ - continue; - } - - long r = System.currentTimeMillis() / 1000; - long lastUpdate = (r - frame.getLluTime()) / 60; + if (layer.timeout != 0 && runtimeMinutes > layer.timeout){ + killFrame(frame.getFrameId(), hostname, KillCause.FrameTimedOut); + } else if (layer.timeout_llu != 0 && frame.getLluTime() != 0) { + long r = System.currentTimeMillis() / 1000; + long lastUpdate = (r - frame.getLluTime()) / 60; - if (layer.timeout_llu != 0 && lastUpdate > (layer.timeout_llu -1)){ - try { - killQueue.execute(new DispatchRqdKillFrame(report.getHost().getName(), - frame.getFrameId(), - "This frame has reached it LLU timeout.", - rqdClient)); - } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); + if (layer.timeout_llu != 0 && lastUpdate > (layer.timeout_llu - 1)){ + killFrame(frame.getFrameId(), hostname, KillCause.FrameLluTimedOut); } } } @@ -725,98 +891,59 @@ public void verifyRunningFrameInfo(HostReport report) { continue; } + if (hostManager.verifyRunningProc(runningFrame.getResourceId(), runningFrame.getFrameId())) { + runningFrames.add(runningFrame); + continue; + } - if (!hostManager.verifyRunningProc(runningFrame.getResourceId(), - runningFrame.getFrameId())) { - - /* - * The frame this proc is running is no longer - * assigned to this proc. Don't ever touch - * the frame record. If we make it here that means - * the proc has been running for over 2 min. - */ - - String msg; - VirtualProc proc = null; + /* + * The frame this proc is running is no longer + * assigned to this proc. Don't ever touch + * the frame record. If we make it here that means + * the proc has been running for over 2 min. 
+ */ + String msg; + VirtualProc proc = null; - try { - proc = hostManager.getVirtualProc(runningFrame.getResourceId()); - msg = "Virutal proc " + proc.getProcId() + + try { + proc = hostManager.getVirtualProc(runningFrame.getResourceId()); + msg = "Virtual proc " + proc.getProcId() + "is assigned to " + proc.getFrameId() + " not " + runningFrame.getFrameId(); - } - catch (Exception e) { - /* - * This will happen if the host goes off line and then - * comes back. In this case, we don't touch the frame - * since it might already be running somewhere else. We - * do however kill the proc. - */ - msg = "Virtual proc did not exist."; - } - - logger.info("warning, the proc " + - runningFrame.getResourceId() + " on host " + - report.getHost().getName() + " was running for " + - (runtimeSeconds / 60.0f) + " minutes " + - runningFrame.getJobName() + "/" + runningFrame.getFrameName() + - "but the DB did not " + - "reflect this " + - msg); - - DispatchSupport.accountingErrors.incrementAndGet(); - - try { - /* - * If the proc did exist unbook it if we can't - * verify its running something. - */ - boolean rqd_kill = false; - if (proc != null) { - - /* - * Check to see if the proc is an orphan. - */ - if (hostManager.isOprhan(proc)) { - dispatchSupport.clearVirtualProcAssignement(proc); - dispatchSupport.unbookProc(proc); - rqd_kill = true; - } - } - else { - /* Proc doesn't exist so a kill won't hurt */ - rqd_kill = true; - } - - if (rqd_kill) { - try { - killQueue.execute(new DispatchRqdKillFrame(report.getHost().getName(), - runningFrame.getFrameId(), - "OpenCue could not verify this frame.", - rqdClient)); - } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); - } - } + } + catch (Exception e) { + /* + * This will happen if the host goes offline and then + * comes back. In this case, we don't touch the frame + * since it might already be running somewhere else. We + * do however kill the proc. + */ + msg = "Virtual proc did not exist."; + } - } catch (RqdClientException rqde) { - logger.warn("failed to kill " + - runningFrame.getJobName() + "/" + - runningFrame.getFrameName() + - " when trying to clear a failed " + - " frame verification, " + rqde); - - } catch (Exception e) { - CueExceptionUtil.logStackTrace("failed", e); - logger.warn("failed to verify " + - runningFrame.getJobName() + "/" + - runningFrame.getFrameName() + - " was running but the frame was " + - " unable to be killed, " + e); - } + DispatchSupport.accountingErrors.incrementAndGet(); + if (proc != null && hostManager.isOprhan(proc)) { + dispatchSupport.clearVirtualProcAssignement(proc); + dispatchSupport.unbookProc(proc); + proc = null; } - else { - runningFrames.add(runningFrame); + if (proc == null) { + if (killFrame(runningFrame.getFrameId(), + report.getHost().getName(), + KillCause.FrameVerificationFailure)) { + logger.info("FrameVerificationError, the proc " + + runningFrame.getResourceId() + " on host " + + report.getHost().getName() + " was running for " + + (runtimeSeconds / 60.0f) + " minutes " + + runningFrame.getJobName() + "/" + runningFrame.getFrameName() + + "but the DB did not " + + "reflect this. 
" + + msg); + } else { + logger.warn("FrameStuckWarning: frameId=" + runningFrame.getFrameId() + + " render_node=" + report.getHost().getName() + " - " + + runningFrame.getJobName() + "/" + runningFrame.getFrameName()); + } } } } @@ -877,14 +1004,6 @@ public void setJobManager(JobManager jobManager) { this.jobManager = jobManager; } - public JobManagerSupport getJobManagerSupport() { - return jobManagerSupport; - } - - public void setJobManagerSupport(JobManagerSupport jobManagerSupport) { - this.jobManagerSupport = jobManagerSupport; - } - public JobDao getJobDao() { return jobDao; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java index 61258a824..fe9bde60e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java @@ -31,9 +31,7 @@ public class DispatchRqdKillFrame extends KeyRunnable { private static final Logger logger = LogManager.getLogger(DispatchRqdKillFrame.class); - private VirtualProc proc = null; private String message; - private String hostname; private String frameId; @@ -47,28 +45,14 @@ public DispatchRqdKillFrame(String hostname, String frameId, String message, Rqd this.rqdClient = rqdClient; } - public DispatchRqdKillFrame(VirtualProc proc, String message, RqdClient rqdClient) { - super("disp_rqd_kill_frame_" + proc.getProcId() + "_" + rqdClient.toString()); - this.proc = proc; - this.hostname = proc.hostName; - this.message = message; - this.rqdClient = rqdClient; - } - @Override public void run() { long startTime = System.currentTimeMillis(); try { - if (proc != null) { - rqdClient.killFrame(proc, message); - } - else { - rqdClient.killFrame(hostname, frameId, message); - } + rqdClient.killFrame(hostname, frameId, message); } catch (RqdClientException e) { logger.info("Failed to contact host " + hostname + ", " + e); - } - finally { + } finally { long elapsedTime = System.currentTimeMillis() - startTime; logger.info("RQD communication with " + hostname + " took " + elapsedTime + "ms"); diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java new file mode 100644 index 000000000..301f77479 --- /dev/null +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java @@ -0,0 +1,78 @@ + +/* + * Copyright Contributors to the OpenCue Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + + +package com.imageworks.spcue.dispatcher.commands; + +import com.imageworks.spcue.FrameInterface; +import com.imageworks.spcue.VirtualProc; +import com.imageworks.spcue.dispatcher.DispatchSupport; +import com.imageworks.spcue.rqd.RqdClient; +import com.imageworks.spcue.rqd.RqdClientException; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +/** + * A runnable to communicate with rqd requesting for a frame to be killed due to memory issues. + *

+ * Before killing a frame, the database is updated to mark the frame status as EXIT_STATUS_MEMORY_FAILURE; + * this allows the FrameCompleteHandler to possibly retry the frame after increasing its memory requirements + */ +public class DispatchRqdKillFrameMemory extends KeyRunnable { + + private static final Logger logger = LogManager.getLogger(DispatchRqdKillFrameMemory.class); + + private String message; + private String hostname; + private DispatchSupport dispatchSupport; + private final RqdClient rqdClient; + private final boolean isTestMode; + + private FrameInterface frame; + + public DispatchRqdKillFrameMemory(String hostname, FrameInterface frame, String message, RqdClient rqdClient, + DispatchSupport dispatchSupport, boolean isTestMode) { + super("disp_rqd_kill_frame_" + frame.getFrameId() + "_" + rqdClient.toString()); + this.frame = frame; + this.hostname = hostname; + this.message = message; + this.rqdClient = rqdClient; + this.dispatchSupport = dispatchSupport; + this.isTestMode = isTestMode; + } + + @Override + public void run() { + long startTime = System.currentTimeMillis(); + try { + if (dispatchSupport.updateFrameMemoryError(frame) && !isTestMode) { + rqdClient.killFrame(hostname, frame.getFrameId(), message); + } else { + logger.warn("Could not update frame " + frame.getFrameId() + + " status to EXIT_STATUS_MEMORY_FAILURE. Canceling kill request!"); + } + } catch (RqdClientException e) { + logger.warn("Failed to contact host " + hostname + ", " + e); + } finally { + long elapsedTime = System.currentTimeMillis() - startTime; + logger.info("RQD communication with " + hostname + + " took " + elapsedTime + "ms"); + } + } +} + diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java index aaf401688..e62d8647b 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java @@ -70,15 +70,6 @@ public interface HostManager { */ void setHostFreeTempDir(HostInterface host, Long freeTempDir); - /** - * Return true if the host is swapping hard enough - * that killing frames will save the entire machine.
- * - * @param host - * @return - */ - boolean isSwapping(HostInterface host); - DispatchHost createHost(HostReport report); DispatchHost createHost(RenderHost host); diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java index a1533d695..36de34a1c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java @@ -98,12 +98,6 @@ public void setHostFreeTempDir(HostInterface host, Long freeTempDir) { hostDao.updateHostFreeTempDir(host, freeTempDir); } - @Override - @Transactional(propagation = Propagation.REQUIRED, readOnly=true) - public boolean isSwapping(HostInterface host) { - return hostDao.isKillMode(host); - } - public void rebootWhenIdle(HostInterface host) { try { hostDao.updateHostState(host, HardwareState.REBOOT_WHEN_IDLE); diff --git a/cuebot/src/main/resources/conf/spring/applicationContext-service.xml b/cuebot/src/main/resources/conf/spring/applicationContext-service.xml index 35bd5cbb8..5aedc91b3 100644 --- a/cuebot/src/main/resources/conf/spring/applicationContext-service.xml +++ b/cuebot/src/main/resources/conf/spring/applicationContext-service.xml @@ -384,7 +384,6 @@ - diff --git a/cuebot/src/main/resources/opencue.properties b/cuebot/src/main/resources/opencue.properties index 6b2875899..b7f2a23ff 100644 --- a/cuebot/src/main/resources/opencue.properties +++ b/cuebot/src/main/resources/opencue.properties @@ -130,7 +130,21 @@ dispatcher.booking_queue.max_pool_size=6 # Queue capacity for booking. dispatcher.booking_queue.queue_capacity=1000 -# Whether or not to satisfy dependents (*_ON_FRAME and *_ON_LAYER) only on Frame success +# Percentage of used memory to consider a risk for triggering oom-killer +dispatcher.oom_max_safe_used_memory_threshold=0.95 + +# How much can a frame exceed its reserved memory. +# - 0.5 means 50% above reserve +# - -1.0 makes the feature inactive +# This feature is being kept inactive for now as we work on improving the +# frame retry logic (See commit comment for more details). +dispatcher.oom_frame_overboard_allowed_threshold=-1.0 + +# How many times should cuebot send a kill request for the same frame-host before reporting +# the frame as stuck +dispatcher.frame_kill_retry_limit=3 + +# Whether to satisfy dependents (*_ON_FRAME and *_ON_LAYER) only on Frame success depend.satisfy_only_on_frame_success=true # Jobs will be archived to the history tables after being completed for this long. diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java index a6261e464..918b43679 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java @@ -331,24 +331,6 @@ public void testIsHostLocked() { assertEquals(hostDao.isHostLocked(host),true); } - @Test - @Transactional - @Rollback(true) - public void testIsKillMode() { - hostDao.insertRenderHost(buildRenderHost(TEST_HOST), - hostManager.getDefaultAllocationDetail(), - false); - - HostEntity host = hostDao.findHostDetail(TEST_HOST); - assertFalse(hostDao.isKillMode(host)); - - jdbcTemplate.update( - "UPDATE host_stat SET int_swap_free = ?, int_mem_free = ? 
WHERE pk_host = ?", - CueUtil.MB256, CueUtil.MB256, host.getHostId()); - - assertTrue(hostDao.isKillMode(host)); - } - @Test @Transactional @Rollback(true) diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java index d313c5293..1f452e92a 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java @@ -310,10 +310,11 @@ private void executeDepend( DispatchJob dispatchJob = jobManager.getDispatchJob(proc.getJobId()); DispatchFrame dispatchFrame = jobManager.getDispatchFrame(report.getFrame().getFrameId()); + FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId()); dispatchSupport.stopFrame(dispatchFrame, frameState, report.getExitStatus(), report.getFrame().getMaxRss()); frameCompleteHandler.handlePostFrameCompleteOperations(proc, - report, dispatchJob, dispatchFrame, frameState); + report, dispatchJob, dispatchFrame, frameState, frameDetail); assertTrue(jobManager.isLayerComplete(layerFirst)); assertFalse(jobManager.isLayerComplete(layerSecond)); @@ -401,10 +402,11 @@ private void executeMinMemIncrease(int expected, boolean override) { DispatchJob dispatchJob = jobManager.getDispatchJob(proc.getJobId()); DispatchFrame dispatchFrame = jobManager.getDispatchFrame(report.getFrame().getFrameId()); + FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId()); dispatchSupport.stopFrame(dispatchFrame, FrameState.DEAD, report.getExitStatus(), report.getFrame().getMaxRss()); frameCompleteHandler.handlePostFrameCompleteOperations(proc, - report, dispatchJob, dispatchFrame, FrameState.WAITING); + report, dispatchJob, dispatchFrame, FrameState.WAITING, frameDetail); assertFalse(jobManager.isLayerComplete(layer)); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java index ce1e98ae1..120c620a1 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java @@ -83,12 +83,12 @@ private static RenderHost getRenderHost() { .setBootTime(1192369572) // The minimum amount of free space in the temporary directory to book a host. 
.setFreeMcp(CueUtil.GB) - .setFreeMem(53500) - .setFreeSwap(20760) + .setFreeMem(CueUtil.GB8) + .setFreeSwap(CueUtil.GB2) .setLoad(0) .setTotalMcp(CueUtil.GB4) - .setTotalMem(1048576L * 4096) - .setTotalSwap(20960) + .setTotalMem(CueUtil.GB8) + .setTotalSwap(CueUtil.GB2) .setNimbyEnabled(false) .setNumProcs(2) .setCoresPerProc(100) @@ -115,7 +115,7 @@ public void testHandleHostReport() { hostReportHandler.handleHostReport(report, true); DispatchHost host = getHost(); assertEquals(host.lockState, LockState.OPEN); - assertEquals(host.memory, 4294443008L); + assertEquals(host.memory, CueUtil.GB8 - 524288); assertEquals(host.gpus, 64); assertEquals(host.idleGpus, 64); assertEquals(host.gpuMemory, 1048576L * 2048); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java index 970b97a95..971df8d14 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java @@ -21,10 +21,15 @@ import java.io.File; import java.sql.Timestamp; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import javax.annotation.Resource; +import com.imageworks.spcue.dispatcher.DispatchSupport; +import com.imageworks.spcue.dispatcher.HostReportQueue; +import com.imageworks.spcue.dispatcher.FrameCompleteHandler; +import com.imageworks.spcue.grpc.job.FrameState; import org.junit.Before; import org.junit.Test; import org.springframework.test.annotation.Rollback; @@ -44,6 +49,7 @@ import com.imageworks.spcue.grpc.report.HostReport; import com.imageworks.spcue.grpc.report.RenderHost; import com.imageworks.spcue.grpc.report.RunningFrameInfo; +import com.imageworks.spcue.grpc.report.FrameCompleteReport; import com.imageworks.spcue.service.AdminManager; import com.imageworks.spcue.service.CommentManager; import com.imageworks.spcue.service.HostManager; @@ -52,6 +58,7 @@ import com.imageworks.spcue.test.TransactionalTest; import com.imageworks.spcue.util.CueUtil; import com.imageworks.spcue.VirtualProc; +import com.imageworks.spcue.LayerDetail; import java.util.UUID; @@ -70,6 +77,9 @@ public class HostReportHandlerTests extends TransactionalTest { @Resource HostReportHandler hostReportHandler; + @Resource + FrameCompleteHandler frameCompleteHandler; + @Resource Dispatcher dispatcher; @@ -99,9 +109,9 @@ public void setTestMode() { public void createHost() { hostname = UUID.randomUUID().toString().substring(0, 8); hostname2 = UUID.randomUUID().toString().substring(0, 8); - hostManager.createHost(getRenderHost(hostname, HardwareState.UP), + hostManager.createHost(getRenderHost(hostname), adminManager.findAllocationDetail("spi","general")); - hostManager.createHost(getRenderHost(hostname2, HardwareState.UP), + hostManager.createHost(getRenderHost(hostname2), adminManager.findAllocationDetail("spi","general")); } @@ -118,52 +128,32 @@ private DispatchHost getHost(String hostname) { return hostManager.findDispatchHost(hostname); } - private static RenderHost getRenderHost(String hostname, HardwareState state) { + private static RenderHost.Builder getRenderHostBuilder(String hostname) { return RenderHost.newBuilder() .setName(hostname) .setBootTime(1192369572) // The minimum amount of free space in the temporary directory to book a host. 
.setFreeMcp(CueUtil.GB) - .setFreeMem((int) CueUtil.GB8) - .setFreeSwap(20760) + .setFreeMem(CueUtil.GB8) + .setFreeSwap(CueUtil.GB2) .setLoad(0) .setTotalMcp(CueUtil.GB4) .setTotalMem(CueUtil.GB8) .setTotalSwap(CueUtil.GB2) .setNimbyEnabled(false) - .setNumProcs(2) + .setNumProcs(16) .setCoresPerProc(100) .addTags("test") - .setState(state) + .setState(HardwareState.UP) .setFacility("spi") .putAttributes("SP_OS", "Linux") - .setFreeGpuMem((int) CueUtil.MB512) - .setTotalGpuMem((int) CueUtil.MB512) - .build(); + .setNumGpus(0) + .setFreeGpuMem(0) + .setTotalGpuMem(0); } - private static RenderHost getRenderHost(String hostname, HardwareState state, Long freeTempDir) { - return RenderHost.newBuilder() - .setName(hostname) - .setBootTime(1192369572) - // The minimum amount of free space in the temporary directory to book a host. - .setFreeMcp(freeTempDir) - .setFreeMem((int) CueUtil.GB8) - .setFreeSwap(20760) - .setLoad(0) - .setTotalMcp(freeTempDir * 4) - .setTotalMem(CueUtil.GB8) - .setTotalSwap(CueUtil.GB2) - .setNimbyEnabled(false) - .setNumProcs(2) - .setCoresPerProc(100) - .addTags("test") - .setState(state) - .setFacility("spi") - .putAttributes("SP_OS", "Linux") - .setFreeGpuMem((int) CueUtil.MB512) - .setTotalGpuMem((int) CueUtil.MB512) - .build(); + private static RenderHost getRenderHost(String hostname) { + return getRenderHostBuilder(hostname).build(); } private static RenderHost getNewRenderHost(String tags) { @@ -172,12 +162,12 @@ private static RenderHost getNewRenderHost(String tags) { .setBootTime(1192369572) // The minimum amount of free space in the temporary directory to book a host. .setFreeMcp(CueUtil.GB) - .setFreeMem(53500) - .setFreeSwap(20760) + .setFreeMem(CueUtil.GB8) + .setFreeSwap(CueUtil.GB2) .setLoad(0) - .setTotalMcp(CueUtil.GB4) - .setTotalMem(8173264) - .setTotalSwap(20960) + .setTotalMcp(195430) + .setTotalMem(CueUtil.GB8) + .setTotalSwap(CueUtil.GB2) .setNimbyEnabled(false) .setNumProcs(2) .setCoresPerProc(100) @@ -196,15 +186,15 @@ private static RenderHost getNewRenderHost(String tags) { public void testHandleHostReport() throws InterruptedException { CoreDetail cores = getCoreDetail(200, 200, 0, 0); HostReport report1 = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP)) + .setHost(getRenderHost(hostname)) .setCoreInfo(cores) .build(); HostReport report2 = HostReport.newBuilder() - .setHost(getRenderHost(hostname2, HardwareState.UP)) + .setHost(getRenderHost(hostname2)) .setCoreInfo(cores) .build(); HostReport report1_2 = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP)) + .setHost(getRenderHost(hostname)) .setCoreInfo(getCoreDetail(200, 200, 100, 0)) .build(); @@ -314,7 +304,7 @@ public void testHandleHostReportWithFullTemporaryDirectories() { * */ // Create HostReport HostReport report1 = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP, 1024L)) + .setHost(getRenderHostBuilder(hostname).setFreeMcp(1024L).build()) .setCoreInfo(cores) .build(); // Call handleHostReport() => Create the comment with subject=SUBJECT_COMMENT_FULL_TEMP_DIR and change the @@ -356,7 +346,7 @@ public void testHandleHostReportWithFullTemporaryDirectories() { * */ // Set the host freeTempDir to the minimum size required = 1GB (1048576 KB) HostReport report2 = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP, 1048576L)) + .setHost(getRenderHostBuilder(hostname).setFreeMcp(CueUtil.GB).build()) .setCoreInfo(cores) .build(); // Call handleHostReport() => Delete the comment 
with subject=SUBJECT_COMMENT_FULL_TEMP_DIR and change the @@ -397,7 +387,7 @@ public void testHandleHostReportWithHardwareStateRepairNotRelatedToFullTempDir() * */ // Create HostReport HostReport report = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP, 1048576L)) + .setHost(getRenderHostBuilder(hostname).setFreeMcp(CueUtil.GB).build()) .setCoreInfo(cores) .build(); // Get host @@ -451,7 +441,7 @@ public void testMemoryAndLlu() { .setMaxRss(420000) .build(); HostReport report = HostReport.newBuilder() - .setHost(getRenderHost(hostname, HardwareState.UP)) + .setHost(getRenderHost(hostname)) .setCoreInfo(cores) .addFrames(info) .build(); @@ -462,5 +452,170 @@ public void testMemoryAndLlu() { assertEquals(frame.dateLLU, new Timestamp(now / 1000 * 1000)); assertEquals(420000, frame.maxRss); } + + @Test + @Transactional + @Rollback(true) + public void testMemoryAggressionRss() { + jobLauncher.testMode = true; + dispatcher.setTestMode(true); + + jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_simple.xml")); + + DispatchHost host = getHost(hostname); + List procs = dispatcher.dispatchHost(host); + assertEquals(1, procs.size()); + VirtualProc proc = procs.get(0); + + // 1.6 = 1 + dispatcher.oom_frame_overboard_allowed_threshold + long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved * 1.6); + + // Test rss overboard + RunningFrameInfo info = RunningFrameInfo.newBuilder() + .setJobId(proc.getJobId()) + .setLayerId(proc.getLayerId()) + .setFrameId(proc.getFrameId()) + .setResourceId(proc.getProcId()) + .setRss(memoryOverboard) + .setMaxRss(memoryOverboard) + .build(); + HostReport report = HostReport.newBuilder() + .setHost(getRenderHost(hostname)) + .setCoreInfo(getCoreDetail(200, 200, 0, 0)) + .addFrames(info) + .build(); + + long killCount = DispatchSupport.killedOffenderProcs.get(); + hostReportHandler.handleHostReport(report, false); + assertEquals(killCount + 1, DispatchSupport.killedOffenderProcs.get()); + } + + @Test + @Transactional + @Rollback(true) + public void testMemoryAggressionMaxRss() { + jobLauncher.testMode = true; + dispatcher.setTestMode(true); + jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_simple.xml")); + + DispatchHost host = getHost(hostname); + List procs = dispatcher.dispatchHost(host); + assertEquals(1, procs.size()); + VirtualProc proc = procs.get(0); + + // 0.6 = dispatcher.oom_frame_overboard_allowed_threshold + long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved * + (1.0 + (2 * 0.6))); + + // Test rss>90% and maxRss overboard + RunningFrameInfo info = RunningFrameInfo.newBuilder() + .setJobId(proc.getJobId()) + .setLayerId(proc.getLayerId()) + .setFrameId(proc.getFrameId()) + .setResourceId(proc.getProcId()) + .setRss((long)Math.ceil(0.95 * proc.memoryReserved)) + .setMaxRss(memoryOverboard) + .build(); + HostReport report = HostReport.newBuilder() + .setHost(getRenderHost(hostname)) + .setCoreInfo(getCoreDetail(200, 200, 0, 0)) + .addFrames(info) + .build(); + + long killCount = DispatchSupport.killedOffenderProcs.get(); + hostReportHandler.handleHostReport(report, false); + assertEquals(killCount + 1, DispatchSupport.killedOffenderProcs.get()); + } + + @Test + @Transactional + @Rollback(true) + public void testMemoryAggressionMemoryWarning() { + jobLauncher.testMode = true; + dispatcher.setTestMode(true); + jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_multiple_frames.xml")); + + DispatchHost host = getHost(hostname); + List procs = 
dispatcher.dispatchHost(host); + assertEquals(3, procs.size()); + VirtualProc proc1 = procs.get(0); + VirtualProc proc2 = procs.get(1); + VirtualProc proc3 = procs.get(2); + + // Ok + RunningFrameInfo info1 = RunningFrameInfo.newBuilder() + .setJobId(proc1.getJobId()) + .setLayerId(proc1.getLayerId()) + .setFrameId(proc1.getFrameId()) + .setResourceId(proc1.getProcId()) + .setRss(CueUtil.GB2) + .setMaxRss(CueUtil.GB2) + .build(); + + // Overboard Rss + RunningFrameInfo info2 = RunningFrameInfo.newBuilder() + .setJobId(proc2.getJobId()) + .setLayerId(proc2.getLayerId()) + .setFrameId(proc2.getFrameId()) + .setResourceId(proc2.getProcId()) + .setRss(CueUtil.GB4) + .setMaxRss(CueUtil.GB4) + .build(); + + // Overboard Rss + long memoryUsedProc3 = CueUtil.GB8; + RunningFrameInfo info3 = RunningFrameInfo.newBuilder() + .setJobId(proc3.getJobId()) + .setLayerId(proc3.getLayerId()) + .setFrameId(proc3.getFrameId()) + .setResourceId(proc3.getProcId()) + .setRss(memoryUsedProc3) + .setMaxRss(memoryUsedProc3) + .build(); + + RenderHost hostAfterUpdate = getRenderHostBuilder(hostname).setFreeMem(0).build(); + + HostReport report = HostReport.newBuilder() + .setHost(hostAfterUpdate) + .setCoreInfo(getCoreDetail(200, 200, 0, 0)) + .addAllFrames(Arrays.asList(info1, info2, info3)) + .build(); + + // Get layer state before report gets sent + LayerDetail layerBeforeIncrease = jobManager.getLayerDetail(proc3.getLayerId()); + + // In this case, killing one frame should be enough to get the machine to a safe state + long killCount = DispatchSupport.killedOffenderProcs.get(); + hostReportHandler.handleHostReport(report, false); + assertEquals(killCount + 1, DispatchSupport.killedOffenderProcs.get()); + + // Confirm the frame will be set to retry after its completion has been processed + + RunningFrameInfo runningFrame = RunningFrameInfo.newBuilder() + .setFrameId(proc3.getFrameId()) + .setFrameName("frame_name") + .setLayerId(proc3.getLayerId()) + .setRss(memoryUsedProc3) + .setMaxRss(memoryUsedProc3) + .setResourceId(proc3.id) + .build(); + FrameCompleteReport completeReport = FrameCompleteReport.newBuilder() + .setHost(hostAfterUpdate) + .setFrame(runningFrame) + .setExitSignal(9) + .setRunTime(1) + .setExitStatus(1) + .build(); + + frameCompleteHandler.handleFrameCompleteReport(completeReport); + FrameDetail killedFrame = jobManager.getFrameDetail(proc3.getFrameId()); + LayerDetail layer = jobManager.getLayerDetail(proc3.getLayerId()); + assertEquals(FrameState.WAITING, killedFrame.state); + // Memory increases are processed in two different places: one will set the new value to proc.reserved + 2GB + // and the other will set it to the maximum reported proc.maxRss; the end value will be whichever is higher.
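The comment this test closes with describes two independent writers racing to raise the layer minimum, resolved by taking the larger value. A small sketch of that arithmetic, with hypothetical names and values in KB as elsewhere in cuebot:

    /** Sketch: expected layer minimum memory after an OOM retry (values in KB). */
    public final class RetryMemoryMath {
        static long expectedMinimum(long previousMinimumKb, long reportedMaxRssKb) {
            long bumped = previousMinimumKb + 2L * 1024 * 1024; // the reservation + 2GB path
            return Math.max(reportedMaxRssKb, bumped);          // the highest writer wins
        }

        public static void main(String[] args) {
            // 8GB reported maxRss vs (2GB layer minimum + 2GB bump): 8GB wins,
            // matching the Math.max assertion that closes testMemoryAggressionMemoryWarning.
            System.out.println(expectedMinimum(2L * 1024 * 1024, 8L * 1024 * 1024)); // 8388608
        }
    }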
+ // In this case, proc.maxRss + assertEquals(Math.max(memoryUsedProc3, layerBeforeIncrease.getMinimumMemory() + CueUtil.GB2), + layer.getMinimumMemory()); + } } diff --git a/cuebot/src/test/resources/conf/jobspec/jobspec_multiple_frames.xml b/cuebot/src/test/resources/conf/jobspec/jobspec_multiple_frames.xml new file mode 100644 index 000000000..3baa0b22b --- /dev/null +++ b/cuebot/src/test/resources/conf/jobspec/jobspec_multiple_frames.xml @@ -0,0 +1,48 @@ + + + + + + + + + spi + pipe + default + testuser + 9860 + + + False + 2 + False + + + + echo hello + 1-3 + 1 + 2gb + + + shell + + + + + + diff --git a/cuebot/src/test/resources/opencue.properties b/cuebot/src/test/resources/opencue.properties index 00d0c4463..10516f881 100644 --- a/cuebot/src/test/resources/opencue.properties +++ b/cuebot/src/test/resources/opencue.properties @@ -38,7 +38,7 @@ dispatcher.job_query_max=20 dispatcher.job_lock_expire_seconds=2 dispatcher.job_lock_concurrency_level=3 dispatcher.frame_query_max=10 -dispatcher.job_frame_dispatch_max=2 +dispatcher.job_frame_dispatch_max=3 dispatcher.host_frame_dispatch_max=12 dispatcher.launch_queue.core_pool_size=1 @@ -64,9 +64,8 @@ dispatcher.kill_queue.queue_capacity=1000 dispatcher.booking_queue.core_pool_size=6 dispatcher.booking_queue.max_pool_size=6 dispatcher.booking_queue.queue_capacity=1000 - -# The minimum amount of free space in the temporary directory (mcp) to book a host. -# E.g: 1G = 1048576 kB => dispatcher.min_bookable_free_temp_dir_kb=1048576 -# Default = 1G = 1048576 kB -# If equals to -1, it means the feature is turned off -dispatcher.min_bookable_free_temp_dir_kb=1048576 \ No newline at end of file +dispatcher.min_bookable_free_temp_dir_kb=1048576 +dispatcher.min_bookable_free_mcp_kb=1048576 +dispatcher.oom_max_safe_used_memory_threshold=0.95 +dispatcher.oom_frame_overboard_allowed_threshold=0.6 +dispatcher.frame_kill_retry_limit=3 \ No newline at end of file diff --git a/rqd/rqd/rqdservicers.py b/rqd/rqd/rqdservicers.py index 98ab358ac..b736ef43b 100644 --- a/rqd/rqd/rqdservicers.py +++ b/rqd/rqd/rqdservicers.py @@ -67,6 +67,8 @@ def KillRunningFrame(self, request, context): frame = self.rqCore.getRunningFrame(request.frame_id) if frame: frame.kill(message=request.message) + else: + log.warning("Wasn't able to find frame(%s) to kill", request.frame_id) return rqd.compiled_proto.rqd_pb2.RqdStaticKillRunningFrameResponse() def ShutdownRqdNow(self, request, context): diff --git a/rqd/rqd/rqnetwork.py b/rqd/rqd/rqnetwork.py index 9f33e64a2..9d3591fdd 100644 --- a/rqd/rqd/rqnetwork.py +++ b/rqd/rqd/rqnetwork.py @@ -162,6 +162,7 @@ def kill(self, message=""): else: os.killpg(self.pid, rqd.rqconstants.KILL_SIGNAL) finally: + log.warning("kill() successfully killed frameId=%s pid=%s", self.frameId, self.pid) rqd.rqutil.permissionsLow() except OSError as e: log.warning(