Skip to content

Commit

Permalink
Add support for cluster state diffs
Browse files Browse the repository at this point in the history
Adds support for calculating and sending diffs instead of full cluster state of the most frequently changing elements - cluster state, meta data and routing table.

Closes #6295
  • Loading branch information
imotov committed Apr 29, 2015
1 parent 6e1c995 commit 478c253
Show file tree
Hide file tree
Showing 60 changed files with 3,792 additions and 1,095 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
ClusterState.Builder.writeTo(state, out);
state.writeTo(out);
writeAcknowledged(out);
RoutingExplanations.writeTo(explanations, out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
clusterName.writeTo(out);
ClusterState.Builder.writeTo(clusterState, out);
clusterState.writeTo(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.action.admin.cluster.state;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
Expand All @@ -29,7 +28,6 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
Expand All @@ -39,11 +37,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;

import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.cluster.metadata.MetaData.lookupFactorySafe;

/**
*
*/
Expand Down Expand Up @@ -84,6 +77,7 @@ protected void masterOperation(final ClusterStateRequest request, final ClusterS
logger.trace("Serving cluster state request using version {}", currentState.version());
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
builder.version(currentState.version());
builder.uuid(currentState.uuid());
if (request.nodes()) {
builder.nodes(currentState.nodes());
}
Expand Down Expand Up @@ -122,10 +116,9 @@ protected void masterOperation(final ClusterStateRequest request, final ClusterS
}

// Filter our metadata that shouldn't be returned by API
for(ObjectCursor<String> type : currentState.metaData().customs().keys()) {
Custom.Factory factory = lookupFactorySafe(type.value);
if(!factory.context().contains(MetaData.XContentContext.API)) {
mdBuilder.removeCustom(type.value);
for(ObjectObjectCursor<String, Custom> custom : currentState.metaData().customs()) {
if(!custom.value.context().contains(MetaData.XContentContext.API)) {
mdBuilder.removeCustom(custom.key);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(entry.key);
out.writeVInt(entry.value.size());
for (AliasMetaData aliasMetaData : entry.value) {
AliasMetaData.Builder.writeTo(aliasMetaData, out);
aliasMetaData.writeTo(out);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,11 @@ public CreateIndexRequest source(Map<String, Object> source) {
aliases((Map<String, Object>) entry.getValue());
} else {
// maybe custom?
IndexMetaData.Custom.Factory factory = IndexMetaData.lookupFactory(name);
if (factory != null) {
IndexMetaData.Custom proto = IndexMetaData.lookupPrototype(name);
if (proto != null) {
found = true;
try {
customs.put(name, factory.fromMap((Map<String, Object>) entry.getValue()));
customs.put(name, proto.fromMap((Map<String, Object>) entry.getValue()));
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse custom metadata for [" + name + "]");
}
Expand Down Expand Up @@ -448,7 +448,7 @@ public void readFrom(StreamInput in) throws IOException {
int customSize = in.readVInt();
for (int i = 0; i < customSize; i++) {
String type = in.readString();
IndexMetaData.Custom customIndexMetaData = IndexMetaData.lookupFactorySafe(type).readFrom(in);
IndexMetaData.Custom customIndexMetaData = IndexMetaData.lookupPrototypeSafe(type).readFrom(in);
customs.put(type, customIndexMetaData);
}
int aliasesSize = in.readVInt();
Expand All @@ -472,7 +472,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(customs.size());
for (Map.Entry<String, IndexMetaData.Custom> entry : customs.entrySet()) {
out.writeString(entry.getKey());
IndexMetaData.lookupFactorySafe(entry.getKey()).writeTo(entry.getValue(), out);
entry.getValue().writeTo(out);
}
out.writeVInt(aliases.size());
for (Alias alias : aliases) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void readFrom(StreamInput in) throws IOException {
int valueSize = in.readVInt();
ImmutableOpenMap.Builder<String, MappingMetaData> mappingEntryBuilder = ImmutableOpenMap.builder();
for (int j = 0; j < valueSize; j++) {
mappingEntryBuilder.put(in.readString(), MappingMetaData.readFrom(in));
mappingEntryBuilder.put(in.readString(), MappingMetaData.PROTO.readFrom(in));
}
mappingsMapBuilder.put(key, mappingEntryBuilder.build());
}
Expand Down Expand Up @@ -181,15 +181,15 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indexEntry.value.size());
for (ObjectObjectCursor<String, MappingMetaData> mappingEntry : indexEntry.value) {
out.writeString(mappingEntry.key);
MappingMetaData.writeTo(mappingEntry.value, out);
mappingEntry.value.writeTo(out);
}
}
out.writeVInt(aliases.size());
for (ObjectObjectCursor<String, ImmutableList<AliasMetaData>> indexEntry : aliases) {
out.writeString(indexEntry.key);
out.writeVInt(indexEntry.value.size());
for (AliasMetaData aliasEntry : indexEntry.value) {
AliasMetaData.Builder.writeTo(aliasEntry, out);
aliasEntry.writeTo(out);
}
}
out.writeVInt(settings.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void readFrom(StreamInput in) throws IOException {
int valueSize = in.readVInt();
ImmutableOpenMap.Builder<String, MappingMetaData> typeMapBuilder = ImmutableOpenMap.builder();
for (int j = 0; j < valueSize; j++) {
typeMapBuilder.put(in.readString(), MappingMetaData.readFrom(in));
typeMapBuilder.put(in.readString(), MappingMetaData.PROTO.readFrom(in));
}
indexMapBuilder.put(key, typeMapBuilder.build());
}
Expand All @@ -75,7 +75,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indexEntry.value.size());
for (ObjectObjectCursor<String, MappingMetaData> typeEntry : indexEntry.value) {
out.writeString(typeEntry.key);
MappingMetaData.writeTo(typeEntry.value, out);
typeEntry.value.writeTo(out);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(indexTemplates.size());
for (IndexTemplateMetaData indexTemplate : indexTemplates) {
IndexTemplateMetaData.Builder.writeTo(indexTemplate, out);
indexTemplate.writeTo(out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,10 @@ public PutIndexTemplateRequest source(Map templateSource) {
aliases((Map<String, Object>) entry.getValue());
} else {
// maybe custom?
IndexMetaData.Custom.Factory factory = IndexMetaData.lookupFactory(name);
if (factory != null) {
IndexMetaData.Custom proto = IndexMetaData.lookupPrototype(name);
if (proto != null) {
try {
customs.put(name, factory.fromMap((Map<String, Object>) entry.getValue()));
customs.put(name, proto.fromMap((Map<String, Object>) entry.getValue()));
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse custom metadata for [" + name + "]");
}
Expand Down Expand Up @@ -440,7 +440,7 @@ public void readFrom(StreamInput in) throws IOException {
int customSize = in.readVInt();
for (int i = 0; i < customSize; i++) {
String type = in.readString();
IndexMetaData.Custom customIndexMetaData = IndexMetaData.lookupFactorySafe(type).readFrom(in);
IndexMetaData.Custom customIndexMetaData = IndexMetaData.lookupPrototypeSafe(type).readFrom(in);
customs.put(type, customIndexMetaData);
}
int aliasesSize = in.readVInt();
Expand All @@ -466,7 +466,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(customs.size());
for (Map.Entry<String, IndexMetaData.Custom> entry : customs.entrySet()) {
out.writeString(entry.getKey());
IndexMetaData.lookupFactorySafe(entry.getKey()).writeTo(entry.getValue(), out);
entry.getValue().writeTo(out);
}
out.writeVInt(aliases.size());
for (Alias alias : aliases) {
Expand Down
108 changes: 108 additions & 0 deletions src/main/java/org/elasticsearch/cluster/AbstractDiffable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamableReader;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Abstract diffable object with simple diffs implementation that sends the entire object if object has changed or
* nothing is object remained the same.
*/
public abstract class AbstractDiffable<T extends Diffable<T>> implements Diffable<T> {

@Override
public Diff<T> diff(T previousState) {
if (this.get().equals(previousState)) {
return new CompleteDiff<>();
} else {
return new CompleteDiff<>(get());
}
}

@Override
public Diff<T> readDiffFrom(StreamInput in) throws IOException {
return new CompleteDiff<>(this, in);
}

public static <T extends Diffable<T>> Diff<T> readDiffFrom(StreamableReader<T> reader, StreamInput in) throws IOException {
return new CompleteDiff<T>(reader, in);
}

private static class CompleteDiff<T extends Diffable<T>> implements Diff<T> {

@Nullable
private final T part;

/**
* Creates simple diff with changes
*/
public CompleteDiff(T part) {
this.part = part;
}

/**
* Creates simple diff without changes
*/
public CompleteDiff() {
this.part = null;
}

/**
* Read simple diff from the stream
*/
public CompleteDiff(StreamableReader<T> reader, StreamInput in) throws IOException {
if (in.readBoolean()) {
this.part = reader.readFrom(in);
} else {
this.part = null;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (part != null) {
out.writeBoolean(true);
part.writeTo(out);
} else {
out.writeBoolean(false);
}
}

@Override
public T apply(T part) {
if (this.part != null) {
return this.part;
} else {
return part;
}
}
}

@SuppressWarnings("unchecked")
public T get() {
return (T) this;
}
}

Loading

0 comments on commit 478c253

Please sign in to comment.