From 82c354eea5c336ed4ec2e86532089ae48c1f517a Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Wed, 20 Sep 2023 17:17:51 -0400 Subject: [PATCH] HBASE-28085 Configurably use scanner timeout as rpc timeout for scanner next calls (#5402) Signed-off-by: Nick Dimiduk Signed-off-by: Duo Zhang --- .../client/ClientAsyncPrefetchScanner.java | 6 +- .../hadoop/hbase/client/ClientScanner.java | 17 +- .../hbase/client/ClientSimpleScanner.java | 6 +- .../hbase/client/ConnectionConfiguration.java | 16 ++ .../client/ConnectionImplementation.java | 2 +- .../apache/hadoop/hbase/client/HTable.java | 6 +- .../hbase/client/ReversedClientScanner.java | 6 +- .../client/ScannerCallableWithReplicas.java | 49 ++++- .../hbase/client/TestClientScanner.java | 18 +- .../client/TestClientScannerTimeouts.java | 183 +++++++++++++++--- 10 files changed, 247 insertions(+), 62 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index 769931b7083f..abd1267ffc4b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -66,9 +66,11 @@ public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableN ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, int scannerTimeout, int replicaCallTimeoutMicroSecondScan, - Map requestAttributes) throws IOException { + ConnectionConfiguration connectionConfiguration, Map requestAttributes) + throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes); + scanReadRpcTimeout, scannerTimeout, 
replicaCallTimeoutMicroSecondScan, + connectionConfiguration, requestAttributes); exceptionsQueue = new ConcurrentLinkedQueue<>(); final Context context = Context.current(); final Runnable runnable = context.wrap(new PrefetchRunnable()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 33cfedc362ad..ef8e4b0404f6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -33,7 +33,6 @@ import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; @@ -78,6 +77,7 @@ public abstract class ClientScanner extends AbstractClientScanner { protected final TableName tableName; protected final int readRpcTimeout; protected final int scannerTimeout; + private final boolean useScannerTimeoutForNextCalls; protected boolean scanMetricsPublished = false; protected RpcRetryingCaller caller; protected RpcControllerFactory rpcControllerFactory; @@ -104,7 +104,8 @@ public abstract class ClientScanner extends AbstractClientScanner { public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, - int scannerTimeout, int primaryOperationTimeout, Map requestAttributes) + int scannerTimeout, int primaryOperationTimeout, + ConnectionConfiguration connectionConfiguration, Map requestAttributes) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace( @@ -116,16 +117,15 @@ public 
ClientScanner(final Configuration conf, final Scan scan, final TableName this.connection = connection; this.pool = pool; this.primaryOperationTimeout = primaryOperationTimeout; - this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.retries = connectionConfiguration.getRetriesNumber(); if (scan.getMaxResultSize() > 0) { this.maxScannerResultSize = scan.getMaxResultSize(); } else { - this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + this.maxScannerResultSize = connectionConfiguration.getScannerMaxResultSize(); } this.readRpcTimeout = scanReadRpcTimeout; this.scannerTimeout = scannerTimeout; + this.useScannerTimeoutForNextCalls = connectionConfiguration.isUseScannerTimeoutForNextCalls(); this.requestAttributes = requestAttributes; // check if application wants to collect scan metrics @@ -135,8 +135,7 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName if (this.scan.getCaching() > 0) { this.caching = this.scan.getCaching(); } else { - this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + this.caching = connectionConfiguration.getScannerCaching(); } this.caller = rpcFactory. 
newCaller(); @@ -255,7 +254,7 @@ protected boolean moveToNextRegion() { this.currentRegion = null; this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout, - scannerTimeout, caching, conf, caller); + scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller); this.callable.setCaching(this.caching); incRegionCountMetrics(scanMetrics); return true; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java index 81091ad3010c..bde036f88806 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -39,9 +39,11 @@ public ClientSimpleScanner(Configuration configuration, Scan scan, TableName nam ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, int scannerTimeout, int replicaCallTimeoutMicroSecondScan, - Map requestAttributes) throws IOException { + ConnectionConfiguration connectionConfiguration, Map requestAttributes) + throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes); + scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, + connectionConfiguration, requestAttributes); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 93fa2600d898..2a6651b5dde0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -76,6 +76,12 @@ public class ConnectionConfiguration { public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT = "hbase.client.meta.scanner.timeout.period"; + public static final String HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS = + "hbase.client.use.scanner.timeout.period.for.next.calls"; + + public static final boolean HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT = + false; + private final long writeBufferSize; private final long writeBufferPeriodicFlushTimeoutMs; private final long writeBufferPeriodicFlushTimerTickMs; @@ -99,6 +105,7 @@ public class ConnectionConfiguration { private final boolean clientScannerAsyncPrefetch; private final long pauseMs; private final long pauseMsForServerOverloaded; + private final boolean useScannerTimeoutForNextCalls; /** * Constructor @@ -158,6 +165,9 @@ public class ConnectionConfiguration { HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout); + this.useScannerTimeoutForNextCalls = + conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS, + HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT); long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, @@ -201,6 +211,8 @@ protected ConnectionConfiguration() { this.metaScanTimeout = scanTimeout; this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE; this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE; + this.useScannerTimeoutForNextCalls = + HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT; } public int getReadRpcTimeout() { @@ -275,6 +287,10 @@ public int getScanTimeout() { return scanTimeout; } + public boolean isUseScannerTimeoutForNextCalls() { + return useScannerTimeoutForNextCalls; + } + public int getMetaScanTimeout() 
{ return metaScanTimeout; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index ff7418e39cd3..70d5760df485 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1052,7 +1052,7 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(), connectionConfig.getMetaScanTimeout(), - metaReplicaCallTimeoutScanInMicroSecond, Collections.emptyMap())) { + metaReplicaCallTimeoutScanInMicroSecond, connectionConfig, Collections.emptyMap())) { boolean tableNotFound = true; for (;;) { Result regionInfoRow = rcs.next(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index cc24d80f5ed8..386a7db3526e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -321,16 +321,16 @@ public ResultScanner getScanner(Scan scan) throws IOException { if (scan.isReversed()) { return new ReversedClientScanner(getConfiguration(), scan, getName(), connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, - replicaTimeout, requestAttributes); + replicaTimeout, connConfiguration, requestAttributes); } else { if (async) { return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, - replicaTimeout, requestAttributes); + replicaTimeout, connConfiguration, 
requestAttributes); } else { return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, - replicaTimeout, requestAttributes); + replicaTimeout, connConfiguration, requestAttributes); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index 68a8e7b74067..36bbdb5b60e4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -40,10 +40,12 @@ public class ReversedClientScanner extends ClientScanner { public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, - int scannerTimeout, int primaryOperationTimeout, Map requestAttributes) + int scannerTimeout, int primaryOperationTimeout, + ConnectionConfiguration connectionConfiguration, Map requestAttributes) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, requestAttributes); + scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration, + requestAttributes); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 227ad849c846..5261ff4af5c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -57,6 +57,7 @@ class ScannerCallableWithReplicas 
implements RetryingCallable { AtomicBoolean replicaSwitched = new AtomicBoolean(false); private final ClusterConnection cConnection; protected final ExecutorService pool; + private final boolean useScannerTimeoutForNextCalls; protected final int timeBeforeReplicas; private final Scan scan; private final int retries; @@ -72,11 +73,12 @@ class ScannerCallableWithReplicas implements RetryingCallable { public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, - int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf, - RpcRetryingCaller caller) { + int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls, + int caching, Configuration conf, RpcRetryingCaller caller) { this.currentScannerCallable = baseCallable; this.cConnection = cConnection; this.pool = pool; + this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls; if (timeBeforeReplicas < 0) { throw new IllegalArgumentException("Invalid value of operation timeout on the primary"); } @@ -187,9 +189,12 @@ public Result[] call(int timeout) throws IOException { pool, regionReplication * 5); AtomicBoolean done = new AtomicBoolean(false); + // make sure we use the same rpcTimeout for current and other replicas + int rpcTimeoutForCall = getRpcTimeout(); + replicaSwitched.set(false); // submit call for the primary replica or user specified replica - addCallsForCurrentReplica(cs); + addCallsForCurrentReplica(cs, rpcTimeoutForCall); int startIndex = 0; try { @@ -234,7 +239,7 @@ public Result[] call(int timeout) throws IOException { endIndex = 1; } else { // TODO: this may be an overkill for large region replication - addCallsForOtherReplicas(cs, 0, regionReplication - 1); + addCallsForOtherReplicas(cs, 0, regionReplication - 1, rpcTimeoutForCall); } try { @@ -326,15 +331,41 @@ public Cursor getCursor() { return currentScannerCallable != 
null ? currentScannerCallable.getCursor() : null; } - private void - addCallsForCurrentReplica(ResultBoundedCompletionService> cs) { + private void addCallsForCurrentReplica( + ResultBoundedCompletionService> cs, int rpcTimeout) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); outstandingCallables.add(currentScannerCallable); - cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id); + cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, currentScannerCallable.id); + } + + /** + * As we have a call sequence for scan, it is useless to have a different rpc timeout which is + * less than the scan timeout. If the server does not respond in time(usually this will not happen + * as we have heartbeat now), we will get an OutOfOrderScannerNextException when resending the + * next request and the only way to fix this is to close the scanner and open a new one. + *

+ * The legacy behavior of ScannerCallable has been to use readRpcTimeout despite the above. If + * using legacy behavior, we always use that. + *

+ * If new behavior is enabled, we determine the rpc timeout to use based on whether the scanner is + * open. If scanner is open, use scannerTimeout otherwise use readRpcTimeout. + */ + private int getRpcTimeout() { + if (useScannerTimeoutForNextCalls) { + return isNextCall() ? scannerTimeout : readRpcTimeout; + } else { + return readRpcTimeout; + } + } + + private boolean isNextCall() { + return currentScannerCallable != null && currentScannerCallable.scannerId != -1 + && !currentScannerCallable.renew && !currentScannerCallable.closed; } private void addCallsForOtherReplicas( - ResultBoundedCompletionService> cs, int min, int max) { + ResultBoundedCompletionService> cs, int min, int max, + int rpcTimeout) { for (int id = min; id <= max; id++) { if (currentScannerCallable.id == id) { @@ -344,7 +375,7 @@ private void addCallsForOtherReplicas( setStartRowForReplicaCallable(s); outstandingCallables.add(s); RetryingRPC retryingOnReplica = new RetryingRPC(s); - cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id); + cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id); } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index 0025a4fdbdb4..9b5eb91bbd5c 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -108,12 +108,12 @@ private static class MockClientScanner extends ClientSimpleScanner { public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) - throws IOException { + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, + ConnectionConfiguration 
connectionConfig) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, HConstants.DEFAULT_HBASE_RPC_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout, - Collections.emptyMap()); + connectionConfig, Collections.emptyMap()); } @Override @@ -178,7 +178,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { scanner.setRpcFinished(true); @@ -242,7 +242,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { InOrder inOrder = Mockito.inOrder(caller); scanner.loadCache(); @@ -305,7 +305,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { InOrder inOrder = Mockito.inOrder(caller); scanner.loadCache(); @@ -376,7 +376,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { scanner.setRpcFinished(true); InOrder inOrder = Mockito.inOrder(caller); @@ -443,7 +443,7 @@ public Result[] 
answer(InvocationOnMock invocation) throws Throwable { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) { InOrder inOrder = Mockito.inOrder(caller); scanner.setRpcFinished(true); @@ -488,7 +488,7 @@ public void testExceptionsFromReplicasArePropagated() throws IOException { try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn, - rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) { + rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE, connectionConfig)) { Iterator iter = scanner.iterator(); while (iter.hasNext()) { iter.next(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java index 2bff2297ff52..259bbc4ad9b5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java @@ -19,16 +19,20 @@ import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.net.SocketTimeoutException; +import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MiniHBaseCluster; @@ -41,13 +45,17 @@ import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +65,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; +@RunWith(Parameterized.class) @Category({ MediumTests.class, ClientTests.class }) public class TestClientScannerTimeouts { @@ -67,8 +76,8 @@ public class TestClientScannerTimeouts { private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static AsyncConnection ASYNC_CONN; - private static Connection CONN; + private AsyncConnection ASYNC_CONN; + private Connection CONN; private static final byte[] FAMILY = Bytes.toBytes("testFamily"); private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); private static final byte[] VALUE = Bytes.toBytes("testValue"); @@ -79,7 +88,8 @@ public class TestClientScannerTimeouts { private static final byte[] ROW3 = Bytes.toBytes("row-3"); private static final int rpcTimeout = 1000; private static final int scanTimeout = 3 * rpcTimeout; - private 
static final int metaScanTimeout = 6 * rpcTimeout; + private static final int metaReadRpcTimeout = 6 * rpcTimeout; + private static final int metaScanTimeout = 9 * rpcTimeout; private static final int CLIENT_RETRIES_NUMBER = 3; private static TableName tableName; @@ -87,6 +97,14 @@ public class TestClientScannerTimeouts { @Rule public TestName name = new TestName(); + @Parameterized.Parameter + public boolean useScannerTimeoutPeriodForNextCalls; + + @Parameterized.Parameters + public static Collection parameters() { + return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED; + } + @BeforeClass public static void setUpBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); @@ -97,25 +115,38 @@ public static void setUpBeforeClass() throws Exception { conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER); conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000); TEST_UTIL.startMiniCluster(1); + } + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout); - conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout); + conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaReadRpcTimeout); conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout); + conf.setBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS, + useScannerTimeoutPeriodForNextCalls); ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get(); CONN = ConnectionFactory.createConnection(conf); } - @AfterClass - public static void tearDownAfterClass() throws Exception { + @After + public void after() throws Exception { CONN.close(); ASYNC_CONN.close(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); } public void setup(boolean isSystemTable) throws IOException { RSRpcServicesWithScanTimeout.reset(); - String nameAsString = 
name.getMethodName(); + // parameterization adds non-alphanumeric chars to the method name. strip them so + // it parses as a table name + String nameAsString = name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_") + "-" + + useScannerTimeoutPeriodForNextCalls; if (isSystemTable) { nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString; } @@ -168,22 +199,10 @@ public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException { expectRetryOutOfOrderScannerNext(this::getAsyncScanner); } - /** - * verify that we honor the {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for normal scans. Use a - * special connection which has retries disabled, because otherwise the scanner will retry the - * timed out next() call and mess up the test. - */ @Test public void testNormalScanTimeoutOnNext() throws IOException { setup(false); - // Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and - // will retry until scannerTimeout. This makes it more difficult to test the timeouts - // of normal next() calls. So we use a separate connection here which has retries disabled. 
- Configuration confNoRetries = new Configuration(CONN.getConfiguration()); - confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); - try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) { - expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn)); - } + testScanTimeoutOnNext(rpcTimeout, scanTimeout); } /** @@ -221,7 +240,30 @@ public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException { @Test public void testMetaScanTimeoutOnNext() throws IOException { setup(true); - expectTimeoutOnNext(metaScanTimeout, this::getScanner); + testScanTimeoutOnNext(metaReadRpcTimeout, metaScanTimeout); + } + + private void testScanTimeoutOnNext(int rpcTimeout, int scannerTimeout) throws IOException { + if (useScannerTimeoutPeriodForNextCalls) { + // Since this has HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS enabled, we pass + // scannerTimeout as the expected timeout duration. + expectTimeoutOnNext(scannerTimeout, this::getScanner); + } else { + // Otherwise we pass rpcTimeout as the expected timeout duration. 
+ // In this case we need a special connection which disables retries, otherwise the scanner + // will retry the timed out next() call, which will cause out of order exception and mess up + // the test + try (Connection conn = getNoRetriesConnection()) { + // HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS is disabled, so verify rpcTimeout + expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn)); + } + } + } + + private Connection getNoRetriesConnection() throws IOException { + Configuration confNoRetries = new Configuration(CONN.getConfiguration()); + confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + return ConnectionFactory.createConnection(confNoRetries); } /** @@ -240,7 +282,7 @@ public void testMetaScanTimeoutOnNextAsync() throws IOException { @Test public void testMetaScanTimeoutOnOpenScanner() throws IOException { setup(true); - expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner); + expectTimeoutOnOpenScanner(metaReadRpcTimeout, this::getScanner); } /** @@ -249,7 +291,51 @@ public void testMetaScanTimeoutOnOpenScanner() throws IOException { @Test public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException { setup(true); - expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner); + expectTimeoutOnOpenScanner(metaReadRpcTimeout, this::getAsyncScanner); + } + + /** + * Test renewLease timeout for non-async scanner, which should use rpcTimeout. Async scanner does + * lease renewal automatically in the background, so renewLease() always returns false. So this + * test doesn't have an Async counterpart like the others. + */ + @Test + public void testNormalScanTimeoutOnRenewLease() throws IOException { + setup(false); + expectTimeoutOnRenewScanner(rpcTimeout, this::getScanner); + } + + /** + * Test renewLease timeout for non-async scanner, which should use rpcTimeout. Async scanner does + * lease renewal automatically in the background, so renewLease() always returns false. 
So this + * test doesn't have an Async counterpart like the others. + */ + @Test + public void testMetaScanTimeoutOnRenewLease() throws IOException { + setup(true); + expectTimeoutOnRenewScanner(metaReadRpcTimeout, this::getScanner); + } + + /** + * Test close timeout for non-async scanner, which should use rpcTimeout. Async scanner does + * closes async and always returns immediately. So this test doesn't have an Async counterpart + * like the others. + */ + @Test + public void testNormalScanTimeoutOnClose() throws IOException { + setup(false); + expectTimeoutOnCloseScanner(rpcTimeout, this::getScanner); + } + + /** + * Test close timeout for non-async scanner, which should use rpcTimeout. Async scanner does + * closes async and always returns immediately. So this test doesn't have an Async counterpart + * like the others. + */ + @Test + public void testMetaScanTimeoutOnClose() throws IOException { + setup(true); + expectTimeoutOnCloseScanner(metaReadRpcTimeout, this::getScanner); } private void expectRetryOutOfOrderScannerNext(Supplier scannerSupplier) @@ -358,6 +444,34 @@ private void expectTimeoutOnOpenScanner(int timeout, Supplier sca expectTimeout(start, timeout); } + private void expectTimeoutOnRenewScanner(int timeout, Supplier scannerSupplier) + throws IOException { + RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout); + RSRpcServicesWithScanTimeout.sleepOnRenew = true; + LOG.info( + "Opening scanner, expecting no timeouts from first next() call from openScanner response"); + long start = System.nanoTime(); + ResultScanner scanner = scannerSupplier.get(); + scanner.next(); + assertFalse("Expected renewLease to fail due to timeout", scanner.renewLease()); + expectTimeout(start, timeout); + } + + private void expectTimeoutOnCloseScanner(int timeout, Supplier scannerSupplier) + throws IOException { + RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout); + RSRpcServicesWithScanTimeout.sleepOnClose = true; + LOG.info( + "Opening scanner, expecting no 
timeouts from first next() call from openScanner response"); + long start = System.nanoTime(); + ResultScanner scanner = scannerSupplier.get(); + scanner.next(); + // close doesn't throw or return anything, so we can't verify it directly, + // but we can verify that it took as long as we expect below + scanner.close(); + expectTimeout(start, timeout); + } + private void expectTimeout(long start, int timeout) { long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); LOG.info("Expected duration >= {}, and got {}", timeout, duration); @@ -412,6 +526,8 @@ private static class RSRpcServicesWithScanTimeout extends RSRpcServices { private static long seqNoToSleepOn = -1; private static boolean sleepOnOpen = false; + private static boolean sleepOnRenew = false; + private static boolean sleepOnClose = false; private static volatile boolean slept; private static int tryNumber = 0; @@ -429,6 +545,8 @@ public static void reset() { throwAlways = false; threw = false; sleepOnOpen = false; + sleepOnRenew = false; + sleepOnClose = false; slept = false; tryNumber = 0; } @@ -443,7 +561,19 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque if (request.hasScannerId()) { LOG.info("Got request {}", request); ScanResponse scanResponse = super.scan(controller, request); - if (tableScannerId != request.getScannerId() || request.getCloseScanner()) { + if (tableScannerId != request.getScannerId()) { + return scanResponse; + } + if (request.getCloseScanner()) { + if (!slept && sleepOnClose) { + try { + LOG.info("SLEEPING " + sleepTime); + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + slept = true; + tryNumber++; + } return scanResponse; } @@ -458,7 +588,10 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque throw new ServiceException(new OutOfOrderScannerNextException()); } - if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) { + if ( + !slept && 
(request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq() + || sleepOnRenew && request.getRenew()) + ) { try { LOG.info("SLEEPING " + sleepTime); Thread.sleep(sleepTime);