Skip to content

Commit

Permalink
HBASE-26784 Addendum: Close scanner request should properly inherit o…
Browse files Browse the repository at this point in the history
…riginal timeout and priority
  • Loading branch information
bbeaudreault committed Mar 8, 2022
1 parent 5bae04e commit 53402aa
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,19 @@ private void close() {
ScanRequest request =
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
HBaseRpcController controller = rpcControllerFactory.newController();
// pull in the original priority, but then try to set to HIGH.
// it will take whatever is highest.
controller.setPriority(controller.getPriority());
controller.setPriority(HConstants.HIGH_QOS);
if (controller.hasCallTimeout()) {
controller.setCallTimeout(controller.getCallTimeout());

// Set fields from the original controller onto the close-specific controller
// We set the timeout and the priority -- we will overwrite the priority to HIGH
// below, but the controller will take whichever is highest.
if (getRpcController() instanceof HBaseRpcController) {
HBaseRpcController original = (HBaseRpcController) getRpcController();
controller.setPriority(original.getPriority());
if (original.hasCallTimeout()) {
controller.setCallTimeout(original.getCallTimeout());
}
}
controller.setPriority(HConstants.HIGH_QOS);

try {
getStub().scan(controller, request);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,11 @@ public synchronized void notifyOnCancel(RpcCallback<Object> callback, Cancellati
action.run(false);
}
}

@Override public String toString() {
return "HBaseRpcControllerImpl{" + "callTimeout=" + callTimeout + ", done=" + done
+ ", cancelled=" + cancelled + ", cancellationCbs=" + cancellationCbs + ", exception="
+ exception + ", regionInfo=" + regionInfo + ", priority=" + priority + ", cellScanner="
+ cellScanner + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,19 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
Expand Down Expand Up @@ -74,6 +73,8 @@
*/
@Category({ ClientTests.class, MediumTests.class })
public class TestTableRpcPriority {
private static final int RPC_TIMEOUT = 600000;

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestTableRpcPriority.class);
Expand All @@ -89,6 +90,10 @@ public void setUp() throws IOException, ServiceException {
stub = mock(ClientProtos.ClientService.BlockingInterface.class);

Configuration conf = HBaseConfiguration.create();
// we set this so that we can test that it gets set into the rpc controller in the tests below
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, RPC_TIMEOUT);
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, RPC_TIMEOUT);

ExecutorService executorService = Executors.newCachedThreadPool();
conn = new ConnectionImplementation(conf, executorService,
UserProvider.instantiate(conf).getCurrent(), new DoNothingConnectionRegistry(conf)) {
Expand Down Expand Up @@ -122,6 +127,16 @@ public void testScan() throws Exception {
testForTable(TableName.valueOf(name.getMethodName()), Optional.of(19));
}

/**
* This test verifies that our closeScanner request honors the original
* priority of the scan if it's greater than our expected HIGH_QOS for close calls.
*/
@Test
public void testScanSuperHighPriority() throws Exception {
mockScan(1000);
testForTable(TableName.valueOf(name.getMethodName()), Optional.of(1000));
}

@Test
public void testScanNormalTable() throws Exception {
mockScan(NORMAL_QOS);
Expand Down Expand Up @@ -153,11 +168,22 @@ private void testForTable(TableName tableName, Optional<Integer> priority) throw
// just verify that the calls happened. verification of priority occurred in the mocking
// open, next, then several renew lease
verify(stub, atLeast(3)).scan(any(), any(ClientProtos.ScanRequest.class));
verify(stub, times(1)).scan(any(), assertScannerCloseRequest());
verify(stub, times(1)).scan(
assertControllerArgs(Math.max(priority.orElse(0), HIGH_QOS)), assertScannerCloseRequest());
}

private void mockScan(int scanPriority) throws ServiceException {
int scannerId = 1;

doAnswer(new Answer<ClientProtos.ScanResponse>() {
@Override public ClientProtos.ScanResponse answer(InvocationOnMock invocation)
throws Throwable {
throw new IllegalArgumentException(
"Call not covered by explicit mock for arguments controller="
+ invocation.getArgument(0) + ", request=" + invocation.getArgument(1));
}
}).when(stub).scan(any(), any());

AtomicInteger scanNextCalled = new AtomicInteger(0);
doAnswer(new Answer<ClientProtos.ScanResponse>() {

Expand All @@ -182,7 +208,7 @@ public ClientProtos.ScanResponse answer(InvocationOnMock invocation)
return builder.setTtl(800).setMoreResultsInRegion(true).setMoreResults(true)
.addResults(ProtobufUtil.toResult(result)).build();
}
}).when(stub).scan(assertPriority(scanPriority), any(ClientProtos.ScanRequest.class));
}).when(stub).scan(assertControllerArgs(scanPriority), any());

doAnswer(new Answer<ClientProtos.ScanResponse>() {

Expand All @@ -197,15 +223,19 @@ public ClientProtos.ScanResponse answer(InvocationOnMock invocation)

return ClientProtos.ScanResponse.getDefaultInstance();
}
}).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest());
}).when(stub).scan(assertControllerArgs(Math.max(scanPriority, HIGH_QOS)),
assertScannerCloseRequest());
}

private HBaseRpcController assertPriority(int priority) {
private HBaseRpcController assertControllerArgs(int priority) {
return argThat(new ArgumentMatcher<HBaseRpcController>() {

@Override
public boolean matches(HBaseRpcController controller) {
return controller.getPriority() == priority;
// check specified priority, but also check that it has a timeout
// this ensures that our conversion from the base controller to the close-specific
// controller honored the original arguments.
return controller.getPriority() == priority && controller.hasCallTimeout();
}
});
}
Expand Down

0 comments on commit 53402aa

Please sign in to comment.