Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Close Index API] Add unique UUID to ClusterBlock #36775

Merged
merged 17 commits into from
Jan 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.test.rest;

import org.apache.http.util.EntityUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Request;
Expand All @@ -41,7 +40,6 @@
/**
* Tests that wait for refresh is fired if the index is closed.
*/
@LuceneTestCase.AwaitsFix(bugUrl = "to be created")
public class WaitForRefreshAndCloseIT extends ESRestTestCase {
@Before
public void setupIndex() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -41,13 +42,14 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Consumer;

public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {

public static final String NAME = CloseIndexAction.NAME + "[s]";
public static final ClusterBlock EXPECTED_BLOCK = MetaDataIndexStateService.INDEX_CLOSED_BLOCK;

@Inject
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
Expand Down Expand Up @@ -83,25 +85,25 @@ protected void acquireReplicaOperationPermit(final IndexShard replica,
@Override
protected PrimaryResult<ShardRequest, ReplicationResponse> shardOperationOnPrimary(final ShardRequest shardRequest,
final IndexShard primary) throws Exception {
executeShardOperation(primary);
executeShardOperation(shardRequest, primary);
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
}

@Override
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception {
executeShardOperation(replica);
executeShardOperation(shardRequest, replica);
return new ReplicaResult();
}

private void executeShardOperation(final IndexShard indexShard) {
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
final ShardId shardId = indexShard.shardId();
if (indexShard.getActiveOperationsCount() != 0) {
throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing");
}

final ClusterBlocks clusterBlocks = clusterService.state().blocks();
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), EXPECTED_BLOCK) == false) {
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + EXPECTED_BLOCK + " before closing");
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing");
}

final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
Expand Down Expand Up @@ -139,17 +141,36 @@ public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String all

public static class ShardRequest extends ReplicationRequest<ShardRequest> {

private ClusterBlock clusterBlock;

ShardRequest(){
}

public ShardRequest(final ShardId shardId, final TaskId parentTaskId) {
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
super(shardId);
this.clusterBlock = Objects.requireNonNull(clusterBlock);
setParentTask(parentTaskId);
}

@Override
public String toString() {
return "verify shard before close {" + shardId + "}";
return "verify shard " + shardId + " before close with block " + clusterBlock;
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
clusterBlock = ClusterBlock.readClusterBlock(in);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
clusterBlock.writeTo(out);
}

public ClusterBlock clusterBlock() {
return clusterBlock;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.cluster.block;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
Expand All @@ -30,29 +32,31 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Objects;

public class ClusterBlock implements Streamable, ToXContentFragment {

private int id;

private @Nullable String uuid;
private String description;

private EnumSet<ClusterBlockLevel> levels;

private boolean retryable;

private boolean disableStatePersistence = false;

private boolean allowReleaseResources;

private RestStatus status;

ClusterBlock() {
private ClusterBlock() {
}

public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence,
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
this(id, null, description, retryable, disableStatePersistence, allowReleaseResources, status, levels);
}

public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, boolean allowReleaseResources,
RestStatus status, EnumSet<ClusterBlockLevel> levels) {
public ClusterBlock(int id, String uuid, String description, boolean retryable, boolean disableStatePersistence,
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
this.id = id;
this.uuid = uuid;
this.description = description;
this.retryable = retryable;
this.disableStatePersistence = disableStatePersistence;
Expand All @@ -65,6 +69,10 @@ public int id() {
return this.id;
}

public String uuid() {
tlrx marked this conversation as resolved.
Show resolved Hide resolved
return uuid;
}

public String description() {
return this.description;
}
Expand Down Expand Up @@ -104,6 +112,9 @@ public boolean disableStatePersistence() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Integer.toString(id));
if (uuid != null) {
builder.field("uuid", uuid);
}
builder.field("description", description);
builder.field("retryable", retryable);
if (disableStatePersistence) {
Expand All @@ -127,6 +138,11 @@ public static ClusterBlock readClusterBlock(StreamInput in) throws IOException {
@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
uuid = in.readOptionalString();
} else {
uuid = null;
}
description = in.readString();
final int len = in.readVInt();
ArrayList<ClusterBlockLevel> levels = new ArrayList<>(len);
Expand All @@ -143,6 +159,9 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(uuid);
}
out.writeString(description);
out.writeVInt(levels.size());
for (ClusterBlockLevel level : levels) {
Expand All @@ -157,7 +176,11 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(id).append(",").append(description).append(", blocks ");
sb.append(id).append(",");
if (uuid != null) {
sb.append(uuid).append(',');
}
sb.append(description).append(", blocks ");
String delimiter = "";
for (ClusterBlockLevel level : levels) {
sb.append(delimiter).append(level.name());
Expand All @@ -168,19 +191,19 @@ public String toString() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ClusterBlock that = (ClusterBlock) o;

if (id != that.id) return false;

return true;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ClusterBlock that = (ClusterBlock) o;
return id == that.id && Objects.equals(uuid, that.uuid);
}

@Override
public int hashCode() {
return id;
return Objects.hash(id, uuid);
}

public boolean isAllowReleaseResources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -147,6 +148,31 @@ public boolean hasIndexBlock(String index, ClusterBlock block) {
return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block);
}

public boolean hasIndexBlockWithId(String index, int blockId) {
final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
if (clusterBlocks != null) {
for (ClusterBlock clusterBlock : clusterBlocks) {
if (clusterBlock.id() == blockId) {
return true;
}
}
}
return false;
}

@Nullable
public ClusterBlock getIndexBlockWithId(final String index, final int blockId) {
final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
if (clusterBlocks != null) {
for (ClusterBlock clusterBlock : clusterBlocks) {
if (clusterBlock.id() == blockId) {
return clusterBlock;
}
}
}
return null;
}

public void globalBlockedRaiseException(ClusterBlockLevel level) throws ClusterBlockException {
ClusterBlockException blockException = globalBlockedException(level);
if (blockException != null) {
Expand Down Expand Up @@ -403,6 +429,18 @@ public Builder removeIndexBlock(String index, ClusterBlock block) {
return this;
}

public Builder removeIndexBlockWithId(String index, int blockId) {
final Set<ClusterBlock> indexBlocks = indices.get(index);
if (indexBlocks == null) {
return this;
}
indexBlocks.removeIf(block -> block.id() == blockId);
if (indexBlocks.isEmpty()) {
indices.remove(index);
}
return this;
}

public ClusterBlocks build() {
// We copy the block sets here in case of the builder is modified after build is called
ImmutableOpenMap.Builder<String, Set<ClusterBlock>> indicesBuilder = ImmutableOpenMap.builder(indices.size());
Expand Down
Loading