Skip to content

Commit

Permalink
Revert "HBASE-25709 Close region may stuck when region is compacting …
Browse files Browse the repository at this point in the history
…and skipped most cells read (#4536) (#5086)"

This reverts commit 1ac7bbc.
  • Loading branch information
sunhelly committed Mar 6, 2023
1 parent 1ac7bbc commit 2cdcab2
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,6 @@ public enum OperationStatusCode {
public static final String COMPACTION_KV_MAX = "hbase.hstore.compaction.kv.max";
public static final int COMPACTION_KV_MAX_DEFAULT = 10;

/** Parameter name for the scanner size limit to be used in compactions */
public static final String COMPACTION_SCANNER_SIZE_MAX =
"hbase.hstore.compaction.scanner.size.limit";
public static final long COMPACTION_SCANNER_SIZE_MAX_DEFAULT = 10 * 1024 * 1024L; // 10MB

/** Parameter name for HBase instance root directory */
public static final String HBASE_DIR = "hbase.rootdir";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,8 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
boolean finished = false;

ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
compactScannerSizeLimit)
.build();
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public abstract class Compactor<T extends CellSink> {
protected final Configuration conf;
protected final HStore store;
protected final int compactionKVMax;
protected final long compactScannerSizeLimit;
protected final Compression.Algorithm majorCompactionCompression;
protected final Compression.Algorithm minorCompactionCompression;

Expand All @@ -109,8 +108,6 @@ public abstract class Compactor<T extends CellSink> {
this.store = store;
this.compactionKVMax =
this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
this.compactScannerSizeLimit = this.conf.getLong(HConstants.COMPACTION_SCANNER_SIZE_MAX,
HConstants.COMPACTION_SCANNER_SIZE_MAX_DEFAULT);
this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
? Compression.Algorithm.NONE
: store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
Expand Down Expand Up @@ -431,10 +428,8 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
long now = 0;
boolean hasMore;
ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
compactScannerSizeLimit)
.build();
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

throughputController.start(compactionName);
Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,8 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
boolean finished = false;

ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
.setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
compactScannerSizeLimit)
.build();
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ public TestCompaction() {
// Set cache flush size to 1MB
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
conf.setLong(HConstants.COMPACTION_SCANNER_SIZE_MAX, 10L);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitThroughputController.class.getName());
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6910,75 +6910,6 @@ public void testCellTTLs() throws IOException {
assertNull(r.getValue(fam1, q1));
}

@Test
public void testTTLsUsingSmallHeartBeatCells() throws IOException {
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);

final byte[] row = Bytes.toBytes("testRow");
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] q3 = Bytes.toBytes("q3");
final byte[] q4 = Bytes.toBytes("q4");
final byte[] q5 = Bytes.toBytes("q5");
final byte[] q6 = Bytes.toBytes("q6");
final byte[] q7 = Bytes.toBytes("q7");
final byte[] q8 = Bytes.toBytes("q8");

// 10 seconds
int ttlSecs = 10;
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build();

Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
// using small heart beat cells
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);

region = HBaseTestingUtility.createRegionAndWAL(
RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(),
TEST_UTIL.getDataTestDir(), conf, tableDescriptor);
assertNotNull(region);
long now = EnvironmentEdgeManager.currentTime();
// Add a cell that will expire in 5 seconds via cell TTL
region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY));
region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY));
// Add a cell that will expire after 10 seconds via family setting
region
.put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
region
.put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));

region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY));
region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY));
region
.put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));

// Flush so we are sure store scanning gets this right
region.flush(true);

// A query at time T+0 should return all cells
checkScan(8);
region.delete(new Delete(row).addColumn(fam1, q8));

// Increment time to T+ttlSecs seconds
edge.incrementTime(ttlSecs * 1000);
checkScan(2);
}

private void checkScan(int expectCellSize) throws IOException {
Scan s = new Scan().withStartRow(row);
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
ScannerContext scannerContext = contextBuilder.build();
RegionScanner scanner = region.getScanner(s);
List<Cell> kvs = new ArrayList<>();
scanner.next(kvs, scannerContext);
assertEquals(expectCellSize, kvs.size());
scanner.close();
}

@Test
public void testIncrementTimestampsAreMonotonic() throws IOException {
region = initHRegion(tableName, method, CONF, fam1);
Expand Down

0 comments on commit 2cdcab2

Please sign in to comment.