Cleanup
baibaichen committed Feb 28, 2024
1 parent 05fcf76 commit f3da025
Showing 6 changed files with 14 additions and 664 deletions.
280 changes: 7 additions & 273 deletions cpp-ch/local-engine/Storages/Parquet/VectorizedParquetRecordReader.cpp
@@ -85,7 +85,7 @@ ::arrow::io::ReadRange ComputeColumnChunkRange(

namespace local_engine
{
-VectorizedColumnReader2::VectorizedColumnReader2(
+VectorizedColumnReader::VectorizedColumnReader(
const parquet::arrow::SchemaField & field, ParquetFileReaderExtBase * reader, const std::vector<int32_t> & row_groups)
: arrowField_(field.field)
, input_(field.column_index, reader, row_groups)
@@ -95,7 +95,7 @@ VectorizedColumnReader2::VectorizedColumnReader2(
NextRowGroup();
}

-void VectorizedColumnReader2::NextRowGroup()
+void VectorizedColumnReader::NextRowGroup()
{
input_.NextChunkWithRowRange().and_then(
[&](ColumnChunkPageRead && read) -> std::optional<int64_t>
@@ -105,7 +105,7 @@ void VectorizedColumnReader2::NextRowGroup()
});
}

-void VectorizedColumnReader2::SetPageReader(std::unique_ptr<parquet::PageReader> reader, const ReadSequence & read_sequence)
+void VectorizedColumnReader::SetPageReader(std::unique_ptr<parquet::PageReader> reader, const ReadSequence & read_sequence)
{
if (read_state_)
{
@@ -121,7 +121,7 @@ void VectorizedColumnReader2::SetPageReader(std::unique_ptr<parquet::PageReader>
read_state_ = std::make_unique<ParquetReadState>(read_sequence);
}

-std::shared_ptr<arrow::ChunkedArray> VectorizedColumnReader2::readBatch(int64_t batch_size)
+std::shared_ptr<arrow::ChunkedArray> VectorizedColumnReader::readBatch(int64_t batch_size)
{
record_reader_->Reset();
record_reader_->Reserve(batch_size);
@@ -154,7 +154,7 @@ std::shared_ptr<arrow::ChunkedArray> VectorizedColumnReader2::readBatch(int64_t
return result;
}

-VectorizedParquetRecordReader2::VectorizedParquetRecordReader2(const DB::Block & header, const DB::FormatSettings & format_settings)
+VectorizedParquetRecordReader::VectorizedParquetRecordReader(const DB::Block & header, const DB::FormatSettings & format_settings)
: format_settings_(format_settings)
, arrowColumnToCHColumn_(
header,
@@ -166,7 +166,7 @@ VectorizedParquetRecordReader2::VectorizedParquetRecordReader2(const DB::Block &
{
}

-bool VectorizedParquetRecordReader2::initialize(
+bool VectorizedParquetRecordReader::initialize(
const DB::Block & header,
const std::shared_ptr<arrow::io::RandomAccessFile> & arrow_file,
const std::shared_ptr<ColumnIndexFilter> & column_index_filter,
@@ -221,7 +221,7 @@ bool VectorizedParquetRecordReader2::initialize(
return true;
}

-DB::Chunk VectorizedParquetRecordReader2::nextBatch()
+DB::Chunk VectorizedParquetRecordReader::nextBatch()
{
assert(initialized());
::arrow::ChunkedArrayVector columns(columnVectors_.size());
@@ -245,194 +245,6 @@ DB::Chunk VectorizedParquetRecordReader2::nextBatch()
return {};
}

-VectorizedColumnReader::VectorizedColumnReader(
-const int32_t column_index, const std::shared_ptr<arrow::Field> & field, const parquet::ColumnDescriptor * descr)
-: column_index_(column_index)
-, field_(field)
-, descr_(descr)
-, record_reader_(parquet::internal::RecordReader::Make(
-descr_, ComputeLevelInfo(descr_), default_arrow_pool(), field_->type()->id() == ::arrow::Type::DICTIONARY))
-{
-}
-void VectorizedColumnReader::SetPageReader(std::unique_ptr<parquet::PageReader> reader, const ReadSequence & read_sequence)
-{
-if (read_state_)
-{
-read_state_->hasLastSkip().and_then(
-[&](const int64_t skip) -> std::optional<int64_t>
-{
-assert(skip < 0);
-record_reader_->SkipRecords(-skip);
-return std::nullopt;
-});
-}
-record_reader_->SetPageReader(std::move(reader));
-read_state_ = std::make_unique<ParquetReadState>(read_sequence);
-}
-
-void VectorizedColumnReader::prepareRead(const int64_t batch_size) const
-{
-assert(read_state_);
-record_reader_->Reset();
-record_reader_->Reserve(batch_size);
-}
-
-int64_t VectorizedColumnReader::readBatch(int64_t batch_size) const
-{
-while (read_state_->hasMoreRead() && batch_size > 0)
-{
-const int64_t readNumber = read_state_->currentRead();
-if (readNumber < 0)
-{
-const int64_t records_skipped = record_reader_->SkipRecords(-readNumber);
-assert(records_skipped == -readNumber);
-read_state_->skip(records_skipped);
-}
-else
-{
-const int64_t readBatch = std::min(batch_size, readNumber);
-const int64_t records_read = record_reader_->ReadRecords(readBatch);
-assert(records_read == readBatch);
-batch_size -= records_read;
-read_state_->read(records_read);
-}
-}
-return batch_size;
-}
-
-std::shared_ptr<arrow::ChunkedArray> VectorizedColumnReader::finishRead() const
-{
-std::shared_ptr<arrow::ChunkedArray> result;
-THROW_ARROW_NOT_OK(parquet::arrow::TransferColumnData(record_reader_.get(), field_, descr_, default_arrow_pool(), &result));
-return result;
-}
-
-VectorizedParquetRecordReader::VectorizedParquetRecordReader(const DB::Block & header, const DB::FormatSettings & format_settings)
-: format_settings_(format_settings)
-, arrowColumnToCHColumn_(
-header,
-"Parquet",
-format_settings.parquet.allow_missing_columns,
-format_settings.null_as_default,
-format_settings.date_time_overflow_behavior,
-format_settings.parquet.case_insensitive_column_matching)
-{
-}
-
-void VectorizedParquetRecordReader::initialize(
-const DB::Block & header,
-const std::shared_ptr<arrow::io::RandomAccessFile> & arrow_file,
-const std::shared_ptr<ColumnIndexFilter> & column_index_filter,
-const std::shared_ptr<parquet::FileMetaData> & metadata)
-{
-auto file_reader = parquet::ParquetFileReader::Open(arrow_file, parquet::default_reader_properties(), metadata);
-const parquet::ArrowReaderProperties properties;
-const parquet::FileMetaData & file_metadata = *(file_reader->metadata());
-const parquet::SchemaDescriptor * parquet_schema = file_metadata.schema();
-const auto keyValueMetadata = file_metadata.key_value_metadata();
-parquet::arrow::SchemaManifest manifest;
-THROW_ARROW_NOT_OK(parquet::arrow::SchemaManifest::Make(parquet_schema, keyValueMetadata, properties, &manifest));
-std::vector<std::shared_ptr<arrow::Field>> fields;
-fields.reserve(manifest.schema_fields.size());
-for (auto const & schema_field : manifest.schema_fields)
-fields.emplace_back(schema_field.field);
-const arrow::Schema schema(fields, keyValueMetadata);
-
-/// column pruning
-DB::ArrowFieldIndexUtil field_util(
-format_settings_.parquet.case_insensitive_column_matching, format_settings_.parquet.allow_missing_columns);
-const std::vector<int32_t> column_indices = field_util.findRequiredIndices(header, schema);
-THROW_ARROW_NOT_OK_OR_ASSIGN(std::vector<int> field_indices, manifest.GetFieldIndices(column_indices));
-
-/// row groups pruning
-std::vector<int32_t> row_groups(
-boost::counting_iterator<int32_t>(0), boost::counting_iterator<int32_t>(file_metadata.num_row_groups()));
-if (!format_settings_.parquet.skip_row_groups.empty())
-{
-row_groups.erase(
-std::ranges::remove_if(row_groups, [&](const int32_t i) { return format_settings_.parquet.skip_row_groups.contains(i); })
-.begin(),
-row_groups.end());
-}
-
-assert(!row_groups.empty());
-parquetFileReader_
-= std::make_unique<ParquetFileReaderExt>(arrow_file, std::move(file_reader), column_index_filter, row_groups, field_indices);
-auto pageStores = parquetFileReader_->readFilteredRowGroups();
-assert(pageStores);
-
-columnVectors_.reserve(field_indices.size());
-
-for (auto const & column_index : field_indices)
-{
-auto const & field = manifest.schema_fields[column_index];
-assert(field.column_index >= 0);
-assert(column_index == field.column_index);
-columnVectors_.emplace_back(column_index, field.field, parquet_schema->Column(column_index));
-auto & [page_reader, page_read_infos] = pageStores->find(column_index)->second;
-assert(page_reader);
-columnVectors_.back().SetPageReader(std::move(page_reader), page_read_infos);
-}
-}
-
-DB::Chunk VectorizedParquetRecordReader::nextBatch()
-{
-assert(initialized());
-
-int64_t batch = format_settings_.parquet.max_block_size;
-for (auto & vectorized_column_reader : columnVectors_)
-vectorized_column_reader.prepareRead(batch);
-std::vector<int64_t> remain_read(columnVectors_.size());
-
-do
-{
-for (size_t i = 0; i < columnVectors_.size(); ++i)
-remain_read[i] += columnVectors_[i].readBatch(batch);
-
-const int64_t remainReadReords = remain_read.back();
-#ifndef NDEBUG
-for (const auto & read_number : remain_read)
-assert(read_number == remainReadReords);
-#endif
-
-if (remainReadReords == 0)
-break;
-
-batch = remainReadReords;
-const auto pageStores = parquetFileReader_->readFilteredRowGroups();
-if (!pageStores)
-break;
-
-for (auto & vectorized_column_reader : columnVectors_)
-{
-auto & [page_reader, page_read_infos] = pageStores->find(vectorized_column_reader.column_index())->second;
-assert(page_reader);
-vectorized_column_reader.SetPageReader(std::move(page_reader), page_read_infos);
-}
-} while (true);
-
-::arrow::ChunkedArrayVector columns(columnVectors_.size());
-DB::ArrowColumnToCHColumn::NameToColumnPtr name_to_column_ptr;
-for (const auto & vectorized_column_reader : columnVectors_)
-{
-const std::shared_ptr<arrow::ChunkedArray> arrow_column = vectorized_column_reader.finishRead();
-
-std::string column_name = vectorized_column_reader.field_->name();
-if (format_settings_.parquet.case_insensitive_column_matching)
-boost::to_lower(column_name);
-name_to_column_ptr[column_name] = arrow_column;
-}
-
-const size_t num_rows = name_to_column_ptr.begin()->second->length();
-if (num_rows > 0)
-{
-DB::Chunk result;
-arrowColumnToCHColumn_.arrowColumnsToCHChunk(result, name_to_column_ptr, num_rows, nullptr);
-return result;
-}
-return {};
-}
-
ParquetFileReaderExtBase::ParquetFileReaderExtBase(
const std::shared_ptr<arrow::io::RandomAccessFile> & source,
std::unique_ptr<parquet::ParquetFileReader> parquetFileReader,
@@ -501,84 +501,6 @@ const ColumnIndexStore & ParquetFileReaderExtBase::getColumnIndexStore(const int
return *(row_group_column_index_stores_[row_group]);
}

-ColumnChunkPageReadStorePtr
-ParquetFileReaderExt::readRowGroupsBase(const parquet::RowGroupMetaData & rg, const BuildRead & build_read) const
-{
-const int columns = rg.num_columns();
-auto result = std::make_unique<ColumnChunkPageReadStore>();
-result->reserve(column_indices_.size());
-for (int column_index = 0; column_index < columns; ++column_index)
-{
-if (!column_indices_.contains(column_index))
-continue;
-result->emplace(column_index, readColumnChunkPageBase(rg, column_index, build_read));
-}
-return result;
-}
-
-ColumnChunkPageReadStorePtr ParquetFileReaderExt::readFilteredRowGroups(
-const parquet::RowGroupMetaData & rg, const RowRanges & row_ranges, const ColumnIndexStore & column_index_store) const
-{
-return readRowGroupsBase(
-rg,
-[&](const int32_t column_index, const arrow::io::ReadRange & col_range)
-{
-const auto * col_desc = rg.schema()->Column(column_index);
-const ColumnIndex & index = *(column_index_store.find(col_desc->name())->second);
-return buildRead(rg.num_rows(), col_range, index.GetOffsetIndex().page_locations(), row_ranges);
-});
-}
-
-ColumnChunkPageReadStorePtr ParquetFileReaderExt::readRowGroups(const parquet::RowGroupMetaData & rg) const
-{
-return readRowGroupsBase(rg, [&](int32_t, const arrow::io::ReadRange & col_range) { return buildAllRead(rg.num_rows(), col_range); });
-}
-
-ParquetFileReaderExt::ParquetFileReaderExt(
-const std::shared_ptr<arrow::io::RandomAccessFile> & source,
-std::unique_ptr<parquet::ParquetFileReader> parquetFileReader,
-const std::shared_ptr<ColumnIndexFilter> & column_index_filter,
-const std::vector<int32_t> & row_groups,
-const std::vector<int32_t> & column_indices)
-: ParquetFileReaderExtBase(source, std::move(parquetFileReader), column_index_filter, column_indices)
-, row_groups_(row_groups.begin(), row_groups.end())
-{
-}
-
-ColumnChunkPageReadStorePtr ParquetFileReaderExt::readFilteredRowGroups()
-{
-while (hasMoreRead())
-{
-const int32_t row_group = row_groups_.front();
-const auto rowGroup = RowGroup(row_group);
-
-if (rowGroup->num_rows() == 0) // Skip Empty RowGroup
-{
-advanceRowGroup();
-continue;
-}
-
-ColumnChunkPageReadStorePtr result;
-if (canPruningPage(row_group))
-{
-const RowRanges & row_ranges = getRowRanges(row_group);
-if (row_ranges.rowCount() == 0) // There are no matching rows in this row group -> skipping it
-{
-advanceRowGroup();
-continue;
-}
-result = readFilteredRowGroups(*rowGroup, row_ranges, getColumnIndexStore(row_group));
-}
-else
-{
-result = readRowGroups(*rowGroup);
-}
-advanceRowGroup();
-return result;
-}
-return nullptr;
-}
-
/// input format
VectorizedParquetBlockInputFormat::VectorizedParquetBlockInputFormat(
DB::ReadBuffer & in_, const DB::Block & header_, const DB::FormatSettings & format_settings)
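
A note for readers of this diff: the retained `VectorizedColumnReader::readBatch` (the renamed `VectorizedColumnReader2` version kept above, with a near-identical deleted twin in the `@@ -245,194 +245,6 @@` hunk) drives `parquet::internal::RecordReader` with a precomputed sequence of signed run lengths, where a negative entry means "skip that many records" and a positive entry means "read up to that many". Below is a minimal, standalone sketch of that interleaving; `FakeRecordReader` and `ReadState` are simplified stand-ins for illustration, not the actual `local_engine`/parquet classes.

```cpp
// Standalone sketch of the skip/read interleaving in readBatch.
// FakeRecordReader and ReadState are hypothetical stand-ins.
#include <algorithm>
#include <cstdint>
#include <deque>
#include <utility>

struct FakeRecordReader
{
    // Pretend every requested record can be skipped/read; the real
    // RecordReader may return fewer at page boundaries.
    int64_t SkipRecords(int64_t n) { return n; }
    int64_t ReadRecords(int64_t n) { return n; }
};

class ReadState
{
public:
    explicit ReadState(std::deque<int64_t> sequence) : sequence_(std::move(sequence)) { }
    bool hasMoreRead() const { return !sequence_.empty(); }
    int64_t currentRead() const { return sequence_.front(); }

    // Consume n records from the current run; drop the run once exhausted.
    void consume(int64_t n)
    {
        sequence_.front() += sequence_.front() < 0 ? n : -n;
        if (sequence_.front() == 0)
            sequence_.pop_front();
    }

private:
    std::deque<int64_t> sequence_;
};

// Returns how many of the requested records could NOT be produced from the
// current pages (0 means the batch is full); the caller then installs the
// next page reader and retries with the remainder.
int64_t readBatch(FakeRecordReader & reader, ReadState & state, int64_t batch_size)
{
    while (state.hasMoreRead() && batch_size > 0)
    {
        const int64_t run = state.currentRead();
        if (run < 0)
            state.consume(reader.SkipRecords(-run)); // skips don't count against the batch
        else
        {
            const int64_t read = reader.ReadRecords(std::min(batch_size, run));
            batch_size -= read;
            state.consume(read);
        }
    }
    return batch_size;
}
```

For example, with the sequence {-100, 50, -10, 50} and a batch of 64, the sketch skips 100 records, reads 50, skips 10, then reads 14 more and returns 0, leaving a 36-record read pending in the state for the next batch.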
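The deleted `ParquetFileReaderExt::readFilteredRowGroups` loop also documents the pruning order worth keeping in mind: empty row groups are skipped outright; when a column index is available and its filtered row ranges are empty, the whole group is skipped; otherwise pages are read either filtered or in full. A hedged control-flow sketch, again with hypothetical stand-in types:

```cpp
// Control-flow sketch of the deleted readFilteredRowGroups loop.
// PageStore, FakeRowGroup, and the pruning fields are stand-ins.
#include <cstdint>
#include <deque>
#include <memory>

struct PageStore { bool filtered = false; };

struct FakeRowGroup
{
    int64_t num_rows = 0;
    bool has_column_index = false; // can pages be pruned at all?
    int64_t matching_rows = 0;     // rows surviving the column-index filter
};

std::unique_ptr<PageStore> readNextRowGroup(std::deque<FakeRowGroup> & row_groups)
{
    while (!row_groups.empty())
    {
        const FakeRowGroup rg = row_groups.front();
        row_groups.pop_front(); // advanceRowGroup()

        if (rg.num_rows == 0)
            continue; // skip empty row group

        if (rg.has_column_index)
        {
            if (rg.matching_rows == 0)
                continue; // index proves no rows match -> skip the group
            return std::make_unique<PageStore>(PageStore{/*filtered=*/true});
        }
        return std::make_unique<PageStore>(); // no index -> read every page
    }
    return nullptr; // all row groups consumed
}
```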