diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java index 7dcca20e2019e..17495bcad1ceb 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java @@ -63,6 +63,7 @@ public class TransportPutWatchAction extends WatcherTransportActionwrap(response -> { boolean created = response.getResult() == DocWriteResponse.Result.CREATED; - if (localExecute(request) == false && watch.status().state().isActive()) { + // if not yet in distributed mode (mixed 5/6 version in cluster), only trigger on the master node + if (localExecute(request) == false && + this.clusterService.state().nodes().isLocalNodeElectedMaster() && + watch.status().state().isActive()) { triggerService.add(watch); } listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created)); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java index ce223b1c9fd15..0844f573cc041 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java @@ -5,13 +5,18 @@ */ package org.elasticsearch.xpack.watcher.transport.actions.put; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -23,17 +28,23 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchRequest; +import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchResponse; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.watch.WatchParser; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.junit.Before; import org.mockito.ArgumentCaptor; import java.util.Collections; +import java.util.HashSet; import java.util.Map; +import static java.util.Arrays.asList; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -45,18 +56,20 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class TransportPutWatchActionTests extends ESTestCase { private TransportPutWatchAction action; - private Watch watch = new WatchExecutionContextMockBuilder("_id").buildMock().watch(); - private ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + private final Watch watch = new WatchExecutionContextMockBuilder("_id").buildMock().watch(); + private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + private final ClusterService clusterService = mock(ClusterService.class); + private final TriggerService triggerService = mock(TriggerService.class); + private final ActionListener listener = ActionListener.wrap(r -> {}, e -> assertThat(e, is(nullValue()))); @Before public void setupAction() throws Exception { - TriggerService triggerService = mock(TriggerService.class); - ClusterService clusterService = mock(ClusterService.class); ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(threadContext); @@ -64,19 +77,24 @@ public void setupAction() throws Exception { WatchParser parser = mock(WatchParser.class); when(parser.parseWithSecrets(eq("_id"), eq(false), anyObject(), anyObject(), anyObject(), anyBoolean())).thenReturn(watch); + WatchStatus status = mock(WatchStatus.class); + WatchStatus.State state = new WatchStatus.State(true, DateTime.now(DateTimeZone.UTC)); + when(status.state()).thenReturn(state); + when(watch.status()).thenReturn(status); Client client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); // mock an index response that calls the listener doAnswer(invocation -> { - IndexRequest request = (IndexRequest) invocation.getArguments()[1]; - ActionListener listener = (ActionListener) invocation.getArguments()[2]; + UpdateRequest request = (UpdateRequest) invocation.getArguments()[0]; + ActionListener listener = (ActionListener) invocation.getArguments()[1]; ShardId shardId = new ShardId(new Index(Watch.INDEX, "uuid"), 0); - listener.onResponse(new IndexResponse(shardId, request.type(), request.id(), 1, 1, 1, true)); + listener.onResponse(new UpdateResponse(shardId, request.type(), request.id(), request.version(), + DocWriteResponse.Result.UPDATED)); return null; - }).when(client).execute(any(), any(), any()); + }).when(client).update(any(), any()); action = new TransportPutWatchAction(Settings.EMPTY, transportService, threadPool, new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(Settings.EMPTY), new ClockMock(), @@ -84,7 +102,6 @@ public void setupAction() throws Exception { } public void testHeadersAreFilteredWhenPuttingWatches() throws Exception { - ClusterState state = mock(ClusterState.class); // set up threadcontext with some arbitrary info String headerName = randomFrom(ClientHelper.SECURITY_HEADER_FILTERS); threadContext.putHeader(headerName, randomAlphaOfLength(10)); @@ -92,7 +109,17 @@ public void testHeadersAreFilteredWhenPuttingWatches() throws Exception { PutWatchRequest putWatchRequest = new PutWatchRequest(); putWatchRequest.setId("_id"); - action.masterOperation(putWatchRequest, state, ActionListener.wrap(r -> {}, e -> assertThat(e, is(nullValue())))); + + ClusterState state = ClusterState.builder(new ClusterName("my_cluster")) + .nodes(DiscoveryNodes.builder() + .masterNodeId("node_1") + .localNodeId(randomFrom("node_1", "node_2")) + .add(newNode("node_1", Version.CURRENT)) + .add(newNode("node_2", Version.CURRENT))) + .build(); + when(clusterService.state()).thenReturn(state); + + action.masterOperation(putWatchRequest, state, listener); ArgumentCaptor captor = ArgumentCaptor.forClass(Map.class); verify(watch.status()).setHeaders(captor.capture()); @@ -100,4 +127,63 @@ public void testHeadersAreFilteredWhenPuttingWatches() throws Exception { assertThat(capturedHeaders.keySet(), hasSize(1)); assertThat(capturedHeaders, hasKey(headerName)); } -} \ No newline at end of file + + public void testWatchesAreNeverTriggeredWhenDistributed() throws Exception { + PutWatchRequest putWatchRequest = new PutWatchRequest(); + putWatchRequest.setId("_id"); + + ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster")) + .nodes(DiscoveryNodes.builder() + .masterNodeId("node_1") + .localNodeId(randomFrom("node_1", "node_2")) + .add(newNode("node_1", Version.CURRENT)) + .add(newNode("node_2", Version.CURRENT))) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + action.masterOperation(putWatchRequest, clusterState, listener); + + verifyZeroInteractions(triggerService); + } + + public void testWatchesAreNotTriggeredOnNonMasterWhenNotDistributed() throws Exception { + PutWatchRequest putWatchRequest = new PutWatchRequest(); + putWatchRequest.setId("_id"); + + ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster")) + .nodes(DiscoveryNodes.builder() + .masterNodeId("node_2") + .localNodeId("node_1") + .add(newNode("node_1", Version.CURRENT)) + .add(newNode("node_2", Version.V_5_6_10))) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + action.masterOperation(putWatchRequest, clusterState, listener); + + verifyZeroInteractions(triggerService); + } + + public void testWatchesAreTriggeredOnMasterWhenNotDistributed() throws Exception { + PutWatchRequest putWatchRequest = new PutWatchRequest(); + putWatchRequest.setId("_id"); + + ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster")) + .nodes(DiscoveryNodes.builder() + .masterNodeId("node_1") + .localNodeId("node_1") + .add(newNode("node_1", Version.CURRENT)) + .add(newNode("node_2", Version.V_5_6_10))) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + action.masterOperation(putWatchRequest, clusterState, listener); + + verify(triggerService).add(eq(watch)); + } + + private static DiscoveryNode newNode(String nodeId, Version version) { + return new DiscoveryNode(nodeId, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(asList(DiscoveryNode.Role.values())), version); + } +}