From cfbf80d1c4153e2f15ea7b15acbfeb900c92bab3 Mon Sep 17 00:00:00 2001 From: Ruanhui <32773751+frostruan@users.noreply.github.com> Date: Thu, 3 Nov 2022 16:15:48 +0800 Subject: [PATCH] HBASE-27458 Use ReadWriteLock for region scanner readpoint map (#4859) Co-authored-by: huiruan Signed-off-by: Duo Zhang --- .../hadoop/hbase/regionserver/HRegion.java | 19 ++-- .../ReadPointCalculationLock.java | 90 +++++++++++++++++++ .../hbase/regionserver/RegionScannerImpl.java | 5 +- 3 files changed, 105 insertions(+), 9 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 83fe1bf67d21..08987ea6b194 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -399,6 +399,7 @@ public void setRestoredRegion(boolean restoredRegion) { private final int miniBatchSize; final ConcurrentHashMap scannerReadPoints; + final ReadPointCalculationLock smallestReadPointCalcLock; /** * The sequence ID that was enLongAddered when this region was opened. @@ -443,19 +444,18 @@ public void setRestoredRegion(boolean restoredRegion) { * this readPoint, are included in every read operation. */ public long getSmallestReadPoint() { - long minimumReadPoint; // We need to ensure that while we are calculating the smallestReadPoint // no new RegionScanners can grab a readPoint that we are unaware of. - // We achieve this by synchronizing on the scannerReadPoints object. - synchronized (scannerReadPoints) { - minimumReadPoint = mvcc.getReadPoint(); + smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.CALCULATION_LOCK); + try { + long minimumReadPoint = mvcc.getReadPoint(); for (Long readPoint : this.scannerReadPoints.values()) { - if (readPoint < minimumReadPoint) { - minimumReadPoint = readPoint; - } + minimumReadPoint = Math.min(minimumReadPoint, readPoint); } + return minimumReadPoint; + } finally { + smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.CALCULATION_LOCK); } - return minimumReadPoint; } /* @@ -798,6 +798,8 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co } this.rowLockWaitDuration = tmpRowLockDuration; + this.smallestReadPointCalcLock = new ReadPointCalculationLock(conf); + this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true); this.htableDescriptor = htd; Set families = this.htableDescriptor.getColumnFamilyNames(); @@ -8140,6 +8142,7 @@ private void doAttachReplicateRegionReplicaAction(WALKeyImpl walKey, WALEdit wal // 1 x RegionSplitPolicy - splitPolicy // 1 x MetricsRegion - metricsRegion // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper + // 1 x ReadPointCalculationLock - smallestReadPointCalcLock public static final long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.OBJECT + // closeLock (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing (3 * ClassSize.ATOMIC_LONG) + // numPutsWithoutWAL, dataInMemoryWithoutWAL, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java new file mode 100644 index 000000000000..380b82de93a6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReadPointCalculationLock.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Lock to manage concurrency between {@link RegionScanner} and + * {@link HRegion#getSmallestReadPoint()}. We need to ensure that while we are calculating the + * smallest read point, no new scanners can modify the scannerReadPoints Map. We used to achieve + * this by synchronizing on the scannerReadPoints object. But this may block the read thread and + * reduce the read performance. Since the scannerReadPoints object is a + * {@link java.util.concurrent.ConcurrentHashMap}, which is thread-safe, so the + * {@link RegionScanner} can record their read points concurrently, what it needs to do is just + * acquiring a shared lock. When we calculate the smallest read point, we need to acquire an + * exclusive lock. This can improve read performance in most scenarios, only not when we have a lot + * of delta operations, like {@link org.apache.hadoop.hbase.client.Append} or + * {@link org.apache.hadoop.hbase.client.Increment}. So we introduce a flag to enable/disable this + * feature. + */ +@InterfaceAudience.Private +public class ReadPointCalculationLock { + + public enum LockType { + CALCULATION_LOCK, + RECORDING_LOCK + } + + private final boolean useReadWriteLockForReadPoints; + private Lock lock; + private ReadWriteLock readWriteLock; + + ReadPointCalculationLock(Configuration conf) { + this.useReadWriteLockForReadPoints = + conf.getBoolean("hbase.region.readpoints.read.write.lock.enable", false); + if (useReadWriteLockForReadPoints) { + readWriteLock = new ReentrantReadWriteLock(); + } else { + lock = new ReentrantLock(); + } + } + + void lock(LockType lockType) { + if (useReadWriteLockForReadPoints) { + assert lock == null; + if (lockType == LockType.CALCULATION_LOCK) { + readWriteLock.writeLock().lock(); + } else { + readWriteLock.readLock().lock(); + } + } else { + assert readWriteLock == null; + lock.lock(); + } + } + + void unlock(LockType lockType) { + if (useReadWriteLockForReadPoints) { + assert lock == null; + if (lockType == LockType.CALCULATION_LOCK) { + readWriteLock.writeLock().unlock(); + } else { + readWriteLock.readLock().unlock(); + } + } else { + assert readWriteLock == null; + lock.unlock(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java index d010d71f6cf9..11d4c20f581b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java @@ -130,7 +130,8 @@ private static boolean hasNonce(HRegion region, long nonce) { long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan); this.scannerReadPoints = region.scannerReadPoints; this.rsServices = region.getRegionServerServices(); - synchronized (scannerReadPoints) { + region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK); + try { if (mvccReadPoint > 0) { this.readPt = mvccReadPoint; } else if (hasNonce(region, nonce)) { @@ -139,6 +140,8 @@ private static boolean hasNonce(HRegion region, long nonce) { this.readPt = region.getReadPoint(isolationLevel); } scannerReadPoints.put(this, this.readPt); + } finally { + region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK); } initializeScanners(scan, additionalScanners); }