diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 949cb9f54b55..f7fbb0692876 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -768,7 +768,11 @@ public boolean next(List outResult, ScannerContext scannerContext) throws
       // One last chance to break due to size limit. The INCLUDE* cases above already check
       // limit and continue. For the various filtered cases, we need to check because block
       // size limit may have been exceeded even if we don't add cells to result list.
-      if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
+      // Also bail out every cellsPerHeartbeatCheck scanned cells so filtered runs stay bounded.
+      if (
+        scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)
+          || kvsScanned % cellsPerHeartbeatCheck == 0
+      ) {
         return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
       }
     } while ((cell = this.heap.peek()) != null);
@@ -1065,10 +1069,9 @@ private void resetQueryMatcher(Cell lastTopKey) {
     if (cell == null) {
       cell = lastTopKey;
     }
-    if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {
+    // The setToNewRow will call reset internally
+    if (matcher.setToNewRow(cell)) {
       this.countPerRow = 0;
-      // The setToNewRow will call reset internally
-      matcher.setToNewRow(cell);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
index 614465c1827f..106b0018d029 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
@@ -262,10 +262,20 @@ public void clearCurrentRow() {
   /**
-   * Set the row when there is change in row
+   * Move the matcher to the row of the given cell, but only if that row differs from the row
+   * currently being tracked (or no row is tracked yet).
+   * @param currentRow cell whose row should become the current row
+   * @return {@code true} if the row changed and the columns and matcher state were reset,
+   *         {@code false} if the cell is on the current row and nothing was modified
    */
-  public void setToNewRow(Cell currentRow) {
-    this.currentRow = currentRow;
-    columns.reset();
-    reset();
+  public boolean setToNewRow(Cell currentRow) {
+    if (
+      this.currentRow == null || this.rowComparator.compareRows(currentRow, this.currentRow) != 0
+    ) {
+      this.currentRow = currentRow;
+      columns.reset();
+      reset();
+      return true;
+    }
+    return false;
   }
 
   public abstract boolean isUserScan();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 637aab9ff5cf..4282d16fa6e1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -6919,6 +6919,80 @@ public void testCellTTLs() throws IOException {
     assertNull(r.getValue(fam1, q1));
   }
 
+  @Test
+  public void testTTLsUsingSmallHeartBeatCells() throws IOException {
+    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(edge);
+
+    final byte[] row = Bytes.toBytes("testRow");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] q3 = Bytes.toBytes("q3");
+    final byte[] q4 = Bytes.toBytes("q4");
+    final byte[] q5 = Bytes.toBytes("q5");
+    final byte[] q6 = Bytes.toBytes("q6");
+    final byte[] q7 = Bytes.toBytes("q7");
+    final byte[] q8 = Bytes.toBytes("q8");
+
+    // 10 seconds
+    int ttlSecs = 10;
+    TableDescriptor tableDescriptor =
+      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily(
+        ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build();
+
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
+    // using small heart beat cells
+    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
+
+    region = HBaseTestingUtil.createRegionAndWAL(
+      RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(),
+      TEST_UTIL.getDataTestDir(), conf, tableDescriptor);
+    assertNotNull(region);
+    long now = EnvironmentEdgeManager.currentTime();
+    // Add cells stamped with the current time; they expire once the family TTL has elapsed
+    region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY));
+    region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
+    region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY));
+    // Add cells stamped just past the TTL window in the future; they survive the clock advance
+    region
+      .put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
+    region
+      .put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
+
+    region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY));
+    region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY));
+    region
+      .put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
+
+    // Flush so we are sure store scanning gets this right
+    region.flush(true);
+
+    // A query at time T+0 should return all cells
+    checkScan(8);
+    region.delete(new Delete(row).addColumn(fam1, q8));
+
+    // Increment time to T+ttlSecs seconds
+    edge.incrementTime(ttlSecs * 1000);
+    checkScan(2);
+  }
+
+  /**
+   * Runs a single {@code next} call on a fresh region scanner and asserts how many cells it
+   * returned. NOTE(review): {@code row} resolves to a class-level member here, not to the local
+   * declared in the calling test - confirm it sorts at or before the rows being written.
+   */
+  private void checkScan(int expectCellSize) throws IOException {
+    Scan s = new Scan().withStartRow(row);
+    ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
+    ScannerContext scannerContext = contextBuilder.build();
+    List<Cell> kvs = new ArrayList<>();
+    try (RegionScanner scanner = region.getScanner(s)) {
+      scanner.next(kvs, scannerContext);
+    }
+    assertEquals(expectCellSize, kvs.size());
+  }
+
   @Test
   public void testIncrementTimestampsAreMonotonic() throws IOException {
     region = initHRegion(tableName, method, CONF, fam1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 2e999dfaa455..766a48e83259 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -1410,6 +1410,78 @@ public long getSmallestReadPoint(HStore store) {
     }
   }
 
+  @Test
+  public void testPreventLoopRead() throws Exception {
+    init(this.name.getMethodName());
+    Configuration conf = HBaseConfiguration.create();
+    // use small heart beat cells
+    conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
+    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(edge);
+    byte[] r0 = Bytes.toBytes("row0");
+    byte[] value0 = Bytes.toBytes("value0");
+    byte[] value1 = Bytes.toBytes("value1");
+    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
+      ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(10).build(),
+      new MyStoreHook() {
+        @Override
+        public long getSmallestReadPoint(HStore store) {
+          return seqId + 3;
+        }
+      });
+    // The cells having the value0 will be expired
+    store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
+    store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
+    store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
+    store.add(createCell(r0, qf4, ts + 10000 + 1, seqId, value1), memStoreSizing);
+    store.add(createCell(r0, qf5, ts, seqId, value0), memStoreSizing);
+    store.add(createCell(r0, qf6, ts + 10000 + 1, seqId, value1), memStoreSizing);
+
+    List<Cell> myList = new ArrayList<>();
+    Scan scan = new Scan().withStartRow(r0);
+    ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(false);
+    // test normal scan, should return all the cells
+    ScannerContext scannerContext = contextBuilder.build();
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
+      scanner.next(myList, scannerContext);
+      assertEquals(6, myList.size());
+    }
+
+    // test skip two ttl cells and return with empty results, default prevent loop skip is on
+    edge.incrementTime(10 * 1000);
+    scannerContext = contextBuilder.build();
+    myList.clear();
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
+      // r0
+      scanner.next(myList, scannerContext);
+      assertEquals(0, myList.size());
+    }
+
+    // should scan all non-ttl expired cells by iterative next
+    int resultCells = 0;
+    // start from a fresh context, like the two scans above
+    scannerContext = contextBuilder.build();
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null, seqId + 3)) {
+      boolean hasMore = true;
+      while (hasMore) {
+        myList.clear();
+        hasMore = scanner.next(myList, scannerContext);
+        assertTrue(myList.size() < 6);
+        resultCells += myList.size();
+        // check each batch here; after the loop only the final batch would still be in myList
+        for (Cell c : myList) {
+          byte[] actualValue = CellUtil.cloneValue(c);
+          assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:"
+            + Bytes.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
+        }
+      }
+    }
+    assertEquals(2, resultCells);
+  }
+
   @Test
   public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
     Configuration conf = HBaseConfiguration.create();