From b0714503ace60a1d0c3c583423262d475d4ef932 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 4 Jul 2017 18:43:59 +0200 Subject: [PATCH] Switch indices read-only if a node runs out of disk space Today, when we run out of disk, all kinds of crazy things can happen and nodes become hard to maintain once they are out of disk space. While we try to move shards away if we hit the watermarks, this might not be possible in many situations. Based on the discussion in #24299, this change monitors disk utilization and adds a floodstage watermark that causes all indices that are allocated on a node hitting the floodstage mark to be switched read-only (while still allowing them to be deleted). This allows users to react to the low disk situation while subsequent write requests are rejected. Users can switch individual indices back to read-write once the situation is sorted out. There is no automatic switch back to read-write once the node has enough space again; this requires user interaction. The floodstage watermark is set to `95%` utilization by default. 
Closes #24299 --- .../cluster/ClusterInfoService.java | 11 -- .../cluster/EmptyClusterInfoService.java | 5 - .../cluster/InternalClusterInfoService.java | 24 ++- .../allocation/DiskThresholdMonitor.java | 64 ++++++-- .../allocation/DiskThresholdSettings.java | 32 +++- .../common/settings/ClusterSettings.java | 1 + .../java/org/elasticsearch/node/Node.java | 15 +- .../org/elasticsearch/node/NodeModule.java | 42 ------ .../allocation/DiskThresholdMonitorTests.java | 139 ++++++++++++++++++ .../ExpectedShardSizeAllocationTests.java | 42 ++---- .../allocation/RebalanceAfterActiveTests.java | 24 +-- .../decider/DiskThresholdDeciderTests.java | 39 ----- .../MockInternalClusterInfoService.java | 6 +- .../java/org/elasticsearch/node/MockNode.java | 8 +- 14 files changed, 268 insertions(+), 184 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/node/NodeModule.java create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java index c17bc08ac0a9e..ebc14b800cf35 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java @@ -27,15 +27,4 @@ public interface ClusterInfoService { /** The latest cluster information */ ClusterInfo getClusterInfo(); - - /** Add a listener that will be called every time new information is gathered */ - void addListener(Listener listener); - - /** - * Interface for listeners to implement in order to perform actions when - * new information about the cluster has been gathered - */ - interface Listener { - void onNewInfo(ClusterInfo info); - } } diff --git a/core/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java index b552508532c82..8e705a2406301 
100644 --- a/core/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java @@ -36,9 +36,4 @@ private EmptyClusterInfoService() { public ClusterInfo getClusterInfo() { return ClusterInfo.EMPTY; } - - @Override - public void addListener(Listener listener) { - // no-op, no new info is ever gathered, so adding listeners is useless - } } diff --git a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index b0baac6bd9029..ae16dbe88a621 100644 --- a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -52,6 +52,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * InternalClusterInfoService provides the ClusterInfoService interface, @@ -86,9 +87,10 @@ public class InternalClusterInfoService extends AbstractComponent private final ClusterService clusterService; private final ThreadPool threadPool; private final NodeClient client; - private final List listeners = new CopyOnWriteArrayList<>(); + private final Consumer listener; - public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { + public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client, + Consumer listener) { super(settings); this.leastAvailableSpaceUsages = ImmutableOpenMap.of(); this.mostAvailableSpaceUsages = ImmutableOpenMap.of(); @@ -109,6 +111,7 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi this.clusterService.addLocalNodeMasterListener(this); // Add to listen for state changes (when nodes are added) 
this.clusterService.addListener(this); + this.listener = listener; } private void setEnabled(boolean enabled) { @@ -201,11 +204,6 @@ public ClusterInfo getClusterInfo() { return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath); } - @Override - public void addListener(Listener listener) { - this.listeners.add(listener); - } - /** * Class used to submit {@link #maybeRefresh()} on the * {@link InternalClusterInfoService} threadpool, these jobs will @@ -362,21 +360,17 @@ public void onFailure(Exception e) { logger.warn("Failed to update shard information for ClusterInfoUpdateJob within {} timeout", fetchTimeout); } ClusterInfo clusterInfo = getClusterInfo(); - for (Listener l : listeners) { - try { - l.onNewInfo(clusterInfo); - } catch (Exception e) { - logger.info("Failed executing ClusterInfoService listener", e); - } + try { + listener.accept(clusterInfo); + } catch (Exception e) { + logger.info("Failed executing ClusterInfoService listener", e); } return clusterInfo; } static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder newShardSizes, ImmutableOpenMap.Builder newShardRoutingToDataPath, ClusterState state) { - MetaData meta = state.getMetaData(); for (ShardStats s : stats) { - IndexMetaData indexMeta = meta.index(s.getShardRouting().index()); newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath()); long size = s.getStats().getStore().sizeInBytes(); String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting()); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index 390acda0fa519..885712b19ef5b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -19,18 +19,23 
@@ package org.elasticsearch.cluster.routing.allocation; +import java.util.HashSet; import java.util.Set; +import java.util.function.Supplier; import com.carrotsearch.hppc.ObjectLookupContainer; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfo; -import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DiskUsage; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; @@ -40,21 +45,19 @@ * reroute if it does. 
Also responsible for logging about nodes that have * passed the disk watermarks */ -public class DiskThresholdMonitor extends AbstractComponent implements ClusterInfoService.Listener { +public class DiskThresholdMonitor extends AbstractComponent { private final DiskThresholdSettings diskThresholdSettings; private final Client client; private final Set nodeHasPassedWatermark = Sets.newConcurrentHashSet(); - + private final Supplier clusterStateSupplier; private long lastRunNS; - // TODO: remove injection when ClusterInfoService is not injected - @Inject - public DiskThresholdMonitor(Settings settings, ClusterSettings clusterSettings, - ClusterInfoService infoService, Client client) { + public DiskThresholdMonitor(Settings settings, Supplier clusterStateSupplier, ClusterSettings clusterSettings, + Client client) { super(settings); + this.clusterStateSupplier = clusterStateSupplier; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); this.client = client; - infoService.addListener(this); } /** @@ -62,7 +65,10 @@ public DiskThresholdMonitor(Settings settings, ClusterSettings clusterSettings, */ private void warnAboutDiskIfNeeded(DiskUsage usage) { // Check absolute disk values - if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) { + if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes()) { + logger.warn("floodstage disk watermark [{}] exceeded on {}, all indices on this node will marked read-only", + diskThresholdSettings.getFreeBytesThresholdFloodStage(), usage); + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) { logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node", diskThresholdSettings.getFreeBytesThresholdHigh(), usage); } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) { @@ -72,6 +78,9 @@ private void 
warnAboutDiskIfNeeded(DiskUsage usage) { // Check percentage disk values if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + logger.warn("floodstage disk watermark [{}] exceeded on {}, all indices on this node will marked read-only", + Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdFloodStage(), "%"), usage); + } else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node", Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage); } else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { @@ -80,7 +89,7 @@ private void warnAboutDiskIfNeeded(DiskUsage usage) { } } - @Override + public void onNewInfo(ClusterInfo info) { ImmutableOpenMap usages = info.getNodeLeastAvailableDiskUsages(); if (usages != null) { @@ -95,12 +104,21 @@ public void onNewInfo(ClusterInfo info) { nodeHasPassedWatermark.remove(node); } } - + ClusterState state = clusterStateSupplier.get(); + Set indicesToMarkReadOnly = new HashSet<>(); for (ObjectObjectCursor entry : usages) { String node = entry.key; DiskUsage usage = entry.value; warnAboutDiskIfNeeded(usage); - if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || + if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() || + usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) { + RoutingNode routingNode = state.getRoutingNodes().node(node); + if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?! 
+ for (ShardRouting routing : routingNode) { + indicesToMarkReadOnly.add(routing.index().getName()); + } + } + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) { lastRunNS = System.nanoTime(); @@ -136,9 +154,25 @@ public void onNewInfo(ClusterInfo info) { } if (reroute) { logger.info("rerouting shards: [{}]", explanation); - // Execute an empty reroute, but don't block on the response - client.admin().cluster().prepareReroute().execute(); + reroute(); + } + indicesToMarkReadOnly.removeIf(index -> + state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index) + ); + if (indicesToMarkReadOnly.isEmpty() == false) { + markIndicesReadOnly(indicesToMarkReadOnly); } } } + + protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { + // set read-only block but don't block on the response + client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)). 
+ setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute(); + } + + protected void reroute() { + // Execute an empty reroute, but don't block on the response + client.admin().cluster().prepareReroute().execute(); + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java index b87add57ce75a..7c1d3b2081743 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java @@ -42,6 +42,10 @@ public class DiskThresholdSettings { new Setting<>("cluster.routing.allocation.disk.watermark.high", "90%", (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.high"), Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting CLUSTER_ROUTING_ALLOCATION_FLOOD_STAGE_SETTING = + new Setting<>("cluster.routing.allocation.disk.floodstage", "95%", + (s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.floodstage"), + Setting.Property.Dynamic, Setting.Property.NodeScope); public static final Setting CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING = Setting.boolSetting("cluster.routing.allocation.disk.include_relocations", true, Setting.Property.Dynamic, Setting.Property.NodeScope);; @@ -58,17 +62,23 @@ public class DiskThresholdSettings { private volatile boolean includeRelocations; private volatile boolean enabled; private volatile TimeValue rerouteInterval; + private volatile String floodStageRaw; + private volatile Double freeDiskThresholdFloodStage; + private volatile ByteSizeValue freeBytesThresholdFloodStage; public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) { final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings); final 
String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.get(settings); + final String floodStage = CLUSTER_ROUTING_ALLOCATION_FLOOD_STAGE_SETTING.get(settings); setHighWatermark(highWatermark); setLowWatermark(lowWatermark); + setFloodStageRaw(floodStage); this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings); this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings); this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_FLOOD_STAGE_SETTING, this::setFloodStageRaw); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled); @@ -99,7 +109,15 @@ private void setHighWatermark(String highWatermark) { this.highWatermarkRaw = highWatermark; this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark); this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark, - CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey()); + CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey()); + } + + private void setFloodStageRaw(String floodStageRaw) { + // Watermark is expressed in terms of used data, but we need "free" data watermark + this.floodStageRaw = floodStageRaw; + this.freeDiskThresholdFloodStage = 100.0 - thresholdPercentageFromWatermark(floodStageRaw); + 
this.freeBytesThresholdFloodStage = thresholdBytesFromWatermark(floodStageRaw, + CLUSTER_ROUTING_ALLOCATION_FLOOD_STAGE_SETTING.getKey()); } /** @@ -132,6 +150,18 @@ public ByteSizeValue getFreeBytesThresholdHigh() { return freeBytesThresholdHigh; } + public Double getFreeDiskThresholdFloodStage() { + return freeDiskThresholdFloodStage; + } + + public ByteSizeValue getFreeBytesThresholdFloodStage() { + return freeBytesThresholdFloodStage; + } + + public String getFloodStageRaw() { + return floodStageRaw; + } + public boolean includeRelocations() { return includeRelocations; } diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index febede42da5f6..9634c2825481c 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -199,6 +199,7 @@ public void apply(Settings value, Settings current, Settings previous) { ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_FLOOD_STAGE_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index b9e6668666945..831b3d39d6713 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -36,6 +36,7 @@ import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.client.Client; import 
org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; @@ -50,6 +51,7 @@ import org.elasticsearch.cluster.metadata.TemplateUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.Lifecycle; @@ -333,7 +335,10 @@ protected Node(final Environment environment, Collection resourcesToClose.add(clusterService); final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); - final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); + final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state, + clusterService.getClusterSettings(), client); + final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client, + listener::onNewInfo); final UsageService usageService = new UsageService(settings); ModulesBuilder modules = new ModulesBuilder(); @@ -342,7 +347,6 @@ protected Node(final Environment environment, Collection modules.add(pluginModule); } final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService); - modules.add(new NodeModule(this, monitorService)); ClusterModule clusterModule = new ClusterModule(settings, clusterService, pluginsService.filterPlugins(ClusterPlugin.class), clusterInfoService); modules.add(clusterModule); @@ -442,6 
+446,7 @@ protected Node(final Environment environment, Collection transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(), httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter()); modules.add(b -> { + b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry); b.bind(PluginsService.class).toInstance(pluginsService); @@ -457,7 +462,7 @@ protected Node(final Environment environment, Collection b.bind(ScriptService.class).toInstance(scriptModule.getScriptService()); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); - b.bind(UsageService.class).toInstance(usageService); + b.bind(UsageService.class).toInstance(usageService); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader); b.bind(MetaStateService.class).toInstance(metaStateService); @@ -917,8 +922,8 @@ protected Node newTribeClientNode(Settings settings, Collection listeners) { + return new InternalClusterInfoService(settings, clusterService, threadPool, client, listeners); } private static class LocalNodeFactory implements Function { diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java deleted file mode 100644 index 929e889503ea4..0000000000000 --- a/core/src/main/java/org/elasticsearch/node/NodeModule.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.node; - -import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.monitor.MonitorService; - -public class NodeModule extends AbstractModule { - - private final Node node; - private final MonitorService monitorService; - - public NodeModule(Node node, MonitorService monitorService) { - this.node = node; - this.monitorService = monitorService; - } - - @Override - protected void configure() { - bind(Node.class).toInstance(node); - bind(MonitorService.class).toInstance(monitorService); - bind(DiskThresholdMonitor.class).asEagerSingleton(); - } -} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java new file mode 100644 index 0000000000000..bc243b2db0bb4 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -0,0 +1,139 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.DiskUsage; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; + +public class DiskThresholdMonitorTests extends ESAllocationTestCase { + + + public void testMarkFloodStageIndicesReadOnly() { + AllocationService allocation = createAllocationService(Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); + Settings settings = Settings.EMPTY; + MetaData metaData = MetaData.builder() + 
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT) + .put("index.routing.allocation.require._id", "node2")).numberOfShards(1).numberOfReplicas(0)) + .put(IndexMetaData.builder("test_1").settings(settings(Version.CURRENT) + .put("index.routing.allocation.require._id", "node1")).numberOfShards(1).numberOfReplicas(0)) + .put(IndexMetaData.builder("test_2").settings(settings(Version.CURRENT) + .put("index.routing.allocation.require._id", "node1")).numberOfShards(1).numberOfReplicas(0)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .addAsNew(metaData.index("test_1")) + .addAsNew(metaData.index("test_2")) + + .build(); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metaData(metaData).routingTable(routingTable).build(); + logger.info("adding two nodes and performing rerouting"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1")) + .add(newNode("node2"))).build(); + clusterState = allocation.reroute(clusterState, "reroute"); + logger.info("start primary shard"); + clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + ClusterState finalState = clusterState; + AtomicBoolean reroute = new AtomicBoolean(false); + AtomicReference> indices = new AtomicReference<>(); + DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) { + @Override + protected void reroute() { + assertTrue(reroute.compareAndSet(false, true)); + } + + @Override + protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { + assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); + } + }; + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); + builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 
100, 4)); + builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 30)); + monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + assertFalse(reroute.get()); + assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get()); + + indices.set(null); + builder = ImmutableOpenMap.builder(); + builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4)); + builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5)); + monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + assertTrue(reroute.get()); + assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get()); + IndexMetaData indexMetaData = IndexMetaData.builder(clusterState.metaData().index("test_2")).settings(Settings.builder() + .put(clusterState.metaData() + .index("test_2").getSettings()) + .put(IndexMetaData.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)).build(); + + // now we mark one index as read-only and assert that we don't mark it as such again + final ClusterState anotherFinalClusterState = ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData()) + .put(clusterState.metaData().index("test"), false) + .put(clusterState.metaData().index("test_1"), false) + .put(indexMetaData, true).build()) + .blocks(ClusterBlocks.builder().addBlocks(indexMetaData).build()).build(); + assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2")); + + monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) { + @Override + protected void reroute() { + assertTrue(reroute.compareAndSet(false, true)); + } + + @Override + protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { + assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); + } + }; + + indices.set(null); + reroute.set(false); + builder = ImmutableOpenMap.builder(); + 
builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4)); + builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5)); + monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + assertTrue(reroute.get()); + assertEquals(new HashSet<>(Arrays.asList("test_1")), indices.get()); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java index f73f97b61a3e8..1ed5a3ac7ed90 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java @@ -46,22 +46,13 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase { public void testInitializingHasExpectedSize() { final long byteSize = randomIntBetween(0, Integer.MAX_VALUE); - AllocationService strategy = createAllocationService(Settings.EMPTY, new ClusterInfoService() { + AllocationService strategy = createAllocationService(Settings.EMPTY, () -> new ClusterInfo() { @Override - public ClusterInfo getClusterInfo() { - return new ClusterInfo() { - @Override - public Long getShardSize(ShardRouting shardRouting) { - if (shardRouting.getIndexName().equals("test") && shardRouting.shardId().getId() == 0) { - return byteSize; - } - return null; - } - }; - } - - @Override - public void addListener(Listener listener) { + public Long getShardSize(ShardRouting shardRouting) { + if (shardRouting.getIndexName().equals("test") && shardRouting.shardId().getId() == 0) { + return byteSize; + } + return null; } }); @@ -101,22 +92,13 @@ public void addListener(Listener listener) { public void testExpectedSizeOnMove() { final long byteSize = randomIntBetween(0, Integer.MAX_VALUE); - final AllocationService allocation = createAllocationService(Settings.EMPTY, new 
ClusterInfoService() { - @Override - public ClusterInfo getClusterInfo() { - return new ClusterInfo() { - @Override - public Long getShardSize(ShardRouting shardRouting) { - if (shardRouting.getIndexName().equals("test") && shardRouting.shardId().getId() == 0) { - return byteSize; - } - return null; - } - }; - } - + final AllocationService allocation = createAllocationService(Settings.EMPTY, () -> new ClusterInfo() { @Override - public void addListener(Listener listener) { + public Long getShardSize(ShardRouting shardRouting) { + if (shardRouting.getIndexName().equals("test") && shardRouting.shardId().getId() == 0) { + return byteSize; + } + return null; } }); logger.info("creating an index with 1 shard, no replica"); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java index ea8cbe19b7f4c..f6ab967a10b46 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/RebalanceAfterActiveTests.java @@ -57,23 +57,15 @@ public void testRebalanceOnlyAfterAllShardsAreActive() { .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), - new ClusterInfoService() { - @Override - public ClusterInfo getClusterInfo() { - return new ClusterInfo() { - @Override - public Long getShardSize(ShardRouting shardRouting) { - if (shardRouting.getIndexName().equals("test")) { - return sizes[shardRouting.getId()]; - } - return null; } - }; + () -> new ClusterInfo() { + @Override + public Long getShardSize(ShardRouting shardRouting) { + if (shardRouting.getIndexName().equals("test")) { + return sizes[shardRouting.getId()]; } - - @Override - public void addListener(Listener listener) { - } - }); + 
return null; + } + }); logger.info("Building initial routing table"); MetaData metaData = MetaData.builder() diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index e4f6ed79c6940..805a8259bd0d7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -104,11 +104,6 @@ public ClusterInfo getClusterInfo() { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } - - @Override - public void addListener(Listener listener) { - // noop - } }; AllocationService strategy = new AllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) @@ -292,11 +287,6 @@ public ClusterInfo getClusterInfo() { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } - - @Override - public void addListener(Listener listener) { - // noop - } }; AllocationService strategy = new AllocationService(Settings.builder() @@ -350,11 +340,6 @@ public ClusterInfo getClusterInfo() { logger.info("--> calling fake getClusterInfo"); return clusterInfo2; } - - @Override - public void addListener(Listener listener) { - // noop - } }; strategy = new AllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) @@ -548,11 +533,6 @@ public ClusterInfo getClusterInfo() { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } - - @Override - public void addListener(Listener listener) { - // noop - } }; AllocationService strategy = new AllocationService(Settings.builder() @@ -620,11 +600,6 @@ public ClusterInfo getClusterInfo() { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } - - @Override - public void addListener(Listener 
listener) { - // noop - } }; AllocationService strategy = new AllocationService(Settings.builder() @@ -726,11 +701,6 @@ public ClusterInfo getClusterInfo() { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } - - @Override - public void addListener(Listener listener) { - // noop - } }; AllocationService strategy = new AllocationService(Settings.builder() @@ -914,11 +884,6 @@ public ClusterInfo getClusterInfo() { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } - - @Override - public void addListener(Listener listener) { - // noop - } }; AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( new SameShardAllocationDecider( @@ -1015,10 +980,6 @@ public ClusterInfo getClusterInfo() { logger.info("--> calling fake getClusterInfo"); return clusterInfo; } - - @Override - public void addListener(Listener listener) { - } }; AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY, new HashSet<>(Arrays.asList( diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index fc455783575a3..4f6e8bb192482 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -69,8 +70,9 @@ public static NodeStats makeStats(String nodeName, DiskUsage usage) { null, null, null); } - public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { - super(settings, clusterService, threadPool, client); + public 
MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client, + Consumer<ClusterInfo> listener) { + super(settings, clusterService, threadPool, client, listener); this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100)); stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100)); diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 619583850176c..2e0aa98a920cd 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -20,6 +20,7 @@ package org.elasticsearch.node; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.MockInternalClusterInfoService; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -47,6 +48,7 @@ import java.nio.file.Path; import java.util.Collection; import java.util.Collections; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -124,11 +126,11 @@ protected void processRecoverySettings(ClusterSettings clusterSettings, Recovery @Override protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService, - ThreadPool threadPool, NodeClient client) { + ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listener) { if (getPluginsService().filterPlugins(MockInternalClusterInfoService.TestPlugin.class).isEmpty()) { - return super.newClusterInfoService(settings, clusterService, threadPool, client); + return super.newClusterInfoService(settings, clusterService, threadPool, client, listener); } else { - return new MockInternalClusterInfoService(settings, clusterService, threadPool, client); + return 
new MockInternalClusterInfoService(settings, clusterService, threadPool, client, listener); } } }