Skip to content

Commit

Permalink
HBASE-28085 Configurably use scanner timeout as rpc timeout for scann…
Browse files Browse the repository at this point in the history
…er next calls
  • Loading branch information
bbeaudreault committed Sep 14, 2023
1 parent 3b34bc2 commit 1dc500c
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableN
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
Map<String, byte[]> requestAttributes) throws IOException {
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes);
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
connectionConfiguration, requestAttributes);
exceptionsQueue = new ConcurrentLinkedQueue<>();
final Context context = Context.current();
final Runnable runnable = context.wrap(new PrefetchRunnable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -78,6 +77,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected final TableName tableName;
protected final int readRpcTimeout;
protected final int scannerTimeout;
private final boolean useScannerTimeoutForNextCalls;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result[]> caller;
protected RpcControllerFactory rpcControllerFactory;
Expand All @@ -104,7 +104,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> requestAttributes)
int scannerTimeout, int primaryOperationTimeout,
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace(
Expand All @@ -116,16 +117,15 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName
this.connection = connection;
this.pool = pool;
this.primaryOperationTimeout = primaryOperationTimeout;
this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.retries = connectionConfiguration.getRetriesNumber();
if (scan.getMaxResultSize() > 0) {
this.maxScannerResultSize = scan.getMaxResultSize();
} else {
this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.maxScannerResultSize = connectionConfiguration.getScannerMaxResultSize();
}
this.readRpcTimeout = scanReadRpcTimeout;
this.scannerTimeout = scannerTimeout;
this.useScannerTimeoutForNextCalls = connectionConfiguration.isUseScannerTimeoutForNextCalls();
this.requestAttributes = requestAttributes;

// check if application wants to collect scan metrics
Expand All @@ -135,8 +135,7 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName
if (this.scan.getCaching() > 0) {
this.caching = this.scan.getCaching();
} else {
this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.caching = connectionConfiguration.getScannerCaching();
}

this.caller = rpcFactory.<Result[]> newCaller();
Expand Down Expand Up @@ -255,7 +254,7 @@ protected boolean moveToNextRegion() {
this.currentRegion = null;
this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
scannerTimeout, caching, conf, caller);
scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller);
this.callable.setCaching(this.caching);
incRegionCountMetrics(scanMetrics);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ public ClientSimpleScanner(Configuration configuration, Scan scan, TableName nam
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
Map<String, byte[]> requestAttributes) throws IOException {
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes);
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
connectionConfiguration, requestAttributes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public class ConnectionConfiguration {
public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT =
"hbase.client.meta.scanner.timeout.period";

/**
 * Configuration key for a boolean client setting. When enabled, scanner next calls issued against
 * an already-open scanner use the scanner timeout as their rpc timeout instead of the read rpc
 * timeout.
 */
public static final String HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS =
"hbase.client.use.scanner.timeout.for.next.calls";

/**
 * Default for {@link #HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS}: disabled, so the legacy
 * behavior of using the read rpc timeout is kept unless the setting is explicitly turned on.
 */
public static final boolean HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT = false;

private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs;
private final long writeBufferPeriodicFlushTimerTickMs;
Expand All @@ -99,6 +104,7 @@ public class ConnectionConfiguration {
private final boolean clientScannerAsyncPrefetch;
private final long pauseMs;
private final long pauseMsForServerOverloaded;
private final boolean useScannerTimeoutForNextCalls;

/**
* Constructor
Expand Down Expand Up @@ -158,6 +164,9 @@ public class ConnectionConfiguration {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout);
this.useScannerTimeoutForNextCalls =
conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS,
HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT);

long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
Expand Down Expand Up @@ -201,6 +210,7 @@ protected ConnectionConfiguration() {
this.metaScanTimeout = scanTimeout;
this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
this.useScannerTimeoutForNextCalls = HBASE_CLIENT_USE_SCANNER_TIMEOUT_FOR_NEXT_CALLS_DEFAULT;
}

public int getReadRpcTimeout() {
Expand Down Expand Up @@ -275,6 +285,10 @@ public int getScanTimeout() {
return scanTimeout;
}

/**
 * Reports whether the scanner timeout should be used as the rpc timeout for scanner next calls.
 * @return the value resolved for this configuration instance
 */
public boolean isUseScannerTimeoutForNextCalls() {
  return this.useScannerTimeoutForNextCalls;
}

/**
 * Returns the scan timeout that applies to meta table scans.
 * @return the meta scan timeout held by this configuration instance
 */
public int getMetaScanTimeout() {
  return this.metaScanTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,7 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool
ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME,
this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(),
connectionConfig.getMetaReadRpcTimeout(), connectionConfig.getMetaScanTimeout(),
metaReplicaCallTimeoutScanInMicroSecond, Collections.emptyMap())) {
metaReplicaCallTimeoutScanInMicroSecond, connectionConfig, Collections.emptyMap())) {
boolean tableNotFound = true;
for (;;) {
Result regionInfoRow = rcs.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,16 +321,16 @@ public ResultScanner getScanner(Scan scan) throws IOException {
if (scan.isReversed()) {
return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout, requestAttributes);
replicaTimeout, connConfiguration, requestAttributes);
} else {
if (async) {
return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout, requestAttributes);
replicaTimeout, connConfiguration, requestAttributes);
} else {
return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout, requestAttributes);
replicaTimeout, connConfiguration, requestAttributes);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ public class ReversedClientScanner extends ClientScanner {
public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> requestAttributes)
int scannerTimeout, int primaryOperationTimeout,
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, requestAttributes);
scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration,
requestAttributes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
AtomicBoolean replicaSwitched = new AtomicBoolean(false);
private final ClusterConnection cConnection;
protected final ExecutorService pool;
private final boolean useScannerTimeoutForNextCalls;
protected final int timeBeforeReplicas;
private final Scan scan;
private final int retries;
Expand All @@ -72,11 +73,12 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {

public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf,
RpcRetryingCaller<Result[]> caller) {
int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls,
int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
this.currentScannerCallable = baseCallable;
this.cConnection = cConnection;
this.pool = pool;
this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls;
if (timeBeforeReplicas < 0) {
throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
}
Expand Down Expand Up @@ -187,9 +189,12 @@ public Result[] call(int timeout) throws IOException {
pool, regionReplication * 5);

AtomicBoolean done = new AtomicBoolean(false);
// make sure we use the same rpcTimeout for current and other replicas
int rpcTimeoutForCall = getRpcTimeout();

replicaSwitched.set(false);
// submit call for the primary replica or user specified replica
addCallsForCurrentReplica(cs);
addCallsForCurrentReplica(cs, rpcTimeoutForCall);
int startIndex = 0;

try {
Expand Down Expand Up @@ -234,7 +239,7 @@ public Result[] call(int timeout) throws IOException {
endIndex = 1;
} else {
// TODO: this may be an overkill for large region replication
addCallsForOtherReplicas(cs, 0, regionReplication - 1);
addCallsForOtherReplicas(cs, 0, regionReplication - 1, rpcTimeoutForCall);
}

try {
Expand Down Expand Up @@ -326,15 +331,34 @@ public Cursor getCursor() {
return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
}

private void
addCallsForCurrentReplica(ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
private void addCallsForCurrentReplica(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int rpcTimeout) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
outstandingCallables.add(currentScannerCallable);
cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id);
cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, currentScannerCallable.id);
}

/**
 * Picks the rpc timeout to apply to the scan call that is about to be submitted.
 * <p>
 * A scan is a sequence of calls, so an rpc timeout that is shorter than the scan timeout is of no
 * use: if the server does not respond in time (usually this will not happen, as scans heartbeat
 * now), resending the next request produces an OutOfOrderScannerNextException and the only way to
 * recover is to close the scanner and open a new one. Despite that, the legacy behavior of
 * ScannerCallable has been to use readRpcTimeout, and that is what is always returned while
 * useScannerTimeoutForNextCalls is off. When it is on, the result depends on whether the scanner
 * is open: scannerTimeout for an open scanner, readRpcTimeout otherwise.
 * @return the rpc timeout to use for the upcoming call
 */
private int getRpcTimeout() {
  if (!useScannerTimeoutForNextCalls) {
    // Legacy behavior: the read rpc timeout applies to every call.
    return readRpcTimeout;
  }
  // A scannerId of -1 is treated as "scanner not open yet".
  boolean scannerOpen = currentScannerCallable.scannerId != -1;
  return scannerOpen ? scannerTimeout : readRpcTimeout;
}

private void addCallsForOtherReplicas(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) {
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max,
int rpcTimeout) {

for (int id = min; id <= max; id++) {
if (currentScannerCallable.id == id) {
Expand All @@ -344,7 +368,7 @@ private void addCallsForOtherReplicas(
setStartRowForReplicaCallable(s);
outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s);
cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ private static class MockClientScanner extends ClientSimpleScanner {

public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
ConnectionConfiguration connectionConfig) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout,
Collections.emptyMap());
connectionConfig, Collections.emptyMap());
}

@Override
Expand Down Expand Up @@ -178,7 +178,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {

scanner.setRpcFinished(true);

Expand Down Expand Up @@ -242,7 +242,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
InOrder inOrder = Mockito.inOrder(caller);

scanner.loadCache();
Expand Down Expand Up @@ -305,7 +305,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
InOrder inOrder = Mockito.inOrder(caller);

scanner.loadCache();
Expand Down Expand Up @@ -376,7 +376,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
scanner.setRpcFinished(true);

InOrder inOrder = Mockito.inOrder(caller);
Expand Down Expand Up @@ -443,7 +443,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
InOrder inOrder = Mockito.inOrder(caller);
scanner.setRpcFinished(true);

Expand Down Expand Up @@ -488,7 +488,7 @@ public void testExceptionsFromReplicasArePropagated() throws IOException {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE, connectionConfig)) {
Iterator<Result> iter = scanner.iterator();
while (iter.hasNext()) {
iter.next();
Expand Down
Loading

0 comments on commit 1dc500c

Please sign in to comment.