Skip to content

Commit

Permalink
Stats to record how often the ClusterState diff mechanism is used suc…
Browse files Browse the repository at this point in the history
…cessfully (elastic#26973)

It's believed that using diffs obsoletes the other mechanism for reusing the
bits of the ClusterState that didn't change between updates, but in fact we
don't know for sure how often the diff mechanism works successfully. The stats
collected here will tell us.
  • Loading branch information
DaveCTurner committed Oct 25, 2017
1 parent e660c6f commit ec82ea6
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 10 deletions.
26 changes: 23 additions & 3 deletions core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,56 @@

package org.elasticsearch.discovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;

import java.io.IOException;

public class DiscoveryStats implements Writeable, ToXContentFragment {

@Nullable
private final PendingClusterStateStats queueStats;
private final PublishClusterStateStats publishStats;

public DiscoveryStats(PendingClusterStateStats queueStats) {
public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats) {
this.queueStats = queueStats;
this.publishStats = publishStats;
}

public DiscoveryStats(StreamInput in) throws IOException {
queueStats = in.readOptionalWriteable(PendingClusterStateStats::new);

if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
publishStats = in.readOptionalWriteable(PublishClusterStateStats::new);
} else {
publishStats = null;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(queueStats);

if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalWriteable(publishStats);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.DISCOVERY);
if (queueStats != null ){
if (queueStats != null) {
queueStats.toXContent(builder, params);
}
if (publishStats != null) {
publishStats.toXContent(builder, params);
}
builder.endObject();
return builder;
}
Expand All @@ -64,4 +80,8 @@ static final class Fields {
public PendingClusterStateStats getQueueStats() {
return queueStats;
}

public PublishClusterStateStats getPublishStats() {
return publishStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand All @@ -34,6 +33,7 @@
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -94,7 +94,7 @@ public void onFailure(String source, Exception e) {

@Override
public DiscoveryStats stats() {
return new DiscoveryStats((PendingClusterStateStats) null);
return new DiscoveryStats(null, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class PublishClusterStateAction extends AbstractComponent {

Expand All @@ -90,6 +91,10 @@ public interface IncomingClusterStateListener {
private final IncomingClusterStateListener incomingClusterStateListener;
private final DiscoverySettings discoverySettings;

private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();

public PublishClusterStateAction(
Settings settings,
TransportService transportService,
Expand Down Expand Up @@ -380,11 +385,13 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request,
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
request.bytes().length());
} else if (lastSeenClusterState != null) {
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());
incomingState = diff.apply(lastSeenClusterState);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
} else {
Expand All @@ -394,6 +401,9 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request,
incomingClusterStateListener.onIncomingClusterState(incomingState);
lastSeenClusterState = incomingState;
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
} finally {
IOUtils.close(in);
}
Expand Down Expand Up @@ -636,4 +646,11 @@ public void setPublishingTimedOut(boolean isTimedOut) {
publishingTimedOut.set(isTimedOut);
}
}

public PublishClusterStateStats stats() {
return new PublishClusterStateStats(
fullClusterStateReceivedCount.get(),
incompatibleClusterStateDiffReceivedCount.get(),
compatibleClusterStateDiffReceivedCount.get());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery.zen;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Class encapsulating stats about the PublishClusterStateAction
*/
public class PublishClusterStateStats implements Writeable, ToXContentObject {

private final long fullClusterStateReceivedCount;
private final long incompatibleClusterStateDiffReceivedCount;
private final long compatibleClusterStateDiffReceivedCount;

/**
* @param fullClusterStateReceivedCount the number of times this node has received a full copy of the cluster state from the master.
* @param incompatibleClusterStateDiffReceivedCount the number of times this node has received a cluster-state diff from the master.
* @param compatibleClusterStateDiffReceivedCount the number of times that received cluster-state diffs were compatible with
*/
public PublishClusterStateStats(long fullClusterStateReceivedCount,
long incompatibleClusterStateDiffReceivedCount,
long compatibleClusterStateDiffReceivedCount) {
this.fullClusterStateReceivedCount = fullClusterStateReceivedCount;
this.incompatibleClusterStateDiffReceivedCount = incompatibleClusterStateDiffReceivedCount;
this.compatibleClusterStateDiffReceivedCount = compatibleClusterStateDiffReceivedCount;
}

public PublishClusterStateStats(StreamInput in) throws IOException {
fullClusterStateReceivedCount = in.readVLong();
incompatibleClusterStateDiffReceivedCount = in.readVLong();
compatibleClusterStateDiffReceivedCount = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(fullClusterStateReceivedCount);
out.writeVLong(incompatibleClusterStateDiffReceivedCount);
out.writeVLong(compatibleClusterStateDiffReceivedCount);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("published_cluster_states");
{
builder.field("full_states", fullClusterStateReceivedCount);
builder.field("incompatible_diffs", incompatibleClusterStateDiffReceivedCount);
builder.field("compatible_diffs", compatibleClusterStateDiffReceivedCount);
}
builder.endObject();
return builder;
}

long getFullClusterStateReceivedCount() { return fullClusterStateReceivedCount; }

long getIncompatibleClusterStateDiffReceivedCount() { return incompatibleClusterStateDiffReceivedCount; }

long getCompatibleClusterStateDiffReceivedCount() { return compatibleClusterStateDiffReceivedCount; }

@Override
public String toString() {
return "PublishClusterStateStats(full=" + fullClusterStateReceivedCount
+ ", incompatible=" + incompatibleClusterStateDiffReceivedCount
+ ", compatible=" + compatibleClusterStateDiffReceivedCount
+ ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,7 @@ Set<DiscoveryNode> getFaultDetectionNodes() {

@Override
public DiscoveryStats stats() {
PendingClusterStateStats queueStats = pendingStatesQueue.stats();
return new DiscoveryStats(queueStats);
return new DiscoveryStats(pendingStatesQueue.stats(), publishClusterState.stats());
}

public DiscoverySettings getDiscoverySettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
Expand Down Expand Up @@ -410,8 +411,18 @@ private static NodeStats createNodeStats() {
allCircuitBreakerStats = new AllCircuitBreakerStats(circuitBreakerStatsArray);
}
ScriptStats scriptStats = frequently() ? new ScriptStats(randomNonNegativeLong(), randomNonNegativeLong()) : null;
DiscoveryStats discoveryStats = frequently() ? new DiscoveryStats(randomBoolean() ? new PendingClusterStateStats(randomInt(),
randomInt(), randomInt()) : null) : null;
DiscoveryStats discoveryStats = frequently()
? new DiscoveryStats(
randomBoolean()
? new PendingClusterStateStats(randomInt(), randomInt(), randomInt())
: null,
randomBoolean()
? new PublishClusterStateStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong())
: null)
: null;
IngestStats ingestStats = null;
if (frequently()) {
IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,73 @@ public void testTimeoutOrCommit() throws Exception {
}
}

private void assertPublishClusterStateStats(String description, MockNode node, long expectedFull, long expectedIncompatibleDiffs,
long expectedCompatibleDiffs) {
PublishClusterStateStats stats = node.action.stats();
assertThat(description + ": full cluster states", stats.getFullClusterStateReceivedCount(), equalTo(expectedFull));
assertThat(description + ": incompatible cluster state diffs", stats.getIncompatibleClusterStateDiffReceivedCount(),
equalTo(expectedIncompatibleDiffs));
assertThat(description + ": compatible cluster state diffs", stats.getCompatibleClusterStateDiffReceivedCount(),
equalTo(expectedCompatibleDiffs));
}

public void testPublishClusterStateStats() throws Exception {
MockNode nodeA = createMockNode("nodeA").setAsMaster();
MockNode nodeB = createMockNode("nodeB");

assertPublishClusterStateStats("nodeA: initial state", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: initial state", nodeB, 0, 0, 0);

// Initial cluster state
ClusterState clusterState = nodeA.clusterState;

// cluster state update - add nodeB
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(clusterState.nodes()).add(nodeB.discoveryNode).build();
ClusterState previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);

// Sent as a full cluster state update
assertPublishClusterStateStats("nodeA: after full update", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: after full update", nodeB, 1, 0, 0);

// Increment cluster state version
previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);

// Sent, successfully, as a cluster state diff
assertPublishClusterStateStats("nodeA: after successful diff update", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: after successful diff update", nodeB, 1, 0, 1);

// Increment cluster state version twice
previousClusterState = ClusterState.builder(clusterState).incrementVersion().build();
clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);

// Sent, unsuccessfully, as a diff and then retried as a full update
assertPublishClusterStateStats("nodeA: after unsuccessful diff update", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: after unsuccessful diff update", nodeB, 2, 1, 1);

// node A steps down from being master
nodeA.resetMasterId();
nodeB.resetMasterId();

// node B becomes the master and sends a version of the cluster state that goes back
discoveryNodes = DiscoveryNodes.builder(discoveryNodes)
.add(nodeA.discoveryNode)
.add(nodeB.discoveryNode)
.masterNodeId(nodeB.discoveryNode.getId())
.localNodeId(nodeB.discoveryNode.getId())
.build();
previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build();
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
publishStateAndWait(nodeB.action, clusterState, previousClusterState);

// Sent, unsuccessfully, as a diff, and then retried as a full update
assertPublishClusterStateStats("nodeA: B became master", nodeA, 1, 1, 0);
assertPublishClusterStateStats("nodeB: B became master", nodeB, 2, 1, 1);
}

private MetaData buildMetaDataForVersion(MetaData metaData, long version) {
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.builder(metaData.indices());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -255,6 +254,11 @@ public void testDiscoveryStats() throws Exception {
" \"total\" : 0,\n" +
" \"pending\" : 0,\n" +
" \"committed\" : 0\n" +
" },\n" +
" \"published_cluster_states\" : {\n" +
" \"full_states\" : 0,\n" +
" \"incompatible_diffs\" : 0,\n" +
" \"compatible_diffs\" : 0\n" +
" }\n" +
" }\n" +
"}";
Expand All @@ -275,6 +279,11 @@ public void testDiscoveryStats() throws Exception {
assertThat(stats.getQueueStats().getCommitted(), equalTo(0));
assertThat(stats.getQueueStats().getPending(), equalTo(0));

assertThat(stats.getPublishStats(), notNullValue());
assertThat(stats.getPublishStats().getFullClusterStateReceivedCount(), equalTo(0L));
assertThat(stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount(), equalTo(0L));
assertThat(stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount(), equalTo(0L));

XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
stats.toXContent(builder, ToXContent.EMPTY_PARAMS);
Expand Down
Loading

0 comments on commit ec82ea6

Please sign in to comment.