Skip to content

Commit

Permalink
Watcher add stopped listener (elastic#43939)
Browse files Browse the repository at this point in the history
When Watcher is stopped and there are still outstanding watches running
Watcher will report it self as stopped. In normal cases, this is not problematic.

However, for integration tests Watcher is started and stopped between
each test to help ensure a clean slate for each test. The tests are blocking
only on the stopped state and make an implicit assumption that all watches are
finished if the Watcher is stopped. This is an incorrect assumption since
Stopped really means, "I will not accept any more watches". This can lead to
un-predictable behavior in the tests such as message : "Watch is already queued
in thread pool" and state: "not_executed_already_queued".
This can also change the .watcher-history if watches linger between tests.

This commit changes the semantics of a manual stopping watcher to now mean:
"I will not accept any more watches AND all running watches are complete".
There is now an intermediary step "Stopping" and callback to allow transition
to a "Stopped" state when all Watches have completed.

Additionally since this impacts how long the tests will block waiting for a
"Stopped" state, the timeout has been increased.

Related: elastic#42409
  • Loading branch information
jakelandis committed Aug 16, 2019
1 parent 50c65d0 commit e233f7a
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.watcher;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
Expand All @@ -25,6 +27,7 @@

import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -35,10 +38,12 @@

public class WatcherLifeCycleService implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(WatcherLifeCycleService.class);
private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED);
private final AtomicReference<List<ShardRouting>> previousShardRoutings = new AtomicReference<>(Collections.emptyList());
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
private volatile WatcherService watcherService;
private final EnumSet<WatcherState> stopStates = EnumSet.of(WatcherState.STOPPED, WatcherState.STOPPING);

WatcherLifeCycleService(ClusterService clusterService, WatcherService watcherService) {
this.watcherService = watcherService;
Expand All @@ -57,8 +62,10 @@ synchronized void shutDown() {
this.state.set(WatcherState.STOPPING);
shutDown = true;
clearAllocationIds();
watcherService.shutDown();
this.state.set(WatcherState.STOPPED);
watcherService.shutDown(() -> {
this.state.set(WatcherState.STOPPED);
logger.info("watcher has stopped and shutdown");
});
}

/**
Expand Down Expand Up @@ -88,9 +95,10 @@ public void clusterChanged(ClusterChangedEvent event) {
}

boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state());
boolean isStoppedOrStopping = stopStates.contains(this.state.get());
// if this is not a data node, we need to start it ourselves possibly
if (event.state().nodes().getLocalNode().isDataNode() == false &&
isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) {
isWatcherStoppedManually == false && isStoppedOrStopping) {
this.state.set(WatcherState.STARTING);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
return;
Expand All @@ -99,8 +107,20 @@ public void clusterChanged(ClusterChangedEvent event) {
if (isWatcherStoppedManually) {
if (this.state.get() == WatcherState.STARTED) {
clearAllocationIds();
watcherService.stop("watcher manually marked to shutdown by cluster state update");
this.state.set(WatcherState.STOPPED);
boolean stopping = this.state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING);
if (stopping) {
//waiting to set state to stopped until after all currently running watches are finished
watcherService.stop("watcher manually marked to shutdown by cluster state update", () -> {
//only transition from stopping -> stopped (which may not be the case if restarted quickly)
boolean stopped = state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED);
if (stopped) {
logger.info("watcher has stopped");
} else {
logger.info("watcher has not been stopped. not currently in a stopping state, current state [{}]", state.get());
}

});
}
}
return;
}
Expand Down Expand Up @@ -142,7 +162,7 @@ public void clusterChanged(ClusterChangedEvent event) {
previousShardRoutings.set(localAffectedShardRoutings);
if (state.get() == WatcherState.STARTED) {
watcherService.reload(event.state(), "new local watcher shard allocation ids");
} else if (state.get() == WatcherState.STOPPED) {
} else if (isStoppedOrStopping) {
this.state.set(WatcherState.STARTING);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.upgrade.UpgradeField;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
Expand Down Expand Up @@ -144,24 +145,29 @@ public boolean validate(ClusterState state) {
}

/**
* Stops the watcher service and marks its services as paused
* Stops the watcher service and marks its services as paused. Callers should set the Watcher state to {@link WatcherState#STOPPING}
* prior to calling this method.
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null}
*/
public void stop(String reason) {
public void stop(String reason, Runnable stoppedListener) {
assert stoppedListener != null;
logger.info("stopping watch service, reason [{}]", reason);
executionService.pause();
executionService.pause(stoppedListener);
triggerService.pauseExecution();
}

/**
* shuts down the trigger service as well to make sure there are no lingering threads
* also no need to check anything, as this is final, we just can go to status STOPPED
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null}
*/
void shutDown() {
void shutDown(Runnable stoppedListener) {
assert stoppedListener != null;
logger.info("stopping watch service, reason [shutdown initiated]");
executionService.pause();
executionService.pause(stoppedListener);
triggerService.stop();
stopExecutor();
logger.debug("watch service has stopped");
}

void stopExecutor() {
Expand All @@ -185,7 +191,7 @@ void reload(ClusterState state, String reason) {
processedClusterStateVersion.set(state.getVersion());

triggerService.pauseExecution();
int cancelledTaskCount = executionService.clearExecutionsAndQueue();
int cancelledTaskCount = executionService.clearExecutionsAndQueue(() -> {});
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);

executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false),
Expand Down Expand Up @@ -256,7 +262,7 @@ private synchronized boolean reloadInner(ClusterState state, String reason, bool
*/
public void pauseExecution(String reason) {
triggerService.pauseExecution();
int cancelledTaskCount = executionService.pause();
int cancelledTaskCount = executionService.pause(() -> {});
logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
*/
package org.elasticsearch.xpack.watcher.execution;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.watcher.WatcherState;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -19,6 +22,7 @@

public final class CurrentExecutions implements Iterable<ExecutionService.WatchExecution> {

private static final Logger logger = LogManager.getLogger(CurrentExecutions.class);
private final ConcurrentMap<String, ExecutionService.WatchExecution> currentExecutions = new ConcurrentHashMap<>();
// the condition of the lock is used to wait and signal the finishing of all executions on shutdown
private final ReentrantLock lock = new ReentrantLock();
Expand Down Expand Up @@ -63,9 +67,12 @@ public void remove(String id) {
* Calling this method makes the class stop accepting new executions and throws and exception instead.
* In addition it waits for a certain amount of time for current executions to finish before returning
*
* @param maxStopTimeout The maximum wait time to wait to current executions to finish
* @param maxStopTimeout The maximum wait time to wait to current executions to finish
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*/
void sealAndAwaitEmpty(TimeValue maxStopTimeout) {
void sealAndAwaitEmpty(TimeValue maxStopTimeout, Runnable stoppedListener) {
assert stoppedListener != null;
lock.lock();
// We may have current executions still going on.
// We should try to wait for the current executions to have completed.
Expand All @@ -81,6 +88,8 @@ void sealAndAwaitEmpty(TimeValue maxStopTimeout) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
//fully stop Watcher after all executions are finished
stoppedListener.run();
lock.unlock();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
Expand Down Expand Up @@ -137,23 +138,31 @@ public void unPause() {
* Pausing means, that no new watch executions will be done unless this pausing is explicitly unset.
* This is important when watcher is stopped, so that scheduled watches do not accidentally get executed.
* This should not be used when we need to reload watcher based on some cluster state changes, then just calling
* {@link #clearExecutionsAndQueue()} is the way to go
* {@link #clearExecutionsAndQueue(Runnable)} is the way to go
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*
* @return the number of tasks that have been removed
*/
public int pause() {
public int pause(Runnable stoppedListener) {
assert stoppedListener != null;
paused.set(true);
return clearExecutionsAndQueue();
return clearExecutionsAndQueue(stoppedListener);
}

/**
* Empty the currently queued tasks and wait for current executions to finish.
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*
* @return the number of tasks that have been removed
*/
public int clearExecutionsAndQueue() {
public int clearExecutionsAndQueue(Runnable stoppedListener) {
assert stoppedListener != null;
int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
this.clearExecutions();
this.clearExecutions(stoppedListener);
return cancelledTaskCount;
}

Expand Down Expand Up @@ -280,8 +289,10 @@ public WatchRecord execute(WatchExecutionContext ctx) {
ctx.setNodeId(clusterService.localNode().getId());
WatchRecord record = null;
final String watchId = ctx.id().watchId();
//pull this to a local reference since the class reference can be swapped, and need to ensure same object is used for put/remove
final CurrentExecutions currentExecutions = this.currentExecutions.get();
try {
boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread()));
boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread()));
if (executionAlreadyExists) {
logger.trace("not executing watch [{}] because it is already queued", watchId);
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool");
Expand Down Expand Up @@ -336,7 +347,7 @@ record = createWatchRecord(record, ctx, e);

triggeredWatchStore.delete(ctx.id());
}
currentExecutions.get().remove(watchId);
currentExecutions.remove(watchId);
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
}
return record;
Expand Down Expand Up @@ -580,11 +591,15 @@ public Counters executionTimes() {
/**
* This clears out the current executions and sets new empty current executions
* This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*/
private void clearExecutions() {
private void clearExecutions(Runnable stoppedListener) {
assert stoppedListener != null;
final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions());
// clear old executions in background, no need to wait
genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout));
genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout, stoppedListener));
}

// the watch execution task takes another runnable as parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;

import java.util.Collections;
Expand Down Expand Up @@ -133,8 +134,8 @@ public void testShutdown() {
when(watcherService.validate(clusterState)).thenReturn(true);

lifeCycleService.shutDown();
verify(watcherService, never()).stop(anyString());
verify(watcherService, times(1)).shutDown();
verify(watcherService, never()).stop(anyString(), any());
verify(watcherService, times(1)).shutDown(any());

reset(watcherService);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
Expand Down Expand Up @@ -175,7 +176,12 @@ public void testManualStartStop() {
.build();

lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState));
verify(watcherService, times(1)).stop(eq("watcher manually marked to shutdown by cluster state update"));
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(watcherService, times(1))
.stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture());
assertEquals(WatcherState.STOPPING, lifeCycleService.getState());
captor.getValue().run();
assertEquals(WatcherState.STOPPED, lifeCycleService.getState());

// Starting via cluster state update, as the watcher metadata block is removed/set to true
reset(watcherService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ void stopExecutor() {
csBuilder.metaData(MetaData.builder());

service.reload(csBuilder.build(), "whatever");
verify(executionService).clearExecutionsAndQueue();
verify(executionService, never()).pause();
verify(executionService).clearExecutionsAndQueue(any());
verify(executionService, never()).pause(any());
verify(triggerService).pauseExecution();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,12 @@ protected void stopWatcher() throws Exception {
WatcherStatsResponse watcherStatsResponse = watcherClient().prepareWatcherStats().get();
assertThat(watcherStatsResponse.hasFailures(), is(false));
List<Tuple<String, WatcherState>> currentStatesFromStatsRequest = watcherStatsResponse.getNodes().stream()
.map(response -> Tuple.tuple(response.getNode().getName(), response.getWatcherState()))
.collect(Collectors.toList());
.map(response -> Tuple.tuple(response.getNode().getName() + " (" + response.getThreadPoolQueueSize() + ")",
response.getWatcherState())).collect(Collectors.toList());
List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList());



logger.info("waiting to stop watcher, current states {}", currentStatesFromStatsRequest);

boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED);
Expand All @@ -566,7 +568,7 @@ protected void stopWatcher() throws Exception {
}

throw new AssertionError("unexpected state, retrying with next run");
});
}, 30, TimeUnit.SECONDS);
}

public static class NoopEmailService extends EmailService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.junit.Before;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -108,7 +109,7 @@ public void stopWatcher() throws Exception {
default:
throw new AssertionError("unknown state[" + state + "]");
}
});
}, 30, TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -91,8 +92,7 @@ public void startWatcher() throws Exception {

@After
public void stopWatcher() throws Exception {
adminClient().performRequest(new Request("DELETE", "/my_test_index"));


assertBusy(() -> {
try {
Response statsResponse = adminClient().performRequest(new Request("GET", "/_watcher/stats"));
Expand All @@ -118,7 +118,9 @@ public void stopWatcher() throws Exception {
} catch (IOException e) {
throw new AssertionError(e);
}
});
}, 30, TimeUnit.SECONDS);

adminClient().performRequest(new Request("DELETE", "/my_test_index"));
}

@Override
Expand Down
Loading

0 comments on commit e233f7a

Please sign in to comment.