diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java index 26d819214f1f..732723dfef61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java @@ -36,7 +36,7 @@ public class NoLimitScannerContext extends ScannerContext { public NoLimitScannerContext() { - super(false, null, false); + super(false, null, false, false); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 04eabb109ea7..6496632702d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -110,7 +110,17 @@ public class ScannerContext { */ final ServerSideScanMetrics metrics; - ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) { + /** + * Set this be on will make the scanners prematurely return if the kvs scanned reach the count + * limit of per heart beat, though the top cell on the store heap may not match the return rules, + * e.g. out of date by ttl, should be skipped by delete marker. + * It can make both user-scanners and compaction scanners be aborted as soon as possible when + * region is closing. See HBASE-25709. + */ + private boolean preventLoopReadEnabled = true; + + ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics, + boolean preventLoopReadEnabled) { this.limits = new LimitFields(); if (limitsToCopy != null) { this.limits.copy(limitsToCopy); @@ -122,12 +132,17 @@ public class ScannerContext { this.keepProgress = keepProgress; this.scannerState = DEFAULT_STATE; this.metrics = trackMetrics ? new ServerSideScanMetrics() : null; + this.preventLoopReadEnabled = preventLoopReadEnabled; } public boolean isTrackingMetrics() { return this.metrics != null; } + public boolean isPreventLoopReadEnabled() { + return preventLoopReadEnabled; + } + /** * Get the metrics instance. Should only be called after a call to {@link #isTrackingMetrics()} * has been made to confirm that metrics are indeed being tracked. @@ -372,6 +387,7 @@ public static final class Builder { boolean keepProgress = DEFAULT_KEEP_PROGRESS; boolean trackMetrics = false; LimitFields limits = new LimitFields(); + boolean preventLoopReadEnabled = true; private Builder() { } @@ -407,9 +423,13 @@ public Builder setBatchLimit(int batchLimit) { limits.setBatch(batchLimit); return this; } + public Builder setPreventLoopReadEnabled(boolean enabled) { + this.preventLoopReadEnabled = enabled; + return this; + } public ScannerContext build() { - return new ScannerContext(keepProgress, limits, trackMetrics); + return new ScannerContext(keepProgress, limits, trackMetrics, preventLoopReadEnabled); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index bd998770e7e4..35b4f0664fa4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -751,6 +751,11 @@ public boolean next(List outResult, ScannerContext scannerContext) throws default: throw new RuntimeException("UNEXPECTED"); } + + // when reaching the heartbeat cells, try to return from the loop. + if (kvsScanned % cellsPerHeartbeatCheck == 0 && scannerContext.isPreventLoopReadEnabled()) { + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + } } while ((cell = this.heap.peek()) != null); if (count > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 74f240a60198..aa87d8973d59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -7030,6 +7030,73 @@ public void testCellTTLs() throws IOException { assertNull(r.getValue(fam1, q1)); } + @Test public void testTTLsUsingSmallHeartBeatCells() throws IOException { + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(edge); + + final byte[] row = Bytes.toBytes("testRow"); + final byte[] q1 = Bytes.toBytes("q1"); + final byte[] q2 = Bytes.toBytes("q2"); + final byte[] q3 = Bytes.toBytes("q3"); + final byte[] q4 = Bytes.toBytes("q4"); + final byte[] q5 = Bytes.toBytes("q5"); + final byte[] q6 = Bytes.toBytes("q6"); + final byte[] q7 = Bytes.toBytes("q7"); + final byte[] q8 = Bytes.toBytes("q8"); + + // 10 seconds + int ttlSecs = 10; + TableDescriptor tableDescriptor = + TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build(); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); + // using small heart beat cells + conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2); + + region = HBaseTestingUtility + .createRegionAndWAL(RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(), + TEST_UTIL.getDataTestDir(), conf, tableDescriptor); + assertNotNull(region); + long now = EnvironmentEdgeManager.currentTime(); + // Add a cell that will expire in 5 seconds via cell TTL + region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY)); + region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY)); + region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY)); + // Add a cell that will expire after 10 seconds via family setting + region + .put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY)); + region + .put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY)); + + region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY)); + region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY)); + region + .put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY)); + + // Flush so we are sure store scanning gets this right + region.flush(true); + + // A query at time T+0 should return all cells + checkScan(8); + + // Increment time to T+ttlSecs seconds + edge.incrementTime(ttlSecs * 1000); + checkScan(3); + } + + private void checkScan(int expectCellSize) throws IOException{ + Scan s = new Scan().withStartRow(row); + ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); + ScannerContext scannerContext = contextBuilder.build(); + RegionScanner scanner = region.getScanner(s); + List kvs = new ArrayList<>(); + scanner.next(kvs, scannerContext); + assertEquals(expectCellSize, kvs.size()); + scanner.close(); + } + @Test public void testIncrementTimestampsAreMonotonic() throws IOException { region = initHRegion(tableName, method, CONF, fam1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 20d7a29898ac..91b0aade7259 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -1283,6 +1283,92 @@ public long getSmallestReadPoint(HStore store) { } } + @Test + public void testPreventLoopRead() throws Exception { + init(this.name.getMethodName()); + Configuration conf = HBaseConfiguration.create(); + // use small heart beat cells + conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2); + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(edge); + byte[] r0 = Bytes.toBytes("row0"); + byte[] value0 = Bytes.toBytes("value0"); + byte[] value1 = Bytes.toBytes("value1"); + MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing(); + long ts = EnvironmentEdgeManager.currentTime(); + long seqId = 100; + init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), + ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(10).build(), + new MyStoreHook() { + @Override public long getSmallestReadPoint(HStore store) { + return seqId + 3; + } + }); + // The cells having the value0 will be expired + store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing); + store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing); + store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing); + store.add(createCell(r0, qf4, ts + 10000 + 1, seqId, value1), memStoreSizing); + store.add(createCell(r0, qf5, ts, seqId, value0), memStoreSizing); + store.add(createCell(r0, qf6, ts + 10000 + 1, seqId, value1), memStoreSizing); + + List myList = new ArrayList<>(); + Scan scan = new Scan().withStartRow(r0); + ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(false); + // test normal scan, should return all the cells + ScannerContext scannerContext = contextBuilder.build(); + try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, + seqId + 3)) { + scanner.next(myList, scannerContext); + assertEquals(6, myList.size()); + } + + // test skip two ttl cells and return with empty results, default prevent loop skip is on + edge.incrementTime(10 * 1000); + scannerContext = contextBuilder.build(); + myList.clear(); + try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, + seqId + 3)) { + // r0 + scanner.next(myList, scannerContext); + assertEquals(0, myList.size()); + } + + // should scan all non-ttl expired cells by iterative next + int resultCells = 0; + try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, + seqId + 3)) { + boolean hasMore = true; + while (hasMore) { + myList.clear(); + hasMore = scanner.next(myList, scannerContext); + assertTrue(myList.size() < 6); + resultCells += myList.size(); + } + for (Cell c : myList) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" + Bytes + .toStringBinary(actualValue), Bytes.equals(actualValue, value1)); + } + } + assertEquals(2, resultCells); + + // set prevent loop skip off + scannerContext = contextBuilder.setPreventLoopReadEnabled(false).build(); + // should scan all cells not expired at one time + myList.clear(); + try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, + seqId + 3)) { + scanner.next(myList, scannerContext); + assertEquals(2, myList.size()); + for (Cell c : myList) { + byte[] actualValue = CellUtil.cloneValue(c); + assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" + Bytes + .toStringBinary(actualValue), Bytes.equals(actualValue, value1)); + } + } + } + @Test public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { Configuration conf = HBaseConfiguration.create();