Skip to content

Commit

Permalink
Retry follow task when remote connection queue full (#56073)
Browse files Browse the repository at this point in the history
If more than 100 shard-follow tasks are trying to connect to the remote
cluster, then some of them will abort with "connect listener queue is
full". This is because we retry on ESRejectedExecutionException, but not
on RejectedExecutionException.

Backport of #55314
  • Loading branch information
dnhatn authored May 2, 2020
1 parent 66f0e17 commit be2c7bf
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -86,7 +87,7 @@
* {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of
* connections per cluster has been reached.
*/
final class RemoteClusterConnection implements TransportConnectionListener, Closeable {
public final class RemoteClusterConnection implements TransportConnectionListener, Closeable {

private static final Logger logger = LogManager.getLogger(RemoteClusterConnection.class);

Expand All @@ -102,8 +103,14 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
private final TimeValue initialConnectionTimeout;
private final int maxPendingConnectionListeners;

private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();

// This setting is intentionally not registered; it is only used in tests.
public static final Setting<Integer> REMOTE_MAX_PENDING_CONNECTION_LISTENERS =
Setting.intSetting("cluster.remote.max_pending_connection_listeners", 1000, Setting.Property.NodeScope);

/**
* Creates a new {@link RemoteClusterConnection}
* @param settings the nodes settings object
Expand All @@ -128,6 +135,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
String proxyAddress, ConnectionManager connectionManager) {
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.maxPendingConnectionListeners = REMOTE_MAX_PENDING_CONNECTION_LISTENERS.get(settings);
this.nodePredicate = nodePredicate;
this.clusterAlias = clusterAlias;
this.connectionManager = connectionManager;
Expand Down Expand Up @@ -393,14 +401,14 @@ public List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
* There is at most one connect job running at any time. If such a connect job is triggered
* while another job is running the provided listeners are queued and batched up until the current running job returns.
*
* The handler has a built-in queue that can hold up to 100 connect attempts and will reject requests once the queue is full.
* The handler has a built-in queue that can hold up to 1000 connect attempts and will reject requests once the queue is full.
* In a scenario where a remote cluster becomes unavailable, we will queue requests up, but if we can't connect quickly enough
* we will just reject the connect trigger, which will lead to failing searches.
*/
private class ConnectHandler implements Closeable {
private final Semaphore running = new Semaphore(1);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<>(100);
private final BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<>(maxPendingConnectionListeners);
private final CancellableThreads cancellableThreads = new CancellableThreads();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.RemoteClusterConnection;

import java.util.Arrays;
import java.util.List;
Expand All @@ -50,6 +51,7 @@ public List<Setting<?>> getSettings() {
INDEX_CREATION_DATE_SETTING,
PROVIDED_NAME_SETTING,
TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING,
RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS,
IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.RemoteClusterConnection;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
Expand Down Expand Up @@ -139,7 +140,11 @@ protected Settings leaderClusterSettings() {
}

protected Settings followerClusterSettings() {
return Settings.EMPTY;
final Settings.Builder builder = Settings.builder();
if (randomBoolean()) {
builder.put(RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey(), randomIntBetween(1, 100));
}
return builder.build();
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
/**
 * Follower-cluster settings for this test: everything the parent class supplies, plus a
 * 200ms retention lease renew interval.
 */
@Override
protected Settings followerClusterSettings() {
    final Settings.Builder followerSettings = Settings.builder();
    // Start from the inherited settings so anything the base class configured is kept.
    followerSettings.put(super.followerClusterSettings());
    followerSettings.put(
        CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(),
        TimeValue.timeValueMillis(200));
    return followerSettings.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.transport.RemoteClusterConnection;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
Expand All @@ -37,6 +38,15 @@ protected boolean configureRemoteClusterViaNodeSettings() {
return false;
}

/**
 * Builds on the inherited follower settings and, on a random coin flip, caps the pending
 * remote-connection listener queue at a single entry.
 */
@Override
protected Settings followerClusterSettings() {
    // The parent is consulted first, so the sequence of randomised calls is unchanged.
    final Settings inherited = super.followerClusterSettings();
    final Settings.Builder builder = Settings.builder().put(inherited);
    final boolean shrinkListenerQueue = randomBoolean();
    if (shrinkListenerQueue) {
        final String key = RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey();
        builder.put(key, 1);
    }
    return builder.build();
}

public void testFollowIndex() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, 0,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
Expand Down

0 comments on commit be2c7bf

Please sign in to comment.