From c9d8a3a2b6bff2130634ec1e572cdf2ccea203e4 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 13 Dec 2024 13:47:51 -0500 Subject: [PATCH 1/7] Add replica handling to the ILM MountSnapshotStep (#118687) --- .../xpack/core/ilm/AsyncWaitStep.java | 3 + .../xpack/core/ilm/DeleteAction.java | 6 +- .../xpack/core/ilm/DownsampleAction.java | 3 +- .../xpack/core/ilm/ForceMergeAction.java | 3 +- .../xpack/core/ilm/MountSnapshotStep.java | 29 ++-- .../xpack/core/ilm/ReadOnlyAction.java | 3 +- .../core/ilm/SearchableSnapshotAction.java | 7 +- .../xpack/core/ilm/ShrinkAction.java | 3 +- .../core/ilm/TimeseriesLifecycleType.java | 13 +- .../WaitUntilTimeSeriesEndTimePassesStep.java | 5 +- .../core/ilm/MountSnapshotStepTests.java | 161 ++++++++++++------ .../ilm/SearchableSnapshotActionTests.java | 95 ++++------- ...UntilTimeSeriesEndTimePassesStepTests.java | 9 +- 13 files changed, 182 insertions(+), 158 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java index 6a72af5bce5e9..fc5e8d473b763 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/AsyncWaitStep.java @@ -8,6 +8,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.xcontent.ToXContentObject; @@ -20,6 +21,7 @@ */ public abstract class AsyncWaitStep extends Step { + @Nullable private final Client client; public AsyncWaitStep(StepKey key, StepKey nextStepKey, Client client) { @@ -27,6 +29,7 @@ public AsyncWaitStep(StepKey key, StepKey nextStepKey, Client client) { this.client = client; } + @Nullable protected Client getClient() { return client; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteAction.java index 8712cefac5d31..6c2ab86995a6d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DeleteAction.java @@ -93,8 +93,7 @@ public List toSteps(Client client, String phase, Step.StepKey nextStepKey) WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep( waitTimeSeriesEndTimePassesKey, cleanSnapshotKey, - Instant::now, - client + Instant::now ); CleanupSnapshotStep cleanupSnapshotStep = new CleanupSnapshotStep(cleanSnapshotKey, deleteStepKey, client); DeleteStep deleteStep = new DeleteStep(deleteStepKey, nextStepKey, client); @@ -108,8 +107,7 @@ public List toSteps(Client client, String phase, Step.StepKey nextStepKey) WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep( waitTimeSeriesEndTimePassesKey, deleteStepKey, - Instant::now, - client + Instant::now ); DeleteStep deleteStep = new DeleteStep(deleteStepKey, nextStepKey, client); return List.of(waitForNoFollowersStep, waitUntilTimeSeriesEndTimeStep, deleteStep); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleAction.java index 697f948e47832..6ce9e05e4a464 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DownsampleAction.java @@ -200,8 +200,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep( waitTimeSeriesEndTimePassesKey, readOnlyKey, - Instant::now, - client + Instant::now ); // Mark source index as read-only ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, generateDownsampleIndexNameKey, client); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeAction.java index f8f4ce2bb0354..ac398bccb64e4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ForceMergeAction.java @@ -162,8 +162,7 @@ public List toSteps(Client client, String phase, Step.StepKey nextStepKey) WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep( waitTimeSeriesEndTimePassesKey, codecChange ? closeKey : forceMergeKey, - Instant::now, - client + Instant::now ); // Indices already in this step key when upgrading need to know how to move forward but stop making the index diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java index 7d045f2950e1b..82d41b91fea4f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStep.java @@ -41,6 +41,7 @@ public class MountSnapshotStep extends AsyncRetryDuringSnapshotActionStep { private final MountSearchableSnapshotRequest.Storage storageType; @Nullable private final Integer totalShardsPerNode; + private final int replicas; public MountSnapshotStep( StepKey key, @@ -48,7 +49,8 @@ public MountSnapshotStep( Client client, String restoredIndexPrefix, MountSearchableSnapshotRequest.Storage storageType, - @Nullable Integer totalShardsPerNode + @Nullable Integer totalShardsPerNode, + int replicas ) { super(key, nextStepKey, client); this.restoredIndexPrefix = restoredIndexPrefix; @@ -57,16 +59,10 @@ public MountSnapshotStep( throw new IllegalArgumentException("[" + SearchableSnapshotAction.TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1"); } this.totalShardsPerNode = totalShardsPerNode; - } - public MountSnapshotStep( - StepKey key, - StepKey nextStepKey, - Client client, - String restoredIndexPrefix, - MountSearchableSnapshotRequest.Storage storageType - ) { - this(key, nextStepKey, client, restoredIndexPrefix, storageType, null); + // this isn't directly settable by the user, so validation by assertion is sufficient + assert replicas >= 0 : "number of replicas must be gte zero, but was [" + replicas + "]"; + this.replicas = replicas; } @Override @@ -87,6 +83,10 @@ public Integer getTotalShardsPerNode() { return totalShardsPerNode; } + public int getReplicas() { + return replicas; + } + @Override void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener listener) { String indexName = indexMetadata.getIndex().getName(); @@ -162,11 +162,13 @@ void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentCl } final Settings.Builder settingsBuilder = Settings.builder(); - overrideTierPreference(this.getKey().phase()).ifPresent(override -> settingsBuilder.put(DataTier.TIER_PREFERENCE, override)); if (totalShardsPerNode != null) { settingsBuilder.put(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), totalShardsPerNode); } + if (replicas > 0) { + settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicas); + } final MountSearchableSnapshotRequest mountSearchableSnapshotRequest = new MountSearchableSnapshotRequest( TimeValue.MAX_VALUE, @@ -245,7 +247,7 @@ String[] ignoredIndexSettings() { @Override public int hashCode() { - return Objects.hash(super.hashCode(), restoredIndexPrefix, storageType, totalShardsPerNode); + return Objects.hash(super.hashCode(), restoredIndexPrefix, storageType, totalShardsPerNode, replicas); } @Override @@ -260,6 +262,7 @@ public boolean equals(Object obj) { return super.equals(obj) && Objects.equals(restoredIndexPrefix, other.restoredIndexPrefix) && Objects.equals(storageType, other.storageType) - && Objects.equals(totalShardsPerNode, other.totalShardsPerNode); + && Objects.equals(totalShardsPerNode, other.totalShardsPerNode) + && Objects.equals(replicas, other.replicas); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ReadOnlyAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ReadOnlyAction.java index 2b03dc77eb5b6..b36156842acf5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ReadOnlyAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ReadOnlyAction.java @@ -67,8 +67,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey) { WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep( waitTimeSeriesEndTimePassesKey, readOnlyKey, - Instant::now, - client + Instant::now ); ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, nextStepKey, client); return List.of(checkNotWriteIndexStep, waitUntilTimeSeriesEndTimeStep, readOnlyStep); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java index f585575534b76..b746ee8ea7c07 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java @@ -113,6 +113,7 @@ public String getSnapshotRepository() { return snapshotRepository; } + @Nullable public Integer getTotalShardsPerNode() { return totalShardsPerNode; } @@ -230,8 +231,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep( waitTimeSeriesEndTimePassesKey, skipGeneratingSnapshotKey, - Instant::now, - client + Instant::now ); // When generating a snapshot, we either jump to the force merge step, or we skip the @@ -318,7 +318,8 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac client, getRestoredIndexPrefix(mountSnapshotKey), storageType, - totalShardsPerNode + totalShardsPerNode, + 0 ); WaitForIndexColorStep waitForGreenIndexHealthStep = new WaitForIndexColorStep( waitForGreenRestoredIndexKey, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkAction.java index 70ec5da1d8a2a..f7478518613e2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkAction.java @@ -231,8 +231,7 @@ public List toSteps(Client client, String phase, Step.StepKey nextStepKey) WaitUntilTimeSeriesEndTimePassesStep waitUntilTimeSeriesEndTimeStep = new WaitUntilTimeSeriesEndTimePassesStep( waitTimeSeriesEndTimePassesKey, readOnlyKey, - Instant::now, - client + Instant::now ); ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, checkTargetShardsCountKey, client); CheckTargetShardsCountStep checkTargetShardsCountStep = new CheckTargetShardsCountStep( diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java index 0fd280f440f39..10a4c7086a0cc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java @@ -27,7 +27,6 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Represents the lifecycle of an index from creation to deletion. A @@ -49,7 +48,7 @@ public class TimeseriesLifecycleType implements LifecycleType { static final String DELETE_PHASE = "delete"; public static final List ORDERED_VALID_PHASES = List.of(HOT_PHASE, WARM_PHASE, COLD_PHASE, FROZEN_PHASE, DELETE_PHASE); - public static final List ORDERED_VALID_HOT_ACTIONS = Stream.of( + public static final List ORDERED_VALID_HOT_ACTIONS = List.of( SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME, @@ -58,8 +57,8 @@ public class TimeseriesLifecycleType implements LifecycleType { ShrinkAction.NAME, ForceMergeAction.NAME, SearchableSnapshotAction.NAME - ).filter(Objects::nonNull).toList(); - public static final List ORDERED_VALID_WARM_ACTIONS = Stream.of( + ); + public static final List ORDERED_VALID_WARM_ACTIONS = List.of( SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME, @@ -68,8 +67,8 @@ public class TimeseriesLifecycleType implements LifecycleType { MigrateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME - ).filter(Objects::nonNull).toList(); - public static final List ORDERED_VALID_COLD_ACTIONS = Stream.of( + ); + public static final List ORDERED_VALID_COLD_ACTIONS = List.of( SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME, @@ -78,7 +77,7 @@ public class TimeseriesLifecycleType implements LifecycleType { AllocateAction.NAME, MigrateAction.NAME, FreezeAction.NAME - ).filter(Objects::nonNull).toList(); + ); public static final List ORDERED_VALID_FROZEN_ACTIONS = List.of(UnfollowAction.NAME, SearchableSnapshotAction.NAME); public static final List ORDERED_VALID_DELETE_ACTIONS = List.of(WaitForSnapshotAction.NAME, DeleteAction.NAME); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitUntilTimeSeriesEndTimePassesStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitUntilTimeSeriesEndTimePassesStep.java index 50a7d48672c8e..3e190a26dd961 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitUntilTimeSeriesEndTimePassesStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitUntilTimeSeriesEndTimePassesStep.java @@ -6,7 +6,6 @@ */ package org.elasticsearch.xpack.core.ilm; -import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.Strings; @@ -33,8 +32,8 @@ public class WaitUntilTimeSeriesEndTimePassesStep extends AsyncWaitStep { public static final String NAME = "check-ts-end-time-passed"; private final Supplier nowSupplier; - public WaitUntilTimeSeriesEndTimePassesStep(StepKey key, StepKey nextStepKey, Supplier nowSupplier, Client client) { - super(key, nextStepKey, client); + public WaitUntilTimeSeriesEndTimePassesStep(StepKey key, StepKey nextStepKey, Supplier nowSupplier) { + super(key, nextStepKey, null); this.nowSupplier = nowSupplier; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStepTests.java index 8ca7a00ab0948..7ccdb1a27326a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/MountSnapshotStepTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.test.client.NoOpClient; @@ -42,7 +43,7 @@ public MountSnapshotStep createRandomInstance() { String restoredIndexPrefix = randomAlphaOfLength(10); MountSearchableSnapshotRequest.Storage storage = randomStorageType(); Integer totalShardsPerNode = randomTotalShardsPerNode(true); - return new MountSnapshotStep(stepKey, nextStepKey, client, restoredIndexPrefix, storage, totalShardsPerNode); + return new MountSnapshotStep(stepKey, nextStepKey, client, restoredIndexPrefix, storage, totalShardsPerNode, 0); } public static MountSearchableSnapshotRequest.Storage randomStorageType() { @@ -61,7 +62,8 @@ protected MountSnapshotStep copyInstance(MountSnapshotStep instance) { instance.getClient(), instance.getRestoredIndexPrefix(), instance.getStorage(), - instance.getTotalShardsPerNode() + instance.getTotalShardsPerNode(), + instance.getReplicas() ); } @@ -72,7 +74,8 @@ public MountSnapshotStep mutateInstance(MountSnapshotStep instance) { String restoredIndexPrefix = instance.getRestoredIndexPrefix(); MountSearchableSnapshotRequest.Storage storage = instance.getStorage(); Integer totalShardsPerNode = instance.getTotalShardsPerNode(); - switch (between(0, 4)) { + int replicas = instance.getReplicas(); + switch (between(0, 5)) { case 0: key = new StepKey(key.phase(), key.action(), key.name() + randomAlphaOfLength(5)); break; @@ -94,10 +97,13 @@ public MountSnapshotStep mutateInstance(MountSnapshotStep instance) { case 4: totalShardsPerNode = totalShardsPerNode == null ? 1 : totalShardsPerNode + randomIntBetween(1, 100); break; + case 5: + replicas = replicas == 0 ? 1 : 0; // swap between 0 and 1 + break; default: throw new AssertionError("Illegal randomisation branch"); } - return new MountSnapshotStep(key, nextKey, instance.getClient(), restoredIndexPrefix, storage, totalShardsPerNode); + return new MountSnapshotStep(key, nextKey, instance.getClient(), restoredIndexPrefix, storage, totalShardsPerNode, replicas); } public void testCreateWithInvalidTotalShardsPerNode() throws Exception { @@ -111,7 +117,8 @@ public void testCreateWithInvalidTotalShardsPerNode() throws Exception { client, RESTORED_INDEX_PREFIX, randomStorageType(), - invalidTotalShardsPerNode + invalidTotalShardsPerNode, + 0 ) ); assertEquals("[total_shards_per_node] must be >= 1", exception.getMessage()); @@ -195,14 +202,18 @@ public void testPerformAction() throws Exception { indexName, RESTORED_INDEX_PREFIX, indexName, - new String[] { LifecycleSettings.LIFECYCLE_NAME } + new String[] { LifecycleSettings.LIFECYCLE_NAME }, + null, + 0 ); MountSnapshotStep step = new MountSnapshotStep( randomStepKey(), randomStepKey(), client, RESTORED_INDEX_PREFIX, - randomStorageType() + randomStorageType(), + null, + 0 ); performActionAndWait(step, indexMetadata, clusterState, null); } @@ -237,7 +248,9 @@ public void testResponseStatusHandling() throws Exception { randomStepKey(), clientPropagatingOKResponse, RESTORED_INDEX_PREFIX, - randomStorageType() + randomStorageType(), + null, + 0 ); performActionAndWait(step, indexMetadata, clusterState, null); } @@ -252,7 +265,9 @@ public void testResponseStatusHandling() throws Exception { randomStepKey(), clientPropagatingACCEPTEDResponse, RESTORED_INDEX_PREFIX, - randomStorageType() + randomStorageType(), + null, + 0 ); performActionAndWait(step, indexMetadata, clusterState, null); } @@ -289,47 +304,49 @@ public void testMountWithPartialAndRestoredPrefix() throws Exception { ); } - public void doTestMountWithoutSnapshotIndexNameInState(String prefix) throws Exception { - { - String indexNameSnippet = randomAlphaOfLength(10); - String indexName = prefix + indexNameSnippet; - String policyName = "test-ilm-policy"; - Map ilmCustom = new HashMap<>(); - String snapshotName = indexName + "-" + policyName; - ilmCustom.put("snapshot_name", snapshotName); - String repository = "repository"; - ilmCustom.put("snapshot_repository", repository); + private void doTestMountWithoutSnapshotIndexNameInState(String prefix) throws Exception { + String indexNameSnippet = randomAlphaOfLength(10); + String indexName = prefix + indexNameSnippet; + String policyName = "test-ilm-policy"; + Map ilmCustom = new HashMap<>(); + String snapshotName = indexName + "-" + policyName; + ilmCustom.put("snapshot_name", snapshotName); + String repository = "repository"; + ilmCustom.put("snapshot_repository", repository); - IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) - .settings(settings(IndexVersion.current()).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) - .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, ilmCustom) - .numberOfShards(randomIntBetween(1, 5)) - .numberOfReplicas(randomIntBetween(0, 5)); - IndexMetadata indexMetadata = indexMetadataBuilder.build(); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) + .settings(settings(IndexVersion.current()).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, ilmCustom) + .numberOfShards(randomIntBetween(1, 5)) + .numberOfReplicas(randomIntBetween(0, 5)); + IndexMetadata indexMetadata = indexMetadataBuilder.build(); - ClusterState clusterState = ClusterState.builder(emptyClusterState()) - .metadata(Metadata.builder().put(indexMetadata, true).build()) - .build(); + ClusterState clusterState = ClusterState.builder(emptyClusterState()) + .metadata(Metadata.builder().put(indexMetadata, true).build()) + .build(); - try (var threadPool = createThreadPool()) { - final var client = getRestoreSnapshotRequestAssertingClient( - threadPool, - repository, - snapshotName, - indexName, - RESTORED_INDEX_PREFIX, - indexNameSnippet, - new String[] { LifecycleSettings.LIFECYCLE_NAME } - ); - MountSnapshotStep step = new MountSnapshotStep( - randomStepKey(), - randomStepKey(), - client, - RESTORED_INDEX_PREFIX, - randomStorageType() - ); - performActionAndWait(step, indexMetadata, clusterState, null); - } + try (var threadPool = createThreadPool()) { + final var client = getRestoreSnapshotRequestAssertingClient( + threadPool, + repository, + snapshotName, + indexName, + RESTORED_INDEX_PREFIX, + indexNameSnippet, + new String[] { LifecycleSettings.LIFECYCLE_NAME }, + null, + 0 + ); + MountSnapshotStep step = new MountSnapshotStep( + randomStepKey(), + randomStepKey(), + client, + RESTORED_INDEX_PREFIX, + randomStorageType(), + null, + 0 + ); + performActionAndWait(step, indexMetadata, clusterState, null); } } @@ -361,7 +378,11 @@ public void testIgnoreTotalShardsPerNodeInFrozenPhase() throws Exception { indexName, RESTORED_INDEX_PREFIX, indexName, - new String[] { LifecycleSettings.LIFECYCLE_NAME, ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey() } + new String[] { + LifecycleSettings.LIFECYCLE_NAME, + ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey() }, + null, + 0 ); MountSnapshotStep step = new MountSnapshotStep( new StepKey(TimeseriesLifecycleType.FROZEN_PHASE, randomAlphaOfLength(10), randomAlphaOfLength(10)), @@ -369,13 +390,14 @@ public void testIgnoreTotalShardsPerNodeInFrozenPhase() throws Exception { client, RESTORED_INDEX_PREFIX, randomStorageType(), - null + null, + 0 ); performActionAndWait(step, indexMetadata, clusterState, null); } } - public void testDoNotIgnoreTotalShardsPerNodeIfSet() throws Exception { + public void testDoNotIgnoreTotalShardsPerNodeAndReplicasIfSet() throws Exception { String indexName = randomAlphaOfLength(10); String policyName = "test-ilm-policy"; Map ilmCustom = new HashMap<>(); @@ -395,6 +417,9 @@ public void testDoNotIgnoreTotalShardsPerNodeIfSet() throws Exception { .metadata(Metadata.builder().put(indexMetadata, true).build()) .build(); + final Integer totalShardsPerNode = randomTotalShardsPerNode(false); + final int replicas = randomIntBetween(1, 5); + try (var threadPool = createThreadPool()) { final var client = getRestoreSnapshotRequestAssertingClient( threadPool, @@ -403,7 +428,9 @@ public void testDoNotIgnoreTotalShardsPerNodeIfSet() throws Exception { indexName, RESTORED_INDEX_PREFIX, indexName, - new String[] { LifecycleSettings.LIFECYCLE_NAME } + new String[] { LifecycleSettings.LIFECYCLE_NAME }, + totalShardsPerNode, + replicas ); MountSnapshotStep step = new MountSnapshotStep( new StepKey(TimeseriesLifecycleType.FROZEN_PHASE, randomAlphaOfLength(10), randomAlphaOfLength(10)), @@ -411,7 +438,8 @@ public void testDoNotIgnoreTotalShardsPerNodeIfSet() throws Exception { client, RESTORED_INDEX_PREFIX, randomStorageType(), - randomTotalShardsPerNode(false) + totalShardsPerNode, + replicas ); performActionAndWait(step, indexMetadata, clusterState, null); } @@ -439,7 +467,9 @@ private NoOpClient getRestoreSnapshotRequestAssertingClient( String indexName, String restoredIndexPrefix, String expectedSnapshotIndexName, - String[] expectedIgnoredIndexSettings + String[] expectedIgnoredIndexSettings, + @Nullable Integer totalShardsPerNode, + int replicas ) { return new NoOpClient(threadPool) { @Override @@ -462,6 +492,31 @@ protected void assertThat(mountSearchableSnapshotRequest.mountedIndexName(), is(restoredIndexPrefix + indexName)); assertThat(mountSearchableSnapshotRequest.snapshotIndexName(), is(expectedSnapshotIndexName)); + if (totalShardsPerNode != null) { + Integer totalShardsPerNodeSettingValue = ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get( + mountSearchableSnapshotRequest.indexSettings() + ); + assertThat(totalShardsPerNodeSettingValue, is(totalShardsPerNode)); + } else { + assertThat( + mountSearchableSnapshotRequest.indexSettings() + .hasValue(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey()), + is(false) + ); + } + + if (replicas > 0) { + Integer numberOfReplicasSettingValue = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get( + mountSearchableSnapshotRequest.indexSettings() + ); + assertThat(numberOfReplicasSettingValue, is(replicas)); + } else { + assertThat( + mountSearchableSnapshotRequest.indexSettings().hasValue(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey()), + is(false) + ); + } + // invoke the awaiting listener with a very generic 'response', just to fulfill the contract listener.onResponse((Response) new RestoreSnapshotResponse((RestoreInfo) null)); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java index ca219fdde3d57..5304b7885f96c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotActionTests.java @@ -14,6 +14,8 @@ import java.io.IOException; import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; import static org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction.NAME; import static org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction.TOTAL_SHARDS_PER_NODE; @@ -29,40 +31,23 @@ public void testToSteps() { StepKey nextStepKey = new StepKey(phase, randomAlphaOfLengthBetween(1, 5), randomAlphaOfLengthBetween(1, 5)); List steps = action.toSteps(null, phase, nextStepKey, null); - assertThat(steps.size(), is(action.isForceMergeIndex() ? 19 : 17)); - - List expectedSteps = action.isForceMergeIndex() - ? expectedStepKeysWithForceMerge(phase) - : expectedStepKeysNoForceMerge(phase); - - assertThat(steps.get(0).getKey(), is(expectedSteps.get(0))); - assertThat(steps.get(1).getKey(), is(expectedSteps.get(1))); - assertThat(steps.get(2).getKey(), is(expectedSteps.get(2))); - assertThat(steps.get(3).getKey(), is(expectedSteps.get(3))); - assertThat(steps.get(4).getKey(), is(expectedSteps.get(4))); - assertThat(steps.get(5).getKey(), is(expectedSteps.get(5))); - assertThat(steps.get(6).getKey(), is(expectedSteps.get(6))); - assertThat(steps.get(7).getKey(), is(expectedSteps.get(7))); - assertThat(steps.get(8).getKey(), is(expectedSteps.get(8))); - assertThat(steps.get(9).getKey(), is(expectedSteps.get(9))); - assertThat(steps.get(10).getKey(), is(expectedSteps.get(10))); - assertThat(steps.get(11).getKey(), is(expectedSteps.get(11))); - assertThat(steps.get(12).getKey(), is(expectedSteps.get(12))); - assertThat(steps.get(13).getKey(), is(expectedSteps.get(13))); - assertThat(steps.get(14).getKey(), is(expectedSteps.get(14))); - assertThat(steps.get(15).getKey(), is(expectedSteps.get(15))); - - if (action.isForceMergeIndex()) { - assertThat(steps.get(16).getKey(), is(expectedSteps.get(16))); - assertThat(steps.get(17).getKey(), is(expectedSteps.get(17))); - CreateSnapshotStep createSnapshotStep = (CreateSnapshotStep) steps.get(9); - assertThat(createSnapshotStep.getNextKeyOnIncomplete(), is(expectedSteps.get(8))); - validateWaitForDataTierStep(phase, steps, 10, 11); - } else { - CreateSnapshotStep createSnapshotStep = (CreateSnapshotStep) steps.get(7); - assertThat(createSnapshotStep.getNextKeyOnIncomplete(), is(expectedSteps.get(6))); - validateWaitForDataTierStep(phase, steps, 8, 9); + + List expectedSteps = expectedStepKeys(phase, action.isForceMergeIndex()); + assertThat(steps.size(), is(expectedSteps.size())); + for (int i = 0; i < expectedSteps.size(); i++) { + assertThat("steps match expectation at index " + i, steps.get(i).getKey(), is(expectedSteps.get(i))); + } + + int index = -1; + for (int i = 0; i < expectedSteps.size(); i++) { + if (expectedSteps.get(i).name().equals(CreateSnapshotStep.NAME)) { + index = i; + break; + } } + CreateSnapshotStep createSnapshotStep = (CreateSnapshotStep) steps.get(index); + assertThat(createSnapshotStep.getNextKeyOnIncomplete(), is(expectedSteps.get(index - 1))); + validateWaitForDataTierStep(phase, steps, index + 1, index + 2); } private void validateWaitForDataTierStep(String phase, List steps, int waitForDataTierStepIndex, int mountStepIndex) { @@ -108,15 +93,15 @@ public void testCreateWithInvalidTotalShardsPerNode() { assertEquals("[" + TOTAL_SHARDS_PER_NODE.getPreferredName() + "] must be >= 1", exception.getMessage()); } - private List expectedStepKeysWithForceMerge(String phase) { - return List.of( + private List expectedStepKeys(String phase, boolean forceMergeIndex) { + return Stream.of( new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_ACTION_STEP), new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME), new StepKey(phase, NAME, WaitForNoFollowersStep.NAME), new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME), new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_GENERATE_AND_CLEAN), - new StepKey(phase, NAME, ForceMergeStep.NAME), - new StepKey(phase, NAME, SegmentCountStep.NAME), + forceMergeIndex ? new StepKey(phase, NAME, ForceMergeStep.NAME) : null, + forceMergeIndex ? new StepKey(phase, NAME, SegmentCountStep.NAME) : null, new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME), new StepKey(phase, NAME, CleanupSnapshotStep.NAME), new StepKey(phase, NAME, CreateSnapshotStep.NAME), @@ -129,29 +114,7 @@ private List expectedStepKeysWithForceMerge(String phase) { new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME), new StepKey(phase, NAME, DeleteStep.NAME), new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME) - ); - } - - private List expectedStepKeysNoForceMerge(String phase) { - return List.of( - new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_ACTION_STEP), - new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME), - new StepKey(phase, NAME, WaitForNoFollowersStep.NAME), - new StepKey(phase, NAME, WaitUntilTimeSeriesEndTimePassesStep.NAME), - new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_SKIP_GENERATE_AND_CLEAN), - new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME), - new StepKey(phase, NAME, CleanupSnapshotStep.NAME), - new StepKey(phase, NAME, CreateSnapshotStep.NAME), - new StepKey(phase, NAME, WaitForDataTierStep.NAME), - new StepKey(phase, NAME, MountSnapshotStep.NAME), - new StepKey(phase, NAME, WaitForIndexColorStep.NAME), - new StepKey(phase, NAME, CopyExecutionStateStep.NAME), - new StepKey(phase, NAME, CopySettingsStep.NAME), - new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY), - new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME), - new StepKey(phase, NAME, DeleteStep.NAME), - new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME) - ); + ).filter(Objects::nonNull).toList(); } @Override @@ -172,8 +135,16 @@ protected Writeable.Reader instanceReader() { @Override protected SearchableSnapshotAction mutateInstance(SearchableSnapshotAction instance) { return switch (randomIntBetween(0, 2)) { - case 0 -> new SearchableSnapshotAction(randomAlphaOfLengthBetween(5, 10), instance.isForceMergeIndex()); - case 1 -> new SearchableSnapshotAction(instance.getSnapshotRepository(), instance.isForceMergeIndex() == false); + case 0 -> new SearchableSnapshotAction( + randomAlphaOfLengthBetween(5, 10), + instance.isForceMergeIndex(), + instance.getTotalShardsPerNode() + ); + case 1 -> new SearchableSnapshotAction( + instance.getSnapshotRepository(), + instance.isForceMergeIndex() == false, + instance.getTotalShardsPerNode() + ); case 2 -> new SearchableSnapshotAction( instance.getSnapshotRepository(), instance.isForceMergeIndex(), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitUntilTimeSeriesEndTimePassesStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitUntilTimeSeriesEndTimePassesStepTests.java index 8ca6c0016a791..15bbbe7446429 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitUntilTimeSeriesEndTimePassesStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitUntilTimeSeriesEndTimePassesStepTests.java @@ -30,7 +30,7 @@ public class WaitUntilTimeSeriesEndTimePassesStepTests extends AbstractStepTestC protected WaitUntilTimeSeriesEndTimePassesStep createRandomInstance() { Step.StepKey stepKey = randomStepKey(); Step.StepKey nextStepKey = randomStepKey(); - return new WaitUntilTimeSeriesEndTimePassesStep(stepKey, nextStepKey, Instant::now, client); + return new WaitUntilTimeSeriesEndTimePassesStep(stepKey, nextStepKey, Instant::now); } @Override @@ -42,12 +42,12 @@ protected WaitUntilTimeSeriesEndTimePassesStep mutateInstance(WaitUntilTimeSerie case 0 -> key = new Step.StepKey(key.phase(), key.action(), key.name() + randomAlphaOfLength(5)); case 1 -> nextKey = new Step.StepKey(nextKey.phase(), nextKey.action(), nextKey.name() + randomAlphaOfLength(5)); } - return new WaitUntilTimeSeriesEndTimePassesStep(key, nextKey, Instant::now, client); + return new WaitUntilTimeSeriesEndTimePassesStep(key, nextKey, Instant::now); } @Override protected WaitUntilTimeSeriesEndTimePassesStep copyInstance(WaitUntilTimeSeriesEndTimePassesStep instance) { - return new WaitUntilTimeSeriesEndTimePassesStep(instance.getKey(), instance.getNextStepKey(), Instant::now, client); + return new WaitUntilTimeSeriesEndTimePassesStep(instance.getKey(), instance.getNextStepKey(), Instant::now); } public void testEvaluateCondition() { @@ -68,8 +68,7 @@ public void testEvaluateCondition() { WaitUntilTimeSeriesEndTimePassesStep step = new WaitUntilTimeSeriesEndTimePassesStep( randomStepKey(), randomStepKey(), - () -> currentTime, - client + () -> currentTime ); { // end_time has lapsed already so condition must be met From dcadb08b57bc65ac5f989ad60e18eac9fcce42c4 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 13 Dec 2024 13:37:07 -0600 Subject: [PATCH 2/7] Unmuting XPackRestTest migrate reindex tests (#118407) These were fixed by https://github.com/elastic/elasticsearch/pull/118382 Closes #118401 Closes #118272 Closes #118273 Closes #118274 --- muted-tests.yml | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index e0d011c1bf239..1255739e818be 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -294,18 +294,6 @@ tests: - class: org.elasticsearch.docker.test.DockerYmlTestSuiteIT method: test {p0=/11_nodes/Test cat nodes output} issue: https://github.com/elastic/elasticsearch/issues/118397 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=migrate/20_reindex_status/Test get reindex status with nonexistent task id} - issue: https://github.com/elastic/elasticsearch/issues/118401 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=migrate/10_reindex/Test Reindex With Nonexistent Data Stream} - issue: https://github.com/elastic/elasticsearch/issues/118274 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=migrate/10_reindex/Test Reindex With Bad Data Stream Name} - issue: https://github.com/elastic/elasticsearch/issues/118272 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=migrate/10_reindex/Test Reindex With Unsupported Mode} - issue: https://github.com/elastic/elasticsearch/issues/118273 - class: org.elasticsearch.xpack.security.operator.OperatorPrivilegesIT method: testEveryActionIsEitherOperatorOnlyOrNonOperator issue: https://github.com/elastic/elasticsearch/issues/118220 From 908cf9ab768c4854e51926f28b62fd2108c69aa7 Mon Sep 17 00:00:00 2001 From: Quentin Deschamps Date: Fri, 13 Dec 2024 20:54:48 +0100 Subject: [PATCH 3/7] Fix moving function linear weighted avg (#118516) Fix moving function linear weighted avg --- docs/changelog/118516.yaml | 6 ++++ modules/aggregations/build.gradle | 1 + .../test/aggregations/moving_fn.yml | 31 +++++++++++++------ .../action/search/SearchCapabilities.java | 3 ++ .../pipeline/MovingFunctions.java | 4 +-- .../MovFnWhitelistedFunctionTests.java | 2 +- 6 files changed, 34 insertions(+), 13 deletions(-) create mode 100644 docs/changelog/118516.yaml diff --git a/docs/changelog/118516.yaml b/docs/changelog/118516.yaml new file mode 100644 index 0000000000000..8a618a6d6cfd7 --- /dev/null +++ b/docs/changelog/118516.yaml @@ -0,0 +1,6 @@ +pr: 118435 +summary: Fix moving function linear weighted avg +area: Aggregations +type: bug +issues: + - 113751 diff --git a/modules/aggregations/build.gradle b/modules/aggregations/build.gradle index 94fdddf6d711a..3b5fb6ddecde9 100644 --- a/modules/aggregations/build.gradle +++ b/modules/aggregations/build.gradle @@ -48,4 +48,5 @@ tasks.named("yamlRestCompatTestTransform").configure({ task -> task.skipTest("aggregations/date_agg_per_day_of_week/Date aggregartion per day of week", "week-date behaviour has changed") task.skipTest("aggregations/time_series/Configure with no synthetic source", "temporary until backport") task.skipTest("aggregations/percentiles_hdr_metric/Negative values test", "returned exception has changed") + task.skipTest("aggregations/moving_fn/linearWeightedAvg", "math was wrong in previous versions") }) diff --git a/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/moving_fn.yml b/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/moving_fn.yml index cd6feb601b1df..3abad87d57907 100644 --- a/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/moving_fn.yml +++ b/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/moving_fn.yml @@ -255,6 +255,17 @@ linearWeightedAvg: - skip: features: close_to + - requires: + test_runner_features: [capabilities] + + - requires: + capabilities: + - method: POST + path: /_search + parameters: [method, path, parameters, capabilities] + capabilities: [moving_fn_right_math] + reason: "math not fixed yet" + - do: search: index: no_gaps @@ -275,11 +286,11 @@ linearWeightedAvg: - match: { hits.total.value: 6 } - length: { aggregations.@timestamp.buckets: 6 } - is_false: aggregations.@timestamp.buckets.0.d.value - - close_to: { aggregations.@timestamp.buckets.1.d.value: { value: 0.500, error: 0.0005 } } - - close_to: { aggregations.@timestamp.buckets.2.d.value: { value: 1.250, error: 0.0005 } } - - close_to: { aggregations.@timestamp.buckets.3.d.value: { value: 1.000, error: 0.0005 } } - - close_to: { aggregations.@timestamp.buckets.4.d.value: { value: 2.250, error: 0.0005 } } - - close_to: { aggregations.@timestamp.buckets.5.d.value: { value: 3.500, error: 0.0005 } } + - close_to: { aggregations.@timestamp.buckets.1.d.value: { value: 1.000, error: 0.0005 } } + - close_to: { aggregations.@timestamp.buckets.2.d.value: { value: 1.667, error: 0.0005 } } + - close_to: { aggregations.@timestamp.buckets.3.d.value: { value: 1.333, error: 0.0005 } } + - close_to: { aggregations.@timestamp.buckets.4.d.value: { value: 3.000, error: 0.0005 } } + - close_to: { aggregations.@timestamp.buckets.5.d.value: { value: 4.667, error: 0.0005 } } - do: search: @@ -301,11 +312,11 @@ linearWeightedAvg: - match: { hits.total.value: 6 } - length: { aggregations.@timestamp.buckets: 6 } - is_false: aggregations.@timestamp.buckets.0.d.value - - close_to: { aggregations.@timestamp.buckets.1.d.value: { value: 0.500, error: 0.0005 } } - - close_to: { aggregations.@timestamp.buckets.2.d.value: { value: 1.250, error: 0.0005 } } - - close_to: { aggregations.@timestamp.buckets.3.d.value: { value: 1.143, error: 0.0005 } } - - close_to: { aggregations.@timestamp.buckets.4.d.value: { value: 2.286, error: 0.0005 } } - - close_to: { aggregations.@timestamp.buckets.5.d.value: { value: 3.429, error: 0.0005 } } + - close_to: { aggregations.@timestamp.buckets.1.d.value: { value: 1.000, error: 0.0005 } } + - close_to: { aggregations.@timestamp.buckets.2.d.value: { value: 1.667, error: 0.0005 } } + - close_to: { aggregations.@timestamp.buckets.3.d.value: { value: 1.333, error: 0.0005 } } + - close_to: { aggregations.@timestamp.buckets.4.d.value: { value: 2.667, error: 0.0005 } } + - close_to: { aggregations.@timestamp.buckets.5.d.value: { value: 4.000, error: 0.0005 } } --- ewma: diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java b/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java index 86304c8c4bde2..06f8f8f3c1be6 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/SearchCapabilities.java @@ -42,6 +42,8 @@ private SearchCapabilities() {} private static final String RANK_VECTORS_SCRIPT_ACCESS = "rank_vectors_script_access"; /** Initial support for rank-vectors maxSim functions access. */ private static final String RANK_VECTORS_SCRIPT_MAX_SIM = "rank_vectors_script_max_sim_with_bugfix"; + /** Fixed the math in {@code moving_fn}'s {@code linearWeightedAvg}. */ + private static final String MOVING_FN_RIGHT_MATH = "moving_fn_right_math"; private static final String RANDOM_SAMPLER_WITH_SCORED_SUBAGGS = "random_sampler_with_scored_subaggs"; private static final String OPTIMIZED_SCALAR_QUANTIZATION_BBQ = "optimized_scalar_quantization_bbq"; @@ -59,6 +61,7 @@ private SearchCapabilities() {} capabilities.add(RANDOM_SAMPLER_WITH_SCORED_SUBAGGS); capabilities.add(OPTIMIZED_SCALAR_QUANTIZATION_BBQ); capabilities.add(KNN_QUANTIZED_VECTOR_RESCORE); + capabilities.add(MOVING_FN_RIGHT_MATH); if (RankVectorsFieldMapper.FEATURE_FLAG.isEnabled()) { capabilities.add(RANK_VECTORS_FIELD_MAPPER); capabilities.add(RANK_VECTORS_SCRIPT_ACCESS); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovingFunctions.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovingFunctions.java index 02e3c76e5e793..46584c171d16c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovingFunctions.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovingFunctions.java @@ -100,7 +100,7 @@ public static double stdDev(double[] values, double avg) { */ public static double linearWeightedAvg(double[] values) { double avg = 0; - long totalWeight = 1; + long totalWeight = 0; long current = 1; for (double v : values) { @@ -110,7 +110,7 @@ public static double linearWeightedAvg(double[] values) { current += 1; } } - return totalWeight == 1 ? Double.NaN : avg / totalWeight; + return totalWeight == 0 ? Double.NaN : avg / totalWeight; } /** diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnWhitelistedFunctionTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnWhitelistedFunctionTests.java index 69173957aebab..3bc458880db0a 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnWhitelistedFunctionTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnWhitelistedFunctionTests.java @@ -326,7 +326,7 @@ public void testLinearMovAvg() { } double avg = 0; - long totalWeight = 1; + long totalWeight = 0; long current = 1; for (double value : window) { From b4610c8e26f292f73545b659a0bcdba7022ad1d0 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 13 Dec 2024 21:28:11 +0100 Subject: [PATCH 4/7] Remove needless sending of OriginalIndices in SearchFreeContextRequest (#117245) We don't need to use this request, the handler for freeing of scroll requests literally goes to the same transport handler and doesn't come with the list of indices. The original security need for keeping the list of indices around is long gone. --- .../action/IndicesRequestIT.java | 13 +--- .../search/AbstractSearchAsyncAction.java | 6 +- .../action/search/DfsQueryPhase.java | 6 +- .../action/search/SearchPhase.java | 6 +- .../action/search/SearchTransportService.java | 70 ++----------------- .../AbstractSearchAsyncActionTests.java | 6 +- .../action/search/MockSearchPhaseContext.java | 2 +- .../action/search/SearchAsyncActionTests.java | 15 +++- .../test/ESSingleNodeTestCase.java | 4 +- .../xpack/security/authz/RBACEngine.java | 26 +++---- 10 files changed, 44 insertions(+), 110 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java index 8bedf436e3698..f5860cedcd989 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java @@ -556,11 +556,7 @@ public void testUpdateSettings() { } public void testSearchQueryThenFetch() throws Exception { - interceptTransportActions( - SearchTransportService.QUERY_ACTION_NAME, - SearchTransportService.FETCH_ID_ACTION_NAME, - SearchTransportService.FREE_CONTEXT_ACTION_NAME - ); + interceptTransportActions(SearchTransportService.QUERY_ACTION_NAME, SearchTransportService.FETCH_ID_ACTION_NAME); String[] randomIndicesOrAliases = randomIndicesOrAliases(); for (int i = 0; i < randomIndicesOrAliases.length; i++) { @@ -580,16 +576,13 @@ public void testSearchQueryThenFetch() throws Exception { SearchTransportService.QUERY_ACTION_NAME, SearchTransportService.FETCH_ID_ACTION_NAME ); - // free context messages are not necessarily sent, but if they are, check their indices - assertIndicesSubsetOptionalRequests(Arrays.asList(searchRequest.indices()), SearchTransportService.FREE_CONTEXT_ACTION_NAME); } public void testSearchDfsQueryThenFetch() throws Exception { interceptTransportActions( SearchTransportService.DFS_ACTION_NAME, SearchTransportService.QUERY_ID_ACTION_NAME, - SearchTransportService.FETCH_ID_ACTION_NAME, - SearchTransportService.FREE_CONTEXT_ACTION_NAME + SearchTransportService.FETCH_ID_ACTION_NAME ); String[] randomIndicesOrAliases = randomIndicesOrAliases(); @@ -611,8 +604,6 @@ public void testSearchDfsQueryThenFetch() throws Exception { SearchTransportService.QUERY_ID_ACTION_NAME, SearchTransportService.FETCH_ID_ACTION_NAME ); - // free context messages are not necessarily sent, but if they are, check their indices - assertIndicesSubsetOptionalRequests(Arrays.asList(searchRequest.indices()), SearchTransportService.FREE_CONTEXT_ACTION_NAME); } private static void assertSameIndices(IndicesRequest originalRequest, String... actions) { diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 800193e258dba..47abfe266c524 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -711,7 +711,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) { try { SearchShardTarget searchShardTarget = entry.getSearchShardTarget(); Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); - sendReleaseSearchContext(entry.getContextId(), connection, getOriginalIndices(entry.getShardIndex())); + sendReleaseSearchContext(entry.getContextId(), connection); } catch (Exception inner) { inner.addSuppressed(exception); logger.trace("failed to release context", inner); @@ -727,10 +727,10 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) { * @see org.elasticsearch.search.fetch.FetchSearchResult#getContextId() * */ - void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) { + void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection) { assert isPartOfPointInTime(contextId) == false : "Must not release point in time context [" + contextId + "]"; if (connection != null) { - searchTransportService.sendFreeContext(connection, contextId, originalIndices); + searchTransportService.sendFreeContext(connection, contextId, ActionListener.noop()); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index 285dd0a22fd7e..cc8c4becea9a9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -119,11 +119,7 @@ public void onFailure(Exception exception) { // the query might not have been executed at all (for example because thread pool rejected // execution) and the search context that was created in dfs phase might not be released. // release it again to be in the safe side - context.sendReleaseSearchContext( - querySearchRequest.contextId(), - connection, - context.getOriginalIndices(shardIndex) - ); + context.sendReleaseSearchContext(querySearchRequest.contextId(), connection); } } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java index 986f7210c0d1b..7d849a72abf9d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhase.java @@ -97,11 +97,7 @@ protected static void releaseIrrelevantSearchContext(SearchPhaseResult searchPha context.getLogger().trace("trying to release search context [{}]", phaseResult.getContextId()); SearchShardTarget shardTarget = phaseResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId()); - context.sendReleaseSearchContext( - phaseResult.getContextId(), - connection, - context.getOriginalIndices(phaseResult.getShardIndex()) - ); + context.sendReleaseSearchContext(phaseResult.getContextId(), connection); } catch (Exception e) { context.getLogger().trace("failed to release context", e); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 8444a92b24432..cfc2e1bcdaf2b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -13,12 +13,10 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; -import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction; import org.elasticsearch.action.support.ChannelActionListener; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -124,24 +122,6 @@ public SearchTransportService( this.responseWrapper = responseWrapper; } - private static final ActionListenerResponseHandler SEND_FREE_CONTEXT_LISTENER = - new ActionListenerResponseHandler<>( - ActionListener.noop(), - SearchFreeContextResponse::readFrom, - TransportResponseHandler.TRANSPORT_WORKER - ); - - public void sendFreeContext(Transport.Connection connection, final ShardSearchContextId contextId, OriginalIndices originalIndices) { - transportService.sendRequest( - connection, - FREE_CONTEXT_ACTION_NAME, - new SearchFreeContextRequest(originalIndices, contextId), - TransportRequestOptions.EMPTY, - // no need to respond if it was freed or not - SEND_FREE_CONTEXT_LISTENER - ); - } - public void sendFreeContext( Transport.Connection connection, ShardSearchContextId contextId, @@ -370,43 +350,6 @@ private static class ClearScrollContextsRequest extends TransportRequest { } } - static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest { - private final OriginalIndices originalIndices; - - SearchFreeContextRequest(OriginalIndices originalIndices, ShardSearchContextId id) { - super(id); - this.originalIndices = originalIndices; - } - - SearchFreeContextRequest(StreamInput in) throws IOException { - super(in); - originalIndices = OriginalIndices.readOriginalIndices(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - OriginalIndices.writeOriginalIndices(originalIndices, out); - } - - @Override - public String[] indices() { - if (originalIndices == null) { - return null; - } - return originalIndices.indices(); - } - - @Override - public IndicesOptions indicesOptions() { - if (originalIndices == null) { - return null; - } - return originalIndices.indicesOptions(); - } - - } - public static class SearchFreeContextResponse extends TransportResponse { private static final SearchFreeContextResponse FREED = new SearchFreeContextResponse(true); @@ -456,12 +399,13 @@ public static void registerRequestHandler(TransportService transportService, Sea SearchFreeContextResponse::readFrom ); - transportService.registerRequestHandler( - FREE_CONTEXT_ACTION_NAME, - freeContextExecutor, - SearchFreeContextRequest::new, - freeContextHandler - ); + // TODO: remove this handler once the lowest compatible version stops using it + transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, freeContextExecutor, in -> { + var res = new ScrollFreeContextRequest(in); + // this handler exists for BwC purposes only, we don't need the original indices to free the context + OriginalIndices.readOriginalIndices(in); + return res; + }, freeContextHandler); TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom); transportService.registerRequestHandler( diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 725a4583d104a..71bf2a47cfa47 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -112,11 +112,7 @@ long buildTookInMillis() { } @Override - public void sendReleaseSearchContext( - ShardSearchContextId contextId, - Transport.Connection connection, - OriginalIndices originalIndices - ) { + public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection) { releasedContexts.add(contextId); } diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 7a38858d8477a..cf65d756811ad 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -155,7 +155,7 @@ protected void executePhaseOnShard( } @Override - public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) { + public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection) { releasedSearchContexts.add(contextId); } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index b4ddd48172d01..2361beb7ad036 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -296,7 +296,11 @@ public void testFanOutAndCollect() throws InterruptedException { AtomicInteger numFreedContext = new AtomicInteger(); SearchTransportService transportService = new SearchTransportService(null, null, null) { @Override - public void sendFreeContext(Transport.Connection connection, ShardSearchContextId contextId, OriginalIndices originalIndices) { + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { numFreedContext.incrementAndGet(); assertTrue(nodeToContextMap.containsKey(connection.getNode())); assertTrue(nodeToContextMap.get(connection.getNode()).remove(contextId)); @@ -363,7 +367,7 @@ public void run() { for (int i = 0; i < results.getNumShards(); i++) { TestSearchPhaseResult result = results.getAtomicArray().get(i); assertEquals(result.node.getId(), result.getSearchShardTarget().getNodeId()); - sendReleaseSearchContext(result.getContextId(), new MockConnection(result.node), OriginalIndices.NONE); + sendReleaseSearchContext(result.getContextId(), new MockConnection(result.node)); } responseListener.onResponse(testResponse); if (latchTriggered.compareAndSet(false, true) == false) { @@ -421,8 +425,13 @@ public void testFanOutAndFail() throws InterruptedException { ); AtomicInteger numFreedContext = new AtomicInteger(); SearchTransportService transportService = new SearchTransportService(null, null, null) { + @Override - public void sendFreeContext(Transport.Connection connection, ShardSearchContextId contextId, OriginalIndices originalIndices) { + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { assertNotNull(contextId); numFreedContext.incrementAndGet(); assertTrue(nodeToContextMap.containsKey(connection.getNode())); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index 459d5573d7c12..63334bd70306f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -78,7 +78,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.elasticsearch.action.search.SearchTransportService.FREE_CONTEXT_ACTION_NAME; +import static org.elasticsearch.action.search.SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME; import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING; import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING; import static org.elasticsearch.test.NodeRoles.dataNode; @@ -482,7 +482,7 @@ protected void ensureNoInitializingShards() { */ protected void ensureAllFreeContextActionsAreConsumed() throws Exception { logger.info("--> waiting for all free_context tasks to complete within a reasonable time"); - safeGet(clusterAdmin().prepareListTasks().setActions(FREE_CONTEXT_ACTION_NAME + "*").setWaitForCompletion(true).execute()); + safeGet(clusterAdmin().prepareListTasks().setActions(FREE_CONTEXT_SCROLL_ACTION_NAME + "*").setWaitForCompletion(true).execute()); } /** diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java index fa6187798da25..2353d710059ff 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java @@ -129,6 +129,19 @@ public class RBACEngine implements AuthorizationEngine { private static final String DELETE_SUB_REQUEST_REPLICA = TransportDeleteAction.NAME + "[r]"; private static final Logger logger = LogManager.getLogger(RBACEngine.class); + + private static final Set SCROLL_RELATED_ACTIONS = Set.of( + TransportSearchScrollAction.TYPE.name(), + SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME, + SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME, + SearchTransportService.QUERY_SCROLL_ACTION_NAME, + SearchTransportService.FREE_CONTEXT_ACTION_NAME, + SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME, + TransportClearScrollAction.NAME, + "indices:data/read/sql/close_cursor", + SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME + ); + private final Settings settings; private final CompositeRolesStore rolesStore; private final FieldPermissionsCache fieldPermissionsCache; @@ -320,7 +333,7 @@ public void authorizeIndexAction( // need to validate that the action is allowed and then move on listener.onResponse(role.checkIndicesAction(action) ? IndexAuthorizationResult.EMPTY : IndexAuthorizationResult.DENIED); } else if (request instanceof IndicesRequest == false) { - if (isScrollRelatedAction(action)) { + if (SCROLL_RELATED_ACTIONS.contains(action)) { // scroll is special // some APIs are indices requests that are not actually associated with indices. For example, // search scroll request, is categorized under the indices context, but doesn't hold indices names @@ -1000,17 +1013,6 @@ public int hashCode() { } } - private static boolean isScrollRelatedAction(String action) { - return action.equals(TransportSearchScrollAction.TYPE.name()) - || action.equals(SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME) - || action.equals(SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME) - || action.equals(SearchTransportService.QUERY_SCROLL_ACTION_NAME) - || action.equals(SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME) - || action.equals(TransportClearScrollAction.NAME) - || action.equals("indices:data/read/sql/close_cursor") - || action.equals(SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME); - } - private static boolean isAsyncRelatedAction(String action) { return action.equals(SubmitAsyncSearchAction.NAME) || action.equals(GetAsyncSearchAction.NAME) From 0a6ce27825d15a7e8294a99ee1aca52eb05f8ae0 Mon Sep 17 00:00:00 2001 From: Parker Timmins Date: Fri, 13 Dec 2024 15:12:25 -0600 Subject: [PATCH 5/7] Add ReindexDatastreamIndexAction (#116996) Add an action to reindex a single index from a source index to a destination index. Unlike the reindex action, this action copies settings and mappings from the source index to the dest index before performing the reindex. This action is part of work to reindex data streams and will be called on each of the backing indices within a data stream. --- docs/changelog/116996.yaml | 5 + .../metadata/MetadataCreateIndexService.java | 48 ++- .../action/ReindexDatastreamIndexIT.java | 408 ++++++++++++++++++ .../xpack/migrate/MigratePlugin.java | 3 + .../action/ReindexDataStreamAction.java | 2 +- .../action/ReindexDataStreamIndexAction.java | 121 ++++++ ...ReindexDataStreamIndexTransportAction.java | 211 +++++++++ .../ReindexDatastreamIndexRequestTests.java | 29 ++ .../ReindexDatastreamIndexResponseTests.java | 29 ++ .../xpack/security/operator/Constants.java | 1 + 10 files changed, 841 insertions(+), 16 deletions(-) create mode 100644 docs/changelog/116996.yaml create mode 100644 x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexIT.java create mode 100644 x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java create mode 100644 x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java create mode 100644 x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexRequestTests.java create mode 100644 x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexResponseTests.java diff --git a/docs/changelog/116996.yaml b/docs/changelog/116996.yaml new file mode 100644 index 0000000000000..59f59355131bf --- /dev/null +++ b/docs/changelog/116996.yaml @@ -0,0 +1,5 @@ +pr: 116996 +summary: Initial work on `ReindexDatastreamIndexAction` +area: Data streams +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 75c2c06f36c8e..a99822c9de00c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -1675,23 +1675,11 @@ static void prepareResizeIndexSettings( throw new IllegalStateException("unknown resize type is " + type); } - final Settings.Builder builder = Settings.builder(); + final Settings.Builder builder; if (copySettings) { - // copy all settings and non-copyable settings and settings that have already been set (e.g., from the request) - for (final String key : sourceMetadata.getSettings().keySet()) { - final Setting setting = indexScopedSettings.get(key); - if (setting == null) { - assert indexScopedSettings.isPrivateSetting(key) : key; - } else if (setting.getProperties().contains(Setting.Property.NotCopyableOnResize)) { - continue; - } - // do not override settings that have already been set (for example, from the request) - if (indexSettingsBuilder.keys().contains(key)) { - continue; - } - builder.copy(key, sourceMetadata.getSettings()); - } + builder = copySettingsFromSource(true, sourceMetadata.getSettings(), indexScopedSettings, indexSettingsBuilder); } else { + builder = Settings.builder(); final Predicate sourceSettingsPredicate = (s) -> (s.startsWith("index.similarity.") || s.startsWith("index.analysis.") || s.startsWith("index.sort.") @@ -1709,6 +1697,36 @@ static void prepareResizeIndexSettings( } } + public static Settings.Builder copySettingsFromSource( + boolean copyPrivateSettings, + Settings sourceSettings, + IndexScopedSettings indexScopedSettings, + Settings.Builder indexSettingsBuilder + ) { + final Settings.Builder builder = Settings.builder(); + for (final String key : sourceSettings.keySet()) { + final Setting setting = indexScopedSettings.get(key); + if (setting == null) { + assert indexScopedSettings.isPrivateSetting(key) : key; + if (copyPrivateSettings == false) { + continue; + } + } else if (setting.getProperties().contains(Setting.Property.NotCopyableOnResize)) { + continue; + } else if (setting.isPrivateIndex()) { + if (copyPrivateSettings == false) { + continue; + } + } + // do not override settings that have already been set (for example, from the request) + if (indexSettingsBuilder.keys().contains(key)) { + continue; + } + builder.copy(key, sourceSettings); + } + return builder; + } + /** * Returns a default number of routing shards based on the number of shards of the index. The default number of routing shards will * allow any index to be split at least once and at most 10 times by a factor of two. The closer the number or shards gets to 1024 diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexIT.java new file mode 100644 index 0000000000000..e492f035da866 --- /dev/null +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexIT.java @@ -0,0 +1,408 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.migrate.MigratePlugin; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG; +import static org.hamcrest.Matchers.equalTo; + +public class ReindexDatastreamIndexIT extends ESIntegTestCase { + + private static final String MAPPING = """ + { + "_doc":{ + "dynamic":"strict", + "properties":{ + "foo1":{ + "type":"text" + } + } + } + } + """; + + @Override + protected Collection> nodePlugins() { + return List.of(MigratePlugin.class, ReindexPlugin.class, MockTransportService.TestPlugin.class, DataStreamsPlugin.class); + } + + public void testDestIndexDeletedIfExists() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + // empty source index + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get(); + + // dest index with docs + var destIndex = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex); + indicesAdmin().create(new CreateIndexRequest(destIndex)).actionGet(); + indexDocs(destIndex, 10); + assertHitCount(prepareSearch(destIndex).setSize(0), 10); + + // call reindex + client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)).actionGet(); + + // verify that dest still exists, but is now empty + assertTrue(indexExists(destIndex)); + assertHitCount(prepareSearch(destIndex).setSize(0), 0); + } + + public void testDestIndexNameSet() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get(); + + // call reindex + var response = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet(); + + var expectedDestIndexName = ReindexDataStreamIndexTransportAction.generateDestIndexName(sourceIndex); + assertEquals(expectedDestIndexName, response.getDestIndex()); + } + + public void testDestIndexContainsDocs() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + // source index with docs + var numDocs = randomIntBetween(1, 100); + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get(); + indexDocs(sourceIndex, numDocs); + + // call reindex + var response = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet(); + indicesAdmin().refresh(new RefreshRequest(response.getDestIndex())).actionGet(); + + // verify that dest contains docs + assertHitCount(prepareSearch(response.getDestIndex()).setSize(0), numDocs); + } + + public void testSetSourceToReadOnly() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + // empty source index + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get(); + + // call reindex + client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)).actionGet(); + + // assert that write to source fails + var indexReq = new IndexRequest(sourceIndex).source(jsonBuilder().startObject().field("field", "1").endObject()); + assertThrows(ClusterBlockException.class, () -> client().index(indexReq).actionGet()); + assertHitCount(prepareSearch(sourceIndex).setSize(0), 0); + } + + public void testSettingsAddedBeforeReindex() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + // start with a static setting + var numShards = randomIntBetween(1, 10); + var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build(); + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex, staticSettings)).get(); + + // update with a dynamic setting + var numReplicas = randomIntBetween(0, 10); + var dynamicSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build(); + indicesAdmin().updateSettings(new UpdateSettingsRequest(dynamicSettings, sourceIndex)).actionGet(); + + // call reindex + var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet() + .getDestIndex(); + + // assert both static and dynamic settings set on dest index + var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet(); + assertEquals(numReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS))); + assertEquals(numShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS))); + } + + public void testMappingsAddedToDestIndex() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + String mapping = """ + { + "_doc":{ + "dynamic":"strict", + "properties":{ + "foo1":{ + "type":"text" + } + } + } + } + """; + indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(mapping)).actionGet(); + + // call reindex + var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet() + .getDestIndex(); + + var mappingsResponse = indicesAdmin().getMappings(new GetMappingsRequest().indices(sourceIndex, destIndex)).actionGet(); + Map mappings = mappingsResponse.mappings(); + var destMappings = mappings.get(destIndex).sourceAsMap(); + var sourceMappings = mappings.get(sourceIndex).sourceAsMap(); + + assertEquals(sourceMappings, destMappings); + // sanity check specific value from dest mapping + assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings)); + } + + public void testReadOnlyAddedBack() { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + // Create source index with read-only and/or block-writes + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + boolean isReadOnly = randomBoolean(); + boolean isBlockWrites = randomBoolean(); + var settings = Settings.builder() + .put(IndexMetadata.SETTING_READ_ONLY, isReadOnly) + .put(IndexMetadata.SETTING_BLOCKS_WRITE, isBlockWrites) + .build(); + indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)).actionGet(); + + // call reindex + var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet() + .getDestIndex(); + + // assert read-only settings added back to dest index + var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet(); + assertEquals(isReadOnly, Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY))); + assertEquals(isBlockWrites, Boolean.parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE))); + + removeReadOnly(sourceIndex); + removeReadOnly(destIndex); + } + + public void testSettingsAndMappingsFromTemplate() throws IOException { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var numShards = randomIntBetween(1, 10); + var numReplicas = randomIntBetween(0, 10); + + var settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas) + .build(); + + // Create template with settings and mappings + var template = ComposableIndexTemplate.builder() + .indexPatterns(List.of("logs-*")) + .template(new Template(settings, CompressedXContent.fromJSON(MAPPING), null)) + .build(); + var request = new TransportPutComposableIndexTemplateAction.Request("logs-template"); + request.indexTemplate(template); + client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet(); + + var sourceIndex = "logs-" + randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet(); + + // call reindex + var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet() + .getDestIndex(); + + // verify settings from templates copied to dest index + { + var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet(); + assertEquals(numReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS))); + assertEquals(numShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS))); + } + + // verify mappings from templates copied to dest index + { + var mappingsResponse = indicesAdmin().getMappings(new GetMappingsRequest().indices(sourceIndex, destIndex)).actionGet(); + var destMappings = mappingsResponse.mappings().get(destIndex).sourceAsMap(); + var sourceMappings = mappingsResponse.mappings().get(sourceIndex).sourceAsMap(); + assertEquals(sourceMappings, destMappings); + // sanity check specific value from dest mapping + assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings)); + } + } + + private static final String TSDB_MAPPING = """ + { + "_doc":{ + "properties": { + "@timestamp" : { + "type": "date" + }, + "metricset": { + "type": "keyword", + "time_series_dimension": true + } + } + } + }"""; + + private static final String TSDB_DOC = """ + { + "@timestamp": "$time", + "metricset": "pod", + "k8s": { + "pod": { + "name": "dog", + "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", + "ip": "10.10.55.3", + "network": { + "tx": 1434595272, + "rx": 530605511 + } + } + } + } + """; + + public void testTsdbStartEndSet() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var templateSettings = Settings.builder().put("index.mode", "time_series"); + if (randomBoolean()) { + templateSettings.put("index.routing_path", "metricset"); + } + var mapping = new CompressedXContent(TSDB_MAPPING); + + // create template + var request = new TransportPutComposableIndexTemplateAction.Request("id"); + request.indexTemplate( + ComposableIndexTemplate.builder() + .indexPatterns(List.of("k8s*")) + .template(new Template(templateSettings.build(), mapping, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false)) + .build() + ); + client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet(); + + // index doc + Instant time = Instant.now(); + String backingIndexName; + { + var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE); + indexRequest.source(TSDB_DOC.replace("$time", formatInstant(time)), XContentType.JSON); + var indexResponse = client().index(indexRequest).actionGet(); + backingIndexName = indexResponse.getIndex(); + } + + var sourceSettings = indicesAdmin().getIndex(new GetIndexRequest().indices(backingIndexName)) + .actionGet() + .getSettings() + .get(backingIndexName); + Instant startTime = IndexSettings.TIME_SERIES_START_TIME.get(sourceSettings); + Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(sourceSettings); + + // sanity check start/end time on source + assertNotNull(startTime); + assertNotNull(endTime); + assertTrue(endTime.isAfter(startTime)); + + // force a rollover so can call reindex and delete + var rolloverRequest = new RolloverRequest("k8s", null); + var rolloverResponse = indicesAdmin().rolloverIndex(rolloverRequest).actionGet(); + rolloverResponse.getNewIndex(); + + // call reindex on the original backing index + var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(backingIndexName)) + .actionGet() + .getDestIndex(); + + var destSettings = indicesAdmin().getIndex(new GetIndexRequest().indices(destIndex)).actionGet().getSettings().get(destIndex); + var destStart = IndexSettings.TIME_SERIES_START_TIME.get(destSettings); + var destEnd = IndexSettings.TIME_SERIES_END_TIME.get(destSettings); + + assertEquals(startTime, destStart); + assertEquals(endTime, destEnd); + } + + // TODO more logsdb/tsdb specific tests + // TODO more data stream specific tests (how are data streams indices are different from regular indices?) + // TODO check other IndexMetadata fields that need to be fixed after the fact + // TODO what happens if don't have necessary perms for a given index? + + private static void removeReadOnly(String index) { + var settings = Settings.builder() + .put(IndexMetadata.SETTING_READ_ONLY, false) + .put(IndexMetadata.SETTING_BLOCKS_WRITE, false) + .build(); + assertAcked(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, index)).actionGet()); + } + + private static void indexDocs(String index, int numDocs) { + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numDocs; i++) { + String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); + bulkRequest.add( + new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE) + .id(i + "") + .source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, value), XContentType.JSON) + ); + } + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat(bulkResponse.getItems().length, equalTo(numDocs)); + indicesAdmin().refresh(new RefreshRequest(index)).actionGet(); + } + + private static String formatInstant(Instant instant) { + return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); + } + + private static String getIndexUUID(String index) { + return indicesAdmin().getIndex(new GetIndexRequest().indices(index)) + .actionGet() + .getSettings() + .get(index) + .get(IndexMetadata.SETTING_INDEX_UUID); + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java index 26f8e57102a4d..f42d05727b9fd 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java @@ -37,6 +37,8 @@ import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction; import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusTransportAction; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexAction; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexTransportAction; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportAction; import org.elasticsearch.xpack.migrate.rest.RestCancelReindexDataStreamAction; import org.elasticsearch.xpack.migrate.rest.RestGetMigrationReindexStatusAction; @@ -84,6 +86,7 @@ public List getRestHandlers( actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class)); actions.add(new ActionHandler<>(GetMigrationReindexStatusAction.INSTANCE, GetMigrationReindexStatusTransportAction.class)); actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class)); + actions.add(new ActionHandler<>(ReindexDataStreamIndexAction.INSTANCE, ReindexDataStreamIndexTransportAction.class)); } return actions; } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java index 39d4170f6e712..b10bea9e54230 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java @@ -47,7 +47,7 @@ public class ReindexDataStreamAction extends ActionType { + + public static final String NAME = "indices:admin/data_stream/index/reindex"; + + public static final ActionType INSTANCE = new ReindexDataStreamIndexAction(); + + private ReindexDataStreamIndexAction() { + super(NAME); + } + + public static class Request extends ActionRequest implements IndicesRequest { + + private final String sourceIndex; + + public Request(String sourceIndex) { + this.sourceIndex = sourceIndex; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.sourceIndex = in.readString(); + } + + public static Request readFrom(StreamInput in) throws IOException { + return new Request(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sourceIndex); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getSourceIndex() { + return sourceIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(sourceIndex, request.sourceIndex); + } + + @Override + public int hashCode() { + return Objects.hash(sourceIndex); + } + + @Override + public String[] indices() { + return new String[] { sourceIndex }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } + + public static class Response extends ActionResponse { + private final String destIndex; + + public Response(String destIndex) { + this.destIndex = destIndex; + } + + public Response(StreamInput in) throws IOException { + super(in); + this.destIndex = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(destIndex); + } + + public String getDestIndex() { + return destIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(destIndex, response.destIndex); + } + + @Override + public int hashCode() { + return Objects.hash(destIndex); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java new file mode 100644 index 0000000000000..8863c45691c92 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -0,0 +1,211 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.migrate.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +public class ReindexDataStreamIndexTransportAction extends HandledTransportAction< + ReindexDataStreamIndexAction.Request, + ReindexDataStreamIndexAction.Response> { + + private static final Logger logger = LogManager.getLogger(ReindexDataStreamIndexTransportAction.class); + + private static final Set SETTINGS_TO_ADD_BACK = Set.of(IndexMetadata.SETTING_BLOCKS_WRITE, IndexMetadata.SETTING_READ_ONLY); + + private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false); + private final ClusterService clusterService; + private final Client client; + private final IndexScopedSettings indexScopedSettings; + + @Inject + public ReindexDataStreamIndexTransportAction( + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters, + Client client, + IndexScopedSettings indexScopedSettings + ) { + super( + ReindexDataStreamIndexAction.NAME, + false, + transportService, + actionFilters, + ReindexDataStreamIndexAction.Request::new, + transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) + ); + this.clusterService = clusterService; + this.client = client; + this.indexScopedSettings = indexScopedSettings; + } + + @Override + protected void doExecute( + Task task, + ReindexDataStreamIndexAction.Request request, + ActionListener listener + ) { + var sourceIndexName = request.getSourceIndex(); + var destIndexName = generateDestIndexName(sourceIndexName); + IndexMetadata sourceIndex = clusterService.state().getMetadata().index(sourceIndexName); + Settings settingsBefore = sourceIndex.getSettings(); + + var hasOldVersion = ReindexDataStreamAction.getOldIndexVersionPredicate(clusterService.state().metadata()); + if (hasOldVersion.test(sourceIndex.getIndex()) == false) { + logger.warn( + "Migrating index [{}] with version [{}] is unnecessary as its version is not before [{}]", + sourceIndexName, + sourceIndex.getCreationVersion(), + ReindexDataStreamAction.MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE + ); + } + + SubscribableListener.newForked(l -> setBlockWrites(sourceIndexName, l)) + .andThen(l -> deleteDestIfExists(destIndexName, l)) + .andThen(l -> createIndex(sourceIndex, destIndexName, l)) + .andThen(l -> reindex(sourceIndexName, destIndexName, l)) + .andThen(l -> updateSettings(settingsBefore, destIndexName, l)) + .andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName)) + .addListener(listener); + } + + private void setBlockWrites(String sourceIndexName, ActionListener listener) { + logger.debug("Setting write block on source index [{}]", sourceIndexName); + final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build(); + var updateSettingsRequest = new UpdateSettingsRequest(readOnlySettings, sourceIndexName); + client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + if (response.isAcknowledged()) { + listener.onResponse(null); + } else { + var errorMessage = String.format(Locale.ROOT, "Could not set read-only on source index [%s]", sourceIndexName); + listener.onFailure(new ElasticsearchException(errorMessage)); + } + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ClusterBlockException || e.getCause() instanceof ClusterBlockException) { + // It's fine if read-only is already set + listener.onResponse(null); + } else { + listener.onFailure(e); + } + } + }); + } + + private void deleteDestIfExists(String destIndexName, ActionListener listener) { + logger.debug("Attempting to delete index [{}]", destIndexName); + var deleteIndexRequest = new DeleteIndexRequest(destIndexName).indicesOptions(IGNORE_MISSING_OPTIONS) + .masterNodeTimeout(TimeValue.MAX_VALUE); + var errorMessage = String.format(Locale.ROOT, "Failed to acknowledge delete of index [%s]", destIndexName); + client.admin().indices().delete(deleteIndexRequest, failIfNotAcknowledged(listener, errorMessage)); + } + + private void createIndex(IndexMetadata sourceIndex, String destIndexName, ActionListener listener) { + logger.debug("Creating destination index [{}] for source index [{}]", destIndexName, sourceIndex.getIndex().getName()); + + // Create destination with subset of source index settings that can be added before reindex + var settings = getPreSettings(sourceIndex); + + var sourceMapping = sourceIndex.mapping(); + Map mapping = sourceMapping != null ? sourceMapping.rawSourceAsMap() : Map.of(); + var createIndexRequest = new CreateIndexRequest(destIndexName).settings(settings).mapping(mapping); + + var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", destIndexName); + client.admin().indices().create(createIndexRequest, failIfNotAcknowledged(listener, errorMessage)); + } + + private void reindex(String sourceIndexName, String destIndexName, ActionListener listener) { + logger.debug("Reindex to destination index [{}] from source index [{}]", destIndexName, sourceIndexName); + var reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(sourceIndexName); + reindexRequest.getSearchRequest().allowPartialSearchResults(false); + reindexRequest.getSearchRequest().source().fetchSource(true); + reindexRequest.setDestIndex(destIndexName); + client.execute(ReindexAction.INSTANCE, reindexRequest, listener); + } + + private void updateSettings(Settings settingsBefore, String destIndexName, ActionListener listener) { + logger.debug("Adding settings from source index that could not be added before reindex"); + + Settings postSettings = getPostSettings(settingsBefore); + if (postSettings.isEmpty()) { + listener.onResponse(null); + return; + } + + var updateSettingsRequest = new UpdateSettingsRequest(postSettings, destIndexName); + var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", destIndexName); + client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage)); + } + + // Filter source index settings to subset of settings that can be included during reindex. + // Similar to the settings filtering done when reindexing for upgrade in Kibana + // https://github.com/elastic/kibana/blob/8a8363f02cc990732eb9cbb60cd388643a336bed/x-pack + // /plugins/upgrade_assistant/server/lib/reindexing/index_settings.ts#L155 + private Settings getPreSettings(IndexMetadata sourceIndex) { + // filter settings that will be added back later + var filtered = sourceIndex.getSettings().filter(settingName -> SETTINGS_TO_ADD_BACK.contains(settingName) == false); + + // filter private and non-copyable settings + var builder = MetadataCreateIndexService.copySettingsFromSource(false, filtered, indexScopedSettings, Settings.builder()); + return builder.build(); + } + + private Settings getPostSettings(Settings settingsBefore) { + return settingsBefore.filter(SETTINGS_TO_ADD_BACK::contains); + } + + public static String generateDestIndexName(String sourceIndex) { + return "migrated-" + sourceIndex; + } + + private static ActionListener failIfNotAcknowledged( + ActionListener listener, + String errorMessage + ) { + return listener.delegateFailureAndWrap((delegate, response) -> { + if (response.isAcknowledged()) { + delegate.onResponse(null); + } + throw new ElasticsearchException(errorMessage); + }); + } +} diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexRequestTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexRequestTests.java new file mode 100644 index 0000000000000..a057056474ef1 --- /dev/null +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexRequestTests.java @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexAction.Request; + +public class ReindexDatastreamIndexRequestTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(randomAlphaOfLength(20)); + } + + @Override + protected Request mutateInstance(Request instance) { + return new ReindexDataStreamIndexAction.Request(randomValueOtherThan(instance.getSourceIndex(), () -> randomAlphaOfLength(20))); + } +} diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexResponseTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexResponseTests.java new file mode 100644 index 0000000000000..752e173585f0e --- /dev/null +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexResponseTests.java @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.migrate.action.ReindexDataStreamIndexAction.Response; + +public class ReindexDatastreamIndexResponseTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return Response::new; + } + + @Override + protected Response createTestInstance() { + return new Response(randomAlphaOfLength(20)); + } + + @Override + protected Response mutateInstance(Response instance) { + return new Response(randomValueOtherThan(instance.getDestIndex(), () -> randomAlphaOfLength(20))); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index b139d1526ec20..c07e4b2c541a2 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -638,6 +638,7 @@ public class Constants { "internal:admin/indices/prevalidate_shard_path", "internal:index/metadata/migration_version/update", new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/migration/reindex_status" : null, + new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/data_stream/index/reindex" : null, new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/data_stream/reindex" : null, new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/data_stream/reindex_cancel" : null, "internal:admin/repository/verify", From e454d74de4e7eaf7054c3992632b4004e864ca7c Mon Sep 17 00:00:00 2001 From: Mark Vieira Date: Fri, 13 Dec 2024 13:13:44 -0800 Subject: [PATCH 6/7] Add optional pull request check for running with entitlements enabled (#118693) --- .../pipelines/pull-request/part-1-entitlements.yml | 11 +++++++++++ .../pipelines/pull-request/part-2-entitlements.yml | 11 +++++++++++ .../pipelines/pull-request/part-3-entitlements.yml | 11 +++++++++++ .../pipelines/pull-request/part-4-entitlements.yml | 11 +++++++++++ .../pipelines/pull-request/part-5-entitlements.yml | 11 +++++++++++ .../gradle/testclusters/ElasticsearchNode.java | 6 ++---- 6 files changed, 57 insertions(+), 4 deletions(-) create mode 100644 .buildkite/pipelines/pull-request/part-1-entitlements.yml create mode 100644 .buildkite/pipelines/pull-request/part-2-entitlements.yml create mode 100644 .buildkite/pipelines/pull-request/part-3-entitlements.yml create mode 100644 .buildkite/pipelines/pull-request/part-4-entitlements.yml create mode 100644 .buildkite/pipelines/pull-request/part-5-entitlements.yml diff --git a/.buildkite/pipelines/pull-request/part-1-entitlements.yml b/.buildkite/pipelines/pull-request/part-1-entitlements.yml new file mode 100644 index 0000000000000..abb9edf67484f --- /dev/null +++ b/.buildkite/pipelines/pull-request/part-1-entitlements.yml @@ -0,0 +1,11 @@ +config: + allow-labels: "test-entitlements" +steps: + - label: part-1-entitlements + command: .ci/scripts/run-gradle.sh -Dignore.tests.seed -Dtests.jvm.argline="-Des.entitlements.enabled=true" checkPart1 + timeout_in_minutes: 300 + agents: + provider: gcp + image: family/elasticsearch-ubuntu-2004 + machineType: custom-32-98304 + buildDirectory: /dev/shm/bk diff --git a/.buildkite/pipelines/pull-request/part-2-entitlements.yml b/.buildkite/pipelines/pull-request/part-2-entitlements.yml new file mode 100644 index 0000000000000..ef889f3819c5b --- /dev/null +++ b/.buildkite/pipelines/pull-request/part-2-entitlements.yml @@ -0,0 +1,11 @@ +config: + allow-labels: "test-entitlements" +steps: + - label: part-2-entitlements + command: .ci/scripts/run-gradle.sh -Dignore.tests.seed -Dtests.jvm.argline="-Des.entitlements.enabled=true" checkPart2 + timeout_in_minutes: 300 + agents: + provider: gcp + image: family/elasticsearch-ubuntu-2004 + machineType: custom-32-98304 + buildDirectory: /dev/shm/bk diff --git a/.buildkite/pipelines/pull-request/part-3-entitlements.yml b/.buildkite/pipelines/pull-request/part-3-entitlements.yml new file mode 100644 index 0000000000000..c31ae5e6a4ce3 --- /dev/null +++ b/.buildkite/pipelines/pull-request/part-3-entitlements.yml @@ -0,0 +1,11 @@ +config: + allow-labels: "test-entitlements" +steps: + - label: part-3-entitlements + command: .ci/scripts/run-gradle.sh -Dignore.tests.seed -Dtests.jvm.argline="-Des.entitlements.enabled=true" checkPart3 + timeout_in_minutes: 300 + agents: + provider: gcp + image: family/elasticsearch-ubuntu-2004 + machineType: custom-32-98304 + buildDirectory: /dev/shm/bk diff --git a/.buildkite/pipelines/pull-request/part-4-entitlements.yml b/.buildkite/pipelines/pull-request/part-4-entitlements.yml new file mode 100644 index 0000000000000..5817dacbad80b --- /dev/null +++ b/.buildkite/pipelines/pull-request/part-4-entitlements.yml @@ -0,0 +1,11 @@ +config: + allow-labels: "test-entitlements" +steps: + - label: part-4-entitlements + command: .ci/scripts/run-gradle.sh -Dignore.tests.seed -Dtests.jvm.argline="-Des.entitlements.enabled=true" checkPart4 + timeout_in_minutes: 300 + agents: + provider: gcp + image: family/elasticsearch-ubuntu-2004 + machineType: custom-32-98304 + buildDirectory: /dev/shm/bk diff --git a/.buildkite/pipelines/pull-request/part-5-entitlements.yml b/.buildkite/pipelines/pull-request/part-5-entitlements.yml new file mode 100644 index 0000000000000..5a92282361576 --- /dev/null +++ b/.buildkite/pipelines/pull-request/part-5-entitlements.yml @@ -0,0 +1,11 @@ +config: + allow-labels: "test-entitlements" +steps: + - label: part-5-entitlements + command: .ci/scripts/run-gradle.sh -Dignore.tests.seed -Dtests.jvm.argline="-Des.entitlements.enabled=true" checkPart5 + timeout_in_minutes: 300 + agents: + provider: gcp + image: family/elasticsearch-ubuntu-2004 + machineType: custom-32-98304 + buildDirectory: /dev/shm/bk diff --git a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java index 4cb67e249b0b0..2f0f85bc6e159 100644 --- a/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java +++ b/build-tools/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java @@ -876,10 +876,8 @@ private void startElasticsearchProcess() { // Don't inherit anything from the environment for as that would lack reproducibility environment.clear(); environment.putAll(getESEnvironment()); - if (cliJvmArgs.isEmpty() == false) { - String cliJvmArgsString = String.join(" ", cliJvmArgs); - environment.put("CLI_JAVA_OPTS", cliJvmArgsString); - } + String cliJvmArgsString = String.join(" ", cliJvmArgs); + environment.put("CLI_JAVA_OPTS", cliJvmArgsString + " " + System.getProperty("tests.jvm.argline", "")); // Direct the stderr to the ES log file. This should capture any jvm problems to start. // Stdout is discarded because ES duplicates the log file to stdout when run in the foreground. From ac0ba24847a6f09ef8686659cd16ca74bc1d79d1 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Fri, 13 Dec 2024 13:17:56 -0800 Subject: [PATCH 7/7] ESQL: Push down filter passed lookup join (#118410) Improve the planner to detect filters that can be pushed down 'through' a LOOKUP JOIN by determining the conditions scoped to the left/main side and moving them closer to the source. Relates #118305 --- docs/changelog/118410.yaml | 5 + .../core/expression/predicate/Predicates.java | 2 +- .../src/main/resources/lookup-join.csv-spec | 65 +++++ .../logical/PushDownAndCombineFilters.java | 55 ++++ .../optimizer/LogicalPlanOptimizerTests.java | 245 +++++++++++++++++- 5 files changed, 369 insertions(+), 3 deletions(-) create mode 100644 docs/changelog/118410.yaml diff --git a/docs/changelog/118410.yaml b/docs/changelog/118410.yaml new file mode 100644 index 0000000000000..ccc7f71ee2e1c --- /dev/null +++ b/docs/changelog/118410.yaml @@ -0,0 +1,5 @@ +pr: 118410 +summary: Push down filter passed lookup join +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/predicate/Predicates.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/predicate/Predicates.java index e63cc1fcf25fe..32f7e181933b4 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/predicate/Predicates.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/predicate/Predicates.java @@ -61,7 +61,7 @@ public static Expression combineAnd(List exps) { * * using the given combiner. * - * While a bit longer, this method creates a balanced tree as oppose to a plain + * While a bit longer, this method creates a balanced tree as opposed to a plain * recursive approach which creates an unbalanced one (either to the left or right). */ private static Expression combine(List exps, BiFunction combiner) { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index f6704d33934af..c39f4ae7b4e0c 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -418,3 +418,68 @@ count:long | type:keyword 3 | Success 1 | Disconnected ; + +// +// Filtering tests +// + +lookupWithFilterOnLeftSideField +required_capability: join_lookup_v5 + +FROM employees +| EVAL language_code = languages +| LOOKUP JOIN languages_lookup ON language_code +| SORT emp_no +| KEEP emp_no, language_code, language_name +| WHERE emp_no >= 10091 AND emp_no < 10094 +; + +emp_no:integer | language_code:integer | language_name:keyword +10091 | 3 | Spanish +10092 | 1 | English +10093 | 3 | Spanish +; + +lookupMessageWithFilterOnRightSideField-Ignore +required_capability: join_lookup_v5 + +FROM sample_data +| LOOKUP JOIN message_types_lookup ON message +| WHERE type == "Error" +| KEEP @timestamp, client_ip, event_duration, message, type +| SORT @timestamp DESC +; + +@timestamp:date | client_ip:ip | event_duration:long | message:keyword | type:keyword +2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Error +2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Error +2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Error +; + +lookupWithFieldAndRightSideAfterStats +required_capability: join_lookup_v5 + +FROM sample_data +| LOOKUP JOIN message_types_lookup ON message +| STATS count = count(message) BY type +| WHERE type == "Error" +; + +count:long | type:keyword +3 | Error +; + +lookupWithFieldOnJoinKey-Ignore +required_capability: join_lookup_v5 + +FROM employees +| EVAL language_code = languages +| LOOKUP JOIN languages_lookup ON language_code +| WHERE language_code > 1 AND language_name IS NOT NULL +| KEEP emp_no, language_code, language_name +; + +emp_no:integer | language_code:integer | language_name:keyword +10001 | 2 | French +10003 | 4 | German +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java index 15e49c22a44db..9ec902e729f54 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java @@ -15,6 +15,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.expression.predicate.Predicates; +import org.elasticsearch.xpack.esql.core.util.CollectionUtils; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; @@ -23,6 +24,8 @@ import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.RegexExtract; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; +import org.elasticsearch.xpack.esql.plan.logical.join.Join; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; import java.util.ArrayList; import java.util.List; @@ -76,11 +79,63 @@ protected LogicalPlan rule(Filter filter) { } else if (child instanceof OrderBy orderBy) { // swap the filter with its child plan = orderBy.replaceChild(filter.with(orderBy.child(), condition)); + } else if (child instanceof Join join) { + return pushDownPastJoin(filter, join); } // cannot push past a Limit, this could change the tailing result set returned return plan; } + private record ScopedFilter(List commonFilters, List leftFilters, List rightFilters) {} + + // split the filter condition in 3 parts: + // 1. filter scoped to the left + // 2. filter scoped to the right + // 3. filter that requires both sides to be evaluated + private static ScopedFilter scopeFilter(List filters, LogicalPlan left, LogicalPlan right) { + List rest = new ArrayList<>(filters); + List leftFilters = new ArrayList<>(); + List rightFilters = new ArrayList<>(); + + AttributeSet leftOutput = left.outputSet(); + AttributeSet rightOutput = right.outputSet(); + + // first remove things that are left scoped only + rest.removeIf(f -> f.references().subsetOf(leftOutput) && leftFilters.add(f)); + // followed by right scoped only + rest.removeIf(f -> f.references().subsetOf(rightOutput) && rightFilters.add(f)); + return new ScopedFilter(rest, leftFilters, rightFilters); + } + + private static LogicalPlan pushDownPastJoin(Filter filter, Join join) { + LogicalPlan plan = filter; + // pushdown only through LEFT joins + // TODO: generalize this for other join types + if (join.config().type() == JoinTypes.LEFT) { + LogicalPlan left = join.left(); + LogicalPlan right = join.right(); + + // split the filter condition in 3 parts: + // 1. filter scoped to the left + // 2. filter scoped to the right + // 3. filter that requires both sides to be evaluated + ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), left, right); + // push the left scoped filter down to the left child, keep the rest intact + if (scoped.leftFilters.size() > 0) { + // push the filter down to the left child + left = new Filter(left.source(), left, Predicates.combineAnd(scoped.leftFilters)); + // update the join with the new left child + join = (Join) join.replaceLeft(left); + + // keep the remaining filters in place, otherwise return the new join; + Expression remainingFilter = Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters, scoped.rightFilters)); + plan = remainingFilter != null ? filter.with(join, remainingFilter) : join; + } + } + // ignore the rest of the join + return plan; + } + private static Function NO_OP = expression -> expression; private static LogicalPlan maybePushDownPastUnary( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index d97a8bb2bc27f..0a01dfc088cfb 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.optimizer; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.Build; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.lucene.BytesRefs; @@ -217,6 +218,11 @@ public static void init() { enrichResolution = new EnrichResolution(); AnalyzerTestUtils.loadEnrichPolicyResolution(enrichResolution, "languages_idx", "id", "languages_idx", "mapping-languages.json"); + var lookupMapping = loadMapping("mapping-languages.json"); + IndexResolution lookupResolution = IndexResolution.valid( + new EsIndex("language_code", lookupMapping, Map.of("language_code", IndexMode.LOOKUP)) + ); + // Most tests used data from the test index, so we load it here, and use it in the plan() function. mapping = loadMapping("mapping-basic.json"); EsIndex test = new EsIndex("test", mapping, Map.of("test", IndexMode.STANDARD)); @@ -5740,7 +5746,7 @@ public void testLookupSimple() { String query = """ FROM test | RENAME languages AS int - | LOOKUP int_number_names ON int"""; + | LOOKUP_?? int_number_names ON int"""; if (Build.current().isSnapshot() == false) { var e = expectThrows(ParsingException.class, () -> analyze(query)); assertThat(e.getMessage(), containsString("line 3:3: mismatched input 'LOOKUP' expecting {")); @@ -5820,7 +5826,7 @@ public void testLookupStats() { String query = """ FROM test | RENAME languages AS int - | LOOKUP int_number_names ON int + | LOOKUP_?? int_number_names ON int | STATS MIN(emp_no) BY name"""; if (Build.current().isSnapshot() == false) { var e = expectThrows(ParsingException.class, () -> analyze(query)); @@ -5889,6 +5895,241 @@ public void testLookupStats() { ); } + // + // Lookup JOIN + // + + /** + * Filter on join keys should be pushed down + * Expects + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uage_code{r}#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Join[LEFT,[language_code{r}#4],[language_code{r}#4],[language_code{f}#18]] + * |_EsqlProject[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uages{f}#10 AS language_code, last_name{f}#11, long_noidx{f}#17, salary{f}#12]] + * | \_Limit[1000[INTEGER]] + * | \_Filter[languages{f}#10 > 1[INTEGER]] + * | \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] + * \_EsRelation[language_code][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testLookupJoinPushDownFilterOnJoinKeyWithRename() { + String query = """ + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN language_code ON language_code + | WHERE language_code > 1 + """; + var plan = optimizedPlan(query); + + var project = as(plan, Project.class); + var join = as(project.child(), Join.class); + assertThat(join.config().type(), equalTo(JoinTypes.LEFT)); + project = as(join.left(), Project.class); + var limit = as(project.child(), Limit.class); + assertThat(limit.limit().fold(), equalTo(1000)); + var filter = as(limit.child(), Filter.class); + // assert that the rename has been undone + var op = as(filter.condition(), GreaterThan.class); + var field = as(op.left(), FieldAttribute.class); + assertThat(field.name(), equalTo("languages")); + + var literal = as(op.right(), Literal.class); + assertThat(literal.value(), equalTo(1)); + + var leftRel = as(filter.child(), EsRelation.class); + var rightRel = as(join.right(), EsRelation.class); + } + + /** + * Filter on on left side fields (outside the join key) should be pushed down + * Expects + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uage_code{r}#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Join[LEFT,[language_code{r}#4],[language_code{r}#4],[language_code{f}#18]] + * |_EsqlProject[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uages{f}#10 AS language_code, last_name{f}#11, long_noidx{f}#17, salary{f}#12]] + * | \_Limit[1000[INTEGER]] + * | \_Filter[emp_no{f}#7 > 1[INTEGER]] + * | \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] + * \_EsRelation[language_code][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testLookupJoinPushDownFilterOnLeftSideField() { + String query = """ + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN language_code ON language_code + | WHERE emp_no > 1 + """; + + var plan = optimizedPlan(query); + + var project = as(plan, Project.class); + var join = as(project.child(), Join.class); + assertThat(join.config().type(), equalTo(JoinTypes.LEFT)); + project = as(join.left(), Project.class); + + var limit = as(project.child(), Limit.class); + assertThat(limit.limit().fold(), equalTo(1000)); + var filter = as(limit.child(), Filter.class); + var op = as(filter.condition(), GreaterThan.class); + var field = as(op.left(), FieldAttribute.class); + assertThat(field.name(), equalTo("emp_no")); + + var literal = as(op.right(), Literal.class); + assertThat(literal.value(), equalTo(1)); + + var leftRel = as(filter.child(), EsRelation.class); + var rightRel = as(join.right(), EsRelation.class); + } + + /** + * Filter works on the right side fields and thus cannot be pushed down + * Expects + * Project[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uage_code{r}#4, last_name{f}#11, long_noidx{f}#17, salary{f}#12, language_name{f}#19]] + * \_Limit[1000[INTEGER]] + * \_Filter[language_name{f}#19 == [45 6e 67 6c 69 73 68][KEYWORD]] + * \_Join[LEFT,[language_code{r}#4],[language_code{r}#4],[language_code{f}#18]] + * |_EsqlProject[[_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, gender{f}#9, hire_date{f}#14, job{f}#15, job.raw{f}#16, lang + * uages{f}#10 AS language_code, last_name{f}#11, long_noidx{f}#17, salary{f}#12]] + * | \_EsRelation[test][_meta_field{f}#13, emp_no{f}#7, first_name{f}#8, ge..] + * \_EsRelation[language_code][LOOKUP][language_code{f}#18, language_name{f}#19] + */ + public void testLookupJoinPushDownDisabledForLookupField() { + String query = """ + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN language_code ON language_code + | WHERE language_name == "English" + """; + + var plan = optimizedPlan(query); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + assertThat(limit.limit().fold(), equalTo(1000)); + + var filter = as(limit.child(), Filter.class); + var op = as(filter.condition(), Equals.class); + var field = as(op.left(), FieldAttribute.class); + assertThat(field.name(), equalTo("language_name")); + var literal = as(op.right(), Literal.class); + assertThat(literal.value(), equalTo(new BytesRef("English"))); + + var join = as(filter.child(), Join.class); + assertThat(join.config().type(), equalTo(JoinTypes.LEFT)); + project = as(join.left(), Project.class); + + var leftRel = as(project.child(), EsRelation.class); + var rightRel = as(join.right(), EsRelation.class); + } + + /** + * Split the conjunction into pushable and non pushable filters. + * Expects + * Project[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17, lan + * guage_code{r}#4, last_name{f}#12, long_noidx{f}#18, salary{f}#13, language_name{f}#20]] + * \_Limit[1000[INTEGER]] + * \_Filter[language_name{f}#20 == [45 6e 67 6c 69 73 68][KEYWORD]] + * \_Join[LEFT,[language_code{r}#4],[language_code{r}#4],[language_code{f}#19]] + * |_EsqlProject[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17, lan + * guages{f}#11 AS language_code, last_name{f}#12, long_noidx{f}#18, salary{f}#13]] + * | \_Filter[emp_no{f}#8 > 1[INTEGER]] + * | \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..] + * \_EsRelation[language_code][LOOKUP][language_code{f}#19, language_name{f}#20] + */ + public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightField() { + String query = """ + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN language_code ON language_code + | WHERE language_name == "English" AND emp_no > 1 + """; + + var plan = optimizedPlan(query); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + assertThat(limit.limit().fold(), equalTo(1000)); + // filter kept in place, working on the right side + var filter = as(limit.child(), Filter.class); + EsqlBinaryComparison op = as(filter.condition(), Equals.class); + var field = as(op.left(), FieldAttribute.class); + assertThat(field.name(), equalTo("language_name")); + var literal = as(op.right(), Literal.class); + assertThat(literal.value(), equalTo(new BytesRef("English"))); + + var join = as(filter.child(), Join.class); + assertThat(join.config().type(), equalTo(JoinTypes.LEFT)); + project = as(join.left(), Project.class); + // filter pushed down + filter = as(project.child(), Filter.class); + op = as(filter.condition(), GreaterThan.class); + field = as(op.left(), FieldAttribute.class); + assertThat(field.name(), equalTo("emp_no")); + + literal = as(op.right(), Literal.class); + assertThat(literal.value(), equalTo(1)); + + var leftRel = as(filter.child(), EsRelation.class); + var rightRel = as(join.right(), EsRelation.class); + + } + + /** + * Disjunctions however keep the filter in place, even on pushable fields + * Expects + * Project[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17, lan + * guage_code{r}#4, last_name{f}#12, long_noidx{f}#18, salary{f}#13, language_name{f}#20]] + * \_Limit[1000[INTEGER]] + * \_Filter[language_name{f}#20 == [45 6e 67 6c 69 73 68][KEYWORD] OR emp_no{f}#8 > 1[INTEGER]] + * \_Join[LEFT,[language_code{r}#4],[language_code{r}#4],[language_code{f}#19]] + * |_EsqlProject[[_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, gender{f}#10, hire_date{f}#15, job{f}#16, job.raw{f}#17, lan + * guages{f}#11 AS language_code, last_name{f}#12, long_noidx{f}#18, salary{f}#13]] + * | \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..] + * \_EsRelation[language_code][LOOKUP][language_code{f}#19, language_name{f}#20] + */ + public void testLookupJoinPushDownDisabledForDisjunctionBetweenLeftAndRightField() { + String query = """ + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN language_code ON language_code + | WHERE language_name == "English" OR emp_no > 1 + """; + + var plan = optimizedPlan(query); + + var project = as(plan, Project.class); + var limit = as(project.child(), Limit.class); + assertThat(limit.limit().fold(), equalTo(1000)); + + var filter = as(limit.child(), Filter.class); + var or = as(filter.condition(), Or.class); + EsqlBinaryComparison op = as(or.left(), Equals.class); + // OR left side + var field = as(op.left(), FieldAttribute.class); + assertThat(field.name(), equalTo("language_name")); + var literal = as(op.right(), Literal.class); + assertThat(literal.value(), equalTo(new BytesRef("English"))); + // OR right side + op = as(or.right(), GreaterThan.class); + field = as(op.left(), FieldAttribute.class); + assertThat(field.name(), equalTo("emp_no")); + literal = as(op.right(), Literal.class); + assertThat(literal.value(), equalTo(1)); + + var join = as(filter.child(), Join.class); + assertThat(join.config().type(), equalTo(JoinTypes.LEFT)); + project = as(join.left(), Project.class); + + var leftRel = as(project.child(), EsRelation.class); + var rightRel = as(join.right(), EsRelation.class); + } + + // + // + // + public void testTranslateMetricsWithoutGrouping() { assumeTrue("requires snapshot builds", Build.current().isSnapshot()); var query = "METRICS k8s max(rate(network.total_bytes_in))";