diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index 2692a0b776b0..bcb85d24ecdc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -35,10 +35,12 @@ import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -122,6 +124,26 @@ public boolean equals(Object obj) { } } + private static final class RegionLocationsFutureResult { + private final CompletableFuture future; + private final RegionLocations result; + private final Throwable e; + + public RegionLocationsFutureResult(CompletableFuture future, + RegionLocations result, Throwable e) { + this.future = future; + this.result = result; + this.e = e; + } + + public void complete() { + if (e != null) { + future.completeExceptionally(e); + } + future.complete(result); + } + } + private static final class TableCache { private final ConcurrentNavigableMap cache = @@ -148,18 +170,20 @@ public Optional getCandidate() { return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst(); } - public void clearCompletedRequests(RegionLocations locations) { + public List clearCompletedRequests(RegionLocations locations) { + List futureResultList = new ArrayList<>(); for (Iterator>> iter = allRequests.entrySet().iterator(); iter.hasNext();) { Map.Entry> entry = iter.next(); - if (tryComplete(entry.getKey(), entry.getValue(), locations)) { + if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) { iter.remove(); } } + return futureResultList; } private boolean tryComplete(LocateRequest req, CompletableFuture future, - RegionLocations locations) { + RegionLocations locations, List futureResultList) { if (future.isDone()) { return true; } @@ -185,7 +209,7 @@ private boolean tryComplete(LocateRequest req, CompletableFuture futureResultList = new ArrayList<>(); synchronized (tableCache) { tableCache.pendingRequests.remove(req); - tableCache.clearCompletedRequests(addedLocs); + futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs)); // Remove a complete locate request in a synchronized block, so the table cache must have // quota to send a candidate request. toSend = tableCache.getCandidate(); toSend.ifPresent(r -> tableCache.send(r)); } + futureResultList.forEach(RegionLocationsFutureResult::complete); toSend.ifPresent(r -> locateInMeta(tableName, r)); } else { // we meet an error assert error != null; + List futureResultList = new ArrayList<>(); synchronized (tableCache) { tableCache.pendingRequests.remove(req); // fail the request itself, no matter whether it is a DoNotRetryIOException, as we have // already retried several times - CompletableFuture future = tableCache.allRequests.remove(req); + CompletableFuture future = tableCache.allRequests.remove(req); if (future != null) { - future.completeExceptionally(error); + futureResultList.add(new RegionLocationsFutureResult(future, null, error)); } - tableCache.clearCompletedRequests(null); + futureResultList.addAll(tableCache.clearCompletedRequests(null)); // Remove a complete locate request in a synchronized block, so the table cache must have // quota to send a candidate request. toSend = tableCache.getCandidate(); toSend.ifPresent(r -> tableCache.send(r)); } + futureResultList.forEach(RegionLocationsFutureResult::complete); toSend.ifPresent(r -> locateInMeta(tableName, r)); } } @@ -542,9 +570,11 @@ public void onNext(Result[] results, ScanController controller) { continue; } RegionLocations addedLocs = addToCache(tableCache, locs); + List futureResultList = new ArrayList<>(); synchronized (tableCache) { - tableCache.clearCompletedRequests(addedLocs); + futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs)); } + futureResultList.forEach(RegionLocationsFutureResult::complete); } } } @@ -676,12 +706,16 @@ void clearCache(TableName tableName) { if (tableCache == null) { return; } + List futureResultList = new ArrayList<>(); synchronized (tableCache) { if (!tableCache.allRequests.isEmpty()) { IOException error = new IOException("Cache cleared"); - tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error)); + tableCache.allRequests.values().forEach(f -> { + futureResultList.add(new RegionLocationsFutureResult(f, null, error)); + }); } } + futureResultList.forEach(RegionLocationsFutureResult::complete); conn.getConnectionMetrics() .ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size())); }