Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage: optimize the overhead of late materialization #8730

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ namespace DB
M(force_use_dmfile_format_v3) \
M(force_set_mocked_s3_object_mtime) \
M(force_stop_background_checkpoint_upload) \
M(skip_seek_before_read_dmfile) \
M(exception_after_large_write_exceed) \
M(proactive_flush_force_set_type) \
M(exception_when_fetch_disagg_pages) \
Expand Down
38 changes: 23 additions & 15 deletions dbms/src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons
}
}

return ReturnType(true);
return static_cast<ReturnType>(true);
}

/// join blocks by columns
Expand Down Expand Up @@ -561,23 +561,31 @@ Block hstackBlocks(Blocks && blocks, const Block & header)
template <bool check_reserve>
Block vstackBlocks(Blocks && blocks)
{
if (blocks.empty())
return vstackBlocks<Blocks::const_iterator, check_reserve>(blocks.cbegin(), blocks.cend());
}

/// join blocks by rows
/// Note: blocks in original container will be moved.
template <typename Iter, bool check_reserve>
Block vstackBlocks(Iter begin, Iter end)
{
if (begin == end)
{
return {};
}

if (blocks.size() == 1)
if (std::next(begin) == end)
{
return std::move(blocks[0]);
return std::move(*begin);
}

size_t result_rows = 0;
for (const auto & block : blocks)
for (auto it = begin; it != end; ++it)
{
result_rows += block.rows();
result_rows += it->rows();
}

auto & first_block = blocks.front();
auto & first_block = *begin;
MutableColumns dst_columns(first_block.columns());

for (size_t i = 0; i < first_block.columns(); ++i)
Expand All @@ -588,9 +596,9 @@ Block vstackBlocks(Blocks && blocks)
else
{
size_t total_memory = 0;
for (const auto & block : blocks)
for (auto it = begin; it != end; ++it)
{
total_memory += block.getByPosition(i).column->byteSize();
total_memory += it->getByPosition(i).column->byteSize();
}
dst_columns[i]->reserveWithTotalMemoryHint(result_rows, total_memory);
}
Expand All @@ -602,14 +610,14 @@ Block vstackBlocks(Blocks && blocks)
total_allocated_bytes += column->allocatedBytes();
}

for (size_t i = 1; i < blocks.size(); ++i)
for (auto it = std::next(begin); it != end; ++it)
{
if (likely(blocks[i].rows() > 0))
if (likely(it->rows() > 0))
{
assert(blocksHaveEqualStructure(first_block, blocks[i]));
for (size_t idx = 0; idx < blocks[i].columns(); ++idx)
assert(blocksHaveEqualStructure(first_block, *it));
for (size_t idx = 0; idx < it->columns(); ++idx)
{
dst_columns[idx]->insertRangeFrom(*blocks[i].getByPosition(idx).column, 0, blocks[i].rows());
dst_columns[idx]->insertRangeFrom(*it->getByPosition(idx).column, 0, it->rows());
}
}
}
Expand Down Expand Up @@ -739,5 +747,5 @@ void Block::updateHash(SipHash & hash) const

template Block vstackBlocks<false>(Blocks && blocks);
template Block vstackBlocks<true>(Blocks && blocks);

template Block vstackBlocks<Blocks::const_iterator, false>(Blocks::const_iterator begin, Blocks::const_iterator end);
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ Block hstackBlocks(Blocks && blocks, const Block & header);
template <bool check_reserve = false>
Block vstackBlocks(Blocks && blocks);

template <typename Iter, bool check_reserve>
Block vstackBlocks(Iter begin, Iter end);

Block popBlocksListFront(BlocksList & blocks);

/// Compare number of columns, data types, column types, column names, and values of constant columns.
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ void BitmapFilter::set(const UInt32 * data, UInt32 size, const FilterPtr & f)
}
}

void BitmapFilter::set(UInt32 start, UInt32 limit)
void BitmapFilter::set(UInt32 start, UInt32 limit, bool value)
{
RUNTIME_CHECK(start + limit <= filter.size(), start, limit, filter.size());
std::fill(filter.begin() + start, filter.begin() + start + limit, true);
std::fill(filter.begin() + start, filter.begin() + start + limit, value);
}

bool BitmapFilter::get(IColumn::Filter & f, UInt32 start, UInt32 limit) const
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class BitmapFilter
void set(BlockInputStreamPtr & stream);
void set(const ColumnPtr & col, const FilterPtr & f);
void set(const UInt32 * data, UInt32 size, const FilterPtr & f);
void set(UInt32 start, UInt32 limit);
void set(UInt32 start, UInt32 limit, bool value = true);
// If return true, all data is match and do not fill the filter.
bool get(IColumn::Filter & f, UInt32 start, UInt32 limit) const;
// filter[start, limit] & f -> f
Expand Down
5 changes: 1 addition & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ class ColumnFile
: id(++MAX_COLUMN_FILE_ID)
{}

public:
virtual ~ColumnFile() = default;

public:
enum Type : UInt32
{
DELETE_RANGE = 1,
Expand Down Expand Up @@ -173,9 +173,6 @@ class ColumnFileReader
/// This method is only used to read raw data.
virtual Block readNextBlock() { throw Exception("Unsupported operation", ErrorCodes::LOGICAL_ERROR); }

/// This method used to skip next block.
virtual size_t skipNextBlock() { throw Exception("Unsupported operation", ErrorCodes::LOGICAL_ERROR); }

/// Create a new reader from current reader with different columns to read.
virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs) = 0;
};
Expand Down
18 changes: 0 additions & 18 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,24 +370,6 @@ Block ColumnFileBigReader::readNextBlock()
}
}

size_t ColumnFileBigReader::skipNextBlock()
{
initStream();

if (pk_ver_only)
{
if (next_block_index_in_cache >= cached_pk_ver_columns.size())
{
return 0;
}
return cached_pk_ver_columns[next_block_index_in_cache++].front()->size();
}
else
{
return file_stream->skipNextBlock();
}
}

ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
{
// Currently we don't reuse the cache data.
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ class ColumnFileBigReader : public ColumnFileReader

Block readNextBlock() override;

size_t skipNextBlock() override;

ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;
};

Expand Down
10 changes: 0 additions & 10 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,6 @@ Block ColumnFileInMemoryReader::readNextBlock()
return genBlock(*col_defs, columns);
}


size_t ColumnFileInMemoryReader::skipNextBlock()
{
if (read_done)
return 0;

read_done = true;
return memory_file.getRows();
}

ColumnFileReaderPtr ColumnFileInMemoryReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
{
// Reuse the cache data.
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ class ColumnFileInMemoryReader : public ColumnFileReader

Block readNextBlock() override;

size_t skipNextBlock() override;

ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;
};

Expand Down
26 changes: 0 additions & 26 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,32 +115,6 @@ class ColumnFileSetInputStream : public SkippableBlockInputStream

bool getSkippedRows(size_t &) override { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }

size_t skipNextBlock() override
{
while (cur_column_file_reader || next_file_index < column_files_count)
{
if (!cur_column_file_reader)
{
if (column_files[next_file_index]->isDeleteRange())
{
++next_file_index;
continue;
}
else
{
cur_column_file_reader = reader.column_file_readers[next_file_index];
++next_file_index;
}
}
size_t skipped_rows = cur_column_file_reader->skipNextBlock();
if (skipped_rows > 0)
return skipped_rows;
else
cur_column_file_reader = {};
}
return 0;
}

Block read() override
{
while (cur_column_file_reader || next_file_index < column_files_count)
Expand Down
9 changes: 0 additions & 9 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,15 +390,6 @@ Block ColumnFileTinyReader::readNextBlock()
return genBlock(*col_defs, columns);
}

size_t ColumnFileTinyReader::skipNextBlock()
{
if (read_done)
return 0;

read_done = true;
return tiny_file.getRows();
}

ColumnFileReaderPtr ColumnFileTinyReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
{
// Reuse the cache data.
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,6 @@ class ColumnFileTinyReader : public ColumnFileReader

Block readNextBlock() override;

size_t skipNextBlock() override;

ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;
};
} // namespace DM
Expand Down
56 changes: 23 additions & 33 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,49 +502,39 @@ class DeltaValueInputStream : public SkippableBlockInputStream

bool getSkippedRows(size_t &) override { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }

/// Skip next block in the stream.
/// Return the number of rows skipped.
/// Return 0 if meet the end of the stream.
size_t skipNextBlock() override
{
size_t skipped_rows = 0;
if (persisted_files_done)
{
skipped_rows = mem_table_input_stream.skipNextBlock();
read_rows += skipped_rows;
return skipped_rows;
}

if (skipped_rows = persisted_files_input_stream.skipNextBlock(); skipped_rows > 0)
{
read_rows += skipped_rows;
return skipped_rows;
}
else
{
persisted_files_done = true;
skipped_rows = mem_table_input_stream.skipNextBlock();
read_rows += skipped_rows;
return skipped_rows;
}
}

Block readWithFilter(const IColumn::Filter & filter) override
{
auto block = read();
if (size_t passed_count = countBytesInFilter(filter); passed_count != block.rows())
// LateMaterializationBlockInputStream will only read 16 blocks at most from the filter column,
// and since the block size in delta layer is small,
// we can stack all the blocks in memory and filter them together.
size_t total_rows = 0;
Blocks blocks;
do
{
auto block = doRead();
if (!block)
break;
total_rows += block.rows();
blocks.emplace_back(std::move(block));
} while (total_rows < filter.size());
if (blocks.empty())
return {};
auto block = vstackBlocks(std::move(blocks));
block.setStartOffset(read_rows);
read_rows += block.rows();
size_t passed_count = countBytesInFilter(filter);
for (auto & col : block)
{
for (auto & col : block)
{
col.column = col.column->filter(filter, passed_count);
}
col.column = col.column->filter(filter, passed_count);
}
return block;
}

Block read() override
{
auto block = doRead();
if (!block)
return {};
block.setStartOffset(read_rows);
read_rows += block.rows();
return block;
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ class DeltaMergeBlockInputStream final
return true;
}

size_t skipNextBlock() override { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }
Block readWithFilter(const IColumn::Filter &) override
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ class DMFileBlockInputStream : public SkippableBlockInputStream

bool getSkippedRows(size_t & skip_rows) override { return reader.getSkippedRows(skip_rows); }

size_t skipNextBlock() override { return reader.skipNextBlock(); }

Block read() override { return reader.read(); }

Block readWithFilter(const IColumn::Filter & filter) override { return reader.readWithFilter(filter); }
Expand Down
Loading