Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add method to check if object is generically writeable in stream #54936

Merged
merged 3 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,23 @@ public final void writeOptionalInstant(@Nullable Instant instant) throws IOExcep
}
));

private static Class<?> getGenericType(Object value) {
if (value instanceof List) {
return List.class;
} else if (value instanceof Object[]) {
return Object[].class;
} else if (value instanceof Map) {
return Map.class;
} else if (value instanceof Set) {
return Set.class;
} else if (value instanceof ReadableInstant) {
return ReadableInstant.class;
} else if (value instanceof BytesReference) {
return BytesReference.class;
} else {
return value.getClass();
}
}
/**
* Notice: when serialization a map, the stream out map with the stream in map maybe have the
* different key-value orders, they will maybe have different stream order.
Expand All @@ -816,22 +833,7 @@ public void writeGenericValue(@Nullable Object value) throws IOException {
writeByte((byte) -1);
return;
}
final Class type;
if (value instanceof List) {
type = List.class;
} else if (value instanceof Object[]) {
type = Object[].class;
} else if (value instanceof Map) {
type = Map.class;
} else if (value instanceof Set) {
type = Set.class;
} else if (value instanceof ReadableInstant) {
type = ReadableInstant.class;
} else if (value instanceof BytesReference) {
type = BytesReference.class;
} else {
type = value.getClass();
}
final Class<?> type = getGenericType(value);
final Writer writer = WRITERS.get(type);
if (writer != null) {
writer.write(this, value);
Expand All @@ -840,6 +842,38 @@ public void writeGenericValue(@Nullable Object value) throws IOException {
}
}

public static void checkWriteable(@Nullable Object value) throws IllegalArgumentException {
if (value == null) {
return;
}
final Class<?> type = getGenericType(value);

if (type == List.class) {
@SuppressWarnings("unchecked") List<Object> list = (List<Object>) value;
for (Object v : list) {
checkWriteable(v);
}
} else if (type == Object[].class) {
Object[] array = (Object[]) value;
for (Object v : array) {
checkWriteable(v);
}
} else if (type == Map.class) {
@SuppressWarnings("unchecked") Map<String, Object> map = (Map<String, Object>) value;
for (Map.Entry<String, Object> entry : map.entrySet()) {
checkWriteable(entry.getKey());
checkWriteable(entry.getValue());
}
} else if (type == Set.class) {
@SuppressWarnings("unchecked") Set<Object> set = (Set<Object>) value;
for (Object v : set) {
checkWriteable(v);
}
} else if (WRITERS.containsKey(type) == false) {
throw new IllegalArgumentException("Cannot write type [" + type.getCanonicalName() + "] to stream");
}
}

public void writeIntArray(int[] values) throws IOException {
writeVInt(values.length);
for (int value : values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptedMetricAggContexts;
Expand Down Expand Up @@ -90,6 +91,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) {
} else {
aggregation = aggState;
}
StreamOutput.checkWriteable(aggregation);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could/should combine this with org.elasticsearch.common.util.CollectionUtils#ensureNoSelfReferences(java.lang.Object, java.lang.String) somehow? It seems like technically we would want to use this in all the spots that we currently also use that method to validate script returns (maybe I'm missing some detail and we wouldn't need to do this kind of validation in other script return spots and only need the no-self-references there for some reason?). That way we'd save iterating through the object to validate twice?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need both in all cases. ensureNoSelfReferences guarantees we can do any operation on the object in question (not resulting in an infinite loop). Checking if the object is writeable is only needed when we plan to write the object to transfer to another node. For example, with scripted metric aggs, we don't every write the node local results from init or map scripts, only the output of combine here that would be transferred to the coordinating node for the final reduce.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok makes sense then :) => I think the approach is fine.

return new InternalScriptedMetric(name, aggregation, reduceScript, metadata());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,37 @@ public void testGenericSet() throws IOException {
assertGenericRoundtrip(new LinkedHashSet<>(list));
}

private static class Unwriteable {}

private void assertNotWriteable(Object o, Class<?> type) {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> StreamOutput.checkWriteable(o));
assertThat(e.getMessage(), equalTo("Cannot write type [" + type.getCanonicalName() + "] to stream"));
}

public void testIsWriteable() throws IOException {
assertNotWriteable(new Unwriteable(), Unwriteable.class);
}

public void testSetIsWriteable() throws IOException {
StreamOutput.checkWriteable(Set.of("a", "b"));
assertNotWriteable(Set.of(new Unwriteable()), Unwriteable.class);
}

public void testListIsWriteable() throws IOException {
StreamOutput.checkWriteable(List.of("a", "b"));
assertNotWriteable(List.of(new Unwriteable()), Unwriteable.class);
}

public void testMapIsWriteable() throws IOException {
StreamOutput.checkWriteable(Map.of("a", "b", "c", "d"));
assertNotWriteable(Map.of("a", new Unwriteable()), Unwriteable.class);
}

public void testObjectArrayIsWriteable() throws IOException {
StreamOutput.checkWriteable(new Object[] {"a", "b"});
assertNotWriteable(new Object[] {new Unwriteable()}, Unwriteable.class);
}

private void assertSerialization(CheckedConsumer<StreamOutput, IOException> outputAssertions,
CheckedConsumer<StreamInput, IOException> inputAssertions) throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
Expand Down