PARQUET-2443: Simplify iterator
clairemcginty committed Mar 6, 2024
1 parent 2ef563a commit 116b88b
Showing 2 changed files with 49 additions and 59 deletions.
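
In short: PageIterator now implements java.util.Iterator<DataPage> directly. The checked-exception shims hasNextDataPage() and nextDataPage() are deleted, IOExceptions are wrapped in the unchecked ParquetDecodingException at the point where the I/O happens, and a new bufferToFirstDataPage() hook lazily reads the chunk up to the first data page so that any DictionaryPage is materialized on demand. A minimal sketch of the resulting contract, distilled from the final hunk below (the real class is a package-private nested class, hoisted to top level here so the sketch compiles):

    import java.util.Iterator;
    import org.apache.parquet.column.page.DataPage;
    import org.apache.parquet.column.page.DictionaryPage;

    // Sketch only: the simplified PageIterator contract after this commit.
    abstract class PageIterator implements Iterator<DataPage> {

      // Read the chunk up to the first data page so that its DictionaryPage,
      // if it exists, is materialized.
      abstract void bufferToFirstDataPage();

      // Returns null when the column chunk has no dictionary page.
      abstract DictionaryPage getDictionaryPage();

      // hasNext() and next() are inherited from Iterator<DataPage>; subclasses
      // now implement them directly instead of routing through adapter methods
      // that declare IOException.
    }
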
@@ -395,11 +395,7 @@ static final class LazyColumnChunkPageReader extends ColumnChunkPageReader {
         ParquetReadOptions options) {
       this.decompressor = decompressor;
       this.compressedPages = compressedPages;
-      try {
-        this.compressedDictionaryPage = compressedPages.getDictionaryPage();
-      } catch (IOException e) {
-        throw new ParquetDecodingException("Failed to buffer to first data page", e);
-      }
+      this.compressedDictionaryPage = compressedPages.getDictionaryPage();
       this.runningValueCount = 0;
       this.offsetIndex = offsetIndex;
       this.rowCount = rowCount;
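
The constructor above no longer needs a try/catch: in the hunks below, getDictionaryPage() drops its throws IOException clause because checked exceptions are now wrapped where the I/O occurs. The idiom as a standalone sketch (this helper class and its names are hypothetical; the commit simply inlines the try/catch at each call site):

    import java.io.IOException;
    import org.apache.parquet.io.ParquetDecodingException;

    final class UncheckedIo {
      @FunctionalInterface
      interface IoSupplier<T> {
        T get() throws IOException;
      }

      // Convert a checked IOException into Parquet's unchecked
      // ParquetDecodingException, so that java.util.Iterator methods,
      // which cannot declare checked exceptions, are free to do I/O.
      static <T> T wrap(IoSupplier<T> io) {
        try {
          return io.get();
        } catch (IOException e) {
          throw new ParquetDecodingException(e);
        }
      }

      private UncheckedIo() {}
    }
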
@@ -1743,52 +1743,65 @@ public ColumnChunkPageReader readAllPages(
     private DictionaryPage dictionaryPage = null;
     private PageHeader currentPageHeader = null;
 
-    private boolean exhausted = false;
+    private boolean bufferedToFirstDataPage = false;
 
     @Override
-    DictionaryPage getDictionaryPage() throws IOException {
-      if (currentPageHeader == null) {
-        seekToNextDataPage();
+    DictionaryPage getDictionaryPage() {
+      if (!bufferedToFirstDataPage) {
+        bufferToFirstDataPage();
       }
       return dictionaryPage;
     }
 
     @Override
-    boolean hasNextDataPage() throws IOException {
-      if (exhausted) {
-        return false;
-      }
-      if (currentPageHeader == null) {
-        seekToNextDataPage();
+    public boolean hasNext() {
+      if (!bufferedToFirstDataPage) {
+        bufferToFirstDataPage();
       }
-      return !exhausted;
+      return hasMorePages();
     }
 
-    private BytesInput readAsBytesInput(int size) throws IOException {
-      return BytesInput.from(stream.sliceBuffers(size));
+    private BytesInput readAsBytesInput(int size) {
+      try {
+        return BytesInput.from(stream.sliceBuffers(size));
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Failed to read page bytes", e);
+      }
+    }
+
+    private boolean hasMorePages() {
+      return offsetIndex == null
+          ? valuesCountReadSoFar < descriptor.metadata.getValueCount()
+          : dataPageCountReadSoFar < offsetIndex.getPageCount();
     }
 
     /**
-     * Seeks to next Data page in the chunk, parsing any Dictionary pages and skipping other non-data page types
-     * @return true if another data page is available in the chunk, or false if all data pages are exhausted
-     * @throws IOException
+     * Seeks to first Data page in the chunk, parsing any Dictionary pages and skipping other non-data page types
      */
-    private boolean seekToNextDataPage() throws IOException {
+    @Override
+    void bufferToFirstDataPage() {
+      if (!bufferedToFirstDataPage) {
+        bufferNextDataPage();
+        bufferedToFirstDataPage = true;
+      }
+    }
+
+    private void bufferNextDataPage() {
       while (true) {
-        if (!hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+        if (!hasMorePages()) {
          this.currentPageHeader = null;
-          this.exhausted = true;
          // Done reading, validate all bytes have been read
          if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
            // Would be nice to have a CorruptParquetFileException or something as a subclass?
-            throw new IOException("Expected " + descriptor.metadata.getValueCount()
+            throw new ParquetDecodingException(new IOException("Expected "
+                + descriptor.metadata.getValueCount()
                + " values in column chunk at " + getPath()
                + " offset " + descriptor.metadata.getFirstDataPageOffset() + " but got "
                + valuesCountReadSoFar + " values instead over " + dataPageCountReadSoFar
                + " pages ending at file offset "
-                + (descriptor.fileOffset + stream.position()));
+                + (descriptor.fileOffset + stream.position())));
          }
-          return false;
+          return;
        }
 
        try {
@@ -1817,7 +1830,7 @@ private boolean seekToNextDataPage() throws IOException {
        final int compressedPageSize = currentPageHeader.getCompressed_page_size();
        final PageType pageType = currentPageHeader.type;
        if (pageType == PageType.DATA_PAGE || pageType == PageType.DATA_PAGE_V2) {
-          return true;
+          return;
        }
 
        final int uncompressedPageSize = currentPageHeader.getUncompressed_page_size();
@@ -1844,15 +1857,19 @@
              "skipping page of type {} of size {}",
              currentPageHeader.getType(),
              compressedPageSize);
-          stream.skipFully(compressedPageSize);
+          try {
+            stream.skipFully(compressedPageSize);
+          } catch (IOException e) {
+            throw new ParquetDecodingException(e);
+          }
        }
      }
    }
 
    @Override
-    DataPage nextDataPage() throws IOException {
-      if (!hasNextDataPage()) {
-        return null; // @Todo should throw NoSuchElementException?
+    public DataPage next() {
+      if (currentPageHeader == null) {
+        bufferNextDataPage();
      }
 
      int uncompressedPageSize = currentPageHeader.getUncompressed_page_size();
@@ -1968,12 +1985,6 @@ DataPage nextDataPage() throws IOException {
      }
    }
 
-    private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
-      return offsetIndex == null
-          ? valuesCountReadSoFar < descriptor.metadata.getValueCount()
-          : dataPageCountReadSoFar < offsetIndex.getPageCount();
-    }
-
    private int getPageOrdinal(int dataPageCountReadSoFar) {
      if (null == offsetIndex) {
        return dataPageCountReadSoFar;
@@ -1999,29 +2010,12 @@ abstract static class PageIterator implements Iterator<DataPage> {
      this.dataPageHeaderAAD = dataPageHeaderAAD;
    }
 
-    abstract boolean hasNextDataPage() throws IOException;
-
-    abstract DataPage nextDataPage() throws IOException;
-
-    abstract DictionaryPage getDictionaryPage() throws IOException;
-
-    @Override
-    public boolean hasNext() {
-      try {
-        return hasNextDataPage();
-      } catch (IOException e) {
-        throw new ParquetDecodingException(e);
-      }
-    }
+    /**
+     * Read the chunk up to the first data page so that its DictionaryPage, if exists, is materialized.
+     */
+    abstract void bufferToFirstDataPage();
 
-    @Override
-    public DataPage next() {
-      try {
-        return nextDataPage();
-      } catch (IOException e) {
-        throw new ParquetDecodingException(e);
-      }
-    }
+    abstract DictionaryPage getDictionaryPage();
  }
 
  /**
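
The net effect for callers is that page iteration follows the standard Iterator protocol with no checked exceptions to handle. A hypothetical consumer (drainPages and the decode step are illustrative; PageIterator is internal to the reader, so real callers live alongside it):

    // Sketch of a caller, assuming 'pages' is a concrete PageIterator subclass.
    static void drainPages(PageIterator pages) {
      // Lazily buffers up to the first data page; returns null when the
      // column chunk has no dictionary page.
      DictionaryPage dict = pages.getDictionaryPage();

      while (pages.hasNext()) {
        DataPage page = pages.next(); // I/O errors surface as ParquetDecodingException
        // ... decompress and decode the page ...
      }
    }
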
