From 3d82d2d9e7870e1aa14aa6c0a849e764a51547e3 Mon Sep 17 00:00:00 2001 From: wangzhi <1277975348@qq.com> Date: Tue, 7 Jun 2022 12:17:52 +0800 Subject: [PATCH] =?UTF-8?q?HBASE-27093=20AsyncNonMetaRegionLocator?= =?UTF-8?q?=EF=BC=9Aput=20Complete=20CompletableFuture=20outside=20lock=20?= =?UTF-8?q?block=20(#4496)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Duo Zhang (cherry picked from commit 176c43c5ad1aab01eb2d2b05c0cb90132e8d19b1) --- .../client/AsyncNonMetaRegionLocator.java | 54 +++++++++++++++---- 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index df6c6b753ed7..0009415142c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -35,10 +35,12 @@ import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -122,6 +124,26 @@ public boolean equals(Object obj) { } } + private static final class RegionLocationsFutureResult { + private final CompletableFuture future; + private final RegionLocations result; + private final Throwable e; + + public RegionLocationsFutureResult(CompletableFuture future, + RegionLocations result, Throwable e) { + this.future = future; + this.result = result; + this.e = e; + } + + public void complete() { + if (e != null) { + future.completeExceptionally(e); + } + future.complete(result); + } + } + private static final class TableCache { private final ConcurrentNavigableMap cache = @@ -148,18 +170,20 @@ public Optional getCandidate() { return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst(); } - public void clearCompletedRequests(RegionLocations locations) { + public List clearCompletedRequests(RegionLocations locations) { + List futureResultList = new ArrayList<>(); for (Iterator>> iter = allRequests.entrySet().iterator(); iter.hasNext();) { Map.Entry> entry = iter.next(); - if (tryComplete(entry.getKey(), entry.getValue(), locations)) { + if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) { iter.remove(); } } + return futureResultList; } private boolean tryComplete(LocateRequest req, CompletableFuture future, - RegionLocations locations) { + RegionLocations locations, List futureResultList) { if (future.isDone()) { return true; } @@ -185,7 +209,7 @@ private boolean tryComplete(LocateRequest req, CompletableFuture futureResultList = new ArrayList<>(); synchronized (tableCache) { tableCache.pendingRequests.remove(req); - tableCache.clearCompletedRequests(addedLocs); + futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs)); // Remove a complete locate request in a synchronized block, so the table cache must have // quota to send a candidate request. toSend = tableCache.getCandidate(); toSend.ifPresent(r -> tableCache.send(r)); } + futureResultList.forEach(RegionLocationsFutureResult::complete); toSend.ifPresent(r -> locateInMeta(tableName, r)); } else { // we meet an error assert error != null; + List futureResultList = new ArrayList<>(); synchronized (tableCache) { tableCache.pendingRequests.remove(req); // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have // already retried several times - CompletableFuture future = tableCache.allRequests.remove(req); + CompletableFuture future = tableCache.allRequests.remove(req); if (future != null) { - future.completeExceptionally(error); + futureResultList.add(new RegionLocationsFutureResult(future, null, error)); } - tableCache.clearCompletedRequests(null); + futureResultList.addAll(tableCache.clearCompletedRequests(null)); // Remove a complete locate request in a synchronized block, so the table cache must have // quota to send a candidate request. toSend = tableCache.getCandidate(); toSend.ifPresent(r -> tableCache.send(r)); } + futureResultList.forEach(RegionLocationsFutureResult::complete); toSend.ifPresent(r -> locateInMeta(tableName, r)); } } @@ -543,9 +571,11 @@ public void onNext(Result[] results, ScanController controller) { continue; } RegionLocations addedLocs = addToCache(tableCache, locs); + List futureResultList = new ArrayList<>(); synchronized (tableCache) { - tableCache.clearCompletedRequests(addedLocs); + futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs)); } + futureResultList.forEach(RegionLocationsFutureResult::complete); } } } @@ -677,12 +707,16 @@ void clearCache(TableName tableName) { if (tableCache == null) { return; } + List futureResultList = new ArrayList<>(); synchronized (tableCache) { if (!tableCache.allRequests.isEmpty()) { IOException error = new IOException("Cache cleared"); - tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error)); + tableCache.allRequests.values().forEach(f -> { + futureResultList.add(new RegionLocationsFutureResult(f, null, error)); + }); } } + futureResultList.forEach(RegionLocationsFutureResult::complete); conn.getConnectionMetrics() .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size())); }