diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index e6c758379b0f..8a80b6a25c91 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -312,13 +312,19 @@ private void close() { ScanRequest request = RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null); HBaseRpcController controller = rpcControllerFactory.newController(); - // pull in the original priority, but then try to set to HIGH. - // it will take whatever is highest. - controller.setPriority(controller.getPriority()); - controller.setPriority(HConstants.HIGH_QOS); - if (controller.hasCallTimeout()) { - controller.setCallTimeout(controller.getCallTimeout()); + + // Set fields from the original controller onto the close-specific controller + // We set the timeout and the priority -- we will overwrite the priority to HIGH + // below, but the controller will take whichever is highest. + if (getRpcController() instanceof HBaseRpcController) { + HBaseRpcController original = (HBaseRpcController) getRpcController(); + controller.setPriority(original.getPriority()); + if (original.hasCallTimeout()) { + controller.setCallTimeout(original.getCallTimeout()); + } } + controller.setPriority(HConstants.HIGH_QOS); + try { getStub().scan(controller, request); } catch (Exception e) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index 7dde67c38e1c..3058247db48d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -262,4 +262,11 @@ public synchronized void notifyOnCancel(RpcCallback callback, Cancellati action.run(false); } } + + @Override public String toString() { + return "HBaseRpcControllerImpl{" + "callTimeout=" + callTimeout + ", done=" + done + + ", cancelled=" + cancelled + ", cancellationCbs=" + cancellationCbs + ", exception=" + + exception + ", regionInfo=" + regionInfo + ", priority=" + priority + ", cellScanner=" + + cellScanner + '}'; + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java index 7aef284ed7c6..e0416f9aeaa8 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTableRpcPriority.java @@ -32,20 +32,19 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; - import java.io.IOException; import java.util.Arrays; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; @@ -74,6 +73,8 @@ */ @Category({ ClientTests.class, MediumTests.class }) public class TestTableRpcPriority { + private static final int RPC_TIMEOUT = 600000; + @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestTableRpcPriority.class); @@ -89,6 +90,10 @@ public void setUp() throws IOException, ServiceException { stub = mock(ClientProtos.ClientService.BlockingInterface.class); Configuration conf = HBaseConfiguration.create(); + // we set this so that we can test that it gets set into the rpc controller in the tests below + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, RPC_TIMEOUT); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, RPC_TIMEOUT); + ExecutorService executorService = Executors.newCachedThreadPool(); conn = new ConnectionImplementation(conf, executorService, UserProvider.instantiate(conf).getCurrent(), new DoNothingConnectionRegistry(conf)) { @@ -122,6 +127,16 @@ public void testScan() throws Exception { testForTable(TableName.valueOf(name.getMethodName()), Optional.of(19)); } + /** + * This test verifies that our closeScanner request honors the original + * priority of the scan if it's greater than our expected HIGH_QOS for close calls. + */ + @Test + public void testScanSuperHighPriority() throws Exception { + mockScan(1000); + testForTable(TableName.valueOf(name.getMethodName()), Optional.of(1000)); + } + @Test public void testScanNormalTable() throws Exception { mockScan(NORMAL_QOS); @@ -153,11 +168,22 @@ private void testForTable(TableName tableName, Optional priority) throw // just verify that the calls happened. verification of priority occurred in the mocking // open, next, then several renew lease verify(stub, atLeast(3)).scan(any(), any(ClientProtos.ScanRequest.class)); - verify(stub, times(1)).scan(any(), assertScannerCloseRequest()); + verify(stub, times(1)).scan( + assertControllerArgs(Math.max(priority.orElse(0), HIGH_QOS)), assertScannerCloseRequest()); } private void mockScan(int scanPriority) throws ServiceException { int scannerId = 1; + + doAnswer(new Answer() { + @Override public ClientProtos.ScanResponse answer(InvocationOnMock invocation) + throws Throwable { + throw new IllegalArgumentException( + "Call not covered by explicit mock for arguments controller=" + + invocation.getArgument(0) + ", request=" + invocation.getArgument(1)); + } + }).when(stub).scan(any(), any()); + AtomicInteger scanNextCalled = new AtomicInteger(0); doAnswer(new Answer() { @@ -182,7 +208,7 @@ public ClientProtos.ScanResponse answer(InvocationOnMock invocation) return builder.setTtl(800).setMoreResultsInRegion(true).setMoreResults(true) .addResults(ProtobufUtil.toResult(result)).build(); } - }).when(stub).scan(assertPriority(scanPriority), any(ClientProtos.ScanRequest.class)); + }).when(stub).scan(assertControllerArgs(scanPriority), any()); doAnswer(new Answer() { @@ -197,15 +223,19 @@ public ClientProtos.ScanResponse answer(InvocationOnMock invocation) return ClientProtos.ScanResponse.getDefaultInstance(); } - }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest()); + }).when(stub).scan(assertControllerArgs(Math.max(scanPriority, HIGH_QOS)), + assertScannerCloseRequest()); } - private HBaseRpcController assertPriority(int priority) { + private HBaseRpcController assertControllerArgs(int priority) { return argThat(new ArgumentMatcher() { @Override public boolean matches(HBaseRpcController controller) { - return controller.getPriority() == priority; + // check specified priority, but also check that it has a timeout + // this ensures that our conversion from the base controller to the close-specific + // controller honored the original arguments. + return controller.getPriority() == priority && controller.hasCallTimeout(); } }); }