Skip to content

Commit

Permalink
Merge branch 'main' into kderusso/semantic-text-match-query-support
Browse files Browse the repository at this point in the history
  • Loading branch information
kderusso authored Dec 10, 2024
2 parents 7dfa0a2 + 47be542 commit b6b7ab4
Show file tree
Hide file tree
Showing 25 changed files with 480 additions and 43 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/118354.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118354
summary: Fix log message format bugs
area: Ingest Node
type: bug
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/118378.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118378
summary: Opt into extra data stream resolution
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ public void onResponse(Void unused) {
// should be no other processes interacting with the repository.
logger.warn(
Strings.format(
"failed to clean up multipart upload [{}] of blob [{}][{}][{}]",
"failed to clean up multipart upload [%s] of blob [%s][%s][%s]",
abortMultipartUploadRequest.getUploadId(),
blobStore.getRepositoryMetadata().name(),
abortMultipartUploadRequest.getBucketName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ private String getRollupIndexName() throws IOException {
if (asMap.size() == 1) {
return (String) asMap.keySet().toArray()[0];
}
logger.warn("--> No matching rollup name for path [%s]", endpoint);
logger.warn("--> No matching rollup name for path [{}]", endpoint);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private String getRollupIndexName() throws IOException {
if (asMap.size() == 1) {
return (String) asMap.keySet().toArray()[0];
}
logger.warn("--> No matching rollup name for path [%s]", endpoint);
logger.warn("--> No matching rollup name for path [{}]", endpoint);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
final int length = randomIntBetween(1, 8);
final Map<String, RetentionLease> currentRetentionLeases = new LinkedHashMap<>();
logger.info("adding retention [{}}] leases", length);
logger.info("adding retention [{}] leases", length);
for (int i = 0; i < length; i++) {
final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8));
final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,15 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
"Pause signals have been set for all shard snapshots on data node [" + nodeForRemovalId + "]"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker index shard snapshot status messages",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
// Expect the shard snapshot to stall in data file upload, since we've blocked the data node file upload to the blob store.
"statusDescription='enqueued file snapshot tasks: threads running concurrent file uploads'"
)
);

putShutdownForRemovalMetadata(nodeForRemoval, clusterService);

Expand Down Expand Up @@ -583,6 +592,14 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
"Current active shard snapshot stats on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]"
)
);
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker index shard snapshot messages",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
"statusDescription='finished: master notification attempt complete'"
)
);

// Release the master node to respond
snapshotStatusUpdateLatch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.PluginsLoader;
import org.elasticsearch.rest.MethodHandlers;
import org.elasticsearch.transport.RequestHandlerRegistry;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -201,7 +203,11 @@ private static void initPhase2(Bootstrap bootstrap) throws IOException {
SubscribableListener.class,
RunOnce.class,
// We eagerly initialize to work around log4j permissions & JDK-8309727
VectorUtil.class
VectorUtil.class,
// RequestHandlerRegistry and MethodHandlers classes do nontrivial static initialization which should always succeed but load
// it now (before SM) to be sure
RequestHandlerRegistry.class,
MethodHandlers.class
);

// load the plugin Java modules and layers now for use in entitlements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public enum AbortStatus {
private long processedSize;
private String failure;
private final SubscribableListener<AbortStatus> abortListeners = new SubscribableListener<>();
private volatile String statusDescription;

private IndexShardSnapshotStatus(
final Stage stage,
Expand All @@ -110,7 +111,8 @@ private IndexShardSnapshotStatus(
final long totalSize,
final long processedSize,
final String failure,
final ShardGeneration generation
final ShardGeneration generation,
final String statusDescription
) {
this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
this.generation = new AtomicReference<>(generation);
Expand All @@ -124,6 +126,7 @@ private IndexShardSnapshotStatus(
this.processedSize = processedSize;
this.incrementalSize = incrementalSize;
this.failure = failure;
updateStatusDescription(statusDescription);
}

public synchronized Copy moveToStarted(
Expand Down Expand Up @@ -272,6 +275,15 @@ public synchronized void addProcessedFiles(int count, long totalSize) {
processedSize += totalSize;
}

/**
* Updates the string explanation for what the snapshot is actively doing right now.
*/
public void updateStatusDescription(String statusString) {
assert statusString != null;
assert statusString.isEmpty() == false;
this.statusDescription = statusString;
}

/**
* Returns a copy of the current {@link IndexShardSnapshotStatus}. This method is
* intended to be used when a coherent state of {@link IndexShardSnapshotStatus} is needed.
Expand All @@ -289,20 +301,21 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() {
incrementalSize,
totalSize,
processedSize,
failure
failure,
statusDescription
);
}

public static IndexShardSnapshotStatus newInitializing(ShardGeneration generation) {
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation);
return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation, "initializing");
}

public static IndexShardSnapshotStatus.Copy newFailed(final String failure) {
assert failure != null : "expecting non null failure for a failed IndexShardSnapshotStatus";
if (failure == null) {
throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
}
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null).asCopy();
return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null, "initialized as failed").asCopy();
}

public static IndexShardSnapshotStatus.Copy newDone(
Expand All @@ -326,7 +339,8 @@ public static IndexShardSnapshotStatus.Copy newDone(
size,
incrementalSize,
null,
generation
generation,
"initialized as done"
).asCopy();
}

Expand All @@ -345,6 +359,7 @@ public static class Copy {
private final long processedSize;
private final long incrementalSize;
private final String failure;
private final String statusDescription;

public Copy(
final Stage stage,
Expand All @@ -356,7 +371,8 @@ public Copy(
final long incrementalSize,
final long totalSize,
final long processedSize,
final String failure
final String failure,
final String statusDescription
) {
this.stage = stage;
this.startTime = startTime;
Expand All @@ -368,6 +384,7 @@ public Copy(
this.processedSize = processedSize;
this.incrementalSize = incrementalSize;
this.failure = failure;
this.statusDescription = statusDescription;
}

public Stage getStage() {
Expand Down Expand Up @@ -410,6 +427,10 @@ public String getFailure() {
return failure;
}

public String getStatusDescription() {
return statusDescription;
}

@Override
public String toString() {
return "index shard snapshot status ("
Expand All @@ -433,6 +454,8 @@ public String toString() {
+ processedSize
+ ", failure='"
+ failure
+ "', statusDescription='"
+ statusDescription
+ '\''
+ ')';
}
Expand Down Expand Up @@ -461,6 +484,8 @@ public String toString() {
+ processedSize
+ ", failure='"
+ failure
+ "', statusDescription='"
+ statusDescription
+ '\''
+ ')';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3186,6 +3186,7 @@ private void writeAtomic(

@Override
public void snapshotShard(SnapshotShardContext context) {
context.status().updateStatusDescription("queued in snapshot task runner");
shardSnapshotTaskRunner.enqueueShardSnapshot(context);
}

Expand All @@ -3198,6 +3199,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
final ShardId shardId = store.shardId();
final SnapshotId snapshotId = context.snapshotId();
final IndexShardSnapshotStatus snapshotStatus = context.status();
snapshotStatus.updateStatusDescription("snapshot task runner: setting up shard snapshot");
final long startTime = threadPool.absoluteTimeInMillis();
try {
final ShardGeneration generation = snapshotStatus.generation();
Expand All @@ -3206,6 +3208,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
final Set<String> blobs;
if (generation == null) {
snapshotStatus.ensureNotAborted();
snapshotStatus.updateStatusDescription("snapshot task runner: listing blob prefixes");
try {
blobs = shardContainer.listBlobsByPrefix(OperationPurpose.SNAPSHOT_METADATA, SNAPSHOT_INDEX_PREFIX).keySet();
} catch (IOException e) {
Expand All @@ -3216,6 +3219,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
}

snapshotStatus.ensureNotAborted();
snapshotStatus.updateStatusDescription("snapshot task runner: loading snapshot blobs");
Tuple<BlobStoreIndexShardSnapshots, ShardGeneration> tuple = buildBlobStoreIndexShardSnapshots(
context.indexId(),
shardId.id(),
Expand Down Expand Up @@ -3316,6 +3320,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
indexCommitPointFiles = filesFromSegmentInfos;
}

snapshotStatus.updateStatusDescription("snapshot task runner: starting shard snapshot");
snapshotStatus.moveToStarted(
startTime,
indexIncrementalFileCount,
Expand All @@ -3342,6 +3347,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateStatusDescription("snapshot task runner: updating blob store with new shard generation");
INDEX_SHARD_SNAPSHOTS_FORMAT.write(
updatedBlobStoreIndexShardSnapshots,
shardContainer,
Expand Down Expand Up @@ -3387,6 +3393,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateStatusDescription("no shard generations: writing new index-${N} file");
writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots, serializationParams);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(
Expand All @@ -3401,6 +3408,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
}
snapshotStatus.addProcessedFiles(finalFilesInShardMetadataCount, finalFilesInShardMetadataSize);
try {
snapshotStatus.updateStatusDescription("no shard generations: deleting blobs");
deleteFromContainer(OperationPurpose.SNAPSHOT_METADATA, shardContainer, blobsToDelete.iterator());
} catch (IOException e) {
logger.warn(
Expand All @@ -3414,6 +3422,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
// filesToSnapshot will be emptied while snapshotting the file. We make a copy here for cleanup purpose in case of failure.
final AtomicReference<List<FileInfo>> fileToCleanUp = new AtomicReference<>(List.copyOf(filesToSnapshot));
final ActionListener<Collection<Void>> allFilesUploadedListener = ActionListener.assertOnce(ActionListener.wrap(ignore -> {
snapshotStatus.updateStatusDescription("all files uploaded: finalizing");
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize();

// now create and write the commit point
Expand All @@ -3435,6 +3444,7 @@ private void doSnapshotShard(SnapshotShardContext context) {
BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID,
Boolean.toString(writeFileInfoWriterUUID)
);
snapshotStatus.updateStatusDescription("all files uploaded: writing to index shard file");
INDEX_SHARD_SNAPSHOT_FORMAT.write(
blobStoreIndexShardSnapshot,
shardContainer,
Expand All @@ -3451,10 +3461,12 @@ private void doSnapshotShard(SnapshotShardContext context) {
ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles())
);
snapshotStatus.updateStatusDescription("all files uploaded: done");
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
context.onResponse(shardSnapshotResult);
}, e -> {
try {
snapshotStatus.updateStatusDescription("all files uploaded: cleaning up data files, exception while finalizing: " + e);
shardContainer.deleteBlobsIgnoringIfNotExists(
OperationPurpose.SNAPSHOT_DATA,
Iterators.flatMap(fileToCleanUp.get().iterator(), f -> Iterators.forRange(0, f.numberOfParts(), f::partName))
Expand Down Expand Up @@ -3484,12 +3496,10 @@ private static void ensureNotAborted(ShardId shardId, SnapshotId snapshotId, Ind
// A normally running shard snapshot should be in stage INIT or STARTED. And we know it's not in PAUSING or ABORTED because
// the ensureNotAborted() call above did not throw. The remaining options don't make sense, if they ever happen.
logger.error(
() -> Strings.format(
"Shard snapshot found an unexpected state. ShardId [{}], SnapshotID [{}], Stage [{}]",
shardId,
snapshotId,
shardSnapshotStage
)
"Shard snapshot found an unexpected state. ShardId [{}], SnapshotID [{}], Stage [{}]",
shardId,
snapshotId,
shardSnapshotStage
);
assert false;
}
Expand Down Expand Up @@ -3519,6 +3529,7 @@ protected void snapshotFiles(
) {
final int noOfFilesToSnapshot = filesToSnapshot.size();
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, noOfFilesToSnapshot, allFilesUploadedListener);
context.status().updateStatusDescription("enqueued file snapshot tasks: threads running concurrent file uploads");
for (int i = 0; i < noOfFilesToSnapshot; i++) {
shardSnapshotTaskRunner.enqueueFileSnapshot(context, filesToSnapshot::poll, filesListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
/**
* Encapsulate multiple handlers for the same path, allowing different handlers for different HTTP verbs and versions.
*/
final class MethodHandlers {
public final class MethodHandlers {

private final String path;
private final Map<RestRequest.Method, Map<RestApiVersion, RestHandler>> methodHandlers;

@SuppressWarnings("unused") // only accessed via #STATS_TRACKER_HANDLE, lazy initialized because instances consume non-trivial heap
private volatile HttpRouteStatsTracker statsTracker;
private HttpRouteStatsTracker statsTracker;

private static final VarHandle STATS_TRACKER_HANDLE;

Expand Down
Loading

0 comments on commit b6b7ab4

Please sign in to comment.