Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Avoid creating thousands of get-ranges threads #5224

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.atlasdb.transaction.api.TransactionTask;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.concurrent.NamedThreadFactory;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.logsafe.Preconditions;
import java.util.concurrent.BlockingQueue;
import com.palantir.logsafe.SafeArg;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -92,35 +93,59 @@ public void clearTimestampCache() {

@SuppressWarnings("DangerousThreadPoolExecutorUsage")
ExecutorService createGetRangesExecutor(int numThreads) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>() {
ExecutorService executor = PTExecutors.newFixedThreadPool(
numThreads, AbstractTransactionManager.this.getClass().getSimpleName() + "-get-ranges");

return new AbstractExecutorService() {
private final AtomicInteger queueSizeEstimate = new AtomicInteger();
private final RateLimiter warningRateLimiter = RateLimiter.create(1);

@Override
public boolean offer(Runnable runnable) {
sanityCheckQueueSize();
return super.offer(runnable);
public void shutdown() {
executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
return executor.isShutdown();
}

@Override
public boolean isTerminated() {
return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
sanityCheckAndIncrementQueueSize();
executor.execute(() -> {
queueSizeEstimate.getAndDecrement();
command.run();
});
}

private void sanityCheckQueueSize() {
int currentSize = this.size();
private void sanityCheckAndIncrementQueueSize() {
int currentSize = queueSizeEstimate.getAndIncrement();
if (currentSize >= GET_RANGES_QUEUE_SIZE_WARNING_THRESHOLD && warningRateLimiter.tryAcquire()) {
log.warn(
"You have {} pending getRanges tasks. Please sanity check both your level "
+ "of concurrency and size of batched range requests. If necessary you can "
+ "increase the value of concurrentGetRangesThreadPoolSize to allow for a larger "
+ "thread pool.",
currentSize);
SafeArg.of("currentSize", currentSize));
}
}
};
return new ThreadPoolExecutor(
numThreads,
numThreads,
0L,
TimeUnit.MILLISECONDS,
workQueue,
new NamedThreadFactory(
AbstractTransactionManager.this.getClass().getSimpleName() + "-get-ranges"));
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-5224.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: fix
fix:
description: Avoid creating thousands of `serializabletransactionmanager-get-ranges` threads by using the efficient PTExecutors ExecutorService factory methods.
links:
- https://github.com/palantir/atlasdb/pull/5224