Skip to content

Commit

Permalink
Watcher dont add watches post index if stopped (#56556)
Browse files Browse the repository at this point in the history
Watcher adds watches to the trigger service on the postIndex action
for the .watches index. This has the (intentional) side effect of also
adding the watches to the stats. The tests rely on these stats for their
assertions. The tests also start and stop Watcher between each test for
a clean slate.

When Watcher executes it updates the .watches index and upon this update
it will go through the postIndex method and end up added that watch to the
trigger service (and stats). Functionally this is not a problem, if Watcher
is stopping or stopped since Watcher is also paused and will not execute
the watch. However, with specific timing and expectations of a clean slate
can cause issues the test assertions against the stats.

This commit ensures that the postIndex action only adds to the trigger service
if the Watcher state is not stopping or stopped. When started back up it will
re-read index .watches.

This commit also un-mutes the tests related to #53177 and #56534
  • Loading branch information
jakelandis authored May 12, 2020
1 parent 918ef65 commit 5aa36ef
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
*/
package org.elasticsearch.smoketest;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.xpack.watcher.WatcherYamlSuiteTestCase;

/**
* Runs the YAML rest tests against an external cluster
*/
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53177")
public class WatcherYamlRestIT extends WatcherYamlSuiteTestCase {
public WatcherYamlRestIT(ClientYamlTestCandidate testCandidate) {
super(testCandidate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -19,7 +18,6 @@

import static org.elasticsearch.xpack.test.SecuritySettingsSourceField.basicAuthHeaderValue;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53177")
public class SmokeTestWatcherWithSecurityClientYamlTestSuiteIT extends WatcherYamlSuiteTestCase {

private static final String TEST_ADMIN_USERNAME = "test_admin";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
final WatcherLifeCycleService watcherLifeCycleService =
new WatcherLifeCycleService(clusterService, watcherService);

listener = new WatcherIndexingListener(watchParser, getClock(), triggerService);
listener = new WatcherIndexingListener(watchParser, getClock(), triggerService, watcherLifeCycleService.getState());
clusterService.addListener(listener);

// note: clock is needed here until actions can be constructed directly instead of by guice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.WatchParser;
Expand All @@ -38,11 +39,13 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
Expand All @@ -66,12 +69,14 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
private final WatchParser parser;
private final Clock clock;
private final TriggerService triggerService;
private final Supplier<WatcherState> watcherState;
private volatile Configuration configuration = INACTIVE;

WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService) {
WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService, Supplier<WatcherState> watcherState) {
this.parser = parser;
this.clock = clock;
this.triggerService = triggerService;
this.watcherState = watcherState;
}

// package private for testing
Expand Down Expand Up @@ -119,16 +124,17 @@ public void postIndex(ShardId shardId, Engine.Index operation, Engine.IndexResul
}

boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
if (shouldBeTriggered) {
if (watch.status().state().isActive()) {
WatcherState currentState = watcherState.get();
if (shouldBeTriggered && EnumSet.of(WatcherState.STOPPING, WatcherState.STOPPED).contains(currentState) == false) {
if (watch.status().state().isActive() ) {
logger.debug("adding watch [{}] to trigger service", watch.id());
triggerService.add(watch);
} else {
logger.debug("removing watch [{}] to trigger service", watch.id());
triggerService.remove(watch.id());
}
} else {
logger.debug("watch [{}] should not be triggered", watch.id());
logger.debug("watch [{}] should not be triggered. watcher state [{}]", watch.id(), currentState);
}
} catch (IOException e) {
throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
Expand Down Expand Up @@ -203,7 +204,7 @@ List<ShardRouting> shardRoutings() {
return previousShardRoutings.get();
}

public WatcherState getState() {
return state.get();
public Supplier<WatcherState> getState(){
return () -> state.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected WatcherStatsResponse.Node newNodeResponse(StreamInput in) throws IOExc
@Override
protected WatcherStatsResponse.Node nodeOperation(WatcherStatsRequest.Node request, Task task) {
WatcherStatsResponse.Node statsResponse = new WatcherStatsResponse.Node(clusterService.localNode());
statsResponse.setWatcherState(lifeCycleService.getState());
statsResponse.setWatcherState(lifeCycleService.getState().get());
statsResponse.setThreadPoolQueueSize(executionService.executionThreadPoolQueueSize());
statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize());
if (request.includeCurrentWatches()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
Expand Down Expand Up @@ -88,7 +89,7 @@ public class WatcherIndexingListenerTests extends ESTestCase {
@Before
public void setup() throws Exception {
clock.freeze();
listener = new WatcherIndexingListener(parser, clock, triggerService);
listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STARTED);

Map<ShardId, ShardAllocationConfiguration> map = new HashMap<>();
map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo")));
Expand Down Expand Up @@ -139,6 +140,29 @@ public void testPostIndex() throws Exception {
}
}

public void testPostIndexWhenStopped() throws Exception {
listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STOPPED);
Map<ShardId, ShardAllocationConfiguration> map = new HashMap<>();
map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo")));
listener.setConfiguration(new Configuration(Watch.INDEX, map));
when(operation.id()).thenReturn(randomAlphaOfLength(10));
when(operation.source()).thenReturn(BytesArray.EMPTY);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
List<Engine.Result.Type> types = new ArrayList<>(List.of(Engine.Result.Type.values()));
types.remove(Engine.Result.Type.FAILURE);
when(result.getResultType()).thenReturn(randomFrom(types));

boolean watchActive = randomBoolean();
boolean isNewWatch = randomBoolean();
Watch watch = mockWatch("_id", watchActive, isNewWatch);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);

listener.postIndex(shardId, operation, result);
ZonedDateTime now = DateUtils.nowWithMillisResolution(clock);
verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong());
verifyZeroInteractions(triggerService);
}

// this test emulates an index with 10 shards, and ensures that triggering only happens on a
// single shard
public void testPostIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ public void testManualStartStop() {
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(watcherService, times(1))
.stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture());
assertEquals(WatcherState.STOPPING, lifeCycleService.getState());
assertEquals(WatcherState.STOPPING, lifeCycleService.getState().get());
captor.getValue().run();
assertEquals(WatcherState.STOPPED, lifeCycleService.getState());
assertEquals(WatcherState.STOPPED, lifeCycleService.getState().get());

// Starting via cluster state update, as the watcher metadata block is removed/set to true
reset(watcherService);
Expand Down Expand Up @@ -480,7 +480,7 @@ public void testMasterOnlyNodeCanStart() {
new HashSet<>(roles), Version.CURRENT))).build();

lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
}

public void testDataNodeWithoutDataCanStart() {
Expand All @@ -494,7 +494,7 @@ public void testDataNodeWithoutDataCanStart() {
.build();

lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
}

// this emulates a node outage somewhere in the cluster that carried a watcher shard
Expand Down Expand Up @@ -584,7 +584,7 @@ private void startWatcher() {
when(watcherService.validate(state)).thenReturn(true);

lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
verify(watcherService, times(1)).reload(eq(state), anyString());
assertThat(lifeCycleService.shardRoutings(), hasSize(1));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void setupTransportAction() {
when(clusterService.state()).thenReturn(clusterState);

WatcherLifeCycleService watcherLifeCycleService = mock(WatcherLifeCycleService.class);
when(watcherLifeCycleService.getState()).thenReturn(WatcherState.STARTED);
when(watcherLifeCycleService.getState()).thenReturn(() -> WatcherState.STARTED);

ExecutionService executionService = mock(ExecutionService.class);
when(executionService.executionThreadPoolQueueSize()).thenReturn(100L);
Expand Down

0 comments on commit 5aa36ef

Please sign in to comment.