review comment changes
virajjasani committed Oct 10, 2019
1 parent 1170f28 commit 9ade583
Showing 16 changed files with 197 additions and 196 deletions.
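Across the changed files, the pattern of the commit is the same: Optional-typed parameters and fields on these internal async-client classes are replaced with plain references that may be null, checked explicitly at the point of use. A minimal standalone sketch of that pattern follows; the class and method names (OptionalParamRefactorSketch, describeBefore, describeAfter) are illustrative, not HBase code.

import java.util.Optional;

// Sketch of the refactor applied throughout this commit: an Optional-typed
// parameter becomes a plain, possibly-null reference with an explicit check.
public class OptionalParamRefactorSketch {

  // Before: callers wrap the argument, the callee unwraps it.
  static String describeBefore(Optional<String> tableName) {
    return tableName.map(name -> "table=" + name).orElse("all tables");
  }

  // After: the argument is simply nullable; null means "no table filter".
  static String describeAfter(String tableName) {
    if (tableName == null) {
      return "all tables";
    }
    return "table=" + tableName;
  }

  public static void main(String[] args) {
    System.out.println(describeBefore(Optional.of("t1"))); // table=t1
    System.out.println(describeBefore(Optional.empty()));  // all tables
    System.out.println(describeAfter("t1"));               // table=t1
    System.out.println(describeAfter(null));               // all tables
  }
}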
@@ -176,7 +176,7 @@ private static Optional<TableState> getTableState(Result r) throws IOException {
* {@link CompletableFuture}.
*/
public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
AsyncTable<AdvancedScanResultConsumer> metaTable, Optional<TableName> tableName) {
AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
if (err != null) {
@@ -202,37 +202,39 @@ public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
* {@link CompletableFuture}.
*/
private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
AsyncTable<AdvancedScanResultConsumer> metaTable, final Optional<TableName> tableName,
final boolean excludeOfflinedSplitParents) {
final AsyncTable<AdvancedScanResultConsumer> metaTable,
final TableName tableName, final boolean excludeOfflinedSplitParents) {
CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>();
if (tableName.filter((t) -> t.equals(TableName.META_TABLE_NAME)).isPresent()) {
if (TableName.META_TABLE_NAME.equals(tableName)) {
future.completeExceptionally(new IOException(
"This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
}

// Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
CollectingVisitor<Pair<RegionInfo, ServerName>> visitor = new CollectingVisitor<Pair<RegionInfo, ServerName>>() {
private Optional<RegionLocations> current = null;
CollectingVisitor<Pair<RegionInfo, ServerName>> visitor =
new CollectingVisitor<Pair<RegionInfo, ServerName>>() {
private RegionLocations current = null;

@Override
public boolean visit(Result r) throws IOException {
current = getRegionLocations(r);
if (!current.isPresent() || current.get().getRegionLocation().getRegion() == null) {
Optional<RegionLocations> currentRegionLocations = getRegionLocations(r);
current = currentRegionLocations.orElse(null);
if (current == null || current.getRegionLocation().getRegion() == null) {
LOG.warn("No serialized RegionInfo in " + r);
return true;
}
RegionInfo hri = current.get().getRegionLocation().getRegion();
RegionInfo hri = current.getRegionLocation().getRegion();
if (excludeOfflinedSplitParents && hri.isSplitParent()) return true;
// Else call super and add this Result to the collection.
return super.visit(r);
}

@Override
void add(Result r) {
if (!current.isPresent()) {
if (current == null) {
return;
}
for (HRegionLocation loc : current.get().getRegionLocations()) {
for (HRegionLocation loc : current.getRegionLocations()) {
if (loc != null) {
this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc
.getServerName()));
@@ -259,7 +261,7 @@ void add(Result r) {
* @param visitor Visitor invoked against each row
*/
private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
Optional<TableName> tableName, QueryType type, final Visitor visitor) {
TableName tableName, QueryType type, final Visitor visitor) {
return scanMeta(metaTable, getTableStartRowForMeta(tableName, type),
getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor);
}
@@ -274,15 +276,18 @@ private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultCon
* @param visitor Visitor invoked against each row
*/
private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
Optional<byte[]> startRow, Optional<byte[]> stopRow, QueryType type, int maxRows,
final Visitor visitor) {
byte[] startRow, byte[] stopRow, QueryType type, int maxRows, final Visitor visitor) {
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
Scan scan = getMetaScan(metaTable, rowUpperLimit);
for (byte[] family : type.getFamilies()) {
scan.addFamily(family);
}
startRow.ifPresent(scan::withStartRow);
stopRow.ifPresent(scan::withStopRow);
if (startRow != null) {
scan.withStartRow(startRow);
}
if (stopRow != null) {
scan.withStopRow(stopRow);
}

if (LOG.isDebugEnabled()) {
LOG.debug("Scanning META" + " starting at row=" + Bytes.toStringBinary(scan.getStartRow())
@@ -466,52 +471,56 @@ private static long getSeqNumDuringOpen(final Result r, final int replicaId) {
* @param tableName table we're working with
* @return start row for scanning META according to query type
*/
private static Optional<byte[]> getTableStartRowForMeta(Optional<TableName> tableName,
QueryType type) {
return tableName.map((table) -> {
switch (type) {
case REGION:
case REPLICATION:
byte[] startRow = new byte[table.getName().length + 2];
System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length);
startRow[startRow.length - 2] = HConstants.DELIMITER;
startRow[startRow.length - 1] = HConstants.DELIMITER;
return startRow;
case ALL:
case TABLE:
default:
return table.getName();
private static byte[] getTableStartRowForMeta(TableName tableName, QueryType type) {
if (tableName == null) {
return null;
}
switch (type) {
case REGION:
case REPLICATION: {
byte[] startRow = new byte[tableName.getName().length + 2];
System.arraycopy(tableName.getName(), 0, startRow, 0, tableName.getName().length);
startRow[startRow.length - 2] = HConstants.DELIMITER;
startRow[startRow.length - 1] = HConstants.DELIMITER;
return startRow;
}
});
case ALL:
case TABLE:
default: {
return tableName.getName();
}
}
}

/**
* @param tableName table we're working with
* @return stop row for scanning META according to query type
*/
private static Optional<byte[]> getTableStopRowForMeta(Optional<TableName> tableName,
QueryType type) {
return tableName.map((table) -> {
final byte[] stopRow;
switch (type) {
case REGION:
case REPLICATION:
stopRow = new byte[table.getName().length + 3];
System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
stopRow[stopRow.length - 3] = ' ';
stopRow[stopRow.length - 2] = HConstants.DELIMITER;
stopRow[stopRow.length - 1] = HConstants.DELIMITER;
break;
case ALL:
case TABLE:
default:
stopRow = new byte[table.getName().length + 1];
System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
stopRow[stopRow.length - 1] = ' ';
break;
private static byte[] getTableStopRowForMeta(TableName tableName, QueryType type) {
if (tableName == null) {
return null;
}
final byte[] stopRow;
switch (type) {
case REGION:
case REPLICATION: {
stopRow = new byte[tableName.getName().length + 3];
System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
stopRow[stopRow.length - 3] = ' ';
stopRow[stopRow.length - 2] = HConstants.DELIMITER;
stopRow[stopRow.length - 1] = HConstants.DELIMITER;
break;
}
return stopRow;
});
case ALL:
case TABLE:
default: {
stopRow = new byte[tableName.getName().length + 1];
System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
stopRow[stopRow.length - 1] = ' ';
break;
}
}
return stopRow;
}
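The two helpers above build the start and stop rows used to scan hbase:meta for a single table. A standalone sketch of the same byte layout follows; MetaBoundaryRowSketch and its methods are illustrative, and it assumes HConstants.DELIMITER is the ',' byte that separates table name, start key and region id in meta row keys. The trailing ' ' in the stop rows is a byte that cannot appear in a table name, so under meta's row ordering it falls after every row belonging to the table in question.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Standalone sketch of the boundary rows built above, using plain byte arrays
// instead of TableName/HConstants. DELIMITER is an assumed stand-in for
// HConstants.DELIMITER.
public class MetaBoundaryRowSketch {

  private static final byte DELIMITER = ',';   // assumed value of HConstants.DELIMITER

  // QueryType.REGION / REPLICATION start row: "<table>,,"
  static byte[] regionStartRow(byte[] tableName) {
    byte[] startRow = Arrays.copyOf(tableName, tableName.length + 2);
    startRow[startRow.length - 2] = DELIMITER;
    startRow[startRow.length - 1] = DELIMITER;
    return startRow;
  }

  // QueryType.REGION / REPLICATION stop row: "<table> ,,"
  static byte[] regionStopRow(byte[] tableName) {
    byte[] stopRow = Arrays.copyOf(tableName, tableName.length + 3);
    stopRow[stopRow.length - 3] = ' ';
    stopRow[stopRow.length - 2] = DELIMITER;
    stopRow[stopRow.length - 1] = DELIMITER;
    return stopRow;
  }

  // QueryType.ALL / TABLE stop row: "<table> " (the start row is the table name itself)
  static byte[] tableStopRow(byte[] tableName) {
    byte[] stopRow = Arrays.copyOf(tableName, tableName.length + 1);
    stopRow[stopRow.length - 1] = ' ';
    return stopRow;
  }

  public static void main(String[] args) {
    byte[] table = "t1".getBytes(StandardCharsets.UTF_8);
    System.out.println("'" + new String(regionStartRow(table), StandardCharsets.UTF_8) + "'"); // 't1,,'
    System.out.println("'" + new String(regionStopRow(table), StandardCharsets.UTF_8) + "'");  // 't1 ,,'
    System.out.println("'" + new String(tableStopRow(table), StandardCharsets.UTF_8) + "'");   // 't1 '
  }
}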

@@ -23,7 +23,6 @@
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -110,7 +109,7 @@ private void removeLocationFromCache(HRegionLocation loc) {

void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCacheLocation,
this::addLocationToCache, this::removeLocationFromCache, Optional.empty());
this::addLocationToCache, this::removeLocationFromCache, null);
}

void clearCache() {
@@ -145,7 +145,7 @@ public Optional<LocateRequest> getCandidate() {
return allRequests.keySet().stream().filter(r -> !isPending(r)).findFirst();
}

public void clearCompletedRequests(Optional<RegionLocations> locations) {
public void clearCompletedRequests(RegionLocations locations) {
for (Iterator<Map.Entry<LocateRequest, CompletableFuture<RegionLocations>>> iter =
allRequests.entrySet().iterator(); iter.hasNext();) {
Map.Entry<LocateRequest, CompletableFuture<RegionLocations>> entry = iter.next();
@@ -156,15 +156,14 @@ public void clearCompletedRequests(Optional<RegionLocations> locations) {
}

private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations> future,
Optional<RegionLocations> locations) {
RegionLocations locations) {
if (future.isDone()) {
return true;
}
if (!locations.isPresent()) {
if (locations == null) {
return false;
}
RegionLocations locs = locations.get();
HRegionLocation loc = ObjectUtils.firstNonNull(locs.getRegionLocations());
HRegionLocation loc = ObjectUtils.firstNonNull(locations.getRegionLocations());
// we should at least have one location available, otherwise the request should fail and
// should not arrive here
assert loc != null;
@@ -183,7 +182,7 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations
completed = loc.getRegion().containsRow(req.row);
}
if (completed) {
future.complete(locs);
future.complete(locations);
return true;
} else {
return false;
@@ -286,7 +285,7 @@ private void complete(TableName tableName, LocateRequest req, RegionLocations lo
RegionLocations addedLocs = addToCache(tableCache, locs);
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
tableCache.clearCompletedRequests(Optional.of(addedLocs));
tableCache.clearCompletedRequests(addedLocs);
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
@@ -304,7 +303,7 @@ private void complete(TableName tableName, LocateRequest req, RegionLocations lo
if (future != null) {
future.completeExceptionally(error);
}
tableCache.clearCompletedRequests(Optional.empty());
tableCache.clearCompletedRequests(null);
// Remove a complete locate request in a synchronized block, so the table cache must have
// quota to send a candidate request.
toSend = tableCache.getCandidate();
@@ -491,7 +490,7 @@ public void onNext(Result[] results, ScanController controller) {
}
RegionLocations addedLocs = addToCache(tableCache, locs);
synchronized (tableCache) {
tableCache.clearCompletedRequests(Optional.of(addedLocs));
tableCache.clearCompletedRequests(addedLocs);
}
}
}
@@ -604,8 +603,9 @@ private HRegionLocation getCachedLocation(HRegionLocation loc) {
}

void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
Optional<MetricsConnection> connectionMetrics = conn.getConnectionMetrics();
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, this::getCachedLocation,
this::addLocationToCache, this::removeLocationFromCache, conn.getConnectionMetrics());
this::addLocationToCache, this::removeLocationFromCache, connectionMetrics.orElse(null));
}

void clearCache(TableName tableName) {
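On the failure path above, clearCompletedRequests(null) replaces the old clearCompletedRequests(Optional.empty()), and tryComplete treats a null RegionLocations as "nothing new to offer". A toy model of that pending-request bookkeeping follows, using String keys and values instead of the HBase types; PendingRequestCacheSketch is not the real implementation.

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the pattern in clearCompletedRequests/tryComplete: a successful
// lookup passes the freshly cached locations, a failed one passes null, and a
// pending future completes only when a non-null result covers its request.
public class PendingRequestCacheSketch {

  static final class TableCache {
    final Map<String, CompletableFuture<String>> allRequests = new ConcurrentHashMap<>();

    // locations == null means the lookup failed; only sweep out already-done futures.
    void clearCompletedRequests(String locations) {
      for (Iterator<Map.Entry<String, CompletableFuture<String>>> iter =
          allRequests.entrySet().iterator(); iter.hasNext();) {
        Map.Entry<String, CompletableFuture<String>> entry = iter.next();
        if (tryComplete(entry.getKey(), entry.getValue(), locations)) {
          iter.remove();
        }
      }
    }

    private boolean tryComplete(String requestedRow, CompletableFuture<String> future,
        String locations) {
      if (future.isDone()) {
        return true;                       // already completed elsewhere
      }
      if (locations == null) {
        return false;                      // nothing to offer; keep the request pending
      }
      if (!locations.startsWith(requestedRow)) {
        return false;                      // the result does not cover this request
      }
      future.complete(locations);
      return true;
    }
  }

  public static void main(String[] args) {
    TableCache cache = new TableCache();
    CompletableFuture<String> f = new CompletableFuture<>();
    cache.allRequests.put("row1", f);
    cache.clearCompletedRequests(null);        // failure path: future stays pending
    System.out.println(f.isDone());            // false
    cache.clearCompletedRequests("row1-loc");  // success path: future completes
    System.out.println(f.join());              // row1-loc
  }
}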
@@ -21,7 +21,6 @@
import static org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;

import java.util.Arrays;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.ObjectUtils;
@@ -53,7 +52,7 @@ static boolean canUpdateOnError(HRegionLocation loc, HRegionLocation oldLoc) {
static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception,
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> removeFromCache,
Optional<MetricsConnection> metrics) {
MetricsConnection metrics) {
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
if (LOG.isDebugEnabled()) {
LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
@@ -81,7 +80,9 @@ static void updateCachedLocationOnError(HRegionLocation loc, Throwable exception
addToCache.accept(newLoc);
} else {
LOG.debug("Try removing {} from cache", loc);
metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception));
if (metrics != null) {
metrics.incrCacheDroppingExceptions(exception);
}
removeFromCache.accept(loc);
}
}
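The helper above now takes a nullable MetricsConnection and guards the metrics update with an explicit null check, while a caller that still receives an Optional (as with the connection-metrics accessor in an earlier hunk) unwraps it once with orElse(null). A small self-contained sketch of that split follows, with a stand-in Metrics class and hypothetical method names.

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: keep Optional at the public accessor, pass a nullable reference
// internally, and guard each use with an explicit null check.
public class NullableMetricsSketch {

  static final class Metrics {
    final AtomicLong cacheDroppingExceptions = new AtomicLong();
    void incrCacheDroppingExceptions(Throwable t) {
      cacheDroppingExceptions.incrementAndGet();
    }
  }

  // Stand-in for an accessor like conn.getConnectionMetrics(): Optional at the boundary.
  static Optional<Metrics> getConnectionMetrics(boolean enabled) {
    return enabled ? Optional.of(new Metrics()) : Optional.empty();
  }

  // Stand-in for the helper: a plain, possibly-null parameter inside the implementation.
  static void onCacheDrop(Throwable error, Metrics metrics) {
    if (metrics != null) {
      metrics.incrCacheDroppingExceptions(error);
    }
    // ... remove the stale location from the cache here ...
  }

  public static void main(String[] args) {
    Optional<Metrics> maybeMetrics = getConnectionMetrics(true);
    // The call site unwraps once, as the diff does with connectionMetrics.orElse(null).
    onCacheDrop(new RuntimeException("stale location"), maybeMetrics.orElse(null));
    maybeMetrics.ifPresent(m -> System.out.println(m.cacheDroppingExceptions.get())); // 1
  }
}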
@@ -19,7 +19,6 @@

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -60,7 +59,7 @@ public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
.thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
}
return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME),
Optional.of(tableName));
tableName);
}

@Override
@@ -724,7 +724,7 @@ public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
future.complete(false);
} else {
addListener(
AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)),
AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName),
(locations, error1) -> {
if (error1 != null) {
future.completeExceptionally(error1);
@@ -849,7 +849,7 @@ public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
.thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
.collect(Collectors.toList()));
} else {
return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName)
.thenApply(
locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
}
@@ -1089,7 +1089,7 @@ private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableN
return future;
} else {
// For non-meta table, we fetch all locations by scanning hbase:meta table
return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName));
return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, tableName);
}
}

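The admin methods above hand the location future to a callback through addListener(future, (result, error) -> ...). A generic sketch of that calling convention follows; the addListener helper here is an assumed stand-in built directly on CompletableFuture.whenComplete rather than the HBase utility itself.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Sketch of the (result, error) callback style used by the admin code above.
public class AddListenerSketch {

  static <T> void addListener(CompletableFuture<T> future, BiConsumer<T, Throwable> listener) {
    future.whenComplete(listener);
  }

  public static void main(String[] args) {
    CompletableFuture<List<String>> locations =
        CompletableFuture.completedFuture(Arrays.asList("region-a", "region-b"));
    CompletableFuture<Integer> regionCount = new CompletableFuture<>();

    addListener(locations, (locs, err) -> {
      if (err != null) {
        regionCount.completeExceptionally(err);  // propagate the failure
        return;
      }
      regionCount.complete(locs.size());         // derive the value the caller wants
    });

    System.out.println(regionCount.join()); // 2
  }
}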