diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java new file mode 100644 index 0000000000000..1aee1e5ecc7da --- /dev/null +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/FailureStoreQueryParamIT.java @@ -0,0 +1,184 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.datastreams; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.hamcrest.MatcherAssert; +import org.junit.Before; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * This should be a yaml test, but in order to write one we would need to expose the new parameter in the rest-api-spec. + * We do not want to do that until the feature flag is removed. For this reason, we temporarily, test the affected APIs here. + * Please convert this to a yaml test when the feature flag is removed. + */ +public class FailureStoreQueryParamIT extends DisabledSecurityDataStreamTestCase { + + private static final String DATA_STREAM_NAME = "failure-data-stream"; + private String backingIndex; + private String failureStoreIndex; + + @SuppressWarnings("unchecked") + @Before + public void setup() throws IOException { + Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/ds-template"); + putComposableIndexTemplateRequest.setJsonEntity(""" + { + "index_patterns": ["failure-data-stream"], + "template": { + "settings": { + "number_of_replicas": 0 + } + }, + "data_stream": { + "failure_store": true + } + } + """); + assertOK(client().performRequest(putComposableIndexTemplateRequest)); + + assertOK(client().performRequest(new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME))); + ensureGreen(DATA_STREAM_NAME); + + final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME)); + List dataStreams = (List) entityAsMap(dataStreamResponse).get("data_streams"); + MatcherAssert.assertThat(dataStreams.size(), is(1)); + Map dataStream = (Map) dataStreams.get(0); + MatcherAssert.assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME)); + List backingIndices = getBackingIndices(dataStream); + MatcherAssert.assertThat(backingIndices.size(), is(1)); + List failureStore = getFailureStore(dataStream); + MatcherAssert.assertThat(failureStore.size(), is(1)); + backingIndex = backingIndices.get(0); + failureStoreIndex = failureStore.get(0); + } + + public void testGetIndexApi() throws IOException { + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME)); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(2)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "?failure_store=exclude")); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "?failure_store=only")); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + } + + @SuppressWarnings("unchecked") + public void testGetIndexStatsApi() throws IOException { + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_stats")); + Map indices = (Map) entityAsMap(indicesResponse).get("indices"); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_stats?failure_store=include") + ); + Map indices = (Map) entityAsMap(indicesResponse).get("indices"); + MatcherAssert.assertThat(indices.size(), is(2)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_stats?failure_store=only") + ); + Map indices = (Map) entityAsMap(indicesResponse).get("indices"); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + } + + public void testGetIndexSettingsApi() throws IOException { + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_settings")); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_settings?failure_store=include") + ); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(2)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_settings?failure_store=only") + ); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + } + + public void testGetIndexMappingApi() throws IOException { + { + final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_mapping")); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_mapping?failure_store=include") + ); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(2)); + MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + { + final Response indicesResponse = client().performRequest( + new Request("GET", "/" + DATA_STREAM_NAME + "/_mapping?failure_store=only") + ); + Map indices = entityAsMap(indicesResponse); + MatcherAssert.assertThat(indices.size(), is(1)); + MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true)); + } + } + + private List getBackingIndices(Map response) { + return getIndices(response, "indices"); + } + + private List getFailureStore(Map response) { + return getIndices(response, "failure_indices"); + + } + + @SuppressWarnings("unchecked") + private List getIndices(Map response, String fieldName) { + List> indices = (List>) response.get(fieldName); + return indices.stream().map(index -> index.get("index_name")).toList(); + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 7b3ca0e2f069a..ab8e811237607 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -191,6 +191,7 @@ static TransportVersion def(int id) { public static final TransportVersion NESTED_KNN_MORE_INNER_HITS = def(8_577_00_0); public static final TransportVersion REQUIRE_DATA_STREAM_ADDED = def(8_578_00_0); public static final TransportVersion ML_INFERENCE_COHERE_EMBEDDINGS_ADDED = def(8_579_00_0); + public static final TransportVersion ADD_FAILURE_STORE_OPTIONS = def(8_579_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/support/IndicesOptions.java b/server/src/main/java/org/elasticsearch/action/support/IndicesOptions.java index 5a168e1695e55..0aefe5ed65c1f 100644 --- a/server/src/main/java/org/elasticsearch/action/support/IndicesOptions.java +++ b/server/src/main/java/org/elasticsearch/action/support/IndicesOptions.java @@ -10,6 +10,7 @@ import joptsimple.internal.Strings; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -44,9 +45,12 @@ * @param generalOptions, applies to all the resolved indices and defines if throttled will be included and if certain type of * aliases or indices are allowed, or they will throw an error. */ -public record IndicesOptions(ConcreteTargetOptions concreteTargetOptions, WildcardOptions wildcardOptions, GeneralOptions generalOptions) - implements - ToXContentFragment { +public record IndicesOptions( + ConcreteTargetOptions concreteTargetOptions, + WildcardOptions wildcardOptions, + GeneralOptions generalOptions, + FailureStoreOptions failureStoreOptions +) implements ToXContentFragment { public static IndicesOptions.Builder newBuilder() { return new Builder(); @@ -381,6 +385,73 @@ public static Builder newBuilder(GeneralOptions generalOptions) { } } + public record FailureStoreOptions(boolean includeBackingIndices, boolean includeFailureIndices) implements ToXContentFragment { + + public static final FailureStoreOptions DEFAULT = newBuilder().build(); + + public static Builder newBuilder() { + return new Builder(); + } + + public static Builder newBuilder(FailureStoreOptions failureStoreOptions) { + return new Builder(failureStoreOptions); + } + + public static FailureStoreOptions fromRequest(RestRequest request, FailureStoreOptions defaultOption) { + String failureStoreString = request.param("failure_store"); + if (failureStoreString == null) { + return defaultOption == null ? DEFAULT : defaultOption; + } + return switch (failureStoreString) { + case "only" -> new FailureStoreOptions(false, true); + case "exclude" -> DEFAULT; + case "include" -> new FailureStoreOptions(true, true); + default -> throw new IllegalArgumentException( + "parameter [failure_store] does not support value [" + failureStoreString + "]" + ); + }; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + String value; + if (includeFailureIndices && includeBackingIndices == false) { + value = "only"; + } else if (includeFailureIndices) { + value = "include"; + } else { + value = "exclude"; + } + return builder.field("failure_store", value); + } + + public static class Builder { + private boolean includeBackingIndices; + private boolean includeFailureIndices; + + Builder() {} + + Builder(FailureStoreOptions failureStoreOptions) { + includeBackingIndices = failureStoreOptions.includeBackingIndices(); + includeFailureIndices = failureStoreOptions.includeFailureIndices(); + } + + public Builder includeBackingIndices(boolean includeBackingIndices) { + this.includeBackingIndices = includeBackingIndices; + return this; + } + + public Builder includeFailureIndices(boolean includeFailureIndices) { + this.includeFailureIndices = includeFailureIndices; + return this; + } + + public FailureStoreOptions build() { + return new FailureStoreOptions(includeBackingIndices, includeFailureIndices); + } + } + } + /** * This class is maintained for backwards compatibility and performance purposes. We use it for serialisation along with {@link Option}. */ @@ -411,7 +482,9 @@ private enum Option { ERROR_WHEN_ALIASES_TO_MULTIPLE_INDICES, ERROR_WHEN_CLOSED_INDICES, - EXCLUDE_THROTTLED; + EXCLUDE_THROTTLED, + EXCLUDE_BACKING_INDICES, + INCLUDE_FAILURE_STORE; } private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(IndicesOptions.class); @@ -539,6 +612,14 @@ public boolean ignoreThrottled() { return generalOptions().removeThrottled(); } + public boolean includeBackingIndices() { + return failureStoreOptions().includeBackingIndices(); + } + + public boolean includeFailureIndices() { + return failureStoreOptions().includeFailureIndices(); + } + public void writeIndicesOptions(StreamOutput out) throws IOException { EnumSet