From bc96fcc0303bdd10dc7be13f1625aadd3a548920 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Tue, 23 Oct 2018 16:35:10 -0600 Subject: [PATCH] Add cluster-wide shard limit warnings (#34021) In a future major version, we will be introducing a soft limit on the number of shards in a cluster based on the number of nodes in the cluster. This limit will be configurable, and checked on operations which create or open shards and issue a warning if the operation would take the cluster over the limit. There is an option to enable strict enforcement of the limit, which turns the warnings into errors. In a future release, the option will be removed and strict enforcement will be the default (and only) behavior. --- .../migration/migrate_7_0/cluster.asciidoc | 0 docs/reference/modules/cluster/misc.asciidoc | 44 +++++- .../cluster/metadata/MetaData.java | 29 +++- .../metadata/MetaDataCreateIndexService.java | 26 +++- .../metadata/MetaDataIndexStateService.java | 34 +++++ .../MetaDataUpdateSettingsService.java | 24 +++ .../common/settings/ClusterSettings.java | 1 + .../elasticsearch/indices/IndicesService.java | 54 +++++++ .../snapshots/RestoreService.java | 2 +- .../MetaDataCreateIndexServiceTests.java | 33 +++++ .../MetaDataIndexStateServiceTests.java | 99 +++++++++++++ .../cluster/shards/ClusterShardLimitIT.java | 140 ++++++++++++++++++ .../indices/IndicesServiceTests.java | 78 ++++++++++ .../indices/cluster/ClusterStateChanges.java | 1 + .../deprecation/ClusterDeprecationChecks.java | 30 ++++ .../xpack/deprecation/DeprecationChecks.java | 2 +- 16 files changed, 590 insertions(+), 7 deletions(-) create mode 100644 docs/reference/migration/migrate_7_0/cluster.asciidoc create mode 100644 server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/shards/ClusterShardLimitIT.java create mode 100644 
x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/ClusterDeprecationChecks.java diff --git a/docs/reference/migration/migrate_7_0/cluster.asciidoc b/docs/reference/migration/migrate_7_0/cluster.asciidoc new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/docs/reference/modules/cluster/misc.asciidoc b/docs/reference/modules/cluster/misc.asciidoc index 3f12bd255de75..f397c3075b711 100644 --- a/docs/reference/modules/cluster/misc.asciidoc +++ b/docs/reference/modules/cluster/misc.asciidoc @@ -22,6 +22,48 @@ user with access to the <> API can make the cluster read-write again. +[[cluster-shard-limit]] + +==== Cluster Shard Limit + +In Elasticsearch 7.0 and later, there will be a soft limit on the number of +shards in a cluster, based on the number of nodes in the cluster. This is +intended to prevent operations which may unintentionally destabilize the +cluster. Prior to 7.0, actions which would result in the cluster going over the +limit will issue a deprecation warning. + +NOTE: You can set the system property `es.enforce_max_shards_per_node` to `true` +to opt in to strict enforcement of the shard limit. If this system property is +set, actions which would result in the cluster going over the limit will result +in an error, rather than a deprecation warning. This property will be removed in +Elasticsearch 7.0, as strict enforcement of the limit will be the default and +only behavior. + +If an operation, such as creating a new index, restoring a snapshot of an index, +or opening a closed index, would lead to the number of shards in the cluster +going over this limit, the operation will issue a deprecation warning. + +If the cluster is already over the limit, due to changes in node membership or +setting changes, all operations that create or open indices will issue warnings +until either the limit is increased as described below, or some indices are +<<indices-open-close,closed>> or <<indices-delete-index,deleted>> to bring the +number of shards below the limit. 
+ +Replicas count towards this limit, but closed indexes do not. An index with 5 +primary shards and 2 replicas will be counted as 15 shards. Any closed index +is counted as 0, no matter how many shards and replicas it contains. + +The limit defaults to 1,000 shards per node, and can be dynamically adjusted using +the following property: + +`cluster.max_shards_per_node`:: + + Controls the number of shards allowed in the cluster per node. + +For example, a 3-node cluster with the default setting would allow 3,000 shards +total, across all open indexes. If the above setting is changed to 1,500, then +the cluster would allow 4,500 shards total. + [[user-defined-data]] ==== User Defined Cluster Metadata @@ -109,4 +151,4 @@ Enable or disable allocation for persistent tasks: This setting does not affect the persistent tasks that are already being executed. Only newly created persistent tasks, or tasks that must be reassigned (after a node left the cluster, for example), are impacted by this setting. 
--- \ No newline at end of file +-- diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 5c6d63f04b3fe..a050eefe4efef 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -122,9 +122,11 @@ public enum XContentContext { public interface Custom extends NamedDiffable, ToXContentFragment, ClusterState.FeatureAware { EnumSet context(); - } + public static final Setting SETTING_CLUSTER_MAX_SHARDS_PER_NODE = + Setting.intSetting("cluster.max_shards_per_node", 1000, 1, Property.Dynamic, Property.NodeScope); + public static final Setting SETTING_READ_ONLY_SETTING = Setting.boolSetting("cluster.blocks.read_only", false, Property.Dynamic, Property.NodeScope); @@ -160,6 +162,7 @@ public interface Custom extends NamedDiffable, ToXContentFragment, Clust private final ImmutableOpenMap customs; private final transient int totalNumberOfShards; // Transient ? not serializable anyway? 
+ private final int totalOpenIndexShards; private final int numberOfShards; private final String[] allIndices; @@ -182,12 +185,17 @@ public interface Custom extends NamedDiffable, ToXContentFragment, Clust this.customs = customs; this.templates = templates; int totalNumberOfShards = 0; + int totalOpenIndexShards = 0; int numberOfShards = 0; for (ObjectCursor cursor : indices.values()) { totalNumberOfShards += cursor.value.getTotalNumberOfShards(); numberOfShards += cursor.value.getNumberOfShards(); + if (IndexMetaData.State.OPEN.equals(cursor.value.getState())) { + totalOpenIndexShards += cursor.value.getTotalNumberOfShards(); + } } this.totalNumberOfShards = totalNumberOfShards; + this.totalOpenIndexShards = totalOpenIndexShards; this.numberOfShards = numberOfShards; this.allIndices = allIndices; @@ -635,10 +643,29 @@ public T custom(String type) { } + /** + * Gets the total number of shards from all indices, including replicas and + * closed indices. + * @return The total number shards from all indices. + */ public int getTotalNumberOfShards() { return this.totalNumberOfShards; } + /** + * Gets the total number of open shards from all indices. Includes + * replicas, but does not include shards that are part of closed indices. + * @return The total number of open shards from all indices. + */ + public int getTotalOpenIndexShards() { + return this.totalOpenIndexShards; + } + + /** + * Gets the number of primary shards from all indices, not including + * replicas. + * @return The number of primary shards from all indices. 
+ */ public int getNumberOfShards() { return this.numberOfShards; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index a4a581ee71528..ceff3af430eab 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -83,6 +83,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; @@ -570,12 +571,16 @@ public void onFailure(String source, Exception e) { private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) { validateIndexName(request.index(), state); - validateIndexSettings(request.index(), request.settings(), forbidPrivateIndexSettings); + validateIndexSettings(request.index(), request.settings(), state, forbidPrivateIndexSettings); } - public void validateIndexSettings( - final String indexName, final Settings settings, final boolean forbidPrivateIndexSettings) throws IndexCreationException { + public void validateIndexSettings(String indexName, final Settings settings, final ClusterState clusterState, + final boolean forbidPrivateIndexSettings) throws IndexCreationException { List validationErrors = getIndexSettingsValidationErrors(settings, forbidPrivateIndexSettings); + + Optional shardAllocation = checkShardLimit(settings, clusterState, deprecationLogger); + shardAllocation.ifPresent(validationErrors::add); + if (validationErrors.isEmpty() == false) { ValidationException validationException = new ValidationException(); validationException.addValidationErrors(validationErrors); @@ -583,6 +588,21 @@ public void validateIndexSettings( } } + /** + * Checks whether an index can be created without going 
over the cluster shard limit. + * + * @param settings The settings of the index to be created. + * @param clusterState The current cluster state. + * @param deprecationLogger The logger to use to emit a deprecation warning, if appropriate. + * @return If present, an error message to be used to reject index creation. If empty, a signal that this operation may be carried out. + */ + static Optional checkShardLimit(Settings settings, ClusterState clusterState, DeprecationLogger deprecationLogger) { + int shardsToCreate = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(settings) + * (1 + IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings)); + + return IndicesService.checkShardLimit(shardsToCreate, clusterState, deprecationLogger); + } + List getIndexSettingsValidationErrors(final Settings settings, final boolean forbidPrivateIndexSettings) { String customPath = IndexMetaData.INDEX_DATA_PATH_SETTING.get(settings); List validationErrors = new ArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 038c03f342a34..f7482edd10d0e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -36,8 +36,10 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndicesService; @@ -50,6 +52,7 @@ import java.util.Arrays; import 
java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; /** @@ -175,6 +178,8 @@ public ClusterState execute(ClusterState currentState) { } } + validateShardLimit(currentState, request.indices(), deprecationLogger); + if (indicesToOpen.isEmpty()) { return currentState; } @@ -217,4 +222,33 @@ public ClusterState execute(ClusterState currentState) { }); } + /** + * Validates whether a list of indices can be opened without going over the cluster shard limit. Only counts indices which are + * currently closed and will be opened, ignores indices which are already open. + * + * @param currentState The current cluster state. + * @param indices The indices which are to be opened. + * @param deprecationLogger The logger to use to emit a deprecation warning, if appropriate. + * @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled. + */ + static void validateShardLimit(ClusterState currentState, Index[] indices, DeprecationLogger deprecationLogger) { + int shardsToOpen = Arrays.stream(indices) + .filter(index -> currentState.metaData().index(index).getState().equals(IndexMetaData.State.CLOSE)) + .mapToInt(index -> getTotalShardCount(currentState, index)) + .sum(); + + Optional error = IndicesService.checkShardLimit(shardsToOpen, currentState, deprecationLogger); + if (error.isPresent()) { + ValidationException ex = new ValidationException(); + ex.addValidationError(error.get()); + throw ex; + } + + } + + private static int getTotalShardCount(ClusterState state, Index index) { + IndexMetaData indexMetaData = state.metaData().index(index); + return indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas()); + } + } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java index 75fcdced678ab..c89e6ddba9546 100644 --- 
a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -45,9 +46,11 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Arrays; import java.util.HashSet; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext; @@ -115,6 +118,7 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { @Override public ClusterState execute(ClusterState currentState) { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData()); @@ -141,6 +145,18 @@ public ClusterState execute(ClusterState currentState) { int updatedNumberOfReplicas = openSettings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, -1); if (updatedNumberOfReplicas != -1 && preserveExisting == false) { + + // Verify that this won't take us over the cluster shard limit. 
+ int totalNewShards = Arrays.stream(request.indices()) + .mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfReplicas)) + .sum(); + Optional error = IndicesService.checkShardLimit(totalNewShards, currentState, deprecationLogger); + if (error.isPresent()) { + ValidationException ex = new ValidationException(); + ex.addValidationError(error.get()); + throw ex; + } + // we do *not* update the in sync allocation ids as they will be removed upon the first index // operation which make these copies stale // TODO: update the list once the data is deleted by the node? @@ -224,6 +240,14 @@ public ClusterState execute(ClusterState currentState) { }); } + private int getTotalNewShards(Index index, ClusterState currentState, int updatedNumberOfReplicas) { + IndexMetaData indexMetaData = currentState.metaData().index(index); + int shardsInIndex = indexMetaData.getNumberOfShards(); + int oldNumberOfReplicas = indexMetaData.getNumberOfReplicas(); + int replicaIncrease = updatedNumberOfReplicas - oldNumberOfReplicas; + return replicaIncrease * shardsInIndex; + } + /** * Updates the cluster block only iff the setting exists in the given settings */ diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 72c1bb978cff1..f387205dfa8cf 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -197,6 +197,7 @@ public void apply(Settings value, Settings current, Settings previous) { MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, MetaData.SETTING_READ_ONLY_SETTING, MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING, + MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, 
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 24410ba79a022..d69cd6f167f63 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.CheckedFunction; @@ -52,6 +53,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -156,6 +158,21 @@ public class IndicesService extends AbstractLifecycleComponent public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout"; public static final Setting INDICES_CACHE_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting("indices.cache.cleanup_interval", TimeValue.timeValueMinutes(1), Property.NodeScope); + private static final boolean ENFORCE_MAX_SHARDS_PER_NODE; + + static { + final String ENFORCE_SHARD_LIMIT_KEY = "es.enforce_max_shards_per_node"; + final String enforceMaxShardsPerNode = System.getProperty(ENFORCE_SHARD_LIMIT_KEY); + if (enforceMaxShardsPerNode == null) { + ENFORCE_MAX_SHARDS_PER_NODE = false; + } else if ("true".equals(enforceMaxShardsPerNode)) { + ENFORCE_MAX_SHARDS_PER_NODE = true; + } else { + throw 
new IllegalArgumentException(ENFORCE_SHARD_LIMIT_KEY + " may only be unset or set to [true] but was [" + + enforceMaxShardsPerNode + "]"); + } + } + private final PluginsService pluginsService; private final NodeEnvironment nodeEnv; private final NamedXContentRegistry xContentRegistry; @@ -1346,4 +1363,41 @@ public Function> getFieldFilter() { public boolean isMetaDataField(String field) { return mapperRegistry.isMetaDataField(field); } + + /** + * Checks to see if an operation can be performed without taking the cluster over the cluster-wide shard limit. Adds a deprecation + * warning or returns an error message as appropriate + * + * @param newShards The number of shards to be added by this operation + * @param state The current cluster state + * @param deprecationLogger The logger to use for deprecation warnings + * @return If present, an error message to be given as the reason for failing + * an operation. If empty, a sign that the operation is valid. + */ + public static Optional checkShardLimit(int newShards, ClusterState state, DeprecationLogger deprecationLogger) { + Settings theseSettings = state.metaData().settings(); + int nodeCount = state.getNodes().getDataNodes().size(); + + // Only enforce the shard limit if we have at least one data node, so that we don't block + // index creation during cluster setup + if (nodeCount == 0 || newShards < 0) { + return Optional.empty(); + } + int maxShardsPerNode = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(theseSettings); + int maxShardsInCluster = maxShardsPerNode * nodeCount; + int currentOpenShards = state.getMetaData().getTotalOpenIndexShards(); + + if ((currentOpenShards + newShards) > maxShardsInCluster) { + String errorMessage = "this action would add [" + newShards + "] total shards, but this cluster currently has [" + + currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open"; + if (ENFORCE_MAX_SHARDS_PER_NODE) { + return Optional.of(errorMessage); + } else { + 
deprecationLogger.deprecated("In a future major version, this request will fail because {}. Before upgrading, " + + "reduce the number of shards in your cluster or adjust the cluster setting [{}].", + errorMessage, MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()); + } + } + return Optional.empty(); + } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 87ea8cb978fe4..791b59a1d5bb0 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -270,7 +270,7 @@ public ClusterState execute(ClusterState currentState) { // Index doesn't exist - create it and start recovery // Make sure that the index we are about to create has a validate name MetaDataCreateIndexService.validateIndexName(renamedIndexName, currentState); - createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), false); + createIndexService.validateIndexSettings(renamedIndexName, snapshotIndexMetaData.getSettings(), currentState, false); IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData).state(IndexMetaData.State.OPEN).index(renamedIndexName); indexMdBuilder.settings(Settings.builder().put(snapshotIndexMetaData.getSettings()).put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())); if (!request.includeAliases() && !snapshotIndexMetaData.getAliases().isEmpty()) { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java index b4125274ea374..336075f23bbb2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java @@ -34,7 +34,9 @@ 
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.elasticsearch.cluster.shards.ClusterShardLimitIT; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -56,6 +58,11 @@ import java.util.stream.Stream; import static java.util.Collections.emptyMap; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; +import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount; +import static org.elasticsearch.indices.IndicesServiceTests.createClusterForShardLimitTest; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -417,4 +424,30 @@ private void validateIndexName(String indexName, String errorMessage) { .getDefault(Settings.EMPTY)).build())); assertThat(e.getMessage(), endsWith(errorMessage)); } + + public void testShardLimitDeprecationWarning() { + int nodesInCluster = randomIntBetween(2,100); + ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster); + Settings clusterSettings = Settings.builder() + .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), counts.getShardsPerNode()) + .build(); + ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(), + clusterSettings); + + Settings indexSettings = Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + 
.put(SETTING_NUMBER_OF_SHARDS, counts.getFailingIndexShards()) + .put(SETTING_NUMBER_OF_REPLICAS, counts.getFailingIndexReplicas()) + .build(); + + DeprecationLogger deprecationLogger = new DeprecationLogger(logger); + MetaDataCreateIndexService.checkShardLimit(indexSettings, state, deprecationLogger); + int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas()); + int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); + int maxShards = counts.getShardsPerNode() * nodesInCluster; + assertWarnings("In a future major version, this request will fail because this action would add [" + + totalShards + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open."+ + " Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node]."); + } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java new file mode 100644 index 0000000000000..55e2216edb564 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -0,0 +1,99 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.shards.ClusterShardLimitIT; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.logging.DeprecationLogger; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.stream.Collectors; + +import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MetaDataIndexStateServiceTests extends ESTestCase { + + public void testValidateShardLimitDeprecationWarning() { + int nodesInCluster = randomIntBetween(2,100); + ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster); + Settings clusterSettings = Settings.builder() + .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), counts.getShardsPerNode()) + .build(); + ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(), + counts.getFailingIndexShards(), counts.getFailingIndexReplicas(), clusterSettings); + + Index[] indices = Arrays.stream(state.metaData().indices().values().toArray(IndexMetaData.class)) + .map(IndexMetaData::getIndex) + .collect(Collectors.toList()) + .toArray(new Index[2]); + + DeprecationLogger deprecationLogger = new DeprecationLogger(logger); + MetaDataIndexStateService.validateShardLimit(state, indices, deprecationLogger); + int totalShards = counts.getFailingIndexShards() * (1 + 
counts.getFailingIndexReplicas()); + int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); + int maxShards = counts.getShardsPerNode() * nodesInCluster; + assertWarnings("In a future major version, this request will fail because this action would add [" + + totalShards + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open."+ + " Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node]."); + } + + public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int openIndexShards, int openIndexReplicas, + int closedIndexShards, int closedIndexReplicas, Settings clusterSettings) { + ImmutableOpenMap.Builder dataNodes = ImmutableOpenMap.builder(); + for (int i = 0; i < nodesInCluster; i++) { + dataNodes.put(randomAlphaOfLengthBetween(5,15), mock(DiscoveryNode.class)); + } + DiscoveryNodes nodes = mock(DiscoveryNodes.class); + when(nodes.getDataNodes()).thenReturn(dataNodes.build()); + + IndexMetaData.Builder openIndexMetaData = IndexMetaData.builder(randomAlphaOfLengthBetween(5, 15)) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .creationDate(randomLong()) + .numberOfShards(openIndexShards) + .numberOfReplicas(openIndexReplicas); + IndexMetaData.Builder closedIndexMetaData = IndexMetaData.builder(randomAlphaOfLengthBetween(5, 15)) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .creationDate(randomLong()) + .state(IndexMetaData.State.CLOSE) + .numberOfShards(closedIndexShards) + .numberOfReplicas(closedIndexReplicas); + MetaData.Builder metaData = MetaData.builder().put(openIndexMetaData).put(closedIndexMetaData); + if (randomBoolean()) { + metaData.persistentSettings(clusterSettings); + } else { + metaData.transientSettings(clusterSettings); + } + + return ClusterState.builder(ClusterName.DEFAULT) + 
.metaData(metaData) + .nodes(nodes) + .build(); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/shards/ClusterShardLimitIT.java b/server/src/test/java/org/elasticsearch/cluster/shards/ClusterShardLimitIT.java new file mode 100644 index 0000000000000..f9958d3aba2cd --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/shards/ClusterShardLimitIT.java @@ -0,0 +1,140 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +package org.elasticsearch.cluster.shards; + +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) +public class ClusterShardLimitIT extends ESIntegTestCase { + private static final String shardsPerNodeKey = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(); + + public void testSettingClusterMaxShards() { + int shardsPerNode = between(1, 500_000); + setShardsPerNode(shardsPerNode); + } + + public void testMinimumPerNode() { + int negativeShardsPerNode = between(-50_000, 0); + try { + if (frequently()) { + client().admin().cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(shardsPerNodeKey, negativeShardsPerNode).build()) + .get(); + } else { + client().admin().cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(shardsPerNodeKey, negativeShardsPerNode).build()) + .get(); + } + fail("should not be able to set negative shards per node"); + } catch (IllegalArgumentException ex) { + assertEquals("Failed to parse value [" + negativeShardsPerNode + "] for setting [cluster.max_shards_per_node] must be >= 1", + ex.getMessage()); + } + } + + private void setShardsPerNode(int shardsPerNode) { + try { + ClusterUpdateSettingsResponse response; + if (frequently()) { + response = client().admin().cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(shardsPerNodeKey, shardsPerNode).build()) + .get(); + assertEquals(shardsPerNode, response.getPersistentSettings().getAsInt(shardsPerNodeKey, -1).intValue()); + } else { + response = client().admin().cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(shardsPerNodeKey, shardsPerNode).build()) + .get(); + assertEquals(shardsPerNode, 
response.getTransientSettings().getAsInt(shardsPerNodeKey, -1).intValue()); + } + } catch (IllegalArgumentException ex) { + fail(ex.getMessage()); + } + } + + public static class ShardCounts { + private final int shardsPerNode; + + private final int firstIndexShards; + private final int firstIndexReplicas; + + private final int failingIndexShards; + private final int failingIndexReplicas; + + private ShardCounts(int shardsPerNode, + int firstIndexShards, + int firstIndexReplicas, + int failingIndexShards, + int failingIndexReplicas) { + this.shardsPerNode = shardsPerNode; + this.firstIndexShards = firstIndexShards; + this.firstIndexReplicas = firstIndexReplicas; + this.failingIndexShards = failingIndexShards; + this.failingIndexReplicas = failingIndexReplicas; + } + + public static ShardCounts forDataNodeCount(int dataNodes) { + int mainIndexReplicas = between(0, dataNodes - 1); + int mainIndexShards = between(1, 10); + int totalShardsInIndex = (mainIndexReplicas + 1) * mainIndexShards; + int shardsPerNode = (int) Math.ceil((double) totalShardsInIndex / dataNodes); + int totalCap = shardsPerNode * dataNodes; + + int failingIndexShards; + int failingIndexReplicas; + if (dataNodes > 1 && frequently()) { + failingIndexShards = Math.max(1, totalCap - totalShardsInIndex); + failingIndexReplicas = between(1, dataNodes - 1); + } else { + failingIndexShards = totalCap - totalShardsInIndex + between(1, 10); + failingIndexReplicas = 0; + } + + return new ShardCounts(shardsPerNode, mainIndexShards, mainIndexReplicas, failingIndexShards, failingIndexReplicas); + } + + public int getShardsPerNode() { + return shardsPerNode; + } + + public int getFirstIndexShards() { + return firstIndexShards; + } + + public int getFirstIndexReplicas() { + return firstIndexReplicas; + } + + public int getFailingIndexShards() { + return failingIndexShards; + } + + public int getFailingIndexReplicas() { + return failingIndexReplicas; + } + } +} diff --git 
a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java index 808cea4d0ee6e..a4873f7f31006 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java @@ -29,9 +29,14 @@ import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.shards.ClusterShardLimitIT; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -79,6 +84,7 @@ import java.util.stream.Stream; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; +import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; @@ -564,4 +570,76 @@ public void testConflictingEngineFactories() throws IOException { assertThat(e, hasToString(new RegexMatcher(pattern))); } + public void testOverShardLimit() { + int nodesInCluster = randomIntBetween(1,100); + ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster); + + Settings clusterSettings = Settings.builder() + .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 
counts.getShardsPerNode()) + .build(); + + ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(), + clusterSettings); + + int shardsToAdd = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas()); + DeprecationLogger deprecationLogger = new DeprecationLogger(logger); + Optional errorMessage = IndicesService.checkShardLimit(shardsToAdd, state, deprecationLogger); + + int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas()); + int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); + int maxShards = counts.getShardsPerNode() * nodesInCluster; + assertWarnings("In a future major version, this request will fail because this action would add [" + + totalShards + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open."+ + " Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node]."); + assertFalse(errorMessage.isPresent()); + } + + public void testUnderShardLimit() { + int nodesInCluster = randomIntBetween(2,100); + // Calculate the counts for a cluster 1 node smaller than we have to ensure we have headroom + ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster - 1); + + Settings clusterSettings = Settings.builder() + .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), counts.getShardsPerNode()) + .build(); + + ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(), + clusterSettings); + + int existingShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); + int shardsToAdd = randomIntBetween(1, (counts.getShardsPerNode() * nodesInCluster) - existingShards); + DeprecationLogger deprecationLogger = new DeprecationLogger(logger); + Optional errorMessage = 
IndicesService.checkShardLimit(shardsToAdd, state, deprecationLogger); + + assertFalse(errorMessage.isPresent()); + } + + public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int shardsInIndex, int replicas, + Settings clusterSettings) { + ImmutableOpenMap.Builder dataNodes = ImmutableOpenMap.builder(); + for (int i = 0; i < nodesInCluster; i++) { + dataNodes.put(randomAlphaOfLengthBetween(5,15), mock(DiscoveryNode.class)); + } + DiscoveryNodes nodes = mock(DiscoveryNodes.class); + when(nodes.getDataNodes()).thenReturn(dataNodes.build()); + + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(randomAlphaOfLengthBetween(5, 15)) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .creationDate(randomLong()) + .numberOfShards(shardsInIndex) + .numberOfReplicas(replicas); + MetaData.Builder metaData = MetaData.builder().put(indexMetaData); + if (randomBoolean()) { + metaData.transientSettings(clusterSettings); + } else { + metaData.persistentSettings(clusterSettings); + } + + return ClusterState.builder(ClusterName.DEFAULT) + .metaData(metaData) + .nodes(nodes) + .build(); + } + + } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index f331a4f100ce4..677c95a85cafb 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -144,6 +144,7 @@ public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool th // mocks clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); IndicesService indicesService = mock(IndicesService.class); // MetaDataCreateIndexService creates indices using its IndicesService instance to check mappings -> fake it here try { diff --git 
a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/ClusterDeprecationChecks.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/ClusterDeprecationChecks.java new file mode 100644 index 0000000000000..7f11c2c2944a7 --- /dev/null +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/ClusterDeprecationChecks.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.deprecation; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.xpack.core.deprecation.DeprecationIssue; + +public class ClusterDeprecationChecks { + + static DeprecationIssue checkShardLimit(ClusterState state) { + int shardsPerNode = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(state.metaData().settings()); + int nodeCount = state.getNodes().getDataNodes().size(); + int maxShardsInCluster = shardsPerNode * nodeCount; + int currentOpenShards = state.getMetaData().getTotalOpenIndexShards(); + + if (currentOpenShards >= maxShardsInCluster) { + return new DeprecationIssue(DeprecationIssue.Level.WARNING, + "Number of open shards exceeds cluster soft limit", + "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking_70_cluster_changes.html", + "There are [" + currentOpenShards + "] open shards in this cluster, but the cluster is limited to [" + + shardsPerNode + "] per data node, for [" + maxShardsInCluster + "] maximum."); + } + return null; + } +} diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DeprecationChecks.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DeprecationChecks.java index 
af77342816cab..86c164fd1ef74 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DeprecationChecks.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/DeprecationChecks.java @@ -32,7 +32,7 @@ private DeprecationChecks() { static List> CLUSTER_SETTINGS_CHECKS = Collections.unmodifiableList(Arrays.asList( - // STUB + ClusterDeprecationChecks::checkShardLimit )); static List, List, DeprecationIssue>> NODE_SETTINGS_CHECKS =