From 56379a314377f08c4fc1b365e4052de04d6cc8d6 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 10 Dec 2024 21:01:18 +0100 Subject: [PATCH 1/6] Lazily initialize TransportActionStats in RequestHandlerRegistry (#117435) Same as #114107 but for the transport side. We are consuming more than 0.5M for these stat instances and a large fraction if not the majority will go unused in many use-cases, given how expensive stats are in terms of volatile operations the overhead of an added `getAcquire` is trivial in the absence of writes. Outside of reducing the prod footprint this is also kind of nice in x-pack internal cluster tests in particular where it saves quite a bit of heap. --- .../bootstrap/Elasticsearch.java | 8 +++- .../elasticsearch/rest/MethodHandlers.java | 4 +- .../transport/RequestHandlerRegistry.java | 39 +++++++++++++++++-- .../transport/TransportActionStats.java | 2 + 4 files changed, 47 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java b/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java index 04f93494a50f1..ae59f6578f03a 100644 --- a/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java +++ b/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java @@ -44,6 +44,8 @@ import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.plugins.PluginsLoader; +import org.elasticsearch.rest.MethodHandlers; +import org.elasticsearch.transport.RequestHandlerRegistry; import java.io.IOException; import java.io.InputStream; @@ -201,7 +203,11 @@ private static void initPhase2(Bootstrap bootstrap) throws IOException { SubscribableListener.class, RunOnce.class, // We eagerly initialize to work around log4j permissions & JDK-8309727 - VectorUtil.class + VectorUtil.class, + // RequestHandlerRegistry and MethodHandlers classes do nontrivial static initialization which should always succeed but load + // it now (before SM) to be sure + RequestHandlerRegistry.class, + MethodHandlers.class ); // load the plugin Java modules and layers now for use in entitlements diff --git a/server/src/main/java/org/elasticsearch/rest/MethodHandlers.java b/server/src/main/java/org/elasticsearch/rest/MethodHandlers.java index 2f53f48f9ae5b..bfb8ca018fca2 100644 --- a/server/src/main/java/org/elasticsearch/rest/MethodHandlers.java +++ b/server/src/main/java/org/elasticsearch/rest/MethodHandlers.java @@ -22,13 +22,13 @@ /** * Encapsulate multiple handlers for the same path, allowing different handlers for different HTTP verbs and versions. */ -final class MethodHandlers { +public final class MethodHandlers { private final String path; private final Map> methodHandlers; @SuppressWarnings("unused") // only accessed via #STATS_TRACKER_HANDLE, lazy initialized because instances consume non-trivial heap - private volatile HttpRouteStatsTracker statsTracker; + private HttpRouteStatsTracker statsTracker; private static final VarHandle STATS_TRACKER_HANDLE; diff --git a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java index bfe6377b495ab..84a8ee1b2ebbf 100644 --- a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java +++ b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java @@ -19,6 +19,8 @@ import org.elasticsearch.telemetry.tracing.Tracer; import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.concurrent.Executor; import static org.elasticsearch.core.Releasables.assertOnce; @@ -33,7 +35,19 @@ public class RequestHandlerRegistry implements private final TaskManager taskManager; private final Tracer tracer; private final Writeable.Reader requestReader; - private final TransportActionStatsTracker statsTracker = new TransportActionStatsTracker(); + @SuppressWarnings("unused") // only accessed via #STATS_TRACKER_HANDLE, lazy initialized because instances consume non-trivial heap + private TransportActionStatsTracker statsTracker; + + private static final VarHandle STATS_TRACKER_HANDLE; + + static { + try { + STATS_TRACKER_HANDLE = MethodHandles.lookup() + .findVarHandle(RequestHandlerRegistry.class, "statsTracker", TransportActionStatsTracker.class); + } catch (Exception e) { + throw new ExceptionInInitializerError(e); + } + } public RequestHandlerRegistry( String action, @@ -118,15 +132,34 @@ public static RequestHandlerRegistry replaceHand } public void addRequestStats(int messageSize) { - statsTracker.addRequestStats(messageSize); + statsTracker().addRequestStats(messageSize); } @Override public void addResponseStats(int messageSize) { - statsTracker.addResponseStats(messageSize); + statsTracker().addResponseStats(messageSize); } public TransportActionStats getStats() { + var statsTracker = existingStatsTracker(); + if (statsTracker == null) { + return TransportActionStats.EMPTY; + } return statsTracker.getStats(); } + + private TransportActionStatsTracker statsTracker() { + var tracker = existingStatsTracker(); + if (tracker == null) { + var newTracker = new TransportActionStatsTracker(); + if ((tracker = (TransportActionStatsTracker) STATS_TRACKER_HANDLE.compareAndExchange(this, null, newTracker)) == null) { + tracker = newTracker; + } + } + return tracker; + } + + private TransportActionStatsTracker existingStatsTracker() { + return (TransportActionStatsTracker) STATS_TRACKER_HANDLE.getAcquire(this); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionStats.java b/server/src/main/java/org/elasticsearch/transport/TransportActionStats.java index feed042a5934e..f35443cdc8d6d 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionStats.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionStats.java @@ -27,6 +27,8 @@ public record TransportActionStats( long[] responseSizeHistogram ) implements Writeable, ToXContentObject { + public static final TransportActionStats EMPTY = new TransportActionStats(0, 0, new long[0], 0, 0, new long[0]); + public TransportActionStats(StreamInput in) throws IOException { this(in.readVLong(), in.readVLong(), in.readVLongArray(), in.readVLong(), in.readVLong(), in.readVLongArray()); } From 537f4ce8718e6f238e5a519c18374718f8566748 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 10 Dec 2024 15:01:53 -0500 Subject: [PATCH 2/6] Fix log message format bugs (#118354) --- docs/changelog/118354.yaml | 5 +++++ .../elasticsearch/repositories/s3/S3BlobContainer.java | 2 +- .../upgrades/FullClusterRestartDownsampleIT.java | 2 +- .../java/org/elasticsearch/upgrades/DownsampleIT.java | 2 +- .../elasticsearch/index/seqno/RetentionLeaseIT.java | 2 +- .../repositories/blobstore/BlobStoreRepository.java | 10 ++++------ .../xpack/downsample/DownsampleShardIndexer.java | 2 +- .../org/elasticsearch/xpack/enrich/EnrichPlugin.java | 2 +- 8 files changed, 15 insertions(+), 12 deletions(-) create mode 100644 docs/changelog/118354.yaml diff --git a/docs/changelog/118354.yaml b/docs/changelog/118354.yaml new file mode 100644 index 0000000000000..e2d72db121276 --- /dev/null +++ b/docs/changelog/118354.yaml @@ -0,0 +1,5 @@ +pr: 118354 +summary: Fix log message format bugs +area: Ingest Node +type: bug +issues: [] diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index f527dcd42814c..bf693222a4b72 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -1025,7 +1025,7 @@ public void onResponse(Void unused) { // should be no other processes interacting with the repository. logger.warn( Strings.format( - "failed to clean up multipart upload [{}] of blob [{}][{}][{}]", + "failed to clean up multipart upload [%s] of blob [%s][%s][%s]", abortMultipartUploadRequest.getUploadId(), blobStore.getRepositoryMetadata().name(), abortMultipartUploadRequest.getBucketName(), diff --git a/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/FullClusterRestartDownsampleIT.java b/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/FullClusterRestartDownsampleIT.java index d98d53baf9015..f907870fc8254 100644 --- a/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/FullClusterRestartDownsampleIT.java +++ b/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/FullClusterRestartDownsampleIT.java @@ -263,7 +263,7 @@ private String getRollupIndexName() throws IOException { if (asMap.size() == 1) { return (String) asMap.keySet().toArray()[0]; } - logger.warn("--> No matching rollup name for path [%s]", endpoint); + logger.warn("--> No matching rollup name for path [{}]", endpoint); return null; } diff --git a/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/DownsampleIT.java b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/DownsampleIT.java index bca0c26ad2c32..b1212913b7fb0 100644 --- a/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/DownsampleIT.java +++ b/qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/DownsampleIT.java @@ -238,7 +238,7 @@ private String getRollupIndexName() throws IOException { if (asMap.size() == 1) { return (String) asMap.keySet().toArray()[0]; } - logger.warn("--> No matching rollup name for path [%s]", endpoint); + logger.warn("--> No matching rollup name for path [{}]", endpoint); return null; } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index 1c5d67d1fa40a..70689dc689673 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -336,7 +336,7 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception { .getShardOrNull(new ShardId(resolveIndex("index"), 0)); final int length = randomIntBetween(1, 8); final Map currentRetentionLeases = new LinkedHashMap<>(); - logger.info("adding retention [{}}] leases", length); + logger.info("adding retention [{}] leases", length); for (int i = 0; i < length; i++) { final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8)); final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index f1c3d82b74cab..cc22b60991703 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -3484,12 +3484,10 @@ private static void ensureNotAborted(ShardId shardId, SnapshotId snapshotId, Ind // A normally running shard snapshot should be in stage INIT or STARTED. And we know it's not in PAUSING or ABORTED because // the ensureNotAborted() call above did not throw. The remaining options don't make sense, if they ever happen. logger.error( - () -> Strings.format( - "Shard snapshot found an unexpected state. ShardId [{}], SnapshotID [{}], Stage [{}]", - shardId, - snapshotId, - shardSnapshotStage - ) + "Shard snapshot found an unexpected state. ShardId [{}], SnapshotID [{}], Stage [{}]", + shardId, + snapshotId, + shardSnapshotStage ); assert false; } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java index 81aea221ebb4f..50149ec2cbe58 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java @@ -386,7 +386,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException { if (logger.isTraceEnabled()) { logger.trace( - "Doc: [{}] - _tsid: [{}], @timestamp: [{}}] -> downsample bucket ts: [{}]", + "Doc: [{}] - _tsid: [{}], @timestamp: [{}] -> downsample bucket ts: [{}]", docId, DocValueFormat.TIME_SERIES_ID.format(tsidHash), timestampFormat.format(timestamp), diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index d46639d700420..ef455acb645d9 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -172,7 +172,7 @@ public EnrichPlugin(final Settings settings) { if (settings.hasValue(CACHE_SIZE_SETTING_NAME)) { throw new IllegalArgumentException( Strings.format( - "Both [{}] and [{}] are set, please use [{}]", + "Both [%s] and [%s] are set, please use [%s]", CACHE_SIZE_SETTING_NAME, CACHE_SIZE_SETTING_BWC_NAME, CACHE_SIZE_SETTING_NAME From 5406850abf7a83e8078001dc7d81081fa6e77bc8 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 10 Dec 2024 14:35:49 -0600 Subject: [PATCH 3/6] Disabling ReindexDataStreamTransportActionIT in release mode (#118377) The actions in the migrate plugin are currently behind a feature flag. This disables the integration test for them if the flag is not set. Closes #118376 --- .../migrate/action/ReindexDataStreamTransportActionIT.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java index 515250bb58a94..7f2243ed76849 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java @@ -40,6 +40,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -51,6 +52,7 @@ protected Collection> nodePlugins() { } public void testNonExistentDataStream() { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); String nonExistentDataStreamName = randomAlphaOfLength(50); ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest( ReindexDataStreamAction.Mode.UPGRADE, @@ -64,6 +66,7 @@ public void testNonExistentDataStream() { } public void testAlreadyUpToDateDataStream() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); String dataStreamName = randomAlphaOfLength(50).toLowerCase(Locale.ROOT); ReindexDataStreamRequest reindexDataStreamRequest = new ReindexDataStreamRequest( ReindexDataStreamAction.Mode.UPGRADE, From 87b0f72cf571491b7535c7a96024b8d7e55a0d01 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 10 Dec 2024 16:28:12 -0500 Subject: [PATCH 4/6] ESQL: Opt into extra data stream resolution (#118378) * ESQL: Opt into extra data stream resolution This opts ESQL's data node request into extra data stream resolution. * Update docs/changelog/118378.yaml --- docs/changelog/118378.yaml | 5 + .../xpack/esql/EsqlSecurityIT.java | 129 ++++++++++++++++++ .../src/javaRestTest/resources/roles.yml | 54 ++++++++ .../xpack/esql/plugin/DataNodeRequest.java | 5 + 4 files changed, 193 insertions(+) create mode 100644 docs/changelog/118378.yaml diff --git a/docs/changelog/118378.yaml b/docs/changelog/118378.yaml new file mode 100644 index 0000000000000..d6c388b671968 --- /dev/null +++ b/docs/changelog/118378.yaml @@ -0,0 +1,5 @@ +pr: 118378 +summary: Opt into extra data stream resolution +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java b/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java index f8f1fe872711d..9566aeb8f28dc 100644 --- a/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java +++ b/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java @@ -34,6 +34,7 @@ import java.util.Locale; import java.util.Map; +import static org.elasticsearch.test.ListMatcher.matchesList; import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.hamcrest.Matchers.containsString; @@ -56,6 +57,11 @@ public class EsqlSecurityIT extends ESRestTestCase { .user("metadata1_read2", "x-pack-test-password", "metadata1_read2", false) .user("alias_user1", "x-pack-test-password", "alias_user1", false) .user("alias_user2", "x-pack-test-password", "alias_user2", false) + .user("logs_foo_all", "x-pack-test-password", "logs_foo_all", false) + .user("logs_foo_16_only", "x-pack-test-password", "logs_foo_16_only", false) + .user("logs_foo_after_2021", "x-pack-test-password", "logs_foo_after_2021", false) + .user("logs_foo_after_2021_pattern", "x-pack-test-password", "logs_foo_after_2021_pattern", false) + .user("logs_foo_after_2021_alias", "x-pack-test-password", "logs_foo_after_2021_alias", false) .build(); @Override @@ -342,6 +348,14 @@ public void testDocumentLevelSecurity() throws Exception { assertThat(respMap.get("values"), equalTo(List.of(List.of(10.0)))); } + public void testDocumentLevelSecurityFromStar() throws Exception { + Response resp = runESQLCommand("user3", "from in*x | stats sum=sum(value)"); + assertOK(resp); + Map respMap = entityAsMap(resp); + assertThat(respMap.get("columns"), equalTo(List.of(Map.of("name", "sum", "type", "double")))); + assertThat(respMap.get("values"), equalTo(List.of(List.of(10.0)))); + } + public void testFieldLevelSecurityAllow() throws Exception { Response resp = runESQLCommand("fls_user", "FROM index* | SORT value | LIMIT 1"); assertOK(resp); @@ -545,6 +559,22 @@ private void removeEnrichPolicy() throws Exception { client().performRequest(new Request("DELETE", "_enrich/policy/songs")); } + public void testDataStream() throws IOException { + createDataStream(); + MapMatcher twoResults = matchesMap().extraOk().entry("values", matchesList().item(matchesList().item(2))); + MapMatcher oneResult = matchesMap().extraOk().entry("values", matchesList().item(matchesList().item(1))); + assertMap(entityAsMap(runESQLCommand("logs_foo_all", "FROM logs-foo | STATS COUNT(*)")), twoResults); + assertMap(entityAsMap(runESQLCommand("logs_foo_16_only", "FROM logs-foo | STATS COUNT(*)")), oneResult); + assertMap(entityAsMap(runESQLCommand("logs_foo_after_2021", "FROM logs-foo | STATS COUNT(*)")), oneResult); + assertMap(entityAsMap(runESQLCommand("logs_foo_after_2021_pattern", "FROM logs-foo | STATS COUNT(*)")), oneResult); + assertMap(entityAsMap(runESQLCommand("logs_foo_after_2021_alias", "FROM alias-foo | STATS COUNT(*)")), oneResult); + assertMap(entityAsMap(runESQLCommand("logs_foo_all", "FROM logs-* | STATS COUNT(*)")), twoResults); + assertMap(entityAsMap(runESQLCommand("logs_foo_16_only", "FROM logs-* | STATS COUNT(*)")), oneResult); + assertMap(entityAsMap(runESQLCommand("logs_foo_after_2021", "FROM logs-* | STATS COUNT(*)")), oneResult); + assertMap(entityAsMap(runESQLCommand("logs_foo_after_2021_pattern", "FROM logs-* | STATS COUNT(*)")), oneResult); + assertMap(entityAsMap(runESQLCommand("logs_foo_after_2021_alias", "FROM alias-* | STATS COUNT(*)")), oneResult); + } + protected Response runESQLCommand(String user, String command) throws IOException { if (command.toLowerCase(Locale.ROOT).contains("limit") == false) { // add a (high) limit to avoid warnings on default limit @@ -592,4 +622,103 @@ static Settings randomPragmas() { } return settings.build(); } + + private void createDataStream() throws IOException { + createDataStreamPolicy(); + createDataStreamComponentTemplate(); + createDataStreamIndexTemplate(); + createDataStreamDocuments(); + createDataStreamAlias(); + } + + private void createDataStreamPolicy() throws IOException { + Request request = new Request("PUT", "_ilm/policy/my-lifecycle-policy"); + request.setJsonEntity(""" + { + "policy": { + "phases": { + "hot": { + "actions": { + "rollover": { + "max_primary_shard_size": "50gb" + } + } + }, + "delete": { + "min_age": "735d", + "actions": { + "delete": {} + } + } + } + } + }"""); + client().performRequest(request); + } + + private void createDataStreamComponentTemplate() throws IOException { + Request request = new Request("PUT", "_component_template/my-template"); + request.setJsonEntity(""" + { + "template": { + "settings": { + "index.lifecycle.name": "my-lifecycle-policy" + }, + "mappings": { + "properties": { + "@timestamp": { + "type": "date", + "format": "date_optional_time||epoch_millis" + }, + "data_stream": { + "properties": { + "namespace": {"type": "keyword"} + } + } + } + } + } + }"""); + client().performRequest(request); + } + + private void createDataStreamIndexTemplate() throws IOException { + Request request = new Request("PUT", "_index_template/my-index-template"); + request.setJsonEntity(""" + { + "index_patterns": ["logs-*"], + "data_stream": {}, + "composed_of": ["my-template"], + "priority": 500 + }"""); + client().performRequest(request); + } + + private void createDataStreamDocuments() throws IOException { + Request request = new Request("POST", "logs-foo/_bulk"); + request.addParameter("refresh", ""); + request.setJsonEntity(""" + { "create" : {} } + { "@timestamp": "2099-05-06T16:21:15.000Z", "data_stream": {"namespace": "16"} } + { "create" : {} } + { "@timestamp": "2001-05-06T16:21:15.000Z", "data_stream": {"namespace": "17"} } + """); + assertMap(entityAsMap(client().performRequest(request)), matchesMap().extraOk().entry("errors", false)); + } + + private void createDataStreamAlias() throws IOException { + Request request = new Request("PUT", "_alias"); + request.setJsonEntity(""" + { + "actions": [ + { + "add": { + "index": "logs-foo", + "alias": "alias-foo" + } + } + ] + }"""); + assertMap(entityAsMap(client().performRequest(request)), matchesMap().extraOk().entry("errors", false)); + } } diff --git a/x-pack/plugin/esql/qa/security/src/javaRestTest/resources/roles.yml b/x-pack/plugin/esql/qa/security/src/javaRestTest/resources/roles.yml index 5c0164782d181..365a072edb74e 100644 --- a/x-pack/plugin/esql/qa/security/src/javaRestTest/resources/roles.yml +++ b/x-pack/plugin/esql/qa/security/src/javaRestTest/resources/roles.yml @@ -92,3 +92,57 @@ fls_user: privileges: [ 'read' ] field_security: grant: [ value ] + +logs_foo_all: + cluster: [] + indices: + - names: [ 'logs-foo' ] + privileges: [ 'read' ] + +logs_foo_16_only: + cluster: [] + indices: + - names: [ 'logs-foo' ] + privileges: [ 'read' ] + query: | + { + "term": { + "data_stream.namespace": "16" + } + } + +logs_foo_after_2021: + cluster: [] + indices: + - names: [ 'logs-foo' ] + privileges: [ 'read' ] + query: | + { + "range": { + "@timestamp": {"gte": "2021-01-01T00:00:00"} + } + } + +logs_foo_after_2021_pattern: + cluster: [] + indices: + - names: [ 'logs-*' ] + privileges: [ 'read' ] + query: | + { + "range": { + "@timestamp": {"gte": "2021-01-01T00:00:00"} + } + } + +logs_foo_after_2021_alias: + cluster: [] + indices: + - names: [ 'alias-foo' ] + privileges: [ 'read' ] + query: | + { + "range": { + "@timestamp": {"gte": "2021-01-01T00:00:00"} + } + } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java index 4c01d326ed7bc..6014e24e39c5f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java @@ -118,6 +118,11 @@ public IndicesRequest indices(String... indices) { return this; } + @Override + public boolean includeDataStreams() { + return true; + } + @Override public IndicesOptions indicesOptions() { return indicesOptions; From a27e5dba731bbcb2859228ba463bc6e16a9e74cb Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 10 Dec 2024 15:29:35 -0600 Subject: [PATCH 5/6] Disabling the migration reindex yaml rest tests unless feature is available (#118382) This disables the migration reindex yaml rest tests when the feature flag is not available, so that the build doesn't fail in release mode, like: ``` ./gradlew ":x-pack:plugin:yamlRestTest" --tests "org.elasticsearch.xpack.test.rest.XPackRestIT.test {p0=migrate/*}" -Dtests.seed=28C1E85204B9DCAE -Dbuild.snapshot=false -Dtests.jvm.argline="-Dbuild.snapshot=false" -Dlicense.key=x-pack/license-tools/src/test/resources/public.key -Dtests.locale=bn-BD -Dtests.timezone=America/Dawson -Druntime.java=23 ``` --- .../rest/RestMigrationReindexAction.java | 14 ++++++++++ .../rest-api-spec/test/migrate/10_reindex.yml | 28 +++++++++++++++++++ .../test/migrate/20_reindex_status.yml | 14 ++++++++++ 3 files changed, 56 insertions(+) diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestMigrationReindexAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestMigrationReindexAction.java index a7f630d68234d..19cb439495e9a 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestMigrationReindexAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestMigrationReindexAction.java @@ -20,11 +20,16 @@ import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse; import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG; public class RestMigrationReindexAction extends BaseRestHandler { + public static final String MIGRATION_REINDEX_CAPABILITY = "migration_reindex"; @Override public String getName() { @@ -49,6 +54,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli ); } + @Override + public Set supportedCapabilities() { + Set capabilities = new HashSet<>(); + if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) { + capabilities.add(MIGRATION_REINDEX_CAPABILITY); + } + return Collections.unmodifiableSet(capabilities); + } + static class ReindexDataStreamRestToXContentListener extends RestBuilderListener { ReindexDataStreamRestToXContentListener(RestChannel channel) { diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/10_reindex.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/10_reindex.yml index 01a41b3aa8c94..f50a7a65f53d3 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/10_reindex.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/10_reindex.yml @@ -6,6 +6,13 @@ setup: --- "Test Reindex With Unsupported Mode": + - requires: + reason: "migration reindex is behind a feature flag" + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_migration/reindex + capabilities: [migration_reindex] - do: catch: /illegal_argument_exception/ migrate.reindex: @@ -19,6 +26,13 @@ setup: --- "Test Reindex With Nonexistent Data Stream": + - requires: + reason: "migration reindex is behind a feature flag" + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_migration/reindex + capabilities: [migration_reindex] - do: catch: /resource_not_found_exception/ migrate.reindex: @@ -44,6 +58,13 @@ setup: --- "Test Reindex With Bad Data Stream Name": + - requires: + reason: "migration reindex is behind a feature flag" + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_migration/reindex + capabilities: [migration_reindex] - do: catch: /illegal_argument_exception/ migrate.reindex: @@ -57,6 +78,13 @@ setup: --- "Test Reindex With Existing Data Stream": + - requires: + reason: "migration reindex is behind a feature flag" + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_migration/reindex + capabilities: [migration_reindex] - do: indices.put_index_template: name: my-template1 diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml index 3fe133aeda70e..ae343a0b4db95 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml @@ -6,6 +6,13 @@ setup: --- "Test get reindex status with nonexistent task id": + - requires: + reason: "migration reindex is behind a feature flag" + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_migration/reindex + capabilities: [migration_reindex] - do: catch: /resource_not_found_exception/ migrate.get_reindex_status: @@ -13,6 +20,13 @@ setup: --- "Test Reindex With Existing Data Stream": + - requires: + reason: "migration reindex is behind a feature flag" + test_runner_features: [capabilities] + capabilities: + - method: POST + path: /_migration/reindex + capabilities: [migration_reindex] - do: indices.put_index_template: name: my-template1 From 47be542459ee024801fc2fd8bea5e8cc5dea2e0e Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 10 Dec 2024 16:33:57 -0500 Subject: [PATCH 6/6] Add a shard snapshot current status debug string (#118198) SnapshotShardsService will update the new debug string field in IndexShardSnapshotStatus as a shard snapshot operation proceeds so that the current state can be logged. The SnapshotShutdownProgressTracker will iterate through the SnapshotShardsService's list of shard snapshots and log their current status. We want to know where in the code a shard snapshot operation potentially gets stuck. This new field should be updated as frequently as is reasonable to inform on the shard snapshot's progress. Closes ES-10261 --- .../snapshots/SnapshotShutdownIT.java | 17 ++++ .../snapshots/IndexShardSnapshotStatus.java | 37 ++++++-- .../blobstore/BlobStoreRepository.java | 13 +++ .../snapshots/SnapshotShardsService.java | 87 ++++++++++++++++--- .../SnapshotShutdownProgressTracker.java | 18 +++- .../SnapshotShutdownProgressTrackerTests.java | 19 +++- 6 files changed, 166 insertions(+), 25 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java index e5e641bfdda21..755ee960be73e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShutdownIT.java @@ -524,6 +524,15 @@ public void testSnapshotShutdownProgressTracker() throws Exception { "Pause signals have been set for all shard snapshots on data node [" + nodeForRemovalId + "]" ) ); + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "SnapshotShutdownProgressTracker index shard snapshot status messages", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.INFO, + // Expect the shard snapshot to stall in data file upload, since we've blocked the data node file upload to the blob store. + "statusDescription='enqueued file snapshot tasks: threads running concurrent file uploads'" + ) + ); putShutdownForRemovalMetadata(nodeForRemoval, clusterService); @@ -583,6 +592,14 @@ public void testSnapshotShutdownProgressTracker() throws Exception { "Current active shard snapshot stats on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]" ) ); + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "SnapshotShutdownProgressTracker index shard snapshot messages", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.INFO, + "statusDescription='finished: master notification attempt complete'" + ) + ); // Release the master node to respond snapshotStatusUpdateLatch.countDown(); diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index d8bd460f6f819..6aa6a5e498789 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -98,6 +98,7 @@ public enum AbortStatus { private long processedSize; private String failure; private final SubscribableListener abortListeners = new SubscribableListener<>(); + private volatile String statusDescription; private IndexShardSnapshotStatus( final Stage stage, @@ -110,7 +111,8 @@ private IndexShardSnapshotStatus( final long totalSize, final long processedSize, final String failure, - final ShardGeneration generation + final ShardGeneration generation, + final String statusDescription ) { this.stage = new AtomicReference<>(Objects.requireNonNull(stage)); this.generation = new AtomicReference<>(generation); @@ -124,6 +126,7 @@ private IndexShardSnapshotStatus( this.processedSize = processedSize; this.incrementalSize = incrementalSize; this.failure = failure; + updateStatusDescription(statusDescription); } public synchronized Copy moveToStarted( @@ -272,6 +275,15 @@ public synchronized void addProcessedFiles(int count, long totalSize) { processedSize += totalSize; } + /** + * Updates the string explanation for what the snapshot is actively doing right now. + */ + public void updateStatusDescription(String statusString) { + assert statusString != null; + assert statusString.isEmpty() == false; + this.statusDescription = statusString; + } + /** * Returns a copy of the current {@link IndexShardSnapshotStatus}. This method is * intended to be used when a coherent state of {@link IndexShardSnapshotStatus} is needed. @@ -289,12 +301,13 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() { incrementalSize, totalSize, processedSize, - failure + failure, + statusDescription ); } public static IndexShardSnapshotStatus newInitializing(ShardGeneration generation) { - return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation); + return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation, "initializing"); } public static IndexShardSnapshotStatus.Copy newFailed(final String failure) { @@ -302,7 +315,7 @@ public static IndexShardSnapshotStatus.Copy newFailed(final String failure) { if (failure == null) { throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus"); } - return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null).asCopy(); + return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null, "initialized as failed").asCopy(); } public static IndexShardSnapshotStatus.Copy newDone( @@ -326,7 +339,8 @@ public static IndexShardSnapshotStatus.Copy newDone( size, incrementalSize, null, - generation + generation, + "initialized as done" ).asCopy(); } @@ -345,6 +359,7 @@ public static class Copy { private final long processedSize; private final long incrementalSize; private final String failure; + private final String statusDescription; public Copy( final Stage stage, @@ -356,7 +371,8 @@ public Copy( final long incrementalSize, final long totalSize, final long processedSize, - final String failure + final String failure, + final String statusDescription ) { this.stage = stage; this.startTime = startTime; @@ -368,6 +384,7 @@ public Copy( this.processedSize = processedSize; this.incrementalSize = incrementalSize; this.failure = failure; + this.statusDescription = statusDescription; } public Stage getStage() { @@ -410,6 +427,10 @@ public String getFailure() { return failure; } + public String getStatusDescription() { + return statusDescription; + } + @Override public String toString() { return "index shard snapshot status (" @@ -433,6 +454,8 @@ public String toString() { + processedSize + ", failure='" + failure + + "', statusDescription='" + + statusDescription + '\'' + ')'; } @@ -461,6 +484,8 @@ public String toString() { + processedSize + ", failure='" + failure + + "', statusDescription='" + + statusDescription + '\'' + ')'; } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index cc22b60991703..11386eba10196 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -3186,6 +3186,7 @@ private void writeAtomic( @Override public void snapshotShard(SnapshotShardContext context) { + context.status().updateStatusDescription("queued in snapshot task runner"); shardSnapshotTaskRunner.enqueueShardSnapshot(context); } @@ -3198,6 +3199,7 @@ private void doSnapshotShard(SnapshotShardContext context) { final ShardId shardId = store.shardId(); final SnapshotId snapshotId = context.snapshotId(); final IndexShardSnapshotStatus snapshotStatus = context.status(); + snapshotStatus.updateStatusDescription("snapshot task runner: setting up shard snapshot"); final long startTime = threadPool.absoluteTimeInMillis(); try { final ShardGeneration generation = snapshotStatus.generation(); @@ -3206,6 +3208,7 @@ private void doSnapshotShard(SnapshotShardContext context) { final Set blobs; if (generation == null) { snapshotStatus.ensureNotAborted(); + snapshotStatus.updateStatusDescription("snapshot task runner: listing blob prefixes"); try { blobs = shardContainer.listBlobsByPrefix(OperationPurpose.SNAPSHOT_METADATA, SNAPSHOT_INDEX_PREFIX).keySet(); } catch (IOException e) { @@ -3216,6 +3219,7 @@ private void doSnapshotShard(SnapshotShardContext context) { } snapshotStatus.ensureNotAborted(); + snapshotStatus.updateStatusDescription("snapshot task runner: loading snapshot blobs"); Tuple tuple = buildBlobStoreIndexShardSnapshots( context.indexId(), shardId.id(), @@ -3316,6 +3320,7 @@ private void doSnapshotShard(SnapshotShardContext context) { indexCommitPointFiles = filesFromSegmentInfos; } + snapshotStatus.updateStatusDescription("snapshot task runner: starting shard snapshot"); snapshotStatus.moveToStarted( startTime, indexIncrementalFileCount, @@ -3342,6 +3347,7 @@ private void doSnapshotShard(SnapshotShardContext context) { BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID, Boolean.toString(writeFileInfoWriterUUID) ); + snapshotStatus.updateStatusDescription("snapshot task runner: updating blob store with new shard generation"); INDEX_SHARD_SNAPSHOTS_FORMAT.write( updatedBlobStoreIndexShardSnapshots, shardContainer, @@ -3387,6 +3393,7 @@ private void doSnapshotShard(SnapshotShardContext context) { BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID, Boolean.toString(writeFileInfoWriterUUID) ); + snapshotStatus.updateStatusDescription("no shard generations: writing new index-${N} file"); writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots, serializationParams); } catch (IOException e) { throw new IndexShardSnapshotFailedException( @@ -3401,6 +3408,7 @@ private void doSnapshotShard(SnapshotShardContext context) { } snapshotStatus.addProcessedFiles(finalFilesInShardMetadataCount, finalFilesInShardMetadataSize); try { + snapshotStatus.updateStatusDescription("no shard generations: deleting blobs"); deleteFromContainer(OperationPurpose.SNAPSHOT_METADATA, shardContainer, blobsToDelete.iterator()); } catch (IOException e) { logger.warn( @@ -3414,6 +3422,7 @@ private void doSnapshotShard(SnapshotShardContext context) { // filesToSnapshot will be emptied while snapshotting the file. We make a copy here for cleanup purpose in case of failure. final AtomicReference> fileToCleanUp = new AtomicReference<>(List.copyOf(filesToSnapshot)); final ActionListener> allFilesUploadedListener = ActionListener.assertOnce(ActionListener.wrap(ignore -> { + snapshotStatus.updateStatusDescription("all files uploaded: finalizing"); final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(); // now create and write the commit point @@ -3435,6 +3444,7 @@ private void doSnapshotShard(SnapshotShardContext context) { BlobStoreIndexShardSnapshot.FileInfo.SERIALIZE_WRITER_UUID, Boolean.toString(writeFileInfoWriterUUID) ); + snapshotStatus.updateStatusDescription("all files uploaded: writing to index shard file"); INDEX_SHARD_SNAPSHOT_FORMAT.write( blobStoreIndexShardSnapshot, shardContainer, @@ -3451,10 +3461,12 @@ private void doSnapshotShard(SnapshotShardContext context) { ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()), getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles()) ); + snapshotStatus.updateStatusDescription("all files uploaded: done"); snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult); context.onResponse(shardSnapshotResult); }, e -> { try { + snapshotStatus.updateStatusDescription("all files uploaded: cleaning up data files, exception while finalizing: " + e); shardContainer.deleteBlobsIgnoringIfNotExists( OperationPurpose.SNAPSHOT_DATA, Iterators.flatMap(fileToCleanUp.get().iterator(), f -> Iterators.forRange(0, f.numberOfParts(), f::partName)) @@ -3517,6 +3529,7 @@ protected void snapshotFiles( ) { final int noOfFilesToSnapshot = filesToSnapshot.size(); final ActionListener filesListener = fileQueueListener(filesToSnapshot, noOfFilesToSnapshot, allFilesUploadedListener); + context.status().updateStatusDescription("enqueued file snapshot tasks: threads running concurrent file uploads"); for (int i = 0; i < noOfFilesToSnapshot; i++) { shardSnapshotTaskRunner.enqueueFileSnapshot(context, filesToSnapshot::poll, filesListener); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 234c0239a68ce..90111c44fbd96 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -61,6 +61,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import static java.util.Collections.emptyMap; import static org.elasticsearch.core.Strings.format; @@ -108,6 +109,7 @@ public SnapshotShardsService( this.threadPool = transportService.getThreadPool(); this.snapshotShutdownProgressTracker = new SnapshotShutdownProgressTracker( () -> clusterService.state().nodes().getLocalNodeId(), + (callerLogger) -> logIndexShardSnapshotStatuses(callerLogger), clusterService.getClusterSettings(), threadPool ); @@ -234,6 +236,14 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } + private void logIndexShardSnapshotStatuses(Logger callerLogger) { + for (var snapshotStatuses : shardSnapshots.values()) { + for (var shardSnapshot : snapshotStatuses.entrySet()) { + callerLogger.info(Strings.format("ShardId %s, %s", shardSnapshot.getKey(), shardSnapshot.getValue())); + } + } + } + /** * Returns status of shards that are snapshotted on the node and belong to the given snapshot *

@@ -321,7 +331,8 @@ private void handleUpdatedSnapshotsInProgressEntry(String localNodeId, boolean r sid, ShardState.FAILED, shard.getValue().reason(), - shard.getValue().generation() + shard.getValue().generation(), + () -> null ); } } else { @@ -372,6 +383,7 @@ private void startNewShardSnapshots(String localNodeId, SnapshotsInProgress.Entr + snapshotStatus.generation() + "] for snapshot with old-format compatibility"; shardSnapshotTasks.add(newShardSnapshotTask(shardId, snapshot, indexId, snapshotStatus, entry.version(), entry.startTime())); + snapshotStatus.updateStatusDescription("shard snapshot scheduled to start"); } threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> shardSnapshotTasks.forEach(Runnable::run)); @@ -383,6 +395,7 @@ private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInPr for (final Map.Entry shardEntry : entry.shards().entrySet()) { final ShardId shardId = shardEntry.getKey(); final ShardSnapshotStatus masterShardSnapshotStatus = shardEntry.getValue(); + IndexShardSnapshotStatus indexShardSnapshotStatus = localShardSnapshots.get(shardId); if (masterShardSnapshotStatus.state() != ShardState.INIT) { // shard snapshot not currently scheduled by master @@ -402,7 +415,11 @@ private void pauseShardSnapshotsForNodeRemoval(String localNodeId, SnapshotsInPr shardId, ShardState.PAUSED_FOR_NODE_REMOVAL, "paused", - masterShardSnapshotStatus.generation() + masterShardSnapshotStatus.generation(), + () -> { + indexShardSnapshotStatus.updateStatusDescription("finished: master notification attempt complete"); + return null; + } ); } else { // shard snapshot currently running, mark for pause @@ -419,9 +436,16 @@ private Runnable newShardSnapshotTask( final IndexVersion entryVersion, final long entryStartTime ) { + Supplier postMasterNotificationAction = () -> { + snapshotStatus.updateStatusDescription("finished: master notification attempt complete"); + return null; + }; + + // Listener that runs on completion of the shard snapshot: it will notify the master node of success or failure. ActionListener snapshotResultListener = new ActionListener<>() { @Override public void onResponse(ShardSnapshotResult shardSnapshotResult) { + snapshotStatus.updateStatusDescription("snapshot succeeded: proceeding to notify master of success"); final ShardGeneration newGeneration = shardSnapshotResult.getGeneration(); assert newGeneration != null; assert newGeneration.equals(snapshotStatus.generation()); @@ -436,11 +460,13 @@ public void onResponse(ShardSnapshotResult shardSnapshotResult) { snapshotStatus.generation() ); } - notifySuccessfulSnapshotShard(snapshot, shardId, shardSnapshotResult); + + notifySuccessfulSnapshotShard(snapshot, shardId, shardSnapshotResult, postMasterNotificationAction); } @Override public void onFailure(Exception e) { + snapshotStatus.updateStatusDescription("failed with exception '" + e + ": proceeding to notify master of failure"); final String failure; final Stage nextStage; if (e instanceof AbortedSnapshotException) { @@ -457,7 +483,14 @@ public void onFailure(Exception e) { logger.warn(() -> format("[%s][%s] failed to snapshot shard", shardId, snapshot), e); } final var shardState = snapshotStatus.moveToUnsuccessful(nextStage, failure, threadPool.absoluteTimeInMillis()); - notifyUnsuccessfulSnapshotShard(snapshot, shardId, shardState, failure, snapshotStatus.generation()); + notifyUnsuccessfulSnapshotShard( + snapshot, + shardId, + shardState, + failure, + snapshotStatus.generation(), + postMasterNotificationAction + ); } }; @@ -508,6 +541,7 @@ private void snapshot( ActionListener resultListener ) { ActionListener.run(resultListener, listener -> { + snapshotStatus.updateStatusDescription("has started"); snapshotStatus.ensureNotAborted(); final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id()); if (indexShard.routingEntry().primary() == false) { @@ -527,7 +561,9 @@ private void snapshot( final Repository repository = repositoriesService.repository(snapshot.getRepository()); SnapshotIndexCommit snapshotIndexCommit = null; try { + snapshotStatus.updateStatusDescription("acquiring commit reference from IndexShard: triggers a shard flush"); snapshotIndexCommit = new SnapshotIndexCommit(indexShard.acquireIndexCommitForSnapshot()); + snapshotStatus.updateStatusDescription("commit reference acquired, proceeding with snapshot"); final var shardStateId = getShardStateId(indexShard, snapshotIndexCommit.indexCommit()); // not aborted so indexCommit() ok snapshotStatus.addAbortListener(makeAbortListener(indexShard.shardId(), snapshot, snapshotIndexCommit)); snapshotStatus.ensureNotAborted(); @@ -652,8 +688,12 @@ private void syncShardStatsOnNewMaster(List entries) snapshot.snapshot(), shardId ); - notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localShard.getValue().getShardSnapshotResult()); - + notifySuccessfulSnapshotShard( + snapshot.snapshot(), + shardId, + localShard.getValue().getShardSnapshotResult(), + () -> null + ); } else if (stage == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed logger.debug( @@ -667,7 +707,8 @@ private void syncShardStatsOnNewMaster(List entries) shardId, ShardState.FAILED, indexShardSnapshotStatus.getFailure(), - localShard.getValue().generation() + localShard.getValue().generation(), + () -> null ); } else if (stage == Stage.PAUSED) { // but we think the shard has paused - we need to make new master know that @@ -680,7 +721,8 @@ private void syncShardStatsOnNewMaster(List entries) shardId, ShardState.PAUSED_FOR_NODE_REMOVAL, indexShardSnapshotStatus.getFailure(), - localShard.getValue().generation() + localShard.getValue().generation(), + () -> null ); } } @@ -693,10 +735,20 @@ private void syncShardStatsOnNewMaster(List entries) /** * Notify the master node that the given shard snapshot completed successfully. */ - private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, ShardSnapshotResult shardSnapshotResult) { + private void notifySuccessfulSnapshotShard( + final Snapshot snapshot, + final ShardId shardId, + ShardSnapshotResult shardSnapshotResult, + Supplier postMasterNotificationAction + ) { assert shardSnapshotResult != null; assert shardSnapshotResult.getGeneration() != null; - sendSnapshotShardUpdate(snapshot, shardId, ShardSnapshotStatus.success(clusterService.localNode().getId(), shardSnapshotResult)); + sendSnapshotShardUpdate( + snapshot, + shardId, + ShardSnapshotStatus.success(clusterService.localNode().getId(), shardSnapshotResult), + postMasterNotificationAction + ); } /** @@ -707,13 +759,15 @@ private void notifyUnsuccessfulSnapshotShard( final ShardId shardId, final ShardState shardState, final String failure, - final ShardGeneration generation + final ShardGeneration generation, + Supplier postMasterNotificationAction ) { assert shardState == ShardState.FAILED || shardState == ShardState.PAUSED_FOR_NODE_REMOVAL : shardState; sendSnapshotShardUpdate( snapshot, shardId, - new ShardSnapshotStatus(clusterService.localNode().getId(), shardState, generation, failure) + new ShardSnapshotStatus(clusterService.localNode().getId(), shardState, generation, failure), + postMasterNotificationAction ); if (shardState == ShardState.PAUSED_FOR_NODE_REMOVAL) { logger.debug( @@ -726,7 +780,12 @@ private void notifyUnsuccessfulSnapshotShard( } /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ - private void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) { + private void sendSnapshotShardUpdate( + final Snapshot snapshot, + final ShardId shardId, + final ShardSnapshotStatus status, + Supplier postMasterNotificationAction + ) { ActionListener updateResultListener = new ActionListener<>() { @Override public void onResponse(Void aVoid) { @@ -738,9 +797,11 @@ public void onFailure(Exception e) { logger.warn(() -> format("[%s][%s] failed to update snapshot state to [%s]", shardId, snapshot, status), e); } }; + snapshotShutdownProgressTracker.trackRequestSentToMaster(snapshot, shardId); var releaseTrackerRequestRunsBeforeResultListener = ActionListener.runBefore(updateResultListener, () -> { snapshotShutdownProgressTracker.releaseRequestSentToMaster(snapshot, shardId); + postMasterNotificationAction.get(); }); remoteFailedRequestDeduplicator.executeOnce( diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java index 5d81e3c4e46af..45f2fb96fce4e 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTracker.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -45,6 +46,7 @@ public class SnapshotShutdownProgressTracker { private static final Logger logger = LogManager.getLogger(SnapshotShutdownProgressTracker.class); private final Supplier getLocalNodeId; + private final Consumer logIndexShardSnapshotStatuses; private final ThreadPool threadPool; private volatile TimeValue progressLoggerInterval; @@ -83,8 +85,14 @@ public class SnapshotShutdownProgressTracker { private final AtomicLong abortedCount = new AtomicLong(); private final AtomicLong pausedCount = new AtomicLong(); - public SnapshotShutdownProgressTracker(Supplier localNodeIdSupplier, ClusterSettings clusterSettings, ThreadPool threadPool) { + public SnapshotShutdownProgressTracker( + Supplier localNodeIdSupplier, + Consumer logShardStatuses, + ClusterSettings clusterSettings, + ThreadPool threadPool + ) { this.getLocalNodeId = localNodeIdSupplier; + this.logIndexShardSnapshotStatuses = logShardStatuses; clusterSettings.initializeAndWatch( SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING, value -> this.progressLoggerInterval = value @@ -122,14 +130,14 @@ private void cancelProgressLogger() { } /** - * Logs some statistics about shard snapshot progress. + * Logs information about shard snapshot progress. */ private void logProgressReport() { logger.info( """ Current active shard snapshot stats on data node [{}]. \ - Node shutdown cluster state update received at [{}]. \ - Finished signalling shard snapshots to pause at [{}]. \ + Node shutdown cluster state update received at [{} millis]. \ + Finished signalling shard snapshots to pause at [{} millis]. \ Number shard snapshots running [{}]. \ Number shard snapshots waiting for master node reply to status update request [{}] \ Shard snapshot completion stats since shutdown began: Done [{}]; Failed [{}]; Aborted [{}]; Paused [{}]\ @@ -144,6 +152,8 @@ private void logProgressReport() { abortedCount.get(), pausedCount.get() ); + // Use a callback to log the shard snapshot details. + logIndexShardSnapshotStatuses.accept(logger); } /** diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java index fbf742ae2ea57..8adcb3eb9d5f4 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotShutdownProgressTrackerTests.java @@ -110,8 +110,10 @@ void simulateShardSnapshotsCompleting(SnapshotShutdownProgressTracker tracker, i } public void testTrackerLogsStats() { + final String dummyStatusMsg = "Dummy log message for index shard snapshot statuses"; SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> callerLogger.info(dummyStatusMsg), clusterSettings, testThreadPool ); @@ -144,6 +146,14 @@ public void testTrackerLogsStats() { "*Shard snapshot completion stats since shutdown began: Done [2]; Failed [1]; Aborted [1]; Paused [1]*" ) ); + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "index shard snapshot statuses", + SnapshotShutdownProgressTracker.class.getCanonicalName(), + Level.INFO, + dummyStatusMsg + ) + ); // Simulate updating the shard snapshot completion stats. simulateShardSnapshotsCompleting(tracker, 5); @@ -171,6 +181,7 @@ public void testTrackerProgressLoggingIntervalSettingCanBeDisabled() { ); SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> {}, clusterSettingsDisabledLogging, testThreadPool ); @@ -214,6 +225,7 @@ public void testTrackerIntervalSettingDynamically() { ); SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> {}, clusterSettingsDisabledLogging, testThreadPool ); @@ -253,6 +265,7 @@ public void testTrackerIntervalSettingDynamically() { public void testTrackerPauseTimestamp() { SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> {}, clusterSettings, testThreadPool ); @@ -263,7 +276,7 @@ public void testTrackerPauseTimestamp() { "pausing timestamp should be set", SnapshotShutdownProgressTracker.class.getName(), Level.INFO, - "*Finished signalling shard snapshots to pause at [" + testThreadPool.relativeTimeInMillis() + "]*" + "*Finished signalling shard snapshots to pause at [" + testThreadPool.relativeTimeInMillis() + " millis]*" ) ); @@ -283,6 +296,7 @@ public void testTrackerPauseTimestamp() { public void testTrackerRequestsToMaster() { SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> {}, clusterSettings, testThreadPool ); @@ -335,6 +349,7 @@ public void testTrackerRequestsToMaster() { public void testTrackerClearShutdown() { SnapshotShutdownProgressTracker tracker = new SnapshotShutdownProgressTracker( getLocalNodeIdSupplier, + (callerLogger) -> {}, clusterSettings, testThreadPool ); @@ -345,7 +360,7 @@ public void testTrackerClearShutdown() { "pausing timestamp should be unset", SnapshotShutdownProgressTracker.class.getName(), Level.INFO, - "*Finished signalling shard snapshots to pause at [-1]*" + "*Finished signalling shard snapshots to pause at [-1 millis]*" ) );