Skip to content

Commit

Permalink
Hoisted by my own pedantic petard
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Aug 20, 2021
1 parent 9bdb2c9 commit 211c13e
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initia
fakeThreadPool, deterministicTaskQueue::scheduleNow);
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get(), nodeHealthService);
fakeMasterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
coordinator.handlePublishRequest(new PublishRequest(clusterStatePublicationEvent.getNewState()));
publishListener.onResponse(null);
});
Expand All @@ -132,6 +133,7 @@ private void setupRealMasterServiceAndCoordinator(long term, ClusterState initia
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialState);
masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
clusterStateRef.set(clusterStatePublicationEvent.getNewState());
publishListener.onResponse(null);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
Expand Down Expand Up @@ -118,6 +119,7 @@ private MasterService createMasterService(boolean makeMaster) {
final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
clusterStateRef.set(clusterStatePublicationEvent.getNewState());
ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
publishListener.onResponse(null);
});
masterService.setClusterStateSupplier(clusterStateRef::get);
Expand Down Expand Up @@ -726,6 +728,7 @@ public void testLongClusterStateUpdateLogging() throws Exception {
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
if (clusterStatePublicationEvent.getSummary().contains("test5")) {
relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
+ randomLongBetween(1, 1000000);
Expand Down Expand Up @@ -894,7 +897,10 @@ public void testAcking() throws InterruptedException {
.masterNodeId(node1.getId()))
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
final AtomicReference<ClusterStatePublisher> publisherRef = new AtomicReference<>();
masterService.setClusterStatePublisher((e, pl, al) -> publisherRef.get().publish(e, pl, al));
masterService.setClusterStatePublisher((e, pl, al) -> {
ClusterServiceUtils.setAllElapsedMillis(e);
publisherRef.get().publish(e, pl, al);
});
masterService.setClusterStateSupplier(() -> initialClusterState);
masterService.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStatePublicationEvent;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -135,7 +136,8 @@ public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
}

public static ClusterStatePublisher createClusterStatePublisher(ClusterApplier clusterApplier) {
return (clusterStatePublicationEvent, publishListener, ackListener) ->
return (clusterStatePublicationEvent, publishListener, ackListener) -> {
setAllElapsedMillis(clusterStatePublicationEvent);
clusterApplier.onNewClusterState(
"mock_publish_to_self[" + clusterStatePublicationEvent.getSummary() + "]",
clusterStatePublicationEvent::getNewState,
Expand All @@ -149,8 +151,8 @@ public void onSuccess(String source) {
public void onFailure(String source, Exception e) {
publishListener.onFailure(e);
}
}
);
});
};
}

public static ClusterService createClusterService(ClusterState initialState, ThreadPool threadPool) {
Expand All @@ -169,4 +171,12 @@ public static void setState(ClusterService clusterService, ClusterState.Builder
public static void setState(ClusterService clusterService, ClusterState clusterState) {
setState(clusterService.getClusterApplierService(), clusterState);
}

public static void setAllElapsedMillis(ClusterStatePublicationEvent clusterStatePublicationEvent) {
clusterStatePublicationEvent.setPublicationContextConstructionElapsedMillis(0L);
clusterStatePublicationEvent.setPublicationCommitElapsedMillis(0L);
clusterStatePublicationEvent.setPublicationCompletionElapsedMillis(0L);
clusterStatePublicationEvent.setMasterApplyElapsedMillis(0L);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -58,6 +59,7 @@ public void testFakeMasterService() {
FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", mockThreadPool, runnableTasks::add);
masterService.setClusterStateSupplier(lastClusterStateRef::get);
masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
lastClusterStateRef.set(clusterStatePublicationEvent.getNewState());
publishingCallback.set(publishListener);
});
Expand Down

0 comments on commit 211c13e

Please sign in to comment.