Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-22036 Rewrite TestScannerHeartbeatMessages #191

Merged
merged 1 commit into from
Apr 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

/**
* A ResultScanner which will only send request to RS when there are no cached results when calling
* next, just like the ResultScanner in the old time. Mainly used for writing UTs, that we can
* control when to send request to RS. The default ResultScanner implementation will fetch in
* background.
*/
@InterfaceAudience.Private
public class ScanPerNextResultScanner implements ResultScanner, AdvancedScanResultConsumer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this class is only used for testing ? I prefer to move this into test package, or other future packages will use this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is under the src/test/java?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, my bad..


private final AsyncTable<AdvancedScanResultConsumer> table;

private final Scan scan;

private final Queue<Result> queue = new ArrayDeque<>();

private ScanMetrics scanMetrics;

private boolean closed = false;

private Throwable error;

private ScanResumer resumer;

public ScanPerNextResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan) {
this.table = table;
this.scan = scan;
}

@Override
public synchronized void onError(Throwable error) {
this.error = error;
notifyAll();
}

@Override
public synchronized void onComplete() {
closed = true;
notifyAll();
}

@Override
public void onScanMetricsCreated(ScanMetrics scanMetrics) {
this.scanMetrics = scanMetrics;
}

@Override
public synchronized void onNext(Result[] results, ScanController controller) {
assert results.length > 0;
if (closed) {
controller.terminate();
return;
}
for (Result result : results) {
queue.add(result);
}
notifyAll();
resumer = controller.suspend();
}

@Override
public synchronized void onHeartbeat(ScanController controller) {
if (closed) {
controller.terminate();
return;
}
if (scan.isNeedCursorResult()) {
controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
}
}

@Override
public synchronized Result next() throws IOException {
if (queue.isEmpty()) {
if (resumer != null) {
resumer.resume();
resumer = null;
} else {
table.scan(scan, this);
}
}
while (queue.isEmpty()) {
if (closed) {
return null;
}
if (error != null) {
Throwables.propagateIfPossible(error, IOException.class);
throw new IOException(error);
}
try {
wait();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
return queue.poll();
}

@Override
public synchronized void close() {
closed = true;
queue.clear();
if (resumer != null) {
resumer.resume();
resumer = null;
}
notifyAll();
}

@Override
public boolean renewLease() {
// The renew lease operation will be handled in background
return false;
}

@Override
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,16 @@
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.Filter;
Expand All @@ -58,10 +63,10 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

Expand All @@ -75,11 +80,7 @@
* the client when the server has exceeded the time limit during the processing of the scan. When
* the time limit is reached, the server will return to the Client whatever Results it has
* accumulated (potentially empty).
* <p/>
* TODO: with async client based sync client, we will fetch result in background which makes this
* test broken. We need to find another way to implement the test.
*/
@Ignore
@Category(MediumTests.class)
public class TestScannerHeartbeatMessages {

Expand All @@ -89,7 +90,7 @@ public class TestScannerHeartbeatMessages {

private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

private static Table TABLE = null;
private static AsyncConnection CONN;

/**
* Table configuration
Expand Down Expand Up @@ -141,16 +142,19 @@ public static void setUpBeforeClass() throws Exception {
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
TEST_UTIL.startMiniCluster(1);

TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);

Configuration newConf = new Configuration(conf);
newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
CONN = ConnectionFactory.createAsyncConnection(newConf).get();
}

static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
byte[][] qualifiers, byte[] cellValue) throws IOException {
static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
byte[] cellValue) throws IOException {
Table ht = TEST_UTIL.createTable(name, families);
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
ht.put(puts);
ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
return ht;
}

/**
Expand All @@ -177,6 +181,7 @@ static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qual

@AfterClass
public static void tearDownAfterClass() throws Exception {
Closeables.close(CONN, true);
TEST_UTIL.shutdownMiniCluster();
}

Expand Down Expand Up @@ -311,26 +316,28 @@ public Void call() throws Exception {
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseCellFilter());
ResultScanner scanner = TABLE.getScanner(scan);
int num = 0;
while (scanner.next() != null) {
num++;
try (ScanPerNextResultScanner scanner =
new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
int num = 0;
while (scanner.next() != null) {
num++;
}
assertEquals(1, num);
}
assertEquals(1, num);
scanner.close();

scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseCellFilter());
scan.setAllowPartialResults(true);
scanner = TABLE.getScanner(scan);
num = 0;
while (scanner.next() != null) {
num++;
try (ScanPerNextResultScanner scanner =
new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
int num = 0;
while (scanner.next() != null) {
num++;
}
assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
}
assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
scanner.close();

return null;
}
Expand All @@ -349,13 +356,14 @@ public Void call() throws Exception {
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseRowFilter());
ResultScanner scanner = TABLE.getScanner(scan);
int num = 0;
while (scanner.next() != null) {
num++;
try (ScanPerNextResultScanner scanner =
new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
int num = 0;
while (scanner.next() != null) {
num++;
}
assertEquals(1, num);
}
assertEquals(1, num);
scanner.close();

return null;
}
Expand All @@ -374,8 +382,9 @@ public Void call() throws Exception {
private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
int cfSleepTime, boolean sleepBeforeCf) throws Exception {
disableSleeping();
final ResultScanner scanner = TABLE.getScanner(scan);
final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan);
AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
final ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan);

Result r1 = null;
Result r2 = null;
Expand Down