Skip to content

Commit

Permalink
Forecast write load during rollovers (#91425)
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez authored Nov 14, 2022
1 parent dceff3f commit 089ee1d
Show file tree
Hide file tree
Showing 23 changed files with 1,058 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,6 @@ org.apache.logging.log4j.message.ParameterizedMessage#<init>(java.lang.String, j
org.apache.logging.log4j.message.ParameterizedMessage#<init>(java.lang.String, java.lang.Object[])
org.apache.logging.log4j.message.ParameterizedMessage#<init>(java.lang.String, java.lang.Object)
org.apache.logging.log4j.message.ParameterizedMessage#<init>(java.lang.String, java.lang.Object, java.lang.Object)

@defaultMessage Use WriteLoadForecaster#getForecastedWriteLoad instead
org.elasticsearch.cluster.metadata.IndexMetadata#getForecastedWriteLoad()
5 changes: 5 additions & 0 deletions docs/changelog/91425.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 91425
summary: Forecast write load during rollovers
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -274,7 +275,8 @@ public void setup() throws Exception {
testThreadPool,
createIndexService,
indexAliasesService,
EmptySystemIndices.INSTANCE
EmptySystemIndices.INSTANCE,
WriteLoadForecaster.DEFAULT
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -64,18 +65,21 @@ public class MetadataRolloverService {
private final MetadataCreateIndexService createIndexService;
private final MetadataIndexAliasesService indexAliasesService;
private final SystemIndices systemIndices;
private final WriteLoadForecaster writeLoadForecaster;

@Inject
public MetadataRolloverService(
ThreadPool threadPool,
MetadataCreateIndexService createIndexService,
MetadataIndexAliasesService indexAliasesService,
SystemIndices systemIndices
SystemIndices systemIndices,
WriteLoadForecaster writeLoadForecaster
) {
this.threadPool = threadPool;
this.createIndexService = createIndexService;
this.indexAliasesService = indexAliasesService;
this.systemIndices = systemIndices;
this.writeLoadForecaster = writeLoadForecaster;
}

public record RolloverResult(String rolloverIndexName, String sourceIndexName, ClusterState clusterState) {
Expand Down Expand Up @@ -296,16 +300,16 @@ private RolloverResult rolloverDataStream(

RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, metConditions, threadPool.absoluteTimeInMillis());

newState = ClusterState.builder(newState)
.metadata(
Metadata.builder(newState.metadata())
.put(
IndexMetadata.builder(newState.metadata().index(originalWriteIndex))
.indexWriteLoad(sourceIndexWriteLoad)
.putRolloverInfo(rolloverInfo)
)
)
.build();
Metadata.Builder metadataBuilder = Metadata.builder(newState.metadata())
.put(
IndexMetadata.builder(newState.metadata().index(originalWriteIndex))
.indexWriteLoad(sourceIndexWriteLoad)
.putRolloverInfo(rolloverInfo)
);

metadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(dataStreamName, metadataBuilder);

newState = ClusterState.builder(newState).metadata(metadataBuilder).build();

return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), newState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction;
Expand Down Expand Up @@ -122,7 +123,8 @@ public ClusterModule(
SnapshotsInfoService snapshotsInfoService,
ThreadPool threadPool,
SystemIndices systemIndices,
Supplier<RerouteService> rerouteServiceSupplier
Supplier<RerouteService> rerouteServiceSupplier,
WriteLoadForecaster writeLoadForecaster
) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
Expand All @@ -133,7 +135,8 @@ public ClusterModule(
threadPool,
clusterPlugins,
clusterService,
this::reconcile
this::reconcile,
writeLoadForecaster
);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices);
Expand Down Expand Up @@ -344,14 +347,15 @@ private static ShardsAllocator createShardsAllocator(
ThreadPool threadPool,
List<ClusterPlugin> clusterPlugins,
ClusterService clusterService,
DesiredBalanceReconcilerAction reconciler
DesiredBalanceReconcilerAction reconciler,
WriteLoadForecaster writeLoadForecaster
) {
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(settings, clusterSettings));
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(settings, clusterSettings, writeLoadForecaster));
allocators.put(
DESIRED_BALANCE_ALLOCATOR,
() -> new DesiredBalanceShardsAllocator(
new BalancedShardsAllocator(settings, clusterSettings),
new BalancedShardsAllocator(settings, clusterSettings, writeLoadForecaster),
threadPool,
clusterService,
reconciler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.function.Function;

Expand Down Expand Up @@ -523,6 +524,8 @@ public Iterator<Setting<?>> settings() {
public static final String KEY_PRIMARY_TERMS = "primary_terms";
public static final String KEY_WRITE_LOAD = "write_load";

public static final String KEY_WRITE_LOAD_FORECAST = "write_load_forecast";

public static final String INDEX_STATE_FILE_PREFIX = "state-";

static final Version SYSTEM_INDEX_FLAG_ADDED = Version.V_7_10_0;
Expand Down Expand Up @@ -608,6 +611,8 @@ public Iterator<Setting<?>> settings() {
private final Instant timeSeriesEnd;
@Nullable
private final IndexWriteLoad writeLoad;
@Nullable
private final Double writeLoadForecast;

private IndexMetadata(
final Index index,
Expand Down Expand Up @@ -651,7 +656,8 @@ private IndexMetadata(
@Nullable final Instant timeSeriesStart,
@Nullable final Instant timeSeriesEnd,
final Version indexCompatibilityVersion,
@Nullable final IndexWriteLoad writeLoad
@Nullable final IndexWriteLoad writeLoad,
@Nullable final Double writeLoadForecast
) {
this.index = index;
this.version = version;
Expand Down Expand Up @@ -703,6 +709,7 @@ private IndexMetadata(
this.timeSeriesStart = timeSeriesStart;
this.timeSeriesEnd = timeSeriesEnd;
this.writeLoad = writeLoad;
this.writeLoadForecast = writeLoadForecast;
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}

Expand Down Expand Up @@ -752,7 +759,8 @@ IndexMetadata withMappingMetadata(MappingMetadata mapping) {
this.timeSeriesStart,
this.timeSeriesEnd,
this.indexCompatibilityVersion,
this.writeLoad
this.writeLoad,
this.writeLoadForecast
);
}

Expand Down Expand Up @@ -808,7 +816,8 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set<String> inSyncSet)
this.timeSeriesStart,
this.timeSeriesEnd,
this.indexCompatibilityVersion,
this.writeLoad
this.writeLoad,
this.writeLoadForecast
);
}

Expand Down Expand Up @@ -862,7 +871,8 @@ public IndexMetadata withIncrementedPrimaryTerm(int shardId) {
this.timeSeriesStart,
this.timeSeriesEnd,
this.indexCompatibilityVersion,
this.writeLoad
this.writeLoad,
this.writeLoadForecast
);
}

Expand Down Expand Up @@ -916,7 +926,8 @@ public IndexMetadata withTimestampRange(IndexLongFieldRange timestampRange) {
this.timeSeriesStart,
this.timeSeriesEnd,
this.indexCompatibilityVersion,
this.writeLoad
this.writeLoad,
this.writeLoadForecast
);
}

Expand Down Expand Up @@ -966,7 +977,8 @@ public IndexMetadata withIncrementedVersion() {
this.timeSeriesStart,
this.timeSeriesEnd,
this.indexCompatibilityVersion,
this.writeLoad
this.writeLoad,
this.writeLoadForecast
);
}

Expand Down Expand Up @@ -1162,6 +1174,10 @@ public IndexWriteLoad getWriteLoad() {
return writeLoad;
}

public OptionalDouble getForecastedWriteLoad() {
return writeLoadForecast == null ? OptionalDouble.empty() : OptionalDouble.of(writeLoadForecast);
}

public static final String INDEX_RESIZE_SOURCE_UUID_KEY = "index.resize.source.uuid";
public static final String INDEX_RESIZE_SOURCE_NAME_KEY = "index.resize.source.name";
public static final Setting<String> INDEX_RESIZE_SOURCE_UUID = Setting.simpleString(INDEX_RESIZE_SOURCE_UUID_KEY);
Expand Down Expand Up @@ -1397,6 +1413,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
private final boolean isSystem;
private final IndexLongFieldRange timestampRange;
private final IndexWriteLoad indexWriteLoad;
private final Double indexWriteLoadForecast;

IndexMetadataDiff(IndexMetadata before, IndexMetadata after) {
index = after.index.getName();
Expand Down Expand Up @@ -1431,6 +1448,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
isSystem = after.isSystem;
timestampRange = after.timestampRange;
indexWriteLoad = after.writeLoad;
indexWriteLoadForecast = after.writeLoadForecast;
}

private static final DiffableUtils.DiffableValueReader<String, AliasMetadata> ALIAS_METADATA_DIFF_VALUE_READER =
Expand Down Expand Up @@ -1483,8 +1501,10 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
timestampRange = IndexLongFieldRange.readFrom(in);
if (in.getVersion().onOrAfter(WRITE_LOAD_ADDED)) {
indexWriteLoad = in.readOptionalWriteable(IndexWriteLoad::new);
indexWriteLoadForecast = in.readOptionalDouble();
} else {
indexWriteLoad = null;
indexWriteLoadForecast = null;
}
}

Expand Down Expand Up @@ -1518,6 +1538,7 @@ public void writeTo(StreamOutput out) throws IOException {
timestampRange.writeTo(out);
if (out.getVersion().onOrAfter(WRITE_LOAD_ADDED)) {
out.writeOptionalWriteable(indexWriteLoad);
out.writeOptionalDouble(indexWriteLoadForecast);
}
}

Expand Down Expand Up @@ -1546,6 +1567,7 @@ public IndexMetadata apply(IndexMetadata part) {
builder.system(isSystem);
builder.timestampRange(timestampRange);
builder.indexWriteLoad(indexWriteLoad);
builder.indexWriteLoadForecast(indexWriteLoadForecast);
return builder.build();
}
}
Expand Down Expand Up @@ -1610,6 +1632,7 @@ public static IndexMetadata readFrom(StreamInput in, @Nullable Function<String,

if (in.getVersion().onOrAfter(WRITE_LOAD_ADDED)) {
builder.indexWriteLoad(in.readOptionalWriteable(IndexWriteLoad::new));
builder.indexWriteLoadForecast(in.readOptionalDouble());
}
return builder.build();
}
Expand Down Expand Up @@ -1654,6 +1677,7 @@ public void writeTo(StreamOutput out, boolean mappingsAsHash) throws IOException
timestampRange.writeTo(out);
if (out.getVersion().onOrAfter(WRITE_LOAD_ADDED)) {
out.writeOptionalWriteable(writeLoad);
out.writeOptionalDouble(writeLoadForecast);
}
}

Expand Down Expand Up @@ -1702,6 +1726,7 @@ public static class Builder {
private IndexLongFieldRange timestampRange = IndexLongFieldRange.NO_SHARDS;
private LifecycleExecutionState lifecycleExecutionState = LifecycleExecutionState.EMPTY_STATE;
private IndexWriteLoad indexWriteLoad = null;
private Double indexWriteLoadForecast = null;

public Builder(String index) {
this.index = index;
Expand Down Expand Up @@ -1731,6 +1756,7 @@ public Builder(IndexMetadata indexMetadata) {
this.timestampRange = indexMetadata.timestampRange;
this.lifecycleExecutionState = indexMetadata.lifecycleExecutionState;
this.indexWriteLoad = indexMetadata.writeLoad;
this.indexWriteLoadForecast = indexMetadata.writeLoadForecast;
}

public Builder index(String index) {
Expand Down Expand Up @@ -1950,6 +1976,11 @@ public Builder indexWriteLoad(IndexWriteLoad indexWriteLoad) {
return this;
}

public Builder indexWriteLoadForecast(Double indexWriteLoadForecast) {
this.indexWriteLoadForecast = indexWriteLoadForecast;
return this;
}

public IndexMetadata build() {
/*
* We expect that the metadata has been properly built to set the number of shards and the number of replicas, and do not rely
Expand Down Expand Up @@ -2128,7 +2159,8 @@ public IndexMetadata build() {
isTsdb ? IndexSettings.TIME_SERIES_START_TIME.get(settings) : null,
isTsdb ? IndexSettings.TIME_SERIES_END_TIME.get(settings) : null,
SETTING_INDEX_VERSION_COMPATIBILITY.get(settings),
indexWriteLoad
indexWriteLoad,
indexWriteLoadForecast
);
}

Expand Down Expand Up @@ -2246,6 +2278,10 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
builder.endObject();
}

if (indexMetadata.writeLoadForecast != null) {
builder.field(KEY_WRITE_LOAD_FORECAST, indexMetadata.writeLoadForecast);
}

builder.endObject();
}

Expand Down Expand Up @@ -2379,6 +2415,7 @@ public static IndexMetadata fromXContent(XContentParser parser, Map<String, Mapp
}
builder.putMapping(mappingsByHash.get(parser.text()));
}
case KEY_WRITE_LOAD_FORECAST -> builder.indexWriteLoadForecast(parser.doubleValue());
default -> throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;

import java.util.OptionalDouble;

public interface WriteLoadForecaster {
WriteLoadForecaster DEFAULT = new DefaultWriteLoadForecaster();

Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata);

OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata);

class DefaultWriteLoadForecaster implements WriteLoadForecaster {
@Override
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
return metadata;
}

@Override
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
return OptionalDouble.empty();
}
}
}
Loading

0 comments on commit 089ee1d

Please sign in to comment.