Skip to content

Commit

Permalink
HBASE-27650 Merging empty regions corrupts meta cache (branch-2) (apa…
Browse files Browse the repository at this point in the history
…che#5038)

Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
bbeaudreault committed Feb 28, 2023
1 parent 890c89b commit 4b30d41
Show file tree
Hide file tree
Showing 9 changed files with 642 additions and 318 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@
import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
import static org.apache.hadoop.hbase.HConstants.ZEROES;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
Expand All @@ -47,8 +44,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hbase.CatalogReplicaMode;
Expand All @@ -66,8 +61,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Objects;

/**
* The asynchronous locator for regions other than meta.
*/
Expand Down Expand Up @@ -146,13 +139,15 @@ public void complete() {

private static final class TableCache {

private final ConcurrentNavigableMap<byte[], RegionLocations> cache =
new ConcurrentSkipListMap<>(BYTES_COMPARATOR);

private final Set<LocateRequest> pendingRequests = new HashSet<>();

private final Map<LocateRequest, CompletableFuture<RegionLocations>> allRequests =
new LinkedHashMap<>();
private final AsyncRegionLocationCache regionLocationCache;

public TableCache(TableName tableName) {
regionLocationCache = new AsyncRegionLocationCache(tableName);
}

public boolean hasQuota(int max) {
return pendingRequests.size() < max;
Expand Down Expand Up @@ -262,76 +257,7 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations
}

private TableCache getTableCache(TableName tableName) {
return computeIfAbsent(cache, tableName, TableCache::new);
}

private boolean isEqual(RegionLocations locs1, RegionLocations locs2) {
HRegionLocation[] locArr1 = locs1.getRegionLocations();
HRegionLocation[] locArr2 = locs2.getRegionLocations();
if (locArr1.length != locArr2.length) {
return false;
}
for (int i = 0; i < locArr1.length; i++) {
// do not need to compare region info
HRegionLocation loc1 = locArr1[i];
HRegionLocation loc2 = locArr2[i];
if (loc1 == null) {
if (loc2 != null) {
return false;
}
} else {
if (loc2 == null) {
return false;
}
if (loc1.getSeqNum() != loc2.getSeqNum()) {
return false;
}
if (!Objects.equal(loc1.getServerName(), loc2.getServerName())) {
return false;
}
}
}
return true;
}

// if we successfully add the locations to cache, return the locations, otherwise return the one
// which prevents us being added. The upper layer can use this value to complete pending requests.
private RegionLocations addToCache(TableCache tableCache, RegionLocations locs) {
LOG.trace("Try adding {} to cache", locs);
byte[] startKey = locs.getRegionLocation().getRegion().getStartKey();
for (;;) {
RegionLocations oldLocs = tableCache.cache.putIfAbsent(startKey, locs);
if (oldLocs == null) {
return locs;
}
// check whether the regions are the same, this usually happens when table is split/merged, or
// deleted and recreated again.
RegionInfo region = locs.getRegionLocation().getRegion();
RegionInfo oldRegion = oldLocs.getRegionLocation().getRegion();
if (region.getEncodedName().equals(oldRegion.getEncodedName())) {
RegionLocations mergedLocs = oldLocs.mergeLocations(locs);
if (isEqual(mergedLocs, oldLocs)) {
// the merged one is the same with the old one, give up
LOG.trace("Will not add {} to cache because the old value {} "
+ " is newer than us or has the same server name."
+ " Maybe it is updated before we replace it", locs, oldLocs);
return oldLocs;
}
if (tableCache.cache.replace(startKey, oldLocs, mergedLocs)) {
return mergedLocs;
}
} else {
// the region is different, here we trust the one we fetched. This maybe wrong but finally
// the upper layer can detect this and trigger removal of the wrong locations
if (LOG.isDebugEnabled()) {
LOG.debug("The newnly fetch region {} is different from the old one {} for row '{}',"
+ " try replaing the old one...", region, oldRegion, Bytes.toStringBinary(startKey));
}
if (tableCache.cache.replace(startKey, oldLocs, locs)) {
return locs;
}
}
}
return computeIfAbsent(cache, tableName, () -> new TableCache(tableName));
}

private void complete(TableName tableName, LocateRequest req, RegionLocations locs,
Expand All @@ -343,7 +269,7 @@ private void complete(TableName tableName, LocateRequest req, RegionLocations lo
Optional<LocateRequest> toSend = Optional.empty();
TableCache tableCache = getTableCache(tableName);
if (locs != null) {
RegionLocations addedLocs = addToCache(tableCache, locs);
RegionLocations addedLocs = tableCache.regionLocationCache.add(locs);
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
tableCache.pendingRequests.remove(req);
Expand Down Expand Up @@ -421,62 +347,24 @@ private void recordCacheMiss() {
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
}

private RegionLocations locateRowInCache(TableCache tableCache, TableName tableName, byte[] row,
int replicaId) {
Map.Entry<byte[], RegionLocations> entry = tableCache.cache.floorEntry(row);
if (entry == null) {
recordCacheMiss();
return null;
}
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
private RegionLocations locateRowInCache(TableCache tableCache, byte[] row, int replicaId) {
RegionLocations locs = tableCache.regionLocationCache.findForRow(row, replicaId);
if (locs == null) {
recordCacheMiss();
return null;
}
byte[] endKey = loc.getRegion().getEndKey();
if (isEmptyStopRow(endKey) || Bytes.compareTo(row, endKey) < 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
}
recordCacheHit();
return locs;
} else {
recordCacheMiss();
return null;
recordCacheHit();
}
return locs;
}

private RegionLocations locateRowBeforeInCache(TableCache tableCache, TableName tableName,
byte[] row, int replicaId) {
boolean isEmptyStopRow = isEmptyStopRow(row);
Map.Entry<byte[], RegionLocations> entry =
isEmptyStopRow ? tableCache.cache.lastEntry() : tableCache.cache.lowerEntry(row);
if (entry == null) {
private RegionLocations locateRowBeforeInCache(TableCache tableCache, byte[] row, int replicaId) {
RegionLocations locs = tableCache.regionLocationCache.findForBeforeRow(row, replicaId);
if (locs == null) {
recordCacheMiss();
return null;
}
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
recordCacheMiss();
return null;
}
if (
isEmptyStopRow(loc.getRegion().getEndKey())
|| (!isEmptyStopRow && Bytes.compareTo(loc.getRegion().getEndKey(), row) >= 0)
) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found {} in cache for {}, row='{}', locateType={}, replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
}
recordCacheHit();
return locs;
} else {
recordCacheMiss();
return null;
recordCacheHit();
}
return locs;
}

private void locateInMeta(TableName tableName, LocateRequest req) {
Expand Down Expand Up @@ -570,7 +458,7 @@ public void onNext(Result[] results, ScanController controller) {
if (info == null || info.isOffline() || info.isSplitParent()) {
continue;
}
RegionLocations addedLocs = addToCache(tableCache, locs);
RegionLocations addedLocs = tableCache.regionLocationCache.add(locs);
List<RegionLocationsFutureResult> futureResultList = new ArrayList<>();
synchronized (tableCache) {
futureResultList.addAll(tableCache.clearCompletedRequests(addedLocs));
Expand All @@ -582,11 +470,11 @@ public void onNext(Result[] results, ScanController controller) {
});
}

private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
int replicaId, RegionLocateType locateType) {
private RegionLocations locateInCache(TableCache tableCache, byte[] row, int replicaId,
RegionLocateType locateType) {
return locateType.equals(RegionLocateType.BEFORE)
? locateRowBeforeInCache(tableCache, tableName, row, replicaId)
: locateRowInCache(tableCache, tableName, row, replicaId);
? locateRowBeforeInCache(tableCache, row, replicaId)
: locateRowInCache(tableCache, row, replicaId);
}

// locateToPrevious is true means we will use the start key of a region to locate the region
Expand All @@ -598,7 +486,7 @@ private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName
assert !locateType.equals(RegionLocateType.AFTER);
TableCache tableCache = getTableCache(tableName);
if (!reload) {
RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
if (isGood(locs, replicaId)) {
return CompletableFuture.completedFuture(locs);
}
Expand All @@ -609,7 +497,7 @@ private CompletableFuture<RegionLocations> getRegionLocationsInternal(TableName
synchronized (tableCache) {
// check again
if (!reload) {
RegionLocations locs = locateInCache(tableCache, tableName, row, replicaId, locateType);
RegionLocations locs = locateInCache(tableCache, row, replicaId, locateType);
if (isGood(locs, replicaId)) {
return CompletableFuture.completedFuture(locs);
}
Expand Down Expand Up @@ -648,51 +536,33 @@ private void recordClearRegionCache() {

private void removeLocationFromCache(HRegionLocation loc) {
TableCache tableCache = cache.get(loc.getRegion().getTable());
if (tableCache == null) {
return;
}
byte[] startKey = loc.getRegion().getStartKey();
for (;;) {
RegionLocations oldLocs = tableCache.cache.get(startKey);
if (oldLocs == null) {
return;
}
HRegionLocation oldLoc = oldLocs.getRegionLocation(loc.getRegion().getReplicaId());
if (!canUpdateOnError(loc, oldLoc)) {
return;
}
// Tell metaReplicaSelector that the location is stale. It will create a stale entry
// with timestamp internally. Next time the client looks up the same location,
// it will pick a different meta replica region.
if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
metaReplicaSelector.onError(loc);
if (tableCache != null) {
if (tableCache.regionLocationCache.remove(loc)) {
recordClearRegionCache();
updateMetaReplicaSelector(loc);
}
}
}

RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
if (newLocs == null) {
if (tableCache.cache.remove(startKey, oldLocs)) {
recordClearRegionCache();
return;
}
} else {
if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
recordClearRegionCache();
return;
}
}
private void updateMetaReplicaSelector(HRegionLocation loc) {
// Tell metaReplicaSelector that the location is stale. It will create a stale entry
// with timestamp internally. Next time the client looks up the same location,
// it will pick a different meta replica region.
if (metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
metaReplicaSelector.onError(loc);
}
}

void addLocationToCache(HRegionLocation loc) {
addToCache(getTableCache(loc.getRegion().getTable()), createRegionLocations(loc));
getTableCache(loc.getRegion().getTable()).regionLocationCache.add(createRegionLocations(loc));
}

private HRegionLocation getCachedLocation(HRegionLocation loc) {
TableCache tableCache = cache.get(loc.getRegion().getTable());
if (tableCache == null) {
return null;
}
RegionLocations locs = tableCache.cache.get(loc.getRegion().getStartKey());
RegionLocations locs = tableCache.regionLocationCache.get(loc.getRegion().getStartKey());
return locs != null ? locs.getRegionLocation(loc.getRegion().getReplicaId()) : null;
}

Expand All @@ -717,8 +587,8 @@ void clearCache(TableName tableName) {
}
}
futureResultList.forEach(RegionLocationsFutureResult::complete);
conn.getConnectionMetrics()
.ifPresent(metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
conn.getConnectionMetrics().ifPresent(
metrics -> metrics.incrMetaCacheNumClearRegion(tableCache.regionLocationCache.size()));
}

void clearCache() {
Expand All @@ -727,19 +597,7 @@ void clearCache() {

void clearCache(ServerName serverName) {
for (TableCache tableCache : cache.values()) {
for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
byte[] regionName = entry.getKey();
RegionLocations locs = entry.getValue();
RegionLocations newLocs = locs.removeByServer(serverName);
if (locs == newLocs) {
continue;
}
if (newLocs.isEmpty()) {
tableCache.cache.remove(regionName, locs);
} else {
tableCache.cache.replace(regionName, locs, newLocs);
}
}
tableCache.regionLocationCache.removeForServer(serverName);
}
}

Expand All @@ -749,6 +607,7 @@ RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
if (tableCache == null) {
return null;
}
return locateRowInCache(tableCache, tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
return locateRowInCache(tableCache, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
}

}
Loading

0 comments on commit 4b30d41

Please sign in to comment.