Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a shard snapshot current status debug string #118198

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,15 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
"Pause signals have been set for all shard snapshots on data node [" + nodeForRemovalId + "]"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker index shard snapshot status messages",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
// Expect the shard snapshot to stall in data file upload, since we've blocked the data node file upload to the blob store.
"debugStatusString='enqueued file snapshot tasks: threads running concurrent file uploads'"
)
);

putShutdownForRemovalMetadata(nodeForRemoval, clusterService);

Expand Down Expand Up @@ -583,6 +592,14 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
"Current active shard snapshot stats on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker index shard snapshot messages",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
"debugStatusString='finished: master notification attempt complete'"
)
);

// Release the master node to respond
snapshotStatusUpdateLatch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public enum AbortStatus {
private long processedSize;
private String failure;
private final SubscribableListener<AbortStatus> abortListeners = new SubscribableListener<>();
private String debugStatusString;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably needs to be volatile so that toString() and friends all see the latest value. And in that case there's no need to make the update method synchronized.

Suggested change
private String debugStatusString;
private volatile String debugStatusString;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, the toString(). Changed 👍 .


private IndexShardSnapshotStatus(
final Stage stage,
Expand All @@ -110,7 +111,8 @@ private IndexShardSnapshotStatus(
final long totalSize,
final long processedSize,
final String failure,
final ShardGeneration generation
final ShardGeneration generation,
final String debugStatusString
) {
this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
this.generation = new AtomicReference<>(generation);
Expand All @@ -124,6 +126,7 @@ private IndexShardSnapshotStatus(
this.processedSize = processedSize;
this.incrementalSize = incrementalSize;
this.failure = failure;
this.debugStatusString = debugStatusString;
}

public synchronized Copy moveToStarted(
Expand Down Expand Up @@ -272,6 +275,13 @@ public synchronized void addProcessedFiles(int count, long totalSize) {
processedSize += totalSize;
}

/**
* Updates the string explanation for what the snapshot is actively doing right now.
*/
public synchronized void updateDebugStatusString(String statusString) {
this.debugStatusString = statusString;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we at least assert that statusString is not null (or empty)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, added 👍

}

/**
* Returns a copy of the current {@link IndexShardSnapshotStatus}. This method is
* intended to be used when a coherent state of {@link IndexShardSnapshotStatus} is needed.
Expand All @@ -289,20 +299,21 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() {
incrementalSize,
totalSize,
processedSize,
failure
failure,
debugStatusString
);
}

public static IndexShardSnapshotStatus newInitializing(ShardGeneration generation) {
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation);
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation, "initializing");
}

public static IndexShardSnapshotStatus.Copy newFailed(final String failure) {
assert failure != null : "expecting non null failure for a failed IndexShardSnapshotStatus";
if (failure == null) {
throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
}
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null).asCopy();
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null, "instantiated as failed").asCopy();
}

public static IndexShardSnapshotStatus.Copy newDone(
Expand All @@ -326,7 +337,8 @@ public static IndexShardSnapshotStatus.Copy newDone(
size,
incrementalSize,
null,
generation
generation,
"instantiated as done"
).asCopy();
}

Expand All @@ -345,6 +357,7 @@ public static class Copy {
private final long processedSize;
private final long incrementalSize;
private final String failure;
private final String debugStatusString;

public Copy(
final Stage stage,
Expand All @@ -356,7 +369,8 @@ public Copy(
final long incrementalSize,
final long totalSize,
final long processedSize,
final String failure
final String failure,
final String debugStatusString
) {
this.stage = stage;
this.startTime = startTime;
Expand All @@ -368,6 +382,7 @@ public Copy(
this.processedSize = processedSize;
this.incrementalSize = incrementalSize;
this.failure = failure;
this.debugStatusString = debugStatusString;
}

public Stage getStage() {
Expand Down Expand Up @@ -410,6 +425,10 @@ public String getFailure() {
return failure;
}

public String getDebugStatusString() {
return debugStatusString;
}

@Override
public String toString() {
return "index shard snapshot status ("
Expand All @@ -433,6 +452,8 @@ public String toString() {
+ processedSize
+ ", failure='"
+ failure
+ "', debugStatusString='"
+ debugStatusString
+ '\''
+ ')';
}
Expand Down Expand Up @@ -461,6 +482,8 @@ public String toString() {
+ processedSize
+ ", failure='"
+ failure
+ "', debugStatusString='"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about exposing this to users as debugStatusString, which I believe might happen since this is logged at INFO sometimes. Seems a little overly technical. How about status or statusDescription?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. updated to statusDescription 👍

+ debugStatusString
+ '\''
+ ')';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3186,6 +3186,7 @@ private void writeAtomic(

@Override
public void snapshotShard(SnapshotShardContext context) {
context.status().updateDebugStatusString("queued in snapshot task runner");
shardSnapshotTaskRunner.enqueueShardSnapshot(context);
}

Expand All @@ -3198,6 +3199,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
final ShardId shardId = store.shardId();
final SnapshotId snapshotId = context.snapshotId();
final IndexShardSnapshotStatus snapshotStatus = context.status();
snapshotStatus.updateDebugStatusString("snapshot task runner: setting up shard snapshot");
final long startTime = threadPool.absoluteTimeInMillis();
try {
final ShardGeneration generation = snapshotStatus.generation();
Expand All @@ -3206,6 +3208,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
final Set<String> blobs;
if (generation == null) {
snapshotStatus.ensureNotAborted();
snapshotStatus.updateDebugStatusString("snapshot task runner: listing blob prefixes");
try {
blobs = shardContainer.listBlobsByPrefix(OperationPurpose.SNAPSHOT_METADATA, SNAPSHOT_INDEX_PREFIX).keySet();
} catch (IOException e) {
Expand All @@ -3216,6 +3219,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
}

snapshotStatus.ensureNotAborted();
snapshotStatus.updateDebugStatusString("snapshot task runner: loading snapshot blobs");
Tuple<BlobStoreIndexShardSnapshots, ShardGeneration> tuple = buildBlobStoreIndexShardSnapshots(
context.indexId(),
shardId.id(),
Expand Down Expand Up @@ -3316,6 +3320,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
indexCommitPointFiles = filesFromSegmentInfos;
}

snapshotStatus.updateDebugStatusString("snapshot task runner: starting shard snapshot");
snapshotStatus.moveToStarted(
startTime,
indexIncrementalFileCount,
Expand All @@ -3342,6 +3347,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateDebugStatusString("snapshot task runner: updating blob store with new shard generation");
INDEX_SHARD_SNAPSHOTS_FORMAT.write(
updatedBlobStoreIndexShardSnapshots,
shardContainer,
Expand Down Expand Up @@ -3387,6 +3393,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateDebugStatusString("no shard generations: writing new index-${N} file");
writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots, serializationParams);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(
Expand All @@ -3401,6 +3408,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
}
snapshotStatus.addProcessedFiles(finalFilesInShardMetadataCount, finalFilesInShardMetadataSize);
try {
snapshotStatus.updateDebugStatusString("no shard generations: deleting blobs");
deleteFromContainer(OperationPurpose.SNAPSHOT_METADATA, shardContainer, blobsToDelete.iterator());
} catch (IOException e) {
logger.warn(
Expand All @@ -3414,6 +3422,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
// filesToSnapshot will be emptied while snapshotting the file. We make a copy here for cleanup purpose in case of failure.
final AtomicReference<List<FileInfo>> fileToCleanUp = new AtomicReference<>(List.copyOf(filesToSnapshot));
final ActionListener<Collection<Void>> allFilesUploadedListener = ActionListener.assertOnce(ActionListener.wrap(ignore -> {
snapshotStatus.updateDebugStatusString("all files uploaded: finalizing");
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize();

// now create and write the commit point
Expand All @@ -3435,6 +3444,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateDebugStatusString("all files uploaded: writing to index shard file");
INDEX_SHARD_SNAPSHOT_FORMAT.write(
blobStoreIndexShardSnapshot,
shardContainer,
Expand All @@ -3451,10 +3461,12 @@ private void doSnapshotShard(SnapshotShardContext context) {
ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles())
);
snapshotStatus.updateDebugStatusString("all files uploaded: done");
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
context.onResponse(shardSnapshotResult);
}, e -> {
try {
snapshotStatus.updateDebugStatusString("all files uploaded: cleaning up data files, exception while finalizing: " + e);
shardContainer.deleteBlobsIgnoringIfNotExists(
OperationPurpose.SNAPSHOT_DATA,
Iterators.flatMap(fileToCleanUp.get().iterator(), f -> Iterators.forRange(0, f.numberOfParts(), f::partName))
Expand Down Expand Up @@ -3519,6 +3531,7 @@ protected void snapshotFiles(
) {
final int noOfFilesToSnapshot = filesToSnapshot.size();
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, noOfFilesToSnapshot, allFilesUploadedListener);
context.status().updateDebugStatusString("enqueued file snapshot tasks: threads running concurrent file uploads");
for (int i = 0; i < noOfFilesToSnapshot; i++) {
shardSnapshotTaskRunner.enqueueFileSnapshot(context, filesToSnapshot::poll, filesListener);
}
Expand Down
Loading