Skip to content

Commit

Permalink
Watcher: Fix check for currently executed watches (#31137)
Browse files Browse the repository at this point in the history
The ack watch action has a check for currently executed watches, to make
sure that currently running watches cannot be acknowledged. This check
only checked on the coordinating node for watches being executed, but should
have checked the whole cluster using a WatcherStatsRequest, which is
being switched to in this commit.
  • Loading branch information
spinscale committed Jul 4, 2018
1 parent e80686c commit 6d17e91
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.core.watcher.watch.WatchField;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.watch.WatchParser;
import org.joda.time.DateTime;
Expand All @@ -53,85 +53,88 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ

private final Clock clock;
private final WatchParser parser;
private ExecutionService executionService;
private final Client client;

@Inject
public TransportAckWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Clock clock, XPackLicenseState licenseState,
WatchParser parser, ExecutionService executionService, Client client, ClusterService clusterService) {
WatchParser parser, Client client, ClusterService clusterService) {
super(settings, AckWatchAction.NAME, transportService, threadPool, actionFilters, indexNameExpressionResolver,
licenseState, clusterService, AckWatchRequest::new, AckWatchResponse::new);
this.clock = clock;
this.parser = parser;
this.executionService = executionService;
this.client = client;
}

@Override
protected void masterOperation(AckWatchRequest request, ClusterState state,
ActionListener<AckWatchResponse> listener) throws Exception {
// if the watch to be acked is running currently, reject this request
List<WatchExecutionSnapshot> snapshots = executionService.currentExecutions();
boolean isWatchRunning = snapshots.stream().anyMatch(s -> s.watchId().equals(request.getWatchId()));
if (isWatchRunning) {
listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished",
WatcherStatsRequest watcherStatsRequest = new WatcherStatsRequest();
watcherStatsRequest.includeCurrentWatches(true);

executeAsyncWithOrigin(client, WATCHER_ORIGIN, WatcherStatsAction.INSTANCE, watcherStatsRequest, ActionListener.wrap(response -> {
boolean isWatchRunning = response.getNodes().stream()
.anyMatch(node -> node.getSnapshots().stream().anyMatch(snapshot -> snapshot.watchId().equals(request.getWatchId())));
if (isWatchRunning) {
listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished",
RestStatus.CONFLICT, request.getWatchId()));
return;
}

GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap((response) -> {
if (response.isExists() == false) {
listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId()));
} else {
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, response.getSourceAsBytesRef(),
} else {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap(getResponse -> {
if (getResponse.isExists() == false) {
listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId()));
} else {
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(),
now, XContentType.JSON);
watch.version(response.getVersion());
watch.status().version(response.getVersion());
String[] actionIds = request.getActionIds();
if (actionIds == null || actionIds.length == 0) {
actionIds = new String[]{WatchField.ALL_ACTIONS_ID};
}
watch.version(getResponse.getVersion());
watch.status().version(getResponse.getVersion());
String[] actionIds = request.getActionIds();
if (actionIds == null || actionIds.length == 0) {
actionIds = new String[]{WatchField.ALL_ACTIONS_ID};
}

// exit early in case nothing changes
boolean isChanged = watch.ack(now, actionIds);
if (isChanged == false) {
listener.onResponse(new AckWatchResponse(watch.status()));
return;
}
// exit early in case nothing changes
boolean isChanged = watch.ack(now, actionIds);
if (isChanged == false) {
listener.onResponse(new AckWatchResponse(watch.status()));
return;
}

UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
// this may reject this action, but prevents concurrent updates from a watch execution
updateRequest.version(response.getVersion());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
XContentBuilder builder = jsonBuilder();
builder.startObject()
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
// this may reject this action, but prevents concurrent updates from a watch execution
updateRequest.version(getResponse.getVersion());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
XContentBuilder builder = jsonBuilder();
builder.startObject()
.startObject(WatchField.STATUS.getPreferredName())
.startObject("actions");

List<String> actionIdsAsList = Arrays.asList(actionIds);
boolean updateAll = actionIdsAsList.contains("_all");
for (ActionWrapper actionWrapper : watch.actions()) {
if (updateAll || actionIdsAsList.contains(actionWrapper.id())) {
builder.startObject(actionWrapper.id())
List<String> actionIdsAsList = Arrays.asList(actionIds);
boolean updateAll = actionIdsAsList.contains("_all");
for (ActionWrapper actionWrapper : watch.actions()) {
if (updateAll || actionIdsAsList.contains(actionWrapper.id())) {
builder.startObject(actionWrapper.id())
.field("ack", watch.status().actionStatus(actionWrapper.id()).ackStatus(), ToXContent.EMPTY_PARAMS)
.endObject();
}
}
}

builder.endObject().endObject().endObject();
updateRequest.doc(builder);
builder.endObject().endObject().endObject();
updateRequest.doc(builder);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
ActionListener.<UpdateResponse>wrap(
(updateResponse) -> listener.onResponse(new AckWatchResponse(watch.status())),
listener::onFailure), client::update);
}
}, listener::onFailure), client::get);
(updateResponse) -> listener.onResponse(new AckWatchResponse(watch.status())),
listener::onFailure), client::update);
}
}, listener::onFailure), client::get);

}

}, listener::onFailure));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
package org.elasticsearch.xpack.watcher.transport.actions.ack;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -22,11 +26,13 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.watch.WatchParser;
import org.junit.Before;

Expand All @@ -36,14 +42,14 @@

import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportAckWatchActionTests extends ESTestCase {

private TransportAckWatchAction action;
private ExecutionService executionService;
private Client client;

@Before
Expand All @@ -53,13 +59,13 @@ public void setupAction() {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext);
WatchParser watchParser = mock(WatchParser.class);
executionService = mock(ExecutionService.class);
ClusterService clusterService = mock(ClusterService.class);
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
action = new TransportAckWatchAction(Settings.EMPTY, transportService, threadPool,
new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(Settings.EMPTY),
Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, executionService, client, clusterService);
Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, client, clusterService);
when(client.threadPool()).thenReturn(threadPool);
}

public void testWatchNotFound() throws Exception {
Expand All @@ -71,6 +77,13 @@ public void testWatchNotFound() throws Exception {
return null;
}).when(client).get(anyObject(), anyObject());

doAnswer(invocation -> {
ContextPreservingActionListener listener = (ContextPreservingActionListener) invocation.getArguments()[2];
listener.onResponse(new WatcherStatsResponse(new ClusterName("clusterName"), new WatcherMetaData(false),
Collections.emptyList(), Collections.emptyList()));
return null;
}).when(client).execute(eq(WatcherStatsAction.INSTANCE), anyObject(), anyObject());

AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId);
PlainActionFuture<AckWatchResponse> listener = PlainActionFuture.newFuture();
action.masterOperation(ackWatchRequest, null, listener);
Expand All @@ -82,9 +95,18 @@ public void testWatchNotFound() throws Exception {

public void testThatWatchCannotBeAckedWhileRunning() throws Exception {
String watchId = "my_watch_id";
WatchExecutionSnapshot snapshot = mock(WatchExecutionSnapshot.class);
when(snapshot.watchId()).thenReturn(watchId);
when(executionService.currentExecutions()).thenReturn(Collections.singletonList(snapshot));

doAnswer(invocation -> {
ContextPreservingActionListener listener = (ContextPreservingActionListener) invocation.getArguments()[2];
DiscoveryNode discoveryNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
WatcherStatsResponse.Node node = new WatcherStatsResponse.Node(discoveryNode);
WatchExecutionSnapshot snapshot = mock(WatchExecutionSnapshot.class);
when(snapshot.watchId()).thenReturn(watchId);
node.setSnapshots(Collections.singletonList(snapshot));
listener.onResponse(new WatcherStatsResponse(new ClusterName("clusterName"),
new WatcherMetaData(false), Collections.singletonList(node), Collections.emptyList()));
return null;
}).when(client).execute(eq(WatcherStatsAction.INSTANCE), anyObject(), anyObject());

AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId);
PlainActionFuture<AckWatchResponse> listener = PlainActionFuture.newFuture();
Expand All @@ -95,4 +117,4 @@ public void testThatWatchCannotBeAckedWhileRunning() throws Exception {
assertThat(e.getMessage(), is("watch[my_watch_id] is running currently, cannot ack until finished"));
assertThat(e.status(), is(RestStatus.CONFLICT));
}
}
}

0 comments on commit 6d17e91

Please sign in to comment.