Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cuebot] Add FIFO scheduling capability #1060

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public interface DispatcherDao {
* @param numJobs
* @return
*/
Set<String> findDispatchJobsForAllShows(DispatchHost host, int numJobs);
List<String> findDispatchJobsForAllShows(DispatchHost host, int numJobs);

/**
* Return a list of jobs which could use resources of the specified
Expand All @@ -92,7 +92,7 @@ public interface DispatcherDao {
* @param numJobs
* @return
*/
Set<String> findDispatchJobs(DispatchHost host, int numJobs);
List<String> findDispatchJobs(DispatchHost host, int numJobs);

/**
* Return a list of jobs which could use resources of the specified
Expand All @@ -102,7 +102,7 @@ public interface DispatcherDao {
* @param numJobs
* @return
*/
Set<String> findDispatchJobs(DispatchHost host, GroupInterface g);
List<String> findDispatchJobs(DispatchHost host, GroupInterface g);

/**
* Finds an under proced job if one exists and returns it,
Expand Down Expand Up @@ -131,7 +131,7 @@ public interface DispatcherDao {
* @param numJobs
* @return
*/
Set<String> findDispatchJobs(DispatchHost host, ShowInterface show, int numJobs);
List<String> findDispatchJobs(DispatchHost host, ShowInterface show, int numJobs);

/**
* Find a list of local dispatch jobs.
Expand Down Expand Up @@ -162,6 +162,20 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, VirtualProc pro
*/
List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, DispatchHost host,
int limit);

/**
* Return whether FIFO scheduling is enabled or not in the same priority for unittest.
*
* @return
*/
boolean getFifoSchedulingEnabled();

/**
* Set whether FIFO scheduling is enabled or not in the same priority for unittest.
*
* @param fifoSchedulingEnabled
*/
void setFifoSchedulingEnabled(boolean fifoSchedulingEnabled);
}


Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,22 @@ public class DispatchQuery {
"AND job.pk_folder = ? ");


private static final String replaceQueryForFifo(String query) {
return query
.replace(
"JOBS_BY",
"JOBS_FIFO_BY")
.replace(
"ORDER BY job_resource.int_priority DESC",
"ORDER BY job_resource.int_priority DESC, job.ts_started ASC")
.replace(
"WHERE rank < ?",
"WHERE rank < ? ORDER BY rank");
}

public static final String FIND_JOBS_FIFO_BY_SHOW = replaceQueryForFifo(FIND_JOBS_BY_SHOW);
public static final String FIND_JOBS_FIFO_BY_GROUP = replaceQueryForFifo(FIND_JOBS_BY_GROUP);

/**
* Dispatch a host in local booking mode.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
Expand All @@ -28,6 +29,8 @@
import java.util.concurrent.ConcurrentHashMap;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.support.JdbcDaoSupport;

Expand All @@ -52,6 +55,8 @@
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_BY_GROUP;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_BY_LOCAL;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_BY_SHOW;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_FIFO_BY_GROUP;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_JOBS_FIFO_BY_SHOW;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_HOST;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_PROC;
import static com.imageworks.spcue.dao.postgres.DispatchQuery.FIND_LOCAL_DISPATCH_FRAME_BY_LAYER_AND_HOST;
Expand Down Expand Up @@ -124,6 +129,27 @@ public List<SortableShow> getShows() {
private final ConcurrentHashMap<String, ShowCache> bookableShows =
new ConcurrentHashMap<String, ShowCache>();

/**
* Whether or not to enable FIFO scheduling in the same priority.
*/
private boolean fifoSchedulingEnabled;

@Autowired
public DispatcherDaoJdbc(Environment env) {
fifoSchedulingEnabled = env.getProperty(
"dispatcher.fifo_scheduling_enabled", Boolean.class, false);
}

@Override
public boolean getFifoSchedulingEnabled() {
return fifoSchedulingEnabled;
}

@Override
public void setFifoSchedulingEnabled(boolean fifoSchedulingEnabled) {
this.fifoSchedulingEnabled = fifoSchedulingEnabled;
}

/**
* Returns a sorted list of shows that have pending jobs
* which could benefit from the specified allocation.
Expand All @@ -149,8 +175,8 @@ else if (cached.isExpired()) {
return bookableShows.get(key).shows;
}

private Set<String> findDispatchJobs(DispatchHost host, int numJobs, boolean shuffleShows) {
LinkedHashSet<String> result = new LinkedHashSet<String>();
private List<String> findDispatchJobs(DispatchHost host, int numJobs, boolean shuffleShows) {
ArrayList<String> result = new ArrayList<String>();
List<SortableShow> shows = new LinkedList<SortableShow>(getBookableShows(host));
// shows were sorted. If we want it in random sequence, we need to shuffle it.
if (shuffleShows) {
Expand Down Expand Up @@ -185,7 +211,7 @@ private Set<String> findDispatchJobs(DispatchHost host, int numJobs, boolean shu
}

result.addAll(getJdbcTemplate().query(
FIND_JOBS_BY_SHOW,
fifoSchedulingEnabled ? FIND_JOBS_FIFO_BY_SHOW : FIND_JOBS_BY_SHOW,
PKJOB_MAPPER,
s.getShowId(), host.getFacilityId(), host.os,
host.idleCores, host.idleMemory,
Expand All @@ -208,27 +234,26 @@ private Set<String> findDispatchJobs(DispatchHost host, int numJobs, boolean shu
}

@Override
public Set<String> findDispatchJobsForAllShows(DispatchHost host, int numJobs) {
public List<String> findDispatchJobsForAllShows(DispatchHost host, int numJobs) {
return findDispatchJobs(host, numJobs, true);
}

@Override
public Set<String> findDispatchJobs(DispatchHost host, int numJobs) {
public List<String> findDispatchJobs(DispatchHost host, int numJobs) {
return findDispatchJobs(host, numJobs, false);
}

@Override
public Set<String> findDispatchJobs(DispatchHost host, GroupInterface g) {
LinkedHashSet<String> result = new LinkedHashSet<String>(5);
result.addAll(getJdbcTemplate().query(
FIND_JOBS_BY_GROUP,
public List<String> findDispatchJobs(DispatchHost host, GroupInterface g) {
List<String> result = getJdbcTemplate().query(
fifoSchedulingEnabled ? FIND_JOBS_FIFO_BY_GROUP : FIND_JOBS_BY_GROUP,
PKJOB_MAPPER,
g.getGroupId(),host.getFacilityId(), host.os,
host.idleCores, host.idleMemory,
threadMode(host.threadMode),
host.idleGpus,
(host.idleGpuMemory > 0) ? 1 : 0, host.idleGpuMemory,
host.getName(), 50));
host.getName(), 50);

return result;
}
Expand Down Expand Up @@ -378,19 +403,17 @@ public boolean higherPriorityJobExists(JobDetail baseJob, VirtualProc proc) {
}

@Override
public Set<String> findDispatchJobs(DispatchHost host,
public List<String> findDispatchJobs(DispatchHost host,
ShowInterface show, int numJobs) {
LinkedHashSet<String> result = new LinkedHashSet<String>(numJobs);

result.addAll(getJdbcTemplate().query(
FIND_JOBS_BY_SHOW,
List<String> result = getJdbcTemplate().query(
fifoSchedulingEnabled ? FIND_JOBS_FIFO_BY_SHOW : FIND_JOBS_BY_SHOW,
PKJOB_MAPPER,
show.getShowId(), host.getFacilityId(), host.os,
host.idleCores, host.idleMemory,
threadMode(host.threadMode),
host.idleGpus,
(host.idleGpuMemory > 0) ? 1 : 0, host.idleGpuMemory,
host.getName(), numJobs * 10));
host.getName(), numJobs * 10);

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
Expand Down Expand Up @@ -126,7 +125,7 @@ private Cache<String, String> getOrCreateJobLock() {
}


private List<VirtualProc> dispatchJobs(DispatchHost host, Set<String> jobs) {
private List<VirtualProc> dispatchJobs(DispatchHost host, List<String> jobs) {
List<VirtualProc> procs = new ArrayList<VirtualProc>();

try {
Expand Down Expand Up @@ -170,8 +169,8 @@ private List<VirtualProc> dispatchJobs(DispatchHost host, Set<String> jobs) {
return procs;
}

private Set<String> getGpuJobs(DispatchHost host, ShowInterface show) {
Set<String> jobs = null;
private List<String> getGpuJobs(DispatchHost host, ShowInterface show) {
List<String> jobs = null;

// TODO: GPU: make index with the 4 components instead of just 3, replace the just 3

Expand Down Expand Up @@ -200,7 +199,7 @@ private Set<String> getGpuJobs(DispatchHost host, ShowInterface show) {

@Override
public List<VirtualProc> dispatchHostToAllShows(DispatchHost host) {
Set<String> jobs = dispatchSupport.findDispatchJobsForAllShows(
List<String> jobs = dispatchSupport.findDispatchJobsForAllShows(
host,
getIntProperty("dispatcher.job_query_max"));

Expand All @@ -210,7 +209,7 @@ public List<VirtualProc> dispatchHostToAllShows(DispatchHost host) {
@Override
public List<VirtualProc> dispatchHost(DispatchHost host) {

Set<String> jobs = getGpuJobs(host, null);
List<String> jobs = getGpuJobs(host, null);

if (jobs == null)
jobs = dispatchSupport.findDispatchJobs(host, getIntProperty("dispatcher.job_query_max"));
Expand All @@ -221,7 +220,7 @@ public List<VirtualProc> dispatchHost(DispatchHost host) {
@Override
public List<VirtualProc> dispatchHost(DispatchHost host, ShowInterface show) {

Set<String> jobs = getGpuJobs(host, show);
List<String> jobs = getGpuJobs(host, show);

if (jobs == null)
jobs = dispatchSupport.findDispatchJobs(host, show,
Expand All @@ -233,7 +232,7 @@ public List<VirtualProc> dispatchHost(DispatchHost host, ShowInterface show) {
@Override
public List<VirtualProc> dispatchHost(DispatchHost host, GroupInterface group) {

Set<String> jobs = getGpuJobs(host, null);
List<String> jobs = getGpuJobs(host, null);

if (jobs == null)
jobs = dispatchSupport.findDispatchJobs(host, group);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, VirtualProc pro
* @param host
* @return
*/
Set<String> findDispatchJobsForAllShows(DispatchHost host, int numJobs);
List<String> findDispatchJobsForAllShows(DispatchHost host, int numJobs);

/**
* Returns the highest priority job that can utilize
Expand All @@ -315,7 +315,7 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, VirtualProc pro
* @param host
* @return
*/
Set<String> findDispatchJobs(DispatchHost host, int numJobs);
List<String> findDispatchJobs(DispatchHost host, int numJobs);

/**
* Returns the highest priority jobs that can utilize
Expand All @@ -324,7 +324,7 @@ List<DispatchFrame> findNextDispatchFrames(LayerInterface layer, VirtualProc pro
* @param host
* @return A set of unique job ids.
*/
Set<String> findDispatchJobs(DispatchHost host, GroupInterface p);
List<String> findDispatchJobs(DispatchHost host, GroupInterface p);

/**
*
Expand Down Expand Up @@ -523,14 +523,14 @@ void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss, long vsi
void determineIdleCores(DispatchHost host, int load);

/**
* Return a set of job IDs that can take the given host.
* Return a list of job IDs that can take the given host.
*
* @param host
* @param show
* @param numJobs
* @return
*/
Set<String> findDispatchJobs(DispatchHost host, ShowInterface show, int numJobs);
List<String> findDispatchJobs(DispatchHost host, ShowInterface show, int numJobs);

/**
* Return true of the job has pending frames.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,17 @@ public boolean higherPriorityJobExists(JobDetail baseJob, VirtualProc proc) {
}

@Transactional(readOnly = true)
public Set<String> findDispatchJobsForAllShows(DispatchHost host, int numJobs) {
public List<String> findDispatchJobsForAllShows(DispatchHost host, int numJobs) {
return dispatcherDao.findDispatchJobsForAllShows(host, numJobs);
}

@Transactional(readOnly = true)
public Set<String> findDispatchJobs(DispatchHost host, int numJobs) {
public List<String> findDispatchJobs(DispatchHost host, int numJobs) {
return dispatcherDao.findDispatchJobs(host, numJobs);
}

@Transactional(readOnly = true)
public Set<String> findDispatchJobs(DispatchHost host, GroupInterface g) {
public List<String> findDispatchJobs(DispatchHost host, GroupInterface g) {
return dispatcherDao.findDispatchJobs(host, g);
}

Expand All @@ -170,7 +170,7 @@ public Set<String> findLocalDispatchJobs(DispatchHost host) {

@Override
@Transactional(readOnly = true)
public Set<String> findDispatchJobs(DispatchHost host, ShowInterface show,
public List<String> findDispatchJobs(DispatchHost host, ShowInterface show,
int numJobs) {
return dispatcherDao.findDispatchJobs(host, show, numJobs);
}
Expand Down
2 changes: 2 additions & 0 deletions cuebot/src/main/resources/opencue.properties
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ dispatcher.frame_query_max=20
dispatcher.job_frame_dispatch_max=8
# Maximum number of frames to dispatch from a host at one time.
dispatcher.host_frame_dispatch_max=12
# Whether or not to enable FIFO scheduling in the same priority.
dispatcher.fifo_scheduling_enabled=false

# Jobs will be archived to the history tables after being completed for this long.
history.archive_jobs_cutoff_hours=72
Expand Down
Loading