Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stats to record how often the ClusterState diff mechanism is used successfully #26973

Merged
merged 33 commits into from
Oct 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
95f0969
Add statistics to record how often the ClusterState diff mechanism is…
DaveCTurner Oct 11, 2017
5ed348a
Picky picky
DaveCTurner Oct 11, 2017
db71be7
No need for Fields class
DaveCTurner Oct 11, 2017
092be64
Shorten arg name
DaveCTurner Oct 11, 2017
8675140
XContent not XContentFragment
DaveCTurner Oct 11, 2017
88bfda5
Make accumulators into AtomicLongs
DaveCTurner Oct 11, 2017
d825792
Add javadoc
DaveCTurner Oct 11, 2017
f004119
Slated for inclusion in 6.1
DaveCTurner Oct 11, 2017
ec982bd
Add stats for receivers of cluster states too
DaveCTurner Oct 11, 2017
c7a0192
Nit: Diffs -> Diff
DaveCTurner Oct 11, 2017
660f20a
Misread comment - should have been ToXContentObject
DaveCTurner Oct 11, 2017
5543e88
Don't bother collecting the master-side stats at all
DaveCTurner Oct 12, 2017
67c1a22
Stick with v7 for now.
DaveCTurner Oct 12, 2017
e0f3da3
Missed one
DaveCTurner Oct 12, 2017
a9a2828
Add assertions that the stats are updated as expected
DaveCTurner Oct 12, 2017
1852f8f
Remove assertions from existing test
DaveCTurner Oct 13, 2017
34c3b5a
Add testPublishClusterStateStats()
DaveCTurner Oct 13, 2017
6af7728
Better grammar
DaveCTurner Oct 13, 2017
b935508
Add REST assertions
DaveCTurner Oct 13, 2017
68d589d
Make count fields final
DaveCTurner Oct 16, 2017
efcd20d
Count incompatible cluster state diffs rather than all of them
DaveCTurner Oct 16, 2017
eaa0fd9
Assert that the sub-fields are >= 0
DaveCTurner Oct 16, 2017
a5124e0
No need for @Nullables
DaveCTurner Oct 16, 2017
d61551f
Skip these tests until v7.x
DaveCTurner Oct 16, 2017
3dd4219
Line length oops
DaveCTurner Oct 16, 2017
b589f36
Count incompatible states received in exception handler instead
DaveCTurner Oct 17, 2017
ed570df
Shorter keys in PublishClusterStateStats#toXContent()
DaveCTurner Oct 17, 2017
322c1de
Fewer linebreaks
DaveCTurner Oct 17, 2017
840a5bd
Fixed up REST API test
DaveCTurner Oct 23, 2017
54b840a
Merge branch 'master' into 2017-10-11-discovery-stats
DaveCTurner Oct 24, 2017
1fa86b7
Formatting
DaveCTurner Oct 24, 2017
ff54a0d
Inline
DaveCTurner Oct 24, 2017
168d94a
Make getters package-private
DaveCTurner Oct 24, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,56 @@

package org.elasticsearch.discovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;

import java.io.IOException;

public class DiscoveryStats implements Writeable, ToXContentFragment {

@Nullable
private final PendingClusterStateStats queueStats;
private final PublishClusterStateStats publishStats;

public DiscoveryStats(PendingClusterStateStats queueStats) {
public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats) {
this.queueStats = queueStats;
this.publishStats = publishStats;
}

public DiscoveryStats(StreamInput in) throws IOException {
queueStats = in.readOptionalWriteable(PendingClusterStateStats::new);

if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
publishStats = in.readOptionalWriteable(PublishClusterStateStats::new);
} else {
publishStats = null;
}
}

/**
 * Writes these stats to the given stream: the queue stats first, as an optional value, followed by the publish
 * stats, also optional, but only when the stream's version is on or after 7.0.0-alpha1.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeOptionalWriteable(queueStats);

    final boolean streamTakesPublishStats = out.getVersion().onOrAfter(Version.V_7_0_0_alpha1);
    if (streamTakesPublishStats == false) {
        // Same version gate as the stream constructor, which reads no publish stats below 7.0.0-alpha1.
        return;
    }
    out.writeOptionalWriteable(publishStats);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.DISCOVERY);
if (queueStats != null ){
if (queueStats != null) {
queueStats.toXContent(builder, params);
}
if (publishStats != null) {
publishStats.toXContent(builder, params);
}
builder.endObject();
return builder;
}
Expand All @@ -64,4 +80,8 @@ static final class Fields {
/**
 * @return the pending cluster state queue stats held by this object; the field is declared {@code @Nullable},
 *         so callers should be prepared for {@code null}
 */
public PendingClusterStateStats getQueueStats() {
    return this.queueStats;
}

/**
 * @return the publish cluster state stats held by this object; this is {@code null} when the stats were read from
 *         a stream whose version is before 7.0.0-alpha1, or when {@code null} was passed to the constructor
 */
public PublishClusterStateStats getPublishStats() {
    return this.publishStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand All @@ -34,6 +33,7 @@
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -94,7 +94,7 @@ public void onFailure(String source, Exception e) {

@Override
public DiscoveryStats stats() {
return new DiscoveryStats((PendingClusterStateStats) null);
return new DiscoveryStats(null, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class PublishClusterStateAction extends AbstractComponent {

Expand All @@ -90,6 +91,10 @@ public interface IncomingClusterStateListener {
private final IncomingClusterStateListener incomingClusterStateListener;
private final DiscoverySettings discoverySettings;

private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();

public PublishClusterStateAction(
Settings settings,
TransportService transportService,
Expand Down Expand Up @@ -380,11 +385,13 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request,
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
request.bytes().length());
} else if (lastSeenClusterState != null) {
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());
incomingState = diff.apply(lastSeenClusterState);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
} else {
Expand All @@ -394,6 +401,9 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request,
incomingClusterStateListener.onIncomingClusterState(incomingState);
lastSeenClusterState = incomingState;
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
} finally {
IOUtils.close(in);
}
Expand Down Expand Up @@ -636,4 +646,11 @@ public void setPublishingTimedOut(boolean isTimedOut) {
publishingTimedOut.set(isTimedOut);
}
}

/**
 * Builds a {@link PublishClusterStateStats} from the current values of the three received-state counters.
 * Each counter is read with its own {@code get()} call, one after the other.
 *
 * @return a new stats object holding the full-state, incompatible-diff and compatible-diff counts
 */
public PublishClusterStateStats stats() {
    final long fullStates = fullClusterStateReceivedCount.get();
    final long incompatibleDiffs = incompatibleClusterStateDiffReceivedCount.get();
    final long compatibleDiffs = compatibleClusterStateDiffReceivedCount.get();
    return new PublishClusterStateStats(fullStates, incompatibleDiffs, compatibleDiffs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery.zen;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;

/**
 * Immutable holder for the counters that {@code PublishClusterStateAction} keeps about the cluster states it has received:
 * how many arrived as full cluster states, and how many arrived as diffs that were, or were not, compatible with the
 * state the receiving node already had.
 */
public class PublishClusterStateStats implements Writeable, ToXContentObject {

    private final long fullClusterStateReceivedCount;
    private final long incompatibleClusterStateDiffReceivedCount;
    private final long compatibleClusterStateDiffReceivedCount;

    /**
     * @param fullClusterStateReceivedCount the number of times this node has received a full copy of the cluster state from the master.
     * @param incompatibleClusterStateDiffReceivedCount the number of times this node has received a cluster-state diff from the master
     *                                                  that was incompatible with the cluster state it had, and so could not be used.
     * @param compatibleClusterStateDiffReceivedCount the number of times this node has received a cluster-state diff from the master
     *                                                that was compatible with the cluster state it had, and so was applied to it.
     */
    public PublishClusterStateStats(long fullClusterStateReceivedCount,
                                    long incompatibleClusterStateDiffReceivedCount,
                                    long compatibleClusterStateDiffReceivedCount) {
        this.fullClusterStateReceivedCount = fullClusterStateReceivedCount;
        this.incompatibleClusterStateDiffReceivedCount = incompatibleClusterStateDiffReceivedCount;
        this.compatibleClusterStateDiffReceivedCount = compatibleClusterStateDiffReceivedCount;
    }

    /**
     * Reads the three counters from the given stream, in the order in which {@link #writeTo(StreamOutput)} writes them:
     * full states, then incompatible diffs, then compatible diffs.
     *
     * @param in the stream to read from
     * @throws IOException if reading from the stream fails
     */
    public PublishClusterStateStats(StreamInput in) throws IOException {
        fullClusterStateReceivedCount = in.readVLong();
        incompatibleClusterStateDiffReceivedCount = in.readVLong();
        compatibleClusterStateDiffReceivedCount = in.readVLong();
    }

    /**
     * Writes the three counters to the given stream: full states, then incompatible diffs, then compatible diffs.
     *
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(fullClusterStateReceivedCount);
        out.writeVLong(incompatibleClusterStateDiffReceivedCount);
        out.writeVLong(compatibleClusterStateDiffReceivedCount);
    }

    /**
     * Renders the counters as an object named {@code published_cluster_states} with the fields {@code full_states},
     * {@code incompatible_diffs} and {@code compatible_diffs}.
     *
     * @param builder the builder to write to
     * @param params  rendering parameters; not consulted by this implementation
     * @return the same builder, to allow chaining
     * @throws IOException if writing to the builder fails
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject("published_cluster_states");
        {
            builder.field("full_states", fullClusterStateReceivedCount);
            builder.field("incompatible_diffs", incompatibleClusterStateDiffReceivedCount);
            builder.field("compatible_diffs", compatibleClusterStateDiffReceivedCount);
        }
        builder.endObject();
        return builder;
    }

    /** @return the number of full cluster states received */
    long getFullClusterStateReceivedCount() { return fullClusterStateReceivedCount; }

    /** @return the number of received cluster-state diffs that were incompatible with the receiver's cluster state */
    long getIncompatibleClusterStateDiffReceivedCount() { return incompatibleClusterStateDiffReceivedCount; }

    /** @return the number of received cluster-state diffs that were compatible with the receiver's cluster state */
    long getCompatibleClusterStateDiffReceivedCount() { return compatibleClusterStateDiffReceivedCount; }

    @Override
    public String toString() {
        return "PublishClusterStateStats(full=" + fullClusterStateReceivedCount
            + ", incompatible=" + incompatibleClusterStateDiffReceivedCount
            + ", compatible=" + compatibleClusterStateDiffReceivedCount
            + ")";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,7 @@ Set<DiscoveryNode> getFaultDetectionNodes() {

@Override
public DiscoveryStats stats() {
PendingClusterStateStats queueStats = pendingStatesQueue.stats();
return new DiscoveryStats(queueStats);
return new DiscoveryStats(pendingStatesQueue.stats(), publishClusterState.stats());
}

public DiscoverySettings getDiscoverySettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
Expand Down Expand Up @@ -392,8 +393,18 @@ private static NodeStats createNodeStats() {
}
ScriptStats scriptStats = frequently() ?
new ScriptStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) : null;
DiscoveryStats discoveryStats = frequently() ? new DiscoveryStats(randomBoolean() ? new PendingClusterStateStats(randomInt(),
randomInt(), randomInt()) : null) : null;
DiscoveryStats discoveryStats = frequently()
? new DiscoveryStats(
randomBoolean()
? new PendingClusterStateStats(randomInt(), randomInt(), randomInt())
: null,
randomBoolean()
? new PublishClusterStateStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong())
: null)
: null;
IngestStats ingestStats = null;
if (frequently()) {
IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,73 @@ public void testTimeoutOrCommit() throws Exception {
}
}

/**
 * Fetches the publish stats from the given node's action and checks each of the three counters against the
 * expected value, prefixing every failure message with the supplied description.
 *
 * @param description               prefix for the assertion messages
 * @param node                      the node whose {@code action.stats()} are inspected
 * @param expectedFull              expected number of full cluster states received
 * @param expectedIncompatibleDiffs expected number of incompatible cluster state diffs received
 * @param expectedCompatibleDiffs   expected number of compatible cluster state diffs received
 */
private void assertPublishClusterStateStats(String description, MockNode node, long expectedFull, long expectedIncompatibleDiffs,
                                            long expectedCompatibleDiffs) {
    final PublishClusterStateStats actual = node.action.stats();

    final long fullStates = actual.getFullClusterStateReceivedCount();
    assertThat(description + ": full cluster states", fullStates, equalTo(expectedFull));

    final long incompatibleDiffs = actual.getIncompatibleClusterStateDiffReceivedCount();
    assertThat(description + ": incompatible cluster state diffs", incompatibleDiffs, equalTo(expectedIncompatibleDiffs));

    final long compatibleDiffs = actual.getCompatibleClusterStateDiffReceivedCount();
    assertThat(description + ": compatible cluster state diffs", compatibleDiffs, equalTo(expectedCompatibleDiffs));
}

/**
 * Walks two mock nodes through a sequence of publications and checks the receive-side counters after each step:
 * a first publication that arrives as a full state, a follow-up that arrives as a compatible diff, a publication
 * whose diff is incompatible and is followed by a full state, and finally a change of master in which the
 * former master becomes the receiver. The node that publishes in a step is expected to keep its own counters
 * unchanged for that step.
 */
public void testPublishClusterStateStats() throws Exception {
MockNode nodeA = createMockNode("nodeA").setAsMaster();
MockNode nodeB = createMockNode("nodeB");

assertPublishClusterStateStats("nodeA: initial state", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: initial state", nodeB, 0, 0, 0);

// Initial cluster state
ClusterState clusterState = nodeA.clusterState;

// cluster state update - add nodeB
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(clusterState.nodes()).add(nodeB.discoveryNode).build();
ClusterState previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);

// Sent as a full cluster state update
assertPublishClusterStateStats("nodeA: after full update", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: after full update", nodeB, 1, 0, 0);

// Increment cluster state version
previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);

// Sent, successfully, as a cluster state diff
assertPublishClusterStateStats("nodeA: after successful diff update", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: after successful diff update", nodeB, 1, 0, 1);

// Increment cluster state version twice
// NOTE(review): the intermediate version is only ever passed as the "previous" state and is never published
// itself, so presumably the diff is based on a state nodeB has not seen - confirm against publishStateAndWait
previousClusterState = ClusterState.builder(clusterState).incrementVersion().build();
clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);

// Sent, unsuccessfully, as a diff and then retried as a full update
assertPublishClusterStateStats("nodeA: after unsuccessful diff update", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: after unsuccessful diff update", nodeB, 2, 1, 1);

// node A steps down from being master
nodeA.resetMasterId();
nodeB.resetMasterId();

// node B becomes the master and sends a version of the cluster state that goes back
discoveryNodes = DiscoveryNodes.builder(discoveryNodes)
.add(nodeA.discoveryNode)
.add(nodeB.discoveryNode)
.masterNodeId(nodeB.discoveryNode.getId())
.localNodeId(nodeB.discoveryNode.getId())
.build();
// The "previous" state here is built from scratch rather than derived from any state published above
previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build();
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
publishStateAndWait(nodeB.action, clusterState, previousClusterState);

// Sent, unsuccessfully, as a diff, and then retried as a full update
assertPublishClusterStateStats("nodeA: B became master", nodeA, 1, 1, 0);
assertPublishClusterStateStats("nodeB: B became master", nodeB, 2, 1, 1);
}

private MetaData buildMetaDataForVersion(MetaData metaData, long version) {
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.builder(metaData.indices());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -255,6 +254,11 @@ public void testDiscoveryStats() throws Exception {
" \"total\" : 0,\n" +
" \"pending\" : 0,\n" +
" \"committed\" : 0\n" +
" },\n" +
" \"published_cluster_states\" : {\n" +
" \"full_states\" : 0,\n" +
" \"incompatible_diffs\" : 0,\n" +
" \"compatible_diffs\" : 0\n" +
" }\n" +
" }\n" +
"}";
Expand All @@ -275,6 +279,11 @@ public void testDiscoveryStats() throws Exception {
assertThat(stats.getQueueStats().getCommitted(), equalTo(0));
assertThat(stats.getQueueStats().getPending(), equalTo(0));

assertThat(stats.getPublishStats(), notNullValue());
assertThat(stats.getPublishStats().getFullClusterStateReceivedCount(), equalTo(0L));
assertThat(stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount(), equalTo(0L));
assertThat(stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount(), equalTo(0L));

XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
stats.toXContent(builder, ToXContent.EMPTY_PARAMS);
Expand Down
Loading