From 604ffea97c9d294eaa6f6f827d3dddf8e0a9c13b Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 13 Oct 2021 11:51:32 -0500 Subject: [PATCH 01/13] REST endpoint for data stream modification service --- .../api/indices.modify_data_stream.json | 27 +++ .../cluster/metadata/DataStreamAction.java | 170 ++++++++++++++---- .../metadata/MetadataDataStreamsService.java | 90 ++++++++-- .../java/org/elasticsearch/node/Node.java | 3 + .../metadata/DataStreamActionTests.java | 50 ++++++ .../core/action/ModifyDataStreamsAction.java | 21 +++ .../xpack/datastreams/DataStreamsPlugin.java | 10 +- .../ModifyDataStreamTransportAction.java | 68 +++++++ .../rest/RestModifyDataStreamAction.java | 50 ++++++ 9 files changed, 438 insertions(+), 51 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.modify_data_stream.json create mode 100644 server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamActionTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/ModifyDataStreamsAction.java create mode 100644 x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java create mode 100644 x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.modify_data_stream.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.modify_data_stream.json new file mode 100644 index 000000000000..7a68b8abbe07 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.modify_data_stream.json @@ -0,0 +1,27 @@ +{ + "indices.modify_data_stream":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Creates a data stream" + }, + "stability":"stable", + "visibility":"public", + "headers":{ + "accept": [ "application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_data_stream/_modify", + "methods":["POST"] + } + ] + }, + "params":{ + }, + "body":{ + "description":"The data stream modifications", + "required":true + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java index 63e9d0e10ed0..08d4d96b056e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java @@ -9,73 +9,173 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.function.Supplier; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; /** - * Operations on data streams + * Operations on data streams. Currently supports adding and removing backing indices. */ -public abstract class DataStreamAction { - private final String dataStream; +public class DataStreamAction implements Writeable, ToXContentObject { + + private static final ParseField DATA_STREAM = new ParseField("data_stream"); + private static final ParseField INDEX = new ParseField("index"); + + private static final ParseField ADD_BACKING_INDEX = new ParseField("add_backing_index"); + private static final ParseField REMOVE_BACKING_INDEX = new ParseField("remove_backing_index"); + + public enum Type { + ADD_BACKING_INDEX((byte) 0, DataStreamAction.ADD_BACKING_INDEX), + REMOVE_BACKING_INDEX((byte) 1, DataStreamAction.REMOVE_BACKING_INDEX); + + private final byte value; + private final String fieldName; + + Type(byte value, ParseField field) { + this.value = value; + this.fieldName = field.getPreferredName(); + } + + public byte value() { + return value; + } + + public static Type fromValue(byte value) { + switch (value) { + case 0: return ADD_BACKING_INDEX; + case 1: return REMOVE_BACKING_INDEX; + default: throw new IllegalArgumentException("no data stream action type for [" + value + "]"); + } + } + } + + private final Type type; + private String dataStream; + private String index; public static DataStreamAction addBackingIndex(String dataStream, String index) { - return new DataStreamAction.AddBackingIndex(dataStream, index); + return new DataStreamAction(Type.ADD_BACKING_INDEX, dataStream, index); } public static DataStreamAction removeBackingIndex(String dataStream, String index) { - return new DataStreamAction.RemoveBackingIndex(dataStream, index); + return new DataStreamAction(Type.REMOVE_BACKING_INDEX, dataStream, index); + } + + DataStreamAction(StreamInput in) throws IOException { + this.type = Type.fromValue(in.readByte()); + this.dataStream = in.readString(); + this.index = in.readString(); } - private DataStreamAction(String dataStream) { + private DataStreamAction(Type type, String dataStream, String index) { if (false == Strings.hasText(dataStream)) { throw new IllegalArgumentException("[data_stream] is required"); } + if (false == Strings.hasText(index)) { + throw new IllegalArgumentException("[index] is required"); + } + this.type = type; this.dataStream = dataStream; + this.index = index; + } + + DataStreamAction(Type type) { + this.type = type; } - /** - * Data stream on which the operation should act - */ public String getDataStream() { return dataStream; } - public static class AddBackingIndex extends DataStreamAction { - - private final String index; + public void setDataStream(String datastream) { + this.dataStream = datastream; + } - private AddBackingIndex(String dataStream, String index) { - super(dataStream); + public String getIndex() { + return index; + } - if (false == Strings.hasText(index)) { - throw new IllegalArgumentException("[index] is required"); - } + public void setIndex(String index) { + this.index = index; + } - this.index = index; - } + public Type getType() { + return type; + } - public String getIndex() { - return index; - } + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject(type.fieldName); + builder.field(DATA_STREAM.getPreferredName(), dataStream); + builder.field(INDEX.getPreferredName(), index); + builder.endObject(); + builder.endObject(); + return builder; + } + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByte(type.value()); + out.writeString(dataStream); + out.writeString(index); } - public static class RemoveBackingIndex extends DataStreamAction { + public static DataStreamAction fromXContent(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } - private final String index; + private static final ObjectParser ADD_BACKING_INDEX_PARSER = parser( + ADD_BACKING_INDEX.getPreferredName(), + () -> new DataStreamAction(Type.ADD_BACKING_INDEX) + ); + private static final ObjectParser REMOVE_BACKING_INDEX_PARSER = parser( + REMOVE_BACKING_INDEX.getPreferredName(), + () -> new DataStreamAction(Type.REMOVE_BACKING_INDEX) + ); + static { + ADD_BACKING_INDEX_PARSER.declareField(DataStreamAction::setDataStream, XContentParser::text, DATA_STREAM, + ObjectParser.ValueType.STRING); + ADD_BACKING_INDEX_PARSER.declareField(DataStreamAction::setIndex, XContentParser::text, INDEX, ObjectParser.ValueType.STRING); + REMOVE_BACKING_INDEX_PARSER.declareField(DataStreamAction::setDataStream, XContentParser::text, DATA_STREAM, + ObjectParser.ValueType.STRING); + REMOVE_BACKING_INDEX_PARSER.declareField(DataStreamAction::setIndex, XContentParser::text, INDEX, ObjectParser.ValueType.STRING); + } - private RemoveBackingIndex(String dataStream, String index) { - super(dataStream); + private static ObjectParser parser(String name, Supplier supplier) { + ObjectParser parser = new ObjectParser<>(name, supplier); + return parser; + } - if (false == Strings.hasText(index)) { - throw new IllegalArgumentException("[index] is required"); + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "data_stream_action", a -> { + // Take the first action and error if there is more than one action + DataStreamAction action = null; + for (Object o : a) { + if (o != null) { + if (action == null) { + action = (DataStreamAction) o; + } else { + throw new IllegalArgumentException("too many data stream operations declared on operation entry"); + } } - - this.index = index; - } - - public String getIndex() { - return index; } - + return action; + }); + static { + PARSER.declareObject(optionalConstructorArg(), ADD_BACKING_INDEX_PARSER, ADD_BACKING_INDEX); + PARSER.declareObject(optionalConstructorArg(), REMOVE_BACKING_INDEX_PARSER, REMOVE_BACKING_INDEX); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 5dff916955b0..428643243d97 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -9,21 +9,33 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.function.Function; +import static org.elasticsearch.action.ValidateActions.addValidationError; + /** * Handles data stream modification requests. */ @@ -37,9 +49,9 @@ public MetadataDataStreamsService(ClusterService clusterService, IndicesService this.indicesService = indicesService; } - public void updateBackingIndices(final ModifyDataStreamRequest request, - final ActionListener listener) { - if (request.actions().size() == 0) { + public void modifyDataStream(final ModifyDataStreamRequest request, + final ActionListener listener) { + if (request.getActions().size() == 0) { listener.onResponse(AcknowledgedResponse.TRUE); } else { clusterService.submitStateUpdateTask("update-backing-indices", @@ -48,7 +60,7 @@ public void updateBackingIndices(final ModifyDataStreamRequest request, public ClusterState execute(ClusterState currentState) { return modifyDataStream( currentState, - request.actions(), + request.getActions(), indexMetadata -> { try { return indicesService.createIndexMapperService(indexMetadata); @@ -66,7 +78,7 @@ public ClusterState execute(ClusterState currentState) { * Computes the resulting cluster state after applying all requested data stream modifications in order. * * @param currentState current cluster state - * @param actions ordered list of modifications to perform + * @param actions ordered list of modifications to perform * @return resulting cluster state after all modifications have been performed */ static ClusterState modifyDataStream( @@ -78,20 +90,20 @@ static ClusterState modifyDataStream( for (var action : actions) { Metadata.Builder builder = Metadata.builder(updatedMetadata); - if (action instanceof DataStreamAction.AddBackingIndex) { + if (action.getType() == DataStreamAction.Type.ADD_BACKING_INDEX) { addBackingIndex( updatedMetadata, builder, mapperSupplier, action.getDataStream(), - ((DataStreamAction.AddBackingIndex) action).getIndex() + action.getIndex() ); - } else if (action instanceof DataStreamAction.RemoveBackingIndex) { + } else if (action.getType() == DataStreamAction.Type.REMOVE_BACKING_INDEX) { removeBackingIndex( updatedMetadata, builder, action.getDataStream(), - ((DataStreamAction.RemoveBackingIndex) action).getIndex() + action.getIndex() ); } else { throw new IllegalStateException("unsupported data stream action type [" + action.getClass().getName() + "]"); @@ -155,17 +167,67 @@ private static IndexAbstraction validateIndex(Metadata metadata, String indexNam return index; } - public static final class ModifyDataStreamRequest extends ClusterStateUpdateRequest { + public static final class ModifyDataStreamRequest + extends AcknowledgedRequest + implements IndicesRequest, ToXContentObject { + + private static final IndicesOptions INDICES_OPTIONS = IndicesOptions.fromOptions(false, false, true, false, true, false, true, false); + + private List actions; - private final List actions; + public ModifyDataStreamRequest() {} + + public ModifyDataStreamRequest(StreamInput in) throws IOException { + super(in); + actions = in.readList(DataStreamAction::new); + } public ModifyDataStreamRequest(List actions) { this.actions = Collections.unmodifiableList(actions); } - public List actions() { + public List getActions() { return actions; } - } + public void setActions(List actions) { + this.actions = Collections.unmodifiableList(actions); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("actions"); + for (DataStreamAction action : actions) { + action.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public ActionRequestValidationException validate() { + if (actions.isEmpty()) { + return addValidationError("must specify at least one data stream modification action", null); + } + return null; + } + + public static final ObjectParser PARSER = + new ObjectParser<>("data_stream_actions", ModifyDataStreamRequest::new); + static { + PARSER.declareObjectArray(ModifyDataStreamRequest::setActions, DataStreamAction.PARSER, new ParseField("actions")); + } + + @Override + public String[] indices() { + return actions.stream().map(DataStreamAction::getDataStream).toArray(String[]::new); + } + + @Override + public IndicesOptions indicesOptions() { + return INDICES_OPTIONS; + } + } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 45211fdb917b..d3eeb8e970df 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -45,6 +45,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService; import org.elasticsearch.cluster.metadata.TemplateUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -563,6 +564,7 @@ protected Node(final Environment initialEnvironment, final MetadataCreateDataStreamService metadataCreateDataStreamService = new MetadataCreateDataStreamService(threadPool, clusterService, metadataCreateIndexService); + final MetadataDataStreamsService metadataDataStreamsService = new MetadataDataStreamsService(clusterService, indicesService); Collection pluginComponents = pluginsService.filterPlugins(Plugin.class).stream() .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService, @@ -694,6 +696,7 @@ protected Node(final Environment initialEnvironment, b.bind(AliasValidator.class).toInstance(aliasValidator); b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService); b.bind(MetadataCreateDataStreamService.class).toInstance(metadataCreateDataStreamService); + b.bind(MetadataDataStreamsService.class).toInstance(metadataDataStreamsService); b.bind(SearchService.class).toInstance(searchService); b.bind(SearchTransportService.class).toInstance(searchTransportService); b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::aggReduceContextBuilder)); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamActionTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamActionTests.java new file mode 100644 index 000000000000..3509cdcb5959 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamActionTests.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class DataStreamActionTests extends ESTestCase { + + public void testToAndFromXContent() throws IOException { + DataStreamAction action = createTestInstance(); + XContentType xContentType = randomFrom(XContentType.values()); + + BytesReference shuffled = toShuffledXContent(action, xContentType, ToXContent.EMPTY_PARAMS, true); + + DataStreamAction parsedAction; + try (XContentParser parser = createParser(xContentType.xContent(), shuffled)) { + parsedAction = DataStreamAction.fromXContent(parser); + assertNull(parser.nextToken()); + } + + assertThat(parsedAction.getType(), equalTo(action.getType())); + assertThat(parsedAction.getDataStream(), equalTo(action.getDataStream())); + assertThat(parsedAction.getIndex(), equalTo(action.getIndex())); + } + + private DataStreamAction createTestInstance() { + DataStreamAction action = new DataStreamAction(randomBoolean() + ? DataStreamAction.Type.ADD_BACKING_INDEX + : DataStreamAction.Type.REMOVE_BACKING_INDEX + ); + action.setDataStream(randomAlphaOfLength(8)); + action.setIndex(randomAlphaOfLength(8)); + return action; + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/ModifyDataStreamsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/ModifyDataStreamsAction.java new file mode 100644 index 000000000000..0811ee2479fd --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/ModifyDataStreamsAction.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedResponse; + +public class ModifyDataStreamsAction extends ActionType { + + public static final ModifyDataStreamsAction INSTANCE = new ModifyDataStreamsAction(); + public static final String NAME = "indices:admin/data_stream/modify"; + + private ModifyDataStreamsAction() { + super(NAME, AcknowledgedResponse::readFrom); + } +} diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java index 3c5e954e68ce..3e1337cc8f8d 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.action.DataStreamsStatsAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; import org.elasticsearch.xpack.core.action.GetDataStreamAction; +import org.elasticsearch.xpack.core.action.ModifyDataStreamsAction; import org.elasticsearch.xpack.core.action.PromoteDataStreamAction; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; @@ -36,12 +37,14 @@ import org.elasticsearch.xpack.datastreams.action.DeleteDataStreamTransportAction; import org.elasticsearch.xpack.datastreams.action.GetDataStreamsTransportAction; import org.elasticsearch.xpack.datastreams.action.MigrateToDataStreamTransportAction; +import org.elasticsearch.xpack.datastreams.action.ModifyDataStreamTransportAction; import org.elasticsearch.xpack.datastreams.action.PromoteDataStreamTransportAction; import org.elasticsearch.xpack.datastreams.rest.RestCreateDataStreamAction; import org.elasticsearch.xpack.datastreams.rest.RestDataStreamsStatsAction; import org.elasticsearch.xpack.datastreams.rest.RestDeleteDataStreamAction; import org.elasticsearch.xpack.datastreams.rest.RestGetDataStreamsAction; import org.elasticsearch.xpack.datastreams.rest.RestMigrateToDataStreamAction; +import org.elasticsearch.xpack.datastreams.rest.RestModifyDataStreamAction; import org.elasticsearch.xpack.datastreams.rest.RestPromoteDataStreamAction; import java.util.List; @@ -65,6 +68,7 @@ public Map getMetadataMappers() { var dsInfoAction = new ActionHandler<>(XPackInfoFeatureAction.DATA_STREAMS, DataStreamInfoTransportAction.class); var migrateAction = new ActionHandler<>(MigrateToDataStreamAction.INSTANCE, MigrateToDataStreamTransportAction.class); var promoteAction = new ActionHandler<>(PromoteDataStreamAction.INSTANCE, PromoteDataStreamTransportAction.class); + var modifyAction = new ActionHandler<>(ModifyDataStreamsAction.INSTANCE, ModifyDataStreamTransportAction.class); return List.of( createDsAction, deleteDsInfoAction, @@ -73,7 +77,8 @@ public Map getMetadataMappers() { dsUsageAction, dsInfoAction, migrateAction, - promoteAction + promoteAction, + modifyAction ); } @@ -93,6 +98,7 @@ public List getRestHandlers( var dsStatsAction = new RestDataStreamsStatsAction(); var migrateAction = new RestMigrateToDataStreamAction(); var promoteAction = new RestPromoteDataStreamAction(); - return List.of(createDsAction, deleteDsAction, getDsAction, dsStatsAction, migrateAction, promoteAction); + var modifyAction = new RestModifyDataStreamAction(); + return List.of(createDsAction, deleteDsAction, getDsAction, dsStatsAction, migrateAction, promoteAction, modifyAction); } } diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java new file mode 100644 index 000000000000..d7b5ffe27c39 --- /dev/null +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.datastreams.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.action.ModifyDataStreamsAction; + +public class ModifyDataStreamTransportAction + extends AcknowledgedTransportMasterNodeAction { + + private final MetadataDataStreamsService metadataDataStreamsService; + + @Inject + public ModifyDataStreamTransportAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + MetadataDataStreamsService metadataDataStreamsService + ) { + super( + ModifyDataStreamsAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + MetadataDataStreamsService.ModifyDataStreamRequest::new, + indexNameExpressionResolver, + ThreadPool.Names.SAME + ); + this.metadataDataStreamsService = metadataDataStreamsService; + } + + @Override + protected void masterOperation( + Task task, + MetadataDataStreamsService.ModifyDataStreamRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + metadataDataStreamsService.modifyDataStream(request, listener); + } + + @Override + protected ClusterBlockException checkBlock(MetadataDataStreamsService.ModifyDataStreamRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + +} diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java new file mode 100644 index 000000000000..d0f856e53646 --- /dev/null +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.datastreams.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.core.action.ModifyDataStreamsAction; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestModifyDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "modify_data_stream_action"; + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_data_stream/_modify")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + MetadataDataStreamsService.ModifyDataStreamRequest modifyDsRequest = new MetadataDataStreamsService.ModifyDataStreamRequest(); + modifyDsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", modifyDsRequest.masterNodeTimeout())); + modifyDsRequest.timeout(request.paramAsTime("timeout", modifyDsRequest.timeout())); + try (XContentParser parser = request.contentParser()) { + MetadataDataStreamsService.ModifyDataStreamRequest.PARSER.parse(parser, modifyDsRequest, null); + } + if (modifyDsRequest.getActions() == null || modifyDsRequest.getActions().isEmpty()) { + throw new IllegalArgumentException("no data stream actions specified"); + } + return channel -> client.execute(ModifyDataStreamsAction.INSTANCE, modifyDsRequest, new RestToXContentListener<>(channel)); + } + +} + From 62c067c2c811c7290491880e3121b4a46e312a60 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Fri, 15 Oct 2021 12:47:05 -0500 Subject: [PATCH 02/13] no one defies the checkstyle gods --- .../datastreams/action/ModifyDataStreamTransportAction.java | 4 ++-- .../xpack/datastreams/rest/RestModifyDataStreamAction.java | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java index d7b5ffe27c39..0e44366262a5 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java @@ -23,8 +23,8 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.ModifyDataStreamsAction; -public class ModifyDataStreamTransportAction - extends AcknowledgedTransportMasterNodeAction { +public class ModifyDataStreamTransportAction extends AcknowledgedTransportMasterNodeAction< + MetadataDataStreamsService.ModifyDataStreamRequest> { private final MetadataDataStreamsService metadataDataStreamsService; diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java index d0f856e53646..3692dde598a2 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java @@ -47,4 +47,3 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli } } - From 8b40466f79bb9a6b9cb8697bf32a69bf6c374349 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Fri, 15 Oct 2021 12:49:38 -0500 Subject: [PATCH 03/13] especially not in the server module! --- .../cluster/metadata/MetadataDataStreamsService.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 428643243d97..4512241b10f2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -18,7 +18,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.MapperService; @@ -29,7 +28,6 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.function.Function; @@ -171,7 +169,8 @@ public static final class ModifyDataStreamRequest extends AcknowledgedRequest implements IndicesRequest, ToXContentObject { - private static final IndicesOptions INDICES_OPTIONS = IndicesOptions.fromOptions(false, false, true, false, true, false, true, false); + private static final IndicesOptions INDICES_OPTIONS = + IndicesOptions.fromOptions(false, false, true, false, true, false, true, false); private List actions; From f485e11bcc63e9bca15333a66ca90d0ce1b00739 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Fri, 15 Oct 2021 14:15:56 -0500 Subject: [PATCH 04/13] Classify action as non-operator and add stub for YAML test --- .../xpack/security/operator/Constants.java | 1 + .../data_stream/170_modify_data_stream.yml | 58 +++++++++++++++++++ 2 files changed, 59 insertions(+) create mode 100644 x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index c82494caeaf7..bd9ebb2dfe57 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -354,6 +354,7 @@ public class Constants { "indices:admin/data_stream/delete", "indices:admin/data_stream/get", "indices:admin/data_stream/migrate", + "indices:admin/data_stream/modify", "indices:admin/data_stream/promote", "indices:admin/delete", "indices:admin/flush", diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml new file mode 100644 index 000000000000..c56595e29cc9 --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml @@ -0,0 +1,58 @@ +--- +"Modify a data stream": + - skip: + version: " - 7.99.99" + reason: "change to 7.15.99 after backporting" + features: allowed_warnings + + - do: + allowed_warnings: + - "index template [my-template] has index patterns [data-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation" + indices.put_index_template: + name: my-template + body: + index_patterns: [data-*] + data_stream: {} + + - do: + indices.create_data_stream: + name: data-stream-for-rollover + - is_true: acknowledged + + # rollover data stream to create new backing index + - do: + indices.rollover: + alias: "data-stream-for-rollover" + + - match: { old_index: "/\\.ds-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" } + - match: { new_index: "/\\.ds-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" } + - match: { rolled_over: true } + - match: { dry_run: false } + + # save the backing index names for later use + - do: + indices.get_data_stream: + name: data-stream-for-rollover + - set: { data_streams.0.indices.0.index_name: idx0name } + + # ensure new index is created + - do: + indices.exists: + index: $idx0name + + - is_true: '' + + - do: + indices.get_data_stream: + name: "*" + - match: { data_streams.0.name: data-stream-for-rollover } + - match: { data_streams.0.timestamp_field.name: '@timestamp' } + - match: { data_streams.0.generation: 2 } + - length: { data_streams.0.indices: 2 } + - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' } + - match: { data_streams.0.indices.1.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' } + + - do: + indices.delete_data_stream: + name: data-stream-for-rollover + - is_true: acknowledged From 2bfd2b45decfa8f287ec94931127b92a296f50a3 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Fri, 15 Oct 2021 15:35:31 -0500 Subject: [PATCH 05/13] add a test and a comment for IndicesOptions --- .../metadata/MetadataDataStreamsService.java | 4 ++ .../MetadataDataStreamsServiceTests.java | 51 +++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 4512241b10f2..b40a11c017cc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -169,6 +169,10 @@ public static final class ModifyDataStreamRequest extends AcknowledgedRequest implements IndicesRequest, ToXContentObject { + // relevant only for authorizing the request, so require every specified + // index to exist, expand wildcards only to open indices, prohibit + // wildcard expressions that resolve to zero indices, and do not attempt + // to resolve expressions as aliases private static final IndicesOptions INDICES_OPTIONS = IndicesOptions.fromOptions(false, false, true, false, true, false, true, false); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java index 92206ce8e5ea..0c861e174452 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java @@ -22,11 +22,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Locale; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -143,6 +145,55 @@ public void testRemoveBackingIndex() { assertNull(newState.metadata().getIndicesLookup().get(indexToRemove.getIndex().getName()).getParentDataStream()); } + public void testRemoveWriteIndexIsProhibited() { + final long epochMillis = System.currentTimeMillis(); + final int numBackingIndices = randomIntBetween(1, 4); + final String dataStreamName = randomAlphaOfLength(5); + IndexMetadata[] backingIndices = new IndexMetadata[numBackingIndices]; + Metadata.Builder mb = Metadata.builder(); + for (int k = 0; k < numBackingIndices; k++) { + backingIndices[k] = + IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k + 1, epochMillis)) + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .putMapping(generateMapping("@timestamp")) + .build(); + mb.put(backingIndices[k], false); + } + + mb.put(new DataStream( + dataStreamName, + createTimestampField("@timestamp"), + Arrays.stream(backingIndices).map(IndexMetadata::getIndex).collect(Collectors.toList()) + ) + ); + + final IndexMetadata indexToRemove = backingIndices[numBackingIndices - 1]; + ClusterState originalState = ClusterState.builder(new ClusterName("dummy")).metadata(mb.build()).build(); + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> MetadataDataStreamsService.modifyDataStream( + originalState, + List.of(DataStreamAction.removeBackingIndex(dataStreamName, indexToRemove.getIndex().getName())), + this::getMapperService + ) + ); + + assertThat( + e.getMessage(), + containsString( + String.format( + Locale.ROOT, + "cannot remove backing index [%s] of data stream [%s] because it is the write index", + indexToRemove.getIndex().getName(), + dataStreamName + ) + ) + ); + } + public void testAddRemoveAddRoundtripInSingleRequest() { final long epochMillis = System.currentTimeMillis(); final int numBackingIndices = randomIntBetween(1, 4); From 3c34705b803908195c69a1c4d635cb22ae31c312 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Fri, 15 Oct 2021 16:41:53 -0500 Subject: [PATCH 06/13] finalize YAML test --- .../cluster/metadata/DataStream.java | 2 +- .../data_stream/170_modify_data_stream.yml | 70 ++++++++++++++----- 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 0fa3ebe26187..714d7850dbfa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -214,7 +214,7 @@ public DataStream removeBackingIndex(Index index) { index.getName(), name )); } - if (generation == (backingIndexPosition + 1)) { + if (indices.size() == (backingIndexPosition + 1)) { throw new IllegalArgumentException(String.format( Locale.ROOT, "cannot remove backing index [%s] of data stream [%s] because it is the write index", diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml index c56595e29cc9..0c20ebf16aa3 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml @@ -16,43 +16,75 @@ - do: indices.create_data_stream: - name: data-stream-for-rollover + name: data-stream-for-modification - is_true: acknowledged # rollover data stream to create new backing index - do: indices.rollover: - alias: "data-stream-for-rollover" + alias: "data-stream-for-modification" + - is_true: acknowledged - - match: { old_index: "/\\.ds-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" } - - match: { new_index: "/\\.ds-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" } - - match: { rolled_over: true } - - match: { dry_run: false } + # save index names for later use + - do: + indices.get_data_stream: + name: data-stream-for-modification + - set: { data_streams.0.indices.0.index_name: first_index } + - set: { data_streams.0.indices.1.index_name: write_index } + + - do: + index: + index: test_index1 + body: { "foo": "bar1", "@timestamp": "2009-11-15T14:12:12" } + + - do: + indices.modify_data_stream: + body: + actions: + - add_backing_index: + data_stream: "data-stream-for-modification" + index: "test_index1" + - is_true: acknowledged - # save the backing index names for later use - do: indices.get_data_stream: - name: data-stream-for-rollover - - set: { data_streams.0.indices.0.index_name: idx0name } + name: "data-stream-for-modification" + - match: { data_streams.0.name: data-stream-for-modification } + - match: { data_streams.0.timestamp_field.name: '@timestamp' } + - match: { data_streams.0.generation: 3 } + - length: { data_streams.0.indices: 3 } + - match: { data_streams.0.indices.0.index_name: 'test_index1' } + - match: { data_streams.0.indices.1.index_name: $first_index } + - match: { data_streams.0.indices.2.index_name: $write_index } - # ensure new index is created - do: - indices.exists: - index: $idx0name + catch: bad_request + indices.modify_data_stream: + body: + actions: + - remove_backing_index: + data_stream: "data-stream-for-modification" + index: $write_index - - is_true: '' + - do: + indices.modify_data_stream: + body: + actions: + - remove_backing_index: + data_stream: "data-stream-for-modification" + index: "test_index1" - do: indices.get_data_stream: - name: "*" - - match: { data_streams.0.name: data-stream-for-rollover } + name: "data-stream-for-modification" + - match: { data_streams.0.name: data-stream-for-modification } - match: { data_streams.0.timestamp_field.name: '@timestamp' } - - match: { data_streams.0.generation: 2 } + - match: { data_streams.0.generation: 3 } - length: { data_streams.0.indices: 2 } - - match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' } - - match: { data_streams.0.indices.1.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' } + - match: { data_streams.0.indices.0.index_name: $first_index } + - match: { data_streams.0.indices.1.index_name: $write_index } - do: indices.delete_data_stream: - name: data-stream-for-rollover + name: data-stream-for-modification - is_true: acknowledged From 4acb6125674c41dadae00fff7130ae7f77502ba8 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Sat, 16 Oct 2021 06:01:49 -0500 Subject: [PATCH 07/13] more specific catch in test --- .../rest-api-spec/test/data_stream/170_modify_data_stream.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml index 0c20ebf16aa3..034c3701a2ec 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml @@ -58,7 +58,7 @@ - match: { data_streams.0.indices.2.index_name: $write_index } - do: - catch: bad_request + catch: /cannot remove backing index \[.*\] of data stream \[data-stream-for-modification\] because it is the write index/ indices.modify_data_stream: body: actions: From 9861b3718dd97ac5350efc756f0081353f45e892 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Sat, 16 Oct 2021 11:08:23 -0500 Subject: [PATCH 08/13] move request class into action class, move all classes from xpack to server module, consistent naming --- .../elasticsearch/action/ActionModule.java | 9 ++ .../datastreams/ModifyDataStreamsAction.java | 107 ++++++++++++++++++ .../ModifyDataStreamsTransportAction.java | 20 ++-- .../cluster/metadata/DataStreamAction.java | 2 +- .../metadata/MetadataDataStreamsService.java | 84 +------------- .../RestModifyDataStreamsAction.java | 16 +-- .../core/action/ModifyDataStreamsAction.java | 21 ---- .../xpack/datastreams/DataStreamsPlugin.java | 10 +- 8 files changed, 139 insertions(+), 130 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java rename x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java => server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsTransportAction.java (75%) rename x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java => server/src/main/java/org/elasticsearch/rest/action/datastreams/RestModifyDataStreamsAction.java (71%) delete mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/ModifyDataStreamsAction.java diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 197f1c567a41..5f5793d4afcc 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -189,6 +189,8 @@ import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; +import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; +import org.elasticsearch.action.datastreams.ModifyDataStreamsTransportAction; import org.elasticsearch.action.delete.DeleteAction; import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.explain.ExplainAction; @@ -370,6 +372,7 @@ import org.elasticsearch.rest.action.cat.RestTasksAction; import org.elasticsearch.rest.action.cat.RestTemplatesAction; import org.elasticsearch.rest.action.cat.RestThreadPoolAction; +import org.elasticsearch.rest.action.datastreams.RestModifyDataStreamsAction; import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.rest.action.document.RestDeleteAction; import org.elasticsearch.rest.action.document.RestGetAction; @@ -599,6 +602,9 @@ public void reg actions.register(AnalyzeIndexDiskUsageAction.INSTANCE, TransportAnalyzeIndexDiskUsageAction.class); actions.register(FieldUsageStatsAction.INSTANCE, TransportFieldUsageAction.class); + //Data streams + actions.register(ModifyDataStreamsAction.INSTANCE, ModifyDataStreamsTransportAction.class); + //Indexed scripts actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class); actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class); @@ -763,6 +769,9 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestReloadSecureSettingsAction()); + // Data streams + registerHandler.accept(new RestModifyDataStreamsAction()); + // Scripts API registerHandler.accept(new RestGetStoredScriptAction()); registerHandler.accept(new RestPutStoredScriptAction()); diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java new file mode 100644 index 000000000000..b27c0d96f412 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.DataStreamAction; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class ModifyDataStreamsAction extends ActionType { + + public static final ModifyDataStreamsAction INSTANCE = new ModifyDataStreamsAction(); + public static final String NAME = "indices:admin/data_stream/modify"; + + private ModifyDataStreamsAction() { + super(NAME, AcknowledgedResponse::readFrom); + } + + public static final class Request + extends AcknowledgedRequest + implements IndicesRequest, ToXContentObject { + + // relevant only for authorizing the request, so require every specified + // index to exist, expand wildcards only to open indices, prohibit + // wildcard expressions that resolve to zero indices, and do not attempt + // to resolve expressions as aliases + private static final IndicesOptions INDICES_OPTIONS = + IndicesOptions.fromOptions(false, false, true, false, true, false, true, false); + + private List actions; + + public Request() {} + + public Request(StreamInput in) throws IOException { + super(in); + actions = in.readList(DataStreamAction::new); + } + + public Request(List actions) { + this.actions = Collections.unmodifiableList(actions); + } + + public List getActions() { + return actions; + } + + public void setActions(List actions) { + this.actions = Collections.unmodifiableList(actions); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("actions"); + for (DataStreamAction action : actions) { + action.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public ActionRequestValidationException validate() { + if (actions.isEmpty()) { + return addValidationError("must specify at least one data stream modification action", null); + } + return null; + } + + public static final ObjectParser PARSER = + new ObjectParser<>("data_stream_actions", Request::new); + static { + PARSER.declareObjectArray(Request::setActions, DataStreamAction.PARSER, new ParseField("actions")); + } + + @Override + public String[] indices() { + return actions.stream().map(DataStreamAction::getDataStream).toArray(String[]::new); + } + + @Override + public IndicesOptions indicesOptions() { + return INDICES_OPTIONS; + } + } +} diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsTransportAction.java similarity index 75% rename from x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java rename to server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsTransportAction.java index 0e44366262a5..ff1d0e6ce075 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/ModifyDataStreamTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsTransportAction.java @@ -1,11 +1,12 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. */ -package org.elasticsearch.xpack.datastreams.action; +package org.elasticsearch.action.datastreams; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; @@ -21,15 +22,14 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.action.ModifyDataStreamsAction; -public class ModifyDataStreamTransportAction extends AcknowledgedTransportMasterNodeAction< - MetadataDataStreamsService.ModifyDataStreamRequest> { +public class ModifyDataStreamsTransportAction extends AcknowledgedTransportMasterNodeAction< + ModifyDataStreamsAction.Request> { private final MetadataDataStreamsService metadataDataStreamsService; @Inject - public ModifyDataStreamTransportAction( + public ModifyDataStreamsTransportAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, @@ -43,7 +43,7 @@ public ModifyDataStreamTransportAction( clusterService, threadPool, actionFilters, - MetadataDataStreamsService.ModifyDataStreamRequest::new, + ModifyDataStreamsAction.Request::new, indexNameExpressionResolver, ThreadPool.Names.SAME ); @@ -53,7 +53,7 @@ public ModifyDataStreamTransportAction( @Override protected void masterOperation( Task task, - MetadataDataStreamsService.ModifyDataStreamRequest request, + ModifyDataStreamsAction.Request request, ClusterState state, ActionListener listener ) throws Exception { @@ -61,7 +61,7 @@ protected void masterOperation( } @Override - protected ClusterBlockException checkBlock(MetadataDataStreamsService.ModifyDataStreamRequest request, ClusterState state) { + protected ClusterBlockException checkBlock(ModifyDataStreamsAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java index 08d4d96b056e..32bb68d355f3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java @@ -72,7 +72,7 @@ public static DataStreamAction removeBackingIndex(String dataStream, String inde return new DataStreamAction(Type.REMOVE_BACKING_INDEX, dataStream, index); } - DataStreamAction(StreamInput in) throws IOException { + public DataStreamAction(StreamInput in) throws IOException { this.type = Type.fromValue(in.readByte()); this.dataStream = in.readString(); this.index = in.readString(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index b40a11c017cc..934158970520 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -9,31 +9,19 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.xcontent.ObjectParser; -import org.elasticsearch.xcontent.ParseField; -import org.elasticsearch.xcontent.ToXContentObject; -import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; -import java.util.Collections; -import java.util.List; import java.util.function.Function; -import static org.elasticsearch.action.ValidateActions.addValidationError; - /** * Handles data stream modification requests. */ @@ -47,7 +35,7 @@ public MetadataDataStreamsService(ClusterService clusterService, IndicesService this.indicesService = indicesService; } - public void modifyDataStream(final ModifyDataStreamRequest request, + public void modifyDataStream(final ModifyDataStreamsAction.Request request, final ActionListener listener) { if (request.getActions().size() == 0) { listener.onResponse(AcknowledgedResponse.TRUE); @@ -165,72 +153,4 @@ private static IndexAbstraction validateIndex(Metadata metadata, String indexNam return index; } - public static final class ModifyDataStreamRequest - extends AcknowledgedRequest - implements IndicesRequest, ToXContentObject { - - // relevant only for authorizing the request, so require every specified - // index to exist, expand wildcards only to open indices, prohibit - // wildcard expressions that resolve to zero indices, and do not attempt - // to resolve expressions as aliases - private static final IndicesOptions INDICES_OPTIONS = - IndicesOptions.fromOptions(false, false, true, false, true, false, true, false); - - private List actions; - - public ModifyDataStreamRequest() {} - - public ModifyDataStreamRequest(StreamInput in) throws IOException { - super(in); - actions = in.readList(DataStreamAction::new); - } - - public ModifyDataStreamRequest(List actions) { - this.actions = Collections.unmodifiableList(actions); - } - - public List getActions() { - return actions; - } - - public void setActions(List actions) { - this.actions = Collections.unmodifiableList(actions); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.startArray("actions"); - for (DataStreamAction action : actions) { - action.toXContent(builder, params); - } - builder.endArray(); - builder.endObject(); - return builder; - } - - @Override - public ActionRequestValidationException validate() { - if (actions.isEmpty()) { - return addValidationError("must specify at least one data stream modification action", null); - } - return null; - } - - public static final ObjectParser PARSER = - new ObjectParser<>("data_stream_actions", ModifyDataStreamRequest::new); - static { - PARSER.declareObjectArray(ModifyDataStreamRequest::setActions, DataStreamAction.PARSER, new ParseField("actions")); - } - - @Override - public String[] indices() { - return actions.stream().map(DataStreamAction::getDataStream).toArray(String[]::new); - } - - @Override - public IndicesOptions indicesOptions() { - return INDICES_OPTIONS; - } - } } diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java b/server/src/main/java/org/elasticsearch/rest/action/datastreams/RestModifyDataStreamsAction.java similarity index 71% rename from x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java rename to server/src/main/java/org/elasticsearch/rest/action/datastreams/RestModifyDataStreamsAction.java index 3692dde598a2..bb651ff3b98e 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/rest/RestModifyDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/datastreams/RestModifyDataStreamsAction.java @@ -1,26 +1,26 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. */ -package org.elasticsearch.xpack.datastreams.rest; +package org.elasticsearch.rest.action.datastreams; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xpack.core.action.ModifyDataStreamsAction; +import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import java.io.IOException; import java.util.List; import static org.elasticsearch.rest.RestRequest.Method.POST; -public class RestModifyDataStreamAction extends BaseRestHandler { +public class RestModifyDataStreamsAction extends BaseRestHandler { @Override public String getName() { @@ -34,11 +34,11 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - MetadataDataStreamsService.ModifyDataStreamRequest modifyDsRequest = new MetadataDataStreamsService.ModifyDataStreamRequest(); + ModifyDataStreamsAction.Request modifyDsRequest = new ModifyDataStreamsAction.Request(); modifyDsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", modifyDsRequest.masterNodeTimeout())); modifyDsRequest.timeout(request.paramAsTime("timeout", modifyDsRequest.timeout())); try (XContentParser parser = request.contentParser()) { - MetadataDataStreamsService.ModifyDataStreamRequest.PARSER.parse(parser, modifyDsRequest, null); + ModifyDataStreamsAction.Request.PARSER.parse(parser, modifyDsRequest, null); } if (modifyDsRequest.getActions() == null || modifyDsRequest.getActions().isEmpty()) { throw new IllegalArgumentException("no data stream actions specified"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/ModifyDataStreamsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/ModifyDataStreamsAction.java deleted file mode 100644 index 0811ee2479fd..000000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/ModifyDataStreamsAction.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.core.action; - -import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.AcknowledgedResponse; - -public class ModifyDataStreamsAction extends ActionType { - - public static final ModifyDataStreamsAction INSTANCE = new ModifyDataStreamsAction(); - public static final String NAME = "indices:admin/data_stream/modify"; - - private ModifyDataStreamsAction() { - super(NAME, AcknowledgedResponse::readFrom); - } -} diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java index 3e1337cc8f8d..3c5e954e68ce 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java @@ -26,7 +26,6 @@ import org.elasticsearch.xpack.core.action.DataStreamsStatsAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; import org.elasticsearch.xpack.core.action.GetDataStreamAction; -import org.elasticsearch.xpack.core.action.ModifyDataStreamsAction; import org.elasticsearch.xpack.core.action.PromoteDataStreamAction; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; @@ -37,14 +36,12 @@ import org.elasticsearch.xpack.datastreams.action.DeleteDataStreamTransportAction; import org.elasticsearch.xpack.datastreams.action.GetDataStreamsTransportAction; import org.elasticsearch.xpack.datastreams.action.MigrateToDataStreamTransportAction; -import org.elasticsearch.xpack.datastreams.action.ModifyDataStreamTransportAction; import org.elasticsearch.xpack.datastreams.action.PromoteDataStreamTransportAction; import org.elasticsearch.xpack.datastreams.rest.RestCreateDataStreamAction; import org.elasticsearch.xpack.datastreams.rest.RestDataStreamsStatsAction; import org.elasticsearch.xpack.datastreams.rest.RestDeleteDataStreamAction; import org.elasticsearch.xpack.datastreams.rest.RestGetDataStreamsAction; import org.elasticsearch.xpack.datastreams.rest.RestMigrateToDataStreamAction; -import org.elasticsearch.xpack.datastreams.rest.RestModifyDataStreamAction; import org.elasticsearch.xpack.datastreams.rest.RestPromoteDataStreamAction; import java.util.List; @@ -68,7 +65,6 @@ public Map getMetadataMappers() { var dsInfoAction = new ActionHandler<>(XPackInfoFeatureAction.DATA_STREAMS, DataStreamInfoTransportAction.class); var migrateAction = new ActionHandler<>(MigrateToDataStreamAction.INSTANCE, MigrateToDataStreamTransportAction.class); var promoteAction = new ActionHandler<>(PromoteDataStreamAction.INSTANCE, PromoteDataStreamTransportAction.class); - var modifyAction = new ActionHandler<>(ModifyDataStreamsAction.INSTANCE, ModifyDataStreamTransportAction.class); return List.of( createDsAction, deleteDsInfoAction, @@ -77,8 +73,7 @@ public Map getMetadataMappers() { dsUsageAction, dsInfoAction, migrateAction, - promoteAction, - modifyAction + promoteAction ); } @@ -98,7 +93,6 @@ public List getRestHandlers( var dsStatsAction = new RestDataStreamsStatsAction(); var migrateAction = new RestMigrateToDataStreamAction(); var promoteAction = new RestPromoteDataStreamAction(); - var modifyAction = new RestModifyDataStreamAction(); - return List.of(createDsAction, deleteDsAction, getDsAction, dsStatsAction, migrateAction, promoteAction, modifyAction); + return List.of(createDsAction, deleteDsAction, getDsAction, dsStatsAction, migrateAction, promoteAction); } } From 3c26d6be3225147f448b07745e07769364baac67 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Sat, 16 Oct 2021 11:36:01 -0500 Subject: [PATCH 09/13] add unit test for request class --- .../datastreams/ModifyDataStreamsAction.java | 25 +++++++++++++ .../cluster/metadata/DataStreamAction.java | 19 ++++++++++ .../ModifyDataStreamsRequestTests.java | 36 +++++++++++++++++++ .../metadata/DataStreamActionTests.java | 2 +- 4 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 server/src/test/java/org/elasticsearch/action/datastreams/ModifyDataStreamsRequestTests.java diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java index b27c0d96f412..11d88f83a0c6 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java @@ -11,19 +11,23 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.metadata.DataStreamAction; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -56,6 +60,12 @@ public Request(StreamInput in) throws IOException { actions = in.readList(DataStreamAction::new); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(actions); + } + public Request(List actions) { this.actions = Collections.unmodifiableList(actions); } @@ -103,5 +113,20 @@ public String[] indices() { public IndicesOptions indicesOptions() { return INDICES_OPTIONS; } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + Request other = (Request) obj; + return Arrays.equals(actions.toArray(), other.actions.toArray()); + } + + @Override + public int hashCode() { + return Objects.hash(actions); + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java index 32bb68d355f3..d6e50cccf3da 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -20,6 +21,8 @@ import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; import java.util.function.Supplier; import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; @@ -178,4 +181,20 @@ private static ObjectParser parser(String name, Supplier PARSER.declareObject(optionalConstructorArg(), REMOVE_BACKING_INDEX_PARSER, REMOVE_BACKING_INDEX); } + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + DataStreamAction other = (DataStreamAction) obj; + return Objects.equals(type, other.type) + && Objects.equals(dataStream, other.dataStream) + && Objects.equals(index, other.index); + } + + @Override + public int hashCode() { + return Objects.hash(type, dataStream, index); + } + } diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/ModifyDataStreamsRequestTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/ModifyDataStreamsRequestTests.java new file mode 100644 index 000000000000..2358ec85528d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/ModifyDataStreamsRequestTests.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.cluster.metadata.DataStreamAction; +import org.elasticsearch.cluster.metadata.DataStreamActionTests; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.action.datastreams.ModifyDataStreamsAction.Request; + +import java.util.ArrayList; +import java.util.List; + +public class ModifyDataStreamsRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + final int numActions = randomIntBetween(1, 10); + List actions = new ArrayList<>(); + for (int k = 1; k <= numActions; k++) { + actions.add(DataStreamActionTests.createTestInstance()); + } + return new Request(actions); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamActionTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamActionTests.java index 3509cdcb5959..d40546579946 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamActionTests.java @@ -37,7 +37,7 @@ public void testToAndFromXContent() throws IOException { assertThat(parsedAction.getIndex(), equalTo(action.getIndex())); } - private DataStreamAction createTestInstance() { + public static DataStreamAction createTestInstance() { DataStreamAction action = new DataStreamAction(randomBoolean() ? DataStreamAction.Type.ADD_BACKING_INDEX : DataStreamAction.Type.REMOVE_BACKING_INDEX From 65fdd9dd0407cc85f71dc24e30d9f86e42e17aa5 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Sat, 16 Oct 2021 12:05:14 -0500 Subject: [PATCH 10/13] address remaining review comments --- .../datastreams/ModifyDataStreamsAction.java | 20 ++++++++----------- .../ModifyDataStreamsTransportAction.java | 7 ++++++- .../cluster/metadata/DataStreamAction.java | 4 +--- .../RestModifyDataStreamsAction.java | 10 +++++----- 4 files changed, 20 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java index 11d88f83a0c6..52733937de69 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java @@ -11,14 +11,13 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.metadata.DataStreamAction; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -51,9 +50,7 @@ public static final class Request private static final IndicesOptions INDICES_OPTIONS = IndicesOptions.fromOptions(false, false, true, false, true, false, true, false); - private List actions; - - public Request() {} + private final List actions; public Request(StreamInput in) throws IOException { super(in); @@ -74,10 +71,6 @@ public List getActions() { return actions; } - public void setActions(List actions) { - this.actions = Collections.unmodifiableList(actions); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -98,10 +91,13 @@ public ActionRequestValidationException validate() { return null; } - public static final ObjectParser PARSER = - new ObjectParser<>("data_stream_actions", Request::new); + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "data_stream_actions", + args -> new Request(((List) args[0])) + ); static { - PARSER.declareObjectArray(Request::setActions, DataStreamAction.PARSER, new ParseField("actions")); + PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), DataStreamAction.PARSER, new ParseField("actions")); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsTransportAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsTransportAction.java index ff1d0e6ce075..cb174e0f71a0 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsTransportAction.java @@ -62,7 +62,12 @@ protected void masterOperation( @Override protected ClusterBlockException checkBlock(ModifyDataStreamsAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + ClusterBlockException globalBlock = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + if (globalBlock != null) { + return globalBlock; + } + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, + indexNameExpressionResolver.concreteIndexNames(state, request)); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java index d6e50cccf3da..48b9cd910036 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java @@ -8,7 +8,6 @@ package org.elasticsearch.cluster.metadata; -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -21,7 +20,6 @@ import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; -import java.util.Arrays; import java.util.Objects; import java.util.function.Supplier; @@ -88,7 +86,7 @@ private DataStreamAction(Type type, String dataStream, String index) { if (false == Strings.hasText(index)) { throw new IllegalArgumentException("[index] is required"); } - this.type = type; + this.type = Objects.requireNonNull(type, "[type] must not be null"); this.dataStream = dataStream; this.index = index; } diff --git a/server/src/main/java/org/elasticsearch/rest/action/datastreams/RestModifyDataStreamsAction.java b/server/src/main/java/org/elasticsearch/rest/action/datastreams/RestModifyDataStreamsAction.java index bb651ff3b98e..3a8862f902fc 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/datastreams/RestModifyDataStreamsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/datastreams/RestModifyDataStreamsAction.java @@ -34,15 +34,15 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - ModifyDataStreamsAction.Request modifyDsRequest = new ModifyDataStreamsAction.Request(); - modifyDsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", modifyDsRequest.masterNodeTimeout())); - modifyDsRequest.timeout(request.paramAsTime("timeout", modifyDsRequest.timeout())); + ModifyDataStreamsAction.Request modifyDsRequest; try (XContentParser parser = request.contentParser()) { - ModifyDataStreamsAction.Request.PARSER.parse(parser, modifyDsRequest, null); + modifyDsRequest = ModifyDataStreamsAction.Request.PARSER.parse(parser, null); } if (modifyDsRequest.getActions() == null || modifyDsRequest.getActions().isEmpty()) { - throw new IllegalArgumentException("no data stream actions specified"); + throw new IllegalArgumentException("no data stream actions specified, at least one must be specified"); } + modifyDsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", modifyDsRequest.masterNodeTimeout())); + modifyDsRequest.timeout(request.paramAsTime("timeout", modifyDsRequest.timeout())); return channel -> client.execute(ModifyDataStreamsAction.INSTANCE, modifyDsRequest, new RestToXContentListener<>(channel)); } From 027ef7aaf9c35cd423829ea4c8bdcdf0f136a2d0 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Sat, 16 Oct 2021 12:10:39 -0500 Subject: [PATCH 11/13] fix test --- .../action/datastreams/ModifyDataStreamsAction.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java index 52733937de69..15a7aaf73f3d 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java @@ -110,6 +110,11 @@ public IndicesOptions indicesOptions() { return INDICES_OPTIONS; } + @Override + public boolean includeDataStreams() { + return true; + } + @Override public boolean equals(Object obj) { if (obj == null || obj.getClass() != getClass()) { From 8403d45b26d25a4fce3f0213f025911a0b11f45f Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Tue, 19 Oct 2021 07:00:14 -0500 Subject: [PATCH 12/13] add mutateInstance test method --- .../datastreams/ModifyDataStreamsRequestTests.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/ModifyDataStreamsRequestTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/ModifyDataStreamsRequestTests.java index 2358ec85528d..c0fce3246dc0 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/ModifyDataStreamsRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/ModifyDataStreamsRequestTests.java @@ -8,12 +8,13 @@ package org.elasticsearch.action.datastreams; +import org.elasticsearch.action.datastreams.ModifyDataStreamsAction.Request; import org.elasticsearch.cluster.metadata.DataStreamAction; import org.elasticsearch.cluster.metadata.DataStreamActionTests; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; -import org.elasticsearch.action.datastreams.ModifyDataStreamsAction.Request; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -33,4 +34,14 @@ protected Request createTestInstance() { } return new Request(actions); } + + @Override + protected Request mutateInstance(Request request) throws IOException { + final int moreActions = randomIntBetween(1, 5); + List actions = new ArrayList<>(request.getActions()); + for (int k = 1; k <= moreActions; k++) { + actions.add(DataStreamActionTests.createTestInstance()); + } + return new Request(actions); + } } From 50e24f7be5842d4ee2ffb0b4ffc4852ec1fcd771 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Tue, 19 Oct 2021 07:20:29 -0500 Subject: [PATCH 13/13] API spec comments --- .../rest-api-spec/api/indices.modify_data_stream.json | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.modify_data_stream.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.modify_data_stream.json index 7a68b8abbe07..ea095289b72b 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.modify_data_stream.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.modify_data_stream.json @@ -2,12 +2,13 @@ "indices.modify_data_stream":{ "documentation":{ "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", - "description":"Creates a data stream" + "description":"Modifies a data stream" }, "stability":"stable", "visibility":"public", "headers":{ - "accept": [ "application/json"] + "accept": [ "application/json"], + "content_type": ["application/json"] }, "url":{ "paths":[