From 9e5aefcbc90d8a94fbff0476bed2d67b41fbb418 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Mon, 18 Dec 2023 17:40:47 +0200 Subject: [PATCH 01/14] Add the failure store parameter to `GET /{index}` --- .../org/elasticsearch/TransportVersions.java | 1 + .../elasticsearch/action/IndicesRequest.java | 9 ++ .../action/support/DataStreamOptions.java | 129 ++++++++++++++++++ .../master/info/ClusterInfoRequest.java | 19 +++ .../cluster/metadata/DataStream.java | 5 + .../metadata/IndexNameExpressionResolver.java | 101 ++++++++++++-- .../admin/indices/RestGetIndicesAction.java | 5 + 7 files changed, 261 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 625871d25734b..f64eadcf60e15 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -193,6 +193,7 @@ static TransportVersion def(int id) { public static final TransportVersion ENRICH_ELASTICSEARCH_VERSION_REMOVED = def(8_560_00_0); public static final TransportVersion NODE_STATS_REQUEST_SIMPLIFIED = def(8_561_00_0); public static final TransportVersion TEXT_EXPANSION_TOKEN_PRUNING_CONFIG_ADDED = def(8_562_00_0); + public static final TransportVersion ADD_DATA_STREAM_OPTIONS = def(8_563_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/IndicesRequest.java b/server/src/main/java/org/elasticsearch/action/IndicesRequest.java index e9dfda3954c93..7216f352681b8 100644 --- a/server/src/main/java/org/elasticsearch/action/IndicesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/IndicesRequest.java @@ -8,6 +8,7 @@ package org.elasticsearch.action; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; /** @@ -29,6 +30,14 @@ public interface IndicesRequest { */ IndicesOptions indicesOptions(); + /** + * Returns the data stream options used to resolve indices. They tell for instance whether we need to only expand + * to the normal backing indices or also to the failure store. + */ + default DataStreamOptions dataStreamOptions() { + return DataStreamOptions.EXCLUDE_FAILURE_STORE; + } + /** * Determines whether the request should be applied to data streams. When {@code false}, none of the names or * wildcard expressions in {@link #indices} should be applied to or expanded to any data streams. All layers diff --git a/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java b/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java new file mode 100644 index 0000000000000..7575ac463504a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.action.support; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Locale; +import java.util.Map; + +/** + * Controls how to deal with the backing indices of data streams. Currently, it handles normal backing indices and the failure + * store indices. + */ +public record DataStreamOptions(FailureStore failureStore) implements ToXContentFragment { + + public enum FailureStore { + INCLUDE, + EXCLUDE, + ONLY; + + public static FailureStore parseParameter(String value, FailureStore defaultState) { + if (value == null) { + return defaultState; + } + + return switch (value) { + case "include" -> INCLUDE; + case "exclude" -> EXCLUDE; + case "only" -> ONLY; + default -> throw new IllegalArgumentException("Value [" + value + "] is not a valid failure store option"); + }; + } + + String toXContentValue() { + return toString().toLowerCase(Locale.ROOT); + } + + public static boolean includeNormalIndices(@Nullable FailureStore failureStore) { + return failureStore == null || failureStore.equals(ONLY) == false; + } + + public static boolean includeFailureIndices(@Nullable FailureStore failureStore) { + return failureStore != null && failureStore.equals(EXCLUDE) == false; + } + } + + public static final DataStreamOptions INCLUDE_FAILURE_STORE = new DataStreamOptions(FailureStore.INCLUDE); + public static final DataStreamOptions EXCLUDE_FAILURE_STORE = new DataStreamOptions(FailureStore.EXCLUDE); + public static final DataStreamOptions ONLY_FAILURE_STORE = new DataStreamOptions(FailureStore.ONLY); + + public boolean includeNormalIndices() { + return FailureStore.includeNormalIndices(failureStore); + } + + public boolean includeFailureIndices() { + return FailureStore.includeFailureIndices(failureStore); + } + + public void writeDataStreamOptions(StreamOutput out) throws IOException { + out.writeEnum(failureStore); + } + + public static DataStreamOptions readDataStreamOptions(StreamInput in) throws IOException { + return new DataStreamOptions(in.readEnum(FailureStore.class)); + } + + public static DataStreamOptions fromRequest(RestRequest request, DataStreamOptions defaultSettings) { + return fromParameters(request.param("failure_store"), defaultSettings); + } + + public static DataStreamOptions fromMap(Map map, DataStreamOptions defaultSettings) { + return fromParameters( + (String) (map.containsKey("failure_store") ? map.get("failure_store") : map.get("failureStore")), + defaultSettings + ); + } + + /** + * Returns true if the name represents a valid name for one of the indices option + * false otherwise + */ + public static boolean isDataStreamOptions(String name) { + return "failure_store".equals(name); + } + + public static DataStreamOptions fromParameters(String failureStoreString, DataStreamOptions defaultSettings) { + if (failureStoreString == null) { + return defaultSettings; + } + + return new DataStreamOptions(FailureStore.parseParameter(failureStoreString, defaultSettings.failureStore)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (failureStore != null) { + builder.field(FAILURE_STORE_FIELD.getPreferredName(), failureStore.toXContentValue()); + } + return builder; + } + + private static final ParseField FAILURE_STORE_FIELD = new ParseField("failure_store"); + + public static DataStreamOptions fromXContent(XContentParser parser) throws IOException { + return fromXContent(parser, null); + } + + public static DataStreamOptions fromXContent(XContentParser parser, @Nullable DataStreamOptions defaults) throws IOException { + throw new IllegalStateException("Not implemented yet [mgouseti]"); + } + + @Override + public String toString() { + return "DataStreamOptions{" + "failureStore=" + failureStore + '}'; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/support/master/info/ClusterInfoRequest.java b/server/src/main/java/org/elasticsearch/action/support/master/info/ClusterInfoRequest.java index 22f0da70137af..be9013d6dc390 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/info/ClusterInfoRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/info/ClusterInfoRequest.java @@ -10,6 +10,7 @@ import org.elasticsearch.TransportVersions; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.common.Strings; @@ -25,6 +26,7 @@ public abstract class ClusterInfoRequest 1 && context.getOptions().allowAliasesToMultipleIndices() == false) { + if (resolvesToMoreThanOneIndices(indexAbstraction, context) + && context.getOptions().allowAliasesToMultipleIndices() == false) { String[] indexNames = new String[indexAbstraction.getIndices().size()]; int i = 0; for (Index indexName : indexAbstraction.getIndices()) { @@ -380,9 +398,29 @@ Index[] concreteIndices(Context context, String... indexExpressions) { ); } - for (Index index : indexAbstraction.getIndices()) { - if (shouldTrackConcreteIndex(context, context.getOptions(), index)) { - concreteIndicesResult.add(index); + if (indexAbstraction.getType() == Type.DATA_STREAM) { + DataStream dataStream = (DataStream) indexAbstraction; + if (context.dataStreamOptions.includeNormalIndices()) { + for (Index index : indexAbstraction.getIndices()) { + if (shouldTrackConcreteIndex(context, context.getOptions(), index)) { + concreteIndicesResult.add(index); + } + } + } + if (DataStream.isFailureStoreEnabled() + && dataStream.isFailureStore() + && context.dataStreamOptions.includeFailureIndices()) { + for (Index index : dataStream.getFailureIndices()) { + if (shouldTrackConcreteIndex(context, context.getOptions(), index)) { + concreteIndicesResult.add(index); + } + } + } + } else { + for (Index index : indexAbstraction.getIndices()) { + if (shouldTrackConcreteIndex(context, context.getOptions(), index)) { + concreteIndicesResult.add(index); + } } } } @@ -395,6 +433,21 @@ Index[] concreteIndices(Context context, String... indexExpressions) { return concreteIndicesResult.toArray(Index.EMPTY_ARRAY); } + private static boolean resolvesToMoreThanOneIndices(IndexAbstraction indexAbstraction, Context context) { + if (indexAbstraction.getType() == Type.DATA_STREAM) { + DataStream dataStream = (DataStream) indexAbstraction; + int count = 0; + if (DataStream.isFailureStoreEnabled() == false || context.dataStreamOptions.includeNormalIndices()) { + count += dataStream.getIndices().size(); + } + if (DataStream.isFailureStoreEnabled() && dataStream.isFailureStore() && context.dataStreamOptions.includeFailureIndices()) { + count += dataStream.getFailureIndices().size(); + } + return count > 1; + } + return indexAbstraction.getIndices().size() > 1; + } + private void checkSystemIndexAccess(Context context, Set concreteIndices) { final Predicate systemIndexAccessPredicate = context.getSystemIndexAccessPredicate(); if (systemIndexAccessPredicate == ALWAYS_TRUE) { @@ -971,6 +1024,7 @@ public static class Context { private final ClusterState state; private final IndicesOptions options; + private final DataStreamOptions dataStreamOptions; private final long startTime; private final boolean preserveAliases; private final boolean resolveToWriteIndex; @@ -1014,6 +1068,33 @@ public static class Context { this( state, options, + DataStreamOptions.EXCLUDE_FAILURE_STORE, + System.currentTimeMillis(), + preserveAliases, + resolveToWriteIndex, + includeDataStreams, + false, + systemIndexAccessLevel, + systemIndexAccessPredicate, + netNewSystemIndexPredicate + ); + } + + Context( + ClusterState state, + IndicesOptions options, + DataStreamOptions dataStreamOptions, + boolean preserveAliases, + boolean resolveToWriteIndex, + boolean includeDataStreams, + SystemIndexAccessLevel systemIndexAccessLevel, + Predicate systemIndexAccessPredicate, + Predicate netNewSystemIndexPredicate + ) { + this( + state, + options, + dataStreamOptions, System.currentTimeMillis(), preserveAliases, resolveToWriteIndex, @@ -1039,6 +1120,7 @@ public static class Context { this( state, options, + DataStreamOptions.EXCLUDE_FAILURE_STORE, System.currentTimeMillis(), preserveAliases, resolveToWriteIndex, @@ -1061,6 +1143,7 @@ public static class Context { this( state, options, + DataStreamOptions.EXCLUDE_FAILURE_STORE, startTime, false, false, @@ -1075,6 +1158,7 @@ public static class Context { protected Context( ClusterState state, IndicesOptions options, + DataStreamOptions dataStreamOptions, long startTime, boolean preserveAliases, boolean resolveToWriteIndex, @@ -1086,6 +1170,7 @@ protected Context( ) { this.state = state; this.options = options; + this.dataStreamOptions = dataStreamOptions; this.startTime = startTime; this.preserveAliases = preserveAliases; this.resolveToWriteIndex = resolveToWriteIndex; @@ -1722,7 +1807,7 @@ public ResolverContext() { } public ResolverContext(long startTime) { - super(null, null, startTime, false, false, false, false, SystemIndexAccessLevel.ALL, name -> false, name -> false); + super(null, null, null, startTime, false, false, false, false, SystemIndexAccessLevel.ALL, name -> false, name -> false); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java index 3550a7151ce43..97bb806e13466 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java @@ -9,8 +9,10 @@ package org.elasticsearch.rest.action.admin.indices; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.Settings; @@ -64,6 +66,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final GetIndexRequest getIndexRequest = new GetIndexRequest(); getIndexRequest.indices(indices); getIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, getIndexRequest.indicesOptions())); + if (DataStream.isFailureStoreEnabled()) { + getIndexRequest.dataStreamOptions(DataStreamOptions.fromRequest(request, getIndexRequest.dataStreamOptions())); + } getIndexRequest.local(request.paramAsBoolean("local", getIndexRequest.local())); getIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getIndexRequest.masterNodeTimeout())); getIndexRequest.humanReadable(request.paramAsBoolean("human", false)); From 67fccca8037b3276eb119402783c74518ac6bd52 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 19 Dec 2023 12:26:17 +0200 Subject: [PATCH 02/14] Add the failure store parameter to `GET /{index}/stats` --- .../support/broadcast/BroadcastRequest.java | 37 +++++++++++++++++-- .../admin/indices/RestIndicesStatsAction.java | 5 +++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java index 569e407f7f601..97611d9da53a5 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java @@ -8,9 +8,11 @@ package org.elasticsearch.action.support.broadcast; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -22,9 +24,11 @@ public class BroadcastRequest> extends ActionRequest implements IndicesRequest.Replaceable { public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed(); + public static final DataStreamOptions DEFAULT_DATA_STREAM_OPTIONS = DataStreamOptions.EXCLUDE_FAILURE_STORE; protected String[] indices; private IndicesOptions indicesOptions; + private DataStreamOptions dataStreamOptions; @Nullable // if timeout is infinite private TimeValue timeout; @@ -34,19 +38,32 @@ public BroadcastRequest(StreamInput in) throws IOException { indices = in.readStringArray(); indicesOptions = IndicesOptions.readIndicesOptions(in); timeout = in.readOptionalTimeValue(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) { + dataStreamOptions = DataStreamOptions.readDataStreamOptions(in); + } } protected BroadcastRequest(String... indices) { - this(indices, IndicesOptions.strictExpandOpenAndForbidClosed()); + this(indices, IndicesOptions.strictExpandOpenAndForbidClosed(), DEFAULT_DATA_STREAM_OPTIONS); } protected BroadcastRequest(String[] indices, IndicesOptions indicesOptions) { - this(indices, indicesOptions, null); + this(indices, indicesOptions, DEFAULT_DATA_STREAM_OPTIONS, null); } - protected BroadcastRequest(String[] indices, IndicesOptions indicesOptions, @Nullable TimeValue timeout) { + protected BroadcastRequest(String[] indices, IndicesOptions indicesOptions, DataStreamOptions dataStreamOptions) { + this(indices, indicesOptions, dataStreamOptions, null); + } + + protected BroadcastRequest( + String[] indices, + IndicesOptions indicesOptions, + DataStreamOptions dataStreamOptions, + @Nullable TimeValue timeout + ) { this.indices = indices; this.indicesOptions = indicesOptions; + this.dataStreamOptions = dataStreamOptions; this.timeout = timeout; } @@ -72,12 +89,23 @@ public IndicesOptions indicesOptions() { return indicesOptions; } + @Override + public DataStreamOptions dataStreamOptions() { + return dataStreamOptions; + } + @SuppressWarnings("unchecked") public final Request indicesOptions(IndicesOptions indicesOptions) { this.indicesOptions = indicesOptions; return (Request) this; } + @SuppressWarnings("unchecked") + public final Request dataStreamOptions(DataStreamOptions dataStreamOptions) { + this.dataStreamOptions = dataStreamOptions; + return (Request) this; + } + @Nullable // if timeout is infinite public TimeValue timeout() { return timeout; @@ -100,5 +128,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeStringArrayNullable(indices); indicesOptions.writeIndicesOptions(out); out.writeOptionalTimeValue(timeout); + if (out.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) { + dataStreamOptions.writeDataStreamOptions(out); + } } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java index 90aa366d4ecdf..dc769b9623712 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java @@ -12,8 +12,10 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.core.RestApiVersion; @@ -88,6 +90,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC assert indicesStatsRequest.indicesOptions() == IndicesOptions.strictExpandOpenAndForbidClosed() : "IndicesStats default indices options changed"; indicesStatsRequest.indicesOptions(IndicesOptions.fromRequest(request, defaultIndicesOption)); + if (DataStream.isFailureStoreEnabled()) { + indicesStatsRequest.dataStreamOptions(DataStreamOptions.fromRequest(request, indicesStatsRequest.dataStreamOptions())); + } indicesStatsRequest.indices(Strings.splitStringByCommaToArray(request.param("index"))); // level parameter validation ClusterStatsLevel.of(request, ClusterStatsLevel.INDICES); From 0e51d815b619be52d728c93b5e2aba023ab298fb Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 19 Dec 2023 14:27:55 +0200 Subject: [PATCH 03/14] Fix data stream options initialization --- .../action/support/broadcast/BroadcastRequest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java index 97611d9da53a5..447c727926fc9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java @@ -40,6 +40,8 @@ public BroadcastRequest(StreamInput in) throws IOException { timeout = in.readOptionalTimeValue(); if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) { dataStreamOptions = DataStreamOptions.readDataStreamOptions(in); + } else { + dataStreamOptions = DEFAULT_DATA_STREAM_OPTIONS; } } From a8a4624da216a795d7dfc2a25ab65412dee05b4e Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 19 Dec 2023 15:36:00 +0200 Subject: [PATCH 04/14] Small improvements --- .../action/support/DataStreamOptions.java | 23 ++++--------- .../support/broadcast/BroadcastRequest.java | 4 +-- .../master/info/ClusterInfoRequest.java | 4 +-- .../support/DataStreamOptionsTests.java | 34 +++++++++++++++++++ 4 files changed, 45 insertions(+), 20 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/support/DataStreamOptionsTests.java diff --git a/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java b/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java index 7575ac463504a..344684aeaa1a2 100644 --- a/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java +++ b/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java @@ -9,12 +9,12 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Nullable; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; import java.util.Locale; @@ -24,7 +24,9 @@ * Controls how to deal with the backing indices of data streams. Currently, it handles normal backing indices and the failure * store indices. */ -public record DataStreamOptions(FailureStore failureStore) implements ToXContentFragment { +public record DataStreamOptions(FailureStore failureStore) implements ToXContentFragment, Writeable { + + private static final ParseField FAILURE_STORE_FIELD = new ParseField("failure_store"); public enum FailureStore { INCLUDE, @@ -57,9 +59,7 @@ public static boolean includeFailureIndices(@Nullable FailureStore failureStore) } } - public static final DataStreamOptions INCLUDE_FAILURE_STORE = new DataStreamOptions(FailureStore.INCLUDE); public static final DataStreamOptions EXCLUDE_FAILURE_STORE = new DataStreamOptions(FailureStore.EXCLUDE); - public static final DataStreamOptions ONLY_FAILURE_STORE = new DataStreamOptions(FailureStore.ONLY); public boolean includeNormalIndices() { return FailureStore.includeNormalIndices(failureStore); @@ -69,11 +69,12 @@ public boolean includeFailureIndices() { return FailureStore.includeFailureIndices(failureStore); } - public void writeDataStreamOptions(StreamOutput out) throws IOException { + @Override + public void writeTo(StreamOutput out) throws IOException { out.writeEnum(failureStore); } - public static DataStreamOptions readDataStreamOptions(StreamInput in) throws IOException { + public static DataStreamOptions read(StreamInput in) throws IOException { return new DataStreamOptions(in.readEnum(FailureStore.class)); } @@ -112,16 +113,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - private static final ParseField FAILURE_STORE_FIELD = new ParseField("failure_store"); - - public static DataStreamOptions fromXContent(XContentParser parser) throws IOException { - return fromXContent(parser, null); - } - - public static DataStreamOptions fromXContent(XContentParser parser, @Nullable DataStreamOptions defaults) throws IOException { - throw new IllegalStateException("Not implemented yet [mgouseti]"); - } - @Override public String toString() { return "DataStreamOptions{" + "failureStore=" + failureStore + '}'; diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java index 447c727926fc9..f3b4110def118 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastRequest.java @@ -39,7 +39,7 @@ public BroadcastRequest(StreamInput in) throws IOException { indicesOptions = IndicesOptions.readIndicesOptions(in); timeout = in.readOptionalTimeValue(); if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) { - dataStreamOptions = DataStreamOptions.readDataStreamOptions(in); + dataStreamOptions = DataStreamOptions.read(in); } else { dataStreamOptions = DEFAULT_DATA_STREAM_OPTIONS; } @@ -131,7 +131,7 @@ public void writeTo(StreamOutput out) throws IOException { indicesOptions.writeIndicesOptions(out); out.writeOptionalTimeValue(timeout); if (out.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) { - dataStreamOptions.writeDataStreamOptions(out); + dataStreamOptions.writeTo(out); } } } diff --git a/server/src/main/java/org/elasticsearch/action/support/master/info/ClusterInfoRequest.java b/server/src/main/java/org/elasticsearch/action/support/master/info/ClusterInfoRequest.java index be9013d6dc390..ca3f5c28f7455 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/info/ClusterInfoRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/info/ClusterInfoRequest.java @@ -38,7 +38,7 @@ public ClusterInfoRequest(StreamInput in) throws IOException { } indicesOptions = IndicesOptions.readIndicesOptions(in); if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) { - dataStreamOptions = DataStreamOptions.readDataStreamOptions(in); + dataStreamOptions = DataStreamOptions.read(in); } } @@ -51,7 +51,7 @@ public void writeTo(StreamOutput out) throws IOException { } indicesOptions.writeIndicesOptions(out); if (out.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) { - dataStreamOptions.writeDataStreamOptions(out); + dataStreamOptions.writeTo(out); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/DataStreamOptionsTests.java b/server/src/test/java/org/elasticsearch/action/support/DataStreamOptionsTests.java new file mode 100644 index 0000000000000..c480130af378a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/support/DataStreamOptionsTests.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; + +public class DataStreamOptionsTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return DataStreamOptions::read; + } + + @Override + protected DataStreamOptions createTestInstance() { + return new DataStreamOptions(randomFrom(DataStreamOptions.FailureStore.values())); + } + + @Override + protected DataStreamOptions mutateInstance(DataStreamOptions instance) throws IOException { + return new DataStreamOptions( + randomValueOtherThan(instance.failureStore(), () -> randomFrom(DataStreamOptions.FailureStore.values())) + ); + } +} From f1bf01bfbcc023af6c6229ae68aa5260c1054a35 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Wed, 20 Dec 2023 11:23:22 +0200 Subject: [PATCH 05/14] Add yaml test --- .../data_stream/60_get_backing_indices.yml | 47 +++++++++++++++++++ .../rest-api-spec/api/indices.get.json | 10 ++++ 2 files changed, 57 insertions(+) diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/60_get_backing_indices.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/60_get_backing_indices.yml index d74ab4770fcc5..7b131b069ee36 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/60_get_backing_indices.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/60_get_backing_indices.yml @@ -47,3 +47,50 @@ indices.delete_data_stream: name: data-stream1 - is_true: acknowledged + +--- +"Get data stream with failure store": + - skip: + version: " - 8.12.99" + reason: "data stream failure stores can be retrieved via the GET index API in 8.13+" + features: allowed_warnings + + - do: + allowed_warnings: + - "index template [my-template1] has index patterns [failure-data-stream1, failure-data-stream2] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation" + indices.put_index_template: + name: my-template1 + body: + index_patterns: [ failure-data-stream] + data_stream: + failure_store: true + + - do: + indices.create_data_stream: + name: failure-data-stream + - is_true: acknowledged + + - do: + cluster.health: + wait_for_status: green + + # save the backing & failure store index names for later use + - do: + indices.get_data_stream: + name: failure-data-stream + - set: { data_streams.0.indices.0.index_name: idx0name } + - set: { data_streams.0.failure_indices.0.index_name: failure0name } + + - do: + indices.get: + index: "failure-data-stream" + failure_store: "only" + - is_false: .$idx0name + - is_true: .$failure0name + + - do: + indices.get: + index: "failure-data-stream" + failure_store: "include" + - is_true: .$idx0name + - is_true: .$failure0name diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get.json index e0cd96e346a7b..5ea468b1d1d7b 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get.json @@ -72,6 +72,16 @@ "master_timeout":{ "type":"time", "description":"Specify timeout for connection to master" + }, + "failure_store": { + "type": "enum", + "options":[ + "only", + "exclude", + "include" + ], + "default":"exclude", + "description":"Decides which type of backing indices of a data stream to return, normal or failure store." } } } From f22777614650f8f7257f661a67141f65e82e147c Mon Sep 17 00:00:00 2001 From: gmarouli Date: Wed, 10 Jan 2024 13:27:23 +0200 Subject: [PATCH 06/14] Convert yaml test to java rest test to not leak the query param --- .../datastreams/FailureStoreIT.java | 83 +++++++++++++++++++ .../data_stream/60_get_backing_indices.yml | 47 ----------- .../rest-api-spec/api/indices.get.json | 10 --- 3 files changed, 83 insertions(+), 57 deletions(-) create mode 100644 modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java new file mode 100644 index 0000000000000..85f7722892f4c --- /dev/null +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.hamcrest.MatcherAssert; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class FailureStoreIT extends DisabledSecurityDataStreamTestCase { + + /** + * This should be a yaml tests, but in order to write one we would need to expose in the rest-api-spec the new parameter. + * We do not want to do that until the feature flag is removed. For this reason, we temporarily, test the API here. + * Please convert this to a yaml test when the feature flag is removed. + */ + @SuppressWarnings("unchecked") + public void testRestApi() throws IOException { + Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/ds-template"); + putComposableIndexTemplateRequest.setJsonEntity(""" + { + "index_patterns": ["failure-data-stream"], + "template": { + "settings": { + "number_of_replicas": 0 + } + }, + "data_stream": { + "failure_store": true + } + } + """); + assertOK(client().performRequest(putComposableIndexTemplateRequest)); + + String dataStreamName = "failure-data-stream"; + assertOK(client().performRequest(new Request("PUT", "/_data_stream/" + dataStreamName))); + ensureGreen(dataStreamName); + + final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName)); + List dataStreams = (List) entityAsMap(dataStreamResponse).get("data_streams"); + MatcherAssert.assertThat(dataStreams.size(), is(1)); + Map dataStream = (Map) dataStreams.get(0); + MatcherAssert.assertThat(dataStream.get("name"), equalTo(dataStreamName)); + List> backingIndices = (List>) dataStream.get("indices"); + MatcherAssert.assertThat(backingIndices.size(), is(1)); + List> failureStore = (List>) dataStream.get("failure_indices"); + MatcherAssert.assertThat(failureStore.size(), is(1)); + String backingIndex = backingIndices.get(0).get("index_name"); + String failureStoreIndex = failureStore.get(0).get("index_name"); + + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + dataStreamName)); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + dataStreamName + "?failure_store=include")); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(2)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + dataStreamName + "?failure_store=only")); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + } +} diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/60_get_backing_indices.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/60_get_backing_indices.yml index 7b131b069ee36..d74ab4770fcc5 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/60_get_backing_indices.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/60_get_backing_indices.yml @@ -47,50 +47,3 @@ indices.delete_data_stream: name: data-stream1 - is_true: acknowledged - ---- -"Get data stream with failure store": - - skip: - version: " - 8.12.99" - reason: "data stream failure stores can be retrieved via the GET index API in 8.13+" - features: allowed_warnings - - - do: - allowed_warnings: - - "index template [my-template1] has index patterns [failure-data-stream1, failure-data-stream2] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation" - indices.put_index_template: - name: my-template1 - body: - index_patterns: [ failure-data-stream] - data_stream: - failure_store: true - - - do: - indices.create_data_stream: - name: failure-data-stream - - is_true: acknowledged - - - do: - cluster.health: - wait_for_status: green - - # save the backing & failure store index names for later use - - do: - indices.get_data_stream: - name: failure-data-stream - - set: { data_streams.0.indices.0.index_name: idx0name } - - set: { data_streams.0.failure_indices.0.index_name: failure0name } - - - do: - indices.get: - index: "failure-data-stream" - failure_store: "only" - - is_false: .$idx0name - - is_true: .$failure0name - - - do: - indices.get: - index: "failure-data-stream" - failure_store: "include" - - is_true: .$idx0name - - is_true: .$failure0name diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get.json index 5ea468b1d1d7b..e0cd96e346a7b 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get.json @@ -72,16 +72,6 @@ "master_timeout":{ "type":"time", "description":"Specify timeout for connection to master" - }, - "failure_store": { - "type": "enum", - "options":[ - "only", - "exclude", - "include" - ], - "default":"exclude", - "description":"Decides which type of backing indices of a data stream to return, normal or failure store." } } } From 18d2312728048752ad4fee66539cf628d59729f7 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Wed, 10 Jan 2024 14:43:44 +0200 Subject: [PATCH 07/14] Polish comments --- .../java/org/elasticsearch/datastreams/FailureStoreIT.java | 2 +- .../main/java/org/elasticsearch/action/IndicesRequest.java | 5 +++-- .../org/elasticsearch/action/support/DataStreamOptions.java | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java index 85f7722892f4c..6cc33a482ecaf 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java @@ -10,11 +10,11 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.hamcrest.MatcherAssert; import java.io.IOException; import java.util.List; import java.util.Map; -import org.hamcrest.MatcherAssert; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; diff --git a/server/src/main/java/org/elasticsearch/action/IndicesRequest.java b/server/src/main/java/org/elasticsearch/action/IndicesRequest.java index 7216f352681b8..76e16f27c15ee 100644 --- a/server/src/main/java/org/elasticsearch/action/IndicesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/IndicesRequest.java @@ -31,8 +31,9 @@ public interface IndicesRequest { IndicesOptions indicesOptions(); /** - * Returns the data stream options used to resolve indices. They tell for instance whether we need to only expand - * to the normal backing indices or also to the failure store. + * Returns the data stream options used to resolve indices. They tell for instance whether we need to include only + * the backing indices and/or only the failure store indices as well. + * By default, the failure store indices are excluded. */ default DataStreamOptions dataStreamOptions() { return DataStreamOptions.EXCLUDE_FAILURE_STORE; diff --git a/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java b/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java index 344684aeaa1a2..8fa8cd9abdd2a 100644 --- a/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java +++ b/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java @@ -21,7 +21,7 @@ import java.util.Map; /** - * Controls how to deal with the backing indices of data streams. Currently, it handles normal backing indices and the failure + * Controls which indices associated with a data stream to include. Currently, it handles backing indices and the failure * store indices. */ public record DataStreamOptions(FailureStore failureStore) implements ToXContentFragment, Writeable { @@ -93,7 +93,7 @@ public static DataStreamOptions fromMap(Map map, DataStreamOptio * Returns true if the name represents a valid name for one of the indices option * false otherwise */ - public static boolean isDataStreamOptions(String name) { + public static boolean isDataStreamOption(String name) { return "failure_store".equals(name); } From 3ee2d70b33e8bdc7d74344235b3a73c69f570642 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Wed, 10 Jan 2024 17:53:56 +0200 Subject: [PATCH 08/14] Implement settings and mapping --- .../datastreams/FailureStoreIT.java | 83 -------- .../datastreams/FailureStoreQueryParamIT.java | 184 ++++++++++++++++++ .../settings/get/GetSettingsRequest.java | 16 +- .../get/GetSettingsRequestBuilder.java | 11 ++ .../action/support/DataStreamOptions.java | 2 + .../admin/indices/RestGetIndicesAction.java | 2 +- .../admin/indices/RestGetMappingAction.java | 5 + .../admin/indices/RestGetSettingsAction.java | 5 + .../rest/action/cat/RestIndicesAction.java | 3 + 9 files changed, 226 insertions(+), 85 deletions(-) delete mode 100644 modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java create mode 100644 modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java deleted file mode 100644 index 6cc33a482ecaf..0000000000000 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreIT.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.datastreams; - -import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; -import org.hamcrest.MatcherAssert; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; - -public class FailureStoreIT extends DisabledSecurityDataStreamTestCase { - - /** - * This should be a yaml tests, but in order to write one we would need to expose in the rest-api-spec the new parameter. - * We do not want to do that until the feature flag is removed. For this reason, we temporarily, test the API here. - * Please convert this to a yaml test when the feature flag is removed. - */ - @SuppressWarnings("unchecked") - public void testRestApi() throws IOException { - Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/ds-template"); - putComposableIndexTemplateRequest.setJsonEntity(""" - { - "index_patterns": ["failure-data-stream"], - "template": { - "settings": { - "number_of_replicas": 0 - } - }, - "data_stream": { - "failure_store": true - } - } - """); - assertOK(client().performRequest(putComposableIndexTemplateRequest)); - - String dataStreamName = "failure-data-stream"; - assertOK(client().performRequest(new Request("PUT", "/_data_stream/" + dataStreamName))); - ensureGreen(dataStreamName); - - final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName)); - List dataStreams = (List) entityAsMap(dataStreamResponse).get("data_streams"); - MatcherAssert.assertThat(dataStreams.size(), is(1)); - Map dataStream = (Map) dataStreams.get(0); - MatcherAssert.assertThat(dataStream.get("name"), equalTo(dataStreamName)); - List> backingIndices = (List>) dataStream.get("indices"); - MatcherAssert.assertThat(backingIndices.size(), is(1)); - List> failureStore = (List>) dataStream.get("failure_indices"); - MatcherAssert.assertThat(failureStore.size(), is(1)); - String backingIndex = backingIndices.get(0).get("index_name"); - String failureStoreIndex = failureStore.get(0).get("index_name"); - - { - final Response indicesResponse = client().performRequest(new Request("GET", "/" + dataStreamName)); - Map indices = entityAsMap(indicesResponse); - MatcherAssert.assertThat(indices.size(), is(1)); - MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); - } - { - final Response indicesResponse = client().performRequest(new Request("GET", "/" + dataStreamName + "?failure_store=include")); - Map indices = entityAsMap(indicesResponse); - MatcherAssert.assertThat(indices.size(), is(2)); - MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); - MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); - } - { - final Response indicesResponse = client().performRequest(new Request("GET", "/" + dataStreamName + "?failure_store=only")); - Map indices = entityAsMap(indicesResponse); - MatcherAssert.assertThat(indices.size(), is(1)); - MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); - } - } -} diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java new file mode 100644 index 0000000000000..96b292d9b2aec --- /dev/null +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java @@ -0,0 +1,184 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.hamcrest.MatcherAssert; +import org.junit.Before; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * This should be a yaml tests, but in order to write one we would need to expose in the rest-api-spec the new parameter. + * We do not want to do that until the feature flag is removed. For this reason, we temporarily, test the API here. + * Please convert this to a yaml test when the feature flag is removed. + */ +public class FailureStoreQueryParamIT extends DisabledSecurityDataStreamTestCase { + + private static final String DATA_STREAM_NAME = "failure-data-stream"; + private String backingIndex; + private String failureStoreIndex; + + @SuppressWarnings("unchecked") + @Before + public void setup() throws IOException { + Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/ds-template"); + putComposableIndexTemplateRequest.setJsonEntity(""" + { + "index_patterns": ["failure-data-stream"], + "template": { + "settings": { + "number_of_replicas": 0 + } + }, + "data_stream": { + "failure_store": true + } + } + """); + assertOK(client().performRequest(putComposableIndexTemplateRequest)); + + assertOK(client().performRequest(new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME))); + ensureGreen(DATA_STREAM_NAME); + + final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME)); + List dataStreams = (List) entityAsMap(dataStreamResponse).get("data_streams"); + MatcherAssert.assertThat(dataStreams.size(), is(1)); + Map dataStream = (Map) dataStreams.get(0); + MatcherAssert.assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME)); + List backingIndices = getBackingIndices(dataStream); + MatcherAssert.assertThat(backingIndices.size(), is(1)); + List failureStore = getFailureStore(dataStream); + MatcherAssert.assertThat(failureStore.size(), is(1)); + backingIndex = backingIndices.get(0); + failureStoreIndex = failureStore.get(0); + } + + public void testGetIndexApi() throws IOException { + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME)); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(2)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "?failure_store=exclude")); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "?failure_store=only")); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + } + + @SuppressWarnings("unchecked") + public void testGetIndexStatsApi() throws IOException { + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_stats")); + Map indices = (Map) entityAsMap(indicesResponse).get("indices"); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_stats?failure_store=include") + ); + Map indices = (Map) entityAsMap(indicesResponse).get("indices"); + MatcherAssert.assertThat(indices.size(), is(2)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_stats?failure_store=only") + ); + Map indices = (Map) entityAsMap(indicesResponse).get("indices"); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + } + + public void testGetIndexSettingsApi() throws IOException { + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_settings")); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_settings?failure_store=include") + ); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(2)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_settings?failure_store=only") + ); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + } + + public void testGetIndexMappingApi() throws IOException { + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_mapping")); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_mapping?failure_store=include") + ); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(2)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_mapping?failure_store=only") + ); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + } + + private List getBackingIndices(Map response) { + return getIndices(response, "indices"); + } + + private List getFailureStore(Map response) { + return getIndices(response, "failure_indices"); + + } + + @SuppressWarnings("unchecked") + private List getIndices(Map response, String fieldName) { + List> indices = (List>) response.get(fieldName); + return indices.stream().map(index -> index.get("index_name")).toList(); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/settings/get/GetSettingsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/settings/get/GetSettingsRequest.java index 96cbfc80c8d67..76da86aae0b10 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/settings/get/GetSettingsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/settings/get/GetSettingsRequest.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.common.Strings; @@ -24,9 +25,11 @@ public class GetSettingsRequest extends MasterNodeReadRequest implements IndicesRequest.Replaceable { public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.fromOptions(false, true, true, true); + public static final DataStreamOptions DEFAULT_DATA_STREAM_OPTIONS = DataStreamOptions.EXCLUDE_FAILURE_STORE; private String[] indices = Strings.EMPTY_ARRAY; private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; + private DataStreamOptions dataStreamOptions = DEFAULT_DATA_STREAM_OPTIONS; private String[] names = Strings.EMPTY_ARRAY; private boolean humanReadable = false; private boolean includeDefaults = false; @@ -42,6 +45,11 @@ public GetSettingsRequest indicesOptions(IndicesOptions indicesOptions) { return this; } + public GetSettingsRequest dataStreamOptions(DataStreamOptions dataStreamOptions) { + this.dataStreamOptions = dataStreamOptions; + return this; + } + /** * When include_defaults is set, return default values which are normally suppressed. * This flag is specific to the rest client. @@ -82,6 +90,11 @@ public IndicesOptions indicesOptions() { return indicesOptions; } + @Override + public DataStreamOptions dataStreamOptions() { + return dataStreamOptions; + } + @Override public boolean includeDataStreams() { return true; @@ -127,12 +140,13 @@ public boolean equals(Object o) { && includeDefaults == that.includeDefaults && Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions) + && Objects.equals(dataStreamOptions, that.dataStreamOptions) && Arrays.equals(names, that.names); } @Override public int hashCode() { - int result = Objects.hash(indicesOptions, humanReadable, includeDefaults); + int result = Objects.hash(indicesOptions, humanReadable, includeDefaults, dataStreamOptions); result = 31 * result + Arrays.hashCode(indices); result = 31 * result + Arrays.hashCode(names); return result; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/settings/get/GetSettingsRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/settings/get/GetSettingsRequestBuilder.java index 166e5cb497ad6..8a0ded8e8e870 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/settings/get/GetSettingsRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/settings/get/GetSettingsRequestBuilder.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.admin.indices.settings.get; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder; import org.elasticsearch.client.internal.ElasticsearchClient; @@ -42,6 +43,16 @@ public GetSettingsRequestBuilder setIndicesOptions(IndicesOptions options) { return this; } + /** + * Specifies what type of data stream related indices to include. + *

+ * For example just backing indices, or to also include failure store. + */ + public GetSettingsRequestBuilder setDataStreamOptions(DataStreamOptions options) { + request.dataStreamOptions(options); + return this; + } + public GetSettingsRequestBuilder setNames(String... names) { request.names(names); return this; diff --git a/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java b/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java index 8fa8cd9abdd2a..d19389ed69a9d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java +++ b/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java @@ -61,6 +61,8 @@ public static boolean includeFailureIndices(@Nullable FailureStore failureStore) public static final DataStreamOptions EXCLUDE_FAILURE_STORE = new DataStreamOptions(FailureStore.EXCLUDE); + public static final DataStreamOptions INCLUDE_FAILURE_STORE = new DataStreamOptions(FailureStore.INCLUDE); + public boolean includeNormalIndices() { return FailureStore.includeNormalIndices(failureStore); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java index 7a7a70eb6f8b3..610e0784addda 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java @@ -67,7 +67,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC getIndexRequest.indices(indices); getIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, getIndexRequest.indicesOptions())); if (DataStream.isFailureStoreEnabled()) { - getIndexRequest.dataStreamOptions(DataStreamOptions.fromRequest(request, getIndexRequest.dataStreamOptions())); + getIndexRequest.dataStreamOptions(DataStreamOptions.fromRequest(request, DataStreamOptions.INCLUDE_FAILURE_STORE)); } getIndexRequest.local(request.paramAsBoolean("local", getIndexRequest.local())); getIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getIndexRequest.masterNodeTimeout())); diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetMappingAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetMappingAction.java index 065399076c12a..8760999d9aab8 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetMappingAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetMappingAction.java @@ -9,8 +9,10 @@ package org.elasticsearch.rest.action.admin.indices; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.core.RestApiVersion; @@ -87,6 +89,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final TimeValue timeout = request.paramAsTime("master_timeout", getMappingsRequest.masterNodeTimeout()); getMappingsRequest.masterNodeTimeout(timeout); getMappingsRequest.local(request.paramAsBoolean("local", getMappingsRequest.local())); + if (DataStream.isFailureStoreEnabled()) { + getMappingsRequest.dataStreamOptions(DataStreamOptions.fromRequest(request, getMappingsRequest.dataStreamOptions())); + } final HttpChannel httpChannel = request.getHttpChannel(); return channel -> new RestCancellableNodeClient(client, httpChannel).admin() .indices() diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetSettingsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetSettingsAction.java index af72e66f6127d..e3cb3b134bbf7 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetSettingsAction.java @@ -9,8 +9,10 @@ package org.elasticsearch.rest.action.admin.indices; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; @@ -56,6 +58,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC .names(names); getSettingsRequest.local(request.paramAsBoolean("local", getSettingsRequest.local())); getSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getSettingsRequest.masterNodeTimeout())); + if (DataStream.isFailureStoreEnabled()) { + getSettingsRequest.dataStreamOptions(DataStreamOptions.fromRequest(request, getSettingsRequest.dataStreamOptions())); + } return channel -> client.admin().indices().getSettings(getSettingsRequest, new RestRefCountedChunkedToXContentListener<>(channel)); } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java index ccfcb9b505e92..5b8b3ab1bf90f 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.client.internal.node.NodeClient; @@ -79,6 +80,7 @@ protected void documentation(StringBuilder sb) { public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { final String[] indices = Strings.splitStringByCommaToArray(request.param("index")); final IndicesOptions indicesOptions = IndicesOptions.fromRequest(request, IndicesOptions.strictExpand()); + final DataStreamOptions dataStreamOptions = DataStreamOptions.fromRequest(request, DataStreamOptions.INCLUDE_FAILURE_STORE); final TimeValue masterNodeTimeout = request.paramAsTime("master_timeout", DEFAULT_MASTER_NODE_TIMEOUT); final boolean includeUnloadedSegments = request.paramAsBoolean("include_unloaded_segments", false); @@ -116,6 +118,7 @@ public RestResponse buildResponse(Void ignored) throws Exception { .indices() .prepareGetSettings(indices) .setIndicesOptions(indicesOptions) + .setDataStreamOptions(dataStreamOptions) .setMasterNodeTimeout(masterNodeTimeout) .setNames(IndexSettings.INDEX_SEARCH_THROTTLED.getKey()) .execute(listeners.acquire(indexSettingsRef::set)); From a6332bb28c1f31b41da04356e5196bc6faa7ce26 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Thu, 11 Jan 2024 16:57:31 +0200 Subject: [PATCH 09/14] Polishing --- .../datastreams/FailureStoreQueryParamIT.java | 2 +- .../elasticsearch/action/IndicesRequest.java | 2 +- .../action/support/IndicesOptionsTests.java | 62 ++++++------------- 3 files changed, 20 insertions(+), 46 deletions(-) diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java index 96b292d9b2aec..56db8b1dcb809 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java @@ -21,7 +21,7 @@ import static org.hamcrest.Matchers.is; /** - * This should be a yaml tests, but in order to write one we would need to expose in the rest-api-spec the new parameter. + * This should be a yaml tests, but in order to write one we would need to expose the new parameter in the rest-api-spec. * We do not want to do that until the feature flag is removed. For this reason, we temporarily, test the API here. * Please convert this to a yaml test when the feature flag is removed. */ diff --git a/server/src/main/java/org/elasticsearch/action/IndicesRequest.java b/server/src/main/java/org/elasticsearch/action/IndicesRequest.java index 76e16f27c15ee..dd5738c3d6056 100644 --- a/server/src/main/java/org/elasticsearch/action/IndicesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/IndicesRequest.java @@ -32,7 +32,7 @@ public interface IndicesRequest { /** * Returns the data stream options used to resolve indices. They tell for instance whether we need to include only - * the backing indices and/or only the failure store indices as well. + * the backing indices and/or/only the failure store indices as well. * By default, the failure store indices are excluded. */ default DataStreamOptions dataStreamOptions() { diff --git a/server/src/test/java/org/elasticsearch/action/support/IndicesOptionsTests.java b/server/src/test/java/org/elasticsearch/action/support/IndicesOptionsTests.java index 6b84b9cf147df..d9afeaeec3475 100644 --- a/server/src/test/java/org/elasticsearch/action/support/IndicesOptionsTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/IndicesOptionsTests.java @@ -41,17 +41,7 @@ public class IndicesOptionsTests extends ESTestCase { public void testSerialization() throws Exception { int iterations = randomIntBetween(5, 20); for (int i = 0; i < iterations; i++) { - IndicesOptions indicesOptions = IndicesOptions.fromOptions( - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean() - ); + IndicesOptions indicesOptions = randomIndicesOptions(); BytesStreamOutput output = new BytesStreamOutput(); indicesOptions.writeIndicesOptions(output); @@ -72,6 +62,20 @@ public void testSerialization() throws Exception { } } + public static IndicesOptions randomIndicesOptions() { + return IndicesOptions.fromOptions( + randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + } + public void testFromOptions() { final boolean ignoreUnavailable = randomBoolean(); final boolean allowNoIndices = randomBoolean(); @@ -113,17 +117,7 @@ public void testFromOptionsWithDefaultOptions() { boolean expandToOpenIndices = randomBoolean(); boolean expandToClosedIndices = randomBoolean(); - IndicesOptions defaultOptions = IndicesOptions.fromOptions( - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean() - ); + IndicesOptions defaultOptions = randomIndicesOptions(); IndicesOptions indicesOptions = IndicesOptions.fromOptions( ignoreUnavailable, @@ -176,17 +170,7 @@ public void testFromParameters() { boolean allowNoIndices = randomBoolean(); String allowNoIndicesString = Boolean.toString(allowNoIndices); - IndicesOptions defaultOptions = IndicesOptions.fromOptions( - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean() - ); + IndicesOptions defaultOptions = randomIndicesOptions(); IndicesOptions updatedOptions = IndicesOptions.fromParameters( expandWildcardsString, @@ -207,17 +191,7 @@ public void testFromParameters() { } public void testEqualityAndHashCode() { - IndicesOptions indicesOptions = IndicesOptions.fromOptions( - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean(), - randomBoolean() - ); + IndicesOptions indicesOptions = randomIndicesOptions(); EqualsHashCodeTestUtils.checkEqualsAndHashCode(indicesOptions, opts -> { return IndicesOptions.fromOptions( From 4414ac82d3ac123526561629b4b8a6ecae8bc255 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Mon, 15 Jan 2024 19:08:11 +0200 Subject: [PATCH 10/14] Add unit tests for the IndexNameExpressionResolver --- .../action/support/DataStreamOptions.java | 8 ++- .../metadata/IndexNameExpressionResolver.java | 40 ++++++++++-- .../IndexNameExpressionResolverTests.java | 65 ++++++++++++++++++- .../metadata/DataStreamTestHelper.java | 13 +++- 4 files changed, 112 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java b/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java index d19389ed69a9d..567e8d6f80334 100644 --- a/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java +++ b/server/src/main/java/org/elasticsearch/action/support/DataStreamOptions.java @@ -50,7 +50,7 @@ String toXContentValue() { return toString().toLowerCase(Locale.ROOT); } - public static boolean includeNormalIndices(@Nullable FailureStore failureStore) { + public static boolean includeBackingIndices(@Nullable FailureStore failureStore) { return failureStore == null || failureStore.equals(ONLY) == false; } @@ -63,8 +63,10 @@ public static boolean includeFailureIndices(@Nullable FailureStore failureStore) public static final DataStreamOptions INCLUDE_FAILURE_STORE = new DataStreamOptions(FailureStore.INCLUDE); - public boolean includeNormalIndices() { - return FailureStore.includeNormalIndices(failureStore); + public static final DataStreamOptions ONLY_FAILURE_STORE = new DataStreamOptions(FailureStore.ONLY); + + public boolean includeBackingIndices() { + return FailureStore.includeBackingIndices(failureStore); } public boolean includeFailureIndices() { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java index 8e8b63646ff8f..0fa3e86ad4713 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java @@ -303,6 +303,27 @@ public Index[] concreteIndices(ClusterState state, IndicesOptions options, boole return concreteIndices(context, indexExpressions); } + public Index[] concreteIndices( + ClusterState state, + IndicesOptions options, + DataStreamOptions dataStreamOptions, + boolean includeDataStreams, + String... indexExpressions + ) { + Context context = new Context( + state, + options, + dataStreamOptions, + false, + false, + includeDataStreams, + getSystemIndexAccessLevel(), + getSystemIndexAccessPredicate(), + getNetNewSystemIndexPredicate() + ); + return concreteIndices(context, indexExpressions); + } + /** * Translates the provided index expression into actual concrete indices, properly deduplicated. * @@ -365,7 +386,8 @@ Index[] concreteIndices(Context context, String... indexExpressions) { } } else if (indexAbstraction.getType() == Type.DATA_STREAM && context.isResolveToWriteIndex()) { if (DataStream.isFailureStoreEnabled() == false - || DataStreamOptions.FailureStore.includeNormalIndices(context.dataStreamOptions.failureStore())) { + || context.dataStreamOptions == null + || context.dataStreamOptions.includeBackingIndices()) { Index writeIndex = indexAbstraction.getWriteIndex(); if (addIndex(writeIndex, null, context)) { concreteIndicesResult.add(writeIndex); @@ -373,11 +395,12 @@ Index[] concreteIndices(Context context, String... indexExpressions) { } DataStream dataStream = (DataStream) indexAbstraction; if (DataStream.isFailureStoreEnabled() + && context.dataStreamOptions != null && dataStream.isFailureStore() - && DataStreamOptions.FailureStore.includeFailureIndices(context.dataStreamOptions.failureStore())) { - Index writeIndex = dataStream.getFailureStoreWriteIndex(); - if (writeIndex != null && addIndex(writeIndex, null, context)) { - concreteIndicesResult.add(writeIndex); + && context.dataStreamOptions.includeFailureIndices()) { + Index failureStoreWriteIndex = dataStream.getFailureStoreWriteIndex(); + if (failureStoreWriteIndex != null && addIndex(failureStoreWriteIndex, null, context)) { + concreteIndicesResult.add(failureStoreWriteIndex); } } } else { @@ -400,7 +423,9 @@ Index[] concreteIndices(Context context, String... indexExpressions) { if (indexAbstraction.getType() == Type.DATA_STREAM) { DataStream dataStream = (DataStream) indexAbstraction; - if (context.dataStreamOptions.includeNormalIndices()) { + if (DataStream.isFailureStoreEnabled() == false + || context.dataStreamOptions == null + || context.dataStreamOptions.includeBackingIndices()) { for (Index index : indexAbstraction.getIndices()) { if (shouldTrackConcreteIndex(context, context.getOptions(), index)) { concreteIndicesResult.add(index); @@ -409,6 +434,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) { } if (DataStream.isFailureStoreEnabled() && dataStream.isFailureStore() + && context.dataStreamOptions != null && context.dataStreamOptions.includeFailureIndices()) { for (Index index : dataStream.getFailureIndices()) { if (shouldTrackConcreteIndex(context, context.getOptions(), index)) { @@ -437,7 +463,7 @@ private static boolean resolvesToMoreThanOneIndices(IndexAbstraction indexAbstra if (indexAbstraction.getType() == Type.DATA_STREAM) { DataStream dataStream = (DataStream) indexAbstraction; int count = 0; - if (DataStream.isFailureStoreEnabled() == false || context.dataStreamOptions.includeNormalIndices()) { + if (DataStream.isFailureStoreEnabled() == false || context.dataStreamOptions.includeBackingIndices()) { count += dataStream.getIndices().size(); } if (DataStream.isFailureStoreEnabled() && dataStream.isFailureStore() && context.dataStreamOptions.includeFailureIndices()) { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index 99dc1c84ba15b..02d739594185d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterName; @@ -52,9 +53,7 @@ import java.util.Set; import java.util.function.Function; -import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo; -import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createBackingIndex; -import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.*; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING; import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.elasticsearch.indices.SystemIndices.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; @@ -2729,6 +2728,66 @@ public void testDataStreams() { } } + public void testDataStreamsWithFailureStore() { + final String dataStreamName = "my-data-stream"; + IndexMetadata index1 = createBackingIndex(dataStreamName, 1, epochMillis).build(); + IndexMetadata index2 = createBackingIndex(dataStreamName, 2, epochMillis).build(); + IndexMetadata failureIndex1 = createFailureIndex(dataStreamName, 1, epochMillis).build(); + IndexMetadata failureIndex2 = createFailureIndex(dataStreamName, 2, epochMillis).build(); + + Metadata.Builder mdBuilder = Metadata.builder() + .put(index1, false) + .put(index2, false) + .put(failureIndex1, false) + .put(failureIndex2, false) + .put( + newInstance( + dataStreamName, + List.of(index1.getIndex(), index2.getIndex()), + List.of(failureIndex1.getIndex(), failureIndex2.getIndex()) + ) + ); + ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build(); + + { + IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; + Index[] result = indexNameExpressionResolver.concreteIndices(state, indicesOptions, true, "my-data-stream"); + assertThat(result.length, equalTo(2)); + assertThat(result[0].getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 1, epochMillis))); + assertThat(result[1].getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 2, epochMillis))); + } + + { + IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; + Index[] result = indexNameExpressionResolver.concreteIndices( + state, + indicesOptions, + DataStreamOptions.INCLUDE_FAILURE_STORE, + true, + "my-data-stream" + ); + assertThat(result.length, equalTo(4)); + assertThat(result[0].getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 1, epochMillis))); + assertThat(result[1].getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 2, epochMillis))); + assertThat(result[2].getName(), equalTo(DataStream.getDefaultFailureStoreName(dataStreamName, 1, epochMillis))); + assertThat(result[3].getName(), equalTo(DataStream.getDefaultFailureStoreName(dataStreamName, 2, epochMillis))); + } + + { + IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; + Index[] result = indexNameExpressionResolver.concreteIndices( + state, + indicesOptions, + DataStreamOptions.ONLY_FAILURE_STORE, + true, + "my-data-stream" + ); + assertThat(result.length, equalTo(2)); + assertThat(result[0].getName(), equalTo(DataStream.getDefaultFailureStoreName(dataStreamName, 1, epochMillis))); + assertThat(result[1].getName(), equalTo(DataStream.getDefaultFailureStoreName(dataStreamName, 2, epochMillis))); + } + } + public void testDataStreamAliases() { String dataStream1 = "my-data-stream-1"; IndexMetadata index1 = createBackingIndex(dataStream1, 1, epochMillis).build(); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index d0b30bff92f3e..c216b3639d111 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -90,6 +90,10 @@ public static DataStream newInstance(String name, List indices) { return newInstance(name, indices, indices.size(), null); } + public static DataStream newInstance(String name, List indices, List failureIndices) { + return newInstance(name, indices, indices.size(), null, false, null, failureIndices); + } + public static DataStream newInstance(String name, List indices, long generation, Map metadata) { return newInstance(name, indices, generation, metadata, false); } @@ -124,7 +128,7 @@ public static DataStream newInstance( @Nullable DataStreamLifecycle lifecycle, List failureStores ) { - return new DataStream(name, indices, generation, metadata, false, replicated, false, false, null, lifecycle, false, failureStores); + return new DataStream(name, indices, generation, metadata, false, replicated, false, false, null, lifecycle, failureStores.isEmpty() == false, failureStores); } public static String getLegacyDefaultBackingIndexName( @@ -169,6 +173,13 @@ public static IndexMetadata.Builder createBackingIndex(String dataStreamName, in .numberOfReplicas(NUMBER_OF_REPLICAS); } + public static IndexMetadata.Builder createFailureIndex(String dataStreamName, int generation, long epochMillis) { + return IndexMetadata.builder(DataStream.getDefaultFailureStoreName(dataStreamName, generation, epochMillis)) + .settings(SETTINGS) + .numberOfShards(NUMBER_OF_SHARDS) + .numberOfReplicas(NUMBER_OF_REPLICAS); + } + public static IndexMetadata.Builder getIndexMetadataBuilderForIndex(Index index) { return IndexMetadata.builder(index.getName()) .settings(Settings.builder().put(SETTINGS.build()).put(SETTING_INDEX_UUID, index.getUUID())) From 26355b900034e05b26d036b394a1bdf41caf9648 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 16 Jan 2024 09:41:00 +0200 Subject: [PATCH 11/14] Fix format in DataStreamTestHelper --- .../metadata/DataStreamTestHelper.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index c216b3639d111..8f9d68176fdda 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -128,7 +128,20 @@ public static DataStream newInstance( @Nullable DataStreamLifecycle lifecycle, List failureStores ) { - return new DataStream(name, indices, generation, metadata, false, replicated, false, false, null, lifecycle, failureStores.isEmpty() == false, failureStores); + return new DataStream( + name, + indices, + generation, + metadata, + false, + replicated, + false, + false, + null, + lifecycle, + failureStores.isEmpty() == false, + failureStores + ); } public static String getLegacyDefaultBackingIndexName( @@ -175,9 +188,9 @@ public static IndexMetadata.Builder createBackingIndex(String dataStreamName, in public static IndexMetadata.Builder createFailureIndex(String dataStreamName, int generation, long epochMillis) { return IndexMetadata.builder(DataStream.getDefaultFailureStoreName(dataStreamName, generation, epochMillis)) - .settings(SETTINGS) - .numberOfShards(NUMBER_OF_SHARDS) - .numberOfReplicas(NUMBER_OF_REPLICAS); + .settings(SETTINGS) + .numberOfShards(NUMBER_OF_SHARDS) + .numberOfReplicas(NUMBER_OF_REPLICAS); } public static IndexMetadata.Builder getIndexMetadataBuilderForIndex(Index index) { From b7afe451ebddcef2899aa8f691fad7d3c8e8cbb3 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 16 Jan 2024 11:59:05 +0200 Subject: [PATCH 12/14] Remove star import --- .../cluster/metadata/IndexNameExpressionResolverTests.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index 02d739594185d..dadf68aa71f3f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -53,7 +53,10 @@ import java.util.Set; import java.util.function.Function; -import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.*; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createBackingIndex; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createFailureIndex; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING; import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.elasticsearch.indices.SystemIndices.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; From ca1a009cf0bbb84409496abce06690be49fc5edb Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 16 Jan 2024 13:22:02 +0200 Subject: [PATCH 13/14] Polishing --- .../datastreams/FailureStoreQueryParamIT.java | 4 +- .../metadata/IndexNameExpressionResolver.java | 80 ++++++++++--------- .../DateMathExpressionResolverTests.java | 2 +- .../cluster/metadata/ExpressionListTests.java | 2 +- .../IndexNameExpressionResolverTests.java | 16 ++-- 5 files changed, 56 insertions(+), 48 deletions(-) diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java index 56db8b1dcb809..1aee1e5ecc7da 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java @@ -21,8 +21,8 @@ import static org.hamcrest.Matchers.is; /** - * This should be a yaml tests, but in order to write one we would need to expose the new parameter in the rest-api-spec. - * We do not want to do that until the feature flag is removed. For this reason, we temporarily, test the API here. + * This should be a yaml test, but in order to write one we would need to expose the new parameter in the rest-api-spec. + * We do not want to do that until the feature flag is removed. For this reason, we temporarily, test the affected APIs here. * Please convert this to a yaml test when the feature flag is removed. */ public class FailureStoreQueryParamIT extends DisabledSecurityDataStreamTestCase { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java index 0fa3e86ad4713..2b23218143f75 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java @@ -249,7 +249,7 @@ public IndexAbstraction resolveWriteIndexAbstraction(ClusterState state, DocWrit } protected static Collection resolveExpressions(Context context, String... expressions) { - if (context.getOptions().expandWildcardExpressions() == false) { + if (context.getIndicesOptions().expandWildcardExpressions() == false) { if (expressions == null || expressions.length == 0 || expressions.length == 1 && Metadata.ALL.equals(expressions[0])) { return List.of(); } else { @@ -385,9 +385,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) { concreteIndicesResult.add(writeIndex); } } else if (indexAbstraction.getType() == Type.DATA_STREAM && context.isResolveToWriteIndex()) { - if (DataStream.isFailureStoreEnabled() == false - || context.dataStreamOptions == null - || context.dataStreamOptions.includeBackingIndices()) { + if (DataStream.isFailureStoreEnabled() == false || context.getDataStreamOptions().includeBackingIndices()) { Index writeIndex = indexAbstraction.getWriteIndex(); if (addIndex(writeIndex, null, context)) { concreteIndicesResult.add(writeIndex); @@ -395,9 +393,8 @@ Index[] concreteIndices(Context context, String... indexExpressions) { } DataStream dataStream = (DataStream) indexAbstraction; if (DataStream.isFailureStoreEnabled() - && context.dataStreamOptions != null && dataStream.isFailureStore() - && context.dataStreamOptions.includeFailureIndices()) { + && context.getDataStreamOptions().includeFailureIndices()) { Index failureStoreWriteIndex = dataStream.getFailureStoreWriteIndex(); if (failureStoreWriteIndex != null && addIndex(failureStoreWriteIndex, null, context)) { concreteIndicesResult.add(failureStoreWriteIndex); @@ -405,7 +402,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) { } } else { if (resolvesToMoreThanOneIndices(indexAbstraction, context) - && context.getOptions().allowAliasesToMultipleIndices() == false) { + && context.getIndicesOptions().allowAliasesToMultipleIndices() == false) { String[] indexNames = new String[indexAbstraction.getIndices().size()]; int i = 0; for (Index indexName : indexAbstraction.getIndices()) { @@ -423,28 +420,25 @@ Index[] concreteIndices(Context context, String... indexExpressions) { if (indexAbstraction.getType() == Type.DATA_STREAM) { DataStream dataStream = (DataStream) indexAbstraction; - if (DataStream.isFailureStoreEnabled() == false - || context.dataStreamOptions == null - || context.dataStreamOptions.includeBackingIndices()) { + if (DataStream.isFailureStoreEnabled() == false || context.getDataStreamOptions().includeBackingIndices()) { for (Index index : indexAbstraction.getIndices()) { - if (shouldTrackConcreteIndex(context, context.getOptions(), index)) { + if (shouldTrackConcreteIndex(context, context.getIndicesOptions(), index)) { concreteIndicesResult.add(index); } } } if (DataStream.isFailureStoreEnabled() && dataStream.isFailureStore() - && context.dataStreamOptions != null - && context.dataStreamOptions.includeFailureIndices()) { + && context.getDataStreamOptions().includeFailureIndices()) { for (Index index : dataStream.getFailureIndices()) { - if (shouldTrackConcreteIndex(context, context.getOptions(), index)) { + if (shouldTrackConcreteIndex(context, context.getIndicesOptions(), index)) { concreteIndicesResult.add(index); } } } } else { for (Index index : indexAbstraction.getIndices()) { - if (shouldTrackConcreteIndex(context, context.getOptions(), index)) { + if (shouldTrackConcreteIndex(context, context.getIndicesOptions(), index)) { concreteIndicesResult.add(index); } } @@ -452,7 +446,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) { } } - if (context.getOptions().allowNoIndices() == false && concreteIndicesResult.isEmpty()) { + if (context.getIndicesOptions().allowNoIndices() == false && concreteIndicesResult.isEmpty()) { throw notFoundException(indexExpressions); } checkSystemIndexAccess(context, concreteIndicesResult); @@ -463,10 +457,12 @@ private static boolean resolvesToMoreThanOneIndices(IndexAbstraction indexAbstra if (indexAbstraction.getType() == Type.DATA_STREAM) { DataStream dataStream = (DataStream) indexAbstraction; int count = 0; - if (DataStream.isFailureStoreEnabled() == false || context.dataStreamOptions.includeBackingIndices()) { + if (DataStream.isFailureStoreEnabled() == false || context.getDataStreamOptions().includeBackingIndices()) { count += dataStream.getIndices().size(); } - if (DataStream.isFailureStoreEnabled() && dataStream.isFailureStore() && context.dataStreamOptions.includeFailureIndices()) { + if (DataStream.isFailureStoreEnabled() + && dataStream.isFailureStore() + && context.getDataStreamOptions().includeFailureIndices()) { count += dataStream.getFailureIndices().size(); } return count > 1; @@ -585,7 +581,7 @@ private static boolean addIndex(Index index, IndexMetadata imd, Context context) // we changed it to look at the `index.frozen` setting instead, since frozen indices were the only // type of index to use the `search_throttled` threadpool at that time. // NOTE: We can't reference the Setting object, which is only defined and registered in x-pack. - if (context.options.ignoreThrottled()) { + if (context.indicesOptions.ignoreThrottled()) { imd = imd != null ? imd : context.state.metadata().index(index); return imd.getSettings().getAsBoolean("index.frozen", false) == false; } else { @@ -1049,7 +1045,7 @@ public Predicate getNetNewSystemIndexPredicate() { public static class Context { private final ClusterState state; - private final IndicesOptions options; + private final IndicesOptions indicesOptions; private final DataStreamOptions dataStreamOptions; private final long startTime; private final boolean preserveAliases; @@ -1195,7 +1191,7 @@ protected Context( Predicate netNewSystemIndexPredicate ) { this.state = state; - this.options = options; + this.indicesOptions = options; this.dataStreamOptions = dataStreamOptions; this.startTime = startTime; this.preserveAliases = preserveAliases; @@ -1211,8 +1207,12 @@ public ClusterState getState() { return state; } - public IndicesOptions getOptions() { - return options; + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + + public DataStreamOptions getDataStreamOptions() { + return dataStreamOptions; } public long getStartTime() { @@ -1280,7 +1280,7 @@ public static Collection resolveAll(Context context) { indexAbstraction -> indexAbstraction.isSystem() == false || context.systemIndexAccessPredicate.test(indexAbstraction.getName()) ); - if (context.getOptions().expandWildcardsHidden() == false) { + if (context.getIndicesOptions().expandWildcardsHidden() == false) { dataStreamsAbstractions = dataStreamsAbstractions.filter(indexAbstraction -> indexAbstraction.isHidden() == false); } // dedup backing indices if expand hidden indices option is true @@ -1317,7 +1317,7 @@ public static Collection resolve(Context context, List expressio Stream matchingResources = matchResourcesToWildcard(context, expression.get()); Stream matchingOpenClosedNames = expandToOpenClosed(context, matchingResources); AtomicBoolean emptyWildcardExpansion = new AtomicBoolean(false); - if (context.getOptions().allowNoIndices() == false) { + if (context.getIndicesOptions().allowNoIndices() == false) { emptyWildcardExpansion.set(true); matchingOpenClosedNames = matchingOpenClosedNames.peek(x -> emptyWildcardExpansion.set(false)); } @@ -1378,7 +1378,7 @@ private static Stream matchResourcesToWildcard(Context context ); } } - if (context.getOptions().ignoreAliases()) { + if (context.getIndicesOptions().ignoreAliases()) { matchesStream = matchesStream.filter(indexAbstraction -> indexAbstraction.getType() != Type.ALIAS); } if (context.includeDataStreams() == false) { @@ -1393,7 +1393,7 @@ private static Stream matchResourcesToWildcard(Context context && context.netNewSystemIndexPredicate.test(indexAbstraction.getName()) == false) || context.systemIndexAccessPredicate.test(indexAbstraction.getName()) ); - if (context.getOptions().expandWildcardsHidden() == false) { + if (context.getIndicesOptions().expandWildcardsHidden() == false) { if (wildcardExpression.startsWith(".")) { // there is this behavior that hidden indices that start with "." are not hidden if the wildcard expression also // starts with "." @@ -1425,7 +1425,7 @@ private static Map filterIndicesLookupForSuffixWildcar * then all index resources are filtered by their open/closed status. */ private static Stream expandToOpenClosed(Context context, Stream resources) { - final IndexMetadata.State excludeState = excludeState(context.getOptions()); + final IndexMetadata.State excludeState = excludeState(context.getIndicesOptions()); return resources.flatMap(indexAbstraction -> { if (context.isPreserveAliases() && indexAbstraction.getType() == Type.ALIAS) { return Stream.of(indexAbstraction.getName()); @@ -1442,7 +1442,10 @@ private static Stream expandToOpenClosed(Context context, Stream resolveEmptyOrTrivialWildcard(Context context) { - final String[] allIndices = resolveEmptyOrTrivialWildcardToAllIndices(context.getOptions(), context.getState().metadata()); + final String[] allIndices = resolveEmptyOrTrivialWildcardToAllIndices( + context.getIndicesOptions(), + context.getState().metadata() + ); if (context.systemIndexAccessLevel == SystemIndexAccessLevel.ALL) { return List.of(allIndices); } else { @@ -1687,7 +1690,7 @@ private ExplicitResourceNameFilter() { * Only explicit resource names are considered for filtering. Wildcard and exclusion expressions are kept in. */ public static List filterUnavailable(Context context, List expressions) { - ensureRemoteIndicesRequireIgnoreUnavailable(context.getOptions(), expressions); + ensureRemoteIndicesRequireIgnoreUnavailable(context.getIndicesOptions(), expressions); List result = new ArrayList<>(expressions.size()); for (ExpressionList.Expression expression : new ExpressionList(context, expressions)) { validateAliasOrIndex(expression); @@ -1705,7 +1708,7 @@ public static List filterUnavailable(Context context, List expre */ @Nullable private static boolean ensureAliasOrIndexExists(Context context, String name) { - boolean ignoreUnavailable = context.getOptions().ignoreUnavailable(); + boolean ignoreUnavailable = context.getIndicesOptions().ignoreUnavailable(); IndexAbstraction indexAbstraction = context.getState().getMetadata().getIndicesLookup().get(name); if (indexAbstraction == null) { if (ignoreUnavailable) { @@ -1715,7 +1718,7 @@ private static boolean ensureAliasOrIndexExists(Context context, String name) { } } // treat aliases as unavailable indices when ignoreAliases is set to true (e.g. delete index and update aliases api) - if (indexAbstraction.getType() == Type.ALIAS && context.getOptions().ignoreAliases()) { + if (indexAbstraction.getType() == Type.ALIAS && context.getIndicesOptions().ignoreAliases()) { if (ignoreUnavailable) { return false; } else { @@ -1799,7 +1802,7 @@ public ExpressionList(Context context, List expressionStrings) { boolean wildcardSeen = false; for (String expressionString : expressionStrings) { boolean isExclusion = expressionString.startsWith("-") && wildcardSeen; - if (context.getOptions().expandWildcardExpressions() && isWildcard(expressionString)) { + if (context.getIndicesOptions().expandWildcardExpressions() && isWildcard(expressionString)) { wildcardSeen = true; expressionsList.add(new Expression(expressionString, true, isExclusion)); } else { @@ -1824,8 +1827,8 @@ public Iterator iterator() { } /** - * This is a context for the DateMathExpressionResolver which does not require {@code IndicesOptions} or {@code ClusterState} - * since it uses only the start time to resolve expressions. + * This is a context for the DateMathExpressionResolver which does not require {@code IndicesOptions} or + * {@code DataStreamOptions} or {@code ClusterState} since it uses only the start time to resolve expressions. */ public static final class ResolverContext extends Context { public ResolverContext() { @@ -1842,7 +1845,12 @@ public ClusterState getState() { } @Override - public IndicesOptions getOptions() { + public IndicesOptions getIndicesOptions() { + throw new UnsupportedOperationException("should never be called"); + } + + @Override + public DataStreamOptions getDataStreamOptions() { throw new UnsupportedOperationException("should never be called"); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java index f9a744c41ca61..4be6e272d909e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java @@ -195,7 +195,7 @@ public void testExpression_CustomTimeZoneInIndexName() throws Exception { } Context context = new Context( this.context.getState(), - this.context.getOptions(), + this.context.getIndicesOptions(), now.toInstant().toEpochMilli(), SystemIndexAccessLevel.NONE, name -> false, diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/ExpressionListTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/ExpressionListTests.java index 5235ca9671318..072362408bb46 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/ExpressionListTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/ExpressionListTests.java @@ -302,7 +302,7 @@ private IndicesOptions getNoExpandWildcardsIndicesOptions() { private Context getContextWithOptions(IndicesOptions indicesOptions) { Context context = mock(Context.class); - when(context.getOptions()).thenReturn(indicesOptions); + when(context.getIndicesOptions()).thenReturn(indicesOptions); return context; } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index dadf68aa71f3f..2e6ae971c1ef6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -1484,10 +1484,10 @@ public void testIndexOptionsFailClosedIndicesAndAliases() { state, IndicesOptions.fromOptions( true, - contextICE.getOptions().allowNoIndices(), - contextICE.getOptions().expandWildcardsOpen(), - contextICE.getOptions().expandWildcardsClosed(), - contextICE.getOptions() + contextICE.getIndicesOptions().allowNoIndices(), + contextICE.getIndicesOptions().expandWildcardsOpen(), + contextICE.getIndicesOptions().expandWildcardsClosed(), + contextICE.getIndicesOptions() ), SystemIndexAccessLevel.NONE ); @@ -1523,10 +1523,10 @@ public void testIndexOptionsFailClosedIndicesAndAliases() { state, IndicesOptions.fromOptions( true, - context.getOptions().allowNoIndices(), - context.getOptions().expandWildcardsOpen(), - context.getOptions().expandWildcardsClosed(), - context.getOptions() + context.getIndicesOptions().allowNoIndices(), + context.getIndicesOptions().expandWildcardsOpen(), + context.getIndicesOptions().expandWildcardsClosed(), + context.getIndicesOptions() ), SystemIndexAccessLevel.NONE ); From d56de12714063bc98bdc5a3991a43a0752754d32 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Thu, 18 Jan 2024 14:37:57 +0200 Subject: [PATCH 14/14] Fix broken failure store in cat indices API --- .../cluster/state/ClusterStateRequest.java | 19 +++ .../state/ClusterStateRequestBuilder.java | 6 + .../BroadcastOperationRequestBuilder.java | 7 + .../metadata/IndexNameExpressionResolver.java | 49 +++++-- .../rest/action/cat/RestIndicesAction.java | 7 +- .../IndexNameExpressionResolverTests.java | 120 ++++++++++++++++++ 6 files changed, 192 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java index e9de49dcbf5b4..b710e4347e521 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java @@ -8,8 +8,10 @@ package org.elasticsearch.action.admin.cluster.state; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.DataStreamOptions; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.common.Strings; @@ -37,6 +39,7 @@ public class ClusterStateRequest extends MasterNodeReadRequest 1; @@ -1432,7 +1426,18 @@ private static Stream expandToOpenClosed(Context context, Stream indicesStateStream = indexAbstraction.getIndices().stream().map(context.state.metadata()::index); + Stream indicesStateStream = Stream.of(); + if (shouldIncludeBackingIndices(context.getDataStreamOptions())) { + indicesStateStream = indexAbstraction.getIndices().stream().map(context.state.metadata()::index); + } + if (indexAbstraction.getType() == Type.DATA_STREAM + && shouldIncludeFailureStoreIndices(context.getDataStreamOptions(), (DataStream) indexAbstraction)) { + DataStream dataStream = (DataStream) indexAbstraction; + indicesStateStream = Stream.concat( + indicesStateStream, + dataStream.getFailureIndices().stream().map(context.state.metadata()::index) + ); + } if (excludeState != null) { indicesStateStream = indicesStateStream.filter(indexMeta -> indexMeta.getState() != excludeState); } @@ -1444,6 +1449,7 @@ private static Stream expandToOpenClosed(Context context, Stream resolveEmptyOrTrivialWildcard(Context context) { final String[] allIndices = resolveEmptyOrTrivialWildcardToAllIndices( context.getIndicesOptions(), + context.getDataStreamOptions(), context.getState().metadata() ); if (context.systemIndexAccessLevel == SystemIndexAccessLevel.ALL) { @@ -1476,7 +1482,14 @@ private static List resolveEmptyOrTrivialWildcardWithAllowedSystemIndice }).toList(); } - private static String[] resolveEmptyOrTrivialWildcardToAllIndices(IndicesOptions options, Metadata metadata) { + private static String[] resolveEmptyOrTrivialWildcardToAllIndices( + IndicesOptions options, + DataStreamOptions dataStreamOptions, + Metadata metadata + ) { + if (shouldIncludeBackingIndices(dataStreamOptions) == false) { + return Strings.EMPTY_ARRAY; + } if (options.expandWildcardsOpen() && options.expandWildcardsClosed() && options.expandWildcardsHidden()) { return metadata.getConcreteAllIndices(); } else if (options.expandWildcardsOpen() && options.expandWildcardsClosed()) { @@ -1495,6 +1508,14 @@ private static String[] resolveEmptyOrTrivialWildcardToAllIndices(IndicesOptions } } + private static boolean shouldIncludeBackingIndices(DataStreamOptions dataStreamOptions) { + return DataStream.isFailureStoreEnabled() == false || dataStreamOptions.includeBackingIndices(); + } + + private static boolean shouldIncludeFailureStoreIndices(DataStreamOptions dataStreamOptions, DataStream dataStream) { + return DataStream.isFailureStoreEnabled() && dataStreamOptions.includeFailureIndices() && dataStream.isFailureStore(); + } + public static final class DateMathExpressionResolver { private static final DateFormatter DEFAULT_DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd"); diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java index 5b8b3ab1bf90f..2527c74046564 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java @@ -124,9 +124,10 @@ public RestResponse buildResponse(Void ignored) throws Exception { .execute(listeners.acquire(indexSettingsRef::set)); // The other requests just provide additional detail, and wildcards may be resolved differently depending on the type of - // request in the presence of security plugins, so set the IndicesOptions for all the sub-requests to be as inclusive as - // possible. + // request in the presence of security plugins, so set the IndicesOptions and DataStreamOptions for all the sub-requests + // to be as inclusive as possible. final IndicesOptions subRequestIndicesOptions = IndicesOptions.lenientExpandHidden(); + final DataStreamOptions subRequestDataStreamOptions = DataStreamOptions.INCLUDE_FAILURE_STORE; client.admin() .cluster() @@ -136,6 +137,7 @@ public RestResponse buildResponse(Void ignored) throws Exception { .setRoutingTable(true) .setIndices(indices) .setIndicesOptions(subRequestIndicesOptions) + .setDataStreamOptions(subRequestDataStreamOptions) .setMasterNodeTimeout(masterNodeTimeout) .execute(listeners.acquire(clusterStateRef::set)); @@ -143,6 +145,7 @@ public RestResponse buildResponse(Void ignored) throws Exception { .indices() .prepareStats(indices) .setIndicesOptions(subRequestIndicesOptions) + .setDataStreamOptions(subRequestDataStreamOptions) .all() .setIncludeUnloadedSegments(includeUnloadedSegments) .execute(listeners.acquire(indicesStatsRef::set)); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index 2e6ae971c1ef6..bb7aaadd0cfdd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -2737,12 +2737,14 @@ public void testDataStreamsWithFailureStore() { IndexMetadata index2 = createBackingIndex(dataStreamName, 2, epochMillis).build(); IndexMetadata failureIndex1 = createFailureIndex(dataStreamName, 1, epochMillis).build(); IndexMetadata failureIndex2 = createFailureIndex(dataStreamName, 2, epochMillis).build(); + IndexMetadata otherIndex = indexBuilder("my-other-index", Settings.EMPTY).state(State.OPEN).build(); Metadata.Builder mdBuilder = Metadata.builder() .put(index1, false) .put(index2, false) .put(failureIndex1, false) .put(failureIndex2, false) + .put(otherIndex, false) .put( newInstance( dataStreamName, @@ -2752,6 +2754,7 @@ public void testDataStreamsWithFailureStore() { ); ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build(); + // Test default with an exact data stream name { IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; Index[] result = indexNameExpressionResolver.concreteIndices(state, indicesOptions, true, "my-data-stream"); @@ -2760,6 +2763,7 @@ public void testDataStreamsWithFailureStore() { assertThat(result[1].getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 2, epochMillis))); } + // Test include failure store with an exact data stream name { IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; Index[] result = indexNameExpressionResolver.concreteIndices( @@ -2776,6 +2780,7 @@ public void testDataStreamsWithFailureStore() { assertThat(result[3].getName(), equalTo(DataStream.getDefaultFailureStoreName(dataStreamName, 2, epochMillis))); } + // Test only failure store with an exact data stream name { IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; Index[] result = indexNameExpressionResolver.concreteIndices( @@ -2789,6 +2794,121 @@ public void testDataStreamsWithFailureStore() { assertThat(result[0].getName(), equalTo(DataStream.getDefaultFailureStoreName(dataStreamName, 1, epochMillis))); assertThat(result[1].getName(), equalTo(DataStream.getDefaultFailureStoreName(dataStreamName, 2, epochMillis))); } + + // Test default without any expressions + { + IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; + Index[] result = indexNameExpressionResolver.concreteIndices(state, indicesOptions, true); + assertThat(result.length, equalTo(3)); + List indexNames = Arrays.stream(result).map(Index::getName).toList(); + assertThat( + indexNames, + containsInAnyOrder( + DataStream.getDefaultBackingIndexName(dataStreamName, 2, epochMillis), + DataStream.getDefaultBackingIndexName(dataStreamName, 1, epochMillis), + otherIndex.getIndex().getName() + ) + ); + } + + // Test include failure store without any expressions + { + IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; + Index[] result = indexNameExpressionResolver.concreteIndices( + state, + indicesOptions, + DataStreamOptions.INCLUDE_FAILURE_STORE, + true + ); + assertThat(result.length, equalTo(5)); + List indexNames = Arrays.stream(result).map(Index::getName).toList(); + assertThat( + indexNames, + containsInAnyOrder( + DataStream.getDefaultBackingIndexName(dataStreamName, 2, epochMillis), + DataStream.getDefaultBackingIndexName(dataStreamName, 1, epochMillis), + DataStream.getDefaultFailureStoreName(dataStreamName, 2, epochMillis), + DataStream.getDefaultFailureStoreName(dataStreamName, 1, epochMillis), + otherIndex.getIndex().getName() + ) + ); + } + + // Test only failure store without any expressions + { + IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; + Index[] result = indexNameExpressionResolver.concreteIndices(state, indicesOptions, DataStreamOptions.ONLY_FAILURE_STORE, true); + assertThat(result.length, equalTo(2)); + List indexNames = Arrays.stream(result).map(Index::getName).toList(); + assertThat( + indexNames, + containsInAnyOrder( + DataStream.getDefaultFailureStoreName(dataStreamName, 2, epochMillis), + DataStream.getDefaultFailureStoreName(dataStreamName, 1, epochMillis) + ) + ); + } + + // Test default with wildcard expression + { + IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; + Index[] result = indexNameExpressionResolver.concreteIndices(state, indicesOptions, true, "my-*"); + assertThat(result.length, equalTo(3)); + List indexNames = Arrays.stream(result).map(Index::getName).toList(); + assertThat( + indexNames, + containsInAnyOrder( + DataStream.getDefaultBackingIndexName(dataStreamName, 2, epochMillis), + DataStream.getDefaultBackingIndexName(dataStreamName, 1, epochMillis), + otherIndex.getIndex().getName() + ) + ); + } + + // Test include failure store with wildcard expression + { + IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; + Index[] result = indexNameExpressionResolver.concreteIndices( + state, + indicesOptions, + DataStreamOptions.INCLUDE_FAILURE_STORE, + true, + "my-*" + ); + assertThat(result.length, equalTo(5)); + List indexNames = Arrays.stream(result).map(Index::getName).toList(); + assertThat( + indexNames, + containsInAnyOrder( + DataStream.getDefaultBackingIndexName(dataStreamName, 2, epochMillis), + DataStream.getDefaultBackingIndexName(dataStreamName, 1, epochMillis), + DataStream.getDefaultFailureStoreName(dataStreamName, 2, epochMillis), + DataStream.getDefaultFailureStoreName(dataStreamName, 1, epochMillis), + otherIndex.getIndex().getName() + ) + ); + } + + // Test only failure store with wildcard expression + { + IndicesOptions indicesOptions = IndicesOptions.STRICT_EXPAND_OPEN; + Index[] result = indexNameExpressionResolver.concreteIndices( + state, + indicesOptions, + DataStreamOptions.ONLY_FAILURE_STORE, + true, + "my-*" + ); + assertThat(result.length, equalTo(2)); + List indexNames = Arrays.stream(result).map(Index::getName).toList(); + assertThat( + indexNames, + containsInAnyOrder( + DataStream.getDefaultFailureStoreName(dataStreamName, 2, epochMillis), + DataStream.getDefaultFailureStoreName(dataStreamName, 1, epochMillis) + ) + ); + } } public void testDataStreamAliases() {