From 0a6e3ee6dfe4505c54490402e70823335cb3cd16 Mon Sep 17 00:00:00 2001 From: Carter Kozak Date: Tue, 30 Jun 2020 17:46:19 -0400 Subject: [PATCH 1/2] Borrow WC variants of executor views to reduce thread count and increase reuse Note that this includes an API+ABI break for fixed-size-executor factory methods which have been updated to return ExecutorService instead of ThreadPoolExecutor. --- ...tCassandraAsyncKeyValueServiceFactory.java | 8 +- .../impl/AbstractKeyValueService.java | 7 +- .../com/palantir/atlasdb/factory/Leaders.java | 22 +- .../atlasdb/keyvalue/dbkvs/impl/DbKvs.java | 13 +- .../com/palantir/nexus/db/sql/BasicSQL.java | 7 +- .../concurrent/AtlasQueuedViewExecutor.java | 335 ++++++++++++++++++ .../AtlasQueuelessViewExecutor.java | 267 ++++++++++++++ .../AtlasRenamingExecutorService.java | 94 +++++ .../AtlasUncaughtExceptionHandler.java | 31 ++ .../common/concurrent/AtlasViewExecutor.java | 146 ++++++++ .../common/concurrent/NamedThreadFactory.java | 1 + .../common/concurrent/PTExecutors.java | 48 ++- .../paxos/TimeLockPaxosExecutors.java | 21 +- 13 files changed, 926 insertions(+), 74 deletions(-) create mode 100644 commons-executors/src/main/java/com/palantir/common/concurrent/AtlasQueuedViewExecutor.java create mode 100644 commons-executors/src/main/java/com/palantir/common/concurrent/AtlasQueuelessViewExecutor.java create mode 100644 commons-executors/src/main/java/com/palantir/common/concurrent/AtlasRenamingExecutorService.java create mode 100644 commons-executors/src/main/java/com/palantir/common/concurrent/AtlasUncaughtExceptionHandler.java create mode 100644 commons-executors/src/main/java/com/palantir/common/concurrent/AtlasViewExecutor.java diff --git a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/DefaultCassandraAsyncKeyValueServiceFactory.java b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/DefaultCassandraAsyncKeyValueServiceFactory.java index 0ad66e9bdb0..a7f4b7e54d1 100644 --- a/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/DefaultCassandraAsyncKeyValueServiceFactory.java +++ b/atlasdb-cassandra/src/main/java/com/palantir/atlasdb/keyvalue/cassandra/async/DefaultCassandraAsyncKeyValueServiceFactory.java @@ -18,8 +18,6 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import com.codahale.metrics.InstrumentedExecutorService; import com.codahale.metrics.MetricRegistry; @@ -33,7 +31,6 @@ import com.palantir.atlasdb.keyvalue.cassandra.async.client.creation.CqlClientFactory; import com.palantir.atlasdb.keyvalue.cassandra.async.client.creation.DefaultCqlClientFactory; import com.palantir.atlasdb.util.MetricsManager; -import com.palantir.common.concurrent.NamedThreadFactory; import com.palantir.common.concurrent.PTExecutors; import com.palantir.tracing.Tracers; @@ -88,10 +85,7 @@ public ExecutorService visit(CqlCapableConfig cqlCapableConfig) { * @return a new dynamic thread pool with a thread keep alive time of 1 minute */ private static ExecutorService createThreadPool(int maxPoolSize) { - LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(); - NamedThreadFactory threadFactory = new NamedThreadFactory("Atlas Cassandra Async KVS", false); - - return PTExecutors.newThreadPoolExecutor(0, maxPoolSize, 1, TimeUnit.MINUTES, workQueue, threadFactory); + return PTExecutors.newFixedThreadPool(maxPoolSize, "Atlas Cassandra Async KVS"); } private ExecutorService tracingExecutorService(ExecutorService executorService) { diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java index 12be00e284f..ae99faed172 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java @@ -21,7 +21,6 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -89,10 +88,8 @@ protected static ExecutorService createFixedThreadPool(String threadNamePrefix, * @param maxPoolSize maximum size of the pool * @return a new fixed size thread pool with a keep alive time of 1 minute */ - protected static ExecutorService createThreadPool(String threadNamePrefix, int corePoolSize, int maxPoolSize) { - return PTExecutors.newThreadPoolExecutor(corePoolSize, maxPoolSize, - 1, TimeUnit.MINUTES, - new LinkedBlockingQueue<>(), new NamedThreadFactory(threadNamePrefix, false)); + protected static ExecutorService createThreadPool(String threadNamePrefix, int _corePoolSize, int maxPoolSize) { + return PTExecutors.newFixedThreadPool(maxPoolSize, threadNamePrefix); } @Override diff --git a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/Leaders.java b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/Leaders.java index c6ee65d414b..bacfeccca58 100644 --- a/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/Leaders.java +++ b/atlasdb-config/src/main/java/com/palantir/atlasdb/factory/Leaders.java @@ -23,17 +23,12 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import org.immutables.value.Value; -import com.codahale.metrics.InstrumentedExecutorService; -import com.codahale.metrics.MetricRegistry; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -49,7 +44,6 @@ import com.palantir.atlasdb.http.NotCurrentLeaderExceptionMapper; import com.palantir.atlasdb.util.AtlasDbMetrics; import com.palantir.atlasdb.util.MetricsManager; -import com.palantir.common.concurrent.NamedThreadFactory; import com.palantir.common.concurrent.PTExecutors; import com.palantir.common.streams.KeyedStream; import com.palantir.conjure.java.api.config.service.UserAgent; @@ -59,7 +53,6 @@ import com.palantir.leader.LeaderElectionServiceBuilder; import com.palantir.leader.LeadershipObserver; import com.palantir.leader.LocalPingableLeader; -import com.palantir.leader.PaxosLeaderElectionService; import com.palantir.leader.PaxosLeadershipEventRecorder; import com.palantir.leader.PingableLeader; import com.palantir.paxos.ImmutableLeaderPingerContext; @@ -239,20 +232,7 @@ private static Map createExecutorsForService( // TODO (jkong): Make the limits configurable. // Current use cases tend to have not more than 10 (<< 100) inflight tasks under normal circumstances. private static ExecutorService createExecutor(MetricsManager metricsManager, String useCase, int corePoolSize) { - return new InstrumentedExecutorService( - PTExecutors.newThreadPoolExecutor( - corePoolSize, - 100, - 5000, - TimeUnit.MILLISECONDS, - new SynchronousQueue<>(), - daemonThreadFactory("atlas-leaders-election-" + useCase)), - metricsManager.getRegistry(), - MetricRegistry.name(PaxosLeaderElectionService.class, useCase, "executor")); - } - - private static ThreadFactory daemonThreadFactory(String name) { - return new NamedThreadFactory(name, true); + return PTExecutors.newCachedThreadPoolWithMaxThreads(100, "atlas-leaders-election-" + useCase); } public static List createProxyAndLocalList( diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java index 6d519336738..a77d2635f1a 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java @@ -36,8 +36,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -121,7 +119,6 @@ import com.palantir.common.base.ClosableIterators; import com.palantir.common.base.Throwables; import com.palantir.common.collect.Maps2; -import com.palantir.common.concurrent.NamedThreadFactory; import com.palantir.common.concurrent.PTExecutors; import com.palantir.exception.PalantirSqlException; import com.palantir.logsafe.Preconditions; @@ -248,14 +245,8 @@ private DbKvs(ExecutorService executor, this.getCandidateCellsForSweepingStrategy = getCandidateCellsForSweepingStrategy; } - private static ThreadPoolExecutor newFixedThreadPool(int maxPoolSize) { - ThreadPoolExecutor pool = PTExecutors.newThreadPoolExecutor(maxPoolSize, maxPoolSize, - 15L, TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new NamedThreadFactory("Atlas DbKvs reader", true /* daemon */)); - - pool.allowCoreThreadTimeOut(false); - return pool; + private static ExecutorService newFixedThreadPool(int maxPoolSize) { + return PTExecutors.newFixedThreadPool(maxPoolSize, "Atlas DbKvs reader"); } private void init() { diff --git a/commons-db/src/main/java/com/palantir/nexus/db/sql/BasicSQL.java b/commons-db/src/main/java/com/palantir/nexus/db/sql/BasicSQL.java index 3f77eac947c..f8f26d9eae9 100644 --- a/commons-db/src/main/java/com/palantir/nexus/db/sql/BasicSQL.java +++ b/commons-db/src/main/java/com/palantir/nexus/db/sql/BasicSQL.java @@ -360,15 +360,12 @@ public static boolean assertNotOnSqlThread() { private static final String SELECT_THREAD_NAME = "SQL select statement"; //$NON-NLS-1$ private static final String EXECUTE_THREAD_NAME = "SQL execute statement"; //$NON-NLS-1$ - private static final int KEEP_SQL_THREAD_ALIVE_TIMEOUT = 3000; //3 seconds // TODO (jkong): Should these be lazily initialized? private static final Supplier DEFAULT_SELECT_EXECUTOR = - Suppliers.memoize(() -> PTExecutors.newCachedThreadPool( - new NamedThreadFactory(SELECT_THREAD_NAME, true), KEEP_SQL_THREAD_ALIVE_TIMEOUT)); + Suppliers.memoize(() -> PTExecutors.newCachedThreadPool(SELECT_THREAD_NAME)); static final Supplier DEFAULT_EXECUTE_EXECUTOR = - Suppliers.memoize(() -> PTExecutors.newCachedThreadPool( - new NamedThreadFactory(EXECUTE_THREAD_NAME, true), KEEP_SQL_THREAD_ALIVE_TIMEOUT)); + Suppliers.memoize(() -> PTExecutors.newCachedThreadPool(EXECUTE_THREAD_NAME)); private ExecutorService selectStatementExecutor; private ExecutorService executeStatementExecutor; diff --git a/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasQueuedViewExecutor.java b/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasQueuedViewExecutor.java new file mode 100644 index 00000000000..7b1683514cd --- /dev/null +++ b/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasQueuedViewExecutor.java @@ -0,0 +1,335 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + */ + +package com.palantir.common.concurrent; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Borrowed from jboss-threads. http://www.apache.org/licenses/LICENSE-2.0 + * https://github.com/jbossas/jboss-threads/blob/master/src/main/java/org/jboss/threads/QueuedViewExecutor.java Changes + * have been contributed and merged, this may be replaced by the upstream ViewExecutor pending a release including + * https://github.com/jbossas/jboss-threads/pull/85. + * + *

Licensed under http://www.apache.org/licenses/LICENSE-2.0. + * https://github.com/jbossas/jboss-threads/blob/5df767f325214acf3f7b80fa5354411c4453e073/LICENSE.txt + * + *

An executorservice that is actually a "view" over another executor service. + */ +@SuppressWarnings({ + "checkstyle:InnerAssignment", + "checkstyle:HiddenField", + "checkstyle:NestedTryDepth", + "checkstyle:CyclomaticComplexity", + "NullAway" + }) +final class AtlasQueuedViewExecutor extends AtlasViewExecutor { + private static final Logger log = LoggerFactory.getLogger(AtlasQueuedViewExecutor.class); + private static final Runnable[] NO_RUNNABLES = new Runnable[0]; + + private final Executor delegate; + + private final Lock lock; + private final Condition shutDownCondition; + private final ArrayDeque queue; + private final Set allWrappers = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final int queueLimit; + private final short maxCount; + private short submittedCount; + private short runningCount; + private int state = ST_RUNNING; + + private static final int ST_RUNNING = 0; + private static final int ST_SHUTDOWN_REQ = 1; + private static final int ST_SHUTDOWN_INT_REQ = 2; + private static final int ST_STOPPED = 3; + + AtlasQueuedViewExecutor( + final Executor delegate, + final short maxCount, + final int queueLimit, + final int queueInitialSize, + final Thread.UncaughtExceptionHandler handler) { + this.delegate = delegate; + this.maxCount = maxCount; + this.queueLimit = queueLimit; + this.setExceptionHandler(handler); + queue = new ArrayDeque<>(Math.min(queueLimit, queueInitialSize)); + lock = new ReentrantLock(); + shutDownCondition = lock.newCondition(); + } + + @Override + public void execute(final Runnable command) { + lock.lock(); + try { + if (state != ST_RUNNING) { + throw new RejectedExecutionException("Executor has been shut down"); + } + final short submittedCount = this.submittedCount; + if (runningCount + submittedCount < maxCount) { + this.submittedCount = (short) (submittedCount + 1); + final TaskWrapper tw = new TaskWrapper(command); + allWrappers.add(tw); + try { + /* this cannot be easily moved outside of the lock, otherwise queued tasks might never run + * under certain rare scenarios. + */ + delegate.execute(tw); + } catch (Throwable t) { + this.submittedCount--; + allWrappers.remove(tw); + throw t; + } + } else if (queue.size() < queueLimit) { + queue.add(command); + } else { + throw new RejectedExecutionException("No executor queue space remaining"); + } + } finally { + lock.unlock(); + } + } + + @Override + @SuppressWarnings("LockNotBeforeTry") + public void shutdown(boolean interrupt) { + lock.lock(); + int oldState = this.state; + if (oldState < ST_SHUTDOWN_REQ) { + // do shutdown + final boolean emptyQueue; + try { + emptyQueue = queue.isEmpty(); + } catch (Throwable t) { + lock.unlock(); + throw t; + } + if (runningCount == 0 && submittedCount == 0 && emptyQueue) { + this.state = ST_STOPPED; + try { + shutDownCondition.signalAll(); + } finally { + lock.unlock(); + } + runTermination(); + return; + } + } + // didn't exit + this.state = interrupt ? ST_SHUTDOWN_INT_REQ : ST_SHUTDOWN_REQ; + lock.unlock(); + if (interrupt && oldState < ST_SHUTDOWN_INT_REQ) { + // interrupt all runners + for (TaskWrapper wrapper : allWrappers) { + wrapper.interrupt(); + } + } + } + + @Override + public List shutdownNow() { + lock.lock(); + int oldState = this.state; + final Runnable[] tasks; + try { + tasks = queue.toArray(NO_RUNNABLES); + queue.clear(); + } catch (Throwable t) { + lock.unlock(); + throw t; + } + if (oldState < ST_SHUTDOWN_INT_REQ) { + // do shutdown + if (runningCount == 0 && submittedCount == 0) { + this.state = ST_STOPPED; + try { + shutDownCondition.signalAll(); + } finally { + lock.unlock(); + } + runTermination(); + } else { + lock.unlock(); + this.state = ST_SHUTDOWN_INT_REQ; + // interrupt all runners + for (TaskWrapper wrapper : allWrappers) { + wrapper.interrupt(); + } + } + } else { + lock.unlock(); + } + return Arrays.asList(tasks); + } + + @Override + public boolean isShutdown() { + lock.lock(); + try { + return state >= ST_SHUTDOWN_REQ; + } finally { + lock.unlock(); + } + } + + @Override + public boolean isTerminated() { + lock.lock(); + try { + return state == ST_STOPPED; + } finally { + lock.unlock(); + } + } + + @Override + public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { + lock.lock(); + try { + if (state == ST_STOPPED) { + return true; + } + final long nanos = unit.toNanos(timeout); + if (nanos <= 0) { + return false; + } + long elapsed = 0; + final long start = System.nanoTime(); + for (;;) { + shutDownCondition.awaitNanos(nanos - elapsed); + if (state == ST_STOPPED) { + return true; + } + elapsed = System.nanoTime() - start; + if (elapsed >= nanos) { + return false; + } + } + } finally { + lock.unlock(); + } + } + + @Override + public String toString() { + return "view of " + delegate; + } + + class TaskWrapper implements Runnable { + private volatile Thread thread; + private Runnable command; + + TaskWrapper(final Runnable command) { + this.command = command; + } + + synchronized void interrupt() { + final Thread thread = this.thread; + if (thread != null) { + thread.interrupt(); + } + } + + @Override + public void run() { + boolean resetStateOnCompletion = true; + thread = Thread.currentThread(); + // Interruption may be missed between when a TaskWrapper is submitted + // to the delegate executor, and when the task begins to execute. + // This must execute after thread is set. + if (state == ST_SHUTDOWN_INT_REQ) { + Thread.currentThread().interrupt(); + } + try { + for (;;) { + lock.lock(); + try { + submittedCount--; + runningCount++; + } finally { + lock.unlock(); + } + try { + command.run(); + } catch (Throwable t) { + try { + getExceptionHandler().uncaughtException(Thread.currentThread(), t); + } catch (Throwable tt) { + log.debug("failed to call the uncaught exception handler", tt); + } + } + lock.lock(); + runningCount--; + try { + command = queue.pollFirst(); + } catch (Throwable t) { + lock.unlock(); + throw t; + } + if (runningCount + submittedCount < maxCount && command != null) { + // execute next + submittedCount++; + lock.unlock(); + } else if (command == null && runningCount == 0 && submittedCount == 0 && state != ST_RUNNING) { + // we're the last task + state = ST_STOPPED; + try { + shutDownCondition.signalAll(); + } finally { + lock.unlock(); + } + runTermination(); + return; + } else { + lock.unlock(); + return; + } + try { + // Unset the current thread prior to resubmitting this task to avoid clobbering the value + // if the delegate executes in parallel to the finally block. + unsetThread(); + delegate.execute(this); + resetStateOnCompletion = false; + // resubmitted this task for execution, so return + return; + } catch (Throwable t) { + log.warn( + "Failed to resubmit executor task to delegate executor" + + " (executing task immediately instead)", + t); + // resubmit failed, so continue execution in this thread after resetting state + thread = Thread.currentThread(); + // resubmit failed, so continue execution in this thread + } + } + } finally { + if (resetStateOnCompletion) { + allWrappers.remove(this); + unsetThread(); + } + } + } + + // Must be synchronized with interrupt() to avoid acquiring the thread reference as work completes + // and interrupting the next task run by this thread which may not originate from this view. + private synchronized void unsetThread() { + thread = null; + } + } +} diff --git a/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasQueuelessViewExecutor.java b/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasQueuelessViewExecutor.java new file mode 100644 index 00000000000..76e8ab3e840 --- /dev/null +++ b/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasQueuelessViewExecutor.java @@ -0,0 +1,267 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + */ + +package com.palantir.common.concurrent; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import javax.annotation.Nullable; + +import com.palantir.logsafe.Preconditions; + +/** + * Borrowed from jboss-threads. http://www.apache.org/licenses/LICENSE-2.0 + * https://github.com/jbossas/jboss-threads/blob/master/src/main/java/org/jboss/threads/QueuelessViewExecutor.java + * Changes have been contributed and merged, this may be replaced by the upstream ViewExecutor pending a release + * including https://github.com/jbossas/jboss-threads/pull/85. + * + *

Licensed under http://www.apache.org/licenses/LICENSE-2.0. + * https://github.com/jbossas/jboss-threads/blob/5df767f325214acf3f7b80fa5354411c4453e073/LICENSE.txt + * + *

A View Executor implementation which avoids lock contention in the common path. This allows us to provide + * references to the same underlying pool of threads to different consumers and utilize distinct instrumentation without + * duplicating resources. This implementation is optimized to avoid locking in cases where the view is not required + * queue work beyond a fixed number of permits, useful for cached executors for example. + * + * @author Carter Kozak + */ +@SuppressWarnings({"checkstyle:InnerAssignment", "checkstyle:HiddenField", "NullAway"}) +final class AtlasQueuelessViewExecutor extends AtlasViewExecutor { + private static final AtomicIntegerFieldUpdater stateUpdater = + AtomicIntegerFieldUpdater.newUpdater(AtlasQueuelessViewExecutor.class, "state"); + + private static final int SHUTDOWN_MASK = 1 << 31; + private static final int ACTIVE_COUNT_MASK = (1 << 31) - 1; + + private final Executor delegate; + private final int maxCount; + + private final Object shutdownLock = new Object(); + private final Set activeRunnables = ConcurrentHashMap.newKeySet(); + + /** + * State structure. + * + *

    + *
  • Bit 00..30: Number of active tasks (unsigned) + *
  • Bit 31: executor shutdown state; 0 = shutdown has not been requested + *
+ */ + @SuppressWarnings("unused") + private volatile int state = 0; + + private volatile boolean interrupted = false; + + AtlasQueuelessViewExecutor( + final Executor delegate, + final int maxCount, + @Nullable final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { + this.delegate = Preconditions.checkNotNull(delegate, "delegate"); + this.maxCount = maxCount; + this.setExceptionHandler(uncaughtExceptionHandler); + } + + @Override + public void shutdown(boolean interrupt) { + for (;;) { + int stateSnapshot = state; + if (isShutdown(stateSnapshot)) { + break; // nothing to do + } + int newState = stateSnapshot | SHUTDOWN_MASK; + if (compareAndSwapState(stateSnapshot, newState)) { + notifyWaitersIfTerminated(newState); + break; + } + } + if (interrupt) { + interrupted = true; + activeRunnables.forEach(AtlasQueuelessViewExecutorRunnable::interrupt); + } + } + + @Override + public List shutdownNow() { + shutdown(true); + // This implementation is built for cached executors which do not queue so it's impossible + // to have queued runnables. + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return isShutdown(state); + } + + private static boolean isShutdown(int state) { + return (state & SHUTDOWN_MASK) != 0; + } + + @Override + public boolean isTerminated() { + return isTerminated(state); + } + + private static boolean isTerminated(int state) { + return state == SHUTDOWN_MASK; + } + + private void notifyWaitersIfTerminated(int stateSnapshot) { + if (isTerminated(stateSnapshot)) { + synchronized (shutdownLock) { + shutdownLock.notifyAll(); + } + runTermination(); + } + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + long remainingNanos = unit.toNanos(timeout); + // Use the system precise clock to avoid issues resulting from time changes. + long now = System.nanoTime(); + synchronized (shutdownLock) { + while (!isTerminated()) { + remainingNanos -= Math.max(-now + (now = System.nanoTime()), 0L); + long remainingMillis = TimeUnit.MILLISECONDS.convert(remainingNanos, TimeUnit.NANOSECONDS); + if (remainingMillis <= 0) { + return false; + } + shutdownLock.wait(remainingMillis); + } + return true; + } + } + + @Override + public void execute(Runnable task) { + Preconditions.checkNotNull(task, "task"); + incrementActiveOrReject(); + boolean submittedTask = false; + try { + // When CachedExecutorViewRunnable allocation fails the active count must be reduced. + delegate.execute(new AtlasQueuelessViewExecutorRunnable(task)); + submittedTask = true; + } finally { + if (!submittedTask) { + decrementActive(); + } + } + } + + /** Increments the active task count, otherwise throws a {@link RejectedExecutionException}. */ + private void incrementActiveOrReject() { + int maxCount = this.maxCount; + for (;;) { + int stateSnapshot = state; + if (isShutdown(stateSnapshot)) { + throw new RejectedExecutionException("Executor has been shut down"); + } + + int activeCount = getActiveCount(stateSnapshot); + if (activeCount >= maxCount) { + throw new RejectedExecutionException("No executor queue space remaining"); + } + int updatedActiveCount = activeCount + 1; + if (compareAndSwapState(stateSnapshot, updatedActiveCount | (stateSnapshot & ~ACTIVE_COUNT_MASK))) { + return; + } + } + } + + private void decrementActive() { + for (;;) { + int stateSnapshot = state; + int updatedActiveCount = getActiveCount(stateSnapshot) - 1; + int newState = updatedActiveCount | (stateSnapshot & ~ACTIVE_COUNT_MASK); + if (compareAndSwapState(stateSnapshot, newState)) { + notifyWaitersIfTerminated(newState); + return; + } + } + } + + private static int getActiveCount(int state) { + return state & ACTIVE_COUNT_MASK; + } + + private boolean compareAndSwapState(int expected, int update) { + return stateUpdater.compareAndSet(this, expected, update); + } + + @Override + public String toString() { + return "AtlasQueuelessViewExecutor{delegate=" + delegate + ", state=" + state + '}'; + } + + private final class AtlasQueuelessViewExecutorRunnable implements Runnable { + + private final Runnable delegate; + + @Nullable + private volatile Thread thread; + + AtlasQueuelessViewExecutorRunnable(Runnable delegate) { + this.delegate = delegate; + } + + @Override + public void run() { + Thread currentThread = Thread.currentThread(); + Set runnables = activeRunnables; + this.thread = currentThread; + try { + runnables.add(this); + if (interrupted) { + // shutdownNow may have been invoked after this task was submitted + // but prior to activeRunnables.add(this). + currentThread.interrupt(); + } + delegate.run(); + } catch (Throwable t) { + // The uncaught exception handler should be called on the current thread in order to log + // using the updated thread name based on nameFunction. + uncaughtExceptionHandler().uncaughtException(thread, t); + } finally { + runnables.remove(this); + // Synchronization is important to avoid racily reading the current thread and interrupting + // it after this task completes and a task from another view has begun execution. + synchronized (this) { + this.thread = null; + } + decrementActive(); + } + } + + private Thread.UncaughtExceptionHandler uncaughtExceptionHandler() { + Thread.UncaughtExceptionHandler handler = getExceptionHandler(); + if (handler != null) { + return handler; + } + // If not uncaught exception handler is set, use the current threads existing handler if present. + // Otherwise use the default handler. + Thread.UncaughtExceptionHandler threadHandler = + Thread.currentThread().getUncaughtExceptionHandler(); + return threadHandler != null ? threadHandler : AtlasUncaughtExceptionHandler.INSTANCE; + } + + synchronized void interrupt() { + Thread taskThread = this.thread; + if (taskThread != null) { + taskThread.interrupt(); + } + } + + @Override + public String toString() { + return "AtlasQueuelessViewExecutorRunnable{" + delegate + '}'; + } + } +} diff --git a/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasRenamingExecutorService.java b/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasRenamingExecutorService.java new file mode 100644 index 00000000000..779ce061bfb --- /dev/null +++ b/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasRenamingExecutorService.java @@ -0,0 +1,94 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + */ + +package com.palantir.common.concurrent; + +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import com.palantir.logsafe.Preconditions; + +final class AtlasRenamingExecutorService extends AbstractExecutorService { + + private final ExecutorService delegate; + + private final Thread.UncaughtExceptionHandler handler; + + private final Supplier nameSupplier; + + AtlasRenamingExecutorService( + ExecutorService delegate, Thread.UncaughtExceptionHandler handler, Supplier nameSupplier) { + this.delegate = Preconditions.checkNotNull(delegate, "delegate"); + this.nameSupplier = Preconditions.checkNotNull(nameSupplier, "nameSupplier"); + this.handler = Preconditions.checkNotNull(handler, "handler"); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate.execute(new RenamingRunnable(command)); + } + + @Override + public String toString() { + return "AtlasRenamingExecutorService{delegate=" + delegate + '}'; + } + + final class RenamingRunnable implements Runnable { + + private final Runnable command; + + RenamingRunnable(Runnable command) { + this.command = command; + } + + @Override + public void run() { + final Thread currentThread = Thread.currentThread(); + final String originalName = currentThread.getName(); + currentThread.setName(nameSupplier.get()); + try { + command.run(); + } catch (Throwable t) { + handler.uncaughtException(currentThread, t); + } finally { + currentThread.setName(originalName); + } + } + } + + static Supplier threadNameSupplier(String name) { + AtomicLong index = new AtomicLong(); + ThreadLocal threadNameCache = ThreadLocal.withInitial(() -> name + "-" + index.getAndIncrement()); + return threadNameCache::get; + } +} diff --git a/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasUncaughtExceptionHandler.java b/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasUncaughtExceptionHandler.java new file mode 100644 index 00000000000..99ae454b1d8 --- /dev/null +++ b/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasUncaughtExceptionHandler.java @@ -0,0 +1,31 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.common.concurrent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +enum AtlasUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + INSTANCE; + + private static final Logger log = LoggerFactory.getLogger(AtlasUncaughtExceptionHandler.class); + + @Override + public void uncaughtException(@SuppressWarnings("unused") Thread thread, Throwable throwable) { + log.error("Uncaught Exception", throwable); + } +} diff --git a/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasViewExecutor.java b/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasViewExecutor.java new file mode 100644 index 00000000000..82227a08e8c --- /dev/null +++ b/commons-executors/src/main/java/com/palantir/common/concurrent/AtlasViewExecutor.java @@ -0,0 +1,146 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + */ + +package com.palantir.common.concurrent; + +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Executor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.palantir.logsafe.Preconditions; + +/** + * Borrowed from jboss-threads. http://www.apache.org/licenses/LICENSE-2.0 + * https://github.com/jbossas/jboss-threads/blob/master/src/main/java/org/jboss/threads/ViewExecutor.java Changes have + * been contributed and merged, this may be replaced by the upstream ViewExecutor pending a release including + * https://github.com/jbossas/jboss-threads/pull/85. + * + *

Licensed under http://www.apache.org/licenses/LICENSE-2.0. + * https://github.com/jbossas/jboss-threads/blob/5df767f325214acf3f7b80fa5354411c4453e073/LICENSE.txt + * + *

An executor service that is actually a "view" over another executor service. + */ +@SuppressWarnings("NullAway") +abstract class AtlasViewExecutor extends AbstractExecutorService { + + private static final Logger log = LoggerFactory.getLogger(AtlasViewExecutor.class); + private volatile Thread.UncaughtExceptionHandler handler; + private volatile Runnable terminationTask; + + // Intentionally package private to effectively seal the type. + AtlasViewExecutor() {} + + @Override + public final void shutdown() { + shutdown(false); + } + + abstract void shutdown(boolean interrupt); + + final Thread.UncaughtExceptionHandler getExceptionHandler() { + return handler; + } + + public final void setExceptionHandler(final Thread.UncaughtExceptionHandler value) { + Preconditions.checkNotNull(value, "handler"); + this.handler = value; + } + + final Runnable getTerminationTask() { + return terminationTask; + } + + final void setTerminationTask(final Runnable terminationTask) { + this.terminationTask = terminationTask; + } + + static Builder builder(Executor delegate) { + Preconditions.checkNotNull(delegate, "delegate"); + return new Builder(delegate); + } + + static final class Builder { + private final Executor delegate; + private short maxSize = 1; + private int queueLimit = Integer.MAX_VALUE; + private int queueInitialSize = 256; + private Thread.UncaughtExceptionHandler handler = AtlasUncaughtExceptionHandler.INSTANCE; + + Builder(final Executor delegate) { + this.delegate = delegate; + } + + int getMaxSize() { + return maxSize; + } + + Builder setMaxSize(final int value) { + Preconditions.checkArgument(value > 0, "maxSize must be positive"); + Preconditions.checkArgument(value <= Short.MAX_VALUE, "maxSize must not exceed " + Short.MAX_VALUE); + this.maxSize = (short) value; + return this; + } + + int getQueueLimit() { + return queueLimit; + } + + Builder setQueueLimit(final int value) { + Preconditions.checkArgument(value >= 0, "queueLimit must be non-negative"); + this.queueLimit = value; + return this; + } + + Executor getDelegate() { + return delegate; + } + + Thread.UncaughtExceptionHandler getUncaughtHandler() { + return handler; + } + + Builder setUncaughtHandler(final Thread.UncaughtExceptionHandler value) { + this.handler = value; + return this; + } + + int getQueueInitialSize() { + return queueInitialSize; + } + + Builder setQueueInitialSize(final int queueInitialSize) { + this.queueInitialSize = queueInitialSize; + return this; + } + + AtlasViewExecutor build() { + if (queueLimit == 0) { + return new AtlasQueuelessViewExecutor( + Preconditions.checkNotNull(delegate, "delegate"), maxSize, handler); + } + return new AtlasQueuedViewExecutor( + Preconditions.checkNotNull(delegate, "delegate"), maxSize, queueLimit, queueInitialSize, handler); + } + } + + protected final void runTermination() { + final Runnable task = AtlasViewExecutor.this.terminationTask; + AtlasViewExecutor.this.terminationTask = null; + if (task != null) { + try { + task.run(); + } catch (Throwable t) { + Thread.UncaughtExceptionHandler configuredHandler = handler; + if (configuredHandler != null) { + try { + handler.uncaughtException(Thread.currentThread(), t); + } catch (Throwable tt) { + log.debug("failed to invoke uncaught exception handler", tt); + } + } + } + } + } +} diff --git a/commons-executors/src/main/java/com/palantir/common/concurrent/NamedThreadFactory.java b/commons-executors/src/main/java/com/palantir/common/concurrent/NamedThreadFactory.java index a8e6e9e40ad..bc70dfa9174 100644 --- a/commons-executors/src/main/java/com/palantir/common/concurrent/NamedThreadFactory.java +++ b/commons-executors/src/main/java/com/palantir/common/concurrent/NamedThreadFactory.java @@ -69,6 +69,7 @@ public Thread newThread(Runnable runnable) { Thread thread = threadFactory.newThread(runnable); thread.setName(prefix + "-" + count.getAndIncrement()); thread.setDaemon(isDaemon); + thread.setUncaughtExceptionHandler(AtlasUncaughtExceptionHandler.INSTANCE); return thread; } diff --git a/commons-executors/src/main/java/com/palantir/common/concurrent/PTExecutors.java b/commons-executors/src/main/java/com/palantir/common/concurrent/PTExecutors.java index d0291cffc71..19725967ab4 100644 --- a/commons-executors/src/main/java/com/palantir/common/concurrent/PTExecutors.java +++ b/commons-executors/src/main/java/com/palantir/common/concurrent/PTExecutors.java @@ -36,16 +36,19 @@ import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CharMatcher; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.base.Suppliers; import com.google.common.util.concurrent.Runnables; import com.palantir.tracing.Tracers; import com.palantir.tritium.metrics.MetricRegistries; @@ -62,6 +65,10 @@ public final class PTExecutors { private static Logger log = LoggerFactory.getLogger(PTExecutors.class); + private static final Supplier SHARED_EXECUTOR = Suppliers.memoize(() -> + // Shared pool uses 60 second idle thread timeouts for greater reuse + Executors.newCachedThreadPool(new NamedThreadFactory("ptexecutors-shared", true))); + private static final String FILE_NAME_FOR_THIS_CLASS = PTExecutors.class.getSimpleName() + ".java"; /** @@ -109,7 +116,7 @@ public static ExecutorService newCachedThreadPool() { public static ExecutorService newCachedThreadPool(String name) { Preconditions.checkNotNull(name, "Name is required"); Preconditions.checkArgument(!name.isEmpty(), "Name must not be empty"); - return newCachedThreadPool(new NamedThreadFactory(name, true)); + return newCachedThreadPoolWithMaxThreads(Short.MAX_VALUE, name); } /** @@ -151,6 +158,27 @@ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory, i threadFactory), threadFactory); } + /** Specialized cached executor which throws + * {@link java.util.concurrent.RejectedExecutionException} once max-threads have been exceeded. + * + * If you have any doubt, this probably isn't what you're looking for. Best of luck, friend. + */ + @Beta + public static ExecutorService newCachedThreadPoolWithMaxThreads(int maxThreads, String name) { + Preconditions.checkNotNull(name, "Name is required"); + Preconditions.checkArgument(!name.isEmpty(), "Name must not be empty"); + Preconditions.checkArgument(maxThreads > 0, "Max threads must be positive"); + return MetricRegistries.instrument(SharedTaggedMetricRegistries.getSingleton(), + PTExecutors.wrap(name, new AtlasRenamingExecutorService(AtlasViewExecutor.builder(SHARED_EXECUTOR.get()) + .setMaxSize(Math.min(Short.MAX_VALUE, maxThreads)) + .setQueueLimit(0) + .setUncaughtHandler(AtlasUncaughtExceptionHandler.INSTANCE) + .build(), + AtlasUncaughtExceptionHandler.INSTANCE, + AtlasRenamingExecutorService.threadNameSupplier(name))), + name); + } + /** * Instruments the provided {@link ExecutorService} if the {@link ThreadFactory} is a {@link NamedThreadFactory}. */ @@ -176,10 +204,8 @@ private static ExecutorService tryInstrument(ExecutorService executorService, Th * @return the newly created thread pool * @throws IllegalArgumentException if numThreads <= 0 */ - public static ThreadPoolExecutor newFixedThreadPool(int numThreads) { - return newThreadPoolExecutor(numThreads, numThreads, - DEFAULT_THREAD_POOL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(), newNamedThreadFactory()); + public static ExecutorService newFixedThreadPool(int numThreads) { + return newFixedThreadPool(numThreads, computeBaseThreadName()); } /** @@ -219,8 +245,16 @@ public static ThreadPoolExecutor newFixedThreadPool(int numThreads, ThreadFactor * @return the newly created thread pool * @throws IllegalArgumentException if numThreads <= 0 */ - public static ThreadPoolExecutor newFixedThreadPool(int numThreads, String name) { - return newFixedThreadPool(numThreads, new NamedThreadFactory(name, true)); + public static ExecutorService newFixedThreadPool(int numThreads, String name) { + return MetricRegistries.instrument(SharedTaggedMetricRegistries.getSingleton(), + PTExecutors.wrap(name, new AtlasRenamingExecutorService(AtlasViewExecutor.builder(SHARED_EXECUTOR.get()) + .setMaxSize(numThreads) + .setQueueLimit(Integer.MAX_VALUE) + .setUncaughtHandler(AtlasUncaughtExceptionHandler.INSTANCE) + .build(), + AtlasUncaughtExceptionHandler.INSTANCE, + AtlasRenamingExecutorService.threadNameSupplier(name))), + name); } /** diff --git a/timelock-agent/src/main/java/com/palantir/atlasdb/timelock/paxos/TimeLockPaxosExecutors.java b/timelock-agent/src/main/java/com/palantir/atlasdb/timelock/paxos/TimeLockPaxosExecutors.java index 1ec42a99f1f..ef298de1973 100644 --- a/timelock-agent/src/main/java/com/palantir/atlasdb/timelock/paxos/TimeLockPaxosExecutors.java +++ b/timelock-agent/src/main/java/com/palantir/atlasdb/timelock/paxos/TimeLockPaxosExecutors.java @@ -16,27 +16,19 @@ package com.palantir.atlasdb.timelock.paxos; -import java.time.Duration; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; -import com.codahale.metrics.InstrumentedExecutorService; import com.codahale.metrics.MetricRegistry; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; -import com.palantir.common.concurrent.NamedThreadFactory; import com.palantir.common.concurrent.PTExecutors; final class TimeLockPaxosExecutors { @VisibleForTesting static final int MAXIMUM_POOL_SIZE = 100; - private static final Duration THREAD_KEEP_ALIVE = Duration.ofSeconds(5); - private static final int SINGLE_THREAD_FOR_MOSTLY_AUTOBATCHED_OPERATIONS = 1; - private TimeLockPaxosExecutors() { // no } @@ -69,15 +61,8 @@ static Map createBoundedExecutors( * Users of such an executor should be prepared to handle {@link java.util.concurrent.RejectedExecutionException}. */ static ExecutorService createBoundedExecutor(MetricRegistry metricRegistry, String useCase, int index) { - return new InstrumentedExecutorService( - PTExecutors.newThreadPoolExecutor( - SINGLE_THREAD_FOR_MOSTLY_AUTOBATCHED_OPERATIONS, - MAXIMUM_POOL_SIZE, - THREAD_KEEP_ALIVE.toMillis(), - TimeUnit.MILLISECONDS, - new SynchronousQueue<>(), - new NamedThreadFactory("timelock-executors-" + useCase, true)), - metricRegistry, - MetricRegistry.name(TimeLockPaxosExecutors.class, useCase, "executor-" + index)); + // metricRegistry is ignored because TExecutors.newCachedThreadPoolWithMaxThreads provides instrumentation. + return PTExecutors.newCachedThreadPoolWithMaxThreads( + MAXIMUM_POOL_SIZE, "timelock-executors-" + useCase + "-" + index); } } From 8e42db57fbd49bffcf1d731bfa3ef16c02d8357b Mon Sep 17 00:00:00 2001 From: Carter Kozak Date: Wed, 1 Jul 2020 07:59:41 -0400 Subject: [PATCH 2/2] changelog --- changelog/@unreleased/pr-4877.v2.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 changelog/@unreleased/pr-4877.v2.yml diff --git a/changelog/@unreleased/pr-4877.v2.yml b/changelog/@unreleased/pr-4877.v2.yml new file mode 100644 index 00000000000..d239e10e0e9 --- /dev/null +++ b/changelog/@unreleased/pr-4877.v2.yml @@ -0,0 +1,14 @@ +changes: + - type: improvement + improvement: + description: |- + PTExecutors simple cached and fixed executor factories (those which don't consume a ThreadFactory) use views + over a shared executor service to reduce total thread count and promote resource reuse. + links: + - https://github.com/palantir/atlasdb/pull/4877 + - type: break + break: + description: |- + `PTExecutors.newFixedThreadPool` overloads return `ExecutorService` instead of the concrete `ThreadPoolExecutor` type. + links: + - https://github.com/palantir/atlasdb/pull/4877 \ No newline at end of file