Skip to content

Commit

Permalink
HBASE-22036 Rewrite TestScannerHeartbeatMessages
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Apr 28, 2019
1 parent dfa4f47 commit cfdbbbc
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

/**
* A ResultScanner which will only send request to RS when there are no cached results when calling
* next, just like the ResultScanner in the old time. Mainly used for writing UTs, that we can
* control when to send request to RS. The default ResultScanner implementation will fetch in
* background.
*/
@InterfaceAudience.Private
public class ScanPerNextResultScanner implements ResultScanner, AdvancedScanResultConsumer {

private final AsyncTable<AdvancedScanResultConsumer> table;

private final Scan scan;

private final Queue<Result> queue = new ArrayDeque<>();

private ScanMetrics scanMetrics;

private boolean closed = false;

private Throwable error;

private ScanResumer resumer;

public ScanPerNextResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan) {
this.table = table;
this.scan = scan;
}

@Override
public synchronized void onError(Throwable error) {
this.error = error;
notifyAll();
}

@Override
public synchronized void onComplete() {
closed = true;
notifyAll();
}

@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}

@Override
public synchronized void onNext(Result[] results, ScanController controller) {
assert results.length > 0;
if (closed) {
controller.terminate();
return;
}
for (Result result : results) {
queue.add(result);
}
notifyAll();
resumer = controller.suspend();
}

@Override
public synchronized void onHeartbeat(ScanController controller) {
if (closed) {
controller.terminate();
return;
}
if (scan.isNeedCursorResult()) {
controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
}
}

@Override
public synchronized Result next() throws IOException {
if (queue.isEmpty()) {
if (resumer != null) {
resumer.resume();
resumer = null;
} else {
table.scan(scan, this);
}
}
while (queue.isEmpty()) {
if (closed) {
return null;
}
if (error != null) {
Throwables.propagateIfPossible(error, IOException.class);
throw new IOException(error);
}
try {
wait();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
return queue.poll();
}

@Override
public synchronized void close() {
closed = true;
queue.clear();
if (resumer != null) {
resumer.resume();
resumer = null;
}
notifyAll();
}

@Override
public boolean renewLease() {
// The renew lease operation will be handled in background
return false;
}

@Override
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,16 @@
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.Filter;
Expand All @@ -58,10 +63,10 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

Expand All @@ -75,11 +80,7 @@
* the client when the server has exceeded the time limit during the processing of the scan. When
* the time limit is reached, the server will return to the Client whatever Results it has
* accumulated (potentially empty).
* <p/>
* TODO: with async client based sync client, we will fetch result in background which makes this
* test broken. We need to find another way to implement the test.
*/
@Ignore
@Category(MediumTests.class)
public class TestScannerHeartbeatMessages {

Expand All @@ -89,7 +90,7 @@ public class TestScannerHeartbeatMessages {

private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

private static Table TABLE = null;
private static AsyncConnection CONN;

/**
* Table configuration
Expand Down Expand Up @@ -141,16 +142,19 @@ public static void setUpBeforeClass() throws Exception {
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
TEST_UTIL.startMiniCluster(1);

TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);

Configuration newConf = new Configuration(conf);
newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
CONN = ConnectionFactory.createAsyncConnection(newConf).get();
}

static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
byte[][] qualifiers, byte[] cellValue) throws IOException {
static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
byte[] cellValue) throws IOException {
Table ht = TEST_UTIL.createTable(name, families);
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
ht.put(puts);
ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
return ht;
}

/**
Expand All @@ -177,6 +181,7 @@ static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qual

@AfterClass
public static void tearDownAfterClass() throws Exception {
Closeables.close(CONN, true);
TEST_UTIL.shutdownMiniCluster();
}

Expand Down Expand Up @@ -311,26 +316,28 @@ public Void call() throws Exception {
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseCellFilter());
ResultScanner scanner = TABLE.getScanner(scan);
int num = 0;
while (scanner.next() != null) {
num++;
try (ScanPerNextResultScanner scanner =
new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
int num = 0;
while (scanner.next() != null) {
num++;
}
assertEquals(1, num);
}
assertEquals(1, num);
scanner.close();

scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseCellFilter());
scan.setAllowPartialResults(true);
scanner = TABLE.getScanner(scan);
num = 0;
while (scanner.next() != null) {
num++;
try (ScanPerNextResultScanner scanner =
new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
int num = 0;
while (scanner.next() != null) {
num++;
}
assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
}
assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
scanner.close();

return null;
}
Expand All @@ -349,13 +356,14 @@ public Void call() throws Exception {
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseRowFilter());
ResultScanner scanner = TABLE.getScanner(scan);
int num = 0;
while (scanner.next() != null) {
num++;
try (ScanPerNextResultScanner scanner =
new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
int num = 0;
while (scanner.next() != null) {
num++;
}
assertEquals(1, num);
}
assertEquals(1, num);
scanner.close();

return null;
}
Expand All @@ -374,8 +382,9 @@ public Void call() throws Exception {
private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
int cfSleepTime, boolean sleepBeforeCf) throws Exception {
disableSleeping();
final ResultScanner scanner = TABLE.getScanner(scan);
final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan);
AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
final ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan);

Result r1 = null;
Result r2 = null;
Expand Down

0 comments on commit cfdbbbc

Please sign in to comment.