diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 8a66998a47424..f03b3bf273511 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; @@ -86,7 +87,7 @@ * {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of * connections per cluster has been reached. */ -final class RemoteClusterConnection implements TransportConnectionListener, Closeable { +public final class RemoteClusterConnection implements TransportConnectionListener, Closeable { private static final Logger logger = LogManager.getLogger(RemoteClusterConnection.class); @@ -102,8 +103,14 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; private final TimeValue initialConnectionTimeout; + private final int maxPendingConnectionListeners; + private SetOnce remoteClusterName = new SetOnce<>(); + // this setting is intentionally not registered, it is only used in tests + public static final Setting REMOTE_MAX_PENDING_CONNECTION_LISTENERS = + Setting.intSetting("cluster.remote.max_pending_connection_listeners", 1000, Setting.Property.NodeScope); + /** * Creates a new {@link RemoteClusterConnection} * @param settings the nodes settings object @@ -128,6 +135,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos String proxyAddress, ConnectionManager connectionManager) { this.transportService = transportService; this.maxNumRemoteConnections = maxNumRemoteConnections; + this.maxPendingConnectionListeners = REMOTE_MAX_PENDING_CONNECTION_LISTENERS.get(settings); this.nodePredicate = nodePredicate; this.clusterAlias = clusterAlias; this.connectionManager = connectionManager; @@ -393,14 +401,14 @@ public List>> getSeedNodes() { * There is at most one connect job running at any time. If such a connect job is triggered * while another job is running the provided listeners are queued and batched up until the current running job returns. * - * The handler has a built-in queue that can hold up to 100 connect attempts and will reject requests once the queue is full. + * The handler has a built-in queue that can hold up to 1000 connect attempts and will reject requests once the queue is full. * In a scenario when a remote cluster becomes unavailable we will queue requests up but if we can't connect quick enough * we will just reject the connect trigger which will lead to failing searches. */ private class ConnectHandler implements Closeable { private final Semaphore running = new Semaphore(1); private final AtomicBoolean closed = new AtomicBoolean(false); - private final BlockingQueue> queue = new ArrayBlockingQueue<>(100); + private final BlockingQueue> queue = new ArrayBlockingQueue<>(maxPendingConnectionListeners); private final CancellableThreads cancellableThreads = new CancellableThreads(); /** diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java index f69eac248862f..ef432a3cbdb2a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java @@ -26,6 +26,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.RemoteClusterConnection; import java.util.Arrays; import java.util.List; @@ -50,6 +51,7 @@ public List> getSettings() { INDEX_CREATION_DATE_SETTING, PROVIDED_NAME_SETTING, TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, + RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS, IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING, IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 69ee2ac7f19b6..23728cb56f07f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -76,6 +76,7 @@ import org.elasticsearch.test.TestCluster; import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.RemoteClusterConnection; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.LocalStateCcr; @@ -139,7 +140,11 @@ protected Settings leaderClusterSettings() { } protected Settings followerClusterSettings() { - return Settings.EMPTY; + final Settings.Builder builder = Settings.builder(); + if (randomBoolean()) { + builder.put(RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey(), randomIntBetween(1, 100)); + } + return builder.build(); } @Before diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index b2a485a8445f5..854591d1a715a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -117,6 +117,7 @@ protected Collection> nodePlugins() { @Override protected Settings followerClusterSettings() { return Settings.builder() + .put(super.followerClusterSettings()) .put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200)) .build(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java index 4e852aeb031d0..9e6140317f127 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.transport.RemoteClusterConnection; import org.elasticsearch.transport.RemoteConnectionInfo; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; @@ -37,6 +38,15 @@ protected boolean configureRemoteClusterViaNodeSettings() { return false; } + @Override + protected Settings followerClusterSettings() { + final Settings.Builder settings = Settings.builder().put(super.followerClusterSettings()); + if (randomBoolean()) { + settings.put(RemoteClusterConnection.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey(), 1); + } + return settings.build(); + } + public void testFollowIndex() throws Exception { final String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));