This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

add QoS config for DBKvs parallel operations #6773

Merged: 2 commits, Oct 16, 2023
@@ -23,7 +23,6 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.AtlasDbPerformanceConstants;
import com.palantir.atlasdb.keyvalue.api.BatchColumnRangeSelection;
@@ -38,16 +37,14 @@
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.api.Value;
import com.palantir.common.base.ClosableIterator;
import com.palantir.common.concurrent.BlockingWorkerPool;
import com.palantir.common.concurrent.PTExecutors;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

@SuppressFBWarnings("SLF4J_ILLEGAL_PASSED_CLASS")
public abstract class AbstractKeyValueService implements KeyValueService {
@@ -57,14 +54,20 @@ public abstract class AbstractKeyValueService implements KeyValueService {
.maximumWeight(1_000_000)
.build(AbstractKeyValueService::computeInternalTableName);

protected ExecutorService executor;
protected final ExecutorService executor;
protected final int executorQosSize;

/**
* Note: This takes ownership of the given executor. It will be shutdown when the key
* value service is closed.
*/
public AbstractKeyValueService(ExecutorService executor) {
this(executor, 0);
}

public AbstractKeyValueService(ExecutorService executor, int executorQosSize) {
this.executor = executor;
this.executorQosSize = executorQosSize;
}

/**
@@ -189,16 +192,17 @@ public void close() {

@Override
public void truncateTables(final Set<TableReference> tableRefs) {
List<Future<Void>> futures = new ArrayList<>();
for (final TableReference tableRef : tableRefs) {
futures.add(executor.submit(() -> {
truncateTable(tableRef);
return null;
}));
}

for (Future<Void> future : futures) {
Futures.getUnchecked(future);
BlockingWorkerPool<Void> pool = new BlockingWorkerPool<>(executor, executorQosSize);
try {
for (final TableReference tableRef : tableRefs) {
pool.submitTask(() -> {
truncateTable(tableRef);
});
}
pool.waitForSubmittedTasks();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

@@ -114,7 +114,7 @@ public static void getFirstBatchForRangeUsingGetRange(
final long timestamp,
int maxConcurrentRequests) {
final Map<RangeRequest, TokenBackedBasicResultsPage<RowResult<Value>, byte[]>> ret = new ConcurrentHashMap<>();
BlockingWorkerPool pool = new BlockingWorkerPool(executor, maxConcurrentRequests);
BlockingWorkerPool<Void> pool = new BlockingWorkerPool<>(executor, maxConcurrentRequests);
try {
for (final RangeRequest request : rangeRequests) {
pool.submitTask(() -> getFirstBatchForRangeUsingGetRange(kv, tableRef, request, timestamp, ret));
@@ -17,21 +17,29 @@

import com.google.common.base.Throwables;
import com.palantir.logsafe.Preconditions;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingWorkerPool {
private final CompletionService<Void> service;
public class BlockingWorkerPool<T> {
private final CompletionService<T> service;
private final int concurrentTaskLimit;

private final AtomicInteger currentTaskCount = new AtomicInteger();

/**
* Construct a BlockingWorkerPool.
*
* @param executor The ExecutorService to use for tasks
* @param concurrentTaskLimit The limit for concurrently running tasks. If this value is 0 or negative, then no
* limit is enforced.
*/
public BlockingWorkerPool(ExecutorService executor, int concurrentTaskLimit) {
this.service = new ExecutorCompletionService<Void>(executor);
this.service = new ExecutorCompletionService<>(executor);
this.concurrentTaskLimit = concurrentTaskLimit;
}

@@ -47,17 +55,45 @@ public synchronized void submitTask(Runnable task) throws InterruptedException {
waitForAvailability();

Preconditions.checkState(
currentTaskCount.get() < concurrentTaskLimit, "currentTaskCount must be less than currentTaskLimit");
concurrentTaskLimit <= 0 || currentTaskCount.get() < concurrentTaskLimit,
"currentTaskCount must be less than concurrentTaskLimit");
service.submit(task, null);
currentTaskCount.incrementAndGet();
}

/**
* Submits a callable task to the pool. Behaves the same as {@link #submitTask(Runnable)}.
*/
public synchronized Future<T> submitCallable(Callable<T> task) throws InterruptedException {
waitForAvailability();

Preconditions.checkState(
concurrentTaskLimit <= 0 || currentTaskCount.get() < concurrentTaskLimit,
"currentTaskCount must be less than concurrentTaskLimit");
Future<T> result = service.submit(task);
currentTaskCount.incrementAndGet();
return result;
}

/**
* Same as {@link #submitCallable(Callable)} but will wrap any InterruptedException in a RuntimeException.
* If an InterruptedException was encountered, the thread interrupt flag will still be set.
*/
public Future<T> submitCallableUnchecked(Callable<T> task) {
try {
return submitCallable(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

private void waitForSingleTask() throws InterruptedException {
if (currentTaskCount.get() <= 0) {
return;
}

Future<Void> f = service.take();
Future<T> f = service.take();
currentTaskCount.decrementAndGet();
try {
f.get();
@@ -81,9 +117,10 @@ public synchronized void waitForSubmittedTasks() throws InterruptedException {

/**
* Waits until the number of tasks drops below the concurrent task limit.
* If the limit is 0 or negative, then there is no enforced limit and this will not wait.
*/
public synchronized void waitForAvailability() throws InterruptedException {
if (currentTaskCount.get() >= concurrentTaskLimit) {
if (concurrentTaskLimit > 0 && currentTaskCount.get() >= concurrentTaskLimit) {
waitForSingleTask();
}
}
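For illustration, here is a minimal, self-contained sketch of how the reworked generic pool can be used. The class name, executor size, limit of 2, and task bodies below are arbitrary placeholders rather than code from this PR; only the BlockingWorkerPool methods shown in the hunks above are assumed.

import com.palantir.common.concurrent.BlockingWorkerPool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class BlockingWorkerPoolExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(8);
        // At most two tasks submitted through this pool run at once; a limit of 0 or a
        // negative number would mean "no limit", per the constructor javadoc above.
        BlockingWorkerPool<Integer> pool = new BlockingWorkerPool<>(executor, 2);

        // submitTask blocks the caller whenever two tasks are already in flight.
        for (int i = 0; i < 5; i++) {
            int taskNumber = i;
            pool.submitTask(() -> System.out.println("ran task " + taskNumber));
        }

        // submitCallable behaves the same but also hands back a Future for the result;
        // submitCallableUnchecked wraps InterruptedException and re-sets the interrupt flag.
        List<Future<Integer>> results = new ArrayList<>();
        results.add(pool.submitCallable(() -> 40));
        results.add(pool.submitCallableUnchecked(() -> 2));

        // Blocks until every task submitted above has completed.
        pool.waitForSubmittedTasks();
        for (Future<Integer> result : results) {
            System.out.println("result: " + result.get());
        }
        executor.shutdown();
    }
}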
@@ -42,6 +42,18 @@ public int poolSize() {
return 64;
}

/**
* If set to a positive number, this is the maximum number of concurrent tasks that may execute from a single
* parallel operation. The overall max concurrency is still limited by the {@link #poolSize()}. Parallel operations
* include multiPut, truncateTables, and getRanges (postgres only).
* <p>
* If this is 0 or negative, then no limit is enforced.
*/
@Value.Default
public int poolQosSize() {
return 0;
Review comment (Contributor Author): If we're okay explicitly changing behavior, we could probably make this default to poolSize() instead of being hard-coded at 0, as that is probably a more reasonable default than being unlimited.

}

@Value.Default
public int fetchBatchSize() {
return 256;
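For reference, the alternative default floated in the review comment above would look roughly like the following. This is only a sketch of that suggestion, not part of the merged change:

// Suggested alternative (not merged): bound each parallel operation by the pool size by
// default, instead of leaving per-operation concurrency unlimited.
@Value.Default
public int poolQosSize() {
    return poolSize();
}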
@@ -96,6 +96,7 @@
import com.palantir.common.base.ClosableIterators;
import com.palantir.common.base.Throwables;
import com.palantir.common.collect.Maps2;
import com.palantir.common.concurrent.BlockingWorkerPool;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.common.concurrent.SharedFixedExecutors;
import com.palantir.exception.PalantirSqlException;
@@ -132,9 +133,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -217,7 +216,8 @@ private static DbKvs createPostgres(
config,
tableFactory,
connections,
new ParallelTaskRunner(newFixedThreadPool(config.poolSize()), config.fetchBatchSize()),
new ParallelTaskRunner(
newFixedThreadPool(config.poolSize()), config.fetchBatchSize(), config.poolQosSize()),
(conns, tbl, ids) -> Collections.emptyMap(), // no overflow on postgres
new PostgresGetRange(prefixedTableNames, connections, tableMetadataCache),
new DbKvsGetCandidateCellsForSweeping(cellTsPairLoader));
@@ -266,7 +266,7 @@ private DbKvs(
OverflowValueLoader overflowValueLoader,
DbKvsGetRange getRangeStrategy,
DbKvsGetCandidateCellsForSweeping getCandidateCellsForSweepingStrategy) {
super(executor);
super(executor, config.poolQosSize());
this.config = config;
this.dbTables = dbTables;
this.connections = connections;
@@ -451,20 +451,15 @@ public void multiPut(Map<TableReference, ? extends Map<Cell, byte[]>> valuesByTa
}
}

List<Future<Void>> futures;
BlockingWorkerPool<Void> pool = new BlockingWorkerPool<>(executor, executorQosSize);
try {
futures = executor.invokeAll(callables);
} catch (InterruptedException e) {
throw Throwables.throwUncheckedException(e);
}
for (Future<Void> future : futures) {
try {
future.get();
} catch (InterruptedException e) {
throw Throwables.throwUncheckedException(e);
} catch (ExecutionException e) {
throw Throwables.rewrapAndThrowUncheckedException(e.getCause());
for (Callable<Void> callable : callables) {
pool.submitCallable(callable);
}
pool.waitForSubmittedTasks();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

@@ -16,6 +16,7 @@
package com.palantir.atlasdb.keyvalue.dbkvs.impl.batch;

import com.google.common.base.Throwables;
import com.palantir.common.concurrent.BlockingWorkerPool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -26,10 +27,28 @@
public class ParallelTaskRunner implements BatchingTaskRunner {
private final ExecutorService executor;
private final int batchSize;
private final int executorQosSize;

public ParallelTaskRunner(ExecutorService executor, int batchSize) {
this(executor, batchSize, 0);
}

/**
* Constructs a ParallelTaskRunner.
*
* @param executor the ExecutorService to use for running tasks
* @param batchSize the batchSize to pass into the {@link BatchingStrategy}.
* @param executorQosSize When set to a positive value, this is the maximum number of tasks that may run
* concurrently when spawned from a single thread. Multiple threads may each call {@link #runTask} and
* each may have up to this number of tasks running concurrently (independently). If the executor has
* a bounded number of threads, that limit still applies and may result in lower concurrency than this
* value. If this is set to zero or a negative number, then there is no limit on the number of
* concurrent tasks originating from the same thread.
*/
public ParallelTaskRunner(ExecutorService executor, int batchSize, int executorQosSize) {
this.executor = executor;
this.batchSize = batchSize;
this.executorQosSize = executorQosSize;
}

@Override
@@ -40,8 +59,9 @@ public <InT, OutT> OutT runTask(
Function<InT, OutT> task) {
Iterable<? extends InT> batches = batchingStrategy.partitionIntoBatches(input, batchSize);
List<Future<OutT>> futures = new ArrayList<>();
BlockingWorkerPool<OutT> pool = new BlockingWorkerPool<>(executor, executorQosSize);
for (InT batch : batches) {
Future<OutT> future = executor.submit(() -> task.apply(batch));
Future<OutT> future = pool.submitCallableUnchecked(() -> task.apply(batch));
futures.add(future);
}
OutT result = resultAccumulatingStrategy.createEmptyResult();
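As a brief usage sketch of the new three-argument constructor: the wrapper class name, executor sizing, and QoS value of 8 below are illustrative assumptions; only the ParallelTaskRunner constructor and package shown in this diff are taken as given.

import com.palantir.atlasdb.keyvalue.dbkvs.impl.batch.ParallelTaskRunner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class ParallelTaskRunnerWiring {
    public static ParallelTaskRunner createRunner() {
        ExecutorService executor = Executors.newFixedThreadPool(64);
        // At most 8 batches spawned by a single runTask call execute concurrently;
        // the 64-thread executor still bounds total concurrency across all callers.
        return new ParallelTaskRunner(executor, /* batchSize= */ 256, /* executorQosSize= */ 8);
    }
}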
@@ -99,7 +99,7 @@ public RangeVisitor setProgressCounter(AtomicLong newCounter) {
}

public long visit(final Visitor visitor) throws InterruptedException {
BlockingWorkerPool pool = new BlockingWorkerPool(exec, threadCount);
BlockingWorkerPool<Void> pool = new BlockingWorkerPool<>(exec, threadCount);
for (final MutableRange range : getRanges()) {
pool.submitTask(() -> visitRange(visitor, range));
}
@@ -525,7 +525,7 @@ private static class InterruptibleRangeExecutor {
}

void executeTask(String taskName, InterruptibleRangeTask task) throws InterruptedException {
BlockingWorkerPool pool = new BlockingWorkerPool(exec, threadCount);
BlockingWorkerPool<Void> pool = new BlockingWorkerPool<>(exec, threadCount);
for (final MutableRange range : getRanges(threadCount, batchSize)) {
if (Thread.currentThread().isInterrupted()) {
log.info(
8 changes: 8 additions & 0 deletions changelog/@unreleased/pr-6773.v2.yml
@@ -0,0 +1,8 @@
type: feature
feature:
  description: Adds an optional poolQosSize config to DBKvs DdlConfig, which controls
    the max concurrency of individual parallel operations such as multiPut, truncateTables,
    and read operations on postgres. The default value is no limit, so existing behavior
    is unchanged.
  links:
  - https://github.com/palantir/atlasdb/pull/6773
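To make the changelog entry concrete: with an Immutables-style implementation of DdlConfig, enabling the new limit might look like the sketch below. The concrete class name ImmutableExampleDdlConfig is an assumption made for illustration (no implementation appears in this diff); only the poolSize and poolQosSize properties come from the code above.

// Hypothetical builder for a concrete DdlConfig implementation (class name assumed).
// poolSize bounds the shared executor; poolQosSize additionally caps each individual
// parallel operation (multiPut, truncateTables, and postgres getRanges).
DdlConfig ddlConfig = ImmutableExampleDdlConfig.builder()
        .poolSize(64)
        .poolQosSize(16)
        .build();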