Skip to content

Commit

Permalink
Introduce client feature tracking (#31020)
Browse files Browse the repository at this point in the history
This commit introduces the ability for a client to communicate to the
server features that it can support and for these features to be used in
influencing the decisions that the server makes when communicating with
the client. To this end we carry the features from the client to the
underlying stream as we carry the version of the client today. This
enables us to enhance the logic where we make protocol decisions on the
basis of the version on the stream to also make protocol decisions on
the basis of the features on the stream. With such functionality, the
client can communicate to the server if it is a transport client, or if
it has, for example, X-Pack installed. This enables us to support
rolling upgrades from the OSS distribution to the default distribution
without breaking client connectivity as we can now elect to serialize
customs in the cluster state depending on whether or not the client
reports to us using the feature capabilities that it can under these
customs. This means that we would avoid sending a client pieces of the
cluster state that it can not understand. However, we want to take care
and always send the full cluster state during node-to-node communication
as otherwise we would end up with different understanding of what is in
the cluster state across nodes depending on which features they reported
to have. This is why when deciding whether or not to write out a custom
we always send the custom if the client is not a transport client and
otherwise do not send the custom if the client is transport client that
does not report to have the feature required by the custom.

Co-authored-by: Yannick Welsch <[email protected]>
  • Loading branch information
jasontedor and ywelsch committed Jun 1, 2018
1 parent 0794d4a commit ac9c154
Show file tree
Hide file tree
Showing 24 changed files with 883 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ else if (readableBytes >= TcpHeader.HEADER_SIZE) {
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
context.readHeaders(in);
}
// now we decode the features
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
in.readStringArray();
}
// now we can decode the action name
sb.append(", action: ").append(in.readString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public abstract class TransportClient extends AbstractClient {
public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);

public static final String TRANSPORT_CLIENT_FEATURE = "transport_client";

private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) {
final Settings.Builder settingsBuilder = Settings.builder()
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
Expand Down Expand Up @@ -128,7 +130,12 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
}
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build();
final Settings settings =
Settings.builder()
.put(defaultSettings)
.put(pluginsService.updatedSettings())
.put(TcpTransport.FEATURE_PREFIX + "." + TRANSPORT_CLIENT_FEATURE, true)
.build();
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
Expand Down
66 changes: 61 additions & 5 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -90,7 +92,51 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>

public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
/**
* An interface that implementors use when a class requires a client to maybe have a feature.
*/
public interface FeatureAware {

/**
* An optional feature that is required for the client to have.
*
* @return an empty optional if no feature is required otherwise a string representing the required feature
*/
default Optional<String> getRequiredFeature() {
return Optional.empty();
}

/**
* Tests whether or not the custom should be serialized. The criteria are:
* <ul>
* <li>the output stream must be at least the minimum supported version of the custom</li>
* <li>the output stream must have the feature required by the custom (if any) or not be a transport client</li>
* </ul>
* <p>
* That is, we only serialize customs to clients than can understand the custom based on the version of the client and the features
* that the client has. For transport clients we can be lenient in requiring a feature in which case we do not send the custom but
* for connected nodes we always require that the node has the required feature.
*
* @param out the output stream
* @param custom the custom to serialize
* @param <T> the type of the custom
* @return true if the custom should be serialized and false otherwise
*/
static <T extends NamedDiffable & FeatureAware> boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
return false;
}
if (custom.getRequiredFeature().isPresent()) {
final String requiredFeature = custom.getRequiredFeature().get();
// if it is a transport client we are lenient yet for a connected node it must have the required feature
return out.hasFeature(requiredFeature) || out.hasFeature(TransportClient.TRANSPORT_CLIENT_FEATURE) == false;
}
return true;
}

}

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, FeatureAware {

/**
* Returns <code>true</code> iff this {@link Custom} is private to the cluster and should never be send to a client.
Expand All @@ -99,6 +145,7 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
default boolean isPrivate() {
return false;
}

}

private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
Expand Down Expand Up @@ -244,6 +291,15 @@ public String toString() {
sb.append("isa_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n");
}
}
if (metaData.customs().isEmpty() == false) {
sb.append("metadata customs:\n");
for (final ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
final String type = cursor.key;
final MetaData.Custom custom = cursor.value;
sb.append(TAB).append(type).append(": ").append(custom);
}
sb.append("\n");
}
sb.append(blocks());
sb.append(nodes());
sb.append(routingTable());
Expand Down Expand Up @@ -688,14 +744,14 @@ public void writeTo(StreamOutput out) throws IOException {
blocks.writeTo(out);
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.FeatureAware;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.cluster.DiffableUtils;
Expand Down Expand Up @@ -117,9 +119,10 @@ public enum XContentContext {
*/
public static EnumSet<XContentContext> ALL_CONTEXTS = EnumSet.allOf(XContentContext.class);

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, ClusterState.FeatureAware {

EnumSet<XContentContext> context();

}

public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
Expand Down Expand Up @@ -789,14 +792,14 @@ public void writeTo(StreamOutput out) throws IOException {
}
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
Expand Down Expand Up @@ -57,10 +59,12 @@
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

Expand Down Expand Up @@ -97,6 +101,7 @@ public abstract class StreamOutput extends OutputStream {
}

private Version version = Version.CURRENT;
private Set<String> features = Collections.emptySet();

/**
* The version of the node on the other side of this stream.
Expand All @@ -112,6 +117,27 @@ public void setVersion(Version version) {
this.version = version;
}

/**
* Test if the stream has the specified feature. Features are used when serializing {@link ClusterState.Custom} or
* {@link MetaData.Custom}; see also {@link ClusterState.FeatureAware}.
*
* @param feature the feature to test
* @return true if the stream has the specified feature
*/
public boolean hasFeature(final String feature) {
return this.features.contains(feature);
}

/**
* Set the features on the stream. See {@link StreamOutput#hasFeature(String)}.
*
* @param features the features on the stream
*/
public void setFeatures(final Set<String> features) {
assert this.features.isEmpty() : this.features;
this.features = Collections.unmodifiableSet(new HashSet<>(features));
}

public long position() throws IOException {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
EsExecutors.PROCESSORS_SETTING,
ThreadContext.DEFAULT_HEADERS_SETTING,
TcpTransport.DEFAULT_FEATURES_SETTING,
Loggers.LOG_DEFAULT_LEVEL_SETTING,
Loggers.LOG_LEVEL_SETTING,
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -56,6 +57,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;

/**
Expand All @@ -79,6 +81,17 @@
*/
public abstract class Plugin implements Closeable {

/**
* A feature exposed by the plugin. This should be used if a plugin exposes {@link ClusterState.Custom} or {@link MetaData.Custom}; see
* also {@link ClusterState.FeatureAware}.
*
* @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata
* customs
*/
protected Optional<String> getFeature() {
return Optional.empty();
}

/**
* Node level guice modules.
*/
Expand Down
21 changes: 21 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/PluginsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.transport.TcpTransport;

import java.io.IOException;
import java.lang.reflect.Constructor;
Expand All @@ -63,7 +64,9 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -200,6 +203,7 @@ private static void logPluginInfo(final List<PluginInfo> pluginInfos, final Stri

public Settings updatedSettings() {
Map<String, String> foundSettings = new HashMap<>();
final Map<String, String> features = new TreeMap<>();
final Settings.Builder builder = Settings.builder();
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
Settings settings = plugin.v2().additionalSettings();
Expand All @@ -211,6 +215,23 @@ public Settings updatedSettings() {
}
}
builder.put(settings);
final Optional<String> maybeFeature = plugin.v2().getFeature();
if (maybeFeature.isPresent()) {
final String feature = maybeFeature.get();
if (features.containsKey(feature)) {
final String message = String.format(
Locale.ROOT,
"duplicate feature [%s] in plugin [%s], already added in [%s]",
feature,
plugin.v1().getName(),
features.get(feature));
throw new IllegalArgumentException(message);
}
features.put(feature, plugin.v1().getName());
}
}
for (final String feature : features.keySet()) {
builder.put(TcpTransport.FEATURE_PREFIX + "." + feature, true);
}
return builder.put(this.settings).build();
}
Expand Down
Loading

0 comments on commit ac9c154

Please sign in to comment.