Skip to content

Commit

Permalink
HBASE-25709 Close region may stuck when region is compacting and skip…
Browse files Browse the repository at this point in the history
…ped most cells read (apache#3117)

Signed-off-by: Andrew Purtell <[email protected]>
  • Loading branch information
sunhelly authored Mar 7, 2022
1 parent fee9bb0 commit f3a48d1
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,11 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
default:
throw new RuntimeException("UNEXPECTED");
}

// when reaching the heartbeat cells, try to return from the loop.
if (kvsScanned % cellsPerHeartbeatCheck == 0) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
} while ((cell = this.heap.peek()) != null);

if (count > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7047,6 +7047,74 @@ public void testCellTTLs() throws IOException {
assertNull(r.getValue(fam1, q1));
}

@Test
public void testTTLsUsingSmallHeartBeatCells() throws IOException {
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);

final byte[] row = Bytes.toBytes("testRow");
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] q3 = Bytes.toBytes("q3");
final byte[] q4 = Bytes.toBytes("q4");
final byte[] q5 = Bytes.toBytes("q5");
final byte[] q6 = Bytes.toBytes("q6");
final byte[] q7 = Bytes.toBytes("q7");
final byte[] q8 = Bytes.toBytes("q8");

// 10 seconds
int ttlSecs = 10;
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build();

Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
// using small heart beat cells
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);

region = HBaseTestingUtil
.createRegionAndWAL(RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(),
TEST_UTIL.getDataTestDir(), conf, tableDescriptor);
assertNotNull(region);
long now = EnvironmentEdgeManager.currentTime();
// Add a cell that will expire in 5 seconds via cell TTL
region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY));
region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY));
// Add a cell that will expire after 10 seconds via family setting
region
.put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
region
.put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));

region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY));
region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY));
region
.put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));

// Flush so we are sure store scanning gets this right
region.flush(true);

// A query at time T+0 should return all cells
checkScan(8);

// Increment time to T+ttlSecs seconds
edge.incrementTime(ttlSecs * 1000);
checkScan(3);
}

private void checkScan(int expectCellSize) throws IOException{
Scan s = new Scan().withStartRow(row);
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
ScannerContext scannerContext = contextBuilder.build();
RegionScanner scanner = region.getScanner(s);
List<Cell> kvs = new ArrayList<>();
scanner.next(kvs, scannerContext);
assertEquals(expectCellSize, kvs.size());
scanner.close();
}

@Test
public void testIncrementTimestampsAreMonotonic() throws IOException {
region = initHRegion(tableName, method, CONF, fam1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,77 @@ public long getSmallestReadPoint(HStore store) {
}
}

@Test
public void testPreventLoopRead() throws Exception {
init(this.name.getMethodName());
Configuration conf = HBaseConfiguration.create();
// use small heart beat cells
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);
byte[] r0 = Bytes.toBytes("row0");
byte[] value0 = Bytes.toBytes("value0");
byte[] value1 = Bytes.toBytes("value1");
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(10).build(),
new MyStoreHook() {
@Override public long getSmallestReadPoint(HStore store) {
return seqId + 3;
}
});
// The cells having the value0 will be expired
store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
store.add(createCell(r0, qf4, ts + 10000 + 1, seqId, value1), memStoreSizing);
store.add(createCell(r0, qf5, ts, seqId, value0), memStoreSizing);
store.add(createCell(r0, qf6, ts + 10000 + 1, seqId, value1), memStoreSizing);

List<Cell> myList = new ArrayList<>();
Scan scan = new Scan().withStartRow(r0);
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(false);
// test normal scan, should return all the cells
ScannerContext scannerContext = contextBuilder.build();
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
seqId + 3)) {
scanner.next(myList, scannerContext);
assertEquals(6, myList.size());
}

// test skip two ttl cells and return with empty results, default prevent loop skip is on
edge.incrementTime(10 * 1000);
scannerContext = contextBuilder.build();
myList.clear();
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
seqId + 3)) {
// r0
scanner.next(myList, scannerContext);
assertEquals(0, myList.size());
}

// should scan all non-ttl expired cells by iterative next
int resultCells = 0;
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
seqId + 3)) {
boolean hasMore = true;
while (hasMore) {
myList.clear();
hasMore = scanner.next(myList, scannerContext);
assertTrue(myList.size() < 6);
resultCells += myList.size();
}
for (Cell c : myList) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" + Bytes
.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
}
}
assertEquals(2, resultCells);
}

@Test
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
Expand Down

0 comments on commit f3a48d1

Please sign in to comment.