Skip to content

Commit

Permalink
Add tests for ByteBufferInputStream and fix bugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue committed Feb 20, 2018
1 parent 614a2bb commit 103ed3d
Show file tree
Hide file tree
Showing 3 changed files with 519 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ public ColumnChunkPageReader readAllPages() throws IOException {
"Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() +
" but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+ " pages ending at file offset " + (descriptor.fileOffset + stream.getPos()));
+ " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
}
BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage);
Expand Down Expand Up @@ -1036,12 +1036,12 @@ protected PageHeader readPageHeader() throws IOException {
}

public BytesInput readAsBytesInput(int size) throws IOException {
if (stream.getPos() + size > length) {
if (stream.position() + size > length) {
// this is to workaround a bug where the compressedLength
// of the chunk is missing the size of the header of the dictionary
// to allow reading older files (using dictionary) we need this.
// usually 13 to 19 bytes are missing
int l1 = length - (int) stream.getPos();
int l1 = length - (int) stream.position();
int l2 = size - l1;
LOG.info("completed the column chunk with {} bytes", l2);
return BytesInput.concat(super.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public ByteBufferInputStream(Collection<ByteBuffer> buffers) {
/**
* Returns the position in the stream.
*/
public long getPos() {
public long position() {
return position;
}

Expand Down Expand Up @@ -125,7 +125,7 @@ public List<ByteBuffer> sliceBuffers(long len) throws EOFException {
}

@Override
public int read(byte[] bytes, int off, int len) throws IOException {
public int read(byte[] bytes, int off, int len) {
if (len <= 0) {
return 0;
}
Expand All @@ -151,7 +151,7 @@ public int read(byte[] bytes, int off, int len) throws IOException {
}

@Override
public int read(byte[] bytes) throws IOException {
public int read(byte[] bytes) {
return read(bytes, 0, bytes.length);
}

Expand All @@ -163,6 +163,7 @@ public int read() throws IOException {

while (true) {
if (current.remaining() > 0) {
this.position += 1;
return current.get();
} else if (!nextBuffer()) {
// there are no more buffers
Expand All @@ -172,7 +173,7 @@ public int read() throws IOException {
}

@Override
public int available() throws IOException {
public int available() {
long remaining = length - position;
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
Expand All @@ -187,8 +188,10 @@ public synchronized void mark(int readlimit) {
discardMark();
}
this.mark = position;
this.markLimit = mark + readlimit;
markBuffers.add(current.duplicate());
this.markLimit = mark + readlimit + 1;
if (current != null) {
markBuffers.add(current.duplicate());
}
}

@Override
Expand All @@ -199,6 +202,7 @@ public synchronized void reset() throws IOException {
// have been used since mark was called.
this.iterator = concat(markBuffers.iterator(), iterator);
discardMark();
nextBuffer(); // go back to the marked buffers
} else {
throw new RuntimeException("No mark defined");
}
Expand All @@ -207,7 +211,7 @@ public synchronized void reset() throws IOException {
private synchronized void discardMark() {
this.mark = -1;
this.markLimit = 0;
markBuffers.clear();
markBuffers = new ArrayList<>();
}

@Override
Expand All @@ -216,17 +220,14 @@ public boolean markSupported() {
}

private boolean nextBuffer() {
Preconditions.checkState(current.remaining() == 0,
"Cannot advance to the next buffer until the current one is consumed.");

if (!iterator.hasNext()) {
this.current = null;
return false;
}

this.current = iterator.next().duplicate();

if (mark > 0) {
if (mark >= 0) {
if (position < markLimit) {
// the mark is defined and valid. save the new buffer
markBuffers.add(current.duplicate());
Expand Down
Loading

0 comments on commit 103ed3d

Please sign in to comment.