From 8e5152643a4c90ba9267f7c258de337cdaa446a6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 19 Mar 2020 09:17:27 +0000 Subject: [PATCH 1/7] Assertions during publication --- .../cluster/coordination/PublicationTransportHandler.java | 2 ++ .../cluster/service/FakeThreadPoolMasterService.java | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index a4b2f509b2ed8..1a6db35a8091e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -161,6 +161,7 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, ActionListener originalListener) { assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us"; + assert transportService.getThreadPool().getThreadContext().isSystemContext(); final ActionListener responseActionListener; if (destination.equals(nodes.getLocalNode())) { // if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation) @@ -197,6 +198,7 @@ public void onFailure(Exception e) { @Override public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest, ActionListener responseActionListener) { + assert transportService.getThreadPool().getThreadContext().isSystemContext(); transportService.sendRequest(destination, COMMIT_STATE_ACTION_NAME, applyCommitRequest, stateRequestOptions, new TransportResponseHandler() { diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java index 74d9cc17a5325..179a8dcb9848d 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java @@ -110,7 +110,11 @@ public void run() { final Runnable task = pendingTasks.remove(taskIndex); taskInProgress = true; scheduledNextTask = false; - task.run(); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + threadContext.markAsSystemContext(); + task.run(); + } if (waitForPublish == false) { taskInProgress = false; } From 848fae520acb5ad53f2127852a2f4661f7f2797f Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 19 Mar 2020 09:33:34 +0000 Subject: [PATCH 2/7] Cop out --- .../elasticsearch/cluster/service/ClusterApplierService.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 574919f6751ab..4912c8f17a8e7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -347,7 +348,9 @@ private void submitStateUpdateTask(final String source, final ClusterStateTaskCo if (!lifecycle.started()) { return; } - try { + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + threadContext.markAsSystemContext(); UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor); if (config.timeout() != null) { threadPoolExecutor.execute(updateTask, config.timeout(), From 7f8c33e4e7e27ace4a77e694fe8c8017d54dc017 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 19 Mar 2020 09:42:00 +0000 Subject: [PATCH 3/7] Assert that appliers/listeners are in system context --- .../ClusterApplierAssertionPlugin.java | 50 +++++++++++++++++++ .../elasticsearch/test/ESIntegTestCase.java | 4 ++ 2 files changed, 54 insertions(+) create mode 100644 test/framework/src/main/java/org/elasticsearch/cluster/service/ClusterApplierAssertionPlugin.java diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/service/ClusterApplierAssertionPlugin.java b/test/framework/src/main/java/org/elasticsearch/cluster/service/ClusterApplierAssertionPlugin.java new file mode 100644 index 0000000000000..b44760ae37195 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/cluster/service/ClusterApplierAssertionPlugin.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.service; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; + +public class ClusterApplierAssertionPlugin extends Plugin { + @Override + public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, ScriptService scriptService, + NamedXContentRegistry xContentRegistry, Environment environment, + NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver) { + clusterService.addStateApplier(event -> { + assert threadPool.getThreadContext().isSystemContext(); + }); + clusterService.addListener(event -> { + assert threadPool.getThreadContext().isSystemContext(); + }); + return Collections.emptyList(); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 3116792c502d6..4a482d6d412cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -72,6 +72,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.cluster.service.ClusterApplierAssertionPlugin; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; @@ -1784,6 +1785,9 @@ protected Collection> getMockPlugins() { if (randomBoolean()) { mocks.add(MockFieldFilterPlugin.class); } + if (randomBoolean()) { + mocks.add(ClusterApplierAssertionPlugin.class); + } } if (addMockTransportService()) { From 52163d6598ec1f65a75b9d06736cc697adff7dc9 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 19 Mar 2020 10:04:18 +0000 Subject: [PATCH 4/7] Use the same threadpool everywhere --- .../cluster/coordination/NodeJoinTests.java | 2 +- .../snapshots/SnapshotResiliencyTests.java | 6 ++-- .../AbstractCoordinatorTestCase.java | 30 +++++++++---------- .../MockSinglePrioritizingExecutor.java | 5 ++-- .../service/FakeThreadPoolMasterService.java | 16 +++------- .../MockSinglePrioritizingExecutorTests.java | 2 +- .../FakeThreadPoolMasterServiceTests.java | 10 ++++++- 7 files changed, 35 insertions(+), 36 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index ac3075597dc57..4ec05ea0fc955 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -123,7 +123,7 @@ private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initia deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random()); FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test", - deterministicTaskQueue::scheduleNow); + threadPool, deterministicTaskQueue::scheduleNow); setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, deterministicTaskQueue.getThreadPool(), Randomness.get()); fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> { coordinator.handlePublishRequest(new PublishRequest(event.state())); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 062e64ba55b74..aae5ea3ffd400 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1163,15 +1163,15 @@ private final class TestClusterNode { TestClusterNode(DiscoveryNode node) throws IOException { this.node = node; final Environment environment = createEnvironment(node.getName()); - masterService = new FakeThreadPoolMasterService(node.getName(), "test", deterministicTaskQueue::scheduleNow); + threadPool = deterministicTaskQueue.getThreadPool(); + masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow); final Settings settings = environment.settings(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - threadPool = deterministicTaskQueue.getThreadPool(); clusterService = new ClusterService(settings, clusterSettings, masterService, new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { - return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue); + return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 5239827f2b171..8b40ff3454d56 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -58,6 +58,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.SeedHostsProvider; import org.elasticsearch.env.NodeEnvironment; @@ -911,6 +912,7 @@ class ClusterNode { } private void setUp() { + final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode); mockTransport = new DisruptableMockTransport(localNode, logger) { @Override protected void execute(Runnable runnable) { @@ -928,24 +930,20 @@ protected Optional getDisruptableMockTransport(Transpo .filter(transport -> transport.getLocalNode().getAddress().equals(address)).findAny(); } }; - final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()) ? nodeSettings : Settings.builder().put(nodeSettings) .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap - transportService = mockTransport.createTransportService( - settings, deterministicTaskQueue.getThreadPool(this::onNode), - getTransportInterceptor(localNode, deterministicTaskQueue.getThreadPool(this::onNode)), - a -> localNode, null, emptySet()); - masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", + transportService = mockTransport.createTransportService(settings, threadPool, + getTransportInterceptor(localNode, threadPool), a -> localNode, null, emptySet()); + masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", threadPool, runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable))); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings, - deterministicTaskQueue, this::onNode); + deterministicTaskQueue, threadPool); clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService); clusterService.setNodeConnectionsService( - new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode), - transportService)); + new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService)); final Collection> onJoinValidators = Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs))); final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY); @@ -954,8 +952,8 @@ protected Optional getDisruptableMockTransport(Transpo Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, p, r) -> {}, getElectionStrategy()); masterService.setClusterStatePublisher(coordinator); - final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService, - deterministicTaskQueue.getThreadPool(this::onNode), coordinator, null); + final GatewayService gatewayService + = new GatewayService(settings, allocationService, clusterService, threadPool, coordinator, null); logger.trace("starting up [{}]", localNode); transportService.start(); @@ -1292,8 +1290,8 @@ static class AckedFakeThreadPoolMasterService extends FakeThreadPoolMasterServic AckCollector nextAckCollector = new AckCollector(); - AckedFakeThreadPoolMasterService(String nodeName, String serviceName, Consumer onTaskAvailableToRun) { - super(nodeName, serviceName, onTaskAvailableToRun); + AckedFakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool, Consumer onTaskAvailableToRun) { + super(nodeName, serviceName, threadPool, onTaskAvailableToRun); } @Override @@ -1323,8 +1321,8 @@ static class DisruptableClusterApplierService extends ClusterApplierService { private boolean applicationMayFail; DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, - DeterministicTaskQueue deterministicTaskQueue, Function runnableWrapper) { - super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper)); + DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) { + super(nodeName, settings, clusterSettings, threadPool); this.nodeName = nodeName; this.deterministicTaskQueue = deterministicTaskQueue; addStateApplier(event -> { @@ -1344,7 +1342,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { - return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue); + return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java index add5014e555fc..553188ac24539 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.TimeUnit; @@ -29,7 +30,7 @@ */ public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor { - public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue) { + public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) { super(name, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() { @Override @@ -51,7 +52,7 @@ public String toString() { }); } }, - deterministicTaskQueue.getThreadPool().getThreadContext(), deterministicTaskQueue.getThreadPool().scheduler()); + threadPool.getThreadContext(), threadPool.scheduler()); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java index 179a8dcb9848d..100662f24a6ab 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java @@ -41,8 +41,6 @@ import static org.apache.lucene.util.LuceneTestCase.random; import static org.elasticsearch.test.ESTestCase.randomInt; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class FakeThreadPoolMasterService extends MasterService { private static final Logger logger = LogManager.getLogger(FakeThreadPoolMasterService.class); @@ -54,21 +52,14 @@ public class FakeThreadPoolMasterService extends MasterService { private boolean taskInProgress = false; private boolean waitForPublish = false; - public FakeThreadPoolMasterService(String nodeName, String serviceName, Consumer onTaskAvailableToRun) { + public FakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool, + Consumer onTaskAvailableToRun) { super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - createMockThreadPool()); + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool); this.name = serviceName; this.onTaskAvailableToRun = onTaskAvailableToRun; } - private static ThreadPool createMockThreadPool() { - final ThreadContext context = new ThreadContext(Settings.EMPTY); - final ThreadPool mockThreadPool = mock(ThreadPool.class); - when(mockThreadPool.getThreadContext()).thenReturn(context); - return mockThreadPool; - } - @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 1, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(name), @@ -172,4 +163,5 @@ public void onFailure(Exception e) { protected AckListener wrapAckListener(AckListener ackListener) { return ackListener; } + } diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java index 427f0d32c1270..66424de7f75d6 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java @@ -29,7 +29,7 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase { public void testPrioritizedEsThreadPoolExecutor() { final DeterministicTaskQueue taskQueue = DeterministicTaskQueueTests.newTaskQueue(); - final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue); + final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool()); final AtomicBoolean called1 = new AtomicBoolean(); final AtomicBoolean called2 = new AtomicBoolean(); executor.execute(new PrioritizedRunnable(Priority.NORMAL) { diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java index f97477707e461..c3e162254421b 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java @@ -27,7 +27,10 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Collections; @@ -37,6 +40,8 @@ import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class FakeThreadPoolMasterServiceTests extends ESTestCase { @@ -48,7 +53,10 @@ public void testFakeMasterService() { lastClusterStateRef.set(ClusterStateCreationUtils.state(discoveryNode, discoveryNode)); long firstClusterStateVersion = lastClusterStateRef.get().version(); AtomicReference> publishingCallback = new AtomicReference<>(); - FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", runnableTasks::add); + final ThreadContext context = new ThreadContext(Settings.EMPTY); + final ThreadPool mockThreadPool = mock(ThreadPool.class); + when(mockThreadPool.getThreadContext()).thenReturn(context); + FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", mockThreadPool, runnableTasks::add); masterService.setClusterStateSupplier(lastClusterStateRef::get); masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { lastClusterStateRef.set(event.state()); From e98b2cfe895208ea572df56bf95a17192f59129b Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 19 Mar 2020 10:11:47 +0000 Subject: [PATCH 5/7] More threadpool consistency --- .../org/elasticsearch/snapshots/SnapshotResiliencyTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index aae5ea3ffd400..2139268fe3888 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1163,7 +1163,7 @@ private final class TestClusterNode { TestClusterNode(DiscoveryNode node) throws IOException { this.node = node; final Environment environment = createEnvironment(node.getName()); - threadPool = deterministicTaskQueue.getThreadPool(); + threadPool = deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)); masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow); final Settings settings = environment.settings(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -1211,7 +1211,7 @@ protected NamedWriteableRegistry writeableRegistry() { } }; transportService = mockTransport.createTransportService( - settings, deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)), + settings, threadPool, new TransportInterceptor() { @Override public TransportRequestHandler interceptHandler(String action, String executor, From 561a7f8d780a547fce46d153f593cbac4cdb299a Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 19 Mar 2020 10:24:51 +0000 Subject: [PATCH 6/7] Precommit --- .../cluster/coordination/AbstractCoordinatorTestCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 8b40ff3454d56..491bd7d858091 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -58,7 +58,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.SeedHostsProvider; import org.elasticsearch.env.NodeEnvironment; @@ -1290,7 +1289,8 @@ static class AckedFakeThreadPoolMasterService extends FakeThreadPoolMasterServic AckCollector nextAckCollector = new AckCollector(); - AckedFakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool, Consumer onTaskAvailableToRun) { + AckedFakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool, + Consumer onTaskAvailableToRun) { super(nodeName, serviceName, threadPool, onTaskAvailableToRun); } From 897a0a13ffbe392c537c93a77b46909fdbdb505e Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 19 Mar 2020 10:34:42 +0000 Subject: [PATCH 7/7] Apply cluster states in system context Today cluster states are sometimes (rarely) applied in the default context rather than system context, which means that any appliers which capture their contexts cannot do things like remote transport actions when security is enabled. There are at least two ways that we end up applying the cluster state in the default context: 1. locally applying a cluster state that indicates that the master has failed 2. the elected master times out while waiting for a response from another node This commit ensures that cluster states are always applied in the system context. Mitigates #53751 --- .../elasticsearch/cluster/coordination/NodeJoinTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 4ec05ea0fc955..6557d647c75fa 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -122,9 +122,10 @@ private static ClusterState initialState(DiscoveryNode localNode, long term, lon private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) { deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random()); + final ThreadPool fakeThreadPool = deterministicTaskQueue.getThreadPool(); FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test", - threadPool, deterministicTaskQueue::scheduleNow); - setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, deterministicTaskQueue.getThreadPool(), Randomness.get()); + fakeThreadPool, deterministicTaskQueue::scheduleNow); + setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get()); fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> { coordinator.handlePublishRequest(new PublishRequest(event.state())); publishListener.onResponse(null);