Skip to content

Commit

Permalink
Merge remote-tracking branch 'github_mirror/master' into add_layer_max_cores
Browse files Browse the repository at this point in the history
  • Loading branch information
akim-ruslanov committed Apr 5, 2022
2 parents ba6a2ce + 6993baa commit ad0b14b
Show file tree
Hide file tree
Showing 43 changed files with 970 additions and 397 deletions.
2 changes: 1 addition & 1 deletion VERSION.in
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.15
0.16
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,5 @@ public ServletRegistrationBean<JobLaunchServlet> jobLaunchServlet() {
b.setServlet(new JobLaunchServlet());
return b;
}

}

148 changes: 61 additions & 87 deletions cuebot/src/main/java/com/imageworks/spcue/dispatcher/BookingQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,128 +19,102 @@

package com.imageworks.spcue.dispatcher;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.imageworks.spcue.dispatcher.commands.DispatchBookHost;
import com.imageworks.spcue.util.CueUtil;
import com.imageworks.spcue.dispatcher.commands.KeyRunnable;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;

public class BookingQueue extends ThreadPoolExecutor {

private static final Logger logger = Logger.getLogger(BookingQueue.class);

private static final int THREADS_KEEP_ALIVE_SECONDS = 10;
public class BookingQueue {

private int queueCapacity;
private int baseSleepTimeMillis = 400;
private AtomicBoolean isShutdown = new AtomicBoolean(false);
private final int healthThreshold;
private final int minUnhealthyPeriodMin;
private final int queueCapacity;
private final int corePoolSize;
private final int maxPoolSize;
private static final int BASE_SLEEP_TIME_MILLIS = 300;

private QueueRejectCounter rejectCounter = new QueueRejectCounter();
private static final Logger logger = Logger.getLogger("HEALTH");
private HealthyThreadPool healthyThreadPool;

private Cache<String, DispatchBookHost> bookingCache = CacheBuilder.newBuilder()
.expireAfterWrite(3, TimeUnit.MINUTES)
// Invalidate entries that got executed by the threadpool and lost their reference
.weakValues()
.build();

private BookingQueue(int corePoolSize, int maxPoolSize, int queueCapacity, int sleepTimeMs) {
super(corePoolSize, maxPoolSize, THREADS_KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity));
public BookingQueue(int healthThreshold, int minUnhealthyPeriodMin, int queueCapacity,
int corePoolSize, int maxPoolSize) {
this.healthThreshold = healthThreshold;
this.minUnhealthyPeriodMin = minUnhealthyPeriodMin;
this.queueCapacity = queueCapacity;
this.baseSleepTimeMillis = sleepTimeMs;
this.setRejectedExecutionHandler(rejectCounter);
logger.info("BookingQueue" +
" core:" + getCorePoolSize() +
" max:" + getMaximumPoolSize() +
" capacity:" + queueCapacity +
" sleepTimeMs:" + sleepTimeMs);
this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize;
initThreadPool();
}

@Autowired
public BookingQueue(Environment env, String propertyKeyPrefix, int sleepTimeMs) {
this(CueUtil.getIntProperty(env, propertyKeyPrefix, "core_pool_size"),
CueUtil.getIntProperty(env, propertyKeyPrefix, "max_pool_size"),
CueUtil.getIntProperty(env, propertyKeyPrefix, "queue_capacity"),
sleepTimeMs);
public void initThreadPool() {
healthyThreadPool = new HealthyThreadPool(
"BookingQueue",
healthThreshold,
minUnhealthyPeriodMin,
queueCapacity,
corePoolSize,
maxPoolSize,
BASE_SLEEP_TIME_MILLIS);
}

public void execute(DispatchBookHost r) {
if (isShutdown.get()) {
return;
}
if (bookingCache.getIfPresent(r.getKey()) == null){
bookingCache.put(r.getKey(), r);
super.execute(r);
public boolean isHealthy() {
try {
if (!healthyThreadPool.isHealthyOrShutdown()) {
logger.warn("BookingQueue: Unhealthy queue terminated, starting a new one");
initThreadPool();
}
} catch (InterruptedException e) {
// TODO: evaluate crashing the whole Spring Boot context here
// to force a container restart cycle
logger.error("Failed to restart BookingThreadPool", e);
return false;
}

return true;
}

public void execute(KeyRunnable r) {
healthyThreadPool.execute(r);
}

public long getRejectedTaskCount() {
return rejectCounter.getRejectCount();
return healthyThreadPool.getRejectedTaskCount();
}

public int getQueueCapacity() {
return queueCapacity;
}

public void shutdown() {
if (!isShutdown.getAndSet(true)) {
logger.info("clearing out booking queue: " + this.getQueue().size());
this.getQueue().clear();
}
healthyThreadPool.shutdown();
}

public int getSize() {
return healthyThreadPool.getQueue().size();
}

/**
* Lowers the sleep time as the queue grows.
*
* @return the sleep time in milliseconds; never negative, and 0 once the queue is shut down
*/
public int sleepTime() {
if (!isShutdown.get()) {
int sleep = (int) (baseSleepTimeMillis - (((this.getQueue().size () /
(float) queueCapacity) * baseSleepTimeMillis)) * 2);
if (sleep < 0) {
sleep = 0;
}
return sleep;
} else {
return 0;
}
public int getRemainingCapacity() {
return healthyThreadPool.getQueue().remainingCapacity();
}

protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
if (isShutdown()) {
this.remove(r);
} else {
try {
Thread.sleep(sleepTime());
} catch (InterruptedException e) {
logger.info("booking queue was interrupted.");
Thread.currentThread().interrupt();
}
}
public int getActiveCount() {
return healthyThreadPool.getActiveCount();
}

protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
public long getCompletedTaskCount() {
return healthyThreadPool.getCompletedTaskCount();
}

// Invalidate cache to avoid having to wait for GC to mark processed entries collectible
DispatchBookHost h = (DispatchBookHost)r;
bookingCache.invalidate(h.getKey());
public long getCorePoolSize() {
return corePoolSize;
}

if (sleepTime() < 100) {
logger.info("BookingQueue cleanup executed.");
getQueue().clear();
}
public long getMaximumPoolSize() {
return maxPoolSize;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -23,93 +23,84 @@
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import com.imageworks.spcue.dispatcher.commands.KeyRunnable;

public class DispatchQueue {

private TaskExecutor dispatchPool;
private ThreadPoolTaskExecutor _dispatchPool;
private String name = "Default";
private AtomicBoolean isShutdown = new AtomicBoolean(false);

private final AtomicLong tasksRun = new AtomicLong(0);
private final AtomicLong tasksRejected = new AtomicLong(0);
private int healthThreshold;
private int minUnhealthyPeriodMin;
private int queueCapacity;
private int corePoolSize;
private int maxPoolSize;

private static final Logger logger = Logger.getLogger(DispatchQueue.class);

public DispatchQueue() {}
private static final Logger logger = Logger.getLogger("HEALTH");
private String name = "Default";
private HealthyThreadPool healthyDispatchPool;

public DispatchQueue(String name) {
public DispatchQueue(String name, int healthThreshold, int minUnhealthyPeriodMin, int queueCapacity,
int corePoolSize, int maxPoolSize) {
this.name = name;
this.healthThreshold = healthThreshold;
this.minUnhealthyPeriodMin = minUnhealthyPeriodMin;
this.queueCapacity = queueCapacity;
this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize;
initThreadPool();
}

public void execute(Runnable r) {
public void initThreadPool() {
healthyDispatchPool = new HealthyThreadPool(
name,
healthThreshold,
minUnhealthyPeriodMin,
queueCapacity,
corePoolSize,
maxPoolSize);
}

public boolean isHealthy() {
try {
if (!isShutdown.get()) {
this.dispatchPool.execute(r);
tasksRun.addAndGet(1);
if (!healthyDispatchPool.isHealthyOrShutdown()) {
logger.warn("DispatchQueue_" + name + ": Unhealthy queue terminated, starting a new one");
initThreadPool();
}
} catch (Exception e) {
long rejection = tasksRejected.addAndGet(1);
logger.warn("Warning, dispatch queue - [" + name + "] rejected, " + e);
throw new DispatchQueueTaskRejectionException(
"Warning, dispatch queue [" + name + " rejected task #"
+ rejection);
} catch (InterruptedException e) {
// TODO: evaluate crashing the whole Spring Boot context here
// to force a container restart cycle
logger.error("DispatchQueue_" + name + ":Failed to restart DispatchThreadPool", e);
return false;
}
}

public int getMaxPoolSize() {
return _dispatchPool.getMaxPoolSize();
return true;
}

public int getActiveThreadCount() {
return _dispatchPool.getActiveCount();
public void execute(KeyRunnable r) {
healthyDispatchPool.execute(r);
}

public int getWaitingCount() {
return _dispatchPool.getThreadPoolExecutor().getQueue().size();
public long getRejectedTaskCount() {
return healthyDispatchPool.getRejectedTaskCount();
}

public int getRemainingCapacity() {
return _dispatchPool.getThreadPoolExecutor().getQueue().remainingCapacity();
public void shutdown() {
healthyDispatchPool.shutdown();
}

public long getTotalDispatched() {
return tasksRun.get();
public int getSize() {
return healthyDispatchPool.getQueue().size();
}

public long getTotalRejected() {
return tasksRejected.get();
public int getRemainingCapacity() {
return healthyDispatchPool.getQueue().remainingCapacity();
}

public TaskExecutor getDispatchPool() {
return dispatchPool;
public int getActiveCount() {
return healthyDispatchPool.getActiveCount();
}

public void setDispatchPool(TaskExecutor dispatchPool) {
this.dispatchPool = dispatchPool;
this._dispatchPool = (ThreadPoolTaskExecutor) dispatchPool;
public long getCompletedTaskCount() {
return healthyDispatchPool.getCompletedTaskCount();
}

public void shutdown() {
if (!isShutdown.getAndSet(true)) {
logger.info("Shutting down thread pool " + name + ", currently "
+ getActiveThreadCount() + " active threads.");
final long startTime = System.currentTimeMillis();
while (getWaitingCount() != 0 && getActiveThreadCount() != 0) {
try {
if (System.currentTimeMillis() - startTime > 10000) {
throw new InterruptedException(name
+ " thread pool failed to shutdown properly");
}
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.imageworks.spcue.VirtualProc;
import com.imageworks.spcue.dispatcher.commands.DispatchBookHost;
import com.imageworks.spcue.dispatcher.commands.DispatchNextFrame;
import com.imageworks.spcue.dispatcher.commands.KeyRunnable;
import com.imageworks.spcue.grpc.host.LockState;
import com.imageworks.spcue.grpc.job.FrameExitStatus;
import com.imageworks.spcue.grpc.job.FrameState;
Expand Down Expand Up @@ -158,10 +159,11 @@ public void handleFrameCompleteReport(final FrameCompleteReport report) {
final LayerDetail layer = jobManager.getLayerDetail(report.getFrame().getLayerId());
final DispatchFrame frame = jobManager.getDispatchFrame(report.getFrame().getFrameId());
final FrameState newFrameState = determineFrameState(job, layer, frame, report);

final String key = proc.getJobId() + "_" + report.getFrame().getLayerId() +
"_" + report.getFrame().getFrameId();
if (dispatchSupport.stopFrame(frame, newFrameState, report.getExitStatus(),
report.getFrame().getMaxRss())) {
dispatchQueue.execute(new Runnable() {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
Expand All @@ -182,7 +184,7 @@ public void run() {
* properties.
*/
if (redirectManager.hasRedirect(proc)) {
dispatchQueue.execute(new Runnable() {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
Expand All @@ -195,7 +197,7 @@ public void run() {
});
}
else {
dispatchQueue.execute(new Runnable() {
dispatchQueue.execute(new KeyRunnable(key) {
@Override
public void run() {
try {
Expand Down
Loading

0 comments on commit ad0b14b

Please sign in to comment.