Skip to content

Commit

Permalink
Rebuild remote connections on profile changes (elastic#37678)
Browse files Browse the repository at this point in the history
Currently remote compression and ping schedule settings are dynamic.
However, we do not listen for changes. This commit adds listeners for
changes to those two settings. Additionally, when those settings change
we now close existing connections and open new ones with the settings
applied.

Fixes elastic#37201.
  • Loading branch information
Tim-Brooks authored Feb 19, 2019
1 parent 237d755 commit a5cbef9
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 133 deletions.
74 changes: 55 additions & 19 deletions docs/reference/modules/remote-clusters.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,39 @@ more _gateway nodes_ and uses them to federate requests to the remote cluster.

[float]
[[configuring-remote-clusters]]
=== Configuring Remote Clusters
=== Configuring remote clusters

Remote clusters can be specified globally using
<<cluster-update-settings,cluster settings>> (which can be updated dynamically),
or local to individual nodes using the `elasticsearch.yml` file.
You can configure remote clusters globally by using
<<cluster-update-settings,cluster settings>>, which you can update dynamically.
Alternatively, you can configure them locally on individual nodes by using the `elasticsearch.yml` file.

If a remote cluster is configured via `elasticsearch.yml` only the nodes with
that configuration will be able to connect to the remote cluster. In other
words, functionality that relies on remote cluster requests will have to be
driven specifically from those nodes. Remote clusters set via the
<<cluster-update-settings,cluster settings API>> will be available on every node
in the cluster.

The `elasticsearch.yml` config file for a node that connects to remote clusters
needs to list the remote clusters that should be connected to, for instance:
If you specify the settings in `elasticsearch.yml` files, only the nodes with
those settings can connect to the remote cluster. In other words, functionality
that relies on remote cluster requests must be driven specifically from those
nodes. For example:

[source,yaml]
--------------------------------
cluster:
remote:
cluster_one: <1>
seeds: 127.0.0.1:9300
cluster_two: <1>
transport.ping_schedule: 30s <2>
cluster_two:
seeds: 127.0.0.1:9301
transport.compress: true <3>
--------------------------------
<1> `cluster_one` and `cluster_two` are arbitrary _cluster aliases_ representing
the connection to each cluster. These names are subsequently used to distinguish
between local and remote indices.
<2> A keep-alive ping is configured for `cluster_one`.
<3> Compression is explicitly enabled for requests to `cluster_two`.

For more information about the optional transport settings, see
<<modules-transport>>.

The equivalent example using the <<cluster-update-settings,cluster settings
API>> to add remote clusters to all nodes in the cluster would look like the
following:
If you use <<cluster-update-settings,cluster settings>>, the remote clusters are available on every node in the cluster. For example:

[source,js]
--------------------------------
Expand All @@ -71,12 +71,14 @@ PUT _cluster/settings
"cluster_one": {
"seeds": [
"127.0.0.1:9300"
]
],
"transport.ping_schedule": "30s"
},
"cluster_two": {
"seeds": [
"127.0.0.1:9301"
]
],
"transport.compress": true
},
"cluster_three": {
"seeds": [
Expand All @@ -92,6 +94,40 @@ PUT _cluster/settings
// TEST[setup:host]
// TEST[s/127.0.0.1:9300/\${transport_host}/]

You can dynamically update the compression and ping schedule settings. However,
you must re-include seeds in the settings update request. For example:

[source,js]
--------------------------------
PUT _cluster/settings
{
"persistent": {
"cluster": {
"remote": {
"cluster_one": {
"seeds": [
"127.0.0.1:9300"
],
"transport.ping_schedule": "60s"
},
"cluster_two": {
"seeds": [
"127.0.0.1:9301"
],
"transport.compress": false
}
}
}
}
}
--------------------------------
// CONSOLE
// TEST[continued]

NOTE: When the compression or ping schedule settings change, all the existing
node connections must close and re-open, which can cause in-flight requests to
fail.

A remote cluster can be deleted from the cluster settings by setting its seeds
to `null`:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,13 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -56,19 +53,17 @@ public class ConnectionManager implements Closeable {
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
private final KeyedLock<String> connectionLock = new KeyedLock<>();
private final Transport transport;
private final ThreadPool threadPool;
private final ConnectionProfile defaultProfile;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadPool);
public ConnectionManager(Settings settings, Transport transport) {
this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport);
}

public ConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadPool threadPool) {
public ConnectionManager(ConnectionProfile connectionProfile, Transport transport) {
this.transport = transport;
this.threadPool = threadPool;
this.defaultProfile = connectionProfile;
}

Expand Down Expand Up @@ -185,35 +180,23 @@ public int size() {

@Override
public void close() {
Transports.assertNotTransportThread("Closing ConnectionManager");
if (isClosed.compareAndSet(false, true)) {
CountDownLatch latch = new CountDownLatch(1);

// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
threadPool.generic().execute(() -> {
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody adds to the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
} finally {
closeLock.writeLock().unlock();
latch.countDown();
}
});

try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
} finally {
closeLock.writeLock().unlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.settings.SettingUpgrader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;

import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -283,21 +284,38 @@ protected Map<String, List<String>> groupClusterIndices(Set<String> remoteCluste
return perClusterIndices;
}

void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy) {
Boolean compress = TransportSettings.TRANSPORT_COMPRESS.get(settings);
TimeValue pingSchedule = TransportSettings.PING_SCHEDULE.get(settings);
updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
}

void updateRemoteCluster(String clusterAlias, Settings settings) {
String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings);
List<String> addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE
.getConcreteSettingForNamespace(clusterAlias)
.get(settings);

updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
}

/**
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
*/
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy);
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy, boolean compressionEnabled,
TimeValue pingSchedule);

/**
* Registers this instance to listen to updates on the cluster settings.
*/
public void listenForUpdates(ClusterSettings clusterSettings) {
clusterSettings.addAffixUpdateConsumer(
RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
(key, value) -> updateRemoteCluster(key, value.v2(), value.v1()),
(namespace, value) -> {});
List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::updateRemoteCluster);
clusterSettings.addAffixUpdateConsumer(
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE;

/**
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
* current node is part of the cluster and it won't receive cluster state updates from the remote cluster. Remote clusters are also not
Expand Down Expand Up @@ -107,12 +104,13 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
* @param proxyAddress the proxy address
* @param connectionProfile the connection profile to use
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
String proxyAddress) {
String proxyAddress, ConnectionProfile connectionProfile) {
this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress,
createConnectionManager(settings, clusterAlias, transportService));
createConnectionManager(connectionProfile, transportService));
}

// Public for tests to pass a StubbableConnectionManager
Expand Down Expand Up @@ -309,13 +307,23 @@ Transport.Connection getConnection() {

@Override
public void close() throws IOException {
IOUtils.close(connectHandler, connectionManager);
IOUtils.close(connectHandler);
// In the ConnectionManager we wait on connections being closed.
threadPool.generic().execute(connectionManager::close);
}

public boolean isClosed() {
return connectHandler.isClosed();
}

public String getProxyAddress() {
return proxyAddress;
}

public List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
return seedNodes;
}

/**
* The connect handler manages node discovery and the actual connect to the remote cluster.
* There is at most one connect job running at any time. If such a connect job is triggered
Expand Down Expand Up @@ -697,18 +705,8 @@ private synchronized void ensureIteratorAvailable() {
}
}

private static ConnectionManager createConnectionManager(Settings settings, String clusterAlias, TransportService transportService) {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder()
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable?
// we don't want this to be used for anything else but search
.addConnections(0, TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY)
.setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings))
.setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings));
return new ConnectionManager(builder.build(), transportService.transport, transportService.threadPool);
private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) {
return new ConnectionManager(connectionProfile, transportService.transport);
}

ConnectionManager getConnectionManager() {
Expand Down
Loading

0 comments on commit a5cbef9

Please sign in to comment.