From be2c7bf0f8427387e84c68ad8d2d9abbc60a64da Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 1 May 2020 20:29:03 -0400 Subject: [PATCH] Retry follow task when remote connection queue full (#56073) If more than 100 shard-follow tasks are trying to connect to the remote cluster, then some of them will abort with "connect listener queue is full". This is because we retry on ESRejectedExecutionException, but not on RejectedExecutionException. Backport of #55314 --- .../transport/RemoteClusterConnection.java | 14 +++++++++++--- .../elasticsearch/test/InternalSettingsPlugin.java | 2 ++ .../org/elasticsearch/xpack/CcrIntegTestCase.java | 7 ++++++- .../xpack/ccr/CcrRetentionLeaseIT.java | 1 + .../xpack/ccr/RestartIndexFollowingIT.java | 10 ++++++++++ 5 files changed, 30 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 8a66998a47424..f03b3bf273511 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -86,7 +87,7 @@ * {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of * connections per cluster has been reached. */ -final class RemoteClusterConnection implements TransportConnectionListener, Closeable { +public final class RemoteClusterConnection implements TransportConnectionListener, Closeable { private static final Logger logger = LogManager.getLogger(RemoteClusterConnection.class); @@ -102,8 +103,14 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; private final TimeValue initialConnectionTimeout; + private final int maxPendingConnectionListeners; + private SetOnce remoteClusterName = new SetOnce<>(); + // this setting is intentionally not registered, it is only used in tests + public static final Setting REMOTE_MAX_PENDING_CONNECTION_LISTENERS = + Setting.intSetting("cluster.remote.max_pending_connection_listeners", 1000, Setting.Property.NodeScope); + /** * Creates a new {@link RemoteClusterConnection} * @param settings the nodes settings object @@ -128,6 +135,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos String proxyAddress, ConnectionManager connectionManager) { this.transportService = transportService; this.maxNumRemoteConnections = maxNumRemoteConnections; + this.maxPendingConnectionListeners = REMOTE_MAX_PENDING_CONNECTION_LISTENERS.get(settings); this.nodePredicate = nodePredicate; this.clusterAlias = clusterAlias; this.connectionManager = connectionManager; @@ -393,14 +401,14 @@ public List>> getSeedNodes() { * There is at most one connect job running at any time. If such a connect job is triggered * while another job is running the provided listeners are queued and batched up until the current running job returns. * - * The handler has a built-in queue that can hold up to 100 connect attempts and will reject requests once the queue is full. + * The handler has a built-in queue that can hold up to 1000 connect attempts and will reject requests once the queue is full. * In a scenario when a remote cluster becomes unavailable we will queue requests up but if we can't connect quick enough * we will just reject the connect trigger which will lead to failing searches. */ private class ConnectHandler implements Closeable { private final Semaphore running = new Semaphore(1); private final AtomicBoolean closed = new AtomicBoolean(false); - private final BlockingQueue> queue = new ArrayBlockingQueue<>(100); + private final BlockingQueue> queue = new ArrayBlockingQueue<>(maxPendingConnectionListeners); private final CancellableThreads cancellableThreads = new CancellableThreads(); /** diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java index f69eac248862f..ef432a3cbdb2a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java @@ -26,6 +26,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.RemoteClusterConnection; import java.util.Arrays; import java.util.List; @@ -50,6 +51,7 @@ public List> getSettings() { INDEX_CREATION_DATE_SETTING, PROVIDED_NAME_SETTING, TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, + RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS, IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING, IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 69ee2ac7f19b6..23728cb56f07f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -76,6 +76,7 @@ import org.elasticsearch.test.TestCluster; import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.RemoteClusterConnection; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.LocalStateCcr; @@ -139,7 +140,11 @@ protected Settings leaderClusterSettings() { } protected Settings followerClusterSettings() { - return Settings.EMPTY; + final Settings.Builder builder = Settings.builder(); + if (randomBoolean()) { + builder.put(RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey(), randomIntBetween(1, 100)); + } + return builder.build(); } @Before diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index b2a485a8445f5..854591d1a715a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -117,6 +117,7 @@ protected Collection> nodePlugins() { @Override protected Settings followerClusterSettings() { return Settings.builder() + .put(super.followerClusterSettings()) .put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200)) .build(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java index 4e852aeb031d0..9e6140317f127 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.transport.RemoteClusterConnection; import org.elasticsearch.transport.RemoteConnectionInfo; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; @@ -37,6 +38,15 @@ protected boolean configureRemoteClusterViaNodeSettings() { return false; } + @Override + protected Settings followerClusterSettings() { + final Settings.Builder settings = Settings.builder().put(super.followerClusterSettings()); + if (randomBoolean()) { + settings.put(RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey(), 1); + } + return settings.build(); + } + public void testFollowIndex() throws Exception { final String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));