From 4101d9369ae5031a683fc766042a8b6cd9a1ab27 Mon Sep 17 00:00:00 2001 From: haxiaolin Date: Thu, 2 Mar 2023 17:26:54 +0800 Subject: [PATCH] HBASE-25709 Close region may stuck when region is compacting and skipped most cells read --- .../org/apache/hadoop/hbase/HConstants.java | 5 ++ .../hbase/mob/DefaultMobStoreCompactor.java | 6 +- .../regionserver/compactions/Compactor.java | 9 ++- .../hbase/mob/FaultyMobStoreCompactor.java | 6 +- .../hbase/regionserver/TestCompaction.java | 1 + .../hbase/regionserver/TestHRegion.java | 69 +++++++++++++++++++ 6 files changed, 90 insertions(+), 6 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 26c105b40cb6..d56607653fca 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -326,6 +326,11 @@ public enum OperationStatusCode { public static final String COMPACTION_KV_MAX = "hbase.hstore.compaction.kv.max"; public static final int COMPACTION_KV_MAX_DEFAULT = 10; + /** Parameter name for the scanner size limit to be used in compactions */ + public static final String COMPACTION_SCANNER_SIZE_MAX = + "hbase.hstore.compaction.scanner.size.limit"; + public static final long COMPACTION_SCANNER_SIZE_MAX_DEFAULT = 10 * 1024 * 1024L; // 10MB + /** Parameter name for HBase instance root directory */ public static final String HBASE_DIR = "hbase.rootdir"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 695a632b243c..44f77b62ad8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -345,8 +345,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; boolean finished = false; - ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax) + .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE, + compactScannerSizeLimit) + .build(); throughputController.start(compactionName); KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; long shippedCallSizeLimit = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 363fa29909e0..d9ad265da64e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -83,6 +83,7 @@ public abstract class Compactor { protected final Configuration conf; protected final HStore store; protected final int compactionKVMax; + protected final long compactScannerSizeLimit; protected final Compression.Algorithm majorCompactionCompression; protected final Compression.Algorithm minorCompactionCompression; @@ -109,6 +110,8 @@ public abstract class Compactor { this.store = store; this.compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); + this.compactScannerSizeLimit = this.conf.getLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, + HConstants.COMPACTION_SCANNER_SIZE_MAX_DEFAULT); this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null) ? Compression.Algorithm.NONE : store.getColumnFamilyDescriptor().getMajorCompactionCompressionType(); @@ -429,8 +432,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); long now = 0; boolean hasMore; - ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax) + .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE, + compactScannerSizeLimit) + .build(); throughputController.start(compactionName); Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java index 96675fb69e5c..943a7acb6481 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java @@ -142,8 +142,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; boolean finished = false; - ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax) + .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE, + compactScannerSizeLimit) + .build(); throughputController.start(compactionName); KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; long shippedCallSizeLimit = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index c4d5d35f7d88..c0bc72079cb7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -121,6 +121,7 @@ public TestCompaction() { // Set cache flush size to 1MB conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100); + conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L); conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, NoLimitThroughputController.class.getName()); compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 637aab9ff5cf..4282d16fa6e1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -6919,6 +6919,75 @@ public void testCellTTLs() throws IOException { assertNull(r.getValue(fam1, q1)); } + @Test + public void testTTLsUsingSmallHeartBeatCells() throws IOException { + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(edge); + + final byte[] row = Bytes.toBytes("testRow"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + final byte[] q5 = Bytes.toBytes("q5"); + final byte[] q6 = Bytes.toBytes("q6"); + final byte[] q7 = Bytes.toBytes("q7"); + final byte[] q8 = Bytes.toBytes("q8"); + + // 10 seconds + int ttlSecs = 10; + TableDescriptor tableDescriptor = + TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build(); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); + // using small heart beat cells + conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2); + + region = HBaseTestingUtil.createRegionAndWAL( + RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(), + TEST_UTIL.getDataTestDir(), conf, tableDescriptor); + assertNotNull(region); + long now = EnvironmentEdgeManager.currentTime(); + // Add a cell that will expire in 5 seconds via cell TTL + region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY)); + region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY)); + region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY)); + // Add a cell that will expire after 10 seconds via family setting + region + .put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY)); + region + .put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY)); + + region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY)); + region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY)); + region + .put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY)); + + // Flush so we are sure store scanning gets this right + region.flush(true); + + // A query at time T+0 should return all cells + checkScan(8); + region.delete(new Delete(row).addColumn(fam1, q8)); + + // Increment time to T+ttlSecs seconds + edge.incrementTime(ttlSecs * 1000); + checkScan(2); + } + + private void checkScan(int expectCellSize) throws IOException { + Scan s = new Scan().withStartRow(row); + ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); + ScannerContext scannerContext = contextBuilder.build(); + RegionScanner scanner = region.getScanner(s); + List kvs = new ArrayList<>(); + scanner.next(kvs, scannerContext); + assertEquals(expectCellSize, kvs.size()); + scanner.close(); + } + @Test public void testIncrementTimestampsAreMonotonic() throws IOException { region = initHRegion(tableName, method, CONF, fam1);