diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java index 65e5b0ee4c..c859777a86 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java @@ -61,7 +61,6 @@ public class CapacityByteArrayOutputStream extends OutputStream { private final List slabs = new ArrayList(); private ByteBuffer currentSlab; - private int currentSlabIndex; private int bytesAllocated = 0; private int bytesUsed = 0; private ByteBufferAllocator allocator; @@ -193,7 +192,6 @@ private void addSlab(int minimumSize) { this.currentSlab = allocator.allocate(nextSlabSize); this.slabs.add(currentSlab); this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize); - this.currentSlabIndex = 0; } @Override @@ -201,10 +199,8 @@ public void write(int b) { if (!currentSlab.hasRemaining()) { addSlab(1); } - currentSlab.put(currentSlabIndex, (byte) b); - currentSlabIndex += 1; - currentSlab.position(currentSlabIndex); - bytesUsed = Math.addExact(bytesUsed, 1); + currentSlab.put((byte) b); + bytesUsed += 1; } @Override @@ -214,21 +210,16 @@ public void write(byte b[], int off, int len) { throw new IndexOutOfBoundsException( String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off)); } - if (len >= currentSlab.remaining()) { + if (len > currentSlab.remaining()) { final int length1 = currentSlab.remaining(); currentSlab.put(b, off, length1); - bytesUsed = Math.addExact(bytesUsed, length1); - currentSlabIndex += length1; final int length2 = len - length1; addSlab(length2); currentSlab.put(b, off + length1, length2); - currentSlabIndex = length2; - bytesUsed = Math.addExact(bytesUsed, length2); } else { currentSlab.put(b, off, len); - currentSlabIndex += len; - bytesUsed = Math.addExact(bytesUsed, len); } + bytesUsed += len; } private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException { @@ -252,10 +243,9 @@ private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOE * @exception IOException if an I/O error occurs. */ public void writeTo(OutputStream out) throws IOException { - for (int i = 0; i < slabs.size() - 1; i++) { - writeToOutput(out, slabs.get(i), slabs.get(i).position()); + for (ByteBuffer slab : slabs) { + writeToOutput(out, slab, slab.position()); } - writeToOutput(out, currentSlab, currentSlabIndex); } /** @@ -290,7 +280,6 @@ public void reset() { this.bytesAllocated = 0; this.bytesUsed = 0; this.currentSlab = EMPTY_SLAB; - this.currentSlabIndex = 0; } /** diff --git a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java index 89db1981f4..b26f486fb9 100644 --- a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java @@ -49,6 +49,35 @@ public void testWriteArray() throws Throwable { validate(capacityByteArrayOutputStream, v * 3); } + @Test + public void testWriteArrayExpand() throws Throwable { + CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(2); + assertEquals(0, capacityByteArrayOutputStream.getCapacity()); + + byte[] toWrite = {(byte) (1), (byte) (2), (byte) (3), (byte) (4)}; + int toWriteOffset = 0; + int writeLength = 2; + // write 2 bytes array + capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength); + toWriteOffset += writeLength; + assertEquals(2, capacityByteArrayOutputStream.size()); + assertEquals(2, capacityByteArrayOutputStream.getCapacity()); + + // write 1 byte array, expand capacity to 4 + writeLength = 1; + capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength); + toWriteOffset += writeLength; + assertEquals(3, capacityByteArrayOutputStream.size()); + assertEquals(4, capacityByteArrayOutputStream.getCapacity()); + + // write 1 byte array, not expand + capacityByteArrayOutputStream.write(toWrite, toWriteOffset, writeLength); + assertEquals(4, capacityByteArrayOutputStream.size()); + assertEquals(4, capacityByteArrayOutputStream.getCapacity()); + final byte[] byteArray = BytesInput.from(capacityByteArrayOutputStream).toByteArray(); + assertArrayEquals(toWrite, byteArray); + } + @Test public void testWriteArrayAndInt() throws Throwable { CapacityByteArrayOutputStream capacityByteArrayOutputStream = newCapacityBAOS(10);