Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API for adding and removing indices from a data stream #79279

Merged
merged 15 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"indices.modify_data_stream":{
sethmlarson marked this conversation as resolved.
Show resolved Hide resolved
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
"description":"Modifies a data stream"
},
"stability":"stable",
"visibility":"public",
"headers":{
"accept": [ "application/json"],
"content_type": ["application/json"]
},
"url":{
"paths":[
{
"path":"/_data_stream/_modify",
"methods":["POST"]
}
]
},
"params":{
},
"body":{
"description":"The data stream modifications",
"required":true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsTransportAction;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.explain.ExplainAction;
Expand Down Expand Up @@ -370,6 +372,7 @@
import org.elasticsearch.rest.action.cat.RestTasksAction;
import org.elasticsearch.rest.action.cat.RestTemplatesAction;
import org.elasticsearch.rest.action.cat.RestThreadPoolAction;
import org.elasticsearch.rest.action.datastreams.RestModifyDataStreamsAction;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.rest.action.document.RestDeleteAction;
import org.elasticsearch.rest.action.document.RestGetAction;
Expand Down Expand Up @@ -599,6 +602,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(AnalyzeIndexDiskUsageAction.INSTANCE, TransportAnalyzeIndexDiskUsageAction.class);
actions.register(FieldUsageStatsAction.INSTANCE, TransportFieldUsageAction.class);

//Data streams
actions.register(ModifyDataStreamsAction.INSTANCE, ModifyDataStreamsTransportAction.class);

//Indexed scripts
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class);
Expand Down Expand Up @@ -763,6 +769,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {

registerHandler.accept(new RestReloadSecureSettingsAction());

// Data streams
registerHandler.accept(new RestModifyDataStreamsAction());

// Scripts API
registerHandler.accept(new RestGetStoredScriptAction());
registerHandler.accept(new RestPutStoredScriptAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.datastreams;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;

public class ModifyDataStreamsAction extends ActionType<AcknowledgedResponse> {

public static final ModifyDataStreamsAction INSTANCE = new ModifyDataStreamsAction();
public static final String NAME = "indices:admin/data_stream/modify";

private ModifyDataStreamsAction() {
super(NAME, AcknowledgedResponse::readFrom);
}

public static final class Request
extends AcknowledgedRequest<Request>
implements IndicesRequest, ToXContentObject {

// relevant only for authorizing the request, so require every specified
// index to exist, expand wildcards only to open indices, prohibit
// wildcard expressions that resolve to zero indices, and do not attempt
// to resolve expressions as aliases
private static final IndicesOptions INDICES_OPTIONS =
IndicesOptions.fromOptions(false, false, true, false, true, false, true, false);

private final List<DataStreamAction> actions;

public Request(StreamInput in) throws IOException {
super(in);
actions = in.readList(DataStreamAction::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(actions);
}

public Request(List<DataStreamAction> actions) {
this.actions = Collections.unmodifiableList(actions);
}

public List<DataStreamAction> getActions() {
return actions;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray("actions");
for (DataStreamAction action : actions) {
action.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}

@Override
public ActionRequestValidationException validate() {
if (actions.isEmpty()) {
return addValidationError("must specify at least one data stream modification action", null);
}
return null;
}

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<Request, Void> PARSER = new ConstructingObjectParser<>(
"data_stream_actions",
args -> new Request(((List<DataStreamAction>) args[0]))
);
static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), DataStreamAction.PARSER, new ParseField("actions"));
}

@Override
public String[] indices() {
return actions.stream().map(DataStreamAction::getDataStream).toArray(String[]::new);
}

@Override
public IndicesOptions indicesOptions() {
return INDICES_OPTIONS;
}

@Override
public boolean includeDataStreams() {
return true;
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return Arrays.equals(actions.toArray(), other.actions.toArray());
}

@Override
public int hashCode() {
return Objects.hash(actions);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.datastreams;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

public class ModifyDataStreamsTransportAction extends AcknowledgedTransportMasterNodeAction<
ModifyDataStreamsAction.Request> {

private final MetadataDataStreamsService metadataDataStreamsService;

@Inject
public ModifyDataStreamsTransportAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
MetadataDataStreamsService metadataDataStreamsService
) {
super(
ModifyDataStreamsAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
ModifyDataStreamsAction.Request::new,
indexNameExpressionResolver,
ThreadPool.Names.SAME
);
this.metadataDataStreamsService = metadataDataStreamsService;
}

@Override
protected void masterOperation(
Task task,
ModifyDataStreamsAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
metadataDataStreamsService.modifyDataStream(request, listener);
}

@Override
protected ClusterBlockException checkBlock(ModifyDataStreamsAction.Request request, ClusterState state) {
ClusterBlockException globalBlock = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
if (globalBlock != null) {
return globalBlock;
}
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
indexNameExpressionResolver.concreteIndexNames(state, request));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public DataStream removeBackingIndex(Index index) {
index.getName(), name
));
}
if (generation == (backingIndexPosition + 1)) {
if (indices.size() == (backingIndexPosition + 1)) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"cannot remove backing index [%s] of data stream [%s] because it is the write index",
Expand Down
Loading