diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java index 60521224371..07a2202efc0 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/AbstractKeyValueService.java @@ -23,7 +23,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.Futures; import com.palantir.atlasdb.AtlasDbConstants; import com.palantir.atlasdb.AtlasDbPerformanceConstants; import com.palantir.atlasdb.keyvalue.api.BatchColumnRangeSelection; @@ -38,16 +37,14 @@ import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.api.Value; import com.palantir.common.base.ClosableIterator; +import com.palantir.common.concurrent.BlockingWorkerPool; import com.palantir.common.concurrent.PTExecutors; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; @SuppressFBWarnings("SLF4J_ILLEGAL_PASSED_CLASS") public abstract class AbstractKeyValueService implements KeyValueService { @@ -57,14 +54,20 @@ public abstract class AbstractKeyValueService implements KeyValueService { .maximumWeight(1_000_000) .build(AbstractKeyValueService::computeInternalTableName); - protected ExecutorService executor; + protected final ExecutorService executor; + protected final int executorQosSize; /** * Note: This takes ownership of the given executor. It will be shutdown when the key * value service is closed. */ public AbstractKeyValueService(ExecutorService executor) { + this(executor, 0); + } + + public AbstractKeyValueService(ExecutorService executor, int executorQosSize) { this.executor = executor; + this.executorQosSize = executorQosSize; } /** @@ -189,16 +192,17 @@ public void close() { @Override public void truncateTables(final Set tableRefs) { - List> futures = new ArrayList<>(); - for (final TableReference tableRef : tableRefs) { - futures.add(executor.submit(() -> { - truncateTable(tableRef); - return null; - })); - } - - for (Future future : futures) { - Futures.getUnchecked(future); + BlockingWorkerPool pool = new BlockingWorkerPool<>(executor, executorQosSize); + try { + for (final TableReference tableRef : tableRefs) { + pool.submitTask(() -> { + truncateTable(tableRef); + }); + } + pool.waitForSubmittedTasks(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } } diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/KeyValueServices.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/KeyValueServices.java index 484296016ac..5627247f7fc 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/KeyValueServices.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/keyvalue/impl/KeyValueServices.java @@ -114,7 +114,7 @@ public static void getFirstBatchForRangeUsingGetRange( final long timestamp, int maxConcurrentRequests) { final Map, byte[]>> ret = new ConcurrentHashMap<>(); - BlockingWorkerPool pool = new BlockingWorkerPool(executor, maxConcurrentRequests); + BlockingWorkerPool pool = new BlockingWorkerPool<>(executor, maxConcurrentRequests); try { for (final RangeRequest request : rangeRequests) { pool.submitTask(() -> getFirstBatchForRangeUsingGetRange(kv, tableRef, request, timestamp, ret)); diff --git a/atlasdb-commons/src/main/java/com/palantir/common/concurrent/BlockingWorkerPool.java b/atlasdb-commons/src/main/java/com/palantir/common/concurrent/BlockingWorkerPool.java index 84967639e5f..f59ac894c9e 100644 --- a/atlasdb-commons/src/main/java/com/palantir/common/concurrent/BlockingWorkerPool.java +++ b/atlasdb-commons/src/main/java/com/palantir/common/concurrent/BlockingWorkerPool.java @@ -17,6 +17,7 @@ import com.google.common.base.Throwables; import com.palantir.logsafe.Preconditions; +import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; @@ -24,14 +25,21 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -public class BlockingWorkerPool { - private final CompletionService service; +public class BlockingWorkerPool { + private final CompletionService service; private final int concurrentTaskLimit; private final AtomicInteger currentTaskCount = new AtomicInteger(); + /** + * Construct a BlockingWorkerPool. + * + * @param executor The ExecutorService to use for tasks + * @param concurrentTaskLimit The limit for concurrently running tasks. If this value is 0 or negative, then no + * limit is enforced. + */ public BlockingWorkerPool(ExecutorService executor, int concurrentTaskLimit) { - this.service = new ExecutorCompletionService(executor); + this.service = new ExecutorCompletionService<>(executor); this.concurrentTaskLimit = concurrentTaskLimit; } @@ -47,17 +55,45 @@ public synchronized void submitTask(Runnable task) throws InterruptedException { waitForAvailability(); Preconditions.checkState( - currentTaskCount.get() < concurrentTaskLimit, "currentTaskCount must be less than currentTaskLimit"); + concurrentTaskLimit <= 0 || currentTaskCount.get() < concurrentTaskLimit, + "currentTaskCount must be less than currentTaskLimit"); service.submit(task, null); currentTaskCount.incrementAndGet(); } + /** + * Submits a callable task to the pool. Behaves the same as {@link #submitTask(Runnable)}. + */ + public synchronized Future submitCallable(Callable task) throws InterruptedException { + waitForAvailability(); + + Preconditions.checkState( + concurrentTaskLimit <= 0 || currentTaskCount.get() < concurrentTaskLimit, + "currentTaskCount must be less than currentTaskLimit"); + Future result = service.submit(task); + currentTaskCount.incrementAndGet(); + return result; + } + + /** + * Same as {@link #submitCallable(Callable)} but will wrap any InterruptedException in a RuntimeException. + * If an InterruptedException was encountered, the thread interrupt flag will still be set. + */ + public Future submitCallableUnchecked(Callable task) { + try { + return submitCallable(task); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + private void waitForSingleTask() throws InterruptedException { if (currentTaskCount.get() <= 0) { return; } - Future f = service.take(); + Future f = service.take(); currentTaskCount.decrementAndGet(); try { f.get(); @@ -81,9 +117,10 @@ public synchronized void waitForSubmittedTasks() throws InterruptedException { /** * Waits until the number of tasks drops below the concurrent task limit. + * If the limit is 0 or negative, then there is no enforced limit and this will not wait. */ public synchronized void waitForAvailability() throws InterruptedException { - if (currentTaskCount.get() >= concurrentTaskLimit) { + if (concurrentTaskLimit > 0 && currentTaskCount.get() >= concurrentTaskLimit) { waitForSingleTask(); } } diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DdlConfig.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DdlConfig.java index 69092026f06..aea62c61a93 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DdlConfig.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/DdlConfig.java @@ -42,6 +42,18 @@ public int poolSize() { return 64; } + /** + * If set to a positive number, this is the maximum number of concurrent tasks that may execute from a single + * parallel operation. The overall max concurrency is still limited by the {@link #poolSize()}. Parallel operations + * include multiPut, truncateTables, and getRanges (postgres only). + *

+ * If this is 0 or negative, then no limit is enforced. + */ + @Value.Default + public int poolQosSize() { + return 0; + } + @Value.Default public int fetchBatchSize() { return 256; diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java index 30239d5d4f1..98f92a94816 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/DbKvs.java @@ -96,6 +96,7 @@ import com.palantir.common.base.ClosableIterators; import com.palantir.common.base.Throwables; import com.palantir.common.collect.Maps2; +import com.palantir.common.concurrent.BlockingWorkerPool; import com.palantir.common.concurrent.PTExecutors; import com.palantir.common.concurrent.SharedFixedExecutors; import com.palantir.exception.PalantirSqlException; @@ -132,9 +133,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -217,7 +216,8 @@ private static DbKvs createPostgres( config, tableFactory, connections, - new ParallelTaskRunner(newFixedThreadPool(config.poolSize()), config.fetchBatchSize()), + new ParallelTaskRunner( + newFixedThreadPool(config.poolSize()), config.fetchBatchSize(), config.poolQosSize()), (conns, tbl, ids) -> Collections.emptyMap(), // no overflow on postgres new PostgresGetRange(prefixedTableNames, connections, tableMetadataCache), new DbKvsGetCandidateCellsForSweeping(cellTsPairLoader)); @@ -266,7 +266,7 @@ private DbKvs( OverflowValueLoader overflowValueLoader, DbKvsGetRange getRangeStrategy, DbKvsGetCandidateCellsForSweeping getCandidateCellsForSweepingStrategy) { - super(executor); + super(executor, config.poolQosSize()); this.config = config; this.dbTables = dbTables; this.connections = connections; @@ -451,20 +451,15 @@ public void multiPut(Map> valuesByTa } } - List> futures; + BlockingWorkerPool pool = new BlockingWorkerPool<>(executor, executorQosSize); try { - futures = executor.invokeAll(callables); - } catch (InterruptedException e) { - throw Throwables.throwUncheckedException(e); - } - for (Future future : futures) { - try { - future.get(); - } catch (InterruptedException e) { - throw Throwables.throwUncheckedException(e); - } catch (ExecutionException e) { - throw Throwables.rewrapAndThrowUncheckedException(e.getCause()); + for (Callable callable : callables) { + pool.submitCallable(callable); } + pool.waitForSubmittedTasks(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } } diff --git a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/batch/ParallelTaskRunner.java b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/batch/ParallelTaskRunner.java index 2dba4ec4f86..f2b990381d5 100644 --- a/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/batch/ParallelTaskRunner.java +++ b/atlasdb-dbkvs/src/main/java/com/palantir/atlasdb/keyvalue/dbkvs/impl/batch/ParallelTaskRunner.java @@ -16,6 +16,7 @@ package com.palantir.atlasdb.keyvalue.dbkvs.impl.batch; import com.google.common.base.Throwables; +import com.palantir.common.concurrent.BlockingWorkerPool; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; @@ -26,10 +27,28 @@ public class ParallelTaskRunner implements BatchingTaskRunner { private final ExecutorService executor; private final int batchSize; + private final int executorQosSize; public ParallelTaskRunner(ExecutorService executor, int batchSize) { + this(executor, batchSize, 0); + } + + /** + * Constructs a ParallelTaskRunner. + * + * @param executor the ExecutorService to use for running tasks + * @param batchSize the batchSize to pass into the {@link BatchingStrategy}. + * @param executorQosSize When set to a positive value, this is the maximum number of concurrent tasks that may run + * spawning from a single thread. Multiple threads may each call {@link #runTask} and each + * may have up to this number of tasks run concurrently (independently). If the executor has + * a bounded number of threads, that limit is still applies which may result in lower + * concurrency than this value. If this is set to zero or a negative number, then there is + * no limit for the number of concurrent tasks which originate from the same thread. + */ + public ParallelTaskRunner(ExecutorService executor, int batchSize, int executorQosSize) { this.executor = executor; this.batchSize = batchSize; + this.executorQosSize = executorQosSize; } @Override @@ -40,8 +59,9 @@ public OutT runTask( Function task) { Iterable batches = batchingStrategy.partitionIntoBatches(input, batchSize); List> futures = new ArrayList<>(); + BlockingWorkerPool pool = new BlockingWorkerPool<>(executor, executorQosSize); for (InT batch : batches) { - Future future = executor.submit(() -> task.apply(batch)); + Future future = pool.submitCallableUnchecked(() -> task.apply(batch)); futures.add(future); } OutT result = resultAccumulatingStrategy.createEmptyResult(); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/table/common/RangeVisitor.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/table/common/RangeVisitor.java index a3bfac723e7..810d5e1975d 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/table/common/RangeVisitor.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/table/common/RangeVisitor.java @@ -99,7 +99,7 @@ public RangeVisitor setProgressCounter(AtomicLong newCounter) { } public long visit(final Visitor visitor) throws InterruptedException { - BlockingWorkerPool pool = new BlockingWorkerPool(exec, threadCount); + BlockingWorkerPool pool = new BlockingWorkerPool<>(exec, threadCount); for (final MutableRange range : getRanges()) { pool.submitTask(() -> visitRange(visitor, range)); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/table/common/TableTasks.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/table/common/TableTasks.java index b230d0e3fef..f654efec5ef 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/table/common/TableTasks.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/table/common/TableTasks.java @@ -525,7 +525,7 @@ private static class InterruptibleRangeExecutor { } void executeTask(String taskName, InterruptibleRangeTask task) throws InterruptedException { - BlockingWorkerPool pool = new BlockingWorkerPool(exec, threadCount); + BlockingWorkerPool pool = new BlockingWorkerPool<>(exec, threadCount); for (final MutableRange range : getRanges(threadCount, batchSize)) { if (Thread.currentThread().isInterrupted()) { log.info( diff --git a/changelog/@unreleased/pr-6773.v2.yml b/changelog/@unreleased/pr-6773.v2.yml new file mode 100644 index 00000000000..9f34aa99c46 --- /dev/null +++ b/changelog/@unreleased/pr-6773.v2.yml @@ -0,0 +1,8 @@ +type: feature +feature: + description: Adds optional poolQosSize config to DBKvs DdlConfig which controls + the max concurrency of individual parallel operations such as multiput, truncateTables, + and read operations on postgres. The default value is no limit, so behavior is + not changing. + links: + - https://github.com/palantir/atlasdb/pull/6773