diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java index 13ab3ed47cee..3feaaaf17a81 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase; -import java.io.IOException; - import org.apache.yetus.audience.InterfaceAudience; /** @@ -28,14 +26,16 @@ */ @SuppressWarnings("serial") @InterfaceAudience.Public -public class CallDroppedException extends IOException { +public class CallDroppedException extends HBaseServerException { public CallDroppedException() { - super(); + // For now all call drops are due to server being overloaded. + // We could decouple this if desired. + super(true); } // Absence of this constructor prevents proper unwrapping of // remote exception on the client side public CallDroppedException(String message) { - super(message); + super(true, message); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java index 12fa242693c8..6bf68bc4ad0e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java @@ -18,13 +18,15 @@ package org.apache.hadoop.hbase; -import java.io.IOException; - import org.apache.yetus.audience.InterfaceAudience; +/** + * Returned to clients when their request was dropped because the call queue was too big to + * accept a new call. Clients should retry upon receiving it. + */ @SuppressWarnings("serial") @InterfaceAudience.Public -public class CallQueueTooBigException extends IOException { +public class CallQueueTooBigException extends CallDroppedException { public CallQueueTooBigException() { super(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HBaseServerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HBaseServerException.java new file mode 100644 index 000000000000..c72ed19e486b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HBaseServerException.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Base class for exceptions thrown by an HBase server. May contain extra info about + * the state of the server when the exception was thrown. + */ +@InterfaceAudience.Public +public class HBaseServerException extends HBaseIOException { + private boolean serverOverloaded; + + public HBaseServerException() { + this(false); + } + + public HBaseServerException(String message) { + this(false, message); + } + + public HBaseServerException(boolean serverOverloaded) { + this.serverOverloaded = serverOverloaded; + } + + public HBaseServerException(boolean serverOverloaded, String message) { + super(message); + this.serverOverloaded = serverOverloaded; + } + + /** + * @param t throwable to check for server overloaded state + * @return True if the server was considered overloaded when the exception was thrown + */ + public static boolean isServerOverloaded(Throwable t) { + if (t instanceof HBaseServerException) { + return ((HBaseServerException) t).isServerOverloaded(); + } + return false; + } + + /** + * Necessary for parsing RemoteException on client side + * @param serverOverloaded True if server was overloaded when exception was thrown + */ + public void setServerOverloaded(boolean serverOverloaded) { + this.serverOverloaded = serverOverloaded; + } + + /** + * @return True if server was considered overloaded when exception was thrown + */ + public boolean isServerOverloaded() { + return serverOverloaded; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java index 49bc350bb9a6..c55977dba5e9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.yetus.audience.InterfaceAudience; /** @@ -50,21 +51,42 @@ public interface AsyncAdminBuilder { * Set the base pause time for retrying. We use an exponential policy to generate sleep time when * retrying. * @return this for invocation chaining - * @see #setRetryPauseForCQTBE(long, TimeUnit) + * @see #setRetryPauseForServerOverloaded(long, TimeUnit) */ AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit); /** - * Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an - * exponential policy to generate sleep time when retrying. + * Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}. + * We use an exponential policy to generate sleep time from this base when retrying. *

* This value should be greater than the normal pause value which could be set with the above - * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException} - * means the server is overloaded. We just use the normal pause value for - * {@code CallQueueTooBigException} if here you specify a smaller value. + * {@link #setRetryPause(long, TimeUnit)} method, as usually + * {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use + * the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you + * specify a smaller value. + * * @see #setRetryPause(long, TimeUnit) + * @deprecated Since 2.5.0, will be removed in 4.0.0. Please use + * {@link #setRetryPauseForServerOverloaded(long, TimeUnit)} instead. */ - AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit); + @Deprecated + default AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit) { + return setRetryPauseForServerOverloaded(pause, unit); + } + + /** + * Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}. + * We use an exponential policy to generate sleep time when retrying. + *

+ * This value should be greater than the normal pause value which could be set with the above + * {@link #setRetryPause(long, TimeUnit)} method, as usually + * {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use + * the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you + * specify a smaller value. + * + * @see #setRetryPause(long, TimeUnit) + */ + AsyncAdminBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit); /** * Set the max retry times for an admin operation. Usually it is the max attempt times minus 1. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java index ffb3ae97ecff..cd023d8134d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java @@ -33,7 +33,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { protected long pauseNs; - protected long pauseForCQTBENs; + protected long pauseNsForServerOverloaded; protected int maxAttempts; @@ -43,7 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { this.rpcTimeoutNs = connConf.getRpcTimeoutNs(); this.operationTimeoutNs = connConf.getOperationTimeoutNs(); this.pauseNs = connConf.getPauseNs(); - this.pauseForCQTBENs = connConf.getPauseForCQTBENs(); + this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded(); this.maxAttempts = connConf.getMaxRetries(); this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); } @@ -67,8 +67,8 @@ public AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit) { } @Override - public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(timeout); + public AsyncAdminBuilder setRetryPauseForServerOverloaded(long timeout, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(timeout); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java index 7a381db39c82..f03e8b5cacb3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java @@ -44,10 +44,10 @@ public interface Callable { private ServerName serverName; public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, - long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 7af385da2d83..6e4ed552931f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -45,9 +45,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.mutable.MutableBoolean; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RetryImmediatelyException; @@ -63,9 +63,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.io.netty.util.Timer; - import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; @@ -104,7 +102,7 @@ class AsyncBatchRpcRetryingCaller { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -150,13 +148,14 @@ public int getPriority() { } public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - TableName tableName, List actions, long pauseNs, long pauseForCQTBENs, - int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + TableName tableName, List actions, long pauseNs, + long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -466,17 +465,17 @@ private void onError(Map actionsByRegion, int tries, Thro .collect(Collectors.toList()); addError(copiedActions, error, serverName); tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, - error instanceof CallQueueTooBigException); + HBaseServerException.isServerOverloaded(error)); } private void tryResubmit(Stream actions, int tries, boolean immediately, - boolean isCallQueueTooBig) { + boolean isServerOverloaded) { if (immediately) { groupAndSend(actions, tries); return; } long delayNs; - long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs; + long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs; if (operationTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index 0e2c9c616df5..80e3b0ce0471 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -78,7 +78,7 @@ class AsyncClientScanner { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -93,7 +93,7 @@ class AsyncClientScanner { private final Span span; public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, - AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs, + AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { if (scan.getStartRow() == null) { scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); @@ -107,7 +107,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN this.conn = conn; this.retryTimer = retryTimer; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -198,7 +198,8 @@ private void startScan(OpenScannerResponse resp) { .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp), (hasMore, error) -> { try (Scope ignored = span.makeCurrent()) { @@ -231,7 +232,8 @@ private CompletableFuture openScanner(int replicaId) { .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) .priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) .action(this::callOpenScanner).call(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index cddfcb926ed5..69616649d57e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -17,46 +17,15 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; -import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; -import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT; -import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; -import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT; -import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY; -import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND; -import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT; -import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND; -import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT; -import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS; -import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT; -import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT; -import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY; - import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Timeout configs. @@ -64,7 +33,15 @@ @InterfaceAudience.Private class AsyncConnectionConfiguration { - private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class); + /** + * Configure the number of failures after which the client will start logging. A few failures + * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable + * heuristic for the number of errors we don't log. 5 was chosen because we wait for 1s at + * this stage. + */ + public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = + "hbase.client.start.log.errors.counter"; + public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5; private final long metaOperationTimeoutNs; @@ -84,7 +61,7 @@ class AsyncConnectionConfiguration { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxRetries; @@ -118,50 +95,45 @@ class AsyncConnectionConfiguration { private final int maxKeyValueSize; AsyncConnectionConfiguration(Configuration conf) { + ConnectionConfiguration connectionConf = new ConnectionConfiguration(conf); + + // fields we can pull directly from connection configuration + this.scannerCaching = connectionConf.getScannerCaching(); + this.scannerMaxResultSize = connectionConf.getScannerMaxResultSize(); + this.writeBufferSize = connectionConf.getWriteBufferSize(); + this.writeBufferPeriodicFlushTimeoutNs = connectionConf.getWriteBufferPeriodicFlushTimeoutMs(); + this.maxKeyValueSize = connectionConf.getMaxKeyValueSize(); + this.maxRetries = connectionConf.getRetriesNumber(); + + // fields from connection configuration that need to be converted to nanos this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( - conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); - this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( - conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); - long rpcTimeoutMs = conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT); - this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs); + connectionConf.getMetaOperationTimeout()); + this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getOperationTimeout()); + this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getRpcTimeout()); this.readRpcTimeoutNs = - TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutMs)); + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, + connectionConf.getReadRpcTimeout())); this.writeRpcTimeoutNs = - TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs)); - long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); - long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs); - if (pauseForCQTBEMs < pauseMs) { - LOG.warn( - "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead", - HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs); - pauseForCQTBEMs = pauseMs; - } - this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs); - this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs); - this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, + connectionConf.getWriteRpcTimeout())); + this.pauseNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getPauseMillis()); + this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos( + connectionConf.getPauseMillisForServerOverloaded()); + this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos( + connectionConf.getPrimaryCallTimeoutMicroSecond()); + this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos( + connectionConf.getReplicaCallTimeoutMicroSecondScan()); + this.primaryMetaScanTimeoutNs = + TimeUnit.MICROSECONDS.toNanos(connectionConf.getMetaReplicaCallTimeoutMicroSecondScan()); + this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos( + conf.getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); + + // fields not in connection configuration this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); - this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos( - conf.getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); - this.scannerCaching = - conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING); this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING); - this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); - this.writeBufferPeriodicFlushTimeoutNs = - TimeUnit.MILLISECONDS.toNanos(conf.getLong(WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, - WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT)); - this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos( - conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT)); - this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos( - conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT)); - this.primaryMetaScanTimeoutNs = - TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, - HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT)); - this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); } long getMetaOperationTimeoutNs() { @@ -188,8 +160,8 @@ long getPauseNs() { return pauseNs; } - long getPauseForCQTBENs() { - return pauseForCQTBENs; + long getPauseNsForServerOverloaded() { + return pauseNsForServerOverloaded; } int getMaxRetries() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index de2778cf6d78..976e9e78477c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -44,10 +44,10 @@ public interface Callable { private final Callable callable; public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - Callable callable, int priority, long pauseNs, long pauseForCQTBENs, int maxRetries, - long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + Callable callable, int priority, long pauseNs, long pauseNsForServerOverloaded, + int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 6071cb63645e..2b155842999e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -134,14 +134,13 @@ public void waitUntilDone() throws InterruptedIOException { final long id; final ClusterConnection connection; + final ConnectionConfiguration connectionConfiguration; private final RpcRetryingCallerFactory rpcCallerFactory; final RpcControllerFactory rpcFactory; // Start configuration settings. final int startLogErrorsCnt; - final long pause; - final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified final int numTries; long serverTrackerTimeout; final long primaryCallTimeoutMicroseconds; @@ -156,30 +155,25 @@ public void waitUntilDone() throws InterruptedIOException { public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms"; private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000; private final int periodToLog; + AsyncProcess(ClusterConnection hc, Configuration conf, - RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) { + RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) { + this(hc, conf, rpcCaller, rpcFactory, hc.getConnectionConfiguration().getRetriesNumber()); + } + + AsyncProcess(ClusterConnection hc, Configuration conf, + RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory, int retriesNumber) { if (hc == null) { throw new IllegalArgumentException("ClusterConnection cannot be null."); } this.connection = hc; + this.connectionConfiguration = connection.getConnectionConfiguration(); this.id = COUNTER.incrementAndGet(); - this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); - if (configuredPauseForCQTBE < pause) { - LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " - + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE - + ", will use " + pause + " instead."); - this.pauseForCQTBE = pause; - } else { - this.pauseForCQTBE = configuredPauseForCQTBE; - } // how many times we could try in total, one more than retry number - this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; + this.numTries = retriesNumber + 1; this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); @@ -193,7 +187,8 @@ public void waitUntilDone() throws InterruptedIOException { // we will do more retries in aggregate, but the user will be none the wiser. this.serverTrackerTimeout = 0L; for (int i = 0; i < this.numTries; ++i) { - serverTrackerTimeout = serverTrackerTimeout + ConnectionUtils.getPauseTime(this.pause, i); + serverTrackerTimeout = serverTrackerTimeout + + ConnectionUtils.getPauseTime(connectionConfiguration.getPauseMillis(), i); } this.rpcCallerFactory = rpcCaller; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index ca6d5342d57a..d5da4db36fc8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -36,8 +36,8 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; @@ -738,11 +738,14 @@ private void resubmit(ServerName oldServer, List toReplay, long backOffTime; if (retryImmediately) { backOffTime = 0; - } else if (throwable instanceof CallQueueTooBigException) { - // Give a special check on CQTBE, see #HBASE-17114 - backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE); + } else if (HBaseServerException.isServerOverloaded(throwable)) { + // Give a special check when encountering an exception indicating the server is overloaded. + // see #HBASE-17114 and HBASE-26807 + backOffTime = errorsByServer.calculateBackoffTime(oldServer, + asyncProcess.connectionConfiguration.getPauseMillisForServerOverloaded()); } else { - backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause); + backOffTime = errorsByServer.calculateBackoffTime(oldServer, + asyncProcess.connectionConfiguration.getPauseMillis()); } if (numAttempt > asyncProcess.startLogErrorsCnt) { // We use this value to have some logs when we have multiple failures, but not too many diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 586e7d52e074..82ab90a47fa7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -31,8 +31,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; @@ -44,7 +44,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.io.netty.util.Timer; @InterfaceAudience.Private @@ -60,7 +59,7 @@ public abstract class AsyncRpcRetryingCaller { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private int tries = 1; @@ -81,13 +80,13 @@ public abstract class AsyncRpcRetryingCaller { protected final HBaseRpcController controller; public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, - long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.priority = priority; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -127,7 +126,8 @@ protected final void resetCallTimeout() { } private void tryScheduleRetry(Throwable error) { - long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs; + long pauseNsToUse = HBaseServerException.isServerOverloaded(error) ? + pauseNsForServerOverloaded : pauseNs; long delayNs; if (operationTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 48bde4434be7..d501998f8684 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -58,7 +58,7 @@ private abstract class BuilderBase { protected long pauseNs = conn.connConf.getPauseNs(); - protected long pauseForCQTBENs = conn.connConf.getPauseForCQTBENs(); + protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded(); protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries()); @@ -119,8 +119,8 @@ public SingleRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public SingleRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public SingleRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -156,8 +156,8 @@ private void preCheck() { public AsyncSingleRequestRpcRetryingCaller build() { preCheck(); return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, - locateType, callable, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -263,8 +263,8 @@ public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public ScanSingleRegionCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -292,8 +292,8 @@ public AsyncScanSingleRegionRpcRetryingCaller build() { preCheck(); return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, - scannerLeaseTimeoutPeriodNs, pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts, + scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -347,8 +347,8 @@ public BatchCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public BatchCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -364,7 +364,8 @@ public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { public AsyncBatchRpcRetryingCaller build() { return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, - pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); } public List> call() { @@ -406,8 +407,8 @@ public MasterRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public MasterRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public MasterRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -438,7 +439,8 @@ private void preCheck() { public AsyncMasterRequestRpcRetryingCaller build() { preCheck(); return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, callable, priority, - pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); } /** @@ -487,8 +489,8 @@ public AdminRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public AdminRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public AdminRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -514,8 +516,9 @@ public AdminRequestCallerBuilder priority(int priority) { public AsyncAdminRequestRetryingCaller build() { return new AsyncAdminRequestRetryingCaller(retryTimer, conn, priority, pauseNs, - pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, - checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); + pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), + checkNotNull(callable, "action is null")); } public CompletableFuture call() { @@ -558,8 +561,8 @@ public ServerRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public ServerRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public ServerRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -579,7 +582,8 @@ public ServerRequestCallerBuilder serverName(ServerName serverName) { } public AsyncServerRequestRpcRetryingCaller build() { - return new AsyncServerRequestRpcRetryingCaller(retryTimer, conn, pauseNs, pauseForCQTBENs, + return new AsyncServerRequestRpcRetryingCaller(retryTimer, conn, pauseNs, + pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index e9aa962edb38..973c0539cf49 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -36,8 +36,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; @@ -101,7 +101,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -310,7 +310,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, - long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, + long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.scan = scan; @@ -323,7 +323,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI this.regionServerRemote = isRegionServerRemote; this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -413,7 +413,8 @@ private void onError(Throwable error) { return; } long delayNs; - long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs; + long pauseNsToUse = HBaseServerException.isServerOverloaded(error) ? + pauseNsForServerOverloaded : pauseNs; if (scanTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java index 52a2abe39440..8c6cf81f4c71 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java @@ -46,10 +46,10 @@ public interface Callable { private ServerName serverName; public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseForCQTBENs, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseNsForServerOverloaded, + maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 2a552c71b3dd..31fa1834bb70 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -56,10 +56,10 @@ CompletableFuture call(HBaseRpcController controller, HRegionLocation loc, public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, TableName tableName, byte[] row, int replicaId, RegionLocateType locateType, - Callable callable, int priority, long pauseNs, long pauseForCQTBENs, int maxAttempts, - long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + Callable callable, int priority, long pauseNs, long pauseNsForServerOverloaded, + int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.tableName = tableName; this.row = row; this.replicaId = replicaId; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index 4c883a8332d7..ebf98f98bc3e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.yetus.audience.InterfaceAudience; /** @@ -76,21 +77,42 @@ public interface AsyncTableBuilder { /** * Set the base pause time for retrying. We use an exponential policy to generate sleep time when * retrying. - * @see #setRetryPauseForCQTBE(long, TimeUnit) + * @see #setRetryPauseForServerOverloaded(long, TimeUnit) */ AsyncTableBuilder setRetryPause(long pause, TimeUnit unit); /** - * Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an - * exponential policy to generate sleep time when retrying. + * Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}. + * We use an exponential policy to generate sleep time when retrying. *

* This value should be greater than the normal pause value which could be set with the above - * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException} - * means the server is overloaded. We just use the normal pause value for - * {@code CallQueueTooBigException} if here you specify a smaller value. + * {@link #setRetryPause(long, TimeUnit)} method, as usually + * {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use + * the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you + * specify a smaller value. + * * @see #setRetryPause(long, TimeUnit) + * @deprecated Since 2.5.0, will be removed in 4.0.0. Please use + * {@link #setRetryPauseForServerOverloaded(long, TimeUnit)} instead. */ - AsyncTableBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit); + @Deprecated + default AsyncTableBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit) { + return setRetryPauseForServerOverloaded(pause, unit); + } + + /** + * Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}. + * We use an exponential policy to generate sleep time when retrying. + *

+ * This value should be greater than the normal pause value which could be set with the above + * {@link #setRetryPause(long, TimeUnit)} method, as usually + * {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use + * the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you + * specify a smaller value. + * + * @see #setRetryPause(long, TimeUnit) + */ + AsyncTableBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit); /** * Set the max retry times for an operation. Usually it is the max attempt times minus 1. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 399d9ddfaffe..bec9f1236907 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -45,7 +45,7 @@ abstract class AsyncTableBuilderBase protected long pauseNs; - protected long pauseForCQTBENs; + protected long pauseNsForServerOverloaded; protected int maxAttempts; @@ -60,7 +60,7 @@ abstract class AsyncTableBuilderBase this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs(); this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs(); this.pauseNs = connConf.getPauseNs(); - this.pauseForCQTBENs = connConf.getPauseForCQTBENs(); + this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded(); this.maxAttempts = retries2Attempts(connConf.getMaxRetries()); this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); } @@ -102,8 +102,8 @@ public AsyncTableBuilderBase setRetryPause(long pause, TimeUnit unit) { } @Override - public AsyncTableBuilderBase setRetryPauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public AsyncTableBuilderBase setRetryPauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 19a398b8c66d..eee2139796f1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -11,9 +11,13 @@ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Configuration parameters for the connection. @@ -25,6 +29,7 @@ */ @InterfaceAudience.Private public class ConnectionConfiguration { + private static final Logger LOG = LoggerFactory.getLogger(ConnectionConfiguration.class); public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; @@ -43,6 +48,24 @@ public class ConnectionConfiguration { "hbase.client.replicaCallTimeout.scan"; public static final int PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT = 1000000; // 1s + /** + * Parameter name for client pause when server is overloaded, denoted by an exception + * where {@link org.apache.hadoop.hbase.HBaseServerException#isServerOverloaded(Throwable)} + * is true. + */ + public static final String HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED = + "hbase.client.pause.server.overloaded"; + + static { + // This is added where the configs are referenced. It may be too late to happen before + // any user _sets_ the old cqtbe config onto a Configuration option. So we still need + // to handle checking both properties in parsing below. The benefit of calling this is + // that it should still cause Configuration to log a warning if we do end up falling + // through to the old deprecated config. + Configuration.addDeprecation( + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED); + } + private final long writeBufferSize; private final long writeBufferPeriodicFlushTimeoutMs; private final long writeBufferPeriodicFlushTimerTickMs; @@ -60,6 +83,8 @@ public class ConnectionConfiguration { private final int writeRpcTimeout; // toggle for async/sync prefetch private final boolean clientScannerAsyncPrefetch; + private final long pauseMs; + private final long pauseMsForServerOverloaded; /** * Constructor @@ -115,6 +140,21 @@ public class ConnectionConfiguration { this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + + + long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); + long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, + conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs)); + if (pauseMsForServerOverloaded < pauseMs) { + LOG.warn( + "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead", + HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded, + HBASE_CLIENT_PAUSE, pauseMs); + pauseMsForServerOverloaded = pauseMs; + } + + this.pauseMs = pauseMs; + this.pauseMsForServerOverloaded = pauseMsForServerOverloaded; } /** @@ -140,6 +180,8 @@ protected ConnectionConfiguration() { this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE; + this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE; } public int getReadRpcTimeout() { @@ -205,4 +247,12 @@ public boolean isClientScannerAsyncPrefetch() { public int getRpcTimeout() { return rpcTimeout; } + + public long getPauseMillis() { + return pauseMs; + } + + public long getPauseMillisForServerOverloaded() { + return pauseMsForServerOverloaded; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index ff4368ba4853..ba10801968e0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -52,9 +52,9 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MasterNotRunningException; @@ -168,8 +168,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class); - private final long pause; - private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified // The mode tells if HedgedRead, LoadBalance mode is supported. // The default mode is CatalogReplicaMode.None. private CatalogReplicaMode metaReplicaMode; @@ -267,16 +265,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { this.batchPool = (ThreadPoolExecutor) pool; this.connectionConfig = new ConnectionConfiguration(conf); this.closed = false; - this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); - if (configuredPauseForCQTBE < pause) { - LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " - + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE - + ", will use " + pause + " instead."); - this.pauseForCQTBE = pause; - } else { - this.pauseForCQTBE = configuredPauseForCQTBE; - } this.metaReplicaCallTimeoutScanInMicroSecond = connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan(); @@ -946,7 +934,7 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool metaCache.clearCache(tableName, row, replicaId); } // Query the meta region - long pauseBase = this.pause; + long pauseBase = connectionConfig.getPauseMillis(); takeUserRegionLock(); try { // We don't need to check if useCache is enabled or not. Even if useCache is false @@ -1036,9 +1024,10 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool if (e instanceof RemoteException) { e = ((RemoteException) e).unwrapRemoteException(); } - if (e instanceof CallQueueTooBigException) { - // Give a special check on CallQueueTooBigException, see #HBASE-17114 - pauseBase = this.pauseForCQTBE; + if (HBaseServerException.isServerOverloaded(e)) { + // Give a special pause when encountering an exception indicating the server + // is overloaded. see #HBASE-17114 and HBASE-26807 + pauseBase = connectionConfig.getPauseMillisForServerOverloaded(); } if (tries < maxAttempts - 1) { LOG.debug( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 539b02dba3ad..3b0efba5f3b6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; @@ -77,7 +76,7 @@ public class HTableMultiplexer { private final Map serverToFlushWorkerMap = new ConcurrentHashMap<>(); - private final Configuration workerConf; + private final Configuration conf; private final ClusterConnection conn; private final ExecutorService pool; private final int maxAttempts; @@ -116,11 +115,7 @@ public HTableMultiplexer(Connection conn, Configuration conf, this.executor = Executors.newScheduledThreadPool(initThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build()); - - this.workerConf = HBaseConfiguration.create(conf); - // We do not do the retry because we need to reassign puts to different queues if regions are - // moved. - this.workerConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + this.conf = conf; } /** @@ -245,7 +240,7 @@ LinkedBlockingQueue getQueue(HRegionLocation addr) { worker = serverToFlushWorkerMap.get(addr); if (worker == null) { // Create the flush worker - worker = new FlushWorker(workerConf, this.conn, addr, this, + worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize, pool, executor); this.serverToFlushWorkerMap.put(addr, worker); executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS); @@ -454,7 +449,9 @@ public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation a HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory); + // Specify 0 retries in AsyncProcess because we need to reassign puts to different queues + // if regions are moved. + this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory, 0); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); this.pool = pool; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 29698c201336..4468c18b1797 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -333,7 +333,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -349,15 +349,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { this.rpcTimeoutNs = builder.rpcTimeoutNs; this.operationTimeoutNs = builder.operationTimeoutNs; this.pauseNs = builder.pauseNs; - if (builder.pauseForCQTBENs < builder.pauseNs) { + if (builder.pauseNsForServerOverloaded < builder.pauseNs) { LOG.warn( - "Configured value of pauseForCQTBENs is {} ms, which is less than" + + "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" + " the normal pause value {} ms, use the greater one instead", - TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), + TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); - this.pauseForCQTBENs = builder.pauseNs; + this.pauseNsForServerOverloaded = builder.pauseNs; } else { - this.pauseForCQTBENs = builder.pauseForCQTBENs; + this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; } this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; @@ -368,7 +368,8 @@ private MasterRequestCallerBuilder newMasterCaller() { return this.connection.callerFactory. masterRequest() .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } @@ -376,7 +377,8 @@ private AdminRequestCallerBuilder newAdminCaller() { return this.connection.callerFactory. adminRequest() .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } @@ -3458,7 +3460,8 @@ private ServerRequestCallerBuilder newServerCaller() { return this.connection.callerFactory. serverRequest() .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index c20610465b77..30791b1aa825 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -26,7 +26,6 @@ import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; - import com.google.protobuf.RpcChannel; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; @@ -59,11 +58,9 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.io.netty.util.Timer; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -112,7 +109,7 @@ class RawAsyncTableImpl implements AsyncTable { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -128,15 +125,15 @@ class RawAsyncTableImpl implements AsyncTable { this.operationTimeoutNs = builder.operationTimeoutNs; this.scanTimeoutNs = builder.scanTimeoutNs; this.pauseNs = builder.pauseNs; - if (builder.pauseForCQTBENs < builder.pauseNs) { + if (builder.pauseNsForServerOverloaded < builder.pauseNs) { LOG.warn( - "Configured value of pauseForCQTBENs is {} ms, which is less than" - + " the normal pause value {} ms, use the greater one instead", - TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), + "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" + + " the normal pause value {} ms, use the greater one instead", + TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); - this.pauseForCQTBENs = builder.pauseNs; + this.pauseNsForServerOverloaded = builder.pauseNs; } else { - this.pauseForCQTBENs = builder.pauseForCQTBENs; + this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; } this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; @@ -242,11 +239,12 @@ private CompletableFuture noncedMutate(long nonceGroup, long n } private SingleRequestCallerBuilder newCaller(byte[] row, int priority, long rpcTimeoutNs) { - return conn.callerFactory. single().table(tableName).row(row).priority(priority) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); + return conn.callerFactory.single().table(tableName).row(row).priority(priority) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } private SingleRequestCallerBuilder @@ -648,8 +646,8 @@ private Scan setDefaultScanConfig(Scan scan) { @Override public void scan(Scan scan, AdvancedScanResultConsumer consumer) { new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, - pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt) - .start(); + pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, + startLogErrorsCnt).start(); } private long resultSize2CacheSize(long maxResultSize) { @@ -742,10 +740,10 @@ private List> batch(List actions, long r } } return conn.callerFactory.batch().table(tableName).actions(actions) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) - .startLogErrorsCnt(startLogErrorsCnt).call(); + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) + .startLogErrorsCnt(startLogErrorsCnt).call(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index e7a3d1801044..7425f8837f62 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -21,8 +21,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Factory to create an {@link RpcRetryingCaller} @@ -32,12 +30,8 @@ public class RpcRetryingCallerFactory { /** Configuration key for a custom {@link RpcRetryingCaller} */ public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; - private static final Logger LOG = LoggerFactory.getLogger(RpcRetryingCallerFactory.class); protected final Configuration conf; - private final long pause; - private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified - private final int retries; - private final int rpcTimeout; + private final ConnectionConfiguration connectionConf; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; /* These below data members are UNUSED!!!*/ @@ -50,25 +44,12 @@ public RpcRetryingCallerFactory(Configuration conf) { public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) { this.conf = conf; - pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); - if (configuredPauseForCQTBE < pause) { - LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " - + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE - + ", will use " + pause + " instead."); - this.pauseForCQTBE = pause; - } else { - this.pauseForCQTBE = configuredPauseForCQTBE; - } - retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.connectionConf = new ConnectionConfiguration(conf); startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); this.interceptor = interceptor; enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); - rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT); } /** @@ -84,9 +65,11 @@ public void setStatisticTracker(ServerStatisticTracker statisticTracker) { public RpcRetryingCaller newCaller(int rpcTimeout) { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - RpcRetryingCaller caller = new RpcRetryingCallerImpl<>(pause, pauseForCQTBE, retries, - interceptor, startLogErrorsCnt, rpcTimeout); - return caller; + return new RpcRetryingCallerImpl<>( + connectionConf.getPauseMillis(), + connectionConf.getPauseMillisForServerOverloaded(), + connectionConf.getRetriesNumber(), + interceptor, startLogErrorsCnt, rpcTimeout); } /** @@ -95,9 +78,12 @@ public RpcRetryingCaller newCaller(int rpcTimeout) { public RpcRetryingCaller newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - RpcRetryingCaller caller = new RpcRetryingCallerImpl<>(pause, pauseForCQTBE, retries, - interceptor, startLogErrorsCnt, rpcTimeout); - return caller; + return new RpcRetryingCallerImpl<>( + connectionConf.getPauseMillis(), + connectionConf.getPauseMillisForServerOverloaded(), + connectionConf.getRetriesNumber(), + interceptor, startLogErrorsCnt, + connectionConf.getRpcTimeout()); } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index 2a7dc5f7e7a9..57a864174439 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -30,8 +30,8 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -60,7 +60,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final int startLogErrorsCnt; private final long pause; - private final long pauseForCQTBE; + private final long pauseForServerOverloaded; private final int maxAttempts;// how many times to try private final int rpcTimeout;// timeout for each rpc request private final AtomicBoolean cancelled = new AtomicBoolean(false); @@ -68,15 +68,16 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final RetryingCallerInterceptorContext context; private final RetryingTimeTracker tracker; - public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries, int startLogErrorsCnt) { - this(pause, pauseForCQTBE, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, - startLogErrorsCnt, 0); + public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries, + int startLogErrorsCnt) { + this(pause, pauseForServerOverloaded, retries, + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0); } - public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries, + public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries, RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { this.pause = pause; - this.pauseForCQTBE = pauseForCQTBE; + this.pauseForServerOverloaded = pauseForServerOverloaded; this.maxAttempts = retries2Attempts(retries); this.interceptor = interceptor; context = interceptor.createEmptyContext(); @@ -148,8 +149,10 @@ public T callWithRetries(RetryingCallable callable, int callTimeout) // If the server is dead, we need to wait a little before retrying, to give // a chance to the regions to be moved // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be - // special when encountering CallQueueTooBigException, see #HBASE-17114 - long pauseBase = (t instanceof CallQueueTooBigException) ? pauseForCQTBE : pause; + // special when encountering an exception indicating the server is overloaded. + // see #HBASE-17114 and HBASE-26807 + long pauseBase = HBaseServerException.isServerOverloaded(t) + ? pauseForServerOverloaded : pause; expectedSleep = callable.sleep(pauseBase, tries); // If, after the planned sleep, there won't be enough time left, we stop now. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java index 2482a632ca8d..0e40e97eee17 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java @@ -111,28 +111,6 @@ public static Throwable findException(Object exception) { return null; } - /** - * Checks if the exception is CallQueueTooBig exception (maybe wrapped - * into some RemoteException). - * @param t exception to check - * @return true if it's a CQTBE, false otherwise - */ - public static boolean isCallQueueTooBigException(Throwable t) { - t = findException(t); - return (t instanceof CallQueueTooBigException); - } - - /** - * Checks if the exception is CallDroppedException (maybe wrapped - * into some RemoteException). - * @param t exception to check - * @return true if it's a CQTBE, false otherwise - */ - public static boolean isCallDroppedException(Throwable t) { - t = findException(t); - return (t instanceof CallDroppedException); - } - // This list covers most connectivity exceptions but not all. // For example, in SocketOutputStream a plain IOException is thrown at times when the channel is // closed. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index fd42214d1d30..2b71493e76c9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -40,14 +40,12 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal; - import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; @@ -140,11 +138,13 @@ static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) static RemoteException createRemoteException(final ExceptionResponse e) { String innerExceptionClassName = e.getExceptionClassName(); boolean doNotRetry = e.getDoNotRetry(); + boolean serverOverloaded = e.hasServerOverloaded() && e.getServerOverloaded(); return e.hasHostname() ? - // If a hostname then add it to the RemoteWithExtrasException - new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), - e.getPort(), doNotRetry) - : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry); + // If a hostname then add it to the RemoteWithExtrasException + new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), + e.getPort(), doNotRetry, serverOverloaded) : + new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry, + serverOverloaded); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java index bfc66e622e85..108b9068a2d9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.util.DynamicClassLoader; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; @@ -41,6 +42,7 @@ public class RemoteWithExtrasException extends RemoteException { private final String hostname; private final int port; private final boolean doNotRetry; + private final boolean serverOverloaded; /** * Dynamic class loader to load filter/comparators @@ -58,15 +60,26 @@ private final static class ClassLoaderHolder { } public RemoteWithExtrasException(String className, String msg, final boolean doNotRetry) { - this(className, msg, null, -1, doNotRetry); + this(className, msg, doNotRetry, false); + } + + public RemoteWithExtrasException(String className, String msg, final boolean doNotRetry, + final boolean serverOverloaded) { + this(className, msg, null, -1, doNotRetry, serverOverloaded); } public RemoteWithExtrasException(String className, String msg, final String hostname, - final int port, final boolean doNotRetry) { + final int port, final boolean doNotRetry) { + this(className, msg, hostname, port, doNotRetry, false); + } + + public RemoteWithExtrasException(String className, String msg, final String hostname, + final int port, final boolean doNotRetry, final boolean serverOverloaded) { super(className, msg); this.hostname = hostname; this.port = port; this.doNotRetry = doNotRetry; + this.serverOverloaded = serverOverloaded; } @Override @@ -98,6 +111,17 @@ private IOException instantiateException(Class cls) throw cn.setAccessible(true); IOException ex = cn.newInstance(this.getMessage()); ex.initCause(this); + + if (ex instanceof HBaseServerException) { + // this is a newly constructed exception. + // if an exception defaults to meaning isServerOverloaded, we use that. + // otherwise, see if the remote exception value should mean setting to true. + HBaseServerException serverException = (HBaseServerException) ex; + if (serverOverloaded && !serverException.isServerOverloaded()) { + serverException.setServerOverloaded(true); + } + } + return ex; } @@ -121,4 +145,11 @@ public int getPort() { public boolean isDoNotRetry() { return this.doNotRetry; } + + /** + * @return True if the server was considered overloaded when the exception was thrown. + */ + public boolean isServerOverloaded() { + return serverOverloaded; + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java index b2d5b872e757..d5a2b5b24861 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; - +import static org.junit.Assert.assertTrue; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -40,6 +40,23 @@ public class TestAsyncConnectionConfiguration { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncConnectionConfiguration.class); + @Test + public void itHandlesDeprecatedPauseForCQTBE() { + Configuration conf = new Configuration(); + long timeoutMs = 1000; + conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, timeoutMs); + AsyncConnectionConfiguration config = new AsyncConnectionConfiguration(conf); + + assertTrue(Configuration.isDeprecated(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE)); + long expected = TimeUnit.MILLISECONDS.toNanos(timeoutMs); + assertEquals(expected, config.getPauseNsForServerOverloaded()); + + conf = new Configuration(); + conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, timeoutMs); + config = new AsyncConnectionConfiguration(conf); + assertEquals(expected, config.getPauseNsForServerOverloaded()); + } + @Test public void testDefaultReadWriteRpcTimeout() { Configuration conf = HBaseConfiguration.create(); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 1aa3bb70ab2d..32e5a03ecbca 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -48,9 +48,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallDroppedException; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -1038,7 +1040,12 @@ public void run() { } private ClusterConnection createHConnection() throws IOException { - ClusterConnection hc = createHConnectionCommon(); + return createHConnection(CONNECTION_CONFIG); + } + + private ClusterConnection createHConnection(ConnectionConfiguration configuration) + throws IOException { + ClusterConnection hc = createHConnectionCommon(configuration); setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); @@ -1048,8 +1055,9 @@ private ClusterConnection createHConnection() throws IOException { return hc; } - private ClusterConnection createHConnectionWithReplicas() throws IOException { - ClusterConnection hc = createHConnectionCommon(); + private ClusterConnection createHConnectionWithReplicas(ConnectionConfiguration configuration) + throws IOException { + ClusterConnection hc = createHConnectionCommon(configuration); setMockLocation(hc, DUMMY_BYTES_1, hrls1); setMockLocation(hc, DUMMY_BYTES_2, hrls2); setMockLocation(hc, DUMMY_BYTES_3, hrls3); @@ -1076,13 +1084,14 @@ private static void setMockLocation(ClusterConnection hc, byte[] row, Mockito.anyBoolean(), Mockito.anyBoolean())).thenReturn(result); } - private ClusterConnection createHConnectionCommon() { + private ClusterConnection createHConnectionCommon( + ConnectionConfiguration connectionConfiguration) { ClusterConnection hc = Mockito.mock(ClusterConnection.class); NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(hc.getNonceGenerator()).thenReturn(ng); Mockito.when(hc.getConfiguration()).thenReturn(CONF); - Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG); + Mockito.when(hc.getConnectionConfiguration()).thenReturn(connectionConfiguration); return hc; } @@ -1608,11 +1617,11 @@ private MyAsyncProcessWithReplicas createReplicaAp( // TODO: this is kind of timing dependent... perhaps it should detect from createCaller // that the replica call has happened and that way control the ordering. Configuration conf = new Configuration(); - ClusterConnection conn = createHConnectionWithReplicas(); conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000); if (retries >= 0) { conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); } + ClusterConnection conn = createHConnectionWithReplicas(new ConnectionConfiguration(conf)); MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf); ap.setCallDelays(primaryMs, replicaMs); return ap; @@ -1736,20 +1745,31 @@ public void testUncheckedException() throws Exception { } /** - * Test and make sure we could use a special pause setting when retry with - * CallQueueTooBigException, see HBASE-17114 - * @throws Exception if unexpected error happened during test + * Below tests make sure we could use a special pause setting when retry an exception + * where {@link HBaseServerException#isServerOverloaded(Throwable)} is true, see HBASE-17114 */ + @Test - public void testRetryPauseWithCallQueueTooBigException() throws Exception { - Configuration myConf = new Configuration(CONF); + public void testRetryPauseWhenServerOverloadedDueToCQTBE() throws Exception { + testRetryPauseWhenServerIsOverloaded(new CallQueueTooBigException()); + } + + @Test + public void testRetryPauseWhenServerOverloadedDueToCDE() throws Exception { + testRetryPauseWhenServerIsOverloaded(new CallDroppedException()); + } + + private void testRetryPauseWhenServerIsOverloaded( + HBaseServerException exception) throws IOException { + Configuration conf = new Configuration(CONF); final long specialPause = 500L; final int retries = 1; - myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause); - myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); - ClusterConnection conn = new MyConnectionImpl(myConf); + conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, specialPause); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); + + ClusterConnection conn = new MyConnectionImpl(conf); AsyncProcessWithFailure ap = - new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException()); + new AsyncProcessWithFailure(conn, conf, exception); BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap); @@ -1779,8 +1799,8 @@ public void testRetryPauseWithCallQueueTooBigException() throws Exception { // check and confirm normal IOE will use the normal pause final long normalPause = - myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - ap = new AsyncProcessWithFailure(conn, myConf, new IOException()); + conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + ap = new AsyncProcessWithFailure(conn, conf, new IOException()); bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE); mutator = new BufferedMutatorImpl(conn, bufferParam, ap); Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker()); @@ -1806,9 +1826,9 @@ public void testRetryPauseWithCallQueueTooBigException() throws Exception { @Test public void testRetryWithExceptionClearsMetaCache() throws Exception { - ClusterConnection conn = createHConnection(); - Configuration myConf = conn.getConfiguration(); + Configuration myConf = new Configuration(CONF); myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + ClusterConnection conn = createHConnection(new ConnectionConfiguration(myConf)); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, myConf, new RegionOpeningException("test")); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionConfiguration.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionConfiguration.java new file mode 100644 index 000000000000..5f1b4879e891 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionConfiguration.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class}) +public class TestConnectionConfiguration { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestConnectionConfiguration.class); + + @Test + public void itHandlesDeprecatedPauseForCQTBE() { + Configuration conf = new Configuration(); + long timeoutMs = 1000; + conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, timeoutMs); + ConnectionConfiguration config = new ConnectionConfiguration(conf); + + assertTrue(Configuration.isDeprecated(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE)); + assertEquals(timeoutMs, config.getPauseMillisForServerOverloaded()); + + conf = new Configuration(); + conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, timeoutMs); + config = new ConnectionConfiguration(conf); + assertEquals(timeoutMs, config.getPauseMillisForServerOverloaded()); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java new file mode 100644 index 000000000000..cc6d6f4229aa --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import java.io.IOException; +import org.apache.hadoop.hbase.CallDroppedException; +import org.apache.hadoop.hbase.CallQueueTooBigException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseServerException; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class}) +public class TestRpcRetryingCallerImpl { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRpcRetryingCallerImpl.class); + + @Test + public void itUsesSpecialPauseForCQTBE() throws Exception { + itUsesSpecialPauseForServerOverloaded(CallQueueTooBigException.class); + } + + @Test + public void itUsesSpecialPauseForCDE() throws Exception { + itUsesSpecialPauseForServerOverloaded(CallDroppedException.class); + } + + private void itUsesSpecialPauseForServerOverloaded( + Class exceptionClass) throws Exception { + + // the actual values don't matter here as long as they're distinct. + // the ThrowingCallable will assert that the passed in pause from RpcRetryingCallerImpl + // matches the specialPauseMillis + long pauseMillis = 1; + long specialPauseMillis = 2; + + RpcRetryingCallerImpl caller = + new RpcRetryingCallerImpl<>(pauseMillis, specialPauseMillis, 2, 0); + + RetryingCallable callable = new ThrowingCallable( + CallQueueTooBigException.class, specialPauseMillis); + try { + caller.callWithRetries(callable, 5000); + fail("Expected " + exceptionClass.getSimpleName()); + } catch (RetriesExhaustedException e) { + assertTrue(e.getCause() instanceof HBaseServerException); + } + } + + private static class ThrowingCallable implements RetryingCallable { + private final Class exceptionClass; + private final long specialPauseMillis; + + public ThrowingCallable(Class exceptionClass, + long specialPauseMillis) { + this.exceptionClass = exceptionClass; + this.specialPauseMillis = specialPauseMillis; + } + + @Override + public void prepare(boolean reload) throws IOException { + + } + + @Override + public void throwable(Throwable t, boolean retrying) { + + } + + @Override + public String getExceptionMessageAdditionalDetail() { + return null; + } + + @Override + public long sleep(long pause, int tries) { + assertEquals(pause, specialPauseMillis); + return 0; + } + + @Override + public Void call(int callTimeout) throws Exception { + throw exceptionClass.getConstructor().newInstance(); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestRemoteWithExtrasException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestRemoteWithExtrasException.java new file mode 100644 index 000000000000..df6e6f2045ac --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestRemoteWithExtrasException.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import java.io.IOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseServerException; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class }) +public class TestRemoteWithExtrasException { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRemoteWithExtrasException.class); + + /** + * test verifies that we honor the inherent value of an exception for isServerOverloaded. + * We don't want a false value passed into RemoteWithExtrasExceptions to override the + * inherent value of an exception if it's already true. This could be due to an out of date + * server not sending the proto field we expect. + */ + @Test + public void itUsesExceptionDefaultValueForServerOverloaded() { + // pass false for server overloaded, we still expect the exception to be true due to + // the exception type + RemoteWithExtrasException ex = + new RemoteWithExtrasException(ServerOverloadedException.class.getName(), + "server is overloaded", false, false); + IOException result = ex.unwrapRemoteException(); + + assertEquals(result.getClass(), ServerOverloadedException.class); + assertTrue(((ServerOverloadedException) result).isServerOverloaded()); + } + + @Test + public void itUsesPassedServerOverloadedValue() { + String exceptionClass = HBaseServerException.class.getName(); + String message = "server is overloaded"; + RemoteWithExtrasException ex = + new RemoteWithExtrasException(exceptionClass, message, false, false); + IOException result = ex.unwrapRemoteException(); + + assertTrue(result instanceof HBaseServerException); + assertFalse(((HBaseServerException) result).isServerOverloaded()); + + // run again with true value passed in + ex = new RemoteWithExtrasException(exceptionClass, message, false, true); + result = ex.unwrapRemoteException(); + + assertTrue(result instanceof HBaseServerException); + // expect true this time + assertTrue(((HBaseServerException) result).isServerOverloaded()); + } + + private static class ServerOverloadedException extends HBaseServerException { + public ServerOverloadedException(String message) { + super(true, message); + } + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index db9ccee2eff1..3c15c7890559 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -831,7 +831,10 @@ public enum OperationStatusCode { /** * Parameter name for client pause value for special case such as call queue too big, etc. + * @deprecated Since 2.5.0, will be removed in 4.0.0. Please use + * hbase.client.pause.server.overloaded instead. */ + @Deprecated public static final String HBASE_CLIENT_PAUSE_FOR_CQTBE = "hbase.client.pause.cqtbe"; /** diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 9238b647757d..ce5011deba22 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -493,12 +493,14 @@ possible configurations would overwhelm and obscure the important. this initial pause amount and how this pause works w/ retries. - hbase.client.pause.cqtbe + hbase.client.pause.server.overloaded - Whether or not to use a special client pause for - CallQueueTooBigException (cqtbe). Set this property to a higher value - than hbase.client.pause if you observe frequent CQTBE from the same - RegionServer and the call queue there keeps full + Pause time when encountering an exception indicating a + server is overloaded, CallQueueTooBigException or CallDroppedException. + Set this property to a higher value than hbase.client.pause if you + observe frequent CallQueueTooBigException or CallDroppedException from the same + RegionServer and the call queue there keeps filling up. This config used to be + called hbase.client.pause.cqtbe, which has been deprecated as of 2.5.0. hbase.client.retries.number diff --git a/hbase-protocol-shaded/src/main/protobuf/RPC.proto b/hbase-protocol-shaded/src/main/protobuf/RPC.proto index 1ccf6e84ee33..d1b73f7e0197 100644 --- a/hbase-protocol-shaded/src/main/protobuf/RPC.proto +++ b/hbase-protocol-shaded/src/main/protobuf/RPC.proto @@ -119,6 +119,8 @@ message ExceptionResponse { optional int32 port = 4; // Set if we are NOT to retry on receipt of this exception optional bool do_not_retry = 5; + // Set true if the server was considered to be overloaded when exception was thrown + optional bool server_overloaded = 6; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 59b0ad10e593..d827efea66fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; @@ -320,6 +321,9 @@ static void setExceptionResponse(Throwable t, String errorMsg, RegionMovedException rme = (RegionMovedException)t; exceptionBuilder.setHostname(rme.getHostname()); exceptionBuilder.setPort(rme.getPort()); + } else if (t instanceof HBaseServerException) { + HBaseServerException hse = (HBaseServerException) t; + exceptionBuilder.setServerOverloaded(hse.isServerOverloaded()); } // Set the exception as the result of the method invocation. headerBuilder.setException(exceptionBuilder.build()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index ea25856ade94..6cd41bd2bd07 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -111,6 +111,8 @@ public static ClusterConnection getMockedConnectionAndDecorate(final Configurati throws IOException { ConnectionImplementation c = Mockito.mock(ConnectionImplementation.class); Mockito.when(c.getConfiguration()).thenReturn(conf); + ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration(conf); + Mockito.when(c.getConnectionConfiguration()).thenReturn(connectionConfiguration); Mockito.doNothing().when(c).close(); // Make it so we return a particular location when asked. final HRegionLocation loc = new HRegionLocation(hri, sn); @@ -134,9 +136,10 @@ public static ClusterConnection getMockedConnectionAndDecorate(final Configurati } NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(c.getNonceGenerator()).thenReturn(ng); - Mockito.when(c.getAsyncProcess()).thenReturn( + AsyncProcess asyncProcess = new AsyncProcess(c, conf, RpcRetryingCallerFactory.instantiate(conf), - RpcControllerFactory.instantiate(conf))); + RpcControllerFactory.instantiate(conf)); + Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( RpcRetryingCallerFactory.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java similarity index 66% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java index fb20ea0aa589..6bfe8e677154 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -36,9 +37,11 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.CallRunner; +import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue; import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; +import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; @@ -56,31 +59,46 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; @Category({ MediumTests.class, ClientTests.class }) -public class TestAsyncClientPauseForCallQueueTooBig { +public class TestAsyncClientPauseForServerOverloaded { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class); + HBaseClassTestRule.forClass(TestAsyncClientPauseForServerOverloaded.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static TableName TABLE_NAME = TableName.valueOf("CQTBE"); + private static TableName TABLE_NAME = TableName.valueOf("ServerOverloaded"); private static byte[] FAMILY = Bytes.toBytes("Family"); private static byte[] QUALIFIER = Bytes.toBytes("Qualifier"); - private static long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1); + private static long PAUSE_FOR_SERVER_OVERLOADED_NANOS = TimeUnit.SECONDS.toNanos(1); + private static long PAUSE_FOR_SERVER_OVERLOADED_MILLIS = TimeUnit.NANOSECONDS.toMillis( + PAUSE_FOR_SERVER_OVERLOADED_NANOS); private static AsyncConnection CONN; - private static boolean FAIL = false; + private static volatile FailMode MODE = null; - private static ConcurrentMap INVOKED = new ConcurrentHashMap<>(); + enum FailMode { + CALL_QUEUE_TOO_BIG, + CALL_DROPPED; - public static final class CQTBERpcScheduler extends SimpleRpcScheduler { + private ConcurrentMap invoked = new ConcurrentHashMap<>(); - public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, + // this is for test scan, where we will send a open scanner first and then a next, and we + // expect that we hit CQTBE two times. + private boolean shouldFail(CallRunner callRunner) { + MethodDescriptor method = callRunner.getRpcCall().getMethod(); + return invoked.computeIfAbsent(method, + k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0; + } + } + + public static final class OverloadedRpcScheduler extends SimpleRpcScheduler { + + public OverloadedRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority, Abortable server, int highPriorityLevel) { super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, @@ -89,25 +107,36 @@ public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandl @Override public boolean dispatch(CallRunner callTask) { - if (FAIL) { - MethodDescriptor method = callTask.getRpcCall().getMethod(); - // this is for test scan, where we will send a open scanner first and then a next, and we - // expect that we hit CQTBE two times. - if (INVOKED.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0) { - return false; - } + if (MODE == FailMode.CALL_QUEUE_TOO_BIG && MODE.shouldFail(callTask)) { + return false; } return super.dispatch(callTask); } } - public static final class CQTBERpcSchedulerFactory extends SimpleRpcSchedulerFactory { + public static final class OverloadedQueue extends TestPluggableQueueImpl { + + public OverloadedQueue(int maxQueueLength, PriorityFunction priority, Configuration conf) { + super(maxQueueLength, priority, conf); + } + + @Override + public boolean offer(CallRunner callRunner) { + if (MODE == FailMode.CALL_DROPPED && MODE.shouldFail(callRunner)) { + callRunner.drop(); + return true; + } + return super.offer(callRunner); + } + } + + public static final class OverloadedRpcSchedulerFactory extends SimpleRpcSchedulerFactory { @Override public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); - return new CQTBERpcScheduler(conf, handlerCount, + return new OverloadedRpcScheduler(conf, handlerCount, conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT), conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, @@ -122,12 +151,16 @@ public RpcScheduler create(Configuration conf, PriorityFunction priority, Aborta @BeforeClass public static void setUp() throws Exception { UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10); - UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, - TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS)); + UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable"); + UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name", + OverloadedQueue.class, PluggableBlockingQueue.class); UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class); + OverloadedRpcSchedulerFactory.class, RpcSchedulerFactory.class); UTIL.startMiniCluster(1); - CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + + Configuration conf = new Configuration(UTIL.getConfiguration()); + conf.setLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, PAUSE_FOR_SERVER_OVERLOADED_MILLIS); + CONN = ConnectionFactory.createAsyncConnection(conf).get(); } @AfterClass @@ -143,22 +176,28 @@ public void setUpBeforeTest() throws IOException { table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))); } } - FAIL = true; + MODE = FailMode.CALL_QUEUE_TOO_BIG; } @After public void tearDownAfterTest() throws IOException { - FAIL = false; - INVOKED.clear(); + for (FailMode mode : FailMode.values()) { + mode.invoked.clear(); + } + MODE = null; UTIL.getAdmin().disableTable(TABLE_NAME); UTIL.getAdmin().deleteTable(TABLE_NAME); } private void assertTime(Callable callable, long time) throws Exception { - long startNs = System.nanoTime(); - callable.call(); - long costNs = System.nanoTime() - startNs; - assertTrue(costNs > time); + for (FailMode mode : FailMode.values()) { + MODE = mode; + + long startNs = System.nanoTime(); + callable.call(); + long costNs = System.nanoTime() - startNs; + assertTrue(costNs > time); + } } @Test @@ -167,7 +206,7 @@ public void testGet() throws Exception { Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get(); assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER)); return null; - }, PAUSE_FOR_CQTBE_NS); + }, PAUSE_FOR_SERVER_OVERLOADED_NANOS); } @Test @@ -181,7 +220,7 @@ public void testBatch() throws Exception { } } return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); - }, PAUSE_FOR_CQTBE_NS); + }, PAUSE_FOR_SERVER_OVERLOADED_NANOS); } @Test @@ -197,6 +236,6 @@ public void testScan() throws Exception { assertNull(scanner.next()); } return null; - }, PAUSE_FOR_CQTBE_NS * 2); + }, PAUSE_FOR_SERVER_OVERLOADED_NANOS * 2); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java index 1472fe77fcc2..74bae57da343 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java @@ -22,9 +22,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Modifier; import java.net.SocketTimeoutException; import java.util.ArrayList; @@ -41,8 +41,11 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallDroppedException; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; @@ -79,7 +82,6 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector; @@ -1086,6 +1088,100 @@ public void testLocateRegionsWithRegionReplicas() throws IOException { } } + @Test + public void testLocateRegionsRetrySpecialPauseCQTBE() throws IOException { + testLocateRegionsRetrySpecialPause(CallQueueTooBigException.class); + } + + @Test + public void testLocateRegionsRetrySpecialPauseCDE() throws IOException { + testLocateRegionsRetrySpecialPause(CallDroppedException.class); + } + + private void testLocateRegionsRetrySpecialPause( + Class exceptionClass) throws IOException { + + int regionReplication = 3; + byte[] family = Bytes.toBytes("cf"); + TableName tableName = TableName.valueOf(name.getMethodName()); + + // Create a table with region replicas + TableDescriptorBuilder builder = TableDescriptorBuilder + .newBuilder(tableName) + .setRegionReplication(regionReplication) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); + TEST_UTIL.getAdmin().createTable(builder.build()); + + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + + conf.setClass(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, + ThrowingCallerFactory.class, RpcRetryingCallerFactory.class); + conf.setClass("testSpecialPauseException", exceptionClass, HBaseServerException.class); + + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + // normal pause very short, 10 millis + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10); + + // special pause 10x longer, so we can detect it + long specialPauseMillis = 1000; + conf.setLong(ConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, + specialPauseMillis); + + try (ConnectionImplementation con = + (ConnectionImplementation) ConnectionFactory.createConnection(conf)) { + // Get locations of the regions of the table + + long start = System.nanoTime(); + try { + con.locateRegion(tableName, new byte[0], false, true, 0); + } catch (HBaseServerException e) { + assertTrue(e.isServerOverloaded()); + // pass: expected + } + assertTrue(System.nanoTime() - start > TimeUnit.MILLISECONDS.toNanos(specialPauseMillis)); + } finally { + TEST_UTIL.deleteTable(tableName); + } + } + + private static class ThrowingCallerFactory extends RpcRetryingCallerFactory { + + private final Class exceptionClass; + + public ThrowingCallerFactory(Configuration conf) { + super(conf); + this.exceptionClass = conf.getClass("testSpecialPauseException", + null, HBaseServerException.class); + } + + @Override public RpcRetryingCaller newCaller(int rpcTimeout) { + return newCaller(); + } + + @Override public RpcRetryingCaller newCaller() { + return new RpcRetryingCaller() { + @Override public void cancel() { + + } + + @Override public T callWithRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + return callWithoutRetries(null, 0); + } + + @Override public T callWithoutRetries(RetryingCallable callable, int callTimeout) + throws IOException, RuntimeException { + try { + throw exceptionClass.getConstructor().newInstance(); + } catch (IllegalAccessException | InstantiationException + | InvocationTargetException | NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + }; + } + } + @Test public void testMetaLookupThreadPoolCreated() throws Exception { final TableName tableName = TableName.valueOf(name.getMethodName());