Skip to content

Commit

Permalink
HubSpot Backport: HBASE-27950 ClientSideRegionScanner does not adhere…
Browse files Browse the repository at this point in the history
… to RegionScanner.nextRaw contract (apache#5304)

Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Bryan Beaudreault <[email protected]>
  • Loading branch information
hgromer authored and bbeaudreault committed Jun 23, 2023
1 parent 4d5dbe4 commit 65a0a9f
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
private HRegion region;
RegionScanner scanner;
List<Cell> values;
boolean hasMore = true;

public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException {
Expand Down Expand Up @@ -90,12 +91,13 @@ public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,

@Override
public Result next() throws IOException {
values.clear();
scanner.nextRaw(values);
if (values.isEmpty()) {
// we are done
return null;
}
do {
if (!hasMore) {
return null;
}
values.clear();
this.hasMore = scanner.nextRaw(values);
} while (values.isEmpty());

Result result = Result.create(values);
if (this.scanMetrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand All @@ -25,14 +27,19 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -47,6 +54,9 @@ public class TestClientSideRegionScanner {
HBaseClassTestRule.forClass(TestClientSideRegionScanner.class);

private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final TableName TABLE_NAME = TableName.valueOf("test");
private static final byte[] FAM_NAME = Bytes.toBytes("f");
private static final byte[] FAM_NAME_2 = Bytes.toBytes("f2");

private Configuration conf;
private Path rootDir;
Expand Down Expand Up @@ -113,4 +123,75 @@ public void testNoBlockCache() throws IOException {
BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
assertNull(blockCache);
}

@Test
public void testContinuesToScanIfHasMore() throws IOException {
// In order to hit this bug, we need RegionScanner to exit out early despite having retrieved
// no values. By default, RegionScanner will continue until it gets values or a limit is
// exceeded. In the context of this test, the easiest way to do that is to trigger the
// retryImmediately behavior of converting from PREAD to STREAM. To do that, we set PREAD max
// bytes to a small number. Then we need to actually check our limits, which would trigger
// retryImmediately. To do that, we use joinedContinuationRow (essential family) through
// a SingleColumnValueFilter with setFilterIfMissing(true). This causes FAM_NAME_2 to be
// non-essential, and only brought in when the filter passes. This causes us to go through the
// limit check in joinedContinuationRow and exit out early with hasMore=true but empty values.
// Prior to HBASE-27950, the first call to ClientSideRegionScanner.next() below would
// return null because of the empty values list, and there would further be nulls in
// between row 5 and 8. With the fix, the scanner appropriately continues iterating, only
// returning if there are values or finally when hasMore=false at the end. Thus, the first
// next returns row 5, then row 8, then null because the scanner is exhausted. The intermediate
// nulls from rows in between are correctly skipped.

Configuration copyConf = new Configuration(conf);
copyConf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1);
int[] filteredRows = new int[] { 5, 8 };
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
for (int filteredRow : filteredRows) {
SingleColumnValueFilter filter = new SingleColumnValueFilter(FAM_NAME,
Bytes.toBytes(filteredRow), CompareOperator.EQUAL, Bytes.toBytes(filteredRow));
filter.setFilterIfMissing(true);
filterList.addFilter(filter);
}

Scan scan = new Scan();
scan.setFilter(filterList);
scan.setLimit(1);

try (Table table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAM_NAME, FAM_NAME_2 })) {
TableDescriptor htd = TEST_UTIL.getAdmin().getDescriptor(TABLE_NAME);
RegionInfo hri = TEST_UTIL.getAdmin().getRegions(TABLE_NAME).get(0);

for (int i = 0; i < 10; ++i) {
table.put(createPut(i));
}

// Flush contents to disk so we can scan the fs
TEST_UTIL.getAdmin().flush(TABLE_NAME);

ClientSideRegionScanner clientSideRegionScanner =
new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);

Result result;

for (int filteredRow : filteredRows) {
result = clientSideRegionScanner.next();
assertNotNull(result);
assertEquals(Bytes.toInt(result.getRow()), filteredRow);
assertTrue(clientSideRegionScanner.hasMore);
}

result = clientSideRegionScanner.next();
assertNull(result);
assertFalse(clientSideRegionScanner.hasMore);
}
}

private static Put createPut(int rowAsInt) {
byte[] row = Bytes.toBytes(rowAsInt);
Put put = new Put(row);
put.addColumn(FAM_NAME, row, row);
put.addColumn(FAM_NAME_2, row, row);
return put;
}

}

0 comments on commit 65a0a9f

Please sign in to comment.