diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index 769931b7083f..abd1267ffc4b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -66,9 +66,11 @@ public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableN
     ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
     RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
     int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
-    Map<String, byte[]> requestAttributes) throws IOException {
+    ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
+    throws IOException {
     super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
-      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes);
+      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
+      connectionConfiguration, requestAttributes);
     exceptionsQueue = new ConcurrentLinkedQueue<>();
     final Context context = Context.current();
     final Runnable runnable = context.wrap(new PrefetchRunnable());
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 33cfedc362ad..ef8e4b0404f6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -33,7 +33,6 @@
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
@@ -78,6 +77,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
   protected final TableName tableName;
   protected final int readRpcTimeout;
   protected final int scannerTimeout;
+  private final boolean useScannerTimeoutForNextCalls;
   protected boolean scanMetricsPublished = false;
   protected RpcRetryingCaller<Result[]> caller;
   protected RpcControllerFactory rpcControllerFactory;
@@ -104,7 +104,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
     ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
     RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
-    int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> requestAttributes)
+    int scannerTimeout, int primaryOperationTimeout,
+    ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
     throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace(
@@ -116,16 +117,15 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName
     this.connection = connection;
     this.pool = pool;
     this.primaryOperationTimeout = primaryOperationTimeout;
-    this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    this.retries = connectionConfiguration.getRetriesNumber();
     if (scan.getMaxResultSize() > 0) {
       this.maxScannerResultSize = scan.getMaxResultSize();
     } else {
-      this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
-        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+      this.maxScannerResultSize = connectionConfiguration.getScannerMaxResultSize();
     }
     this.readRpcTimeout = scanReadRpcTimeout;
     this.scannerTimeout = scannerTimeout;
+    this.useScannerTimeoutForNextCalls = connectionConfiguration.isUseScannerTimeoutForNextCalls();
     this.requestAttributes = requestAttributes;
 
     // check if application wants to collect scan metrics
@@ -135,8 +135,7 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName
     if (this.scan.getCaching() > 0) {
       this.caching = this.scan.getCaching();
     } else {
-      this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
-        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+      this.caching = connectionConfiguration.getScannerCaching();
     }
 
     this.caller = rpcFactory.<Result[]> newCaller();
@@ -255,7 +254,7 @@ protected boolean moveToNextRegion() {
     this.currentRegion = null;
     this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
       createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
-      scannerTimeout, caching, conf, caller);
+      scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller);
     this.callable.setCaching(this.caching);
     incRegionCountMetrics(scanMetrics);
     return true;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
index 81091ad3010c..bde036f88806 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
@@ -39,9 +39,11 @@ public ClientSimpleScanner(Configuration configuration, Scan scan, TableName nam
     ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
     RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
     int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
-    Map<String, byte[]> requestAttributes) throws IOException {
+    ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
+    throws IOException {
     super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
-      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes);
+      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
+      connectionConfiguration, requestAttributes);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 93fa2600d898..90515296ca86 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -76,6 +76,11 @@ public class ConnectionConfiguration {
   public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT =
     "hbase.client.meta.scanner.timeout.period";
 
+  public static final String HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS =
+    "hbase.client.use.scanner.timeout.for.next.calls";
+
+  public static final boolean HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT = false;
+
   private final long writeBufferSize;
   private final long writeBufferPeriodicFlushTimeoutMs;
   private final long writeBufferPeriodicFlushTimerTickMs;
@@ -99,6 +104,7 @@ public class ConnectionConfiguration {
   private final boolean clientScannerAsyncPrefetch;
   private final long pauseMs;
   private final long pauseMsForServerOverloaded;
+  private final boolean useScannerTimeoutForNextCalls;
 
   /**
    * Constructor
@@ -158,6 +164,9 @@ public class ConnectionConfiguration {
       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
     this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout);
+    this.useScannerTimeoutForNextCalls =
+      conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS,
+        HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT);
 
     long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
     long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
@@ -201,6 +210,7 @@ protected ConnectionConfiguration() {
     this.metaScanTimeout = scanTimeout;
     this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
     this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
+    this.useScannerTimeoutForNextCalls = HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT;
   }
 
   public int getReadRpcTimeout() {
@@ -275,6 +285,10 @@ public int getScanTimeout() {
     return scanTimeout;
   }
 
+  public boolean isUseScannerTimeoutForNextCalls() {
+    return useScannerTimeoutForNextCalls;
+  }
+
   public int getMetaScanTimeout() {
     return metaScanTimeout;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 80f7a7959f12..38ac866e97e5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1052,7 +1052,7 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool
       ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME,
         this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(),
         connectionConfig.getMetaReadRpcTimeout(), connectionConfig.getMetaScanTimeout(),
-        metaReplicaCallTimeoutScanInMicroSecond, Collections.emptyMap())) {
+        metaReplicaCallTimeoutScanInMicroSecond, connectionConfig, Collections.emptyMap())) {
       boolean tableNotFound = true;
       for (;;) {
         Result regionInfoRow = rcs.next();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index cc24d80f5ed8..386a7db3526e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -321,16 +321,16 @@ public ResultScanner getScanner(Scan scan) throws IOException {
     if (scan.isReversed()) {
       return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
         rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
-        replicaTimeout, requestAttributes);
+        replicaTimeout, connConfiguration, requestAttributes);
     } else {
       if (async) {
         return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
           rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
-          replicaTimeout, requestAttributes);
+          replicaTimeout, connConfiguration, requestAttributes);
       } else {
         return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
           rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
-          replicaTimeout, requestAttributes);
+          replicaTimeout, connConfiguration, requestAttributes);
       }
     }
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index 68a8e7b74067..36bbdb5b60e4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -40,10 +40,12 @@ public class ReversedClientScanner extends ClientScanner {
   public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
     ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
     RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
-    int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> requestAttributes)
+    int scannerTimeout, int primaryOperationTimeout,
+    ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
     throws IOException {
     super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-      scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, requestAttributes);
+      scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration,
+      requestAttributes);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 227ad849c846..2927216f3246 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -57,6 +57,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   AtomicBoolean replicaSwitched = new AtomicBoolean(false);
   private final ClusterConnection cConnection;
   protected final ExecutorService pool;
+  private final boolean useScannerTimeoutForNextCalls;
   protected final int timeBeforeReplicas;
   private final Scan scan;
   private final int retries;
@@ -72,11 +73,12 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
     ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
-    int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf,
-    RpcRetryingCaller<Result[]> caller) {
+    int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls,
+    int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
     this.currentScannerCallable = baseCallable;
     this.cConnection = cConnection;
     this.pool = pool;
+    this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls;
     if (timeBeforeReplicas < 0) {
       throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
     }
@@ -187,9 +189,12 @@ public Result[] call(int timeout) throws IOException {
       pool, regionReplication * 5);
     AtomicBoolean done = new AtomicBoolean(false);
 
+    // make sure we use the same rpcTimeout for current and other replicas
+    int rpcTimeoutForCall = getRpcTimeout();
+
     replicaSwitched.set(false);
     // submit call for the primary replica or user specified replica
-    addCallsForCurrentReplica(cs);
+    addCallsForCurrentReplica(cs, rpcTimeoutForCall);
     int startIndex = 0;
 
     try {
@@ -234,7 +239,7 @@ public Result[] call(int timeout) throws IOException {
       endIndex = 1;
     } else {
       // TODO: this may be an overkill for large region replication
-      addCallsForOtherReplicas(cs, 0, regionReplication - 1);
+      addCallsForOtherReplicas(cs, 0, regionReplication - 1, rpcTimeoutForCall);
     }
 
     try {
@@ -326,15 +331,34 @@ public Cursor getCursor() {
     return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
   }
 
-  private void
-    addCallsForCurrentReplica(ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
+  private void addCallsForCurrentReplica(
+    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int rpcTimeout) {
     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
     outstandingCallables.add(currentScannerCallable);
-    cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id);
+    cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, currentScannerCallable.id);
+  }
+
+  /**
+   * As we have a call sequence for scan, it is useless to have a different rpc timeout which is
+   * less than the scan timeout. If the server does not respond in time (usually this will not
+   * happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
+   * resending the next request, and the only way to fix this is to close the scanner and open a
+   * new one. The legacy behavior of ScannerCallable has been to use readRpcTimeout despite the
+   * above. If legacy behavior is enabled, we always use readRpcTimeout. If the new behavior is
+   * enabled, we choose the rpc timeout based on whether the scanner is already open: if it is,
+   * use scannerTimeout, otherwise use readRpcTimeout.
+   */
+  private int getRpcTimeout() {
+    if (useScannerTimeoutForNextCalls) {
+      return currentScannerCallable.scannerId == -1 ? readRpcTimeout : scannerTimeout;
+    } else {
+      return readRpcTimeout;
+    }
   }
 
   private void addCallsForOtherReplicas(
-    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) {
+    ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max,
+    int rpcTimeout) {
     for (int id = min; id <= max; id++) {
       if (currentScannerCallable.id == id) {
@@ -344,7 +368,7 @@ private void addCallsForOtherReplicas(
       setStartRowForReplicaCallable(s);
       outstandingCallables.add(s);
       RetryingRPC retryingOnReplica = new RetryingRPC(s);
-      cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
+      cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id);
     }
   }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index 0025a4fdbdb4..9b5eb91bbd5c 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -108,12 +108,12 @@ private static class MockClientScanner extends ClientSimpleScanner {
 
     public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
-      throws IOException {
+      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
+      ConnectionConfiguration connectionConfig) throws IOException {
       super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
         HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout,
-        Collections.emptyMap());
+        connectionConfig, Collections.emptyMap());
     }
 
     @Override
@@ -178,7 +178,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
     try (MockClientScanner scanner = new MockClientScanner(conf, scan,
       TableName.valueOf(name.getMethodName()), clusterConn,
-      rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+      rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
 
       scanner.setRpcFinished(true);
@@ -242,7 +242,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
     try (MockClientScanner scanner = new MockClientScanner(conf, scan,
       TableName.valueOf(name.getMethodName()), clusterConn,
-      rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+      rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
 
       InOrder inOrder = Mockito.inOrder(caller);
       scanner.loadCache();
@@ -305,7 +305,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
     try (MockClientScanner scanner = new MockClientScanner(conf, scan,
       TableName.valueOf(name.getMethodName()), clusterConn,
-      rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+      rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
 
       InOrder inOrder = Mockito.inOrder(caller);
       scanner.loadCache();
@@ -376,7 +376,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
     try (MockClientScanner scanner = new MockClientScanner(conf, scan,
       TableName.valueOf(name.getMethodName()), clusterConn,
-      rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+      rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
 
       scanner.setRpcFinished(true);
 
       InOrder inOrder = Mockito.inOrder(caller);
@@ -443,7 +443,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
     try (MockClientScanner scanner = new MockClientScanner(conf, scan,
       TableName.valueOf(name.getMethodName()), clusterConn,
-      rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+      rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
 
       InOrder inOrder = Mockito.inOrder(caller);
       scanner.setRpcFinished(true);
@@ -488,7 +488,7 @@ public void testExceptionsFromReplicasArePropagated() throws IOException {
     try (MockClientScanner scanner = new MockClientScanner(conf, scan,
       TableName.valueOf(name.getMethodName()), clusterConn,
-      rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
+      rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE, connectionConfig)) {
       Iterator<Result> iter = scanner.iterator();
       while (iter.hasNext()) {
         iter.next();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
index 2bff2297ff52..7595ff5f9614 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
@@ -19,6 +19,7 @@
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -101,6 +102,9 @@ public static void setUpBeforeClass() throws Exception {
     conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
     conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout);
     conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
+    // Set to true by default here. It only affects next() calls, and we explicitly set it to
+    // false in one of the tests below to exercise the legacy behavior for next calls.
+    conf.setBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, true);
     ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get();
     CONN = ConnectionFactory.createConnection(conf);
   }
@@ -174,18 +178,27 @@ public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException {
    * timed out next() call and mess up the test.
    */
   @Test
-  public void testNormalScanTimeoutOnNext() throws IOException {
+  public void testNormalScanTimeoutOnNextWithLegacyMode() throws IOException {
     setup(false);
-    // Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and
-    // will retry until scannerTimeout. This makes it more difficult to test the timeouts
-    // of normal next() calls. So we use a separate connection here which has retries disabled.
     Configuration confNoRetries = new Configuration(CONN.getConfiguration());
+    // Disable scanner timeout usage for next calls on this special connection. This way we can
+    // verify that legacy connections use the rpcTimeout rather than the scannerTimeout.
+    confNoRetries
+      .setBoolean(ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, false);
     confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
     try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) {
+      // Since we disabled HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, expect the rpcTimeout
       expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn));
     }
   }
 
+  @Test
+  public void testNormalScanTimeoutOnNextWithScannerTimeoutEnabled() throws IOException {
+    setup(false);
+    // Since HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS is enabled, we expect the scanTimeout
+    expectTimeoutOnNext(scanTimeout, this::getScanner);
+  }
+
   /**
    * AsyncTable version of above
    */