Skip to content

Commit

Permalink
Introduce BytesReference#copy method
Browse files Browse the repository at this point in the history
  • Loading branch information
iverase committed Oct 14, 2023
1 parent 0f9d76a commit f2d69bd
Show file tree
Hide file tree
Showing 13 changed files with 170 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ public BytesReference slice(int from, int length) {
return new BytesArray(bytes, offset + from, length);
}

/**
 * Copies {@code length} bytes, starting {@code from} bytes into this reference, into a newly
 * allocated array and wraps that array in a new {@link BytesArray}. The result therefore does
 * not share the backing array of this instance.
 *
 * @param from   start of the range, relative to the beginning of this reference
 * @param length number of bytes to copy
 * @return a {@link BytesArray} over the freshly allocated array
 * @throws IndexOutOfBoundsException if the requested range does not fit within this reference
 */
@Override
public BytesReference copy(int from, int length) {
    Objects.checkFromIndexSize(from, length, this.length);
    // translate the relative start into an index of the backing array
    final int sourceStart = offset + from;
    final byte[] duplicate = new byte[length];
    System.arraycopy(bytes, sourceStart, duplicate, 0, length);
    return new BytesArray(duplicate);
}

@Override
public boolean hasArray() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,17 @@ static BytesReference fromByteArray(ByteArray byteArray, int length) {
int length();

/**
* Slice the bytes from the {@code from} index up to {@code length}.
* Slice the bytes from the {@code from} index up to {@code length}. The slice contains
* a direct reference to the internal pages.
*/
BytesReference slice(int from, int length);

/**
* Make a copy of the bytes from the {@code from} index up to {@code length}. The copy does not
* contain a direct reference to the internal pages.
*/
BytesReference copy(int from, int length);

/**
* The amount of memory used by this BytesReference
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.ByteSizeValue;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
Expand Down Expand Up @@ -217,19 +216,14 @@ public void reset() throws IOException {

@Override
public BytesReference readBytesReference(int length) throws IOException {
if (length == 0) {
return BytesArray.EMPTY;
}
final List<BytesReference> slices = new ArrayList<>();
while (length > 0) {
maybeNextSlice();
final byte[] bytes = slice.array().clone();
final int currentLen = Math.min(length, slice.remaining());
slices.add(new BytesArray(bytes, slice.position(), currentLen));
skip(currentLen);
length -= currentLen;
if (length < ByteSizeValue.ofMb(1).getBytes()) {
// if the length is small enough we can just copy the bytes in a single array
return super.readBytesReference(length);
} else {
final int offset = offset();
skip(length); // advance stream
return bytesReference.copy(offset, length);
}
return CompositeBytesReference.of(slices.toArray(BytesReference[]::new));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,26 @@ public BytesReference slice(int from, int length) {
return CompositeBytesReference.ofMultiple(inSlice);
}

/**
 * Copies {@code length} bytes starting at {@code from}. Only the references that overlap the
 * requested range are touched: the first and the last one are copied partially, every
 * reference in between is copied in full, and each copy is obtained through the
 * {@code copy} method of the underlying reference.
 *
 * @param from   start of the range, relative to the beginning of this composite reference
 * @param length number of bytes to copy
 * @return the copy of the single overlapping reference when the range lies within one
 *         reference, otherwise a composite of the individual copies; {@link BytesArray#EMPTY}
 *         for an empty range
 * @throws IndexOutOfBoundsException if the requested range does not fit within this reference
 */
@Override
public BytesReference copy(int from, int length) {
    Objects.checkFromIndexSize(from, length, this.length);
    if (length == 0) {
        // Nothing to copy. Without this guard (to - 1) lies before from, so when from sits on a
        // reference boundary limit ends up smaller than start, numCopies becomes 0 and the
        // write to inCopy[0] below fails with an ArrayIndexOutOfBoundsException.
        return BytesArray.EMPTY;
    }
    final int to = from + length;
    // index of the reference holding the last byte of the range
    final int limit = getOffsetIndex(to - 1);
    // index of the reference holding the first byte of the range
    final int start = getOffsetIndex(from);
    final int numCopies = 1 + (limit - start);
    // position of the first requested byte inside references[start]
    final int inCopyOffset = from - offsets[start];
    if (numCopies == 1) {
        // the whole range lives in a single reference
        return references[start].copy(inCopyOffset, length);
    }
    final BytesReference[] inCopy = new BytesReference[numCopies];
    // first reference: from the in-reference offset to its end
    inCopy[0] = references[start].copy(inCopyOffset, references[start].length() - inCopyOffset);
    // references strictly between the first and the last are copied completely
    for (int i = 1, j = start + 1; i < inCopy.length - 1; i++, j++) {
        inCopy[i] = references[j].copy(0, references[j].length());
    }
    // last reference: from its beginning up to the end of the requested range
    inCopy[inCopy.length - 1] = references[limit].copy(0, to - offsets[limit]);
    return CompositeBytesReference.ofMultiple(inCopy);
}

private int getOffsetIndex(int offset) {
final int i = Arrays.binarySearch(offsets, offset);
return i < 0 ? (-(i + 1)) - 1 : i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.PageCacheRecycler;

Expand Down Expand Up @@ -48,6 +49,25 @@ public BytesReference slice(int from, int length) {
return new PagedBytesReference(byteArray, offset + from, length);
}

/**
 * Copies {@code length} bytes starting at {@code from} into a byte array newly obtained from
 * {@code BigArrays.NON_RECYCLING_INSTANCE}. The source is read in fragments that never extend
 * past a page boundary: a first fragment that runs to the end of the page containing the start
 * position, followed by fragments of at most {@code PAGE_SIZE} bytes.
 *
 * @param from   start of the range, relative to the beginning of this reference
 * @param length number of bytes to copy
 * @return a reference created from the newly filled byte array
 * @throws IndexOutOfBoundsException if the requested range does not fit within this reference
 */
@Override
public BytesReference copy(int from, int length) {
    Objects.checkFromIndexSize(from, length, this.length);
    final ByteArray target = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(length);
    // absolute start position inside the source byte array
    final int sourceStart = this.offset + from;
    // number of bytes left in the page that contains sourceStart
    final int firstFragment = sourceStart == 0 ? PAGE_SIZE : PAGE_SIZE - (sourceStart % PAGE_SIZE);
    final BytesRef fragment = new BytesRef();
    int copied = 0;
    for (int size = Math.min(length, firstFragment); size != 0; size = Math.min(length - copied, PAGE_SIZE)) {
        final boolean materialized = this.byteArray.get(sourceStart + copied, size, fragment);
        assert materialized == false : "iteration should be page aligned but array got materialized";
        target.set(copied, fragment.bytes, fragment.offset, fragment.length);
        copied += size;
    }
    return BytesReference.fromByteArray(target, length);
}

@Override
public BytesRef toBytesRef() {
BytesRef bref = new BytesRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ public BytesReference slice(int from, int length) {
return delegate.slice(from, length);
}

/**
 * Copies the requested range by forwarding to the wrapped delegate. With assertions enabled,
 * {@code hasReferences()} is checked first.
 *
 * @param from   start of the range
 * @param length number of bytes to copy
 * @return whatever the delegate's {@code copy} returns for the same arguments
 */
@Override
public BytesReference copy(int from, int length) {
    assert hasReferences();
    final BytesReference copied = delegate.copy(from, length);
    return copied;
}

@Override
public long ramBytesUsed() {
return delegate.ramBytesUsed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.common.io.stream;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;

import java.io.EOFException;
Expand All @@ -35,6 +36,11 @@ public void readBytes(byte[] b, int offset, int len) throws IOException {
delegate.readBytes(b, offset, len);
}

/**
 * Forwards the call unchanged to the wrapped stream.
 *
 * @return the reference read by the delegate
 * @throws IOException if the delegate fails to read
 */
@Override
public BytesReference readBytesReference() throws IOException {
    final BytesReference read = delegate.readBytesReference();
    return read;
}

@Override
public ReleasableBytesReference readReleasableBytesReference() throws IOException {
return delegate.readReleasableBytesReference();
Expand All @@ -51,6 +57,16 @@ public ReleasableBytesReference readAllToReleasableBytesReference() throws IOExc
return delegate.readAllToReleasableBytesReference();
}

/**
 * Forwards the call unchanged to the wrapped stream.
 *
 * @return the result of the delegate's {@code readOptionalBytesReference()}
 * @throws IOException if the delegate fails to read
 */
@Override
public BytesReference readOptionalBytesReference() throws IOException {
    final BytesReference read = delegate.readOptionalBytesReference();
    return read;
}

/**
 * Forwards the call, including the requested length, unchanged to the wrapped stream.
 *
 * @param length number of bytes to read, passed on as-is
 * @return the reference read by the delegate
 * @throws IOException if the delegate fails to read
 */
@Override
public BytesReference readBytesReference(int length) throws IOException {
    final BytesReference read = delegate.readBytesReference(length);
    return read;
}

@Override
public short readShort() throws IOException {
return delegate.readShort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public BytesReference slice(int from, int length) {
return new ZeroBytesReference(length);
}

/**
 * Has no copy logic of its own: the request is forwarded to {@link #slice(int, int)} and its
 * result is returned as the copy.
 *
 * @param from   start of the range
 * @param length number of bytes in the range
 * @return the result of {@code slice(from, length)}
 */
@Override
public BytesReference copy(int from, int length) {
    final BytesReference zeroes = slice(from, length);
    return zeroes;
}

@Override
public long ramBytesUsed() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.elasticsearch.common.bytes;

import java.io.IOException;

import static org.hamcrest.Matchers.containsString;

public class ZeroBytesReferenceTests extends AbstractBytesReferenceTestCase {
Expand Down Expand Up @@ -39,9 +37,20 @@ public void testSliceToBytesRef() {
// ZeroBytesReference shifts offsets
}

public void testWriteWithIterator() throws IOException {
AssertionError error = expectThrows(AssertionError.class, () -> super.testWriteWithIterator());
/**
 * Runs the inherited slice test and expects it to fail with the assertion error that carries
 * the "must be zero" message.
 */
@Override
public void testSlice() {
    final AssertionError failure = expectThrows(AssertionError.class, super::testSlice);
    final String message = failure.getMessage();
    assertThat(message, containsString("Internal pages from ZeroBytesReference must be zero"));
}

/**
 * Runs the inherited copy test and expects it to fail with the assertion error that carries
 * the "must be zero" message.
 */
@Override
public void testCopy() {
    final AssertionError failure = expectThrows(AssertionError.class, super::testCopy);
    final String message = failure.getMessage();
    assertThat(message, containsString("Internal pages from ZeroBytesReference must be zero"));
}

/**
 * Runs the inherited write-with-iterator test and expects it to fail with the assertion error
 * that carries the "must be zero" message.
 */
@Override
public void testWriteWithIterator() {
    AssertionError error = expectThrows(AssertionError.class, super::testWriteWithIterator);
    assertThat(error.getMessage(), containsString("Internal pages from ZeroBytesReference must be zero"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -739,23 +739,22 @@ private void assertGenericRoundtrip(Object original) throws IOException {
}

public void testBytesReferenceSerialization() throws IOException {
final int length = randomIntBetween(1024, 1024 * 1024);
final byte[] bytes = new byte[length];
random().nextBytes(bytes);
BytesReference expected;
if (randomBoolean()) {
final ByteArray byteArray = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(length, randomBoolean());
byteArray.set(0, bytes, 0, length);
expected = BytesReference.fromByteArray(byteArray, length);
} else {
expected = new BytesArray(bytes);
byte[][] bytes = new byte[randomIntBetween(1, 5)][];
for (int i = 0; i < bytes.length; i++) {
final int length = randomIntBetween(1024, 3 * 1024 * 1024);
bytes[i] = new byte[length];
random().nextBytes(bytes[i]);
}
final BytesStreamOutput output = new BytesStreamOutput();
final BytesReference[] expected = new BytesReference[bytes.length];
for (int i = 0; i < bytes.length; i++) {
expected[i] = new BytesArray(bytes[i]);
output.writeBytesReference(expected[i]);
}

BytesStreamOutput output = new BytesStreamOutput();
output.writeBytesReference(expected);
final StreamInput in = getStreamInput(output.bytes());

final BytesReference loaded = in.readBytesReference();
assertThat(loaded, equalTo(expected));
for (BytesReference bytesReference : expected) {
assertThat(in.readBytesReference().toBytesRef(), equalTo(bytesReference.toBytesRef()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,11 @@ public BytesReference slice(int from, int length) {
return null;
}

// Stub implementation: ignores both arguments and always returns null.
@Override
public BytesReference copy(int from, int length) {
return null;
}

@Override
public BytesRef toBytesRef() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,47 @@ public void testSlice() throws IOException {
// the offset can be anywhere
assertEquals(sliceLength, singlePageOrNull.length);
}
// make sure the slice is affected by changes to the original
final BytesRefIterator iterator = pbr.iterator();
BytesRef bytesRef;
while ((bytesRef = iterator.next()) != null) {
for (int i = 0; i < bytesRef.length; i++) {
bytesRef.bytes[bytesRef.offset + i]++;
}
}
for (int i = 0; i < sliceLength; i++) {
assertEquals(pbr.get(i + sliceOffset), slice.get(i));
}
}
}

/**
 * Checks {@code copy} for an empty reference, a single byte, a length of at most one page and
 * a length spanning several pages. For each case the copy must have the requested length and
 * content, and must keep its content after every byte of the original has been modified in
 * place.
 */
public void testCopy() throws IOException {
    final int[] lengths = { 0, 1, randomIntBetween(2, PAGE_SIZE), randomIntBetween(PAGE_SIZE + 1, 3 * PAGE_SIZE) };
    for (final int length : lengths) {
        final BytesReference original = newBytesReference(length);
        final int copyOffset = randomIntBetween(0, length / 2);
        final int copyLength = Math.max(0, length - copyOffset - 1);
        final BytesReference copy = original.copy(copyOffset, copyLength);
        assertEquals(copyLength, copy.length());
        for (int i = 0; i < copyLength; i++) {
            assertEquals(original.get(i + copyOffset), copy.get(i));
        }
        final BytesRef singlePageOrNull = getSinglePageOrNull(copy);
        if (singlePageOrNull != null) {
            // we can't assert the offset since if the length is smaller than the reference
            // the offset can be anywhere
            assertEquals(copyLength, singlePageOrNull.length);
        }
        // increment every byte of the original in place; the copy must not see the change
        final BytesRefIterator pages = original.iterator();
        for (BytesRef page = pages.next(); page != null; page = pages.next()) {
            final int end = page.offset + page.length;
            for (int i = page.offset; i < end; i++) {
                page.bytes[i]++;
            }
        }
        // each byte of the original is now exactly one ahead of the corresponding copied byte
        for (int i = 0; i < copyLength; i++) {
            assertEquals(original.get(i + copyOffset), (byte) (copy.get(i) + 1));
        }
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public BytesReference slice(int from, int length) {
throw new UnsupportedOperationException("RandomBlobContentBytesReference#slice(int, int) is unsupported");
}

/**
 * Copying is not supported by this reference: the call trips an assertion when assertions are
 * enabled and otherwise throws.
 *
 * @throws UnsupportedOperationException always (when assertions are disabled)
 */
@Override
public BytesReference copy(int from, int length) {
    final String reason = "RandomBlobContentBytesReference#copy(int, int) is unsupported";
    assert false : "must not copy a RandomBlobContentBytesReference";
    throw new UnsupportedOperationException(reason);
}

@Override
public long ramBytesUsed() {
// no need for accurate accounting of the overhead since we don't really account for these things anyway
Expand Down

0 comments on commit f2d69bd

Please sign in to comment.