From 387eb38bfd83d23a2c6592d7253a302f9e37fa81 Mon Sep 17 00:00:00 2001
From: Andrei Dan
Date: Fri, 15 Mar 2024 13:28:28 +0000
Subject: [PATCH] Autoshard data streams on rollover (#106076)

This enhances our rollover logic to use the data stream autosharding service
to retrieve an autosharding recommendation.

If the recommendation is an INCREASE_SHARDS or a
COOLDOWN_PREVENTED_INCREASE_SHARDS one, we'll create a rollover condition that
captures it, such that rollover is triggered in order to increase the number
of shards even if the other "regular" conditions are not met (or, in the case
where the cooldown prevented the shard increase, to display the information as
to why in the rollover response).

All other recommendations are passed to the `MetadataRolloverService`, which
takes the necessary steps to ensure the new write index of the data stream
receives the correct number of shards.

Note that a DECREASE_SHARDS recommendation will reduce the number of shards
for a data stream only when one of the other "regular" rollover conditions
matches. It will not trigger a rollover itself; only the INCREASE_SHARDS
recommendation will.

Some notes on the `NOT_APPLICABLE` recommendation: an N/A result switches a
data stream back to the sharding configured in the index template. A data
stream can be using auto sharding and later be excluded from the functionality
using the `data_streams.auto_sharding.excludes` setting. After a data stream
is excluded it needs to start using the number of shards configured in the
backing index template.

The new autosharding condition will look like this in the rollover response:

```
"acknowledged": true,
"shards_acknowledged": true,
"old_index": ".ds-logs-nginx-2024.03.13-000003",
"new_index": ".ds-logs-nginx-2024.03.13-000004",
"rolled_over": true,
"dry_run": false,
"lazy": false,
"conditions": {
  "[optimal_shard_count : 3]": true
}
```

and like so in the `met_conditions` field, part of the rollover info in the
cluster state:

```
"rollover_info" : {
  "logs-nginx" : {
    "met_conditions" : {
      "max_docs" : 20000000,
      "optimal_shard_count" : 3
    },
    "time" : 1710421491138
  }
},
```
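As a quick illustration of the exclusion mechanism described above, a data
stream can be opted out of auto sharding dynamically through the cluster
settings API. This is a minimal sketch (the `logs-nginx` value is only an
illustrative pattern; the setting accepts a list of data stream patterns, as
exercised by the new integration tests):

```
PUT _cluster/settings
{
  "persistent": {
    "data_streams.auto_sharding.excludes": ["logs-nginx"]
  }
}
```

Once excluded, the autosharding service is expected to return a NOT_APPLICABLE
recommendation for that data stream, and the next rollover falls back to the
shard count configured in the backing index template.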
---
 .../datastreams/DataStreamAutoshardingIT.java | 615 ++++++++++++++++++
 .../datastreams/DataStreamFeatures.java | 4 +-
 .../DataStreamGetWriteIndexTests.java | 2 +-
 ...etadataDataStreamRolloverServiceTests.java | 11 +-
 .../org/elasticsearch/TransportVersions.java | 1 +
 .../admin/indices/rollover/Condition.java | 8 +-
 .../indices/rollover/LazyRolloverAction.java | 6 +-
 .../rollover/MetadataRolloverService.java | 71 +-
 .../rollover/OptimalShardCountCondition.java | 70 ++
 .../indices/rollover/RolloverConditions.java | 24 +-
 .../rollover/TransportRolloverAction.java | 82 ++-
 .../DataStreamAutoShardingService.java | 22 +-
 .../node/TransportBroadcastByNodeAction.java | 6 +-
 .../cluster/metadata/DataStream.java | 16 +-
 .../common/settings/ClusterSettings.java | 6 +
 .../elasticsearch/indices/IndicesModule.java | 9 +-
 .../elasticsearch/node/NodeConstruction.java | 10 +
 .../indices/rollover/ConditionTests.java | 35 +
 ...adataRolloverServiceAutoShardingTests.java | 489 ++++++++++++++
 .../MetadataRolloverServiceTests.java | 4 +
 .../OptimalShardCountConditionTests.java | 33 +
 .../rollover/RolloverConditionsTests.java | 34 +
 .../TransportRolloverActionTests.java | 18 +-
 .../DataStreamAutoShardingServiceTests.java | 72 +-
 .../cluster/metadata/DataStreamTests.java | 68 +-
 .../cluster/metadata/IndexMetadataTests.java | 5 +-
 .../metadata/DataStreamTestHelper.java | 26 +
 .../ReactiveStorageDeciderService.java | 7 +-
 28 files changed, 1706 insertions(+), 48 deletions(-)
 create mode 100644 modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java
 create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/rollover/OptimalShardCountCondition.java
 create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java
 create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/rollover/OptimalShardCountConditionTests.java

diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java
new file mode 100644
index 0000000000000..332622cc98db8
--- /dev/null
+++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java
@@ -0,0 +1,615 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.datastreams;
+
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.rollover.Condition;
+import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
+import org.elasticsearch.action.admin.indices.rollover.OptimalShardCountCondition;
+import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
+import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
+import org.elasticsearch.action.admin.indices.stats.CommonStats;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
+import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.datastreams.CreateDataStreamAction;
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Nullable;
+import
org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.index.shard.IndexingStats; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.store.StoreStats; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.xcontent.XContentType; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED; +import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; + +public class DataStreamAutoshardingIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(DataStreamsPlugin.class, MockTransportService.TestPlugin.class, TestAutoshardingPlugin.class); + } + + @Before + public void configureClusterSettings() { + updateClusterSettings( + Settings.builder() + .putList(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey(), List.of()) + // we want to manually trigger the rollovers in this test suite to be able to assert incrementally the changes in shard + // configurations + .put(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, "30d") + ); + } + + @After + public void resetClusterSetting() { + updateClusterSettings( + Settings.builder() + .putNull(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey()) + .putNull(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL) + ); + } + + public void testRolloverOnAutoShardCondition() throws Exception { + final String dataStreamName = "logs-es"; + + putComposableIndexTemplate( + "my-template", + List.of("logs-*"), + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + final var createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName); + assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet()); + + indexDocs(dataStreamName, randomIntBetween(100, 200)); + + { + ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName); + String assignedShardNodeId = clusterStateBeforeRollover.routingTable() + .index(dataStreamBeforeRollover.getWriteIndex()) + .shard(0) + .primaryShard() + .currentNodeId(); + + Index firstGenerationIndex = clusterStateBeforeRollover.metadata().dataStreams().get(dataStreamName).getWriteIndex(); + IndexMetadata firstGenerationMeta = 
clusterStateBeforeRollover.getMetadata().index(firstGenerationIndex); + + List shards = new ArrayList<>(firstGenerationMeta.getNumberOfShards()); + for (int i = 0; i < firstGenerationMeta.getNumberOfShards(); i++) { + // the shard stats will yield a write load of 75.0 which will make the auto sharding service recommend an optimal number + // of 5 shards + shards.add(getShardStats(firstGenerationMeta, i, 75, assignedShardNodeId)); + } + + for (DiscoveryNode node : clusterStateBeforeRollover.nodes().getAllNodes()) { + MockTransportService.getInstance(node.getName()) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + node.getName() + ); + channel.sendResponse( + instance.new NodeResponse(node.getId(), firstGenerationMeta.getNumberOfShards(), shards, List.of()) + ); + }); + } + + assertAcked(indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet()); + + ClusterState clusterStateAfterRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStream = clusterStateAfterRollover.getMetadata().dataStreams().get(dataStreamName); + IndexMetadata secondGenerationMeta = clusterStateAfterRollover.metadata().getIndexSafe(dataStream.getWriteIndex()); + + // we auto sharded up to 5 shards + assertThat(secondGenerationMeta.getNumberOfShards(), is(5)); + + IndexMetadata index = clusterStateAfterRollover.metadata().index(firstGenerationIndex); + Map rolloverInfos = index.getRolloverInfos(); + assertThat(rolloverInfos.size(), is(1)); + List> metConditions = rolloverInfos.get(dataStreamName).getMetConditions(); + assertThat(metConditions.size(), is(1)); + assertThat(metConditions.get(0).value(), instanceOf(Integer.class)); + int autoShardingRolloverInfo = (int) metConditions.get(0).value(); + assertThat(autoShardingRolloverInfo, is(5)); + } + + // let's do another rollover now that will not increase the number of shards because the increase shards cooldown has not lapsed, + // however the rollover will use the existing/previous auto shard configuration and the new generation index will have 5 shards + { + ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName); + String assignedShardNodeId = clusterStateBeforeRollover.routingTable() + .index(dataStreamBeforeRollover.getWriteIndex()) + .shard(0) + .primaryShard() + .currentNodeId(); + + IndexMetadata secondGenerationMeta = clusterStateBeforeRollover.metadata().index(dataStreamBeforeRollover.getIndices().get(1)); + List shards = new ArrayList<>(secondGenerationMeta.getNumberOfShards()); + for (int i = 0; i < secondGenerationMeta.getNumberOfShards(); i++) { + // the shard stats will yield a write load of 100.0 which will make the auto sharding service recommend an optimal number of + // 7 shards + shards.add(getShardStats(secondGenerationMeta, i, 100, assignedShardNodeId)); + } + + for (DiscoveryNode node : clusterStateBeforeRollover.nodes().getAllNodes()) { + MockTransportService.getInstance(node.getName()) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + node.getName() + ); + 
channel.sendResponse( + instance.new NodeResponse(node.getId(), secondGenerationMeta.getNumberOfShards(), shards, List.of()) + ); + }); + } + + RolloverResponse response = indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet(); + assertAcked(response); + Map conditionStatus = response.getConditionStatus(); + // empty rollover executed + assertThat(conditionStatus.size(), is(0)); + + ClusterState clusterStateAfterRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStream = clusterStateAfterRollover.getMetadata().dataStreams().get(dataStreamName); + IndexMetadata thirdGenerationMeta = clusterStateAfterRollover.metadata().getIndexSafe(dataStream.getWriteIndex()); + + // we remained on 5 shards due to the increase shards cooldown + assertThat(thirdGenerationMeta.getNumberOfShards(), is(5)); + } + + { + try { + // eliminate the increase shards cooldown and re-do the rollover should configure the data stream to 7 shards + // this time also add a rollover condition that does NOT match so that we test that it's the auto sharding that triggers + // indeed the rollover + updateClusterSettings( + Settings.builder().put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN.getKey(), "0s") + ); + + ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName); + String assignedShardNodeId = clusterStateBeforeRollover.routingTable() + .index(dataStreamBeforeRollover.getWriteIndex()) + .shard(0) + .primaryShard() + .currentNodeId(); + + IndexMetadata thirdGenIndex = clusterStateBeforeRollover.metadata().index(dataStreamBeforeRollover.getIndices().get(2)); + List shards = new ArrayList<>(thirdGenIndex.getNumberOfShards()); + for (int i = 0; i < thirdGenIndex.getNumberOfShards(); i++) { + // the shard stats will yield a write load of 100.0 which will make the auto sharding service recommend an optimal + // number of 7 shards + shards.add(getShardStats(thirdGenIndex, i, 100, assignedShardNodeId)); + } + + for (DiscoveryNode node : clusterStateBeforeRollover.nodes().getAllNodes()) { + MockTransportService.getInstance(node.getName()) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + node.getName() + ); + channel.sendResponse( + instance.new NodeResponse(node.getId(), thirdGenIndex.getNumberOfShards(), shards, List.of()) + ); + }); + } + + RolloverRequest request = new RolloverRequest(dataStreamName, null); + request.setConditions(RolloverConditions.newBuilder().addMaxIndexDocsCondition(1_000_000L).build()); + RolloverResponse response = indicesAdmin().rolloverIndex(request).actionGet(); + assertAcked(response); + Map conditionStatus = response.getConditionStatus(); + assertThat(conditionStatus.size(), is(2)); + for (Map.Entry entry : conditionStatus.entrySet()) { + if (entry.getKey().equals(new MaxDocsCondition(1_000_000L).toString())) { + assertThat(entry.getValue(), is(false)); + } else { + assertThat(entry.getKey(), is(new OptimalShardCountCondition(7).toString())); + assertThat(entry.getValue(), is(true)); + } + } + + ClusterState clusterStateAfterRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStream = 
clusterStateAfterRollover.getMetadata().dataStreams().get(dataStreamName); + IndexMetadata fourthGenerationMeta = clusterStateAfterRollover.metadata().getIndexSafe(dataStream.getWriteIndex()); + + // we auto-sharded up to 7 shards as there was no cooldown period + assertThat(fourthGenerationMeta.getNumberOfShards(), is(7)); + } finally { + // reset increase shards cooldown value + updateClusterSettings( + Settings.builder().putNull(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN.getKey()) + ); + } + } + } + + public void testReduceShardsOnRollover() throws IOException { + final String dataStreamName = "logs-es"; + + // start with 3 shards + putComposableIndexTemplate( + "my-template", + List.of("logs-*"), + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + final var createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName); + assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet()); + + indexDocs(dataStreamName, randomIntBetween(100, 200)); + + { + // rollover executes but the reduction in shard number will not be executed due to the reduce shards cooldown + ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName); + String assignedShardNodeId = clusterStateBeforeRollover.routingTable() + .index(dataStreamBeforeRollover.getWriteIndex()) + .shard(0) + .primaryShard() + .currentNodeId(); + + Index firstGenerationIndex = clusterStateBeforeRollover.metadata().dataStreams().get(dataStreamName).getWriteIndex(); + IndexMetadata firstGenerationMeta = clusterStateBeforeRollover.getMetadata().index(firstGenerationIndex); + + List shards = new ArrayList<>(firstGenerationMeta.getNumberOfShards()); + for (int i = 0; i < firstGenerationMeta.getNumberOfShards(); i++) { + // the shard stats will yield a write load of 2.0 which will make the auto sharding service recommend an optimal number + // of 2 shards + shards.add(getShardStats(firstGenerationMeta, i, 2, assignedShardNodeId)); + } + + for (DiscoveryNode node : clusterStateBeforeRollover.nodes().getAllNodes()) { + MockTransportService.getInstance(node.getName()) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + node.getName() + ); + channel.sendResponse( + instance.new NodeResponse(node.getId(), firstGenerationMeta.getNumberOfShards(), shards, List.of()) + ); + }); + } + + assertAcked(indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet()); + + ClusterState clusterStateAfterRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStream = clusterStateAfterRollover.getMetadata().dataStreams().get(dataStreamName); + IndexMetadata secondGenerationMeta = clusterStateAfterRollover.metadata().getIndexSafe(dataStream.getWriteIndex()); + + // we kept the number of shards to 3 as the reduce shards cooldown prevented us reducing the number of shards + assertThat(secondGenerationMeta.getNumberOfShards(), is(3)); + } + + { + // temporarily disable reduce shards cooldown and test that a rollover that doesn't match ANOTHER condition will not be + // executed just because we need to 
reduce the number of shards, and then that a rollover when a different condition does + indeed match will execute and the number of shards will be reduced to 2 + try { + updateClusterSettings( + Settings.builder().put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN.getKey(), "0s") + ); + + ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName); + String assignedShardNodeId = clusterStateBeforeRollover.routingTable() + .index(dataStreamBeforeRollover.getWriteIndex()) + .shard(0) + .primaryShard() + .currentNodeId(); + + IndexMetadata secondGenerationIndex = clusterStateBeforeRollover.metadata() + .index(dataStreamBeforeRollover.getIndices().get(1)); + List shards = new ArrayList<>(secondGenerationIndex.getNumberOfShards()); + for (int i = 0; i < secondGenerationIndex.getNumberOfShards(); i++) { + // the shard stats will yield a write load of 2.0 which will make the auto sharding service recommend an optimal + // number of 2 shards + shards.add(getShardStats(secondGenerationIndex, i, 2, assignedShardNodeId)); + } + + for (DiscoveryNode node : clusterStateBeforeRollover.nodes().getAllNodes()) { + MockTransportService.getInstance(node.getName()) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + node.getName() + ); + channel.sendResponse( + instance.new NodeResponse(node.getId(), secondGenerationIndex.getNumberOfShards(), shards, List.of()) + ); + }); + } + + RolloverRequest request = new RolloverRequest(dataStreamName, null); + // adding condition that does NOT match + request.setConditions(RolloverConditions.newBuilder().addMaxIndexDocsCondition(1_000_000L).build()); + RolloverResponse response = indicesAdmin().rolloverIndex(request).actionGet(); + assertThat(response.isRolledOver(), is(false)); + Map conditionStatus = response.getConditionStatus(); + assertThat(conditionStatus.size(), is(1)); + assertThat(conditionStatus.get(new MaxDocsCondition(1_000_000L).toString()), is(false)); + + // let's rollover with a condition that does match and test that the number of shards is reduced to 2 + indexDocs(dataStreamName, 100); + request = new RolloverRequest(dataStreamName, null); + // adding a condition that DOES match + request.setConditions(RolloverConditions.newBuilder().addMaxIndexDocsCondition(1L).build()); + response = indicesAdmin().rolloverIndex(request).actionGet(); + assertThat(response.isRolledOver(), is(true)); + conditionStatus = response.getConditionStatus(); + assertThat(conditionStatus.size(), is(2)); + for (Map.Entry entry : conditionStatus.entrySet()) { + if (entry.getKey().equals(new MaxDocsCondition(1L).toString())) { + assertThat(conditionStatus.get(new MaxDocsCondition(1L).toString()), is(true)); + } else { + assertThat(conditionStatus.get(new OptimalShardCountCondition(2).toString()), is(true)); + } + } + + ClusterState clusterStateAfterRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStream = clusterStateAfterRollover.getMetadata().dataStreams().get(dataStreamName); + IndexMetadata thirdGenerationMeta = clusterStateAfterRollover.metadata().getIndexSafe(dataStream.getWriteIndex()); + 
assertThat(thirdGenerationMeta.getNumberOfShards(), is(2)); + } finally { + // reset the decrease shards cooldown value + updateClusterSettings( + Settings.builder().putNull(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN.getKey()) + ); + + } + + } + + } + + public void testLazyRolloverKeepsPreviousAutoshardingDecision() throws IOException { + final String dataStreamName = "logs-es"; + + putComposableIndexTemplate( + "my-template", + List.of("logs-*"), + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + final var createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName); + assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet()); + + indexDocs(dataStreamName, randomIntBetween(100, 200)); + + { + ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName); + + Index firstGenerationIndex = clusterStateBeforeRollover.metadata().dataStreams().get(dataStreamName).getWriteIndex(); + IndexMetadata firstGenerationMeta = clusterStateBeforeRollover.getMetadata().index(firstGenerationIndex); + + List shards = new ArrayList<>(firstGenerationMeta.getNumberOfShards()); + for (int i = 0; i < firstGenerationMeta.getNumberOfShards(); i++) { + // the shard stats will yield a write load of 75.0 which will make the auto sharding service recommend an optimal number + // of 5 shards + shards.add( + getShardStats( + firstGenerationMeta, + i, + 75, + clusterStateBeforeRollover.routingTable() + .index(dataStreamBeforeRollover.getWriteIndex()) + .shard(0) + .primaryShard() + .currentNodeId() + ) + ); + } + + for (DiscoveryNode node : clusterStateBeforeRollover.nodes().getAllNodes()) { + MockTransportService.getInstance(node.getName()) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + node.getName() + ); + channel.sendResponse( + instance.new NodeResponse(node.getId(), firstGenerationMeta.getNumberOfShards(), shards, List.of()) + ); + }); + } + + assertAcked(indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet()); + + ClusterState clusterStateAfterRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStream = clusterStateAfterRollover.getMetadata().dataStreams().get(dataStreamName); + IndexMetadata secondGenerationMeta = clusterStateAfterRollover.metadata().getIndexSafe(dataStream.getWriteIndex()); + + // we auto sharded up to 5 shards + assertThat(secondGenerationMeta.getNumberOfShards(), is(5)); + } + + { + try { + // eliminate the increase shards cooldown so there are no potential barriers to another increase shards option (we'll + actually also simulate the stats such that an increase to 7 is warranted) and execute a lazy rollover that should not + auto shard up, but just keep the existing auto sharding event and create a new index with 5 shards (as dictated + by the existing auto sharding event) + updateClusterSettings( + Settings.builder().put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN.getKey(), "0s") + ); + + ClusterState clusterStateBeforeRollover = 
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName); + + IndexMetadata secondGenIndex = clusterStateBeforeRollover.metadata().index(dataStreamBeforeRollover.getIndices().get(1)); + List shards = new ArrayList<>(secondGenIndex.getNumberOfShards()); + for (int i = 0; i < secondGenIndex.getNumberOfShards(); i++) { + // the shard stats will yield a write load of 100.0 which will make the auto sharding service recommend an optimal + // number of 7 shards + shards.add( + getShardStats( + secondGenIndex, + i, + 100, + clusterStateBeforeRollover.routingTable() + .index(dataStreamBeforeRollover.getWriteIndex()) + .shard(i) + .primaryShard() + .currentNodeId() + ) + ); + } + + for (DiscoveryNode node : clusterStateBeforeRollover.nodes().getAllNodes()) { + MockTransportService.getInstance(node.getName()) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + TransportIndicesStatsAction instance = internalCluster().getInstance( + TransportIndicesStatsAction.class, + node.getName() + ); + channel.sendResponse( + instance.new NodeResponse(node.getId(), secondGenIndex.getNumberOfShards(), shards, List.of()) + ); + }); + } + + RolloverRequest request = new RolloverRequest(dataStreamName, null); + request.lazy(true); + assertAcked(indicesAdmin().rolloverIndex(request).actionGet()); + + // index some docs so the rollover is executed + indexDocs(dataStreamName, 10); + ClusterState clusterStateAfterRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + DataStream dataStream = clusterStateAfterRollover.getMetadata().dataStreams().get(dataStreamName); + IndexMetadata thirdGenerationIndex = clusterStateAfterRollover.metadata().getIndexSafe(dataStream.getWriteIndex()); + + // we kept the number of shards to 5 as we did a lazy rollover + assertThat(thirdGenerationIndex.getNumberOfShards(), is(5)); + } finally { + // reset increase shards cooldown value + updateClusterSettings( + Settings.builder().putNull(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN.getKey()) + ); + } + } + } + + private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex, long targetWriteLoad, String assignedShardNodeId) { + ShardId shardId = new ShardId(indexMeta.getIndex(), shardIndex); + Path path = createTempDir().resolve("indices").resolve(indexMeta.getIndexUUID()).resolve(String.valueOf(shardIndex)); + ShardRouting shardRouting = ShardRouting.newUnassigned( + shardId, + true, + RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null), + ShardRouting.Role.DEFAULT + ); + shardRouting = shardRouting.initialize(assignedShardNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + shardRouting = shardRouting.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + CommonStats stats = new CommonStats(); + stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes()); + stats.store = new StoreStats(); + stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1)); + return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0); + } + + static void putComposableIndexTemplate(String id, List patterns, @Nullable Settings settings) throws IOException { + 
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id); + request.indexTemplate( + ComposableIndexTemplate.builder() + .indexPatterns(patterns) + .template(new Template(settings, null, null, null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .build() + ); + client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet(); + } + + static void indexDocs(String dataStream, int numDocs) { + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numDocs; i++) { + String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); + bulkRequest.add( + new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE) + .source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, value), XContentType.JSON) + ); + } + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat(bulkResponse.getItems().length, equalTo(numDocs)); + String backingIndexPrefix = DataStream.BACKING_INDEX_PREFIX + dataStream; + for (BulkItemResponse itemResponse : bulkResponse) { + assertThat(itemResponse.getFailureMessage(), nullValue()); + assertThat(itemResponse.status(), equalTo(RestStatus.CREATED)); + assertThat(itemResponse.getIndex(), startsWith(backingIndexPrefix)); + } + indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet(); + } + + /** + * Test plugin that registers an additional setting. + */ + public static class TestAutoshardingPlugin extends Plugin { + @Override + public List> getSettings() { + return List.of( + Setting.boolSetting(DATA_STREAMS_AUTO_SHARDING_ENABLED, false, Setting.Property.Dynamic, Setting.Property.NodeScope) + ); + } + + @Override + public Settings additionalSettings() { + return Settings.builder().put(DATA_STREAMS_AUTO_SHARDING_ENABLED, true).build(); + } + } + +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java index 734c10570ab2b..06dc8919360f8 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java @@ -9,6 +9,7 @@ package org.elasticsearch.datastreams; import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction; +import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher; import org.elasticsearch.features.FeatureSpecification; import org.elasticsearch.features.NodeFeature; @@ -24,7 +25,8 @@ public class DataStreamFeatures implements FeatureSpecification { public Set getFeatures() { return Set.of( DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12 - LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER // Added in 8.13 + LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13 + DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE ); } } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java index d0b41c847a61d..b61cbdc837010 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java +++ 
b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java @@ -318,7 +318,7 @@ private MetadataRolloverService.RolloverResult rolloverOver(ClusterState state, MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_"); - return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false, null); + return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false, null, null); } private Index getWriteIndex(ClusterState state, String name, String timestamp) { diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java index 4f2df2c690bc8..2bfbeb8e37aaf 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java @@ -119,7 +119,8 @@ public void testRolloverClusterStateForDataStream() throws Exception { now, randomBoolean(), false, - indexStats + indexStats, + null ); long after = testThreadPool.absoluteTimeInMillis(); @@ -218,6 +219,7 @@ public void testRolloverAndMigrateDataStream() throws Exception { now, randomBoolean(), false, + null, null ); @@ -310,6 +312,7 @@ public void testChangingIndexModeFromTimeSeriesToSomethingElseNoEffectOnExisting now, randomBoolean(), false, + null, null ); @@ -375,7 +378,8 @@ public void testRolloverClusterStateWithBrokenOlderTsdbDataStream() throws Excep now, randomBoolean(), false, - indexStats + indexStats, + null ); long after = testThreadPool.absoluteTimeInMillis(); @@ -455,7 +459,8 @@ public void testRolloverClusterStateWithBrokenTsdbDataStream() throws Exception now, randomBoolean(), false, - indexStats + indexStats, + null ) ); assertThat(e.getMessage(), containsString("is overlapping with backing index")); diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 5c09a5464171d..536a5db51e8a8 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -148,6 +148,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_SERIALIZE_BIG_VECTOR = def(8_608_00_0); public static final TransportVersion AGGS_EXCLUDED_DELETED_DOCS = def(8_609_00_0); public static final TransportVersion ESQL_SERIALIZE_BIG_ARRAY = def(8_610_00_0); + public static final TransportVersion AUTO_SHARDING_ROLLOVER_CONDITION = def(8_611_00_0); /* * STOP! READ THIS FIRST! 
No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/Condition.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/Condition.java index ba7d6b03043c5..b4a466dc9aa1e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/Condition.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/Condition.java @@ -20,12 +20,14 @@ */ public abstract class Condition implements NamedWriteable, ToXContentFragment { - /** - * Describes the type of condition - a min_* condition (MIN) or max_* condition (MAX). + /* + * Describes the type of condition - a min_* condition (MIN), max_* condition (MAX), or an automatic condition (automatic conditions + * are something that the platform configures and manages) */ public enum Type { MIN, - MAX + MAX, + AUTOMATIC } protected T value; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java index 9266a320f598c..623186e052eb7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/LazyRolloverAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; @@ -59,6 +60,7 @@ public TransportLazyRolloverAction( MetadataRolloverService rolloverService, AllocationService allocationService, MetadataDataStreamsService metadataDataStreamsService, + DataStreamAutoShardingService dataStreamAutoShardingService, Client client ) { super( @@ -71,7 +73,8 @@ public TransportLazyRolloverAction( rolloverService, client, allocationService, - metadataDataStreamsService + metadataDataStreamsService, + dataStreamAutoShardingService ); } @@ -121,6 +124,7 @@ protected void masterOperation( new RolloverRequest(rolloverRequest.getRolloverTarget(), null), null, trialRolloverResponse, + null, listener ); submitRolloverTask(rolloverRequest, source, rolloverTask); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index 4972a784cc2bd..6645de880ad86 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -8,14 +8,18 @@ package org.elasticsearch.action.admin.indices.rollover; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.datastreams.autosharding.AutoShardingResult; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import 
org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamAutoShardingEvent; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadataStats; @@ -61,6 +65,7 @@ * Service responsible for handling rollover requests for write aliases and data streams */ public class MetadataRolloverService { + private static final Logger logger = LogManager.getLogger(MetadataRolloverService.class); private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-\\d+$"); private static final List VALID_ROLLOVER_TARGETS = List.of(ALIAS, DATA_STREAM); @@ -110,7 +115,8 @@ public RolloverResult rolloverClusterState( Instant now, boolean silent, boolean onlyValidate, - @Nullable IndexMetadataStats sourceIndexStats + @Nullable IndexMetadataStats sourceIndexStats, + @Nullable AutoShardingResult autoShardingResult ) throws Exception { validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest); final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget); @@ -134,7 +140,8 @@ public RolloverResult rolloverClusterState( now, silent, onlyValidate, - sourceIndexStats + sourceIndexStats, + autoShardingResult ); default -> // the validate method above prevents this case @@ -244,7 +251,8 @@ private RolloverResult rolloverDataStream( Instant now, boolean silent, boolean onlyValidate, - @Nullable IndexMetadataStats sourceIndexStats + @Nullable IndexMetadataStats sourceIndexStats, + @Nullable AutoShardingResult autoShardingResult ) throws Exception { if (SnapshotsService.snapshottingDataStreams(currentState, Collections.singleton(dataStream.getName())).isEmpty() == false) { @@ -281,6 +289,54 @@ private RolloverResult rolloverDataStream( return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), currentState); } + DataStreamAutoShardingEvent dataStreamAutoShardingEvent = autoShardingResult == null + ? dataStream.getAutoShardingEvent() + : switch (autoShardingResult.type()) { + case NO_CHANGE_REQUIRED -> { + logger.info( + "Rolling over data stream [{}] using existing auto-sharding recommendation [{}]", + dataStreamName, + dataStream.getAutoShardingEvent() + ); + yield dataStream.getAutoShardingEvent(); + } + case INCREASE_SHARDS, DECREASE_SHARDS -> { + logger.info("Auto sharding data stream [{}] to [{}]", dataStreamName, autoShardingResult); + yield new DataStreamAutoShardingEvent( + dataStream.getWriteIndex().getName(), + autoShardingResult.targetNumberOfShards(), + now.toEpochMilli() + ); + } + case COOLDOWN_PREVENTED_INCREASE, COOLDOWN_PREVENTED_DECREASE -> { + // we're in the cooldown period for this particular recommendation so perhaps use a previous autosharding + // recommendation (or the value configured in the backing index template otherwise) + if (dataStream.getAutoShardingEvent() != null) { + logger.info( + "Rolling over data stream [{}] using existing auto-sharding recommendation [{}]", + dataStreamName, + dataStream.getAutoShardingEvent() + ); + } + yield dataStream.getAutoShardingEvent(); + } + // auto sharding might not be available due to the feature not being available/enabled or due to cluster level excludes + being configured. 
the index template will dictate the number of shards as usual + case NOT_APPLICABLE -> { + logger.debug("auto sharding is not applicable for data stream [{}]", dataStreamName); + yield null; + } + }; + + // configure the number of shards using an auto sharding event (new, or existing) if we have one + if (dataStreamAutoShardingEvent != null) { + Settings settingsWithAutoSharding = Settings.builder() + .put(createIndexRequest.settings()) + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), dataStreamAutoShardingEvent.targetNumberOfShards()) + .build(); + createIndexRequest.settings(settingsWithAutoSharding); + } + var createIndexClusterStateRequest = prepareDataStreamCreateIndexRequest( dataStreamName, newWriteIndexName, @@ -298,7 +354,14 @@ private RolloverResult rolloverDataStream( silent, (builder, indexMetadata) -> { downgradeBrokenTsdbBackingIndices(dataStream, builder); - builder.put(dataStream.rollover(indexMetadata.getIndex(), newGeneration, metadata.isTimeSeriesTemplate(templateV2))); + builder.put( + dataStream.rollover( + indexMetadata.getIndex(), + newGeneration, + metadata.isTimeSeriesTemplate(templateV2), + dataStreamAutoShardingEvent + ) + ); }, rerouteCompletionIsNotRequired() ); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/OptimalShardCountCondition.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/OptimalShardCountCondition.java new file mode 100644 index 0000000000000..93a11b8fe0855 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/OptimalShardCountCondition.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.indices.rollover; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +/** + * Condition for automatically increasing the number of shards for a data stream. This indicates the optimal number of shards that was + * configured for the index abstraction as part of rollover. + * It is more of a marker condition than one we evaluate against stats: when present, the condition is met. 
+ */ +public class OptimalShardCountCondition extends Condition { + public static final String NAME = "optimal_shard_count"; + + public OptimalShardCountCondition(int optimalShards) { + super(NAME, Type.AUTOMATIC); + this.value = optimalShards; + } + + public OptimalShardCountCondition(StreamInput in) throws IOException { + super(NAME, Type.AUTOMATIC); + this.value = in.readVInt(); + } + + @Override + public Result evaluate(final Stats stats) { + return new Result(this, true); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(value); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.field(NAME, value); + } + + public static OptimalShardCountCondition fromXContent(XContentParser parser) throws IOException { + if (parser.nextToken() == XContentParser.Token.VALUE_NUMBER) { + return new OptimalShardCountCondition(parser.intValue()); + } else { + throw new IllegalArgumentException("invalid token when parsing " + NAME + " condition: " + parser.currentToken()); + } + } + + @Override + boolean includedInVersion(TransportVersion version) { + return version.onOrAfter(TransportVersions.AUTO_SHARDING_ROLLOVER_CONDITION); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditions.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditions.java index 24f93ccb45348..d7cd7459d4821 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditions.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditions.java @@ -7,6 +7,7 @@ */ package org.elasticsearch.action.admin.indices.rollover; +import org.elasticsearch.action.datastreams.autosharding.AutoShardingResult; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -27,6 +28,9 @@ import java.util.Map; import java.util.Objects; +import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.DECREASE_SHARDS; +import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.INCREASE_SHARDS; + /** * Contains the conditions that determine if an index can be rolled over or not. It is used by the {@link RolloverRequest}, * the Index Lifecycle Management and the Data Stream Lifecycle. @@ -243,7 +247,12 @@ public boolean areConditionsMet(Map conditionResults) { .filter(c -> Condition.Type.MAX == c.type()) .anyMatch(c -> conditionResults.getOrDefault(c.toString(), false)); - return conditionResults.size() == 0 || (allMinConditionsMet && anyMaxConditionsMet); + boolean anyAutomaticConditionsMet = conditions.values() + .stream() + .filter(c -> Condition.Type.AUTOMATIC == c.type()) + .anyMatch(c -> conditionResults.getOrDefault(c.toString(), false)); + + return conditionResults.size() == 0 || (allMinConditionsMet && anyMaxConditionsMet) || anyAutomaticConditionsMet; } public static RolloverConditions fromXContent(XContentParser parser) throws IOException { @@ -408,6 +417,19 @@ public Builder addMinPrimaryShardDocsCondition(Long numDocs) { return this; } + /** + * Adds an optimal shard count condition if the autosharding result is of type INCREASE_SHARDS or DECREASE_SHARDS; ignores it otherwise. 
+ */ + public Builder addOptimalShardCountCondition(AutoShardingResult autoShardingResult) { + if (autoShardingResult.type().equals(INCREASE_SHARDS) || autoShardingResult.type().equals(DECREASE_SHARDS)) { + OptimalShardCountCondition optimalShardCountCondition = new OptimalShardCountCondition( + autoShardingResult.targetNumberOfShards() + ); + this.conditions.put(optimalShardCountCondition.name, optimalShardCountCondition); + } + return this; + } + public RolloverConditions build() { return new RolloverConditions(conditions); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java index 481eda825b047..c295ccde01623 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -17,6 +17,9 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.datastreams.autosharding.AutoShardingResult; +import org.elasticsearch.action.datastreams.autosharding.AutoShardingType; +import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.action.support.IndicesOptions; @@ -27,6 +30,7 @@ import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadataStats; @@ -54,6 +58,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -70,6 +75,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction rolloverTaskQueue; private final MetadataDataStreamsService metadataDataStreamsService; + private final DataStreamAutoShardingService dataStreamAutoShardingService; @Inject public TransportRolloverAction( @@ -81,7 +87,8 @@ public TransportRolloverAction( MetadataRolloverService rolloverService, Client client, AllocationService allocationService, - MetadataDataStreamsService metadataDataStreamsService + MetadataDataStreamsService metadataDataStreamsService, + DataStreamAutoShardingService dataStreamAutoShardingService ) { this( RolloverAction.INSTANCE, @@ -93,7 +100,8 @@ public TransportRolloverAction( rolloverService, client, allocationService, - metadataDataStreamsService + metadataDataStreamsService, + dataStreamAutoShardingService ); } @@ -107,7 +115,8 @@ public TransportRolloverAction( MetadataRolloverService rolloverService, Client client, AllocationService allocationService, - MetadataDataStreamsService metadataDataStreamsService + MetadataDataStreamsService metadataDataStreamsService, + DataStreamAutoShardingService dataStreamAutoShardingService ) { super( actionType.name(), @@ -127,6 +136,7 @@ public TransportRolloverAction( new RolloverExecutor(clusterService, 
allocationService, rolloverService, threadPool) ); this.metadataDataStreamsService = metadataDataStreamsService; + this.dataStreamAutoShardingService = dataStreamAutoShardingService; } @Override @@ -221,6 +231,40 @@ protected void masterOperation( listener.delegateFailureAndWrap((delegate, statsResponse) -> { + AutoShardingResult rolloverAutoSharding = null; + final IndexAbstraction indexAbstraction = clusterState.metadata() + .getIndicesLookup() + .get(rolloverRequest.getRolloverTarget()); + if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) { + DataStream dataStream = (DataStream) indexAbstraction; + final Optional indexStats = Optional.ofNullable(statsResponse) + .map(stats -> stats.getIndex(dataStream.getWriteIndex().getName())); + + Double writeLoad = indexStats.map(stats -> stats.getTotal().getIndexing()) + .map(indexing -> indexing.getTotal().getWriteLoad()) + .orElse(null); + + rolloverAutoSharding = dataStreamAutoShardingService.calculate(clusterState, dataStream, writeLoad); + logger.debug("auto sharding result for data stream [{}] is [{}]", dataStream.getName(), rolloverAutoSharding); + + // if auto sharding recommends increasing the number of shards we want to trigger a rollover even if there are no + // other "regular" conditions matching (we want to aggressively increase the number of shards) so we're adding the + // automatic {@link OptimalShardCountCondition} to the rollover request conditions so it gets evaluated and triggers + // the rollover operation (having this condition met will also provide a useful paper trail as it'll get stored in + // the {@link org.elasticsearch.action.admin.indices.rollover.RolloverInfo#metConditions} ) + + // NOTE that the {@link AutoShardingType#DECREASE_SHARDS} recommendation is treated differently (i.e. added to the + // conditions later only if other "regular" rollover conditions match: see {@link RolloverTask#executeTask}) because we + // do NOT want to trigger a rollover **just** to reduce the number of shards, but we will reduce the number of shards + // when the rollover naturally occurs. + if (rolloverAutoSharding.type().equals(AutoShardingType.INCREASE_SHARDS)) { + RolloverConditions conditionsIncludingImplicit = RolloverConditions.newBuilder(rolloverRequest.getConditions()) + .addOptimalShardCountCondition(rolloverAutoSharding) + .build(); + rolloverRequest.setConditions(conditionsIncludingImplicit); + } + } + // Evaluate the conditions, so that we can tell without a cluster state update whether a rollover would occur. 
final Map trialConditionResults = evaluateConditions( rolloverRequest.getConditionValues(), @@ -247,7 +291,13 @@ protected void masterOperation( // Pre-check the conditions to see whether we should submit a new cluster state task if (rolloverRequest.areConditionsMet(trialConditionResults)) { String source = "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]"; - RolloverTask rolloverTask = new RolloverTask(rolloverRequest, statsResponse, trialRolloverResponse, delegate); + RolloverTask rolloverTask = new RolloverTask( + rolloverRequest, + statsResponse, + trialRolloverResponse, + rolloverAutoSharding, + delegate + ); submitRolloverTask(rolloverRequest, source, rolloverTask); } else { // conditions not met @@ -317,8 +367,10 @@ record RolloverTask( RolloverRequest rolloverRequest, IndicesStatsResponse statsResponse, RolloverResponse trialRolloverResponse, + @Nullable AutoShardingResult autoShardingResult, ActionListener listener ) implements ClusterStateTaskListener { + @Override public void onFailure(Exception e) { listener.onFailure(e); @@ -388,9 +440,24 @@ public ClusterState executeTask( ); if (rolloverRequest.getConditions().areConditionsMet(postConditionResults)) { + Map resultsIncludingDecreaseShards = new HashMap<>(postConditionResults); + if (rolloverTask.autoShardingResult != null + && rolloverTask.autoShardingResult.type().equals(AutoShardingType.DECREASE_SHARDS)) { + // if we're executing a rollover ("regular" conditions are met) and we're also decreasing the number of shards we'll + // include the decrease_shards optimal shard count condition in the response and {@link RolloverInfo#metConditions} + RolloverConditions conditionsIncludingDecreaseShards = RolloverConditions.newBuilder(rolloverRequest.getConditions()) + .addOptimalShardCountCondition(rolloverTask.autoShardingResult) + .build(); + rolloverRequest.setConditions(conditionsIncludingDecreaseShards); + resultsIncludingDecreaseShards.put( + new OptimalShardCountCondition(rolloverTask.autoShardingResult.targetNumberOfShards()).toString(), + true + ); + } + final List> metConditions = rolloverRequest.getConditionValues() .stream() - .filter(condition -> postConditionResults.get(condition.toString())) + .filter(condition -> resultsIncludingDecreaseShards.get(condition.toString())) .toList(); final IndexAbstraction rolloverTargetAbstraction = currentState.metadata() @@ -411,7 +478,8 @@ public ClusterState executeTask( Instant.now(), false, false, - sourceIndexStats + sourceIndexStats, + rolloverTask.autoShardingResult() ); results.add(rolloverResult); logger.trace("rollover result [{}]", rolloverResult); @@ -435,7 +503,7 @@ public ClusterState executeTask( // things like date resolution sourceIndexName, rolloverIndexName, - postConditionResults, + resultsIncludingDecreaseShards, false, true, true, diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java index e830f538d222f..06aec69bc97da 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Objects; import java.util.OptionalDouble; +import java.util.OptionalLong; import java.util.function.Function; import 
java.util.function.LongSupplier; @@ -380,12 +381,27 @@ static double getMaxIndexLoadWithinCoolingPeriod( // assume the current write index load is the highest observed and look back to find the actual maximum double maxIndexLoadWithinCoolingPeriod = writeIndexLoad; for (IndexWriteLoad writeLoad : writeLoadsWithinCoolingPeriod) { - double totalIndexLoad = 0; + // the IndexWriteLoad stores _for each shard_ a shard average write load (calculated as shard indexing time / shard + // uptime) and its corresponding shard uptime + // + // to reconstruct the average _index_ write load we recalculate the shard indexing time by multiplying the shard write load + // by its uptime, and then, having the indexing time and uptime for each shard, we calculate the average _index_ write load using + // (indexingTime_shard0 + indexingTime_shard1) / (uptime_shard0 + uptime_shard1) + // as {@link org.elasticsearch.index.shard.IndexingStats#add} does + double totalShardIndexingTime = 0; + long totalShardUptime = 0; for (int shardId = 0; shardId < writeLoad.numberOfShards(); shardId++) { final OptionalDouble writeLoadForShard = writeLoad.getWriteLoadForShard(shardId); - totalIndexLoad += writeLoadForShard.orElse(0); + final OptionalLong uptimeInMillisForShard = writeLoad.getUptimeInMillisForShard(shardId); + if (writeLoadForShard.isPresent()) { + assert uptimeInMillisForShard.isPresent(); + double shardIndexingTime = writeLoadForShard.getAsDouble() * uptimeInMillisForShard.getAsLong(); + long shardUptimeInMillis = uptimeInMillisForShard.getAsLong(); + totalShardIndexingTime += shardIndexingTime; + totalShardUptime += shardUptimeInMillis; + } } - + double totalIndexLoad = totalShardUptime == 0 ? 0.0 : (totalShardIndexingTime / totalShardUptime); if (totalIndexLoad > maxIndexLoadWithinCoolingPeriod) { maxIndexLoadWithinCoolingPeriod = totalIndexLoad; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 073ba460a4698..a5f424f875eb7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -462,19 +462,25 @@ public DataStreamAutoShardingEvent getAutoShardingEvent() { * @param writeIndex new write index * @param generation new generation * @param timeSeries whether the template that created this data stream is in time series mode + * @param autoShardingEvent the auto sharding event this rollover operation is applying * * @return new {@code DataStream} instance with the rollover
operation applied */ - public DataStream rollover(Index writeIndex, long generation, boolean timeSeries) { + public DataStream rollover( + Index writeIndex, + long generation, + boolean timeSeries, + @Nullable DataStreamAutoShardingEvent autoShardingEvent + ) { ensureNotReplicated(); - return unsafeRollover(writeIndex, generation, timeSeries); + return unsafeRollover(writeIndex, generation, timeSeries, autoShardingEvent); } /** - * Like {@link #rollover(Index, long, boolean)}, but does no validation, use with care only. + * Like {@link #rollover(Index, long, boolean, DataStreamAutoShardingEvent)}, but does no validation, use with care only. */ - public DataStream unsafeRollover(Index writeIndex, long generation, boolean timeSeries) { + public DataStream unsafeRollover(Index writeIndex, long generation, boolean timeSeries, DataStreamAutoShardingEvent autoShardingEvent) { IndexMode indexMode = this.indexMode; if ((indexMode == null || indexMode == IndexMode.STANDARD) && timeSeries) { // This allows for migrating a data stream to be a tsdb data stream: @@ -506,7 +512,7 @@ public DataStream unsafeRollover(Index writeIndex, long generation, boolean time /** * Performs a dummy rollover on a {@code DataStream} instance and returns the tuple of the next write index name and next generation - * that this {@code DataStream} should roll over to using {@link #rollover(Index, long, boolean)}. + * that this {@code DataStream} should roll over to using {@link #rollover(Index, long, boolean, DataStreamAutoShardingEvent)}. * * @param clusterMetadata Cluster metadata * diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index a2416fe6a6a15..21801dee844b0 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.bulk.WriteAckDelay; +import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.ingest.SimulatePipelineTransportAction; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.AutoCreateIndex; @@ -209,6 +210,11 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, + DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN, + DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN, + DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING, + DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS, + DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS, DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING, DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING, DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING, diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java index b94c95834f65a..048d9adb8e7e3 100644 --- 
a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.indices.rollover.MinPrimaryShardDocsCondition; import org.elasticsearch.action.admin.indices.rollover.MinPrimaryShardSizeCondition; import org.elasticsearch.action.admin.indices.rollover.MinSizeCondition; +import org.elasticsearch.action.admin.indices.rollover.OptimalShardCountCondition; import org.elasticsearch.action.resync.TransportResyncReplicationAction; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -110,7 +111,8 @@ public static List getNamedWriteables() { new NamedWriteableRegistry.Entry(Condition.class, MaxDocsCondition.NAME, MaxDocsCondition::new), new NamedWriteableRegistry.Entry(Condition.class, MaxSizeCondition.NAME, MaxSizeCondition::new), new NamedWriteableRegistry.Entry(Condition.class, MaxPrimaryShardSizeCondition.NAME, MaxPrimaryShardSizeCondition::new), - new NamedWriteableRegistry.Entry(Condition.class, MaxPrimaryShardDocsCondition.NAME, MaxPrimaryShardDocsCondition::new) + new NamedWriteableRegistry.Entry(Condition.class, MaxPrimaryShardDocsCondition.NAME, MaxPrimaryShardDocsCondition::new), + new NamedWriteableRegistry.Entry(Condition.class, OptimalShardCountCondition.NAME, OptimalShardCountCondition::new) ); } @@ -165,6 +167,11 @@ public static List getNamedXContents() { Condition.class, new ParseField(MaxPrimaryShardDocsCondition.NAME), (p, c) -> MaxPrimaryShardDocsCondition.fromXContent(p) + ), + new NamedXContentRegistry.Entry( + Condition.class, + new ParseField(OptimalShardCountCondition.NAME), + (p, c) -> OptimalShardCountCondition.fromXContent(p) ) ); } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 19a6d200189f2..15ebe2752451d 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.cluster.repositories.reservedstate.ReservedRepositoryAction; import org.elasticsearch.action.admin.indices.template.reservedstate.ReservedComposableIndexTemplateAction; +import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.ingest.ReservedPipelineAction; import org.elasticsearch.action.search.SearchExecutionStatsCollector; import org.elasticsearch.action.search.SearchPhaseController; @@ -1061,6 +1062,14 @@ record PluginServiceInstances( modules.add(loadPluginComponents(pluginComponents)); + DataStreamAutoShardingService dataStreamAutoShardingService = new DataStreamAutoShardingService( + settings, + clusterService, + featureService, + threadPool::absoluteTimeInMillis + ); + dataStreamAutoShardingService.init(); + modules.add(b -> { b.bind(NodeService.class).toInstance(nodeService); b.bind(BigArrays.class).toInstance(bigArrays); @@ -1095,6 +1104,7 @@ record PluginServiceInstances( b.bind(IndexSettingProviders.class).toInstance(indexSettingProviders); b.bind(FileSettingsService.class).toInstance(fileSettingsService); b.bind(CompatibilityVersions.class).toInstance(compatibilityVersions); + b.bind(DataStreamAutoShardingService.class).toInstance(dataStreamAutoShardingService); }); if (ReadinessService.enabled(environment)) { diff --git 
a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/ConditionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/ConditionTests.java index 56e2d63307103..a6a9bf14325d3 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/ConditionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/ConditionTests.java @@ -12,8 +12,13 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.List; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; public class ConditionTests extends ESTestCase { @@ -347,6 +352,36 @@ public void testEqualsAndHashCode() { condition -> new MinPrimaryShardDocsCondition(condition.value), condition -> new MinPrimaryShardDocsCondition(randomNonNegativeLong()) ); + OptimalShardCountCondition optimalShardCountCondition = new OptimalShardCountCondition(3); + EqualsHashCodeTestUtils.checkEqualsAndHashCode( + optimalShardCountCondition, + condition -> new OptimalShardCountCondition(3), + condition -> new OptimalShardCountCondition(2) + ); + } + + public void testAutoShardCondition() { + OptimalShardCountCondition optimalShardCountCondition = new OptimalShardCountCondition(randomNonNegativeInt()); + assertThat( + optimalShardCountCondition.evaluate( + new Condition.Stats(1, randomNonNegativeLong(), randomByteSizeValue(), randomByteSizeValue(), 1) + ).matched(), + is(true) + ); + } + + public void testParseAutoShardConditionFromRolloverInfo() throws IOException { + long time = System.currentTimeMillis(); + RolloverInfo info = new RolloverInfo("logs-nginx", List.of(new OptimalShardCountCondition(3)), time); + + RolloverInfo parsedInfo = RolloverInfo.parse( + createParser( + JsonXContent.jsonXContent, + "{\n" + " \"met_conditions\": {\n" + " \"optimal_shard_count\": 3" + "\n},\n" + " \"time\": " + time + "\n" + " }" + ), + "logs-nginx" + ); + assertThat(parsedInfo, is(info)); } private static ByteSizeValue randomByteSize() { diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java new file mode 100644 index 0000000000000..fd21e0c27099e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceAutoShardingTests.java @@ -0,0 +1,489 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.action.admin.indices.rollover; + +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.datastreams.autosharding.AutoShardingResult; +import org.elasticsearch.action.datastreams.autosharding.AutoShardingType; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamAutoShardingEvent; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; +import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.time.Instant; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.COOLDOWN_PREVENTED_DECREASE; +import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.COOLDOWN_PREVENTED_INCREASE; +import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.DECREASE_SHARDS; +import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.INCREASE_SHARDS; +import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.NOT_APPLICABLE; +import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.NO_CHANGE_REQUIRED; +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class MetadataRolloverServiceAutoShardingTests extends ESTestCase { + + public void testRolloverDataStreamWithoutExistingAutosharding() throws Exception { + String dataStreamName = "no_preexising_autoshard_event_ds"; + DataStream dataStream = DataStreamTestHelper.newInstance( + dataStreamName, + List.of( + new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1), UUID.randomUUID().toString()), + new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 2), UUID.randomUUID().toString()), + new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 3), UUID.randomUUID().toString()), + new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 4), UUID.randomUUID().toString()), + new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 5), UUID.randomUUID().toString()) + ), + 5, + null, + false, + null, + (DataStreamAutoShardingEvent) null + ); + ComposableIndexTemplate template = ComposableIndexTemplate.builder() + .indexPatterns(List.of(dataStreamName + "*")) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + // all indices have, by default 3 shards (using a value GT 1 so we can test decreasing the number of shards) + .template(new 
Template(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).build(), null, null)) + .build(); + Metadata.Builder builder = Metadata.builder(); + builder.put("template", template); + for (Index index : dataStream.getIndices()) { + // all indices have, by default 3 shards (using a value GT 1 so we can test decreasing the number of shards) + builder.put(getIndexMetadataBuilderForIndex(index, 3)); + } + builder.put(dataStream); + final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); + + ThreadPool testThreadPool = new TestThreadPool(getTestName()); + try { + MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService( + dataStream, + testThreadPool, + Set.of(), + xContentRegistry() + ); + + // let's rollover the data stream using all the possible autosharding recommendations + for (AutoShardingType type : AutoShardingType.values()) { + long before = testThreadPool.absoluteTimeInMillis(); + switch (type) { + case INCREASE_SHARDS -> { + List> metConditions = List.of(new OptimalShardCountCondition(5)); + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + metConditions, + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(INCREASE_SHARDS, 3, 5, TimeValue.ZERO, 64.33) + ); + assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 5); + } + case DECREASE_SHARDS -> { + { + // we have another condition that matched, so the rollover will be executed and the new number of shards + // will be 1 + List> metConditions = List.of(new MaxDocsCondition(2L), new OptimalShardCountCondition(1)); + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + metConditions, + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO, 0.33) + ); + assertRolloverResult( + dataStream, + rolloverResult, + before, + testThreadPool.absoluteTimeInMillis(), + metConditions, + 1 + ); + } + + { + // even if the decrease shards recommendation is not a rollover condition, an empty POST _rollover request will + // configure the decrease shards recommendation + List> metConditions = List.of(new OptimalShardCountCondition(1)); + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + metConditions, + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO, 0.33) + ); + assertRolloverResult( + dataStream, + rolloverResult, + before, + testThreadPool.absoluteTimeInMillis(), + metConditions, + 1 + ); + } + } + case COOLDOWN_PREVENTED_INCREASE -> { + AutoShardingResult autoShardingResult = new AutoShardingResult( + COOLDOWN_PREVENTED_INCREASE, + 3, + 5, + TimeValue.timeValueMinutes(10), + 64.33 + ); + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + List.of(), + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(COOLDOWN_PREVENTED_INCREASE, 3, 5, TimeValue.timeValueMinutes(10), 64.33) + ); + // the expected number of shards remains 3 for 
the data stream due to the remaining cooldown + assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), List.of(), 3); + } + case COOLDOWN_PREVENTED_DECREASE -> { + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + List.of(), + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(COOLDOWN_PREVENTED_DECREASE, 3, 1, TimeValue.timeValueMinutes(10), 64.33) + ); + // the expected number of shards remains 3 for the data stream due to the remaining cooldown + assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), List.of(), 3); + } + case NO_CHANGE_REQUIRED -> { + List> metConditions = List.of(new MaxDocsCondition(randomNonNegativeLong())); + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + metConditions, + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(NO_CHANGE_REQUIRED, 3, 3, TimeValue.ZERO, 2.33) + ); + assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 3); + } + case NOT_APPLICABLE -> { + List> metConditions = List.of(new MaxDocsCondition(randomNonNegativeLong())); + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + metConditions, + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(NOT_APPLICABLE, 1, 1, TimeValue.MAX_VALUE, null) + ); + assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 3); + } + } + } + } finally { + testThreadPool.shutdown(); + } + } + + public void testRolloverDataStreamWithExistingAutoShardEvent() throws Exception { + String dataStreamName = "ds_with_existing_autoshard_event"; + String autoShardEventTriggerIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 3); + DataStream dataStream = DataStreamTestHelper.newInstance( + dataStreamName, + List.of( + new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1), UUID.randomUUID().toString()), + new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 2), UUID.randomUUID().toString()), + new Index(autoShardEventTriggerIndex, UUID.randomUUID().toString()), + new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 4), UUID.randomUUID().toString()), + new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 5), UUID.randomUUID().toString()) + ), + 5, + null, + false, + null, + new DataStreamAutoShardingEvent(autoShardEventTriggerIndex, 3, System.currentTimeMillis()) + ); + ComposableIndexTemplate template = ComposableIndexTemplate.builder() + .indexPatterns(List.of(dataStreamName + "*")) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + // the index template does not configure any number of shards so we'll default to 1 + .build(); + Metadata.Builder builder = Metadata.builder(); + builder.put("template", template); + int numberOfShards = 1; + for (Index index : dataStream.getIndices()) { + if (index.getName().equals(autoShardEventTriggerIndex)) { + // we configure the indices to have 1 shard until the auto shard trigger index, after which we go to 3 shards + numberOfShards = 3; + } + 
builder.put(getIndexMetadataBuilderForIndex(index, numberOfShards)); + } + builder.put(dataStream); + final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build(); + + ThreadPool testThreadPool = new TestThreadPool(getTestName()); + try { + MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService( + dataStream, + testThreadPool, + Set.of(), + xContentRegistry() + ); + + // let's rollover the data stream using all the possible autosharding recommendations + for (AutoShardingType type : AutoShardingType.values()) { + long before = testThreadPool.absoluteTimeInMillis(); + switch (type) { + case INCREASE_SHARDS -> { + List> metConditions = List.of(new OptimalShardCountCondition(3)); + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + metConditions, + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(INCREASE_SHARDS, 3, 5, TimeValue.ZERO, 64.33) + ); + assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 5); + } + case DECREASE_SHARDS -> { + { + // we have another condition that matched, so the rollover will be executed and the new number of shards + // will be 1 + List> metConditions = List.of(new MaxDocsCondition(2L), new OptimalShardCountCondition(1)); + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + metConditions, + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO, 0.33) + ); + assertRolloverResult( + dataStream, + rolloverResult, + before, + testThreadPool.absoluteTimeInMillis(), + metConditions, + 1 + ); + } + + { + // even if the decrease shards recommendation is not a rollover condition, an empty POST _rollover request will + // configure the decrease shards recommendation + List> metConditions = List.of(new OptimalShardCountCondition(1)); + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + metConditions, + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(DECREASE_SHARDS, 3, 1, TimeValue.ZERO, 0.33) + ); + assertRolloverResult( + dataStream, + rolloverResult, + before, + testThreadPool.absoluteTimeInMillis(), + metConditions, + 1 + ); + } + } + case COOLDOWN_PREVENTED_INCREASE -> { + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + List.of(), + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(COOLDOWN_PREVENTED_INCREASE, 3, 5, TimeValue.timeValueMinutes(10), 64.33) + ); + // the expected number of shards remains 3 for the data stream due to the remaining cooldown + assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), List.of(), 3); + } + case COOLDOWN_PREVENTED_DECREASE -> { + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + List.of(), + Instant.now(), + randomBoolean(), + false, + null, + new 
AutoShardingResult(COOLDOWN_PREVENTED_DECREASE, 3, 1, TimeValue.timeValueMinutes(10), 64.33) + ); + // the expected number of shards remains 3 for the data stream due to the remaining cooldown + assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), List.of(), 3); + } + case NO_CHANGE_REQUIRED -> { + List> metConditions = List.of(new MaxDocsCondition(randomNonNegativeLong())); + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + metConditions, + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(NO_CHANGE_REQUIRED, 3, 3, TimeValue.ZERO, 2.33) + ); + assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 3); + } + case NOT_APPLICABLE -> { + List> metConditions = List.of(new MaxDocsCondition(randomNonNegativeLong())); + MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState( + clusterState, + dataStream.getName(), + null, + new CreateIndexRequest("_na_"), + metConditions, + Instant.now(), + randomBoolean(), + false, + null, + new AutoShardingResult(NOT_APPLICABLE, 1, 1, TimeValue.MAX_VALUE, null) + ); + // if the auto sharding is not applicable we just use whatever's in the index template (1 shard in this case) + assertRolloverResult(dataStream, rolloverResult, before, testThreadPool.absoluteTimeInMillis(), metConditions, 1); + } + } + } + } finally { + testThreadPool.shutdown(); + } + } + + private static void assertRolloverResult( + DataStream preRolloverDataStream, + MetadataRolloverService.RolloverResult rolloverResult, + long before, + long after, + List> metConditions, + int expectedNumberOfShards + ) { + String sourceIndexName = DataStream.getDefaultBackingIndexName( + preRolloverDataStream.getName(), + preRolloverDataStream.getGeneration() + ); + String newIndexName = DataStream.getDefaultBackingIndexName( + preRolloverDataStream.getName(), + preRolloverDataStream.getGeneration() + 1 + ); + assertEquals(sourceIndexName, rolloverResult.sourceIndexName()); + assertEquals(newIndexName, rolloverResult.rolloverIndexName()); + Metadata rolloverMetadata = rolloverResult.clusterState().metadata(); + assertEquals(preRolloverDataStream.getIndices().size() + 1, rolloverMetadata.indices().size()); + IndexMetadata rolloverIndexMetadata = rolloverMetadata.index(newIndexName); + // the new write index received the expected number of shards + assertThat(rolloverIndexMetadata.getNumberOfShards(), is(expectedNumberOfShards)); + + IndexAbstraction ds = rolloverMetadata.getIndicesLookup().get(preRolloverDataStream.getName()); + assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM)); + assertThat(ds.getIndices(), hasSize(preRolloverDataStream.getIndices().size() + 1)); + assertThat(ds.getIndices(), hasItem(rolloverMetadata.index(sourceIndexName).getIndex())); + assertThat(ds.getIndices(), hasItem(rolloverIndexMetadata.getIndex())); + assertThat(ds.getWriteIndex(), equalTo(rolloverIndexMetadata.getIndex())); + + RolloverInfo info = rolloverMetadata.index(sourceIndexName).getRolloverInfos().get(preRolloverDataStream.getName()); + assertThat(info.getTime(), lessThanOrEqualTo(after)); + assertThat(info.getTime(), greaterThanOrEqualTo(before)); + assertThat(info.getMetConditions(), hasSize(metConditions.size())); + for (Condition rolloverInfoCondition : info.getMetConditions()) { + boolean foundMetCondition = false; + for (Condition
metCondition : metConditions) { + if (metCondition.name.equals(rolloverInfoCondition.name)) { + foundMetCondition = true; + assertThat(rolloverInfoCondition.value, is(metCondition.value)); + break; + } + } + assertThat(foundMetCondition, is(true)); + } + } + + private static IndexMetadata.Builder getIndexMetadataBuilderForIndex(Index index, int numberOfShards) { + return IndexMetadata.builder(index.getName()) + .settings(ESTestCase.settings(IndexVersion.current()).put("index.hidden", true).put(SETTING_INDEX_UUID, index.getUUID())) + .numberOfShards(numberOfShards) + .numberOfReplicas(1); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java index 83bdc68d0b9c0..23905c9445d18 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java @@ -547,6 +547,7 @@ public void testRolloverClusterState() throws Exception { Instant.now(), randomBoolean(), false, + null, null ); long after = testThreadPool.absoluteTimeInMillis(); @@ -615,6 +616,7 @@ public void testRolloverClusterStateForDataStream() throws Exception { Instant.now(), randomBoolean(), false, + null, null ); long after = testThreadPool.absoluteTimeInMillis(); @@ -701,6 +703,7 @@ public void testValidation() throws Exception { Instant.now(), randomBoolean(), true, + null, null ); @@ -742,6 +745,7 @@ public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception { Instant.now(), false, randomBoolean(), + null, null ) ); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/OptimalShardCountConditionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/OptimalShardCountConditionTests.java new file mode 100644 index 0000000000000..b979a7f1ccd0e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/OptimalShardCountConditionTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.action.admin.indices.rollover; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +public class OptimalShardCountConditionTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return OptimalShardCountCondition::new; + } + + @Override + protected OptimalShardCountCondition createTestInstance() { + return new OptimalShardCountCondition(randomNonNegativeInt()); + } + + @Override + protected OptimalShardCountCondition mutateInstance(OptimalShardCountCondition instance) throws IOException { + return new OptimalShardCountCondition(randomValueOtherThan(instance.value, ESTestCase::randomNonNegativeInt)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditionsTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditionsTests.java index cda8a6086b53a..404c74d0854cf 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditionsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditionsTests.java @@ -8,6 +8,8 @@ package org.elasticsearch.action.admin.indices.rollover; +import org.elasticsearch.action.datastreams.autosharding.AutoShardingResult; +import org.elasticsearch.action.datastreams.autosharding.AutoShardingType; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -21,6 +23,8 @@ import java.util.Collections; import java.util.Map; +import static org.hamcrest.Matchers.is; + public class RolloverConditionsTests extends AbstractXContentSerializingTestCase { @Override @@ -157,5 +161,35 @@ public void testConditionsAreMet() { String minAgeCondition = new MinAgeCondition(age).toString(); assertFalse(rolloverConditions.areConditionsMet(Map.of(maxAgeCondition, true, minDocsCondition, true))); assertTrue(rolloverConditions.areConditionsMet(Map.of(maxAgeCondition, true, minDocsCondition, true, minAgeCondition, true))); + + OptimalShardCountCondition optimalShardCountCondition = new OptimalShardCountCondition(3); + rolloverConditions = RolloverConditions.newBuilder() + .addOptimalShardCountCondition( + randomBoolean() + ? 
new AutoShardingResult(AutoShardingType.INCREASE_SHARDS, 1, 3, TimeValue.ZERO, 3.0) + : new AutoShardingResult(AutoShardingType.DECREASE_SHARDS, 7, 3, TimeValue.ZERO, 0.8) + ) + .build(); + assertThat(rolloverConditions.areConditionsMet(Map.of(optimalShardCountCondition.toString(), true)), is(true)); + assertThat(rolloverConditions.areConditionsMet(Map.of(optimalShardCountCondition.toString(), false)), is(false)); + + // the rollover condition must be INCREASE or DECREASE_SHARDS, any other type should be ignored + rolloverConditions = RolloverConditions.newBuilder() + .addOptimalShardCountCondition( + new AutoShardingResult( + randomFrom( + AutoShardingType.COOLDOWN_PREVENTED_INCREASE, + AutoShardingType.COOLDOWN_PREVENTED_DECREASE, + AutoShardingType.NO_CHANGE_REQUIRED, + AutoShardingType.NOT_APPLICABLE + ), + 1, + 3, + TimeValue.ZERO, + 3.0 + ) + ) + .build(); + assertThat(rolloverConditions.getConditions().size(), is(0)); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java index 950d1a9f22f08..814cff37e0708 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsTests; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -41,6 +42,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.features.FeatureService; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.cache.query.QueryCacheStats; @@ -111,6 +113,13 @@ public class TransportRolloverActionTests extends ESTestCase { WriteLoadForecaster.DEFAULT ); + final DataStreamAutoShardingService dataStreamAutoShardingService = new DataStreamAutoShardingService( + Settings.EMPTY, + mockClusterService, + new FeatureService(List.of()), + System::currentTimeMillis + ); + @Before public void setUpMocks() { when(mockNode.getId()).thenReturn("mocknode"); @@ -374,7 +383,8 @@ public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPr rolloverService, mockClient, mockAllocationService, - mockMetadataDataStreamService + mockMetadataDataStreamService, + dataStreamAutoShardingService ); // For given alias, verify that condition evaluation fails when the condition doc count is greater than the primaries doc count @@ -449,7 +459,8 @@ public void testLazyRollover() throws Exception { rolloverService, mockClient, mockAllocationService, - mockMetadataDataStreamService + mockMetadataDataStreamService, + dataStreamAutoShardingService ); final PlainActionFuture future = new PlainActionFuture<>(); RolloverRequest rolloverRequest = new RolloverRequest("logs-ds", null); @@ -501,7 +512,8 @@ public void testLazyRolloverFails() throws Exception { rolloverService, mockClient, mockAllocationService, 
- mockMetadataDataStreamService + mockMetadataDataStreamService, + dataStreamAutoShardingService ); // Lazy rollover fails on a concrete index diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java index 674b3e855e912..bc1ec6788eec6 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java @@ -51,7 +51,9 @@ import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.INCREASE_SHARDS; import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.NO_CHANGE_REQUIRED; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; public class DataStreamAutoShardingServiceTests extends ESTestCase { @@ -65,10 +67,6 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase { public void setupService() { threadPool = new TestThreadPool(getTestName()); Set> builtInClusterSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - builtInClusterSettings.add(DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS); - builtInClusterSettings.add(DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS); - builtInClusterSettings.add(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_COOLDOWN); - builtInClusterSettings.add(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_COOLDOWN); builtInClusterSettings.add( Setting.boolSetting( DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED, @@ -613,7 +611,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() { indexMetadata = createIndexMetadata( DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), createdAt), 3, - getWriteLoad(3, 3.0), // each backing index has a write load of 9.0 + getWriteLoad(3, 3.0), // each backing index has a write load of 3.0 createdAt ); } @@ -648,7 +646,67 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() { () -> now ); // to cover the entire cooldown period, the last index before the cooling period is taken into account - assertThat(maxIndexLoadWithinCoolingPeriod, is(lastIndexBeforeCoolingPeriodHasLowWriteLoad ? 15.0 : 999.0)); + assertThat(maxIndexLoadWithinCoolingPeriod, is(lastIndexBeforeCoolingPeriodHasLowWriteLoad ? 
5.0 : 999.0)); + } + + public void testIndexLoadWithinCoolingPeriodIsShardLoadsAvg() { + final TimeValue coolingPeriod = TimeValue.timeValueDays(3); + + final Metadata.Builder metadataBuilder = Metadata.builder(); + final int numberOfBackingIndicesWithinCoolingPeriod = randomIntBetween(3, 10); + final List backingIndices = new ArrayList<>(); + final String dataStreamName = "logs"; + long now = System.currentTimeMillis(); + + for (int i = 0; i < numberOfBackingIndicesWithinCoolingPeriod; i++) { + final long createdAt = now - (coolingPeriod.getMillis() / 2); + IndexMetadata indexMetadata; + IndexWriteLoad.Builder builder = IndexWriteLoad.builder(3); + for (int shardId = 0; shardId < 3; shardId++) { + switch (shardId) { + case 0 -> builder.withShardWriteLoad(shardId, 0.5, 40); + case 1 -> builder.withShardWriteLoad(shardId, 3.0, 10); + case 2 -> builder.withShardWriteLoad(shardId, 0.3333, 150); + } + } + indexMetadata = createIndexMetadata( + DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), createdAt), + 3, + builder.build(), // max write index within cooling period should be 0.5 (ish) + createdAt + ); + backingIndices.add(indexMetadata.getIndex()); + metadataBuilder.put(indexMetadata, false); + } + + final String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size()); + final IndexMetadata writeIndexMetadata = createIndexMetadata(writeIndexName, 3, getWriteLoad(3, 0.1), System.currentTimeMillis()); + backingIndices.add(writeIndexMetadata.getIndex()); + metadataBuilder.put(writeIndexMetadata, false); + + final DataStream dataStream = new DataStream( + dataStreamName, + backingIndices, + backingIndices.size(), + Collections.emptyMap(), + false, + false, + false, + false, + IndexMode.STANDARD + ); + + metadataBuilder.put(dataStream); + + double maxIndexLoadWithinCoolingPeriod = DataStreamAutoShardingService.getMaxIndexLoadWithinCoolingPeriod( + metadataBuilder.build(), + dataStream, + 0.1, + coolingPeriod, + () -> now + ); + assertThat(maxIndexLoadWithinCoolingPeriod, is(greaterThan(0.499))); + assertThat(maxIndexLoadWithinCoolingPeriod, is(lessThan(0.5))); } public void testAutoShardingResultValidation() { @@ -763,7 +821,7 @@ private IndexMetadata createIndexMetadata( private IndexWriteLoad getWriteLoad(int numberOfShards, double shardWriteLoad) { IndexWriteLoad.Builder builder = IndexWriteLoad.builder(numberOfShards); for (int shardId = 0; shardId < numberOfShards; shardId++) { - builder.withShardWriteLoad(shardId, shardWriteLoad, randomLongBetween(1, 10)); + builder.withShardWriteLoad(shardId, shardWriteLoad, 1); } return builder.build(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index a07cd8e60411a..8e1ce495fdf5c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -160,7 +160,7 @@ protected DataStream mutateInstance(DataStream instance) { public void testRollover() { DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream(); Tuple newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA); - final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false); + final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, 
null); assertThat(rolledDs.getName(), equalTo(ds.getName())); assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1)); assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); @@ -185,7 +185,7 @@ public void testRolloverWithConflictingBackingIndexName() { } final Tuple newCoordinates = ds.nextWriteIndexAndGeneration(builder.build()); - final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false); + final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null); assertThat(rolledDs.getName(), equalTo(ds.getName())); assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + numConflictingIndices + 1)); assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); @@ -215,7 +215,7 @@ public void testRolloverUpgradeToTsdbDataStream() { ); var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA); - var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), true); + var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), true, null); assertThat(rolledDs.getName(), equalTo(ds.getName())); assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1)); assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); @@ -243,7 +243,7 @@ public void testRolloverDowngradeToRegularDataStream() { ); var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA); - var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false); + var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null); assertThat(rolledDs.getName(), equalTo(ds.getName())); assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1)); assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); @@ -1752,6 +1752,66 @@ public void testGetIndicesWithinMaxAgeRange() { assertThat(indicesWithinMaxAgeRange, is(equalTo(expectedIndicesWithinMaxAgeRange))); } + public void testGetIndicesWithinMaxAgeRangeAllIndicesOutsideRange() { + final TimeValue maxIndexAge = TimeValue.timeValueDays(7); + + final Metadata.Builder metadataBuilder = Metadata.builder(); + final int numberOfBackingIndicesOlderThanMinAge = randomIntBetween(5, 10); + final int numberOfShards = 1; + final List backingIndices = new ArrayList<>(); + final String dataStreamName = "logs-es"; + final List backingIndicesOlderThanMinAge = new ArrayList<>(); + for (int i = 0; i < numberOfBackingIndicesOlderThanMinAge; i++) { + long creationDate = System.currentTimeMillis() - maxIndexAge.millis() * 2; + final IndexMetadata indexMetadata = createIndexMetadata( + DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), creationDate), + randomIndexWriteLoad(numberOfShards), + creationDate + ); + backingIndices.add(indexMetadata.getIndex()); + backingIndicesOlderThanMinAge.add(indexMetadata.getIndex()); + metadataBuilder.put(indexMetadata, false); + } + + final String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size()); + final IndexMetadata writeIndexMetadata = createIndexMetadata( + writeIndexName, + null, + System.currentTimeMillis() - maxIndexAge.millis() * 2 + ); + backingIndices.add(writeIndexMetadata.getIndex()); + 
metadataBuilder.put(writeIndexMetadata, false); + + final DataStream dataStream = new DataStream( + dataStreamName, + backingIndices, + backingIndices.size(), + Collections.emptyMap(), + false, + false, + false, + false, + randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES + ); + + metadataBuilder.put(dataStream); + + final List indicesWithinMaxAgeRange = DataStream.getIndicesWithinMaxAgeRange( + dataStream, + metadataBuilder::getSafe, + maxIndexAge, + System::currentTimeMillis + ); + + final List expectedIndicesWithinMaxAgeRange = new ArrayList<>(); + if (numberOfBackingIndicesOlderThanMinAge > 0) { + expectedIndicesWithinMaxAgeRange.add(backingIndicesOlderThanMinAge.get(backingIndicesOlderThanMinAge.size() - 1)); + } + expectedIndicesWithinMaxAgeRange.add(writeIndexMetadata.getIndex()); + assertThat(indicesWithinMaxAgeRange, is(equalTo(expectedIndicesWithinMaxAgeRange))); + assertThat(indicesWithinMaxAgeRange.get(indicesWithinMaxAgeRange.size() - 1).getName(), is(writeIndexName)); + } + private IndexWriteLoad randomIndexWriteLoad(int numberOfShards) { IndexWriteLoad.Builder builder = IndexWriteLoad.builder(numberOfShards); for (int shardId = 0; shardId < numberOfShards; shardId++) { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java index b4c9f670f66b6..5cc1a7206e7e4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.rollover.MaxPrimaryShardDocsCondition; import org.elasticsearch.action.admin.indices.rollover.MaxPrimaryShardSizeCondition; import org.elasticsearch.action.admin.indices.rollover.MaxSizeCondition; +import org.elasticsearch.action.admin.indices.rollover.OptimalShardCountCondition; import org.elasticsearch.action.admin.indices.rollover.RolloverInfo; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.common.Strings; @@ -97,7 +98,8 @@ public void testIndexMetadataSerialization() throws IOException { new MaxDocsCondition(randomNonNegativeLong()), new MaxSizeCondition(ByteSizeValue.ofBytes(randomNonNegativeLong())), new MaxPrimaryShardSizeCondition(ByteSizeValue.ofBytes(randomNonNegativeLong())), - new MaxPrimaryShardDocsCondition(randomNonNegativeLong()) + new MaxPrimaryShardDocsCondition(randomNonNegativeLong()), + new OptimalShardCountCondition(3) ), randomNonNegativeLong() ) @@ -128,6 +130,7 @@ public void testIndexMetadataSerialization() throws IOException { assertEquals(metadata.getCreationVersion(), fromXContentMeta.getCreationVersion()); assertEquals(metadata.getCompatibilityVersion(), fromXContentMeta.getCompatibilityVersion()); assertEquals(metadata.getRoutingNumShards(), fromXContentMeta.getRoutingNumShards()); + assertEquals(metadata.getRolloverInfos(), fromXContentMeta.getRolloverInfos()); assertEquals(metadata.getCreationDate(), fromXContentMeta.getCreationDate()); assertEquals(metadata.getRoutingFactor(), fromXContentMeta.getRoutingFactor()); assertEquals(metadata.primaryTerm(0), fromXContentMeta.primaryTerm(0)); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 8402b5756e915..5d6ba6c3a6d1d 100644 --- 
a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -120,6 +120,32 @@ public static DataStream newInstance( return newInstance(name, indices, generation, metadata, replicated, lifecycle, List.of()); } + public static DataStream newInstance( + String name, + List indices, + long generation, + Map metadata, + boolean replicated, + @Nullable DataStreamLifecycle lifecycle, + @Nullable DataStreamAutoShardingEvent autoShardingEvent + ) { + return new DataStream( + name, + indices, + generation, + metadata, + false, + replicated, + false, + false, + null, + lifecycle, + false, + List.of(), + autoShardingEvent + ); + } + public static DataStream newInstance( String name, List indices, diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index 2379e5f8e9380..28983fe34df91 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -817,7 +817,12 @@ private SingleForecast forecast(Metadata metadata, DataStream stream, long forec for (int i = 0; i < numberNewIndices; ++i) { final String uuid = UUIDs.randomBase64UUID(); final Tuple rolledDataStreamInfo = stream.unsafeNextWriteIndexAndGeneration(state.metadata()); - stream = stream.unsafeRollover(new Index(rolledDataStreamInfo.v1(), uuid), rolledDataStreamInfo.v2(), false); + stream = stream.unsafeRollover( + new Index(rolledDataStreamInfo.v1(), uuid), + rolledDataStreamInfo.v2(), + false, + stream.getAutoShardingEvent() + ); // this unintentionally copies the in-sync allocation ids too. This has the fortunate effect of these indices // not being regarded new by the disk threshold decider, thereby respecting the low watermark threshold even for primaries.
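For readers following the `DataStream#rollover` signature change above, a usage sketch (illustrative only, not part of the patch; the class name, helper name, and the literal shard count are hypothetical, while the constructor and method signatures are the ones introduced in this patch):

```
// Illustrative sketch, not part of this patch: rolling a data stream over while
// recording the auto sharding event that resized it.
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAutoShardingEvent;
import org.elasticsearch.index.Index;

class DataStreamRolloverSketch {
    static DataStream rolloverWithAutoSharding(DataStream dataStream, Index newWriteIndex, long nextGeneration) {
        // the event captures the index whose stats triggered the resize, the new
        // shard count (5 here, purely illustrative), and the decision timestamp
        DataStreamAutoShardingEvent event = new DataStreamAutoShardingEvent(
            dataStream.getWriteIndex().getName(),
            5,
            System.currentTimeMillis()
        );
        // passing null for the event keeps the pre-patch rollover behaviour
        return dataStream.rollover(newWriteIndex, nextGeneration, false, event);
    }
}
```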