diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java index 2fca882724bbd..22bb8e5ffc4f9 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java @@ -16,6 +16,8 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; /** @@ -213,6 +215,23 @@ public void reset() throws IOException { } } + @Override + public BytesReference readBytesReference(int length) throws IOException { + if (length == 0) { + return BytesArray.EMPTY; + } + final List slices = new ArrayList<>(); + while (length > 0) { + maybeNextSlice(); + final byte[] bytes = slice.array().clone(); + final int currentLen = Math.min(length, slice.remaining()); + slices.add(new BytesArray(bytes, slice.position(), currentLen)); + skip(currentLen); + length -= currentLen; + } + return CompositeBytesReference.of(slices.toArray(BytesReference[]::new)); + } + @Override public boolean markSupported() { return true; diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index fb0846ab8d208..96240dd053edb 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -23,8 +23,6 @@ import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.text.Text; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.CharArrays; @@ -163,9 +161,9 @@ public BytesReference readBytesReference(int length) throws IOException { if (length == 0) { return BytesArray.EMPTY; } - final ByteArray byteArray = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(length); - byteArray.set(0, this, length); - return BytesReference.fromByteArray(byteArray, length); + byte[] bytes = new byte[length]; + readBytes(bytes, 0, length); + return new BytesArray(bytes, 0, length); } public BytesRef readBytesRef() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java index 13aebe7578e8d..edec336c2a028 100644 --- a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java +++ b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.core.Nullable; @@ -137,12 +136,6 @@ public void set(long index, byte[] buf, int offset, int len) { System.arraycopy(buf, offset, array, (int) index, len); } - @Override - public void set(long index, StreamInput input, int len) throws IOException { - assert indexIsInt(index); - input.readBytes(array, (int) index, len); - } - @Override public void fill(long fromIndex, long toIndex, byte value) { assert indexIsInt(fromIndex); diff --git a/server/src/main/java/org/elasticsearch/common/util/BigByteArray.java b/server/src/main/java/org/elasticsearch/common/util/BigByteArray.java index 614ba91782e86..72a2fc41a9a12 100644 --- a/server/src/main/java/org/elasticsearch/common/util/BigByteArray.java +++ b/server/src/main/java/org/elasticsearch/common/util/BigByteArray.java @@ -11,7 +11,6 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; @@ -107,25 +106,6 @@ public void set(long index, byte[] buf, int offset, int len) { } } - @Override - public void set(long index, StreamInput input, int len) throws IOException { - assert index + len <= size(); - int pageIndex = pageIndex(index); - final int indexInPage = indexInPage(index); - if (indexInPage + len <= pageSize()) { - input.readBytes(pages[pageIndex], indexInPage, len); - } else { - int copyLen = pageSize() - indexInPage; - input.readBytes(pages[pageIndex], indexInPage, copyLen); - do { - ++pageIndex; - len -= copyLen; - copyLen = Math.min(len, pageSize()); - input.readBytes(pages[pageIndex], 0, copyLen); - } while (len > copyLen); - } - } - @Override public void fill(long fromIndex, long toIndex, byte value) { if (fromIndex > toIndex) { diff --git a/server/src/main/java/org/elasticsearch/common/util/ByteArray.java b/server/src/main/java/org/elasticsearch/common/util/ByteArray.java index 0ece7e31c8642..e3b51ee7d2e32 100644 --- a/server/src/main/java/org/elasticsearch/common/util/ByteArray.java +++ b/server/src/main/java/org/elasticsearch/common/util/ByteArray.java @@ -46,11 +46,6 @@ static ByteArray readFrom(StreamInput in) throws IOException { */ void set(long index, byte[] buf, int offset, int len); - /** - * Bulk set from stream input. - */ - void set(long index, StreamInput input, int len) throws IOException; - /** * Fill slots between fromIndex inclusive to toIndex exclusive with value. */ diff --git a/server/src/main/java/org/elasticsearch/common/util/ReleasableByteArray.java b/server/src/main/java/org/elasticsearch/common/util/ReleasableByteArray.java index 0e5518e7e4aad..0102195f4e809 100644 --- a/server/src/main/java/org/elasticsearch/common/util/ReleasableByteArray.java +++ b/server/src/main/java/org/elasticsearch/common/util/ReleasableByteArray.java @@ -69,11 +69,6 @@ public void set(long index, byte[] buf, int offset, int len) { throw new UnsupportedOperationException(); } - @Override - public void set(long index, StreamInput input, int len) throws IOException { - throw new UnsupportedOperationException(); - } - @Override public void fill(long fromIndex, long toIndex, byte value) { throw new UnsupportedOperationException(); diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java index 6b6299cadf188..e6fb07039ed3b 100644 --- a/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java @@ -575,10 +575,16 @@ public void testWriteBigBytesReference() throws IOException { final BytesStreamOutput out = new BytesStreamOutput(); out.writeBytesReference(expected); - final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes())); - final BytesReference loaded = in.readBytesReference(); - - assertThat(loaded, equalTo(expected)); + { + final StreamInput in = out.bytes().streamInput(); // use the BytesReference version + final BytesReference loaded = in.readBytesReference(); + assertThat(loaded, equalTo(expected)); + } + { + final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes())); // use the byte[] version + final BytesReference loaded = in.readBytesReference(); + assertThat(loaded, equalTo(expected)); + } } private abstract static class BaseNamedWriteable implements NamedWriteable { diff --git a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java index 611cafdf1f02b..0ada1f67d1045 100644 --- a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService; -import org.elasticsearch.common.io.stream.ByteArrayStreamInput; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -21,7 +20,6 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; -import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; @@ -261,24 +259,19 @@ public void testByteArrayBulkGet() { array2.close(); } - public void testByteArrayBulkSet() throws IOException { + public void testByteArrayBulkSet() { final byte[] array1 = new byte[randomIntBetween(1, 4000000)]; random().nextBytes(array1); final ByteArray array2 = bigArrays.newByteArray(array1.length, randomBoolean()); - final ByteArray array3 = bigArrays.newByteArray(array1.length, randomBoolean()); - ByteArrayStreamInput input = new ByteArrayStreamInput(array1); for (int i = 0; i < array1.length;) { final int len = Math.min(array1.length - i, randomBoolean() ? randomInt(10) : randomInt(3 * PageCacheRecycler.BYTE_PAGE_SIZE)); array2.set(i, array1, i, len); - array3.set(i, input, len); i += len; } for (int i = 0; i < array1.length; ++i) { assertEquals(array1[i], array2.get(i)); - assertEquals(array1[i], array3.get(i)); } array2.close(); - array3.close(); } public void testByteArrayEquals() { diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java index 2bf1b555025c1..0a0592b5a01f2 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.breaker.NoopCircuitBreaker; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -394,11 +393,6 @@ public void set(long index, byte[] buf, int offset, int len) { in.set(index, buf, offset, len); } - @Override - public void set(long index, StreamInput input, int len) throws IOException { - in.set(index, input, len); - } - @Override public void fill(long fromIndex, long toIndex, byte value) { in.fill(fromIndex, toIndex, value);