Skip to content

Commit

Permalink
Revert "trying to remove internal for RecordReader"
Browse files Browse the repository at this point in the history
This reverts commit 79361dd.
  • Loading branch information
mapleFU committed Jul 3, 2024
1 parent 08265f2 commit 1063a01
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 30 deletions.
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ using arrow::internal::Iota;
// Help reduce verbosity
using ParquetReader = parquet::ParquetFileReader;

using parquet::RecordReader;
using parquet::internal::RecordReader;

namespace bit_util = arrow::bit_util;

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/arrow/reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ using ::arrow::internal::SafeLeftShift;
using ::arrow::util::Float16;
using ::arrow::util::SafeLoadAs;

using parquet::BinaryRecordReader;
using parquet::DictionaryRecordReader;
using parquet::RecordReader;
using parquet::internal::BinaryRecordReader;
using parquet::internal::DictionaryRecordReader;
using parquet::internal::RecordReader;
using parquet::schema::GroupNode;
using parquet::schema::Node;
using parquet::schema::PrimitiveNode;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/reader_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class FileColumnIterator {
using FileColumnIteratorFactory =
std::function<FileColumnIterator*(int, ParquetFileReader*)>;

Status TransferColumnData(::parquet::RecordReader* reader,
Status TransferColumnData(::parquet::internal::RecordReader* reader,
const std::shared_ptr<::arrow::Field>& value_field,
const ColumnDescriptor* descr, ::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::ChunkedArray>* out);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

namespace parquet {

using RecordReader;
using internal::RecordReader;

namespace arrow {

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ class TypedColumnReader : public ColumnReader {
int32_t* dict_len) = 0;
};

namespace internal {

/// \brief Stateful column reader that delimits semantic records for both flat
/// and nested columns
///
Expand Down Expand Up @@ -484,6 +486,8 @@ class DictionaryRecordReader : virtual public RecordReader {
virtual std::shared_ptr<::arrow::ChunkedArray> GetResult() = 0;
};

} // namespace internal

using BoolReader = TypedColumnReader<BooleanType>;
using Int32Reader = TypedColumnReader<Int32Type>;
using Int64Reader = TypedColumnReader<Int64Type>;
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/parquet/column_reader_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
namespace parquet {

using benchmark::DoNotOptimize;
using parquet::RecordReader;
using parquet::Repetition;
using parquet::internal::RecordReader;
using parquet::test::MakePages;
using schema::NodePtr;

Expand Down Expand Up @@ -72,9 +72,9 @@ class BenchmarkHelper {
internal::LevelInfo level_info;
level_info.def_level = descr_->max_definition_level();
level_info.rep_level = descr_->max_repetition_level();
record_reader_ =
RecordReader::Make(descr_.get(), level_info, ::arrow::default_memory_pool(),
/*read_dictionary=*/false, read_dense_for_nullable);
record_reader_ = internal::RecordReader::Make(
descr_.get(), level_info, ::arrow::default_memory_pool(),
/*read_dictionary=*/false, read_dense_for_nullable);
record_reader_->SetPageReader(std::move(pager));
return record_reader_.get();
}
Expand Down
27 changes: 14 additions & 13 deletions cpp/src/parquet/column_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -753,9 +753,9 @@ class RecordReaderPrimitiveTypeTest
NodePtr root = GroupNode::Make("root", Repetition::REQUIRED, {column});
schema_descriptor_.Init(root);
descr_ = schema_descriptor_.Column(0);
record_reader_ = RecordReader::Make(descr_, ComputeLevelInfo(descr_),
::arrow::default_memory_pool(),
/*read_dictionary=*/false, GetParam());
record_reader_ = internal::RecordReader::Make(descr_, ComputeLevelInfo(descr_),
::arrow::default_memory_pool(),
/*read_dictionary=*/false, GetParam());
}

void CheckReadValues(std::vector<int32_t> expected_values,
Expand Down Expand Up @@ -796,7 +796,7 @@ class RecordReaderPrimitiveTypeTest

protected:
SchemaDescriptor schema_descriptor_;
std::shared_ptr<RecordReader> record_reader_;
std::shared_ptr<internal::RecordReader> record_reader_;
const ColumnDescriptor* descr_;
};

Expand Down Expand Up @@ -1492,9 +1492,9 @@ class FLBARecordReaderTest : public ::testing::TestWithParam<bool> {
MakePages<FLBAType>(descr_.get(), num_pages, levels_per_page, def_levels_,
rep_levels_, values_, buffer_, pages_, Encoding::PLAIN);
auto pager = std::make_unique<MockPageReader>(pages_);
record_reader_ =
RecordReader::Make(descr_.get(), level_info, ::arrow::default_memory_pool(),
/*read_dictionary=*/false, read_dense_for_nullable());
record_reader_ = internal::RecordReader::Make(
descr_.get(), level_info, ::arrow::default_memory_pool(),
/*read_dictionary=*/false, read_dense_for_nullable());
record_reader_->SetPageReader(std::move(pager));
}

Expand Down Expand Up @@ -1554,7 +1554,7 @@ class FLBARecordReaderTest : public ::testing::TestWithParam<bool> {
}

protected:
std::shared_ptr<RecordReader> record_reader_;
std::shared_ptr<internal::RecordReader> record_reader_;

private:
int levels_per_page_;
Expand Down Expand Up @@ -1586,9 +1586,9 @@ class ByteArrayRecordReaderTest : public ::testing::TestWithParam<bool> {

auto pager = std::make_unique<MockPageReader>(pages_);

record_reader_ =
RecordReader::Make(descr_.get(), level_info, ::arrow::default_memory_pool(),
/*read_dictionary=*/false, read_dense_for_nullable());
record_reader_ = internal::RecordReader::Make(
descr_.get(), level_info, ::arrow::default_memory_pool(),
/*read_dictionary=*/false, read_dense_for_nullable());
record_reader_->SetPageReader(std::move(pager));
}

Expand Down Expand Up @@ -1648,7 +1648,7 @@ class ByteArrayRecordReaderTest : public ::testing::TestWithParam<bool> {
}

protected:
std::shared_ptr<RecordReader> record_reader_;
std::shared_ptr<internal::RecordReader> record_reader_;

private:
int levels_per_page_;
Expand Down Expand Up @@ -1777,7 +1777,8 @@ TEST_P(RecordReaderStressTest, StressTest) {
pager.reset(new test::MockPageReader(pages));

// Set up the RecordReader.
std::shared_ptr<RecordReader> record_reader = RecordReader::Make(&descr, level_info);
std::shared_ptr<internal::RecordReader> record_reader =
internal::RecordReader::Make(&descr, level_info);
record_reader->SetPageReader(std::move(pager));

// Figure out how many total records.
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/parquet/file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
}

std::shared_ptr<RecordReader> RowGroupReader::RecordReader(int i, bool read_dictionary) {
std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(
int i, bool read_dictionary) {
if (i >= metadata()->num_columns()) {
std::stringstream ss;
ss << "Trying to read column index " << i << " but row group metadata has only "
Expand All @@ -124,7 +125,7 @@ std::shared_ptr<RecordReader> RowGroupReader::RecordReader(int i, bool read_dict

internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr);

auto reader = RecordReader::Make(
auto reader = internal::RecordReader::Make(
descr, level_info, contents_->properties()->memory_pool(), read_dictionary,
contents_->properties()->read_dense_for_nullable());
reader->SetPageReader(std::move(page_reader));
Expand All @@ -144,7 +145,7 @@ std::shared_ptr<ColumnReader> RowGroupReader::ColumnWithExposeEncoding(
return reader;
}

std::shared_ptr<RecordReader> RowGroupReader::RecordReaderWithExposeEncoding(
std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReaderWithExposeEncoding(
int i, ExposedEncoding encoding_to_expose) {
return RecordReader(
i,
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/parquet/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ class BloomFilterReader;
class PageReader;
class RowGroupMetaData;

namespace internal {
class RecordReader;
}

class PARQUET_EXPORT RowGroupReader {
public:
Expand All @@ -62,7 +64,8 @@ class PARQUET_EXPORT RowGroupReader {

// EXPERIMENTAL: Construct a RecordReader for the indicated column of the row group.
// Ownership is shared with the RowGroupReader.
std::shared_ptr<RecordReader> RecordReader(int i, bool read_dictionary = false);
std::shared_ptr<internal::RecordReader> RecordReader(int i,
bool read_dictionary = false);

// Construct a ColumnReader, trying to enable exposed encoding.
//
Expand All @@ -87,7 +90,7 @@ class PARQUET_EXPORT RowGroupReader {
// reader will read decoded data without exposing the dictionary.
//
// \note API EXPERIMENTAL
std::shared_ptr<RecordReader> RecordReaderWithExposeEncoding(
std::shared_ptr<internal::RecordReader> RecordReaderWithExposeEncoding(
int i, ExposedEncoding encoding_to_expose);

std::unique_ptr<PageReader> GetColumnPageReader(int i);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ TEST(TestFileReader, RecordReaderReadDenseForNullable) {
std::unique_ptr<ParquetFileReader> file_reader = ParquetFileReader::OpenFile(
alltypes_plain(), /* memory_map = */ false, reader_props);
std::shared_ptr<RowGroupReader> group = file_reader->RowGroup(0);
std::shared_ptr<RecordReader> col_record_reader = group->RecordReader(0);
std::shared_ptr<internal::RecordReader> col_record_reader = group->RecordReader(0);
ASSERT_EQ(reader_props.read_dense_for_nullable(),
col_record_reader->read_dense_for_nullable());
}
Expand All @@ -611,7 +611,7 @@ TEST(TestFileReader, GetRecordReader) {
alltypes_plain(), /* memory_map = */ false, reader_props);
std::shared_ptr<RowGroupReader> group = file_reader->RowGroup(0);

std::shared_ptr<RecordReader> col_record_reader_ = group->RecordReader(0);
std::shared_ptr<internal::RecordReader> col_record_reader_ = group->RecordReader(0);

ASSERT_TRUE(col_record_reader_->HasMoreData());
auto records_read = col_record_reader_->ReadRecords(4);
Expand Down

0 comments on commit 1063a01

Please sign in to comment.