Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28085 Configurably use scanner timeout as rpc timeout for scanner next calls #5402

Merged
merged 3 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableN
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
Map<String, byte[]> requestAttributes) throws IOException {
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes);
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
connectionConfiguration, requestAttributes);
exceptionsQueue = new ConcurrentLinkedQueue<>();
final Context context = Context.current();
final Runnable runnable = context.wrap(new PrefetchRunnable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -78,6 +77,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected final TableName tableName;
protected final int readRpcTimeout;
protected final int scannerTimeout;
private final boolean useScannerTimeoutForNextCalls;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result[]> caller;
protected RpcControllerFactory rpcControllerFactory;
Expand All @@ -104,7 +104,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> requestAttributes)
int scannerTimeout, int primaryOperationTimeout,
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace(
Expand All @@ -116,16 +117,15 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName
this.connection = connection;
this.pool = pool;
this.primaryOperationTimeout = primaryOperationTimeout;
this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I was here and needed a new config, I cleaned these up. Configuration getter calls introduce synchronization and parse cost, so it's better to use what we already parsed in ConnectionConfiguration.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.retries = connectionConfiguration.getRetriesNumber();
if (scan.getMaxResultSize() > 0) {
this.maxScannerResultSize = scan.getMaxResultSize();
} else {
this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.maxScannerResultSize = connectionConfiguration.getScannerMaxResultSize();
}
this.readRpcTimeout = scanReadRpcTimeout;
this.scannerTimeout = scannerTimeout;
this.useScannerTimeoutForNextCalls = connectionConfiguration.isUseScannerTimeoutForNextCalls();
this.requestAttributes = requestAttributes;

// check if application wants to collect scan metrics
Expand All @@ -135,8 +135,7 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName
if (this.scan.getCaching() > 0) {
this.caching = this.scan.getCaching();
} else {
this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.caching = connectionConfiguration.getScannerCaching();
}

this.caller = rpcFactory.<Result[]> newCaller();
Expand Down Expand Up @@ -255,7 +254,7 @@ protected boolean moveToNextRegion() {
this.currentRegion = null;
this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
scannerTimeout, caching, conf, caller);
scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller);
this.callable.setCaching(this.caching);
incRegionCountMetrics(scanMetrics);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ public ClientSimpleScanner(Configuration configuration, Scan scan, TableName nam
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
Map<String, byte[]> requestAttributes) throws IOException {
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes);
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
connectionConfiguration, requestAttributes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ public class ConnectionConfiguration {
public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT =
"hbase.client.meta.scanner.timeout.period";

public static final String HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS =
"hbase.client.use.scanner.timeout.period.for.next.calls";

public static final boolean HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT =
false;

private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs;
private final long writeBufferPeriodicFlushTimerTickMs;
Expand All @@ -99,6 +105,7 @@ public class ConnectionConfiguration {
private final boolean clientScannerAsyncPrefetch;
private final long pauseMs;
private final long pauseMsForServerOverloaded;
private final boolean useScannerTimeoutForNextCalls;

/**
* Constructor
Expand Down Expand Up @@ -158,6 +165,9 @@ public class ConnectionConfiguration {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout);
this.useScannerTimeoutForNextCalls =
conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS,
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT);

long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
Expand Down Expand Up @@ -201,6 +211,8 @@ protected ConnectionConfiguration() {
this.metaScanTimeout = scanTimeout;
this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
this.useScannerTimeoutForNextCalls =
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT;
}

public int getReadRpcTimeout() {
Expand Down Expand Up @@ -275,6 +287,10 @@ public int getScanTimeout() {
return scanTimeout;
}

public boolean isUseScannerTimeoutForNextCalls() {
return useScannerTimeoutForNextCalls;
}

public int getMetaScanTimeout() {
return metaScanTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,7 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool
ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME,
this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(),
connectionConfig.getMetaReadRpcTimeout(), connectionConfig.getMetaScanTimeout(),
metaReplicaCallTimeoutScanInMicroSecond, Collections.emptyMap())) {
metaReplicaCallTimeoutScanInMicroSecond, connectionConfig, Collections.emptyMap())) {
boolean tableNotFound = true;
for (;;) {
Result regionInfoRow = rcs.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,16 +321,16 @@ public ResultScanner getScanner(Scan scan) throws IOException {
if (scan.isReversed()) {
return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout, requestAttributes);
replicaTimeout, connConfiguration, requestAttributes);
} else {
if (async) {
return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout, requestAttributes);
replicaTimeout, connConfiguration, requestAttributes);
} else {
return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
replicaTimeout, requestAttributes);
replicaTimeout, connConfiguration, requestAttributes);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ public class ReversedClientScanner extends ClientScanner {
public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> requestAttributes)
int scannerTimeout, int primaryOperationTimeout,
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, requestAttributes);
scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration,
requestAttributes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
AtomicBoolean replicaSwitched = new AtomicBoolean(false);
private final ClusterConnection cConnection;
protected final ExecutorService pool;
private final boolean useScannerTimeoutForNextCalls;
protected final int timeBeforeReplicas;
private final Scan scan;
private final int retries;
Expand All @@ -72,11 +73,12 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {

public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf,
RpcRetryingCaller<Result[]> caller) {
int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to thread through the instance of ConnectionConfiguration rather than this new boolean flag? Probably many of these fields can be folded down into an inexpensive field lookup from the ConnectionConfiguration instance instead.

Probably this is more of a refactor than you were ready to bite off, and all this code gets tossed as of 3.0, yes?

Copy link
Contributor Author

@bbeaudreault bbeaudreault Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it's not so simple here. The readRpcTimeout and scannerTimeout both can be overridden on a per-Table basis via TableBuilder. Caching can be overridden by the Scan. So really just retries and my new boolean could be unified in this way. I could do it, but yea trying not to refactor too much. Also I could imagine making retries configurable via TableBuilder in the future which would negate this change... not necessarily somethign we should design for, just saying.

Yes, this code goes away in 3.0, but I'm sure we'll be maintaining it for years to come. So I think it's worth doing useful refactors, but not sure this one qualifies given the special cases above.

int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
this.currentScannerCallable = baseCallable;
this.cConnection = cConnection;
this.pool = pool;
this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls;
if (timeBeforeReplicas < 0) {
throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
}
Expand Down Expand Up @@ -187,9 +189,12 @@ public Result[] call(int timeout) throws IOException {
pool, regionReplication * 5);

AtomicBoolean done = new AtomicBoolean(false);
// make sure we use the same rpcTimeout for current and other replicas
int rpcTimeoutForCall = getRpcTimeout();

replicaSwitched.set(false);
// submit call for the primary replica or user specified replica
addCallsForCurrentReplica(cs);
addCallsForCurrentReplica(cs, rpcTimeoutForCall);
int startIndex = 0;

try {
Expand Down Expand Up @@ -234,7 +239,7 @@ public Result[] call(int timeout) throws IOException {
endIndex = 1;
} else {
// TODO: this may be an overkill for large region replication
addCallsForOtherReplicas(cs, 0, regionReplication - 1);
addCallsForOtherReplicas(cs, 0, regionReplication - 1, rpcTimeoutForCall);
}

try {
Expand Down Expand Up @@ -326,15 +331,41 @@ public Cursor getCursor() {
return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
}

private void
addCallsForCurrentReplica(ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
private void addCallsForCurrentReplica(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int rpcTimeout) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
outstandingCallables.add(currentScannerCallable);
cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id);
cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, currentScannerCallable.id);
}

/**
* As we have a call sequence for scan, it is useless to have a different rpc timeout which is
* less than the scan timeout. If the server does not respond in time(usually this will not happen
* as we have heartbeat now), we will get an OutOfOrderScannerNextException when resending the
* next request and the only way to fix this is to close the scanner and open a new one.
* <p>
* The legacy behavior of ScannerCallable has been to use readRpcTimeout despite the above. If
* using legacy behavior, we always use that.
* <p>
* If new behavior is enabled, we determine the rpc timeout to use based on whether the scanner is
* open. If scanner is open, use scannerTimeout otherwise use readRpcTimeout.
*/
private int getRpcTimeout() {
if (useScannerTimeoutForNextCalls) {
return isNextCall() ? scannerTimeout : readRpcTimeout;
} else {
return readRpcTimeout;
}
}

private boolean isNextCall() {
return currentScannerCallable != null && currentScannerCallable.scannerId != -1
&& !currentScannerCallable.renew && !currentScannerCallable.closed;
}

private void addCallsForOtherReplicas(
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) {
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max,
int rpcTimeout) {

for (int id = min; id <= max; id++) {
if (currentScannerCallable.id == id) {
Expand All @@ -344,7 +375,7 @@ private void addCallsForOtherReplicas(
setStartRowForReplicaCallable(s);
outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s);
cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ private static class MockClientScanner extends ClientSimpleScanner {

public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
ConnectionConfiguration connectionConfig) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout,
Collections.emptyMap());
connectionConfig, Collections.emptyMap());
}

@Override
Expand Down Expand Up @@ -178,7 +178,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {

scanner.setRpcFinished(true);

Expand Down Expand Up @@ -242,7 +242,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
InOrder inOrder = Mockito.inOrder(caller);

scanner.loadCache();
Expand Down Expand Up @@ -305,7 +305,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
InOrder inOrder = Mockito.inOrder(caller);

scanner.loadCache();
Expand Down Expand Up @@ -376,7 +376,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
scanner.setRpcFinished(true);

InOrder inOrder = Mockito.inOrder(caller);
Expand Down Expand Up @@ -443,7 +443,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
InOrder inOrder = Mockito.inOrder(caller);
scanner.setRpcFinished(true);

Expand Down Expand Up @@ -488,7 +488,7 @@ public void testExceptionsFromReplicasArePropagated() throws IOException {

try (MockClientScanner scanner =
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE, connectionConfig)) {
Iterator<Result> iter = scanner.iterator();
while (iter.hasNext()) {
iter.next();
Expand Down
Loading