Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Watcher dont add watches post index if stopped (#56556) #56629

Merged
merged 2 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
final WatcherLifeCycleService watcherLifeCycleService =
new WatcherLifeCycleService(clusterService, watcherService);

listener = new WatcherIndexingListener(watchParser, getClock(), triggerService);
listener = new WatcherIndexingListener(watchParser, getClock(), triggerService, watcherLifeCycleService.getState());
clusterService.addListener(listener);

return Arrays.asList(registry, inputRegistry, historyStore, triggerService, triggeredWatchParser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.WatchParser;
Expand All @@ -38,11 +39,13 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
Expand All @@ -66,12 +69,14 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
private final WatchParser parser;
private final Clock clock;
private final TriggerService triggerService;
private final Supplier<WatcherState> watcherState;
private volatile Configuration configuration = INACTIVE;

WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService) {
WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService, Supplier<WatcherState> watcherState) {
this.parser = parser;
this.clock = clock;
this.triggerService = triggerService;
this.watcherState = watcherState;
}

// package private for testing
Expand Down Expand Up @@ -119,16 +124,17 @@ public void postIndex(ShardId shardId, Engine.Index operation, Engine.IndexResul
}

boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
if (shouldBeTriggered) {
if (watch.status().state().isActive()) {
WatcherState currentState = watcherState.get();
if (shouldBeTriggered && EnumSet.of(WatcherState.STOPPING, WatcherState.STOPPED).contains(currentState) == false) {
if (watch.status().state().isActive() ) {
logger.debug("adding watch [{}] to trigger service", watch.id());
triggerService.add(watch);
} else {
logger.debug("removing watch [{}] to trigger service", watch.id());
triggerService.remove(watch.id());
}
} else {
logger.debug("watch [{}] should not be triggered", watch.id());
logger.debug("watch [{}] should not be triggered. watcher state [{}]", watch.id(), currentState);
}
} catch (IOException e) {
throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
Expand Down Expand Up @@ -203,7 +204,7 @@ List<ShardRouting> shardRoutings() {
return previousShardRoutings.get();
}

public WatcherState getState() {
return state.get();
public Supplier<WatcherState> getState(){
return () -> state.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected WatcherStatsResponse.Node newNodeResponse(StreamInput in) throws IOExc
@Override
protected WatcherStatsResponse.Node nodeOperation(WatcherStatsRequest.Node request) {
WatcherStatsResponse.Node statsResponse = new WatcherStatsResponse.Node(clusterService.localNode());
statsResponse.setWatcherState(lifeCycleService.getState());
statsResponse.setWatcherState(lifeCycleService.getState().get());
statsResponse.setThreadPoolQueueSize(executionService.executionThreadPoolQueueSize());
statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize());
if (request.includeCurrentWatches()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
Expand Down Expand Up @@ -89,7 +90,7 @@ public class WatcherIndexingListenerTests extends ESTestCase {
@Before
public void setup() throws Exception {
clock.freeze();
listener = new WatcherIndexingListener(parser, clock, triggerService);
listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STARTED);

Map<ShardId, ShardAllocationConfiguration> map = new HashMap<>();
map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo")));
Expand Down Expand Up @@ -140,6 +141,29 @@ public void testPostIndex() throws Exception {
}
}

public void testPostIndexWhenStopped() throws Exception {
listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STOPPED);
Map<ShardId, ShardAllocationConfiguration> map = new HashMap<>();
map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo")));
listener.setConfiguration(new Configuration(Watch.INDEX, map));
when(operation.id()).thenReturn(randomAlphaOfLength(10));
when(operation.source()).thenReturn(BytesArray.EMPTY);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
List<Engine.Result.Type> types = new ArrayList<>(Arrays.asList(Engine.Result.Type.values()));
types.remove(Engine.Result.Type.FAILURE);
when(result.getResultType()).thenReturn(randomFrom(types));

boolean watchActive = randomBoolean();
boolean isNewWatch = randomBoolean();
Watch watch = mockWatch("_id", watchActive, isNewWatch);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);

listener.postIndex(shardId, operation, result);
ZonedDateTime now = DateUtils.nowWithMillisResolution(clock);
verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong());
verifyZeroInteractions(triggerService);
}

// this test emulates an index with 10 shards, and ensures that triggering only happens on a
// single shard
public void testPostIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ public void testManualStartStop() {
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(watcherService, times(1))
.stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture());
assertEquals(WatcherState.STOPPING, lifeCycleService.getState());
assertEquals(WatcherState.STOPPING, lifeCycleService.getState().get());
captor.getValue().run();
assertEquals(WatcherState.STOPPED, lifeCycleService.getState());
assertEquals(WatcherState.STOPPED, lifeCycleService.getState().get());

// Starting via cluster state update, as the watcher metadata block is removed/set to true
reset(watcherService);
Expand Down Expand Up @@ -480,7 +480,7 @@ public void testMasterOnlyNodeCanStart() {
new HashSet<>(roles), Version.CURRENT))).build();

lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
}

public void testDataNodeWithoutDataCanStart() {
Expand All @@ -494,7 +494,7 @@ public void testDataNodeWithoutDataCanStart() {
.build();

lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
}

// this emulates a node outage somewhere in the cluster that carried a watcher shard
Expand Down Expand Up @@ -584,7 +584,7 @@ private void startWatcher() {
when(watcherService.validate(state)).thenReturn(true);

lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
verify(watcherService, times(1)).reload(eq(state), anyString());
assertThat(lifeCycleService.shardRoutings(), hasSize(1));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void setupTransportAction() {
when(clusterService.state()).thenReturn(clusterState);

WatcherLifeCycleService watcherLifeCycleService = mock(WatcherLifeCycleService.class);
when(watcherLifeCycleService.getState()).thenReturn(WatcherState.STARTED);
when(watcherLifeCycleService.getState()).thenReturn(() -> WatcherState.STARTED);

ExecutionService executionService = mock(ExecutionService.class);
when(executionService.executionThreadPoolQueueSize()).thenReturn(100L);
Expand Down