Skip to content

Commit

Permalink
PARQUET-2443: [WIP] Make InputStream buffering lazy
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty committed Mar 10, 2024
1 parent 64e74f2 commit 30f4afd
Show file tree
Hide file tree
Showing 9 changed files with 700 additions and 268 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.bytes;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.parquet.io.SeekableInputStream;

public class LazyByteBufferInputStream extends InputStream {
private final SeekableInputStream underlying;
private final int bufferSize;
private final long totalReadableBytes;

private long streamOffset;
private int bytesReadInBuffer;
private int totalBytesRead;
private ByteBuffer delegate;
private ByteBufferInputStream buffer;

public static LazyByteBufferInputStream wrap(
SeekableInputStream underlying,
ByteBufferAllocator allocator,
long startingOffset,
long readableBytes,
int bufferSize)
throws IOException {
return new LazyByteBufferInputStream(underlying, allocator, startingOffset, readableBytes, bufferSize);
}

LazyByteBufferInputStream(
SeekableInputStream underlying,
ByteBufferAllocator allocator,
long startingOffset,
long readableBytes,
int bufferSize) {
this.underlying = underlying;
this.totalReadableBytes = readableBytes;
this.bytesReadInBuffer = 0;
this.bufferSize = (int) Math.min(readableBytes, bufferSize); // @Todo clean up casting...
this.streamOffset = startingOffset;
this.delegate = allocator.allocate(this.bufferSize);
try {
underlying.seek(startingOffset);
underlying.read(delegate);
delegate.flip();
} catch (IOException e) {
throw new RuntimeException("Failed to initialize LazyByteBufferInputStream", e);
}
this.buffer = ByteBufferInputStream.wrap(delegate);
}

public ByteBuffer getDelegate() {
return delegate;
}

public void refillBuffers() {
try {
final byte[] unreadBytes =
Arrays.copyOfRange(delegate.array(), delegate.capacity() - buffer.available(), delegate.capacity());
this.streamOffset += bytesReadInBuffer;
delegate.put(unreadBytes);
underlying.seek(streamOffset + unreadBytes.length);
underlying.read(delegate); // Todo check that we're not reading past total chunk size
delegate.flip();
} catch (IOException e) {
throw new RuntimeException(e); // @Todo better exception handling
}

this.bytesReadInBuffer = 0;
this.buffer = ByteBufferInputStream.wrap(delegate);
}

private void checkRefill() {
checkRefill(1);
}

private void checkRefill(long bytesNeeded) {
if (buffer.available() < bytesNeeded) {
refillBuffers();
}
}

public List<ByteBuffer> sliceBuffers(int len) throws IOException {
if (len == 0) {
return Collections.emptyList();
}

checkRefill(len);

final ByteBuffer slice =
ByteBuffer.wrap(Arrays.copyOfRange(delegate.array(), bytesReadInBuffer, bytesReadInBuffer + len));
slice.position(0);

buffer.skip(len);
bytesReadInBuffer += len;
totalBytesRead += len;
return Collections.singletonList(slice);
}

public long position() {
return totalBytesRead;
}

private int wrapRead(int bytesRead) {
this.totalBytesRead += bytesRead;
this.bytesReadInBuffer += bytesRead;
return bytesRead;
}

@Override
public int read() throws IOException {
checkRefill();
wrapRead(1);
return buffer.read();
}

@Override
public int read(byte[] b) throws IOException {
checkRefill(b.length);
return wrapRead(buffer.read(b));
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
checkRefill(len);
return wrapRead(buffer.read(b, off, len));
}

@Override
public byte[] readAllBytes() throws IOException {
throw new UnsupportedOperationException("readAllBytes");
}

@Override
public byte[] readNBytes(int len) throws IOException {
throw new UnsupportedOperationException("readNBytes");
}

@Override
public int readNBytes(byte[] b, int off, int len) throws IOException {
throw new UnsupportedOperationException("readNBytes");
}

@Override
public long skip(long n) {
checkRefill(n);
totalBytesRead += n;
bytesReadInBuffer += n;
return buffer.skip(n);
}

@Override
public int available() {
return (int) (totalReadableBytes - totalBytesRead);
}

@Override
public void close() throws IOException {
buffer.close();
}

@Override
public synchronized void mark(int readlimit) {
buffer.mark(readlimit);
}

@Override
public synchronized void reset() throws IOException {
buffer.reset();
}

@Override
public boolean markSupported() {
return buffer.markSupported();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public List<ByteBuffer> sliceBuffers(long length) throws EOFException {
if (length == 0) {
return Collections.emptyList();
}

// System.out.println("Length = " + length + ", remaining= " + buffer.remaining());
if (length > buffer.remaining()) {
throw new EOFException();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.bytes;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.SeekableInputStream;
import org.junit.Assert;
import org.junit.Test;

public class TestLazyBufferInputStream {
static final ByteBuffer DATA = ByteBuffer.wrap(new byte[] {
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
30, 31, 32, 33, 34
});

protected LazyByteBufferInputStream newStream() {
ByteBufferInputStream stream = ByteBufferInputStream.wrap(DATA);
SeekableInputStream inputStream = new DelegatingSeekableInputStream(stream) {
@Override
public long getPos() throws IOException {
return stream.position();
}

@Override
public void seek(long newPos) throws IOException {
stream.skip(newPos - stream.position());
}
};

return new LazyByteBufferInputStream(inputStream, new HeapByteBufferAllocator(), 2, 20, 6);
}

@Test
public void testReadBytes() throws Exception {
LazyByteBufferInputStream stream = newStream();
Assert.assertEquals(20, stream.available());

Assert.assertEquals(stream.read(), DATA.array()[2]);
Assert.assertEquals(stream.read(), DATA.array()[3]);
Assert.assertEquals(stream.read(), DATA.array()[4]);
Assert.assertEquals(stream.read(), DATA.array()[5]);
Assert.assertEquals(stream.read(), DATA.array()[6]);

Assert.assertEquals(15, stream.available());

Assert.assertEquals(stream.read(), DATA.array()[7]);
Assert.assertEquals(stream.read(), DATA.array()[8]);
Assert.assertEquals(stream.read(), DATA.array()[9]);
Assert.assertEquals(stream.read(), DATA.array()[10]);
Assert.assertEquals(stream.read(), DATA.array()[11]);

Assert.assertEquals(10, stream.available());
stream.close();
}

@Test
public void testReadByteBuffers() throws Exception {
LazyByteBufferInputStream stream = newStream();
byte[] buf1 = new byte[4];
Assert.assertEquals(stream.read(buf1), 4);
System.out.println(
"actual bytes = " + Arrays.toString(buf1) + ", expected =" + Arrays.toString(new byte[] {2, 3, 4, 5}));
Assert.assertArrayEquals(new byte[] {2, 3, 4, 5}, buf1);
Assert.assertEquals(stream.available(), 16);
Assert.assertEquals(stream.position(), 4);

byte[] buf2 = new byte[4];
Assert.assertEquals(stream.read(buf2), 4);
Assert.assertArrayEquals(new byte[] {6, 7, 8, 9}, buf2);
Assert.assertEquals(stream.available(), 12);
Assert.assertEquals(stream.position(), 8);

byte[] buf3 = new byte[6];
Assert.assertEquals(stream.read(buf3), 6);
Assert.assertArrayEquals(new byte[] {10, 11, 12, 13, 14, 15}, buf3);
Assert.assertEquals(stream.available(), 6);
Assert.assertEquals(stream.position(), 14);

byte[] buf4 = new byte[6];
Assert.assertEquals(stream.read(buf4), 6);
Assert.assertArrayEquals(new byte[] {16, 17, 18, 19, 20, 21}, buf4);
Assert.assertEquals(stream.available(), 0);
Assert.assertEquals(stream.position(), 20);

Assert.assertEquals(0, stream.available());
stream.close();
}

@Test
public void testSliceBuffers() throws Exception {
final LazyByteBufferInputStream stream = newStream();

// Initialize with a few reads to test when position != 0
stream.read();
stream.read();

Assert.assertEquals(18, stream.available());
Assert.assertEquals(2, stream.position());

ByteBuffer slice1 = stream.sliceBuffers(4).get(0);
Assert.assertEquals(0, slice1.position());
Assert.assertEquals(4, slice1.remaining());

byte[] buf = new byte[4];
buf[0] = slice1.get();
buf[1] = slice1.get();
buf[2] = slice1.get();
buf[3] = slice1.get();
Assert.assertArrayEquals(new byte[] {4, 5, 6, 7}, buf);
Assert.assertEquals(14, stream.available());
Assert.assertEquals(6, stream.position());

ByteBuffer slice2 = stream.sliceBuffers(6).get(0);
Assert.assertEquals(0, slice2.position());
Assert.assertEquals(6, slice2.remaining());

byte[] buf2 = new byte[6];
buf2[0] = slice2.get();
buf2[1] = slice2.get();
buf2[2] = slice2.get();
buf2[3] = slice2.get();
buf2[4] = slice2.get();
buf2[5] = slice2.get();
Assert.assertArrayEquals(new byte[] {8, 9, 10, 11, 12, 13}, buf2);
Assert.assertEquals(8, stream.available());
Assert.assertEquals(12, stream.position());

stream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private HadoopReadOptions(
boolean usePageChecksumVerification,
boolean useBloomFilter,
boolean useOffHeapDecryptBuffer,
boolean eagerlyReadFullRowGroup,
int columnChunkReadSize,
FilterCompat.Filter recordFilter,
MetadataFilter metadataFilter,
CompressionCodecFactory codecFactory,
Expand All @@ -62,7 +62,7 @@ private HadoopReadOptions(
usePageChecksumVerification,
useBloomFilter,
useOffHeapDecryptBuffer,
eagerlyReadFullRowGroup,
columnChunkReadSize,
recordFilter,
metadataFilter,
codecFactory,
Expand Down Expand Up @@ -125,7 +125,7 @@ public ParquetReadOptions build() {
usePageChecksumVerification,
useBloomFilter,
useOffHeapDecryptBuffer,
eagerlyReadFullRowGroup,
columnChunkBufferSize,
recordFilter,
metadataFilter,
codecFactory,
Expand Down
Loading

0 comments on commit 30f4afd

Please sign in to comment.