From b48191a049e0df4493eeb8742c126c08395569aa Mon Sep 17 00:00:00 2001
From: Przemyslaw Gomulka
Date: Fri, 14 Dec 2018 16:26:30 +0100
Subject: [PATCH] Fix line length in org.elasticsearch.snapshots

Remove the line length suppression for this package and fix offending
lines in both main and test

relates #34884
---
 .../resources/checkstyle_suppressions.xml     |  4 -
 .../elasticsearch/snapshots/RestoreInfo.java  |  3 +-
 .../snapshots/RestoreService.java             | 92 +++++++++++++------
 .../elasticsearch/snapshots/SnapshotInfo.java | 13 ++-
 .../snapshots/SnapshotShardsService.java      | 32 ++++---
 .../snapshots/SnapshotsService.java           | 88 ++++++++++++------
 .../DedicatedClusterSnapshotRestoreIT.java    | 19 +++-
 .../SharedClusterSnapshotRestoreIT.java       |  9 +-
 .../snapshots/mockstore/MockRepository.java   |  3 +-
 9 files changed, 173 insertions(+), 90 deletions(-)

diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index 6e628eab0cbd3..3e3434e2d45a4 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -57,10 +57,6 @@
-  <suppress files="..." checks="LineLength" />
-  <suppress files="..." checks="LineLength" />
-  <suppress files="..." checks="LineLength" />
-  <suppress files="..." checks="LineLength" />

diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreInfo.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreInfo.java
index 15d5d1ca26800..bc87c49dcca7a 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/RestoreInfo.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreInfo.java
@@ -142,7 +142,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
         return builder;
     }

-    private static final ObjectParser<RestoreInfo, Void> PARSER = new ObjectParser<>(RestoreInfo.class.getName(), true, RestoreInfo::new);
+    private static final ObjectParser<RestoreInfo, Void> PARSER = new ObjectParser<>(RestoreInfo.class.getName(),
+        true, RestoreInfo::new);

     static {
         ObjectParser shardsParser = new ObjectParser<>("shards", true, null);
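Note on the pattern: the RestoreInfo.java hunk above shows the simplest wrapping convention in this patch - a long field initializer is broken at an argument boundary, and the remaining arguments move to an indented continuation line. Below is a minimal compilable sketch of that convention; the classes and names are hypothetical stand-ins, not the Elasticsearch types.

    import java.util.function.Supplier;

    // Hypothetical stand-in for a parser type with a long three-argument constructor.
    class ExampleParser<T> {
        ExampleParser(String name, boolean ignoreUnknownFields, Supplier<T> valueSupplier) {
            // construction details do not matter here; only the call-site wrapping does
        }
    }

    class ExampleInfo {
        // The initializer breaks once inside the argument list, and the continuation
        // line is indented one extra level, as in the RestoreInfo.java hunk above.
        private static final ExampleParser<ExampleInfo> PARSER = new ExampleParser<>(ExampleInfo.class.getName(),
            true, ExampleInfo::new);
    }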
diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
index 4c6090758dd20..eecac92d63e95 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -103,8 +103,8 @@
  * First {@link #restoreSnapshot(RestoreRequest, org.elasticsearch.action.ActionListener)}
  * method reads information about snapshot and metadata from repository. In update cluster state task it checks restore
  * preconditions, restores global state if needed, creates {@link RestoreInProgress} record with list of shards that needs
- * to be restored and adds this shard to the routing table using {@link RoutingTable.Builder#addAsRestore(IndexMetaData, SnapshotRecoverySource)}
- * method.
+ * to be restored and adds this shard to the routing table using
+ * {@link RoutingTable.Builder#addAsRestore(IndexMetaData, SnapshotRecoverySource)} method.
  * <p>
  * Individual shards are getting restored as part of normal recovery process in
  * {@link IndexShard#restoreFromRepository(Repository)} )}
@@ -256,15 +256,18 @@ public ClusterState execute(ClusterState currentState) {
                     for (Map.Entry<String, String> indexEntry : indices.entrySet()) {
                         String index = indexEntry.getValue();
                         boolean partial = checkPartial(index);
-                        SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index);
+                        SnapshotRecoverySource recoverySource =
+                            new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index);
                         String renamedIndexName = indexEntry.getKey();
                         IndexMetaData snapshotIndexMetaData = metaData.index(index);
-                        snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData, request.indexSettings, request.ignoreIndexSettings);
+                        snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData,
+                            request.indexSettings, request.ignoreIndexSettings);
                         try {
                             snapshotIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(snapshotIndexMetaData,
                                 minIndexCompatibilityVersion);
                         } catch (Exception ex) {
-                            throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index + "] because it cannot be upgraded", ex);
+                            throw new SnapshotRestoreException(snapshot, "cannot restore index [" + index + "] because it cannot be " +
+                                "upgraded", ex);
                         }
                         // Check that the index is closed or doesn't exist
                         IndexMetaData currentIndexMetaData = currentState.metaData().index(renamedIndexName);
@@ -274,9 +277,16 @@ public ClusterState execute(ClusterState currentState) {
                             // Index doesn't exist - create it and start recovery
                             // Make sure that the index we are about to create has a validate name
                             MetaDataCreateIndexService.validateIndexName(renamedIndexName, currentState);
-                            createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), currentState, false);
-                            IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData).state(IndexMetaData.State.OPEN).index(renamedIndexName);
-                            indexMdBuilder.settings(Settings.builder().put(snapshotIndexMetaData.getSettings()).put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()));
+                            createIndexService.validateIndexSettings(renamedIndexName,
+                                snapshotIndexMetaData.getSettings(),
+                                currentState,
+                                false);
+                            IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData)
+                                .state(IndexMetaData.State.OPEN)
+                                .index(renamedIndexName);
+                            indexMdBuilder.settings(Settings.builder()
+                                .put(snapshotIndexMetaData.getSettings())
+                                .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()));
                             if (!request.includeAliases() && !snapshotIndexMetaData.getAliases().isEmpty()) {
                                 // Remove all aliases - they shouldn't be restored
                                 indexMdBuilder.removeAllAliases();
@@ -296,10 +306,13 @@ public ClusterState execute(ClusterState currentState) {
                         } else {
                             validateExistingIndex(currentIndexMetaData, snapshotIndexMetaData, renamedIndexName, partial);
                             // Index exists and it's closed - open it in metadata and start recovery
-                            IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData).state(IndexMetaData.State.OPEN);
+                            IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData)
+                                .state(IndexMetaData.State.OPEN);
                             indexMdBuilder.version(Math.max(snapshotIndexMetaData.getVersion(), currentIndexMetaData.getVersion() + 1));
-                            indexMdBuilder.mappingVersion(Math.max(snapshotIndexMetaData.getMappingVersion(), currentIndexMetaData.getMappingVersion() + 1));
-                            indexMdBuilder.settingsVersion(Math.max(snapshotIndexMetaData.getSettingsVersion(), currentIndexMetaData.getSettingsVersion() + 1));
+                            indexMdBuilder.mappingVersion(Math.max(snapshotIndexMetaData.getMappingVersion(),
+                                currentIndexMetaData.getMappingVersion() + 1));
+                            indexMdBuilder.settingsVersion(Math.max(snapshotIndexMetaData.getSettingsVersion(),
+                                currentIndexMetaData.getSettingsVersion() + 1));
                             if (!request.includeAliases()) {
                                 // Remove all snapshot aliases
                                 if (!snapshotIndexMetaData.getAliases().isEmpty()) {
@@ -314,7 +327,10 @@ public ClusterState execute(ClusterState currentState) {
                                     aliases.add(alias.value);
                                 }
                             }
-                            indexMdBuilder.settings(Settings.builder().put(snapshotIndexMetaData.getSettings()).put(IndexMetaData.SETTING_INDEX_UUID, currentIndexMetaData.getIndexUUID()));
+                            indexMdBuilder.settings(Settings.builder()
+                                .put(snapshotIndexMetaData.getSettings())
+                                .put(IndexMetaData.SETTING_INDEX_UUID,
+                                    currentIndexMetaData.getIndexUUID()));
                             IndexMetaData updatedIndexMetaData = indexMdBuilder.index(renamedIndexName).build();
                             rtBuilder.addAsRestore(updatedIndexMetaData, recoverySource);
                             blocks.updateBlocks(updatedIndexMetaData);
@@ -324,9 +340,12 @@ public ClusterState execute(ClusterState currentState) {

                         for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
                             if (!ignoreShards.contains(shard)) {
-                                shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId()));
+                                shardsBuilder.put(new ShardId(renamedIndex, shard),
+                                    new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId()));
                             } else {
-                                shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(), RestoreInProgress.State.FAILURE));
+                                shardsBuilder.put(new ShardId(renamedIndex, shard),
+                                    new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().getLocalNodeId(),
+                                        RestoreInProgress.State.FAILURE));
                             }
                         }
                     }
@@ -390,7 +409,9 @@ restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards),
             private void checkAliasNameConflicts(Map<String, String> renamedIndices, Set<String> aliases) {
                 for (Map.Entry<String, String> renamedIndex : renamedIndices.entrySet()) {
                     if (aliases.contains(renamedIndex.getKey())) {
-                        throw new SnapshotRestoreException(snapshot, "cannot rename index [" + renamedIndex.getValue() + "] into [" + renamedIndex.getKey() + "] because of conflict with an alias with the same name");
+                        throw new SnapshotRestoreException(snapshot,
+                            "cannot rename index [" + renamedIndex.getValue() + "] into [" + renamedIndex.getKey() + "] because of " +
+                            "conflict with an alias with the same name");
                     }
                 }
             }
@@ -409,28 +430,34 @@ private boolean checkPartial(String index) {
                 if (request.partial()) {
                     return true;
                 } else {
-                    throw new SnapshotRestoreException(snapshot, "index [" + index + "] wasn't fully snapshotted - cannot restore");
+                    throw new SnapshotRestoreException(snapshot, "index [" + index + "] wasn't fully snapshotted - cannot " +
+                        "restore");
                 }
             } else {
                 return false;
             }
         }

-        private void validateExistingIndex(IndexMetaData currentIndexMetaData, IndexMetaData snapshotIndexMetaData, String renamedIndex, boolean partial) {
+        private void validateExistingIndex(IndexMetaData currentIndexMetaData, IndexMetaData snapshotIndexMetaData,
+                                           String renamedIndex, boolean partial) {
             // Index exist - checking that it's closed
             if (currentIndexMetaData.getState() != IndexMetaData.State.CLOSE) {
                 // TODO: Enable restore for open indices
-                throw new SnapshotRestoreException(snapshot, "cannot restore index [" + renamedIndex + "] because an open index with same name already exists in the cluster. " +
-                    "Either close or delete the existing index or restore the index under a different name by providing a rename pattern and replacement name");
+                throw new SnapshotRestoreException(snapshot, "cannot restore index [" + renamedIndex + "] because an open index " +
+                    "with same name already exists in the cluster. Either close or delete the existing index or restore the " +
+                    "index under a different name by providing a rename pattern and replacement name");
             }
             // Index exist - checking if it's partial restore
             if (partial) {
-                throw new SnapshotRestoreException(snapshot, "cannot restore partial index [" + renamedIndex + "] because such index already exists");
+                throw new SnapshotRestoreException(snapshot, "cannot restore partial index [" + renamedIndex + "] because such " +
+                    "index already exists");
             }
             // Make sure that the number of shards is the same. That's the only thing that we cannot change
             if (currentIndexMetaData.getNumberOfShards() != snapshotIndexMetaData.getNumberOfShards()) {
-                throw new SnapshotRestoreException(snapshot, "cannot restore index [" + renamedIndex + "] with [" + currentIndexMetaData.getNumberOfShards() +
-                    "] shards from a snapshot of index [" + snapshotIndexMetaData.getIndex().getName() + "] with [" + snapshotIndexMetaData.getNumberOfShards() + "] shards");
+                throw new SnapshotRestoreException(snapshot,
+                    "cannot restore index [" + renamedIndex + "] with [" + currentIndexMetaData.getNumberOfShards() + "] shards " +
+                    "from a snapshot of index [" + snapshotIndexMetaData.getIndex().getName() + "] with [" +
+                    snapshotIndexMetaData.getNumberOfShards() + "] shards");
             }
         }
@@ -442,7 +469,10 @@ private IndexMetaData updateIndexSettings(IndexMetaData indexMetaData, Settings
             if (changeSettings.names().isEmpty() && ignoreSettings.length == 0) {
                 return indexMetaData;
             }
-            Settings normalizedChangeSettings = Settings.builder().put(changeSettings).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX).build();
+            Settings normalizedChangeSettings = Settings.builder()
+                .put(changeSettings)
+                .normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX)
+                .build();
             IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
             Settings settings = indexMetaData.getSettings();
             Set<String> keyFilters = new HashSet<>();
@@ -504,7 +534,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
             }

         } catch (Exception e) {
-            logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", request.repositoryName + ":" + request.snapshotName), e);
+            logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot",
+                request.repositoryName + ":" + request.snapshotName), e);
             listener.onFailure(e);
         }
     }
@@ -521,12 +552,14 @@ public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInPr
                     if (shardsBuilder == null) {
                         shardsBuilder = ImmutableOpenMap.builder(entry.shards());
                     }
-                    shardsBuilder.put(shardId, new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE, "index was deleted"));
+                    shardsBuilder.put(shardId,
+                        new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE, "index was deleted"));
                 }
             }
             if (shardsBuilder != null) {
                 ImmutableOpenMap<ShardId, ShardRestoreStatus> shards = shardsBuilder.build();
-                builder.add(new RestoreInProgress.Entry(entry.uuid(), entry.snapshot(), overallState(RestoreInProgress.State.STARTED, shards), entry.indices(), shards));
+                builder.add(new RestoreInProgress.Entry(entry.uuid(), entry.snapshot(),
+                    overallState(RestoreInProgress.State.STARTED, shards), entry.indices(), shards));
             } else {
                 builder.add(entry);
             }
@@ -602,8 +635,8 @@ public void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali
                 initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) {
                 changes(unassignedShard.recoverySource()).shards.put(
                     unassignedShard.shardId(),
-                    new ShardRestoreStatus(null,
-                        RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource())
+                    new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE,
+                        "recovery source type changed from snapshot to " + initializedShard.recoverySource())
                 );
             }
         }
@@ -672,7 +705,8 @@ public static RestoreInProgress.Entry restoreInProgress(ClusterState state, Stri
         return null;
     }

-    static class CleanRestoreStateTaskExecutor implements ClusterStateTaskExecutor<CleanRestoreStateTaskExecutor.Task>, ClusterStateTaskListener {
+    static class CleanRestoreStateTaskExecutor implements ClusterStateTaskExecutor<CleanRestoreStateTaskExecutor.Task>,
+        ClusterStateTaskListener {

         static class Task {
             final String uuid;
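Most of the RestoreService.java hunks above share one fix: a long string literal inside an exception or log message is split into several concatenated literals, one per continuation line, with the '+' operator kept at the end of each line. A compilable sketch of the pattern follows; the exception type and message text are illustrative only, not the patch's exact code.

    class ExampleRestoreMessages {
        // Splitting at a word boundary keeps every line under the column limit, and
        // the trailing '+' makes the continuation obvious when scanning the code.
        static IllegalStateException openIndexConflict(String index) {
            return new IllegalStateException("cannot restore index [" + index + "] because an open index " +
                "with same name already exists in the cluster. Either close or delete the existing index " +
                "or restore the index under a different name");
        }
    }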
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java
index b512a570a8539..c1692f606178e 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java
@@ -232,16 +232,18 @@ int getSuccessfulShards() {
     private final List<SnapshotShardFailure> shardFailures;

     public SnapshotInfo(SnapshotId snapshotId, List<String> indices, SnapshotState state) {
-        this(snapshotId, indices, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null);
+        this(snapshotId, indices, state, null, null, 0L, 0L, 0, 0,
+            Collections.emptyList(), null);
     }

     public SnapshotInfo(SnapshotId snapshotId, List<String> indices, SnapshotState state, Version version) {
-        this(snapshotId, indices, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null);
+        this(snapshotId, indices, state, null, version, 0L, 0L, 0, 0,
+            Collections.emptyList(), null);
     }

     public SnapshotInfo(SnapshotId snapshotId, List<String> indices, long startTime, Boolean includeGlobalState) {
-        this(snapshotId, indices, SnapshotState.IN_PROGRESS, null, Version.CURRENT, startTime, 0L, 0, 0,
-            Collections.emptyList(), includeGlobalState);
+        this(snapshotId, indices, SnapshotState.IN_PROGRESS, null, Version.CURRENT, startTime, 0L,
+            0, 0, Collections.emptyList(), includeGlobalState);
     }

     public SnapshotInfo(SnapshotId snapshotId, List<String> indices, long startTime, String reason, long endTime,
@@ -306,7 +308,8 @@ public SnapshotInfo(final StreamInput in) throws IOException {
     public static SnapshotInfo incompatible(SnapshotId snapshotId) {
         return new SnapshotInfo(snapshotId, Collections.emptyList(), SnapshotState.INCOMPATIBLE,
             "the snapshot is incompatible with the current version of Elasticsearch and its exact version is unknown",
-            null, 0L, 0L, 0, 0, Collections.emptyList(), null);
+            null, 0L, 0L, 0, 0,
+            Collections.emptyList(), null);
     }

     /**
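The SnapshotInfo.java hunk above rebalances long this(...) delegation calls: the argument list is split at an argument boundary so each line stays inside the limit. A self-contained sketch of the idea, with a hypothetical value class standing in for SnapshotInfo:

    import java.util.Collections;
    import java.util.List;

    class ExampleSnapshotSummary {
        private final String snapshotId;
        private final List<String> indices;
        private final long startTime;
        private final long endTime;
        private final int totalShards;
        private final int successfulShards;
        private final List<String> shardFailures;

        ExampleSnapshotSummary(String snapshotId, List<String> indices) {
            // The delegation call wraps at an argument boundary; the continuation
            // line is indented one extra level, as in the constructors above.
            this(snapshotId, indices, 0L, 0L, 0, 0,
                Collections.emptyList());
        }

        ExampleSnapshotSummary(String snapshotId, List<String> indices, long startTime, long endTime,
                               int totalShards, int successfulShards, List<String> shardFailures) {
            this.snapshotId = snapshotId;
            this.indices = indices;
            this.startTime = startTime;
            this.endTime = endTime;
            this.totalShards = totalShards;
            this.successfulShards = successfulShards;
            this.shardFailures = shardFailures;
        }
    }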
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
index 7e590bc4104a9..40c89f10ccbc5 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -116,8 +116,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
     private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;

     @Inject
-    public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool,
-                                 TransportService transportService, IndicesService indicesService,
+    public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService,
+                                 ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
                                  ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
         super(settings);
         this.indicesService = indicesService;
@@ -188,7 +188,8 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
         for (Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> snapshotShards : snapshotShardsMap.entrySet()) {
             Map<ShardId, IndexShardSnapshotStatus> shards = snapshotShards.getValue();
             if (shards.containsKey(shardId)) {
-                logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", shardId, snapshotShards.getKey().getSnapshotId());
+                logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]",
+                    shardId, snapshotShards.getKey().getSnapshotId());
                 shards.get(shardId).abortIfNotCompleted("shard is closing, aborting");
             }
         }
@@ -337,7 +338,8 @@ public void doRun() {

                     @Override
                     public void onFailure(Exception e) {
-                        logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
+                        logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard",
+                            shardId, snapshot), e);
                         failure.set(e);
                     }

@@ -367,7 +369,8 @@ public void onAfter() {
     * @param snapshot       snapshot
     * @param snapshotStatus snapshot status
     */
-    private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, final IndexShardSnapshotStatus snapshotStatus) {
+    private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId,
+                          final IndexShardSnapshotStatus snapshotStatus) {
        final ShardId shardId = indexShard.shardId();
        if (indexShard.routingEntry().primary() == false) {
            throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
@@ -526,7 +529,8 @@ void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, fin
     *
     * @param request update shard status request
     */
-    private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
+    private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request,
+                                          ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
        logger.trace("received updated snapshot restore state [{}]", request);
        clusterService.submitStateUpdateTask(
            "update snapshot state",
@@ -549,7 +553,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
    class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {

        @Override
-        public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest> execute(ClusterState currentState, List<UpdateIndexShardSnapshotStatusRequest> tasks) throws Exception {
+        public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest>
+                execute(ClusterState currentState, List<UpdateIndexShardSnapshotStatusRequest> tasks) throws Exception {
            final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
            if (snapshots != null) {
                int changedCount = 0;
@@ -560,7 +565,8 @@ public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest> execute(Cluster
                    for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) {
                        if (entry.snapshot().equals(updateSnapshotState.snapshot())) {
-                            logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state());
+                            logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(),
+                                updateSnapshotState.shardId(), updateSnapshotState.status().state());
                            if (updated == false) {
                                shards.putAll(entry.shards());
                                updated = true;
@@ -588,7 +594,8 @@ public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest> execute(Cluster
                if (changedCount > 0) {
                    logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount);
-                    final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
+                    final SnapshotsInProgress updatedSnapshots =
+                        new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
                    return ClusterTasksResult.builder().successes(tasks).build(
                        ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build());
                }
@@ -606,8 +613,8 @@ private class UpdateSnapshotStatusAction
        UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
                                   ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
            super(
-                SnapshotShardsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, transportService, clusterService, threadPool, actionFilters,
-                indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new
+                SnapshotShardsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, transportService, clusterService, threadPool,
+                actionFilters, indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new
            );
        }
@@ -622,7 +629,8 @@ protected UpdateIndexShardSnapshotStatusResponse newResponse() {
        }

        @Override
-        protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) throws Exception {
+        protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state,
+                                       ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) throws Exception {
            innerUpdateSnapshotState(request, listener);
        }
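A recurring fix in the SnapshotShardsService.java diff above is the two-line method signature: overflowing parameters move to a continuation line aligned under the first parameter. A small compilable sketch of that style; the types and method here are hypothetical, not the Elasticsearch API.

    // Hypothetical callback, standing in for the ActionListener parameters above.
    interface ExampleListener {
        void onResponse(String response);
    }

    class ExampleShardsService {
        // The parameter list breaks once, and the continuation line is aligned
        // under the first parameter, matching the style used in the patch.
        void updateShardSnapshotStatus(String snapshotName, String shardId, String status,
                                       ExampleListener listener) {
            listener.onResponse("updated [" + snapshotName + "][" + shardId + "] to [" + status + "]");
        }
    }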
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index fa7c757aaca7b..8c505d20d17ff 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -92,16 +92,20 @@
 * <p>
 * A typical snapshot creating process looks like this:
 * <ul>
-* <li>On the master node the {@link #createSnapshot(SnapshotRequest, CreateSnapshotListener)} is called and makes sure that no snapshots is currently running
-* and registers the new snapshot in cluster state</li>
-* <li>When cluster state is updated the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, CreateSnapshotListener)} method
-* kicks in and initializes the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
+* <li>On the master node the {@link #createSnapshot(SnapshotRequest, CreateSnapshotListener)} is called and makes sure that no snapshots
+* is currently running and registers the new snapshot in cluster state</li>
+* <li>When cluster state is updated
+* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, CreateSnapshotListener)} method kicks in and initializes
+* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
 * <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
 * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method</li>
-* <li>Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method</li>
-* <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot as completed</li>
+* <li>Once shard snapshot is created data node updates state of the shard in the cluster state using
+* the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method</li>
+* <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot
+* as completed</li>
 * <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository,
-* notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state</li>
+* notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls
+* {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state</li>
 * </ul>
 */
 public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {
@@ -118,7 +122,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
     private final CopyOnWriteArrayList<SnapshotCompletionListener> snapshotCompletionListeners = new CopyOnWriteArrayList<>();

     @Inject
-    public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, RepositoriesService repositoriesService, ThreadPool threadPool) {
+    public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
+                            RepositoriesService repositoriesService, ThreadPool threadPool) {
         super(settings);
         this.clusterService = clusterService;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -253,7 +258,8 @@ public ClusterState execute(ClusterState currentState) {
                 SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
                 if (snapshots == null || snapshots.entries().isEmpty()) {
                     // Store newSnapshot here to be processed in clusterStateProcessed
-                    List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request.indicesOptions(), request.indices()));
+                    List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
+                        request.indicesOptions(), request.indices()));
                     logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
                     List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
                     newSnapshot = new SnapshotsInProgress.Entry(new Snapshot(repositoryName, snapshotId),
@@ -393,9 +399,11 @@ public ClusterState execute(ClusterState currentState) {
                     if (entry.state() != State.ABORTED) {
                         // Replace the snapshot that was just intialized
-                        ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
+                        ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards =
+                            shards(currentState, entry.indices());
                         if (!partial) {
-                            Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
+                            Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
+                                currentState.metaData());
                             Set<String> missing = indicesWithMissingShards.v1();
                             Set<String> closed = indicesWithMissingShards.v2();
                             if (missing.isEmpty() == false || closed.isEmpty() == false) {
@@ -437,8 +445,10 @@ public ClusterState execute(ClusterState currentState) {

                 @Override
                 public void onFailure(String source, Exception e) {
-                    logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot", snapshot.snapshot().getSnapshotId()), e);
-                    removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
+                    logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot",
+                        snapshot.snapshot().getSnapshotId()), e);
+                    removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
+                        new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
                 }

                 @Override
@@ -471,8 +481,10 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

             @Override
             public void onFailure(Exception e) {
-                logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]", snapshot.snapshot().getSnapshotId()), e);
-                removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e));
+                logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]",
+                    snapshot.snapshot().getSnapshotId()), e);
+                removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
+                    new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e));
             }
         });
     }
@@ -484,7 +496,8 @@ private class CleanupAfterErrorListener implements ActionListener
         private final CreateSnapshotListener userCreateSnapshotListener;
         private final Exception e;

-        CleanupAfterErrorListener(SnapshotsInProgress.Entry snapshot, boolean snapshotCreated, CreateSnapshotListener userCreateSnapshotListener, Exception e) {
+        CleanupAfterErrorListener(SnapshotsInProgress.Entry snapshot, boolean snapshotCreated,
+                                  CreateSnapshotListener userCreateSnapshotListener, Exception e) {
             this.snapshot = snapshot;
             this.snapshotCreated = snapshotCreated;
             this.userCreateSnapshotListener = userCreateSnapshotListener;
@@ -520,7 +533,8 @@ private void cleanupAfterError(Exception exception) {
                         snapshot.includeGlobalState());
                 } catch (Exception inner) {
                     inner.addSuppressed(exception);
-                    logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository", snapshot.snapshot()), inner);
+                    logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository",
+                        snapshot.snapshot()), inner);
                 }
             }
             userCreateSnapshotListener.onFailure(e);
@@ -744,8 +758,10 @@ public ClusterState execute(ClusterState currentState) throws Exception {
                             } else {
                                 // TODO: Restart snapshot on another node?
                                 snapshotChanged = true;
-                                logger.warn("failing snapshot of shard [{}] on closed node [{}]", shardEntry.key, shardStatus.nodeId());
-                                shards.put(shardEntry.key, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
+                                logger.warn("failing snapshot of shard [{}] on closed node [{}]",
+                                    shardEntry.key, shardStatus.nodeId());
+                                shards.put(shardEntry.key,
+                                    new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
                             }
                         }
                     }
@@ -808,7 +824,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
                 for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
                     SnapshotsInProgress.Entry updatedSnapshot = snapshot;
                     if (snapshot.state() == State.STARTED) {
-                        ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShards(snapshot.shards(), routingTable);
+                        ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShards(snapshot.shards(),
+                            routingTable);
                         if (shards != null) {
                             changed = true;
                             if (!snapshot.state().completed() && completed(shards.values())) {
@@ -831,7 +848,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {

             @Override
             public void onFailure(String source, Exception e) {
-                logger.warn(() -> new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e);
+                logger.warn(() ->
+                    new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e);
             }
         });
     }
@@ -929,12 +947,14 @@ private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) {
      * @param shards list of shard statuses
      * @return list of failed and closed indices
      */
-    private Tuple<Set<String>, Set<String>> indicesWithMissingShards(ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards, MetaData metaData) {
+    private Tuple<Set<String>, Set<String>> indicesWithMissingShards(
+        ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards, MetaData metaData) {
         Set<String> missing = new HashSet<>();
         Set<String> closed = new HashSet<>();
         for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards) {
             if (entry.value.state() == State.MISSING) {
-                if (metaData.hasIndex(entry.key.getIndex().getName()) && metaData.getIndexSafe(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) {
+                if (metaData.hasIndex(entry.key.getIndex().getName()) &&
+                    metaData.getIndexSafe(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) {
                     closed.add(entry.key.getIndex().getName());
                 } else {
                     missing.add(entry.key.getIndex().getName());
@@ -1130,7 +1150,8 @@ private void deleteSnapshot(final Snapshot snapshot, final DeleteSnapshotListene
             public ClusterState execute(ClusterState currentState) throws Exception {
                 SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
                 if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
-                    throw new ConcurrentSnapshotExecutionException(snapshot, "cannot delete - another snapshot is currently being deleted");
+                    throw new ConcurrentSnapshotExecutionException(snapshot,
+                        "cannot delete - another snapshot is currently being deleted");
                 }
                 RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
                 if (restoreInProgress != null) {
@@ -1236,7 +1257,8 @@ public void onSnapshotCompletion(Snapshot completedSnapshot, SnapshotInfo snapsh
                                 listener,
                                 true);
                         } catch (Exception ex) {
-                            logger.warn(() -> new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex);
+                            logger.warn(() ->
+                                new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex);
                         }
                     }
                 );
@@ -1384,7 +1406,8 @@ private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard
             IndexMetaData indexMetaData = metaData.index(indexName);
             if (indexMetaData == null) {
                 // The index was deleted before we managed to start the snapshot - mark it as missing.
-                builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
+                builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0),
+                    new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
             } else if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
                 for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
                     ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
@@ -1397,17 +1420,22 @@ private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard
                 if (indexRoutingTable != null) {
                     ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
                     if (primary == null || !primary.assignedToNode()) {
-                        builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated"));
+                        builder.put(shardId,
+                            new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated"));
                     } else if (primary.relocating() || primary.initializing()) {
-                        // The WAITING state was introduced in V1.2.0 - don't use it if there are nodes with older version in the cluster
+                        // The WAITING state was introduced in V1.2.0 -
+                        // don't use it if there are nodes with older version in the cluster
                         builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.WAITING));
                     } else if (!primary.started()) {
-                        builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING, "primary shard hasn't been started yet"));
+                        builder.put(shardId,
+                            new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING,
+                                "primary shard hasn't been started yet"));
                    } else {
                        builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId()));
                    }
                } else {
-                    builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing routing table"));
+                    builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING,
+                        "missing routing table"));
                }
            }
        }
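The test changes below apply the same limit to fluent request builders: each chained call moves onto its own line. A compilable sketch of that style follows; the builder here is a hypothetical stand-in, and the real prepareCreateSnapshot(...) chain from the Elasticsearch test client is not reproduced.

    class ExampleSnapshotRequestBuilder {
        private boolean waitForCompletion;
        private String[] indices = new String[0];

        ExampleSnapshotRequestBuilder setWaitForCompletion(boolean waitForCompletion) {
            this.waitForCompletion = waitForCompletion;
            return this;
        }

        ExampleSnapshotRequestBuilder setIndices(String... indices) {
            this.indices = indices;
            return this;
        }

        String get() {
            return "waitForCompletion=" + waitForCompletion + ", indices=" + indices.length;
        }

        public static void main(String[] args) {
            // One chained call per line keeps long chains inside the column limit.
            String result = new ExampleSnapshotRequestBuilder()
                .setWaitForCompletion(false)
                .setIndices("test-idx")
                .get();
            System.out.println(result);
        }
    }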
diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
index a3be8cfa15baa..74efe5f68ccfc 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
@@ -367,7 +367,8 @@ public void testSnapshotDuringNodeShutdown() throws Exception {
         logger.info("--> start 2 nodes");
         Client client = client();

-        assertAcked(prepareCreate("test-idx", 2, Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0)));
+        assertAcked(prepareCreate("test-idx", 2, Settings.builder().put("number_of_shards", 2)
+            .put("number_of_replicas", 0)));
         ensureGreen();

         logger.info("--> indexing some data");
@@ -392,7 +393,10 @@ public void testSnapshotDuringNodeShutdown() throws Exception {
         String blockedNode = blockNodeWithIndex("test-repo", "test-idx");

         logger.info("--> snapshot");
-        client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
+        client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+            .setWaitForCompletion(false)
+            .setIndices("test-idx")
+            .get();

         logger.info("--> waiting for block to kick in");
         waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
@@ -415,7 +419,8 @@ public void testSnapshotWithStuckNode() throws Exception {
         nodes.add(internalCluster().startNode());
         Client client = client();

-        assertAcked(prepareCreate("test-idx", 2, Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0)));
+        assertAcked(prepareCreate("test-idx", 2, Settings.builder().put("number_of_shards", 2)
+            .put("number_of_replicas", 0)));
         ensureGreen();

         logger.info("--> indexing some data");
@@ -443,7 +448,10 @@ public void testSnapshotWithStuckNode() throws Exception {
         int numberOfFilesBeforeSnapshot = numberOfFiles(repo);

         logger.info("--> snapshot");
-        client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
+        client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+            .setWaitForCompletion(false)
+            .setIndices("test-idx")
+            .get();

         logger.info("--> waiting for block to kick in");
         waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
@@ -509,7 +517,8 @@ public void testRestoreIndexWithMissingShards() throws Exception {
         ensureGreen("test-idx-all");

         logger.info("--> create an index that will be closed");
-        assertAcked(prepareCreate("test-idx-closed", 1, Settings.builder().put("number_of_shards", 4).put("number_of_replicas", 0)));
+        assertAcked(prepareCreate("test-idx-closed", 1, Settings.builder().put("number_of_shards", 4)
+            .put("number_of_replicas", 0)));
         ensureGreen("test-idx-closed");

         logger.info("--> indexing some data into test-idx-all");

diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
index 35e813756bca8..fcb06acd60a33 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
@@ -1345,7 +1345,8 @@ public void testDeleteSnapshotWithMissingIndexAndShardMetadata() throws Exceptio
         client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get();

         logger.info("--> make sure snapshot doesn't exist");
-        assertThrows(client.admin().cluster().prepareGetSnapshots("test-repo").addSnapshots("test-snap-1"), SnapshotMissingException.class);
+        assertThrows(client.admin().cluster().prepareGetSnapshots("test-repo")
+            .addSnapshots("test-snap-1"), SnapshotMissingException.class);

         for (String index : indices) {
             assertTrue(Files.notExists(indicesPath.resolve(indexIds.get(index).getId())));
@@ -1384,7 +1385,8 @@ public void testDeleteSnapshotWithMissingMetadata() throws Exception {
         client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get();

         logger.info("--> make sure snapshot doesn't exist");
-        assertThrows(client.admin().cluster().prepareGetSnapshots("test-repo").addSnapshots("test-snap-1"), SnapshotMissingException.class);
+        assertThrows(client.admin().cluster().prepareGetSnapshots("test-repo")
+            .addSnapshots("test-snap-1"), SnapshotMissingException.class);
     }

     public void testDeleteSnapshotWithCorruptedSnapshotFile() throws Exception {
@@ -2014,7 +2016,8 @@ public void testSnapshotStatus() throws Exception {
         logger.info("--> waiting for block to kick in");
         waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));

-        logger.info("--> execution was blocked on node [{}], checking snapshot status with specified repository and snapshot", blockedNode);
+        logger.info("--> execution was blocked on node [{}], checking snapshot status with specified repository and snapshot",
+            blockedNode);
         SnapshotsStatusResponse response = client.admin().cluster().prepareSnapshotStatus("test-repo").execute().actionGet();
         assertThat(response.getSnapshots().size(), equalTo(1));
         SnapshotStatus snapshotStatus = response.getSnapshots().get(0);

diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index 9dca64d5b831d..2b634385b29af 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -346,7 +346,8 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws
         }

         @Override
-        public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
+        public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
+            throws IOException {
             maybeIOExceptionOrBlock(blobName);
             super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
             if (RandomizedContext.current().getRandom().nextBoolean()) {