Skip to content

Commit

Permalink
remove case
Browse files Browse the repository at this point in the history
  • Loading branch information
ywangd committed Oct 10, 2023
1 parent 3e326e3 commit 19f89ba
Showing 1 changed file with 77 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class GatewayServiceTests extends ESTestCase {
public void setUp() throws Exception {
// Rebuilds the per-test fixtures: a fresh deterministic task queue, a new reroute counter and a random node id prefix.
super.setUp();
deterministicTaskQueue = new DeterministicTaskQueue();
// Tests in this class assert absolute simulated times, so verify the new queue's clock starts at zero
assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(0L));
rerouteCount = new AtomicInteger();
// Random 10-character prefix plus "-"; buildClusterState appends "0" to it to form the local node id
dataNodeIdPrefix = randomAlphaOfLength(10) + "-";
}
Expand Down Expand Up @@ -212,7 +213,6 @@ public void testImmediateRecovery() {
assertThat(rerouteCount.get(), equalTo(0));

// Recover immediately
final long initialTimeInMillis = deterministicTaskQueue.getCurrentTimeMillis();
final var setClusterStateTaskQueue = createSetClusterStateTaskQueue(clusterService);
final ClusterState clusterStateOfTerm1 = incrementTerm(initialState);
setClusterStateTaskQueue.submitTask(randomAlphaOfLength(5), new SetClusterStateTask(clusterStateOfTerm1), null);
Expand Down Expand Up @@ -244,26 +244,84 @@ public void testImmediateRecovery() {

// Never ran any scheduled task since recovery is immediate
assertThat(deterministicTaskQueue.hasDeferredTasks(), is(false));
assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(initialTimeInMillis));
assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(0L));
}

/**
 * Verifies that a scheduled state recovery runs once its delay has elapsed: the
 * not-recovered block is removed, exactly one reroute is triggered, and the simulated
 * clock sits at the expected delay.
 */
public void testScheduledRecovery() {
    // Random values are drawn in the same order as before so that a given seed reproduces the same scenario
    final boolean recoverAfterTimeConfigured = randomBoolean();
    final int expectedDataNodes = randomIntBetween(2, 5);
    createGatewayServiceForScheduledRecovery(expectedDataNodes, recoverAfterTimeConfigured);

    // The helper configures a 10 minute recover-after-time when the flag is set; otherwise 5 minutes is expected
    final long expectedDelayInMillis = TimeValue.timeValueMinutes(recoverAfterTimeConfigured ? 10 : 5).millis();

    // Let simulated time advance until the scheduled recovery has run
    deterministicTaskQueue.runAllTasksInTimeOrder();

    assertClusterStateBlocks(false);
    assertThat(rerouteCount.get(), equalTo(1));
    assertTimeElapsed(expectedDelayInMillis);
}

/**
 * Verifies that a pending scheduled recovery is superseded when the expected number of
 * data nodes joins: recovery happens at once (no simulated time passes) and the
 * previously scheduled task does nothing when its time finally comes.
 */
public void testScheduledRecoveryCancelledWhenClusterCanRecoverImmediately() {
    final int expectedNumberOfDataNodes = randomIntBetween(2, 5);
    final boolean recoverAfterTimeConfigured = randomBoolean();
    final GatewayService service = createGatewayServiceForScheduledRecovery(expectedNumberOfDataNodes, recoverAfterTimeConfigured);
    final GatewayService.PendingStateRecovery scheduledRecovery = service.currentPendingStateRecovery;
    final long scheduledDelayInMillis = TimeValue.timeValueMinutes(recoverAfterTimeConfigured ? 10 : 5).millis();

    // Reaching the expected data node count lets the cluster recover without waiting for the schedule
    final var dataNodeCountQueue = createSetDataNodeCountTaskQueue(clusterService);
    final String taskSource = randomAlphaOfLength(5);
    dataNodeCountQueue.submitTask(taskSource, new SetDataNodeCountTask(expectedNumberOfDataNodes), null);
    deterministicTaskQueue.runAllRunnableTasks();

    assertClusterStateBlocks(false);
    assertThat(rerouteCount.get(), equalTo(1));
    // Still the same pending-recovery instance, and the clock has not moved
    assertThat(service.currentPendingStateRecovery, sameInstance(scheduledRecovery));
    assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(0L));

    // Draining the queue advances time to the original schedule, but the cancelled task must not reroute again
    deterministicTaskQueue.runAllTasksInTimeOrder();
    assertThat(rerouteCount.get(), equalTo(1));
    assertTimeElapsed(scheduledDelayInMillis);
}

/**
 * Verifies that a recovery scheduled in one term becomes a no-op once a new term
 * begins, and that the recovery scheduled for the new term is the one that eventually
 * removes the not-recovered block.
 */
public void testScheduledRecoveryNoOpWhenNewTermBegins() {
    final boolean recoverAfterTimeConfigured = randomBoolean();
    final int expectedDataNodes = randomIntBetween(2, 5);
    final GatewayService service = createGatewayServiceForScheduledRecovery(expectedDataNodes, recoverAfterTimeConfigured);
    final var clusterStateQueue = createSetClusterStateTaskQueue(clusterService);
    final GatewayService.PendingStateRecovery recoveryOfTerm1 = service.currentPendingStateRecovery;

    // Delay after which each scheduled recovery is expected to fire
    final long recoveryDelayInMillis = TimeValue.timeValueMinutes(recoverAfterTimeConfigured ? 10 : 5).millis();
    // Simulated time at which the term change is published
    final long termChangeAtMillis = TimeValue.timeValueMinutes(1).millis();

    // Begin a new term one minute in; this replaces the pending recovery of the first term
    final ClusterState stateOfTerm2 = incrementTerm(clusterService.state());
    deterministicTaskQueue.scheduleAt(
        termChangeAtMillis,
        () -> clusterStateQueue.submitTask(randomAlphaOfLength(5), new SetClusterStateTask(stateOfTerm2), null)
    );
    deterministicTaskQueue.advanceTime();
    deterministicTaskQueue.runAllRunnableTasks();
    assertThat(service.currentPendingStateRecovery, not(sameInstance(recoveryOfTerm1)));

    // Advance to the first term's scheduled time: its task runs but must leave the cluster blocked
    deterministicTaskQueue.advanceTime();
    deterministicTaskQueue.runAllRunnableTasks();
    assertTimeElapsed(recoveryDelayInMillis);
    assertClusterStateBlocks(true);
    assertThat(rerouteCount.get(), equalTo(0));

    // The second term's schedule, offset by the term-change time, performs the actual recovery
    deterministicTaskQueue.runAllTasksInTimeOrder();
    assertClusterStateBlocks(false);
    assertThat(rerouteCount.get(), equalTo(1));
    assertTimeElapsed(termChangeAtMillis + recoveryDelayInMillis);
}

private GatewayService createGatewayServiceForScheduledRecovery(int expectedNumberOfDataNodes, boolean hasRecoverAfterTime) {
final Settings.Builder settingsBuilder = Settings.builder();
final int expectedNumberOfDataNodes = randomIntBetween(2, 5);
settingsBuilder.put(EXPECTED_DATA_NODES_SETTING.getKey(), expectedNumberOfDataNodes);
final boolean hasRecoverAfterTime = randomBoolean();
if (hasRecoverAfterTime) {
settingsBuilder.put(RECOVER_AFTER_TIME_SETTING.getKey(), TimeValue.timeValueMinutes(10));
}

final ClusterState initialState = buildClusterState(1, 0);
final GatewayService gatewayService = createService(settingsBuilder, initialState);
assertClusterStateBlocks(true);

// The 1st scheduled recovery for the initial term
final long initialTimeInMillis = deterministicTaskQueue.getCurrentTimeMillis();
final var setClusterStateTaskQueue = createSetClusterStateTaskQueue(clusterService);
final ClusterState clusterStateOfTerm1 = incrementTerm(initialState);
final var setClusterStateTaskQueue = createSetClusterStateTaskQueue(clusterService);
setClusterStateTaskQueue.submitTask(randomAlphaOfLength(5), new SetClusterStateTask(clusterStateOfTerm1), null);
deterministicTaskQueue.runAllRunnableTasks(); // publish cluster state term change
// recovery is scheduled but has not run yet
Expand All @@ -272,67 +330,7 @@ public void testScheduledRecovery() {
assertThat(rerouteCount.get(), equalTo(0));
final GatewayService.PendingStateRecovery pendingStateRecoveryOfInitialTerm = gatewayService.currentPendingStateRecovery;
assertThat(pendingStateRecoveryOfInitialTerm, notNullValue());

// The 1st scheduled recovery may or may not run, depending on what happens to the cluster next
final int caseNo = randomIntBetween(0, 2);
switch (caseNo) {
case 0:
// Recover when the 1st scheduled recovery is ready to run
deterministicTaskQueue.runAllTasksInTimeOrder();
assertClusterStateBlocks(false);
assertThat(rerouteCount.get(), equalTo(1));
assertThat(
deterministicTaskQueue.getCurrentTimeMillis(),
equalTo(initialTimeInMillis + TimeValue.timeValueMinutes(hasRecoverAfterTime ? 10 : 5).millis())
);
break;
case 1:
// The 1st schedule is cancelled if the cluster can recover immediately because the expected number of data nodes is reached
final var setDataNodeCountTaskQueue = createSetDataNodeCountTaskQueue(clusterService);
setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(expectedNumberOfDataNodes), null);
deterministicTaskQueue.runAllRunnableTasks();
assertClusterStateBlocks(false);
assertThat(rerouteCount.get(), equalTo(1));
assertThat(gatewayService.currentPendingStateRecovery, sameInstance(pendingStateRecoveryOfInitialTerm));
assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(initialTimeInMillis));
// Cancelled scheduled recovery is a no-op
deterministicTaskQueue.runAllTasksInTimeOrder();
assertThat(rerouteCount.get(), equalTo(1));
break;
case 2:
// The 1st schedule is effectively cancelled if a new term begins
final TimeValue elapsed = TimeValue.timeValueMinutes(1);
final ClusterState clusterStateOfTerm2 = ClusterState.builder(clusterStateOfTerm1)
.metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(2).build()).build())
.build();
deterministicTaskQueue.scheduleAt(
initialTimeInMillis + elapsed.millis(),
() -> setClusterStateTaskQueue.submitTask(randomAlphaOfLength(5), new SetClusterStateTask(clusterStateOfTerm2), null)
);
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();
assertThat(gatewayService.currentPendingStateRecovery, not(sameInstance(pendingStateRecoveryOfInitialTerm)));
// The 1st scheduled recovery is now a no-op
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();
assertThat(
deterministicTaskQueue.getCurrentTimeMillis(),
equalTo(initialTimeInMillis + TimeValue.timeValueMinutes(hasRecoverAfterTime ? 10 : 5).millis())
);
assertClusterStateBlocks(true);
assertThat(rerouteCount.get(), equalTo(0));
// The 2nd schedule will perform the recovery
deterministicTaskQueue.runAllTasksInTimeOrder();
assertClusterStateBlocks(false);
assertThat(rerouteCount.get(), equalTo(1));
assertThat(
deterministicTaskQueue.getCurrentTimeMillis(),
equalTo(initialTimeInMillis + elapsed.millis() + TimeValue.timeValueMinutes(hasRecoverAfterTime ? 10 : 5).millis())
);
break;
default:
throw new AssertionError("unknown case number " + caseNo);
}
return gatewayService;
}

public void testScheduledRecoveryWithRecoverAfterNodes() {
Expand All @@ -352,11 +350,10 @@ public void testScheduledRecoveryWithRecoverAfterNodes() {
assertClusterStateBlocks(true);

// Not recover because recoverAfterDataNodes not met
final long initialTimeInMillis = deterministicTaskQueue.getCurrentTimeMillis();
final var setDataNodeCountTaskQueue = createSetDataNodeCountTaskQueue(clusterService);
setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes - 1), null);
deterministicTaskQueue.runAllTasksInTimeOrder();
assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(initialTimeInMillis));
assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(0L));
assertClusterStateBlocks(true);
final var pendingStateRecoveryOfInitialTerm = gatewayService.currentPendingStateRecovery;
assertThat(pendingStateRecoveryOfInitialTerm, notNullValue());
Expand All @@ -370,29 +367,29 @@ public void testScheduledRecoveryWithRecoverAfterNodes() {
// The 1st schedule is cancelled when data nodes drop below recoverAfterDataNodes
final TimeValue elapsed = TimeValue.timeValueMinutes(1);
deterministicTaskQueue.scheduleAt(
initialTimeInMillis + elapsed.millis(),
elapsed.millis(),
() -> setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes - 1), null)
);
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();

// The 2nd scheduled recovery when data nodes are above recoverAfterDataNodes again
deterministicTaskQueue.scheduleAt(
initialTimeInMillis + elapsed.millis() * 2,
elapsed.millis() * 2,
() -> setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes), null)
);
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();

assertThat(gatewayService.currentPendingStateRecovery, sameInstance(pendingStateRecoveryOfInitialTerm));
assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(initialTimeInMillis + elapsed.millis() * 2));
assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(elapsed.millis() * 2));

// The 1st scheduled recovery is now a no-op since it is cancelled
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();
assertThat(
deterministicTaskQueue.getCurrentTimeMillis(),
equalTo(initialTimeInMillis + TimeValue.timeValueMinutes(hasRecoverAfterTime ? 10 : 5).millis())
equalTo(TimeValue.timeValueMinutes(hasRecoverAfterTime ? 10 : 5).millis())
);
assertClusterStateBlocks(true);
assertThat(rerouteCount.get(), equalTo(0));
Expand All @@ -401,16 +398,17 @@ public void testScheduledRecoveryWithRecoverAfterNodes() {
deterministicTaskQueue.runAllTasksInTimeOrder();
assertClusterStateBlocks(false);
assertThat(rerouteCount.get(), equalTo(1));
assertThat(
deterministicTaskQueue.getCurrentTimeMillis(),
equalTo(initialTimeInMillis + elapsed.millis() * 2 + TimeValue.timeValueMinutes(hasRecoverAfterTime ? 10 : 5).millis())
);
assertTimeElapsed(elapsed.millis() * 2 + TimeValue.timeValueMinutes(hasRecoverAfterTime ? 10 : 5).millis());
}

/**
 * Asserts whether the current cluster state carries the global
 * {@code STATE_NOT_RECOVERED_BLOCK}.
 *
 * @param isBlocked {@code true} if the block is expected to be present, {@code false} if it is expected to be absent
 */
private void assertClusterStateBlocks(boolean isBlocked) {
    final boolean notRecoveredBlockPresent = clusterService.state().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK);
    assertThat(notRecoveredBlockPresent, is(isBlocked));
}

/**
 * Asserts that the deterministic task queue's current time equals the given value.
 *
 * @param elapsedInMillis the expected current time of the task queue, in milliseconds
 */
private void assertTimeElapsed(long elapsedInMillis) {
    final long currentTimeInMillis = deterministicTaskQueue.getCurrentTimeMillis();
    assertThat(currentTimeInMillis, equalTo(elapsedInMillis));
}

private ClusterState buildClusterState(int numberOfNodes, long term) {
assert numberOfNodes >= 1;
final String localNodeId = dataNodeIdPrefix + "0";
Expand Down

0 comments on commit 19f89ba

Please sign in to comment.