diff --git a/VERSION.in b/VERSION.in index 2856407c0..8eac30c38 100644 --- a/VERSION.in +++ b/VERSION.in @@ -1 +1 @@ -0.15 +0.16 diff --git a/cuebot/src/main/java/com/imageworks/spcue/config/AppConfig.java b/cuebot/src/main/java/com/imageworks/spcue/config/AppConfig.java index 4edac2826..adf6e0368 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/config/AppConfig.java +++ b/cuebot/src/main/java/com/imageworks/spcue/config/AppConfig.java @@ -65,6 +65,5 @@ public ServletRegistrationBean jobLaunchServlet() { b.setServlet(new JobLaunchServlet()); return b; } - } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/BookingQueue.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/BookingQueue.java index 7428f0bca..347acd025 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/BookingQueue.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/BookingQueue.java @@ -19,71 +19,69 @@ package com.imageworks.spcue.dispatcher; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.imageworks.spcue.dispatcher.commands.DispatchBookHost; -import com.imageworks.spcue.util.CueUtil; +import com.imageworks.spcue.dispatcher.commands.KeyRunnable; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; -public class BookingQueue extends ThreadPoolExecutor { - - private static final Logger logger = Logger.getLogger(BookingQueue.class); - - private static final int THREADS_KEEP_ALIVE_SECONDS = 10; +public class BookingQueue { - private int queueCapacity; - private int baseSleepTimeMillis = 400; - private AtomicBoolean isShutdown = new AtomicBoolean(false); + private final int healthThreshold; + private final int minUnhealthyPeriodMin; + private final int queueCapacity; + private final int corePoolSize; + private final int maxPoolSize; + private static final int BASE_SLEEP_TIME_MILLIS = 300; - private QueueRejectCounter rejectCounter = new QueueRejectCounter(); + private static final Logger logger = Logger.getLogger("HEALTH"); + private HealthyThreadPool healthyThreadPool; - private Cache bookingCache = CacheBuilder.newBuilder() - .expireAfterWrite(3, TimeUnit.MINUTES) - // Invalidate entries that got executed by the threadpool and lost their reference - .weakValues() - .build(); - - private BookingQueue(int corePoolSize, int maxPoolSize, int queueCapacity, int sleepTimeMs) { - super(corePoolSize, maxPoolSize, THREADS_KEEP_ALIVE_SECONDS, - TimeUnit.SECONDS, new LinkedBlockingQueue(queueCapacity)); + public BookingQueue(int healthThreshold, int minUnhealthyPeriodMin, int queueCapacity, + int corePoolSize, int maxPoolSize) { + this.healthThreshold = healthThreshold; + this.minUnhealthyPeriodMin = minUnhealthyPeriodMin; this.queueCapacity = queueCapacity; - this.baseSleepTimeMillis = sleepTimeMs; - this.setRejectedExecutionHandler(rejectCounter); - logger.info("BookingQueue" + - " core:" + getCorePoolSize() + - " max:" + getMaximumPoolSize() + - " capacity:" + queueCapacity + - " sleepTimeMs:" + sleepTimeMs); + this.corePoolSize = corePoolSize; + this.maxPoolSize = maxPoolSize; + initThreadPool(); } - @Autowired - public BookingQueue(Environment env, String propertyKeyPrefix, int sleepTimeMs) { - this(CueUtil.getIntProperty(env, propertyKeyPrefix, "core_pool_size"), - CueUtil.getIntProperty(env, propertyKeyPrefix, "max_pool_size"), - CueUtil.getIntProperty(env, propertyKeyPrefix, "queue_capacity"), - sleepTimeMs); + public void initThreadPool() { + healthyThreadPool = new HealthyThreadPool( + "BookingQueue", + healthThreshold, + minUnhealthyPeriodMin, + queueCapacity, + corePoolSize, + maxPoolSize, + BASE_SLEEP_TIME_MILLIS); } - public void execute(DispatchBookHost r) { - if (isShutdown.get()) { - return; - } - if (bookingCache.getIfPresent(r.getKey()) == null){ - bookingCache.put(r.getKey(), r); - super.execute(r); + public boolean isHealthy() { + try { + if (!healthyThreadPool.isHealthyOrShutdown()) { + logger.warn("BookingQueue: Unhealthy queue terminated, starting a new one"); + initThreadPool(); + } + } catch (InterruptedException e) { + // TODO: evaluate crashing the whole springbook context here + // to force a container restart cycle + logger.error("Failed to restart BookingThreadPool", e); + return false; } + + return true; + } + + public void execute(KeyRunnable r) { + healthyThreadPool.execute(r); } public long getRejectedTaskCount() { - return rejectCounter.getRejectCount(); + return healthyThreadPool.getRejectedTaskCount(); } public int getQueueCapacity() { @@ -91,56 +89,32 @@ public int getQueueCapacity() { } public void shutdown() { - if (!isShutdown.getAndSet(true)) { - logger.info("clearing out booking queue: " + this.getQueue().size()); - this.getQueue().clear(); - } + healthyThreadPool.shutdown(); + } + public int getSize() { + return healthyThreadPool.getQueue().size(); } - /** - * Lowers the sleep time as the queue grows. - * - * @return - */ - public int sleepTime() { - if (!isShutdown.get()) { - int sleep = (int) (baseSleepTimeMillis - (((this.getQueue().size () / - (float) queueCapacity) * baseSleepTimeMillis)) * 2); - if (sleep < 0) { - sleep = 0; - } - return sleep; - } else { - return 0; - } + public int getRemainingCapacity() { + return healthyThreadPool.getQueue().remainingCapacity(); } - protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t, r); - if (isShutdown()) { - this.remove(r); - } else { - try { - Thread.sleep(sleepTime()); - } catch (InterruptedException e) { - logger.info("booking queue was interrupted."); - Thread.currentThread().interrupt(); - } - } + public int getActiveCount() { + return healthyThreadPool.getActiveCount(); } - protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); + public long getCompletedTaskCount() { + return healthyThreadPool.getCompletedTaskCount(); + } - // Invalidate cache to avoid having to wait for GC to mark processed entries collectible - DispatchBookHost h = (DispatchBookHost)r; - bookingCache.invalidate(h.getKey()); + public long getCorePoolSize() { + return corePoolSize; + } - if (sleepTime() < 100) { - logger.info("BookingQueue cleanup executed."); - getQueue().clear(); - } + public long getMaximumPoolSize() { + return maxPoolSize; } + } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchQueue.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchQueue.java index 81366dedc..3798bddaa 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchQueue.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchQueue.java @@ -23,93 +23,84 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; -import org.springframework.core.task.TaskExecutor; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import com.imageworks.spcue.dispatcher.commands.KeyRunnable; public class DispatchQueue { - private TaskExecutor dispatchPool; - private ThreadPoolTaskExecutor _dispatchPool; - private String name = "Default"; - private AtomicBoolean isShutdown = new AtomicBoolean(false); - - private final AtomicLong tasksRun = new AtomicLong(0); - private final AtomicLong tasksRejected = new AtomicLong(0); + private int healthThreshold; + private int minUnhealthyPeriodMin; + private int queueCapacity; + private int corePoolSize; + private int maxPoolSize; - private static final Logger logger = Logger.getLogger(DispatchQueue.class); - - public DispatchQueue() {} + private static final Logger logger = Logger.getLogger("HEALTH"); + private String name = "Default"; + private HealthyThreadPool healthyDispatchPool; - public DispatchQueue(String name) { + public DispatchQueue(String name, int healthThreshold, int minUnhealthyPeriodMin, int queueCapacity, + int corePoolSize, int maxPoolSize) { this.name = name; + this.healthThreshold = healthThreshold; + this.minUnhealthyPeriodMin = minUnhealthyPeriodMin; + this.queueCapacity = queueCapacity; + this.corePoolSize = corePoolSize; + this.maxPoolSize = maxPoolSize; + initThreadPool(); } - public void execute(Runnable r) { + public void initThreadPool() { + healthyDispatchPool = new HealthyThreadPool( + name, + healthThreshold, + minUnhealthyPeriodMin, + queueCapacity, + corePoolSize, + maxPoolSize); + } + + public boolean isHealthy() { try { - if (!isShutdown.get()) { - this.dispatchPool.execute(r); - tasksRun.addAndGet(1); + if (!healthyDispatchPool.isHealthyOrShutdown()) { + logger.warn("DispatchQueue_" + name + ": Unhealthy queue terminated, starting a new one"); + initThreadPool(); } - } catch (Exception e) { - long rejection = tasksRejected.addAndGet(1); - logger.warn("Warning, dispatch queue - [" + name + "] rejected, " + e); - throw new DispatchQueueTaskRejectionException( - "Warning, dispatch queue [" + name + " rejected task #" - + rejection); + } catch (InterruptedException e) { + // TODO: evaluate crashing the whole springbook context here + // to force a container restart cycle + logger.error("DispatchQueue_" + name + ":Failed to restart DispatchThreadPool", e); + return false; } - } - public int getMaxPoolSize() { - return _dispatchPool.getMaxPoolSize(); + return true; } - public int getActiveThreadCount() { - return _dispatchPool.getActiveCount(); + public void execute(KeyRunnable r) { + healthyDispatchPool.execute(r); } - public int getWaitingCount() { - return _dispatchPool.getThreadPoolExecutor().getQueue().size(); + public long getRejectedTaskCount() { + return healthyDispatchPool.getRejectedTaskCount(); } - public int getRemainingCapacity() { - return _dispatchPool.getThreadPoolExecutor().getQueue().remainingCapacity(); + public void shutdown() { + healthyDispatchPool.shutdown(); } - public long getTotalDispatched() { - return tasksRun.get(); + public int getSize() { + return healthyDispatchPool.getQueue().size(); } - public long getTotalRejected() { - return tasksRejected.get(); + public int getRemainingCapacity() { + return healthyDispatchPool.getQueue().remainingCapacity(); } - public TaskExecutor getDispatchPool() { - return dispatchPool; + public int getActiveCount() { + return healthyDispatchPool.getActiveCount(); } - public void setDispatchPool(TaskExecutor dispatchPool) { - this.dispatchPool = dispatchPool; - this._dispatchPool = (ThreadPoolTaskExecutor) dispatchPool; + public long getCompletedTaskCount() { + return healthyDispatchPool.getCompletedTaskCount(); } - public void shutdown() { - if (!isShutdown.getAndSet(true)) { - logger.info("Shutting down thread pool " + name + ", currently " - + getActiveThreadCount() + " active threads."); - final long startTime = System.currentTimeMillis(); - while (getWaitingCount() != 0 && getActiveThreadCount() != 0) { - try { - if (System.currentTimeMillis() - startTime > 10000) { - throw new InterruptedException(name - + " thread pool failed to shutdown properly"); - } - Thread.sleep(250); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } - } - } } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java index f66b91097..b54757b69 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java @@ -39,6 +39,7 @@ import com.imageworks.spcue.VirtualProc; import com.imageworks.spcue.dispatcher.commands.DispatchBookHost; import com.imageworks.spcue.dispatcher.commands.DispatchNextFrame; +import com.imageworks.spcue.dispatcher.commands.KeyRunnable; import com.imageworks.spcue.grpc.host.LockState; import com.imageworks.spcue.grpc.job.FrameExitStatus; import com.imageworks.spcue.grpc.job.FrameState; @@ -158,10 +159,11 @@ public void handleFrameCompleteReport(final FrameCompleteReport report) { final LayerDetail layer = jobManager.getLayerDetail(report.getFrame().getLayerId()); final DispatchFrame frame = jobManager.getDispatchFrame(report.getFrame().getFrameId()); final FrameState newFrameState = determineFrameState(job, layer, frame, report); - + final String key = proc.getJobId() + "_" + report.getFrame().getLayerId() + + "_" + report.getFrame().getFrameId(); if (dispatchSupport.stopFrame(frame, newFrameState, report.getExitStatus(), report.getFrame().getMaxRss())) { - dispatchQueue.execute(new Runnable() { + dispatchQueue.execute(new KeyRunnable(key) { @Override public void run() { try { @@ -182,7 +184,7 @@ public void run() { * properties. */ if (redirectManager.hasRedirect(proc)) { - dispatchQueue.execute(new Runnable() { + dispatchQueue.execute(new KeyRunnable(key) { @Override public void run() { try { @@ -195,7 +197,7 @@ public void run() { }); } else { - dispatchQueue.execute(new Runnable() { + dispatchQueue.execute(new KeyRunnable(key) { @Override public void run() { try { diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HealthyThreadPool.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HealthyThreadPool.java new file mode 100644 index 000000000..13c96e776 --- /dev/null +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HealthyThreadPool.java @@ -0,0 +1,235 @@ +package com.imageworks.spcue.dispatcher; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.Date; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.LinkedBlockingQueue; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.imageworks.spcue.dispatcher.commands.DispatchBookHost; +import com.imageworks.spcue.dispatcher.commands.KeyRunnable; +import org.apache.log4j.Logger; + + +/*** + * A ThreadPoolExecutor with two additional features: + * - Handles repeated tasks by always keeping the latest version + * - With isHealthyOrShutdown, the threadpool will drain and clear resources when unhealthy + * + */ +public class HealthyThreadPool extends ThreadPoolExecutor { + // The service need s to be unhealthy for this period of time to report + private static final Logger logger = Logger.getLogger("HEALTH"); + // Threshold to consider healthy or unhealthy + private final int healthThreshold; + private final int poolSize; + private final int minUnhealthyPeriodMin; + private final QueueRejectCounter rejectCounter = new QueueRejectCounter(); + private final Cache taskCache; + private final String name; + private Date lastCheck = new Date(); + private boolean wasHealthy = true; + protected final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final int baseSleepTimeMillis; + + /** + * Start a thread pool + * @param name For logging purposes + * @param healthThreshold Percentage that should be available to consider healthy + * @param minUnhealthyPeriodMin Period in min to consider a queue unhealthy + * @param poolSize how many jobs can be queued + * @param threadsMinimum Minimum number of threads + * @param threadsMaximum Maximum number of threads to grow to + */ + public HealthyThreadPool(String name, + int healthThreshold, + int minUnhealthyPeriodMin, + int poolSize, + int threadsMinimum, + int threadsMaximum) { + this(name, healthThreshold, minUnhealthyPeriodMin, poolSize, + threadsMinimum, threadsMaximum, 0); + } + + /** + * Start a thread pool + * + * @param name For logging purposes + * @param healthThreshold Percentage that should be available to consider healthy + * @param minUnhealthyPeriodMin Period in min to consider a queue unhealthy + * @param poolSize how many jobs can be queued + * @param threadsMinimum Minimum number of threads + * @param threadsMaximum Maximum number of threads to grow to + * @param baseSleepTimeMillis Time a thread should sleep when the service is not under pressure + */ + public HealthyThreadPool(String name, + int healthThreshold, + int minUnhealthyPeriodMin, + int poolSize, + int threadsMinimum, + int threadsMaximum, + int baseSleepTimeMillis) { + super(threadsMinimum, threadsMaximum, 10, + TimeUnit.SECONDS, new LinkedBlockingQueue(poolSize)); + + logger.debug(name + ": Starting a new HealthyThreadPool"); + this.name = name; + this.healthThreshold = healthThreshold; + this.poolSize = poolSize; + this.minUnhealthyPeriodMin = minUnhealthyPeriodMin; + this.baseSleepTimeMillis = baseSleepTimeMillis; + this.setRejectedExecutionHandler(rejectCounter); + + this.taskCache = CacheBuilder.newBuilder() + .expireAfterWrite(3, TimeUnit.MINUTES) + // Invalidate entries that got executed by the threadPool and lost their reference + .weakValues() + .concurrencyLevel(threadsMaximum) + .build(); + } + + public void execute(KeyRunnable r) { + if (isShutdown.get()) { + logger.info(name + ": Task ignored, queue on hold or shutdown"); + return; + } + if (taskCache.getIfPresent(r.getKey()) == null){ + taskCache.put(r.getKey(), r); + super.execute(r); + } + } + + public long getRejectedTaskCount() { + return rejectCounter.getRejectCount(); + } + + /** + * Monitor if the queue is unhealthy for MIN_UNHEALTHY_PERIOD_MIN + * + * If unhealthy, the service will start the shutdown process and the + * caller is responsible for starting a new instance after the lock on + * awaitTermination is released. + */ + protected boolean isHealthyOrShutdown() throws InterruptedException { + Date now = new Date(); + if (diffInMinutes(lastCheck, now) > minUnhealthyPeriodMin){ + this.wasHealthy = healthCheck(); + this.lastCheck = now; + } + + if(healthCheck() || wasHealthy) { + logger.debug(name + ": healthy (" + + "Remaining Capacity: " + this.getQueue().remainingCapacity() + + ", Running: " + this.getActiveCount() + + ", Total Executed: " + this.getCompletedTaskCount() + + ")"); + return true; + } + else if (isShutdown.get()) { + logger.warn("Queue shutting down"); + return false; + } + else { + logger.warn(name + ": unhealthy, starting shutdown)"); + threadDump(); + + isShutdown.set(true); + super.shutdownNow(); + logger.warn(name + ": Awaiting unhealthy queue termination"); + if (super.awaitTermination(1, TimeUnit.MINUTES)){ + logger.info(name + ": Terminated successfully"); + } + else { + logger.warn(name + ": Failed to terminate"); + } + // Threads will eventually terminate, proceed + taskCache.invalidateAll(); + return false; + } + } + + private void threadDump() { + ThreadMXBean mx = ManagementFactory.getThreadMXBean(); + for(ThreadInfo info : mx.dumpAllThreads(true, true)){ + logger.debug(info.toString()); + } + } + + private static long diffInMinutes(Date dateStart, Date dateEnd) { + return TimeUnit.MINUTES.convert( + dateEnd.getTime() - dateStart.getTime(), + TimeUnit.MILLISECONDS + ); + } + + /** + * Lowers the sleep time as the queue grows. + * + * @return + */ + public int sleepTime() { + if (!isShutdown.get()) { + int sleep = (int) (baseSleepTimeMillis - (((this.getQueue().size () / + (float) this.poolSize) * baseSleepTimeMillis)) * 2); + if (sleep < 0) { + sleep = 0; + } + return sleep; + } else { + return 0; + } + } + + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + if (isShutdown()) { + this.remove(r); + } else { + if (baseSleepTimeMillis > 0) { + try { + Thread.sleep(sleepTime()); + } catch (InterruptedException e) { + logger.info(name + ": booking queue was interrupted."); + } + } + } + } + + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + + // Invalidate cache to avoid having to wait for GC to mark processed entries collectible + KeyRunnable h = (KeyRunnable)r; + taskCache.invalidate(h.getKey()); + } + + protected boolean healthCheck() { + return (this.getQueue().remainingCapacity() > 0) || + (getRejectedTaskCount() < this.poolSize / healthThreshold); + } + + public void shutdown() { + if (!isShutdown.getAndSet(true)) { + logger.info("Shutting down thread pool " + name + ", currently " + + getActiveCount() + " active threads."); + final long startTime = System.currentTimeMillis(); + while (this.getQueue().size() != 0 && this.getActiveCount() != 0) { + try { + if (System.currentTimeMillis() - startTime > 10000) { + throw new InterruptedException(name + + " thread pool failed to shutdown properly"); + } + Thread.sleep(250); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + } +} diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportQueue.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportQueue.java index 922172202..59b91abcc 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportQueue.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportQueue.java @@ -19,14 +19,16 @@ package com.imageworks.spcue.dispatcher; +import java.lang.ref.WeakReference; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import com.imageworks.spcue.grpc.report.HostReport; import org.apache.log4j.Logger; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.core.env.Environment; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.imageworks.spcue.dispatcher.commands.DispatchHandleHostReport; import com.imageworks.spcue.util.CueUtil; @@ -34,38 +36,76 @@ public class HostReportQueue extends ThreadPoolExecutor { private static final Logger logger = Logger.getLogger(HostReportQueue.class); - private QueueRejectCounter rejectCounter = new QueueRejectCounter(); private AtomicBoolean isShutdown = new AtomicBoolean(false); private int queueCapacity; - private HostReportQueue(String name, int corePoolSize, int maxPoolSize, int queueCapacity) { - super(corePoolSize, maxPoolSize, 10 , TimeUnit.SECONDS, - new LinkedBlockingQueue(queueCapacity)); - this.queueCapacity = queueCapacity; - this.setRejectedExecutionHandler(rejectCounter); - logger.info(name + - " core:" + getCorePoolSize() + - " max:" + getMaximumPoolSize() + - " capacity:" + queueCapacity); + private Cache hostMap = CacheBuilder.newBuilder() + .expireAfterWrite(1, TimeUnit.HOURS) + .build(); + + /** + * Wrapper around protobuf object HostReport to add reportTi + */ + private class HostReportWrapper{ + private final HostReport hostReport; + private final WeakReference reportTaskRef; + public long taskTime = System.currentTimeMillis(); + + public HostReportWrapper(HostReport hostReport, DispatchHandleHostReport reportTask) { + this.hostReport = hostReport; + this.reportTaskRef = new WeakReference<>(reportTask); + } + + public HostReport getHostReport() { + return hostReport; + } + + public DispatchHandleHostReport getReportTask() { + return reportTaskRef.get(); + } + + public long getTaskTime() { + return taskTime; + } } - @Autowired - public HostReportQueue(Environment env, String name, String propertyKeyPrefix) { - this(name, - CueUtil.getIntProperty(env, propertyKeyPrefix, "core_pool_size"), - CueUtil.getIntProperty(env, propertyKeyPrefix, "max_pool_size"), - CueUtil.getIntProperty(env, propertyKeyPrefix, "queue_capacity")); + public HostReportQueue(int threadPoolSizeInitial, int threadPoolSizeMax, int queueSize) { + super(threadPoolSizeInitial, threadPoolSizeMax, 10 , TimeUnit.SECONDS, + new LinkedBlockingQueue(queueSize)); + this.setRejectedExecutionHandler(rejectCounter); } - public void execute(DispatchHandleHostReport r) { + public void execute(DispatchHandleHostReport newReport) { if (isShutdown.get()) { return; } - if (getQueue().contains(r)) { - getQueue().remove(r); + HostReportWrapper oldWrappedReport = hostMap.getIfPresent(newReport.getKey()); + // If hostReport exists on the cache and there's also a task waiting to be executed + // replace the old report by the new on, but refrain from creating another task + if (oldWrappedReport != null) { + DispatchHandleHostReport oldReport = oldWrappedReport.getReportTask(); + if(oldReport != null) { + // Replace report, but keep the reference of the existing task + hostMap.put(newReport.getKey(), + new HostReportWrapper(newReport.getHostReport(), oldReport)); + return; + } + } + hostMap.put(newReport.getKey(), + new HostReportWrapper(newReport.getHostReport(), newReport)); + super.execute(newReport); + } + + public HostReport removePendingHostReport(String key) { + if (key != null) { + HostReportWrapper r = hostMap.getIfPresent(key); + if (r != null) { + hostMap.asMap().remove(key, r); + return r.getHostReport(); + } } - super.execute(r); + return null; } public long getRejectedTaskCount() { @@ -95,5 +135,9 @@ public void shutdown() { } } } + + public boolean isHealthy() { + return getQueue().remainingCapacity() > 0; + } } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/QueueRejectCounter.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/QueueRejectCounter.java index a9e0809f9..bb3d00716 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/QueueRejectCounter.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/QueueRejectCounter.java @@ -36,5 +36,9 @@ public long getRejectCount() { return rejectCounter.get(); } + public void clear() { + rejectCounter.set(0); + } + } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchBookHost.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchBookHost.java index 594444573..0b65257dd 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchBookHost.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchBookHost.java @@ -19,62 +19,64 @@ package com.imageworks.spcue.dispatcher.commands; +import java.util.List; +import java.util.ArrayList; +import org.apache.log4j.Logger; + import com.imageworks.spcue.DispatchHost; import com.imageworks.spcue.GroupInterface; import com.imageworks.spcue.JobInterface; import com.imageworks.spcue.ShowInterface; import com.imageworks.spcue.dispatcher.Dispatcher; +import com.imageworks.spcue.VirtualProc; /** * A command for booking a host. * * @category command */ -public class DispatchBookHost implements Runnable { +public class DispatchBookHost extends KeyRunnable { + private static final Logger logger = + Logger.getLogger(DispatchBookHost.class); private ShowInterface show = null; private GroupInterface group = null; private JobInterface job = null; private DispatchHost host; private Dispatcher dispatcher; - private String key; public DispatchHost getDispatchHost() { - this.key = host.getId(); + this.setKey(host.getId()); return host; } public DispatchBookHost(DispatchHost host, Dispatcher d) { + super(host.getId()); this.host = host; - this.key = host.getId(); this.dispatcher = d; } public DispatchBookHost(DispatchHost host, JobInterface job, Dispatcher d) { + super(host.getId() + "_job_" + job.getJobId()); this.host = host; this.job = job; - this.key = host.getId() + "_job_" + job.getJobId(); this.dispatcher = d; } public DispatchBookHost(DispatchHost host, GroupInterface group, Dispatcher d) { + super(host.getId() + "_group_" + group.getGroupId()); this.host = host; this.group = group; - this.key = host.getId() + "_group_" + group.getGroupId(); this.dispatcher = d; } public DispatchBookHost(DispatchHost host, ShowInterface show, Dispatcher d) { + super(host.getId() + "_name_" + show.getName()); this.host = host; this.show = show; - this.key = host.getId() + "_name_" + show.getName(); this.dispatcher = d; } - public String getKey() { - return this.key; - } - public void run() { new DispatchCommandTemplate() { public void wrapDispatchCommand() { diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchBookHostLocal.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchBookHostLocal.java index 3a742504c..737541a08 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchBookHostLocal.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchBookHostLocal.java @@ -22,12 +22,14 @@ import com.imageworks.spcue.DispatchHost; import com.imageworks.spcue.dispatcher.Dispatcher; -public class DispatchBookHostLocal implements Runnable { +public class DispatchBookHostLocal extends KeyRunnable { private DispatchHost host; private Dispatcher dispatcher; public DispatchBookHostLocal(DispatchHost host, Dispatcher d) { + super(host.getId()); + this.host = host; this.dispatcher = d; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchDropDepends.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchDropDepends.java index ca6e4b8c2..cf6428f3a 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchDropDepends.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchDropDepends.java @@ -19,6 +19,7 @@ package com.imageworks.spcue.dispatcher.commands; + import com.imageworks.spcue.FrameInterface; import com.imageworks.spcue.JobInterface; import com.imageworks.spcue.LayerInterface; @@ -31,7 +32,7 @@ * * @category command */ -public class DispatchDropDepends implements Runnable { +public class DispatchDropDepends extends KeyRunnable { JobInterface job; LayerInterface layer; @@ -41,18 +42,21 @@ public class DispatchDropDepends implements Runnable { DependManager dependManager; public DispatchDropDepends(JobInterface job, DependTarget target, DependManager dependManager) { + super("disp_drop_dep_job_" + job.getJobId() + "_" + target.toString()); this.job = job; this.target = target; this.dependManager = dependManager; } public DispatchDropDepends(LayerInterface layer, DependTarget target, DependManager dependManager) { + super("disp_drop_dep_layer_" + layer.getLayerId() + "_" + target.toString()); this.layer = layer; this.target = target; this.dependManager = dependManager; } public DispatchDropDepends(FrameInterface frame, DependTarget target, DependManager dependManager) { + super("disp_drop_dep_frame_" + frame.getFrameId() + "_" + target.toString()); this.frame = frame; this.target = target; this.dependManager = dependManager; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchEatFrames.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchEatFrames.java index 34292db49..32a2acf69 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchEatFrames.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchEatFrames.java @@ -23,18 +23,20 @@ import com.imageworks.spcue.dao.criteria.FrameSearchInterface; import com.imageworks.spcue.service.JobManagerSupport; + /** * A command for eating an array of frames * * @category command */ -public class DispatchEatFrames implements Runnable { +public class DispatchEatFrames extends KeyRunnable { private FrameSearchInterface search; private Source source; private JobManagerSupport jobManagerSupport; public DispatchEatFrames(FrameSearchInterface search, Source source, JobManagerSupport jobManagerSupport) { + super("disp_eat_frames_job_" + search.hashCode() + "_" + jobManagerSupport.hashCode()); this.search = search; this.source = source; this.jobManagerSupport = jobManagerSupport; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchHandleHostReport.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchHandleHostReport.java index c8f41131d..1a18f06ad 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchHandleHostReport.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchHandleHostReport.java @@ -28,7 +28,7 @@ * * @category command */ -public class DispatchHandleHostReport implements Runnable { +public class DispatchHandleHostReport extends KeyRunnable { private HostReport hostReport; private boolean isBootReport; @@ -36,12 +36,16 @@ public class DispatchHandleHostReport implements Runnable { public volatile int reportTime = (int) (System.currentTimeMillis() / 1000); public DispatchHandleHostReport(HostReport report, HostReportHandler rqdReportManager) { + super("disp_handle_host_report_" + report.hashCode() + + "_" + rqdReportManager.hashCode()); this.hostReport = report; this.isBootReport = false; this.hostReportHandler = rqdReportManager; } public DispatchHandleHostReport(BootReport report, HostReportHandler rqdReportManager) { + super("disp_handle_host_report_" + report.hashCode() + + "_" + rqdReportManager.hashCode()); HostReport hostReport = HostReport.newBuilder() .setHost(report.getHost()) .setCoreInfo(report.getCoreInfo()) diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchJobComplete.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchJobComplete.java index 5cb0a0da7..1910321a8 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchJobComplete.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchJobComplete.java @@ -28,13 +28,15 @@ * * @category command */ -public class DispatchJobComplete implements Runnable { +public class DispatchJobComplete extends KeyRunnable { private JobInterface job; private Source source; private boolean isManualKill; private JobManagerSupport jobManagerSupport; - public DispatchJobComplete(JobInterface job, Source source, boolean isManualKill, JobManagerSupport jobManagerSupport) { + public DispatchJobComplete(JobInterface job, Source source, boolean isManualKill, + JobManagerSupport jobManagerSupport) { + super("disp_job_complete_" + job.getJobId() + "_" + source.toString()); this.job = job; this.source = source; this.isManualKill = isManualKill; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchKillFrames.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchKillFrames.java index 73ce9d97c..986d6bd05 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchKillFrames.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchKillFrames.java @@ -28,13 +28,14 @@ * * @category command */ -public class DispatchKillFrames implements Runnable { +public class DispatchKillFrames extends KeyRunnable { private FrameSearchInterface search; private JobManagerSupport jobManagerSupport; private Source source; public DispatchKillFrames(FrameSearchInterface search, Source source, JobManagerSupport jobManagerSupport) { + super("disp_kill_frames_" + source.toString() + "_" + jobManagerSupport.hashCode()); this.search = search; this.source = source; this.jobManagerSupport = jobManagerSupport; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchKillProcs.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchKillProcs.java index 7c6464eff..d97966139 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchKillProcs.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchKillProcs.java @@ -19,18 +19,19 @@ package com.imageworks.spcue.dispatcher.commands; -import java.util.Collection; - import com.imageworks.spcue.Source; import com.imageworks.spcue.VirtualProc; import com.imageworks.spcue.service.JobManagerSupport; +import java.util.Collection; -public class DispatchKillProcs implements Runnable { +public class DispatchKillProcs extends KeyRunnable { private Collection procs; private JobManagerSupport jobManagerSupport; private Source source; public DispatchKillProcs(Collection procs, Source source, JobManagerSupport jobManagerSupport) { + super("disp_kill_procs_" + procs.hashCode() + "_" + source.toString() + + "_" + jobManagerSupport.hashCode()); this.procs = procs; this.source = source; this.jobManagerSupport = jobManagerSupport; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchLaunchJob.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchLaunchJob.java index cfbcd3eb4..c3682866e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchLaunchJob.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchLaunchJob.java @@ -25,12 +25,13 @@ /** * @category DispatchCommand */ -public class DispatchLaunchJob implements Runnable { +public class DispatchLaunchJob extends KeyRunnable { private JobLauncher jobLauncher; private JobSpec spec; public DispatchLaunchJob(JobSpec spec, JobLauncher jobLauncher) { + super("disp_launch_job_" + spec.getShow() + "_" + spec.getShot() + "_" + spec.getUid()); this.spec = spec; this.jobLauncher = jobLauncher; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchMoveJobs.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchMoveJobs.java index b5c96740b..92ec0db29 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchMoveJobs.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchMoveJobs.java @@ -26,13 +26,15 @@ import com.imageworks.spcue.JobInterface; import com.imageworks.spcue.service.GroupManager; -public class DispatchMoveJobs implements Runnable { +public class DispatchMoveJobs extends KeyRunnable { private GroupDetail group; private List jobs; private GroupManager groupManager; public DispatchMoveJobs(GroupDetail group, List jobs, GroupManager groupManager) { + super("disp_move_jobs_" + group.getGroupId() + "_dept_" + group.getDepartmentId() + + "_show_" + group.getShowId()); this.group = group; this.jobs = jobs; this.groupManager = groupManager; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchNextFrame.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchNextFrame.java index c563a34c0..7e12eb90c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchNextFrame.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchNextFrame.java @@ -28,13 +28,14 @@ * * @category command */ -public class DispatchNextFrame implements Runnable { +public class DispatchNextFrame extends KeyRunnable { private VirtualProc proc; private DispatchJob job; private Dispatcher dispatcher; public DispatchNextFrame(DispatchJob j, VirtualProc p, Dispatcher d) { + super("disp_next_frame_" + j.getJobId() + "_" + p.getProcId()); this.job = j; this.proc = p; this.dispatcher = d; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchReorderFrames.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchReorderFrames.java index b17710f51..528474929 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchReorderFrames.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchReorderFrames.java @@ -25,7 +25,7 @@ import com.imageworks.spcue.service.JobManagerSupport; import com.imageworks.spcue.util.FrameSet; -public class DispatchReorderFrames implements Runnable { +public class DispatchReorderFrames extends KeyRunnable { private JobInterface job = null; private LayerInterface layer = null; @@ -33,14 +33,20 @@ public class DispatchReorderFrames implements Runnable { private Order order; private JobManagerSupport jobManagerSupport; - public DispatchReorderFrames(JobInterface job, FrameSet frameSet, Order order, JobManagerSupport jobManagerSupport) { + public DispatchReorderFrames(JobInterface job, FrameSet frameSet, Order order, + JobManagerSupport jobManagerSupport) { + super("disp_reorder_frames_job_" + job.getJobId() + + "_" + jobManagerSupport.toString()); this.job = job; this.frameSet = frameSet; this.order = order; this.jobManagerSupport = jobManagerSupport; } - public DispatchReorderFrames(LayerInterface layer, FrameSet frameSet, Order order, JobManagerSupport jobManagerSupport) { + public DispatchReorderFrames(LayerInterface layer, FrameSet frameSet, Order order, + JobManagerSupport jobManagerSupport) { + super("disp_reorder_frames_layer_" + layer.getLayerId() + + "_" + jobManagerSupport.toString()); this.layer = layer; this.frameSet = frameSet; this.order = order; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRetryFrames.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRetryFrames.java index 5d9037032..8546423dd 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRetryFrames.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRetryFrames.java @@ -28,7 +28,7 @@ * * @category command */ -public class DispatchRetryFrames implements Runnable { +public class DispatchRetryFrames extends KeyRunnable { private FrameSearchInterface search; private Source source; @@ -36,6 +36,7 @@ public class DispatchRetryFrames implements Runnable { public DispatchRetryFrames(FrameSearchInterface search, Source source, JobManagerSupport jobManagerSupport) { + super("disp_retry_frames_" + search.hashCode() + "_" + source.toString()); this.search = search; this.source = source; this.jobManagerSupport = jobManagerSupport; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java index 4b7af8328..3a43c6fc3 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java @@ -21,11 +21,12 @@ import org.apache.log4j.Logger; +import com.imageworks.spcue.dispatcher.commands.KeyRunnable; import com.imageworks.spcue.VirtualProc; import com.imageworks.spcue.rqd.RqdClient; import com.imageworks.spcue.rqd.RqdClientException; -public class DispatchRqdKillFrame implements Runnable { +public class DispatchRqdKillFrame extends KeyRunnable { private static final Logger logger = Logger.getLogger(DispatchRqdKillFrame.class); @@ -38,6 +39,7 @@ public class DispatchRqdKillFrame implements Runnable { private final RqdClient rqdClient; public DispatchRqdKillFrame(String hostname, String frameId, String message, RqdClient rqdClient) { + super("disp_rqd_kill_frame_" + hostname + "_" + frameId + "_" + rqdClient.toString()); this.hostname = hostname; this.frameId = frameId; this.message = message; @@ -45,6 +47,7 @@ public DispatchRqdKillFrame(String hostname, String frameId, String message, Rqd } public DispatchRqdKillFrame(VirtualProc proc, String message, RqdClient rqdClient) { + super("disp_rqd_kill_frame_" + proc.getProcId() + "_" + rqdClient.toString()); this.proc = proc; this.hostname = proc.hostName; this.message = message; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchSatisfyDepends.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchSatisfyDepends.java index 7016e526f..5294a203c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchSatisfyDepends.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchSatisfyDepends.java @@ -30,7 +30,7 @@ * * @category command */ -public class DispatchSatisfyDepends implements Runnable { +public class DispatchSatisfyDepends extends KeyRunnable { private JobInterface job = null; private LayerInterface layer = null; @@ -39,21 +39,25 @@ public class DispatchSatisfyDepends implements Runnable { private JobManagerSupport jobManagerSupport; public DispatchSatisfyDepends(JobInterface job, JobManagerSupport jobManagerSupport) { + super("disp_sat_deps_" + job.getJobId() + "_" + jobManagerSupport.toString()); this.job = job; this.jobManagerSupport = jobManagerSupport; } public DispatchSatisfyDepends(LayerInterface layer, JobManagerSupport jobManagerSupport) { + super("disp_sat_deps_" + layer.getLayerId() + "_" + jobManagerSupport.toString()); this.layer = layer; this.jobManagerSupport = jobManagerSupport; } public DispatchSatisfyDepends(FrameInterface frame, JobManagerSupport jobManagerSupport) { + super("disp_sat_deps_" + frame.getFrameId() + "_" + jobManagerSupport.toString()); this.frame = frame; this.jobManagerSupport = jobManagerSupport; } public DispatchSatisfyDepends(FrameSearchInterface search, JobManagerSupport jobManagerSupport) { + super("disp_sat_deps_" + search.hashCode() + "_" + jobManagerSupport.hashCode()); this.search = search; this.jobManagerSupport = jobManagerSupport; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchShutdownJobIfCompleted.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchShutdownJobIfCompleted.java index 15ddc805e..b4eb11a07 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchShutdownJobIfCompleted.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchShutdownJobIfCompleted.java @@ -29,11 +29,12 @@ * * @category command */ -public class DispatchShutdownJobIfCompleted implements Runnable { +public class DispatchShutdownJobIfCompleted extends KeyRunnable { private JobInterface job; private JobManagerSupport jobManagerSupport; public DispatchShutdownJobIfCompleted(JobInterface job, JobManagerSupport jobManagerSupport) { + super("disp_st_job_comp_" + job.getJobId()); this.job = job; this.jobManagerSupport = jobManagerSupport; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchStaggerFrames.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchStaggerFrames.java index d40e5e185..b0430b892 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchStaggerFrames.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchStaggerFrames.java @@ -23,7 +23,7 @@ import com.imageworks.spcue.LayerInterface; import com.imageworks.spcue.service.JobManagerSupport; -public class DispatchStaggerFrames implements Runnable { +public class DispatchStaggerFrames extends KeyRunnable { private JobInterface job = null; private LayerInterface layer = null; @@ -32,6 +32,7 @@ public class DispatchStaggerFrames implements Runnable { private JobManagerSupport jobManagerSupport; public DispatchStaggerFrames(JobInterface job, String range, int stagger, JobManagerSupport jobManagerSupport) { + super("disp_stag_frames_" + job.getJobId() + "_" + range); this.job = job; this.range = range; this.stagger = stagger; @@ -39,6 +40,7 @@ public DispatchStaggerFrames(JobInterface job, String range, int stagger, JobMan } public DispatchStaggerFrames(LayerInterface layer, String range, int stagger, JobManagerSupport jobManagerSupport) { + super("disp_stag_frames_" + layer.getLayerId() + "_" + range); this.layer = layer; this.range = range; this.stagger = stagger; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/KeyRunnable.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/KeyRunnable.java new file mode 100644 index 000000000..bdbdb87da --- /dev/null +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/KeyRunnable.java @@ -0,0 +1,41 @@ + +/* + * Copyright Contributors to the OpenCue Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package com.imageworks.spcue.dispatcher.commands; + +import com.imageworks.spcue.DispatchHost; +import com.imageworks.spcue.dispatcher.Dispatcher; + +public abstract class KeyRunnable implements Runnable { + + private String key; + + public KeyRunnable(String key) { + this.key = key; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } +} + diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/ManageReparentHosts.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/ManageReparentHosts.java index 1dfc2ca9d..15ab1384e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/ManageReparentHosts.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/ManageReparentHosts.java @@ -22,15 +22,17 @@ import java.util.List; import com.imageworks.spcue.AllocationInterface; +import com.imageworks.spcue.dispatcher.commands.KeyRunnable; import com.imageworks.spcue.HostInterface; import com.imageworks.spcue.service.HostManager; -public class ManageReparentHosts implements Runnable { +public class ManageReparentHosts extends KeyRunnable { AllocationInterface alloc; List hosts; HostManager hostManager; public ManageReparentHosts(AllocationInterface alloc, List hosts, HostManager hostManager) { + super(alloc.getAllocationId()); this.alloc = alloc; this.hosts = hosts; this.hostManager = hostManager; diff --git a/cuebot/src/main/java/com/imageworks/spcue/rqd/RqdClientGrpc.java b/cuebot/src/main/java/com/imageworks/spcue/rqd/RqdClientGrpc.java index d9ad91cfd..5e1e016fa 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/rqd/RqdClientGrpc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/rqd/RqdClientGrpc.java @@ -122,7 +122,7 @@ public void setHostLock(HostInterface host, LockState lock) { logger.debug("Locking RQD host"); lockHost(host); } else { - logger.debug("Unkown LockState passed to setHostLock."); + logger.debug("Unknown LockState passed to setHostLock."); } } diff --git a/cuebot/src/main/java/com/imageworks/spcue/servant/CueStatic.java b/cuebot/src/main/java/com/imageworks/spcue/servant/CueStatic.java index f7d5437af..d2c8b17e1 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/servant/CueStatic.java +++ b/cuebot/src/main/java/com/imageworks/spcue/servant/CueStatic.java @@ -44,17 +44,17 @@ public class CueStatic extends CueInterfaceGrpc.CueInterfaceImplBase { public void getSystemStats(CueGetSystemStatsRequest request, StreamObserver responseObserver) { SystemStats stats = SystemStats.newBuilder() - .setDispatchThreads(dispatchQueue.getActiveThreadCount()) - .setDispatchWaiting(dispatchQueue.getWaitingCount()) + .setDispatchThreads(dispatchQueue.getActiveCount()) + .setDispatchWaiting(dispatchQueue.getSize()) .setDispatchRemainingCapacity(dispatchQueue.getRemainingCapacity()) - .setDispatchExecuted(dispatchQueue.getTotalDispatched()) - .setDispatchRejected(dispatchQueue.getTotalRejected()) + .setDispatchExecuted(dispatchQueue.getCompletedTaskCount()) + .setDispatchRejected(dispatchQueue.getRejectedTaskCount()) - .setManageThreads(manageQueue.getActiveThreadCount()) - .setManageWaiting(manageQueue.getWaitingCount()) + .setManageThreads(manageQueue.getActiveCount()) + .setManageWaiting(manageQueue.getSize()) .setManageRemainingCapacity(manageQueue.getRemainingCapacity()) - .setManageExecuted(manageQueue.getTotalDispatched()) - .setManageRejected(manageQueue.getTotalRejected()) + .setManageExecuted(manageQueue.getCompletedTaskCount()) + .setManageRejected(manageQueue.getRejectedTaskCount()) .setReportThreads(reportQueue.getActiveCount()) .setReportWaiting(reportQueue.getQueue().size()) @@ -62,12 +62,12 @@ public void getSystemStats(CueGetSystemStatsRequest request, .setReportExecuted(reportQueue.getTaskCount()) .setReportRejected(reportQueue.getRejectedTaskCount()) - .setBookingWaiting(bookingQueue.getQueue().size()) - .setBookingRemainingCapacity(bookingQueue.getQueue().remainingCapacity()) + .setBookingWaiting(bookingQueue.getSize()) + .setBookingRemainingCapacity(bookingQueue.getRemainingCapacity()) .setBookingThreads(bookingQueue.getActiveCount()) .setBookingExecuted(bookingQueue.getCompletedTaskCount()) .setBookingRejected(bookingQueue.getRejectedTaskCount()) - .setBookingSleepMillis(bookingQueue.sleepTime()) + .setBookingSleepMillis(0) .setHostBalanceSuccess(DispatchSupport.balanceSuccess.get()) .setHostBalanceFailed(DispatchSupport.balanceFailed.get()) @@ -76,7 +76,7 @@ public void getSystemStats(CueGetSystemStatsRequest request, .setClearedProcs(DispatchSupport.clearedProcs.get()) .setBookingRetries(DispatchSupport.bookingRetries.get()) .setBookingErrors(DispatchSupport.bookingErrors.get()) - .setBookedProcs( DispatchSupport.bookedProcs.get()) + .setBookedProcs(DispatchSupport.bookedProcs.get()) // TODO(gregdenton) Reimplement these with gRPC. (Issue #69) // .setReqForData(IceServer.dataRequests.get()) diff --git a/cuebot/src/main/java/com/imageworks/spcue/servant/ManageDepend.java b/cuebot/src/main/java/com/imageworks/spcue/servant/ManageDepend.java index bb3837c26..5d1c2ab6e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/servant/ManageDepend.java +++ b/cuebot/src/main/java/com/imageworks/spcue/servant/ManageDepend.java @@ -26,6 +26,7 @@ import com.imageworks.spcue.LightweightDependency; import com.imageworks.spcue.dispatcher.DispatchQueue; +import com.imageworks.spcue.dispatcher.commands.KeyRunnable; import com.imageworks.spcue.grpc.depend.DependGetDependRequest; import com.imageworks.spcue.grpc.depend.DependGetDependResponse; import com.imageworks.spcue.grpc.depend.DependInterfaceGrpc; @@ -62,7 +63,8 @@ public void getDepend(DependGetDependRequest request, StreamObserver responseObserver) { LightweightDependency depend = dependManager.getDepend(request.getDepend().getId()); - manageQueue.execute(new Runnable() { + String key = "manage_dep_sat_req_" + request.getDepend().getId(); + manageQueue.execute(new KeyRunnable(key) { public void run() { try { logger.info("dropping dependency: " + depend.id); diff --git a/cuebot/src/main/java/com/imageworks/spcue/servant/ManageFilter.java b/cuebot/src/main/java/com/imageworks/spcue/servant/ManageFilter.java index 15f6163bc..8749a5787 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/servant/ManageFilter.java +++ b/cuebot/src/main/java/com/imageworks/spcue/servant/ManageFilter.java @@ -31,6 +31,7 @@ import com.imageworks.spcue.dao.FilterDao; import com.imageworks.spcue.dao.GroupDao; import com.imageworks.spcue.dispatcher.DispatchQueue; +import com.imageworks.spcue.dispatcher.commands.KeyRunnable; import com.imageworks.spcue.grpc.filter.Action; import com.imageworks.spcue.grpc.filter.ActionSeq; import com.imageworks.spcue.grpc.filter.Filter; @@ -125,7 +126,8 @@ public void createMatcher(FilterCreateMatcherRequest request, @Override public void delete(FilterDeleteRequest request, StreamObserver responseObserver) { FilterEntity filter = getFilterEntity(request.getFilter()); - manageQueue.execute(new Runnable() { + String key = "manage_filter_del_req_" + filter.getId(); + manageQueue.execute(new KeyRunnable(key) { public void run() { filterManager.deleteFilter(filter); } diff --git a/cuebot/src/main/java/com/imageworks/spcue/util/CueUtil.java b/cuebot/src/main/java/com/imageworks/spcue/util/CueUtil.java index 990511f47..2a7438f49 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/util/CueUtil.java +++ b/cuebot/src/main/java/com/imageworks/spcue/util/CueUtil.java @@ -35,6 +35,7 @@ import java.util.regex.Pattern; import javax.activation.DataHandler; import javax.activation.DataSource; +import javax.annotation.PostConstruct; import javax.mail.BodyPart; import javax.mail.Message; import javax.mail.Session; @@ -47,17 +48,24 @@ import org.apache.log4j.Logger; import org.springframework.core.env.Environment; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import com.imageworks.spcue.LayerInterface; import com.imageworks.spcue.SpcueRuntimeException; import com.imageworks.spcue.dispatcher.Dispatcher; + /** * CueUtil is set of common methods used throughout the application. */ +@Component public final class CueUtil { private static final Logger logger = Logger.getLogger(CueUtil.class); + private static String smtpHost = ""; + @Autowired + private Environment env; /** * Commonly used macros for gigabyte values in KB. @@ -88,6 +96,11 @@ public final class CueUtil { */ public static final int ONE_HOUR = 3600; + @PostConstruct + public void init() { + CueUtil.smtpHost = this.env.getRequiredProperty("smtp_host", String.class); + } + /** * Return true if the given name is formatted as a valid * allocation name. Allocation names should be facility.unique_name. @@ -157,7 +170,7 @@ public static int findChunk(List dependOnFrames, int dependErFrame) { public static void sendmail(String to, String from, String subject, StringBuilder body, Map images) { try { Properties props = System.getProperties(); - props.put("mail.smtp.host", "smtp"); + props.put("mail.smtp.host", CueUtil.smtpHost); Session session = Session.getDefaultInstance(props, null); Message msg = new MimeMessage(session); msg.setFrom(new InternetAddress(from)); @@ -189,6 +202,8 @@ public static void sendmail(String to, String from, String subject, StringBuilde msg.setContent(mimeMultipart); msg.setHeader("X-Mailer", "OpenCueMailer"); msg.setSentDate(new Date()); + Transport transport = session.getTransport("smtp"); + transport.connect(CueUtil.smtpHost, null, null); Transport.send(msg); } catch (Exception e) { diff --git a/cuebot/src/main/resources/conf/spring/applicationContext-service.xml b/cuebot/src/main/resources/conf/spring/applicationContext-service.xml index 521c74dd1..12e0889d8 100644 --- a/cuebot/src/main/resources/conf/spring/applicationContext-service.xml +++ b/cuebot/src/main/resources/conf/spring/applicationContext-service.xml @@ -54,33 +54,33 @@ - - - - LaunchQueue - - - dispatcher.launch_queue - + + + + - - - - DispatchPool - - - dispatcher.dispatch_pool - + + + + - - - - ManagePool + + + ${booking_queue.threadpool.health_threshold} - - dispatcher.manage_pool + + ${healthy_threadpool.min_unhealthy_period_min} + + + ${booking_queue.threadpool.queue_capacity} + + + ${booking_queue.threadpool.core_pool_size} + + + ${booking_queue.threadpool.max_pool_size} @@ -88,45 +88,67 @@ DispatchQueue - + + ${healthy_threadpool.health_threshold} + + + ${healthy_threadpool.min_unhealthy_period_min} + + + ${dispatch.threadpool.queue_capacity} + + + ${dispatch.threadpool.core_pool_size} + + + ${dispatch.threadpool.max_pool_size} + - + ManageQueue - + + ${healthy_threadpool.health_threshold} + + + ${healthy_threadpool.min_unhealthy_period_min} + + + ${dispatch.threadpool.queue_capacity} + + + ${dispatch.threadpool.core_pool_size} + + + ${dispatch.threadpool.max_pool_size} + - - - - ReportQueue + + ${report_queue.threadPoolSizeInitial} - - dispatcher.report_queue + + ${report_queue.threadPoolSizeMax} + + + ${report_queue.queueSize} - - - KillQueue - - - dispatcher.kill_queue + + ${kill_queue.threadPoolSizeInitial} - - - - - - dispatcher.booking_queue + + ${kill_queue.threadPoolSizeMax} - 300 + ${kill_queue.queueSize} + @@ -341,8 +363,8 @@ - + @@ -411,6 +433,76 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cuebot/src/main/resources/log4j.properties b/cuebot/src/main/resources/log4j.properties index 138d514ae..5f44dbee9 100644 --- a/cuebot/src/main/resources/log4j.properties +++ b/cuebot/src/main/resources/log4j.properties @@ -28,6 +28,15 @@ log4j.appender.API.MaxBackupIndex=20 log4j.appender.API.layout=org.apache.log4j.PatternLayout log4j.appender.API.layout.ConversionPattern=%d:%m%n +log4j.category.HEALTH=DEBUG, HEALTH +log4j.additivity.HEALTH=false +log4j.appender.HEALTH=org.apache.log4j.RollingFileAppender +log4j.appender.HEALTH.File=logs/health.log +log4j.appender.HEALTH.MaxFileSize=10MB +log4j.appender.HEALTH.MaxBackupIndex=20 +log4j.appender.HEALTH.layout=org.apache.log4j.PatternLayout +log4j.appender.HEALTH.layout.ConversionPattern=%d:%m%n + log4j.logger.org.apache.catalina=INFO log4j.logger.com.imageworks.spcue=DEBUG log4j.logger.com.imageworks.spcue.dispatcher.RqdReportManagerService=DEBUG diff --git a/cuebot/src/main/resources/opencue.properties b/cuebot/src/main/resources/opencue.properties index 220cc2d5f..e64f6e417 100644 --- a/cuebot/src/main/resources/opencue.properties +++ b/cuebot/src/main/resources/opencue.properties @@ -21,6 +21,27 @@ grpc.rqd_cache_concurrency=20 # RQD Channel task deadline in seconds grpc.rqd_task_deadline=10 +# Healthy Threadpool Executor +booking_queue.threadpool.health_threshold=10 +booking_queue.threadpool.core_pool_size=10 +booking_queue.threadpool.max_pool_size=14 +booking_queue.threadpool.queue_capacity=2000 +dispatch.threadpool.core_pool_size=6 +dispatch.threadpool.max_pool_size=8 +dispatch.threadpool.queue_capacity=2000 +healthy_threadpool.health_threshold=6 +healthy_threadpool.min_unhealthy_period_min=3 +report_queue.threadPoolSizeInitial=6 +report_queue.threadPoolSizeMax=12 +# The queue size should be bigger then the expected amount of hosts +report_queue.queueSize=5000 +kill_queue.threadPoolSizeInitial=2 +kill_queue.threadPoolSizeMax=6 +kill_queue.queueSize=1000 + +# Turn on/off jobCompletion mailing module +mailing.enabled=true + # Whether or not to enable publishing to a messaging topic. # Set to a boolean value. See com/imageworks/spcue/services/JmsMover.java. messaging.enabled=false @@ -34,9 +55,9 @@ log.frame-log-root=${CUE_FRAME_LOG_DIR:/shots} dispatcher.job_query_max=20 # Number of seconds before waiting to book the same job from a different host. # "0" disables the job_lock -dispatcher.job_lock_expire_seconds=0 +dispatcher.job_lock_expire_seconds=20 # Concurrency level to allow on the job lock cache -dispatcher.job_lock_concurrency_level=3 +dispatcher.job_lock_concurrency_level=14 # Maximum number of frames to query from the DB to attempt to dispatch. dispatcher.frame_query_max=20 # Maximum number of frames to book at one time on the same host. @@ -96,3 +117,6 @@ history.archive_jobs_cutoff_hours=72 # Delete down hosts automatically. maintenance.auto_delete_down_hosts=false + +# Set hostname/IP of the smtp host. Will be used for mailing +smtp_host=smtp diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/TestBookingQueue.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/TestBookingQueue.java index 319c4cfe1..74b21c102 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/TestBookingQueue.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/TestBookingQueue.java @@ -89,11 +89,18 @@ public void create() { @Rollback(true) public void testBookingQueue() { + int healthThreshold = 10; + int minUnhealthyPeriodMin = 3; + int queueCapacity = 2000; + int corePoolSize = 10; + int maxPoolSize = 14; + DispatchHost host1 = hostDao.findDispatchHost(HOSTNAME); host1.idleCores = 500; DispatchHost host2 = hostDao.findDispatchHost(HOSTNAME); DispatchHost host3 = hostDao.findDispatchHost(HOSTNAME); - + BookingQueue queue = new BookingQueue(healthThreshold, minUnhealthyPeriodMin, queueCapacity, + corePoolSize, maxPoolSize); bookingQueue.execute(new DispatchBookHost(host2,dispatcher)); bookingQueue.execute(new DispatchBookHost(host3,dispatcher)); bookingQueue.execute(new DispatchBookHost(host1,dispatcher)); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/ThreadPoolTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/ThreadPoolTests.java deleted file mode 100644 index 76c9a3e0e..000000000 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/ThreadPoolTests.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright Contributors to the OpenCue Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - - -package com.imageworks.spcue.test.dispatcher; - -import java.util.concurrent.ThreadPoolExecutor; -import javax.annotation.Resource; - -import com.imageworks.spcue.config.TestAppConfig; -import com.imageworks.spcue.dispatcher.BookingQueue; -import com.imageworks.spcue.dispatcher.HostReportQueue; -import com.imageworks.spcue.dispatcher.ThreadPoolTaskExecutorWrapper; - -import org.junit.Test; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.AbstractTransactionalJUnit4SpringContextTests; -import org.springframework.test.context.support.AnnotationConfigContextLoader; - -import static org.junit.Assert.assertEquals; - -@ContextConfiguration(classes=TestAppConfig.class, loader=AnnotationConfigContextLoader.class) -public class ThreadPoolTests extends AbstractTransactionalJUnit4SpringContextTests { - - @Resource - ThreadPoolTaskExecutor launchQueue; - - @Resource - ThreadPoolTaskExecutor dispatchPool; - - @Resource - ThreadPoolTaskExecutor managePool; - - @Resource - ThreadPoolExecutor reportQueue; - - @Resource - ThreadPoolExecutor killQueue; - - @Resource - BookingQueue bookingQueue; - - private int getQueueCapacity(ThreadPoolTaskExecutor t) { - return ((ThreadPoolTaskExecutorWrapper) t).getQueueCapacity(); - } - - private int getQueueCapacity(ThreadPoolExecutor t) { - return ((HostReportQueue) t).getQueueCapacity(); - } - - @Test - public void testPropertyValues() { - assertEquals(1, launchQueue.getCorePoolSize()); - assertEquals(1, launchQueue.getMaxPoolSize()); - assertEquals(100, getQueueCapacity(launchQueue)); - - assertEquals(4, dispatchPool.getCorePoolSize()); - assertEquals(4, dispatchPool.getMaxPoolSize()); - assertEquals(500, getQueueCapacity(dispatchPool)); - - assertEquals(8, managePool.getCorePoolSize()); - assertEquals(8, managePool.getMaxPoolSize()); - assertEquals(250, getQueueCapacity(managePool)); - - assertEquals(6, reportQueue.getCorePoolSize()); - assertEquals(8, reportQueue.getMaximumPoolSize()); - assertEquals(1000, getQueueCapacity(reportQueue)); - - assertEquals(6, killQueue.getCorePoolSize()); - assertEquals(8, killQueue.getMaximumPoolSize()); - assertEquals(1000, getQueueCapacity(killQueue)); - - assertEquals(6, bookingQueue.getCorePoolSize()); - assertEquals(6, bookingQueue.getMaximumPoolSize()); - assertEquals(1000, bookingQueue.getQueueCapacity()); - } -} - diff --git a/cuebot/src/test/resources/opencue.properties b/cuebot/src/test/resources/opencue.properties index d492f0a87..5c9264492 100644 --- a/cuebot/src/test/resources/opencue.properties +++ b/cuebot/src/test/resources/opencue.properties @@ -13,6 +13,24 @@ grpc.rqd_cache_concurrency=20 # RQD Channel task deadline in seconds grpc.rqd_task_deadline=10 +# Healthy Threadpool Executor +booking_queue.threadpool.health_threshold=10 +booking_queue.threadpool.core_pool_size=10 +booking_queue.threadpool.max_pool_size=14 +booking_queue.threadpool.queue_capacity=2000 +dispatch.threadpool.core_pool_size=6 +dispatch.threadpool.max_pool_size=8 +dispatch.threadpool.queue_capacity=2000 +healthy_threadpool.health_threshold=6 +healthy_threadpool.min_unhealthy_period_min=3 +report_queue.threadPoolSizeInitial=6 +report_queue.threadPoolSizeMax=12 +# The queue size should be bigger then the expected amount of hosts +report_queue.queueSize=5000 +kill_queue.threadPoolSizeInitial=2 +kill_queue.threadPoolSizeMax=6 +kill_queue.queueSize=1000 + log.frame-log-root=/arbitraryLogDirectory dispatcher.job_query_max=20 diff --git a/cuegui/cuegui/Config.py b/cuegui/cuegui/Config.py new file mode 100644 index 000000000..2c80f8492 --- /dev/null +++ b/cuegui/cuegui/Config.py @@ -0,0 +1,63 @@ +# Copyright Contributors to the OpenCue Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Functions for loading application state and settings from disk.""" + +from __future__ import print_function +from __future__ import division +from __future__ import absolute_import + +import os +import shutil + +from PySide2 import QtCore + +import cuegui.Constants +import cuegui.Logger + +logger = cuegui.Logger.getLogger(__file__) + + +def startup(app_name): + """ + Reads config from disk, restoring default config if necessary. + + :param app_name: application window name + :type app_name: str + :return: settings object containing the loaded settings + :rtype: QtCore.QSettings + """ + # read saved config from disk + # copy default config + config_path = "/.%s/config" % app_name.lower() + settings = QtCore.QSettings(QtCore.QSettings.IniFormat, QtCore.QSettings.UserScope, config_path) + logger.info('Reading config file from %s', settings.fileName()) + local = settings.fileName() + + # If the user has chose to revert the layout. delete the file and copy the default back. + if settings.value('RevertLayout'): + logger.warning('Found RevertLayout flag, will restore default config') + os.remove(local) + + # If the config file does not exist, copy over the default + if not os.path.exists(local): + default = os.path.join(cuegui.Constants.DEFAULT_INI_PATH, "%s.ini" % app_name.lower()) + logger.warning('Local config file not found at %s', local) + logger.warning('Copying %s to %s', default, local) + os.makedirs(os.path.dirname(local), exist_ok=True) + shutil.copy2(default, local) + settings.sync() + + return settings diff --git a/cuegui/cuegui/Main.py b/cuegui/cuegui/Main.py index b97e3d885..58bb3073d 100644 --- a/cuegui/cuegui/Main.py +++ b/cuegui/cuegui/Main.py @@ -20,14 +20,13 @@ from __future__ import print_function from __future__ import division -import os -import shutil import signal from PySide2 import QtCore from PySide2 import QtGui from PySide2 import QtWidgets +import cuegui.Config import cuegui.Constants import cuegui.Logger import cuegui.MainWindow @@ -92,33 +91,11 @@ def startup(app_name, app_version, argv): QtGui.qApp.threads = [] # pylint: enable=attribute-defined-outside-init - config_path = "/.%s/config" % app_name.lower() - settings = QtCore.QSettings(QtCore.QSettings.IniFormat, QtCore.QSettings.UserScope, config_path) - local = settings.fileName() - # If the user has chose to revert the layout. delete the file and copy the default back. - if settings.value('RevertLayout'): - os.remove(local) - + settings = cuegui.Config.startup(app_name) QtGui.qApp.settings = settings # pylint: disable=attribute-defined-outside-init cuegui.Style.init() - # If the config file does not exist, copy over the default - # pylint: disable=broad-except - if not os.path.exists(local): - default = os.path.join(cuegui.Constants.DEFAULT_INI_PATH, "%s.ini" % app_name.lower()) - logger.warning('Not found: %s\nCopying: %s', local, default) - try: - os.mkdir(os.path.dirname(local)) - except Exception as e: - logger.debug(e) - try: - shutil.copy2(default, local) - except Exception as e: - logger.debug(e) - settings.sync() - # pylint: enable=broad-except - mainWindow = cuegui.MainWindow.MainWindow(app_name, app_version, None) mainWindow.displayStartupNotice() mainWindow.show() diff --git a/cuegui/tests/Config_tests.py b/cuegui/tests/Config_tests.py new file mode 100644 index 000000000..61ab2c193 --- /dev/null +++ b/cuegui/tests/Config_tests.py @@ -0,0 +1,93 @@ +# Copyright Contributors to the OpenCue Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Tests for cuegui.Config""" + + +from __future__ import print_function +from __future__ import division +from __future__ import absolute_import + +import os +import shutil +import tempfile +import unittest + +from PySide2 import QtCore + +import cuegui.Config + + +CONFIG_INI = ''' +[General] +Version=0.14 + +[CueCommander] +Open=true +Title=CustomWindowTitle +OtherAttr=arbitrary-value +''' + +CONFIG_WITH_RESTORE_FLAG = ''' +[General] +Version=0.14 +RevertLayout=true + +[CueCommander] +OtherAttr=arbitrary-value +''' + + +class ConfigTests(unittest.TestCase): + def setUp(self): + self.config_dir = tempfile.mkdtemp() + QtCore.QSettings.setPath( + QtCore.QSettings.IniFormat, QtCore.QSettings.UserScope, self.config_dir) + + def tearDown(self): + shutil.rmtree(self.config_dir) + + def test__should_load_user_config(self): + app_name = 'arbitraryapp' + config_file_path = os.path.join(self.config_dir, '.%s' % app_name, 'config.ini') + os.mkdir(os.path.dirname(config_file_path)) + with open(config_file_path, 'w') as fp: + fp.write(CONFIG_INI) + + settings = cuegui.Config.startup(app_name) + + self.assertEqual('0.14', settings.value('Version')) + self.assertEqual('true', settings.value('CueCommander/Open')) + self.assertEqual('CustomWindowTitle', settings.value('CueCommander/Title')) + self.assertEqual('arbitrary-value', settings.value('CueCommander/OtherAttr')) + + def test__should_load_default_config(self): + settings = cuegui.Config.startup('CueCommander') + + self.assertEqual('false', settings.value('CueCommander/Open')) + self.assertEqual('CueCommander', settings.value('CueCommander/Title')) + self.assertFalse(settings.value('CueCommander/OtherAttr', False)) + + def test__should_restore_default_config(self): + config_file_path = os.path.join(self.config_dir, '.cuecommander', 'config.ini') + os.mkdir(os.path.dirname(config_file_path)) + with open(config_file_path, 'w') as fp: + fp.write(CONFIG_WITH_RESTORE_FLAG) + + settings = cuegui.Config.startup('CueCommander') + + self.assertEqual('false', settings.value('CueCommander/Open')) + self.assertEqual('CueCommander', settings.value('CueCommander/Title')) + self.assertFalse(settings.value('CueCommander/OtherAttr', False)) diff --git a/sandbox/flyway.Dockerfile b/sandbox/flyway.Dockerfile index 900c4449c..d3d45b998 100644 --- a/sandbox/flyway.Dockerfile +++ b/sandbox/flyway.Dockerfile @@ -1,16 +1,20 @@ FROM centos +ARG FLYWAY_VERSION=8.5.4 + # Get flyway -RUN ["curl", "-O", "https://repo1.maven.org/maven2/org/flywaydb/flyway-commandline/6.0.0/flyway-commandline-6.0.0-linux-x64.tar.gz"] -RUN ["yum", "install", "-y", "tar", "java-1.8.0-openjdk", "postgresql-jdbc", "nc", "postgresql"] -RUN ["tar", "-xzf", "flyway-commandline-6.0.0-linux-x64.tar.gz"] +RUN sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* +RUN sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-* +RUN yum install -y tar java-1.8.0-openjdk postgresql-jdbc nc postgresql +RUN curl -O https://repo1.maven.org/maven2/org/flywaydb/flyway-commandline/${FLYWAY_VERSION}/flyway-commandline-${FLYWAY_VERSION}-linux-x64.tar.gz +RUN tar -xzf flyway-commandline-${FLYWAY_VERSION}-linux-x64.tar.gz -WORKDIR flyway-6.0.0 +WORKDIR flyway-${FLYWAY_VERSION} # Copy the postgres driver to its required location -RUN ["cp", "/usr/share/java/postgresql-jdbc.jar", "jars/"] -RUN ["mkdir", "/opt/migrations"] -RUN ["mkdir", "/opt/scripts"] +RUN cp /usr/share/java/postgresql-jdbc.jar jars/ +RUN mkdir /opt/migrations +RUN mkdir /opt/scripts COPY ./cuebot/src/main/resources/conf/ddl/postgres/migrations /opt/migrations COPY ./cuebot/src/main/resources/conf/ddl/postgres/seed_data.sql /opt/scripts COPY ./sandbox/migrate.sh /opt/scripts/