Add a shard snapshot current status debug string (elastic#118198)
SnapshotShardsService will update the new debug string field in
IndexShardSnapshotStatus as a shard snapshot operation proceeds so
that the current state can be logged. The
SnapshotShutdownProgressTracker will iterate through the
SnapshotShardsService's list of shard snapshots and log their current
status.

We want to know where in the code a shard snapshot operation
potentially gets stuck. This new field should be updated as
frequently as is reasonable to report on the shard snapshot's progress.

Closes ES-10261
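
Before the diff, a minimal, self-contained sketch of the pattern this commit introduces may help. Everything below is a hypothetical illustration, not the Elasticsearch code: ShardStatus stands in for IndexShardSnapshotStatus, the shardSnapshots map stands in for the SnapshotShardsService's registry of in-progress shard snapshots, and the scheduled task stands in for the SnapshotShutdownProgressTracker's periodic logging.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StatusDescriptionSketch {

    // Hypothetical stand-in for IndexShardSnapshotStatus: a volatile, human-readable phase description.
    static final class ShardStatus {
        private volatile String statusDescription = "initializing";

        void updateStatusDescription(String statusString) {
            assert statusString != null && statusString.isEmpty() == false;
            this.statusDescription = statusString;
        }

        String statusDescription() {
            return statusDescription;
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical registry of in-progress shard snapshots, keyed by shard name.
        Map<String, ShardStatus> shardSnapshots = new ConcurrentHashMap<>();
        ShardStatus status = new ShardStatus();
        shardSnapshots.put("[index-a][0]", status);

        // Periodic logger playing the tracker's role: iterate the registry and log each shard's current description.
        ScheduledExecutorService tracker = Executors.newSingleThreadScheduledExecutor();
        tracker.scheduleAtFixedRate(
            () -> shardSnapshots.forEach(
                (shard, s) -> System.out.println(shard + " statusDescription='" + s.statusDescription() + "'")
            ),
            0, 200, TimeUnit.MILLISECONDS
        );

        // The snapshot path updates the description as it moves between phases,
        // mirroring the updateStatusDescription(...) calls threaded through the diff below.
        status.updateStatusDescription("queued in snapshot task runner");
        Thread.sleep(300);
        status.updateStatusDescription("enqueued file snapshot tasks: threads running concurrent file uploads");
        Thread.sleep(300);
        status.updateStatusDescription("finished: master notification attempt complete");
        Thread.sleep(300);

        tracker.shutdownNow();
    }
}

The design choice visible in the diff is the same as in this sketch: the description is a plain volatile String, so the snapshot path can update it cheaply and frequently while a concurrent reader always sees the most recent phase.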
DiannaHohensee authored Dec 10, 2024
1 parent a27e5db commit 47be542
Showing 6 changed files with 166 additions and 25 deletions.
@@ -524,6 +524,15 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
"Pause signals have been set for all shard snapshots on data node [" + nodeForRemovalId + "]"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker index shard snapshot status messages",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
// Expect the shard snapshot to stall in data file upload, since we've blocked the data node file upload to the blob store.
"statusDescription='enqueued file snapshot tasks: threads running concurrent file uploads'"
)
);

putShutdownForRemovalMetadata(nodeForRemoval, clusterService);

@@ -583,6 +592,14 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
"Current active shard snapshot stats on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker index shard snapshot messages",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
"statusDescription='finished: master notification attempt complete'"
)
);

// Release the master node to respond
snapshotStatusUpdateLatch.countDown();
@@ -98,6 +98,7 @@ public enum AbortStatus {
private long processedSize;
private String failure;
private final SubscribableListener<AbortStatus> abortListeners = new SubscribableListener<>();
private volatile String statusDescription;

private IndexShardSnapshotStatus(
final Stage stage,
@@ -110,7 +111,8 @@ private IndexShardSnapshotStatus(
final long totalSize,
final long processedSize,
final String failure,
final ShardGeneration generation
final ShardGeneration generation,
final String statusDescription
) {
this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
this.generation = new AtomicReference<>(generation);
@@ -124,6 +126,7 @@ private IndexShardSnapshotStatus(
this.processedSize = processedSize;
this.incrementalSize = incrementalSize;
this.failure = failure;
updateStatusDescription(statusDescription);
}

public synchronized Copy moveToStarted(
@@ -272,6 +275,15 @@ public synchronized void addProcessedFiles(int count, long totalSize) {
processedSize += totalSize;
}

/**
* Updates the string explanation for what the snapshot is actively doing right now.
*/
public void updateStatusDescription(String statusString) {
assert statusString != null;
assert statusString.isEmpty() == false;
this.statusDescription = statusString;
}

/**
* Returns a copy of the current {@link IndexShardSnapshotStatus}. This method is
* intended to be used when a coherent state of {@link IndexShardSnapshotStatus} is needed.
@@ -289,20 +301,21 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() {
incrementalSize,
totalSize,
processedSize,
failure
failure,
statusDescription
);
}

public static IndexShardSnapshotStatus newInitializing(ShardGeneration generation) {
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation);
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation, "initializing");
}

public static IndexShardSnapshotStatus.Copy newFailed(final String failure) {
assert failure != null : "expecting non null failure for a failed IndexShardSnapshotStatus";
if (failure == null) {
throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
}
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null).asCopy();
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null, "initialized as failed").asCopy();
}

public static IndexShardSnapshotStatus.Copy newDone(
@@ -326,7 +339,8 @@ public static IndexShardSnapshotStatus.Copy newDone(
size,
incrementalSize,
null,
generation
generation,
"initialized as done"
).asCopy();
}

@@ -345,6 +359,7 @@ public static class Copy {
private final long processedSize;
private final long incrementalSize;
private final String failure;
private final String statusDescription;

public Copy(
final Stage stage,
@@ -356,7 +371,8 @@ public Copy(
final long incrementalSize,
final long totalSize,
final long processedSize,
final String failure
final String failure,
final String statusDescription
) {
this.stage = stage;
this.startTime = startTime;
Expand All @@ -368,6 +384,7 @@ public Copy(
this.processedSize = processedSize;
this.incrementalSize = incrementalSize;
this.failure = failure;
this.statusDescription = statusDescription;
}

public Stage getStage() {
@@ -410,6 +427,10 @@ public String getFailure() {
return failure;
}

public String getStatusDescription() {
return statusDescription;
}

@Override
public String toString() {
return "index shard snapshot status ("
@@ -433,6 +454,8 @@ public String toString() {
+ processedSize
+ ", failure='"
+ failure
+ "', statusDescription='"
+ statusDescription
+ '\''
+ ')';
}
@@ -461,6 +484,8 @@ public String toString() {
+ processedSize
+ ", failure='"
+ failure
+ "', statusDescription='"
+ statusDescription
+ '\''
+ ')';
}
@@ -3186,6 +3186,7 @@ private void writeAtomic(

@Override
public void snapshotShard(SnapshotShardContext context) {
context.status().updateStatusDescription("queued in snapshot task runner");
shardSnapshotTaskRunner.enqueueShardSnapshot(context);
}

@@ -3198,6 +3199,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
final ShardId shardId = store.shardId();
final SnapshotId snapshotId = context.snapshotId();
final IndexShardSnapshotStatus snapshotStatus = context.status();
snapshotStatus.updateStatusDescription("snapshot task runner: setting up shard snapshot");
final long startTime = threadPool.absoluteTimeInMillis();
try {
final ShardGeneration generation = snapshotStatus.generation();
@@ -3206,6 +3208,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
final Set<String> blobs;
if (generation == null) {
snapshotStatus.ensureNotAborted();
snapshotStatus.updateStatusDescription("snapshot task runner: listing blob prefixes");
try {
blobs = shardContainer.listBlobsByPrefix(OperationPurpose.SNAPSHOT_METADATA, SNAPSHOT_INDEX_PREFIX).keySet();
} catch (IOException e) {
@@ -3216,6 +3219,7 @@
}

snapshotStatus.ensureNotAborted();
snapshotStatus.updateStatusDescription("snapshot task runner: loading snapshot blobs");
Tuple<BlobStoreIndexShardSnapshots, ShardGeneration> tuple = buildBlobStoreIndexShardSnapshots(
context.indexId(),
shardId.id(),
@@ -3316,6 +3320,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
indexCommitPointFiles = filesFromSegmentInfos;
}

snapshotStatus.updateStatusDescription("snapshot task runner: starting shard snapshot");
snapshotStatus.moveToStarted(
startTime,
indexIncrementalFileCount,
@@ -3342,6 +3347,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateStatusDescription("snapshot task runner: updating blob store with new shard generation");
INDEX_SHARD_SNAPSHOTS_FORMAT.write(
updatedBlobStoreIndexShardSnapshots,
shardContainer,
@@ -3387,6 +3393,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateStatusDescription("no shard generations: writing new index-${N} file");
writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots, serializationParams);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(
@@ -3401,6 +3408,7 @@
}
snapshotStatus.addProcessedFiles(finalFilesInShardMetadataCount, finalFilesInShardMetadataSize);
try {
snapshotStatus.updateStatusDescription("no shard generations: deleting blobs");
deleteFromContainer(OperationPurpose.SNAPSHOT_METADATA, shardContainer, blobsToDelete.iterator());
} catch (IOException e) {
logger.warn(
@@ -3414,6 +3422,7 @@
// filesToSnapshot will be emptied while snapshotting the file. We make a copy here for cleanup purpose in case of failure.
final AtomicReference<List<FileInfo>> fileToCleanUp = new AtomicReference<>(List.copyOf(filesToSnapshot));
final ActionListener<Collection<Void>> allFilesUploadedListener = ActionListener.assertOnce(ActionListener.wrap(ignore -> {
snapshotStatus.updateStatusDescription("all files uploaded: finalizing");
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize();

// now create and write the commit point
@@ -3435,6 +3444,7 @@
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateStatusDescription("all files uploaded: writing to index shard file");
INDEX_SHARD_SNAPSHOT_FORMAT.write(
blobStoreIndexShardSnapshot,
shardContainer,
@@ -3451,10 +3461,12 @@
ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles())
);
snapshotStatus.updateStatusDescription("all files uploaded: done");
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
context.onResponse(shardSnapshotResult);
}, e -> {
try {
snapshotStatus.updateStatusDescription("all files uploaded: cleaning up data files, exception while finalizing: " + e);
shardContainer.deleteBlobsIgnoringIfNotExists(
OperationPurpose.SNAPSHOT_DATA,
Iterators.flatMap(fileToCleanUp.get().iterator(), f -> Iterators.forRange(0, f.numberOfParts(), f::partName))
@@ -3517,6 +3529,7 @@ protected void snapshotFiles(
) {
final int noOfFilesToSnapshot = filesToSnapshot.size();
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, noOfFilesToSnapshot, allFilesUploadedListener);
context.status().updateStatusDescription("enqueued file snapshot tasks: threads running concurrent file uploads");
for (int i = 0; i < noOfFilesToSnapshot; i++) {
shardSnapshotTaskRunner.enqueueFileSnapshot(context, filesToSnapshot::poll, filesListener);
}