diff --git a/build-tools-internal/src/main/resources/forbidden/es-all-signatures.txt b/build-tools-internal/src/main/resources/forbidden/es-all-signatures.txt index bb37ef72a1688..ca1e6d85be984 100644 --- a/build-tools-internal/src/main/resources/forbidden/es-all-signatures.txt +++ b/build-tools-internal/src/main/resources/forbidden/es-all-signatures.txt @@ -62,3 +62,6 @@ org.apache.logging.log4j.message.ParameterizedMessage#(java.lang.String, j org.apache.logging.log4j.message.ParameterizedMessage#(java.lang.String, java.lang.Object[]) org.apache.logging.log4j.message.ParameterizedMessage#(java.lang.String, java.lang.Object) org.apache.logging.log4j.message.ParameterizedMessage#(java.lang.String, java.lang.Object, java.lang.Object) + +@defaultMessage Use WriteLoadForecaster#getForecastedWriteLoad instead +org.elasticsearch.cluster.metadata.IndexMetadata#getForecastedWriteLoad() diff --git a/docs/changelog/91425.yaml b/docs/changelog/91425.yaml new file mode 100644 index 0000000000000..3200fcc2655f2 --- /dev/null +++ b/docs/changelog/91425.yaml @@ -0,0 +1,5 @@ +pr: 91425 +summary: Forecast write load during rollovers +area: Allocation +type: enhancement +issues: [] diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java index 47b0da425db88..b7aceb6b0b46e 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; @@ -274,7 +275,8 @@ public void setup() throws Exception { testThreadPool, createIndexService, indexAliasesService, - EmptySystemIndices.INSTANCE + EmptySystemIndices.INSTANCE, + WriteLoadForecaster.DEFAULT ); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index 9ab67dd4672b3..17d0b3a5607be 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService; import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -64,18 +65,21 @@ public class MetadataRolloverService { private final MetadataCreateIndexService createIndexService; private final MetadataIndexAliasesService indexAliasesService; private final SystemIndices systemIndices; + private final WriteLoadForecaster writeLoadForecaster; @Inject public MetadataRolloverService( ThreadPool threadPool, MetadataCreateIndexService createIndexService, MetadataIndexAliasesService indexAliasesService, - SystemIndices systemIndices + SystemIndices systemIndices, + WriteLoadForecaster writeLoadForecaster ) { this.threadPool = threadPool; this.createIndexService = createIndexService; this.indexAliasesService = indexAliasesService; this.systemIndices = systemIndices; + this.writeLoadForecaster = writeLoadForecaster; } public record RolloverResult(String rolloverIndexName, String sourceIndexName, ClusterState clusterState) { @@ -296,16 +300,16 @@ private RolloverResult rolloverDataStream( RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, metConditions, threadPool.absoluteTimeInMillis()); - newState = ClusterState.builder(newState) - .metadata( - Metadata.builder(newState.metadata()) - .put( - IndexMetadata.builder(newState.metadata().index(originalWriteIndex)) - .indexWriteLoad(sourceIndexWriteLoad) - .putRolloverInfo(rolloverInfo) - ) - ) - .build(); + Metadata.Builder metadataBuilder = Metadata.builder(newState.metadata()) + .put( + IndexMetadata.builder(newState.metadata().index(originalWriteIndex)) + .indexWriteLoad(sourceIndexWriteLoad) + .putRolloverInfo(rolloverInfo) + ); + + metadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(dataStreamName, metadataBuilder); + + newState = ClusterState.builder(newState).metadata(metadataBuilder).build(); return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), newState); } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index d7d432c46239f..cf668ee18179e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction; @@ -122,7 +123,8 @@ public ClusterModule( SnapshotsInfoService snapshotsInfoService, ThreadPool threadPool, SystemIndices systemIndices, - Supplier rerouteServiceSupplier + Supplier rerouteServiceSupplier, + WriteLoadForecaster writeLoadForecaster ) { this.clusterPlugins = clusterPlugins; this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins); @@ -133,7 +135,8 @@ public ClusterModule( threadPool, clusterPlugins, clusterService, - this::reconcile + this::reconcile, + writeLoadForecaster ); this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices); @@ -344,14 +347,15 @@ private static ShardsAllocator createShardsAllocator( ThreadPool threadPool, List clusterPlugins, ClusterService clusterService, - DesiredBalanceReconcilerAction reconciler + DesiredBalanceReconcilerAction reconciler, + WriteLoadForecaster writeLoadForecaster ) { Map> allocators = new HashMap<>(); - allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(settings, clusterSettings)); + allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(settings, clusterSettings, writeLoadForecaster)); allocators.put( DESIRED_BALANCE_ALLOCATOR, () -> new DesiredBalanceShardsAllocator( - new BalancedShardsAllocator(settings, clusterSettings), + new BalancedShardsAllocator(settings, clusterSettings, writeLoadForecaster), threadPool, clusterService, reconciler diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 5a651041e5143..7d3d74e50de2a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -69,6 +69,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.OptionalDouble; import java.util.Set; import java.util.function.Function; @@ -523,6 +524,8 @@ public Iterator> settings() { public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String KEY_WRITE_LOAD = "write_load"; + public static final String KEY_WRITE_LOAD_FORECAST = "write_load_forecast"; + public static final String INDEX_STATE_FILE_PREFIX = "state-"; static final Version SYSTEM_INDEX_FLAG_ADDED = Version.V_7_10_0; @@ -608,6 +611,8 @@ public Iterator> settings() { private final Instant timeSeriesEnd; @Nullable private final IndexWriteLoad writeLoad; + @Nullable + private final Double writeLoadForecast; private IndexMetadata( final Index index, @@ -651,7 +656,8 @@ private IndexMetadata( @Nullable final Instant timeSeriesStart, @Nullable final Instant timeSeriesEnd, final Version indexCompatibilityVersion, - @Nullable final IndexWriteLoad writeLoad + @Nullable final IndexWriteLoad writeLoad, + @Nullable final Double writeLoadForecast ) { this.index = index; this.version = version; @@ -703,6 +709,7 @@ private IndexMetadata( this.timeSeriesStart = timeSeriesStart; this.timeSeriesEnd = timeSeriesEnd; this.writeLoad = writeLoad; + this.writeLoadForecast = writeLoadForecast; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; } @@ -752,7 +759,8 @@ IndexMetadata withMappingMetadata(MappingMetadata mapping) { this.timeSeriesStart, this.timeSeriesEnd, this.indexCompatibilityVersion, - this.writeLoad + this.writeLoad, + this.writeLoadForecast ); } @@ -808,7 +816,8 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set inSyncSet) this.timeSeriesStart, this.timeSeriesEnd, this.indexCompatibilityVersion, - this.writeLoad + this.writeLoad, + this.writeLoadForecast ); } @@ -862,7 +871,8 @@ public IndexMetadata withIncrementedPrimaryTerm(int shardId) { this.timeSeriesStart, this.timeSeriesEnd, this.indexCompatibilityVersion, - this.writeLoad + this.writeLoad, + this.writeLoadForecast ); } @@ -916,7 +926,8 @@ public IndexMetadata withTimestampRange(IndexLongFieldRange timestampRange) { this.timeSeriesStart, this.timeSeriesEnd, this.indexCompatibilityVersion, - this.writeLoad + this.writeLoad, + this.writeLoadForecast ); } @@ -966,7 +977,8 @@ public IndexMetadata withIncrementedVersion() { this.timeSeriesStart, this.timeSeriesEnd, this.indexCompatibilityVersion, - this.writeLoad + this.writeLoad, + this.writeLoadForecast ); } @@ -1162,6 +1174,10 @@ public IndexWriteLoad getWriteLoad() { return writeLoad; } + public OptionalDouble getForecastedWriteLoad() { + return writeLoadForecast == null ? OptionalDouble.empty() : OptionalDouble.of(writeLoadForecast); + } + public static final String INDEX_RESIZE_SOURCE_UUID_KEY = "index.resize.source.uuid"; public static final String INDEX_RESIZE_SOURCE_NAME_KEY = "index.resize.source.name"; public static final Setting INDEX_RESIZE_SOURCE_UUID = Setting.simpleString(INDEX_RESIZE_SOURCE_UUID_KEY); @@ -1397,6 +1413,7 @@ private static class IndexMetadataDiff implements Diff { private final boolean isSystem; private final IndexLongFieldRange timestampRange; private final IndexWriteLoad indexWriteLoad; + private final Double indexWriteLoadForecast; IndexMetadataDiff(IndexMetadata before, IndexMetadata after) { index = after.index.getName(); @@ -1431,6 +1448,7 @@ private static class IndexMetadataDiff implements Diff { isSystem = after.isSystem; timestampRange = after.timestampRange; indexWriteLoad = after.writeLoad; + indexWriteLoadForecast = after.writeLoadForecast; } private static final DiffableUtils.DiffableValueReader ALIAS_METADATA_DIFF_VALUE_READER = @@ -1483,8 +1501,10 @@ private static class IndexMetadataDiff implements Diff { timestampRange = IndexLongFieldRange.readFrom(in); if (in.getVersion().onOrAfter(WRITE_LOAD_ADDED)) { indexWriteLoad = in.readOptionalWriteable(IndexWriteLoad::new); + indexWriteLoadForecast = in.readOptionalDouble(); } else { indexWriteLoad = null; + indexWriteLoadForecast = null; } } @@ -1518,6 +1538,7 @@ public void writeTo(StreamOutput out) throws IOException { timestampRange.writeTo(out); if (out.getVersion().onOrAfter(WRITE_LOAD_ADDED)) { out.writeOptionalWriteable(indexWriteLoad); + out.writeOptionalDouble(indexWriteLoadForecast); } } @@ -1546,6 +1567,7 @@ public IndexMetadata apply(IndexMetadata part) { builder.system(isSystem); builder.timestampRange(timestampRange); builder.indexWriteLoad(indexWriteLoad); + builder.indexWriteLoadForecast(indexWriteLoadForecast); return builder.build(); } } @@ -1610,6 +1632,7 @@ public static IndexMetadata readFrom(StreamInput in, @Nullable Function builder.indexWriteLoadForecast(parser.doubleValue()); default -> throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]"); } } else { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadForecaster.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadForecaster.java new file mode 100644 index 0000000000000..f7749f0db4d04 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadForecaster.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; + +import java.util.OptionalDouble; + +public interface WriteLoadForecaster { + WriteLoadForecaster DEFAULT = new DefaultWriteLoadForecaster(); + + Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata); + + OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata); + + class DefaultWriteLoadForecaster implements WriteLoadForecaster { + @Override + public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) { + return metadata; + } + + @Override + public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) { + return OptionalDouble.empty(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index f86a401ea888e..ba0e5a8276a23 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; @@ -96,17 +97,19 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile WeightFunction weightFunction; private volatile float threshold; + private final WriteLoadForecaster writeLoadForecaster; public BalancedShardsAllocator(Settings settings) { - this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), WriteLoadForecaster.DEFAULT); } @Inject - public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { + public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings, WriteLoadForecaster writeLoadForecaster) { setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings)); setThreshold(THRESHOLD_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); + this.writeLoadForecaster = writeLoadForecaster; } private void setWeightFunction(float indexBalance, float shardBalanceFactor) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexWriteLoad.java b/server/src/main/java/org/elasticsearch/index/shard/IndexWriteLoad.java index e2922e0539685..d65151bc42538 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexWriteLoad.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexWriteLoad.java @@ -184,13 +184,15 @@ private Builder(int numShards) { Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME); } - public void withShardWriteLoad(int shardId, double load, long uptimeInMillis) { + public Builder withShardWriteLoad(int shardId, double load, long uptimeInMillis) { if (shardId >= this.shardWriteLoad.length) { throw new IllegalArgumentException(); } this.shardWriteLoad[shardId] = load; this.uptimeInMillis[shardId] = uptimeInMillis; + + return this; } public IndexWriteLoad build() { diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 0cb75947dfc36..f8cabf6ccaf72 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -63,6 +63,7 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; import org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -566,6 +567,11 @@ protected Node( repositoriesServiceReference::get, rerouteServiceReference::get ); + final WriteLoadForecaster writeLoadForecaster = getWriteLoadForecaster( + threadPool, + settings, + clusterService.getClusterSettings() + ); final ClusterModule clusterModule = new ClusterModule( settings, clusterService, @@ -574,7 +580,8 @@ protected Node( snapshotsInfoService, threadPool, systemIndices, - rerouteServiceReference::get + rerouteServiceReference::get, + writeLoadForecaster ); modules.add(clusterModule); IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); @@ -1084,6 +1091,7 @@ protected Node( b.bind(HealthInfoCache.class).toInstance(nodeHealthOverview); b.bind(Tracer.class).toInstance(tracer); b.bind(FileSettingsService.class).toInstance(fileSettingsService); + b.bind(WriteLoadForecaster.class).toInstance(writeLoadForecaster); }); if (ReadinessService.enabled(environment)) { @@ -1233,6 +1241,23 @@ private RecoveryPlannerService getRecoveryPlannerService( return recoveryPlannerPlugins.get(0).createRecoveryPlannerService(shardSnapshotsService); } + private WriteLoadForecaster getWriteLoadForecaster(ThreadPool threadPool, Settings settings, ClusterSettings clusterSettings) { + final List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); + final List writeLoadForecasters = clusterPlugins.stream() + .flatMap(clusterPlugin -> clusterPlugin.createWriteLoadForecasters(threadPool, settings, clusterSettings).stream()) + .toList(); + + if (writeLoadForecasters.isEmpty()) { + return WriteLoadForecaster.DEFAULT; + } + + if (writeLoadForecasters.size() > 1) { + throw new IllegalStateException("A single WriteLoadForecaster was expected but got: " + writeLoadForecasters); + } + + return writeLoadForecasters.get(0); + } + protected TransportService newTransportService( Settings settings, Transport transport, diff --git a/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java b/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java index 53ed925c48ec3..3ca862d887313 100644 --- a/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java @@ -9,10 +9,12 @@ package org.elasticsearch.plugins; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; import java.util.Collections; @@ -58,6 +60,14 @@ default Map getExistingShardsAllocators() { return Collections.emptyMap(); } + default Collection createWriteLoadForecasters( + ThreadPool threadPool, + Settings settings, + ClusterSettings clusterSettings + ) { + return Collections.emptyList(); + } + /** * Called when the node is started */ diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java index e95eae894fc6b..9500af1d086f6 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService; import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -684,7 +685,8 @@ public void testValidation() throws Exception { null, createIndexService, metadataIndexAliasesService, - EmptySystemIndices.INSTANCE + EmptySystemIndices.INSTANCE, + WriteLoadForecaster.DEFAULT ); String newIndexName = useDataStream == false && randomBoolean() ? "logs-index-9" : null; diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java index 346b58c31cfbf..1a5b058127930 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -353,7 +354,8 @@ public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPr mockThreadPool, mockCreateIndexService, mdIndexAliasesService, - EmptySystemIndices.INSTANCE + EmptySystemIndices.INSTANCE, + WriteLoadForecaster.DEFAULT ); final TransportRolloverAction transportRolloverAction = new TransportRolloverAction( mockTransportService, diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index 3ab20566ab74c..fe4147d5f8cb5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; @@ -155,7 +156,14 @@ public void testRegisterAllocationDeciderDuplicate() { public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonList(new EnableAllocationDecider(settings, clusterSettings)); } - }), clusterInfoService, null, threadPool, EmptySystemIndices.INSTANCE, ClusterModuleTests::getFakeRerouteService) + }), + clusterInfoService, + null, + threadPool, + EmptySystemIndices.INSTANCE, + ClusterModuleTests::getFakeRerouteService, + WriteLoadForecaster.DEFAULT + ) ); assertEquals(e.getMessage(), "Cannot specify allocation decider [" + EnableAllocationDecider.class.getName() + "] twice"); } @@ -166,7 +174,14 @@ public void testRegisterAllocationDecider() { public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonList(new FakeAllocationDecider()); } - }), clusterInfoService, null, threadPool, EmptySystemIndices.INSTANCE, ClusterModuleTests::getFakeRerouteService); + }), + clusterInfoService, + null, + threadPool, + EmptySystemIndices.INSTANCE, + ClusterModuleTests::getFakeRerouteService, + WriteLoadForecaster.DEFAULT + ); assertTrue(module.deciderList.stream().anyMatch(d -> d.getClass().equals(FakeAllocationDecider.class))); } @@ -176,7 +191,14 @@ private ClusterModule newClusterModuleWithShardsAllocator(Settings settings, Str public Map> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) { return Collections.singletonMap(name, supplier); } - }), clusterInfoService, null, threadPool, EmptySystemIndices.INSTANCE, ClusterModuleTests::getFakeRerouteService); + }), + clusterInfoService, + null, + threadPool, + EmptySystemIndices.INSTANCE, + ClusterModuleTests::getFakeRerouteService, + WriteLoadForecaster.DEFAULT + ); } public void testRegisterShardsAllocator() { @@ -205,7 +227,8 @@ public void testUnknownShardsAllocator() { null, threadPool, EmptySystemIndices.INSTANCE, - ClusterModuleTests::getFakeRerouteService + ClusterModuleTests::getFakeRerouteService, + WriteLoadForecaster.DEFAULT ) ); assertEquals("Unknown ShardsAllocator [dne]", e.getMessage()); @@ -263,7 +286,8 @@ public void testRejectsReservedExistingShardsAllocatorName() { null, threadPool, EmptySystemIndices.INSTANCE, - ClusterModuleTests::getFakeRerouteService + ClusterModuleTests::getFakeRerouteService, + WriteLoadForecaster.DEFAULT ); expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator())); } @@ -277,7 +301,8 @@ public void testRejectsDuplicateExistingShardsAllocatorName() { null, threadPool, EmptySystemIndices.INSTANCE, - ClusterModuleTests::getFakeRerouteService + ClusterModuleTests::getFakeRerouteService, + WriteLoadForecaster.DEFAULT ); expectThrows(IllegalArgumentException.class, () -> clusterModule.setExistingShardsAllocators(new TestGatewayAllocator())); } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index f76b223884703..15d3f39ec5b5c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -286,7 +286,8 @@ public void testToXContent() throws IOException { "write_load": { "loads": [-1.0], "uptimes": [-1] - } + }, + "write_load_forecast" : 8.0 } }, "index-graveyard": { @@ -506,7 +507,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti "uptimes" : [ -1 ] - } + }, + "write_load_forecast" : 8.0 } }, "index-graveyard" : { @@ -733,7 +735,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti "uptimes" : [ -1 ] - } + }, + "write_load_forecast" : 8.0 } }, "index-graveyard" : { @@ -923,6 +926,7 @@ private ClusterState buildClusterState() throws IOException { .numberOfReplicas(2) .putRolloverInfo(new RolloverInfo("rolloveAlias", new ArrayList<>(), 1L)) .indexWriteLoad(IndexWriteLoad.builder(1).build()) + .indexWriteLoadForecast(8.0) .build(); return ClusterState.builder(ClusterName.DEFAULT) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java index 229133e6ba4d7..8af1d75cd5393 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.IndexWriteLoad; import org.elasticsearch.index.shard.ShardId; @@ -66,6 +67,7 @@ protected NamedXContentRegistry xContentRegistry() { return new NamedXContentRegistry(IndicesModule.getNamedXContents()); } + @SuppressForbidden(reason = "Use IndexMetadata#getForecastedWriteLoad to ensure that the serialized value is correct") public void testIndexMetadataSerialization() throws IOException { Integer numShard = randomFrom(1, 2, 4, 8, 16); int numberOfReplicas = randomIntBetween(0, 10); @@ -74,6 +76,7 @@ public void testIndexMetadataSerialization() throws IOException { customMap.put(randomAlphaOfLength(5), randomAlphaOfLength(10)); customMap.put(randomAlphaOfLength(10), randomAlphaOfLength(15)); IndexWriteLoad indexWriteLoad = randomBoolean() ? randomWriteLoad(numShard) : null; + Double indexWriteLoadForecast = randomBoolean() ? randomDoubleBetween(0.0, 128, true) : null; IndexMetadata metadata = IndexMetadata.builder("foo") .settings( Settings.builder() @@ -101,6 +104,7 @@ public void testIndexMetadataSerialization() throws IOException { ) ) .indexWriteLoad(indexWriteLoad) + .indexWriteLoadForecast(indexWriteLoadForecast) .build(); assertEquals(system, metadata.isSystem()); @@ -130,6 +134,7 @@ public void testIndexMetadataSerialization() throws IOException { assertEquals(metadata.getCustomData(), expectedCustom); assertEquals(metadata.getCustomData(), fromXContentMeta.getCustomData()); assertEquals(metadata.getWriteLoad(), fromXContentMeta.getWriteLoad()); + assertEquals(metadata.getForecastedWriteLoad(), fromXContentMeta.getForecastedWriteLoad()); final BytesStreamOutput out = new BytesStreamOutput(); metadata.writeTo(out); @@ -151,6 +156,7 @@ public void testIndexMetadataSerialization() throws IOException { assertEquals(metadata.getCustomData(), deserialized.getCustomData()); assertEquals(metadata.isSystem(), deserialized.isSystem()); assertEquals(metadata.getWriteLoad(), deserialized.getWriteLoad()); + assertEquals(metadata.getForecastedWriteLoad(), fromXContentMeta.getForecastedWriteLoad()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index eeed2324dec75..3eb5c353585a1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -289,7 +289,7 @@ public void testPersistedSettings() { settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), 0.3); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), 2.0); ClusterSettings service = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - BalancedShardsAllocator allocator = new BalancedShardsAllocator(settings.build(), service); + BalancedShardsAllocator allocator = new BalancedShardsAllocator(settings.build(), service, WriteLoadForecaster.DEFAULT); assertThat(allocator.getIndexBalance(), Matchers.equalTo(0.2f)); assertThat(allocator.getShardBalance(), Matchers.equalTo(0.3f)); assertThat(allocator.getThreshold(), Matchers.equalTo(2.0f)); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index ea8160a3a1601..3d58830ddf491 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -489,7 +490,13 @@ public static MetadataRolloverService getMetadataRolloverService( new IndexSettingProviders(providers) ); MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService, null, registry); - return new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService, EmptySystemIndices.INSTANCE); + return new MetadataRolloverService( + testThreadPool, + createIndexService, + indexAliasesService, + EmptySystemIndices.INSTANCE, + WriteLoadForecaster.DEFAULT + ); } public static MetadataFieldMapper getDataStreamTimestampFieldMapper() { diff --git a/x-pack/plugin/write-load-forecaster/build.gradle b/x-pack/plugin/write-load-forecaster/build.gradle new file mode 100644 index 0000000000000..7fb79f06ff35a --- /dev/null +++ b/x-pack/plugin/write-load-forecaster/build.gradle @@ -0,0 +1,16 @@ +apply plugin: 'elasticsearch.internal-es-plugin' +apply plugin: 'elasticsearch.internal-cluster-test' + +esplugin { + name 'x-pack-write-load-forecaster' + description 'x' + classname 'org.elasticsearch.xpack.writeloadforecaster.WriteLoadForecasterPlugin' + extendedPlugins = ['x-pack-core'] +} +archivesBaseName = 'x-pack-write-load-forecaster' + +dependencies { + compileOnly project(path: xpackModule('core')) + testImplementation(testArtifact(project(xpackModule('core')))) + testImplementation project(':modules:data-streams') +} diff --git a/x-pack/plugin/write-load-forecaster/src/internalClusterTest/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterIT.java b/x-pack/plugin/write-load-forecaster/src/internalClusterTest/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterIT.java new file mode 100644 index 0000000000000..bef663671065a --- /dev/null +++ b/x-pack/plugin/write-load-forecaster/src/internalClusterTest/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterIT.java @@ -0,0 +1,223 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.writeloadforecaster; + +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.datastreams.CreateDataStreamAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.shard.IndexingStats; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xcontent.XContentType; +import org.junit.Before; + +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.OptionalDouble; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; + +@SuppressForbidden(reason = "Uses IndexMetadata#getForecastedWriteLoad to validate the computation") +public class WriteLoadForecasterIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(DataStreamsPlugin.class, FakeLicenseWriteLoadForecasterPlugin.class); + } + + @Before + public void ensureValidLicense() { + setHasValidLicense(true); + } + + public void testWriteLoadForecastGetsPopulatedDuringRollovers() throws Exception { + final String dataStreamName = "logs-es"; + setUpDataStreamWriteDocsAndRollover(dataStreamName); + + final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName); + final IndexMetadata writeIndexMetadata = clusterState.metadata().getIndexSafe(dataStream.getWriteIndex()); + + final OptionalDouble indexMetadataForecastedWriteLoad = writeIndexMetadata.getForecastedWriteLoad(); + assertThat(indexMetadataForecastedWriteLoad.isPresent(), is(equalTo(true))); + assertThat(indexMetadataForecastedWriteLoad.getAsDouble(), is(greaterThan(0.0))); + + final WriteLoadForecaster writeLoadForecaster = internalCluster().getCurrentMasterNodeInstance(WriteLoadForecaster.class); + final OptionalDouble forecastedWriteLoad = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata); + + assertThat(forecastedWriteLoad.isPresent(), is(equalTo(true))); + assertThat(forecastedWriteLoad.getAsDouble(), is(equalTo(indexMetadataForecastedWriteLoad.getAsDouble()))); + + setHasValidLicense(false); + + final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata); + assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false))); + } + + public void testWriteLoadForecastDoesNotGetPopulatedWithInvalidLicense() throws Exception { + setHasValidLicense(false); + + final String dataStreamName = "logs-es"; + setUpDataStreamWriteDocsAndRollover(dataStreamName); + + final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName); + final IndexMetadata writeIndexMetadata = clusterState.metadata().getIndexSafe(dataStream.getWriteIndex()); + + assertThat(writeIndexMetadata.getForecastedWriteLoad().isPresent(), is(equalTo(false))); + } + + public void testWriteLoadForecastIsOverriddenBySetting() throws Exception { + final double writeLoadForecastOverride = randomDoubleBetween(64, 128, true); + final String dataStreamName = "logs-es"; + setUpDataStreamWriteDocsAndRollover( + dataStreamName, + Settings.builder() + .put(WriteLoadForecasterPlugin.OVERRIDE_WRITE_LOAD_FORECAST_SETTING.getKey(), writeLoadForecastOverride) + .build() + ); + + final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName); + final IndexMetadata writeIndexMetadata = clusterState.metadata().getIndexSafe(dataStream.getWriteIndex()); + + final OptionalDouble indexMetadataForecastedWriteLoad = writeIndexMetadata.getForecastedWriteLoad(); + assertThat(indexMetadataForecastedWriteLoad.isPresent(), is(equalTo(true))); + assertThat(indexMetadataForecastedWriteLoad.getAsDouble(), is(greaterThan(0.0))); + + final WriteLoadForecaster writeLoadForecaster = internalCluster().getCurrentMasterNodeInstance(WriteLoadForecaster.class); + final OptionalDouble forecastedWriteLoad = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata); + + assertThat(forecastedWriteLoad.isPresent(), is(equalTo(true))); + assertThat(forecastedWriteLoad.getAsDouble(), is(equalTo(writeLoadForecastOverride))); + assertThat(forecastedWriteLoad.getAsDouble(), is(not(equalTo(indexMetadataForecastedWriteLoad.getAsDouble())))); + + setHasValidLicense(false); + + final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata); + assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(equalTo(false))); + } + + private void setUpDataStreamWriteDocsAndRollover(String dataStreamName) throws Exception { + setUpDataStreamWriteDocsAndRollover(dataStreamName, Settings.EMPTY); + } + + private void setUpDataStreamWriteDocsAndRollover(String dataStreamName, Settings extraIndexTemplateSettings) throws Exception { + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(0, 1); + final Settings indexSettings = Settings.builder() + .put(extraIndexTemplateSettings) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .build(); + + assertAcked( + client().execute( + PutComposableIndexTemplateAction.INSTANCE, + new PutComposableIndexTemplateAction.Request("my-template").indexTemplate( + new ComposableIndexTemplate( + List.of("logs-*"), + new Template(indexSettings, null, null), + null, + null, + null, + null, + new ComposableIndexTemplate.DataStreamTemplate(), + null + ) + ) + ).actionGet() + ); + assertAcked(client().execute(CreateDataStreamAction.INSTANCE, new CreateDataStreamAction.Request(dataStreamName)).actionGet()); + + final int numberOfRollovers = randomIntBetween(5, 10); + for (int i = 0; i < numberOfRollovers; i++) { + + assertBusy(() -> { + for (int j = 0; j < 10; j++) { + indexDocs(dataStreamName, randomIntBetween(100, 200)); + } + + final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state(); + final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName); + final String writeIndex = dataStream.getWriteIndex().getName(); + final IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(writeIndex).get(); + for (IndexShardStats indexShardStats : indicesStatsResponse.getIndex(writeIndex).getIndexShards().values()) { + for (ShardStats shard : indexShardStats.getShards()) { + final IndexingStats.Stats shardIndexingStats = shard.getStats().getIndexing().getTotal(); + // Ensure that we have enough clock granularity before rolling over to ensure that we capture _some_ write load + assertThat(shardIndexingStats.getTotalActiveTimeInMillis(), is(greaterThan(0L))); + assertThat(shardIndexingStats.getWriteLoad(), is(greaterThan(0.0))); + } + } + }); + + assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet()); + } + } + + static void indexDocs(String dataStream, int numDocs) { + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numDocs; i++) { + String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); + bulkRequest.add( + new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE) + .source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, value), XContentType.JSON) + ); + } + client().bulk(bulkRequest).actionGet(); + } + + private void setHasValidLicense(boolean hasValidLicense) { + for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) { + for (var writeLoadForecasterPlugin : pluginsService.filterPlugins(FakeLicenseWriteLoadForecasterPlugin.class)) { + writeLoadForecasterPlugin.setHasValidLicense(hasValidLicense); + } + } + } + + public static class FakeLicenseWriteLoadForecasterPlugin extends WriteLoadForecasterPlugin { + private final AtomicBoolean hasValidLicense = new AtomicBoolean(true); + + public FakeLicenseWriteLoadForecasterPlugin() {} + + void setHasValidLicense(boolean validLicense) { + hasValidLicense.set(validLicense); + } + + @Override + protected boolean hasValidLicense() { + return hasValidLicense.get(); + } + } +} diff --git a/x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecaster.java b/x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecaster.java new file mode 100644 index 0000000000000..7746a2329ff5a --- /dev/null +++ b/x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecaster.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.writeloadforecaster; + +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.IndexWriteLoad; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Objects; +import java.util.OptionalDouble; +import java.util.OptionalLong; +import java.util.function.BooleanSupplier; + +class LicensedWriteLoadForecaster implements WriteLoadForecaster { + public static final Setting MAX_INDEX_AGE_SETTING = Setting.timeSetting( + "write_load_forecaster.max_index_age", + TimeValue.timeValueDays(7), + TimeValue.timeValueHours(1), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private final BooleanSupplier hasValidLicense; + private final ThreadPool threadPool; + private volatile TimeValue maxIndexAge; + + LicensedWriteLoadForecaster( + BooleanSupplier hasValidLicense, + ThreadPool threadPool, + Settings settings, + ClusterSettings clusterSettings + ) { + this(hasValidLicense, threadPool, MAX_INDEX_AGE_SETTING.get(settings)); + clusterSettings.addSettingsUpdateConsumer(MAX_INDEX_AGE_SETTING, this::setMaxIndexAgeSetting); + } + + // exposed for tests only + LicensedWriteLoadForecaster(BooleanSupplier hasValidLicense, ThreadPool threadPool, TimeValue maxIndexAge) { + this.hasValidLicense = hasValidLicense; + this.threadPool = threadPool; + this.maxIndexAge = maxIndexAge; + } + + private void setMaxIndexAgeSetting(TimeValue updatedMaxIndexAge) { + this.maxIndexAge = updatedMaxIndexAge; + } + + @Override + public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) { + if (hasValidLicense.getAsBoolean() == false) { + return metadata; + } + + final DataStream dataStream = metadata.dataStream(dataStreamName); + + if (dataStream == null) { + return metadata; + } + + final List indicesWriteLoadWithinMaxAgeRange = getIndicesWithinMaxAgeRange(dataStream, metadata).stream() + .filter(index -> index.equals(dataStream.getWriteIndex()) == false) + .map(metadata::getSafe) + .map(IndexMetadata::getWriteLoad) + .filter(Objects::nonNull) + .toList(); + + OptionalDouble forecastIndexWriteLoad = forecastIndexWriteLoad(indicesWriteLoadWithinMaxAgeRange); + + if (forecastIndexWriteLoad.isEmpty()) { + return metadata; + } + + final IndexMetadata writeIndex = metadata.getSafe(dataStream.getWriteIndex()); + metadata.put(IndexMetadata.builder(writeIndex).indexWriteLoadForecast(forecastIndexWriteLoad.getAsDouble()).build(), false); + + return metadata; + } + + // Visible for testing + static OptionalDouble forecastIndexWriteLoad(List indicesWriteLoadWithinMaxAgeRange) { + double totalWeightedWriteLoad = 0; + long totalShardUptime = 0; + for (IndexWriteLoad writeLoad : indicesWriteLoadWithinMaxAgeRange) { + for (int shardId = 0; shardId < writeLoad.numberOfShards(); shardId++) { + final OptionalDouble writeLoadForShard = writeLoad.getWriteLoadForShard(shardId); + final OptionalLong uptimeInMillisForShard = writeLoad.getUptimeInMillisForShard(shardId); + if (writeLoadForShard.isPresent()) { + assert uptimeInMillisForShard.isPresent(); + double shardWriteLoad = writeLoadForShard.getAsDouble(); + long shardUptimeInMillis = uptimeInMillisForShard.getAsLong(); + totalWeightedWriteLoad += shardWriteLoad * shardUptimeInMillis; + totalShardUptime += shardUptimeInMillis; + } + } + } + + return totalShardUptime == 0 ? OptionalDouble.empty() : OptionalDouble.of(totalWeightedWriteLoad / totalShardUptime); + } + + // Visible for testing + List getIndicesWithinMaxAgeRange(DataStream dataStream, Metadata.Builder metadata) { + final List dataStreamIndices = dataStream.getIndices(); + final long currentTimeMillis = threadPool.absoluteTimeInMillis(); + // Consider at least 1 index (including the write index) for cases where rollovers happen less often than maxIndexAge + int firstIndexWithinAgeRange = Math.max(dataStreamIndices.size() - 2, 0); + for (int i = 0; i < dataStreamIndices.size(); i++) { + Index index = dataStreamIndices.get(i); + final IndexMetadata indexMetadata = metadata.getSafe(index); + final long indexAge = currentTimeMillis - indexMetadata.getCreationDate(); + if (indexAge < maxIndexAge.getMillis()) { + // We need to consider the previous index too in order to cover the entire max-index-age range. + firstIndexWithinAgeRange = i == 0 ? 0 : i - 1; + break; + } + } + return dataStreamIndices.subList(firstIndexWithinAgeRange, dataStreamIndices.size()); + } + + @Override + @SuppressForbidden(reason = "This is the only place where IndexMetadata#getForecastedWriteLoad is allowed to be used") + public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) { + if (hasValidLicense.getAsBoolean() == false) { + return OptionalDouble.empty(); + } + + if (WriteLoadForecasterPlugin.OVERRIDE_WRITE_LOAD_FORECAST_SETTING.exists(indexMetadata.getSettings())) { + Double overrideWriteLoadForecast = WriteLoadForecasterPlugin.OVERRIDE_WRITE_LOAD_FORECAST_SETTING.get( + indexMetadata.getSettings() + ); + return OptionalDouble.of(overrideWriteLoadForecast); + } + + return indexMetadata.getForecastedWriteLoad(); + } +} diff --git a/x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterPlugin.java b/x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterPlugin.java new file mode 100644 index 0000000000000..2272c1258ee3b --- /dev/null +++ b/x-pack/plugin/write-load-forecaster/src/main/java/org/elasticsearch/xpack/writeloadforecaster/WriteLoadForecasterPlugin.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.writeloadforecaster; + +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.License; +import org.elasticsearch.license.LicensedFeature; +import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.XPackPlugin; + +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.MAX_INDEX_AGE_SETTING; + +public class WriteLoadForecasterPlugin extends Plugin implements ClusterPlugin { + public static final LicensedFeature.Momentary WRITE_LOAD_FORECAST_FEATURE = LicensedFeature.momentary( + null, + "write-load-forecast", + License.OperationMode.ENTERPRISE + ); + + public static final Setting OVERRIDE_WRITE_LOAD_FORECAST_SETTING = Setting.doubleSetting( + "index.override_write_load_forecast", + 0.0, + 0.0, + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + public WriteLoadForecasterPlugin() {} + + protected boolean hasValidLicense() { + return WRITE_LOAD_FORECAST_FEATURE.check(XPackPlugin.getSharedLicenseState()); + } + + @Override + public List> getSettings() { + return List.of(MAX_INDEX_AGE_SETTING, OVERRIDE_WRITE_LOAD_FORECAST_SETTING); + } + + @Override + public Collection createWriteLoadForecasters( + ThreadPool threadPool, + Settings settings, + ClusterSettings clusterSettings + ) { + return List.of(new LicensedWriteLoadForecaster(this::hasValidLicense, threadPool, settings, clusterSettings)); + } +} diff --git a/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java b/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java new file mode 100644 index 0000000000000..29f2b44e814b2 --- /dev/null +++ b/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java @@ -0,0 +1,394 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.writeloadforecaster; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.shard.IndexWriteLoad; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.OptionalDouble; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.forecastIndexWriteLoad; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class LicensedWriteLoadForecasterTests extends ESTestCase { + ThreadPool threadPool; + + @Before + public void setUpThreadPool() { + threadPool = new TestThreadPool(getTestName()); + } + + @After + public void tearDownThreadPool() { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + public void testWriteLoadForecastIsAddedToWriteIndex() { + final TimeValue maxIndexAge = TimeValue.timeValueDays(7); + final AtomicBoolean hasValidLicense = new AtomicBoolean(true); + final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge); + + final Metadata.Builder metadataBuilder = Metadata.builder(); + final String dataStreamName = "logs-es"; + final int numberOfBackingIndices = 10; + final int numberOfShards = randomIntBetween(1, 5); + final List backingIndices = new ArrayList<>(); + for (int i = 0; i < numberOfBackingIndices; i++) { + final IndexMetadata indexMetadata = createIndexMetadata( + DataStream.getDefaultBackingIndexName(dataStreamName, i), + numberOfShards, + randomIndexWriteLoad(numberOfShards), + System.currentTimeMillis() - (maxIndexAge.millis() / 2) + ); + backingIndices.add(indexMetadata.getIndex()); + metadataBuilder.put(indexMetadata, false); + } + + final IndexMetadata writeIndexMetadata = createIndexMetadata( + DataStream.getDefaultBackingIndexName(dataStreamName, numberOfBackingIndices), + numberOfShards, + null, + System.currentTimeMillis() + ); + backingIndices.add(writeIndexMetadata.getIndex()); + metadataBuilder.put(writeIndexMetadata, false); + + final DataStream dataStream = createDataStream(dataStreamName, backingIndices); + metadataBuilder.put(dataStream); + + final Metadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex( + dataStream.getName(), + metadataBuilder + ); + + final IndexMetadata writeIndex = updatedMetadataBuilder.getSafe(dataStream.getWriteIndex()); + + final OptionalDouble forecastedWriteLoad = writeLoadForecaster.getForecastedWriteLoad(writeIndex); + + assertThat(forecastedWriteLoad.isPresent(), is(true)); + assertThat(forecastedWriteLoad.getAsDouble(), is(greaterThan(0.0))); + + hasValidLicense.set(false); + + final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex); + assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false)); + } + + public void testUptimeIsUsedToWeightWriteLoad() { + final TimeValue maxIndexAge = TimeValue.timeValueDays(7); + final var metadataBuilder = Metadata.builder(); + final String dataStreamName = "logs-es"; + final int numberOfShards = 5; + final List backingIndices = new ArrayList<>(); + // Weighted avg 14.4 + final IndexMetadata indexMetadata = createIndexMetadata( + DataStream.getDefaultBackingIndexName(dataStreamName, 0), + numberOfShards, + IndexWriteLoad.builder(numberOfShards) + .withShardWriteLoad(0, 12, 80) + .withShardWriteLoad(1, 24, 5) + .withShardWriteLoad(2, 24, 5) + .withShardWriteLoad(3, 24, 5) + .withShardWriteLoad(4, 24, 5) + .build(), + System.currentTimeMillis() - (maxIndexAge.millis() / 2) + ); + backingIndices.add(indexMetadata.getIndex()); + metadataBuilder.put(indexMetadata, false); + + final IndexMetadata writeIndexMetadata = createIndexMetadata( + DataStream.getDefaultBackingIndexName(dataStreamName, 1), + numberOfShards, + null, + System.currentTimeMillis() + ); + backingIndices.add(writeIndexMetadata.getIndex()); + metadataBuilder.put(writeIndexMetadata, false); + + final DataStream dataStream = createDataStream(dataStreamName, backingIndices); + metadataBuilder.put(dataStream); + + final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> true, threadPool, maxIndexAge); + + final Metadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex( + dataStream.getName(), + metadataBuilder + ); + + final IndexMetadata writeIndex = updatedMetadataBuilder.getSafe(dataStream.getWriteIndex()); + + final OptionalDouble forecastedWriteLoad = writeLoadForecaster.getForecastedWriteLoad(writeIndex); + + assertThat(forecastedWriteLoad.isPresent(), is(true)); + assertThat(forecastedWriteLoad.getAsDouble(), is(equalTo(14.4))); + } + + public void testForecastedWriteLoadIsOverriddenBySetting() { + final TimeValue maxIndexAge = TimeValue.timeValueDays(7); + final AtomicBoolean hasValidLicense = new AtomicBoolean(true); + final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(hasValidLicense::get, threadPool, maxIndexAge); + + final Metadata.Builder metadataBuilder = Metadata.builder(); + final String dataStreamName = "logs-es"; + final int numberOfBackingIndices = 10; + final int numberOfShards = randomIntBetween(1, 5); + final List backingIndices = new ArrayList<>(); + for (int i = 0; i < numberOfBackingIndices; i++) { + final IndexMetadata indexMetadata = createIndexMetadata( + DataStream.getDefaultBackingIndexName(dataStreamName, i), + numberOfShards, + randomIndexWriteLoad(numberOfShards), + System.currentTimeMillis() - (maxIndexAge.millis() / 2) + ); + backingIndices.add(indexMetadata.getIndex()); + metadataBuilder.put(indexMetadata, false); + } + + final IndexMetadata writeIndexMetadata = createIndexMetadata( + DataStream.getDefaultBackingIndexName(dataStreamName, numberOfBackingIndices), + numberOfShards, + null, + System.currentTimeMillis(), + Settings.builder().put(WriteLoadForecasterPlugin.OVERRIDE_WRITE_LOAD_FORECAST_SETTING.getKey(), 0.6).build() + ); + backingIndices.add(writeIndexMetadata.getIndex()); + metadataBuilder.put(writeIndexMetadata, false); + + final DataStream dataStream = createDataStream(dataStreamName, backingIndices); + metadataBuilder.put(dataStream); + + final Metadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex( + dataStream.getName(), + metadataBuilder + ); + + final IndexMetadata writeIndex = updatedMetadataBuilder.getSafe(dataStream.getWriteIndex()); + + final OptionalDouble forecastedWriteLoad = writeLoadForecaster.getForecastedWriteLoad(writeIndex); + + assertThat(forecastedWriteLoad.isPresent(), is(true)); + assertThat(forecastedWriteLoad.getAsDouble(), is(equalTo(0.6))); + + hasValidLicense.set(false); + + final OptionalDouble forecastedWriteLoadAfterLicenseChange = writeLoadForecaster.getForecastedWriteLoad(writeIndex); + assertThat(forecastedWriteLoadAfterLicenseChange.isPresent(), is(false)); + } + + public void testWriteLoadForecast() { + { + OptionalDouble writeLoadForecast = forecastIndexWriteLoad(List.of()); + assertThat(writeLoadForecast.isEmpty(), is(true)); + } + + { + OptionalDouble writeLoadForecast = forecastIndexWriteLoad(List.of(IndexWriteLoad.builder(5).build())); + assertThat(writeLoadForecast.isEmpty(), is(true)); + } + + { + OptionalDouble writeLoadForecast = forecastIndexWriteLoad( + List.of(IndexWriteLoad.builder(1).withShardWriteLoad(0, 12, 100).build()) + ); + assertThat(writeLoadForecast.isPresent(), is(true)); + assertThat(writeLoadForecast.getAsDouble(), is(equalTo(12.0))); + } + + { + OptionalDouble writeLoadForecast = forecastIndexWriteLoad( + List.of( + IndexWriteLoad.builder(5) + .withShardWriteLoad(0, 12, 80) + .withShardWriteLoad(1, 24, 5) + .withShardWriteLoad(2, 24, 5) + .withShardWriteLoad(3, 24, 5) + .withShardWriteLoad(4, 24, 5) + .build() + ) + ); + assertThat(writeLoadForecast.isPresent(), is(true)); + assertThat(writeLoadForecast.getAsDouble(), is(equalTo(14.4))); + } + + { + OptionalDouble writeLoadForecast = forecastIndexWriteLoad( + List.of( + IndexWriteLoad.builder(5) + .withShardWriteLoad(0, 12, 80) + .withShardWriteLoad(1, 24, 5) + .withShardWriteLoad(2, 24, 5) + .withShardWriteLoad(3, 24, 5) + .withShardWriteLoad(4, 24, 4) + .build(), + // Since this shard uptime is really low, it doesn't add much to the avg + IndexWriteLoad.builder(1).withShardWriteLoad(0, 120, 1).build() + ) + ); + assertThat(writeLoadForecast.isPresent(), is(true)); + assertThat(writeLoadForecast.getAsDouble(), is(equalTo(15.36))); + } + + { + OptionalDouble writeLoadForecast = forecastIndexWriteLoad( + List.of( + IndexWriteLoad.builder(2).withShardWriteLoad(0, 12, 25).withShardWriteLoad(1, 12, 25).build(), + + IndexWriteLoad.builder(1).withShardWriteLoad(0, 12, 50).build() + ) + ); + assertThat(writeLoadForecast.isPresent(), is(true)); + assertThat(writeLoadForecast.getAsDouble(), is(equalTo(12.0))); + } + + { + // All indices have the same uptime, therefore it's just a regular avg + OptionalDouble writeLoadForecast = forecastIndexWriteLoad( + List.of( + IndexWriteLoad.builder(3) + .withShardWriteLoad(0, 25, 1) + .withShardWriteLoad(1, 18, 1) + .withShardWriteLoad(2, 23, 1) + .build(), + + IndexWriteLoad.builder(2).withShardWriteLoad(0, 6, 1).withShardWriteLoad(1, 8, 1).build(), + + IndexWriteLoad.builder(1).withShardWriteLoad(0, 15, 1).build() + ) + ); + assertThat(writeLoadForecast.isPresent(), is(true)); + assertThat(writeLoadForecast.getAsDouble(), is(closeTo(15.83, 0.01))); + } + } + + public void testGetIndicesWithinMaxAgeRange() { + final TimeValue maxIndexAge = TimeValue.timeValueDays(7); + final LicensedWriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> true, threadPool, maxIndexAge); + + final Metadata.Builder metadataBuilder = Metadata.builder(); + final int numberOfBackingIndicesOlderThanMinAge = randomIntBetween(0, 10); + final int numberOfBackingIndicesWithinMinAnge = randomIntBetween(0, 10); + final int numberOfShards = 1; + final List backingIndices = new ArrayList<>(); + final String dataStreamName = "logs-es"; + final List backingIndicesOlderThanMinAge = new ArrayList<>(); + for (int i = 0; i < numberOfBackingIndicesOlderThanMinAge; i++) { + long creationDate = System.currentTimeMillis() - maxIndexAge.millis() * 2; + final IndexMetadata indexMetadata = createIndexMetadata( + DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), creationDate), + numberOfShards, + randomIndexWriteLoad(numberOfShards), + creationDate + ); + backingIndices.add(indexMetadata.getIndex()); + backingIndicesOlderThanMinAge.add(indexMetadata.getIndex()); + metadataBuilder.put(indexMetadata, false); + } + + final List backingIndicesWithinMinAge = new ArrayList<>(); + for (int i = 0; i < numberOfBackingIndicesWithinMinAnge; i++) { + final long createdAt = System.currentTimeMillis() - (maxIndexAge.getMillis() / 2); + final IndexMetadata indexMetadata = createIndexMetadata( + DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), createdAt), + numberOfShards, + randomIndexWriteLoad(numberOfShards), + createdAt + ); + backingIndices.add(indexMetadata.getIndex()); + backingIndicesWithinMinAge.add(indexMetadata.getIndex()); + metadataBuilder.put(indexMetadata, false); + } + + final String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size()); + final IndexMetadata writeIndexMetadata = createIndexMetadata(writeIndexName, numberOfShards, null, System.currentTimeMillis()); + backingIndices.add(writeIndexMetadata.getIndex()); + metadataBuilder.put(writeIndexMetadata, false); + + final DataStream dataStream = createDataStream(dataStreamName, backingIndices); + + metadataBuilder.put(dataStream); + + final List indicesWithinMaxAgeRange = writeLoadForecaster.getIndicesWithinMaxAgeRange(dataStream, metadataBuilder); + + final List expectedIndicesWithinMaxAgeRange = new ArrayList<>(); + if (numberOfBackingIndicesOlderThanMinAge > 0) { + expectedIndicesWithinMaxAgeRange.add(backingIndicesOlderThanMinAge.get(backingIndicesOlderThanMinAge.size() - 1)); + } + expectedIndicesWithinMaxAgeRange.addAll(backingIndicesWithinMinAge); + expectedIndicesWithinMaxAgeRange.add(writeIndexMetadata.getIndex()); + + assertThat(indicesWithinMaxAgeRange, is(equalTo(expectedIndicesWithinMaxAgeRange))); + } + + private IndexWriteLoad randomIndexWriteLoad(int numberOfShards) { + IndexWriteLoad.Builder builder = IndexWriteLoad.builder(numberOfShards); + for (int shardId = 0; shardId < numberOfShards; shardId++) { + builder.withShardWriteLoad(shardId, randomDoubleBetween(0, 64, true), randomLongBetween(1, 10)); + } + return builder.build(); + } + + private IndexMetadata createIndexMetadata(String indexName, int numberOfShards, IndexWriteLoad indexWriteLoad, long createdAt) { + return createIndexMetadata(indexName, numberOfShards, indexWriteLoad, createdAt, Settings.EMPTY); + } + + private IndexMetadata createIndexMetadata( + String indexName, + int numberOfShards, + IndexWriteLoad indexWriteLoad, + long createdAt, + Settings extraSettings + ) { + return IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put(extraSettings) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .build() + ) + .indexWriteLoad(indexWriteLoad) + .creationDate(createdAt) + .build(); + } + + private DataStream createDataStream(String name, List backingIndices) { + return new DataStream( + name, + backingIndices, + backingIndices.size(), + Collections.emptyMap(), + false, + false, + false, + false, + IndexMode.STANDARD + ); + } +}