Skip to content

Commit

Permalink
Use setIfSeqNo(...) and setIfPrimaryTerm(...) for updating watch stat…
Browse files Browse the repository at this point in the history
…us if all nodes are at least on 6.7.0 (#40888)

Only use UpdateRequest#setIfSeqNo(...) and UpdateRequest#setIfPrimaryTerm(...) for updating watch status if all nodes are at least on 6.7.0.
Otherwise fallback using UpdateRequest#version(...)

Closes #40841
  • Loading branch information
martijnvg authored Apr 11, 2019
1 parent 70cca88 commit b366fb9
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class Watch implements ToXContentObject {

private final long sourceSeqNo;
private final long sourcePrimaryTerm;
private transient long version;

public Watch(String id, Trigger trigger, ExecutableInput input, ExecutableCondition condition, @Nullable ExecutableTransform transform,
@Nullable TimeValue throttlePeriod, List<ActionWrapper> actions, @Nullable Map<String, Object> metadata,
Expand Down Expand Up @@ -107,6 +108,14 @@ public long getSourcePrimaryTerm() {
return sourcePrimaryTerm;
}

public long version() {
return version;
}

public void version(long version) {
this.version = version;
}

/**
* Sets the state of this watch to in/active
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
try {
Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON,
operation.getIfSeqNo(), operation.getIfPrimaryTerm());
watch.version(operation.version());
ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId);
if (shardAllocationConfiguration == null) {
logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -278,8 +279,10 @@ record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "W
if (resp.isExists() == false) {
throw new ResourceNotFoundException("watch [{}] does not exist", watchId);
}
return parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), XContentType.JSON,
resp.getSeqNo(), resp.getPrimaryTerm());
Watch watch = parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(),
XContentType.JSON, resp.getSeqNo(), resp.getPrimaryTerm());
watch.version(resp.getVersion());
return watch;
});
} catch (ResourceNotFoundException e) {
String message = "unable to find watch for record [" + ctx.id() + "]";
Expand Down Expand Up @@ -350,8 +353,13 @@ public void updateWatchStatus(Watch watch) throws IOException {

UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id());
updateRequest.doc(source);
updateRequest.setIfSeqNo(watch.getSourceSeqNo());
updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm());
boolean useSeqNoForCAS = clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
if (useSeqNoForCAS) {
updateRequest.setIfSeqNo(watch.getSourceSeqNo());
updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm());
} else {
updateRequest.version(watch.version());
}
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
client.update(updateRequest).actionGet(indexDefaultTimeout);
} catch (DocumentMissingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ protected void masterOperation(AckWatchRequest request, ClusterState state,
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(),
now, XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm());
watch.version(getResponse.getVersion());
watch.status().version(getResponse.getVersion());
String[] actionIds = request.getActionIds();
if (actionIds == null || actionIds.length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ protected void masterOperation(ActivateWatchRequest request, ClusterState state,
if (getResponse.isExists()) {
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(), now,
XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm());
watch.version(getResponse.getVersion());
watch.status().version(getResponse.getVersion());
// if we are not yet running in distributed mode, only call triggerservice, if we are on the master node
if (localExecute(request) == false && this.clusterService.state().nodes().isLocalNodeElectedMaster()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ protected void masterOperation(GetWatchRequest request, ClusterState state,
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getId(), true, getResponse.getSourceAsBytesRef(), now,
XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm());
watch.version(getRequest.version());
watch.toXContent(builder, WatcherParams.builder()
.hideSecrets(true)
.includeStatus(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -150,6 +153,9 @@ public void init() throws Exception {
DiscoveryNode discoveryNode = new DiscoveryNode("node_1", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("cluster"))
.nodes(DiscoveryNodes.builder().add(discoveryNode).build())
.build());
when(clusterService.localNode()).thenReturn(discoveryNode);

executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, clock, parser,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.upgrades;

import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.watcher.condition.AlwaysCondition;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

/**
* This rolling upgrade node tests whether watcher is able to update the watch status after execution in a mixed cluster.
*
* Versions before 6.7.0 the watch status was using the version to do optimistic locking, after 6.7.0 sequence number and
* primary term are used. The problem was that bwc logic was forgotten to be added, so in a mixed versions cluster, when
* a watch is executed and its watch status is updated then an update request using sequence number / primary term as
* way to do optimistic locking can be sent to nodes that don't support this.
*
* This test tries to simulate a situation where the bug manifests. This requires watches to be run by multiple nodes
* holding a .watches index shard.
*/
public class WatcherUpgradeIT extends AbstractUpgradeTestCase {

public void testWatchesKeepRunning() throws Exception {
if (UPGRADED_FROM_VERSION.before(Version.V_6_0_0)) {
logger.info("Skipping test. Upgrading from before 6.0 makes this test too complicated.");
return;
}

final int numWatches = 16;

if (CLUSTER_TYPE.equals(ClusterType.OLD)) {
final String watch = watchBuilder()
.trigger(schedule(interval("5s")))
.input(simpleInput())
.condition(AlwaysCondition.INSTANCE)
.addAction("_action1", loggingAction("{{ctx.watch_id}}"))
.buildAsBytes(XContentType.JSON)
.utf8ToString();

for (int i = 0; i < numWatches; i++) {
// Using a random id helps to distribute the watches between watcher services on the different nodes with
// a .watches index shard:
String id = UUIDs.randomBase64UUID();
logger.info("Adding watch [{}/{}]", id, Math.floorMod(Murmur3HashFunction.hash(id), 3));
Request putWatchRequest = new Request("PUT", "/_xpack/watcher/watch/" + id);
putWatchRequest.setJsonEntity(watch);
assertOK(client().performRequest(putWatchRequest));

if (i == 0) {
// Increasing the number of replicas to makes it more likely that an upgraded node sends an
// update request (in order to update watch status) to a non upgraded node.
Request updateSettingsRequest = new Request("PUT", "/.watches/_settings");
updateSettingsRequest.setJsonEntity(Strings.toString(Settings.builder()
.put("index.number_of_replicas", 2)
.put("index.auto_expand_replicas", (String) null)
.build()));
assertOK(client().performRequest(updateSettingsRequest));
ensureAllWatchesIndexShardsStarted();
}
}
} else {
ensureAllWatchesIndexShardsStarted();
// Restarting watcher helps to ensure that after a node upgrade each node will be executing watches:
// (and not that a non upgraded node is in charge of watches that an upgraded node should run)
assertOK(client().performRequest(new Request("POST", "/_xpack/watcher/_stop")));
assertOK(client().performRequest(new Request("POST", "/_xpack/watcher/_start")));

// Casually checking whether watches are executing:
for (int i = 0; i < 10; i++) {
int previous = getWatchHistoryEntriesCount();
assertBusy(() -> {
Integer totalHits = getWatchHistoryEntriesCount();
assertThat(totalHits, greaterThan(previous));
}, 30, TimeUnit.SECONDS);
}
}
}

private int getWatchHistoryEntriesCount() throws IOException {
Request refreshRequest = new Request("POST", "/.watcher-history-*/_refresh");
assertOK(client().performRequest(refreshRequest));

Request searchRequest = new Request("GET", "/.watcher-history-*/_search");
searchRequest.setJsonEntity("{\"query\": {\"match\": {\"state\": {\"query\": \"executed\"}}}}");

Response response = client().performRequest(searchRequest);
assertEquals(200, response.getStatusLine().getStatusCode());
Map<String, Object> responseBody = entityAsMap(response);
return (Integer) ((Map<?, ?>) responseBody.get("hits")).get("total");
}

private void ensureAllWatchesIndexShardsStarted() throws Exception {
assertBusy(() -> {
Request request = new Request("GET", "/_cluster/health/.watches");
Response response = client().performRequest(request);
assertEquals(200, response.getStatusLine().getStatusCode());
Map<String, Object> responseBody = entityAsMap(response);
int activeShards = (int) responseBody.get("active_shards");
assertThat(activeShards, equalTo(3));
}, 30, TimeUnit.SECONDS);
}

}

0 comments on commit b366fb9

Please sign in to comment.