diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java index e5e641bfdda21..755ee960be73e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java @@ -524,6 +524,15 @@ public void testSnapshotShutdownProgressTracker() throws Exception { "Pause signals have been set for all shard snapshots on data node [" + nodeForRemovalId + "]" ) ); + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "SnapshotShutdownProgressTracker index shard snapshot status messages", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.INFO, + // Expect the shard snapshot to stall in data file upload, since we've blocked the data node file upload to the blob store. + "statusDescription='enqueued file snapshot tasks: threads running concurrent file uploads'" + ) + ); putShutdownForRemovalMetadata(nodeForRemoval, clusterService); @@ -583,6 +592,14 @@ public void testSnapshotShutdownProgressTracker() throws Exception { "Current active shard snapshot stats on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]" ) ); + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "SnapshotShutdownProgressTracker index shard snapshot messages", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.INFO, + "statusDescription='finished: master notification attempt complete'" + ) + ); // Release the master node to respond snapshotStatusUpdateLatch.countDown(); diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index d8bd460f6f819..6aa6a5e498789 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -98,6 +98,7 @@ public enum AbortStatus { private long processedSize; private String failure; private final SubscribableListener abortListeners = new SubscribableListener<>(); + private volatile String statusDescription; private IndexShardSnapshotStatus( final Stage stage, @@ -110,7 +111,8 @@ private IndexShardSnapshotStatus( final long totalSize, final long processedSize, final String failure, - final ShardGeneration generation + final ShardGeneration generation, + final String statusDescription ) { this.stage = new AtomicReference<>(Objects.requireNonNull(stage)); this.generation = new AtomicReference<>(generation); @@ -124,6 +126,7 @@ private IndexShardSnapshotStatus( this.processedSize = processedSize; this.incrementalSize = incrementalSize; this.failure = failure; + updateStatusDescription(statusDescription); } public synchronized Copy moveToStarted( @@ -272,6 +275,15 @@ public synchronized void addProcessedFiles(int count, long totalSize) { processedSize += totalSize; } + /** + * Updates the string explanation for what the snapshot is actively doing right now. + */ + public void updateStatusDescription(String statusString) { + assert statusString != null; + assert statusString.isEmpty() == false; + this.statusDescription = statusString; + } + /** * Returns a copy of the current {@link IndexShardSnapshotStatus}. This method is * intended to be used when a coherent state of {@link IndexShardSnapshotStatus} is needed. @@ -289,12 +301,13 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() { incrementalSize, totalSize, processedSize, - failure + failure, + statusDescription ); } public static IndexShardSnapshotStatus newInitializing(ShardGeneration generation) { - return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation); + return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation, "initializing"); } public static IndexShardSnapshotStatus.Copy newFailed(final String failure) { @@ -302,7 +315,7 @@ public static IndexShardSnapshotStatus.Copy newFailed(final String failure) { if (failure == null) { throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus"); } - return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null).asCopy(); + return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null, "initialized as failed").asCopy(); } public static IndexShardSnapshotStatus.Copy newDone( @@ -326,7 +339,8 @@ public static IndexShardSnapshotStatus.Copy newDone( size, incrementalSize, null, - generation + generation, + "initialized as done" ).asCopy(); } @@ -345,6 +359,7 @@ public static class Copy { private final long processedSize; private final long incrementalSize; private final String failure; + private final String statusDescription; public Copy( final Stage stage, @@ -356,7 +371,8 @@ public Copy( final long incrementalSize, final long totalSize, final long processedSize, - final String failure + final String failure, + final String statusDescription ) { this.stage = stage; this.startTime = startTime; @@ -368,6 +384,7 @@ public Copy( this.processedSize = processedSize; this.incrementalSize = incrementalSize; this.failure = failure; + this.statusDescription = statusDescription; } public Stage getStage() { @@ -410,6 +427,10 @@ public String getFailure() { return failure; } + public String getStatusDescription() { + return statusDescription; + } + @Override public String toString() { return "index shard snapshot status (" @@ -433,6 +454,8 @@ public String toString() { + processedSize + ", failure='" + failure + + "', statusDescription='" + + statusDescription + '\'' + ')'; } @@ -461,6 +484,8 @@ public String toString() { + processedSize + ", failure='" + failure + + "', statusDescription='" + + statusDescription + '\'' + ')'; } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index cc22b60991703..11386eba10196 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -3186,6 +3186,7 @@ private void writeAtomic( @Override public void snapshotShard(SnapshotShardContext context) { + context.status().updateStatusDescription("queued in snapshot task runner"); shardSnapshotTaskRunner.enqueueShardSnapshot(context); } @@ -3198,6 +3199,7 @@ private void doSnapshotShard(SnapshotShardContext context) { final ShardId shardId = store.shardId(); final SnapshotId snapshotId = context.snapshotId(); final IndexShardSnapshotStatus snapshotStatus = context.status(); + snapshotStatus.updateStatusDescription("snapshot task runner: setting up shard snapshot"); final long startTime = threadPool.absoluteTimeInMillis(); try { final ShardGeneration generation = snapshotStatus.generation(); @@ -3206,6 +3208,7 @@ private void doSnapshotShard(SnapshotShardContext context) { final Set blobs; if (generation == null) { snapshotStatus.ensureNotAborted(); + snapshotStatus.updateStatusDescription("snapshot task runner: listing blob prefixes"); try { blobs = shardContainer.listBlobsByPrefix(OperationPurpose.SNAPSHOT_METADATA, SNAPSHOT_INDEX_PREFIX).keySet(); } catch (IOException e) { @@ -3216,6 +3219,7 @@ private void doSnapshotShard(SnapshotShardContext context) { } snapshotStatus.ensureNotAborted(); + snapshotStatus.updateStatusDescription("snapshot task runner: loading snapshot blobs"); Tuple tuple = buildBlobStoreIndexShardSnapshots( context.indexId(), shardId.id(), @@ -3316,6 +3320,7 @@ private void doSnapshotShard(SnapshotShardContext context) { indexCommitPointFiles = filesFromSegmentInfos; } + snapshotStatus.updateStatusDescription("snapshot task runner: starting shard snapshot"); snapshotStatus.moveToStarted( startTime, indexIncrementalFileCount, @@ -3342,6 +3347,7 @@ private void doSnapshotShard(SnapshotShardContext context) { BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID, Boolean.toString(writeFileInfoWriterUUID) ); + snapshotStatus.updateStatusDescription("snapshot task runner: updating blob store with new shard generation"); INDEX_SHARD_SNAPSHOTS_FORMAT.write( updatedBlobStoreIndexShardSnapshots, shardContainer, @@ -3387,6 +3393,7 @@ private void doSnapshotShard(SnapshotShardContext context) { BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID, Boolean.toString(writeFileInfoWriterUUID) ); + snapshotStatus.updateStatusDescription("no shard generations: writing new index-${N} file"); writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots, serializationParams); } catch (IOException e) { throw new IndexShardSnapshotFailedException( @@ -3401,6 +3408,7 @@ private void doSnapshotShard(SnapshotShardContext context) { } snapshotStatus.addProcessedFiles(finalFilesInShardMetadataCount, finalFilesInShardMetadataSize); try { + snapshotStatus.updateStatusDescription("no shard generations: deleting blobs"); deleteFromContainer(OperationPurpose.SNAPSHOT_METADATA, shardContainer, blobsToDelete.iterator()); } catch (IOException e) { logger.warn( @@ -3414,6 +3422,7 @@ private void doSnapshotShard(SnapshotShardContext context) { // filesToSnapshot will be emptied while snapshotting the file. We make a copy here for cleanup purpose in case of failure. final AtomicReference> fileToCleanUp = new AtomicReference<>(List.copyOf(filesToSnapshot)); final ActionListener> allFilesUploadedListener = ActionListener.assertOnce(ActionListener.wrap(ignore -> { + snapshotStatus.updateStatusDescription("all files uploaded: finalizing"); final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(); // now create and write the commit point @@ -3435,6 +3444,7 @@ private void doSnapshotShard(SnapshotShardContext context) { BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID, Boolean.toString(writeFileInfoWriterUUID) ); + snapshotStatus.updateStatusDescription("all files uploaded: writing to index shard file"); INDEX_SHARD_SNAPSHOT_FORMAT.write( blobStoreIndexShardSnapshot, shardContainer, @@ -3451,10 +3461,12 @@ private void doSnapshotShard(SnapshotShardContext context) { ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()), getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles()) ); + snapshotStatus.updateStatusDescription("all files uploaded: done"); snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult); context.onResponse(shardSnapshotResult); }, e -> { try { + snapshotStatus.updateStatusDescription("all files uploaded: cleaning up data files, exception while finalizing: " + e); shardContainer.deleteBlobsIgnoringIfNotExists( OperationPurpose.SNAPSHOT_DATA, Iterators.flatMap(fileToCleanUp.get().iterator(), f -> Iterators.forRange(0, f.numberOfParts(), f::partName)) @@ -3517,6 +3529,7 @@ protected void snapshotFiles( ) { final int noOfFilesToSnapshot = filesToSnapshot.size(); final ActionListener filesListener = fileQueueListener(filesToSnapshot, noOfFilesToSnapshot, allFilesUploadedListener); + context.status().updateStatusDescription("enqueued file snapshot tasks: threads running concurrent file uploads"); for (int i = 0; i < noOfFilesToSnapshot; i++) { shardSnapshotTaskRunner.enqueueFileSnapshot(context, filesToSnapshot::poll, filesListener); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 234c0239a68ce..90111c44fbd96 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -61,6 +61,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import static java.util.Collections.emptyMap; import static org.elasticsearch.core.Strings.format; @@ -108,6 +109,7 @@ public SnapshotShardsService( this.threadPool = transportService.getThreadPool(); this.snapshotShutdownProgressTracker = new SnapshotShutdownProgressTracker( () -> clusterService.state().nodes().getLocalNodeId(), + (callerLogger) -> logIndexShardSnapshotStatuses(callerLogger), clusterService.getClusterSettings(), threadPool ); @@ -234,6 +236,14 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } + private void logIndexShardSnapshotStatuses(Logger callerLogger) { + for (var snapshotStatuses : shardSnapshots.values()) { + for (var shardSnapshot : snapshotStatuses.entrySet()) { + callerLogger.info(Strings.format("ShardId %s, %s", shardSnapshot.getKey(), shardSnapshot.getValue())); + } + } + } + /** * Returns status of shards that are snapshotted on the node and belong to the given snapshot *

@@ -321,7 +331,8 @@ private void handleUpdatedSnapshotsInProgressEntry(String localNodeId, boolean r sid, ShardState.FAILED, shard.getValue().reason(), - shard.getValue().generation() + shard.getValue().generation(), + () -> null ); } } else { @@ -372,6 +383,7 @@ private void startNewShardSnapshots(String localNodeId, SnapshotsInProgress.Entr + snapshotStatus.generation() + "] for snapshot with old-format compatibility"; shardSnapshotTasks.add(newShardSnapshotTask(shardId, snapshot, indexId, snapshotStatus, entry.version(), entry.startTime())); + snapshotStatus.updateStatusDescription("shard snapshot scheduled to start"); } threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> shardSnapshotTasks.forEach(Runnable::run)); @@ -383,6 +395,7 @@ private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInPr for (final Map.Entry shardEntry : entry.shards().entrySet()) { final ShardId shardId = shardEntry.getKey(); final ShardSnapshotStatus masterShardSnapshotStatus = shardEntry.getValue(); + IndexShardSnapshotStatus indexShardSnapshotStatus = localShardSnapshots.get(shardId); if (masterShardSnapshotStatus.state() != ShardState.INIT) { // shard snapshot not currently scheduled by master @@ -402,7 +415,11 @@ private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInPr shardId, ShardState.PAUSED_FOR_NODE_REMOVAL, "paused", - masterShardSnapshotStatus.generation() + masterShardSnapshotStatus.generation(), + () -> { + indexShardSnapshotStatus.updateStatusDescription("finished: master notification attempt complete"); + return null; + } ); } else { // shard snapshot currently running, mark for pause @@ -419,9 +436,16 @@ private Runnable newShardSnapshotTask( final IndexVersion entryVersion, final long entryStartTime ) { + Supplier postMasterNotificationAction = () -> { + snapshotStatus.updateStatusDescription("finished: master notification attempt complete"); + return null; + }; + + // Listener that runs on completion of the shard snapshot: it will notify the master node of success or failure. ActionListener snapshotResultListener = new ActionListener<>() { @Override public void onResponse(ShardSnapshotResult shardSnapshotResult) { + snapshotStatus.updateStatusDescription("snapshot succeeded: proceeding to notify master of success"); final ShardGeneration newGeneration = shardSnapshotResult.getGeneration(); assert newGeneration != null; assert newGeneration.equals(snapshotStatus.generation()); @@ -436,11 +460,13 @@ public void onResponse(ShardSnapshotResult shardSnapshotResult) { snapshotStatus.generation() ); } - notifySuccessfulSnapshotShard(snapshot, shardId, shardSnapshotResult); + + notifySuccessfulSnapshotShard(snapshot, shardId, shardSnapshotResult, postMasterNotificationAction); } @Override public void onFailure(Exception e) { + snapshotStatus.updateStatusDescription("failed with exception '" + e + ": proceeding to notify master of failure"); final String failure; final Stage nextStage; if (e instanceof AbortedSnapshotException) { @@ -457,7 +483,14 @@ public void onFailure(Exception e) { logger.warn(() -> format("[%s][%s] failed to snapshot shard", shardId, snapshot), e); } final var shardState = snapshotStatus.moveToUnsuccessful(nextStage, failure, threadPool.absoluteTimeInMillis()); - notifyUnsuccessfulSnapshotShard(snapshot, shardId, shardState, failure, snapshotStatus.generation()); + notifyUnsuccessfulSnapshotShard( + snapshot, + shardId, + shardState, + failure, + snapshotStatus.generation(), + postMasterNotificationAction + ); } }; @@ -508,6 +541,7 @@ private void snapshot( ActionListener resultListener ) { ActionListener.run(resultListener, listener -> { + snapshotStatus.updateStatusDescription("has started"); snapshotStatus.ensureNotAborted(); final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id()); if (indexShard.routingEntry().primary() == false) { @@ -527,7 +561,9 @@ private void snapshot( final Repository repository = repositoriesService.repository(snapshot.getRepository()); SnapshotIndexCommit snapshotIndexCommit = null; try { + snapshotStatus.updateStatusDescription("acquiring commit reference from IndexShard: triggers a shard flush"); snapshotIndexCommit = new SnapshotIndexCommit(indexShard.acquireIndexCommitForSnapshot()); + snapshotStatus.updateStatusDescription("commit reference acquired, proceeding with snapshot"); final var shardStateId = getShardStateId(indexShard, snapshotIndexCommit.indexCommit()); // not aborted so indexCommit() ok snapshotStatus.addAbortListener(makeAbortListener(indexShard.shardId(), snapshot, snapshotIndexCommit)); snapshotStatus.ensureNotAborted(); @@ -652,8 +688,12 @@ private void syncShardStatsOnNewMaster(List entries) snapshot.snapshot(), shardId ); - notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localShard.getValue().getShardSnapshotResult()); - + notifySuccessfulSnapshotShard( + snapshot.snapshot(), + shardId, + localShard.getValue().getShardSnapshotResult(), + () -> null + ); } else if (stage == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed logger.debug( @@ -667,7 +707,8 @@ private void syncShardStatsOnNewMaster(List entries) shardId, ShardState.FAILED, indexShardSnapshotStatus.getFailure(), - localShard.getValue().generation() + localShard.getValue().generation(), + () -> null ); } else if (stage == Stage.PAUSED) { // but we think the shard has paused - we need to make new master know that @@ -680,7 +721,8 @@ private void syncShardStatsOnNewMaster(List entries) shardId, ShardState.PAUSED_FOR_NODE_REMOVAL, indexShardSnapshotStatus.getFailure(), - localShard.getValue().generation() + localShard.getValue().generation(), + () -> null ); } } @@ -693,10 +735,20 @@ private void syncShardStatsOnNewMaster(List entries) /** * Notify the master node that the given shard snapshot completed successfully. */ - private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, ShardSnapshotResult shardSnapshotResult) { + private void notifySuccessfulSnapshotShard( + final Snapshot snapshot, + final ShardId shardId, + ShardSnapshotResult shardSnapshotResult, + Supplier postMasterNotificationAction + ) { assert shardSnapshotResult != null; assert shardSnapshotResult.getGeneration() != null; - sendSnapshotShardUpdate(snapshot, shardId, ShardSnapshotStatus.success(clusterService.localNode().getId(), shardSnapshotResult)); + sendSnapshotShardUpdate( + snapshot, + shardId, + ShardSnapshotStatus.success(clusterService.localNode().getId(), shardSnapshotResult), + postMasterNotificationAction + ); } /** @@ -707,13 +759,15 @@ private void notifyUnsuccessfulSnapshotShard( final ShardId shardId, final ShardState shardState, final String failure, - final ShardGeneration generation + final ShardGeneration generation, + Supplier postMasterNotificationAction ) { assert shardState == ShardState.FAILED || shardState == ShardState.PAUSED_FOR_NODE_REMOVAL : shardState; sendSnapshotShardUpdate( snapshot, shardId, - new ShardSnapshotStatus(clusterService.localNode().getId(), shardState, generation, failure) + new ShardSnapshotStatus(clusterService.localNode().getId(), shardState, generation, failure), + postMasterNotificationAction ); if (shardState == ShardState.PAUSED_FOR_NODE_REMOVAL) { logger.debug( @@ -726,7 +780,12 @@ private void notifyUnsuccessfulSnapshotShard( } /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ - private void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) { + private void sendSnapshotShardUpdate( + final Snapshot snapshot, + final ShardId shardId, + final ShardSnapshotStatus status, + Supplier postMasterNotificationAction + ) { ActionListener updateResultListener = new ActionListener<>() { @Override public void onResponse(Void aVoid) { @@ -738,9 +797,11 @@ public void onFailure(Exception e) { logger.warn(() -> format("[%s][%s] failed to update snapshot state to [%s]", shardId, snapshot, status), e); } }; + snapshotShutdownProgressTracker.trackRequestSentToMaster(snapshot, shardId); var releaseTrackerRequestRunsBeforeResultListener = ActionListener.runBefore(updateResultListener, () -> { snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId); + postMasterNotificationAction.get(); }); remoteFailedRequestDeduplicator.executeOnce( diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java index 5d81e3c4e46af..45f2fb96fce4e 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -45,6 +46,7 @@ public class SnapshotShutdownProgressTracker { private static final Logger logger = LogManager.getLogger(SnapshotShutdownProgressTracker.class); private final Supplier getLocalNodeId; + private final Consumer logIndexShardSnapshotStatuses; private final ThreadPool threadPool; private volatile TimeValue progressLoggerInterval; @@ -83,8 +85,14 @@ public class SnapshotShutdownProgressTracker { private final AtomicLong abortedCount = new AtomicLong(); private final AtomicLong pausedCount = new AtomicLong(); - public SnapshotShutdownProgressTracker(Supplier localNodeIdSupplier, ClusterSettings clusterSettings, ThreadPool threadPool) { + public SnapshotShutdownProgressTracker( + Supplier localNodeIdSupplier, + Consumer logShardStatuses, + ClusterSettings clusterSettings, + ThreadPool threadPool + ) { this.getLocalNodeId = localNodeIdSupplier; + this.logIndexShardSnapshotStatuses = logShardStatuses; clusterSettings.initializeAndWatch( SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING, value -> this.progressLoggerInterval = value @@ -122,14 +130,14 @@ private void cancelProgressLogger() { } /** - * Logs some statistics about shard snapshot progress. + * Logs information about shard snapshot progress. */ private void logProgressReport() { logger.info( """ Current active shard snapshot stats on data node [{}]. \ - Node shutdown cluster state update received at [{}]. \ - Finished signalling shard snapshots to pause at [{}]. \ + Node shutdown cluster state update received at [{} millis]. \ + Finished signalling shard snapshots to pause at [{} millis]. \ Number shard snapshots running [{}]. \ Number shard snapshots waiting for master node reply to status update request [{}] \ Shard snapshot completion stats since shutdown began: Done [{}]; Failed [{}]; Aborted [{}]; Paused [{}]\ @@ -144,6 +152,8 @@ private void logProgressReport() { abortedCount.get(), pausedCount.get() ); + // Use a callback to log the shard snapshot details. + logIndexShardSnapshotStatuses.accept(logger); } /** diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java index fbf742ae2ea57..8adcb3eb9d5f4 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java @@ -110,8 +110,10 @@ void simulateShardSnapshotsCompleting(SnapshotShutdownProgressTracker tracker, i } public void testTrackerLogsStats() { + final String dummyStatusMsg = "Dummy log message for index shard snapshot statuses"; SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> callerLogger.info(dummyStatusMsg), clusterSettings, testThreadPool ); @@ -144,6 +146,14 @@ public void testTrackerLogsStats() { "*Shard snapshot completion stats since shutdown began: Done [2]; Failed [1]; Aborted [1]; Paused [1]*" ) ); + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "index shard snapshot statuses", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.INFO, + dummyStatusMsg + ) + ); // Simulate updating the shard snapshot completion stats. simulateShardSnapshotsCompleting(tracker, 5); @@ -171,6 +181,7 @@ public void testTrackerProgressLoggingIntervalSettingCanBeDisabled() { ); SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> {}, clusterSettingsDisabledLogging, testThreadPool ); @@ -214,6 +225,7 @@ public void testTrackerIntervalSettingDynamically() { ); SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> {}, clusterSettingsDisabledLogging, testThreadPool ); @@ -253,6 +265,7 @@ public void testTrackerIntervalSettingDynamically() { public void testTrackerPauseTimestamp() { SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> {}, clusterSettings, testThreadPool ); @@ -263,7 +276,7 @@ public void testTrackerPauseTimestamp() { "pausing timestamp should be set", SnapshotShutdownProgressTracker.class.getName(), Level.INFO, - "*Finished signalling shard snapshots to pause at [" + testThreadPool.relativeTimeInMillis() + "]*" + "*Finished signalling shard snapshots to pause at [" + testThreadPool.relativeTimeInMillis() + " millis]*" ) ); @@ -283,6 +296,7 @@ public void testTrackerPauseTimestamp() { public void testTrackerRequestsToMaster() { SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> {}, clusterSettings, testThreadPool ); @@ -335,6 +349,7 @@ public void testTrackerRequestsToMaster() { public void testTrackerClearShutdown() { SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> {}, clusterSettings, testThreadPool ); @@ -345,7 +360,7 @@ public void testTrackerClearShutdown() { "pausing timestamp should be unset", SnapshotShutdownProgressTracker.class.getName(), Level.INFO, - "*Finished signalling shard snapshots to pause at [-1]*" + "*Finished signalling shard snapshots to pause at [-1 millis]*" ) );