diff --git a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java index 85f0e2cf7e3ff..7ac966271a995 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java @@ -21,12 +21,14 @@ import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata; import org.elasticsearch.cluster.metadata.ReservedStateMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; import org.elasticsearch.core.Tuple; import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESTestCase; import org.junit.Before; import java.io.IOException; @@ -34,6 +36,7 @@ import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -357,7 +360,7 @@ private void assertClusterStateNotSaved(CountDownLatch savedClusterState, Atomic updateClusterSettings(Settings.builder().put("search.allow_expensive_queries", "false")); } - public void testErrorSaved() throws Exception { + public void testErrorNotSaved() throws Exception { internalCluster().setBootstrapMasterNodeIndex(0); logger.info("--> start data node / non master node"); String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")); @@ -381,6 +384,40 @@ public void testErrorSaved() throws Exception { assertClusterStateNotSaved(savedClusterState.v1(), savedClusterState.v2()); } + public void testLastSettingsInBatchApplied() throws Exception { + internalCluster().setBootstrapMasterNodeIndex(0); + logger.info("--> start data node / non master node"); + String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")); + FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode); + + assertFalse(dataFileSettingsService.watching()); + + logger.info("--> start master node"); + final String masterNode = internalCluster().startMasterOnlyNode(); + assertMasterNode(internalCluster().nonMasterClient(), masterNode); + var savedClusterState = setupClusterStateListener(masterNode); + + FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode); + + assertTrue(masterFileSettingsService.watching()); + assertFalse(dataFileSettingsService.watching()); + + final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final var barrier = new CyclicBarrier(2); + masterNodeClusterService.createTaskQueue("block", Priority.NORMAL, batchExecutionContext -> { + safeAwait(barrier); + safeAwait(barrier); + batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {})); + return batchExecutionContext.initialState(); + }).submitTask("block", ESTestCase::fail, null); + + safeAwait(barrier); + writeJSONFile(masterNode, testJSON, logger, versionCounter.incrementAndGet()); // Valid but skipped + writeJSONFile(masterNode, testJSON43mb, logger, versionCounter.incrementAndGet()); // The last valid setting + safeAwait(barrier); + assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb"); + } + public void testErrorCanRecoverOnRestart() throws Exception { internalCluster().setBootstrapMasterNodeIndex(0); logger.info("--> start data node / non master node"); diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java index 90ae9923910d1..a5f3822ef803f 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -70,6 +71,13 @@ public ReservedStateUpdateTask( this.listener = listener; } + /** + * @return true if {@code this} would overwrite the effects of {@code prev} assuming {@code this} is executed later. + */ + public boolean supersedes(ReservedStateUpdateTask prev) { + return versionCheck.test(prev.stateChunk.metadata().version(), this.stateChunk.metadata().version()); + } + @Override public void onFailure(Exception e) { listener.onFailure(e); @@ -213,4 +221,13 @@ static boolean checkMetadataVersion( return false; } + @Override + public String toString() { + return "ReservedStateUpdateTask{" + "namespace='" + namespace + '\'' + ", metadata=" + stateChunk.metadata() + '}'; + } + + /** + * x < y if x.{@linkplain #supersedes}(y) + */ + public static final Comparator SUPERSEDING_FIRST = (x, y) -> x.supersedes(y) ? -1 : y.supersedes(x) ? 1 : 0; } diff --git a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java index eb81b94ba53ed..2517b8113e3fb 100644 --- a/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java @@ -14,15 +14,19 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.SimpleBatchedExecutor; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.common.Priority; -import org.elasticsearch.core.Tuple; + +import java.util.ArrayList; + +import static java.util.Comparator.comparing; +import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.SUPERSEDING_FIRST; /** * Reserved cluster state update task executor */ -public class ReservedStateUpdateTaskExecutor extends SimpleBatchedExecutor { +public class ReservedStateUpdateTaskExecutor implements ClusterStateTaskExecutor { private static final Logger logger = LogManager.getLogger(ReservedStateUpdateTaskExecutor.class); @@ -34,17 +38,56 @@ public ReservedStateUpdateTaskExecutor(RerouteService rerouteService) { } @Override - public Tuple executeTask(ReservedStateUpdateTask task, ClusterState clusterState) { - return Tuple.tuple(task.execute(clusterState), null); - } + public final ClusterState execute(BatchExecutionContext batchExecutionContext) throws Exception { + var initState = batchExecutionContext.initialState(); + var taskContexts = batchExecutionContext.taskContexts(); + if (taskContexts.isEmpty()) { + return initState; + } - @Override - public void taskSucceeded(ReservedStateUpdateTask task, Void unused) { - task.listener().onResponse(ActionResponse.Empty.INSTANCE); + // In a given batch of update tasks, only one will actually take effect, + // and we want to execute only that task, because if we execute all the tasks + // one after another, that will require all the tasks to be capable of executing + // correctly even without prior state updates being applied. + // + // The correct task to run would be whichever one would take effect if we were to + // run the tasks one-per-batch. In effect, this is the task with the highest version number; + // if multiple tasks have the same version number, their ReservedStateVersionCheck fields + // will be used to break the tie. + // + // One wrinkle is: if the task fails, then we will know retroactively that it was + // not the task that actually took effect, and we must then identify which of the + // remaining tasks would have taken effect. We achieve this by sorting the tasks + // using the SUPERSEDING_FIRST comparator. + + var candidates = new ArrayList<>(taskContexts); + candidates.sort(comparing(TaskContext::getTask, SUPERSEDING_FIRST)); + for (var iter = candidates.iterator(); iter.hasNext();) { + TaskContext taskContext = iter.next(); + logger.info("Effective task: {}", taskContext.getTask()); + ClusterState clusterState = initState; + try (var ignored = taskContext.captureResponseHeaders()) { + var task = taskContext.getTask(); + clusterState = task.execute(clusterState); + taskContext.success(() -> task.listener().onResponse(ActionResponse.Empty.INSTANCE)); + logger.debug("-> Update task succeeded"); + // All the others conceptually "succeeded" and then were superseded by the effective task + iter.forEachRemaining(c -> c.success(() -> c.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE))); + return clusterState; + } catch (Exception e) { + taskContext.onFailure(e); + if (candidates.isEmpty() == false) { + logger.warn("-> Update task failed; will try the previous update task"); + } + } + } + + logger.warn("All {} update tasks failed; returning initial state", taskContexts.size()); + return initState; } @Override - public void clusterStatePublished() { + public final void clusterStatePublished(ClusterState newClusterState) { rerouteService.reroute( "reroute after saving and reserving part of the cluster state", Priority.NORMAL, @@ -54,4 +97,5 @@ public void clusterStatePublished() { ) ); } + } diff --git a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java index efe3566064170..a333b36e2861e 100644 --- a/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java @@ -39,6 +39,7 @@ import org.mockito.ArgumentMatchers; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; @@ -48,6 +49,13 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.MockUpdateSpec.higher; +import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.MockUpdateSpec.higherOrSame; +import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.TaskState.FAILED; +import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.TaskState.INCOMPLETE; +import static org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests.TaskState.SUCCEEDED; +import static org.elasticsearch.reservedstate.service.ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION; +import static org.elasticsearch.reservedstate.service.ReservedStateVersionCheck.HIGHER_VERSION_ONLY; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; @@ -63,7 +71,9 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -77,13 +87,28 @@ private static MasterServiceTaskQueue mo return (MasterServiceTaskQueue) mock(MasterServiceTaskQueue.class); } - private static class TestTaskContext implements ClusterStateTaskExecutor.TaskContext { + enum TaskState { + INCOMPLETE, + FAILED, + + /** + * Also used for when a task was skipped because another task takes precedence and that one succeeded. + */ + SUCCEEDED, + } + + static class TestTaskContext implements ClusterStateTaskExecutor.TaskContext { private final T task; + private final AtomicReference state = new AtomicReference<>(INCOMPLETE); private TestTaskContext(T task) { this.task = task; } + public TaskState getState() { + return state.get(); + } + @Override public T getTask() { return task; @@ -91,25 +116,40 @@ public T getTask() { @Override public void success(Runnable onPublicationSuccess) { + assert state.get() == INCOMPLETE; onPublicationSuccess.run(); + setCompletedState(SUCCEEDED); } @Override - public void success(Consumer publishedStateConsumer) {} + public void success(Consumer publishedStateConsumer) { + setCompletedState(SUCCEEDED); + } @Override - public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) { + setCompletedState(SUCCEEDED); + } @Override - public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) { + setCompletedState(SUCCEEDED); + } @Override - public void onFailure(Exception failure) {} + public void onFailure(Exception failure) { + setCompletedState(FAILED); + } @Override public Releasable captureResponseHeaders() { return null; } + + private void setCompletedState(TaskState newState) { + var prev = state.getAndSet(newState); + assert prev == INCOMPLETE; + } } private static class TestStateHandler implements ReservedClusterStateHandler> { @@ -171,7 +211,7 @@ public void testOperatorController() throws IOException { controller.process( "operator", parser, - randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), + randomFrom(HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), x::set ); @@ -206,7 +246,7 @@ public void testOperatorController() throws IOException { controller.process( "operator", parser, - randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), + randomFrom(HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION), Assert::assertNull ); } @@ -247,15 +287,7 @@ public void testUpdateStateTasks() throws Exception { AtomicBoolean successCalled = new AtomicBoolean(false); ReservedStateUpdateTask task = spy( - new ReservedStateUpdateTask( - "test", - null, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, - Map.of(), - Set.of(), - errorState -> {}, - ActionListener.noop() - ) + new ReservedStateUpdateTask("test", null, HIGHER_VERSION_ONLY, Map.of(), Set.of(), errorState -> {}, ActionListener.noop()) ); doReturn(state).when(task).execute(any()); @@ -279,6 +311,146 @@ public void success(Runnable onPublicationSuccess) { verify(rerouteService, times(1)).reroute(anyString(), any(), any()); } + public void testBatchLastUpdateIsApplied() throws Exception { + ClusterName clusterName = new ClusterName("test"); + var updates = mockUpdateSequence(clusterName, List.of(higher(1001), higher(1002))); + ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); + ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.contexts, () -> null) + ); + + assertThat("State should be the final state", newState, sameInstance(updates.states().get(updates.states().size() - 1))); + + // Only process the final task; the intermediate ones can be skipped + verify(updates.tasks().get(0), times(0)).execute(any()); + verify(updates.tasks().get(1), times(1)).execute(any()); + + assertTaskStates(updates, SUCCEEDED, SUCCEEDED); + } + + public void testBatchLastSuccessfulUpdateIsApplied() throws Exception { + ClusterName clusterName = new ClusterName("test"); + + var updates = mockUpdateSequence(clusterName, List.of(higher(1001), higher(1002), higher(1003))); + + // Inject an error in the last update + reset(updates.tasks().get(2)); + doThrow(UnsupportedOperationException.class).when(updates.tasks().get(2)).execute(any()); + + ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); + ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.contexts, () -> null) + ); + + assertThat("State should be the last successful state", newState, sameInstance(updates.states().get(1))); + + assertTaskStates(updates, SUCCEEDED, SUCCEEDED, FAILED); + + // Only process the final task; the intermediate ones can be skipped + verify(updates.tasks().get(2), times(1)).execute(any()); // Tried the last one, it failed + verify(updates.tasks().get(1), times(1)).execute(any()); // Tried the second-last one, it succeeded + verify(updates.tasks().get(0), times(0)).execute(any()); // Didn't bother trying the first one + } + + public void testBatchHigherVersionEarlierWins() throws Exception { + ClusterName clusterName = new ClusterName("test"); + var updates = mockUpdateSequence(clusterName, List.of(higher(1001), higher(1003), higher(1002))); // Out of order + ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); + ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.contexts, () -> null) + ); + + assertThat("State should be the highest-versioned state", newState, sameInstance(updates.states().get(1))); + + assertTaskStates(updates, SUCCEEDED, SUCCEEDED, SUCCEEDED); + + verify(updates.tasks().get(0), times(0)).execute(any()); + verify(updates.tasks().get(1), times(1)).execute(any()); + verify(updates.tasks().get(2), times(0)).execute(any()); // Prior task had higher version + } + + public void testBatchEqualVersionEarlierWins() throws Exception { + ClusterName clusterName = new ClusterName("test"); + var updates = mockUpdateSequence(clusterName, List.of(higher(1003), higher(1003), higher(1002))); + ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); + ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute( + new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.contexts, () -> null) + ); + + assertThat("State should be the first instance of the highest-versioned state", newState, sameInstance(updates.states().get(0))); + + assertTaskStates(updates, SUCCEEDED, SUCCEEDED, SUCCEEDED); + + // Only process the highest-version task; the others can be skipped + verify(updates.tasks().get(0), times(1)).execute(any()); + verify(updates.tasks().get(1), times(0)).execute(any()); // Prior task already had the same version + verify(updates.tasks().get(2), times(0)).execute(any()); + } + + public void testBatchEqualVersionLaterWins() throws Exception { + ClusterName clusterName = new ClusterName("test"); + var updates = mockUpdateSequence(clusterName, List.of(higher(1003), higherOrSame(1003), higher(1002))); + ClusterState state0 = ClusterState.builder(clusterName).version(1000).build(); + var batch = new ClusterStateTaskExecutor.BatchExecutionContext<>(state0, updates.contexts, () -> null); + ClusterState newState = new ReservedStateUpdateTaskExecutor(mock(RerouteService.class)).execute(batch); + + assertThat("State should be the last instance of the highest-versioned state", newState, sameInstance(updates.states().get(1))); + + assertTaskStates(updates, SUCCEEDED, SUCCEEDED, SUCCEEDED); + + // Only process the highest-version task; the others can be skipped + verify(updates.tasks().get(0), times(0)).execute(any()); // Next task has same version and uses higherOrSame + verify(updates.tasks().get(1), times(1)).execute(any()); + verify(updates.tasks().get(2), times(0)).execute(any()); + } + + record MockUpdateSpec(long version, ReservedStateVersionCheck check) { + public static MockUpdateSpec higher(long version) { + return new MockUpdateSpec(version, HIGHER_VERSION_ONLY); + } + + public static MockUpdateSpec higherOrSame(long version) { + return new MockUpdateSpec(version, HIGHER_OR_SAME_VERSION); + } + } + + /** + * @param tasks Mockito spies configured to return a specific state + * @param states the corresponding states returned by {@link #tasks} + */ + record MockUpdateSequence( + List> contexts, + List tasks, + List states + ) {} + + private MockUpdateSequence mockUpdateSequence(ClusterName clusterName, List specs) { + List tasks = new ArrayList<>(specs.size()); + List states = new ArrayList<>(specs.size()); + for (var spec : specs) { + var stateChunk = new ReservedStateChunk(Map.of(), new ReservedStateVersion(spec.version(), BuildVersion.current())); + ReservedStateUpdateTask realTask = new ReservedStateUpdateTask( + clusterName.value(), + stateChunk, + spec.check(), + Map.of(), + Set.of(), + errorState -> fail("Unexpected error"), + ActionListener.noop() + ); + ClusterState state = ClusterState.builder(clusterName).version(spec.version()).build(); + ReservedStateUpdateTask task = spy(realTask); + doReturn(state).when(task).execute(any()); + tasks.add(task); + states.add(state); + } + return new MockUpdateSequence(tasks.stream().map(TestTaskContext::new).toList(), tasks, states); + } + + private void assertTaskStates(MockUpdateSequence updates, TaskState... stateSequence) { + assertEquals(List.of(stateSequence), updates.contexts().stream().map(TestTaskContext::getState).toList()); + } + public void testUpdateErrorState() { ClusterService clusterService = mock(ClusterService.class); ClusterState state = ClusterState.builder(new ClusterName("test")).build(); @@ -297,7 +469,7 @@ public void testUpdateErrorState() { ErrorState error = new ErrorState( "namespace", 2L, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, List.of("error"), ReservedStateErrorMetadata.ErrorKind.TRANSIENT ); @@ -324,7 +496,7 @@ public void testUpdateErrorState() { ErrorState oldError = new ErrorState( "namespace", 1L, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, List.of("old error"), ReservedStateErrorMetadata.ErrorKind.TRANSIENT ); @@ -342,7 +514,7 @@ public void testErrorStateTask() throws Exception { new ErrorState( "test", 1L, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, List.of("some parse error", "some io error"), ReservedStateErrorMetadata.ErrorKind.PARSING ), @@ -390,23 +562,23 @@ public TransformState transform(Object source, TransformState prevState) throws Metadata metadata = Metadata.builder().put(operatorMetadata).build(); ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); - assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 2L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY)); - assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 1L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY)); + assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 2L, HIGHER_VERSION_ONLY)); + assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 1L, HIGHER_VERSION_ONLY)); assertTrue(ReservedStateErrorTask.isNewError(operatorMetadata, 2L, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION)); - assertTrue(ReservedStateErrorTask.isNewError(operatorMetadata, 3L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY)); - assertTrue(ReservedStateErrorTask.isNewError(null, 1L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY)); + assertTrue(ReservedStateErrorTask.isNewError(operatorMetadata, 3L, HIGHER_VERSION_ONLY)); + assertTrue(ReservedStateErrorTask.isNewError(null, 1L, HIGHER_VERSION_ONLY)); assertTrue(ReservedStateErrorTask.isNewError(null, 1L, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION)); var chunk = new ReservedStateChunk(Map.of("one", "two", "maker", "three"), new ReservedStateVersion(2L, BuildVersion.current())); var orderedHandlers = List.of(exceptionThrower.name(), newStateMaker.name()); - // We submit a task with two handler, one will cause an exception, the other will create a new state. + // We submit a task with two handlers, one will cause an exception, the other will create a new state. // When we fail to update the metadata because of version, we ensure that the returned state is equal to the // original state by pointer reference to avoid cluster state update task to run. ReservedStateUpdateTask task = new ReservedStateUpdateTask( "namespace_one", chunk, - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(exceptionThrower.name(), exceptionThrower, newStateMaker.name(), newStateMaker), orderedHandlers, errorState -> assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, errorState.version(), errorState.versionCheck())), @@ -458,7 +630,7 @@ public void testCheckMetadataVersion() { ReservedStateUpdateTask task = new ReservedStateUpdateTask( "test", new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, BuildVersion.current())), - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(), List.of(), e -> {}, @@ -468,7 +640,7 @@ public void testCheckMetadataVersion() { task = new ReservedStateUpdateTask( "test", new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, BuildVersion.current())), - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(), List.of(), e -> {}, @@ -479,7 +651,7 @@ public void testCheckMetadataVersion() { task = new ReservedStateUpdateTask( "test", new ReservedStateChunk(Map.of(), new ReservedStateVersion(123L, BuildVersion.current())), - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(), List.of(), e -> {}, @@ -500,7 +672,7 @@ public void testCheckMetadataVersion() { task = new ReservedStateUpdateTask( "test", new ReservedStateChunk(Map.of(), new ReservedStateVersion(122L, BuildVersion.current())), - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(), List.of(), e -> {}, @@ -521,7 +693,7 @@ public void testCheckMetadataVersion() { task = new ReservedStateUpdateTask( "test", new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, BuildVersionTests.increment(BuildVersion.current()))), - ReservedStateVersionCheck.HIGHER_VERSION_ONLY, + HIGHER_VERSION_ONLY, Map.of(), List.of(), e -> {}, @@ -625,11 +797,11 @@ public void testCheckAndReportError() { final var controller = spy(new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of())); - assertNull(controller.checkAndReportError("test", List.of(), null, ReservedStateVersionCheck.HIGHER_VERSION_ONLY)); + assertNull(controller.checkAndReportError("test", List.of(), null, HIGHER_VERSION_ONLY)); verify(controller, times(0)).updateErrorState(any()); var version = new ReservedStateVersion(2L, BuildVersion.current()); - var error = controller.checkAndReportError("test", List.of("test error"), version, ReservedStateVersionCheck.HIGHER_VERSION_ONLY); + var error = controller.checkAndReportError("test", List.of("test error"), version, HIGHER_VERSION_ONLY); assertThat(error, instanceOf(IllegalStateException.class)); assertThat(error.getMessage(), is("Error processing state change request for test, errors: test error")); verify(controller, times(1)).updateErrorState(any());