Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Borrow WC variants of executor views to reduce thread count and increase reuse #4877

Merged
merged 2 commits into from
Jul 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
Expand All @@ -33,7 +31,6 @@
import com.palantir.atlasdb.keyvalue.cassandra.async.client.creation.CqlClientFactory;
import com.palantir.atlasdb.keyvalue.cassandra.async.client.creation.DefaultCqlClientFactory;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.concurrent.NamedThreadFactory;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.tracing.Tracers;

Expand Down Expand Up @@ -88,10 +85,7 @@ public ExecutorService visit(CqlCapableConfig cqlCapableConfig) {
* @return a new dynamic thread pool with a thread keep alive time of 1 minute
*/
private static ExecutorService createThreadPool(int maxPoolSize) {
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
NamedThreadFactory threadFactory = new NamedThreadFactory("Atlas Cassandra Async KVS", false);

return PTExecutors.newThreadPoolExecutor(0, maxPoolSize, 1, TimeUnit.MINUTES, workQueue, threadFactory);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This executor configuration is highly suspect. At a glance I believe it will only ever have a single thread unless we manage to fill the LinkedBlockingQueue with Integer.MAX_VALUE elements.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We looked at metrics-platform and it turns out this executor is basically unused in practice...

return PTExecutors.newFixedThreadPool(maxPoolSize, "Atlas Cassandra Async KVS");
}

private ExecutorService tracingExecutorService(ExecutorService executorService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -89,10 +88,8 @@ protected static ExecutorService createFixedThreadPool(String threadNamePrefix,
* @param maxPoolSize maximum size of the pool
* @return a new fixed size thread pool with a keep alive time of 1 minute
*/
protected static ExecutorService createThreadPool(String threadNamePrefix, int corePoolSize, int maxPoolSize) {
return PTExecutors.newThreadPoolExecutor(corePoolSize, maxPoolSize,
1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(), new NamedThreadFactory(threadNamePrefix, false));
protected static ExecutorService createThreadPool(String threadNamePrefix, int _corePoolSize, int maxPoolSize) {
return PTExecutors.newFixedThreadPool(maxPoolSize, threadNamePrefix);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,12 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.immutables.value.Value;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
Expand All @@ -49,7 +44,6 @@
import com.palantir.atlasdb.http.NotCurrentLeaderExceptionMapper;
import com.palantir.atlasdb.util.AtlasDbMetrics;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.concurrent.NamedThreadFactory;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.common.streams.KeyedStream;
import com.palantir.conjure.java.api.config.service.UserAgent;
Expand All @@ -59,7 +53,6 @@
import com.palantir.leader.LeaderElectionServiceBuilder;
import com.palantir.leader.LeadershipObserver;
import com.palantir.leader.LocalPingableLeader;
import com.palantir.leader.PaxosLeaderElectionService;
import com.palantir.leader.PaxosLeadershipEventRecorder;
import com.palantir.leader.PingableLeader;
import com.palantir.paxos.ImmutableLeaderPingerContext;
Expand Down Expand Up @@ -239,20 +232,7 @@ private static <T> Map<T, ExecutorService> createExecutorsForService(
// TODO (jkong): Make the limits configurable.
// Current use cases tend to have not more than 10 (<< 100) inflight tasks under normal circumstances.
private static ExecutorService createExecutor(MetricsManager metricsManager, String useCase, int corePoolSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is private so we could probably remove the unused corepoolsize parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iirc it's passed through several methods and I wanted to keep the code churn to a minimum.

return new InstrumentedExecutorService(
PTExecutors.newThreadPoolExecutor(
corePoolSize,
100,
5000,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
daemonThreadFactory("atlas-leaders-election-" + useCase)),
metricsManager.getRegistry(),
MetricRegistry.name(PaxosLeaderElectionService.class, useCase, "executor"));
}

private static ThreadFactory daemonThreadFactory(String name) {
return new NamedThreadFactory(name, true);
return PTExecutors.newCachedThreadPoolWithMaxThreads(100, "atlas-leaders-election-" + useCase);
}

public static <T> List<T> createProxyAndLocalList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -121,7 +119,6 @@
import com.palantir.common.base.ClosableIterators;
import com.palantir.common.base.Throwables;
import com.palantir.common.collect.Maps2;
import com.palantir.common.concurrent.NamedThreadFactory;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.exception.PalantirSqlException;
import com.palantir.logsafe.Preconditions;
Expand Down Expand Up @@ -248,14 +245,8 @@ private DbKvs(ExecutorService executor,
this.getCandidateCellsForSweepingStrategy = getCandidateCellsForSweepingStrategy;
}

private static ThreadPoolExecutor newFixedThreadPool(int maxPoolSize) {
ThreadPoolExecutor pool = PTExecutors.newThreadPoolExecutor(maxPoolSize, maxPoolSize,
15L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("Atlas DbKvs reader", true /* daemon */));

pool.allowCoreThreadTimeOut(false);
return pool;
private static ExecutorService newFixedThreadPool(int maxPoolSize) {
return PTExecutors.newFixedThreadPool(maxPoolSize, "Atlas DbKvs reader");
}

private void init() {
Expand Down
14 changes: 14 additions & 0 deletions changelog/@unreleased/pr-4877.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
changes:
- type: improvement
improvement:
description: |-
PTExecutors simple cached and fixed executor factories (those which don't consume a ThreadFactory) use views
over a shared executor service to reduce total thread count and promote resource reuse.
links:
- https://github.com/palantir/atlasdb/pull/4877
- type: break
break:
description: |-
`PTExecutors.newFixedThreadPool` overloads return `ExecutorService` instead of the concrete `ThreadPoolExecutor` type.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: will cause breaks in large internal product. Can confirm not used by the common AtlasDB supporting libraries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm planning to fix the internal product once this is available.

links:
- https://github.com/palantir/atlasdb/pull/4877
Original file line number Diff line number Diff line change
Expand Up @@ -360,15 +360,12 @@ public static boolean assertNotOnSqlThread() {

private static final String SELECT_THREAD_NAME = "SQL select statement"; //$NON-NLS-1$
private static final String EXECUTE_THREAD_NAME = "SQL execute statement"; //$NON-NLS-1$
private static final int KEEP_SQL_THREAD_ALIVE_TIMEOUT = 3000; //3 seconds

// TODO (jkong): Should these be lazily initialized?
private static final Supplier<ExecutorService> DEFAULT_SELECT_EXECUTOR =
Suppliers.memoize(() -> PTExecutors.newCachedThreadPool(
new NamedThreadFactory(SELECT_THREAD_NAME, true), KEEP_SQL_THREAD_ALIVE_TIMEOUT));
Suppliers.memoize(() -> PTExecutors.newCachedThreadPool(SELECT_THREAD_NAME));
static final Supplier<ExecutorService> DEFAULT_EXECUTE_EXECUTOR =
Suppliers.memoize(() -> PTExecutors.newCachedThreadPool(
new NamedThreadFactory(EXECUTE_THREAD_NAME, true), KEEP_SQL_THREAD_ALIVE_TIMEOUT));
Suppliers.memoize(() -> PTExecutors.newCachedThreadPool(EXECUTE_THREAD_NAME));

private ExecutorService selectStatementExecutor;
private ExecutorService executeStatementExecutor;
Expand Down
Loading