Skip to content

Commit

Permalink
Use Threadpool Time in ClusterApplierService (#39679) (#39685)
Browse files Browse the repository at this point in the history
* Use threadpool's time in `ClusterApplierService` to allow for deterministic tests
* This is a part of/requirement for #39504
  • Loading branch information
original-brownbear authored Mar 5, 2019
1 parent 380dc27 commit e8d9744
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -44,6 +45,7 @@ public class ClusterStateObserver {
private final Predicate<ClusterState> MATCH_ALL_CHANGES_PREDICATE = state -> true;

private final ClusterApplierService clusterApplierService;
private final ThreadPool threadPool;
private final ThreadContext contextHolder;
volatile TimeValue timeOutValue;

Expand All @@ -52,7 +54,7 @@ public class ClusterStateObserver {
final TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener();
// observingContext is not null when waiting on cluster state changes
final AtomicReference<ObservingContext> observingContext = new AtomicReference<>(null);
volatile Long startTimeNS;
volatile Long startTimeMS;
volatile boolean timedOut;


Expand Down Expand Up @@ -81,10 +83,11 @@ public ClusterStateObserver(ClusterState initialState, ClusterService clusterSer
public ClusterStateObserver(ClusterState initialState, ClusterApplierService clusterApplierService, @Nullable TimeValue timeout,
Logger logger, ThreadContext contextHolder) {
this.clusterApplierService = clusterApplierService;
this.threadPool = clusterApplierService.threadPool();
this.lastObservedState = new AtomicReference<>(new StoredState(initialState));
this.timeOutValue = timeout;
if (timeOutValue != null) {
this.startTimeNS = System.nanoTime();
this.startTimeMS = threadPool.relativeTimeInMillis();
}
this.logger = logger;
this.contextHolder = contextHolder;
Expand Down Expand Up @@ -134,7 +137,7 @@ public void waitForNextChange(Listener listener, Predicate<ClusterState> statePr
if (timeOutValue == null) {
timeOutValue = this.timeOutValue;
if (timeOutValue != null) {
long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS);
long timeSinceStartMS = threadPool.relativeTimeInMillis() - startTimeMS;
timeoutTimeLeftMS = timeOutValue.millis() - timeSinceStartMS;
if (timeoutTimeLeftMS <= 0L) {
// things have timeout while we were busy -> notify
Expand All @@ -150,7 +153,7 @@ public void waitForNextChange(Listener listener, Predicate<ClusterState> statePr
timeoutTimeLeftMS = null;
}
} else {
this.startTimeNS = System.nanoTime();
this.startTimeMS = threadPool.relativeTimeInMillis();
this.timeOutValue = timeOutValue;
timeoutTimeLeftMS = timeOutValue.millis();
timedOut = false;
Expand Down Expand Up @@ -240,7 +243,7 @@ public void onTimeout(TimeValue timeout) {
ObservingContext context = observingContext.getAndSet(null);
if (context != null) {
clusterApplierService.removeTimeoutListener(this);
long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS);
long timeSinceStartMS = threadPool.relativeTimeInMillis() - startTimeMS;
logger.trace("observer: timeout notification from cluster service. timeout setting [{}], time since start [{}]",
timeOutValue, new TimeValue(timeSinceStartMS));
// update to latest, in case people want to retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,10 @@ public void runOnApplierThread(final String source, Consumer<ClusterState> clust
runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH);
}

public ThreadPool threadPool() {
return threadPool;
}

@Override
public void onNewClusterState(final String source, final Supplier<ClusterState> clusterStateSupplier,
final ClusterApplyListener listener) {
Expand Down Expand Up @@ -383,12 +387,12 @@ protected void runTask(UpdateTask task) {
logger.debug("processing [{}]: execute", task.source);
final ClusterState previousClusterState = state.get();

long startTimeNS = currentTimeInNanos();
long startTimeMS = currentTimeInMillis();
final ClusterState newClusterState;
try {
newClusterState = task.apply(previousClusterState);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.trace(() -> new ParameterizedMessage(
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
executionTime, previousClusterState.version(), task.source, previousClusterState), e);
Expand All @@ -398,7 +402,7 @@ protected void runTask(UpdateTask task) {
}

if (previousClusterState == newClusterState) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
task.listener.onSuccess(task.source);
Expand All @@ -411,14 +415,14 @@ protected void runTask(UpdateTask task) {
}
try {
applyChanges(task, previousClusterState, newClusterState);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
executionTime, newClusterState.version(),
newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, task.source);
task.listener.onSuccess(task.source);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
if (logger.isTraceEnabled()) {
logger.warn(new ParameterizedMessage(
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
Expand Down Expand Up @@ -617,8 +621,8 @@ public void run() {
}

// this one is overridden in tests so we can control time
protected long currentTimeInNanos() {
return System.nanoTime();
protected long currentTimeInMillis() {
return threadPool.relativeTimeInMillis();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void tearDown() throws Exception {
super.tearDown();
}

TimedClusterApplierService createTimedClusterService(boolean makeMaster) throws InterruptedException {
TimedClusterApplierService createTimedClusterService(boolean makeMaster) {
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
emptySet(), Version.CURRENT);
TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService(Settings.builder().put("cluster.name",
Expand Down Expand Up @@ -141,9 +141,9 @@ public void testClusterStateUpdateLogging() throws Exception {
Logger clusterLogger = LogManager.getLogger(ClusterApplierService.class);
Loggers.addAppender(clusterLogger, mockAppender);
try {
clusterApplierService.currentTimeOverride = System.nanoTime();
clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis();
clusterApplierService.runOnApplierThread("test1",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).millis(),
new ClusterApplyListener() {
@Override
public void onSuccess(String source) { }
Expand All @@ -155,7 +155,7 @@ public void onFailure(String source, Exception e) {
});
clusterApplierService.runOnApplierThread("test2",
currentState -> {
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos();
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).millis();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
},
new ClusterApplyListener() {
Expand Down Expand Up @@ -214,9 +214,9 @@ public void testLongClusterStateUpdateLogging() throws Exception {
try {
final CountDownLatch latch = new CountDownLatch(4);
final CountDownLatch processedFirstTask = new CountDownLatch(1);
clusterApplierService.currentTimeOverride = System.nanoTime();
clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis();
clusterApplierService.runOnApplierThread("test1",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).millis(),
new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
Expand All @@ -232,7 +232,7 @@ public void onFailure(String source, Exception e) {
processedFirstTask.await();
clusterApplierService.runOnApplierThread("test2",
currentState -> {
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos();
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).millis();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
},
new ClusterApplyListener() {
Expand All @@ -247,7 +247,7 @@ public void onFailure(String source, Exception e) {
}
});
clusterApplierService.runOnApplierThread("test3",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos(),
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).millis(),
new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
Expand Down Expand Up @@ -510,11 +510,11 @@ static class TimedClusterApplierService extends ClusterApplierService {
}

@Override
protected long currentTimeInNanos() {
protected long currentTimeInMillis() {
if (currentTimeOverride != null) {
return currentTimeOverride;
}
return super.currentTimeInNanos();
return super.currentTimeInMillis();
}
}
}

0 comments on commit e8d9744

Please sign in to comment.