Skip to content

Commit

Permalink
Add support to cluster state diffs
Browse files Browse the repository at this point in the history
First iteration of cluster state diffs that adds support for diffs to the most frequently changing elements - cluster state, meta data and routing table.

Closes elastic#6295
  • Loading branch information
imotov committed Mar 23, 2015
1 parent 56117e3 commit 69d7c4b
Show file tree
Hide file tree
Showing 32 changed files with 1,223 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ protected void masterOperation(final ClusterStateRequest request, final ClusterS
logger.trace("Serving cluster state request using version {}", currentState.version());
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
builder.version(currentState.version());
builder.uuid(currentState.uuid());
if (request.nodes()) {
builder.nodes(currentState.nodes());
}
Expand Down
127 changes: 119 additions & 8 deletions src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -77,7 +77,7 @@ public byte id() {
}
}

public interface Custom {
public interface Custom extends Diffable<Custom> {

interface Factory<T extends Custom> {

Expand Down Expand Up @@ -118,6 +118,8 @@ public static <T extends Custom> Custom.Factory<T> lookupFactorySafe(String type

private final long version;

private final String uuid;

private final RoutingTable routingTable;

private final DiscoveryNodes nodes;
Expand All @@ -135,12 +137,13 @@ public static <T extends Custom> Custom.Factory<T> lookupFactorySafe(String type

private volatile ClusterStateStatus status;

public ClusterState(long version, ClusterState state) {
this(state.clusterName, version, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs());
public ClusterState(long version, String uuid, ClusterState state) {
this(state.clusterName, version, uuid, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs());
}

public ClusterState(ClusterName clusterName, long version, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs) {
public ClusterState(ClusterName clusterName, long version, String uuid, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs) {
this.version = version;
this.uuid = uuid;
this.clusterName = clusterName;
this.metaData = metaData;
this.routingTable = routingTable;
Expand All @@ -167,6 +170,14 @@ public long getVersion() {
return version();
}

public String uuid() {
return this.uuid;
}

public String getUuid() {
return uuid();
}

public DiscoveryNodes nodes() {
return this.nodes;
}
Expand Down Expand Up @@ -234,6 +245,7 @@ public RoutingNodes readOnlyRoutingNodes() {
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("version: ").append(version).append("\n");
sb.append("uuid: ").append(uuid).append("\n");
sb.append("meta data version: ").append(metaData.version()).append("\n");
sb.append(nodes().prettyPrint());
sb.append(routingTable().prettyPrint());
Expand Down Expand Up @@ -304,14 +316,13 @@ public String toString() {
}
}



@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
EnumSet<Metric> metrics = Metric.parseString(params.param("metric", "_all"), true);

if (metrics.contains(Metric.VERSION)) {
builder.field("version", version);
builder.field("uuid", uuid);
}

if (metrics.contains(Metric.MASTER_NODE)) {
Expand Down Expand Up @@ -508,6 +519,7 @@ public static class Builder {

private final ClusterName clusterName;
private long version = 0;
private String uuid = "_na_";
private MetaData metaData = MetaData.EMPTY_META_DATA;
private RoutingTable routingTable = RoutingTable.EMPTY_ROUTING_TABLE;
private DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES;
Expand All @@ -518,6 +530,7 @@ public static class Builder {
public Builder(ClusterState state) {
this.clusterName = state.clusterName;
this.version = state.version();
this.uuid = state.uuid();
this.nodes = state.nodes();
this.routingTable = state.routingTable();
this.metaData = state.metaData();
Expand Down Expand Up @@ -576,6 +589,16 @@ public Builder version(long version) {
return this;
}

public Builder uuid(String uuid) {
this.uuid = uuid;
return this;
}

public Builder randomUuid() {
this.uuid = Strings.randomBase64UUID();
return this;
}

public Custom getCustom(String type) {
return customs.get(type);
}
Expand All @@ -591,7 +614,7 @@ public Builder removeCustom(String type) {
}

public ClusterState build() {
return new ClusterState(clusterName, version, metaData, routingTable, nodes, blocks, customs.build());
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build());
}

public static byte[] toBytes(ClusterState state) throws IOException {
Expand All @@ -600,6 +623,13 @@ public static byte[] toBytes(ClusterState state) throws IOException {
return os.bytes().toBytes();
}

public static byte[] toDiffBytes(ClusterState previousState, ClusterState state) throws IOException {
Diff diff = state.diffs(previousState);
BytesStreamOutput os = new BytesStreamOutput();
diff.writeTo(os);
return os.bytes().toBytes();
}

/**
* @param data input bytes
* @param localNode used to set the local node in the cluster state.
Expand All @@ -611,6 +641,7 @@ public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throw
public static void writeTo(ClusterState state, StreamOutput out) throws IOException {
state.clusterName.writeTo(out);
out.writeLong(state.version());
out.writeString(state.uuid());
MetaData.Builder.writeTo(state.metaData(), out);
RoutingTable.Builder.writeTo(state.routingTable(), out);
DiscoveryNodes.Builder.writeTo(state.nodes(), out);
Expand All @@ -630,6 +661,7 @@ public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode loca
ClusterName clusterName = ClusterName.readClusterName(in);
Builder builder = new Builder(clusterName);
builder.version = in.readLong();
builder.uuid = in.readString();
builder.metaData = MetaData.Builder.readFrom(in);
builder.routingTable = RoutingTable.Builder.readFrom(in);
builder.nodes = DiscoveryNodes.Builder.readFrom(in, localNode);
Expand All @@ -642,5 +674,84 @@ public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode loca
}
return builder.build();
}

}

public Diff diffs(ClusterState previousState) {
return new ClusterStateDiff(previousState, this);
}


private class ClusterStateDiff implements Diff {

private final long version;

private final String previousUuid;

private final String uuid;

private final ClusterName clusterName;

private final Diff routingTable;

private final Diff nodes;

private final Diff metaData;

private final Diff blocks;

private final Diff customs;

public ClusterStateDiff(ClusterState before, ClusterState after) {
previousUuid = before.uuid;
uuid = after.uuid;
version = after.version;
clusterName = after.clusterName;
routingTable = after.routingTable.diffs(before.routingTable);
nodes = after.nodes.diffs(before.nodes);
metaData = after.metaData.diffs(before.metaData);
blocks = after.blocks.diffs(before.blocks);
customs = DiffableUtils.diff(before.customs, after.customs);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterName.writeTo(out);
out.writeString(previousUuid);
out.writeString(uuid);
out.writeLong(version);
routingTable.writeTo(out);
nodes.writeTo(out);
metaData.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
}
}

public ClusterState readDiffs(StreamInput in, DiscoveryNode localNode) throws IOException {
Builder builder = new Builder(ClusterName.readClusterName(in));
String previousUuid = in.readString();
String uuid = in.readString();
long version = in.readLong();
if (uuid.equals(this.uuid)) {
// no need to read the rest - cluster state didn't change
return this;
}
if (previousUuid.equals(this.uuid) == false) {
throw new IncompatibleClusterStateVersionException("Expected diffs for version " + version() + " with uuid " + this.uuid + " got version " + version + " and uuid " + previousUuid);
}
builder.uuid(uuid);
builder.version(version);
builder.routingTable(routingTable.readDiffs(in));
builder.nodes(nodes.readDiffs(in, localNode));
builder.metaData(metaData.readDiffs(in));
builder.blocks(blocks.readDiffs(in));
DiffableUtils.readDiff(customs, in, new KeyedReader<Custom>() {
@Override
public Custom readFrom(StreamInput in, String key) throws IOException {
return lookupFactorySafe(key).readFrom(in);
}
});
return builder.build();
}
}
37 changes: 37 additions & 0 deletions src/main/java/org/elasticsearch/cluster/Diff.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster;

import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Represents difference between states of cluster state parts
*/
public interface Diff {

/**
* Writes the differences into the output stream
* @param out
* @throws IOException
*/
void writeTo(StreamOutput out) throws IOException;
}
42 changes: 42 additions & 0 deletions src/main/java/org/elasticsearch/cluster/Diffable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster;

import org.elasticsearch.common.io.stream.ImmutableStreamable;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Cluster state part, changes in which can be serialized
*/
public interface Diffable<T> extends ImmutableStreamable<T> {

/**
* Returns serializable object representing differences between this and previousState
*/
Diff diffs(T previousState);

/**
* Reads the {@link org.elasticsearch.cluster.Diff} from StreamInput and applies them to this
*/
T readDiffs(StreamInput in) throws IOException;

}
Loading

0 comments on commit 69d7c4b

Please sign in to comment.