Skip to content

Commit

Permalink
[PARQUET-2357] Modest refactor of CapacityByteArrayOutputStream
Browse files Browse the repository at this point in the history
Change-Id: I98258d01875b61cedc9a5f13bfa925d0830de09e
  • Loading branch information
fengjiajie committed Sep 29, 2023
1 parent a1bacf8 commit 8c6d962
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public class CapacityByteArrayOutputStream extends OutputStream {
private final List<ByteBuffer> slabs = new ArrayList<ByteBuffer>();

private ByteBuffer currentSlab;
private int currentSlabIndex;
private int bytesAllocated = 0;
private int bytesUsed = 0;
private ByteBufferAllocator allocator;
Expand Down Expand Up @@ -193,17 +192,14 @@ private void addSlab(int minimumSize) {
this.currentSlab = allocator.allocate(nextSlabSize);
this.slabs.add(currentSlab);
this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize);
this.currentSlabIndex = 0;
}

@Override
public void write(int b) {
if (!currentSlab.hasRemaining()) {
addSlab(1);
}
currentSlab.put(currentSlabIndex, (byte) b);
currentSlabIndex += 1;
currentSlab.position(currentSlabIndex);
currentSlab.put((byte) b);
bytesUsed = Math.addExact(bytesUsed, 1);
}

Expand All @@ -214,19 +210,16 @@ public void write(byte b[], int off, int len) {
throw new IndexOutOfBoundsException(
String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off));
}
if (len >= currentSlab.remaining()) {
if (len > currentSlab.remaining()) {
final int length1 = currentSlab.remaining();
currentSlab.put(b, off, length1);
bytesUsed = Math.addExact(bytesUsed, length1);
currentSlabIndex += length1;
final int length2 = len - length1;
addSlab(length2);
currentSlab.put(b, off + length1, length2);
currentSlabIndex = length2;
bytesUsed = Math.addExact(bytesUsed, length2);
} else {
currentSlab.put(b, off, len);
currentSlabIndex += len;
bytesUsed = Math.addExact(bytesUsed, len);
}
}
Expand All @@ -252,10 +245,9 @@ private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOE
* @exception IOException if an I/O error occurs.
*/
public void writeTo(OutputStream out) throws IOException {
for (int i = 0; i < slabs.size() - 1; i++) {
writeToOutput(out, slabs.get(i), slabs.get(i).position());
for (ByteBuffer slab : slabs) {
writeToOutput(out, slab, slab.position());
}
writeToOutput(out, currentSlab, currentSlabIndex);
}

/**
Expand Down Expand Up @@ -290,7 +282,6 @@ public void reset() {
this.bytesAllocated = 0;
this.bytesUsed = 0;
this.currentSlab = EMPTY_SLAB;
this.currentSlabIndex = 0;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,41 @@ public void testWriteArray() throws Throwable {
validate(capacityByteArrayOutputStream, v * 3);
}

@Test
public void testWriteArrayExpand() throws Throwable {
CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(2);
assertEquals(0, capacityByteArrayOutputStream.getCapacity());

byte[] toWrite = {(byte) (1), (byte) (2), (byte) (3), (byte) (4)};
int toWriteOffset = 0;
int writeLength = 2;
// write 2 bytes array
capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength);
toWriteOffset += writeLength;
assertEquals(2, capacityByteArrayOutputStream.size());
assertEquals(2, capacityByteArrayOutputStream.getCapacity());

// write 1 byte array, expand capacity to 4
writeLength = 1;
capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength);
toWriteOffset += writeLength;
assertEquals(3, capacityByteArrayOutputStream.size());
assertEquals(4, capacityByteArrayOutputStream.getCapacity());

// write 1 byte array, not expand
capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength);
assertEquals(4, capacityByteArrayOutputStream.size());
assertEquals(4, capacityByteArrayOutputStream.getCapacity());
final byte[] byteArray = BytesInput.from(capacityByteArrayOutputStream).toByteArray();
assertArrayEquals(toWrite, byteArray);

// more test for expand
capacityByteArrayOutputStream = newCapacityBAOS(3);
int v = 23;
writeArraysOf3(capacityByteArrayOutputStream, v);
validate(capacityByteArrayOutputStream, v * 3);
}

@Test
public void testWriteArrayAndInt() throws Throwable {
CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10);
Expand Down

0 comments on commit 8c6d962

Please sign in to comment.