diff --git a/server/src/main/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandler.java b/server/src/main/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandler.java index 72e59675c1bfa..8d5ef1926cdd5 100644 --- a/server/src/main/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandler.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import java.util.Collections; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -35,6 +36,7 @@ public class BlockingClusterStatePublishResponseHandler { private final CountDownLatch latch; private final Set pendingNodes; + private final Set failedNodes; /** * Creates a new BlockingClusterStatePublishResponseHandler @@ -44,6 +46,7 @@ public BlockingClusterStatePublishResponseHandler(Set publishingT this.pendingNodes = ConcurrentCollections.newConcurrentSet(); this.pendingNodes.addAll(publishingToNodes); this.latch = new CountDownLatch(pendingNodes.size()); + this.failedNodes = ConcurrentCollections.newConcurrentSet(); } /** @@ -64,6 +67,8 @@ public void onResponse(DiscoveryNode node) { public void onFailure(DiscoveryNode node, Exception e) { boolean found = pendingNodes.remove(node); assert found : "node [" + node + "] already responded or failed"; + boolean added = failedNodes.add(node); + assert added : "duplicate failures for " + node; latch.countDown(); } @@ -86,4 +91,11 @@ public DiscoveryNode[] pendingNodes() { // nulls if some nodes responded in the meanwhile return pendingNodes.toArray(new DiscoveryNode[0]); } + + /** + * returns a set of nodes for which publication has failed. + */ + public Set getFailedNodes() { + return Collections.unmodifiableSet(failedNodes); + } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java b/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java index 382a42141d83a..cd87a41526313 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.discovery.zen; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -41,6 +40,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler; import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; import org.elasticsearch.discovery.Discovery; @@ -207,6 +207,12 @@ private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final S clusterState.version(), publishTimeout, pendingNodes); } } + // The failure is logged under debug when a sending failed. we now log a summary. + Set failedNodes = publishResponseHandler.getFailedNodes(); + if (failedNodes.isEmpty() == false) { + logger.warn("publishing cluster state with version [{}] failed for the following nodes: [{}]", + clusterChangedEvent.state().version(), failedNodes); + } } catch (InterruptedException e) { // ignore & restore interrupt Thread.currentThread().interrupt(); @@ -367,14 +373,14 @@ public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVe protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException { Compressor compressor = CompressorFactory.compressor(request.bytes()); StreamInput in = request.bytes().streamInput(); - try { - if (compressor != null) { - in = compressor.streamInput(in); - } - in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry); - in.setVersion(request.version()); - synchronized (lastSeenClusterStateMutex) { - final ClusterState incomingState; + final ClusterState incomingState; + synchronized (lastSeenClusterStateMutex) { + try { + if (compressor != null) { + in = compressor.streamInput(in); + } + in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry); + in.setVersion(request.version()); // If true we received full cluster state - otherwise diffs if (in.readBoolean()) { incomingState = ClusterState.readFrom(in, transportService.getLocalNode()); @@ -391,14 +397,17 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request, logger.debug("received diff for but don't have any local cluster state - requesting full state"); throw new IncompatibleClusterStateVersionException("have no local cluster state"); } - incomingClusterStateListener.onIncomingClusterState(incomingState); - lastSeenClusterState = incomingState; + } catch (IncompatibleClusterStateVersionException e) { + incompatibleClusterStateDiffReceivedCount.incrementAndGet(); + throw e; + } catch (Exception e) { + logger.warn("unexpected error while deserializing an incoming cluster state", e); + throw e; + } finally { + IOUtils.close(in); } - } catch (IncompatibleClusterStateVersionException e) { - incompatibleClusterStateDiffReceivedCount.incrementAndGet(); - throw e; - } finally { - IOUtils.close(in); + incomingClusterStateListener.onIncomingClusterState(incomingState); + lastSeenClusterState = incomingState; } channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/server/src/test/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandlerTests.java b/server/src/test/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandlerTests.java index 6d0ee8a97821e..9504344236b86 100644 --- a/server/src/test/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandlerTests.java @@ -85,10 +85,16 @@ public void testConcurrentAccess() throws InterruptedException { int firstRound = randomIntBetween(5, nodeCount - 1); Thread[] threads = new Thread[firstRound]; CyclicBarrier barrier = new CyclicBarrier(firstRound); + Set expectedFailures = new HashSet<>(); Set completedNodes = new HashSet<>(); for (int i = 0; i < threads.length; i++) { - completedNodes.add(allNodes[i]); - threads[i] = new Thread(new PublishResponder(randomBoolean(), allNodes[i], barrier, logger, handler)); + final DiscoveryNode node = allNodes[i]; + completedNodes.add(node); + final boolean fail = randomBoolean(); + if (fail) { + expectedFailures.add(node); + } + threads[i] = new Thread(new PublishResponder(fail, node, barrier, logger, handler)); threads[i].start(); } // wait on the threads to finish @@ -105,7 +111,12 @@ public void testConcurrentAccess() throws InterruptedException { barrier = new CyclicBarrier(secondRound); for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread(new PublishResponder(randomBoolean(), allNodes[firstRound + i], barrier, logger, handler)); + final DiscoveryNode node = allNodes[firstRound + i]; + final boolean fail = randomBoolean(); + if (fail) { + expectedFailures.add(node); + } + threads[i] = new Thread(new PublishResponder(fail, node, barrier, logger, handler)); threads[i].start(); } // wait on the threads to finish @@ -114,6 +125,6 @@ public void testConcurrentAccess() throws InterruptedException { } assertTrue("expected handler not to timeout as all nodes responded", handler.awaitAllNodes(new TimeValue(10))); assertThat(handler.pendingNodes(), arrayWithSize(0)); - + assertThat(handler.getFailedNodes(), equalTo(expectedFailures)); } }