diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index 82bdb495722ca..87a52208e0757 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -185,6 +185,7 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, ActionListener originalListener) { assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us"; + assert transportService.getThreadPool().getThreadContext().isSystemContext(); final ActionListener responseActionListener; if (destination.equals(nodes.getLocalNode())) { // if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation) @@ -221,6 +222,7 @@ public void onFailure(Exception e) { @Override public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest, ActionListener responseActionListener) { + assert transportService.getThreadPool().getThreadContext().isSystemContext(); final String actionName; final TransportRequest transportRequest; if (Coordinator.isZen1Node(destination)) { diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index f5bbe2d420be9..5a1929961f219 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -55,6 +56,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; @@ -347,7 +349,18 @@ private void submitStateUpdateTask(final String source, final ClusterStateTaskCo if (!lifecycle.started()) { return; } - try { + + final ThreadContext threadContext = threadPool.getThreadContext(); + final Map> responseHeaders = threadContext.getResponseHeaders(); + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + threadContext.markAsSystemContext(); + // copy any accumulated response headers (e.g. deprecation warnings) since we may send responses from cluster state listeners + for (Map.Entry> responseHeader : responseHeaders.entrySet()) { + for (String value : responseHeader.getValue()) { + threadContext.addResponseHeader(responseHeader.getKey(), value); + } + } + UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor); if (config.timeout() != null) { threadPoolExecutor.execute(updateTask, config.timeout(), diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 1ea352fe4e3a1..76316445a33d2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -122,9 +122,10 @@ private static ClusterState initialState(DiscoveryNode localNode, long term, lon private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) { deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random()); + final ThreadPool fakeThreadPool = deterministicTaskQueue.getThreadPool(); FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test", - deterministicTaskQueue::scheduleNow); - setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, deterministicTaskQueue.getThreadPool(), Randomness.get()); + fakeThreadPool, deterministicTaskQueue::scheduleNow); + setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get()); fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> { coordinator.handlePublishRequest(new PublishRequest(event.state())); publishListener.onResponse(null); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index eba98c92e2028..a7b3cdfb3474a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1167,15 +1167,15 @@ private final class TestClusterNode { TestClusterNode(DiscoveryNode node) throws IOException { this.node = node; final Environment environment = createEnvironment(node.getName()); - masterService = new FakeThreadPoolMasterService(node.getName(), "test", deterministicTaskQueue::scheduleNow); + threadPool = deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)); + masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow); final Settings settings = environment.settings(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - threadPool = deterministicTaskQueue.getThreadPool(); clusterService = new ClusterService(settings, clusterSettings, masterService, new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { - return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue); + return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool); } @Override @@ -1215,7 +1215,7 @@ protected NamedWriteableRegistry writeableRegistry() { } }; transportService = mockTransport.createTransportService( - settings, deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)), + settings, threadPool, new TransportInterceptor() { @Override public TransportRequestHandler interceptHandler(String action, String executor, diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index ce202e9a655f9..115f60fc23c87 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -911,6 +911,7 @@ class ClusterNode { } private void setUp() { + final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode); mockTransport = new DisruptableMockTransport(localNode, logger) { @Override protected void execute(Runnable runnable) { @@ -928,24 +929,20 @@ protected Optional getDisruptableMockTransport(Transpo .filter(transport -> transport.getLocalNode().getAddress().equals(address)).findAny(); } }; - final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()) ? nodeSettings : Settings.builder().put(nodeSettings) .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap - transportService = mockTransport.createTransportService( - settings, deterministicTaskQueue.getThreadPool(this::onNode), - getTransportInterceptor(localNode, deterministicTaskQueue.getThreadPool(this::onNode)), - a -> localNode, null, emptySet()); - masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", + transportService = mockTransport.createTransportService(settings, threadPool, + getTransportInterceptor(localNode, threadPool), a -> localNode, null, emptySet()); + masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", threadPool, runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable))); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings, - deterministicTaskQueue, this::onNode); + deterministicTaskQueue, threadPool); clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService); clusterService.setNodeConnectionsService( - new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode), - transportService)); + new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService)); final Collection> onJoinValidators = Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs))); final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY); @@ -955,7 +952,7 @@ protected Optional getDisruptableMockTransport(Transpo getElectionStrategy()); masterService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService, - deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator); + threadPool, null, coordinator); logger.trace("starting up [{}]", localNode); transportService.start(); @@ -1292,8 +1289,9 @@ static class AckedFakeThreadPoolMasterService extends FakeThreadPoolMasterServic AckCollector nextAckCollector = new AckCollector(); - AckedFakeThreadPoolMasterService(String nodeName, String serviceName, Consumer onTaskAvailableToRun) { - super(nodeName, serviceName, onTaskAvailableToRun); + AckedFakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool, + Consumer onTaskAvailableToRun) { + super(nodeName, serviceName, threadPool, onTaskAvailableToRun); } @Override @@ -1323,8 +1321,8 @@ static class DisruptableClusterApplierService extends ClusterApplierService { private boolean applicationMayFail; DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, - DeterministicTaskQueue deterministicTaskQueue, Function runnableWrapper) { - super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper)); + DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) { + super(nodeName, settings, clusterSettings, threadPool); this.nodeName = nodeName; this.deterministicTaskQueue = deterministicTaskQueue; addStateApplier(event -> { @@ -1344,7 +1342,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { - return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue); + return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java index add5014e555fc..553188ac24539 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.TimeUnit; @@ -29,7 +30,7 @@ */ public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor { - public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue) { + public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) { super(name, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() { @Override @@ -51,7 +52,7 @@ public String toString() { }); } }, - deterministicTaskQueue.getThreadPool().getThreadContext(), deterministicTaskQueue.getThreadPool().scheduler()); + threadPool.getThreadContext(), threadPool.scheduler()); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/service/ClusterApplierAssertionPlugin.java b/test/framework/src/main/java/org/elasticsearch/cluster/service/ClusterApplierAssertionPlugin.java new file mode 100644 index 0000000000000..b44760ae37195 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/cluster/service/ClusterApplierAssertionPlugin.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.service; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; + +public class ClusterApplierAssertionPlugin extends Plugin { + @Override + public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, ScriptService scriptService, + NamedXContentRegistry xContentRegistry, Environment environment, + NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver) { + clusterService.addStateApplier(event -> { + assert threadPool.getThreadContext().isSystemContext(); + }); + clusterService.addListener(event -> { + assert threadPool.getThreadContext().isSystemContext(); + }); + return Collections.emptyList(); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java index 74d9cc17a5325..100662f24a6ab 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java @@ -41,8 +41,6 @@ import static org.apache.lucene.util.LuceneTestCase.random; import static org.elasticsearch.test.ESTestCase.randomInt; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class FakeThreadPoolMasterService extends MasterService { private static final Logger logger = LogManager.getLogger(FakeThreadPoolMasterService.class); @@ -54,21 +52,14 @@ public class FakeThreadPoolMasterService extends MasterService { private boolean taskInProgress = false; private boolean waitForPublish = false; - public FakeThreadPoolMasterService(String nodeName, String serviceName, Consumer onTaskAvailableToRun) { + public FakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool, + Consumer onTaskAvailableToRun) { super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - createMockThreadPool()); + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool); this.name = serviceName; this.onTaskAvailableToRun = onTaskAvailableToRun; } - private static ThreadPool createMockThreadPool() { - final ThreadContext context = new ThreadContext(Settings.EMPTY); - final ThreadPool mockThreadPool = mock(ThreadPool.class); - when(mockThreadPool.getThreadContext()).thenReturn(context); - return mockThreadPool; - } - @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 1, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(name), @@ -110,7 +101,11 @@ public void run() { final Runnable task = pendingTasks.remove(taskIndex); taskInProgress = true; scheduledNextTask = false; - task.run(); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + threadContext.markAsSystemContext(); + task.run(); + } if (waitForPublish == false) { taskInProgress = false; } @@ -168,4 +163,5 @@ public void onFailure(Exception e) { protected AckListener wrapAckListener(AckListener ackListener) { return ackListener; } + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index fb1c2d4880515..0cf465093ed50 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -79,6 +79,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.cluster.service.ClusterApplierAssertionPlugin; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; @@ -1935,6 +1936,9 @@ protected Collection> getMockPlugins() { if (randomBoolean()) { mocks.add(MockFieldFilterPlugin.class); } + if (randomBoolean()) { + mocks.add(ClusterApplierAssertionPlugin.class); + } } if (addMockTransportService()) { diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java index 427f0d32c1270..66424de7f75d6 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java @@ -29,7 +29,7 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase { public void testPrioritizedEsThreadPoolExecutor() { final DeterministicTaskQueue taskQueue = DeterministicTaskQueueTests.newTaskQueue(); - final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue); + final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool()); final AtomicBoolean called1 = new AtomicBoolean(); final AtomicBoolean called2 = new AtomicBoolean(); executor.execute(new PrioritizedRunnable(Priority.NORMAL) { diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java index f97477707e461..c3e162254421b 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java @@ -27,7 +27,10 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Collections; @@ -37,6 +40,8 @@ import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class FakeThreadPoolMasterServiceTests extends ESTestCase { @@ -48,7 +53,10 @@ public void testFakeMasterService() { lastClusterStateRef.set(ClusterStateCreationUtils.state(discoveryNode, discoveryNode)); long firstClusterStateVersion = lastClusterStateRef.get().version(); AtomicReference> publishingCallback = new AtomicReference<>(); - FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", runnableTasks::add); + final ThreadContext context = new ThreadContext(Settings.EMPTY); + final ThreadPool mockThreadPool = mock(ThreadPool.class); + when(mockThreadPool.getThreadContext()).thenReturn(context); + FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", mockThreadPool, runnableTasks::add); masterService.setClusterStateSupplier(lastClusterStateRef::get); masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { lastClusterStateRef.set(event.state());