Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update IndexShardSnapshotStatus when an exception is encountered #32265

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ public enum SnapshotIndexShardStage {
/**
* Snapshot failed
*/
FAILURE((byte)4, true);
FAILURE((byte)4, true),
/**
* Snapshot aborted
*/
ABORTED((byte)5, false);

private byte value;

Expand Down Expand Up @@ -88,6 +92,8 @@ public static SnapshotIndexShardStage fromValue(byte value) {
return DONE;
case 4:
return FAILURE;
case 5:
return ABORTED;
default:
throw new IllegalArgumentException("No snapshot shard stage for value [" + value + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ private SnapshotIndexShardStatus() {
case FAILURE:
stage = SnapshotIndexShardStage.FAILURE;
break;
case ABORTED:
stage = SnapshotIndexShardStage.ABORTED;
break;
default:
throw new IllegalArgumentException("Unknown stage type " + indexShardStatus.getStage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class SnapshotShardsStats implements ToXContentObject {
private int finalizingShards;
private int doneShards;
private int failedShards;
private int abortedShards;
private int totalShards;

SnapshotShardsStats(Collection<SnapshotIndexShardStatus> shards) {
Expand All @@ -62,6 +63,9 @@ public class SnapshotShardsStats implements ToXContentObject {
case FAILURE:
failedShards++;
break;
case ABORTED:
abortedShards++;
break;
default:
throw new IllegalArgumentException("Unknown stage type " + shard.getStage());
}
Expand Down Expand Up @@ -113,6 +117,13 @@ public int getFailedShards() {
return failedShards;
}

/**
* Number of shards with the snapshot in the aborted stage
*/
public int getAbortedShards() {
return abortedShards;
}

/**
* Total number of shards
*/
Expand All @@ -127,6 +138,7 @@ static final class Fields {
static final String FINALIZING = "finalizing";
static final String DONE = "done";
static final String FAILED = "failed";
static final String ABORTED = "aborted";
static final String TOTAL = "total";
}

Expand All @@ -139,6 +151,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
builder.field(Fields.FINALIZING, getFinalizingShards());
builder.field(Fields.DONE, getDoneShards());
builder.field(Fields.FAILED, getFailedShards());
builder.field(Fields.ABORTED, getAbortedShards());
builder.field(Fields.TOTAL, getTotalShards());
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ public boolean isAborted() {
return stage.get() == Stage.ABORTED;
}

public boolean isFailed() {
return stage.get() == Stage.FAILURE;
}

/**
* Increments number of processed files
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
Expand All @@ -52,6 +53,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.SnapshotFailedEngineException;
import org.elasticsearch.index.shard.IndexEventListener;
Expand Down Expand Up @@ -294,6 +296,14 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " +
"updating status on the master", entry.snapshot(), shard.key);
notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, lastSnapshotStatus.getFailure());
} else if (stage == Stage.ABORTED) {
final IndexRoutingTable indexRoutingTable = event.state().getRoutingTable().index(shard.key.getIndexName());
final String shardCurrentNodeId = indexRoutingTable.getShards().get(shard.key.getId()).primaryShard().currentNodeId();
if (!shard.value.nodeId().equals(shardCurrentNodeId)) {
// Shard's snapshot node id and current primary node id are different, most likely no thread is working on it
final String failure_message = "Shard's snapshot state node and current primary node are different";
notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, failure_message);
}
}
}
}
Expand Down Expand Up @@ -325,16 +335,22 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {

for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : entry.getValue().entrySet()) {
final ShardId shardId = shardEntry.getKey();
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
executor.execute(new AbstractRunnable() {

final SetOnce<Exception> failure = new SetOnce<>();

@Override
public void doRun() {
snapshot(indexShard, snapshot, indexId, shardEntry.getValue());
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
snapshot(indexShard, snapshot, indexId, shardEntry.getValue());
} catch(IndexNotFoundException e) {
final String failure = "IndexNotFoundException while fetching index";
shardEntry.getValue().moveToFailed(System.currentTimeMillis(), failure);
throw e;
}
}

@Override
Expand All @@ -352,7 +368,12 @@ public void onRejection(Exception e) {
public void onAfter() {
final Exception exception = failure.get();
if (exception != null) {
notifyFailedSnapshotShard(snapshot, shardId, localNodeId, ExceptionsHelper.detailedMessage(exception));
final String failure = ExceptionsHelper.detailedMessage(exception);
if (!shardEntry.getValue().isFailed()) {
// The status is not yet moved to failed, move it before notifying the master
shardEntry.getValue().moveToFailed(System.currentTimeMillis(), failure);
}
notifyFailedSnapshotShard(snapshot, shardId, localNodeId, failure);
} else {
notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId);
}
Expand All @@ -372,16 +393,22 @@ public void onAfter() {
private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, final IndexShardSnapshotStatus snapshotStatus) {
final ShardId shardId = indexShard.shardId();
if (indexShard.routingEntry().primary() == false) {
final String failure = "Snapshot should be performed only on primary";
snapshotStatus.moveToFailed(System.currentTimeMillis(), failure);
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
}
if (indexShard.routingEntry().relocating()) {
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
final String failure = "Cannot snapshot while relocating";
snapshotStatus.moveToFailed(System.currentTimeMillis(), failure);
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating");
}

final IndexShardState indexShardState = indexShard.state();
if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) {
// shard has just been created, or still recovering
final String failure = "Shard didn't fully recover yet";
snapshotStatus.moveToFailed(System.currentTimeMillis(), failure);
throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
}

Expand All @@ -395,9 +422,11 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus);
}
}
} catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) {
} catch (IndexShardSnapshotFailedException e) {
// Shard status already moved to failed by BlobStoreRepository, throw the exception
throw e;
} catch (Exception e) {
snapshotStatus.moveToFailed(System.currentTimeMillis(), ExceptionsHelper.detailedMessage(e));
throw new IndexShardSnapshotFailedException(shardId, "Failed to snapshot", e);
}
}
Expand Down