diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ScanPerNextResultScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ScanPerNextResultScanner.java new file mode 100644 index 000000000000..c8665e912667 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ScanPerNextResultScanner.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayDeque; +import java.util.Queue; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; + +/** + * A ResultScanner which will only send request to RS when there are no cached results when calling + * next, just like the ResultScanner in the old time. Mainly used for writing UTs, that we can + * control when to send request to RS. The default ResultScanner implementation will fetch in + * background. + */ +@InterfaceAudience.Private +public class ScanPerNextResultScanner implements ResultScanner, AdvancedScanResultConsumer { + + private final AsyncTable table; + + private final Scan scan; + + private final Queue queue = new ArrayDeque<>(); + + private ScanMetrics scanMetrics; + + private boolean closed = false; + + private Throwable error; + + private ScanResumer resumer; + + public ScanPerNextResultScanner(AsyncTable table, Scan scan) { + this.table = table; + this.scan = scan; + } + + @Override + public synchronized void onError(Throwable error) { + this.error = error; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + closed = true; + notifyAll(); + } + + @Override + public void onScanMetricsCreated(ScanMetrics scanMetrics) { + this.scanMetrics = scanMetrics; + } + + @Override + public synchronized void onNext(Result[] results, ScanController controller) { + assert results.length > 0; + if (closed) { + controller.terminate(); + return; + } + for (Result result : results) { + queue.add(result); + } + notifyAll(); + resumer = controller.suspend(); + } + + @Override + public synchronized void onHeartbeat(ScanController controller) { + if (closed) { + controller.terminate(); + return; + } + if (scan.isNeedCursorResult()) { + controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c))); + } + } + + @Override + public synchronized Result next() throws IOException { + if (queue.isEmpty()) { + if (resumer != null) { + resumer.resume(); + resumer = null; + } else { + table.scan(scan, this); + } + } + while (queue.isEmpty()) { + if (closed) { + return null; + } + if (error != null) { + Throwables.propagateIfPossible(error, IOException.class); + throw new IOException(error); + } + try { + wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + return queue.poll(); + } + + @Override + public synchronized void close() { + closed = true; + queue.clear(); + if (resumer != null) { + resumer.resume(); + resumer = null; + } + notifyAll(); + } + + @Override + public boolean renewLease() { + // The renew lease operation will be handled in background + return false; + } + + @Override + public ScanMetrics getScanMetrics() { + return scanMetrics; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index ea9f7e7319a7..7a21941d3290 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -39,11 +39,16 @@ import org.apache.hadoop.hbase.HTestConst; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScanPerNextResultScanner; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.filter.Filter; @@ -58,10 +63,10 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; @@ -75,11 +80,7 @@ * the client when the server has exceeded the time limit during the processing of the scan. When * the time limit is reached, the server will return to the Client whatever Results it has * accumulated (potentially empty). - *

- * TODO: with async client based sync client, we will fetch result in background which makes this - * test broken. We need to find another way to implement the test. */ -@Ignore @Category(MediumTests.class) public class TestScannerHeartbeatMessages { @@ -89,7 +90,7 @@ public class TestScannerHeartbeatMessages { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static Table TABLE = null; + private static AsyncConnection CONN; /** * Table configuration @@ -141,16 +142,19 @@ public static void setUpBeforeClass() throws Exception { conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); TEST_UTIL.startMiniCluster(1); - TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); + createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); + + Configuration newConf = new Configuration(conf); + newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); + newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT); + CONN = ConnectionFactory.createAsyncConnection(newConf).get(); } - static Table createTestTable(TableName name, byte[][] rows, byte[][] families, - byte[][] qualifiers, byte[] cellValue) throws IOException { + static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers, + byte[] cellValue) throws IOException { Table ht = TEST_UTIL.createTable(name, families); List puts = createPuts(rows, families, qualifiers, cellValue); ht.put(puts); - ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); - return ht; } /** @@ -177,6 +181,7 @@ static ArrayList createPuts(byte[][] rows, byte[][] families, byte[][] qual @AfterClass public static void tearDownAfterClass() throws Exception { + Closeables.close(CONN, true); TEST_UTIL.shutdownMiniCluster(); } @@ -311,26 +316,28 @@ public Void call() throws Exception { scan.setMaxResultSize(Long.MAX_VALUE); scan.setCaching(Integer.MAX_VALUE); scan.setFilter(new SparseCellFilter()); - ResultScanner scanner = TABLE.getScanner(scan); - int num = 0; - while (scanner.next() != null) { - num++; + try (ScanPerNextResultScanner scanner = + new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) { + int num = 0; + while (scanner.next() != null) { + num++; + } + assertEquals(1, num); } - assertEquals(1, num); - scanner.close(); scan = new Scan(); scan.setMaxResultSize(Long.MAX_VALUE); scan.setCaching(Integer.MAX_VALUE); scan.setFilter(new SparseCellFilter()); scan.setAllowPartialResults(true); - scanner = TABLE.getScanner(scan); - num = 0; - while (scanner.next() != null) { - num++; + try (ScanPerNextResultScanner scanner = + new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) { + int num = 0; + while (scanner.next() != null) { + num++; + } + assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num); } - assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num); - scanner.close(); return null; } @@ -349,13 +356,14 @@ public Void call() throws Exception { scan.setMaxResultSize(Long.MAX_VALUE); scan.setCaching(Integer.MAX_VALUE); scan.setFilter(new SparseRowFilter()); - ResultScanner scanner = TABLE.getScanner(scan); - int num = 0; - while (scanner.next() != null) { - num++; + try (ScanPerNextResultScanner scanner = + new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) { + int num = 0; + while (scanner.next() != null) { + num++; + } + assertEquals(1, num); } - assertEquals(1, num); - scanner.close(); return null; } @@ -374,8 +382,9 @@ public Void call() throws Exception { private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) throws Exception { disableSleeping(); - final ResultScanner scanner = TABLE.getScanner(scan); - final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan); + AsyncTable table = CONN.getTable(TABLE_NAME); + final ResultScanner scanner = new ScanPerNextResultScanner(table, scan); + final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan); Result r1 = null; Result r2 = null;