Skip to content

Commit

Permalink
Add generic Set support to streams
Browse files Browse the repository at this point in the history
This commit adds support for reading and writing sets as generic values
in stream input and output.

closes elastic#54708
  • Loading branch information
rjernst committed Apr 4, 2020
1 parent bcc13ec commit b87e197
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.IntFunction;

import static org.elasticsearch.ElasticsearchException.readStackTrace;
Expand Down Expand Up @@ -735,6 +737,10 @@ public Object readGenericValue() throws IOException {
return readGeoPoint();
case 23:
return readZonedDateTime();
case 24:
return readCollection(StreamInput::readGenericValue, LinkedHashSet::new, Collections.emptySet());
case 25:
return readCollection(StreamInput::readGenericValue, HashSet::new, Collections.emptySet());
default:
throw new IOException("Can't read unknown type [" + type + "]");
}
Expand Down Expand Up @@ -820,6 +826,18 @@ private Map readHashMap() throws IOException {
return map10;
}

private Set<?> readSet(Function<Integer, Set<Object>> ctor) throws IOException {
int size = readArraySize();
if (size == 0) {
return Collections.emptySet();
}
Set<Object> set = ctor.apply(size);
for (int i = 0; i < size; i++) {
set.add(readGenericValue());
}
return set;
}

private Date readDate() throws IOException {
return new Date(readLong());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

Expand Down Expand Up @@ -790,7 +792,18 @@ public final void writeOptionalInstant(@Nullable Instant instant) throws IOExcep
// joda does not understand "Z" for utc, so we must special case
o.writeString(zoneId.equals("Z") ? DateTimeZone.UTC.getID() : zoneId);
o.writeLong(zonedDateTime.toInstant().toEpochMilli());
}));
}),
entry(
Set.class,
(o, v) -> {
if (v instanceof LinkedHashSet) {
o.writeByte((byte) 24);
} else {
o.writeByte((byte) 25);
}
o.writeCollection((Set<?>) v, StreamOutput::writeGenericValue);
}
));

/**
* Notice: when serialization a map, the stream out map with the stream in map maybe have the
Expand All @@ -810,6 +823,8 @@ public void writeGenericValue(@Nullable Object value) throws IOException {
type = Object[].class;
} else if (value instanceof Map) {
type = Map.class;
} else if (value instanceof Set) {
type = Set.class;
} else if (value instanceof ReadableInstant) {
type = ReadableInstant.class;
} else if (value instanceof BytesReference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -38,11 +39,13 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -434,4 +437,32 @@ public void testSecureStringSerialization() throws IOException {
}
}

public void testGenericSet() throws IOException {
Set<String> set = Set.of("a", "b", "c", "d", "e");
assertGenericRoundtrip(set);
// reverse order in normal set so linked hashset does not match the order
var list = new ArrayList<>(set);
Collections.reverse(list);
assertGenericRoundtrip(new LinkedHashSet<>(list));
}

private void assertSerialization(CheckedConsumer<StreamOutput, IOException> outputAssertions,
CheckedConsumer<StreamInput, IOException> inputAssertions) throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
outputAssertions.accept(output);
final BytesReference bytesReference = output.bytes();
final StreamInput input = bytesReference.streamInput();
inputAssertions.accept(input);
}
}

private void assertGenericRoundtrip(Object original) throws IOException {
assertSerialization(output -> {
output.writeGenericValue(original);
}, input -> {
Object read = input.readGenericValue();
assertThat(read, equalTo(original));
});
}

}

0 comments on commit b87e197

Please sign in to comment.