Skip to content

Commit

Permalink
Introduce ShardState Enum + Slight Cleanup SnapshotsInProgress
Browse files Browse the repository at this point in the history
  • Loading branch information
mkleen committed May 26, 2020
1 parent 969ef59 commit ccb764c
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 119 deletions.
212 changes: 109 additions & 103 deletions server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,27 @@
import com.carrotsearch.hppc.ObjectContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import io.crate.common.unit.TimeValue;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import javax.annotation.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import io.crate.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.Objects;

/**
* Meta data about snapshots that are currently executing
Expand All @@ -55,14 +52,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SnapshotsInProgress that = (SnapshotsInProgress) o;

if (!entries.equals(that.entries)) return false;

return true;
return entries.equals(((SnapshotsInProgress) o).entries);
}

@Override
Expand All @@ -82,7 +72,7 @@ public String toString() {
return builder.append("]").toString();
}

public static class Entry implements ToXContent {
public static class Entry {
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
Expand All @@ -109,7 +99,6 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
} else {
this.shards = shards;
this.waitingIndices = findWaitingIndices(shards);
assert assertShardsConsistent(state, indices, shards);
}
this.repositoryStateId = repositoryStateId;
this.failure = failure;
Expand All @@ -122,31 +111,18 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, entry.failure);
entry.repositoryStateId, shards, entry.failure);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, failure);
entry.repositoryStateId, shards, failure);
}

public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry, entry.state, shards, entry.failure);
}

private static boolean assertShardsConsistent(State state, List<IndexId> indices,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) {
return true;
}
final Set<String> indexNames = indices.stream().map(IndexId::getName).collect(Collectors.toSet());
final Set<String> indexNamesInShards = new HashSet<>();
shards.keysIt().forEachRemaining(s -> indexNamesInShards.add(s.getIndexName()));
assert indexNames.equals(indexNamesInShards)
: "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
return true;
}

public Snapshot snapshot() {
return this.snapshot;
}
Expand Down Expand Up @@ -201,6 +177,7 @@ public boolean equals(Object o) {
if (!shards.equals(entry.shards)) return false;
if (!snapshot.equals(entry.snapshot)) return false;
if (state != entry.state) return false;
if (!waitingIndices.equals(entry.waitingIndices)) return false;
if (repositoryStateId != entry.repositoryStateId) return false;

return true;
Expand All @@ -214,71 +191,22 @@ public int hashCode() {
result = 31 * result + (partial ? 1 : 0);
result = 31 * result + shards.hashCode();
result = 31 * result + indices.hashCode();
result = 31 * result + waitingIndices.hashCode();
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
return result;
}

@Override
public String toString() {
return Strings.toString(this);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(REPOSITORY, snapshot.getRepository());
builder.field(SNAPSHOT, snapshot.getSnapshotId().getName());
builder.field(UUID, snapshot.getSnapshotId().getUUID());
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState());
builder.field(PARTIAL, partial);
builder.field(STATE, state);
builder.startArray(INDICES);
{
for (IndexId index : indices) {
index.toXContent(builder, params);
}
}
builder.endArray();
builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(startTime));
builder.field(REPOSITORY_STATE_ID, repositoryStateId);
builder.startArray(SHARDS);
{
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : shards) {
ShardId shardId = shardEntry.key;
ShardSnapshotStatus status = shardEntry.value;
builder.startObject();
{
builder.field(INDEX, shardId.getIndex());
builder.field(SHARD, shardId.getId());
builder.field(STATE, status.state());
builder.field(NODE, status.nodeId());
}
builder.endObject();
}
}
builder.endArray();
builder.endObject();
return builder;
}

@Override
public boolean isFragment() {
return false;
return snapshot.toString();
}

// package private for testing
ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
private ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
Map<String, List<ShardId>> waitingIndicesMap = new HashMap<>();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> entry : shards) {
if (entry.value.state() == State.WAITING) {
final String indexName = entry.key.getIndexName();
List<ShardId> waitingShards = waitingIndicesMap.get(indexName);
if (waitingShards == null) {
waitingShards = new ArrayList<>();
waitingIndicesMap.put(indexName, waitingShards);
}
waitingShards.add(entry.key);
if (entry.value.state() == ShardState.WAITING) {
waitingIndicesMap.computeIfAbsent(entry.key.getIndexName(), k -> new ArrayList<>()).add(entry.key);
}
}
if (waitingIndicesMap.isEmpty()) {
Expand All @@ -300,27 +228,27 @@ ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<Shar
*/
public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {
for (ObjectCursor<ShardSnapshotStatus> status : shards) {
if (status.value.state().completed() == false) {
if (status.value.state().completed == false) {
return false;
}
}
return true;
}

public static class ShardSnapshotStatus {
private final State state;
private final ShardState state;
private final String nodeId;
private final String reason;

public ShardSnapshotStatus(String nodeId) {
this(nodeId, State.INIT);
this(nodeId, ShardState.INIT);
}

public ShardSnapshotStatus(String nodeId, State state) {
public ShardSnapshotStatus(String nodeId, ShardState state) {
this(nodeId, state, null);
}

public ShardSnapshotStatus(String nodeId, State state, String reason) {
public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
this.nodeId = nodeId;
this.state = state;
this.reason = reason;
Expand All @@ -330,11 +258,11 @@ public ShardSnapshotStatus(String nodeId, State state, String reason) {

public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = State.fromValue(in.readByte());
state = ShardState.fromValue(in.readByte());
reason = in.readOptionalString();
}

public State state() {
public ShardState state() {
return state;
}

Expand All @@ -356,14 +284,9 @@ public void writeTo(StreamOutput out) throws IOException {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ShardSnapshotStatus status = (ShardSnapshotStatus) o;
return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;

if (nodeId != null ? !nodeId.equals(status.nodeId) : status.nodeId != null) return false;
if (reason != null ? !reason.equals(status.reason) : status.reason != null) return false;
if (state != status.state) return false;

return true;
}

@Override
Expand All @@ -389,11 +312,11 @@ public enum State {
MISSING((byte) 5, true, true),
WAITING((byte) 6, false, false);

private byte value;
private final byte value;

private boolean completed;
private final boolean completed;

private boolean failed;
private final boolean failed;

State(byte value, boolean completed, boolean failed) {
this.value = value;
Expand Down Expand Up @@ -437,7 +360,6 @@ public static State fromValue(byte value) {

private final List<Entry> entries;


public SnapshotsInProgress(List<Entry> entries) {
this.entries = entries;
}
Expand Down Expand Up @@ -551,9 +473,93 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray(SNAPSHOTS);
for (Entry entry : entries) {
entry.toXContent(builder, params);
toXContent(entry, builder, params);
}
builder.endArray();
return builder;
}

public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(REPOSITORY, entry.snapshot().getRepository());
builder.field(SNAPSHOT, entry.snapshot().getSnapshotId().getName());
builder.field(UUID, entry.snapshot().getSnapshotId().getUUID());
builder.field(INCLUDE_GLOBAL_STATE, entry.includeGlobalState());
builder.field(PARTIAL, entry.partial());
builder.field(STATE, entry.state());
builder.startArray(INDICES);
{
for (IndexId index : entry.indices()) {
index.toXContent(builder, params);
}
}
builder.endArray();
builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(entry.startTime()));
builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId());
builder.startArray(SHARDS);
{
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : entry.shards) {
ShardId shardId = shardEntry.key;
ShardSnapshotStatus status = shardEntry.value;
builder.startObject();
{
builder.field(INDEX, shardId.getIndex());
builder.field(SHARD, shardId.getId());
builder.field(STATE, status.state());
builder.field(NODE, status.nodeId());
}
builder.endObject();
}
}
builder.endArray();
builder.endObject();
}

public enum ShardState {
INIT((byte) 0, false, false),
SUCCESS((byte) 2, true, false),
FAILED((byte) 3, true, true),
ABORTED((byte) 4, false, true),
MISSING((byte) 5, true, true),
WAITING((byte) 6, false, false);

private final byte value;

private final boolean completed;

private final boolean failed;

ShardState(byte value, boolean completed, boolean failed) {
this.value = value;
this.completed = completed;
this.failed = failed;
}

public boolean completed() {
return completed;
}

public boolean failed() {
return failed;
}

public static ShardState fromValue(byte value) {
switch (value) {
case 0:
return INIT;
case 2:
return SUCCESS;
case 3:
return FAILED;
case 4:
return ABORTED;
case 5:
return MISSING;
case 6:
return WAITING;
default:
throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]");
}
}
}
}
Loading

0 comments on commit ccb764c

Please sign in to comment.