Skip to content

Commit

Permalink
PARQUET-2210: [C++][Parquet] Skip pages based on header metadata usin…
Browse files Browse the repository at this point in the history
…g a callback (apache#14603)

Currently, the page header metadata is not exposed, so it cannot be used for skipping pages. I propose exposing the metadata through a callback that lets the caller decide whether to read or skip each page based on that metadata. 

Authored-by: Fatemah Panahi <[email protected]>
Signed-off-by: Micah Kornfield <[email protected]>
  • Loading branch information
fatemehp authored Jan 12, 2023
1 parent 37a7965 commit 97998d8
Show file tree
Hide file tree
Showing 6 changed files with 457 additions and 34 deletions.
99 changes: 71 additions & 28 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ EncodedStatistics ExtractStatsFromHeader(const H& header) {
return page_statistics;
}

// Validates the value count taken from a page header.
// Throws ParquetException if the count is negative; otherwise does nothing.
void CheckNumValuesInHeader(int num_values) {
  const bool count_is_valid = num_values >= 0;
  if (count_is_valid) {
    return;
  }
  throw ParquetException("Invalid page header (negative number of values)");
}

// ----------------------------------------------------------------------
// SerializedPageReader deserializes Thrift metadata and pages that have been
// assembled in a serialized stream for storing in a Parquet file
Expand Down Expand Up @@ -269,6 +275,11 @@ class SerializedPageReader : public PageReader {
int compressed_len, int uncompressed_len,
int levels_byte_len = 0);

// Returns true if the current page should be skipped: either its type is
// neither a data page nor a dictionary page, or data_page_filter_ asked to
// skip it. Performs basic checks on values in the page header.
// Fills in data_page_statistics for data pages.
bool ShouldSkipPage(EncodedStatistics* data_page_statistics);

const ReaderProperties properties_;
std::shared_ptr<ArrowInputStream> stream_;

Expand Down Expand Up @@ -342,6 +353,55 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
}
}

// Examines current_page_header_ and reports whether the page it describes
// should be skipped.
//
// For DATA_PAGE and DATA_PAGE_V2 headers the counts in the header are
// validated (ParquetException on negative values), the header statistics are
// written to *data_page_statistics, seen_num_values_ is advanced, and then
// data_page_filter_ (if set) is consulted. Dictionary pages are validated and
// never skipped. Any other page type is always skipped.
bool SerializedPageReader::ShouldSkipPage(EncodedStatistics* data_page_statistics) {
  // Queries the optional user filter. The filter receives a null statistics
  // pointer when the header carried no statistics.
  const auto rejected_by_filter = [this, data_page_statistics](
                                      int32_t value_count,
                                      std::optional<int32_t> row_count) {
    if (!data_page_filter_) {
      return false;
    }
    const EncodedStatistics* stats_for_filter = nullptr;
    if (data_page_statistics->is_set()) {
      stats_for_filter = data_page_statistics;
    }
    DataPageStats page_stats(stats_for_filter, value_count, row_count);
    return data_page_filter_(page_stats);
  };

  switch (LoadEnumSafe(&current_page_header_.type)) {
    case PageType::DATA_PAGE: {
      const format::DataPageHeader& v1 = current_page_header_.data_page_header;
      CheckNumValuesInHeader(v1.num_values);
      *data_page_statistics = ExtractStatsFromHeader(v1);
      // Values of a page count as seen even if the filter rejects the page.
      seen_num_values_ += v1.num_values;
      // V1 headers carry no row count.
      return rejected_by_filter(v1.num_values, /*num_rows=*/std::nullopt);
    }
    case PageType::DATA_PAGE_V2: {
      const format::DataPageHeaderV2& v2 = current_page_header_.data_page_header_v2;
      CheckNumValuesInHeader(v2.num_values);
      if (v2.num_rows < 0) {
        throw ParquetException("Invalid page header (negative number of rows)");
      }
      const bool negative_levels_length = v2.definition_levels_byte_length < 0 ||
                                          v2.repetition_levels_byte_length < 0;
      if (negative_levels_length) {
        throw ParquetException("Invalid page header (negative levels byte length)");
      }
      *data_page_statistics = ExtractStatsFromHeader(v2);
      // Values of a page count as seen even if the filter rejects the page.
      seen_num_values_ += v2.num_values;
      return rejected_by_filter(v2.num_values, v2.num_rows);
    }
    case PageType::DICTIONARY_PAGE: {
      // Dictionary pages are only validated here; they are never skipped.
      CheckNumValuesInHeader(current_page_header_.dictionary_page_header.num_values);
      return false;
    }
    default:
      // We don't know what this page type is. We're allowed to skip non-data
      // pages.
      return true;
  }
}

std::shared_ptr<Page> SerializedPageReader::NextPage() {
ThriftDeserializer deserializer(properties_);

Expand Down Expand Up @@ -391,6 +451,12 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
throw ParquetException("Invalid page header");
}

EncodedStatistics data_page_statistics;
if (ShouldSkipPage(&data_page_statistics)) {
PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
continue;
}

if (crypto_ctx_.data_decryptor != nullptr) {
UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
&data_page_aad_);
Expand All @@ -416,19 +482,14 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
page_buffer = decryption_buffer_;
}

// Uncompress and construct the pages to return.
const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);

if (page_type == PageType::DICTIONARY_PAGE) {
crypto_ctx_.start_decrypt_with_dictionary_page = false;
const format::DictionaryPageHeader& dict_header =
current_page_header_.dictionary_page_header;

bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
if (dict_header.num_values < 0) {
throw ParquetException("Invalid page header (negative number of values)");
}

// Uncompress if needed
page_buffer =
DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);

Expand All @@ -438,39 +499,22 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
} else if (page_type == PageType::DATA_PAGE) {
++page_ordinal_;
const format::DataPageHeader& header = current_page_header_.data_page_header;

if (header.num_values < 0) {
throw ParquetException("Invalid page header (negative number of values)");
}
EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
seen_num_values_ += header.num_values;

// Uncompress if needed
page_buffer =
DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);

return std::make_shared<DataPageV1>(page_buffer, header.num_values,
LoadEnumSafe(&header.encoding),
LoadEnumSafe(&header.definition_level_encoding),
LoadEnumSafe(&header.repetition_level_encoding),
uncompressed_len, page_statistics);
uncompressed_len, data_page_statistics);
} else if (page_type == PageType::DATA_PAGE_V2) {
++page_ordinal_;
const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;

if (header.num_values < 0) {
throw ParquetException("Invalid page header (negative number of values)");
}
if (header.definition_levels_byte_length < 0 ||
header.repetition_levels_byte_length < 0) {
throw ParquetException("Invalid page header (negative levels byte length)");
}
// Arrow prior to 3.0.0 set is_compressed to false but still compressed.
bool is_compressed =
(header.__isset.is_compressed ? header.is_compressed : false) ||
always_compressed_;
EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
seen_num_values_ += header.num_values;

// Uncompress if needed
int levels_byte_len;
Expand All @@ -489,11 +533,10 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
page_buffer, header.num_values, header.num_nulls, header.num_rows,
LoadEnumSafe(&header.encoding), header.definition_levels_byte_length,
header.repetition_levels_byte_length, uncompressed_len, is_compressed,
page_statistics);
data_page_statistics);
} else {
// We don't know what this page type is. We're allowed to skip non-data
// pages.
continue;
throw ParquetException(
"Internal error, we have already skipped non-data pages in ShouldSkipPage()");
}
}
return std::shared_ptr<Page>(nullptr);
Expand Down
39 changes: 39 additions & 0 deletions cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "parquet/exception.h"
#include "parquet/level_conversion.h"
#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
Expand Down Expand Up @@ -55,6 +56,26 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
// 16 KB is the default expected page header size
static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;

// \brief DataPageStats stores encoded statistics and number of values/rows for
// a page.
struct PARQUET_EXPORT DataPageStats {
DataPageStats(const EncodedStatistics* encoded_statistics, int32_t num_values,
std::optional<int32_t> num_rows)
: encoded_statistics(encoded_statistics),
num_values(num_values),
num_rows(num_rows) {}

// Encoded statistics extracted from the page header.
// Nullptr if there are no statistics in the page header.
const EncodedStatistics* encoded_statistics;
// Number of values stored in the page. Filled for both V1 and V2 data pages.
// For repeated fields, this can be greater than number of rows. For
// non-repeated fields, this will be the same as the number of rows.
int32_t num_values;
// Number of rows stored in the page. std::nullopt if not available.
std::optional<int32_t> num_rows;
};

class PARQUET_EXPORT LevelDecoder {
public:
LevelDecoder();
Expand Down Expand Up @@ -100,6 +121,8 @@ struct CryptoContext {
// Abstract page iterator interface. This way, we can feed column pages to the
// ColumnReader through whatever mechanism we choose
class PARQUET_EXPORT PageReader {
using DataPageFilter = std::function<bool(const DataPageStats&)>;

public:
virtual ~PageReader() = default;

Expand All @@ -115,11 +138,27 @@ class PARQUET_EXPORT PageReader {
bool always_compressed = false,
const CryptoContext* ctx = NULLPTR);

// \brief Install a callback that decides, page by page, whether NextPage()
// skips a data page.
//
// If data_page_filter is present (not null), NextPage() will call the
// callback function exactly once per page in the order the pages appear in
// the column. If the callback function returns true the page will be
// skipped. The callback will be called only if the page type is DATA_PAGE or
// DATA_PAGE_V2. Dictionary pages will not be skipped.
// Caller is responsible for checking that statistics are correct using
// ApplicationVersion::HasCorrectStatistics().
//
// \param data_page_filter callable that receives the DataPageStats of a page
// and returns true to skip it. It is moved into data_page_filter_, replacing
// any filter set earlier.
// \note API EXPERIMENTAL
void set_data_page_filter(DataPageFilter data_page_filter) {
data_page_filter_ = std::move(data_page_filter);
}

// @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
// containing new Page otherwise
virtual std::shared_ptr<Page> NextPage() = 0;

virtual void set_max_page_header_size(uint32_t size) = 0;

protected:
// Callback that decides if we should skip a page or not.
DataPageFilter data_page_filter_;
};

class PARQUET_EXPORT ColumnReader {
Expand Down
Loading

0 comments on commit 97998d8

Please sign in to comment.