HBASE-27798 Client side should back off based on wait interval in RpcThrottlingException (#5275)

Signed-off-by: Bryan Beaudreault <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
rmdmattingly authored Jun 30, 2023
1 parent 25455b6 commit bba2f98
Showing 7 changed files with 656 additions and 78 deletions.
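
At the center of this change is a new HBaseServerExceptionPauseManager class in org.apache.hadoop.hbase.client.backoff, which replaces the pause/backoff arithmetic that each retrying caller previously carried inline. The manager's own diff is among the 7 changed files but is not included in this excerpt, so the sketch below is only a rough reconstruction of the interface the callers depend on, inferred from their call sites (the three-argument constructor, remainingTimeNs, and getPauseNsFromException returning an empty OptionalLong once the operation is out of time). The RpcThrottlingException branch paraphrases the commit message rather than the real implementation, and exponentialPauseNs is a simplified stand-in for ConnectionUtils.getPauseTime, which the callers used before this change.

// Hypothetical sketch only; not the actual HBase implementation.
package org.apache.hadoop.hbase.client.backoff;

import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;

public class HBaseServerExceptionPauseManager {

  private final long pauseNs;                    // base pause between retries
  private final long pauseNsForServerOverloaded; // base pause when the server reports overload
  private final long timeoutNs;                  // operation/scan timeout; 0 means "no timeout"

  public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded,
    long timeoutNs) {
    this.pauseNs = pauseNs;
    this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
    this.timeoutNs = timeoutNs;
  }

  /** Nanoseconds left before the timeout passed to the constructor expires. */
  public long remainingTimeNs(long startNs) {
    return timeoutNs - (System.nanoTime() - startNs);
  }

  /**
   * Pause to apply before the next retry, or empty if there is not enough time left,
   * in which case the caller fails the operation instead of retrying.
   */
  public OptionalLong getPauseNsFromException(Throwable error, int tries, long startNs) {
    long expectedPauseNs;
    if (error instanceof RpcThrottlingException) {
      // HBASE-27798: back off for the wait interval the server asked for
      // (assumed detail; getWaitInterval() is in milliseconds).
      expectedPauseNs =
        TimeUnit.MILLISECONDS.toNanos(((RpcThrottlingException) error).getWaitInterval());
    } else {
      long pauseNsToUse =
        HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
      expectedPauseNs = exponentialPauseNs(pauseNsToUse, tries - 1);
    }
    if (timeoutNs > 0) {
      // Same guard the callers used to apply inline: keep SLEEP_DELTA_NS of slack so the
      // retry does not fire after the deadline has already passed.
      long maxPauseNs = remainingTimeNs(startNs) - ConnectionUtils.SLEEP_DELTA_NS;
      if (maxPauseNs <= 0) {
        return OptionalLong.empty();
      }
      expectedPauseNs = Math.min(maxPauseNs, expectedPauseNs);
    }
    return OptionalLong.of(expectedPauseNs);
  }

  // Simplified stand-in for ConnectionUtils.getPauseTime(pause, tries): plain exponential
  // backoff; the real helper also caps the exponent and adds jitter.
  private static long exponentialPauseNs(long baseNs, int retries) {
    return baseNs * (1L << Math.min(retries, 30));
  }
}

With that contract in mind, the per-caller changes below reduce to wiring: construct the manager once, then replace the inline delay computation with a single getPauseNsFromException call.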
File: org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
@@ -35,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -56,6 +55,7 @@
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
@@ -102,10 +102,6 @@ class AsyncBatchRpcRetryingCaller<T> {

private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;

private final long pauseNs;

private final long pauseNsForServerOverloaded;

private final int maxAttempts;

private final long operationTimeoutNs;
@@ -116,6 +112,8 @@ class AsyncBatchRpcRetryingCaller<T> {

private final long startNs;

private final HBaseServerExceptionPauseManager pauseManager;

// we can not use HRegionLocation as the map key because the hashCode and equals method of
// HRegionLocation only consider serverName.
private static final class RegionRequest {
@@ -155,8 +153,6 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.pauseNs = pauseNs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -182,6 +178,8 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
}
this.action2Errors = new IdentityHashMap<>();
this.startNs = System.nanoTime();
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
}

private static boolean hasIncrementOrAppend(Row action) {
@@ -204,10 +202,6 @@ private static boolean hasIncrementOrAppend(RowMutations mutations) {
return false;
}

private long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}

private List<ThrowableWithExtraContext> removeErrors(Action action) {
synchronized (action2Errors) {
return action2Errors.remove(action);
@@ -360,14 +354,14 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
}
});
if (!failedActions.isEmpty()) {
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), null);
}
}

private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
long remainingNs;
if (operationTimeoutNs > 0) {
remainingNs = remainingTimeNs();
remainingNs = pauseManager.remainingTimeNs(startNs);
if (remainingNs <= 0) {
failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
tries);
@@ -465,30 +459,23 @@ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Thro
List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
.collect(Collectors.toList());
addError(copiedActions, error, serverName);
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
HBaseServerException.isServerOverloaded(error));
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, error);
}

private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
boolean isServerOverloaded) {
Throwable error) {
if (immediately) {
groupAndSend(actions, tries);
return;
}
long delayNs;
long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
failAll(actions, tries);
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}

if (isServerOverloaded) {
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
if (!maybePauseNsToUse.isPresent()) {
failAll(actions, tries);
return;
}
long delayNs = maybePauseNsToUse.getAsLong();
if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
@@ -498,7 +485,7 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
private void groupAndSend(Stream<Action> actions, int tries) {
long locateTimeoutNs;
if (operationTimeoutNs > 0) {
locateTimeoutNs = remainingTimeNs();
locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
if (locateTimeoutNs <= 0) {
failAll(actions, tries);
return;
@@ -529,7 +516,7 @@ private void groupAndSend(Stream<Action> actions, int tries) {
sendOrDelay(actionsByServer, tries);
}
if (!locateFailed.isEmpty()) {
tryResubmit(locateFailed.stream(), tries, false, false);
tryResubmit(locateFailed.stream(), tries, false, null);
}
});
}
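
Note the signature change running through this file: tryResubmit and onError now hand the raw Throwable to the pause manager (passing null where there is no triggering exception) instead of collapsing it into a boolean isServerOverloaded flag. Presumably this is what lets the backoff logic read the server-supplied wait interval out of an RpcThrottlingException, while the overload metric is still driven by HBaseServerException.isServerOverloaded(error).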
File: org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -17,14 +17,13 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -35,6 +34,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -56,10 +56,6 @@ public abstract class AsyncRpcRetryingCaller<T> {

private final long startNs;

private final long pauseNs;

private final long pauseNsForServerOverloaded;

private int tries = 1;

private final int maxAttempts;
@@ -78,14 +74,14 @@ public abstract class AsyncRpcRetryingCaller<T> {

protected final HBaseRpcController controller;

private final HBaseServerExceptionPauseManager pauseManager;

public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.priority = priority;
this.pauseNs = pauseNs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -95,14 +91,16 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr
this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime();
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
}

private long elapsedMs() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
}

protected final long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
return pauseManager.remainingTimeNs(startNs);
}

protected final void completeExceptionally() {
@@ -125,19 +123,12 @@ protected final void resetCallTimeout() {
}

private void tryScheduleRetry(Throwable error) {
long pauseNsToUse =
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
long delayNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
completeExceptionally();
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
if (!maybePauseNsToUse.isPresent()) {
completeExceptionally();
return;
}
long delayNs = maybePauseNsToUse.getAsLong();
tries++;
if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
File: org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
@@ -34,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -43,6 +42,7 @@
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@@ -99,10 +99,6 @@ class AsyncScanSingleRegionRpcRetryingCaller {

private final long scannerLeaseTimeoutPeriodNs;

private final long pauseNs;

private final long pauseNsForServerOverloaded;

private final int maxAttempts;

private final long scanTimeoutNs;
@@ -131,6 +127,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {

private long nextCallSeq = -1L;

private final HBaseServerExceptionPauseManager pauseManager;

private enum ScanControllerState {
INITIALIZED,
SUSPENDED,
@@ -330,8 +328,6 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
this.loc = loc;
this.regionServerRemote = isRegionServerRemote;
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
this.pauseNs = pauseNs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -346,16 +342,14 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
}

private long elapsedMs() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
}

private long remainingTimeNs() {
return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
}

private void closeScanner() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
@@ -418,19 +412,14 @@ private void onError(Throwable error) {
completeExceptionally(!scannerClosed);
return;
}
long delayNs;
long pauseNsToUse =
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
if (scanTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
completeExceptionally(!scannerClosed);
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);

OptionalLong maybePauseNsToUse =
pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
if (!maybePauseNsToUse.isPresent()) {
completeExceptionally(!scannerClosed);
return;
}
long delayNs = maybePauseNsToUse.getAsLong();
if (scannerClosed) {
completeWhenError(false);
return;
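
One detail specific to the scan caller above: it constructs the pause manager with scanTimeoutNs and passes nextCallStartNs (the start of the current call) as the reference point, so remainingTimeNs and the backoff clamp are computed per call, mirroring the per-call remainingTimeNs() it previously computed inline.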
File: org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -255,7 +255,7 @@ static Result filterCells(Result result, Cell keepCellsAfter) {
}

// Add a delta to avoid timeout immediately after a retry sleeping.
static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
public static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);

static Get toCheckExistenceOnly(Get get) {
if (get.isCheckExistenceOnly()) {
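
The only change to ConnectionUtils shown here widens SLEEP_DELTA_NS from package-private to public; presumably this lets the new pause manager in org.apache.hadoop.hbase.client.backoff apply the same "leave a little slack before the deadline" guard that the callers above used to apply inline.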
(Diffs for the remaining changed files are not shown in this excerpt.)
