Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage: Refactor DMFileReader #8854

Merged
merged 28 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b03deeb
refactor
Lloyd-Pottiger Mar 20, 2024
cb7bcd0
address
Lloyd-Pottiger Mar 25, 2024
1c3dbf6
address
Lloyd-Pottiger Mar 26, 2024
38e747a
address
Lloyd-Pottiger Mar 26, 2024
1a5b563
move to private
Lloyd-Pottiger Mar 29, 2024
225f6d0
address comments
Lloyd-Pottiger Mar 29, 2024
396902f
address comments
Lloyd-Pottiger Mar 29, 2024
b680779
address comments
Lloyd-Pottiger Apr 1, 2024
009c646
address comments
Lloyd-Pottiger Apr 1, 2024
9eed282
save some comments
JaySon-Huang Apr 1, 2024
a835e5e
address comments
Lloyd-Pottiger Apr 1, 2024
c7e6e07
address comments
Lloyd-Pottiger Apr 1, 2024
86b2e23
Merge pull request #3 from JaySon-Huang/refactor-dmfile
Lloyd-Pottiger Apr 1, 2024
844ed57
address comments
Lloyd-Pottiger Apr 1, 2024
b042950
Change unordered_set to vector
JinheLin Apr 2, 2024
6b4dc35
Merge pull request #4 from JinheLin/unordered_set_to_vector
Lloyd-Pottiger Apr 2, 2024
5afbe88
fix
Lloyd-Pottiger Apr 2, 2024
ecafc55
Decrease indents of readColumn to make code more readable.
JinheLin Apr 2, 2024
8ef2d6e
Merge pull request #5 from JinheLin/refine_read_column
Lloyd-Pottiger Apr 2, 2024
cde32ec
Remove unused header
JinheLin Apr 2, 2024
7447ccc
Merge pull request #6 from JinheLin/rm_useless_header
Lloyd-Pottiger Apr 2, 2024
9458001
remove must_seek
Lloyd-Pottiger Apr 2, 2024
17a7ddf
format
Lloyd-Pottiger Apr 2, 2024
3f561d4
Merge branch 'master' into refactor-dmfile
ti-chi-bot[bot] Apr 2, 2024
8a07ffd
Merge branch 'master' into refactor-dmfile
ti-chi-bot[bot] Apr 2, 2024
1fe9d18
fix tidy
Lloyd-Pottiger Apr 3, 2024
c4a9594
Merge branch 'master' into refactor-dmfile
Lloyd-Pottiger Apr 3, 2024
eff91be
Merge branch 'master' into refactor-dmfile
Lloyd-Pottiger Apr 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 16 additions & 22 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Storages/DeltaMerge/BitmapFilter/BitmapFilter.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/Segment.h>
Expand All @@ -30,53 +31,46 @@ void BitmapFilter::set(BlockInputStreamPtr & stream)
{
FilterPtr f = nullptr;
auto blk = stream->read(f, /*res_filter*/ true);
if (likely(blk))
{
set(blk.segmentRowIdCol(), f);
}
else
if (unlikely(!blk))
{
break;
}

const auto & row_ids_col = blk.segmentRowIdCol();
const auto * v = toColumnVectorDataPtr<UInt32>(row_ids_col);
assert(v != nullptr); // the segmentRowIdCol must be a UInt32 column
set(std::span{v->data(), v->size()}, f);
}
stream->readSuffix();
}

void BitmapFilter::set(const ColumnPtr & col, const FilterPtr & f)
{
const auto * v = toColumnVectorDataPtr<UInt32>(col);
set(v->data(), v->size(), f);
}

void BitmapFilter::set(const UInt32 * data, UInt32 size, const FilterPtr & f)
void BitmapFilter::set(std::span<const UInt32> row_ids, const FilterPtr & f)
{
if (size == 0)
if (row_ids.empty())
{
return;
}
if (!f)
{
for (UInt32 i = 0; i < size; i++)
for (auto row_id : row_ids)
{
UInt32 row_id = *(data + i);
filter[row_id] = true;
}
}
else
{
RUNTIME_CHECK(size == f->size(), size, f->size());
for (UInt32 i = 0; i < size; i++)
RUNTIME_CHECK(row_ids.size() == f->size(), row_ids.size(), f->size());
for (UInt32 i = 0; i < row_ids.size(); i++)
{
UInt32 row_id = *(data + i);
filter[row_id] = (*f)[i];
filter[row_ids[i]] = (*f)[i];
}
}
}

void BitmapFilter::set(UInt32 start, UInt32 limit)
void BitmapFilter::set(UInt32 start, UInt32 limit, bool value)
{
RUNTIME_CHECK(start + limit <= filter.size(), start, limit, filter.size());
std::fill(filter.begin() + start, filter.begin() + start + limit, true);
std::fill_n(filter.begin() + start, limit, value);
}

bool BitmapFilter::get(IColumn::Filter & f, UInt32 start, UInt32 limit) const
Expand Down Expand Up @@ -127,4 +121,4 @@ size_t BitmapFilter::count() const
{
return std::count(filter.cbegin(), filter.cend(), true);
}
} // namespace DB::DM
} // namespace DB::DM
15 changes: 10 additions & 5 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <Columns/IColumn.h>
#include <DataStreams/IBlockInputStream.h>

#include <span>

namespace DB::DM
{

Expand All @@ -25,13 +27,14 @@ class BitmapFilter
public:
BitmapFilter(UInt32 size_, bool default_value);

// Read blocks from `stream` and set the rows_id to be true according to the
// `segmentRowIdCol` in the block read from `stream`.
void set(BlockInputStreamPtr & stream);
void set(const ColumnPtr & col, const FilterPtr & f);
void set(const UInt32 * data, UInt32 size, const FilterPtr & f);
void set(UInt32 start, UInt32 limit);
// f[start, satrt+limit) = value
void set(UInt32 start, UInt32 limit, bool value = true);
// If return true, all data is match and do not fill the filter.
bool get(IColumn::Filter & f, UInt32 start, UInt32 limit) const;
// filter[start, limit] & f -> f
// filter[start, satrt+limit) & f -> f
void rangeAnd(IColumn::Filter & f, UInt32 start, UInt32 limit) const;

void runOptimize();
Expand All @@ -40,9 +43,11 @@ class BitmapFilter
size_t count() const;

private:
void set(std::span<const UInt32> row_ids, const FilterPtr & f);

std::vector<bool> filter;
bool all_match;
};

using BitmapFilterPtr = std::shared_ptr<BitmapFilter>;
} // namespace DB::DM
} // namespace DB::DM
61 changes: 53 additions & 8 deletions dbms/src/Storages/DeltaMerge/File/ColumnCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ namespace DB
{
namespace DM
{
RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_count, ColId column_id)
RangeWithStrategys ColumnCache::getReadStrategy(size_t start_pack_idx, size_t pack_count, ColId column_id)
{
PackRange target_range{pack_id, pack_id + pack_count};
PackRange target_range{start_pack_idx, start_pack_idx + pack_count};

RangeWithStrategys range_and_strategys;

Strategy strategy = Strategy::Unknown;
range_and_strategys.reserve(pack_count);
auto strategy = Strategy::Unknown;
size_t range_start = 0;
for (size_t cursor = target_range.first; cursor < target_range.second; cursor++)
for (size_t cursor = target_range.first; cursor < target_range.second; ++cursor)
{
if (isPackInCache(cursor, column_id))
{
Expand All @@ -36,7 +36,7 @@ RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_coun
}
else if (strategy == Strategy::Disk)
{
range_and_strategys.emplace_back(std::make_pair(PackRange{range_start, cursor}, Strategy::Disk));
range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Disk);
}
range_start = cursor;
strategy = Strategy::Memory;
Expand All @@ -45,7 +45,7 @@ RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_coun
{
if (strategy == Strategy::Memory)
{
range_and_strategys.emplace_back(std::make_pair(PackRange{range_start, cursor}, Strategy::Memory));
range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Memory);
}
else if (strategy == Strategy::Disk)
{
Expand All @@ -55,8 +55,53 @@ RangeWithStrategys ColumnCache::getReadStrategy(size_t pack_id, size_t pack_coun
strategy = Strategy::Disk;
}
}
range_and_strategys.emplace_back(std::make_pair(PackRange{range_start, target_range.second}, strategy));
range_and_strategys.emplace_back(PackRange{range_start, target_range.second}, strategy);
range_and_strategys.shrink_to_fit();
return range_and_strategys;
}

RangeWithStrategys ColumnCache::getReadStrategy(
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
size_t start_pack_idx,
size_t pack_count,
const std::vector<size_t> & clean_read_pack_idx)
{
PackRange target_range{start_pack_idx, start_pack_idx + pack_count};

RangeWithStrategys range_and_strategys;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strategies

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep this, it means multiple RangeWithStrategy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is a good idea, Poco is an example.
image

However, I think it isn't a big matter, don't take this as a serious review. Just my point of view.

range_and_strategys.reserve(pack_count);
auto strategy = Strategy::Unknown;
size_t range_start = 0;
for (size_t cursor = target_range.first; cursor < target_range.second; ++cursor)
{
if (std::find(clean_read_pack_idx.cbegin(), clean_read_pack_idx.cend(), cursor) != clean_read_pack_idx.cend())
{
if (strategy == Strategy::Memory)
{
continue;
}
else if (strategy == Strategy::Disk)
{
range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Disk);
}
range_start = cursor;
strategy = Strategy::Memory;
}
else
{
if (strategy == Strategy::Memory)
{
range_and_strategys.emplace_back(PackRange{range_start, cursor}, Strategy::Memory);
}
else if (strategy == Strategy::Disk)
{
continue;
}
range_start = cursor;
strategy = Strategy::Disk;
}
}
range_and_strategys.emplace_back(PackRange{range_start, target_range.second}, strategy);
range_and_strategys.shrink_to_fit();
return range_and_strategys;
}

Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/ColumnCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ class ColumnCache

using RangeWithStrategy = std::pair<PackRange, ColumnCache::Strategy>;
using RangeWithStrategys = std::vector<RangeWithStrategy>;
RangeWithStrategys getReadStrategy(size_t pack_id, size_t pack_count, ColId column_id);
RangeWithStrategys getReadStrategy(size_t start_pack_idx, size_t pack_count, ColId column_id);
static RangeWithStrategys getReadStrategy(
size_t start_pack_idx,
size_t pack_count,
const std::vector<size_t> & clean_read_pack_idx);

void tryPutColumn(size_t pack_id, ColId column_id, const ColumnPtr & column, size_t rows_offset, size_t rows_count);

Expand Down
Loading