From 716a19c19d6ae95825b4107836d1b0a0b6e9ab02 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Thu, 14 Sep 2023 10:53:25 -0400 Subject: [PATCH 1/3] HBASE-28085 Configurably use scanner timeout as rpc timeout for scanner next calls --- .../client/ClientAsyncPrefetchScanner.java | 6 ++- .../hadoop/hbase/client/ClientScanner.java | 17 ++++--- .../hbase/client/ClientSimpleScanner.java | 6 ++- .../hbase/client/ConnectionConfiguration.java | 14 ++++++ .../client/ConnectionImplementation.java | 2 +- .../apache/hadoop/hbase/client/HTable.java | 6 +-- .../hbase/client/ReversedClientScanner.java | 6 ++- .../client/ScannerCallableWithReplicas.java | 44 +++++++++++++++---- .../hbase/client/TestClientScanner.java | 18 ++++---- .../client/TestClientScannerTimeouts.java | 21 +++++++-- 10 files changed, 99 insertions(+), 41 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index 769931b7083f..abd1267ffc4b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -66,9 +66,11 @@ public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableN ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, int scannerTimeout, int replicaCallTimeoutMicroSecondScan, - Map requestAttributes) throws IOException { + ConnectionConfiguration connectionConfiguration, Map requestAttributes) + throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes); + scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, + connectionConfiguration, requestAttributes); exceptionsQueue = new ConcurrentLinkedQueue<>(); final Context context = Context.current(); final Runnable runnable = context.wrap(new PrefetchRunnable()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 33cfedc362ad..ef8e4b0404f6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -33,7 +33,6 @@ import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; @@ -78,6 +77,7 @@ public abstract class ClientScanner extends AbstractClientScanner { protected final TableName tableName; protected final int readRpcTimeout; protected final int scannerTimeout; + private final boolean useScannerTimeoutForNextCalls; protected boolean scanMetricsPublished = false; protected RpcRetryingCaller caller; protected RpcControllerFactory rpcControllerFactory; @@ -104,7 +104,8 @@ public abstract class ClientScanner extends AbstractClientScanner { public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, - int scannerTimeout, int primaryOperationTimeout, Map requestAttributes) + int scannerTimeout, int primaryOperationTimeout, + ConnectionConfiguration connectionConfiguration, Map requestAttributes) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace( @@ -116,16 +117,15 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName this.connection = connection; this.pool = pool; this.primaryOperationTimeout = primaryOperationTimeout; - this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.retries = connectionConfiguration.getRetriesNumber(); if (scan.getMaxResultSize() > 0) { this.maxScannerResultSize = scan.getMaxResultSize(); } else { - this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + this.maxScannerResultSize = connectionConfiguration.getScannerMaxResultSize(); } this.readRpcTimeout = scanReadRpcTimeout; this.scannerTimeout = scannerTimeout; + this.useScannerTimeoutForNextCalls = connectionConfiguration.isUseScannerTimeoutForNextCalls(); this.requestAttributes = requestAttributes; // check if application wants to collect scan metrics @@ -135,8 +135,7 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName if (this.scan.getCaching() > 0) { this.caching = this.scan.getCaching(); } else { - this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + this.caching = connectionConfiguration.getScannerCaching(); } this.caller = rpcFactory. newCaller(); @@ -255,7 +254,7 @@ protected boolean moveToNextRegion() { this.currentRegion = null; this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout, - scannerTimeout, caching, conf, caller); + scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller); this.callable.setCaching(this.caching); incRegionCountMetrics(scanMetrics); return true; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java index 81091ad3010c..bde036f88806 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -39,9 +39,11 @@ public ClientSimpleScanner(Configuration configuration, Scan scan, TableName nam ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, int scannerTimeout, int replicaCallTimeoutMicroSecondScan, - Map requestAttributes) throws IOException { + ConnectionConfiguration connectionConfiguration, Map requestAttributes) + throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes); + scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, + connectionConfiguration, requestAttributes); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 93fa2600d898..90515296ca86 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -76,6 +76,11 @@ public class ConnectionConfiguration { public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT = "hbase.client.meta.scanner.timeout.period"; + public static final String HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS = + "hbase.client.use.scanner.timeout.for.next.calls"; + + public static final boolean HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT = false; + private final long writeBufferSize; private final long writeBufferPeriodicFlushTimeoutMs; private final long writeBufferPeriodicFlushTimerTickMs; @@ -99,6 +104,7 @@ public class ConnectionConfiguration { private final boolean clientScannerAsyncPrefetch; private final long pauseMs; private final long pauseMsForServerOverloaded; + private final boolean useScannerTimeoutForNextCalls; /** * Constructor @@ -158,6 +164,9 @@ public class ConnectionConfiguration { HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout); + this.useScannerTimeoutForNextCalls = + conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, + HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT); long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, @@ -201,6 +210,7 @@ protected ConnectionConfiguration() { this.metaScanTimeout = scanTimeout; this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE; this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE; + this.useScannerTimeoutForNextCalls = HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT; } public int getReadRpcTimeout() { @@ -275,6 +285,10 @@ public int getScanTimeout() { return scanTimeout; } + public boolean isUseScannerTimeoutForNextCalls() { + return useScannerTimeoutForNextCalls; + } + public int getMetaScanTimeout() { return metaScanTimeout; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 80f7a7959f12..38ac866e97e5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1052,7 +1052,7 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(), connectionConfig.getMetaScanTimeout(), - metaReplicaCallTimeoutScanInMicroSecond, Collections.emptyMap())) { + metaReplicaCallTimeoutScanInMicroSecond, connectionConfig, Collections.emptyMap())) { boolean tableNotFound = true; for (;;) { Result regionInfoRow = rcs.next(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index cc24d80f5ed8..386a7db3526e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -321,16 +321,16 @@ public ResultScanner getScanner(Scan scan) throws IOException { if (scan.isReversed()) { return new ReversedClientScanner(getConfiguration(), scan, getName(), connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, - replicaTimeout, requestAttributes); + replicaTimeout, connConfiguration, requestAttributes); } else { if (async) { return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, - replicaTimeout, requestAttributes); + replicaTimeout, connConfiguration, requestAttributes); } else { return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, - replicaTimeout, requestAttributes); + replicaTimeout, connConfiguration, requestAttributes); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index 68a8e7b74067..36bbdb5b60e4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -40,10 +40,12 @@ public class ReversedClientScanner extends ClientScanner { public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, - int scannerTimeout, int primaryOperationTimeout, Map requestAttributes) + int scannerTimeout, int primaryOperationTimeout, + ConnectionConfiguration connectionConfiguration, Map requestAttributes) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, requestAttributes); + scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration, + requestAttributes); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 227ad849c846..81331ca7379f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -57,6 +57,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { AtomicBoolean replicaSwitched = new AtomicBoolean(false); private final ClusterConnection cConnection; protected final ExecutorService pool; + private final boolean useScannerTimeoutForNextCalls; protected final int timeBeforeReplicas; private final Scan scan; private final int retries; @@ -72,11 +73,12 @@ class ScannerCallableWithReplicas implements RetryingCallable { public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, - int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf, - RpcRetryingCaller caller) { + int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls, + int caching, Configuration conf, RpcRetryingCaller caller) { this.currentScannerCallable = baseCallable; this.cConnection = cConnection; this.pool = pool; + this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls; if (timeBeforeReplicas < 0) { throw new IllegalArgumentException("Invalid value of operation timeout on the primary"); } @@ -187,9 +189,12 @@ public Result[] call(int timeout) throws IOException { pool, regionReplication * 5); AtomicBoolean done = new AtomicBoolean(false); + // make sure we use the same rpcTimeout for current and other replicas + int rpcTimeoutForCall = getRpcTimeout(); + replicaSwitched.set(false); // submit call for the primary replica or user specified replica - addCallsForCurrentReplica(cs); + addCallsForCurrentReplica(cs, rpcTimeoutForCall); int startIndex = 0; try { @@ -234,7 +239,7 @@ public Result[] call(int timeout) throws IOException { endIndex = 1; } else { // TODO: this may be an overkill for large region replication - addCallsForOtherReplicas(cs, 0, regionReplication - 1); + addCallsForOtherReplicas(cs, 0, regionReplication - 1, rpcTimeoutForCall); } try { @@ -326,15 +331,36 @@ public Cursor getCursor() { return currentScannerCallable != null ? currentScannerCallable.getCursor() : null; } - private void - addCallsForCurrentReplica(ResultBoundedCompletionService> cs) { + private void addCallsForCurrentReplica( + ResultBoundedCompletionService> cs, int rpcTimeout) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); outstandingCallables.add(currentScannerCallable); - cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id); + cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, currentScannerCallable.id); + } + + /** + * As we have a call sequence for scan, it is useless to have a different rpc timeout which is + * less than the scan timeout. If the server does not respond in time(usually this will not happen + * as we have heartbeat now), we will get an OutOfOrderScannerNextException when resending the + * next request and the only way to fix this is to close the scanner and open a new one. + *

+ * The legacy behavior of ScannerCallable has been to use readRpcTimeout despite the above. If + * using legacy behavior, we always use that. + *

+ * If new behavior is enabled, we determine the rpc timeout to use based on whether the scanner is + * open. If scanner is open, use scannerTimeout otherwise use readRpcTimeout. + */ + private int getRpcTimeout() { + if (useScannerTimeoutForNextCalls) { + return currentScannerCallable.scannerId == -1 ? readRpcTimeout : scannerTimeout; + } else { + return readRpcTimeout; + } } private void addCallsForOtherReplicas( - ResultBoundedCompletionService> cs, int min, int max) { + ResultBoundedCompletionService> cs, int min, int max, + int rpcTimeout) { for (int id = min; id <= max; id++) { if (currentScannerCallable.id == id) { @@ -344,7 +370,7 @@ private void addCallsForOtherReplicas( setStartRowForReplicaCallable(s); outstandingCallables.add(s); RetryingRPC retryingOnReplica = new RetryingRPC(s); - cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id); + cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id); } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index 0025a4fdbdb4..9b5eb91bbd5c 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -108,12 +108,12 @@ private static class MockClientScanner extends ClientSimpleScanner { public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) - throws IOException { + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, + ConnectionConfiguration connectionConfig) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, HConstants.DEFAULT_HBASE_RPC_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout, - Collections.emptyMap()); + connectionConfig, Collections.emptyMap()); } @Override @@ -178,7 +178,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { scanner.setRpcFinished(true); @@ -242,7 +242,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { InOrder inOrder = Mockito.inOrder(caller); scanner.loadCache(); @@ -305,7 +305,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { InOrder inOrder = Mockito.inOrder(caller); scanner.loadCache(); @@ -376,7 +376,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { scanner.setRpcFinished(true); InOrder inOrder = Mockito.inOrder(caller); @@ -443,7 +443,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { InOrder inOrder = Mockito.inOrder(caller); scanner.setRpcFinished(true); @@ -488,7 +488,7 @@ public void testExceptionsFromReplicasArePropagated() throws IOException { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) { + rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE, connectionConfig)) { Iterator iter = scanner.iterator(); while (iter.hasNext()) { iter.next(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java index 2bff2297ff52..7595ff5f9614 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -101,6 +102,9 @@ public static void setUpBeforeClass() throws Exception { conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout); conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout); conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout); + // set to true by default here. it only affects next() calls, and we'll explicitly + // set it to false in one of the tests below to test legacy behavior for next call + conf.setBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, true); ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get(); CONN = ConnectionFactory.createConnection(conf); } @@ -174,18 +178,27 @@ public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException { * timed out next() call and mess up the test. */ @Test - public void testNormalScanTimeoutOnNext() throws IOException { + public void testNormalScanTimeoutOnNextWithLegacyMode() throws IOException { setup(false); - // Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and - // will retry until scannerTimeout. This makes it more difficult to test the timeouts - // of normal next() calls. So we use a separate connection here which has retries disabled. Configuration confNoRetries = new Configuration(CONN.getConfiguration()); + // Disable scanner timeout usage for next calls on this special connection. This way we can + // verify that legacy connections use the rpcTimeout rather than scannerTimeout + confNoRetries + .setBoolean(ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, false); confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) { + // Now since we disabled HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, verify rpcTimeout expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn)); } } + @Test + public void testNormalScanTimeoutOnNextWithScannerTimeoutEnabled() throws IOException { + setup(false); + // Since this has HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS enabled, we pass scanTimeout + expectTimeoutOnNext(scanTimeout, this::getScanner); + } + /** * AsyncTable version of above */ From 5912d272a7aa32aff7e7e38f26a1ebd1e88abbe4 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Tue, 19 Sep 2023 14:33:53 -0400 Subject: [PATCH 2/3] review fixes --- .../hbase/client/ConnectionConfiguration.java | 14 +-- .../client/TestClientScannerTimeouts.java | 102 +++++++++++------- 2 files changed, 73 insertions(+), 43 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 90515296ca86..2a6651b5dde0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -76,10 +76,11 @@ public class ConnectionConfiguration { public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT = "hbase.client.meta.scanner.timeout.period"; - public static final String HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS = - "hbase.client.use.scanner.timeout.for.next.calls"; + public static final String HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS = + "hbase.client.use.scanner.timeout.period.for.next.calls"; - public static final boolean HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT = false; + public static final boolean HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT = + false; private final long writeBufferSize; private final long writeBufferPeriodicFlushTimeoutMs; @@ -165,8 +166,8 @@ public class ConnectionConfiguration { this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout); this.useScannerTimeoutForNextCalls = - conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, - HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT); + conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS, + HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT); long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, @@ -210,7 +211,8 @@ protected ConnectionConfiguration() { this.metaScanTimeout = scanTimeout; this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE; this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE; - this.useScannerTimeoutForNextCalls = HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT; + this.useScannerTimeoutForNextCalls = + HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT; } public int getReadRpcTimeout() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java index 7595ff5f9614..c0fb94273c24 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java @@ -19,17 +19,19 @@ import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; -import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.net.SocketTimeoutException; +import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MiniHBaseCluster; @@ -42,13 +44,17 @@ import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +64,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; +@RunWith(Parameterized.class) @Category({ MediumTests.class, ClientTests.class }) public class TestClientScannerTimeouts { @@ -68,8 +75,8 @@ public class TestClientScannerTimeouts { private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static AsyncConnection ASYNC_CONN; - private static Connection CONN; + private AsyncConnection ASYNC_CONN; + private Connection CONN; private static final byte[] FAMILY = Bytes.toBytes("testFamily"); private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); private static final byte[] VALUE = Bytes.toBytes("testValue"); @@ -80,7 +87,8 @@ public class TestClientScannerTimeouts { private static final byte[] ROW3 = Bytes.toBytes("row-3"); private static final int rpcTimeout = 1000; private static final int scanTimeout = 3 * rpcTimeout; - private static final int metaScanTimeout = 6 * rpcTimeout; + private static final int metaReadRpcTimeout = 6 * rpcTimeout; + private static final int metaScanTimeout = 9 * rpcTimeout; private static final int CLIENT_RETRIES_NUMBER = 3; private static TableName tableName; @@ -88,6 +96,14 @@ public class TestClientScannerTimeouts { @Rule public TestName name = new TestName(); + @Parameterized.Parameter + public boolean useScannerTimeoutPeriodForNextCalls; + + @Parameterized.Parameters + public static Collection parameters() { + return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED; + } + @BeforeClass public static void setUpBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); @@ -98,28 +114,38 @@ public static void setUpBeforeClass() throws Exception { conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER); conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000); TEST_UTIL.startMiniCluster(1); + } + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout); - conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout); + conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaReadRpcTimeout); conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout); - // set to true by default here. it only affects next() calls, and we'll explicitly - // set it to false in one of the tests below to test legacy behavior for next call - conf.setBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, true); + conf.setBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS, + useScannerTimeoutPeriodForNextCalls); ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get(); CONN = ConnectionFactory.createConnection(conf); } - @AfterClass - public static void tearDownAfterClass() throws Exception { + @After + public void after() throws Exception { CONN.close(); ASYNC_CONN.close(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); } public void setup(boolean isSystemTable) throws IOException { RSRpcServicesWithScanTimeout.reset(); - String nameAsString = name.getMethodName(); + // parameterization adds non-alphanumeric chars to the method name. strip them so + // it parses as a table name + String nameAsString = name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_") + "-" + + useScannerTimeoutPeriodForNextCalls; if (isSystemTable) { nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString; } @@ -172,31 +198,10 @@ public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException { expectRetryOutOfOrderScannerNext(this::getAsyncScanner); } - /** - * verify that we honor the {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for normal scans. Use a - * special connection which has retries disabled, because otherwise the scanner will retry the - * timed out next() call and mess up the test. - */ @Test - public void testNormalScanTimeoutOnNextWithLegacyMode() throws IOException { + public void testNormalScanTimeoutOnNext() throws IOException { setup(false); - Configuration confNoRetries = new Configuration(CONN.getConfiguration()); - // Disable scanner timeout usage for next calls on this special connection. This way we can - // verify that legacy connections use the rpcTimeout rather than scannerTimeout - confNoRetries - .setBoolean(ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, false); - confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); - try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) { - // Now since we disabled HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, verify rpcTimeout - expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn)); - } - } - - @Test - public void testNormalScanTimeoutOnNextWithScannerTimeoutEnabled() throws IOException { - setup(false); - // Since this has HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS enabled, we pass scanTimeout - expectTimeoutOnNext(scanTimeout, this::getScanner); + testScanTimeoutOnNext(rpcTimeout, scanTimeout); } /** @@ -234,7 +239,30 @@ public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException { @Test public void testMetaScanTimeoutOnNext() throws IOException { setup(true); - expectTimeoutOnNext(metaScanTimeout, this::getScanner); + testScanTimeoutOnNext(metaReadRpcTimeout, metaScanTimeout); + } + + private void testScanTimeoutOnNext(int rpcTimeout, int scannerTimeout) throws IOException { + if (useScannerTimeoutPeriodForNextCalls) { + // Since this has HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS enabled, we pass + // scannerTimeout as the expected timeout duration. + expectTimeoutOnNext(scannerTimeout, this::getScanner); + } else { + // Otherwise we pass rpcTimeout as the expected timeout duration. + // In this case we need a special connection which disables retries, otherwise the scanner + // will retry the timed out next() call, which will cause out of order exception and mess up + // the test + try (Connection conn = getNoRetriesConnection()) { + // Now since we disabled HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, verify rpcTimeout + expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn)); + } + } + } + + private Connection getNoRetriesConnection() throws IOException { + Configuration confNoRetries = new Configuration(CONN.getConfiguration()); + confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + return ConnectionFactory.createConnection(confNoRetries); } /** @@ -253,7 +281,7 @@ public void testMetaScanTimeoutOnNextAsync() throws IOException { @Test public void testMetaScanTimeoutOnOpenScanner() throws IOException { setup(true); - expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner); + expectTimeoutOnOpenScanner(metaReadRpcTimeout, this::getScanner); } /** @@ -262,7 +290,7 @@ public void testMetaScanTimeoutOnOpenScanner() throws IOException { @Test public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException { setup(true); - expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner); + expectTimeoutOnOpenScanner(metaReadRpcTimeout, this::getAsyncScanner); } private void expectRetryOutOfOrderScannerNext(Supplier scannerSupplier) From e1fdbc585aa79fd9fb005a4c1813e136712d177e Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Wed, 20 Sep 2023 10:56:07 -0400 Subject: [PATCH 3/3] Handle renewLease and close, and add tests for them --- .../client/ScannerCallableWithReplicas.java | 7 +- .../client/TestClientScannerTimeouts.java | 96 ++++++++++++++++++- 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 81331ca7379f..5261ff4af5c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -352,12 +352,17 @@ private void addCallsForCurrentReplica( */ private int getRpcTimeout() { if (useScannerTimeoutForNextCalls) { - return currentScannerCallable.scannerId == -1 ? readRpcTimeout : scannerTimeout; + return isNextCall() ? scannerTimeout : readRpcTimeout; } else { return readRpcTimeout; } } + private boolean isNextCall() { + return currentScannerCallable != null && currentScannerCallable.scannerId != -1 + && !currentScannerCallable.renew && !currentScannerCallable.closed; + } + private void addCallsForOtherReplicas( ResultBoundedCompletionService> cs, int min, int max, int rpcTimeout) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java index c0fb94273c24..259bbc4ad9b5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -293,6 +294,50 @@ public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException { expectTimeoutOnOpenScanner(metaReadRpcTimeout, this::getAsyncScanner); } + /** + * Test renewLease timeout for non-async scanner, which should use rpcTimeout. Async scanner does + * lease renewal automatically in the background, so renewLease() always returns false. So this + * test doesn't have an Async counterpart like the others. + */ + @Test + public void testNormalScanTimeoutOnRenewLease() throws IOException { + setup(false); + expectTimeoutOnRenewScanner(rpcTimeout, this::getScanner); + } + + /** + * Test renewLease timeout for non-async scanner, which should use rpcTimeout. Async scanner does + * lease renewal automatically in the background, so renewLease() always returns false. So this + * test doesn't have an Async counterpart like the others. + */ + @Test + public void testMetaScanTimeoutOnRenewLease() throws IOException { + setup(true); + expectTimeoutOnRenewScanner(metaReadRpcTimeout, this::getScanner); + } + + /** + * Test close timeout for non-async scanner, which should use rpcTimeout. Async scanner does + * closes async and always returns immediately. So this test doesn't have an Async counterpart + * like the others. + */ + @Test + public void testNormalScanTimeoutOnClose() throws IOException { + setup(false); + expectTimeoutOnCloseScanner(rpcTimeout, this::getScanner); + } + + /** + * Test close timeout for non-async scanner, which should use rpcTimeout. Async scanner does + * closes async and always returns immediately. So this test doesn't have an Async counterpart + * like the others. + */ + @Test + public void testMetaScanTimeoutOnClose() throws IOException { + setup(true); + expectTimeoutOnCloseScanner(metaReadRpcTimeout, this::getScanner); + } + private void expectRetryOutOfOrderScannerNext(Supplier scannerSupplier) throws IOException { setup(false); @@ -399,6 +444,34 @@ private void expectTimeoutOnOpenScanner(int timeout, Supplier sca expectTimeout(start, timeout); } + private void expectTimeoutOnRenewScanner(int timeout, Supplier scannerSupplier) + throws IOException { + RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout); + RSRpcServicesWithScanTimeout.sleepOnRenew = true; + LOG.info( + "Opening scanner, expecting no timeouts from first next() call from openScanner response"); + long start = System.nanoTime(); + ResultScanner scanner = scannerSupplier.get(); + scanner.next(); + assertFalse("Expected renewLease to fail due to timeout", scanner.renewLease()); + expectTimeout(start, timeout); + } + + private void expectTimeoutOnCloseScanner(int timeout, Supplier scannerSupplier) + throws IOException { + RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout); + RSRpcServicesWithScanTimeout.sleepOnClose = true; + LOG.info( + "Opening scanner, expecting no timeouts from first next() call from openScanner response"); + long start = System.nanoTime(); + ResultScanner scanner = scannerSupplier.get(); + scanner.next(); + // close doesnt throw or return anything, so we can't verify it directly. + // but we can verify that it took as long as we expect below + scanner.close(); + expectTimeout(start, timeout); + } + private void expectTimeout(long start, int timeout) { long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); LOG.info("Expected duration >= {}, and got {}", timeout, duration); @@ -453,6 +526,8 @@ private static class RSRpcServicesWithScanTimeout extends RSRpcServices { private static long seqNoToSleepOn = -1; private static boolean sleepOnOpen = false; + private static boolean sleepOnRenew = false; + private static boolean sleepOnClose = false; private static volatile boolean slept; private static int tryNumber = 0; @@ -470,6 +545,8 @@ public static void reset() { throwAlways = false; threw = false; sleepOnOpen = false; + sleepOnRenew = false; + sleepOnClose = false; slept = false; tryNumber = 0; } @@ -484,7 +561,19 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque if (request.hasScannerId()) { LOG.info("Got request {}", request); ScanResponse scanResponse = super.scan(controller, request); - if (tableScannerId != request.getScannerId() || request.getCloseScanner()) { + if (tableScannerId != request.getScannerId()) { + return scanResponse; + } + if (request.getCloseScanner()) { + if (!slept && sleepOnClose) { + try { + LOG.info("SLEEPING " + sleepTime); + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + slept = true; + tryNumber++; + } return scanResponse; } @@ -499,7 +588,10 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque throw new ServiceException(new OutOfOrderScannerNextException()); } - if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) { + if ( + !slept && (request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq() + || sleepOnRenew && request.getRenew()) + ) { try { LOG.info("SLEEPING " + sleepTime); Thread.sleep(sleepTime);