Skip to content

Commit

Permalink
Wait for shards to be active after closing indices (#38854)
Browse files Browse the repository at this point in the history
This commit changes the Close Index API to add a `wait_for_active_shards`
 parameter that allows to wait for shards of closed indices to be active before 
returning a response.

Relates #33888
  • Loading branch information
tlrx authored Feb 26, 2019
1 parent 5e7a428 commit 3f9993d
Show file tree
Hide file tree
Showing 24 changed files with 459 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
"options" : ["open","closed","none","all"],
"default" : "open",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"wait_for_active_shards": {
"type" : "string",
"description" : "Sets the number of active shards to wait for before the operation returns."
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- do:
indices.close:
index: test_index
- is_true: acknowledged

- do:
catch: bad_request
Expand All @@ -24,6 +25,7 @@
- do:
indices.open:
index: test_index
- is_true: acknowledged

- do:
cluster.health:
Expand All @@ -50,11 +52,33 @@
- do:
indices.close:
index: test_index
- is_true: acknowledged

- do:
indices.open:
index: test_index
wait_for_active_shards: all
- is_true: acknowledged
- match: { acknowledged: true }
- match: { shards_acknowledged: true }

---
"Close index with wait_for_active_shards set to all":
- skip:
version: " - 7.99.99"
reason: "closed indices are replicated starting version 8.0"

- do:
indices.create:
index: test_index
body:
settings:
number_of_replicas: 0

- do:
indices.close:
index: test_index
wait_for_active_shards: all
- is_true: acknowledged
- match: { acknowledged: true }
- match: { shards_acknowledged: true }
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ setup:
- do:
indices.close:
index: _all
- is_true: acknowledged

- do:
catch: bad_request
Expand All @@ -36,6 +37,7 @@ setup:
- do:
indices.open:
index: _all
- is_true: acknowledged

- do:
cluster.health:
Expand All @@ -51,6 +53,7 @@ setup:
- do:
indices.close:
index: test_*
- is_true: acknowledged

- do:
catch: bad_request
Expand All @@ -61,6 +64,7 @@ setup:
- do:
indices.open:
index: test_*
- is_true: acknowledged

- do:
cluster.health:
Expand All @@ -76,6 +80,7 @@ setup:
- do:
indices.close:
index: '*'
- is_true: acknowledged

- do:
catch: bad_request
Expand All @@ -86,6 +91,7 @@ setup:
- do:
indices.open:
index: '*'
- is_true: acknowledged

- do:
cluster.health:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
package org.elasticsearch.action.admin.indices.close;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.support.master.AcknowledgedResponse;

public class CloseIndexAction extends Action<AcknowledgedResponse> {
public class CloseIndexAction extends Action<CloseIndexResponse> {

public static final CloseIndexAction INSTANCE = new CloseIndexAction();
public static final String NAME = "indices:admin/close";
Expand All @@ -32,7 +31,7 @@ private CloseIndexAction() {
}

@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
public CloseIndexResponse newResponse() {
return new CloseIndexResponse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
*/
package org.elasticsearch.action.admin.indices.close;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;

/**
* Cluster state update request that allows to close one or more indices
*/
public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<CloseIndexClusterStateUpdateRequest> {

private final long taskId;
private long taskId;
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;

public CloseIndexClusterStateUpdateRequest(final long taskId) {
this.taskId = taskId;
Expand All @@ -34,4 +36,18 @@ public CloseIndexClusterStateUpdateRequest(final long taskId) {
public long taskId() {
return taskId;
}

public CloseIndexClusterStateUpdateRequest taskId(final long taskId) {
this.taskId = taskId;
return this;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

public CloseIndexClusterStateUpdateRequest waitForActiveShards(final ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.action.admin.indices.close;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -38,6 +40,7 @@ public class CloseIndexRequest extends AcknowledgedRequest<CloseIndexRequest> im

private String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; //NORELEASE Changes this to NONE to keep previous behavior

public CloseIndexRequest() {
}
Expand Down Expand Up @@ -101,17 +104,34 @@ public CloseIndexRequest indicesOptions(IndicesOptions indicesOptions) {
return this;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

public CloseIndexRequest waitForActiveShards(final ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
waitForActiveShards = ActiveShardCount.readFrom(in);
} else {
waitForActiveShards = ActiveShardCount.NONE;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
waitForActiveShards.writeTo(out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

package org.elasticsearch.action.admin.indices.close;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;

/**
* Builder for close index request
*/
public class CloseIndexRequestBuilder
extends AcknowledgedRequestBuilder<CloseIndexRequest, AcknowledgedResponse, CloseIndexRequestBuilder> {
extends AcknowledgedRequestBuilder<CloseIndexRequest, CloseIndexResponse, CloseIndexRequestBuilder> {

public CloseIndexRequestBuilder(ElasticsearchClient client, CloseIndexAction action) {
super(client, action, new CloseIndexRequest());
Expand Down Expand Up @@ -60,4 +60,31 @@ public CloseIndexRequestBuilder setIndicesOptions(IndicesOptions indicesOptions)
request.indicesOptions(indicesOptions);
return this;
}

/**
* Sets the number of shard copies that should be active for indices closing to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* wait for all shards (primary and all replicas) to be active before returning.
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Indices closing will only wait up until the timeout value for the number of shard copies
* to be active before returning.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CloseIndexRequestBuilder setWaitForActiveShards(final ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
return this;
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public CloseIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.close;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class CloseIndexResponse extends ShardsAcknowledgedResponse {

CloseIndexResponse() {
}

public CloseIndexResponse(final boolean acknowledged, final boolean shardsAcknowledged) {
super(acknowledged, shardsAcknowledged);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
readShardsAcknowledged(in);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
writeShardsAcknowledged(out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -44,7 +43,7 @@
/**
* Close index action
*/
public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIndexRequest, AcknowledgedResponse> {
public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIndexRequest, CloseIndexResponse> {

private final MetaDataIndexStateService indexStateService;
private final DestructiveOperations destructiveOperations;
Expand Down Expand Up @@ -76,12 +75,12 @@ protected String executor() {
}

@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
protected CloseIndexResponse newResponse() {
return new CloseIndexResponse();
}

@Override
protected void doExecute(Task task, CloseIndexRequest request, ActionListener<AcknowledgedResponse> listener) {
protected void doExecute(Task task, CloseIndexRequest request, ActionListener<CloseIndexResponse> listener) {
destructiveOperations.failDestructive(request.indices());
if (closeIndexEnabled == false) {
throw new IllegalStateException("closing indices is disabled - set [" + CLUSTER_INDICES_CLOSE_ENABLE_SETTING.getKey() +
Expand All @@ -97,29 +96,33 @@ protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterSta
}

@Override
protected void masterOperation(final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
protected void masterOperation(final CloseIndexRequest request,
final ClusterState state,
final ActionListener<CloseIndexResponse> listener) {
throw new UnsupportedOperationException("The task parameter is required");
}

@Override
protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) throws Exception {
protected void masterOperation(final Task task,
final CloseIndexRequest request,
final ClusterState state,
final ActionListener<CloseIndexResponse> listener) throws Exception {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
if (concreteIndices == null || concreteIndices.length == 0) {
listener.onResponse(new AcknowledgedResponse(true));
listener.onResponse(new CloseIndexResponse(true, false));
return;
}

final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.waitForActiveShards(request.waitForActiveShards())
.indices(concreteIndices);

indexStateService.closeIndices(closeRequest, new ActionListener<AcknowledgedResponse>() {
indexStateService.closeIndices(closeRequest, new ActionListener<CloseIndexResponse>() {

@Override
public void onResponse(final AcknowledgedResponse response) {
public void onResponse(final CloseIndexResponse response) {
listener.onResponse(response);
}

Expand Down
Loading

0 comments on commit 3f9993d

Please sign in to comment.