Skip to content

Commit

Permalink
Merge branch 'master' into rqd-config-file-from-env-var
Browse files Browse the repository at this point in the history
Signed-off-by: Kern Attila GERMAIN <[email protected]>
  • Loading branch information
KernAttila authored Nov 9, 2023
2 parents e4f474d + fe519e3 commit b6dd28b
Show file tree
Hide file tree
Showing 61 changed files with 1,335 additions and 410 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
* Copyright Contributors to the OpenCue Project
*
Expand Down
22 changes: 22 additions & 0 deletions cuebot/src/main/java/com/imageworks/spcue/dao/CommentDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.imageworks.spcue.HostInterface;
import com.imageworks.spcue.JobInterface;

import java.util.List;

public interface CommentDao {

/**
Expand All @@ -32,6 +34,26 @@ public interface CommentDao {
*/
public void deleteComment(String id);

/**
* Deletes comments using host, user, and subject
*
* @param host
* @param user
* @param subject
* @return boolean: returns true if one or more comments where deleted
*/
public boolean deleteCommentByHostUserAndSubject(HostInterface host, String user, String subject);

/**
* Get comments using host, user, and subject
*
* @param host
* @param user
* @param subject
* @return List<Comment>
*/
public List<CommentDetail> getCommentsByHostUserAndSubject(HostInterface host, String user, String subject);

/**
* Retrieves the specified comment.
*
Expand Down
7 changes: 7 additions & 0 deletions cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ boolean updateFrameStopped(FrameInterface frame, FrameState state, int exitStatu
* @return
*/
boolean updateFrameCleared(FrameInterface frame);
/**
* Sets a frame exitStatus to EXIT_STATUS_MEMORY_FAILURE
*
* @param frame
* @return whether the frame has been updated
*/
boolean updateFrameMemoryError(FrameInterface frame);

/**
* Sets a frame to an unreserved waiting state.
Expand Down
17 changes: 8 additions & 9 deletions cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public interface HostDao {
*/
void updateHostState(HostInterface host, HardwareState state);

/**
* updates a host with the passed free temporary directory
*
* @param host
* @param freeTempDir
*/
void updateHostFreeTempDir(HostInterface host, Long freeTempDir);

/**
* returns a full host detail
*
Expand Down Expand Up @@ -244,15 +252,6 @@ public interface HostDao {
*/
void updateThreadMode(HostInterface host, ThreadMode mode);

/**
* When a host is in kill mode that means its 256MB+ into the swap and the
* the worst memory offender is killed.
*
* @param h HostInterface
* @return boolean
*/
boolean isKillMode(HostInterface h);

/**
* Update the specified host's hardware information.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

import org.springframework.jdbc.core.RowMapper;
Expand Down Expand Up @@ -71,6 +72,18 @@ public CommentDetail mapRow(ResultSet rs, int row) throws SQLException {
}
};

public boolean deleteCommentByHostUserAndSubject(HostInterface host, String user, String subject) {
return getJdbcTemplate().update(
"DELETE FROM comments WHERE pk_host=? AND str_user=? AND str_subject=?",
host.getHostId(), user, subject) > 0;
}

public List<CommentDetail> getCommentsByHostUserAndSubject(HostInterface host, String user, String subject) {
return getJdbcTemplate().query(
"SELECT * FROM comments WHERE pk_host=? AND str_user=? AND str_subject=?",
COMMENT_DETAIL_MAPPER, host.getHostId(), user, subject);
}

public CommentDetail getCommentDetail(String id) {
return getJdbcTemplate().queryForObject(
"SELECT * FROM comments WHERE pk_comment=?",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,24 @@ public boolean updateFrameCleared(FrameInterface frame) {
return updateFrame(frame, Dispatcher.EXIT_STATUS_FRAME_CLEARED) > 0;
}

private static final String UPDATE_FRAME_MEMORY_ERROR =
"UPDATE "+
"frame "+
"SET " +
"int_exit_status = ?, " +
"int_version = int_version + 1 " +
"WHERE " +
"frame.pk_frame = ? ";
@Override
public boolean updateFrameMemoryError(FrameInterface frame) {
int result = getJdbcTemplate().update(
UPDATE_FRAME_MEMORY_ERROR,
Dispatcher.EXIT_STATUS_MEMORY_FAILURE,
frame.getFrameId());

return result > 0;
}

private static final String UPDATE_FRAME_STARTED =
"UPDATE " +
"frame " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,13 @@ public void updateHostState(HostInterface host, HardwareState state) {
state.toString(), host.getHostId());
}

@Override
public void updateHostFreeTempDir(HostInterface host, Long freeTempDir) {
getJdbcTemplate().update(
"UPDATE host_stat SET int_mcp_free=? WHERE pk_host=?",
freeTempDir, host.getHostId());
}

@Override
public void updateHostSetAllocation(HostInterface host, AllocationInterface alloc) {

Expand Down Expand Up @@ -605,15 +612,6 @@ public void updateHostOs(HostInterface host, String os) {
os, host.getHostId());
}

@Override
public boolean isKillMode(HostInterface h) {
return getJdbcTemplate().queryForObject(
"SELECT COUNT(1) FROM host_stat WHERE pk_host = ? " +
"AND int_swap_total - int_swap_free > ? AND int_mem_free < ?",
Integer.class, h.getHostId(), Dispatcher.KILL_MODE_SWAP_THRESHOLD,
Dispatcher.KILL_MODE_MEM_THRESHOLD) > 0;
}

@Override
public int getStrandedCoreUnits(HostInterface h) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ public boolean increaseReservedMemory(ProcInterface p, long value) {
value, p.getProcId(), value) == 1;
} catch (Exception e) {
// check by trigger erify_host_resources
throw new ResourceReservationFailureException("failed to increase memory reserveration for proc "
throw new ResourceReservationFailureException("failed to increase memory reservation for proc "
+ p.getProcId() + " to " + value + ", proc does not have that much memory to spare.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,14 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, VirtualProc pro
*/
void clearFrame(DispatchFrame frame);

/**
* Sets the frame state exitStatus to EXIT_STATUS_MEMORY_FAILURE
*
* @param frame
* @return whether the frame has been updated
*/
boolean updateFrameMemoryError(FrameInterface frame);

/**
* Update Memory usage data and LLU time for the given frame.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.dao.DataAccessException;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -184,7 +185,11 @@ public boolean increaseReservedMemory(ProcInterface p, long value) {

@Override
public boolean clearVirtualProcAssignement(ProcInterface proc) {
return procDao.clearVirtualProcAssignment(proc);
try {
return procDao.clearVirtualProcAssignment(proc);
} catch (DataAccessException e) {
return false;
}
}

@Transactional(propagation = Propagation.REQUIRED)
Expand Down Expand Up @@ -343,6 +348,12 @@ public void clearFrame(DispatchFrame frame) {
frameDao.updateFrameCleared(frame);
}

@Override
@Transactional(propagation = Propagation.REQUIRED)
public boolean updateFrameMemoryError(FrameInterface frame) {
return frameDao.updateFrameMemoryError(frame);
}

@Transactional(propagation = Propagation.SUPPORTS)
public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) {
int threads = proc.coresReserved / 100;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
* Copyright Contributors to the OpenCue Project
*
Expand Down Expand Up @@ -108,13 +107,8 @@ public interface Dispatcher {
// without being penalized for it.
public static final long VIRTUAL_MEM_THRESHHOLD = CueUtil.GB2;

// The amount of swap that must be used before a host can go
// into kill mode.
public static final long KILL_MODE_SWAP_THRESHOLD = CueUtil.MB128;

// When the amount of free memory drops below this point, the
// host can go into kill mode.
public static final long KILL_MODE_MEM_THRESHOLD = CueUtil.MB512;
// How long to keep track of a frame kill request
public static final int FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES = 3;

// A higher number gets more deep booking but less spread on the cue.
public static final int DEFAULT_MAX_FRAMES_PER_PASS = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.imageworks.spcue.DispatchFrame;
import com.imageworks.spcue.DispatchHost;
import com.imageworks.spcue.DispatchJob;
import com.imageworks.spcue.FrameDetail;
import com.imageworks.spcue.JobDetail;
import com.imageworks.spcue.LayerDetail;
import com.imageworks.spcue.LayerInterface;
Expand Down Expand Up @@ -143,49 +144,35 @@ public void handleFrameCompleteReport(final FrameCompleteReport report) {
}

try {

final VirtualProc proc;

try {

proc = hostManager.getVirtualProc(
report.getFrame().getResourceId());
}
catch (EmptyResultDataAccessException e) {
/*
* Do not propagate this exception to RQD. This
* usually means the cue lost connectivity to
* the host and cleared out the record of the proc.
* If this is propagated back to RQD, RQD will
* keep retrying the operation forever.
*/
logger.info("failed to acquire data needed to " +
"process completed frame: " +
report.getFrame().getFrameName() + " in job " +
report.getFrame().getJobName() + "," + e);
return;
}

final VirtualProc proc = hostManager.getVirtualProc(report.getFrame().getResourceId());
final DispatchJob job = jobManager.getDispatchJob(proc.getJobId());
final LayerDetail layer = jobManager.getLayerDetail(report.getFrame().getLayerId());
final FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId());
final DispatchFrame frame = jobManager.getDispatchFrame(report.getFrame().getFrameId());
final FrameState newFrameState = determineFrameState(job, layer, frame, report);
final String key = proc.getJobId() + "_" + report.getFrame().getLayerId() +
"_" + report.getFrame().getFrameId();

if (dispatchSupport.stopFrame(frame, newFrameState, report.getExitStatus(),
report.getFrame().getMaxRss())) {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState);
} catch (Exception e) {
logger.warn("Exception during handlePostFrameCompleteOperations " +
"in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e));
if (dispatcher.isTestMode()) {
// Database modifications on a threadpool cannot be captured by the test thread
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState, frameDetail);
} else {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
handlePostFrameCompleteOperations(proc, report, job, frame,
newFrameState, frameDetail);
} catch (Exception e) {
logger.warn("Exception during handlePostFrameCompleteOperations " +
"in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e));
}
}
}
});
});
}
}
else {
/*
Expand Down Expand Up @@ -222,6 +209,19 @@ public void run() {
}
}
}
catch (EmptyResultDataAccessException e) {
/*
* Do not propagate this exception to RQD. This
* usually means the cue lost connectivity to
* the host and cleared out the record of the proc.
* If this is propagated back to RQD, RQD will
* keep retrying the operation forever.
*/
logger.info("failed to acquire data needed to " +
"process completed frame: " +
report.getFrame().getFrameName() + " in job " +
report.getFrame().getJobName() + "," + e);
}
catch (Exception e) {

/*
Expand Down Expand Up @@ -259,7 +259,7 @@ public void run() {
*/
public void handlePostFrameCompleteOperations(VirtualProc proc,
FrameCompleteReport report, DispatchJob job, DispatchFrame frame,
FrameState newFrameState) {
FrameState newFrameState, FrameDetail frameDetail) {
try {

/*
Expand Down Expand Up @@ -313,7 +313,8 @@ public void handlePostFrameCompleteOperations(VirtualProc proc,
* specified in the show's service override, service or 2GB.
*/
if (report.getExitStatus() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) {
|| report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE
|| frameDetail.exitStatus == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) {
long increase = CueUtil.GB2;

// since there can be multiple services, just going for the
Expand Down
Loading

0 comments on commit b6dd28b

Please sign in to comment.