Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Zen2] Minor housekeeping of tests #34315

Merged
merged 1 commit into from
Oct 5, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;

@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE")
Expand Down Expand Up @@ -305,7 +306,7 @@ public void testAckListenerReceivesNackFromLeader() {

leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL);
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value");
assertTrue(leader.coordinator.getMode() != Coordinator.Mode.LEADER || leader.coordinator.getCurrentTerm() > startingTerm);
leader.setClusterStateApplyResponse(ClusterStateApplyResponse.SUCCEED);
cluster.stabilise();
Expand All @@ -325,7 +326,7 @@ public void testAckListenerReceivesNoAckFromHangingFollower() {

follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.HANG);
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value");
assertTrue("expected immediate ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
assertFalse("expected no ack from " + leader, ackCollector.hasAckedSuccessfully(leader));
cluster.stabilise();
Expand All @@ -344,7 +345,7 @@ public void testAckListenerReceivesNacksIfPublicationTimesOut() {
follower0.blackhole();
follower1.blackhole();
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value");
assertFalse("expected no immediate ack from " + leader, ackCollector.hasAcked(leader));
assertFalse("expected no immediate ack from " + follower0, ackCollector.hasAcked(follower0));
assertFalse("expected no immediate ack from " + follower1, ackCollector.hasAcked(follower1));
Expand Down Expand Up @@ -501,6 +502,8 @@ void runRandomly() {

while (finishTime == -1 || deterministicTaskQueue.getCurrentTimeMillis() <= finishTime) {
step++;
final int thisStep = step; // for lambdas

if (randomSteps <= step && finishTime == -1) {
finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime();
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
Expand All @@ -511,14 +514,19 @@ void runRandomly() {
if (rarely()) {
final ClusterNode clusterNode = getAnyNodePreferringLeaders();
final int newValue = randomInt();
logger.debug("----> [runRandomly {}] proposing new value [{}] to [{}]", step, newValue, clusterNode.getId());
clusterNode.submitValue(newValue);
onNode(clusterNode.getLocalNode(), () -> {
logger.debug("----> [runRandomly {}] proposing new value [{}] to [{}]",
thisStep, newValue, clusterNode.getId());
clusterNode.submitValue(newValue);
}).run();
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNode();
logger.debug("----> [runRandomly {}] forcing {} to become candidate", step, clusterNode.getId());
synchronized (clusterNode.coordinator.mutex) {
clusterNode.coordinator.becomeCandidate("runRandomly");
}
onNode(clusterNode.getLocalNode(), () -> {
logger.debug("----> [runRandomly {}] forcing {} to become candidate", thisStep, clusterNode.getId());
synchronized (clusterNode.coordinator.mutex) {
clusterNode.coordinator.becomeCandidate("runRandomly");
}
}).run();
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNode();

Expand Down Expand Up @@ -587,10 +595,10 @@ void stabilise() {
stabilise(DEFAULT_STABILISATION_TIME);
}

void stabilise(long stabiliationDurationMillis) {
logger.info("--> stabilising until [{}ms]", deterministicTaskQueue.getCurrentTimeMillis() + stabiliationDurationMillis);
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
runFor(stabiliationDurationMillis);
void stabilise(long stabilisationDurationMillis) {
assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)",
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
runFor(stabilisationDurationMillis, "stabilising");

// TODO remove when term-bumping is enabled
final long maxTerm = clusterNodes.stream().map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L);
Expand All @@ -600,20 +608,22 @@ void stabilise(long stabiliationDurationMillis) {
if (maxLeaderTerm < maxTerm) {
logger.info("--> forcing a term bump, maxTerm={}, maxLeaderTerm={}", maxTerm, maxLeaderTerm);
final ClusterNode leader = getAnyLeader();
synchronized (leader.coordinator.mutex) {
leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1);
}
leader.coordinator.startElection();
logger.info("--> re-stabilising after term bump until [{}ms]",
deterministicTaskQueue.getCurrentTimeMillis() + DEFAULT_ELECTION_DELAY);
runFor(DEFAULT_ELECTION_DELAY);
onNode(leader.getLocalNode(), () -> {
synchronized (leader.coordinator.mutex) {
leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1);
}
leader.coordinator.startElection();
}).run();
runFor(DEFAULT_ELECTION_DELAY, "re-stabilising after term bump");
}
logger.info("--> end of stabilisation");

assertUniqueLeaderAndExpectedModes();
}

void runFor(long runDurationMillis) {
void runFor(long runDurationMillis, String description) {
final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis;
logger.info("----> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description);

while (deterministicTaskQueue.getCurrentTimeMillis() < endTime) {

Expand All @@ -637,6 +647,8 @@ void runFor(long runDurationMillis) {

deterministicTaskQueue.advanceTime();
}

logger.info("----> runFor({}ms) completed run until [{}ms]: {}", runDurationMillis, endTime, description);
}

private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) {
Expand Down Expand Up @@ -963,7 +975,7 @@ public void run() {
final ClusterState newClusterState = clusterStateSupplier.get();
assert oldClusterState.version() <= newClusterState.version() :
"updating cluster state from version "
+ oldClusterState.version() + " to stale version " + newClusterState.version();
+ oldClusterState.version() + " to stale version " + newClusterState.version();
clusterApplier.lastAppliedClusterState = newClusterState;
}

Expand Down