Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forecast write load during rollovers #91425

Merged
merged 16 commits into from
Nov 14, 2022
Merged

Conversation

fcofdez
Copy link
Contributor

@fcofdez fcofdez commented Nov 8, 2022

No description provided.

@fcofdez fcofdez added >enhancement :Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. v8.6.0 labels Nov 8, 2022
@elasticsearchmachine
Copy link
Collaborator

Hi @fcofdez, I've created a changelog YAML for you.

out.writeDoubleArray(forecastedShardWriteLoad);
}

public static ClusterState maybeIncludeWriteLoadForecast(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the method that computes the forecasted write load for the new write index.

TimeValue minShardUptime
) {
final IndexMetadata writeIndex = clusterState.metadata().getIndexSafe(dataStream.getWriteIndex());
if (IndexSettings.FORECAST_WRITE_LOAD_SETTING.get(writeIndex.getSettings()) == false) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This private setting is set by the new plugin in order to check if the cluster has the proper license to use this feature.

Settings allSettings,
List<CompressedXContent> combinedTemplateMappings
) {
if (dataStreamName != null && metadata.dataStreams().get(dataStreamName) != null && hasValidLicense.getAsBoolean()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we decider if we can add the forecasted write load depending on the current license. It's a bit convoluted but I couldn't find a better way to hook into the rollover.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One option is to add a new method to ClusterPlugin that returns a WriteLoadHandler, which can then be implemented in the plugin? This can then have the 2-3 methods necessary (calculate write-load, get write-load).

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, left a few early comments on this.


final IndexWriteLoad writeLoad = indexMetadata.getWriteLoad();
for (int shardId = 0; shardId < numberOfShards; shardId++) {
OptionalDouble writeLoadForShard = writeLoad.getWriteLoadForShard(shardId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can fail if the number of shards differ. Also, we may not consider all shards if the number of shards is now less than before.

dataStream,
newState,
TimeValue.timeValueDays(7),
TimeValue.timeValueHours(8)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems high and I think it excludes all indices that rollover more frequently than every 8 hours from ever having a write load forecast?

I think we can lower it to say 1 hour. But I also wonder if we should also use the avg up time of all candidate shards. Such that we would use min(avgUpTime, fixedMinimumUpTime) as the actual minimum uptime.

if (allSettings.hasValue(IndexSettings.DEFAULT_WRITE_LOAD_SETTING.getKey())) {
// TODO: warn when the setting exists and the license is invalid?
settingsBuilder.put(
IndexSettings.DEFAULT_INTERNAL_WRITE_LOAD_SETTING.getKey(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks unnecessary? I think we can instead guard the read of the write load setting by the FORECAST_WRITE_LOAD_SETTING?

if (shardIndicesTookIntoAccount > 0) {
modified = true;
double normalizedShardLoad = (totalWriteLoad[shardId] / shardIndicesTookIntoAccount) / maxWriteLoad[shardId];
projectedWriteLoad.withForecastedWriteLoad(shardId, normalizedShardLoad);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should instead just end up with one load number on the index. This can then be evenly spread across the shards. Trying to preserve any shard stickyness (i.e., routing use) seems heroic and makes it more difficult to disregard shards with short uptimes.

Settings allSettings,
List<CompressedXContent> combinedTemplateMappings
) {
if (dataStreamName != null && metadata.dataStreams().get(dataStreamName) != null && hasValidLicense.getAsBoolean()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One option is to add a new method to ClusterPlugin that returns a WriteLoadHandler, which can then be implemented in the plugin? This can then have the 2-3 methods necessary (calculate write-load, get write-load).

@fcofdez
Copy link
Contributor Author

fcofdez commented Nov 9, 2022

@henningandersen thanks for the early feedback, I iterated this based on your feedback, now all the forecast logic lives in the plugin and we compute a single value based on all the past indices write load average. Let me know what you think.

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This direction looks good. I left a few detail comments. Did not dive into tests yet, since this is draft, happy to do so if tests are ready for review?

final long indexAge = System.currentTimeMillis() - indexMetadata.getCreationDate();
final IndexWriteLoad writeLoad = indexMetadata.getWriteLoad();

if (index.equals(dataStream.getWriteIndex()) || indexAge > maxIndexAge.millis() || writeLoad == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For rollovers that happen less frequently than maxIndexAge, I think we'd want to include the index. I think we also want to include the first index in creation order that has creation-date before the max-age (though that may be unnecessary if we just ensure to include minimum 1). The reason being that in order to look back max-age time, you need to include the last index that was created before max-age since the life time of that overlaps the lookback period.

assert uptimeInMillisForShard.isPresent();
double shardWriteLoad = writeLoadForShard.getAsDouble();
long shardUptimeInMillis = uptimeInMillisForShard.getAsLong();
if (shardUptimeInMillis > minShardUptime.millis()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can improve this, avoiding the min shard up time setting. My idea is to use the up-time to weigh the avg. i.e., we compute sum(shardWriteLoad*shardUptimeInMillis) and sum(shardUpTimeInMillis) (summing over all shards of all indices) and then calculate the final indexWriteLoadAvg (a few lines below) as

sum(shardWriteLoad*shardUptimeInMillis) / sum(shardUpTimeInMillis)

That way, if a shard had very short uptime, it's impact on the indexWriteLoadAvg will also be very small. And we avoid the minimum shard uptime setting, since the inaccuracy of short uptimes will not be important, but we'll still use whatever we have as the basis.

Otherwise I think we still risk not having any data if we rollover for instance every 30 mins on an index (unless if I missed where that is addressed).

);

public static final Setting<Double> DEFAULT_WRITE_LOAD_FORECAST_SETTING = Setting.doubleSetting(
"index.default_write_load_forecast",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not the override more than the default write load? It seems to override our automated value.

Copy link
Contributor

@DaveCTurner DaveCTurner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few thoughts on the overall structure, I'm taking a deeper look at the details of the computation now.

@@ -307,6 +311,8 @@ private RolloverResult rolloverDataStream(
)
.build();

newState = writeLoadForecaster.withWriteLoadForecastForWriteIndex(dataStreamName, newState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make this an operator IndexMetadata.Builder -> IndexMetadata.Builder instead of ClusterState -> ClusterState to avoid having to build a new Metadata twice?

@@ -1162,6 +1174,10 @@ public IndexWriteLoad getWriteLoad() {
return writeLoad;
}

public OptionalDouble getForecastedWriteLoad() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to mark this as a forbidden API so we don't inadvertently call it directly, which would bypass the license check built into WriteLoadForecaster#getForecastedWriteLoad.

@fcofdez fcofdez changed the title [WIP] Forecast normalized write load during rollovers Forecast write load during rollovers Nov 10, 2022
@fcofdez fcofdez marked this pull request as ready for review November 10, 2022 14:30
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@fcofdez
Copy link
Contributor Author

fcofdez commented Nov 10, 2022

I think this is ready for review, there are a few doc tests failures that I'm looking into, but I think that shouldn't block the review.

@fcofdez
Copy link
Contributor Author

fcofdez commented Nov 10, 2022

@elasticmachine run elasticsearch-ci/part-1

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Left a few comments, but unless radical changes come up, no need for another review round.

for (int i = 0; i < dataStreamIndices.size(); i++) {
Index index = dataStreamIndices.get(i);
final IndexMetadata indexMetadata = metadata.getSafe(index);
final long indexAge = System.currentTimeMillis() - indexMetadata.getCreationDate();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use ThreadPool.absoluteTimeInMillis instead - and only read it once outside the loop?

Comment on lines 120 to 122
return firstIndexWithinAgeRange == 0
? dataStreamIndices
: dataStreamIndices.subList(firstIndexWithinAgeRange, dataStreamIndices.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The full list optimization seems unimportant, can we not just do:

Suggested change
return firstIndexWithinAgeRange == 0
? dataStreamIndices
: dataStreamIndices.subList(firstIndexWithinAgeRange, dataStreamIndices.size());
return dataStreamIndices.subList(firstIndexWithinAgeRange, dataStreamIndices.size());


final OptionalDouble indexMetadataForecastedWriteLoad = writeIndexMetadata.getForecastedWriteLoad();
assertThat(indexMetadataForecastedWriteLoad.isPresent(), is(equalTo(true)));
assertThat(indexMetadataForecastedWriteLoad.getAsDouble(), is(greaterThanOrEqualTo(0.0)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change this to:

Suggested change
assertThat(indexMetadataForecastedWriteLoad.getAsDouble(), is(greaterThanOrEqualTo(0.0)));
assertThat(indexMetadataForecastedWriteLoad.getAsDouble(), is(greaterThan(0.0)));

since we assert busy in the setup, waiting for a write load to be registered?

}

public static class FakeLicenseWriteLoadForecasterPlugin extends WriteLoadForecasterPlugin {
private static final AtomicBoolean hasValidLicense = new AtomicBoolean(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to change to a non-static field here. I think you can find the plugins using

internalCluster().getInstances(PluginsService.class).filterPlugins(FakeLicenseWriteLoadForecasterPlugin.class))


final OptionalDouble indexMetadataForecastedWriteLoad = writeIndexMetadata.getForecastedWriteLoad();
assertThat(indexMetadataForecastedWriteLoad.isPresent(), is(equalTo(true)));
assertThat(indexMetadataForecastedWriteLoad.getAsDouble(), is(greaterThanOrEqualTo(0.0)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change this to:

Suggested change
assertThat(indexMetadataForecastedWriteLoad.getAsDouble(), is(greaterThanOrEqualTo(0.0)));
assertThat(indexMetadataForecastedWriteLoad.getAsDouble(), is(greaterThan(0.0)));

since we assert busy in the setup, waiting for a write load to be registered?

clusterSettings.addSettingsUpdateConsumer(MAX_INDEX_AGE_SETTING, this::setMaxIndexAgeSetting);
}

LicensedWriteLoadForecaster(BooleanSupplier hasValidLicense, TimeValue maxIndexAge) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add

// exposed for tests only

?


final IndexMetadata writeIndex = updatedMetadataBuilder.getSafe(dataStream.getWriteIndex());

final OptionalDouble forecastedWriteLoadForShard = writeLoadForecaster.getForecastedWriteLoad(writeIndex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand why the variable is named "ForShard"? I think the write load we obtain here is for the index in total and need to be divided by number of shards? Perhaps we need to do that inside getForecastedWriteLoad? A question for the integration into the actual balancer I suppose.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a leftover from the previous approach 🤦.

@fcofdez
Copy link
Contributor Author

fcofdez commented Nov 14, 2022

@elasticmachine update branch

@fcofdez
Copy link
Contributor Author

fcofdez commented Nov 14, 2022

@elasticmachine generate changelog

@elasticsearchmachine
Copy link
Collaborator

Hi @fcofdez, I've created a changelog YAML for you.

@fcofdez fcofdez merged commit 089ee1d into elastic:main Nov 14, 2022
@fcofdez
Copy link
Contributor Author

fcofdez commented Nov 14, 2022

Thanks for the reviews!

weizijun added a commit to weizijun/elasticsearch that referenced this pull request Nov 15, 2022
* main: (163 commits)
  [DOCS] Edits frequent items aggregation (elastic#91564)
  Handle providers of optional services in ubermodule classloader (elastic#91217)
  Add `exportDockerImages` lifecycle task for exporting docker tarballs (elastic#91571)
  Fix CSV dependency report output file location in DRA CI job
  Fix variable placeholder for Strings.format calls (elastic#91531)
  Fix output dir creation in ConcatFileTask (elastic#91568)
  Fix declaration of dependencies in DRA snapshots CI job (elastic#91569)
  Upgrade Gradle Enterprise plugin to 3.11.4 (elastic#91435)
  Ingest DateProcessor (small) speedup, optimize collections code in DateFormatter.forPattern (elastic#91521)
  Fix inter project handling of generateDependenciesReport (elastic#91555)
  [Synthetics] Add synthetics-* read to fleet-server (elastic#91391)
  [ML] Copy more settings when creating DF analytics destination index (elastic#91546)
  Reduce CartesianCentroidIT flakiness (elastic#91553)
  Propagate last node to reinitialized routing tables (elastic#91549)
  Forecast write load during rollovers (elastic#91425)
  [DOCS] Warn about potential overhead of named queries (elastic#91512)
  Datastream unavailable exception metadata (elastic#91461)
  Generate docker images and dependency report in DRA ci job (elastic#91545)
  Support cartesian_bounds aggregation on point and shape (elastic#91298)
  Add support for EQL samples queries (elastic#91312)
  ...

# Conflicts:
#	x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) >enhancement Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. v8.6.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants