From f2d69bdc5eb0be46b28440ad495191c3f2b0d522 Mon Sep 17 00:00:00 2001 From: iverase Date: Sat, 14 Oct 2023 13:20:53 +0200 Subject: [PATCH] Introduce BytesReference#copy method --- .../common/bytes/BytesArray.java | 8 ++++ .../common/bytes/BytesReference.java | 9 +++- .../bytes/BytesReferenceStreamInput.java | 22 ++++------ .../common/bytes/CompositeBytesReference.java | 20 +++++++++ .../common/bytes/PagedBytesReference.java | 20 +++++++++ .../bytes/ReleasableBytesReference.java | 6 +++ .../common/io/stream/FilterStreamInput.java | 16 ++++++++ .../common/bytes/ZeroBytesReference.java | 5 +++ .../common/bytes/ZeroBytesReferenceTests.java | 17 ++++++-- .../common/io/stream/AbstractStreamTests.java | 29 +++++++------ .../indices/IndicesRequestCacheTests.java | 5 +++ .../bytes/AbstractBytesReferenceTestCase.java | 41 +++++++++++++++++++ .../RandomBlobContentBytesReference.java | 6 +++ 13 files changed, 170 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesArray.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesArray.java index 1e171b954aa7d..56da15f114c36 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesArray.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/BytesArray.java @@ -92,6 +92,14 @@ public BytesReference slice(int from, int length) { return new BytesArray(bytes, offset + from, length); } + @Override + public BytesReference copy(int from, int length) { + Objects.checkFromIndexSize(from, length, this.length); + final byte[] copy = new byte[length]; + System.arraycopy(bytes, offset + from, copy, 0, length); + return new BytesArray(copy); + } + @Override public boolean hasArray() { return true; diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java index 21292a92d1dc1..e83924722d704 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java @@ -153,10 +153,17 @@ static BytesReference fromByteArray(ByteArray byteArray, int length) { int length(); /** - * Slice the bytes from the {@code from} index up to {@code length}. + * Slice the bytes from the {@code from} index up to {@code length}. The slice contains + * a direct reference to the internal pages. */ BytesReference slice(int from, int length); + /** + * Make a copy the bytes from the {@code from} index up to {@code length}. The copy does not + * contain a direct reference to the internal pages. + */ + BytesReference copy(int from, int length); + /** * The amount of memory used by this BytesReference */ diff --git a/server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java b/server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java index 22bb8e5ffc4f9..dc184bc021a39 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java @@ -12,12 +12,11 @@ import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.unit.ByteSizeValue; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; /** @@ -217,19 +216,14 @@ public void reset() throws IOException { @Override public BytesReference readBytesReference(int length) throws IOException { - if (length == 0) { - return BytesArray.EMPTY; - } - final List slices = new ArrayList<>(); - while (length > 0) { - maybeNextSlice(); - final byte[] bytes = slice.array().clone(); - final int currentLen = Math.min(length, slice.remaining()); - slices.add(new BytesArray(bytes, slice.position(), currentLen)); - skip(currentLen); - length -= currentLen; + if (length < ByteSizeValue.ofMb(1).getBytes()) { + // if the length is small enough we can just copy the bytes in a single array + return super.readBytesReference(length); + } else { + final int offset = offset(); + skip(length); // advance stream + return bytesReference.copy(offset, length); } - return CompositeBytesReference.of(slices.toArray(BytesReference[]::new)); } @Override diff --git a/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java index 09ccab35d1e43..fc71112329297 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java @@ -164,6 +164,26 @@ public BytesReference slice(int from, int length) { return CompositeBytesReference.ofMultiple(inSlice); } + @Override + public BytesReference copy(int from, int length) { + Objects.checkFromIndexSize(from, length, this.length); + final int to = from + length; + final int limit = getOffsetIndex(to - 1); + final int start = getOffsetIndex(from); + final int numCopies = 1 + (limit - start); + final int inCopyOffset = from - offsets[start]; + if (numCopies == 1) { + return references[start].copy(inCopyOffset, length); + } + final BytesReference[] inCopy = new BytesReference[numCopies]; + inCopy[0] = references[start].copy(inCopyOffset, references[start].length() - inCopyOffset); + for (int i = 1, j = start + 1; i < inCopy.length - 1; i++, j++) { + inCopy[i] = references[j].copy(0, references[j].length()); + } + inCopy[inCopy.length - 1] = references[limit].copy(0, to - offsets[limit]); + return CompositeBytesReference.ofMultiple(inCopy); + } + private int getOffsetIndex(int offset) { final int i = Arrays.binarySearch(offsets, offset); return i < 0 ? (-(i + 1)) - 1 : i; diff --git a/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java index 8e743b1fcbcd0..a2a992b0d9631 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java @@ -10,6 +10,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.PageCacheRecycler; @@ -48,6 +49,25 @@ public BytesReference slice(int from, int length) { return new PagedBytesReference(byteArray, offset + from, length); } + @Override + public BytesReference copy(int from, int length) { + Objects.checkFromIndexSize(from, length, this.length); + final ByteArray byteArray = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(length); + final int offset = this.offset + from; + final int initialFragmentSize = offset != 0 ? PAGE_SIZE - (offset % PAGE_SIZE) : PAGE_SIZE; + final BytesRef slice = new BytesRef(); + int nextFragmentSize = Math.min(length, initialFragmentSize); + int position = 0; + while (nextFragmentSize != 0) { + final boolean materialized = this.byteArray.get(offset + position, nextFragmentSize, slice); + assert materialized == false : "iteration should be page aligned but array got materialized"; + byteArray.set(position, slice.bytes, slice.offset, slice.length); + position += nextFragmentSize; + nextFragmentSize = Math.min(length - position, PAGE_SIZE); + } + return BytesReference.fromByteArray(byteArray, length); + } + @Override public BytesRef toBytesRef() { BytesRef bref = new BytesRef(); diff --git a/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java index 337e4cd28c2b3..782973181383d 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java @@ -139,6 +139,12 @@ public BytesReference slice(int from, int length) { return delegate.slice(from, length); } + @Override + public BytesReference copy(int from, int length) { + assert hasReferences(); + return delegate.copy(from, length); + } + @Override public long ramBytesUsed() { return delegate.ramBytesUsed(); diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java index 7dbbe1738ecd7..0901da0e8fb73 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java @@ -9,6 +9,7 @@ package org.elasticsearch.common.io.stream; import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; import java.io.EOFException; @@ -35,6 +36,11 @@ public void readBytes(byte[] b, int offset, int len) throws IOException { delegate.readBytes(b, offset, len); } + @Override + public BytesReference readBytesReference() throws IOException { + return delegate.readBytesReference(); + } + @Override public ReleasableBytesReference readReleasableBytesReference() throws IOException { return delegate.readReleasableBytesReference(); @@ -51,6 +57,16 @@ public ReleasableBytesReference readAllToReleasableBytesReference() throws IOExc return delegate.readAllToReleasableBytesReference(); } + @Override + public BytesReference readOptionalBytesReference() throws IOException { + return delegate.readOptionalBytesReference(); + } + + @Override + public BytesReference readBytesReference(int length) throws IOException { + return delegate.readBytesReference(length); + } + @Override public short readShort() throws IOException { return delegate.readShort(); diff --git a/server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReference.java b/server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReference.java index ecacc29f45164..c10edc0d806e5 100644 --- a/server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReference.java +++ b/server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReference.java @@ -46,6 +46,11 @@ public BytesReference slice(int from, int length) { return new ZeroBytesReference(length); } + @Override + public BytesReference copy(int from, int length) { + return slice(from, length); + } + @Override public long ramBytesUsed() { return 0; diff --git a/server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReferenceTests.java b/server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReferenceTests.java index f90cb870ea22a..3414a03f7a821 100644 --- a/server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReferenceTests.java +++ b/server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReferenceTests.java @@ -8,8 +8,6 @@ package org.elasticsearch.common.bytes; -import java.io.IOException; - import static org.hamcrest.Matchers.containsString; public class ZeroBytesReferenceTests extends AbstractBytesReferenceTestCase { @@ -39,9 +37,20 @@ public void testSliceToBytesRef() { // ZeroBytesReference shifts offsets } - public void testWriteWithIterator() throws IOException { - AssertionError error = expectThrows(AssertionError.class, () -> super.testWriteWithIterator()); + @Override + public void testSlice() { + AssertionError error = expectThrows(AssertionError.class, super::testSlice); + assertThat(error.getMessage(), containsString("Internal pages from ZeroBytesReference must be zero")); + } + + @Override + public void testCopy() { + AssertionError error = expectThrows(AssertionError.class, super::testCopy); assertThat(error.getMessage(), containsString("Internal pages from ZeroBytesReference must be zero")); } + public void testWriteWithIterator() { + AssertionError error = expectThrows(AssertionError.class, super::testWriteWithIterator); + assertThat(error.getMessage(), containsString("Internal pages from ZeroBytesReference must be zero")); + } } diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/AbstractStreamTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/AbstractStreamTests.java index aaecefa7e679a..058072ce40939 100644 --- a/server/src/test/java/org/elasticsearch/common/io/stream/AbstractStreamTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/stream/AbstractStreamTests.java @@ -739,23 +739,22 @@ private void assertGenericRoundtrip(Object original) throws IOException { } public void testBytesReferenceSerialization() throws IOException { - final int length = randomIntBetween(1024, 1024 * 1024); - final byte[] bytes = new byte[length]; - random().nextBytes(bytes); - BytesReference expected; - if (randomBoolean()) { - final ByteArray byteArray = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(length, randomBoolean()); - byteArray.set(0, bytes, 0, length); - expected = BytesReference.fromByteArray(byteArray, length); - } else { - expected = new BytesArray(bytes); + byte[][] bytes = new byte[randomIntBetween(1, 5)][]; + for (int i = 0; i < bytes.length; i++) { + final int length = randomIntBetween(1024, 3 * 1024 * 1024); + bytes[i] = new byte[length]; + random().nextBytes(bytes[i]); + } + final BytesStreamOutput output = new BytesStreamOutput(); + final BytesReference[] expected = new BytesReference[bytes.length]; + for (int i = 0; i < bytes.length; i++) { + expected[i] = new BytesArray(bytes[i]); + output.writeBytesReference(expected[i]); } - BytesStreamOutput output = new BytesStreamOutput(); - output.writeBytesReference(expected); final StreamInput in = getStreamInput(output.bytes()); - - final BytesReference loaded = in.readBytesReference(); - assertThat(loaded, equalTo(expected)); + for (BytesReference bytesReference : expected) { + assertThat(in.readBytesReference().toBytesRef(), equalTo(bytesReference.toBytesRef())); + } } } diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java index 590dc72e2a72b..6ec57c4f6d4ec 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java @@ -594,6 +594,11 @@ public BytesReference slice(int from, int length) { return null; } + @Override + public BytesReference copy(int from, int length) { + return null; + } + @Override public BytesRef toBytesRef() { return null; diff --git a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java index 675db5bbd6330..00fbbdfc9f697 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java @@ -81,6 +81,47 @@ public void testSlice() throws IOException { // the offset can be anywhere assertEquals(sliceLength, singlePageOrNull.length); } + // make sure the slice is affected by changes to the original + final BytesRefIterator iterator = pbr.iterator(); + BytesRef bytesRef; + while ((bytesRef = iterator.next()) != null) { + for (int i = 0; i < bytesRef.length; i++) { + bytesRef.bytes[bytesRef.offset + i]++; + } + } + for (int i = 0; i < sliceLength; i++) { + assertEquals(pbr.get(i + sliceOffset), slice.get(i)); + } + } + } + + public void testCopy() throws IOException { + for (int length : new int[] { 0, 1, randomIntBetween(2, PAGE_SIZE), randomIntBetween(PAGE_SIZE + 1, 3 * PAGE_SIZE) }) { + BytesReference pbr = newBytesReference(length); + int sliceOffset = randomIntBetween(0, length / 2); + int copyLength = Math.max(0, length - sliceOffset - 1); + BytesReference slice = pbr.copy(sliceOffset, copyLength); + assertEquals(copyLength, slice.length()); + for (int i = 0; i < copyLength; i++) { + assertEquals(pbr.get(i + sliceOffset), slice.get(i)); + } + BytesRef singlePageOrNull = getSinglePageOrNull(slice); + if (singlePageOrNull != null) { + // we can't assert the offset since if the length is smaller than the refercence + // the offset can be anywhere + assertEquals(copyLength, singlePageOrNull.length); + } + // make sure copy is not affected by changes to the original + final BytesRefIterator iterator = pbr.iterator(); + BytesRef bytesRef; + while ((bytesRef = iterator.next()) != null) { + for (int i = 0; i < bytesRef.length; i++) { + bytesRef.bytes[bytesRef.offset + i]++; + } + } + for (int i = 0; i < copyLength; i++) { + assertEquals(pbr.get(i + sliceOffset), (byte) (slice.get(i) + 1)); + } } } diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RandomBlobContentBytesReference.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RandomBlobContentBytesReference.java index 44627000a2de9..a2227decbcaba 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RandomBlobContentBytesReference.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/RandomBlobContentBytesReference.java @@ -40,6 +40,12 @@ public BytesReference slice(int from, int length) { throw new UnsupportedOperationException("RandomBlobContentBytesReference#slice(int, int) is unsupported"); } + @Override + public BytesReference copy(int from, int length) { + assert false : "must not copy a RandomBlobContentBytesReference"; + throw new UnsupportedOperationException("RandomBlobContentBytesReference#copy(int, int) is unsupported"); + } + @Override public long ramBytesUsed() { // no need for accurate accounting of the overhead since we don't really account for these things anyway