Skip to content

Commit

Permalink
HBASE-27093 AsyncNonMetaRegionLocator:put Complete CompletableFuture …
Browse files Browse the repository at this point in the history
…outside lock block (#4496)

Signed-off-by: Duo Zhang <[email protected]>
(cherry picked from commit 176c43c)
  • Loading branch information
xiaowangzhixiao authored and Apache9 committed Jun 7, 2022
1 parent f51e247 commit 3d82d2d
Showing 1 changed file with 44 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -122,6 +124,26 @@ public boolean equals(Object obj) {
}
}

private static final class RegionLocationsFutureResult {
private final CompletableFuture<RegionLocations> future;
private final RegionLocations result;
private final Throwable e;

public RegionLocationsFutureResult(CompletableFuture<RegionLocations> future,
RegionLocations result, Throwable e) {
this.future = future;
this.result = result;
this.e = e;
}

public void complete() {
if (e != null) {
future.completeExceptionally(e);
}
future.complete(result);
}
}

private static final class TableCache {

private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
Expand All @@ -148,18 +170,20 @@ public Optional<LocateRequest> getCandidate() {
return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
}

public void clearCompletedRequests(RegionLocations locations) {
public List<RegionLocationsFutureResult> clearCompletedRequests(RegionLocations locations) {
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
allRequests.entrySet().iterator(); iter.hasNext();) {
Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
if (tryComplete(entry.getKey(), entry.getValue(), locations, futureResultList)) {
iter.remove();
}
}
return futureResultList;
}

private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
RegionLocations locations) {
RegionLocations locations, List<RegionLocationsFutureResult> futureResultList) {
if (future.isDone()) {
return true;
}
Expand All @@ -185,7 +209,7 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations
completed = loc.getRegion().containsRow(req.row);
}
if (completed) {
future.complete(locations);
futureResultList.add(new RegionLocationsFutureResult(future, locations, null));
return true;
} else {
return false;
Expand Down Expand Up @@ -320,32 +344,36 @@ private void complete(TableName tableName, LocateRequest req, RegionLocations lo
TableCache tableCache = getTableCache(tableName);
if (locs != null) {
RegionLocations addedLocs = addToCache(tableCache, locs);
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
tableCache.clearCompletedRequests(addedLocs);
futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
toSend.ifPresent(r -> tableCache.send(r));
}
futureResultList.forEach(RegionLocationsFutureResult::complete);
toSend.ifPresent(r -> locateInMeta(tableName, r));
} else {
// we meet an error
assert error != null;
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
// fail the request itself, no matter whether it is a DoNotRetryIOException, as we have
// already retried several times
CompletableFuture<?> future = tableCache.allRequests.remove(req);
CompletableFuture<RegionLocations> future = tableCache.allRequests.remove(req);
if (future != null) {
future.completeExceptionally(error);
futureResultList.add(new RegionLocationsFutureResult(future, null, error));
}
tableCache.clearCompletedRequests(null);
futureResultList.addAll(tableCache.clearCompletedRequests(null));
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
toSend.ifPresent(r -> tableCache.send(r));
}
futureResultList.forEach(RegionLocationsFutureResult::complete);
toSend.ifPresent(r -> locateInMeta(tableName, r));
}
}
Expand Down Expand Up @@ -543,9 +571,11 @@ public void onNext(Result[] results, ScanController controller) {
continue;
}
RegionLocations addedLocs = addToCache(tableCache, locs);
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.clearCompletedRequests(addedLocs);
futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
}
futureResultList.forEach(RegionLocationsFutureResult::complete);
}
}
}
Expand Down Expand Up @@ -677,12 +707,16 @@ void clearCache(TableName tableName) {
if (tableCache == null) {
return;
}
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
if (!tableCache.allRequests.isEmpty()) {
IOException error = new IOException("Cache cleared");
tableCache.allRequests.values().forEach(f -> f.completeExceptionally(error));
tableCache.allRequests.values().forEach(f -> {
futureResultList.add(new RegionLocationsFutureResult(f, null, error));
});
}
}
futureResultList.forEach(RegionLocationsFutureResult::complete);
conn.getConnectionMetrics()
.ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
}
Expand Down

0 comments on commit 3d82d2d

Please sign in to comment.