From c42ca473b0d0c593cb6a6d94807dca1f699b7fcc Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Thu, 24 Mar 2022 17:23:51 -0400 Subject: [PATCH] Unify blocking configs in ConnectionConfiguration, add special pause for server overloaded --- .../hbase/client/AsyncAdminBuilderBase.java | 6 +- .../AsyncAdminRequestRetryingCaller.java | 4 +- .../client/AsyncConnectionConfiguration.java | 28 +++-- .../hadoop/hbase/client/AsyncProcess.java | 21 +--- .../hbase/client/AsyncRequestFutureImpl.java | 12 +- .../client/AsyncRpcRetryingCallerFactory.java | 2 +- .../hbase/client/AsyncTableBuilderBase.java | 6 +- .../hbase/client/ConnectionConfiguration.java | 40 ++++++- .../client/ConnectionImplementation.java | 23 +--- .../hbase/client/RawAsyncHBaseAdmin.java | 6 +- .../hbase/client/RawAsyncTableImpl.java | 6 +- .../client/RpcRetryingCallerFactory.java | 40 +++---- .../hbase/client/RpcRetryingCallerImpl.java | 20 ++-- .../TestAsyncConnectionConfiguration.java | 19 ++- .../hadoop/hbase/client/TestAsyncProcess.java | 58 ++++++--- .../client/TestConnectionConfiguration.java | 36 ++++++ .../client/TestRpcRetryingCallerImpl.java | 110 ++++++++++++++++++ .../org/apache/hadoop/hbase/HConstants.java | 1 + .../client/HConnectionTestingUtility.java | 7 +- ...stAsyncClientPauseForServerOverloaded.java | 30 +---- .../client/TestConnectionImplementation.java | 99 +++++++++++++++- 21 files changed, 421 insertions(+), 153 deletions(-) create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionConfiguration.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java index ab39d34f3843..cd023d8134d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java @@ -33,7 +33,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { protected long pauseNs; - protected long pauseForServerOverloaded; + protected long pauseNsForServerOverloaded; protected int maxAttempts; @@ -43,7 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { this.rpcTimeoutNs = connConf.getRpcTimeoutNs(); this.operationTimeoutNs = connConf.getOperationTimeoutNs(); this.pauseNs = connConf.getPauseNs(); - this.pauseForServerOverloaded = connConf.getPauseForServerOverloaded(); + this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded(); this.maxAttempts = connConf.getMaxRetries(); this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); } @@ -68,7 +68,7 @@ public AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit) { @Override public AsyncAdminBuilder setRetryPauseForServerOverloaded(long timeout, TimeUnit unit) { - this.pauseForServerOverloaded = unit.toNanos(timeout); + this.pauseNsForServerOverloaded = unit.toNanos(timeout); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java index a40eb8fe0c1c..f03e8b5cacb3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java @@ -44,9 +44,9 @@ public interface Callable { private ServerName serverName; public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, - long pauseNs, long pauseForServerOverloaded, int maxAttempts, long operationTimeoutNs, + long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, priority, pauseNs, pauseForServerOverloaded, maxAttempts, + super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.serverName = serverName; this.callable = callable; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index bda34b8ba86a..6d3a259790d9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -54,6 +54,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +67,16 @@ class AsyncConnectionConfiguration { private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class); + static { + // This is added where the configs are referenced. It may be too late to happen before + // any user _sets_ the old cqtbe config onto a Configuration option. So we still need + // to handle checking both properties in parsing below. The benefit of calling this is + // that it should still cause Configuration to log a warning if we do end up falling + // through to the old deprecated config. + Configuration.addDeprecation( + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED); + } + private final long metaOperationTimeoutNs; // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected @@ -84,7 +95,7 @@ class AsyncConnectionConfiguration { private final long pauseNs; - private final long pauseForServerOverloaded; + private final long pauseNsForServerOverloaded; private final int maxRetries; @@ -129,16 +140,17 @@ class AsyncConnectionConfiguration { this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs)); long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); - long pauseForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMs); - if (pauseForServerOverloaded < pauseMs) { + long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, + conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs)); + if (pauseMsForServerOverloaded < pauseMs) { LOG.warn( "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead", - HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseForServerOverloaded, + HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded, HBASE_CLIENT_PAUSE, pauseMs); - pauseForServerOverloaded = pauseMs; + pauseMsForServerOverloaded = pauseMs; } this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs); - this.pauseForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseForServerOverloaded); + this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseMsForServerOverloaded); this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); @@ -189,8 +201,8 @@ long getPauseNs() { return pauseNs; } - long getPauseForServerOverloaded() { - return pauseForServerOverloaded; + long getPauseNsForServerOverloaded() { + return pauseNsForServerOverloaded; } int getMaxRetries() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 6071cb63645e..b2f7ea4423a5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -134,14 +134,13 @@ public void waitUntilDone() throws InterruptedIOException { final long id; final ClusterConnection connection; + final ConnectionConfiguration connectionConfiguration; private final RpcRetryingCallerFactory rpcCallerFactory; final RpcControllerFactory rpcFactory; // Start configuration settings. final int startLogErrorsCnt; - final long pause; - final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified final int numTries; long serverTrackerTimeout; final long primaryCallTimeoutMicroseconds; @@ -163,23 +162,12 @@ public void waitUntilDone() throws InterruptedIOException { } this.connection = hc; + this.connectionConfiguration = connection.getConnectionConfiguration(); this.id = COUNTER.incrementAndGet(); - this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); - if (configuredPauseForCQTBE < pause) { - LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " - + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE - + ", will use " + pause + " instead."); - this.pauseForCQTBE = pause; - } else { - this.pauseForCQTBE = configuredPauseForCQTBE; - } // how many times we could try in total, one more than retry number - this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; + this.numTries = connectionConfiguration.getRetriesNumber() + 1; this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); @@ -193,7 +181,8 @@ public void waitUntilDone() throws InterruptedIOException { // we will do more retries in aggregate, but the user will be none the wiser. this.serverTrackerTimeout = 0L; for (int i = 0; i < this.numTries; ++i) { - serverTrackerTimeout = serverTrackerTimeout + ConnectionUtils.getPauseTime(this.pause, i); + serverTrackerTimeout = serverTrackerTimeout + + ConnectionUtils.getPauseTime(connectionConfiguration.getPauseMillis(), i); } this.rpcCallerFactory = rpcCaller; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index ca6d5342d57a..fa3eeb14ca6a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -36,13 +36,13 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ServerOverloadedException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.coprocessor.Batch; @@ -738,11 +738,13 @@ private void resubmit(ServerName oldServer, List toReplay, long backOffTime; if (retryImmediately) { backOffTime = 0; - } else if (throwable instanceof CallQueueTooBigException) { - // Give a special check on CQTBE, see #HBASE-17114 - backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE); + } else if (throwable instanceof ServerOverloadedException) { + // Give a special check on ServerOverloadedException, see #HBASE-17114 and HBASE-26807 + backOffTime = errorsByServer.calculateBackoffTime(oldServer, + asyncProcess.connectionConfiguration.getPauseMillisForServerOverloaded()); } else { - backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause); + backOffTime = errorsByServer.calculateBackoffTime(oldServer, + asyncProcess.connectionConfiguration.getPauseMillis()); } if (numAttempt > asyncProcess.startLogErrorsCnt) { // We use this value to have some logs when we have multiple failures, but not too many diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index c3a426eeb74c..d501998f8684 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -58,7 +58,7 @@ private abstract class BuilderBase { protected long pauseNs = conn.connConf.getPauseNs(); - protected long pauseNsForServerOverloaded = conn.connConf.getPauseForServerOverloaded(); + protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded(); protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 731bc7255f80..bec9f1236907 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -45,7 +45,7 @@ abstract class AsyncTableBuilderBase protected long pauseNs; - protected long pauseForServerOverloaded; + protected long pauseNsForServerOverloaded; protected int maxAttempts; @@ -60,7 +60,7 @@ abstract class AsyncTableBuilderBase this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs(); this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs(); this.pauseNs = connConf.getPauseNs(); - this.pauseForServerOverloaded = connConf.getPauseForServerOverloaded(); + this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded(); this.maxAttempts = retries2Attempts(connConf.getMaxRetries()); this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); } @@ -103,7 +103,7 @@ public AsyncTableBuilderBase setRetryPause(long pause, TimeUnit unit) { @Override public AsyncTableBuilderBase setRetryPauseForServerOverloaded(long pause, TimeUnit unit) { - this.pauseForServerOverloaded = unit.toNanos(pause); + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 61f58552a1f2..f42467d5e774 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -11,10 +11,13 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Configuration parameters for the connection. @@ -24,8 +27,9 @@ * object so that expensive conf.getXXX() calls are avoided every time HTable, etc is instantiated. * see HBASE-12128 */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +@InterfaceAudience.Private public class ConnectionConfiguration { + private static final Logger LOG = LoggerFactory.getLogger(ConnectionConfiguration.class); public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; @@ -52,6 +56,11 @@ public class ConnectionConfiguration { "hbase.client.pause.server.overloaded"; static { + // This is added where the configs are referenced. It may be too late to happen before + // any user _sets_ the old cqtbe config onto a Configuration option. So we still need + // to handle checking both properties in parsing below. The benefit of calling this is + // that it should still cause Configuration to log a warning if we do end up falling + // through to the old deprecated config. Configuration.addDeprecation( HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED); } @@ -73,6 +82,8 @@ public class ConnectionConfiguration { private final int writeRpcTimeout; // toggle for async/sync prefetch private final boolean clientScannerAsyncPrefetch; + private final long pauseMs; + private final long pauseMsForServerOverloaded; /** * Constructor @@ -128,6 +139,21 @@ public class ConnectionConfiguration { this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + + + long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); + long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, + conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs)); + if (pauseMsForServerOverloaded < pauseMs) { + LOG.warn( + "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead", + HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded, + HBASE_CLIENT_PAUSE, pauseMs); + pauseMsForServerOverloaded = pauseMs; + } + + this.pauseMs = pauseMs; + this.pauseMsForServerOverloaded = pauseMsForServerOverloaded; } /** @@ -153,6 +179,8 @@ protected ConnectionConfiguration() { this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE; + this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE; } public int getReadRpcTimeout() { @@ -218,4 +246,12 @@ public boolean isClientScannerAsyncPrefetch() { public int getRpcTimeout() { return rpcTimeout; } + + public long getPauseMillis() { + return pauseMs; + } + + public long getPauseMillisForServerOverloaded() { + return pauseMsForServerOverloaded; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 13715d9471ac..657d5ee0dec5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -53,7 +53,6 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; @@ -62,6 +61,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ServerOverloadedException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; @@ -174,8 +174,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class); - private final long pause; - private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified // The mode tells if HedgedRead, LoadBalance mode is supported. // The default mode is CatalogReplicaMode.None. private CatalogReplicaMode metaReplicaMode; @@ -274,17 +272,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { this.batchPool = (ThreadPoolExecutor) pool; this.connectionConfig = new ConnectionConfiguration(conf); this.closed = false; - this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); - if (configuredPauseForCQTBE < pause) { - LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " - + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE - + ", will use " + pause + " instead."); - this.pauseForCQTBE = pause; - } else { - this.pauseForCQTBE = configuredPauseForCQTBE; - } this.metaReplicaCallTimeoutScanInMicroSecond = connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan(); @@ -965,7 +952,7 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool metaCache.clearCache(tableName, row, replicaId); } // Query the meta region - long pauseBase = this.pause; + long pauseBase = connectionConfig.getPauseMillis(); takeUserRegionLock(); try { // We don't need to check if useCache is enabled or not. Even if useCache is false @@ -1052,9 +1039,9 @@ rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSeco if (e instanceof RemoteException) { e = ((RemoteException)e).unwrapRemoteException(); } - if (e instanceof CallQueueTooBigException) { - // Give a special check on CallQueueTooBigException, see #HBASE-17114 - pauseBase = this.pauseForCQTBE; + if (e instanceof ServerOverloadedException) { + // Give a special check on ServerOverloadedException, see #HBASE-17114 and HBASE-26807 + pauseBase = connectionConfig.getPauseMillisForServerOverloaded(); } if (tries < maxAttempts - 1) { LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " + diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 78c16862d802..4c46e0b7cd79 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -349,15 +349,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { this.rpcTimeoutNs = builder.rpcTimeoutNs; this.operationTimeoutNs = builder.operationTimeoutNs; this.pauseNs = builder.pauseNs; - if (builder.pauseForServerOverloaded < builder.pauseNs) { + if (builder.pauseNsForServerOverloaded < builder.pauseNs) { LOG.warn( "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" + " the normal pause value {} ms, use the greater one instead", - TimeUnit.NANOSECONDS.toMillis(builder.pauseForServerOverloaded), + TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); this.pauseNsForServerOverloaded = builder.pauseNs; } else { - this.pauseNsForServerOverloaded = builder.pauseForServerOverloaded; + this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; } this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index f2bf3cea5e9b..08f0bb725860 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -125,15 +125,15 @@ class RawAsyncTableImpl implements AsyncTable { this.operationTimeoutNs = builder.operationTimeoutNs; this.scanTimeoutNs = builder.scanTimeoutNs; this.pauseNs = builder.pauseNs; - if (builder.pauseForServerOverloaded < builder.pauseNs) { + if (builder.pauseNsForServerOverloaded < builder.pauseNs) { LOG.warn( "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" + " the normal pause value {} ms, use the greater one instead", - TimeUnit.NANOSECONDS.toMillis(builder.pauseForServerOverloaded), + TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); this.pauseNsForServerOverloaded = builder.pauseNs; } else { - this.pauseNsForServerOverloaded = builder.pauseForServerOverloaded; + this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; } this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index e7a3d1801044..7425f8837f62 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -21,8 +21,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Factory to create an {@link RpcRetryingCaller} @@ -32,12 +30,8 @@ public class RpcRetryingCallerFactory { /** Configuration key for a custom {@link RpcRetryingCaller} */ public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; - private static final Logger LOG = LoggerFactory.getLogger(RpcRetryingCallerFactory.class); protected final Configuration conf; - private final long pause; - private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified - private final int retries; - private final int rpcTimeout; + private final ConnectionConfiguration connectionConf; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; /* These below data members are UNUSED!!!*/ @@ -50,25 +44,12 @@ public RpcRetryingCallerFactory(Configuration conf) { public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) { this.conf = conf; - pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); - if (configuredPauseForCQTBE < pause) { - LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " - + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE - + ", will use " + pause + " instead."); - this.pauseForCQTBE = pause; - } else { - this.pauseForCQTBE = configuredPauseForCQTBE; - } - retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.connectionConf = new ConnectionConfiguration(conf); startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); this.interceptor = interceptor; enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); - rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT); } /** @@ -84,9 +65,11 @@ public void setStatisticTracker(ServerStatisticTracker statisticTracker) { public RpcRetryingCaller newCaller(int rpcTimeout) { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - RpcRetryingCaller caller = new RpcRetryingCallerImpl<>(pause, pauseForCQTBE, retries, - interceptor, startLogErrorsCnt, rpcTimeout); - return caller; + return new RpcRetryingCallerImpl<>( + connectionConf.getPauseMillis(), + connectionConf.getPauseMillisForServerOverloaded(), + connectionConf.getRetriesNumber(), + interceptor, startLogErrorsCnt, rpcTimeout); } /** @@ -95,9 +78,12 @@ public RpcRetryingCaller newCaller(int rpcTimeout) { public RpcRetryingCaller newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - RpcRetryingCaller caller = new RpcRetryingCallerImpl<>(pause, pauseForCQTBE, retries, - interceptor, startLogErrorsCnt, rpcTimeout); - return caller; + return new RpcRetryingCallerImpl<>( + connectionConf.getPauseMillis(), + connectionConf.getPauseMillisForServerOverloaded(), + connectionConf.getRetriesNumber(), + interceptor, startLogErrorsCnt, + connectionConf.getRpcTimeout()); } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index fb58bfae9eff..18caf49647d4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -30,8 +30,8 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ServerOverloadedException; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -60,7 +60,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final int startLogErrorsCnt; private final long pause; - private final long pauseForCQTBE; + private final long pauseForServerOverloaded; private final int maxAttempts;// how many times to try private final int rpcTimeout;// timeout for each rpc request private final AtomicBoolean cancelled = new AtomicBoolean(false); @@ -68,15 +68,16 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final RetryingCallerInterceptorContext context; private final RetryingTimeTracker tracker; - public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries, int startLogErrorsCnt) { - this(pause, pauseForCQTBE, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, - startLogErrorsCnt, 0); + public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries, + int startLogErrorsCnt) { + this(pause, pauseForServerOverloaded, retries, + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0); } - public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries, + public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries, RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { this.pause = pause; - this.pauseForCQTBE = pauseForCQTBE; + this.pauseForServerOverloaded = pauseForServerOverloaded; this.maxAttempts = retries2Attempts(retries); this.interceptor = interceptor; context = interceptor.createEmptyContext(); @@ -148,8 +149,9 @@ public T callWithRetries(RetryingCallable callable, int callTimeout) // If the server is dead, we need to wait a little before retrying, to give // a chance to the regions to be moved // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be - // special when encountering CallQueueTooBigException, see #HBASE-17114 - long pauseBase = (t instanceof CallQueueTooBigException) ? pauseForCQTBE : pause; + // special when encountering ServerOverloadedException, see #HBASE-17114 and HBASE-26807 + long pauseBase = (t instanceof ServerOverloadedException) + ? pauseForServerOverloaded : pause; expectedSleep = callable.sleep(pauseBase, tries); // If, after the planned sleep, there won't be enough time left, we stop now. diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java index b2d5b872e757..d5a2b5b24861 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; - +import static org.junit.Assert.assertTrue; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -40,6 +40,23 @@ public class TestAsyncConnectionConfiguration { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncConnectionConfiguration.class); + @Test + public void itHandlesDeprecatedPauseForCQTBE() { + Configuration conf = new Configuration(); + long timeoutMs = 1000; + conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, timeoutMs); + AsyncConnectionConfiguration config = new AsyncConnectionConfiguration(conf); + + assertTrue(Configuration.isDeprecated(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE)); + long expected = TimeUnit.MILLISECONDS.toNanos(timeoutMs); + assertEquals(expected, config.getPauseNsForServerOverloaded()); + + conf = new Configuration(); + conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, timeoutMs); + config = new AsyncConnectionConfiguration(conf); + assertEquals(expected, config.getPauseNsForServerOverloaded()); + } + @Test public void testDefaultReadWriteRpcTimeout() { Configuration conf = HBaseConfiguration.create(); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 1aa3bb70ab2d..0f48480bea40 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallDroppedException; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ServerOverloadedException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess; import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows; @@ -1038,7 +1040,12 @@ public void run() { } private ClusterConnection createHConnection() throws IOException { - ClusterConnection hc = createHConnectionCommon(); + return createHConnection(CONNECTION_CONFIG); + } + + private ClusterConnection createHConnection(ConnectionConfiguration configuration) + throws IOException { + ClusterConnection hc = createHConnectionCommon(configuration); setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); @@ -1048,8 +1055,9 @@ private ClusterConnection createHConnection() throws IOException { return hc; } - private ClusterConnection createHConnectionWithReplicas() throws IOException { - ClusterConnection hc = createHConnectionCommon(); + private ClusterConnection createHConnectionWithReplicas(ConnectionConfiguration configuration) + throws IOException { + ClusterConnection hc = createHConnectionCommon(configuration); setMockLocation(hc, DUMMY_BYTES_1, hrls1); setMockLocation(hc, DUMMY_BYTES_2, hrls2); setMockLocation(hc, DUMMY_BYTES_3, hrls3); @@ -1076,13 +1084,14 @@ private static void setMockLocation(ClusterConnection hc, byte[] row, Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result); } - private ClusterConnection createHConnectionCommon() { + private ClusterConnection createHConnectionCommon( + ConnectionConfiguration connectionConfiguration) { ClusterConnection hc = Mockito.mock(ClusterConnection.class); NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(hc.getNonceGenerator()).thenReturn(ng); Mockito.when(hc.getConfiguration()).thenReturn(CONF); - Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG); + Mockito.when(hc.getConnectionConfiguration()).thenReturn(connectionConfiguration); return hc; } @@ -1608,11 +1617,11 @@ private MyAsyncProcessWithReplicas createReplicaAp( // TODO: this is kind of timing dependent... perhaps it should detect from createCaller // that the replica call has happened and that way control the ordering. Configuration conf = new Configuration(); - ClusterConnection conn = createHConnectionWithReplicas(); conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000); if (retries >= 0) { conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); } + ClusterConnection conn = createHConnectionWithReplicas(new ConnectionConfiguration(conf)); MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf); ap.setCallDelays(primaryMs, replicaMs); return ap; @@ -1736,20 +1745,31 @@ public void testUncheckedException() throws Exception { } /** - * Test and make sure we could use a special pause setting when retry with - * CallQueueTooBigException, see HBASE-17114 - * @throws Exception if unexpected error happened during test + * Below tests make sure we could use a special pause setting when retry with + * ServerOverloadedException, see HBASE-17114 */ + @Test - public void testRetryPauseWithCallQueueTooBigException() throws Exception { - Configuration myConf = new Configuration(CONF); + public void testRetryPauseWithServerOverloadedExceptionCQTBE() throws Exception { + testRetryPauseWithServerOverloadedException(new CallQueueTooBigException()); + } + + @Test + public void testRetryPauseWithServerOverloadedExceptionCDE() throws Exception { + testRetryPauseWithServerOverloadedException(new CallDroppedException()); + } + + private void testRetryPauseWithServerOverloadedException( + ServerOverloadedException exception) throws IOException { + Configuration conf = new Configuration(CONF); final long specialPause = 500L; final int retries = 1; - myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause); - myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); - ClusterConnection conn = new MyConnectionImpl(myConf); + conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, specialPause); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); + + ClusterConnection conn = new MyConnectionImpl(conf); AsyncProcessWithFailure ap = - new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException()); + new AsyncProcessWithFailure(conn, conf, exception); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); @@ -1779,8 +1799,8 @@ public void testRetryPauseWithCallQueueTooBigException() throws Exception { // check and confirm normal IOE will use the normal pause final long normalPause = - myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - ap = new AsyncProcessWithFailure(conn, myConf, new IOException()); + conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + ap = new AsyncProcessWithFailure(conn, conf, new IOException()); bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); mutator = new BufferedMutatorImpl(conn, bufferParam, ap); Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); @@ -1806,9 +1826,9 @@ public void testRetryPauseWithCallQueueTooBigException() throws Exception { @Test public void testRetryWithExceptionClearsMetaCache() throws Exception { - ClusterConnection conn = createHConnection(); - Configuration myConf = conn.getConfiguration(); + Configuration myConf = new Configuration(CONF); myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + ClusterConnection conn = createHConnection(new ConnectionConfiguration(myConf)); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test")); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionConfiguration.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionConfiguration.java new file mode 100644 index 000000000000..de4ddbb377c2 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionConfiguration.java @@ -0,0 +1,36 @@ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class}) +public class TestConnectionConfiguration { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestConnectionConfiguration.class); + + @Test + public void itHandlesDeprecatedPauseForCQTBE() { + Configuration conf = new Configuration(); + long timeoutMs = 1000; + conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, timeoutMs); + ConnectionConfiguration config = new ConnectionConfiguration(conf); + + assertTrue(Configuration.isDeprecated(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE)); + assertEquals(timeoutMs, config.getPauseMillisForServerOverloaded()); + + conf = new Configuration(); + conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, timeoutMs); + config = new ConnectionConfiguration(conf); + assertEquals(timeoutMs, config.getPauseMillisForServerOverloaded()); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java new file mode 100644 index 000000000000..acc85663f9a7 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import java.io.IOException; +import org.apache.hadoop.hbase.CallDroppedException; +import org.apache.hadoop.hbase.CallQueueTooBigException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.ServerOverloadedException; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class}) +public class TestRpcRetryingCallerImpl { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRpcRetryingCallerImpl.class); + + @Test + public void itUsesSpecialPauseForCQTBE() throws Exception { + itUsesSpecialPauseForServerOverloaded(CallQueueTooBigException.class); + } + + @Test + public void itUsesSpecialPauseForCDE() throws Exception { + itUsesSpecialPauseForServerOverloaded(CallDroppedException.class); + } + + private void itUsesSpecialPauseForServerOverloaded( + Class exceptionClass) throws Exception { + + // the actual values don't matter here as long as they're distinct. + // the ThrowingCallable will assert that the passed in pause from RpcRetryingCallerImpl + // matches the specialPauseMillis + long pauseMillis = 1; + long specialPauseMillis = 2; + + RpcRetryingCallerImpl caller = + new RpcRetryingCallerImpl<>(pauseMillis, specialPauseMillis, 2, 0); + + RetryingCallable callable = new ThrowingCallable( + CallQueueTooBigException.class, specialPauseMillis); + try { + caller.callWithRetries(callable, 5000); + fail("Expected " + exceptionClass.getSimpleName()); + } catch (RetriesExhaustedException e) { + assertTrue(e.getCause() instanceof ServerOverloadedException); + } + } + + private static class ThrowingCallable implements RetryingCallable { + private final Class exceptionClass; + private final long specialPauseMillis; + + public ThrowingCallable(Class exceptionClass, + long specialPauseMillis) { + this.exceptionClass = exceptionClass; + this.specialPauseMillis = specialPauseMillis; + } + + @Override + public void prepare(boolean reload) throws IOException { + + } + + @Override + public void throwable(Throwable t, boolean retrying) { + + } + + @Override + public String getExceptionMessageAdditionalDetail() { + return null; + } + + @Override + public long sleep(long pause, int tries) { + assertEquals(pause, specialPauseMillis); + return 0; + } + + @Override + public Void call(int callTimeout) throws Exception { + throw exceptionClass.getConstructor().newInstance(); + } + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 3c15c7890559..0f3183d68cc8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index ea25856ade94..6cd41bd2bd07 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -111,6 +111,8 @@ public static ClusterConnection getMockedConnectionAndDecorate(final Configurati throws IOException { ConnectionImplementation c = Mockito.mock(ConnectionImplementation.class); Mockito.when(c.getConfiguration()).thenReturn(conf); + ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration(conf); + Mockito.when(c.getConnectionConfiguration()).thenReturn(connectionConfiguration); Mockito.doNothing().when(c).close(); // Make it so we return a particular location when asked. final HRegionLocation loc = new HRegionLocation(hri, sn); @@ -134,9 +136,10 @@ public static ClusterConnection getMockedConnectionAndDecorate(final Configurati } NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(c.getNonceGenerator()).thenReturn(ng); - Mockito.when(c.getAsyncProcess()).thenReturn( + AsyncProcess asyncProcess = new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), - RpcControllerFactory.instantiate(conf))); + RpcControllerFactory.instantiate(conf)); + Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( RpcRetryingCallerFactory.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java index 28368f92ea85..6bfe8e677154 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java @@ -17,11 +17,8 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED; import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -81,7 +78,6 @@ public class TestAsyncClientPauseForServerOverloaded { PAUSE_FOR_SERVER_OVERLOADED_NANOS); private static AsyncConnection CONN; - private static AsyncConnection DEPRECATED_CONN; private static volatile FailMode MODE = null; @@ -165,10 +161,6 @@ public static void setUp() throws Exception { Configuration conf = new Configuration(UTIL.getConfiguration()); conf.setLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, PAUSE_FOR_SERVER_OVERLOADED_MILLIS); CONN = ConnectionFactory.createAsyncConnection(conf).get(); - - Configuration deprecatedConf = new Configuration(UTIL.getConfiguration()); - deprecatedConf.setLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, PAUSE_FOR_SERVER_OVERLOADED_MILLIS); - DEPRECATED_CONN = ConnectionFactory.createAsyncConnection(deprecatedConf).get(); } @AfterClass @@ -208,30 +200,10 @@ private void assertTime(Callable callable, long time) throws Exception { } } - @Test - public void testDeprecation() throws Exception { - // Check that we handled deprecation appropriately - assertTrue(Configuration.isDeprecated(HBASE_CLIENT_PAUSE_FOR_CQTBE)); - assertEquals(DEPRECATED_CONN.getConfiguration() - .getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, 0L), - PAUSE_FOR_SERVER_OVERLOADED_MILLIS); - assertEquals(DEPRECATED_CONN.getConfiguration() - .getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, 0L), - PAUSE_FOR_SERVER_OVERLOADED_MILLIS); - - // Do an end-to-end test that also proves that using the deprecated - // config still affected the timeout - testGetWithConn(DEPRECATED_CONN); - } - @Test public void testGet() throws Exception { - testGetWithConn(CONN); - } - - private void testGetWithConn(AsyncConnection conn) throws Exception { assertTime(() -> { - Result result = conn.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get(); + Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get(); assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER)); return null; }, PAUSE_FOR_SERVER_OVERLOADED_NANOS); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java index 1472fe77fcc2..057f20f349dd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java @@ -22,9 +22,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Modifier; import java.net.SocketTimeoutException; import java.util.ArrayList; @@ -41,6 +41,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallDroppedException; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ServerOverloadedException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; @@ -79,7 +82,6 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector; @@ -1086,6 +1088,99 @@ public void testLocateRegionsWithRegionReplicas() throws IOException { } } + @Test + public void testLocateRegionsRetrySpecialPauseCQTBE() throws IOException { + testLocateRegionsRetrySpecialPause(CallQueueTooBigException.class); + } + + @Test + public void testLocateRegionsRetrySpecialPauseCDE() throws IOException { + testLocateRegionsRetrySpecialPause(CallDroppedException.class); + } + + private void testLocateRegionsRetrySpecialPause( + Class exceptionClass) throws IOException { + + int regionReplication = 3; + byte[] family = Bytes.toBytes("cf"); + TableName tableName = TableName.valueOf(name.getMethodName()); + + // Create a table with region replicas + TableDescriptorBuilder builder = TableDescriptorBuilder + .newBuilder(tableName) + .setRegionReplication(regionReplication) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); + TEST_UTIL.getAdmin().createTable(builder.build()); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + + conf.setClass(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, + ThrowingCallerFactory.class, RpcRetryingCallerFactory.class); + conf.setClass("testSpecialPauseException", exceptionClass, ServerOverloadedException.class); + + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + // normal pause very short, 10 millis + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10); + + // special pause 10x longer, so we can detect it + long specialPauseMillis = 1000; + conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, + specialPauseMillis); + + try (ConnectionImplementation con = + (ConnectionImplementation) ConnectionFactory.createConnection(conf)) { + // Get locations of the regions of the table + + long start = System.nanoTime(); + try { + con.locateRegion(tableName, new byte[0], false, true, 0); + } catch (ServerOverloadedException e) { + // pass: expected + } + assertTrue(System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(specialPauseMillis)); + } finally { + TEST_UTIL.deleteTable(tableName); + } + } + + private static class ThrowingCallerFactory extends RpcRetryingCallerFactory { + + private final Class exceptionClass; + + public ThrowingCallerFactory(Configuration conf) { + super(conf); + this.exceptionClass = conf.getClass("testSpecialPauseException", + null, ServerOverloadedException.class); + } + + @Override public RpcRetryingCaller newCaller(int rpcTimeout) { + return newCaller(); + } + + @Override public RpcRetryingCaller newCaller() { + return new RpcRetryingCaller() { + @Override public void cancel() { + + } + + @Override public T callWithRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + return callWithoutRetries(null, 0); + } + + @Override public T callWithoutRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + try { + throw exceptionClass.getConstructor().newInstance(); + } catch (IllegalAccessException | InstantiationException + | InvocationTargetException | NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + }; + } + } + @Test public void testMetaLookupThreadPoolCreated() throws Exception { final TableName tableName = TableName.valueOf(name.getMethodName());