diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index 3b556e8f788e..abd1267ffc4b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -65,10 +65,12 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, - int scannerTimeout, int replicaCallTimeoutMicroSecondScan, ConnectionConfiguration connectionConfiguration, - Map requestAttributes) throws IOException { + int scannerTimeout, int replicaCallTimeoutMicroSecondScan, + ConnectionConfiguration connectionConfiguration, Map requestAttributes) + throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, connectionConfiguration, requestAttributes); + scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, + connectionConfiguration, requestAttributes); exceptionsQueue = new ConcurrentLinkedQueue<>(); final Context context = Context.current(); final Runnable runnable = context.wrap(new PrefetchRunnable()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 005a07f34dff..ef8e4b0404f6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -33,7 +33,6 @@ import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; @@ -105,8 +104,8 @@ public abstract class ClientScanner extends AbstractClientScanner { public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, - int scannerTimeout, int primaryOperationTimeout, ConnectionConfiguration connectionConfiguration, - Map requestAttributes) + int scannerTimeout, int primaryOperationTimeout, + ConnectionConfiguration connectionConfiguration, Map requestAttributes) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java index 847d61ebeee1..bde036f88806 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -39,9 +39,11 @@ public ClientSimpleScanner(Configuration configuration, Scan scan, TableName nam ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, int scannerTimeout, int replicaCallTimeoutMicroSecondScan, - ConnectionConfiguration connectionConfiguration, Map requestAttributes) throws IOException { + ConnectionConfiguration connectionConfiguration, Map requestAttributes) + throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, connectionConfiguration, requestAttributes); + scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, + connectionConfiguration, requestAttributes); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 68deddee635b..90515296ca86 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -164,7 +164,9 @@ public class ConnectionConfiguration { HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout); - this.useScannerTimeoutForNextCalls = conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT); + this.useScannerTimeoutForNextCalls = + conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS, + HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT); long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index cd6af5a80c49..36bbdb5b60e4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -40,11 +40,12 @@ public class ReversedClientScanner extends ClientScanner { public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, - int scannerTimeout, int primaryOperationTimeout, ConnectionConfiguration connectionConfiguration, - Map requestAttributes) + int scannerTimeout, int primaryOperationTimeout, + ConnectionConfiguration connectionConfiguration, Map requestAttributes) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration, requestAttributes); + scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration, + requestAttributes); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index a41e2e863a31..2927216f3246 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -73,8 +73,8 @@ class ScannerCallableWithReplicas implements RetryingCallable { public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, - int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls, int caching, Configuration conf, - RpcRetryingCaller caller) { + int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls, + int caching, Configuration conf, RpcRetryingCaller caller) { this.currentScannerCallable = baseCallable; this.cConnection = cConnection; this.pool = pool; @@ -331,8 +331,8 @@ public Cursor getCursor() { return currentScannerCallable != null ? currentScannerCallable.getCursor() : null; } - private void - addCallsForCurrentReplica(ResultBoundedCompletionService> cs, int rpcTimeout) { + private void addCallsForCurrentReplica( + ResultBoundedCompletionService> cs, int rpcTimeout) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); outstandingCallables.add(currentScannerCallable); cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, currentScannerCallable.id); @@ -340,15 +340,13 @@ public Cursor getCursor() { /** * As we have a call sequence for scan, it is useless to have a different rpc timeout which is - * less than the scan timeout. If the server does not respond in time(usually this will not - * happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when - * resending the next request and the only way to fix this is to close the scanner and open a - * new one. - * - * The legacy behavior of ScannerCallable has been to use readRpcTimeout despite the above. If - * using legacy behavior, we always use that. If new behavior is enabled, we determine the rpc - * timeout to use based on whether the scanner is open. If scanner is open, use scannerTimeout - * otherwise use readRpcTimeout. + * less than the scan timeout. If the server does not respond in time(usually this will not happen + * as we have heartbeat now), we will get an OutOfOrderScannerNextException when resending the + * next request and the only way to fix this is to close the scanner and open a new one. The + * legacy behavior of ScannerCallable has been to use readRpcTimeout despite the above. If using + * legacy behavior, we always use that. If new behavior is enabled, we determine the rpc timeout + * to use based on whether the scanner is open. If scanner is open, use scannerTimeout otherwise + * use readRpcTimeout. */ private int getRpcTimeout() { if (useScannerTimeoutForNextCalls) { @@ -359,7 +357,8 @@ private int getRpcTimeout() { } private void addCallsForOtherReplicas( - ResultBoundedCompletionService> cs, int min, int max, int rpcTimeout) { + ResultBoundedCompletionService> cs, int min, int max, + int rpcTimeout) { for (int id = min; id <= max; id++) { if (currentScannerCallable.id == id) {