Skip to content

Commit

Permalink
Use ReadWriteLock for region scanner readpoint map
Browse files Browse the repository at this point in the history
  • Loading branch information
huiruan committed Nov 2, 2022
1 parent 748cad6 commit 83c7983
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,9 @@ public void setRestoredRegion(boolean restoredRegion) {
private final int miniBatchSize;

final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
// Lock to manage concurrency between RegionScanner and getSmallestReadPoint
final ReentrantReadWriteLock scannerReadPointsLock = new ReentrantReadWriteLock();
final boolean useReadWriteLockForReadPoints;

/**
* The sequence ID that was enLongAddered when this region was opened.
Expand Down Expand Up @@ -446,18 +449,26 @@ public long getSmallestReadPoint() {
long minimumReadPoint;
// We need to ensure that while we are calculating the smallestReadPoint
// no new RegionScanners can grab a readPoint that we are unaware of.
// We achieve this by synchronizing on the scannerReadPoints object.
synchronized (scannerReadPoints) {
minimumReadPoint = mvcc.getReadPoint();
for (Long readPoint : this.scannerReadPoints.values()) {
if (readPoint < minimumReadPoint) {
minimumReadPoint = readPoint;
}
if (useReadWriteLockForReadPoints) {
scannerReadPointsLock.writeLock().lock();
try {
minimumReadPoint = calculateSmallestReadPoint();
} finally {
scannerReadPointsLock.writeLock().unlock();
}
} else {
// We achieve this by synchronizing on the scannerReadPoints object.
synchronized (scannerReadPoints) {
minimumReadPoint = calculateSmallestReadPoint();
}
}
return minimumReadPoint;
}

private long calculateSmallestReadPoint() {
return scannerReadPoints.values().stream().mapToLong(Long::longValue).min().orElse(0L);
}

/*
* Data structure of write state flags used coordinating flushes, compactions and closes.
*/
Expand Down Expand Up @@ -798,6 +809,13 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co
}
this.rowLockWaitDuration = tmpRowLockDuration;

this.useReadWriteLockForReadPoints =
conf.getBoolean("hbase.readpoints.read.write.lock.enable", false);
if (LOG.isDebugEnabled()) {
LOG.debug("region = {}, useReadWriteLockForReadPoints = {}", getRegionInfo(),
useReadWriteLockForReadPoints);
}

this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
this.htableDescriptor = htd;
Set<byte[]> families = this.htableDescriptor.getColumnFamilyNames();
Expand Down Expand Up @@ -8145,7 +8163,7 @@ private void doAttachReplicateRegionReplicaAction(WALKeyImpl walKey, WALEdit wal
(3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints, regionLockHolders
WriteState.HEAP_SIZE + // writestate
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
(3 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock, scannerReadPointsLock
MultiVersionConcurrencyControl.FIXED_SIZE // mvcc
+ 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,34 @@ private static boolean hasNonce(HRegion region, long nonce) {
long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
this.scannerReadPoints = region.scannerReadPoints;
this.rsServices = region.getRegionServerServices();
synchronized (scannerReadPoints) {
if (mvccReadPoint > 0) {
this.readPt = mvccReadPoint;
} else if (hasNonce(region, nonce)) {
this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
} else {
this.readPt = region.getReadPoint(isolationLevel);
if (region.useReadWriteLockForReadPoints) {
region.scannerReadPointsLock.readLock().lock();
try {
this.readPt = calculateReadPoint(isolationLevel, mvccReadPoint, nonceGroup, nonce);
scannerReadPoints.put(this, this.readPt);
} finally {
region.scannerReadPointsLock.readLock().unlock();
}
} else {
synchronized (scannerReadPoints) {
this.readPt = calculateReadPoint(isolationLevel, mvccReadPoint, nonceGroup, nonce);
scannerReadPoints.put(this, this.readPt);
}
scannerReadPoints.put(this, this.readPt);
}
initializeScanners(scan, additionalScanners);
}

private long calculateReadPoint(IsolationLevel isolationLevel, long mvccReadPoint,
long nonceGroup, long nonce) {
if (mvccReadPoint > 0) {
return mvccReadPoint;
}
if (hasNonce(region, nonce)) {
return rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
}
return region.getReadPoint(isolationLevel);
}

private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
throws IOException {
// Here we separate all scanners into two lists - scanner that provide data required
Expand Down

0 comments on commit 83c7983

Please sign in to comment.