Skip to content

Commit

Permalink
Storage: row ID generation for MVCC bitmap filter (#6458)
Browse files Browse the repository at this point in the history
ref #6296
  • Loading branch information
JinheLin authored Jan 31, 2023
1 parent 288a763 commit 4a118fc
Show file tree
Hide file tree
Showing 39 changed files with 1,802 additions and 79 deletions.
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ add_headers_and_sources(dbms src/Interpreters/ClusterProxy)
add_headers_and_sources(dbms src/Columns)
add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/DeltaMerge)
add_headers_and_sources(dbms src/Storages/DeltaMerge/BitmapFilter)
add_headers_and_sources(dbms src/Storages/DeltaMerge/Index)
add_headers_and_sources(dbms src/Storages/DeltaMerge/Filter)
add_headers_and_sources(dbms src/Storages/DeltaMerge/FilterParser)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
M(DT_SnapshotOfDeltaMerge) \
M(DT_SnapshotOfDeltaCompact) \
M(DT_SnapshotOfPlaceIndex) \
M(DT_SnapshotOfBitmapFilter) \
M(IOLimiterPendingBgWriteReq) \
M(IOLimiterPendingFgWriteReq) \
M(IOLimiterPendingBgReadReq) \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ size_t Block::rows() const
if (elem.column)
return elem.column->size();

return 0;
return segment_row_id_col != nullptr ? segment_row_id_col->size() : 0;
}


Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ class Block
Container data;
IndexByName index_by_name;

// `start_offset` is the offset of first row in this Block.
// It is used for calculating `segment_row_id`.
UInt64 start_offset = 0;
// `segment_row_id_col` is a virtual column that represents the records' row id in the corresponding segment.
// Only used for calculating MVCC-bitmap-filter.
ColumnPtr segment_row_id_col;

public:
BlockInfo info;

Expand Down Expand Up @@ -110,8 +117,8 @@ class Block
/// Approximate number of allocated bytes in memory - for profiling and limits.
size_t allocatedBytes() const;

explicit operator bool() const { return !data.empty(); }
bool operator!() const { return data.empty(); }
explicit operator bool() const { return !data.empty() || segment_row_id_col != nullptr; }
bool operator!() const { return data.empty() && segment_row_id_col == nullptr; }

/** Get a list of column names separated by commas. */
std::string dumpNames() const;
Expand Down Expand Up @@ -149,6 +156,10 @@ class Block
*/
void updateHash(SipHash & hash) const;

void setStartOffset(UInt64 offset) { start_offset = offset; }
UInt64 startOffset() const { return start_offset; }
void setSegmentRowIdCol(ColumnPtr && col) { segment_row_id_col = col; }
ColumnPtr segmentRowIdCol() const { return segment_row_id_col; }

private:
void eraseImpl(size_t position);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ struct Settings
\
M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \
M(SettingBool, dt_enable_read_thread, true, "Enable storage read thread or not") \
M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \
\
M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \
M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \
Expand Down
120 changes: 120 additions & 0 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/DeltaMerge/BitmapFilter/BitmapFilter.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/Segment.h>

namespace DB::DM
{
BitmapFilter::BitmapFilter(UInt32 size_, bool default_value)
: filter(size_, default_value)
, all_match(default_value)
{}

void BitmapFilter::set(BlockInputStreamPtr & stream)
{
stream->readPrefix();
for (;;)
{
FilterPtr f = nullptr;
auto blk = stream->read(f, /*res_filter*/ true);
if (likely(blk))
{
set(blk.segmentRowIdCol(), f);
}
else
{
break;
}
}
stream->readSuffix();
}

void BitmapFilter::set(const ColumnPtr & col, const FilterPtr & f)
{
const auto * v = toColumnVectorDataPtr<UInt32>(col);
set(v->data(), v->size(), f);
}

void BitmapFilter::set(const UInt32 * data, UInt32 size, const FilterPtr & f)
{
if (size == 0)
{
return;
}
if (!f)
{
for (UInt32 i = 0; i < size; i++)
{
UInt32 row_id = *(data + i);
filter[row_id] = true;
}
}
else
{
RUNTIME_CHECK(size == f->size(), size, f->size());
for (UInt32 i = 0; i < size; i++)
{
UInt32 row_id = *(data + i);
filter[row_id] = (*f)[i];
}
}
}

void BitmapFilter::set(UInt32 start, UInt32 limit)
{
RUNTIME_CHECK(start + limit <= filter.size(), start, limit, filter.size());
std::fill(filter.begin() + start, filter.begin() + start + limit, true);
}

bool BitmapFilter::get(IColumn::Filter & f, UInt32 start, UInt32 limit) const
{
RUNTIME_CHECK(start + limit <= filter.size(), start, limit, filter.size());
auto begin = filter.cbegin() + start;
auto end = filter.cbegin() + start + limit;
if (all_match || std::find(begin, end, false) == end)
{
return true;
}
else
{
std::copy(begin, end, f.begin());
return false;
}
}

void BitmapFilter::runOptimize()
{
all_match = std::find(filter.begin(), filter.end(), false) == filter.end();
}

String BitmapFilter::toDebugString() const
{
String s(filter.size(), '1');
for (UInt32 i = 0; i < filter.size(); i++)
{
if (!filter[i])
{
s[i] = '0';
}
}
return fmt::format("{}", s);
}

size_t BitmapFilter::count() const
{
return std::count(filter.cbegin(), filter.cend(), true);
}
} // namespace DB::DM
46 changes: 46 additions & 0 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Columns/IColumn.h>
#include <DataStreams/IBlockInputStream.h>

namespace DB::DM
{

class BitmapFilter
{
public:
BitmapFilter(UInt32 size_, bool default_value);

void set(BlockInputStreamPtr & stream);
void set(const ColumnPtr & col, const FilterPtr & f);
void set(const UInt32 * data, UInt32 size, const FilterPtr & f);
void set(UInt32 start, UInt32 limit);
// If return true, all data is match and do not fill the filter.
bool get(IColumn::Filter & f, UInt32 start, UInt32 limit) const;

void runOptimize();

String toDebugString() const;
size_t count() const;

private:
std::vector<bool> filter;
bool all_match;
};

using BitmapFilterPtr = std::shared_ptr<BitmapFilter>;
} // namespace DB::DM
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/DeltaMerge/BitmapFilter/BitmapFilter.h>
#include <Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>

namespace DB::DM
{
BitmapFilterBlockInputStream::BitmapFilterBlockInputStream(
const ColumnDefines & columns_to_read,
BlockInputStreamPtr stable_,
BlockInputStreamPtr delta_,
size_t stable_rows_,
const BitmapFilterPtr & bitmap_filter_,
const String & req_id_)
: header(toEmptyBlock(columns_to_read))
, stable(stable_)
, delta(delta_)
, stable_rows(stable_rows_)
, bitmap_filter(bitmap_filter_)
, log(Logger::get(NAME, req_id_))
{}

Block BitmapFilterBlockInputStream::readImpl(FilterPtr & res_filter, bool return_filter)
{
auto [block, from_delta] = readBlock();
if (block)
{
if (from_delta)
{
block.setStartOffset(block.startOffset() + stable_rows);
}

filter.resize(block.rows());
bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows());
if (!all_match)
{
if (return_filter)
{
res_filter = &filter;
}
else
{
for (auto & col : block)
{
col.column = col.column->filter(filter, block.rows());
}
}
}
else
{
res_filter = nullptr;
}
}
return block;
}

// <Block, from_delta>
std::pair<Block, bool> BitmapFilterBlockInputStream::readBlock()
{
if (stable == nullptr && delta == nullptr)
{
return {{}, false};
}

if (stable == nullptr)
{
return {delta->read(), true};
}

auto block = stable->read();
if (block)
{
return {block, false};
}
else
{
stable = nullptr;
if (delta != nullptr)
{
block = delta->read();
}
return {block, true};
}
}

} // namespace DB::DM
Loading

0 comments on commit 4a118fc

Please sign in to comment.