Skip to content

Commit

Permalink
apacheGH-37002: [C++][Parquet] Add api to get RecordReader from RowGr…
Browse files Browse the repository at this point in the history
…oupReader (apache#37003)

Currently we can only get a ColumnReader for a column from RowGroupReader. We need an API that returns a RecordReader for the column.

Moved ComputeLevelInfo from column_writer to level_conversion so that it can be shared between column_writer and file_reader.
* Closes: apache#37002

Lead-authored-by: Fatemah Panahi <[email protected]>
Co-authored-by: Fatemah Panahi <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
  • Loading branch information
fatemehp authored and dgreiss committed Feb 17, 2024
1 parent 1a5f138 commit 5f06b6a
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 18 deletions.
19 changes: 1 addition & 18 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,23 +140,6 @@ struct ValueBufferSlicer {
MemoryPool* pool_;
};

// Builds the internal::LevelInfo for the column described by `descr`.
//
// def_level and rep_level are copied from the descriptor's maximum definition
// and repetition levels. repeated_ancestor_def_level starts at the maximum
// definition level and is lowered by one for every optional node met while
// walking from the column's schema node up through its parents, stopping at
// the first repeated node or when there is no parent left.
internal::LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
  internal::LevelInfo result;
  result.def_level = descr->max_definition_level();
  result.rep_level = descr->max_repetition_level();

  int16_t spaced_def_level = descr->max_definition_level();
  const ::parquet::schema::Node* current = descr->schema_node().get();
  for (;;) {
    // Stop at the root or at the closest repeated node.
    if (current == nullptr || current->is_repeated()) {
      break;
    }
    if (current->is_optional()) {
      --spaced_def_level;
    }
    current = current->parent();
  }

  result.repeated_ancestor_def_level = spaced_def_level;
  return result;
}

template <class T>
inline const T* AddIfNotNull(const T* base, int64_t offset) {
if (base != nullptr) {
Expand Down Expand Up @@ -738,7 +721,7 @@ class ColumnWriterImpl {
Encoding::type encoding, const WriterProperties* properties)
: metadata_(metadata),
descr_(metadata->descr()),
level_info_(ComputeLevelInfo(metadata->descr())),
level_info_(internal::LevelInfo::ComputeLevelInfo(metadata->descr())),
pager_(std::move(pager)),
has_dictionary_(use_dictionary),
encoding_(encoding),
Expand Down
20 changes: 20 additions & 0 deletions cpp/src/parquet/file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,26 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
}

// Constructs a RecordReader for column `i` of this row group.
//
// The reader is created with dictionary reading disabled and with the
// read_dense_for_nullable setting taken from the reader properties. The
// column's page reader is attached before the reader is returned.
//
// Throws ParquetException if `i` is negative or is not less than the number
// of columns reported by the row group metadata.
std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(int i) {
  const auto num_columns = metadata()->num_columns();
  // Negative indices are rejected too; otherwise they would be handed
  // unchecked to the schema and page-reader lookups below.
  if (i < 0 || i >= num_columns) {
    std::stringstream ss;
    ss << "Trying to read column index " << i << " but row group metadata has only "
       << num_columns << " columns";
    throw ParquetException(ss.str());
  }
  const ColumnDescriptor* descr = metadata()->schema()->Column(i);

  std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);

  internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr);

  auto reader = internal::RecordReader::Make(
      descr, level_info, contents_->properties()->memory_pool(),
      /* read_dictionary = */ false, contents_->properties()->read_dense_for_nullable());
  reader->SetPageReader(std::move(page_reader));
  return reader;
}

std::shared_ptr<ColumnReader> RowGroupReader::ColumnWithExposeEncoding(
int i, ExposedEncoding encoding_to_expose) {
std::shared_ptr<ColumnReader> reader = Column(i);
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/parquet/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class BloomFilterReader;
class PageReader;
class RowGroupMetaData;

// Forward declaration, so that declarations in this header can refer to
// internal::RecordReader.
namespace internal {
class RecordReader;
}  // namespace internal

class PARQUET_EXPORT RowGroupReader {
public:
// Forward declare a virtual class 'Contents' to aid dependency injection and more
Expand All @@ -58,6 +62,10 @@ class PARQUET_EXPORT RowGroupReader {
// column. Ownership is shared with the RowGroupReader.
std::shared_ptr<ColumnReader> Column(int i);

// EXPERIMENTAL: Construct a RecordReader for the indicated column of the row group.
// Ownership is shared with the RowGroupReader.
std::shared_ptr<internal::RecordReader> RecordReader(int i);

// Construct a ColumnReader, trying to enable exposed encoding.
//
// For dictionary encoding, currently we only support column chunks that are fully
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/parquet/level_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ struct PARQUET_EXPORT LevelInfo {
return last_repeated_ancestor;
}

// Builds the LevelInfo for the column described by `descr`.
//
// def_level and rep_level come straight from the descriptor's maximum
// definition and repetition levels. repeated_ancestor_def_level is the maximum
// definition level minus the number of optional nodes found on the path from
// the column's schema node up to, but not including, the first repeated node
// (or up to the root when no node on the path is repeated).
static LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
  int16_t ancestor_def_level = descr->max_definition_level();
  for (const ::parquet::schema::Node* cur = descr->schema_node().get();
       cur != nullptr && !cur->is_repeated(); cur = cur->parent()) {
    if (cur->is_optional()) {
      --ancestor_def_level;
    }
  }

  LevelInfo info;
  info.def_level = descr->max_definition_level();
  info.rep_level = descr->max_repetition_level();
  info.repeated_ancestor_def_level = ancestor_def_level;
  return info;
}

friend std::ostream& operator<<(std::ostream& os, const LevelInfo& levels) {
// This print method is to silence valgrind issues. What's printed
// is not important because all asserts happen directly on
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class PARQUET_EXPORT ReaderProperties {
/// Disable buffered stream reading.
void disable_buffered_stream() { buffered_stream_enabled_ = false; }

/// Return whether read_dense_for_nullable is enabled.
bool read_dense_for_nullable() const { return read_dense_for_nullable_; }
/// Enable read_dense_for_nullable.
void enable_read_dense_for_nullable() { read_dense_for_nullable_ = true; }
/// Disable read_dense_for_nullable.
void disable_read_dense_for_nullable() { read_dense_for_nullable_ = false; }

/// Return the size of the buffered stream buffer.
int64_t buffer_size() const { return buffer_size_; }
/// Set the size of the buffered stream buffer in bytes.
Expand Down Expand Up @@ -123,6 +127,8 @@ class PARQUET_EXPORT ReaderProperties {
int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit;
bool buffered_stream_enabled_ = false;
bool page_checksum_verification_ = false;
// Used with a RecordReader.
bool read_dense_for_nullable_ = false;
std::shared_ptr<FileDecryptionProperties> file_decryption_properties_;
};

Expand Down
36 changes: 36 additions & 0 deletions cpp/src/parquet/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,42 @@ TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) {
ASSERT_THROW(printer2.DebugPrint(ss, columns), ParquetException);
}

// Verifies that the read_dense_for_nullable reader property is forwarded to
// the RecordReader obtained from a RowGroupReader. What the flag actually does
// is covered by other tests.
TEST(TestFileReader, RecordReaderReadDenseForNullable) {
  // Index 0 keeps the default (false), index 1 enables the flag explicitly and
  // index 2 disables it explicitly.
  std::vector<ReaderProperties> all_props(3);
  all_props[1].enable_read_dense_for_nullable();
  all_props[2].disable_read_dense_for_nullable();

  for (size_t idx = 0; idx < all_props.size(); ++idx) {
    const ReaderProperties& props = all_props[idx];
    auto file_reader =
        ParquetFileReader::OpenFile(alltypes_plain(), /* memory_map = */ false, props);
    auto row_group = file_reader->RowGroup(0);
    auto record_reader = row_group->RecordReader(0);
    ASSERT_EQ(props.read_dense_for_nullable(),
              record_reader->read_dense_for_nullable());
  }
}

// Obtains a RecordReader for the first column of the first row group and
// checks its counters after reading four records.
TEST(TestFileReader, GetRecordReader) {
  ReaderProperties reader_props;
  auto file_reader = ParquetFileReader::OpenFile(alltypes_plain(),
                                                 /* memory_map = */ false, reader_props);
  auto row_group = file_reader->RowGroup(0);
  auto record_reader = row_group->RecordReader(0);

  ASSERT_TRUE(record_reader->HasMoreData());

  auto records_read = record_reader->ReadRecords(4);
  ASSERT_EQ(records_read, 4);

  // Reader state after consuming the four records.
  ASSERT_EQ(4, record_reader->values_written());
  ASSERT_EQ(4, record_reader->levels_position());
  ASSERT_EQ(8, record_reader->levels_written());
}

class TestLocalFile : public ::testing::Test {
public:
void SetUp() {
Expand Down

0 comments on commit 5f06b6a

Please sign in to comment.