diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 468dbbd5d3ef7..b35f59c3740da 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -793,7 +793,10 @@ public void testApiNamingConventions() throws Exception { "scripts_painless_execute", "cluster.put_component_template", "cluster.get_component_template", - "cluster.delete_component_template" + "cluster.delete_component_template", + "indices.create_data_stream", + "indices.get_data_streams", + "indices.delete_data_stream" }; //These API are not required for high-level client feature completeness String[] notRequiredApi = new String[] { diff --git a/distribution/archives/integ-test-zip/build.gradle b/distribution/archives/integ-test-zip/build.gradle index 5fb5b36553b91..a0884b2555caf 100644 --- a/distribution/archives/integ-test-zip/build.gradle +++ b/distribution/archives/integ-test-zip/build.gradle @@ -37,5 +37,6 @@ integTest.runner { testClusters.integTest { if (BuildParams.isSnapshotBuild() == false) { systemProperty 'es.itv2_feature_flag_registered', 'true' + systemProperty 'es.datastreams_feature_flag_registered', 'true' } } diff --git a/qa/smoke-test-multinode/build.gradle b/qa/smoke-test-multinode/build.gradle index 1a2317fbb1667..3663d63ea4b51 100644 --- a/qa/smoke-test-multinode/build.gradle +++ b/qa/smoke-test-multinode/build.gradle @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ +import org.elasticsearch.gradle.info.BuildParams import org.elasticsearch.gradle.info.BuildParams @@ -52,5 +53,6 @@ integTest.runner { testClusters.integTest { if (BuildParams.isSnapshotBuild() == false) { systemProperty 'es.itv2_feature_flag_registered', 'true' + systemProperty 'es.datastreams_feature_flag_registered', 'true' } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json new file mode 100644 index 0000000000000..ef8615a69b1ca --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create_data_stream.json @@ -0,0 +1,31 @@ +{ + "indices.create_data_stream":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Creates or updates a data stream" + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_data_stream/{name}", + "methods":[ + "PUT" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the data stream" + } + } + } + ] + }, + "params":{ + }, + "body":{ + "description":"The data stream definition", + "required":true + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_stream.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_stream.json new file mode 100644 index 0000000000000..71ed5808caefc --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.delete_data_stream.json @@ -0,0 +1,26 @@ +{ + "indices.delete_data_stream":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Deletes a data stream." + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_data_stream/{name}", + "methods":[ + "DELETE" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the data stream" + } + } + } + ] + }, + "params":{} + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_streams.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_streams.json new file mode 100644 index 0000000000000..42415068d4a5d --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_data_streams.json @@ -0,0 +1,33 @@ +{ + "indices.get_data_streams":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Returns data streams." + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_data_streams", + "methods":[ + "GET" + ] + }, + { + "path":"/_data_streams/{name}", + "methods":[ + "GET" + ], + "parts":{ + "name":{ + "type":"list", + "description":"The comma separated names of data streams" + } + } + } + ] + }, + "params":{ + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml new file mode 100644 index 0000000000000..49754005b6db5 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -0,0 +1,26 @@ +--- +"Test stubs": + - skip: + version: " - 7.99.99" + reason: not backported yet + + - do: + indices.create_data_stream: + name: data-stream2 + body: + timestamp_field: "@timestamp" + - is_true: acknowledged + + - do: + indices.get_data_streams: {} + - match: { 0.name: my_data_stream1 } + - match: { 0.timestamp_field: '@timestamp' } + - match: { 0.indices: ['my_data_stream1-000000'] } + - match: { 1.name: my_data_stream2 } + - match: { 1.timestamp_field: '@timestamp' } + - match: { 1.indices: [] } + + - do: + indices.delete_data_stream: + name: data-stream2 + - is_true: acknowledged diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index aa8d92129f577..894f0805564e3 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -28,6 +28,9 @@ import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; @@ -250,9 +253,11 @@ import org.elasticsearch.rest.action.admin.cluster.RestClusterStatsAction; import org.elasticsearch.rest.action.admin.cluster.RestClusterUpdateSettingsAction; import org.elasticsearch.rest.action.admin.cluster.RestCreateSnapshotAction; +import org.elasticsearch.rest.action.admin.indices.RestDeleteDataStreamAction; import org.elasticsearch.rest.action.admin.cluster.RestDeleteRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.RestDeleteSnapshotAction; import org.elasticsearch.rest.action.admin.cluster.RestDeleteStoredScriptAction; +import org.elasticsearch.rest.action.admin.indices.RestGetDataStreamsAction; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.rest.action.admin.cluster.RestGetScriptContextAction; import org.elasticsearch.rest.action.admin.cluster.RestGetScriptLanguageAction; @@ -265,6 +270,7 @@ import org.elasticsearch.rest.action.admin.cluster.RestNodesStatsAction; import org.elasticsearch.rest.action.admin.cluster.RestNodesUsageAction; import org.elasticsearch.rest.action.admin.cluster.RestPendingClusterTasksAction; +import org.elasticsearch.rest.action.admin.indices.RestCreateDataStreamAction; import org.elasticsearch.rest.action.admin.cluster.RestPutRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.RestPutStoredScriptAction; import org.elasticsearch.rest.action.admin.cluster.RestReloadSecureSettingsAction; @@ -388,6 +394,24 @@ public class ActionModule extends AbstractModule { } } + private static final boolean DATASTREAMS_FEATURE_FLAG_REGISTERED; + + static { + final String property = System.getProperty("es.datastreams_feature_flag_registered"); + if (Build.CURRENT.isSnapshot() && property != null) { + throw new IllegalArgumentException("es.datastreams_feature_flag_registered is only supported in non-snapshot builds"); + } + if (Build.CURRENT.isSnapshot() || "true".equals(property)) { + DATASTREAMS_FEATURE_FLAG_REGISTERED = true; + } else if ("false".equals(property) || property == null) { + DATASTREAMS_FEATURE_FLAG_REGISTERED = false; + } else { + throw new IllegalArgumentException( + "expected es.datastreams_feature_flag_registered to be unset or [true|false] but was [" + property + "]" + ); + } + } + private final Settings settings; private final IndexNameExpressionResolver indexNameExpressionResolver; private final IndexScopedSettings indexScopedSettings; @@ -565,6 +589,13 @@ public void reg actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register); + // Data streams: + if (DATASTREAMS_FEATURE_FLAG_REGISTERED) { + actions.register(CreateDataStreamAction.INSTANCE, CreateDataStreamAction.TransportAction.class); + actions.register(DeleteDataStreamAction.INSTANCE, DeleteDataStreamAction.TransportAction.class); + actions.register(GetDataStreamsAction.INSTANCE, GetDataStreamsAction.TransportAction.class); + } + // Persistent tasks: actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class); actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class); @@ -717,6 +748,13 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestDeletePipelineAction()); registerHandler.accept(new RestSimulatePipelineAction()); + // Data Stream API + if (DATASTREAMS_FEATURE_FLAG_REGISTERED) { + registerHandler.accept(new RestCreateDataStreamAction()); + registerHandler.accept(new RestDeleteDataStreamAction()); + registerHandler.accept(new RestGetDataStreamsAction()); + } + // CAT API registerHandler.accept(new RestAllocationAction()); registerHandler.accept(new RestShardsAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java new file mode 100644 index 0000000000000..df6e829a28af4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -0,0 +1,128 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +public class CreateDataStreamAction extends ActionType { + + public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/create"; + + private CreateDataStreamAction() { + super(NAME, AcknowledgedResponse::new); + } + + public static class Request extends MasterNodeRequest { + + private final String name; + private String timestampFieldName; + + public Request(String name) { + this.name = name; + } + + public void setTimestampFieldName(String timestampFieldName) { + this.timestampFieldName = timestampFieldName; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + this.timestampFieldName = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeString(timestampFieldName); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return name.equals(request.name) && + timestampFieldName.equals(request.timestampFieldName); + } + + @Override + public int hashCode() { + return Objects.hash(name, timestampFieldName); + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void masterOperation(Task task, Request request, ClusterState state, + ActionListener listener) throws Exception { + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java new file mode 100644 index 0000000000000..20a2ba4aa2cd6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -0,0 +1,120 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +public class DeleteDataStreamAction extends ActionType { + + public static final DeleteDataStreamAction INSTANCE = new DeleteDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/delete"; + + private DeleteDataStreamAction() { + super(NAME, AcknowledgedResponse::new); + } + + public static class Request extends MasterNodeRequest { + + private final String name; + + public Request(String name) { + this.name = Objects.requireNonNull(name); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return name.equals(request.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void masterOperation(Task task, Request request, ClusterState state, + ActionListener listener) throws Exception { + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java new file mode 100644 index 0000000000000..1549f056e811f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java @@ -0,0 +1,170 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +public class GetDataStreamsAction extends ActionType { + + public static final GetDataStreamsAction INSTANCE = new GetDataStreamsAction(); + public static final String NAME = "indices:admin/data_stream/get"; + + private GetDataStreamsAction() { + super(NAME, Response::new); + } + + public static class Request extends MasterNodeReadRequest { + + private final String[] names; + + public Request(String[] names) { + this.names = Objects.requireNonNull(names); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.names = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(names); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Arrays.equals(names, request.names); + } + + @Override + public int hashCode() { + return Arrays.hashCode(names); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private final List dataStreams; + + public Response(List dataStreams) { + this.dataStreams = dataStreams; + } + + public Response(StreamInput in) throws IOException { + this(in.readList(DataStream::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(dataStreams); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(); + for (DataStream dataStream : dataStreams) { + dataStream.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return dataStreams.equals(response.dataStreams); + } + + @Override + public int hashCode() { + return Objects.hash(dataStreams); + } + } + + public static class TransportAction extends TransportMasterNodeReadAction { + + @Inject + public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected Response read(StreamInput in) throws IOException { + return new Response(in); + } + + @Override + protected void masterOperation(Task task, Request request, ClusterState state, + ActionListener listener) throws Exception { + List dataStreams = List.of( + new DataStream("my_data_stream1", "@timestamp", List.of("my_data_stream1-000000")), + new DataStream("my_data_stream2", "@timestamp", List.of()) + ); + listener.onResponse(new Response(dataStreams)); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index 36b34a7b24c85..13f28dbbce210 100644 --- a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -21,6 +21,9 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; @@ -713,4 +716,33 @@ public interface IndicesAdminClient extends ElasticsearchClient { */ void rolloverIndex(RolloverRequest request, ActionListener listener); + /** + * Store a data stream + */ + void createDataStream(CreateDataStreamAction.Request request, ActionListener listener); + + /** + * Store a data stream + */ + ActionFuture createDataStream(CreateDataStreamAction.Request request); + + /** + * Delete a data stream + */ + void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener listener); + + /** + * Delete a data stream + */ + ActionFuture deleteDataStream(DeleteDataStreamAction.Request request); + + /** + * Get data streams + */ + void getDataStreams(GetDataStreamsAction.Request request, ActionListener listener); + + /** + * Get data streams + */ + ActionFuture getDataStreams(GetDataStreamsAction.Request request); } diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 1ee480fb55edd..669df02de212b 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -30,6 +30,9 @@ import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; @@ -1657,6 +1660,36 @@ public ActionFuture getSettings(GetSettingsRequest request) public void getSettings(GetSettingsRequest request, ActionListener listener) { execute(GetSettingsAction.INSTANCE, request, listener); } + + @Override + public void createDataStream(CreateDataStreamAction.Request request, ActionListener listener) { + execute(CreateDataStreamAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture createDataStream(CreateDataStreamAction.Request request) { + return execute(CreateDataStreamAction.INSTANCE, request); + } + + @Override + public void deleteDataStream(DeleteDataStreamAction.Request request, ActionListener listener) { + execute(DeleteDataStreamAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture deleteDataStream(DeleteDataStreamAction.Request request) { + return execute(DeleteDataStreamAction.INSTANCE, request); + } + + @Override + public void getDataStreams(GetDataStreamsAction.Request request, ActionListener listener) { + execute(GetDataStreamsAction.INSTANCE, request, listener); + } + + @Override + public ActionFuture getDataStreams(GetDataStreamsAction.Request request) { + return execute(GetDataStreamsAction.INSTANCE, request); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java new file mode 100644 index 0000000000000..f6bba191a173b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public final class DataStream extends AbstractDiffable implements ToXContentObject { + + private final String name; + private final String timeStampField; + private final List indices; + + public DataStream(String name, String timeStampField, List indices) { + this.name = name; + this.timeStampField = timeStampField; + this.indices = indices; + } + + public String getName() { + return name; + } + + public String getTimeStampField() { + return timeStampField; + } + + public List getIndices() { + return indices; + } + + public DataStream(StreamInput in) throws IOException { + this(in.readString(), in.readString(), in.readStringList()); + } + + public static Diff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(DataStream::new, in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeString(timeStampField); + out.writeStringCollection(indices); + } + + public static final ParseField NAME_FIELD = new ParseField("name"); + public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field"); + public static final ParseField INDICES_FIELD = new ParseField("indices"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", + args -> new DataStream((String) args[0], (String) args[1], (List) args[2])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD_FIELD); + PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD); + } + + public static DataStream fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME_FIELD.getPreferredName(), name); + builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField); + builder.field(INDICES_FIELD.getPreferredName(), indices); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DataStream that = (DataStream) o; + return name.equals(that.name) && + timeStampField.equals(that.timeStampField) && + indices.equals(that.indices); + } + + @Override + public int hashCode() { + return Objects.hash(name, timeStampField, indices); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java new file mode 100644 index 0000000000000..67b67677195dc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCreateDataStreamAction.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class RestCreateDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "create_data_stream_action"; + } + + @Override + public List routes() { + return List.of( + new Route(RestRequest.Method.PUT, "/_data_stream/{name}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + CreateDataStreamAction.Request putDataStreamRequest = new CreateDataStreamAction.Request(request.param("name")); + request.withContentOrSourceParamParserOrNull(parser -> { + Map body = parser.map(); + String timeStampFieldName = (String) body.get(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName()); + if (timeStampFieldName != null) { + putDataStreamRequest.setTimestampFieldName(timeStampFieldName); + } + }); + return channel -> client.admin().indices().createDataStream(putDataStreamRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java new file mode 100644 index 0000000000000..b69ee981d12b3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestDeleteDataStreamAction.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +public class RestDeleteDataStreamAction extends BaseRestHandler { + + @Override + public String getName() { + return "delete_data_stream_action"; + } + + @Override + public List routes() { + return List.of(new Route(RestRequest.Method.DELETE, "/_data_stream/{name}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request(request.param("name")); + return channel -> client.admin().indices().deleteDataStream(deleteDataStreamRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java new file mode 100644 index 0000000000000..fdbe63829882e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetDataStreamsAction.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.rest.action.admin.indices; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +public class RestGetDataStreamsAction extends BaseRestHandler { + + @Override + public String getName() { + return "get_data_streams_action"; + } + + @Override + public List routes() { + return List.of( + new Route(RestRequest.Method.GET, "/_data_streams"), + new Route(RestRequest.Method.GET, "/_data_streams/{name}") + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String[] names = Strings.splitStringByCommaToArray(request.param("name")); + GetDataStreamsAction.Request getDataStreamsRequest = new GetDataStreamsAction.Request(names); + return channel -> client.admin().indices().getDataStreams(getDataStreamsRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java new file mode 100644 index 0000000000000..d6a846c205fb3 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + Request request = new Request(randomAlphaOfLength(8)); + request.setTimestampFieldName(randomAlphaOfLength(8)); + return request; + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java new file mode 100644 index 0000000000000..f460065699795 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(randomAlphaOfLength(8)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java new file mode 100644 index 0000000000000..062bdef629cc9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Request; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(generateRandomStringArray(8, 8, false)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java new file mode 100644 index 0000000000000..5c74c515634cc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsResponseTests.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.datastream; + +import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Response; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class GetDataStreamsResponseTests extends AbstractSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Response::new; + } + + @Override + protected Response doParseInstance(XContentParser parser) throws IOException { + List dataStreams = new ArrayList<>(); + for (Token token = parser.nextToken(); token != Token.END_ARRAY; token = parser.nextToken()) { + if (token == Token.START_OBJECT) { + dataStreams.add(DataStream.fromXContent(parser)); + } + } + return new Response(dataStreams); + } + + @Override + protected Response createTestInstance() { + int numDataStreams = randomIntBetween(0, 8); + List dataStreams = new ArrayList<>(); + for (int i = 0; i < numDataStreams; i++) { + dataStreams.add(new DataStream(randomAlphaOfLength(4), randomAlphaOfLength(4), + List.of(generateRandomStringArray(8, 4, false)))); + } + return new Response(dataStreams); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java new file mode 100644 index 0000000000000..072165ab098ef --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class DataStreamTests extends AbstractSerializingTestCase { + + @Override + protected DataStream doParseInstance(XContentParser parser) throws IOException { + return DataStream.fromXContent(parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return DataStream::new; + } + + @Override + protected DataStream createTestInstance() { + int numIndices = randomIntBetween(0, 128); + List indices = new ArrayList<>(numIndices); + for (int i = 0; i < numIndices; i++) { + indices.add(randomAlphaOfLength(10)); + } + return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), indices); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java index 7fa41f033ea44..23023001e27f1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java @@ -55,7 +55,7 @@ public class ClusterPrivilegeResolver { private static final Set MONITOR_TRANSFORM_PATTERN = Set.of("cluster:monitor/data_frame/*", "cluster:monitor/transform/*"); private static final Set MONITOR_WATCHER_PATTERN = Set.of("cluster:monitor/xpack/watcher/*"); private static final Set MONITOR_ROLLUP_PATTERN = Set.of("cluster:monitor/xpack/rollup/*"); - private static final Set ALL_CLUSTER_PATTERN = Set.of("cluster:*", "indices:admin/template/*"); + private static final Set ALL_CLUSTER_PATTERN = Set.of("cluster:*", "indices:admin/template/*", "indices:admin/data_stream/*"); private static final Set MANAGE_ML_PATTERN = Set.of("cluster:admin/xpack/ml/*", "cluster:monitor/xpack/ml/*"); private static final Set MANAGE_TRANSFORM_PATTERN = Set.of("cluster:admin/data_frame/*", "cluster:monitor/data_frame/*", "cluster:monitor/transform/*", "cluster:admin/transform/*"); @@ -197,7 +197,10 @@ public static Set names() { } public static boolean isClusterAction(String actionName) { - return actionName.startsWith("cluster:") || actionName.startsWith("indices:admin/template/"); + return actionName.startsWith("cluster:") || + actionName.startsWith("indices:admin/template/") || + // todo: hack until we implement security of data_streams + actionName.startsWith("indices:admin/data_stream/"); } private static String actionToPattern(String text) { diff --git a/x-pack/qa/core-rest-tests-with-security/build.gradle b/x-pack/qa/core-rest-tests-with-security/build.gradle index 139fb7cd2cabe..651c6af0308a2 100644 --- a/x-pack/qa/core-rest-tests-with-security/build.gradle +++ b/x-pack/qa/core-rest-tests-with-security/build.gradle @@ -43,5 +43,6 @@ testClusters.integTest { testClusters.integTest { if (BuildParams.isSnapshotBuild() == false) { systemProperty 'es.itv2_feature_flag_registered', 'true' + systemProperty 'es.datastreams_feature_flag_registered', 'true' } }