Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28509 ScanResumer.resume would perform unnecessary scan when cl… #5817

Merged
merged 5 commits into from
Apr 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -230,7 +230,8 @@ private enum ScanResumerState {
// Notice that, the public methods of this class is supposed to be called by upper layer only, and
// package private methods can only be called within the implementation of
// AsyncScanSingleRegionRpcRetryingCaller.
private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
@InterfaceAudience.Private
final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {

// INITIALIZED -> SUSPENDED -> RESUMED
// INITIALIZED -> RESUMED
@@ -250,6 +251,18 @@ private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanRe

@Override
public void resume() {
doResume(false);
}

/**
* This method is used when {@link ScanControllerImpl#suspend} had ever been called to get a
* {@link ScanResumerImpl}, but now user stops scan and does not need any more scan results.
*/
public void terminate() {
doResume(true);
}

private void doResume(boolean stopScan) {
// just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
// just return at the first if condition without loading the resp and numValidResuls field. If
// resume is called after suspend, then it is also safe to just reference resp and
@@ -274,7 +287,11 @@ public void resume() {
localResp = this.resp;
localNumberOfCompleteRows = this.numberOfCompleteRows;
}
completeOrNext(localResp, localNumberOfCompleteRows);
if (stopScan) {
stopScan(localResp);
} else {
completeOrNext(localResp, localNumberOfCompleteRows);
}
}

private void scheduleRenewLeaseTask() {
@@ -536,12 +553,7 @@ private void onComplete(HBaseRpcController controller, ScanResponse resp) {
}
ScanControllerState state = scanController.destroy();
if (state == ScanControllerState.TERMINATED) {
if (resp.getMoreResultsInRegion()) {
// we have more results in region but user request to stop the scan, so we need to close the
// scanner explicitly.
closeScanner();
}
completeNoMoreResults();
stopScan(resp);
return;
}
int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
@@ -553,6 +565,15 @@ private void onComplete(HBaseRpcController controller, ScanResponse resp) {
completeOrNext(resp, numberOfCompleteRows);
}

private void stopScan(ScanResponse resp) {
if (resp.getMoreResultsInRegion()) {
// we have more results in region but user request to stop the scan, so we need to close the
// scanner explicitly.
closeScanner();
}
completeNoMoreResults();
}

private void call() {
// As we have a call sequence for scan, it is useless to have a different rpc timeout which is
// less than the scan timeout. If the server does not respond in time(usually this will not
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncScanSingleRegionRpcRetryingCaller.ScanResumerImpl;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -143,6 +144,25 @@ private void resumePrefetch() {
resumer = null;
}

private void terminateResumerIfPossible() {
if (resumer == null) {
return;
}
// AsyncTableResultScanner.close means we do not need scan results any more, but for
// ScanResumerImpl.resume, it would perform another scan on RegionServer and call
// AsyncTableResultScanner.onNext again when ScanResponse is received. This time
// AsyncTableResultScanner.onNext would do nothing else but just discard ScanResponse
// because AsyncTableResultScanner.closed is true. So here we would better save this
// unnecessary scan on RegionServer and introduce ScanResumerImpl.terminate to close
// scanner directly.
if (resumer instanceof ScanResumerImpl) {
((ScanResumerImpl) resumer).terminate();
Apache9 marked this conversation as resolved.
Show resolved Hide resolved
} else {
resumePrefetch();
}
resumer = null;
}

@Override
public synchronized Result next() throws IOException {
while (queue.isEmpty()) {
@@ -173,9 +193,7 @@ public synchronized void close() {
closed = true;
queue.clear();
cacheSize = 0;
if (resumer != null) {
resumePrefetch();
}
terminateResumerIfPossible();
notifyAll();
}

Original file line number Diff line number Diff line change
@@ -19,7 +19,9 @@

import static org.junit.Assert.assertEquals;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -78,21 +80,41 @@ private int getScannersCount() {

@Test
public void testCloseScannerWhileSuspending() throws Exception {
try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) {
TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {

@Override
public boolean evaluate() throws Exception {
return ((AsyncTableResultScanner) scanner).isSuspended();
}

@Override
public String explainFailure() throws Exception {
return "The given scanner has been suspended in time";
}
});
assertEquals(1, getScannersCount());
}
final AtomicInteger onNextCounter = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(1);
final Scan scan = new Scan().setMaxResultSize(1);
final AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) {
@Override
public void onNext(Result[] results, ScanController controller) {
onNextCounter.incrementAndGet();
super.onNext(results, controller);
}

@Override
public void onComplete() {
super.onComplete();
latch.countDown();
}
};

CONN.getTable(TABLE_NAME).scan(scan, scanner);

TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {

@Override
public boolean evaluate() throws Exception {
return scanner.isSuspended();
}

@Override
public String explainFailure() throws Exception {
return "The given scanner has been suspended in time";
}
});
assertEquals(1, getScannersCount());
assertEquals(1, onNextCounter.get());

scanner.close();
TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {

@Override
@@ -105,5 +127,7 @@ public String explainFailure() throws Exception {
return "Still have " + getScannersCount() + " scanners opened";
}
});
latch.await();
assertEquals(1, onNextCounter.get());
}
}