From 0a533f8df408955695a10d8d23632df3f70462ae Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 8 Jan 2018 11:53:31 +0100 Subject: [PATCH 1/4] Consistent updates of IndexShardSnapshotStatus IndexShardSnapshotStatus represents the status of an index shard snapshot. During the snapshot process it can be updated multiple times, it can also be aborted and the the Get Snapshot and Snapshot Status APIs are susceptible to read the snapshot statuses. Right now, the information are not updated in a coherent manner, meaning that a IndexShardSnapshotStatus can be FAILED but the failure is not updated yet. Or it could be FINALIZE with no index version. This commit changes IndexShardSnapshotStatus so that the Stage is updated coherently with any required information. It also provides a asCopy() method that returns the status of a IndexShardSnapshotStatus at a given point in time, ensuring that all information are coherent. --- .../status/SnapshotIndexShardStatus.java | 15 +- .../snapshots/status/SnapshotStats.java | 21 +- .../status/TransportNodesSnapshotsStatus.java | 16 +- .../TransportSnapshotsStatusAction.java | 3 +- .../snapshots/IndexShardSnapshotStatus.java | 317 +++++++++--------- .../repositories/Repository.java | 2 +- .../blobstore/BlobStoreRepository.java | 62 ++-- .../snapshots/SnapshotShardsService.java | 40 +-- .../snapshots/SnapshotsService.java | 9 +- .../snapshots/SnapshotShardsServiceIT.java | 2 +- .../index/shard/IndexShardTestCase.java | 10 +- 11 files changed, 243 insertions(+), 254 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java index 324cb3712adf1..1b7ead5b96510 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java @@ -22,7 +22,6 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.shard.ShardId; @@ -49,13 +48,13 @@ private SnapshotIndexShardStatus() { this.stats = new SnapshotStats(); } - SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus indexShardStatus) { + SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus.Copy indexShardStatus) { this(shardId, indexShardStatus, null); } - SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus indexShardStatus, String nodeId) { + SnapshotIndexShardStatus(ShardId shardId, IndexShardSnapshotStatus.Copy indexShardStatus, String nodeId) { super(shardId); - switch (indexShardStatus.stage()) { + switch (indexShardStatus.getStage()) { case INIT: stage = SnapshotIndexShardStage.INIT; break; @@ -72,10 +71,12 @@ private SnapshotIndexShardStatus() { stage = SnapshotIndexShardStage.FAILURE; break; default: - throw new IllegalArgumentException("Unknown stage type " + indexShardStatus.stage()); + throw new IllegalArgumentException("Unknown stage type " + indexShardStatus.getStage()); } - stats = new SnapshotStats(indexShardStatus); - failure = indexShardStatus.failure(); + this.stats = new SnapshotStats(indexShardStatus.getStartTime(), indexShardStatus.getTotalTime(), + indexShardStatus.getNumberOfFiles(), indexShardStatus.getProcessedFiles(), + indexShardStatus.getTotalSize(), indexShardStatus.getProcessedSize()); + this.failure = indexShardStatus.getFailure(); this.nodeId = nodeId; } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java index ba11e51d56f87..5b2bdd7c614c6 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java @@ -25,33 +25,28 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import java.io.IOException; public class SnapshotStats implements Streamable, ToXContentFragment { - private long startTime; + private long startTime; private long time; - private int numberOfFiles; - private int processedFiles; - private long totalSize; - private long processedSize; SnapshotStats() { } - SnapshotStats(IndexShardSnapshotStatus indexShardStatus) { - startTime = indexShardStatus.startTime(); - time = indexShardStatus.time(); - numberOfFiles = indexShardStatus.numberOfFiles(); - processedFiles = indexShardStatus.processedFiles(); - totalSize = indexShardStatus.totalSize(); - processedSize = indexShardStatus.processedSize(); + SnapshotStats(long startTime, long time, int numberOfFiles, int processedFiles, long totalSize, long processedSize) { + this.startTime = startTime; + this.time = time; + this.numberOfFiles = numberOfFiles; + this.processedFiles = processedFiles; + this.totalSize = totalSize; + this.processedSize = processedSize; } /** diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java index 872793f6ef21a..77578546b9585 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java @@ -96,7 +96,7 @@ protected NodesSnapshotStatus newResponse(Request request, List> snapshotMapBuilder = new HashMap<>(); try { - String nodeId = clusterService.localNode().getId(); + final String nodeId = clusterService.localNode().getId(); for (Snapshot snapshot : request.snapshots) { Map shardsStatus = snapshotShardsService.currentSnapshotShards(snapshot); if (shardsStatus == null) { @@ -104,15 +104,17 @@ protected NodeSnapshotStatus nodeOperation(NodeRequest request) { } Map shardMapBuilder = new HashMap<>(); for (Map.Entry shardEntry : shardsStatus.entrySet()) { - SnapshotIndexShardStatus shardStatus; - IndexShardSnapshotStatus.Stage stage = shardEntry.getValue().stage(); + final ShardId shardId = shardEntry.getKey(); + + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardEntry.getValue().asCopy(); + final IndexShardSnapshotStatus.Stage stage = lastSnapshotStatus.getStage(); + + String shardNodeId = null; if (stage != IndexShardSnapshotStatus.Stage.DONE && stage != IndexShardSnapshotStatus.Stage.FAILURE) { // Store node id for the snapshots that are currently running. - shardStatus = new SnapshotIndexShardStatus(shardEntry.getKey(), shardEntry.getValue(), nodeId); - } else { - shardStatus = new SnapshotIndexShardStatus(shardEntry.getKey(), shardEntry.getValue()); + shardNodeId = nodeId; } - shardMapBuilder.put(shardEntry.getKey(), shardStatus); + shardMapBuilder.put(shardEntry.getKey(), new SnapshotIndexShardStatus(shardId, lastSnapshotStatus, shardNodeId)); } snapshotMapBuilder.put(snapshot, unmodifiableMap(shardMapBuilder)); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 71bb1995dd57e..dc13c8dab5188 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -233,7 +233,8 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li Map shardStatues = snapshotsService.snapshotShards(request.repository(), snapshotInfo); for (Map.Entry shardStatus : shardStatues.entrySet()) { - shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), shardStatus.getValue())); + IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardStatus.getValue().asCopy(); + shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), lastSnapshotStatus)); } final SnapshotsInProgress.State state; switch (snapshotInfo.state()) { diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index 644caa7520be5..af9ca042435bc 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -19,6 +19,12 @@ package org.elasticsearch.index.snapshots; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + /** * Represent shard snapshot status */ @@ -47,119 +53,86 @@ public enum Stage { /** * Snapshot failed */ - FAILURE + FAILURE, + /** + * Snapshot aborted + */ + ABORTED } - private Stage stage = Stage.INIT; - + private final AtomicReference stage; private long startTime; - - private long time; - + private long totalTime; private int numberOfFiles; - private volatile int processedFiles; - private long totalSize; - private volatile long processedSize; - private long indexVersion; - - private volatile boolean aborted; - private String failure; - /** - * Returns current snapshot stage - * - * @return current snapshot stage - */ - public Stage stage() { - return this.stage; - } - - /** - * Sets new snapshot stage - * - * @param stage new snapshot stage - */ - public void updateStage(Stage stage) { - this.stage = stage; - } - - /** - * Returns snapshot start time - * - * @return snapshot start time - */ - public long startTime() { - return this.startTime; - } - - /** - * Sets snapshot start time - * - * @param startTime snapshot start time - */ - public void startTime(long startTime) { + private IndexShardSnapshotStatus(final Stage stage, final long startTime, final long totalTime, + final int numberOfFiles, final int processedFiles, final long totalSize, final long processedSize, + final long indexVersion, final String failure) { + this.stage = new AtomicReference<>(Objects.requireNonNull(stage)); this.startTime = startTime; + this.totalTime = totalTime; + this.numberOfFiles = numberOfFiles; + this.processedFiles = processedFiles; + this.totalSize = totalSize; + this.processedSize = processedSize; + this.indexVersion = indexVersion; + this.failure = failure; } - /** - * Returns snapshot processing time - * - * @return processing time - */ - public long time() { - return this.time; + public synchronized Copy moveToStarted(final long startTime, final int numberOfFiles, final long totalSize) { + ensureNotAborted(); + if (stage.compareAndSet(Stage.INIT, Stage.STARTED)) { + this.startTime = startTime; + this.numberOfFiles = numberOfFiles; + this.totalSize = totalSize; + } else { + throw new IllegalStateException("Unable to move the shard snapshot status to started: it is not initializing"); + } + return asCopy(); } - /** - * Sets snapshot processing time - * - * @param time snapshot processing time - */ - public void time(long time) { - this.time = time; + public synchronized Copy moveToFinalize(final long indexVersion) { + ensureNotAborted(); + if (stage.compareAndSet(Stage.STARTED, Stage.FINALIZE)) { + this.indexVersion = indexVersion; + } else { + throw new IllegalStateException("Unable to move the shard snapshot status to finalize: it is not started"); + } + return asCopy(); } - /** - * Returns true if snapshot process was aborted - * - * @return true if snapshot process was aborted - */ - public boolean aborted() { - return this.aborted; + public synchronized Copy moveToDone(final long endTime) { + ensureNotAborted(); + if (stage.compareAndSet(Stage.FINALIZE, Stage.DONE)) { + this.totalTime = Math.max(0L, endTime - startTime); + } else { + throw new IllegalStateException("Unable to move the shard snapshot status to done: it is not finalizing"); + } + return asCopy(); } - /** - * Marks snapshot as aborted - */ - public void abort() { - this.aborted = true; + public synchronized void moveToAborted(final String failure) { + if (stage.getAndSet(Stage.ABORTED) != Stage.ABORTED) { + this.failure = failure; + } } - /** - * Sets files stats - * - * @param numberOfFiles number of files in this snapshot - * @param totalSize total size of files in this snapshot - */ - public void files(int numberOfFiles, long totalSize) { - this.numberOfFiles = numberOfFiles; - this.totalSize = totalSize; + public synchronized void moveToFailed(final long endTime, final String failure) { + if (stage.getAndSet(Stage.FAILURE) != Stage.FAILURE) { + this.totalTime = Math.max(0L, endTime - startTime); + this.failure = failure; + } } - /** - * Sets processed files stats - * - * @param numberOfFiles number of files in this snapshot - * @param totalSize total size of files in this snapshot - */ - public synchronized void processedFiles(int numberOfFiles, long totalSize) { - processedFiles = numberOfFiles; - processedSize = totalSize; + public void ensureNotAborted() { + if (stage.get() == Stage.ABORTED) { + throw new IllegalStateException("Aborted"); + } } /** @@ -171,71 +144,105 @@ public synchronized void addProcessedFile(long size) { } /** - * Number of files - * - * @return number of files - */ - public int numberOfFiles() { - return numberOfFiles; - } - - /** - * Total snapshot size - * - * @return snapshot size - */ - public long totalSize() { - return totalSize; - } - - /** - * Number of processed files - * - * @return number of processed files - */ - public int processedFiles() { - return processedFiles; - } - - /** - * Size of processed files + * Returns a copy of the current {@link IndexShardSnapshotStatus}. This method is + * intended to be used when a coherent state of {@link IndexShardSnapshotStatus} is needed. * - * @return size of processed files - */ - public long processedSize() { - return processedSize; - } - - - /** - * Sets index version - * - * @param indexVersion index version - */ - public void indexVersion(long indexVersion) { - this.indexVersion = indexVersion; - } - - /** - * Returns index version - * - * @return index version - */ - public long indexVersion() { - return indexVersion; - } - - /** - * Sets the reason for the failure if the snapshot is in the {@link IndexShardSnapshotStatus.Stage#FAILURE} state - */ - public void failure(String failure) { - this.failure = failure; - } - - /** - * Returns the reason for the failure if the snapshot is in the {@link IndexShardSnapshotStatus.Stage#FAILURE} state - */ - public String failure() { - return failure; + * @return a {@link IndexShardSnapshotStatus.Copy} + */ + public synchronized IndexShardSnapshotStatus.Copy asCopy() { + return new IndexShardSnapshotStatus.Copy(stage.get(), startTime, totalTime, numberOfFiles, processedFiles, totalSize, processedSize, + indexVersion, failure); + } + + public static IndexShardSnapshotStatus newInitializing() { + return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, null); + } + + public static IndexShardSnapshotStatus newFailed(final String failure) { + if (failure == null) { + throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus"); + } + return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, failure); + } + + public static IndexShardSnapshotStatus newDone(final long startTime, final long totalTime, final int files, final long size) { + // The snapshot is done which means the number of processed files is the same as total + return new IndexShardSnapshotStatus(Stage.DONE, startTime, totalTime, files, files, size, size, 0, null); + } + + /** + * Returns an immutable state of {@link IndexShardSnapshotStatus} at a given point in time. + */ + public static class Copy { + + private final Stage stage; + private final long startTime; + private final long totalTime; + private final int numberOfFiles; + private final int processedFiles; + private final long totalSize; + private final long processedSize; + private final long indexVersion; + private final String failure; + + public Copy(final Stage stage, final long startTime, final long totalTime, + final int numberOfFiles, final int processedFiles, final long totalSize, final long processedSize, + final long indexVersion, final String failure) { + this.stage = stage; + this.startTime = startTime; + this.totalTime = totalTime; + this.numberOfFiles = numberOfFiles; + this.processedFiles = processedFiles; + this.totalSize = totalSize; + this.processedSize = processedSize; + this.indexVersion = indexVersion; + this.failure = failure; + } + + public Stage getStage() { + return stage; + } + + public long getStartTime() { + return startTime; + } + + public long getTotalTime() { + return totalTime; + } + + public int getNumberOfFiles() { + return numberOfFiles; + } + + public int getProcessedFiles() { + return processedFiles; + } + + public long getTotalSize() { + return totalSize; + } + + public long getProcessedSize() { + return processedSize; + } + + public long getIndexVersion() { + return indexVersion; + } + + public String getFailure() { + return failure; + } + + @Override + public String toString() { + return new StringBuilder() + .append("took [").append(TimeValue.timeValueMillis(getTotalTime())).append("], ") + .append("index version [").append(getIndexVersion()).append("], ") + .append("number_of_files [").append(getNumberOfFiles()).append("], ") + .append("total_size [").append(new ByteSizeValue(getTotalSize())).append("]") + .toString(); + } } } diff --git a/core/src/main/java/org/elasticsearch/repositories/Repository.java b/core/src/main/java/org/elasticsearch/repositories/Repository.java index f711a72b67757..3e8682ba81003 100644 --- a/core/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/core/src/main/java/org/elasticsearch/repositories/Repository.java @@ -180,7 +180,7 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. *

* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check - * {@link IndexShardSnapshotStatus#aborted()} to see if the snapshot process should be aborted. + * {@link IndexShardSnapshotStatus#ensureNotAborted()} to see if the snapshot process should be aborted. * * @param shard shard to be snapshotted * @param snapshotId snapshot id diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 06812be5aab2c..145a2feedbb83 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -805,17 +805,11 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef) t @Override public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { - SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, indexId, snapshotStatus); - snapshotStatus.startTime(System.currentTimeMillis()); - + SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, indexId, snapshotStatus, System.currentTimeMillis()); try { snapshotContext.snapshot(snapshotIndexCommit); - snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); } catch (Exception e) { - snapshotStatus.time(System.currentTimeMillis() - snapshotStatus.startTime()); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); - snapshotStatus.failure(ExceptionsHelper.detailedMessage(e)); + snapshotStatus.moveToFailed(System.currentTimeMillis(), ExceptionsHelper.detailedMessage(e)); if (e instanceof IndexShardSnapshotFailedException) { throw (IndexShardSnapshotFailedException) e; } else { @@ -838,14 +832,7 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { Context context = new Context(snapshotId, version, indexId, shardId); BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); - IndexShardSnapshotStatus status = new IndexShardSnapshotStatus(); - status.updateStage(IndexShardSnapshotStatus.Stage.DONE); - status.startTime(snapshot.startTime()); - status.files(snapshot.numberOfFiles(), snapshot.totalSize()); - // The snapshot is done which means the number of processed files is the same as total - status.processedFiles(snapshot.numberOfFiles(), snapshot.totalSize()); - status.time(snapshot.time()); - return status; + return IndexShardSnapshotStatus.newDone(snapshot.startTime(), snapshot.time(), snapshot.numberOfFiles(), snapshot.totalSize()); } @Override @@ -1103,8 +1090,8 @@ protected Tuple buildBlobStoreIndexShardS private class SnapshotContext extends Context { private final Store store; - private final IndexShardSnapshotStatus snapshotStatus; + private final long startTime; /** * Constructs new context @@ -1114,10 +1101,11 @@ private class SnapshotContext extends Context { * @param indexId the id of the index being snapshotted * @param snapshotStatus snapshot status to report progress */ - SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus) { + SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) { super(snapshotId, Version.CURRENT, indexId, shard.shardId()); this.snapshotStatus = snapshotStatus; this.store = shard.store(); + this.startTime = startTime; } /** @@ -1156,10 +1144,8 @@ public void snapshot(IndexCommit snapshotIndexCommit) { throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); } for (String fileName : fileNames) { - if (snapshotStatus.aborted()) { - logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } + snapshotStatus.ensureNotAborted(); + logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); final StoreFileMetaData md = metadata.get(fileName); BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null; @@ -1195,14 +1181,7 @@ public void snapshot(IndexCommit snapshotIndexCommit) { } } - snapshotStatus.files(indexNumberOfFiles, indexTotalFilesSize); - - if (snapshotStatus.aborted()) { - logger.debug("[{}] [{}] Aborted during initialization", shardId, snapshotId); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } - - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.STARTED); + snapshotStatus.moveToStarted(startTime, indexNumberOfFiles, indexTotalFilesSize); for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { try { @@ -1212,14 +1191,19 @@ public void snapshot(IndexCommit snapshotIndexCommit) { } } - snapshotStatus.indexVersion(snapshotIndexCommit.getGeneration()); + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); + // now create and write the commit point - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); + final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), + lastSnapshotStatus.getIndexVersion(), + indexCommitPointFiles, + lastSnapshotStatus.getStartTime(), + // snapshotStatus.startTime() is assigned on the same machine, + // so it's safe to use with VLong + System.currentTimeMillis() - lastSnapshotStatus.getStartTime(), + lastSnapshotStatus.getNumberOfFiles(), + lastSnapshotStatus.getTotalSize()); - BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), - snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(), - // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong - System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); //TODO: The time stored in snapshot doesn't include cleanup time. logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); try { @@ -1237,7 +1221,7 @@ public void snapshot(IndexCommit snapshotIndexCommit) { } // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index finalize(newSnapshotsList, fileListGeneration + 1, blobs); - snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); + snapshotStatus.moveToDone(System.currentTimeMillis()); } finally { store.decRef(); } @@ -1335,7 +1319,9 @@ public int read(byte[] b, int off, int len) throws IOException { } private void checkAborted() { - if (snapshotStatus.aborted()) { + try { + snapshotStatus.ensureNotAborted(); + } catch (IllegalStateException e) { logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); throw new IndexShardSnapshotFailedException(shardId, "Aborted"); } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 248f9a555a3d6..d921b81fbb835 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -51,7 +51,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.engine.Engine; @@ -188,7 +187,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh Map shards = snapshotShards.getValue().shards; if (shards.containsKey(shardId)) { logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", shardId, snapshotShards.getKey().getSnapshotId()); - shards.get(shardId).abort(); + shards.get(shardId).moveToAborted("shard is closing, aborting"); } } } @@ -230,8 +229,9 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { // running shards is missed, then the snapshot is removed is a subsequent cluster // state update, which is being processed here for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().shards.values()) { - if (snapshotStatus.stage() == Stage.INIT || snapshotStatus.stage() == Stage.STARTED) { - snapshotStatus.abort(); + final IndexShardSnapshotStatus.Stage stage = snapshotStatus.asCopy().getStage(); + if (stage == Stage.INIT || stage == Stage.STARTED) { + snapshotStatus.moveToAborted("snapshot has been removed in cluster state, aborting"); } } } @@ -255,7 +255,7 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { if (localNodeId.equals(shard.value.nodeId())) { if (shard.value.state() == State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.key))) { logger.trace("[{}] - Adding shard to the queue", shard.key); - startedShards.put(shard.key, new IndexShardSnapshotStatus()); + startedShards.put(shard.key, IndexShardSnapshotStatus.newInitializing()); } } } @@ -279,12 +279,15 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot()); if (snapshotShards != null) { for (ObjectObjectCursor shard : entry.shards()) { - IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key); + + final IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key); if (snapshotStatus != null) { - switch (snapshotStatus.stage()) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + final Stage stage = lastSnapshotStatus.getStage(); + switch (stage) { case INIT: case STARTED: - snapshotStatus.abort(); + snapshotStatus.moveToAborted("snapshot has been aborted"); break; case FINALIZE: logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + @@ -298,10 +301,10 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { case FAILURE: logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + "updating status on the master", entry.snapshot(), shard.key); - notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, snapshotStatus.failure()); + notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, lastSnapshotStatus.getFailure()); break; default: - throw new IllegalStateException("Unknown snapshot shard stage " + snapshotStatus.stage()); + throw new IllegalStateException("Unknown snapshot shard stage " + stage); } } } @@ -400,12 +403,8 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) { repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus); if (logger.isDebugEnabled()) { - StringBuilder details = new StringBuilder(); - details.append(" index : version [").append(snapshotStatus.indexVersion()); - details.append("], number_of_files [").append(snapshotStatus.numberOfFiles()); - details.append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n"); - logger.debug("snapshot ({}) completed to {}, took [{}]\n{}", snapshot, repository, - TimeValue.timeValueMillis(snapshotStatus.time()), details); + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + logger.debug("snapshot ({}) completed to {} {}", snapshot, repository, lastSnapshotStatus); } } } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) { @@ -432,21 +431,22 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { ImmutableOpenMap masterShards = snapshot.shards(); for(Map.Entry localShard : localShards.entrySet()) { ShardId shardId = localShard.getKey(); - IndexShardSnapshotStatus localShardStatus = localShard.getValue(); ShardSnapshotStatus masterShard = masterShards.get(shardId); if (masterShard != null && masterShard.state().completed() == false) { + final IndexShardSnapshotStatus.Copy indexShardSnapshotStatus = localShard.getValue().asCopy(); + final Stage stage = indexShardSnapshotStatus.getStage(); // Master knows about the shard and thinks it has not completed - if (localShardStatus.stage() == Stage.DONE) { + if (stage == Stage.DONE) { // but we think the shard is done - we need to make new master know that the shard is done logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " + "updating status on the master", snapshot.snapshot(), shardId); notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localNodeId); - } else if (localShard.getValue().stage() == Stage.FAILURE) { + } else if (stage == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " + "updating status on master", snapshot.snapshot(), shardId); - notifyFailedSnapshotShard(snapshot.snapshot(), shardId, localNodeId, localShardStatus.failure()); + notifyFailedSnapshotShard(snapshot.snapshot(), shardId, localNodeId, indexShardSnapshotStatus.getFailure()); } } } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index e19394714731f..ef999fe9d0045 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -598,10 +598,7 @@ public Map snapshotShards(final String reposi ShardId shardId = new ShardId(indexMetaData.getIndex(), i); SnapshotShardFailure shardFailure = findShardFailure(snapshotInfo.shardFailures(), shardId); if (shardFailure != null) { - IndexShardSnapshotStatus shardSnapshotStatus = new IndexShardSnapshotStatus(); - shardSnapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); - shardSnapshotStatus.failure(shardFailure.reason()); - shardStatus.put(shardId, shardSnapshotStatus); + shardStatus.put(shardId, IndexShardSnapshotStatus.newFailed(shardFailure.reason())); } else { final IndexShardSnapshotStatus shardSnapshotStatus; if (snapshotInfo.state() == SnapshotState.FAILED) { @@ -612,9 +609,7 @@ public Map snapshotShards(final String reposi // snapshot status will throw an exception. Instead, we create // a status for the shard to indicate that the shard snapshot // could not be taken due to partial being set to false. - shardSnapshotStatus = new IndexShardSnapshotStatus(); - shardSnapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FAILURE); - shardSnapshotStatus.failure("skipped"); + shardSnapshotStatus = IndexShardSnapshotStatus.newFailed("skipped"); } else { shardSnapshotStatus = repository.getShardSnapshotStatus( snapshotInfo.snapshotId(), diff --git a/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java b/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java index 651cd96776e75..8431c8fa69f54 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java @@ -93,7 +93,7 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception { assertBusy(() -> { final Snapshot snapshot = new Snapshot("test-repo", snapshotId); List stages = snapshotShardsService.currentSnapshotShards(snapshot) - .values().stream().map(IndexShardSnapshotStatus::stage).collect(Collectors.toList()); + .values().stream().map(status -> status.asCopy().getStage()).collect(Collectors.toList()); assertThat(stages, hasSize(shards)); assertThat(stages, everyItem(equalTo(IndexShardSnapshotStatus.Stage.DONE))); }); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index c06b4a433cb0a..4737befa30e45 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -619,16 +619,18 @@ protected void recoverShardFromSnapshot(final IndexShard shard, protected void snapshotShard(final IndexShard shard, final Snapshot snapshot, final Repository repository) throws IOException { - final IndexShardSnapshotStatus snapshotStatus = new IndexShardSnapshotStatus(); + final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(); try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(true)) { Index index = shard.shardId().getIndex(); IndexId indexId = new IndexId(index.getName(), index.getUUID()); repository.snapshotShard(shard, snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), snapshotStatus); } - assertEquals(IndexShardSnapshotStatus.Stage.DONE, snapshotStatus.stage()); - assertEquals(shard.snapshotStoreMetadata().size(), snapshotStatus.numberOfFiles()); - assertNull(snapshotStatus.failure()); + + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + assertEquals(IndexShardSnapshotStatus.Stage.DONE, lastSnapshotStatus.getStage()); + assertEquals(shard.snapshotStoreMetadata().size(), lastSnapshotStatus.getNumberOfFiles()); + assertNull(lastSnapshotStatus.getFailure()); } /** From 25afc10fa352cc1fac2b3c26b9968ed2b589de3c Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 8 Jan 2018 16:36:23 +0100 Subject: [PATCH 2/4] Apply feedback --- .../snapshots/IndexShardSnapshotStatus.java | 26 ++++++++---- .../snapshots/SnapshotShardsService.java | 41 ++++++++----------- 2 files changed, 33 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index af9ca042435bc..9d2fac1492946 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -64,9 +64,9 @@ public enum Stage { private long startTime; private long totalTime; private int numberOfFiles; - private volatile int processedFiles; + private int processedFiles; private long totalSize; - private volatile long processedSize; + private long processedSize; private long indexVersion; private String failure; @@ -85,41 +85,49 @@ private IndexShardSnapshotStatus(final Stage stage, final long startTime, final } public synchronized Copy moveToStarted(final long startTime, final int numberOfFiles, final long totalSize) { - ensureNotAborted(); if (stage.compareAndSet(Stage.INIT, Stage.STARTED)) { this.startTime = startTime; this.numberOfFiles = numberOfFiles; this.totalSize = totalSize; } else { - throw new IllegalStateException("Unable to move the shard snapshot status to started: it is not initializing"); + throw new IllegalStateException("Unable to move the shard snapshot status to [STARTED]: " + + "expecting [INIT] but got [" + stage.get() + "]"); } return asCopy(); } public synchronized Copy moveToFinalize(final long indexVersion) { - ensureNotAborted(); if (stage.compareAndSet(Stage.STARTED, Stage.FINALIZE)) { this.indexVersion = indexVersion; } else { - throw new IllegalStateException("Unable to move the shard snapshot status to finalize: it is not started"); + throw new IllegalStateException("Unable to move the shard snapshot status to [FINALIZE]: " + + "expecting [STARTED] but got [" + stage.get() + "]"); } return asCopy(); } public synchronized Copy moveToDone(final long endTime) { - ensureNotAborted(); if (stage.compareAndSet(Stage.FINALIZE, Stage.DONE)) { this.totalTime = Math.max(0L, endTime - startTime); } else { - throw new IllegalStateException("Unable to move the shard snapshot status to done: it is not finalizing"); + throw new IllegalStateException("Unable to move the shard snapshot status to [DONE]: " + + "expecting [FINALIZE] but got [" + stage.get() + "]"); } return asCopy(); } - public synchronized void moveToAborted(final String failure) { + public synchronized Copy moveToAborted(final String failure) { if (stage.getAndSet(Stage.ABORTED) != Stage.ABORTED) { this.failure = failure; } + return asCopy(); + } + + public synchronized Copy abortIfNotCompleted(final String failure) { + if (stage.compareAndSet(Stage.INIT, Stage.ABORTED) || stage.compareAndSet(Stage.STARTED, Stage.ABORTED)) { + this.failure = failure; + } + return asCopy(); } public synchronized void moveToFailed(final long endTime, final String failure) { diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index d921b81fbb835..7044a8efb640c 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -230,9 +230,7 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { // state update, which is being processed here for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().shards.values()) { final IndexShardSnapshotStatus.Stage stage = snapshotStatus.asCopy().getStage(); - if (stage == Stage.INIT || stage == Stage.STARTED) { - snapshotStatus.moveToAborted("snapshot has been removed in cluster state, aborting"); - } + snapshotStatus.abortIfNotCompleted("snapshot has been removed in cluster state, aborting"); } } } @@ -278,33 +276,26 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { // Abort all running shards for this snapshot SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot()); if (snapshotShards != null) { + final String failure = "snapshot has been aborted"; for (ObjectObjectCursor shard : entry.shards()) { final IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key); if (snapshotStatus != null) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.abortIfNotCompleted(failure); final Stage stage = lastSnapshotStatus.getStage(); - switch (stage) { - case INIT: - case STARTED: - snapshotStatus.moveToAborted("snapshot has been aborted"); - break; - case FINALIZE: - logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + - "letting it finish", entry.snapshot(), shard.key); - break; - case DONE: - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + - "updating status on the master", entry.snapshot(), shard.key); - notifySuccessfulSnapshotShard(entry.snapshot(), shard.key, localNodeId); - break; - case FAILURE: - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + - "updating status on the master", entry.snapshot(), shard.key); - notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, lastSnapshotStatus.getFailure()); - break; - default: - throw new IllegalStateException("Unknown snapshot shard stage " + stage); + if (stage == Stage.FINALIZE) { + logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + + "letting it finish", entry.snapshot(), shard.key); + + } else if (stage == Stage.DONE) { + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + + "updating status on the master", entry.snapshot(), shard.key); + notifySuccessfulSnapshotShard(entry.snapshot(), shard.key, localNodeId); + + } else if (stage == Stage.FAILURE) { + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + + "updating status on the master", entry.snapshot(), shard.key); + notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, lastSnapshotStatus.getFailure()); } } } From 12bf56cd675b83acc5a3eb4e7e0fd996f2c9bea3 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 9 Jan 2018 08:53:46 +0100 Subject: [PATCH 3/4] Change toString() --- .../snapshots/IndexShardSnapshotStatus.java | 20 ++++++++++--------- .../snapshots/SnapshotShardsService.java | 2 +- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index 9d2fac1492946..7f6a0724d8b71 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -19,9 +19,6 @@ package org.elasticsearch.index.snapshots; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; - import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -245,12 +242,17 @@ public String getFailure() { @Override public String toString() { - return new StringBuilder() - .append("took [").append(TimeValue.timeValueMillis(getTotalTime())).append("], ") - .append("index version [").append(getIndexVersion()).append("], ") - .append("number_of_files [").append(getNumberOfFiles()).append("], ") - .append("total_size [").append(new ByteSizeValue(getTotalSize())).append("]") - .toString(); + return "index shard snapshot status (" + + "stage=" + stage + + ", startTime=" + startTime + + ", totalTime=" + totalTime + + ", numberOfFiles=" + numberOfFiles + + ", processedFiles=" + processedFiles + + ", totalSize=" + totalSize + + ", processedSize=" + processedSize + + ", indexVersion=" + indexVersion + + ", failure='" + failure + '\'' + + ')'; } } } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 7044a8efb640c..c0682b72e9222 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -395,7 +395,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus); if (logger.isDebugEnabled()) { final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); - logger.debug("snapshot ({}) completed to {} {}", snapshot, repository, lastSnapshotStatus); + logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus); } } } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) { From 93a51a5ad34aa45c49be04539559f8c11253488b Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 9 Jan 2018 12:43:00 +0100 Subject: [PATCH 4/4] Apply feedback 2 --- .../snapshots/IndexShardSnapshotStatus.java | 14 +-- .../repositories/Repository.java | 2 +- .../blobstore/BlobStoreRepository.java | 101 +++++++++--------- .../snapshots/SnapshotShardsService.java | 3 +- 4 files changed, 57 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index 7f6a0724d8b71..f1c247a41bb6d 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -113,13 +113,6 @@ public synchronized Copy moveToDone(final long endTime) { return asCopy(); } - public synchronized Copy moveToAborted(final String failure) { - if (stage.getAndSet(Stage.ABORTED) != Stage.ABORTED) { - this.failure = failure; - } - return asCopy(); - } - public synchronized Copy abortIfNotCompleted(final String failure) { if (stage.compareAndSet(Stage.INIT, Stage.ABORTED) || stage.compareAndSet(Stage.STARTED, Stage.ABORTED)) { this.failure = failure; @@ -134,10 +127,8 @@ public synchronized void moveToFailed(final long endTime, final String failure) } } - public void ensureNotAborted() { - if (stage.get() == Stage.ABORTED) { - throw new IllegalStateException("Aborted"); - } + public boolean isAborted() { + return stage.get() == Stage.ABORTED; } /** @@ -164,6 +155,7 @@ public static IndexShardSnapshotStatus newInitializing() { } public static IndexShardSnapshotStatus newFailed(final String failure) { + assert failure != null : "expecting non null failure for a failed IndexShardSnapshotStatus"; if (failure == null) { throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus"); } diff --git a/core/src/main/java/org/elasticsearch/repositories/Repository.java b/core/src/main/java/org/elasticsearch/repositories/Repository.java index 3e8682ba81003..4c3d58e67ff72 100644 --- a/core/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/core/src/main/java/org/elasticsearch/repositories/Repository.java @@ -180,7 +180,7 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. *

* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check - * {@link IndexShardSnapshotStatus#ensureNotAborted()} to see if the snapshot process should be aborted. + * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted. * * @param shard shard to be snapshotted * @param snapshotId snapshot id diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 145a2feedbb83..9068c6ff39743 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1113,24 +1113,25 @@ private class SnapshotContext extends Context { * * @param snapshotIndexCommit snapshot commit point */ - public void snapshot(IndexCommit snapshotIndexCommit) { + public void snapshot(final IndexCommit snapshotIndexCommit) { logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name()); - store.incRef(); + + final Map blobs; try { - final Map blobs; - try { - blobs = blobContainer.listBlobs(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e); - } + blobs = blobContainer.listBlobs(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e); + } - long generation = findLatestFileNameGeneration(blobs); - Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); - BlobStoreIndexShardSnapshots snapshots = tuple.v1(); - int fileListGeneration = tuple.v2(); + long generation = findLatestFileNameGeneration(blobs); + Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); + BlobStoreIndexShardSnapshots snapshots = tuple.v1(); + int fileListGeneration = tuple.v2(); - final List indexCommitPointFiles = new ArrayList<>(); + final List indexCommitPointFiles = new ArrayList<>(); + store.incRef(); + try { int indexNumberOfFiles = 0; long indexTotalFilesSize = 0; ArrayList filesToSnapshot = new ArrayList<>(); @@ -1144,7 +1145,10 @@ public void snapshot(IndexCommit snapshotIndexCommit) { throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); } for (String fileName : fileNames) { - snapshotStatus.ensureNotAborted(); + if (snapshotStatus.isAborted()) { + logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); final StoreFileMetaData md = metadata.get(fileName); @@ -1190,41 +1194,42 @@ public void snapshot(IndexCommit snapshotIndexCommit) { throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); } } - - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); - - // now create and write the commit point - final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), - lastSnapshotStatus.getIndexVersion(), - indexCommitPointFiles, - lastSnapshotStatus.getStartTime(), - // snapshotStatus.startTime() is assigned on the same machine, - // so it's safe to use with VLong - System.currentTimeMillis() - lastSnapshotStatus.getStartTime(), - lastSnapshotStatus.getNumberOfFiles(), - lastSnapshotStatus.getTotalSize()); - - //TODO: The time stored in snapshot doesn't include cleanup time. - logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); - try { - indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getUUID()); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); - } - - // delete all files that are not referenced by any commit point - // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones - List newSnapshotsList = new ArrayList<>(); - newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); - for (SnapshotFiles point : snapshots) { - newSnapshotsList.add(point); - } - // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index - finalize(newSnapshotsList, fileListGeneration + 1, blobs); - snapshotStatus.moveToDone(System.currentTimeMillis()); } finally { store.decRef(); } + + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); + + // now create and write the commit point + final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), + lastSnapshotStatus.getIndexVersion(), + indexCommitPointFiles, + lastSnapshotStatus.getStartTime(), + // snapshotStatus.startTime() is assigned on the same machine, + // so it's safe to use with VLong + System.currentTimeMillis() - lastSnapshotStatus.getStartTime(), + lastSnapshotStatus.getNumberOfFiles(), + lastSnapshotStatus.getTotalSize()); + + //TODO: The time stored in snapshot doesn't include cleanup time. + logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); + try { + indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getUUID()); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); + } + + // delete all files that are not referenced by any commit point + // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones + List newSnapshotsList = new ArrayList<>(); + newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); + for (SnapshotFiles point : snapshots) { + newSnapshotsList.add(point); + } + // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index + finalize(newSnapshotsList, fileListGeneration + 1, blobs); + snapshotStatus.moveToDone(System.currentTimeMillis()); + } /** @@ -1319,9 +1324,7 @@ public int read(byte[] b, int off, int len) throws IOException { } private void checkAborted() { - try { - snapshotStatus.ensureNotAborted(); - } catch (IllegalStateException e) { + if (snapshotStatus.isAborted()) { logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); throw new IndexShardSnapshotFailedException(shardId, "Aborted"); } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index c0682b72e9222..35e0b10fd8769 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -187,7 +187,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh Map shards = snapshotShards.getValue().shards; if (shards.containsKey(shardId)) { logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", shardId, snapshotShards.getKey().getSnapshotId()); - shards.get(shardId).moveToAborted("shard is closing, aborting"); + shards.get(shardId).abortIfNotCompleted("shard is closing, aborting"); } } } @@ -229,7 +229,6 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { // running shards is missed, then the snapshot is removed is a subsequent cluster // state update, which is being processed here for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().shards.values()) { - final IndexShardSnapshotStatus.Stage stage = snapshotStatus.asCopy().getStage(); snapshotStatus.abortIfNotCompleted("snapshot has been removed in cluster state, aborting"); } }