From daab082117d53aa612d487be3e7f118f2fae3609 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 24 Jul 2018 14:32:25 +0200 Subject: [PATCH 01/19] Add Publication --- .../cluster/coordination/Publication.java | 393 ++++++++++++++++++ 1 file changed, 393 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java new file mode 100644 index 0000000000000..c08ad558328f3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -0,0 +1,393 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.Discovery.AckListener; +import org.elasticsearch.discovery.zen2.Messages.ApplyCommit; +import org.elasticsearch.discovery.zen2.Messages.LegislatorPublishResponse; +import org.elasticsearch.discovery.zen2.Messages.PublishRequest; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponse; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; + +public abstract class Publication extends AbstractComponent { + + private final AtomicReference applyCommitReference; + private final List publicationTargets; + private final PublishRequest publishRequest; + private final AckListener ackListener; + private final DiscoveryNode localNode; + private final LongSupplier currentTimeSupplier; + private final long startTime; + private boolean isCompleted; + + public Publication(Settings settings, PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) { + super(settings); + this.publishRequest = publishRequest; + this.ackListener = ackListener; + this.localNode = publishRequest.getAcceptedState().nodes().getLocalNode(); + this.currentTimeSupplier = currentTimeSupplier; + applyCommitReference = new AtomicReference<>(); + startTime = currentTimeSupplier.getAsLong(); + + publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size()); + publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n))); + } + + @Override + public String toString() { + // everything here is immutable so no synchronisation required + return "Publication{term=" + publishRequest.getAcceptedState().term() + + ", version=" + publishRequest.getAcceptedState().version() + '}'; + } + + public void start(Set faultyNodes) { + logger.trace("publishing {} to {}", publishRequest, publicationTargets); + + Set localFaultyNodes = new HashSet<>(faultyNodes); + for (final DiscoveryNode faultyNode : localFaultyNodes) { + onFaultyNode(faultyNode); + } + onPossibleCommitFailure(); + publicationTargets.forEach(PublicationTarget::sendPublishRequest); + } + + private void onPossibleCompletion() { + if (isCompleted) { + return; + } + + for (final PublicationTarget target : publicationTargets) { + if (target.publicationTargetStateMachine.isActive()) { + return; + } + } + + for (final PublicationTarget target : publicationTargets) { + if (target.discoveryNode.equals(localNode) && target.publicationTargetStateMachine.isFailed()) { + logger.debug("onPossibleCompletion: [{}] failed on master", this); + assert isCompleted == false; + isCompleted = true; + onCompletion(false); + return; + } + } + + assert isCompleted == false; + isCompleted = true; + onCompletion(true); + assert applyCommitReference.get() != null; + logger.trace("onPossibleCompletion: [{}] was successful, applying new state locally", this); + } + + // For assertions only: verify that this invariant holds + private boolean publicationCompletedIffAllTargetsInactive() { + for (final PublicationTarget target : publicationTargets) { + if (target.publicationTargetStateMachine.isActive()) { + return isCompleted == false; + } + } + return isCompleted; + } + + public void onCommitted(final ApplyCommitRequest applyCommit) { + assert applyCommitReference.get() == null; + applyCommitReference.set(applyCommit); + ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime)); + publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum).forEach(PublicationTarget::sendApplyCommit); + } + + private void onPossibleCommitFailure() { + if (applyCommitReference.get() != null) { + onPossibleCompletion(); + return; + } + + CoordinationState.VoteCollection possiblySuccessfulNodes = new CoordinationState.VoteCollection(); + for (PublicationTarget publicationTarget : publicationTargets) { + if (publicationTarget.publicationTargetStateMachine.mayCommitInFuture()) { + possiblySuccessfulNodes.addVote(publicationTarget.discoveryNode); + } else { + assert publicationTarget.publicationTargetStateMachine.isFailed() : publicationTarget.publicationTargetStateMachine; + } + } + + if (isPublishQuorum(possiblySuccessfulNodes) == false) { + logger.debug("onPossibleCommitFailure: non-failed nodes do not form a quorum, so {} cannot succeed", this); + failActiveTargets(); + } + } + + void failActiveTargets() { + publicationTargets.stream().filter(PublicationTarget::isActive).forEach(PublicationTarget::setFailed); + onPossibleCompletion(); + } + + public void onTimeout() { + publicationTargets.stream().filter(PublicationTarget::isActive).forEach(PublicationTarget::onTimeOut); + onPossibleCompletion(); + } + + void onFaultyNode(DiscoveryNode faultyNode) { + publicationTargets.forEach(t -> t.onFaultyNode(faultyNode)); + onPossibleCompletion(); + } + + protected abstract void onCompletion(boolean success); + + protected abstract boolean isPublishQuorum(CoordinationState.VoteCollection votes); + + protected abstract Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse); + + protected abstract void onPossibleJoin(DiscoveryNode sourceNode, LegislatorPublishResponse response); + + protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, + ActionListener responseActionListener); + + protected abstract void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit, + ActionListener responseActionListener); + + enum PublicationTargetState { + NOT_STARTED, + FAILED, + SENT_PUBLISH_REQUEST, + WAITING_FOR_QUORUM, + SENT_APPLY_COMMIT, + APPLIED_COMMIT, + } + + static class PublicationTargetStateMachine { + private PublicationTargetState state = PublicationTargetState.NOT_STARTED; + + public void setState(PublicationTargetState newState) { + switch (newState) { + case NOT_STARTED: + assert false : state + " -> " + newState; + break; + case SENT_PUBLISH_REQUEST: + assert state == PublicationTargetState.NOT_STARTED : state + " -> " + newState; + break; + case WAITING_FOR_QUORUM: + assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + newState; + break; + case SENT_APPLY_COMMIT: + assert state == PublicationTargetState.WAITING_FOR_QUORUM : state + " -> " + newState; + break; + case APPLIED_COMMIT: + assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + newState; + break; + case FAILED: + assert state != PublicationTargetState.APPLIED_COMMIT : state + " -> " + newState; + break; + } + state = newState; + } + + public boolean isActive() { + return state != PublicationTargetState.FAILED + && state != PublicationTargetState.APPLIED_COMMIT; + } + + public boolean isWaitingForQuorum() { + return state == PublicationTargetState.WAITING_FOR_QUORUM; + } + + public boolean mayCommitInFuture() { + return (state == PublicationTargetState.NOT_STARTED + || state == PublicationTargetState.SENT_PUBLISH_REQUEST + || state == PublicationTargetState.WAITING_FOR_QUORUM); + } + + public boolean isFailed() { + return state == PublicationTargetState.FAILED; + } + + @Override + public String toString() { + // TODO DANGER non-volatile, mutable variable requires synchronisation + return state.toString(); + } + } + + private class PublicationTarget { + private final DiscoveryNode discoveryNode; + private final PublicationTargetStateMachine publicationTargetStateMachine = new PublicationTargetStateMachine(); + private boolean ackIsPending = true; + + private PublicationTarget(DiscoveryNode discoveryNode) { + this.discoveryNode = discoveryNode; + } + + @Override + public String toString() { + // everything here is immutable so no synchronisation required + return discoveryNode.getId(); + } + + public void sendPublishRequest() { + if (publicationTargetStateMachine.isFailed()) { + return; + } + publicationTargetStateMachine.setState(PublicationTargetState.SENT_PUBLISH_REQUEST); + Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublicationTarget.PublishResponseHandler()); + // TODO Can this ^ fail with an exception? Target should be failed if so. + assert publicationCompletedIffAllTargetsInactive(); + } + + void handlePublishResponse(PublishResponse publishResponse) { + assert publicationTargetStateMachine.isWaitingForQuorum() : publicationTargetStateMachine; + + logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, discoveryNode); + if (applyCommitReference.get() != null) { + sendApplyCommit(); + } else { + Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(Publication.this::onCommitted); + } + } + + public void sendApplyCommit() { + publicationTargetStateMachine.setState(PublicationTargetState.SENT_APPLY_COMMIT); + + ApplyCommitRequest applyCommit = applyCommitReference.get(); + assert applyCommit != null; + + Publication.this.sendApplyCommit(discoveryNode, applyCommit, new PublicationTarget.ApplyCommitResponseHandler()); + assert publicationCompletedIffAllTargetsInactive(); + } + + public boolean isWaitingForQuorum() { + return publicationTargetStateMachine.isWaitingForQuorum(); + } + + public boolean isActive() { + return publicationTargetStateMachine.isActive(); + } + + public void setFailed() { + assert isActive(); + publicationTargetStateMachine.setState(PublicationTargetState.FAILED); + ackOnce(new ElasticsearchException("publication failed")); + } + + public void onTimeOut() { + assert isActive(); + publicationTargetStateMachine.setState(PublicationTargetState.FAILED); + if (applyCommitReference.get() == null) { + ackOnce(new ElasticsearchException("publication timed out")); + } + } + + public void onFaultyNode(DiscoveryNode faultyNode) { + if (isActive() && discoveryNode.equals(faultyNode)) { + logger.debug("onFaultyNode: [{}] is faulty, failing target in publication of version [{}] in term [{}]", faultyNode, + publishRequest.getAcceptedState().version(), publishRequest.getAcceptedState().term()); + setFailed(); + onPossibleCommitFailure(); + } + } + + private void ackOnce(Exception e) { + if (ackIsPending && localNode.equals(discoveryNode) == false) { + ackIsPending = false; + ackListener.onNodeAck(discoveryNode, e); + } + } + + private class PublishResponseHandler implements ActionListener { + + @Override + public void onResponse(LegislatorPublishResponse response) { + if (publicationTargetStateMachine.isFailed()) { + logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode); + assert publicationCompletedIffAllTargetsInactive(); + return; + } + + onPossibleJoin(discoveryNode, response); + + publicationTargetStateMachine.setState(PublicationTargetState.WAITING_FOR_QUORUM); + handlePublishResponse(response.getPublishResponse()); + + assert publicationCompletedIffAllTargetsInactive(); + } + + @Override + public void onFailure(Exception e) { + assert e instanceof TransportException; + final TransportException exp = (TransportException) e; + if (exp.getRootCause() instanceof CoordinationStateRejectedException) { + logger.debug("PublishResponseHandler: [{}] failed: {}", discoveryNode, exp.getRootCause().getMessage()); + } else { + logger.debug(() -> new ParameterizedMessage("PublishResponseHandler: [{}] failed", discoveryNode), exp); + } + publicationTargetStateMachine.setState(PublicationTargetState.FAILED); + onPossibleCommitFailure(); + assert publicationCompletedIffAllTargetsInactive(); + ackOnce(exp); + } + + } + + private class ApplyCommitResponseHandler implements ActionListener { + + @Override + public void onResponse(TransportResponse.Empty ignored) { + if (publicationTargetStateMachine.isFailed()) { + logger.debug("ApplyCommitResponseHandler.handleResponse: already failed, ignoring response from [{}]", + discoveryNode); + return; + } + publicationTargetStateMachine.setState(PublicationTargetState.APPLIED_COMMIT); + onPossibleCompletion(); + assert publicationCompletedIffAllTargetsInactive(); + ackOnce(null); + } + + @Override + public void onFailure(Exception e) { + assert e instanceof TransportException; + final TransportException exp = (TransportException) e; + if (exp.getRootCause() instanceof CoordinationStateRejectedException) { + logger.debug("ApplyCommitResponseHandler: [{}] failed: {}", discoveryNode, exp.getRootCause().getMessage()); + } else { + logger.debug(() -> new ParameterizedMessage("ApplyCommitResponseHandler: [{}] failed", discoveryNode), exp); + } + publicationTargetStateMachine.setState(PublicationTargetState.FAILED); + onPossibleCompletion(); + assert publicationCompletedIffAllTargetsInactive(); + ackOnce(exp); + } + } + } +} From bb95797c416cf7ae983ec79318aacd74480e270d Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 24 Jul 2018 17:47:15 +0200 Subject: [PATCH 02/19] wip --- .../LegislatorPublishResponse.java | 56 +++++++++ .../cluster/coordination/Publication.java | 4 - .../coordination/PublicationTests.java | 118 ++++++++++++++++++ 3 files changed, 174 insertions(+), 4 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/LegislatorPublishResponse.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LegislatorPublishResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LegislatorPublishResponse.java new file mode 100644 index 0000000000000..a44125c312ec7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LegislatorPublishResponse.java @@ -0,0 +1,56 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Optional; + +public class LegislatorPublishResponse extends TransportResponse { + private final PublishResponse publishResponse; + private final Optional optionalJoin; // if vote was granted due to node having lower term + + public LegislatorPublishResponse(PublishResponse publishResponse, Optional optionalJoin) { + this.publishResponse = publishResponse; + this.optionalJoin = optionalJoin; + } + + public LegislatorPublishResponse(StreamInput in) throws IOException { + this.publishResponse = new PublishResponse(in); + this.optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + publishResponse.writeTo(out); + out.writeOptionalWriteable(optionalJoin.orElse(null)); + } + + public PublishResponse getPublishResponse() { + return publishResponse; + } + + public Optional getJoin() { + return optionalJoin; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index c08ad558328f3..ed01bb47ae121 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -27,9 +27,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery.AckListener; -import org.elasticsearch.discovery.zen2.Messages.ApplyCommit; -import org.elasticsearch.discovery.zen2.Messages.LegislatorPublishResponse; -import org.elasticsearch.discovery.zen2.Messages.PublishRequest; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; @@ -67,7 +64,6 @@ public Publication(Settings settings, PublishRequest publishRequest, AckListener @Override public String toString() { - // everything here is immutable so no synchronisation required return "Publication{term=" + publishRequest.getAcceptedState().term() + ", version=" + publishRequest.getAcceptedState().version() + '}'; } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java new file mode 100644 index 0000000000000..22a412b8e4b60 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportResponse; + +import java.util.Collections; +import java.util.Optional; +import java.util.function.LongSupplier; + +public class PublicationTests extends ESTestCase { + + class MockNode { + + MockNode(Settings settings, DiscoveryNode localNode) { + ClusterState initialState = CoordinationStateTests.clusterState(0L, 0L, localNode, ClusterState.VotingConfiguration.EMPTY_CONFIG, ClusterState.VotingConfiguration.EMPTY_CONFIG, 42L); + + coordinationState = new CoordinationState(settings, localNode, new CoordinationStateTests.InMemoryPersistedState(0L, + ))ClusterState.builder(CLUSTER_NAME).nodes(DiscoveryNodes.builder() + .add(discoveryNode).localNodeId(discoveryNode.getId()).build()).build(); + } + + final CoordinationState coordinationState; + + Publication currentPublication; + + public void publish(ClusterState clusterState) { + PublishRequest publishRequest = coordinationState.handleClientValue(clusterState); + currentPublication = new MockPublication(this, Settings.EMPTY, publishRequest, new Discovery.AckListener() { + + @Override + public void onCommit(TimeValue commitTime) { + + } + + @Override + public void onNodeAck(DiscoveryNode node, Exception e) { + + } + }, () -> 0L); + currentPublication.start(Collections.emptySet()); + } + } + + class MockPublication extends Publication { + + private final MockNode mockNode; + + public MockPublication(MockNode mockNode, Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener, + LongSupplier currentTimeSupplier) { + super(settings, publishRequest, ackListener, currentTimeSupplier); + this.mockNode = mockNode; + } + + @Override + protected void onCompletion(boolean success) { + mockNode.currentPublication = null; + } + + @Override + protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) { + return mockNode.coordinationState.isPublishQuorum(votes); + } + + @Override + protected Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) { + return mockNode.coordinationState.handlePublishResponse(sourceNode, publishResponse); + } + + @Override + protected void onPossibleJoin(DiscoveryNode sourceNode, LegislatorPublishResponse response) { + + } + + @Override + protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, + ActionListener responseActionListener) { + + } + + @Override + protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit, + ActionListener responseActionListener) { + + } + } + + public void testSimpleClusterStatePublishing() throws Exception { + + } + + +} From 1f7880666555aff3b39fb00c3b442faac822a2ea Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 25 Jul 2018 11:45:15 +0200 Subject: [PATCH 03/19] improve testing --- .../coordination/CoordinationStateTests.java | 2 +- .../coordination/PublicationTests.java | 126 +++++++++++++----- 2 files changed, 93 insertions(+), 35 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java index e7f61efa69054..e39e14840ddf1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java @@ -74,7 +74,7 @@ public void setupNodes() { cs3 = createCoordinationState(new InMemoryPersistedState(0L, initialStateNode3), node3); } - private DiscoveryNode createNode(String id) { + public static DiscoveryNode createNode(String id) { return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 22a412b8e4b60..1913430caf57a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -24,72 +24,81 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.zen.PublishClusterStateActionTests; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.TransportResponse; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.LongSupplier; +import static org.hamcrest.Matchers.equalTo; + public class PublicationTests extends ESTestCase { class MockNode { MockNode(Settings settings, DiscoveryNode localNode) { - ClusterState initialState = CoordinationStateTests.clusterState(0L, 0L, localNode, ClusterState.VotingConfiguration.EMPTY_CONFIG, ClusterState.VotingConfiguration.EMPTY_CONFIG, 42L); - + this.localNode = localNode; + ClusterState initialState = CoordinationStateTests.clusterState(0L, 0L, localNode, + ClusterState.VotingConfiguration.EMPTY_CONFIG, ClusterState.VotingConfiguration.EMPTY_CONFIG, 0L); coordinationState = new CoordinationState(settings, localNode, new CoordinationStateTests.InMemoryPersistedState(0L, - ))ClusterState.builder(CLUSTER_NAME).nodes(DiscoveryNodes.builder() - .add(discoveryNode).localNodeId(discoveryNode.getId()).build()).build(); + initialState)); } - final CoordinationState coordinationState; + final DiscoveryNode localNode; - Publication currentPublication; + final CoordinationState coordinationState; - public void publish(ClusterState clusterState) { + public MockPublication publish(ClusterState clusterState, Discovery.AckListener ackListener) { PublishRequest publishRequest = coordinationState.handleClientValue(clusterState); - currentPublication = new MockPublication(this, Settings.EMPTY, publishRequest, new Discovery.AckListener() { - + MockPublication currentPublication = new MockPublication(Settings.EMPTY, publishRequest, ackListener, () -> 0L) { @Override - public void onCommit(TimeValue commitTime) { - + protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) { + return coordinationState.isPublishQuorum(votes); } @Override - public void onNodeAck(DiscoveryNode node, Exception e) { - + protected Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) { + return coordinationState.handlePublishResponse(sourceNode, publishResponse); } - }, () -> 0L); + }; currentPublication.start(Collections.emptySet()); + return currentPublication; } } - class MockPublication extends Publication { + abstract class MockPublication extends Publication { + + final PublishRequest publishRequest; + + ApplyCommitRequest applyCommit; + + boolean completed; - private final MockNode mockNode; + boolean success; - public MockPublication(MockNode mockNode, Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener, + Map> pendingPublications = new HashMap<>(); + Map> pendingCommits = new HashMap<>(); + + public MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener, LongSupplier currentTimeSupplier) { super(settings, publishRequest, ackListener, currentTimeSupplier); - this.mockNode = mockNode; + this.publishRequest = publishRequest; } @Override protected void onCompletion(boolean success) { - mockNode.currentPublication = null; - } - - @Override - protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) { - return mockNode.coordinationState.isPublishQuorum(votes); - } - - @Override - protected Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) { - return mockNode.coordinationState.handlePublishResponse(sourceNode, publishResponse); + completed = true; + this.success = success; } @Override @@ -100,18 +109,67 @@ protected void onPossibleJoin(DiscoveryNode sourceNode, LegislatorPublishRespons @Override protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, ActionListener responseActionListener) { - + assertSame(publishRequest, this.publishRequest); + assertNull(pendingPublications.put(destination, responseActionListener)); } @Override protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit, ActionListener responseActionListener) { - + if (this.applyCommit == null) { + this.applyCommit = applyCommit; + } else { + assertSame(applyCommit, this.applyCommit); + } + assertNull(pendingCommits.put(destination, responseActionListener)); } } - public void testSimpleClusterStatePublishing() throws Exception { - + public void testSimpleClusterStatePublishing() throws InterruptedException { + DiscoveryNode n1 = CoordinationStateTests.createNode("node1"); + DiscoveryNode n2 = CoordinationStateTests.createNode("node2"); + DiscoveryNode n3 = CoordinationStateTests.createNode("node3"); + + MockNode node1 = new MockNode(Settings.EMPTY, n1); + MockNode node2 = new MockNode(Settings.EMPTY, n2); + MockNode node3 = new MockNode(Settings.EMPTY, n3); + List nodes = Arrays.asList(node1, node2, node3); + + Function nodeResolver = dn -> nodes.stream().filter(mn -> mn.localNode.equals(dn)).findFirst().get(); + + ClusterState.VotingConfiguration singleNodeConfig = new ClusterState.VotingConfiguration(Sets.newHashSet(node1.localNode.getId())); + node1.coordinationState.setInitialState( + CoordinationStateTests.clusterState(0L, 1L, n1, singleNodeConfig, singleNodeConfig, 0L)); + StartJoinRequest startJoinRequest = new StartJoinRequest(n1, 1L); + node1.coordinationState.handleJoin(node1.coordinationState.handleStartJoin(startJoinRequest)); + assertTrue(node1.coordinationState.electionWon()); + node1.coordinationState.handleJoin(node2.coordinationState.handleStartJoin(startJoinRequest)); + node1.coordinationState.handleJoin(node3.coordinationState.handleStartJoin(startJoinRequest)); + + PublishClusterStateActionTests.AssertingAckListener ackListener = + new PublishClusterStateActionTests.AssertingAckListener(nodes.size()); + MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, + DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(), + singleNodeConfig, singleNodeConfig, 42L), ackListener); + + assertThat(publication.pendingPublications.keySet(), equalTo(Sets.newHashSet(n1, n2, n3))); + assertTrue(publication.pendingCommits.isEmpty()); + publication.pendingPublications.entrySet().stream().forEach(e -> { + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + e.getValue().onResponse(new LegislatorPublishResponse(publishResponse, Optional.empty())); + }); + + assertThat(publication.pendingCommits.keySet(), equalTo(Sets.newHashSet(n1, n2, n3))); + publication.pendingCommits.entrySet().stream().forEach(e -> { + nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); + e.getValue().onResponse(TransportResponse.Empty.INSTANCE); + }); + + assertTrue(publication.completed); + assertTrue(publication.success); + + ackListener.await(0L, TimeUnit.SECONDS); } From 79199b57ee4f6274aaffe6abcb6036b466109e9a Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 27 Jul 2018 12:06:08 +0200 Subject: [PATCH 04/19] fix impl and add more tests --- .../cluster/coordination/Publication.java | 76 +++--- ...onse.java => PublishWithJoinResponse.java} | 32 ++- .../cluster/coordination/MessagesTests.java | 29 ++ .../coordination/PublicationTests.java | 257 ++++++++++++++++-- .../zen/PublishClusterStateActionTests.java | 8 +- 5 files changed, 332 insertions(+), 70 deletions(-) rename server/src/main/java/org/elasticsearch/cluster/coordination/{LegislatorPublishResponse.java => PublishWithJoinResponse.java} (65%) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index ed01bb47ae121..c3b129cc8aee8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -31,7 +31,6 @@ import org.elasticsearch.transport.TransportResponse; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -62,23 +61,26 @@ public Publication(Settings settings, PublishRequest publishRequest, AckListener publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n))); } - @Override - public String toString() { - return "Publication{term=" + publishRequest.getAcceptedState().term() + - ", version=" + publishRequest.getAcceptedState().version() + '}'; - } - public void start(Set faultyNodes) { logger.trace("publishing {} to {}", publishRequest, publicationTargets); - Set localFaultyNodes = new HashSet<>(faultyNodes); - for (final DiscoveryNode faultyNode : localFaultyNodes) { + for (final DiscoveryNode faultyNode : faultyNodes) { onFaultyNode(faultyNode); } onPossibleCommitFailure(); publicationTargets.forEach(PublicationTarget::sendPublishRequest); } + public void onTimeout() { + publicationTargets.stream().filter(PublicationTarget::isActive).forEach(PublicationTarget::onTimeOut); + onPossibleCompletion(); + } + + public void onFaultyNode(DiscoveryNode faultyNode) { + publicationTargets.forEach(t -> t.onFaultyNode(faultyNode)); + onPossibleCompletion(); + } + private void onPossibleCompletion() { if (isCompleted) { return; @@ -117,13 +119,6 @@ private boolean publicationCompletedIffAllTargetsInactive() { return isCompleted; } - public void onCommitted(final ApplyCommitRequest applyCommit) { - assert applyCommitReference.get() == null; - applyCommitReference.set(applyCommit); - ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime)); - publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum).forEach(PublicationTarget::sendApplyCommit); - } - private void onPossibleCommitFailure() { if (applyCommitReference.get() != null) { onPossibleCompletion(); @@ -141,39 +136,31 @@ private void onPossibleCommitFailure() { if (isPublishQuorum(possiblySuccessfulNodes) == false) { logger.debug("onPossibleCommitFailure: non-failed nodes do not form a quorum, so {} cannot succeed", this); - failActiveTargets(); + publicationTargets.stream().filter(PublicationTarget::isActive).forEach(PublicationTarget::setFailed); + onPossibleCompletion(); } } - void failActiveTargets() { - publicationTargets.stream().filter(PublicationTarget::isActive).forEach(PublicationTarget::setFailed); - onPossibleCompletion(); - } - - public void onTimeout() { - publicationTargets.stream().filter(PublicationTarget::isActive).forEach(PublicationTarget::onTimeOut); - onPossibleCompletion(); - } - - void onFaultyNode(DiscoveryNode faultyNode) { - publicationTargets.forEach(t -> t.onFaultyNode(faultyNode)); - onPossibleCompletion(); - } - protected abstract void onCompletion(boolean success); protected abstract boolean isPublishQuorum(CoordinationState.VoteCollection votes); protected abstract Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse); - protected abstract void onPossibleJoin(DiscoveryNode sourceNode, LegislatorPublishResponse response); + protected abstract void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response); protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, - ActionListener responseActionListener); + ActionListener responseActionListener); protected abstract void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit, ActionListener responseActionListener); + @Override + public String toString() { + return "Publication{term=" + publishRequest.getAcceptedState().term() + + ", version=" + publishRequest.getAcceptedState().version() + '}'; + } + enum PublicationTargetState { NOT_STARTED, FAILED, @@ -268,7 +255,12 @@ void handlePublishResponse(PublishResponse publishResponse) { if (applyCommitReference.get() != null) { sendApplyCommit(); } else { - Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(Publication.this::onCommitted); + Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> { + assert applyCommitReference.get() == null; + applyCommitReference.set(applyCommit); + ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime)); + publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum).forEach(PublicationTarget::sendApplyCommit); + }); } } @@ -320,12 +312,19 @@ private void ackOnce(Exception e) { } } - private class PublishResponseHandler implements ActionListener { + private class PublishResponseHandler implements ActionListener { @Override - public void onResponse(LegislatorPublishResponse response) { + public void onResponse(PublishWithJoinResponse response) { if (publicationTargetStateMachine.isFailed()) { - logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode); + if (applyCommitReference.get() != null) { + logger.trace("PublishResponseHandler.handleResponse: handling [{}] from [{}] while already failed", + response.getPublishResponse(), discoveryNode); + Publication.this.sendApplyCommit(discoveryNode, applyCommitReference.get(), + new PublicationTarget.ApplyCommitResponseHandler()); + } else { + logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode); + } assert publicationCompletedIffAllTargetsInactive(); return; } @@ -362,6 +361,7 @@ public void onResponse(TransportResponse.Empty ignored) { if (publicationTargetStateMachine.isFailed()) { logger.debug("ApplyCommitResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode); + ackOnce(null); // still need to ack return; } publicationTargetStateMachine.setState(PublicationTargetState.APPLIED_COMMIT); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LegislatorPublishResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishWithJoinResponse.java similarity index 65% rename from server/src/main/java/org/elasticsearch/cluster/coordination/LegislatorPublishResponse.java rename to server/src/main/java/org/elasticsearch/cluster/coordination/PublishWithJoinResponse.java index a44125c312ec7..402ef5fe7e697 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LegislatorPublishResponse.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishWithJoinResponse.java @@ -25,16 +25,16 @@ import java.io.IOException; import java.util.Optional; -public class LegislatorPublishResponse extends TransportResponse { +public class PublishWithJoinResponse extends TransportResponse { private final PublishResponse publishResponse; private final Optional optionalJoin; // if vote was granted due to node having lower term - public LegislatorPublishResponse(PublishResponse publishResponse, Optional optionalJoin) { + public PublishWithJoinResponse(PublishResponse publishResponse, Optional optionalJoin) { this.publishResponse = publishResponse; this.optionalJoin = optionalJoin; } - public LegislatorPublishResponse(StreamInput in) throws IOException { + public PublishWithJoinResponse(StreamInput in) throws IOException { this.publishResponse = new PublishResponse(in); this.optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new)); } @@ -53,4 +53,30 @@ public PublishResponse getPublishResponse() { public Optional getJoin() { return optionalJoin; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof PublishWithJoinResponse)) return false; + + PublishWithJoinResponse that = (PublishWithJoinResponse) o; + + if (!publishResponse.equals(that.publishResponse)) return false; + return optionalJoin.equals(that.optionalJoin); + } + + @Override + public int hashCode() { + int result = publishResponse.hashCode(); + result = 31 * result + optionalJoin.hashCode(); + return result; + } + + @Override + public String toString() { + return "PublishWithJoinResponse{" + + "publishResponse=" + publishResponse + + ", optionalJoin=" + optionalJoin + + '}'; + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java index 7fa4a3217348f..1770770e02f78 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java @@ -25,6 +25,8 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EqualsHashCodeTestUtils; +import java.util.Optional; + public class MessagesTests extends ESTestCase { private DiscoveryNode createNode(String id) { @@ -96,6 +98,33 @@ public void testPublishResponseEqualsHashCodeSerialization() { }); } + public void testPublishWithJoinResponseEqualsHashCodeSerialization() { + PublishResponse initialPublishResponse = new PublishResponse(randomNonNegativeLong(), randomNonNegativeLong()); + Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong()); + PublishWithJoinResponse initialPublishWithJoinResponse = new PublishWithJoinResponse(initialPublishResponse, + randomBoolean() ? Optional.empty() : Optional.of(initialJoin)); + EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialPublishWithJoinResponse, + publishWithJoinResponse -> copyWriteable(publishWithJoinResponse, writableRegistry(), PublishWithJoinResponse::new), + publishWithJoinResponse -> { + switch (randomInt(1)) { + case 0: + // change publish response + return new PublishWithJoinResponse(new PublishResponse(randomNonNegativeLong(), randomNonNegativeLong()), + publishWithJoinResponse.getJoin()); + case 1: + // change optional join + Join newJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), + randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + return new PublishWithJoinResponse(publishWithJoinResponse.getPublishResponse(), + publishWithJoinResponse.getJoin().isPresent() && randomBoolean() ? Optional.empty() : Optional.of(newJoin)); + default: + throw new AssertionError(); + } + }); + } + public void testStartJoinRequestEqualsHashCodeSerialization() { StartJoinRequest initialStartJoinRequest = new StartJoinRequest(createNode(randomAlphaOfLength(10)), randomNonNegativeLong()); EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialStartJoinRequest, diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 1913430caf57a..090e42812b5ee 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -21,26 +21,39 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.PublishClusterStateActionTests; +import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.LongSupplier; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public class PublicationTests extends ESTestCase { @@ -49,7 +62,7 @@ class MockNode { MockNode(Settings settings, DiscoveryNode localNode) { this.localNode = localNode; ClusterState initialState = CoordinationStateTests.clusterState(0L, 0L, localNode, - ClusterState.VotingConfiguration.EMPTY_CONFIG, ClusterState.VotingConfiguration.EMPTY_CONFIG, 0L); + VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L); coordinationState = new CoordinationState(settings, localNode, new CoordinationStateTests.InMemoryPersistedState(0L, initialState)); } @@ -58,7 +71,7 @@ class MockNode { final CoordinationState coordinationState; - public MockPublication publish(ClusterState clusterState, Discovery.AckListener ackListener) { + public MockPublication publish(ClusterState clusterState, Discovery.AckListener ackListener, Set faultyNodes) { PublishRequest publishRequest = coordinationState.handleClientValue(clusterState); MockPublication currentPublication = new MockPublication(Settings.EMPTY, publishRequest, ackListener, () -> 0L) { @Override @@ -71,7 +84,7 @@ protected Optional handlePublishResponse(DiscoveryNode sourc return coordinationState.handlePublishResponse(sourceNode, publishResponse); } }; - currentPublication.start(Collections.emptySet()); + currentPublication.start(faultyNodes); return currentPublication; } } @@ -86,7 +99,7 @@ abstract class MockPublication extends Publication { boolean success; - Map> pendingPublications = new HashMap<>(); + Map> pendingPublications = new HashMap<>(); Map> pendingCommits = new HashMap<>(); public MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener, @@ -102,13 +115,13 @@ protected void onCompletion(boolean success) { } @Override - protected void onPossibleJoin(DiscoveryNode sourceNode, LegislatorPublishResponse response) { + protected void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response) { } @Override protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, - ActionListener responseActionListener) { + ActionListener responseActionListener) { assertSame(publishRequest, this.publishRequest); assertNull(pendingPublications.put(destination, responseActionListener)); } @@ -125,43 +138,49 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app } } - public void testSimpleClusterStatePublishing() throws InterruptedException { - DiscoveryNode n1 = CoordinationStateTests.createNode("node1"); - DiscoveryNode n2 = CoordinationStateTests.createNode("node2"); - DiscoveryNode n3 = CoordinationStateTests.createNode("node3"); + DiscoveryNode n1 = CoordinationStateTests.createNode("node1"); + DiscoveryNode n2 = CoordinationStateTests.createNode("node2"); + DiscoveryNode n3 = CoordinationStateTests.createNode("node3"); + Set discoNodes = Sets.newHashSet(n1, n2, n3); - MockNode node1 = new MockNode(Settings.EMPTY, n1); - MockNode node2 = new MockNode(Settings.EMPTY, n2); - MockNode node3 = new MockNode(Settings.EMPTY, n3); - List nodes = Arrays.asList(node1, node2, node3); + MockNode node1 = new MockNode(Settings.EMPTY, n1); + MockNode node2 = new MockNode(Settings.EMPTY, n2); + MockNode node3 = new MockNode(Settings.EMPTY, n3); + List nodes = Arrays.asList(node1, node2, node3); - Function nodeResolver = dn -> nodes.stream().filter(mn -> mn.localNode.equals(dn)).findFirst().get(); + Function nodeResolver = dn -> nodes.stream().filter(mn -> mn.localNode.equals(dn)).findFirst().get(); - ClusterState.VotingConfiguration singleNodeConfig = new ClusterState.VotingConfiguration(Sets.newHashSet(node1.localNode.getId())); - node1.coordinationState.setInitialState( - CoordinationStateTests.clusterState(0L, 1L, n1, singleNodeConfig, singleNodeConfig, 0L)); + private void initializeCluster(VotingConfiguration initialConfig) { + node1.coordinationState.setInitialState(CoordinationStateTests.clusterState(0L, 1L, n1, initialConfig, initialConfig, 0L)); StartJoinRequest startJoinRequest = new StartJoinRequest(n1, 1L); node1.coordinationState.handleJoin(node1.coordinationState.handleStartJoin(startJoinRequest)); - assertTrue(node1.coordinationState.electionWon()); node1.coordinationState.handleJoin(node2.coordinationState.handleStartJoin(startJoinRequest)); node1.coordinationState.handleJoin(node3.coordinationState.handleStartJoin(startJoinRequest)); + assertTrue(node1.coordinationState.electionWon()); + } - PublishClusterStateActionTests.AssertingAckListener ackListener = - new PublishClusterStateActionTests.AssertingAckListener(nodes.size()); + public void testSimpleClusterStatePublishing() throws InterruptedException { + VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId())); + initializeCluster(singleNodeConfig); + + AssertingAckListener ackListener = new AssertingAckListener(nodes.size()); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(); MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, - DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(), - singleNodeConfig, singleNodeConfig, 42L), ackListener); + discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet()); - assertThat(publication.pendingPublications.keySet(), equalTo(Sets.newHashSet(n1, n2, n3))); + assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes)); assertTrue(publication.pendingCommits.isEmpty()); - publication.pendingPublications.entrySet().stream().forEach(e -> { + publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( publication.publishRequest); - e.getValue().onResponse(new LegislatorPublishResponse(publishResponse, Optional.empty())); + e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); }); - assertThat(publication.pendingCommits.keySet(), equalTo(Sets.newHashSet(n1, n2, n3))); - publication.pendingCommits.entrySet().stream().forEach(e -> { + assertThat(publication.pendingCommits.keySet(), equalTo(discoNodes)); + assertNotNull(publication.applyCommit); + assertEquals(publication.applyCommit.getTerm(), publication.publishRequest.getAcceptedState().term()); + assertEquals(publication.applyCommit.getVersion(), publication.publishRequest.getAcceptedState().version()); + publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> { nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); e.getValue().onResponse(TransportResponse.Empty.INSTANCE); }); @@ -172,5 +191,187 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { ackListener.await(0L, TimeUnit.SECONDS); } + public void testClusterStatePublishingWithFaultyNode() throws InterruptedException { + VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId())); + initializeCluster(singleNodeConfig); + + AssertingAckListener ackListener = new AssertingAckListener(nodes.size()); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(); + + boolean failNodeWhenCommitting = randomBoolean(); + boolean publicationDidNotMakeItToNode2 = randomBoolean(); + AtomicInteger remainingActions = new AtomicInteger( + failNodeWhenCommitting ? (publicationDidNotMakeItToNode2 ? 2 : 3) : 4); + int injectFaultAt = randomInt(remainingActions.get() - 1); + logger.info("Injecting fault at: {}, failNodeWhenCommitting: {}, publicationDidNotMakeItToNode2: {}", injectFaultAt, + failNodeWhenCommitting, publicationDidNotMakeItToNode2); + + Set initialFaultyNodes = + failNodeWhenCommitting == false && remainingActions.decrementAndGet() == injectFaultAt ? + Collections.singleton(n2) : Collections.emptySet(); + MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, + discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, initialFaultyNodes); + + publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { + if (failNodeWhenCommitting == false) { + if (remainingActions.decrementAndGet() == injectFaultAt) { + publication.onFaultyNode(n2); + } + if (e.getKey().equals(n2) == false || randomBoolean()) { + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + } + } else { + if (e.getKey().equals(n2) == false || publicationDidNotMakeItToNode2 == false) { + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + } + } + }); + + publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> { + if (failNodeWhenCommitting == false) { + nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); + e.getValue().onResponse(TransportResponse.Empty.INSTANCE); + } else { + if (e.getKey().equals(n2)) { + // we must fail node before committing for the node, otherwise failing the node is ignored + publication.onFaultyNode(n2); + } + if (remainingActions.decrementAndGet() == injectFaultAt) { + publication.onFaultyNode(n2); + } + if (e.getKey().equals(n2) == false || randomBoolean()) { + nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); + e.getValue().onResponse(TransportResponse.Empty.INSTANCE); + } + } + }); + + // we need to complete publication by failing the node + if (failNodeWhenCommitting && publicationDidNotMakeItToNode2 && remainingActions.get() > injectFaultAt) { + publication.onFaultyNode(n2); + } + + assertTrue(publication.completed); + assertTrue(publication.success); + + publication.onFaultyNode(randomFrom(n1, n3)); // has no influence + + List> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS); + assertThat(errors.size(), equalTo(1)); + assertThat(errors.get(0).v1(), equalTo(n2)); + } + + public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws InterruptedException { + VotingConfiguration config = new VotingConfiguration(Sets.newHashSet(n1.getId(), n2.getId())); + initializeCluster(config); + + AssertingAckListener ackListener = new AssertingAckListener(nodes.size()); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(); + MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, + discoveryNodes, config, config, 42L), ackListener, Collections.emptySet()); + + publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { + if (e.getKey().equals(n2)) { + if (randomBoolean()) { + publication.onTimeout(); + } else { + e.getValue().onFailure(new TransportException(new Exception("dummy failure"))); + } + assertTrue(publication.completed); + assertFalse(publication.success); + } else if (randomBoolean()) { + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + } + }); + + assertThat(publication.pendingCommits.keySet(), equalTo(Collections.emptySet())); + assertNull(publication.applyCommit); + assertTrue(publication.completed); + assertFalse(publication.success); + + List> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS); + assertThat(errors.size(), equalTo(2)); // publication does not ack for the local node + } + + public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedException { + VotingConfiguration config = new VotingConfiguration(Sets.newHashSet(n1.getId(), n2.getId())); + initializeCluster(config); + + AssertingAckListener ackListener = new AssertingAckListener(nodes.size()); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(); + MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, + discoveryNodes, config, config, 42L), ackListener, Collections.emptySet()); + + boolean publishedToN3 = randomBoolean(); + publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { + if (e.getKey().equals(n3) == false || publishedToN3) { + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + } + }); + + assertNotNull(publication.applyCommit); + + Set committingNodes = new HashSet<>(randomSubsetOf(discoNodes)); + if (publishedToN3 == false) { + committingNodes.remove(n3); + } + + logger.info("Committing nodes: {}", committingNodes); + + publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> { + if (committingNodes.contains(e.getKey())) { + nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); + e.getValue().onResponse(TransportResponse.Empty.INSTANCE); + } + }); + + publication.onTimeout(); + assertTrue(publication.completed); + + if (committingNodes.contains(n1)) { // master needs to commit for publication to be successful + assertTrue(publication.success); + } else { + assertFalse(publication.success); + } + + Set ackedNodes = new HashSet<>(committingNodes); + ackedNodes.remove(n1); // publication does not ack for the master node + assertEquals(ackedNodes, ackListener.await(0L, TimeUnit.SECONDS)); + + // check that acking still works after publication completed + if (publishedToN3 == false) { + publication.pendingPublications.get(n3).onResponse( + new PublishWithJoinResponse(node3.coordinationState.handlePublishRequest(publication.publishRequest), Optional.empty())); + } + + assertEquals(discoNodes, publication.pendingCommits.keySet()); + + Set nonCommittedNodes = Sets.difference(discoNodes, committingNodes); + logger.info("Non-committed nodes: {}", nonCommittedNodes); + nonCommittedNodes.stream().collect(shuffle()).forEach(n -> { + if (n.equals(n1) == false || randomBoolean()) { + publication.pendingCommits.get(n).onResponse(TransportResponse.Empty.INSTANCE); + } + }); + + assertEquals(Sets.newHashSet(n2, n3), ackListener.await(0L, TimeUnit.SECONDS)); // publication does not ack for the local node + } + + public static Collector> shuffle() { + return Collectors.collectingAndThen(Collectors.toList(), + ts -> { + Collections.shuffle(ts, random()); + return ts.stream(); + }); + } + } diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java index 0503a4f819d23..57bde7f70ccbc 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java @@ -65,8 +65,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -816,6 +818,7 @@ public AssertingAckListener publishState(PublishClusterStateAction action, Clust public static class AssertingAckListener implements Discovery.AckListener { private final List> errors = new CopyOnWriteArrayList<>(); + private final Set successfulAcks = Collections.synchronizedSet(new HashSet<>()); private final CountDownLatch countDown; private final CountDownLatch commitCountDown; @@ -833,13 +836,16 @@ public void onCommit(TimeValue commitTime) { public void onNodeAck(DiscoveryNode node, @Nullable Exception e) { if (e != null) { errors.add(new Tuple<>(node, e)); + } else { + successfulAcks.add(node); } countDown.countDown(); } - public void await(long timeout, TimeUnit unit) throws InterruptedException { + public Set await(long timeout, TimeUnit unit) throws InterruptedException { assertThat(awaitErrors(timeout, unit), emptyIterable()); assertTrue(commitCountDown.await(timeout, unit)); + return new HashSet<>(successfulAcks); } public List> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException { From 9569856fdc8a40637fee3dd0b5ebe7f7739bff80 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 27 Jul 2018 16:09:39 +0200 Subject: [PATCH 05/19] more fixes --- .../cluster/coordination/Publication.java | 113 ++++++++---------- .../coordination/PublicationTests.java | 13 +- 2 files changed, 59 insertions(+), 67 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index c3b129cc8aee8..772bbc938f7a7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery.AckListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; @@ -34,19 +35,21 @@ import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; public abstract class Publication extends AbstractComponent { - private final AtomicReference applyCommitReference; private final List publicationTargets; + private final PublicationTarget localPublicationTarget; private final PublishRequest publishRequest; private final AckListener ackListener; private final DiscoveryNode localNode; private final LongSupplier currentTimeSupplier; private final long startTime; - private boolean isCompleted; + + private Optional applyCommitRequest; // set when state is committed + private boolean isCompleted; // set when publication is completed + private boolean timedOut; // set when publication timed out public Publication(Settings settings, PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) { super(settings); @@ -54,11 +57,11 @@ public Publication(Settings settings, PublishRequest publishRequest, AckListener this.ackListener = ackListener; this.localNode = publishRequest.getAcceptedState().nodes().getLocalNode(); this.currentTimeSupplier = currentTimeSupplier; - applyCommitReference = new AtomicReference<>(); startTime = currentTimeSupplier.getAsLong(); - + applyCommitRequest = Optional.empty(); publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size()); publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n))); + localPublicationTarget = publicationTargets.stream().filter(target -> target.discoveryNode.equals(localNode)).findFirst().get(); } public void start(Set faultyNodes) { @@ -72,7 +75,14 @@ public void start(Set faultyNodes) { } public void onTimeout() { - publicationTargets.stream().filter(PublicationTarget::isActive).forEach(PublicationTarget::onTimeOut); + assert timedOut == false; + timedOut = true; + if (localPublicationTarget.publicationTargetStateMachine.isActive()) { + logger.debug("onTimeout: [{}] timed out before master committed", this); + // fail all current publications + final Exception e = new ElasticsearchException("publication timed out before master committed"); + publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); + } onPossibleCompletion(); } @@ -86,41 +96,43 @@ private void onPossibleCompletion() { return; } - for (final PublicationTarget target : publicationTargets) { - if (target.publicationTargetStateMachine.isActive()) { - return; + if (timedOut == false) { + for (final PublicationTarget target : publicationTargets) { + if (target.publicationTargetStateMachine.isActive()) { + return; + } } } - for (final PublicationTarget target : publicationTargets) { - if (target.discoveryNode.equals(localNode) && target.publicationTargetStateMachine.isFailed()) { - logger.debug("onPossibleCompletion: [{}] failed on master", this); - assert isCompleted == false; - isCompleted = true; - onCompletion(false); - return; - } + if (localPublicationTarget.publicationTargetStateMachine.isFailed()) { + logger.debug("onPossibleCompletion: [{}] failed on master", this); + assert isCompleted == false; + isCompleted = true; + onCompletion(false); + return; } assert isCompleted == false; isCompleted = true; onCompletion(true); - assert applyCommitReference.get() != null; + assert applyCommitRequest.isPresent(); logger.trace("onPossibleCompletion: [{}] was successful, applying new state locally", this); } // For assertions only: verify that this invariant holds - private boolean publicationCompletedIffAllTargetsInactive() { - for (final PublicationTarget target : publicationTargets) { - if (target.publicationTargetStateMachine.isActive()) { - return isCompleted == false; + private boolean publicationCompletedIffAllTargetsInactiveOrTimedOut() { + if (timedOut == false) { + for (final PublicationTarget target : publicationTargets) { + if (target.publicationTargetStateMachine.isActive()) { + return isCompleted == false; + } } } return isCompleted; } private void onPossibleCommitFailure() { - if (applyCommitReference.get() != null) { + if (applyCommitRequest.isPresent()) { onPossibleCompletion(); return; } @@ -136,7 +148,8 @@ private void onPossibleCommitFailure() { if (isPublishQuorum(possiblySuccessfulNodes) == false) { logger.debug("onPossibleCommitFailure: non-failed nodes do not form a quorum, so {} cannot succeed", this); - publicationTargets.stream().filter(PublicationTarget::isActive).forEach(PublicationTarget::setFailed); + Exception e = new Discovery.FailedToCommitClusterStateException("non-failed nodes do not form a quorum"); + publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); onPossibleCompletion(); } } @@ -218,7 +231,6 @@ public boolean isFailed() { @Override public String toString() { - // TODO DANGER non-volatile, mutable variable requires synchronisation return state.toString(); } } @@ -245,19 +257,19 @@ public void sendPublishRequest() { publicationTargetStateMachine.setState(PublicationTargetState.SENT_PUBLISH_REQUEST); Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublicationTarget.PublishResponseHandler()); // TODO Can this ^ fail with an exception? Target should be failed if so. - assert publicationCompletedIffAllTargetsInactive(); + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); } void handlePublishResponse(PublishResponse publishResponse) { assert publicationTargetStateMachine.isWaitingForQuorum() : publicationTargetStateMachine; logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, discoveryNode); - if (applyCommitReference.get() != null) { + if (applyCommitRequest.isPresent()) { sendApplyCommit(); } else { Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> { - assert applyCommitReference.get() == null; - applyCommitReference.set(applyCommit); + assert applyCommitRequest.isPresent() == false; + applyCommitRequest = Optional.of(applyCommit); ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime)); publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum).forEach(PublicationTarget::sendApplyCommit); }); @@ -266,12 +278,9 @@ void handlePublishResponse(PublishResponse publishResponse) { public void sendApplyCommit() { publicationTargetStateMachine.setState(PublicationTargetState.SENT_APPLY_COMMIT); - - ApplyCommitRequest applyCommit = applyCommitReference.get(); - assert applyCommit != null; - - Publication.this.sendApplyCommit(discoveryNode, applyCommit, new PublicationTarget.ApplyCommitResponseHandler()); - assert publicationCompletedIffAllTargetsInactive(); + assert applyCommitRequest.isPresent(); + Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new PublicationTarget.ApplyCommitResponseHandler()); + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); } public boolean isWaitingForQuorum() { @@ -282,25 +291,17 @@ public boolean isActive() { return publicationTargetStateMachine.isActive(); } - public void setFailed() { - assert isActive(); - publicationTargetStateMachine.setState(PublicationTargetState.FAILED); - ackOnce(new ElasticsearchException("publication failed")); - } - - public void onTimeOut() { + public void setFailed(Exception e) { assert isActive(); publicationTargetStateMachine.setState(PublicationTargetState.FAILED); - if (applyCommitReference.get() == null) { - ackOnce(new ElasticsearchException("publication timed out")); - } + ackOnce(e); } public void onFaultyNode(DiscoveryNode faultyNode) { if (isActive() && discoveryNode.equals(faultyNode)) { logger.debug("onFaultyNode: [{}] is faulty, failing target in publication of version [{}] in term [{}]", faultyNode, publishRequest.getAcceptedState().version(), publishRequest.getAcceptedState().term()); - setFailed(); + setFailed(new Exception("faulty node")); onPossibleCommitFailure(); } } @@ -317,15 +318,8 @@ private class PublishResponseHandler implements ActionListener ackedNodes = new HashSet<>(committingNodes); ackedNodes.remove(n1); // publication does not ack for the master node assertEquals(ackedNodes, ackListener.await(0L, TimeUnit.SECONDS)); From b1d7aedbc04fa44953d4db9d47dd5c8b869da881 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 30 Jul 2018 08:51:44 +0200 Subject: [PATCH 06/19] simplify --- .../cluster/coordination/Publication.java | 24 ++++------- .../coordination/PublicationTests.java | 42 +++++++------------ 2 files changed, 23 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 772bbc938f7a7..19c976841881b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -40,10 +40,8 @@ public abstract class Publication extends AbstractComponent { private final List publicationTargets; - private final PublicationTarget localPublicationTarget; private final PublishRequest publishRequest; private final AckListener ackListener; - private final DiscoveryNode localNode; private final LongSupplier currentTimeSupplier; private final long startTime; @@ -55,13 +53,11 @@ public Publication(Settings settings, PublishRequest publishRequest, AckListener super(settings); this.publishRequest = publishRequest; this.ackListener = ackListener; - this.localNode = publishRequest.getAcceptedState().nodes().getLocalNode(); this.currentTimeSupplier = currentTimeSupplier; startTime = currentTimeSupplier.getAsLong(); applyCommitRequest = Optional.empty(); publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size()); publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n))); - localPublicationTarget = publicationTargets.stream().filter(target -> target.discoveryNode.equals(localNode)).findFirst().get(); } public void start(Set faultyNodes) { @@ -77,10 +73,10 @@ public void start(Set faultyNodes) { public void onTimeout() { assert timedOut == false; timedOut = true; - if (localPublicationTarget.publicationTargetStateMachine.isActive()) { - logger.debug("onTimeout: [{}] timed out before master committed", this); + if (applyCommitRequest.isPresent() == false) { + logger.debug("onTimeout: [{}] timed out before committing", this); // fail all current publications - final Exception e = new ElasticsearchException("publication timed out before master committed"); + final Exception e = new ElasticsearchException("publication timed out before committing"); publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); } onPossibleCompletion(); @@ -104,8 +100,8 @@ private void onPossibleCompletion() { } } - if (localPublicationTarget.publicationTargetStateMachine.isFailed()) { - logger.debug("onPossibleCompletion: [{}] failed on master", this); + if (applyCommitRequest.isPresent() == false) { + logger.debug("onPossibleCompletion: [{}] commit failed", this); assert isCompleted == false; isCompleted = true; onCompletion(false); @@ -137,7 +133,7 @@ private void onPossibleCommitFailure() { return; } - CoordinationState.VoteCollection possiblySuccessfulNodes = new CoordinationState.VoteCollection(); + final CoordinationState.VoteCollection possiblySuccessfulNodes = new CoordinationState.VoteCollection(); for (PublicationTarget publicationTarget : publicationTargets) { if (publicationTarget.publicationTargetStateMachine.mayCommitInFuture()) { possiblySuccessfulNodes.addVote(publicationTarget.discoveryNode); @@ -154,7 +150,7 @@ private void onPossibleCommitFailure() { } } - protected abstract void onCompletion(boolean success); + protected abstract void onCompletion(boolean committed); protected abstract boolean isPublishQuorum(CoordinationState.VoteCollection votes); @@ -246,7 +242,6 @@ private PublicationTarget(DiscoveryNode discoveryNode) { @Override public String toString() { - // everything here is immutable so no synchronisation required return discoveryNode.getId(); } @@ -262,7 +257,6 @@ public void sendPublishRequest() { void handlePublishResponse(PublishResponse publishResponse) { assert publicationTargetStateMachine.isWaitingForQuorum() : publicationTargetStateMachine; - logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, discoveryNode); if (applyCommitRequest.isPresent()) { sendApplyCommit(); @@ -301,13 +295,13 @@ public void onFaultyNode(DiscoveryNode faultyNode) { if (isActive() && discoveryNode.equals(faultyNode)) { logger.debug("onFaultyNode: [{}] is faulty, failing target in publication of version [{}] in term [{}]", faultyNode, publishRequest.getAcceptedState().version(), publishRequest.getAcceptedState().term()); - setFailed(new Exception("faulty node")); + setFailed(new ElasticsearchException("faulty node")); onPossibleCommitFailure(); } } private void ackOnce(Exception e) { - if (ackIsPending && localNode.equals(discoveryNode) == false) { + if (ackIsPending) { ackIsPending = false; ackListener.onNodeAck(discoveryNode, e); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 87d2c959df420..a9beca9cd8c7d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -93,7 +93,7 @@ abstract class MockPublication extends Publication { boolean completed; - boolean success; + boolean committed; Map> pendingPublications = new HashMap<>(); Map> pendingCommits = new HashMap<>(); @@ -105,9 +105,9 @@ public MockPublication(Settings settings, PublishRequest publishRequest, Discove } @Override - protected void onCompletion(boolean success) { + protected void onCompletion(boolean committed) { completed = true; - this.success = success; + this.committed = committed; } @Override @@ -182,9 +182,9 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { }); assertTrue(publication.completed); - assertTrue(publication.success); + assertTrue(publication.committed); - ackListener.await(0L, TimeUnit.SECONDS); + assertThat(ackListener.await(0L, TimeUnit.SECONDS).size(), equalTo(3)); } public void testClusterStatePublishingWithFaultyNode() throws InterruptedException { @@ -252,7 +252,7 @@ public void testClusterStatePublishingWithFaultyNode() throws InterruptedExcepti } assertTrue(publication.completed); - assertTrue(publication.success); + assertTrue(publication.committed); publication.onFaultyNode(randomFrom(n1, n3)); // has no influence @@ -278,7 +278,7 @@ public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws Inter e.getValue().onFailure(new TransportException(new Exception("dummy failure"))); } assertTrue(publication.completed); - assertFalse(publication.success); + assertFalse(publication.committed); } else if (randomBoolean()) { PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( publication.publishRequest); @@ -289,10 +289,10 @@ public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws Inter assertThat(publication.pendingCommits.keySet(), equalTo(Collections.emptySet())); assertNull(publication.applyCommit); assertTrue(publication.completed); - assertFalse(publication.success); + assertFalse(publication.committed); List> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS); - assertThat(errors.size(), equalTo(2)); // publication does not ack for the local node + assertThat(errors.size(), equalTo(3)); } public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedException { @@ -331,19 +331,8 @@ public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedEx publication.onTimeout(); assertTrue(publication.completed); - - if (committingNodes.contains(n1) == false) { // master needs to commit for publication to be successful - assertFalse(publication.success); - // some nodes might have already successfully acked the change, so we can't assert how many exceptions to expect - ackListener.awaitErrors(0L, TimeUnit.SECONDS); - return; - } - - assertTrue(publication.success); - - Set ackedNodes = new HashSet<>(committingNodes); - ackedNodes.remove(n1); // publication does not ack for the master node - assertEquals(ackedNodes, ackListener.await(0L, TimeUnit.SECONDS)); + assertTrue(publication.committed); + assertEquals(committingNodes, ackListener.await(0L, TimeUnit.SECONDS)); // check that acking still works after publication completed if (publishedToN3 == false) { @@ -355,13 +344,10 @@ public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedEx Set nonCommittedNodes = Sets.difference(discoNodes, committingNodes); logger.info("Non-committed nodes: {}", nonCommittedNodes); - nonCommittedNodes.stream().collect(shuffle()).forEach(n -> { - if (n.equals(n1) == false || randomBoolean()) { - publication.pendingCommits.get(n).onResponse(TransportResponse.Empty.INSTANCE); - } - }); + nonCommittedNodes.stream().collect(shuffle()).forEach(n -> + publication.pendingCommits.get(n).onResponse(TransportResponse.Empty.INSTANCE)); - assertEquals(Sets.newHashSet(n2, n3), ackListener.await(0L, TimeUnit.SECONDS)); // publication does not ack for the local node + assertEquals(discoNodes, ackListener.await(0L, TimeUnit.SECONDS)); } public static Collector> shuffle() { From ad2ea39acaf9803a003f02e6a45d8f207f399a22 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 2 Aug 2018 15:02:29 +0200 Subject: [PATCH 07/19] minor stuff --- .../org/elasticsearch/cluster/coordination/Publication.java | 6 +++--- .../cluster/coordination/PublishWithJoinResponse.java | 6 +++++- .../cluster/coordination/PublicationTests.java | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 19c976841881b..1d94f2a91a36c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -143,7 +143,8 @@ private void onPossibleCommitFailure() { } if (isPublishQuorum(possiblySuccessfulNodes) == false) { - logger.debug("onPossibleCommitFailure: non-failed nodes do not form a quorum, so {} cannot succeed", this); + logger.debug("onPossibleCommitFailure: non-failed nodes {} do not form a quorum, so {} cannot succeed", + possiblySuccessfulNodes, this); Exception e = new Discovery.FailedToCommitClusterStateException("non-failed nodes do not form a quorum"); publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); onPossibleCompletion(); @@ -293,8 +294,7 @@ public void setFailed(Exception e) { public void onFaultyNode(DiscoveryNode faultyNode) { if (isActive() && discoveryNode.equals(faultyNode)) { - logger.debug("onFaultyNode: [{}] is faulty, failing target in publication of version [{}] in term [{}]", faultyNode, - publishRequest.getAcceptedState().version(), publishRequest.getAcceptedState().term()); + logger.debug("onFaultyNode: [{}] is faulty, failing target in publication {}", faultyNode, Publication.this); setFailed(new ElasticsearchException("faulty node")); onPossibleCommitFailure(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishWithJoinResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishWithJoinResponse.java index 402ef5fe7e697..8628177895ad5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishWithJoinResponse.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishWithJoinResponse.java @@ -25,9 +25,13 @@ import java.io.IOException; import java.util.Optional; +/** + * Response to a {@link PublishRequest}. Encapsulates both a {@link PublishResponse} + * and an optional {@link Join}. + */ public class PublishWithJoinResponse extends TransportResponse { private final PublishResponse publishResponse; - private final Optional optionalJoin; // if vote was granted due to node having lower term + private final Optional optionalJoin; public PublishWithJoinResponse(PublishResponse publishResponse, Optional optionalJoin) { this.publishResponse = publishResponse; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index a9beca9cd8c7d..474d91bf8d886 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -98,7 +98,7 @@ abstract class MockPublication extends Publication { Map> pendingPublications = new HashMap<>(); Map> pendingCommits = new HashMap<>(); - public MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener, + MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener, LongSupplier currentTimeSupplier) { super(settings, publishRequest, ackListener, currentTimeSupplier); this.publishRequest = publishRequest; From a21e8c7b84921a71fcf2bd3abb580e37a18bf43d Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 09:54:56 +0200 Subject: [PATCH 08/19] inline and remove TermVersionResponse --- .../cluster/coordination/PublishResponse.java | 44 +++++++++- .../coordination/TermVersionResponse.java | 82 ------------------- 2 files changed, 40 insertions(+), 86 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java index f213657ed2e7a..be7c11857021a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java @@ -20,25 +20,43 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; /** * Response to a {@link PublishRequest}, carrying the term and version of the request. + * Typically wrapped in a {@link PublishWithJoinResponse}. */ -public class PublishResponse extends TermVersionResponse { +public class PublishResponse implements Writeable { + + protected final long term; + protected final long version; public PublishResponse(long term, long version) { - super(term, version); + assert term >= 0; + assert version >= 0; + + this.term = term; + this.version = version; } public PublishResponse(StreamInput in) throws IOException { - super(in); + this(in.readLong(), in.readLong()); } @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + out.writeLong(term); + out.writeLong(version); + } + + public long getTerm() { + return term; + } + + public long getVersion() { + return version; } @Override @@ -48,4 +66,22 @@ public String toString() { ", version=" + version + '}'; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + PublishResponse response = (PublishResponse) o; + + if (term != response.term) return false; + return version == response.version; + } + + @Override + public int hashCode() { + int result = (int) (term ^ (term >>> 32)); + result = 31 * result + (int) (version ^ (version >>> 32)); + return result; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java deleted file mode 100644 index 5eba2e6b732a5..0000000000000 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/TermVersionResponse.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.cluster.coordination; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.transport.TransportResponse; - -import java.io.IOException; - -abstract class TermVersionResponse extends TransportResponse { - protected final long term; - protected final long version; - - TermVersionResponse(long term, long version) { - assert term >= 0; - assert version >= 0; - - this.term = term; - this.version = version; - } - - TermVersionResponse(StreamInput in) throws IOException { - this(in.readLong(), in.readLong()); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeLong(term); - out.writeLong(version); - } - - public long getTerm() { - return term; - } - - public long getVersion() { - return version; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TermVersionResponse response = (TermVersionResponse) o; - - if (term != response.term) return false; - return version == response.version; - } - - @Override - public int hashCode() { - int result = (int) (term ^ (term >>> 32)); - result = 31 * result + (int) (version ^ (version >>> 32)); - return result; - } - - @Override - public String toString() { - return "TermVersionResponse{" + - "term=" + term + - ", version=" + version + - '}'; - } -} From 25d5835ea8c91ccbc2811c087387bd80afbf7049 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 10:10:44 +0200 Subject: [PATCH 09/19] remove log message part about applying state --- .../org/elasticsearch/cluster/coordination/Publication.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 1d94f2a91a36c..6a238f549b899 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -112,7 +112,7 @@ private void onPossibleCompletion() { isCompleted = true; onCompletion(true); assert applyCommitRequest.isPresent(); - logger.trace("onPossibleCompletion: [{}] was successful, applying new state locally", this); + logger.trace("onPossibleCompletion: [{}] was successful", this); } // For assertions only: verify that this invariant holds From 43e3399f16437fb4413fb6dbe48c9121b7634304 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 10:12:32 +0200 Subject: [PATCH 10/19] Add todo for onPossibleJoin --- .../java/org/elasticsearch/cluster/coordination/Publication.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 6a238f549b899..76630e6b56e69 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -317,6 +317,7 @@ public void onResponse(PublishWithJoinResponse response) { return; } + // TODO: check if we need to pass the full response here or if it's sufficient to just pass the optional join. onPossibleJoin(discoveryNode, response); publicationTargetStateMachine.setState(PublicationTargetState.WAITING_FOR_QUORUM); From b7b4e39b9806d0920b8ecade8127aced7563f866 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 10:34:35 +0200 Subject: [PATCH 11/19] inline PublicationTargetStateMachine --- .../cluster/coordination/Publication.java | 127 +++++++----------- 1 file changed, 48 insertions(+), 79 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 76630e6b56e69..e5541e9565172 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -94,7 +94,7 @@ private void onPossibleCompletion() { if (timedOut == false) { for (final PublicationTarget target : publicationTargets) { - if (target.publicationTargetStateMachine.isActive()) { + if (target.isActive()) { return; } } @@ -119,7 +119,7 @@ private void onPossibleCompletion() { private boolean publicationCompletedIffAllTargetsInactiveOrTimedOut() { if (timedOut == false) { for (final PublicationTarget target : publicationTargets) { - if (target.publicationTargetStateMachine.isActive()) { + if (target.isActive()) { return isCompleted == false; } } @@ -135,10 +135,10 @@ private void onPossibleCommitFailure() { final CoordinationState.VoteCollection possiblySuccessfulNodes = new CoordinationState.VoteCollection(); for (PublicationTarget publicationTarget : publicationTargets) { - if (publicationTarget.publicationTargetStateMachine.mayCommitInFuture()) { + if (publicationTarget.mayCommitInFuture()) { possiblySuccessfulNodes.addVote(publicationTarget.discoveryNode); } else { - assert publicationTarget.publicationTargetStateMachine.isFailed() : publicationTarget.publicationTargetStateMachine; + assert publicationTarget.isFailed() : publicationTarget; } } @@ -180,62 +180,10 @@ enum PublicationTargetState { APPLIED_COMMIT, } - static class PublicationTargetStateMachine { - private PublicationTargetState state = PublicationTargetState.NOT_STARTED; - - public void setState(PublicationTargetState newState) { - switch (newState) { - case NOT_STARTED: - assert false : state + " -> " + newState; - break; - case SENT_PUBLISH_REQUEST: - assert state == PublicationTargetState.NOT_STARTED : state + " -> " + newState; - break; - case WAITING_FOR_QUORUM: - assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + newState; - break; - case SENT_APPLY_COMMIT: - assert state == PublicationTargetState.WAITING_FOR_QUORUM : state + " -> " + newState; - break; - case APPLIED_COMMIT: - assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + newState; - break; - case FAILED: - assert state != PublicationTargetState.APPLIED_COMMIT : state + " -> " + newState; - break; - } - state = newState; - } - - public boolean isActive() { - return state != PublicationTargetState.FAILED - && state != PublicationTargetState.APPLIED_COMMIT; - } - - public boolean isWaitingForQuorum() { - return state == PublicationTargetState.WAITING_FOR_QUORUM; - } - - public boolean mayCommitInFuture() { - return (state == PublicationTargetState.NOT_STARTED - || state == PublicationTargetState.SENT_PUBLISH_REQUEST - || state == PublicationTargetState.WAITING_FOR_QUORUM); - } - - public boolean isFailed() { - return state == PublicationTargetState.FAILED; - } - - @Override - public String toString() { - return state.toString(); - } - } - private class PublicationTarget { private final DiscoveryNode discoveryNode; - private final PublicationTargetStateMachine publicationTargetStateMachine = new PublicationTargetStateMachine(); private boolean ackIsPending = true; + private PublicationTargetState state = PublicationTargetState.NOT_STARTED; private PublicationTarget(DiscoveryNode discoveryNode) { this.discoveryNode = discoveryNode; @@ -243,21 +191,26 @@ private PublicationTarget(DiscoveryNode discoveryNode) { @Override public String toString() { - return discoveryNode.getId(); + return "PublicationTarget{" + + "discoveryNode=" + discoveryNode + + ", state=" + state + + ", ackIsPending=" + ackIsPending + + '}'; } public void sendPublishRequest() { - if (publicationTargetStateMachine.isFailed()) { + if (isFailed()) { return; } - publicationTargetStateMachine.setState(PublicationTargetState.SENT_PUBLISH_REQUEST); + assert state == PublicationTargetState.NOT_STARTED : state + " -> " + PublicationTargetState.SENT_PUBLISH_REQUEST; + state = PublicationTargetState.SENT_PUBLISH_REQUEST; Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublicationTarget.PublishResponseHandler()); // TODO Can this ^ fail with an exception? Target should be failed if so. assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); } void handlePublishResponse(PublishResponse publishResponse) { - assert publicationTargetStateMachine.isWaitingForQuorum() : publicationTargetStateMachine; + assert isWaitingForQuorum() : this; logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, discoveryNode); if (applyCommitRequest.isPresent()) { sendApplyCommit(); @@ -272,23 +225,22 @@ void handlePublishResponse(PublishResponse publishResponse) { } public void sendApplyCommit() { - publicationTargetStateMachine.setState(PublicationTargetState.SENT_APPLY_COMMIT); + assert state == PublicationTargetState.WAITING_FOR_QUORUM : state + " -> " + PublicationTargetState.SENT_APPLY_COMMIT; + state = PublicationTargetState.SENT_APPLY_COMMIT; assert applyCommitRequest.isPresent(); Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new PublicationTarget.ApplyCommitResponseHandler()); assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); } - public boolean isWaitingForQuorum() { - return publicationTargetStateMachine.isWaitingForQuorum(); - } - - public boolean isActive() { - return publicationTargetStateMachine.isActive(); + public void setAppliedCommit() { + assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + PublicationTargetState.APPLIED_COMMIT; + state = PublicationTargetState.APPLIED_COMMIT; + ackOnce(null); } public void setFailed(Exception e) { - assert isActive(); - publicationTargetStateMachine.setState(PublicationTargetState.FAILED); + assert state != PublicationTargetState.APPLIED_COMMIT : state + " -> " + PublicationTargetState.FAILED; + state = PublicationTargetState.FAILED; ackOnce(e); } @@ -307,11 +259,30 @@ private void ackOnce(Exception e) { } } + public boolean isActive() { + return state != PublicationTargetState.FAILED + && state != PublicationTargetState.APPLIED_COMMIT; + } + + public boolean isWaitingForQuorum() { + return state == PublicationTargetState.WAITING_FOR_QUORUM; + } + + public boolean mayCommitInFuture() { + return (state == PublicationTargetState.NOT_STARTED + || state == PublicationTargetState.SENT_PUBLISH_REQUEST + || state == PublicationTargetState.WAITING_FOR_QUORUM); + } + + public boolean isFailed() { + return state == PublicationTargetState.FAILED; + } + private class PublishResponseHandler implements ActionListener { @Override public void onResponse(PublishWithJoinResponse response) { - if (publicationTargetStateMachine.isFailed()) { + if (isFailed()) { logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode); assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); return; @@ -320,7 +291,8 @@ public void onResponse(PublishWithJoinResponse response) { // TODO: check if we need to pass the full response here or if it's sufficient to just pass the optional join. onPossibleJoin(discoveryNode, response); - publicationTargetStateMachine.setState(PublicationTargetState.WAITING_FOR_QUORUM); + assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM; + state = PublicationTargetState.WAITING_FOR_QUORUM; handlePublishResponse(response.getPublishResponse()); assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); @@ -335,10 +307,9 @@ public void onFailure(Exception e) { } else { logger.debug(() -> new ParameterizedMessage("PublishResponseHandler: [{}] failed", discoveryNode), exp); } - publicationTargetStateMachine.setState(PublicationTargetState.FAILED); + setFailed(e); onPossibleCommitFailure(); assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); - ackOnce(exp); } } @@ -347,15 +318,14 @@ private class ApplyCommitResponseHandler implements ActionListener new ParameterizedMessage("ApplyCommitResponseHandler: [{}] failed", discoveryNode), exp); } - publicationTargetStateMachine.setState(PublicationTargetState.FAILED); + setFailed(e); onPossibleCompletion(); assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); - ackOnce(exp); } } } From f09659ceda699bf19d581973dabe073bd318c132 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 10:57:05 +0200 Subject: [PATCH 12/19] addressed a first batch of comments --- .../coordination/PublicationTests.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 474d91bf8d886..a90bc8b084417 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -42,6 +42,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.LongSupplier; @@ -49,6 +50,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; public class PublicationTests extends ESTestCase { @@ -97,6 +99,7 @@ abstract class MockPublication extends Publication { Map> pendingPublications = new HashMap<>(); Map> pendingCommits = new HashMap<>(); + Map possibleJoins = new HashMap<>(); MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener, LongSupplier currentTimeSupplier) { @@ -106,13 +109,14 @@ abstract class MockPublication extends Publication { @Override protected void onCompletion(boolean committed) { + assertFalse(completed); completed = true; this.committed = committed; } @Override protected void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response) { - + assertNull(possibleJoins.put(sourceNode, response)); } @Override @@ -166,10 +170,22 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes)); assertTrue(publication.pendingCommits.isEmpty()); + AtomicBoolean processedNode1PublishResponse = new AtomicBoolean(); publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( publication.publishRequest); - e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty()); + assertFalse(publication.possibleJoins.containsKey(e.getKey())); + PublishWithJoinResponse publishWithJoinResponse = new PublishWithJoinResponse(publishResponse, + randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), n1, randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong()))); + e.getValue().onResponse(publishWithJoinResponse); + assertTrue(publication.possibleJoins.containsKey(e.getKey())); + assertEquals(publishWithJoinResponse, publication.possibleJoins.get(e.getKey())); + if (e.getKey().equals(n1)) { + processedNode1PublishResponse.set(true); + } + assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty()); }); assertThat(publication.pendingCommits.keySet(), equalTo(discoNodes)); @@ -177,6 +193,8 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { assertEquals(publication.applyCommit.getTerm(), publication.publishRequest.getAcceptedState().term()); assertEquals(publication.applyCommit.getVersion(), publication.publishRequest.getAcceptedState().version()); publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> { + assertFalse(publication.completed); + assertFalse(publication.committed); nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); e.getValue().onResponse(TransportResponse.Empty.INSTANCE); }); @@ -184,7 +202,7 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { assertTrue(publication.completed); assertTrue(publication.committed); - assertThat(ackListener.await(0L, TimeUnit.SECONDS).size(), equalTo(3)); + assertThat(ackListener.await(0L, TimeUnit.SECONDS), containsInAnyOrder(n1, n2, n3)); } public void testClusterStatePublishingWithFaultyNode() throws InterruptedException { From a76c4f1c7cde62f4530abf930aa89dd2f4a3fa8e Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 11:16:08 +0200 Subject: [PATCH 13/19] split testClusterStatePublishingWithFaultyNode into two tests --- .../coordination/PublicationTests.java | 98 +++++++++++-------- 1 file changed, 59 insertions(+), 39 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index a90bc8b084417..7715ec0d5fa44 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -205,67 +205,87 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { assertThat(ackListener.await(0L, TimeUnit.SECONDS), containsInAnyOrder(n1, n2, n3)); } - public void testClusterStatePublishingWithFaultyNode() throws InterruptedException { + public void testClusterStatePublishingWithFaultyNodeBeforeCommit() throws InterruptedException { VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId())); initializeCluster(singleNodeConfig); AssertingAckListener ackListener = new AssertingAckListener(nodes.size()); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(); - boolean failNodeWhenCommitting = randomBoolean(); - boolean publicationDidNotMakeItToNode2 = randomBoolean(); - AtomicInteger remainingActions = new AtomicInteger( - failNodeWhenCommitting ? (publicationDidNotMakeItToNode2 ? 2 : 3) : 4); + AtomicInteger remainingActions = new AtomicInteger(4); // number of publish actions + initial faulty nodes injection int injectFaultAt = randomInt(remainingActions.get() - 1); - logger.info("Injecting fault at: {}, failNodeWhenCommitting: {}, publicationDidNotMakeItToNode2: {}", injectFaultAt, - failNodeWhenCommitting, publicationDidNotMakeItToNode2); + logger.info("Injecting fault at: {}", injectFaultAt); - Set initialFaultyNodes = - failNodeWhenCommitting == false && remainingActions.decrementAndGet() == injectFaultAt ? - Collections.singleton(n2) : Collections.emptySet(); + Set initialFaultyNodes = remainingActions.decrementAndGet() == injectFaultAt ? + Collections.singleton(n2) : Collections.emptySet(); MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, initialFaultyNodes); publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { - if (failNodeWhenCommitting == false) { - if (remainingActions.decrementAndGet() == injectFaultAt) { - publication.onFaultyNode(n2); - } - if (e.getKey().equals(n2) == false || randomBoolean()) { - PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( - publication.publishRequest); - e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); - } - } else { - if (e.getKey().equals(n2) == false || publicationDidNotMakeItToNode2 == false) { - PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( - publication.publishRequest); - e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); - } + if (remainingActions.decrementAndGet() == injectFaultAt) { + publication.onFaultyNode(n2); + } + if (e.getKey().equals(n2) == false || randomBoolean()) { + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + } + }); + + publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> { + nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); + e.getValue().onResponse(TransportResponse.Empty.INSTANCE); + }); + + assertTrue(publication.completed); + assertTrue(publication.committed); + + publication.onFaultyNode(randomFrom(n1, n3)); // has no influence + + List> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS); + assertThat(errors.size(), equalTo(1)); + assertThat(errors.get(0).v1(), equalTo(n2)); + } + + public void testClusterStatePublishingWithFaultyNodeAfterCommit() throws InterruptedException { + VotingConfiguration singleNodeConfig = new VotingConfiguration(Sets.newHashSet(n1.getId())); + initializeCluster(singleNodeConfig); + + AssertingAckListener ackListener = new AssertingAckListener(nodes.size()); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(n1).add(n2).add(n3).localNodeId(n1.getId()).build(); + + boolean publicationDidNotMakeItToNode2 = randomBoolean(); + AtomicInteger remainingActions = new AtomicInteger(publicationDidNotMakeItToNode2 ? 2 : 3); + int injectFaultAt = randomInt(remainingActions.get() - 1); + logger.info("Injecting fault at: {}, publicationDidNotMakeItToNode2: {}", injectFaultAt, publicationDidNotMakeItToNode2); + + MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, + discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet()); + + publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { + if (e.getKey().equals(n2) == false || publicationDidNotMakeItToNode2 == false) { + PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( + publication.publishRequest); + e.getValue().onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); } }); publication.pendingCommits.entrySet().stream().collect(shuffle()).forEach(e -> { - if (failNodeWhenCommitting == false) { + if (e.getKey().equals(n2)) { + // we must fail node before committing for the node, otherwise failing the node is ignored + publication.onFaultyNode(n2); + } + if (remainingActions.decrementAndGet() == injectFaultAt) { + publication.onFaultyNode(n2); + } + if (e.getKey().equals(n2) == false || randomBoolean()) { nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); e.getValue().onResponse(TransportResponse.Empty.INSTANCE); - } else { - if (e.getKey().equals(n2)) { - // we must fail node before committing for the node, otherwise failing the node is ignored - publication.onFaultyNode(n2); - } - if (remainingActions.decrementAndGet() == injectFaultAt) { - publication.onFaultyNode(n2); - } - if (e.getKey().equals(n2) == false || randomBoolean()) { - nodeResolver.apply(e.getKey()).coordinationState.handleCommit(publication.applyCommit); - e.getValue().onResponse(TransportResponse.Empty.INSTANCE); - } } }); // we need to complete publication by failing the node - if (failNodeWhenCommitting && publicationDidNotMakeItToNode2 && remainingActions.get() > injectFaultAt) { + if (publicationDidNotMakeItToNode2 && remainingActions.get() > injectFaultAt) { publication.onFaultyNode(n2); } From 47d1c9fe5156a107771cd01d2b055242fdd017e5 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 11:20:30 +0200 Subject: [PATCH 14/19] check failure --- .../elasticsearch/cluster/coordination/PublicationTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 7715ec0d5fa44..bb9d717349cfa 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -51,6 +51,7 @@ import java.util.stream.Stream; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; public class PublicationTests extends ESTestCase { @@ -245,6 +246,7 @@ public void testClusterStatePublishingWithFaultyNodeBeforeCommit() throws Interr List> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS); assertThat(errors.size(), equalTo(1)); assertThat(errors.get(0).v1(), equalTo(n2)); + assertThat(errors.get(0).v2().getMessage(), containsString("faulty node")); } public void testClusterStatePublishingWithFaultyNodeAfterCommit() throws InterruptedException { @@ -297,6 +299,7 @@ public void testClusterStatePublishingWithFaultyNodeAfterCommit() throws Interru List> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS); assertThat(errors.size(), equalTo(1)); assertThat(errors.get(0).v1(), equalTo(n2)); + assertThat(errors.get(0).v2().getMessage(), containsString("faulty node")); } public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws InterruptedException { From b711edb489424c4d5fd0e9aa3529cbf2767fb591 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 11:32:09 +0200 Subject: [PATCH 15/19] exception messages --- .../org/elasticsearch/cluster/coordination/Publication.java | 6 ++++-- .../cluster/coordination/PublicationTests.java | 6 +++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index e5541e9565172..6694df5adc1ce 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -307,7 +307,8 @@ public void onFailure(Exception e) { } else { logger.debug(() -> new ParameterizedMessage("PublishResponseHandler: [{}] failed", discoveryNode), exp); } - setFailed(e); + assert ((TransportException) e).getRootCause() instanceof Exception; + setFailed((Exception) exp.getRootCause()); onPossibleCommitFailure(); assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); } @@ -337,7 +338,8 @@ public void onFailure(Exception e) { } else { logger.debug(() -> new ParameterizedMessage("ApplyCommitResponseHandler: [{}] failed", discoveryNode), exp); } - setFailed(e); + assert ((TransportException) e).getRootCause() instanceof Exception; + setFailed((Exception) exp.getRootCause()); onPossibleCompletion(); assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index bb9d717349cfa..384f7f43b4b47 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -311,9 +311,10 @@ public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws Inter MockPublication publication = node1.publish(CoordinationStateTests.clusterState(1L, 2L, discoveryNodes, config, config, 42L), ackListener, Collections.emptySet()); + boolean timeOut = randomBoolean(); publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { if (e.getKey().equals(n2)) { - if (randomBoolean()) { + if (timeOut) { publication.onTimeout(); } else { e.getValue().onFailure(new TransportException(new Exception("dummy failure"))); @@ -334,6 +335,9 @@ public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws Inter List> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS); assertThat(errors.size(), equalTo(3)); + assertThat(errors.stream().map(Tuple::v1).collect(Collectors.toList()), containsInAnyOrder(n1, n2, n3)); + errors.stream().map(Tuple::v2).forEach(throwable -> + assertThat(throwable.getMessage(), containsString(timeOut ? "timed out" : "dummy failure"))); } public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedException { From e310291d1d12ff03354fe03fd71303e01a519fa7 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 11:43:41 +0200 Subject: [PATCH 16/19] access modifiers --- .../cluster/coordination/Publication.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 6694df5adc1ce..f0f3ab9769e8e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -198,13 +198,13 @@ public String toString() { '}'; } - public void sendPublishRequest() { + void sendPublishRequest() { if (isFailed()) { return; } assert state == PublicationTargetState.NOT_STARTED : state + " -> " + PublicationTargetState.SENT_PUBLISH_REQUEST; state = PublicationTargetState.SENT_PUBLISH_REQUEST; - Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublicationTarget.PublishResponseHandler()); + Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler()); // TODO Can this ^ fail with an exception? Target should be failed if so. assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); } @@ -224,27 +224,27 @@ void handlePublishResponse(PublishResponse publishResponse) { } } - public void sendApplyCommit() { + void sendApplyCommit() { assert state == PublicationTargetState.WAITING_FOR_QUORUM : state + " -> " + PublicationTargetState.SENT_APPLY_COMMIT; state = PublicationTargetState.SENT_APPLY_COMMIT; assert applyCommitRequest.isPresent(); - Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new PublicationTarget.ApplyCommitResponseHandler()); + Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new ApplyCommitResponseHandler()); assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); } - public void setAppliedCommit() { + void setAppliedCommit() { assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + PublicationTargetState.APPLIED_COMMIT; state = PublicationTargetState.APPLIED_COMMIT; ackOnce(null); } - public void setFailed(Exception e) { + void setFailed(Exception e) { assert state != PublicationTargetState.APPLIED_COMMIT : state + " -> " + PublicationTargetState.FAILED; state = PublicationTargetState.FAILED; ackOnce(e); } - public void onFaultyNode(DiscoveryNode faultyNode) { + void onFaultyNode(DiscoveryNode faultyNode) { if (isActive() && discoveryNode.equals(faultyNode)) { logger.debug("onFaultyNode: [{}] is faulty, failing target in publication {}", faultyNode, Publication.this); setFailed(new ElasticsearchException("faulty node")); @@ -259,22 +259,22 @@ private void ackOnce(Exception e) { } } - public boolean isActive() { + boolean isActive() { return state != PublicationTargetState.FAILED && state != PublicationTargetState.APPLIED_COMMIT; } - public boolean isWaitingForQuorum() { + boolean isWaitingForQuorum() { return state == PublicationTargetState.WAITING_FOR_QUORUM; } - public boolean mayCommitInFuture() { + boolean mayCommitInFuture() { return (state == PublicationTargetState.NOT_STARTED || state == PublicationTargetState.SENT_PUBLISH_REQUEST || state == PublicationTargetState.WAITING_FOR_QUORUM); } - public boolean isFailed() { + boolean isFailed() { return state == PublicationTargetState.FAILED; } From 0199b3de7b25524c7205d0f824d4bfa8ed6ffe1d Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 11:58:21 +0200 Subject: [PATCH 17/19] test publish response arriving after commit --- .../coordination/PublicationTests.java | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 384f7f43b4b47..9f05739f952d2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -172,13 +172,17 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes)); assertTrue(publication.pendingCommits.isEmpty()); AtomicBoolean processedNode1PublishResponse = new AtomicBoolean(); + boolean delayProcessingNode2PublishResponse = randomBoolean(); publication.pendingPublications.entrySet().stream().collect(shuffle()).forEach(e -> { + if (delayProcessingNode2PublishResponse && e.getKey().equals(n2)) { + return; + } PublishResponse publishResponse = nodeResolver.apply(e.getKey()).coordinationState.handlePublishRequest( publication.publishRequest); assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty()); assertFalse(publication.possibleJoins.containsKey(e.getKey())); PublishWithJoinResponse publishWithJoinResponse = new PublishWithJoinResponse(publishResponse, - randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), n1, randomNonNegativeLong(), + randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), randomFrom(n1, n2, n3), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()))); e.getValue().onResponse(publishWithJoinResponse); assertTrue(publication.possibleJoins.containsKey(e.getKey())); @@ -189,7 +193,11 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty()); }); - assertThat(publication.pendingCommits.keySet(), equalTo(discoNodes)); + if (delayProcessingNode2PublishResponse) { + assertThat(publication.pendingCommits.keySet(), equalTo(Sets.newHashSet(n1, n3))); + } else { + assertThat(publication.pendingCommits.keySet(), equalTo(discoNodes)); + } assertNotNull(publication.applyCommit); assertEquals(publication.applyCommit.getTerm(), publication.publishRequest.getAcceptedState().term()); assertEquals(publication.applyCommit.getVersion(), publication.publishRequest.getAcceptedState().version()); @@ -200,6 +208,19 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { e.getValue().onResponse(TransportResponse.Empty.INSTANCE); }); + if (delayProcessingNode2PublishResponse) { + assertFalse(publication.completed); + assertFalse(publication.committed); + PublishResponse publishResponse = nodeResolver.apply(n2).coordinationState.handlePublishRequest( + publication.publishRequest); + publication.pendingPublications.get(n2).onResponse(new PublishWithJoinResponse(publishResponse, Optional.empty())); + assertThat(publication.pendingCommits.keySet(), equalTo(discoNodes)); + + assertFalse(publication.completed); + assertFalse(publication.committed); + publication.pendingCommits.get(n2).onResponse(TransportResponse.Empty.INSTANCE); + } + assertTrue(publication.completed); assertTrue(publication.committed); From a1e136b2335c9c9d1f8fd1216149e092f97dd035 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 12:03:20 +0200 Subject: [PATCH 18/19] fix assertion in test --- .../elasticsearch/cluster/coordination/PublicationTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 9f05739f952d2..6825bc0d873dc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -357,8 +357,9 @@ public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws Inter List> errors = ackListener.awaitErrors(0L, TimeUnit.SECONDS); assertThat(errors.size(), equalTo(3)); assertThat(errors.stream().map(Tuple::v1).collect(Collectors.toList()), containsInAnyOrder(n1, n2, n3)); - errors.stream().map(Tuple::v2).forEach(throwable -> - assertThat(throwable.getMessage(), containsString(timeOut ? "timed out" : "dummy failure"))); + errors.stream().forEach(tuple -> + assertThat(tuple.v2().getMessage(), containsString(timeOut ? "timed out" : + tuple.v1().equals(n2) ? "dummy failure" : "non-failed nodes do not form a quorum"))); } public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedException { From 799157d72d468f2cb457bdb7badb78cb83c82c0a Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 6 Aug 2018 15:52:36 +0200 Subject: [PATCH 19/19] review comments --- .../org/elasticsearch/cluster/coordination/Publication.java | 4 ++-- .../elasticsearch/cluster/coordination/PublicationTests.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index f0f3ab9769e8e..3fef7415739fd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -180,12 +180,12 @@ enum PublicationTargetState { APPLIED_COMMIT, } - private class PublicationTarget { + class PublicationTarget { private final DiscoveryNode discoveryNode; private boolean ackIsPending = true; private PublicationTargetState state = PublicationTargetState.NOT_STARTED; - private PublicationTarget(DiscoveryNode discoveryNode) { + PublicationTarget(DiscoveryNode discoveryNode) { this.discoveryNode = discoveryNode; } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 6825bc0d873dc..19c7f436c4f8f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -363,7 +363,8 @@ public void testClusterStatePublishingFailsOrTimesOutBeforeCommit() throws Inter } public void testClusterStatePublishingTimesOutAfterCommit() throws InterruptedException { - VotingConfiguration config = new VotingConfiguration(Sets.newHashSet(n1.getId(), n2.getId())); + VotingConfiguration config = new VotingConfiguration(randomBoolean() ? + Sets.newHashSet(n1.getId(), n2.getId()) : Sets.newHashSet(n1.getId(), n2.getId(), n3.getId())); initializeCluster(config); AssertingAckListener ackListener = new AssertingAckListener(nodes.size());