Add a shard snapshot current state debug string
The SnapshotShutdownProgressTracker will iterate through the
SnapshotShardsService's list of shard snapshots and log their current
status. SnapshotShardsService will now update a new debug string
field in IndexShardSnapshotStatus as a shard snapshot operation
proceeds so that the current state can be logged.
DiannaHohensee committed Dec 6, 2024
1 parent 4f030ef commit 3c06926
Showing 6 changed files with 162 additions and 23 deletions.
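To make the mechanism in the commit message concrete, the tracker can take a point-in-time Copy of each tracked IndexShardSnapshotStatus and include the new debug string in its periodic INFO logging. The loop below is an illustrative sketch only: the shardSnapshotStatuses map and the exact log wording are assumptions, while asCopy() and getDebugStatusString() are the APIs added in this commit.

// Illustrative sketch, not the committed tracker code.
// "shardSnapshotStatuses" is a hypothetical Map<ShardId, IndexShardSnapshotStatus>
// supplied by SnapshotShardsService; asCopy() and getDebugStatusString() come from this commit.
for (Map.Entry<ShardId, IndexShardSnapshotStatus> entry : shardSnapshotStatuses.entrySet()) {
    IndexShardSnapshotStatus.Copy copy = entry.getValue().asCopy(); // synchronized, coherent view of the status
    logger.info(
        "shard snapshot [{}] stage [{}], debugStatusString='{}'",
        entry.getKey(),
        copy.getStage(),
        copy.getDebugStatusString()
    );
}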
@@ -524,6 +524,15 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
"Pause signals have been set for all shard snapshots on data node [" + nodeForRemovalId + "]"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker index shard snapshot status messages",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
// Expect the shard snapshot to stall in data file upload, since we've blocked the data node file upload to the blob store.
"debugStatusString='enqueued file snapshot tasks: threads running concurrent file uploads'"
)
);

putShutdownForRemovalMetadata(nodeForRemoval, clusterService);

@@ -583,6 +592,14 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
"Current active shard snapshot stats on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker index shard snapshot messages",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
"debugStatusString='finished: master notification attempt complete'"
)
);

// Release the master node to respond
snapshotStatusUpdateLatch.countDown();
@@ -98,6 +98,7 @@ public enum AbortStatus {
private long processedSize;
private String failure;
private final SubscribableListener<AbortStatus> abortListeners = new SubscribableListener<>();
private String debugStatusString;

private IndexShardSnapshotStatus(
final Stage stage,
@@ -110,7 +111,8 @@ private IndexShardSnapshotStatus(
final long totalSize,
final long processedSize,
final String failure,
final ShardGeneration generation
final ShardGeneration generation,
final String debugStatusString
) {
this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
this.generation = new AtomicReference<>(generation);
@@ -124,6 +126,7 @@ private IndexShardSnapshotStatus(
this.processedSize = processedSize;
this.incrementalSize = incrementalSize;
this.failure = failure;
this.debugStatusString = debugStatusString;
}

public synchronized Copy moveToStarted(
@@ -272,6 +275,13 @@ public synchronized void addProcessedFiles(int count, long totalSize) {
processedSize += totalSize;
}

/**
* Updates the string explanation for what the snapshot is actively doing right now.
*/
public synchronized void updateDebugStatusString(String statusString) {
this.debugStatusString = statusString;
}

/**
* Returns a copy of the current {@link IndexShardSnapshotStatus}. This method is
* intended to be used when a coherent state of {@link IndexShardSnapshotStatus} is needed.
@@ -289,20 +299,21 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() {
incrementalSize,
totalSize,
processedSize,
failure
failure,
debugStatusString
);
}

public static IndexShardSnapshotStatus newInitializing(ShardGeneration generation) {
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation);
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation, "initializing");
}

public static IndexShardSnapshotStatus.Copy newFailed(final String failure) {
assert failure != null : "expecting non null failure for a failed IndexShardSnapshotStatus";
if (failure == null) {
throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
}
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null).asCopy();
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null, "instantiated as failed").asCopy();
}

public static IndexShardSnapshotStatus.Copy newDone(
@@ -326,7 +337,8 @@ public static IndexShardSnapshotStatus.Copy newDone(
size,
incrementalSize,
null,
generation
generation,
"instantiated as done"
).asCopy();
}

@@ -345,6 +357,7 @@ public static class Copy {
private final long processedSize;
private final long incrementalSize;
private final String failure;
private final String debugStatusString;

public Copy(
final Stage stage,
@@ -356,7 +369,8 @@ public Copy(
final long incrementalSize,
final long totalSize,
final long processedSize,
final String failure
final String failure,
final String debugStatusString
) {
this.stage = stage;
this.startTime = startTime;
@@ -368,6 +382,7 @@ public Copy(
this.processedSize = processedSize;
this.incrementalSize = incrementalSize;
this.failure = failure;
this.debugStatusString = debugStatusString;
}

public Stage getStage() {
@@ -410,6 +425,10 @@ public String getFailure() {
return failure;
}

public String getDebugStatusString() {
return debugStatusString;
}

@Override
public String toString() {
return "index shard snapshot status ("
@@ -433,6 +452,8 @@ public String toString() {
+ processedSize
+ ", failure='"
+ failure
+ "', debugStatusString='"
+ debugStatusString
+ '\''
+ ')';
}
@@ -461,6 +482,8 @@ public String toString() {
+ processedSize
+ ", failure='"
+ failure
+ "', debugStatusString='"
+ debugStatusString
+ '\''
+ ')';
}
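Taken together, the new field follows a simple write-then-copy pattern: producers call the synchronized updateDebugStatusString as the shard snapshot progresses, and consumers read the value through an immutable Copy. A minimal usage sketch using only APIs visible in this diff follows; the generation variable is assumed to be in scope.

// Minimal usage sketch based on the APIs added in this file.
IndexShardSnapshotStatus status = IndexShardSnapshotStatus.newInitializing(generation); // debugStatusString starts as "initializing"
status.updateDebugStatusString("snapshot task runner: setting up shard snapshot");      // producer side (e.g. BlobStoreRepository)
IndexShardSnapshotStatus.Copy copy = status.asCopy();                                   // consumer side (e.g. the progress tracker)
String current = copy.getDebugStatusString();                                           // "snapshot task runner: setting up shard snapshot"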
@@ -3186,6 +3186,7 @@ private void writeAtomic(

@Override
public void snapshotShard(SnapshotShardContext context) {
context.status().updateDebugStatusString("queued in snapshot task runner");
shardSnapshotTaskRunner.enqueueShardSnapshot(context);
}

@@ -3198,6 +3199,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
final ShardId shardId = store.shardId();
final SnapshotId snapshotId = context.snapshotId();
final IndexShardSnapshotStatus snapshotStatus = context.status();
snapshotStatus.updateDebugStatusString("snapshot task runner: setting up shard snapshot");
final long startTime = threadPool.absoluteTimeInMillis();
try {
final ShardGeneration generation = snapshotStatus.generation();
@@ -3206,6 +3208,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
final Set<String> blobs;
if (generation == null) {
snapshotStatus.ensureNotAborted();
snapshotStatus.updateDebugStatusString("snapshot task runner: listing blob prefixes");
try {
blobs = shardContainer.listBlobsByPrefix(OperationPurpose.SNAPSHOT_METADATA, SNAPSHOT_INDEX_PREFIX).keySet();
} catch (IOException e) {
@@ -3216,6 +3219,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
}

snapshotStatus.ensureNotAborted();
snapshotStatus.updateDebugStatusString("snapshot task runner: loading snapshot blobs");
Tuple<BlobStoreIndexShardSnapshots, ShardGeneration> tuple = buildBlobStoreIndexShardSnapshots(
context.indexId(),
shardId.id(),
@@ -3316,6 +3320,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
indexCommitPointFiles = filesFromSegmentInfos;
}

snapshotStatus.updateDebugStatusString("snapshot task runner: starting shard snapshot");
snapshotStatus.moveToStarted(
startTime,
indexIncrementalFileCount,
@@ -3342,6 +3347,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateDebugStatusString("snapshot task runner: updating blob store with new shard generation");
INDEX_SHARD_SNAPSHOTS_FORMAT.write(
updatedBlobStoreIndexShardSnapshots,
shardContainer,
@@ -3387,6 +3393,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateDebugStatusString("no shard generations: writing new index-${N} file");
writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots, serializationParams);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(
@@ -3401,6 +3408,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
}
snapshotStatus.addProcessedFiles(finalFilesInShardMetadataCount, finalFilesInShardMetadataSize);
try {
snapshotStatus.updateDebugStatusString("no shard generations: deleting blobs");
deleteFromContainer(OperationPurpose.SNAPSHOT_METADATA, shardContainer, blobsToDelete.iterator());
} catch (IOException e) {
logger.warn(
@@ -3414,6 +3422,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
// filesToSnapshot will be emptied while snapshotting the file. We make a copy here for cleanup purpose in case of failure.
final AtomicReference<List<FileInfo>> fileToCleanUp = new AtomicReference<>(List.copyOf(filesToSnapshot));
final ActionListener<Collection<Void>> allFilesUploadedListener = ActionListener.assertOnce(ActionListener.wrap(ignore -> {
snapshotStatus.updateDebugStatusString("all files uploaded: finalizing");
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize();

// now create and write the commit point
@@ -3435,6 +3444,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateDebugStatusString("all files uploaded: writing to index shard file");
INDEX_SHARD_SNAPSHOT_FORMAT.write(
blobStoreIndexShardSnapshot,
shardContainer,
@@ -3451,10 +3461,12 @@ private void doSnapshotShard(SnapshotShardContext context) {
ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles())
);
snapshotStatus.updateDebugStatusString("all files uploaded: done");
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
context.onResponse(shardSnapshotResult);
}, e -> {
try {
snapshotStatus.updateDebugStatusString("all files uploaded: cleaning up data files, exception while finalizing: " + e);
shardContainer.deleteBlobsIgnoringIfNotExists(
OperationPurpose.SNAPSHOT_DATA,
Iterators.flatMap(fileToCleanUp.get().iterator(), f -> Iterators.forRange(0, f.numberOfParts(), f::partName))
@@ -3519,6 +3531,7 @@ protected void snapshotFiles(
) {
final int noOfFilesToSnapshot = filesToSnapshot.size();
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, noOfFilesToSnapshot, allFilesUploadedListener);
context.status().updateDebugStatusString("enqueued file snapshot tasks: threads running concurrent file uploads");
for (int i = 0; i < noOfFilesToSnapshot; i++) {
shardSnapshotTaskRunner.enqueueFileSnapshot(context, filesToSnapshot::poll, filesListener);
}
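For orientation, the strings set in this file trace a shard snapshot's happy path roughly in the order below. This is a summary collected from the calls added above, not an authoritative sequence; branch-specific values (the "no shard generations: ..." strings) and failure paths are omitted.

// Approximate happy-path progression of debugStatusString values set in this file (illustrative summary).
String[] lifecycle = {
    "queued in snapshot task runner",
    "snapshot task runner: setting up shard snapshot",
    "snapshot task runner: listing blob prefixes",                            // only when no shard generation exists yet
    "snapshot task runner: loading snapshot blobs",
    "snapshot task runner: starting shard snapshot",
    "snapshot task runner: updating blob store with new shard generation",
    "enqueued file snapshot tasks: threads running concurrent file uploads",  // the state the IT expects while uploads are blocked
    "all files uploaded: finalizing",
    "all files uploaded: writing to index shard file",
    "all files uploaded: done"
};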