Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37002: [C++][Parquet] Add api to get RecordReader from RowGroupReader #37003

Merged
merged 10 commits into from
Oct 17, 2023
19 changes: 1 addition & 18 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,23 +140,6 @@ struct ValueBufferSlicer {
MemoryPool* pool_;
};

internal::LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
internal::LevelInfo level_info;
level_info.def_level = descr->max_definition_level();
level_info.rep_level = descr->max_repetition_level();

int16_t min_spaced_def_level = descr->max_definition_level();
const ::parquet::schema::Node* node = descr->schema_node().get();
while (node != nullptr && !node->is_repeated()) {
if (node->is_optional()) {
min_spaced_def_level--;
}
node = node->parent();
}
level_info.repeated_ancestor_def_level = min_spaced_def_level;
return level_info;
}

template <class T>
inline const T* AddIfNotNull(const T* base, int64_t offset) {
if (base != nullptr) {
Expand Down Expand Up @@ -738,7 +721,7 @@ class ColumnWriterImpl {
Encoding::type encoding, const WriterProperties* properties)
: metadata_(metadata),
descr_(metadata->descr()),
level_info_(ComputeLevelInfo(metadata->descr())),
level_info_(internal::LevelInfo::ComputeLevelInfo(metadata->descr())),
pager_(std::move(pager)),
has_dictionary_(use_dictionary),
encoding_(encoding),
Expand Down
20 changes: 20 additions & 0 deletions cpp/src/parquet/file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,26 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
}

std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(int i) {
if (i >= metadata()->num_columns()) {
std::stringstream ss;
ss << "Trying to read column index " << i << " but row group metadata has only "
<< metadata()->num_columns() << " columns";
throw ParquetException(ss.str());
}
const ColumnDescriptor* descr = metadata()->schema()->Column(i);

std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);

internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr);

auto reader = internal::RecordReader::Make(
descr, level_info, contents_->properties()->memory_pool(),
/* read_dictionary = */ false, contents_->properties()->read_dense_for_nullable());
reader->SetPageReader(std::move(page_reader));
return reader;
}

std::shared_ptr<ColumnReader> RowGroupReader::ColumnWithExposeEncoding(
int i, ExposedEncoding encoding_to_expose) {
std::shared_ptr<ColumnReader> reader = Column(i);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/parquet/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "arrow/io/caching.h"
#include "arrow/util/type_fwd.h"
#include "parquet/column_reader.h"
wgtmac marked this conversation as resolved.
Show resolved Hide resolved
#include "parquet/metadata.h" // IWYU pragma: keep
#include "parquet/platform.h"
#include "parquet/properties.h"
Expand Down Expand Up @@ -58,6 +59,10 @@ class PARQUET_EXPORT RowGroupReader {
// column. Ownership is shared with the RowGroupReader.
std::shared_ptr<ColumnReader> Column(int i);

// Construct a RecordReader for the indicated row group-relative column i.
// Ownership is shared with the RowGroupReader.
wgtmac marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<internal::RecordReader> RecordReader(int i);

// Construct a ColumnReader, trying to enable exposed encoding.
//
// For dictionary encoding, currently we only support column chunks that are fully
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/parquet/level_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ struct PARQUET_EXPORT LevelInfo {
return last_repeated_ancestor;
}

// Calculates and returns LevelInfo for a column descriptor.
static LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
LevelInfo level_info;
level_info.def_level = descr->max_definition_level();
level_info.rep_level = descr->max_repetition_level();

int16_t min_spaced_def_level = descr->max_definition_level();
const ::parquet::schema::Node* node = descr->schema_node().get();
while (node && !node->is_repeated()) {
if (node->is_optional()) {
min_spaced_def_level--;
}
node = node->parent();
}
level_info.repeated_ancestor_def_level = min_spaced_def_level;
return level_info;
}

friend std::ostream& operator<<(std::ostream& os, const LevelInfo& levels) {
// This print method is to silence valgrind issues. What's printed
// is not important because all asserts happen directly on
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class PARQUET_EXPORT ReaderProperties {
/// Disable buffered stream reading.
void disable_buffered_stream() { buffered_stream_enabled_ = false; }

bool read_dense_for_nullable() const { return read_dense_for_nullable_; }
void enable_read_dense_for_nullable() { read_dense_for_nullable_ = true; }
void disable_read_dense_for_nullable() { read_dense_for_nullable_ = false; }

/// Return the size of the buffered stream buffer.
int64_t buffer_size() const { return buffer_size_; }
/// Set the size of the buffered stream buffer in bytes.
Expand Down Expand Up @@ -123,6 +127,8 @@ class PARQUET_EXPORT ReaderProperties {
int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit;
bool buffered_stream_enabled_ = false;
bool page_checksum_verification_ = false;
// Used with a RecordReader.
bool read_dense_for_nullable_ = false;
std::shared_ptr<FileDecryptionProperties> file_decryption_properties_;
};

Expand Down
19 changes: 19 additions & 0 deletions cpp/src/parquet/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,25 @@ TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) {
ASSERT_THROW(printer2.DebugPrint(ss, columns), ParquetException);
}

// Tests getting a record reader from a row group reader.
TEST(TestFileReader, GetRecordReader) {
ReaderProperties reader_props;
reader_props.enable_read_dense_for_nullable();
std::unique_ptr<ParquetFileReader> file_reader = ParquetFileReader::OpenFile(
alltypes_plain(), /* memory_map = */ false, reader_props);
std::shared_ptr<RowGroupReader> group = file_reader->RowGroup(0);

std::shared_ptr<internal::RecordReader> col_record_reader_ = group->RecordReader(0);

ASSERT_TRUE(col_record_reader_->read_dense_for_nullable());
fatemehp marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_TRUE(col_record_reader_->HasMoreData());
auto records_read = col_record_reader_->ReadRecords(4);
ASSERT_EQ(records_read, 4);
ASSERT_EQ(4, col_record_reader_->values_written());
ASSERT_EQ(4, col_record_reader_->levels_position());
ASSERT_EQ(8, col_record_reader_->levels_written());
}

class TestLocalFile : public ::testing::Test {
public:
void SetUp() {
Expand Down
Loading