From bfbbd73dab8ba0da1fb78193629833d4ddc584de Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 15 Jan 2020 15:50:08 +0100 Subject: [PATCH] Block too many concurrent mapping updates (#51038) Ensures that there are not too many concurrent dynamic mapping updates going out from the data nodes to the master. Closes #50670 --- .../action/index/MappingUpdatedAction.java | 66 ++++++++++ .../common/settings/ClusterSettings.java | 1 + .../index/MappingUpdatedActionTests.java | 118 ++++++++++++++++++ .../snapshots/SnapshotResiliencyTests.java | 1 + .../test/InternalTestCluster.java | 2 + 5 files changed, 188 insertions(+) create mode 100644 server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 874b5eb00646c..f1051bca36c43 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -30,10 +30,13 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.Mapping; +import java.util.concurrent.Semaphore; + /** * Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated * in the cluster state meta data (and broadcast to all members). @@ -44,19 +47,30 @@ public class MappingUpdatedAction { Setting.positiveTimeSetting("indices.mapping.dynamic_timeout", TimeValue.timeValueSeconds(30), Property.Dynamic, Property.NodeScope); + public static final Setting INDICES_MAX_IN_FLIGHT_UPDATES_SETTING = + Setting.intSetting("indices.mapping.max_in_flight_updates", 10, 1, 1000, + Property.Dynamic, Property.NodeScope); + private IndicesAdminClient client; private volatile TimeValue dynamicMappingUpdateTimeout; + private final AdjustableSemaphore semaphore; @Inject public MappingUpdatedAction(Settings settings, ClusterSettings clusterSettings) { this.dynamicMappingUpdateTimeout = INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.get(settings); + this.semaphore = new AdjustableSemaphore(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.get(settings), true); clusterSettings.addSettingsUpdateConsumer(INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, this::setDynamicMappingUpdateTimeout); + clusterSettings.addSettingsUpdateConsumer(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING, this::setMaxInFlightUpdates); } private void setDynamicMappingUpdateTimeout(TimeValue dynamicMappingUpdateTimeout) { this.dynamicMappingUpdateTimeout = dynamicMappingUpdateTimeout; } + private void setMaxInFlightUpdates(int maxInFlightUpdates) { + semaphore.setMaxPermits(maxInFlightUpdates); + } + public void setClient(Client client) { this.client = client.admin().indices(); } @@ -68,6 +82,32 @@ public void setClient(Client client) { * potentially waiting for a master node to be available. */ public void updateMappingOnMaster(Index index, Mapping mappingUpdate, ActionListener listener) { + final RunOnce release = new RunOnce(() -> semaphore.release()); + try { + semaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + listener.onFailure(e); + return; + } + boolean successFullySent = false; + try { + sendUpdateMapping(index, mappingUpdate, ActionListener.runBefore(listener, release::run)); + successFullySent = true; + } finally { + if (successFullySent == false) { + release.run(); + } + } + } + + // used by tests + int blockedThreads() { + return semaphore.getQueueLength(); + } + + // can be overridden by tests + protected void sendUpdateMapping(Index index, Mapping mappingUpdate, ActionListener listener) { client.preparePutMapping().setConcreteIndex(index).setSource(mappingUpdate.toString(), XContentType.JSON) .setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO) .execute(new ActionListener<>() { @@ -82,4 +122,30 @@ public void onFailure(Exception e) { } }); } + + static class AdjustableSemaphore extends Semaphore { + + private final Object maxPermitsMutex = new Object(); + private int maxPermits; + + AdjustableSemaphore(int maxPermits, boolean fair) { + super(maxPermits, fair); + this.maxPermits = maxPermits; + } + + void setMaxPermits(int permits) { + synchronized (maxPermitsMutex) { + final int diff = Math.subtractExact(permits, maxPermits); + if (diff > 0) { + // add permits + release(diff); + } else if (diff < 0) { + // remove permits + reducePermits(Math.negateExact(diff)); + } + + maxPermits = permits; + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 6cb2bbd3eaafd..f8ea133c0ed63 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -201,6 +201,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING, IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING, MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, + MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING, MetaData.SETTING_READ_ONLY_SETTING, MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING, MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE, diff --git a/server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java new file mode 100644 index 0000000000000..7f19e725c5d4f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/action/index/MappingUpdatedActionTests.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.action.index; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.action.index.MappingUpdatedAction.AdjustableSemaphore; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public class MappingUpdatedActionTests extends ESTestCase { + + public void testAdjustableSemaphore() { + AdjustableSemaphore sem = new AdjustableSemaphore(1, randomBoolean()); + assertEquals(1, sem.availablePermits()); + assertTrue(sem.tryAcquire()); + assertEquals(0, sem.availablePermits()); + assertFalse(sem.tryAcquire()); + assertEquals(0, sem.availablePermits()); + + // increase the number of max permits to 2 + sem.setMaxPermits(2); + assertEquals(1, sem.availablePermits()); + assertTrue(sem.tryAcquire()); + assertEquals(0, sem.availablePermits()); + + // release all current permits + sem.release(); + assertEquals(1, sem.availablePermits()); + sem.release(); + assertEquals(2, sem.availablePermits()); + + // reduce number of max permits to 1 + sem.setMaxPermits(1); + assertEquals(1, sem.availablePermits()); + // set back to 2 + sem.setMaxPermits(2); + assertEquals(2, sem.availablePermits()); + + // take both permits and reduce max permits + assertTrue(sem.tryAcquire()); + assertTrue(sem.tryAcquire()); + assertEquals(0, sem.availablePermits()); + assertFalse(sem.tryAcquire()); + sem.setMaxPermits(1); + assertEquals(-1, sem.availablePermits()); + assertFalse(sem.tryAcquire()); + + // release one permit + sem.release(); + assertEquals(0, sem.availablePermits()); + assertFalse(sem.tryAcquire()); + + // release second permit + sem.release(); + assertEquals(1, sem.availablePermits()); + assertTrue(sem.tryAcquire()); + } + + public void testMappingUpdatedActionBlocks() throws Exception { + List> inFlightListeners = new CopyOnWriteArrayList<>(); + final MappingUpdatedAction mua = new MappingUpdatedAction(Settings.builder() + .put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1).build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) { + + @Override + protected void sendUpdateMapping(Index index, Mapping mappingUpdate, ActionListener listener) { + inFlightListeners.add(listener); + } + }; + + PlainActionFuture fut1 = new PlainActionFuture<>(); + mua.updateMappingOnMaster(null, null, fut1); + assertEquals(1, inFlightListeners.size()); + assertEquals(0, mua.blockedThreads()); + + PlainActionFuture fut2 = new PlainActionFuture<>(); + Thread thread = new Thread(() -> { + mua.updateMappingOnMaster(null, null, fut2); // blocked + }); + thread.start(); + assertBusy(() -> assertEquals(1, mua.blockedThreads())); + + assertEquals(1, inFlightListeners.size()); + assertFalse(fut1.isDone()); + inFlightListeners.remove(0).onResponse(null); + assertTrue(fut1.isDone()); + + thread.join(); + assertEquals(0, mua.blockedThreads()); + assertEquals(1, inFlightListeners.size()); + assertFalse(fut2.isDone()); + inFlightListeners.remove(0).onResponse(null); + assertTrue(fut2.isDone()); + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 9b8950c813b4f..03153595dcf18 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -885,6 +885,7 @@ private Environment createEnvironment(String nodeName) { .put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath()) .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)) + .put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1000) // o.w. some tests might block .build()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 98911fe3f4cb1..58144da28d111 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -471,6 +471,8 @@ private static Settings getRandomNodeSettings(long seed) { if (random.nextBoolean()) { builder.put(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.getKey(), timeValueSeconds(RandomNumbers.randomIntBetween(random, 10, 30)).getStringRep()); + builder.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), + RandomNumbers.randomIntBetween(random, 1, 10)); } // turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we